]> git.saurik.com Git - redis.git/commitdiff
A reimplementation of blocking operation internals.
authorantirez <antirez@gmail.com>
Tue, 4 Sep 2012 08:37:49 +0000 (10:37 +0200)
committerantirez <antirez@gmail.com>
Mon, 17 Sep 2012 08:26:50 +0000 (10:26 +0200)
Redis provides support for blocking operations such as BLPOP or BRPOP.
This operations are identical to normal LPOP and RPOP operations as long
as there are elements in the target list, but if the list is empty they
block waiting for new data to arrive to the list.

All the clients blocked waiting for th same list are served in a FIFO
way, so the first that blocked is the first to be served when there is
more data pushed by another client into the list.

The previous implementation of blocking operations was conceived to
serve clients in the context of push operations. For for instance:

1) There is a client "A" blocked on list "foo".
2) The client "B" performs `LPUSH foo somevalue`.
3) The client "A" is served in the context of the "B" LPUSH,
synchronously.

Processing things in a synchronous way was useful as if "A" pushes a
value that is served by "B", from the point of view of the database is a
NOP (no operation) thing, that is, nothing is replicated, nothing is
written in the AOF file, and so forth.

However later we implemented two things:

1) Variadic LPUSH that could add multiple values to a list in the
context of a single call.
2) BRPOPLPUSH that was a version of BRPOP that also provided a "PUSH"
side effect when receiving data.

This forced us to make the synchronous implementation more complex. If
client "B" is waiting for data, and "A" pushes three elemnents in a
single call, we needed to propagate an LPUSH with a missing argument
in the AOF and replication link. We also needed to make sure to
replicate the LPUSH side of BRPOPLPUSH, but only if in turn did not
happened to serve another blocking client into another list ;)

This were complex but with a few of mutually recursive functions
everything worked as expected... until one day we introduced scripting
in Redis.

Scripting + synchronous blocking operations = Issue #614.

Basically you can't "rewrite" a script to have just a partial effect on
the replicas and AOF file if the script happened to serve a few blocked
clients.

The solution to all this problems, implemented by this commit, is to
change the way we serve blocked clients. Instead of serving the blocked
clients synchronously, in the context of the command performing the PUSH
operation, it is now an asynchronous and iterative process:

1) If a key that has clients blocked waiting for data is the subject of
a list push operation, We simply mark keys as "ready" and put it into a
queue.
2) Every command pushing stuff on lists, as a variadic LPUSH, a script,
or whatever it is, is replicated verbatim without any rewriting.
3) Every time a Redis command, a MULTI/EXEC block, or a script,
completed its execution, we run the list of keys ready to serve blocked
clients (as more data arrived), and process this list serving the
blocked clients.
4) As a result of "3" maybe more keys are ready again for other clients
(as a result of BRPOPLPUSH we may have push operations), so we iterate
back to step "3" if it's needed.

The new code has a much simpler semantics, and a simpler to understand
implementation, with the disadvantage of not being able to "optmize out"
a PUSH+BPOP as a No OP.

This commit will be tested with care before the final merge, more tests
will be added likely.

src/redis.c
src/redis.h
src/t_list.c
tests/integration/replication.tcl
tests/unit/scripting.tcl
tests/unit/type/list.tcl

index 5f1c48008d0e23c99f9901e00b212d766cd2f0f1..3d37b6d30a0e3068b06fa281efbe4994327f6014 100644 (file)
@@ -1046,6 +1046,7 @@ void createSharedObjects(void) {
     shared.del = createStringObject("DEL",3);
     shared.rpop = createStringObject("RPOP",4);
     shared.lpop = createStringObject("LPOP",4);
+    shared.lpush = createStringObject("LPUSH",5);
     for (j = 0; j < REDIS_SHARED_INTEGERS; j++) {
         shared.integers[j] = createObject(REDIS_STRING,(void*)(long)j);
         shared.integers[j]->encoding = REDIS_ENCODING_INT;
@@ -1164,6 +1165,8 @@ void initServerConfig() {
     server.delCommand = lookupCommandByCString("del");
     server.multiCommand = lookupCommandByCString("multi");
     server.lpushCommand = lookupCommandByCString("lpush");
+    server.lpopCommand = lookupCommandByCString("lpop");
+    server.rpopCommand = lookupCommandByCString("rpop");
     
     /* Slow log */
     server.slowlog_log_slower_than = REDIS_SLOWLOG_LOG_SLOWER_THAN;
@@ -1239,6 +1242,7 @@ void initServer() {
     server.slaves = listCreate();
     server.monitors = listCreate();
     server.unblocked_clients = listCreate();
+    server.ready_keys = listCreate();
 
     createSharedObjects();
     adjustOpenFilesLimit();
@@ -1269,6 +1273,7 @@ void initServer() {
         server.db[j].dict = dictCreate(&dbDictType,NULL);
         server.db[j].expires = dictCreate(&keyptrDictType,NULL);
         server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
+        server.db[j].ready_keys = dictCreate(&setDictType,NULL);
         server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
         server.db[j].id = j;
     }
@@ -1635,6 +1640,8 @@ int processCommand(redisClient *c) {
         addReply(c,shared.queued);
     } else {
         call(c,REDIS_CALL_FULL);
+        if (listLength(server.ready_keys))
+            handleClientsBlockedOnLists();
     }
     return REDIS_OK;
 }
index 1e1590830e3ef8f0fa5e55f9f933b2a242015bfb..04b9c319344844f6a784bb843bb48d3b1a681f1d 100644 (file)
@@ -295,6 +295,7 @@ typedef struct redisDb {
     dict *dict;                 /* The keyspace for this DB */
     dict *expires;              /* Timeout of keys with a timeout set */
     dict *blocking_keys;        /* Keys with clients waiting for data (BLPOP) */
+    dict *ready_keys;           /* Blocked keys that received a PUSH */
     dict *watched_keys;         /* WATCHED keys for MULTI/EXEC CAS */
     int id;
 } redisDb;
@@ -321,6 +322,22 @@ typedef struct blockingState {
                              * for BRPOPLPUSH. */
 } blockingState;
 
+/* The following structure represents a node in the server.ready_keys list,
+ * where we accumulate all the keys that had clients blocked with a blocking
+ * operation such as B[LR]POP, but received new data in the context of the
+ * last executed command.
+ *
+ * After the execution of every command or script, we run this list to check
+ * if as a result we should serve data to clients blocked, unblocking them.
+ * Note that server.ready_keys will not have duplicates as there dictionary
+ * also called ready_keys in every structure representing a Redis database,
+ * where we make sure to remember if a given key was already added in the
+ * server.ready_keys list. */
+typedef struct readyList {
+    redisDb *db;
+    robj *key;
+} readyList;
+
 /* With multiplexing we need to take per-clinet state.
  * Clients are taken in a liked list. */
 typedef struct redisClient {
@@ -375,6 +392,7 @@ struct sharedObjectsStruct {
     *masterdownerr, *roslaveerr,
     *oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk,
     *unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *rpop, *lpop,
+    *lpush,
     *select[REDIS_SHARED_SELECT_CMDS],
     *integers[REDIS_SHARED_INTEGERS],
     *mbulkhdr[REDIS_SHARED_BULKHDR_LEN], /* "*<value>\r\n" */
@@ -469,7 +487,8 @@ struct redisServer {
     off_t loading_loaded_bytes;
     time_t loading_start_time;
     /* Fast pointers to often looked up command */
-    struct redisCommand *delCommand, *multiCommand, *lpushCommand;
+    struct redisCommand *delCommand, *multiCommand, *lpushCommand, *lpopCommand,
+                        *rpopCommand;
     /* Fields used only for stats */
     time_t stat_starttime;          /* Server start time */
     long long stat_numcommands;     /* Number of processed commands */
@@ -568,6 +587,7 @@ struct redisServer {
     /* Blocked clients */
     unsigned int bpop_blocked_clients; /* Number of clients blocked by lists */
     list *unblocked_clients; /* list of clients to unblock before next loop */
+    list *ready_keys;        /* List of readyList structures for BLPOP & co */
     /* Sort parameters - qsort_r() is only available under BSD so we
      * have to take this state global, in order to pass it to sortCompare() */
     int sort_desc;
@@ -777,7 +797,7 @@ int listTypeEqual(listTypeEntry *entry, robj *o);
 void listTypeDelete(listTypeEntry *entry);
 void listTypeConvert(robj *subject, int enc);
 void unblockClientWaitingData(redisClient *c);
-int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele);
+void handleClientsBlockedOnLists(void);
 void popGenericCommand(redisClient *c, int where);
 
 /* MULTI/EXEC/WATCH... */
index ca03916b953fd1eb91d9e80b506b6646d97e45f4..77e40eb6b215dbe69036bc7ac00595e688072b4d 100644 (file)
@@ -1,5 +1,7 @@
 #include "redis.h"
 
+void signalListAsReady(redisClient *c, robj *key);
+
 /*-----------------------------------------------------------------------------
  * List API
  *----------------------------------------------------------------------------*/
@@ -14,6 +16,11 @@ void listTypeTryConversion(robj *subject, robj *value) {
             listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
 }
 
+/* The function pushes an elmenet to the specified list object 'subject',
+ * at head or tail position as specified by 'where'.
+ *
+ * There is no need for the caller to incremnet the refcount of 'value' as
+ * the function takes care of it if needed. */
 void listTypePush(robj *subject, robj *value, int where) {
     /* Check if we need to convert the ziplist */
     listTypeTryConversion(subject,value);
@@ -268,16 +275,10 @@ void pushGenericCommand(redisClient *c, int where) {
         return;
     }
 
+    if (may_have_waiting_clients) signalListAsReady(c,c->argv[1]);
+
     for (j = 2; j < c->argc; j++) {
         c->argv[j] = tryObjectEncoding(c->argv[j]);
-        if (may_have_waiting_clients) {
-            if (handleClientsWaitingListPush(c,c->argv[1],c->argv[j])) {
-                waiting++;
-                continue;
-            } else {
-                may_have_waiting_clients = 0;
-            }
-        }
         if (!lobj) {
             lobj = createZiplistObject();
             dbAdd(c->db,c->argv[1],lobj);
@@ -288,18 +289,6 @@ void pushGenericCommand(redisClient *c, int where) {
     addReplyLongLong(c, waiting + (lobj ? listTypeLength(lobj) : 0));
     if (pushed) signalModifiedKey(c->db,c->argv[1]);
     server.dirty += pushed;
-
-    /* Alter the replication of the command accordingly to the number of
-     * list elements delivered to clients waiting into a blocking operation.
-     * We do that only if there were waiting clients, and only if still some
-     * element was pushed into the list (othewise dirty is 0 and nothign will
-     * be propagated). */
-    if (waiting && pushed) {
-        /* CMD KEY a b C D E */
-        for (j = 0; j < waiting; j++) decrRefCount(c->argv[j+2]);
-        memmove(c->argv+2,c->argv+2+waiting,sizeof(robj*)*pushed);
-        c->argc -= waiting;
-    }
 }
 
 void lpushCommand(redisClient *c) {
@@ -666,29 +655,15 @@ void lremCommand(redisClient *c) {
  * as well. This command was originally proposed by Ezra Zygmuntowicz.
  */
 
-void rpoplpushHandlePush(redisClient *origclient, redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
-    if (!handleClientsWaitingListPush(origclient,dstkey,value)) {
-        /* Create the list if the key does not exist */
-        if (!dstobj) {
-            dstobj = createZiplistObject();
-            dbAdd(c->db,dstkey,dstobj);
-        } else {
-            signalModifiedKey(c->db,dstkey);
-        }
-        listTypePush(dstobj,value,REDIS_HEAD);
-        /* Additionally propagate this PUSH operation together with
-         * the operation performed by the command. */
-        {
-            robj **argv = zmalloc(sizeof(robj*)*3);
-            argv[0] = createStringObject("LPUSH",5);
-            argv[1] = dstkey;
-            argv[2] = value;
-            incrRefCount(argv[1]);
-            incrRefCount(argv[2]);
-            alsoPropagate(server.lpushCommand,c->db->id,argv,3,
-                          REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL);
-        }
+void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
+    /* Create the list if the key does not exist */
+    if (!dstobj) {
+        dstobj = createZiplistObject();
+        dbAdd(c->db,dstkey,dstobj);
+        signalListAsReady(c,dstkey);
     }
+    signalModifiedKey(c->db,dstkey);
+    listTypePush(dstobj,value,REDIS_HEAD);
     /* Always send the pushed value to the client. */
     addReplyBulk(c,value);
 }
@@ -709,9 +684,10 @@ void rpoplpushCommand(redisClient *c) {
         if (dobj && checkType(c,dobj,REDIS_LIST)) return;
         value = listTypePop(sobj,REDIS_TAIL);
         /* We saved touched key, and protect it, since rpoplpushHandlePush
-         * may change the client command argument vector. */
+         * may change the client command argument vector (it does not
+         * currently). */
         incrRefCount(touchedkey);
-        rpoplpushHandlePush(c,c,c->argv[2],dobj,value);
+        rpoplpushHandlePush(c,c->argv[2],dobj,value);
 
         /* listTypePop returns an object with its refcount incremented */
         decrRefCount(value);
@@ -721,13 +697,6 @@ void rpoplpushCommand(redisClient *c) {
         signalModifiedKey(c->db,touchedkey);
         decrRefCount(touchedkey);
         server.dirty++;
-
-        /* Replicate this as a simple RPOP since the LPUSH side is replicated
-         * by rpoplpushHandlePush() call if needed (it may not be needed
-         * if a client is blocking wait a push against the list). */
-        rewriteClientCommandVector(c,2,
-            resetRefCount(createStringObject("RPOP",4)),
-            c->argv[1]);
     }
 }
 
@@ -735,20 +704,10 @@ void rpoplpushCommand(redisClient *c) {
  * Blocking POP operations
  *----------------------------------------------------------------------------*/
 
-/* Currently Redis blocking operations support is limited to list POP ops,
- * so the current implementation is not fully generic, but it is also not
- * completely specific so it will not require a rewrite to support new
- * kind of blocking operations in the future.
- *
- * Still it's important to note that list blocking operations can be already
- * used as a notification mechanism in order to implement other blocking
- * operations at application level, so there must be a very strong evidence
- * of usefulness and generality before new blocking operations are implemented.
- *
- * This is how the current blocking POP works, we use BLPOP as example:
+/* This is how the current blocking POP works, we use BLPOP as example:
  * - If the user calls BLPOP and the key exists and contains a non empty list
  *   then LPOP is called instead. So BLPOP is semantically the same as LPOP
- *   if there is not to block.
+ *   if blocking is not required.
  * - If instead BLPOP is called and the key does not exists or the list is
  *   empty we need to block. In order to do so we remove the notification for
  *   new data to read in the client socket (so that we'll not serve new
@@ -756,12 +715,10 @@ void rpoplpushCommand(redisClient *c) {
  *   in a dictionary (db->blocking_keys) mapping keys to a list of clients
  *   blocking for this keys.
  * - If a PUSH operation against a key with blocked clients waiting is
- *   performed, we serve the first in the list: basically instead to push
- *   the new element inside the list we return it to the (first / oldest)
- *   blocking client, unblock the client, and remove it form the list.
- *
- * The above comment and the source code should be enough in order to understand
- * the implementation and modify / fix it later.
+ *   performed, we mark this key as "ready", and after the current command,
+ *   MULTI/EXEC block, or script, is executed, we serve all the clients waiting
+ *   for this list, from the one that blocked first, to the last, accordingly
+ *   to the number of elements we have in the ready list.
  */
 
 /* Set a client in blocking mode for the specified key, with the specified
@@ -836,68 +793,192 @@ void unblockClientWaitingData(redisClient *c) {
     listAddNodeTail(server.unblocked_clients,c);
 }
 
-/* This should be called from any function PUSHing into lists.
- * 'c' is the "pushing client", 'key' is the key it is pushing data against,
- * 'ele' is the element pushed.
+/* If the specified key has clients blocked waiting for list pushes, this
+ * function will put the key reference into the server.ready_keys list.
+ * Note that db->ready_keys is an hash table that allows us to avoid putting
+ * the same key agains and again in the list in case of multiple pushes
+ * made by a script or in the context of MULTI/EXEC.
  *
- * If the function returns 0 there was no client waiting for a list push
- * against this key.
+ * The list will be finally processed by handleClientsBlockedOnLists() */
+void signalListAsReady(redisClient *c, robj *key) {
+    readyList *rl;
+
+    /* No clients blocking for this key? No need to queue it. */
+    if (dictFind(c->db->blocking_keys,key) == NULL) return;
+
+    /* Key was already signaled? No need to queue it again. */
+    if (dictFind(c->db->ready_keys,key) != NULL) return;
+
+    /* Ok, we need to queue this key into server.ready_keys. */
+    rl = zmalloc(sizeof(*rl));
+    rl->key = key;
+    rl->db = c->db;
+    incrRefCount(key);
+    listAddNodeTail(server.ready_keys,rl);
+
+    /* We also add the key in the db->ready_keys dictionary in order
+     * to avoid adding it multiple times into a list with a simple O(1)
+     * check. */
+    incrRefCount(key);
+    redisAssert(dictAdd(c->db->ready_keys,key,NULL) == DICT_OK);
+}
+
+/* This is an helper function for handleClientsBlockedOnLists(). It's work
+ * is to serve a specific client (receiver) that is blocked on 'key'
+ * in the context of the specified 'db', doing the following:
  *
- * If the function returns 1 there was a client waiting for a list push
- * against this key, the element was passed to this client thus it's not
- * needed to actually add it to the list and the caller should return asap. */
-int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
-    struct dictEntry *de;
-    redisClient *receiver;
-    int numclients;
-    list *clients;
-    listNode *ln;
-    robj *dstkey, *dstobj;
-
-    de = dictFind(c->db->blocking_keys,key);
-    if (de == NULL) return 0;
-    clients = dictGetVal(de);
-    numclients = listLength(clients);
-
-    /* Try to handle the push as long as there are clients waiting for a push.
-     * Note that "numclients" is used because the list of clients waiting for a
-     * push on "key" is deleted by unblockClient() when empty.
-     *
-     * This loop will have more than 1 iteration when there is a BRPOPLPUSH
-     * that cannot push the target list because it does not contain a list. If
-     * this happens, it simply tries the next client waiting for a push. */
-    while (numclients--) {
-        ln = listFirst(clients);
-        redisAssertWithInfo(c,key,ln != NULL);
-        receiver = ln->value;
-        dstkey = receiver->bpop.target;
-
-        /* Protect receiver->bpop.target, that will be freed by
-         * the next unblockClientWaitingData() call. */
-        if (dstkey) incrRefCount(dstkey);
-
-        /* This should remove the first element of the "clients" list. */
-        unblockClientWaitingData(receiver);
-
-        if (dstkey == NULL) {
-            /* BRPOP/BLPOP */
-            addReplyMultiBulkLen(receiver,2);
-            addReplyBulk(receiver,key);
-            addReplyBulk(receiver,ele);
-            return 1; /* Serve just the first client as in B[RL]POP semantics */
+ * 1) Provide the client with the 'value' element.
+ * 2) If the dstkey is not NULL (we are serving a BRPOPLPUSH) also push the
+ *    'value' element on the destionation list (the LPUSH side of the command).
+ * 3) Propagate the resulting BRPOP, BLPOP and additional LPUSH if any into
+ *    the AOF and replication channel.
+ *
+ * The argument 'where' is REDIS_TAIL or REDIS_HEAD, and indicates if the
+ * 'value' element was popped fron the head (BLPOP) or tail (BRPOP) so that
+ * we can propagate the command properly.
+ *
+ * The function returns REDIS_OK if we are able to serve the client, otherwise
+ * REDIS_ERR is returned to signal the caller that the list POP operation
+ * should be undoed as the client was not served: This only happens for
+ * BRPOPLPUSH that fails to push the value to the destination key as it is
+ * of the wrong type. */
+int serveClientBlockedOnList(redisClient *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where)
+{
+    robj *argv[3];
+
+    if (dstkey == NULL) {
+        /* Propagate the [LR]POP operation. */
+        argv[0] = (where == REDIS_HEAD) ? shared.lpop :
+                                          shared.rpop;
+        argv[1] = key;
+        propagate((where == REDIS_HEAD) ?
+            server.lpopCommand : server.rpopCommand,
+            db->id,argv,2,REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL);
+
+        /* BRPOP/BLPOP */
+        addReplyMultiBulkLen(receiver,2);
+        addReplyBulk(receiver,key);
+        addReplyBulk(receiver,value);
+    } else {
+        /* BRPOPLPUSH */
+        robj *dstobj =
+            lookupKeyWrite(receiver->db,dstkey);
+        if (!(dstobj &&
+             checkType(receiver,dstobj,REDIS_LIST)))
+        {
+            /* Propagate the RPOP operation. */
+            argv[0] = shared.rpop;
+            argv[1] = key;
+            propagate(server.rpopCommand,
+                db->id,argv,2,
+                REDIS_PROPAGATE_AOF|
+                REDIS_PROPAGATE_REPL);
+            rpoplpushHandlePush(receiver,dstkey,dstobj,
+                value);
+            /* Propagate the LPUSH operation. */
+            argv[0] = shared.lpush;
+            argv[1] = dstkey;
+            argv[2] = value;
+            propagate(server.lpushCommand,
+                db->id,argv,3,
+                REDIS_PROPAGATE_AOF|
+                REDIS_PROPAGATE_REPL);
         } else {
-            /* BRPOPLPUSH, note that receiver->db is always equal to c->db. */
-            dstobj = lookupKeyWrite(receiver->db,dstkey);
-            if (!(dstobj && checkType(receiver,dstobj,REDIS_LIST))) {
-                rpoplpushHandlePush(c,receiver,dstkey,dstobj,ele);
-                decrRefCount(dstkey);
-                return 1;
-            }
-            decrRefCount(dstkey);
+            /* BRPOPLPUSH failed because of wrong
+             * destination type. */
+            return REDIS_ERR;
         }
     }
+    return REDIS_OK;
+}
 
-    return 0;
+/* This function should be called by Redis every time a single command,
+ * a MULTI/EXEC block, or a Lua script, terminated its execution after
+ * being called by a client.
+ *
+ * All the keys with at least one client blocked that received at least
+ * one new element via some PUSH operation are accumulated into
+ * the server.ready_keys list. This function will run the list and will
+ * serve clients accordingly. Note that the function will iterate again and
+ * again as a result of serving BRPOPLPUSH we can have new blocking clients
+ * to serve because of the PUSH side of BRPOPLPUSH. */
+void handleClientsBlockedOnLists(void) {
+    while(listLength(server.ready_keys) != 0) {
+        list *l;
+
+        /* Point server.ready_keys to a fresh list and save the current one
+         * locally. This way as we run the old list we are free to call
+         * signalListAsReady() that may push new elements in server.ready_keys
+         * when handling clients blocked into BRPOPLPUSH. */
+        l = server.ready_keys;
+        server.ready_keys = listCreate();
+
+        while(listLength(l) != 0) {
+            listNode *ln = listFirst(l);
+            readyList *rl = ln->value;
+
+            /* First of all remove this key from db->ready_keys so that
+             * we can safely call signalListAsReady() against this key. */
+            dictDelete(rl->db->ready_keys,rl->key);
+
+            /* If the key exists and it's a list, serve blocked clients
+             * with data. */
+            robj *o = lookupKeyWrite(rl->db,rl->key);
+            if (o != NULL && o->type == REDIS_LIST) {
+                dictEntry *de;
+
+                /* We serve clients in the same order they blocked for
+                 * this key, from the first blocked to the last. */
+                de = dictFind(rl->db->blocking_keys,rl->key);
+                if (de) {
+                    list *clients = dictGetVal(de);
+                    int numclients = listLength(clients);
+
+                    while(numclients--) {
+                        listNode *clientnode = listFirst(clients);
+                        redisClient *receiver = clientnode->value;
+                        robj *dstkey = receiver->bpop.target;
+                        int where = (receiver->lastcmd &&
+                                     receiver->lastcmd->proc == blpopCommand) ?
+                                    REDIS_HEAD : REDIS_TAIL;
+                        robj *value = listTypePop(o,where);
+
+                        if (value) {
+                            /* Protect receiver->bpop.target, that will be
+                             * freed by the next unblockClientWaitingData()
+                             * call. */
+                            if (dstkey) incrRefCount(dstkey);
+                            unblockClientWaitingData(receiver);
+
+                            if (serveClientBlockedOnList(receiver,
+                                rl->key,dstkey,rl->db,value,
+                                where) == REDIS_ERR)
+                            {
+                                /* If we failed serving the client we need
+                                 * to also undo the POP operation. */
+                                    listTypePush(o,value,where);
+                            }
+
+                            if (dstkey) decrRefCount(dstkey);
+                            decrRefCount(value);
+                        } else {
+                            break;
+                        }
+                    }
+                }
+                
+                if (listTypeLength(o) == 0) dbDelete(rl->db,rl->key);
+                /* We don't call signalModifiedKey() as it was already called
+                 * when an element was pushed on the list. */
+            }
+
+            /* Free this item. */
+            decrRefCount(rl->key);
+            zfree(rl);
+            listDelNode(l,ln);
+        }
+        listRelease(l); /* We have the new list on place at this point. */
+    }
 }
 
 int getTimeoutFromObjectOrReply(redisClient *c, robj *object, time_t *timeout) {
@@ -986,7 +1067,6 @@ void brpoplpushCommand(redisClient *c) {
 
     if (key == NULL) {
         if (c->flags & REDIS_MULTI) {
-
             /* Blocking against an empty list in a multi state
              * returns immediately. */
             addReply(c, shared.nullbulk);
@@ -998,7 +1078,6 @@ void brpoplpushCommand(redisClient *c) {
         if (key->type != REDIS_LIST) {
             addReply(c, shared.wrongtypeerr);
         } else {
-
             /* The list exists and has elements, so
              * the regular rpoplpushCommand is executed. */
             redisAssertWithInfo(c,key,listTypeLength(key) > 0);
index 18e639d41f6bab6b40c696143211645f0896db07..da94b088065034797e010f87dfe7b5f3e737cd8f 100644 (file)
@@ -61,9 +61,13 @@ start_server {tags {"repl"}} {
         
         test {SET on the master should immediately propagate} {
             r -1 set mykey bar
-            if {$::valgrind} {after 2000}
-            r  0 get mykey
-        } {bar}
+
+            wait_for_condition 500 100 {
+                [r  0 get mykey] eq {bar}
+            } else {
+                fail "SET on master did not propagated on slave"
+            }
+        }
 
         test {FLUSHALL should replicate} {
             r -1 flushall
index 0579187166a7e1e89d5b1f060fa399ed51bf9d5f..6dbdb6b63d7df755ec74961a46657dd8909da5bf 100644 (file)
@@ -344,5 +344,22 @@ start_server {tags {"scripting repl"}} {
                 fail "Expected 2 in x, but value is '[r -1 get x]'"
             }
         }
+
+        test {Replication of script multiple pushes to list with BLPOP} {
+            set rd [redis_deferring_client]
+            $rd brpop a 0
+            r eval {
+                redis.call("lpush","a","1");
+                redis.call("lpush","a","2");
+            } 0
+            set res [$rd read]
+            $rd close
+            wait_for_condition 50 100 {
+                [r -1 lrange a 0 -1] eq [r lrange a 0 -1]
+            } else {
+                fail "Expected list 'a' in slave and master to be the same, but they are respectively '[r -1 lrange a 0 -1]' and '[r lrange a 0 -1]'"
+            }
+            set res
+        } {a 1}
     }
 }
index 85dde5690a490093ccd3369418f6d6637e596bbc..8f598a4ab3f9402877b813368904c2a2dee9b7d2 100644 (file)
@@ -161,6 +161,47 @@ start_server {
         }
     }
 
+    test "BLPOP, LPUSH + DEL should not awake blocked client" {
+        set rd [redis_deferring_client]
+        r del list
+
+        $rd blpop list 0
+        r multi
+        r lpush list a
+        r del list
+        r exec
+        r del list
+        r lpush list b
+        $rd read
+    } {list b}
+
+    test "BLPOP, LPUSH + DEL + SET should not awake blocked client" {
+        set rd [redis_deferring_client]
+        r del list
+
+        $rd blpop list 0
+        r multi
+        r lpush list a
+        r del list
+        r set list foo
+        r exec
+        r del list
+        r lpush list b
+        $rd read
+    } {list b}
+
+    test "MULTI/EXEC is isolated from the point of view of BLPOP" {
+        set rd [redis_deferring_client]
+        r del list
+        $rd blpop list 0
+        r multi
+        r lpush list a
+        r lpush list b
+        r lpush list c
+        r exec
+        $rd read
+    } {list c}
+
     test "BLPOP with variadic LPUSH" {
         set rd [redis_deferring_client]
         r del blist target
@@ -169,8 +210,8 @@ start_server {
         if {$::valgrind} {after 100}
         assert_equal 2 [r lpush blist foo bar]
         if {$::valgrind} {after 100}
-        assert_equal {blist foo} [$rd read]
-        assert_equal bar [lindex [r lrange blist 0 -1] 0]
+        assert_equal {blist bar} [$rd read]
+        assert_equal foo [lindex [r lrange blist 0 -1] 0]
     }
 
     test "BRPOPLPUSH with zero timeout should block indefinitely" {
@@ -222,6 +263,16 @@ start_server {
         assert_equal {foo} [r lrange blist 0 -1]
     }
 
+    test "BRPOPLPUSH maintains order of elements after failure" {
+        set rd [redis_deferring_client]
+        r del blist target
+        r set target nolist
+        $rd brpoplpush blist target 0
+        r rpush blist a b c
+        assert_error "ERR*wrong kind*" {$rd read}
+        r lrange blist 0 -1
+    } {a b c}
+
     test "BRPOPLPUSH with multiple blocked clients" {
         set rd1 [redis_deferring_client]
         set rd2 [redis_deferring_client]
@@ -293,6 +344,41 @@ start_server {
         r exec
     } {foo bar {} {} {bar foo}}
 
+    test "PUSH resulting from BRPOPLPUSH affect WATCH" {
+        set blocked_client [redis_deferring_client]
+        set watching_client [redis_deferring_client]
+        r del srclist dstlist somekey
+        r set somekey somevalue
+        $blocked_client brpoplpush srclist dstlist 0
+        $watching_client watch dstlist
+        $watching_client read
+        $watching_client multi
+        $watching_client read
+        $watching_client get somekey
+        $watching_client read
+        r lpush srclist element
+        $watching_client exec
+        $watching_client read
+    } {}
+
+    test "BRPOPLPUSH does not affect WATCH while still blocked" {
+        set blocked_client [redis_deferring_client]
+        set watching_client [redis_deferring_client]
+        r del srclist dstlist somekey
+        r set somekey somevalue
+        $blocked_client brpoplpush srclist dstlist 0
+        $watching_client watch dstlist
+        $watching_client read
+        $watching_client multi
+        $watching_client read
+        $watching_client get somekey
+        $watching_client read
+        $watching_client exec
+        # Blocked BLPOPLPUSH may create problems, unblock it.
+        r lpush srclist element
+        $watching_client read
+    } {somevalue}
+
     test {BRPOPLPUSH timeout} {
       set rd [redis_deferring_client]