]> git.saurik.com Git - redis.git/commitdiff
Added a new API to replicate an additional command after the replication of the curre...
authorantirez <antirez@gmail.com>
Tue, 28 Feb 2012 17:03:08 +0000 (18:03 +0100)
committerantirez <antirez@gmail.com>
Tue, 28 Feb 2012 17:03:08 +0000 (18:03 +0100)
src/redis.c
src/redis.h
src/t_list.c

index 647c586736e3961796ddc4cc3fdb51c6672764ac..2eed47030b6e99ede8ebc31a106e7e46f7e1e792 100644 (file)
@@ -962,6 +962,7 @@ void initServerConfig() {
     populateCommandTable();
     server.delCommand = lookupCommandByCString("del");
     server.multiCommand = lookupCommandByCString("multi");
+    server.lpushCommand = lookupCommandByCString("lpush");
     
     /* Slow log */
     server.slowlog_log_slower_than = REDIS_SLOWLOG_LOG_SLOWER_THAN;
@@ -1192,6 +1193,20 @@ void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
         replicationFeedSlaves(server.slaves,dbid,argv,argc);
 }
 
+/* Used inside commands to propagate an additional command if needed. */
+void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
+                   int target)
+{
+    propagatedItem *pi = &server.also_propagate;
+
+    redisAssert(pi->target == REDIS_PROPAGATE_NONE);
+    pi->cmd = cmd;
+    pi->dbid = dbid;
+    pi->argv = argv;
+    pi->argc = argc;
+    pi->target = target;
+}
+
 /* Call() is the core of Redis execution of a command */
 void call(redisClient *c, int flags) {
     long long dirty, start = ustime(), duration;
@@ -1202,6 +1217,7 @@ void call(redisClient *c, int flags) {
         replicationFeedMonitors(server.monitors,c->db->id,c->argv,c->argc);
 
     /* Call the command. */
+    server.also_propagate.target = REDIS_PROPAGATE_NONE;
     dirty = server.dirty;
     c->cmd->proc(c);
     dirty = server.dirty-dirty;
@@ -1232,6 +1248,16 @@ void call(redisClient *c, int flags) {
         if (flags != REDIS_PROPAGATE_NONE)
             propagate(c->cmd,c->db->id,c->argv,c->argc,flags);
     }
+    /* Commands such as LPUSH or BRPOPLPUSH may propagate an additional
+     * PUSH command. */
+    if (server.also_propagate.target != REDIS_PROPAGATE_NONE) {
+        int j;
+        propagatedItem *pi = &server.also_propagate;
+
+        propagate(pi->cmd, pi->dbid, pi->argv, pi->argc, pi->target);
+        for (j = 0; j < pi->argc; j++) decrRefCount(pi->argv[j]);
+        zfree(pi->argv);
+    }
     server.stat_numcommands++;
 }
 
index f1e407b12798fe12b7e4b30983afa3e51b403ee1..09a746cc55b6cd924f5d1a39c0e1796c52a9813b 100644 (file)
@@ -399,6 +399,17 @@ typedef struct clientBufferLimitsConfig {
     time_t soft_limit_seconds;
 } clientBufferLimitsConfig;
 
+/* Currently only used to additionally propagate more commands to AOF/Replication
+ * after the propagation of the executed command.
+ * The structure contains everything needed to propagate a command:
+ * argv and argc, the ID of the database, pointer to the command table entry,
+ * and finally the target, that is a bitwise OR of REDIS_PROPAGATE_* flags. */
+typedef struct propagatedItem {
+    robj **argv;
+    int argc, dbid, target;
+    struct redisCommand *cmd;
+} propagatedItem;
+
 /*-----------------------------------------------------------------------------
  * Redis cluster data structures
  *----------------------------------------------------------------------------*/
@@ -562,7 +573,7 @@ struct redisServer {
     off_t loading_loaded_bytes;
     time_t loading_start_time;
     /* Fast pointers to often looked up command */
-    struct redisCommand *delCommand, *multiCommand;
+    struct redisCommand *delCommand, *multiCommand, *lpushCommand;
     int cronloops;                  /* Number of times the cron function run */
     time_t lastsave;                /* Unix time of last save succeeede */
     /* Fields used only for stats */
@@ -612,6 +623,8 @@ struct redisServer {
     int saveparamslen;              /* Number of saving points */
     char *rdb_filename;             /* Name of RDB file */
     int rdb_compression;            /* Use compression in RDB? */
+    /* Propagation of commands in AOF / replication */
+    propagatedItem also_propagate;  /* Additional command to propagate. */
     /* Logging */
     char *logfile;                  /* Path of log file */
     int syslog_enabled;             /* Is syslog enabled? */
@@ -956,6 +969,7 @@ struct redisCommand *lookupCommand(sds name);
 struct redisCommand *lookupCommandByCString(char *s);
 void call(redisClient *c, int flags);
 void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags);
+void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int target);
 int prepareForShutdown();
 void redisLog(int level, const char *fmt, ...);
 void redisLogRaw(int level, const char *msg);
index f0c6790e3b5e10048a6d3aff173133726725db76..636c556c23721b4c61446467d3496283be96b40e 100644 (file)
@@ -288,6 +288,18 @@ void pushGenericCommand(redisClient *c, int where) {
     addReplyLongLong(c, waiting + (lobj ? listTypeLength(lobj) : 0));
     if (pushed) signalModifiedKey(c->db,c->argv[1]);
     server.dirty += pushed;
+
+    /* Alter the replication of the command according to the number of
+     * list elements delivered to clients waiting in a blocking operation.
+     * We do that only if there were waiting clients, and only if at least one
+     * element was still pushed into the list (otherwise dirty is 0 and nothing
+     * will be propagated). */
+    if (waiting && pushed) {
+        /* CMD KEY a b C D E */
+        for (j = 2; j < pushed+2; j++)
+            rewriteClientCommandArgument(c,j,c->argv[j+waiting]);
+        c->argc -= waiting;
+    }
 }
 
 void lpushCommand(redisClient *c) {
@@ -655,8 +667,6 @@ void lremCommand(redisClient *c) {
  */
 
 void rpoplpushHandlePush(redisClient *origclient, redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
-    robj *aux;
-
     if (!handleClientsWaitingListPush(origclient,dstkey,value)) {
         /* Create the list if the key does not exist */
         if (!dstobj) {
@@ -666,27 +676,19 @@ void rpoplpushHandlePush(redisClient *origclient, redisClient *c, robj *dstkey,
             signalModifiedKey(c->db,dstkey);
         }
         listTypePush(dstobj,value,REDIS_HEAD);
-        /* If we are pushing as a result of LPUSH against a key
-         * watched by BRPOPLPUSH, we need to rewrite the command vector
-         * as an LPUSH.
-         *
-         * If this is called directly by RPOPLPUSH (either directly
-         * or via a BRPOPLPUSH where the popped list exists)
-         * we should replicate the RPOPLPUSH command itself. */
-        if (c != origclient) {
-            aux = createStringObject("LPUSH",5);
-            rewriteClientCommandVector(origclient,3,aux,dstkey,value);
-            decrRefCount(aux);
-        } else {
-            /* Make sure to always use RPOPLPUSH in the replication / AOF,
-             * even if the original command was BRPOPLPUSH. */
-            aux = createStringObject("RPOPLPUSH",9);
-            rewriteClientCommandVector(origclient,3,aux,c->argv[1],c->argv[2]);
-            decrRefCount(aux);
+        /* Additionally propagate this PUSH operation together with
+         * the operation performed by the command. */
+        {
+            robj **argv = zmalloc(sizeof(robj*)*3);
+            argv[0] = createStringObject("LPUSH",5);
+            argv[1] = dstkey;
+            argv[2] = value;
+            incrRefCount(argv[1]);
+            incrRefCount(argv[2]);
+            alsoPropagate(server.lpushCommand,c->db->id,argv,3,
+                          REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL);
         }
-        server.dirty++;
     }
-
     /* Always send the pushed value to the client. */
     addReplyBulk(c,value);
 }
@@ -717,6 +719,13 @@ void rpoplpushCommand(redisClient *c) {
         signalModifiedKey(c->db,touchedkey);
         decrRefCount(touchedkey);
         server.dirty++;
+
+        /* Replicate this as a simple RPOP since the LPUSH side is replicated
+         * by the rpoplpushHandlePush() call if needed (it may not be needed
+         * if a client is blocked waiting for a push against the list). */
+        rewriteClientCommandVector(c,2,
+            resetRefCount(createStringObject("RPOP",4)),
+            c->argv[1]);
     }
 }