5 /*----------------------------------------------------------------------------- 
   7  *----------------------------------------------------------------------------*/ 
   9 /* ZSETs are ordered sets using two data structures to hold the same elements 
  10  * in order to get O(log(N)) INSERT and REMOVE operations into a sorted 
  13  * The elements are added to a hash table mapping Redis objects to scores. 
  14  * At the same time the elements are added to a skip list mapping scores 
  15  * to Redis objects (so objects are sorted by scores in this "view"). */ 
  17 /* This skiplist implementation is almost a C translation of the original 
  18  * algorithm described by William Pugh in "Skip Lists: A Probabilistic 
  19  * Alternative to Balanced Trees", modified in three ways: 
  20  * a) this implementation allows for repeated values. 
  21  * b) the comparison is not just by key (our 'score') but by satellite data. 
  22  * c) there is a back pointer, so it's a doubly linked list with the back 
  23  * pointers being only at "level 1". This allows to traverse the list 
  24  * from tail to head, useful for ZREVRANGE. */ 
  26 zskiplistNode 
*zslCreateNode(int level
, double score
, robj 
*obj
) { 
  27     zskiplistNode 
*zn 
= zmalloc(sizeof(*zn
)+level
*sizeof(struct zskiplistLevel
)); 
  33 zskiplist 
*zslCreate(void) { 
  37     zsl 
= zmalloc(sizeof(*zsl
)); 
  40     zsl
->header 
= zslCreateNode(ZSKIPLIST_MAXLEVEL
,0,NULL
); 
  41     for (j 
= 0; j 
< ZSKIPLIST_MAXLEVEL
; j
++) { 
  42         zsl
->header
->level
[j
].forward 
= NULL
; 
  43         zsl
->header
->level
[j
].span 
= 0; 
  45     zsl
->header
->backward 
= NULL
; 
  50 void zslFreeNode(zskiplistNode 
*node
) { 
  51     decrRefCount(node
->obj
); 
  55 void zslFree(zskiplist 
*zsl
) { 
  56     zskiplistNode 
*node 
= zsl
->header
->level
[0].forward
, *next
; 
  60         next 
= node
->level
[0].forward
; 
  67 int zslRandomLevel(void) { 
  69     while ((random()&0xFFFF) < (ZSKIPLIST_P 
* 0xFFFF)) 
  71     return (level
<ZSKIPLIST_MAXLEVEL
) ? level 
: ZSKIPLIST_MAXLEVEL
; 
  74 zskiplistNode 
*zslInsert(zskiplist 
*zsl
, double score
, robj 
*obj
) { 
  75     zskiplistNode 
*update
[ZSKIPLIST_MAXLEVEL
], *x
; 
  76     unsigned int rank
[ZSKIPLIST_MAXLEVEL
]; 
  80     for (i 
= zsl
->level
-1; i 
>= 0; i
--) { 
  81         /* store rank that is crossed to reach the insert position */ 
  82         rank
[i
] = i 
== (zsl
->level
-1) ? 0 : rank
[i
+1]; 
  83         while (x
->level
[i
].forward 
&& 
  84             (x
->level
[i
].forward
->score 
< score 
|| 
  85                 (x
->level
[i
].forward
->score 
== score 
&& 
  86                 compareStringObjects(x
->level
[i
].forward
->obj
,obj
) < 0))) { 
  87             rank
[i
] += x
->level
[i
].span
; 
  88             x 
= x
->level
[i
].forward
; 
  92     /* we assume the key is not already inside, since we allow duplicated 
  93      * scores, and the re-insertion of score and redis object should never 
  94      * happen since the caller of zslInsert() should test in the hash table 
  95      * if the element is already inside or not. */ 
  96     level 
= zslRandomLevel(); 
  97     if (level 
> zsl
->level
) { 
  98         for (i 
= zsl
->level
; i 
< level
; i
++) { 
 100             update
[i
] = zsl
->header
; 
 101             update
[i
]->level
[i
].span 
= zsl
->length
; 
 105     x 
= zslCreateNode(level
,score
,obj
); 
 106     for (i 
= 0; i 
< level
; i
++) { 
 107         x
->level
[i
].forward 
= update
[i
]->level
[i
].forward
; 
 108         update
[i
]->level
[i
].forward 
= x
; 
 110         /* update span covered by update[i] as x is inserted here */ 
 111         x
->level
[i
].span 
= update
[i
]->level
[i
].span 
- (rank
[0] - rank
[i
]); 
 112         update
[i
]->level
[i
].span 
= (rank
[0] - rank
[i
]) + 1; 
 115     /* increment span for untouched levels */ 
 116     for (i 
= level
; i 
< zsl
->level
; i
++) { 
 117         update
[i
]->level
[i
].span
++; 
 120     x
->backward 
= (update
[0] == zsl
->header
) ? NULL 
: update
[0]; 
 121     if (x
->level
[0].forward
) 
 122         x
->level
[0].forward
->backward 
= x
; 
 129 /* Internal function used by zslDelete, zslDeleteRangeByScore and zslDeleteRangeByRank */ 
 130 void zslDeleteNode(zskiplist 
*zsl
, zskiplistNode 
*x
, zskiplistNode 
**update
) { 
 132     for (i 
= 0; i 
< zsl
->level
; i
++) { 
 133         if (update
[i
]->level
[i
].forward 
== x
) { 
 134             update
[i
]->level
[i
].span 
+= x
->level
[i
].span 
- 1; 
 135             update
[i
]->level
[i
].forward 
= x
->level
[i
].forward
; 
 137             update
[i
]->level
[i
].span 
-= 1; 
 140     if (x
->level
[0].forward
) { 
 141         x
->level
[0].forward
->backward 
= x
->backward
; 
 143         zsl
->tail 
= x
->backward
; 
 145     while(zsl
->level 
> 1 && zsl
->header
->level
[zsl
->level
-1].forward 
== NULL
) 
 150 /* Delete an element with matching score/object from the skiplist. */ 
 151 int zslDelete(zskiplist 
*zsl
, double score
, robj 
*obj
) { 
 152     zskiplistNode 
*update
[ZSKIPLIST_MAXLEVEL
], *x
; 
 156     for (i 
= zsl
->level
-1; i 
>= 0; i
--) { 
 157         while (x
->level
[i
].forward 
&& 
 158             (x
->level
[i
].forward
->score 
< score 
|| 
 159                 (x
->level
[i
].forward
->score 
== score 
&& 
 160                 compareStringObjects(x
->level
[i
].forward
->obj
,obj
) < 0))) 
 161             x 
= x
->level
[i
].forward
; 
 164     /* We may have multiple elements with the same score, what we need 
 165      * is to find the element with both the right score and object. */ 
 166     x 
= x
->level
[0].forward
; 
 167     if (x 
&& score 
== x
->score 
&& equalStringObjects(x
->obj
,obj
)) { 
 168         zslDeleteNode(zsl
, x
, update
); 
 172         return 0; /* not found */ 
 174     return 0; /* not found */ 
 177 /* Struct to hold an inclusive/exclusive range spec. */ 
 180     int minex
, maxex
; /* are min or max exclusive? */ 
 183 /* Delete all the elements with score between min and max from the skiplist. 
 184  * Min and max are inclusive unless range.minex/range.maxex is set, so a score >= min && score <= max is deleted. 
 185  * Note that this function takes the reference to the hash table view of the 
 186  * sorted set, in order to remove the elements from the hash table too. */ 
 187 unsigned long zslDeleteRangeByScore(zskiplist 
*zsl
, zrangespec range
, dict 
*dict
) { 
 188     zskiplistNode 
*update
[ZSKIPLIST_MAXLEVEL
], *x
; 
 189     unsigned long removed 
= 0; 
 193     for (i 
= zsl
->level
-1; i 
>= 0; i
--) { 
 194         while (x
->level
[i
].forward 
&& (range
.minex 
? 
 195             x
->level
[i
].forward
->score 
<= range
.min 
: 
 196             x
->level
[i
].forward
->score 
< range
.min
)) 
 197                 x 
= x
->level
[i
].forward
; 
 201     /* Current node is the last with score < or <= min. */ 
 202     x 
= x
->level
[0].forward
; 
 204     /* Delete nodes while in range. */ 
 205     while (x 
&& (range
.maxex 
? x
->score 
< range
.max 
: x
->score 
<= range
.max
)) { 
 206         zskiplistNode 
*next 
= x
->level
[0].forward
; 
 207         zslDeleteNode(zsl
,x
,update
); 
 208         dictDelete(dict
,x
->obj
); 
 216 /* Delete all the elements with rank between start and end from the skiplist. 
 217  * Start and end are inclusive. Note that start and end need to be 1-based */ 
 218 unsigned long zslDeleteRangeByRank(zskiplist 
*zsl
, unsigned int start
, unsigned int end
, dict 
*dict
) { 
 219     zskiplistNode 
*update
[ZSKIPLIST_MAXLEVEL
], *x
; 
 220     unsigned long traversed 
= 0, removed 
= 0; 
 224     for (i 
= zsl
->level
-1; i 
>= 0; i
--) { 
 225         while (x
->level
[i
].forward 
&& (traversed 
+ x
->level
[i
].span
) < start
) { 
 226             traversed 
+= x
->level
[i
].span
; 
 227             x 
= x
->level
[i
].forward
; 
 233     x 
= x
->level
[0].forward
; 
 234     while (x 
&& traversed 
<= end
) { 
 235         zskiplistNode 
*next 
= x
->level
[0].forward
; 
 236         zslDeleteNode(zsl
,x
,update
); 
 237         dictDelete(dict
,x
->obj
); 
 246 /* Find the first node having a score equal or greater than the specified one. 
 247  * Returns NULL if there is no match. */ 
 248 zskiplistNode 
*zslFirstWithScore(zskiplist 
*zsl
, double score
) { 
 253     for (i 
= zsl
->level
-1; i 
>= 0; i
--) { 
 254         while (x
->level
[i
].forward 
&& x
->level
[i
].forward
->score 
< score
) 
 255             x 
= x
->level
[i
].forward
; 
 257     /* The loop stopped just before the first node whose score is >= 'score', 
 258      * so the level-0 successor of x is that node, or NULL when none exists. */ 
 259     return x
->level
[0].forward
; 
 262 /* Find the rank for an element by both score and key. 
 263  * Returns 0 when the element cannot be found, rank otherwise. 
 264  * Note that the rank is 1-based due to the span of zsl->header to the 
 266 unsigned long zslistTypeGetRank(zskiplist 
*zsl
, double score
, robj 
*o
) { 
 268     unsigned long rank 
= 0; 
 272     for (i 
= zsl
->level
-1; i 
>= 0; i
--) { 
 273         while (x
->level
[i
].forward 
&& 
 274             (x
->level
[i
].forward
->score 
< score 
|| 
 275                 (x
->level
[i
].forward
->score 
== score 
&& 
 276                 compareStringObjects(x
->level
[i
].forward
->obj
,o
) <= 0))) { 
 277             rank 
+= x
->level
[i
].span
; 
 278             x 
= x
->level
[i
].forward
; 
 281         /* x might be equal to zsl->header, so test if obj is non-NULL */ 
 282         if (x
->obj 
&& equalStringObjects(x
->obj
,o
)) { 
 289 /* Finds an element by its rank. The rank argument needs to be 1-based. */ 
 290 zskiplistNode
* zslistTypeGetElementByRank(zskiplist 
*zsl
, unsigned long rank
) { 
 292     unsigned long traversed 
= 0; 
 296     for (i 
= zsl
->level
-1; i 
>= 0; i
--) { 
 297         while (x
->level
[i
].forward 
&& (traversed 
+ x
->level
[i
].span
) <= rank
) 
 299             traversed 
+= x
->level
[i
].span
; 
 300             x 
= x
->level
[i
].forward
; 
 302         if (traversed 
== rank
) { 
 309 /* Populate the rangespec according to the objects min and max. */ 
 310 static int zslParseRange(robj 
*min
, robj 
*max
, zrangespec 
*spec
) { 
 312     spec
->minex 
= spec
->maxex 
= 0; 
 314     /* Parse the min-max interval. If one of the values is prefixed 
 315      * by the "(" character, it's considered "open". For instance 
 316      * ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max 
 317      * ZRANGEBYSCORE zset 1.5 2.5 will instead match min <= x <= max */ 
 318     if (min
->encoding 
== REDIS_ENCODING_INT
) { 
 319         spec
->min 
= (long)min
->ptr
; 
 321         if (((char*)min
->ptr
)[0] == '(') { 
 322             spec
->min 
= strtod((char*)min
->ptr
+1,&eptr
); 
 323             if (eptr
[0] != '\0' || isnan(spec
->min
)) return REDIS_ERR
; 
 326             spec
->min 
= strtod((char*)min
->ptr
,&eptr
); 
 327             if (eptr
[0] != '\0' || isnan(spec
->min
)) return REDIS_ERR
; 
 330     if (max
->encoding 
== REDIS_ENCODING_INT
) { 
 331         spec
->max 
= (long)max
->ptr
; 
 333         if (((char*)max
->ptr
)[0] == '(') { 
 334             spec
->max 
= strtod((char*)max
->ptr
+1,&eptr
); 
 335             if (eptr
[0] != '\0' || isnan(spec
->max
)) return REDIS_ERR
; 
 338             spec
->max 
= strtod((char*)max
->ptr
,&eptr
); 
 339             if (eptr
[0] != '\0' || isnan(spec
->max
)) return REDIS_ERR
; 
 347 /*----------------------------------------------------------------------------- 
 348  * Sorted set commands  
 349  *----------------------------------------------------------------------------*/ 
 351 /* This generic command implements both ZADD and ZINCRBY. */ 
 352 void zaddGenericCommand(redisClient 
*c
, robj 
*key
, robj 
*ele
, double score
, int incr
) { 
 355     zskiplistNode 
*znode
; 
 357     zsetobj 
= lookupKeyWrite(c
->db
,key
); 
 358     if (zsetobj 
== NULL
) { 
 359         zsetobj 
= createZsetObject(); 
 360         dbAdd(c
->db
,key
,zsetobj
); 
 362         if (zsetobj
->type 
!= REDIS_ZSET
) { 
 363             addReply(c
,shared
.wrongtypeerr
); 
 369     /* Since both ZADD and ZINCRBY are implemented here, we need to increment 
 370      * the score first by the current score if ZINCRBY is called. */ 
 372         /* Read the old score. If the element was not present starts from 0 */ 
 373         dictEntry 
*de 
= dictFind(zs
->dict
,ele
); 
 375             score 
+= *(double*)dictGetEntryVal(de
); 
 378             addReplyError(c
,"resulting score is not a number (NaN)"); 
 379             /* Note that we don't need to check if the zset may be empty and 
 380              * should be removed here, as we can only obtain NaN as score if 
 381              * there was already an element in the sorted set. */ 
 386     /* We need to remove and re-insert the element when it was already present 
 387      * in the dictionary, to update the skiplist. Note that we delay adding a 
 388      * pointer to the score because we want to reference the score in the 
 390     if (dictAdd(zs
->dict
,ele
,NULL
) == DICT_OK
) { 
 394         incrRefCount(ele
); /* added to hash */ 
 395         znode 
= zslInsert(zs
->zsl
,score
,ele
); 
 396         incrRefCount(ele
); /* added to skiplist */ 
 398         /* Update the score in the dict entry */ 
 399         de 
= dictFind(zs
->dict
,ele
); 
 400         redisAssert(de 
!= NULL
); 
 401         dictGetEntryVal(de
) = &znode
->score
; 
 402         signalModifiedKey(c
->db
,c
->argv
[1]); 
 405             addReplyDouble(c
,score
); 
 407             addReply(c
,shared
.cone
); 
 415         de 
= dictFind(zs
->dict
,ele
); 
 416         redisAssert(de 
!= NULL
); 
 417         curobj 
= dictGetEntryKey(de
); 
 418         curscore 
= dictGetEntryVal(de
); 
 420         /* When the score is updated, reuse the existing string object to 
 421          * prevent extra alloc/dealloc of strings on ZINCRBY. */ 
 422         if (score 
!= *curscore
) { 
 423             deleted 
= zslDelete(zs
->zsl
,*curscore
,curobj
); 
 424             redisAssert(deleted 
!= 0); 
 425             znode 
= zslInsert(zs
->zsl
,score
,curobj
); 
 426             incrRefCount(curobj
); 
 428             /* Update the score in the current dict entry */ 
 429             dictGetEntryVal(de
) = &znode
->score
; 
 430             signalModifiedKey(c
->db
,c
->argv
[1]); 
 434             addReplyDouble(c
,score
); 
 436             addReply(c
,shared
.czero
); 
 440 void zaddCommand(redisClient 
*c
) { 
 442     if (getDoubleFromObjectOrReply(c
,c
->argv
[2],&scoreval
,NULL
) != REDIS_OK
) return; 
 443     c
->argv
[3] = tryObjectEncoding(c
->argv
[3]); 
 444     zaddGenericCommand(c
,c
->argv
[1],c
->argv
[3],scoreval
,0); 
 447 void zincrbyCommand(redisClient 
*c
) { 
 449     if (getDoubleFromObjectOrReply(c
,c
->argv
[2],&scoreval
,NULL
) != REDIS_OK
) return; 
 450     c
->argv
[3] = tryObjectEncoding(c
->argv
[3]); 
 451     zaddGenericCommand(c
,c
->argv
[1],c
->argv
[3],scoreval
,1); 
 454 void zremCommand(redisClient 
*c
) { 
 461     if ((zsetobj 
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.czero
)) == NULL 
|| 
 462         checkType(c
,zsetobj
,REDIS_ZSET
)) return; 
 465     c
->argv
[2] = tryObjectEncoding(c
->argv
[2]); 
 466     de 
= dictFind(zs
->dict
,c
->argv
[2]); 
 468         addReply(c
,shared
.czero
); 
 471     /* Delete from the skiplist */ 
 472     curscore 
= *(double*)dictGetEntryVal(de
); 
 473     deleted 
= zslDelete(zs
->zsl
,curscore
,c
->argv
[2]); 
 474     redisAssert(deleted 
!= 0); 
 476     /* Delete from the hash table */ 
 477     dictDelete(zs
->dict
,c
->argv
[2]); 
 478     if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
); 
 479     if (dictSize(zs
->dict
) == 0) dbDelete(c
->db
,c
->argv
[1]); 
 480     signalModifiedKey(c
->db
,c
->argv
[1]); 
 482     addReply(c
,shared
.cone
); 
 485 void zremrangebyscoreCommand(redisClient 
*c
) { 
 491     /* Parse the range arguments. */ 
 492     if (zslParseRange(c
->argv
[2],c
->argv
[3],&range
) != REDIS_OK
) { 
 493         addReplyError(c
,"min or max is not a double"); 
 497     if ((o 
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.czero
)) == NULL 
|| 
 498         checkType(c
,o
,REDIS_ZSET
)) return; 
 501     deleted 
= zslDeleteRangeByScore(zs
->zsl
,range
,zs
->dict
); 
 502     if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
); 
 503     if (dictSize(zs
->dict
) == 0) dbDelete(c
->db
,c
->argv
[1]); 
 504     if (deleted
) signalModifiedKey(c
->db
,c
->argv
[1]); 
 505     server
.dirty 
+= deleted
; 
 506     addReplyLongLong(c
,deleted
); 
 509 void zremrangebyrankCommand(redisClient 
*c
) { 
 517     if ((getLongFromObjectOrReply(c
, c
->argv
[2], &start
, NULL
) != REDIS_OK
) || 
 518         (getLongFromObjectOrReply(c
, c
->argv
[3], &end
, NULL
) != REDIS_OK
)) return; 
 520     if ((zsetobj 
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.czero
)) == NULL 
|| 
 521         checkType(c
,zsetobj
,REDIS_ZSET
)) return; 
 523     llen 
= zs
->zsl
->length
; 
 525     /* convert negative indexes */ 
 526     if (start 
< 0) start 
= llen
+start
; 
 527     if (end 
< 0) end 
= llen
+end
; 
 528     if (start 
< 0) start 
= 0; 
 530     /* Invariant: start >= 0, so this test will be true when end < 0. 
 531      * The range is empty when start > end or start >= length. */ 
 532     if (start 
> end 
|| start 
>= llen
) { 
 533         addReply(c
,shared
.czero
); 
 536     if (end 
>= llen
) end 
= llen
-1; 
 538     /* increment start and end because zsl*Rank functions 
 539      * use 1-based rank */ 
 540     deleted 
= zslDeleteRangeByRank(zs
->zsl
,start
+1,end
+1,zs
->dict
); 
 541     if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
); 
 542     if (dictSize(zs
->dict
) == 0) dbDelete(c
->db
,c
->argv
[1]); 
 543     if (deleted
) signalModifiedKey(c
->db
,c
->argv
[1]); 
 544     server
.dirty 
+= deleted
; 
 545     addReplyLongLong(c
, deleted
); 
 553 int qsortCompareZsetopsrcByCardinality(const void *s1
, const void *s2
) { 
 554     zsetopsrc 
*d1 
= (void*) s1
, *d2 
= (void*) s2
; 
 555     unsigned long size1
, size2
; 
 556     size1 
= d1
->dict 
? dictSize(d1
->dict
) : 0; 
 557     size2 
= d2
->dict 
? dictSize(d2
->dict
) : 0; 
 558     return size1 
- size2
; 
 561 #define REDIS_AGGR_SUM 1 
 562 #define REDIS_AGGR_MIN 2 
 563 #define REDIS_AGGR_MAX 3 
 564 #define zunionInterDictValue(_e) (dictGetEntryVal(_e) == NULL ? 1.0 : *(double*)dictGetEntryVal(_e)) 
 566 inline static void zunionInterAggregate(double *target
, double val
, int aggregate
) { 
 567     if (aggregate 
== REDIS_AGGR_SUM
) { 
 568         *target 
= *target 
+ val
; 
 569         /* The result of adding two doubles is NaN when one variable 
 570          * is +inf and the other is -inf. When these numbers are added, 
 571          * we maintain the convention of the result being 0.0. */ 
 572         if (isnan(*target
)) *target 
= 0.0; 
 573     } else if (aggregate 
== REDIS_AGGR_MIN
) { 
 574         *target 
= val 
< *target 
? val 
: *target
; 
 575     } else if (aggregate 
== REDIS_AGGR_MAX
) { 
 576         *target 
= val 
> *target 
? val 
: *target
; 
 579         redisPanic("Unknown ZUNION/INTER aggregate type"); 
 583 void zunionInterGenericCommand(redisClient 
*c
, robj 
*dstkey
, int op
) { 
 585     int aggregate 
= REDIS_AGGR_SUM
; 
 589     zskiplistNode 
*znode
; 
 594     /* expect setnum input keys to be given */ 
 595     setnum 
= atoi(c
->argv
[2]->ptr
); 
 598             "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE"); 
 602     /* test if the expected number of keys would overflow */ 
 603     if (3+setnum 
> c
->argc
) { 
 604         addReply(c
,shared
.syntaxerr
); 
 608     /* read keys to be used for input */ 
 609     src 
= zmalloc(sizeof(zsetopsrc
) * setnum
); 
 610     for (i 
= 0, j 
= 3; i 
< setnum
; i
++, j
++) { 
 611         robj 
*obj 
= lookupKeyWrite(c
->db
,c
->argv
[j
]); 
 615             if (obj
->type 
== REDIS_ZSET
) { 
 616                 src
[i
].dict 
= ((zset
*)obj
->ptr
)->dict
; 
 617             } else if (obj
->type 
== REDIS_SET
) { 
 618                 src
[i
].dict 
= (obj
->ptr
); 
 621                 addReply(c
,shared
.wrongtypeerr
); 
 626         /* default all weights to 1 */ 
 630     /* parse optional extra arguments */ 
 632         int remaining 
= c
->argc 
- j
; 
 635             if (remaining 
>= (setnum 
+ 1) && !strcasecmp(c
->argv
[j
]->ptr
,"weights")) { 
 637                 for (i 
= 0; i 
< setnum
; i
++, j
++, remaining
--) { 
 638                     if (getDoubleFromObjectOrReply(c
,c
->argv
[j
],&src
[i
].weight
, 
 639                             "weight value is not a double") != REDIS_OK
) 
 645             } else if (remaining 
>= 2 && !strcasecmp(c
->argv
[j
]->ptr
,"aggregate")) { 
 647                 if (!strcasecmp(c
->argv
[j
]->ptr
,"sum")) { 
 648                     aggregate 
= REDIS_AGGR_SUM
; 
 649                 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"min")) { 
 650                     aggregate 
= REDIS_AGGR_MIN
; 
 651                 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"max")) { 
 652                     aggregate 
= REDIS_AGGR_MAX
; 
 655                     addReply(c
,shared
.syntaxerr
); 
 661                 addReply(c
,shared
.syntaxerr
); 
 667     /* sort sets from the smallest to largest, this will improve our 
 668      * algorithm's performance */ 
 669     qsort(src
,setnum
,sizeof(zsetopsrc
),qsortCompareZsetopsrcByCardinality
); 
 671     dstobj 
= createZsetObject(); 
 672     dstzset 
= dstobj
->ptr
; 
 674     if (op 
== REDIS_OP_INTER
) { 
 675         /* skip going over all entries if the smallest zset is NULL or empty */ 
 676         if (src
[0].dict 
&& dictSize(src
[0].dict
) > 0) { 
 677             /* precondition: as src[0].dict is non-empty and the zsets are ordered 
 678              * from small to large, all src[i > 0].dict are non-empty too */ 
 679             di 
= dictGetIterator(src
[0].dict
); 
 680             while((de 
= dictNext(di
)) != NULL
) { 
 683                 score 
= src
[0].weight 
* zunionInterDictValue(de
); 
 684                 for (j 
= 1; j 
< setnum
; j
++) { 
 685                     dictEntry 
*other 
= dictFind(src
[j
].dict
,dictGetEntryKey(de
)); 
 687                         value 
= src
[j
].weight 
* zunionInterDictValue(other
); 
 688                         zunionInterAggregate(&score
,value
,aggregate
); 
 694                 /* Only continue when present in every source dict. */ 
 696                     robj 
*o 
= dictGetEntryKey(de
); 
 697                     znode 
= zslInsert(dstzset
->zsl
,score
,o
); 
 698                     incrRefCount(o
); /* added to skiplist */ 
 699                     dictAdd(dstzset
->dict
,o
,&znode
->score
); 
 700                     incrRefCount(o
); /* added to dictionary */ 
 703             dictReleaseIterator(di
); 
 705     } else if (op 
== REDIS_OP_UNION
) { 
 706         for (i 
= 0; i 
< setnum
; i
++) { 
 707             if (!src
[i
].dict
) continue; 
 709             di 
= dictGetIterator(src
[i
].dict
); 
 710             while((de 
= dictNext(di
)) != NULL
) { 
 713                 /* skip key when already processed */ 
 714                 if (dictFind(dstzset
->dict
,dictGetEntryKey(de
)) != NULL
) 
 717                 /* initialize score */ 
 718                 score 
= src
[i
].weight 
* zunionInterDictValue(de
); 
 720                 /* because the zsets are sorted by size, it's only possible 
 721                  * for sets at larger indices to hold this entry */ 
 722                 for (j 
= (i
+1); j 
< setnum
; j
++) { 
 723                     dictEntry 
*other 
= dictFind(src
[j
].dict
,dictGetEntryKey(de
)); 
 725                         value 
= src
[j
].weight 
* zunionInterDictValue(other
); 
 726                         zunionInterAggregate(&score
,value
,aggregate
); 
 730                 robj 
*o 
= dictGetEntryKey(de
); 
 731                 znode 
= zslInsert(dstzset
->zsl
,score
,o
); 
 732                 incrRefCount(o
); /* added to skiplist */ 
 733                 dictAdd(dstzset
->dict
,o
,&znode
->score
); 
 734                 incrRefCount(o
); /* added to dictionary */ 
 736             dictReleaseIterator(di
); 
 739         /* unknown operator */ 
 740         redisAssert(op 
== REDIS_OP_INTER 
|| op 
== REDIS_OP_UNION
); 
 743     if (dbDelete(c
->db
,dstkey
)) { 
 744         signalModifiedKey(c
->db
,dstkey
); 
 748     if (dstzset
->zsl
->length
) { 
 749         dbAdd(c
->db
,dstkey
,dstobj
); 
 750         addReplyLongLong(c
, dstzset
->zsl
->length
); 
 751         if (!touched
) signalModifiedKey(c
->db
,dstkey
); 
 754         decrRefCount(dstobj
); 
 755         addReply(c
, shared
.czero
); 
 /* ZUNIONSTORE entry point: runs zunionInterGenericCommand with c->argv[1]
  * as the destination key and REDIS_OP_UNION as the operation. */
 760 void zunionstoreCommand(redisClient 
*c
) { 
 761     zunionInterGenericCommand(c
,c
->argv
[1], REDIS_OP_UNION
); 
 /* ZINTERSTORE entry point: runs zunionInterGenericCommand with c->argv[1]
  * as the destination key and REDIS_OP_INTER as the operation. */
 764 void zinterstoreCommand(redisClient 
*c
) { 
 765     zunionInterGenericCommand(c
,c
->argv
[1], REDIS_OP_INTER
); 
 768 void zrangeGenericCommand(redisClient 
*c
, int reverse
) { 
 780     if ((getLongFromObjectOrReply(c
, c
->argv
[2], &start
, NULL
) != REDIS_OK
) || 
 781         (getLongFromObjectOrReply(c
, c
->argv
[3], &end
, NULL
) != REDIS_OK
)) return; 
 783     if (c
->argc 
== 5 && !strcasecmp(c
->argv
[4]->ptr
,"withscores")) { 
 785     } else if (c
->argc 
>= 5) { 
 786         addReply(c
,shared
.syntaxerr
); 
 790     if ((o 
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.emptymultibulk
)) == NULL
 
 791          || checkType(c
,o
,REDIS_ZSET
)) return; 
 796     /* convert negative indexes */ 
 797     if (start 
< 0) start 
= llen
+start
; 
 798     if (end 
< 0) end 
= llen
+end
; 
 799     if (start 
< 0) start 
= 0; 
 801     /* Invariant: start >= 0, so this test will be true when end < 0. 
 802      * The range is empty when start > end or start >= length. */ 
 803     if (start 
> end 
|| start 
>= llen
) { 
 804         addReply(c
,shared
.emptymultibulk
); 
 807     if (end 
>= llen
) end 
= llen
-1; 
 808     rangelen 
= (end
-start
)+1; 
 810     /* check if starting point is trivial, before searching 
 811      * the element in log(N) time */ 
 813         ln 
= start 
== 0 ? zsl
->tail 
: zslistTypeGetElementByRank(zsl
, llen
-start
); 
 816             zsl
->header
->level
[0].forward 
: zslistTypeGetElementByRank(zsl
, start
+1); 
 819     /* Return the result in form of a multi-bulk reply */ 
 820     addReplyMultiBulkLen(c
,withscores 
? (rangelen
*2) : rangelen
); 
 821     for (j 
= 0; j 
< rangelen
; j
++) { 
 825             addReplyDouble(c
,ln
->score
); 
 826         ln 
= reverse 
? ln
->backward 
: ln
->level
[0].forward
; 
 /* ZRANGE entry point: calls zrangeGenericCommand with reverse = 0. */
 830 void zrangeCommand(redisClient 
*c
) { 
 831     zrangeGenericCommand(c
,0); 
 /* ZREVRANGE entry point: calls zrangeGenericCommand with reverse = 1. */
 834 void zrevrangeCommand(redisClient 
*c
) { 
 835     zrangeGenericCommand(c
,1); 
 838 /* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE and ZCOUNT. 
 839  * If "justcount", only the number of elements in the range is returned. */ 
 840 void genericZrangebyscoreCommand(redisClient 
*c
, int reverse
, int justcount
) { 
 842     robj 
*o
, *emptyreply
; 
 846     int offset 
= 0, limit 
= -1; 
 848     unsigned long rangelen 
= 0; 
 849     void *replylen 
= NULL
; 
 851     /* Parse the range arguments. */ 
 852     if (zslParseRange(c
->argv
[2],c
->argv
[3],&range
) != REDIS_OK
) { 
 853         addReplyError(c
,"min or max is not a double"); 
 857     /* Parse optional extra arguments. Note that ZCOUNT will exactly have 
 858      * 4 arguments, so we'll never enter the following code path. */ 
 860         int remaining 
= c
->argc 
- 4; 
 864             if (remaining 
>= 1 && !strcasecmp(c
->argv
[pos
]->ptr
,"withscores")) { 
 867             } else if (remaining 
>= 3 && !strcasecmp(c
->argv
[pos
]->ptr
,"limit")) { 
 868                 offset 
= atoi(c
->argv
[pos
+1]->ptr
); 
 869                 limit 
= atoi(c
->argv
[pos
+2]->ptr
); 
 870                 pos 
+= 3; remaining 
-= 3; 
 872                 addReply(c
,shared
.syntaxerr
); 
 878     /* Ok, lookup the key and get the range */ 
 879     emptyreply 
= justcount 
? shared
.czero 
: shared
.emptymultibulk
; 
 880     if ((o 
= lookupKeyReadOrReply(c
,c
->argv
[1],emptyreply
)) == NULL 
|| 
 881         checkType(c
,o
,REDIS_ZSET
)) return; 
 885     /* If reversed, assume the elements are sorted from high to low score. */ 
 886     ln 
= zslFirstWithScore(zsl
,range
.min
); 
 888         /* If range.min is out of range, ln will be NULL and we need to use 
 889          * the tail of the skiplist as first node of the range. */ 
 890         if (ln 
== NULL
) ln 
= zsl
->tail
; 
 892         /* zslFirstWithScore returns the first element with 
 893          * score >= range.min, so backtrack to make sure the element we use 
 894          * here has score <= range.min. */ 
 895         while (ln 
&& ln
->score 
> range
.min
) ln 
= ln
->backward
; 
 897         /* Move to the right element according to the range spec. */ 
 899             /* Find last element with score < range.min */ 
 900             while (ln 
&& ln
->score 
== range
.min
) ln 
= ln
->backward
; 
 902             /* Find last element with score <= range.min */ 
 903             while (ln 
&& ln
->level
[0].forward 
&& 
 904                          ln
->level
[0].forward
->score 
== range
.min
) 
 905                 ln 
= ln
->level
[0].forward
; 
 909             /* Find first element with score > range.min */ 
 910             while (ln 
&& ln
->score 
== range
.min
) ln 
= ln
->level
[0].forward
; 
 914     /* No "first" element in the specified interval. */ 
 916         addReply(c
,emptyreply
); 
 920     /* We don't know in advance how many matching elements there 
 921      * are in the list, so we push this object that will represent 
 922      * the multi-bulk length in the output buffer, and will "fix" 
 925         replylen 
= addDeferredMultiBulkLength(c
); 
 927     /* If there is an offset, just traverse the number of elements without 
 928      * checking the score because that is done in the next loop. */ 
 929     while(ln 
&& offset
--) { 
 933             ln 
= ln
->level
[0].forward
; 
 936     while (ln 
&& limit
--) { 
 937         /* Check if this element is in range. */ 
 940                 /* Element should have score > range.max */ 
 941                 if (ln
->score 
<= range
.max
) break; 
 943                 /* Element should have score >= range.max */ 
 944                 if (ln
->score 
< range
.max
) break; 
 948                 /* Element should have score < range.max */ 
 949                 if (ln
->score 
>= range
.max
) break; 
 951                 /* Element should have score <= range.max */ 
 952                 if (ln
->score 
> range
.max
) break; 
 959             addReplyBulk(c
,ln
->obj
); 
 961                 addReplyDouble(c
,ln
->score
); 
 967             ln 
= ln
->level
[0].forward
; 
 971         addReplyLongLong(c
,(long)rangelen
); 
 973         setDeferredMultiBulkLength(c
,replylen
, 
 974              withscores 
? (rangelen
*2) : rangelen
); 
 /* ZRANGEBYSCORE entry point: calls genericZrangebyscoreCommand with
  * reverse = 0 and justcount = 0. */
 978 void zrangebyscoreCommand(redisClient 
*c
) { 
 979     genericZrangebyscoreCommand(c
,0,0); 
 /* ZREVRANGEBYSCORE entry point: calls genericZrangebyscoreCommand with
  * reverse = 1 and justcount = 0. */
 982 void zrevrangebyscoreCommand(redisClient 
*c
) { 
 983     genericZrangebyscoreCommand(c
,1,0); 
 /* ZCOUNT entry point: calls genericZrangebyscoreCommand with reverse = 0
  * and justcount = 1, so only the number of elements in the range is
  * returned. */
 986 void zcountCommand(redisClient 
*c
) { 
 987     genericZrangebyscoreCommand(c
,0,1); 
 990 void zcardCommand(redisClient 
*c
) { 
 994     if ((o 
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.czero
)) == NULL 
|| 
 995         checkType(c
,o
,REDIS_ZSET
)) return; 
 998     addReplyLongLong(c
,zs
->zsl
->length
); 
1001 void zscoreCommand(redisClient 
*c
) { 
1006     if ((o 
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.nullbulk
)) == NULL 
|| 
1007         checkType(c
,o
,REDIS_ZSET
)) return; 
1010     c
->argv
[2] = tryObjectEncoding(c
->argv
[2]); 
1011     de 
= dictFind(zs
->dict
,c
->argv
[2]); 
1013         addReply(c
,shared
.nullbulk
); 
1015         double *score 
= dictGetEntryVal(de
); 
1017         addReplyDouble(c
,*score
); 
1021 void zrankGenericCommand(redisClient 
*c
, int reverse
) { 
1029     if ((o 
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.nullbulk
)) == NULL 
|| 
1030         checkType(c
,o
,REDIS_ZSET
)) return; 
1034     c
->argv
[2] = tryObjectEncoding(c
->argv
[2]); 
1035     de 
= dictFind(zs
->dict
,c
->argv
[2]); 
1037         addReply(c
,shared
.nullbulk
); 
1041     score 
= dictGetEntryVal(de
); 
1042     rank 
= zslistTypeGetRank(zsl
, *score
, c
->argv
[2]); 
1045             addReplyLongLong(c
, zsl
->length 
- rank
); 
1047             addReplyLongLong(c
, rank
-1); 
1050         addReply(c
,shared
.nullbulk
); 
/* ZRANK entry point: calls zrankGenericCommand with reverse = 0. */
1054 void zrankCommand(redisClient 
*c
) { 
1055     zrankGenericCommand(c
, 0); 
1058 void zrevrankCommand(redisClient 
*c
) { 
1059     zrankGenericCommand(c
, 1);