3 /*----------------------------------------------------------------------------- 
   5  *----------------------------------------------------------------------------*/ 
   7 /* Check the argument length to see if it requires us to convert the ziplist 
   8  * to a real list. Only check raw-encoded objects because integer encoded 
   9  * objects are never too long. */ 
  10 void listTypeTryConversion(robj 
*subject
, robj 
*value
) { 
  11     if (subject
->encoding 
!= REDIS_ENCODING_ZIPLIST
) return; 
  12     if (value
->encoding 
== REDIS_ENCODING_RAW 
&& 
  13         sdslen(value
->ptr
) > server
.list_max_ziplist_value
) 
  14             listTypeConvert(subject
,REDIS_ENCODING_LINKEDLIST
); 
  17 void listTypePush(robj 
*subject
, robj 
*value
, int where
) { 
  18     /* Check if we need to convert the ziplist */ 
  19     listTypeTryConversion(subject
,value
); 
  20     if (subject
->encoding 
== REDIS_ENCODING_ZIPLIST 
&& 
  21         ziplistLen(subject
->ptr
) >= server
.list_max_ziplist_entries
) 
  22             listTypeConvert(subject
,REDIS_ENCODING_LINKEDLIST
); 
  24     if (subject
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
  25         int pos 
= (where 
== REDIS_HEAD
) ? ZIPLIST_HEAD 
: ZIPLIST_TAIL
; 
  26         value 
= getDecodedObject(value
); 
  27         subject
->ptr 
= ziplistPush(subject
->ptr
,value
->ptr
,sdslen(value
->ptr
),pos
); 
  29     } else if (subject
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
  30         if (where 
== REDIS_HEAD
) { 
  31             listAddNodeHead(subject
->ptr
,value
); 
  33             listAddNodeTail(subject
->ptr
,value
); 
  37         redisPanic("Unknown list encoding"); 
  41 robj 
*listTypePop(robj 
*subject
, int where
) { 
  43     if (subject
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
  48         int pos 
= (where 
== REDIS_HEAD
) ? 0 : -1; 
  49         p 
= ziplistIndex(subject
->ptr
,pos
); 
  50         if (ziplistGet(p
,&vstr
,&vlen
,&vlong
)) { 
  52                 value 
= createStringObject((char*)vstr
,vlen
); 
  54                 value 
= createStringObjectFromLongLong(vlong
); 
  56             /* We only need to delete an element when it exists */ 
  57             subject
->ptr 
= ziplistDelete(subject
->ptr
,&p
); 
  59     } else if (subject
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
  60         list 
*list 
= subject
->ptr
; 
  62         if (where 
== REDIS_HEAD
) { 
  68             value 
= listNodeValue(ln
); 
  73         redisPanic("Unknown list encoding"); 
  78 unsigned long listTypeLength(robj 
*subject
) { 
  79     if (subject
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
  80         return ziplistLen(subject
->ptr
); 
  81     } else if (subject
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
  82         return listLength((list
*)subject
->ptr
); 
  84         redisPanic("Unknown list encoding"); 
  88 /* Initialize an iterator at the specified index. */ 
  89 listTypeIterator 
*listTypeInitIterator(robj 
*subject
, int index
, unsigned char direction
) { 
  90     listTypeIterator 
*li 
= zmalloc(sizeof(listTypeIterator
)); 
  91     li
->subject 
= subject
; 
  92     li
->encoding 
= subject
->encoding
; 
  93     li
->direction 
= direction
; 
  94     if (li
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
  95         li
->zi 
= ziplistIndex(subject
->ptr
,index
); 
  96     } else if (li
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
  97         li
->ln 
= listIndex(subject
->ptr
,index
); 
  99         redisPanic("Unknown list encoding"); 
 104 /* Clean up the iterator. */ 
 105 void listTypeReleaseIterator(listTypeIterator 
*li
) { 
 109 /* Stores pointer to current the entry in the provided entry structure 
 110  * and advances the position of the iterator. Returns 1 when the current 
 111  * entry is in fact an entry, 0 otherwise. */ 
 112 int listTypeNext(listTypeIterator 
*li
, listTypeEntry 
*entry
) { 
 113     /* Protect from converting when iterating */ 
 114     redisAssert(li
->subject
->encoding 
== li
->encoding
); 
 117     if (li
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
 119         if (entry
->zi 
!= NULL
) { 
 120             if (li
->direction 
== REDIS_TAIL
) 
 121                 li
->zi 
= ziplistNext(li
->subject
->ptr
,li
->zi
); 
 123                 li
->zi 
= ziplistPrev(li
->subject
->ptr
,li
->zi
); 
 126     } else if (li
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
 128         if (entry
->ln 
!= NULL
) { 
 129             if (li
->direction 
== REDIS_TAIL
) 
 130                 li
->ln 
= li
->ln
->next
; 
 132                 li
->ln 
= li
->ln
->prev
; 
 136         redisPanic("Unknown list encoding"); 
 141 /* Return entry or NULL at the current position of the iterator. */ 
 142 robj 
*listTypeGet(listTypeEntry 
*entry
) { 
 143     listTypeIterator 
*li 
= entry
->li
; 
 145     if (li
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
 149         redisAssert(entry
->zi 
!= NULL
); 
 150         if (ziplistGet(entry
->zi
,&vstr
,&vlen
,&vlong
)) { 
 152                 value 
= createStringObject((char*)vstr
,vlen
); 
 154                 value 
= createStringObjectFromLongLong(vlong
); 
 157     } else if (li
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
 158         redisAssert(entry
->ln 
!= NULL
); 
 159         value 
= listNodeValue(entry
->ln
); 
 162         redisPanic("Unknown list encoding"); 
 167 void listTypeInsert(listTypeEntry 
*entry
, robj 
*value
, int where
) { 
 168     robj 
*subject 
= entry
->li
->subject
; 
 169     if (entry
->li
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
 170         value 
= getDecodedObject(value
); 
 171         if (where 
== REDIS_TAIL
) { 
 172             unsigned char *next 
= ziplistNext(subject
->ptr
,entry
->zi
); 
 174             /* When we insert after the current element, but the current element 
 175              * is the tail of the list, we need to do a push. */ 
 177                 subject
->ptr 
= ziplistPush(subject
->ptr
,value
->ptr
,sdslen(value
->ptr
),REDIS_TAIL
); 
 179                 subject
->ptr 
= ziplistInsert(subject
->ptr
,next
,value
->ptr
,sdslen(value
->ptr
)); 
 182             subject
->ptr 
= ziplistInsert(subject
->ptr
,entry
->zi
,value
->ptr
,sdslen(value
->ptr
)); 
 185     } else if (entry
->li
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
 186         if (where 
== REDIS_TAIL
) { 
 187             listInsertNode(subject
->ptr
,entry
->ln
,value
,AL_START_TAIL
); 
 189             listInsertNode(subject
->ptr
,entry
->ln
,value
,AL_START_HEAD
); 
 193         redisPanic("Unknown list encoding"); 
 197 /* Compare the given object with the entry at the current position. */ 
 198 int listTypeEqual(listTypeEntry 
*entry
, robj 
*o
) { 
 199     listTypeIterator 
*li 
= entry
->li
; 
 200     if (li
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
 201         redisAssert(o
->encoding 
== REDIS_ENCODING_RAW
); 
 202         return ziplistCompare(entry
->zi
,o
->ptr
,sdslen(o
->ptr
)); 
 203     } else if (li
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
 204         return equalStringObjects(o
,listNodeValue(entry
->ln
)); 
 206         redisPanic("Unknown list encoding"); 
 210 /* Delete the element pointed to. */ 
 211 void listTypeDelete(listTypeEntry 
*entry
) { 
 212     listTypeIterator 
*li 
= entry
->li
; 
 213     if (li
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
 214         unsigned char *p 
= entry
->zi
; 
 215         li
->subject
->ptr 
= ziplistDelete(li
->subject
->ptr
,&p
); 
 217         /* Update position of the iterator depending on the direction */ 
 218         if (li
->direction 
== REDIS_TAIL
) 
 221             li
->zi 
= ziplistPrev(li
->subject
->ptr
,p
); 
 222     } else if (entry
->li
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
 224         if (li
->direction 
== REDIS_TAIL
) 
 225             next 
= entry
->ln
->next
; 
 227             next 
= entry
->ln
->prev
; 
 228         listDelNode(li
->subject
->ptr
,entry
->ln
); 
 231         redisPanic("Unknown list encoding"); 
 235 void listTypeConvert(robj 
*subject
, int enc
) { 
 236     listTypeIterator 
*li
; 
 238     redisAssert(subject
->type 
== REDIS_LIST
); 
 240     if (enc 
== REDIS_ENCODING_LINKEDLIST
) { 
 241         list 
*l 
= listCreate(); 
 242         listSetFreeMethod(l
,decrRefCount
); 
 244         /* listTypeGet returns a robj with incremented refcount */ 
 245         li 
= listTypeInitIterator(subject
,0,REDIS_TAIL
); 
 246         while (listTypeNext(li
,&entry
)) listAddNodeTail(l
,listTypeGet(&entry
)); 
 247         listTypeReleaseIterator(li
); 
 249         subject
->encoding 
= REDIS_ENCODING_LINKEDLIST
; 
 253         redisPanic("Unsupported list conversion"); 
 257 /*----------------------------------------------------------------------------- 
 259  *----------------------------------------------------------------------------*/ 
 261 void pushGenericCommand(redisClient 
*c
, int where
) { 
 262     robj 
*lobj 
= lookupKeyWrite(c
->db
,c
->argv
[1]); 
 263     c
->argv
[2] = tryObjectEncoding(c
->argv
[2]); 
 265         if (handleClientsWaitingListPush(c
,c
->argv
[1],c
->argv
[2])) { 
 266             addReply(c
,shared
.cone
); 
 269         lobj 
= createZiplistObject(); 
 270         dbAdd(c
->db
,c
->argv
[1],lobj
); 
 272         if (lobj
->type 
!= REDIS_LIST
) { 
 273             addReply(c
,shared
.wrongtypeerr
); 
 276         if (handleClientsWaitingListPush(c
,c
->argv
[1],c
->argv
[2])) { 
 277             touchWatchedKey(c
->db
,c
->argv
[1]); 
 278             addReply(c
,shared
.cone
); 
 282     listTypePush(lobj
,c
->argv
[2],where
); 
 283     addReplyLongLong(c
,listTypeLength(lobj
)); 
 284     touchWatchedKey(c
->db
,c
->argv
[1]); 
 288 void lpushCommand(redisClient 
*c
) { 
 289     pushGenericCommand(c
,REDIS_HEAD
); 
 292 void rpushCommand(redisClient 
*c
) { 
 293     pushGenericCommand(c
,REDIS_TAIL
); 
 296 void pushxGenericCommand(redisClient 
*c
, robj 
*refval
, robj 
*val
, int where
) { 
 298     listTypeIterator 
*iter
; 
 302     if ((subject 
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.czero
)) == NULL 
|| 
 303         checkType(c
,subject
,REDIS_LIST
)) return; 
 305     if (refval 
!= NULL
) { 
 306         /* Note: we expect refval to be string-encoded because it is *not* the 
 307          * last argument of the multi-bulk LINSERT. */ 
 308         redisAssert(refval
->encoding 
== REDIS_ENCODING_RAW
); 
 310         /* We're not sure if this value can be inserted yet, but we cannot 
 311          * convert the list inside the iterator. We don't want to loop over 
 312          * the list twice (once to see if the value can be inserted and once 
 313          * to do the actual insert), so we assume this value can be inserted 
 314          * and convert the ziplist to a regular list if necessary. */ 
 315         listTypeTryConversion(subject
,val
); 
 317         /* Seek refval from head to tail */ 
 318         iter 
= listTypeInitIterator(subject
,0,REDIS_TAIL
); 
 319         while (listTypeNext(iter
,&entry
)) { 
 320             if (listTypeEqual(&entry
,refval
)) { 
 321                 listTypeInsert(&entry
,val
,where
); 
 326         listTypeReleaseIterator(iter
); 
 329             /* Check if the length exceeds the ziplist length threshold. */ 
 330             if (subject
->encoding 
== REDIS_ENCODING_ZIPLIST 
&& 
 331                 ziplistLen(subject
->ptr
) > server
.list_max_ziplist_entries
) 
 332                     listTypeConvert(subject
,REDIS_ENCODING_LINKEDLIST
); 
 333             touchWatchedKey(c
->db
,c
->argv
[1]); 
 336             /* Notify client of a failed insert */ 
 337             addReply(c
,shared
.cnegone
); 
 341         listTypePush(subject
,val
,where
); 
 342         touchWatchedKey(c
->db
,c
->argv
[1]); 
 346     addReplyLongLong(c
,listTypeLength(subject
)); 
 349 void lpushxCommand(redisClient 
*c
) { 
 350     c
->argv
[2] = tryObjectEncoding(c
->argv
[2]); 
 351     pushxGenericCommand(c
,NULL
,c
->argv
[2],REDIS_HEAD
); 
 354 void rpushxCommand(redisClient 
*c
) { 
 355     c
->argv
[2] = tryObjectEncoding(c
->argv
[2]); 
 356     pushxGenericCommand(c
,NULL
,c
->argv
[2],REDIS_TAIL
); 
 359 void linsertCommand(redisClient 
*c
) { 
 360     c
->argv
[4] = tryObjectEncoding(c
->argv
[4]); 
 361     if (strcasecmp(c
->argv
[2]->ptr
,"after") == 0) { 
 362         pushxGenericCommand(c
,c
->argv
[3],c
->argv
[4],REDIS_TAIL
); 
 363     } else if (strcasecmp(c
->argv
[2]->ptr
,"before") == 0) { 
 364         pushxGenericCommand(c
,c
->argv
[3],c
->argv
[4],REDIS_HEAD
); 
 366         addReply(c
,shared
.syntaxerr
); 
 370 void llenCommand(redisClient 
*c
) { 
 371     robj 
*o 
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.czero
); 
 372     if (o 
== NULL 
|| checkType(c
,o
,REDIS_LIST
)) return; 
 373     addReplyLongLong(c
,listTypeLength(o
)); 
 376 void lindexCommand(redisClient 
*c
) { 
 377     robj 
*o 
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.nullbulk
); 
 378     if (o 
== NULL 
|| checkType(c
,o
,REDIS_LIST
)) return; 
 379     int index 
= atoi(c
->argv
[2]->ptr
); 
 382     if (o
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
 387         p 
= ziplistIndex(o
->ptr
,index
); 
 388         if (ziplistGet(p
,&vstr
,&vlen
,&vlong
)) { 
 390                 value 
= createStringObject((char*)vstr
,vlen
); 
 392                 value 
= createStringObjectFromLongLong(vlong
); 
 394             addReplyBulk(c
,value
); 
 397             addReply(c
,shared
.nullbulk
); 
 399     } else if (o
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
 400         listNode 
*ln 
= listIndex(o
->ptr
,index
); 
 402             value 
= listNodeValue(ln
); 
 403             addReplyBulk(c
,value
); 
 405             addReply(c
,shared
.nullbulk
); 
 408         redisPanic("Unknown list encoding"); 
 412 void lsetCommand(redisClient 
*c
) { 
 413     robj 
*o 
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.nokeyerr
); 
 414     if (o 
== NULL 
|| checkType(c
,o
,REDIS_LIST
)) return; 
 415     int index 
= atoi(c
->argv
[2]->ptr
); 
 416     robj 
*value 
= (c
->argv
[3] = tryObjectEncoding(c
->argv
[3])); 
 418     listTypeTryConversion(o
,value
); 
 419     if (o
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
 420         unsigned char *p
, *zl 
= o
->ptr
; 
 421         p 
= ziplistIndex(zl
,index
); 
 423             addReply(c
,shared
.outofrangeerr
); 
 425             o
->ptr 
= ziplistDelete(o
->ptr
,&p
); 
 426             value 
= getDecodedObject(value
); 
 427             o
->ptr 
= ziplistInsert(o
->ptr
,p
,value
->ptr
,sdslen(value
->ptr
)); 
 429             addReply(c
,shared
.ok
); 
 430             touchWatchedKey(c
->db
,c
->argv
[1]); 
 433     } else if (o
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
 434         listNode 
*ln 
= listIndex(o
->ptr
,index
); 
 436             addReply(c
,shared
.outofrangeerr
); 
 438             decrRefCount((robj
*)listNodeValue(ln
)); 
 439             listNodeValue(ln
) = value
; 
 441             addReply(c
,shared
.ok
); 
 442             touchWatchedKey(c
->db
,c
->argv
[1]); 
 446         redisPanic("Unknown list encoding"); 
 450 void popGenericCommand(redisClient 
*c
, int where
) { 
 451     robj 
*o 
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.nullbulk
); 
 452     if (o 
== NULL 
|| checkType(c
,o
,REDIS_LIST
)) return; 
 454     robj 
*value 
= listTypePop(o
,where
); 
 456         addReply(c
,shared
.nullbulk
); 
 458         addReplyBulk(c
,value
); 
 460         if (listTypeLength(o
) == 0) dbDelete(c
->db
,c
->argv
[1]); 
 461         touchWatchedKey(c
->db
,c
->argv
[1]); 
 466 void lpopCommand(redisClient 
*c
) { 
 467     popGenericCommand(c
,REDIS_HEAD
); 
 470 void rpopCommand(redisClient 
*c
) { 
 471     popGenericCommand(c
,REDIS_TAIL
); 
 474 void lrangeCommand(redisClient 
*c
) { 
 476     int start 
= atoi(c
->argv
[2]->ptr
); 
 477     int end 
= atoi(c
->argv
[3]->ptr
); 
 481     if ((o 
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.emptymultibulk
)) == NULL
 
 482          || checkType(c
,o
,REDIS_LIST
)) return; 
 483     llen 
= listTypeLength(o
); 
 485     /* convert negative indexes */ 
 486     if (start 
< 0) start 
= llen
+start
; 
 487     if (end 
< 0) end 
= llen
+end
; 
 488     if (start 
< 0) start 
= 0; 
 490     /* Invariant: start >= 0, so this test will be true when end < 0. 
 491      * The range is empty when start > end or start >= length. */ 
 492     if (start 
> end 
|| start 
>= llen
) { 
 493         addReply(c
,shared
.emptymultibulk
); 
 496     if (end 
>= llen
) end 
= llen
-1; 
 497     rangelen 
= (end
-start
)+1; 
 499     /* Return the result in form of a multi-bulk reply */ 
 500     addReplyMultiBulkLen(c
,rangelen
); 
 501     if (o
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
 502         unsigned char *p 
= ziplistIndex(o
->ptr
,start
); 
 508             ziplistGet(p
,&vstr
,&vlen
,&vlong
); 
 510                 addReplyBulkCBuffer(c
,vstr
,vlen
); 
 512                 addReplyBulkLongLong(c
,vlong
); 
 514             p 
= ziplistNext(o
->ptr
,p
); 
 516     } else if (o
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
 517         listNode 
*ln 
= listIndex(o
->ptr
,start
); 
 520             addReplyBulk(c
,ln
->value
); 
 524         redisPanic("List encoding is not LINKEDLIST nor ZIPLIST!"); 
 528 void ltrimCommand(redisClient 
*c
) { 
 530     int start 
= atoi(c
->argv
[2]->ptr
); 
 531     int end 
= atoi(c
->argv
[3]->ptr
); 
 537     if ((o 
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.ok
)) == NULL 
|| 
 538         checkType(c
,o
,REDIS_LIST
)) return; 
 539     llen 
= listTypeLength(o
); 
 541     /* convert negative indexes */ 
 542     if (start 
< 0) start 
= llen
+start
; 
 543     if (end 
< 0) end 
= llen
+end
; 
 544     if (start 
< 0) start 
= 0; 
 546     /* Invariant: start >= 0, so this test will be true when end < 0. 
 547      * The range is empty when start > end or start >= length. */ 
 548     if (start 
> end 
|| start 
>= llen
) { 
 549         /* Out of range start or start > end result in empty list */ 
 553         if (end 
>= llen
) end 
= llen
-1; 
 558     /* Remove list elements to perform the trim */ 
 559     if (o
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
 560         o
->ptr 
= ziplistDeleteRange(o
->ptr
,0,ltrim
); 
 561         o
->ptr 
= ziplistDeleteRange(o
->ptr
,-rtrim
,rtrim
); 
 562     } else if (o
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
 564         for (j 
= 0; j 
< ltrim
; j
++) { 
 565             ln 
= listFirst(list
); 
 566             listDelNode(list
,ln
); 
 568         for (j 
= 0; j 
< rtrim
; j
++) { 
 570             listDelNode(list
,ln
); 
 573         redisPanic("Unknown list encoding"); 
 575     if (listTypeLength(o
) == 0) dbDelete(c
->db
,c
->argv
[1]); 
 576     touchWatchedKey(c
->db
,c
->argv
[1]); 
 578     addReply(c
,shared
.ok
); 
 581 void lremCommand(redisClient 
*c
) { 
 583     obj 
= c
->argv
[3] = tryObjectEncoding(c
->argv
[3]); 
 584     int toremove 
= atoi(c
->argv
[2]->ptr
); 
 588     subject 
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.czero
); 
 589     if (subject 
== NULL 
|| checkType(c
,subject
,REDIS_LIST
)) return; 
 591     /* Make sure obj is raw when we're dealing with a ziplist */ 
 592     if (subject
->encoding 
== REDIS_ENCODING_ZIPLIST
) 
 593         obj 
= getDecodedObject(obj
); 
 595     listTypeIterator 
*li
; 
 597         toremove 
= -toremove
; 
 598         li 
= listTypeInitIterator(subject
,-1,REDIS_HEAD
); 
 600         li 
= listTypeInitIterator(subject
,0,REDIS_TAIL
); 
 603     while (listTypeNext(li
,&entry
)) { 
 604         if (listTypeEqual(&entry
,obj
)) { 
 605             listTypeDelete(&entry
); 
 608             if (toremove 
&& removed 
== toremove
) break; 
 611     listTypeReleaseIterator(li
); 
 613     /* Clean up raw encoded object */ 
 614     if (subject
->encoding 
== REDIS_ENCODING_ZIPLIST
) 
 617     if (listTypeLength(subject
) == 0) dbDelete(c
->db
,c
->argv
[1]); 
 618     addReplyLongLong(c
,removed
); 
 619     if (removed
) touchWatchedKey(c
->db
,c
->argv
[1]); 
 622 /* This is the semantic of this command: 
 623  *  RPOPLPUSH srclist dstlist: 
 624  *    IF LLEN(srclist) > 0 
 625  *      element = RPOP srclist 
 626  *      LPUSH dstlist element 
 633  * The idea is to be able to get an element from a list in a reliable way 
 634  * since the element is not just returned but pushed against another list 
 635  * as well. This command was originally proposed by Ezra Zygmuntowicz. 
 638 void rpoplpushHandlePush(redisClient 
*c
, robj 
*dstkey
, robj 
*dstobj
, robj 
*value
) { 
 639     if (!handleClientsWaitingListPush(c
,dstkey
,value
)) { 
 640         /* Create the list if the key does not exist */ 
 642             dstobj 
= createZiplistObject(); 
 643             dbAdd(c
->db
,dstkey
,dstobj
); 
 645             touchWatchedKey(c
->db
,dstkey
); 
 648         listTypePush(dstobj
,value
,REDIS_HEAD
); 
 651     /* Always send the pushed value to the client. */ 
 652     addReplyBulk(c
,value
); 
 655 void rpoplpushCommand(redisClient 
*c
) { 
 657     if ((sobj 
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.nullbulk
)) == NULL 
|| 
 658         checkType(c
,sobj
,REDIS_LIST
)) return; 
 660     if (listTypeLength(sobj
) == 0) { 
 661         addReply(c
,shared
.nullbulk
); 
 663         robj 
*dobj 
= lookupKeyWrite(c
->db
,c
->argv
[2]); 
 664         if (dobj 
&& checkType(c
,dobj
,REDIS_LIST
)) return; 
 665         value 
= listTypePop(sobj
,REDIS_TAIL
); 
 666         rpoplpushHandlePush(c
,c
->argv
[2],dobj
,value
); 
 668         /* listTypePop returns an object with its refcount incremented */ 
 671         /* Delete the source list when it is empty */ 
 672         if (listTypeLength(sobj
) == 0) dbDelete(c
->db
,c
->argv
[1]); 
 673         touchWatchedKey(c
->db
,c
->argv
[1]); 
 678 /*----------------------------------------------------------------------------- 
 679  * Blocking POP operations 
 680  *----------------------------------------------------------------------------*/ 
 682 /* Currently Redis blocking operations support is limited to list POP ops, 
 683  * so the current implementation is not fully generic, but it is also not 
 684  * completely specific so it will not require a rewrite to support new 
 685  * kind of blocking operations in the future. 
 687  * Still it's important to note that list blocking operations can be already 
 688  * used as a notification mechanism in order to implement other blocking 
 689  * operations at application level, so there must be a very strong evidence 
 690  * of usefulness and generality before new blocking operations are implemented. 
 692  * This is how the current blocking POP works, we use BLPOP as example: 
 693  * - If the user calls BLPOP and the key exists and contains a non empty list 
 694  *   then LPOP is called instead. So BLPOP is semantically the same as LPOP 
 695  *   if there is not to block. 
 696  * - If instead BLPOP is called and the key does not exists or the list is 
 697  *   empty we need to block. In order to do so we remove the notification for 
 698  *   new data to read in the client socket (so that we'll not serve new 
 699  *   requests if the blocking request is not served). Also we put the client 
 700  *   in a dictionary (db->blocking_keys) mapping keys to a list of clients 
 701  *   blocking for this keys. 
 702  * - If a PUSH operation against a key with blocked clients waiting is 
 703  *   performed, we serve the first in the list: basically instead to push 
 704  *   the new element inside the list we return it to the (first / oldest) 
 705  *   blocking client, unblock the client, and remove it form the list. 
 707  * The above comment and the source code should be enough in order to understand 
 708  * the implementation and modify / fix it later. 
 711 /* Set a client in blocking mode for the specified key, with the specified 
 713 void blockForKeys(redisClient 
*c
, robj 
**keys
, int numkeys
, time_t timeout
, robj 
*target
) { 
 718     c
->bpop
.keys 
= zmalloc(sizeof(robj
*)*numkeys
); 
 719     c
->bpop
.count 
= numkeys
; 
 720     c
->bpop
.timeout 
= timeout
; 
 721     c
->bpop
.target 
= target
; 
 723     if (target 
!= NULL
) { 
 724         incrRefCount(target
); 
 727     for (j 
= 0; j 
< numkeys
; j
++) { 
 728         /* Add the key in the client structure, to map clients -> keys */ 
 729         c
->bpop
.keys
[j
] = keys
[j
]; 
 730         incrRefCount(keys
[j
]); 
 732         /* And in the other "side", to map keys -> clients */ 
 733         de 
= dictFind(c
->db
->blocking_keys
,keys
[j
]); 
 737             /* For every key we take a list of clients blocked for it */ 
 739             retval 
= dictAdd(c
->db
->blocking_keys
,keys
[j
],l
); 
 740             incrRefCount(keys
[j
]); 
 741             redisAssert(retval 
== DICT_OK
); 
 743             l 
= dictGetEntryVal(de
); 
 745         listAddNodeTail(l
,c
); 
 747     /* Mark the client as a blocked client */ 
 748     c
->flags 
|= REDIS_BLOCKED
; 
 749     server
.bpop_blocked_clients
++; 
 752 /* Unblock a client that's waiting in a blocking operation such as BLPOP */ 
 753 void unblockClientWaitingData(redisClient 
*c
) { 
 758     redisAssert(c
->bpop
.keys 
!= NULL
); 
 759     /* The client may wait for multiple keys, so unblock it for every key. */ 
 760     for (j 
= 0; j 
< c
->bpop
.count
; j
++) { 
 761         /* Remove this client from the list of clients waiting for this key. */ 
 762         de 
= dictFind(c
->db
->blocking_keys
,c
->bpop
.keys
[j
]); 
 763         redisAssert(de 
!= NULL
); 
 764         l 
= dictGetEntryVal(de
); 
 765         listDelNode(l
,listSearchKey(l
,c
)); 
 766         /* If the list is empty we need to remove it to avoid wasting memory */ 
 767         if (listLength(l
) == 0) 
 768             dictDelete(c
->db
->blocking_keys
,c
->bpop
.keys
[j
]); 
 769         decrRefCount(c
->bpop
.keys
[j
]); 
 772     /* Cleanup the client structure */ 
 775     c
->bpop
.target 
= NULL
; 
 776     c
->flags 
&= (~REDIS_BLOCKED
); 
 777     server
.bpop_blocked_clients
--; 
 778     listAddNodeTail(server
.unblocked_clients
,c
); 
 781 /* This should be called from any function PUSHing into lists. 
 782  * 'c' is the "pushing client", 'key' is the key it is pushing data against, 
 783  * 'ele' is the element pushed. 
 785  * If the function returns 0 there was no client waiting for a list push 
 788  * If the function returns 1 there was a client waiting for a list push 
 789  * against this key, the element was passed to this client thus it's not 
 790  * needed to actually add it to the list and the caller should return asap. */ 
 791 int handleClientsWaitingListPush(redisClient 
*c
, robj 
*key
, robj 
*ele
) { 
 792     struct dictEntry 
*de
; 
 793     redisClient 
*receiver
; 
 797     robj 
*dstkey
, *dstobj
; 
 799     de 
= dictFind(c
->db
->blocking_keys
,key
); 
 800     if (de 
== NULL
) return 0; 
 801     clients 
= dictGetEntryVal(de
); 
 802     numclients 
= listLength(clients
); 
 804     /* Try to handle the push as long as there are clients waiting for a push. 
 805      * Note that "numclients" is used because the list of clients waiting for a 
 806      * push on "key" is deleted by unblockClient() when empty. 
 808      * This loop will have more than 1 iteration when there is a BRPOPLPUSH 
 809      * that cannot push the target list because it does not contain a list. If 
 810      * this happens, it simply tries the next client waiting for a push. */ 
 811     while (numclients
--) { 
 812         ln 
= listFirst(clients
); 
 813         redisAssert(ln 
!= NULL
); 
 814         receiver 
= ln
->value
; 
 815         dstkey 
= receiver
->bpop
.target
; 
 817         /* This should remove the first element of the "clients" list. */ 
 818         unblockClientWaitingData(receiver
); 
 819         redisAssert(ln 
!= listFirst(clients
)); 
 821         if (dstkey 
== NULL
) { 
 823             addReplyMultiBulkLen(receiver
,2); 
 824             addReplyBulk(receiver
,key
); 
 825             addReplyBulk(receiver
,ele
); 
 829             dstobj 
= lookupKeyWrite(receiver
->db
,dstkey
); 
 830             if (dstobj 
&& checkType(receiver
,dstobj
,REDIS_LIST
)) { 
 831                 decrRefCount(dstkey
); 
 833                 rpoplpushHandlePush(receiver
,dstkey
,dstobj
,ele
); 
 834                 decrRefCount(dstkey
); 
 843 int getTimeoutFromObjectOrReply(redisClient 
*c
, robj 
*object
, time_t *timeout
) { 
 846     if (getLongFromObjectOrReply(c
,object
,&tval
, 
 847         "timeout is not an integer or out of range") != REDIS_OK
) 
 851         addReplyError(c
,"timeout is negative"); 
 855     if (tval 
> 0) tval 
+= time(NULL
); 
 861 /* Blocking RPOP/LPOP */ 
 862 void blockingPopGenericCommand(redisClient 
*c
, int where
) { 
 867     if (getTimeoutFromObjectOrReply(c
,c
->argv
[c
->argc
-1],&timeout
) != REDIS_OK
) 
 870     for (j 
= 1; j 
< c
->argc
-1; j
++) { 
 871         o 
= lookupKeyWrite(c
->db
,c
->argv
[j
]); 
 873             if (o
->type 
!= REDIS_LIST
) { 
 874                 addReply(c
,shared
.wrongtypeerr
); 
 877                 if (listTypeLength(o
) != 0) { 
 878                     /* If the list contains elements fall back to the usual 
 879                      * non-blocking POP operation */ 
 880                     robj 
*argv
[2], **orig_argv
; 
 883                     /* We need to alter the command arguments before to call 
 884                      * popGenericCommand() as the command takes a single key. */ 
 887                     argv
[1] = c
->argv
[j
]; 
 891                     /* Also the return value is different, we need to output 
 892                      * the multi bulk reply header and the key name. The 
 893                      * "real" command will add the last element (the value) 
 894                      * for us. If this souds like an hack to you it's just 
 895                      * because it is... */ 
 896                     addReplyMultiBulkLen(c
,2); 
 897                     addReplyBulk(c
,argv
[1]); 
 899                     popGenericCommand(c
,where
); 
 901                     /* Fix the client structure with the original stuff */ 
 911     /* If we are inside a MULTI/EXEC and the list is empty the only thing 
 912      * we can do is treating it as a timeout (even with timeout 0). */ 
 913     if (c
->flags 
& REDIS_MULTI
) { 
 914         addReply(c
,shared
.nullmultibulk
); 
 918     /* If the list is empty or the key does not exists we must block */ 
 919     blockForKeys(c
, c
->argv 
+ 1, c
->argc 
- 2, timeout
, NULL
); 
 922 void blpopCommand(redisClient 
*c
) { 
 923     blockingPopGenericCommand(c
,REDIS_HEAD
); 
 926 void brpopCommand(redisClient 
*c
) { 
 927     blockingPopGenericCommand(c
,REDIS_TAIL
); 
 930 void brpoplpushCommand(redisClient 
*c
) { 
 933     if (getTimeoutFromObjectOrReply(c
,c
->argv
[3],&timeout
) != REDIS_OK
) 
 936     robj 
*key 
= lookupKeyWrite(c
->db
, c
->argv
[1]); 
 939         if (c
->flags 
& REDIS_MULTI
) { 
 941             /* Blocking against an empty list in a multi state 
 942              * returns immediately. */ 
 943             addReply(c
, shared
.nullmultibulk
); 
 945             /* The list is empty and the client blocks. */ 
 946             blockForKeys(c
, c
->argv 
+ 1, 1, timeout
, c
->argv
[2]); 
 949         if (key
->type 
!= REDIS_LIST
) { 
 950             addReply(c
, shared
.wrongtypeerr
); 
 953             /* The list exists and has elements, so 
 954              * the regular rpoplpushCommand is executed. */ 
 955             redisAssert(listTypeLength(key
) > 0);