5 /*----------------------------------------------------------------------------- 
   7  *----------------------------------------------------------------------------*/ 
   9 /* ZSETs are ordered sets using two data structures to hold the same elements 
  10  * in order to get O(log(N)) INSERT and REMOVE operations into a sorted 
  13  * The elements are added to an hash table mapping Redis objects to scores. 
  14  * At the same time the elements are added to a skip list mapping scores 
  15  * to Redis objects (so objects are sorted by scores in this "view"). */ 
  17 /* This skiplist implementation is almost a C translation of the original 
  18  * algorithm described by William Pugh in "Skip Lists: A Probabilistic 
  19  * Alternative to Balanced Trees", modified in three ways: 
  20  * a) this implementation allows for repeated values. 
  21  * b) the comparison is not just by key (our 'score') but by satellite data. 
  22  * c) there is a back pointer, so it's a doubly linked list with the back 
  23  * pointers being only at "level 1". This allows to traverse the list 
  24  * from tail to head, useful for ZREVRANGE. */ 
  26 zskiplistNode 
*zslCreateNode(int level
, double score
, robj 
*obj
) { 
  27     zskiplistNode 
*zn 
= zmalloc(sizeof(*zn
)+level
*sizeof(struct zskiplistLevel
)); 
  33 zskiplist 
*zslCreate(void) { 
  37     zsl 
= zmalloc(sizeof(*zsl
)); 
  40     zsl
->header 
= zslCreateNode(ZSKIPLIST_MAXLEVEL
,0,NULL
); 
  41     for (j 
= 0; j 
< ZSKIPLIST_MAXLEVEL
; j
++) { 
  42         zsl
->header
->level
[j
].forward 
= NULL
; 
  43         zsl
->header
->level
[j
].span 
= 0; 
  45     zsl
->header
->backward 
= NULL
; 
  50 void zslFreeNode(zskiplistNode 
*node
) { 
  51     decrRefCount(node
->obj
); 
  55 void zslFree(zskiplist 
*zsl
) { 
  56     zskiplistNode 
*node 
= zsl
->header
->level
[0].forward
, *next
; 
  60         next 
= node
->level
[0].forward
; 
  67 int zslRandomLevel(void) { 
  69     while ((random()&0xFFFF) < (ZSKIPLIST_P 
* 0xFFFF)) 
  71     return (level
<ZSKIPLIST_MAXLEVEL
) ? level 
: ZSKIPLIST_MAXLEVEL
; 
  74 void zslInsert(zskiplist 
*zsl
, double score
, robj 
*obj
) { 
  75     zskiplistNode 
*update
[ZSKIPLIST_MAXLEVEL
], *x
; 
  76     unsigned int rank
[ZSKIPLIST_MAXLEVEL
]; 
  80     for (i 
= zsl
->level
-1; i 
>= 0; i
--) { 
  81         /* store rank that is crossed to reach the insert position */ 
  82         rank
[i
] = i 
== (zsl
->level
-1) ? 0 : rank
[i
+1]; 
  83         while (x
->level
[i
].forward 
&& 
  84             (x
->level
[i
].forward
->score 
< score 
|| 
  85                 (x
->level
[i
].forward
->score 
== score 
&& 
  86                 compareStringObjects(x
->level
[i
].forward
->obj
,obj
) < 0))) { 
  87             rank
[i
] += x
->level
[i
].span
; 
  88             x 
= x
->level
[i
].forward
; 
  92     /* we assume the key is not already inside, since we allow duplicated 
  93      * scores, and the re-insertion of score and redis object should never 
  94      * happpen since the caller of zslInsert() should test in the hash table 
  95      * if the element is already inside or not. */ 
  96     level 
= zslRandomLevel(); 
  97     if (level 
> zsl
->level
) { 
  98         for (i 
= zsl
->level
; i 
< level
; i
++) { 
 100             update
[i
] = zsl
->header
; 
 101             update
[i
]->level
[i
].span 
= zsl
->length
; 
 105     x 
= zslCreateNode(level
,score
,obj
); 
 106     for (i 
= 0; i 
< level
; i
++) { 
 107         x
->level
[i
].forward 
= update
[i
]->level
[i
].forward
; 
 108         update
[i
]->level
[i
].forward 
= x
; 
 110         /* update span covered by update[i] as x is inserted here */ 
 111         x
->level
[i
].span 
= update
[i
]->level
[i
].span 
- (rank
[0] - rank
[i
]); 
 112         update
[i
]->level
[i
].span 
= (rank
[0] - rank
[i
]) + 1; 
 115     /* increment span for untouched levels */ 
 116     for (i 
= level
; i 
< zsl
->level
; i
++) { 
 117         update
[i
]->level
[i
].span
++; 
 120     x
->backward 
= (update
[0] == zsl
->header
) ? NULL 
: update
[0]; 
 121     if (x
->level
[0].forward
) 
 122         x
->level
[0].forward
->backward 
= x
; 
 128 /* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */ 
 129 void zslDeleteNode(zskiplist 
*zsl
, zskiplistNode 
*x
, zskiplistNode 
**update
) { 
 131     for (i 
= 0; i 
< zsl
->level
; i
++) { 
 132         if (update
[i
]->level
[i
].forward 
== x
) { 
 133             update
[i
]->level
[i
].span 
+= x
->level
[i
].span 
- 1; 
 134             update
[i
]->level
[i
].forward 
= x
->level
[i
].forward
; 
 136             update
[i
]->level
[i
].span 
-= 1; 
 139     if (x
->level
[0].forward
) { 
 140         x
->level
[0].forward
->backward 
= x
->backward
; 
 142         zsl
->tail 
= x
->backward
; 
 144     while(zsl
->level 
> 1 && zsl
->header
->level
[zsl
->level
-1].forward 
== NULL
) 
 149 /* Delete an element with matching score/object from the skiplist. */ 
 150 int zslDelete(zskiplist 
*zsl
, double score
, robj 
*obj
) { 
 151     zskiplistNode 
*update
[ZSKIPLIST_MAXLEVEL
], *x
; 
 155     for (i 
= zsl
->level
-1; i 
>= 0; i
--) { 
 156         while (x
->level
[i
].forward 
&& 
 157             (x
->level
[i
].forward
->score 
< score 
|| 
 158                 (x
->level
[i
].forward
->score 
== score 
&& 
 159                 compareStringObjects(x
->level
[i
].forward
->obj
,obj
) < 0))) 
 160             x 
= x
->level
[i
].forward
; 
 163     /* We may have multiple elements with the same score, what we need 
 164      * is to find the element with both the right score and object. */ 
 165     x 
= x
->level
[0].forward
; 
 166     if (x 
&& score 
== x
->score 
&& equalStringObjects(x
->obj
,obj
)) { 
 167         zslDeleteNode(zsl
, x
, update
); 
 171         return 0; /* not found */ 
 173     return 0; /* not found */ 
 176 /* Delete all the elements with score between min and max from the skiplist. 
 177  * Min and mx are inclusive, so a score >= min || score <= max is deleted. 
 178  * Note that this function takes the reference to the hash table view of the 
 179  * sorted set, in order to remove the elements from the hash table too. */ 
 180 unsigned long zslDeleteRangeByScore(zskiplist 
*zsl
, double min
, double max
, dict 
*dict
) { 
 181     zskiplistNode 
*update
[ZSKIPLIST_MAXLEVEL
], *x
; 
 182     unsigned long removed 
= 0; 
 186     for (i 
= zsl
->level
-1; i 
>= 0; i
--) { 
 187         while (x
->level
[i
].forward 
&& x
->level
[i
].forward
->score 
< min
) 
 188             x 
= x
->level
[i
].forward
; 
 191     /* We may have multiple elements with the same score, what we need 
 192      * is to find the element with both the right score and object. */ 
 193     x 
= x
->level
[0].forward
; 
 194     while (x 
&& x
->score 
<= max
) { 
 195         zskiplistNode 
*next 
= x
->level
[0].forward
; 
 196         zslDeleteNode(zsl
, x
, update
); 
 197         dictDelete(dict
,x
->obj
); 
 202     return removed
; /* not found */ 
 205 /* Delete all the elements with rank between start and end from the skiplist. 
 206  * Start and end are inclusive. Note that start and end need to be 1-based */ 
 207 unsigned long zslDeleteRangeByRank(zskiplist 
*zsl
, unsigned int start
, unsigned int end
, dict 
*dict
) { 
 208     zskiplistNode 
*update
[ZSKIPLIST_MAXLEVEL
], *x
; 
 209     unsigned long traversed 
= 0, removed 
= 0; 
 213     for (i 
= zsl
->level
-1; i 
>= 0; i
--) { 
 214         while (x
->level
[i
].forward 
&& (traversed 
+ x
->level
[i
].span
) < start
) { 
 215             traversed 
+= x
->level
[i
].span
; 
 216             x 
= x
->level
[i
].forward
; 
 222     x 
= x
->level
[0].forward
; 
 223     while (x 
&& traversed 
<= end
) { 
 224         zskiplistNode 
*next 
= x
->level
[0].forward
; 
 225         zslDeleteNode(zsl
, x
, update
); 
 226         dictDelete(dict
,x
->obj
); 
 235 /* Find the first node having a score equal or greater than the specified one. 
 236  * Returns NULL if there is no match. */ 
 237 zskiplistNode 
*zslFirstWithScore(zskiplist 
*zsl
, double score
) { 
 242     for (i 
= zsl
->level
-1; i 
>= 0; i
--) { 
 243         while (x
->level
[i
].forward 
&& x
->level
[i
].forward
->score 
< score
) 
 244             x 
= x
->level
[i
].forward
; 
 246     /* We may have multiple elements with the same score, what we need 
 247      * is to find the element with both the right score and object. */ 
 248     return x
->level
[0].forward
; 
 251 /* Find the rank for an element by both score and key. 
 252  * Returns 0 when the element cannot be found, rank otherwise. 
 253  * Note that the rank is 1-based due to the span of zsl->header to the 
 255 unsigned long zslistTypeGetRank(zskiplist 
*zsl
, double score
, robj 
*o
) { 
 257     unsigned long rank 
= 0; 
 261     for (i 
= zsl
->level
-1; i 
>= 0; i
--) { 
 262         while (x
->level
[i
].forward 
&& 
 263             (x
->level
[i
].forward
->score 
< score 
|| 
 264                 (x
->level
[i
].forward
->score 
== score 
&& 
 265                 compareStringObjects(x
->level
[i
].forward
->obj
,o
) <= 0))) { 
 266             rank 
+= x
->level
[i
].span
; 
 267             x 
= x
->level
[i
].forward
; 
 270         /* x might be equal to zsl->header, so test if obj is non-NULL */ 
 271         if (x
->obj 
&& equalStringObjects(x
->obj
,o
)) { 
 278 /* Finds an element by its rank. The rank argument needs to be 1-based. */ 
 279 zskiplistNode
* zslistTypeGetElementByRank(zskiplist 
*zsl
, unsigned long rank
) { 
 281     unsigned long traversed 
= 0; 
 285     for (i 
= zsl
->level
-1; i 
>= 0; i
--) { 
 286         while (x
->level
[i
].forward 
&& (traversed 
+ x
->level
[i
].span
) <= rank
) 
 288             traversed 
+= x
->level
[i
].span
; 
 289             x 
= x
->level
[i
].forward
; 
 291         if (traversed 
== rank
) { 
 298 /*----------------------------------------------------------------------------- 
 299  * Sorted set commands  
 300  *----------------------------------------------------------------------------*/ 
 302 /* This generic command implements both ZADD and ZINCRBY. 
 303  * scoreval is the score if the operation is a ZADD (doincrement == 0) or 
 304  * the increment if the operation is a ZINCRBY (doincrement == 1). */ 
 305 void zaddGenericCommand(redisClient 
*c
, robj 
*key
, robj 
*ele
, double scoreval
, int doincrement
) { 
 310     zsetobj 
= lookupKeyWrite(c
->db
,key
); 
 311     if (zsetobj 
== NULL
) { 
 312         zsetobj 
= createZsetObject(); 
 313         dbAdd(c
->db
,key
,zsetobj
); 
 315         if (zsetobj
->type 
!= REDIS_ZSET
) { 
 316             addReply(c
,shared
.wrongtypeerr
); 
 322     /* Ok now since we implement both ZADD and ZINCRBY here the code 
 323      * needs to handle the two different conditions. It's all about setting 
 324      * '*score', that is, the new score to set, to the right value. */ 
 325     score 
= zmalloc(sizeof(double)); 
 329         /* Read the old score. If the element was not present starts from 0 */ 
 330         de 
= dictFind(zs
->dict
,ele
); 
 332             double *oldscore 
= dictGetEntryVal(de
); 
 333             *score 
= *oldscore 
+ scoreval
; 
 339                 sdsnew("-ERR resulting score is not a number (NaN)\r\n")); 
 341             /* Note that we don't need to check if the zset may be empty and 
 342              * should be removed here, as we can only obtain Nan as score if 
 343              * there was already an element in the sorted set. */ 
 350     /* What follows is a simple remove and re-insert operation that is common 
 351      * to both ZADD and ZINCRBY... */ 
 352     if (dictAdd(zs
->dict
,ele
,score
) == DICT_OK
) { 
 353         /* case 1: New element */ 
 354         incrRefCount(ele
); /* added to hash */ 
 355         zslInsert(zs
->zsl
,*score
,ele
); 
 356         incrRefCount(ele
); /* added to skiplist */ 
 357         touchWatchedKey(c
->db
,c
->argv
[1]); 
 360             addReplyDouble(c
,*score
); 
 362             addReply(c
,shared
.cone
); 
 367         /* case 2: Score update operation */ 
 368         de 
= dictFind(zs
->dict
,ele
); 
 369         redisAssert(de 
!= NULL
); 
 370         oldscore 
= dictGetEntryVal(de
); 
 371         if (*score 
!= *oldscore
) { 
 374             /* Remove and insert the element in the skip list with new score */ 
 375             deleted 
= zslDelete(zs
->zsl
,*oldscore
,ele
); 
 376             redisAssert(deleted 
!= 0); 
 377             zslInsert(zs
->zsl
,*score
,ele
); 
 379             /* Update the score in the hash table */ 
 380             dictReplace(zs
->dict
,ele
,score
); 
 381             touchWatchedKey(c
->db
,c
->argv
[1]); 
 387             addReplyDouble(c
,*score
); 
 389             addReply(c
,shared
.czero
); 
 393 void zaddCommand(redisClient 
*c
) { 
 395     if (getDoubleFromObjectOrReply(c
,c
->argv
[2],&scoreval
,NULL
) != REDIS_OK
) return; 
 396     zaddGenericCommand(c
,c
->argv
[1],c
->argv
[3],scoreval
,0); 
 399 void zincrbyCommand(redisClient 
*c
) { 
 401     if (getDoubleFromObjectOrReply(c
,c
->argv
[2],&scoreval
,NULL
) != REDIS_OK
) return; 
 402     zaddGenericCommand(c
,c
->argv
[1],c
->argv
[3],scoreval
,1); 
 405 void zremCommand(redisClient 
*c
) { 
 412     if ((zsetobj 
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.czero
)) == NULL 
|| 
 413         checkType(c
,zsetobj
,REDIS_ZSET
)) return; 
 416     de 
= dictFind(zs
->dict
,c
->argv
[2]); 
 418         addReply(c
,shared
.czero
); 
 421     /* Delete from the skiplist */ 
 422     oldscore 
= dictGetEntryVal(de
); 
 423     deleted 
= zslDelete(zs
->zsl
,*oldscore
,c
->argv
[2]); 
 424     redisAssert(deleted 
!= 0); 
 426     /* Delete from the hash table */ 
 427     dictDelete(zs
->dict
,c
->argv
[2]); 
 428     if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
); 
 429     if (dictSize(zs
->dict
) == 0) dbDelete(c
->db
,c
->argv
[1]); 
 430     touchWatchedKey(c
->db
,c
->argv
[1]); 
 432     addReply(c
,shared
.cone
); 
 435 void zremrangebyscoreCommand(redisClient 
*c
) { 
 442     if ((getDoubleFromObjectOrReply(c
, c
->argv
[2], &min
, NULL
) != REDIS_OK
) || 
 443         (getDoubleFromObjectOrReply(c
, c
->argv
[3], &max
, NULL
) != REDIS_OK
)) return; 
 445     if ((zsetobj 
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.czero
)) == NULL 
|| 
 446         checkType(c
,zsetobj
,REDIS_ZSET
)) return; 
 449     deleted 
= zslDeleteRangeByScore(zs
->zsl
,min
,max
,zs
->dict
); 
 450     if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
); 
 451     if (dictSize(zs
->dict
) == 0) dbDelete(c
->db
,c
->argv
[1]); 
 452     if (deleted
) touchWatchedKey(c
->db
,c
->argv
[1]); 
 453     server
.dirty 
+= deleted
; 
 454     addReplyLongLong(c
,deleted
); 
 457 void zremrangebyrankCommand(redisClient 
*c
) { 
 465     if ((getLongFromObjectOrReply(c
, c
->argv
[2], &start
, NULL
) != REDIS_OK
) || 
 466         (getLongFromObjectOrReply(c
, c
->argv
[3], &end
, NULL
) != REDIS_OK
)) return; 
 468     if ((zsetobj 
= lookupKeyWriteOrReply(c
,c
->argv
[1],shared
.czero
)) == NULL 
|| 
 469         checkType(c
,zsetobj
,REDIS_ZSET
)) return; 
 471     llen 
= zs
->zsl
->length
; 
 473     /* convert negative indexes */ 
 474     if (start 
< 0) start 
= llen
+start
; 
 475     if (end 
< 0) end 
= llen
+end
; 
 476     if (start 
< 0) start 
= 0; 
 478     /* Invariant: start >= 0, so this test will be true when end < 0. 
 479      * The range is empty when start > end or start >= length. */ 
 480     if (start 
> end 
|| start 
>= llen
) { 
 481         addReply(c
,shared
.czero
); 
 484     if (end 
>= llen
) end 
= llen
-1; 
 486     /* increment start and end because zsl*Rank functions 
 487      * use 1-based rank */ 
 488     deleted 
= zslDeleteRangeByRank(zs
->zsl
,start
+1,end
+1,zs
->dict
); 
 489     if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
); 
 490     if (dictSize(zs
->dict
) == 0) dbDelete(c
->db
,c
->argv
[1]); 
 491     if (deleted
) touchWatchedKey(c
->db
,c
->argv
[1]); 
 492     server
.dirty 
+= deleted
; 
 493     addReplyLongLong(c
, deleted
); 
 501 int qsortCompareZsetopsrcByCardinality(const void *s1
, const void *s2
) { 
 502     zsetopsrc 
*d1 
= (void*) s1
, *d2 
= (void*) s2
; 
 503     unsigned long size1
, size2
; 
 504     size1 
= d1
->dict 
? dictSize(d1
->dict
) : 0; 
 505     size2 
= d2
->dict 
? dictSize(d2
->dict
) : 0; 
 506     return size1 
- size2
; 
 509 #define REDIS_AGGR_SUM 1 
 510 #define REDIS_AGGR_MIN 2 
 511 #define REDIS_AGGR_MAX 3 
 512 #define zunionInterDictValue(_e) (dictGetEntryVal(_e) == NULL ? 1.0 : *(double*)dictGetEntryVal(_e)) 
 514 inline static void zunionInterAggregate(double *target
, double val
, int aggregate
) { 
 515     if (aggregate 
== REDIS_AGGR_SUM
) { 
 516         *target 
= *target 
+ val
; 
 517         /* The result of adding two doubles is NaN when one variable 
 518          * is +inf and the other is -inf. When these numbers are added, 
 519          * we maintain the convention of the result being 0.0. */ 
 520         if (isnan(*target
)) *target 
= 0.0; 
 521     } else if (aggregate 
== REDIS_AGGR_MIN
) { 
 522         *target 
= val 
< *target 
? val 
: *target
; 
 523     } else if (aggregate 
== REDIS_AGGR_MAX
) { 
 524         *target 
= val 
> *target 
? val 
: *target
; 
 527         redisPanic("Unknown ZUNION/INTER aggregate type"); 
 531 void zunionInterGenericCommand(redisClient 
*c
, robj 
*dstkey
, int op
) { 
 533     int aggregate 
= REDIS_AGGR_SUM
; 
 541     /* expect setnum input keys to be given */ 
 542     setnum 
= atoi(c
->argv
[2]->ptr
); 
 544         addReplySds(c
,sdsnew("-ERR at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE\r\n")); 
 548     /* test if the expected number of keys would overflow */ 
 549     if (3+setnum 
> c
->argc
) { 
 550         addReply(c
,shared
.syntaxerr
); 
 554     /* read keys to be used for input */ 
 555     src 
= zmalloc(sizeof(zsetopsrc
) * setnum
); 
 556     for (i 
= 0, j 
= 3; i 
< setnum
; i
++, j
++) { 
 557         robj 
*obj 
= lookupKeyWrite(c
->db
,c
->argv
[j
]); 
 561             if (obj
->type 
== REDIS_ZSET
) { 
 562                 src
[i
].dict 
= ((zset
*)obj
->ptr
)->dict
; 
 563             } else if (obj
->type 
== REDIS_SET
) { 
 564                 src
[i
].dict 
= (obj
->ptr
); 
 567                 addReply(c
,shared
.wrongtypeerr
); 
 572         /* default all weights to 1 */ 
 576     /* parse optional extra arguments */ 
 578         int remaining 
= c
->argc 
- j
; 
 581             if (remaining 
>= (setnum 
+ 1) && !strcasecmp(c
->argv
[j
]->ptr
,"weights")) { 
 583                 for (i 
= 0; i 
< setnum
; i
++, j
++, remaining
--) { 
 584                     if (getDoubleFromObjectOrReply(c
,c
->argv
[j
],&src
[i
].weight
, 
 585                             "weight value is not a double") != REDIS_OK
) 
 591             } else if (remaining 
>= 2 && !strcasecmp(c
->argv
[j
]->ptr
,"aggregate")) { 
 593                 if (!strcasecmp(c
->argv
[j
]->ptr
,"sum")) { 
 594                     aggregate 
= REDIS_AGGR_SUM
; 
 595                 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"min")) { 
 596                     aggregate 
= REDIS_AGGR_MIN
; 
 597                 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"max")) { 
 598                     aggregate 
= REDIS_AGGR_MAX
; 
 601                     addReply(c
,shared
.syntaxerr
); 
 607                 addReply(c
,shared
.syntaxerr
); 
 613     /* sort sets from the smallest to largest, this will improve our 
 614      * algorithm's performance */ 
 615     qsort(src
,setnum
,sizeof(zsetopsrc
),qsortCompareZsetopsrcByCardinality
); 
 617     dstobj 
= createZsetObject(); 
 618     dstzset 
= dstobj
->ptr
; 
 620     if (op 
== REDIS_OP_INTER
) { 
 621         /* skip going over all entries if the smallest zset is NULL or empty */ 
 622         if (src
[0].dict 
&& dictSize(src
[0].dict
) > 0) { 
 623             /* precondition: as src[0].dict is non-empty and the zsets are ordered 
 624              * from small to large, all src[i > 0].dict are non-empty too */ 
 625             di 
= dictGetIterator(src
[0].dict
); 
 626             while((de 
= dictNext(di
)) != NULL
) { 
 627                 double *score 
= zmalloc(sizeof(double)), value
; 
 628                 *score 
= src
[0].weight 
* zunionInterDictValue(de
); 
 630                 for (j 
= 1; j 
< setnum
; j
++) { 
 631                     dictEntry 
*other 
= dictFind(src
[j
].dict
,dictGetEntryKey(de
)); 
 633                         value 
= src
[j
].weight 
* zunionInterDictValue(other
); 
 634                         zunionInterAggregate(score
, value
, aggregate
); 
 640                 /* skip entry when not present in every source dict */ 
 644                     robj 
*o 
= dictGetEntryKey(de
); 
 645                     dictAdd(dstzset
->dict
,o
,score
); 
 646                     incrRefCount(o
); /* added to dictionary */ 
 647                     zslInsert(dstzset
->zsl
,*score
,o
); 
 648                     incrRefCount(o
); /* added to skiplist */ 
 651             dictReleaseIterator(di
); 
 653     } else if (op 
== REDIS_OP_UNION
) { 
 654         for (i 
= 0; i 
< setnum
; i
++) { 
 655             if (!src
[i
].dict
) continue; 
 657             di 
= dictGetIterator(src
[i
].dict
); 
 658             while((de 
= dictNext(di
)) != NULL
) { 
 659                 /* skip key when already processed */ 
 660                 if (dictFind(dstzset
->dict
,dictGetEntryKey(de
)) != NULL
) continue; 
 662                 double *score 
= zmalloc(sizeof(double)), value
; 
 663                 *score 
= src
[i
].weight 
* zunionInterDictValue(de
); 
 665                 /* because the zsets are sorted by size, its only possible 
 666                  * for sets at larger indices to hold this entry */ 
 667                 for (j 
= (i
+1); j 
< setnum
; j
++) { 
 668                     dictEntry 
*other 
= dictFind(src
[j
].dict
,dictGetEntryKey(de
)); 
 670                         value 
= src
[j
].weight 
* zunionInterDictValue(other
); 
 671                         zunionInterAggregate(score
, value
, aggregate
); 
 675                 robj 
*o 
= dictGetEntryKey(de
); 
 676                 dictAdd(dstzset
->dict
,o
,score
); 
 677                 incrRefCount(o
); /* added to dictionary */ 
 678                 zslInsert(dstzset
->zsl
,*score
,o
); 
 679                 incrRefCount(o
); /* added to skiplist */ 
 681             dictReleaseIterator(di
); 
 684         /* unknown operator */ 
 685         redisAssert(op 
== REDIS_OP_INTER 
|| op 
== REDIS_OP_UNION
); 
 688     if (dbDelete(c
->db
,dstkey
)) { 
 689         touchWatchedKey(c
->db
,dstkey
); 
 693     if (dstzset
->zsl
->length
) { 
 694         dbAdd(c
->db
,dstkey
,dstobj
); 
 695         addReplyLongLong(c
, dstzset
->zsl
->length
); 
 696         if (!touched
) touchWatchedKey(c
->db
,dstkey
); 
 699         decrRefCount(dstobj
); 
 700         addReply(c
, shared
.czero
); 
 705 void zunionstoreCommand(redisClient 
*c
) { 
 706     zunionInterGenericCommand(c
,c
->argv
[1], REDIS_OP_UNION
); 
 709 void zinterstoreCommand(redisClient 
*c
) { 
 710     zunionInterGenericCommand(c
,c
->argv
[1], REDIS_OP_INTER
); 
 713 void zrangeGenericCommand(redisClient 
*c
, int reverse
) { 
 725     if ((getLongFromObjectOrReply(c
, c
->argv
[2], &start
, NULL
) != REDIS_OK
) || 
 726         (getLongFromObjectOrReply(c
, c
->argv
[3], &end
, NULL
) != REDIS_OK
)) return; 
 728     if (c
->argc 
== 5 && !strcasecmp(c
->argv
[4]->ptr
,"withscores")) { 
 730     } else if (c
->argc 
>= 5) { 
 731         addReply(c
,shared
.syntaxerr
); 
 735     if ((o 
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.emptymultibulk
)) == NULL
 
 736          || checkType(c
,o
,REDIS_ZSET
)) return; 
 741     /* convert negative indexes */ 
 742     if (start 
< 0) start 
= llen
+start
; 
 743     if (end 
< 0) end 
= llen
+end
; 
 744     if (start 
< 0) start 
= 0; 
 746     /* Invariant: start >= 0, so this test will be true when end < 0. 
 747      * The range is empty when start > end or start >= length. */ 
 748     if (start 
> end 
|| start 
>= llen
) { 
 749         addReply(c
,shared
.emptymultibulk
); 
 752     if (end 
>= llen
) end 
= llen
-1; 
 753     rangelen 
= (end
-start
)+1; 
 755     /* check if starting point is trivial, before searching 
 756      * the element in log(N) time */ 
 758         ln 
= start 
== 0 ? zsl
->tail 
: zslistTypeGetElementByRank(zsl
, llen
-start
); 
 761             zsl
->header
->level
[0].forward 
: zslistTypeGetElementByRank(zsl
, start
+1); 
 764     /* Return the result in form of a multi-bulk reply */ 
 765     addReplySds(c
,sdscatprintf(sdsempty(),"*%d\r\n", 
 766         withscores 
? (rangelen
*2) : rangelen
)); 
 767     for (j 
= 0; j 
< rangelen
; j
++) { 
 771             addReplyDouble(c
,ln
->score
); 
 772         ln 
= reverse 
? ln
->backward 
: ln
->level
[0].forward
; 
 776 void zrangeCommand(redisClient 
*c
) { 
 777     zrangeGenericCommand(c
,0); 
 780 void zrevrangeCommand(redisClient 
*c
) { 
 781     zrangeGenericCommand(c
,1); 
 784 /* This command implements both ZRANGEBYSCORE and ZCOUNT. 
 785  * If justcount is non-zero, just the count is returned. */ 
 786 void genericZrangebyscoreCommand(redisClient 
*c
, int justcount
) { 
 789     int minex 
= 0, maxex 
= 0; /* are min or max exclusive? */ 
 790     int offset 
= 0, limit 
= -1; 
 794     /* Parse the min-max interval. If one of the values is prefixed 
 795      * by the "(" character, it's considered "open". For instance 
 796      * ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max 
 797      * ZRANGEBYSCORE zset 1.5 2.5 will instead match min <= x <= max */ 
 798     if (((char*)c
->argv
[2]->ptr
)[0] == '(') { 
 799         min 
= strtod((char*)c
->argv
[2]->ptr
+1,NULL
); 
 802         min 
= strtod(c
->argv
[2]->ptr
,NULL
); 
 804     if (((char*)c
->argv
[3]->ptr
)[0] == '(') { 
 805         max 
= strtod((char*)c
->argv
[3]->ptr
+1,NULL
); 
 808         max 
= strtod(c
->argv
[3]->ptr
,NULL
); 
 811     /* Parse "WITHSCORES": note that if the command was called with 
 812      * the name ZCOUNT then we are sure that c->argc == 4, so we'll never 
 813      * enter the following paths to parse WITHSCORES and LIMIT. */ 
 814     if (c
->argc 
== 5 || c
->argc 
== 8) { 
 815         if (strcasecmp(c
->argv
[c
->argc
-1]->ptr
,"withscores") == 0) 
 820     if (c
->argc 
!= (4 + withscores
) && c
->argc 
!= (7 + withscores
)) 
 824             sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n")); 
 829     if (c
->argc 
== (7 + withscores
) && strcasecmp(c
->argv
[4]->ptr
,"limit")) { 
 830         addReply(c
,shared
.syntaxerr
); 
 832     } else if (c
->argc 
== (7 + withscores
)) { 
 833         offset 
= atoi(c
->argv
[5]->ptr
); 
 834         limit 
= atoi(c
->argv
[6]->ptr
); 
 835         if (offset 
< 0) offset 
= 0; 
 838     /* Ok, lookup the key and get the range */ 
 839     o 
= lookupKeyRead(c
->db
,c
->argv
[1]); 
 841         addReply(c
,justcount 
? shared
.czero 
: shared
.emptymultibulk
); 
 843         if (o
->type 
!= REDIS_ZSET
) { 
 844             addReply(c
,shared
.wrongtypeerr
); 
 846             zset 
*zsetobj 
= o
->ptr
; 
 847             zskiplist 
*zsl 
= zsetobj
->zsl
; 
 849             robj 
*ele
, *lenobj 
= NULL
; 
 850             unsigned long rangelen 
= 0; 
 852             /* Get the first node with the score >= min, or with 
 853              * score > min if 'minex' is true. */ 
 854             ln 
= zslFirstWithScore(zsl
,min
); 
 855             while (minex 
&& ln 
&& ln
->score 
== min
) ln 
= ln
->level
[0].forward
; 
 858                 /* No element matching the speciifed interval */ 
 859                 addReply(c
,justcount 
? shared
.czero 
: shared
.emptymultibulk
); 
 863             /* We don't know in advance how many matching elements there 
 864              * are in the list, so we push this object that will represent 
 865              * the multi-bulk length in the output buffer, and will "fix" 
 868                 lenobj 
= createObject(REDIS_STRING
,NULL
); 
 870                 decrRefCount(lenobj
); 
 873             while(ln 
&& (maxex 
? (ln
->score 
< max
) : (ln
->score 
<= max
))) { 
 876                     ln 
= ln
->level
[0].forward
; 
 879                 if (limit 
== 0) break; 
 884                         addReplyDouble(c
,ln
->score
); 
 886                 ln 
= ln
->level
[0].forward
; 
 888                 if (limit 
> 0) limit
--; 
 891                 addReplyLongLong(c
,(long)rangelen
); 
 893                 lenobj
->ptr 
= sdscatprintf(sdsempty(),"*%lu\r\n", 
 894                      withscores 
? (rangelen
*2) : rangelen
); 
 900 void zrangebyscoreCommand(redisClient 
*c
) { 
 901     genericZrangebyscoreCommand(c
,0); 
 904 void zcountCommand(redisClient 
*c
) { 
 905     genericZrangebyscoreCommand(c
,1); 
 908 void zcardCommand(redisClient 
*c
) { 
 912     if ((o 
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.czero
)) == NULL 
|| 
 913         checkType(c
,o
,REDIS_ZSET
)) return; 
 916     addReplyUlong(c
,zs
->zsl
->length
); 
 919 void zscoreCommand(redisClient 
*c
) { 
 924     if ((o 
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.nullbulk
)) == NULL 
|| 
 925         checkType(c
,o
,REDIS_ZSET
)) return; 
 928     de 
= dictFind(zs
->dict
,c
->argv
[2]); 
 930         addReply(c
,shared
.nullbulk
); 
 932         double *score 
= dictGetEntryVal(de
); 
 934         addReplyDouble(c
,*score
); 
 938 void zrankGenericCommand(redisClient 
*c
, int reverse
) { 
 946     if ((o 
= lookupKeyReadOrReply(c
,c
->argv
[1],shared
.nullbulk
)) == NULL 
|| 
 947         checkType(c
,o
,REDIS_ZSET
)) return; 
 951     de 
= dictFind(zs
->dict
,c
->argv
[2]); 
 953         addReply(c
,shared
.nullbulk
); 
 957     score 
= dictGetEntryVal(de
); 
 958     rank 
= zslistTypeGetRank(zsl
, *score
, c
->argv
[2]); 
 961             addReplyLongLong(c
, zsl
->length 
- rank
); 
 963             addReplyLongLong(c
, rank
-1); 
 966         addReply(c
,shared
.nullbulk
); 
 970 void zrankCommand(redisClient 
*c
) { 
 971     zrankGenericCommand(c
, 0); 
 974 void zrevrankCommand(redisClient 
*c
) { 
 975     zrankGenericCommand(c
, 1);