3 /*-----------------------------------------------------------------------------
5 *----------------------------------------------------------------------------*/
7 /* Check the argument length to see if it requires us to convert the ziplist
8 * to a real list. Only check raw-encoded objects because integer encoded
9 * objects are never too long. */
10 void listTypeTryConversion(robj
*subject
, robj
*value
) {
11 if (subject
->encoding
!= REDIS_ENCODING_ZIPLIST
) return;
12 if (value
->encoding
== REDIS_ENCODING_RAW
&&
13 sdslen(value
->ptr
) > server
.list_max_ziplist_value
)
14 listTypeConvert(subject
,REDIS_ENCODING_LINKEDLIST
);
17 void listTypePush(robj
*subject
, robj
*value
, int where
) {
18 /* Check if we need to convert the ziplist */
19 listTypeTryConversion(subject
,value
);
20 if (subject
->encoding
== REDIS_ENCODING_ZIPLIST
&&
21 ziplistLen(subject
->ptr
) >= server
.list_max_ziplist_entries
)
22 listTypeConvert(subject
,REDIS_ENCODING_LINKEDLIST
);
24 if (subject
->encoding
== REDIS_ENCODING_ZIPLIST
) {
25 int pos
= (where
== REDIS_HEAD
) ? ZIPLIST_HEAD
: ZIPLIST_TAIL
;
26 value
= getDecodedObject(value
);
27 subject
->ptr
= ziplistPush(subject
->ptr
,value
->ptr
,sdslen(value
->ptr
),pos
);
29 } else if (subject
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
30 if (where
== REDIS_HEAD
) {
31 listAddNodeHead(subject
->ptr
,value
);
33 listAddNodeTail(subject
->ptr
,value
);
37 redisPanic("Unknown list encoding");
41 robj
*listTypePop(robj
*subject
, int where
) {
43 if (subject
->encoding
== REDIS_ENCODING_ZIPLIST
) {
48 int pos
= (where
== REDIS_HEAD
) ? 0 : -1;
49 p
= ziplistIndex(subject
->ptr
,pos
);
50 if (ziplistGet(p
,&vstr
,&vlen
,&vlong
)) {
52 value
= createStringObject((char*)vstr
,vlen
);
54 value
= createStringObjectFromLongLong(vlong
);
56 /* We only need to delete an element when it exists */
57 subject
->ptr
= ziplistDelete(subject
->ptr
,&p
);
59 } else if (subject
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
60 list
*list
= subject
->ptr
;
62 if (where
== REDIS_HEAD
) {
68 value
= listNodeValue(ln
);
73 redisPanic("Unknown list encoding");
78 unsigned long listTypeLength(robj
*subject
) {
79 if (subject
->encoding
== REDIS_ENCODING_ZIPLIST
) {
80 return ziplistLen(subject
->ptr
);
81 } else if (subject
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
82 return listLength((list
*)subject
->ptr
);
84 redisPanic("Unknown list encoding");
88 /* Initialize an iterator at the specified index. */
89 listTypeIterator
*listTypeInitIterator(robj
*subject
, long index
, unsigned char direction
) {
90 listTypeIterator
*li
= zmalloc(sizeof(listTypeIterator
));
91 li
->subject
= subject
;
92 li
->encoding
= subject
->encoding
;
93 li
->direction
= direction
;
94 if (li
->encoding
== REDIS_ENCODING_ZIPLIST
) {
95 li
->zi
= ziplistIndex(subject
->ptr
,index
);
96 } else if (li
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
97 li
->ln
= listIndex(subject
->ptr
,index
);
99 redisPanic("Unknown list encoding");
104 /* Clean up the iterator. */
105 void listTypeReleaseIterator(listTypeIterator
*li
) {
109 /* Stores pointer to current the entry in the provided entry structure
110 * and advances the position of the iterator. Returns 1 when the current
111 * entry is in fact an entry, 0 otherwise. */
112 int listTypeNext(listTypeIterator
*li
, listTypeEntry
*entry
) {
113 /* Protect from converting when iterating */
114 redisAssert(li
->subject
->encoding
== li
->encoding
);
117 if (li
->encoding
== REDIS_ENCODING_ZIPLIST
) {
119 if (entry
->zi
!= NULL
) {
120 if (li
->direction
== REDIS_TAIL
)
121 li
->zi
= ziplistNext(li
->subject
->ptr
,li
->zi
);
123 li
->zi
= ziplistPrev(li
->subject
->ptr
,li
->zi
);
126 } else if (li
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
128 if (entry
->ln
!= NULL
) {
129 if (li
->direction
== REDIS_TAIL
)
130 li
->ln
= li
->ln
->next
;
132 li
->ln
= li
->ln
->prev
;
136 redisPanic("Unknown list encoding");
141 /* Return entry or NULL at the current position of the iterator. */
142 robj
*listTypeGet(listTypeEntry
*entry
) {
143 listTypeIterator
*li
= entry
->li
;
145 if (li
->encoding
== REDIS_ENCODING_ZIPLIST
) {
149 redisAssert(entry
->zi
!= NULL
);
150 if (ziplistGet(entry
->zi
,&vstr
,&vlen
,&vlong
)) {
152 value
= createStringObject((char*)vstr
,vlen
);
154 value
= createStringObjectFromLongLong(vlong
);
157 } else if (li
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
158 redisAssert(entry
->ln
!= NULL
);
159 value
= listNodeValue(entry
->ln
);
162 redisPanic("Unknown list encoding");
167 void listTypeInsert(listTypeEntry
*entry
, robj
*value
, int where
) {
168 robj
*subject
= entry
->li
->subject
;
169 if (entry
->li
->encoding
== REDIS_ENCODING_ZIPLIST
) {
170 value
= getDecodedObject(value
);
171 if (where
== REDIS_TAIL
) {
172 unsigned char *next
= ziplistNext(subject
->ptr
,entry
->zi
);
174 /* When we insert after the current element, but the current element
175 * is the tail of the list, we need to do a push. */
177 subject
->ptr
= ziplistPush(subject
->ptr
,value
->ptr
,sdslen(value
->ptr
),REDIS_TAIL
);
179 subject
->ptr
= ziplistInsert(subject
->ptr
,next
,value
->ptr
,sdslen(value
->ptr
));
182 subject
->ptr
= ziplistInsert(subject
->ptr
,entry
->zi
,value
->ptr
,sdslen(value
->ptr
));
185 } else if (entry
->li
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
186 if (where
== REDIS_TAIL
) {
187 listInsertNode(subject
->ptr
,entry
->ln
,value
,AL_START_TAIL
);
189 listInsertNode(subject
->ptr
,entry
->ln
,value
,AL_START_HEAD
);
193 redisPanic("Unknown list encoding");
197 /* Compare the given object with the entry at the current position. */
198 int listTypeEqual(listTypeEntry
*entry
, robj
*o
) {
199 listTypeIterator
*li
= entry
->li
;
200 if (li
->encoding
== REDIS_ENCODING_ZIPLIST
) {
201 redisAssertWithInfo(NULL
,o
,o
->encoding
== REDIS_ENCODING_RAW
);
202 return ziplistCompare(entry
->zi
,o
->ptr
,sdslen(o
->ptr
));
203 } else if (li
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
204 return equalStringObjects(o
,listNodeValue(entry
->ln
));
206 redisPanic("Unknown list encoding");
210 /* Delete the element pointed to. */
211 void listTypeDelete(listTypeEntry
*entry
) {
212 listTypeIterator
*li
= entry
->li
;
213 if (li
->encoding
== REDIS_ENCODING_ZIPLIST
) {
214 unsigned char *p
= entry
->zi
;
215 li
->subject
->ptr
= ziplistDelete(li
->subject
->ptr
,&p
);
217 /* Update position of the iterator depending on the direction */
218 if (li
->direction
== REDIS_TAIL
)
221 li
->zi
= ziplistPrev(li
->subject
->ptr
,p
);
222 } else if (entry
->li
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
224 if (li
->direction
== REDIS_TAIL
)
225 next
= entry
->ln
->next
;
227 next
= entry
->ln
->prev
;
228 listDelNode(li
->subject
->ptr
,entry
->ln
);
231 redisPanic("Unknown list encoding");
235 void listTypeConvert(robj
*subject
, int enc
) {
236 listTypeIterator
*li
;
238 redisAssertWithInfo(NULL
,subject
,subject
->type
== REDIS_LIST
);
240 if (enc
== REDIS_ENCODING_LINKEDLIST
) {
241 list
*l
= listCreate();
242 listSetFreeMethod(l
,decrRefCount
);
244 /* listTypeGet returns a robj with incremented refcount */
245 li
= listTypeInitIterator(subject
,0,REDIS_TAIL
);
246 while (listTypeNext(li
,&entry
)) listAddNodeTail(l
,listTypeGet(&entry
));
247 listTypeReleaseIterator(li
);
249 subject
->encoding
= REDIS_ENCODING_LINKEDLIST
;
253 redisPanic("Unsupported list conversion");
257 /*-----------------------------------------------------------------------------
259 *----------------------------------------------------------------------------*/
261 void pushGenericCommand(redisClient
*c
, int where
) {
262 int j
, waiting
= 0, pushed
= 0;
263 robj
*lobj
= lookupKeyWrite(c
->db
,c
->argv
[1]);
264 int may_have_waiting_clients
= (lobj
== NULL
);
266 if (lobj
&& lobj
->type
!= REDIS_LIST
) {
267 addReply(c
,shared
.wrongtypeerr
);
271 for (j
= 2; j
< c
->argc
; j
++) {
272 c
->argv
[j
] = tryObjectEncoding(c
->argv
[j
]);
273 if (may_have_waiting_clients
) {
274 if (handleClientsWaitingListPush(c
,c
->argv
[1],c
->argv
[j
])) {
278 may_have_waiting_clients
= 0;
282 lobj
= createZiplistObject();
283 dbAdd(c
->db
,c
->argv
[1],lobj
);
285 listTypePush(lobj
,c
->argv
[j
],where
);
288 addReplyLongLong(c
, waiting
+ (lobj
? listTypeLength(lobj
) : 0));
289 if (pushed
) signalModifiedKey(c
->db
,c
->argv
[1]);
290 server
.dirty
+= pushed
;
292 /* Alter the replication of the command accordingly to the number of
293 * list elements delivered to clients waiting into a blocking operation.
294 * We do that only if there were waiting clients, and only if still some
295 * element was pushed into the list (othewise dirty is 0 and nothign will
297 if (waiting
&& pushed
) {
298 /* CMD KEY a b C D E */
299 for (j
= 0; j
< waiting
; j
++) decrRefCount(c
->argv
[j
+2]);
300 memmove(c
->argv
+2,c
->argv
+2+waiting
,sizeof(robj
*)*pushed
);
305 void lpushCommand(redisClient
*c
) {
306 pushGenericCommand(c
,REDIS_HEAD
);
309 void rpushCommand(redisClient
*c
) {
310 pushGenericCommand(c
,REDIS_TAIL
);
313 void pushxGenericCommand(redisClient
*c
, robj
*refval
, robj
*val
, int where
) {
315 listTypeIterator
*iter
;
319 if ((subject
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.czero
)) == NULL
||
320 checkType(c
,subject
,REDIS_LIST
)) return;
322 if (refval
!= NULL
) {
323 /* Note: we expect refval to be string-encoded because it is *not* the
324 * last argument of the multi-bulk LINSERT. */
325 redisAssertWithInfo(c
,refval
,refval
->encoding
== REDIS_ENCODING_RAW
);
327 /* We're not sure if this value can be inserted yet, but we cannot
328 * convert the list inside the iterator. We don't want to loop over
329 * the list twice (once to see if the value can be inserted and once
330 * to do the actual insert), so we assume this value can be inserted
331 * and convert the ziplist to a regular list if necessary. */
332 listTypeTryConversion(subject
,val
);
334 /* Seek refval from head to tail */
335 iter
= listTypeInitIterator(subject
,0,REDIS_TAIL
);
336 while (listTypeNext(iter
,&entry
)) {
337 if (listTypeEqual(&entry
,refval
)) {
338 listTypeInsert(&entry
,val
,where
);
343 listTypeReleaseIterator(iter
);
346 /* Check if the length exceeds the ziplist length threshold. */
347 if (subject
->encoding
== REDIS_ENCODING_ZIPLIST
&&
348 ziplistLen(subject
->ptr
) > server
.list_max_ziplist_entries
)
349 listTypeConvert(subject
,REDIS_ENCODING_LINKEDLIST
);
350 signalModifiedKey(c
->db
,c
->argv
[1]);
353 /* Notify client of a failed insert */
354 addReply(c
,shared
.cnegone
);
358 listTypePush(subject
,val
,where
);
359 signalModifiedKey(c
->db
,c
->argv
[1]);
363 addReplyLongLong(c
,listTypeLength(subject
));
366 void lpushxCommand(redisClient
*c
) {
367 c
->argv
[2] = tryObjectEncoding(c
->argv
[2]);
368 pushxGenericCommand(c
,NULL
,c
->argv
[2],REDIS_HEAD
);
371 void rpushxCommand(redisClient
*c
) {
372 c
->argv
[2] = tryObjectEncoding(c
->argv
[2]);
373 pushxGenericCommand(c
,NULL
,c
->argv
[2],REDIS_TAIL
);
376 void linsertCommand(redisClient
*c
) {
377 c
->argv
[4] = tryObjectEncoding(c
->argv
[4]);
378 if (strcasecmp(c
->argv
[2]->ptr
,"after") == 0) {
379 pushxGenericCommand(c
,c
->argv
[3],c
->argv
[4],REDIS_TAIL
);
380 } else if (strcasecmp(c
->argv
[2]->ptr
,"before") == 0) {
381 pushxGenericCommand(c
,c
->argv
[3],c
->argv
[4],REDIS_HEAD
);
383 addReply(c
,shared
.syntaxerr
);
387 void llenCommand(redisClient
*c
) {
388 robj
*o
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.czero
);
389 if (o
== NULL
|| checkType(c
,o
,REDIS_LIST
)) return;
390 addReplyLongLong(c
,listTypeLength(o
));
393 void lindexCommand(redisClient
*c
) {
394 robj
*o
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.nullbulk
);
395 if (o
== NULL
|| checkType(c
,o
,REDIS_LIST
)) return;
399 if ((getLongFromObjectOrReply(c
, c
->argv
[2], &index
, NULL
) != REDIS_OK
))
402 if (o
->encoding
== REDIS_ENCODING_ZIPLIST
) {
407 p
= ziplistIndex(o
->ptr
,index
);
408 if (ziplistGet(p
,&vstr
,&vlen
,&vlong
)) {
410 value
= createStringObject((char*)vstr
,vlen
);
412 value
= createStringObjectFromLongLong(vlong
);
414 addReplyBulk(c
,value
);
417 addReply(c
,shared
.nullbulk
);
419 } else if (o
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
420 listNode
*ln
= listIndex(o
->ptr
,index
);
422 value
= listNodeValue(ln
);
423 addReplyBulk(c
,value
);
425 addReply(c
,shared
.nullbulk
);
428 redisPanic("Unknown list encoding");
432 void lsetCommand(redisClient
*c
) {
433 robj
*o
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.nokeyerr
);
434 if (o
== NULL
|| checkType(c
,o
,REDIS_LIST
)) return;
436 robj
*value
= (c
->argv
[3] = tryObjectEncoding(c
->argv
[3]));
438 if ((getLongFromObjectOrReply(c
, c
->argv
[2], &index
, NULL
) != REDIS_OK
))
441 listTypeTryConversion(o
,value
);
442 if (o
->encoding
== REDIS_ENCODING_ZIPLIST
) {
443 unsigned char *p
, *zl
= o
->ptr
;
444 p
= ziplistIndex(zl
,index
);
446 addReply(c
,shared
.outofrangeerr
);
448 o
->ptr
= ziplistDelete(o
->ptr
,&p
);
449 value
= getDecodedObject(value
);
450 o
->ptr
= ziplistInsert(o
->ptr
,p
,value
->ptr
,sdslen(value
->ptr
));
452 addReply(c
,shared
.ok
);
453 signalModifiedKey(c
->db
,c
->argv
[1]);
456 } else if (o
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
457 listNode
*ln
= listIndex(o
->ptr
,index
);
459 addReply(c
,shared
.outofrangeerr
);
461 decrRefCount((robj
*)listNodeValue(ln
));
462 listNodeValue(ln
) = value
;
464 addReply(c
,shared
.ok
);
465 signalModifiedKey(c
->db
,c
->argv
[1]);
469 redisPanic("Unknown list encoding");
473 void popGenericCommand(redisClient
*c
, int where
) {
474 robj
*o
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.nullbulk
);
475 if (o
== NULL
|| checkType(c
,o
,REDIS_LIST
)) return;
477 robj
*value
= listTypePop(o
,where
);
479 addReply(c
,shared
.nullbulk
);
481 addReplyBulk(c
,value
);
483 if (listTypeLength(o
) == 0) dbDelete(c
->db
,c
->argv
[1]);
484 signalModifiedKey(c
->db
,c
->argv
[1]);
489 void lpopCommand(redisClient
*c
) {
490 popGenericCommand(c
,REDIS_HEAD
);
493 void rpopCommand(redisClient
*c
) {
494 popGenericCommand(c
,REDIS_TAIL
);
497 void lrangeCommand(redisClient
*c
) {
499 long start
, end
, llen
, rangelen
;
501 if ((getLongFromObjectOrReply(c
, c
->argv
[2], &start
, NULL
) != REDIS_OK
) ||
502 (getLongFromObjectOrReply(c
, c
->argv
[3], &end
, NULL
) != REDIS_OK
)) return;
504 if ((o
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.emptymultibulk
)) == NULL
505 || checkType(c
,o
,REDIS_LIST
)) return;
506 llen
= listTypeLength(o
);
508 /* convert negative indexes */
509 if (start
< 0) start
= llen
+start
;
510 if (end
< 0) end
= llen
+end
;
511 if (start
< 0) start
= 0;
513 /* Invariant: start >= 0, so this test will be true when end < 0.
514 * The range is empty when start > end or start >= length. */
515 if (start
> end
|| start
>= llen
) {
516 addReply(c
,shared
.emptymultibulk
);
519 if (end
>= llen
) end
= llen
-1;
520 rangelen
= (end
-start
)+1;
522 /* Return the result in form of a multi-bulk reply */
523 addReplyMultiBulkLen(c
,rangelen
);
524 if (o
->encoding
== REDIS_ENCODING_ZIPLIST
) {
525 unsigned char *p
= ziplistIndex(o
->ptr
,start
);
531 ziplistGet(p
,&vstr
,&vlen
,&vlong
);
533 addReplyBulkCBuffer(c
,vstr
,vlen
);
535 addReplyBulkLongLong(c
,vlong
);
537 p
= ziplistNext(o
->ptr
,p
);
539 } else if (o
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
542 /* If we are nearest to the end of the list, reach the element
543 * starting from tail and going backward, as it is faster. */
544 if (start
> llen
/2) start
-= llen
;
545 ln
= listIndex(o
->ptr
,start
);
548 addReplyBulk(c
,ln
->value
);
552 redisPanic("List encoding is not LINKEDLIST nor ZIPLIST!");
556 void ltrimCommand(redisClient
*c
) {
558 long start
, end
, llen
, j
, ltrim
, rtrim
;
562 if ((getLongFromObjectOrReply(c
, c
->argv
[2], &start
, NULL
) != REDIS_OK
) ||
563 (getLongFromObjectOrReply(c
, c
->argv
[3], &end
, NULL
) != REDIS_OK
)) return;
565 if ((o
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.ok
)) == NULL
||
566 checkType(c
,o
,REDIS_LIST
)) return;
567 llen
= listTypeLength(o
);
569 /* convert negative indexes */
570 if (start
< 0) start
= llen
+start
;
571 if (end
< 0) end
= llen
+end
;
572 if (start
< 0) start
= 0;
574 /* Invariant: start >= 0, so this test will be true when end < 0.
575 * The range is empty when start > end or start >= length. */
576 if (start
> end
|| start
>= llen
) {
577 /* Out of range start or start > end result in empty list */
581 if (end
>= llen
) end
= llen
-1;
586 /* Remove list elements to perform the trim */
587 if (o
->encoding
== REDIS_ENCODING_ZIPLIST
) {
588 o
->ptr
= ziplistDeleteRange(o
->ptr
,0,ltrim
);
589 o
->ptr
= ziplistDeleteRange(o
->ptr
,-rtrim
,rtrim
);
590 } else if (o
->encoding
== REDIS_ENCODING_LINKEDLIST
) {
592 for (j
= 0; j
< ltrim
; j
++) {
593 ln
= listFirst(list
);
594 listDelNode(list
,ln
);
596 for (j
= 0; j
< rtrim
; j
++) {
598 listDelNode(list
,ln
);
601 redisPanic("Unknown list encoding");
603 if (listTypeLength(o
) == 0) dbDelete(c
->db
,c
->argv
[1]);
604 signalModifiedKey(c
->db
,c
->argv
[1]);
606 addReply(c
,shared
.ok
);
609 void lremCommand(redisClient
*c
) {
611 obj
= c
->argv
[3] = tryObjectEncoding(c
->argv
[3]);
616 if ((getLongFromObjectOrReply(c
, c
->argv
[2], &toremove
, NULL
) != REDIS_OK
))
619 subject
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.czero
);
620 if (subject
== NULL
|| checkType(c
,subject
,REDIS_LIST
)) return;
622 /* Make sure obj is raw when we're dealing with a ziplist */
623 if (subject
->encoding
== REDIS_ENCODING_ZIPLIST
)
624 obj
= getDecodedObject(obj
);
626 listTypeIterator
*li
;
628 toremove
= -toremove
;
629 li
= listTypeInitIterator(subject
,-1,REDIS_HEAD
);
631 li
= listTypeInitIterator(subject
,0,REDIS_TAIL
);
634 while (listTypeNext(li
,&entry
)) {
635 if (listTypeEqual(&entry
,obj
)) {
636 listTypeDelete(&entry
);
639 if (toremove
&& removed
== toremove
) break;
642 listTypeReleaseIterator(li
);
644 /* Clean up raw encoded object */
645 if (subject
->encoding
== REDIS_ENCODING_ZIPLIST
)
648 if (listTypeLength(subject
) == 0) dbDelete(c
->db
,c
->argv
[1]);
649 addReplyLongLong(c
,removed
);
650 if (removed
) signalModifiedKey(c
->db
,c
->argv
[1]);
653 /* This is the semantic of this command:
654 * RPOPLPUSH srclist dstlist:
655 * IF LLEN(srclist) > 0
656 * element = RPOP srclist
657 * LPUSH dstlist element
664 * The idea is to be able to get an element from a list in a reliable way
665 * since the element is not just returned but pushed against another list
666 * as well. This command was originally proposed by Ezra Zygmuntowicz.
669 void rpoplpushHandlePush(redisClient
*origclient
, redisClient
*c
, robj
*dstkey
, robj
*dstobj
, robj
*value
) {
670 if (!handleClientsWaitingListPush(origclient
,dstkey
,value
)) {
671 /* Create the list if the key does not exist */
673 dstobj
= createZiplistObject();
674 dbAdd(c
->db
,dstkey
,dstobj
);
676 signalModifiedKey(c
->db
,dstkey
);
678 listTypePush(dstobj
,value
,REDIS_HEAD
);
679 /* Additionally propagate this PUSH operation together with
680 * the operation performed by the command. */
682 robj
**argv
= zmalloc(sizeof(robj
*)*3);
683 argv
[0] = createStringObject("LPUSH",5);
686 incrRefCount(argv
[1]);
687 incrRefCount(argv
[2]);
688 alsoPropagate(server
.lpushCommand
,c
->db
->id
,argv
,3,
689 REDIS_PROPAGATE_AOF
|REDIS_PROPAGATE_REPL
);
692 /* Always send the pushed value to the client. */
693 addReplyBulk(c
,value
);
696 void rpoplpushCommand(redisClient
*c
) {
698 if ((sobj
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.nullbulk
)) == NULL
||
699 checkType(c
,sobj
,REDIS_LIST
)) return;
701 if (listTypeLength(sobj
) == 0) {
702 addReply(c
,shared
.nullbulk
);
704 robj
*dobj
= lookupKeyWrite(c
->db
,c
->argv
[2]);
705 robj
*touchedkey
= c
->argv
[1];
707 if (dobj
&& checkType(c
,dobj
,REDIS_LIST
)) return;
708 value
= listTypePop(sobj
,REDIS_TAIL
);
709 /* We saved touched key, and protect it, since rpoplpushHandlePush
710 * may change the client command argument vector. */
711 incrRefCount(touchedkey
);
712 rpoplpushHandlePush(c
,c
,c
->argv
[2],dobj
,value
);
714 /* listTypePop returns an object with its refcount incremented */
717 /* Delete the source list when it is empty */
718 if (listTypeLength(sobj
) == 0) dbDelete(c
->db
,touchedkey
);
719 signalModifiedKey(c
->db
,touchedkey
);
720 decrRefCount(touchedkey
);
723 /* Replicate this as a simple RPOP since the LPUSH side is replicated
724 * by rpoplpushHandlePush() call if needed (it may not be needed
725 * if a client is blocking wait a push against the list). */
726 rewriteClientCommandVector(c
,2,
727 resetRefCount(createStringObject("RPOP",4)),
732 /*-----------------------------------------------------------------------------
733 * Blocking POP operations
734 *----------------------------------------------------------------------------*/
736 /* Currently Redis blocking operations support is limited to list POP ops,
737 * so the current implementation is not fully generic, but it is also not
738 * completely specific so it will not require a rewrite to support new
739 * kind of blocking operations in the future.
741 * Still it's important to note that list blocking operations can be already
742 * used as a notification mechanism in order to implement other blocking
743 * operations at application level, so there must be a very strong evidence
744 * of usefulness and generality before new blocking operations are implemented.
746 * This is how the current blocking POP works, we use BLPOP as example:
747 * - If the user calls BLPOP and the key exists and contains a non empty list
748 * then LPOP is called instead. So BLPOP is semantically the same as LPOP
749 * if there is not to block.
750 * - If instead BLPOP is called and the key does not exists or the list is
751 * empty we need to block. In order to do so we remove the notification for
752 * new data to read in the client socket (so that we'll not serve new
753 * requests if the blocking request is not served). Also we put the client
754 * in a dictionary (db->blocking_keys) mapping keys to a list of clients
755 * blocking for this keys.
756 * - If a PUSH operation against a key with blocked clients waiting is
757 * performed, we serve the first in the list: basically instead to push
758 * the new element inside the list we return it to the (first / oldest)
759 * blocking client, unblock the client, and remove it form the list.
761 * The above comment and the source code should be enough in order to understand
762 * the implementation and modify / fix it later.
765 /* Set a client in blocking mode for the specified key, with the specified
767 void blockForKeys(redisClient
*c
, robj
**keys
, int numkeys
, time_t timeout
, robj
*target
) {
772 c
->bpop
.keys
= zmalloc(sizeof(robj
*)*numkeys
);
773 c
->bpop
.count
= numkeys
;
774 c
->bpop
.timeout
= timeout
;
775 c
->bpop
.target
= target
;
777 if (target
!= NULL
) {
778 incrRefCount(target
);
781 for (j
= 0; j
< numkeys
; j
++) {
782 /* Add the key in the client structure, to map clients -> keys */
783 c
->bpop
.keys
[j
] = keys
[j
];
784 incrRefCount(keys
[j
]);
786 /* And in the other "side", to map keys -> clients */
787 de
= dictFind(c
->db
->blocking_keys
,keys
[j
]);
791 /* For every key we take a list of clients blocked for it */
793 retval
= dictAdd(c
->db
->blocking_keys
,keys
[j
],l
);
794 incrRefCount(keys
[j
]);
795 redisAssertWithInfo(c
,keys
[j
],retval
== DICT_OK
);
799 listAddNodeTail(l
,c
);
801 /* Mark the client as a blocked client */
802 c
->flags
|= REDIS_BLOCKED
;
803 server
.bpop_blocked_clients
++;
806 /* Unblock a client that's waiting in a blocking operation such as BLPOP */
807 void unblockClientWaitingData(redisClient
*c
) {
812 redisAssertWithInfo(c
,NULL
,c
->bpop
.keys
!= NULL
);
813 /* The client may wait for multiple keys, so unblock it for every key. */
814 for (j
= 0; j
< c
->bpop
.count
; j
++) {
815 /* Remove this client from the list of clients waiting for this key. */
816 de
= dictFind(c
->db
->blocking_keys
,c
->bpop
.keys
[j
]);
817 redisAssertWithInfo(c
,c
->bpop
.keys
[j
],de
!= NULL
);
819 listDelNode(l
,listSearchKey(l
,c
));
820 /* If the list is empty we need to remove it to avoid wasting memory */
821 if (listLength(l
) == 0)
822 dictDelete(c
->db
->blocking_keys
,c
->bpop
.keys
[j
]);
823 decrRefCount(c
->bpop
.keys
[j
]);
826 /* Cleanup the client structure */
829 if (c
->bpop
.target
) decrRefCount(c
->bpop
.target
);
830 c
->bpop
.target
= NULL
;
831 c
->flags
&= ~REDIS_BLOCKED
;
832 c
->flags
|= REDIS_UNBLOCKED
;
833 server
.bpop_blocked_clients
--;
834 listAddNodeTail(server
.unblocked_clients
,c
);
837 /* This should be called from any function PUSHing into lists.
838 * 'c' is the "pushing client", 'key' is the key it is pushing data against,
839 * 'ele' is the element pushed.
841 * If the function returns 0 there was no client waiting for a list push
844 * If the function returns 1 there was a client waiting for a list push
845 * against this key, the element was passed to this client thus it's not
846 * needed to actually add it to the list and the caller should return asap. */
847 int handleClientsWaitingListPush(redisClient
*c
, robj
*key
, robj
*ele
) {
848 struct dictEntry
*de
;
849 redisClient
*receiver
;
853 robj
*dstkey
, *dstobj
;
855 de
= dictFind(c
->db
->blocking_keys
,key
);
856 if (de
== NULL
) return 0;
857 clients
= dictGetVal(de
);
858 numclients
= listLength(clients
);
860 /* Try to handle the push as long as there are clients waiting for a push.
861 * Note that "numclients" is used because the list of clients waiting for a
862 * push on "key" is deleted by unblockClient() when empty.
864 * This loop will have more than 1 iteration when there is a BRPOPLPUSH
865 * that cannot push the target list because it does not contain a list. If
866 * this happens, it simply tries the next client waiting for a push. */
867 while (numclients
--) {
868 ln
= listFirst(clients
);
869 redisAssertWithInfo(c
,key
,ln
!= NULL
);
870 receiver
= ln
->value
;
871 dstkey
= receiver
->bpop
.target
;
873 /* Protect receiver->bpop.target, that will be freed by
874 * the next unblockClientWaitingData() call. */
875 if (dstkey
) incrRefCount(dstkey
);
877 /* This should remove the first element of the "clients" list. */
878 unblockClientWaitingData(receiver
);
880 if (dstkey
== NULL
) {
882 addReplyMultiBulkLen(receiver
,2);
883 addReplyBulk(receiver
,key
);
884 addReplyBulk(receiver
,ele
);
885 return 1; /* Serve just the first client as in B[RL]POP semantics */
887 /* BRPOPLPUSH, note that receiver->db is always equal to c->db. */
888 dstobj
= lookupKeyWrite(receiver
->db
,dstkey
);
889 if (!(dstobj
&& checkType(receiver
,dstobj
,REDIS_LIST
))) {
890 rpoplpushHandlePush(c
,receiver
,dstkey
,dstobj
,ele
);
891 decrRefCount(dstkey
);
894 decrRefCount(dstkey
);
901 int getTimeoutFromObjectOrReply(redisClient
*c
, robj
*object
, time_t *timeout
) {
904 if (getLongFromObjectOrReply(c
,object
,&tval
,
905 "timeout is not an integer or out of range") != REDIS_OK
)
909 addReplyError(c
,"timeout is negative");
913 if (tval
> 0) tval
+= time(NULL
);
919 /* Blocking RPOP/LPOP */
920 void blockingPopGenericCommand(redisClient
*c
, int where
) {
925 if (getTimeoutFromObjectOrReply(c
,c
->argv
[c
->argc
-1],&timeout
) != REDIS_OK
)
928 for (j
= 1; j
< c
->argc
-1; j
++) {
929 o
= lookupKeyWrite(c
->db
,c
->argv
[j
]);
931 if (o
->type
!= REDIS_LIST
) {
932 addReply(c
,shared
.wrongtypeerr
);
935 if (listTypeLength(o
) != 0) {
936 /* Non empty list, this is like a non normal [LR]POP. */
937 robj
*value
= listTypePop(o
,where
);
938 redisAssert(value
!= NULL
);
940 addReplyMultiBulkLen(c
,2);
941 addReplyBulk(c
,c
->argv
[j
]);
942 addReplyBulk(c
,value
);
944 if (listTypeLength(o
) == 0) dbDelete(c
->db
,c
->argv
[j
]);
945 signalModifiedKey(c
->db
,c
->argv
[j
]);
948 /* Replicate it as an [LR]POP instead of B[LR]POP. */
949 rewriteClientCommandVector(c
,2,
950 (where
== REDIS_HEAD
) ? shared
.lpop
: shared
.rpop
,
958 /* If we are inside a MULTI/EXEC and the list is empty the only thing
959 * we can do is treating it as a timeout (even with timeout 0). */
960 if (c
->flags
& REDIS_MULTI
) {
961 addReply(c
,shared
.nullmultibulk
);
965 /* If the list is empty or the key does not exists we must block */
966 blockForKeys(c
, c
->argv
+ 1, c
->argc
- 2, timeout
, NULL
);
969 void blpopCommand(redisClient
*c
) {
970 blockingPopGenericCommand(c
,REDIS_HEAD
);
973 void brpopCommand(redisClient
*c
) {
974 blockingPopGenericCommand(c
,REDIS_TAIL
);
977 void brpoplpushCommand(redisClient
*c
) {
980 if (getTimeoutFromObjectOrReply(c
,c
->argv
[3],&timeout
) != REDIS_OK
)
983 robj
*key
= lookupKeyWrite(c
->db
, c
->argv
[1]);
986 if (c
->flags
& REDIS_MULTI
) {
988 /* Blocking against an empty list in a multi state
989 * returns immediately. */
990 addReply(c
, shared
.nullbulk
);
992 /* The list is empty and the client blocks. */
993 blockForKeys(c
, c
->argv
+ 1, 1, timeout
, c
->argv
[2]);
996 if (key
->type
!= REDIS_LIST
) {
997 addReply(c
, shared
.wrongtypeerr
);
1000 /* The list exists and has elements, so
1001 * the regular rpoplpushCommand is executed. */
1002 redisAssertWithInfo(c
,key
,listTypeLength(key
) > 0);
1003 rpoplpushCommand(c
);