]> git.saurik.com Git - redis.git/blob - src/t_zset.c
e1c61772f857f9b547ac4cfdfdb0f0e003f093f3
[redis.git] / src / t_zset.c
1 #include "redis.h"
2
3 #include <math.h>
4
5 /*-----------------------------------------------------------------------------
6 * Sorted set API
7 *----------------------------------------------------------------------------*/
8
/* ZSETs are ordered sets using two data structures to hold the same elements
 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
 * data structure.
 *
 * The elements are added to a hash table mapping Redis objects to scores.
 * At the same time the elements are added to a skip list mapping scores
 * to Redis objects (so objects are sorted by scores in this "view"). */
16
/* This skiplist implementation is almost a C translation of the original
 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
 * Alternative to Balanced Trees", modified in three ways:
 * a) this implementation allows for repeated values.
 * b) the comparison is not just by key (our 'score') but by satellite data.
 * c) there is a back pointer, so it's a doubly linked list with the back
 * pointers being only at "level 1". This allows traversing the list
 * from tail to head, which is useful for ZREVRANGE. */
25
26 zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
27 zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
28 zn->score = score;
29 zn->obj = obj;
30 return zn;
31 }
32
33 zskiplist *zslCreate(void) {
34 int j;
35 zskiplist *zsl;
36
37 zsl = zmalloc(sizeof(*zsl));
38 zsl->level = 1;
39 zsl->length = 0;
40 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
41 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
42 zsl->header->level[j].forward = NULL;
43 zsl->header->level[j].span = 0;
44 }
45 zsl->header->backward = NULL;
46 zsl->tail = NULL;
47 return zsl;
48 }
49
50 void zslFreeNode(zskiplistNode *node) {
51 decrRefCount(node->obj);
52 zfree(node);
53 }
54
55 void zslFree(zskiplist *zsl) {
56 zskiplistNode *node = zsl->header->level[0].forward, *next;
57
58 zfree(zsl->header);
59 while(node) {
60 next = node->level[0].forward;
61 zslFreeNode(node);
62 node = next;
63 }
64 zfree(zsl);
65 }
66
67 int zslRandomLevel(void) {
68 int level = 1;
69 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
70 level += 1;
71 return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
72 }
73
74 zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
75 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
76 unsigned int rank[ZSKIPLIST_MAXLEVEL];
77 int i, level;
78
79 x = zsl->header;
80 for (i = zsl->level-1; i >= 0; i--) {
81 /* store rank that is crossed to reach the insert position */
82 rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
83 while (x->level[i].forward &&
84 (x->level[i].forward->score < score ||
85 (x->level[i].forward->score == score &&
86 compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
87 rank[i] += x->level[i].span;
88 x = x->level[i].forward;
89 }
90 update[i] = x;
91 }
92 /* we assume the key is not already inside, since we allow duplicated
93 * scores, and the re-insertion of score and redis object should never
94 * happpen since the caller of zslInsert() should test in the hash table
95 * if the element is already inside or not. */
96 level = zslRandomLevel();
97 if (level > zsl->level) {
98 for (i = zsl->level; i < level; i++) {
99 rank[i] = 0;
100 update[i] = zsl->header;
101 update[i]->level[i].span = zsl->length;
102 }
103 zsl->level = level;
104 }
105 x = zslCreateNode(level,score,obj);
106 for (i = 0; i < level; i++) {
107 x->level[i].forward = update[i]->level[i].forward;
108 update[i]->level[i].forward = x;
109
110 /* update span covered by update[i] as x is inserted here */
111 x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
112 update[i]->level[i].span = (rank[0] - rank[i]) + 1;
113 }
114
115 /* increment span for untouched levels */
116 for (i = level; i < zsl->level; i++) {
117 update[i]->level[i].span++;
118 }
119
120 x->backward = (update[0] == zsl->header) ? NULL : update[0];
121 if (x->level[0].forward)
122 x->level[0].forward->backward = x;
123 else
124 zsl->tail = x;
125 zsl->length++;
126 return x;
127 }
128
129 /* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
130 void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
131 int i;
132 for (i = 0; i < zsl->level; i++) {
133 if (update[i]->level[i].forward == x) {
134 update[i]->level[i].span += x->level[i].span - 1;
135 update[i]->level[i].forward = x->level[i].forward;
136 } else {
137 update[i]->level[i].span -= 1;
138 }
139 }
140 if (x->level[0].forward) {
141 x->level[0].forward->backward = x->backward;
142 } else {
143 zsl->tail = x->backward;
144 }
145 while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
146 zsl->level--;
147 zsl->length--;
148 }
149
150 /* Delete an element with matching score/object from the skiplist. */
151 int zslDelete(zskiplist *zsl, double score, robj *obj) {
152 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
153 int i;
154
155 x = zsl->header;
156 for (i = zsl->level-1; i >= 0; i--) {
157 while (x->level[i].forward &&
158 (x->level[i].forward->score < score ||
159 (x->level[i].forward->score == score &&
160 compareStringObjects(x->level[i].forward->obj,obj) < 0)))
161 x = x->level[i].forward;
162 update[i] = x;
163 }
164 /* We may have multiple elements with the same score, what we need
165 * is to find the element with both the right score and object. */
166 x = x->level[0].forward;
167 if (x && score == x->score && equalStringObjects(x->obj,obj)) {
168 zslDeleteNode(zsl, x, update);
169 zslFreeNode(x);
170 return 1;
171 } else {
172 return 0; /* not found */
173 }
174 return 0; /* not found */
175 }
176
/* Score interval where each end can independently be inclusive or
 * exclusive. */
typedef struct {
    double min;   /* lower end of the interval */
    double max;   /* upper end of the interval */
    int minex;    /* non-zero when 'min' itself is excluded */
    int maxex;    /* non-zero when 'max' itself is excluded */
} zrangespec;
182
183 static int zslValueGteMin(double value, zrangespec *spec) {
184 return spec->minex ? (value > spec->min) : (value >= spec->min);
185 }
186
187 static int zslValueLteMax(double value, zrangespec *spec) {
188 return spec->maxex ? (value < spec->max) : (value <= spec->max);
189 }
190
191 static int zslValueInRange(double value, zrangespec *spec) {
192 return zslValueGteMin(value,spec) && zslValueLteMax(value,spec);
193 }
194
195 /* Returns if there is a part of the zset is in range. */
196 int zslIsInRange(zskiplist *zsl, zrangespec *range) {
197 zskiplistNode *x;
198
199 /* Test for ranges that will always be empty. */
200 if (range->min > range->max ||
201 (range->min == range->max && (range->minex || range->maxex)))
202 return 0;
203 x = zsl->tail;
204 if (x == NULL || !zslValueGteMin(x->score,range))
205 return 0;
206 x = zsl->header->level[0].forward;
207 if (x == NULL || !zslValueLteMax(x->score,range))
208 return 0;
209 return 1;
210 }
211
212 /* Find the first node that is contained in the specified range.
213 * Returns NULL when no element is contained in the range. */
214 zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec range) {
215 zskiplistNode *x;
216 int i;
217
218 /* If everything is out of range, return early. */
219 if (!zslIsInRange(zsl,&range)) return NULL;
220
221 x = zsl->header;
222 for (i = zsl->level-1; i >= 0; i--) {
223 /* Go forward while *OUT* of range. */
224 while (x->level[i].forward &&
225 !zslValueGteMin(x->level[i].forward->score,&range))
226 x = x->level[i].forward;
227 }
228
229 /* The tail is in range, so the previous block should always return a
230 * node that is non-NULL and the last one to be out of range. */
231 x = x->level[0].forward;
232 redisAssert(x != NULL && zslValueInRange(x->score,&range));
233 return x;
234 }
235
236 /* Find the last node that is contained in the specified range.
237 * Returns NULL when no element is contained in the range. */
238 zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec range) {
239 zskiplistNode *x;
240 int i;
241
242 /* If everything is out of range, return early. */
243 if (!zslIsInRange(zsl,&range)) return NULL;
244
245 x = zsl->header;
246 for (i = zsl->level-1; i >= 0; i--) {
247 /* Go forward while *IN* range. */
248 while (x->level[i].forward &&
249 zslValueLteMax(x->level[i].forward->score,&range))
250 x = x->level[i].forward;
251 }
252
253 /* The header is in range, so the previous block should always return a
254 * node that is non-NULL and in range. */
255 redisAssert(x != NULL && zslValueInRange(x->score,&range));
256 return x;
257 }
258
259 /* Delete all the elements with score between min and max from the skiplist.
260 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
261 * Note that this function takes the reference to the hash table view of the
262 * sorted set, in order to remove the elements from the hash table too. */
263 unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec range, dict *dict) {
264 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
265 unsigned long removed = 0;
266 int i;
267
268 x = zsl->header;
269 for (i = zsl->level-1; i >= 0; i--) {
270 while (x->level[i].forward && (range.minex ?
271 x->level[i].forward->score <= range.min :
272 x->level[i].forward->score < range.min))
273 x = x->level[i].forward;
274 update[i] = x;
275 }
276
277 /* Current node is the last with score < or <= min. */
278 x = x->level[0].forward;
279
280 /* Delete nodes while in range. */
281 while (x && (range.maxex ? x->score < range.max : x->score <= range.max)) {
282 zskiplistNode *next = x->level[0].forward;
283 zslDeleteNode(zsl,x,update);
284 dictDelete(dict,x->obj);
285 zslFreeNode(x);
286 removed++;
287 x = next;
288 }
289 return removed;
290 }
291
292 /* Delete all the elements with rank between start and end from the skiplist.
293 * Start and end are inclusive. Note that start and end need to be 1-based */
294 unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict) {
295 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
296 unsigned long traversed = 0, removed = 0;
297 int i;
298
299 x = zsl->header;
300 for (i = zsl->level-1; i >= 0; i--) {
301 while (x->level[i].forward && (traversed + x->level[i].span) < start) {
302 traversed += x->level[i].span;
303 x = x->level[i].forward;
304 }
305 update[i] = x;
306 }
307
308 traversed++;
309 x = x->level[0].forward;
310 while (x && traversed <= end) {
311 zskiplistNode *next = x->level[0].forward;
312 zslDeleteNode(zsl,x,update);
313 dictDelete(dict,x->obj);
314 zslFreeNode(x);
315 removed++;
316 traversed++;
317 x = next;
318 }
319 return removed;
320 }
321
322 /* Find the rank for an element by both score and key.
323 * Returns 0 when the element cannot be found, rank otherwise.
324 * Note that the rank is 1-based due to the span of zsl->header to the
325 * first element. */
326 unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) {
327 zskiplistNode *x;
328 unsigned long rank = 0;
329 int i;
330
331 x = zsl->header;
332 for (i = zsl->level-1; i >= 0; i--) {
333 while (x->level[i].forward &&
334 (x->level[i].forward->score < score ||
335 (x->level[i].forward->score == score &&
336 compareStringObjects(x->level[i].forward->obj,o) <= 0))) {
337 rank += x->level[i].span;
338 x = x->level[i].forward;
339 }
340
341 /* x might be equal to zsl->header, so test if obj is non-NULL */
342 if (x->obj && equalStringObjects(x->obj,o)) {
343 return rank;
344 }
345 }
346 return 0;
347 }
348
349 /* Finds an element by its rank. The rank argument needs to be 1-based. */
350 zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
351 zskiplistNode *x;
352 unsigned long traversed = 0;
353 int i;
354
355 x = zsl->header;
356 for (i = zsl->level-1; i >= 0; i--) {
357 while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
358 {
359 traversed += x->level[i].span;
360 x = x->level[i].forward;
361 }
362 if (traversed == rank) {
363 return x;
364 }
365 }
366 return NULL;
367 }
368
369 /* Populate the rangespec according to the objects min and max. */
370 static int zslParseRange(robj *min, robj *max, zrangespec *spec) {
371 char *eptr;
372 spec->minex = spec->maxex = 0;
373
374 /* Parse the min-max interval. If one of the values is prefixed
375 * by the "(" character, it's considered "open". For instance
376 * ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max
377 * ZRANGEBYSCORE zset 1.5 2.5 will instead match min <= x <= max */
378 if (min->encoding == REDIS_ENCODING_INT) {
379 spec->min = (long)min->ptr;
380 } else {
381 if (((char*)min->ptr)[0] == '(') {
382 spec->min = strtod((char*)min->ptr+1,&eptr);
383 if (eptr[0] != '\0' || isnan(spec->min)) return REDIS_ERR;
384 spec->minex = 1;
385 } else {
386 spec->min = strtod((char*)min->ptr,&eptr);
387 if (eptr[0] != '\0' || isnan(spec->min)) return REDIS_ERR;
388 }
389 }
390 if (max->encoding == REDIS_ENCODING_INT) {
391 spec->max = (long)max->ptr;
392 } else {
393 if (((char*)max->ptr)[0] == '(') {
394 spec->max = strtod((char*)max->ptr+1,&eptr);
395 if (eptr[0] != '\0' || isnan(spec->max)) return REDIS_ERR;
396 spec->maxex = 1;
397 } else {
398 spec->max = strtod((char*)max->ptr,&eptr);
399 if (eptr[0] != '\0' || isnan(spec->max)) return REDIS_ERR;
400 }
401 }
402
403 return REDIS_OK;
404 }
405
406 /*-----------------------------------------------------------------------------
407 * Ziplist-backed sorted set API
408 *----------------------------------------------------------------------------*/
409
/* Return the score stored in the ziplist entry pointed to by 'sptr'.
 *
 * The entry holds either a string, which is parsed with strtod(), or an
 * integer, which is converted to double. The function asserts when 'sptr'
 * is NULL or the entry cannot be read. */
double zzlGetScore(unsigned char *sptr) {
    unsigned char *vstr;
    unsigned int vlen;
    long long vlong;
    char buf[128];
    double score;

    redisAssert(sptr != NULL);
    redisAssert(ziplistGet(sptr,&vstr,&vlen,&vlong));

    if (vstr) {
        /* The string is copied to add the terminator strtod() needs: make
         * sure that it fits together with the final '\0', so that an
         * unexpected entry can't overflow the stack buffer. */
        redisAssert(vlen < sizeof(buf));
        memcpy(buf,vstr,vlen);
        buf[vlen] = '\0';
        score = strtod(buf,NULL);
    } else {
        score = vlong;
    }

    return score;
}
430
/* Compare the ziplist entry at 'eptr' with the buffer 'cstr' of 'clen'
 * bytes. An entry stored as integer is compared through its decimal string
 * representation.
 *
 * The common prefix is compared with memcmp(): when it differs that result
 * is returned, otherwise the length difference (entry minus 'cstr') is. So
 * the return value is negative, zero or positive when the entry sorts
 * before, equal to or after 'cstr'. */
int zzlCompareElements(unsigned char *eptr, unsigned char *cstr, unsigned int clen) {
    unsigned char numbuf[32];
    unsigned char *estr;
    unsigned int elen;
    long long ell;
    int common, order;

    redisAssert(ziplistGet(eptr,&estr,&elen,&ell));
    if (estr == NULL) {
        /* Integer entry: render it as a string in the local buffer. */
        elen = ll2string((char*)numbuf,sizeof(numbuf),ell);
        estr = numbuf;
    }

    common = (elen < clen) ? elen : clen;
    order = memcmp(estr,cstr,common);
    if (order != 0) return order;
    return elen-clen;
}
451
452 unsigned char *zzlFind(robj *zobj, robj *ele, double *score) {
453 unsigned char *zl = zobj->ptr;
454 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
455
456 ele = getDecodedObject(ele);
457 while (eptr != NULL) {
458 sptr = ziplistNext(zl,eptr);
459 redisAssert(sptr != NULL);
460
461 if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr))) {
462 /* Matching element, pull out score. */
463 *score = zzlGetScore(sptr);
464 decrRefCount(ele);
465 return eptr;
466 }
467
468 /* Move to next element. */
469 eptr = ziplistNext(zl,sptr);
470 }
471
472 decrRefCount(ele);
473 return NULL;
474 }
475
476 /* Delete (element,score) pair from ziplist. Use local copy of eptr because we
477 * don't want to modify the one given as argument. */
478 int zzlDelete(robj *zobj, unsigned char *eptr) {
479 unsigned char *zl = zobj->ptr;
480 unsigned char *p = eptr;
481
482 /* TODO: add function to ziplist API to delete N elements from offset. */
483 zl = ziplistDelete(zl,&p);
484 zl = ziplistDelete(zl,&p);
485 zobj->ptr = zl;
486 return REDIS_OK;
487 }
488
489 int zzlInsertAt(robj *zobj, robj *ele, double score, unsigned char *eptr) {
490 unsigned char *zl = zobj->ptr;
491 unsigned char *sptr;
492 char scorebuf[128];
493 int scorelen;
494 int offset;
495
496 redisAssert(ele->encoding == REDIS_ENCODING_RAW);
497 scorelen = d2string(scorebuf,sizeof(scorebuf),score);
498 if (eptr == NULL) {
499 zl = ziplistPush(zl,ele->ptr,sdslen(ele->ptr),ZIPLIST_TAIL);
500 zl = ziplistPush(zl,(unsigned char*)scorebuf,scorelen,ZIPLIST_TAIL);
501 } else {
502 /* Keep offset relative to zl, as it might be re-allocated. */
503 offset = eptr-zl;
504 zl = ziplistInsert(zl,eptr,ele->ptr,sdslen(ele->ptr));
505 eptr = zl+offset;
506
507 /* Insert score after the element. */
508 redisAssert((sptr = ziplistNext(zl,eptr)) != NULL);
509 zl = ziplistInsert(zl,sptr,(unsigned char*)scorebuf,scorelen);
510 }
511
512 zobj->ptr = zl;
513 return REDIS_OK;
514 }
515
516 /* Insert (element,score) pair in ziplist. This function assumes the element is
517 * not yet present in the list. */
518 int zzlInsert(robj *zobj, robj *ele, double score) {
519 unsigned char *zl = zobj->ptr;
520 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
521 double s;
522 int insert = 0;
523
524 ele = getDecodedObject(ele);
525 while (eptr != NULL) {
526 sptr = ziplistNext(zl,eptr);
527 redisAssert(sptr != NULL);
528 s = zzlGetScore(sptr);
529
530 if (s > score) {
531 /* First element with score larger than score for element to be
532 * inserted. This means we should take its spot in the list to
533 * maintain ordering. */
534 insert = 1;
535 } else if (s == score) {
536 /* Ensure lexicographical ordering for elements. */
537 if (zzlCompareElements(eptr,ele->ptr,sdslen(ele->ptr)) < 0)
538 insert = 1;
539 }
540
541 if (insert) {
542 zzlInsertAt(zobj,ele,score,eptr);
543 break;
544 }
545
546 /* Move to next element. */
547 eptr = ziplistNext(zl,sptr);
548 }
549
550 /* Push on tail of list when it was not yet inserted. */
551 if (!insert)
552 zzlInsertAt(zobj,ele,score,eptr);
553
554 decrRefCount(ele);
555 return REDIS_OK;
556 }
557
558 /*-----------------------------------------------------------------------------
559 * Sorted set commands
560 *----------------------------------------------------------------------------*/
561
562 /* This generic command implements both ZADD and ZINCRBY. */
563 void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double score, int incr) {
564 static char *nanerr = "resulting score is not a number (NaN)";
565 robj *zobj;
566 robj *curobj;
567 double curscore = 0.0;
568
569 zobj = lookupKeyWrite(c->db,key);
570 if (zobj == NULL) {
571 zobj = createZsetZiplistObject();
572 dbAdd(c->db,key,zobj);
573 } else {
574 if (zobj->type != REDIS_ZSET) {
575 addReply(c,shared.wrongtypeerr);
576 return;
577 }
578 }
579
580 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
581 unsigned char *eptr;
582
583 if ((eptr = zzlFind(zobj,ele,&curscore)) != NULL) {
584 if (incr) {
585 score += curscore;
586 if (isnan(score)) {
587 addReplyError(c,nanerr);
588 /* Don't need to check if the sorted set is empty, because
589 * we know it has at least one element. */
590 return;
591 }
592 }
593
594 /* Remove and re-insert when score changed. */
595 if (score != curscore) {
596 redisAssert(zzlDelete(zobj,eptr) == REDIS_OK);
597 redisAssert(zzlInsert(zobj,ele,score) == REDIS_OK);
598
599 signalModifiedKey(c->db,key);
600 server.dirty++;
601 }
602
603 if (incr) /* ZINCRBY */
604 addReplyDouble(c,score);
605 else /* ZADD */
606 addReply(c,shared.czero);
607 } else {
608 redisAssert(zzlInsert(zobj,ele,score) == REDIS_OK);
609
610 signalModifiedKey(c->db,key);
611 server.dirty++;
612
613 if (incr) /* ZINCRBY */
614 addReplyDouble(c,score);
615 else /* ZADD */
616 addReply(c,shared.cone);
617 }
618 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
619 zset *zs = zobj->ptr;
620 zskiplistNode *znode;
621 dictEntry *de;
622
623 de = dictFind(zs->dict,ele);
624 if (de != NULL) {
625 curobj = dictGetEntryKey(de);
626 curscore = *(double*)dictGetEntryVal(de);
627
628 if (incr) {
629 score += curscore;
630 if (isnan(score)) {
631 addReplyError(c,nanerr);
632 /* Don't need to check if the sorted set is empty, because
633 * we know it has at least one element. */
634 return;
635 }
636 }
637
638 /* Remove and re-insert when score changed. We can safely delete
639 * the key object from the skiplist, since the dictionary still has
640 * a reference to it. */
641 if (score != curscore) {
642 redisAssert(zslDelete(zs->zsl,curscore,curobj));
643 znode = zslInsert(zs->zsl,score,curobj);
644 incrRefCount(curobj); /* Re-inserted in skiplist. */
645 dictGetEntryVal(de) = &znode->score; /* Update score ptr. */
646
647 signalModifiedKey(c->db,key);
648 server.dirty++;
649 }
650
651 if (incr) /* ZINCRBY */
652 addReplyDouble(c,score);
653 else /* ZADD */
654 addReply(c,shared.czero);
655 } else {
656 znode = zslInsert(zs->zsl,score,ele);
657 incrRefCount(ele); /* Inserted in skiplist. */
658 redisAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
659 incrRefCount(ele); /* Added to dictionary. */
660
661 signalModifiedKey(c->db,key);
662 server.dirty++;
663
664 if (incr) /* ZINCRBY */
665 addReplyDouble(c,score);
666 else /* ZADD */
667 addReply(c,shared.cone);
668 }
669 } else {
670 redisPanic("Unknown sorted set encoding");
671 }
672 }
673
674 void zaddCommand(redisClient *c) {
675 double scoreval;
676 if (getDoubleFromObjectOrReply(c,c->argv[2],&scoreval,NULL) != REDIS_OK) return;
677 c->argv[3] = tryObjectEncoding(c->argv[3]);
678 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,0);
679 }
680
681 void zincrbyCommand(redisClient *c) {
682 double scoreval;
683 if (getDoubleFromObjectOrReply(c,c->argv[2],&scoreval,NULL) != REDIS_OK) return;
684 c->argv[3] = tryObjectEncoding(c->argv[3]);
685 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,1);
686 }
687
688 void zremCommand(redisClient *c) {
689 robj *zsetobj;
690 zset *zs;
691 dictEntry *de;
692 double curscore;
693 int deleted;
694
695 if ((zsetobj = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL ||
696 checkType(c,zsetobj,REDIS_ZSET)) return;
697
698 zs = zsetobj->ptr;
699 c->argv[2] = tryObjectEncoding(c->argv[2]);
700 de = dictFind(zs->dict,c->argv[2]);
701 if (de == NULL) {
702 addReply(c,shared.czero);
703 return;
704 }
705 /* Delete from the skiplist */
706 curscore = *(double*)dictGetEntryVal(de);
707 deleted = zslDelete(zs->zsl,curscore,c->argv[2]);
708 redisAssert(deleted != 0);
709
710 /* Delete from the hash table */
711 dictDelete(zs->dict,c->argv[2]);
712 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
713 if (dictSize(zs->dict) == 0) dbDelete(c->db,c->argv[1]);
714 signalModifiedKey(c->db,c->argv[1]);
715 server.dirty++;
716 addReply(c,shared.cone);
717 }
718
719 void zremrangebyscoreCommand(redisClient *c) {
720 zrangespec range;
721 long deleted;
722 robj *o;
723 zset *zs;
724
725 /* Parse the range arguments. */
726 if (zslParseRange(c->argv[2],c->argv[3],&range) != REDIS_OK) {
727 addReplyError(c,"min or max is not a double");
728 return;
729 }
730
731 if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL ||
732 checkType(c,o,REDIS_ZSET)) return;
733
734 zs = o->ptr;
735 deleted = zslDeleteRangeByScore(zs->zsl,range,zs->dict);
736 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
737 if (dictSize(zs->dict) == 0) dbDelete(c->db,c->argv[1]);
738 if (deleted) signalModifiedKey(c->db,c->argv[1]);
739 server.dirty += deleted;
740 addReplyLongLong(c,deleted);
741 }
742
743 void zremrangebyrankCommand(redisClient *c) {
744 long start;
745 long end;
746 int llen;
747 long deleted;
748 robj *zsetobj;
749 zset *zs;
750
751 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
752 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
753
754 if ((zsetobj = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL ||
755 checkType(c,zsetobj,REDIS_ZSET)) return;
756 zs = zsetobj->ptr;
757 llen = zs->zsl->length;
758
759 /* convert negative indexes */
760 if (start < 0) start = llen+start;
761 if (end < 0) end = llen+end;
762 if (start < 0) start = 0;
763
764 /* Invariant: start >= 0, so this test will be true when end < 0.
765 * The range is empty when start > end or start >= length. */
766 if (start > end || start >= llen) {
767 addReply(c,shared.czero);
768 return;
769 }
770 if (end >= llen) end = llen-1;
771
772 /* increment start and end because zsl*Rank functions
773 * use 1-based rank */
774 deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);
775 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
776 if (dictSize(zs->dict) == 0) dbDelete(c->db,c->argv[1]);
777 if (deleted) signalModifiedKey(c->db,c->argv[1]);
778 server.dirty += deleted;
779 addReplyLongLong(c, deleted);
780 }
781
782 typedef struct {
783 dict *dict;
784 double weight;
785 } zsetopsrc;
786
787 int qsortCompareZsetopsrcByCardinality(const void *s1, const void *s2) {
788 zsetopsrc *d1 = (void*) s1, *d2 = (void*) s2;
789 unsigned long size1, size2;
790 size1 = d1->dict ? dictSize(d1->dict) : 0;
791 size2 = d2->dict ? dictSize(d2->dict) : 0;
792 return size1 - size2;
793 }
794
/* How the scores of an element present in more than one source are
 * combined by ZUNIONSTORE/ZINTERSTORE. */
#define REDIS_AGGR_SUM 1
#define REDIS_AGGR_MIN 2
#define REDIS_AGGR_MAX 3

/* Score associated with a dictionary entry: the double pointed to by the
 * entry value, or 1.0 when the entry has no value. */
#define zunionInterDictValue(_e) (dictGetEntryVal(_e) != NULL ? *(double*)dictGetEntryVal(_e) : 1.0)
799
800 inline static void zunionInterAggregate(double *target, double val, int aggregate) {
801 if (aggregate == REDIS_AGGR_SUM) {
802 *target = *target + val;
803 /* The result of adding two doubles is NaN when one variable
804 * is +inf and the other is -inf. When these numbers are added,
805 * we maintain the convention of the result being 0.0. */
806 if (isnan(*target)) *target = 0.0;
807 } else if (aggregate == REDIS_AGGR_MIN) {
808 *target = val < *target ? val : *target;
809 } else if (aggregate == REDIS_AGGR_MAX) {
810 *target = val > *target ? val : *target;
811 } else {
812 /* safety net */
813 redisPanic("Unknown ZUNION/INTER aggregate type");
814 }
815 }
816
817 void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
818 int i, j, setnum;
819 int aggregate = REDIS_AGGR_SUM;
820 zsetopsrc *src;
821 robj *dstobj;
822 zset *dstzset;
823 zskiplistNode *znode;
824 dictIterator *di;
825 dictEntry *de;
826 int touched = 0;
827
828 /* expect setnum input keys to be given */
829 setnum = atoi(c->argv[2]->ptr);
830 if (setnum < 1) {
831 addReplyError(c,
832 "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE");
833 return;
834 }
835
836 /* test if the expected number of keys would overflow */
837 if (3+setnum > c->argc) {
838 addReply(c,shared.syntaxerr);
839 return;
840 }
841
842 /* read keys to be used for input */
843 src = zmalloc(sizeof(zsetopsrc) * setnum);
844 for (i = 0, j = 3; i < setnum; i++, j++) {
845 robj *obj = lookupKeyWrite(c->db,c->argv[j]);
846 if (!obj) {
847 src[i].dict = NULL;
848 } else {
849 if (obj->type == REDIS_ZSET) {
850 src[i].dict = ((zset*)obj->ptr)->dict;
851 } else if (obj->type == REDIS_SET) {
852 src[i].dict = (obj->ptr);
853 } else {
854 zfree(src);
855 addReply(c,shared.wrongtypeerr);
856 return;
857 }
858 }
859
860 /* default all weights to 1 */
861 src[i].weight = 1.0;
862 }
863
864 /* parse optional extra arguments */
865 if (j < c->argc) {
866 int remaining = c->argc - j;
867
868 while (remaining) {
869 if (remaining >= (setnum + 1) && !strcasecmp(c->argv[j]->ptr,"weights")) {
870 j++; remaining--;
871 for (i = 0; i < setnum; i++, j++, remaining--) {
872 if (getDoubleFromObjectOrReply(c,c->argv[j],&src[i].weight,
873 "weight value is not a double") != REDIS_OK)
874 {
875 zfree(src);
876 return;
877 }
878 }
879 } else if (remaining >= 2 && !strcasecmp(c->argv[j]->ptr,"aggregate")) {
880 j++; remaining--;
881 if (!strcasecmp(c->argv[j]->ptr,"sum")) {
882 aggregate = REDIS_AGGR_SUM;
883 } else if (!strcasecmp(c->argv[j]->ptr,"min")) {
884 aggregate = REDIS_AGGR_MIN;
885 } else if (!strcasecmp(c->argv[j]->ptr,"max")) {
886 aggregate = REDIS_AGGR_MAX;
887 } else {
888 zfree(src);
889 addReply(c,shared.syntaxerr);
890 return;
891 }
892 j++; remaining--;
893 } else {
894 zfree(src);
895 addReply(c,shared.syntaxerr);
896 return;
897 }
898 }
899 }
900
901 /* sort sets from the smallest to largest, this will improve our
902 * algorithm's performance */
903 qsort(src,setnum,sizeof(zsetopsrc),qsortCompareZsetopsrcByCardinality);
904
905 dstobj = createZsetObject();
906 dstzset = dstobj->ptr;
907
908 if (op == REDIS_OP_INTER) {
909 /* skip going over all entries if the smallest zset is NULL or empty */
910 if (src[0].dict && dictSize(src[0].dict) > 0) {
911 /* precondition: as src[0].dict is non-empty and the zsets are ordered
912 * from small to large, all src[i > 0].dict are non-empty too */
913 di = dictGetIterator(src[0].dict);
914 while((de = dictNext(di)) != NULL) {
915 double score, value;
916
917 score = src[0].weight * zunionInterDictValue(de);
918 for (j = 1; j < setnum; j++) {
919 dictEntry *other = dictFind(src[j].dict,dictGetEntryKey(de));
920 if (other) {
921 value = src[j].weight * zunionInterDictValue(other);
922 zunionInterAggregate(&score,value,aggregate);
923 } else {
924 break;
925 }
926 }
927
928 /* Only continue when present in every source dict. */
929 if (j == setnum) {
930 robj *o = dictGetEntryKey(de);
931 znode = zslInsert(dstzset->zsl,score,o);
932 incrRefCount(o); /* added to skiplist */
933 dictAdd(dstzset->dict,o,&znode->score);
934 incrRefCount(o); /* added to dictionary */
935 }
936 }
937 dictReleaseIterator(di);
938 }
939 } else if (op == REDIS_OP_UNION) {
940 for (i = 0; i < setnum; i++) {
941 if (!src[i].dict) continue;
942
943 di = dictGetIterator(src[i].dict);
944 while((de = dictNext(di)) != NULL) {
945 double score, value;
946
947 /* skip key when already processed */
948 if (dictFind(dstzset->dict,dictGetEntryKey(de)) != NULL)
949 continue;
950
951 /* initialize score */
952 score = src[i].weight * zunionInterDictValue(de);
953
954 /* because the zsets are sorted by size, its only possible
955 * for sets at larger indices to hold this entry */
956 for (j = (i+1); j < setnum; j++) {
957 dictEntry *other = dictFind(src[j].dict,dictGetEntryKey(de));
958 if (other) {
959 value = src[j].weight * zunionInterDictValue(other);
960 zunionInterAggregate(&score,value,aggregate);
961 }
962 }
963
964 robj *o = dictGetEntryKey(de);
965 znode = zslInsert(dstzset->zsl,score,o);
966 incrRefCount(o); /* added to skiplist */
967 dictAdd(dstzset->dict,o,&znode->score);
968 incrRefCount(o); /* added to dictionary */
969 }
970 dictReleaseIterator(di);
971 }
972 } else {
973 /* unknown operator */
974 redisAssert(op == REDIS_OP_INTER || op == REDIS_OP_UNION);
975 }
976
977 if (dbDelete(c->db,dstkey)) {
978 signalModifiedKey(c->db,dstkey);
979 touched = 1;
980 server.dirty++;
981 }
982 if (dstzset->zsl->length) {
983 dbAdd(c->db,dstkey,dstobj);
984 addReplyLongLong(c, dstzset->zsl->length);
985 if (!touched) signalModifiedKey(c->db,dstkey);
986 server.dirty++;
987 } else {
988 decrRefCount(dstobj);
989 addReply(c, shared.czero);
990 }
991 zfree(src);
992 }
993
994 void zunionstoreCommand(redisClient *c) {
995 zunionInterGenericCommand(c,c->argv[1], REDIS_OP_UNION);
996 }
997
998 void zinterstoreCommand(redisClient *c) {
999 zunionInterGenericCommand(c,c->argv[1], REDIS_OP_INTER);
1000 }
1001
1002 void zrangeGenericCommand(redisClient *c, int reverse) {
1003 robj *o;
1004 long start;
1005 long end;
1006 int withscores = 0;
1007 int llen;
1008 int rangelen, j;
1009 zset *zsetobj;
1010 zskiplist *zsl;
1011 zskiplistNode *ln;
1012 robj *ele;
1013
1014 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
1015 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
1016
1017 if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
1018 withscores = 1;
1019 } else if (c->argc >= 5) {
1020 addReply(c,shared.syntaxerr);
1021 return;
1022 }
1023
1024 if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
1025 || checkType(c,o,REDIS_ZSET)) return;
1026 zsetobj = o->ptr;
1027 zsl = zsetobj->zsl;
1028 llen = zsl->length;
1029
1030 /* convert negative indexes */
1031 if (start < 0) start = llen+start;
1032 if (end < 0) end = llen+end;
1033 if (start < 0) start = 0;
1034
1035 /* Invariant: start >= 0, so this test will be true when end < 0.
1036 * The range is empty when start > end or start >= length. */
1037 if (start > end || start >= llen) {
1038 addReply(c,shared.emptymultibulk);
1039 return;
1040 }
1041 if (end >= llen) end = llen-1;
1042 rangelen = (end-start)+1;
1043
1044 /* check if starting point is trivial, before searching
1045 * the element in log(N) time */
1046 if (reverse) {
1047 ln = start == 0 ? zsl->tail : zslGetElementByRank(zsl, llen-start);
1048 } else {
1049 ln = start == 0 ?
1050 zsl->header->level[0].forward : zslGetElementByRank(zsl, start+1);
1051 }
1052
1053 /* Return the result in form of a multi-bulk reply */
1054 addReplyMultiBulkLen(c,withscores ? (rangelen*2) : rangelen);
1055 for (j = 0; j < rangelen; j++) {
1056 ele = ln->obj;
1057 addReplyBulk(c,ele);
1058 if (withscores)
1059 addReplyDouble(c,ln->score);
1060 ln = reverse ? ln->backward : ln->level[0].forward;
1061 }
1062 }
1063
1064 void zrangeCommand(redisClient *c) {
1065 zrangeGenericCommand(c,0);
1066 }
1067
1068 void zrevrangeCommand(redisClient *c) {
1069 zrangeGenericCommand(c,1);
1070 }
1071
1072 /* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE and ZCOUNT.
1073 * If "justcount", only the number of elements in the range is returned. */
1074 void genericZrangebyscoreCommand(redisClient *c, int reverse, int justcount) {
1075 zrangespec range;
1076 robj *o, *emptyreply;
1077 zset *zsetobj;
1078 zskiplist *zsl;
1079 zskiplistNode *ln;
1080 int offset = 0, limit = -1;
1081 int withscores = 0;
1082 unsigned long rangelen = 0;
1083 void *replylen = NULL;
1084 int minidx, maxidx;
1085
1086 /* Parse the range arguments. */
1087 if (reverse) {
1088 /* Range is given as [max,min] */
1089 maxidx = 2; minidx = 3;
1090 } else {
1091 /* Range is given as [min,max] */
1092 minidx = 2; maxidx = 3;
1093 }
1094
1095 if (zslParseRange(c->argv[minidx],c->argv[maxidx],&range) != REDIS_OK) {
1096 addReplyError(c,"min or max is not a double");
1097 return;
1098 }
1099
1100 /* Parse optional extra arguments. Note that ZCOUNT will exactly have
1101 * 4 arguments, so we'll never enter the following code path. */
1102 if (c->argc > 4) {
1103 int remaining = c->argc - 4;
1104 int pos = 4;
1105
1106 while (remaining) {
1107 if (remaining >= 1 && !strcasecmp(c->argv[pos]->ptr,"withscores")) {
1108 pos++; remaining--;
1109 withscores = 1;
1110 } else if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr,"limit")) {
1111 offset = atoi(c->argv[pos+1]->ptr);
1112 limit = atoi(c->argv[pos+2]->ptr);
1113 pos += 3; remaining -= 3;
1114 } else {
1115 addReply(c,shared.syntaxerr);
1116 return;
1117 }
1118 }
1119 }
1120
1121 /* Ok, lookup the key and get the range */
1122 emptyreply = justcount ? shared.czero : shared.emptymultibulk;
1123 if ((o = lookupKeyReadOrReply(c,c->argv[1],emptyreply)) == NULL ||
1124 checkType(c,o,REDIS_ZSET)) return;
1125 zsetobj = o->ptr;
1126 zsl = zsetobj->zsl;
1127
1128 /* If reversed, get the last node in range as starting point. */
1129 if (reverse) {
1130 ln = zslLastInRange(zsl,range);
1131 } else {
1132 ln = zslFirstInRange(zsl,range);
1133 }
1134
1135 /* No "first" element in the specified interval. */
1136 if (ln == NULL) {
1137 addReply(c,emptyreply);
1138 return;
1139 }
1140
1141 /* We don't know in advance how many matching elements there are in the
1142 * list, so we push this object that will represent the multi-bulk length
1143 * in the output buffer, and will "fix" it later */
1144 if (!justcount)
1145 replylen = addDeferredMultiBulkLength(c);
1146
1147 /* If there is an offset, just traverse the number of elements without
1148 * checking the score because that is done in the next loop. */
1149 while(ln && offset--) {
1150 ln = reverse ? ln->backward : ln->level[0].forward;
1151 }
1152
1153 while (ln && limit--) {
1154 /* Abort when the node is no longer in range. */
1155 if (reverse) {
1156 if (!zslValueGteMin(ln->score,&range)) break;
1157 } else {
1158 if (!zslValueLteMax(ln->score,&range)) break;
1159 }
1160
1161 /* Do our magic */
1162 rangelen++;
1163 if (!justcount) {
1164 addReplyBulk(c,ln->obj);
1165 if (withscores)
1166 addReplyDouble(c,ln->score);
1167 }
1168
1169 /* Move to next node */
1170 ln = reverse ? ln->backward : ln->level[0].forward;
1171 }
1172
1173 if (justcount) {
1174 addReplyLongLong(c,(long)rangelen);
1175 } else {
1176 setDeferredMultiBulkLength(c,replylen,
1177 withscores ? (rangelen*2) : rangelen);
1178 }
1179 }
1180
1181 void zrangebyscoreCommand(redisClient *c) {
1182 genericZrangebyscoreCommand(c,0,0);
1183 }
1184
1185 void zrevrangebyscoreCommand(redisClient *c) {
1186 genericZrangebyscoreCommand(c,1,0);
1187 }
1188
1189 void zcountCommand(redisClient *c) {
1190 genericZrangebyscoreCommand(c,0,1);
1191 }
1192
1193 void zcardCommand(redisClient *c) {
1194 robj *o;
1195 zset *zs;
1196
1197 if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
1198 checkType(c,o,REDIS_ZSET)) return;
1199
1200 zs = o->ptr;
1201 addReplyLongLong(c,zs->zsl->length);
1202 }
1203
1204 void zscoreCommand(redisClient *c) {
1205 robj *o;
1206 zset *zs;
1207 dictEntry *de;
1208
1209 if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
1210 checkType(c,o,REDIS_ZSET)) return;
1211
1212 zs = o->ptr;
1213 c->argv[2] = tryObjectEncoding(c->argv[2]);
1214 de = dictFind(zs->dict,c->argv[2]);
1215 if (!de) {
1216 addReply(c,shared.nullbulk);
1217 } else {
1218 double *score = dictGetEntryVal(de);
1219
1220 addReplyDouble(c,*score);
1221 }
1222 }
1223
1224 void zrankGenericCommand(redisClient *c, int reverse) {
1225 robj *o;
1226 zset *zs;
1227 zskiplist *zsl;
1228 dictEntry *de;
1229 unsigned long rank;
1230 double *score;
1231
1232 if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
1233 checkType(c,o,REDIS_ZSET)) return;
1234
1235 zs = o->ptr;
1236 zsl = zs->zsl;
1237 c->argv[2] = tryObjectEncoding(c->argv[2]);
1238 de = dictFind(zs->dict,c->argv[2]);
1239 if (!de) {
1240 addReply(c,shared.nullbulk);
1241 return;
1242 }
1243
1244 score = dictGetEntryVal(de);
1245 rank = zslGetRank(zsl, *score, c->argv[2]);
1246 if (rank) {
1247 if (reverse) {
1248 addReplyLongLong(c, zsl->length - rank);
1249 } else {
1250 addReplyLongLong(c, rank-1);
1251 }
1252 } else {
1253 addReply(c,shared.nullbulk);
1254 }
1255 }
1256
1257 void zrankCommand(redisClient *c) {
1258 zrankGenericCommand(c, 0);
1259 }
1260
1261 void zrevrankCommand(redisClient *c) {
1262 zrankGenericCommand(c, 1);
1263 }