+ * 1) Provide the client with the 'value' element.
+ * 2) If the dstkey is not NULL (we are serving a BRPOPLPUSH) also push the
+ * 'value' element on the destination list (the LPUSH side of the command).
+ * 3) Propagate the resulting RPOP or LPOP, and the additional LPUSH if any,
+ * into the AOF and replication channel.
+ *
+ * The argument 'where' is REDIS_TAIL or REDIS_HEAD, and indicates if the
+ * 'value' element was popped from the head (BLPOP) or tail (BRPOP) so that
+ * we can propagate the command properly.
+ *
+ * The function returns REDIS_OK if we are able to serve the client, otherwise
+ * REDIS_ERR is returned to signal the caller that the list POP operation
+ * should be undone as the client was not served: This only happens for
+ * BRPOPLPUSH that fails to push the value to the destination key as it is
+ * of the wrong type. */
+int serveClientBlockedOnList(redisClient *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where)
+{
+    robj *argv[3];
+
+    if (dstkey != NULL) {
+        /* BRPOPLPUSH: besides being popped from 'key', the element has to
+         * be pushed on 'dstkey'. If the destination exists and checkType()
+         * rejects it as a list, the client can't be served and the caller
+         * is told so via REDIS_ERR. */
+        robj *target = lookupKeyWrite(receiver->db,dstkey);
+
+        if (target && checkType(receiver,target,REDIS_LIST))
+            return REDIS_ERR;
+
+        /* Pop side: propagated as an RPOP against the source key. */
+        argv[0] = shared.rpop;
+        argv[1] = key;
+        propagate(server.rpopCommand,db->id,argv,2,
+                  REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL);
+
+        /* Hand the element to the push half of RPOPLPUSH. */
+        rpoplpushHandlePush(receiver,dstkey,target,value);
+
+        /* Push side: propagated as an LPUSH against the destination key. */
+        argv[0] = shared.lpush;
+        argv[1] = dstkey;
+        argv[2] = value;
+        propagate(server.lpushCommand,db->id,argv,3,
+                  REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL);
+        return REDIS_OK;
+    }
+
+    /* BLPOP/BRPOP: propagate an LPOP or an RPOP according to the side the
+     * element was taken from. */
+    if (where == REDIS_HEAD) {
+        argv[0] = shared.lpop;
+        argv[1] = key;
+        propagate(server.lpopCommand,db->id,argv,2,
+                  REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL);
+    } else {
+        argv[0] = shared.rpop;
+        argv[1] = key;
+        propagate(server.rpopCommand,db->id,argv,2,
+                  REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL);
+    }
+
+    /* The reply is a two-element multi bulk: key name, then the value. */
+    addReplyMultiBulkLen(receiver,2);
+    addReplyBulk(receiver,key);
+    addReplyBulk(receiver,value);
+    return REDIS_OK;
+}
+
+/* This function should be called by Redis every time a single command,
+ * a MULTI/EXEC block, or a Lua script, terminated its execution after
+ * being called by a client.
+ *
+ * All the keys with at least one client blocked that received at least
+ * one new element via some PUSH operation are accumulated into
+ * the server.ready_keys list. This function will run the list and will
+ * serve clients accordingly. Note that the function will iterate again and
+ * again, since as a result of serving BRPOPLPUSH we can have new blocked
+ * clients to serve because of the PUSH side of BRPOPLPUSH. */
+void handleClientsBlockedOnLists(void) {
+    while (listLength(server.ready_keys) != 0) {
+        /* Detach the current batch and install an empty list in
+         * server.ready_keys, so that anything signalled as ready while
+         * this batch is processed (e.g. by the push side of BRPOPLPUSH)
+         * accumulates there and is picked up by the next outer pass. */
+        list *batch = server.ready_keys;
+        server.ready_keys = listCreate();
+
+        while (listLength(batch) != 0) {
+            listNode *node = listFirst(batch);
+            readyList *ready = node->value;
+
+            /* Drop the key from the per-db ready set first: from here on
+             * signalListAsReady() may be safely invoked for it again. */
+            dictDelete(ready->db->ready_keys,ready->key);
+
+            /* Clients are served only if the key still holds a list. */
+            robj *lobj = lookupKeyWrite(ready->db,ready->key);
+            if (lobj != NULL && lobj->type == REDIS_LIST) {
+                dictEntry *entry = dictFind(ready->db->blocking_keys,
+                                            ready->key);
+
+                if (entry) {
+                    list *waiting = dictGetVal(entry);
+                    int remaining;
+
+                    /* Always take the head of the waiting list, so clients
+                     * are served in the order they blocked on this key.
+                     * The count is sampled once, before serving starts. */
+                    for (remaining = listLength(waiting);
+                         remaining != 0;
+                         remaining--)
+                    {
+                        listNode *head = listFirst(waiting);
+                        redisClient *receiver = head->value;
+                        robj *dstkey = receiver->bpop.target;
+                        int where = REDIS_TAIL;
+                        robj *value;
+
+                        /* Only a client whose last command was BLPOP pops
+                         * from the head; any other pops from the tail. */
+                        if (receiver->lastcmd &&
+                            receiver->lastcmd->proc == blpopCommand)
+                        {
+                            where = REDIS_HEAD;
+                        }
+
+                        value = listTypePop(lobj,where);
+                        if (value == NULL) break; /* Nothing left to pop. */
+
+                        /* unblockClientWaitingData() is going to free
+                         * receiver->bpop.target: keep our own reference
+                         * for as long as we need it. */
+                        if (dstkey) incrRefCount(dstkey);
+                        unblockClientWaitingData(receiver);
+
+                        if (serveClientBlockedOnList(receiver,ready->key,
+                                dstkey,ready->db,value,where) == REDIS_ERR)
+                        {
+                            /* The client was not served: put the element
+                             * back where it was popped from. */
+                            listTypePush(lobj,value,where);
+                        }
+
+                        if (dstkey) decrRefCount(dstkey);
+                        decrRefCount(value);
+                    }
+                }
+
+                /* Remove the key if it ended up empty. signalModifiedKey()
+                 * is not called here, as it was already called when the
+                 * element was pushed on the list. */
+                if (listTypeLength(lobj) == 0)
+                    dbDelete(ready->db,ready->key);
+            }
+
+            /* Release this ready-list entry and unlink it from the batch. */
+            decrRefCount(ready->key);
+            zfree(ready);
+            listDelNode(batch,node);
+        }
+        listRelease(batch); /* server.ready_keys already has its fresh list. */
+    }
+}