]> git.saurik.com Git - redis.git/blame - src/t_zset.c
Fast conversion of double when representable as long long
[redis.git] / src / t_zset.c
CommitLineData
e2641e09 1#include "redis.h"
2
3#include <math.h>
4
5/*-----------------------------------------------------------------------------
6 * Sorted set API
7 *----------------------------------------------------------------------------*/
8
9/* ZSETs are ordered sets using two data structures to hold the same elements
10 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
11 * data structure.
12 *
13 * The elements are added to a hash table mapping Redis objects to scores.
14 * At the same time the elements are added to a skip list mapping scores
15 * to Redis objects (so objects are sorted by scores in this "view"). */
16
17/* This skiplist implementation is almost a C translation of the original
18 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
19 * Alternative to Balanced Trees", modified in three ways:
20 * a) this implementation allows for repeated values.
21 * b) the comparison is not just by key (our 'score') but by satellite data.
22 * c) there is a back pointer, so it's a doubly linked list with the back
23 * pointers being only at "level 1". This allows to traverse the list
24 * from tail to head, useful for ZREVRANGE. */
25
26zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
2159782b 27 zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
e2641e09 28 zn->score = score;
29 zn->obj = obj;
30 return zn;
31}
32
33zskiplist *zslCreate(void) {
34 int j;
35 zskiplist *zsl;
36
37 zsl = zmalloc(sizeof(*zsl));
38 zsl->level = 1;
39 zsl->length = 0;
40 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
41 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
2159782b
PN
42 zsl->header->level[j].forward = NULL;
43 zsl->header->level[j].span = 0;
e2641e09 44 }
45 zsl->header->backward = NULL;
46 zsl->tail = NULL;
47 return zsl;
48}
49
50void zslFreeNode(zskiplistNode *node) {
51 decrRefCount(node->obj);
e2641e09 52 zfree(node);
53}
54
55void zslFree(zskiplist *zsl) {
2159782b 56 zskiplistNode *node = zsl->header->level[0].forward, *next;
e2641e09 57
e2641e09 58 zfree(zsl->header);
59 while(node) {
2159782b 60 next = node->level[0].forward;
e2641e09 61 zslFreeNode(node);
62 node = next;
63 }
64 zfree(zsl);
65}
66
67int zslRandomLevel(void) {
68 int level = 1;
69 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
70 level += 1;
71 return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
72}
73
69ef89f2 74zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
e2641e09 75 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
76 unsigned int rank[ZSKIPLIST_MAXLEVEL];
77 int i, level;
78
79 x = zsl->header;
80 for (i = zsl->level-1; i >= 0; i--) {
81 /* store rank that is crossed to reach the insert position */
82 rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
2159782b
PN
83 while (x->level[i].forward &&
84 (x->level[i].forward->score < score ||
85 (x->level[i].forward->score == score &&
86 compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
87 rank[i] += x->level[i].span;
88 x = x->level[i].forward;
e2641e09 89 }
90 update[i] = x;
91 }
92 /* we assume the key is not already inside, since we allow duplicated
93 * scores, and the re-insertion of score and redis object should never
94 * happpen since the caller of zslInsert() should test in the hash table
95 * if the element is already inside or not. */
96 level = zslRandomLevel();
97 if (level > zsl->level) {
98 for (i = zsl->level; i < level; i++) {
99 rank[i] = 0;
100 update[i] = zsl->header;
2159782b 101 update[i]->level[i].span = zsl->length;
e2641e09 102 }
103 zsl->level = level;
104 }
105 x = zslCreateNode(level,score,obj);
106 for (i = 0; i < level; i++) {
2159782b
PN
107 x->level[i].forward = update[i]->level[i].forward;
108 update[i]->level[i].forward = x;
e2641e09 109
110 /* update span covered by update[i] as x is inserted here */
2159782b
PN
111 x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
112 update[i]->level[i].span = (rank[0] - rank[i]) + 1;
e2641e09 113 }
114
115 /* increment span for untouched levels */
116 for (i = level; i < zsl->level; i++) {
2159782b 117 update[i]->level[i].span++;
e2641e09 118 }
119
120 x->backward = (update[0] == zsl->header) ? NULL : update[0];
2159782b
PN
121 if (x->level[0].forward)
122 x->level[0].forward->backward = x;
e2641e09 123 else
124 zsl->tail = x;
125 zsl->length++;
69ef89f2 126 return x;
e2641e09 127}
128
129/* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
130void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
131 int i;
132 for (i = 0; i < zsl->level; i++) {
2159782b
PN
133 if (update[i]->level[i].forward == x) {
134 update[i]->level[i].span += x->level[i].span - 1;
135 update[i]->level[i].forward = x->level[i].forward;
e2641e09 136 } else {
2159782b 137 update[i]->level[i].span -= 1;
e2641e09 138 }
139 }
2159782b
PN
140 if (x->level[0].forward) {
141 x->level[0].forward->backward = x->backward;
e2641e09 142 } else {
143 zsl->tail = x->backward;
144 }
2159782b 145 while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
e2641e09 146 zsl->level--;
147 zsl->length--;
148}
149
150/* Delete an element with matching score/object from the skiplist. */
151int zslDelete(zskiplist *zsl, double score, robj *obj) {
152 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
153 int i;
154
155 x = zsl->header;
156 for (i = zsl->level-1; i >= 0; i--) {
2159782b
PN
157 while (x->level[i].forward &&
158 (x->level[i].forward->score < score ||
159 (x->level[i].forward->score == score &&
160 compareStringObjects(x->level[i].forward->obj,obj) < 0)))
161 x = x->level[i].forward;
e2641e09 162 update[i] = x;
163 }
164 /* We may have multiple elements with the same score, what we need
165 * is to find the element with both the right score and object. */
2159782b 166 x = x->level[0].forward;
e2641e09 167 if (x && score == x->score && equalStringObjects(x->obj,obj)) {
168 zslDeleteNode(zsl, x, update);
169 zslFreeNode(x);
170 return 1;
171 } else {
172 return 0; /* not found */
173 }
174 return 0; /* not found */
175}
176
91504b6c
PN
/* Score interval with independently inclusive/exclusive endpoints. */
typedef struct {
    double min, max;
    int minex, maxex; /* non-zero when min (resp. max) is exclusive */
} zrangespec;

/* Return 1 if 'value' satisfies the lower bound of 'spec', 0 otherwise. */
static int zslValueGteMin(double value, zrangespec *spec) {
    if (spec->minex) return value > spec->min;
    return value >= spec->min;
}

/* Return 1 if 'value' satisfies the upper bound of 'spec', 0 otherwise. */
static int zslValueLteMax(double value, zrangespec *spec) {
    if (spec->maxex) return value < spec->max;
    return value <= spec->max;
}

/* Return 1 if 'value' satisfies both bounds of 'spec', 0 otherwise. */
static int zslValueInRange(double value, zrangespec *spec) {
    if (!zslValueGteMin(value,spec)) return 0;
    return zslValueLteMax(value,spec);
}
194
195/* Returns if there is a part of the zset is in range. */
196int zslIsInRange(zskiplist *zsl, zrangespec *range) {
197 zskiplistNode *x;
198
8e1b3277
PN
199 /* Test for ranges that will always be empty. */
200 if (range->min > range->max ||
201 (range->min == range->max && (range->minex || range->maxex)))
202 return 0;
22b9bf15 203 x = zsl->tail;
45290ad9 204 if (x == NULL || !zslValueGteMin(x->score,range))
22b9bf15
PN
205 return 0;
206 x = zsl->header->level[0].forward;
45290ad9 207 if (x == NULL || !zslValueLteMax(x->score,range))
22b9bf15
PN
208 return 0;
209 return 1;
210}
211
212/* Find the first node that is contained in the specified range.
213 * Returns NULL when no element is contained in the range. */
214zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec range) {
215 zskiplistNode *x;
216 int i;
217
218 /* If everything is out of range, return early. */
219 if (!zslIsInRange(zsl,&range)) return NULL;
220
221 x = zsl->header;
222 for (i = zsl->level-1; i >= 0; i--) {
223 /* Go forward while *OUT* of range. */
224 while (x->level[i].forward &&
45290ad9 225 !zslValueGteMin(x->level[i].forward->score,&range))
22b9bf15
PN
226 x = x->level[i].forward;
227 }
228
229 /* The tail is in range, so the previous block should always return a
230 * node that is non-NULL and the last one to be out of range. */
231 x = x->level[0].forward;
232 redisAssert(x != NULL && zslValueInRange(x->score,&range));
233 return x;
234}
235
236/* Find the last node that is contained in the specified range.
237 * Returns NULL when no element is contained in the range. */
238zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec range) {
239 zskiplistNode *x;
240 int i;
241
242 /* If everything is out of range, return early. */
243 if (!zslIsInRange(zsl,&range)) return NULL;
244
245 x = zsl->header;
246 for (i = zsl->level-1; i >= 0; i--) {
247 /* Go forward while *IN* range. */
248 while (x->level[i].forward &&
45290ad9 249 zslValueLteMax(x->level[i].forward->score,&range))
22b9bf15
PN
250 x = x->level[i].forward;
251 }
252
253 /* The header is in range, so the previous block should always return a
254 * node that is non-NULL and in range. */
255 redisAssert(x != NULL && zslValueInRange(x->score,&range));
256 return x;
257}
258
e2641e09 259/* Delete all the elements with score between min and max from the skiplist.
260 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
261 * Note that this function takes the reference to the hash table view of the
262 * sorted set, in order to remove the elements from the hash table too. */
91504b6c 263unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec range, dict *dict) {
e2641e09 264 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
265 unsigned long removed = 0;
266 int i;
267
268 x = zsl->header;
269 for (i = zsl->level-1; i >= 0; i--) {
91504b6c
PN
270 while (x->level[i].forward && (range.minex ?
271 x->level[i].forward->score <= range.min :
272 x->level[i].forward->score < range.min))
273 x = x->level[i].forward;
e2641e09 274 update[i] = x;
275 }
91504b6c
PN
276
277 /* Current node is the last with score < or <= min. */
2159782b 278 x = x->level[0].forward;
91504b6c
PN
279
280 /* Delete nodes while in range. */
281 while (x && (range.maxex ? x->score < range.max : x->score <= range.max)) {
2159782b 282 zskiplistNode *next = x->level[0].forward;
69ef89f2 283 zslDeleteNode(zsl,x,update);
e2641e09 284 dictDelete(dict,x->obj);
285 zslFreeNode(x);
286 removed++;
287 x = next;
288 }
91504b6c 289 return removed;
e2641e09 290}
291
292/* Delete all the elements with rank between start and end from the skiplist.
293 * Start and end are inclusive. Note that start and end need to be 1-based */
294unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict) {
295 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
296 unsigned long traversed = 0, removed = 0;
297 int i;
298
299 x = zsl->header;
300 for (i = zsl->level-1; i >= 0; i--) {
2159782b
PN
301 while (x->level[i].forward && (traversed + x->level[i].span) < start) {
302 traversed += x->level[i].span;
303 x = x->level[i].forward;
e2641e09 304 }
305 update[i] = x;
306 }
307
308 traversed++;
2159782b 309 x = x->level[0].forward;
e2641e09 310 while (x && traversed <= end) {
2159782b 311 zskiplistNode *next = x->level[0].forward;
69ef89f2 312 zslDeleteNode(zsl,x,update);
e2641e09 313 dictDelete(dict,x->obj);
314 zslFreeNode(x);
315 removed++;
316 traversed++;
317 x = next;
318 }
319 return removed;
320}
321
e2641e09 322/* Find the rank for an element by both score and key.
323 * Returns 0 when the element cannot be found, rank otherwise.
324 * Note that the rank is 1-based due to the span of zsl->header to the
325 * first element. */
a3004773 326unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) {
e2641e09 327 zskiplistNode *x;
328 unsigned long rank = 0;
329 int i;
330
331 x = zsl->header;
332 for (i = zsl->level-1; i >= 0; i--) {
2159782b
PN
333 while (x->level[i].forward &&
334 (x->level[i].forward->score < score ||
335 (x->level[i].forward->score == score &&
336 compareStringObjects(x->level[i].forward->obj,o) <= 0))) {
337 rank += x->level[i].span;
338 x = x->level[i].forward;
e2641e09 339 }
340
341 /* x might be equal to zsl->header, so test if obj is non-NULL */
342 if (x->obj && equalStringObjects(x->obj,o)) {
343 return rank;
344 }
345 }
346 return 0;
347}
348
349/* Finds an element by its rank. The rank argument needs to be 1-based. */
a3004773 350zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
e2641e09 351 zskiplistNode *x;
352 unsigned long traversed = 0;
353 int i;
354
355 x = zsl->header;
356 for (i = zsl->level-1; i >= 0; i--) {
2159782b 357 while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
e2641e09 358 {
2159782b
PN
359 traversed += x->level[i].span;
360 x = x->level[i].forward;
e2641e09 361 }
362 if (traversed == rank) {
363 return x;
364 }
365 }
366 return NULL;
367}
368
25bb8a44 369/* Populate the rangespec according to the objects min and max. */
7236fdb2
PN
370static int zslParseRange(robj *min, robj *max, zrangespec *spec) {
371 char *eptr;
25bb8a44
PN
372 spec->minex = spec->maxex = 0;
373
374 /* Parse the min-max interval. If one of the values is prefixed
375 * by the "(" character, it's considered "open". For instance
376 * ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max
377 * ZRANGEBYSCORE zset 1.5 2.5 will instead match min <= x <= max */
378 if (min->encoding == REDIS_ENCODING_INT) {
379 spec->min = (long)min->ptr;
380 } else {
381 if (((char*)min->ptr)[0] == '(') {
7236fdb2
PN
382 spec->min = strtod((char*)min->ptr+1,&eptr);
383 if (eptr[0] != '\0' || isnan(spec->min)) return REDIS_ERR;
25bb8a44
PN
384 spec->minex = 1;
385 } else {
7236fdb2
PN
386 spec->min = strtod((char*)min->ptr,&eptr);
387 if (eptr[0] != '\0' || isnan(spec->min)) return REDIS_ERR;
25bb8a44
PN
388 }
389 }
390 if (max->encoding == REDIS_ENCODING_INT) {
391 spec->max = (long)max->ptr;
392 } else {
393 if (((char*)max->ptr)[0] == '(') {
7236fdb2
PN
394 spec->max = strtod((char*)max->ptr+1,&eptr);
395 if (eptr[0] != '\0' || isnan(spec->max)) return REDIS_ERR;
25bb8a44
PN
396 spec->maxex = 1;
397 } else {
7236fdb2
PN
398 spec->max = strtod((char*)max->ptr,&eptr);
399 if (eptr[0] != '\0' || isnan(spec->max)) return REDIS_ERR;
25bb8a44
PN
400 }
401 }
402
403 return REDIS_OK;
404}
405
406
e2641e09 407/*-----------------------------------------------------------------------------
408 * Sorted set commands
409 *----------------------------------------------------------------------------*/
410
69ef89f2
PN
411/* This generic command implements both ZADD and ZINCRBY. */
412void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double score, int incr) {
e2641e09 413 robj *zsetobj;
414 zset *zs;
69ef89f2 415 zskiplistNode *znode;
e2641e09 416
e2641e09 417 zsetobj = lookupKeyWrite(c->db,key);
418 if (zsetobj == NULL) {
419 zsetobj = createZsetObject();
420 dbAdd(c->db,key,zsetobj);
421 } else {
422 if (zsetobj->type != REDIS_ZSET) {
423 addReply(c,shared.wrongtypeerr);
424 return;
425 }
426 }
427 zs = zsetobj->ptr;
428
69ef89f2
PN
429 /* Since both ZADD and ZINCRBY are implemented here, we need to increment
430 * the score first by the current score if ZINCRBY is called. */
431 if (incr) {
e2641e09 432 /* Read the old score. If the element was not present starts from 0 */
69ef89f2
PN
433 dictEntry *de = dictFind(zs->dict,ele);
434 if (de != NULL)
435 score += *(double*)dictGetEntryVal(de);
436
437 if (isnan(score)) {
3ab20376 438 addReplyError(c,"resulting score is not a number (NaN)");
e2641e09 439 /* Note that we don't need to check if the zset may be empty and
440 * should be removed here, as we can only obtain Nan as score if
441 * there was already an element in the sorted set. */
442 return;
443 }
e2641e09 444 }
445
69ef89f2
PN
446 /* We need to remove and re-insert the element when it was already present
447 * in the dictionary, to update the skiplist. Note that we delay adding a
448 * pointer to the score because we want to reference the score in the
449 * skiplist node. */
450 if (dictAdd(zs->dict,ele,NULL) == DICT_OK) {
451 dictEntry *de;
452
453 /* New element */
e2641e09 454 incrRefCount(ele); /* added to hash */
69ef89f2 455 znode = zslInsert(zs->zsl,score,ele);
e2641e09 456 incrRefCount(ele); /* added to skiplist */
69ef89f2
PN
457
458 /* Update the score in the dict entry */
459 de = dictFind(zs->dict,ele);
460 redisAssert(de != NULL);
461 dictGetEntryVal(de) = &znode->score;
cea8c5cd 462 signalModifiedKey(c->db,c->argv[1]);
e2641e09 463 server.dirty++;
69ef89f2
PN
464 if (incr)
465 addReplyDouble(c,score);
e2641e09 466 else
467 addReply(c,shared.cone);
468 } else {
469 dictEntry *de;
69ef89f2
PN
470 robj *curobj;
471 double *curscore;
472 int deleted;
e2641e09 473
69ef89f2 474 /* Update score */
e2641e09 475 de = dictFind(zs->dict,ele);
476 redisAssert(de != NULL);
69ef89f2
PN
477 curobj = dictGetEntryKey(de);
478 curscore = dictGetEntryVal(de);
e2641e09 479
69ef89f2
PN
480 /* When the score is updated, reuse the existing string object to
481 * prevent extra alloc/dealloc of strings on ZINCRBY. */
482 if (score != *curscore) {
483 deleted = zslDelete(zs->zsl,*curscore,curobj);
e2641e09 484 redisAssert(deleted != 0);
69ef89f2
PN
485 znode = zslInsert(zs->zsl,score,curobj);
486 incrRefCount(curobj);
487
488 /* Update the score in the current dict entry */
489 dictGetEntryVal(de) = &znode->score;
cea8c5cd 490 signalModifiedKey(c->db,c->argv[1]);
e2641e09 491 server.dirty++;
e2641e09 492 }
69ef89f2
PN
493 if (incr)
494 addReplyDouble(c,score);
e2641e09 495 else
496 addReply(c,shared.czero);
497 }
498}
499
500void zaddCommand(redisClient *c) {
501 double scoreval;
673e1fb7 502 if (getDoubleFromObjectOrReply(c,c->argv[2],&scoreval,NULL) != REDIS_OK) return;
75b41de8 503 c->argv[3] = tryObjectEncoding(c->argv[3]);
e2641e09 504 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,0);
505}
506
507void zincrbyCommand(redisClient *c) {
508 double scoreval;
673e1fb7 509 if (getDoubleFromObjectOrReply(c,c->argv[2],&scoreval,NULL) != REDIS_OK) return;
75b41de8 510 c->argv[3] = tryObjectEncoding(c->argv[3]);
e2641e09 511 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,1);
512}
513
514void zremCommand(redisClient *c) {
515 robj *zsetobj;
516 zset *zs;
517 dictEntry *de;
69ef89f2 518 double curscore;
e2641e09 519 int deleted;
520
521 if ((zsetobj = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL ||
522 checkType(c,zsetobj,REDIS_ZSET)) return;
523
524 zs = zsetobj->ptr;
75b41de8 525 c->argv[2] = tryObjectEncoding(c->argv[2]);
e2641e09 526 de = dictFind(zs->dict,c->argv[2]);
527 if (de == NULL) {
528 addReply(c,shared.czero);
529 return;
530 }
531 /* Delete from the skiplist */
69ef89f2
PN
532 curscore = *(double*)dictGetEntryVal(de);
533 deleted = zslDelete(zs->zsl,curscore,c->argv[2]);
e2641e09 534 redisAssert(deleted != 0);
535
536 /* Delete from the hash table */
537 dictDelete(zs->dict,c->argv[2]);
538 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
539 if (dictSize(zs->dict) == 0) dbDelete(c->db,c->argv[1]);
cea8c5cd 540 signalModifiedKey(c->db,c->argv[1]);
e2641e09 541 server.dirty++;
542 addReply(c,shared.cone);
543}
544
545void zremrangebyscoreCommand(redisClient *c) {
91504b6c 546 zrangespec range;
e2641e09 547 long deleted;
91504b6c 548 robj *o;
e2641e09 549 zset *zs;
550
91504b6c 551 /* Parse the range arguments. */
7236fdb2
PN
552 if (zslParseRange(c->argv[2],c->argv[3],&range) != REDIS_OK) {
553 addReplyError(c,"min or max is not a double");
554 return;
555 }
e2641e09 556
91504b6c
PN
557 if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL ||
558 checkType(c,o,REDIS_ZSET)) return;
e2641e09 559
91504b6c
PN
560 zs = o->ptr;
561 deleted = zslDeleteRangeByScore(zs->zsl,range,zs->dict);
e2641e09 562 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
563 if (dictSize(zs->dict) == 0) dbDelete(c->db,c->argv[1]);
cea8c5cd 564 if (deleted) signalModifiedKey(c->db,c->argv[1]);
e2641e09 565 server.dirty += deleted;
566 addReplyLongLong(c,deleted);
567}
568
569void zremrangebyrankCommand(redisClient *c) {
570 long start;
571 long end;
572 int llen;
573 long deleted;
574 robj *zsetobj;
575 zset *zs;
576
577 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
578 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
579
580 if ((zsetobj = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL ||
581 checkType(c,zsetobj,REDIS_ZSET)) return;
582 zs = zsetobj->ptr;
583 llen = zs->zsl->length;
584
585 /* convert negative indexes */
586 if (start < 0) start = llen+start;
587 if (end < 0) end = llen+end;
588 if (start < 0) start = 0;
e2641e09 589
d0a4e24e
PN
590 /* Invariant: start >= 0, so this test will be true when end < 0.
591 * The range is empty when start > end or start >= length. */
e2641e09 592 if (start > end || start >= llen) {
593 addReply(c,shared.czero);
594 return;
595 }
596 if (end >= llen) end = llen-1;
597
598 /* increment start and end because zsl*Rank functions
599 * use 1-based rank */
600 deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);
601 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
602 if (dictSize(zs->dict) == 0) dbDelete(c->db,c->argv[1]);
cea8c5cd 603 if (deleted) signalModifiedKey(c->db,c->argv[1]);
e2641e09 604 server.dirty += deleted;
605 addReplyLongLong(c, deleted);
606}
607
608typedef struct {
609 dict *dict;
610 double weight;
611} zsetopsrc;
612
613int qsortCompareZsetopsrcByCardinality(const void *s1, const void *s2) {
614 zsetopsrc *d1 = (void*) s1, *d2 = (void*) s2;
615 unsigned long size1, size2;
616 size1 = d1->dict ? dictSize(d1->dict) : 0;
617 size2 = d2->dict ? dictSize(d2->dict) : 0;
618 return size1 - size2;
619}
620
/* Aggregation modes for ZUNIONSTORE/ZINTERSTORE. */
#define REDIS_AGGR_SUM 1
#define REDIS_AGGR_MIN 2
#define REDIS_AGGR_MAX 3
/* Score stored in a dict entry; an entry with a NULL value counts as 1.0. */
#define zunionInterDictValue(_e) (dictGetEntryVal(_e) == NULL ? 1.0 : *(double*)dictGetEntryVal(_e))

/* Fold 'val' into '*target' according to 'aggregate' (one of the
 * REDIS_AGGR_* modes). Panics on an unknown mode. */
inline static void zunionInterAggregate(double *target, double val, int aggregate) {
    switch (aggregate) {
    case REDIS_AGGR_SUM:
        *target = *target + val;
        /* The result of adding two doubles is NaN when one of them is
         * +inf and the other is -inf: by convention the sum is 0.0. */
        if (isnan(*target)) *target = 0.0;
        break;
    case REDIS_AGGR_MIN:
        if (val < *target) *target = val;
        break;
    case REDIS_AGGR_MAX:
        if (val > *target) *target = val;
        break;
    default:
        /* safety net */
        redisPanic("Unknown ZUNION/INTER aggregate type");
        break;
    }
}
642
643void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
644 int i, j, setnum;
645 int aggregate = REDIS_AGGR_SUM;
646 zsetopsrc *src;
647 robj *dstobj;
648 zset *dstzset;
69ef89f2 649 zskiplistNode *znode;
e2641e09 650 dictIterator *di;
651 dictEntry *de;
8c1420ff 652 int touched = 0;
e2641e09 653
654 /* expect setnum input keys to be given */
655 setnum = atoi(c->argv[2]->ptr);
656 if (setnum < 1) {
3ab20376
PN
657 addReplyError(c,
658 "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE");
e2641e09 659 return;
660 }
661
662 /* test if the expected number of keys would overflow */
663 if (3+setnum > c->argc) {
664 addReply(c,shared.syntaxerr);
665 return;
666 }
667
668 /* read keys to be used for input */
669 src = zmalloc(sizeof(zsetopsrc) * setnum);
670 for (i = 0, j = 3; i < setnum; i++, j++) {
671 robj *obj = lookupKeyWrite(c->db,c->argv[j]);
672 if (!obj) {
673 src[i].dict = NULL;
674 } else {
675 if (obj->type == REDIS_ZSET) {
676 src[i].dict = ((zset*)obj->ptr)->dict;
677 } else if (obj->type == REDIS_SET) {
678 src[i].dict = (obj->ptr);
679 } else {
680 zfree(src);
681 addReply(c,shared.wrongtypeerr);
682 return;
683 }
684 }
685
686 /* default all weights to 1 */
687 src[i].weight = 1.0;
688 }
689
690 /* parse optional extra arguments */
691 if (j < c->argc) {
692 int remaining = c->argc - j;
693
694 while (remaining) {
695 if (remaining >= (setnum + 1) && !strcasecmp(c->argv[j]->ptr,"weights")) {
696 j++; remaining--;
697 for (i = 0; i < setnum; i++, j++, remaining--) {
673e1fb7
PN
698 if (getDoubleFromObjectOrReply(c,c->argv[j],&src[i].weight,
699 "weight value is not a double") != REDIS_OK)
700 {
701 zfree(src);
e2641e09 702 return;
673e1fb7 703 }
e2641e09 704 }
705 } else if (remaining >= 2 && !strcasecmp(c->argv[j]->ptr,"aggregate")) {
706 j++; remaining--;
707 if (!strcasecmp(c->argv[j]->ptr,"sum")) {
708 aggregate = REDIS_AGGR_SUM;
709 } else if (!strcasecmp(c->argv[j]->ptr,"min")) {
710 aggregate = REDIS_AGGR_MIN;
711 } else if (!strcasecmp(c->argv[j]->ptr,"max")) {
712 aggregate = REDIS_AGGR_MAX;
713 } else {
714 zfree(src);
715 addReply(c,shared.syntaxerr);
716 return;
717 }
718 j++; remaining--;
719 } else {
720 zfree(src);
721 addReply(c,shared.syntaxerr);
722 return;
723 }
724 }
725 }
726
727 /* sort sets from the smallest to largest, this will improve our
728 * algorithm's performance */
729 qsort(src,setnum,sizeof(zsetopsrc),qsortCompareZsetopsrcByCardinality);
730
731 dstobj = createZsetObject();
732 dstzset = dstobj->ptr;
733
734 if (op == REDIS_OP_INTER) {
735 /* skip going over all entries if the smallest zset is NULL or empty */
736 if (src[0].dict && dictSize(src[0].dict) > 0) {
737 /* precondition: as src[0].dict is non-empty and the zsets are ordered
738 * from small to large, all src[i > 0].dict are non-empty too */
739 di = dictGetIterator(src[0].dict);
740 while((de = dictNext(di)) != NULL) {
d433ebc6 741 double score, value;
e2641e09 742
50a9fad5 743 score = src[0].weight * zunionInterDictValue(de);
e2641e09 744 for (j = 1; j < setnum; j++) {
745 dictEntry *other = dictFind(src[j].dict,dictGetEntryKey(de));
746 if (other) {
747 value = src[j].weight * zunionInterDictValue(other);
d433ebc6 748 zunionInterAggregate(&score,value,aggregate);
e2641e09 749 } else {
750 break;
751 }
752 }
753
d433ebc6
PN
754 /* Only continue when present in every source dict. */
755 if (j == setnum) {
e2641e09 756 robj *o = dictGetEntryKey(de);
d433ebc6 757 znode = zslInsert(dstzset->zsl,score,o);
e2641e09 758 incrRefCount(o); /* added to skiplist */
69ef89f2
PN
759 dictAdd(dstzset->dict,o,&znode->score);
760 incrRefCount(o); /* added to dictionary */
e2641e09 761 }
762 }
763 dictReleaseIterator(di);
764 }
765 } else if (op == REDIS_OP_UNION) {
766 for (i = 0; i < setnum; i++) {
767 if (!src[i].dict) continue;
768
769 di = dictGetIterator(src[i].dict);
770 while((de = dictNext(di)) != NULL) {
d433ebc6
PN
771 double score, value;
772
e2641e09 773 /* skip key when already processed */
d433ebc6
PN
774 if (dictFind(dstzset->dict,dictGetEntryKey(de)) != NULL)
775 continue;
e2641e09 776
d433ebc6
PN
777 /* initialize score */
778 score = src[i].weight * zunionInterDictValue(de);
e2641e09 779
780 /* because the zsets are sorted by size, its only possible
781 * for sets at larger indices to hold this entry */
782 for (j = (i+1); j < setnum; j++) {
783 dictEntry *other = dictFind(src[j].dict,dictGetEntryKey(de));
784 if (other) {
785 value = src[j].weight * zunionInterDictValue(other);
d433ebc6 786 zunionInterAggregate(&score,value,aggregate);
e2641e09 787 }
788 }
789
790 robj *o = dictGetEntryKey(de);
d433ebc6 791 znode = zslInsert(dstzset->zsl,score,o);
e2641e09 792 incrRefCount(o); /* added to skiplist */
69ef89f2
PN
793 dictAdd(dstzset->dict,o,&znode->score);
794 incrRefCount(o); /* added to dictionary */
e2641e09 795 }
796 dictReleaseIterator(di);
797 }
798 } else {
799 /* unknown operator */
800 redisAssert(op == REDIS_OP_INTER || op == REDIS_OP_UNION);
801 }
802
8c1420ff 803 if (dbDelete(c->db,dstkey)) {
cea8c5cd 804 signalModifiedKey(c->db,dstkey);
8c1420ff 805 touched = 1;
806 server.dirty++;
807 }
e2641e09 808 if (dstzset->zsl->length) {
809 dbAdd(c->db,dstkey,dstobj);
810 addReplyLongLong(c, dstzset->zsl->length);
cea8c5cd 811 if (!touched) signalModifiedKey(c->db,dstkey);
cbf7e107 812 server.dirty++;
e2641e09 813 } else {
814 decrRefCount(dstobj);
815 addReply(c, shared.czero);
816 }
817 zfree(src);
818}
819
820void zunionstoreCommand(redisClient *c) {
821 zunionInterGenericCommand(c,c->argv[1], REDIS_OP_UNION);
822}
823
824void zinterstoreCommand(redisClient *c) {
825 zunionInterGenericCommand(c,c->argv[1], REDIS_OP_INTER);
826}
827
828void zrangeGenericCommand(redisClient *c, int reverse) {
829 robj *o;
830 long start;
831 long end;
832 int withscores = 0;
833 int llen;
834 int rangelen, j;
835 zset *zsetobj;
836 zskiplist *zsl;
837 zskiplistNode *ln;
838 robj *ele;
839
840 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
841 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
842
843 if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
844 withscores = 1;
845 } else if (c->argc >= 5) {
846 addReply(c,shared.syntaxerr);
847 return;
848 }
849
850 if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
851 || checkType(c,o,REDIS_ZSET)) return;
852 zsetobj = o->ptr;
853 zsl = zsetobj->zsl;
854 llen = zsl->length;
855
856 /* convert negative indexes */
857 if (start < 0) start = llen+start;
858 if (end < 0) end = llen+end;
859 if (start < 0) start = 0;
e2641e09 860
d0a4e24e
PN
861 /* Invariant: start >= 0, so this test will be true when end < 0.
862 * The range is empty when start > end or start >= length. */
e2641e09 863 if (start > end || start >= llen) {
e2641e09 864 addReply(c,shared.emptymultibulk);
865 return;
866 }
867 if (end >= llen) end = llen-1;
868 rangelen = (end-start)+1;
869
870 /* check if starting point is trivial, before searching
871 * the element in log(N) time */
872 if (reverse) {
a3004773 873 ln = start == 0 ? zsl->tail : zslGetElementByRank(zsl, llen-start);
e2641e09 874 } else {
875 ln = start == 0 ?
a3004773 876 zsl->header->level[0].forward : zslGetElementByRank(zsl, start+1);
e2641e09 877 }
878
879 /* Return the result in form of a multi-bulk reply */
0537e7bf 880 addReplyMultiBulkLen(c,withscores ? (rangelen*2) : rangelen);
e2641e09 881 for (j = 0; j < rangelen; j++) {
882 ele = ln->obj;
883 addReplyBulk(c,ele);
884 if (withscores)
885 addReplyDouble(c,ln->score);
2159782b 886 ln = reverse ? ln->backward : ln->level[0].forward;
e2641e09 887 }
888}
889
890void zrangeCommand(redisClient *c) {
891 zrangeGenericCommand(c,0);
892}
893
894void zrevrangeCommand(redisClient *c) {
895 zrangeGenericCommand(c,1);
896}
897
25bb8a44
PN
898/* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE and ZCOUNT.
899 * If "justcount", only the number of elements in the range is returned. */
900void genericZrangebyscoreCommand(redisClient *c, int reverse, int justcount) {
901 zrangespec range;
902 robj *o, *emptyreply;
903 zset *zsetobj;
904 zskiplist *zsl;
905 zskiplistNode *ln;
e2641e09 906 int offset = 0, limit = -1;
907 int withscores = 0;
25bb8a44
PN
908 unsigned long rangelen = 0;
909 void *replylen = NULL;
22b9bf15 910 int minidx, maxidx;
e2641e09 911
25bb8a44 912 /* Parse the range arguments. */
22b9bf15
PN
913 if (reverse) {
914 /* Range is given as [max,min] */
915 maxidx = 2; minidx = 3;
916 } else {
917 /* Range is given as [min,max] */
918 minidx = 2; maxidx = 3;
919 }
920
921 if (zslParseRange(c->argv[minidx],c->argv[maxidx],&range) != REDIS_OK) {
7236fdb2
PN
922 addReplyError(c,"min or max is not a double");
923 return;
924 }
25bb8a44
PN
925
926 /* Parse optional extra arguments. Note that ZCOUNT will exactly have
927 * 4 arguments, so we'll never enter the following code path. */
928 if (c->argc > 4) {
929 int remaining = c->argc - 4;
930 int pos = 4;
931
932 while (remaining) {
933 if (remaining >= 1 && !strcasecmp(c->argv[pos]->ptr,"withscores")) {
934 pos++; remaining--;
935 withscores = 1;
936 } else if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr,"limit")) {
937 offset = atoi(c->argv[pos+1]->ptr);
938 limit = atoi(c->argv[pos+2]->ptr);
939 pos += 3; remaining -= 3;
940 } else {
941 addReply(c,shared.syntaxerr);
942 return;
943 }
944 }
e2641e09 945 }
25bb8a44
PN
946
947 /* Ok, lookup the key and get the range */
948 emptyreply = justcount ? shared.czero : shared.emptymultibulk;
949 if ((o = lookupKeyReadOrReply(c,c->argv[1],emptyreply)) == NULL ||
950 checkType(c,o,REDIS_ZSET)) return;
951 zsetobj = o->ptr;
952 zsl = zsetobj->zsl;
953
22b9bf15 954 /* If reversed, get the last node in range as starting point. */
25bb8a44 955 if (reverse) {
22b9bf15 956 ln = zslLastInRange(zsl,range);
e2641e09 957 } else {
22b9bf15 958 ln = zslFirstInRange(zsl,range);
e2641e09 959 }
960
25bb8a44
PN
961 /* No "first" element in the specified interval. */
962 if (ln == NULL) {
963 addReply(c,emptyreply);
e2641e09 964 return;
965 }
966
22b9bf15
PN
967 /* We don't know in advance how many matching elements there are in the
968 * list, so we push this object that will represent the multi-bulk length
969 * in the output buffer, and will "fix" it later */
25bb8a44
PN
970 if (!justcount)
971 replylen = addDeferredMultiBulkLength(c);
e2641e09 972
25bb8a44
PN
973 /* If there is an offset, just traverse the number of elements without
974 * checking the score because that is done in the next loop. */
975 while(ln && offset--) {
22b9bf15 976 ln = reverse ? ln->backward : ln->level[0].forward;
25bb8a44 977 }
e2641e09 978
25bb8a44 979 while (ln && limit--) {
22b9bf15 980 /* Abort when the node is no longer in range. */
25bb8a44 981 if (reverse) {
45290ad9 982 if (!zslValueGteMin(ln->score,&range)) break;
25bb8a44 983 } else {
45290ad9 984 if (!zslValueLteMax(ln->score,&range)) break;
e2641e09 985 }
25bb8a44
PN
986
987 /* Do our magic */
988 rangelen++;
989 if (!justcount) {
990 addReplyBulk(c,ln->obj);
991 if (withscores)
992 addReplyDouble(c,ln->score);
993 }
994
22b9bf15
PN
995 /* Move to next node */
996 ln = reverse ? ln->backward : ln->level[0].forward;
25bb8a44
PN
997 }
998
999 if (justcount) {
1000 addReplyLongLong(c,(long)rangelen);
1001 } else {
1002 setDeferredMultiBulkLength(c,replylen,
1003 withscores ? (rangelen*2) : rangelen);
e2641e09 1004 }
1005}
1006
1007void zrangebyscoreCommand(redisClient *c) {
25bb8a44
PN
1008 genericZrangebyscoreCommand(c,0,0);
1009}
1010
1011void zrevrangebyscoreCommand(redisClient *c) {
1012 genericZrangebyscoreCommand(c,1,0);
e2641e09 1013}
1014
1015void zcountCommand(redisClient *c) {
25bb8a44 1016 genericZrangebyscoreCommand(c,0,1);
e2641e09 1017}
1018
1019void zcardCommand(redisClient *c) {
1020 robj *o;
1021 zset *zs;
1022
1023 if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
1024 checkType(c,o,REDIS_ZSET)) return;
1025
1026 zs = o->ptr;
b70d3555 1027 addReplyLongLong(c,zs->zsl->length);
e2641e09 1028}
1029
1030void zscoreCommand(redisClient *c) {
1031 robj *o;
1032 zset *zs;
1033 dictEntry *de;
1034
1035 if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
1036 checkType(c,o,REDIS_ZSET)) return;
1037
1038 zs = o->ptr;
75b41de8 1039 c->argv[2] = tryObjectEncoding(c->argv[2]);
e2641e09 1040 de = dictFind(zs->dict,c->argv[2]);
1041 if (!de) {
1042 addReply(c,shared.nullbulk);
1043 } else {
1044 double *score = dictGetEntryVal(de);
1045
1046 addReplyDouble(c,*score);
1047 }
1048}
1049
1050void zrankGenericCommand(redisClient *c, int reverse) {
1051 robj *o;
1052 zset *zs;
1053 zskiplist *zsl;
1054 dictEntry *de;
1055 unsigned long rank;
1056 double *score;
1057
1058 if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
1059 checkType(c,o,REDIS_ZSET)) return;
1060
1061 zs = o->ptr;
1062 zsl = zs->zsl;
75b41de8 1063 c->argv[2] = tryObjectEncoding(c->argv[2]);
e2641e09 1064 de = dictFind(zs->dict,c->argv[2]);
1065 if (!de) {
1066 addReply(c,shared.nullbulk);
1067 return;
1068 }
1069
1070 score = dictGetEntryVal(de);
a3004773 1071 rank = zslGetRank(zsl, *score, c->argv[2]);
e2641e09 1072 if (rank) {
1073 if (reverse) {
1074 addReplyLongLong(c, zsl->length - rank);
1075 } else {
1076 addReplyLongLong(c, rank-1);
1077 }
1078 } else {
1079 addReply(c,shared.nullbulk);
1080 }
1081}
1082
1083void zrankCommand(redisClient *c) {
1084 zrankGenericCommand(c, 0);
1085}
1086
1087void zrevrankCommand(redisClient *c) {
1088 zrankGenericCommand(c, 1);
1089}