]> git.saurik.com Git - redis.git/blob - src/t_zset.c
Make zzl API unaware of the robj where the ziplist is stored
[redis.git] / src / t_zset.c
1 #include "redis.h"
2
3 #include <math.h>
4
5 /*-----------------------------------------------------------------------------
6 * Sorted set API
7 *----------------------------------------------------------------------------*/
8
/* ZSETs are ordered sets using two data structures to hold the same elements
 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
 * data structure.
 *
 * The elements are added to a hash table mapping Redis objects to scores.
 * At the same time the elements are added to a skip list mapping scores
 * to Redis objects (so objects are sorted by scores in this "view"). */
16
17 /* This skiplist implementation is almost a C translation of the original
18 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
19 * Alternative to Balanced Trees", modified in three ways:
20 * a) this implementation allows for repeated values.
21 * b) the comparison is not just by key (our 'score') but by satellite data.
22 * c) there is a back pointer, so it's a doubly linked list with the back
23 * pointers being only at "level 1". This allows to traverse the list
24 * from tail to head, useful for ZREVRANGE. */
25
26 zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
27 zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
28 zn->score = score;
29 zn->obj = obj;
30 return zn;
31 }
32
33 zskiplist *zslCreate(void) {
34 int j;
35 zskiplist *zsl;
36
37 zsl = zmalloc(sizeof(*zsl));
38 zsl->level = 1;
39 zsl->length = 0;
40 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
41 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
42 zsl->header->level[j].forward = NULL;
43 zsl->header->level[j].span = 0;
44 }
45 zsl->header->backward = NULL;
46 zsl->tail = NULL;
47 return zsl;
48 }
49
50 void zslFreeNode(zskiplistNode *node) {
51 decrRefCount(node->obj);
52 zfree(node);
53 }
54
55 void zslFree(zskiplist *zsl) {
56 zskiplistNode *node = zsl->header->level[0].forward, *next;
57
58 zfree(zsl->header);
59 while(node) {
60 next = node->level[0].forward;
61 zslFreeNode(node);
62 node = next;
63 }
64 zfree(zsl);
65 }
66
67 int zslRandomLevel(void) {
68 int level = 1;
69 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
70 level += 1;
71 return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
72 }
73
74 zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
75 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
76 unsigned int rank[ZSKIPLIST_MAXLEVEL];
77 int i, level;
78
79 x = zsl->header;
80 for (i = zsl->level-1; i >= 0; i--) {
81 /* store rank that is crossed to reach the insert position */
82 rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
83 while (x->level[i].forward &&
84 (x->level[i].forward->score < score ||
85 (x->level[i].forward->score == score &&
86 compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
87 rank[i] += x->level[i].span;
88 x = x->level[i].forward;
89 }
90 update[i] = x;
91 }
92 /* we assume the key is not already inside, since we allow duplicated
93 * scores, and the re-insertion of score and redis object should never
94 * happpen since the caller of zslInsert() should test in the hash table
95 * if the element is already inside or not. */
96 level = zslRandomLevel();
97 if (level > zsl->level) {
98 for (i = zsl->level; i < level; i++) {
99 rank[i] = 0;
100 update[i] = zsl->header;
101 update[i]->level[i].span = zsl->length;
102 }
103 zsl->level = level;
104 }
105 x = zslCreateNode(level,score,obj);
106 for (i = 0; i < level; i++) {
107 x->level[i].forward = update[i]->level[i].forward;
108 update[i]->level[i].forward = x;
109
110 /* update span covered by update[i] as x is inserted here */
111 x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
112 update[i]->level[i].span = (rank[0] - rank[i]) + 1;
113 }
114
115 /* increment span for untouched levels */
116 for (i = level; i < zsl->level; i++) {
117 update[i]->level[i].span++;
118 }
119
120 x->backward = (update[0] == zsl->header) ? NULL : update[0];
121 if (x->level[0].forward)
122 x->level[0].forward->backward = x;
123 else
124 zsl->tail = x;
125 zsl->length++;
126 return x;
127 }
128
129 /* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
130 void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
131 int i;
132 for (i = 0; i < zsl->level; i++) {
133 if (update[i]->level[i].forward == x) {
134 update[i]->level[i].span += x->level[i].span - 1;
135 update[i]->level[i].forward = x->level[i].forward;
136 } else {
137 update[i]->level[i].span -= 1;
138 }
139 }
140 if (x->level[0].forward) {
141 x->level[0].forward->backward = x->backward;
142 } else {
143 zsl->tail = x->backward;
144 }
145 while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
146 zsl->level--;
147 zsl->length--;
148 }
149
150 /* Delete an element with matching score/object from the skiplist. */
151 int zslDelete(zskiplist *zsl, double score, robj *obj) {
152 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
153 int i;
154
155 x = zsl->header;
156 for (i = zsl->level-1; i >= 0; i--) {
157 while (x->level[i].forward &&
158 (x->level[i].forward->score < score ||
159 (x->level[i].forward->score == score &&
160 compareStringObjects(x->level[i].forward->obj,obj) < 0)))
161 x = x->level[i].forward;
162 update[i] = x;
163 }
164 /* We may have multiple elements with the same score, what we need
165 * is to find the element with both the right score and object. */
166 x = x->level[0].forward;
167 if (x && score == x->score && equalStringObjects(x->obj,obj)) {
168 zslDeleteNode(zsl, x, update);
169 zslFreeNode(x);
170 return 1;
171 } else {
172 return 0; /* not found */
173 }
174 return 0; /* not found */
175 }
176
/* Score interval where each bound can independently be inclusive or
 * exclusive. */
typedef struct {
    double min, max;
    int minex, maxex; /* are min or max exclusive? */
} zrangespec;

/* Non-zero when 'value' satisfies the lower bound of 'spec'. */
static int zslValueGteMin(double value, zrangespec *spec) {
    if (spec->minex) return value > spec->min;
    return value >= spec->min;
}

/* Non-zero when 'value' satisfies the upper bound of 'spec'. */
static int zslValueLteMax(double value, zrangespec *spec) {
    if (spec->maxex) return value < spec->max;
    return value <= spec->max;
}

/* Non-zero when 'value' satisfies both bounds of 'spec'. */
static int zslValueInRange(double value, zrangespec *spec) {
    if (!zslValueGteMin(value,spec)) return 0;
    return zslValueLteMax(value,spec);
}
194
195 /* Returns if there is a part of the zset is in range. */
196 int zslIsInRange(zskiplist *zsl, zrangespec *range) {
197 zskiplistNode *x;
198
199 /* Test for ranges that will always be empty. */
200 if (range->min > range->max ||
201 (range->min == range->max && (range->minex || range->maxex)))
202 return 0;
203 x = zsl->tail;
204 if (x == NULL || !zslValueGteMin(x->score,range))
205 return 0;
206 x = zsl->header->level[0].forward;
207 if (x == NULL || !zslValueLteMax(x->score,range))
208 return 0;
209 return 1;
210 }
211
212 /* Find the first node that is contained in the specified range.
213 * Returns NULL when no element is contained in the range. */
214 zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec range) {
215 zskiplistNode *x;
216 int i;
217
218 /* If everything is out of range, return early. */
219 if (!zslIsInRange(zsl,&range)) return NULL;
220
221 x = zsl->header;
222 for (i = zsl->level-1; i >= 0; i--) {
223 /* Go forward while *OUT* of range. */
224 while (x->level[i].forward &&
225 !zslValueGteMin(x->level[i].forward->score,&range))
226 x = x->level[i].forward;
227 }
228
229 /* The tail is in range, so the previous block should always return a
230 * node that is non-NULL and the last one to be out of range. */
231 x = x->level[0].forward;
232 redisAssert(x != NULL && zslValueInRange(x->score,&range));
233 return x;
234 }
235
236 /* Find the last node that is contained in the specified range.
237 * Returns NULL when no element is contained in the range. */
238 zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec range) {
239 zskiplistNode *x;
240 int i;
241
242 /* If everything is out of range, return early. */
243 if (!zslIsInRange(zsl,&range)) return NULL;
244
245 x = zsl->header;
246 for (i = zsl->level-1; i >= 0; i--) {
247 /* Go forward while *IN* range. */
248 while (x->level[i].forward &&
249 zslValueLteMax(x->level[i].forward->score,&range))
250 x = x->level[i].forward;
251 }
252
253 /* The header is in range, so the previous block should always return a
254 * node that is non-NULL and in range. */
255 redisAssert(x != NULL && zslValueInRange(x->score,&range));
256 return x;
257 }
258
259 /* Delete all the elements with score between min and max from the skiplist.
260 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
261 * Note that this function takes the reference to the hash table view of the
262 * sorted set, in order to remove the elements from the hash table too. */
263 unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec range, dict *dict) {
264 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
265 unsigned long removed = 0;
266 int i;
267
268 x = zsl->header;
269 for (i = zsl->level-1; i >= 0; i--) {
270 while (x->level[i].forward && (range.minex ?
271 x->level[i].forward->score <= range.min :
272 x->level[i].forward->score < range.min))
273 x = x->level[i].forward;
274 update[i] = x;
275 }
276
277 /* Current node is the last with score < or <= min. */
278 x = x->level[0].forward;
279
280 /* Delete nodes while in range. */
281 while (x && (range.maxex ? x->score < range.max : x->score <= range.max)) {
282 zskiplistNode *next = x->level[0].forward;
283 zslDeleteNode(zsl,x,update);
284 dictDelete(dict,x->obj);
285 zslFreeNode(x);
286 removed++;
287 x = next;
288 }
289 return removed;
290 }
291
292 /* Delete all the elements with rank between start and end from the skiplist.
293 * Start and end are inclusive. Note that start and end need to be 1-based */
294 unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict) {
295 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
296 unsigned long traversed = 0, removed = 0;
297 int i;
298
299 x = zsl->header;
300 for (i = zsl->level-1; i >= 0; i--) {
301 while (x->level[i].forward && (traversed + x->level[i].span) < start) {
302 traversed += x->level[i].span;
303 x = x->level[i].forward;
304 }
305 update[i] = x;
306 }
307
308 traversed++;
309 x = x->level[0].forward;
310 while (x && traversed <= end) {
311 zskiplistNode *next = x->level[0].forward;
312 zslDeleteNode(zsl,x,update);
313 dictDelete(dict,x->obj);
314 zslFreeNode(x);
315 removed++;
316 traversed++;
317 x = next;
318 }
319 return removed;
320 }
321
322 /* Find the rank for an element by both score and key.
323 * Returns 0 when the element cannot be found, rank otherwise.
324 * Note that the rank is 1-based due to the span of zsl->header to the
325 * first element. */
326 unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) {
327 zskiplistNode *x;
328 unsigned long rank = 0;
329 int i;
330
331 x = zsl->header;
332 for (i = zsl->level-1; i >= 0; i--) {
333 while (x->level[i].forward &&
334 (x->level[i].forward->score < score ||
335 (x->level[i].forward->score == score &&
336 compareStringObjects(x->level[i].forward->obj,o) <= 0))) {
337 rank += x->level[i].span;
338 x = x->level[i].forward;
339 }
340
341 /* x might be equal to zsl->header, so test if obj is non-NULL */
342 if (x->obj && equalStringObjects(x->obj,o)) {
343 return rank;
344 }
345 }
346 return 0;
347 }
348
349 /* Finds an element by its rank. The rank argument needs to be 1-based. */
350 zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
351 zskiplistNode *x;
352 unsigned long traversed = 0;
353 int i;
354
355 x = zsl->header;
356 for (i = zsl->level-1; i >= 0; i--) {
357 while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
358 {
359 traversed += x->level[i].span;
360 x = x->level[i].forward;
361 }
362 if (traversed == rank) {
363 return x;
364 }
365 }
366 return NULL;
367 }
368
369 /* Populate the rangespec according to the objects min and max. */
370 static int zslParseRange(robj *min, robj *max, zrangespec *spec) {
371 char *eptr;
372 spec->minex = spec->maxex = 0;
373
374 /* Parse the min-max interval. If one of the values is prefixed
375 * by the "(" character, it's considered "open". For instance
376 * ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max
377 * ZRANGEBYSCORE zset 1.5 2.5 will instead match min <= x <= max */
378 if (min->encoding == REDIS_ENCODING_INT) {
379 spec->min = (long)min->ptr;
380 } else {
381 if (((char*)min->ptr)[0] == '(') {
382 spec->min = strtod((char*)min->ptr+1,&eptr);
383 if (eptr[0] != '\0' || isnan(spec->min)) return REDIS_ERR;
384 spec->minex = 1;
385 } else {
386 spec->min = strtod((char*)min->ptr,&eptr);
387 if (eptr[0] != '\0' || isnan(spec->min)) return REDIS_ERR;
388 }
389 }
390 if (max->encoding == REDIS_ENCODING_INT) {
391 spec->max = (long)max->ptr;
392 } else {
393 if (((char*)max->ptr)[0] == '(') {
394 spec->max = strtod((char*)max->ptr+1,&eptr);
395 if (eptr[0] != '\0' || isnan(spec->max)) return REDIS_ERR;
396 spec->maxex = 1;
397 } else {
398 spec->max = strtod((char*)max->ptr,&eptr);
399 if (eptr[0] != '\0' || isnan(spec->max)) return REDIS_ERR;
400 }
401 }
402
403 return REDIS_OK;
404 }
405
406 /*-----------------------------------------------------------------------------
407 * Ziplist-backed sorted set API
408 *----------------------------------------------------------------------------*/
409
/* Return the score stored in the ziplist entry pointed to by 'sptr'.
 * String entries are copied into a local buffer, null terminated and parsed
 * with strtod(); integer entries are converted directly. */
double zzlGetScore(unsigned char *sptr) {
    unsigned char *vstr;
    unsigned int vlen;
    long long vlong;
    char buf[128];
    double score;

    redisAssert(sptr != NULL);
    redisAssert(ziplistGet(sptr,&vstr,&vlen,&vlong));

    if (vstr) {
        /* The copy below needs room for the terminator as well: refuse
         * entries that do not fit instead of writing past the buffer. */
        redisAssert(vlen < sizeof(buf));
        memcpy(buf,vstr,vlen);
        buf[vlen] = '\0';
        score = strtod(buf,NULL);
    } else {
        score = vlong;
    }

    return score;
}
430
/* Compare the ziplist entry at 'eptr' with the string 'cstr' of length
 * 'clen'. Integer entries are first converted to their string form. The
 * common prefix is compared with memcmp(); when it is equal the length
 * difference (entry length minus clen) decides. */
int zzlCompareElements(unsigned char *eptr, unsigned char *cstr, unsigned int clen) {
    unsigned char numbuf[32];
    unsigned char *vstr;
    unsigned int vlen;
    long long vlong;
    int shorter, cmp;

    redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
    if (vstr == NULL) {
        /* Store string representation of long long in numbuf. */
        vlen = ll2string((char*)numbuf,sizeof(numbuf),vlong);
        vstr = numbuf;
    }

    shorter = clen;
    if (vlen < clen) shorter = vlen;

    cmp = memcmp(vstr,cstr,shorter);
    if (cmp != 0) return cmp;
    return vlen-clen;
}
451
/* Number of (element,score) pairs in the ziplist: every pair takes two
 * ziplist entries. */
unsigned int zzlLength(unsigned char *zl) {
    unsigned int entries = ziplistLen(zl);

    return entries/2;
}
455
456 /* Move to next entry based on the values in eptr and sptr. Both are set to
457 * NULL when there is no next entry. */
458 void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
459 unsigned char *_eptr, *_sptr;
460 redisAssert(*eptr != NULL && *sptr != NULL);
461
462 _eptr = ziplistNext(zl,*sptr);
463 if (_eptr != NULL) {
464 _sptr = ziplistNext(zl,_eptr);
465 redisAssert(_sptr != NULL);
466 } else {
467 /* No next entry. */
468 _sptr = NULL;
469 }
470
471 *eptr = _eptr;
472 *sptr = _sptr;
473 }
474
475 /* Move to the previous entry based on the values in eptr and sptr. Both are
476 * set to NULL when there is no next entry. */
477 void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
478 unsigned char *_eptr, *_sptr;
479 redisAssert(*eptr != NULL && *sptr != NULL);
480
481 _sptr = ziplistPrev(zl,*eptr);
482 if (_sptr != NULL) {
483 _eptr = ziplistPrev(zl,_sptr);
484 redisAssert(_eptr != NULL);
485 } else {
486 /* No previous entry. */
487 _eptr = NULL;
488 }
489
490 *eptr = _eptr;
491 *sptr = _sptr;
492 }
493
494 /* Returns if there is a part of the zset is in range. Should only be used
495 * internally by zzlFirstInRange and zzlLastInRange. */
496 int zzlIsInRange(unsigned char *zl, zrangespec *range) {
497 unsigned char *p;
498 double score;
499
500 /* Test for ranges that will always be empty. */
501 if (range->min > range->max ||
502 (range->min == range->max && (range->minex || range->maxex)))
503 return 0;
504
505 p = ziplistIndex(zl,-1); /* Last score. */
506 redisAssert(p != NULL);
507 score = zzlGetScore(p);
508 if (!zslValueGteMin(score,range))
509 return 0;
510
511 p = ziplistIndex(zl,1); /* First score. */
512 redisAssert(p != NULL);
513 score = zzlGetScore(p);
514 if (!zslValueLteMax(score,range))
515 return 0;
516
517 return 1;
518 }
519
520 /* Find pointer to the first element contained in the specified range.
521 * Returns NULL when no element is contained in the range. */
522 unsigned char *zzlFirstInRange(unsigned char *zl, zrangespec range) {
523 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
524 double score;
525
526 /* If everything is out of range, return early. */
527 if (!zzlIsInRange(zl,&range)) return NULL;
528
529 while (eptr != NULL) {
530 sptr = ziplistNext(zl,eptr);
531 redisAssert(sptr != NULL);
532
533 score = zzlGetScore(sptr);
534 if (zslValueGteMin(score,&range))
535 return eptr;
536
537 /* Move to next element. */
538 eptr = ziplistNext(zl,sptr);
539 }
540
541 return NULL;
542 }
543
544 /* Find pointer to the last element contained in the specified range.
545 * Returns NULL when no element is contained in the range. */
546 unsigned char *zzlLastInRange(unsigned char *zl, zrangespec range) {
547 unsigned char *eptr = ziplistIndex(zl,-2), *sptr;
548 double score;
549
550 /* If everything is out of range, return early. */
551 if (!zzlIsInRange(zl,&range)) return NULL;
552
553 while (eptr != NULL) {
554 sptr = ziplistNext(zl,eptr);
555 redisAssert(sptr != NULL);
556
557 score = zzlGetScore(sptr);
558 if (zslValueLteMax(score,&range))
559 return eptr;
560
561 /* Move to previous element by moving to the score of previous element.
562 * When this returns NULL, we know there also is no element. */
563 sptr = ziplistPrev(zl,eptr);
564 if (sptr != NULL)
565 redisAssert((eptr = ziplistPrev(zl,sptr)) != NULL);
566 else
567 eptr = NULL;
568 }
569
570 return NULL;
571 }
572
573 unsigned char *zzlFind(unsigned char *zl, robj *ele, double *score) {
574 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
575
576 ele = getDecodedObject(ele);
577 while (eptr != NULL) {
578 sptr = ziplistNext(zl,eptr);
579 redisAssert(sptr != NULL);
580
581 if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr))) {
582 /* Matching element, pull out score. */
583 if (score != NULL) *score = zzlGetScore(sptr);
584 decrRefCount(ele);
585 return eptr;
586 }
587
588 /* Move to next element. */
589 eptr = ziplistNext(zl,sptr);
590 }
591
592 decrRefCount(ele);
593 return NULL;
594 }
595
/* Delete (element,score) pair from ziplist. A local copy of eptr is handed
 * to ziplistDelete() because the caller's pointer must not be modified.
 * Returns the ziplist pointer produced by the last deletion. */
unsigned char *zzlDelete(unsigned char *zl, unsigned char *eptr) {
    unsigned char *cursor = eptr;
    int entry;

    /* TODO: add function to ziplist API to delete N elements from offset. */
    for (entry = 0; entry < 2; entry++)
        zl = ziplistDelete(zl,&cursor);
    return zl;
}
606
607 unsigned char *zzlInsertAt(unsigned char *zl, unsigned char *eptr, robj *ele, double score) {
608 unsigned char *sptr;
609 char scorebuf[128];
610 int scorelen;
611 int offset;
612
613 redisAssert(ele->encoding == REDIS_ENCODING_RAW);
614 scorelen = d2string(scorebuf,sizeof(scorebuf),score);
615 if (eptr == NULL) {
616 zl = ziplistPush(zl,ele->ptr,sdslen(ele->ptr),ZIPLIST_TAIL);
617 zl = ziplistPush(zl,(unsigned char*)scorebuf,scorelen,ZIPLIST_TAIL);
618 } else {
619 /* Keep offset relative to zl, as it might be re-allocated. */
620 offset = eptr-zl;
621 zl = ziplistInsert(zl,eptr,ele->ptr,sdslen(ele->ptr));
622 eptr = zl+offset;
623
624 /* Insert score after the element. */
625 redisAssert((sptr = ziplistNext(zl,eptr)) != NULL);
626 zl = ziplistInsert(zl,sptr,(unsigned char*)scorebuf,scorelen);
627 }
628
629 return zl;
630 }
631
632 /* Insert (element,score) pair in ziplist. This function assumes the element is
633 * not yet present in the list. */
634 unsigned char *zzlInsert(unsigned char *zl, robj *ele, double score) {
635 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
636 double s;
637
638 ele = getDecodedObject(ele);
639 while (eptr != NULL) {
640 sptr = ziplistNext(zl,eptr);
641 redisAssert(sptr != NULL);
642 s = zzlGetScore(sptr);
643
644 if (s > score) {
645 /* First element with score larger than score for element to be
646 * inserted. This means we should take its spot in the list to
647 * maintain ordering. */
648 zl = zzlInsertAt(zl,eptr,ele,score);
649 break;
650 } else if (s == score) {
651 /* Ensure lexicographical ordering for elements. */
652 if (zzlCompareElements(eptr,ele->ptr,sdslen(ele->ptr)) > 0) {
653 zl = zzlInsertAt(zl,eptr,ele,score);
654 break;
655 }
656 }
657
658 /* Move to next element. */
659 eptr = ziplistNext(zl,sptr);
660 }
661
662 /* Push on tail of list when it was not yet inserted. */
663 if (eptr == NULL)
664 zl = zzlInsertAt(zl,NULL,ele,score);
665
666 decrRefCount(ele);
667 return zl;
668 }
669
670 unsigned char *zzlDeleteRangeByScore(unsigned char *zl, zrangespec range, unsigned long *deleted) {
671 unsigned char *eptr, *sptr;
672 double score;
673 unsigned long num = 0;
674
675 if (deleted != NULL) *deleted = 0;
676
677 eptr = zzlFirstInRange(zl,range);
678 if (eptr == NULL) return zl;
679
680 /* When the tail of the ziplist is deleted, eptr will point to the sentinel
681 * byte and ziplistNext will return NULL. */
682 while ((sptr = ziplistNext(zl,eptr)) != NULL) {
683 score = zzlGetScore(sptr);
684 if (zslValueLteMax(score,&range)) {
685 /* Delete both the element and the score. */
686 zl = ziplistDelete(zl,&eptr);
687 zl = ziplistDelete(zl,&eptr);
688 num++;
689 } else {
690 /* No longer in range. */
691 break;
692 }
693 }
694
695 if (deleted != NULL) *deleted = num;
696 return zl;
697 }
698
/* Delete all the elements with rank between start and end from the
 * ziplist. Start and end are inclusive and need to be 1-based. When
 * 'deleted' is not NULL it receives (end-start)+1. Returns the ziplist
 * produced by ziplistDeleteRange(). */
unsigned char *zzlDeleteRangeByRank(unsigned char *zl, unsigned int start, unsigned int end, unsigned long *deleted) {
    unsigned int pairs = (end-start)+1;

    if (deleted) *deleted = pairs;

    /* Each pair takes two ziplist entries. */
    return ziplistDeleteRange(zl,2*(start-1),2*pairs);
}
707
708 /*-----------------------------------------------------------------------------
709 * Common sorted set API
710 *----------------------------------------------------------------------------*/
711
712 unsigned int zsetLength(robj *zobj) {
713 int length = -1;
714 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
715 length = zzlLength(zobj->ptr);
716 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
717 length = ((zset*)zobj->ptr)->zsl->length;
718 } else {
719 redisPanic("Unknown sorted set encoding");
720 }
721 return length;
722 }
723
724 void zsetConvert(robj *zobj, int encoding) {
725 zset *zs;
726 zskiplistNode *node, *next;
727 robj *ele;
728 double score;
729
730 if (zobj->encoding == encoding) return;
731 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
732 unsigned char *zl = zobj->ptr;
733 unsigned char *eptr, *sptr;
734 unsigned char *vstr;
735 unsigned int vlen;
736 long long vlong;
737
738 if (encoding != REDIS_ENCODING_RAW)
739 redisPanic("Unknown target encoding");
740
741 zs = zmalloc(sizeof(*zs));
742 zs->dict = dictCreate(&zsetDictType,NULL);
743 zs->zsl = zslCreate();
744
745 eptr = ziplistIndex(zl,0);
746 redisAssert(eptr != NULL);
747 sptr = ziplistNext(zl,eptr);
748 redisAssert(sptr != NULL);
749
750 while (eptr != NULL) {
751 score = zzlGetScore(sptr);
752 redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
753 if (vstr == NULL)
754 ele = createStringObjectFromLongLong(vlong);
755 else
756 ele = createStringObject((char*)vstr,vlen);
757
758 /* Has incremented refcount since it was just created. */
759 node = zslInsert(zs->zsl,score,ele);
760 redisAssert(dictAdd(zs->dict,ele,&node->score) == DICT_OK);
761 incrRefCount(ele); /* Added to dictionary. */
762 zzlNext(zl,&eptr,&sptr);
763 }
764
765 zfree(zobj->ptr);
766 zobj->ptr = zs;
767 zobj->encoding = REDIS_ENCODING_RAW;
768 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
769 unsigned char *zl = ziplistNew();
770
771 if (encoding != REDIS_ENCODING_ZIPLIST)
772 redisPanic("Unknown target encoding");
773
774 /* Approach similar to zslFree(), since we want to free the skiplist at
775 * the same time as creating the ziplist. */
776 zs = zobj->ptr;
777 dictRelease(zs->dict);
778 node = zs->zsl->header->level[0].forward;
779 zfree(zs->zsl->header);
780 zfree(zs->zsl);
781
782 while (node) {
783 ele = getDecodedObject(node->obj);
784 zl = zzlInsertAt(zl,NULL,ele,node->score);
785 decrRefCount(ele);
786
787 next = node->level[0].forward;
788 zslFreeNode(node);
789 node = next;
790 }
791
792 zfree(zs);
793 zobj->ptr = zl;
794 zobj->encoding = REDIS_ENCODING_ZIPLIST;
795 } else {
796 redisPanic("Unknown sorted set encoding");
797 }
798 }
799
800 /*-----------------------------------------------------------------------------
801 * Sorted set commands
802 *----------------------------------------------------------------------------*/
803
804 /* This generic command implements both ZADD and ZINCRBY. */
805 void zaddGenericCommand(redisClient *c, int incr) {
806 static char *nanerr = "resulting score is not a number (NaN)";
807 robj *key = c->argv[1];
808 robj *ele;
809 robj *zobj;
810 robj *curobj;
811 double score, curscore = 0.0;
812
813 if (getDoubleFromObjectOrReply(c,c->argv[2],&score,NULL) != REDIS_OK)
814 return;
815
816 zobj = lookupKeyWrite(c->db,key);
817 if (zobj == NULL) {
818 if (server.zset_max_ziplist_entries == 0 ||
819 server.zset_max_ziplist_value < sdslen(c->argv[3]->ptr))
820 {
821 zobj = createZsetObject();
822 } else {
823 zobj = createZsetZiplistObject();
824 }
825 dbAdd(c->db,key,zobj);
826 } else {
827 if (zobj->type != REDIS_ZSET) {
828 addReply(c,shared.wrongtypeerr);
829 return;
830 }
831 }
832
833 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
834 unsigned char *eptr;
835
836 /* Prefer non-encoded element when dealing with ziplists. */
837 ele = c->argv[3];
838 if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
839 if (incr) {
840 score += curscore;
841 if (isnan(score)) {
842 addReplyError(c,nanerr);
843 /* Don't need to check if the sorted set is empty, because
844 * we know it has at least one element. */
845 return;
846 }
847 }
848
849 /* Remove and re-insert when score changed. */
850 if (score != curscore) {
851 zobj->ptr = zzlDelete(zobj->ptr,eptr);
852 zobj->ptr = zzlInsert(zobj->ptr,ele,score);
853
854 signalModifiedKey(c->db,key);
855 server.dirty++;
856 }
857
858 if (incr) /* ZINCRBY */
859 addReplyDouble(c,score);
860 else /* ZADD */
861 addReply(c,shared.czero);
862 } else {
863 /* Optimize: check if the element is too large or the list becomes
864 * too long *before* executing zzlInsert. */
865 zobj->ptr = zzlInsert(zobj->ptr,ele,score);
866 if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries)
867 zsetConvert(zobj,REDIS_ENCODING_RAW);
868 if (sdslen(ele->ptr) > server.zset_max_ziplist_value)
869 zsetConvert(zobj,REDIS_ENCODING_RAW);
870
871 signalModifiedKey(c->db,key);
872 server.dirty++;
873
874 if (incr) /* ZINCRBY */
875 addReplyDouble(c,score);
876 else /* ZADD */
877 addReply(c,shared.cone);
878 }
879 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
880 zset *zs = zobj->ptr;
881 zskiplistNode *znode;
882 dictEntry *de;
883
884 ele = c->argv[3] = tryObjectEncoding(c->argv[3]);
885 de = dictFind(zs->dict,ele);
886 if (de != NULL) {
887 curobj = dictGetEntryKey(de);
888 curscore = *(double*)dictGetEntryVal(de);
889
890 if (incr) {
891 score += curscore;
892 if (isnan(score)) {
893 addReplyError(c,nanerr);
894 /* Don't need to check if the sorted set is empty, because
895 * we know it has at least one element. */
896 return;
897 }
898 }
899
900 /* Remove and re-insert when score changed. We can safely delete
901 * the key object from the skiplist, since the dictionary still has
902 * a reference to it. */
903 if (score != curscore) {
904 redisAssert(zslDelete(zs->zsl,curscore,curobj));
905 znode = zslInsert(zs->zsl,score,curobj);
906 incrRefCount(curobj); /* Re-inserted in skiplist. */
907 dictGetEntryVal(de) = &znode->score; /* Update score ptr. */
908
909 signalModifiedKey(c->db,key);
910 server.dirty++;
911 }
912
913 if (incr) /* ZINCRBY */
914 addReplyDouble(c,score);
915 else /* ZADD */
916 addReply(c,shared.czero);
917 } else {
918 znode = zslInsert(zs->zsl,score,ele);
919 incrRefCount(ele); /* Inserted in skiplist. */
920 redisAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
921 incrRefCount(ele); /* Added to dictionary. */
922
923 signalModifiedKey(c->db,key);
924 server.dirty++;
925
926 if (incr) /* ZINCRBY */
927 addReplyDouble(c,score);
928 else /* ZADD */
929 addReply(c,shared.cone);
930 }
931 } else {
932 redisPanic("Unknown sorted set encoding");
933 }
934 }
935
936 void zaddCommand(redisClient *c) {
937 zaddGenericCommand(c,0);
938 }
939
940 void zincrbyCommand(redisClient *c) {
941 zaddGenericCommand(c,1);
942 }
943
944 void zremCommand(redisClient *c) {
945 robj *key = c->argv[1];
946 robj *ele = c->argv[2];
947 robj *zobj;
948
949 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
950 checkType(c,zobj,REDIS_ZSET)) return;
951
952 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
953 unsigned char *eptr;
954
955 if ((eptr = zzlFind(zobj->ptr,ele,NULL)) != NULL) {
956 zobj->ptr = zzlDelete(zobj->ptr,eptr);
957 if (zzlLength(zobj->ptr) == 0) dbDelete(c->db,key);
958 } else {
959 addReply(c,shared.czero);
960 return;
961 }
962 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
963 zset *zs = zobj->ptr;
964 dictEntry *de;
965 double score;
966
967 de = dictFind(zs->dict,ele);
968 if (de != NULL) {
969 /* Delete from the skiplist */
970 score = *(double*)dictGetEntryVal(de);
971 redisAssert(zslDelete(zs->zsl,score,ele));
972
973 /* Delete from the hash table */
974 dictDelete(zs->dict,ele);
975 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
976 if (dictSize(zs->dict) == 0) dbDelete(c->db,key);
977 } else {
978 addReply(c,shared.czero);
979 return;
980 }
981 } else {
982 redisPanic("Unknown sorted set encoding");
983 }
984
985 signalModifiedKey(c->db,key);
986 server.dirty++;
987 addReply(c,shared.cone);
988 }
989
990 void zremrangebyscoreCommand(redisClient *c) {
991 robj *key = c->argv[1];
992 robj *zobj;
993 zrangespec range;
994 unsigned long deleted;
995
996 /* Parse the range arguments. */
997 if (zslParseRange(c->argv[2],c->argv[3],&range) != REDIS_OK) {
998 addReplyError(c,"min or max is not a double");
999 return;
1000 }
1001
1002 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
1003 checkType(c,zobj,REDIS_ZSET)) return;
1004
1005 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1006 zobj->ptr = zzlDeleteRangeByScore(zobj->ptr,range,&deleted);
1007 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
1008 zset *zs = zobj->ptr;
1009 deleted = zslDeleteRangeByScore(zs->zsl,range,zs->dict);
1010 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
1011 if (dictSize(zs->dict) == 0) dbDelete(c->db,key);
1012 } else {
1013 redisPanic("Unknown sorted set encoding");
1014 }
1015
1016 if (deleted) signalModifiedKey(c->db,key);
1017 server.dirty += deleted;
1018 addReplyLongLong(c,deleted);
1019 }
1020
1021 void zremrangebyrankCommand(redisClient *c) {
1022 robj *key = c->argv[1];
1023 robj *zobj;
1024 long start;
1025 long end;
1026 int llen;
1027 unsigned long deleted;
1028
1029 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
1030 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
1031
1032 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
1033 checkType(c,zobj,REDIS_ZSET)) return;
1034
1035 /* Sanitize indexes. */
1036 llen = zsetLength(zobj);
1037 if (start < 0) start = llen+start;
1038 if (end < 0) end = llen+end;
1039 if (start < 0) start = 0;
1040
1041 /* Invariant: start >= 0, so this test will be true when end < 0.
1042 * The range is empty when start > end or start >= length. */
1043 if (start > end || start >= llen) {
1044 addReply(c,shared.czero);
1045 return;
1046 }
1047 if (end >= llen) end = llen-1;
1048
1049 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1050 /* Correct for 1-based rank. */
1051 zobj->ptr = zzlDeleteRangeByRank(zobj->ptr,start+1,end+1,&deleted);
1052 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
1053 zset *zs = zobj->ptr;
1054
1055 /* Correct for 1-based rank. */
1056 deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);
1057 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
1058 if (dictSize(zs->dict) == 0) dbDelete(c->db,key);
1059 } else {
1060 redisPanic("Unknown sorted set encoding");
1061 }
1062
1063 if (deleted) signalModifiedKey(c->db,key);
1064 server.dirty += deleted;
1065 addReplyLongLong(c,deleted);
1066 }
1067
1068 typedef struct {
1069 robj *subject;
1070 int type; /* Set, sorted set */
1071 int encoding;
1072 double weight;
1073
1074 union {
1075 /* Set iterators. */
1076 union _iterset {
1077 struct {
1078 intset *is;
1079 int ii;
1080 } is;
1081 struct {
1082 dict *dict;
1083 dictIterator *di;
1084 dictEntry *de;
1085 } ht;
1086 } set;
1087
1088 /* Sorted set iterators. */
1089 union _iterzset {
1090 struct {
1091 unsigned char *zl;
1092 unsigned char *eptr, *sptr;
1093 } zl;
1094 struct {
1095 zset *zs;
1096 zskiplistNode *node;
1097 } sl;
1098 } zset;
1099 } iter;
1100 } zsetopsrc;
1101
1102
/* Flags kept in zsetopval.flags.
 *
 * OPVAL_DIRTY_ROBJ marks a pointer that has to be cleaned up on the next
 * iteration over the zsetopval.
 *
 * OPVAL_DIRTY_LL is different because a long long needs no cleanup: it
 * records that "ell" was already checked to hold a long long, or that a
 * conversion from another representation was already attempted.
 * OPVAL_VALID_LL is set in addition when that check or conversion
 * succeeded. */
#define OPVAL_DIRTY_ROBJ (1<<0)
#define OPVAL_DIRTY_LL   (1<<1)
#define OPVAL_VALID_LL   (1<<2)
1112
1113 /* Store value retrieved from the iterator. */
1114 typedef struct {
1115 int flags;
1116 unsigned char _buf[32]; /* Private buffer. */
1117 robj *ele;
1118 unsigned char *estr;
1119 unsigned int elen;
1120 long long ell;
1121 double score;
1122 } zsetopval;
1123
/* Short names for the iterator unions declared inside zsetopsrc. */
typedef union _iterset iterset;
typedef union _iterzset iterzset;
1126
1127 void zuiInitIterator(zsetopsrc *op) {
1128 if (op->subject == NULL)
1129 return;
1130
1131 if (op->type == REDIS_SET) {
1132 iterset *it = &op->iter.set;
1133 if (op->encoding == REDIS_ENCODING_INTSET) {
1134 it->is.is = op->subject->ptr;
1135 it->is.ii = 0;
1136 } else if (op->encoding == REDIS_ENCODING_HT) {
1137 it->ht.dict = op->subject->ptr;
1138 it->ht.di = dictGetIterator(op->subject->ptr);
1139 it->ht.de = dictNext(it->ht.di);
1140 } else {
1141 redisPanic("Unknown set encoding");
1142 }
1143 } else if (op->type == REDIS_ZSET) {
1144 iterzset *it = &op->iter.zset;
1145 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1146 it->zl.zl = op->subject->ptr;
1147 it->zl.eptr = ziplistIndex(it->zl.zl,0);
1148 if (it->zl.eptr != NULL) {
1149 it->zl.sptr = ziplistNext(it->zl.zl,it->zl.eptr);
1150 redisAssert(it->zl.sptr != NULL);
1151 }
1152 } else if (op->encoding == REDIS_ENCODING_RAW) {
1153 it->sl.zs = op->subject->ptr;
1154 it->sl.node = it->sl.zs->zsl->header->level[0].forward;
1155 } else {
1156 redisPanic("Unknown sorted set encoding");
1157 }
1158 } else {
1159 redisPanic("Unsupported type");
1160 }
1161 }
1162
1163 void zuiClearIterator(zsetopsrc *op) {
1164 if (op->subject == NULL)
1165 return;
1166
1167 if (op->type == REDIS_SET) {
1168 iterset *it = &op->iter.set;
1169 if (op->encoding == REDIS_ENCODING_INTSET) {
1170 REDIS_NOTUSED(it); /* skip */
1171 } else if (op->encoding == REDIS_ENCODING_HT) {
1172 dictReleaseIterator(it->ht.di);
1173 } else {
1174 redisPanic("Unknown set encoding");
1175 }
1176 } else if (op->type == REDIS_ZSET) {
1177 iterzset *it = &op->iter.zset;
1178 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1179 REDIS_NOTUSED(it); /* skip */
1180 } else if (op->encoding == REDIS_ENCODING_RAW) {
1181 REDIS_NOTUSED(it); /* skip */
1182 } else {
1183 redisPanic("Unknown sorted set encoding");
1184 }
1185 } else {
1186 redisPanic("Unsupported type");
1187 }
1188 }
1189
1190 int zuiLength(zsetopsrc *op) {
1191 if (op->subject == NULL)
1192 return 0;
1193
1194 if (op->type == REDIS_SET) {
1195 iterset *it = &op->iter.set;
1196 if (op->encoding == REDIS_ENCODING_INTSET) {
1197 return intsetLen(it->is.is);
1198 } else if (op->encoding == REDIS_ENCODING_HT) {
1199 return dictSize(it->ht.dict);
1200 } else {
1201 redisPanic("Unknown set encoding");
1202 }
1203 } else if (op->type == REDIS_ZSET) {
1204 iterzset *it = &op->iter.zset;
1205 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1206 return zzlLength(it->zl.zl);
1207 } else if (op->encoding == REDIS_ENCODING_RAW) {
1208 return it->sl.zs->zsl->length;
1209 } else {
1210 redisPanic("Unknown sorted set encoding");
1211 }
1212 } else {
1213 redisPanic("Unsupported type");
1214 }
1215 }
1216
1217 /* Check if the current value is valid. If so, store it in the passed structure
1218 * and move to the next element. If not valid, this means we have reached the
1219 * end of the structure and can abort. */
1220 int zuiNext(zsetopsrc *op, zsetopval *val) {
1221 if (op->subject == NULL)
1222 return 0;
1223
1224 if (val->flags & OPVAL_DIRTY_ROBJ)
1225 decrRefCount(val->ele);
1226
1227 bzero(val,sizeof(zsetopval));
1228
1229 if (op->type == REDIS_SET) {
1230 iterset *it = &op->iter.set;
1231 if (op->encoding == REDIS_ENCODING_INTSET) {
1232 if (!intsetGet(it->is.is,it->is.ii,&val->ell))
1233 return 0;
1234 val->score = 1.0;
1235
1236 /* Move to next element. */
1237 it->is.ii++;
1238 } else if (op->encoding == REDIS_ENCODING_HT) {
1239 if (it->ht.de == NULL)
1240 return 0;
1241 val->ele = dictGetEntryKey(it->ht.de);
1242 val->score = 1.0;
1243
1244 /* Move to next element. */
1245 it->ht.de = dictNext(it->ht.di);
1246 } else {
1247 redisPanic("Unknown set encoding");
1248 }
1249 } else if (op->type == REDIS_ZSET) {
1250 iterzset *it = &op->iter.zset;
1251 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1252 /* No need to check both, but better be explicit. */
1253 if (it->zl.eptr == NULL || it->zl.sptr == NULL)
1254 return 0;
1255 redisAssert(ziplistGet(it->zl.eptr,&val->estr,&val->elen,&val->ell));
1256 val->score = zzlGetScore(it->zl.sptr);
1257
1258 /* Move to next element. */
1259 zzlNext(it->zl.zl,&it->zl.eptr,&it->zl.sptr);
1260 } else if (op->encoding == REDIS_ENCODING_RAW) {
1261 if (it->sl.node == NULL)
1262 return 0;
1263 val->ele = it->sl.node->obj;
1264 val->score = it->sl.node->score;
1265
1266 /* Move to next element. */
1267 it->sl.node = it->sl.node->level[0].forward;
1268 } else {
1269 redisPanic("Unknown sorted set encoding");
1270 }
1271 } else {
1272 redisPanic("Unsupported type");
1273 }
1274 return 1;
1275 }
1276
1277 int zuiLongLongFromValue(zsetopval *val) {
1278 if (!(val->flags & OPVAL_DIRTY_LL)) {
1279 val->flags |= OPVAL_DIRTY_LL;
1280
1281 if (val->ele != NULL) {
1282 if (val->ele->encoding == REDIS_ENCODING_INT) {
1283 val->ell = (long)val->ele->ptr;
1284 val->flags |= OPVAL_VALID_LL;
1285 } else if (val->ele->encoding == REDIS_ENCODING_RAW) {
1286 if (string2ll(val->ele->ptr,sdslen(val->ele->ptr),&val->ell))
1287 val->flags |= OPVAL_VALID_LL;
1288 } else {
1289 redisPanic("Unsupported element encoding");
1290 }
1291 } else if (val->estr != NULL) {
1292 if (string2ll((char*)val->estr,val->elen,&val->ell))
1293 val->flags |= OPVAL_VALID_LL;
1294 } else {
1295 /* The long long was already set, flag as valid. */
1296 val->flags |= OPVAL_VALID_LL;
1297 }
1298 }
1299 return val->flags & OPVAL_VALID_LL;
1300 }
1301
1302 robj *zuiObjectFromValue(zsetopval *val) {
1303 if (val->ele == NULL) {
1304 if (val->estr != NULL) {
1305 val->ele = createStringObject((char*)val->estr,val->elen);
1306 } else {
1307 val->ele = createStringObjectFromLongLong(val->ell);
1308 }
1309 val->flags |= OPVAL_DIRTY_ROBJ;
1310 }
1311 return val->ele;
1312 }
1313
1314 int zuiBufferFromValue(zsetopval *val) {
1315 if (val->estr == NULL) {
1316 if (val->ele != NULL) {
1317 if (val->ele->encoding == REDIS_ENCODING_INT) {
1318 val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),(long)val->ele->ptr);
1319 val->estr = val->_buf;
1320 } else if (val->ele->encoding == REDIS_ENCODING_RAW) {
1321 val->elen = sdslen(val->ele->ptr);
1322 val->estr = val->ele->ptr;
1323 } else {
1324 redisPanic("Unsupported element encoding");
1325 }
1326 } else {
1327 val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),val->ell);
1328 val->estr = val->_buf;
1329 }
1330 }
1331 return 1;
1332 }
1333
1334 /* Find value pointed to by val in the source pointer to by op. When found,
1335 * return 1 and store its score in target. Return 0 otherwise. */
1336 int zuiFind(zsetopsrc *op, zsetopval *val, double *score) {
1337 if (op->subject == NULL)
1338 return 0;
1339
1340 if (op->type == REDIS_SET) {
1341 iterset *it = &op->iter.set;
1342
1343 if (op->encoding == REDIS_ENCODING_INTSET) {
1344 if (zuiLongLongFromValue(val) && intsetFind(it->is.is,val->ell)) {
1345 *score = 1.0;
1346 return 1;
1347 } else {
1348 return 0;
1349 }
1350 } else if (op->encoding == REDIS_ENCODING_HT) {
1351 zuiObjectFromValue(val);
1352 if (dictFind(it->ht.dict,val->ele) != NULL) {
1353 *score = 1.0;
1354 return 1;
1355 } else {
1356 return 0;
1357 }
1358 } else {
1359 redisPanic("Unknown set encoding");
1360 }
1361 } else if (op->type == REDIS_ZSET) {
1362 iterzset *it = &op->iter.zset;
1363 zuiObjectFromValue(val);
1364
1365 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1366 if (zzlFind(it->zl.zl,val->ele,score) != NULL) {
1367 /* Score is already set by zzlFind. */
1368 return 1;
1369 } else {
1370 return 0;
1371 }
1372 } else if (op->encoding == REDIS_ENCODING_RAW) {
1373 dictEntry *de;
1374 if ((de = dictFind(it->sl.zs->dict,val->ele)) != NULL) {
1375 *score = *(double*)dictGetEntryVal(de);
1376 return 1;
1377 } else {
1378 return 0;
1379 }
1380 } else {
1381 redisPanic("Unknown sorted set encoding");
1382 }
1383 } else {
1384 redisPanic("Unsupported type");
1385 }
1386 }
1387
1388 int zuiCompareByCardinality(const void *s1, const void *s2) {
1389 return zuiLength((zsetopsrc*)s1) - zuiLength((zsetopsrc*)s2);
1390 }
1391
/* How the scores of an element present in several inputs are combined. */
#define REDIS_AGGR_SUM 1
#define REDIS_AGGR_MIN 2
#define REDIS_AGGR_MAX 3

/* Score held by a dictionary entry: the double its value points to, or 1.0
 * when the entry has no value. */
#define zunionInterDictValue(_e) \
    (dictGetEntryVal(_e) == NULL ? 1.0 : *(double*)dictGetEntryVal(_e))
1396
1397 inline static void zunionInterAggregate(double *target, double val, int aggregate) {
1398 if (aggregate == REDIS_AGGR_SUM) {
1399 *target = *target + val;
1400 /* The result of adding two doubles is NaN when one variable
1401 * is +inf and the other is -inf. When these numbers are added,
1402 * we maintain the convention of the result being 0.0. */
1403 if (isnan(*target)) *target = 0.0;
1404 } else if (aggregate == REDIS_AGGR_MIN) {
1405 *target = val < *target ? val : *target;
1406 } else if (aggregate == REDIS_AGGR_MAX) {
1407 *target = val > *target ? val : *target;
1408 } else {
1409 /* safety net */
1410 redisPanic("Unknown ZUNION/INTER aggregate type");
1411 }
1412 }
1413
1414 void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
1415 int i, j, setnum;
1416 int aggregate = REDIS_AGGR_SUM;
1417 zsetopsrc *src;
1418 zsetopval zval;
1419 robj *tmp;
1420 unsigned int maxelelen = 0;
1421 robj *dstobj;
1422 zset *dstzset;
1423 zskiplistNode *znode;
1424 int touched = 0;
1425
1426 /* expect setnum input keys to be given */
1427 setnum = atoi(c->argv[2]->ptr);
1428 if (setnum < 1) {
1429 addReplyError(c,
1430 "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE");
1431 return;
1432 }
1433
1434 /* test if the expected number of keys would overflow */
1435 if (3+setnum > c->argc) {
1436 addReply(c,shared.syntaxerr);
1437 return;
1438 }
1439
1440 /* read keys to be used for input */
1441 src = zcalloc(sizeof(zsetopsrc) * setnum);
1442 for (i = 0, j = 3; i < setnum; i++, j++) {
1443 robj *obj = lookupKeyWrite(c->db,c->argv[j]);
1444 if (obj != NULL) {
1445 if (obj->type != REDIS_ZSET && obj->type != REDIS_SET) {
1446 zfree(src);
1447 addReply(c,shared.wrongtypeerr);
1448 return;
1449 }
1450
1451 src[i].subject = obj;
1452 src[i].type = obj->type;
1453 src[i].encoding = obj->encoding;
1454 } else {
1455 src[i].subject = NULL;
1456 }
1457
1458 /* Default all weights to 1. */
1459 src[i].weight = 1.0;
1460 }
1461
1462 /* parse optional extra arguments */
1463 if (j < c->argc) {
1464 int remaining = c->argc - j;
1465
1466 while (remaining) {
1467 if (remaining >= (setnum + 1) && !strcasecmp(c->argv[j]->ptr,"weights")) {
1468 j++; remaining--;
1469 for (i = 0; i < setnum; i++, j++, remaining--) {
1470 if (getDoubleFromObjectOrReply(c,c->argv[j],&src[i].weight,
1471 "weight value is not a double") != REDIS_OK)
1472 {
1473 zfree(src);
1474 return;
1475 }
1476 }
1477 } else if (remaining >= 2 && !strcasecmp(c->argv[j]->ptr,"aggregate")) {
1478 j++; remaining--;
1479 if (!strcasecmp(c->argv[j]->ptr,"sum")) {
1480 aggregate = REDIS_AGGR_SUM;
1481 } else if (!strcasecmp(c->argv[j]->ptr,"min")) {
1482 aggregate = REDIS_AGGR_MIN;
1483 } else if (!strcasecmp(c->argv[j]->ptr,"max")) {
1484 aggregate = REDIS_AGGR_MAX;
1485 } else {
1486 zfree(src);
1487 addReply(c,shared.syntaxerr);
1488 return;
1489 }
1490 j++; remaining--;
1491 } else {
1492 zfree(src);
1493 addReply(c,shared.syntaxerr);
1494 return;
1495 }
1496 }
1497 }
1498
1499 for (i = 0; i < setnum; i++)
1500 zuiInitIterator(&src[i]);
1501
1502 /* sort sets from the smallest to largest, this will improve our
1503 * algorithm's performance */
1504 qsort(src,setnum,sizeof(zsetopsrc),zuiCompareByCardinality);
1505
1506 dstobj = createZsetObject();
1507 dstzset = dstobj->ptr;
1508
1509 if (op == REDIS_OP_INTER) {
1510 /* Skip everything if the smallest input is empty. */
1511 if (zuiLength(&src[0]) > 0) {
1512 /* Precondition: as src[0] is non-empty and the inputs are ordered
1513 * by size, all src[i > 0] are non-empty too. */
1514 while (zuiNext(&src[0],&zval)) {
1515 double score, value;
1516
1517 score = src[0].weight * zval.score;
1518 for (j = 1; j < setnum; j++) {
1519 if (zuiFind(&src[j],&zval,&value)) {
1520 value *= src[j].weight;
1521 zunionInterAggregate(&score,value,aggregate);
1522 } else {
1523 break;
1524 }
1525 }
1526
1527 /* Only continue when present in every input. */
1528 if (j == setnum) {
1529 tmp = zuiObjectFromValue(&zval);
1530 znode = zslInsert(dstzset->zsl,score,tmp);
1531 incrRefCount(tmp); /* added to skiplist */
1532 dictAdd(dstzset->dict,tmp,&znode->score);
1533 incrRefCount(tmp); /* added to dictionary */
1534
1535 if (tmp->encoding == REDIS_ENCODING_RAW)
1536 if (sdslen(tmp->ptr) > maxelelen)
1537 maxelelen = sdslen(tmp->ptr);
1538 }
1539 }
1540 }
1541 } else if (op == REDIS_OP_UNION) {
1542 for (i = 0; i < setnum; i++) {
1543 if (zuiLength(&src[0]) == 0)
1544 continue;
1545
1546 while (zuiNext(&src[i],&zval)) {
1547 double score, value;
1548
1549 /* Skip key when already processed */
1550 if (dictFind(dstzset->dict,zuiObjectFromValue(&zval)) != NULL)
1551 continue;
1552
1553 /* Initialize score */
1554 score = src[i].weight * zval.score;
1555
1556 /* Because the inputs are sorted by size, it's only possible
1557 * for sets at larger indices to hold this element. */
1558 for (j = (i+1); j < setnum; j++) {
1559 if (zuiFind(&src[j],&zval,&value)) {
1560 value *= src[j].weight;
1561 zunionInterAggregate(&score,value,aggregate);
1562 }
1563 }
1564
1565 tmp = zuiObjectFromValue(&zval);
1566 znode = zslInsert(dstzset->zsl,score,tmp);
1567 incrRefCount(zval.ele); /* added to skiplist */
1568 dictAdd(dstzset->dict,tmp,&znode->score);
1569 incrRefCount(zval.ele); /* added to dictionary */
1570
1571 if (tmp->encoding == REDIS_ENCODING_RAW)
1572 if (sdslen(tmp->ptr) > maxelelen)
1573 maxelelen = sdslen(tmp->ptr);
1574 }
1575 }
1576 } else {
1577 redisPanic("Unknown operator");
1578 }
1579
1580 for (i = 0; i < setnum; i++)
1581 zuiClearIterator(&src[i]);
1582
1583 if (dbDelete(c->db,dstkey)) {
1584 signalModifiedKey(c->db,dstkey);
1585 touched = 1;
1586 server.dirty++;
1587 }
1588 if (dstzset->zsl->length) {
1589 /* Convert to ziplist when in limits. */
1590 if (dstzset->zsl->length <= server.zset_max_ziplist_entries &&
1591 maxelelen <= server.zset_max_ziplist_value)
1592 zsetConvert(dstobj,REDIS_ENCODING_ZIPLIST);
1593
1594 dbAdd(c->db,dstkey,dstobj);
1595 addReplyLongLong(c,zsetLength(dstobj));
1596 if (!touched) signalModifiedKey(c->db,dstkey);
1597 server.dirty++;
1598 } else {
1599 decrRefCount(dstobj);
1600 addReply(c,shared.czero);
1601 }
1602 zfree(src);
1603 }
1604
1605 void zunionstoreCommand(redisClient *c) {
1606 zunionInterGenericCommand(c,c->argv[1], REDIS_OP_UNION);
1607 }
1608
1609 void zinterstoreCommand(redisClient *c) {
1610 zunionInterGenericCommand(c,c->argv[1], REDIS_OP_INTER);
1611 }
1612
1613 void zrangeGenericCommand(redisClient *c, int reverse) {
1614 robj *key = c->argv[1];
1615 robj *zobj;
1616 int withscores = 0;
1617 long start;
1618 long end;
1619 int llen;
1620 int rangelen;
1621
1622 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
1623 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
1624
1625 if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
1626 withscores = 1;
1627 } else if (c->argc >= 5) {
1628 addReply(c,shared.syntaxerr);
1629 return;
1630 }
1631
1632 if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL
1633 || checkType(c,zobj,REDIS_ZSET)) return;
1634
1635 /* Sanitize indexes. */
1636 llen = zsetLength(zobj);
1637 if (start < 0) start = llen+start;
1638 if (end < 0) end = llen+end;
1639 if (start < 0) start = 0;
1640
1641 /* Invariant: start >= 0, so this test will be true when end < 0.
1642 * The range is empty when start > end or start >= length. */
1643 if (start > end || start >= llen) {
1644 addReply(c,shared.emptymultibulk);
1645 return;
1646 }
1647 if (end >= llen) end = llen-1;
1648 rangelen = (end-start)+1;
1649
1650 /* Return the result in form of a multi-bulk reply */
1651 addReplyMultiBulkLen(c, withscores ? (rangelen*2) : rangelen);
1652
1653 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1654 unsigned char *zl = zobj->ptr;
1655 unsigned char *eptr, *sptr;
1656 unsigned char *vstr;
1657 unsigned int vlen;
1658 long long vlong;
1659
1660 if (reverse)
1661 eptr = ziplistIndex(zl,-2-(2*start));
1662 else
1663 eptr = ziplistIndex(zl,2*start);
1664
1665 redisAssert(eptr != NULL);
1666 sptr = ziplistNext(zl,eptr);
1667
1668 while (rangelen--) {
1669 redisAssert(eptr != NULL && sptr != NULL);
1670 redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
1671 if (vstr == NULL)
1672 addReplyBulkLongLong(c,vlong);
1673 else
1674 addReplyBulkCBuffer(c,vstr,vlen);
1675
1676 if (withscores)
1677 addReplyDouble(c,zzlGetScore(sptr));
1678
1679 if (reverse)
1680 zzlPrev(zl,&eptr,&sptr);
1681 else
1682 zzlNext(zl,&eptr,&sptr);
1683 }
1684
1685 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
1686 zset *zs = zobj->ptr;
1687 zskiplist *zsl = zs->zsl;
1688 zskiplistNode *ln;
1689 robj *ele;
1690
1691 /* Check if starting point is trivial, before doing log(N) lookup. */
1692 if (reverse) {
1693 ln = zsl->tail;
1694 if (start > 0)
1695 ln = zslGetElementByRank(zsl,llen-start);
1696 } else {
1697 ln = zsl->header->level[0].forward;
1698 if (start > 0)
1699 ln = zslGetElementByRank(zsl,start+1);
1700 }
1701
1702 while(rangelen--) {
1703 redisAssert(ln != NULL);
1704 ele = ln->obj;
1705 addReplyBulk(c,ele);
1706 if (withscores)
1707 addReplyDouble(c,ln->score);
1708 ln = reverse ? ln->backward : ln->level[0].forward;
1709 }
1710 } else {
1711 redisPanic("Unknown sorted set encoding");
1712 }
1713 }
1714
1715 void zrangeCommand(redisClient *c) {
1716 zrangeGenericCommand(c,0);
1717 }
1718
1719 void zrevrangeCommand(redisClient *c) {
1720 zrangeGenericCommand(c,1);
1721 }
1722
1723 /* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE and ZCOUNT.
1724 * If "justcount", only the number of elements in the range is returned. */
1725 void genericZrangebyscoreCommand(redisClient *c, int reverse, int justcount) {
1726 zrangespec range;
1727 robj *key = c->argv[1];
1728 robj *emptyreply, *zobj;
1729 int offset = 0, limit = -1;
1730 int withscores = 0;
1731 unsigned long rangelen = 0;
1732 void *replylen = NULL;
1733 int minidx, maxidx;
1734
1735 /* Parse the range arguments. */
1736 if (reverse) {
1737 /* Range is given as [max,min] */
1738 maxidx = 2; minidx = 3;
1739 } else {
1740 /* Range is given as [min,max] */
1741 minidx = 2; maxidx = 3;
1742 }
1743
1744 if (zslParseRange(c->argv[minidx],c->argv[maxidx],&range) != REDIS_OK) {
1745 addReplyError(c,"min or max is not a double");
1746 return;
1747 }
1748
1749 /* Parse optional extra arguments. Note that ZCOUNT will exactly have
1750 * 4 arguments, so we'll never enter the following code path. */
1751 if (c->argc > 4) {
1752 int remaining = c->argc - 4;
1753 int pos = 4;
1754
1755 while (remaining) {
1756 if (remaining >= 1 && !strcasecmp(c->argv[pos]->ptr,"withscores")) {
1757 pos++; remaining--;
1758 withscores = 1;
1759 } else if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr,"limit")) {
1760 offset = atoi(c->argv[pos+1]->ptr);
1761 limit = atoi(c->argv[pos+2]->ptr);
1762 pos += 3; remaining -= 3;
1763 } else {
1764 addReply(c,shared.syntaxerr);
1765 return;
1766 }
1767 }
1768 }
1769
1770 /* Ok, lookup the key and get the range */
1771 emptyreply = justcount ? shared.czero : shared.emptymultibulk;
1772 if ((zobj = lookupKeyReadOrReply(c,key,emptyreply)) == NULL ||
1773 checkType(c,zobj,REDIS_ZSET)) return;
1774
1775 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1776 unsigned char *zl = zobj->ptr;
1777 unsigned char *eptr, *sptr;
1778 unsigned char *vstr;
1779 unsigned int vlen;
1780 long long vlong;
1781 double score;
1782
1783 /* If reversed, get the last node in range as starting point. */
1784 if (reverse)
1785 eptr = zzlLastInRange(zl,range);
1786 else
1787 eptr = zzlFirstInRange(zl,range);
1788
1789 /* No "first" element in the specified interval. */
1790 if (eptr == NULL) {
1791 addReply(c,emptyreply);
1792 return;
1793 }
1794
1795 /* Get score pointer for the first element. */
1796 redisAssert(eptr != NULL);
1797 sptr = ziplistNext(zl,eptr);
1798
1799 /* We don't know in advance how many matching elements there are in the
1800 * list, so we push this object that will represent the multi-bulk
1801 * length in the output buffer, and will "fix" it later */
1802 if (!justcount)
1803 replylen = addDeferredMultiBulkLength(c);
1804
1805 /* If there is an offset, just traverse the number of elements without
1806 * checking the score because that is done in the next loop. */
1807 while (eptr && offset--)
1808 if (reverse)
1809 zzlPrev(zl,&eptr,&sptr);
1810 else
1811 zzlNext(zl,&eptr,&sptr);
1812
1813 while (eptr && limit--) {
1814 score = zzlGetScore(sptr);
1815
1816 /* Abort when the node is no longer in range. */
1817 if (reverse) {
1818 if (!zslValueGteMin(score,&range)) break;
1819 } else {
1820 if (!zslValueLteMax(score,&range)) break;
1821 }
1822
1823 /* Do our magic */
1824 rangelen++;
1825 if (!justcount) {
1826 redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
1827 if (vstr == NULL)
1828 addReplyBulkLongLong(c,vlong);
1829 else
1830 addReplyBulkCBuffer(c,vstr,vlen);
1831
1832 if (withscores)
1833 addReplyDouble(c,score);
1834 }
1835
1836 /* Move to next node */
1837 if (reverse)
1838 zzlPrev(zl,&eptr,&sptr);
1839 else
1840 zzlNext(zl,&eptr,&sptr);
1841 }
1842 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
1843 zset *zs = zobj->ptr;
1844 zskiplist *zsl = zs->zsl;
1845 zskiplistNode *ln;
1846
1847 /* If reversed, get the last node in range as starting point. */
1848 if (reverse)
1849 ln = zslLastInRange(zsl,range);
1850 else
1851 ln = zslFirstInRange(zsl,range);
1852
1853 /* No "first" element in the specified interval. */
1854 if (ln == NULL) {
1855 addReply(c,emptyreply);
1856 return;
1857 }
1858
1859 /* We don't know in advance how many matching elements there are in the
1860 * list, so we push this object that will represent the multi-bulk
1861 * length in the output buffer, and will "fix" it later */
1862 if (!justcount)
1863 replylen = addDeferredMultiBulkLength(c);
1864
1865 /* If there is an offset, just traverse the number of elements without
1866 * checking the score because that is done in the next loop. */
1867 while (ln && offset--)
1868 ln = reverse ? ln->backward : ln->level[0].forward;
1869
1870 while (ln && limit--) {
1871 /* Abort when the node is no longer in range. */
1872 if (reverse) {
1873 if (!zslValueGteMin(ln->score,&range)) break;
1874 } else {
1875 if (!zslValueLteMax(ln->score,&range)) break;
1876 }
1877
1878 /* Do our magic */
1879 rangelen++;
1880 if (!justcount) {
1881 addReplyBulk(c,ln->obj);
1882 if (withscores)
1883 addReplyDouble(c,ln->score);
1884 }
1885
1886 /* Move to next node */
1887 ln = reverse ? ln->backward : ln->level[0].forward;
1888 }
1889 } else {
1890 redisPanic("Unknown sorted set encoding");
1891 }
1892
1893 if (justcount) {
1894 addReplyLongLong(c,(long)rangelen);
1895 } else {
1896 if (withscores) rangelen *= 2;
1897 setDeferredMultiBulkLength(c,replylen,rangelen);
1898 }
1899 }
1900
1901 void zrangebyscoreCommand(redisClient *c) {
1902 genericZrangebyscoreCommand(c,0,0);
1903 }
1904
1905 void zrevrangebyscoreCommand(redisClient *c) {
1906 genericZrangebyscoreCommand(c,1,0);
1907 }
1908
1909 void zcountCommand(redisClient *c) {
1910 genericZrangebyscoreCommand(c,0,1);
1911 }
1912
1913 void zcardCommand(redisClient *c) {
1914 robj *key = c->argv[1];
1915 robj *zobj;
1916
1917 if ((zobj = lookupKeyReadOrReply(c,key,shared.czero)) == NULL ||
1918 checkType(c,zobj,REDIS_ZSET)) return;
1919
1920 addReplyLongLong(c,zsetLength(zobj));
1921 }
1922
1923 void zscoreCommand(redisClient *c) {
1924 robj *key = c->argv[1];
1925 robj *zobj;
1926 double score;
1927
1928 if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
1929 checkType(c,zobj,REDIS_ZSET)) return;
1930
1931 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1932 if (zzlFind(zobj->ptr,c->argv[2],&score) != NULL)
1933 addReplyDouble(c,score);
1934 else
1935 addReply(c,shared.nullbulk);
1936 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
1937 zset *zs = zobj->ptr;
1938 dictEntry *de;
1939
1940 c->argv[2] = tryObjectEncoding(c->argv[2]);
1941 de = dictFind(zs->dict,c->argv[2]);
1942 if (de != NULL) {
1943 score = *(double*)dictGetEntryVal(de);
1944 addReplyDouble(c,score);
1945 } else {
1946 addReply(c,shared.nullbulk);
1947 }
1948 } else {
1949 redisPanic("Unknown sorted set encoding");
1950 }
1951 }
1952
1953 void zrankGenericCommand(redisClient *c, int reverse) {
1954 robj *key = c->argv[1];
1955 robj *ele = c->argv[2];
1956 robj *zobj;
1957 unsigned long llen;
1958 unsigned long rank;
1959
1960 if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
1961 checkType(c,zobj,REDIS_ZSET)) return;
1962 llen = zsetLength(zobj);
1963
1964 redisAssert(ele->encoding == REDIS_ENCODING_RAW);
1965 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1966 unsigned char *zl = zobj->ptr;
1967 unsigned char *eptr, *sptr;
1968
1969 eptr = ziplistIndex(zl,0);
1970 redisAssert(eptr != NULL);
1971 sptr = ziplistNext(zl,eptr);
1972 redisAssert(sptr != NULL);
1973
1974 rank = 1;
1975 while(eptr != NULL) {
1976 if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr)))
1977 break;
1978 rank++;
1979 zzlNext(zl,&eptr,&sptr);
1980 }
1981
1982 if (eptr != NULL) {
1983 if (reverse)
1984 addReplyLongLong(c,llen-rank);
1985 else
1986 addReplyLongLong(c,rank-1);
1987 } else {
1988 addReply(c,shared.nullbulk);
1989 }
1990 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
1991 zset *zs = zobj->ptr;
1992 zskiplist *zsl = zs->zsl;
1993 dictEntry *de;
1994 double score;
1995
1996 ele = c->argv[2] = tryObjectEncoding(c->argv[2]);
1997 de = dictFind(zs->dict,ele);
1998 if (de != NULL) {
1999 score = *(double*)dictGetEntryVal(de);
2000 rank = zslGetRank(zsl,score,ele);
2001 redisAssert(rank); /* Existing elements always have a rank. */
2002 if (reverse)
2003 addReplyLongLong(c,llen-rank);
2004 else
2005 addReplyLongLong(c,rank-1);
2006 } else {
2007 addReply(c,shared.nullbulk);
2008 }
2009 } else {
2010 redisPanic("Unknown sorted set encoding");
2011 }
2012 }
2013
2014 void zrankCommand(redisClient *c) {
2015 zrankGenericCommand(c, 0);
2016 }
2017
2018 void zrevrankCommand(redisClient *c) {
2019 zrankGenericCommand(c, 1);
2020 }