3 /*----------------------------------------------------------------------------- 
   5  *----------------------------------------------------------------------------*/ 
   7 /* Check the argument length to see if it requires us to convert the ziplist 
   8  * to a real list. Only check raw-encoded objects because integer encoded 
   9  * objects are never too long. */ 
  10 void listTypeTryConversion(robj 
*subject
, robj 
*value
) { 
  11     if (subject
->encoding 
!= REDIS_ENCODING_ZIPLIST
) return; 
  12     if (value
->encoding 
== REDIS_ENCODING_RAW 
&& 
  13         sdslen(value
->ptr
) > server
.list_max_ziplist_value
) 
  14             listTypeConvert(subject
,REDIS_ENCODING_LINKEDLIST
); 
  17 void listTypePush(robj 
*subject
, robj 
*value
, int where
) { 
  18     /* Check if we need to convert the ziplist */ 
  19     listTypeTryConversion(subject
,value
); 
  20     if (subject
->encoding 
== REDIS_ENCODING_ZIPLIST 
&& 
  21         ziplistLen(subject
->ptr
) >= server
.list_max_ziplist_entries
) 
  22             listTypeConvert(subject
,REDIS_ENCODING_LINKEDLIST
); 
  24     if (subject
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
  25         int pos 
= (where 
== REDIS_HEAD
) ? ZIPLIST_HEAD 
: ZIPLIST_TAIL
; 
  26         value 
= getDecodedObject(value
); 
  27         subject
->ptr 
= ziplistPush(subject
->ptr
,value
->ptr
,sdslen(value
->ptr
),pos
); 
  29     } else if (subject
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
  30         if (where 
== REDIS_HEAD
) { 
  31             listAddNodeHead(subject
->ptr
,value
); 
  33             listAddNodeTail(subject
->ptr
,value
); 
  37         redisPanic("Unknown list encoding"); 
  41 robj 
*listTypePop(robj 
*subject
, int where
) { 
  43     if (subject
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
  48         int pos 
= (where 
== REDIS_HEAD
) ? 0 : -1; 
  49         p 
= ziplistIndex(subject
->ptr
,pos
); 
  50         if (ziplistGet(p
,&vstr
,&vlen
,&vlong
)) { 
  52                 value 
= createStringObject((char*)vstr
,vlen
); 
  54                 value 
= createStringObjectFromLongLong(vlong
); 
  56             /* We only need to delete an element when it exists */ 
  57             subject
->ptr 
= ziplistDelete(subject
->ptr
,&p
); 
  59     } else if (subject
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
  60         list 
*list 
= subject
->ptr
; 
  62         if (where 
== REDIS_HEAD
) { 
  68             value 
= listNodeValue(ln
); 
  73         redisPanic("Unknown list encoding"); 
  78 unsigned long listTypeLength(robj 
*subject
) { 
  79     if (subject
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
  80         return ziplistLen(subject
->ptr
); 
  81     } else if (subject
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
  82         return listLength((list
*)subject
->ptr
); 
  84         redisPanic("Unknown list encoding"); 
  88 /* Initialize an iterator at the specified index. */ 
  89 listTypeIterator 
*listTypeInitIterator(robj 
*subject
, int index
, unsigned char direction
) { 
  90     listTypeIterator 
*li 
= zmalloc(sizeof(listTypeIterator
)); 
  91     li
->subject 
= subject
; 
  92     li
->encoding 
= subject
->encoding
; 
  93     li
->direction 
= direction
; 
  94     if (li
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
  95         li
->zi 
= ziplistIndex(subject
->ptr
,index
); 
  96     } else if (li
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
  97         li
->ln 
= listIndex(subject
->ptr
,index
); 
  99         redisPanic("Unknown list encoding"); 
 104 /* Clean up the iterator. */ 
 105 void listTypeReleaseIterator(listTypeIterator 
*li
) { 
 109 /* Stores pointer to current the entry in the provided entry structure 
 110  * and advances the position of the iterator. Returns 1 when the current 
 111  * entry is in fact an entry, 0 otherwise. */ 
 112 int listTypeNext(listTypeIterator 
*li
, listTypeEntry 
*entry
) { 
 113     /* Protect from converting when iterating */ 
 114     redisAssert(li
->subject
->encoding 
== li
->encoding
); 
 117     if (li
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
 119         if (entry
->zi 
!= NULL
) { 
 120             if (li
->direction 
== REDIS_TAIL
) 
 121                 li
->zi 
= ziplistNext(li
->subject
->ptr
,li
->zi
); 
 123                 li
->zi 
= ziplistPrev(li
->subject
->ptr
,li
->zi
); 
 126     } else if (li
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
 128         if (entry
->ln 
!= NULL
) { 
 129             if (li
->direction 
== REDIS_TAIL
) 
 130                 li
->ln 
= li
->ln
->next
; 
 132                 li
->ln 
= li
->ln
->prev
; 
 136         redisPanic("Unknown list encoding"); 
 141 /* Return entry or NULL at the current position of the iterator. */ 
 142 robj 
*listTypeGet(listTypeEntry 
*entry
) { 
 143     listTypeIterator 
*li 
= entry
->li
; 
 145     if (li
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
 149         redisAssert(entry
->zi 
!= NULL
); 
 150         if (ziplistGet(entry
->zi
,&vstr
,&vlen
,&vlong
)) { 
 152                 value 
= createStringObject((char*)vstr
,vlen
); 
 154                 value 
= createStringObjectFromLongLong(vlong
); 
 157     } else if (li
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
 158         redisAssert(entry
->ln 
!= NULL
); 
 159         value 
= listNodeValue(entry
->ln
); 
 162         redisPanic("Unknown list encoding"); 
 167 void listTypeInsert(listTypeEntry 
*entry
, robj 
*value
, int where
) { 
 168     robj 
*subject 
= entry
->li
->subject
; 
 169     if (entry
->li
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
 170         value 
= getDecodedObject(value
); 
 171         if (where 
== REDIS_TAIL
) { 
 172             unsigned char *next 
= ziplistNext(subject
->ptr
,entry
->zi
); 
 174             /* When we insert after the current element, but the current element 
 175              * is the tail of the list, we need to do a push. */ 
 177                 subject
->ptr 
= ziplistPush(subject
->ptr
,value
->ptr
,sdslen(value
->ptr
),REDIS_TAIL
); 
 179                 subject
->ptr 
= ziplistInsert(subject
->ptr
,next
,value
->ptr
,sdslen(value
->ptr
)); 
 182             subject
->ptr 
= ziplistInsert(subject
->ptr
,entry
->zi
,value
->ptr
,sdslen(value
->ptr
)); 
 185     } else if (entry
->li
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
 186         if (where 
== REDIS_TAIL
) { 
 187             listInsertNode(subject
->ptr
,entry
->ln
,value
,AL_START_TAIL
); 
 189             listInsertNode(subject
->ptr
,entry
->ln
,value
,AL_START_HEAD
); 
 193         redisPanic("Unknown list encoding"); 
 197 /* Compare the given object with the entry at the current position. */ 
 198 int listTypeEqual(listTypeEntry 
*entry
, robj 
*o
) { 
 199     listTypeIterator 
*li 
= entry
->li
; 
 200     if (li
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
 201         redisAssert(o
->encoding 
== REDIS_ENCODING_RAW
); 
 202         return ziplistCompare(entry
->zi
,o
->ptr
,sdslen(o
->ptr
)); 
 203     } else if (li
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
 204         return equalStringObjects(o
,listNodeValue(entry
->ln
)); 
 206         redisPanic("Unknown list encoding"); 
 210 /* Delete the element pointed to. */ 
 211 void listTypeDelete(listTypeEntry 
*entry
) { 
 212     listTypeIterator 
*li 
= entry
->li
; 
 213     if (li
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
 214         unsigned char *p 
= entry
->zi
; 
 215         li
->subject
->ptr 
= ziplistDelete(li
->subject
->ptr
,&p
); 
 217         /* Update position of the iterator depending on the direction */ 
 218         if (li
->direction 
== REDIS_TAIL
) 
 221             li
->zi 
= ziplistPrev(li
->subject
->ptr
,p
); 
 222     } else if (entry
->li
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
 224         if (li
->direction 
== REDIS_TAIL
) 
 225             next 
= entry
->ln
->next
; 
 227             next 
= entry
->ln
->prev
; 
 228         listDelNode(li
->subject
->ptr
,entry
->ln
); 
 231         redisPanic("Unknown list encoding"); 
 235 void listTypeConvert(robj 
*subject
, int enc
) { 
 236     listTypeIterator 
*li
; 
 238     redisAssert(subject
->type 
== REDIS_LIST
); 
 240     if (enc 
== REDIS_ENCODING_LINKEDLIST
) { 
 241         list 
*l 
= listCreate(); 
 242         listSetFreeMethod(l
,decrRefCount
); 
 244         /* listTypeGet returns a robj with incremented refcount */ 
 245         li 
= listTypeInitIterator(subject
,0,REDIS_TAIL
); 
 246         while (listTypeNext(li
,&entry
)) listAddNodeTail(l
,listTypeGet(&entry
)); 
 247         listTypeReleaseIterator(li
); 
 249         subject
->encoding 
= REDIS_ENCODING_LINKEDLIST
; 
 253         redisPanic("Unsupported list conversion"); 
 257 /*----------------------------------------------------------------------------- 
 259  *----------------------------------------------------------------------------*/ 
 261 void pushGenericCommand(redisClient 
*c
, int where
) { 
 262     int j
, addlen 
= 0, pushed 
= 0; 
 263     robj 
*lobj 
= lookupKeyWrite(c
->db
,c
->argv
[1]); 
 264     int may_have_waiting_clients 
= (lobj 
== NULL
); 
 266     if (lobj 
&& lobj
->type 
!= REDIS_LIST
) { 
 267         addReply(c
,shared
.wrongtypeerr
); 
 271     for (j 
= 2; j 
< c
->argc
; j
++) { 
 272         c
->argv
[j
] = tryObjectEncoding(c
->argv
[j
]); 
 273         if (may_have_waiting_clients
) { 
 274             if (handleClientsWaitingListPush(c
,c
->argv
[1],c
->argv
[j
])) { 
 278                 may_have_waiting_clients 
= 0; 
 282             lobj 
= createZiplistObject(); 
 283             dbAdd(c
->db
,c
->argv
[1],lobj
); 
 285         listTypePush(lobj
,c
->argv
[j
],where
); 
 288     addReplyLongLong(c
,addlen 
+ (lobj 
? listTypeLength(lobj
) : 0)); 
 289     if (pushed
) signalModifiedKey(c
->db
,c
->argv
[1]); 
 290     server
.dirty 
+= pushed
; 
 293 void lpushCommand(redisClient 
*c
) { 
 294     pushGenericCommand(c
,REDIS_HEAD
); 
 297 void rpushCommand(redisClient 
*c
) { 
 298     pushGenericCommand(c
,REDIS_TAIL
); 
 301 void pushxGenericCommand(redisClient 
*c
, robj 
*refval
, robj 
*val
, int where
) { 
 303     listTypeIterator 
*iter
; 
 307     if ((subject 
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.czero
)) == NULL 
|| 
 308         checkType(c
,subject
,REDIS_LIST
)) return; 
 310     if (refval 
!= NULL
) { 
 311         /* Note: we expect refval to be string-encoded because it is *not* the 
 312          * last argument of the multi-bulk LINSERT. */ 
 313         redisAssert(refval
->encoding 
== REDIS_ENCODING_RAW
); 
 315         /* We're not sure if this value can be inserted yet, but we cannot 
 316          * convert the list inside the iterator. We don't want to loop over 
 317          * the list twice (once to see if the value can be inserted and once 
 318          * to do the actual insert), so we assume this value can be inserted 
 319          * and convert the ziplist to a regular list if necessary. */ 
 320         listTypeTryConversion(subject
,val
); 
 322         /* Seek refval from head to tail */ 
 323         iter 
= listTypeInitIterator(subject
,0,REDIS_TAIL
); 
 324         while (listTypeNext(iter
,&entry
)) { 
 325             if (listTypeEqual(&entry
,refval
)) { 
 326                 listTypeInsert(&entry
,val
,where
); 
 331         listTypeReleaseIterator(iter
); 
 334             /* Check if the length exceeds the ziplist length threshold. */ 
 335             if (subject
->encoding 
== REDIS_ENCODING_ZIPLIST 
&& 
 336                 ziplistLen(subject
->ptr
) > server
.list_max_ziplist_entries
) 
 337                     listTypeConvert(subject
,REDIS_ENCODING_LINKEDLIST
); 
 338             signalModifiedKey(c
->db
,c
->argv
[1]); 
 341             /* Notify client of a failed insert */ 
 342             addReply(c
,shared
.cnegone
); 
 346         listTypePush(subject
,val
,where
); 
 347         signalModifiedKey(c
->db
,c
->argv
[1]); 
 351     addReplyLongLong(c
,listTypeLength(subject
)); 
 354 void lpushxCommand(redisClient 
*c
) { 
 355     c
->argv
[2] = tryObjectEncoding(c
->argv
[2]); 
 356     pushxGenericCommand(c
,NULL
,c
->argv
[2],REDIS_HEAD
); 
 359 void rpushxCommand(redisClient 
*c
) { 
 360     c
->argv
[2] = tryObjectEncoding(c
->argv
[2]); 
 361     pushxGenericCommand(c
,NULL
,c
->argv
[2],REDIS_TAIL
); 
 364 void linsertCommand(redisClient 
*c
) { 
 365     c
->argv
[4] = tryObjectEncoding(c
->argv
[4]); 
 366     if (strcasecmp(c
->argv
[2]->ptr
,"after") == 0) { 
 367         pushxGenericCommand(c
,c
->argv
[3],c
->argv
[4],REDIS_TAIL
); 
 368     } else if (strcasecmp(c
->argv
[2]->ptr
,"before") == 0) { 
 369         pushxGenericCommand(c
,c
->argv
[3],c
->argv
[4],REDIS_HEAD
); 
 371         addReply(c
,shared
.syntaxerr
); 
 375 void llenCommand(redisClient 
*c
) { 
 376     robj 
*o 
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.czero
); 
 377     if (o 
== NULL 
|| checkType(c
,o
,REDIS_LIST
)) return; 
 378     addReplyLongLong(c
,listTypeLength(o
)); 
 381 void lindexCommand(redisClient 
*c
) { 
 382     robj 
*o 
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.nullbulk
); 
 383     if (o 
== NULL 
|| checkType(c
,o
,REDIS_LIST
)) return; 
 384     int index 
= atoi(c
->argv
[2]->ptr
); 
 387     if (o
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
 392         p 
= ziplistIndex(o
->ptr
,index
); 
 393         if (ziplistGet(p
,&vstr
,&vlen
,&vlong
)) { 
 395                 value 
= createStringObject((char*)vstr
,vlen
); 
 397                 value 
= createStringObjectFromLongLong(vlong
); 
 399             addReplyBulk(c
,value
); 
 402             addReply(c
,shared
.nullbulk
); 
 404     } else if (o
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
 405         listNode 
*ln 
= listIndex(o
->ptr
,index
); 
 407             value 
= listNodeValue(ln
); 
 408             addReplyBulk(c
,value
); 
 410             addReply(c
,shared
.nullbulk
); 
 413         redisPanic("Unknown list encoding"); 
 417 void lsetCommand(redisClient 
*c
) { 
 418     robj 
*o 
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.nokeyerr
); 
 419     if (o 
== NULL 
|| checkType(c
,o
,REDIS_LIST
)) return; 
 420     int index 
= atoi(c
->argv
[2]->ptr
); 
 421     robj 
*value 
= (c
->argv
[3] = tryObjectEncoding(c
->argv
[3])); 
 423     listTypeTryConversion(o
,value
); 
 424     if (o
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
 425         unsigned char *p
, *zl 
= o
->ptr
; 
 426         p 
= ziplistIndex(zl
,index
); 
 428             addReply(c
,shared
.outofrangeerr
); 
 430             o
->ptr 
= ziplistDelete(o
->ptr
,&p
); 
 431             value 
= getDecodedObject(value
); 
 432             o
->ptr 
= ziplistInsert(o
->ptr
,p
,value
->ptr
,sdslen(value
->ptr
)); 
 434             addReply(c
,shared
.ok
); 
 435             signalModifiedKey(c
->db
,c
->argv
[1]); 
 438     } else if (o
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
 439         listNode 
*ln 
= listIndex(o
->ptr
,index
); 
 441             addReply(c
,shared
.outofrangeerr
); 
 443             decrRefCount((robj
*)listNodeValue(ln
)); 
 444             listNodeValue(ln
) = value
; 
 446             addReply(c
,shared
.ok
); 
 447             signalModifiedKey(c
->db
,c
->argv
[1]); 
 451         redisPanic("Unknown list encoding"); 
 455 void popGenericCommand(redisClient 
*c
, int where
) { 
 456     robj 
*o 
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.nullbulk
); 
 457     if (o 
== NULL 
|| checkType(c
,o
,REDIS_LIST
)) return; 
 459     robj 
*value 
= listTypePop(o
,where
); 
 461         addReply(c
,shared
.nullbulk
); 
 463         addReplyBulk(c
,value
); 
 465         if (listTypeLength(o
) == 0) dbDelete(c
->db
,c
->argv
[1]); 
 466         signalModifiedKey(c
->db
,c
->argv
[1]); 
 471 void lpopCommand(redisClient 
*c
) { 
 472     popGenericCommand(c
,REDIS_HEAD
); 
 475 void rpopCommand(redisClient 
*c
) { 
 476     popGenericCommand(c
,REDIS_TAIL
); 
 479 void lrangeCommand(redisClient 
*c
) { 
 481     int start 
= atoi(c
->argv
[2]->ptr
); 
 482     int end 
= atoi(c
->argv
[3]->ptr
); 
 486     if ((o 
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.emptymultibulk
)) == NULL
 
 487          || checkType(c
,o
,REDIS_LIST
)) return; 
 488     llen 
= listTypeLength(o
); 
 490     /* convert negative indexes */ 
 491     if (start 
< 0) start 
= llen
+start
; 
 492     if (end 
< 0) end 
= llen
+end
; 
 493     if (start 
< 0) start 
= 0; 
 495     /* Invariant: start >= 0, so this test will be true when end < 0. 
 496      * The range is empty when start > end or start >= length. */ 
 497     if (start 
> end 
|| start 
>= llen
) { 
 498         addReply(c
,shared
.emptymultibulk
); 
 501     if (end 
>= llen
) end 
= llen
-1; 
 502     rangelen 
= (end
-start
)+1; 
 504     /* Return the result in form of a multi-bulk reply */ 
 505     addReplyMultiBulkLen(c
,rangelen
); 
 506     if (o
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
 507         unsigned char *p 
= ziplistIndex(o
->ptr
,start
); 
 513             ziplistGet(p
,&vstr
,&vlen
,&vlong
); 
 515                 addReplyBulkCBuffer(c
,vstr
,vlen
); 
 517                 addReplyBulkLongLong(c
,vlong
); 
 519             p 
= ziplistNext(o
->ptr
,p
); 
 521     } else if (o
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
 522         listNode 
*ln 
= listIndex(o
->ptr
,start
); 
 525             addReplyBulk(c
,ln
->value
); 
 529         redisPanic("List encoding is not LINKEDLIST nor ZIPLIST!"); 
 533 void ltrimCommand(redisClient 
*c
) { 
 535     int start 
= atoi(c
->argv
[2]->ptr
); 
 536     int end 
= atoi(c
->argv
[3]->ptr
); 
 542     if ((o 
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.ok
)) == NULL 
|| 
 543         checkType(c
,o
,REDIS_LIST
)) return; 
 544     llen 
= listTypeLength(o
); 
 546     /* convert negative indexes */ 
 547     if (start 
< 0) start 
= llen
+start
; 
 548     if (end 
< 0) end 
= llen
+end
; 
 549     if (start 
< 0) start 
= 0; 
 551     /* Invariant: start >= 0, so this test will be true when end < 0. 
 552      * The range is empty when start > end or start >= length. */ 
 553     if (start 
> end 
|| start 
>= llen
) { 
 554         /* Out of range start or start > end result in empty list */ 
 558         if (end 
>= llen
) end 
= llen
-1; 
 563     /* Remove list elements to perform the trim */ 
 564     if (o
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
 565         o
->ptr 
= ziplistDeleteRange(o
->ptr
,0,ltrim
); 
 566         o
->ptr 
= ziplistDeleteRange(o
->ptr
,-rtrim
,rtrim
); 
 567     } else if (o
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
 569         for (j 
= 0; j 
< ltrim
; j
++) { 
 570             ln 
= listFirst(list
); 
 571             listDelNode(list
,ln
); 
 573         for (j 
= 0; j 
< rtrim
; j
++) { 
 575             listDelNode(list
,ln
); 
 578         redisPanic("Unknown list encoding"); 
 580     if (listTypeLength(o
) == 0) dbDelete(c
->db
,c
->argv
[1]); 
 581     signalModifiedKey(c
->db
,c
->argv
[1]); 
 583     addReply(c
,shared
.ok
); 
 586 void lremCommand(redisClient 
*c
) { 
 588     obj 
= c
->argv
[3] = tryObjectEncoding(c
->argv
[3]); 
 589     int toremove 
= atoi(c
->argv
[2]->ptr
); 
 593     subject 
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.czero
); 
 594     if (subject 
== NULL 
|| checkType(c
,subject
,REDIS_LIST
)) return; 
 596     /* Make sure obj is raw when we're dealing with a ziplist */ 
 597     if (subject
->encoding 
== REDIS_ENCODING_ZIPLIST
) 
 598         obj 
= getDecodedObject(obj
); 
 600     listTypeIterator 
*li
; 
 602         toremove 
= -toremove
; 
 603         li 
= listTypeInitIterator(subject
,-1,REDIS_HEAD
); 
 605         li 
= listTypeInitIterator(subject
,0,REDIS_TAIL
); 
 608     while (listTypeNext(li
,&entry
)) { 
 609         if (listTypeEqual(&entry
,obj
)) { 
 610             listTypeDelete(&entry
); 
 613             if (toremove 
&& removed 
== toremove
) break; 
 616     listTypeReleaseIterator(li
); 
 618     /* Clean up raw encoded object */ 
 619     if (subject
->encoding 
== REDIS_ENCODING_ZIPLIST
) 
 622     if (listTypeLength(subject
) == 0) dbDelete(c
->db
,c
->argv
[1]); 
 623     addReplyLongLong(c
,removed
); 
 624     if (removed
) signalModifiedKey(c
->db
,c
->argv
[1]); 
 627 /* This is the semantic of this command: 
 628  *  RPOPLPUSH srclist dstlist: 
 629  *    IF LLEN(srclist) > 0 
 630  *      element = RPOP srclist 
 631  *      LPUSH dstlist element 
 638  * The idea is to be able to get an element from a list in a reliable way 
 639  * since the element is not just returned but pushed against another list 
 640  * as well. This command was originally proposed by Ezra Zygmuntowicz. 
 643 void rpoplpushHandlePush(redisClient 
*origclient
, redisClient 
*c
, robj 
*dstkey
, robj 
*dstobj
, robj 
*value
) { 
 646     if (!handleClientsWaitingListPush(c
,dstkey
,value
)) { 
 647         /* Create the list if the key does not exist */ 
 649             dstobj 
= createZiplistObject(); 
 650             dbAdd(c
->db
,dstkey
,dstobj
); 
 652             signalModifiedKey(c
->db
,dstkey
); 
 654         listTypePush(dstobj
,value
,REDIS_HEAD
); 
 655         /* If we are pushing as a result of LPUSH against a key 
 656          * watched by BLPOPLPUSH, we need to rewrite the command vector. 
 657          * But if this is called directly by RPOPLPUSH (either directly 
 658          * or via a BRPOPLPUSH where the popped list exists) 
 659          * we should replicate the BRPOPLPUSH command itself. */ 
 660         if (c 
!= origclient
) { 
 661             aux 
= createStringObject("LPUSH",5); 
 662             rewriteClientCommandVector(origclient
,3,aux
,dstkey
,value
); 
 665             /* Make sure to always use RPOPLPUSH in the replication / AOF, 
 666              * even if the original command was BRPOPLPUSH. */ 
 667             aux 
= createStringObject("RPOPLPUSH",9); 
 668             rewriteClientCommandVector(origclient
,3,aux
,c
->argv
[1],c
->argv
[2]); 
 674     /* Always send the pushed value to the client. */ 
 675     addReplyBulk(c
,value
); 
 678 void rpoplpushCommand(redisClient 
*c
) { 
 680     if ((sobj 
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.nullbulk
)) == NULL 
|| 
 681         checkType(c
,sobj
,REDIS_LIST
)) return; 
 683     if (listTypeLength(sobj
) == 0) { 
 684         addReply(c
,shared
.nullbulk
); 
 686         robj 
*dobj 
= lookupKeyWrite(c
->db
,c
->argv
[2]); 
 687         robj 
*touchedkey 
= c
->argv
[1]; 
 689         if (dobj 
&& checkType(c
,dobj
,REDIS_LIST
)) return; 
 690         value 
= listTypePop(sobj
,REDIS_TAIL
); 
 691         /* We saved touched key, and protect it, since rpoplpushHandlePush 
 692          * may change the client command argument vector. */ 
 693         incrRefCount(touchedkey
); 
 694         rpoplpushHandlePush(c
,c
,c
->argv
[2],dobj
,value
); 
 696         /* listTypePop returns an object with its refcount incremented */ 
 699         /* Delete the source list when it is empty */ 
 700         if (listTypeLength(sobj
) == 0) dbDelete(c
->db
,touchedkey
); 
 701         signalModifiedKey(c
->db
,touchedkey
); 
 702         decrRefCount(touchedkey
); 
 707 /*----------------------------------------------------------------------------- 
 708  * Blocking POP operations 
 709  *----------------------------------------------------------------------------*/ 
 711 /* Currently Redis blocking operations support is limited to list POP ops, 
 712  * so the current implementation is not fully generic, but it is also not 
 713  * completely specific so it will not require a rewrite to support new 
 714  * kind of blocking operations in the future. 
 716  * Still it's important to note that list blocking operations can be already 
 717  * used as a notification mechanism in order to implement other blocking 
 718  * operations at application level, so there must be a very strong evidence 
 719  * of usefulness and generality before new blocking operations are implemented. 
 721  * This is how the current blocking POP works, we use BLPOP as example: 
 722  * - If the user calls BLPOP and the key exists and contains a non empty list 
 723  *   then LPOP is called instead. So BLPOP is semantically the same as LPOP 
 724  *   if there is not to block. 
 725  * - If instead BLPOP is called and the key does not exists or the list is 
 726  *   empty we need to block. In order to do so we remove the notification for 
 727  *   new data to read in the client socket (so that we'll not serve new 
 728  *   requests if the blocking request is not served). Also we put the client 
 729  *   in a dictionary (db->blocking_keys) mapping keys to a list of clients 
 730  *   blocking for this keys. 
 731  * - If a PUSH operation against a key with blocked clients waiting is 
 732  *   performed, we serve the first in the list: basically instead to push 
 733  *   the new element inside the list we return it to the (first / oldest) 
 734  *   blocking client, unblock the client, and remove it form the list. 
 736  * The above comment and the source code should be enough in order to understand 
 737  * the implementation and modify / fix it later. 
 740 /* Set a client in blocking mode for the specified key, with the specified 
 742 void blockForKeys(redisClient 
*c
, robj 
**keys
, int numkeys
, time_t timeout
, robj 
*target
) { 
 747     c
->bpop
.keys 
= zmalloc(sizeof(robj
*)*numkeys
); 
 748     c
->bpop
.count 
= numkeys
; 
 749     c
->bpop
.timeout 
= timeout
; 
 750     c
->bpop
.target 
= target
; 
 752     if (target 
!= NULL
) { 
 753         incrRefCount(target
); 
 756     for (j 
= 0; j 
< numkeys
; j
++) { 
 757         /* Add the key in the client structure, to map clients -> keys */ 
 758         c
->bpop
.keys
[j
] = keys
[j
]; 
 759         incrRefCount(keys
[j
]); 
 761         /* And in the other "side", to map keys -> clients */ 
 762         de 
= dictFind(c
->db
->blocking_keys
,keys
[j
]); 
 766             /* For every key we take a list of clients blocked for it */ 
 768             retval 
= dictAdd(c
->db
->blocking_keys
,keys
[j
],l
); 
 769             incrRefCount(keys
[j
]); 
 770             redisAssert(retval 
== DICT_OK
); 
 772             l 
= dictGetEntryVal(de
); 
 774         listAddNodeTail(l
,c
); 
 776     /* Mark the client as a blocked client */ 
 777     c
->flags 
|= REDIS_BLOCKED
; 
 778     server
.bpop_blocked_clients
++; 
 781 /* Unblock a client that's waiting in a blocking operation such as BLPOP */ 
 782 void unblockClientWaitingData(redisClient 
*c
) { 
 787     redisAssert(c
->bpop
.keys 
!= NULL
); 
 788     /* The client may wait for multiple keys, so unblock it for every key. */ 
 789     for (j 
= 0; j 
< c
->bpop
.count
; j
++) { 
 790         /* Remove this client from the list of clients waiting for this key. */ 
 791         de 
= dictFind(c
->db
->blocking_keys
,c
->bpop
.keys
[j
]); 
 792         redisAssert(de 
!= NULL
); 
 793         l 
= dictGetEntryVal(de
); 
 794         listDelNode(l
,listSearchKey(l
,c
)); 
 795         /* If the list is empty we need to remove it to avoid wasting memory */ 
 796         if (listLength(l
) == 0) 
 797             dictDelete(c
->db
->blocking_keys
,c
->bpop
.keys
[j
]); 
 798         decrRefCount(c
->bpop
.keys
[j
]); 
 801     /* Cleanup the client structure */ 
 804     if (c
->bpop
.target
) decrRefCount(c
->bpop
.target
); 
 805     c
->bpop
.target 
= NULL
; 
 806     c
->flags 
&= ~REDIS_BLOCKED
; 
 807     c
->flags 
|= REDIS_UNBLOCKED
; 
 808     server
.bpop_blocked_clients
--; 
 809     listAddNodeTail(server
.unblocked_clients
,c
); 
 812 /* This should be called from any function PUSHing into lists. 
 813  * 'c' is the "pushing client", 'key' is the key it is pushing data against, 
 814  * 'ele' is the element pushed. 
 816  * If the function returns 0 there was no client waiting for a list push 
 819  * If the function returns 1 there was a client waiting for a list push 
 820  * against this key, the element was passed to this client thus it's not 
 821  * needed to actually add it to the list and the caller should return asap. */ 
 822 int handleClientsWaitingListPush(redisClient 
*c
, robj 
*key
, robj 
*ele
) { 
 823     struct dictEntry 
*de
; 
 824     redisClient 
*receiver
; 
 828     robj 
*dstkey
, *dstobj
; 
 830     de 
= dictFind(c
->db
->blocking_keys
,key
); 
 831     if (de 
== NULL
) return 0; 
 832     clients 
= dictGetEntryVal(de
); 
 833     numclients 
= listLength(clients
); 
 835     /* Try to handle the push as long as there are clients waiting for a push. 
 836      * Note that "numclients" is used because the list of clients waiting for a 
 837      * push on "key" is deleted by unblockClient() when empty. 
 839      * This loop will have more than 1 iteration when there is a BRPOPLPUSH 
 840      * that cannot push the target list because it does not contain a list. If 
 841      * this happens, it simply tries the next client waiting for a push. */ 
 842     while (numclients
--) { 
 843         ln 
= listFirst(clients
); 
 844         redisAssert(ln 
!= NULL
); 
 845         receiver 
= ln
->value
; 
 846         dstkey 
= receiver
->bpop
.target
; 
 848         /* Protect receiver->bpop.target, that will be freed by 
 849          * the next unblockClientWaitingData() call. */ 
 850         if (dstkey
) incrRefCount(dstkey
); 
 852         /* This should remove the first element of the "clients" list. */ 
 853         unblockClientWaitingData(receiver
); 
 855         if (dstkey 
== NULL
) { 
 857             addReplyMultiBulkLen(receiver
,2); 
 858             addReplyBulk(receiver
,key
); 
 859             addReplyBulk(receiver
,ele
); 
 860             return 1; /* Serve just the first client as in B[RL]POP semantics */ 
 862             /* BRPOPLPUSH, note that receiver->db is always equal to c->db. */ 
 863             dstobj 
= lookupKeyWrite(receiver
->db
,dstkey
); 
 864             if (!(dstobj 
&& checkType(receiver
,dstobj
,REDIS_LIST
))) { 
 865                 rpoplpushHandlePush(c
,receiver
,dstkey
,dstobj
,ele
); 
 866                 decrRefCount(dstkey
); 
 869             decrRefCount(dstkey
); 
 876 int getTimeoutFromObjectOrReply(redisClient 
*c
, robj 
*object
, time_t *timeout
) { 
 879     if (getLongFromObjectOrReply(c
,object
,&tval
, 
 880         "timeout is not an integer or out of range") != REDIS_OK
) 
 884         addReplyError(c
,"timeout is negative"); 
 888     if (tval 
> 0) tval 
+= time(NULL
); 
 894 /* Blocking RPOP/LPOP */ 
 895 void blockingPopGenericCommand(redisClient 
*c
, int where
) { 
 900     if (getTimeoutFromObjectOrReply(c
,c
->argv
[c
->argc
-1],&timeout
) != REDIS_OK
) 
 903     for (j 
= 1; j 
< c
->argc
-1; j
++) { 
 904         o 
= lookupKeyWrite(c
->db
,c
->argv
[j
]); 
 906             if (o
->type 
!= REDIS_LIST
) { 
 907                 addReply(c
,shared
.wrongtypeerr
); 
 910                 if (listTypeLength(o
) != 0) { 
 911                     /* If the list contains elements fall back to the usual 
 912                      * non-blocking POP operation */ 
 913                     struct redisCommand 
*orig_cmd
; 
 914                     robj 
*argv
[2], **orig_argv
; 
 917                     /* We need to alter the command arguments before to call 
 918                      * popGenericCommand() as the command takes a single key. */ 
 922                     argv
[1] = c
->argv
[j
]; 
 926                     /* Also the return value is different, we need to output 
 927                      * the multi bulk reply header and the key name. The 
 928                      * "real" command will add the last element (the value) 
 929                      * for us. If this souds like an hack to you it's just 
 930                      * because it is... */ 
 931                     addReplyMultiBulkLen(c
,2); 
 932                     addReplyBulk(c
,argv
[1]); 
 934                     popGenericCommand(c
,where
); 
 936                     /* Fix the client structure with the original stuff */ 
 947     /* If we are inside a MULTI/EXEC and the list is empty the only thing 
 948      * we can do is treating it as a timeout (even with timeout 0). */ 
 949     if (c
->flags 
& REDIS_MULTI
) { 
 950         addReply(c
,shared
.nullmultibulk
); 
 954     /* If the list is empty or the key does not exists we must block */ 
 955     blockForKeys(c
, c
->argv 
+ 1, c
->argc 
- 2, timeout
, NULL
); 
 958 void blpopCommand(redisClient 
*c
) { 
 959     blockingPopGenericCommand(c
,REDIS_HEAD
); 
 962 void brpopCommand(redisClient 
*c
) { 
 963     blockingPopGenericCommand(c
,REDIS_TAIL
); 
 966 void brpoplpushCommand(redisClient 
*c
) { 
 969     if (getTimeoutFromObjectOrReply(c
,c
->argv
[3],&timeout
) != REDIS_OK
) 
 972     robj 
*key 
= lookupKeyWrite(c
->db
, c
->argv
[1]); 
 975         if (c
->flags 
& REDIS_MULTI
) { 
 977             /* Blocking against an empty list in a multi state 
 978              * returns immediately. */ 
 979             addReply(c
, shared
.nullbulk
); 
 981             /* The list is empty and the client blocks. */ 
 982             blockForKeys(c
, c
->argv 
+ 1, 1, timeout
, c
->argv
[2]); 
 985         if (key
->type 
!= REDIS_LIST
) { 
 986             addReply(c
, shared
.wrongtypeerr
); 
 989             /* The list exists and has elements, so 
 990              * the regular rpoplpushCommand is executed. */ 
 991             redisAssert(listTypeLength(key
) > 0);