/* Hand a value that is being pushed on "key" to a client blocked on that key.
 *
 * NOTE(review): this block is recorded as a unified-diff hunk. Lines starting
 * with "-" are the previous single-receiver implementation, lines starting
 * with "+" are the replacement that retries with the next waiting client.
 *
 * c   - client performing the push; only c->db->blocking_keys is consulted.
 * key - key being pushed to, used to look up the list of waiting clients.
 * ele - pushed value, delivered to the first client able to accept it.
 *
 * Returns 0 when c->db->blocking_keys has no entry for "key". In the "+"
 * version it returns 1 as soon as one waiting client has been served, and 0
 * when every waiting client was tried without success. In the "-" version it
 * returns 1 after serving the first waiting client, or 0 (without calling
 * decrRefCount() on the target) when that client's BRPOPLPUSH destination
 * exists and checkType() returns non-zero. */
int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
struct dictEntry *de;
redisClient *receiver;
- list *l;
+ int numclients;
+ list *clients;
listNode *ln;
+ robj *dstkey, *dstobj;
de = dictFind(c->db->blocking_keys,key);
if (de == NULL) return 0;
- l = dictGetEntryVal(de);
- ln = listFirst(l);
- redisAssert(ln != NULL);
- receiver = ln->value;
-
- robj *target = receiver->bpop.target;
-
- unblockClientWaitingData(receiver);
-
- if (target == NULL) {
- /* BRPOP/BLPOP return a multi-bulk with the name
- * of the popped list */
- addReplyMultiBulkLen(receiver,2);
- addReplyBulk(receiver,key);
- addReplyBulk(receiver,ele);
- } else {
- /* BRPOPLPUSH */
- robj *dobj = lookupKeyWrite(receiver->db,target);
- if (dobj && checkType(receiver,dobj,REDIS_LIST)) return 0;
- rpoplpushHandlePush(receiver,target,dobj,ele);
- decrRefCount(target);
+ clients = dictGetEntryVal(de);
+ numclients = listLength(clients);
+
+ /* Try to handle the push as long as there are clients waiting for a push.
+ * Note that "numclients" is used because the list of clients waiting for a
+ * push on "key" is deleted by unblockClientWaitingData() when empty.
+ *
+ * This loop will have more than 1 iteration when there is a BRPOPLPUSH
+ * that cannot push the target list because it does not contain a list. If
+ * this happens, it simply tries the next client waiting for a push.
+ *
+ * The iteration count is fixed to the number of clients that were waiting
+ * when the function was entered; each iteration takes the current head of
+ * the "clients" list. */
+ while (numclients--) {
+ ln = listFirst(clients);
+ redisAssert(ln != NULL);
+ receiver = ln->value;
+ dstkey = receiver->bpop.target;
+
+ /* This should remove the first element of the "clients" list.
+ * "dstkey" was read above, before unblocking the receiver.
+ *
+ * NOTE(review): per the comment above the loop, the "clients" list is
+ * deleted once it becomes empty. If that is accurate, the assert below
+ * calls listFirst() on an already deleted list when the last waiting
+ * client is unblocked -- confirm against unblockClientWaitingData(). */
+ unblockClientWaitingData(receiver);
+ redisAssert(ln != listFirst(clients));
+
+ if (dstkey == NULL) {
+ /* BRPOP/BLPOP: reply with a two-element multi-bulk holding the key
+ * name and the pushed value, then report the push as handled. */
+ addReplyMultiBulkLen(receiver,2);
+ addReplyBulk(receiver,key);
+ addReplyBulk(receiver,ele);
+ return 1;
+ } else {
+ /* BRPOPLPUSH: look up the destination key in the receiver's db.
+ * When it exists and checkType() returns non-zero (presumably: it
+ * does not hold a list -- TODO confirm), only decrRefCount(dstkey)
+ * is called and the loop moves on to the next waiting client.
+ * Otherwise the value is passed to rpoplpushHandlePush(),
+ * decrRefCount(dstkey) is called, and the function returns 1. */
+ dstobj = lookupKeyWrite(receiver->db,dstkey);
+ if (dstobj && checkType(receiver,dstobj,REDIS_LIST)) {
+ decrRefCount(dstkey);
+ } else {
+ rpoplpushHandlePush(receiver,dstkey,dstobj,ele);
+ decrRefCount(dstkey);
+ return 1;
+ }
+ }
}
- return 1;
+ return 0;
}
int getTimeoutFromObjectOrReply(redisClient *c, robj *object, time_t *timeout) {