]>
git.saurik.com Git - redis.git/blob - src/t_zset.c
50ad8d433d2d7988dff5cd0829ad70c3638d5cca
   5 /*----------------------------------------------------------------------------- 
   7  *----------------------------------------------------------------------------*/ 
   9 /* ZSETs are ordered sets using two data structures to hold the same elements 
  10  * in order to get O(log(N)) INSERT and REMOVE operations into a sorted 
  13  * The elements are added to an hash table mapping Redis objects to scores. 
  14  * At the same time the elements are added to a skip list mapping scores 
  15  * to Redis objects (so objects are sorted by scores in this "view"). */ 
  17 /* This skiplist implementation is almost a C translation of the original 
  18  * algorithm described by William Pugh in "Skip Lists: A Probabilistic 
  19  * Alternative to Balanced Trees", modified in three ways: 
  20  * a) this implementation allows for repeated scores. 
  21  * b) the comparison is not just by key (our 'score') but by satellite data. 
  22  * c) there is a back pointer, so it's a doubly linked list with the back 
  23  * pointers being only at "level 1". This allows to traverse the list 
  24  * from tail to head, useful for ZREVRANGE. */ 
  26 zskiplistNode 
*zslCreateNode(int level
, double score
, robj 
*obj
) { 
  27     zskiplistNode 
*zn 
= zmalloc(sizeof(*zn
)+level
*sizeof(struct zskiplistLevel
)); 
  33 zskiplist 
*zslCreate(void) { 
  37     zsl 
= zmalloc(sizeof(*zsl
)); 
  40     zsl
->header 
= zslCreateNode(ZSKIPLIST_MAXLEVEL
,0,NULL
); 
  41     for (j 
= 0; j 
< ZSKIPLIST_MAXLEVEL
; j
++) { 
  42         zsl
->header
->level
[j
].forward 
= NULL
; 
  43         zsl
->header
->level
[j
].span 
= 0; 
  45     zsl
->header
->backward 
= NULL
; 
  50 void zslFreeNode(zskiplistNode 
*node
) { 
  51     decrRefCount(node
->obj
); 
  55 void zslFree(zskiplist 
*zsl
) { 
  56     zskiplistNode 
*node 
= zsl
->header
->level
[0].forward
, *next
; 
  60         next 
= node
->level
[0].forward
; 
  67 /* Returns a random level for the new skiplist node we are going to create. 
  68  * The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL 
  69  * (both inclusive), with a powerlaw-alike distribution where higher 
  70  * levels are less likely to be returned. */ 
  71 int zslRandomLevel(void) { 
  73     while ((random()&0xFFFF) < (ZSKIPLIST_P 
* 0xFFFF)) 
  75     return (level
<ZSKIPLIST_MAXLEVEL
) ? level 
: ZSKIPLIST_MAXLEVEL
; 
  78 zskiplistNode 
*zslInsert(zskiplist 
*zsl
, double score
, robj 
*obj
) { 
  79     zskiplistNode 
*update
[ZSKIPLIST_MAXLEVEL
], *x
; 
  80     unsigned int rank
[ZSKIPLIST_MAXLEVEL
]; 
  83     redisAssert(!isnan(score
)); 
  85     for (i 
= zsl
->level
-1; i 
>= 0; i
--) { 
  86         /* store rank that is crossed to reach the insert position */ 
  87         rank
[i
] = i 
== (zsl
->level
-1) ? 0 : rank
[i
+1]; 
  88         while (x
->level
[i
].forward 
&& 
  89             (x
->level
[i
].forward
->score 
< score 
|| 
  90                 (x
->level
[i
].forward
->score 
== score 
&& 
  91                 compareStringObjects(x
->level
[i
].forward
->obj
,obj
) < 0))) { 
  92             rank
[i
] += x
->level
[i
].span
; 
  93             x 
= x
->level
[i
].forward
; 
  97     /* we assume the key is not already inside, since we allow duplicated 
  98      * scores, and the re-insertion of score and redis object should never 
  99      * happpen since the caller of zslInsert() should test in the hash table 
 100      * if the element is already inside or not. */ 
 101     level 
= zslRandomLevel(); 
 102     if (level 
> zsl
->level
) { 
 103         for (i 
= zsl
->level
; i 
< level
; i
++) { 
 105             update
[i
] = zsl
->header
; 
 106             update
[i
]->level
[i
].span 
= zsl
->length
; 
 110     x 
= zslCreateNode(level
,score
,obj
); 
 111     for (i 
= 0; i 
< level
; i
++) { 
 112         x
->level
[i
].forward 
= update
[i
]->level
[i
].forward
; 
 113         update
[i
]->level
[i
].forward 
= x
; 
 115         /* update span covered by update[i] as x is inserted here */ 
 116         x
->level
[i
].span 
= update
[i
]->level
[i
].span 
- (rank
[0] - rank
[i
]); 
 117         update
[i
]->level
[i
].span 
= (rank
[0] - rank
[i
]) + 1; 
 120     /* increment span for untouched levels */ 
 121     for (i 
= level
; i 
< zsl
->level
; i
++) { 
 122         update
[i
]->level
[i
].span
++; 
 125     x
->backward 
= (update
[0] == zsl
->header
) ? NULL 
: update
[0]; 
 126     if (x
->level
[0].forward
) 
 127         x
->level
[0].forward
->backward 
= x
; 
 134 /* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */ 
 135 void zslDeleteNode(zskiplist 
*zsl
, zskiplistNode 
*x
, zskiplistNode 
**update
) { 
 137     for (i 
= 0; i 
< zsl
->level
; i
++) { 
 138         if (update
[i
]->level
[i
].forward 
== x
) { 
 139             update
[i
]->level
[i
].span 
+= x
->level
[i
].span 
- 1; 
 140             update
[i
]->level
[i
].forward 
= x
->level
[i
].forward
; 
 142             update
[i
]->level
[i
].span 
-= 1; 
 145     if (x
->level
[0].forward
) { 
 146         x
->level
[0].forward
->backward 
= x
->backward
; 
 148         zsl
->tail 
= x
->backward
; 
 150     while(zsl
->level 
> 1 && zsl
->header
->level
[zsl
->level
-1].forward 
== NULL
) 
 155 /* Delete an element with matching score/object from the skiplist. */ 
 156 int zslDelete(zskiplist 
*zsl
, double score
, robj 
*obj
) { 
 157     zskiplistNode 
*update
[ZSKIPLIST_MAXLEVEL
], *x
; 
 161     for (i 
= zsl
->level
-1; i 
>= 0; i
--) { 
 162         while (x
->level
[i
].forward 
&& 
 163             (x
->level
[i
].forward
->score 
< score 
|| 
 164                 (x
->level
[i
].forward
->score 
== score 
&& 
 165                 compareStringObjects(x
->level
[i
].forward
->obj
,obj
) < 0))) 
 166             x 
= x
->level
[i
].forward
; 
 169     /* We may have multiple elements with the same score, what we need 
 170      * is to find the element with both the right score and object. */ 
 171     x 
= x
->level
[0].forward
; 
 172     if (x 
&& score 
== x
->score 
&& equalStringObjects(x
->obj
,obj
)) { 
 173         zslDeleteNode(zsl
, x
, update
); 
 177         return 0; /* not found */ 
 179     return 0; /* not found */ 
 182 static int zslValueGteMin(double value
, zrangespec 
*spec
) { 
 183     return spec
->minex 
? (value 
> spec
->min
) : (value 
>= spec
->min
); 
 186 static int zslValueLteMax(double value
, zrangespec 
*spec
) { 
 187     return spec
->maxex 
? (value 
< spec
->max
) : (value 
<= spec
->max
); 
 190 /* Returns if there is a part of the zset is in range. */ 
 191 int zslIsInRange(zskiplist 
*zsl
, zrangespec 
*range
) { 
 194     /* Test for ranges that will always be empty. */ 
 195     if (range
->min 
> range
->max 
|| 
 196             (range
->min 
== range
->max 
&& (range
->minex 
|| range
->maxex
))) 
 199     if (x 
== NULL 
|| !zslValueGteMin(x
->score
,range
)) 
 201     x 
= zsl
->header
->level
[0].forward
; 
 202     if (x 
== NULL 
|| !zslValueLteMax(x
->score
,range
)) 
 207 /* Find the first node that is contained in the specified range. 
 208  * Returns NULL when no element is contained in the range. */ 
 209 zskiplistNode 
*zslFirstInRange(zskiplist 
*zsl
, zrangespec range
) { 
 213     /* If everything is out of range, return early. */ 
 214     if (!zslIsInRange(zsl
,&range
)) return NULL
; 
 217     for (i 
= zsl
->level
-1; i 
>= 0; i
--) { 
 218         /* Go forward while *OUT* of range. */ 
 219         while (x
->level
[i
].forward 
&& 
 220             !zslValueGteMin(x
->level
[i
].forward
->score
,&range
)) 
 221                 x 
= x
->level
[i
].forward
; 
 224     /* This is an inner range, so the next node cannot be NULL. */ 
 225     x 
= x
->level
[0].forward
; 
 226     redisAssert(x 
!= NULL
); 
 228     /* Check if score <= max. */ 
 229     if (!zslValueLteMax(x
->score
,&range
)) return NULL
; 
 233 /* Find the last node that is contained in the specified range. 
 234  * Returns NULL when no element is contained in the range. */ 
 235 zskiplistNode 
*zslLastInRange(zskiplist 
*zsl
, zrangespec range
) { 
 239     /* If everything is out of range, return early. */ 
 240     if (!zslIsInRange(zsl
,&range
)) return NULL
; 
 243     for (i 
= zsl
->level
-1; i 
>= 0; i
--) { 
 244         /* Go forward while *IN* range. */ 
 245         while (x
->level
[i
].forward 
&& 
 246             zslValueLteMax(x
->level
[i
].forward
->score
,&range
)) 
 247                 x 
= x
->level
[i
].forward
; 
 250     /* This is an inner range, so this node cannot be NULL. */ 
 251     redisAssert(x 
!= NULL
); 
 253     /* Check if score >= min. */ 
 254     if (!zslValueGteMin(x
->score
,&range
)) return NULL
; 
 258 /* Delete all the elements with score between min and max from the skiplist. 
 259  * Min and mx are inclusive, so a score >= min || score <= max is deleted. 
 260  * Note that this function takes the reference to the hash table view of the 
 261  * sorted set, in order to remove the elements from the hash table too. */ 
 262 unsigned long zslDeleteRangeByScore(zskiplist 
*zsl
, zrangespec range
, dict 
*dict
) { 
 263     zskiplistNode 
*update
[ZSKIPLIST_MAXLEVEL
], *x
; 
 264     unsigned long removed 
= 0; 
 268     for (i 
= zsl
->level
-1; i 
>= 0; i
--) { 
 269         while (x
->level
[i
].forward 
&& (range
.minex 
? 
 270             x
->level
[i
].forward
->score 
<= range
.min 
: 
 271             x
->level
[i
].forward
->score 
< range
.min
)) 
 272                 x 
= x
->level
[i
].forward
; 
 276     /* Current node is the last with score < or <= min. */ 
 277     x 
= x
->level
[0].forward
; 
 279     /* Delete nodes while in range. */ 
 280     while (x 
&& (range
.maxex 
? x
->score 
< range
.max 
: x
->score 
<= range
.max
)) { 
 281         zskiplistNode 
*next 
= x
->level
[0].forward
; 
 282         zslDeleteNode(zsl
,x
,update
); 
 283         dictDelete(dict
,x
->obj
); 
 291 /* Delete all the elements with rank between start and end from the skiplist. 
 292  * Start and end are inclusive. Note that start and end need to be 1-based */ 
 293 unsigned long zslDeleteRangeByRank(zskiplist 
*zsl
, unsigned int start
, unsigned int end
, dict 
*dict
) { 
 294     zskiplistNode 
*update
[ZSKIPLIST_MAXLEVEL
], *x
; 
 295     unsigned long traversed 
= 0, removed 
= 0; 
 299     for (i 
= zsl
->level
-1; i 
>= 0; i
--) { 
 300         while (x
->level
[i
].forward 
&& (traversed 
+ x
->level
[i
].span
) < start
) { 
 301             traversed 
+= x
->level
[i
].span
; 
 302             x 
= x
->level
[i
].forward
; 
 308     x 
= x
->level
[0].forward
; 
 309     while (x 
&& traversed 
<= end
) { 
 310         zskiplistNode 
*next 
= x
->level
[0].forward
; 
 311         zslDeleteNode(zsl
,x
,update
); 
 312         dictDelete(dict
,x
->obj
); 
 321 /* Find the rank for an element by both score and key. 
 322  * Returns 0 when the element cannot be found, rank otherwise. 
 323  * Note that the rank is 1-based due to the span of zsl->header to the 
 325 unsigned long zslGetRank(zskiplist 
*zsl
, double score
, robj 
*o
) { 
 327     unsigned long rank 
= 0; 
 331     for (i 
= zsl
->level
-1; i 
>= 0; i
--) { 
 332         while (x
->level
[i
].forward 
&& 
 333             (x
->level
[i
].forward
->score 
< score 
|| 
 334                 (x
->level
[i
].forward
->score 
== score 
&& 
 335                 compareStringObjects(x
->level
[i
].forward
->obj
,o
) <= 0))) { 
 336             rank 
+= x
->level
[i
].span
; 
 337             x 
= x
->level
[i
].forward
; 
 340         /* x might be equal to zsl->header, so test if obj is non-NULL */ 
 341         if (x
->obj 
&& equalStringObjects(x
->obj
,o
)) { 
 348 /* Finds an element by its rank. The rank argument needs to be 1-based. */ 
 349 zskiplistNode
* zslGetElementByRank(zskiplist 
*zsl
, unsigned long rank
) { 
 351     unsigned long traversed 
= 0; 
 355     for (i 
= zsl
->level
-1; i 
>= 0; i
--) { 
 356         while (x
->level
[i
].forward 
&& (traversed 
+ x
->level
[i
].span
) <= rank
) 
 358             traversed 
+= x
->level
[i
].span
; 
 359             x 
= x
->level
[i
].forward
; 
 361         if (traversed 
== rank
) { 
 368 /* Populate the rangespec according to the objects min and max. */ 
 369 static int zslParseRange(robj 
*min
, robj 
*max
, zrangespec 
*spec
) { 
 371     spec
->minex 
= spec
->maxex 
= 0; 
 373     /* Parse the min-max interval. If one of the values is prefixed 
 374      * by the "(" character, it's considered "open". For instance 
 375      * ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max 
 376      * ZRANGEBYSCORE zset 1.5 2.5 will instead match min <= x <= max */ 
 377     if (min
->encoding 
== REDIS_ENCODING_INT
) { 
 378         spec
->min 
= (long)min
->ptr
; 
 380         if (((char*)min
->ptr
)[0] == '(') { 
 381             spec
->min 
= strtod((char*)min
->ptr
+1,&eptr
); 
 382             if (eptr
[0] != '\0' || isnan(spec
->min
)) return REDIS_ERR
; 
 385             spec
->min 
= strtod((char*)min
->ptr
,&eptr
); 
 386             if (eptr
[0] != '\0' || isnan(spec
->min
)) return REDIS_ERR
; 
 389     if (max
->encoding 
== REDIS_ENCODING_INT
) { 
 390         spec
->max 
= (long)max
->ptr
; 
 392         if (((char*)max
->ptr
)[0] == '(') { 
 393             spec
->max 
= strtod((char*)max
->ptr
+1,&eptr
); 
 394             if (eptr
[0] != '\0' || isnan(spec
->max
)) return REDIS_ERR
; 
 397             spec
->max 
= strtod((char*)max
->ptr
,&eptr
); 
 398             if (eptr
[0] != '\0' || isnan(spec
->max
)) return REDIS_ERR
; 
 405 /*----------------------------------------------------------------------------- 
 406  * Ziplist-backed sorted set API 
 407  *----------------------------------------------------------------------------*/ 
 409 double zzlGetScore(unsigned char *sptr
) { 
 416     redisAssert(sptr 
!= NULL
); 
 417     redisAssert(ziplistGet(sptr
,&vstr
,&vlen
,&vlong
)); 
 420         memcpy(buf
,vstr
,vlen
); 
 422         score 
= strtod(buf
,NULL
); 
 430 /* Compare element in sorted set with given element. */ 
 431 int zzlCompareElements(unsigned char *eptr
, unsigned char *cstr
, unsigned int clen
) { 
 435     unsigned char vbuf
[32]; 
 438     redisAssert(ziplistGet(eptr
,&vstr
,&vlen
,&vlong
)); 
 440         /* Store string representation of long long in buf. */ 
 441         vlen 
= ll2string((char*)vbuf
,sizeof(vbuf
),vlong
); 
 445     minlen 
= (vlen 
< clen
) ? vlen 
: clen
; 
 446     cmp 
= memcmp(vstr
,cstr
,minlen
); 
 447     if (cmp 
== 0) return vlen
-clen
; 
 451 unsigned int zzlLength(unsigned char *zl
) { 
 452     return ziplistLen(zl
)/2; 
 455 /* Move to next entry based on the values in eptr and sptr. Both are set to 
 456  * NULL when there is no next entry. */ 
 457 void zzlNext(unsigned char *zl
, unsigned char **eptr
, unsigned char **sptr
) { 
 458     unsigned char *_eptr
, *_sptr
; 
 459     redisAssert(*eptr 
!= NULL 
&& *sptr 
!= NULL
); 
 461     _eptr 
= ziplistNext(zl
,*sptr
); 
 463         _sptr 
= ziplistNext(zl
,_eptr
); 
 464         redisAssert(_sptr 
!= NULL
); 
 474 /* Move to the previous entry based on the values in eptr and sptr. Both are 
 475  * set to NULL when there is no next entry. */ 
 476 void zzlPrev(unsigned char *zl
, unsigned char **eptr
, unsigned char **sptr
) { 
 477     unsigned char *_eptr
, *_sptr
; 
 478     redisAssert(*eptr 
!= NULL 
&& *sptr 
!= NULL
); 
 480     _sptr 
= ziplistPrev(zl
,*eptr
); 
 482         _eptr 
= ziplistPrev(zl
,_sptr
); 
 483         redisAssert(_eptr 
!= NULL
); 
 485         /* No previous entry. */ 
 493 /* Returns if there is a part of the zset is in range. Should only be used 
 494  * internally by zzlFirstInRange and zzlLastInRange. */ 
 495 int zzlIsInRange(unsigned char *zl
, zrangespec 
*range
) { 
 499     /* Test for ranges that will always be empty. */ 
 500     if (range
->min 
> range
->max 
|| 
 501             (range
->min 
== range
->max 
&& (range
->minex 
|| range
->maxex
))) 
 504     p 
= ziplistIndex(zl
,-1); /* Last score. */ 
 505     if (p 
== NULL
) return 0; /* Empty sorted set */ 
 506     score 
= zzlGetScore(p
); 
 507     if (!zslValueGteMin(score
,range
)) 
 510     p 
= ziplistIndex(zl
,1); /* First score. */ 
 511     redisAssert(p 
!= NULL
); 
 512     score 
= zzlGetScore(p
); 
 513     if (!zslValueLteMax(score
,range
)) 
 519 /* Find pointer to the first element contained in the specified range. 
 520  * Returns NULL when no element is contained in the range. */ 
 521 unsigned char *zzlFirstInRange(unsigned char *zl
, zrangespec range
) { 
 522     unsigned char *eptr 
= ziplistIndex(zl
,0), *sptr
; 
 525     /* If everything is out of range, return early. */ 
 526     if (!zzlIsInRange(zl
,&range
)) return NULL
; 
 528     while (eptr 
!= NULL
) { 
 529         sptr 
= ziplistNext(zl
,eptr
); 
 530         redisAssert(sptr 
!= NULL
); 
 532         score 
= zzlGetScore(sptr
); 
 533         if (zslValueGteMin(score
,&range
)) { 
 534             /* Check if score <= max. */ 
 535             if (zslValueLteMax(score
,&range
)) 
 540         /* Move to next element. */ 
 541         eptr 
= ziplistNext(zl
,sptr
); 
 547 /* Find pointer to the last element contained in the specified range. 
 548  * Returns NULL when no element is contained in the range. */ 
 549 unsigned char *zzlLastInRange(unsigned char *zl
, zrangespec range
) { 
 550     unsigned char *eptr 
= ziplistIndex(zl
,-2), *sptr
; 
 553     /* If everything is out of range, return early. */ 
 554     if (!zzlIsInRange(zl
,&range
)) return NULL
; 
 556     while (eptr 
!= NULL
) { 
 557         sptr 
= ziplistNext(zl
,eptr
); 
 558         redisAssert(sptr 
!= NULL
); 
 560         score 
= zzlGetScore(sptr
); 
 561         if (zslValueLteMax(score
,&range
)) { 
 562             /* Check if score >= min. */ 
 563             if (zslValueGteMin(score
,&range
)) 
 568         /* Move to previous element by moving to the score of previous element. 
 569          * When this returns NULL, we know there also is no element. */ 
 570         sptr 
= ziplistPrev(zl
,eptr
); 
 572             redisAssert((eptr 
= ziplistPrev(zl
,sptr
)) != NULL
); 
 580 unsigned char *zzlFind(unsigned char *zl
, robj 
*ele
, double *score
) { 
 581     unsigned char *eptr 
= ziplistIndex(zl
,0), *sptr
; 
 583     ele 
= getDecodedObject(ele
); 
 584     while (eptr 
!= NULL
) { 
 585         sptr 
= ziplistNext(zl
,eptr
); 
 586         redisAssertWithInfo(NULL
,ele
,sptr 
!= NULL
); 
 588         if (ziplistCompare(eptr
,ele
->ptr
,sdslen(ele
->ptr
))) { 
 589             /* Matching element, pull out score. */ 
 590             if (score 
!= NULL
) *score 
= zzlGetScore(sptr
); 
 595         /* Move to next element. */ 
 596         eptr 
= ziplistNext(zl
,sptr
); 
 603 /* Delete (element,score) pair from ziplist. Use local copy of eptr because we 
 604  * don't want to modify the one given as argument. */ 
 605 unsigned char *zzlDelete(unsigned char *zl
, unsigned char *eptr
) { 
 606     unsigned char *p 
= eptr
; 
 608     /* TODO: add function to ziplist API to delete N elements from offset. */ 
 609     zl 
= ziplistDelete(zl
,&p
); 
 610     zl 
= ziplistDelete(zl
,&p
); 
 614 unsigned char *zzlInsertAt(unsigned char *zl
, unsigned char *eptr
, robj 
*ele
, double score
) { 
 620     redisAssertWithInfo(NULL
,ele
,ele
->encoding 
== REDIS_ENCODING_RAW
); 
 621     scorelen 
= d2string(scorebuf
,sizeof(scorebuf
),score
); 
 623         zl 
= ziplistPush(zl
,ele
->ptr
,sdslen(ele
->ptr
),ZIPLIST_TAIL
); 
 624         zl 
= ziplistPush(zl
,(unsigned char*)scorebuf
,scorelen
,ZIPLIST_TAIL
); 
 626         /* Keep offset relative to zl, as it might be re-allocated. */ 
 628         zl 
= ziplistInsert(zl
,eptr
,ele
->ptr
,sdslen(ele
->ptr
)); 
 631         /* Insert score after the element. */ 
 632         redisAssertWithInfo(NULL
,ele
,(sptr 
= ziplistNext(zl
,eptr
)) != NULL
); 
 633         zl 
= ziplistInsert(zl
,sptr
,(unsigned char*)scorebuf
,scorelen
); 
 639 /* Insert (element,score) pair in ziplist. This function assumes the element is 
 640  * not yet present in the list. */ 
 641 unsigned char *zzlInsert(unsigned char *zl
, robj 
*ele
, double score
) { 
 642     unsigned char *eptr 
= ziplistIndex(zl
,0), *sptr
; 
 645     ele 
= getDecodedObject(ele
); 
 646     while (eptr 
!= NULL
) { 
 647         sptr 
= ziplistNext(zl
,eptr
); 
 648         redisAssertWithInfo(NULL
,ele
,sptr 
!= NULL
); 
 649         s 
= zzlGetScore(sptr
); 
 652             /* First element with score larger than score for element to be 
 653              * inserted. This means we should take its spot in the list to 
 654              * maintain ordering. */ 
 655             zl 
= zzlInsertAt(zl
,eptr
,ele
,score
); 
 657         } else if (s 
== score
) { 
 658             /* Ensure lexicographical ordering for elements. */ 
 659             if (zzlCompareElements(eptr
,ele
->ptr
,sdslen(ele
->ptr
)) > 0) { 
 660                 zl 
= zzlInsertAt(zl
,eptr
,ele
,score
); 
 665         /* Move to next element. */ 
 666         eptr 
= ziplistNext(zl
,sptr
); 
 669     /* Push on tail of list when it was not yet inserted. */ 
 671         zl 
= zzlInsertAt(zl
,NULL
,ele
,score
); 
 677 unsigned char *zzlDeleteRangeByScore(unsigned char *zl
, zrangespec range
, unsigned long *deleted
) { 
 678     unsigned char *eptr
, *sptr
; 
 680     unsigned long num 
= 0; 
 682     if (deleted 
!= NULL
) *deleted 
= 0; 
 684     eptr 
= zzlFirstInRange(zl
,range
); 
 685     if (eptr 
== NULL
) return zl
; 
 687     /* When the tail of the ziplist is deleted, eptr will point to the sentinel 
 688      * byte and ziplistNext will return NULL. */ 
 689     while ((sptr 
= ziplistNext(zl
,eptr
)) != NULL
) { 
 690         score 
= zzlGetScore(sptr
); 
 691         if (zslValueLteMax(score
,&range
)) { 
 692             /* Delete both the element and the score. */ 
 693             zl 
= ziplistDelete(zl
,&eptr
); 
 694             zl 
= ziplistDelete(zl
,&eptr
); 
 697             /* No longer in range. */ 
 702     if (deleted 
!= NULL
) *deleted 
= num
; 
 706 /* Delete all the elements with rank between start and end from the skiplist. 
 707  * Start and end are inclusive. Note that start and end need to be 1-based */ 
 708 unsigned char *zzlDeleteRangeByRank(unsigned char *zl
, unsigned int start
, unsigned int end
, unsigned long *deleted
) { 
 709     unsigned int num 
= (end
-start
)+1; 
 710     if (deleted
) *deleted 
= num
; 
 711     zl 
= ziplistDeleteRange(zl
,2*(start
-1),2*num
); 
 715 /*----------------------------------------------------------------------------- 
 716  * Common sorted set API 
 717  *----------------------------------------------------------------------------*/ 
 719 unsigned int zsetLength(robj 
*zobj
) { 
 721     if (zobj
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
 722         length 
= zzlLength(zobj
->ptr
); 
 723     } else if (zobj
->encoding 
== REDIS_ENCODING_SKIPLIST
) { 
 724         length 
= ((zset
*)zobj
->ptr
)->zsl
->length
; 
 726         redisPanic("Unknown sorted set encoding"); 
 731 void zsetConvert(robj 
*zobj
, int encoding
) { 
 733     zskiplistNode 
*node
, *next
; 
 737     if (zobj
->encoding 
== encoding
) return; 
 738     if (zobj
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
 739         unsigned char *zl 
= zobj
->ptr
; 
 740         unsigned char *eptr
, *sptr
; 
 745         if (encoding 
!= REDIS_ENCODING_SKIPLIST
) 
 746             redisPanic("Unknown target encoding"); 
 748         zs 
= zmalloc(sizeof(*zs
)); 
 749         zs
->dict 
= dictCreate(&zsetDictType
,NULL
); 
 750         zs
->zsl 
= zslCreate(); 
 752         eptr 
= ziplistIndex(zl
,0); 
 753         redisAssertWithInfo(NULL
,zobj
,eptr 
!= NULL
); 
 754         sptr 
= ziplistNext(zl
,eptr
); 
 755         redisAssertWithInfo(NULL
,zobj
,sptr 
!= NULL
); 
 757         while (eptr 
!= NULL
) { 
 758             score 
= zzlGetScore(sptr
); 
 759             redisAssertWithInfo(NULL
,zobj
,ziplistGet(eptr
,&vstr
,&vlen
,&vlong
)); 
 761                 ele 
= createStringObjectFromLongLong(vlong
); 
 763                 ele 
= createStringObject((char*)vstr
,vlen
); 
 765             /* Has incremented refcount since it was just created. */ 
 766             node 
= zslInsert(zs
->zsl
,score
,ele
); 
 767             redisAssertWithInfo(NULL
,zobj
,dictAdd(zs
->dict
,ele
,&node
->score
) == DICT_OK
); 
 768             incrRefCount(ele
); /* Added to dictionary. */ 
 769             zzlNext(zl
,&eptr
,&sptr
); 
 774         zobj
->encoding 
= REDIS_ENCODING_SKIPLIST
; 
 775     } else if (zobj
->encoding 
== REDIS_ENCODING_SKIPLIST
) { 
 776         unsigned char *zl 
= ziplistNew(); 
 778         if (encoding 
!= REDIS_ENCODING_ZIPLIST
) 
 779             redisPanic("Unknown target encoding"); 
 781         /* Approach similar to zslFree(), since we want to free the skiplist at 
 782          * the same time as creating the ziplist. */ 
 784         dictRelease(zs
->dict
); 
 785         node 
= zs
->zsl
->header
->level
[0].forward
; 
 786         zfree(zs
->zsl
->header
); 
 790             ele 
= getDecodedObject(node
->obj
); 
 791             zl 
= zzlInsertAt(zl
,NULL
,ele
,node
->score
); 
 794             next 
= node
->level
[0].forward
; 
 801         zobj
->encoding 
= REDIS_ENCODING_ZIPLIST
; 
 803         redisPanic("Unknown sorted set encoding"); 
 807 /*----------------------------------------------------------------------------- 
 808  * Sorted set commands  
 809  *----------------------------------------------------------------------------*/ 
 811 /* This generic command implements both ZADD and ZINCRBY. */ 
 812 void zaddGenericCommand(redisClient 
*c
, int incr
) { 
 813     static char *nanerr 
= "resulting score is not a number (NaN)"; 
 814     robj 
*key 
= c
->argv
[1]; 
 818     double score 
= 0, *scores
, curscore 
= 0.0; 
 819     int j
, elements 
= (c
->argc
-2)/2; 
 823         addReply(c
,shared
.syntaxerr
); 
 827     /* Start parsing all the scores, we need to emit any syntax error 
 828      * before executing additions to the sorted set, as the command should 
 829      * either execute fully or nothing at all. */ 
 830     scores 
= zmalloc(sizeof(double)*elements
); 
 831     for (j 
= 0; j 
< elements
; j
++) { 
 832         if (getDoubleFromObjectOrReply(c
,c
->argv
[2+j
*2],&scores
[j
],NULL
) 
 840     /* Lookup the key and create the sorted set if does not exist. */ 
 841     zobj 
= lookupKeyWrite(c
->db
,key
); 
 843         if (server
.zset_max_ziplist_entries 
== 0 || 
 844             server
.zset_max_ziplist_value 
< sdslen(c
->argv
[3]->ptr
)) 
 846             zobj 
= createZsetObject(); 
 848             zobj 
= createZsetZiplistObject(); 
 850         dbAdd(c
->db
,key
,zobj
); 
 852         if (zobj
->type 
!= REDIS_ZSET
) { 
 853             addReply(c
,shared
.wrongtypeerr
); 
 859     for (j 
= 0; j 
< elements
; j
++) { 
 862         if (zobj
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
 865             /* Prefer non-encoded element when dealing with ziplists. */ 
 866             ele 
= c
->argv
[3+j
*2]; 
 867             if ((eptr 
= zzlFind(zobj
->ptr
,ele
,&curscore
)) != NULL
) { 
 871                         addReplyError(c
,nanerr
); 
 872                         /* Don't need to check if the sorted set is empty 
 873                          * because we know it has at least one element. */ 
 879                 /* Remove and re-insert when score changed. */ 
 880                 if (score 
!= curscore
) { 
 881                     zobj
->ptr 
= zzlDelete(zobj
->ptr
,eptr
); 
 882                     zobj
->ptr 
= zzlInsert(zobj
->ptr
,ele
,score
); 
 884                     signalModifiedKey(c
->db
,key
); 
 888                 /* Optimize: check if the element is too large or the list 
 889                  * becomes too long *before* executing zzlInsert. */ 
 890                 zobj
->ptr 
= zzlInsert(zobj
->ptr
,ele
,score
); 
 891                 if (zzlLength(zobj
->ptr
) > server
.zset_max_ziplist_entries
) 
 892                     zsetConvert(zobj
,REDIS_ENCODING_SKIPLIST
); 
 893                 if (sdslen(ele
->ptr
) > server
.zset_max_ziplist_value
) 
 894                     zsetConvert(zobj
,REDIS_ENCODING_SKIPLIST
); 
 896                 signalModifiedKey(c
->db
,key
); 
 900         } else if (zobj
->encoding 
== REDIS_ENCODING_SKIPLIST
) { 
 901             zset 
*zs 
= zobj
->ptr
; 
 902             zskiplistNode 
*znode
; 
 905             ele 
= c
->argv
[3+j
*2] = tryObjectEncoding(c
->argv
[3+j
*2]); 
 906             de 
= dictFind(zs
->dict
,ele
); 
 908                 curobj 
= dictGetKey(de
); 
 909                 curscore 
= *(double*)dictGetVal(de
); 
 914                         addReplyError(c
,nanerr
); 
 915                         /* Don't need to check if the sorted set is empty 
 916                          * because we know it has at least one element. */ 
 922                 /* Remove and re-insert when score changed. We can safely 
 923                  * delete the key object from the skiplist, since the 
 924                  * dictionary still has a reference to it. */ 
 925                 if (score 
!= curscore
) { 
 926                     redisAssertWithInfo(c
,curobj
,zslDelete(zs
->zsl
,curscore
,curobj
)); 
 927                     znode 
= zslInsert(zs
->zsl
,score
,curobj
); 
 928                     incrRefCount(curobj
); /* Re-inserted in skiplist. */ 
 929                     dictGetVal(de
) = &znode
->score
; /* Update score ptr. */ 
 931                     signalModifiedKey(c
->db
,key
); 
 935                 znode 
= zslInsert(zs
->zsl
,score
,ele
); 
 936                 incrRefCount(ele
); /* Inserted in skiplist. */ 
 937                 redisAssertWithInfo(c
,NULL
,dictAdd(zs
->dict
,ele
,&znode
->score
) == DICT_OK
); 
 938                 incrRefCount(ele
); /* Added to dictionary. */ 
 940                 signalModifiedKey(c
->db
,key
); 
 945             redisPanic("Unknown sorted set encoding"); 
 949     if (incr
) /* ZINCRBY */ 
 950         addReplyDouble(c
,score
); 
 952         addReplyLongLong(c
,added
); 
 955 void zaddCommand(redisClient 
*c
) { 
 956     zaddGenericCommand(c
,0); 
 959 void zincrbyCommand(redisClient 
*c
) { 
 960     zaddGenericCommand(c
,1); 
 963 void zremCommand(redisClient 
*c
) { 
 964     robj 
*key 
= c
->argv
[1]; 
 968     if ((zobj 
= lookupKeyWriteOrReply(c
,key
,shared
.czero
)) == NULL 
|| 
 969         checkType(c
,zobj
,REDIS_ZSET
)) return; 
 971     if (zobj
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
 974         for (j 
= 2; j 
< c
->argc
; j
++) { 
 975             if ((eptr 
= zzlFind(zobj
->ptr
,c
->argv
[j
],NULL
)) != NULL
) { 
 977                 zobj
->ptr 
= zzlDelete(zobj
->ptr
,eptr
); 
 978                 if (zzlLength(zobj
->ptr
) == 0) { 
 984     } else if (zobj
->encoding 
== REDIS_ENCODING_SKIPLIST
) { 
 985         zset 
*zs 
= zobj
->ptr
; 
 989         for (j 
= 2; j 
< c
->argc
; j
++) { 
 990             de 
= dictFind(zs
->dict
,c
->argv
[j
]); 
 994                 /* Delete from the skiplist */ 
 995                 score 
= *(double*)dictGetVal(de
); 
 996                 redisAssertWithInfo(c
,c
->argv
[j
],zslDelete(zs
->zsl
,score
,c
->argv
[j
])); 
 998                 /* Delete from the hash table */ 
 999                 dictDelete(zs
->dict
,c
->argv
[j
]); 
1000                 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
); 
1001                 if (dictSize(zs
->dict
) == 0) { 
1002                     dbDelete(c
->db
,key
); 
1008         redisPanic("Unknown sorted set encoding"); 
1012         signalModifiedKey(c
->db
,key
); 
1013         server
.dirty 
+= deleted
; 
1015     addReplyLongLong(c
,deleted
); 
1018 void zremrangebyscoreCommand(redisClient 
*c
) { 
1019     robj 
*key 
= c
->argv
[1]; 
1022     unsigned long deleted
; 
1024     /* Parse the range arguments. */ 
1025     if (zslParseRange(c
->argv
[2],c
->argv
[3],&range
) != REDIS_OK
) { 
1026         addReplyError(c
,"min or max is not a float"); 
1030     if ((zobj 
= lookupKeyWriteOrReply(c
,key
,shared
.czero
)) == NULL 
|| 
1031         checkType(c
,zobj
,REDIS_ZSET
)) return; 
1033     if (zobj
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
1034         zobj
->ptr 
= zzlDeleteRangeByScore(zobj
->ptr
,range
,&deleted
); 
1035         if (zzlLength(zobj
->ptr
) == 0) dbDelete(c
->db
,key
); 
1036     } else if (zobj
->encoding 
== REDIS_ENCODING_SKIPLIST
) { 
1037         zset 
*zs 
= zobj
->ptr
; 
1038         deleted 
= zslDeleteRangeByScore(zs
->zsl
,range
,zs
->dict
); 
1039         if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
); 
1040         if (dictSize(zs
->dict
) == 0) dbDelete(c
->db
,key
); 
1042         redisPanic("Unknown sorted set encoding"); 
1045     if (deleted
) signalModifiedKey(c
->db
,key
); 
1046     server
.dirty 
+= deleted
; 
1047     addReplyLongLong(c
,deleted
); 
1050 void zremrangebyrankCommand(redisClient 
*c
) { 
1051     robj 
*key 
= c
->argv
[1]; 
1056     unsigned long deleted
; 
1058     if ((getLongFromObjectOrReply(c
, c
->argv
[2], &start
, NULL
) != REDIS_OK
) || 
1059         (getLongFromObjectOrReply(c
, c
->argv
[3], &end
, NULL
) != REDIS_OK
)) return; 
1061     if ((zobj 
= lookupKeyWriteOrReply(c
,key
,shared
.czero
)) == NULL 
|| 
1062         checkType(c
,zobj
,REDIS_ZSET
)) return; 
1064     /* Sanitize indexes. */ 
1065     llen 
= zsetLength(zobj
); 
1066     if (start 
< 0) start 
= llen
+start
; 
1067     if (end 
< 0) end 
= llen
+end
; 
1068     if (start 
< 0) start 
= 0; 
1070     /* Invariant: start >= 0, so this test will be true when end < 0. 
1071      * The range is empty when start > end or start >= length. */ 
1072     if (start 
> end 
|| start 
>= llen
) { 
1073         addReply(c
,shared
.czero
); 
1076     if (end 
>= llen
) end 
= llen
-1; 
1078     if (zobj
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
1079         /* Correct for 1-based rank. */ 
1080         zobj
->ptr 
= zzlDeleteRangeByRank(zobj
->ptr
,start
+1,end
+1,&deleted
); 
1081         if (zzlLength(zobj
->ptr
) == 0) dbDelete(c
->db
,key
); 
1082     } else if (zobj
->encoding 
== REDIS_ENCODING_SKIPLIST
) { 
1083         zset 
*zs 
= zobj
->ptr
; 
1085         /* Correct for 1-based rank. */ 
1086         deleted 
= zslDeleteRangeByRank(zs
->zsl
,start
+1,end
+1,zs
->dict
); 
1087         if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
); 
1088         if (dictSize(zs
->dict
) == 0) dbDelete(c
->db
,key
); 
1090         redisPanic("Unknown sorted set encoding"); 
1093     if (deleted
) signalModifiedKey(c
->db
,key
); 
1094     server
.dirty 
+= deleted
; 
1095     addReplyLongLong(c
,deleted
); 
1100     int type
; /* Set, sorted set */ 
1105         /* Set iterators. */ 
1118         /* Sorted set iterators. */ 
1122                 unsigned char *eptr
, *sptr
; 
1126                 zskiplistNode 
*node
; 
1133 /* Use dirty flags for pointers that need to be cleaned up in the next 
1134  * iteration over the zsetopval. The dirty flag for the long long value is 
1135  * special, since long long values don't need cleanup. Instead, it means that 
1136  * we already checked that "ell" holds a long long, or tried to convert another 
1137  * representation into a long long value. When this was successful, 
1138  * OPVAL_VALID_LL is set as well. */ 
1139 #define OPVAL_DIRTY_ROBJ 1 
1140 #define OPVAL_DIRTY_LL 2 
1141 #define OPVAL_VALID_LL 4 
1143 /* Store value retrieved from the iterator. */ 
1146     unsigned char _buf
[32]; /* Private buffer. */ 
1148     unsigned char *estr
; 
1154 typedef union _iterset iterset
; 
1155 typedef union _iterzset iterzset
; 
1157 void zuiInitIterator(zsetopsrc 
*op
) { 
1158     if (op
->subject 
== NULL
) 
1161     if (op
->type 
== REDIS_SET
) { 
1162         iterset 
*it 
= &op
->iter
.set
; 
1163         if (op
->encoding 
== REDIS_ENCODING_INTSET
) { 
1164             it
->is
.is 
= op
->subject
->ptr
; 
1166         } else if (op
->encoding 
== REDIS_ENCODING_HT
) { 
1167             it
->ht
.dict 
= op
->subject
->ptr
; 
1168             it
->ht
.di 
= dictGetIterator(op
->subject
->ptr
); 
1169             it
->ht
.de 
= dictNext(it
->ht
.di
); 
1171             redisPanic("Unknown set encoding"); 
1173     } else if (op
->type 
== REDIS_ZSET
) { 
1174         iterzset 
*it 
= &op
->iter
.zset
; 
1175         if (op
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
1176             it
->zl
.zl 
= op
->subject
->ptr
; 
1177             it
->zl
.eptr 
= ziplistIndex(it
->zl
.zl
,0); 
1178             if (it
->zl
.eptr 
!= NULL
) { 
1179                 it
->zl
.sptr 
= ziplistNext(it
->zl
.zl
,it
->zl
.eptr
); 
1180                 redisAssert(it
->zl
.sptr 
!= NULL
); 
1182         } else if (op
->encoding 
== REDIS_ENCODING_SKIPLIST
) { 
1183             it
->sl
.zs 
= op
->subject
->ptr
; 
1184             it
->sl
.node 
= it
->sl
.zs
->zsl
->header
->level
[0].forward
; 
1186             redisPanic("Unknown sorted set encoding"); 
1189         redisPanic("Unsupported type"); 
1193 void zuiClearIterator(zsetopsrc 
*op
) { 
1194     if (op
->subject 
== NULL
) 
1197     if (op
->type 
== REDIS_SET
) { 
1198         iterset 
*it 
= &op
->iter
.set
; 
1199         if (op
->encoding 
== REDIS_ENCODING_INTSET
) { 
1200             REDIS_NOTUSED(it
); /* skip */ 
1201         } else if (op
->encoding 
== REDIS_ENCODING_HT
) { 
1202             dictReleaseIterator(it
->ht
.di
); 
1204             redisPanic("Unknown set encoding"); 
1206     } else if (op
->type 
== REDIS_ZSET
) { 
1207         iterzset 
*it 
= &op
->iter
.zset
; 
1208         if (op
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
1209             REDIS_NOTUSED(it
); /* skip */ 
1210         } else if (op
->encoding 
== REDIS_ENCODING_SKIPLIST
) { 
1211             REDIS_NOTUSED(it
); /* skip */ 
1213             redisPanic("Unknown sorted set encoding"); 
1216         redisPanic("Unsupported type"); 
1220 int zuiLength(zsetopsrc 
*op
) { 
1221     if (op
->subject 
== NULL
) 
1224     if (op
->type 
== REDIS_SET
) { 
1225         iterset 
*it 
= &op
->iter
.set
; 
1226         if (op
->encoding 
== REDIS_ENCODING_INTSET
) { 
1227             return intsetLen(it
->is
.is
); 
1228         } else if (op
->encoding 
== REDIS_ENCODING_HT
) { 
1229             return dictSize(it
->ht
.dict
); 
1231             redisPanic("Unknown set encoding"); 
1233     } else if (op
->type 
== REDIS_ZSET
) { 
1234         iterzset 
*it 
= &op
->iter
.zset
; 
1235         if (op
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
1236             return zzlLength(it
->zl
.zl
); 
1237         } else if (op
->encoding 
== REDIS_ENCODING_SKIPLIST
) { 
1238             return it
->sl
.zs
->zsl
->length
; 
1240             redisPanic("Unknown sorted set encoding"); 
1243         redisPanic("Unsupported type"); 
1247 /* Check if the current value is valid. If so, store it in the passed structure 
1248  * and move to the next element. If not valid, this means we have reached the 
1249  * end of the structure and can abort. */ 
1250 int zuiNext(zsetopsrc 
*op
, zsetopval 
*val
) { 
1251     if (op
->subject 
== NULL
) 
1254     if (val
->flags 
& OPVAL_DIRTY_ROBJ
) 
1255         decrRefCount(val
->ele
); 
1257     memset(val
,0,sizeof(zsetopval
)); 
1259     if (op
->type 
== REDIS_SET
) { 
1260         iterset 
*it 
= &op
->iter
.set
; 
1261         if (op
->encoding 
== REDIS_ENCODING_INTSET
) { 
1262             int64_t ell 
= val
->ell
; 
1264             if (!intsetGet(it
->is
.is
,it
->is
.ii
,&ell
)) 
1268             /* Move to next element. */ 
1270         } else if (op
->encoding 
== REDIS_ENCODING_HT
) { 
1271             if (it
->ht
.de 
== NULL
) 
1273             val
->ele 
= dictGetKey(it
->ht
.de
); 
1276             /* Move to next element. */ 
1277             it
->ht
.de 
= dictNext(it
->ht
.di
); 
1279             redisPanic("Unknown set encoding"); 
1281     } else if (op
->type 
== REDIS_ZSET
) { 
1282         iterzset 
*it 
= &op
->iter
.zset
; 
1283         if (op
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
1284             /* No need to check both, but better be explicit. */ 
1285             if (it
->zl
.eptr 
== NULL 
|| it
->zl
.sptr 
== NULL
) 
1287             redisAssert(ziplistGet(it
->zl
.eptr
,&val
->estr
,&val
->elen
,&val
->ell
)); 
1288             val
->score 
= zzlGetScore(it
->zl
.sptr
); 
1290             /* Move to next element. */ 
1291             zzlNext(it
->zl
.zl
,&it
->zl
.eptr
,&it
->zl
.sptr
); 
1292         } else if (op
->encoding 
== REDIS_ENCODING_SKIPLIST
) { 
1293             if (it
->sl
.node 
== NULL
) 
1295             val
->ele 
= it
->sl
.node
->obj
; 
1296             val
->score 
= it
->sl
.node
->score
; 
1298             /* Move to next element. */ 
1299             it
->sl
.node 
= it
->sl
.node
->level
[0].forward
; 
1301             redisPanic("Unknown sorted set encoding"); 
1304         redisPanic("Unsupported type"); 
1309 int zuiLongLongFromValue(zsetopval 
*val
) { 
1310     if (!(val
->flags 
& OPVAL_DIRTY_LL
)) { 
1311         val
->flags 
|= OPVAL_DIRTY_LL
; 
1313         if (val
->ele 
!= NULL
) { 
1314             if (val
->ele
->encoding 
== REDIS_ENCODING_INT
) { 
1315                 val
->ell 
= (long)val
->ele
->ptr
; 
1316                 val
->flags 
|= OPVAL_VALID_LL
; 
1317             } else if (val
->ele
->encoding 
== REDIS_ENCODING_RAW
) { 
1318                 if (string2ll(val
->ele
->ptr
,sdslen(val
->ele
->ptr
),&val
->ell
)) 
1319                     val
->flags 
|= OPVAL_VALID_LL
; 
1321                 redisPanic("Unsupported element encoding"); 
1323         } else if (val
->estr 
!= NULL
) { 
1324             if (string2ll((char*)val
->estr
,val
->elen
,&val
->ell
)) 
1325                 val
->flags 
|= OPVAL_VALID_LL
; 
1327             /* The long long was already set, flag as valid. */ 
1328             val
->flags 
|= OPVAL_VALID_LL
; 
1331     return val
->flags 
& OPVAL_VALID_LL
; 
1334 robj 
*zuiObjectFromValue(zsetopval 
*val
) { 
1335     if (val
->ele 
== NULL
) { 
1336         if (val
->estr 
!= NULL
) { 
1337             val
->ele 
= createStringObject((char*)val
->estr
,val
->elen
); 
1339             val
->ele 
= createStringObjectFromLongLong(val
->ell
); 
1341         val
->flags 
|= OPVAL_DIRTY_ROBJ
; 
1346 int zuiBufferFromValue(zsetopval 
*val
) { 
1347     if (val
->estr 
== NULL
) { 
1348         if (val
->ele 
!= NULL
) { 
1349             if (val
->ele
->encoding 
== REDIS_ENCODING_INT
) { 
1350                 val
->elen 
= ll2string((char*)val
->_buf
,sizeof(val
->_buf
),(long)val
->ele
->ptr
); 
1351                 val
->estr 
= val
->_buf
; 
1352             } else if (val
->ele
->encoding 
== REDIS_ENCODING_RAW
) { 
1353                 val
->elen 
= sdslen(val
->ele
->ptr
); 
1354                 val
->estr 
= val
->ele
->ptr
; 
1356                 redisPanic("Unsupported element encoding"); 
1359             val
->elen 
= ll2string((char*)val
->_buf
,sizeof(val
->_buf
),val
->ell
); 
1360             val
->estr 
= val
->_buf
; 
1366 /* Find value pointed to by val in the source pointer to by op. When found, 
1367  * return 1 and store its score in target. Return 0 otherwise. */ 
1368 int zuiFind(zsetopsrc 
*op
, zsetopval 
*val
, double *score
) { 
1369     if (op
->subject 
== NULL
) 
1372     if (op
->type 
== REDIS_SET
) { 
1373         iterset 
*it 
= &op
->iter
.set
; 
1375         if (op
->encoding 
== REDIS_ENCODING_INTSET
) { 
1376             if (zuiLongLongFromValue(val
) && intsetFind(it
->is
.is
,val
->ell
)) { 
1382         } else if (op
->encoding 
== REDIS_ENCODING_HT
) { 
1383             zuiObjectFromValue(val
); 
1384             if (dictFind(it
->ht
.dict
,val
->ele
) != NULL
) { 
1391             redisPanic("Unknown set encoding"); 
1393     } else if (op
->type 
== REDIS_ZSET
) { 
1394         iterzset 
*it 
= &op
->iter
.zset
; 
1395         zuiObjectFromValue(val
); 
1397         if (op
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
1398             if (zzlFind(it
->zl
.zl
,val
->ele
,score
) != NULL
) { 
1399                 /* Score is already set by zzlFind. */ 
1404         } else if (op
->encoding 
== REDIS_ENCODING_SKIPLIST
) { 
1406             if ((de 
= dictFind(it
->sl
.zs
->dict
,val
->ele
)) != NULL
) { 
1407                 *score 
= *(double*)dictGetVal(de
); 
1413             redisPanic("Unknown sorted set encoding"); 
1416         redisPanic("Unsupported type"); 
1420 int zuiCompareByCardinality(const void *s1
, const void *s2
) { 
1421     return zuiLength((zsetopsrc
*)s1
) - zuiLength((zsetopsrc
*)s2
); 
1424 #define REDIS_AGGR_SUM 1 
1425 #define REDIS_AGGR_MIN 2 
1426 #define REDIS_AGGR_MAX 3 
1427 #define zunionInterDictValue(_e) (dictGetVal(_e) == NULL ? 1.0 : *(double*)dictGetVal(_e)) 
1429 inline static void zunionInterAggregate(double *target
, double val
, int aggregate
) { 
1430     if (aggregate 
== REDIS_AGGR_SUM
) { 
1431         *target 
= *target 
+ val
; 
1432         /* The result of adding two doubles is NaN when one variable 
1433          * is +inf and the other is -inf. When these numbers are added, 
1434          * we maintain the convention of the result being 0.0. */ 
1435         if (isnan(*target
)) *target 
= 0.0; 
1436     } else if (aggregate 
== REDIS_AGGR_MIN
) { 
1437         *target 
= val 
< *target 
? val 
: *target
; 
1438     } else if (aggregate 
== REDIS_AGGR_MAX
) { 
1439         *target 
= val 
> *target 
? val 
: *target
; 
1442         redisPanic("Unknown ZUNION/INTER aggregate type"); 
1446 void zunionInterGenericCommand(redisClient 
*c
, robj 
*dstkey
, int op
) { 
1449     int aggregate 
= REDIS_AGGR_SUM
; 
1453     unsigned int maxelelen 
= 0; 
1456     zskiplistNode 
*znode
; 
1459     /* expect setnum input keys to be given */ 
1460     if ((getLongFromObjectOrReply(c
, c
->argv
[2], &setnum
, NULL
) != REDIS_OK
)) 
1465             "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE"); 
1469     /* test if the expected number of keys would overflow */ 
1470     if (3+setnum 
> c
->argc
) { 
1471         addReply(c
,shared
.syntaxerr
); 
1475     /* read keys to be used for input */ 
1476     src 
= zcalloc(sizeof(zsetopsrc
) * setnum
); 
1477     for (i 
= 0, j 
= 3; i 
< setnum
; i
++, j
++) { 
1478         robj 
*obj 
= lookupKeyWrite(c
->db
,c
->argv
[j
]); 
1480             if (obj
->type 
!= REDIS_ZSET 
&& obj
->type 
!= REDIS_SET
) { 
1482                 addReply(c
,shared
.wrongtypeerr
); 
1486             src
[i
].subject 
= obj
; 
1487             src
[i
].type 
= obj
->type
; 
1488             src
[i
].encoding 
= obj
->encoding
; 
1490             src
[i
].subject 
= NULL
; 
1493         /* Default all weights to 1. */ 
1494         src
[i
].weight 
= 1.0; 
1497     /* parse optional extra arguments */ 
1499         int remaining 
= c
->argc 
- j
; 
1502             if (remaining 
>= (setnum 
+ 1) && !strcasecmp(c
->argv
[j
]->ptr
,"weights")) { 
1504                 for (i 
= 0; i 
< setnum
; i
++, j
++, remaining
--) { 
1505                     if (getDoubleFromObjectOrReply(c
,c
->argv
[j
],&src
[i
].weight
, 
1506                             "weight value is not a float") != REDIS_OK
) 
1512             } else if (remaining 
>= 2 && !strcasecmp(c
->argv
[j
]->ptr
,"aggregate")) { 
1514                 if (!strcasecmp(c
->argv
[j
]->ptr
,"sum")) { 
1515                     aggregate 
= REDIS_AGGR_SUM
; 
1516                 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"min")) { 
1517                     aggregate 
= REDIS_AGGR_MIN
; 
1518                 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"max")) { 
1519                     aggregate 
= REDIS_AGGR_MAX
; 
1522                     addReply(c
,shared
.syntaxerr
); 
1528                 addReply(c
,shared
.syntaxerr
); 
1534     for (i 
= 0; i 
< setnum
; i
++) 
1535         zuiInitIterator(&src
[i
]); 
1537     /* sort sets from the smallest to largest, this will improve our 
1538      * algorithm's performance */ 
1539     qsort(src
,setnum
,sizeof(zsetopsrc
),zuiCompareByCardinality
); 
1541     dstobj 
= createZsetObject(); 
1542     dstzset 
= dstobj
->ptr
; 
1543     memset(&zval
, 0, sizeof(zval
)); 
1545     if (op 
== REDIS_OP_INTER
) { 
1546         /* Skip everything if the smallest input is empty. */ 
1547         if (zuiLength(&src
[0]) > 0) { 
1548             /* Precondition: as src[0] is non-empty and the inputs are ordered 
1549              * by size, all src[i > 0] are non-empty too. */ 
1550             while (zuiNext(&src
[0],&zval
)) { 
1551                 double score
, value
; 
1553                 score 
= src
[0].weight 
* zval
.score
; 
1554                 if (isnan(score
)) score 
= 0; 
1556                 for (j 
= 1; j 
< setnum
; j
++) { 
1557                     /* It is not safe to access the zset we are 
1558                      * iterating, so explicitly check for equal object. */ 
1559                     if (src
[j
].subject 
== src
[0].subject
) { 
1560                         value 
= zval
.score
*src
[j
].weight
; 
1561                         zunionInterAggregate(&score
,value
,aggregate
); 
1562                     } else if (zuiFind(&src
[j
],&zval
,&value
)) { 
1563                         value 
*= src
[j
].weight
; 
1564                         zunionInterAggregate(&score
,value
,aggregate
); 
1570                 /* Only continue when present in every input. */ 
1572                     tmp 
= zuiObjectFromValue(&zval
); 
1573                     znode 
= zslInsert(dstzset
->zsl
,score
,tmp
); 
1574                     incrRefCount(tmp
); /* added to skiplist */ 
1575                     dictAdd(dstzset
->dict
,tmp
,&znode
->score
); 
1576                     incrRefCount(tmp
); /* added to dictionary */ 
1578                     if (tmp
->encoding 
== REDIS_ENCODING_RAW
) 
1579                         if (sdslen(tmp
->ptr
) > maxelelen
) 
1580                             maxelelen 
= sdslen(tmp
->ptr
); 
1584     } else if (op 
== REDIS_OP_UNION
) { 
1585         for (i 
= 0; i 
< setnum
; i
++) { 
1586             if (zuiLength(&src
[i
]) == 0) 
1589             while (zuiNext(&src
[i
],&zval
)) { 
1590                 double score
, value
; 
1592                 /* Skip key when already processed */ 
1593                 if (dictFind(dstzset
->dict
,zuiObjectFromValue(&zval
)) != NULL
) 
1596                 /* Initialize score */ 
1597                 score 
= src
[i
].weight 
* zval
.score
; 
1598                 if (isnan(score
)) score 
= 0; 
1600                 /* Because the inputs are sorted by size, it's only possible 
1601                  * for sets at larger indices to hold this element. */ 
1602                 for (j 
= (i
+1); j 
< setnum
; j
++) { 
1603                     /* It is not safe to access the zset we are 
1604                      * iterating, so explicitly check for equal object. */ 
1605                     if(src
[j
].subject 
== src
[i
].subject
) { 
1606                         value 
= zval
.score
*src
[j
].weight
; 
1607                         zunionInterAggregate(&score
,value
,aggregate
); 
1608                     } else if (zuiFind(&src
[j
],&zval
,&value
)) { 
1609                         value 
*= src
[j
].weight
; 
1610                         zunionInterAggregate(&score
,value
,aggregate
); 
1614                 tmp 
= zuiObjectFromValue(&zval
); 
1615                 znode 
= zslInsert(dstzset
->zsl
,score
,tmp
); 
1616                 incrRefCount(zval
.ele
); /* added to skiplist */ 
1617                 dictAdd(dstzset
->dict
,tmp
,&znode
->score
); 
1618                 incrRefCount(zval
.ele
); /* added to dictionary */ 
1620                 if (tmp
->encoding 
== REDIS_ENCODING_RAW
) 
1621                     if (sdslen(tmp
->ptr
) > maxelelen
) 
1622                         maxelelen 
= sdslen(tmp
->ptr
); 
1626         redisPanic("Unknown operator"); 
1629     for (i 
= 0; i 
< setnum
; i
++) 
1630         zuiClearIterator(&src
[i
]); 
1632     if (dbDelete(c
->db
,dstkey
)) { 
1633         signalModifiedKey(c
->db
,dstkey
); 
1637     if (dstzset
->zsl
->length
) { 
1638         /* Convert to ziplist when in limits. */ 
1639         if (dstzset
->zsl
->length 
<= server
.zset_max_ziplist_entries 
&& 
1640             maxelelen 
<= server
.zset_max_ziplist_value
) 
1641                 zsetConvert(dstobj
,REDIS_ENCODING_ZIPLIST
); 
1643         dbAdd(c
->db
,dstkey
,dstobj
); 
1644         addReplyLongLong(c
,zsetLength(dstobj
)); 
1645         if (!touched
) signalModifiedKey(c
->db
,dstkey
); 
1648         decrRefCount(dstobj
); 
1649         addReply(c
,shared
.czero
); 
1654 void zunionstoreCommand(redisClient 
*c
) { 
1655     zunionInterGenericCommand(c
,c
->argv
[1], REDIS_OP_UNION
); 
1658 void zinterstoreCommand(redisClient 
*c
) { 
1659     zunionInterGenericCommand(c
,c
->argv
[1], REDIS_OP_INTER
); 
1662 void zrangeGenericCommand(redisClient 
*c
, int reverse
) { 
1663     robj 
*key 
= c
->argv
[1]; 
1671     if ((getLongFromObjectOrReply(c
, c
->argv
[2], &start
, NULL
) != REDIS_OK
) || 
1672         (getLongFromObjectOrReply(c
, c
->argv
[3], &end
, NULL
) != REDIS_OK
)) return; 
1674     if (c
->argc 
== 5 && !strcasecmp(c
->argv
[4]->ptr
,"withscores")) { 
1676     } else if (c
->argc 
>= 5) { 
1677         addReply(c
,shared
.syntaxerr
); 
1681     if ((zobj 
= lookupKeyReadOrReply(c
,key
,shared
.emptymultibulk
)) == NULL
 
1682          || checkType(c
,zobj
,REDIS_ZSET
)) return; 
1684     /* Sanitize indexes. */ 
1685     llen 
= zsetLength(zobj
); 
1686     if (start 
< 0) start 
= llen
+start
; 
1687     if (end 
< 0) end 
= llen
+end
; 
1688     if (start 
< 0) start 
= 0; 
1690     /* Invariant: start >= 0, so this test will be true when end < 0. 
1691      * The range is empty when start > end or start >= length. */ 
1692     if (start 
> end 
|| start 
>= llen
) { 
1693         addReply(c
,shared
.emptymultibulk
); 
1696     if (end 
>= llen
) end 
= llen
-1; 
1697     rangelen 
= (end
-start
)+1; 
1699     /* Return the result in form of a multi-bulk reply */ 
1700     addReplyMultiBulkLen(c
, withscores 
? (rangelen
*2) : rangelen
); 
1702     if (zobj
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
1703         unsigned char *zl 
= zobj
->ptr
; 
1704         unsigned char *eptr
, *sptr
; 
1705         unsigned char *vstr
; 
1710             eptr 
= ziplistIndex(zl
,-2-(2*start
)); 
1712             eptr 
= ziplistIndex(zl
,2*start
); 
1714         redisAssertWithInfo(c
,zobj
,eptr 
!= NULL
); 
1715         sptr 
= ziplistNext(zl
,eptr
); 
1717         while (rangelen
--) { 
1718             redisAssertWithInfo(c
,zobj
,eptr 
!= NULL 
&& sptr 
!= NULL
); 
1719             redisAssertWithInfo(c
,zobj
,ziplistGet(eptr
,&vstr
,&vlen
,&vlong
)); 
1721                 addReplyBulkLongLong(c
,vlong
); 
1723                 addReplyBulkCBuffer(c
,vstr
,vlen
); 
1726                 addReplyDouble(c
,zzlGetScore(sptr
)); 
1729                 zzlPrev(zl
,&eptr
,&sptr
); 
1731                 zzlNext(zl
,&eptr
,&sptr
); 
1734     } else if (zobj
->encoding 
== REDIS_ENCODING_SKIPLIST
) { 
1735         zset 
*zs 
= zobj
->ptr
; 
1736         zskiplist 
*zsl 
= zs
->zsl
; 
1740         /* Check if starting point is trivial, before doing log(N) lookup. */ 
1744                 ln 
= zslGetElementByRank(zsl
,llen
-start
); 
1746             ln 
= zsl
->header
->level
[0].forward
; 
1748                 ln 
= zslGetElementByRank(zsl
,start
+1); 
1752             redisAssertWithInfo(c
,zobj
,ln 
!= NULL
); 
1754             addReplyBulk(c
,ele
); 
1756                 addReplyDouble(c
,ln
->score
); 
1757             ln 
= reverse 
? ln
->backward 
: ln
->level
[0].forward
; 
1760         redisPanic("Unknown sorted set encoding"); 
1764 void zrangeCommand(redisClient 
*c
) { 
1765     zrangeGenericCommand(c
,0); 
1768 void zrevrangeCommand(redisClient 
*c
) { 
1769     zrangeGenericCommand(c
,1); 
1772 /* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE. */ 
1773 void genericZrangebyscoreCommand(redisClient 
*c
, int reverse
) { 
1775     robj 
*key 
= c
->argv
[1]; 
1777     long offset 
= 0, limit 
= -1; 
1779     unsigned long rangelen 
= 0; 
1780     void *replylen 
= NULL
; 
1783     /* Parse the range arguments. */ 
1785         /* Range is given as [max,min] */ 
1786         maxidx 
= 2; minidx 
= 3; 
1788         /* Range is given as [min,max] */ 
1789         minidx 
= 2; maxidx 
= 3; 
1792     if (zslParseRange(c
->argv
[minidx
],c
->argv
[maxidx
],&range
) != REDIS_OK
) { 
1793         addReplyError(c
,"min or max is not a float"); 
1797     /* Parse optional extra arguments. Note that ZCOUNT will exactly have 
1798      * 4 arguments, so we'll never enter the following code path. */ 
1800         int remaining 
= c
->argc 
- 4; 
1804             if (remaining 
>= 1 && !strcasecmp(c
->argv
[pos
]->ptr
,"withscores")) { 
1807             } else if (remaining 
>= 3 && !strcasecmp(c
->argv
[pos
]->ptr
,"limit")) { 
1808                 if ((getLongFromObjectOrReply(c
, c
->argv
[pos
+1], &offset
, NULL
) != REDIS_OK
) || 
1809                     (getLongFromObjectOrReply(c
, c
->argv
[pos
+2], &limit
, NULL
) != REDIS_OK
)) return; 
1810                 pos 
+= 3; remaining 
-= 3; 
1812                 addReply(c
,shared
.syntaxerr
); 
1818     /* Ok, lookup the key and get the range */ 
1819     if ((zobj 
= lookupKeyReadOrReply(c
,key
,shared
.emptymultibulk
)) == NULL 
|| 
1820         checkType(c
,zobj
,REDIS_ZSET
)) return; 
1822     if (zobj
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
1823         unsigned char *zl 
= zobj
->ptr
; 
1824         unsigned char *eptr
, *sptr
; 
1825         unsigned char *vstr
; 
1830         /* If reversed, get the last node in range as starting point. */ 
1832             eptr 
= zzlLastInRange(zl
,range
); 
1834             eptr 
= zzlFirstInRange(zl
,range
); 
1837         /* No "first" element in the specified interval. */ 
1839             addReply(c
, shared
.emptymultibulk
); 
1843         /* Get score pointer for the first element. */ 
1844         redisAssertWithInfo(c
,zobj
,eptr 
!= NULL
); 
1845         sptr 
= ziplistNext(zl
,eptr
); 
1847         /* We don't know in advance how many matching elements there are in the 
1848          * list, so we push this object that will represent the multi-bulk 
1849          * length in the output buffer, and will "fix" it later */ 
1850         replylen 
= addDeferredMultiBulkLength(c
); 
1852         /* If there is an offset, just traverse the number of elements without 
1853          * checking the score because that is done in the next loop. */ 
1854         while (eptr 
&& offset
--) { 
1856                 zzlPrev(zl
,&eptr
,&sptr
); 
1858                 zzlNext(zl
,&eptr
,&sptr
); 
1862         while (eptr 
&& limit
--) { 
1863             score 
= zzlGetScore(sptr
); 
1865             /* Abort when the node is no longer in range. */ 
1867                 if (!zslValueGteMin(score
,&range
)) break; 
1869                 if (!zslValueLteMax(score
,&range
)) break; 
1872             /* We know the element exists, so ziplistGet should always succeed */ 
1873             redisAssertWithInfo(c
,zobj
,ziplistGet(eptr
,&vstr
,&vlen
,&vlong
)); 
1877                 addReplyBulkLongLong(c
,vlong
); 
1879                 addReplyBulkCBuffer(c
,vstr
,vlen
); 
1883                 addReplyDouble(c
,score
); 
1886             /* Move to next node */ 
1888                 zzlPrev(zl
,&eptr
,&sptr
); 
1890                 zzlNext(zl
,&eptr
,&sptr
); 
1893     } else if (zobj
->encoding 
== REDIS_ENCODING_SKIPLIST
) { 
1894         zset 
*zs 
= zobj
->ptr
; 
1895         zskiplist 
*zsl 
= zs
->zsl
; 
1898         /* If reversed, get the last node in range as starting point. */ 
1900             ln 
= zslLastInRange(zsl
,range
); 
1902             ln 
= zslFirstInRange(zsl
,range
); 
1905         /* No "first" element in the specified interval. */ 
1907             addReply(c
, shared
.emptymultibulk
); 
1911         /* We don't know in advance how many matching elements there are in the 
1912          * list, so we push this object that will represent the multi-bulk 
1913          * length in the output buffer, and will "fix" it later */ 
1914         replylen 
= addDeferredMultiBulkLength(c
); 
1916         /* If there is an offset, just traverse the number of elements without 
1917          * checking the score because that is done in the next loop. */ 
1918         while (ln 
&& offset
--) { 
1922                 ln 
= ln
->level
[0].forward
; 
1926         while (ln 
&& limit
--) { 
1927             /* Abort when the node is no longer in range. */ 
1929                 if (!zslValueGteMin(ln
->score
,&range
)) break; 
1931                 if (!zslValueLteMax(ln
->score
,&range
)) break; 
1935             addReplyBulk(c
,ln
->obj
); 
1938                 addReplyDouble(c
,ln
->score
); 
1941             /* Move to next node */ 
1945                 ln 
= ln
->level
[0].forward
; 
1949         redisPanic("Unknown sorted set encoding"); 
1956     setDeferredMultiBulkLength(c
, replylen
, rangelen
); 
1959 void zrangebyscoreCommand(redisClient 
*c
) { 
1960     genericZrangebyscoreCommand(c
,0); 
1963 void zrevrangebyscoreCommand(redisClient 
*c
) { 
1964     genericZrangebyscoreCommand(c
,1); 
1967 void zcountCommand(redisClient 
*c
) { 
1968     robj 
*key 
= c
->argv
[1]; 
1973     /* Parse the range arguments */ 
1974     if (zslParseRange(c
->argv
[2],c
->argv
[3],&range
) != REDIS_OK
) { 
1975         addReplyError(c
,"min or max is not a float"); 
1979     /* Lookup the sorted set */ 
1980     if ((zobj 
= lookupKeyReadOrReply(c
, key
, shared
.czero
)) == NULL 
|| 
1981         checkType(c
, zobj
, REDIS_ZSET
)) return; 
1983     if (zobj
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
1984         unsigned char *zl 
= zobj
->ptr
; 
1985         unsigned char *eptr
, *sptr
; 
1988         /* Use the first element in range as the starting point */ 
1989         eptr 
= zzlFirstInRange(zl
,range
); 
1991         /* No "first" element */ 
1993             addReply(c
, shared
.czero
); 
1997         /* First element is in range */ 
1998         sptr 
= ziplistNext(zl
,eptr
); 
1999         score 
= zzlGetScore(sptr
); 
2000         redisAssertWithInfo(c
,zobj
,zslValueLteMax(score
,&range
)); 
2002         /* Iterate over elements in range */ 
2004             score 
= zzlGetScore(sptr
); 
2006             /* Abort when the node is no longer in range. */ 
2007             if (!zslValueLteMax(score
,&range
)) { 
2011                 zzlNext(zl
,&eptr
,&sptr
); 
2014     } else if (zobj
->encoding 
== REDIS_ENCODING_SKIPLIST
) { 
2015         zset 
*zs 
= zobj
->ptr
; 
2016         zskiplist 
*zsl 
= zs
->zsl
; 
2020         /* Find first element in range */ 
2021         zn 
= zslFirstInRange(zsl
, range
); 
2023         /* Use rank of first element, if any, to determine preliminary count */ 
2025             rank 
= zslGetRank(zsl
, zn
->score
, zn
->obj
); 
2026             count 
= (zsl
->length 
- (rank 
- 1)); 
2028             /* Find last element in range */ 
2029             zn 
= zslLastInRange(zsl
, range
); 
2031             /* Use rank of last element, if any, to determine the actual count */ 
2033                 rank 
= zslGetRank(zsl
, zn
->score
, zn
->obj
); 
2034                 count 
-= (zsl
->length 
- rank
); 
2038         redisPanic("Unknown sorted set encoding"); 
2041     addReplyLongLong(c
, count
); 
2044 void zcardCommand(redisClient 
*c
) { 
2045     robj 
*key 
= c
->argv
[1]; 
2048     if ((zobj 
= lookupKeyReadOrReply(c
,key
,shared
.czero
)) == NULL 
|| 
2049         checkType(c
,zobj
,REDIS_ZSET
)) return; 
2051     addReplyLongLong(c
,zsetLength(zobj
)); 
2054 void zscoreCommand(redisClient 
*c
) { 
2055     robj 
*key 
= c
->argv
[1]; 
2059     if ((zobj 
= lookupKeyReadOrReply(c
,key
,shared
.nullbulk
)) == NULL 
|| 
2060         checkType(c
,zobj
,REDIS_ZSET
)) return; 
2062     if (zobj
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
2063         if (zzlFind(zobj
->ptr
,c
->argv
[2],&score
) != NULL
) 
2064             addReplyDouble(c
,score
); 
2066             addReply(c
,shared
.nullbulk
); 
2067     } else if (zobj
->encoding 
== REDIS_ENCODING_SKIPLIST
) { 
2068         zset 
*zs 
= zobj
->ptr
; 
2071         c
->argv
[2] = tryObjectEncoding(c
->argv
[2]); 
2072         de 
= dictFind(zs
->dict
,c
->argv
[2]); 
2074             score 
= *(double*)dictGetVal(de
); 
2075             addReplyDouble(c
,score
); 
2077             addReply(c
,shared
.nullbulk
); 
2080         redisPanic("Unknown sorted set encoding"); 
2084 void zrankGenericCommand(redisClient 
*c
, int reverse
) { 
2085     robj 
*key 
= c
->argv
[1]; 
2086     robj 
*ele 
= c
->argv
[2]; 
2091     if ((zobj 
= lookupKeyReadOrReply(c
,key
,shared
.nullbulk
)) == NULL 
|| 
2092         checkType(c
,zobj
,REDIS_ZSET
)) return; 
2093     llen 
= zsetLength(zobj
); 
2095     redisAssertWithInfo(c
,ele
,ele
->encoding 
== REDIS_ENCODING_RAW
); 
2096     if (zobj
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
2097         unsigned char *zl 
= zobj
->ptr
; 
2098         unsigned char *eptr
, *sptr
; 
2100         eptr 
= ziplistIndex(zl
,0); 
2101         redisAssertWithInfo(c
,zobj
,eptr 
!= NULL
); 
2102         sptr 
= ziplistNext(zl
,eptr
); 
2103         redisAssertWithInfo(c
,zobj
,sptr 
!= NULL
); 
2106         while(eptr 
!= NULL
) { 
2107             if (ziplistCompare(eptr
,ele
->ptr
,sdslen(ele
->ptr
))) 
2110             zzlNext(zl
,&eptr
,&sptr
); 
2115                 addReplyLongLong(c
,llen
-rank
); 
2117                 addReplyLongLong(c
,rank
-1); 
2119             addReply(c
,shared
.nullbulk
); 
2121     } else if (zobj
->encoding 
== REDIS_ENCODING_SKIPLIST
) { 
2122         zset 
*zs 
= zobj
->ptr
; 
2123         zskiplist 
*zsl 
= zs
->zsl
; 
2127         ele 
= c
->argv
[2] = tryObjectEncoding(c
->argv
[2]); 
2128         de 
= dictFind(zs
->dict
,ele
); 
2130             score 
= *(double*)dictGetVal(de
); 
2131             rank 
= zslGetRank(zsl
,score
,ele
); 
2132             redisAssertWithInfo(c
,ele
,rank
); /* Existing elements always have a rank. */ 
2134                 addReplyLongLong(c
,llen
-rank
); 
2136                 addReplyLongLong(c
,rank
-1); 
2138             addReply(c
,shared
.nullbulk
); 
2141         redisPanic("Unknown sorted set encoding"); 
2145 void zrankCommand(redisClient 
*c
) { 
2146     zrankGenericCommand(c
, 0); 
2149 void zrevrankCommand(redisClient 
*c
) { 
2150     zrankGenericCommand(c
, 1);