]> git.saurik.com Git - redis.git/blame - src/t_zset.c
redis-check-dump now is RDB version 6 ready.
[redis.git] / src / t_zset.c
CommitLineData
e2641e09 1#include "redis.h"
2
3#include <math.h>
4
5/*-----------------------------------------------------------------------------
6 * Sorted set API
7 *----------------------------------------------------------------------------*/
8
9/* ZSETs are ordered sets using two data structures to hold the same elements
10 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
11 * data structure.
12 *
13 * The elements are added to an hash table mapping Redis objects to scores.
14 * At the same time the elements are added to a skip list mapping scores
15 * to Redis objects (so objects are sorted by scores in this "view"). */
16
17/* This skiplist implementation is almost a C translation of the original
18 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
19 * Alternative to Balanced Trees", modified in three ways:
d8bd12f9 20 * a) this implementation allows for repeated scores.
e2641e09 21 * b) the comparison is not just by key (our 'score') but by satellite data.
22 * c) there is a back pointer, so it's a doubly linked list with the back
23 * pointers being only at "level 1". This allows to traverse the list
24 * from tail to head, useful for ZREVRANGE. */
25
26zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
2159782b 27 zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
e2641e09 28 zn->score = score;
29 zn->obj = obj;
30 return zn;
31}
32
33zskiplist *zslCreate(void) {
34 int j;
35 zskiplist *zsl;
36
37 zsl = zmalloc(sizeof(*zsl));
38 zsl->level = 1;
39 zsl->length = 0;
40 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
41 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
2159782b
PN
42 zsl->header->level[j].forward = NULL;
43 zsl->header->level[j].span = 0;
e2641e09 44 }
45 zsl->header->backward = NULL;
46 zsl->tail = NULL;
47 return zsl;
48}
49
50void zslFreeNode(zskiplistNode *node) {
51 decrRefCount(node->obj);
e2641e09 52 zfree(node);
53}
54
55void zslFree(zskiplist *zsl) {
2159782b 56 zskiplistNode *node = zsl->header->level[0].forward, *next;
e2641e09 57
e2641e09 58 zfree(zsl->header);
59 while(node) {
2159782b 60 next = node->level[0].forward;
e2641e09 61 zslFreeNode(node);
62 node = next;
63 }
64 zfree(zsl);
65}
66
7faa1f07 67/* Returns a random level for the new skiplist node we are going to create.
68 * The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL
69 * (both inclusive), with a powerlaw-alike distribution where higher
70 * levels are less likely to be returned. */
e2641e09 71int zslRandomLevel(void) {
72 int level = 1;
73 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
74 level += 1;
75 return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
76}
77
69ef89f2 78zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
e2641e09 79 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
80 unsigned int rank[ZSKIPLIST_MAXLEVEL];
81 int i, level;
82
a244a13b 83 redisAssert(!isnan(score));
e2641e09 84 x = zsl->header;
85 for (i = zsl->level-1; i >= 0; i--) {
86 /* store rank that is crossed to reach the insert position */
87 rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
2159782b
PN
88 while (x->level[i].forward &&
89 (x->level[i].forward->score < score ||
90 (x->level[i].forward->score == score &&
91 compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
92 rank[i] += x->level[i].span;
93 x = x->level[i].forward;
e2641e09 94 }
95 update[i] = x;
96 }
97 /* we assume the key is not already inside, since we allow duplicated
98 * scores, and the re-insertion of score and redis object should never
99 * happpen since the caller of zslInsert() should test in the hash table
100 * if the element is already inside or not. */
101 level = zslRandomLevel();
102 if (level > zsl->level) {
103 for (i = zsl->level; i < level; i++) {
104 rank[i] = 0;
105 update[i] = zsl->header;
2159782b 106 update[i]->level[i].span = zsl->length;
e2641e09 107 }
108 zsl->level = level;
109 }
110 x = zslCreateNode(level,score,obj);
111 for (i = 0; i < level; i++) {
2159782b
PN
112 x->level[i].forward = update[i]->level[i].forward;
113 update[i]->level[i].forward = x;
e2641e09 114
115 /* update span covered by update[i] as x is inserted here */
2159782b
PN
116 x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
117 update[i]->level[i].span = (rank[0] - rank[i]) + 1;
e2641e09 118 }
119
120 /* increment span for untouched levels */
121 for (i = level; i < zsl->level; i++) {
2159782b 122 update[i]->level[i].span++;
e2641e09 123 }
124
125 x->backward = (update[0] == zsl->header) ? NULL : update[0];
2159782b
PN
126 if (x->level[0].forward)
127 x->level[0].forward->backward = x;
e2641e09 128 else
129 zsl->tail = x;
130 zsl->length++;
69ef89f2 131 return x;
e2641e09 132}
133
134/* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
135void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
136 int i;
137 for (i = 0; i < zsl->level; i++) {
2159782b
PN
138 if (update[i]->level[i].forward == x) {
139 update[i]->level[i].span += x->level[i].span - 1;
140 update[i]->level[i].forward = x->level[i].forward;
e2641e09 141 } else {
2159782b 142 update[i]->level[i].span -= 1;
e2641e09 143 }
144 }
2159782b
PN
145 if (x->level[0].forward) {
146 x->level[0].forward->backward = x->backward;
e2641e09 147 } else {
148 zsl->tail = x->backward;
149 }
2159782b 150 while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
e2641e09 151 zsl->level--;
152 zsl->length--;
153}
154
155/* Delete an element with matching score/object from the skiplist. */
156int zslDelete(zskiplist *zsl, double score, robj *obj) {
157 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
158 int i;
159
160 x = zsl->header;
161 for (i = zsl->level-1; i >= 0; i--) {
2159782b
PN
162 while (x->level[i].forward &&
163 (x->level[i].forward->score < score ||
164 (x->level[i].forward->score == score &&
165 compareStringObjects(x->level[i].forward->obj,obj) < 0)))
166 x = x->level[i].forward;
e2641e09 167 update[i] = x;
168 }
169 /* We may have multiple elements with the same score, what we need
170 * is to find the element with both the right score and object. */
2159782b 171 x = x->level[0].forward;
e2641e09 172 if (x && score == x->score && equalStringObjects(x->obj,obj)) {
173 zslDeleteNode(zsl, x, update);
174 zslFreeNode(x);
175 return 1;
176 } else {
177 return 0; /* not found */
178 }
179 return 0; /* not found */
180}
181
45290ad9 182static int zslValueGteMin(double value, zrangespec *spec) {
22b9bf15
PN
183 return spec->minex ? (value > spec->min) : (value >= spec->min);
184}
185
45290ad9 186static int zslValueLteMax(double value, zrangespec *spec) {
22b9bf15
PN
187 return spec->maxex ? (value < spec->max) : (value <= spec->max);
188}
189
22b9bf15
PN
190/* Returns if there is a part of the zset is in range. */
191int zslIsInRange(zskiplist *zsl, zrangespec *range) {
192 zskiplistNode *x;
193
8e1b3277
PN
194 /* Test for ranges that will always be empty. */
195 if (range->min > range->max ||
196 (range->min == range->max && (range->minex || range->maxex)))
197 return 0;
22b9bf15 198 x = zsl->tail;
45290ad9 199 if (x == NULL || !zslValueGteMin(x->score,range))
22b9bf15
PN
200 return 0;
201 x = zsl->header->level[0].forward;
45290ad9 202 if (x == NULL || !zslValueLteMax(x->score,range))
22b9bf15
PN
203 return 0;
204 return 1;
205}
206
207/* Find the first node that is contained in the specified range.
208 * Returns NULL when no element is contained in the range. */
209zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec range) {
210 zskiplistNode *x;
211 int i;
212
213 /* If everything is out of range, return early. */
214 if (!zslIsInRange(zsl,&range)) return NULL;
215
216 x = zsl->header;
217 for (i = zsl->level-1; i >= 0; i--) {
218 /* Go forward while *OUT* of range. */
219 while (x->level[i].forward &&
45290ad9 220 !zslValueGteMin(x->level[i].forward->score,&range))
22b9bf15
PN
221 x = x->level[i].forward;
222 }
223
e53ca04b 224 /* This is an inner range, so the next node cannot be NULL. */
22b9bf15 225 x = x->level[0].forward;
e53ca04b
PN
226 redisAssert(x != NULL);
227
228 /* Check if score <= max. */
229 if (!zslValueLteMax(x->score,&range)) return NULL;
22b9bf15
PN
230 return x;
231}
232
233/* Find the last node that is contained in the specified range.
234 * Returns NULL when no element is contained in the range. */
235zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec range) {
236 zskiplistNode *x;
237 int i;
238
239 /* If everything is out of range, return early. */
240 if (!zslIsInRange(zsl,&range)) return NULL;
241
242 x = zsl->header;
243 for (i = zsl->level-1; i >= 0; i--) {
244 /* Go forward while *IN* range. */
245 while (x->level[i].forward &&
45290ad9 246 zslValueLteMax(x->level[i].forward->score,&range))
22b9bf15
PN
247 x = x->level[i].forward;
248 }
249
e53ca04b
PN
250 /* This is an inner range, so this node cannot be NULL. */
251 redisAssert(x != NULL);
252
253 /* Check if score >= min. */
254 if (!zslValueGteMin(x->score,&range)) return NULL;
22b9bf15
PN
255 return x;
256}
257
e2641e09 258/* Delete all the elements with score between min and max from the skiplist.
259 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
260 * Note that this function takes the reference to the hash table view of the
261 * sorted set, in order to remove the elements from the hash table too. */
91504b6c 262unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec range, dict *dict) {
e2641e09 263 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
264 unsigned long removed = 0;
265 int i;
266
267 x = zsl->header;
268 for (i = zsl->level-1; i >= 0; i--) {
91504b6c
PN
269 while (x->level[i].forward && (range.minex ?
270 x->level[i].forward->score <= range.min :
271 x->level[i].forward->score < range.min))
272 x = x->level[i].forward;
e2641e09 273 update[i] = x;
274 }
91504b6c
PN
275
276 /* Current node is the last with score < or <= min. */
2159782b 277 x = x->level[0].forward;
91504b6c
PN
278
279 /* Delete nodes while in range. */
280 while (x && (range.maxex ? x->score < range.max : x->score <= range.max)) {
2159782b 281 zskiplistNode *next = x->level[0].forward;
69ef89f2 282 zslDeleteNode(zsl,x,update);
e2641e09 283 dictDelete(dict,x->obj);
284 zslFreeNode(x);
285 removed++;
286 x = next;
287 }
91504b6c 288 return removed;
e2641e09 289}
290
291/* Delete all the elements with rank between start and end from the skiplist.
292 * Start and end are inclusive. Note that start and end need to be 1-based */
293unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict) {
294 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
295 unsigned long traversed = 0, removed = 0;
296 int i;
297
298 x = zsl->header;
299 for (i = zsl->level-1; i >= 0; i--) {
2159782b
PN
300 while (x->level[i].forward && (traversed + x->level[i].span) < start) {
301 traversed += x->level[i].span;
302 x = x->level[i].forward;
e2641e09 303 }
304 update[i] = x;
305 }
306
307 traversed++;
2159782b 308 x = x->level[0].forward;
e2641e09 309 while (x && traversed <= end) {
2159782b 310 zskiplistNode *next = x->level[0].forward;
69ef89f2 311 zslDeleteNode(zsl,x,update);
e2641e09 312 dictDelete(dict,x->obj);
313 zslFreeNode(x);
314 removed++;
315 traversed++;
316 x = next;
317 }
318 return removed;
319}
320
e2641e09 321/* Find the rank for an element by both score and key.
322 * Returns 0 when the element cannot be found, rank otherwise.
323 * Note that the rank is 1-based due to the span of zsl->header to the
324 * first element. */
a3004773 325unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) {
e2641e09 326 zskiplistNode *x;
327 unsigned long rank = 0;
328 int i;
329
330 x = zsl->header;
331 for (i = zsl->level-1; i >= 0; i--) {
2159782b
PN
332 while (x->level[i].forward &&
333 (x->level[i].forward->score < score ||
334 (x->level[i].forward->score == score &&
335 compareStringObjects(x->level[i].forward->obj,o) <= 0))) {
336 rank += x->level[i].span;
337 x = x->level[i].forward;
e2641e09 338 }
339
340 /* x might be equal to zsl->header, so test if obj is non-NULL */
341 if (x->obj && equalStringObjects(x->obj,o)) {
342 return rank;
343 }
344 }
345 return 0;
346}
347
348/* Finds an element by its rank. The rank argument needs to be 1-based. */
a3004773 349zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
e2641e09 350 zskiplistNode *x;
351 unsigned long traversed = 0;
352 int i;
353
354 x = zsl->header;
355 for (i = zsl->level-1; i >= 0; i--) {
2159782b 356 while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
e2641e09 357 {
2159782b
PN
358 traversed += x->level[i].span;
359 x = x->level[i].forward;
e2641e09 360 }
361 if (traversed == rank) {
362 return x;
363 }
364 }
365 return NULL;
366}
367
25bb8a44 368/* Populate the rangespec according to the objects min and max. */
7236fdb2
PN
369static int zslParseRange(robj *min, robj *max, zrangespec *spec) {
370 char *eptr;
25bb8a44
PN
371 spec->minex = spec->maxex = 0;
372
373 /* Parse the min-max interval. If one of the values is prefixed
374 * by the "(" character, it's considered "open". For instance
375 * ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max
376 * ZRANGEBYSCORE zset 1.5 2.5 will instead match min <= x <= max */
377 if (min->encoding == REDIS_ENCODING_INT) {
378 spec->min = (long)min->ptr;
379 } else {
380 if (((char*)min->ptr)[0] == '(') {
7236fdb2
PN
381 spec->min = strtod((char*)min->ptr+1,&eptr);
382 if (eptr[0] != '\0' || isnan(spec->min)) return REDIS_ERR;
25bb8a44
PN
383 spec->minex = 1;
384 } else {
7236fdb2
PN
385 spec->min = strtod((char*)min->ptr,&eptr);
386 if (eptr[0] != '\0' || isnan(spec->min)) return REDIS_ERR;
25bb8a44
PN
387 }
388 }
389 if (max->encoding == REDIS_ENCODING_INT) {
390 spec->max = (long)max->ptr;
391 } else {
392 if (((char*)max->ptr)[0] == '(') {
7236fdb2
PN
393 spec->max = strtod((char*)max->ptr+1,&eptr);
394 if (eptr[0] != '\0' || isnan(spec->max)) return REDIS_ERR;
25bb8a44
PN
395 spec->maxex = 1;
396 } else {
7236fdb2
PN
397 spec->max = strtod((char*)max->ptr,&eptr);
398 if (eptr[0] != '\0' || isnan(spec->max)) return REDIS_ERR;
25bb8a44
PN
399 }
400 }
401
402 return REDIS_OK;
403}
404
21c5b508
PN
405/*-----------------------------------------------------------------------------
406 * Ziplist-backed sorted set API
407 *----------------------------------------------------------------------------*/
408
409double zzlGetScore(unsigned char *sptr) {
410 unsigned char *vstr;
411 unsigned int vlen;
412 long long vlong;
413 char buf[128];
414 double score;
415
416 redisAssert(sptr != NULL);
417 redisAssert(ziplistGet(sptr,&vstr,&vlen,&vlong));
418
419 if (vstr) {
420 memcpy(buf,vstr,vlen);
421 buf[vlen] = '\0';
422 score = strtod(buf,NULL);
423 } else {
424 score = vlong;
425 }
426
427 return score;
428}
429
430/* Compare element in sorted set with given element. */
431int zzlCompareElements(unsigned char *eptr, unsigned char *cstr, unsigned int clen) {
432 unsigned char *vstr;
433 unsigned int vlen;
434 long long vlong;
435 unsigned char vbuf[32];
436 int minlen, cmp;
437
438 redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
439 if (vstr == NULL) {
440 /* Store string representation of long long in buf. */
441 vlen = ll2string((char*)vbuf,sizeof(vbuf),vlong);
442 vstr = vbuf;
443 }
444
445 minlen = (vlen < clen) ? vlen : clen;
446 cmp = memcmp(vstr,cstr,minlen);
447 if (cmp == 0) return vlen-clen;
448 return cmp;
449}
450
bbfe232f 451unsigned int zzlLength(unsigned char *zl) {
0b10e104
PN
452 return ziplistLen(zl)/2;
453}
454
4c5f0966
PN
455/* Move to next entry based on the values in eptr and sptr. Both are set to
456 * NULL when there is no next entry. */
457void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
458 unsigned char *_eptr, *_sptr;
459 redisAssert(*eptr != NULL && *sptr != NULL);
460
461 _eptr = ziplistNext(zl,*sptr);
462 if (_eptr != NULL) {
463 _sptr = ziplistNext(zl,_eptr);
464 redisAssert(_sptr != NULL);
465 } else {
466 /* No next entry. */
467 _sptr = NULL;
468 }
469
470 *eptr = _eptr;
471 *sptr = _sptr;
472}
473
474/* Move to the previous entry based on the values in eptr and sptr. Both are
475 * set to NULL when there is no next entry. */
476void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
477 unsigned char *_eptr, *_sptr;
478 redisAssert(*eptr != NULL && *sptr != NULL);
479
480 _sptr = ziplistPrev(zl,*eptr);
481 if (_sptr != NULL) {
482 _eptr = ziplistPrev(zl,_sptr);
483 redisAssert(_eptr != NULL);
484 } else {
485 /* No previous entry. */
486 _eptr = NULL;
487 }
488
489 *eptr = _eptr;
490 *sptr = _sptr;
491}
492
4a14dbba
PN
493/* Returns if there is a part of the zset is in range. Should only be used
494 * internally by zzlFirstInRange and zzlLastInRange. */
495int zzlIsInRange(unsigned char *zl, zrangespec *range) {
496 unsigned char *p;
497 double score;
498
499 /* Test for ranges that will always be empty. */
500 if (range->min > range->max ||
501 (range->min == range->max && (range->minex || range->maxex)))
502 return 0;
503
504 p = ziplistIndex(zl,-1); /* Last score. */
feb28288 505 if (p == NULL) return 0; /* Empty sorted set */
4a14dbba
PN
506 score = zzlGetScore(p);
507 if (!zslValueGteMin(score,range))
508 return 0;
509
510 p = ziplistIndex(zl,1); /* First score. */
511 redisAssert(p != NULL);
512 score = zzlGetScore(p);
513 if (!zslValueLteMax(score,range))
514 return 0;
515
516 return 1;
517}
518
519/* Find pointer to the first element contained in the specified range.
520 * Returns NULL when no element is contained in the range. */
8588bfa3 521unsigned char *zzlFirstInRange(unsigned char *zl, zrangespec range) {
4a14dbba
PN
522 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
523 double score;
524
525 /* If everything is out of range, return early. */
526 if (!zzlIsInRange(zl,&range)) return NULL;
527
528 while (eptr != NULL) {
529 sptr = ziplistNext(zl,eptr);
530 redisAssert(sptr != NULL);
531
532 score = zzlGetScore(sptr);
e53ca04b
PN
533 if (zslValueGteMin(score,&range)) {
534 /* Check if score <= max. */
535 if (zslValueLteMax(score,&range))
536 return eptr;
537 return NULL;
538 }
4a14dbba
PN
539
540 /* Move to next element. */
541 eptr = ziplistNext(zl,sptr);
542 }
543
544 return NULL;
545}
546
547/* Find pointer to the last element contained in the specified range.
548 * Returns NULL when no element is contained in the range. */
8588bfa3 549unsigned char *zzlLastInRange(unsigned char *zl, zrangespec range) {
4a14dbba
PN
550 unsigned char *eptr = ziplistIndex(zl,-2), *sptr;
551 double score;
552
553 /* If everything is out of range, return early. */
554 if (!zzlIsInRange(zl,&range)) return NULL;
555
556 while (eptr != NULL) {
557 sptr = ziplistNext(zl,eptr);
558 redisAssert(sptr != NULL);
559
560 score = zzlGetScore(sptr);
e53ca04b
PN
561 if (zslValueLteMax(score,&range)) {
562 /* Check if score >= min. */
563 if (zslValueGteMin(score,&range))
564 return eptr;
565 return NULL;
566 }
4a14dbba
PN
567
568 /* Move to previous element by moving to the score of previous element.
569 * When this returns NULL, we know there also is no element. */
570 sptr = ziplistPrev(zl,eptr);
571 if (sptr != NULL)
572 redisAssert((eptr = ziplistPrev(zl,sptr)) != NULL);
573 else
574 eptr = NULL;
575 }
576
577 return NULL;
578}
579
8588bfa3 580unsigned char *zzlFind(unsigned char *zl, robj *ele, double *score) {
21c5b508
PN
581 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
582
583 ele = getDecodedObject(ele);
584 while (eptr != NULL) {
585 sptr = ziplistNext(zl,eptr);
eab0e26e 586 redisAssertWithInfo(NULL,ele,sptr != NULL);
21c5b508
PN
587
588 if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr))) {
589 /* Matching element, pull out score. */
0b10e104 590 if (score != NULL) *score = zzlGetScore(sptr);
21c5b508
PN
591 decrRefCount(ele);
592 return eptr;
593 }
594
595 /* Move to next element. */
596 eptr = ziplistNext(zl,sptr);
597 }
598
599 decrRefCount(ele);
600 return NULL;
601}
602
603/* Delete (element,score) pair from ziplist. Use local copy of eptr because we
604 * don't want to modify the one given as argument. */
8588bfa3 605unsigned char *zzlDelete(unsigned char *zl, unsigned char *eptr) {
21c5b508
PN
606 unsigned char *p = eptr;
607
608 /* TODO: add function to ziplist API to delete N elements from offset. */
609 zl = ziplistDelete(zl,&p);
610 zl = ziplistDelete(zl,&p);
8588bfa3 611 return zl;
21c5b508
PN
612}
613
8588bfa3 614unsigned char *zzlInsertAt(unsigned char *zl, unsigned char *eptr, robj *ele, double score) {
21c5b508
PN
615 unsigned char *sptr;
616 char scorebuf[128];
617 int scorelen;
69298a05 618 size_t offset;
21c5b508 619
eab0e26e 620 redisAssertWithInfo(NULL,ele,ele->encoding == REDIS_ENCODING_RAW);
21c5b508
PN
621 scorelen = d2string(scorebuf,sizeof(scorebuf),score);
622 if (eptr == NULL) {
623 zl = ziplistPush(zl,ele->ptr,sdslen(ele->ptr),ZIPLIST_TAIL);
624 zl = ziplistPush(zl,(unsigned char*)scorebuf,scorelen,ZIPLIST_TAIL);
625 } else {
626 /* Keep offset relative to zl, as it might be re-allocated. */
627 offset = eptr-zl;
628 zl = ziplistInsert(zl,eptr,ele->ptr,sdslen(ele->ptr));
629 eptr = zl+offset;
630
631 /* Insert score after the element. */
eab0e26e 632 redisAssertWithInfo(NULL,ele,(sptr = ziplistNext(zl,eptr)) != NULL);
21c5b508
PN
633 zl = ziplistInsert(zl,sptr,(unsigned char*)scorebuf,scorelen);
634 }
635
8588bfa3 636 return zl;
21c5b508
PN
637}
638
639/* Insert (element,score) pair in ziplist. This function assumes the element is
640 * not yet present in the list. */
8588bfa3 641unsigned char *zzlInsert(unsigned char *zl, robj *ele, double score) {
21c5b508
PN
642 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
643 double s;
21c5b508
PN
644
645 ele = getDecodedObject(ele);
646 while (eptr != NULL) {
647 sptr = ziplistNext(zl,eptr);
eab0e26e 648 redisAssertWithInfo(NULL,ele,sptr != NULL);
21c5b508
PN
649 s = zzlGetScore(sptr);
650
651 if (s > score) {
652 /* First element with score larger than score for element to be
653 * inserted. This means we should take its spot in the list to
654 * maintain ordering. */
8588bfa3 655 zl = zzlInsertAt(zl,eptr,ele,score);
21c5b508 656 break;
8218db3d
PN
657 } else if (s == score) {
658 /* Ensure lexicographical ordering for elements. */
d1c920c5 659 if (zzlCompareElements(eptr,ele->ptr,sdslen(ele->ptr)) > 0) {
8588bfa3 660 zl = zzlInsertAt(zl,eptr,ele,score);
8218db3d
PN
661 break;
662 }
21c5b508
PN
663 }
664
665 /* Move to next element. */
666 eptr = ziplistNext(zl,sptr);
667 }
668
669 /* Push on tail of list when it was not yet inserted. */
8218db3d 670 if (eptr == NULL)
8588bfa3 671 zl = zzlInsertAt(zl,NULL,ele,score);
21c5b508
PN
672
673 decrRefCount(ele);
8588bfa3 674 return zl;
21c5b508 675}
25bb8a44 676
8588bfa3 677unsigned char *zzlDeleteRangeByScore(unsigned char *zl, zrangespec range, unsigned long *deleted) {
4a14dbba
PN
678 unsigned char *eptr, *sptr;
679 double score;
8588bfa3 680 unsigned long num = 0;
4a14dbba 681
8588bfa3 682 if (deleted != NULL) *deleted = 0;
4a14dbba 683
8588bfa3
PN
684 eptr = zzlFirstInRange(zl,range);
685 if (eptr == NULL) return zl;
4a14dbba
PN
686
687 /* When the tail of the ziplist is deleted, eptr will point to the sentinel
688 * byte and ziplistNext will return NULL. */
689 while ((sptr = ziplistNext(zl,eptr)) != NULL) {
690 score = zzlGetScore(sptr);
691 if (zslValueLteMax(score,&range)) {
692 /* Delete both the element and the score. */
693 zl = ziplistDelete(zl,&eptr);
694 zl = ziplistDelete(zl,&eptr);
8588bfa3 695 num++;
4a14dbba
PN
696 } else {
697 /* No longer in range. */
698 break;
699 }
700 }
701
8588bfa3
PN
702 if (deleted != NULL) *deleted = num;
703 return zl;
4a14dbba
PN
704}
705
63b7b7fb
PN
706/* Delete all the elements with rank between start and end from the skiplist.
707 * Start and end are inclusive. Note that start and end need to be 1-based */
8588bfa3 708unsigned char *zzlDeleteRangeByRank(unsigned char *zl, unsigned int start, unsigned int end, unsigned long *deleted) {
63b7b7fb 709 unsigned int num = (end-start)+1;
8588bfa3
PN
710 if (deleted) *deleted = num;
711 zl = ziplistDeleteRange(zl,2*(start-1),2*num);
712 return zl;
63b7b7fb
PN
713}
714
5d1b4fb6
PN
715/*-----------------------------------------------------------------------------
716 * Common sorted set API
717 *----------------------------------------------------------------------------*/
718
df26a0ae 719unsigned int zsetLength(robj *zobj) {
5d1b4fb6
PN
720 int length = -1;
721 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
bbfe232f 722 length = zzlLength(zobj->ptr);
100ed062 723 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
5d1b4fb6
PN
724 length = ((zset*)zobj->ptr)->zsl->length;
725 } else {
726 redisPanic("Unknown sorted set encoding");
727 }
728 return length;
729}
730
df26a0ae 731void zsetConvert(robj *zobj, int encoding) {
a669d5e9
PN
732 zset *zs;
733 zskiplistNode *node, *next;
734 robj *ele;
735 double score;
736
737 if (zobj->encoding == encoding) return;
738 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
739 unsigned char *zl = zobj->ptr;
740 unsigned char *eptr, *sptr;
741 unsigned char *vstr;
742 unsigned int vlen;
743 long long vlong;
744
100ed062 745 if (encoding != REDIS_ENCODING_SKIPLIST)
a669d5e9
PN
746 redisPanic("Unknown target encoding");
747
748 zs = zmalloc(sizeof(*zs));
749 zs->dict = dictCreate(&zsetDictType,NULL);
750 zs->zsl = zslCreate();
751
752 eptr = ziplistIndex(zl,0);
eab0e26e 753 redisAssertWithInfo(NULL,zobj,eptr != NULL);
a669d5e9 754 sptr = ziplistNext(zl,eptr);
eab0e26e 755 redisAssertWithInfo(NULL,zobj,sptr != NULL);
a669d5e9
PN
756
757 while (eptr != NULL) {
758 score = zzlGetScore(sptr);
eab0e26e 759 redisAssertWithInfo(NULL,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
a669d5e9
PN
760 if (vstr == NULL)
761 ele = createStringObjectFromLongLong(vlong);
762 else
763 ele = createStringObject((char*)vstr,vlen);
764
765 /* Has incremented refcount since it was just created. */
766 node = zslInsert(zs->zsl,score,ele);
eab0e26e 767 redisAssertWithInfo(NULL,zobj,dictAdd(zs->dict,ele,&node->score) == DICT_OK);
a669d5e9
PN
768 incrRefCount(ele); /* Added to dictionary. */
769 zzlNext(zl,&eptr,&sptr);
770 }
771
772 zfree(zobj->ptr);
773 zobj->ptr = zs;
100ed062
PN
774 zobj->encoding = REDIS_ENCODING_SKIPLIST;
775 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
a669d5e9
PN
776 unsigned char *zl = ziplistNew();
777
778 if (encoding != REDIS_ENCODING_ZIPLIST)
779 redisPanic("Unknown target encoding");
780
781 /* Approach similar to zslFree(), since we want to free the skiplist at
782 * the same time as creating the ziplist. */
783 zs = zobj->ptr;
784 dictRelease(zs->dict);
785 node = zs->zsl->header->level[0].forward;
786 zfree(zs->zsl->header);
787 zfree(zs->zsl);
788
a669d5e9
PN
789 while (node) {
790 ele = getDecodedObject(node->obj);
8588bfa3 791 zl = zzlInsertAt(zl,NULL,ele,node->score);
a669d5e9
PN
792 decrRefCount(ele);
793
794 next = node->level[0].forward;
795 zslFreeNode(node);
796 node = next;
797 }
798
799 zfree(zs);
8588bfa3 800 zobj->ptr = zl;
a669d5e9
PN
801 zobj->encoding = REDIS_ENCODING_ZIPLIST;
802 } else {
803 redisPanic("Unknown sorted set encoding");
804 }
805}
806
e2641e09 807/*-----------------------------------------------------------------------------
808 * Sorted set commands
809 *----------------------------------------------------------------------------*/
810
69ef89f2 811/* This generic command implements both ZADD and ZINCRBY. */
3ca7532a 812void zaddGenericCommand(redisClient *c, int incr) {
21c5b508 813 static char *nanerr = "resulting score is not a number (NaN)";
3ca7532a
PN
814 robj *key = c->argv[1];
815 robj *ele;
21c5b508
PN
816 robj *zobj;
817 robj *curobj;
ef231a7c 818 double score = 0, *scores, curscore = 0.0;
819 int j, elements = (c->argc-2)/2;
820 int added = 0;
3ca7532a 821
ef231a7c 822 if (c->argc % 2) {
823 addReply(c,shared.syntaxerr);
3ca7532a 824 return;
ef231a7c 825 }
826
827 /* Start parsing all the scores, we need to emit any syntax error
828 * before executing additions to the sorted set, as the command should
829 * either execute fully or nothing at all. */
830 scores = zmalloc(sizeof(double)*elements);
831 for (j = 0; j < elements; j++) {
832 if (getDoubleFromObjectOrReply(c,c->argv[2+j*2],&scores[j],NULL)
833 != REDIS_OK)
834 {
835 zfree(scores);
836 return;
837 }
838 }
21c5b508 839
ef231a7c 840 /* Lookup the key and create the sorted set if does not exist. */
21c5b508
PN
841 zobj = lookupKeyWrite(c->db,key);
842 if (zobj == NULL) {
a669d5e9
PN
843 if (server.zset_max_ziplist_entries == 0 ||
844 server.zset_max_ziplist_value < sdslen(c->argv[3]->ptr))
845 {
846 zobj = createZsetObject();
847 } else {
848 zobj = createZsetZiplistObject();
849 }
21c5b508 850 dbAdd(c->db,key,zobj);
e2641e09 851 } else {
21c5b508 852 if (zobj->type != REDIS_ZSET) {
e2641e09 853 addReply(c,shared.wrongtypeerr);
ef231a7c 854 zfree(scores);
e2641e09 855 return;
856 }
857 }
e2641e09 858
ef231a7c 859 for (j = 0; j < elements; j++) {
860 score = scores[j];
861
862 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
863 unsigned char *eptr;
864
865 /* Prefer non-encoded element when dealing with ziplists. */
866 ele = c->argv[3+j*2];
867 if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
868 if (incr) {
869 score += curscore;
870 if (isnan(score)) {
871 addReplyError(c,nanerr);
872 /* Don't need to check if the sorted set is empty
873 * because we know it has at least one element. */
874 zfree(scores);
875 return;
876 }
21c5b508 877 }
21c5b508 878
ef231a7c 879 /* Remove and re-insert when score changed. */
880 if (score != curscore) {
881 zobj->ptr = zzlDelete(zobj->ptr,eptr);
882 zobj->ptr = zzlInsert(zobj->ptr,ele,score);
883
884 signalModifiedKey(c->db,key);
885 server.dirty++;
886 }
887 } else {
888 /* Optimize: check if the element is too large or the list
889 * becomes too long *before* executing zzlInsert. */
8588bfa3 890 zobj->ptr = zzlInsert(zobj->ptr,ele,score);
ef231a7c 891 if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries)
892 zsetConvert(zobj,REDIS_ENCODING_SKIPLIST);
893 if (sdslen(ele->ptr) > server.zset_max_ziplist_value)
894 zsetConvert(zobj,REDIS_ENCODING_SKIPLIST);
21c5b508
PN
895
896 signalModifiedKey(c->db,key);
897 server.dirty++;
ef231a7c 898 if (!incr) added++;
21c5b508 899 }
ef231a7c 900 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
901 zset *zs = zobj->ptr;
902 zskiplistNode *znode;
903 dictEntry *de;
904
905 ele = c->argv[3+j*2] = tryObjectEncoding(c->argv[3+j*2]);
906 de = dictFind(zs->dict,ele);
907 if (de != NULL) {
c0ba9ebe 908 curobj = dictGetKey(de);
909 curscore = *(double*)dictGetVal(de);
ef231a7c 910
911 if (incr) {
912 score += curscore;
913 if (isnan(score)) {
914 addReplyError(c,nanerr);
915 /* Don't need to check if the sorted set is empty
916 * because we know it has at least one element. */
917 zfree(scores);
918 return;
919 }
21c5b508 920 }
21c5b508 921
ef231a7c 922 /* Remove and re-insert when score changed. We can safely
923 * delete the key object from the skiplist, since the
924 * dictionary still has a reference to it. */
925 if (score != curscore) {
eab0e26e 926 redisAssertWithInfo(c,curobj,zslDelete(zs->zsl,curscore,curobj));
ef231a7c 927 znode = zslInsert(zs->zsl,score,curobj);
928 incrRefCount(curobj); /* Re-inserted in skiplist. */
c0ba9ebe 929 dictGetVal(de) = &znode->score; /* Update score ptr. */
ef231a7c 930
931 signalModifiedKey(c->db,key);
932 server.dirty++;
933 }
934 } else {
935 znode = zslInsert(zs->zsl,score,ele);
936 incrRefCount(ele); /* Inserted in skiplist. */
f013f400 937 redisAssertWithInfo(c,NULL,dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
ef231a7c 938 incrRefCount(ele); /* Added to dictionary. */
21c5b508
PN
939
940 signalModifiedKey(c->db,key);
941 server.dirty++;
ef231a7c 942 if (!incr) added++;
21c5b508 943 }
21c5b508 944 } else {
ef231a7c 945 redisPanic("Unknown sorted set encoding");
e2641e09 946 }
e2641e09 947 }
ef231a7c 948 zfree(scores);
949 if (incr) /* ZINCRBY */
950 addReplyDouble(c,score);
951 else /* ZADD */
952 addReplyLongLong(c,added);
e2641e09 953}
954
955void zaddCommand(redisClient *c) {
3ca7532a 956 zaddGenericCommand(c,0);
e2641e09 957}
958
959void zincrbyCommand(redisClient *c) {
3ca7532a 960 zaddGenericCommand(c,1);
e2641e09 961}
962
963void zremCommand(redisClient *c) {
0b10e104 964 robj *key = c->argv[1];
0b10e104 965 robj *zobj;
3f7b2b1f 966 int deleted = 0, j;
0b10e104
PN
967
968 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
969 checkType(c,zobj,REDIS_ZSET)) return;
970
971 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
972 unsigned char *eptr;
973
3f7b2b1f 974 for (j = 2; j < c->argc; j++) {
975 if ((eptr = zzlFind(zobj->ptr,c->argv[j],NULL)) != NULL) {
976 deleted++;
977 zobj->ptr = zzlDelete(zobj->ptr,eptr);
978 if (zzlLength(zobj->ptr) == 0) {
979 dbDelete(c->db,key);
980 break;
981 }
982 }
0b10e104 983 }
100ed062 984 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
0b10e104
PN
985 zset *zs = zobj->ptr;
986 dictEntry *de;
987 double score;
988
3f7b2b1f 989 for (j = 2; j < c->argc; j++) {
990 de = dictFind(zs->dict,c->argv[j]);
991 if (de != NULL) {
992 deleted++;
993
994 /* Delete from the skiplist */
c0ba9ebe 995 score = *(double*)dictGetVal(de);
eab0e26e 996 redisAssertWithInfo(c,c->argv[j],zslDelete(zs->zsl,score,c->argv[j]));
3f7b2b1f 997
998 /* Delete from the hash table */
999 dictDelete(zs->dict,c->argv[j]);
1000 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
1001 if (dictSize(zs->dict) == 0) {
1002 dbDelete(c->db,key);
1003 break;
1004 }
1005 }
0b10e104
PN
1006 }
1007 } else {
1008 redisPanic("Unknown sorted set encoding");
e2641e09 1009 }
e2641e09 1010
3f7b2b1f 1011 if (deleted) {
1012 signalModifiedKey(c->db,key);
1013 server.dirty += deleted;
1014 }
1015 addReplyLongLong(c,deleted);
e2641e09 1016}
1017
1018void zremrangebyscoreCommand(redisClient *c) {
4a14dbba
PN
1019 robj *key = c->argv[1];
1020 robj *zobj;
91504b6c 1021 zrangespec range;
4a14dbba 1022 unsigned long deleted;
e2641e09 1023
91504b6c 1024 /* Parse the range arguments. */
7236fdb2 1025 if (zslParseRange(c->argv[2],c->argv[3],&range) != REDIS_OK) {
d93f9a86 1026 addReplyError(c,"min or max is not a float");
7236fdb2
PN
1027 return;
1028 }
e2641e09 1029
4a14dbba
PN
1030 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
1031 checkType(c,zobj,REDIS_ZSET)) return;
1032
1033 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
8588bfa3 1034 zobj->ptr = zzlDeleteRangeByScore(zobj->ptr,range,&deleted);
48991620 1035 if (zzlLength(zobj->ptr) == 0) dbDelete(c->db,key);
100ed062 1036 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
4a14dbba
PN
1037 zset *zs = zobj->ptr;
1038 deleted = zslDeleteRangeByScore(zs->zsl,range,zs->dict);
1039 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
1040 if (dictSize(zs->dict) == 0) dbDelete(c->db,key);
1041 } else {
1042 redisPanic("Unknown sorted set encoding");
1043 }
e2641e09 1044
4a14dbba 1045 if (deleted) signalModifiedKey(c->db,key);
e2641e09 1046 server.dirty += deleted;
1047 addReplyLongLong(c,deleted);
1048}
1049
1050void zremrangebyrankCommand(redisClient *c) {
63b7b7fb
PN
1051 robj *key = c->argv[1];
1052 robj *zobj;
e2641e09 1053 long start;
1054 long end;
1055 int llen;
63b7b7fb 1056 unsigned long deleted;
e2641e09 1057
1058 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
1059 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
1060
63b7b7fb
PN
1061 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
1062 checkType(c,zobj,REDIS_ZSET)) return;
e2641e09 1063
63b7b7fb 1064 /* Sanitize indexes. */
df26a0ae 1065 llen = zsetLength(zobj);
e2641e09 1066 if (start < 0) start = llen+start;
1067 if (end < 0) end = llen+end;
1068 if (start < 0) start = 0;
e2641e09 1069
d0a4e24e
PN
1070 /* Invariant: start >= 0, so this test will be true when end < 0.
1071 * The range is empty when start > end or start >= length. */
e2641e09 1072 if (start > end || start >= llen) {
1073 addReply(c,shared.czero);
1074 return;
1075 }
1076 if (end >= llen) end = llen-1;
1077
63b7b7fb
PN
1078 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1079 /* Correct for 1-based rank. */
8588bfa3 1080 zobj->ptr = zzlDeleteRangeByRank(zobj->ptr,start+1,end+1,&deleted);
48991620 1081 if (zzlLength(zobj->ptr) == 0) dbDelete(c->db,key);
100ed062 1082 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
63b7b7fb
PN
1083 zset *zs = zobj->ptr;
1084
1085 /* Correct for 1-based rank. */
1086 deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);
1087 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
1088 if (dictSize(zs->dict) == 0) dbDelete(c->db,key);
1089 } else {
1090 redisPanic("Unknown sorted set encoding");
1091 }
1092
1093 if (deleted) signalModifiedKey(c->db,key);
e2641e09 1094 server.dirty += deleted;
63b7b7fb 1095 addReplyLongLong(c,deleted);
e2641e09 1096}
1097
1098typedef struct {
56ce42fa
PN
1099 robj *subject;
1100 int type; /* Set, sorted set */
1101 int encoding;
e2641e09 1102 double weight;
56ce42fa
PN
1103
1104 union {
1105 /* Set iterators. */
1106 union _iterset {
1107 struct {
1108 intset *is;
1109 int ii;
1110 } is;
1111 struct {
1112 dict *dict;
1113 dictIterator *di;
1114 dictEntry *de;
1115 } ht;
1116 } set;
1117
1118 /* Sorted set iterators. */
1119 union _iterzset {
1120 struct {
1121 unsigned char *zl;
1122 unsigned char *eptr, *sptr;
1123 } zl;
1124 struct {
1125 zset *zs;
1126 zskiplistNode *node;
1127 } sl;
1128 } zset;
1129 } iter;
e2641e09 1130} zsetopsrc;
1131
56ce42fa
PN
1132
1133/* Use dirty flags for pointers that need to be cleaned up in the next
1134 * iteration over the zsetopval. The dirty flag for the long long value is
1135 * special, since long long values don't need cleanup. Instead, it means that
1136 * we already checked that "ell" holds a long long, or tried to convert another
1137 * representation into a long long value. When this was successful,
1138 * OPVAL_VALID_LL is set as well. */
1139#define OPVAL_DIRTY_ROBJ 1
1140#define OPVAL_DIRTY_LL 2
1141#define OPVAL_VALID_LL 4
1142
1143/* Store value retrieved from the iterator. */
1144typedef struct {
1145 int flags;
1146 unsigned char _buf[32]; /* Private buffer. */
1147 robj *ele;
1148 unsigned char *estr;
1149 unsigned int elen;
1150 long long ell;
1151 double score;
1152} zsetopval;
1153
1154typedef union _iterset iterset;
1155typedef union _iterzset iterzset;
1156
1157void zuiInitIterator(zsetopsrc *op) {
1158 if (op->subject == NULL)
1159 return;
1160
1161 if (op->type == REDIS_SET) {
1162 iterset *it = &op->iter.set;
1163 if (op->encoding == REDIS_ENCODING_INTSET) {
1164 it->is.is = op->subject->ptr;
1165 it->is.ii = 0;
1166 } else if (op->encoding == REDIS_ENCODING_HT) {
1167 it->ht.dict = op->subject->ptr;
1168 it->ht.di = dictGetIterator(op->subject->ptr);
1169 it->ht.de = dictNext(it->ht.di);
1170 } else {
1171 redisPanic("Unknown set encoding");
1172 }
1173 } else if (op->type == REDIS_ZSET) {
1174 iterzset *it = &op->iter.zset;
1175 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1176 it->zl.zl = op->subject->ptr;
1177 it->zl.eptr = ziplistIndex(it->zl.zl,0);
1178 if (it->zl.eptr != NULL) {
1179 it->zl.sptr = ziplistNext(it->zl.zl,it->zl.eptr);
1180 redisAssert(it->zl.sptr != NULL);
1181 }
100ed062 1182 } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
56ce42fa
PN
1183 it->sl.zs = op->subject->ptr;
1184 it->sl.node = it->sl.zs->zsl->header->level[0].forward;
1185 } else {
1186 redisPanic("Unknown sorted set encoding");
1187 }
1188 } else {
1189 redisPanic("Unsupported type");
1190 }
1191}
1192
1193void zuiClearIterator(zsetopsrc *op) {
1194 if (op->subject == NULL)
1195 return;
1196
1197 if (op->type == REDIS_SET) {
1198 iterset *it = &op->iter.set;
1199 if (op->encoding == REDIS_ENCODING_INTSET) {
1200 REDIS_NOTUSED(it); /* skip */
1201 } else if (op->encoding == REDIS_ENCODING_HT) {
1202 dictReleaseIterator(it->ht.di);
1203 } else {
1204 redisPanic("Unknown set encoding");
1205 }
1206 } else if (op->type == REDIS_ZSET) {
1207 iterzset *it = &op->iter.zset;
1208 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1209 REDIS_NOTUSED(it); /* skip */
100ed062 1210 } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
56ce42fa
PN
1211 REDIS_NOTUSED(it); /* skip */
1212 } else {
1213 redisPanic("Unknown sorted set encoding");
1214 }
1215 } else {
1216 redisPanic("Unsupported type");
1217 }
1218}
1219
1220int zuiLength(zsetopsrc *op) {
1221 if (op->subject == NULL)
1222 return 0;
1223
1224 if (op->type == REDIS_SET) {
1225 iterset *it = &op->iter.set;
1226 if (op->encoding == REDIS_ENCODING_INTSET) {
1227 return intsetLen(it->is.is);
1228 } else if (op->encoding == REDIS_ENCODING_HT) {
1229 return dictSize(it->ht.dict);
1230 } else {
1231 redisPanic("Unknown set encoding");
1232 }
1233 } else if (op->type == REDIS_ZSET) {
1234 iterzset *it = &op->iter.zset;
1235 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1236 return zzlLength(it->zl.zl);
100ed062 1237 } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
56ce42fa
PN
1238 return it->sl.zs->zsl->length;
1239 } else {
1240 redisPanic("Unknown sorted set encoding");
1241 }
1242 } else {
1243 redisPanic("Unsupported type");
1244 }
1245}
1246
1247/* Check if the current value is valid. If so, store it in the passed structure
1248 * and move to the next element. If not valid, this means we have reached the
1249 * end of the structure and can abort. */
1250int zuiNext(zsetopsrc *op, zsetopval *val) {
1251 if (op->subject == NULL)
1252 return 0;
1253
1254 if (val->flags & OPVAL_DIRTY_ROBJ)
1255 decrRefCount(val->ele);
1256
fe7be460 1257 memset(val,0,sizeof(zsetopval));
56ce42fa
PN
1258
1259 if (op->type == REDIS_SET) {
1260 iterset *it = &op->iter.set;
1261 if (op->encoding == REDIS_ENCODING_INTSET) {
9de5d460 1262 int64_t ell = val->ell;
1263
1264 if (!intsetGet(it->is.is,it->is.ii,&ell))
56ce42fa
PN
1265 return 0;
1266 val->score = 1.0;
1267
1268 /* Move to next element. */
1269 it->is.ii++;
1270 } else if (op->encoding == REDIS_ENCODING_HT) {
1271 if (it->ht.de == NULL)
1272 return 0;
c0ba9ebe 1273 val->ele = dictGetKey(it->ht.de);
56ce42fa
PN
1274 val->score = 1.0;
1275
1276 /* Move to next element. */
1277 it->ht.de = dictNext(it->ht.di);
1278 } else {
1279 redisPanic("Unknown set encoding");
1280 }
1281 } else if (op->type == REDIS_ZSET) {
1282 iterzset *it = &op->iter.zset;
1283 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1284 /* No need to check both, but better be explicit. */
1285 if (it->zl.eptr == NULL || it->zl.sptr == NULL)
1286 return 0;
1287 redisAssert(ziplistGet(it->zl.eptr,&val->estr,&val->elen,&val->ell));
1288 val->score = zzlGetScore(it->zl.sptr);
1289
1290 /* Move to next element. */
1291 zzlNext(it->zl.zl,&it->zl.eptr,&it->zl.sptr);
100ed062 1292 } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
56ce42fa
PN
1293 if (it->sl.node == NULL)
1294 return 0;
1295 val->ele = it->sl.node->obj;
1296 val->score = it->sl.node->score;
1297
1298 /* Move to next element. */
1299 it->sl.node = it->sl.node->level[0].forward;
1300 } else {
1301 redisPanic("Unknown sorted set encoding");
1302 }
1303 } else {
1304 redisPanic("Unsupported type");
1305 }
1306 return 1;
1307}
1308
1309int zuiLongLongFromValue(zsetopval *val) {
1310 if (!(val->flags & OPVAL_DIRTY_LL)) {
1311 val->flags |= OPVAL_DIRTY_LL;
1312
1313 if (val->ele != NULL) {
1314 if (val->ele->encoding == REDIS_ENCODING_INT) {
1315 val->ell = (long)val->ele->ptr;
1316 val->flags |= OPVAL_VALID_LL;
1317 } else if (val->ele->encoding == REDIS_ENCODING_RAW) {
1318 if (string2ll(val->ele->ptr,sdslen(val->ele->ptr),&val->ell))
1319 val->flags |= OPVAL_VALID_LL;
1320 } else {
1321 redisPanic("Unsupported element encoding");
1322 }
1323 } else if (val->estr != NULL) {
1324 if (string2ll((char*)val->estr,val->elen,&val->ell))
1325 val->flags |= OPVAL_VALID_LL;
1326 } else {
1327 /* The long long was already set, flag as valid. */
1328 val->flags |= OPVAL_VALID_LL;
1329 }
1330 }
1331 return val->flags & OPVAL_VALID_LL;
1332}
1333
1334robj *zuiObjectFromValue(zsetopval *val) {
1335 if (val->ele == NULL) {
1336 if (val->estr != NULL) {
1337 val->ele = createStringObject((char*)val->estr,val->elen);
1338 } else {
1339 val->ele = createStringObjectFromLongLong(val->ell);
1340 }
1341 val->flags |= OPVAL_DIRTY_ROBJ;
1342 }
1343 return val->ele;
1344}
1345
1346int zuiBufferFromValue(zsetopval *val) {
1347 if (val->estr == NULL) {
1348 if (val->ele != NULL) {
1349 if (val->ele->encoding == REDIS_ENCODING_INT) {
1350 val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),(long)val->ele->ptr);
1351 val->estr = val->_buf;
1352 } else if (val->ele->encoding == REDIS_ENCODING_RAW) {
1353 val->elen = sdslen(val->ele->ptr);
1354 val->estr = val->ele->ptr;
1355 } else {
1356 redisPanic("Unsupported element encoding");
1357 }
1358 } else {
1359 val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),val->ell);
1360 val->estr = val->_buf;
1361 }
1362 }
1363 return 1;
1364}
1365
1366/* Find value pointed to by val in the source pointer to by op. When found,
1367 * return 1 and store its score in target. Return 0 otherwise. */
1368int zuiFind(zsetopsrc *op, zsetopval *val, double *score) {
1369 if (op->subject == NULL)
1370 return 0;
1371
1372 if (op->type == REDIS_SET) {
1373 iterset *it = &op->iter.set;
1374
1375 if (op->encoding == REDIS_ENCODING_INTSET) {
1376 if (zuiLongLongFromValue(val) && intsetFind(it->is.is,val->ell)) {
1377 *score = 1.0;
1378 return 1;
1379 } else {
1380 return 0;
1381 }
1382 } else if (op->encoding == REDIS_ENCODING_HT) {
1383 zuiObjectFromValue(val);
1384 if (dictFind(it->ht.dict,val->ele) != NULL) {
1385 *score = 1.0;
1386 return 1;
1387 } else {
1388 return 0;
1389 }
1390 } else {
1391 redisPanic("Unknown set encoding");
1392 }
1393 } else if (op->type == REDIS_ZSET) {
1394 iterzset *it = &op->iter.zset;
1395 zuiObjectFromValue(val);
1396
1397 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
8588bfa3 1398 if (zzlFind(it->zl.zl,val->ele,score) != NULL) {
56ce42fa
PN
1399 /* Score is already set by zzlFind. */
1400 return 1;
1401 } else {
1402 return 0;
1403 }
100ed062 1404 } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
56ce42fa
PN
1405 dictEntry *de;
1406 if ((de = dictFind(it->sl.zs->dict,val->ele)) != NULL) {
c0ba9ebe 1407 *score = *(double*)dictGetVal(de);
56ce42fa
PN
1408 return 1;
1409 } else {
1410 return 0;
1411 }
1412 } else {
1413 redisPanic("Unknown sorted set encoding");
1414 }
1415 } else {
1416 redisPanic("Unsupported type");
1417 }
1418}
1419
1420int zuiCompareByCardinality(const void *s1, const void *s2) {
1421 return zuiLength((zsetopsrc*)s1) - zuiLength((zsetopsrc*)s2);
e2641e09 1422}
1423
1424#define REDIS_AGGR_SUM 1
1425#define REDIS_AGGR_MIN 2
1426#define REDIS_AGGR_MAX 3
c0ba9ebe 1427#define zunionInterDictValue(_e) (dictGetVal(_e) == NULL ? 1.0 : *(double*)dictGetVal(_e))
e2641e09 1428
1429inline static void zunionInterAggregate(double *target, double val, int aggregate) {
1430 if (aggregate == REDIS_AGGR_SUM) {
1431 *target = *target + val;
d9e28bcf
PN
1432 /* The result of adding two doubles is NaN when one variable
1433 * is +inf and the other is -inf. When these numbers are added,
1434 * we maintain the convention of the result being 0.0. */
1435 if (isnan(*target)) *target = 0.0;
e2641e09 1436 } else if (aggregate == REDIS_AGGR_MIN) {
1437 *target = val < *target ? val : *target;
1438 } else if (aggregate == REDIS_AGGR_MAX) {
1439 *target = val > *target ? val : *target;
1440 } else {
1441 /* safety net */
1442 redisPanic("Unknown ZUNION/INTER aggregate type");
1443 }
1444}
1445
1446void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
706b32e0
B
1447 int i, j;
1448 long setnum;
e2641e09 1449 int aggregate = REDIS_AGGR_SUM;
1450 zsetopsrc *src;
56ce42fa
PN
1451 zsetopval zval;
1452 robj *tmp;
255eebe2 1453 unsigned int maxelelen = 0;
e2641e09 1454 robj *dstobj;
1455 zset *dstzset;
69ef89f2 1456 zskiplistNode *znode;
8c1420ff 1457 int touched = 0;
e2641e09 1458
1459 /* expect setnum input keys to be given */
706b32e0
B
1460 if ((getLongFromObjectOrReply(c, c->argv[2], &setnum, NULL) != REDIS_OK))
1461 return;
1462
e2641e09 1463 if (setnum < 1) {
3ab20376
PN
1464 addReplyError(c,
1465 "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE");
e2641e09 1466 return;
1467 }
1468
1469 /* test if the expected number of keys would overflow */
1470 if (3+setnum > c->argc) {
1471 addReply(c,shared.syntaxerr);
1472 return;
1473 }
1474
1475 /* read keys to be used for input */
56ce42fa 1476 src = zcalloc(sizeof(zsetopsrc) * setnum);
e2641e09 1477 for (i = 0, j = 3; i < setnum; i++, j++) {
1478 robj *obj = lookupKeyWrite(c->db,c->argv[j]);
56ce42fa
PN
1479 if (obj != NULL) {
1480 if (obj->type != REDIS_ZSET && obj->type != REDIS_SET) {
e2641e09 1481 zfree(src);
1482 addReply(c,shared.wrongtypeerr);
1483 return;
1484 }
56ce42fa
PN
1485
1486 src[i].subject = obj;
1487 src[i].type = obj->type;
1488 src[i].encoding = obj->encoding;
1489 } else {
1490 src[i].subject = NULL;
e2641e09 1491 }
1492
56ce42fa 1493 /* Default all weights to 1. */
e2641e09 1494 src[i].weight = 1.0;
1495 }
1496
1497 /* parse optional extra arguments */
1498 if (j < c->argc) {
1499 int remaining = c->argc - j;
1500
1501 while (remaining) {
1502 if (remaining >= (setnum + 1) && !strcasecmp(c->argv[j]->ptr,"weights")) {
1503 j++; remaining--;
1504 for (i = 0; i < setnum; i++, j++, remaining--) {
673e1fb7 1505 if (getDoubleFromObjectOrReply(c,c->argv[j],&src[i].weight,
d93f9a86 1506 "weight value is not a float") != REDIS_OK)
673e1fb7
PN
1507 {
1508 zfree(src);
e2641e09 1509 return;
673e1fb7 1510 }
e2641e09 1511 }
1512 } else if (remaining >= 2 && !strcasecmp(c->argv[j]->ptr,"aggregate")) {
1513 j++; remaining--;
1514 if (!strcasecmp(c->argv[j]->ptr,"sum")) {
1515 aggregate = REDIS_AGGR_SUM;
1516 } else if (!strcasecmp(c->argv[j]->ptr,"min")) {
1517 aggregate = REDIS_AGGR_MIN;
1518 } else if (!strcasecmp(c->argv[j]->ptr,"max")) {
1519 aggregate = REDIS_AGGR_MAX;
1520 } else {
1521 zfree(src);
1522 addReply(c,shared.syntaxerr);
1523 return;
1524 }
1525 j++; remaining--;
1526 } else {
1527 zfree(src);
1528 addReply(c,shared.syntaxerr);
1529 return;
1530 }
1531 }
1532 }
1533
56ce42fa
PN
1534 for (i = 0; i < setnum; i++)
1535 zuiInitIterator(&src[i]);
1536
e2641e09 1537 /* sort sets from the smallest to largest, this will improve our
1538 * algorithm's performance */
56ce42fa 1539 qsort(src,setnum,sizeof(zsetopsrc),zuiCompareByCardinality);
e2641e09 1540
1541 dstobj = createZsetObject();
1542 dstzset = dstobj->ptr;
02e60065 1543 memset(&zval, 0, sizeof(zval));
e2641e09 1544
1545 if (op == REDIS_OP_INTER) {
56ce42fa
PN
1546 /* Skip everything if the smallest input is empty. */
1547 if (zuiLength(&src[0]) > 0) {
1548 /* Precondition: as src[0] is non-empty and the inputs are ordered
1549 * by size, all src[i > 0] are non-empty too. */
1550 while (zuiNext(&src[0],&zval)) {
d433ebc6 1551 double score, value;
e2641e09 1552
56ce42fa 1553 score = src[0].weight * zval.score;
256356ff 1554 if (isnan(score)) score = 0;
1555
e2641e09 1556 for (j = 1; j < setnum; j++) {
d200342a 1557 /* It is not safe to access the zset we are
cb16b6c3 1558 * iterating, so explicitly check for equal object. */
d070abe4 1559 if (src[j].subject == src[0].subject) {
1560 value = zval.score*src[j].weight;
1561 zunionInterAggregate(&score,value,aggregate);
1562 } else if (zuiFind(&src[j],&zval,&value)) {
56ce42fa 1563 value *= src[j].weight;
d433ebc6 1564 zunionInterAggregate(&score,value,aggregate);
e2641e09 1565 } else {
1566 break;
1567 }
1568 }
1569
56ce42fa 1570 /* Only continue when present in every input. */
d433ebc6 1571 if (j == setnum) {
56ce42fa
PN
1572 tmp = zuiObjectFromValue(&zval);
1573 znode = zslInsert(dstzset->zsl,score,tmp);
1574 incrRefCount(tmp); /* added to skiplist */
1575 dictAdd(dstzset->dict,tmp,&znode->score);
1576 incrRefCount(tmp); /* added to dictionary */
255eebe2
PN
1577
1578 if (tmp->encoding == REDIS_ENCODING_RAW)
1579 if (sdslen(tmp->ptr) > maxelelen)
1580 maxelelen = sdslen(tmp->ptr);
e2641e09 1581 }
1582 }
e2641e09 1583 }
1584 } else if (op == REDIS_OP_UNION) {
1585 for (i = 0; i < setnum; i++) {
521ddcce 1586 if (zuiLength(&src[i]) == 0)
56ce42fa 1587 continue;
e2641e09 1588
56ce42fa 1589 while (zuiNext(&src[i],&zval)) {
d433ebc6
PN
1590 double score, value;
1591
56ce42fa
PN
1592 /* Skip key when already processed */
1593 if (dictFind(dstzset->dict,zuiObjectFromValue(&zval)) != NULL)
d433ebc6 1594 continue;
e2641e09 1595
56ce42fa
PN
1596 /* Initialize score */
1597 score = src[i].weight * zval.score;
256356ff 1598 if (isnan(score)) score = 0;
e2641e09 1599
56ce42fa
PN
1600 /* Because the inputs are sorted by size, it's only possible
1601 * for sets at larger indices to hold this element. */
e2641e09 1602 for (j = (i+1); j < setnum; j++) {
d200342a 1603 /* It is not safe to access the zset we are
cb16b6c3 1604 * iterating, so explicitly check for equal object. */
1605 if(src[j].subject == src[i].subject) {
1606 value = zval.score*src[j].weight;
1607 zunionInterAggregate(&score,value,aggregate);
1608 } else if (zuiFind(&src[j],&zval,&value)) {
56ce42fa 1609 value *= src[j].weight;
d433ebc6 1610 zunionInterAggregate(&score,value,aggregate);
e2641e09 1611 }
1612 }
1613
56ce42fa
PN
1614 tmp = zuiObjectFromValue(&zval);
1615 znode = zslInsert(dstzset->zsl,score,tmp);
1616 incrRefCount(zval.ele); /* added to skiplist */
1617 dictAdd(dstzset->dict,tmp,&znode->score);
1618 incrRefCount(zval.ele); /* added to dictionary */
255eebe2
PN
1619
1620 if (tmp->encoding == REDIS_ENCODING_RAW)
1621 if (sdslen(tmp->ptr) > maxelelen)
1622 maxelelen = sdslen(tmp->ptr);
e2641e09 1623 }
e2641e09 1624 }
1625 } else {
56ce42fa 1626 redisPanic("Unknown operator");
e2641e09 1627 }
1628
56ce42fa
PN
1629 for (i = 0; i < setnum; i++)
1630 zuiClearIterator(&src[i]);
1631
8c1420ff 1632 if (dbDelete(c->db,dstkey)) {
cea8c5cd 1633 signalModifiedKey(c->db,dstkey);
8c1420ff 1634 touched = 1;
1635 server.dirty++;
1636 }
e2641e09 1637 if (dstzset->zsl->length) {
255eebe2
PN
1638 /* Convert to ziplist when in limits. */
1639 if (dstzset->zsl->length <= server.zset_max_ziplist_entries &&
1640 maxelelen <= server.zset_max_ziplist_value)
df26a0ae 1641 zsetConvert(dstobj,REDIS_ENCODING_ZIPLIST);
255eebe2 1642
e2641e09 1643 dbAdd(c->db,dstkey,dstobj);
df26a0ae 1644 addReplyLongLong(c,zsetLength(dstobj));
cea8c5cd 1645 if (!touched) signalModifiedKey(c->db,dstkey);
cbf7e107 1646 server.dirty++;
e2641e09 1647 } else {
1648 decrRefCount(dstobj);
255eebe2 1649 addReply(c,shared.czero);
e2641e09 1650 }
1651 zfree(src);
1652}
1653
1654void zunionstoreCommand(redisClient *c) {
1655 zunionInterGenericCommand(c,c->argv[1], REDIS_OP_UNION);
1656}
1657
1658void zinterstoreCommand(redisClient *c) {
1659 zunionInterGenericCommand(c,c->argv[1], REDIS_OP_INTER);
1660}
1661
1662void zrangeGenericCommand(redisClient *c, int reverse) {
5d1b4fb6
PN
1663 robj *key = c->argv[1];
1664 robj *zobj;
1665 int withscores = 0;
e2641e09 1666 long start;
1667 long end;
e2641e09 1668 int llen;
5d1b4fb6 1669 int rangelen;
e2641e09 1670
1671 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
1672 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
1673
1674 if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
1675 withscores = 1;
1676 } else if (c->argc >= 5) {
1677 addReply(c,shared.syntaxerr);
1678 return;
1679 }
1680
5d1b4fb6
PN
1681 if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL
1682 || checkType(c,zobj,REDIS_ZSET)) return;
e2641e09 1683
5d1b4fb6 1684 /* Sanitize indexes. */
df26a0ae 1685 llen = zsetLength(zobj);
e2641e09 1686 if (start < 0) start = llen+start;
1687 if (end < 0) end = llen+end;
1688 if (start < 0) start = 0;
e2641e09 1689
d0a4e24e
PN
1690 /* Invariant: start >= 0, so this test will be true when end < 0.
1691 * The range is empty when start > end or start >= length. */
e2641e09 1692 if (start > end || start >= llen) {
e2641e09 1693 addReply(c,shared.emptymultibulk);
1694 return;
1695 }
1696 if (end >= llen) end = llen-1;
1697 rangelen = (end-start)+1;
1698
e2641e09 1699 /* Return the result in form of a multi-bulk reply */
5d1b4fb6
PN
1700 addReplyMultiBulkLen(c, withscores ? (rangelen*2) : rangelen);
1701
1702 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1703 unsigned char *zl = zobj->ptr;
1704 unsigned char *eptr, *sptr;
1705 unsigned char *vstr;
1706 unsigned int vlen;
1707 long long vlong;
1708
1709 if (reverse)
1710 eptr = ziplistIndex(zl,-2-(2*start));
1711 else
1712 eptr = ziplistIndex(zl,2*start);
1713
eab0e26e 1714 redisAssertWithInfo(c,zobj,eptr != NULL);
4c5f0966
PN
1715 sptr = ziplistNext(zl,eptr);
1716
5d1b4fb6 1717 while (rangelen--) {
eab0e26e 1718 redisAssertWithInfo(c,zobj,eptr != NULL && sptr != NULL);
1719 redisAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
5d1b4fb6
PN
1720 if (vstr == NULL)
1721 addReplyBulkLongLong(c,vlong);
1722 else
1723 addReplyBulkCBuffer(c,vstr,vlen);
1724
4c5f0966 1725 if (withscores)
5d1b4fb6 1726 addReplyDouble(c,zzlGetScore(sptr));
5d1b4fb6 1727
4c5f0966
PN
1728 if (reverse)
1729 zzlPrev(zl,&eptr,&sptr);
1730 else
1731 zzlNext(zl,&eptr,&sptr);
5d1b4fb6
PN
1732 }
1733
100ed062 1734 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
5d1b4fb6
PN
1735 zset *zs = zobj->ptr;
1736 zskiplist *zsl = zs->zsl;
1737 zskiplistNode *ln;
1738 robj *ele;
1739
1740 /* Check if starting point is trivial, before doing log(N) lookup. */
1741 if (reverse) {
1742 ln = zsl->tail;
1743 if (start > 0)
1744 ln = zslGetElementByRank(zsl,llen-start);
1745 } else {
1746 ln = zsl->header->level[0].forward;
1747 if (start > 0)
1748 ln = zslGetElementByRank(zsl,start+1);
1749 }
1750
1751 while(rangelen--) {
eab0e26e 1752 redisAssertWithInfo(c,zobj,ln != NULL);
5d1b4fb6
PN
1753 ele = ln->obj;
1754 addReplyBulk(c,ele);
1755 if (withscores)
1756 addReplyDouble(c,ln->score);
1757 ln = reverse ? ln->backward : ln->level[0].forward;
1758 }
1759 } else {
1760 redisPanic("Unknown sorted set encoding");
e2641e09 1761 }
1762}
1763
1764void zrangeCommand(redisClient *c) {
1765 zrangeGenericCommand(c,0);
1766}
1767
1768void zrevrangeCommand(redisClient *c) {
1769 zrangeGenericCommand(c,1);
1770}
1771
0cfc8940
PN
1772/* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE. */
1773void genericZrangebyscoreCommand(redisClient *c, int reverse) {
25bb8a44 1774 zrangespec range;
aff255c8 1775 robj *key = c->argv[1];
0cfc8940 1776 robj *zobj;
706b32e0 1777 long offset = 0, limit = -1;
e2641e09 1778 int withscores = 0;
25bb8a44
PN
1779 unsigned long rangelen = 0;
1780 void *replylen = NULL;
22b9bf15 1781 int minidx, maxidx;
e2641e09 1782
25bb8a44 1783 /* Parse the range arguments. */
22b9bf15
PN
1784 if (reverse) {
1785 /* Range is given as [max,min] */
1786 maxidx = 2; minidx = 3;
1787 } else {
1788 /* Range is given as [min,max] */
1789 minidx = 2; maxidx = 3;
1790 }
1791
1792 if (zslParseRange(c->argv[minidx],c->argv[maxidx],&range) != REDIS_OK) {
d93f9a86 1793 addReplyError(c,"min or max is not a float");
7236fdb2
PN
1794 return;
1795 }
25bb8a44
PN
1796
1797 /* Parse optional extra arguments. Note that ZCOUNT will exactly have
1798 * 4 arguments, so we'll never enter the following code path. */
1799 if (c->argc > 4) {
1800 int remaining = c->argc - 4;
1801 int pos = 4;
1802
1803 while (remaining) {
1804 if (remaining >= 1 && !strcasecmp(c->argv[pos]->ptr,"withscores")) {
1805 pos++; remaining--;
1806 withscores = 1;
1807 } else if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr,"limit")) {
706b32e0
B
1808 if ((getLongFromObjectOrReply(c, c->argv[pos+1], &offset, NULL) != REDIS_OK) ||
1809 (getLongFromObjectOrReply(c, c->argv[pos+2], &limit, NULL) != REDIS_OK)) return;
25bb8a44
PN
1810 pos += 3; remaining -= 3;
1811 } else {
1812 addReply(c,shared.syntaxerr);
1813 return;
1814 }
1815 }
e2641e09 1816 }
25bb8a44
PN
1817
1818 /* Ok, lookup the key and get the range */
0cfc8940 1819 if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL ||
aff255c8 1820 checkType(c,zobj,REDIS_ZSET)) return;
25bb8a44 1821
aff255c8
PN
1822 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1823 unsigned char *zl = zobj->ptr;
1824 unsigned char *eptr, *sptr;
1825 unsigned char *vstr;
1826 unsigned int vlen;
1827 long long vlong;
1828 double score;
e2641e09 1829
aff255c8 1830 /* If reversed, get the last node in range as starting point. */
0cfc8940 1831 if (reverse) {
8588bfa3 1832 eptr = zzlLastInRange(zl,range);
0cfc8940 1833 } else {
8588bfa3 1834 eptr = zzlFirstInRange(zl,range);
0cfc8940 1835 }
e2641e09 1836
aff255c8
PN
1837 /* No "first" element in the specified interval. */
1838 if (eptr == NULL) {
0cfc8940 1839 addReply(c, shared.emptymultibulk);
aff255c8
PN
1840 return;
1841 }
e2641e09 1842
aff255c8 1843 /* Get score pointer for the first element. */
eab0e26e 1844 redisAssertWithInfo(c,zobj,eptr != NULL);
aff255c8 1845 sptr = ziplistNext(zl,eptr);
e2641e09 1846
aff255c8
PN
1847 /* We don't know in advance how many matching elements there are in the
1848 * list, so we push this object that will represent the multi-bulk
1849 * length in the output buffer, and will "fix" it later */
0cfc8940 1850 replylen = addDeferredMultiBulkLength(c);
aff255c8
PN
1851
1852 /* If there is an offset, just traverse the number of elements without
1853 * checking the score because that is done in the next loop. */
0cfc8940
PN
1854 while (eptr && offset--) {
1855 if (reverse) {
aff255c8 1856 zzlPrev(zl,&eptr,&sptr);
0cfc8940 1857 } else {
aff255c8 1858 zzlNext(zl,&eptr,&sptr);
0cfc8940
PN
1859 }
1860 }
aff255c8
PN
1861
1862 while (eptr && limit--) {
1863 score = zzlGetScore(sptr);
1864
1865 /* Abort when the node is no longer in range. */
1866 if (reverse) {
1867 if (!zslValueGteMin(score,&range)) break;
1868 } else {
1869 if (!zslValueLteMax(score,&range)) break;
1870 }
1871
0cfc8940 1872 /* We know the element exists, so ziplistGet should always succeed */
eab0e26e 1873 redisAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
0cfc8940 1874
aff255c8 1875 rangelen++;
0cfc8940
PN
1876 if (vstr == NULL) {
1877 addReplyBulkLongLong(c,vlong);
1878 } else {
1879 addReplyBulkCBuffer(c,vstr,vlen);
1880 }
1881
1882 if (withscores) {
1883 addReplyDouble(c,score);
aff255c8
PN
1884 }
1885
1886 /* Move to next node */
0cfc8940 1887 if (reverse) {
aff255c8 1888 zzlPrev(zl,&eptr,&sptr);
0cfc8940 1889 } else {
aff255c8 1890 zzlNext(zl,&eptr,&sptr);
0cfc8940 1891 }
e2641e09 1892 }
100ed062 1893 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
aff255c8
PN
1894 zset *zs = zobj->ptr;
1895 zskiplist *zsl = zs->zsl;
1896 zskiplistNode *ln;
25bb8a44 1897
aff255c8 1898 /* If reversed, get the last node in range as starting point. */
0cfc8940 1899 if (reverse) {
aff255c8 1900 ln = zslLastInRange(zsl,range);
0cfc8940 1901 } else {
aff255c8 1902 ln = zslFirstInRange(zsl,range);
0cfc8940 1903 }
aff255c8
PN
1904
1905 /* No "first" element in the specified interval. */
1906 if (ln == NULL) {
0cfc8940 1907 addReply(c, shared.emptymultibulk);
aff255c8 1908 return;
25bb8a44
PN
1909 }
1910
aff255c8
PN
1911 /* We don't know in advance how many matching elements there are in the
1912 * list, so we push this object that will represent the multi-bulk
1913 * length in the output buffer, and will "fix" it later */
0cfc8940 1914 replylen = addDeferredMultiBulkLength(c);
aff255c8
PN
1915
1916 /* If there is an offset, just traverse the number of elements without
1917 * checking the score because that is done in the next loop. */
0cfc8940
PN
1918 while (ln && offset--) {
1919 if (reverse) {
1920 ln = ln->backward;
1921 } else {
1922 ln = ln->level[0].forward;
1923 }
1924 }
aff255c8
PN
1925
1926 while (ln && limit--) {
1927 /* Abort when the node is no longer in range. */
1928 if (reverse) {
1929 if (!zslValueGteMin(ln->score,&range)) break;
1930 } else {
1931 if (!zslValueLteMax(ln->score,&range)) break;
1932 }
1933
aff255c8 1934 rangelen++;
0cfc8940
PN
1935 addReplyBulk(c,ln->obj);
1936
1937 if (withscores) {
1938 addReplyDouble(c,ln->score);
aff255c8
PN
1939 }
1940
1941 /* Move to next node */
0cfc8940
PN
1942 if (reverse) {
1943 ln = ln->backward;
1944 } else {
1945 ln = ln->level[0].forward;
1946 }
aff255c8
PN
1947 }
1948 } else {
1949 redisPanic("Unknown sorted set encoding");
25bb8a44
PN
1950 }
1951
0cfc8940
PN
1952 if (withscores) {
1953 rangelen *= 2;
e2641e09 1954 }
0cfc8940
PN
1955
1956 setDeferredMultiBulkLength(c, replylen, rangelen);
e2641e09 1957}
1958
1959void zrangebyscoreCommand(redisClient *c) {
0cfc8940 1960 genericZrangebyscoreCommand(c,0);
25bb8a44
PN
1961}
1962
1963void zrevrangebyscoreCommand(redisClient *c) {
0cfc8940 1964 genericZrangebyscoreCommand(c,1);
e2641e09 1965}
1966
1967void zcountCommand(redisClient *c) {
62d774e5
PN
1968 robj *key = c->argv[1];
1969 robj *zobj;
1970 zrangespec range;
1971 int count = 0;
1972
1973 /* Parse the range arguments */
1974 if (zslParseRange(c->argv[2],c->argv[3],&range) != REDIS_OK) {
d93f9a86 1975 addReplyError(c,"min or max is not a float");
62d774e5
PN
1976 return;
1977 }
1978
1979 /* Lookup the sorted set */
1980 if ((zobj = lookupKeyReadOrReply(c, key, shared.czero)) == NULL ||
1981 checkType(c, zobj, REDIS_ZSET)) return;
1982
1983 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1984 unsigned char *zl = zobj->ptr;
1985 unsigned char *eptr, *sptr;
1986 double score;
1987
1988 /* Use the first element in range as the starting point */
1989 eptr = zzlFirstInRange(zl,range);
1990
1991 /* No "first" element */
1992 if (eptr == NULL) {
1993 addReply(c, shared.czero);
1994 return;
1995 }
1996
1997 /* First element is in range */
1998 sptr = ziplistNext(zl,eptr);
1999 score = zzlGetScore(sptr);
eab0e26e 2000 redisAssertWithInfo(c,zobj,zslValueLteMax(score,&range));
62d774e5
PN
2001
2002 /* Iterate over elements in range */
2003 while (eptr) {
2004 score = zzlGetScore(sptr);
2005
2006 /* Abort when the node is no longer in range. */
2007 if (!zslValueLteMax(score,&range)) {
2008 break;
2009 } else {
2010 count++;
2011 zzlNext(zl,&eptr,&sptr);
2012 }
2013 }
2014 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
2015 zset *zs = zobj->ptr;
2016 zskiplist *zsl = zs->zsl;
2017 zskiplistNode *zn;
2018 unsigned long rank;
2019
2020 /* Find first element in range */
2021 zn = zslFirstInRange(zsl, range);
2022
2023 /* Use rank of first element, if any, to determine preliminary count */
2024 if (zn != NULL) {
2025 rank = zslGetRank(zsl, zn->score, zn->obj);
2026 count = (zsl->length - (rank - 1));
2027
2028 /* Find last element in range */
2029 zn = zslLastInRange(zsl, range);
2030
2031 /* Use rank of last element, if any, to determine the actual count */
2032 if (zn != NULL) {
2033 rank = zslGetRank(zsl, zn->score, zn->obj);
2034 count -= (zsl->length - rank);
2035 }
2036 }
2037 } else {
2038 redisPanic("Unknown sorted set encoding");
2039 }
2040
2041 addReplyLongLong(c, count);
e2641e09 2042}
2043
2044void zcardCommand(redisClient *c) {
d1c920c5
PN
2045 robj *key = c->argv[1];
2046 robj *zobj;
e2641e09 2047
d1c920c5
PN
2048 if ((zobj = lookupKeyReadOrReply(c,key,shared.czero)) == NULL ||
2049 checkType(c,zobj,REDIS_ZSET)) return;
e2641e09 2050
df26a0ae 2051 addReplyLongLong(c,zsetLength(zobj));
e2641e09 2052}
2053
2054void zscoreCommand(redisClient *c) {
d1c920c5
PN
2055 robj *key = c->argv[1];
2056 robj *zobj;
2057 double score;
e2641e09 2058
d1c920c5
PN
2059 if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
2060 checkType(c,zobj,REDIS_ZSET)) return;
e2641e09 2061
d1c920c5 2062 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
8588bfa3 2063 if (zzlFind(zobj->ptr,c->argv[2],&score) != NULL)
d1c920c5
PN
2064 addReplyDouble(c,score);
2065 else
2066 addReply(c,shared.nullbulk);
100ed062 2067 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
d1c920c5
PN
2068 zset *zs = zobj->ptr;
2069 dictEntry *de;
e2641e09 2070
d1c920c5
PN
2071 c->argv[2] = tryObjectEncoding(c->argv[2]);
2072 de = dictFind(zs->dict,c->argv[2]);
2073 if (de != NULL) {
c0ba9ebe 2074 score = *(double*)dictGetVal(de);
d1c920c5
PN
2075 addReplyDouble(c,score);
2076 } else {
2077 addReply(c,shared.nullbulk);
2078 }
2079 } else {
2080 redisPanic("Unknown sorted set encoding");
e2641e09 2081 }
2082}
2083
2084void zrankGenericCommand(redisClient *c, int reverse) {
d1c920c5
PN
2085 robj *key = c->argv[1];
2086 robj *ele = c->argv[2];
2087 robj *zobj;
2088 unsigned long llen;
e2641e09 2089 unsigned long rank;
e2641e09 2090
d1c920c5
PN
2091 if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
2092 checkType(c,zobj,REDIS_ZSET)) return;
df26a0ae 2093 llen = zsetLength(zobj);
e2641e09 2094
eab0e26e 2095 redisAssertWithInfo(c,ele,ele->encoding == REDIS_ENCODING_RAW);
d1c920c5
PN
2096 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
2097 unsigned char *zl = zobj->ptr;
2098 unsigned char *eptr, *sptr;
e2641e09 2099
d1c920c5 2100 eptr = ziplistIndex(zl,0);
eab0e26e 2101 redisAssertWithInfo(c,zobj,eptr != NULL);
d1c920c5 2102 sptr = ziplistNext(zl,eptr);
eab0e26e 2103 redisAssertWithInfo(c,zobj,sptr != NULL);
d1c920c5
PN
2104
2105 rank = 1;
2106 while(eptr != NULL) {
2107 if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr)))
2108 break;
2109 rank++;
2110 zzlNext(zl,&eptr,&sptr);
2111 }
2112
2113 if (eptr != NULL) {
2114 if (reverse)
2115 addReplyLongLong(c,llen-rank);
2116 else
2117 addReplyLongLong(c,rank-1);
e2641e09 2118 } else {
d1c920c5
PN
2119 addReply(c,shared.nullbulk);
2120 }
100ed062 2121 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
d1c920c5
PN
2122 zset *zs = zobj->ptr;
2123 zskiplist *zsl = zs->zsl;
2124 dictEntry *de;
2125 double score;
2126
2127 ele = c->argv[2] = tryObjectEncoding(c->argv[2]);
2128 de = dictFind(zs->dict,ele);
2129 if (de != NULL) {
c0ba9ebe 2130 score = *(double*)dictGetVal(de);
d1c920c5 2131 rank = zslGetRank(zsl,score,ele);
eab0e26e 2132 redisAssertWithInfo(c,ele,rank); /* Existing elements always have a rank. */
d1c920c5
PN
2133 if (reverse)
2134 addReplyLongLong(c,llen-rank);
2135 else
2136 addReplyLongLong(c,rank-1);
2137 } else {
2138 addReply(c,shared.nullbulk);
e2641e09 2139 }
2140 } else {
d1c920c5 2141 redisPanic("Unknown sorted set encoding");
e2641e09 2142 }
2143}
2144
2145void zrankCommand(redisClient *c) {
2146 zrankGenericCommand(c, 0);
2147}
2148
2149void zrevrankCommand(redisClient *c) {
2150 zrankGenericCommand(c, 1);
2151}