]> git.saurik.com Git - redis.git/commitdiff
Fix for bug 561 and other related problems
authorantirez <antirez@gmail.com>
Mon, 20 Jun 2011 15:07:18 +0000 (17:07 +0200)
committerantirez <antirez@gmail.com>
Mon, 20 Jun 2011 15:19:36 +0000 (17:19 +0200)
src/networking.c
src/redis.h
src/t_list.c
src/t_set.c
tests/integration/replication.tcl
tests/unit/type/list.tcl

index c7b56ca4cc8e8c90bfe143b847825a632d7dc32c..8f2e6d8f49b7596d725c865c374435cbadd5a500 100644 (file)
@@ -947,3 +947,28 @@ void clientCommand(redisClient *c) {
         addReplyError(c, "Syntax error, try CLIENT (LIST | KILL ip:port)");
     }
 }
+
+void rewriteClientCommandVector(redisClient *c, int argc, ...) {
+    va_list ap;
+    int j;
+    robj **argv; /* The new argument vector */
+
+    argv = zmalloc(sizeof(robj*)*argc);
+    va_start(ap,argc);
+    for (j = 0; j < argc; j++) {
+        robj *a;
+        
+        a = va_arg(ap, robj*);
+        argv[j] = a;
+        incrRefCount(a);
+    }
+    /* We free the objects in the original vector at the end, so we are
+     * sure that if the same objects are reused in the new vector the
+     * refcount gets incremented before it gets decremented. */
+    for (j = 0; j < c->argc; j++) decrRefCount(c->argv[j]);
+    zfree(c->argv);
+    /* Replace argv and argc with our new versions. */
+    c->argv = argv;
+    c->argc = argc;
+    va_end(ap);
+}
index 27131e5d9d5d65291c396389a4251cd0e0502a6a..cab7607e59c0f8d51b87fb267f0e82b0a7b48158 100644 (file)
@@ -819,6 +819,7 @@ void addReplyMultiBulkLen(redisClient *c, long length);
 void *dupClientReplyValue(void *o);
 void getClientsMaxBuffers(unsigned long *longest_output_list,
                           unsigned long *biggest_input_buffer);
+void rewriteClientCommandVector(redisClient *c, int argc, ...);
 
 #ifdef __GNUC__
 void addReplyErrorFormat(redisClient *c, const char *fmt, ...)
index 7c1b848a814b3a9cc08ceb61e615d3eb5e167744..d88277261650230c17eb510c82b8fc7e2e33ca71 100644 (file)
@@ -640,7 +640,9 @@ void lremCommand(redisClient *c) {
  * as well. This command was originally proposed by Ezra Zygmuntowicz.
  */
 
-void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
+void rpoplpushHandlePush(redisClient *origclient, redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
+    robj *aux;
+
     if (!handleClientsWaitingListPush(c,dstkey,value)) {
         /* Create the list if the key does not exist */
         if (!dstobj) {
@@ -648,9 +650,25 @@ void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value
             dbAdd(c->db,dstkey,dstobj);
         } else {
             signalModifiedKey(c->db,dstkey);
-            server.dirty++;
         }
         listTypePush(dstobj,value,REDIS_HEAD);
+        /* If we are pushing as a result of LPUSH against a key
+         * watched by BLPOPLPUSH, we need to rewrite the command vector.
+         * But if this is called directly by RPOPLPUSH (either directly
+         * or via a BRPOPLPUSH where the popped list exists)
+         * we should replicate the BRPOPLPUSH command itself. */
+        if (c != origclient) {
+            aux = createStringObject("LPUSH",5);
+            rewriteClientCommandVector(origclient,3,aux,dstkey,value);
+            decrRefCount(aux);
+        } else {
+            /* Make sure to always use RPOPLPUSH in the replication / AOF,
+             * even if the original command was BRPOPLPUSH. */
+            aux = createStringObject("RPOPLPUSH",9);
+            rewriteClientCommandVector(origclient,3,aux,c->argv[1],c->argv[2]);
+            decrRefCount(aux);
+        }
+        server.dirty++;
     }
 
     /* Always send the pushed value to the client. */
@@ -666,16 +684,22 @@ void rpoplpushCommand(redisClient *c) {
         addReply(c,shared.nullbulk);
     } else {
         robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
+        robj *touchedkey = c->argv[1];
+
         if (dobj && checkType(c,dobj,REDIS_LIST)) return;
         value = listTypePop(sobj,REDIS_TAIL);
-        rpoplpushHandlePush(c,c->argv[2],dobj,value);
+        /* We saved touched key, and protect it, since rpoplpushHandlePush
+         * may change the client command argument vector. */
+        incrRefCount(touchedkey);
+        rpoplpushHandlePush(c,c,c->argv[2],dobj,value);
 
         /* listTypePop returns an object with its refcount incremented */
         decrRefCount(value);
 
         /* Delete the source list when it is empty */
-        if (listTypeLength(sobj) == 0) dbDelete(c->db,c->argv[1]);
-        signalModifiedKey(c->db,c->argv[1]);
+        if (listTypeLength(sobj) == 0) dbDelete(c->db,touchedkey);
+        signalModifiedKey(c->db,touchedkey);
+        decrRefCount(touchedkey);
         server.dirty++;
     }
 }
@@ -777,6 +801,7 @@ void unblockClientWaitingData(redisClient *c) {
     /* Cleanup the client structure */
     zfree(c->bpop.keys);
     c->bpop.keys = NULL;
+    if (c->bpop.target) decrRefCount(c->bpop.target);
     c->bpop.target = NULL;
     c->flags &= ~REDIS_BLOCKED;
     c->flags |= REDIS_UNBLOCKED;
@@ -820,6 +845,10 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
         receiver = ln->value;
         dstkey = receiver->bpop.target;
 
+        /* Protect receiver->bpop.target, that will be freed by
+         * the next unblockClientWaitingData() call. */
+        if (dstkey) incrRefCount(dstkey);
+
         /* This should remove the first element of the "clients" list. */
         unblockClientWaitingData(receiver);
 
@@ -828,17 +857,16 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
             addReplyMultiBulkLen(receiver,2);
             addReplyBulk(receiver,key);
             addReplyBulk(receiver,ele);
-            return 1;
+            return 1; /* Serve just the first client as in B[RL]POP semantics */
         } else {
             /* BRPOPLPUSH, note that receiver->db is always equal to c->db. */
             dstobj = lookupKeyWrite(receiver->db,dstkey);
-            if (dstobj && checkType(receiver,dstobj,REDIS_LIST)) {
-                decrRefCount(dstkey);
-            } else {
-                rpoplpushHandlePush(receiver,dstkey,dstobj,ele);
+            if (!(dstobj && checkType(receiver,dstobj,REDIS_LIST))) {
+                rpoplpushHandlePush(c,receiver,dstkey,dstobj,ele);
                 decrRefCount(dstkey);
                 return 1;
             }
+            decrRefCount(dstkey);
         }
     }
 
index c7d05c2f1f8f82c68f10b54596bae8816481da33..bffea3d4d6d256dcf25b7c794b10a651389a5606 100644 (file)
@@ -332,7 +332,7 @@ void scardCommand(redisClient *c) {
 }
 
 void spopCommand(redisClient *c) {
-    robj *set, *ele;
+    robj *set, *ele, *aux;
     int64_t llele;
     int encoding;
 
@@ -348,16 +348,11 @@ void spopCommand(redisClient *c) {
         setTypeRemove(set,ele);
     }
 
-    /* Change argv to replicate as SREM */
-    c->argc = 3;
-    c->argv = zrealloc(c->argv,sizeof(robj*)*(c->argc));
-
-    /* Overwrite SREM with SPOP (same length) */
-    redisAssert(sdslen(c->argv[0]->ptr) == 4);
-    memcpy(c->argv[0]->ptr, "SREM", 4);
-
-    /* Popped element already has incremented refcount */
-    c->argv[2] = ele;
+    /* Replicate/AOF this command as an SREM operation */
+    aux = createStringObject("SREM",4);
+    rewriteClientCommandVector(c,3,aux,c->argv[1],ele);
+    decrRefCount(ele);
+    decrRefCount(aux);
 
     addReplyBulk(c,ele);
     if (setTypeSize(set) == 0) dbDelete(c->db,c->argv[1]);
index 892fae03b662f6c585e755223912c21bdfab2037..227356b24de6399f928e305058f942728d46e2e6 100644 (file)
@@ -6,6 +6,24 @@ start_server {tags {"repl"}} {
             s -1 role
         } {slave}
 
+        test {BRPOPLPUSH replication, when blocking against empty list} {
+            set rd [redis_deferring_client]
+            $rd brpoplpush a b 5
+            r lpush a foo
+            after 1000
+            assert_equal [r debug digest] [r -1 debug digest]
+        }
+
+        test {BRPOPLPUSH replication, list exists} {
+            set rd [redis_deferring_client]
+            r lpush c 1
+            r lpush c 2
+            r lpush c 3
+            $rd brpoplpush c d 5
+            after 1000
+            assert_equal [r debug digest] [r -1 debug digest]
+        }
+
         test {MASTER and SLAVE dataset should be identical after complex ops} {
             createComplexDataset r 10000
             after 500
index b6055b2660916e199f378679ed7094b910e5b529..616abd21a04972a1fbf5d673888bb312daa5d8dc 100644 (file)
@@ -278,6 +278,14 @@ start_server {
         r exec
     } {foo bar {} {} {bar foo}}
 
+    test {BRPOPLPUSH timeout} {
+      set rd [redis_deferring_client]
+
+      $rd brpoplpush foo_list bar_list 1
+      after 2000
+      $rd read
+    } {}
+
     foreach {pop} {BLPOP BRPOP} {
         test "$pop: with single empty list argument" {
             set rd [redis_deferring_client]