From eeb34eff52eb77ff387ea7b316b157aa4337bb7f Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 28 Feb 2012 18:03:08 +0100 Subject: [PATCH] Added a new API to replicate an additional command after the replication of the currently executed command, in order to propagte the LPUSH originating from RPOPLPUSH and indirectly by BRPOPLPUSH. --- src/redis.c | 26 ++++++++++++++++++++++++++ src/redis.h | 16 +++++++++++++++- src/t_list.c | 51 ++++++++++++++++++++++++++++++--------------------- 3 files changed, 71 insertions(+), 22 deletions(-) diff --git a/src/redis.c b/src/redis.c index 647c5867..2eed4703 100644 --- a/src/redis.c +++ b/src/redis.c @@ -962,6 +962,7 @@ void initServerConfig() { populateCommandTable(); server.delCommand = lookupCommandByCString("del"); server.multiCommand = lookupCommandByCString("multi"); + server.lpushCommand = lookupCommandByCString("lpush"); /* Slow log */ server.slowlog_log_slower_than = REDIS_SLOWLOG_LOG_SLOWER_THAN; @@ -1192,6 +1193,20 @@ void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, replicationFeedSlaves(server.slaves,dbid,argv,argc); } +/* Used inside commands to propatate an additional command if needed. */ +void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, + int target) +{ + propagatedItem *pi = &server.also_propagate; + + redisAssert(pi->target == REDIS_PROPAGATE_NONE); + pi->cmd = cmd; + pi->dbid = dbid; + pi->argv = argv; + pi->argc = argc; + pi->target = target; +} + /* Call() is the core of Redis execution of a command */ void call(redisClient *c, int flags) { long long dirty, start = ustime(), duration; @@ -1202,6 +1217,7 @@ void call(redisClient *c, int flags) { replicationFeedMonitors(server.monitors,c->db->id,c->argv,c->argc); /* Call the command. */ + server.also_propagate.target = REDIS_PROPAGATE_NONE; dirty = server.dirty; c->cmd->proc(c); dirty = server.dirty-dirty; @@ -1232,6 +1248,16 @@ void call(redisClient *c, int flags) { if (flags != REDIS_PROPAGATE_NONE) propagate(c->cmd,c->db->id,c->argv,c->argc,flags); } + /* Commands such as LPUSH or BRPOPLPUSH may propagate an additional + * PUSH command. */ + if (server.also_propagate.target != REDIS_PROPAGATE_NONE) { + int j; + propagatedItem *pi = &server.also_propagate; + + propagate(pi->cmd, pi->dbid, pi->argv, pi->argc, pi->target); + for (j = 0; j < pi->argc; j++) decrRefCount(pi->argv[j]); + zfree(pi->argv); + } server.stat_numcommands++; } diff --git a/src/redis.h b/src/redis.h index f1e407b1..09a746cc 100644 --- a/src/redis.h +++ b/src/redis.h @@ -399,6 +399,17 @@ typedef struct clientBufferLimitsConfig { time_t soft_limit_seconds; } clientBufferLimitsConfig; +/* Currently only used to additionally propagate more commands to AOF/Replication + * after the propagation of the executed command. + * The structure contains everything needed to propagate a command: + * argv and argc, the ID of the database, pointer to the command table entry, + * and finally the target, that is an xor between REDIS_PROPAGATE_* flags. */ +typedef struct propagatedItem { + robj **argv; + int argc, dbid, target; + struct redisCommand *cmd; +} propagatedItem; + /*----------------------------------------------------------------------------- * Redis cluster data structures *----------------------------------------------------------------------------*/ @@ -562,7 +573,7 @@ struct redisServer { off_t loading_loaded_bytes; time_t loading_start_time; /* Fast pointers to often looked up command */ - struct redisCommand *delCommand, *multiCommand; + struct redisCommand *delCommand, *multiCommand, *lpushCommand; int cronloops; /* Number of times the cron function run */ time_t lastsave; /* Unix time of last save succeeede */ /* Fields used only for stats */ @@ -612,6 +623,8 @@ struct redisServer { int saveparamslen; /* Number of saving points */ char *rdb_filename; /* Name of RDB file */ int rdb_compression; /* Use compression in RDB? */ + /* Propagation of commands in AOF / replication */ + propagatedItem also_propagate; /* Additional command to propagate. */ /* Logging */ char *logfile; /* Path of log file */ int syslog_enabled; /* Is syslog enabled? */ @@ -956,6 +969,7 @@ struct redisCommand *lookupCommand(sds name); struct redisCommand *lookupCommandByCString(char *s); void call(redisClient *c, int flags); void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags); +void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int target); int prepareForShutdown(); void redisLog(int level, const char *fmt, ...); void redisLogRaw(int level, const char *msg); diff --git a/src/t_list.c b/src/t_list.c index f0c6790e..636c556c 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -288,6 +288,18 @@ void pushGenericCommand(redisClient *c, int where) { addReplyLongLong(c, waiting + (lobj ? listTypeLength(lobj) : 0)); if (pushed) signalModifiedKey(c->db,c->argv[1]); server.dirty += pushed; + + /* Alter the replication of the command accordingly to the number of + * list elements delivered to clients waiting into a blocking operation. + * We do that only if there were waiting clients, and only if still some + * element was pushed into the list (othewise dirty is 0 and nothign will + * be propagated). */ + if (waiting && pushed) { + /* CMD KEY a b C D E */ + for (j = 2; j < pushed+2; j++) + rewriteClientCommandArgument(c,j,c->argv[j+waiting]); + c->argc -= waiting; + } } void lpushCommand(redisClient *c) { @@ -655,8 +667,6 @@ void lremCommand(redisClient *c) { */ void rpoplpushHandlePush(redisClient *origclient, redisClient *c, robj *dstkey, robj *dstobj, robj *value) { - robj *aux; - if (!handleClientsWaitingListPush(origclient,dstkey,value)) { /* Create the list if the key does not exist */ if (!dstobj) { @@ -666,27 +676,19 @@ void rpoplpushHandlePush(redisClient *origclient, redisClient *c, robj *dstkey, signalModifiedKey(c->db,dstkey); } listTypePush(dstobj,value,REDIS_HEAD); - /* If we are pushing as a result of LPUSH against a key - * watched by BRPOPLPUSH, we need to rewrite the command vector - * as an LPUSH. - * - * If this is called directly by RPOPLPUSH (either directly - * or via a BRPOPLPUSH where the popped list exists) - * we should replicate the RPOPLPUSH command itself. */ - if (c != origclient) { - aux = createStringObject("LPUSH",5); - rewriteClientCommandVector(origclient,3,aux,dstkey,value); - decrRefCount(aux); - } else { - /* Make sure to always use RPOPLPUSH in the replication / AOF, - * even if the original command was BRPOPLPUSH. */ - aux = createStringObject("RPOPLPUSH",9); - rewriteClientCommandVector(origclient,3,aux,c->argv[1],c->argv[2]); - decrRefCount(aux); + /* Additionally propagate this PUSH operation together with + * the operation performed by the command. */ + { + robj **argv = zmalloc(sizeof(robj*)*3); + argv[0] = createStringObject("LPUSH",5); + argv[1] = dstkey; + argv[2] = value; + incrRefCount(argv[1]); + incrRefCount(argv[2]); + alsoPropagate(server.lpushCommand,c->db->id,argv,3, + REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL); } - server.dirty++; } - /* Always send the pushed value to the client. */ addReplyBulk(c,value); } @@ -717,6 +719,13 @@ void rpoplpushCommand(redisClient *c) { signalModifiedKey(c->db,touchedkey); decrRefCount(touchedkey); server.dirty++; + + /* Replicate this as a simple RPOP since the LPUSH side is replicated + * by rpoplpushHandlePush() call if needed (it may not be needed + * if a client is blocking wait a push against the list). */ + rewriteClientCommandVector(c,2, + resetRefCount(createStringObject("RPOP",4)), + c->argv[1]); } } -- 2.45.2