]> git.saurik.com Git - redis.git/blob - src/t_zset.c
1ef5e9911fc628cf4223441eefd9ffafa4d63000
[redis.git] / src / t_zset.c
1 #include "redis.h"
2
3 #include <math.h>
4
5 /*-----------------------------------------------------------------------------
6 * Sorted set API
7 *----------------------------------------------------------------------------*/
8
9 /* ZSETs are ordered sets using two data structures to hold the same elements
10 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
11 * data structure.
12 *
13 * The elements are added to an hash table mapping Redis objects to scores.
14 * At the same time the elements are added to a skip list mapping scores
15 * to Redis objects (so objects are sorted by scores in this "view"). */
16
17 /* This skiplist implementation is almost a C translation of the original
18 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
19 * Alternative to Balanced Trees", modified in three ways:
20 * a) this implementation allows for repeated values.
21 * b) the comparison is not just by key (our 'score') but by satellite data.
22 * c) there is a back pointer, so it's a doubly linked list with the back
23 * pointers being only at "level 1". This allows to traverse the list
24 * from tail to head, useful for ZREVRANGE. */
25
26 zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
27 zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
28 zn->score = score;
29 zn->obj = obj;
30 return zn;
31 }
32
33 zskiplist *zslCreate(void) {
34 int j;
35 zskiplist *zsl;
36
37 zsl = zmalloc(sizeof(*zsl));
38 zsl->level = 1;
39 zsl->length = 0;
40 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
41 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
42 zsl->header->level[j].forward = NULL;
43 zsl->header->level[j].span = 0;
44 }
45 zsl->header->backward = NULL;
46 zsl->tail = NULL;
47 return zsl;
48 }
49
50 void zslFreeNode(zskiplistNode *node) {
51 decrRefCount(node->obj);
52 zfree(node);
53 }
54
55 void zslFree(zskiplist *zsl) {
56 zskiplistNode *node = zsl->header->level[0].forward, *next;
57
58 zfree(zsl->header);
59 while(node) {
60 next = node->level[0].forward;
61 zslFreeNode(node);
62 node = next;
63 }
64 zfree(zsl);
65 }
66
67 int zslRandomLevel(void) {
68 int level = 1;
69 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
70 level += 1;
71 return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
72 }
73
74 zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
75 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
76 unsigned int rank[ZSKIPLIST_MAXLEVEL];
77 int i, level;
78
79 x = zsl->header;
80 for (i = zsl->level-1; i >= 0; i--) {
81 /* store rank that is crossed to reach the insert position */
82 rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
83 while (x->level[i].forward &&
84 (x->level[i].forward->score < score ||
85 (x->level[i].forward->score == score &&
86 compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
87 rank[i] += x->level[i].span;
88 x = x->level[i].forward;
89 }
90 update[i] = x;
91 }
92 /* we assume the key is not already inside, since we allow duplicated
93 * scores, and the re-insertion of score and redis object should never
94 * happpen since the caller of zslInsert() should test in the hash table
95 * if the element is already inside or not. */
96 level = zslRandomLevel();
97 if (level > zsl->level) {
98 for (i = zsl->level; i < level; i++) {
99 rank[i] = 0;
100 update[i] = zsl->header;
101 update[i]->level[i].span = zsl->length;
102 }
103 zsl->level = level;
104 }
105 x = zslCreateNode(level,score,obj);
106 for (i = 0; i < level; i++) {
107 x->level[i].forward = update[i]->level[i].forward;
108 update[i]->level[i].forward = x;
109
110 /* update span covered by update[i] as x is inserted here */
111 x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
112 update[i]->level[i].span = (rank[0] - rank[i]) + 1;
113 }
114
115 /* increment span for untouched levels */
116 for (i = level; i < zsl->level; i++) {
117 update[i]->level[i].span++;
118 }
119
120 x->backward = (update[0] == zsl->header) ? NULL : update[0];
121 if (x->level[0].forward)
122 x->level[0].forward->backward = x;
123 else
124 zsl->tail = x;
125 zsl->length++;
126 return x;
127 }
128
129 /* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
130 void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
131 int i;
132 for (i = 0; i < zsl->level; i++) {
133 if (update[i]->level[i].forward == x) {
134 update[i]->level[i].span += x->level[i].span - 1;
135 update[i]->level[i].forward = x->level[i].forward;
136 } else {
137 update[i]->level[i].span -= 1;
138 }
139 }
140 if (x->level[0].forward) {
141 x->level[0].forward->backward = x->backward;
142 } else {
143 zsl->tail = x->backward;
144 }
145 while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
146 zsl->level--;
147 zsl->length--;
148 }
149
150 /* Delete an element with matching score/object from the skiplist. */
151 int zslDelete(zskiplist *zsl, double score, robj *obj) {
152 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
153 int i;
154
155 x = zsl->header;
156 for (i = zsl->level-1; i >= 0; i--) {
157 while (x->level[i].forward &&
158 (x->level[i].forward->score < score ||
159 (x->level[i].forward->score == score &&
160 compareStringObjects(x->level[i].forward->obj,obj) < 0)))
161 x = x->level[i].forward;
162 update[i] = x;
163 }
164 /* We may have multiple elements with the same score, what we need
165 * is to find the element with both the right score and object. */
166 x = x->level[0].forward;
167 if (x && score == x->score && equalStringObjects(x->obj,obj)) {
168 zslDeleteNode(zsl, x, update);
169 zslFreeNode(x);
170 return 1;
171 } else {
172 return 0; /* not found */
173 }
174 return 0; /* not found */
175 }
176
177 /* Struct to hold a inclusive/exclusive range spec. */
178 typedef struct {
179 double min, max;
180 int minex, maxex; /* are min or max exclusive? */
181 } zrangespec;
182
183 static int zslValueGteMin(double value, zrangespec *spec) {
184 return spec->minex ? (value > spec->min) : (value >= spec->min);
185 }
186
187 static int zslValueLteMax(double value, zrangespec *spec) {
188 return spec->maxex ? (value < spec->max) : (value <= spec->max);
189 }
190
191 static int zslValueInRange(double value, zrangespec *spec) {
192 return zslValueGteMin(value,spec) && zslValueLteMax(value,spec);
193 }
194
195 /* Returns if there is a part of the zset is in range. */
196 int zslIsInRange(zskiplist *zsl, zrangespec *range) {
197 zskiplistNode *x;
198
199 /* Test for ranges that will always be empty. */
200 if (range->min > range->max ||
201 (range->min == range->max && (range->minex || range->maxex)))
202 return 0;
203 x = zsl->tail;
204 if (x == NULL || !zslValueGteMin(x->score,range))
205 return 0;
206 x = zsl->header->level[0].forward;
207 if (x == NULL || !zslValueLteMax(x->score,range))
208 return 0;
209 return 1;
210 }
211
212 /* Find the first node that is contained in the specified range.
213 * Returns NULL when no element is contained in the range. */
214 zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec range) {
215 zskiplistNode *x;
216 int i;
217
218 /* If everything is out of range, return early. */
219 if (!zslIsInRange(zsl,&range)) return NULL;
220
221 x = zsl->header;
222 for (i = zsl->level-1; i >= 0; i--) {
223 /* Go forward while *OUT* of range. */
224 while (x->level[i].forward &&
225 !zslValueGteMin(x->level[i].forward->score,&range))
226 x = x->level[i].forward;
227 }
228
229 /* The tail is in range, so the previous block should always return a
230 * node that is non-NULL and the last one to be out of range. */
231 x = x->level[0].forward;
232 redisAssert(x != NULL && zslValueInRange(x->score,&range));
233 return x;
234 }
235
236 /* Find the last node that is contained in the specified range.
237 * Returns NULL when no element is contained in the range. */
238 zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec range) {
239 zskiplistNode *x;
240 int i;
241
242 /* If everything is out of range, return early. */
243 if (!zslIsInRange(zsl,&range)) return NULL;
244
245 x = zsl->header;
246 for (i = zsl->level-1; i >= 0; i--) {
247 /* Go forward while *IN* range. */
248 while (x->level[i].forward &&
249 zslValueLteMax(x->level[i].forward->score,&range))
250 x = x->level[i].forward;
251 }
252
253 /* The header is in range, so the previous block should always return a
254 * node that is non-NULL and in range. */
255 redisAssert(x != NULL && zslValueInRange(x->score,&range));
256 return x;
257 }
258
259 /* Delete all the elements with score between min and max from the skiplist.
260 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
261 * Note that this function takes the reference to the hash table view of the
262 * sorted set, in order to remove the elements from the hash table too. */
263 unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec range, dict *dict) {
264 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
265 unsigned long removed = 0;
266 int i;
267
268 x = zsl->header;
269 for (i = zsl->level-1; i >= 0; i--) {
270 while (x->level[i].forward && (range.minex ?
271 x->level[i].forward->score <= range.min :
272 x->level[i].forward->score < range.min))
273 x = x->level[i].forward;
274 update[i] = x;
275 }
276
277 /* Current node is the last with score < or <= min. */
278 x = x->level[0].forward;
279
280 /* Delete nodes while in range. */
281 while (x && (range.maxex ? x->score < range.max : x->score <= range.max)) {
282 zskiplistNode *next = x->level[0].forward;
283 zslDeleteNode(zsl,x,update);
284 dictDelete(dict,x->obj);
285 zslFreeNode(x);
286 removed++;
287 x = next;
288 }
289 return removed;
290 }
291
292 /* Delete all the elements with rank between start and end from the skiplist.
293 * Start and end are inclusive. Note that start and end need to be 1-based */
294 unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict) {
295 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
296 unsigned long traversed = 0, removed = 0;
297 int i;
298
299 x = zsl->header;
300 for (i = zsl->level-1; i >= 0; i--) {
301 while (x->level[i].forward && (traversed + x->level[i].span) < start) {
302 traversed += x->level[i].span;
303 x = x->level[i].forward;
304 }
305 update[i] = x;
306 }
307
308 traversed++;
309 x = x->level[0].forward;
310 while (x && traversed <= end) {
311 zskiplistNode *next = x->level[0].forward;
312 zslDeleteNode(zsl,x,update);
313 dictDelete(dict,x->obj);
314 zslFreeNode(x);
315 removed++;
316 traversed++;
317 x = next;
318 }
319 return removed;
320 }
321
322 /* Find the rank for an element by both score and key.
323 * Returns 0 when the element cannot be found, rank otherwise.
324 * Note that the rank is 1-based due to the span of zsl->header to the
325 * first element. */
326 unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) {
327 zskiplistNode *x;
328 unsigned long rank = 0;
329 int i;
330
331 x = zsl->header;
332 for (i = zsl->level-1; i >= 0; i--) {
333 while (x->level[i].forward &&
334 (x->level[i].forward->score < score ||
335 (x->level[i].forward->score == score &&
336 compareStringObjects(x->level[i].forward->obj,o) <= 0))) {
337 rank += x->level[i].span;
338 x = x->level[i].forward;
339 }
340
341 /* x might be equal to zsl->header, so test if obj is non-NULL */
342 if (x->obj && equalStringObjects(x->obj,o)) {
343 return rank;
344 }
345 }
346 return 0;
347 }
348
349 /* Finds an element by its rank. The rank argument needs to be 1-based. */
350 zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
351 zskiplistNode *x;
352 unsigned long traversed = 0;
353 int i;
354
355 x = zsl->header;
356 for (i = zsl->level-1; i >= 0; i--) {
357 while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
358 {
359 traversed += x->level[i].span;
360 x = x->level[i].forward;
361 }
362 if (traversed == rank) {
363 return x;
364 }
365 }
366 return NULL;
367 }
368
369 /* Populate the rangespec according to the objects min and max. */
370 static int zslParseRange(robj *min, robj *max, zrangespec *spec) {
371 char *eptr;
372 spec->minex = spec->maxex = 0;
373
374 /* Parse the min-max interval. If one of the values is prefixed
375 * by the "(" character, it's considered "open". For instance
376 * ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max
377 * ZRANGEBYSCORE zset 1.5 2.5 will instead match min <= x <= max */
378 if (min->encoding == REDIS_ENCODING_INT) {
379 spec->min = (long)min->ptr;
380 } else {
381 if (((char*)min->ptr)[0] == '(') {
382 spec->min = strtod((char*)min->ptr+1,&eptr);
383 if (eptr[0] != '\0' || isnan(spec->min)) return REDIS_ERR;
384 spec->minex = 1;
385 } else {
386 spec->min = strtod((char*)min->ptr,&eptr);
387 if (eptr[0] != '\0' || isnan(spec->min)) return REDIS_ERR;
388 }
389 }
390 if (max->encoding == REDIS_ENCODING_INT) {
391 spec->max = (long)max->ptr;
392 } else {
393 if (((char*)max->ptr)[0] == '(') {
394 spec->max = strtod((char*)max->ptr+1,&eptr);
395 if (eptr[0] != '\0' || isnan(spec->max)) return REDIS_ERR;
396 spec->maxex = 1;
397 } else {
398 spec->max = strtod((char*)max->ptr,&eptr);
399 if (eptr[0] != '\0' || isnan(spec->max)) return REDIS_ERR;
400 }
401 }
402
403 return REDIS_OK;
404 }
405
406 /*-----------------------------------------------------------------------------
407 * Ziplist-backed sorted set API
408 *----------------------------------------------------------------------------*/
409
410 double zzlGetScore(unsigned char *sptr) {
411 unsigned char *vstr;
412 unsigned int vlen;
413 long long vlong;
414 char buf[128];
415 double score;
416
417 redisAssert(sptr != NULL);
418 redisAssert(ziplistGet(sptr,&vstr,&vlen,&vlong));
419
420 if (vstr) {
421 memcpy(buf,vstr,vlen);
422 buf[vlen] = '\0';
423 score = strtod(buf,NULL);
424 } else {
425 score = vlong;
426 }
427
428 return score;
429 }
430
431 /* Compare element in sorted set with given element. */
432 int zzlCompareElements(unsigned char *eptr, unsigned char *cstr, unsigned int clen) {
433 unsigned char *vstr;
434 unsigned int vlen;
435 long long vlong;
436 unsigned char vbuf[32];
437 int minlen, cmp;
438
439 redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
440 if (vstr == NULL) {
441 /* Store string representation of long long in buf. */
442 vlen = ll2string((char*)vbuf,sizeof(vbuf),vlong);
443 vstr = vbuf;
444 }
445
446 minlen = (vlen < clen) ? vlen : clen;
447 cmp = memcmp(vstr,cstr,minlen);
448 if (cmp == 0) return vlen-clen;
449 return cmp;
450 }
451
452 unsigned int zzlLength(unsigned char *zl) {
453 return ziplistLen(zl)/2;
454 }
455
456 /* Move to next entry based on the values in eptr and sptr. Both are set to
457 * NULL when there is no next entry. */
458 void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
459 unsigned char *_eptr, *_sptr;
460 redisAssert(*eptr != NULL && *sptr != NULL);
461
462 _eptr = ziplistNext(zl,*sptr);
463 if (_eptr != NULL) {
464 _sptr = ziplistNext(zl,_eptr);
465 redisAssert(_sptr != NULL);
466 } else {
467 /* No next entry. */
468 _sptr = NULL;
469 }
470
471 *eptr = _eptr;
472 *sptr = _sptr;
473 }
474
475 /* Move to the previous entry based on the values in eptr and sptr. Both are
476 * set to NULL when there is no next entry. */
477 void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
478 unsigned char *_eptr, *_sptr;
479 redisAssert(*eptr != NULL && *sptr != NULL);
480
481 _sptr = ziplistPrev(zl,*eptr);
482 if (_sptr != NULL) {
483 _eptr = ziplistPrev(zl,_sptr);
484 redisAssert(_eptr != NULL);
485 } else {
486 /* No previous entry. */
487 _eptr = NULL;
488 }
489
490 *eptr = _eptr;
491 *sptr = _sptr;
492 }
493
494 /* Returns if there is a part of the zset is in range. Should only be used
495 * internally by zzlFirstInRange and zzlLastInRange. */
496 int zzlIsInRange(unsigned char *zl, zrangespec *range) {
497 unsigned char *p;
498 double score;
499
500 /* Test for ranges that will always be empty. */
501 if (range->min > range->max ||
502 (range->min == range->max && (range->minex || range->maxex)))
503 return 0;
504
505 p = ziplistIndex(zl,-1); /* Last score. */
506 redisAssert(p != NULL);
507 score = zzlGetScore(p);
508 if (!zslValueGteMin(score,range))
509 return 0;
510
511 p = ziplistIndex(zl,1); /* First score. */
512 redisAssert(p != NULL);
513 score = zzlGetScore(p);
514 if (!zslValueLteMax(score,range))
515 return 0;
516
517 return 1;
518 }
519
520 /* Find pointer to the first element contained in the specified range.
521 * Returns NULL when no element is contained in the range. */
522 unsigned char *zzlFirstInRange(robj *zobj, zrangespec range) {
523 unsigned char *zl = zobj->ptr;
524 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
525 double score;
526
527 /* If everything is out of range, return early. */
528 if (!zzlIsInRange(zl,&range)) return NULL;
529
530 while (eptr != NULL) {
531 sptr = ziplistNext(zl,eptr);
532 redisAssert(sptr != NULL);
533
534 score = zzlGetScore(sptr);
535 if (zslValueGteMin(score,&range))
536 return eptr;
537
538 /* Move to next element. */
539 eptr = ziplistNext(zl,sptr);
540 }
541
542 return NULL;
543 }
544
545 /* Find pointer to the last element contained in the specified range.
546 * Returns NULL when no element is contained in the range. */
547 unsigned char *zzlLastInRange(robj *zobj, zrangespec range) {
548 unsigned char *zl = zobj->ptr;
549 unsigned char *eptr = ziplistIndex(zl,-2), *sptr;
550 double score;
551
552 /* If everything is out of range, return early. */
553 if (!zzlIsInRange(zl,&range)) return NULL;
554
555 while (eptr != NULL) {
556 sptr = ziplistNext(zl,eptr);
557 redisAssert(sptr != NULL);
558
559 score = zzlGetScore(sptr);
560 if (zslValueLteMax(score,&range))
561 return eptr;
562
563 /* Move to previous element by moving to the score of previous element.
564 * When this returns NULL, we know there also is no element. */
565 sptr = ziplistPrev(zl,eptr);
566 if (sptr != NULL)
567 redisAssert((eptr = ziplistPrev(zl,sptr)) != NULL);
568 else
569 eptr = NULL;
570 }
571
572 return NULL;
573 }
574
575 unsigned char *zzlFind(robj *zobj, robj *ele, double *score) {
576 unsigned char *zl = zobj->ptr;
577 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
578
579 ele = getDecodedObject(ele);
580 while (eptr != NULL) {
581 sptr = ziplistNext(zl,eptr);
582 redisAssert(sptr != NULL);
583
584 if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr))) {
585 /* Matching element, pull out score. */
586 if (score != NULL) *score = zzlGetScore(sptr);
587 decrRefCount(ele);
588 return eptr;
589 }
590
591 /* Move to next element. */
592 eptr = ziplistNext(zl,sptr);
593 }
594
595 decrRefCount(ele);
596 return NULL;
597 }
598
599 /* Delete (element,score) pair from ziplist. Use local copy of eptr because we
600 * don't want to modify the one given as argument. */
601 int zzlDelete(robj *zobj, unsigned char *eptr) {
602 unsigned char *zl = zobj->ptr;
603 unsigned char *p = eptr;
604
605 /* TODO: add function to ziplist API to delete N elements from offset. */
606 zl = ziplistDelete(zl,&p);
607 zl = ziplistDelete(zl,&p);
608 zobj->ptr = zl;
609 return REDIS_OK;
610 }
611
612 int zzlInsertAt(robj *zobj, robj *ele, double score, unsigned char *eptr) {
613 unsigned char *zl = zobj->ptr;
614 unsigned char *sptr;
615 char scorebuf[128];
616 int scorelen;
617 int offset;
618
619 redisAssert(ele->encoding == REDIS_ENCODING_RAW);
620 scorelen = d2string(scorebuf,sizeof(scorebuf),score);
621 if (eptr == NULL) {
622 zl = ziplistPush(zl,ele->ptr,sdslen(ele->ptr),ZIPLIST_TAIL);
623 zl = ziplistPush(zl,(unsigned char*)scorebuf,scorelen,ZIPLIST_TAIL);
624 } else {
625 /* Keep offset relative to zl, as it might be re-allocated. */
626 offset = eptr-zl;
627 zl = ziplistInsert(zl,eptr,ele->ptr,sdslen(ele->ptr));
628 eptr = zl+offset;
629
630 /* Insert score after the element. */
631 redisAssert((sptr = ziplistNext(zl,eptr)) != NULL);
632 zl = ziplistInsert(zl,sptr,(unsigned char*)scorebuf,scorelen);
633 }
634
635 zobj->ptr = zl;
636 return REDIS_OK;
637 }
638
639 /* Insert (element,score) pair in ziplist. This function assumes the element is
640 * not yet present in the list. */
641 int zzlInsert(robj *zobj, robj *ele, double score) {
642 unsigned char *zl = zobj->ptr;
643 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
644 double s;
645
646 ele = getDecodedObject(ele);
647 while (eptr != NULL) {
648 sptr = ziplistNext(zl,eptr);
649 redisAssert(sptr != NULL);
650 s = zzlGetScore(sptr);
651
652 if (s > score) {
653 /* First element with score larger than score for element to be
654 * inserted. This means we should take its spot in the list to
655 * maintain ordering. */
656 zzlInsertAt(zobj,ele,score,eptr);
657 break;
658 } else if (s == score) {
659 /* Ensure lexicographical ordering for elements. */
660 if (zzlCompareElements(eptr,ele->ptr,sdslen(ele->ptr)) > 0) {
661 zzlInsertAt(zobj,ele,score,eptr);
662 break;
663 }
664 }
665
666 /* Move to next element. */
667 eptr = ziplistNext(zl,sptr);
668 }
669
670 /* Push on tail of list when it was not yet inserted. */
671 if (eptr == NULL)
672 zzlInsertAt(zobj,ele,score,NULL);
673
674 decrRefCount(ele);
675 return REDIS_OK;
676 }
677
678 unsigned long zzlDeleteRangeByScore(robj *zobj, zrangespec range) {
679 unsigned char *zl = zobj->ptr;
680 unsigned char *eptr, *sptr;
681 double score;
682 unsigned long deleted = 0;
683
684 eptr = zzlFirstInRange(zobj,range);
685 if (eptr == NULL) return deleted;
686
687
688 /* When the tail of the ziplist is deleted, eptr will point to the sentinel
689 * byte and ziplistNext will return NULL. */
690 while ((sptr = ziplistNext(zl,eptr)) != NULL) {
691 score = zzlGetScore(sptr);
692 if (zslValueLteMax(score,&range)) {
693 /* Delete both the element and the score. */
694 zl = ziplistDelete(zl,&eptr);
695 zl = ziplistDelete(zl,&eptr);
696 deleted++;
697 } else {
698 /* No longer in range. */
699 break;
700 }
701 }
702
703 return deleted;
704 }
705
706 /* Delete all the elements with rank between start and end from the skiplist.
707 * Start and end are inclusive. Note that start and end need to be 1-based */
708 unsigned long zzlDeleteRangeByRank(robj *zobj, unsigned int start, unsigned int end) {
709 unsigned int num = (end-start)+1;
710 zobj->ptr = ziplistDeleteRange(zobj->ptr,2*(start-1),2*num);
711 return num;
712 }
713
714 /*-----------------------------------------------------------------------------
715 * Common sorted set API
716 *----------------------------------------------------------------------------*/
717
718 int zsLength(robj *zobj) {
719 int length = -1;
720 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
721 length = zzlLength(zobj->ptr);
722 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
723 length = ((zset*)zobj->ptr)->zsl->length;
724 } else {
725 redisPanic("Unknown sorted set encoding");
726 }
727 return length;
728 }
729
730 void zsConvert(robj *zobj, int encoding) {
731 zset *zs;
732 zskiplistNode *node, *next;
733 robj *ele;
734 double score;
735
736 if (zobj->encoding == encoding) return;
737 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
738 unsigned char *zl = zobj->ptr;
739 unsigned char *eptr, *sptr;
740 unsigned char *vstr;
741 unsigned int vlen;
742 long long vlong;
743
744 if (encoding != REDIS_ENCODING_RAW)
745 redisPanic("Unknown target encoding");
746
747 zs = zmalloc(sizeof(*zs));
748 zs->dict = dictCreate(&zsetDictType,NULL);
749 zs->zsl = zslCreate();
750
751 eptr = ziplistIndex(zl,0);
752 redisAssert(eptr != NULL);
753 sptr = ziplistNext(zl,eptr);
754 redisAssert(sptr != NULL);
755
756 while (eptr != NULL) {
757 score = zzlGetScore(sptr);
758 redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
759 if (vstr == NULL)
760 ele = createStringObjectFromLongLong(vlong);
761 else
762 ele = createStringObject((char*)vstr,vlen);
763
764 /* Has incremented refcount since it was just created. */
765 node = zslInsert(zs->zsl,score,ele);
766 redisAssert(dictAdd(zs->dict,ele,&node->score) == DICT_OK);
767 incrRefCount(ele); /* Added to dictionary. */
768 zzlNext(zl,&eptr,&sptr);
769 }
770
771 zfree(zobj->ptr);
772 zobj->ptr = zs;
773 zobj->encoding = REDIS_ENCODING_RAW;
774 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
775 unsigned char *zl = ziplistNew();
776
777 if (encoding != REDIS_ENCODING_ZIPLIST)
778 redisPanic("Unknown target encoding");
779
780 /* Approach similar to zslFree(), since we want to free the skiplist at
781 * the same time as creating the ziplist. */
782 zs = zobj->ptr;
783 dictRelease(zs->dict);
784 node = zs->zsl->header->level[0].forward;
785 zfree(zs->zsl->header);
786 zfree(zs->zsl);
787
788 /* Immediately store pointer to ziplist in object because it will
789 * change because of reallocations when pushing to the ziplist. */
790 zobj->ptr = zl;
791
792 while (node) {
793 ele = getDecodedObject(node->obj);
794 redisAssert(zzlInsertAt(zobj,ele,node->score,NULL) == REDIS_OK);
795 decrRefCount(ele);
796
797 next = node->level[0].forward;
798 zslFreeNode(node);
799 node = next;
800 }
801
802 zfree(zs);
803 zobj->encoding = REDIS_ENCODING_ZIPLIST;
804 } else {
805 redisPanic("Unknown sorted set encoding");
806 }
807 }
808
809 /*-----------------------------------------------------------------------------
810 * Sorted set commands
811 *----------------------------------------------------------------------------*/
812
813 // if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
814 // } else if (zobj->encoding == REDIS_ENCODING_RAW) {
815 // } else {
816 // redisPanic("Unknown sorted set encoding");
817 // }
818
819 /* This generic command implements both ZADD and ZINCRBY. */
820 void zaddGenericCommand(redisClient *c, int incr) {
821 static char *nanerr = "resulting score is not a number (NaN)";
822 robj *key = c->argv[1];
823 robj *ele;
824 robj *zobj;
825 robj *curobj;
826 double score, curscore = 0.0;
827
828 if (getDoubleFromObjectOrReply(c,c->argv[2],&score,NULL) != REDIS_OK)
829 return;
830
831 zobj = lookupKeyWrite(c->db,key);
832 if (zobj == NULL) {
833 if (server.zset_max_ziplist_entries == 0 ||
834 server.zset_max_ziplist_value < sdslen(c->argv[3]->ptr))
835 {
836 zobj = createZsetObject();
837 } else {
838 zobj = createZsetZiplistObject();
839 }
840 dbAdd(c->db,key,zobj);
841 } else {
842 if (zobj->type != REDIS_ZSET) {
843 addReply(c,shared.wrongtypeerr);
844 return;
845 }
846 }
847
848 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
849 unsigned char *eptr;
850
851 /* Prefer non-encoded element when dealing with ziplists. */
852 ele = c->argv[3];
853 if ((eptr = zzlFind(zobj,ele,&curscore)) != NULL) {
854 if (incr) {
855 score += curscore;
856 if (isnan(score)) {
857 addReplyError(c,nanerr);
858 /* Don't need to check if the sorted set is empty, because
859 * we know it has at least one element. */
860 return;
861 }
862 }
863
864 /* Remove and re-insert when score changed. */
865 if (score != curscore) {
866 redisAssert(zzlDelete(zobj,eptr) == REDIS_OK);
867 redisAssert(zzlInsert(zobj,ele,score) == REDIS_OK);
868
869 signalModifiedKey(c->db,key);
870 server.dirty++;
871 }
872
873 if (incr) /* ZINCRBY */
874 addReplyDouble(c,score);
875 else /* ZADD */
876 addReply(c,shared.czero);
877 } else {
878 /* Optimize: check if the element is too large or the list becomes
879 * too long *before* executing zzlInsert. */
880 redisAssert(zzlInsert(zobj,ele,score) == REDIS_OK);
881 if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries)
882 zsConvert(zobj,REDIS_ENCODING_RAW);
883 if (sdslen(ele->ptr) > server.zset_max_ziplist_value)
884 zsConvert(zobj,REDIS_ENCODING_RAW);
885
886 signalModifiedKey(c->db,key);
887 server.dirty++;
888
889 if (incr) /* ZINCRBY */
890 addReplyDouble(c,score);
891 else /* ZADD */
892 addReply(c,shared.cone);
893 }
894 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
895 zset *zs = zobj->ptr;
896 zskiplistNode *znode;
897 dictEntry *de;
898
899 ele = c->argv[3] = tryObjectEncoding(c->argv[3]);
900 de = dictFind(zs->dict,ele);
901 if (de != NULL) {
902 curobj = dictGetEntryKey(de);
903 curscore = *(double*)dictGetEntryVal(de);
904
905 if (incr) {
906 score += curscore;
907 if (isnan(score)) {
908 addReplyError(c,nanerr);
909 /* Don't need to check if the sorted set is empty, because
910 * we know it has at least one element. */
911 return;
912 }
913 }
914
915 /* Remove and re-insert when score changed. We can safely delete
916 * the key object from the skiplist, since the dictionary still has
917 * a reference to it. */
918 if (score != curscore) {
919 redisAssert(zslDelete(zs->zsl,curscore,curobj));
920 znode = zslInsert(zs->zsl,score,curobj);
921 incrRefCount(curobj); /* Re-inserted in skiplist. */
922 dictGetEntryVal(de) = &znode->score; /* Update score ptr. */
923
924 signalModifiedKey(c->db,key);
925 server.dirty++;
926 }
927
928 if (incr) /* ZINCRBY */
929 addReplyDouble(c,score);
930 else /* ZADD */
931 addReply(c,shared.czero);
932 } else {
933 znode = zslInsert(zs->zsl,score,ele);
934 incrRefCount(ele); /* Inserted in skiplist. */
935 redisAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
936 incrRefCount(ele); /* Added to dictionary. */
937
938 signalModifiedKey(c->db,key);
939 server.dirty++;
940
941 if (incr) /* ZINCRBY */
942 addReplyDouble(c,score);
943 else /* ZADD */
944 addReply(c,shared.cone);
945 }
946 } else {
947 redisPanic("Unknown sorted set encoding");
948 }
949 }
950
951 void zaddCommand(redisClient *c) {
952 zaddGenericCommand(c,0);
953 }
954
955 void zincrbyCommand(redisClient *c) {
956 zaddGenericCommand(c,1);
957 }
958
959 void zremCommand(redisClient *c) {
960 robj *key = c->argv[1];
961 robj *ele = c->argv[2];
962 robj *zobj;
963
964 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
965 checkType(c,zobj,REDIS_ZSET)) return;
966
967 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
968 unsigned char *eptr;
969
970 if ((eptr = zzlFind(zobj,ele,NULL)) != NULL) {
971 redisAssert(zzlDelete(zobj,eptr) == REDIS_OK);
972 if (zzlLength(zobj->ptr) == 0) dbDelete(c->db,key);
973 } else {
974 addReply(c,shared.czero);
975 return;
976 }
977 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
978 zset *zs = zobj->ptr;
979 dictEntry *de;
980 double score;
981
982 de = dictFind(zs->dict,ele);
983 if (de != NULL) {
984 /* Delete from the skiplist */
985 score = *(double*)dictGetEntryVal(de);
986 redisAssert(zslDelete(zs->zsl,score,ele));
987
988 /* Delete from the hash table */
989 dictDelete(zs->dict,ele);
990 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
991 if (dictSize(zs->dict) == 0) dbDelete(c->db,key);
992 } else {
993 addReply(c,shared.czero);
994 return;
995 }
996 } else {
997 redisPanic("Unknown sorted set encoding");
998 }
999
1000 signalModifiedKey(c->db,key);
1001 server.dirty++;
1002 addReply(c,shared.cone);
1003 }
1004
1005 void zremrangebyscoreCommand(redisClient *c) {
1006 robj *key = c->argv[1];
1007 robj *zobj;
1008 zrangespec range;
1009 unsigned long deleted;
1010
1011 /* Parse the range arguments. */
1012 if (zslParseRange(c->argv[2],c->argv[3],&range) != REDIS_OK) {
1013 addReplyError(c,"min or max is not a double");
1014 return;
1015 }
1016
1017 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
1018 checkType(c,zobj,REDIS_ZSET)) return;
1019
1020 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1021 deleted = zzlDeleteRangeByScore(zobj,range);
1022 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
1023 zset *zs = zobj->ptr;
1024 deleted = zslDeleteRangeByScore(zs->zsl,range,zs->dict);
1025 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
1026 if (dictSize(zs->dict) == 0) dbDelete(c->db,key);
1027 } else {
1028 redisPanic("Unknown sorted set encoding");
1029 }
1030
1031 if (deleted) signalModifiedKey(c->db,key);
1032 server.dirty += deleted;
1033 addReplyLongLong(c,deleted);
1034 }
1035
1036 void zremrangebyrankCommand(redisClient *c) {
1037 robj *key = c->argv[1];
1038 robj *zobj;
1039 long start;
1040 long end;
1041 int llen;
1042 unsigned long deleted;
1043
1044 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
1045 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
1046
1047 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
1048 checkType(c,zobj,REDIS_ZSET)) return;
1049
1050 /* Sanitize indexes. */
1051 llen = zsLength(zobj);
1052 if (start < 0) start = llen+start;
1053 if (end < 0) end = llen+end;
1054 if (start < 0) start = 0;
1055
1056 /* Invariant: start >= 0, so this test will be true when end < 0.
1057 * The range is empty when start > end or start >= length. */
1058 if (start > end || start >= llen) {
1059 addReply(c,shared.czero);
1060 return;
1061 }
1062 if (end >= llen) end = llen-1;
1063
1064 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1065 /* Correct for 1-based rank. */
1066 deleted = zzlDeleteRangeByRank(zobj,start+1,end+1);
1067 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
1068 zset *zs = zobj->ptr;
1069
1070 /* Correct for 1-based rank. */
1071 deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);
1072 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
1073 if (dictSize(zs->dict) == 0) dbDelete(c->db,key);
1074 } else {
1075 redisPanic("Unknown sorted set encoding");
1076 }
1077
1078 if (deleted) signalModifiedKey(c->db,key);
1079 server.dirty += deleted;
1080 addReplyLongLong(c,deleted);
1081 }
1082
1083 typedef struct {
1084 robj *subject;
1085 int type; /* Set, sorted set */
1086 int encoding;
1087 double weight;
1088
1089 union {
1090 /* Set iterators. */
1091 union _iterset {
1092 struct {
1093 intset *is;
1094 int ii;
1095 } is;
1096 struct {
1097 dict *dict;
1098 dictIterator *di;
1099 dictEntry *de;
1100 } ht;
1101 } set;
1102
1103 /* Sorted set iterators. */
1104 union _iterzset {
1105 struct {
1106 unsigned char *zl;
1107 unsigned char *eptr, *sptr;
1108 } zl;
1109 struct {
1110 zset *zs;
1111 zskiplistNode *node;
1112 } sl;
1113 } zset;
1114 } iter;
1115 } zsetopsrc;
1116
1117
1118 /* Use dirty flags for pointers that need to be cleaned up in the next
1119 * iteration over the zsetopval. The dirty flag for the long long value is
1120 * special, since long long values don't need cleanup. Instead, it means that
1121 * we already checked that "ell" holds a long long, or tried to convert another
1122 * representation into a long long value. When this was successful,
1123 * OPVAL_VALID_LL is set as well. */
1124 #define OPVAL_DIRTY_ROBJ 1
1125 #define OPVAL_DIRTY_LL 2
1126 #define OPVAL_VALID_LL 4
1127
1128 /* Store value retrieved from the iterator. */
1129 typedef struct {
1130 int flags;
1131 unsigned char _buf[32]; /* Private buffer. */
1132 robj *ele;
1133 unsigned char *estr;
1134 unsigned int elen;
1135 long long ell;
1136 double score;
1137 } zsetopval;
1138
1139 typedef union _iterset iterset;
1140 typedef union _iterzset iterzset;
1141
1142 void zuiInitIterator(zsetopsrc *op) {
1143 if (op->subject == NULL)
1144 return;
1145
1146 if (op->type == REDIS_SET) {
1147 iterset *it = &op->iter.set;
1148 if (op->encoding == REDIS_ENCODING_INTSET) {
1149 it->is.is = op->subject->ptr;
1150 it->is.ii = 0;
1151 } else if (op->encoding == REDIS_ENCODING_HT) {
1152 it->ht.dict = op->subject->ptr;
1153 it->ht.di = dictGetIterator(op->subject->ptr);
1154 it->ht.de = dictNext(it->ht.di);
1155 } else {
1156 redisPanic("Unknown set encoding");
1157 }
1158 } else if (op->type == REDIS_ZSET) {
1159 iterzset *it = &op->iter.zset;
1160 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1161 it->zl.zl = op->subject->ptr;
1162 it->zl.eptr = ziplistIndex(it->zl.zl,0);
1163 if (it->zl.eptr != NULL) {
1164 it->zl.sptr = ziplistNext(it->zl.zl,it->zl.eptr);
1165 redisAssert(it->zl.sptr != NULL);
1166 }
1167 } else if (op->encoding == REDIS_ENCODING_RAW) {
1168 it->sl.zs = op->subject->ptr;
1169 it->sl.node = it->sl.zs->zsl->header->level[0].forward;
1170 } else {
1171 redisPanic("Unknown sorted set encoding");
1172 }
1173 } else {
1174 redisPanic("Unsupported type");
1175 }
1176 }
1177
1178 void zuiClearIterator(zsetopsrc *op) {
1179 if (op->subject == NULL)
1180 return;
1181
1182 if (op->type == REDIS_SET) {
1183 iterset *it = &op->iter.set;
1184 if (op->encoding == REDIS_ENCODING_INTSET) {
1185 REDIS_NOTUSED(it); /* skip */
1186 } else if (op->encoding == REDIS_ENCODING_HT) {
1187 dictReleaseIterator(it->ht.di);
1188 } else {
1189 redisPanic("Unknown set encoding");
1190 }
1191 } else if (op->type == REDIS_ZSET) {
1192 iterzset *it = &op->iter.zset;
1193 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1194 REDIS_NOTUSED(it); /* skip */
1195 } else if (op->encoding == REDIS_ENCODING_RAW) {
1196 REDIS_NOTUSED(it); /* skip */
1197 } else {
1198 redisPanic("Unknown sorted set encoding");
1199 }
1200 } else {
1201 redisPanic("Unsupported type");
1202 }
1203 }
1204
1205 int zuiLength(zsetopsrc *op) {
1206 if (op->subject == NULL)
1207 return 0;
1208
1209 if (op->type == REDIS_SET) {
1210 iterset *it = &op->iter.set;
1211 if (op->encoding == REDIS_ENCODING_INTSET) {
1212 return intsetLen(it->is.is);
1213 } else if (op->encoding == REDIS_ENCODING_HT) {
1214 return dictSize(it->ht.dict);
1215 } else {
1216 redisPanic("Unknown set encoding");
1217 }
1218 } else if (op->type == REDIS_ZSET) {
1219 iterzset *it = &op->iter.zset;
1220 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1221 return zzlLength(it->zl.zl);
1222 } else if (op->encoding == REDIS_ENCODING_RAW) {
1223 return it->sl.zs->zsl->length;
1224 } else {
1225 redisPanic("Unknown sorted set encoding");
1226 }
1227 } else {
1228 redisPanic("Unsupported type");
1229 }
1230 }
1231
1232 /* Check if the current value is valid. If so, store it in the passed structure
1233 * and move to the next element. If not valid, this means we have reached the
1234 * end of the structure and can abort. */
1235 int zuiNext(zsetopsrc *op, zsetopval *val) {
1236 if (op->subject == NULL)
1237 return 0;
1238
1239 if (val->flags & OPVAL_DIRTY_ROBJ)
1240 decrRefCount(val->ele);
1241
1242 bzero(val,sizeof(zsetopval));
1243
1244 if (op->type == REDIS_SET) {
1245 iterset *it = &op->iter.set;
1246 if (op->encoding == REDIS_ENCODING_INTSET) {
1247 if (!intsetGet(it->is.is,it->is.ii,&val->ell))
1248 return 0;
1249 val->score = 1.0;
1250
1251 /* Move to next element. */
1252 it->is.ii++;
1253 } else if (op->encoding == REDIS_ENCODING_HT) {
1254 if (it->ht.de == NULL)
1255 return 0;
1256 val->ele = dictGetEntryKey(it->ht.de);
1257 val->score = 1.0;
1258
1259 /* Move to next element. */
1260 it->ht.de = dictNext(it->ht.di);
1261 } else {
1262 redisPanic("Unknown set encoding");
1263 }
1264 } else if (op->type == REDIS_ZSET) {
1265 iterzset *it = &op->iter.zset;
1266 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1267 /* No need to check both, but better be explicit. */
1268 if (it->zl.eptr == NULL || it->zl.sptr == NULL)
1269 return 0;
1270 redisAssert(ziplistGet(it->zl.eptr,&val->estr,&val->elen,&val->ell));
1271 val->score = zzlGetScore(it->zl.sptr);
1272
1273 /* Move to next element. */
1274 zzlNext(it->zl.zl,&it->zl.eptr,&it->zl.sptr);
1275 } else if (op->encoding == REDIS_ENCODING_RAW) {
1276 if (it->sl.node == NULL)
1277 return 0;
1278 val->ele = it->sl.node->obj;
1279 val->score = it->sl.node->score;
1280
1281 /* Move to next element. */
1282 it->sl.node = it->sl.node->level[0].forward;
1283 } else {
1284 redisPanic("Unknown sorted set encoding");
1285 }
1286 } else {
1287 redisPanic("Unsupported type");
1288 }
1289 return 1;
1290 }
1291
1292 int zuiLongLongFromValue(zsetopval *val) {
1293 if (!(val->flags & OPVAL_DIRTY_LL)) {
1294 val->flags |= OPVAL_DIRTY_LL;
1295
1296 if (val->ele != NULL) {
1297 if (val->ele->encoding == REDIS_ENCODING_INT) {
1298 val->ell = (long)val->ele->ptr;
1299 val->flags |= OPVAL_VALID_LL;
1300 } else if (val->ele->encoding == REDIS_ENCODING_RAW) {
1301 if (string2ll(val->ele->ptr,sdslen(val->ele->ptr),&val->ell))
1302 val->flags |= OPVAL_VALID_LL;
1303 } else {
1304 redisPanic("Unsupported element encoding");
1305 }
1306 } else if (val->estr != NULL) {
1307 if (string2ll((char*)val->estr,val->elen,&val->ell))
1308 val->flags |= OPVAL_VALID_LL;
1309 } else {
1310 /* The long long was already set, flag as valid. */
1311 val->flags |= OPVAL_VALID_LL;
1312 }
1313 }
1314 return val->flags & OPVAL_VALID_LL;
1315 }
1316
1317 robj *zuiObjectFromValue(zsetopval *val) {
1318 if (val->ele == NULL) {
1319 if (val->estr != NULL) {
1320 val->ele = createStringObject((char*)val->estr,val->elen);
1321 } else {
1322 val->ele = createStringObjectFromLongLong(val->ell);
1323 }
1324 val->flags |= OPVAL_DIRTY_ROBJ;
1325 }
1326 return val->ele;
1327 }
1328
1329 int zuiBufferFromValue(zsetopval *val) {
1330 if (val->estr == NULL) {
1331 if (val->ele != NULL) {
1332 if (val->ele->encoding == REDIS_ENCODING_INT) {
1333 val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),(long)val->ele->ptr);
1334 val->estr = val->_buf;
1335 } else if (val->ele->encoding == REDIS_ENCODING_RAW) {
1336 val->elen = sdslen(val->ele->ptr);
1337 val->estr = val->ele->ptr;
1338 } else {
1339 redisPanic("Unsupported element encoding");
1340 }
1341 } else {
1342 val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),val->ell);
1343 val->estr = val->_buf;
1344 }
1345 }
1346 return 1;
1347 }
1348
1349 /* Find value pointed to by val in the source pointer to by op. When found,
1350 * return 1 and store its score in target. Return 0 otherwise. */
1351 int zuiFind(zsetopsrc *op, zsetopval *val, double *score) {
1352 if (op->subject == NULL)
1353 return 0;
1354
1355 if (op->type == REDIS_SET) {
1356 iterset *it = &op->iter.set;
1357
1358 if (op->encoding == REDIS_ENCODING_INTSET) {
1359 if (zuiLongLongFromValue(val) && intsetFind(it->is.is,val->ell)) {
1360 *score = 1.0;
1361 return 1;
1362 } else {
1363 return 0;
1364 }
1365 } else if (op->encoding == REDIS_ENCODING_HT) {
1366 zuiObjectFromValue(val);
1367 if (dictFind(it->ht.dict,val->ele) != NULL) {
1368 *score = 1.0;
1369 return 1;
1370 } else {
1371 return 0;
1372 }
1373 } else {
1374 redisPanic("Unknown set encoding");
1375 }
1376 } else if (op->type == REDIS_ZSET) {
1377 iterzset *it = &op->iter.zset;
1378 zuiObjectFromValue(val);
1379
1380 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1381 if (zzlFind(op->subject,val->ele,score) != NULL) {
1382 /* Score is already set by zzlFind. */
1383 return 1;
1384 } else {
1385 return 0;
1386 }
1387 } else if (op->encoding == REDIS_ENCODING_RAW) {
1388 dictEntry *de;
1389 if ((de = dictFind(it->sl.zs->dict,val->ele)) != NULL) {
1390 *score = *(double*)dictGetEntryVal(de);
1391 return 1;
1392 } else {
1393 return 0;
1394 }
1395 } else {
1396 redisPanic("Unknown sorted set encoding");
1397 }
1398 } else {
1399 redisPanic("Unsupported type");
1400 }
1401 }
1402
1403 int zuiCompareByCardinality(const void *s1, const void *s2) {
1404 return zuiLength((zsetopsrc*)s1) - zuiLength((zsetopsrc*)s2);
1405 }
1406
1407 #define REDIS_AGGR_SUM 1
1408 #define REDIS_AGGR_MIN 2
1409 #define REDIS_AGGR_MAX 3
1410 #define zunionInterDictValue(_e) (dictGetEntryVal(_e) == NULL ? 1.0 : *(double*)dictGetEntryVal(_e))
1411
1412 inline static void zunionInterAggregate(double *target, double val, int aggregate) {
1413 if (aggregate == REDIS_AGGR_SUM) {
1414 *target = *target + val;
1415 /* The result of adding two doubles is NaN when one variable
1416 * is +inf and the other is -inf. When these numbers are added,
1417 * we maintain the convention of the result being 0.0. */
1418 if (isnan(*target)) *target = 0.0;
1419 } else if (aggregate == REDIS_AGGR_MIN) {
1420 *target = val < *target ? val : *target;
1421 } else if (aggregate == REDIS_AGGR_MAX) {
1422 *target = val > *target ? val : *target;
1423 } else {
1424 /* safety net */
1425 redisPanic("Unknown ZUNION/INTER aggregate type");
1426 }
1427 }
1428
1429 void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
1430 int i, j, setnum;
1431 int aggregate = REDIS_AGGR_SUM;
1432 zsetopsrc *src;
1433 zsetopval zval;
1434 robj *tmp;
1435 robj *dstobj;
1436 zset *dstzset;
1437 zskiplistNode *znode;
1438 int touched = 0;
1439
1440 /* expect setnum input keys to be given */
1441 setnum = atoi(c->argv[2]->ptr);
1442 if (setnum < 1) {
1443 addReplyError(c,
1444 "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE");
1445 return;
1446 }
1447
1448 /* test if the expected number of keys would overflow */
1449 if (3+setnum > c->argc) {
1450 addReply(c,shared.syntaxerr);
1451 return;
1452 }
1453
1454 /* read keys to be used for input */
1455 src = zcalloc(sizeof(zsetopsrc) * setnum);
1456 for (i = 0, j = 3; i < setnum; i++, j++) {
1457 robj *obj = lookupKeyWrite(c->db,c->argv[j]);
1458 if (obj != NULL) {
1459 if (obj->type != REDIS_ZSET && obj->type != REDIS_SET) {
1460 zfree(src);
1461 addReply(c,shared.wrongtypeerr);
1462 return;
1463 }
1464
1465 src[i].subject = obj;
1466 src[i].type = obj->type;
1467 src[i].encoding = obj->encoding;
1468 } else {
1469 src[i].subject = NULL;
1470 }
1471
1472 /* Default all weights to 1. */
1473 src[i].weight = 1.0;
1474 }
1475
1476 /* parse optional extra arguments */
1477 if (j < c->argc) {
1478 int remaining = c->argc - j;
1479
1480 while (remaining) {
1481 if (remaining >= (setnum + 1) && !strcasecmp(c->argv[j]->ptr,"weights")) {
1482 j++; remaining--;
1483 for (i = 0; i < setnum; i++, j++, remaining--) {
1484 if (getDoubleFromObjectOrReply(c,c->argv[j],&src[i].weight,
1485 "weight value is not a double") != REDIS_OK)
1486 {
1487 zfree(src);
1488 return;
1489 }
1490 }
1491 } else if (remaining >= 2 && !strcasecmp(c->argv[j]->ptr,"aggregate")) {
1492 j++; remaining--;
1493 if (!strcasecmp(c->argv[j]->ptr,"sum")) {
1494 aggregate = REDIS_AGGR_SUM;
1495 } else if (!strcasecmp(c->argv[j]->ptr,"min")) {
1496 aggregate = REDIS_AGGR_MIN;
1497 } else if (!strcasecmp(c->argv[j]->ptr,"max")) {
1498 aggregate = REDIS_AGGR_MAX;
1499 } else {
1500 zfree(src);
1501 addReply(c,shared.syntaxerr);
1502 return;
1503 }
1504 j++; remaining--;
1505 } else {
1506 zfree(src);
1507 addReply(c,shared.syntaxerr);
1508 return;
1509 }
1510 }
1511 }
1512
1513 for (i = 0; i < setnum; i++)
1514 zuiInitIterator(&src[i]);
1515
1516 /* sort sets from the smallest to largest, this will improve our
1517 * algorithm's performance */
1518 qsort(src,setnum,sizeof(zsetopsrc),zuiCompareByCardinality);
1519
1520 dstobj = createZsetObject();
1521 dstzset = dstobj->ptr;
1522
1523 if (op == REDIS_OP_INTER) {
1524 /* Skip everything if the smallest input is empty. */
1525 if (zuiLength(&src[0]) > 0) {
1526 /* Precondition: as src[0] is non-empty and the inputs are ordered
1527 * by size, all src[i > 0] are non-empty too. */
1528 while (zuiNext(&src[0],&zval)) {
1529 double score, value;
1530
1531 score = src[0].weight * zval.score;
1532 for (j = 1; j < setnum; j++) {
1533 if (zuiFind(&src[j],&zval,&value)) {
1534 value *= src[j].weight;
1535 zunionInterAggregate(&score,value,aggregate);
1536 } else {
1537 break;
1538 }
1539 }
1540
1541 /* Only continue when present in every input. */
1542 if (j == setnum) {
1543 tmp = zuiObjectFromValue(&zval);
1544 znode = zslInsert(dstzset->zsl,score,tmp);
1545 incrRefCount(tmp); /* added to skiplist */
1546 dictAdd(dstzset->dict,tmp,&znode->score);
1547 incrRefCount(tmp); /* added to dictionary */
1548 }
1549 }
1550 }
1551 } else if (op == REDIS_OP_UNION) {
1552 for (i = 0; i < setnum; i++) {
1553 if (zuiLength(&src[0]) == 0)
1554 continue;
1555
1556 while (zuiNext(&src[i],&zval)) {
1557 double score, value;
1558
1559 /* Skip key when already processed */
1560 if (dictFind(dstzset->dict,zuiObjectFromValue(&zval)) != NULL)
1561 continue;
1562
1563 /* Initialize score */
1564 score = src[i].weight * zval.score;
1565
1566 /* Because the inputs are sorted by size, it's only possible
1567 * for sets at larger indices to hold this element. */
1568 for (j = (i+1); j < setnum; j++) {
1569 if (zuiFind(&src[j],&zval,&value)) {
1570 value *= src[j].weight;
1571 zunionInterAggregate(&score,value,aggregate);
1572 }
1573 }
1574
1575 tmp = zuiObjectFromValue(&zval);
1576 znode = zslInsert(dstzset->zsl,score,tmp);
1577 incrRefCount(zval.ele); /* added to skiplist */
1578 dictAdd(dstzset->dict,tmp,&znode->score);
1579 incrRefCount(zval.ele); /* added to dictionary */
1580 }
1581 }
1582 } else {
1583 redisPanic("Unknown operator");
1584 }
1585
1586 for (i = 0; i < setnum; i++)
1587 zuiClearIterator(&src[i]);
1588
1589 if (dbDelete(c->db,dstkey)) {
1590 signalModifiedKey(c->db,dstkey);
1591 touched = 1;
1592 server.dirty++;
1593 }
1594 if (dstzset->zsl->length) {
1595 dbAdd(c->db,dstkey,dstobj);
1596 addReplyLongLong(c, dstzset->zsl->length);
1597 if (!touched) signalModifiedKey(c->db,dstkey);
1598 server.dirty++;
1599 } else {
1600 decrRefCount(dstobj);
1601 addReply(c, shared.czero);
1602 }
1603 zfree(src);
1604 }
1605
1606 void zunionstoreCommand(redisClient *c) {
1607 zunionInterGenericCommand(c,c->argv[1], REDIS_OP_UNION);
1608 }
1609
1610 void zinterstoreCommand(redisClient *c) {
1611 zunionInterGenericCommand(c,c->argv[1], REDIS_OP_INTER);
1612 }
1613
1614 void zrangeGenericCommand(redisClient *c, int reverse) {
1615 robj *key = c->argv[1];
1616 robj *zobj;
1617 int withscores = 0;
1618 long start;
1619 long end;
1620 int llen;
1621 int rangelen;
1622
1623 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
1624 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
1625
1626 if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
1627 withscores = 1;
1628 } else if (c->argc >= 5) {
1629 addReply(c,shared.syntaxerr);
1630 return;
1631 }
1632
1633 if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL
1634 || checkType(c,zobj,REDIS_ZSET)) return;
1635
1636 /* Sanitize indexes. */
1637 llen = zsLength(zobj);
1638 if (start < 0) start = llen+start;
1639 if (end < 0) end = llen+end;
1640 if (start < 0) start = 0;
1641
1642 /* Invariant: start >= 0, so this test will be true when end < 0.
1643 * The range is empty when start > end or start >= length. */
1644 if (start > end || start >= llen) {
1645 addReply(c,shared.emptymultibulk);
1646 return;
1647 }
1648 if (end >= llen) end = llen-1;
1649 rangelen = (end-start)+1;
1650
1651 /* Return the result in form of a multi-bulk reply */
1652 addReplyMultiBulkLen(c, withscores ? (rangelen*2) : rangelen);
1653
1654 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1655 unsigned char *zl = zobj->ptr;
1656 unsigned char *eptr, *sptr;
1657 unsigned char *vstr;
1658 unsigned int vlen;
1659 long long vlong;
1660
1661 if (reverse)
1662 eptr = ziplistIndex(zl,-2-(2*start));
1663 else
1664 eptr = ziplistIndex(zl,2*start);
1665
1666 redisAssert(eptr != NULL);
1667 sptr = ziplistNext(zl,eptr);
1668
1669 while (rangelen--) {
1670 redisAssert(eptr != NULL && sptr != NULL);
1671 redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
1672 if (vstr == NULL)
1673 addReplyBulkLongLong(c,vlong);
1674 else
1675 addReplyBulkCBuffer(c,vstr,vlen);
1676
1677 if (withscores)
1678 addReplyDouble(c,zzlGetScore(sptr));
1679
1680 if (reverse)
1681 zzlPrev(zl,&eptr,&sptr);
1682 else
1683 zzlNext(zl,&eptr,&sptr);
1684 }
1685
1686 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
1687 zset *zs = zobj->ptr;
1688 zskiplist *zsl = zs->zsl;
1689 zskiplistNode *ln;
1690 robj *ele;
1691
1692 /* Check if starting point is trivial, before doing log(N) lookup. */
1693 if (reverse) {
1694 ln = zsl->tail;
1695 if (start > 0)
1696 ln = zslGetElementByRank(zsl,llen-start);
1697 } else {
1698 ln = zsl->header->level[0].forward;
1699 if (start > 0)
1700 ln = zslGetElementByRank(zsl,start+1);
1701 }
1702
1703 while(rangelen--) {
1704 redisAssert(ln != NULL);
1705 ele = ln->obj;
1706 addReplyBulk(c,ele);
1707 if (withscores)
1708 addReplyDouble(c,ln->score);
1709 ln = reverse ? ln->backward : ln->level[0].forward;
1710 }
1711 } else {
1712 redisPanic("Unknown sorted set encoding");
1713 }
1714 }
1715
1716 void zrangeCommand(redisClient *c) {
1717 zrangeGenericCommand(c,0);
1718 }
1719
1720 void zrevrangeCommand(redisClient *c) {
1721 zrangeGenericCommand(c,1);
1722 }
1723
1724 /* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE and ZCOUNT.
1725 * If "justcount", only the number of elements in the range is returned. */
1726 void genericZrangebyscoreCommand(redisClient *c, int reverse, int justcount) {
1727 zrangespec range;
1728 robj *key = c->argv[1];
1729 robj *emptyreply, *zobj;
1730 int offset = 0, limit = -1;
1731 int withscores = 0;
1732 unsigned long rangelen = 0;
1733 void *replylen = NULL;
1734 int minidx, maxidx;
1735
1736 /* Parse the range arguments. */
1737 if (reverse) {
1738 /* Range is given as [max,min] */
1739 maxidx = 2; minidx = 3;
1740 } else {
1741 /* Range is given as [min,max] */
1742 minidx = 2; maxidx = 3;
1743 }
1744
1745 if (zslParseRange(c->argv[minidx],c->argv[maxidx],&range) != REDIS_OK) {
1746 addReplyError(c,"min or max is not a double");
1747 return;
1748 }
1749
1750 /* Parse optional extra arguments. Note that ZCOUNT will exactly have
1751 * 4 arguments, so we'll never enter the following code path. */
1752 if (c->argc > 4) {
1753 int remaining = c->argc - 4;
1754 int pos = 4;
1755
1756 while (remaining) {
1757 if (remaining >= 1 && !strcasecmp(c->argv[pos]->ptr,"withscores")) {
1758 pos++; remaining--;
1759 withscores = 1;
1760 } else if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr,"limit")) {
1761 offset = atoi(c->argv[pos+1]->ptr);
1762 limit = atoi(c->argv[pos+2]->ptr);
1763 pos += 3; remaining -= 3;
1764 } else {
1765 addReply(c,shared.syntaxerr);
1766 return;
1767 }
1768 }
1769 }
1770
1771 /* Ok, lookup the key and get the range */
1772 emptyreply = justcount ? shared.czero : shared.emptymultibulk;
1773 if ((zobj = lookupKeyReadOrReply(c,key,emptyreply)) == NULL ||
1774 checkType(c,zobj,REDIS_ZSET)) return;
1775
1776 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1777 unsigned char *zl = zobj->ptr;
1778 unsigned char *eptr, *sptr;
1779 unsigned char *vstr;
1780 unsigned int vlen;
1781 long long vlong;
1782 double score;
1783
1784 /* If reversed, get the last node in range as starting point. */
1785 if (reverse)
1786 eptr = zzlLastInRange(zobj,range);
1787 else
1788 eptr = zzlFirstInRange(zobj,range);
1789
1790 /* No "first" element in the specified interval. */
1791 if (eptr == NULL) {
1792 addReply(c,emptyreply);
1793 return;
1794 }
1795
1796 /* Get score pointer for the first element. */
1797 redisAssert(eptr != NULL);
1798 sptr = ziplistNext(zl,eptr);
1799
1800 /* We don't know in advance how many matching elements there are in the
1801 * list, so we push this object that will represent the multi-bulk
1802 * length in the output buffer, and will "fix" it later */
1803 if (!justcount)
1804 replylen = addDeferredMultiBulkLength(c);
1805
1806 /* If there is an offset, just traverse the number of elements without
1807 * checking the score because that is done in the next loop. */
1808 while (eptr && offset--)
1809 if (reverse)
1810 zzlPrev(zl,&eptr,&sptr);
1811 else
1812 zzlNext(zl,&eptr,&sptr);
1813
1814 while (eptr && limit--) {
1815 score = zzlGetScore(sptr);
1816
1817 /* Abort when the node is no longer in range. */
1818 if (reverse) {
1819 if (!zslValueGteMin(score,&range)) break;
1820 } else {
1821 if (!zslValueLteMax(score,&range)) break;
1822 }
1823
1824 /* Do our magic */
1825 rangelen++;
1826 if (!justcount) {
1827 redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
1828 if (vstr == NULL)
1829 addReplyBulkLongLong(c,vlong);
1830 else
1831 addReplyBulkCBuffer(c,vstr,vlen);
1832
1833 if (withscores)
1834 addReplyDouble(c,score);
1835 }
1836
1837 /* Move to next node */
1838 if (reverse)
1839 zzlPrev(zl,&eptr,&sptr);
1840 else
1841 zzlNext(zl,&eptr,&sptr);
1842 }
1843 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
1844 zset *zs = zobj->ptr;
1845 zskiplist *zsl = zs->zsl;
1846 zskiplistNode *ln;
1847
1848 /* If reversed, get the last node in range as starting point. */
1849 if (reverse)
1850 ln = zslLastInRange(zsl,range);
1851 else
1852 ln = zslFirstInRange(zsl,range);
1853
1854 /* No "first" element in the specified interval. */
1855 if (ln == NULL) {
1856 addReply(c,emptyreply);
1857 return;
1858 }
1859
1860 /* We don't know in advance how many matching elements there are in the
1861 * list, so we push this object that will represent the multi-bulk
1862 * length in the output buffer, and will "fix" it later */
1863 if (!justcount)
1864 replylen = addDeferredMultiBulkLength(c);
1865
1866 /* If there is an offset, just traverse the number of elements without
1867 * checking the score because that is done in the next loop. */
1868 while (ln && offset--)
1869 ln = reverse ? ln->backward : ln->level[0].forward;
1870
1871 while (ln && limit--) {
1872 /* Abort when the node is no longer in range. */
1873 if (reverse) {
1874 if (!zslValueGteMin(ln->score,&range)) break;
1875 } else {
1876 if (!zslValueLteMax(ln->score,&range)) break;
1877 }
1878
1879 /* Do our magic */
1880 rangelen++;
1881 if (!justcount) {
1882 addReplyBulk(c,ln->obj);
1883 if (withscores)
1884 addReplyDouble(c,ln->score);
1885 }
1886
1887 /* Move to next node */
1888 ln = reverse ? ln->backward : ln->level[0].forward;
1889 }
1890 } else {
1891 redisPanic("Unknown sorted set encoding");
1892 }
1893
1894 if (justcount) {
1895 addReplyLongLong(c,(long)rangelen);
1896 } else {
1897 if (withscores) rangelen *= 2;
1898 setDeferredMultiBulkLength(c,replylen,rangelen);
1899 }
1900 }
1901
1902 void zrangebyscoreCommand(redisClient *c) {
1903 genericZrangebyscoreCommand(c,0,0);
1904 }
1905
1906 void zrevrangebyscoreCommand(redisClient *c) {
1907 genericZrangebyscoreCommand(c,1,0);
1908 }
1909
1910 void zcountCommand(redisClient *c) {
1911 genericZrangebyscoreCommand(c,0,1);
1912 }
1913
1914 void zcardCommand(redisClient *c) {
1915 robj *key = c->argv[1];
1916 robj *zobj;
1917
1918 if ((zobj = lookupKeyReadOrReply(c,key,shared.czero)) == NULL ||
1919 checkType(c,zobj,REDIS_ZSET)) return;
1920
1921 addReplyLongLong(c,zsLength(zobj));
1922 }
1923
1924 void zscoreCommand(redisClient *c) {
1925 robj *key = c->argv[1];
1926 robj *zobj;
1927 double score;
1928
1929 if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
1930 checkType(c,zobj,REDIS_ZSET)) return;
1931
1932 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1933 if (zzlFind(zobj,c->argv[2],&score) != NULL)
1934 addReplyDouble(c,score);
1935 else
1936 addReply(c,shared.nullbulk);
1937 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
1938 zset *zs = zobj->ptr;
1939 dictEntry *de;
1940
1941 c->argv[2] = tryObjectEncoding(c->argv[2]);
1942 de = dictFind(zs->dict,c->argv[2]);
1943 if (de != NULL) {
1944 score = *(double*)dictGetEntryVal(de);
1945 addReplyDouble(c,score);
1946 } else {
1947 addReply(c,shared.nullbulk);
1948 }
1949 } else {
1950 redisPanic("Unknown sorted set encoding");
1951 }
1952 }
1953
1954 void zrankGenericCommand(redisClient *c, int reverse) {
1955 robj *key = c->argv[1];
1956 robj *ele = c->argv[2];
1957 robj *zobj;
1958 unsigned long llen;
1959 unsigned long rank;
1960
1961 if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
1962 checkType(c,zobj,REDIS_ZSET)) return;
1963 llen = zsLength(zobj);
1964
1965 redisAssert(ele->encoding == REDIS_ENCODING_RAW);
1966 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1967 unsigned char *zl = zobj->ptr;
1968 unsigned char *eptr, *sptr;
1969
1970 eptr = ziplistIndex(zl,0);
1971 redisAssert(eptr != NULL);
1972 sptr = ziplistNext(zl,eptr);
1973 redisAssert(sptr != NULL);
1974
1975 rank = 1;
1976 while(eptr != NULL) {
1977 if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr)))
1978 break;
1979 rank++;
1980 zzlNext(zl,&eptr,&sptr);
1981 }
1982
1983 if (eptr != NULL) {
1984 if (reverse)
1985 addReplyLongLong(c,llen-rank);
1986 else
1987 addReplyLongLong(c,rank-1);
1988 } else {
1989 addReply(c,shared.nullbulk);
1990 }
1991 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
1992 zset *zs = zobj->ptr;
1993 zskiplist *zsl = zs->zsl;
1994 dictEntry *de;
1995 double score;
1996
1997 ele = c->argv[2] = tryObjectEncoding(c->argv[2]);
1998 de = dictFind(zs->dict,ele);
1999 if (de != NULL) {
2000 score = *(double*)dictGetEntryVal(de);
2001 rank = zslGetRank(zsl,score,ele);
2002 redisAssert(rank); /* Existing elements always have a rank. */
2003 if (reverse)
2004 addReplyLongLong(c,llen-rank);
2005 else
2006 addReplyLongLong(c,rank-1);
2007 } else {
2008 addReply(c,shared.nullbulk);
2009 }
2010 } else {
2011 redisPanic("Unknown sorted set encoding");
2012 }
2013 }
2014
2015 void zrankCommand(redisClient *c) {
2016 zrankGenericCommand(c, 0);
2017 }
2018
2019 void zrevrankCommand(redisClient *c) {
2020 zrankGenericCommand(c, 1);
2021 }