]> git.saurik.com Git - redis.git/blob - src/t_zset.c
b8a961eb775022e46ef2468ecbb54334a6ce6520
[redis.git] / src / t_zset.c
1 #include "redis.h"
2
3 #include <math.h>
4
5 /*-----------------------------------------------------------------------------
6 * Sorted set API
7 *----------------------------------------------------------------------------*/
8
9 /* ZSETs are ordered sets using two data structures to hold the same elements
10 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
11 * data structure.
12 *
13 * The elements are added to a hash table mapping Redis objects to scores.
14 * At the same time the elements are added to a skip list mapping scores
15 * to Redis objects (so objects are sorted by scores in this "view"). */
16
17 /* This skiplist implementation is almost a C translation of the original
18 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
19 * Alternative to Balanced Trees", modified in three ways:
20 * a) this implementation allows for repeated values.
21 * b) the comparison is not just by key (our 'score') but by satellite data.
22 * c) there is a back pointer, so it's a doubly linked list with the back
23 * pointers being only at "level 1". This allows to traverse the list
24 * from tail to head, useful for ZREVRANGE. */
25
26 zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
27 zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
28 zn->score = score;
29 zn->obj = obj;
30 return zn;
31 }
32
33 zskiplist *zslCreate(void) {
34 int j;
35 zskiplist *zsl;
36
37 zsl = zmalloc(sizeof(*zsl));
38 zsl->level = 1;
39 zsl->length = 0;
40 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
41 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
42 zsl->header->level[j].forward = NULL;
43 zsl->header->level[j].span = 0;
44 }
45 zsl->header->backward = NULL;
46 zsl->tail = NULL;
47 return zsl;
48 }
49
50 void zslFreeNode(zskiplistNode *node) {
51 decrRefCount(node->obj);
52 zfree(node);
53 }
54
55 void zslFree(zskiplist *zsl) {
56 zskiplistNode *node = zsl->header->level[0].forward, *next;
57
58 zfree(zsl->header);
59 while(node) {
60 next = node->level[0].forward;
61 zslFreeNode(node);
62 node = next;
63 }
64 zfree(zsl);
65 }
66
67 int zslRandomLevel(void) {
68 int level = 1;
69 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
70 level += 1;
71 return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
72 }
73
74 zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
75 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
76 unsigned int rank[ZSKIPLIST_MAXLEVEL];
77 int i, level;
78
79 x = zsl->header;
80 for (i = zsl->level-1; i >= 0; i--) {
81 /* store rank that is crossed to reach the insert position */
82 rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
83 while (x->level[i].forward &&
84 (x->level[i].forward->score < score ||
85 (x->level[i].forward->score == score &&
86 compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
87 rank[i] += x->level[i].span;
88 x = x->level[i].forward;
89 }
90 update[i] = x;
91 }
92 /* we assume the key is not already inside, since we allow duplicated
93 * scores, and the re-insertion of score and redis object should never
94 * happpen since the caller of zslInsert() should test in the hash table
95 * if the element is already inside or not. */
96 level = zslRandomLevel();
97 if (level > zsl->level) {
98 for (i = zsl->level; i < level; i++) {
99 rank[i] = 0;
100 update[i] = zsl->header;
101 update[i]->level[i].span = zsl->length;
102 }
103 zsl->level = level;
104 }
105 x = zslCreateNode(level,score,obj);
106 for (i = 0; i < level; i++) {
107 x->level[i].forward = update[i]->level[i].forward;
108 update[i]->level[i].forward = x;
109
110 /* update span covered by update[i] as x is inserted here */
111 x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
112 update[i]->level[i].span = (rank[0] - rank[i]) + 1;
113 }
114
115 /* increment span for untouched levels */
116 for (i = level; i < zsl->level; i++) {
117 update[i]->level[i].span++;
118 }
119
120 x->backward = (update[0] == zsl->header) ? NULL : update[0];
121 if (x->level[0].forward)
122 x->level[0].forward->backward = x;
123 else
124 zsl->tail = x;
125 zsl->length++;
126 return x;
127 }
128
129 /* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
130 void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
131 int i;
132 for (i = 0; i < zsl->level; i++) {
133 if (update[i]->level[i].forward == x) {
134 update[i]->level[i].span += x->level[i].span - 1;
135 update[i]->level[i].forward = x->level[i].forward;
136 } else {
137 update[i]->level[i].span -= 1;
138 }
139 }
140 if (x->level[0].forward) {
141 x->level[0].forward->backward = x->backward;
142 } else {
143 zsl->tail = x->backward;
144 }
145 while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
146 zsl->level--;
147 zsl->length--;
148 }
149
150 /* Delete an element with matching score/object from the skiplist. */
151 int zslDelete(zskiplist *zsl, double score, robj *obj) {
152 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
153 int i;
154
155 x = zsl->header;
156 for (i = zsl->level-1; i >= 0; i--) {
157 while (x->level[i].forward &&
158 (x->level[i].forward->score < score ||
159 (x->level[i].forward->score == score &&
160 compareStringObjects(x->level[i].forward->obj,obj) < 0)))
161 x = x->level[i].forward;
162 update[i] = x;
163 }
164 /* We may have multiple elements with the same score, what we need
165 * is to find the element with both the right score and object. */
166 x = x->level[0].forward;
167 if (x && score == x->score && equalStringObjects(x->obj,obj)) {
168 zslDeleteNode(zsl, x, update);
169 zslFreeNode(x);
170 return 1;
171 } else {
172 return 0; /* not found */
173 }
174 return 0; /* not found */
175 }
176
/* Score interval with independently inclusive or exclusive bounds. */
typedef struct {
    double min;
    double max;
    int minex; /* non-zero when min itself is excluded */
    int maxex; /* non-zero when max itself is excluded */
} zrangespec;
182
183 static int zslValueGteMin(double value, zrangespec *spec) {
184 return spec->minex ? (value > spec->min) : (value >= spec->min);
185 }
186
187 static int zslValueLteMax(double value, zrangespec *spec) {
188 return spec->maxex ? (value < spec->max) : (value <= spec->max);
189 }
190
191 static int zslValueInRange(double value, zrangespec *spec) {
192 return zslValueGteMin(value,spec) && zslValueLteMax(value,spec);
193 }
194
195 /* Returns if there is a part of the zset is in range. */
196 int zslIsInRange(zskiplist *zsl, zrangespec *range) {
197 zskiplistNode *x;
198
199 /* Test for ranges that will always be empty. */
200 if (range->min > range->max ||
201 (range->min == range->max && (range->minex || range->maxex)))
202 return 0;
203 x = zsl->tail;
204 if (x == NULL || !zslValueGteMin(x->score,range))
205 return 0;
206 x = zsl->header->level[0].forward;
207 if (x == NULL || !zslValueLteMax(x->score,range))
208 return 0;
209 return 1;
210 }
211
212 /* Find the first node that is contained in the specified range.
213 * Returns NULL when no element is contained in the range. */
214 zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec range) {
215 zskiplistNode *x;
216 int i;
217
218 /* If everything is out of range, return early. */
219 if (!zslIsInRange(zsl,&range)) return NULL;
220
221 x = zsl->header;
222 for (i = zsl->level-1; i >= 0; i--) {
223 /* Go forward while *OUT* of range. */
224 while (x->level[i].forward &&
225 !zslValueGteMin(x->level[i].forward->score,&range))
226 x = x->level[i].forward;
227 }
228
229 /* This is an inner range, so the next node cannot be NULL. */
230 x = x->level[0].forward;
231 redisAssert(x != NULL);
232
233 /* Check if score <= max. */
234 if (!zslValueLteMax(x->score,&range)) return NULL;
235 return x;
236 }
237
238 /* Find the last node that is contained in the specified range.
239 * Returns NULL when no element is contained in the range. */
240 zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec range) {
241 zskiplistNode *x;
242 int i;
243
244 /* If everything is out of range, return early. */
245 if (!zslIsInRange(zsl,&range)) return NULL;
246
247 x = zsl->header;
248 for (i = zsl->level-1; i >= 0; i--) {
249 /* Go forward while *IN* range. */
250 while (x->level[i].forward &&
251 zslValueLteMax(x->level[i].forward->score,&range))
252 x = x->level[i].forward;
253 }
254
255 /* This is an inner range, so this node cannot be NULL. */
256 redisAssert(x != NULL);
257
258 /* Check if score >= min. */
259 if (!zslValueGteMin(x->score,&range)) return NULL;
260 return x;
261 }
262
263 /* Delete all the elements with score between min and max from the skiplist.
264 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
265 * Note that this function takes the reference to the hash table view of the
266 * sorted set, in order to remove the elements from the hash table too. */
267 unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec range, dict *dict) {
268 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
269 unsigned long removed = 0;
270 int i;
271
272 x = zsl->header;
273 for (i = zsl->level-1; i >= 0; i--) {
274 while (x->level[i].forward && (range.minex ?
275 x->level[i].forward->score <= range.min :
276 x->level[i].forward->score < range.min))
277 x = x->level[i].forward;
278 update[i] = x;
279 }
280
281 /* Current node is the last with score < or <= min. */
282 x = x->level[0].forward;
283
284 /* Delete nodes while in range. */
285 while (x && (range.maxex ? x->score < range.max : x->score <= range.max)) {
286 zskiplistNode *next = x->level[0].forward;
287 zslDeleteNode(zsl,x,update);
288 dictDelete(dict,x->obj);
289 zslFreeNode(x);
290 removed++;
291 x = next;
292 }
293 return removed;
294 }
295
296 /* Delete all the elements with rank between start and end from the skiplist.
297 * Start and end are inclusive. Note that start and end need to be 1-based */
298 unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict) {
299 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
300 unsigned long traversed = 0, removed = 0;
301 int i;
302
303 x = zsl->header;
304 for (i = zsl->level-1; i >= 0; i--) {
305 while (x->level[i].forward && (traversed + x->level[i].span) < start) {
306 traversed += x->level[i].span;
307 x = x->level[i].forward;
308 }
309 update[i] = x;
310 }
311
312 traversed++;
313 x = x->level[0].forward;
314 while (x && traversed <= end) {
315 zskiplistNode *next = x->level[0].forward;
316 zslDeleteNode(zsl,x,update);
317 dictDelete(dict,x->obj);
318 zslFreeNode(x);
319 removed++;
320 traversed++;
321 x = next;
322 }
323 return removed;
324 }
325
326 /* Find the rank for an element by both score and key.
327 * Returns 0 when the element cannot be found, rank otherwise.
328 * Note that the rank is 1-based due to the span of zsl->header to the
329 * first element. */
330 unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) {
331 zskiplistNode *x;
332 unsigned long rank = 0;
333 int i;
334
335 x = zsl->header;
336 for (i = zsl->level-1; i >= 0; i--) {
337 while (x->level[i].forward &&
338 (x->level[i].forward->score < score ||
339 (x->level[i].forward->score == score &&
340 compareStringObjects(x->level[i].forward->obj,o) <= 0))) {
341 rank += x->level[i].span;
342 x = x->level[i].forward;
343 }
344
345 /* x might be equal to zsl->header, so test if obj is non-NULL */
346 if (x->obj && equalStringObjects(x->obj,o)) {
347 return rank;
348 }
349 }
350 return 0;
351 }
352
353 /* Finds an element by its rank. The rank argument needs to be 1-based. */
354 zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
355 zskiplistNode *x;
356 unsigned long traversed = 0;
357 int i;
358
359 x = zsl->header;
360 for (i = zsl->level-1; i >= 0; i--) {
361 while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
362 {
363 traversed += x->level[i].span;
364 x = x->level[i].forward;
365 }
366 if (traversed == rank) {
367 return x;
368 }
369 }
370 return NULL;
371 }
372
373 /* Populate the rangespec according to the objects min and max. */
374 static int zslParseRange(robj *min, robj *max, zrangespec *spec) {
375 char *eptr;
376 spec->minex = spec->maxex = 0;
377
378 /* Parse the min-max interval. If one of the values is prefixed
379 * by the "(" character, it's considered "open". For instance
380 * ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max
381 * ZRANGEBYSCORE zset 1.5 2.5 will instead match min <= x <= max */
382 if (min->encoding == REDIS_ENCODING_INT) {
383 spec->min = (long)min->ptr;
384 } else {
385 if (((char*)min->ptr)[0] == '(') {
386 spec->min = strtod((char*)min->ptr+1,&eptr);
387 if (eptr[0] != '\0' || isnan(spec->min)) return REDIS_ERR;
388 spec->minex = 1;
389 } else {
390 spec->min = strtod((char*)min->ptr,&eptr);
391 if (eptr[0] != '\0' || isnan(spec->min)) return REDIS_ERR;
392 }
393 }
394 if (max->encoding == REDIS_ENCODING_INT) {
395 spec->max = (long)max->ptr;
396 } else {
397 if (((char*)max->ptr)[0] == '(') {
398 spec->max = strtod((char*)max->ptr+1,&eptr);
399 if (eptr[0] != '\0' || isnan(spec->max)) return REDIS_ERR;
400 spec->maxex = 1;
401 } else {
402 spec->max = strtod((char*)max->ptr,&eptr);
403 if (eptr[0] != '\0' || isnan(spec->max)) return REDIS_ERR;
404 }
405 }
406
407 return REDIS_OK;
408 }
409
410 /*-----------------------------------------------------------------------------
411 * Ziplist-backed sorted set API
412 *----------------------------------------------------------------------------*/
413
/* Return the score stored in the ziplist entry pointed to by 'sptr'.
 *
 * The entry is read with ziplistGet(): when it yields a string, the string
 * is copied into a local buffer, null terminated and parsed with strtod();
 * otherwise the integer value is converted to double. */
double zzlGetScore(unsigned char *sptr) {
    unsigned char *vstr;
    unsigned int vlen;
    long long vlong;
    char buf[128];
    double score;

    redisAssert(sptr != NULL);
    redisAssert(ziplistGet(sptr,&vstr,&vlen,&vlong));

    if (vstr) {
        /* Never copy more than the buffer can hold: one byte is reserved
         * for the terminator, so an oversized entry is truncated instead of
         * overflowing the stack buffer. */
        if (vlen > sizeof(buf)-1) vlen = sizeof(buf)-1;
        memcpy(buf,vstr,vlen);
        buf[vlen] = '\0';
        score = strtod(buf,NULL);
    } else {
        score = vlong;
    }

    return score;
}
434
/* Compare the ziplist entry at 'eptr' with the 'clen' bytes at 'cstr'.
 *
 * The common prefix is compared with memcmp(); when it is equal the result
 * is the difference between the two lengths. An entry holding an integer is
 * first converted to its string representation. */
int zzlCompareElements(unsigned char *eptr, unsigned char *cstr, unsigned int clen) {
    unsigned char *vstr;
    unsigned int vlen;
    long long vlong;
    unsigned char numbuf[32];
    int shortest, order;

    redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
    if (vstr == NULL) {
        /* Integer entry: compare against its textual form. */
        vlen = ll2string((char*)numbuf,sizeof(numbuf),vlong);
        vstr = numbuf;
    }

    if (vlen < clen)
        shortest = vlen;
    else
        shortest = clen;

    order = memcmp(vstr,cstr,shortest);
    return (order != 0) ? order : (int)(vlen-clen);
}
455
/* Number of (element,score) pairs in the ziplist: half of its entry count. */
unsigned int zzlLength(unsigned char *zl) {
    unsigned int pairs = ziplistLen(zl)/2;

    return pairs;
}
459
460 /* Move to next entry based on the values in eptr and sptr. Both are set to
461 * NULL when there is no next entry. */
462 void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
463 unsigned char *_eptr, *_sptr;
464 redisAssert(*eptr != NULL && *sptr != NULL);
465
466 _eptr = ziplistNext(zl,*sptr);
467 if (_eptr != NULL) {
468 _sptr = ziplistNext(zl,_eptr);
469 redisAssert(_sptr != NULL);
470 } else {
471 /* No next entry. */
472 _sptr = NULL;
473 }
474
475 *eptr = _eptr;
476 *sptr = _sptr;
477 }
478
479 /* Move to the previous entry based on the values in eptr and sptr. Both are
480 * set to NULL when there is no next entry. */
481 void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
482 unsigned char *_eptr, *_sptr;
483 redisAssert(*eptr != NULL && *sptr != NULL);
484
485 _sptr = ziplistPrev(zl,*eptr);
486 if (_sptr != NULL) {
487 _eptr = ziplistPrev(zl,_sptr);
488 redisAssert(_eptr != NULL);
489 } else {
490 /* No previous entry. */
491 _eptr = NULL;
492 }
493
494 *eptr = _eptr;
495 *sptr = _sptr;
496 }
497
498 /* Returns if there is a part of the zset is in range. Should only be used
499 * internally by zzlFirstInRange and zzlLastInRange. */
500 int zzlIsInRange(unsigned char *zl, zrangespec *range) {
501 unsigned char *p;
502 double score;
503
504 /* Test for ranges that will always be empty. */
505 if (range->min > range->max ||
506 (range->min == range->max && (range->minex || range->maxex)))
507 return 0;
508
509 p = ziplistIndex(zl,-1); /* Last score. */
510 redisAssert(p != NULL);
511 score = zzlGetScore(p);
512 if (!zslValueGteMin(score,range))
513 return 0;
514
515 p = ziplistIndex(zl,1); /* First score. */
516 redisAssert(p != NULL);
517 score = zzlGetScore(p);
518 if (!zslValueLteMax(score,range))
519 return 0;
520
521 return 1;
522 }
523
524 /* Find pointer to the first element contained in the specified range.
525 * Returns NULL when no element is contained in the range. */
526 unsigned char *zzlFirstInRange(unsigned char *zl, zrangespec range) {
527 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
528 double score;
529
530 /* If everything is out of range, return early. */
531 if (!zzlIsInRange(zl,&range)) return NULL;
532
533 while (eptr != NULL) {
534 sptr = ziplistNext(zl,eptr);
535 redisAssert(sptr != NULL);
536
537 score = zzlGetScore(sptr);
538 if (zslValueGteMin(score,&range)) {
539 /* Check if score <= max. */
540 if (zslValueLteMax(score,&range))
541 return eptr;
542 return NULL;
543 }
544
545 /* Move to next element. */
546 eptr = ziplistNext(zl,sptr);
547 }
548
549 return NULL;
550 }
551
552 /* Find pointer to the last element contained in the specified range.
553 * Returns NULL when no element is contained in the range. */
554 unsigned char *zzlLastInRange(unsigned char *zl, zrangespec range) {
555 unsigned char *eptr = ziplistIndex(zl,-2), *sptr;
556 double score;
557
558 /* If everything is out of range, return early. */
559 if (!zzlIsInRange(zl,&range)) return NULL;
560
561 while (eptr != NULL) {
562 sptr = ziplistNext(zl,eptr);
563 redisAssert(sptr != NULL);
564
565 score = zzlGetScore(sptr);
566 if (zslValueLteMax(score,&range)) {
567 /* Check if score >= min. */
568 if (zslValueGteMin(score,&range))
569 return eptr;
570 return NULL;
571 }
572
573 /* Move to previous element by moving to the score of previous element.
574 * When this returns NULL, we know there also is no element. */
575 sptr = ziplistPrev(zl,eptr);
576 if (sptr != NULL)
577 redisAssert((eptr = ziplistPrev(zl,sptr)) != NULL);
578 else
579 eptr = NULL;
580 }
581
582 return NULL;
583 }
584
585 unsigned char *zzlFind(unsigned char *zl, robj *ele, double *score) {
586 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
587
588 ele = getDecodedObject(ele);
589 while (eptr != NULL) {
590 sptr = ziplistNext(zl,eptr);
591 redisAssert(sptr != NULL);
592
593 if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr))) {
594 /* Matching element, pull out score. */
595 if (score != NULL) *score = zzlGetScore(sptr);
596 decrRefCount(ele);
597 return eptr;
598 }
599
600 /* Move to next element. */
601 eptr = ziplistNext(zl,sptr);
602 }
603
604 decrRefCount(ele);
605 return NULL;
606 }
607
/* Delete the (element,score) pair starting at 'eptr' and return the possibly
 * reallocated ziplist. A local cursor is used so that the pointer given as
 * argument is not modified. */
unsigned char *zzlDelete(unsigned char *zl, unsigned char *eptr) {
    unsigned char *cursor = eptr;
    int remaining;

    /* TODO: add function to ziplist API to delete N elements from offset. */
    for (remaining = 2; remaining > 0; remaining--)
        zl = ziplistDelete(zl,&cursor);
    return zl;
}
618
619 unsigned char *zzlInsertAt(unsigned char *zl, unsigned char *eptr, robj *ele, double score) {
620 unsigned char *sptr;
621 char scorebuf[128];
622 int scorelen;
623 int offset;
624
625 redisAssert(ele->encoding == REDIS_ENCODING_RAW);
626 scorelen = d2string(scorebuf,sizeof(scorebuf),score);
627 if (eptr == NULL) {
628 zl = ziplistPush(zl,ele->ptr,sdslen(ele->ptr),ZIPLIST_TAIL);
629 zl = ziplistPush(zl,(unsigned char*)scorebuf,scorelen,ZIPLIST_TAIL);
630 } else {
631 /* Keep offset relative to zl, as it might be re-allocated. */
632 offset = eptr-zl;
633 zl = ziplistInsert(zl,eptr,ele->ptr,sdslen(ele->ptr));
634 eptr = zl+offset;
635
636 /* Insert score after the element. */
637 redisAssert((sptr = ziplistNext(zl,eptr)) != NULL);
638 zl = ziplistInsert(zl,sptr,(unsigned char*)scorebuf,scorelen);
639 }
640
641 return zl;
642 }
643
644 /* Insert (element,score) pair in ziplist. This function assumes the element is
645 * not yet present in the list. */
646 unsigned char *zzlInsert(unsigned char *zl, robj *ele, double score) {
647 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
648 double s;
649
650 ele = getDecodedObject(ele);
651 while (eptr != NULL) {
652 sptr = ziplistNext(zl,eptr);
653 redisAssert(sptr != NULL);
654 s = zzlGetScore(sptr);
655
656 if (s > score) {
657 /* First element with score larger than score for element to be
658 * inserted. This means we should take its spot in the list to
659 * maintain ordering. */
660 zl = zzlInsertAt(zl,eptr,ele,score);
661 break;
662 } else if (s == score) {
663 /* Ensure lexicographical ordering for elements. */
664 if (zzlCompareElements(eptr,ele->ptr,sdslen(ele->ptr)) > 0) {
665 zl = zzlInsertAt(zl,eptr,ele,score);
666 break;
667 }
668 }
669
670 /* Move to next element. */
671 eptr = ziplistNext(zl,sptr);
672 }
673
674 /* Push on tail of list when it was not yet inserted. */
675 if (eptr == NULL)
676 zl = zzlInsertAt(zl,NULL,ele,score);
677
678 decrRefCount(ele);
679 return zl;
680 }
681
682 unsigned char *zzlDeleteRangeByScore(unsigned char *zl, zrangespec range, unsigned long *deleted) {
683 unsigned char *eptr, *sptr;
684 double score;
685 unsigned long num = 0;
686
687 if (deleted != NULL) *deleted = 0;
688
689 eptr = zzlFirstInRange(zl,range);
690 if (eptr == NULL) return zl;
691
692 /* When the tail of the ziplist is deleted, eptr will point to the sentinel
693 * byte and ziplistNext will return NULL. */
694 while ((sptr = ziplistNext(zl,eptr)) != NULL) {
695 score = zzlGetScore(sptr);
696 if (zslValueLteMax(score,&range)) {
697 /* Delete both the element and the score. */
698 zl = ziplistDelete(zl,&eptr);
699 zl = ziplistDelete(zl,&eptr);
700 num++;
701 } else {
702 /* No longer in range. */
703 break;
704 }
705 }
706
707 if (deleted != NULL) *deleted = num;
708 return zl;
709 }
710
/* Delete all the pairs with rank between 'start' and 'end' from the ziplist
 * and return the possibly reallocated list. Start and end are inclusive and
 * need to be 1-based. When 'deleted' is not NULL it receives the size of the
 * requested range. */
unsigned char *zzlDeleteRangeByRank(unsigned char *zl, unsigned int start, unsigned int end, unsigned long *deleted) {
    unsigned int first = start-1;
    unsigned int count = (end-start)+1;

    if (deleted) *deleted = count;

    /* Every pair takes two ziplist entries. */
    return ziplistDeleteRange(zl,2*first,2*count);
}
719
720 /*-----------------------------------------------------------------------------
721 * Common sorted set API
722 *----------------------------------------------------------------------------*/
723
724 unsigned int zsetLength(robj *zobj) {
725 int length = -1;
726 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
727 length = zzlLength(zobj->ptr);
728 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
729 length = ((zset*)zobj->ptr)->zsl->length;
730 } else {
731 redisPanic("Unknown sorted set encoding");
732 }
733 return length;
734 }
735
736 void zsetConvert(robj *zobj, int encoding) {
737 zset *zs;
738 zskiplistNode *node, *next;
739 robj *ele;
740 double score;
741
742 if (zobj->encoding == encoding) return;
743 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
744 unsigned char *zl = zobj->ptr;
745 unsigned char *eptr, *sptr;
746 unsigned char *vstr;
747 unsigned int vlen;
748 long long vlong;
749
750 if (encoding != REDIS_ENCODING_RAW)
751 redisPanic("Unknown target encoding");
752
753 zs = zmalloc(sizeof(*zs));
754 zs->dict = dictCreate(&zsetDictType,NULL);
755 zs->zsl = zslCreate();
756
757 eptr = ziplistIndex(zl,0);
758 redisAssert(eptr != NULL);
759 sptr = ziplistNext(zl,eptr);
760 redisAssert(sptr != NULL);
761
762 while (eptr != NULL) {
763 score = zzlGetScore(sptr);
764 redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
765 if (vstr == NULL)
766 ele = createStringObjectFromLongLong(vlong);
767 else
768 ele = createStringObject((char*)vstr,vlen);
769
770 /* Has incremented refcount since it was just created. */
771 node = zslInsert(zs->zsl,score,ele);
772 redisAssert(dictAdd(zs->dict,ele,&node->score) == DICT_OK);
773 incrRefCount(ele); /* Added to dictionary. */
774 zzlNext(zl,&eptr,&sptr);
775 }
776
777 zfree(zobj->ptr);
778 zobj->ptr = zs;
779 zobj->encoding = REDIS_ENCODING_RAW;
780 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
781 unsigned char *zl = ziplistNew();
782
783 if (encoding != REDIS_ENCODING_ZIPLIST)
784 redisPanic("Unknown target encoding");
785
786 /* Approach similar to zslFree(), since we want to free the skiplist at
787 * the same time as creating the ziplist. */
788 zs = zobj->ptr;
789 dictRelease(zs->dict);
790 node = zs->zsl->header->level[0].forward;
791 zfree(zs->zsl->header);
792 zfree(zs->zsl);
793
794 while (node) {
795 ele = getDecodedObject(node->obj);
796 zl = zzlInsertAt(zl,NULL,ele,node->score);
797 decrRefCount(ele);
798
799 next = node->level[0].forward;
800 zslFreeNode(node);
801 node = next;
802 }
803
804 zfree(zs);
805 zobj->ptr = zl;
806 zobj->encoding = REDIS_ENCODING_ZIPLIST;
807 } else {
808 redisPanic("Unknown sorted set encoding");
809 }
810 }
811
812 /*-----------------------------------------------------------------------------
813 * Sorted set commands
814 *----------------------------------------------------------------------------*/
815
816 /* This generic command implements both ZADD and ZINCRBY. */
817 void zaddGenericCommand(redisClient *c, int incr) {
818 static char *nanerr = "resulting score is not a number (NaN)";
819 robj *key = c->argv[1];
820 robj *ele;
821 robj *zobj;
822 robj *curobj;
823 double score, curscore = 0.0;
824
825 if (getDoubleFromObjectOrReply(c,c->argv[2],&score,NULL) != REDIS_OK)
826 return;
827
828 zobj = lookupKeyWrite(c->db,key);
829 if (zobj == NULL) {
830 if (server.zset_max_ziplist_entries == 0 ||
831 server.zset_max_ziplist_value < sdslen(c->argv[3]->ptr))
832 {
833 zobj = createZsetObject();
834 } else {
835 zobj = createZsetZiplistObject();
836 }
837 dbAdd(c->db,key,zobj);
838 } else {
839 if (zobj->type != REDIS_ZSET) {
840 addReply(c,shared.wrongtypeerr);
841 return;
842 }
843 }
844
845 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
846 unsigned char *eptr;
847
848 /* Prefer non-encoded element when dealing with ziplists. */
849 ele = c->argv[3];
850 if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
851 if (incr) {
852 score += curscore;
853 if (isnan(score)) {
854 addReplyError(c,nanerr);
855 /* Don't need to check if the sorted set is empty, because
856 * we know it has at least one element. */
857 return;
858 }
859 }
860
861 /* Remove and re-insert when score changed. */
862 if (score != curscore) {
863 zobj->ptr = zzlDelete(zobj->ptr,eptr);
864 zobj->ptr = zzlInsert(zobj->ptr,ele,score);
865
866 signalModifiedKey(c->db,key);
867 server.dirty++;
868 }
869
870 if (incr) /* ZINCRBY */
871 addReplyDouble(c,score);
872 else /* ZADD */
873 addReply(c,shared.czero);
874 } else {
875 /* Optimize: check if the element is too large or the list becomes
876 * too long *before* executing zzlInsert. */
877 zobj->ptr = zzlInsert(zobj->ptr,ele,score);
878 if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries)
879 zsetConvert(zobj,REDIS_ENCODING_RAW);
880 if (sdslen(ele->ptr) > server.zset_max_ziplist_value)
881 zsetConvert(zobj,REDIS_ENCODING_RAW);
882
883 signalModifiedKey(c->db,key);
884 server.dirty++;
885
886 if (incr) /* ZINCRBY */
887 addReplyDouble(c,score);
888 else /* ZADD */
889 addReply(c,shared.cone);
890 }
891 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
892 zset *zs = zobj->ptr;
893 zskiplistNode *znode;
894 dictEntry *de;
895
896 ele = c->argv[3] = tryObjectEncoding(c->argv[3]);
897 de = dictFind(zs->dict,ele);
898 if (de != NULL) {
899 curobj = dictGetEntryKey(de);
900 curscore = *(double*)dictGetEntryVal(de);
901
902 if (incr) {
903 score += curscore;
904 if (isnan(score)) {
905 addReplyError(c,nanerr);
906 /* Don't need to check if the sorted set is empty, because
907 * we know it has at least one element. */
908 return;
909 }
910 }
911
912 /* Remove and re-insert when score changed. We can safely delete
913 * the key object from the skiplist, since the dictionary still has
914 * a reference to it. */
915 if (score != curscore) {
916 redisAssert(zslDelete(zs->zsl,curscore,curobj));
917 znode = zslInsert(zs->zsl,score,curobj);
918 incrRefCount(curobj); /* Re-inserted in skiplist. */
919 dictGetEntryVal(de) = &znode->score; /* Update score ptr. */
920
921 signalModifiedKey(c->db,key);
922 server.dirty++;
923 }
924
925 if (incr) /* ZINCRBY */
926 addReplyDouble(c,score);
927 else /* ZADD */
928 addReply(c,shared.czero);
929 } else {
930 znode = zslInsert(zs->zsl,score,ele);
931 incrRefCount(ele); /* Inserted in skiplist. */
932 redisAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
933 incrRefCount(ele); /* Added to dictionary. */
934
935 signalModifiedKey(c->db,key);
936 server.dirty++;
937
938 if (incr) /* ZINCRBY */
939 addReplyDouble(c,score);
940 else /* ZADD */
941 addReply(c,shared.cone);
942 }
943 } else {
944 redisPanic("Unknown sorted set encoding");
945 }
946 }
947
948 void zaddCommand(redisClient *c) {
949 zaddGenericCommand(c,0);
950 }
951
952 void zincrbyCommand(redisClient *c) {
953 zaddGenericCommand(c,1);
954 }
955
956 void zremCommand(redisClient *c) {
957 robj *key = c->argv[1];
958 robj *ele = c->argv[2];
959 robj *zobj;
960
961 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
962 checkType(c,zobj,REDIS_ZSET)) return;
963
964 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
965 unsigned char *eptr;
966
967 if ((eptr = zzlFind(zobj->ptr,ele,NULL)) != NULL) {
968 zobj->ptr = zzlDelete(zobj->ptr,eptr);
969 if (zzlLength(zobj->ptr) == 0) dbDelete(c->db,key);
970 } else {
971 addReply(c,shared.czero);
972 return;
973 }
974 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
975 zset *zs = zobj->ptr;
976 dictEntry *de;
977 double score;
978
979 de = dictFind(zs->dict,ele);
980 if (de != NULL) {
981 /* Delete from the skiplist */
982 score = *(double*)dictGetEntryVal(de);
983 redisAssert(zslDelete(zs->zsl,score,ele));
984
985 /* Delete from the hash table */
986 dictDelete(zs->dict,ele);
987 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
988 if (dictSize(zs->dict) == 0) dbDelete(c->db,key);
989 } else {
990 addReply(c,shared.czero);
991 return;
992 }
993 } else {
994 redisPanic("Unknown sorted set encoding");
995 }
996
997 signalModifiedKey(c->db,key);
998 server.dirty++;
999 addReply(c,shared.cone);
1000 }
1001
1002 void zremrangebyscoreCommand(redisClient *c) {
1003 robj *key = c->argv[1];
1004 robj *zobj;
1005 zrangespec range;
1006 unsigned long deleted;
1007
1008 /* Parse the range arguments. */
1009 if (zslParseRange(c->argv[2],c->argv[3],&range) != REDIS_OK) {
1010 addReplyError(c,"min or max is not a double");
1011 return;
1012 }
1013
1014 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
1015 checkType(c,zobj,REDIS_ZSET)) return;
1016
1017 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1018 zobj->ptr = zzlDeleteRangeByScore(zobj->ptr,range,&deleted);
1019 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
1020 zset *zs = zobj->ptr;
1021 deleted = zslDeleteRangeByScore(zs->zsl,range,zs->dict);
1022 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
1023 if (dictSize(zs->dict) == 0) dbDelete(c->db,key);
1024 } else {
1025 redisPanic("Unknown sorted set encoding");
1026 }
1027
1028 if (deleted) signalModifiedKey(c->db,key);
1029 server.dirty += deleted;
1030 addReplyLongLong(c,deleted);
1031 }
1032
1033 void zremrangebyrankCommand(redisClient *c) {
1034 robj *key = c->argv[1];
1035 robj *zobj;
1036 long start;
1037 long end;
1038 int llen;
1039 unsigned long deleted;
1040
1041 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
1042 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
1043
1044 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
1045 checkType(c,zobj,REDIS_ZSET)) return;
1046
1047 /* Sanitize indexes. */
1048 llen = zsetLength(zobj);
1049 if (start < 0) start = llen+start;
1050 if (end < 0) end = llen+end;
1051 if (start < 0) start = 0;
1052
1053 /* Invariant: start >= 0, so this test will be true when end < 0.
1054 * The range is empty when start > end or start >= length. */
1055 if (start > end || start >= llen) {
1056 addReply(c,shared.czero);
1057 return;
1058 }
1059 if (end >= llen) end = llen-1;
1060
1061 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1062 /* Correct for 1-based rank. */
1063 zobj->ptr = zzlDeleteRangeByRank(zobj->ptr,start+1,end+1,&deleted);
1064 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
1065 zset *zs = zobj->ptr;
1066
1067 /* Correct for 1-based rank. */
1068 deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);
1069 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
1070 if (dictSize(zs->dict) == 0) dbDelete(c->db,key);
1071 } else {
1072 redisPanic("Unknown sorted set encoding");
1073 }
1074
1075 if (deleted) signalModifiedKey(c->db,key);
1076 server.dirty += deleted;
1077 addReplyLongLong(c,deleted);
1078 }
1079
1080 typedef struct {
1081 robj *subject;
1082 int type; /* Set, sorted set */
1083 int encoding;
1084 double weight;
1085
1086 union {
1087 /* Set iterators. */
1088 union _iterset {
1089 struct {
1090 intset *is;
1091 int ii;
1092 } is;
1093 struct {
1094 dict *dict;
1095 dictIterator *di;
1096 dictEntry *de;
1097 } ht;
1098 } set;
1099
1100 /* Sorted set iterators. */
1101 union _iterzset {
1102 struct {
1103 unsigned char *zl;
1104 unsigned char *eptr, *sptr;
1105 } zl;
1106 struct {
1107 zset *zs;
1108 zskiplistNode *node;
1109 } sl;
1110 } zset;
1111 } iter;
1112 } zsetopsrc;
1113
1114
1115 /* Use dirty flags for pointers that need to be cleaned up in the next
1116 * iteration over the zsetopval. The dirty flag for the long long value is
1117 * special, since long long values don't need cleanup. Instead, it means that
1118 * we already checked that "ell" holds a long long, or tried to convert another
1119 * representation into a long long value. When this was successful,
1120 * OPVAL_VALID_LL is set as well. */
1121 #define OPVAL_DIRTY_ROBJ 1
1122 #define OPVAL_DIRTY_LL 2
1123 #define OPVAL_VALID_LL 4
1124
1125 /* Store value retrieved from the iterator. */
1126 typedef struct {
1127 int flags;
1128 unsigned char _buf[32]; /* Private buffer. */
1129 robj *ele;
1130 unsigned char *estr;
1131 unsigned int elen;
1132 long long ell;
1133 double score;
1134 } zsetopval;
1135
1136 typedef union _iterset iterset;
1137 typedef union _iterzset iterzset;
1138
1139 void zuiInitIterator(zsetopsrc *op) {
1140 if (op->subject == NULL)
1141 return;
1142
1143 if (op->type == REDIS_SET) {
1144 iterset *it = &op->iter.set;
1145 if (op->encoding == REDIS_ENCODING_INTSET) {
1146 it->is.is = op->subject->ptr;
1147 it->is.ii = 0;
1148 } else if (op->encoding == REDIS_ENCODING_HT) {
1149 it->ht.dict = op->subject->ptr;
1150 it->ht.di = dictGetIterator(op->subject->ptr);
1151 it->ht.de = dictNext(it->ht.di);
1152 } else {
1153 redisPanic("Unknown set encoding");
1154 }
1155 } else if (op->type == REDIS_ZSET) {
1156 iterzset *it = &op->iter.zset;
1157 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1158 it->zl.zl = op->subject->ptr;
1159 it->zl.eptr = ziplistIndex(it->zl.zl,0);
1160 if (it->zl.eptr != NULL) {
1161 it->zl.sptr = ziplistNext(it->zl.zl,it->zl.eptr);
1162 redisAssert(it->zl.sptr != NULL);
1163 }
1164 } else if (op->encoding == REDIS_ENCODING_RAW) {
1165 it->sl.zs = op->subject->ptr;
1166 it->sl.node = it->sl.zs->zsl->header->level[0].forward;
1167 } else {
1168 redisPanic("Unknown sorted set encoding");
1169 }
1170 } else {
1171 redisPanic("Unsupported type");
1172 }
1173 }
1174
1175 void zuiClearIterator(zsetopsrc *op) {
1176 if (op->subject == NULL)
1177 return;
1178
1179 if (op->type == REDIS_SET) {
1180 iterset *it = &op->iter.set;
1181 if (op->encoding == REDIS_ENCODING_INTSET) {
1182 REDIS_NOTUSED(it); /* skip */
1183 } else if (op->encoding == REDIS_ENCODING_HT) {
1184 dictReleaseIterator(it->ht.di);
1185 } else {
1186 redisPanic("Unknown set encoding");
1187 }
1188 } else if (op->type == REDIS_ZSET) {
1189 iterzset *it = &op->iter.zset;
1190 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1191 REDIS_NOTUSED(it); /* skip */
1192 } else if (op->encoding == REDIS_ENCODING_RAW) {
1193 REDIS_NOTUSED(it); /* skip */
1194 } else {
1195 redisPanic("Unknown sorted set encoding");
1196 }
1197 } else {
1198 redisPanic("Unsupported type");
1199 }
1200 }
1201
1202 int zuiLength(zsetopsrc *op) {
1203 if (op->subject == NULL)
1204 return 0;
1205
1206 if (op->type == REDIS_SET) {
1207 iterset *it = &op->iter.set;
1208 if (op->encoding == REDIS_ENCODING_INTSET) {
1209 return intsetLen(it->is.is);
1210 } else if (op->encoding == REDIS_ENCODING_HT) {
1211 return dictSize(it->ht.dict);
1212 } else {
1213 redisPanic("Unknown set encoding");
1214 }
1215 } else if (op->type == REDIS_ZSET) {
1216 iterzset *it = &op->iter.zset;
1217 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1218 return zzlLength(it->zl.zl);
1219 } else if (op->encoding == REDIS_ENCODING_RAW) {
1220 return it->sl.zs->zsl->length;
1221 } else {
1222 redisPanic("Unknown sorted set encoding");
1223 }
1224 } else {
1225 redisPanic("Unsupported type");
1226 }
1227 }
1228
1229 /* Check if the current value is valid. If so, store it in the passed structure
1230 * and move to the next element. If not valid, this means we have reached the
1231 * end of the structure and can abort. */
1232 int zuiNext(zsetopsrc *op, zsetopval *val) {
1233 if (op->subject == NULL)
1234 return 0;
1235
1236 if (val->flags & OPVAL_DIRTY_ROBJ)
1237 decrRefCount(val->ele);
1238
1239 bzero(val,sizeof(zsetopval));
1240
1241 if (op->type == REDIS_SET) {
1242 iterset *it = &op->iter.set;
1243 if (op->encoding == REDIS_ENCODING_INTSET) {
1244 if (!intsetGet(it->is.is,it->is.ii,&val->ell))
1245 return 0;
1246 val->score = 1.0;
1247
1248 /* Move to next element. */
1249 it->is.ii++;
1250 } else if (op->encoding == REDIS_ENCODING_HT) {
1251 if (it->ht.de == NULL)
1252 return 0;
1253 val->ele = dictGetEntryKey(it->ht.de);
1254 val->score = 1.0;
1255
1256 /* Move to next element. */
1257 it->ht.de = dictNext(it->ht.di);
1258 } else {
1259 redisPanic("Unknown set encoding");
1260 }
1261 } else if (op->type == REDIS_ZSET) {
1262 iterzset *it = &op->iter.zset;
1263 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1264 /* No need to check both, but better be explicit. */
1265 if (it->zl.eptr == NULL || it->zl.sptr == NULL)
1266 return 0;
1267 redisAssert(ziplistGet(it->zl.eptr,&val->estr,&val->elen,&val->ell));
1268 val->score = zzlGetScore(it->zl.sptr);
1269
1270 /* Move to next element. */
1271 zzlNext(it->zl.zl,&it->zl.eptr,&it->zl.sptr);
1272 } else if (op->encoding == REDIS_ENCODING_RAW) {
1273 if (it->sl.node == NULL)
1274 return 0;
1275 val->ele = it->sl.node->obj;
1276 val->score = it->sl.node->score;
1277
1278 /* Move to next element. */
1279 it->sl.node = it->sl.node->level[0].forward;
1280 } else {
1281 redisPanic("Unknown sorted set encoding");
1282 }
1283 } else {
1284 redisPanic("Unsupported type");
1285 }
1286 return 1;
1287 }
1288
1289 int zuiLongLongFromValue(zsetopval *val) {
1290 if (!(val->flags & OPVAL_DIRTY_LL)) {
1291 val->flags |= OPVAL_DIRTY_LL;
1292
1293 if (val->ele != NULL) {
1294 if (val->ele->encoding == REDIS_ENCODING_INT) {
1295 val->ell = (long)val->ele->ptr;
1296 val->flags |= OPVAL_VALID_LL;
1297 } else if (val->ele->encoding == REDIS_ENCODING_RAW) {
1298 if (string2ll(val->ele->ptr,sdslen(val->ele->ptr),&val->ell))
1299 val->flags |= OPVAL_VALID_LL;
1300 } else {
1301 redisPanic("Unsupported element encoding");
1302 }
1303 } else if (val->estr != NULL) {
1304 if (string2ll((char*)val->estr,val->elen,&val->ell))
1305 val->flags |= OPVAL_VALID_LL;
1306 } else {
1307 /* The long long was already set, flag as valid. */
1308 val->flags |= OPVAL_VALID_LL;
1309 }
1310 }
1311 return val->flags & OPVAL_VALID_LL;
1312 }
1313
1314 robj *zuiObjectFromValue(zsetopval *val) {
1315 if (val->ele == NULL) {
1316 if (val->estr != NULL) {
1317 val->ele = createStringObject((char*)val->estr,val->elen);
1318 } else {
1319 val->ele = createStringObjectFromLongLong(val->ell);
1320 }
1321 val->flags |= OPVAL_DIRTY_ROBJ;
1322 }
1323 return val->ele;
1324 }
1325
1326 int zuiBufferFromValue(zsetopval *val) {
1327 if (val->estr == NULL) {
1328 if (val->ele != NULL) {
1329 if (val->ele->encoding == REDIS_ENCODING_INT) {
1330 val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),(long)val->ele->ptr);
1331 val->estr = val->_buf;
1332 } else if (val->ele->encoding == REDIS_ENCODING_RAW) {
1333 val->elen = sdslen(val->ele->ptr);
1334 val->estr = val->ele->ptr;
1335 } else {
1336 redisPanic("Unsupported element encoding");
1337 }
1338 } else {
1339 val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),val->ell);
1340 val->estr = val->_buf;
1341 }
1342 }
1343 return 1;
1344 }
1345
1346 /* Find value pointed to by val in the source pointer to by op. When found,
1347 * return 1 and store its score in target. Return 0 otherwise. */
1348 int zuiFind(zsetopsrc *op, zsetopval *val, double *score) {
1349 if (op->subject == NULL)
1350 return 0;
1351
1352 if (op->type == REDIS_SET) {
1353 iterset *it = &op->iter.set;
1354
1355 if (op->encoding == REDIS_ENCODING_INTSET) {
1356 if (zuiLongLongFromValue(val) && intsetFind(it->is.is,val->ell)) {
1357 *score = 1.0;
1358 return 1;
1359 } else {
1360 return 0;
1361 }
1362 } else if (op->encoding == REDIS_ENCODING_HT) {
1363 zuiObjectFromValue(val);
1364 if (dictFind(it->ht.dict,val->ele) != NULL) {
1365 *score = 1.0;
1366 return 1;
1367 } else {
1368 return 0;
1369 }
1370 } else {
1371 redisPanic("Unknown set encoding");
1372 }
1373 } else if (op->type == REDIS_ZSET) {
1374 iterzset *it = &op->iter.zset;
1375 zuiObjectFromValue(val);
1376
1377 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1378 if (zzlFind(it->zl.zl,val->ele,score) != NULL) {
1379 /* Score is already set by zzlFind. */
1380 return 1;
1381 } else {
1382 return 0;
1383 }
1384 } else if (op->encoding == REDIS_ENCODING_RAW) {
1385 dictEntry *de;
1386 if ((de = dictFind(it->sl.zs->dict,val->ele)) != NULL) {
1387 *score = *(double*)dictGetEntryVal(de);
1388 return 1;
1389 } else {
1390 return 0;
1391 }
1392 } else {
1393 redisPanic("Unknown sorted set encoding");
1394 }
1395 } else {
1396 redisPanic("Unsupported type");
1397 }
1398 }
1399
1400 int zuiCompareByCardinality(const void *s1, const void *s2) {
1401 return zuiLength((zsetopsrc*)s1) - zuiLength((zsetopsrc*)s2);
1402 }
1403
/* Ways of combining the scores of an element present in several inputs. */
#define REDIS_AGGR_SUM 1
#define REDIS_AGGR_MIN 2
#define REDIS_AGGR_MAX 3
/* Score of a dict entry: entries without a value count as 1.0. */
#define zunionInterDictValue(_e) (dictGetEntryVal(_e) == NULL ? 1.0 : *(double*)dictGetEntryVal(_e))

/* Fold "val" into "*target" according to "aggregate" (one of the
 * REDIS_AGGR_* constants). Any other aggregate value panics. */
static inline void zunionInterAggregate(double *target, double val, int aggregate) {
    switch (aggregate) {
    case REDIS_AGGR_SUM:
        *target += val;
        /* Adding +inf and -inf yields NaN; by convention the result of
         * such a sum is 0.0. */
        if (isnan(*target)) *target = 0.0;
        break;
    case REDIS_AGGR_MIN:
        if (val < *target) *target = val;
        break;
    case REDIS_AGGR_MAX:
        if (val > *target) *target = val;
        break;
    default:
        /* safety net */
        redisPanic("Unknown ZUNION/INTER aggregate type");
    }
}
1425
1426 void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
1427 int i, j, setnum;
1428 int aggregate = REDIS_AGGR_SUM;
1429 zsetopsrc *src;
1430 zsetopval zval;
1431 robj *tmp;
1432 unsigned int maxelelen = 0;
1433 robj *dstobj;
1434 zset *dstzset;
1435 zskiplistNode *znode;
1436 int touched = 0;
1437
1438 /* expect setnum input keys to be given */
1439 setnum = atoi(c->argv[2]->ptr);
1440 if (setnum < 1) {
1441 addReplyError(c,
1442 "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE");
1443 return;
1444 }
1445
1446 /* test if the expected number of keys would overflow */
1447 if (3+setnum > c->argc) {
1448 addReply(c,shared.syntaxerr);
1449 return;
1450 }
1451
1452 /* read keys to be used for input */
1453 src = zcalloc(sizeof(zsetopsrc) * setnum);
1454 for (i = 0, j = 3; i < setnum; i++, j++) {
1455 robj *obj = lookupKeyWrite(c->db,c->argv[j]);
1456 if (obj != NULL) {
1457 if (obj->type != REDIS_ZSET && obj->type != REDIS_SET) {
1458 zfree(src);
1459 addReply(c,shared.wrongtypeerr);
1460 return;
1461 }
1462
1463 src[i].subject = obj;
1464 src[i].type = obj->type;
1465 src[i].encoding = obj->encoding;
1466 } else {
1467 src[i].subject = NULL;
1468 }
1469
1470 /* Default all weights to 1. */
1471 src[i].weight = 1.0;
1472 }
1473
1474 /* parse optional extra arguments */
1475 if (j < c->argc) {
1476 int remaining = c->argc - j;
1477
1478 while (remaining) {
1479 if (remaining >= (setnum + 1) && !strcasecmp(c->argv[j]->ptr,"weights")) {
1480 j++; remaining--;
1481 for (i = 0; i < setnum; i++, j++, remaining--) {
1482 if (getDoubleFromObjectOrReply(c,c->argv[j],&src[i].weight,
1483 "weight value is not a double") != REDIS_OK)
1484 {
1485 zfree(src);
1486 return;
1487 }
1488 }
1489 } else if (remaining >= 2 && !strcasecmp(c->argv[j]->ptr,"aggregate")) {
1490 j++; remaining--;
1491 if (!strcasecmp(c->argv[j]->ptr,"sum")) {
1492 aggregate = REDIS_AGGR_SUM;
1493 } else if (!strcasecmp(c->argv[j]->ptr,"min")) {
1494 aggregate = REDIS_AGGR_MIN;
1495 } else if (!strcasecmp(c->argv[j]->ptr,"max")) {
1496 aggregate = REDIS_AGGR_MAX;
1497 } else {
1498 zfree(src);
1499 addReply(c,shared.syntaxerr);
1500 return;
1501 }
1502 j++; remaining--;
1503 } else {
1504 zfree(src);
1505 addReply(c,shared.syntaxerr);
1506 return;
1507 }
1508 }
1509 }
1510
1511 for (i = 0; i < setnum; i++)
1512 zuiInitIterator(&src[i]);
1513
1514 /* sort sets from the smallest to largest, this will improve our
1515 * algorithm's performance */
1516 qsort(src,setnum,sizeof(zsetopsrc),zuiCompareByCardinality);
1517
1518 dstobj = createZsetObject();
1519 dstzset = dstobj->ptr;
1520
1521 if (op == REDIS_OP_INTER) {
1522 /* Skip everything if the smallest input is empty. */
1523 if (zuiLength(&src[0]) > 0) {
1524 /* Precondition: as src[0] is non-empty and the inputs are ordered
1525 * by size, all src[i > 0] are non-empty too. */
1526 while (zuiNext(&src[0],&zval)) {
1527 double score, value;
1528
1529 score = src[0].weight * zval.score;
1530 for (j = 1; j < setnum; j++) {
1531 if (zuiFind(&src[j],&zval,&value)) {
1532 value *= src[j].weight;
1533 zunionInterAggregate(&score,value,aggregate);
1534 } else {
1535 break;
1536 }
1537 }
1538
1539 /* Only continue when present in every input. */
1540 if (j == setnum) {
1541 tmp = zuiObjectFromValue(&zval);
1542 znode = zslInsert(dstzset->zsl,score,tmp);
1543 incrRefCount(tmp); /* added to skiplist */
1544 dictAdd(dstzset->dict,tmp,&znode->score);
1545 incrRefCount(tmp); /* added to dictionary */
1546
1547 if (tmp->encoding == REDIS_ENCODING_RAW)
1548 if (sdslen(tmp->ptr) > maxelelen)
1549 maxelelen = sdslen(tmp->ptr);
1550 }
1551 }
1552 }
1553 } else if (op == REDIS_OP_UNION) {
1554 for (i = 0; i < setnum; i++) {
1555 if (zuiLength(&src[0]) == 0)
1556 continue;
1557
1558 while (zuiNext(&src[i],&zval)) {
1559 double score, value;
1560
1561 /* Skip key when already processed */
1562 if (dictFind(dstzset->dict,zuiObjectFromValue(&zval)) != NULL)
1563 continue;
1564
1565 /* Initialize score */
1566 score = src[i].weight * zval.score;
1567
1568 /* Because the inputs are sorted by size, it's only possible
1569 * for sets at larger indices to hold this element. */
1570 for (j = (i+1); j < setnum; j++) {
1571 if (zuiFind(&src[j],&zval,&value)) {
1572 value *= src[j].weight;
1573 zunionInterAggregate(&score,value,aggregate);
1574 }
1575 }
1576
1577 tmp = zuiObjectFromValue(&zval);
1578 znode = zslInsert(dstzset->zsl,score,tmp);
1579 incrRefCount(zval.ele); /* added to skiplist */
1580 dictAdd(dstzset->dict,tmp,&znode->score);
1581 incrRefCount(zval.ele); /* added to dictionary */
1582
1583 if (tmp->encoding == REDIS_ENCODING_RAW)
1584 if (sdslen(tmp->ptr) > maxelelen)
1585 maxelelen = sdslen(tmp->ptr);
1586 }
1587 }
1588 } else {
1589 redisPanic("Unknown operator");
1590 }
1591
1592 for (i = 0; i < setnum; i++)
1593 zuiClearIterator(&src[i]);
1594
1595 if (dbDelete(c->db,dstkey)) {
1596 signalModifiedKey(c->db,dstkey);
1597 touched = 1;
1598 server.dirty++;
1599 }
1600 if (dstzset->zsl->length) {
1601 /* Convert to ziplist when in limits. */
1602 if (dstzset->zsl->length <= server.zset_max_ziplist_entries &&
1603 maxelelen <= server.zset_max_ziplist_value)
1604 zsetConvert(dstobj,REDIS_ENCODING_ZIPLIST);
1605
1606 dbAdd(c->db,dstkey,dstobj);
1607 addReplyLongLong(c,zsetLength(dstobj));
1608 if (!touched) signalModifiedKey(c->db,dstkey);
1609 server.dirty++;
1610 } else {
1611 decrRefCount(dstobj);
1612 addReply(c,shared.czero);
1613 }
1614 zfree(src);
1615 }
1616
1617 void zunionstoreCommand(redisClient *c) {
1618 zunionInterGenericCommand(c,c->argv[1], REDIS_OP_UNION);
1619 }
1620
1621 void zinterstoreCommand(redisClient *c) {
1622 zunionInterGenericCommand(c,c->argv[1], REDIS_OP_INTER);
1623 }
1624
1625 void zrangeGenericCommand(redisClient *c, int reverse) {
1626 robj *key = c->argv[1];
1627 robj *zobj;
1628 int withscores = 0;
1629 long start;
1630 long end;
1631 int llen;
1632 int rangelen;
1633
1634 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
1635 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
1636
1637 if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
1638 withscores = 1;
1639 } else if (c->argc >= 5) {
1640 addReply(c,shared.syntaxerr);
1641 return;
1642 }
1643
1644 if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL
1645 || checkType(c,zobj,REDIS_ZSET)) return;
1646
1647 /* Sanitize indexes. */
1648 llen = zsetLength(zobj);
1649 if (start < 0) start = llen+start;
1650 if (end < 0) end = llen+end;
1651 if (start < 0) start = 0;
1652
1653 /* Invariant: start >= 0, so this test will be true when end < 0.
1654 * The range is empty when start > end or start >= length. */
1655 if (start > end || start >= llen) {
1656 addReply(c,shared.emptymultibulk);
1657 return;
1658 }
1659 if (end >= llen) end = llen-1;
1660 rangelen = (end-start)+1;
1661
1662 /* Return the result in form of a multi-bulk reply */
1663 addReplyMultiBulkLen(c, withscores ? (rangelen*2) : rangelen);
1664
1665 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1666 unsigned char *zl = zobj->ptr;
1667 unsigned char *eptr, *sptr;
1668 unsigned char *vstr;
1669 unsigned int vlen;
1670 long long vlong;
1671
1672 if (reverse)
1673 eptr = ziplistIndex(zl,-2-(2*start));
1674 else
1675 eptr = ziplistIndex(zl,2*start);
1676
1677 redisAssert(eptr != NULL);
1678 sptr = ziplistNext(zl,eptr);
1679
1680 while (rangelen--) {
1681 redisAssert(eptr != NULL && sptr != NULL);
1682 redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
1683 if (vstr == NULL)
1684 addReplyBulkLongLong(c,vlong);
1685 else
1686 addReplyBulkCBuffer(c,vstr,vlen);
1687
1688 if (withscores)
1689 addReplyDouble(c,zzlGetScore(sptr));
1690
1691 if (reverse)
1692 zzlPrev(zl,&eptr,&sptr);
1693 else
1694 zzlNext(zl,&eptr,&sptr);
1695 }
1696
1697 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
1698 zset *zs = zobj->ptr;
1699 zskiplist *zsl = zs->zsl;
1700 zskiplistNode *ln;
1701 robj *ele;
1702
1703 /* Check if starting point is trivial, before doing log(N) lookup. */
1704 if (reverse) {
1705 ln = zsl->tail;
1706 if (start > 0)
1707 ln = zslGetElementByRank(zsl,llen-start);
1708 } else {
1709 ln = zsl->header->level[0].forward;
1710 if (start > 0)
1711 ln = zslGetElementByRank(zsl,start+1);
1712 }
1713
1714 while(rangelen--) {
1715 redisAssert(ln != NULL);
1716 ele = ln->obj;
1717 addReplyBulk(c,ele);
1718 if (withscores)
1719 addReplyDouble(c,ln->score);
1720 ln = reverse ? ln->backward : ln->level[0].forward;
1721 }
1722 } else {
1723 redisPanic("Unknown sorted set encoding");
1724 }
1725 }
1726
1727 void zrangeCommand(redisClient *c) {
1728 zrangeGenericCommand(c,0);
1729 }
1730
1731 void zrevrangeCommand(redisClient *c) {
1732 zrangeGenericCommand(c,1);
1733 }
1734
1735 /* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE and ZCOUNT.
1736 * If "justcount", only the number of elements in the range is returned. */
1737 void genericZrangebyscoreCommand(redisClient *c, int reverse, int justcount) {
1738 zrangespec range;
1739 robj *key = c->argv[1];
1740 robj *emptyreply, *zobj;
1741 int offset = 0, limit = -1;
1742 int withscores = 0;
1743 unsigned long rangelen = 0;
1744 void *replylen = NULL;
1745 int minidx, maxidx;
1746
1747 /* Parse the range arguments. */
1748 if (reverse) {
1749 /* Range is given as [max,min] */
1750 maxidx = 2; minidx = 3;
1751 } else {
1752 /* Range is given as [min,max] */
1753 minidx = 2; maxidx = 3;
1754 }
1755
1756 if (zslParseRange(c->argv[minidx],c->argv[maxidx],&range) != REDIS_OK) {
1757 addReplyError(c,"min or max is not a double");
1758 return;
1759 }
1760
1761 /* Parse optional extra arguments. Note that ZCOUNT will exactly have
1762 * 4 arguments, so we'll never enter the following code path. */
1763 if (c->argc > 4) {
1764 int remaining = c->argc - 4;
1765 int pos = 4;
1766
1767 while (remaining) {
1768 if (remaining >= 1 && !strcasecmp(c->argv[pos]->ptr,"withscores")) {
1769 pos++; remaining--;
1770 withscores = 1;
1771 } else if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr,"limit")) {
1772 offset = atoi(c->argv[pos+1]->ptr);
1773 limit = atoi(c->argv[pos+2]->ptr);
1774 pos += 3; remaining -= 3;
1775 } else {
1776 addReply(c,shared.syntaxerr);
1777 return;
1778 }
1779 }
1780 }
1781
1782 /* Ok, lookup the key and get the range */
1783 emptyreply = justcount ? shared.czero : shared.emptymultibulk;
1784 if ((zobj = lookupKeyReadOrReply(c,key,emptyreply)) == NULL ||
1785 checkType(c,zobj,REDIS_ZSET)) return;
1786
1787 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1788 unsigned char *zl = zobj->ptr;
1789 unsigned char *eptr, *sptr;
1790 unsigned char *vstr;
1791 unsigned int vlen;
1792 long long vlong;
1793 double score;
1794
1795 /* If reversed, get the last node in range as starting point. */
1796 if (reverse)
1797 eptr = zzlLastInRange(zl,range);
1798 else
1799 eptr = zzlFirstInRange(zl,range);
1800
1801 /* No "first" element in the specified interval. */
1802 if (eptr == NULL) {
1803 addReply(c,emptyreply);
1804 return;
1805 }
1806
1807 /* Get score pointer for the first element. */
1808 redisAssert(eptr != NULL);
1809 sptr = ziplistNext(zl,eptr);
1810
1811 /* We don't know in advance how many matching elements there are in the
1812 * list, so we push this object that will represent the multi-bulk
1813 * length in the output buffer, and will "fix" it later */
1814 if (!justcount)
1815 replylen = addDeferredMultiBulkLength(c);
1816
1817 /* If there is an offset, just traverse the number of elements without
1818 * checking the score because that is done in the next loop. */
1819 while (eptr && offset--)
1820 if (reverse)
1821 zzlPrev(zl,&eptr,&sptr);
1822 else
1823 zzlNext(zl,&eptr,&sptr);
1824
1825 while (eptr && limit--) {
1826 score = zzlGetScore(sptr);
1827
1828 /* Abort when the node is no longer in range. */
1829 if (reverse) {
1830 if (!zslValueGteMin(score,&range)) break;
1831 } else {
1832 if (!zslValueLteMax(score,&range)) break;
1833 }
1834
1835 /* Do our magic */
1836 rangelen++;
1837 if (!justcount) {
1838 redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
1839 if (vstr == NULL)
1840 addReplyBulkLongLong(c,vlong);
1841 else
1842 addReplyBulkCBuffer(c,vstr,vlen);
1843
1844 if (withscores)
1845 addReplyDouble(c,score);
1846 }
1847
1848 /* Move to next node */
1849 if (reverse)
1850 zzlPrev(zl,&eptr,&sptr);
1851 else
1852 zzlNext(zl,&eptr,&sptr);
1853 }
1854 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
1855 zset *zs = zobj->ptr;
1856 zskiplist *zsl = zs->zsl;
1857 zskiplistNode *ln;
1858
1859 /* If reversed, get the last node in range as starting point. */
1860 if (reverse)
1861 ln = zslLastInRange(zsl,range);
1862 else
1863 ln = zslFirstInRange(zsl,range);
1864
1865 /* No "first" element in the specified interval. */
1866 if (ln == NULL) {
1867 addReply(c,emptyreply);
1868 return;
1869 }
1870
1871 /* We don't know in advance how many matching elements there are in the
1872 * list, so we push this object that will represent the multi-bulk
1873 * length in the output buffer, and will "fix" it later */
1874 if (!justcount)
1875 replylen = addDeferredMultiBulkLength(c);
1876
1877 /* If there is an offset, just traverse the number of elements without
1878 * checking the score because that is done in the next loop. */
1879 while (ln && offset--)
1880 ln = reverse ? ln->backward : ln->level[0].forward;
1881
1882 while (ln && limit--) {
1883 /* Abort when the node is no longer in range. */
1884 if (reverse) {
1885 if (!zslValueGteMin(ln->score,&range)) break;
1886 } else {
1887 if (!zslValueLteMax(ln->score,&range)) break;
1888 }
1889
1890 /* Do our magic */
1891 rangelen++;
1892 if (!justcount) {
1893 addReplyBulk(c,ln->obj);
1894 if (withscores)
1895 addReplyDouble(c,ln->score);
1896 }
1897
1898 /* Move to next node */
1899 ln = reverse ? ln->backward : ln->level[0].forward;
1900 }
1901 } else {
1902 redisPanic("Unknown sorted set encoding");
1903 }
1904
1905 if (justcount) {
1906 addReplyLongLong(c,(long)rangelen);
1907 } else {
1908 if (withscores) rangelen *= 2;
1909 setDeferredMultiBulkLength(c,replylen,rangelen);
1910 }
1911 }
1912
1913 void zrangebyscoreCommand(redisClient *c) {
1914 genericZrangebyscoreCommand(c,0,0);
1915 }
1916
1917 void zrevrangebyscoreCommand(redisClient *c) {
1918 genericZrangebyscoreCommand(c,1,0);
1919 }
1920
1921 void zcountCommand(redisClient *c) {
1922 genericZrangebyscoreCommand(c,0,1);
1923 }
1924
1925 void zcardCommand(redisClient *c) {
1926 robj *key = c->argv[1];
1927 robj *zobj;
1928
1929 if ((zobj = lookupKeyReadOrReply(c,key,shared.czero)) == NULL ||
1930 checkType(c,zobj,REDIS_ZSET)) return;
1931
1932 addReplyLongLong(c,zsetLength(zobj));
1933 }
1934
1935 void zscoreCommand(redisClient *c) {
1936 robj *key = c->argv[1];
1937 robj *zobj;
1938 double score;
1939
1940 if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
1941 checkType(c,zobj,REDIS_ZSET)) return;
1942
1943 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1944 if (zzlFind(zobj->ptr,c->argv[2],&score) != NULL)
1945 addReplyDouble(c,score);
1946 else
1947 addReply(c,shared.nullbulk);
1948 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
1949 zset *zs = zobj->ptr;
1950 dictEntry *de;
1951
1952 c->argv[2] = tryObjectEncoding(c->argv[2]);
1953 de = dictFind(zs->dict,c->argv[2]);
1954 if (de != NULL) {
1955 score = *(double*)dictGetEntryVal(de);
1956 addReplyDouble(c,score);
1957 } else {
1958 addReply(c,shared.nullbulk);
1959 }
1960 } else {
1961 redisPanic("Unknown sorted set encoding");
1962 }
1963 }
1964
1965 void zrankGenericCommand(redisClient *c, int reverse) {
1966 robj *key = c->argv[1];
1967 robj *ele = c->argv[2];
1968 robj *zobj;
1969 unsigned long llen;
1970 unsigned long rank;
1971
1972 if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
1973 checkType(c,zobj,REDIS_ZSET)) return;
1974 llen = zsetLength(zobj);
1975
1976 redisAssert(ele->encoding == REDIS_ENCODING_RAW);
1977 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1978 unsigned char *zl = zobj->ptr;
1979 unsigned char *eptr, *sptr;
1980
1981 eptr = ziplistIndex(zl,0);
1982 redisAssert(eptr != NULL);
1983 sptr = ziplistNext(zl,eptr);
1984 redisAssert(sptr != NULL);
1985
1986 rank = 1;
1987 while(eptr != NULL) {
1988 if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr)))
1989 break;
1990 rank++;
1991 zzlNext(zl,&eptr,&sptr);
1992 }
1993
1994 if (eptr != NULL) {
1995 if (reverse)
1996 addReplyLongLong(c,llen-rank);
1997 else
1998 addReplyLongLong(c,rank-1);
1999 } else {
2000 addReply(c,shared.nullbulk);
2001 }
2002 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
2003 zset *zs = zobj->ptr;
2004 zskiplist *zsl = zs->zsl;
2005 dictEntry *de;
2006 double score;
2007
2008 ele = c->argv[2] = tryObjectEncoding(c->argv[2]);
2009 de = dictFind(zs->dict,ele);
2010 if (de != NULL) {
2011 score = *(double*)dictGetEntryVal(de);
2012 rank = zslGetRank(zsl,score,ele);
2013 redisAssert(rank); /* Existing elements always have a rank. */
2014 if (reverse)
2015 addReplyLongLong(c,llen-rank);
2016 else
2017 addReplyLongLong(c,rank-1);
2018 } else {
2019 addReply(c,shared.nullbulk);
2020 }
2021 } else {
2022 redisPanic("Unknown sorted set encoding");
2023 }
2024 }
2025
2026 void zrankCommand(redisClient *c) {
2027 zrankGenericCommand(c, 0);
2028 }
2029
2030 void zrevrankCommand(redisClient *c) {
2031 zrankGenericCommand(c, 1);
2032 }