X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/f6b32c14f4c8680d2a6b7a4d71758e76ca2c3554..60893c6cc6b08504d06aaa2e4eee5df29a479da1:/src/redis.c diff --git a/src/redis.c b/src/redis.c index 7b023dc5..64df9073 100644 --- a/src/redis.c +++ b/src/redis.c @@ -852,6 +852,8 @@ void createSharedObjects(void) { shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n",17); shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n",19); shared.del = createStringObject("DEL",3); + shared.rpop = createStringObject("RPOP",4); + shared.lpop = createStringObject("LPOP",4); for (j = 0; j < REDIS_SHARED_INTEGERS; j++) { shared.integers[j] = createObject(REDIS_STRING,(void*)(long)j); shared.integers[j]->encoding = REDIS_ENCODING_INT; @@ -962,6 +964,7 @@ void initServerConfig() { populateCommandTable(); server.delCommand = lookupCommandByCString("del"); server.multiCommand = lookupCommandByCString("multi"); + server.lpushCommand = lookupCommandByCString("lpush"); /* Slow log */ server.slowlog_log_slower_than = REDIS_SLOWLOG_LOG_SLOWER_THAN; @@ -1160,6 +1163,43 @@ void resetCommandTableStats(void) { } } +/* ========================== Redis OP Array API ============================ A redisOpArray is a growable array of redisOp entries (cmd, dbid, argv, argc, target). redisOpArrayInit() only resets the fields and frees nothing. redisOpArrayAppend() grows the array by one slot with zrealloc(), stores the argv pointer as-is (no copy, no refcount change) and returns the new number of ops. redisOpArrayFree() calls decrRefCount() on every argv element, zfree()s each argv array and finally the ops array; it does not reset oa->ops to NULL, so redisOpArrayInit() has to run again before the array is reused. */ + +void redisOpArrayInit(redisOpArray *oa) { + oa->ops = NULL; + oa->numops = 0; +} + +int redisOpArrayAppend(redisOpArray *oa, struct redisCommand *cmd, int dbid, + robj **argv, int argc, int target) +{ + redisOp *op; + + oa->ops = zrealloc(oa->ops,sizeof(redisOp)*(oa->numops+1)); + op = oa->ops+oa->numops; + op->cmd = cmd; + op->dbid = dbid; + op->argv = argv; + op->argc = argc; + op->target = target; + oa->numops++; + return oa->numops; +} + +void redisOpArrayFree(redisOpArray *oa) { + while(oa->numops) { + int j; + redisOp *op; + + oa->numops--; + op = oa->ops+oa->numops; + for (j = 0; j < op->argc; j++) + decrRefCount(op->argv[j]); + zfree(op->argv); + } + zfree(oa->ops); +} + /* ====================== Commands lookup and execution ===================== */ struct 
redisCommand *lookupCommand(sds name) { @@ -1175,10 +1215,42 @@ struct redisCommand *lookupCommandByCString(char *s) { return cmd; } +/* Propagate the specified command (in the context of the specified database id) + * to the AOF and to Slaves (MONITOR clients are not fed here; call() feeds them itself). + * + * flags are a bitwise OR of: + * + REDIS_PROPAGATE_NONE (no propagation of command at all) + * + REDIS_PROPAGATE_AOF (propagate into the AOF file if it is enabled, i.e. server.aof_state != REDIS_AOF_OFF) + * + REDIS_PROPAGATE_REPL (propagate into the replication link, only when server.slaves is not empty) + */ +void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, + int flags) +{ + if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF) + feedAppendOnlyFile(cmd,dbid,argv,argc); + if (flags & REDIS_PROPAGATE_REPL && listLength(server.slaves)) + replicationFeedSlaves(server.slaves,dbid,argv,argc); +} + +/* Used inside commands to schedule the propagation of additional commands + * after the current command is propagated to AOF / Replication. The operation is queued on server.also_propagate without copying argv; call() later hands `target` to propagate() as its flags and then releases every argv element and the argv array through redisOpArrayFree(). */ +void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, + int target) +{ + redisOpArrayAppend(&server.also_propagate,cmd,dbid,argv,argc,target); +} + /* Call() is the core of Redis execution of a command */ void call(redisClient *c, int flags) { long long dirty, start = ustime(), duration; + /* Send the command to clients in MONITOR mode, only if the commands are + * not generated from reading an AOF. */ + if (listLength(server.monitors) && !server.loading) + replicationFeedMonitors(server.monitors,c->db->id,c->argv,c->argc); + + /* Call the command. */ + redisOpArrayInit(&server.also_propagate); dirty = server.dirty; c->cmd->proc(c); dirty = server.dirty-dirty; @@ -1189,20 +1261,37 @@ void call(redisClient *c, int flags) { if (server.loading && c->flags & REDIS_LUA_CLIENT) flags &= ~(REDIS_CALL_SLOWLOG | REDIS_CALL_STATS); + /* Log the command into the Slow log if needed, and populate the + * per-command statistics that we show in INFO commandstats. 
*/ if (flags & REDIS_CALL_SLOWLOG) slowlogPushEntryIfNeeded(c->argv,c->argc,duration); if (flags & REDIS_CALL_STATS) { c->cmd->microseconds += duration; c->cmd->calls++; } + + /* Propagate the command into the AOF and replication link. NOTE(review): the `flags` declared inside the block below shadows call()'s `flags` parameter; it holds REDIS_PROPAGATE_* bits, not REDIS_CALL_* bits. */ if (flags & REDIS_CALL_PROPAGATE) { - if (server.aof_state != REDIS_AOF_OFF && dirty > 0) - feedAppendOnlyFile(c->cmd,c->db->id,c->argv,c->argc); - if ((dirty > 0 || c->cmd->flags & REDIS_CMD_FORCE_REPLICATION) && - listLength(server.slaves)) - replicationFeedSlaves(server.slaves,c->db->id,c->argv,c->argc); - if (listLength(server.monitors)) - replicationFeedMonitors(server.monitors,c->db->id,c->argv,c->argc); + int flags = REDIS_PROPAGATE_NONE; + + if (c->cmd->flags & REDIS_CMD_FORCE_REPLICATION) + flags |= REDIS_PROPAGATE_REPL; + if (dirty) + flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF); + if (flags != REDIS_PROPAGATE_NONE) + propagate(c->cmd,c->db->id,c->argv,c->argc,flags); + } + /* Commands such as LPUSH or BRPOPLPUSH may propagate an additional + * PUSH command. NOTE(review): this block sits outside the REDIS_CALL_PROPAGATE check above, so queued operations are propagated (and then freed) even when that call flag is not set; confirm this is intended. */ + if (server.also_propagate.numops) { + int j; + redisOp *rop; + + for (j = 0; j < server.also_propagate.numops; j++) { + rop = &server.also_propagate.ops[j]; + propagate(rop->cmd, rop->dbid, rop->argv, rop->argc, rop->target); + } + redisOpArrayFree(&server.also_propagate); } server.stat_numcommands++; } @@ -1802,8 +1891,8 @@ int freeMemoryIfNeeded(void) { size_t mem_used, mem_tofree, mem_freed; int slaves = listLength(server.slaves); - /* Remove the size of slaves output buffers from the count of used - * memory. */ + /* Remove the size of slaves output buffers and AOF buffer from the + * count of used memory. The AOF part is sdslen(server.aof_buf) plus sdslen(server.aof_rewrite_buf), subtracted only when server.aof_state != REDIS_AOF_OFF. */ mem_used = zmalloc_used_memory(); if (slaves) { listIter li; @@ -1819,6 +1908,10 @@ int freeMemoryIfNeeded(void) { mem_used -= obuf_bytes; } } + if (server.aof_state != REDIS_AOF_OFF) { + mem_used -= sdslen(server.aof_buf); + mem_used -= sdslen(server.aof_rewrite_buf); + } /* Check if we are over the memory limit. 
*/ if (mem_used <= server.maxmemory) return REDIS_OK; @@ -1828,7 +1921,6 @@ int freeMemoryIfNeeded(void) { /* Compute how much memory we need to free. */ mem_tofree = mem_used - server.maxmemory; - printf("USED: %zu, TOFREE: %zu\n", mem_used, mem_tofree); mem_freed = 0; while (mem_freed < mem_tofree) { int j, k, keys_freed = 0; @@ -1919,7 +2011,6 @@ int freeMemoryIfNeeded(void) { delta = (long long) zmalloc_used_memory(); dbDelete(db,keyobj); delta -= (long long) zmalloc_used_memory(); - // printf("%lld\n",delta); mem_freed += delta; server.stat_evictedkeys++; decrRefCount(keyobj); @@ -1929,27 +2020,7 @@ int freeMemoryIfNeeded(void) { * start spending so much time here that is impossible to * deliver data to the slaves fast enough, so we force the * transmission here inside the loop. */ - if (slaves) { - listIter li; - listNode *ln; - - listRewind(server.slaves,&li); - while((ln = listNext(&li))) { - redisClient *slave = listNodeValue(ln); - int events; - - events = aeGetFileEvents(server.el,slave->fd); - printf("EVENTS: %d\n", events); - if (events & AE_WRITABLE && - slave->replstate == REDIS_REPL_ONLINE && - listLength(slave->reply)) - { - printf("SLAVE %d -> %d\n", - slave->fd, (int) listLength(slave->reply)); - sendReplyToClient(server.el,slave->fd,slave,0); - } - } - } + if (slaves) flushSlavesOutputBuffers(); } } if (!keys_freed) return REDIS_ERR; /* nothing to free... */