2 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
32 void signalListAsReady(redisClient
*c
, robj
*key
);
/*-----------------------------------------------------------------------------
 * List API
 *----------------------------------------------------------------------------*/
38 /* Check the argument length to see if it requires us to convert the ziplist
39 * to a real list. Only check raw-encoded objects because integer encoded
40 * objects are never too long. */
41 void listTypeTryConversion(robj
*subject
, robj
*value
) {
42 if (subject
->encoding
!= REDIS_ENCODING_ZIPLIST
) return;
43 if (value
->encoding
== REDIS_ENCODING_RAW
&&
44 sdslen(value
->ptr
) > server
.list_max_ziplist_value
)
45 listTypeConvert(subject
,REDIS_ENCODING_LINKEDLIST
);
48 /* The function pushes an elmenet to the specified list object 'subject',
49 * at head or tail position as specified by 'where'.
51 * There is no need for the caller to incremnet the refcount of 'value' as
52 * the function takes care of it if needed. */
53 void listTypePush(robj
*subject
, robj
*value
, int where
) {
54 /* Check if we need to convert the ziplist */
55 listTypeTryConversion(subject
,value
);
56 if (subject
->encoding
== REDIS_ENCODING_ZIPLIST
&&
57 ziplistLen(subject
->ptr
) >= server
.list_max_ziplist_entries
)
58 listTypeConvert(subject
,REDIS_ENCODING_LINKEDLIST
);
60 if (subject
->encoding
== REDIS_ENCODING_ZIPLIST
) {
61 int pos
= (where
== REDIS_HEAD
) ? ZIPLIST_HEAD
: ZIPLIST_TAIL
;
62 value
= getDecodedObject(value
);
63 subject
->ptr
= ziplistPush(subject
->ptr
,value
->ptr
,sdslen(value
->ptr
),pos
);
65 } else if (subject
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
66 if (where
== REDIS_HEAD
) {
67 listAddNodeHead(subject
->ptr
,value
);
69 listAddNodeTail(subject
->ptr
,value
);
73 redisPanic("Unknown list encoding");
77 robj
*listTypePop(robj
*subject
, int where
) {
79 if (subject
->encoding
== REDIS_ENCODING_ZIPLIST
) {
84 int pos
= (where
== REDIS_HEAD
) ? 0 : -1;
85 p
= ziplistIndex(subject
->ptr
,pos
);
86 if (ziplistGet(p
,&vstr
,&vlen
,&vlong
)) {
88 value
= createStringObject((char*)vstr
,vlen
);
90 value
= createStringObjectFromLongLong(vlong
);
92 /* We only need to delete an element when it exists */
93 subject
->ptr
= ziplistDelete(subject
->ptr
,&p
);
95 } else if (subject
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
96 list
*list
= subject
->ptr
;
98 if (where
== REDIS_HEAD
) {
104 value
= listNodeValue(ln
);
106 listDelNode(list
,ln
);
109 redisPanic("Unknown list encoding");
114 unsigned long listTypeLength(robj
*subject
) {
115 if (subject
->encoding
== REDIS_ENCODING_ZIPLIST
) {
116 return ziplistLen(subject
->ptr
);
117 } else if (subject
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
118 return listLength((list
*)subject
->ptr
);
120 redisPanic("Unknown list encoding");
124 /* Initialize an iterator at the specified index. */
125 listTypeIterator
*listTypeInitIterator(robj
*subject
, long index
, unsigned char direction
) {
126 listTypeIterator
*li
= zmalloc(sizeof(listTypeIterator
));
127 li
->subject
= subject
;
128 li
->encoding
= subject
->encoding
;
129 li
->direction
= direction
;
130 if (li
->encoding
== REDIS_ENCODING_ZIPLIST
) {
131 li
->zi
= ziplistIndex(subject
->ptr
,index
);
132 } else if (li
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
133 li
->ln
= listIndex(subject
->ptr
,index
);
135 redisPanic("Unknown list encoding");
140 /* Clean up the iterator. */
141 void listTypeReleaseIterator(listTypeIterator
*li
) {
145 /* Stores pointer to current the entry in the provided entry structure
146 * and advances the position of the iterator. Returns 1 when the current
147 * entry is in fact an entry, 0 otherwise. */
148 int listTypeNext(listTypeIterator
*li
, listTypeEntry
*entry
) {
149 /* Protect from converting when iterating */
150 redisAssert(li
->subject
->encoding
== li
->encoding
);
153 if (li
->encoding
== REDIS_ENCODING_ZIPLIST
) {
155 if (entry
->zi
!= NULL
) {
156 if (li
->direction
== REDIS_TAIL
)
157 li
->zi
= ziplistNext(li
->subject
->ptr
,li
->zi
);
159 li
->zi
= ziplistPrev(li
->subject
->ptr
,li
->zi
);
162 } else if (li
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
164 if (entry
->ln
!= NULL
) {
165 if (li
->direction
== REDIS_TAIL
)
166 li
->ln
= li
->ln
->next
;
168 li
->ln
= li
->ln
->prev
;
172 redisPanic("Unknown list encoding");
177 /* Return entry or NULL at the current position of the iterator. */
178 robj
*listTypeGet(listTypeEntry
*entry
) {
179 listTypeIterator
*li
= entry
->li
;
181 if (li
->encoding
== REDIS_ENCODING_ZIPLIST
) {
185 redisAssert(entry
->zi
!= NULL
);
186 if (ziplistGet(entry
->zi
,&vstr
,&vlen
,&vlong
)) {
188 value
= createStringObject((char*)vstr
,vlen
);
190 value
= createStringObjectFromLongLong(vlong
);
193 } else if (li
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
194 redisAssert(entry
->ln
!= NULL
);
195 value
= listNodeValue(entry
->ln
);
198 redisPanic("Unknown list encoding");
203 void listTypeInsert(listTypeEntry
*entry
, robj
*value
, int where
) {
204 robj
*subject
= entry
->li
->subject
;
205 if (entry
->li
->encoding
== REDIS_ENCODING_ZIPLIST
) {
206 value
= getDecodedObject(value
);
207 if (where
== REDIS_TAIL
) {
208 unsigned char *next
= ziplistNext(subject
->ptr
,entry
->zi
);
210 /* When we insert after the current element, but the current element
211 * is the tail of the list, we need to do a push. */
213 subject
->ptr
= ziplistPush(subject
->ptr
,value
->ptr
,sdslen(value
->ptr
),REDIS_TAIL
);
215 subject
->ptr
= ziplistInsert(subject
->ptr
,next
,value
->ptr
,sdslen(value
->ptr
));
218 subject
->ptr
= ziplistInsert(subject
->ptr
,entry
->zi
,value
->ptr
,sdslen(value
->ptr
));
221 } else if (entry
->li
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
222 if (where
== REDIS_TAIL
) {
223 listInsertNode(subject
->ptr
,entry
->ln
,value
,AL_START_TAIL
);
225 listInsertNode(subject
->ptr
,entry
->ln
,value
,AL_START_HEAD
);
229 redisPanic("Unknown list encoding");
233 /* Compare the given object with the entry at the current position. */
234 int listTypeEqual(listTypeEntry
*entry
, robj
*o
) {
235 listTypeIterator
*li
= entry
->li
;
236 if (li
->encoding
== REDIS_ENCODING_ZIPLIST
) {
237 redisAssertWithInfo(NULL
,o
,o
->encoding
== REDIS_ENCODING_RAW
);
238 return ziplistCompare(entry
->zi
,o
->ptr
,sdslen(o
->ptr
));
239 } else if (li
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
240 return equalStringObjects(o
,listNodeValue(entry
->ln
));
242 redisPanic("Unknown list encoding");
246 /* Delete the element pointed to. */
247 void listTypeDelete(listTypeEntry
*entry
) {
248 listTypeIterator
*li
= entry
->li
;
249 if (li
->encoding
== REDIS_ENCODING_ZIPLIST
) {
250 unsigned char *p
= entry
->zi
;
251 li
->subject
->ptr
= ziplistDelete(li
->subject
->ptr
,&p
);
253 /* Update position of the iterator depending on the direction */
254 if (li
->direction
== REDIS_TAIL
)
257 li
->zi
= ziplistPrev(li
->subject
->ptr
,p
);
258 } else if (entry
->li
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
260 if (li
->direction
== REDIS_TAIL
)
261 next
= entry
->ln
->next
;
263 next
= entry
->ln
->prev
;
264 listDelNode(li
->subject
->ptr
,entry
->ln
);
267 redisPanic("Unknown list encoding");
271 void listTypeConvert(robj
*subject
, int enc
) {
272 listTypeIterator
*li
;
274 redisAssertWithInfo(NULL
,subject
,subject
->type
== REDIS_LIST
);
276 if (enc
== REDIS_ENCODING_LINKEDLIST
) {
277 list
*l
= listCreate();
278 listSetFreeMethod(l
,decrRefCount
);
280 /* listTypeGet returns a robj with incremented refcount */
281 li
= listTypeInitIterator(subject
,0,REDIS_TAIL
);
282 while (listTypeNext(li
,&entry
)) listAddNodeTail(l
,listTypeGet(&entry
));
283 listTypeReleaseIterator(li
);
285 subject
->encoding
= REDIS_ENCODING_LINKEDLIST
;
289 redisPanic("Unsupported list conversion");
/*-----------------------------------------------------------------------------
 * List Commands
 *----------------------------------------------------------------------------*/
297 void pushGenericCommand(redisClient
*c
, int where
) {
298 int j
, waiting
= 0, pushed
= 0;
299 robj
*lobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
300 int may_have_waiting_clients
= (lobj
== NULL
);
302 if (lobj
&& lobj
->type
!= REDIS_LIST
) {
303 addReply(c
,shared
.wrongtypeerr
);
307 if (may_have_waiting_clients
) signalListAsReady(c
,c
->argv
[1]);
309 for (j
= 2; j
< c
->argc
; j
++) {
310 c
->argv
[j
] = tryObjectEncoding(c
->argv
[j
]);
312 lobj
= createZiplistObject();
313 dbAdd(c
->db
,c
->argv
[1],lobj
);
315 listTypePush(lobj
,c
->argv
[j
],where
);
318 addReplyLongLong(c
, waiting
+ (lobj
? listTypeLength(lobj
) : 0));
319 if (pushed
) signalModifiedKey(c
->db
,c
->argv
[1]);
320 server
.dirty
+= pushed
;
323 void lpushCommand(redisClient
*c
) {
324 pushGenericCommand(c
,REDIS_HEAD
);
327 void rpushCommand(redisClient
*c
) {
328 pushGenericCommand(c
,REDIS_TAIL
);
331 void pushxGenericCommand(redisClient
*c
, robj
*refval
, robj
*val
, int where
) {
333 listTypeIterator
*iter
;
337 if ((subject
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.czero
)) == NULL
||
338 checkType(c
,subject
,REDIS_LIST
)) return;
340 if (refval
!= NULL
) {
341 /* Note: we expect refval to be string-encoded because it is *not* the
342 * last argument of the multi-bulk LINSERT. */
343 redisAssertWithInfo(c
,refval
,refval
->encoding
== REDIS_ENCODING_RAW
);
345 /* We're not sure if this value can be inserted yet, but we cannot
346 * convert the list inside the iterator. We don't want to loop over
347 * the list twice (once to see if the value can be inserted and once
348 * to do the actual insert), so we assume this value can be inserted
349 * and convert the ziplist to a regular list if necessary. */
350 listTypeTryConversion(subject
,val
);
352 /* Seek refval from head to tail */
353 iter
= listTypeInitIterator(subject
,0,REDIS_TAIL
);
354 while (listTypeNext(iter
,&entry
)) {
355 if (listTypeEqual(&entry
,refval
)) {
356 listTypeInsert(&entry
,val
,where
);
361 listTypeReleaseIterator(iter
);
364 /* Check if the length exceeds the ziplist length threshold. */
365 if (subject
->encoding
== REDIS_ENCODING_ZIPLIST
&&
366 ziplistLen(subject
->ptr
) > server
.list_max_ziplist_entries
)
367 listTypeConvert(subject
,REDIS_ENCODING_LINKEDLIST
);
368 signalModifiedKey(c
->db
,c
->argv
[1]);
371 /* Notify client of a failed insert */
372 addReply(c
,shared
.cnegone
);
376 listTypePush(subject
,val
,where
);
377 signalModifiedKey(c
->db
,c
->argv
[1]);
381 addReplyLongLong(c
,listTypeLength(subject
));
384 void lpushxCommand(redisClient
*c
) {
385 c
->argv
[2] = tryObjectEncoding(c
->argv
[2]);
386 pushxGenericCommand(c
,NULL
,c
->argv
[2],REDIS_HEAD
);
389 void rpushxCommand(redisClient
*c
) {
390 c
->argv
[2] = tryObjectEncoding(c
->argv
[2]);
391 pushxGenericCommand(c
,NULL
,c
->argv
[2],REDIS_TAIL
);
394 void linsertCommand(redisClient
*c
) {
395 c
->argv
[4] = tryObjectEncoding(c
->argv
[4]);
396 if (strcasecmp(c
->argv
[2]->ptr
,"after") == 0) {
397 pushxGenericCommand(c
,c
->argv
[3],c
->argv
[4],REDIS_TAIL
);
398 } else if (strcasecmp(c
->argv
[2]->ptr
,"before") == 0) {
399 pushxGenericCommand(c
,c
->argv
[3],c
->argv
[4],REDIS_HEAD
);
401 addReply(c
,shared
.syntaxerr
);
405 void llenCommand(redisClient
*c
) {
406 robj
*o
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.czero
);
407 if (o
== NULL
|| checkType(c
,o
,REDIS_LIST
)) return;
408 addReplyLongLong(c
,listTypeLength(o
));
411 void lindexCommand(redisClient
*c
) {
412 robj
*o
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.nullbulk
);
413 if (o
== NULL
|| checkType(c
,o
,REDIS_LIST
)) return;
417 if ((getLongFromObjectOrReply(c
, c
->argv
[2], &index
, NULL
) != REDIS_OK
))
420 if (o
->encoding
== REDIS_ENCODING_ZIPLIST
) {
425 p
= ziplistIndex(o
->ptr
,index
);
426 if (ziplistGet(p
,&vstr
,&vlen
,&vlong
)) {
428 value
= createStringObject((char*)vstr
,vlen
);
430 value
= createStringObjectFromLongLong(vlong
);
432 addReplyBulk(c
,value
);
435 addReply(c
,shared
.nullbulk
);
437 } else if (o
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
438 listNode
*ln
= listIndex(o
->ptr
,index
);
440 value
= listNodeValue(ln
);
441 addReplyBulk(c
,value
);
443 addReply(c
,shared
.nullbulk
);
446 redisPanic("Unknown list encoding");
450 void lsetCommand(redisClient
*c
) {
451 robj
*o
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.nokeyerr
);
452 if (o
== NULL
|| checkType(c
,o
,REDIS_LIST
)) return;
454 robj
*value
= (c
->argv
[3] = tryObjectEncoding(c
->argv
[3]));
456 if ((getLongFromObjectOrReply(c
, c
->argv
[2], &index
, NULL
) != REDIS_OK
))
459 listTypeTryConversion(o
,value
);
460 if (o
->encoding
== REDIS_ENCODING_ZIPLIST
) {
461 unsigned char *p
, *zl
= o
->ptr
;
462 p
= ziplistIndex(zl
,index
);
464 addReply(c
,shared
.outofrangeerr
);
466 o
->ptr
= ziplistDelete(o
->ptr
,&p
);
467 value
= getDecodedObject(value
);
468 o
->ptr
= ziplistInsert(o
->ptr
,p
,value
->ptr
,sdslen(value
->ptr
));
470 addReply(c
,shared
.ok
);
471 signalModifiedKey(c
->db
,c
->argv
[1]);
474 } else if (o
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
475 listNode
*ln
= listIndex(o
->ptr
,index
);
477 addReply(c
,shared
.outofrangeerr
);
479 decrRefCount((robj
*)listNodeValue(ln
));
480 listNodeValue(ln
) = value
;
482 addReply(c
,shared
.ok
);
483 signalModifiedKey(c
->db
,c
->argv
[1]);
487 redisPanic("Unknown list encoding");
491 void popGenericCommand(redisClient
*c
, int where
) {
492 robj
*o
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.nullbulk
);
493 if (o
== NULL
|| checkType(c
,o
,REDIS_LIST
)) return;
495 robj
*value
= listTypePop(o
,where
);
497 addReply(c
,shared
.nullbulk
);
499 addReplyBulk(c
,value
);
501 if (listTypeLength(o
) == 0) dbDelete(c
->db
,c
->argv
[1]);
502 signalModifiedKey(c
->db
,c
->argv
[1]);
507 void lpopCommand(redisClient
*c
) {
508 popGenericCommand(c
,REDIS_HEAD
);
511 void rpopCommand(redisClient
*c
) {
512 popGenericCommand(c
,REDIS_TAIL
);
515 void lrangeCommand(redisClient
*c
) {
517 long start
, end
, llen
, rangelen
;
519 if ((getLongFromObjectOrReply(c
, c
->argv
[2], &start
, NULL
) != REDIS_OK
) ||
520 (getLongFromObjectOrReply(c
, c
->argv
[3], &end
, NULL
) != REDIS_OK
)) return;
522 if ((o
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.emptymultibulk
)) == NULL
523 || checkType(c
,o
,REDIS_LIST
)) return;
524 llen
= listTypeLength(o
);
526 /* convert negative indexes */
527 if (start
< 0) start
= llen
+start
;
528 if (end
< 0) end
= llen
+end
;
529 if (start
< 0) start
= 0;
531 /* Invariant: start >= 0, so this test will be true when end < 0.
532 * The range is empty when start > end or start >= length. */
533 if (start
> end
|| start
>= llen
) {
534 addReply(c
,shared
.emptymultibulk
);
537 if (end
>= llen
) end
= llen
-1;
538 rangelen
= (end
-start
)+1;
540 /* Return the result in form of a multi-bulk reply */
541 addReplyMultiBulkLen(c
,rangelen
);
542 if (o
->encoding
== REDIS_ENCODING_ZIPLIST
) {
543 unsigned char *p
= ziplistIndex(o
->ptr
,start
);
549 ziplistGet(p
,&vstr
,&vlen
,&vlong
);
551 addReplyBulkCBuffer(c
,vstr
,vlen
);
553 addReplyBulkLongLong(c
,vlong
);
555 p
= ziplistNext(o
->ptr
,p
);
557 } else if (o
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
560 /* If we are nearest to the end of the list, reach the element
561 * starting from tail and going backward, as it is faster. */
562 if (start
> llen
/2) start
-= llen
;
563 ln
= listIndex(o
->ptr
,start
);
566 addReplyBulk(c
,ln
->value
);
570 redisPanic("List encoding is not LINKEDLIST nor ZIPLIST!");
574 void ltrimCommand(redisClient
*c
) {
576 long start
, end
, llen
, j
, ltrim
, rtrim
;
580 if ((getLongFromObjectOrReply(c
, c
->argv
[2], &start
, NULL
) != REDIS_OK
) ||
581 (getLongFromObjectOrReply(c
, c
->argv
[3], &end
, NULL
) != REDIS_OK
)) return;
583 if ((o
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.ok
)) == NULL
||
584 checkType(c
,o
,REDIS_LIST
)) return;
585 llen
= listTypeLength(o
);
587 /* convert negative indexes */
588 if (start
< 0) start
= llen
+start
;
589 if (end
< 0) end
= llen
+end
;
590 if (start
< 0) start
= 0;
592 /* Invariant: start >= 0, so this test will be true when end < 0.
593 * The range is empty when start > end or start >= length. */
594 if (start
> end
|| start
>= llen
) {
595 /* Out of range start or start > end result in empty list */
599 if (end
>= llen
) end
= llen
-1;
604 /* Remove list elements to perform the trim */
605 if (o
->encoding
== REDIS_ENCODING_ZIPLIST
) {
606 o
->ptr
= ziplistDeleteRange(o
->ptr
,0,ltrim
);
607 o
->ptr
= ziplistDeleteRange(o
->ptr
,-rtrim
,rtrim
);
608 } else if (o
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
610 for (j
= 0; j
< ltrim
; j
++) {
611 ln
= listFirst(list
);
612 listDelNode(list
,ln
);
614 for (j
= 0; j
< rtrim
; j
++) {
616 listDelNode(list
,ln
);
619 redisPanic("Unknown list encoding");
621 if (listTypeLength(o
) == 0) dbDelete(c
->db
,c
->argv
[1]);
622 signalModifiedKey(c
->db
,c
->argv
[1]);
624 addReply(c
,shared
.ok
);
627 void lremCommand(redisClient
*c
) {
629 obj
= c
->argv
[3] = tryObjectEncoding(c
->argv
[3]);
634 if ((getLongFromObjectOrReply(c
, c
->argv
[2], &toremove
, NULL
) != REDIS_OK
))
637 subject
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.czero
);
638 if (subject
== NULL
|| checkType(c
,subject
,REDIS_LIST
)) return;
640 /* Make sure obj is raw when we're dealing with a ziplist */
641 if (subject
->encoding
== REDIS_ENCODING_ZIPLIST
)
642 obj
= getDecodedObject(obj
);
644 listTypeIterator
*li
;
646 toremove
= -toremove
;
647 li
= listTypeInitIterator(subject
,-1,REDIS_HEAD
);
649 li
= listTypeInitIterator(subject
,0,REDIS_TAIL
);
652 while (listTypeNext(li
,&entry
)) {
653 if (listTypeEqual(&entry
,obj
)) {
654 listTypeDelete(&entry
);
657 if (toremove
&& removed
== toremove
) break;
660 listTypeReleaseIterator(li
);
662 /* Clean up raw encoded object */
663 if (subject
->encoding
== REDIS_ENCODING_ZIPLIST
)
666 if (listTypeLength(subject
) == 0) dbDelete(c
->db
,c
->argv
[1]);
667 addReplyLongLong(c
,removed
);
668 if (removed
) signalModifiedKey(c
->db
,c
->argv
[1]);
671 /* This is the semantic of this command:
672 * RPOPLPUSH srclist dstlist:
673 * IF LLEN(srclist) > 0
674 * element = RPOP srclist
675 * LPUSH dstlist element
682 * The idea is to be able to get an element from a list in a reliable way
683 * since the element is not just returned but pushed against another list
684 * as well. This command was originally proposed by Ezra Zygmuntowicz.
687 void rpoplpushHandlePush(redisClient
*c
, robj
*dstkey
, robj
*dstobj
, robj
*value
) {
688 /* Create the list if the key does not exist */
690 dstobj
= createZiplistObject();
691 dbAdd(c
->db
,dstkey
,dstobj
);
692 signalListAsReady(c
,dstkey
);
694 signalModifiedKey(c
->db
,dstkey
);
695 listTypePush(dstobj
,value
,REDIS_HEAD
);
696 /* Always send the pushed value to the client. */
697 addReplyBulk(c
,value
);
700 void rpoplpushCommand(redisClient
*c
) {
702 if ((sobj
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.nullbulk
)) == NULL
||
703 checkType(c
,sobj
,REDIS_LIST
)) return;
705 if (listTypeLength(sobj
) == 0) {
706 /* This may only happen after loading very old RDB files. Recent
707 * versions of Redis delete keys of empty lists. */
708 addReply(c
,shared
.nullbulk
);
710 robj
*dobj
= lookupKeyWrite(c
->db
,c
->argv
[2]);
711 robj
*touchedkey
= c
->argv
[1];
713 if (dobj
&& checkType(c
,dobj
,REDIS_LIST
)) return;
714 value
= listTypePop(sobj
,REDIS_TAIL
);
715 /* We saved touched key, and protect it, since rpoplpushHandlePush
716 * may change the client command argument vector (it does not
718 incrRefCount(touchedkey
);
719 rpoplpushHandlePush(c
,c
->argv
[2],dobj
,value
);
721 /* listTypePop returns an object with its refcount incremented */
724 /* Delete the source list when it is empty */
725 if (listTypeLength(sobj
) == 0) dbDelete(c
->db
,touchedkey
);
726 signalModifiedKey(c
->db
,touchedkey
);
727 decrRefCount(touchedkey
);
/*-----------------------------------------------------------------------------
 * Blocking POP operations
 *----------------------------------------------------------------------------*/

/* This is how the current blocking POP works, we use BLPOP as example:
 * - If the user calls BLPOP and the key exists and contains a non empty list
 *   then LPOP is called instead. So BLPOP is semantically the same as LPOP
 *   if blocking is not required.
 * - If instead BLPOP is called and the key does not exist or the list is
 *   empty we need to block. In order to do so we remove the notification for
 *   new data to read in the client socket (so that we'll not serve new
 *   requests if the blocking request is not served). Also we put the client
 *   in a dictionary (db->blocking_keys) mapping keys to a list of clients
 *   blocking for these keys.
 * - If a PUSH operation against a key with blocked clients waiting is
 *   performed, we mark this key as "ready", and after the current command,
 *   MULTI/EXEC block, or script, is executed, we serve all the clients waiting
 *   for this list, from the one that blocked first, to the last, accordingly
 *   to the number of elements we have in the ready list.
 */
753 /* Set a client in blocking mode for the specified key, with the specified
755 void blockForKeys(redisClient
*c
, robj
**keys
, int numkeys
, time_t timeout
, robj
*target
) {
760 c
->bpop
.timeout
= timeout
;
761 c
->bpop
.target
= target
;
763 if (target
!= NULL
) incrRefCount(target
);
765 for (j
= 0; j
< numkeys
; j
++) {
766 /* If the key already exists in the dict ignore it. */
767 if (dictAdd(c
->bpop
.keys
,keys
[j
],NULL
) != DICT_OK
) continue;
768 incrRefCount(keys
[j
]);
770 /* And in the other "side", to map keys -> clients */
771 de
= dictFind(c
->db
->blocking_keys
,keys
[j
]);
775 /* For every key we take a list of clients blocked for it */
777 retval
= dictAdd(c
->db
->blocking_keys
,keys
[j
],l
);
778 incrRefCount(keys
[j
]);
779 redisAssertWithInfo(c
,keys
[j
],retval
== DICT_OK
);
783 listAddNodeTail(l
,c
);
786 /* Mark the client as a blocked client */
787 c
->flags
|= REDIS_BLOCKED
;
788 server
.bpop_blocked_clients
++;
791 /* Unblock a client that's waiting in a blocking operation such as BLPOP */
792 void unblockClientWaitingData(redisClient
*c
) {
797 redisAssertWithInfo(c
,NULL
,dictSize(c
->bpop
.keys
) != 0);
798 di
= dictGetIterator(c
->bpop
.keys
);
799 /* The client may wait for multiple keys, so unblock it for every key. */
800 while((de
= dictNext(di
)) != NULL
) {
801 robj
*key
= dictGetKey(de
);
803 /* Remove this client from the list of clients waiting for this key. */
804 l
= dictFetchValue(c
->db
->blocking_keys
,key
);
805 redisAssertWithInfo(c
,key
,l
!= NULL
);
806 listDelNode(l
,listSearchKey(l
,c
));
807 /* If the list is empty we need to remove it to avoid wasting memory */
808 if (listLength(l
) == 0)
809 dictDelete(c
->db
->blocking_keys
,key
);
811 dictReleaseIterator(di
);
813 /* Cleanup the client structure */
814 dictEmpty(c
->bpop
.keys
);
815 if (c
->bpop
.target
) {
816 decrRefCount(c
->bpop
.target
);
817 c
->bpop
.target
= NULL
;
819 c
->flags
&= ~REDIS_BLOCKED
;
820 c
->flags
|= REDIS_UNBLOCKED
;
821 server
.bpop_blocked_clients
--;
822 listAddNodeTail(server
.unblocked_clients
,c
);
825 /* If the specified key has clients blocked waiting for list pushes, this
826 * function will put the key reference into the server.ready_keys list.
827 * Note that db->ready_keys is an hash table that allows us to avoid putting
828 * the same key agains and again in the list in case of multiple pushes
829 * made by a script or in the context of MULTI/EXEC.
831 * The list will be finally processed by handleClientsBlockedOnLists() */
832 void signalListAsReady(redisClient
*c
, robj
*key
) {
835 /* No clients blocking for this key? No need to queue it. */
836 if (dictFind(c
->db
->blocking_keys
,key
) == NULL
) return;
838 /* Key was already signaled? No need to queue it again. */
839 if (dictFind(c
->db
->ready_keys
,key
) != NULL
) return;
841 /* Ok, we need to queue this key into server.ready_keys. */
842 rl
= zmalloc(sizeof(*rl
));
846 listAddNodeTail(server
.ready_keys
,rl
);
848 /* We also add the key in the db->ready_keys dictionary in order
849 * to avoid adding it multiple times into a list with a simple O(1)
852 redisAssert(dictAdd(c
->db
->ready_keys
,key
,NULL
) == DICT_OK
);
855 /* This is an helper function for handleClientsBlockedOnLists(). It's work
856 * is to serve a specific client (receiver) that is blocked on 'key'
857 * in the context of the specified 'db', doing the following:
859 * 1) Provide the client with the 'value' element.
860 * 2) If the dstkey is not NULL (we are serving a BRPOPLPUSH) also push the
861 * 'value' element on the destionation list (the LPUSH side of the command).
862 * 3) Propagate the resulting BRPOP, BLPOP and additional LPUSH if any into
863 * the AOF and replication channel.
865 * The argument 'where' is REDIS_TAIL or REDIS_HEAD, and indicates if the
866 * 'value' element was popped fron the head (BLPOP) or tail (BRPOP) so that
867 * we can propagate the command properly.
869 * The function returns REDIS_OK if we are able to serve the client, otherwise
870 * REDIS_ERR is returned to signal the caller that the list POP operation
871 * should be undoed as the client was not served: This only happens for
872 * BRPOPLPUSH that fails to push the value to the destination key as it is
873 * of the wrong type. */
874 int serveClientBlockedOnList(redisClient
*receiver
, robj
*key
, robj
*dstkey
, redisDb
*db
, robj
*value
, int where
)
878 if (dstkey
== NULL
) {
879 /* Propagate the [LR]POP operation. */
880 argv
[0] = (where
== REDIS_HEAD
) ? shared
.lpop
:
883 propagate((where
== REDIS_HEAD
) ?
884 server
.lpopCommand
: server
.rpopCommand
,
885 db
->id
,argv
,2,REDIS_PROPAGATE_AOF
|REDIS_PROPAGATE_REPL
);
888 addReplyMultiBulkLen(receiver
,2);
889 addReplyBulk(receiver
,key
);
890 addReplyBulk(receiver
,value
);
894 lookupKeyWrite(receiver
->db
,dstkey
);
896 checkType(receiver
,dstobj
,REDIS_LIST
)))
898 /* Propagate the RPOP operation. */
899 argv
[0] = shared
.rpop
;
901 propagate(server
.rpopCommand
,
904 REDIS_PROPAGATE_REPL
);
905 rpoplpushHandlePush(receiver
,dstkey
,dstobj
,
907 /* Propagate the LPUSH operation. */
908 argv
[0] = shared
.lpush
;
911 propagate(server
.lpushCommand
,
914 REDIS_PROPAGATE_REPL
);
916 /* BRPOPLPUSH failed because of wrong
917 * destination type. */
924 /* This function should be called by Redis every time a single command,
925 * a MULTI/EXEC block, or a Lua script, terminated its execution after
926 * being called by a client.
928 * All the keys with at least one client blocked that received at least
929 * one new element via some PUSH operation are accumulated into
930 * the server.ready_keys list. This function will run the list and will
931 * serve clients accordingly. Note that the function will iterate again and
932 * again as a result of serving BRPOPLPUSH we can have new blocking clients
933 * to serve because of the PUSH side of BRPOPLPUSH. */
934 void handleClientsBlockedOnLists(void) {
935 while(listLength(server
.ready_keys
) != 0) {
938 /* Point server.ready_keys to a fresh list and save the current one
939 * locally. This way as we run the old list we are free to call
940 * signalListAsReady() that may push new elements in server.ready_keys
941 * when handling clients blocked into BRPOPLPUSH. */
942 l
= server
.ready_keys
;
943 server
.ready_keys
= listCreate();
945 while(listLength(l
) != 0) {
946 listNode
*ln
= listFirst(l
);
947 readyList
*rl
= ln
->value
;
949 /* First of all remove this key from db->ready_keys so that
950 * we can safely call signalListAsReady() against this key. */
951 dictDelete(rl
->db
->ready_keys
,rl
->key
);
953 /* If the key exists and it's a list, serve blocked clients
955 robj
*o
= lookupKeyWrite(rl
->db
,rl
->key
);
956 if (o
!= NULL
&& o
->type
== REDIS_LIST
) {
959 /* We serve clients in the same order they blocked for
960 * this key, from the first blocked to the last. */
961 de
= dictFind(rl
->db
->blocking_keys
,rl
->key
);
963 list
*clients
= dictGetVal(de
);
964 int numclients
= listLength(clients
);
966 while(numclients
--) {
967 listNode
*clientnode
= listFirst(clients
);
968 redisClient
*receiver
= clientnode
->value
;
969 robj
*dstkey
= receiver
->bpop
.target
;
970 int where
= (receiver
->lastcmd
&&
971 receiver
->lastcmd
->proc
== blpopCommand
) ?
972 REDIS_HEAD
: REDIS_TAIL
;
973 robj
*value
= listTypePop(o
,where
);
976 /* Protect receiver->bpop.target, that will be
977 * freed by the next unblockClientWaitingData()
979 if (dstkey
) incrRefCount(dstkey
);
980 unblockClientWaitingData(receiver
);
982 if (serveClientBlockedOnList(receiver
,
983 rl
->key
,dstkey
,rl
->db
,value
,
986 /* If we failed serving the client we need
987 * to also undo the POP operation. */
988 listTypePush(o
,value
,where
);
991 if (dstkey
) decrRefCount(dstkey
);
999 if (listTypeLength(o
) == 0) dbDelete(rl
->db
,rl
->key
);
1000 /* We don't call signalModifiedKey() as it was already called
1001 * when an element was pushed on the list. */
1004 /* Free this item. */
1005 decrRefCount(rl
->key
);
1009 listRelease(l
); /* We have the new list on place at this point. */
1013 int getTimeoutFromObjectOrReply(redisClient
*c
, robj
*object
, time_t *timeout
) {
1016 if (getLongFromObjectOrReply(c
,object
,&tval
,
1017 "timeout is not an integer or out of range") != REDIS_OK
)
1021 addReplyError(c
,"timeout is negative");
1025 if (tval
> 0) tval
+= server
.unixtime
;
1031 /* Blocking RPOP/LPOP */
1032 void blockingPopGenericCommand(redisClient
*c
, int where
) {
1037 if (getTimeoutFromObjectOrReply(c
,c
->argv
[c
->argc
-1],&timeout
) != REDIS_OK
)
1040 for (j
= 1; j
< c
->argc
-1; j
++) {
1041 o
= lookupKeyWrite(c
->db
,c
->argv
[j
]);
1043 if (o
->type
!= REDIS_LIST
) {
1044 addReply(c
,shared
.wrongtypeerr
);
1047 if (listTypeLength(o
) != 0) {
1048 /* Non empty list, this is like a non normal [LR]POP. */
1049 robj
*value
= listTypePop(o
,where
);
1050 redisAssert(value
!= NULL
);
1052 addReplyMultiBulkLen(c
,2);
1053 addReplyBulk(c
,c
->argv
[j
]);
1054 addReplyBulk(c
,value
);
1055 decrRefCount(value
);
1056 if (listTypeLength(o
) == 0) dbDelete(c
->db
,c
->argv
[j
]);
1057 signalModifiedKey(c
->db
,c
->argv
[j
]);
1060 /* Replicate it as an [LR]POP instead of B[LR]POP. */
1061 rewriteClientCommandVector(c
,2,
1062 (where
== REDIS_HEAD
) ? shared
.lpop
: shared
.rpop
,
1070 /* If we are inside a MULTI/EXEC and the list is empty the only thing
1071 * we can do is treating it as a timeout (even with timeout 0). */
1072 if (c
->flags
& REDIS_MULTI
) {
1073 addReply(c
,shared
.nullmultibulk
);
1077 /* If the list is empty or the key does not exists we must block */
1078 blockForKeys(c
, c
->argv
+ 1, c
->argc
- 2, timeout
, NULL
);
1081 void blpopCommand(redisClient
*c
) {
1082 blockingPopGenericCommand(c
,REDIS_HEAD
);
1085 void brpopCommand(redisClient
*c
) {
1086 blockingPopGenericCommand(c
,REDIS_TAIL
);
1089 void brpoplpushCommand(redisClient
*c
) {
1092 if (getTimeoutFromObjectOrReply(c
,c
->argv
[3],&timeout
) != REDIS_OK
)
1095 robj
*key
= lookupKeyWrite(c
->db
, c
->argv
[1]);
1098 if (c
->flags
& REDIS_MULTI
) {
1099 /* Blocking against an empty list in a multi state
1100 * returns immediately. */
1101 addReply(c
, shared
.nullbulk
);
1103 /* The list is empty and the client blocks. */
1104 blockForKeys(c
, c
->argv
+ 1, 1, timeout
, c
->argv
[2]);
1107 if (key
->type
!= REDIS_LIST
) {
1108 addReply(c
, shared
.wrongtypeerr
);
1110 /* The list exists and has elements, so
1111 * the regular rpoplpushCommand is executed. */
1112 redisAssertWithInfo(c
,key
,listTypeLength(key
) > 0);
1113 rpoplpushCommand(c
);