]> git.saurik.com Git - redis.git/blame - src/t_zset.c
Properly free encoded sorted set
[redis.git] / src / t_zset.c
CommitLineData
e2641e09 1#include "redis.h"
2
3#include <math.h>
4
5/*-----------------------------------------------------------------------------
6 * Sorted set API
7 *----------------------------------------------------------------------------*/
8
9/* ZSETs are ordered sets using two data structures to hold the same elements
10 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
11 * data structure.
12 *
13 * The elements are added to an hash table mapping Redis objects to scores.
14 * At the same time the elements are added to a skip list mapping scores
15 * to Redis objects (so objects are sorted by scores in this "view"). */
16
17/* This skiplist implementation is almost a C translation of the original
18 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
19 * Alternative to Balanced Trees", modified in three ways:
20 * a) this implementation allows for repeated values.
21 * b) the comparison is not just by key (our 'score') but by satellite data.
22 * c) there is a back pointer, so it's a doubly linked list with the back
23 * pointers being only at "level 1". This allows to traverse the list
24 * from tail to head, useful for ZREVRANGE. */
25
26zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
2159782b 27 zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
e2641e09 28 zn->score = score;
29 zn->obj = obj;
30 return zn;
31}
32
33zskiplist *zslCreate(void) {
34 int j;
35 zskiplist *zsl;
36
37 zsl = zmalloc(sizeof(*zsl));
38 zsl->level = 1;
39 zsl->length = 0;
40 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
41 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
2159782b
PN
42 zsl->header->level[j].forward = NULL;
43 zsl->header->level[j].span = 0;
e2641e09 44 }
45 zsl->header->backward = NULL;
46 zsl->tail = NULL;
47 return zsl;
48}
49
50void zslFreeNode(zskiplistNode *node) {
51 decrRefCount(node->obj);
e2641e09 52 zfree(node);
53}
54
55void zslFree(zskiplist *zsl) {
2159782b 56 zskiplistNode *node = zsl->header->level[0].forward, *next;
e2641e09 57
e2641e09 58 zfree(zsl->header);
59 while(node) {
2159782b 60 next = node->level[0].forward;
e2641e09 61 zslFreeNode(node);
62 node = next;
63 }
64 zfree(zsl);
65}
66
67int zslRandomLevel(void) {
68 int level = 1;
69 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
70 level += 1;
71 return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
72}
73
69ef89f2 74zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
e2641e09 75 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
76 unsigned int rank[ZSKIPLIST_MAXLEVEL];
77 int i, level;
78
79 x = zsl->header;
80 for (i = zsl->level-1; i >= 0; i--) {
81 /* store rank that is crossed to reach the insert position */
82 rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
2159782b
PN
83 while (x->level[i].forward &&
84 (x->level[i].forward->score < score ||
85 (x->level[i].forward->score == score &&
86 compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
87 rank[i] += x->level[i].span;
88 x = x->level[i].forward;
e2641e09 89 }
90 update[i] = x;
91 }
92 /* we assume the key is not already inside, since we allow duplicated
93 * scores, and the re-insertion of score and redis object should never
94 * happpen since the caller of zslInsert() should test in the hash table
95 * if the element is already inside or not. */
96 level = zslRandomLevel();
97 if (level > zsl->level) {
98 for (i = zsl->level; i < level; i++) {
99 rank[i] = 0;
100 update[i] = zsl->header;
2159782b 101 update[i]->level[i].span = zsl->length;
e2641e09 102 }
103 zsl->level = level;
104 }
105 x = zslCreateNode(level,score,obj);
106 for (i = 0; i < level; i++) {
2159782b
PN
107 x->level[i].forward = update[i]->level[i].forward;
108 update[i]->level[i].forward = x;
e2641e09 109
110 /* update span covered by update[i] as x is inserted here */
2159782b
PN
111 x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
112 update[i]->level[i].span = (rank[0] - rank[i]) + 1;
e2641e09 113 }
114
115 /* increment span for untouched levels */
116 for (i = level; i < zsl->level; i++) {
2159782b 117 update[i]->level[i].span++;
e2641e09 118 }
119
120 x->backward = (update[0] == zsl->header) ? NULL : update[0];
2159782b
PN
121 if (x->level[0].forward)
122 x->level[0].forward->backward = x;
e2641e09 123 else
124 zsl->tail = x;
125 zsl->length++;
69ef89f2 126 return x;
e2641e09 127}
128
129/* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
130void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
131 int i;
132 for (i = 0; i < zsl->level; i++) {
2159782b
PN
133 if (update[i]->level[i].forward == x) {
134 update[i]->level[i].span += x->level[i].span - 1;
135 update[i]->level[i].forward = x->level[i].forward;
e2641e09 136 } else {
2159782b 137 update[i]->level[i].span -= 1;
e2641e09 138 }
139 }
2159782b
PN
140 if (x->level[0].forward) {
141 x->level[0].forward->backward = x->backward;
e2641e09 142 } else {
143 zsl->tail = x->backward;
144 }
2159782b 145 while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
e2641e09 146 zsl->level--;
147 zsl->length--;
148}
149
150/* Delete an element with matching score/object from the skiplist. */
151int zslDelete(zskiplist *zsl, double score, robj *obj) {
152 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
153 int i;
154
155 x = zsl->header;
156 for (i = zsl->level-1; i >= 0; i--) {
2159782b
PN
157 while (x->level[i].forward &&
158 (x->level[i].forward->score < score ||
159 (x->level[i].forward->score == score &&
160 compareStringObjects(x->level[i].forward->obj,obj) < 0)))
161 x = x->level[i].forward;
e2641e09 162 update[i] = x;
163 }
164 /* We may have multiple elements with the same score, what we need
165 * is to find the element with both the right score and object. */
2159782b 166 x = x->level[0].forward;
e2641e09 167 if (x && score == x->score && equalStringObjects(x->obj,obj)) {
168 zslDeleteNode(zsl, x, update);
169 zslFreeNode(x);
170 return 1;
171 } else {
172 return 0; /* not found */
173 }
174 return 0; /* not found */
175}
176
91504b6c
PN
177/* Struct to hold a inclusive/exclusive range spec. */
178typedef struct {
179 double min, max;
180 int minex, maxex; /* are min or max exclusive? */
181} zrangespec;
182
45290ad9 183static int zslValueGteMin(double value, zrangespec *spec) {
22b9bf15
PN
184 return spec->minex ? (value > spec->min) : (value >= spec->min);
185}
186
45290ad9 187static int zslValueLteMax(double value, zrangespec *spec) {
22b9bf15
PN
188 return spec->maxex ? (value < spec->max) : (value <= spec->max);
189}
190
df278b8b 191static int zslValueInRange(double value, zrangespec *spec) {
45290ad9 192 return zslValueGteMin(value,spec) && zslValueLteMax(value,spec);
22b9bf15
PN
193}
194
195/* Returns if there is a part of the zset is in range. */
196int zslIsInRange(zskiplist *zsl, zrangespec *range) {
197 zskiplistNode *x;
198
8e1b3277
PN
199 /* Test for ranges that will always be empty. */
200 if (range->min > range->max ||
201 (range->min == range->max && (range->minex || range->maxex)))
202 return 0;
22b9bf15 203 x = zsl->tail;
45290ad9 204 if (x == NULL || !zslValueGteMin(x->score,range))
22b9bf15
PN
205 return 0;
206 x = zsl->header->level[0].forward;
45290ad9 207 if (x == NULL || !zslValueLteMax(x->score,range))
22b9bf15
PN
208 return 0;
209 return 1;
210}
211
212/* Find the first node that is contained in the specified range.
213 * Returns NULL when no element is contained in the range. */
214zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec range) {
215 zskiplistNode *x;
216 int i;
217
218 /* If everything is out of range, return early. */
219 if (!zslIsInRange(zsl,&range)) return NULL;
220
221 x = zsl->header;
222 for (i = zsl->level-1; i >= 0; i--) {
223 /* Go forward while *OUT* of range. */
224 while (x->level[i].forward &&
45290ad9 225 !zslValueGteMin(x->level[i].forward->score,&range))
22b9bf15
PN
226 x = x->level[i].forward;
227 }
228
229 /* The tail is in range, so the previous block should always return a
230 * node that is non-NULL and the last one to be out of range. */
231 x = x->level[0].forward;
232 redisAssert(x != NULL && zslValueInRange(x->score,&range));
233 return x;
234}
235
236/* Find the last node that is contained in the specified range.
237 * Returns NULL when no element is contained in the range. */
238zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec range) {
239 zskiplistNode *x;
240 int i;
241
242 /* If everything is out of range, return early. */
243 if (!zslIsInRange(zsl,&range)) return NULL;
244
245 x = zsl->header;
246 for (i = zsl->level-1; i >= 0; i--) {
247 /* Go forward while *IN* range. */
248 while (x->level[i].forward &&
45290ad9 249 zslValueLteMax(x->level[i].forward->score,&range))
22b9bf15
PN
250 x = x->level[i].forward;
251 }
252
253 /* The header is in range, so the previous block should always return a
254 * node that is non-NULL and in range. */
255 redisAssert(x != NULL && zslValueInRange(x->score,&range));
256 return x;
257}
258
e2641e09 259/* Delete all the elements with score between min and max from the skiplist.
260 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
261 * Note that this function takes the reference to the hash table view of the
262 * sorted set, in order to remove the elements from the hash table too. */
91504b6c 263unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec range, dict *dict) {
e2641e09 264 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
265 unsigned long removed = 0;
266 int i;
267
268 x = zsl->header;
269 for (i = zsl->level-1; i >= 0; i--) {
91504b6c
PN
270 while (x->level[i].forward && (range.minex ?
271 x->level[i].forward->score <= range.min :
272 x->level[i].forward->score < range.min))
273 x = x->level[i].forward;
e2641e09 274 update[i] = x;
275 }
91504b6c
PN
276
277 /* Current node is the last with score < or <= min. */
2159782b 278 x = x->level[0].forward;
91504b6c
PN
279
280 /* Delete nodes while in range. */
281 while (x && (range.maxex ? x->score < range.max : x->score <= range.max)) {
2159782b 282 zskiplistNode *next = x->level[0].forward;
69ef89f2 283 zslDeleteNode(zsl,x,update);
e2641e09 284 dictDelete(dict,x->obj);
285 zslFreeNode(x);
286 removed++;
287 x = next;
288 }
91504b6c 289 return removed;
e2641e09 290}
291
292/* Delete all the elements with rank between start and end from the skiplist.
293 * Start and end are inclusive. Note that start and end need to be 1-based */
294unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict) {
295 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
296 unsigned long traversed = 0, removed = 0;
297 int i;
298
299 x = zsl->header;
300 for (i = zsl->level-1; i >= 0; i--) {
2159782b
PN
301 while (x->level[i].forward && (traversed + x->level[i].span) < start) {
302 traversed += x->level[i].span;
303 x = x->level[i].forward;
e2641e09 304 }
305 update[i] = x;
306 }
307
308 traversed++;
2159782b 309 x = x->level[0].forward;
e2641e09 310 while (x && traversed <= end) {
2159782b 311 zskiplistNode *next = x->level[0].forward;
69ef89f2 312 zslDeleteNode(zsl,x,update);
e2641e09 313 dictDelete(dict,x->obj);
314 zslFreeNode(x);
315 removed++;
316 traversed++;
317 x = next;
318 }
319 return removed;
320}
321
e2641e09 322/* Find the rank for an element by both score and key.
323 * Returns 0 when the element cannot be found, rank otherwise.
324 * Note that the rank is 1-based due to the span of zsl->header to the
325 * first element. */
a3004773 326unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) {
e2641e09 327 zskiplistNode *x;
328 unsigned long rank = 0;
329 int i;
330
331 x = zsl->header;
332 for (i = zsl->level-1; i >= 0; i--) {
2159782b
PN
333 while (x->level[i].forward &&
334 (x->level[i].forward->score < score ||
335 (x->level[i].forward->score == score &&
336 compareStringObjects(x->level[i].forward->obj,o) <= 0))) {
337 rank += x->level[i].span;
338 x = x->level[i].forward;
e2641e09 339 }
340
341 /* x might be equal to zsl->header, so test if obj is non-NULL */
342 if (x->obj && equalStringObjects(x->obj,o)) {
343 return rank;
344 }
345 }
346 return 0;
347}
348
349/* Finds an element by its rank. The rank argument needs to be 1-based. */
a3004773 350zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
e2641e09 351 zskiplistNode *x;
352 unsigned long traversed = 0;
353 int i;
354
355 x = zsl->header;
356 for (i = zsl->level-1; i >= 0; i--) {
2159782b 357 while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
e2641e09 358 {
2159782b
PN
359 traversed += x->level[i].span;
360 x = x->level[i].forward;
e2641e09 361 }
362 if (traversed == rank) {
363 return x;
364 }
365 }
366 return NULL;
367}
368
25bb8a44 369/* Populate the rangespec according to the objects min and max. */
7236fdb2
PN
370static int zslParseRange(robj *min, robj *max, zrangespec *spec) {
371 char *eptr;
25bb8a44
PN
372 spec->minex = spec->maxex = 0;
373
374 /* Parse the min-max interval. If one of the values is prefixed
375 * by the "(" character, it's considered "open". For instance
376 * ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max
377 * ZRANGEBYSCORE zset 1.5 2.5 will instead match min <= x <= max */
378 if (min->encoding == REDIS_ENCODING_INT) {
379 spec->min = (long)min->ptr;
380 } else {
381 if (((char*)min->ptr)[0] == '(') {
7236fdb2
PN
382 spec->min = strtod((char*)min->ptr+1,&eptr);
383 if (eptr[0] != '\0' || isnan(spec->min)) return REDIS_ERR;
25bb8a44
PN
384 spec->minex = 1;
385 } else {
7236fdb2
PN
386 spec->min = strtod((char*)min->ptr,&eptr);
387 if (eptr[0] != '\0' || isnan(spec->min)) return REDIS_ERR;
25bb8a44
PN
388 }
389 }
390 if (max->encoding == REDIS_ENCODING_INT) {
391 spec->max = (long)max->ptr;
392 } else {
393 if (((char*)max->ptr)[0] == '(') {
7236fdb2
PN
394 spec->max = strtod((char*)max->ptr+1,&eptr);
395 if (eptr[0] != '\0' || isnan(spec->max)) return REDIS_ERR;
25bb8a44
PN
396 spec->maxex = 1;
397 } else {
7236fdb2
PN
398 spec->max = strtod((char*)max->ptr,&eptr);
399 if (eptr[0] != '\0' || isnan(spec->max)) return REDIS_ERR;
25bb8a44
PN
400 }
401 }
402
403 return REDIS_OK;
404}
405
21c5b508
PN
406/*-----------------------------------------------------------------------------
407 * Ziplist-backed sorted set API
408 *----------------------------------------------------------------------------*/
409
410double zzlGetScore(unsigned char *sptr) {
411 unsigned char *vstr;
412 unsigned int vlen;
413 long long vlong;
414 char buf[128];
415 double score;
416
417 redisAssert(sptr != NULL);
418 redisAssert(ziplistGet(sptr,&vstr,&vlen,&vlong));
419
420 if (vstr) {
421 memcpy(buf,vstr,vlen);
422 buf[vlen] = '\0';
423 score = strtod(buf,NULL);
424 } else {
425 score = vlong;
426 }
427
428 return score;
429}
430
431/* Compare element in sorted set with given element. */
432int zzlCompareElements(unsigned char *eptr, unsigned char *cstr, unsigned int clen) {
433 unsigned char *vstr;
434 unsigned int vlen;
435 long long vlong;
436 unsigned char vbuf[32];
437 int minlen, cmp;
438
439 redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
440 if (vstr == NULL) {
441 /* Store string representation of long long in buf. */
442 vlen = ll2string((char*)vbuf,sizeof(vbuf),vlong);
443 vstr = vbuf;
444 }
445
446 minlen = (vlen < clen) ? vlen : clen;
447 cmp = memcmp(vstr,cstr,minlen);
448 if (cmp == 0) return vlen-clen;
449 return cmp;
450}
451
9f9b60f9 452unsigned int zzlLength(robj *zobj) {
0b10e104
PN
453 unsigned char *zl = zobj->ptr;
454 return ziplistLen(zl)/2;
455}
456
4a14dbba
PN
457/* Returns if there is a part of the zset is in range. Should only be used
458 * internally by zzlFirstInRange and zzlLastInRange. */
459int zzlIsInRange(unsigned char *zl, zrangespec *range) {
460 unsigned char *p;
461 double score;
462
463 /* Test for ranges that will always be empty. */
464 if (range->min > range->max ||
465 (range->min == range->max && (range->minex || range->maxex)))
466 return 0;
467
468 p = ziplistIndex(zl,-1); /* Last score. */
469 redisAssert(p != NULL);
470 score = zzlGetScore(p);
471 if (!zslValueGteMin(score,range))
472 return 0;
473
474 p = ziplistIndex(zl,1); /* First score. */
475 redisAssert(p != NULL);
476 score = zzlGetScore(p);
477 if (!zslValueLteMax(score,range))
478 return 0;
479
480 return 1;
481}
482
483/* Find pointer to the first element contained in the specified range.
484 * Returns NULL when no element is contained in the range. */
485unsigned char *zzlFirstInRange(robj *zobj, zrangespec range) {
486 unsigned char *zl = zobj->ptr;
487 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
488 double score;
489
490 /* If everything is out of range, return early. */
491 if (!zzlIsInRange(zl,&range)) return NULL;
492
493 while (eptr != NULL) {
494 sptr = ziplistNext(zl,eptr);
495 redisAssert(sptr != NULL);
496
497 score = zzlGetScore(sptr);
498 if (zslValueGteMin(score,&range))
499 return eptr;
500
501 /* Move to next element. */
502 eptr = ziplistNext(zl,sptr);
503 }
504
505 return NULL;
506}
507
508/* Find pointer to the last element contained in the specified range.
509 * Returns NULL when no element is contained in the range. */
510unsigned char *zzlLastInRange(robj *zobj, zrangespec range) {
511 unsigned char *zl = zobj->ptr;
512 unsigned char *eptr = ziplistIndex(zl,-2), *sptr;
513 double score;
514
515 /* If everything is out of range, return early. */
516 if (!zzlIsInRange(zl,&range)) return NULL;
517
518 while (eptr != NULL) {
519 sptr = ziplistNext(zl,eptr);
520 redisAssert(sptr != NULL);
521
522 score = zzlGetScore(sptr);
523 if (zslValueLteMax(score,&range))
524 return eptr;
525
526 /* Move to previous element by moving to the score of previous element.
527 * When this returns NULL, we know there also is no element. */
528 sptr = ziplistPrev(zl,eptr);
529 if (sptr != NULL)
530 redisAssert((eptr = ziplistPrev(zl,sptr)) != NULL);
531 else
532 eptr = NULL;
533 }
534
535 return NULL;
536}
537
21c5b508
PN
538unsigned char *zzlFind(robj *zobj, robj *ele, double *score) {
539 unsigned char *zl = zobj->ptr;
540 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
541
542 ele = getDecodedObject(ele);
543 while (eptr != NULL) {
544 sptr = ziplistNext(zl,eptr);
545 redisAssert(sptr != NULL);
546
547 if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr))) {
548 /* Matching element, pull out score. */
0b10e104 549 if (score != NULL) *score = zzlGetScore(sptr);
21c5b508
PN
550 decrRefCount(ele);
551 return eptr;
552 }
553
554 /* Move to next element. */
555 eptr = ziplistNext(zl,sptr);
556 }
557
558 decrRefCount(ele);
559 return NULL;
560}
561
562/* Delete (element,score) pair from ziplist. Use local copy of eptr because we
563 * don't want to modify the one given as argument. */
564int zzlDelete(robj *zobj, unsigned char *eptr) {
565 unsigned char *zl = zobj->ptr;
566 unsigned char *p = eptr;
567
568 /* TODO: add function to ziplist API to delete N elements from offset. */
569 zl = ziplistDelete(zl,&p);
570 zl = ziplistDelete(zl,&p);
571 zobj->ptr = zl;
572 return REDIS_OK;
573}
574
575int zzlInsertAt(robj *zobj, robj *ele, double score, unsigned char *eptr) {
576 unsigned char *zl = zobj->ptr;
577 unsigned char *sptr;
578 char scorebuf[128];
579 int scorelen;
580 int offset;
581
582 redisAssert(ele->encoding == REDIS_ENCODING_RAW);
583 scorelen = d2string(scorebuf,sizeof(scorebuf),score);
584 if (eptr == NULL) {
585 zl = ziplistPush(zl,ele->ptr,sdslen(ele->ptr),ZIPLIST_TAIL);
586 zl = ziplistPush(zl,(unsigned char*)scorebuf,scorelen,ZIPLIST_TAIL);
587 } else {
588 /* Keep offset relative to zl, as it might be re-allocated. */
589 offset = eptr-zl;
590 zl = ziplistInsert(zl,eptr,ele->ptr,sdslen(ele->ptr));
591 eptr = zl+offset;
592
593 /* Insert score after the element. */
594 redisAssert((sptr = ziplistNext(zl,eptr)) != NULL);
595 zl = ziplistInsert(zl,sptr,(unsigned char*)scorebuf,scorelen);
596 }
597
598 zobj->ptr = zl;
599 return REDIS_OK;
600}
601
602/* Insert (element,score) pair in ziplist. This function assumes the element is
603 * not yet present in the list. */
604int zzlInsert(robj *zobj, robj *ele, double score) {
605 unsigned char *zl = zobj->ptr;
606 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
607 double s;
21c5b508
PN
608
609 ele = getDecodedObject(ele);
610 while (eptr != NULL) {
611 sptr = ziplistNext(zl,eptr);
612 redisAssert(sptr != NULL);
613 s = zzlGetScore(sptr);
614
615 if (s > score) {
616 /* First element with score larger than score for element to be
617 * inserted. This means we should take its spot in the list to
618 * maintain ordering. */
21c5b508
PN
619 zzlInsertAt(zobj,ele,score,eptr);
620 break;
8218db3d
PN
621 } else if (s == score) {
622 /* Ensure lexicographical ordering for elements. */
623 if (zzlCompareElements(eptr,ele->ptr,sdslen(ele->ptr)) < 0) {
624 zzlInsertAt(zobj,ele,score,eptr);
625 break;
626 }
21c5b508
PN
627 }
628
629 /* Move to next element. */
630 eptr = ziplistNext(zl,sptr);
631 }
632
633 /* Push on tail of list when it was not yet inserted. */
8218db3d
PN
634 if (eptr == NULL)
635 zzlInsertAt(zobj,ele,score,NULL);
21c5b508
PN
636
637 decrRefCount(ele);
638 return REDIS_OK;
639}
25bb8a44 640
4a14dbba
PN
641unsigned long zzlDeleteRangeByScore(robj *zobj, zrangespec range) {
642 unsigned char *zl = zobj->ptr;
643 unsigned char *eptr, *sptr;
644 double score;
645 unsigned long deleted = 0;
646
647 eptr = zzlFirstInRange(zobj,range);
648 if (eptr == NULL) return deleted;
649
650
651 /* When the tail of the ziplist is deleted, eptr will point to the sentinel
652 * byte and ziplistNext will return NULL. */
653 while ((sptr = ziplistNext(zl,eptr)) != NULL) {
654 score = zzlGetScore(sptr);
655 if (zslValueLteMax(score,&range)) {
656 /* Delete both the element and the score. */
657 zl = ziplistDelete(zl,&eptr);
658 zl = ziplistDelete(zl,&eptr);
659 deleted++;
660 } else {
661 /* No longer in range. */
662 break;
663 }
664 }
665
666 return deleted;
667}
668
e2641e09 669/*-----------------------------------------------------------------------------
670 * Sorted set commands
671 *----------------------------------------------------------------------------*/
672
69ef89f2 673/* This generic command implements both ZADD and ZINCRBY. */
3ca7532a 674void zaddGenericCommand(redisClient *c, int incr) {
21c5b508 675 static char *nanerr = "resulting score is not a number (NaN)";
3ca7532a
PN
676 robj *key = c->argv[1];
677 robj *ele;
21c5b508
PN
678 robj *zobj;
679 robj *curobj;
3ca7532a
PN
680 double score, curscore = 0.0;
681
682 if (getDoubleFromObjectOrReply(c,c->argv[2],&score,NULL) != REDIS_OK)
683 return;
21c5b508
PN
684
685 zobj = lookupKeyWrite(c->db,key);
686 if (zobj == NULL) {
687 zobj = createZsetZiplistObject();
688 dbAdd(c->db,key,zobj);
e2641e09 689 } else {
21c5b508 690 if (zobj->type != REDIS_ZSET) {
e2641e09 691 addReply(c,shared.wrongtypeerr);
692 return;
693 }
694 }
e2641e09 695
21c5b508
PN
696 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
697 unsigned char *eptr;
698
3ca7532a
PN
699 /* Prefer non-encoded element when dealing with ziplists. */
700 ele = c->argv[3];
21c5b508
PN
701 if ((eptr = zzlFind(zobj,ele,&curscore)) != NULL) {
702 if (incr) {
703 score += curscore;
704 if (isnan(score)) {
705 addReplyError(c,nanerr);
706 /* Don't need to check if the sorted set is empty, because
707 * we know it has at least one element. */
708 return;
709 }
710 }
711
712 /* Remove and re-insert when score changed. */
713 if (score != curscore) {
714 redisAssert(zzlDelete(zobj,eptr) == REDIS_OK);
715 redisAssert(zzlInsert(zobj,ele,score) == REDIS_OK);
716
717 signalModifiedKey(c->db,key);
718 server.dirty++;
719 }
720
721 if (incr) /* ZINCRBY */
722 addReplyDouble(c,score);
723 else /* ZADD */
724 addReply(c,shared.czero);
725 } else {
726 redisAssert(zzlInsert(zobj,ele,score) == REDIS_OK);
69ef89f2 727
21c5b508
PN
728 signalModifiedKey(c->db,key);
729 server.dirty++;
69ef89f2 730
21c5b508
PN
731 if (incr) /* ZINCRBY */
732 addReplyDouble(c,score);
733 else /* ZADD */
734 addReply(c,shared.cone);
735 }
736 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
737 zset *zs = zobj->ptr;
738 zskiplistNode *znode;
e2641e09 739 dictEntry *de;
e2641e09 740
3ca7532a 741 ele = c->argv[3] = tryObjectEncoding(c->argv[3]);
e2641e09 742 de = dictFind(zs->dict,ele);
21c5b508
PN
743 if (de != NULL) {
744 curobj = dictGetEntryKey(de);
745 curscore = *(double*)dictGetEntryVal(de);
746
747 if (incr) {
748 score += curscore;
749 if (isnan(score)) {
750 addReplyError(c,nanerr);
751 /* Don't need to check if the sorted set is empty, because
752 * we know it has at least one element. */
753 return;
754 }
755 }
756
757 /* Remove and re-insert when score changed. We can safely delete
758 * the key object from the skiplist, since the dictionary still has
759 * a reference to it. */
760 if (score != curscore) {
761 redisAssert(zslDelete(zs->zsl,curscore,curobj));
762 znode = zslInsert(zs->zsl,score,curobj);
763 incrRefCount(curobj); /* Re-inserted in skiplist. */
764 dictGetEntryVal(de) = &znode->score; /* Update score ptr. */
765
766 signalModifiedKey(c->db,key);
767 server.dirty++;
768 }
769
770 if (incr) /* ZINCRBY */
771 addReplyDouble(c,score);
772 else /* ZADD */
773 addReply(c,shared.czero);
774 } else {
775 znode = zslInsert(zs->zsl,score,ele);
776 incrRefCount(ele); /* Inserted in skiplist. */
777 redisAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
778 incrRefCount(ele); /* Added to dictionary. */
779
780 signalModifiedKey(c->db,key);
e2641e09 781 server.dirty++;
21c5b508
PN
782
783 if (incr) /* ZINCRBY */
784 addReplyDouble(c,score);
785 else /* ZADD */
786 addReply(c,shared.cone);
e2641e09 787 }
21c5b508
PN
788 } else {
789 redisPanic("Unknown sorted set encoding");
e2641e09 790 }
791}
792
793void zaddCommand(redisClient *c) {
3ca7532a 794 zaddGenericCommand(c,0);
e2641e09 795}
796
797void zincrbyCommand(redisClient *c) {
3ca7532a 798 zaddGenericCommand(c,1);
e2641e09 799}
800
801void zremCommand(redisClient *c) {
0b10e104
PN
802 robj *key = c->argv[1];
803 robj *ele = c->argv[2];
804 robj *zobj;
805
806 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
807 checkType(c,zobj,REDIS_ZSET)) return;
808
809 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
810 unsigned char *eptr;
811
812 if ((eptr = zzlFind(zobj,ele,NULL)) != NULL) {
813 redisAssert(zzlDelete(zobj,eptr) == REDIS_OK);
814 if (zzlLength(zobj) == 0) dbDelete(c->db,key);
815 } else {
816 addReply(c,shared.czero);
817 return;
818 }
819 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
820 zset *zs = zobj->ptr;
821 dictEntry *de;
822 double score;
823
824 de = dictFind(zs->dict,ele);
825 if (de != NULL) {
826 /* Delete from the skiplist */
827 score = *(double*)dictGetEntryVal(de);
828 redisAssert(zslDelete(zs->zsl,score,ele));
829
830 /* Delete from the hash table */
831 dictDelete(zs->dict,ele);
832 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
833 if (dictSize(zs->dict) == 0) dbDelete(c->db,key);
834 } else {
835 addReply(c,shared.czero);
836 return;
837 }
838 } else {
839 redisPanic("Unknown sorted set encoding");
e2641e09 840 }
e2641e09 841
0b10e104 842 signalModifiedKey(c->db,key);
e2641e09 843 server.dirty++;
844 addReply(c,shared.cone);
845}
846
847void zremrangebyscoreCommand(redisClient *c) {
4a14dbba
PN
848 robj *key = c->argv[1];
849 robj *zobj;
91504b6c 850 zrangespec range;
4a14dbba 851 unsigned long deleted;
e2641e09 852
91504b6c 853 /* Parse the range arguments. */
7236fdb2
PN
854 if (zslParseRange(c->argv[2],c->argv[3],&range) != REDIS_OK) {
855 addReplyError(c,"min or max is not a double");
856 return;
857 }
e2641e09 858
4a14dbba
PN
859 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
860 checkType(c,zobj,REDIS_ZSET)) return;
861
862 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
863 deleted = zzlDeleteRangeByScore(zobj,range);
864 } else if (zobj->encoding == REDIS_ENCODING_RAW) {
865 zset *zs = zobj->ptr;
866 deleted = zslDeleteRangeByScore(zs->zsl,range,zs->dict);
867 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
868 if (dictSize(zs->dict) == 0) dbDelete(c->db,key);
869 } else {
870 redisPanic("Unknown sorted set encoding");
871 }
e2641e09 872
4a14dbba 873 if (deleted) signalModifiedKey(c->db,key);
e2641e09 874 server.dirty += deleted;
875 addReplyLongLong(c,deleted);
876}
877
878void zremrangebyrankCommand(redisClient *c) {
879 long start;
880 long end;
881 int llen;
882 long deleted;
883 robj *zsetobj;
884 zset *zs;
885
886 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
887 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
888
889 if ((zsetobj = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL ||
890 checkType(c,zsetobj,REDIS_ZSET)) return;
891 zs = zsetobj->ptr;
892 llen = zs->zsl->length;
893
894 /* convert negative indexes */
895 if (start < 0) start = llen+start;
896 if (end < 0) end = llen+end;
897 if (start < 0) start = 0;
e2641e09 898
d0a4e24e
PN
899 /* Invariant: start >= 0, so this test will be true when end < 0.
900 * The range is empty when start > end or start >= length. */
e2641e09 901 if (start > end || start >= llen) {
902 addReply(c,shared.czero);
903 return;
904 }
905 if (end >= llen) end = llen-1;
906
907 /* increment start and end because zsl*Rank functions
908 * use 1-based rank */
909 deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);
910 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
911 if (dictSize(zs->dict) == 0) dbDelete(c->db,c->argv[1]);
cea8c5cd 912 if (deleted) signalModifiedKey(c->db,c->argv[1]);
e2641e09 913 server.dirty += deleted;
914 addReplyLongLong(c, deleted);
915}
916
917typedef struct {
918 dict *dict;
919 double weight;
920} zsetopsrc;
921
922int qsortCompareZsetopsrcByCardinality(const void *s1, const void *s2) {
923 zsetopsrc *d1 = (void*) s1, *d2 = (void*) s2;
924 unsigned long size1, size2;
925 size1 = d1->dict ? dictSize(d1->dict) : 0;
926 size2 = d2->dict ? dictSize(d2->dict) : 0;
927 return size1 - size2;
928}
929
930#define REDIS_AGGR_SUM 1
931#define REDIS_AGGR_MIN 2
932#define REDIS_AGGR_MAX 3
933#define zunionInterDictValue(_e) (dictGetEntryVal(_e) == NULL ? 1.0 : *(double*)dictGetEntryVal(_e))
934
935inline static void zunionInterAggregate(double *target, double val, int aggregate) {
936 if (aggregate == REDIS_AGGR_SUM) {
937 *target = *target + val;
d9e28bcf
PN
938 /* The result of adding two doubles is NaN when one variable
939 * is +inf and the other is -inf. When these numbers are added,
940 * we maintain the convention of the result being 0.0. */
941 if (isnan(*target)) *target = 0.0;
e2641e09 942 } else if (aggregate == REDIS_AGGR_MIN) {
943 *target = val < *target ? val : *target;
944 } else if (aggregate == REDIS_AGGR_MAX) {
945 *target = val > *target ? val : *target;
946 } else {
947 /* safety net */
948 redisPanic("Unknown ZUNION/INTER aggregate type");
949 }
950}
951
952void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
953 int i, j, setnum;
954 int aggregate = REDIS_AGGR_SUM;
955 zsetopsrc *src;
956 robj *dstobj;
957 zset *dstzset;
69ef89f2 958 zskiplistNode *znode;
e2641e09 959 dictIterator *di;
960 dictEntry *de;
8c1420ff 961 int touched = 0;
e2641e09 962
963 /* expect setnum input keys to be given */
964 setnum = atoi(c->argv[2]->ptr);
965 if (setnum < 1) {
3ab20376
PN
966 addReplyError(c,
967 "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE");
e2641e09 968 return;
969 }
970
971 /* test if the expected number of keys would overflow */
972 if (3+setnum > c->argc) {
973 addReply(c,shared.syntaxerr);
974 return;
975 }
976
977 /* read keys to be used for input */
978 src = zmalloc(sizeof(zsetopsrc) * setnum);
979 for (i = 0, j = 3; i < setnum; i++, j++) {
980 robj *obj = lookupKeyWrite(c->db,c->argv[j]);
981 if (!obj) {
982 src[i].dict = NULL;
983 } else {
984 if (obj->type == REDIS_ZSET) {
985 src[i].dict = ((zset*)obj->ptr)->dict;
986 } else if (obj->type == REDIS_SET) {
987 src[i].dict = (obj->ptr);
988 } else {
989 zfree(src);
990 addReply(c,shared.wrongtypeerr);
991 return;
992 }
993 }
994
995 /* default all weights to 1 */
996 src[i].weight = 1.0;
997 }
998
999 /* parse optional extra arguments */
1000 if (j < c->argc) {
1001 int remaining = c->argc - j;
1002
1003 while (remaining) {
1004 if (remaining >= (setnum + 1) && !strcasecmp(c->argv[j]->ptr,"weights")) {
1005 j++; remaining--;
1006 for (i = 0; i < setnum; i++, j++, remaining--) {
673e1fb7
PN
1007 if (getDoubleFromObjectOrReply(c,c->argv[j],&src[i].weight,
1008 "weight value is not a double") != REDIS_OK)
1009 {
1010 zfree(src);
e2641e09 1011 return;
673e1fb7 1012 }
e2641e09 1013 }
1014 } else if (remaining >= 2 && !strcasecmp(c->argv[j]->ptr,"aggregate")) {
1015 j++; remaining--;
1016 if (!strcasecmp(c->argv[j]->ptr,"sum")) {
1017 aggregate = REDIS_AGGR_SUM;
1018 } else if (!strcasecmp(c->argv[j]->ptr,"min")) {
1019 aggregate = REDIS_AGGR_MIN;
1020 } else if (!strcasecmp(c->argv[j]->ptr,"max")) {
1021 aggregate = REDIS_AGGR_MAX;
1022 } else {
1023 zfree(src);
1024 addReply(c,shared.syntaxerr);
1025 return;
1026 }
1027 j++; remaining--;
1028 } else {
1029 zfree(src);
1030 addReply(c,shared.syntaxerr);
1031 return;
1032 }
1033 }
1034 }
1035
1036 /* sort sets from the smallest to largest, this will improve our
1037 * algorithm's performance */
1038 qsort(src,setnum,sizeof(zsetopsrc),qsortCompareZsetopsrcByCardinality);
1039
1040 dstobj = createZsetObject();
1041 dstzset = dstobj->ptr;
1042
1043 if (op == REDIS_OP_INTER) {
1044 /* skip going over all entries if the smallest zset is NULL or empty */
1045 if (src[0].dict && dictSize(src[0].dict) > 0) {
1046 /* precondition: as src[0].dict is non-empty and the zsets are ordered
1047 * from small to large, all src[i > 0].dict are non-empty too */
1048 di = dictGetIterator(src[0].dict);
1049 while((de = dictNext(di)) != NULL) {
d433ebc6 1050 double score, value;
e2641e09 1051
50a9fad5 1052 score = src[0].weight * zunionInterDictValue(de);
e2641e09 1053 for (j = 1; j < setnum; j++) {
1054 dictEntry *other = dictFind(src[j].dict,dictGetEntryKey(de));
1055 if (other) {
1056 value = src[j].weight * zunionInterDictValue(other);
d433ebc6 1057 zunionInterAggregate(&score,value,aggregate);
e2641e09 1058 } else {
1059 break;
1060 }
1061 }
1062
d433ebc6
PN
1063 /* Only continue when present in every source dict. */
1064 if (j == setnum) {
e2641e09 1065 robj *o = dictGetEntryKey(de);
d433ebc6 1066 znode = zslInsert(dstzset->zsl,score,o);
e2641e09 1067 incrRefCount(o); /* added to skiplist */
69ef89f2
PN
1068 dictAdd(dstzset->dict,o,&znode->score);
1069 incrRefCount(o); /* added to dictionary */
e2641e09 1070 }
1071 }
1072 dictReleaseIterator(di);
1073 }
1074 } else if (op == REDIS_OP_UNION) {
1075 for (i = 0; i < setnum; i++) {
1076 if (!src[i].dict) continue;
1077
1078 di = dictGetIterator(src[i].dict);
1079 while((de = dictNext(di)) != NULL) {
d433ebc6
PN
1080 double score, value;
1081
e2641e09 1082 /* skip key when already processed */
d433ebc6
PN
1083 if (dictFind(dstzset->dict,dictGetEntryKey(de)) != NULL)
1084 continue;
e2641e09 1085
d433ebc6
PN
1086 /* initialize score */
1087 score = src[i].weight * zunionInterDictValue(de);
e2641e09 1088
1089 /* because the zsets are sorted by size, its only possible
1090 * for sets at larger indices to hold this entry */
1091 for (j = (i+1); j < setnum; j++) {
1092 dictEntry *other = dictFind(src[j].dict,dictGetEntryKey(de));
1093 if (other) {
1094 value = src[j].weight * zunionInterDictValue(other);
d433ebc6 1095 zunionInterAggregate(&score,value,aggregate);
e2641e09 1096 }
1097 }
1098
1099 robj *o = dictGetEntryKey(de);
d433ebc6 1100 znode = zslInsert(dstzset->zsl,score,o);
e2641e09 1101 incrRefCount(o); /* added to skiplist */
69ef89f2
PN
1102 dictAdd(dstzset->dict,o,&znode->score);
1103 incrRefCount(o); /* added to dictionary */
e2641e09 1104 }
1105 dictReleaseIterator(di);
1106 }
1107 } else {
1108 /* unknown operator */
1109 redisAssert(op == REDIS_OP_INTER || op == REDIS_OP_UNION);
1110 }
1111
8c1420ff 1112 if (dbDelete(c->db,dstkey)) {
cea8c5cd 1113 signalModifiedKey(c->db,dstkey);
8c1420ff 1114 touched = 1;
1115 server.dirty++;
1116 }
e2641e09 1117 if (dstzset->zsl->length) {
1118 dbAdd(c->db,dstkey,dstobj);
1119 addReplyLongLong(c, dstzset->zsl->length);
cea8c5cd 1120 if (!touched) signalModifiedKey(c->db,dstkey);
cbf7e107 1121 server.dirty++;
e2641e09 1122 } else {
1123 decrRefCount(dstobj);
1124 addReply(c, shared.czero);
1125 }
1126 zfree(src);
1127}
1128
1129void zunionstoreCommand(redisClient *c) {
1130 zunionInterGenericCommand(c,c->argv[1], REDIS_OP_UNION);
1131}
1132
1133void zinterstoreCommand(redisClient *c) {
1134 zunionInterGenericCommand(c,c->argv[1], REDIS_OP_INTER);
1135}
1136
1137void zrangeGenericCommand(redisClient *c, int reverse) {
1138 robj *o;
1139 long start;
1140 long end;
1141 int withscores = 0;
1142 int llen;
1143 int rangelen, j;
1144 zset *zsetobj;
1145 zskiplist *zsl;
1146 zskiplistNode *ln;
1147 robj *ele;
1148
1149 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
1150 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
1151
1152 if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
1153 withscores = 1;
1154 } else if (c->argc >= 5) {
1155 addReply(c,shared.syntaxerr);
1156 return;
1157 }
1158
1159 if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
1160 || checkType(c,o,REDIS_ZSET)) return;
1161 zsetobj = o->ptr;
1162 zsl = zsetobj->zsl;
1163 llen = zsl->length;
1164
1165 /* convert negative indexes */
1166 if (start < 0) start = llen+start;
1167 if (end < 0) end = llen+end;
1168 if (start < 0) start = 0;
e2641e09 1169
d0a4e24e
PN
1170 /* Invariant: start >= 0, so this test will be true when end < 0.
1171 * The range is empty when start > end or start >= length. */
e2641e09 1172 if (start > end || start >= llen) {
e2641e09 1173 addReply(c,shared.emptymultibulk);
1174 return;
1175 }
1176 if (end >= llen) end = llen-1;
1177 rangelen = (end-start)+1;
1178
1179 /* check if starting point is trivial, before searching
1180 * the element in log(N) time */
1181 if (reverse) {
a3004773 1182 ln = start == 0 ? zsl->tail : zslGetElementByRank(zsl, llen-start);
e2641e09 1183 } else {
1184 ln = start == 0 ?
a3004773 1185 zsl->header->level[0].forward : zslGetElementByRank(zsl, start+1);
e2641e09 1186 }
1187
1188 /* Return the result in form of a multi-bulk reply */
0537e7bf 1189 addReplyMultiBulkLen(c,withscores ? (rangelen*2) : rangelen);
e2641e09 1190 for (j = 0; j < rangelen; j++) {
1191 ele = ln->obj;
1192 addReplyBulk(c,ele);
1193 if (withscores)
1194 addReplyDouble(c,ln->score);
2159782b 1195 ln = reverse ? ln->backward : ln->level[0].forward;
e2641e09 1196 }
1197}
1198
1199void zrangeCommand(redisClient *c) {
1200 zrangeGenericCommand(c,0);
1201}
1202
1203void zrevrangeCommand(redisClient *c) {
1204 zrangeGenericCommand(c,1);
1205}
1206
25bb8a44
PN
1207/* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE and ZCOUNT.
1208 * If "justcount", only the number of elements in the range is returned. */
1209void genericZrangebyscoreCommand(redisClient *c, int reverse, int justcount) {
1210 zrangespec range;
1211 robj *o, *emptyreply;
1212 zset *zsetobj;
1213 zskiplist *zsl;
1214 zskiplistNode *ln;
e2641e09 1215 int offset = 0, limit = -1;
1216 int withscores = 0;
25bb8a44
PN
1217 unsigned long rangelen = 0;
1218 void *replylen = NULL;
22b9bf15 1219 int minidx, maxidx;
e2641e09 1220
25bb8a44 1221 /* Parse the range arguments. */
22b9bf15
PN
1222 if (reverse) {
1223 /* Range is given as [max,min] */
1224 maxidx = 2; minidx = 3;
1225 } else {
1226 /* Range is given as [min,max] */
1227 minidx = 2; maxidx = 3;
1228 }
1229
1230 if (zslParseRange(c->argv[minidx],c->argv[maxidx],&range) != REDIS_OK) {
7236fdb2
PN
1231 addReplyError(c,"min or max is not a double");
1232 return;
1233 }
25bb8a44
PN
1234
1235 /* Parse optional extra arguments. Note that ZCOUNT will exactly have
1236 * 4 arguments, so we'll never enter the following code path. */
1237 if (c->argc > 4) {
1238 int remaining = c->argc - 4;
1239 int pos = 4;
1240
1241 while (remaining) {
1242 if (remaining >= 1 && !strcasecmp(c->argv[pos]->ptr,"withscores")) {
1243 pos++; remaining--;
1244 withscores = 1;
1245 } else if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr,"limit")) {
1246 offset = atoi(c->argv[pos+1]->ptr);
1247 limit = atoi(c->argv[pos+2]->ptr);
1248 pos += 3; remaining -= 3;
1249 } else {
1250 addReply(c,shared.syntaxerr);
1251 return;
1252 }
1253 }
e2641e09 1254 }
25bb8a44
PN
1255
1256 /* Ok, lookup the key and get the range */
1257 emptyreply = justcount ? shared.czero : shared.emptymultibulk;
1258 if ((o = lookupKeyReadOrReply(c,c->argv[1],emptyreply)) == NULL ||
1259 checkType(c,o,REDIS_ZSET)) return;
1260 zsetobj = o->ptr;
1261 zsl = zsetobj->zsl;
1262
22b9bf15 1263 /* If reversed, get the last node in range as starting point. */
25bb8a44 1264 if (reverse) {
22b9bf15 1265 ln = zslLastInRange(zsl,range);
e2641e09 1266 } else {
22b9bf15 1267 ln = zslFirstInRange(zsl,range);
e2641e09 1268 }
1269
25bb8a44
PN
1270 /* No "first" element in the specified interval. */
1271 if (ln == NULL) {
1272 addReply(c,emptyreply);
e2641e09 1273 return;
1274 }
1275
22b9bf15
PN
1276 /* We don't know in advance how many matching elements there are in the
1277 * list, so we push this object that will represent the multi-bulk length
1278 * in the output buffer, and will "fix" it later */
25bb8a44
PN
1279 if (!justcount)
1280 replylen = addDeferredMultiBulkLength(c);
e2641e09 1281
25bb8a44
PN
1282 /* If there is an offset, just traverse the number of elements without
1283 * checking the score because that is done in the next loop. */
1284 while(ln && offset--) {
22b9bf15 1285 ln = reverse ? ln->backward : ln->level[0].forward;
25bb8a44 1286 }
e2641e09 1287
25bb8a44 1288 while (ln && limit--) {
22b9bf15 1289 /* Abort when the node is no longer in range. */
25bb8a44 1290 if (reverse) {
45290ad9 1291 if (!zslValueGteMin(ln->score,&range)) break;
25bb8a44 1292 } else {
45290ad9 1293 if (!zslValueLteMax(ln->score,&range)) break;
e2641e09 1294 }
25bb8a44
PN
1295
1296 /* Do our magic */
1297 rangelen++;
1298 if (!justcount) {
1299 addReplyBulk(c,ln->obj);
1300 if (withscores)
1301 addReplyDouble(c,ln->score);
1302 }
1303
22b9bf15
PN
1304 /* Move to next node */
1305 ln = reverse ? ln->backward : ln->level[0].forward;
25bb8a44
PN
1306 }
1307
1308 if (justcount) {
1309 addReplyLongLong(c,(long)rangelen);
1310 } else {
1311 setDeferredMultiBulkLength(c,replylen,
1312 withscores ? (rangelen*2) : rangelen);
e2641e09 1313 }
1314}
1315
1316void zrangebyscoreCommand(redisClient *c) {
25bb8a44
PN
1317 genericZrangebyscoreCommand(c,0,0);
1318}
1319
1320void zrevrangebyscoreCommand(redisClient *c) {
1321 genericZrangebyscoreCommand(c,1,0);
e2641e09 1322}
1323
1324void zcountCommand(redisClient *c) {
25bb8a44 1325 genericZrangebyscoreCommand(c,0,1);
e2641e09 1326}
1327
1328void zcardCommand(redisClient *c) {
1329 robj *o;
1330 zset *zs;
1331
1332 if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
1333 checkType(c,o,REDIS_ZSET)) return;
1334
1335 zs = o->ptr;
b70d3555 1336 addReplyLongLong(c,zs->zsl->length);
e2641e09 1337}
1338
1339void zscoreCommand(redisClient *c) {
1340 robj *o;
1341 zset *zs;
1342 dictEntry *de;
1343
1344 if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
1345 checkType(c,o,REDIS_ZSET)) return;
1346
1347 zs = o->ptr;
75b41de8 1348 c->argv[2] = tryObjectEncoding(c->argv[2]);
e2641e09 1349 de = dictFind(zs->dict,c->argv[2]);
1350 if (!de) {
1351 addReply(c,shared.nullbulk);
1352 } else {
1353 double *score = dictGetEntryVal(de);
1354
1355 addReplyDouble(c,*score);
1356 }
1357}
1358
1359void zrankGenericCommand(redisClient *c, int reverse) {
1360 robj *o;
1361 zset *zs;
1362 zskiplist *zsl;
1363 dictEntry *de;
1364 unsigned long rank;
1365 double *score;
1366
1367 if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
1368 checkType(c,o,REDIS_ZSET)) return;
1369
1370 zs = o->ptr;
1371 zsl = zs->zsl;
75b41de8 1372 c->argv[2] = tryObjectEncoding(c->argv[2]);
e2641e09 1373 de = dictFind(zs->dict,c->argv[2]);
1374 if (!de) {
1375 addReply(c,shared.nullbulk);
1376 return;
1377 }
1378
1379 score = dictGetEntryVal(de);
a3004773 1380 rank = zslGetRank(zsl, *score, c->argv[2]);
e2641e09 1381 if (rank) {
1382 if (reverse) {
1383 addReplyLongLong(c, zsl->length - rank);
1384 } else {
1385 addReplyLongLong(c, rank-1);
1386 }
1387 } else {
1388 addReply(c,shared.nullbulk);
1389 }
1390}
1391
1392void zrankCommand(redisClient *c) {
1393 zrankGenericCommand(c, 0);
1394}
1395
1396void zrevrankCommand(redisClient *c) {
1397 zrankGenericCommand(c, 1);
1398}