]> git.saurik.com Git - redis.git/blame - src/t_zset.c
Fix used function in ZCARD
[redis.git] / src / t_zset.c
CommitLineData
e2641e09 1#include "redis.h"
2
3#include <math.h>
4
5/*-----------------------------------------------------------------------------
6 * Sorted set API
7 *----------------------------------------------------------------------------*/
8
/* ZSETs are ordered sets using two data structures to hold the same elements
 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
 * data structure.
 *
 * The elements are added to a hash table mapping Redis objects to scores.
 * At the same time the elements are added to a skip list mapping scores
 * to Redis objects (so objects are sorted by scores in this "view"). */
16
17/* This skiplist implementation is almost a C translation of the original
18 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
19 * Alternative to Balanced Trees", modified in three ways:
20 * a) this implementation allows for repeated values.
21 * b) the comparison is not just by key (our 'score') but by satellite data.
22 * c) there is a back pointer, so it's a doubly linked list with the back
23 * pointers being only at "level 1". This allows to traverse the list
24 * from tail to head, useful for ZREVRANGE. */
25
26zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
2159782b 27 zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
e2641e09 28 zn->score = score;
29 zn->obj = obj;
30 return zn;
31}
32
33zskiplist *zslCreate(void) {
34 int j;
35 zskiplist *zsl;
36
37 zsl = zmalloc(sizeof(*zsl));
38 zsl->level = 1;
39 zsl->length = 0;
40 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
41 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
2159782b
PN
42 zsl->header->level[j].forward = NULL;
43 zsl->header->level[j].span = 0;
e2641e09 44 }
45 zsl->header->backward = NULL;
46 zsl->tail = NULL;
47 return zsl;
48}
49
50void zslFreeNode(zskiplistNode *node) {
51 decrRefCount(node->obj);
e2641e09 52 zfree(node);
53}
54
55void zslFree(zskiplist *zsl) {
2159782b 56 zskiplistNode *node = zsl->header->level[0].forward, *next;
e2641e09 57
e2641e09 58 zfree(zsl->header);
59 while(node) {
2159782b 60 next = node->level[0].forward;
e2641e09 61 zslFreeNode(node);
62 node = next;
63 }
64 zfree(zsl);
65}
66
67int zslRandomLevel(void) {
68 int level = 1;
69 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
70 level += 1;
71 return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
72}
73
69ef89f2 74zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
e2641e09 75 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
76 unsigned int rank[ZSKIPLIST_MAXLEVEL];
77 int i, level;
78
79 x = zsl->header;
80 for (i = zsl->level-1; i >= 0; i--) {
81 /* store rank that is crossed to reach the insert position */
82 rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
2159782b
PN
83 while (x->level[i].forward &&
84 (x->level[i].forward->score < score ||
85 (x->level[i].forward->score == score &&
86 compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
87 rank[i] += x->level[i].span;
88 x = x->level[i].forward;
e2641e09 89 }
90 update[i] = x;
91 }
92 /* we assume the key is not already inside, since we allow duplicated
93 * scores, and the re-insertion of score and redis object should never
94 * happpen since the caller of zslInsert() should test in the hash table
95 * if the element is already inside or not. */
96 level = zslRandomLevel();
97 if (level > zsl->level) {
98 for (i = zsl->level; i < level; i++) {
99 rank[i] = 0;
100 update[i] = zsl->header;
2159782b 101 update[i]->level[i].span = zsl->length;
e2641e09 102 }
103 zsl->level = level;
104 }
105 x = zslCreateNode(level,score,obj);
106 for (i = 0; i < level; i++) {
2159782b
PN
107 x->level[i].forward = update[i]->level[i].forward;
108 update[i]->level[i].forward = x;
e2641e09 109
110 /* update span covered by update[i] as x is inserted here */
2159782b
PN
111 x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
112 update[i]->level[i].span = (rank[0] - rank[i]) + 1;
e2641e09 113 }
114
115 /* increment span for untouched levels */
116 for (i = level; i < zsl->level; i++) {
2159782b 117 update[i]->level[i].span++;
e2641e09 118 }
119
120 x->backward = (update[0] == zsl->header) ? NULL : update[0];
2159782b
PN
121 if (x->level[0].forward)
122 x->level[0].forward->backward = x;
e2641e09 123 else
124 zsl->tail = x;
125 zsl->length++;
69ef89f2 126 return x;
e2641e09 127}
128
129/* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
130void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
131 int i;
132 for (i = 0; i < zsl->level; i++) {
2159782b
PN
133 if (update[i]->level[i].forward == x) {
134 update[i]->level[i].span += x->level[i].span - 1;
135 update[i]->level[i].forward = x->level[i].forward;
e2641e09 136 } else {
2159782b 137 update[i]->level[i].span -= 1;
e2641e09 138 }
139 }
2159782b
PN
140 if (x->level[0].forward) {
141 x->level[0].forward->backward = x->backward;
e2641e09 142 } else {
143 zsl->tail = x->backward;
144 }
2159782b 145 while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
e2641e09 146 zsl->level--;
147 zsl->length--;
148}
149
150/* Delete an element with matching score/object from the skiplist. */
151int zslDelete(zskiplist *zsl, double score, robj *obj) {
152 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
153 int i;
154
155 x = zsl->header;
156 for (i = zsl->level-1; i >= 0; i--) {
2159782b
PN
157 while (x->level[i].forward &&
158 (x->level[i].forward->score < score ||
159 (x->level[i].forward->score == score &&
160 compareStringObjects(x->level[i].forward->obj,obj) < 0)))
161 x = x->level[i].forward;
e2641e09 162 update[i] = x;
163 }
164 /* We may have multiple elements with the same score, what we need
165 * is to find the element with both the right score and object. */
2159782b 166 x = x->level[0].forward;
e2641e09 167 if (x && score == x->score && equalStringObjects(x->obj,obj)) {
168 zslDeleteNode(zsl, x, update);
169 zslFreeNode(x);
170 return 1;
171 } else {
172 return 0; /* not found */
173 }
174 return 0; /* not found */
175}
176
91504b6c
PN
/* Score interval where each bound can independently be inclusive or
 * exclusive. */
typedef struct {
    double min, max;
    int minex, maxex; /* non-zero when min (resp. max) is exclusive */
} zrangespec;

/* Non-zero when 'value' satisfies the lower bound of 'spec'. */
static int zslValueGteMin(double value, zrangespec *spec) {
    if (spec->minex)
        return value > spec->min;
    return value >= spec->min;
}

/* Non-zero when 'value' satisfies the upper bound of 'spec'. */
static int zslValueLteMax(double value, zrangespec *spec) {
    if (spec->maxex)
        return value < spec->max;
    return value <= spec->max;
}

/* Non-zero when 'value' satisfies both bounds of 'spec'. */
static int zslValueInRange(double value, zrangespec *spec) {
    if (!zslValueGteMin(value,spec))
        return 0;
    return zslValueLteMax(value,spec);
}
194
195/* Returns if there is a part of the zset is in range. */
196int zslIsInRange(zskiplist *zsl, zrangespec *range) {
197 zskiplistNode *x;
198
8e1b3277
PN
199 /* Test for ranges that will always be empty. */
200 if (range->min > range->max ||
201 (range->min == range->max && (range->minex || range->maxex)))
202 return 0;
22b9bf15 203 x = zsl->tail;
45290ad9 204 if (x == NULL || !zslValueGteMin(x->score,range))
22b9bf15
PN
205 return 0;
206 x = zsl->header->level[0].forward;
45290ad9 207 if (x == NULL || !zslValueLteMax(x->score,range))
22b9bf15
PN
208 return 0;
209 return 1;
210}
211
212/* Find the first node that is contained in the specified range.
213 * Returns NULL when no element is contained in the range. */
214zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec range) {
215 zskiplistNode *x;
216 int i;
217
218 /* If everything is out of range, return early. */
219 if (!zslIsInRange(zsl,&range)) return NULL;
220
221 x = zsl->header;
222 for (i = zsl->level-1; i >= 0; i--) {
223 /* Go forward while *OUT* of range. */
224 while (x->level[i].forward &&
45290ad9 225 !zslValueGteMin(x->level[i].forward->score,&range))
22b9bf15
PN
226 x = x->level[i].forward;
227 }
228
229 /* The tail is in range, so the previous block should always return a
230 * node that is non-NULL and the last one to be out of range. */
231 x = x->level[0].forward;
232 redisAssert(x != NULL && zslValueInRange(x->score,&range));
233 return x;
234}
235
236/* Find the last node that is contained in the specified range.
237 * Returns NULL when no element is contained in the range. */
238zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec range) {
239 zskiplistNode *x;
240 int i;
241
242 /* If everything is out of range, return early. */
243 if (!zslIsInRange(zsl,&range)) return NULL;
244
245 x = zsl->header;
246 for (i = zsl->level-1; i >= 0; i--) {
247 /* Go forward while *IN* range. */
248 while (x->level[i].forward &&
45290ad9 249 zslValueLteMax(x->level[i].forward->score,&range))
22b9bf15
PN
250 x = x->level[i].forward;
251 }
252
253 /* The header is in range, so the previous block should always return a
254 * node that is non-NULL and in range. */
255 redisAssert(x != NULL && zslValueInRange(x->score,&range));
256 return x;
257}
258
e2641e09 259/* Delete all the elements with score between min and max from the skiplist.
260 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
261 * Note that this function takes the reference to the hash table view of the
262 * sorted set, in order to remove the elements from the hash table too. */
91504b6c 263unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec range, dict *dict) {
e2641e09 264 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
265 unsigned long removed = 0;
266 int i;
267
268 x = zsl->header;
269 for (i = zsl->level-1; i >= 0; i--) {
91504b6c
PN
270 while (x->level[i].forward && (range.minex ?
271 x->level[i].forward->score <= range.min :
272 x->level[i].forward->score < range.min))
273 x = x->level[i].forward;
e2641e09 274 update[i] = x;
275 }
91504b6c
PN
276
277 /* Current node is the last with score < or <= min. */
2159782b 278 x = x->level[0].forward;
91504b6c
PN
279
280 /* Delete nodes while in range. */
281 while (x && (range.maxex ? x->score < range.max : x->score <= range.max)) {
2159782b 282 zskiplistNode *next = x->level[0].forward;
69ef89f2 283 zslDeleteNode(zsl,x,update);
e2641e09 284 dictDelete(dict,x->obj);
285 zslFreeNode(x);
286 removed++;
287 x = next;
288 }
91504b6c 289 return removed;
e2641e09 290}
291
292/* Delete all the elements with rank between start and end from the skiplist.
293 * Start and end are inclusive. Note that start and end need to be 1-based */
294unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict) {
295 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
296 unsigned long traversed = 0, removed = 0;
297 int i;
298
299 x = zsl->header;
300 for (i = zsl->level-1; i >= 0; i--) {
2159782b
PN
301 while (x->level[i].forward && (traversed + x->level[i].span) < start) {
302 traversed += x->level[i].span;
303 x = x->level[i].forward;
e2641e09 304 }
305 update[i] = x;
306 }
307
308 traversed++;
2159782b 309 x = x->level[0].forward;
e2641e09 310 while (x && traversed <= end) {
2159782b 311 zskiplistNode *next = x->level[0].forward;
69ef89f2 312 zslDeleteNode(zsl,x,update);
e2641e09 313 dictDelete(dict,x->obj);
314 zslFreeNode(x);
315 removed++;
316 traversed++;
317 x = next;
318 }
319 return removed;
320}
321
e2641e09 322/* Find the rank for an element by both score and key.
323 * Returns 0 when the element cannot be found, rank otherwise.
324 * Note that the rank is 1-based due to the span of zsl->header to the
325 * first element. */
a3004773 326unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) {
e2641e09 327 zskiplistNode *x;
328 unsigned long rank = 0;
329 int i;
330
331 x = zsl->header;
332 for (i = zsl->level-1; i >= 0; i--) {
2159782b
PN
333 while (x->level[i].forward &&
334 (x->level[i].forward->score < score ||
335 (x->level[i].forward->score == score &&
336 compareStringObjects(x->level[i].forward->obj,o) <= 0))) {
337 rank += x->level[i].span;
338 x = x->level[i].forward;
e2641e09 339 }
340
341 /* x might be equal to zsl->header, so test if obj is non-NULL */
342 if (x->obj && equalStringObjects(x->obj,o)) {
343 return rank;
344 }
345 }
346 return 0;
347}
348
349/* Finds an element by its rank. The rank argument needs to be 1-based. */
a3004773 350zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
e2641e09 351 zskiplistNode *x;
352 unsigned long traversed = 0;
353 int i;
354
355 x = zsl->header;
356 for (i = zsl->level-1; i >= 0; i--) {
2159782b 357 while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
e2641e09 358 {
2159782b
PN
359 traversed += x->level[i].span;
360 x = x->level[i].forward;
e2641e09 361 }
362 if (traversed == rank) {
363 return x;
364 }
365 }
366 return NULL;
367}
368
25bb8a44 369/* Populate the rangespec according to the objects min and max. */
7236fdb2
PN
370static int zslParseRange(robj *min, robj *max, zrangespec *spec) {
371 char *eptr;
25bb8a44
PN
372 spec->minex = spec->maxex = 0;
373
374 /* Parse the min-max interval. If one of the values is prefixed
375 * by the "(" character, it's considered "open". For instance
376 * ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max
377 * ZRANGEBYSCORE zset 1.5 2.5 will instead match min <= x <= max */
378 if (min->encoding == REDIS_ENCODING_INT) {
379 spec->min = (long)min->ptr;
380 } else {
381 if (((char*)min->ptr)[0] == '(') {
7236fdb2
PN
382 spec->min = strtod((char*)min->ptr+1,&eptr);
383 if (eptr[0] != '\0' || isnan(spec->min)) return REDIS_ERR;
25bb8a44
PN
384 spec->minex = 1;
385 } else {
7236fdb2
PN
386 spec->min = strtod((char*)min->ptr,&eptr);
387 if (eptr[0] != '\0' || isnan(spec->min)) return REDIS_ERR;
25bb8a44
PN
388 }
389 }
390 if (max->encoding == REDIS_ENCODING_INT) {
391 spec->max = (long)max->ptr;
392 } else {
393 if (((char*)max->ptr)[0] == '(') {
7236fdb2
PN
394 spec->max = strtod((char*)max->ptr+1,&eptr);
395 if (eptr[0] != '\0' || isnan(spec->max)) return REDIS_ERR;
25bb8a44
PN
396 spec->maxex = 1;
397 } else {
7236fdb2
PN
398 spec->max = strtod((char*)max->ptr,&eptr);
399 if (eptr[0] != '\0' || isnan(spec->max)) return REDIS_ERR;
25bb8a44
PN
400 }
401 }
402
403 return REDIS_OK;
404}
405
21c5b508
PN
406/*-----------------------------------------------------------------------------
407 * Ziplist-backed sorted set API
408 *----------------------------------------------------------------------------*/
409
/* Return the score stored in the ziplist entry pointed to by 'sptr'.
 *
 * The entry is either an integer, which is converted directly, or a string,
 * which is copied into a local buffer, NUL-terminated and parsed with
 * strtod(). The string length is asserted to fit the buffer (terminator
 * included) so that an oversized entry cannot overflow the stack. */
double zzlGetScore(unsigned char *sptr) {
    unsigned char *vstr;
    unsigned int vlen;
    long long vlong;
    char buf[128];
    double score;

    redisAssert(sptr != NULL);
    redisAssert(ziplistGet(sptr,&vstr,&vlen,&vlong));

    if (vstr) {
        redisAssert(vlen < sizeof(buf));
        memcpy(buf,vstr,vlen);
        buf[vlen] = '\0';
        score = strtod(buf,NULL);
    } else {
        score = vlong;
    }

    return score;
}
430
/* Compare the element stored at 'eptr' in the ziplist with the string
 * 'cstr' of length 'clen'. Integer entries are compared through their
 * decimal string representation. The common prefix is compared with
 * memcmp(); when it is equal, the difference of the lengths is returned. */
int zzlCompareElements(unsigned char *eptr, unsigned char *cstr, unsigned int clen) {
    unsigned char numbuf[32];
    unsigned char *vstr;
    unsigned int vlen;
    long long vlong;
    int common, order;

    redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
    if (vstr == NULL) {
        /* Integer entry: render it as a string first. */
        vlen = ll2string((char*)numbuf,sizeof(numbuf),vlong);
        vstr = numbuf;
    }

    if (vlen < clen)
        common = vlen;
    else
        common = clen;

    order = memcmp(vstr,cstr,common);
    if (order != 0) return order;
    return vlen-clen;
}
451
9f9b60f9 452unsigned int zzlLength(robj *zobj) {
0b10e104 453 unsigned char *zl = zobj->ptr;
cc4c964b 454 redisAssert(zobj->encoding == REDIS_ENCODING_ZIPLIST);
0b10e104
PN
455 return ziplistLen(zl)/2;
456}
457
4c5f0966
PN
458/* Move to next entry based on the values in eptr and sptr. Both are set to
459 * NULL when there is no next entry. */
460void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
461 unsigned char *_eptr, *_sptr;
462 redisAssert(*eptr != NULL && *sptr != NULL);
463
464 _eptr = ziplistNext(zl,*sptr);
465 if (_eptr != NULL) {
466 _sptr = ziplistNext(zl,_eptr);
467 redisAssert(_sptr != NULL);
468 } else {
469 /* No next entry. */
470 _sptr = NULL;
471 }
472
473 *eptr = _eptr;
474 *sptr = _sptr;
475}
476
477/* Move to the previous entry based on the values in eptr and sptr. Both are
478 * set to NULL when there is no next entry. */
479void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
480 unsigned char *_eptr, *_sptr;
481 redisAssert(*eptr != NULL && *sptr != NULL);
482
483 _sptr = ziplistPrev(zl,*eptr);
484 if (_sptr != NULL) {
485 _eptr = ziplistPrev(zl,_sptr);
486 redisAssert(_eptr != NULL);
487 } else {
488 /* No previous entry. */
489 _eptr = NULL;
490 }
491
492 *eptr = _eptr;
493 *sptr = _sptr;
494}
495
4a14dbba
PN
496/* Returns if there is a part of the zset is in range. Should only be used
497 * internally by zzlFirstInRange and zzlLastInRange. */
498int zzlIsInRange(unsigned char *zl, zrangespec *range) {
499 unsigned char *p;
500 double score;
501
502 /* Test for ranges that will always be empty. */
503 if (range->min > range->max ||
504 (range->min == range->max && (range->minex || range->maxex)))
505 return 0;
506
507 p = ziplistIndex(zl,-1); /* Last score. */
508 redisAssert(p != NULL);
509 score = zzlGetScore(p);
510 if (!zslValueGteMin(score,range))
511 return 0;
512
513 p = ziplistIndex(zl,1); /* First score. */
514 redisAssert(p != NULL);
515 score = zzlGetScore(p);
516 if (!zslValueLteMax(score,range))
517 return 0;
518
519 return 1;
520}
521
522/* Find pointer to the first element contained in the specified range.
523 * Returns NULL when no element is contained in the range. */
524unsigned char *zzlFirstInRange(robj *zobj, zrangespec range) {
525 unsigned char *zl = zobj->ptr;
526 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
527 double score;
528
529 /* If everything is out of range, return early. */
530 if (!zzlIsInRange(zl,&range)) return NULL;
531
532 while (eptr != NULL) {
533 sptr = ziplistNext(zl,eptr);
534 redisAssert(sptr != NULL);
535
536 score = zzlGetScore(sptr);
537 if (zslValueGteMin(score,&range))
538 return eptr;
539
540 /* Move to next element. */
541 eptr = ziplistNext(zl,sptr);
542 }
543
544 return NULL;
545}
546
547/* Find pointer to the last element contained in the specified range.
548 * Returns NULL when no element is contained in the range. */
549unsigned char *zzlLastInRange(robj *zobj, zrangespec range) {
550 unsigned char *zl = zobj->ptr;
551 unsigned char *eptr = ziplistIndex(zl,-2), *sptr;
552 double score;
553
554 /* If everything is out of range, return early. */
555 if (!zzlIsInRange(zl,&range)) return NULL;
556
557 while (eptr != NULL) {
558 sptr = ziplistNext(zl,eptr);
559 redisAssert(sptr != NULL);
560
561 score = zzlGetScore(sptr);
562 if (zslValueLteMax(score,&range))
563 return eptr;
564
565 /* Move to previous element by moving to the score of previous element.
566 * When this returns NULL, we know there also is no element. */
567 sptr = ziplistPrev(zl,eptr);
568 if (sptr != NULL)
569 redisAssert((eptr = ziplistPrev(zl,sptr)) != NULL);
570 else
571 eptr = NULL;
572 }
573
574 return NULL;
575}
576
21c5b508
PN
577unsigned char *zzlFind(robj *zobj, robj *ele, double *score) {
578 unsigned char *zl = zobj->ptr;
579 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
580
581 ele = getDecodedObject(ele);
582 while (eptr != NULL) {
583 sptr = ziplistNext(zl,eptr);
584 redisAssert(sptr != NULL);
585
586 if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr))) {
587 /* Matching element, pull out score. */
0b10e104 588 if (score != NULL) *score = zzlGetScore(sptr);
21c5b508
PN
589 decrRefCount(ele);
590 return eptr;
591 }
592
593 /* Move to next element. */
594 eptr = ziplistNext(zl,sptr);
595 }
596
597 decrRefCount(ele);
598 return NULL;
599}
600
601/* Delete (element,score) pair from ziplist. Use local copy of eptr because we
602 * don't want to modify the one given as argument. */
603int zzlDelete(robj *zobj, unsigned char *eptr) {
604 unsigned char *zl = zobj->ptr;
605 unsigned char *p = eptr;
606
607 /* TODO: add function to ziplist API to delete N elements from offset. */
608 zl = ziplistDelete(zl,&p);
609 zl = ziplistDelete(zl,&p);
610 zobj->ptr = zl;
611 return REDIS_OK;
612}
613
614int zzlInsertAt(robj *zobj, robj *ele, double score, unsigned char *eptr) {
615 unsigned char *zl = zobj->ptr;
616 unsigned char *sptr;
617 char scorebuf[128];
618 int scorelen;
619 int offset;
620
621 redisAssert(ele->encoding == REDIS_ENCODING_RAW);
622 scorelen = d2string(scorebuf,sizeof(scorebuf),score);
623 if (eptr == NULL) {
624 zl = ziplistPush(zl,ele->ptr,sdslen(ele->ptr),ZIPLIST_TAIL);
625 zl = ziplistPush(zl,(unsigned char*)scorebuf,scorelen,ZIPLIST_TAIL);
626 } else {
627 /* Keep offset relative to zl, as it might be re-allocated. */
628 offset = eptr-zl;
629 zl = ziplistInsert(zl,eptr,ele->ptr,sdslen(ele->ptr));
630 eptr = zl+offset;
631
632 /* Insert score after the element. */
633 redisAssert((sptr = ziplistNext(zl,eptr)) != NULL);
634 zl = ziplistInsert(zl,sptr,(unsigned char*)scorebuf,scorelen);
635 }
636
637 zobj->ptr = zl;
638 return REDIS_OK;
639}
640
641/* Insert (element,score) pair in ziplist. This function assumes the element is
642 * not yet present in the list. */
643int zzlInsert(robj *zobj, robj *ele, double score) {
644 unsigned char *zl = zobj->ptr;
645 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
646 double s;
21c5b508
PN
647
648 ele = getDecodedObject(ele);
649 while (eptr != NULL) {
650 sptr = ziplistNext(zl,eptr);
651 redisAssert(sptr != NULL);
652 s = zzlGetScore(sptr);
653
654 if (s > score) {
655 /* First element with score larger than score for element to be
656 * inserted. This means we should take its spot in the list to
657 * maintain ordering. */
21c5b508
PN
658 zzlInsertAt(zobj,ele,score,eptr);
659 break;
8218db3d
PN
660 } else if (s == score) {
661 /* Ensure lexicographical ordering for elements. */
d1c920c5 662 if (zzlCompareElements(eptr,ele->ptr,sdslen(ele->ptr)) > 0) {
8218db3d
PN
663 zzlInsertAt(zobj,ele,score,eptr);
664 break;
665 }
21c5b508
PN
666 }
667
668 /* Move to next element. */
669 eptr = ziplistNext(zl,sptr);
670 }
671
672 /* Push on tail of list when it was not yet inserted. */
8218db3d
PN
673 if (eptr == NULL)
674 zzlInsertAt(zobj,ele,score,NULL);
21c5b508
PN
675
676 decrRefCount(ele);
677 return REDIS_OK;
678}
25bb8a44 679
4a14dbba
PN
680unsigned long zzlDeleteRangeByScore(robj *zobj, zrangespec range) {
681 unsigned char *zl = zobj->ptr;
682 unsigned char *eptr, *sptr;
683 double score;
684 unsigned long deleted = 0;
685
686 eptr = zzlFirstInRange(zobj,range);
687 if (eptr == NULL) return deleted;
688
689
690 /* When the tail of the ziplist is deleted, eptr will point to the sentinel
691 * byte and ziplistNext will return NULL. */
692 while ((sptr = ziplistNext(zl,eptr)) != NULL) {
693 score = zzlGetScore(sptr);
694 if (zslValueLteMax(score,&range)) {
695 /* Delete both the element and the score. */
696 zl = ziplistDelete(zl,&eptr);
697 zl = ziplistDelete(zl,&eptr);
698 deleted++;
699 } else {
700 /* No longer in range. */
701 break;
702 }
703 }
704
705 return deleted;
706}
707
63b7b7fb
PN
708/* Delete all the elements with rank between start and end from the skiplist.
709 * Start and end are inclusive. Note that start and end need to be 1-based */
710unsigned long zzlDeleteRangeByRank(robj *zobj, unsigned int start, unsigned int end) {
711 unsigned int num = (end-start)+1;
712 zobj->ptr = ziplistDeleteRange(zobj->ptr,2*(start-1),2*num);
713 return num;
714}
715
5d1b4fb6
PN
716/*-----------------------------------------------------------------------------
717 * Common sorted set API
718 *----------------------------------------------------------------------------*/
719
720int zsLength(robj *zobj) {
721 int length = -1;
722 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
723 length = zzlLength(zobj);
724 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
725 length = ((zset*)zobj->ptr)->zsl->length;
726 } else {
727 redisPanic("Unknown sorted set encoding");
728 }
729 return length;
730}
731
a669d5e9
PN
732void zsConvert(robj *zobj, int encoding) {
733 zset *zs;
734 zskiplistNode *node, *next;
735 robj *ele;
736 double score;
737
738 if (zobj->encoding == encoding) return;
739 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
740 unsigned char *zl = zobj->ptr;
741 unsigned char *eptr, *sptr;
742 unsigned char *vstr;
743 unsigned int vlen;
744 long long vlong;
745
746 if (encoding != REDIS_ENCODING_RAW)
747 redisPanic("Unknown target encoding");
748
749 zs = zmalloc(sizeof(*zs));
750 zs->dict = dictCreate(&zsetDictType,NULL);
751 zs->zsl = zslCreate();
752
753 eptr = ziplistIndex(zl,0);
754 redisAssert(eptr != NULL);
755 sptr = ziplistNext(zl,eptr);
756 redisAssert(sptr != NULL);
757
758 while (eptr != NULL) {
759 score = zzlGetScore(sptr);
760 redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
761 if (vstr == NULL)
762 ele = createStringObjectFromLongLong(vlong);
763 else
764 ele = createStringObject((char*)vstr,vlen);
765
766 /* Has incremented refcount since it was just created. */
767 node = zslInsert(zs->zsl,score,ele);
768 redisAssert(dictAdd(zs->dict,ele,&node->score) == DICT_OK);
769 incrRefCount(ele); /* Added to dictionary. */
770 zzlNext(zl,&eptr,&sptr);
771 }
772
773 zfree(zobj->ptr);
774 zobj->ptr = zs;
775 zobj->encoding = REDIS_ENCODING_RAW;
776 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
777 unsigned char *zl = ziplistNew();
778
779 if (encoding != REDIS_ENCODING_ZIPLIST)
780 redisPanic("Unknown target encoding");
781
782 /* Approach similar to zslFree(), since we want to free the skiplist at
783 * the same time as creating the ziplist. */
784 zs = zobj->ptr;
785 dictRelease(zs->dict);
786 node = zs->zsl->header->level[0].forward;
787 zfree(zs->zsl->header);
788 zfree(zs->zsl);
789
790 /* Immediately store pointer to ziplist in object because it will
791 * change because of reallocations when pushing to the ziplist. */
792 zobj->ptr = zl;
793
794 while (node) {
795 ele = getDecodedObject(node->obj);
796 redisAssert(zzlInsertAt(zobj,ele,node->score,NULL) == REDIS_OK);
797 decrRefCount(ele);
798
799 next = node->level[0].forward;
800 zslFreeNode(node);
801 node = next;
802 }
803
804 zfree(zs);
805 zobj->encoding = REDIS_ENCODING_ZIPLIST;
806 } else {
807 redisPanic("Unknown sorted set encoding");
808 }
809}
810
e2641e09 811/*-----------------------------------------------------------------------------
812 * Sorted set commands
813 *----------------------------------------------------------------------------*/
814
69ef89f2 815/* This generic command implements both ZADD and ZINCRBY. */
3ca7532a 816void zaddGenericCommand(redisClient *c, int incr) {
21c5b508 817 static char *nanerr = "resulting score is not a number (NaN)";
3ca7532a
PN
818 robj *key = c->argv[1];
819 robj *ele;
21c5b508
PN
820 robj *zobj;
821 robj *curobj;
3ca7532a
PN
822 double score, curscore = 0.0;
823
824 if (getDoubleFromObjectOrReply(c,c->argv[2],&score,NULL) != REDIS_OK)
825 return;
21c5b508
PN
826
827 zobj = lookupKeyWrite(c->db,key);
828 if (zobj == NULL) {
a669d5e9
PN
829 if (server.zset_max_ziplist_entries == 0 ||
830 server.zset_max_ziplist_value < sdslen(c->argv[3]->ptr))
831 {
832 zobj = createZsetObject();
833 } else {
834 zobj = createZsetZiplistObject();
835 }
21c5b508 836 dbAdd(c->db,key,zobj);
e2641e09 837 } else {
21c5b508 838 if (zobj->type != REDIS_ZSET) {
e2641e09 839 addReply(c,shared.wrongtypeerr);
840 return;
841 }
842 }
e2641e09 843
21c5b508
PN
844 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
845 unsigned char *eptr;
846
3ca7532a
PN
847 /* Prefer non-encoded element when dealing with ziplists. */
848 ele = c->argv[3];
21c5b508
PN
849 if ((eptr = zzlFind(zobj,ele,&curscore)) != NULL) {
850 if (incr) {
851 score += curscore;
852 if (isnan(score)) {
853 addReplyError(c,nanerr);
854 /* Don't need to check if the sorted set is empty, because
855 * we know it has at least one element. */
856 return;
857 }
858 }
859
860 /* Remove and re-insert when score changed. */
861 if (score != curscore) {
862 redisAssert(zzlDelete(zobj,eptr) == REDIS_OK);
863 redisAssert(zzlInsert(zobj,ele,score) == REDIS_OK);
864
865 signalModifiedKey(c->db,key);
866 server.dirty++;
867 }
868
869 if (incr) /* ZINCRBY */
870 addReplyDouble(c,score);
871 else /* ZADD */
872 addReply(c,shared.czero);
873 } else {
a669d5e9
PN
874 /* Optimize: check if the element is too large or the list becomes
875 * too long *before* executing zzlInsert. */
21c5b508 876 redisAssert(zzlInsert(zobj,ele,score) == REDIS_OK);
a669d5e9
PN
877 if (zzlLength(zobj) > server.zset_max_ziplist_entries)
878 zsConvert(zobj,REDIS_ENCODING_RAW);
879 if (sdslen(ele->ptr) > server.zset_max_ziplist_value)
880 zsConvert(zobj,REDIS_ENCODING_RAW);
69ef89f2 881
21c5b508
PN
882 signalModifiedKey(c->db,key);
883 server.dirty++;
69ef89f2 884
21c5b508
PN
885 if (incr) /* ZINCRBY */
886 addReplyDouble(c,score);
887 else /* ZADD */
888 addReply(c,shared.cone);
889 }
890 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
891 zset *zs = zobj->ptr;
892 zskiplistNode *znode;
e2641e09 893 dictEntry *de;
e2641e09 894
3ca7532a 895 ele = c->argv[3] = tryObjectEncoding(c->argv[3]);
e2641e09 896 de = dictFind(zs->dict,ele);
21c5b508
PN
897 if (de != NULL) {
898 curobj = dictGetEntryKey(de);
899 curscore = *(double*)dictGetEntryVal(de);
900
901 if (incr) {
902 score += curscore;
903 if (isnan(score)) {
904 addReplyError(c,nanerr);
905 /* Don't need to check if the sorted set is empty, because
906 * we know it has at least one element. */
907 return;
908 }
909 }
910
911 /* Remove and re-insert when score changed. We can safely delete
912 * the key object from the skiplist, since the dictionary still has
913 * a reference to it. */
914 if (score != curscore) {
915 redisAssert(zslDelete(zs->zsl,curscore,curobj));
916 znode = zslInsert(zs->zsl,score,curobj);
917 incrRefCount(curobj); /* Re-inserted in skiplist. */
918 dictGetEntryVal(de) = &znode->score; /* Update score ptr. */
919
920 signalModifiedKey(c->db,key);
921 server.dirty++;
922 }
923
924 if (incr) /* ZINCRBY */
925 addReplyDouble(c,score);
926 else /* ZADD */
927 addReply(c,shared.czero);
928 } else {
929 znode = zslInsert(zs->zsl,score,ele);
930 incrRefCount(ele); /* Inserted in skiplist. */
931 redisAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
932 incrRefCount(ele); /* Added to dictionary. */
933
934 signalModifiedKey(c->db,key);
e2641e09 935 server.dirty++;
21c5b508
PN
936
937 if (incr) /* ZINCRBY */
938 addReplyDouble(c,score);
939 else /* ZADD */
940 addReply(c,shared.cone);
e2641e09 941 }
21c5b508
PN
942 } else {
943 redisPanic("Unknown sorted set encoding");
e2641e09 944 }
945}
946
947void zaddCommand(redisClient *c) {
3ca7532a 948 zaddGenericCommand(c,0);
e2641e09 949}
950
951void zincrbyCommand(redisClient *c) {
3ca7532a 952 zaddGenericCommand(c,1);
e2641e09 953}
954
955void zremCommand(redisClient *c) {
0b10e104
PN
956 robj *key = c->argv[1];
957 robj *ele = c->argv[2];
958 robj *zobj;
959
960 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
961 checkType(c,zobj,REDIS_ZSET)) return;
962
963 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
964 unsigned char *eptr;
965
966 if ((eptr = zzlFind(zobj,ele,NULL)) != NULL) {
967 redisAssert(zzlDelete(zobj,eptr) == REDIS_OK);
968 if (zzlLength(zobj) == 0) dbDelete(c->db,key);
969 } else {
970 addReply(c,shared.czero);
971 return;
972 }
973 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
974 zset *zs = zobj->ptr;
975 dictEntry *de;
976 double score;
977
978 de = dictFind(zs->dict,ele);
979 if (de != NULL) {
980 /* Delete from the skiplist */
981 score = *(double*)dictGetEntryVal(de);
982 redisAssert(zslDelete(zs->zsl,score,ele));
983
984 /* Delete from the hash table */
985 dictDelete(zs->dict,ele);
986 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
987 if (dictSize(zs->dict) == 0) dbDelete(c->db,key);
988 } else {
989 addReply(c,shared.czero);
990 return;
991 }
992 } else {
993 redisPanic("Unknown sorted set encoding");
e2641e09 994 }
e2641e09 995
0b10e104 996 signalModifiedKey(c->db,key);
e2641e09 997 server.dirty++;
998 addReply(c,shared.cone);
999}
1000
1001void zremrangebyscoreCommand(redisClient *c) {
4a14dbba
PN
1002 robj *key = c->argv[1];
1003 robj *zobj;
91504b6c 1004 zrangespec range;
4a14dbba 1005 unsigned long deleted;
e2641e09 1006
91504b6c 1007 /* Parse the range arguments. */
7236fdb2
PN
1008 if (zslParseRange(c->argv[2],c->argv[3],&range) != REDIS_OK) {
1009 addReplyError(c,"min or max is not a double");
1010 return;
1011 }
e2641e09 1012
4a14dbba
PN
1013 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
1014 checkType(c,zobj,REDIS_ZSET)) return;
1015
1016 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1017 deleted = zzlDeleteRangeByScore(zobj,range);
1018 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
1019 zset *zs = zobj->ptr;
1020 deleted = zslDeleteRangeByScore(zs->zsl,range,zs->dict);
1021 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
1022 if (dictSize(zs->dict) == 0) dbDelete(c->db,key);
1023 } else {
1024 redisPanic("Unknown sorted set encoding");
1025 }
e2641e09 1026
4a14dbba 1027 if (deleted) signalModifiedKey(c->db,key);
e2641e09 1028 server.dirty += deleted;
1029 addReplyLongLong(c,deleted);
1030}
1031
1032void zremrangebyrankCommand(redisClient *c) {
63b7b7fb
PN
1033 robj *key = c->argv[1];
1034 robj *zobj;
e2641e09 1035 long start;
1036 long end;
1037 int llen;
63b7b7fb 1038 unsigned long deleted;
e2641e09 1039
1040 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
1041 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
1042
63b7b7fb
PN
1043 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
1044 checkType(c,zobj,REDIS_ZSET)) return;
e2641e09 1045
63b7b7fb
PN
1046 /* Sanitize indexes. */
1047 llen = zsLength(zobj);
e2641e09 1048 if (start < 0) start = llen+start;
1049 if (end < 0) end = llen+end;
1050 if (start < 0) start = 0;
e2641e09 1051
d0a4e24e
PN
1052 /* Invariant: start >= 0, so this test will be true when end < 0.
1053 * The range is empty when start > end or start >= length. */
e2641e09 1054 if (start > end || start >= llen) {
1055 addReply(c,shared.czero);
1056 return;
1057 }
1058 if (end >= llen) end = llen-1;
1059
63b7b7fb
PN
1060 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1061 /* Correct for 1-based rank. */
1062 deleted = zzlDeleteRangeByRank(zobj,start+1,end+1);
1063 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
1064 zset *zs = zobj->ptr;
1065
1066 /* Correct for 1-based rank. */
1067 deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);
1068 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
1069 if (dictSize(zs->dict) == 0) dbDelete(c->db,key);
1070 } else {
1071 redisPanic("Unknown sorted set encoding");
1072 }
1073
1074 if (deleted) signalModifiedKey(c->db,key);
e2641e09 1075 server.dirty += deleted;
63b7b7fb 1076 addReplyLongLong(c,deleted);
e2641e09 1077}
1078
1079typedef struct {
1080 dict *dict;
1081 double weight;
1082} zsetopsrc;
1083
1084int qsortCompareZsetopsrcByCardinality(const void *s1, const void *s2) {
1085 zsetopsrc *d1 = (void*) s1, *d2 = (void*) s2;
1086 unsigned long size1, size2;
1087 size1 = d1->dict ? dictSize(d1->dict) : 0;
1088 size2 = d2->dict ? dictSize(d2->dict) : 0;
1089 return size1 - size2;
1090}
1091
/* Ways of combining the scores of an element found in several sources. */
#define REDIS_AGGR_SUM 1
#define REDIS_AGGR_MIN 2
#define REDIS_AGGR_MAX 3
/* Score stored in a dict entry; an entry whose value is NULL counts as 1.0.
 * Note that the argument is evaluated twice. */
#define zunionInterDictValue(_e) (dictGetEntryVal(_e) == NULL ? 1.0 : *(double*)dictGetEntryVal(_e))

/* Fold 'val' into '*target' according to 'aggregate' (one of the
 * REDIS_AGGR_* constants). Any other aggregate value is a panic. */
inline static void zunionInterAggregate(double *target, double val, int aggregate) {
    switch (aggregate) {
    case REDIS_AGGR_SUM: {
        double sum = *target + val;

        /* The result of adding two doubles is NaN when one variable
         * is +inf and the other is -inf. When these numbers are added,
         * we maintain the convention of the result being 0.0. */
        *target = isnan(sum) ? 0.0 : sum;
        break;
    }
    case REDIS_AGGR_MIN:
        if (val < *target) *target = val;
        break;
    case REDIS_AGGR_MAX:
        if (val > *target) *target = val;
        break;
    default:
        /* safety net */
        redisPanic("Unknown ZUNION/INTER aggregate type");
    }
}
1113
1114void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
1115 int i, j, setnum;
1116 int aggregate = REDIS_AGGR_SUM;
1117 zsetopsrc *src;
1118 robj *dstobj;
1119 zset *dstzset;
69ef89f2 1120 zskiplistNode *znode;
e2641e09 1121 dictIterator *di;
1122 dictEntry *de;
8c1420ff 1123 int touched = 0;
e2641e09 1124
1125 /* expect setnum input keys to be given */
1126 setnum = atoi(c->argv[2]->ptr);
1127 if (setnum < 1) {
3ab20376
PN
1128 addReplyError(c,
1129 "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE");
e2641e09 1130 return;
1131 }
1132
1133 /* test if the expected number of keys would overflow */
1134 if (3+setnum > c->argc) {
1135 addReply(c,shared.syntaxerr);
1136 return;
1137 }
1138
1139 /* read keys to be used for input */
1140 src = zmalloc(sizeof(zsetopsrc) * setnum);
1141 for (i = 0, j = 3; i < setnum; i++, j++) {
1142 robj *obj = lookupKeyWrite(c->db,c->argv[j]);
1143 if (!obj) {
1144 src[i].dict = NULL;
1145 } else {
1146 if (obj->type == REDIS_ZSET) {
1147 src[i].dict = ((zset*)obj->ptr)->dict;
1148 } else if (obj->type == REDIS_SET) {
1149 src[i].dict = (obj->ptr);
1150 } else {
1151 zfree(src);
1152 addReply(c,shared.wrongtypeerr);
1153 return;
1154 }
1155 }
1156
1157 /* default all weights to 1 */
1158 src[i].weight = 1.0;
1159 }
1160
1161 /* parse optional extra arguments */
1162 if (j < c->argc) {
1163 int remaining = c->argc - j;
1164
1165 while (remaining) {
1166 if (remaining >= (setnum + 1) && !strcasecmp(c->argv[j]->ptr,"weights")) {
1167 j++; remaining--;
1168 for (i = 0; i < setnum; i++, j++, remaining--) {
673e1fb7
PN
1169 if (getDoubleFromObjectOrReply(c,c->argv[j],&src[i].weight,
1170 "weight value is not a double") != REDIS_OK)
1171 {
1172 zfree(src);
e2641e09 1173 return;
673e1fb7 1174 }
e2641e09 1175 }
1176 } else if (remaining >= 2 && !strcasecmp(c->argv[j]->ptr,"aggregate")) {
1177 j++; remaining--;
1178 if (!strcasecmp(c->argv[j]->ptr,"sum")) {
1179 aggregate = REDIS_AGGR_SUM;
1180 } else if (!strcasecmp(c->argv[j]->ptr,"min")) {
1181 aggregate = REDIS_AGGR_MIN;
1182 } else if (!strcasecmp(c->argv[j]->ptr,"max")) {
1183 aggregate = REDIS_AGGR_MAX;
1184 } else {
1185 zfree(src);
1186 addReply(c,shared.syntaxerr);
1187 return;
1188 }
1189 j++; remaining--;
1190 } else {
1191 zfree(src);
1192 addReply(c,shared.syntaxerr);
1193 return;
1194 }
1195 }
1196 }
1197
1198 /* sort sets from the smallest to largest, this will improve our
1199 * algorithm's performance */
1200 qsort(src,setnum,sizeof(zsetopsrc),qsortCompareZsetopsrcByCardinality);
1201
1202 dstobj = createZsetObject();
1203 dstzset = dstobj->ptr;
1204
1205 if (op == REDIS_OP_INTER) {
1206 /* skip going over all entries if the smallest zset is NULL or empty */
1207 if (src[0].dict && dictSize(src[0].dict) > 0) {
1208 /* precondition: as src[0].dict is non-empty and the zsets are ordered
1209 * from small to large, all src[i > 0].dict are non-empty too */
1210 di = dictGetIterator(src[0].dict);
1211 while((de = dictNext(di)) != NULL) {
d433ebc6 1212 double score, value;
e2641e09 1213
50a9fad5 1214 score = src[0].weight * zunionInterDictValue(de);
e2641e09 1215 for (j = 1; j < setnum; j++) {
1216 dictEntry *other = dictFind(src[j].dict,dictGetEntryKey(de));
1217 if (other) {
1218 value = src[j].weight * zunionInterDictValue(other);
d433ebc6 1219 zunionInterAggregate(&score,value,aggregate);
e2641e09 1220 } else {
1221 break;
1222 }
1223 }
1224
d433ebc6
PN
1225 /* Only continue when present in every source dict. */
1226 if (j == setnum) {
e2641e09 1227 robj *o = dictGetEntryKey(de);
d433ebc6 1228 znode = zslInsert(dstzset->zsl,score,o);
e2641e09 1229 incrRefCount(o); /* added to skiplist */
69ef89f2
PN
1230 dictAdd(dstzset->dict,o,&znode->score);
1231 incrRefCount(o); /* added to dictionary */
e2641e09 1232 }
1233 }
1234 dictReleaseIterator(di);
1235 }
1236 } else if (op == REDIS_OP_UNION) {
1237 for (i = 0; i < setnum; i++) {
1238 if (!src[i].dict) continue;
1239
1240 di = dictGetIterator(src[i].dict);
1241 while((de = dictNext(di)) != NULL) {
d433ebc6
PN
1242 double score, value;
1243
e2641e09 1244 /* skip key when already processed */
d433ebc6
PN
1245 if (dictFind(dstzset->dict,dictGetEntryKey(de)) != NULL)
1246 continue;
e2641e09 1247
d433ebc6
PN
1248 /* initialize score */
1249 score = src[i].weight * zunionInterDictValue(de);
e2641e09 1250
1251 /* because the zsets are sorted by size, its only possible
1252 * for sets at larger indices to hold this entry */
1253 for (j = (i+1); j < setnum; j++) {
1254 dictEntry *other = dictFind(src[j].dict,dictGetEntryKey(de));
1255 if (other) {
1256 value = src[j].weight * zunionInterDictValue(other);
d433ebc6 1257 zunionInterAggregate(&score,value,aggregate);
e2641e09 1258 }
1259 }
1260
1261 robj *o = dictGetEntryKey(de);
d433ebc6 1262 znode = zslInsert(dstzset->zsl,score,o);
e2641e09 1263 incrRefCount(o); /* added to skiplist */
69ef89f2
PN
1264 dictAdd(dstzset->dict,o,&znode->score);
1265 incrRefCount(o); /* added to dictionary */
e2641e09 1266 }
1267 dictReleaseIterator(di);
1268 }
1269 } else {
1270 /* unknown operator */
1271 redisAssert(op == REDIS_OP_INTER || op == REDIS_OP_UNION);
1272 }
1273
8c1420ff 1274 if (dbDelete(c->db,dstkey)) {
cea8c5cd 1275 signalModifiedKey(c->db,dstkey);
8c1420ff 1276 touched = 1;
1277 server.dirty++;
1278 }
e2641e09 1279 if (dstzset->zsl->length) {
1280 dbAdd(c->db,dstkey,dstobj);
1281 addReplyLongLong(c, dstzset->zsl->length);
cea8c5cd 1282 if (!touched) signalModifiedKey(c->db,dstkey);
cbf7e107 1283 server.dirty++;
e2641e09 1284 } else {
1285 decrRefCount(dstobj);
1286 addReply(c, shared.czero);
1287 }
1288 zfree(src);
1289}
1290
1291void zunionstoreCommand(redisClient *c) {
1292 zunionInterGenericCommand(c,c->argv[1], REDIS_OP_UNION);
1293}
1294
1295void zinterstoreCommand(redisClient *c) {
1296 zunionInterGenericCommand(c,c->argv[1], REDIS_OP_INTER);
1297}
1298
1299void zrangeGenericCommand(redisClient *c, int reverse) {
5d1b4fb6
PN
1300 robj *key = c->argv[1];
1301 robj *zobj;
1302 int withscores = 0;
e2641e09 1303 long start;
1304 long end;
e2641e09 1305 int llen;
5d1b4fb6 1306 int rangelen;
e2641e09 1307
1308 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
1309 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
1310
1311 if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
1312 withscores = 1;
1313 } else if (c->argc >= 5) {
1314 addReply(c,shared.syntaxerr);
1315 return;
1316 }
1317
5d1b4fb6
PN
1318 if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL
1319 || checkType(c,zobj,REDIS_ZSET)) return;
e2641e09 1320
5d1b4fb6
PN
1321 /* Sanitize indexes. */
1322 llen = zsLength(zobj);
e2641e09 1323 if (start < 0) start = llen+start;
1324 if (end < 0) end = llen+end;
1325 if (start < 0) start = 0;
e2641e09 1326
d0a4e24e
PN
1327 /* Invariant: start >= 0, so this test will be true when end < 0.
1328 * The range is empty when start > end or start >= length. */
e2641e09 1329 if (start > end || start >= llen) {
e2641e09 1330 addReply(c,shared.emptymultibulk);
1331 return;
1332 }
1333 if (end >= llen) end = llen-1;
1334 rangelen = (end-start)+1;
1335
e2641e09 1336 /* Return the result in form of a multi-bulk reply */
5d1b4fb6
PN
1337 addReplyMultiBulkLen(c, withscores ? (rangelen*2) : rangelen);
1338
1339 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1340 unsigned char *zl = zobj->ptr;
1341 unsigned char *eptr, *sptr;
1342 unsigned char *vstr;
1343 unsigned int vlen;
1344 long long vlong;
1345
1346 if (reverse)
1347 eptr = ziplistIndex(zl,-2-(2*start));
1348 else
1349 eptr = ziplistIndex(zl,2*start);
1350
4c5f0966
PN
1351 redisAssert(eptr != NULL);
1352 sptr = ziplistNext(zl,eptr);
1353
5d1b4fb6 1354 while (rangelen--) {
4c5f0966 1355 redisAssert(eptr != NULL && sptr != NULL);
5d1b4fb6
PN
1356 redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
1357 if (vstr == NULL)
1358 addReplyBulkLongLong(c,vlong);
1359 else
1360 addReplyBulkCBuffer(c,vstr,vlen);
1361
4c5f0966 1362 if (withscores)
5d1b4fb6 1363 addReplyDouble(c,zzlGetScore(sptr));
5d1b4fb6 1364
4c5f0966
PN
1365 if (reverse)
1366 zzlPrev(zl,&eptr,&sptr);
1367 else
1368 zzlNext(zl,&eptr,&sptr);
5d1b4fb6
PN
1369 }
1370
1371 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
1372 zset *zs = zobj->ptr;
1373 zskiplist *zsl = zs->zsl;
1374 zskiplistNode *ln;
1375 robj *ele;
1376
1377 /* Check if starting point is trivial, before doing log(N) lookup. */
1378 if (reverse) {
1379 ln = zsl->tail;
1380 if (start > 0)
1381 ln = zslGetElementByRank(zsl,llen-start);
1382 } else {
1383 ln = zsl->header->level[0].forward;
1384 if (start > 0)
1385 ln = zslGetElementByRank(zsl,start+1);
1386 }
1387
1388 while(rangelen--) {
1389 redisAssert(ln != NULL);
1390 ele = ln->obj;
1391 addReplyBulk(c,ele);
1392 if (withscores)
1393 addReplyDouble(c,ln->score);
1394 ln = reverse ? ln->backward : ln->level[0].forward;
1395 }
1396 } else {
1397 redisPanic("Unknown sorted set encoding");
e2641e09 1398 }
1399}
1400
1401void zrangeCommand(redisClient *c) {
1402 zrangeGenericCommand(c,0);
1403}
1404
1405void zrevrangeCommand(redisClient *c) {
1406 zrangeGenericCommand(c,1);
1407}
1408
25bb8a44
PN
1409/* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE and ZCOUNT.
1410 * If "justcount", only the number of elements in the range is returned. */
1411void genericZrangebyscoreCommand(redisClient *c, int reverse, int justcount) {
1412 zrangespec range;
aff255c8
PN
1413 robj *key = c->argv[1];
1414 robj *emptyreply, *zobj;
e2641e09 1415 int offset = 0, limit = -1;
1416 int withscores = 0;
25bb8a44
PN
1417 unsigned long rangelen = 0;
1418 void *replylen = NULL;
22b9bf15 1419 int minidx, maxidx;
e2641e09 1420
25bb8a44 1421 /* Parse the range arguments. */
22b9bf15
PN
1422 if (reverse) {
1423 /* Range is given as [max,min] */
1424 maxidx = 2; minidx = 3;
1425 } else {
1426 /* Range is given as [min,max] */
1427 minidx = 2; maxidx = 3;
1428 }
1429
1430 if (zslParseRange(c->argv[minidx],c->argv[maxidx],&range) != REDIS_OK) {
7236fdb2
PN
1431 addReplyError(c,"min or max is not a double");
1432 return;
1433 }
25bb8a44
PN
1434
1435 /* Parse optional extra arguments. Note that ZCOUNT will exactly have
1436 * 4 arguments, so we'll never enter the following code path. */
1437 if (c->argc > 4) {
1438 int remaining = c->argc - 4;
1439 int pos = 4;
1440
1441 while (remaining) {
1442 if (remaining >= 1 && !strcasecmp(c->argv[pos]->ptr,"withscores")) {
1443 pos++; remaining--;
1444 withscores = 1;
1445 } else if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr,"limit")) {
1446 offset = atoi(c->argv[pos+1]->ptr);
1447 limit = atoi(c->argv[pos+2]->ptr);
1448 pos += 3; remaining -= 3;
1449 } else {
1450 addReply(c,shared.syntaxerr);
1451 return;
1452 }
1453 }
e2641e09 1454 }
25bb8a44
PN
1455
1456 /* Ok, lookup the key and get the range */
1457 emptyreply = justcount ? shared.czero : shared.emptymultibulk;
aff255c8
PN
1458 if ((zobj = lookupKeyReadOrReply(c,key,emptyreply)) == NULL ||
1459 checkType(c,zobj,REDIS_ZSET)) return;
25bb8a44 1460
aff255c8
PN
1461 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1462 unsigned char *zl = zobj->ptr;
1463 unsigned char *eptr, *sptr;
1464 unsigned char *vstr;
1465 unsigned int vlen;
1466 long long vlong;
1467 double score;
e2641e09 1468
aff255c8
PN
1469 /* If reversed, get the last node in range as starting point. */
1470 if (reverse)
1471 eptr = zzlLastInRange(zobj,range);
1472 else
1473 eptr = zzlFirstInRange(zobj,range);
e2641e09 1474
aff255c8
PN
1475 /* No "first" element in the specified interval. */
1476 if (eptr == NULL) {
1477 addReply(c,emptyreply);
1478 return;
1479 }
e2641e09 1480
aff255c8
PN
1481 /* Get score pointer for the first element. */
1482 redisAssert(eptr != NULL);
1483 sptr = ziplistNext(zl,eptr);
e2641e09 1484
aff255c8
PN
1485 /* We don't know in advance how many matching elements there are in the
1486 * list, so we push this object that will represent the multi-bulk
1487 * length in the output buffer, and will "fix" it later */
1488 if (!justcount)
1489 replylen = addDeferredMultiBulkLength(c);
1490
1491 /* If there is an offset, just traverse the number of elements without
1492 * checking the score because that is done in the next loop. */
1493 while (eptr && offset--)
1494 if (reverse)
1495 zzlPrev(zl,&eptr,&sptr);
1496 else
1497 zzlNext(zl,&eptr,&sptr);
1498
1499 while (eptr && limit--) {
1500 score = zzlGetScore(sptr);
1501
1502 /* Abort when the node is no longer in range. */
1503 if (reverse) {
1504 if (!zslValueGteMin(score,&range)) break;
1505 } else {
1506 if (!zslValueLteMax(score,&range)) break;
1507 }
1508
1509 /* Do our magic */
1510 rangelen++;
1511 if (!justcount) {
1512 redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
1513 if (vstr == NULL)
1514 addReplyBulkLongLong(c,vlong);
1515 else
1516 addReplyBulkCBuffer(c,vstr,vlen);
1517
1518 if (withscores)
1519 addReplyDouble(c,score);
1520 }
1521
1522 /* Move to next node */
1523 if (reverse)
1524 zzlPrev(zl,&eptr,&sptr);
1525 else
1526 zzlNext(zl,&eptr,&sptr);
e2641e09 1527 }
aff255c8
PN
1528 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
1529 zset *zs = zobj->ptr;
1530 zskiplist *zsl = zs->zsl;
1531 zskiplistNode *ln;
25bb8a44 1532
aff255c8
PN
1533 /* If reversed, get the last node in range as starting point. */
1534 if (reverse)
1535 ln = zslLastInRange(zsl,range);
1536 else
1537 ln = zslFirstInRange(zsl,range);
1538
1539 /* No "first" element in the specified interval. */
1540 if (ln == NULL) {
1541 addReply(c,emptyreply);
1542 return;
25bb8a44
PN
1543 }
1544
aff255c8
PN
1545 /* We don't know in advance how many matching elements there are in the
1546 * list, so we push this object that will represent the multi-bulk
1547 * length in the output buffer, and will "fix" it later */
1548 if (!justcount)
1549 replylen = addDeferredMultiBulkLength(c);
1550
1551 /* If there is an offset, just traverse the number of elements without
1552 * checking the score because that is done in the next loop. */
1553 while (ln && offset--)
1554 ln = reverse ? ln->backward : ln->level[0].forward;
1555
1556 while (ln && limit--) {
1557 /* Abort when the node is no longer in range. */
1558 if (reverse) {
1559 if (!zslValueGteMin(ln->score,&range)) break;
1560 } else {
1561 if (!zslValueLteMax(ln->score,&range)) break;
1562 }
1563
1564 /* Do our magic */
1565 rangelen++;
1566 if (!justcount) {
1567 addReplyBulk(c,ln->obj);
1568 if (withscores)
1569 addReplyDouble(c,ln->score);
1570 }
1571
1572 /* Move to next node */
1573 ln = reverse ? ln->backward : ln->level[0].forward;
1574 }
1575 } else {
1576 redisPanic("Unknown sorted set encoding");
25bb8a44
PN
1577 }
1578
1579 if (justcount) {
1580 addReplyLongLong(c,(long)rangelen);
1581 } else {
aff255c8
PN
1582 if (withscores) rangelen *= 2;
1583 setDeferredMultiBulkLength(c,replylen,rangelen);
e2641e09 1584 }
1585}
1586
1587void zrangebyscoreCommand(redisClient *c) {
25bb8a44
PN
1588 genericZrangebyscoreCommand(c,0,0);
1589}
1590
1591void zrevrangebyscoreCommand(redisClient *c) {
1592 genericZrangebyscoreCommand(c,1,0);
e2641e09 1593}
1594
1595void zcountCommand(redisClient *c) {
25bb8a44 1596 genericZrangebyscoreCommand(c,0,1);
e2641e09 1597}
1598
1599void zcardCommand(redisClient *c) {
d1c920c5
PN
1600 robj *key = c->argv[1];
1601 robj *zobj;
e2641e09 1602
d1c920c5
PN
1603 if ((zobj = lookupKeyReadOrReply(c,key,shared.czero)) == NULL ||
1604 checkType(c,zobj,REDIS_ZSET)) return;
e2641e09 1605
cc4c964b 1606 addReplyLongLong(c,zsLength(zobj));
e2641e09 1607}
1608
1609void zscoreCommand(redisClient *c) {
d1c920c5
PN
1610 robj *key = c->argv[1];
1611 robj *zobj;
1612 double score;
e2641e09 1613
d1c920c5
PN
1614 if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
1615 checkType(c,zobj,REDIS_ZSET)) return;
e2641e09 1616
d1c920c5
PN
1617 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1618 if (zzlFind(zobj,c->argv[2],&score) != NULL)
1619 addReplyDouble(c,score);
1620 else
1621 addReply(c,shared.nullbulk);
1622 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
1623 zset *zs = zobj->ptr;
1624 dictEntry *de;
e2641e09 1625
d1c920c5
PN
1626 c->argv[2] = tryObjectEncoding(c->argv[2]);
1627 de = dictFind(zs->dict,c->argv[2]);
1628 if (de != NULL) {
1629 score = *(double*)dictGetEntryVal(de);
1630 addReplyDouble(c,score);
1631 } else {
1632 addReply(c,shared.nullbulk);
1633 }
1634 } else {
1635 redisPanic("Unknown sorted set encoding");
e2641e09 1636 }
1637}
1638
1639void zrankGenericCommand(redisClient *c, int reverse) {
d1c920c5
PN
1640 robj *key = c->argv[1];
1641 robj *ele = c->argv[2];
1642 robj *zobj;
1643 unsigned long llen;
e2641e09 1644 unsigned long rank;
e2641e09 1645
d1c920c5
PN
1646 if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
1647 checkType(c,zobj,REDIS_ZSET)) return;
1648 llen = zsLength(zobj);
e2641e09 1649
d1c920c5
PN
1650 redisAssert(ele->encoding == REDIS_ENCODING_RAW);
1651 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1652 unsigned char *zl = zobj->ptr;
1653 unsigned char *eptr, *sptr;
e2641e09 1654
d1c920c5
PN
1655 eptr = ziplistIndex(zl,0);
1656 redisAssert(eptr != NULL);
1657 sptr = ziplistNext(zl,eptr);
1658 redisAssert(sptr != NULL);
1659
1660 rank = 1;
1661 while(eptr != NULL) {
1662 if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr)))
1663 break;
1664 rank++;
1665 zzlNext(zl,&eptr,&sptr);
1666 }
1667
1668 if (eptr != NULL) {
1669 if (reverse)
1670 addReplyLongLong(c,llen-rank);
1671 else
1672 addReplyLongLong(c,rank-1);
e2641e09 1673 } else {
d1c920c5
PN
1674 addReply(c,shared.nullbulk);
1675 }
1676 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
1677 zset *zs = zobj->ptr;
1678 zskiplist *zsl = zs->zsl;
1679 dictEntry *de;
1680 double score;
1681
1682 ele = c->argv[2] = tryObjectEncoding(c->argv[2]);
1683 de = dictFind(zs->dict,ele);
1684 if (de != NULL) {
1685 score = *(double*)dictGetEntryVal(de);
1686 rank = zslGetRank(zsl,score,ele);
1687 redisAssert(rank); /* Existing elements always have a rank. */
1688 if (reverse)
1689 addReplyLongLong(c,llen-rank);
1690 else
1691 addReplyLongLong(c,rank-1);
1692 } else {
1693 addReply(c,shared.nullbulk);
e2641e09 1694 }
1695 } else {
d1c920c5 1696 redisPanic("Unknown sorted set encoding");
e2641e09 1697 }
1698}
1699
1700void zrankCommand(redisClient *c) {
1701 zrankGenericCommand(c, 0);
1702}
1703
1704void zrevrankCommand(redisClient *c) {
1705 zrankGenericCommand(c, 1);
1706}