5 /*----------------------------------------------------------------------------- 
   7  *----------------------------------------------------------------------------*/ 
   9 /* ZSETs are ordered sets using two data structures to hold the same elements 
  10  * in order to get O(log(N)) INSERT and REMOVE operations into a sorted 
  13  * The elements are added to a hash table mapping Redis objects to scores. 
  14  * At the same time the elements are added to a skip list mapping scores 
  15  * to Redis objects (so objects are sorted by scores in this "view"). */ 
  17 /* This skiplist implementation is almost a C translation of the original 
  18  * algorithm described by William Pugh in "Skip Lists: A Probabilistic 
  19  * Alternative to Balanced Trees", modified in three ways: 
  20  * a) this implementation allows for repeated values. 
  21  * b) the comparison is not just by key (our 'score') but by satellite data. 
  22  * c) there is a back pointer, so it's a doubly linked list with the back 
  23  * pointers being only at "level 1". This allows to traverse the list 
  24  * from tail to head, useful for ZREVRANGE. */ 
  26 zskiplistNode 
*zslCreateNode(int level
, double score
, robj 
*obj
) { 
  27     zskiplistNode 
*zn 
= zmalloc(sizeof(*zn
)+level
*sizeof(struct zskiplistLevel
)); 
  33 zskiplist 
*zslCreate(void) { 
  37     zsl 
= zmalloc(sizeof(*zsl
)); 
  40     zsl
->header 
= zslCreateNode(ZSKIPLIST_MAXLEVEL
,0,NULL
); 
  41     for (j 
= 0; j 
< ZSKIPLIST_MAXLEVEL
; j
++) { 
  42         zsl
->header
->level
[j
].forward 
= NULL
; 
  43         zsl
->header
->level
[j
].span 
= 0; 
  45     zsl
->header
->backward 
= NULL
; 
  50 void zslFreeNode(zskiplistNode 
*node
) { 
  51     decrRefCount(node
->obj
); 
  55 void zslFree(zskiplist 
*zsl
) { 
  56     zskiplistNode 
*node 
= zsl
->header
->level
[0].forward
, *next
; 
  60         next 
= node
->level
[0].forward
; 
  67 int zslRandomLevel(void) { 
  69     while ((random()&0xFFFF) < (ZSKIPLIST_P 
* 0xFFFF)) 
  71     return (level
<ZSKIPLIST_MAXLEVEL
) ? level 
: ZSKIPLIST_MAXLEVEL
; 
  74 zskiplistNode 
*zslInsert(zskiplist 
*zsl
, double score
, robj 
*obj
) { 
  75     zskiplistNode 
*update
[ZSKIPLIST_MAXLEVEL
], *x
; 
  76     unsigned int rank
[ZSKIPLIST_MAXLEVEL
]; 
  80     for (i 
= zsl
->level
-1; i 
>= 0; i
--) { 
  81         /* store rank that is crossed to reach the insert position */ 
  82         rank
[i
] = i 
== (zsl
->level
-1) ? 0 : rank
[i
+1]; 
  83         while (x
->level
[i
].forward 
&& 
  84             (x
->level
[i
].forward
->score 
< score 
|| 
  85                 (x
->level
[i
].forward
->score 
== score 
&& 
  86                 compareStringObjects(x
->level
[i
].forward
->obj
,obj
) < 0))) { 
  87             rank
[i
] += x
->level
[i
].span
; 
  88             x 
= x
->level
[i
].forward
; 
  92     /* we assume the key is not already inside, since we allow duplicated 
  93      * scores, and the re-insertion of score and redis object should never 
  94      * happen since the caller of zslInsert() should test in the hash table 
  95      * if the element is already inside or not. */ 
  96     level 
= zslRandomLevel(); 
  97     if (level 
> zsl
->level
) { 
  98         for (i 
= zsl
->level
; i 
< level
; i
++) { 
 100             update
[i
] = zsl
->header
; 
 101             update
[i
]->level
[i
].span 
= zsl
->length
; 
 105     x 
= zslCreateNode(level
,score
,obj
); 
 106     for (i 
= 0; i 
< level
; i
++) { 
 107         x
->level
[i
].forward 
= update
[i
]->level
[i
].forward
; 
 108         update
[i
]->level
[i
].forward 
= x
; 
 110         /* update span covered by update[i] as x is inserted here */ 
 111         x
->level
[i
].span 
= update
[i
]->level
[i
].span 
- (rank
[0] - rank
[i
]); 
 112         update
[i
]->level
[i
].span 
= (rank
[0] - rank
[i
]) + 1; 
 115     /* increment span for untouched levels */ 
 116     for (i 
= level
; i 
< zsl
->level
; i
++) { 
 117         update
[i
]->level
[i
].span
++; 
 120     x
->backward 
= (update
[0] == zsl
->header
) ? NULL 
: update
[0]; 
 121     if (x
->level
[0].forward
) 
 122         x
->level
[0].forward
->backward 
= x
; 
 129 /* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */ 
 130 void zslDeleteNode(zskiplist 
*zsl
, zskiplistNode 
*x
, zskiplistNode 
**update
) { 
 132     for (i 
= 0; i 
< zsl
->level
; i
++) { 
 133         if (update
[i
]->level
[i
].forward 
== x
) { 
 134             update
[i
]->level
[i
].span 
+= x
->level
[i
].span 
- 1; 
 135             update
[i
]->level
[i
].forward 
= x
->level
[i
].forward
; 
 137             update
[i
]->level
[i
].span 
-= 1; 
 140     if (x
->level
[0].forward
) { 
 141         x
->level
[0].forward
->backward 
= x
->backward
; 
 143         zsl
->tail 
= x
->backward
; 
 145     while(zsl
->level 
> 1 && zsl
->header
->level
[zsl
->level
-1].forward 
== NULL
) 
 150 /* Delete an element with matching score/object from the skiplist. */ 
 151 int zslDelete(zskiplist 
*zsl
, double score
, robj 
*obj
) { 
 152     zskiplistNode 
*update
[ZSKIPLIST_MAXLEVEL
], *x
; 
 156     for (i 
= zsl
->level
-1; i 
>= 0; i
--) { 
 157         while (x
->level
[i
].forward 
&& 
 158             (x
->level
[i
].forward
->score 
< score 
|| 
 159                 (x
->level
[i
].forward
->score 
== score 
&& 
 160                 compareStringObjects(x
->level
[i
].forward
->obj
,obj
) < 0))) 
 161             x 
= x
->level
[i
].forward
; 
 164     /* We may have multiple elements with the same score, what we need 
 165      * is to find the element with both the right score and object. */ 
 166     x 
= x
->level
[0].forward
; 
 167     if (x 
&& score 
== x
->score 
&& equalStringObjects(x
->obj
,obj
)) { 
 168         zslDeleteNode(zsl
, x
, update
); 
 172         return 0; /* not found */ 
 174     return 0; /* not found */ 
 177 /* Struct to hold an inclusive/exclusive range spec. */ 
 180     int minex
, maxex
; /* are min or max exclusive? */ 
 183 static int zslValueGteMin(double value
, zrangespec 
*spec
) { 
 184     return spec
->minex 
? (value 
> spec
->min
) : (value 
>= spec
->min
); 
 187 static int zslValueLteMax(double value
, zrangespec 
*spec
) { 
 188     return spec
->maxex 
? (value 
< spec
->max
) : (value 
<= spec
->max
); 
 191 static int zslValueInRange(double value
, zrangespec 
*spec
) { 
 192     return zslValueGteMin(value
,spec
) && zslValueLteMax(value
,spec
); 
 195 /* Returns whether any part of the zset is in range. */ 
 196 int zslIsInRange(zskiplist 
*zsl
, zrangespec 
*range
) { 
 199     /* Test for ranges that will always be empty. */ 
 200     if (range
->min 
> range
->max 
|| 
 201             (range
->min 
== range
->max 
&& (range
->minex 
|| range
->maxex
))) 
 204     if (x 
== NULL 
|| !zslValueGteMin(x
->score
,range
)) 
 206     x 
= zsl
->header
->level
[0].forward
; 
 207     if (x 
== NULL 
|| !zslValueLteMax(x
->score
,range
)) 
 212 /* Find the first node that is contained in the specified range. 
 213  * Returns NULL when no element is contained in the range. */ 
 214 zskiplistNode 
*zslFirstInRange(zskiplist 
*zsl
, zrangespec range
) { 
 218     /* If everything is out of range, return early. */ 
 219     if (!zslIsInRange(zsl
,&range
)) return NULL
; 
 222     for (i 
= zsl
->level
-1; i 
>= 0; i
--) { 
 223         /* Go forward while *OUT* of range. */ 
 224         while (x
->level
[i
].forward 
&& 
 225             !zslValueGteMin(x
->level
[i
].forward
->score
,&range
)) 
 226                 x 
= x
->level
[i
].forward
; 
 229     /* The tail is in range, so the previous block should always return a 
 230      * node that is non-NULL and the last one to be out of range. */ 
 231     x 
= x
->level
[0].forward
; 
 232     redisAssert(x 
!= NULL 
&& zslValueInRange(x
->score
,&range
)); 
 236 /* Find the last node that is contained in the specified range. 
 237  * Returns NULL when no element is contained in the range. */ 
 238 zskiplistNode 
*zslLastInRange(zskiplist 
*zsl
, zrangespec range
) { 
 242     /* If everything is out of range, return early. */ 
 243     if (!zslIsInRange(zsl
,&range
)) return NULL
; 
 246     for (i 
= zsl
->level
-1; i 
>= 0; i
--) { 
 247         /* Go forward while *IN* range. */ 
 248         while (x
->level
[i
].forward 
&& 
 249             zslValueLteMax(x
->level
[i
].forward
->score
,&range
)) 
 250                 x 
= x
->level
[i
].forward
; 
 253     /* The header is in range, so the previous block should always return a 
 254      * node that is non-NULL and in range. */ 
 255     redisAssert(x 
!= NULL 
&& zslValueInRange(x
->score
,&range
)); 
 259 /* Delete all the elements with score between min and max from the skiplist. 
 260  * Each bound is inclusive unless range.minex / range.maxex marks it exclusive; a node is deleted only when its score satisfies both the min and the max bound. 
 261  * Note that this function takes the reference to the hash table view of the 
 262  * sorted set, in order to remove the elements from the hash table too. */ 
 263 unsigned long zslDeleteRangeByScore(zskiplist 
*zsl
, zrangespec range
, dict 
*dict
) { 
 264     zskiplistNode 
*update
[ZSKIPLIST_MAXLEVEL
], *x
; 
 265     unsigned long removed 
= 0; 
 269     for (i 
= zsl
->level
-1; i 
>= 0; i
--) { 
 270         while (x
->level
[i
].forward 
&& (range
.minex 
? 
 271             x
->level
[i
].forward
->score 
<= range
.min 
: 
 272             x
->level
[i
].forward
->score 
< range
.min
)) 
 273                 x 
= x
->level
[i
].forward
; 
 277     /* Current node is the last with score < or <= min. */ 
 278     x 
= x
->level
[0].forward
; 
 280     /* Delete nodes while in range. */ 
 281     while (x 
&& (range
.maxex 
? x
->score 
< range
.max 
: x
->score 
<= range
.max
)) { 
 282         zskiplistNode 
*next 
= x
->level
[0].forward
; 
 283         zslDeleteNode(zsl
,x
,update
); 
 284         dictDelete(dict
,x
->obj
); 
 292 /* Delete all the elements with rank between start and end from the skiplist. 
 293  * Start and end are inclusive. Note that start and end need to be 1-based */ 
 294 unsigned long zslDeleteRangeByRank(zskiplist 
*zsl
, unsigned int start
, unsigned int end
, dict 
*dict
) { 
 295     zskiplistNode 
*update
[ZSKIPLIST_MAXLEVEL
], *x
; 
 296     unsigned long traversed 
= 0, removed 
= 0; 
 300     for (i 
= zsl
->level
-1; i 
>= 0; i
--) { 
 301         while (x
->level
[i
].forward 
&& (traversed 
+ x
->level
[i
].span
) < start
) { 
 302             traversed 
+= x
->level
[i
].span
; 
 303             x 
= x
->level
[i
].forward
; 
 309     x 
= x
->level
[0].forward
; 
 310     while (x 
&& traversed 
<= end
) { 
 311         zskiplistNode 
*next 
= x
->level
[0].forward
; 
 312         zslDeleteNode(zsl
,x
,update
); 
 313         dictDelete(dict
,x
->obj
); 
 322 /* Find the rank for an element by both score and key. 
 323  * Returns 0 when the element cannot be found, rank otherwise. 
 324  * Note that the rank is 1-based due to the span of zsl->header to the 
 326 unsigned long zslGetRank(zskiplist 
*zsl
, double score
, robj 
*o
) { 
 328     unsigned long rank 
= 0; 
 332     for (i 
= zsl
->level
-1; i 
>= 0; i
--) { 
 333         while (x
->level
[i
].forward 
&& 
 334             (x
->level
[i
].forward
->score 
< score 
|| 
 335                 (x
->level
[i
].forward
->score 
== score 
&& 
 336                 compareStringObjects(x
->level
[i
].forward
->obj
,o
) <= 0))) { 
 337             rank 
+= x
->level
[i
].span
; 
 338             x 
= x
->level
[i
].forward
; 
 341         /* x might be equal to zsl->header, so test if obj is non-NULL */ 
 342         if (x
->obj 
&& equalStringObjects(x
->obj
,o
)) { 
 349 /* Finds an element by its rank. The rank argument needs to be 1-based. */ 
 350 zskiplistNode
* zslGetElementByRank(zskiplist 
*zsl
, unsigned long rank
) { 
 352     unsigned long traversed 
= 0; 
 356     for (i 
= zsl
->level
-1; i 
>= 0; i
--) { 
 357         while (x
->level
[i
].forward 
&& (traversed 
+ x
->level
[i
].span
) <= rank
) 
 359             traversed 
+= x
->level
[i
].span
; 
 360             x 
= x
->level
[i
].forward
; 
 362         if (traversed 
== rank
) { 
 369 /* Populate the rangespec according to the objects min and max. */ 
 370 static int zslParseRange(robj 
*min
, robj 
*max
, zrangespec 
*spec
) { 
 372     spec
->minex 
= spec
->maxex 
= 0; 
 374     /* Parse the min-max interval. If one of the values is prefixed 
 375      * by the "(" character, it's considered "open". For instance 
 376      * ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max 
 377      * ZRANGEBYSCORE zset 1.5 2.5 will instead match min <= x <= max */ 
 378     if (min
->encoding 
== REDIS_ENCODING_INT
) { 
 379         spec
->min 
= (long)min
->ptr
; 
 381         if (((char*)min
->ptr
)[0] == '(') { 
 382             spec
->min 
= strtod((char*)min
->ptr
+1,&eptr
); 
 383             if (eptr
[0] != '\0' || isnan(spec
->min
)) return REDIS_ERR
; 
 386             spec
->min 
= strtod((char*)min
->ptr
,&eptr
); 
 387             if (eptr
[0] != '\0' || isnan(spec
->min
)) return REDIS_ERR
; 
 390     if (max
->encoding 
== REDIS_ENCODING_INT
) { 
 391         spec
->max 
= (long)max
->ptr
; 
 393         if (((char*)max
->ptr
)[0] == '(') { 
 394             spec
->max 
= strtod((char*)max
->ptr
+1,&eptr
); 
 395             if (eptr
[0] != '\0' || isnan(spec
->max
)) return REDIS_ERR
; 
 398             spec
->max 
= strtod((char*)max
->ptr
,&eptr
); 
 399             if (eptr
[0] != '\0' || isnan(spec
->max
)) return REDIS_ERR
; 
 406 /*----------------------------------------------------------------------------- 
 407  * Ziplist-backed sorted set API 
 408  *----------------------------------------------------------------------------*/ 
 410 double zzlGetScore(unsigned char *sptr
) { 
 417     redisAssert(sptr 
!= NULL
); 
 418     redisAssert(ziplistGet(sptr
,&vstr
,&vlen
,&vlong
)); 
 421         memcpy(buf
,vstr
,vlen
); 
 423         score 
= strtod(buf
,NULL
); 
 431 /* Compare element in sorted set with given element. */ 
 432 int zzlCompareElements(unsigned char *eptr
, unsigned char *cstr
, unsigned int clen
) { 
 436     unsigned char vbuf
[32]; 
 439     redisAssert(ziplistGet(eptr
,&vstr
,&vlen
,&vlong
)); 
 441         /* Store string representation of long long in buf. */ 
 442         vlen 
= ll2string((char*)vbuf
,sizeof(vbuf
),vlong
); 
 446     minlen 
= (vlen 
< clen
) ? vlen 
: clen
; 
 447     cmp 
= memcmp(vstr
,cstr
,minlen
); 
 448     if (cmp 
== 0) return vlen
-clen
; 
 452 unsigned char *zzlFind(robj 
*zobj
, robj 
*ele
, double *score
) { 
 453     unsigned char *zl 
= zobj
->ptr
; 
 454     unsigned char *eptr 
= ziplistIndex(zl
,0), *sptr
; 
 456     ele 
= getDecodedObject(ele
); 
 457     while (eptr 
!= NULL
) { 
 458         sptr 
= ziplistNext(zl
,eptr
); 
 459         redisAssert(sptr 
!= NULL
); 
 461         if (ziplistCompare(eptr
,ele
->ptr
,sdslen(ele
->ptr
))) { 
 462             /* Matching element, pull out score. */ 
 463             *score 
= zzlGetScore(sptr
); 
 468         /* Move to next element. */ 
 469         eptr 
= ziplistNext(zl
,sptr
); 
 476 /* Delete (element,score) pair from ziplist. Use local copy of eptr because we 
 477  * don't want to modify the one given as argument. */ 
 478 int zzlDelete(robj 
*zobj
, unsigned char *eptr
) { 
 479     unsigned char *zl 
= zobj
->ptr
; 
 480     unsigned char *p 
= eptr
; 
 482     /* TODO: add function to ziplist API to delete N elements from offset. */ 
 483     zl 
= ziplistDelete(zl
,&p
); 
 484     zl 
= ziplistDelete(zl
,&p
); 
 489 int zzlInsertAt(robj 
*zobj
, robj 
*ele
, double score
, unsigned char *eptr
) { 
 490     unsigned char *zl 
= zobj
->ptr
; 
 496     redisAssert(ele
->encoding 
== REDIS_ENCODING_RAW
); 
 497     scorelen 
= d2string(scorebuf
,sizeof(scorebuf
),score
); 
 499         zl 
= ziplistPush(zl
,ele
->ptr
,sdslen(ele
->ptr
),ZIPLIST_TAIL
); 
 500         zl 
= ziplistPush(zl
,(unsigned char*)scorebuf
,scorelen
,ZIPLIST_TAIL
); 
 502         /* Keep offset relative to zl, as it might be re-allocated. */ 
 504         zl 
= ziplistInsert(zl
,eptr
,ele
->ptr
,sdslen(ele
->ptr
)); 
 507         /* Insert score after the element. */ 
 508         redisAssert((sptr 
= ziplistNext(zl
,eptr
)) != NULL
); 
 509         zl 
= ziplistInsert(zl
,sptr
,(unsigned char*)scorebuf
,scorelen
); 
 516 /* Insert (element,score) pair in ziplist. This function assumes the element is 
 517  * not yet present in the list. */ 
 518 int zzlInsert(robj 
*zobj
, robj 
*ele
, double score
) { 
 519     unsigned char *zl 
= zobj
->ptr
; 
 520     unsigned char *eptr 
= ziplistIndex(zl
,0), *sptr
; 
 524     ele 
= getDecodedObject(ele
); 
 525     while (eptr 
!= NULL
) { 
 526         sptr 
= ziplistNext(zl
,eptr
); 
 527         redisAssert(sptr 
!= NULL
); 
 528         s 
= zzlGetScore(sptr
); 
 531             /* First element with score larger than score for element to be 
 532              * inserted. This means we should take its spot in the list to 
 533              * maintain ordering. */ 
 535         } else if (s 
== score
) { 
 536             /* Ensure lexicographical ordering for elements. */ 
 537             if (zzlCompareElements(eptr
,ele
->ptr
,sdslen(ele
->ptr
)) < 0) 
 542             zzlInsertAt(zobj
,ele
,score
,eptr
); 
 546         /* Move to next element. */ 
 547         eptr 
= ziplistNext(zl
,sptr
); 
 550     /* Push on tail of list when it was not yet inserted. */ 
 552         zzlInsertAt(zobj
,ele
,score
,eptr
); 
 558 /*----------------------------------------------------------------------------- 
 559  * Sorted set commands  
 560  *----------------------------------------------------------------------------*/ 
 562 /* This generic command implements both ZADD and ZINCRBY. */ 
 563 void zaddGenericCommand(redisClient 
*c
, int incr
) { 
 564     static char *nanerr 
= "resulting score is not a number (NaN)"; 
 565     robj 
*key 
= c
->argv
[1]; 
 569     double score
, curscore 
= 0.0; 
 571     if (getDoubleFromObjectOrReply(c
,c
->argv
[2],&score
,NULL
) != REDIS_OK
) 
 574     zobj 
= lookupKeyWrite(c
->db
,key
); 
 576         zobj 
= createZsetZiplistObject(); 
 577         dbAdd(c
->db
,key
,zobj
); 
 579         if (zobj
->type 
!= REDIS_ZSET
) { 
 580             addReply(c
,shared
.wrongtypeerr
); 
 585     if (zobj
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
 588         /* Prefer non-encoded element when dealing with ziplists. */ 
 590         if ((eptr 
= zzlFind(zobj
,ele
,&curscore
)) != NULL
) { 
 594                     addReplyError(c
,nanerr
); 
 595                     /* Don't need to check if the sorted set is empty, because 
 596                      * we know it has at least one element. */ 
 601             /* Remove and re-insert when score changed. */ 
 602             if (score 
!= curscore
) { 
 603                 redisAssert(zzlDelete(zobj
,eptr
) == REDIS_OK
); 
 604                 redisAssert(zzlInsert(zobj
,ele
,score
) == REDIS_OK
); 
 606                 signalModifiedKey(c
->db
,key
); 
 610             if (incr
) /* ZINCRBY */ 
 611                 addReplyDouble(c
,score
); 
 613                 addReply(c
,shared
.czero
); 
 615             redisAssert(zzlInsert(zobj
,ele
,score
) == REDIS_OK
); 
 617             signalModifiedKey(c
->db
,key
); 
 620             if (incr
) /* ZINCRBY */ 
 621                 addReplyDouble(c
,score
); 
 623                 addReply(c
,shared
.cone
); 
 625     } else if (zobj
->encoding 
== REDIS_ENCODING_RAW
) { 
 626         zset 
*zs 
= zobj
->ptr
; 
 627         zskiplistNode 
*znode
; 
 630         ele 
= c
->argv
[3] = tryObjectEncoding(c
->argv
[3]); 
 631         de 
= dictFind(zs
->dict
,ele
); 
 633             curobj 
= dictGetEntryKey(de
); 
 634             curscore 
= *(double*)dictGetEntryVal(de
); 
 639                     addReplyError(c
,nanerr
); 
 640                     /* Don't need to check if the sorted set is empty, because 
 641                      * we know it has at least one element. */ 
 646             /* Remove and re-insert when score changed. We can safely delete 
 647              * the key object from the skiplist, since the dictionary still has 
 648              * a reference to it. */ 
 649             if (score 
!= curscore
) { 
 650                 redisAssert(zslDelete(zs
->zsl
,curscore
,curobj
)); 
 651                 znode 
= zslInsert(zs
->zsl
,score
,curobj
); 
 652                 incrRefCount(curobj
); /* Re-inserted in skiplist. */ 
 653                 dictGetEntryVal(de
) = &znode
->score
; /* Update score ptr. */ 
 655                 signalModifiedKey(c
->db
,key
); 
 659             if (incr
) /* ZINCRBY */ 
 660                 addReplyDouble(c
,score
); 
 662                 addReply(c
,shared
.czero
); 
 664             znode 
= zslInsert(zs
->zsl
,score
,ele
); 
 665             incrRefCount(ele
); /* Inserted in skiplist. */ 
 666             redisAssert(dictAdd(zs
->dict
,ele
,&znode
->score
) == DICT_OK
); 
 667             incrRefCount(ele
); /* Added to dictionary. */ 
 669             signalModifiedKey(c
->db
,key
); 
 672             if (incr
) /* ZINCRBY */ 
 673                 addReplyDouble(c
,score
); 
 675                 addReply(c
,shared
.cone
); 
 678         redisPanic("Unknown sorted set encoding"); 
 682 void zaddCommand(redisClient 
*c
) { 
 683     zaddGenericCommand(c
,0); 
 686 void zincrbyCommand(redisClient 
*c
) { 
 687     zaddGenericCommand(c
,1); 
 690 void zremCommand(redisClient 
*c
) { 
 697     if ((zsetobj 
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.czero
)) == NULL 
|| 
 698         checkType(c
,zsetobj
,REDIS_ZSET
)) return; 
 701     c
->argv
[2] = tryObjectEncoding(c
->argv
[2]); 
 702     de 
= dictFind(zs
->dict
,c
->argv
[2]); 
 704         addReply(c
,shared
.czero
); 
 707     /* Delete from the skiplist */ 
 708     curscore 
= *(double*)dictGetEntryVal(de
); 
 709     deleted 
= zslDelete(zs
->zsl
,curscore
,c
->argv
[2]); 
 710     redisAssert(deleted 
!= 0); 
 712     /* Delete from the hash table */ 
 713     dictDelete(zs
->dict
,c
->argv
[2]); 
 714     if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
); 
 715     if (dictSize(zs
->dict
) == 0) dbDelete(c
->db
,c
->argv
[1]); 
 716     signalModifiedKey(c
->db
,c
->argv
[1]); 
 718     addReply(c
,shared
.cone
); 
 721 void zremrangebyscoreCommand(redisClient 
*c
) { 
 727     /* Parse the range arguments. */ 
 728     if (zslParseRange(c
->argv
[2],c
->argv
[3],&range
) != REDIS_OK
) { 
 729         addReplyError(c
,"min or max is not a double"); 
 733     if ((o 
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.czero
)) == NULL 
|| 
 734         checkType(c
,o
,REDIS_ZSET
)) return; 
 737     deleted 
= zslDeleteRangeByScore(zs
->zsl
,range
,zs
->dict
); 
 738     if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
); 
 739     if (dictSize(zs
->dict
) == 0) dbDelete(c
->db
,c
->argv
[1]); 
 740     if (deleted
) signalModifiedKey(c
->db
,c
->argv
[1]); 
 741     server
.dirty 
+= deleted
; 
 742     addReplyLongLong(c
,deleted
); 
 745 void zremrangebyrankCommand(redisClient 
*c
) { 
 753     if ((getLongFromObjectOrReply(c
, c
->argv
[2], &start
, NULL
) != REDIS_OK
) || 
 754         (getLongFromObjectOrReply(c
, c
->argv
[3], &end
, NULL
) != REDIS_OK
)) return; 
 756     if ((zsetobj 
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.czero
)) == NULL 
|| 
 757         checkType(c
,zsetobj
,REDIS_ZSET
)) return; 
 759     llen 
= zs
->zsl
->length
; 
 761     /* convert negative indexes */ 
 762     if (start 
< 0) start 
= llen
+start
; 
 763     if (end 
< 0) end 
= llen
+end
; 
 764     if (start 
< 0) start 
= 0; 
 766     /* Invariant: start >= 0, so this test will be true when end < 0. 
 767      * The range is empty when start > end or start >= length. */ 
 768     if (start 
> end 
|| start 
>= llen
) { 
 769         addReply(c
,shared
.czero
); 
 772     if (end 
>= llen
) end 
= llen
-1; 
 774     /* increment start and end because zsl*Rank functions 
 775      * use 1-based rank */ 
 776     deleted 
= zslDeleteRangeByRank(zs
->zsl
,start
+1,end
+1,zs
->dict
); 
 777     if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
); 
 778     if (dictSize(zs
->dict
) == 0) dbDelete(c
->db
,c
->argv
[1]); 
 779     if (deleted
) signalModifiedKey(c
->db
,c
->argv
[1]); 
 780     server
.dirty 
+= deleted
; 
 781     addReplyLongLong(c
, deleted
); 
 789 int qsortCompareZsetopsrcByCardinality(const void *s1
, const void *s2
) { 
 790     zsetopsrc 
*d1 
= (void*) s1
, *d2 
= (void*) s2
; 
 791     unsigned long size1
, size2
; 
 792     size1 
= d1
->dict 
? dictSize(d1
->dict
) : 0; 
 793     size2 
= d2
->dict 
? dictSize(d2
->dict
) : 0; 
 794     return size1 
- size2
; 
 797 #define REDIS_AGGR_SUM 1 
 798 #define REDIS_AGGR_MIN 2 
 799 #define REDIS_AGGR_MAX 3 
 800 #define zunionInterDictValue(_e) (dictGetEntryVal(_e) == NULL ? 1.0 : *(double*)dictGetEntryVal(_e)) 
 802 inline static void zunionInterAggregate(double *target
, double val
, int aggregate
) { 
 803     if (aggregate 
== REDIS_AGGR_SUM
) { 
 804         *target 
= *target 
+ val
; 
 805         /* The result of adding two doubles is NaN when one variable 
 806          * is +inf and the other is -inf. When these numbers are added, 
 807          * we maintain the convention of the result being 0.0. */ 
 808         if (isnan(*target
)) *target 
= 0.0; 
 809     } else if (aggregate 
== REDIS_AGGR_MIN
) { 
 810         *target 
= val 
< *target 
? val 
: *target
; 
 811     } else if (aggregate 
== REDIS_AGGR_MAX
) { 
 812         *target 
= val 
> *target 
? val 
: *target
; 
 815         redisPanic("Unknown ZUNION/INTER aggregate type"); 
 819 void zunionInterGenericCommand(redisClient 
*c
, robj 
*dstkey
, int op
) { 
 821     int aggregate 
= REDIS_AGGR_SUM
; 
 825     zskiplistNode 
*znode
; 
 830     /* expect setnum input keys to be given */ 
 831     setnum 
= atoi(c
->argv
[2]->ptr
); 
 834             "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE"); 
 838     /* test if the expected number of keys would overflow */ 
 839     if (3+setnum 
> c
->argc
) { 
 840         addReply(c
,shared
.syntaxerr
); 
 844     /* read keys to be used for input */ 
 845     src 
= zmalloc(sizeof(zsetopsrc
) * setnum
); 
 846     for (i 
= 0, j 
= 3; i 
< setnum
; i
++, j
++) { 
 847         robj 
*obj 
= lookupKeyWrite(c
->db
,c
->argv
[j
]); 
 851             if (obj
->type 
== REDIS_ZSET
) { 
 852                 src
[i
].dict 
= ((zset
*)obj
->ptr
)->dict
; 
 853             } else if (obj
->type 
== REDIS_SET
) { 
 854                 src
[i
].dict 
= (obj
->ptr
); 
 857                 addReply(c
,shared
.wrongtypeerr
); 
 862         /* default all weights to 1 */ 
 866     /* parse optional extra arguments */ 
 868         int remaining 
= c
->argc 
- j
; 
 871             if (remaining 
>= (setnum 
+ 1) && !strcasecmp(c
->argv
[j
]->ptr
,"weights")) { 
 873                 for (i 
= 0; i 
< setnum
; i
++, j
++, remaining
--) { 
 874                     if (getDoubleFromObjectOrReply(c
,c
->argv
[j
],&src
[i
].weight
, 
 875                             "weight value is not a double") != REDIS_OK
) 
 881             } else if (remaining 
>= 2 && !strcasecmp(c
->argv
[j
]->ptr
,"aggregate")) { 
 883                 if (!strcasecmp(c
->argv
[j
]->ptr
,"sum")) { 
 884                     aggregate 
= REDIS_AGGR_SUM
; 
 885                 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"min")) { 
 886                     aggregate 
= REDIS_AGGR_MIN
; 
 887                 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"max")) { 
 888                     aggregate 
= REDIS_AGGR_MAX
; 
 891                     addReply(c
,shared
.syntaxerr
); 
 897                 addReply(c
,shared
.syntaxerr
); 
 903     /* sort sets from the smallest to largest, this will improve our 
 904      * algorithm's performance */ 
 905     qsort(src
,setnum
,sizeof(zsetopsrc
),qsortCompareZsetopsrcByCardinality
); 
 907     dstobj 
= createZsetObject(); 
 908     dstzset 
= dstobj
->ptr
; 
 910     if (op 
== REDIS_OP_INTER
) { 
 911         /* skip going over all entries if the smallest zset is NULL or empty */ 
 912         if (src
[0].dict 
&& dictSize(src
[0].dict
) > 0) { 
 913             /* precondition: as src[0].dict is non-empty and the zsets are ordered 
 914              * from small to large, all src[i > 0].dict are non-empty too */ 
 915             di 
= dictGetIterator(src
[0].dict
); 
 916             while((de 
= dictNext(di
)) != NULL
) { 
 919                 score 
= src
[0].weight 
* zunionInterDictValue(de
); 
 920                 for (j 
= 1; j 
< setnum
; j
++) { 
 921                     dictEntry 
*other 
= dictFind(src
[j
].dict
,dictGetEntryKey(de
)); 
 923                         value 
= src
[j
].weight 
* zunionInterDictValue(other
); 
 924                         zunionInterAggregate(&score
,value
,aggregate
); 
 930                 /* Only continue when present in every source dict. */ 
 932                     robj 
*o 
= dictGetEntryKey(de
); 
 933                     znode 
= zslInsert(dstzset
->zsl
,score
,o
); 
 934                     incrRefCount(o
); /* added to skiplist */ 
 935                     dictAdd(dstzset
->dict
,o
,&znode
->score
); 
 936                     incrRefCount(o
); /* added to dictionary */ 
 939             dictReleaseIterator(di
); 
 941     } else if (op 
== REDIS_OP_UNION
) { 
 942         for (i 
= 0; i 
< setnum
; i
++) { 
 943             if (!src
[i
].dict
) continue; 
 945             di 
= dictGetIterator(src
[i
].dict
); 
 946             while((de 
= dictNext(di
)) != NULL
) { 
 949                 /* skip key when already processed */ 
 950                 if (dictFind(dstzset
->dict
,dictGetEntryKey(de
)) != NULL
) 
 953                 /* initialize score */ 
 954                 score 
= src
[i
].weight 
* zunionInterDictValue(de
); 
 956                 /* because the zsets are sorted by size, it's only possible 
 957                  * for sets at larger indices to hold this entry */ 
 958                 for (j 
= (i
+1); j 
< setnum
; j
++) { 
 959                     dictEntry 
*other 
= dictFind(src
[j
].dict
,dictGetEntryKey(de
)); 
 961                         value 
= src
[j
].weight 
* zunionInterDictValue(other
); 
 962                         zunionInterAggregate(&score
,value
,aggregate
); 
 966                 robj 
*o 
= dictGetEntryKey(de
); 
 967                 znode 
= zslInsert(dstzset
->zsl
,score
,o
); 
 968                 incrRefCount(o
); /* added to skiplist */ 
 969                 dictAdd(dstzset
->dict
,o
,&znode
->score
); 
 970                 incrRefCount(o
); /* added to dictionary */ 
 972             dictReleaseIterator(di
); 
 975         /* unknown operator */ 
 976         redisAssert(op 
== REDIS_OP_INTER 
|| op 
== REDIS_OP_UNION
); 
 979     if (dbDelete(c
->db
,dstkey
)) { 
 980         signalModifiedKey(c
->db
,dstkey
); 
 984     if (dstzset
->zsl
->length
) { 
 985         dbAdd(c
->db
,dstkey
,dstobj
); 
 986         addReplyLongLong(c
, dstzset
->zsl
->length
); 
 987         if (!touched
) signalModifiedKey(c
->db
,dstkey
); 
 990         decrRefCount(dstobj
); 
 991         addReply(c
, shared
.czero
); 
 996 void zunionstoreCommand(redisClient 
*c
) { 
 997     zunionInterGenericCommand(c
,c
->argv
[1], REDIS_OP_UNION
); 
1000 void zinterstoreCommand(redisClient 
*c
) { 
1001     zunionInterGenericCommand(c
,c
->argv
[1], REDIS_OP_INTER
); 
/* Shared implementation of the index-based range replies: zrangeCommand
 * calls it with reverse=0 and zrevrangeCommand with reverse=1.
 *
 * argv[2] and argv[3] are parsed as the start/end indexes (negative values
 * are offset by the list length), and argv[4] may be "withscores"
 * (case-insensitive); any other form with five or more arguments gets a
 * syntax error reply. The reply is a multi-bulk with one entry per element
 * in [start,end], or twice as many entries when withscores is set. An empty
 * range replies with shared.emptymultibulk, which is also the reply handed
 * to the key lookup. */
1004 void zrangeGenericCommand(redisClient 
*c
, int reverse
) { 
1016     if ((getLongFromObjectOrReply(c
, c
->argv
[2], &start
, NULL
) != REDIS_OK
) || 
1017         (getLongFromObjectOrReply(c
, c
->argv
[3], &end
, NULL
) != REDIS_OK
)) return; 
    /* Exactly five arguments with a trailing "withscores" is the only
     * accepted extended form; anything else with argc >= 5 is rejected. */
1019     if (c
->argc 
== 5 && !strcasecmp(c
->argv
[4]->ptr
,"withscores")) { 
1021     } else if (c
->argc 
>= 5) { 
1022         addReply(c
,shared
.syntaxerr
); 
1026     if ((o 
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.emptymultibulk
)) == NULL
 
1027          || checkType(c
,o
,REDIS_ZSET
)) return; 
1032     /* convert negative indexes */ 
1033     if (start 
< 0) start 
= llen
+start
; 
1034     if (end 
< 0) end 
= llen
+end
; 
1035     if (start 
< 0) start 
= 0; 
1037     /* Invariant: start >= 0, so this test will be true when end < 0. 
1038      * The range is empty when start > end or start >= length. */ 
1039     if (start 
> end 
|| start 
>= llen
) { 
1040         addReply(c
,shared
.emptymultibulk
); 
    /* Clamp the end index to the last element before sizing the reply. */
1043     if (end 
>= llen
) end 
= llen
-1; 
1044     rangelen 
= (end
-start
)+1; 
1046     /* check if starting point is trivial, before searching 
1047      * the element in log(N) time */ 
    /* NOTE(review): the forward case below asks for rank start+1, so the
     * ranks given to zslGetElementByRank() appear to be 1-based; counted
     * from the tail, 0-based index 'start' is then rank llen-start. */
1049         ln 
= start 
== 0 ? zsl
->tail 
: zslGetElementByRank(zsl
, llen
-start
); 
1052             zsl
->header
->level
[0].forward 
: zslGetElementByRank(zsl
, start
+1); 
1055     /* Return the result in form of a multi-bulk reply */ 
1056     addReplyMultiBulkLen(c
,withscores 
? (rangelen
*2) : rangelen
); 
1057     for (j 
= 0; j 
< rangelen
; j
++) { 
1059         addReplyBulk(c
,ele
); 
1061             addReplyDouble(c
,ln
->score
); 
        /* Step in the iteration direction: the backward pointer when
         * reversed, the level-0 forward pointer otherwise. */
1062         ln 
= reverse 
? ln
->backward 
: ln
->level
[0].forward
; 
/* Forward-order wrapper: calls zrangeGenericCommand() with reverse=0. */
1066 void zrangeCommand(redisClient 
*c
) { 
1067     zrangeGenericCommand(c
,0); 
/* Reverse-order wrapper: calls zrangeGenericCommand() with reverse=1. */
1070 void zrevrangeCommand(redisClient 
*c
) { 
1071     zrangeGenericCommand(c
,1); 
/* Score-range iteration shared by three commands.
 *
 * Parameters:
 *   c         - the client; argv[1] is the key, argv[2]/argv[3] the range
 *               bounds, and any further arguments are the optional
 *               "withscores" and "limit <offset> <count>" clauses.
 *   reverse   - when non-zero the bounds are given as [max,min] and the list
 *               is walked through the backward pointers.
 *   justcount - see below.
 *
 * Replies with an error when the bounds cannot be parsed by zslParseRange(),
 * and with a syntax error for unrecognized extra arguments. */
1074 /* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE and ZCOUNT. 
1075  * If "justcount", only the number of elements in the range is returned. */ 
1076 void genericZrangebyscoreCommand(redisClient 
*c
, int reverse
, int justcount
) { 
1078     robj 
*o
, *emptyreply
; 
    /* limit starts at -1: see the note on the emission loop below. */
1082     int offset 
= 0, limit 
= -1; 
1084     unsigned long rangelen 
= 0; 
1085     void *replylen 
= NULL
; 
1088     /* Parse the range arguments. */ 
1090         /* Range is given as [max,min] */ 
1091         maxidx 
= 2; minidx 
= 3; 
1093         /* Range is given as [min,max] */ 
1094         minidx 
= 2; maxidx 
= 3; 
1097     if (zslParseRange(c
->argv
[minidx
],c
->argv
[maxidx
],&range
) != REDIS_OK
) { 
1098         addReplyError(c
,"min or max is not a double"); 
1102     /* Parse optional extra arguments. Note that ZCOUNT will exactly have 
1103      * 4 arguments, so we'll never enter the following code path. */ 
1105         int remaining 
= c
->argc 
- 4; 
1109             if (remaining 
>= 1 && !strcasecmp(c
->argv
[pos
]->ptr
,"withscores")) { 
1112             } else if (remaining 
>= 3 && !strcasecmp(c
->argv
[pos
]->ptr
,"limit")) { 
                /* NOTE(review): atoi() does not report conversion errors, so
                 * a non-numeric offset or count is silently read as 0
                 * instead of producing an error reply. */
1113                 offset 
= atoi(c
->argv
[pos
+1]->ptr
); 
1114                 limit 
= atoi(c
->argv
[pos
+2]->ptr
); 
1115                 pos 
+= 3; remaining 
-= 3; 
1117                 addReply(c
,shared
.syntaxerr
); 
1123     /* Ok, lookup the key and get the range */ 
    /* The "nothing found" reply depends on the mode: integer zero when only
     * counting, an empty multi-bulk otherwise. */
1124     emptyreply 
= justcount 
? shared
.czero 
: shared
.emptymultibulk
; 
1125     if ((o 
= lookupKeyReadOrReply(c
,c
->argv
[1],emptyreply
)) == NULL 
|| 
1126         checkType(c
,o
,REDIS_ZSET
)) return; 
1130     /* If reversed, get the last node in range as starting point. */ 
1132         ln 
= zslLastInRange(zsl
,range
); 
1134         ln 
= zslFirstInRange(zsl
,range
); 
1137     /* No "first" element in the specified interval. */ 
1139         addReply(c
,emptyreply
); 
1143     /* We don't know in advance how many matching elements there are in the 
1144      * list, so we push this object that will represent the multi-bulk length 
1145      * in the output buffer, and will "fix" it later */ 
1147         replylen 
= addDeferredMultiBulkLength(c
); 
1149     /* If there is an offset, just traverse the number of elements without 
1150      * checking the score because that is done in the next loop. */ 
    /* NOTE(review): a negative offset never counts down to 0 here, so this
     * loop would run until ln becomes NULL. */
1151     while(ln 
&& offset
--) { 
1152         ln 
= reverse 
? ln
->backward 
: ln
->level
[0].forward
; 
    /* With the default limit of -1 the post-decrement test stays non-zero,
     * so without a LIMIT clause the loop is bounded only by the range checks
     * below and by the end of the list. */
1155     while (ln 
&& limit
--) { 
1156         /* Abort when the node is no longer in range. */ 
1158             if (!zslValueGteMin(ln
->score
,&range
)) break; 
1160             if (!zslValueLteMax(ln
->score
,&range
)) break; 
1166             addReplyBulk(c
,ln
->obj
); 
1168                 addReplyDouble(c
,ln
->score
); 
1171         /* Move to next node */ 
1172         ln 
= reverse 
? ln
->backward 
: ln
->level
[0].forward
; 
    /* rangelen is either sent as the integer reply or used to fix up the
     * deferred multi-bulk length (doubled when scores were emitted too). */
1176         addReplyLongLong(c
,(long)rangelen
); 
1178         setDeferredMultiBulkLength(c
,replylen
, 
1179              withscores 
? (rangelen
*2) : rangelen
); 
/* Forward score-range wrapper: genericZrangebyscoreCommand() with
 * reverse=0 and justcount=0. */
1183 void zrangebyscoreCommand(redisClient 
*c
) { 
1184     genericZrangebyscoreCommand(c
,0,0); 
/* Reverse score-range wrapper: genericZrangebyscoreCommand() with
 * reverse=1 and justcount=0. */
1187 void zrevrangebyscoreCommand(redisClient 
*c
) { 
1188     genericZrangebyscoreCommand(c
,1,0); 
/* Count-only wrapper: genericZrangebyscoreCommand() with reverse=0 and
 * justcount=1, so only the number of elements in the range is replied. */
1191 void zcountCommand(redisClient 
*c
) { 
1192     genericZrangebyscoreCommand(c
,0,1); 
/* Replies with the skiplist length (zs->zsl->length) of the sorted set
 * stored at argv[1]. shared.czero is handed to the key lookup as its
 * fallback reply, and the function returns early when the lookup yields
 * nothing or the REDIS_ZSET type check fails. */
1195 void zcardCommand(redisClient 
*c
) { 
1199     if ((o 
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.czero
)) == NULL 
|| 
1200         checkType(c
,o
,REDIS_ZSET
)) return; 
1203     addReplyLongLong(c
,zs
->zsl
->length
); 
/* Replies with the score of member argv[2] in the sorted set at argv[1].
 * The member is looked up in the set's dict, whose entry value is a pointer
 * to the score (a double). shared.nullbulk is the reply used both as the key
 * lookup fallback and on the no-score path.
 * NOTE(review): the condition guarding the nullbulk branch is not visible
 * in this view; presumably it is taken when dictFind() finds no entry. */
1206 void zscoreCommand(redisClient 
*c
) { 
1211     if ((o 
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.nullbulk
)) == NULL 
|| 
1212         checkType(c
,o
,REDIS_ZSET
)) return; 
    /* NOTE(review): argv[2] is re-encoded before the lookup, presumably so
     * that it matches the encoding of the keys stored in the dict --
     * confirm. */
1215     c
->argv
[2] = tryObjectEncoding(c
->argv
[2]); 
1216     de 
= dictFind(zs
->dict
,c
->argv
[2]); 
1218         addReply(c
,shared
.nullbulk
); 
1220         double *score 
= dictGetEntryVal(de
); 
1222         addReplyDouble(c
,*score
); 
/* Shared implementation of the rank replies: zrankCommand calls it with
 * reverse=0 and zrevrankCommand with reverse=1.
 *
 * The member argv[2] is looked up in the set's dict to obtain its score,
 * then zslGetRank() is asked for the rank of that (score, member) pair.
 * The integer reply is either rank-1 or zsl->length-rank; shared.nullbulk
 * is used for the key lookup fallback and for the other null replies.
 * NOTE(review): the branch conditions are not visible in this view;
 * presumably 'reverse' selects the length-rank form, and nullbulk is sent
 * when the member is missing or no rank is found. */
1226 void zrankGenericCommand(redisClient 
*c
, int reverse
) { 
1234     if ((o 
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.nullbulk
)) == NULL 
|| 
1235         checkType(c
,o
,REDIS_ZSET
)) return; 
1239     c
->argv
[2] = tryObjectEncoding(c
->argv
[2]); 
1240     de 
= dictFind(zs
->dict
,c
->argv
[2]); 
1242         addReply(c
,shared
.nullbulk
); 
    /* The dict entry value is a pointer to the member's score. */
1246     score 
= dictGetEntryVal(de
); 
1247     rank 
= zslGetRank(zsl
, *score
, c
->argv
[2]); 
    /* NOTE(review): both replies turn the rank into a 0-based position,
     * which suggests zslGetRank() returns a 1-based rank -- confirm. */
1250             addReplyLongLong(c
, zsl
->length 
- rank
); 
1252             addReplyLongLong(c
, rank
-1); 
1255         addReply(c
,shared
.nullbulk
); 
/* Forward rank wrapper: calls zrankGenericCommand() with reverse=0. */
1259 void zrankCommand(redisClient 
*c
) { 
1260     zrankGenericCommand(c
, 0); 
/* Reverse rank wrapper: calls zrankGenericCommand() with reverse=1. */
1263 void zrevrankCommand(redisClient 
*c
) { 
1264     zrankGenericCommand(c
, 1);