]> git.saurik.com Git - redis.git/commitdiff
Better system for additional commands replication.
authorantirez <antirez@gmail.com>
Tue, 28 Feb 2012 23:46:50 +0000 (00:46 +0100)
committerantirez <antirez@gmail.com>
Tue, 28 Feb 2012 23:46:50 +0000 (00:46 +0100)
The new code uses a more generic data structure to describe redis operations.
The new design allows for multiple alsoPropagate() calls within the scope of a
single command, that is useful in different contexts. For instance there
when there are multiple clients doing BRPOPLPUSH against the same list,
and a variadic LPUSH is performed against this list, the blocked clients
will both be served, and we should correctly replicate multiple LPUSH
commands after the replication of the current command.

src/redis.c
src/redis.h

index 2eed47030b6e99ede8ebc31a106e7e46f7e1e792..78067d3106343484cedcd557fc8dfc3e9ff0637f 100644 (file)
@@ -1161,6 +1161,43 @@ void resetCommandTableStats(void) {
     }
 }
 
+/* ========================== Redis OP Array API ============================ */
+
+void redisOpArrayInit(redisOpArray *oa) {
+    oa->ops = NULL;
+    oa->numops = 0;
+}
+
+int redisOpArrayAppend(redisOpArray *oa, struct redisCommand *cmd, int dbid,
+                       robj **argv, int argc, int target)
+{
+    redisOp *op;
+
+    oa->ops = zrealloc(oa->ops,sizeof(redisOp)*(oa->numops+1));
+    op = oa->ops+oa->numops;
+    op->cmd = cmd;
+    op->dbid = dbid;
+    op->argv = argv;
+    op->argc = argc;
+    op->target = target;
+    oa->numops++;
+    return oa->numops;
+}
+
+void redisOpArrayFree(redisOpArray *oa) {
+    while(oa->numops) {
+        int j;
+        redisOp *op;
+
+        oa->numops--;
+        op = oa->ops+oa->numops;
+        for (j = 0; j < op->argc; j++)
+            decrRefCount(op->argv[j]);
+        zfree(op->argv);
+    }
+    zfree(oa->ops);
+}
+
 /* ====================== Commands lookup and execution ===================== */
 
 struct redisCommand *lookupCommand(sds name) {
@@ -1193,18 +1230,12 @@ void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
         replicationFeedSlaves(server.slaves,dbid,argv,argc);
 }
 
-/* Used inside commands to propatate an additional command if needed. */
+/* Used inside commands to schedule the propagation of additional commands
+ * after the current command is propagated to AOF / Replication. */
 void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
                    int target)
 {
-    propagatedItem *pi = &server.also_propagate;
-
-    redisAssert(pi->target == REDIS_PROPAGATE_NONE);
-    pi->cmd = cmd;
-    pi->dbid = dbid;
-    pi->argv = argv;
-    pi->argc = argc;
-    pi->target = target;
+    redisOpArrayAppend(&server.also_propagate,cmd,dbid,argv,argc,target);
 }
 
 /* Call() is the core of Redis execution of a command */
@@ -1217,7 +1248,7 @@ void call(redisClient *c, int flags) {
         replicationFeedMonitors(server.monitors,c->db->id,c->argv,c->argc);
 
     /* Call the command. */
-    server.also_propagate.target = REDIS_PROPAGATE_NONE;
+    redisOpArrayInit(&server.also_propagate);
     dirty = server.dirty;
     c->cmd->proc(c);
     dirty = server.dirty-dirty;
@@ -1250,13 +1281,15 @@ void call(redisClient *c, int flags) {
     }
     /* Commands such as LPUSH or BRPOPLPUSH may propagate an additional
      * PUSH command. */
-    if (server.also_propagate.target != REDIS_PROPAGATE_NONE) {
+    if (server.also_propagate.numops) {
         int j;
-        propagatedItem *pi = &server.also_propagate;
+        redisOp *rop;
 
-        propagate(pi->cmd, pi->dbid, pi->argv, pi->argc, pi->target);
-        for (j = 0; j < pi->argc; j++) decrRefCount(pi->argv[j]);
-        zfree(pi->argv);
+        for (j = 0; j < server.also_propagate.numops; j++) {
+            rop = &server.also_propagate.ops[j];
+            propagate(rop->cmd, rop->dbid, rop->argv, rop->argc, rop->target);
+        }
+        redisOpArrayFree(&server.also_propagate);
     }
     server.stat_numcommands++;
 }
index 09a746cc55b6cd924f5d1a39c0e1796c52a9813b..c26178702cdb04fa041cfb4b773e66fc7fa12478 100644 (file)
@@ -399,16 +399,29 @@ typedef struct clientBufferLimitsConfig {
     time_t soft_limit_seconds;
 } clientBufferLimitsConfig;
 
-/* Currently only used to additionally propagate more commands to AOF/Replication
- * after the propagation of the executed command.
- * The structure contains everything needed to propagate a command:
- * argv and argc, the ID of the database, pointer to the command table entry,
- * and finally the target, that is an xor between REDIS_PROPAGATE_* flags. */
-typedef struct propagatedItem {
+/* The redisOp structure defines a Redis Operation, that is an instance of
+ * a command with an argument vector, database ID, propagation target
+ * (REDIS_PROPAGATE_*), and command pointer.
+ *
+ * Currently only used to additionally propagate more commands to AOF/Replication
+ * after the propagation of the executed command. */
+typedef struct redisOp {
     robj **argv;
     int argc, dbid, target;
     struct redisCommand *cmd;
-} propagatedItem;
+} redisOp;
+
+/* Defines an array of Redis operations. There is an API to add to this
+ * structure in a easy way.
+ *
+ * redisOpArrayInit();
+ * redisOpArrayAppend();
+ * redisOpArrayFree();
+ */
+typedef struct redisOpArray {
+    redisOp *ops;
+    int numops;
+} redisOpArray;
 
 /*-----------------------------------------------------------------------------
  * Redis cluster data structures
@@ -624,7 +637,7 @@ struct redisServer {
     char *rdb_filename;             /* Name of RDB file */
     int rdb_compression;            /* Use compression in RDB? */
     /* Propagation of commands in AOF / replication */
-    propagatedItem also_propagate;  /* Additional command to propagate. */
+    redisOpArray also_propagate;    /* Additional command to propagate. */
     /* Logging */
     char *logfile;                  /* Path of log file */
     int syslog_enabled;             /* Is syslog enabled? */