3 /*----------------------------------------------------------------------------- 
   5  *----------------------------------------------------------------------------*/ 
   7 /* Check the argument length to see if it requires us to convert the ziplist 
   8  * to a real list. Only check raw-encoded objects because integer encoded 
   9  * objects are never too long. */ 
  10 void listTypeTryConversion(robj 
*subject
, robj 
*value
) { 
  11     if (subject
->encoding 
!= REDIS_ENCODING_ZIPLIST
) return; 
  12     if (value
->encoding 
== REDIS_ENCODING_RAW 
&& 
  13         sdslen(value
->ptr
) > server
.list_max_ziplist_value
) 
  14             listTypeConvert(subject
,REDIS_ENCODING_LINKEDLIST
); 
  17 void listTypePush(robj 
*subject
, robj 
*value
, int where
) { 
  18     /* Check if we need to convert the ziplist */ 
  19     listTypeTryConversion(subject
,value
); 
  20     if (subject
->encoding 
== REDIS_ENCODING_ZIPLIST 
&& 
  21         ziplistLen(subject
->ptr
) >= server
.list_max_ziplist_entries
) 
  22             listTypeConvert(subject
,REDIS_ENCODING_LINKEDLIST
); 
  24     if (subject
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
  25         int pos 
= (where 
== REDIS_HEAD
) ? ZIPLIST_HEAD 
: ZIPLIST_TAIL
; 
  26         value 
= getDecodedObject(value
); 
  27         subject
->ptr 
= ziplistPush(subject
->ptr
,value
->ptr
,sdslen(value
->ptr
),pos
); 
  29     } else if (subject
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
  30         if (where 
== REDIS_HEAD
) { 
  31             listAddNodeHead(subject
->ptr
,value
); 
  33             listAddNodeTail(subject
->ptr
,value
); 
  37         redisPanic("Unknown list encoding"); 
  41 robj 
*listTypePop(robj 
*subject
, int where
) { 
  43     if (subject
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
  48         int pos 
= (where 
== REDIS_HEAD
) ? 0 : -1; 
  49         p 
= ziplistIndex(subject
->ptr
,pos
); 
  50         if (ziplistGet(p
,&vstr
,&vlen
,&vlong
)) { 
  52                 value 
= createStringObject((char*)vstr
,vlen
); 
  54                 value 
= createStringObjectFromLongLong(vlong
); 
  56             /* We only need to delete an element when it exists */ 
  57             subject
->ptr 
= ziplistDelete(subject
->ptr
,&p
); 
  59     } else if (subject
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
  60         list 
*list 
= subject
->ptr
; 
  62         if (where 
== REDIS_HEAD
) { 
  68             value 
= listNodeValue(ln
); 
  73         redisPanic("Unknown list encoding"); 
  78 unsigned long listTypeLength(robj 
*subject
) { 
  79     if (subject
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
  80         return ziplistLen(subject
->ptr
); 
  81     } else if (subject
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
  82         return listLength((list
*)subject
->ptr
); 
  84         redisPanic("Unknown list encoding"); 
  88 /* Initialize an iterator at the specified index. */ 
  89 listTypeIterator 
*listTypeInitIterator(robj 
*subject
, long index
, unsigned char direction
) { 
  90     listTypeIterator 
*li 
= zmalloc(sizeof(listTypeIterator
)); 
  91     li
->subject 
= subject
; 
  92     li
->encoding 
= subject
->encoding
; 
  93     li
->direction 
= direction
; 
  94     if (li
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
  95         li
->zi 
= ziplistIndex(subject
->ptr
,index
); 
  96     } else if (li
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
  97         li
->ln 
= listIndex(subject
->ptr
,index
); 
  99         redisPanic("Unknown list encoding"); 
 104 /* Clean up the iterator. */ 
 105 void listTypeReleaseIterator(listTypeIterator 
*li
) { 
 109 /* Stores pointer to current the entry in the provided entry structure 
 110  * and advances the position of the iterator. Returns 1 when the current 
 111  * entry is in fact an entry, 0 otherwise. */ 
 112 int listTypeNext(listTypeIterator 
*li
, listTypeEntry 
*entry
) { 
 113     /* Protect from converting when iterating */ 
 114     redisAssert(li
->subject
->encoding 
== li
->encoding
); 
 117     if (li
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
 119         if (entry
->zi 
!= NULL
) { 
 120             if (li
->direction 
== REDIS_TAIL
) 
 121                 li
->zi 
= ziplistNext(li
->subject
->ptr
,li
->zi
); 
 123                 li
->zi 
= ziplistPrev(li
->subject
->ptr
,li
->zi
); 
 126     } else if (li
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
 128         if (entry
->ln 
!= NULL
) { 
 129             if (li
->direction 
== REDIS_TAIL
) 
 130                 li
->ln 
= li
->ln
->next
; 
 132                 li
->ln 
= li
->ln
->prev
; 
 136         redisPanic("Unknown list encoding"); 
 141 /* Return entry or NULL at the current position of the iterator. */ 
 142 robj 
*listTypeGet(listTypeEntry 
*entry
) { 
 143     listTypeIterator 
*li 
= entry
->li
; 
 145     if (li
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
 149         redisAssert(entry
->zi 
!= NULL
); 
 150         if (ziplistGet(entry
->zi
,&vstr
,&vlen
,&vlong
)) { 
 152                 value 
= createStringObject((char*)vstr
,vlen
); 
 154                 value 
= createStringObjectFromLongLong(vlong
); 
 157     } else if (li
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
 158         redisAssert(entry
->ln 
!= NULL
); 
 159         value 
= listNodeValue(entry
->ln
); 
 162         redisPanic("Unknown list encoding"); 
 167 void listTypeInsert(listTypeEntry 
*entry
, robj 
*value
, int where
) { 
 168     robj 
*subject 
= entry
->li
->subject
; 
 169     if (entry
->li
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
 170         value 
= getDecodedObject(value
); 
 171         if (where 
== REDIS_TAIL
) { 
 172             unsigned char *next 
= ziplistNext(subject
->ptr
,entry
->zi
); 
 174             /* When we insert after the current element, but the current element 
 175              * is the tail of the list, we need to do a push. */ 
 177                 subject
->ptr 
= ziplistPush(subject
->ptr
,value
->ptr
,sdslen(value
->ptr
),REDIS_TAIL
); 
 179                 subject
->ptr 
= ziplistInsert(subject
->ptr
,next
,value
->ptr
,sdslen(value
->ptr
)); 
 182             subject
->ptr 
= ziplistInsert(subject
->ptr
,entry
->zi
,value
->ptr
,sdslen(value
->ptr
)); 
 185     } else if (entry
->li
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
 186         if (where 
== REDIS_TAIL
) { 
 187             listInsertNode(subject
->ptr
,entry
->ln
,value
,AL_START_TAIL
); 
 189             listInsertNode(subject
->ptr
,entry
->ln
,value
,AL_START_HEAD
); 
 193         redisPanic("Unknown list encoding"); 
 197 /* Compare the given object with the entry at the current position. */ 
 198 int listTypeEqual(listTypeEntry 
*entry
, robj 
*o
) { 
 199     listTypeIterator 
*li 
= entry
->li
; 
 200     if (li
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
 201         redisAssertWithInfo(NULL
,o
,o
->encoding 
== REDIS_ENCODING_RAW
); 
 202         return ziplistCompare(entry
->zi
,o
->ptr
,sdslen(o
->ptr
)); 
 203     } else if (li
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
 204         return equalStringObjects(o
,listNodeValue(entry
->ln
)); 
 206         redisPanic("Unknown list encoding"); 
 210 /* Delete the element pointed to. */ 
 211 void listTypeDelete(listTypeEntry 
*entry
) { 
 212     listTypeIterator 
*li 
= entry
->li
; 
 213     if (li
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
 214         unsigned char *p 
= entry
->zi
; 
 215         li
->subject
->ptr 
= ziplistDelete(li
->subject
->ptr
,&p
); 
 217         /* Update position of the iterator depending on the direction */ 
 218         if (li
->direction 
== REDIS_TAIL
) 
 221             li
->zi 
= ziplistPrev(li
->subject
->ptr
,p
); 
 222     } else if (entry
->li
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
 224         if (li
->direction 
== REDIS_TAIL
) 
 225             next 
= entry
->ln
->next
; 
 227             next 
= entry
->ln
->prev
; 
 228         listDelNode(li
->subject
->ptr
,entry
->ln
); 
 231         redisPanic("Unknown list encoding"); 
 235 void listTypeConvert(robj 
*subject
, int enc
) { 
 236     listTypeIterator 
*li
; 
 238     redisAssertWithInfo(NULL
,subject
,subject
->type 
== REDIS_LIST
); 
 240     if (enc 
== REDIS_ENCODING_LINKEDLIST
) { 
 241         list 
*l 
= listCreate(); 
 242         listSetFreeMethod(l
,decrRefCount
); 
 244         /* listTypeGet returns a robj with incremented refcount */ 
 245         li 
= listTypeInitIterator(subject
,0,REDIS_TAIL
); 
 246         while (listTypeNext(li
,&entry
)) listAddNodeTail(l
,listTypeGet(&entry
)); 
 247         listTypeReleaseIterator(li
); 
 249         subject
->encoding 
= REDIS_ENCODING_LINKEDLIST
; 
 253         redisPanic("Unsupported list conversion"); 
 257 /*----------------------------------------------------------------------------- 
 259  *----------------------------------------------------------------------------*/ 
 261 void pushGenericCommand(redisClient 
*c
, int where
) { 
 262     int j
, waiting 
= 0, pushed 
= 0; 
 263     robj 
*lobj 
= lookupKeyWrite(c
->db
,c
->argv
[1]); 
 264     int may_have_waiting_clients 
= (lobj 
== NULL
); 
 266     if (lobj 
&& lobj
->type 
!= REDIS_LIST
) { 
 267         addReply(c
,shared
.wrongtypeerr
); 
 271     for (j 
= 2; j 
< c
->argc
; j
++) { 
 272         c
->argv
[j
] = tryObjectEncoding(c
->argv
[j
]); 
 273         if (may_have_waiting_clients
) { 
 274             if (handleClientsWaitingListPush(c
,c
->argv
[1],c
->argv
[j
])) { 
 278                 may_have_waiting_clients 
= 0; 
 282             lobj 
= createZiplistObject(); 
 283             dbAdd(c
->db
,c
->argv
[1],lobj
); 
 285         listTypePush(lobj
,c
->argv
[j
],where
); 
 288     addReplyLongLong(c
, waiting 
+ (lobj 
? listTypeLength(lobj
) : 0)); 
 289     if (pushed
) signalModifiedKey(c
->db
,c
->argv
[1]); 
 290     server
.dirty 
+= pushed
; 
 292     /* Alter the replication of the command accordingly to the number of 
 293      * list elements delivered to clients waiting into a blocking operation. 
 294      * We do that only if there were waiting clients, and only if still some 
 295      * element was pushed into the list (othewise dirty is 0 and nothign will 
 297     if (waiting 
&& pushed
) { 
 298         /* CMD KEY a b C D E */ 
 299         for (j 
= 0; j 
< waiting
; j
++) decrRefCount(c
->argv
[j
+2]); 
 300         memmove(c
->argv
+2,c
->argv
+2+waiting
,sizeof(robj
*)*pushed
); 
 305 void lpushCommand(redisClient 
*c
) { 
 306     pushGenericCommand(c
,REDIS_HEAD
); 
 309 void rpushCommand(redisClient 
*c
) { 
 310     pushGenericCommand(c
,REDIS_TAIL
); 
 313 void pushxGenericCommand(redisClient 
*c
, robj 
*refval
, robj 
*val
, int where
) { 
 315     listTypeIterator 
*iter
; 
 319     if ((subject 
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.czero
)) == NULL 
|| 
 320         checkType(c
,subject
,REDIS_LIST
)) return; 
 322     if (refval 
!= NULL
) { 
 323         /* Note: we expect refval to be string-encoded because it is *not* the 
 324          * last argument of the multi-bulk LINSERT. */ 
 325         redisAssertWithInfo(c
,refval
,refval
->encoding 
== REDIS_ENCODING_RAW
); 
 327         /* We're not sure if this value can be inserted yet, but we cannot 
 328          * convert the list inside the iterator. We don't want to loop over 
 329          * the list twice (once to see if the value can be inserted and once 
 330          * to do the actual insert), so we assume this value can be inserted 
 331          * and convert the ziplist to a regular list if necessary. */ 
 332         listTypeTryConversion(subject
,val
); 
 334         /* Seek refval from head to tail */ 
 335         iter 
= listTypeInitIterator(subject
,0,REDIS_TAIL
); 
 336         while (listTypeNext(iter
,&entry
)) { 
 337             if (listTypeEqual(&entry
,refval
)) { 
 338                 listTypeInsert(&entry
,val
,where
); 
 343         listTypeReleaseIterator(iter
); 
 346             /* Check if the length exceeds the ziplist length threshold. */ 
 347             if (subject
->encoding 
== REDIS_ENCODING_ZIPLIST 
&& 
 348                 ziplistLen(subject
->ptr
) > server
.list_max_ziplist_entries
) 
 349                     listTypeConvert(subject
,REDIS_ENCODING_LINKEDLIST
); 
 350             signalModifiedKey(c
->db
,c
->argv
[1]); 
 353             /* Notify client of a failed insert */ 
 354             addReply(c
,shared
.cnegone
); 
 358         listTypePush(subject
,val
,where
); 
 359         signalModifiedKey(c
->db
,c
->argv
[1]); 
 363     addReplyLongLong(c
,listTypeLength(subject
)); 
 366 void lpushxCommand(redisClient 
*c
) { 
 367     c
->argv
[2] = tryObjectEncoding(c
->argv
[2]); 
 368     pushxGenericCommand(c
,NULL
,c
->argv
[2],REDIS_HEAD
); 
 371 void rpushxCommand(redisClient 
*c
) { 
 372     c
->argv
[2] = tryObjectEncoding(c
->argv
[2]); 
 373     pushxGenericCommand(c
,NULL
,c
->argv
[2],REDIS_TAIL
); 
 376 void linsertCommand(redisClient 
*c
) { 
 377     c
->argv
[4] = tryObjectEncoding(c
->argv
[4]); 
 378     if (strcasecmp(c
->argv
[2]->ptr
,"after") == 0) { 
 379         pushxGenericCommand(c
,c
->argv
[3],c
->argv
[4],REDIS_TAIL
); 
 380     } else if (strcasecmp(c
->argv
[2]->ptr
,"before") == 0) { 
 381         pushxGenericCommand(c
,c
->argv
[3],c
->argv
[4],REDIS_HEAD
); 
 383         addReply(c
,shared
.syntaxerr
); 
 387 void llenCommand(redisClient 
*c
) { 
 388     robj 
*o 
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.czero
); 
 389     if (o 
== NULL 
|| checkType(c
,o
,REDIS_LIST
)) return; 
 390     addReplyLongLong(c
,listTypeLength(o
)); 
 393 void lindexCommand(redisClient 
*c
) { 
 394     robj 
*o 
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.nullbulk
); 
 395     if (o 
== NULL 
|| checkType(c
,o
,REDIS_LIST
)) return; 
 399     if ((getLongFromObjectOrReply(c
, c
->argv
[2], &index
, NULL
) != REDIS_OK
)) 
 402     if (o
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
 407         p 
= ziplistIndex(o
->ptr
,index
); 
 408         if (ziplistGet(p
,&vstr
,&vlen
,&vlong
)) { 
 410                 value 
= createStringObject((char*)vstr
,vlen
); 
 412                 value 
= createStringObjectFromLongLong(vlong
); 
 414             addReplyBulk(c
,value
); 
 417             addReply(c
,shared
.nullbulk
); 
 419     } else if (o
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
 420         listNode 
*ln 
= listIndex(o
->ptr
,index
); 
 422             value 
= listNodeValue(ln
); 
 423             addReplyBulk(c
,value
); 
 425             addReply(c
,shared
.nullbulk
); 
 428         redisPanic("Unknown list encoding"); 
 432 void lsetCommand(redisClient 
*c
) { 
 433     robj 
*o 
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.nokeyerr
); 
 434     if (o 
== NULL 
|| checkType(c
,o
,REDIS_LIST
)) return; 
 436     robj 
*value 
= (c
->argv
[3] = tryObjectEncoding(c
->argv
[3])); 
 438     if ((getLongFromObjectOrReply(c
, c
->argv
[2], &index
, NULL
) != REDIS_OK
)) 
 441     listTypeTryConversion(o
,value
); 
 442     if (o
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
 443         unsigned char *p
, *zl 
= o
->ptr
; 
 444         p 
= ziplistIndex(zl
,index
); 
 446             addReply(c
,shared
.outofrangeerr
); 
 448             o
->ptr 
= ziplistDelete(o
->ptr
,&p
); 
 449             value 
= getDecodedObject(value
); 
 450             o
->ptr 
= ziplistInsert(o
->ptr
,p
,value
->ptr
,sdslen(value
->ptr
)); 
 452             addReply(c
,shared
.ok
); 
 453             signalModifiedKey(c
->db
,c
->argv
[1]); 
 456     } else if (o
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
 457         listNode 
*ln 
= listIndex(o
->ptr
,index
); 
 459             addReply(c
,shared
.outofrangeerr
); 
 461             decrRefCount((robj
*)listNodeValue(ln
)); 
 462             listNodeValue(ln
) = value
; 
 464             addReply(c
,shared
.ok
); 
 465             signalModifiedKey(c
->db
,c
->argv
[1]); 
 469         redisPanic("Unknown list encoding"); 
 473 void popGenericCommand(redisClient 
*c
, int where
) { 
 474     robj 
*o 
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.nullbulk
); 
 475     if (o 
== NULL 
|| checkType(c
,o
,REDIS_LIST
)) return; 
 477     robj 
*value 
= listTypePop(o
,where
); 
 479         addReply(c
,shared
.nullbulk
); 
 481         addReplyBulk(c
,value
); 
 483         if (listTypeLength(o
) == 0) dbDelete(c
->db
,c
->argv
[1]); 
 484         signalModifiedKey(c
->db
,c
->argv
[1]); 
 489 void lpopCommand(redisClient 
*c
) { 
 490     popGenericCommand(c
,REDIS_HEAD
); 
 493 void rpopCommand(redisClient 
*c
) { 
 494     popGenericCommand(c
,REDIS_TAIL
); 
 497 void lrangeCommand(redisClient 
*c
) { 
 499     long start
, end
, llen
, rangelen
; 
 501     if ((getLongFromObjectOrReply(c
, c
->argv
[2], &start
, NULL
) != REDIS_OK
) || 
 502         (getLongFromObjectOrReply(c
, c
->argv
[3], &end
, NULL
) != REDIS_OK
)) return; 
 504     if ((o 
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.emptymultibulk
)) == NULL
 
 505          || checkType(c
,o
,REDIS_LIST
)) return; 
 506     llen 
= listTypeLength(o
); 
 508     /* convert negative indexes */ 
 509     if (start 
< 0) start 
= llen
+start
; 
 510     if (end 
< 0) end 
= llen
+end
; 
 511     if (start 
< 0) start 
= 0; 
 513     /* Invariant: start >= 0, so this test will be true when end < 0. 
 514      * The range is empty when start > end or start >= length. */ 
 515     if (start 
> end 
|| start 
>= llen
) { 
 516         addReply(c
,shared
.emptymultibulk
); 
 519     if (end 
>= llen
) end 
= llen
-1; 
 520     rangelen 
= (end
-start
)+1; 
 522     /* Return the result in form of a multi-bulk reply */ 
 523     addReplyMultiBulkLen(c
,rangelen
); 
 524     if (o
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
 525         unsigned char *p 
= ziplistIndex(o
->ptr
,start
); 
 531             ziplistGet(p
,&vstr
,&vlen
,&vlong
); 
 533                 addReplyBulkCBuffer(c
,vstr
,vlen
); 
 535                 addReplyBulkLongLong(c
,vlong
); 
 537             p 
= ziplistNext(o
->ptr
,p
); 
 539     } else if (o
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
 542         /* If we are nearest to the end of the list, reach the element 
 543          * starting from tail and going backward, as it is faster. */ 
 544         if (start 
> llen
/2) start 
-= llen
; 
 545         ln 
= listIndex(o
->ptr
,start
); 
 548             addReplyBulk(c
,ln
->value
); 
 552         redisPanic("List encoding is not LINKEDLIST nor ZIPLIST!"); 
 556 void ltrimCommand(redisClient 
*c
) { 
 558     long start
, end
, llen
, j
, ltrim
, rtrim
; 
 562     if ((getLongFromObjectOrReply(c
, c
->argv
[2], &start
, NULL
) != REDIS_OK
) || 
 563         (getLongFromObjectOrReply(c
, c
->argv
[3], &end
, NULL
) != REDIS_OK
)) return; 
 565     if ((o 
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.ok
)) == NULL 
|| 
 566         checkType(c
,o
,REDIS_LIST
)) return; 
 567     llen 
= listTypeLength(o
); 
 569     /* convert negative indexes */ 
 570     if (start 
< 0) start 
= llen
+start
; 
 571     if (end 
< 0) end 
= llen
+end
; 
 572     if (start 
< 0) start 
= 0; 
 574     /* Invariant: start >= 0, so this test will be true when end < 0. 
 575      * The range is empty when start > end or start >= length. */ 
 576     if (start 
> end 
|| start 
>= llen
) { 
 577         /* Out of range start or start > end result in empty list */ 
 581         if (end 
>= llen
) end 
= llen
-1; 
 586     /* Remove list elements to perform the trim */ 
 587     if (o
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
 588         o
->ptr 
= ziplistDeleteRange(o
->ptr
,0,ltrim
); 
 589         o
->ptr 
= ziplistDeleteRange(o
->ptr
,-rtrim
,rtrim
); 
 590     } else if (o
->encoding 
== REDIS_ENCODING_LINKEDLIST
) { 
 592         for (j 
= 0; j 
< ltrim
; j
++) { 
 593             ln 
= listFirst(list
); 
 594             listDelNode(list
,ln
); 
 596         for (j 
= 0; j 
< rtrim
; j
++) { 
 598             listDelNode(list
,ln
); 
 601         redisPanic("Unknown list encoding"); 
 603     if (listTypeLength(o
) == 0) dbDelete(c
->db
,c
->argv
[1]); 
 604     signalModifiedKey(c
->db
,c
->argv
[1]); 
 606     addReply(c
,shared
.ok
); 
 609 void lremCommand(redisClient 
*c
) { 
 611     obj 
= c
->argv
[3] = tryObjectEncoding(c
->argv
[3]); 
 616     if ((getLongFromObjectOrReply(c
, c
->argv
[2], &toremove
, NULL
) != REDIS_OK
)) 
 619     subject 
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.czero
); 
 620     if (subject 
== NULL 
|| checkType(c
,subject
,REDIS_LIST
)) return; 
 622     /* Make sure obj is raw when we're dealing with a ziplist */ 
 623     if (subject
->encoding 
== REDIS_ENCODING_ZIPLIST
) 
 624         obj 
= getDecodedObject(obj
); 
 626     listTypeIterator 
*li
; 
 628         toremove 
= -toremove
; 
 629         li 
= listTypeInitIterator(subject
,-1,REDIS_HEAD
); 
 631         li 
= listTypeInitIterator(subject
,0,REDIS_TAIL
); 
 634     while (listTypeNext(li
,&entry
)) { 
 635         if (listTypeEqual(&entry
,obj
)) { 
 636             listTypeDelete(&entry
); 
 639             if (toremove 
&& removed 
== toremove
) break; 
 642     listTypeReleaseIterator(li
); 
 644     /* Clean up raw encoded object */ 
 645     if (subject
->encoding 
== REDIS_ENCODING_ZIPLIST
) 
 648     if (listTypeLength(subject
) == 0) dbDelete(c
->db
,c
->argv
[1]); 
 649     addReplyLongLong(c
,removed
); 
 650     if (removed
) signalModifiedKey(c
->db
,c
->argv
[1]); 
 653 /* This is the semantic of this command: 
 654  *  RPOPLPUSH srclist dstlist: 
 655  *    IF LLEN(srclist) > 0 
 656  *      element = RPOP srclist 
 657  *      LPUSH dstlist element 
 664  * The idea is to be able to get an element from a list in a reliable way 
 665  * since the element is not just returned but pushed against another list 
 666  * as well. This command was originally proposed by Ezra Zygmuntowicz. 
 669 void rpoplpushHandlePush(redisClient 
*origclient
, redisClient 
*c
, robj 
*dstkey
, robj 
*dstobj
, robj 
*value
) { 
 670     if (!handleClientsWaitingListPush(origclient
,dstkey
,value
)) { 
 671         /* Create the list if the key does not exist */ 
 673             dstobj 
= createZiplistObject(); 
 674             dbAdd(c
->db
,dstkey
,dstobj
); 
 676             signalModifiedKey(c
->db
,dstkey
); 
 678         listTypePush(dstobj
,value
,REDIS_HEAD
); 
 679         /* Additionally propagate this PUSH operation together with 
 680          * the operation performed by the command. */ 
 682             robj 
**argv 
= zmalloc(sizeof(robj
*)*3); 
 683             argv
[0] = createStringObject("LPUSH",5); 
 686             incrRefCount(argv
[1]); 
 687             incrRefCount(argv
[2]); 
 688             alsoPropagate(server
.lpushCommand
,c
->db
->id
,argv
,3, 
 689                           REDIS_PROPAGATE_AOF
|REDIS_PROPAGATE_REPL
); 
 692     /* Always send the pushed value to the client. */ 
 693     addReplyBulk(c
,value
); 
 696 void rpoplpushCommand(redisClient 
*c
) { 
 698     if ((sobj 
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.nullbulk
)) == NULL 
|| 
 699         checkType(c
,sobj
,REDIS_LIST
)) return; 
 701     if (listTypeLength(sobj
) == 0) { 
 702         addReply(c
,shared
.nullbulk
); 
 704         robj 
*dobj 
= lookupKeyWrite(c
->db
,c
->argv
[2]); 
 705         robj 
*touchedkey 
= c
->argv
[1]; 
 707         if (dobj 
&& checkType(c
,dobj
,REDIS_LIST
)) return; 
 708         value 
= listTypePop(sobj
,REDIS_TAIL
); 
 709         /* We saved touched key, and protect it, since rpoplpushHandlePush 
 710          * may change the client command argument vector. */ 
 711         incrRefCount(touchedkey
); 
 712         rpoplpushHandlePush(c
,c
,c
->argv
[2],dobj
,value
); 
 714         /* listTypePop returns an object with its refcount incremented */ 
 717         /* Delete the source list when it is empty */ 
 718         if (listTypeLength(sobj
) == 0) dbDelete(c
->db
,touchedkey
); 
 719         signalModifiedKey(c
->db
,touchedkey
); 
 720         decrRefCount(touchedkey
); 
 723         /* Replicate this as a simple RPOP since the LPUSH side is replicated 
 724          * by rpoplpushHandlePush() call if needed (it may not be needed 
 725          * if a client is blocking wait a push against the list). */ 
 726         rewriteClientCommandVector(c
,2, 
 727             resetRefCount(createStringObject("RPOP",4)), 
 732 /*----------------------------------------------------------------------------- 
 733  * Blocking POP operations 
 734  *----------------------------------------------------------------------------*/ 
 736 /* Currently Redis blocking operations support is limited to list POP ops, 
 737  * so the current implementation is not fully generic, but it is also not 
 738  * completely specific so it will not require a rewrite to support new 
 739  * kind of blocking operations in the future. 
 741  * Still it's important to note that list blocking operations can be already 
 742  * used as a notification mechanism in order to implement other blocking 
 743  * operations at application level, so there must be a very strong evidence 
 744  * of usefulness and generality before new blocking operations are implemented. 
 746  * This is how the current blocking POP works, we use BLPOP as example: 
 747  * - If the user calls BLPOP and the key exists and contains a non empty list 
 748  *   then LPOP is called instead. So BLPOP is semantically the same as LPOP 
 749  *   if there is not to block. 
 750  * - If instead BLPOP is called and the key does not exists or the list is 
 751  *   empty we need to block. In order to do so we remove the notification for 
 752  *   new data to read in the client socket (so that we'll not serve new 
 753  *   requests if the blocking request is not served). Also we put the client 
 754  *   in a dictionary (db->blocking_keys) mapping keys to a list of clients 
 755  *   blocking for this keys. 
 756  * - If a PUSH operation against a key with blocked clients waiting is 
 757  *   performed, we serve the first in the list: basically instead to push 
 758  *   the new element inside the list we return it to the (first / oldest) 
 759  *   blocking client, unblock the client, and remove it form the list. 
 761  * The above comment and the source code should be enough in order to understand 
 762  * the implementation and modify / fix it later. 
 765 /* Set a client in blocking mode for the specified key, with the specified 
 767 void blockForKeys(redisClient 
*c
, robj 
**keys
, int numkeys
, time_t timeout
, robj 
*target
) { 
 772     c
->bpop
.keys 
= zmalloc(sizeof(robj
*)*numkeys
); 
 773     c
->bpop
.count 
= numkeys
; 
 774     c
->bpop
.timeout 
= timeout
; 
 775     c
->bpop
.target 
= target
; 
 777     if (target 
!= NULL
) { 
 778         incrRefCount(target
); 
 781     for (j 
= 0; j 
< numkeys
; j
++) { 
 782         /* Add the key in the client structure, to map clients -> keys */ 
 783         c
->bpop
.keys
[j
] = keys
[j
]; 
 784         incrRefCount(keys
[j
]); 
 786         /* And in the other "side", to map keys -> clients */ 
 787         de 
= dictFind(c
->db
->blocking_keys
,keys
[j
]); 
 791             /* For every key we take a list of clients blocked for it */ 
 793             retval 
= dictAdd(c
->db
->blocking_keys
,keys
[j
],l
); 
 794             incrRefCount(keys
[j
]); 
 795             redisAssertWithInfo(c
,keys
[j
],retval 
== DICT_OK
); 
 799         listAddNodeTail(l
,c
); 
 801     /* Mark the client as a blocked client */ 
 802     c
->flags 
|= REDIS_BLOCKED
; 
 803     server
.bpop_blocked_clients
++; 
 806 /* Unblock a client that's waiting in a blocking operation such as BLPOP */ 
 807 void unblockClientWaitingData(redisClient 
*c
) { 
 812     redisAssertWithInfo(c
,NULL
,c
->bpop
.keys 
!= NULL
); 
 813     /* The client may wait for multiple keys, so unblock it for every key. */ 
 814     for (j 
= 0; j 
< c
->bpop
.count
; j
++) { 
 815         /* Remove this client from the list of clients waiting for this key. */ 
 816         de 
= dictFind(c
->db
->blocking_keys
,c
->bpop
.keys
[j
]); 
 817         redisAssertWithInfo(c
,c
->bpop
.keys
[j
],de 
!= NULL
); 
 819         listDelNode(l
,listSearchKey(l
,c
)); 
 820         /* If the list is empty we need to remove it to avoid wasting memory */ 
 821         if (listLength(l
) == 0) 
 822             dictDelete(c
->db
->blocking_keys
,c
->bpop
.keys
[j
]); 
 823         decrRefCount(c
->bpop
.keys
[j
]); 
 826     /* Cleanup the client structure */ 
 829     if (c
->bpop
.target
) decrRefCount(c
->bpop
.target
); 
 830     c
->bpop
.target 
= NULL
; 
 831     c
->flags 
&= ~REDIS_BLOCKED
; 
 832     c
->flags 
|= REDIS_UNBLOCKED
; 
 833     server
.bpop_blocked_clients
--; 
 834     listAddNodeTail(server
.unblocked_clients
,c
); 
 837 /* This should be called from any function PUSHing into lists. 
 838  * 'c' is the "pushing client", 'key' is the key it is pushing data against, 
 839  * 'ele' is the element pushed. 
 841  * If the function returns 0 there was no client waiting for a list push 
 844  * If the function returns 1 there was a client waiting for a list push 
 845  * against this key, the element was passed to this client thus it's not 
 846  * needed to actually add it to the list and the caller should return asap. */ 
 847 int handleClientsWaitingListPush(redisClient 
*c
, robj 
*key
, robj 
*ele
) { 
 848     struct dictEntry 
*de
; 
 849     redisClient 
*receiver
; 
 853     robj 
*dstkey
, *dstobj
; 
 855     de 
= dictFind(c
->db
->blocking_keys
,key
); 
 856     if (de 
== NULL
) return 0; 
 857     clients 
= dictGetVal(de
); 
 858     numclients 
= listLength(clients
); 
 860     /* Try to handle the push as long as there are clients waiting for a push. 
 861      * Note that "numclients" is used because the list of clients waiting for a 
 862      * push on "key" is deleted by unblockClient() when empty. 
 864      * This loop will have more than 1 iteration when there is a BRPOPLPUSH 
 865      * that cannot push the target list because it does not contain a list. If 
 866      * this happens, it simply tries the next client waiting for a push. */ 
 867     while (numclients
--) { 
 868         ln 
= listFirst(clients
); 
 869         redisAssertWithInfo(c
,key
,ln 
!= NULL
); 
 870         receiver 
= ln
->value
; 
 871         dstkey 
= receiver
->bpop
.target
; 
 873         /* Protect receiver->bpop.target, that will be freed by 
 874          * the next unblockClientWaitingData() call. */ 
 875         if (dstkey
) incrRefCount(dstkey
); 
 877         /* This should remove the first element of the "clients" list. */ 
 878         unblockClientWaitingData(receiver
); 
 880         if (dstkey 
== NULL
) { 
 882             addReplyMultiBulkLen(receiver
,2); 
 883             addReplyBulk(receiver
,key
); 
 884             addReplyBulk(receiver
,ele
); 
 885             return 1; /* Serve just the first client as in B[RL]POP semantics */ 
 887             /* BRPOPLPUSH, note that receiver->db is always equal to c->db. */ 
 888             dstobj 
= lookupKeyWrite(receiver
->db
,dstkey
); 
 889             if (!(dstobj 
&& checkType(receiver
,dstobj
,REDIS_LIST
))) { 
 890                 rpoplpushHandlePush(c
,receiver
,dstkey
,dstobj
,ele
); 
 891                 decrRefCount(dstkey
); 
 894             decrRefCount(dstkey
); 
 901 int getTimeoutFromObjectOrReply(redisClient 
*c
, robj 
*object
, time_t *timeout
) { 
 904     if (getLongFromObjectOrReply(c
,object
,&tval
, 
 905         "timeout is not an integer or out of range") != REDIS_OK
) 
 909         addReplyError(c
,"timeout is negative"); 
 913     if (tval 
> 0) tval 
+= time(NULL
); 
 919 /* Blocking RPOP/LPOP */ 
 920 void blockingPopGenericCommand(redisClient 
*c
, int where
) { 
 925     if (getTimeoutFromObjectOrReply(c
,c
->argv
[c
->argc
-1],&timeout
) != REDIS_OK
) 
 928     for (j 
= 1; j 
< c
->argc
-1; j
++) { 
 929         o 
= lookupKeyWrite(c
->db
,c
->argv
[j
]); 
 931             if (o
->type 
!= REDIS_LIST
) { 
 932                 addReply(c
,shared
.wrongtypeerr
); 
 935                 if (listTypeLength(o
) != 0) { 
 936                     /* If the list contains elements fall back to the usual 
 937                      * non-blocking POP operation */ 
 938                     struct redisCommand 
*orig_cmd
; 
 939                     robj 
*argv
[2], **orig_argv
; 
 942                     /* We need to alter the command arguments before to call 
 943                      * popGenericCommand() as the command takes a single key. */ 
 947                     argv
[1] = c
->argv
[j
]; 
 951                     /* Also the return value is different, we need to output 
 952                      * the multi bulk reply header and the key name. The 
 953                      * "real" command will add the last element (the value) 
 954                      * for us. If this souds like an hack to you it's just 
 955                      * because it is... */ 
 956                     addReplyMultiBulkLen(c
,2); 
 957                     addReplyBulk(c
,argv
[1]); 
 959                     popGenericCommand(c
,where
); 
 961                     /* Fix the client structure with the original stuff */ 
 972     /* If we are inside a MULTI/EXEC and the list is empty the only thing 
 973      * we can do is treating it as a timeout (even with timeout 0). */ 
 974     if (c
->flags 
& REDIS_MULTI
) { 
 975         addReply(c
,shared
.nullmultibulk
); 
 979     /* If the list is empty or the key does not exists we must block */ 
 980     blockForKeys(c
, c
->argv 
+ 1, c
->argc 
- 2, timeout
, NULL
); 
 983 void blpopCommand(redisClient 
*c
) { 
 984     blockingPopGenericCommand(c
,REDIS_HEAD
); 
 987 void brpopCommand(redisClient 
*c
) { 
 988     blockingPopGenericCommand(c
,REDIS_TAIL
); 
 991 void brpoplpushCommand(redisClient 
*c
) { 
 994     if (getTimeoutFromObjectOrReply(c
,c
->argv
[3],&timeout
) != REDIS_OK
) 
 997     robj 
*key 
= lookupKeyWrite(c
->db
, c
->argv
[1]); 
1000         if (c
->flags 
& REDIS_MULTI
) { 
1002             /* Blocking against an empty list in a multi state 
1003              * returns immediately. */ 
1004             addReply(c
, shared
.nullbulk
); 
1006             /* The list is empty and the client blocks. */ 
1007             blockForKeys(c
, c
->argv 
+ 1, 1, timeout
, c
->argv
[2]); 
1010         if (key
->type 
!= REDIS_LIST
) { 
1011             addReply(c
, shared
.wrongtypeerr
); 
1014             /* The list exists and has elements, so 
1015              * the regular rpoplpushCommand is executed. */ 
1016             redisAssertWithInfo(c
,key
,listTypeLength(key
) > 0); 
1017             rpoplpushCommand(c
);