3 void signalListAsReady(redisClient
*c
, robj
*key
);
/*-----------------------------------------------------------------------------
 * List API (section title restored; the title line was lost in extraction)
 *----------------------------------------------------------------------------*/
9 /* Check the argument length to see if it requires us to convert the ziplist
10 * to a real list. Only check raw-encoded objects because integer encoded
11 * objects are never too long. */
12 void listTypeTryConversion(robj
*subject
, robj
*value
) {
13 if (subject
->encoding
!= REDIS_ENCODING_ZIPLIST
) return;
14 if (value
->encoding
== REDIS_ENCODING_RAW
&&
15 sdslen(value
->ptr
) > server
.list_max_ziplist_value
)
16 listTypeConvert(subject
,REDIS_ENCODING_LINKEDLIST
);
19 /* The function pushes an elmenet to the specified list object 'subject',
20 * at head or tail position as specified by 'where'.
22 * There is no need for the caller to incremnet the refcount of 'value' as
23 * the function takes care of it if needed. */
24 void listTypePush(robj
*subject
, robj
*value
, int where
) {
25 /* Check if we need to convert the ziplist */
26 listTypeTryConversion(subject
,value
);
27 if (subject
->encoding
== REDIS_ENCODING_ZIPLIST
&&
28 ziplistLen(subject
->ptr
) >= server
.list_max_ziplist_entries
)
29 listTypeConvert(subject
,REDIS_ENCODING_LINKEDLIST
);
31 if (subject
->encoding
== REDIS_ENCODING_ZIPLIST
) {
32 int pos
= (where
== REDIS_HEAD
) ? ZIPLIST_HEAD
: ZIPLIST_TAIL
;
33 value
= getDecodedObject(value
);
34 subject
->ptr
= ziplistPush(subject
->ptr
,value
->ptr
,sdslen(value
->ptr
),pos
);
36 } else if (subject
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
37 if (where
== REDIS_HEAD
) {
38 listAddNodeHead(subject
->ptr
,value
);
40 listAddNodeTail(subject
->ptr
,value
);
44 redisPanic("Unknown list encoding");
48 robj
*listTypePop(robj
*subject
, int where
) {
50 if (subject
->encoding
== REDIS_ENCODING_ZIPLIST
) {
55 int pos
= (where
== REDIS_HEAD
) ? 0 : -1;
56 p
= ziplistIndex(subject
->ptr
,pos
);
57 if (ziplistGet(p
,&vstr
,&vlen
,&vlong
)) {
59 value
= createStringObject((char*)vstr
,vlen
);
61 value
= createStringObjectFromLongLong(vlong
);
63 /* We only need to delete an element when it exists */
64 subject
->ptr
= ziplistDelete(subject
->ptr
,&p
);
66 } else if (subject
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
67 list
*list
= subject
->ptr
;
69 if (where
== REDIS_HEAD
) {
75 value
= listNodeValue(ln
);
80 redisPanic("Unknown list encoding");
85 unsigned long listTypeLength(robj
*subject
) {
86 if (subject
->encoding
== REDIS_ENCODING_ZIPLIST
) {
87 return ziplistLen(subject
->ptr
);
88 } else if (subject
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
89 return listLength((list
*)subject
->ptr
);
91 redisPanic("Unknown list encoding");
95 /* Initialize an iterator at the specified index. */
96 listTypeIterator
*listTypeInitIterator(robj
*subject
, long index
, unsigned char direction
) {
97 listTypeIterator
*li
= zmalloc(sizeof(listTypeIterator
));
98 li
->subject
= subject
;
99 li
->encoding
= subject
->encoding
;
100 li
->direction
= direction
;
101 if (li
->encoding
== REDIS_ENCODING_ZIPLIST
) {
102 li
->zi
= ziplistIndex(subject
->ptr
,index
);
103 } else if (li
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
104 li
->ln
= listIndex(subject
->ptr
,index
);
106 redisPanic("Unknown list encoding");
111 /* Clean up the iterator. */
112 void listTypeReleaseIterator(listTypeIterator
*li
) {
116 /* Stores pointer to current the entry in the provided entry structure
117 * and advances the position of the iterator. Returns 1 when the current
118 * entry is in fact an entry, 0 otherwise. */
119 int listTypeNext(listTypeIterator
*li
, listTypeEntry
*entry
) {
120 /* Protect from converting when iterating */
121 redisAssert(li
->subject
->encoding
== li
->encoding
);
124 if (li
->encoding
== REDIS_ENCODING_ZIPLIST
) {
126 if (entry
->zi
!= NULL
) {
127 if (li
->direction
== REDIS_TAIL
)
128 li
->zi
= ziplistNext(li
->subject
->ptr
,li
->zi
);
130 li
->zi
= ziplistPrev(li
->subject
->ptr
,li
->zi
);
133 } else if (li
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
135 if (entry
->ln
!= NULL
) {
136 if (li
->direction
== REDIS_TAIL
)
137 li
->ln
= li
->ln
->next
;
139 li
->ln
= li
->ln
->prev
;
143 redisPanic("Unknown list encoding");
148 /* Return entry or NULL at the current position of the iterator. */
149 robj
*listTypeGet(listTypeEntry
*entry
) {
150 listTypeIterator
*li
= entry
->li
;
152 if (li
->encoding
== REDIS_ENCODING_ZIPLIST
) {
156 redisAssert(entry
->zi
!= NULL
);
157 if (ziplistGet(entry
->zi
,&vstr
,&vlen
,&vlong
)) {
159 value
= createStringObject((char*)vstr
,vlen
);
161 value
= createStringObjectFromLongLong(vlong
);
164 } else if (li
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
165 redisAssert(entry
->ln
!= NULL
);
166 value
= listNodeValue(entry
->ln
);
169 redisPanic("Unknown list encoding");
174 void listTypeInsert(listTypeEntry
*entry
, robj
*value
, int where
) {
175 robj
*subject
= entry
->li
->subject
;
176 if (entry
->li
->encoding
== REDIS_ENCODING_ZIPLIST
) {
177 value
= getDecodedObject(value
);
178 if (where
== REDIS_TAIL
) {
179 unsigned char *next
= ziplistNext(subject
->ptr
,entry
->zi
);
181 /* When we insert after the current element, but the current element
182 * is the tail of the list, we need to do a push. */
184 subject
->ptr
= ziplistPush(subject
->ptr
,value
->ptr
,sdslen(value
->ptr
),REDIS_TAIL
);
186 subject
->ptr
= ziplistInsert(subject
->ptr
,next
,value
->ptr
,sdslen(value
->ptr
));
189 subject
->ptr
= ziplistInsert(subject
->ptr
,entry
->zi
,value
->ptr
,sdslen(value
->ptr
));
192 } else if (entry
->li
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
193 if (where
== REDIS_TAIL
) {
194 listInsertNode(subject
->ptr
,entry
->ln
,value
,AL_START_TAIL
);
196 listInsertNode(subject
->ptr
,entry
->ln
,value
,AL_START_HEAD
);
200 redisPanic("Unknown list encoding");
204 /* Compare the given object with the entry at the current position. */
205 int listTypeEqual(listTypeEntry
*entry
, robj
*o
) {
206 listTypeIterator
*li
= entry
->li
;
207 if (li
->encoding
== REDIS_ENCODING_ZIPLIST
) {
208 redisAssertWithInfo(NULL
,o
,o
->encoding
== REDIS_ENCODING_RAW
);
209 return ziplistCompare(entry
->zi
,o
->ptr
,sdslen(o
->ptr
));
210 } else if (li
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
211 return equalStringObjects(o
,listNodeValue(entry
->ln
));
213 redisPanic("Unknown list encoding");
217 /* Delete the element pointed to. */
218 void listTypeDelete(listTypeEntry
*entry
) {
219 listTypeIterator
*li
= entry
->li
;
220 if (li
->encoding
== REDIS_ENCODING_ZIPLIST
) {
221 unsigned char *p
= entry
->zi
;
222 li
->subject
->ptr
= ziplistDelete(li
->subject
->ptr
,&p
);
224 /* Update position of the iterator depending on the direction */
225 if (li
->direction
== REDIS_TAIL
)
228 li
->zi
= ziplistPrev(li
->subject
->ptr
,p
);
229 } else if (entry
->li
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
231 if (li
->direction
== REDIS_TAIL
)
232 next
= entry
->ln
->next
;
234 next
= entry
->ln
->prev
;
235 listDelNode(li
->subject
->ptr
,entry
->ln
);
238 redisPanic("Unknown list encoding");
242 void listTypeConvert(robj
*subject
, int enc
) {
243 listTypeIterator
*li
;
245 redisAssertWithInfo(NULL
,subject
,subject
->type
== REDIS_LIST
);
247 if (enc
== REDIS_ENCODING_LINKEDLIST
) {
248 list
*l
= listCreate();
249 listSetFreeMethod(l
,decrRefCount
);
251 /* listTypeGet returns a robj with incremented refcount */
252 li
= listTypeInitIterator(subject
,0,REDIS_TAIL
);
253 while (listTypeNext(li
,&entry
)) listAddNodeTail(l
,listTypeGet(&entry
));
254 listTypeReleaseIterator(li
);
256 subject
->encoding
= REDIS_ENCODING_LINKEDLIST
;
260 redisPanic("Unsupported list conversion");
/*-----------------------------------------------------------------------------
 * List Commands (section title restored; the title line was lost in extraction)
 *----------------------------------------------------------------------------*/
268 void pushGenericCommand(redisClient
*c
, int where
) {
269 int j
, waiting
= 0, pushed
= 0;
270 robj
*lobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
271 int may_have_waiting_clients
= (lobj
== NULL
);
273 if (lobj
&& lobj
->type
!= REDIS_LIST
) {
274 addReply(c
,shared
.wrongtypeerr
);
278 if (may_have_waiting_clients
) signalListAsReady(c
,c
->argv
[1]);
280 for (j
= 2; j
< c
->argc
; j
++) {
281 c
->argv
[j
] = tryObjectEncoding(c
->argv
[j
]);
283 lobj
= createZiplistObject();
284 dbAdd(c
->db
,c
->argv
[1],lobj
);
286 listTypePush(lobj
,c
->argv
[j
],where
);
289 addReplyLongLong(c
, waiting
+ (lobj
? listTypeLength(lobj
) : 0));
290 if (pushed
) signalModifiedKey(c
->db
,c
->argv
[1]);
291 server
.dirty
+= pushed
;
294 void lpushCommand(redisClient
*c
) {
295 pushGenericCommand(c
,REDIS_HEAD
);
298 void rpushCommand(redisClient
*c
) {
299 pushGenericCommand(c
,REDIS_TAIL
);
302 void pushxGenericCommand(redisClient
*c
, robj
*refval
, robj
*val
, int where
) {
304 listTypeIterator
*iter
;
308 if ((subject
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.czero
)) == NULL
||
309 checkType(c
,subject
,REDIS_LIST
)) return;
311 if (refval
!= NULL
) {
312 /* Note: we expect refval to be string-encoded because it is *not* the
313 * last argument of the multi-bulk LINSERT. */
314 redisAssertWithInfo(c
,refval
,refval
->encoding
== REDIS_ENCODING_RAW
);
316 /* We're not sure if this value can be inserted yet, but we cannot
317 * convert the list inside the iterator. We don't want to loop over
318 * the list twice (once to see if the value can be inserted and once
319 * to do the actual insert), so we assume this value can be inserted
320 * and convert the ziplist to a regular list if necessary. */
321 listTypeTryConversion(subject
,val
);
323 /* Seek refval from head to tail */
324 iter
= listTypeInitIterator(subject
,0,REDIS_TAIL
);
325 while (listTypeNext(iter
,&entry
)) {
326 if (listTypeEqual(&entry
,refval
)) {
327 listTypeInsert(&entry
,val
,where
);
332 listTypeReleaseIterator(iter
);
335 /* Check if the length exceeds the ziplist length threshold. */
336 if (subject
->encoding
== REDIS_ENCODING_ZIPLIST
&&
337 ziplistLen(subject
->ptr
) > server
.list_max_ziplist_entries
)
338 listTypeConvert(subject
,REDIS_ENCODING_LINKEDLIST
);
339 signalModifiedKey(c
->db
,c
->argv
[1]);
342 /* Notify client of a failed insert */
343 addReply(c
,shared
.cnegone
);
347 listTypePush(subject
,val
,where
);
348 signalModifiedKey(c
->db
,c
->argv
[1]);
352 addReplyLongLong(c
,listTypeLength(subject
));
355 void lpushxCommand(redisClient
*c
) {
356 c
->argv
[2] = tryObjectEncoding(c
->argv
[2]);
357 pushxGenericCommand(c
,NULL
,c
->argv
[2],REDIS_HEAD
);
360 void rpushxCommand(redisClient
*c
) {
361 c
->argv
[2] = tryObjectEncoding(c
->argv
[2]);
362 pushxGenericCommand(c
,NULL
,c
->argv
[2],REDIS_TAIL
);
365 void linsertCommand(redisClient
*c
) {
366 c
->argv
[4] = tryObjectEncoding(c
->argv
[4]);
367 if (strcasecmp(c
->argv
[2]->ptr
,"after") == 0) {
368 pushxGenericCommand(c
,c
->argv
[3],c
->argv
[4],REDIS_TAIL
);
369 } else if (strcasecmp(c
->argv
[2]->ptr
,"before") == 0) {
370 pushxGenericCommand(c
,c
->argv
[3],c
->argv
[4],REDIS_HEAD
);
372 addReply(c
,shared
.syntaxerr
);
376 void llenCommand(redisClient
*c
) {
377 robj
*o
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.czero
);
378 if (o
== NULL
|| checkType(c
,o
,REDIS_LIST
)) return;
379 addReplyLongLong(c
,listTypeLength(o
));
382 void lindexCommand(redisClient
*c
) {
383 robj
*o
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.nullbulk
);
384 if (o
== NULL
|| checkType(c
,o
,REDIS_LIST
)) return;
388 if ((getLongFromObjectOrReply(c
, c
->argv
[2], &index
, NULL
) != REDIS_OK
))
391 if (o
->encoding
== REDIS_ENCODING_ZIPLIST
) {
396 p
= ziplistIndex(o
->ptr
,index
);
397 if (ziplistGet(p
,&vstr
,&vlen
,&vlong
)) {
399 value
= createStringObject((char*)vstr
,vlen
);
401 value
= createStringObjectFromLongLong(vlong
);
403 addReplyBulk(c
,value
);
406 addReply(c
,shared
.nullbulk
);
408 } else if (o
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
409 listNode
*ln
= listIndex(o
->ptr
,index
);
411 value
= listNodeValue(ln
);
412 addReplyBulk(c
,value
);
414 addReply(c
,shared
.nullbulk
);
417 redisPanic("Unknown list encoding");
421 void lsetCommand(redisClient
*c
) {
422 robj
*o
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.nokeyerr
);
423 if (o
== NULL
|| checkType(c
,o
,REDIS_LIST
)) return;
425 robj
*value
= (c
->argv
[3] = tryObjectEncoding(c
->argv
[3]));
427 if ((getLongFromObjectOrReply(c
, c
->argv
[2], &index
, NULL
) != REDIS_OK
))
430 listTypeTryConversion(o
,value
);
431 if (o
->encoding
== REDIS_ENCODING_ZIPLIST
) {
432 unsigned char *p
, *zl
= o
->ptr
;
433 p
= ziplistIndex(zl
,index
);
435 addReply(c
,shared
.outofrangeerr
);
437 o
->ptr
= ziplistDelete(o
->ptr
,&p
);
438 value
= getDecodedObject(value
);
439 o
->ptr
= ziplistInsert(o
->ptr
,p
,value
->ptr
,sdslen(value
->ptr
));
441 addReply(c
,shared
.ok
);
442 signalModifiedKey(c
->db
,c
->argv
[1]);
445 } else if (o
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
446 listNode
*ln
= listIndex(o
->ptr
,index
);
448 addReply(c
,shared
.outofrangeerr
);
450 decrRefCount((robj
*)listNodeValue(ln
));
451 listNodeValue(ln
) = value
;
453 addReply(c
,shared
.ok
);
454 signalModifiedKey(c
->db
,c
->argv
[1]);
458 redisPanic("Unknown list encoding");
462 void popGenericCommand(redisClient
*c
, int where
) {
463 robj
*o
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.nullbulk
);
464 if (o
== NULL
|| checkType(c
,o
,REDIS_LIST
)) return;
466 robj
*value
= listTypePop(o
,where
);
468 addReply(c
,shared
.nullbulk
);
470 addReplyBulk(c
,value
);
472 if (listTypeLength(o
) == 0) dbDelete(c
->db
,c
->argv
[1]);
473 signalModifiedKey(c
->db
,c
->argv
[1]);
478 void lpopCommand(redisClient
*c
) {
479 popGenericCommand(c
,REDIS_HEAD
);
482 void rpopCommand(redisClient
*c
) {
483 popGenericCommand(c
,REDIS_TAIL
);
486 void lrangeCommand(redisClient
*c
) {
488 long start
, end
, llen
, rangelen
;
490 if ((getLongFromObjectOrReply(c
, c
->argv
[2], &start
, NULL
) != REDIS_OK
) ||
491 (getLongFromObjectOrReply(c
, c
->argv
[3], &end
, NULL
) != REDIS_OK
)) return;
493 if ((o
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.emptymultibulk
)) == NULL
494 || checkType(c
,o
,REDIS_LIST
)) return;
495 llen
= listTypeLength(o
);
497 /* convert negative indexes */
498 if (start
< 0) start
= llen
+start
;
499 if (end
< 0) end
= llen
+end
;
500 if (start
< 0) start
= 0;
502 /* Invariant: start >= 0, so this test will be true when end < 0.
503 * The range is empty when start > end or start >= length. */
504 if (start
> end
|| start
>= llen
) {
505 addReply(c
,shared
.emptymultibulk
);
508 if (end
>= llen
) end
= llen
-1;
509 rangelen
= (end
-start
)+1;
511 /* Return the result in form of a multi-bulk reply */
512 addReplyMultiBulkLen(c
,rangelen
);
513 if (o
->encoding
== REDIS_ENCODING_ZIPLIST
) {
514 unsigned char *p
= ziplistIndex(o
->ptr
,start
);
520 ziplistGet(p
,&vstr
,&vlen
,&vlong
);
522 addReplyBulkCBuffer(c
,vstr
,vlen
);
524 addReplyBulkLongLong(c
,vlong
);
526 p
= ziplistNext(o
->ptr
,p
);
528 } else if (o
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
531 /* If we are nearest to the end of the list, reach the element
532 * starting from tail and going backward, as it is faster. */
533 if (start
> llen
/2) start
-= llen
;
534 ln
= listIndex(o
->ptr
,start
);
537 addReplyBulk(c
,ln
->value
);
541 redisPanic("List encoding is not LINKEDLIST nor ZIPLIST!");
545 void ltrimCommand(redisClient
*c
) {
547 long start
, end
, llen
, j
, ltrim
, rtrim
;
551 if ((getLongFromObjectOrReply(c
, c
->argv
[2], &start
, NULL
) != REDIS_OK
) ||
552 (getLongFromObjectOrReply(c
, c
->argv
[3], &end
, NULL
) != REDIS_OK
)) return;
554 if ((o
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.ok
)) == NULL
||
555 checkType(c
,o
,REDIS_LIST
)) return;
556 llen
= listTypeLength(o
);
558 /* convert negative indexes */
559 if (start
< 0) start
= llen
+start
;
560 if (end
< 0) end
= llen
+end
;
561 if (start
< 0) start
= 0;
563 /* Invariant: start >= 0, so this test will be true when end < 0.
564 * The range is empty when start > end or start >= length. */
565 if (start
> end
|| start
>= llen
) {
566 /* Out of range start or start > end result in empty list */
570 if (end
>= llen
) end
= llen
-1;
575 /* Remove list elements to perform the trim */
576 if (o
->encoding
== REDIS_ENCODING_ZIPLIST
) {
577 o
->ptr
= ziplistDeleteRange(o
->ptr
,0,ltrim
);
578 o
->ptr
= ziplistDeleteRange(o
->ptr
,-rtrim
,rtrim
);
579 } else if (o
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
581 for (j
= 0; j
< ltrim
; j
++) {
582 ln
= listFirst(list
);
583 listDelNode(list
,ln
);
585 for (j
= 0; j
< rtrim
; j
++) {
587 listDelNode(list
,ln
);
590 redisPanic("Unknown list encoding");
592 if (listTypeLength(o
) == 0) dbDelete(c
->db
,c
->argv
[1]);
593 signalModifiedKey(c
->db
,c
->argv
[1]);
595 addReply(c
,shared
.ok
);
598 void lremCommand(redisClient
*c
) {
600 obj
= c
->argv
[3] = tryObjectEncoding(c
->argv
[3]);
605 if ((getLongFromObjectOrReply(c
, c
->argv
[2], &toremove
, NULL
) != REDIS_OK
))
608 subject
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.czero
);
609 if (subject
== NULL
|| checkType(c
,subject
,REDIS_LIST
)) return;
611 /* Make sure obj is raw when we're dealing with a ziplist */
612 if (subject
->encoding
== REDIS_ENCODING_ZIPLIST
)
613 obj
= getDecodedObject(obj
);
615 listTypeIterator
*li
;
617 toremove
= -toremove
;
618 li
= listTypeInitIterator(subject
,-1,REDIS_HEAD
);
620 li
= listTypeInitIterator(subject
,0,REDIS_TAIL
);
623 while (listTypeNext(li
,&entry
)) {
624 if (listTypeEqual(&entry
,obj
)) {
625 listTypeDelete(&entry
);
628 if (toremove
&& removed
== toremove
) break;
631 listTypeReleaseIterator(li
);
633 /* Clean up raw encoded object */
634 if (subject
->encoding
== REDIS_ENCODING_ZIPLIST
)
637 if (listTypeLength(subject
) == 0) dbDelete(c
->db
,c
->argv
[1]);
638 addReplyLongLong(c
,removed
);
639 if (removed
) signalModifiedKey(c
->db
,c
->argv
[1]);
642 /* This is the semantic of this command:
643 * RPOPLPUSH srclist dstlist:
644 * IF LLEN(srclist) > 0
645 * element = RPOP srclist
646 * LPUSH dstlist element
653 * The idea is to be able to get an element from a list in a reliable way
654 * since the element is not just returned but pushed against another list
655 * as well. This command was originally proposed by Ezra Zygmuntowicz.
658 void rpoplpushHandlePush(redisClient
*c
, robj
*dstkey
, robj
*dstobj
, robj
*value
) {
659 /* Create the list if the key does not exist */
661 dstobj
= createZiplistObject();
662 dbAdd(c
->db
,dstkey
,dstobj
);
663 signalListAsReady(c
,dstkey
);
665 signalModifiedKey(c
->db
,dstkey
);
666 listTypePush(dstobj
,value
,REDIS_HEAD
);
667 /* Always send the pushed value to the client. */
668 addReplyBulk(c
,value
);
671 void rpoplpushCommand(redisClient
*c
) {
673 if ((sobj
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.nullbulk
)) == NULL
||
674 checkType(c
,sobj
,REDIS_LIST
)) return;
676 if (listTypeLength(sobj
) == 0) {
677 /* This may only happen after loading very old RDB files. Recent
678 * versions of Redis delete keys of empty lists. */
679 addReply(c
,shared
.nullbulk
);
681 robj
*dobj
= lookupKeyWrite(c
->db
,c
->argv
[2]);
682 robj
*touchedkey
= c
->argv
[1];
684 if (dobj
&& checkType(c
,dobj
,REDIS_LIST
)) return;
685 value
= listTypePop(sobj
,REDIS_TAIL
);
686 /* We saved touched key, and protect it, since rpoplpushHandlePush
687 * may change the client command argument vector (it does not
689 incrRefCount(touchedkey
);
690 rpoplpushHandlePush(c
,c
->argv
[2],dobj
,value
);
692 /* listTypePop returns an object with its refcount incremented */
695 /* Delete the source list when it is empty */
696 if (listTypeLength(sobj
) == 0) dbDelete(c
->db
,touchedkey
);
697 signalModifiedKey(c
->db
,touchedkey
);
698 decrRefCount(touchedkey
);
/*-----------------------------------------------------------------------------
 * Blocking POP operations
 *----------------------------------------------------------------------------*/

/* This is how the current blocking POP works, we use BLPOP as example:
 * - If the user calls BLPOP and the key exists and contains a non empty list
 *   then LPOP is called instead. So BLPOP is semantically the same as LPOP
 *   if blocking is not required.
 * - If instead BLPOP is called and the key does not exist or the list is
 *   empty we need to block. In order to do so we remove the notification for
 *   new data to read in the client socket (so that we'll not serve new
 *   requests if the blocking request is not served). Also we put the client
 *   in a dictionary (db->blocking_keys) mapping keys to a list of clients
 *   blocking for these keys.
 * - If a PUSH operation against a key with blocked clients waiting is
 *   performed, we mark this key as "ready", and after the current command,
 *   MULTI/EXEC block, or script, is executed, we serve all the clients waiting
 *   for this list, from the one that blocked first, to the last, according
 *   to the number of elements we have in the ready list.
 */
724 /* Set a client in blocking mode for the specified key, with the specified
726 void blockForKeys(redisClient
*c
, robj
**keys
, int numkeys
, time_t timeout
, robj
*target
) {
731 c
->bpop
.keys
= zmalloc(sizeof(robj
*)*numkeys
);
732 c
->bpop
.count
= numkeys
;
733 c
->bpop
.timeout
= timeout
;
734 c
->bpop
.target
= target
;
736 if (target
!= NULL
) {
737 incrRefCount(target
);
740 for (j
= 0; j
< numkeys
; j
++) {
741 /* Add the key in the client structure, to map clients -> keys */
742 c
->bpop
.keys
[j
] = keys
[j
];
743 incrRefCount(keys
[j
]);
745 /* And in the other "side", to map keys -> clients */
746 de
= dictFind(c
->db
->blocking_keys
,keys
[j
]);
750 /* For every key we take a list of clients blocked for it */
752 retval
= dictAdd(c
->db
->blocking_keys
,keys
[j
],l
);
753 incrRefCount(keys
[j
]);
754 redisAssertWithInfo(c
,keys
[j
],retval
== DICT_OK
);
758 listAddNodeTail(l
,c
);
760 /* Mark the client as a blocked client */
761 c
->flags
|= REDIS_BLOCKED
;
762 server
.bpop_blocked_clients
++;
765 /* Unblock a client that's waiting in a blocking operation such as BLPOP */
766 void unblockClientWaitingData(redisClient
*c
) {
771 redisAssertWithInfo(c
,NULL
,c
->bpop
.keys
!= NULL
);
772 /* The client may wait for multiple keys, so unblock it for every key. */
773 for (j
= 0; j
< c
->bpop
.count
; j
++) {
774 /* Remove this client from the list of clients waiting for this key. */
775 de
= dictFind(c
->db
->blocking_keys
,c
->bpop
.keys
[j
]);
776 redisAssertWithInfo(c
,c
->bpop
.keys
[j
],de
!= NULL
);
778 listDelNode(l
,listSearchKey(l
,c
));
779 /* If the list is empty we need to remove it to avoid wasting memory */
780 if (listLength(l
) == 0)
781 dictDelete(c
->db
->blocking_keys
,c
->bpop
.keys
[j
]);
782 decrRefCount(c
->bpop
.keys
[j
]);
785 /* Cleanup the client structure */
788 if (c
->bpop
.target
) decrRefCount(c
->bpop
.target
);
789 c
->bpop
.target
= NULL
;
790 c
->flags
&= ~REDIS_BLOCKED
;
791 c
->flags
|= REDIS_UNBLOCKED
;
792 server
.bpop_blocked_clients
--;
793 listAddNodeTail(server
.unblocked_clients
,c
);
796 /* If the specified key has clients blocked waiting for list pushes, this
797 * function will put the key reference into the server.ready_keys list.
798 * Note that db->ready_keys is an hash table that allows us to avoid putting
799 * the same key agains and again in the list in case of multiple pushes
800 * made by a script or in the context of MULTI/EXEC.
802 * The list will be finally processed by handleClientsBlockedOnLists() */
803 void signalListAsReady(redisClient
*c
, robj
*key
) {
806 /* No clients blocking for this key? No need to queue it. */
807 if (dictFind(c
->db
->blocking_keys
,key
) == NULL
) return;
809 /* Key was already signaled? No need to queue it again. */
810 if (dictFind(c
->db
->ready_keys
,key
) != NULL
) return;
812 /* Ok, we need to queue this key into server.ready_keys. */
813 rl
= zmalloc(sizeof(*rl
));
817 listAddNodeTail(server
.ready_keys
,rl
);
819 /* We also add the key in the db->ready_keys dictionary in order
820 * to avoid adding it multiple times into a list with a simple O(1)
823 redisAssert(dictAdd(c
->db
->ready_keys
,key
,NULL
) == DICT_OK
);
826 /* This is an helper function for handleClientsBlockedOnLists(). It's work
827 * is to serve a specific client (receiver) that is blocked on 'key'
828 * in the context of the specified 'db', doing the following:
830 * 1) Provide the client with the 'value' element.
831 * 2) If the dstkey is not NULL (we are serving a BRPOPLPUSH) also push the
832 * 'value' element on the destionation list (the LPUSH side of the command).
833 * 3) Propagate the resulting BRPOP, BLPOP and additional LPUSH if any into
834 * the AOF and replication channel.
836 * The argument 'where' is REDIS_TAIL or REDIS_HEAD, and indicates if the
837 * 'value' element was popped fron the head (BLPOP) or tail (BRPOP) so that
838 * we can propagate the command properly.
840 * The function returns REDIS_OK if we are able to serve the client, otherwise
841 * REDIS_ERR is returned to signal the caller that the list POP operation
842 * should be undoed as the client was not served: This only happens for
843 * BRPOPLPUSH that fails to push the value to the destination key as it is
844 * of the wrong type. */
845 int serveClientBlockedOnList(redisClient
*receiver
, robj
*key
, robj
*dstkey
, redisDb
*db
, robj
*value
, int where
)
849 if (dstkey
== NULL
) {
850 /* Propagate the [LR]POP operation. */
851 argv
[0] = (where
== REDIS_HEAD
) ? shared
.lpop
:
854 propagate((where
== REDIS_HEAD
) ?
855 server
.lpopCommand
: server
.rpopCommand
,
856 db
->id
,argv
,2,REDIS_PROPAGATE_AOF
|REDIS_PROPAGATE_REPL
);
859 addReplyMultiBulkLen(receiver
,2);
860 addReplyBulk(receiver
,key
);
861 addReplyBulk(receiver
,value
);
865 lookupKeyWrite(receiver
->db
,dstkey
);
867 checkType(receiver
,dstobj
,REDIS_LIST
)))
869 /* Propagate the RPOP operation. */
870 argv
[0] = shared
.rpop
;
872 propagate(server
.rpopCommand
,
875 REDIS_PROPAGATE_REPL
);
876 rpoplpushHandlePush(receiver
,dstkey
,dstobj
,
878 /* Propagate the LPUSH operation. */
879 argv
[0] = shared
.lpush
;
882 propagate(server
.lpushCommand
,
885 REDIS_PROPAGATE_REPL
);
887 /* BRPOPLPUSH failed because of wrong
888 * destination type. */
895 /* This function should be called by Redis every time a single command,
896 * a MULTI/EXEC block, or a Lua script, terminated its execution after
897 * being called by a client.
899 * All the keys with at least one client blocked that received at least
900 * one new element via some PUSH operation are accumulated into
901 * the server.ready_keys list. This function will run the list and will
902 * serve clients accordingly. Note that the function will iterate again and
903 * again as a result of serving BRPOPLPUSH we can have new blocking clients
904 * to serve because of the PUSH side of BRPOPLPUSH. */
905 void handleClientsBlockedOnLists(void) {
906 while(listLength(server
.ready_keys
) != 0) {
909 /* Point server.ready_keys to a fresh list and save the current one
910 * locally. This way as we run the old list we are free to call
911 * signalListAsReady() that may push new elements in server.ready_keys
912 * when handling clients blocked into BRPOPLPUSH. */
913 l
= server
.ready_keys
;
914 server
.ready_keys
= listCreate();
916 while(listLength(l
) != 0) {
917 listNode
*ln
= listFirst(l
);
918 readyList
*rl
= ln
->value
;
920 /* First of all remove this key from db->ready_keys so that
921 * we can safely call signalListAsReady() against this key. */
922 dictDelete(rl
->db
->ready_keys
,rl
->key
);
924 /* If the key exists and it's a list, serve blocked clients
926 robj
*o
= lookupKeyWrite(rl
->db
,rl
->key
);
927 if (o
!= NULL
&& o
->type
== REDIS_LIST
) {
930 /* We serve clients in the same order they blocked for
931 * this key, from the first blocked to the last. */
932 de
= dictFind(rl
->db
->blocking_keys
,rl
->key
);
934 list
*clients
= dictGetVal(de
);
935 int numclients
= listLength(clients
);
937 while(numclients
--) {
938 listNode
*clientnode
= listFirst(clients
);
939 redisClient
*receiver
= clientnode
->value
;
940 robj
*dstkey
= receiver
->bpop
.target
;
941 int where
= (receiver
->lastcmd
&&
942 receiver
->lastcmd
->proc
== blpopCommand
) ?
943 REDIS_HEAD
: REDIS_TAIL
;
944 robj
*value
= listTypePop(o
,where
);
947 /* Protect receiver->bpop.target, that will be
948 * freed by the next unblockClientWaitingData()
950 if (dstkey
) incrRefCount(dstkey
);
951 unblockClientWaitingData(receiver
);
953 if (serveClientBlockedOnList(receiver
,
954 rl
->key
,dstkey
,rl
->db
,value
,
957 /* If we failed serving the client we need
958 * to also undo the POP operation. */
959 listTypePush(o
,value
,where
);
962 if (dstkey
) decrRefCount(dstkey
);
970 if (listTypeLength(o
) == 0) dbDelete(rl
->db
,rl
->key
);
971 /* We don't call signalModifiedKey() as it was already called
972 * when an element was pushed on the list. */
975 /* Free this item. */
976 decrRefCount(rl
->key
);
980 listRelease(l
); /* We have the new list on place at this point. */
984 int getTimeoutFromObjectOrReply(redisClient
*c
, robj
*object
, time_t *timeout
) {
987 if (getLongFromObjectOrReply(c
,object
,&tval
,
988 "timeout is not an integer or out of range") != REDIS_OK
)
992 addReplyError(c
,"timeout is negative");
996 if (tval
> 0) tval
+= server
.unixtime
;
1002 /* Blocking RPOP/LPOP */
1003 void blockingPopGenericCommand(redisClient
*c
, int where
) {
1008 if (getTimeoutFromObjectOrReply(c
,c
->argv
[c
->argc
-1],&timeout
) != REDIS_OK
)
1011 for (j
= 1; j
< c
->argc
-1; j
++) {
1012 o
= lookupKeyWrite(c
->db
,c
->argv
[j
]);
1014 if (o
->type
!= REDIS_LIST
) {
1015 addReply(c
,shared
.wrongtypeerr
);
1018 if (listTypeLength(o
) != 0) {
1019 /* Non empty list, this is like a non normal [LR]POP. */
1020 robj
*value
= listTypePop(o
,where
);
1021 redisAssert(value
!= NULL
);
1023 addReplyMultiBulkLen(c
,2);
1024 addReplyBulk(c
,c
->argv
[j
]);
1025 addReplyBulk(c
,value
);
1026 decrRefCount(value
);
1027 if (listTypeLength(o
) == 0) dbDelete(c
->db
,c
->argv
[j
]);
1028 signalModifiedKey(c
->db
,c
->argv
[j
]);
1031 /* Replicate it as an [LR]POP instead of B[LR]POP. */
1032 rewriteClientCommandVector(c
,2,
1033 (where
== REDIS_HEAD
) ? shared
.lpop
: shared
.rpop
,
1041 /* If we are inside a MULTI/EXEC and the list is empty the only thing
1042 * we can do is treating it as a timeout (even with timeout 0). */
1043 if (c
->flags
& REDIS_MULTI
) {
1044 addReply(c
,shared
.nullmultibulk
);
1048 /* If the list is empty or the key does not exists we must block */
1049 blockForKeys(c
, c
->argv
+ 1, c
->argc
- 2, timeout
, NULL
);
1052 void blpopCommand(redisClient
*c
) {
1053 blockingPopGenericCommand(c
,REDIS_HEAD
);
1056 void brpopCommand(redisClient
*c
) {
1057 blockingPopGenericCommand(c
,REDIS_TAIL
);
1060 void brpoplpushCommand(redisClient
*c
) {
1063 if (getTimeoutFromObjectOrReply(c
,c
->argv
[3],&timeout
) != REDIS_OK
)
1066 robj
*key
= lookupKeyWrite(c
->db
, c
->argv
[1]);
1069 if (c
->flags
& REDIS_MULTI
) {
1070 /* Blocking against an empty list in a multi state
1071 * returns immediately. */
1072 addReply(c
, shared
.nullbulk
);
1074 /* The list is empty and the client blocks. */
1075 blockForKeys(c
, c
->argv
+ 1, 1, timeout
, c
->argv
[2]);
1078 if (key
->type
!= REDIS_LIST
) {
1079 addReply(c
, shared
.wrongtypeerr
);
1081 /* The list exists and has elements, so
1082 * the regular rpoplpushCommand is executed. */
1083 redisAssertWithInfo(c
,key
,listTypeLength(key
) > 0);
1084 rpoplpushCommand(c
);