]> git.saurik.com Git - redis.git/blob - src/t_zset.c
35d95ba750229fb8a34247552bcde8fe3cb29a01
[redis.git] / src / t_zset.c
1 #include "redis.h"
2
3 #include <math.h>
4
5 /*-----------------------------------------------------------------------------
6 * Sorted set API
7 *----------------------------------------------------------------------------*/
8
/* ZSETs are ordered sets using two data structures to hold the same elements
 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
 * data structure.
 *
 * The elements are added to a hash table mapping Redis objects to scores.
 * At the same time the elements are added to a skip list mapping scores
 * to Redis objects (so objects are sorted by scores in this "view"). */
16
/* This skiplist implementation is almost a C translation of the original
 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
 * Alternative to Balanced Trees", modified in three ways:
 * a) this implementation allows for repeated values.
 * b) the comparison is not just by key (our 'score') but by satellite data.
 * c) there is a back pointer, so it's a doubly linked list with the back
 * pointers being only at "level 1". This allows traversing the list
 * from tail to head, which is useful for ZREVRANGE. */
25
26 zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
27 zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
28 zn->score = score;
29 zn->obj = obj;
30 return zn;
31 }
32
33 zskiplist *zslCreate(void) {
34 int j;
35 zskiplist *zsl;
36
37 zsl = zmalloc(sizeof(*zsl));
38 zsl->level = 1;
39 zsl->length = 0;
40 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
41 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
42 zsl->header->level[j].forward = NULL;
43 zsl->header->level[j].span = 0;
44 }
45 zsl->header->backward = NULL;
46 zsl->tail = NULL;
47 return zsl;
48 }
49
50 void zslFreeNode(zskiplistNode *node) {
51 decrRefCount(node->obj);
52 zfree(node);
53 }
54
55 void zslFree(zskiplist *zsl) {
56 zskiplistNode *node = zsl->header->level[0].forward, *next;
57
58 zfree(zsl->header);
59 while(node) {
60 next = node->level[0].forward;
61 zslFreeNode(node);
62 node = next;
63 }
64 zfree(zsl);
65 }
66
67 int zslRandomLevel(void) {
68 int level = 1;
69 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
70 level += 1;
71 return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
72 }
73
74 zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
75 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
76 unsigned int rank[ZSKIPLIST_MAXLEVEL];
77 int i, level;
78
79 x = zsl->header;
80 for (i = zsl->level-1; i >= 0; i--) {
81 /* store rank that is crossed to reach the insert position */
82 rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
83 while (x->level[i].forward &&
84 (x->level[i].forward->score < score ||
85 (x->level[i].forward->score == score &&
86 compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
87 rank[i] += x->level[i].span;
88 x = x->level[i].forward;
89 }
90 update[i] = x;
91 }
92 /* we assume the key is not already inside, since we allow duplicated
93 * scores, and the re-insertion of score and redis object should never
94 * happpen since the caller of zslInsert() should test in the hash table
95 * if the element is already inside or not. */
96 level = zslRandomLevel();
97 if (level > zsl->level) {
98 for (i = zsl->level; i < level; i++) {
99 rank[i] = 0;
100 update[i] = zsl->header;
101 update[i]->level[i].span = zsl->length;
102 }
103 zsl->level = level;
104 }
105 x = zslCreateNode(level,score,obj);
106 for (i = 0; i < level; i++) {
107 x->level[i].forward = update[i]->level[i].forward;
108 update[i]->level[i].forward = x;
109
110 /* update span covered by update[i] as x is inserted here */
111 x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
112 update[i]->level[i].span = (rank[0] - rank[i]) + 1;
113 }
114
115 /* increment span for untouched levels */
116 for (i = level; i < zsl->level; i++) {
117 update[i]->level[i].span++;
118 }
119
120 x->backward = (update[0] == zsl->header) ? NULL : update[0];
121 if (x->level[0].forward)
122 x->level[0].forward->backward = x;
123 else
124 zsl->tail = x;
125 zsl->length++;
126 return x;
127 }
128
129 /* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
130 void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
131 int i;
132 for (i = 0; i < zsl->level; i++) {
133 if (update[i]->level[i].forward == x) {
134 update[i]->level[i].span += x->level[i].span - 1;
135 update[i]->level[i].forward = x->level[i].forward;
136 } else {
137 update[i]->level[i].span -= 1;
138 }
139 }
140 if (x->level[0].forward) {
141 x->level[0].forward->backward = x->backward;
142 } else {
143 zsl->tail = x->backward;
144 }
145 while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
146 zsl->level--;
147 zsl->length--;
148 }
149
150 /* Delete an element with matching score/object from the skiplist. */
151 int zslDelete(zskiplist *zsl, double score, robj *obj) {
152 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
153 int i;
154
155 x = zsl->header;
156 for (i = zsl->level-1; i >= 0; i--) {
157 while (x->level[i].forward &&
158 (x->level[i].forward->score < score ||
159 (x->level[i].forward->score == score &&
160 compareStringObjects(x->level[i].forward->obj,obj) < 0)))
161 x = x->level[i].forward;
162 update[i] = x;
163 }
164 /* We may have multiple elements with the same score, what we need
165 * is to find the element with both the right score and object. */
166 x = x->level[0].forward;
167 if (x && score == x->score && equalStringObjects(x->obj,obj)) {
168 zslDeleteNode(zsl, x, update);
169 zslFreeNode(x);
170 return 1;
171 } else {
172 return 0; /* not found */
173 }
174 return 0; /* not found */
175 }
176
/* Score interval whose endpoints can each be inclusive or exclusive. */
typedef struct {
    double min;     /* lower endpoint */
    double max;     /* upper endpoint */
    int minex;      /* non-zero when 'min' itself is excluded */
    int maxex;      /* non-zero when 'max' itself is excluded */
} zrangespec;

/* True when 'value' satisfies the lower endpoint of 'spec'. */
static int zslValueGteMin(double value, zrangespec *spec) {
    if (spec->minex)
        return value > spec->min;
    return value >= spec->min;
}

/* True when 'value' satisfies the upper endpoint of 'spec'. */
static int zslValueLteMax(double value, zrangespec *spec) {
    if (spec->maxex)
        return value < spec->max;
    return value <= spec->max;
}

/* True when 'value' satisfies both endpoints of 'spec'. */
static int zslValueInRange(double value, zrangespec *spec) {
    if (!zslValueGteMin(value,spec)) return 0;
    return zslValueLteMax(value,spec);
}
194
195 /* Returns if there is a part of the zset is in range. */
196 int zslIsInRange(zskiplist *zsl, zrangespec *range) {
197 zskiplistNode *x;
198
199 /* Test for ranges that will always be empty. */
200 if (range->min > range->max ||
201 (range->min == range->max && (range->minex || range->maxex)))
202 return 0;
203 x = zsl->tail;
204 if (x == NULL || !zslValueGteMin(x->score,range))
205 return 0;
206 x = zsl->header->level[0].forward;
207 if (x == NULL || !zslValueLteMax(x->score,range))
208 return 0;
209 return 1;
210 }
211
212 /* Find the first node that is contained in the specified range.
213 * Returns NULL when no element is contained in the range. */
214 zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec range) {
215 zskiplistNode *x;
216 int i;
217
218 /* If everything is out of range, return early. */
219 if (!zslIsInRange(zsl,&range)) return NULL;
220
221 x = zsl->header;
222 for (i = zsl->level-1; i >= 0; i--) {
223 /* Go forward while *OUT* of range. */
224 while (x->level[i].forward &&
225 !zslValueGteMin(x->level[i].forward->score,&range))
226 x = x->level[i].forward;
227 }
228
229 /* The tail is in range, so the previous block should always return a
230 * node that is non-NULL and the last one to be out of range. */
231 x = x->level[0].forward;
232 redisAssert(x != NULL && zslValueInRange(x->score,&range));
233 return x;
234 }
235
236 /* Find the last node that is contained in the specified range.
237 * Returns NULL when no element is contained in the range. */
238 zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec range) {
239 zskiplistNode *x;
240 int i;
241
242 /* If everything is out of range, return early. */
243 if (!zslIsInRange(zsl,&range)) return NULL;
244
245 x = zsl->header;
246 for (i = zsl->level-1; i >= 0; i--) {
247 /* Go forward while *IN* range. */
248 while (x->level[i].forward &&
249 zslValueLteMax(x->level[i].forward->score,&range))
250 x = x->level[i].forward;
251 }
252
253 /* The header is in range, so the previous block should always return a
254 * node that is non-NULL and in range. */
255 redisAssert(x != NULL && zslValueInRange(x->score,&range));
256 return x;
257 }
258
259 /* Delete all the elements with score between min and max from the skiplist.
260 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
261 * Note that this function takes the reference to the hash table view of the
262 * sorted set, in order to remove the elements from the hash table too. */
263 unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec range, dict *dict) {
264 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
265 unsigned long removed = 0;
266 int i;
267
268 x = zsl->header;
269 for (i = zsl->level-1; i >= 0; i--) {
270 while (x->level[i].forward && (range.minex ?
271 x->level[i].forward->score <= range.min :
272 x->level[i].forward->score < range.min))
273 x = x->level[i].forward;
274 update[i] = x;
275 }
276
277 /* Current node is the last with score < or <= min. */
278 x = x->level[0].forward;
279
280 /* Delete nodes while in range. */
281 while (x && (range.maxex ? x->score < range.max : x->score <= range.max)) {
282 zskiplistNode *next = x->level[0].forward;
283 zslDeleteNode(zsl,x,update);
284 dictDelete(dict,x->obj);
285 zslFreeNode(x);
286 removed++;
287 x = next;
288 }
289 return removed;
290 }
291
292 /* Delete all the elements with rank between start and end from the skiplist.
293 * Start and end are inclusive. Note that start and end need to be 1-based */
294 unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict) {
295 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
296 unsigned long traversed = 0, removed = 0;
297 int i;
298
299 x = zsl->header;
300 for (i = zsl->level-1; i >= 0; i--) {
301 while (x->level[i].forward && (traversed + x->level[i].span) < start) {
302 traversed += x->level[i].span;
303 x = x->level[i].forward;
304 }
305 update[i] = x;
306 }
307
308 traversed++;
309 x = x->level[0].forward;
310 while (x && traversed <= end) {
311 zskiplistNode *next = x->level[0].forward;
312 zslDeleteNode(zsl,x,update);
313 dictDelete(dict,x->obj);
314 zslFreeNode(x);
315 removed++;
316 traversed++;
317 x = next;
318 }
319 return removed;
320 }
321
322 /* Find the rank for an element by both score and key.
323 * Returns 0 when the element cannot be found, rank otherwise.
324 * Note that the rank is 1-based due to the span of zsl->header to the
325 * first element. */
326 unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) {
327 zskiplistNode *x;
328 unsigned long rank = 0;
329 int i;
330
331 x = zsl->header;
332 for (i = zsl->level-1; i >= 0; i--) {
333 while (x->level[i].forward &&
334 (x->level[i].forward->score < score ||
335 (x->level[i].forward->score == score &&
336 compareStringObjects(x->level[i].forward->obj,o) <= 0))) {
337 rank += x->level[i].span;
338 x = x->level[i].forward;
339 }
340
341 /* x might be equal to zsl->header, so test if obj is non-NULL */
342 if (x->obj && equalStringObjects(x->obj,o)) {
343 return rank;
344 }
345 }
346 return 0;
347 }
348
349 /* Finds an element by its rank. The rank argument needs to be 1-based. */
350 zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
351 zskiplistNode *x;
352 unsigned long traversed = 0;
353 int i;
354
355 x = zsl->header;
356 for (i = zsl->level-1; i >= 0; i--) {
357 while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
358 {
359 traversed += x->level[i].span;
360 x = x->level[i].forward;
361 }
362 if (traversed == rank) {
363 return x;
364 }
365 }
366 return NULL;
367 }
368
369 /* Populate the rangespec according to the objects min and max. */
370 static int zslParseRange(robj *min, robj *max, zrangespec *spec) {
371 char *eptr;
372 spec->minex = spec->maxex = 0;
373
374 /* Parse the min-max interval. If one of the values is prefixed
375 * by the "(" character, it's considered "open". For instance
376 * ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max
377 * ZRANGEBYSCORE zset 1.5 2.5 will instead match min <= x <= max */
378 if (min->encoding == REDIS_ENCODING_INT) {
379 spec->min = (long)min->ptr;
380 } else {
381 if (((char*)min->ptr)[0] == '(') {
382 spec->min = strtod((char*)min->ptr+1,&eptr);
383 if (eptr[0] != '\0' || isnan(spec->min)) return REDIS_ERR;
384 spec->minex = 1;
385 } else {
386 spec->min = strtod((char*)min->ptr,&eptr);
387 if (eptr[0] != '\0' || isnan(spec->min)) return REDIS_ERR;
388 }
389 }
390 if (max->encoding == REDIS_ENCODING_INT) {
391 spec->max = (long)max->ptr;
392 } else {
393 if (((char*)max->ptr)[0] == '(') {
394 spec->max = strtod((char*)max->ptr+1,&eptr);
395 if (eptr[0] != '\0' || isnan(spec->max)) return REDIS_ERR;
396 spec->maxex = 1;
397 } else {
398 spec->max = strtod((char*)max->ptr,&eptr);
399 if (eptr[0] != '\0' || isnan(spec->max)) return REDIS_ERR;
400 }
401 }
402
403 return REDIS_OK;
404 }
405
406 /*-----------------------------------------------------------------------------
407 * Ziplist-backed sorted set API
408 *----------------------------------------------------------------------------*/
409
/* Return the score stored in the ziplist entry 'sptr'.
 * An entry returned as a string by ziplistGet() is parsed with strtod();
 * an entry returned as an integer is converted to double directly. */
double zzlGetScore(unsigned char *sptr) {
    unsigned char *vstr;
    unsigned int vlen;
    long long vlong;
    char buf[128];
    double score;

    redisAssert(sptr != NULL);
    redisAssert(ziplistGet(sptr,&vstr,&vlen,&vlong));

    if (vstr) {
        /* Never copy more than the local buffer can hold, leaving room
         * for the terminator: the entry length is not under our control. */
        if (vlen > sizeof(buf)-1)
            vlen = sizeof(buf)-1;
        memcpy(buf,vstr,vlen);
        buf[vlen] = '\0';
        score = strtod(buf,NULL);
    } else {
        score = vlong;
    }

    return score;
}
430
/* Compare the ziplist element at 'eptr' with the string cstr/clen.
 * The common prefix is compared with memcmp(); when it is equal the
 * difference of the lengths decides. Integer entries are compared through
 * their decimal string representation. */
int zzlCompareElements(unsigned char *eptr, unsigned char *cstr, unsigned int clen) {
    unsigned char *vstr;
    unsigned int vlen;
    long long vlong;
    unsigned char vbuf[32];
    int minlen, cmp;

    redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
    if (vstr == NULL) {
        /* Store string representation of long long in vbuf. */
        vstr = vbuf;
        vlen = ll2string((char*)vbuf,sizeof(vbuf),vlong);
    }

    if (vlen < clen)
        minlen = vlen;
    else
        minlen = clen;

    cmp = memcmp(vstr,cstr,minlen);
    if (cmp != 0) return cmp;
    return vlen-clen;
}
451
452 unsigned int zzlLength(robj *zobj) {
453 unsigned char *zl = zobj->ptr;
454 return ziplistLen(zl)/2;
455 }
456
457 /* Move to next entry based on the values in eptr and sptr. Both are set to
458 * NULL when there is no next entry. */
459 void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
460 unsigned char *_eptr, *_sptr;
461 redisAssert(*eptr != NULL && *sptr != NULL);
462
463 _eptr = ziplistNext(zl,*sptr);
464 if (_eptr != NULL) {
465 _sptr = ziplistNext(zl,_eptr);
466 redisAssert(_sptr != NULL);
467 } else {
468 /* No next entry. */
469 _sptr = NULL;
470 }
471
472 *eptr = _eptr;
473 *sptr = _sptr;
474 }
475
476 /* Move to the previous entry based on the values in eptr and sptr. Both are
477 * set to NULL when there is no next entry. */
478 void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
479 unsigned char *_eptr, *_sptr;
480 redisAssert(*eptr != NULL && *sptr != NULL);
481
482 _sptr = ziplistPrev(zl,*eptr);
483 if (_sptr != NULL) {
484 _eptr = ziplistPrev(zl,_sptr);
485 redisAssert(_eptr != NULL);
486 } else {
487 /* No previous entry. */
488 _eptr = NULL;
489 }
490
491 *eptr = _eptr;
492 *sptr = _sptr;
493 }
494
495 /* Returns if there is a part of the zset is in range. Should only be used
496 * internally by zzlFirstInRange and zzlLastInRange. */
497 int zzlIsInRange(unsigned char *zl, zrangespec *range) {
498 unsigned char *p;
499 double score;
500
501 /* Test for ranges that will always be empty. */
502 if (range->min > range->max ||
503 (range->min == range->max && (range->minex || range->maxex)))
504 return 0;
505
506 p = ziplistIndex(zl,-1); /* Last score. */
507 redisAssert(p != NULL);
508 score = zzlGetScore(p);
509 if (!zslValueGteMin(score,range))
510 return 0;
511
512 p = ziplistIndex(zl,1); /* First score. */
513 redisAssert(p != NULL);
514 score = zzlGetScore(p);
515 if (!zslValueLteMax(score,range))
516 return 0;
517
518 return 1;
519 }
520
521 /* Find pointer to the first element contained in the specified range.
522 * Returns NULL when no element is contained in the range. */
523 unsigned char *zzlFirstInRange(robj *zobj, zrangespec range) {
524 unsigned char *zl = zobj->ptr;
525 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
526 double score;
527
528 /* If everything is out of range, return early. */
529 if (!zzlIsInRange(zl,&range)) return NULL;
530
531 while (eptr != NULL) {
532 sptr = ziplistNext(zl,eptr);
533 redisAssert(sptr != NULL);
534
535 score = zzlGetScore(sptr);
536 if (zslValueGteMin(score,&range))
537 return eptr;
538
539 /* Move to next element. */
540 eptr = ziplistNext(zl,sptr);
541 }
542
543 return NULL;
544 }
545
546 /* Find pointer to the last element contained in the specified range.
547 * Returns NULL when no element is contained in the range. */
548 unsigned char *zzlLastInRange(robj *zobj, zrangespec range) {
549 unsigned char *zl = zobj->ptr;
550 unsigned char *eptr = ziplistIndex(zl,-2), *sptr;
551 double score;
552
553 /* If everything is out of range, return early. */
554 if (!zzlIsInRange(zl,&range)) return NULL;
555
556 while (eptr != NULL) {
557 sptr = ziplistNext(zl,eptr);
558 redisAssert(sptr != NULL);
559
560 score = zzlGetScore(sptr);
561 if (zslValueLteMax(score,&range))
562 return eptr;
563
564 /* Move to previous element by moving to the score of previous element.
565 * When this returns NULL, we know there also is no element. */
566 sptr = ziplistPrev(zl,eptr);
567 if (sptr != NULL)
568 redisAssert((eptr = ziplistPrev(zl,sptr)) != NULL);
569 else
570 eptr = NULL;
571 }
572
573 return NULL;
574 }
575
576 unsigned char *zzlFind(robj *zobj, robj *ele, double *score) {
577 unsigned char *zl = zobj->ptr;
578 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
579
580 ele = getDecodedObject(ele);
581 while (eptr != NULL) {
582 sptr = ziplistNext(zl,eptr);
583 redisAssert(sptr != NULL);
584
585 if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr))) {
586 /* Matching element, pull out score. */
587 if (score != NULL) *score = zzlGetScore(sptr);
588 decrRefCount(ele);
589 return eptr;
590 }
591
592 /* Move to next element. */
593 eptr = ziplistNext(zl,sptr);
594 }
595
596 decrRefCount(ele);
597 return NULL;
598 }
599
600 /* Delete (element,score) pair from ziplist. Use local copy of eptr because we
601 * don't want to modify the one given as argument. */
602 int zzlDelete(robj *zobj, unsigned char *eptr) {
603 unsigned char *zl = zobj->ptr;
604 unsigned char *p = eptr;
605
606 /* TODO: add function to ziplist API to delete N elements from offset. */
607 zl = ziplistDelete(zl,&p);
608 zl = ziplistDelete(zl,&p);
609 zobj->ptr = zl;
610 return REDIS_OK;
611 }
612
613 int zzlInsertAt(robj *zobj, robj *ele, double score, unsigned char *eptr) {
614 unsigned char *zl = zobj->ptr;
615 unsigned char *sptr;
616 char scorebuf[128];
617 int scorelen;
618 int offset;
619
620 redisAssert(ele->encoding == REDIS_ENCODING_RAW);
621 scorelen = d2string(scorebuf,sizeof(scorebuf),score);
622 if (eptr == NULL) {
623 zl = ziplistPush(zl,ele->ptr,sdslen(ele->ptr),ZIPLIST_TAIL);
624 zl = ziplistPush(zl,(unsigned char*)scorebuf,scorelen,ZIPLIST_TAIL);
625 } else {
626 /* Keep offset relative to zl, as it might be re-allocated. */
627 offset = eptr-zl;
628 zl = ziplistInsert(zl,eptr,ele->ptr,sdslen(ele->ptr));
629 eptr = zl+offset;
630
631 /* Insert score after the element. */
632 redisAssert((sptr = ziplistNext(zl,eptr)) != NULL);
633 zl = ziplistInsert(zl,sptr,(unsigned char*)scorebuf,scorelen);
634 }
635
636 zobj->ptr = zl;
637 return REDIS_OK;
638 }
639
640 /* Insert (element,score) pair in ziplist. This function assumes the element is
641 * not yet present in the list. */
642 int zzlInsert(robj *zobj, robj *ele, double score) {
643 unsigned char *zl = zobj->ptr;
644 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
645 double s;
646
647 ele = getDecodedObject(ele);
648 while (eptr != NULL) {
649 sptr = ziplistNext(zl,eptr);
650 redisAssert(sptr != NULL);
651 s = zzlGetScore(sptr);
652
653 if (s > score) {
654 /* First element with score larger than score for element to be
655 * inserted. This means we should take its spot in the list to
656 * maintain ordering. */
657 zzlInsertAt(zobj,ele,score,eptr);
658 break;
659 } else if (s == score) {
660 /* Ensure lexicographical ordering for elements. */
661 if (zzlCompareElements(eptr,ele->ptr,sdslen(ele->ptr)) > 0) {
662 zzlInsertAt(zobj,ele,score,eptr);
663 break;
664 }
665 }
666
667 /* Move to next element. */
668 eptr = ziplistNext(zl,sptr);
669 }
670
671 /* Push on tail of list when it was not yet inserted. */
672 if (eptr == NULL)
673 zzlInsertAt(zobj,ele,score,NULL);
674
675 decrRefCount(ele);
676 return REDIS_OK;
677 }
678
679 unsigned long zzlDeleteRangeByScore(robj *zobj, zrangespec range) {
680 unsigned char *zl = zobj->ptr;
681 unsigned char *eptr, *sptr;
682 double score;
683 unsigned long deleted = 0;
684
685 eptr = zzlFirstInRange(zobj,range);
686 if (eptr == NULL) return deleted;
687
688
689 /* When the tail of the ziplist is deleted, eptr will point to the sentinel
690 * byte and ziplistNext will return NULL. */
691 while ((sptr = ziplistNext(zl,eptr)) != NULL) {
692 score = zzlGetScore(sptr);
693 if (zslValueLteMax(score,&range)) {
694 /* Delete both the element and the score. */
695 zl = ziplistDelete(zl,&eptr);
696 zl = ziplistDelete(zl,&eptr);
697 deleted++;
698 } else {
699 /* No longer in range. */
700 break;
701 }
702 }
703
704 return deleted;
705 }
706
707 /* Delete all the elements with rank between start and end from the skiplist.
708 * Start and end are inclusive. Note that start and end need to be 1-based */
709 unsigned long zzlDeleteRangeByRank(robj *zobj, unsigned int start, unsigned int end) {
710 unsigned int num = (end-start)+1;
711 zobj->ptr = ziplistDeleteRange(zobj->ptr,2*(start-1),2*num);
712 return num;
713 }
714
715 /*-----------------------------------------------------------------------------
716 * Common sorted set API
717 *----------------------------------------------------------------------------*/
718
719 int zsLength(robj *zobj) {
720 int length = -1;
721 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
722 length = zzlLength(zobj);
723 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
724 length = ((zset*)zobj->ptr)->zsl->length;
725 } else {
726 redisPanic("Unknown sorted set encoding");
727 }
728 return length;
729 }
730
731 void zsConvert(robj *zobj, int encoding) {
732 zset *zs;
733 zskiplistNode *node, *next;
734 robj *ele;
735 double score;
736
737 if (zobj->encoding == encoding) return;
738 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
739 unsigned char *zl = zobj->ptr;
740 unsigned char *eptr, *sptr;
741 unsigned char *vstr;
742 unsigned int vlen;
743 long long vlong;
744
745 if (encoding != REDIS_ENCODING_RAW)
746 redisPanic("Unknown target encoding");
747
748 zs = zmalloc(sizeof(*zs));
749 zs->dict = dictCreate(&zsetDictType,NULL);
750 zs->zsl = zslCreate();
751
752 eptr = ziplistIndex(zl,0);
753 redisAssert(eptr != NULL);
754 sptr = ziplistNext(zl,eptr);
755 redisAssert(sptr != NULL);
756
757 while (eptr != NULL) {
758 score = zzlGetScore(sptr);
759 redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
760 if (vstr == NULL)
761 ele = createStringObjectFromLongLong(vlong);
762 else
763 ele = createStringObject((char*)vstr,vlen);
764
765 /* Has incremented refcount since it was just created. */
766 node = zslInsert(zs->zsl,score,ele);
767 redisAssert(dictAdd(zs->dict,ele,&node->score) == DICT_OK);
768 incrRefCount(ele); /* Added to dictionary. */
769 zzlNext(zl,&eptr,&sptr);
770 }
771
772 zfree(zobj->ptr);
773 zobj->ptr = zs;
774 zobj->encoding = REDIS_ENCODING_RAW;
775 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
776 unsigned char *zl = ziplistNew();
777
778 if (encoding != REDIS_ENCODING_ZIPLIST)
779 redisPanic("Unknown target encoding");
780
781 /* Approach similar to zslFree(), since we want to free the skiplist at
782 * the same time as creating the ziplist. */
783 zs = zobj->ptr;
784 dictRelease(zs->dict);
785 node = zs->zsl->header->level[0].forward;
786 zfree(zs->zsl->header);
787 zfree(zs->zsl);
788
789 /* Immediately store pointer to ziplist in object because it will
790 * change because of reallocations when pushing to the ziplist. */
791 zobj->ptr = zl;
792
793 while (node) {
794 ele = getDecodedObject(node->obj);
795 redisAssert(zzlInsertAt(zobj,ele,node->score,NULL) == REDIS_OK);
796 decrRefCount(ele);
797
798 next = node->level[0].forward;
799 zslFreeNode(node);
800 node = next;
801 }
802
803 zfree(zs);
804 zobj->encoding = REDIS_ENCODING_ZIPLIST;
805 } else {
806 redisPanic("Unknown sorted set encoding");
807 }
808 }
809
810 /*-----------------------------------------------------------------------------
811 * Sorted set commands
812 *----------------------------------------------------------------------------*/
813
814 /* This generic command implements both ZADD and ZINCRBY. */
815 void zaddGenericCommand(redisClient *c, int incr) {
816 static char *nanerr = "resulting score is not a number (NaN)";
817 robj *key = c->argv[1];
818 robj *ele;
819 robj *zobj;
820 robj *curobj;
821 double score, curscore = 0.0;
822
823 if (getDoubleFromObjectOrReply(c,c->argv[2],&score,NULL) != REDIS_OK)
824 return;
825
826 zobj = lookupKeyWrite(c->db,key);
827 if (zobj == NULL) {
828 if (server.zset_max_ziplist_entries == 0 ||
829 server.zset_max_ziplist_value < sdslen(c->argv[3]->ptr))
830 {
831 zobj = createZsetObject();
832 } else {
833 zobj = createZsetZiplistObject();
834 }
835 dbAdd(c->db,key,zobj);
836 } else {
837 if (zobj->type != REDIS_ZSET) {
838 addReply(c,shared.wrongtypeerr);
839 return;
840 }
841 }
842
843 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
844 unsigned char *eptr;
845
846 /* Prefer non-encoded element when dealing with ziplists. */
847 ele = c->argv[3];
848 if ((eptr = zzlFind(zobj,ele,&curscore)) != NULL) {
849 if (incr) {
850 score += curscore;
851 if (isnan(score)) {
852 addReplyError(c,nanerr);
853 /* Don't need to check if the sorted set is empty, because
854 * we know it has at least one element. */
855 return;
856 }
857 }
858
859 /* Remove and re-insert when score changed. */
860 if (score != curscore) {
861 redisAssert(zzlDelete(zobj,eptr) == REDIS_OK);
862 redisAssert(zzlInsert(zobj,ele,score) == REDIS_OK);
863
864 signalModifiedKey(c->db,key);
865 server.dirty++;
866 }
867
868 if (incr) /* ZINCRBY */
869 addReplyDouble(c,score);
870 else /* ZADD */
871 addReply(c,shared.czero);
872 } else {
873 /* Optimize: check if the element is too large or the list becomes
874 * too long *before* executing zzlInsert. */
875 redisAssert(zzlInsert(zobj,ele,score) == REDIS_OK);
876 if (zzlLength(zobj) > server.zset_max_ziplist_entries)
877 zsConvert(zobj,REDIS_ENCODING_RAW);
878 if (sdslen(ele->ptr) > server.zset_max_ziplist_value)
879 zsConvert(zobj,REDIS_ENCODING_RAW);
880
881 signalModifiedKey(c->db,key);
882 server.dirty++;
883
884 if (incr) /* ZINCRBY */
885 addReplyDouble(c,score);
886 else /* ZADD */
887 addReply(c,shared.cone);
888 }
889 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
890 zset *zs = zobj->ptr;
891 zskiplistNode *znode;
892 dictEntry *de;
893
894 ele = c->argv[3] = tryObjectEncoding(c->argv[3]);
895 de = dictFind(zs->dict,ele);
896 if (de != NULL) {
897 curobj = dictGetEntryKey(de);
898 curscore = *(double*)dictGetEntryVal(de);
899
900 if (incr) {
901 score += curscore;
902 if (isnan(score)) {
903 addReplyError(c,nanerr);
904 /* Don't need to check if the sorted set is empty, because
905 * we know it has at least one element. */
906 return;
907 }
908 }
909
910 /* Remove and re-insert when score changed. We can safely delete
911 * the key object from the skiplist, since the dictionary still has
912 * a reference to it. */
913 if (score != curscore) {
914 redisAssert(zslDelete(zs->zsl,curscore,curobj));
915 znode = zslInsert(zs->zsl,score,curobj);
916 incrRefCount(curobj); /* Re-inserted in skiplist. */
917 dictGetEntryVal(de) = &znode->score; /* Update score ptr. */
918
919 signalModifiedKey(c->db,key);
920 server.dirty++;
921 }
922
923 if (incr) /* ZINCRBY */
924 addReplyDouble(c,score);
925 else /* ZADD */
926 addReply(c,shared.czero);
927 } else {
928 znode = zslInsert(zs->zsl,score,ele);
929 incrRefCount(ele); /* Inserted in skiplist. */
930 redisAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
931 incrRefCount(ele); /* Added to dictionary. */
932
933 signalModifiedKey(c->db,key);
934 server.dirty++;
935
936 if (incr) /* ZINCRBY */
937 addReplyDouble(c,score);
938 else /* ZADD */
939 addReply(c,shared.cone);
940 }
941 } else {
942 redisPanic("Unknown sorted set encoding");
943 }
944 }
945
946 void zaddCommand(redisClient *c) {
947 zaddGenericCommand(c,0);
948 }
949
950 void zincrbyCommand(redisClient *c) {
951 zaddGenericCommand(c,1);
952 }
953
954 void zremCommand(redisClient *c) {
955 robj *key = c->argv[1];
956 robj *ele = c->argv[2];
957 robj *zobj;
958
959 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
960 checkType(c,zobj,REDIS_ZSET)) return;
961
962 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
963 unsigned char *eptr;
964
965 if ((eptr = zzlFind(zobj,ele,NULL)) != NULL) {
966 redisAssert(zzlDelete(zobj,eptr) == REDIS_OK);
967 if (zzlLength(zobj) == 0) dbDelete(c->db,key);
968 } else {
969 addReply(c,shared.czero);
970 return;
971 }
972 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
973 zset *zs = zobj->ptr;
974 dictEntry *de;
975 double score;
976
977 de = dictFind(zs->dict,ele);
978 if (de != NULL) {
979 /* Delete from the skiplist */
980 score = *(double*)dictGetEntryVal(de);
981 redisAssert(zslDelete(zs->zsl,score,ele));
982
983 /* Delete from the hash table */
984 dictDelete(zs->dict,ele);
985 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
986 if (dictSize(zs->dict) == 0) dbDelete(c->db,key);
987 } else {
988 addReply(c,shared.czero);
989 return;
990 }
991 } else {
992 redisPanic("Unknown sorted set encoding");
993 }
994
995 signalModifiedKey(c->db,key);
996 server.dirty++;
997 addReply(c,shared.cone);
998 }
999
1000 void zremrangebyscoreCommand(redisClient *c) {
1001 robj *key = c->argv[1];
1002 robj *zobj;
1003 zrangespec range;
1004 unsigned long deleted;
1005
1006 /* Parse the range arguments. */
1007 if (zslParseRange(c->argv[2],c->argv[3],&range) != REDIS_OK) {
1008 addReplyError(c,"min or max is not a double");
1009 return;
1010 }
1011
1012 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
1013 checkType(c,zobj,REDIS_ZSET)) return;
1014
1015 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1016 deleted = zzlDeleteRangeByScore(zobj,range);
1017 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
1018 zset *zs = zobj->ptr;
1019 deleted = zslDeleteRangeByScore(zs->zsl,range,zs->dict);
1020 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
1021 if (dictSize(zs->dict) == 0) dbDelete(c->db,key);
1022 } else {
1023 redisPanic("Unknown sorted set encoding");
1024 }
1025
1026 if (deleted) signalModifiedKey(c->db,key);
1027 server.dirty += deleted;
1028 addReplyLongLong(c,deleted);
1029 }
1030
1031 void zremrangebyrankCommand(redisClient *c) {
1032 robj *key = c->argv[1];
1033 robj *zobj;
1034 long start;
1035 long end;
1036 int llen;
1037 unsigned long deleted;
1038
1039 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
1040 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
1041
1042 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
1043 checkType(c,zobj,REDIS_ZSET)) return;
1044
1045 /* Sanitize indexes. */
1046 llen = zsLength(zobj);
1047 if (start < 0) start = llen+start;
1048 if (end < 0) end = llen+end;
1049 if (start < 0) start = 0;
1050
1051 /* Invariant: start >= 0, so this test will be true when end < 0.
1052 * The range is empty when start > end or start >= length. */
1053 if (start > end || start >= llen) {
1054 addReply(c,shared.czero);
1055 return;
1056 }
1057 if (end >= llen) end = llen-1;
1058
1059 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1060 /* Correct for 1-based rank. */
1061 deleted = zzlDeleteRangeByRank(zobj,start+1,end+1);
1062 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
1063 zset *zs = zobj->ptr;
1064
1065 /* Correct for 1-based rank. */
1066 deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);
1067 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
1068 if (dictSize(zs->dict) == 0) dbDelete(c->db,key);
1069 } else {
1070 redisPanic("Unknown sorted set encoding");
1071 }
1072
1073 if (deleted) signalModifiedKey(c->db,key);
1074 server.dirty += deleted;
1075 addReplyLongLong(c,deleted);
1076 }
1077
1078 typedef struct {
1079 dict *dict;
1080 double weight;
1081 } zsetopsrc;
1082
1083 int qsortCompareZsetopsrcByCardinality(const void *s1, const void *s2) {
1084 zsetopsrc *d1 = (void*) s1, *d2 = (void*) s2;
1085 unsigned long size1, size2;
1086 size1 = d1->dict ? dictSize(d1->dict) : 0;
1087 size2 = d2->dict ? dictSize(d2->dict) : 0;
1088 return size1 - size2;
1089 }
1090
/* Ways of combining the scores of an element found in several inputs. */
#define REDIS_AGGR_SUM 1
#define REDIS_AGGR_MIN 2
#define REDIS_AGGR_MAX 3
/* Score held by a dict entry; an entry with a NULL value counts as 1.0. */
#define zunionInterDictValue(_e) (dictGetEntryVal(_e) == NULL ? 1.0 : *(double*)dictGetEntryVal(_e))

/* Fold 'val' into '*target' according to 'aggregate' (one of the
 * REDIS_AGGR_* constants). Any other aggregate value triggers a panic. */
inline static void zunionInterAggregate(double *target, double val, int aggregate) {
    switch (aggregate) {
    case REDIS_AGGR_SUM:
        *target += val;
        /* +inf added to -inf yields NaN: by convention the sum is then
         * reported as 0.0. */
        if (isnan(*target)) *target = 0.0;
        break;
    case REDIS_AGGR_MIN:
        if (val < *target) *target = val;
        break;
    case REDIS_AGGR_MAX:
        if (val > *target) *target = val;
        break;
    default:
        /* safety net */
        redisPanic("Unknown ZUNION/INTER aggregate type");
        break;
    }
}
1112
1113 void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
1114 int i, j, setnum;
1115 int aggregate = REDIS_AGGR_SUM;
1116 zsetopsrc *src;
1117 robj *dstobj;
1118 zset *dstzset;
1119 zskiplistNode *znode;
1120 dictIterator *di;
1121 dictEntry *de;
1122 int touched = 0;
1123
1124 /* expect setnum input keys to be given */
1125 setnum = atoi(c->argv[2]->ptr);
1126 if (setnum < 1) {
1127 addReplyError(c,
1128 "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE");
1129 return;
1130 }
1131
1132 /* test if the expected number of keys would overflow */
1133 if (3+setnum > c->argc) {
1134 addReply(c,shared.syntaxerr);
1135 return;
1136 }
1137
1138 /* read keys to be used for input */
1139 src = zmalloc(sizeof(zsetopsrc) * setnum);
1140 for (i = 0, j = 3; i < setnum; i++, j++) {
1141 robj *obj = lookupKeyWrite(c->db,c->argv[j]);
1142 if (!obj) {
1143 src[i].dict = NULL;
1144 } else {
1145 if (obj->type == REDIS_ZSET) {
1146 src[i].dict = ((zset*)obj->ptr)->dict;
1147 } else if (obj->type == REDIS_SET) {
1148 src[i].dict = (obj->ptr);
1149 } else {
1150 zfree(src);
1151 addReply(c,shared.wrongtypeerr);
1152 return;
1153 }
1154 }
1155
1156 /* default all weights to 1 */
1157 src[i].weight = 1.0;
1158 }
1159
1160 /* parse optional extra arguments */
1161 if (j < c->argc) {
1162 int remaining = c->argc - j;
1163
1164 while (remaining) {
1165 if (remaining >= (setnum + 1) && !strcasecmp(c->argv[j]->ptr,"weights")) {
1166 j++; remaining--;
1167 for (i = 0; i < setnum; i++, j++, remaining--) {
1168 if (getDoubleFromObjectOrReply(c,c->argv[j],&src[i].weight,
1169 "weight value is not a double") != REDIS_OK)
1170 {
1171 zfree(src);
1172 return;
1173 }
1174 }
1175 } else if (remaining >= 2 && !strcasecmp(c->argv[j]->ptr,"aggregate")) {
1176 j++; remaining--;
1177 if (!strcasecmp(c->argv[j]->ptr,"sum")) {
1178 aggregate = REDIS_AGGR_SUM;
1179 } else if (!strcasecmp(c->argv[j]->ptr,"min")) {
1180 aggregate = REDIS_AGGR_MIN;
1181 } else if (!strcasecmp(c->argv[j]->ptr,"max")) {
1182 aggregate = REDIS_AGGR_MAX;
1183 } else {
1184 zfree(src);
1185 addReply(c,shared.syntaxerr);
1186 return;
1187 }
1188 j++; remaining--;
1189 } else {
1190 zfree(src);
1191 addReply(c,shared.syntaxerr);
1192 return;
1193 }
1194 }
1195 }
1196
1197 /* sort sets from the smallest to largest, this will improve our
1198 * algorithm's performance */
1199 qsort(src,setnum,sizeof(zsetopsrc),qsortCompareZsetopsrcByCardinality);
1200
1201 dstobj = createZsetObject();
1202 dstzset = dstobj->ptr;
1203
1204 if (op == REDIS_OP_INTER) {
1205 /* skip going over all entries if the smallest zset is NULL or empty */
1206 if (src[0].dict && dictSize(src[0].dict) > 0) {
1207 /* precondition: as src[0].dict is non-empty and the zsets are ordered
1208 * from small to large, all src[i > 0].dict are non-empty too */
1209 di = dictGetIterator(src[0].dict);
1210 while((de = dictNext(di)) != NULL) {
1211 double score, value;
1212
1213 score = src[0].weight * zunionInterDictValue(de);
1214 for (j = 1; j < setnum; j++) {
1215 dictEntry *other = dictFind(src[j].dict,dictGetEntryKey(de));
1216 if (other) {
1217 value = src[j].weight * zunionInterDictValue(other);
1218 zunionInterAggregate(&score,value,aggregate);
1219 } else {
1220 break;
1221 }
1222 }
1223
1224 /* Only continue when present in every source dict. */
1225 if (j == setnum) {
1226 robj *o = dictGetEntryKey(de);
1227 znode = zslInsert(dstzset->zsl,score,o);
1228 incrRefCount(o); /* added to skiplist */
1229 dictAdd(dstzset->dict,o,&znode->score);
1230 incrRefCount(o); /* added to dictionary */
1231 }
1232 }
1233 dictReleaseIterator(di);
1234 }
1235 } else if (op == REDIS_OP_UNION) {
1236 for (i = 0; i < setnum; i++) {
1237 if (!src[i].dict) continue;
1238
1239 di = dictGetIterator(src[i].dict);
1240 while((de = dictNext(di)) != NULL) {
1241 double score, value;
1242
1243 /* skip key when already processed */
1244 if (dictFind(dstzset->dict,dictGetEntryKey(de)) != NULL)
1245 continue;
1246
1247 /* initialize score */
1248 score = src[i].weight * zunionInterDictValue(de);
1249
1250 /* because the zsets are sorted by size, its only possible
1251 * for sets at larger indices to hold this entry */
1252 for (j = (i+1); j < setnum; j++) {
1253 dictEntry *other = dictFind(src[j].dict,dictGetEntryKey(de));
1254 if (other) {
1255 value = src[j].weight * zunionInterDictValue(other);
1256 zunionInterAggregate(&score,value,aggregate);
1257 }
1258 }
1259
1260 robj *o = dictGetEntryKey(de);
1261 znode = zslInsert(dstzset->zsl,score,o);
1262 incrRefCount(o); /* added to skiplist */
1263 dictAdd(dstzset->dict,o,&znode->score);
1264 incrRefCount(o); /* added to dictionary */
1265 }
1266 dictReleaseIterator(di);
1267 }
1268 } else {
1269 /* unknown operator */
1270 redisAssert(op == REDIS_OP_INTER || op == REDIS_OP_UNION);
1271 }
1272
1273 if (dbDelete(c->db,dstkey)) {
1274 signalModifiedKey(c->db,dstkey);
1275 touched = 1;
1276 server.dirty++;
1277 }
1278 if (dstzset->zsl->length) {
1279 dbAdd(c->db,dstkey,dstobj);
1280 addReplyLongLong(c, dstzset->zsl->length);
1281 if (!touched) signalModifiedKey(c->db,dstkey);
1282 server.dirty++;
1283 } else {
1284 decrRefCount(dstobj);
1285 addReply(c, shared.czero);
1286 }
1287 zfree(src);
1288 }
1289
1290 void zunionstoreCommand(redisClient *c) {
1291 zunionInterGenericCommand(c,c->argv[1], REDIS_OP_UNION);
1292 }
1293
1294 void zinterstoreCommand(redisClient *c) {
1295 zunionInterGenericCommand(c,c->argv[1], REDIS_OP_INTER);
1296 }
1297
1298 void zrangeGenericCommand(redisClient *c, int reverse) {
1299 robj *key = c->argv[1];
1300 robj *zobj;
1301 int withscores = 0;
1302 long start;
1303 long end;
1304 int llen;
1305 int rangelen;
1306
1307 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
1308 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
1309
1310 if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
1311 withscores = 1;
1312 } else if (c->argc >= 5) {
1313 addReply(c,shared.syntaxerr);
1314 return;
1315 }
1316
1317 if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL
1318 || checkType(c,zobj,REDIS_ZSET)) return;
1319
1320 /* Sanitize indexes. */
1321 llen = zsLength(zobj);
1322 if (start < 0) start = llen+start;
1323 if (end < 0) end = llen+end;
1324 if (start < 0) start = 0;
1325
1326 /* Invariant: start >= 0, so this test will be true when end < 0.
1327 * The range is empty when start > end or start >= length. */
1328 if (start > end || start >= llen) {
1329 addReply(c,shared.emptymultibulk);
1330 return;
1331 }
1332 if (end >= llen) end = llen-1;
1333 rangelen = (end-start)+1;
1334
1335 /* Return the result in form of a multi-bulk reply */
1336 addReplyMultiBulkLen(c, withscores ? (rangelen*2) : rangelen);
1337
1338 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1339 unsigned char *zl = zobj->ptr;
1340 unsigned char *eptr, *sptr;
1341 unsigned char *vstr;
1342 unsigned int vlen;
1343 long long vlong;
1344
1345 if (reverse)
1346 eptr = ziplistIndex(zl,-2-(2*start));
1347 else
1348 eptr = ziplistIndex(zl,2*start);
1349
1350 redisAssert(eptr != NULL);
1351 sptr = ziplistNext(zl,eptr);
1352
1353 while (rangelen--) {
1354 redisAssert(eptr != NULL && sptr != NULL);
1355 redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
1356 if (vstr == NULL)
1357 addReplyBulkLongLong(c,vlong);
1358 else
1359 addReplyBulkCBuffer(c,vstr,vlen);
1360
1361 if (withscores)
1362 addReplyDouble(c,zzlGetScore(sptr));
1363
1364 if (reverse)
1365 zzlPrev(zl,&eptr,&sptr);
1366 else
1367 zzlNext(zl,&eptr,&sptr);
1368 }
1369
1370 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
1371 zset *zs = zobj->ptr;
1372 zskiplist *zsl = zs->zsl;
1373 zskiplistNode *ln;
1374 robj *ele;
1375
1376 /* Check if starting point is trivial, before doing log(N) lookup. */
1377 if (reverse) {
1378 ln = zsl->tail;
1379 if (start > 0)
1380 ln = zslGetElementByRank(zsl,llen-start);
1381 } else {
1382 ln = zsl->header->level[0].forward;
1383 if (start > 0)
1384 ln = zslGetElementByRank(zsl,start+1);
1385 }
1386
1387 while(rangelen--) {
1388 redisAssert(ln != NULL);
1389 ele = ln->obj;
1390 addReplyBulk(c,ele);
1391 if (withscores)
1392 addReplyDouble(c,ln->score);
1393 ln = reverse ? ln->backward : ln->level[0].forward;
1394 }
1395 } else {
1396 redisPanic("Unknown sorted set encoding");
1397 }
1398 }
1399
1400 void zrangeCommand(redisClient *c) {
1401 zrangeGenericCommand(c,0);
1402 }
1403
1404 void zrevrangeCommand(redisClient *c) {
1405 zrangeGenericCommand(c,1);
1406 }
1407
1408 /* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE and ZCOUNT.
1409 * If "justcount", only the number of elements in the range is returned. */
1410 void genericZrangebyscoreCommand(redisClient *c, int reverse, int justcount) {
1411 zrangespec range;
1412 robj *key = c->argv[1];
1413 robj *emptyreply, *zobj;
1414 int offset = 0, limit = -1;
1415 int withscores = 0;
1416 unsigned long rangelen = 0;
1417 void *replylen = NULL;
1418 int minidx, maxidx;
1419
1420 /* Parse the range arguments. */
1421 if (reverse) {
1422 /* Range is given as [max,min] */
1423 maxidx = 2; minidx = 3;
1424 } else {
1425 /* Range is given as [min,max] */
1426 minidx = 2; maxidx = 3;
1427 }
1428
1429 if (zslParseRange(c->argv[minidx],c->argv[maxidx],&range) != REDIS_OK) {
1430 addReplyError(c,"min or max is not a double");
1431 return;
1432 }
1433
1434 /* Parse optional extra arguments. Note that ZCOUNT will exactly have
1435 * 4 arguments, so we'll never enter the following code path. */
1436 if (c->argc > 4) {
1437 int remaining = c->argc - 4;
1438 int pos = 4;
1439
1440 while (remaining) {
1441 if (remaining >= 1 && !strcasecmp(c->argv[pos]->ptr,"withscores")) {
1442 pos++; remaining--;
1443 withscores = 1;
1444 } else if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr,"limit")) {
1445 offset = atoi(c->argv[pos+1]->ptr);
1446 limit = atoi(c->argv[pos+2]->ptr);
1447 pos += 3; remaining -= 3;
1448 } else {
1449 addReply(c,shared.syntaxerr);
1450 return;
1451 }
1452 }
1453 }
1454
1455 /* Ok, lookup the key and get the range */
1456 emptyreply = justcount ? shared.czero : shared.emptymultibulk;
1457 if ((zobj = lookupKeyReadOrReply(c,key,emptyreply)) == NULL ||
1458 checkType(c,zobj,REDIS_ZSET)) return;
1459
1460 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1461 unsigned char *zl = zobj->ptr;
1462 unsigned char *eptr, *sptr;
1463 unsigned char *vstr;
1464 unsigned int vlen;
1465 long long vlong;
1466 double score;
1467
1468 /* If reversed, get the last node in range as starting point. */
1469 if (reverse)
1470 eptr = zzlLastInRange(zobj,range);
1471 else
1472 eptr = zzlFirstInRange(zobj,range);
1473
1474 /* No "first" element in the specified interval. */
1475 if (eptr == NULL) {
1476 addReply(c,emptyreply);
1477 return;
1478 }
1479
1480 /* Get score pointer for the first element. */
1481 redisAssert(eptr != NULL);
1482 sptr = ziplistNext(zl,eptr);
1483
1484 /* We don't know in advance how many matching elements there are in the
1485 * list, so we push this object that will represent the multi-bulk
1486 * length in the output buffer, and will "fix" it later */
1487 if (!justcount)
1488 replylen = addDeferredMultiBulkLength(c);
1489
1490 /* If there is an offset, just traverse the number of elements without
1491 * checking the score because that is done in the next loop. */
1492 while (eptr && offset--)
1493 if (reverse)
1494 zzlPrev(zl,&eptr,&sptr);
1495 else
1496 zzlNext(zl,&eptr,&sptr);
1497
1498 while (eptr && limit--) {
1499 score = zzlGetScore(sptr);
1500
1501 /* Abort when the node is no longer in range. */
1502 if (reverse) {
1503 if (!zslValueGteMin(score,&range)) break;
1504 } else {
1505 if (!zslValueLteMax(score,&range)) break;
1506 }
1507
1508 /* Do our magic */
1509 rangelen++;
1510 if (!justcount) {
1511 redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
1512 if (vstr == NULL)
1513 addReplyBulkLongLong(c,vlong);
1514 else
1515 addReplyBulkCBuffer(c,vstr,vlen);
1516
1517 if (withscores)
1518 addReplyDouble(c,score);
1519 }
1520
1521 /* Move to next node */
1522 if (reverse)
1523 zzlPrev(zl,&eptr,&sptr);
1524 else
1525 zzlNext(zl,&eptr,&sptr);
1526 }
1527 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
1528 zset *zs = zobj->ptr;
1529 zskiplist *zsl = zs->zsl;
1530 zskiplistNode *ln;
1531
1532 /* If reversed, get the last node in range as starting point. */
1533 if (reverse)
1534 ln = zslLastInRange(zsl,range);
1535 else
1536 ln = zslFirstInRange(zsl,range);
1537
1538 /* No "first" element in the specified interval. */
1539 if (ln == NULL) {
1540 addReply(c,emptyreply);
1541 return;
1542 }
1543
1544 /* We don't know in advance how many matching elements there are in the
1545 * list, so we push this object that will represent the multi-bulk
1546 * length in the output buffer, and will "fix" it later */
1547 if (!justcount)
1548 replylen = addDeferredMultiBulkLength(c);
1549
1550 /* If there is an offset, just traverse the number of elements without
1551 * checking the score because that is done in the next loop. */
1552 while (ln && offset--)
1553 ln = reverse ? ln->backward : ln->level[0].forward;
1554
1555 while (ln && limit--) {
1556 /* Abort when the node is no longer in range. */
1557 if (reverse) {
1558 if (!zslValueGteMin(ln->score,&range)) break;
1559 } else {
1560 if (!zslValueLteMax(ln->score,&range)) break;
1561 }
1562
1563 /* Do our magic */
1564 rangelen++;
1565 if (!justcount) {
1566 addReplyBulk(c,ln->obj);
1567 if (withscores)
1568 addReplyDouble(c,ln->score);
1569 }
1570
1571 /* Move to next node */
1572 ln = reverse ? ln->backward : ln->level[0].forward;
1573 }
1574 } else {
1575 redisPanic("Unknown sorted set encoding");
1576 }
1577
1578 if (justcount) {
1579 addReplyLongLong(c,(long)rangelen);
1580 } else {
1581 if (withscores) rangelen *= 2;
1582 setDeferredMultiBulkLength(c,replylen,rangelen);
1583 }
1584 }
1585
1586 void zrangebyscoreCommand(redisClient *c) {
1587 genericZrangebyscoreCommand(c,0,0);
1588 }
1589
1590 void zrevrangebyscoreCommand(redisClient *c) {
1591 genericZrangebyscoreCommand(c,1,0);
1592 }
1593
1594 void zcountCommand(redisClient *c) {
1595 genericZrangebyscoreCommand(c,0,1);
1596 }
1597
1598 void zcardCommand(redisClient *c) {
1599 robj *key = c->argv[1];
1600 robj *zobj;
1601
1602 if ((zobj = lookupKeyReadOrReply(c,key,shared.czero)) == NULL ||
1603 checkType(c,zobj,REDIS_ZSET)) return;
1604
1605 addReplyLongLong(c,zzlLength(zobj));
1606 }
1607
1608 void zscoreCommand(redisClient *c) {
1609 robj *key = c->argv[1];
1610 robj *zobj;
1611 double score;
1612
1613 if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
1614 checkType(c,zobj,REDIS_ZSET)) return;
1615
1616 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1617 if (zzlFind(zobj,c->argv[2],&score) != NULL)
1618 addReplyDouble(c,score);
1619 else
1620 addReply(c,shared.nullbulk);
1621 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
1622 zset *zs = zobj->ptr;
1623 dictEntry *de;
1624
1625 c->argv[2] = tryObjectEncoding(c->argv[2]);
1626 de = dictFind(zs->dict,c->argv[2]);
1627 if (de != NULL) {
1628 score = *(double*)dictGetEntryVal(de);
1629 addReplyDouble(c,score);
1630 } else {
1631 addReply(c,shared.nullbulk);
1632 }
1633 } else {
1634 redisPanic("Unknown sorted set encoding");
1635 }
1636 }
1637
1638 void zrankGenericCommand(redisClient *c, int reverse) {
1639 robj *key = c->argv[1];
1640 robj *ele = c->argv[2];
1641 robj *zobj;
1642 unsigned long llen;
1643 unsigned long rank;
1644
1645 if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
1646 checkType(c,zobj,REDIS_ZSET)) return;
1647 llen = zsLength(zobj);
1648
1649 redisAssert(ele->encoding == REDIS_ENCODING_RAW);
1650 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1651 unsigned char *zl = zobj->ptr;
1652 unsigned char *eptr, *sptr;
1653
1654 eptr = ziplistIndex(zl,0);
1655 redisAssert(eptr != NULL);
1656 sptr = ziplistNext(zl,eptr);
1657 redisAssert(sptr != NULL);
1658
1659 rank = 1;
1660 while(eptr != NULL) {
1661 if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr)))
1662 break;
1663 rank++;
1664 zzlNext(zl,&eptr,&sptr);
1665 }
1666
1667 if (eptr != NULL) {
1668 if (reverse)
1669 addReplyLongLong(c,llen-rank);
1670 else
1671 addReplyLongLong(c,rank-1);
1672 } else {
1673 addReply(c,shared.nullbulk);
1674 }
1675 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
1676 zset *zs = zobj->ptr;
1677 zskiplist *zsl = zs->zsl;
1678 dictEntry *de;
1679 double score;
1680
1681 ele = c->argv[2] = tryObjectEncoding(c->argv[2]);
1682 de = dictFind(zs->dict,ele);
1683 if (de != NULL) {
1684 score = *(double*)dictGetEntryVal(de);
1685 rank = zslGetRank(zsl,score,ele);
1686 redisAssert(rank); /* Existing elements always have a rank. */
1687 if (reverse)
1688 addReplyLongLong(c,llen-rank);
1689 else
1690 addReplyLongLong(c,rank-1);
1691 } else {
1692 addReply(c,shared.nullbulk);
1693 }
1694 } else {
1695 redisPanic("Unknown sorted set encoding");
1696 }
1697 }
1698
1699 void zrankCommand(redisClient *c) {
1700 zrankGenericCommand(c, 0);
1701 }
1702
1703 void zrevrankCommand(redisClient *c) {
1704 zrankGenericCommand(c, 1);
1705 }