]> git.saurik.com Git - redis.git/blob - src/t_zset.c
save peak memory usage as statistic and show it in INFO. Also a new INFO field was...
[redis.git] / src / t_zset.c
1 #include "redis.h"
2
3 #include <math.h>
4
5 /*-----------------------------------------------------------------------------
6 * Sorted set API
7 *----------------------------------------------------------------------------*/
8
/* ZSETs are ordered sets using two data structures to hold the same elements
 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
 * data structure.
 *
 * The elements are added to a hash table mapping Redis objects to scores.
 * At the same time the elements are added to a skip list mapping scores
 * to Redis objects (so objects are sorted by scores in this "view"). */
16
/* This skiplist implementation is almost a C translation of the original
 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
 * Alternative to Balanced Trees", modified in three ways:
 * a) this implementation allows for repeated values.
 * b) the comparison is not just by key (our 'score') but by satellite data.
 * c) there is a back pointer, so it's a doubly linked list with the back
 * pointers being only at "level 1". This allows traversing the list
 * from tail to head, useful for ZREVRANGE. */
25
26 zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
27 zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
28 zn->score = score;
29 zn->obj = obj;
30 return zn;
31 }
32
33 zskiplist *zslCreate(void) {
34 int j;
35 zskiplist *zsl;
36
37 zsl = zmalloc(sizeof(*zsl));
38 zsl->level = 1;
39 zsl->length = 0;
40 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
41 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
42 zsl->header->level[j].forward = NULL;
43 zsl->header->level[j].span = 0;
44 }
45 zsl->header->backward = NULL;
46 zsl->tail = NULL;
47 return zsl;
48 }
49
50 void zslFreeNode(zskiplistNode *node) {
51 decrRefCount(node->obj);
52 zfree(node);
53 }
54
55 void zslFree(zskiplist *zsl) {
56 zskiplistNode *node = zsl->header->level[0].forward, *next;
57
58 zfree(zsl->header);
59 while(node) {
60 next = node->level[0].forward;
61 zslFreeNode(node);
62 node = next;
63 }
64 zfree(zsl);
65 }
66
67 int zslRandomLevel(void) {
68 int level = 1;
69 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
70 level += 1;
71 return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
72 }
73
74 zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
75 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
76 unsigned int rank[ZSKIPLIST_MAXLEVEL];
77 int i, level;
78
79 x = zsl->header;
80 for (i = zsl->level-1; i >= 0; i--) {
81 /* store rank that is crossed to reach the insert position */
82 rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
83 while (x->level[i].forward &&
84 (x->level[i].forward->score < score ||
85 (x->level[i].forward->score == score &&
86 compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
87 rank[i] += x->level[i].span;
88 x = x->level[i].forward;
89 }
90 update[i] = x;
91 }
92 /* we assume the key is not already inside, since we allow duplicated
93 * scores, and the re-insertion of score and redis object should never
94 * happpen since the caller of zslInsert() should test in the hash table
95 * if the element is already inside or not. */
96 level = zslRandomLevel();
97 if (level > zsl->level) {
98 for (i = zsl->level; i < level; i++) {
99 rank[i] = 0;
100 update[i] = zsl->header;
101 update[i]->level[i].span = zsl->length;
102 }
103 zsl->level = level;
104 }
105 x = zslCreateNode(level,score,obj);
106 for (i = 0; i < level; i++) {
107 x->level[i].forward = update[i]->level[i].forward;
108 update[i]->level[i].forward = x;
109
110 /* update span covered by update[i] as x is inserted here */
111 x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
112 update[i]->level[i].span = (rank[0] - rank[i]) + 1;
113 }
114
115 /* increment span for untouched levels */
116 for (i = level; i < zsl->level; i++) {
117 update[i]->level[i].span++;
118 }
119
120 x->backward = (update[0] == zsl->header) ? NULL : update[0];
121 if (x->level[0].forward)
122 x->level[0].forward->backward = x;
123 else
124 zsl->tail = x;
125 zsl->length++;
126 return x;
127 }
128
129 /* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
130 void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
131 int i;
132 for (i = 0; i < zsl->level; i++) {
133 if (update[i]->level[i].forward == x) {
134 update[i]->level[i].span += x->level[i].span - 1;
135 update[i]->level[i].forward = x->level[i].forward;
136 } else {
137 update[i]->level[i].span -= 1;
138 }
139 }
140 if (x->level[0].forward) {
141 x->level[0].forward->backward = x->backward;
142 } else {
143 zsl->tail = x->backward;
144 }
145 while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
146 zsl->level--;
147 zsl->length--;
148 }
149
150 /* Delete an element with matching score/object from the skiplist. */
151 int zslDelete(zskiplist *zsl, double score, robj *obj) {
152 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
153 int i;
154
155 x = zsl->header;
156 for (i = zsl->level-1; i >= 0; i--) {
157 while (x->level[i].forward &&
158 (x->level[i].forward->score < score ||
159 (x->level[i].forward->score == score &&
160 compareStringObjects(x->level[i].forward->obj,obj) < 0)))
161 x = x->level[i].forward;
162 update[i] = x;
163 }
164 /* We may have multiple elements with the same score, what we need
165 * is to find the element with both the right score and object. */
166 x = x->level[0].forward;
167 if (x && score == x->score && equalStringObjects(x->obj,obj)) {
168 zslDeleteNode(zsl, x, update);
169 zslFreeNode(x);
170 return 1;
171 } else {
172 return 0; /* not found */
173 }
174 return 0; /* not found */
175 }
176
/* Score interval whose bounds can each be inclusive or exclusive. */
typedef struct {
    double min;
    double max;
    int minex; /* non-zero when min is exclusive */
    int maxex; /* non-zero when max is exclusive */
} zrangespec;

/* Return non-zero when 'value' satisfies the lower bound of 'spec'. */
static int zslValueGteMin(double value, zrangespec *spec) {
    if (spec->minex) return value > spec->min;
    return value >= spec->min;
}

/* Return non-zero when 'value' satisfies the upper bound of 'spec'. */
static int zslValueLteMax(double value, zrangespec *spec) {
    if (spec->maxex) return value < spec->max;
    return value <= spec->max;
}
190
191 /* Returns if there is a part of the zset is in range. */
192 int zslIsInRange(zskiplist *zsl, zrangespec *range) {
193 zskiplistNode *x;
194
195 /* Test for ranges that will always be empty. */
196 if (range->min > range->max ||
197 (range->min == range->max && (range->minex || range->maxex)))
198 return 0;
199 x = zsl->tail;
200 if (x == NULL || !zslValueGteMin(x->score,range))
201 return 0;
202 x = zsl->header->level[0].forward;
203 if (x == NULL || !zslValueLteMax(x->score,range))
204 return 0;
205 return 1;
206 }
207
208 /* Find the first node that is contained in the specified range.
209 * Returns NULL when no element is contained in the range. */
210 zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec range) {
211 zskiplistNode *x;
212 int i;
213
214 /* If everything is out of range, return early. */
215 if (!zslIsInRange(zsl,&range)) return NULL;
216
217 x = zsl->header;
218 for (i = zsl->level-1; i >= 0; i--) {
219 /* Go forward while *OUT* of range. */
220 while (x->level[i].forward &&
221 !zslValueGteMin(x->level[i].forward->score,&range))
222 x = x->level[i].forward;
223 }
224
225 /* This is an inner range, so the next node cannot be NULL. */
226 x = x->level[0].forward;
227 redisAssert(x != NULL);
228
229 /* Check if score <= max. */
230 if (!zslValueLteMax(x->score,&range)) return NULL;
231 return x;
232 }
233
234 /* Find the last node that is contained in the specified range.
235 * Returns NULL when no element is contained in the range. */
236 zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec range) {
237 zskiplistNode *x;
238 int i;
239
240 /* If everything is out of range, return early. */
241 if (!zslIsInRange(zsl,&range)) return NULL;
242
243 x = zsl->header;
244 for (i = zsl->level-1; i >= 0; i--) {
245 /* Go forward while *IN* range. */
246 while (x->level[i].forward &&
247 zslValueLteMax(x->level[i].forward->score,&range))
248 x = x->level[i].forward;
249 }
250
251 /* This is an inner range, so this node cannot be NULL. */
252 redisAssert(x != NULL);
253
254 /* Check if score >= min. */
255 if (!zslValueGteMin(x->score,&range)) return NULL;
256 return x;
257 }
258
259 /* Delete all the elements with score between min and max from the skiplist.
260 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
261 * Note that this function takes the reference to the hash table view of the
262 * sorted set, in order to remove the elements from the hash table too. */
263 unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec range, dict *dict) {
264 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
265 unsigned long removed = 0;
266 int i;
267
268 x = zsl->header;
269 for (i = zsl->level-1; i >= 0; i--) {
270 while (x->level[i].forward && (range.minex ?
271 x->level[i].forward->score <= range.min :
272 x->level[i].forward->score < range.min))
273 x = x->level[i].forward;
274 update[i] = x;
275 }
276
277 /* Current node is the last with score < or <= min. */
278 x = x->level[0].forward;
279
280 /* Delete nodes while in range. */
281 while (x && (range.maxex ? x->score < range.max : x->score <= range.max)) {
282 zskiplistNode *next = x->level[0].forward;
283 zslDeleteNode(zsl,x,update);
284 dictDelete(dict,x->obj);
285 zslFreeNode(x);
286 removed++;
287 x = next;
288 }
289 return removed;
290 }
291
292 /* Delete all the elements with rank between start and end from the skiplist.
293 * Start and end are inclusive. Note that start and end need to be 1-based */
294 unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict) {
295 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
296 unsigned long traversed = 0, removed = 0;
297 int i;
298
299 x = zsl->header;
300 for (i = zsl->level-1; i >= 0; i--) {
301 while (x->level[i].forward && (traversed + x->level[i].span) < start) {
302 traversed += x->level[i].span;
303 x = x->level[i].forward;
304 }
305 update[i] = x;
306 }
307
308 traversed++;
309 x = x->level[0].forward;
310 while (x && traversed <= end) {
311 zskiplistNode *next = x->level[0].forward;
312 zslDeleteNode(zsl,x,update);
313 dictDelete(dict,x->obj);
314 zslFreeNode(x);
315 removed++;
316 traversed++;
317 x = next;
318 }
319 return removed;
320 }
321
322 /* Find the rank for an element by both score and key.
323 * Returns 0 when the element cannot be found, rank otherwise.
324 * Note that the rank is 1-based due to the span of zsl->header to the
325 * first element. */
326 unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) {
327 zskiplistNode *x;
328 unsigned long rank = 0;
329 int i;
330
331 x = zsl->header;
332 for (i = zsl->level-1; i >= 0; i--) {
333 while (x->level[i].forward &&
334 (x->level[i].forward->score < score ||
335 (x->level[i].forward->score == score &&
336 compareStringObjects(x->level[i].forward->obj,o) <= 0))) {
337 rank += x->level[i].span;
338 x = x->level[i].forward;
339 }
340
341 /* x might be equal to zsl->header, so test if obj is non-NULL */
342 if (x->obj && equalStringObjects(x->obj,o)) {
343 return rank;
344 }
345 }
346 return 0;
347 }
348
349 /* Finds an element by its rank. The rank argument needs to be 1-based. */
350 zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
351 zskiplistNode *x;
352 unsigned long traversed = 0;
353 int i;
354
355 x = zsl->header;
356 for (i = zsl->level-1; i >= 0; i--) {
357 while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
358 {
359 traversed += x->level[i].span;
360 x = x->level[i].forward;
361 }
362 if (traversed == rank) {
363 return x;
364 }
365 }
366 return NULL;
367 }
368
369 /* Populate the rangespec according to the objects min and max. */
370 static int zslParseRange(robj *min, robj *max, zrangespec *spec) {
371 char *eptr;
372 spec->minex = spec->maxex = 0;
373
374 /* Parse the min-max interval. If one of the values is prefixed
375 * by the "(" character, it's considered "open". For instance
376 * ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max
377 * ZRANGEBYSCORE zset 1.5 2.5 will instead match min <= x <= max */
378 if (min->encoding == REDIS_ENCODING_INT) {
379 spec->min = (long)min->ptr;
380 } else {
381 if (((char*)min->ptr)[0] == '(') {
382 spec->min = strtod((char*)min->ptr+1,&eptr);
383 if (eptr[0] != '\0' || isnan(spec->min)) return REDIS_ERR;
384 spec->minex = 1;
385 } else {
386 spec->min = strtod((char*)min->ptr,&eptr);
387 if (eptr[0] != '\0' || isnan(spec->min)) return REDIS_ERR;
388 }
389 }
390 if (max->encoding == REDIS_ENCODING_INT) {
391 spec->max = (long)max->ptr;
392 } else {
393 if (((char*)max->ptr)[0] == '(') {
394 spec->max = strtod((char*)max->ptr+1,&eptr);
395 if (eptr[0] != '\0' || isnan(spec->max)) return REDIS_ERR;
396 spec->maxex = 1;
397 } else {
398 spec->max = strtod((char*)max->ptr,&eptr);
399 if (eptr[0] != '\0' || isnan(spec->max)) return REDIS_ERR;
400 }
401 }
402
403 return REDIS_OK;
404 }
405
406 /*-----------------------------------------------------------------------------
407 * Ziplist-backed sorted set API
408 *----------------------------------------------------------------------------*/
409
/* Return the score stored in the ziplist entry 'sptr'.
 *
 * ziplistGet() yields either a string (vstr/vlen) or an integer (vlong). A
 * string is copied into a local buffer, NUL-terminated and parsed with
 * strtod(); an integer is converted to double directly.
 *
 * Asserts that 'sptr' is not NULL, that ziplistGet() succeeds, and that a
 * string score fits the local buffer together with its terminator. */
double zzlGetScore(unsigned char *sptr) {
    unsigned char *vstr;
    unsigned int vlen;
    long long vlong;
    char buf[128];

    redisAssert(sptr != NULL);
    redisAssert(ziplistGet(sptr,&vstr,&vlen,&vlong));

    if (vstr == NULL) return vlong;

    /* The copy needs vlen bytes plus one for the terminator: refuse to write
     * past the end of the stack buffer. */
    redisAssert(vlen < sizeof(buf));
    memcpy(buf,vstr,vlen);
    buf[vlen] = '\0';
    return strtod(buf,NULL);
}
430
/* Compare the ziplist entry 'eptr' with the string 'cstr' of 'clen' bytes.
 *
 * An integer entry is first converted to its decimal string form. The common
 * prefix is compared with memcmp(); when it is equal the result is the
 * difference of the two lengths. Negative, zero and positive results mean
 * the entry sorts before, equal to and after 'cstr' respectively. */
int zzlCompareElements(unsigned char *eptr, unsigned char *cstr, unsigned int clen) {
    unsigned char *vstr;
    unsigned int vlen;
    long long vlong;
    unsigned char numbuf[32];
    int common, order;

    redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
    if (vstr == NULL) {
        /* Integer entry: compare against its string representation. */
        vlen = ll2string((char*)numbuf,sizeof(numbuf),vlong);
        vstr = numbuf;
    }

    common = (clen > vlen) ? vlen : clen;
    order = memcmp(vstr,cstr,common);
    if (order != 0) return order;
    return vlen-clen;
}
451
/* Number of (element,score) pairs in the ziplist: each pair takes two
 * ziplist entries, so this is half of ziplistLen(). */
unsigned int zzlLength(unsigned char *zl) {
    return ziplistLen(zl)/2;
}
455
/* Advance '*eptr' / '*sptr' to the next (element,score) pair. Both are set
 * to NULL when there is no next pair. Neither may be NULL on entry. */
void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
    unsigned char *nextele, *nextscore = NULL;
    redisAssert(*eptr != NULL && *sptr != NULL);

    /* The next element is the entry right after the current score. */
    nextele = ziplistNext(zl,*sptr);
    if (nextele != NULL) {
        /* An element is always followed by its score. */
        nextscore = ziplistNext(zl,nextele);
        redisAssert(nextscore != NULL);
    }

    *eptr = nextele;
    *sptr = nextscore;
}
474
/* Move '*eptr' / '*sptr' back to the previous (element,score) pair. Both are
 * set to NULL when there is no previous pair. Neither may be NULL on
 * entry. */
void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
    unsigned char *prevscore, *prevele = NULL;
    redisAssert(*eptr != NULL && *sptr != NULL);

    /* The previous score is the entry right before the current element. */
    prevscore = ziplistPrev(zl,*eptr);
    if (prevscore != NULL) {
        /* A score is always preceded by its element. */
        prevele = ziplistPrev(zl,prevscore);
        redisAssert(prevele != NULL);
    }

    *eptr = prevele;
    *sptr = prevscore;
}
493
494 /* Returns if there is a part of the zset is in range. Should only be used
495 * internally by zzlFirstInRange and zzlLastInRange. */
496 int zzlIsInRange(unsigned char *zl, zrangespec *range) {
497 unsigned char *p;
498 double score;
499
500 /* Test for ranges that will always be empty. */
501 if (range->min > range->max ||
502 (range->min == range->max && (range->minex || range->maxex)))
503 return 0;
504
505 p = ziplistIndex(zl,-1); /* Last score. */
506 redisAssert(p != NULL);
507 score = zzlGetScore(p);
508 if (!zslValueGteMin(score,range))
509 return 0;
510
511 p = ziplistIndex(zl,1); /* First score. */
512 redisAssert(p != NULL);
513 score = zzlGetScore(p);
514 if (!zslValueLteMax(score,range))
515 return 0;
516
517 return 1;
518 }
519
520 /* Find pointer to the first element contained in the specified range.
521 * Returns NULL when no element is contained in the range. */
522 unsigned char *zzlFirstInRange(unsigned char *zl, zrangespec range) {
523 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
524 double score;
525
526 /* If everything is out of range, return early. */
527 if (!zzlIsInRange(zl,&range)) return NULL;
528
529 while (eptr != NULL) {
530 sptr = ziplistNext(zl,eptr);
531 redisAssert(sptr != NULL);
532
533 score = zzlGetScore(sptr);
534 if (zslValueGteMin(score,&range)) {
535 /* Check if score <= max. */
536 if (zslValueLteMax(score,&range))
537 return eptr;
538 return NULL;
539 }
540
541 /* Move to next element. */
542 eptr = ziplistNext(zl,sptr);
543 }
544
545 return NULL;
546 }
547
548 /* Find pointer to the last element contained in the specified range.
549 * Returns NULL when no element is contained in the range. */
550 unsigned char *zzlLastInRange(unsigned char *zl, zrangespec range) {
551 unsigned char *eptr = ziplistIndex(zl,-2), *sptr;
552 double score;
553
554 /* If everything is out of range, return early. */
555 if (!zzlIsInRange(zl,&range)) return NULL;
556
557 while (eptr != NULL) {
558 sptr = ziplistNext(zl,eptr);
559 redisAssert(sptr != NULL);
560
561 score = zzlGetScore(sptr);
562 if (zslValueLteMax(score,&range)) {
563 /* Check if score >= min. */
564 if (zslValueGteMin(score,&range))
565 return eptr;
566 return NULL;
567 }
568
569 /* Move to previous element by moving to the score of previous element.
570 * When this returns NULL, we know there also is no element. */
571 sptr = ziplistPrev(zl,eptr);
572 if (sptr != NULL)
573 redisAssert((eptr = ziplistPrev(zl,sptr)) != NULL);
574 else
575 eptr = NULL;
576 }
577
578 return NULL;
579 }
580
581 unsigned char *zzlFind(unsigned char *zl, robj *ele, double *score) {
582 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
583
584 ele = getDecodedObject(ele);
585 while (eptr != NULL) {
586 sptr = ziplistNext(zl,eptr);
587 redisAssert(sptr != NULL);
588
589 if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr))) {
590 /* Matching element, pull out score. */
591 if (score != NULL) *score = zzlGetScore(sptr);
592 decrRefCount(ele);
593 return eptr;
594 }
595
596 /* Move to next element. */
597 eptr = ziplistNext(zl,sptr);
598 }
599
600 decrRefCount(ele);
601 return NULL;
602 }
603
/* Delete the (element,score) pair starting at 'eptr' and return the ziplist
 * pointer produced by the deletions. A local cursor is used so that the
 * caller's 'eptr' is left untouched. */
unsigned char *zzlDelete(unsigned char *zl, unsigned char *eptr) {
    unsigned char *cursor = eptr;
    int n;

    /* TODO: add function to ziplist API to delete N elements from offset. */
    for (n = 0; n < 2; n++)
        zl = ziplistDelete(zl,&cursor);
    return zl;
}
614
615 unsigned char *zzlInsertAt(unsigned char *zl, unsigned char *eptr, robj *ele, double score) {
616 unsigned char *sptr;
617 char scorebuf[128];
618 int scorelen;
619 size_t offset;
620
621 redisAssert(ele->encoding == REDIS_ENCODING_RAW);
622 scorelen = d2string(scorebuf,sizeof(scorebuf),score);
623 if (eptr == NULL) {
624 zl = ziplistPush(zl,ele->ptr,sdslen(ele->ptr),ZIPLIST_TAIL);
625 zl = ziplistPush(zl,(unsigned char*)scorebuf,scorelen,ZIPLIST_TAIL);
626 } else {
627 /* Keep offset relative to zl, as it might be re-allocated. */
628 offset = eptr-zl;
629 zl = ziplistInsert(zl,eptr,ele->ptr,sdslen(ele->ptr));
630 eptr = zl+offset;
631
632 /* Insert score after the element. */
633 redisAssert((sptr = ziplistNext(zl,eptr)) != NULL);
634 zl = ziplistInsert(zl,sptr,(unsigned char*)scorebuf,scorelen);
635 }
636
637 return zl;
638 }
639
640 /* Insert (element,score) pair in ziplist. This function assumes the element is
641 * not yet present in the list. */
642 unsigned char *zzlInsert(unsigned char *zl, robj *ele, double score) {
643 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
644 double s;
645
646 ele = getDecodedObject(ele);
647 while (eptr != NULL) {
648 sptr = ziplistNext(zl,eptr);
649 redisAssert(sptr != NULL);
650 s = zzlGetScore(sptr);
651
652 if (s > score) {
653 /* First element with score larger than score for element to be
654 * inserted. This means we should take its spot in the list to
655 * maintain ordering. */
656 zl = zzlInsertAt(zl,eptr,ele,score);
657 break;
658 } else if (s == score) {
659 /* Ensure lexicographical ordering for elements. */
660 if (zzlCompareElements(eptr,ele->ptr,sdslen(ele->ptr)) > 0) {
661 zl = zzlInsertAt(zl,eptr,ele,score);
662 break;
663 }
664 }
665
666 /* Move to next element. */
667 eptr = ziplistNext(zl,sptr);
668 }
669
670 /* Push on tail of list when it was not yet inserted. */
671 if (eptr == NULL)
672 zl = zzlInsertAt(zl,NULL,ele,score);
673
674 decrRefCount(ele);
675 return zl;
676 }
677
678 unsigned char *zzlDeleteRangeByScore(unsigned char *zl, zrangespec range, unsigned long *deleted) {
679 unsigned char *eptr, *sptr;
680 double score;
681 unsigned long num = 0;
682
683 if (deleted != NULL) *deleted = 0;
684
685 eptr = zzlFirstInRange(zl,range);
686 if (eptr == NULL) return zl;
687
688 /* When the tail of the ziplist is deleted, eptr will point to the sentinel
689 * byte and ziplistNext will return NULL. */
690 while ((sptr = ziplistNext(zl,eptr)) != NULL) {
691 score = zzlGetScore(sptr);
692 if (zslValueLteMax(score,&range)) {
693 /* Delete both the element and the score. */
694 zl = ziplistDelete(zl,&eptr);
695 zl = ziplistDelete(zl,&eptr);
696 num++;
697 } else {
698 /* No longer in range. */
699 break;
700 }
701 }
702
703 if (deleted != NULL) *deleted = num;
704 return zl;
705 }
706
/* Delete all the pairs with rank between 'start' and 'end' from the ziplist.
 * Start and end are inclusive and need to be 1-based. When 'deleted' is not
 * NULL it receives end-start+1. Returns the resulting ziplist. */
unsigned char *zzlDeleteRangeByRank(unsigned char *zl, unsigned int start, unsigned int end, unsigned long *deleted) {
    unsigned int members = (end-start)+1;
    unsigned int first = 2*(start-1);

    if (deleted != NULL) *deleted = members;

    /* Each member takes two ziplist entries (element and score). */
    return ziplistDeleteRange(zl,first,2*members);
}
715
716 /*-----------------------------------------------------------------------------
717 * Common sorted set API
718 *----------------------------------------------------------------------------*/
719
720 unsigned int zsetLength(robj *zobj) {
721 int length = -1;
722 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
723 length = zzlLength(zobj->ptr);
724 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
725 length = ((zset*)zobj->ptr)->zsl->length;
726 } else {
727 redisPanic("Unknown sorted set encoding");
728 }
729 return length;
730 }
731
732 void zsetConvert(robj *zobj, int encoding) {
733 zset *zs;
734 zskiplistNode *node, *next;
735 robj *ele;
736 double score;
737
738 if (zobj->encoding == encoding) return;
739 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
740 unsigned char *zl = zobj->ptr;
741 unsigned char *eptr, *sptr;
742 unsigned char *vstr;
743 unsigned int vlen;
744 long long vlong;
745
746 if (encoding != REDIS_ENCODING_SKIPLIST)
747 redisPanic("Unknown target encoding");
748
749 zs = zmalloc(sizeof(*zs));
750 zs->dict = dictCreate(&zsetDictType,NULL);
751 zs->zsl = zslCreate();
752
753 eptr = ziplistIndex(zl,0);
754 redisAssert(eptr != NULL);
755 sptr = ziplistNext(zl,eptr);
756 redisAssert(sptr != NULL);
757
758 while (eptr != NULL) {
759 score = zzlGetScore(sptr);
760 redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
761 if (vstr == NULL)
762 ele = createStringObjectFromLongLong(vlong);
763 else
764 ele = createStringObject((char*)vstr,vlen);
765
766 /* Has incremented refcount since it was just created. */
767 node = zslInsert(zs->zsl,score,ele);
768 redisAssert(dictAdd(zs->dict,ele,&node->score) == DICT_OK);
769 incrRefCount(ele); /* Added to dictionary. */
770 zzlNext(zl,&eptr,&sptr);
771 }
772
773 zfree(zobj->ptr);
774 zobj->ptr = zs;
775 zobj->encoding = REDIS_ENCODING_SKIPLIST;
776 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
777 unsigned char *zl = ziplistNew();
778
779 if (encoding != REDIS_ENCODING_ZIPLIST)
780 redisPanic("Unknown target encoding");
781
782 /* Approach similar to zslFree(), since we want to free the skiplist at
783 * the same time as creating the ziplist. */
784 zs = zobj->ptr;
785 dictRelease(zs->dict);
786 node = zs->zsl->header->level[0].forward;
787 zfree(zs->zsl->header);
788 zfree(zs->zsl);
789
790 while (node) {
791 ele = getDecodedObject(node->obj);
792 zl = zzlInsertAt(zl,NULL,ele,node->score);
793 decrRefCount(ele);
794
795 next = node->level[0].forward;
796 zslFreeNode(node);
797 node = next;
798 }
799
800 zfree(zs);
801 zobj->ptr = zl;
802 zobj->encoding = REDIS_ENCODING_ZIPLIST;
803 } else {
804 redisPanic("Unknown sorted set encoding");
805 }
806 }
807
808 /*-----------------------------------------------------------------------------
809 * Sorted set commands
810 *----------------------------------------------------------------------------*/
811
812 /* This generic command implements both ZADD and ZINCRBY. */
813 void zaddGenericCommand(redisClient *c, int incr) {
814 static char *nanerr = "resulting score is not a number (NaN)";
815 robj *key = c->argv[1];
816 robj *ele;
817 robj *zobj;
818 robj *curobj;
819 double score, curscore = 0.0;
820
821 if (getDoubleFromObjectOrReply(c,c->argv[2],&score,NULL) != REDIS_OK)
822 return;
823
824 zobj = lookupKeyWrite(c->db,key);
825 if (zobj == NULL) {
826 if (server.zset_max_ziplist_entries == 0 ||
827 server.zset_max_ziplist_value < sdslen(c->argv[3]->ptr))
828 {
829 zobj = createZsetObject();
830 } else {
831 zobj = createZsetZiplistObject();
832 }
833 dbAdd(c->db,key,zobj);
834 } else {
835 if (zobj->type != REDIS_ZSET) {
836 addReply(c,shared.wrongtypeerr);
837 return;
838 }
839 }
840
841 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
842 unsigned char *eptr;
843
844 /* Prefer non-encoded element when dealing with ziplists. */
845 ele = c->argv[3];
846 if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
847 if (incr) {
848 score += curscore;
849 if (isnan(score)) {
850 addReplyError(c,nanerr);
851 /* Don't need to check if the sorted set is empty, because
852 * we know it has at least one element. */
853 return;
854 }
855 }
856
857 /* Remove and re-insert when score changed. */
858 if (score != curscore) {
859 zobj->ptr = zzlDelete(zobj->ptr,eptr);
860 zobj->ptr = zzlInsert(zobj->ptr,ele,score);
861
862 signalModifiedKey(c->db,key);
863 server.dirty++;
864 }
865
866 if (incr) /* ZINCRBY */
867 addReplyDouble(c,score);
868 else /* ZADD */
869 addReply(c,shared.czero);
870 } else {
871 /* Optimize: check if the element is too large or the list becomes
872 * too long *before* executing zzlInsert. */
873 zobj->ptr = zzlInsert(zobj->ptr,ele,score);
874 if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries)
875 zsetConvert(zobj,REDIS_ENCODING_SKIPLIST);
876 if (sdslen(ele->ptr) > server.zset_max_ziplist_value)
877 zsetConvert(zobj,REDIS_ENCODING_SKIPLIST);
878
879 signalModifiedKey(c->db,key);
880 server.dirty++;
881
882 if (incr) /* ZINCRBY */
883 addReplyDouble(c,score);
884 else /* ZADD */
885 addReply(c,shared.cone);
886 }
887 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
888 zset *zs = zobj->ptr;
889 zskiplistNode *znode;
890 dictEntry *de;
891
892 ele = c->argv[3] = tryObjectEncoding(c->argv[3]);
893 de = dictFind(zs->dict,ele);
894 if (de != NULL) {
895 curobj = dictGetEntryKey(de);
896 curscore = *(double*)dictGetEntryVal(de);
897
898 if (incr) {
899 score += curscore;
900 if (isnan(score)) {
901 addReplyError(c,nanerr);
902 /* Don't need to check if the sorted set is empty, because
903 * we know it has at least one element. */
904 return;
905 }
906 }
907
908 /* Remove and re-insert when score changed. We can safely delete
909 * the key object from the skiplist, since the dictionary still has
910 * a reference to it. */
911 if (score != curscore) {
912 redisAssert(zslDelete(zs->zsl,curscore,curobj));
913 znode = zslInsert(zs->zsl,score,curobj);
914 incrRefCount(curobj); /* Re-inserted in skiplist. */
915 dictGetEntryVal(de) = &znode->score; /* Update score ptr. */
916
917 signalModifiedKey(c->db,key);
918 server.dirty++;
919 }
920
921 if (incr) /* ZINCRBY */
922 addReplyDouble(c,score);
923 else /* ZADD */
924 addReply(c,shared.czero);
925 } else {
926 znode = zslInsert(zs->zsl,score,ele);
927 incrRefCount(ele); /* Inserted in skiplist. */
928 redisAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
929 incrRefCount(ele); /* Added to dictionary. */
930
931 signalModifiedKey(c->db,key);
932 server.dirty++;
933
934 if (incr) /* ZINCRBY */
935 addReplyDouble(c,score);
936 else /* ZADD */
937 addReply(c,shared.cone);
938 }
939 } else {
940 redisPanic("Unknown sorted set encoding");
941 }
942 }
943
944 void zaddCommand(redisClient *c) {
945 zaddGenericCommand(c,0);
946 }
947
948 void zincrbyCommand(redisClient *c) {
949 zaddGenericCommand(c,1);
950 }
951
952 void zremCommand(redisClient *c) {
953 robj *key = c->argv[1];
954 robj *ele = c->argv[2];
955 robj *zobj;
956
957 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
958 checkType(c,zobj,REDIS_ZSET)) return;
959
960 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
961 unsigned char *eptr;
962
963 if ((eptr = zzlFind(zobj->ptr,ele,NULL)) != NULL) {
964 zobj->ptr = zzlDelete(zobj->ptr,eptr);
965 if (zzlLength(zobj->ptr) == 0) dbDelete(c->db,key);
966 } else {
967 addReply(c,shared.czero);
968 return;
969 }
970 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
971 zset *zs = zobj->ptr;
972 dictEntry *de;
973 double score;
974
975 de = dictFind(zs->dict,ele);
976 if (de != NULL) {
977 /* Delete from the skiplist */
978 score = *(double*)dictGetEntryVal(de);
979 redisAssert(zslDelete(zs->zsl,score,ele));
980
981 /* Delete from the hash table */
982 dictDelete(zs->dict,ele);
983 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
984 if (dictSize(zs->dict) == 0) dbDelete(c->db,key);
985 } else {
986 addReply(c,shared.czero);
987 return;
988 }
989 } else {
990 redisPanic("Unknown sorted set encoding");
991 }
992
993 signalModifiedKey(c->db,key);
994 server.dirty++;
995 addReply(c,shared.cone);
996 }
997
998 void zremrangebyscoreCommand(redisClient *c) {
999 robj *key = c->argv[1];
1000 robj *zobj;
1001 zrangespec range;
1002 unsigned long deleted;
1003
1004 /* Parse the range arguments. */
1005 if (zslParseRange(c->argv[2],c->argv[3],&range) != REDIS_OK) {
1006 addReplyError(c,"min or max is not a double");
1007 return;
1008 }
1009
1010 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
1011 checkType(c,zobj,REDIS_ZSET)) return;
1012
1013 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1014 zobj->ptr = zzlDeleteRangeByScore(zobj->ptr,range,&deleted);
1015 if (zzlLength(zobj->ptr) == 0) dbDelete(c->db,key);
1016 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
1017 zset *zs = zobj->ptr;
1018 deleted = zslDeleteRangeByScore(zs->zsl,range,zs->dict);
1019 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
1020 if (dictSize(zs->dict) == 0) dbDelete(c->db,key);
1021 } else {
1022 redisPanic("Unknown sorted set encoding");
1023 }
1024
1025 if (deleted) signalModifiedKey(c->db,key);
1026 server.dirty += deleted;
1027 addReplyLongLong(c,deleted);
1028 }
1029
1030 void zremrangebyrankCommand(redisClient *c) {
1031 robj *key = c->argv[1];
1032 robj *zobj;
1033 long start;
1034 long end;
1035 int llen;
1036 unsigned long deleted;
1037
1038 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
1039 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
1040
1041 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
1042 checkType(c,zobj,REDIS_ZSET)) return;
1043
1044 /* Sanitize indexes. */
1045 llen = zsetLength(zobj);
1046 if (start < 0) start = llen+start;
1047 if (end < 0) end = llen+end;
1048 if (start < 0) start = 0;
1049
1050 /* Invariant: start >= 0, so this test will be true when end < 0.
1051 * The range is empty when start > end or start >= length. */
1052 if (start > end || start >= llen) {
1053 addReply(c,shared.czero);
1054 return;
1055 }
1056 if (end >= llen) end = llen-1;
1057
1058 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1059 /* Correct for 1-based rank. */
1060 zobj->ptr = zzlDeleteRangeByRank(zobj->ptr,start+1,end+1,&deleted);
1061 if (zzlLength(zobj->ptr) == 0) dbDelete(c->db,key);
1062 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
1063 zset *zs = zobj->ptr;
1064
1065 /* Correct for 1-based rank. */
1066 deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);
1067 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
1068 if (dictSize(zs->dict) == 0) dbDelete(c->db,key);
1069 } else {
1070 redisPanic("Unknown sorted set encoding");
1071 }
1072
1073 if (deleted) signalModifiedKey(c->db,key);
1074 server.dirty += deleted;
1075 addReplyLongLong(c,deleted);
1076 }
1077
1078 typedef struct {
1079 robj *subject;
1080 int type; /* Set, sorted set */
1081 int encoding;
1082 double weight;
1083
1084 union {
1085 /* Set iterators. */
1086 union _iterset {
1087 struct {
1088 intset *is;
1089 int ii;
1090 } is;
1091 struct {
1092 dict *dict;
1093 dictIterator *di;
1094 dictEntry *de;
1095 } ht;
1096 } set;
1097
1098 /* Sorted set iterators. */
1099 union _iterzset {
1100 struct {
1101 unsigned char *zl;
1102 unsigned char *eptr, *sptr;
1103 } zl;
1104 struct {
1105 zset *zs;
1106 zskiplistNode *node;
1107 } sl;
1108 } zset;
1109 } iter;
1110 } zsetopsrc;
1111
1112
/* Flags stored in zsetopval.flags.
 *
 * OPVAL_DIRTY_ROBJ marks a pointer that has to be cleaned up on the next
 * iteration over the zsetopval.
 *
 * OPVAL_DIRTY_LL is different, because a long long value needs no cleanup:
 * it records that "ell" was already checked to hold a long long, or that a
 * conversion from another representation was already attempted. When that
 * succeeded, OPVAL_VALID_LL is set too. */
#define OPVAL_DIRTY_ROBJ (1<<0)
#define OPVAL_DIRTY_LL   (1<<1)
#define OPVAL_VALID_LL   (1<<2)
1122
1123 /* Store value retrieved from the iterator. */
1124 typedef struct {
1125 int flags;
1126 unsigned char _buf[32]; /* Private buffer. */
1127 robj *ele;
1128 unsigned char *estr;
1129 unsigned int elen;
1130 long long ell;
1131 double score;
1132 } zsetopval;
1133
/* Short names for the iterator unions nested inside zsetopsrc. */
typedef union _iterset iterset;
typedef union _iterzset iterzset;
1136
1137 void zuiInitIterator(zsetopsrc *op) {
1138 if (op->subject == NULL)
1139 return;
1140
1141 if (op->type == REDIS_SET) {
1142 iterset *it = &op->iter.set;
1143 if (op->encoding == REDIS_ENCODING_INTSET) {
1144 it->is.is = op->subject->ptr;
1145 it->is.ii = 0;
1146 } else if (op->encoding == REDIS_ENCODING_HT) {
1147 it->ht.dict = op->subject->ptr;
1148 it->ht.di = dictGetIterator(op->subject->ptr);
1149 it->ht.de = dictNext(it->ht.di);
1150 } else {
1151 redisPanic("Unknown set encoding");
1152 }
1153 } else if (op->type == REDIS_ZSET) {
1154 iterzset *it = &op->iter.zset;
1155 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1156 it->zl.zl = op->subject->ptr;
1157 it->zl.eptr = ziplistIndex(it->zl.zl,0);
1158 if (it->zl.eptr != NULL) {
1159 it->zl.sptr = ziplistNext(it->zl.zl,it->zl.eptr);
1160 redisAssert(it->zl.sptr != NULL);
1161 }
1162 } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
1163 it->sl.zs = op->subject->ptr;
1164 it->sl.node = it->sl.zs->zsl->header->level[0].forward;
1165 } else {
1166 redisPanic("Unknown sorted set encoding");
1167 }
1168 } else {
1169 redisPanic("Unsupported type");
1170 }
1171 }
1172
1173 void zuiClearIterator(zsetopsrc *op) {
1174 if (op->subject == NULL)
1175 return;
1176
1177 if (op->type == REDIS_SET) {
1178 iterset *it = &op->iter.set;
1179 if (op->encoding == REDIS_ENCODING_INTSET) {
1180 REDIS_NOTUSED(it); /* skip */
1181 } else if (op->encoding == REDIS_ENCODING_HT) {
1182 dictReleaseIterator(it->ht.di);
1183 } else {
1184 redisPanic("Unknown set encoding");
1185 }
1186 } else if (op->type == REDIS_ZSET) {
1187 iterzset *it = &op->iter.zset;
1188 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1189 REDIS_NOTUSED(it); /* skip */
1190 } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
1191 REDIS_NOTUSED(it); /* skip */
1192 } else {
1193 redisPanic("Unknown sorted set encoding");
1194 }
1195 } else {
1196 redisPanic("Unsupported type");
1197 }
1198 }
1199
1200 int zuiLength(zsetopsrc *op) {
1201 if (op->subject == NULL)
1202 return 0;
1203
1204 if (op->type == REDIS_SET) {
1205 iterset *it = &op->iter.set;
1206 if (op->encoding == REDIS_ENCODING_INTSET) {
1207 return intsetLen(it->is.is);
1208 } else if (op->encoding == REDIS_ENCODING_HT) {
1209 return dictSize(it->ht.dict);
1210 } else {
1211 redisPanic("Unknown set encoding");
1212 }
1213 } else if (op->type == REDIS_ZSET) {
1214 iterzset *it = &op->iter.zset;
1215 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1216 return zzlLength(it->zl.zl);
1217 } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
1218 return it->sl.zs->zsl->length;
1219 } else {
1220 redisPanic("Unknown sorted set encoding");
1221 }
1222 } else {
1223 redisPanic("Unsupported type");
1224 }
1225 }
1226
1227 /* Check if the current value is valid. If so, store it in the passed structure
1228 * and move to the next element. If not valid, this means we have reached the
1229 * end of the structure and can abort. */
1230 int zuiNext(zsetopsrc *op, zsetopval *val) {
1231 if (op->subject == NULL)
1232 return 0;
1233
1234 if (val->flags & OPVAL_DIRTY_ROBJ)
1235 decrRefCount(val->ele);
1236
1237 bzero(val,sizeof(zsetopval));
1238
1239 if (op->type == REDIS_SET) {
1240 iterset *it = &op->iter.set;
1241 if (op->encoding == REDIS_ENCODING_INTSET) {
1242 if (!intsetGet(it->is.is,it->is.ii,&val->ell))
1243 return 0;
1244 val->score = 1.0;
1245
1246 /* Move to next element. */
1247 it->is.ii++;
1248 } else if (op->encoding == REDIS_ENCODING_HT) {
1249 if (it->ht.de == NULL)
1250 return 0;
1251 val->ele = dictGetEntryKey(it->ht.de);
1252 val->score = 1.0;
1253
1254 /* Move to next element. */
1255 it->ht.de = dictNext(it->ht.di);
1256 } else {
1257 redisPanic("Unknown set encoding");
1258 }
1259 } else if (op->type == REDIS_ZSET) {
1260 iterzset *it = &op->iter.zset;
1261 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1262 /* No need to check both, but better be explicit. */
1263 if (it->zl.eptr == NULL || it->zl.sptr == NULL)
1264 return 0;
1265 redisAssert(ziplistGet(it->zl.eptr,&val->estr,&val->elen,&val->ell));
1266 val->score = zzlGetScore(it->zl.sptr);
1267
1268 /* Move to next element. */
1269 zzlNext(it->zl.zl,&it->zl.eptr,&it->zl.sptr);
1270 } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
1271 if (it->sl.node == NULL)
1272 return 0;
1273 val->ele = it->sl.node->obj;
1274 val->score = it->sl.node->score;
1275
1276 /* Move to next element. */
1277 it->sl.node = it->sl.node->level[0].forward;
1278 } else {
1279 redisPanic("Unknown sorted set encoding");
1280 }
1281 } else {
1282 redisPanic("Unsupported type");
1283 }
1284 return 1;
1285 }
1286
1287 int zuiLongLongFromValue(zsetopval *val) {
1288 if (!(val->flags & OPVAL_DIRTY_LL)) {
1289 val->flags |= OPVAL_DIRTY_LL;
1290
1291 if (val->ele != NULL) {
1292 if (val->ele->encoding == REDIS_ENCODING_INT) {
1293 val->ell = (long)val->ele->ptr;
1294 val->flags |= OPVAL_VALID_LL;
1295 } else if (val->ele->encoding == REDIS_ENCODING_RAW) {
1296 if (string2ll(val->ele->ptr,sdslen(val->ele->ptr),&val->ell))
1297 val->flags |= OPVAL_VALID_LL;
1298 } else {
1299 redisPanic("Unsupported element encoding");
1300 }
1301 } else if (val->estr != NULL) {
1302 if (string2ll((char*)val->estr,val->elen,&val->ell))
1303 val->flags |= OPVAL_VALID_LL;
1304 } else {
1305 /* The long long was already set, flag as valid. */
1306 val->flags |= OPVAL_VALID_LL;
1307 }
1308 }
1309 return val->flags & OPVAL_VALID_LL;
1310 }
1311
1312 robj *zuiObjectFromValue(zsetopval *val) {
1313 if (val->ele == NULL) {
1314 if (val->estr != NULL) {
1315 val->ele = createStringObject((char*)val->estr,val->elen);
1316 } else {
1317 val->ele = createStringObjectFromLongLong(val->ell);
1318 }
1319 val->flags |= OPVAL_DIRTY_ROBJ;
1320 }
1321 return val->ele;
1322 }
1323
1324 int zuiBufferFromValue(zsetopval *val) {
1325 if (val->estr == NULL) {
1326 if (val->ele != NULL) {
1327 if (val->ele->encoding == REDIS_ENCODING_INT) {
1328 val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),(long)val->ele->ptr);
1329 val->estr = val->_buf;
1330 } else if (val->ele->encoding == REDIS_ENCODING_RAW) {
1331 val->elen = sdslen(val->ele->ptr);
1332 val->estr = val->ele->ptr;
1333 } else {
1334 redisPanic("Unsupported element encoding");
1335 }
1336 } else {
1337 val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),val->ell);
1338 val->estr = val->_buf;
1339 }
1340 }
1341 return 1;
1342 }
1343
1344 /* Find value pointed to by val in the source pointer to by op. When found,
1345 * return 1 and store its score in target. Return 0 otherwise. */
1346 int zuiFind(zsetopsrc *op, zsetopval *val, double *score) {
1347 if (op->subject == NULL)
1348 return 0;
1349
1350 if (op->type == REDIS_SET) {
1351 iterset *it = &op->iter.set;
1352
1353 if (op->encoding == REDIS_ENCODING_INTSET) {
1354 if (zuiLongLongFromValue(val) && intsetFind(it->is.is,val->ell)) {
1355 *score = 1.0;
1356 return 1;
1357 } else {
1358 return 0;
1359 }
1360 } else if (op->encoding == REDIS_ENCODING_HT) {
1361 zuiObjectFromValue(val);
1362 if (dictFind(it->ht.dict,val->ele) != NULL) {
1363 *score = 1.0;
1364 return 1;
1365 } else {
1366 return 0;
1367 }
1368 } else {
1369 redisPanic("Unknown set encoding");
1370 }
1371 } else if (op->type == REDIS_ZSET) {
1372 iterzset *it = &op->iter.zset;
1373 zuiObjectFromValue(val);
1374
1375 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1376 if (zzlFind(it->zl.zl,val->ele,score) != NULL) {
1377 /* Score is already set by zzlFind. */
1378 return 1;
1379 } else {
1380 return 0;
1381 }
1382 } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
1383 dictEntry *de;
1384 if ((de = dictFind(it->sl.zs->dict,val->ele)) != NULL) {
1385 *score = *(double*)dictGetEntryVal(de);
1386 return 1;
1387 } else {
1388 return 0;
1389 }
1390 } else {
1391 redisPanic("Unknown sorted set encoding");
1392 }
1393 } else {
1394 redisPanic("Unsupported type");
1395 }
1396 }
1397
1398 int zuiCompareByCardinality(const void *s1, const void *s2) {
1399 return zuiLength((zsetopsrc*)s1) - zuiLength((zsetopsrc*)s2);
1400 }
1401
/* Ways of combining the scores of an element found in several inputs. */
#define REDIS_AGGR_SUM 1
#define REDIS_AGGR_MIN 2
#define REDIS_AGGR_MAX 3

/* Score held by a dict entry; an entry with a NULL value counts as 1.0. */
#define zunionInterDictValue(_e) (dictGetEntryVal(_e) == NULL ? 1.0 : *(double*)dictGetEntryVal(_e))
1406
1407 inline static void zunionInterAggregate(double *target, double val, int aggregate) {
1408 if (aggregate == REDIS_AGGR_SUM) {
1409 *target = *target + val;
1410 /* The result of adding two doubles is NaN when one variable
1411 * is +inf and the other is -inf. When these numbers are added,
1412 * we maintain the convention of the result being 0.0. */
1413 if (isnan(*target)) *target = 0.0;
1414 } else if (aggregate == REDIS_AGGR_MIN) {
1415 *target = val < *target ? val : *target;
1416 } else if (aggregate == REDIS_AGGR_MAX) {
1417 *target = val > *target ? val : *target;
1418 } else {
1419 /* safety net */
1420 redisPanic("Unknown ZUNION/INTER aggregate type");
1421 }
1422 }
1423
1424 void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
1425 int i, j, setnum;
1426 int aggregate = REDIS_AGGR_SUM;
1427 zsetopsrc *src;
1428 zsetopval zval;
1429 robj *tmp;
1430 unsigned int maxelelen = 0;
1431 robj *dstobj;
1432 zset *dstzset;
1433 zskiplistNode *znode;
1434 int touched = 0;
1435
1436 /* expect setnum input keys to be given */
1437 setnum = atoi(c->argv[2]->ptr);
1438 if (setnum < 1) {
1439 addReplyError(c,
1440 "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE");
1441 return;
1442 }
1443
1444 /* test if the expected number of keys would overflow */
1445 if (3+setnum > c->argc) {
1446 addReply(c,shared.syntaxerr);
1447 return;
1448 }
1449
1450 /* read keys to be used for input */
1451 src = zcalloc(sizeof(zsetopsrc) * setnum);
1452 for (i = 0, j = 3; i < setnum; i++, j++) {
1453 robj *obj = lookupKeyWrite(c->db,c->argv[j]);
1454 if (obj != NULL) {
1455 if (obj->type != REDIS_ZSET && obj->type != REDIS_SET) {
1456 zfree(src);
1457 addReply(c,shared.wrongtypeerr);
1458 return;
1459 }
1460
1461 src[i].subject = obj;
1462 src[i].type = obj->type;
1463 src[i].encoding = obj->encoding;
1464 } else {
1465 src[i].subject = NULL;
1466 }
1467
1468 /* Default all weights to 1. */
1469 src[i].weight = 1.0;
1470 }
1471
1472 /* parse optional extra arguments */
1473 if (j < c->argc) {
1474 int remaining = c->argc - j;
1475
1476 while (remaining) {
1477 if (remaining >= (setnum + 1) && !strcasecmp(c->argv[j]->ptr,"weights")) {
1478 j++; remaining--;
1479 for (i = 0; i < setnum; i++, j++, remaining--) {
1480 if (getDoubleFromObjectOrReply(c,c->argv[j],&src[i].weight,
1481 "weight value is not a double") != REDIS_OK)
1482 {
1483 zfree(src);
1484 return;
1485 }
1486 }
1487 } else if (remaining >= 2 && !strcasecmp(c->argv[j]->ptr,"aggregate")) {
1488 j++; remaining--;
1489 if (!strcasecmp(c->argv[j]->ptr,"sum")) {
1490 aggregate = REDIS_AGGR_SUM;
1491 } else if (!strcasecmp(c->argv[j]->ptr,"min")) {
1492 aggregate = REDIS_AGGR_MIN;
1493 } else if (!strcasecmp(c->argv[j]->ptr,"max")) {
1494 aggregate = REDIS_AGGR_MAX;
1495 } else {
1496 zfree(src);
1497 addReply(c,shared.syntaxerr);
1498 return;
1499 }
1500 j++; remaining--;
1501 } else {
1502 zfree(src);
1503 addReply(c,shared.syntaxerr);
1504 return;
1505 }
1506 }
1507 }
1508
1509 for (i = 0; i < setnum; i++)
1510 zuiInitIterator(&src[i]);
1511
1512 /* sort sets from the smallest to largest, this will improve our
1513 * algorithm's performance */
1514 qsort(src,setnum,sizeof(zsetopsrc),zuiCompareByCardinality);
1515
1516 dstobj = createZsetObject();
1517 dstzset = dstobj->ptr;
1518 memset(&zval, 0, sizeof(zval));
1519
1520 if (op == REDIS_OP_INTER) {
1521 /* Skip everything if the smallest input is empty. */
1522 if (zuiLength(&src[0]) > 0) {
1523 /* Precondition: as src[0] is non-empty and the inputs are ordered
1524 * by size, all src[i > 0] are non-empty too. */
1525 while (zuiNext(&src[0],&zval)) {
1526 double score, value;
1527
1528 score = src[0].weight * zval.score;
1529 for (j = 1; j < setnum; j++) {
1530 if (zuiFind(&src[j],&zval,&value)) {
1531 value *= src[j].weight;
1532 zunionInterAggregate(&score,value,aggregate);
1533 } else {
1534 break;
1535 }
1536 }
1537
1538 /* Only continue when present in every input. */
1539 if (j == setnum) {
1540 tmp = zuiObjectFromValue(&zval);
1541 znode = zslInsert(dstzset->zsl,score,tmp);
1542 incrRefCount(tmp); /* added to skiplist */
1543 dictAdd(dstzset->dict,tmp,&znode->score);
1544 incrRefCount(tmp); /* added to dictionary */
1545
1546 if (tmp->encoding == REDIS_ENCODING_RAW)
1547 if (sdslen(tmp->ptr) > maxelelen)
1548 maxelelen = sdslen(tmp->ptr);
1549 }
1550 }
1551 }
1552 } else if (op == REDIS_OP_UNION) {
1553 for (i = 0; i < setnum; i++) {
1554 if (zuiLength(&src[0]) == 0)
1555 continue;
1556
1557 while (zuiNext(&src[i],&zval)) {
1558 double score, value;
1559
1560 /* Skip key when already processed */
1561 if (dictFind(dstzset->dict,zuiObjectFromValue(&zval)) != NULL)
1562 continue;
1563
1564 /* Initialize score */
1565 score = src[i].weight * zval.score;
1566
1567 /* Because the inputs are sorted by size, it's only possible
1568 * for sets at larger indices to hold this element. */
1569 for (j = (i+1); j < setnum; j++) {
1570 if (zuiFind(&src[j],&zval,&value)) {
1571 value *= src[j].weight;
1572 zunionInterAggregate(&score,value,aggregate);
1573 }
1574 }
1575
1576 tmp = zuiObjectFromValue(&zval);
1577 znode = zslInsert(dstzset->zsl,score,tmp);
1578 incrRefCount(zval.ele); /* added to skiplist */
1579 dictAdd(dstzset->dict,tmp,&znode->score);
1580 incrRefCount(zval.ele); /* added to dictionary */
1581
1582 if (tmp->encoding == REDIS_ENCODING_RAW)
1583 if (sdslen(tmp->ptr) > maxelelen)
1584 maxelelen = sdslen(tmp->ptr);
1585 }
1586 }
1587 } else {
1588 redisPanic("Unknown operator");
1589 }
1590
1591 for (i = 0; i < setnum; i++)
1592 zuiClearIterator(&src[i]);
1593
1594 if (dbDelete(c->db,dstkey)) {
1595 signalModifiedKey(c->db,dstkey);
1596 touched = 1;
1597 server.dirty++;
1598 }
1599 if (dstzset->zsl->length) {
1600 /* Convert to ziplist when in limits. */
1601 if (dstzset->zsl->length <= server.zset_max_ziplist_entries &&
1602 maxelelen <= server.zset_max_ziplist_value)
1603 zsetConvert(dstobj,REDIS_ENCODING_ZIPLIST);
1604
1605 dbAdd(c->db,dstkey,dstobj);
1606 addReplyLongLong(c,zsetLength(dstobj));
1607 if (!touched) signalModifiedKey(c->db,dstkey);
1608 server.dirty++;
1609 } else {
1610 decrRefCount(dstobj);
1611 addReply(c,shared.czero);
1612 }
1613 zfree(src);
1614 }
1615
1616 void zunionstoreCommand(redisClient *c) {
1617 zunionInterGenericCommand(c,c->argv[1], REDIS_OP_UNION);
1618 }
1619
1620 void zinterstoreCommand(redisClient *c) {
1621 zunionInterGenericCommand(c,c->argv[1], REDIS_OP_INTER);
1622 }
1623
1624 void zrangeGenericCommand(redisClient *c, int reverse) {
1625 robj *key = c->argv[1];
1626 robj *zobj;
1627 int withscores = 0;
1628 long start;
1629 long end;
1630 int llen;
1631 int rangelen;
1632
1633 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
1634 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
1635
1636 if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
1637 withscores = 1;
1638 } else if (c->argc >= 5) {
1639 addReply(c,shared.syntaxerr);
1640 return;
1641 }
1642
1643 if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL
1644 || checkType(c,zobj,REDIS_ZSET)) return;
1645
1646 /* Sanitize indexes. */
1647 llen = zsetLength(zobj);
1648 if (start < 0) start = llen+start;
1649 if (end < 0) end = llen+end;
1650 if (start < 0) start = 0;
1651
1652 /* Invariant: start >= 0, so this test will be true when end < 0.
1653 * The range is empty when start > end or start >= length. */
1654 if (start > end || start >= llen) {
1655 addReply(c,shared.emptymultibulk);
1656 return;
1657 }
1658 if (end >= llen) end = llen-1;
1659 rangelen = (end-start)+1;
1660
1661 /* Return the result in form of a multi-bulk reply */
1662 addReplyMultiBulkLen(c, withscores ? (rangelen*2) : rangelen);
1663
1664 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1665 unsigned char *zl = zobj->ptr;
1666 unsigned char *eptr, *sptr;
1667 unsigned char *vstr;
1668 unsigned int vlen;
1669 long long vlong;
1670
1671 if (reverse)
1672 eptr = ziplistIndex(zl,-2-(2*start));
1673 else
1674 eptr = ziplistIndex(zl,2*start);
1675
1676 redisAssert(eptr != NULL);
1677 sptr = ziplistNext(zl,eptr);
1678
1679 while (rangelen--) {
1680 redisAssert(eptr != NULL && sptr != NULL);
1681 redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
1682 if (vstr == NULL)
1683 addReplyBulkLongLong(c,vlong);
1684 else
1685 addReplyBulkCBuffer(c,vstr,vlen);
1686
1687 if (withscores)
1688 addReplyDouble(c,zzlGetScore(sptr));
1689
1690 if (reverse)
1691 zzlPrev(zl,&eptr,&sptr);
1692 else
1693 zzlNext(zl,&eptr,&sptr);
1694 }
1695
1696 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
1697 zset *zs = zobj->ptr;
1698 zskiplist *zsl = zs->zsl;
1699 zskiplistNode *ln;
1700 robj *ele;
1701
1702 /* Check if starting point is trivial, before doing log(N) lookup. */
1703 if (reverse) {
1704 ln = zsl->tail;
1705 if (start > 0)
1706 ln = zslGetElementByRank(zsl,llen-start);
1707 } else {
1708 ln = zsl->header->level[0].forward;
1709 if (start > 0)
1710 ln = zslGetElementByRank(zsl,start+1);
1711 }
1712
1713 while(rangelen--) {
1714 redisAssert(ln != NULL);
1715 ele = ln->obj;
1716 addReplyBulk(c,ele);
1717 if (withscores)
1718 addReplyDouble(c,ln->score);
1719 ln = reverse ? ln->backward : ln->level[0].forward;
1720 }
1721 } else {
1722 redisPanic("Unknown sorted set encoding");
1723 }
1724 }
1725
1726 void zrangeCommand(redisClient *c) {
1727 zrangeGenericCommand(c,0);
1728 }
1729
1730 void zrevrangeCommand(redisClient *c) {
1731 zrangeGenericCommand(c,1);
1732 }
1733
1734 /* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE and ZCOUNT.
1735 * If "justcount", only the number of elements in the range is returned. */
1736 void genericZrangebyscoreCommand(redisClient *c, int reverse, int justcount) {
1737 zrangespec range;
1738 robj *key = c->argv[1];
1739 robj *emptyreply, *zobj;
1740 int offset = 0, limit = -1;
1741 int withscores = 0;
1742 unsigned long rangelen = 0;
1743 void *replylen = NULL;
1744 int minidx, maxidx;
1745
1746 /* Parse the range arguments. */
1747 if (reverse) {
1748 /* Range is given as [max,min] */
1749 maxidx = 2; minidx = 3;
1750 } else {
1751 /* Range is given as [min,max] */
1752 minidx = 2; maxidx = 3;
1753 }
1754
1755 if (zslParseRange(c->argv[minidx],c->argv[maxidx],&range) != REDIS_OK) {
1756 addReplyError(c,"min or max is not a double");
1757 return;
1758 }
1759
1760 /* Parse optional extra arguments. Note that ZCOUNT will exactly have
1761 * 4 arguments, so we'll never enter the following code path. */
1762 if (c->argc > 4) {
1763 int remaining = c->argc - 4;
1764 int pos = 4;
1765
1766 while (remaining) {
1767 if (remaining >= 1 && !strcasecmp(c->argv[pos]->ptr,"withscores")) {
1768 pos++; remaining--;
1769 withscores = 1;
1770 } else if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr,"limit")) {
1771 offset = atoi(c->argv[pos+1]->ptr);
1772 limit = atoi(c->argv[pos+2]->ptr);
1773 pos += 3; remaining -= 3;
1774 } else {
1775 addReply(c,shared.syntaxerr);
1776 return;
1777 }
1778 }
1779 }
1780
1781 /* Ok, lookup the key and get the range */
1782 emptyreply = justcount ? shared.czero : shared.emptymultibulk;
1783 if ((zobj = lookupKeyReadOrReply(c,key,emptyreply)) == NULL ||
1784 checkType(c,zobj,REDIS_ZSET)) return;
1785
1786 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1787 unsigned char *zl = zobj->ptr;
1788 unsigned char *eptr, *sptr;
1789 unsigned char *vstr;
1790 unsigned int vlen;
1791 long long vlong;
1792 double score;
1793
1794 /* If reversed, get the last node in range as starting point. */
1795 if (reverse)
1796 eptr = zzlLastInRange(zl,range);
1797 else
1798 eptr = zzlFirstInRange(zl,range);
1799
1800 /* No "first" element in the specified interval. */
1801 if (eptr == NULL) {
1802 addReply(c,emptyreply);
1803 return;
1804 }
1805
1806 /* Get score pointer for the first element. */
1807 redisAssert(eptr != NULL);
1808 sptr = ziplistNext(zl,eptr);
1809
1810 /* We don't know in advance how many matching elements there are in the
1811 * list, so we push this object that will represent the multi-bulk
1812 * length in the output buffer, and will "fix" it later */
1813 if (!justcount)
1814 replylen = addDeferredMultiBulkLength(c);
1815
1816 /* If there is an offset, just traverse the number of elements without
1817 * checking the score because that is done in the next loop. */
1818 while (eptr && offset--)
1819 if (reverse)
1820 zzlPrev(zl,&eptr,&sptr);
1821 else
1822 zzlNext(zl,&eptr,&sptr);
1823
1824 while (eptr && limit--) {
1825 score = zzlGetScore(sptr);
1826
1827 /* Abort when the node is no longer in range. */
1828 if (reverse) {
1829 if (!zslValueGteMin(score,&range)) break;
1830 } else {
1831 if (!zslValueLteMax(score,&range)) break;
1832 }
1833
1834 /* Do our magic */
1835 rangelen++;
1836 if (!justcount) {
1837 redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
1838 if (vstr == NULL)
1839 addReplyBulkLongLong(c,vlong);
1840 else
1841 addReplyBulkCBuffer(c,vstr,vlen);
1842
1843 if (withscores)
1844 addReplyDouble(c,score);
1845 }
1846
1847 /* Move to next node */
1848 if (reverse)
1849 zzlPrev(zl,&eptr,&sptr);
1850 else
1851 zzlNext(zl,&eptr,&sptr);
1852 }
1853 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
1854 zset *zs = zobj->ptr;
1855 zskiplist *zsl = zs->zsl;
1856 zskiplistNode *ln;
1857
1858 /* If reversed, get the last node in range as starting point. */
1859 if (reverse)
1860 ln = zslLastInRange(zsl,range);
1861 else
1862 ln = zslFirstInRange(zsl,range);
1863
1864 /* No "first" element in the specified interval. */
1865 if (ln == NULL) {
1866 addReply(c,emptyreply);
1867 return;
1868 }
1869
1870 /* We don't know in advance how many matching elements there are in the
1871 * list, so we push this object that will represent the multi-bulk
1872 * length in the output buffer, and will "fix" it later */
1873 if (!justcount)
1874 replylen = addDeferredMultiBulkLength(c);
1875
1876 /* If there is an offset, just traverse the number of elements without
1877 * checking the score because that is done in the next loop. */
1878 while (ln && offset--)
1879 ln = reverse ? ln->backward : ln->level[0].forward;
1880
1881 while (ln && limit--) {
1882 /* Abort when the node is no longer in range. */
1883 if (reverse) {
1884 if (!zslValueGteMin(ln->score,&range)) break;
1885 } else {
1886 if (!zslValueLteMax(ln->score,&range)) break;
1887 }
1888
1889 /* Do our magic */
1890 rangelen++;
1891 if (!justcount) {
1892 addReplyBulk(c,ln->obj);
1893 if (withscores)
1894 addReplyDouble(c,ln->score);
1895 }
1896
1897 /* Move to next node */
1898 ln = reverse ? ln->backward : ln->level[0].forward;
1899 }
1900 } else {
1901 redisPanic("Unknown sorted set encoding");
1902 }
1903
1904 if (justcount) {
1905 addReplyLongLong(c,(long)rangelen);
1906 } else {
1907 if (withscores) rangelen *= 2;
1908 setDeferredMultiBulkLength(c,replylen,rangelen);
1909 }
1910 }
1911
1912 void zrangebyscoreCommand(redisClient *c) {
1913 genericZrangebyscoreCommand(c,0,0);
1914 }
1915
1916 void zrevrangebyscoreCommand(redisClient *c) {
1917 genericZrangebyscoreCommand(c,1,0);
1918 }
1919
1920 void zcountCommand(redisClient *c) {
1921 genericZrangebyscoreCommand(c,0,1);
1922 }
1923
1924 void zcardCommand(redisClient *c) {
1925 robj *key = c->argv[1];
1926 robj *zobj;
1927
1928 if ((zobj = lookupKeyReadOrReply(c,key,shared.czero)) == NULL ||
1929 checkType(c,zobj,REDIS_ZSET)) return;
1930
1931 addReplyLongLong(c,zsetLength(zobj));
1932 }
1933
1934 void zscoreCommand(redisClient *c) {
1935 robj *key = c->argv[1];
1936 robj *zobj;
1937 double score;
1938
1939 if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
1940 checkType(c,zobj,REDIS_ZSET)) return;
1941
1942 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1943 if (zzlFind(zobj->ptr,c->argv[2],&score) != NULL)
1944 addReplyDouble(c,score);
1945 else
1946 addReply(c,shared.nullbulk);
1947 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
1948 zset *zs = zobj->ptr;
1949 dictEntry *de;
1950
1951 c->argv[2] = tryObjectEncoding(c->argv[2]);
1952 de = dictFind(zs->dict,c->argv[2]);
1953 if (de != NULL) {
1954 score = *(double*)dictGetEntryVal(de);
1955 addReplyDouble(c,score);
1956 } else {
1957 addReply(c,shared.nullbulk);
1958 }
1959 } else {
1960 redisPanic("Unknown sorted set encoding");
1961 }
1962 }
1963
1964 void zrankGenericCommand(redisClient *c, int reverse) {
1965 robj *key = c->argv[1];
1966 robj *ele = c->argv[2];
1967 robj *zobj;
1968 unsigned long llen;
1969 unsigned long rank;
1970
1971 if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
1972 checkType(c,zobj,REDIS_ZSET)) return;
1973 llen = zsetLength(zobj);
1974
1975 redisAssert(ele->encoding == REDIS_ENCODING_RAW);
1976 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1977 unsigned char *zl = zobj->ptr;
1978 unsigned char *eptr, *sptr;
1979
1980 eptr = ziplistIndex(zl,0);
1981 redisAssert(eptr != NULL);
1982 sptr = ziplistNext(zl,eptr);
1983 redisAssert(sptr != NULL);
1984
1985 rank = 1;
1986 while(eptr != NULL) {
1987 if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr)))
1988 break;
1989 rank++;
1990 zzlNext(zl,&eptr,&sptr);
1991 }
1992
1993 if (eptr != NULL) {
1994 if (reverse)
1995 addReplyLongLong(c,llen-rank);
1996 else
1997 addReplyLongLong(c,rank-1);
1998 } else {
1999 addReply(c,shared.nullbulk);
2000 }
2001 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
2002 zset *zs = zobj->ptr;
2003 zskiplist *zsl = zs->zsl;
2004 dictEntry *de;
2005 double score;
2006
2007 ele = c->argv[2] = tryObjectEncoding(c->argv[2]);
2008 de = dictFind(zs->dict,ele);
2009 if (de != NULL) {
2010 score = *(double*)dictGetEntryVal(de);
2011 rank = zslGetRank(zsl,score,ele);
2012 redisAssert(rank); /* Existing elements always have a rank. */
2013 if (reverse)
2014 addReplyLongLong(c,llen-rank);
2015 else
2016 addReplyLongLong(c,rank-1);
2017 } else {
2018 addReply(c,shared.nullbulk);
2019 }
2020 } else {
2021 redisPanic("Unknown sorted set encoding");
2022 }
2023 }
2024
2025 void zrankCommand(redisClient *c) {
2026 zrankGenericCommand(c, 0);
2027 }
2028
2029 void zrevrankCommand(redisClient *c) {
2030 zrankGenericCommand(c, 1);
2031 }