3 /*-----------------------------------------------------------------------------
5 *----------------------------------------------------------------------------*/
7 /* Check the argument length to see if it requires us to convert the ziplist
8 * to a real list. Only check raw-encoded objects because integer encoded
9 * objects are never too long. */
10 void listTypeTryConversion(robj
*subject
, robj
*value
) {
11 if (subject
->encoding
!= REDIS_ENCODING_ZIPLIST
) return;
12 if (value
->encoding
== REDIS_ENCODING_RAW
&&
13 sdslen(value
->ptr
) > server
.list_max_ziplist_value
)
14 listTypeConvert(subject
,REDIS_ENCODING_LINKEDLIST
);
17 void listTypePush(robj
*subject
, robj
*value
, int where
) {
18 /* Check if we need to convert the ziplist */
19 listTypeTryConversion(subject
,value
);
20 if (subject
->encoding
== REDIS_ENCODING_ZIPLIST
&&
21 ziplistLen(subject
->ptr
) >= server
.list_max_ziplist_entries
)
22 listTypeConvert(subject
,REDIS_ENCODING_LINKEDLIST
);
24 if (subject
->encoding
== REDIS_ENCODING_ZIPLIST
) {
25 int pos
= (where
== REDIS_HEAD
) ? ZIPLIST_HEAD
: ZIPLIST_TAIL
;
26 value
= getDecodedObject(value
);
27 subject
->ptr
= ziplistPush(subject
->ptr
,value
->ptr
,sdslen(value
->ptr
),pos
);
29 } else if (subject
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
30 if (where
== REDIS_HEAD
) {
31 listAddNodeHead(subject
->ptr
,value
);
33 listAddNodeTail(subject
->ptr
,value
);
37 redisPanic("Unknown list encoding");
41 robj
*listTypePop(robj
*subject
, int where
) {
43 if (subject
->encoding
== REDIS_ENCODING_ZIPLIST
) {
48 int pos
= (where
== REDIS_HEAD
) ? 0 : -1;
49 p
= ziplistIndex(subject
->ptr
,pos
);
50 if (ziplistGet(p
,&vstr
,&vlen
,&vlong
)) {
52 value
= createStringObject((char*)vstr
,vlen
);
54 value
= createStringObjectFromLongLong(vlong
);
56 /* We only need to delete an element when it exists */
57 subject
->ptr
= ziplistDelete(subject
->ptr
,&p
);
59 } else if (subject
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
60 list
*list
= subject
->ptr
;
62 if (where
== REDIS_HEAD
) {
68 value
= listNodeValue(ln
);
73 redisPanic("Unknown list encoding");
78 unsigned long listTypeLength(robj
*subject
) {
79 if (subject
->encoding
== REDIS_ENCODING_ZIPLIST
) {
80 return ziplistLen(subject
->ptr
);
81 } else if (subject
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
82 return listLength((list
*)subject
->ptr
);
84 redisPanic("Unknown list encoding");
88 /* Initialize an iterator at the specified index. */
89 listTypeIterator
*listTypeInitIterator(robj
*subject
, int index
, unsigned char direction
) {
90 listTypeIterator
*li
= zmalloc(sizeof(listTypeIterator
));
91 li
->subject
= subject
;
92 li
->encoding
= subject
->encoding
;
93 li
->direction
= direction
;
94 if (li
->encoding
== REDIS_ENCODING_ZIPLIST
) {
95 li
->zi
= ziplistIndex(subject
->ptr
,index
);
96 } else if (li
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
97 li
->ln
= listIndex(subject
->ptr
,index
);
99 redisPanic("Unknown list encoding");
104 /* Clean up the iterator. */
105 void listTypeReleaseIterator(listTypeIterator
*li
) {
109 /* Stores pointer to current the entry in the provided entry structure
110 * and advances the position of the iterator. Returns 1 when the current
111 * entry is in fact an entry, 0 otherwise. */
112 int listTypeNext(listTypeIterator
*li
, listTypeEntry
*entry
) {
113 /* Protect from converting when iterating */
114 redisAssert(li
->subject
->encoding
== li
->encoding
);
117 if (li
->encoding
== REDIS_ENCODING_ZIPLIST
) {
119 if (entry
->zi
!= NULL
) {
120 if (li
->direction
== REDIS_TAIL
)
121 li
->zi
= ziplistNext(li
->subject
->ptr
,li
->zi
);
123 li
->zi
= ziplistPrev(li
->subject
->ptr
,li
->zi
);
126 } else if (li
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
128 if (entry
->ln
!= NULL
) {
129 if (li
->direction
== REDIS_TAIL
)
130 li
->ln
= li
->ln
->next
;
132 li
->ln
= li
->ln
->prev
;
136 redisPanic("Unknown list encoding");
141 /* Return entry or NULL at the current position of the iterator. */
142 robj
*listTypeGet(listTypeEntry
*entry
) {
143 listTypeIterator
*li
= entry
->li
;
145 if (li
->encoding
== REDIS_ENCODING_ZIPLIST
) {
149 redisAssert(entry
->zi
!= NULL
);
150 if (ziplistGet(entry
->zi
,&vstr
,&vlen
,&vlong
)) {
152 value
= createStringObject((char*)vstr
,vlen
);
154 value
= createStringObjectFromLongLong(vlong
);
157 } else if (li
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
158 redisAssert(entry
->ln
!= NULL
);
159 value
= listNodeValue(entry
->ln
);
162 redisPanic("Unknown list encoding");
167 void listTypeInsert(listTypeEntry
*entry
, robj
*value
, int where
) {
168 robj
*subject
= entry
->li
->subject
;
169 if (entry
->li
->encoding
== REDIS_ENCODING_ZIPLIST
) {
170 value
= getDecodedObject(value
);
171 if (where
== REDIS_TAIL
) {
172 unsigned char *next
= ziplistNext(subject
->ptr
,entry
->zi
);
174 /* When we insert after the current element, but the current element
175 * is the tail of the list, we need to do a push. */
177 subject
->ptr
= ziplistPush(subject
->ptr
,value
->ptr
,sdslen(value
->ptr
),REDIS_TAIL
);
179 subject
->ptr
= ziplistInsert(subject
->ptr
,next
,value
->ptr
,sdslen(value
->ptr
));
182 subject
->ptr
= ziplistInsert(subject
->ptr
,entry
->zi
,value
->ptr
,sdslen(value
->ptr
));
185 } else if (entry
->li
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
186 if (where
== REDIS_TAIL
) {
187 listInsertNode(subject
->ptr
,entry
->ln
,value
,AL_START_TAIL
);
189 listInsertNode(subject
->ptr
,entry
->ln
,value
,AL_START_HEAD
);
193 redisPanic("Unknown list encoding");
197 /* Compare the given object with the entry at the current position. */
198 int listTypeEqual(listTypeEntry
*entry
, robj
*o
) {
199 listTypeIterator
*li
= entry
->li
;
200 if (li
->encoding
== REDIS_ENCODING_ZIPLIST
) {
201 redisAssert(o
->encoding
== REDIS_ENCODING_RAW
);
202 return ziplistCompare(entry
->zi
,o
->ptr
,sdslen(o
->ptr
));
203 } else if (li
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
204 return equalStringObjects(o
,listNodeValue(entry
->ln
));
206 redisPanic("Unknown list encoding");
210 /* Delete the element pointed to. */
211 void listTypeDelete(listTypeEntry
*entry
) {
212 listTypeIterator
*li
= entry
->li
;
213 if (li
->encoding
== REDIS_ENCODING_ZIPLIST
) {
214 unsigned char *p
= entry
->zi
;
215 li
->subject
->ptr
= ziplistDelete(li
->subject
->ptr
,&p
);
217 /* Update position of the iterator depending on the direction */
218 if (li
->direction
== REDIS_TAIL
)
221 li
->zi
= ziplistPrev(li
->subject
->ptr
,p
);
222 } else if (entry
->li
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
224 if (li
->direction
== REDIS_TAIL
)
225 next
= entry
->ln
->next
;
227 next
= entry
->ln
->prev
;
228 listDelNode(li
->subject
->ptr
,entry
->ln
);
231 redisPanic("Unknown list encoding");
235 void listTypeConvert(robj
*subject
, int enc
) {
236 listTypeIterator
*li
;
238 redisAssert(subject
->type
== REDIS_LIST
);
240 if (enc
== REDIS_ENCODING_LINKEDLIST
) {
241 list
*l
= listCreate();
242 listSetFreeMethod(l
,decrRefCount
);
244 /* listTypeGet returns a robj with incremented refcount */
245 li
= listTypeInitIterator(subject
,0,REDIS_TAIL
);
246 while (listTypeNext(li
,&entry
)) listAddNodeTail(l
,listTypeGet(&entry
));
247 listTypeReleaseIterator(li
);
249 subject
->encoding
= REDIS_ENCODING_LINKEDLIST
;
253 redisPanic("Unsupported list conversion");
257 /*-----------------------------------------------------------------------------
259 *----------------------------------------------------------------------------*/
261 void pushGenericCommand(redisClient
*c
, int where
) {
262 robj
*lobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
264 if (handleClientsWaitingListPush(c
,c
->argv
[1],c
->argv
[2])) {
265 addReply(c
,shared
.cone
);
268 lobj
= createZiplistObject();
269 dbAdd(c
->db
,c
->argv
[1],lobj
);
271 if (lobj
->type
!= REDIS_LIST
) {
272 addReply(c
,shared
.wrongtypeerr
);
275 if (handleClientsWaitingListPush(c
,c
->argv
[1],c
->argv
[2])) {
276 addReply(c
,shared
.cone
);
280 listTypePush(lobj
,c
->argv
[2],where
);
281 addReplyLongLong(c
,listTypeLength(lobj
));
285 void lpushCommand(redisClient
*c
) {
286 pushGenericCommand(c
,REDIS_HEAD
);
289 void rpushCommand(redisClient
*c
) {
290 pushGenericCommand(c
,REDIS_TAIL
);
293 void pushxGenericCommand(redisClient
*c
, robj
*refval
, robj
*val
, int where
) {
295 listTypeIterator
*iter
;
299 if ((subject
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.czero
)) == NULL
||
300 checkType(c
,subject
,REDIS_LIST
)) return;
302 if (refval
!= NULL
) {
303 /* Note: we expect refval to be string-encoded because it is *not* the
304 * last argument of the multi-bulk LINSERT. */
305 redisAssert(refval
->encoding
== REDIS_ENCODING_RAW
);
307 /* We're not sure if this value can be inserted yet, but we cannot
308 * convert the list inside the iterator. We don't want to loop over
309 * the list twice (once to see if the value can be inserted and once
310 * to do the actual insert), so we assume this value can be inserted
311 * and convert the ziplist to a regular list if necessary. */
312 listTypeTryConversion(subject
,val
);
314 /* Seek refval from head to tail */
315 iter
= listTypeInitIterator(subject
,0,REDIS_TAIL
);
316 while (listTypeNext(iter
,&entry
)) {
317 if (listTypeEqual(&entry
,refval
)) {
318 listTypeInsert(&entry
,val
,where
);
323 listTypeReleaseIterator(iter
);
326 /* Check if the length exceeds the ziplist length threshold. */
327 if (subject
->encoding
== REDIS_ENCODING_ZIPLIST
&&
328 ziplistLen(subject
->ptr
) > server
.list_max_ziplist_entries
)
329 listTypeConvert(subject
,REDIS_ENCODING_LINKEDLIST
);
332 /* Notify client of a failed insert */
333 addReply(c
,shared
.cnegone
);
337 listTypePush(subject
,val
,where
);
341 addReplyUlong(c
,listTypeLength(subject
));
344 void lpushxCommand(redisClient
*c
) {
345 pushxGenericCommand(c
,NULL
,c
->argv
[2],REDIS_HEAD
);
348 void rpushxCommand(redisClient
*c
) {
349 pushxGenericCommand(c
,NULL
,c
->argv
[2],REDIS_TAIL
);
352 void linsertCommand(redisClient
*c
) {
353 if (strcasecmp(c
->argv
[2]->ptr
,"after") == 0) {
354 pushxGenericCommand(c
,c
->argv
[3],c
->argv
[4],REDIS_TAIL
);
355 } else if (strcasecmp(c
->argv
[2]->ptr
,"before") == 0) {
356 pushxGenericCommand(c
,c
->argv
[3],c
->argv
[4],REDIS_HEAD
);
358 addReply(c
,shared
.syntaxerr
);
362 void llenCommand(redisClient
*c
) {
363 robj
*o
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.czero
);
364 if (o
== NULL
|| checkType(c
,o
,REDIS_LIST
)) return;
365 addReplyUlong(c
,listTypeLength(o
));
368 void lindexCommand(redisClient
*c
) {
369 robj
*o
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.nullbulk
);
370 if (o
== NULL
|| checkType(c
,o
,REDIS_LIST
)) return;
371 int index
= atoi(c
->argv
[2]->ptr
);
374 if (o
->encoding
== REDIS_ENCODING_ZIPLIST
) {
379 p
= ziplistIndex(o
->ptr
,index
);
380 if (ziplistGet(p
,&vstr
,&vlen
,&vlong
)) {
382 value
= createStringObject((char*)vstr
,vlen
);
384 value
= createStringObjectFromLongLong(vlong
);
386 addReplyBulk(c
,value
);
389 addReply(c
,shared
.nullbulk
);
391 } else if (o
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
392 listNode
*ln
= listIndex(o
->ptr
,index
);
394 value
= listNodeValue(ln
);
395 addReplyBulk(c
,value
);
397 addReply(c
,shared
.nullbulk
);
400 redisPanic("Unknown list encoding");
404 void lsetCommand(redisClient
*c
) {
405 robj
*o
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.nokeyerr
);
406 if (o
== NULL
|| checkType(c
,o
,REDIS_LIST
)) return;
407 int index
= atoi(c
->argv
[2]->ptr
);
408 robj
*value
= c
->argv
[3];
410 listTypeTryConversion(o
,value
);
411 if (o
->encoding
== REDIS_ENCODING_ZIPLIST
) {
412 unsigned char *p
, *zl
= o
->ptr
;
413 p
= ziplistIndex(zl
,index
);
415 addReply(c
,shared
.outofrangeerr
);
417 o
->ptr
= ziplistDelete(o
->ptr
,&p
);
418 value
= getDecodedObject(value
);
419 o
->ptr
= ziplistInsert(o
->ptr
,p
,value
->ptr
,sdslen(value
->ptr
));
421 addReply(c
,shared
.ok
);
424 } else if (o
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
425 listNode
*ln
= listIndex(o
->ptr
,index
);
427 addReply(c
,shared
.outofrangeerr
);
429 decrRefCount((robj
*)listNodeValue(ln
));
430 listNodeValue(ln
) = value
;
432 addReply(c
,shared
.ok
);
436 redisPanic("Unknown list encoding");
440 void popGenericCommand(redisClient
*c
, int where
) {
441 robj
*o
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.nullbulk
);
442 if (o
== NULL
|| checkType(c
,o
,REDIS_LIST
)) return;
444 robj
*value
= listTypePop(o
,where
);
446 addReply(c
,shared
.nullbulk
);
448 addReplyBulk(c
,value
);
450 if (listTypeLength(o
) == 0) dbDelete(c
->db
,c
->argv
[1]);
455 void lpopCommand(redisClient
*c
) {
456 popGenericCommand(c
,REDIS_HEAD
);
459 void rpopCommand(redisClient
*c
) {
460 popGenericCommand(c
,REDIS_TAIL
);
463 void lrangeCommand(redisClient
*c
) {
465 int start
= atoi(c
->argv
[2]->ptr
);
466 int end
= atoi(c
->argv
[3]->ptr
);
471 if ((o
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.emptymultibulk
)) == NULL
472 || checkType(c
,o
,REDIS_LIST
)) return;
473 llen
= listTypeLength(o
);
475 /* convert negative indexes */
476 if (start
< 0) start
= llen
+start
;
477 if (end
< 0) end
= llen
+end
;
478 if (start
< 0) start
= 0;
479 if (end
< 0) end
= 0;
481 /* indexes sanity checks */
482 if (start
> end
|| start
>= llen
) {
483 /* Out of range start or start > end result in empty list */
484 addReply(c
,shared
.emptymultibulk
);
487 if (end
>= llen
) end
= llen
-1;
488 rangelen
= (end
-start
)+1;
490 /* Return the result in form of a multi-bulk reply */
491 addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n",rangelen
));
492 listTypeIterator
*li
= listTypeInitIterator(o
,start
,REDIS_TAIL
);
493 for (j
= 0; j
< rangelen
; j
++) {
494 redisAssert(listTypeNext(li
,&entry
));
495 value
= listTypeGet(&entry
);
496 addReplyBulk(c
,value
);
499 listTypeReleaseIterator(li
);
502 void ltrimCommand(redisClient
*c
) {
504 int start
= atoi(c
->argv
[2]->ptr
);
505 int end
= atoi(c
->argv
[3]->ptr
);
511 if ((o
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.ok
)) == NULL
||
512 checkType(c
,o
,REDIS_LIST
)) return;
513 llen
= listTypeLength(o
);
515 /* convert negative indexes */
516 if (start
< 0) start
= llen
+start
;
517 if (end
< 0) end
= llen
+end
;
518 if (start
< 0) start
= 0;
519 if (end
< 0) end
= 0;
521 /* indexes sanity checks */
522 if (start
> end
|| start
>= llen
) {
523 /* Out of range start or start > end result in empty list */
527 if (end
>= llen
) end
= llen
-1;
532 /* Remove list elements to perform the trim */
533 if (o
->encoding
== REDIS_ENCODING_ZIPLIST
) {
534 o
->ptr
= ziplistDeleteRange(o
->ptr
,0,ltrim
);
535 o
->ptr
= ziplistDeleteRange(o
->ptr
,-rtrim
,rtrim
);
536 } else if (o
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
538 for (j
= 0; j
< ltrim
; j
++) {
539 ln
= listFirst(list
);
540 listDelNode(list
,ln
);
542 for (j
= 0; j
< rtrim
; j
++) {
544 listDelNode(list
,ln
);
547 redisPanic("Unknown list encoding");
549 if (listTypeLength(o
) == 0) dbDelete(c
->db
,c
->argv
[1]);
551 addReply(c
,shared
.ok
);
554 void lremCommand(redisClient
*c
) {
555 robj
*subject
, *obj
= c
->argv
[3];
556 int toremove
= atoi(c
->argv
[2]->ptr
);
560 subject
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.czero
);
561 if (subject
== NULL
|| checkType(c
,subject
,REDIS_LIST
)) return;
563 /* Make sure obj is raw when we're dealing with a ziplist */
564 if (subject
->encoding
== REDIS_ENCODING_ZIPLIST
)
565 obj
= getDecodedObject(obj
);
567 listTypeIterator
*li
;
569 toremove
= -toremove
;
570 li
= listTypeInitIterator(subject
,-1,REDIS_HEAD
);
572 li
= listTypeInitIterator(subject
,0,REDIS_TAIL
);
575 while (listTypeNext(li
,&entry
)) {
576 if (listTypeEqual(&entry
,obj
)) {
577 listTypeDelete(&entry
);
580 if (toremove
&& removed
== toremove
) break;
583 listTypeReleaseIterator(li
);
585 /* Clean up raw encoded object */
586 if (subject
->encoding
== REDIS_ENCODING_ZIPLIST
)
589 if (listTypeLength(subject
) == 0) dbDelete(c
->db
,c
->argv
[1]);
590 addReplySds(c
,sdscatprintf(sdsempty(),":%d\r\n",removed
));
593 /* This is the semantic of this command:
594 * RPOPLPUSH srclist dstlist:
595 * IF LLEN(srclist) > 0
596 * element = RPOP srclist
597 * LPUSH dstlist element
604 * The idea is to be able to get an element from a list in a reliable way
605 * since the element is not just returned but pushed against another list
606 * as well. This command was originally proposed by Ezra Zygmuntowicz.
608 void rpoplpushcommand(redisClient
*c
) {
610 if ((sobj
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.nullbulk
)) == NULL
||
611 checkType(c
,sobj
,REDIS_LIST
)) return;
613 if (listTypeLength(sobj
) == 0) {
614 addReply(c
,shared
.nullbulk
);
616 robj
*dobj
= lookupKeyWrite(c
->db
,c
->argv
[2]);
617 if (dobj
&& checkType(c
,dobj
,REDIS_LIST
)) return;
618 value
= listTypePop(sobj
,REDIS_TAIL
);
620 /* Add the element to the target list (unless it's directly
621 * passed to some BLPOP-ing client */
622 if (!handleClientsWaitingListPush(c
,c
->argv
[2],value
)) {
623 /* Create the list if the key does not exist */
625 dobj
= createZiplistObject();
626 dbAdd(c
->db
,c
->argv
[2],dobj
);
628 listTypePush(dobj
,value
,REDIS_HEAD
);
631 /* Send the element to the client as reply as well */
632 addReplyBulk(c
,value
);
634 /* listTypePop returns an object with its refcount incremented */
637 /* Delete the source list when it is empty */
638 if (listTypeLength(sobj
) == 0) dbDelete(c
->db
,c
->argv
[1]);
643 /*-----------------------------------------------------------------------------
644 * Blocking POP operations
645 *----------------------------------------------------------------------------*/
647 /* Currently Redis blocking operations support is limited to list POP ops,
648 * so the current implementation is not fully generic, but it is also not
649 * completely specific so it will not require a rewrite to support new
650 * kind of blocking operations in the future.
652 * Still it's important to note that list blocking operations can be already
653 * used as a notification mechanism in order to implement other blocking
654 * operations at application level, so there must be a very strong evidence
655 * of usefulness and generality before new blocking operations are implemented.
657 * This is how the current blocking POP works, we use BLPOP as example:
658 * - If the user calls BLPOP and the key exists and contains a non empty list
659 * then LPOP is called instead. So BLPOP is semantically the same as LPOP
660 * if there is not to block.
661 * - If instead BLPOP is called and the key does not exists or the list is
662 * empty we need to block. In order to do so we remove the notification for
663 * new data to read in the client socket (so that we'll not serve new
664 * requests if the blocking request is not served). Also we put the client
665 * in a dictionary (db->blocking_keys) mapping keys to a list of clients
666 * blocking for this keys.
667 * - If a PUSH operation against a key with blocked clients waiting is
668 * performed, we serve the first in the list: basically instead to push
669 * the new element inside the list we return it to the (first / oldest)
670 * blocking client, unblock the client, and remove it form the list.
672 * The above comment and the source code should be enough in order to understand
673 * the implementation and modify / fix it later.
676 /* Set a client in blocking mode for the specified key, with the specified
678 void blockForKeys(redisClient
*c
, robj
**keys
, int numkeys
, time_t timeout
) {
683 c
->blocking_keys
= zmalloc(sizeof(robj
*)*numkeys
);
684 c
->blocking_keys_num
= numkeys
;
685 c
->blockingto
= timeout
;
686 for (j
= 0; j
< numkeys
; j
++) {
687 /* Add the key in the client structure, to map clients -> keys */
688 c
->blocking_keys
[j
] = keys
[j
];
689 incrRefCount(keys
[j
]);
691 /* And in the other "side", to map keys -> clients */
692 de
= dictFind(c
->db
->blocking_keys
,keys
[j
]);
696 /* For every key we take a list of clients blocked for it */
698 retval
= dictAdd(c
->db
->blocking_keys
,keys
[j
],l
);
699 incrRefCount(keys
[j
]);
700 redisAssert(retval
== DICT_OK
);
702 l
= dictGetEntryVal(de
);
704 listAddNodeTail(l
,c
);
706 /* Mark the client as a blocked client */
707 c
->flags
|= REDIS_BLOCKED
;
708 server
.blpop_blocked_clients
++;
711 /* Unblock a client that's waiting in a blocking operation such as BLPOP */
712 void unblockClientWaitingData(redisClient
*c
) {
717 redisAssert(c
->blocking_keys
!= NULL
);
718 /* The client may wait for multiple keys, so unblock it for every key. */
719 for (j
= 0; j
< c
->blocking_keys_num
; j
++) {
720 /* Remove this client from the list of clients waiting for this key. */
721 de
= dictFind(c
->db
->blocking_keys
,c
->blocking_keys
[j
]);
722 redisAssert(de
!= NULL
);
723 l
= dictGetEntryVal(de
);
724 listDelNode(l
,listSearchKey(l
,c
));
725 /* If the list is empty we need to remove it to avoid wasting memory */
726 if (listLength(l
) == 0)
727 dictDelete(c
->db
->blocking_keys
,c
->blocking_keys
[j
]);
728 decrRefCount(c
->blocking_keys
[j
]);
730 /* Cleanup the client structure */
731 zfree(c
->blocking_keys
);
732 c
->blocking_keys
= NULL
;
733 c
->flags
&= (~REDIS_BLOCKED
);
734 server
.blpop_blocked_clients
--;
735 /* We want to process data if there is some command waiting
736 * in the input buffer. Note that this is safe even if
737 * unblockClientWaitingData() gets called from freeClient() because
738 * freeClient() will be smart enough to call this function
739 * *after* c->querybuf was set to NULL. */
740 if (c
->querybuf
&& sdslen(c
->querybuf
) > 0) processInputBuffer(c
);
743 /* This should be called from any function PUSHing into lists.
744 * 'c' is the "pushing client", 'key' is the key it is pushing data against,
745 * 'ele' is the element pushed.
747 * If the function returns 0 there was no client waiting for a list push
750 * If the function returns 1 there was a client waiting for a list push
751 * against this key, the element was passed to this client thus it's not
752 * needed to actually add it to the list and the caller should return asap. */
753 int handleClientsWaitingListPush(redisClient
*c
, robj
*key
, robj
*ele
) {
754 struct dictEntry
*de
;
755 redisClient
*receiver
;
759 de
= dictFind(c
->db
->blocking_keys
,key
);
760 if (de
== NULL
) return 0;
761 l
= dictGetEntryVal(de
);
763 redisAssert(ln
!= NULL
);
764 receiver
= ln
->value
;
766 addReplySds(receiver
,sdsnew("*2\r\n"));
767 addReplyBulk(receiver
,key
);
768 addReplyBulk(receiver
,ele
);
769 unblockClientWaitingData(receiver
);
773 /* Blocking RPOP/LPOP */
774 void blockingPopGenericCommand(redisClient
*c
, int where
) {
779 for (j
= 1; j
< c
->argc
-1; j
++) {
780 o
= lookupKeyWrite(c
->db
,c
->argv
[j
]);
782 if (o
->type
!= REDIS_LIST
) {
783 addReply(c
,shared
.wrongtypeerr
);
786 if (listTypeLength(o
) != 0) {
787 /* If the list contains elements fall back to the usual
788 * non-blocking POP operation */
789 robj
*argv
[2], **orig_argv
;
792 /* We need to alter the command arguments before to call
793 * popGenericCommand() as the command takes a single key. */
796 argv
[1] = c
->argv
[j
];
800 /* Also the return value is different, we need to output
801 * the multi bulk reply header and the key name. The
802 * "real" command will add the last element (the value)
803 * for us. If this souds like an hack to you it's just
804 * because it is... */
805 addReplySds(c
,sdsnew("*2\r\n"));
806 addReplyBulk(c
,argv
[1]);
807 popGenericCommand(c
,where
);
809 /* Fix the client structure with the original stuff */
817 /* If the list is empty or the key does not exists we must block */
818 timeout
= strtol(c
->argv
[c
->argc
-1]->ptr
,NULL
,10);
819 if (timeout
> 0) timeout
+= time(NULL
);
820 blockForKeys(c
,c
->argv
+1,c
->argc
-2,timeout
);
823 void blpopCommand(redisClient
*c
) {
824 blockingPopGenericCommand(c
,REDIS_HEAD
);
827 void brpopCommand(redisClient
*c
) {
828 blockingPopGenericCommand(c
,REDIS_TAIL
);