]> git.saurik.com Git - redis.git/blame - src/t_zset.c
Encode sorted set after loading from dump
[redis.git] / src / t_zset.c
CommitLineData
e2641e09 1#include "redis.h"
2
3#include <math.h>
4
5/*-----------------------------------------------------------------------------
6 * Sorted set API
7 *----------------------------------------------------------------------------*/
8
9/* ZSETs are ordered sets using two data structures to hold the same elements
10 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
11 * data structure.
12 *
13 * The elements are added to a hash table mapping Redis objects to scores.
14 * At the same time the elements are added to a skip list mapping scores
15 * to Redis objects (so objects are sorted by scores in this "view"). */
16
17/* This skiplist implementation is almost a C translation of the original
18 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
19 * Alternative to Balanced Trees", modified in three ways:
20 * a) this implementation allows for repeated values.
21 * b) the comparison is not just by key (our 'score') but by satellite data.
22 * c) there is a back pointer, so it's a doubly linked list with the back
23 * pointers being only at "level 1". This allows to traverse the list
24 * from tail to head, useful for ZREVRANGE. */
25
26zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
2159782b 27 zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
e2641e09 28 zn->score = score;
29 zn->obj = obj;
30 return zn;
31}
32
33zskiplist *zslCreate(void) {
34 int j;
35 zskiplist *zsl;
36
37 zsl = zmalloc(sizeof(*zsl));
38 zsl->level = 1;
39 zsl->length = 0;
40 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
41 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
2159782b
PN
42 zsl->header->level[j].forward = NULL;
43 zsl->header->level[j].span = 0;
e2641e09 44 }
45 zsl->header->backward = NULL;
46 zsl->tail = NULL;
47 return zsl;
48}
49
50void zslFreeNode(zskiplistNode *node) {
51 decrRefCount(node->obj);
e2641e09 52 zfree(node);
53}
54
55void zslFree(zskiplist *zsl) {
2159782b 56 zskiplistNode *node = zsl->header->level[0].forward, *next;
e2641e09 57
e2641e09 58 zfree(zsl->header);
59 while(node) {
2159782b 60 next = node->level[0].forward;
e2641e09 61 zslFreeNode(node);
62 node = next;
63 }
64 zfree(zsl);
65}
66
67int zslRandomLevel(void) {
68 int level = 1;
69 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
70 level += 1;
71 return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
72}
73
69ef89f2 74zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
e2641e09 75 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
76 unsigned int rank[ZSKIPLIST_MAXLEVEL];
77 int i, level;
78
79 x = zsl->header;
80 for (i = zsl->level-1; i >= 0; i--) {
81 /* store rank that is crossed to reach the insert position */
82 rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
2159782b
PN
83 while (x->level[i].forward &&
84 (x->level[i].forward->score < score ||
85 (x->level[i].forward->score == score &&
86 compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
87 rank[i] += x->level[i].span;
88 x = x->level[i].forward;
e2641e09 89 }
90 update[i] = x;
91 }
92 /* we assume the key is not already inside, since we allow duplicated
93 * scores, and the re-insertion of score and redis object should never
94 * happpen since the caller of zslInsert() should test in the hash table
95 * if the element is already inside or not. */
96 level = zslRandomLevel();
97 if (level > zsl->level) {
98 for (i = zsl->level; i < level; i++) {
99 rank[i] = 0;
100 update[i] = zsl->header;
2159782b 101 update[i]->level[i].span = zsl->length;
e2641e09 102 }
103 zsl->level = level;
104 }
105 x = zslCreateNode(level,score,obj);
106 for (i = 0; i < level; i++) {
2159782b
PN
107 x->level[i].forward = update[i]->level[i].forward;
108 update[i]->level[i].forward = x;
e2641e09 109
110 /* update span covered by update[i] as x is inserted here */
2159782b
PN
111 x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
112 update[i]->level[i].span = (rank[0] - rank[i]) + 1;
e2641e09 113 }
114
115 /* increment span for untouched levels */
116 for (i = level; i < zsl->level; i++) {
2159782b 117 update[i]->level[i].span++;
e2641e09 118 }
119
120 x->backward = (update[0] == zsl->header) ? NULL : update[0];
2159782b
PN
121 if (x->level[0].forward)
122 x->level[0].forward->backward = x;
e2641e09 123 else
124 zsl->tail = x;
125 zsl->length++;
69ef89f2 126 return x;
e2641e09 127}
128
129/* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
130void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
131 int i;
132 for (i = 0; i < zsl->level; i++) {
2159782b
PN
133 if (update[i]->level[i].forward == x) {
134 update[i]->level[i].span += x->level[i].span - 1;
135 update[i]->level[i].forward = x->level[i].forward;
e2641e09 136 } else {
2159782b 137 update[i]->level[i].span -= 1;
e2641e09 138 }
139 }
2159782b
PN
140 if (x->level[0].forward) {
141 x->level[0].forward->backward = x->backward;
e2641e09 142 } else {
143 zsl->tail = x->backward;
144 }
2159782b 145 while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
e2641e09 146 zsl->level--;
147 zsl->length--;
148}
149
150/* Delete an element with matching score/object from the skiplist. */
151int zslDelete(zskiplist *zsl, double score, robj *obj) {
152 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
153 int i;
154
155 x = zsl->header;
156 for (i = zsl->level-1; i >= 0; i--) {
2159782b
PN
157 while (x->level[i].forward &&
158 (x->level[i].forward->score < score ||
159 (x->level[i].forward->score == score &&
160 compareStringObjects(x->level[i].forward->obj,obj) < 0)))
161 x = x->level[i].forward;
e2641e09 162 update[i] = x;
163 }
164 /* We may have multiple elements with the same score, what we need
165 * is to find the element with both the right score and object. */
2159782b 166 x = x->level[0].forward;
e2641e09 167 if (x && score == x->score && equalStringObjects(x->obj,obj)) {
168 zslDeleteNode(zsl, x, update);
169 zslFreeNode(x);
170 return 1;
171 } else {
172 return 0; /* not found */
173 }
174 return 0; /* not found */
175}
176
91504b6c
PN
/* Score interval whose endpoints can independently be inclusive or
 * exclusive. */
typedef struct {
    double min;   /* lower bound */
    double max;   /* upper bound */
    int minex;    /* non-zero when 'min' itself is excluded */
    int maxex;    /* non-zero when 'max' itself is excluded */
} zrangespec;
182
45290ad9 183static int zslValueGteMin(double value, zrangespec *spec) {
22b9bf15
PN
184 return spec->minex ? (value > spec->min) : (value >= spec->min);
185}
186
45290ad9 187static int zslValueLteMax(double value, zrangespec *spec) {
22b9bf15
PN
188 return spec->maxex ? (value < spec->max) : (value <= spec->max);
189}
190
df278b8b 191static int zslValueInRange(double value, zrangespec *spec) {
45290ad9 192 return zslValueGteMin(value,spec) && zslValueLteMax(value,spec);
22b9bf15
PN
193}
194
195/* Returns if there is a part of the zset is in range. */
196int zslIsInRange(zskiplist *zsl, zrangespec *range) {
197 zskiplistNode *x;
198
8e1b3277
PN
199 /* Test for ranges that will always be empty. */
200 if (range->min > range->max ||
201 (range->min == range->max && (range->minex || range->maxex)))
202 return 0;
22b9bf15 203 x = zsl->tail;
45290ad9 204 if (x == NULL || !zslValueGteMin(x->score,range))
22b9bf15
PN
205 return 0;
206 x = zsl->header->level[0].forward;
45290ad9 207 if (x == NULL || !zslValueLteMax(x->score,range))
22b9bf15
PN
208 return 0;
209 return 1;
210}
211
212/* Find the first node that is contained in the specified range.
213 * Returns NULL when no element is contained in the range. */
214zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec range) {
215 zskiplistNode *x;
216 int i;
217
218 /* If everything is out of range, return early. */
219 if (!zslIsInRange(zsl,&range)) return NULL;
220
221 x = zsl->header;
222 for (i = zsl->level-1; i >= 0; i--) {
223 /* Go forward while *OUT* of range. */
224 while (x->level[i].forward &&
45290ad9 225 !zslValueGteMin(x->level[i].forward->score,&range))
22b9bf15
PN
226 x = x->level[i].forward;
227 }
228
229 /* The tail is in range, so the previous block should always return a
230 * node that is non-NULL and the last one to be out of range. */
231 x = x->level[0].forward;
232 redisAssert(x != NULL && zslValueInRange(x->score,&range));
233 return x;
234}
235
236/* Find the last node that is contained in the specified range.
237 * Returns NULL when no element is contained in the range. */
238zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec range) {
239 zskiplistNode *x;
240 int i;
241
242 /* If everything is out of range, return early. */
243 if (!zslIsInRange(zsl,&range)) return NULL;
244
245 x = zsl->header;
246 for (i = zsl->level-1; i >= 0; i--) {
247 /* Go forward while *IN* range. */
248 while (x->level[i].forward &&
45290ad9 249 zslValueLteMax(x->level[i].forward->score,&range))
22b9bf15
PN
250 x = x->level[i].forward;
251 }
252
253 /* The header is in range, so the previous block should always return a
254 * node that is non-NULL and in range. */
255 redisAssert(x != NULL && zslValueInRange(x->score,&range));
256 return x;
257}
258
e2641e09 259/* Delete all the elements with score between min and max from the skiplist.
260 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
261 * Note that this function takes the reference to the hash table view of the
262 * sorted set, in order to remove the elements from the hash table too. */
91504b6c 263unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec range, dict *dict) {
e2641e09 264 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
265 unsigned long removed = 0;
266 int i;
267
268 x = zsl->header;
269 for (i = zsl->level-1; i >= 0; i--) {
91504b6c
PN
270 while (x->level[i].forward && (range.minex ?
271 x->level[i].forward->score <= range.min :
272 x->level[i].forward->score < range.min))
273 x = x->level[i].forward;
e2641e09 274 update[i] = x;
275 }
91504b6c
PN
276
277 /* Current node is the last with score < or <= min. */
2159782b 278 x = x->level[0].forward;
91504b6c
PN
279
280 /* Delete nodes while in range. */
281 while (x && (range.maxex ? x->score < range.max : x->score <= range.max)) {
2159782b 282 zskiplistNode *next = x->level[0].forward;
69ef89f2 283 zslDeleteNode(zsl,x,update);
e2641e09 284 dictDelete(dict,x->obj);
285 zslFreeNode(x);
286 removed++;
287 x = next;
288 }
91504b6c 289 return removed;
e2641e09 290}
291
292/* Delete all the elements with rank between start and end from the skiplist.
293 * Start and end are inclusive. Note that start and end need to be 1-based */
294unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict) {
295 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
296 unsigned long traversed = 0, removed = 0;
297 int i;
298
299 x = zsl->header;
300 for (i = zsl->level-1; i >= 0; i--) {
2159782b
PN
301 while (x->level[i].forward && (traversed + x->level[i].span) < start) {
302 traversed += x->level[i].span;
303 x = x->level[i].forward;
e2641e09 304 }
305 update[i] = x;
306 }
307
308 traversed++;
2159782b 309 x = x->level[0].forward;
e2641e09 310 while (x && traversed <= end) {
2159782b 311 zskiplistNode *next = x->level[0].forward;
69ef89f2 312 zslDeleteNode(zsl,x,update);
e2641e09 313 dictDelete(dict,x->obj);
314 zslFreeNode(x);
315 removed++;
316 traversed++;
317 x = next;
318 }
319 return removed;
320}
321
e2641e09 322/* Find the rank for an element by both score and key.
323 * Returns 0 when the element cannot be found, rank otherwise.
324 * Note that the rank is 1-based due to the span of zsl->header to the
325 * first element. */
a3004773 326unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) {
e2641e09 327 zskiplistNode *x;
328 unsigned long rank = 0;
329 int i;
330
331 x = zsl->header;
332 for (i = zsl->level-1; i >= 0; i--) {
2159782b
PN
333 while (x->level[i].forward &&
334 (x->level[i].forward->score < score ||
335 (x->level[i].forward->score == score &&
336 compareStringObjects(x->level[i].forward->obj,o) <= 0))) {
337 rank += x->level[i].span;
338 x = x->level[i].forward;
e2641e09 339 }
340
341 /* x might be equal to zsl->header, so test if obj is non-NULL */
342 if (x->obj && equalStringObjects(x->obj,o)) {
343 return rank;
344 }
345 }
346 return 0;
347}
348
349/* Finds an element by its rank. The rank argument needs to be 1-based. */
a3004773 350zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
e2641e09 351 zskiplistNode *x;
352 unsigned long traversed = 0;
353 int i;
354
355 x = zsl->header;
356 for (i = zsl->level-1; i >= 0; i--) {
2159782b 357 while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
e2641e09 358 {
2159782b
PN
359 traversed += x->level[i].span;
360 x = x->level[i].forward;
e2641e09 361 }
362 if (traversed == rank) {
363 return x;
364 }
365 }
366 return NULL;
367}
368
25bb8a44 369/* Populate the rangespec according to the objects min and max. */
7236fdb2
PN
370static int zslParseRange(robj *min, robj *max, zrangespec *spec) {
371 char *eptr;
25bb8a44
PN
372 spec->minex = spec->maxex = 0;
373
374 /* Parse the min-max interval. If one of the values is prefixed
375 * by the "(" character, it's considered "open". For instance
376 * ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max
377 * ZRANGEBYSCORE zset 1.5 2.5 will instead match min <= x <= max */
378 if (min->encoding == REDIS_ENCODING_INT) {
379 spec->min = (long)min->ptr;
380 } else {
381 if (((char*)min->ptr)[0] == '(') {
7236fdb2
PN
382 spec->min = strtod((char*)min->ptr+1,&eptr);
383 if (eptr[0] != '\0' || isnan(spec->min)) return REDIS_ERR;
25bb8a44
PN
384 spec->minex = 1;
385 } else {
7236fdb2
PN
386 spec->min = strtod((char*)min->ptr,&eptr);
387 if (eptr[0] != '\0' || isnan(spec->min)) return REDIS_ERR;
25bb8a44
PN
388 }
389 }
390 if (max->encoding == REDIS_ENCODING_INT) {
391 spec->max = (long)max->ptr;
392 } else {
393 if (((char*)max->ptr)[0] == '(') {
7236fdb2
PN
394 spec->max = strtod((char*)max->ptr+1,&eptr);
395 if (eptr[0] != '\0' || isnan(spec->max)) return REDIS_ERR;
25bb8a44
PN
396 spec->maxex = 1;
397 } else {
7236fdb2
PN
398 spec->max = strtod((char*)max->ptr,&eptr);
399 if (eptr[0] != '\0' || isnan(spec->max)) return REDIS_ERR;
25bb8a44
PN
400 }
401 }
402
403 return REDIS_OK;
404}
405
21c5b508
PN
406/*-----------------------------------------------------------------------------
407 * Ziplist-backed sorted set API
408 *----------------------------------------------------------------------------*/
409
/* Return the score stored in the ziplist entry pointed to by 'sptr'.
 *
 * The entry either holds a string, which is copied to a local buffer,
 * null terminated and parsed with strtod(), or an integer, which is
 * converted to double directly. 'sptr' must not be NULL. */
double zzlGetScore(unsigned char *sptr) {
    unsigned char *vstr;
    unsigned int vlen;
    long long vlong;
    char buf[128];
    double score;

    redisAssert(sptr != NULL);
    redisAssert(ziplistGet(sptr,&vstr,&vlen,&vlong));

    if (vstr) {
        /* The copy below also needs room for the terminator: refuse
         * entries that would overflow the stack buffer. */
        redisAssert(vlen < sizeof(buf));
        memcpy(buf,vstr,vlen);
        buf[vlen] = '\0';
        score = strtod(buf,NULL);
    } else {
        score = vlong;
    }

    return score;
}
430
/* Compare the element stored at 'eptr' with the string 'cstr' of length
 * 'clen'.
 *
 * An integer entry is first converted to its string representation. The
 * common prefix is compared with memcmp(); when it is equal the difference
 * between the two lengths decides. The result is negative, zero or positive
 * like memcmp(). */
int zzlCompareElements(unsigned char *eptr, unsigned char *cstr, unsigned int clen) {
    unsigned char *vstr;
    unsigned int vlen;
    long long vlong;
    unsigned char vbuf[32];
    int common, cmp;

    redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
    if (vstr == NULL) {
        /* Store string representation of long long in vbuf. */
        vlen = ll2string((char*)vbuf,sizeof(vbuf),vlong);
        vstr = vbuf;
    }

    common = (clen > vlen) ? vlen : clen;
    cmp = memcmp(vstr,cstr,common);
    return (cmp != 0) ? cmp : (int)(vlen-clen);
}
451
/* Number of (element,score) pairs in the ziplist: every pair takes two
 * ziplist entries. */
unsigned int zzlLength(unsigned char *zl) {
    unsigned int entries = ziplistLen(zl);

    return entries/2;
}
455
4c5f0966
PN
456/* Move to next entry based on the values in eptr and sptr. Both are set to
457 * NULL when there is no next entry. */
458void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
459 unsigned char *_eptr, *_sptr;
460 redisAssert(*eptr != NULL && *sptr != NULL);
461
462 _eptr = ziplistNext(zl,*sptr);
463 if (_eptr != NULL) {
464 _sptr = ziplistNext(zl,_eptr);
465 redisAssert(_sptr != NULL);
466 } else {
467 /* No next entry. */
468 _sptr = NULL;
469 }
470
471 *eptr = _eptr;
472 *sptr = _sptr;
473}
474
475/* Move to the previous entry based on the values in eptr and sptr. Both are
476 * set to NULL when there is no next entry. */
477void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
478 unsigned char *_eptr, *_sptr;
479 redisAssert(*eptr != NULL && *sptr != NULL);
480
481 _sptr = ziplistPrev(zl,*eptr);
482 if (_sptr != NULL) {
483 _eptr = ziplistPrev(zl,_sptr);
484 redisAssert(_eptr != NULL);
485 } else {
486 /* No previous entry. */
487 _eptr = NULL;
488 }
489
490 *eptr = _eptr;
491 *sptr = _sptr;
492}
493
4a14dbba
PN
494/* Returns if there is a part of the zset is in range. Should only be used
495 * internally by zzlFirstInRange and zzlLastInRange. */
496int zzlIsInRange(unsigned char *zl, zrangespec *range) {
497 unsigned char *p;
498 double score;
499
500 /* Test for ranges that will always be empty. */
501 if (range->min > range->max ||
502 (range->min == range->max && (range->minex || range->maxex)))
503 return 0;
504
505 p = ziplistIndex(zl,-1); /* Last score. */
506 redisAssert(p != NULL);
507 score = zzlGetScore(p);
508 if (!zslValueGteMin(score,range))
509 return 0;
510
511 p = ziplistIndex(zl,1); /* First score. */
512 redisAssert(p != NULL);
513 score = zzlGetScore(p);
514 if (!zslValueLteMax(score,range))
515 return 0;
516
517 return 1;
518}
519
520/* Find pointer to the first element contained in the specified range.
521 * Returns NULL when no element is contained in the range. */
522unsigned char *zzlFirstInRange(robj *zobj, zrangespec range) {
523 unsigned char *zl = zobj->ptr;
524 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
525 double score;
526
527 /* If everything is out of range, return early. */
528 if (!zzlIsInRange(zl,&range)) return NULL;
529
530 while (eptr != NULL) {
531 sptr = ziplistNext(zl,eptr);
532 redisAssert(sptr != NULL);
533
534 score = zzlGetScore(sptr);
535 if (zslValueGteMin(score,&range))
536 return eptr;
537
538 /* Move to next element. */
539 eptr = ziplistNext(zl,sptr);
540 }
541
542 return NULL;
543}
544
545/* Find pointer to the last element contained in the specified range.
546 * Returns NULL when no element is contained in the range. */
547unsigned char *zzlLastInRange(robj *zobj, zrangespec range) {
548 unsigned char *zl = zobj->ptr;
549 unsigned char *eptr = ziplistIndex(zl,-2), *sptr;
550 double score;
551
552 /* If everything is out of range, return early. */
553 if (!zzlIsInRange(zl,&range)) return NULL;
554
555 while (eptr != NULL) {
556 sptr = ziplistNext(zl,eptr);
557 redisAssert(sptr != NULL);
558
559 score = zzlGetScore(sptr);
560 if (zslValueLteMax(score,&range))
561 return eptr;
562
563 /* Move to previous element by moving to the score of previous element.
564 * When this returns NULL, we know there also is no element. */
565 sptr = ziplistPrev(zl,eptr);
566 if (sptr != NULL)
567 redisAssert((eptr = ziplistPrev(zl,sptr)) != NULL);
568 else
569 eptr = NULL;
570 }
571
572 return NULL;
573}
574
21c5b508
PN
575unsigned char *zzlFind(robj *zobj, robj *ele, double *score) {
576 unsigned char *zl = zobj->ptr;
577 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
578
579 ele = getDecodedObject(ele);
580 while (eptr != NULL) {
581 sptr = ziplistNext(zl,eptr);
582 redisAssert(sptr != NULL);
583
584 if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr))) {
585 /* Matching element, pull out score. */
0b10e104 586 if (score != NULL) *score = zzlGetScore(sptr);
21c5b508
PN
587 decrRefCount(ele);
588 return eptr;
589 }
590
591 /* Move to next element. */
592 eptr = ziplistNext(zl,sptr);
593 }
594
595 decrRefCount(ele);
596 return NULL;
597}
598
599/* Delete (element,score) pair from ziplist. Use local copy of eptr because we
600 * don't want to modify the one given as argument. */
601int zzlDelete(robj *zobj, unsigned char *eptr) {
602 unsigned char *zl = zobj->ptr;
603 unsigned char *p = eptr;
604
605 /* TODO: add function to ziplist API to delete N elements from offset. */
606 zl = ziplistDelete(zl,&p);
607 zl = ziplistDelete(zl,&p);
608 zobj->ptr = zl;
609 return REDIS_OK;
610}
611
612int zzlInsertAt(robj *zobj, robj *ele, double score, unsigned char *eptr) {
613 unsigned char *zl = zobj->ptr;
614 unsigned char *sptr;
615 char scorebuf[128];
616 int scorelen;
617 int offset;
618
619 redisAssert(ele->encoding == REDIS_ENCODING_RAW);
620 scorelen = d2string(scorebuf,sizeof(scorebuf),score);
621 if (eptr == NULL) {
622 zl = ziplistPush(zl,ele->ptr,sdslen(ele->ptr),ZIPLIST_TAIL);
623 zl = ziplistPush(zl,(unsigned char*)scorebuf,scorelen,ZIPLIST_TAIL);
624 } else {
625 /* Keep offset relative to zl, as it might be re-allocated. */
626 offset = eptr-zl;
627 zl = ziplistInsert(zl,eptr,ele->ptr,sdslen(ele->ptr));
628 eptr = zl+offset;
629
630 /* Insert score after the element. */
631 redisAssert((sptr = ziplistNext(zl,eptr)) != NULL);
632 zl = ziplistInsert(zl,sptr,(unsigned char*)scorebuf,scorelen);
633 }
634
635 zobj->ptr = zl;
636 return REDIS_OK;
637}
638
639/* Insert (element,score) pair in ziplist. This function assumes the element is
640 * not yet present in the list. */
641int zzlInsert(robj *zobj, robj *ele, double score) {
642 unsigned char *zl = zobj->ptr;
643 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
644 double s;
21c5b508
PN
645
646 ele = getDecodedObject(ele);
647 while (eptr != NULL) {
648 sptr = ziplistNext(zl,eptr);
649 redisAssert(sptr != NULL);
650 s = zzlGetScore(sptr);
651
652 if (s > score) {
653 /* First element with score larger than score for element to be
654 * inserted. This means we should take its spot in the list to
655 * maintain ordering. */
21c5b508
PN
656 zzlInsertAt(zobj,ele,score,eptr);
657 break;
8218db3d
PN
658 } else if (s == score) {
659 /* Ensure lexicographical ordering for elements. */
d1c920c5 660 if (zzlCompareElements(eptr,ele->ptr,sdslen(ele->ptr)) > 0) {
8218db3d
PN
661 zzlInsertAt(zobj,ele,score,eptr);
662 break;
663 }
21c5b508
PN
664 }
665
666 /* Move to next element. */
667 eptr = ziplistNext(zl,sptr);
668 }
669
670 /* Push on tail of list when it was not yet inserted. */
8218db3d
PN
671 if (eptr == NULL)
672 zzlInsertAt(zobj,ele,score,NULL);
21c5b508
PN
673
674 decrRefCount(ele);
675 return REDIS_OK;
676}
25bb8a44 677
4a14dbba
PN
678unsigned long zzlDeleteRangeByScore(robj *zobj, zrangespec range) {
679 unsigned char *zl = zobj->ptr;
680 unsigned char *eptr, *sptr;
681 double score;
682 unsigned long deleted = 0;
683
684 eptr = zzlFirstInRange(zobj,range);
685 if (eptr == NULL) return deleted;
686
687
688 /* When the tail of the ziplist is deleted, eptr will point to the sentinel
689 * byte and ziplistNext will return NULL. */
690 while ((sptr = ziplistNext(zl,eptr)) != NULL) {
691 score = zzlGetScore(sptr);
692 if (zslValueLteMax(score,&range)) {
693 /* Delete both the element and the score. */
694 zl = ziplistDelete(zl,&eptr);
695 zl = ziplistDelete(zl,&eptr);
696 deleted++;
697 } else {
698 /* No longer in range. */
699 break;
700 }
701 }
702
703 return deleted;
704}
705
63b7b7fb
PN
706/* Delete all the elements with rank between start and end from the skiplist.
707 * Start and end are inclusive. Note that start and end need to be 1-based */
708unsigned long zzlDeleteRangeByRank(robj *zobj, unsigned int start, unsigned int end) {
709 unsigned int num = (end-start)+1;
710 zobj->ptr = ziplistDeleteRange(zobj->ptr,2*(start-1),2*num);
711 return num;
712}
713
5d1b4fb6
PN
714/*-----------------------------------------------------------------------------
715 * Common sorted set API
716 *----------------------------------------------------------------------------*/
717
df26a0ae 718unsigned int zsetLength(robj *zobj) {
5d1b4fb6
PN
719 int length = -1;
720 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
bbfe232f 721 length = zzlLength(zobj->ptr);
5d1b4fb6
PN
722 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
723 length = ((zset*)zobj->ptr)->zsl->length;
724 } else {
725 redisPanic("Unknown sorted set encoding");
726 }
727 return length;
728}
729
df26a0ae 730void zsetConvert(robj *zobj, int encoding) {
a669d5e9
PN
731 zset *zs;
732 zskiplistNode *node, *next;
733 robj *ele;
734 double score;
735
736 if (zobj->encoding == encoding) return;
737 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
738 unsigned char *zl = zobj->ptr;
739 unsigned char *eptr, *sptr;
740 unsigned char *vstr;
741 unsigned int vlen;
742 long long vlong;
743
744 if (encoding != REDIS_ENCODING_RAW)
745 redisPanic("Unknown target encoding");
746
747 zs = zmalloc(sizeof(*zs));
748 zs->dict = dictCreate(&zsetDictType,NULL);
749 zs->zsl = zslCreate();
750
751 eptr = ziplistIndex(zl,0);
752 redisAssert(eptr != NULL);
753 sptr = ziplistNext(zl,eptr);
754 redisAssert(sptr != NULL);
755
756 while (eptr != NULL) {
757 score = zzlGetScore(sptr);
758 redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
759 if (vstr == NULL)
760 ele = createStringObjectFromLongLong(vlong);
761 else
762 ele = createStringObject((char*)vstr,vlen);
763
764 /* Has incremented refcount since it was just created. */
765 node = zslInsert(zs->zsl,score,ele);
766 redisAssert(dictAdd(zs->dict,ele,&node->score) == DICT_OK);
767 incrRefCount(ele); /* Added to dictionary. */
768 zzlNext(zl,&eptr,&sptr);
769 }
770
771 zfree(zobj->ptr);
772 zobj->ptr = zs;
773 zobj->encoding = REDIS_ENCODING_RAW;
774 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
775 unsigned char *zl = ziplistNew();
776
777 if (encoding != REDIS_ENCODING_ZIPLIST)
778 redisPanic("Unknown target encoding");
779
780 /* Approach similar to zslFree(), since we want to free the skiplist at
781 * the same time as creating the ziplist. */
782 zs = zobj->ptr;
783 dictRelease(zs->dict);
784 node = zs->zsl->header->level[0].forward;
785 zfree(zs->zsl->header);
786 zfree(zs->zsl);
787
788 /* Immediately store pointer to ziplist in object because it will
789 * change because of reallocations when pushing to the ziplist. */
790 zobj->ptr = zl;
791
792 while (node) {
793 ele = getDecodedObject(node->obj);
794 redisAssert(zzlInsertAt(zobj,ele,node->score,NULL) == REDIS_OK);
795 decrRefCount(ele);
796
797 next = node->level[0].forward;
798 zslFreeNode(node);
799 node = next;
800 }
801
802 zfree(zs);
803 zobj->encoding = REDIS_ENCODING_ZIPLIST;
804 } else {
805 redisPanic("Unknown sorted set encoding");
806 }
807}
808
e2641e09 809/*-----------------------------------------------------------------------------
810 * Sorted set commands
811 *----------------------------------------------------------------------------*/
812
69ef89f2 813/* This generic command implements both ZADD and ZINCRBY. */
3ca7532a 814void zaddGenericCommand(redisClient *c, int incr) {
21c5b508 815 static char *nanerr = "resulting score is not a number (NaN)";
3ca7532a
PN
816 robj *key = c->argv[1];
817 robj *ele;
21c5b508
PN
818 robj *zobj;
819 robj *curobj;
3ca7532a
PN
820 double score, curscore = 0.0;
821
822 if (getDoubleFromObjectOrReply(c,c->argv[2],&score,NULL) != REDIS_OK)
823 return;
21c5b508
PN
824
825 zobj = lookupKeyWrite(c->db,key);
826 if (zobj == NULL) {
a669d5e9
PN
827 if (server.zset_max_ziplist_entries == 0 ||
828 server.zset_max_ziplist_value < sdslen(c->argv[3]->ptr))
829 {
830 zobj = createZsetObject();
831 } else {
832 zobj = createZsetZiplistObject();
833 }
21c5b508 834 dbAdd(c->db,key,zobj);
e2641e09 835 } else {
21c5b508 836 if (zobj->type != REDIS_ZSET) {
e2641e09 837 addReply(c,shared.wrongtypeerr);
838 return;
839 }
840 }
e2641e09 841
21c5b508
PN
842 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
843 unsigned char *eptr;
844
3ca7532a
PN
845 /* Prefer non-encoded element when dealing with ziplists. */
846 ele = c->argv[3];
21c5b508
PN
847 if ((eptr = zzlFind(zobj,ele,&curscore)) != NULL) {
848 if (incr) {
849 score += curscore;
850 if (isnan(score)) {
851 addReplyError(c,nanerr);
852 /* Don't need to check if the sorted set is empty, because
853 * we know it has at least one element. */
854 return;
855 }
856 }
857
858 /* Remove and re-insert when score changed. */
859 if (score != curscore) {
860 redisAssert(zzlDelete(zobj,eptr) == REDIS_OK);
861 redisAssert(zzlInsert(zobj,ele,score) == REDIS_OK);
862
863 signalModifiedKey(c->db,key);
864 server.dirty++;
865 }
866
867 if (incr) /* ZINCRBY */
868 addReplyDouble(c,score);
869 else /* ZADD */
870 addReply(c,shared.czero);
871 } else {
a669d5e9
PN
872 /* Optimize: check if the element is too large or the list becomes
873 * too long *before* executing zzlInsert. */
21c5b508 874 redisAssert(zzlInsert(zobj,ele,score) == REDIS_OK);
bbfe232f 875 if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries)
df26a0ae 876 zsetConvert(zobj,REDIS_ENCODING_RAW);
a669d5e9 877 if (sdslen(ele->ptr) > server.zset_max_ziplist_value)
df26a0ae 878 zsetConvert(zobj,REDIS_ENCODING_RAW);
69ef89f2 879
21c5b508
PN
880 signalModifiedKey(c->db,key);
881 server.dirty++;
69ef89f2 882
21c5b508
PN
883 if (incr) /* ZINCRBY */
884 addReplyDouble(c,score);
885 else /* ZADD */
886 addReply(c,shared.cone);
887 }
888 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
889 zset *zs = zobj->ptr;
890 zskiplistNode *znode;
e2641e09 891 dictEntry *de;
e2641e09 892
3ca7532a 893 ele = c->argv[3] = tryObjectEncoding(c->argv[3]);
e2641e09 894 de = dictFind(zs->dict,ele);
21c5b508
PN
895 if (de != NULL) {
896 curobj = dictGetEntryKey(de);
897 curscore = *(double*)dictGetEntryVal(de);
898
899 if (incr) {
900 score += curscore;
901 if (isnan(score)) {
902 addReplyError(c,nanerr);
903 /* Don't need to check if the sorted set is empty, because
904 * we know it has at least one element. */
905 return;
906 }
907 }
908
909 /* Remove and re-insert when score changed. We can safely delete
910 * the key object from the skiplist, since the dictionary still has
911 * a reference to it. */
912 if (score != curscore) {
913 redisAssert(zslDelete(zs->zsl,curscore,curobj));
914 znode = zslInsert(zs->zsl,score,curobj);
915 incrRefCount(curobj); /* Re-inserted in skiplist. */
916 dictGetEntryVal(de) = &znode->score; /* Update score ptr. */
917
918 signalModifiedKey(c->db,key);
919 server.dirty++;
920 }
921
922 if (incr) /* ZINCRBY */
923 addReplyDouble(c,score);
924 else /* ZADD */
925 addReply(c,shared.czero);
926 } else {
927 znode = zslInsert(zs->zsl,score,ele);
928 incrRefCount(ele); /* Inserted in skiplist. */
929 redisAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
930 incrRefCount(ele); /* Added to dictionary. */
931
932 signalModifiedKey(c->db,key);
e2641e09 933 server.dirty++;
21c5b508
PN
934
935 if (incr) /* ZINCRBY */
936 addReplyDouble(c,score);
937 else /* ZADD */
938 addReply(c,shared.cone);
e2641e09 939 }
21c5b508
PN
940 } else {
941 redisPanic("Unknown sorted set encoding");
e2641e09 942 }
943}
944
945void zaddCommand(redisClient *c) {
3ca7532a 946 zaddGenericCommand(c,0);
e2641e09 947}
948
949void zincrbyCommand(redisClient *c) {
3ca7532a 950 zaddGenericCommand(c,1);
e2641e09 951}
952
953void zremCommand(redisClient *c) {
0b10e104
PN
954 robj *key = c->argv[1];
955 robj *ele = c->argv[2];
956 robj *zobj;
957
958 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
959 checkType(c,zobj,REDIS_ZSET)) return;
960
961 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
962 unsigned char *eptr;
963
964 if ((eptr = zzlFind(zobj,ele,NULL)) != NULL) {
965 redisAssert(zzlDelete(zobj,eptr) == REDIS_OK);
bbfe232f 966 if (zzlLength(zobj->ptr) == 0) dbDelete(c->db,key);
0b10e104
PN
967 } else {
968 addReply(c,shared.czero);
969 return;
970 }
971 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
972 zset *zs = zobj->ptr;
973 dictEntry *de;
974 double score;
975
976 de = dictFind(zs->dict,ele);
977 if (de != NULL) {
978 /* Delete from the skiplist */
979 score = *(double*)dictGetEntryVal(de);
980 redisAssert(zslDelete(zs->zsl,score,ele));
981
982 /* Delete from the hash table */
983 dictDelete(zs->dict,ele);
984 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
985 if (dictSize(zs->dict) == 0) dbDelete(c->db,key);
986 } else {
987 addReply(c,shared.czero);
988 return;
989 }
990 } else {
991 redisPanic("Unknown sorted set encoding");
e2641e09 992 }
e2641e09 993
0b10e104 994 signalModifiedKey(c->db,key);
e2641e09 995 server.dirty++;
996 addReply(c,shared.cone);
997}
998
999void zremrangebyscoreCommand(redisClient *c) {
4a14dbba
PN
1000 robj *key = c->argv[1];
1001 robj *zobj;
91504b6c 1002 zrangespec range;
4a14dbba 1003 unsigned long deleted;
e2641e09 1004
91504b6c 1005 /* Parse the range arguments. */
7236fdb2
PN
1006 if (zslParseRange(c->argv[2],c->argv[3],&range) != REDIS_OK) {
1007 addReplyError(c,"min or max is not a double");
1008 return;
1009 }
e2641e09 1010
4a14dbba
PN
1011 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
1012 checkType(c,zobj,REDIS_ZSET)) return;
1013
1014 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1015 deleted = zzlDeleteRangeByScore(zobj,range);
1016 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
1017 zset *zs = zobj->ptr;
1018 deleted = zslDeleteRangeByScore(zs->zsl,range,zs->dict);
1019 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
1020 if (dictSize(zs->dict) == 0) dbDelete(c->db,key);
1021 } else {
1022 redisPanic("Unknown sorted set encoding");
1023 }
e2641e09 1024
4a14dbba 1025 if (deleted) signalModifiedKey(c->db,key);
e2641e09 1026 server.dirty += deleted;
1027 addReplyLongLong(c,deleted);
1028}
1029
1030void zremrangebyrankCommand(redisClient *c) {
63b7b7fb
PN
1031 robj *key = c->argv[1];
1032 robj *zobj;
e2641e09 1033 long start;
1034 long end;
1035 int llen;
63b7b7fb 1036 unsigned long deleted;
e2641e09 1037
1038 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
1039 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
1040
63b7b7fb
PN
1041 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
1042 checkType(c,zobj,REDIS_ZSET)) return;
e2641e09 1043
63b7b7fb 1044 /* Sanitize indexes. */
df26a0ae 1045 llen = zsetLength(zobj);
e2641e09 1046 if (start < 0) start = llen+start;
1047 if (end < 0) end = llen+end;
1048 if (start < 0) start = 0;
e2641e09 1049
d0a4e24e
PN
1050 /* Invariant: start >= 0, so this test will be true when end < 0.
1051 * The range is empty when start > end or start >= length. */
e2641e09 1052 if (start > end || start >= llen) {
1053 addReply(c,shared.czero);
1054 return;
1055 }
1056 if (end >= llen) end = llen-1;
1057
63b7b7fb
PN
1058 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1059 /* Correct for 1-based rank. */
1060 deleted = zzlDeleteRangeByRank(zobj,start+1,end+1);
1061 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
1062 zset *zs = zobj->ptr;
1063
1064 /* Correct for 1-based rank. */
1065 deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);
1066 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
1067 if (dictSize(zs->dict) == 0) dbDelete(c->db,key);
1068 } else {
1069 redisPanic("Unknown sorted set encoding");
1070 }
1071
1072 if (deleted) signalModifiedKey(c->db,key);
e2641e09 1073 server.dirty += deleted;
63b7b7fb 1074 addReplyLongLong(c,deleted);
e2641e09 1075}
1076
/* One input of a union/intersection operation: the source object, how it is
 * typed and encoded, the weight applied to its scores, and the iterator
 * state used to walk it. Exactly one member of the nested unions is in use,
 * selected by the type/encoding pair. */
typedef struct {
    robj *subject; /* Source object, NULL when there is nothing to iterate. */
    int type; /* Set, sorted set */
    int encoding; /* Encoding of subject, selects the iterator variant. */
    double weight; /* Multiplier applied to the scores of this input. */

    union {
        /* Set iterators. */
        union _iterset {
            struct {
                intset *is;
                int ii; /* Index of the next intset element to return. */
            } is;
            struct {
                dict *dict;
                dictIterator *di;
                dictEntry *de; /* Next entry to return, NULL at the end. */
            } ht;
        } set;

        /* Sorted set iterators. */
        union _iterzset {
            struct {
                unsigned char *zl;
                unsigned char *eptr, *sptr; /* Next element and its score. */
            } zl;
            struct {
                zset *zs;
                zskiplistNode *node; /* Next node to return, NULL at the end. */
            } sl;
        } zset;
    } iter;
} zsetopsrc;
1110
56ce42fa
PN
1111
/* Use dirty flags for pointers that need to be cleaned up in the next
 * iteration over the zsetopval. The dirty flag for the long long value is
 * special, since long long values don't need cleanup. Instead, it means that
 * we already checked that "ell" holds a long long, or tried to convert another
 * representation into a long long value. When this was successful,
 * OPVAL_VALID_LL is set as well. */
#define OPVAL_DIRTY_ROBJ 1 /* "ele" was created on demand and must be released. */
#define OPVAL_DIRTY_LL 2   /* Conversion to long long was already attempted. */
#define OPVAL_VALID_LL 4   /* "ell" holds a valid long long value. */
1121
/* Store value retrieved from the iterator. The same element can be available
 * as an object ("ele"), as a string buffer ("estr"/"elen") or as an integer
 * ("ell"); the representations that are missing are derived lazily by the
 * zui*FromValue() helpers. */
typedef struct {
    int flags; /* Combination of the OPVAL_* flags. */
    unsigned char _buf[32]; /* Private buffer. */
    robj *ele; /* Object representation, or NULL. */
    unsigned char *estr; /* String representation, or NULL. */
    unsigned int elen; /* Length of estr. */
    long long ell; /* Integer representation. */
    double score; /* Score associated with the element. */
} zsetopval;

/* Short names for the iterator unions nested in zsetopsrc. */
typedef union _iterset iterset;
typedef union _iterzset iterzset;
1135
1136void zuiInitIterator(zsetopsrc *op) {
1137 if (op->subject == NULL)
1138 return;
1139
1140 if (op->type == REDIS_SET) {
1141 iterset *it = &op->iter.set;
1142 if (op->encoding == REDIS_ENCODING_INTSET) {
1143 it->is.is = op->subject->ptr;
1144 it->is.ii = 0;
1145 } else if (op->encoding == REDIS_ENCODING_HT) {
1146 it->ht.dict = op->subject->ptr;
1147 it->ht.di = dictGetIterator(op->subject->ptr);
1148 it->ht.de = dictNext(it->ht.di);
1149 } else {
1150 redisPanic("Unknown set encoding");
1151 }
1152 } else if (op->type == REDIS_ZSET) {
1153 iterzset *it = &op->iter.zset;
1154 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1155 it->zl.zl = op->subject->ptr;
1156 it->zl.eptr = ziplistIndex(it->zl.zl,0);
1157 if (it->zl.eptr != NULL) {
1158 it->zl.sptr = ziplistNext(it->zl.zl,it->zl.eptr);
1159 redisAssert(it->zl.sptr != NULL);
1160 }
1161 } else if (op->encoding == REDIS_ENCODING_RAW) {
1162 it->sl.zs = op->subject->ptr;
1163 it->sl.node = it->sl.zs->zsl->header->level[0].forward;
1164 } else {
1165 redisPanic("Unknown sorted set encoding");
1166 }
1167 } else {
1168 redisPanic("Unsupported type");
1169 }
1170}
1171
1172void zuiClearIterator(zsetopsrc *op) {
1173 if (op->subject == NULL)
1174 return;
1175
1176 if (op->type == REDIS_SET) {
1177 iterset *it = &op->iter.set;
1178 if (op->encoding == REDIS_ENCODING_INTSET) {
1179 REDIS_NOTUSED(it); /* skip */
1180 } else if (op->encoding == REDIS_ENCODING_HT) {
1181 dictReleaseIterator(it->ht.di);
1182 } else {
1183 redisPanic("Unknown set encoding");
1184 }
1185 } else if (op->type == REDIS_ZSET) {
1186 iterzset *it = &op->iter.zset;
1187 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1188 REDIS_NOTUSED(it); /* skip */
1189 } else if (op->encoding == REDIS_ENCODING_RAW) {
1190 REDIS_NOTUSED(it); /* skip */
1191 } else {
1192 redisPanic("Unknown sorted set encoding");
1193 }
1194 } else {
1195 redisPanic("Unsupported type");
1196 }
1197}
1198
1199int zuiLength(zsetopsrc *op) {
1200 if (op->subject == NULL)
1201 return 0;
1202
1203 if (op->type == REDIS_SET) {
1204 iterset *it = &op->iter.set;
1205 if (op->encoding == REDIS_ENCODING_INTSET) {
1206 return intsetLen(it->is.is);
1207 } else if (op->encoding == REDIS_ENCODING_HT) {
1208 return dictSize(it->ht.dict);
1209 } else {
1210 redisPanic("Unknown set encoding");
1211 }
1212 } else if (op->type == REDIS_ZSET) {
1213 iterzset *it = &op->iter.zset;
1214 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1215 return zzlLength(it->zl.zl);
1216 } else if (op->encoding == REDIS_ENCODING_RAW) {
1217 return it->sl.zs->zsl->length;
1218 } else {
1219 redisPanic("Unknown sorted set encoding");
1220 }
1221 } else {
1222 redisPanic("Unsupported type");
1223 }
1224}
1225
1226/* Check if the current value is valid. If so, store it in the passed structure
1227 * and move to the next element. If not valid, this means we have reached the
1228 * end of the structure and can abort. */
1229int zuiNext(zsetopsrc *op, zsetopval *val) {
1230 if (op->subject == NULL)
1231 return 0;
1232
1233 if (val->flags & OPVAL_DIRTY_ROBJ)
1234 decrRefCount(val->ele);
1235
1236 bzero(val,sizeof(zsetopval));
1237
1238 if (op->type == REDIS_SET) {
1239 iterset *it = &op->iter.set;
1240 if (op->encoding == REDIS_ENCODING_INTSET) {
1241 if (!intsetGet(it->is.is,it->is.ii,&val->ell))
1242 return 0;
1243 val->score = 1.0;
1244
1245 /* Move to next element. */
1246 it->is.ii++;
1247 } else if (op->encoding == REDIS_ENCODING_HT) {
1248 if (it->ht.de == NULL)
1249 return 0;
1250 val->ele = dictGetEntryKey(it->ht.de);
1251 val->score = 1.0;
1252
1253 /* Move to next element. */
1254 it->ht.de = dictNext(it->ht.di);
1255 } else {
1256 redisPanic("Unknown set encoding");
1257 }
1258 } else if (op->type == REDIS_ZSET) {
1259 iterzset *it = &op->iter.zset;
1260 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1261 /* No need to check both, but better be explicit. */
1262 if (it->zl.eptr == NULL || it->zl.sptr == NULL)
1263 return 0;
1264 redisAssert(ziplistGet(it->zl.eptr,&val->estr,&val->elen,&val->ell));
1265 val->score = zzlGetScore(it->zl.sptr);
1266
1267 /* Move to next element. */
1268 zzlNext(it->zl.zl,&it->zl.eptr,&it->zl.sptr);
1269 } else if (op->encoding == REDIS_ENCODING_RAW) {
1270 if (it->sl.node == NULL)
1271 return 0;
1272 val->ele = it->sl.node->obj;
1273 val->score = it->sl.node->score;
1274
1275 /* Move to next element. */
1276 it->sl.node = it->sl.node->level[0].forward;
1277 } else {
1278 redisPanic("Unknown sorted set encoding");
1279 }
1280 } else {
1281 redisPanic("Unsupported type");
1282 }
1283 return 1;
1284}
1285
1286int zuiLongLongFromValue(zsetopval *val) {
1287 if (!(val->flags & OPVAL_DIRTY_LL)) {
1288 val->flags |= OPVAL_DIRTY_LL;
1289
1290 if (val->ele != NULL) {
1291 if (val->ele->encoding == REDIS_ENCODING_INT) {
1292 val->ell = (long)val->ele->ptr;
1293 val->flags |= OPVAL_VALID_LL;
1294 } else if (val->ele->encoding == REDIS_ENCODING_RAW) {
1295 if (string2ll(val->ele->ptr,sdslen(val->ele->ptr),&val->ell))
1296 val->flags |= OPVAL_VALID_LL;
1297 } else {
1298 redisPanic("Unsupported element encoding");
1299 }
1300 } else if (val->estr != NULL) {
1301 if (string2ll((char*)val->estr,val->elen,&val->ell))
1302 val->flags |= OPVAL_VALID_LL;
1303 } else {
1304 /* The long long was already set, flag as valid. */
1305 val->flags |= OPVAL_VALID_LL;
1306 }
1307 }
1308 return val->flags & OPVAL_VALID_LL;
1309}
1310
1311robj *zuiObjectFromValue(zsetopval *val) {
1312 if (val->ele == NULL) {
1313 if (val->estr != NULL) {
1314 val->ele = createStringObject((char*)val->estr,val->elen);
1315 } else {
1316 val->ele = createStringObjectFromLongLong(val->ell);
1317 }
1318 val->flags |= OPVAL_DIRTY_ROBJ;
1319 }
1320 return val->ele;
1321}
1322
1323int zuiBufferFromValue(zsetopval *val) {
1324 if (val->estr == NULL) {
1325 if (val->ele != NULL) {
1326 if (val->ele->encoding == REDIS_ENCODING_INT) {
1327 val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),(long)val->ele->ptr);
1328 val->estr = val->_buf;
1329 } else if (val->ele->encoding == REDIS_ENCODING_RAW) {
1330 val->elen = sdslen(val->ele->ptr);
1331 val->estr = val->ele->ptr;
1332 } else {
1333 redisPanic("Unsupported element encoding");
1334 }
1335 } else {
1336 val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),val->ell);
1337 val->estr = val->_buf;
1338 }
1339 }
1340 return 1;
1341}
1342
1343/* Find value pointed to by val in the source pointer to by op. When found,
1344 * return 1 and store its score in target. Return 0 otherwise. */
1345int zuiFind(zsetopsrc *op, zsetopval *val, double *score) {
1346 if (op->subject == NULL)
1347 return 0;
1348
1349 if (op->type == REDIS_SET) {
1350 iterset *it = &op->iter.set;
1351
1352 if (op->encoding == REDIS_ENCODING_INTSET) {
1353 if (zuiLongLongFromValue(val) && intsetFind(it->is.is,val->ell)) {
1354 *score = 1.0;
1355 return 1;
1356 } else {
1357 return 0;
1358 }
1359 } else if (op->encoding == REDIS_ENCODING_HT) {
1360 zuiObjectFromValue(val);
1361 if (dictFind(it->ht.dict,val->ele) != NULL) {
1362 *score = 1.0;
1363 return 1;
1364 } else {
1365 return 0;
1366 }
1367 } else {
1368 redisPanic("Unknown set encoding");
1369 }
1370 } else if (op->type == REDIS_ZSET) {
1371 iterzset *it = &op->iter.zset;
1372 zuiObjectFromValue(val);
1373
1374 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1375 if (zzlFind(op->subject,val->ele,score) != NULL) {
1376 /* Score is already set by zzlFind. */
1377 return 1;
1378 } else {
1379 return 0;
1380 }
1381 } else if (op->encoding == REDIS_ENCODING_RAW) {
1382 dictEntry *de;
1383 if ((de = dictFind(it->sl.zs->dict,val->ele)) != NULL) {
1384 *score = *(double*)dictGetEntryVal(de);
1385 return 1;
1386 } else {
1387 return 0;
1388 }
1389 } else {
1390 redisPanic("Unknown sorted set encoding");
1391 }
1392 } else {
1393 redisPanic("Unsupported type");
1394 }
1395}
1396
1397int zuiCompareByCardinality(const void *s1, const void *s2) {
1398 return zuiLength((zsetopsrc*)s1) - zuiLength((zsetopsrc*)s2);
e2641e09 1399}
1400
/* Aggregation modes accepted by zunionInterAggregate(). */
#define REDIS_AGGR_SUM 1
#define REDIS_AGGR_MIN 2
#define REDIS_AGGR_MAX 3
/* Score stored in a dict entry: the double its value points to, or 1.0 when
 * the entry has no value. */
#define zunionInterDictValue(_e) (dictGetEntryVal(_e) == NULL ? 1.0 : *(double*)dictGetEntryVal(_e))
1405
1406inline static void zunionInterAggregate(double *target, double val, int aggregate) {
1407 if (aggregate == REDIS_AGGR_SUM) {
1408 *target = *target + val;
d9e28bcf
PN
1409 /* The result of adding two doubles is NaN when one variable
1410 * is +inf and the other is -inf. When these numbers are added,
1411 * we maintain the convention of the result being 0.0. */
1412 if (isnan(*target)) *target = 0.0;
e2641e09 1413 } else if (aggregate == REDIS_AGGR_MIN) {
1414 *target = val < *target ? val : *target;
1415 } else if (aggregate == REDIS_AGGR_MAX) {
1416 *target = val > *target ? val : *target;
1417 } else {
1418 /* safety net */
1419 redisPanic("Unknown ZUNION/INTER aggregate type");
1420 }
1421}
1422
1423void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
1424 int i, j, setnum;
1425 int aggregate = REDIS_AGGR_SUM;
1426 zsetopsrc *src;
56ce42fa
PN
1427 zsetopval zval;
1428 robj *tmp;
255eebe2 1429 unsigned int maxelelen = 0;
e2641e09 1430 robj *dstobj;
1431 zset *dstzset;
69ef89f2 1432 zskiplistNode *znode;
8c1420ff 1433 int touched = 0;
e2641e09 1434
1435 /* expect setnum input keys to be given */
1436 setnum = atoi(c->argv[2]->ptr);
1437 if (setnum < 1) {
3ab20376
PN
1438 addReplyError(c,
1439 "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE");
e2641e09 1440 return;
1441 }
1442
1443 /* test if the expected number of keys would overflow */
1444 if (3+setnum > c->argc) {
1445 addReply(c,shared.syntaxerr);
1446 return;
1447 }
1448
1449 /* read keys to be used for input */
56ce42fa 1450 src = zcalloc(sizeof(zsetopsrc) * setnum);
e2641e09 1451 for (i = 0, j = 3; i < setnum; i++, j++) {
1452 robj *obj = lookupKeyWrite(c->db,c->argv[j]);
56ce42fa
PN
1453 if (obj != NULL) {
1454 if (obj->type != REDIS_ZSET && obj->type != REDIS_SET) {
e2641e09 1455 zfree(src);
1456 addReply(c,shared.wrongtypeerr);
1457 return;
1458 }
56ce42fa
PN
1459
1460 src[i].subject = obj;
1461 src[i].type = obj->type;
1462 src[i].encoding = obj->encoding;
1463 } else {
1464 src[i].subject = NULL;
e2641e09 1465 }
1466
56ce42fa 1467 /* Default all weights to 1. */
e2641e09 1468 src[i].weight = 1.0;
1469 }
1470
1471 /* parse optional extra arguments */
1472 if (j < c->argc) {
1473 int remaining = c->argc - j;
1474
1475 while (remaining) {
1476 if (remaining >= (setnum + 1) && !strcasecmp(c->argv[j]->ptr,"weights")) {
1477 j++; remaining--;
1478 for (i = 0; i < setnum; i++, j++, remaining--) {
673e1fb7
PN
1479 if (getDoubleFromObjectOrReply(c,c->argv[j],&src[i].weight,
1480 "weight value is not a double") != REDIS_OK)
1481 {
1482 zfree(src);
e2641e09 1483 return;
673e1fb7 1484 }
e2641e09 1485 }
1486 } else if (remaining >= 2 && !strcasecmp(c->argv[j]->ptr,"aggregate")) {
1487 j++; remaining--;
1488 if (!strcasecmp(c->argv[j]->ptr,"sum")) {
1489 aggregate = REDIS_AGGR_SUM;
1490 } else if (!strcasecmp(c->argv[j]->ptr,"min")) {
1491 aggregate = REDIS_AGGR_MIN;
1492 } else if (!strcasecmp(c->argv[j]->ptr,"max")) {
1493 aggregate = REDIS_AGGR_MAX;
1494 } else {
1495 zfree(src);
1496 addReply(c,shared.syntaxerr);
1497 return;
1498 }
1499 j++; remaining--;
1500 } else {
1501 zfree(src);
1502 addReply(c,shared.syntaxerr);
1503 return;
1504 }
1505 }
1506 }
1507
56ce42fa
PN
1508 for (i = 0; i < setnum; i++)
1509 zuiInitIterator(&src[i]);
1510
e2641e09 1511 /* sort sets from the smallest to largest, this will improve our
1512 * algorithm's performance */
56ce42fa 1513 qsort(src,setnum,sizeof(zsetopsrc),zuiCompareByCardinality);
e2641e09 1514
1515 dstobj = createZsetObject();
1516 dstzset = dstobj->ptr;
1517
1518 if (op == REDIS_OP_INTER) {
56ce42fa
PN
1519 /* Skip everything if the smallest input is empty. */
1520 if (zuiLength(&src[0]) > 0) {
1521 /* Precondition: as src[0] is non-empty and the inputs are ordered
1522 * by size, all src[i > 0] are non-empty too. */
1523 while (zuiNext(&src[0],&zval)) {
d433ebc6 1524 double score, value;
e2641e09 1525
56ce42fa 1526 score = src[0].weight * zval.score;
e2641e09 1527 for (j = 1; j < setnum; j++) {
56ce42fa
PN
1528 if (zuiFind(&src[j],&zval,&value)) {
1529 value *= src[j].weight;
d433ebc6 1530 zunionInterAggregate(&score,value,aggregate);
e2641e09 1531 } else {
1532 break;
1533 }
1534 }
1535
56ce42fa 1536 /* Only continue when present in every input. */
d433ebc6 1537 if (j == setnum) {
56ce42fa
PN
1538 tmp = zuiObjectFromValue(&zval);
1539 znode = zslInsert(dstzset->zsl,score,tmp);
1540 incrRefCount(tmp); /* added to skiplist */
1541 dictAdd(dstzset->dict,tmp,&znode->score);
1542 incrRefCount(tmp); /* added to dictionary */
255eebe2
PN
1543
1544 if (tmp->encoding == REDIS_ENCODING_RAW)
1545 if (sdslen(tmp->ptr) > maxelelen)
1546 maxelelen = sdslen(tmp->ptr);
e2641e09 1547 }
1548 }
e2641e09 1549 }
1550 } else if (op == REDIS_OP_UNION) {
1551 for (i = 0; i < setnum; i++) {
56ce42fa
PN
1552 if (zuiLength(&src[0]) == 0)
1553 continue;
e2641e09 1554
56ce42fa 1555 while (zuiNext(&src[i],&zval)) {
d433ebc6
PN
1556 double score, value;
1557
56ce42fa
PN
1558 /* Skip key when already processed */
1559 if (dictFind(dstzset->dict,zuiObjectFromValue(&zval)) != NULL)
d433ebc6 1560 continue;
e2641e09 1561
56ce42fa
PN
1562 /* Initialize score */
1563 score = src[i].weight * zval.score;
e2641e09 1564
56ce42fa
PN
1565 /* Because the inputs are sorted by size, it's only possible
1566 * for sets at larger indices to hold this element. */
e2641e09 1567 for (j = (i+1); j < setnum; j++) {
56ce42fa
PN
1568 if (zuiFind(&src[j],&zval,&value)) {
1569 value *= src[j].weight;
d433ebc6 1570 zunionInterAggregate(&score,value,aggregate);
e2641e09 1571 }
1572 }
1573
56ce42fa
PN
1574 tmp = zuiObjectFromValue(&zval);
1575 znode = zslInsert(dstzset->zsl,score,tmp);
1576 incrRefCount(zval.ele); /* added to skiplist */
1577 dictAdd(dstzset->dict,tmp,&znode->score);
1578 incrRefCount(zval.ele); /* added to dictionary */
255eebe2
PN
1579
1580 if (tmp->encoding == REDIS_ENCODING_RAW)
1581 if (sdslen(tmp->ptr) > maxelelen)
1582 maxelelen = sdslen(tmp->ptr);
e2641e09 1583 }
e2641e09 1584 }
1585 } else {
56ce42fa 1586 redisPanic("Unknown operator");
e2641e09 1587 }
1588
56ce42fa
PN
1589 for (i = 0; i < setnum; i++)
1590 zuiClearIterator(&src[i]);
1591
8c1420ff 1592 if (dbDelete(c->db,dstkey)) {
cea8c5cd 1593 signalModifiedKey(c->db,dstkey);
8c1420ff 1594 touched = 1;
1595 server.dirty++;
1596 }
e2641e09 1597 if (dstzset->zsl->length) {
255eebe2
PN
1598 /* Convert to ziplist when in limits. */
1599 if (dstzset->zsl->length <= server.zset_max_ziplist_entries &&
1600 maxelelen <= server.zset_max_ziplist_value)
df26a0ae 1601 zsetConvert(dstobj,REDIS_ENCODING_ZIPLIST);
255eebe2 1602
e2641e09 1603 dbAdd(c->db,dstkey,dstobj);
df26a0ae 1604 addReplyLongLong(c,zsetLength(dstobj));
cea8c5cd 1605 if (!touched) signalModifiedKey(c->db,dstkey);
cbf7e107 1606 server.dirty++;
e2641e09 1607 } else {
1608 decrRefCount(dstobj);
255eebe2 1609 addReply(c,shared.czero);
e2641e09 1610 }
1611 zfree(src);
1612}
1613
1614void zunionstoreCommand(redisClient *c) {
1615 zunionInterGenericCommand(c,c->argv[1], REDIS_OP_UNION);
1616}
1617
1618void zinterstoreCommand(redisClient *c) {
1619 zunionInterGenericCommand(c,c->argv[1], REDIS_OP_INTER);
1620}
1621
1622void zrangeGenericCommand(redisClient *c, int reverse) {
5d1b4fb6
PN
1623 robj *key = c->argv[1];
1624 robj *zobj;
1625 int withscores = 0;
e2641e09 1626 long start;
1627 long end;
e2641e09 1628 int llen;
5d1b4fb6 1629 int rangelen;
e2641e09 1630
1631 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
1632 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
1633
1634 if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
1635 withscores = 1;
1636 } else if (c->argc >= 5) {
1637 addReply(c,shared.syntaxerr);
1638 return;
1639 }
1640
5d1b4fb6
PN
1641 if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL
1642 || checkType(c,zobj,REDIS_ZSET)) return;
e2641e09 1643
5d1b4fb6 1644 /* Sanitize indexes. */
df26a0ae 1645 llen = zsetLength(zobj);
e2641e09 1646 if (start < 0) start = llen+start;
1647 if (end < 0) end = llen+end;
1648 if (start < 0) start = 0;
e2641e09 1649
d0a4e24e
PN
1650 /* Invariant: start >= 0, so this test will be true when end < 0.
1651 * The range is empty when start > end or start >= length. */
e2641e09 1652 if (start > end || start >= llen) {
e2641e09 1653 addReply(c,shared.emptymultibulk);
1654 return;
1655 }
1656 if (end >= llen) end = llen-1;
1657 rangelen = (end-start)+1;
1658
e2641e09 1659 /* Return the result in form of a multi-bulk reply */
5d1b4fb6
PN
1660 addReplyMultiBulkLen(c, withscores ? (rangelen*2) : rangelen);
1661
1662 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1663 unsigned char *zl = zobj->ptr;
1664 unsigned char *eptr, *sptr;
1665 unsigned char *vstr;
1666 unsigned int vlen;
1667 long long vlong;
1668
1669 if (reverse)
1670 eptr = ziplistIndex(zl,-2-(2*start));
1671 else
1672 eptr = ziplistIndex(zl,2*start);
1673
4c5f0966
PN
1674 redisAssert(eptr != NULL);
1675 sptr = ziplistNext(zl,eptr);
1676
5d1b4fb6 1677 while (rangelen--) {
4c5f0966 1678 redisAssert(eptr != NULL && sptr != NULL);
5d1b4fb6
PN
1679 redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
1680 if (vstr == NULL)
1681 addReplyBulkLongLong(c,vlong);
1682 else
1683 addReplyBulkCBuffer(c,vstr,vlen);
1684
4c5f0966 1685 if (withscores)
5d1b4fb6 1686 addReplyDouble(c,zzlGetScore(sptr));
5d1b4fb6 1687
4c5f0966
PN
1688 if (reverse)
1689 zzlPrev(zl,&eptr,&sptr);
1690 else
1691 zzlNext(zl,&eptr,&sptr);
5d1b4fb6
PN
1692 }
1693
1694 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
1695 zset *zs = zobj->ptr;
1696 zskiplist *zsl = zs->zsl;
1697 zskiplistNode *ln;
1698 robj *ele;
1699
1700 /* Check if starting point is trivial, before doing log(N) lookup. */
1701 if (reverse) {
1702 ln = zsl->tail;
1703 if (start > 0)
1704 ln = zslGetElementByRank(zsl,llen-start);
1705 } else {
1706 ln = zsl->header->level[0].forward;
1707 if (start > 0)
1708 ln = zslGetElementByRank(zsl,start+1);
1709 }
1710
1711 while(rangelen--) {
1712 redisAssert(ln != NULL);
1713 ele = ln->obj;
1714 addReplyBulk(c,ele);
1715 if (withscores)
1716 addReplyDouble(c,ln->score);
1717 ln = reverse ? ln->backward : ln->level[0].forward;
1718 }
1719 } else {
1720 redisPanic("Unknown sorted set encoding");
e2641e09 1721 }
1722}
1723
1724void zrangeCommand(redisClient *c) {
1725 zrangeGenericCommand(c,0);
1726}
1727
1728void zrevrangeCommand(redisClient *c) {
1729 zrangeGenericCommand(c,1);
1730}
1731
25bb8a44
PN
1732/* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE and ZCOUNT.
1733 * If "justcount", only the number of elements in the range is returned. */
1734void genericZrangebyscoreCommand(redisClient *c, int reverse, int justcount) {
1735 zrangespec range;
aff255c8
PN
1736 robj *key = c->argv[1];
1737 robj *emptyreply, *zobj;
e2641e09 1738 int offset = 0, limit = -1;
1739 int withscores = 0;
25bb8a44
PN
1740 unsigned long rangelen = 0;
1741 void *replylen = NULL;
22b9bf15 1742 int minidx, maxidx;
e2641e09 1743
25bb8a44 1744 /* Parse the range arguments. */
22b9bf15
PN
1745 if (reverse) {
1746 /* Range is given as [max,min] */
1747 maxidx = 2; minidx = 3;
1748 } else {
1749 /* Range is given as [min,max] */
1750 minidx = 2; maxidx = 3;
1751 }
1752
1753 if (zslParseRange(c->argv[minidx],c->argv[maxidx],&range) != REDIS_OK) {
7236fdb2
PN
1754 addReplyError(c,"min or max is not a double");
1755 return;
1756 }
25bb8a44
PN
1757
1758 /* Parse optional extra arguments. Note that ZCOUNT will exactly have
1759 * 4 arguments, so we'll never enter the following code path. */
1760 if (c->argc > 4) {
1761 int remaining = c->argc - 4;
1762 int pos = 4;
1763
1764 while (remaining) {
1765 if (remaining >= 1 && !strcasecmp(c->argv[pos]->ptr,"withscores")) {
1766 pos++; remaining--;
1767 withscores = 1;
1768 } else if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr,"limit")) {
1769 offset = atoi(c->argv[pos+1]->ptr);
1770 limit = atoi(c->argv[pos+2]->ptr);
1771 pos += 3; remaining -= 3;
1772 } else {
1773 addReply(c,shared.syntaxerr);
1774 return;
1775 }
1776 }
e2641e09 1777 }
25bb8a44
PN
1778
1779 /* Ok, lookup the key and get the range */
1780 emptyreply = justcount ? shared.czero : shared.emptymultibulk;
aff255c8
PN
1781 if ((zobj = lookupKeyReadOrReply(c,key,emptyreply)) == NULL ||
1782 checkType(c,zobj,REDIS_ZSET)) return;
25bb8a44 1783
aff255c8
PN
1784 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1785 unsigned char *zl = zobj->ptr;
1786 unsigned char *eptr, *sptr;
1787 unsigned char *vstr;
1788 unsigned int vlen;
1789 long long vlong;
1790 double score;
e2641e09 1791
aff255c8
PN
1792 /* If reversed, get the last node in range as starting point. */
1793 if (reverse)
1794 eptr = zzlLastInRange(zobj,range);
1795 else
1796 eptr = zzlFirstInRange(zobj,range);
e2641e09 1797
aff255c8
PN
1798 /* No "first" element in the specified interval. */
1799 if (eptr == NULL) {
1800 addReply(c,emptyreply);
1801 return;
1802 }
e2641e09 1803
aff255c8
PN
1804 /* Get score pointer for the first element. */
1805 redisAssert(eptr != NULL);
1806 sptr = ziplistNext(zl,eptr);
e2641e09 1807
aff255c8
PN
1808 /* We don't know in advance how many matching elements there are in the
1809 * list, so we push this object that will represent the multi-bulk
1810 * length in the output buffer, and will "fix" it later */
1811 if (!justcount)
1812 replylen = addDeferredMultiBulkLength(c);
1813
1814 /* If there is an offset, just traverse the number of elements without
1815 * checking the score because that is done in the next loop. */
1816 while (eptr && offset--)
1817 if (reverse)
1818 zzlPrev(zl,&eptr,&sptr);
1819 else
1820 zzlNext(zl,&eptr,&sptr);
1821
1822 while (eptr && limit--) {
1823 score = zzlGetScore(sptr);
1824
1825 /* Abort when the node is no longer in range. */
1826 if (reverse) {
1827 if (!zslValueGteMin(score,&range)) break;
1828 } else {
1829 if (!zslValueLteMax(score,&range)) break;
1830 }
1831
1832 /* Do our magic */
1833 rangelen++;
1834 if (!justcount) {
1835 redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
1836 if (vstr == NULL)
1837 addReplyBulkLongLong(c,vlong);
1838 else
1839 addReplyBulkCBuffer(c,vstr,vlen);
1840
1841 if (withscores)
1842 addReplyDouble(c,score);
1843 }
1844
1845 /* Move to next node */
1846 if (reverse)
1847 zzlPrev(zl,&eptr,&sptr);
1848 else
1849 zzlNext(zl,&eptr,&sptr);
e2641e09 1850 }
aff255c8
PN
1851 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
1852 zset *zs = zobj->ptr;
1853 zskiplist *zsl = zs->zsl;
1854 zskiplistNode *ln;
25bb8a44 1855
aff255c8
PN
1856 /* If reversed, get the last node in range as starting point. */
1857 if (reverse)
1858 ln = zslLastInRange(zsl,range);
1859 else
1860 ln = zslFirstInRange(zsl,range);
1861
1862 /* No "first" element in the specified interval. */
1863 if (ln == NULL) {
1864 addReply(c,emptyreply);
1865 return;
25bb8a44
PN
1866 }
1867
aff255c8
PN
1868 /* We don't know in advance how many matching elements there are in the
1869 * list, so we push this object that will represent the multi-bulk
1870 * length in the output buffer, and will "fix" it later */
1871 if (!justcount)
1872 replylen = addDeferredMultiBulkLength(c);
1873
1874 /* If there is an offset, just traverse the number of elements without
1875 * checking the score because that is done in the next loop. */
1876 while (ln && offset--)
1877 ln = reverse ? ln->backward : ln->level[0].forward;
1878
1879 while (ln && limit--) {
1880 /* Abort when the node is no longer in range. */
1881 if (reverse) {
1882 if (!zslValueGteMin(ln->score,&range)) break;
1883 } else {
1884 if (!zslValueLteMax(ln->score,&range)) break;
1885 }
1886
1887 /* Do our magic */
1888 rangelen++;
1889 if (!justcount) {
1890 addReplyBulk(c,ln->obj);
1891 if (withscores)
1892 addReplyDouble(c,ln->score);
1893 }
1894
1895 /* Move to next node */
1896 ln = reverse ? ln->backward : ln->level[0].forward;
1897 }
1898 } else {
1899 redisPanic("Unknown sorted set encoding");
25bb8a44
PN
1900 }
1901
1902 if (justcount) {
1903 addReplyLongLong(c,(long)rangelen);
1904 } else {
aff255c8
PN
1905 if (withscores) rangelen *= 2;
1906 setDeferredMultiBulkLength(c,replylen,rangelen);
e2641e09 1907 }
1908}
1909
1910void zrangebyscoreCommand(redisClient *c) {
25bb8a44
PN
1911 genericZrangebyscoreCommand(c,0,0);
1912}
1913
1914void zrevrangebyscoreCommand(redisClient *c) {
1915 genericZrangebyscoreCommand(c,1,0);
e2641e09 1916}
1917
1918void zcountCommand(redisClient *c) {
25bb8a44 1919 genericZrangebyscoreCommand(c,0,1);
e2641e09 1920}
1921
1922void zcardCommand(redisClient *c) {
d1c920c5
PN
1923 robj *key = c->argv[1];
1924 robj *zobj;
e2641e09 1925
d1c920c5
PN
1926 if ((zobj = lookupKeyReadOrReply(c,key,shared.czero)) == NULL ||
1927 checkType(c,zobj,REDIS_ZSET)) return;
e2641e09 1928
df26a0ae 1929 addReplyLongLong(c,zsetLength(zobj));
e2641e09 1930}
1931
1932void zscoreCommand(redisClient *c) {
d1c920c5
PN
1933 robj *key = c->argv[1];
1934 robj *zobj;
1935 double score;
e2641e09 1936
d1c920c5
PN
1937 if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
1938 checkType(c,zobj,REDIS_ZSET)) return;
e2641e09 1939
d1c920c5
PN
1940 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1941 if (zzlFind(zobj,c->argv[2],&score) != NULL)
1942 addReplyDouble(c,score);
1943 else
1944 addReply(c,shared.nullbulk);
1945 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
1946 zset *zs = zobj->ptr;
1947 dictEntry *de;
e2641e09 1948
d1c920c5
PN
1949 c->argv[2] = tryObjectEncoding(c->argv[2]);
1950 de = dictFind(zs->dict,c->argv[2]);
1951 if (de != NULL) {
1952 score = *(double*)dictGetEntryVal(de);
1953 addReplyDouble(c,score);
1954 } else {
1955 addReply(c,shared.nullbulk);
1956 }
1957 } else {
1958 redisPanic("Unknown sorted set encoding");
e2641e09 1959 }
1960}
1961
1962void zrankGenericCommand(redisClient *c, int reverse) {
d1c920c5
PN
1963 robj *key = c->argv[1];
1964 robj *ele = c->argv[2];
1965 robj *zobj;
1966 unsigned long llen;
e2641e09 1967 unsigned long rank;
e2641e09 1968
d1c920c5
PN
1969 if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
1970 checkType(c,zobj,REDIS_ZSET)) return;
df26a0ae 1971 llen = zsetLength(zobj);
e2641e09 1972
d1c920c5
PN
1973 redisAssert(ele->encoding == REDIS_ENCODING_RAW);
1974 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1975 unsigned char *zl = zobj->ptr;
1976 unsigned char *eptr, *sptr;
e2641e09 1977
d1c920c5
PN
1978 eptr = ziplistIndex(zl,0);
1979 redisAssert(eptr != NULL);
1980 sptr = ziplistNext(zl,eptr);
1981 redisAssert(sptr != NULL);
1982
1983 rank = 1;
1984 while(eptr != NULL) {
1985 if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr)))
1986 break;
1987 rank++;
1988 zzlNext(zl,&eptr,&sptr);
1989 }
1990
1991 if (eptr != NULL) {
1992 if (reverse)
1993 addReplyLongLong(c,llen-rank);
1994 else
1995 addReplyLongLong(c,rank-1);
e2641e09 1996 } else {
d1c920c5
PN
1997 addReply(c,shared.nullbulk);
1998 }
1999 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
2000 zset *zs = zobj->ptr;
2001 zskiplist *zsl = zs->zsl;
2002 dictEntry *de;
2003 double score;
2004
2005 ele = c->argv[2] = tryObjectEncoding(c->argv[2]);
2006 de = dictFind(zs->dict,ele);
2007 if (de != NULL) {
2008 score = *(double*)dictGetEntryVal(de);
2009 rank = zslGetRank(zsl,score,ele);
2010 redisAssert(rank); /* Existing elements always have a rank. */
2011 if (reverse)
2012 addReplyLongLong(c,llen-rank);
2013 else
2014 addReplyLongLong(c,rank-1);
2015 } else {
2016 addReply(c,shared.nullbulk);
e2641e09 2017 }
2018 } else {
d1c920c5 2019 redisPanic("Unknown sorted set encoding");
e2641e09 2020 }
2021}
2022
2023void zrankCommand(redisClient *c) {
2024 zrankGenericCommand(c, 0);
2025}
2026
2027void zrevrankCommand(redisClient *c) {
2028 zrankGenericCommand(c, 1);
2029}