]>
git.saurik.com Git - redis.git/blob - src/t_zset.c
1c55cb97a575e3a7ae3819cb64dcb29533767ad4
   5 /*----------------------------------------------------------------------------- 
   7  *----------------------------------------------------------------------------*/ 
   9 /* ZSETs are ordered sets using two data structures to hold the same elements 
  10  * in order to get O(log(N)) INSERT and REMOVE operations into a sorted 
  13  * The elements are added to a hash table mapping Redis objects to scores. 
  14  * At the same time the elements are added to a skip list mapping scores 
  15  * to Redis objects (so objects are sorted by scores in this "view"). */ 
  17 /* This skiplist implementation is almost a C translation of the original 
  18  * algorithm described by William Pugh in "Skip Lists: A Probabilistic 
  19  * Alternative to Balanced Trees", modified in three ways: 
  20  * a) this implementation allows for repeated values. 
  21  * b) the comparison is not just by key (our 'score') but by satellite data. 
  22  * c) there is a back pointer, so it's a doubly linked list with the back 
  23  * pointers being only at "level 1". This allows traversing the list 
  24  * from tail to head, useful for ZREVRANGE. */ 
  26 zskiplistNode 
*zslCreateNode(int level
, double score
, robj 
*obj
) { 
  27     zskiplistNode 
*zn 
= zmalloc(sizeof(*zn
)+level
*sizeof(struct zskiplistLevel
)); 
  33 zskiplist 
*zslCreate(void) { 
  37     zsl 
= zmalloc(sizeof(*zsl
)); 
  40     zsl
->header 
= zslCreateNode(ZSKIPLIST_MAXLEVEL
,0,NULL
); 
  41     for (j 
= 0; j 
< ZSKIPLIST_MAXLEVEL
; j
++) { 
  42         zsl
->header
->level
[j
].forward 
= NULL
; 
  43         zsl
->header
->level
[j
].span 
= 0; 
  45     zsl
->header
->backward 
= NULL
; 
  50 void zslFreeNode(zskiplistNode 
*node
) { 
  51     decrRefCount(node
->obj
); 
  55 void zslFree(zskiplist 
*zsl
) { 
  56     zskiplistNode 
*node 
= zsl
->header
->level
[0].forward
, *next
; 
  60         next 
= node
->level
[0].forward
; 
  67 int zslRandomLevel(void) { 
  69     while ((random()&0xFFFF) < (ZSKIPLIST_P 
* 0xFFFF)) 
  71     return (level
<ZSKIPLIST_MAXLEVEL
) ? level 
: ZSKIPLIST_MAXLEVEL
; 
  74 zskiplistNode 
*zslInsert(zskiplist 
*zsl
, double score
, robj 
*obj
) { 
  75     zskiplistNode 
*update
[ZSKIPLIST_MAXLEVEL
], *x
; 
  76     unsigned int rank
[ZSKIPLIST_MAXLEVEL
]; 
  80     for (i 
= zsl
->level
-1; i 
>= 0; i
--) { 
  81         /* store rank that is crossed to reach the insert position */ 
  82         rank
[i
] = i 
== (zsl
->level
-1) ? 0 : rank
[i
+1]; 
  83         while (x
->level
[i
].forward 
&& 
  84             (x
->level
[i
].forward
->score 
< score 
|| 
  85                 (x
->level
[i
].forward
->score 
== score 
&& 
  86                 compareStringObjects(x
->level
[i
].forward
->obj
,obj
) < 0))) { 
  87             rank
[i
] += x
->level
[i
].span
; 
  88             x 
= x
->level
[i
].forward
; 
  92     /* we assume the key is not already inside, since we allow duplicated 
  93      * scores, and the re-insertion of score and redis object should never 
  94      * happen since the caller of zslInsert() should test in the hash table 
  95      * if the element is already inside or not. */ 
  96     level 
= zslRandomLevel(); 
  97     if (level 
> zsl
->level
) { 
  98         for (i 
= zsl
->level
; i 
< level
; i
++) { 
 100             update
[i
] = zsl
->header
; 
 101             update
[i
]->level
[i
].span 
= zsl
->length
; 
 105     x 
= zslCreateNode(level
,score
,obj
); 
 106     for (i 
= 0; i 
< level
; i
++) { 
 107         x
->level
[i
].forward 
= update
[i
]->level
[i
].forward
; 
 108         update
[i
]->level
[i
].forward 
= x
; 
 110         /* update span covered by update[i] as x is inserted here */ 
 111         x
->level
[i
].span 
= update
[i
]->level
[i
].span 
- (rank
[0] - rank
[i
]); 
 112         update
[i
]->level
[i
].span 
= (rank
[0] - rank
[i
]) + 1; 
 115     /* increment span for untouched levels */ 
 116     for (i 
= level
; i 
< zsl
->level
; i
++) { 
 117         update
[i
]->level
[i
].span
++; 
 120     x
->backward 
= (update
[0] == zsl
->header
) ? NULL 
: update
[0]; 
 121     if (x
->level
[0].forward
) 
 122         x
->level
[0].forward
->backward 
= x
; 
 129 /* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */ 
 130 void zslDeleteNode(zskiplist 
*zsl
, zskiplistNode 
*x
, zskiplistNode 
**update
) { 
 132     for (i 
= 0; i 
< zsl
->level
; i
++) { 
 133         if (update
[i
]->level
[i
].forward 
== x
) { 
 134             update
[i
]->level
[i
].span 
+= x
->level
[i
].span 
- 1; 
 135             update
[i
]->level
[i
].forward 
= x
->level
[i
].forward
; 
 137             update
[i
]->level
[i
].span 
-= 1; 
 140     if (x
->level
[0].forward
) { 
 141         x
->level
[0].forward
->backward 
= x
->backward
; 
 143         zsl
->tail 
= x
->backward
; 
 145     while(zsl
->level 
> 1 && zsl
->header
->level
[zsl
->level
-1].forward 
== NULL
) 
 150 /* Delete an element with matching score/object from the skiplist. */ 
 151 int zslDelete(zskiplist 
*zsl
, double score
, robj 
*obj
) { 
 152     zskiplistNode 
*update
[ZSKIPLIST_MAXLEVEL
], *x
; 
 156     for (i 
= zsl
->level
-1; i 
>= 0; i
--) { 
 157         while (x
->level
[i
].forward 
&& 
 158             (x
->level
[i
].forward
->score 
< score 
|| 
 159                 (x
->level
[i
].forward
->score 
== score 
&& 
 160                 compareStringObjects(x
->level
[i
].forward
->obj
,obj
) < 0))) 
 161             x 
= x
->level
[i
].forward
; 
 164     /* We may have multiple elements with the same score, what we need 
 165      * is to find the element with both the right score and object. */ 
 166     x 
= x
->level
[0].forward
; 
 167     if (x 
&& score 
== x
->score 
&& equalStringObjects(x
->obj
,obj
)) { 
 168         zslDeleteNode(zsl
, x
, update
); 
 172         return 0; /* not found */ 
 174     return 0; /* not found */ 
 177 static int zslValueGteMin(double value
, zrangespec 
*spec
) { 
 178     return spec
->minex 
? (value 
> spec
->min
) : (value 
>= spec
->min
); 
 181 static int zslValueLteMax(double value
, zrangespec 
*spec
) { 
 182     return spec
->maxex 
? (value 
< spec
->max
) : (value 
<= spec
->max
); 
 185 /* Returns whether any part of the zset is in range. */ 
 186 int zslIsInRange(zskiplist 
*zsl
, zrangespec 
*range
) { 
 189     /* Test for ranges that will always be empty. */ 
 190     if (range
->min 
> range
->max 
|| 
 191             (range
->min 
== range
->max 
&& (range
->minex 
|| range
->maxex
))) 
 194     if (x 
== NULL 
|| !zslValueGteMin(x
->score
,range
)) 
 196     x 
= zsl
->header
->level
[0].forward
; 
 197     if (x 
== NULL 
|| !zslValueLteMax(x
->score
,range
)) 
 202 /* Find the first node that is contained in the specified range. 
 203  * Returns NULL when no element is contained in the range. */ 
 204 zskiplistNode 
*zslFirstInRange(zskiplist 
*zsl
, zrangespec range
) { 
 208     /* If everything is out of range, return early. */ 
 209     if (!zslIsInRange(zsl
,&range
)) return NULL
; 
 212     for (i 
= zsl
->level
-1; i 
>= 0; i
--) { 
 213         /* Go forward while *OUT* of range. */ 
 214         while (x
->level
[i
].forward 
&& 
 215             !zslValueGteMin(x
->level
[i
].forward
->score
,&range
)) 
 216                 x 
= x
->level
[i
].forward
; 
 219     /* This is an inner range, so the next node cannot be NULL. */ 
 220     x 
= x
->level
[0].forward
; 
 221     redisAssert(x 
!= NULL
); 
 223     /* Check if score <= max. */ 
 224     if (!zslValueLteMax(x
->score
,&range
)) return NULL
; 
 228 /* Find the last node that is contained in the specified range. 
 229  * Returns NULL when no element is contained in the range. */ 
 230 zskiplistNode 
*zslLastInRange(zskiplist 
*zsl
, zrangespec range
) { 
 234     /* If everything is out of range, return early. */ 
 235     if (!zslIsInRange(zsl
,&range
)) return NULL
; 
 238     for (i 
= zsl
->level
-1; i 
>= 0; i
--) { 
 239         /* Go forward while *IN* range. */ 
 240         while (x
->level
[i
].forward 
&& 
 241             zslValueLteMax(x
->level
[i
].forward
->score
,&range
)) 
 242                 x 
= x
->level
[i
].forward
; 
 245     /* This is an inner range, so this node cannot be NULL. */ 
 246     redisAssert(x 
!= NULL
); 
 248     /* Check if score >= min. */ 
 249     if (!zslValueGteMin(x
->score
,&range
)) return NULL
; 
 253 /* Delete all the elements with score between min and max from the skiplist. 
 254  * Min and max are inclusive unless range.minex or range.maxex is set, so by default a score >= min && score <= max is deleted. 
 255  * Note that this function takes the reference to the hash table view of the 
 256  * sorted set, in order to remove the elements from the hash table too. */ 
 257 unsigned long zslDeleteRangeByScore(zskiplist 
*zsl
, zrangespec range
, dict 
*dict
) { 
 258     zskiplistNode 
*update
[ZSKIPLIST_MAXLEVEL
], *x
; 
 259     unsigned long removed 
= 0; 
 263     for (i 
= zsl
->level
-1; i 
>= 0; i
--) { 
 264         while (x
->level
[i
].forward 
&& (range
.minex 
? 
 265             x
->level
[i
].forward
->score 
<= range
.min 
: 
 266             x
->level
[i
].forward
->score 
< range
.min
)) 
 267                 x 
= x
->level
[i
].forward
; 
 271     /* Current node is the last with score < or <= min. */ 
 272     x 
= x
->level
[0].forward
; 
 274     /* Delete nodes while in range. */ 
 275     while (x 
&& (range
.maxex 
? x
->score 
< range
.max 
: x
->score 
<= range
.max
)) { 
 276         zskiplistNode 
*next 
= x
->level
[0].forward
; 
 277         zslDeleteNode(zsl
,x
,update
); 
 278         dictDelete(dict
,x
->obj
); 
 286 /* Delete all the elements with rank between start and end from the skiplist. 
 287  * Start and end are inclusive. Note that start and end need to be 1-based */ 
 288 unsigned long zslDeleteRangeByRank(zskiplist 
*zsl
, unsigned int start
, unsigned int end
, dict 
*dict
) { 
 289     zskiplistNode 
*update
[ZSKIPLIST_MAXLEVEL
], *x
; 
 290     unsigned long traversed 
= 0, removed 
= 0; 
 294     for (i 
= zsl
->level
-1; i 
>= 0; i
--) { 
 295         while (x
->level
[i
].forward 
&& (traversed 
+ x
->level
[i
].span
) < start
) { 
 296             traversed 
+= x
->level
[i
].span
; 
 297             x 
= x
->level
[i
].forward
; 
 303     x 
= x
->level
[0].forward
; 
 304     while (x 
&& traversed 
<= end
) { 
 305         zskiplistNode 
*next 
= x
->level
[0].forward
; 
 306         zslDeleteNode(zsl
,x
,update
); 
 307         dictDelete(dict
,x
->obj
); 
 316 /* Find the rank for an element by both score and key. 
 317  * Returns 0 when the element cannot be found, rank otherwise. 
 318  * Note that the rank is 1-based due to the span of zsl->header to the 
 320 unsigned long zslGetRank(zskiplist 
*zsl
, double score
, robj 
*o
) { 
 322     unsigned long rank 
= 0; 
 326     for (i 
= zsl
->level
-1; i 
>= 0; i
--) { 
 327         while (x
->level
[i
].forward 
&& 
 328             (x
->level
[i
].forward
->score 
< score 
|| 
 329                 (x
->level
[i
].forward
->score 
== score 
&& 
 330                 compareStringObjects(x
->level
[i
].forward
->obj
,o
) <= 0))) { 
 331             rank 
+= x
->level
[i
].span
; 
 332             x 
= x
->level
[i
].forward
; 
 335         /* x might be equal to zsl->header, so test if obj is non-NULL */ 
 336         if (x
->obj 
&& equalStringObjects(x
->obj
,o
)) { 
 343 /* Finds an element by its rank. The rank argument needs to be 1-based. */ 
 344 zskiplistNode
* zslGetElementByRank(zskiplist 
*zsl
, unsigned long rank
) { 
 346     unsigned long traversed 
= 0; 
 350     for (i 
= zsl
->level
-1; i 
>= 0; i
--) { 
 351         while (x
->level
[i
].forward 
&& (traversed 
+ x
->level
[i
].span
) <= rank
) 
 353             traversed 
+= x
->level
[i
].span
; 
 354             x 
= x
->level
[i
].forward
; 
 356         if (traversed 
== rank
) { 
 363 /* Populate the rangespec according to the objects min and max. */ 
 364 static int zslParseRange(robj 
*min
, robj 
*max
, zrangespec 
*spec
) { 
 366     spec
->minex 
= spec
->maxex 
= 0; 
 368     /* Parse the min-max interval. If one of the values is prefixed 
 369      * by the "(" character, it's considered "open". For instance 
 370      * ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max 
 371      * ZRANGEBYSCORE zset 1.5 2.5 will instead match min <= x <= max */ 
 372     if (min
->encoding 
== REDIS_ENCODING_INT
) { 
 373         spec
->min 
= (long)min
->ptr
; 
 375         if (((char*)min
->ptr
)[0] == '(') { 
 376             spec
->min 
= strtod((char*)min
->ptr
+1,&eptr
); 
 377             if (eptr
[0] != '\0' || isnan(spec
->min
)) return REDIS_ERR
; 
 380             spec
->min 
= strtod((char*)min
->ptr
,&eptr
); 
 381             if (eptr
[0] != '\0' || isnan(spec
->min
)) return REDIS_ERR
; 
 384     if (max
->encoding 
== REDIS_ENCODING_INT
) { 
 385         spec
->max 
= (long)max
->ptr
; 
 387         if (((char*)max
->ptr
)[0] == '(') { 
 388             spec
->max 
= strtod((char*)max
->ptr
+1,&eptr
); 
 389             if (eptr
[0] != '\0' || isnan(spec
->max
)) return REDIS_ERR
; 
 392             spec
->max 
= strtod((char*)max
->ptr
,&eptr
); 
 393             if (eptr
[0] != '\0' || isnan(spec
->max
)) return REDIS_ERR
; 
 400 /*----------------------------------------------------------------------------- 
 401  * Ziplist-backed sorted set API 
 402  *----------------------------------------------------------------------------*/ 
 404 double zzlGetScore(unsigned char *sptr
) { 
 411     redisAssert(sptr 
!= NULL
); 
 412     redisAssert(ziplistGet(sptr
,&vstr
,&vlen
,&vlong
)); 
 415         memcpy(buf
,vstr
,vlen
); 
 417         score 
= strtod(buf
,NULL
); 
 425 /* Compare element in sorted set with given element. */ 
 426 int zzlCompareElements(unsigned char *eptr
, unsigned char *cstr
, unsigned int clen
) { 
 430     unsigned char vbuf
[32]; 
 433     redisAssert(ziplistGet(eptr
,&vstr
,&vlen
,&vlong
)); 
 435         /* Store string representation of long long in vbuf. */ 
 436         vlen 
= ll2string((char*)vbuf
,sizeof(vbuf
),vlong
); 
 440     minlen 
= (vlen 
< clen
) ? vlen 
: clen
; 
 441     cmp 
= memcmp(vstr
,cstr
,minlen
); 
 442     if (cmp 
== 0) return vlen
-clen
; 
 446 unsigned int zzlLength(unsigned char *zl
) { 
 447     return ziplistLen(zl
)/2; 
 450 /* Move to next entry based on the values in eptr and sptr. Both are set to 
 451  * NULL when there is no next entry. */ 
 452 void zzlNext(unsigned char *zl
, unsigned char **eptr
, unsigned char **sptr
) { 
 453     unsigned char *_eptr
, *_sptr
; 
 454     redisAssert(*eptr 
!= NULL 
&& *sptr 
!= NULL
); 
 456     _eptr 
= ziplistNext(zl
,*sptr
); 
 458         _sptr 
= ziplistNext(zl
,_eptr
); 
 459         redisAssert(_sptr 
!= NULL
); 
 469 /* Move to the previous entry based on the values in eptr and sptr. Both are 
 470  * set to NULL when there is no previous entry. */ 
 471 void zzlPrev(unsigned char *zl
, unsigned char **eptr
, unsigned char **sptr
) { 
 472     unsigned char *_eptr
, *_sptr
; 
 473     redisAssert(*eptr 
!= NULL 
&& *sptr 
!= NULL
); 
 475     _sptr 
= ziplistPrev(zl
,*eptr
); 
 477         _eptr 
= ziplistPrev(zl
,_sptr
); 
 478         redisAssert(_eptr 
!= NULL
); 
 480         /* No previous entry. */ 
 488 /* Returns whether any part of the zset is in range. Should only be used 
 489  * internally by zzlFirstInRange and zzlLastInRange. */ 
 490 int zzlIsInRange(unsigned char *zl
, zrangespec 
*range
) { 
 494     /* Test for ranges that will always be empty. */ 
 495     if (range
->min 
> range
->max 
|| 
 496             (range
->min 
== range
->max 
&& (range
->minex 
|| range
->maxex
))) 
 499     p 
= ziplistIndex(zl
,-1); /* Last score. */ 
 500     redisAssert(p 
!= NULL
); 
 501     score 
= zzlGetScore(p
); 
 502     if (!zslValueGteMin(score
,range
)) 
 505     p 
= ziplistIndex(zl
,1); /* First score. */ 
 506     redisAssert(p 
!= NULL
); 
 507     score 
= zzlGetScore(p
); 
 508     if (!zslValueLteMax(score
,range
)) 
 514 /* Find pointer to the first element contained in the specified range. 
 515  * Returns NULL when no element is contained in the range. */ 
 516 unsigned char *zzlFirstInRange(unsigned char *zl
, zrangespec range
) { 
 517     unsigned char *eptr 
= ziplistIndex(zl
,0), *sptr
; 
 520     /* If everything is out of range, return early. */ 
 521     if (!zzlIsInRange(zl
,&range
)) return NULL
; 
 523     while (eptr 
!= NULL
) { 
 524         sptr 
= ziplistNext(zl
,eptr
); 
 525         redisAssert(sptr 
!= NULL
); 
 527         score 
= zzlGetScore(sptr
); 
 528         if (zslValueGteMin(score
,&range
)) { 
 529             /* Check if score <= max. */ 
 530             if (zslValueLteMax(score
,&range
)) 
 535         /* Move to next element. */ 
 536         eptr 
= ziplistNext(zl
,sptr
); 
 542 /* Find pointer to the last element contained in the specified range. 
 543  * Returns NULL when no element is contained in the range. */ 
 544 unsigned char *zzlLastInRange(unsigned char *zl
, zrangespec range
) { 
 545     unsigned char *eptr 
= ziplistIndex(zl
,-2), *sptr
; 
 548     /* If everything is out of range, return early. */ 
 549     if (!zzlIsInRange(zl
,&range
)) return NULL
; 
 551     while (eptr 
!= NULL
) { 
 552         sptr 
= ziplistNext(zl
,eptr
); 
 553         redisAssert(sptr 
!= NULL
); 
 555         score 
= zzlGetScore(sptr
); 
 556         if (zslValueLteMax(score
,&range
)) { 
 557             /* Check if score >= min. */ 
 558             if (zslValueGteMin(score
,&range
)) 
 563         /* Move to previous element by moving to the score of previous element. 
 564          * When this returns NULL, we know there also is no element. */ 
 565         sptr 
= ziplistPrev(zl
,eptr
); 
 567             redisAssert((eptr 
= ziplistPrev(zl
,sptr
)) != NULL
); 
 575 unsigned char *zzlFind(unsigned char *zl
, robj 
*ele
, double *score
) { 
 576     unsigned char *eptr 
= ziplistIndex(zl
,0), *sptr
; 
 578     ele 
= getDecodedObject(ele
); 
 579     while (eptr 
!= NULL
) { 
 580         sptr 
= ziplistNext(zl
,eptr
); 
 581         redisAssert(sptr 
!= NULL
); 
 583         if (ziplistCompare(eptr
,ele
->ptr
,sdslen(ele
->ptr
))) { 
 584             /* Matching element, pull out score. */ 
 585             if (score 
!= NULL
) *score 
= zzlGetScore(sptr
); 
 590         /* Move to next element. */ 
 591         eptr 
= ziplistNext(zl
,sptr
); 
 598 /* Delete (element,score) pair from ziplist. Use local copy of eptr because we 
 599  * don't want to modify the one given as argument. */ 
 600 unsigned char *zzlDelete(unsigned char *zl
, unsigned char *eptr
) { 
 601     unsigned char *p 
= eptr
; 
 603     /* TODO: add function to ziplist API to delete N elements from offset. */ 
 604     zl 
= ziplistDelete(zl
,&p
); 
 605     zl 
= ziplistDelete(zl
,&p
); 
 609 unsigned char *zzlInsertAt(unsigned char *zl
, unsigned char *eptr
, robj 
*ele
, double score
) { 
 615     redisAssert(ele
->encoding 
== REDIS_ENCODING_RAW
); 
 616     scorelen 
= d2string(scorebuf
,sizeof(scorebuf
),score
); 
 618         zl 
= ziplistPush(zl
,ele
->ptr
,sdslen(ele
->ptr
),ZIPLIST_TAIL
); 
 619         zl 
= ziplistPush(zl
,(unsigned char*)scorebuf
,scorelen
,ZIPLIST_TAIL
); 
 621         /* Keep offset relative to zl, as it might be re-allocated. */ 
 623         zl 
= ziplistInsert(zl
,eptr
,ele
->ptr
,sdslen(ele
->ptr
)); 
 626         /* Insert score after the element. */ 
 627         redisAssert((sptr 
= ziplistNext(zl
,eptr
)) != NULL
); 
 628         zl 
= ziplistInsert(zl
,sptr
,(unsigned char*)scorebuf
,scorelen
); 
 634 /* Insert (element,score) pair in ziplist. This function assumes the element is 
 635  * not yet present in the list. */ 
 636 unsigned char *zzlInsert(unsigned char *zl
, robj 
*ele
, double score
) { 
 637     unsigned char *eptr 
= ziplistIndex(zl
,0), *sptr
; 
 640     ele 
= getDecodedObject(ele
); 
 641     while (eptr 
!= NULL
) { 
 642         sptr 
= ziplistNext(zl
,eptr
); 
 643         redisAssert(sptr 
!= NULL
); 
 644         s 
= zzlGetScore(sptr
); 
 647             /* First element with score larger than score for element to be 
 648              * inserted. This means we should take its spot in the list to 
 649              * maintain ordering. */ 
 650             zl 
= zzlInsertAt(zl
,eptr
,ele
,score
); 
 652         } else if (s 
== score
) { 
 653             /* Ensure lexicographical ordering for elements. */ 
 654             if (zzlCompareElements(eptr
,ele
->ptr
,sdslen(ele
->ptr
)) > 0) { 
 655                 zl 
= zzlInsertAt(zl
,eptr
,ele
,score
); 
 660         /* Move to next element. */ 
 661         eptr 
= ziplistNext(zl
,sptr
); 
 664     /* Push on tail of list when it was not yet inserted. */ 
 666         zl 
= zzlInsertAt(zl
,NULL
,ele
,score
); 
 672 unsigned char *zzlDeleteRangeByScore(unsigned char *zl
, zrangespec range
, unsigned long *deleted
) { 
 673     unsigned char *eptr
, *sptr
; 
 675     unsigned long num 
= 0; 
 677     if (deleted 
!= NULL
) *deleted 
= 0; 
 679     eptr 
= zzlFirstInRange(zl
,range
); 
 680     if (eptr 
== NULL
) return zl
; 
 682     /* When the tail of the ziplist is deleted, eptr will point to the sentinel 
 683      * byte and ziplistNext will return NULL. */ 
 684     while ((sptr 
= ziplistNext(zl
,eptr
)) != NULL
) { 
 685         score 
= zzlGetScore(sptr
); 
 686         if (zslValueLteMax(score
,&range
)) { 
 687             /* Delete both the element and the score. */ 
 688             zl 
= ziplistDelete(zl
,&eptr
); 
 689             zl 
= ziplistDelete(zl
,&eptr
); 
 692             /* No longer in range. */ 
 697     if (deleted 
!= NULL
) *deleted 
= num
; 
 701 /* Delete all the elements with rank between start and end from the ziplist. 
 702  * Start and end are inclusive. Note that start and end need to be 1-based */ 
 703 unsigned char *zzlDeleteRangeByRank(unsigned char *zl
, unsigned int start
, unsigned int end
, unsigned long *deleted
) { 
 704     unsigned int num 
= (end
-start
)+1; 
 705     if (deleted
) *deleted 
= num
; 
 706     zl 
= ziplistDeleteRange(zl
,2*(start
-1),2*num
); 
 710 /*----------------------------------------------------------------------------- 
 711  * Common sorted set API 
 712  *----------------------------------------------------------------------------*/ 
 714 unsigned int zsetLength(robj 
*zobj
) { 
 716     if (zobj
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
 717         length 
= zzlLength(zobj
->ptr
); 
 718     } else if (zobj
->encoding 
== REDIS_ENCODING_SKIPLIST
) { 
 719         length 
= ((zset
*)zobj
->ptr
)->zsl
->length
; 
 721         redisPanic("Unknown sorted set encoding"); 
 726 void zsetConvert(robj 
*zobj
, int encoding
) { 
 728     zskiplistNode 
*node
, *next
; 
 732     if (zobj
->encoding 
== encoding
) return; 
 733     if (zobj
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
 734         unsigned char *zl 
= zobj
->ptr
; 
 735         unsigned char *eptr
, *sptr
; 
 740         if (encoding 
!= REDIS_ENCODING_SKIPLIST
) 
 741             redisPanic("Unknown target encoding"); 
 743         zs 
= zmalloc(sizeof(*zs
)); 
 744         zs
->dict 
= dictCreate(&zsetDictType
,NULL
); 
 745         zs
->zsl 
= zslCreate(); 
 747         eptr 
= ziplistIndex(zl
,0); 
 748         redisAssert(eptr 
!= NULL
); 
 749         sptr 
= ziplistNext(zl
,eptr
); 
 750         redisAssert(sptr 
!= NULL
); 
 752         while (eptr 
!= NULL
) { 
 753             score 
= zzlGetScore(sptr
); 
 754             redisAssert(ziplistGet(eptr
,&vstr
,&vlen
,&vlong
)); 
 756                 ele 
= createStringObjectFromLongLong(vlong
); 
 758                 ele 
= createStringObject((char*)vstr
,vlen
); 
 760             /* Has incremented refcount since it was just created. */ 
 761             node 
= zslInsert(zs
->zsl
,score
,ele
); 
 762             redisAssert(dictAdd(zs
->dict
,ele
,&node
->score
) == DICT_OK
); 
 763             incrRefCount(ele
); /* Added to dictionary. */ 
 764             zzlNext(zl
,&eptr
,&sptr
); 
 769         zobj
->encoding 
= REDIS_ENCODING_SKIPLIST
; 
 770     } else if (zobj
->encoding 
== REDIS_ENCODING_SKIPLIST
) { 
 771         unsigned char *zl 
= ziplistNew(); 
 773         if (encoding 
!= REDIS_ENCODING_ZIPLIST
) 
 774             redisPanic("Unknown target encoding"); 
 776         /* Approach similar to zslFree(), since we want to free the skiplist at 
 777          * the same time as creating the ziplist. */ 
 779         dictRelease(zs
->dict
); 
 780         node 
= zs
->zsl
->header
->level
[0].forward
; 
 781         zfree(zs
->zsl
->header
); 
 785             ele 
= getDecodedObject(node
->obj
); 
 786             zl 
= zzlInsertAt(zl
,NULL
,ele
,node
->score
); 
 789             next 
= node
->level
[0].forward
; 
 796         zobj
->encoding 
= REDIS_ENCODING_ZIPLIST
; 
 798         redisPanic("Unknown sorted set encoding"); 
 802 /*----------------------------------------------------------------------------- 
 803  * Sorted set commands  
 804  *----------------------------------------------------------------------------*/ 
 806 /* This generic command implements both ZADD and ZINCRBY. */ 
 807 void zaddGenericCommand(redisClient 
*c
, int incr
) { 
 808     static char *nanerr 
= "resulting score is not a number (NaN)"; 
 809     robj 
*key 
= c
->argv
[1]; 
 813     double score
, curscore 
= 0.0; 
 815     if (getDoubleFromObjectOrReply(c
,c
->argv
[2],&score
,NULL
) != REDIS_OK
) 
 818     zobj 
= lookupKeyWrite(c
->db
,key
); 
 820         if (server
.zset_max_ziplist_entries 
== 0 || 
 821             server
.zset_max_ziplist_value 
< sdslen(c
->argv
[3]->ptr
)) 
 823             zobj 
= createZsetObject(); 
 825             zobj 
= createZsetZiplistObject(); 
 827         dbAdd(c
->db
,key
,zobj
); 
 829         if (zobj
->type 
!= REDIS_ZSET
) { 
 830             addReply(c
,shared
.wrongtypeerr
); 
 835     if (zobj
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
 838         /* Prefer non-encoded element when dealing with ziplists. */ 
 840         if ((eptr 
= zzlFind(zobj
->ptr
,ele
,&curscore
)) != NULL
) { 
 844                     addReplyError(c
,nanerr
); 
 845                     /* Don't need to check if the sorted set is empty, because 
 846                      * we know it has at least one element. */ 
 851             /* Remove and re-insert when score changed. */ 
 852             if (score 
!= curscore
) { 
 853                 zobj
->ptr 
= zzlDelete(zobj
->ptr
,eptr
); 
 854                 zobj
->ptr 
= zzlInsert(zobj
->ptr
,ele
,score
); 
 856                 signalModifiedKey(c
->db
,key
); 
 860             if (incr
) /* ZINCRBY */ 
 861                 addReplyDouble(c
,score
); 
 863                 addReply(c
,shared
.czero
); 
 865             /* Optimize: check if the element is too large or the list becomes 
 866              * too long *before* executing zzlInsert. */ 
 867             zobj
->ptr 
= zzlInsert(zobj
->ptr
,ele
,score
); 
 868             if (zzlLength(zobj
->ptr
) > server
.zset_max_ziplist_entries
) 
 869                 zsetConvert(zobj
,REDIS_ENCODING_SKIPLIST
); 
 870             if (sdslen(ele
->ptr
) > server
.zset_max_ziplist_value
) 
 871                 zsetConvert(zobj
,REDIS_ENCODING_SKIPLIST
); 
 873             signalModifiedKey(c
->db
,key
); 
 876             if (incr
) /* ZINCRBY */ 
 877                 addReplyDouble(c
,score
); 
 879                 addReply(c
,shared
.cone
); 
 881     } else if (zobj
->encoding 
== REDIS_ENCODING_SKIPLIST
) { 
 882         zset 
*zs 
= zobj
->ptr
; 
 883         zskiplistNode 
*znode
; 
 886         ele 
= c
->argv
[3] = tryObjectEncoding(c
->argv
[3]); 
 887         de 
= dictFind(zs
->dict
,ele
); 
 889             curobj 
= dictGetEntryKey(de
); 
 890             curscore 
= *(double*)dictGetEntryVal(de
); 
 895                     addReplyError(c
,nanerr
); 
 896                     /* Don't need to check if the sorted set is empty, because 
 897                      * we know it has at least one element. */ 
 902             /* Remove and re-insert when score changed. We can safely delete 
 903              * the key object from the skiplist, since the dictionary still has 
 904              * a reference to it. */ 
 905             if (score 
!= curscore
) { 
 906                 redisAssert(zslDelete(zs
->zsl
,curscore
,curobj
)); 
 907                 znode 
= zslInsert(zs
->zsl
,score
,curobj
); 
 908                 incrRefCount(curobj
); /* Re-inserted in skiplist. */ 
 909                 dictGetEntryVal(de
) = &znode
->score
; /* Update score ptr. */ 
 911                 signalModifiedKey(c
->db
,key
); 
 915             if (incr
) /* ZINCRBY */ 
 916                 addReplyDouble(c
,score
); 
 918                 addReply(c
,shared
.czero
); 
 920             znode 
= zslInsert(zs
->zsl
,score
,ele
); 
 921             incrRefCount(ele
); /* Inserted in skiplist. */ 
 922             redisAssert(dictAdd(zs
->dict
,ele
,&znode
->score
) == DICT_OK
); 
 923             incrRefCount(ele
); /* Added to dictionary. */ 
 925             signalModifiedKey(c
->db
,key
); 
 928             if (incr
) /* ZINCRBY */ 
 929                 addReplyDouble(c
,score
); 
 931                 addReply(c
,shared
.cone
); 
 934         redisPanic("Unknown sorted set encoding"); 
 938 void zaddCommand(redisClient 
*c
) { 
 939     zaddGenericCommand(c
,0); 
 942 void zincrbyCommand(redisClient 
*c
) { 
 943     zaddGenericCommand(c
,1); 
 946 void zremCommand(redisClient 
*c
) { 
 947     robj 
*key 
= c
->argv
[1]; 
 948     robj 
*ele 
= c
->argv
[2]; 
 951     if ((zobj 
= lookupKeyWriteOrReply(c
,key
,shared
.czero
)) == NULL 
|| 
 952         checkType(c
,zobj
,REDIS_ZSET
)) return; 
 954     if (zobj
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
 957         if ((eptr 
= zzlFind(zobj
->ptr
,ele
,NULL
)) != NULL
) { 
 958             zobj
->ptr 
= zzlDelete(zobj
->ptr
,eptr
); 
 959             if (zzlLength(zobj
->ptr
) == 0) dbDelete(c
->db
,key
); 
 961             addReply(c
,shared
.czero
); 
 964     } else if (zobj
->encoding 
== REDIS_ENCODING_SKIPLIST
) { 
 965         zset 
*zs 
= zobj
->ptr
; 
 969         de 
= dictFind(zs
->dict
,ele
); 
 971             /* Delete from the skiplist */ 
 972             score 
= *(double*)dictGetEntryVal(de
); 
 973             redisAssert(zslDelete(zs
->zsl
,score
,ele
)); 
 975             /* Delete from the hash table */ 
 976             dictDelete(zs
->dict
,ele
); 
 977             if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
); 
 978             if (dictSize(zs
->dict
) == 0) dbDelete(c
->db
,key
); 
 980             addReply(c
,shared
.czero
); 
 984         redisPanic("Unknown sorted set encoding"); 
 987     signalModifiedKey(c
->db
,key
); 
 989     addReply(c
,shared
.cone
); 
 992 void zremrangebyscoreCommand(redisClient 
*c
) { 
 993     robj 
*key 
= c
->argv
[1]; 
 996     unsigned long deleted
; 
 998     /* Parse the range arguments. */ 
 999     if (zslParseRange(c
->argv
[2],c
->argv
[3],&range
) != REDIS_OK
) { 
1000         addReplyError(c
,"min or max is not a double"); 
1004     if ((zobj 
= lookupKeyWriteOrReply(c
,key
,shared
.czero
)) == NULL 
|| 
1005         checkType(c
,zobj
,REDIS_ZSET
)) return; 
1007     if (zobj
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
1008         zobj
->ptr 
= zzlDeleteRangeByScore(zobj
->ptr
,range
,&deleted
); 
1009         if (zzlLength(zobj
->ptr
) == 0) dbDelete(c
->db
,key
); 
1010     } else if (zobj
->encoding 
== REDIS_ENCODING_SKIPLIST
) { 
1011         zset 
*zs 
= zobj
->ptr
; 
1012         deleted 
= zslDeleteRangeByScore(zs
->zsl
,range
,zs
->dict
); 
1013         if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
); 
1014         if (dictSize(zs
->dict
) == 0) dbDelete(c
->db
,key
); 
1016         redisPanic("Unknown sorted set encoding"); 
1019     if (deleted
) signalModifiedKey(c
->db
,key
); 
1020     server
.dirty 
+= deleted
; 
1021     addReplyLongLong(c
,deleted
); 
1024 void zremrangebyrankCommand(redisClient 
*c
) { 
1025     robj 
*key 
= c
->argv
[1]; 
1030     unsigned long deleted
; 
1032     if ((getLongFromObjectOrReply(c
, c
->argv
[2], &start
, NULL
) != REDIS_OK
) || 
1033         (getLongFromObjectOrReply(c
, c
->argv
[3], &end
, NULL
) != REDIS_OK
)) return; 
1035     if ((zobj 
= lookupKeyWriteOrReply(c
,key
,shared
.czero
)) == NULL 
|| 
1036         checkType(c
,zobj
,REDIS_ZSET
)) return; 
1038     /* Sanitize indexes. */ 
1039     llen 
= zsetLength(zobj
); 
1040     if (start 
< 0) start 
= llen
+start
; 
1041     if (end 
< 0) end 
= llen
+end
; 
1042     if (start 
< 0) start 
= 0; 
1044     /* Invariant: start >= 0, so this test will be true when end < 0. 
1045      * The range is empty when start > end or start >= length. */ 
1046     if (start 
> end 
|| start 
>= llen
) { 
1047         addReply(c
,shared
.czero
); 
1050     if (end 
>= llen
) end 
= llen
-1; 
1052     if (zobj
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
1053         /* Correct for 1-based rank. */ 
1054         zobj
->ptr 
= zzlDeleteRangeByRank(zobj
->ptr
,start
+1,end
+1,&deleted
); 
1055         if (zzlLength(zobj
->ptr
) == 0) dbDelete(c
->db
,key
); 
1056     } else if (zobj
->encoding 
== REDIS_ENCODING_SKIPLIST
) { 
1057         zset 
*zs 
= zobj
->ptr
; 
1059         /* Correct for 1-based rank. */ 
1060         deleted 
= zslDeleteRangeByRank(zs
->zsl
,start
+1,end
+1,zs
->dict
); 
1061         if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
); 
1062         if (dictSize(zs
->dict
) == 0) dbDelete(c
->db
,key
); 
1064         redisPanic("Unknown sorted set encoding"); 
1067     if (deleted
) signalModifiedKey(c
->db
,key
); 
1068     server
.dirty 
+= deleted
; 
1069     addReplyLongLong(c
,deleted
); 
1074     int type
; /* Set, sorted set */ 
1079         /* Set iterators. */ 
1092         /* Sorted set iterators. */ 
1096                 unsigned char *eptr
, *sptr
; 
1100                 zskiplistNode 
*node
; 
/* Use dirty flags for pointers that need to be cleaned up in the next
 * iteration over the zsetopval. The dirty flag for the long long value is
 * special, since long long values don't need cleanup. Instead, it means that
 * we already checked that "ell" holds a long long, or tried to convert another
 * representation into a long long value. When this was successful,
 * OPVAL_VALID_LL is set as well. */

/* Set by zuiObjectFromValue() when it creates "ele"; zuiNext() releases that
 * object before loading the next element. */
#define OPVAL_DIRTY_ROBJ 1
/* Set by zuiLongLongFromValue() once a conversion has been attempted. */
#define OPVAL_DIRTY_LL 2
/* Set when "ell" holds a usable long long value. */
#define OPVAL_VALID_LL 4
1117 /* Store value retrieved from the iterator. */ 
1120     unsigned char _buf
[32]; /* Private buffer. */ 
1122     unsigned char *estr
; 
/* Short names for the iterator unions; the zui* helpers obtain them through
 * &op->iter.set and &op->iter.zset respectively. */
typedef union _iterset iterset;
typedef union _iterzset iterzset;
1131 void zuiInitIterator(zsetopsrc 
*op
) { 
1132     if (op
->subject 
== NULL
) 
1135     if (op
->type 
== REDIS_SET
) { 
1136         iterset 
*it 
= &op
->iter
.set
; 
1137         if (op
->encoding 
== REDIS_ENCODING_INTSET
) { 
1138             it
->is
.is 
= op
->subject
->ptr
; 
1140         } else if (op
->encoding 
== REDIS_ENCODING_HT
) { 
1141             it
->ht
.dict 
= op
->subject
->ptr
; 
1142             it
->ht
.di 
= dictGetIterator(op
->subject
->ptr
); 
1143             it
->ht
.de 
= dictNext(it
->ht
.di
); 
1145             redisPanic("Unknown set encoding"); 
1147     } else if (op
->type 
== REDIS_ZSET
) { 
1148         iterzset 
*it 
= &op
->iter
.zset
; 
1149         if (op
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
1150             it
->zl
.zl 
= op
->subject
->ptr
; 
1151             it
->zl
.eptr 
= ziplistIndex(it
->zl
.zl
,0); 
1152             if (it
->zl
.eptr 
!= NULL
) { 
1153                 it
->zl
.sptr 
= ziplistNext(it
->zl
.zl
,it
->zl
.eptr
); 
1154                 redisAssert(it
->zl
.sptr 
!= NULL
); 
1156         } else if (op
->encoding 
== REDIS_ENCODING_SKIPLIST
) { 
1157             it
->sl
.zs 
= op
->subject
->ptr
; 
1158             it
->sl
.node 
= it
->sl
.zs
->zsl
->header
->level
[0].forward
; 
1160             redisPanic("Unknown sorted set encoding"); 
1163         redisPanic("Unsupported type"); 
1167 void zuiClearIterator(zsetopsrc 
*op
) { 
1168     if (op
->subject 
== NULL
) 
1171     if (op
->type 
== REDIS_SET
) { 
1172         iterset 
*it 
= &op
->iter
.set
; 
1173         if (op
->encoding 
== REDIS_ENCODING_INTSET
) { 
1174             REDIS_NOTUSED(it
); /* skip */ 
1175         } else if (op
->encoding 
== REDIS_ENCODING_HT
) { 
1176             dictReleaseIterator(it
->ht
.di
); 
1178             redisPanic("Unknown set encoding"); 
1180     } else if (op
->type 
== REDIS_ZSET
) { 
1181         iterzset 
*it 
= &op
->iter
.zset
; 
1182         if (op
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
1183             REDIS_NOTUSED(it
); /* skip */ 
1184         } else if (op
->encoding 
== REDIS_ENCODING_SKIPLIST
) { 
1185             REDIS_NOTUSED(it
); /* skip */ 
1187             redisPanic("Unknown sorted set encoding"); 
1190         redisPanic("Unsupported type"); 
1194 int zuiLength(zsetopsrc 
*op
) { 
1195     if (op
->subject 
== NULL
) 
1198     if (op
->type 
== REDIS_SET
) { 
1199         iterset 
*it 
= &op
->iter
.set
; 
1200         if (op
->encoding 
== REDIS_ENCODING_INTSET
) { 
1201             return intsetLen(it
->is
.is
); 
1202         } else if (op
->encoding 
== REDIS_ENCODING_HT
) { 
1203             return dictSize(it
->ht
.dict
); 
1205             redisPanic("Unknown set encoding"); 
1207     } else if (op
->type 
== REDIS_ZSET
) { 
1208         iterzset 
*it 
= &op
->iter
.zset
; 
1209         if (op
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
1210             return zzlLength(it
->zl
.zl
); 
1211         } else if (op
->encoding 
== REDIS_ENCODING_SKIPLIST
) { 
1212             return it
->sl
.zs
->zsl
->length
; 
1214             redisPanic("Unknown sorted set encoding"); 
1217         redisPanic("Unsupported type"); 
1221 /* Check if the current value is valid. If so, store it in the passed structure 
1222  * and move to the next element. If not valid, this means we have reached the 
1223  * end of the structure and can abort. */ 
1224 int zuiNext(zsetopsrc 
*op
, zsetopval 
*val
) { 
1225     if (op
->subject 
== NULL
) 
1228     if (val
->flags 
& OPVAL_DIRTY_ROBJ
) 
1229         decrRefCount(val
->ele
); 
1231     bzero(val
,sizeof(zsetopval
)); 
1233     if (op
->type 
== REDIS_SET
) { 
1234         iterset 
*it 
= &op
->iter
.set
; 
1235         if (op
->encoding 
== REDIS_ENCODING_INTSET
) { 
1236             if (!intsetGet(it
->is
.is
,it
->is
.ii
,(int64_t*)&val
->ell
)) 
1240             /* Move to next element. */ 
1242         } else if (op
->encoding 
== REDIS_ENCODING_HT
) { 
1243             if (it
->ht
.de 
== NULL
) 
1245             val
->ele 
= dictGetEntryKey(it
->ht
.de
); 
1248             /* Move to next element. */ 
1249             it
->ht
.de 
= dictNext(it
->ht
.di
); 
1251             redisPanic("Unknown set encoding"); 
1253     } else if (op
->type 
== REDIS_ZSET
) { 
1254         iterzset 
*it 
= &op
->iter
.zset
; 
1255         if (op
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
1256             /* No need to check both, but better be explicit. */ 
1257             if (it
->zl
.eptr 
== NULL 
|| it
->zl
.sptr 
== NULL
) 
1259             redisAssert(ziplistGet(it
->zl
.eptr
,&val
->estr
,&val
->elen
,&val
->ell
)); 
1260             val
->score 
= zzlGetScore(it
->zl
.sptr
); 
1262             /* Move to next element. */ 
1263             zzlNext(it
->zl
.zl
,&it
->zl
.eptr
,&it
->zl
.sptr
); 
1264         } else if (op
->encoding 
== REDIS_ENCODING_SKIPLIST
) { 
1265             if (it
->sl
.node 
== NULL
) 
1267             val
->ele 
= it
->sl
.node
->obj
; 
1268             val
->score 
= it
->sl
.node
->score
; 
1270             /* Move to next element. */ 
1271             it
->sl
.node 
= it
->sl
.node
->level
[0].forward
; 
1273             redisPanic("Unknown sorted set encoding"); 
1276         redisPanic("Unsupported type"); 
1281 int zuiLongLongFromValue(zsetopval 
*val
) { 
1282     if (!(val
->flags 
& OPVAL_DIRTY_LL
)) { 
1283         val
->flags 
|= OPVAL_DIRTY_LL
; 
1285         if (val
->ele 
!= NULL
) { 
1286             if (val
->ele
->encoding 
== REDIS_ENCODING_INT
) { 
1287                 val
->ell 
= (long)val
->ele
->ptr
; 
1288                 val
->flags 
|= OPVAL_VALID_LL
; 
1289             } else if (val
->ele
->encoding 
== REDIS_ENCODING_RAW
) { 
1290                 if (string2ll(val
->ele
->ptr
,sdslen(val
->ele
->ptr
),&val
->ell
)) 
1291                     val
->flags 
|= OPVAL_VALID_LL
; 
1293                 redisPanic("Unsupported element encoding"); 
1295         } else if (val
->estr 
!= NULL
) { 
1296             if (string2ll((char*)val
->estr
,val
->elen
,&val
->ell
)) 
1297                 val
->flags 
|= OPVAL_VALID_LL
; 
1299             /* The long long was already set, flag as valid. */ 
1300             val
->flags 
|= OPVAL_VALID_LL
; 
1303     return val
->flags 
& OPVAL_VALID_LL
; 
1306 robj 
*zuiObjectFromValue(zsetopval 
*val
) { 
1307     if (val
->ele 
== NULL
) { 
1308         if (val
->estr 
!= NULL
) { 
1309             val
->ele 
= createStringObject((char*)val
->estr
,val
->elen
); 
1311             val
->ele 
= createStringObjectFromLongLong(val
->ell
); 
1313         val
->flags 
|= OPVAL_DIRTY_ROBJ
; 
1318 int zuiBufferFromValue(zsetopval 
*val
) { 
1319     if (val
->estr 
== NULL
) { 
1320         if (val
->ele 
!= NULL
) { 
1321             if (val
->ele
->encoding 
== REDIS_ENCODING_INT
) { 
1322                 val
->elen 
= ll2string((char*)val
->_buf
,sizeof(val
->_buf
),(long)val
->ele
->ptr
); 
1323                 val
->estr 
= val
->_buf
; 
1324             } else if (val
->ele
->encoding 
== REDIS_ENCODING_RAW
) { 
1325                 val
->elen 
= sdslen(val
->ele
->ptr
); 
1326                 val
->estr 
= val
->ele
->ptr
; 
1328                 redisPanic("Unsupported element encoding"); 
1331             val
->elen 
= ll2string((char*)val
->_buf
,sizeof(val
->_buf
),val
->ell
); 
1332             val
->estr 
= val
->_buf
; 
1338 /* Find value pointed to by val in the source pointer to by op. When found, 
1339  * return 1 and store its score in target. Return 0 otherwise. */ 
1340 int zuiFind(zsetopsrc 
*op
, zsetopval 
*val
, double *score
) { 
1341     if (op
->subject 
== NULL
) 
1344     if (op
->type 
== REDIS_SET
) { 
1345         iterset 
*it 
= &op
->iter
.set
; 
1347         if (op
->encoding 
== REDIS_ENCODING_INTSET
) { 
1348             if (zuiLongLongFromValue(val
) && intsetFind(it
->is
.is
,val
->ell
)) { 
1354         } else if (op
->encoding 
== REDIS_ENCODING_HT
) { 
1355             zuiObjectFromValue(val
); 
1356             if (dictFind(it
->ht
.dict
,val
->ele
) != NULL
) { 
1363             redisPanic("Unknown set encoding"); 
1365     } else if (op
->type 
== REDIS_ZSET
) { 
1366         iterzset 
*it 
= &op
->iter
.zset
; 
1367         zuiObjectFromValue(val
); 
1369         if (op
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
1370             if (zzlFind(it
->zl
.zl
,val
->ele
,score
) != NULL
) { 
1371                 /* Score is already set by zzlFind. */ 
1376         } else if (op
->encoding 
== REDIS_ENCODING_SKIPLIST
) { 
1378             if ((de 
= dictFind(it
->sl
.zs
->dict
,val
->ele
)) != NULL
) { 
1379                 *score 
= *(double*)dictGetEntryVal(de
); 
1385             redisPanic("Unknown sorted set encoding"); 
1388         redisPanic("Unsupported type"); 
1392 int zuiCompareByCardinality(const void *s1
, const void *s2
) { 
1393     return zuiLength((zsetopsrc
*)s1
) - zuiLength((zsetopsrc
*)s2
); 
/* Aggregation modes understood by zunionInterAggregate(): add the scores,
 * keep the smallest one, or keep the largest one. */
#define REDIS_AGGR_SUM 1
#define REDIS_AGGR_MIN 2
#define REDIS_AGGR_MAX 3
/* Score stored in dict entry _e: 1.0 when the entry has no value attached,
 * otherwise the double its value points to. */
#define zunionInterDictValue(_e) (dictGetEntryVal(_e) == NULL ? 1.0 : *(double*)dictGetEntryVal(_e))
1401 inline static void zunionInterAggregate(double *target
, double val
, int aggregate
) { 
1402     if (aggregate 
== REDIS_AGGR_SUM
) { 
1403         *target 
= *target 
+ val
; 
1404         /* The result of adding two doubles is NaN when one variable 
1405          * is +inf and the other is -inf. When these numbers are added, 
1406          * we maintain the convention of the result being 0.0. */ 
1407         if (isnan(*target
)) *target 
= 0.0; 
1408     } else if (aggregate 
== REDIS_AGGR_MIN
) { 
1409         *target 
= val 
< *target 
? val 
: *target
; 
1410     } else if (aggregate 
== REDIS_AGGR_MAX
) { 
1411         *target 
= val 
> *target 
? val 
: *target
; 
1414         redisPanic("Unknown ZUNION/INTER aggregate type"); 
1418 void zunionInterGenericCommand(redisClient 
*c
, robj 
*dstkey
, int op
) { 
1420     int aggregate 
= REDIS_AGGR_SUM
; 
1424     unsigned int maxelelen 
= 0; 
1427     zskiplistNode 
*znode
; 
1430     /* expect setnum input keys to be given */ 
1431     setnum 
= atoi(c
->argv
[2]->ptr
); 
1434             "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE"); 
1438     /* test if the expected number of keys would overflow */ 
1439     if (3+setnum 
> c
->argc
) { 
1440         addReply(c
,shared
.syntaxerr
); 
1444     /* read keys to be used for input */ 
1445     src 
= zcalloc(sizeof(zsetopsrc
) * setnum
); 
1446     for (i 
= 0, j 
= 3; i 
< setnum
; i
++, j
++) { 
1447         robj 
*obj 
= lookupKeyWrite(c
->db
,c
->argv
[j
]); 
1449             if (obj
->type 
!= REDIS_ZSET 
&& obj
->type 
!= REDIS_SET
) { 
1451                 addReply(c
,shared
.wrongtypeerr
); 
1455             src
[i
].subject 
= obj
; 
1456             src
[i
].type 
= obj
->type
; 
1457             src
[i
].encoding 
= obj
->encoding
; 
1459             src
[i
].subject 
= NULL
; 
1462         /* Default all weights to 1. */ 
1463         src
[i
].weight 
= 1.0; 
1466     /* parse optional extra arguments */ 
1468         int remaining 
= c
->argc 
- j
; 
1471             if (remaining 
>= (setnum 
+ 1) && !strcasecmp(c
->argv
[j
]->ptr
,"weights")) { 
1473                 for (i 
= 0; i 
< setnum
; i
++, j
++, remaining
--) { 
1474                     if (getDoubleFromObjectOrReply(c
,c
->argv
[j
],&src
[i
].weight
, 
1475                             "weight value is not a double") != REDIS_OK
) 
1481             } else if (remaining 
>= 2 && !strcasecmp(c
->argv
[j
]->ptr
,"aggregate")) { 
1483                 if (!strcasecmp(c
->argv
[j
]->ptr
,"sum")) { 
1484                     aggregate 
= REDIS_AGGR_SUM
; 
1485                 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"min")) { 
1486                     aggregate 
= REDIS_AGGR_MIN
; 
1487                 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"max")) { 
1488                     aggregate 
= REDIS_AGGR_MAX
; 
1491                     addReply(c
,shared
.syntaxerr
); 
1497                 addReply(c
,shared
.syntaxerr
); 
1503     for (i 
= 0; i 
< setnum
; i
++) 
1504         zuiInitIterator(&src
[i
]); 
1506     /* sort sets from the smallest to largest, this will improve our 
1507      * algorithm's performance */ 
1508     qsort(src
,setnum
,sizeof(zsetopsrc
),zuiCompareByCardinality
); 
1510     dstobj 
= createZsetObject(); 
1511     dstzset 
= dstobj
->ptr
; 
1512     memset(&zval
, 0, sizeof(zval
)); 
1514     if (op 
== REDIS_OP_INTER
) { 
1515         /* Skip everything if the smallest input is empty. */ 
1516         if (zuiLength(&src
[0]) > 0) { 
1517             /* Precondition: as src[0] is non-empty and the inputs are ordered 
1518              * by size, all src[i > 0] are non-empty too. */ 
1519             while (zuiNext(&src
[0],&zval
)) { 
1520                 double score
, value
; 
1522                 score 
= src
[0].weight 
* zval
.score
; 
1523                 for (j 
= 1; j 
< setnum
; j
++) { 
1524                     /* It is not safe to access the hash we zset we are 
1525                      * iterating, so explicitly check for equal object. */ 
1526                     if (src
[j
].subject 
== src
[0].subject
) { 
1527                         value 
= zval
.score
*src
[j
].weight
; 
1528                         zunionInterAggregate(&score
,value
,aggregate
); 
1529                     } else if (zuiFind(&src
[j
],&zval
,&value
)) { 
1530                         value 
*= src
[j
].weight
; 
1531                         zunionInterAggregate(&score
,value
,aggregate
); 
1537                 /* Only continue when present in every input. */ 
1539                     tmp 
= zuiObjectFromValue(&zval
); 
1540                     znode 
= zslInsert(dstzset
->zsl
,score
,tmp
); 
1541                     incrRefCount(tmp
); /* added to skiplist */ 
1542                     dictAdd(dstzset
->dict
,tmp
,&znode
->score
); 
1543                     incrRefCount(tmp
); /* added to dictionary */ 
1545                     if (tmp
->encoding 
== REDIS_ENCODING_RAW
) 
1546                         if (sdslen(tmp
->ptr
) > maxelelen
) 
1547                             maxelelen 
= sdslen(tmp
->ptr
); 
1551     } else if (op 
== REDIS_OP_UNION
) { 
1552         for (i 
= 0; i 
< setnum
; i
++) { 
1553             if (zuiLength(&src
[0]) == 0) 
1556             while (zuiNext(&src
[i
],&zval
)) { 
1557                 double score
, value
; 
1559                 /* Skip key when already processed */ 
1560                 if (dictFind(dstzset
->dict
,zuiObjectFromValue(&zval
)) != NULL
) 
1563                 /* Initialize score */ 
1564                 score 
= src
[i
].weight 
* zval
.score
; 
1566                 /* Because the inputs are sorted by size, it's only possible 
1567                  * for sets at larger indices to hold this element. */ 
1568                 for (j 
= (i
+1); j 
< setnum
; j
++) { 
1569                     /* It is not safe to access the hash we zset we are 
1570                      * iterating, so explicitly check for equal object. */ 
1571                     if(src
[j
].subject 
== src
[i
].subject
) { 
1572                         value 
= zval
.score
*src
[j
].weight
; 
1573                         zunionInterAggregate(&score
,value
,aggregate
); 
1574                     } else if (zuiFind(&src
[j
],&zval
,&value
)) { 
1575                         value 
*= src
[j
].weight
; 
1576                         zunionInterAggregate(&score
,value
,aggregate
); 
1580                 tmp 
= zuiObjectFromValue(&zval
); 
1581                 znode 
= zslInsert(dstzset
->zsl
,score
,tmp
); 
1582                 incrRefCount(zval
.ele
); /* added to skiplist */ 
1583                 dictAdd(dstzset
->dict
,tmp
,&znode
->score
); 
1584                 incrRefCount(zval
.ele
); /* added to dictionary */ 
1586                 if (tmp
->encoding 
== REDIS_ENCODING_RAW
) 
1587                     if (sdslen(tmp
->ptr
) > maxelelen
) 
1588                         maxelelen 
= sdslen(tmp
->ptr
); 
1592         redisPanic("Unknown operator"); 
1595     for (i 
= 0; i 
< setnum
; i
++) 
1596         zuiClearIterator(&src
[i
]); 
1598     if (dbDelete(c
->db
,dstkey
)) { 
1599         signalModifiedKey(c
->db
,dstkey
); 
1603     if (dstzset
->zsl
->length
) { 
1604         /* Convert to ziplist when in limits. */ 
1605         if (dstzset
->zsl
->length 
<= server
.zset_max_ziplist_entries 
&& 
1606             maxelelen 
<= server
.zset_max_ziplist_value
) 
1607                 zsetConvert(dstobj
,REDIS_ENCODING_ZIPLIST
); 
1609         dbAdd(c
->db
,dstkey
,dstobj
); 
1610         addReplyLongLong(c
,zsetLength(dstobj
)); 
1611         if (!touched
) signalModifiedKey(c
->db
,dstkey
); 
1614         decrRefCount(dstobj
); 
1615         addReply(c
,shared
.czero
); 
1620 void zunionstoreCommand(redisClient 
*c
) { 
1621     zunionInterGenericCommand(c
,c
->argv
[1], REDIS_OP_UNION
); 
1624 void zinterstoreCommand(redisClient 
*c
) { 
1625     zunionInterGenericCommand(c
,c
->argv
[1], REDIS_OP_INTER
); 
1628 void zrangeGenericCommand(redisClient 
*c
, int reverse
) { 
1629     robj 
*key 
= c
->argv
[1]; 
1637     if ((getLongFromObjectOrReply(c
, c
->argv
[2], &start
, NULL
) != REDIS_OK
) || 
1638         (getLongFromObjectOrReply(c
, c
->argv
[3], &end
, NULL
) != REDIS_OK
)) return; 
1640     if (c
->argc 
== 5 && !strcasecmp(c
->argv
[4]->ptr
,"withscores")) { 
1642     } else if (c
->argc 
>= 5) { 
1643         addReply(c
,shared
.syntaxerr
); 
1647     if ((zobj 
= lookupKeyReadOrReply(c
,key
,shared
.emptymultibulk
)) == NULL
 
1648          || checkType(c
,zobj
,REDIS_ZSET
)) return; 
1650     /* Sanitize indexes. */ 
1651     llen 
= zsetLength(zobj
); 
1652     if (start 
< 0) start 
= llen
+start
; 
1653     if (end 
< 0) end 
= llen
+end
; 
1654     if (start 
< 0) start 
= 0; 
1656     /* Invariant: start >= 0, so this test will be true when end < 0. 
1657      * The range is empty when start > end or start >= length. */ 
1658     if (start 
> end 
|| start 
>= llen
) { 
1659         addReply(c
,shared
.emptymultibulk
); 
1662     if (end 
>= llen
) end 
= llen
-1; 
1663     rangelen 
= (end
-start
)+1; 
1665     /* Return the result in form of a multi-bulk reply */ 
1666     addReplyMultiBulkLen(c
, withscores 
? (rangelen
*2) : rangelen
); 
1668     if (zobj
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
1669         unsigned char *zl 
= zobj
->ptr
; 
1670         unsigned char *eptr
, *sptr
; 
1671         unsigned char *vstr
; 
1676             eptr 
= ziplistIndex(zl
,-2-(2*start
)); 
1678             eptr 
= ziplistIndex(zl
,2*start
); 
1680         redisAssert(eptr 
!= NULL
); 
1681         sptr 
= ziplistNext(zl
,eptr
); 
1683         while (rangelen
--) { 
1684             redisAssert(eptr 
!= NULL 
&& sptr 
!= NULL
); 
1685             redisAssert(ziplistGet(eptr
,&vstr
,&vlen
,&vlong
)); 
1687                 addReplyBulkLongLong(c
,vlong
); 
1689                 addReplyBulkCBuffer(c
,vstr
,vlen
); 
1692                 addReplyDouble(c
,zzlGetScore(sptr
)); 
1695                 zzlPrev(zl
,&eptr
,&sptr
); 
1697                 zzlNext(zl
,&eptr
,&sptr
); 
1700     } else if (zobj
->encoding 
== REDIS_ENCODING_SKIPLIST
) { 
1701         zset 
*zs 
= zobj
->ptr
; 
1702         zskiplist 
*zsl 
= zs
->zsl
; 
1706         /* Check if starting point is trivial, before doing log(N) lookup. */ 
1710                 ln 
= zslGetElementByRank(zsl
,llen
-start
); 
1712             ln 
= zsl
->header
->level
[0].forward
; 
1714                 ln 
= zslGetElementByRank(zsl
,start
+1); 
1718             redisAssert(ln 
!= NULL
); 
1720             addReplyBulk(c
,ele
); 
1722                 addReplyDouble(c
,ln
->score
); 
1723             ln 
= reverse 
? ln
->backward 
: ln
->level
[0].forward
; 
1726         redisPanic("Unknown sorted set encoding"); 
1730 void zrangeCommand(redisClient 
*c
) { 
1731     zrangeGenericCommand(c
,0); 
1734 void zrevrangeCommand(redisClient 
*c
) { 
1735     zrangeGenericCommand(c
,1); 
1738 /* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE and ZCOUNT. 
1739  * If "justcount", only the number of elements in the range is returned. */ 
1740 void genericZrangebyscoreCommand(redisClient 
*c
, int reverse
, int justcount
) { 
1742     robj 
*key 
= c
->argv
[1]; 
1743     robj 
*emptyreply
, *zobj
; 
1744     int offset 
= 0, limit 
= -1; 
1746     unsigned long rangelen 
= 0; 
1747     void *replylen 
= NULL
; 
1750     /* Parse the range arguments. */ 
1752         /* Range is given as [max,min] */ 
1753         maxidx 
= 2; minidx 
= 3; 
1755         /* Range is given as [min,max] */ 
1756         minidx 
= 2; maxidx 
= 3; 
1759     if (zslParseRange(c
->argv
[minidx
],c
->argv
[maxidx
],&range
) != REDIS_OK
) { 
1760         addReplyError(c
,"min or max is not a double"); 
1764     /* Parse optional extra arguments. Note that ZCOUNT will exactly have 
1765      * 4 arguments, so we'll never enter the following code path. */ 
1767         int remaining 
= c
->argc 
- 4; 
1771             if (remaining 
>= 1 && !strcasecmp(c
->argv
[pos
]->ptr
,"withscores")) { 
1774             } else if (remaining 
>= 3 && !strcasecmp(c
->argv
[pos
]->ptr
,"limit")) { 
1775                 offset 
= atoi(c
->argv
[pos
+1]->ptr
); 
1776                 limit 
= atoi(c
->argv
[pos
+2]->ptr
); 
1777                 pos 
+= 3; remaining 
-= 3; 
1779                 addReply(c
,shared
.syntaxerr
); 
1785     /* Ok, lookup the key and get the range */ 
1786     emptyreply 
= justcount 
? shared
.czero 
: shared
.emptymultibulk
; 
1787     if ((zobj 
= lookupKeyReadOrReply(c
,key
,emptyreply
)) == NULL 
|| 
1788         checkType(c
,zobj
,REDIS_ZSET
)) return; 
1790     if (zobj
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
1791         unsigned char *zl 
= zobj
->ptr
; 
1792         unsigned char *eptr
, *sptr
; 
1793         unsigned char *vstr
; 
1798         /* If reversed, get the last node in range as starting point. */ 
1800             eptr 
= zzlLastInRange(zl
,range
); 
1802             eptr 
= zzlFirstInRange(zl
,range
); 
1804         /* No "first" element in the specified interval. */ 
1806             addReply(c
,emptyreply
); 
1810         /* Get score pointer for the first element. */ 
1811         redisAssert(eptr 
!= NULL
); 
1812         sptr 
= ziplistNext(zl
,eptr
); 
1814         /* We don't know in advance how many matching elements there are in the 
1815          * list, so we push this object that will represent the multi-bulk 
1816          * length in the output buffer, and will "fix" it later */ 
1818             replylen 
= addDeferredMultiBulkLength(c
); 
1820         /* If there is an offset, just traverse the number of elements without 
1821          * checking the score because that is done in the next loop. */ 
1822         while (eptr 
&& offset
--) 
1824                 zzlPrev(zl
,&eptr
,&sptr
); 
1826                 zzlNext(zl
,&eptr
,&sptr
); 
1828         while (eptr 
&& limit
--) { 
1829             score 
= zzlGetScore(sptr
); 
1831             /* Abort when the node is no longer in range. */ 
1833                 if (!zslValueGteMin(score
,&range
)) break; 
1835                 if (!zslValueLteMax(score
,&range
)) break; 
1841                 redisAssert(ziplistGet(eptr
,&vstr
,&vlen
,&vlong
)); 
1843                     addReplyBulkLongLong(c
,vlong
); 
1845                     addReplyBulkCBuffer(c
,vstr
,vlen
); 
1848                     addReplyDouble(c
,score
); 
1851             /* Move to next node */ 
1853                 zzlPrev(zl
,&eptr
,&sptr
); 
1855                 zzlNext(zl
,&eptr
,&sptr
); 
1857     } else if (zobj
->encoding 
== REDIS_ENCODING_SKIPLIST
) { 
1858         zset 
*zs 
= zobj
->ptr
; 
1859         zskiplist 
*zsl 
= zs
->zsl
; 
1862         /* If reversed, get the last node in range as starting point. */ 
1864             ln 
= zslLastInRange(zsl
,range
); 
1866             ln 
= zslFirstInRange(zsl
,range
); 
1868         /* No "first" element in the specified interval. */ 
1870             addReply(c
,emptyreply
); 
1874         /* We don't know in advance how many matching elements there are in the 
1875          * list, so we push this object that will represent the multi-bulk 
1876          * length in the output buffer, and will "fix" it later */ 
1878             replylen 
= addDeferredMultiBulkLength(c
); 
1880         /* If there is an offset, just traverse the number of elements without 
1881          * checking the score because that is done in the next loop. */ 
1882         while (ln 
&& offset
--) 
1883             ln 
= reverse 
? ln
->backward 
: ln
->level
[0].forward
; 
1885         while (ln 
&& limit
--) { 
1886             /* Abort when the node is no longer in range. */ 
1888                 if (!zslValueGteMin(ln
->score
,&range
)) break; 
1890                 if (!zslValueLteMax(ln
->score
,&range
)) break; 
1896                 addReplyBulk(c
,ln
->obj
); 
1898                     addReplyDouble(c
,ln
->score
); 
1901             /* Move to next node */ 
1902             ln 
= reverse 
? ln
->backward 
: ln
->level
[0].forward
; 
1905         redisPanic("Unknown sorted set encoding"); 
1909         addReplyLongLong(c
,(long)rangelen
); 
1911         if (withscores
) rangelen 
*= 2; 
1912         setDeferredMultiBulkLength(c
,replylen
,rangelen
); 
1916 void zrangebyscoreCommand(redisClient 
*c
) { 
1917     genericZrangebyscoreCommand(c
,0,0); 
1920 void zrevrangebyscoreCommand(redisClient 
*c
) { 
1921     genericZrangebyscoreCommand(c
,1,0); 
1924 void zcountCommand(redisClient 
*c
) { 
1925     genericZrangebyscoreCommand(c
,0,1); 
1928 void zcardCommand(redisClient 
*c
) { 
1929     robj 
*key 
= c
->argv
[1]; 
1932     if ((zobj 
= lookupKeyReadOrReply(c
,key
,shared
.czero
)) == NULL 
|| 
1933         checkType(c
,zobj
,REDIS_ZSET
)) return; 
1935     addReplyLongLong(c
,zsetLength(zobj
)); 
1938 void zscoreCommand(redisClient 
*c
) { 
1939     robj 
*key 
= c
->argv
[1]; 
1943     if ((zobj 
= lookupKeyReadOrReply(c
,key
,shared
.nullbulk
)) == NULL 
|| 
1944         checkType(c
,zobj
,REDIS_ZSET
)) return; 
1946     if (zobj
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
1947         if (zzlFind(zobj
->ptr
,c
->argv
[2],&score
) != NULL
) 
1948             addReplyDouble(c
,score
); 
1950             addReply(c
,shared
.nullbulk
); 
1951     } else if (zobj
->encoding 
== REDIS_ENCODING_SKIPLIST
) { 
1952         zset 
*zs 
= zobj
->ptr
; 
1955         c
->argv
[2] = tryObjectEncoding(c
->argv
[2]); 
1956         de 
= dictFind(zs
->dict
,c
->argv
[2]); 
1958             score 
= *(double*)dictGetEntryVal(de
); 
1959             addReplyDouble(c
,score
); 
1961             addReply(c
,shared
.nullbulk
); 
1964         redisPanic("Unknown sorted set encoding"); 
1968 void zrankGenericCommand(redisClient 
*c
, int reverse
) { 
1969     robj 
*key 
= c
->argv
[1]; 
1970     robj 
*ele 
= c
->argv
[2]; 
1975     if ((zobj 
= lookupKeyReadOrReply(c
,key
,shared
.nullbulk
)) == NULL 
|| 
1976         checkType(c
,zobj
,REDIS_ZSET
)) return; 
1977     llen 
= zsetLength(zobj
); 
1979     redisAssert(ele
->encoding 
== REDIS_ENCODING_RAW
); 
1980     if (zobj
->encoding 
== REDIS_ENCODING_ZIPLIST
) { 
1981         unsigned char *zl 
= zobj
->ptr
; 
1982         unsigned char *eptr
, *sptr
; 
1984         eptr 
= ziplistIndex(zl
,0); 
1985         redisAssert(eptr 
!= NULL
); 
1986         sptr 
= ziplistNext(zl
,eptr
); 
1987         redisAssert(sptr 
!= NULL
); 
1990         while(eptr 
!= NULL
) { 
1991             if (ziplistCompare(eptr
,ele
->ptr
,sdslen(ele
->ptr
))) 
1994             zzlNext(zl
,&eptr
,&sptr
); 
1999                 addReplyLongLong(c
,llen
-rank
); 
2001                 addReplyLongLong(c
,rank
-1); 
2003             addReply(c
,shared
.nullbulk
); 
2005     } else if (zobj
->encoding 
== REDIS_ENCODING_SKIPLIST
) { 
2006         zset 
*zs 
= zobj
->ptr
; 
2007         zskiplist 
*zsl 
= zs
->zsl
; 
2011         ele 
= c
->argv
[2] = tryObjectEncoding(c
->argv
[2]); 
2012         de 
= dictFind(zs
->dict
,ele
); 
2014             score 
= *(double*)dictGetEntryVal(de
); 
2015             rank 
= zslGetRank(zsl
,score
,ele
); 
2016             redisAssert(rank
); /* Existing elements always have a rank. */ 
2018                 addReplyLongLong(c
,llen
-rank
); 
2020                 addReplyLongLong(c
,rank
-1); 
2022             addReply(c
,shared
.nullbulk
); 
2025         redisPanic("Unknown sorted set encoding"); 
2029 void zrankCommand(redisClient 
*c
) { 
2030     zrankGenericCommand(c
, 0); 
2033 void zrevrankCommand(redisClient 
*c
) { 
2034     zrankGenericCommand(c
, 1);