]> git.saurik.com Git - redis.git/blob - src/t_zset.c
503486385adbd7700a697f436a9c8b29f5460ad1
[redis.git] / src / t_zset.c
1 #include "redis.h"
2
3 #include <math.h>
4
5 /*-----------------------------------------------------------------------------
6 * Sorted set API
7 *----------------------------------------------------------------------------*/
8
9 /* ZSETs are ordered sets using two data structures to hold the same elements
10 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
11 * data structure.
12 *
13 * The elements are added to an hash table mapping Redis objects to scores.
14 * At the same time the elements are added to a skip list mapping scores
15 * to Redis objects (so objects are sorted by scores in this "view"). */
16
17 /* This skiplist implementation is almost a C translation of the original
18 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
19 * Alternative to Balanced Trees", modified in three ways:
20 * a) this implementation allows for repeated values.
21 * b) the comparison is not just by key (our 'score') but by satellite data.
22 * c) there is a back pointer, so it's a doubly linked list with the back
23 * pointers being only at "level 1". This allows to traverse the list
24 * from tail to head, useful for ZREVRANGE. */
25
26 zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
27 zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
28 zn->score = score;
29 zn->obj = obj;
30 return zn;
31 }
32
33 zskiplist *zslCreate(void) {
34 int j;
35 zskiplist *zsl;
36
37 zsl = zmalloc(sizeof(*zsl));
38 zsl->level = 1;
39 zsl->length = 0;
40 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
41 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
42 zsl->header->level[j].forward = NULL;
43 zsl->header->level[j].span = 0;
44 }
45 zsl->header->backward = NULL;
46 zsl->tail = NULL;
47 return zsl;
48 }
49
50 void zslFreeNode(zskiplistNode *node) {
51 decrRefCount(node->obj);
52 zfree(node);
53 }
54
55 void zslFree(zskiplist *zsl) {
56 zskiplistNode *node = zsl->header->level[0].forward, *next;
57
58 zfree(zsl->header);
59 while(node) {
60 next = node->level[0].forward;
61 zslFreeNode(node);
62 node = next;
63 }
64 zfree(zsl);
65 }
66
67 int zslRandomLevel(void) {
68 int level = 1;
69 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
70 level += 1;
71 return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
72 }
73
74 void zslInsert(zskiplist *zsl, double score, robj *obj) {
75 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
76 unsigned int rank[ZSKIPLIST_MAXLEVEL];
77 int i, level;
78
79 x = zsl->header;
80 for (i = zsl->level-1; i >= 0; i--) {
81 /* store rank that is crossed to reach the insert position */
82 rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
83 while (x->level[i].forward &&
84 (x->level[i].forward->score < score ||
85 (x->level[i].forward->score == score &&
86 compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
87 rank[i] += x->level[i].span;
88 x = x->level[i].forward;
89 }
90 update[i] = x;
91 }
92 /* we assume the key is not already inside, since we allow duplicated
93 * scores, and the re-insertion of score and redis object should never
94 * happpen since the caller of zslInsert() should test in the hash table
95 * if the element is already inside or not. */
96 level = zslRandomLevel();
97 if (level > zsl->level) {
98 for (i = zsl->level; i < level; i++) {
99 rank[i] = 0;
100 update[i] = zsl->header;
101 update[i]->level[i].span = zsl->length;
102 }
103 zsl->level = level;
104 }
105 x = zslCreateNode(level,score,obj);
106 for (i = 0; i < level; i++) {
107 x->level[i].forward = update[i]->level[i].forward;
108 update[i]->level[i].forward = x;
109
110 /* update span covered by update[i] as x is inserted here */
111 x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
112 update[i]->level[i].span = (rank[0] - rank[i]) + 1;
113 }
114
115 /* increment span for untouched levels */
116 for (i = level; i < zsl->level; i++) {
117 update[i]->level[i].span++;
118 }
119
120 x->backward = (update[0] == zsl->header) ? NULL : update[0];
121 if (x->level[0].forward)
122 x->level[0].forward->backward = x;
123 else
124 zsl->tail = x;
125 zsl->length++;
126 }
127
128 /* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
129 void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
130 int i;
131 for (i = 0; i < zsl->level; i++) {
132 if (update[i]->level[i].forward == x) {
133 update[i]->level[i].span += x->level[i].span - 1;
134 update[i]->level[i].forward = x->level[i].forward;
135 } else {
136 update[i]->level[i].span -= 1;
137 }
138 }
139 if (x->level[0].forward) {
140 x->level[0].forward->backward = x->backward;
141 } else {
142 zsl->tail = x->backward;
143 }
144 while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
145 zsl->level--;
146 zsl->length--;
147 }
148
149 /* Delete an element with matching score/object from the skiplist. */
150 int zslDelete(zskiplist *zsl, double score, robj *obj) {
151 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
152 int i;
153
154 x = zsl->header;
155 for (i = zsl->level-1; i >= 0; i--) {
156 while (x->level[i].forward &&
157 (x->level[i].forward->score < score ||
158 (x->level[i].forward->score == score &&
159 compareStringObjects(x->level[i].forward->obj,obj) < 0)))
160 x = x->level[i].forward;
161 update[i] = x;
162 }
163 /* We may have multiple elements with the same score, what we need
164 * is to find the element with both the right score and object. */
165 x = x->level[0].forward;
166 if (x && score == x->score && equalStringObjects(x->obj,obj)) {
167 zslDeleteNode(zsl, x, update);
168 zslFreeNode(x);
169 return 1;
170 } else {
171 return 0; /* not found */
172 }
173 return 0; /* not found */
174 }
175
176 /* Delete all the elements with score between min and max from the skiplist.
177 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
178 * Note that this function takes the reference to the hash table view of the
179 * sorted set, in order to remove the elements from the hash table too. */
180 unsigned long zslDeleteRangeByScore(zskiplist *zsl, double min, double max, dict *dict) {
181 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
182 unsigned long removed = 0;
183 int i;
184
185 x = zsl->header;
186 for (i = zsl->level-1; i >= 0; i--) {
187 while (x->level[i].forward && x->level[i].forward->score < min)
188 x = x->level[i].forward;
189 update[i] = x;
190 }
191 /* We may have multiple elements with the same score, what we need
192 * is to find the element with both the right score and object. */
193 x = x->level[0].forward;
194 while (x && x->score <= max) {
195 zskiplistNode *next = x->level[0].forward;
196 zslDeleteNode(zsl, x, update);
197 dictDelete(dict,x->obj);
198 zslFreeNode(x);
199 removed++;
200 x = next;
201 }
202 return removed; /* not found */
203 }
204
205 /* Delete all the elements with rank between start and end from the skiplist.
206 * Start and end are inclusive. Note that start and end need to be 1-based */
207 unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict) {
208 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
209 unsigned long traversed = 0, removed = 0;
210 int i;
211
212 x = zsl->header;
213 for (i = zsl->level-1; i >= 0; i--) {
214 while (x->level[i].forward && (traversed + x->level[i].span) < start) {
215 traversed += x->level[i].span;
216 x = x->level[i].forward;
217 }
218 update[i] = x;
219 }
220
221 traversed++;
222 x = x->level[0].forward;
223 while (x && traversed <= end) {
224 zskiplistNode *next = x->level[0].forward;
225 zslDeleteNode(zsl, x, update);
226 dictDelete(dict,x->obj);
227 zslFreeNode(x);
228 removed++;
229 traversed++;
230 x = next;
231 }
232 return removed;
233 }
234
235 /* Find the first node having a score equal or greater than the specified one.
236 * Returns NULL if there is no match. */
237 zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) {
238 zskiplistNode *x;
239 int i;
240
241 x = zsl->header;
242 for (i = zsl->level-1; i >= 0; i--) {
243 while (x->level[i].forward && x->level[i].forward->score < score)
244 x = x->level[i].forward;
245 }
246 /* We may have multiple elements with the same score, what we need
247 * is to find the element with both the right score and object. */
248 return x->level[0].forward;
249 }
250
251 /* Find the rank for an element by both score and key.
252 * Returns 0 when the element cannot be found, rank otherwise.
253 * Note that the rank is 1-based due to the span of zsl->header to the
254 * first element. */
255 unsigned long zslistTypeGetRank(zskiplist *zsl, double score, robj *o) {
256 zskiplistNode *x;
257 unsigned long rank = 0;
258 int i;
259
260 x = zsl->header;
261 for (i = zsl->level-1; i >= 0; i--) {
262 while (x->level[i].forward &&
263 (x->level[i].forward->score < score ||
264 (x->level[i].forward->score == score &&
265 compareStringObjects(x->level[i].forward->obj,o) <= 0))) {
266 rank += x->level[i].span;
267 x = x->level[i].forward;
268 }
269
270 /* x might be equal to zsl->header, so test if obj is non-NULL */
271 if (x->obj && equalStringObjects(x->obj,o)) {
272 return rank;
273 }
274 }
275 return 0;
276 }
277
278 /* Finds an element by its rank. The rank argument needs to be 1-based. */
279 zskiplistNode* zslistTypeGetElementByRank(zskiplist *zsl, unsigned long rank) {
280 zskiplistNode *x;
281 unsigned long traversed = 0;
282 int i;
283
284 x = zsl->header;
285 for (i = zsl->level-1; i >= 0; i--) {
286 while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
287 {
288 traversed += x->level[i].span;
289 x = x->level[i].forward;
290 }
291 if (traversed == rank) {
292 return x;
293 }
294 }
295 return NULL;
296 }
297
298 /*-----------------------------------------------------------------------------
299 * Sorted set commands
300 *----------------------------------------------------------------------------*/
301
302 /* This generic command implements both ZADD and ZINCRBY.
303 * scoreval is the score if the operation is a ZADD (doincrement == 0) or
304 * the increment if the operation is a ZINCRBY (doincrement == 1). */
305 void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scoreval, int doincrement) {
306 robj *zsetobj;
307 zset *zs;
308 double *score;
309
310 zsetobj = lookupKeyWrite(c->db,key);
311 if (zsetobj == NULL) {
312 zsetobj = createZsetObject();
313 dbAdd(c->db,key,zsetobj);
314 } else {
315 if (zsetobj->type != REDIS_ZSET) {
316 addReply(c,shared.wrongtypeerr);
317 return;
318 }
319 }
320 zs = zsetobj->ptr;
321
322 /* Ok now since we implement both ZADD and ZINCRBY here the code
323 * needs to handle the two different conditions. It's all about setting
324 * '*score', that is, the new score to set, to the right value. */
325 score = zmalloc(sizeof(double));
326 if (doincrement) {
327 dictEntry *de;
328
329 /* Read the old score. If the element was not present starts from 0 */
330 de = dictFind(zs->dict,ele);
331 if (de) {
332 double *oldscore = dictGetEntryVal(de);
333 *score = *oldscore + scoreval;
334 } else {
335 *score = scoreval;
336 }
337 if (isnan(*score)) {
338 addReplySds(c,
339 sdsnew("-ERR resulting score is not a number (NaN)\r\n"));
340 zfree(score);
341 /* Note that we don't need to check if the zset may be empty and
342 * should be removed here, as we can only obtain Nan as score if
343 * there was already an element in the sorted set. */
344 return;
345 }
346 } else {
347 *score = scoreval;
348 }
349
350 /* What follows is a simple remove and re-insert operation that is common
351 * to both ZADD and ZINCRBY... */
352 if (dictAdd(zs->dict,ele,score) == DICT_OK) {
353 /* case 1: New element */
354 incrRefCount(ele); /* added to hash */
355 zslInsert(zs->zsl,*score,ele);
356 incrRefCount(ele); /* added to skiplist */
357 touchWatchedKey(c->db,c->argv[1]);
358 server.dirty++;
359 if (doincrement)
360 addReplyDouble(c,*score);
361 else
362 addReply(c,shared.cone);
363 } else {
364 dictEntry *de;
365 double *oldscore;
366
367 /* case 2: Score update operation */
368 de = dictFind(zs->dict,ele);
369 redisAssert(de != NULL);
370 oldscore = dictGetEntryVal(de);
371 if (*score != *oldscore) {
372 int deleted;
373
374 /* Remove and insert the element in the skip list with new score */
375 deleted = zslDelete(zs->zsl,*oldscore,ele);
376 redisAssert(deleted != 0);
377 zslInsert(zs->zsl,*score,ele);
378 incrRefCount(ele);
379 /* Update the score in the hash table */
380 dictReplace(zs->dict,ele,score);
381 touchWatchedKey(c->db,c->argv[1]);
382 server.dirty++;
383 } else {
384 zfree(score);
385 }
386 if (doincrement)
387 addReplyDouble(c,*score);
388 else
389 addReply(c,shared.czero);
390 }
391 }
392
393 void zaddCommand(redisClient *c) {
394 double scoreval;
395 if (getDoubleFromObjectOrReply(c,c->argv[2],&scoreval,NULL) != REDIS_OK) return;
396 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,0);
397 }
398
399 void zincrbyCommand(redisClient *c) {
400 double scoreval;
401 if (getDoubleFromObjectOrReply(c,c->argv[2],&scoreval,NULL) != REDIS_OK) return;
402 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,1);
403 }
404
405 void zremCommand(redisClient *c) {
406 robj *zsetobj;
407 zset *zs;
408 dictEntry *de;
409 double *oldscore;
410 int deleted;
411
412 if ((zsetobj = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL ||
413 checkType(c,zsetobj,REDIS_ZSET)) return;
414
415 zs = zsetobj->ptr;
416 de = dictFind(zs->dict,c->argv[2]);
417 if (de == NULL) {
418 addReply(c,shared.czero);
419 return;
420 }
421 /* Delete from the skiplist */
422 oldscore = dictGetEntryVal(de);
423 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
424 redisAssert(deleted != 0);
425
426 /* Delete from the hash table */
427 dictDelete(zs->dict,c->argv[2]);
428 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
429 if (dictSize(zs->dict) == 0) dbDelete(c->db,c->argv[1]);
430 touchWatchedKey(c->db,c->argv[1]);
431 server.dirty++;
432 addReply(c,shared.cone);
433 }
434
435 void zremrangebyscoreCommand(redisClient *c) {
436 double min;
437 double max;
438 long deleted;
439 robj *zsetobj;
440 zset *zs;
441
442 if ((getDoubleFromObjectOrReply(c, c->argv[2], &min, NULL) != REDIS_OK) ||
443 (getDoubleFromObjectOrReply(c, c->argv[3], &max, NULL) != REDIS_OK)) return;
444
445 if ((zsetobj = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL ||
446 checkType(c,zsetobj,REDIS_ZSET)) return;
447
448 zs = zsetobj->ptr;
449 deleted = zslDeleteRangeByScore(zs->zsl,min,max,zs->dict);
450 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
451 if (dictSize(zs->dict) == 0) dbDelete(c->db,c->argv[1]);
452 if (deleted) touchWatchedKey(c->db,c->argv[1]);
453 server.dirty += deleted;
454 addReplyLongLong(c,deleted);
455 }
456
457 void zremrangebyrankCommand(redisClient *c) {
458 long start;
459 long end;
460 int llen;
461 long deleted;
462 robj *zsetobj;
463 zset *zs;
464
465 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
466 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
467
468 if ((zsetobj = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL ||
469 checkType(c,zsetobj,REDIS_ZSET)) return;
470 zs = zsetobj->ptr;
471 llen = zs->zsl->length;
472
473 /* convert negative indexes */
474 if (start < 0) start = llen+start;
475 if (end < 0) end = llen+end;
476 if (start < 0) start = 0;
477
478 /* Invariant: start >= 0, so this test will be true when end < 0.
479 * The range is empty when start > end or start >= length. */
480 if (start > end || start >= llen) {
481 addReply(c,shared.czero);
482 return;
483 }
484 if (end >= llen) end = llen-1;
485
486 /* increment start and end because zsl*Rank functions
487 * use 1-based rank */
488 deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);
489 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
490 if (dictSize(zs->dict) == 0) dbDelete(c->db,c->argv[1]);
491 if (deleted) touchWatchedKey(c->db,c->argv[1]);
492 server.dirty += deleted;
493 addReplyLongLong(c, deleted);
494 }
495
496 typedef struct {
497 dict *dict;
498 double weight;
499 } zsetopsrc;
500
501 int qsortCompareZsetopsrcByCardinality(const void *s1, const void *s2) {
502 zsetopsrc *d1 = (void*) s1, *d2 = (void*) s2;
503 unsigned long size1, size2;
504 size1 = d1->dict ? dictSize(d1->dict) : 0;
505 size2 = d2->dict ? dictSize(d2->dict) : 0;
506 return size1 - size2;
507 }
508
509 #define REDIS_AGGR_SUM 1
510 #define REDIS_AGGR_MIN 2
511 #define REDIS_AGGR_MAX 3
512 #define zunionInterDictValue(_e) (dictGetEntryVal(_e) == NULL ? 1.0 : *(double*)dictGetEntryVal(_e))
513
514 inline static void zunionInterAggregate(double *target, double val, int aggregate) {
515 if (aggregate == REDIS_AGGR_SUM) {
516 *target = *target + val;
517 /* The result of adding two doubles is NaN when one variable
518 * is +inf and the other is -inf. When these numbers are added,
519 * we maintain the convention of the result being 0.0. */
520 if (isnan(*target)) *target = 0.0;
521 } else if (aggregate == REDIS_AGGR_MIN) {
522 *target = val < *target ? val : *target;
523 } else if (aggregate == REDIS_AGGR_MAX) {
524 *target = val > *target ? val : *target;
525 } else {
526 /* safety net */
527 redisPanic("Unknown ZUNION/INTER aggregate type");
528 }
529 }
530
531 void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
532 int i, j, setnum;
533 int aggregate = REDIS_AGGR_SUM;
534 zsetopsrc *src;
535 robj *dstobj;
536 zset *dstzset;
537 dictIterator *di;
538 dictEntry *de;
539 int touched = 0;
540
541 /* expect setnum input keys to be given */
542 setnum = atoi(c->argv[2]->ptr);
543 if (setnum < 1) {
544 addReplySds(c,sdsnew("-ERR at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE\r\n"));
545 return;
546 }
547
548 /* test if the expected number of keys would overflow */
549 if (3+setnum > c->argc) {
550 addReply(c,shared.syntaxerr);
551 return;
552 }
553
554 /* read keys to be used for input */
555 src = zmalloc(sizeof(zsetopsrc) * setnum);
556 for (i = 0, j = 3; i < setnum; i++, j++) {
557 robj *obj = lookupKeyWrite(c->db,c->argv[j]);
558 if (!obj) {
559 src[i].dict = NULL;
560 } else {
561 if (obj->type == REDIS_ZSET) {
562 src[i].dict = ((zset*)obj->ptr)->dict;
563 } else if (obj->type == REDIS_SET) {
564 src[i].dict = (obj->ptr);
565 } else {
566 zfree(src);
567 addReply(c,shared.wrongtypeerr);
568 return;
569 }
570 }
571
572 /* default all weights to 1 */
573 src[i].weight = 1.0;
574 }
575
576 /* parse optional extra arguments */
577 if (j < c->argc) {
578 int remaining = c->argc - j;
579
580 while (remaining) {
581 if (remaining >= (setnum + 1) && !strcasecmp(c->argv[j]->ptr,"weights")) {
582 j++; remaining--;
583 for (i = 0; i < setnum; i++, j++, remaining--) {
584 if (getDoubleFromObjectOrReply(c,c->argv[j],&src[i].weight,
585 "weight value is not a double") != REDIS_OK)
586 {
587 zfree(src);
588 return;
589 }
590 }
591 } else if (remaining >= 2 && !strcasecmp(c->argv[j]->ptr,"aggregate")) {
592 j++; remaining--;
593 if (!strcasecmp(c->argv[j]->ptr,"sum")) {
594 aggregate = REDIS_AGGR_SUM;
595 } else if (!strcasecmp(c->argv[j]->ptr,"min")) {
596 aggregate = REDIS_AGGR_MIN;
597 } else if (!strcasecmp(c->argv[j]->ptr,"max")) {
598 aggregate = REDIS_AGGR_MAX;
599 } else {
600 zfree(src);
601 addReply(c,shared.syntaxerr);
602 return;
603 }
604 j++; remaining--;
605 } else {
606 zfree(src);
607 addReply(c,shared.syntaxerr);
608 return;
609 }
610 }
611 }
612
613 /* sort sets from the smallest to largest, this will improve our
614 * algorithm's performance */
615 qsort(src,setnum,sizeof(zsetopsrc),qsortCompareZsetopsrcByCardinality);
616
617 dstobj = createZsetObject();
618 dstzset = dstobj->ptr;
619
620 if (op == REDIS_OP_INTER) {
621 /* skip going over all entries if the smallest zset is NULL or empty */
622 if (src[0].dict && dictSize(src[0].dict) > 0) {
623 /* precondition: as src[0].dict is non-empty and the zsets are ordered
624 * from small to large, all src[i > 0].dict are non-empty too */
625 di = dictGetIterator(src[0].dict);
626 while((de = dictNext(di)) != NULL) {
627 double *score = zmalloc(sizeof(double)), value;
628 *score = src[0].weight * zunionInterDictValue(de);
629
630 for (j = 1; j < setnum; j++) {
631 dictEntry *other = dictFind(src[j].dict,dictGetEntryKey(de));
632 if (other) {
633 value = src[j].weight * zunionInterDictValue(other);
634 zunionInterAggregate(score, value, aggregate);
635 } else {
636 break;
637 }
638 }
639
640 /* skip entry when not present in every source dict */
641 if (j != setnum) {
642 zfree(score);
643 } else {
644 robj *o = dictGetEntryKey(de);
645 dictAdd(dstzset->dict,o,score);
646 incrRefCount(o); /* added to dictionary */
647 zslInsert(dstzset->zsl,*score,o);
648 incrRefCount(o); /* added to skiplist */
649 }
650 }
651 dictReleaseIterator(di);
652 }
653 } else if (op == REDIS_OP_UNION) {
654 for (i = 0; i < setnum; i++) {
655 if (!src[i].dict) continue;
656
657 di = dictGetIterator(src[i].dict);
658 while((de = dictNext(di)) != NULL) {
659 /* skip key when already processed */
660 if (dictFind(dstzset->dict,dictGetEntryKey(de)) != NULL) continue;
661
662 double *score = zmalloc(sizeof(double)), value;
663 *score = src[i].weight * zunionInterDictValue(de);
664
665 /* because the zsets are sorted by size, its only possible
666 * for sets at larger indices to hold this entry */
667 for (j = (i+1); j < setnum; j++) {
668 dictEntry *other = dictFind(src[j].dict,dictGetEntryKey(de));
669 if (other) {
670 value = src[j].weight * zunionInterDictValue(other);
671 zunionInterAggregate(score, value, aggregate);
672 }
673 }
674
675 robj *o = dictGetEntryKey(de);
676 dictAdd(dstzset->dict,o,score);
677 incrRefCount(o); /* added to dictionary */
678 zslInsert(dstzset->zsl,*score,o);
679 incrRefCount(o); /* added to skiplist */
680 }
681 dictReleaseIterator(di);
682 }
683 } else {
684 /* unknown operator */
685 redisAssert(op == REDIS_OP_INTER || op == REDIS_OP_UNION);
686 }
687
688 if (dbDelete(c->db,dstkey)) {
689 touchWatchedKey(c->db,dstkey);
690 touched = 1;
691 server.dirty++;
692 }
693 if (dstzset->zsl->length) {
694 dbAdd(c->db,dstkey,dstobj);
695 addReplyLongLong(c, dstzset->zsl->length);
696 if (!touched) touchWatchedKey(c->db,dstkey);
697 server.dirty++;
698 } else {
699 decrRefCount(dstobj);
700 addReply(c, shared.czero);
701 }
702 zfree(src);
703 }
704
705 void zunionstoreCommand(redisClient *c) {
706 zunionInterGenericCommand(c,c->argv[1], REDIS_OP_UNION);
707 }
708
709 void zinterstoreCommand(redisClient *c) {
710 zunionInterGenericCommand(c,c->argv[1], REDIS_OP_INTER);
711 }
712
713 void zrangeGenericCommand(redisClient *c, int reverse) {
714 robj *o;
715 long start;
716 long end;
717 int withscores = 0;
718 int llen;
719 int rangelen, j;
720 zset *zsetobj;
721 zskiplist *zsl;
722 zskiplistNode *ln;
723 robj *ele;
724
725 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
726 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
727
728 if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
729 withscores = 1;
730 } else if (c->argc >= 5) {
731 addReply(c,shared.syntaxerr);
732 return;
733 }
734
735 if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
736 || checkType(c,o,REDIS_ZSET)) return;
737 zsetobj = o->ptr;
738 zsl = zsetobj->zsl;
739 llen = zsl->length;
740
741 /* convert negative indexes */
742 if (start < 0) start = llen+start;
743 if (end < 0) end = llen+end;
744 if (start < 0) start = 0;
745
746 /* Invariant: start >= 0, so this test will be true when end < 0.
747 * The range is empty when start > end or start >= length. */
748 if (start > end || start >= llen) {
749 addReply(c,shared.emptymultibulk);
750 return;
751 }
752 if (end >= llen) end = llen-1;
753 rangelen = (end-start)+1;
754
755 /* check if starting point is trivial, before searching
756 * the element in log(N) time */
757 if (reverse) {
758 ln = start == 0 ? zsl->tail : zslistTypeGetElementByRank(zsl, llen-start);
759 } else {
760 ln = start == 0 ?
761 zsl->header->level[0].forward : zslistTypeGetElementByRank(zsl, start+1);
762 }
763
764 /* Return the result in form of a multi-bulk reply */
765 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",
766 withscores ? (rangelen*2) : rangelen));
767 for (j = 0; j < rangelen; j++) {
768 ele = ln->obj;
769 addReplyBulk(c,ele);
770 if (withscores)
771 addReplyDouble(c,ln->score);
772 ln = reverse ? ln->backward : ln->level[0].forward;
773 }
774 }
775
776 void zrangeCommand(redisClient *c) {
777 zrangeGenericCommand(c,0);
778 }
779
780 void zrevrangeCommand(redisClient *c) {
781 zrangeGenericCommand(c,1);
782 }
783
784 /* This command implements both ZRANGEBYSCORE and ZCOUNT.
785 * If justcount is non-zero, just the count is returned. */
786 void genericZrangebyscoreCommand(redisClient *c, int justcount) {
787 robj *o;
788 double min, max;
789 int minex = 0, maxex = 0; /* are min or max exclusive? */
790 int offset = 0, limit = -1;
791 int withscores = 0;
792 int badsyntax = 0;
793
794 /* Parse the min-max interval. If one of the values is prefixed
795 * by the "(" character, it's considered "open". For instance
796 * ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max
797 * ZRANGEBYSCORE zset 1.5 2.5 will instead match min <= x <= max */
798 if (((char*)c->argv[2]->ptr)[0] == '(') {
799 min = strtod((char*)c->argv[2]->ptr+1,NULL);
800 minex = 1;
801 } else {
802 min = strtod(c->argv[2]->ptr,NULL);
803 }
804 if (((char*)c->argv[3]->ptr)[0] == '(') {
805 max = strtod((char*)c->argv[3]->ptr+1,NULL);
806 maxex = 1;
807 } else {
808 max = strtod(c->argv[3]->ptr,NULL);
809 }
810
811 /* Parse "WITHSCORES": note that if the command was called with
812 * the name ZCOUNT then we are sure that c->argc == 4, so we'll never
813 * enter the following paths to parse WITHSCORES and LIMIT. */
814 if (c->argc == 5 || c->argc == 8) {
815 if (strcasecmp(c->argv[c->argc-1]->ptr,"withscores") == 0)
816 withscores = 1;
817 else
818 badsyntax = 1;
819 }
820 if (c->argc != (4 + withscores) && c->argc != (7 + withscores))
821 badsyntax = 1;
822 if (badsyntax) {
823 addReplySds(c,
824 sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n"));
825 return;
826 }
827
828 /* Parse "LIMIT" */
829 if (c->argc == (7 + withscores) && strcasecmp(c->argv[4]->ptr,"limit")) {
830 addReply(c,shared.syntaxerr);
831 return;
832 } else if (c->argc == (7 + withscores)) {
833 offset = atoi(c->argv[5]->ptr);
834 limit = atoi(c->argv[6]->ptr);
835 if (offset < 0) offset = 0;
836 }
837
838 /* Ok, lookup the key and get the range */
839 o = lookupKeyRead(c->db,c->argv[1]);
840 if (o == NULL) {
841 addReply(c,justcount ? shared.czero : shared.emptymultibulk);
842 } else {
843 if (o->type != REDIS_ZSET) {
844 addReply(c,shared.wrongtypeerr);
845 } else {
846 zset *zsetobj = o->ptr;
847 zskiplist *zsl = zsetobj->zsl;
848 zskiplistNode *ln;
849 robj *ele, *lenobj = NULL;
850 unsigned long rangelen = 0;
851
852 /* Get the first node with the score >= min, or with
853 * score > min if 'minex' is true. */
854 ln = zslFirstWithScore(zsl,min);
855 while (minex && ln && ln->score == min) ln = ln->level[0].forward;
856
857 if (ln == NULL) {
858 /* No element matching the speciifed interval */
859 addReply(c,justcount ? shared.czero : shared.emptymultibulk);
860 return;
861 }
862
863 /* We don't know in advance how many matching elements there
864 * are in the list, so we push this object that will represent
865 * the multi-bulk length in the output buffer, and will "fix"
866 * it later */
867 if (!justcount) {
868 lenobj = createObject(REDIS_STRING,NULL);
869 addReply(c,lenobj);
870 decrRefCount(lenobj);
871 }
872
873 while(ln && (maxex ? (ln->score < max) : (ln->score <= max))) {
874 if (offset) {
875 offset--;
876 ln = ln->level[0].forward;
877 continue;
878 }
879 if (limit == 0) break;
880 if (!justcount) {
881 ele = ln->obj;
882 addReplyBulk(c,ele);
883 if (withscores)
884 addReplyDouble(c,ln->score);
885 }
886 ln = ln->level[0].forward;
887 rangelen++;
888 if (limit > 0) limit--;
889 }
890 if (justcount) {
891 addReplyLongLong(c,(long)rangelen);
892 } else {
893 lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",
894 withscores ? (rangelen*2) : rangelen);
895 }
896 }
897 }
898 }
899
900 void zrangebyscoreCommand(redisClient *c) {
901 genericZrangebyscoreCommand(c,0);
902 }
903
904 void zcountCommand(redisClient *c) {
905 genericZrangebyscoreCommand(c,1);
906 }
907
908 void zcardCommand(redisClient *c) {
909 robj *o;
910 zset *zs;
911
912 if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
913 checkType(c,o,REDIS_ZSET)) return;
914
915 zs = o->ptr;
916 addReplyUlong(c,zs->zsl->length);
917 }
918
919 void zscoreCommand(redisClient *c) {
920 robj *o;
921 zset *zs;
922 dictEntry *de;
923
924 if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
925 checkType(c,o,REDIS_ZSET)) return;
926
927 zs = o->ptr;
928 de = dictFind(zs->dict,c->argv[2]);
929 if (!de) {
930 addReply(c,shared.nullbulk);
931 } else {
932 double *score = dictGetEntryVal(de);
933
934 addReplyDouble(c,*score);
935 }
936 }
937
938 void zrankGenericCommand(redisClient *c, int reverse) {
939 robj *o;
940 zset *zs;
941 zskiplist *zsl;
942 dictEntry *de;
943 unsigned long rank;
944 double *score;
945
946 if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
947 checkType(c,o,REDIS_ZSET)) return;
948
949 zs = o->ptr;
950 zsl = zs->zsl;
951 de = dictFind(zs->dict,c->argv[2]);
952 if (!de) {
953 addReply(c,shared.nullbulk);
954 return;
955 }
956
957 score = dictGetEntryVal(de);
958 rank = zslistTypeGetRank(zsl, *score, c->argv[2]);
959 if (rank) {
960 if (reverse) {
961 addReplyLongLong(c, zsl->length - rank);
962 } else {
963 addReplyLongLong(c, rank-1);
964 }
965 } else {
966 addReply(c,shared.nullbulk);
967 }
968 }
969
970 void zrankCommand(redisClient *c) {
971 zrankGenericCommand(c, 0);
972 }
973
974 void zrevrankCommand(redisClient *c) {
975 zrankGenericCommand(c, 1);
976 }