git.saurik.com Git - redis.git/blobdiff - src/t_list.c
Fix for bug 561 and other related problems
[redis.git] / src / t_list.c
index 9e6590929a64e450f386e6550c66b3b6b33d3bc5..d88277261650230c17eb510c82b8fc7e2e33ca71 100644 (file)
@@ -259,30 +259,35 @@ void listTypeConvert(robj *subject, int enc) {
  *----------------------------------------------------------------------------*/
 
 void pushGenericCommand(redisClient *c, int where) {
+    int j, addlen = 0, pushed = 0;
     robj *lobj = lookupKeyWrite(c->db,c->argv[1]);
-    c->argv[2] = tryObjectEncoding(c->argv[2]);
-    if (lobj == NULL) {
-        if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) {
-            addReply(c,shared.cone);
-            return;
-        }
-        lobj = createZiplistObject();
-        dbAdd(c->db,c->argv[1],lobj);
-    } else {
-        if (lobj->type != REDIS_LIST) {
-            addReply(c,shared.wrongtypeerr);
-            return;
+    int may_have_waiting_clients = (lobj == NULL);
+
+    if (lobj && lobj->type != REDIS_LIST) {
+        addReply(c,shared.wrongtypeerr);
+        return;
+    }
+
+    for (j = 2; j < c->argc; j++) {
+        c->argv[j] = tryObjectEncoding(c->argv[j]);
+        if (may_have_waiting_clients) {
+            if (handleClientsWaitingListPush(c,c->argv[1],c->argv[j])) {
+                addlen++;
+                continue;
+            } else {
+                may_have_waiting_clients = 0;
+            }
         }
-        if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) {
-            signalModifiedKey(c->db,c->argv[1]);
-            addReply(c,shared.cone);
-            return;
+        if (!lobj) {
+            lobj = createZiplistObject();
+            dbAdd(c->db,c->argv[1],lobj);
         }
+        listTypePush(lobj,c->argv[j],where);
+        pushed++;
     }
-    listTypePush(lobj,c->argv[2],where);
-    addReplyLongLong(c,listTypeLength(lobj));
-    signalModifiedKey(c->db,c->argv[1]);
-    server.dirty++;
+    addReplyLongLong(c,addlen + (lobj ? listTypeLength(lobj) : 0));
+    if (pushed) signalModifiedKey(c->db,c->argv[1]);
+    server.dirty += pushed;
 }
 
 void lpushCommand(redisClient *c) {
@@ -635,7 +640,9 @@ void lremCommand(redisClient *c) {
  * as well. This command was originally proposed by Ezra Zygmuntowicz.
  */
 
-void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
+void rpoplpushHandlePush(redisClient *origclient, redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
+    robj *aux;
+
     if (!handleClientsWaitingListPush(c,dstkey,value)) {
         /* Create the list if the key does not exist */
         if (!dstobj) {
@@ -643,9 +650,25 @@ void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value
             dbAdd(c->db,dstkey,dstobj);
         } else {
             signalModifiedKey(c->db,dstkey);
-            server.dirty++;
         }
         listTypePush(dstobj,value,REDIS_HEAD);
+        /* If we are pushing as a result of LPUSH against a key
+         * watched by BRPOPLPUSH, we need to rewrite the command vector
+         * as an LPUSH. But if this is called by RPOPLPUSH (either directly
+         * or via a BRPOPLPUSH where the popped list exists)
+         * we should replicate the RPOPLPUSH command itself. */
+        if (c != origclient) {
+            aux = createStringObject("LPUSH",5);
+            rewriteClientCommandVector(origclient,3,aux,dstkey,value);
+            decrRefCount(aux);
+        } else {
+            /* Make sure to always use RPOPLPUSH in the replication / AOF,
+             * even if the original command was BRPOPLPUSH. */
+            aux = createStringObject("RPOPLPUSH",9);
+            rewriteClientCommandVector(origclient,3,aux,c->argv[1],c->argv[2]);
+            decrRefCount(aux);
+        }
+        server.dirty++;
     }
 
     /* Always send the pushed value to the client. */
@@ -661,16 +684,22 @@ void rpoplpushCommand(redisClient *c) {
         addReply(c,shared.nullbulk);
     } else {
         robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
+        robj *touchedkey = c->argv[1];
+
         if (dobj && checkType(c,dobj,REDIS_LIST)) return;
         value = listTypePop(sobj,REDIS_TAIL);
-        rpoplpushHandlePush(c,c->argv[2],dobj,value);
+        /* Save the touched key and take a reference to it, since
+         * rpoplpushHandlePush may change the client command argument vector. */
+        incrRefCount(touchedkey);
+        rpoplpushHandlePush(c,c,c->argv[2],dobj,value);
 
         /* listTypePop returns an object with its refcount incremented */
         decrRefCount(value);
 
         /* Delete the source list when it is empty */
-        if (listTypeLength(sobj) == 0) dbDelete(c->db,c->argv[1]);
-        signalModifiedKey(c->db,c->argv[1]);
+        if (listTypeLength(sobj) == 0) dbDelete(c->db,touchedkey);
+        signalModifiedKey(c->db,touchedkey);
+        decrRefCount(touchedkey);
         server.dirty++;
     }
 }
@@ -772,6 +801,7 @@ void unblockClientWaitingData(redisClient *c) {
     /* Cleanup the client structure */
     zfree(c->bpop.keys);
     c->bpop.keys = NULL;
+    if (c->bpop.target) decrRefCount(c->bpop.target);
     c->bpop.target = NULL;
     c->flags &= ~REDIS_BLOCKED;
     c->flags |= REDIS_UNBLOCKED;
@@ -815,26 +845,28 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
         receiver = ln->value;
         dstkey = receiver->bpop.target;
 
+        /* Protect receiver->bpop.target, which will be released by
+         * the unblockClientWaitingData() call below. */
+        if (dstkey) incrRefCount(dstkey);
+
         /* This should remove the first element of the "clients" list. */
         unblockClientWaitingData(receiver);
-        redisAssert(ln != listFirst(clients));
 
         if (dstkey == NULL) {
             /* BRPOP/BLPOP */
             addReplyMultiBulkLen(receiver,2);
             addReplyBulk(receiver,key);
             addReplyBulk(receiver,ele);
-            return 1;
+            return 1; /* Serve just the first client as in B[RL]POP semantics */
         } else {
             /* BRPOPLPUSH, note that receiver->db is always equal to c->db. */
             dstobj = lookupKeyWrite(receiver->db,dstkey);
-            if (dstobj && checkType(receiver,dstobj,REDIS_LIST)) {
-                decrRefCount(dstkey);
-            } else {
-                rpoplpushHandlePush(receiver,dstkey,dstobj,ele);
+            if (!(dstobj && checkType(receiver,dstobj,REDIS_LIST))) {
+                rpoplpushHandlePush(c,receiver,dstkey,dstobj,ele);
                 decrRefCount(dstkey);
                 return 1;
             }
+            decrRefCount(dstkey);
         }
     }