git.saurik.com Git - redis.git/commitdiff
Check other blocked clients when value could not be pushed
author Pieter Noordhuis <pcnoordhuis@gmail.com>
Mon, 6 Dec 2010 15:04:10 +0000 (16:04 +0100)
committer Pieter Noordhuis <pcnoordhuis@gmail.com>
Mon, 6 Dec 2010 15:04:10 +0000 (16:04 +0100)
src/t_list.c
tests/unit/type/list.tcl

index 3ce0f992ff152ba6eed6584e348ef754144dc11e..866a6a3e2919524f063bd8e3ea0ea81052768aba 100644 (file)
@@ -780,35 +780,53 @@ void unblockClientWaitingData(redisClient *c) {
 int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
     struct dictEntry *de;
     redisClient *receiver;
-    list *l;
+    int numclients;
+    list *clients;
     listNode *ln;
+    robj *dstkey, *dstobj;
 
     de = dictFind(c->db->blocking_keys,key);
     if (de == NULL) return 0;
-    l = dictGetEntryVal(de);
-    ln = listFirst(l);
-    redisAssert(ln != NULL);
-    receiver = ln->value;
-
-    robj *target = receiver->bpop.target;
-
-    unblockClientWaitingData(receiver);
-
-    if (target == NULL) {
-        /* BRPOP/BLPOP return a multi-bulk with the name
-         * of the popped list */
-        addReplyMultiBulkLen(receiver,2);
-        addReplyBulk(receiver,key);
-        addReplyBulk(receiver,ele);
-    } else {
-        /* BRPOPLPUSH */
-        robj *dobj = lookupKeyWrite(receiver->db,target);
-        if (dobj && checkType(receiver,dobj,REDIS_LIST)) return 0;
-        rpoplpushHandlePush(receiver,target,dobj,ele);
-        decrRefCount(target);
+    clients = dictGetEntryVal(de);
+    numclients = listLength(clients);
+
+    /* Try to handle the push as long as there are clients waiting for a push.
+     * Note that "numclients" is used because the list of clients waiting for a
+     * push on "key" is deleted by unblockClientWaitingData() when empty.
+     *
+     * This loop will have more than 1 iteration when there is a BRPOPLPUSH
+     * whose target key exists but does not hold a list, so the element cannot
+     * be pushed there. In that case the next waiting client is tried. */
+    while (numclients--) {
+        ln = listFirst(clients);
+        redisAssert(ln != NULL);
+        receiver = ln->value;
+        dstkey = receiver->bpop.target;
+
+        /* This should remove the first element of the "clients" list. */
+        unblockClientWaitingData(receiver);
+        redisAssert(ln != listFirst(clients));
+
+        if (dstkey == NULL) {
+            /* BRPOP/BLPOP */
+            addReplyMultiBulkLen(receiver,2);
+            addReplyBulk(receiver,key);
+            addReplyBulk(receiver,ele);
+            return 1;
+        } else {
+            /* BRPOPLPUSH */
+            dstobj = lookupKeyWrite(receiver->db,dstkey);
+            if (dstobj && checkType(receiver,dstobj,REDIS_LIST)) {
+                decrRefCount(dstkey);
+            } else {
+                rpoplpushHandlePush(receiver,dstkey,dstobj,ele);
+                decrRefCount(dstkey);
+                return 1;
+            }
+        }
     }
 
-    return 1;
+    return 0;
 }
 
 int getTimeoutFromObjectOrReply(redisClient *c, robj *object, time_t *timeout) {
index 3616443344468d2ec998556ebae8812663c9c4ac..8ac128c5957bace0ad9b6cdbcee4e7790310b302 100644 (file)
@@ -191,6 +191,20 @@ start_server {
         assert_equal {foo} [r lrange blist 0 -1]
     }
 
+    test "BRPOPLPUSH with multiple blocked clients" {
+        set rd1 [redis_deferring_client]
+        set rd2 [redis_deferring_client]
+        r del blist target1 target2
+        r set target1 nolist
+        $rd1 brpoplpush blist target1 0
+        $rd2 brpoplpush blist target2 0
+        r lpush blist foo
+
+        assert_error "ERR*wrong kind*" {$rd1 read}
+        assert_equal {foo} [$rd2 read]
+        assert_equal {foo} [r lrange target2 0 -1]
+    }
+
     test "linked BRPOPLPUSH" {
       set rd1 [redis_deferring_client]
       set rd2 [redis_deferring_client]