/*-----------------------------------------------------------------------------
 * List API
 *----------------------------------------------------------------------------*/
   7 /* Check the argument length to see if it requires us to convert the ziplist 
   8  * to a real list. Only check raw-encoded objects because integer encoded 
   9  * objects are never too long. */ 
  10 void listTypeTryConversion(robj 
*subject
, robj 
*value
) { 
  11     if (subject
->encoding 
!= REDIS_ENCODING_ZIPLIST
) return; 
  12     if (value
->encoding 
== REDIS_ENCODING_RAW 
&& 
  13         sdslen(value
->ptr
) > server
.list_max_ziplist_value
) 
  14             listTypeConvert(subject
,REDIS_ENCODING_LINKEDLIST
); 
  17 void listTypePush(robj 
*subject
, robj 
*value
, int where
) { 
  18     /* Check if we need to convert the ziplist */ 
  19     listTypeTryConversion(subject
,value
); 
  20     if (subject
->encoding 
== REDIS_ENCODING_ZIPLIST 
&& 
  21         ziplistLen(subject
->ptr
) >= server
.list_max_ziplist_entries
) 
  22             listTypeConvert(subject
,REDIS_ENCODING_LINKEDLIST
); 
  24     if (subject
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
  25         int pos 
= (where 
== REDIS_HEAD
) ? ZIPLIST_HEAD 
: ZIPLIST_TAIL
; 
  26         value 
= getDecodedObject(value
); 
  27         subject
->ptr 
= ziplistPush(subject
->ptr
,value
->ptr
,sdslen(value
->ptr
),pos
); 
  29     } else if (subject
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
  30         if (where 
== REDIS_HEAD
) { 
  31             listAddNodeHead(subject
->ptr
,value
); 
  33             listAddNodeTail(subject
->ptr
,value
); 
  37         redisPanic("Unknown list encoding"); 
  41 robj 
*listTypePop(robj 
*subject
, int where
) { 
  43     if (subject
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
  48         int pos 
= (where 
== REDIS_HEAD
) ? 0 : -1; 
  49         p 
= ziplistIndex(subject
->ptr
,pos
); 
  50         if (ziplistGet(p
,&vstr
,&vlen
,&vlong
)) { 
  52                 value 
= createStringObject((char*)vstr
,vlen
); 
  54                 value 
= createStringObjectFromLongLong(vlong
); 
  56             /* We only need to delete an element when it exists */ 
  57             subject
->ptr 
= ziplistDelete(subject
->ptr
,&p
); 
  59     } else if (subject
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
  60         list 
*list 
= subject
->ptr
; 
  62         if (where 
== REDIS_HEAD
) { 
  68             value 
= listNodeValue(ln
); 
  73         redisPanic("Unknown list encoding"); 
  78 unsigned long listTypeLength(robj 
*subject
) { 
  79     if (subject
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
  80         return ziplistLen(subject
->ptr
); 
  81     } else if (subject
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
  82         return listLength((list
*)subject
->ptr
); 
  84         redisPanic("Unknown list encoding"); 
  88 /* Initialize an iterator at the specified index. */ 
  89 listTypeIterator 
*listTypeInitIterator(robj 
*subject
, int index
, unsigned char direction
) { 
  90     listTypeIterator 
*li 
= zmalloc(sizeof(listTypeIterator
)); 
  91     li
->subject 
= subject
; 
  92     li
->encoding 
= subject
->encoding
; 
  93     li
->direction 
= direction
; 
  94     if (li
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
  95         li
->zi 
= ziplistIndex(subject
->ptr
,index
); 
  96     } else if (li
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
  97         li
->ln 
= listIndex(subject
->ptr
,index
); 
  99         redisPanic("Unknown list encoding"); 
 104 /* Clean up the iterator. */ 
 105 void listTypeReleaseIterator(listTypeIterator 
*li
) { 
 109 /* Stores pointer to current the entry in the provided entry structure 
 110  * and advances the position of the iterator. Returns 1 when the current 
 111  * entry is in fact an entry, 0 otherwise. */ 
 112 int listTypeNext(listTypeIterator 
*li
, listTypeEntry 
*entry
) { 
 113     /* Protect from converting when iterating */ 
 114     redisAssert(li
->subject
->encoding 
== li
->encoding
); 
 117     if (li
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
 119         if (entry
->zi 
!= NULL
) { 
 120             if (li
->direction 
== REDIS_TAIL
) 
 121                 li
->zi 
= ziplistNext(li
->subject
->ptr
,li
->zi
); 
 123                 li
->zi 
= ziplistPrev(li
->subject
->ptr
,li
->zi
); 
 126     } else if (li
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
 128         if (entry
->ln 
!= NULL
) { 
 129             if (li
->direction 
== REDIS_TAIL
) 
 130                 li
->ln 
= li
->ln
->next
; 
 132                 li
->ln 
= li
->ln
->prev
; 
 136         redisPanic("Unknown list encoding"); 
 141 /* Return entry or NULL at the current position of the iterator. */ 
 142 robj 
*listTypeGet(listTypeEntry 
*entry
) { 
 143     listTypeIterator 
*li 
= entry
->li
; 
 145     if (li
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
 149         redisAssert(entry
->zi 
!= NULL
); 
 150         if (ziplistGet(entry
->zi
,&vstr
,&vlen
,&vlong
)) { 
 152                 value 
= createStringObject((char*)vstr
,vlen
); 
 154                 value 
= createStringObjectFromLongLong(vlong
); 
 157     } else if (li
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
 158         redisAssert(entry
->ln 
!= NULL
); 
 159         value 
= listNodeValue(entry
->ln
); 
 162         redisPanic("Unknown list encoding"); 
 167 void listTypeInsert(listTypeEntry 
*entry
, robj 
*value
, int where
) { 
 168     robj 
*subject 
= entry
->li
->subject
; 
 169     if (entry
->li
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
 170         value 
= getDecodedObject(value
); 
 171         if (where 
== REDIS_TAIL
) { 
 172             unsigned char *next 
= ziplistNext(subject
->ptr
,entry
->zi
); 
 174             /* When we insert after the current element, but the current element 
 175              * is the tail of the list, we need to do a push. */ 
 177                 subject
->ptr 
= ziplistPush(subject
->ptr
,value
->ptr
,sdslen(value
->ptr
),REDIS_TAIL
); 
 179                 subject
->ptr 
= ziplistInsert(subject
->ptr
,next
,value
->ptr
,sdslen(value
->ptr
)); 
 182             subject
->ptr 
= ziplistInsert(subject
->ptr
,entry
->zi
,value
->ptr
,sdslen(value
->ptr
)); 
 185     } else if (entry
->li
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
 186         if (where 
== REDIS_TAIL
) { 
 187             listInsertNode(subject
->ptr
,entry
->ln
,value
,AL_START_TAIL
); 
 189             listInsertNode(subject
->ptr
,entry
->ln
,value
,AL_START_HEAD
); 
 193         redisPanic("Unknown list encoding"); 
 197 /* Compare the given object with the entry at the current position. */ 
 198 int listTypeEqual(listTypeEntry 
*entry
, robj 
*o
) { 
 199     listTypeIterator 
*li 
= entry
->li
; 
 200     if (li
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
 201         redisAssert(o
->encoding 
== REDIS_ENCODING_RAW
); 
 202         return ziplistCompare(entry
->zi
,o
->ptr
,sdslen(o
->ptr
)); 
 203     } else if (li
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
 204         return equalStringObjects(o
,listNodeValue(entry
->ln
)); 
 206         redisPanic("Unknown list encoding"); 
 210 /* Delete the element pointed to. */ 
 211 void listTypeDelete(listTypeEntry 
*entry
) { 
 212     listTypeIterator 
*li 
= entry
->li
; 
 213     if (li
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
 214         unsigned char *p 
= entry
->zi
; 
 215         li
->subject
->ptr 
= ziplistDelete(li
->subject
->ptr
,&p
); 
 217         /* Update position of the iterator depending on the direction */ 
 218         if (li
->direction 
== REDIS_TAIL
) 
 221             li
->zi 
= ziplistPrev(li
->subject
->ptr
,p
); 
 222     } else if (entry
->li
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
 224         if (li
->direction 
== REDIS_TAIL
) 
 225             next 
= entry
->ln
->next
; 
 227             next 
= entry
->ln
->prev
; 
 228         listDelNode(li
->subject
->ptr
,entry
->ln
); 
 231         redisPanic("Unknown list encoding"); 
 235 void listTypeConvert(robj 
*subject
, int enc
) { 
 236     listTypeIterator 
*li
; 
 238     redisAssert(subject
->type 
== REDIS_LIST
); 
 240     if (enc 
== REDIS_ENCODING_LINKEDLIST
) { 
 241         list 
*l 
= listCreate(); 
 242         listSetFreeMethod(l
,decrRefCount
); 
 244         /* listTypeGet returns a robj with incremented refcount */ 
 245         li 
= listTypeInitIterator(subject
,0,REDIS_TAIL
); 
 246         while (listTypeNext(li
,&entry
)) listAddNodeTail(l
,listTypeGet(&entry
)); 
 247         listTypeReleaseIterator(li
); 
 249         subject
->encoding 
= REDIS_ENCODING_LINKEDLIST
; 
 253         redisPanic("Unsupported list conversion"); 
/*-----------------------------------------------------------------------------
 * List Commands
 *----------------------------------------------------------------------------*/
 261 void pushGenericCommand(redisClient 
*c
, int where
) { 
 262     int j
, addlen 
= 0, pushed 
= 0; 
 263     robj 
*lobj 
= lookupKeyWrite(c
->db
,c
->argv
[1]); 
 264     int may_have_waiting_clients 
= (lobj 
== NULL
); 
 266     if (lobj 
&& lobj
->type 
!= REDIS_LIST
) { 
 267         addReply(c
,shared
.wrongtypeerr
); 
 271     for (j 
= 2; j 
< c
->argc
; j
++) { 
 272         c
->argv
[j
] = tryObjectEncoding(c
->argv
[j
]); 
 273         if (may_have_waiting_clients
) { 
 274             if (handleClientsWaitingListPush(c
,c
->argv
[1],c
->argv
[j
])) { 
 278                 may_have_waiting_clients 
= 0; 
 282             lobj 
= createZiplistObject(); 
 283             dbAdd(c
->db
,c
->argv
[1],lobj
); 
 285         listTypePush(lobj
,c
->argv
[j
],where
); 
 288     addReplyLongLong(c
,addlen 
+ (lobj 
? listTypeLength(lobj
) : 0)); 
 289     if (pushed
) signalModifiedKey(c
->db
,c
->argv
[1]); 
 290     server
.dirty 
+= pushed
; 
 293 void lpushCommand(redisClient 
*c
) { 
 294     pushGenericCommand(c
,REDIS_HEAD
); 
 297 void rpushCommand(redisClient 
*c
) { 
 298     pushGenericCommand(c
,REDIS_TAIL
); 
 301 void pushxGenericCommand(redisClient 
*c
, robj 
*refval
, robj 
*val
, int where
) { 
 303     listTypeIterator 
*iter
; 
 307     if ((subject 
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.czero
)) == NULL 
|| 
 308         checkType(c
,subject
,REDIS_LIST
)) return; 
 310     if (refval 
!= NULL
) { 
 311         /* Note: we expect refval to be string-encoded because it is *not* the 
 312          * last argument of the multi-bulk LINSERT. */ 
 313         redisAssert(refval
->encoding 
== REDIS_ENCODING_RAW
); 
 315         /* We're not sure if this value can be inserted yet, but we cannot 
 316          * convert the list inside the iterator. We don't want to loop over 
 317          * the list twice (once to see if the value can be inserted and once 
 318          * to do the actual insert), so we assume this value can be inserted 
 319          * and convert the ziplist to a regular list if necessary. */ 
 320         listTypeTryConversion(subject
,val
); 
 322         /* Seek refval from head to tail */ 
 323         iter 
= listTypeInitIterator(subject
,0,REDIS_TAIL
); 
 324         while (listTypeNext(iter
,&entry
)) { 
 325             if (listTypeEqual(&entry
,refval
)) { 
 326                 listTypeInsert(&entry
,val
,where
); 
 331         listTypeReleaseIterator(iter
); 
 334             /* Check if the length exceeds the ziplist length threshold. */ 
 335             if (subject
->encoding 
== REDIS_ENCODING_ZIPLIST 
&& 
 336                 ziplistLen(subject
->ptr
) > server
.list_max_ziplist_entries
) 
 337                     listTypeConvert(subject
,REDIS_ENCODING_LINKEDLIST
); 
 338             signalModifiedKey(c
->db
,c
->argv
[1]); 
 341             /* Notify client of a failed insert */ 
 342             addReply(c
,shared
.cnegone
); 
 346         listTypePush(subject
,val
,where
); 
 347         signalModifiedKey(c
->db
,c
->argv
[1]); 
 351     addReplyLongLong(c
,listTypeLength(subject
)); 
 354 void lpushxCommand(redisClient 
*c
) { 
 355     c
->argv
[2] = tryObjectEncoding(c
->argv
[2]); 
 356     pushxGenericCommand(c
,NULL
,c
->argv
[2],REDIS_HEAD
); 
 359 void rpushxCommand(redisClient 
*c
) { 
 360     c
->argv
[2] = tryObjectEncoding(c
->argv
[2]); 
 361     pushxGenericCommand(c
,NULL
,c
->argv
[2],REDIS_TAIL
); 
 364 void linsertCommand(redisClient 
*c
) { 
 365     c
->argv
[4] = tryObjectEncoding(c
->argv
[4]); 
 366     if (strcasecmp(c
->argv
[2]->ptr
,"after") == 0) { 
 367         pushxGenericCommand(c
,c
->argv
[3],c
->argv
[4],REDIS_TAIL
); 
 368     } else if (strcasecmp(c
->argv
[2]->ptr
,"before") == 0) { 
 369         pushxGenericCommand(c
,c
->argv
[3],c
->argv
[4],REDIS_HEAD
); 
 371         addReply(c
,shared
.syntaxerr
); 
 375 void llenCommand(redisClient 
*c
) { 
 376     robj 
*o 
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.czero
); 
 377     if (o 
== NULL 
|| checkType(c
,o
,REDIS_LIST
)) return; 
 378     addReplyLongLong(c
,listTypeLength(o
)); 
 381 void lindexCommand(redisClient 
*c
) { 
 382     robj 
*o 
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.nullbulk
); 
 383     if (o 
== NULL 
|| checkType(c
,o
,REDIS_LIST
)) return; 
 384     int index 
= atoi(c
->argv
[2]->ptr
); 
 387     if (o
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
 392         p 
= ziplistIndex(o
->ptr
,index
); 
 393         if (ziplistGet(p
,&vstr
,&vlen
,&vlong
)) { 
 395                 value 
= createStringObject((char*)vstr
,vlen
); 
 397                 value 
= createStringObjectFromLongLong(vlong
); 
 399             addReplyBulk(c
,value
); 
 402             addReply(c
,shared
.nullbulk
); 
 404     } else if (o
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
 405         listNode 
*ln 
= listIndex(o
->ptr
,index
); 
 407             value 
= listNodeValue(ln
); 
 408             addReplyBulk(c
,value
); 
 410             addReply(c
,shared
.nullbulk
); 
 413         redisPanic("Unknown list encoding"); 
 417 void lsetCommand(redisClient 
*c
) { 
 418     robj 
*o 
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.nokeyerr
); 
 419     if (o 
== NULL 
|| checkType(c
,o
,REDIS_LIST
)) return; 
 420     int index 
= atoi(c
->argv
[2]->ptr
); 
 421     robj 
*value 
= (c
->argv
[3] = tryObjectEncoding(c
->argv
[3])); 
 423     listTypeTryConversion(o
,value
); 
 424     if (o
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
 425         unsigned char *p
, *zl 
= o
->ptr
; 
 426         p 
= ziplistIndex(zl
,index
); 
 428             addReply(c
,shared
.outofrangeerr
); 
 430             o
->ptr 
= ziplistDelete(o
->ptr
,&p
); 
 431             value 
= getDecodedObject(value
); 
 432             o
->ptr 
= ziplistInsert(o
->ptr
,p
,value
->ptr
,sdslen(value
->ptr
)); 
 434             addReply(c
,shared
.ok
); 
 435             signalModifiedKey(c
->db
,c
->argv
[1]); 
 438     } else if (o
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
 439         listNode 
*ln 
= listIndex(o
->ptr
,index
); 
 441             addReply(c
,shared
.outofrangeerr
); 
 443             decrRefCount((robj
*)listNodeValue(ln
)); 
 444             listNodeValue(ln
) = value
; 
 446             addReply(c
,shared
.ok
); 
 447             signalModifiedKey(c
->db
,c
->argv
[1]); 
 451         redisPanic("Unknown list encoding"); 
 455 void popGenericCommand(redisClient 
*c
, int where
) { 
 456     robj 
*o 
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.nullbulk
); 
 457     if (o 
== NULL 
|| checkType(c
,o
,REDIS_LIST
)) return; 
 459     robj 
*value 
= listTypePop(o
,where
); 
 461         addReply(c
,shared
.nullbulk
); 
 463         addReplyBulk(c
,value
); 
 465         if (listTypeLength(o
) == 0) dbDelete(c
->db
,c
->argv
[1]); 
 466         signalModifiedKey(c
->db
,c
->argv
[1]); 
 471 void lpopCommand(redisClient 
*c
) { 
 472     popGenericCommand(c
,REDIS_HEAD
); 
 475 void rpopCommand(redisClient 
*c
) { 
 476     popGenericCommand(c
,REDIS_TAIL
); 
 479 void lrangeCommand(redisClient 
*c
) { 
 481     int start 
= atoi(c
->argv
[2]->ptr
); 
 482     int end 
= atoi(c
->argv
[3]->ptr
); 
 486     if ((o 
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.emptymultibulk
)) == NULL
 
 487          || checkType(c
,o
,REDIS_LIST
)) return; 
 488     llen 
= listTypeLength(o
); 
 490     /* convert negative indexes */ 
 491     if (start 
< 0) start 
= llen
+start
; 
 492     if (end 
< 0) end 
= llen
+end
; 
 493     if (start 
< 0) start 
= 0; 
 495     /* Invariant: start >= 0, so this test will be true when end < 0. 
 496      * The range is empty when start > end or start >= length. */ 
 497     if (start 
> end 
|| start 
>= llen
) { 
 498         addReply(c
,shared
.emptymultibulk
); 
 501     if (end 
>= llen
) end 
= llen
-1; 
 502     rangelen 
= (end
-start
)+1; 
 504     /* Return the result in form of a multi-bulk reply */ 
 505     addReplyMultiBulkLen(c
,rangelen
); 
 506     if (o
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
 507         unsigned char *p 
= ziplistIndex(o
->ptr
,start
); 
 513             ziplistGet(p
,&vstr
,&vlen
,&vlong
); 
 515                 addReplyBulkCBuffer(c
,vstr
,vlen
); 
 517                 addReplyBulkLongLong(c
,vlong
); 
 519             p 
= ziplistNext(o
->ptr
,p
); 
 521     } else if (o
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
 522         listNode 
*ln 
= listIndex(o
->ptr
,start
); 
 525             addReplyBulk(c
,ln
->value
); 
 529         redisPanic("List encoding is not LINKEDLIST nor ZIPLIST!"); 
 533 void ltrimCommand(redisClient 
*c
) { 
 535     int start 
= atoi(c
->argv
[2]->ptr
); 
 536     int end 
= atoi(c
->argv
[3]->ptr
); 
 542     if ((o 
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.ok
)) == NULL 
|| 
 543         checkType(c
,o
,REDIS_LIST
)) return; 
 544     llen 
= listTypeLength(o
); 
 546     /* convert negative indexes */ 
 547     if (start 
< 0) start 
= llen
+start
; 
 548     if (end 
< 0) end 
= llen
+end
; 
 549     if (start 
< 0) start 
= 0; 
 551     /* Invariant: start >= 0, so this test will be true when end < 0. 
 552      * The range is empty when start > end or start >= length. */ 
 553     if (start 
> end 
|| start 
>= llen
) { 
 554         /* Out of range start or start > end result in empty list */ 
 558         if (end 
>= llen
) end 
= llen
-1; 
 563     /* Remove list elements to perform the trim */ 
 564     if (o
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
 565         o
->ptr 
= ziplistDeleteRange(o
->ptr
,0,ltrim
); 
 566         o
->ptr 
= ziplistDeleteRange(o
->ptr
,-rtrim
,rtrim
); 
 567     } else if (o
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
 569         for (j 
= 0; j 
< ltrim
; j
++) { 
 570             ln 
= listFirst(list
); 
 571             listDelNode(list
,ln
); 
 573         for (j 
= 0; j 
< rtrim
; j
++) { 
 575             listDelNode(list
,ln
); 
 578         redisPanic("Unknown list encoding"); 
 580     if (listTypeLength(o
) == 0) dbDelete(c
->db
,c
->argv
[1]); 
 581     signalModifiedKey(c
->db
,c
->argv
[1]); 
 583     addReply(c
,shared
.ok
); 
 586 void lremCommand(redisClient 
*c
) { 
 588     obj 
= c
->argv
[3] = tryObjectEncoding(c
->argv
[3]); 
 589     int toremove 
= atoi(c
->argv
[2]->ptr
); 
 593     subject 
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.czero
); 
 594     if (subject 
== NULL 
|| checkType(c
,subject
,REDIS_LIST
)) return; 
 596     /* Make sure obj is raw when we're dealing with a ziplist */ 
 597     if (subject
->encoding 
== REDIS_ENCODING_ZIPLIST
) 
 598         obj 
= getDecodedObject(obj
); 
 600     listTypeIterator 
*li
; 
 602         toremove 
= -toremove
; 
 603         li 
= listTypeInitIterator(subject
,-1,REDIS_HEAD
); 
 605         li 
= listTypeInitIterator(subject
,0,REDIS_TAIL
); 
 608     while (listTypeNext(li
,&entry
)) { 
 609         if (listTypeEqual(&entry
,obj
)) { 
 610             listTypeDelete(&entry
); 
 613             if (toremove 
&& removed 
== toremove
) break; 
 616     listTypeReleaseIterator(li
); 
 618     /* Clean up raw encoded object */ 
 619     if (subject
->encoding 
== REDIS_ENCODING_ZIPLIST
) 
 622     if (listTypeLength(subject
) == 0) dbDelete(c
->db
,c
->argv
[1]); 
 623     addReplyLongLong(c
,removed
); 
 624     if (removed
) signalModifiedKey(c
->db
,c
->argv
[1]); 
 627 /* This is the semantic of this command: 
 628  *  RPOPLPUSH srclist dstlist: 
 629  *    IF LLEN(srclist) > 0 
 630  *      element = RPOP srclist 
 631  *      LPUSH dstlist element 
 638  * The idea is to be able to get an element from a list in a reliable way 
 639  * since the element is not just returned but pushed against another list 
 640  * as well. This command was originally proposed by Ezra Zygmuntowicz. 
 643 void rpoplpushHandlePush(redisClient 
*c
, robj 
*dstkey
, robj 
*dstobj
, robj 
*value
) { 
 644     if (!handleClientsWaitingListPush(c
,dstkey
,value
)) { 
 645         /* Create the list if the key does not exist */ 
 647             dstobj 
= createZiplistObject(); 
 648             dbAdd(c
->db
,dstkey
,dstobj
); 
 650             signalModifiedKey(c
->db
,dstkey
); 
 653         listTypePush(dstobj
,value
,REDIS_HEAD
); 
 656     /* Always send the pushed value to the client. */ 
 657     addReplyBulk(c
,value
); 
 660 void rpoplpushCommand(redisClient 
*c
) { 
 662     if ((sobj 
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.nullbulk
)) == NULL 
|| 
 663         checkType(c
,sobj
,REDIS_LIST
)) return; 
 665     if (listTypeLength(sobj
) == 0) { 
 666         addReply(c
,shared
.nullbulk
); 
 668         robj 
*dobj 
= lookupKeyWrite(c
->db
,c
->argv
[2]); 
 669         if (dobj 
&& checkType(c
,dobj
,REDIS_LIST
)) return; 
 670         value 
= listTypePop(sobj
,REDIS_TAIL
); 
 671         rpoplpushHandlePush(c
,c
->argv
[2],dobj
,value
); 
 673         /* listTypePop returns an object with its refcount incremented */ 
 676         /* Delete the source list when it is empty */ 
 677         if (listTypeLength(sobj
) == 0) dbDelete(c
->db
,c
->argv
[1]); 
 678         signalModifiedKey(c
->db
,c
->argv
[1]); 
/*-----------------------------------------------------------------------------
 * Blocking POP operations
 *----------------------------------------------------------------------------*/

/* Currently Redis blocking operations support is limited to list POP ops,
 * so the current implementation is not fully generic, but it is also not
 * completely specific, so it will not require a rewrite to support new
 * kinds of blocking operations in the future.
 *
 * Still, it's important to note that list blocking operations can already be
 * used as a notification mechanism in order to implement other blocking
 * operations at application level, so there must be very strong evidence
 * of usefulness and generality before new blocking operations are implemented.
 *
 * This is how the current blocking POP works, using BLPOP as an example:
 * - If the user calls BLPOP and the key exists and contains a non-empty list
 *   then LPOP is called instead. So BLPOP is semantically the same as LPOP
 *   if there is no need to block.
 * - If instead BLPOP is called and the key does not exist or the list is
 *   empty we need to block. In order to do so we remove the notification for
 *   new data to read in the client socket (so that we'll not serve new
 *   requests if the blocking request is not served). We also put the client
 *   in a dictionary (db->blocking_keys) mapping keys to a list of clients
 *   blocking on those keys.
 * - If a PUSH operation against a key with blocked clients waiting is
 *   performed, we serve the first in the list: basically, instead of pushing
 *   the new element inside the list we return it to the (first / oldest)
 *   blocking client, unblock the client, and remove it from the list.
 *
 * The above comment and the source code should be enough to understand
 * the implementation and modify / fix it later.
 */
 716 /* Set a client in blocking mode for the specified key, with the specified 
 718 void blockForKeys(redisClient 
*c
, robj 
**keys
, int numkeys
, time_t timeout
, robj 
*target
) { 
 723     c
->bpop
.keys 
= zmalloc(sizeof(robj
*)*numkeys
); 
 724     c
->bpop
.count 
= numkeys
; 
 725     c
->bpop
.timeout 
= timeout
; 
 726     c
->bpop
.target 
= target
; 
 728     if (target 
!= NULL
) { 
 729         incrRefCount(target
); 
 732     for (j 
= 0; j 
< numkeys
; j
++) { 
 733         /* Add the key in the client structure, to map clients -> keys */ 
 734         c
->bpop
.keys
[j
] = keys
[j
]; 
 735         incrRefCount(keys
[j
]); 
 737         /* And in the other "side", to map keys -> clients */ 
 738         de 
= dictFind(c
->db
->blocking_keys
,keys
[j
]); 
 742             /* For every key we take a list of clients blocked for it */ 
 744             retval 
= dictAdd(c
->db
->blocking_keys
,keys
[j
],l
); 
 745             incrRefCount(keys
[j
]); 
 746             redisAssert(retval 
== DICT_OK
); 
 748             l 
= dictGetEntryVal(de
); 
 750         listAddNodeTail(l
,c
); 
 752     /* Mark the client as a blocked client */ 
 753     c
->flags 
|= REDIS_BLOCKED
; 
 754     server
.bpop_blocked_clients
++; 
 757 /* Unblock a client that's waiting in a blocking operation such as BLPOP */ 
 758 void unblockClientWaitingData(redisClient 
*c
) { 
 763     redisAssert(c
->bpop
.keys 
!= NULL
); 
 764     /* The client may wait for multiple keys, so unblock it for every key. */ 
 765     for (j 
= 0; j 
< c
->bpop
.count
; j
++) { 
 766         /* Remove this client from the list of clients waiting for this key. */ 
 767         de 
= dictFind(c
->db
->blocking_keys
,c
->bpop
.keys
[j
]); 
 768         redisAssert(de 
!= NULL
); 
 769         l 
= dictGetEntryVal(de
); 
 770         listDelNode(l
,listSearchKey(l
,c
)); 
 771         /* If the list is empty we need to remove it to avoid wasting memory */ 
 772         if (listLength(l
) == 0) 
 773             dictDelete(c
->db
->blocking_keys
,c
->bpop
.keys
[j
]); 
 774         decrRefCount(c
->bpop
.keys
[j
]); 
 777     /* Cleanup the client structure */ 
 780     c
->bpop
.target 
= NULL
; 
 781     c
->flags 
&= ~REDIS_BLOCKED
; 
 782     c
->flags 
|= REDIS_UNBLOCKED
; 
 783     server
.bpop_blocked_clients
--; 
 784     listAddNodeTail(server
.unblocked_clients
,c
); 
 787 /* This should be called from any function PUSHing into lists. 
 788  * 'c' is the "pushing client", 'key' is the key it is pushing data against, 
 789  * 'ele' is the element pushed. 
 791  * If the function returns 0 there was no client waiting for a list push 
 794  * If the function returns 1 there was a client waiting for a list push 
 795  * against this key, the element was passed to this client thus it's not 
 796  * needed to actually add it to the list and the caller should return asap. */ 
 797 int handleClientsWaitingListPush(redisClient 
*c
, robj 
*key
, robj 
*ele
) { 
 798     struct dictEntry 
*de
; 
 799     redisClient 
*receiver
; 
 803     robj 
*dstkey
, *dstobj
; 
 805     de 
= dictFind(c
->db
->blocking_keys
,key
); 
 806     if (de 
== NULL
) return 0; 
 807     clients 
= dictGetEntryVal(de
); 
 808     numclients 
= listLength(clients
); 
 810     /* Try to handle the push as long as there are clients waiting for a push. 
 811      * Note that "numclients" is used because the list of clients waiting for a 
 812      * push on "key" is deleted by unblockClient() when empty. 
 814      * This loop will have more than 1 iteration when there is a BRPOPLPUSH 
 815      * that cannot push the target list because it does not contain a list. If 
 816      * this happens, it simply tries the next client waiting for a push. */ 
 817     while (numclients
--) { 
 818         ln 
= listFirst(clients
); 
 819         redisAssert(ln 
!= NULL
); 
 820         receiver 
= ln
->value
; 
 821         dstkey 
= receiver
->bpop
.target
; 
 823         /* This should remove the first element of the "clients" list. */ 
 824         unblockClientWaitingData(receiver
); 
 825         redisAssert(ln 
!= listFirst(clients
)); 
 827         if (dstkey 
== NULL
) { 
 829             addReplyMultiBulkLen(receiver
,2); 
 830             addReplyBulk(receiver
,key
); 
 831             addReplyBulk(receiver
,ele
); 
 834             /* BRPOPLPUSH, note that receiver->db is always equal to c->db. */ 
 835             dstobj 
= lookupKeyWrite(receiver
->db
,dstkey
); 
 836             if (dstobj 
&& checkType(receiver
,dstobj
,REDIS_LIST
)) { 
 837                 decrRefCount(dstkey
); 
 839                 rpoplpushHandlePush(receiver
,dstkey
,dstobj
,ele
); 
 840                 decrRefCount(dstkey
); 
 849 int getTimeoutFromObjectOrReply(redisClient 
*c
, robj 
*object
, time_t *timeout
) { 
 852     if (getLongFromObjectOrReply(c
,object
,&tval
, 
 853         "timeout is not an integer or out of range") != REDIS_OK
) 
 857         addReplyError(c
,"timeout is negative"); 
 861     if (tval 
> 0) tval 
+= time(NULL
); 
 867 /* Blocking RPOP/LPOP */ 
 868 void blockingPopGenericCommand(redisClient 
*c
, int where
) { 
 873     if (getTimeoutFromObjectOrReply(c
,c
->argv
[c
->argc
-1],&timeout
) != REDIS_OK
) 
 876     for (j 
= 1; j 
< c
->argc
-1; j
++) { 
 877         o 
= lookupKeyWrite(c
->db
,c
->argv
[j
]); 
 879             if (o
->type 
!= REDIS_LIST
) { 
 880                 addReply(c
,shared
.wrongtypeerr
); 
 883                 if (listTypeLength(o
) != 0) { 
 884                     /* If the list contains elements fall back to the usual 
 885                      * non-blocking POP operation */ 
 886                     robj 
*argv
[2], **orig_argv
; 
 889                     /* We need to alter the command arguments before to call 
 890                      * popGenericCommand() as the command takes a single key. */ 
 893                     argv
[1] = c
->argv
[j
]; 
 897                     /* Also the return value is different, we need to output 
 898                      * the multi bulk reply header and the key name. The 
 899                      * "real" command will add the last element (the value) 
 900                      * for us. If this souds like an hack to you it's just 
 901                      * because it is... */ 
 902                     addReplyMultiBulkLen(c
,2); 
 903                     addReplyBulk(c
,argv
[1]); 
 905                     popGenericCommand(c
,where
); 
 907                     /* Fix the client structure with the original stuff */ 
 917     /* If we are inside a MULTI/EXEC and the list is empty the only thing 
 918      * we can do is treating it as a timeout (even with timeout 0). */ 
 919     if (c
->flags 
& REDIS_MULTI
) { 
 920         addReply(c
,shared
.nullmultibulk
); 
 924     /* If the list is empty or the key does not exists we must block */ 
 925     blockForKeys(c
, c
->argv 
+ 1, c
->argc 
- 2, timeout
, NULL
); 
 928 void blpopCommand(redisClient 
*c
) { 
 929     blockingPopGenericCommand(c
,REDIS_HEAD
); 
 932 void brpopCommand(redisClient 
*c
) { 
 933     blockingPopGenericCommand(c
,REDIS_TAIL
); 
 936 void brpoplpushCommand(redisClient 
*c
) { 
 939     if (getTimeoutFromObjectOrReply(c
,c
->argv
[3],&timeout
) != REDIS_OK
) 
 942     robj 
*key 
= lookupKeyWrite(c
->db
, c
->argv
[1]); 
 945         if (c
->flags 
& REDIS_MULTI
) { 
 947             /* Blocking against an empty list in a multi state 
 948              * returns immediately. */ 
 949             addReply(c
, shared
.nullbulk
); 
 951             /* The list is empty and the client blocks. */ 
 952             blockForKeys(c
, c
->argv 
+ 1, 1, timeout
, c
->argv
[2]); 
 955         if (key
->type 
!= REDIS_LIST
) { 
 956             addReply(c
, shared
.wrongtypeerr
); 
 959             /* The list exists and has elements, so 
 960              * the regular rpoplpushCommand is executed. */ 
 961             redisAssert(listTypeLength(key
) > 0);