]> git.saurik.com Git - redis.git/blobdiff - src/t_list.c
client buffer handling refactoring and optimization
[redis.git] / src / t_list.c
index ceed70c31697f26d2ab02a2eb2b39993e9c46cdb..f856d6cde823decbc0d62bbc8e9131f66692c727 100644 (file)
@@ -198,7 +198,7 @@ void listTypeInsert(listTypeEntry *entry, robj *value, int where) {
 int listTypeEqual(listTypeEntry *entry, robj *o) {
     listTypeIterator *li = entry->li;
     if (li->encoding == REDIS_ENCODING_ZIPLIST) {
 int listTypeEqual(listTypeEntry *entry, robj *o) {
     listTypeIterator *li = entry->li;
     if (li->encoding == REDIS_ENCODING_ZIPLIST) {
-        redisAssert(o->encoding == REDIS_ENCODING_RAW);
+        redisAssertWithInfo(NULL,o,o->encoding == REDIS_ENCODING_RAW);
         return ziplistCompare(entry->zi,o->ptr,sdslen(o->ptr));
     } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
         return equalStringObjects(o,listNodeValue(entry->ln));
         return ziplistCompare(entry->zi,o->ptr,sdslen(o->ptr));
     } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
         return equalStringObjects(o,listNodeValue(entry->ln));
@@ -235,7 +235,7 @@ void listTypeDelete(listTypeEntry *entry) {
 void listTypeConvert(robj *subject, int enc) {
     listTypeIterator *li;
     listTypeEntry entry;
 void listTypeConvert(robj *subject, int enc) {
     listTypeIterator *li;
     listTypeEntry entry;
-    redisAssert(subject->type == REDIS_LIST);
+    redisAssertWithInfo(NULL,subject,subject->type == REDIS_LIST);
 
     if (enc == REDIS_ENCODING_LINKEDLIST) {
         list *l = listCreate();
 
     if (enc == REDIS_ENCODING_LINKEDLIST) {
         list *l = listCreate();
@@ -259,30 +259,35 @@ void listTypeConvert(robj *subject, int enc) {
  *----------------------------------------------------------------------------*/
 
 void pushGenericCommand(redisClient *c, int where) {
  *----------------------------------------------------------------------------*/
 
 void pushGenericCommand(redisClient *c, int where) {
+    int j, addlen = 0, pushed = 0;
     robj *lobj = lookupKeyWrite(c->db,c->argv[1]);
     robj *lobj = lookupKeyWrite(c->db,c->argv[1]);
-    c->argv[2] = tryObjectEncoding(c->argv[2]);
-    if (lobj == NULL) {
-        if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) {
-            addReply(c,shared.cone);
-            return;
-        }
-        lobj = createZiplistObject();
-        dbAdd(c->db,c->argv[1],lobj);
-    } else {
-        if (lobj->type != REDIS_LIST) {
-            addReply(c,shared.wrongtypeerr);
-            return;
+    int may_have_waiting_clients = (lobj == NULL);
+
+    if (lobj && lobj->type != REDIS_LIST) {
+        addReply(c,shared.wrongtypeerr);
+        return;
+    }
+
+    for (j = 2; j < c->argc; j++) {
+        c->argv[j] = tryObjectEncoding(c->argv[j]);
+        if (may_have_waiting_clients) {
+            if (handleClientsWaitingListPush(c,c->argv[1],c->argv[j])) {
+                addlen++;
+                continue;
+            } else {
+                may_have_waiting_clients = 0;
+            }
         }
         }
-        if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) {
-            touchWatchedKey(c->db,c->argv[1]);
-            addReply(c,shared.cone);
-            return;
+        if (!lobj) {
+            lobj = createZiplistObject();
+            dbAdd(c->db,c->argv[1],lobj);
         }
         }
+        listTypePush(lobj,c->argv[j],where);
+        pushed++;
     }
     }
-    listTypePush(lobj,c->argv[2],where);
-    addReplyLongLong(c,listTypeLength(lobj));
-    touchWatchedKey(c->db,c->argv[1]);
-    server.dirty++;
+    addReplyLongLong(c,addlen + (lobj ? listTypeLength(lobj) : 0));
+    if (pushed) signalModifiedKey(c->db,c->argv[1]);
+    server.dirty += pushed;
 }
 
 void lpushCommand(redisClient *c) {
 }
 
 void lpushCommand(redisClient *c) {
@@ -305,7 +310,7 @@ void pushxGenericCommand(redisClient *c, robj *refval, robj *val, int where) {
     if (refval != NULL) {
         /* Note: we expect refval to be string-encoded because it is *not* the
          * last argument of the multi-bulk LINSERT. */
     if (refval != NULL) {
         /* Note: we expect refval to be string-encoded because it is *not* the
          * last argument of the multi-bulk LINSERT. */
-        redisAssert(refval->encoding == REDIS_ENCODING_RAW);
+        redisAssertWithInfo(c,refval,refval->encoding == REDIS_ENCODING_RAW);
 
         /* We're not sure if this value can be inserted yet, but we cannot
          * convert the list inside the iterator. We don't want to loop over
 
         /* We're not sure if this value can be inserted yet, but we cannot
          * convert the list inside the iterator. We don't want to loop over
@@ -330,7 +335,7 @@ void pushxGenericCommand(redisClient *c, robj *refval, robj *val, int where) {
             if (subject->encoding == REDIS_ENCODING_ZIPLIST &&
                 ziplistLen(subject->ptr) > server.list_max_ziplist_entries)
                     listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
             if (subject->encoding == REDIS_ENCODING_ZIPLIST &&
                 ziplistLen(subject->ptr) > server.list_max_ziplist_entries)
                     listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
-            touchWatchedKey(c->db,c->argv[1]);
+            signalModifiedKey(c->db,c->argv[1]);
             server.dirty++;
         } else {
             /* Notify client of a failed insert */
             server.dirty++;
         } else {
             /* Notify client of a failed insert */
@@ -339,7 +344,7 @@ void pushxGenericCommand(redisClient *c, robj *refval, robj *val, int where) {
         }
     } else {
         listTypePush(subject,val,where);
         }
     } else {
         listTypePush(subject,val,where);
-        touchWatchedKey(c->db,c->argv[1]);
+        signalModifiedKey(c->db,c->argv[1]);
         server.dirty++;
     }
 
         server.dirty++;
     }
 
@@ -376,9 +381,12 @@ void llenCommand(redisClient *c) {
 void lindexCommand(redisClient *c) {
     robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk);
     if (o == NULL || checkType(c,o,REDIS_LIST)) return;
 void lindexCommand(redisClient *c) {
     robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk);
     if (o == NULL || checkType(c,o,REDIS_LIST)) return;
-    int index = atoi(c->argv[2]->ptr);
+    long index;
     robj *value = NULL;
 
     robj *value = NULL;
 
+    if ((getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != REDIS_OK))
+        return;
+
     if (o->encoding == REDIS_ENCODING_ZIPLIST) {
         unsigned char *p;
         unsigned char *vstr;
     if (o->encoding == REDIS_ENCODING_ZIPLIST) {
         unsigned char *p;
         unsigned char *vstr;
@@ -412,9 +420,12 @@ void lindexCommand(redisClient *c) {
 void lsetCommand(redisClient *c) {
     robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
     if (o == NULL || checkType(c,o,REDIS_LIST)) return;
 void lsetCommand(redisClient *c) {
     robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
     if (o == NULL || checkType(c,o,REDIS_LIST)) return;
-    int index = atoi(c->argv[2]->ptr);
+    long index;
     robj *value = (c->argv[3] = tryObjectEncoding(c->argv[3]));
 
     robj *value = (c->argv[3] = tryObjectEncoding(c->argv[3]));
 
+    if ((getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != REDIS_OK))
+        return;
+
     listTypeTryConversion(o,value);
     if (o->encoding == REDIS_ENCODING_ZIPLIST) {
         unsigned char *p, *zl = o->ptr;
     listTypeTryConversion(o,value);
     if (o->encoding == REDIS_ENCODING_ZIPLIST) {
         unsigned char *p, *zl = o->ptr;
@@ -427,7 +438,7 @@ void lsetCommand(redisClient *c) {
             o->ptr = ziplistInsert(o->ptr,p,value->ptr,sdslen(value->ptr));
             decrRefCount(value);
             addReply(c,shared.ok);
             o->ptr = ziplistInsert(o->ptr,p,value->ptr,sdslen(value->ptr));
             decrRefCount(value);
             addReply(c,shared.ok);
-            touchWatchedKey(c->db,c->argv[1]);
+            signalModifiedKey(c->db,c->argv[1]);
             server.dirty++;
         }
     } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
             server.dirty++;
         }
     } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
@@ -439,7 +450,7 @@ void lsetCommand(redisClient *c) {
             listNodeValue(ln) = value;
             incrRefCount(value);
             addReply(c,shared.ok);
             listNodeValue(ln) = value;
             incrRefCount(value);
             addReply(c,shared.ok);
-            touchWatchedKey(c->db,c->argv[1]);
+            signalModifiedKey(c->db,c->argv[1]);
             server.dirty++;
         }
     } else {
             server.dirty++;
         }
     } else {
@@ -458,7 +469,7 @@ void popGenericCommand(redisClient *c, int where) {
         addReplyBulk(c,value);
         decrRefCount(value);
         if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[1]);
         addReplyBulk(c,value);
         decrRefCount(value);
         if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[1]);
-        touchWatchedKey(c->db,c->argv[1]);
+        signalModifiedKey(c->db,c->argv[1]);
         server.dirty++;
     }
 }
         server.dirty++;
     }
 }
@@ -472,12 +483,14 @@ void rpopCommand(redisClient *c) {
 }
 
 void lrangeCommand(redisClient *c) {
 }
 
 void lrangeCommand(redisClient *c) {
-    robj *o, *value;
-    int start = atoi(c->argv[2]->ptr);
-    int end = atoi(c->argv[3]->ptr);
+    robj *o;
+    long start;
+    long end;
     int llen;
     int llen;
-    int rangelen, j;
-    listTypeEntry entry;
+    int rangelen;
+
+    if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
+        (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
 
     if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
          || checkType(c,o,REDIS_LIST)) return;
 
     if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
          || checkType(c,o,REDIS_LIST)) return;
@@ -499,25 +512,50 @@ void lrangeCommand(redisClient *c) {
 
     /* Return the result in form of a multi-bulk reply */
     addReplyMultiBulkLen(c,rangelen);
 
     /* Return the result in form of a multi-bulk reply */
     addReplyMultiBulkLen(c,rangelen);
-    listTypeIterator *li = listTypeInitIterator(o,start,REDIS_TAIL);
-    for (j = 0; j < rangelen; j++) {
-        redisAssert(listTypeNext(li,&entry));
-        value = listTypeGet(&entry);
-        addReplyBulk(c,value);
-        decrRefCount(value);
+    if (o->encoding == REDIS_ENCODING_ZIPLIST) {
+        unsigned char *p = ziplistIndex(o->ptr,start);
+        unsigned char *vstr;
+        unsigned int vlen;
+        long long vlong;
+
+        while(rangelen--) {
+            ziplistGet(p,&vstr,&vlen,&vlong);
+            if (vstr) {
+                addReplyBulkCBuffer(c,vstr,vlen);
+            } else {
+                addReplyBulkLongLong(c,vlong);
+            }
+            p = ziplistNext(o->ptr,p);
+        }
+    } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
+        listNode *ln;
+
+        /* If we are nearest to the end of the list, reach the element
+         * starting from tail and going backward, as it is faster. */
+        if (start > llen/2) start -= llen;
+        ln = listIndex(o->ptr,start);
+
+        while(rangelen--) {
+            addReplyBulk(c,ln->value);
+            ln = ln->next;
+        }
+    } else {
+        redisPanic("List encoding is not LINKEDLIST nor ZIPLIST!");
     }
     }
-    listTypeReleaseIterator(li);
 }
 
 void ltrimCommand(redisClient *c) {
     robj *o;
 }
 
 void ltrimCommand(redisClient *c) {
     robj *o;
-    int start = atoi(c->argv[2]->ptr);
-    int end = atoi(c->argv[3]->ptr);
+    long start;
+    long end;
     int llen;
     int j, ltrim, rtrim;
     list *list;
     listNode *ln;
 
     int llen;
     int j, ltrim, rtrim;
     list *list;
     listNode *ln;
 
+    if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
+        (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
+
     if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.ok)) == NULL ||
         checkType(c,o,REDIS_LIST)) return;
     llen = listTypeLength(o);
     if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.ok)) == NULL ||
         checkType(c,o,REDIS_LIST)) return;
     llen = listTypeLength(o);
@@ -557,7 +595,7 @@ void ltrimCommand(redisClient *c) {
         redisPanic("Unknown list encoding");
     }
     if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[1]);
         redisPanic("Unknown list encoding");
     }
     if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[1]);
-    touchWatchedKey(c->db,c->argv[1]);
+    signalModifiedKey(c->db,c->argv[1]);
     server.dirty++;
     addReply(c,shared.ok);
 }
     server.dirty++;
     addReply(c,shared.ok);
 }
@@ -565,10 +603,13 @@ void ltrimCommand(redisClient *c) {
 void lremCommand(redisClient *c) {
     robj *subject, *obj;
     obj = c->argv[3] = tryObjectEncoding(c->argv[3]);
 void lremCommand(redisClient *c) {
     robj *subject, *obj;
     obj = c->argv[3] = tryObjectEncoding(c->argv[3]);
-    int toremove = atoi(c->argv[2]->ptr);
+    long toremove;
     int removed = 0;
     listTypeEntry entry;
 
     int removed = 0;
     listTypeEntry entry;
 
+    if ((getLongFromObjectOrReply(c, c->argv[2], &toremove, NULL) != REDIS_OK))
+        return;
+
     subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero);
     if (subject == NULL || checkType(c,subject,REDIS_LIST)) return;
 
     subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero);
     if (subject == NULL || checkType(c,subject,REDIS_LIST)) return;
 
@@ -600,24 +641,62 @@ void lremCommand(redisClient *c) {
 
     if (listTypeLength(subject) == 0) dbDelete(c->db,c->argv[1]);
     addReplyLongLong(c,removed);
 
     if (listTypeLength(subject) == 0) dbDelete(c->db,c->argv[1]);
     addReplyLongLong(c,removed);
-    if (removed) touchWatchedKey(c->db,c->argv[1]);
+    if (removed) signalModifiedKey(c->db,c->argv[1]);
 }
 
 /* This is the semantic of this command:
  *  RPOPLPUSH srclist dstlist:
 }
 
 /* This is the semantic of this command:
  *  RPOPLPUSH srclist dstlist:
- *   IF LLEN(srclist) > 0
- *     element = RPOP srclist
- *     LPUSH dstlist element
- *     RETURN element
- *   ELSE
- *     RETURN nil
- *   END
+ *    IF LLEN(srclist) > 0
+ *      element = RPOP srclist
+ *      LPUSH dstlist element
+ *      RETURN element
+ *    ELSE
+ *      RETURN nil
+ *    END
  *  END
  *
  * The idea is to be able to get an element from a list in a reliable way
  * since the element is not just returned but pushed against another list
  * as well. This command was originally proposed by Ezra Zygmuntowicz.
  */
  *  END
  *
  * The idea is to be able to get an element from a list in a reliable way
  * since the element is not just returned but pushed against another list
  * as well. This command was originally proposed by Ezra Zygmuntowicz.
  */
+
+void rpoplpushHandlePush(redisClient *origclient, redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
+    robj *aux;
+
+    if (!handleClientsWaitingListPush(origclient,dstkey,value)) {
+        /* Create the list if the key does not exist */
+        if (!dstobj) {
+            dstobj = createZiplistObject();
+            dbAdd(c->db,dstkey,dstobj);
+        } else {
+            signalModifiedKey(c->db,dstkey);
+        }
+        listTypePush(dstobj,value,REDIS_HEAD);
+        /* If we are pushing as a result of LPUSH against a key
+         * watched by BRPOPLPUSH, we need to rewrite the command vector
+         * as an LPUSH.
+         *
+         * If this is called directly by RPOPLPUSH (either directly
+         * or via a BRPOPLPUSH where the popped list exists)
+         * we should replicate the RPOPLPUSH command itself. */
+        if (c != origclient) {
+            aux = createStringObject("LPUSH",5);
+            rewriteClientCommandVector(origclient,3,aux,dstkey,value);
+            decrRefCount(aux);
+        } else {
+            /* Make sure to always use RPOPLPUSH in the replication / AOF,
+             * even if the original command was BRPOPLPUSH. */
+            aux = createStringObject("RPOPLPUSH",9);
+            rewriteClientCommandVector(origclient,3,aux,c->argv[1],c->argv[2]);
+            decrRefCount(aux);
+        }
+        server.dirty++;
+    }
+
+    /* Always send the pushed value to the client. */
+    addReplyBulk(c,value);
+}
+
 void rpoplpushCommand(redisClient *c) {
     robj *sobj, *value;
     if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
 void rpoplpushCommand(redisClient *c) {
     robj *sobj, *value;
     if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
@@ -627,29 +706,22 @@ void rpoplpushCommand(redisClient *c) {
         addReply(c,shared.nullbulk);
     } else {
         robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
         addReply(c,shared.nullbulk);
     } else {
         robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
+        robj *touchedkey = c->argv[1];
+
         if (dobj && checkType(c,dobj,REDIS_LIST)) return;
         value = listTypePop(sobj,REDIS_TAIL);
         if (dobj && checkType(c,dobj,REDIS_LIST)) return;
         value = listTypePop(sobj,REDIS_TAIL);
-
-        /* Add the element to the target list (unless it's directly
-         * passed to some BLPOP-ing client */
-        if (!handleClientsWaitingListPush(c,c->argv[2],value)) {
-            /* Create the list if the key does not exist */
-            if (!dobj) {
-                dobj = createZiplistObject();
-                dbAdd(c->db,c->argv[2],dobj);
-            }
-            listTypePush(dobj,value,REDIS_HEAD);
-        }
-
-        /* Send the element to the client as reply as well */
-        addReplyBulk(c,value);
+        /* We saved touched key, and protect it, since rpoplpushHandlePush
+         * may change the client command argument vector. */
+        incrRefCount(touchedkey);
+        rpoplpushHandlePush(c,c,c->argv[2],dobj,value);
 
         /* listTypePop returns an object with its refcount incremented */
         decrRefCount(value);
 
         /* Delete the source list when it is empty */
 
         /* listTypePop returns an object with its refcount incremented */
         decrRefCount(value);
 
         /* Delete the source list when it is empty */
-        if (listTypeLength(sobj) == 0) dbDelete(c->db,c->argv[1]);
-        touchWatchedKey(c->db,c->argv[1]);
+        if (listTypeLength(sobj) == 0) dbDelete(c->db,touchedkey);
+        signalModifiedKey(c->db,touchedkey);
+        decrRefCount(touchedkey);
         server.dirty++;
     }
 }
         server.dirty++;
     }
 }
@@ -700,7 +772,7 @@ void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj
     c->bpop.target = target;
 
     if (target != NULL) {
     c->bpop.target = target;
 
     if (target != NULL) {
-      incrRefCount(target);
+        incrRefCount(target);
     }
 
     for (j = 0; j < numkeys; j++) {
     }
 
     for (j = 0; j < numkeys; j++) {
@@ -717,15 +789,15 @@ void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj
             l = listCreate();
             retval = dictAdd(c->db->blocking_keys,keys[j],l);
             incrRefCount(keys[j]);
             l = listCreate();
             retval = dictAdd(c->db->blocking_keys,keys[j],l);
             incrRefCount(keys[j]);
-            redisAssert(retval == DICT_OK);
+            redisAssertWithInfo(c,keys[j],retval == DICT_OK);
         } else {
         } else {
-            l = dictGetEntryVal(de);
+            l = dictGetVal(de);
         }
         listAddNodeTail(l,c);
     }
     /* Mark the client as a blocked client */
     c->flags |= REDIS_BLOCKED;
         }
         listAddNodeTail(l,c);
     }
     /* Mark the client as a blocked client */
     c->flags |= REDIS_BLOCKED;
-    server.blpop_blocked_clients++;
+    server.bpop_blocked_clients++;
 }
 
 /* Unblock a client that's waiting in a blocking operation such as BLPOP */
 }
 
 /* Unblock a client that's waiting in a blocking operation such as BLPOP */
@@ -734,13 +806,13 @@ void unblockClientWaitingData(redisClient *c) {
     list *l;
     int j;
 
     list *l;
     int j;
 
-    redisAssert(c->bpop.keys != NULL);
+    redisAssertWithInfo(c,NULL,c->bpop.keys != NULL);
     /* The client may wait for multiple keys, so unblock it for every key. */
     for (j = 0; j < c->bpop.count; j++) {
         /* Remove this client from the list of clients waiting for this key. */
         de = dictFind(c->db->blocking_keys,c->bpop.keys[j]);
     /* The client may wait for multiple keys, so unblock it for every key. */
     for (j = 0; j < c->bpop.count; j++) {
         /* Remove this client from the list of clients waiting for this key. */
         de = dictFind(c->db->blocking_keys,c->bpop.keys[j]);
-        redisAssert(de != NULL);
-        l = dictGetEntryVal(de);
+        redisAssertWithInfo(c,c->bpop.keys[j],de != NULL);
+        l = dictGetVal(de);
         listDelNode(l,listSearchKey(l,c));
         /* If the list is empty we need to remove it to avoid wasting memory */
         if (listLength(l) == 0)
         listDelNode(l,listSearchKey(l,c));
         /* If the list is empty we need to remove it to avoid wasting memory */
         if (listLength(l) == 0)
@@ -751,15 +823,12 @@ void unblockClientWaitingData(redisClient *c) {
     /* Cleanup the client structure */
     zfree(c->bpop.keys);
     c->bpop.keys = NULL;
     /* Cleanup the client structure */
     zfree(c->bpop.keys);
     c->bpop.keys = NULL;
+    if (c->bpop.target) decrRefCount(c->bpop.target);
     c->bpop.target = NULL;
     c->bpop.target = NULL;
-    c->flags &= (~REDIS_BLOCKED);
-    server.blpop_blocked_clients--;
-    /* We want to process data if there is some command waiting
-     * in the input buffer. Note that this is safe even if
-     * unblockClientWaitingData() gets called from freeClient() because
-     * freeClient() will be smart enough to call this function
-     * *after* c->querybuf was set to NULL. */
-    if (c->querybuf && sdslen(c->querybuf) > 0) processInputBuffer(c);
+    c->flags &= ~REDIS_BLOCKED;
+    c->flags |= REDIS_UNBLOCKED;
+    server.bpop_blocked_clients--;
+    listAddNodeTail(server.unblocked_clients,c);
 }
 
 /* This should be called from any function PUSHing into lists.
 }
 
 /* This should be called from any function PUSHing into lists.
@@ -775,62 +844,71 @@ void unblockClientWaitingData(redisClient *c) {
 int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
     struct dictEntry *de;
     redisClient *receiver;
 int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
     struct dictEntry *de;
     redisClient *receiver;
-    list *l;
+    int numclients;
+    list *clients;
     listNode *ln;
     listNode *ln;
+    robj *dstkey, *dstobj;
 
     de = dictFind(c->db->blocking_keys,key);
     if (de == NULL) return 0;
 
     de = dictFind(c->db->blocking_keys,key);
     if (de == NULL) return 0;
-    l = dictGetEntryVal(de);
-    ln = listFirst(l);
-    redisAssert(ln != NULL);
-    receiver = ln->value;
-
-    robj *target = receiver->bpop.target;
-
-    unblockClientWaitingData(receiver);
-
-    if (target == NULL) {
-        /* BRPOP/BLPOP return a multi-bulk with the name
-         * of the popped list */
-        addReplyMultiBulkLen(receiver,2);
-        addReplyBulk(receiver,key);
-        addReplyBulk(receiver,ele);
-    } else {
-        /* BRPOPLPUSH */
-        robj *dobj = lookupKeyWrite(receiver->db,target);
-        if (dobj && checkType(receiver,dobj,REDIS_LIST)) return 0;
-
-        addReplyBulk(receiver,ele);
-
-        if (!handleClientsWaitingListPush(receiver, target, ele)) {
-            /* Create the list if the key does not exist */
-            if (!dobj) {
-                dobj = createZiplistObject();
-                dbAdd(receiver->db, target, dobj);
+    clients = dictGetVal(de);
+    numclients = listLength(clients);
+
+    /* Try to handle the push as long as there are clients waiting for a push.
+     * Note that "numclients" is used because the list of clients waiting for a
+     * push on "key" is deleted by unblockClient() when empty.
+     *
+     * This loop will have more than 1 iteration when there is a BRPOPLPUSH
+     * that cannot push the target list because it does not contain a list. If
+     * this happens, it simply tries the next client waiting for a push. */
+    while (numclients--) {
+        ln = listFirst(clients);
+        redisAssertWithInfo(c,key,ln != NULL);
+        receiver = ln->value;
+        dstkey = receiver->bpop.target;
+
+        /* Protect receiver->bpop.target, that will be freed by
+         * the next unblockClientWaitingData() call. */
+        if (dstkey) incrRefCount(dstkey);
+
+        /* This should remove the first element of the "clients" list. */
+        unblockClientWaitingData(receiver);
+
+        if (dstkey == NULL) {
+            /* BRPOP/BLPOP */
+            addReplyMultiBulkLen(receiver,2);
+            addReplyBulk(receiver,key);
+            addReplyBulk(receiver,ele);
+            return 1; /* Serve just the first client as in B[RL]POP semantics */
+        } else {
+            /* BRPOPLPUSH, note that receiver->db is always equal to c->db. */
+            dstobj = lookupKeyWrite(receiver->db,dstkey);
+            if (!(dstobj && checkType(receiver,dstobj,REDIS_LIST))) {
+                rpoplpushHandlePush(c,receiver,dstkey,dstobj,ele);
+                decrRefCount(dstkey);
+                return 1;
             }
             }
-            listTypePush(dobj, ele, REDIS_HEAD);
+            decrRefCount(dstkey);
         }
         }
-
-        decrRefCount(target);
     }
 
     }
 
-    return 1;
+    return 0;
 }
 
 }
 
-int checkTimeout(redisClient *c, robj *object, time_t *timeout) {
-    long long lltimeout;
+int getTimeoutFromObjectOrReply(redisClient *c, robj *object, time_t *timeout) {
+    long tval;
 
 
-    if (getLongLongFromObject(object, &lltimeout) != REDIS_OK) {
-        addReplyError(c, "timeout is not an integer");
+    if (getLongFromObjectOrReply(c,object,&tval,
+        "timeout is not an integer or out of range") != REDIS_OK)
         return REDIS_ERR;
         return REDIS_ERR;
-    }
 
 
-    if (lltimeout < 0) {
-        addReplyError(c, "timeout is negative");
+    if (tval < 0) {
+        addReplyError(c,"timeout is negative");
         return REDIS_ERR;
     }
 
         return REDIS_ERR;
     }
 
-    *timeout = lltimeout;
+    if (tval > 0) tval += time(NULL);
+    *timeout = tval;
 
     return REDIS_OK;
 }
 
     return REDIS_OK;
 }
@@ -841,9 +919,8 @@ void blockingPopGenericCommand(redisClient *c, int where) {
     time_t timeout;
     int j;
 
     time_t timeout;
     int j;
 
-    if (checkTimeout(c, c->argv[c->argc - 1], &timeout) != REDIS_OK) {
+    if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout) != REDIS_OK)
         return;
         return;
-    }
 
     for (j = 1; j < c->argc-1; j++) {
         o = lookupKeyWrite(c->db,c->argv[j]);
 
     for (j = 1; j < c->argc-1; j++) {
         o = lookupKeyWrite(c->db,c->argv[j]);
@@ -855,6 +932,7 @@ void blockingPopGenericCommand(redisClient *c, int where) {
                 if (listTypeLength(o) != 0) {
                     /* If the list contains elements fall back to the usual
                      * non-blocking POP operation */
                 if (listTypeLength(o) != 0) {
                     /* If the list contains elements fall back to the usual
                      * non-blocking POP operation */
+                    struct redisCommand *orig_cmd;
                     robj *argv[2], **orig_argv;
                     int orig_argc;
 
                     robj *argv[2], **orig_argv;
                     int orig_argc;
 
@@ -862,6 +940,7 @@ void blockingPopGenericCommand(redisClient *c, int where) {
                      * popGenericCommand() as the command takes a single key. */
                     orig_argv = c->argv;
                     orig_argc = c->argc;
                      * popGenericCommand() as the command takes a single key. */
                     orig_argv = c->argv;
                     orig_argc = c->argc;
+                    orig_cmd = c->cmd;
                     argv[1] = c->argv[j];
                     c->argv = argv;
                     c->argc = 2;
                     argv[1] = c->argv[j];
                     c->argv = argv;
                     c->argc = 2;
@@ -879,6 +958,7 @@ void blockingPopGenericCommand(redisClient *c, int where) {
                     /* Fix the client structure with the original stuff */
                     c->argv = orig_argv;
                     c->argc = orig_argc;
                     /* Fix the client structure with the original stuff */
                     c->argv = orig_argv;
                     c->argc = orig_argc;
+                    c->cmd = orig_cmd;
 
                     return;
                 }
 
                     return;
                 }
@@ -894,7 +974,6 @@ void blockingPopGenericCommand(redisClient *c, int where) {
     }
 
     /* If the list is empty or the key does not exists we must block */
     }
 
     /* If the list is empty or the key does not exists we must block */
-    if (timeout > 0) timeout += time(NULL);
     blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL);
 }
 
     blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL);
 }
 
@@ -909,9 +988,8 @@ void brpopCommand(redisClient *c) {
 void brpoplpushCommand(redisClient *c) {
     time_t timeout;
 
 void brpoplpushCommand(redisClient *c) {
     time_t timeout;
 
-    if (checkTimeout(c, c->argv[3], &timeout) != REDIS_OK) {
+    if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout) != REDIS_OK)
         return;
         return;
-    }
 
     robj *key = lookupKeyWrite(c->db, c->argv[1]);
 
 
     robj *key = lookupKeyWrite(c->db, c->argv[1]);
 
@@ -920,10 +998,8 @@ void brpoplpushCommand(redisClient *c) {
 
             /* Blocking against an empty list in a multi state
              * returns immediately. */
 
             /* Blocking against an empty list in a multi state
              * returns immediately. */
-            addReply(c, shared.nullmultibulk);
+            addReply(c, shared.nullbulk);
         } else {
         } else {
-            if (timeout > 0) timeout += time(NULL);
-
             /* The list is empty and the client blocks. */
             blockForKeys(c, c->argv + 1, 1, timeout, c->argv[2]);
         }
             /* The list is empty and the client blocks. */
             blockForKeys(c, c->argv + 1, 1, timeout, c->argv[2]);
         }
@@ -934,7 +1010,7 @@ void brpoplpushCommand(redisClient *c) {
 
             /* The list exists and has elements, so
              * the regular rpoplpushCommand is executed. */
 
             /* The list exists and has elements, so
              * the regular rpoplpushCommand is executed. */
-            redisAssert(listTypeLength(key) > 0);
+            redisAssertWithInfo(c,key,listTypeLength(key) > 0);
             rpoplpushCommand(c);
         }
     }
             rpoplpushCommand(c);
         }
     }