]> git.saurik.com Git - redis.git/blame - src/t_zset.c
Make replication faster (biggest gain for small number of slaves)
[redis.git] / src / t_zset.c
CommitLineData
e2641e09 1#include "redis.h"
2
3#include <math.h>
4
5/*-----------------------------------------------------------------------------
6 * Sorted set API
7 *----------------------------------------------------------------------------*/
8
/* ZSETs are ordered sets using two data structures to hold the same elements
 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
 * data structure.
 *
 * The elements are added to a hash table mapping Redis objects to scores.
 * At the same time the elements are added to a skip list mapping scores
 * to Redis objects (so objects are sorted by scores in this "view"). */
16
/* This skiplist implementation is almost a C translation of the original
 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
 * Alternative to Balanced Trees", modified in three ways:
 * a) this implementation allows for repeated values.
 * b) the comparison is not just by key (our 'score') but by satellite data.
 * c) there is a back pointer, so it's a doubly linked list with the back
 * pointers being only at "level 1". This allows traversing the list
 * from tail to head, which is useful for ZREVRANGE. */
25
26zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
2159782b 27 zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
e2641e09 28 zn->score = score;
29 zn->obj = obj;
30 return zn;
31}
32
33zskiplist *zslCreate(void) {
34 int j;
35 zskiplist *zsl;
36
37 zsl = zmalloc(sizeof(*zsl));
38 zsl->level = 1;
39 zsl->length = 0;
40 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
41 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
2159782b
PN
42 zsl->header->level[j].forward = NULL;
43 zsl->header->level[j].span = 0;
e2641e09 44 }
45 zsl->header->backward = NULL;
46 zsl->tail = NULL;
47 return zsl;
48}
49
50void zslFreeNode(zskiplistNode *node) {
51 decrRefCount(node->obj);
e2641e09 52 zfree(node);
53}
54
55void zslFree(zskiplist *zsl) {
2159782b 56 zskiplistNode *node = zsl->header->level[0].forward, *next;
e2641e09 57
e2641e09 58 zfree(zsl->header);
59 while(node) {
2159782b 60 next = node->level[0].forward;
e2641e09 61 zslFreeNode(node);
62 node = next;
63 }
64 zfree(zsl);
65}
66
67int zslRandomLevel(void) {
68 int level = 1;
69 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
70 level += 1;
71 return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
72}
73
69ef89f2 74zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
e2641e09 75 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
76 unsigned int rank[ZSKIPLIST_MAXLEVEL];
77 int i, level;
78
79 x = zsl->header;
80 for (i = zsl->level-1; i >= 0; i--) {
81 /* store rank that is crossed to reach the insert position */
82 rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
2159782b
PN
83 while (x->level[i].forward &&
84 (x->level[i].forward->score < score ||
85 (x->level[i].forward->score == score &&
86 compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
87 rank[i] += x->level[i].span;
88 x = x->level[i].forward;
e2641e09 89 }
90 update[i] = x;
91 }
92 /* we assume the key is not already inside, since we allow duplicated
93 * scores, and the re-insertion of score and redis object should never
94 * happpen since the caller of zslInsert() should test in the hash table
95 * if the element is already inside or not. */
96 level = zslRandomLevel();
97 if (level > zsl->level) {
98 for (i = zsl->level; i < level; i++) {
99 rank[i] = 0;
100 update[i] = zsl->header;
2159782b 101 update[i]->level[i].span = zsl->length;
e2641e09 102 }
103 zsl->level = level;
104 }
105 x = zslCreateNode(level,score,obj);
106 for (i = 0; i < level; i++) {
2159782b
PN
107 x->level[i].forward = update[i]->level[i].forward;
108 update[i]->level[i].forward = x;
e2641e09 109
110 /* update span covered by update[i] as x is inserted here */
2159782b
PN
111 x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
112 update[i]->level[i].span = (rank[0] - rank[i]) + 1;
e2641e09 113 }
114
115 /* increment span for untouched levels */
116 for (i = level; i < zsl->level; i++) {
2159782b 117 update[i]->level[i].span++;
e2641e09 118 }
119
120 x->backward = (update[0] == zsl->header) ? NULL : update[0];
2159782b
PN
121 if (x->level[0].forward)
122 x->level[0].forward->backward = x;
e2641e09 123 else
124 zsl->tail = x;
125 zsl->length++;
69ef89f2 126 return x;
e2641e09 127}
128
129/* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
130void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
131 int i;
132 for (i = 0; i < zsl->level; i++) {
2159782b
PN
133 if (update[i]->level[i].forward == x) {
134 update[i]->level[i].span += x->level[i].span - 1;
135 update[i]->level[i].forward = x->level[i].forward;
e2641e09 136 } else {
2159782b 137 update[i]->level[i].span -= 1;
e2641e09 138 }
139 }
2159782b
PN
140 if (x->level[0].forward) {
141 x->level[0].forward->backward = x->backward;
e2641e09 142 } else {
143 zsl->tail = x->backward;
144 }
2159782b 145 while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
e2641e09 146 zsl->level--;
147 zsl->length--;
148}
149
150/* Delete an element with matching score/object from the skiplist. */
151int zslDelete(zskiplist *zsl, double score, robj *obj) {
152 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
153 int i;
154
155 x = zsl->header;
156 for (i = zsl->level-1; i >= 0; i--) {
2159782b
PN
157 while (x->level[i].forward &&
158 (x->level[i].forward->score < score ||
159 (x->level[i].forward->score == score &&
160 compareStringObjects(x->level[i].forward->obj,obj) < 0)))
161 x = x->level[i].forward;
e2641e09 162 update[i] = x;
163 }
164 /* We may have multiple elements with the same score, what we need
165 * is to find the element with both the right score and object. */
2159782b 166 x = x->level[0].forward;
e2641e09 167 if (x && score == x->score && equalStringObjects(x->obj,obj)) {
168 zslDeleteNode(zsl, x, update);
169 zslFreeNode(x);
170 return 1;
171 } else {
172 return 0; /* not found */
173 }
174 return 0; /* not found */
175}
176
45290ad9 177static int zslValueGteMin(double value, zrangespec *spec) {
22b9bf15
PN
178 return spec->minex ? (value > spec->min) : (value >= spec->min);
179}
180
45290ad9 181static int zslValueLteMax(double value, zrangespec *spec) {
22b9bf15
PN
182 return spec->maxex ? (value < spec->max) : (value <= spec->max);
183}
184
22b9bf15
PN
185/* Returns if there is a part of the zset is in range. */
186int zslIsInRange(zskiplist *zsl, zrangespec *range) {
187 zskiplistNode *x;
188
8e1b3277
PN
189 /* Test for ranges that will always be empty. */
190 if (range->min > range->max ||
191 (range->min == range->max && (range->minex || range->maxex)))
192 return 0;
22b9bf15 193 x = zsl->tail;
45290ad9 194 if (x == NULL || !zslValueGteMin(x->score,range))
22b9bf15
PN
195 return 0;
196 x = zsl->header->level[0].forward;
45290ad9 197 if (x == NULL || !zslValueLteMax(x->score,range))
22b9bf15
PN
198 return 0;
199 return 1;
200}
201
202/* Find the first node that is contained in the specified range.
203 * Returns NULL when no element is contained in the range. */
204zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec range) {
205 zskiplistNode *x;
206 int i;
207
208 /* If everything is out of range, return early. */
209 if (!zslIsInRange(zsl,&range)) return NULL;
210
211 x = zsl->header;
212 for (i = zsl->level-1; i >= 0; i--) {
213 /* Go forward while *OUT* of range. */
214 while (x->level[i].forward &&
45290ad9 215 !zslValueGteMin(x->level[i].forward->score,&range))
22b9bf15
PN
216 x = x->level[i].forward;
217 }
218
e53ca04b 219 /* This is an inner range, so the next node cannot be NULL. */
22b9bf15 220 x = x->level[0].forward;
e53ca04b
PN
221 redisAssert(x != NULL);
222
223 /* Check if score <= max. */
224 if (!zslValueLteMax(x->score,&range)) return NULL;
22b9bf15
PN
225 return x;
226}
227
228/* Find the last node that is contained in the specified range.
229 * Returns NULL when no element is contained in the range. */
230zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec range) {
231 zskiplistNode *x;
232 int i;
233
234 /* If everything is out of range, return early. */
235 if (!zslIsInRange(zsl,&range)) return NULL;
236
237 x = zsl->header;
238 for (i = zsl->level-1; i >= 0; i--) {
239 /* Go forward while *IN* range. */
240 while (x->level[i].forward &&
45290ad9 241 zslValueLteMax(x->level[i].forward->score,&range))
22b9bf15
PN
242 x = x->level[i].forward;
243 }
244
e53ca04b
PN
245 /* This is an inner range, so this node cannot be NULL. */
246 redisAssert(x != NULL);
247
248 /* Check if score >= min. */
249 if (!zslValueGteMin(x->score,&range)) return NULL;
22b9bf15
PN
250 return x;
251}
252
e2641e09 253/* Delete all the elements with score between min and max from the skiplist.
254 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
255 * Note that this function takes the reference to the hash table view of the
256 * sorted set, in order to remove the elements from the hash table too. */
91504b6c 257unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec range, dict *dict) {
e2641e09 258 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
259 unsigned long removed = 0;
260 int i;
261
262 x = zsl->header;
263 for (i = zsl->level-1; i >= 0; i--) {
91504b6c
PN
264 while (x->level[i].forward && (range.minex ?
265 x->level[i].forward->score <= range.min :
266 x->level[i].forward->score < range.min))
267 x = x->level[i].forward;
e2641e09 268 update[i] = x;
269 }
91504b6c
PN
270
271 /* Current node is the last with score < or <= min. */
2159782b 272 x = x->level[0].forward;
91504b6c
PN
273
274 /* Delete nodes while in range. */
275 while (x && (range.maxex ? x->score < range.max : x->score <= range.max)) {
2159782b 276 zskiplistNode *next = x->level[0].forward;
69ef89f2 277 zslDeleteNode(zsl,x,update);
e2641e09 278 dictDelete(dict,x->obj);
279 zslFreeNode(x);
280 removed++;
281 x = next;
282 }
91504b6c 283 return removed;
e2641e09 284}
285
286/* Delete all the elements with rank between start and end from the skiplist.
287 * Start and end are inclusive. Note that start and end need to be 1-based */
288unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict) {
289 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
290 unsigned long traversed = 0, removed = 0;
291 int i;
292
293 x = zsl->header;
294 for (i = zsl->level-1; i >= 0; i--) {
2159782b
PN
295 while (x->level[i].forward && (traversed + x->level[i].span) < start) {
296 traversed += x->level[i].span;
297 x = x->level[i].forward;
e2641e09 298 }
299 update[i] = x;
300 }
301
302 traversed++;
2159782b 303 x = x->level[0].forward;
e2641e09 304 while (x && traversed <= end) {
2159782b 305 zskiplistNode *next = x->level[0].forward;
69ef89f2 306 zslDeleteNode(zsl,x,update);
e2641e09 307 dictDelete(dict,x->obj);
308 zslFreeNode(x);
309 removed++;
310 traversed++;
311 x = next;
312 }
313 return removed;
314}
315
e2641e09 316/* Find the rank for an element by both score and key.
317 * Returns 0 when the element cannot be found, rank otherwise.
318 * Note that the rank is 1-based due to the span of zsl->header to the
319 * first element. */
a3004773 320unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) {
e2641e09 321 zskiplistNode *x;
322 unsigned long rank = 0;
323 int i;
324
325 x = zsl->header;
326 for (i = zsl->level-1; i >= 0; i--) {
2159782b
PN
327 while (x->level[i].forward &&
328 (x->level[i].forward->score < score ||
329 (x->level[i].forward->score == score &&
330 compareStringObjects(x->level[i].forward->obj,o) <= 0))) {
331 rank += x->level[i].span;
332 x = x->level[i].forward;
e2641e09 333 }
334
335 /* x might be equal to zsl->header, so test if obj is non-NULL */
336 if (x->obj && equalStringObjects(x->obj,o)) {
337 return rank;
338 }
339 }
340 return 0;
341}
342
343/* Finds an element by its rank. The rank argument needs to be 1-based. */
a3004773 344zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
e2641e09 345 zskiplistNode *x;
346 unsigned long traversed = 0;
347 int i;
348
349 x = zsl->header;
350 for (i = zsl->level-1; i >= 0; i--) {
2159782b 351 while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
e2641e09 352 {
2159782b
PN
353 traversed += x->level[i].span;
354 x = x->level[i].forward;
e2641e09 355 }
356 if (traversed == rank) {
357 return x;
358 }
359 }
360 return NULL;
361}
362
25bb8a44 363/* Populate the rangespec according to the objects min and max. */
7236fdb2
PN
364static int zslParseRange(robj *min, robj *max, zrangespec *spec) {
365 char *eptr;
25bb8a44
PN
366 spec->minex = spec->maxex = 0;
367
368 /* Parse the min-max interval. If one of the values is prefixed
369 * by the "(" character, it's considered "open". For instance
370 * ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max
371 * ZRANGEBYSCORE zset 1.5 2.5 will instead match min <= x <= max */
372 if (min->encoding == REDIS_ENCODING_INT) {
373 spec->min = (long)min->ptr;
374 } else {
375 if (((char*)min->ptr)[0] == '(') {
7236fdb2
PN
376 spec->min = strtod((char*)min->ptr+1,&eptr);
377 if (eptr[0] != '\0' || isnan(spec->min)) return REDIS_ERR;
25bb8a44
PN
378 spec->minex = 1;
379 } else {
7236fdb2
PN
380 spec->min = strtod((char*)min->ptr,&eptr);
381 if (eptr[0] != '\0' || isnan(spec->min)) return REDIS_ERR;
25bb8a44
PN
382 }
383 }
384 if (max->encoding == REDIS_ENCODING_INT) {
385 spec->max = (long)max->ptr;
386 } else {
387 if (((char*)max->ptr)[0] == '(') {
7236fdb2
PN
388 spec->max = strtod((char*)max->ptr+1,&eptr);
389 if (eptr[0] != '\0' || isnan(spec->max)) return REDIS_ERR;
25bb8a44
PN
390 spec->maxex = 1;
391 } else {
7236fdb2
PN
392 spec->max = strtod((char*)max->ptr,&eptr);
393 if (eptr[0] != '\0' || isnan(spec->max)) return REDIS_ERR;
25bb8a44
PN
394 }
395 }
396
397 return REDIS_OK;
398}
399
21c5b508
PN
400/*-----------------------------------------------------------------------------
401 * Ziplist-backed sorted set API
402 *----------------------------------------------------------------------------*/
403
/* Return, as a double, the score stored in the ziplist entry pointed to by
 * 'sptr'. An entry holding a string is parsed with strtod(), an entry
 * holding an integer is converted directly.
 *
 * 'sptr' must not be NULL and must point to a readable ziplist entry;
 * both conditions are asserted. */
double zzlGetScore(unsigned char *sptr) {
    unsigned char *vstr;
    unsigned int vlen;
    long long vlong;
    char buf[128];
    double score;

    redisAssert(sptr != NULL);
    redisAssert(ziplistGet(sptr,&vstr,&vlen,&vlong));

    if (vstr) {
        /* The string is copied so it can be nul terminated for strtod().
         * Make sure the copy and its terminator fit in the local buffer
         * instead of writing past its end. */
        redisAssert(vlen < sizeof(buf));
        memcpy(buf,vstr,vlen);
        buf[vlen] = '\0';
        score = strtod(buf,NULL);
    } else {
        score = vlong;
    }

    return score;
}
424
/* Compare the ziplist entry 'eptr' with the string 'cstr' of length 'clen'.
 * An integer entry is first turned into its string representation. The
 * common prefix is compared with memcmp(); when it is equal the result is
 * the difference between the two lengths. */
int zzlCompareElements(unsigned char *eptr, unsigned char *cstr, unsigned int clen) {
    unsigned char numbuf[32];
    unsigned char *estr;
    unsigned int elen;
    long long eint;
    int shared, diff;

    redisAssert(ziplistGet(eptr,&estr,&elen,&eint));
    if (estr == NULL) {
        /* Integer entry: compare against its decimal string form. */
        elen = ll2string((char*)numbuf,sizeof(numbuf),eint);
        estr = numbuf;
    }

    shared = (clen > elen) ? elen : clen;
    diff = memcmp(estr,cstr,shared);
    if (diff != 0) return diff;
    return elen-clen;
}
445
/* Return the number of (element,score) pairs in the ziplist, that is half
 * the number of ziplist entries. */
unsigned int zzlLength(unsigned char *zl) {
    const unsigned int pairs = ziplistLen(zl)/2;

    return pairs;
}
449
4c5f0966
PN
/* Advance '*eptr' and '*sptr' to the element and score of the next pair.
 * Both are set to NULL when there is no next pair. Both must be non-NULL on
 * entry. */
void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
    unsigned char *nextele, *nextscore = NULL;

    redisAssert(*eptr != NULL && *sptr != NULL);

    /* The next element follows the current score. */
    nextele = ziplistNext(zl,*sptr);
    if (nextele != NULL) {
        /* An element is always followed by its score. */
        nextscore = ziplistNext(zl,nextele);
        redisAssert(nextscore != NULL);
    }

    *eptr = nextele;
    *sptr = nextscore;
}
468
/* Move '*eptr' and '*sptr' back to the element and score of the previous
 * pair. Both are set to NULL when there is no previous pair. Both must be
 * non-NULL on entry. */
void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
    unsigned char *prevele = NULL, *prevscore;

    redisAssert(*eptr != NULL && *sptr != NULL);

    /* The previous score precedes the current element. */
    prevscore = ziplistPrev(zl,*eptr);
    if (prevscore != NULL) {
        /* A score is always preceded by its element. */
        prevele = ziplistPrev(zl,prevscore);
        redisAssert(prevele != NULL);
    }

    *eptr = prevele;
    *sptr = prevscore;
}
487
4a14dbba
PN
488/* Returns if there is a part of the zset is in range. Should only be used
489 * internally by zzlFirstInRange and zzlLastInRange. */
490int zzlIsInRange(unsigned char *zl, zrangespec *range) {
491 unsigned char *p;
492 double score;
493
494 /* Test for ranges that will always be empty. */
495 if (range->min > range->max ||
496 (range->min == range->max && (range->minex || range->maxex)))
497 return 0;
498
499 p = ziplistIndex(zl,-1); /* Last score. */
500 redisAssert(p != NULL);
501 score = zzlGetScore(p);
502 if (!zslValueGteMin(score,range))
503 return 0;
504
505 p = ziplistIndex(zl,1); /* First score. */
506 redisAssert(p != NULL);
507 score = zzlGetScore(p);
508 if (!zslValueLteMax(score,range))
509 return 0;
510
511 return 1;
512}
513
514/* Find pointer to the first element contained in the specified range.
515 * Returns NULL when no element is contained in the range. */
8588bfa3 516unsigned char *zzlFirstInRange(unsigned char *zl, zrangespec range) {
4a14dbba
PN
517 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
518 double score;
519
520 /* If everything is out of range, return early. */
521 if (!zzlIsInRange(zl,&range)) return NULL;
522
523 while (eptr != NULL) {
524 sptr = ziplistNext(zl,eptr);
525 redisAssert(sptr != NULL);
526
527 score = zzlGetScore(sptr);
e53ca04b
PN
528 if (zslValueGteMin(score,&range)) {
529 /* Check if score <= max. */
530 if (zslValueLteMax(score,&range))
531 return eptr;
532 return NULL;
533 }
4a14dbba
PN
534
535 /* Move to next element. */
536 eptr = ziplistNext(zl,sptr);
537 }
538
539 return NULL;
540}
541
542/* Find pointer to the last element contained in the specified range.
543 * Returns NULL when no element is contained in the range. */
8588bfa3 544unsigned char *zzlLastInRange(unsigned char *zl, zrangespec range) {
4a14dbba
PN
545 unsigned char *eptr = ziplistIndex(zl,-2), *sptr;
546 double score;
547
548 /* If everything is out of range, return early. */
549 if (!zzlIsInRange(zl,&range)) return NULL;
550
551 while (eptr != NULL) {
552 sptr = ziplistNext(zl,eptr);
553 redisAssert(sptr != NULL);
554
555 score = zzlGetScore(sptr);
e53ca04b
PN
556 if (zslValueLteMax(score,&range)) {
557 /* Check if score >= min. */
558 if (zslValueGteMin(score,&range))
559 return eptr;
560 return NULL;
561 }
4a14dbba
PN
562
563 /* Move to previous element by moving to the score of previous element.
564 * When this returns NULL, we know there also is no element. */
565 sptr = ziplistPrev(zl,eptr);
566 if (sptr != NULL)
567 redisAssert((eptr = ziplistPrev(zl,sptr)) != NULL);
568 else
569 eptr = NULL;
570 }
571
572 return NULL;
573}
574
8588bfa3 575unsigned char *zzlFind(unsigned char *zl, robj *ele, double *score) {
21c5b508
PN
576 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
577
578 ele = getDecodedObject(ele);
579 while (eptr != NULL) {
580 sptr = ziplistNext(zl,eptr);
581 redisAssert(sptr != NULL);
582
583 if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr))) {
584 /* Matching element, pull out score. */
0b10e104 585 if (score != NULL) *score = zzlGetScore(sptr);
21c5b508
PN
586 decrRefCount(ele);
587 return eptr;
588 }
589
590 /* Move to next element. */
591 eptr = ziplistNext(zl,sptr);
592 }
593
594 decrRefCount(ele);
595 return NULL;
596}
597
/* Delete the (element,score) pair starting at 'eptr' from the ziplist and
 * return the resulting ziplist pointer. A local cursor is used because the
 * pointer given as argument must not be modified. */
unsigned char *zzlDelete(unsigned char *zl, unsigned char *eptr) {
    unsigned char *cursor = eptr;
    int k;

    /* TODO: add function to ziplist API to delete N elements from offset. */
    for (k = 0; k < 2; k++)
        zl = ziplistDelete(zl,&cursor);
    return zl;
}
608
8588bfa3 609unsigned char *zzlInsertAt(unsigned char *zl, unsigned char *eptr, robj *ele, double score) {
21c5b508
PN
610 unsigned char *sptr;
611 char scorebuf[128];
612 int scorelen;
69298a05 613 size_t offset;
21c5b508
PN
614
615 redisAssert(ele->encoding == REDIS_ENCODING_RAW);
616 scorelen = d2string(scorebuf,sizeof(scorebuf),score);
617 if (eptr == NULL) {
618 zl = ziplistPush(zl,ele->ptr,sdslen(ele->ptr),ZIPLIST_TAIL);
619 zl = ziplistPush(zl,(unsigned char*)scorebuf,scorelen,ZIPLIST_TAIL);
620 } else {
621 /* Keep offset relative to zl, as it might be re-allocated. */
622 offset = eptr-zl;
623 zl = ziplistInsert(zl,eptr,ele->ptr,sdslen(ele->ptr));
624 eptr = zl+offset;
625
626 /* Insert score after the element. */
627 redisAssert((sptr = ziplistNext(zl,eptr)) != NULL);
628 zl = ziplistInsert(zl,sptr,(unsigned char*)scorebuf,scorelen);
629 }
630
8588bfa3 631 return zl;
21c5b508
PN
632}
633
634/* Insert (element,score) pair in ziplist. This function assumes the element is
635 * not yet present in the list. */
8588bfa3 636unsigned char *zzlInsert(unsigned char *zl, robj *ele, double score) {
21c5b508
PN
637 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
638 double s;
21c5b508
PN
639
640 ele = getDecodedObject(ele);
641 while (eptr != NULL) {
642 sptr = ziplistNext(zl,eptr);
643 redisAssert(sptr != NULL);
644 s = zzlGetScore(sptr);
645
646 if (s > score) {
647 /* First element with score larger than score for element to be
648 * inserted. This means we should take its spot in the list to
649 * maintain ordering. */
8588bfa3 650 zl = zzlInsertAt(zl,eptr,ele,score);
21c5b508 651 break;
8218db3d
PN
652 } else if (s == score) {
653 /* Ensure lexicographical ordering for elements. */
d1c920c5 654 if (zzlCompareElements(eptr,ele->ptr,sdslen(ele->ptr)) > 0) {
8588bfa3 655 zl = zzlInsertAt(zl,eptr,ele,score);
8218db3d
PN
656 break;
657 }
21c5b508
PN
658 }
659
660 /* Move to next element. */
661 eptr = ziplistNext(zl,sptr);
662 }
663
664 /* Push on tail of list when it was not yet inserted. */
8218db3d 665 if (eptr == NULL)
8588bfa3 666 zl = zzlInsertAt(zl,NULL,ele,score);
21c5b508
PN
667
668 decrRefCount(ele);
8588bfa3 669 return zl;
21c5b508 670}
25bb8a44 671
8588bfa3 672unsigned char *zzlDeleteRangeByScore(unsigned char *zl, zrangespec range, unsigned long *deleted) {
4a14dbba
PN
673 unsigned char *eptr, *sptr;
674 double score;
8588bfa3 675 unsigned long num = 0;
4a14dbba 676
8588bfa3 677 if (deleted != NULL) *deleted = 0;
4a14dbba 678
8588bfa3
PN
679 eptr = zzlFirstInRange(zl,range);
680 if (eptr == NULL) return zl;
4a14dbba
PN
681
682 /* When the tail of the ziplist is deleted, eptr will point to the sentinel
683 * byte and ziplistNext will return NULL. */
684 while ((sptr = ziplistNext(zl,eptr)) != NULL) {
685 score = zzlGetScore(sptr);
686 if (zslValueLteMax(score,&range)) {
687 /* Delete both the element and the score. */
688 zl = ziplistDelete(zl,&eptr);
689 zl = ziplistDelete(zl,&eptr);
8588bfa3 690 num++;
4a14dbba
PN
691 } else {
692 /* No longer in range. */
693 break;
694 }
695 }
696
8588bfa3
PN
697 if (deleted != NULL) *deleted = num;
698 return zl;
4a14dbba
PN
699}
700
63b7b7fb
PN
/* Delete from the ziplist all the pairs with rank between 'start' and
 * 'end', both inclusive and 1-based. Returns the resulting ziplist pointer.
 * When 'deleted' is not NULL it receives (end-start)+1. */
unsigned char *zzlDeleteRangeByRank(unsigned char *zl, unsigned int start, unsigned int end, unsigned long *deleted) {
    unsigned int pairs = (end-start)+1;

    if (deleted) *deleted = pairs;

    /* Every pair takes two ziplist entries, hence the doubled index and
     * count. */
    return ziplistDeleteRange(zl,2*(start-1),2*pairs);
}
709
5d1b4fb6
PN
710/*-----------------------------------------------------------------------------
711 * Common sorted set API
712 *----------------------------------------------------------------------------*/
713
df26a0ae 714unsigned int zsetLength(robj *zobj) {
5d1b4fb6
PN
715 int length = -1;
716 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
bbfe232f 717 length = zzlLength(zobj->ptr);
100ed062 718 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
5d1b4fb6
PN
719 length = ((zset*)zobj->ptr)->zsl->length;
720 } else {
721 redisPanic("Unknown sorted set encoding");
722 }
723 return length;
724}
725
df26a0ae 726void zsetConvert(robj *zobj, int encoding) {
a669d5e9
PN
727 zset *zs;
728 zskiplistNode *node, *next;
729 robj *ele;
730 double score;
731
732 if (zobj->encoding == encoding) return;
733 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
734 unsigned char *zl = zobj->ptr;
735 unsigned char *eptr, *sptr;
736 unsigned char *vstr;
737 unsigned int vlen;
738 long long vlong;
739
100ed062 740 if (encoding != REDIS_ENCODING_SKIPLIST)
a669d5e9
PN
741 redisPanic("Unknown target encoding");
742
743 zs = zmalloc(sizeof(*zs));
744 zs->dict = dictCreate(&zsetDictType,NULL);
745 zs->zsl = zslCreate();
746
747 eptr = ziplistIndex(zl,0);
748 redisAssert(eptr != NULL);
749 sptr = ziplistNext(zl,eptr);
750 redisAssert(sptr != NULL);
751
752 while (eptr != NULL) {
753 score = zzlGetScore(sptr);
754 redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
755 if (vstr == NULL)
756 ele = createStringObjectFromLongLong(vlong);
757 else
758 ele = createStringObject((char*)vstr,vlen);
759
760 /* Has incremented refcount since it was just created. */
761 node = zslInsert(zs->zsl,score,ele);
762 redisAssert(dictAdd(zs->dict,ele,&node->score) == DICT_OK);
763 incrRefCount(ele); /* Added to dictionary. */
764 zzlNext(zl,&eptr,&sptr);
765 }
766
767 zfree(zobj->ptr);
768 zobj->ptr = zs;
100ed062
PN
769 zobj->encoding = REDIS_ENCODING_SKIPLIST;
770 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
a669d5e9
PN
771 unsigned char *zl = ziplistNew();
772
773 if (encoding != REDIS_ENCODING_ZIPLIST)
774 redisPanic("Unknown target encoding");
775
776 /* Approach similar to zslFree(), since we want to free the skiplist at
777 * the same time as creating the ziplist. */
778 zs = zobj->ptr;
779 dictRelease(zs->dict);
780 node = zs->zsl->header->level[0].forward;
781 zfree(zs->zsl->header);
782 zfree(zs->zsl);
783
a669d5e9
PN
784 while (node) {
785 ele = getDecodedObject(node->obj);
8588bfa3 786 zl = zzlInsertAt(zl,NULL,ele,node->score);
a669d5e9
PN
787 decrRefCount(ele);
788
789 next = node->level[0].forward;
790 zslFreeNode(node);
791 node = next;
792 }
793
794 zfree(zs);
8588bfa3 795 zobj->ptr = zl;
a669d5e9
PN
796 zobj->encoding = REDIS_ENCODING_ZIPLIST;
797 } else {
798 redisPanic("Unknown sorted set encoding");
799 }
800}
801
e2641e09 802/*-----------------------------------------------------------------------------
803 * Sorted set commands
804 *----------------------------------------------------------------------------*/
805
69ef89f2 806/* This generic command implements both ZADD and ZINCRBY. */
3ca7532a 807void zaddGenericCommand(redisClient *c, int incr) {
21c5b508 808 static char *nanerr = "resulting score is not a number (NaN)";
3ca7532a
PN
809 robj *key = c->argv[1];
810 robj *ele;
21c5b508
PN
811 robj *zobj;
812 robj *curobj;
3ca7532a
PN
813 double score, curscore = 0.0;
814
815 if (getDoubleFromObjectOrReply(c,c->argv[2],&score,NULL) != REDIS_OK)
816 return;
21c5b508
PN
817
818 zobj = lookupKeyWrite(c->db,key);
819 if (zobj == NULL) {
a669d5e9
PN
820 if (server.zset_max_ziplist_entries == 0 ||
821 server.zset_max_ziplist_value < sdslen(c->argv[3]->ptr))
822 {
823 zobj = createZsetObject();
824 } else {
825 zobj = createZsetZiplistObject();
826 }
21c5b508 827 dbAdd(c->db,key,zobj);
e2641e09 828 } else {
21c5b508 829 if (zobj->type != REDIS_ZSET) {
e2641e09 830 addReply(c,shared.wrongtypeerr);
831 return;
832 }
833 }
e2641e09 834
21c5b508
PN
835 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
836 unsigned char *eptr;
837
3ca7532a
PN
838 /* Prefer non-encoded element when dealing with ziplists. */
839 ele = c->argv[3];
8588bfa3 840 if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
21c5b508
PN
841 if (incr) {
842 score += curscore;
843 if (isnan(score)) {
844 addReplyError(c,nanerr);
845 /* Don't need to check if the sorted set is empty, because
846 * we know it has at least one element. */
847 return;
848 }
849 }
850
851 /* Remove and re-insert when score changed. */
852 if (score != curscore) {
8588bfa3
PN
853 zobj->ptr = zzlDelete(zobj->ptr,eptr);
854 zobj->ptr = zzlInsert(zobj->ptr,ele,score);
21c5b508
PN
855
856 signalModifiedKey(c->db,key);
857 server.dirty++;
858 }
859
860 if (incr) /* ZINCRBY */
861 addReplyDouble(c,score);
862 else /* ZADD */
863 addReply(c,shared.czero);
864 } else {
a669d5e9
PN
865 /* Optimize: check if the element is too large or the list becomes
866 * too long *before* executing zzlInsert. */
8588bfa3 867 zobj->ptr = zzlInsert(zobj->ptr,ele,score);
bbfe232f 868 if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries)
100ed062 869 zsetConvert(zobj,REDIS_ENCODING_SKIPLIST);
a669d5e9 870 if (sdslen(ele->ptr) > server.zset_max_ziplist_value)
100ed062 871 zsetConvert(zobj,REDIS_ENCODING_SKIPLIST);
69ef89f2 872
21c5b508
PN
873 signalModifiedKey(c->db,key);
874 server.dirty++;
69ef89f2 875
21c5b508
PN
876 if (incr) /* ZINCRBY */
877 addReplyDouble(c,score);
878 else /* ZADD */
879 addReply(c,shared.cone);
880 }
100ed062 881 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
21c5b508
PN
882 zset *zs = zobj->ptr;
883 zskiplistNode *znode;
e2641e09 884 dictEntry *de;
e2641e09 885
3ca7532a 886 ele = c->argv[3] = tryObjectEncoding(c->argv[3]);
e2641e09 887 de = dictFind(zs->dict,ele);
21c5b508
PN
888 if (de != NULL) {
889 curobj = dictGetEntryKey(de);
890 curscore = *(double*)dictGetEntryVal(de);
891
892 if (incr) {
893 score += curscore;
894 if (isnan(score)) {
895 addReplyError(c,nanerr);
896 /* Don't need to check if the sorted set is empty, because
897 * we know it has at least one element. */
898 return;
899 }
900 }
901
902 /* Remove and re-insert when score changed. We can safely delete
903 * the key object from the skiplist, since the dictionary still has
904 * a reference to it. */
905 if (score != curscore) {
906 redisAssert(zslDelete(zs->zsl,curscore,curobj));
907 znode = zslInsert(zs->zsl,score,curobj);
908 incrRefCount(curobj); /* Re-inserted in skiplist. */
909 dictGetEntryVal(de) = &znode->score; /* Update score ptr. */
910
911 signalModifiedKey(c->db,key);
912 server.dirty++;
913 }
914
915 if (incr) /* ZINCRBY */
916 addReplyDouble(c,score);
917 else /* ZADD */
918 addReply(c,shared.czero);
919 } else {
920 znode = zslInsert(zs->zsl,score,ele);
921 incrRefCount(ele); /* Inserted in skiplist. */
922 redisAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
923 incrRefCount(ele); /* Added to dictionary. */
924
925 signalModifiedKey(c->db,key);
e2641e09 926 server.dirty++;
21c5b508
PN
927
928 if (incr) /* ZINCRBY */
929 addReplyDouble(c,score);
930 else /* ZADD */
931 addReply(c,shared.cone);
e2641e09 932 }
21c5b508
PN
933 } else {
934 redisPanic("Unknown sorted set encoding");
e2641e09 935 }
936}
937
938void zaddCommand(redisClient *c) {
3ca7532a 939 zaddGenericCommand(c,0);
e2641e09 940}
941
942void zincrbyCommand(redisClient *c) {
3ca7532a 943 zaddGenericCommand(c,1);
e2641e09 944}
945
946void zremCommand(redisClient *c) {
0b10e104
PN
947 robj *key = c->argv[1];
948 robj *ele = c->argv[2];
949 robj *zobj;
950
951 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
952 checkType(c,zobj,REDIS_ZSET)) return;
953
954 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
955 unsigned char *eptr;
956
8588bfa3
PN
957 if ((eptr = zzlFind(zobj->ptr,ele,NULL)) != NULL) {
958 zobj->ptr = zzlDelete(zobj->ptr,eptr);
bbfe232f 959 if (zzlLength(zobj->ptr) == 0) dbDelete(c->db,key);
0b10e104
PN
960 } else {
961 addReply(c,shared.czero);
962 return;
963 }
100ed062 964 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
0b10e104
PN
965 zset *zs = zobj->ptr;
966 dictEntry *de;
967 double score;
968
969 de = dictFind(zs->dict,ele);
970 if (de != NULL) {
971 /* Delete from the skiplist */
972 score = *(double*)dictGetEntryVal(de);
973 redisAssert(zslDelete(zs->zsl,score,ele));
974
975 /* Delete from the hash table */
976 dictDelete(zs->dict,ele);
977 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
978 if (dictSize(zs->dict) == 0) dbDelete(c->db,key);
979 } else {
980 addReply(c,shared.czero);
981 return;
982 }
983 } else {
984 redisPanic("Unknown sorted set encoding");
e2641e09 985 }
e2641e09 986
0b10e104 987 signalModifiedKey(c->db,key);
e2641e09 988 server.dirty++;
989 addReply(c,shared.cone);
990}
991
992void zremrangebyscoreCommand(redisClient *c) {
4a14dbba
PN
993 robj *key = c->argv[1];
994 robj *zobj;
91504b6c 995 zrangespec range;
4a14dbba 996 unsigned long deleted;
e2641e09 997
91504b6c 998 /* Parse the range arguments. */
7236fdb2
PN
999 if (zslParseRange(c->argv[2],c->argv[3],&range) != REDIS_OK) {
1000 addReplyError(c,"min or max is not a double");
1001 return;
1002 }
e2641e09 1003
4a14dbba
PN
1004 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
1005 checkType(c,zobj,REDIS_ZSET)) return;
1006
1007 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
8588bfa3 1008 zobj->ptr = zzlDeleteRangeByScore(zobj->ptr,range,&deleted);
48991620 1009 if (zzlLength(zobj->ptr) == 0) dbDelete(c->db,key);
100ed062 1010 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
4a14dbba
PN
1011 zset *zs = zobj->ptr;
1012 deleted = zslDeleteRangeByScore(zs->zsl,range,zs->dict);
1013 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
1014 if (dictSize(zs->dict) == 0) dbDelete(c->db,key);
1015 } else {
1016 redisPanic("Unknown sorted set encoding");
1017 }
e2641e09 1018
4a14dbba 1019 if (deleted) signalModifiedKey(c->db,key);
e2641e09 1020 server.dirty += deleted;
1021 addReplyLongLong(c,deleted);
1022}
1023
1024void zremrangebyrankCommand(redisClient *c) {
63b7b7fb
PN
1025 robj *key = c->argv[1];
1026 robj *zobj;
e2641e09 1027 long start;
1028 long end;
1029 int llen;
63b7b7fb 1030 unsigned long deleted;
e2641e09 1031
1032 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
1033 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
1034
63b7b7fb
PN
1035 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
1036 checkType(c,zobj,REDIS_ZSET)) return;
e2641e09 1037
63b7b7fb 1038 /* Sanitize indexes. */
df26a0ae 1039 llen = zsetLength(zobj);
e2641e09 1040 if (start < 0) start = llen+start;
1041 if (end < 0) end = llen+end;
1042 if (start < 0) start = 0;
e2641e09 1043
d0a4e24e
PN
1044 /* Invariant: start >= 0, so this test will be true when end < 0.
1045 * The range is empty when start > end or start >= length. */
e2641e09 1046 if (start > end || start >= llen) {
1047 addReply(c,shared.czero);
1048 return;
1049 }
1050 if (end >= llen) end = llen-1;
1051
63b7b7fb
PN
1052 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1053 /* Correct for 1-based rank. */
8588bfa3 1054 zobj->ptr = zzlDeleteRangeByRank(zobj->ptr,start+1,end+1,&deleted);
48991620 1055 if (zzlLength(zobj->ptr) == 0) dbDelete(c->db,key);
100ed062 1056 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
63b7b7fb
PN
1057 zset *zs = zobj->ptr;
1058
1059 /* Correct for 1-based rank. */
1060 deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);
1061 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
1062 if (dictSize(zs->dict) == 0) dbDelete(c->db,key);
1063 } else {
1064 redisPanic("Unknown sorted set encoding");
1065 }
1066
1067 if (deleted) signalModifiedKey(c->db,key);
e2641e09 1068 server.dirty += deleted;
63b7b7fb 1069 addReplyLongLong(c,deleted);
e2641e09 1070}
1071
1072typedef struct {
56ce42fa
PN
1073 robj *subject;
1074 int type; /* Set, sorted set */
1075 int encoding;
e2641e09 1076 double weight;
56ce42fa
PN
1077
1078 union {
1079 /* Set iterators. */
1080 union _iterset {
1081 struct {
1082 intset *is;
1083 int ii;
1084 } is;
1085 struct {
1086 dict *dict;
1087 dictIterator *di;
1088 dictEntry *de;
1089 } ht;
1090 } set;
1091
1092 /* Sorted set iterators. */
1093 union _iterzset {
1094 struct {
1095 unsigned char *zl;
1096 unsigned char *eptr, *sptr;
1097 } zl;
1098 struct {
1099 zset *zs;
1100 zskiplistNode *node;
1101 } sl;
1102 } zset;
1103 } iter;
e2641e09 1104} zsetopsrc;
1105
56ce42fa
PN
1106
/* Flag bits kept in the "flags" field of a zsetopval.
 *
 * A dirty flag on a pointer means the pointed-to value has to be cleaned up
 * in the next iteration over the zsetopval. The dirty flag for the long long
 * value is different, because long long values need no cleanup: it records
 * that we already checked whether "ell" holds a long long, or already tried
 * to convert another representation into one. OPVAL_VALID_LL is set in
 * addition when that check or conversion succeeded. */
#define OPVAL_DIRTY_ROBJ (1<<0)
#define OPVAL_DIRTY_LL   (1<<1)
#define OPVAL_VALID_LL   (1<<2)
1116
1117/* Store value retrieved from the iterator. */
1118typedef struct {
1119 int flags;
1120 unsigned char _buf[32]; /* Private buffer. */
1121 robj *ele;
1122 unsigned char *estr;
1123 unsigned int elen;
1124 long long ell;
1125 double score;
1126} zsetopval;
1127
1128typedef union _iterset iterset;
1129typedef union _iterzset iterzset;
1130
1131void zuiInitIterator(zsetopsrc *op) {
1132 if (op->subject == NULL)
1133 return;
1134
1135 if (op->type == REDIS_SET) {
1136 iterset *it = &op->iter.set;
1137 if (op->encoding == REDIS_ENCODING_INTSET) {
1138 it->is.is = op->subject->ptr;
1139 it->is.ii = 0;
1140 } else if (op->encoding == REDIS_ENCODING_HT) {
1141 it->ht.dict = op->subject->ptr;
1142 it->ht.di = dictGetIterator(op->subject->ptr);
1143 it->ht.de = dictNext(it->ht.di);
1144 } else {
1145 redisPanic("Unknown set encoding");
1146 }
1147 } else if (op->type == REDIS_ZSET) {
1148 iterzset *it = &op->iter.zset;
1149 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1150 it->zl.zl = op->subject->ptr;
1151 it->zl.eptr = ziplistIndex(it->zl.zl,0);
1152 if (it->zl.eptr != NULL) {
1153 it->zl.sptr = ziplistNext(it->zl.zl,it->zl.eptr);
1154 redisAssert(it->zl.sptr != NULL);
1155 }
100ed062 1156 } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
56ce42fa
PN
1157 it->sl.zs = op->subject->ptr;
1158 it->sl.node = it->sl.zs->zsl->header->level[0].forward;
1159 } else {
1160 redisPanic("Unknown sorted set encoding");
1161 }
1162 } else {
1163 redisPanic("Unsupported type");
1164 }
1165}
1166
1167void zuiClearIterator(zsetopsrc *op) {
1168 if (op->subject == NULL)
1169 return;
1170
1171 if (op->type == REDIS_SET) {
1172 iterset *it = &op->iter.set;
1173 if (op->encoding == REDIS_ENCODING_INTSET) {
1174 REDIS_NOTUSED(it); /* skip */
1175 } else if (op->encoding == REDIS_ENCODING_HT) {
1176 dictReleaseIterator(it->ht.di);
1177 } else {
1178 redisPanic("Unknown set encoding");
1179 }
1180 } else if (op->type == REDIS_ZSET) {
1181 iterzset *it = &op->iter.zset;
1182 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1183 REDIS_NOTUSED(it); /* skip */
100ed062 1184 } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
56ce42fa
PN
1185 REDIS_NOTUSED(it); /* skip */
1186 } else {
1187 redisPanic("Unknown sorted set encoding");
1188 }
1189 } else {
1190 redisPanic("Unsupported type");
1191 }
1192}
1193
1194int zuiLength(zsetopsrc *op) {
1195 if (op->subject == NULL)
1196 return 0;
1197
1198 if (op->type == REDIS_SET) {
1199 iterset *it = &op->iter.set;
1200 if (op->encoding == REDIS_ENCODING_INTSET) {
1201 return intsetLen(it->is.is);
1202 } else if (op->encoding == REDIS_ENCODING_HT) {
1203 return dictSize(it->ht.dict);
1204 } else {
1205 redisPanic("Unknown set encoding");
1206 }
1207 } else if (op->type == REDIS_ZSET) {
1208 iterzset *it = &op->iter.zset;
1209 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1210 return zzlLength(it->zl.zl);
100ed062 1211 } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
56ce42fa
PN
1212 return it->sl.zs->zsl->length;
1213 } else {
1214 redisPanic("Unknown sorted set encoding");
1215 }
1216 } else {
1217 redisPanic("Unsupported type");
1218 }
1219}
1220
1221/* Check if the current value is valid. If so, store it in the passed structure
1222 * and move to the next element. If not valid, this means we have reached the
1223 * end of the structure and can abort. */
1224int zuiNext(zsetopsrc *op, zsetopval *val) {
1225 if (op->subject == NULL)
1226 return 0;
1227
1228 if (val->flags & OPVAL_DIRTY_ROBJ)
1229 decrRefCount(val->ele);
1230
1231 bzero(val,sizeof(zsetopval));
1232
1233 if (op->type == REDIS_SET) {
1234 iterset *it = &op->iter.set;
1235 if (op->encoding == REDIS_ENCODING_INTSET) {
a5dce407 1236 if (!intsetGet(it->is.is,it->is.ii,(int64_t*)&val->ell))
56ce42fa
PN
1237 return 0;
1238 val->score = 1.0;
1239
1240 /* Move to next element. */
1241 it->is.ii++;
1242 } else if (op->encoding == REDIS_ENCODING_HT) {
1243 if (it->ht.de == NULL)
1244 return 0;
1245 val->ele = dictGetEntryKey(it->ht.de);
1246 val->score = 1.0;
1247
1248 /* Move to next element. */
1249 it->ht.de = dictNext(it->ht.di);
1250 } else {
1251 redisPanic("Unknown set encoding");
1252 }
1253 } else if (op->type == REDIS_ZSET) {
1254 iterzset *it = &op->iter.zset;
1255 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1256 /* No need to check both, but better be explicit. */
1257 if (it->zl.eptr == NULL || it->zl.sptr == NULL)
1258 return 0;
1259 redisAssert(ziplistGet(it->zl.eptr,&val->estr,&val->elen,&val->ell));
1260 val->score = zzlGetScore(it->zl.sptr);
1261
1262 /* Move to next element. */
1263 zzlNext(it->zl.zl,&it->zl.eptr,&it->zl.sptr);
100ed062 1264 } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
56ce42fa
PN
1265 if (it->sl.node == NULL)
1266 return 0;
1267 val->ele = it->sl.node->obj;
1268 val->score = it->sl.node->score;
1269
1270 /* Move to next element. */
1271 it->sl.node = it->sl.node->level[0].forward;
1272 } else {
1273 redisPanic("Unknown sorted set encoding");
1274 }
1275 } else {
1276 redisPanic("Unsupported type");
1277 }
1278 return 1;
1279}
1280
1281int zuiLongLongFromValue(zsetopval *val) {
1282 if (!(val->flags & OPVAL_DIRTY_LL)) {
1283 val->flags |= OPVAL_DIRTY_LL;
1284
1285 if (val->ele != NULL) {
1286 if (val->ele->encoding == REDIS_ENCODING_INT) {
1287 val->ell = (long)val->ele->ptr;
1288 val->flags |= OPVAL_VALID_LL;
1289 } else if (val->ele->encoding == REDIS_ENCODING_RAW) {
1290 if (string2ll(val->ele->ptr,sdslen(val->ele->ptr),&val->ell))
1291 val->flags |= OPVAL_VALID_LL;
1292 } else {
1293 redisPanic("Unsupported element encoding");
1294 }
1295 } else if (val->estr != NULL) {
1296 if (string2ll((char*)val->estr,val->elen,&val->ell))
1297 val->flags |= OPVAL_VALID_LL;
1298 } else {
1299 /* The long long was already set, flag as valid. */
1300 val->flags |= OPVAL_VALID_LL;
1301 }
1302 }
1303 return val->flags & OPVAL_VALID_LL;
1304}
1305
1306robj *zuiObjectFromValue(zsetopval *val) {
1307 if (val->ele == NULL) {
1308 if (val->estr != NULL) {
1309 val->ele = createStringObject((char*)val->estr,val->elen);
1310 } else {
1311 val->ele = createStringObjectFromLongLong(val->ell);
1312 }
1313 val->flags |= OPVAL_DIRTY_ROBJ;
1314 }
1315 return val->ele;
1316}
1317
1318int zuiBufferFromValue(zsetopval *val) {
1319 if (val->estr == NULL) {
1320 if (val->ele != NULL) {
1321 if (val->ele->encoding == REDIS_ENCODING_INT) {
1322 val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),(long)val->ele->ptr);
1323 val->estr = val->_buf;
1324 } else if (val->ele->encoding == REDIS_ENCODING_RAW) {
1325 val->elen = sdslen(val->ele->ptr);
1326 val->estr = val->ele->ptr;
1327 } else {
1328 redisPanic("Unsupported element encoding");
1329 }
1330 } else {
1331 val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),val->ell);
1332 val->estr = val->_buf;
1333 }
1334 }
1335 return 1;
1336}
1337
1338/* Find value pointed to by val in the source pointer to by op. When found,
1339 * return 1 and store its score in target. Return 0 otherwise. */
1340int zuiFind(zsetopsrc *op, zsetopval *val, double *score) {
1341 if (op->subject == NULL)
1342 return 0;
1343
1344 if (op->type == REDIS_SET) {
1345 iterset *it = &op->iter.set;
1346
1347 if (op->encoding == REDIS_ENCODING_INTSET) {
1348 if (zuiLongLongFromValue(val) && intsetFind(it->is.is,val->ell)) {
1349 *score = 1.0;
1350 return 1;
1351 } else {
1352 return 0;
1353 }
1354 } else if (op->encoding == REDIS_ENCODING_HT) {
1355 zuiObjectFromValue(val);
1356 if (dictFind(it->ht.dict,val->ele) != NULL) {
1357 *score = 1.0;
1358 return 1;
1359 } else {
1360 return 0;
1361 }
1362 } else {
1363 redisPanic("Unknown set encoding");
1364 }
1365 } else if (op->type == REDIS_ZSET) {
1366 iterzset *it = &op->iter.zset;
1367 zuiObjectFromValue(val);
1368
1369 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
8588bfa3 1370 if (zzlFind(it->zl.zl,val->ele,score) != NULL) {
56ce42fa
PN
1371 /* Score is already set by zzlFind. */
1372 return 1;
1373 } else {
1374 return 0;
1375 }
100ed062 1376 } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
56ce42fa
PN
1377 dictEntry *de;
1378 if ((de = dictFind(it->sl.zs->dict,val->ele)) != NULL) {
1379 *score = *(double*)dictGetEntryVal(de);
1380 return 1;
1381 } else {
1382 return 0;
1383 }
1384 } else {
1385 redisPanic("Unknown sorted set encoding");
1386 }
1387 } else {
1388 redisPanic("Unsupported type");
1389 }
1390}
1391
1392int zuiCompareByCardinality(const void *s1, const void *s2) {
1393 return zuiLength((zsetopsrc*)s1) - zuiLength((zsetopsrc*)s2);
e2641e09 1394}
1395
/* Aggregation modes accepted by zunionInterAggregate(). */
#define REDIS_AGGR_SUM 1
#define REDIS_AGGR_MIN 2
#define REDIS_AGGR_MAX 3

/* Score stored in a dictionary entry, or 1.0 when the entry has no value.
 * The expansion is fully parenthesized so that the macro can be used as an
 * operand of a larger expression without changing how it is parsed. */
#define zunionInterDictValue(_e) \
    (dictGetEntryVal(_e) == NULL ? 1.0 : *(double*)dictGetEntryVal(_e))
1400
1401inline static void zunionInterAggregate(double *target, double val, int aggregate) {
1402 if (aggregate == REDIS_AGGR_SUM) {
1403 *target = *target + val;
d9e28bcf
PN
1404 /* The result of adding two doubles is NaN when one variable
1405 * is +inf and the other is -inf. When these numbers are added,
1406 * we maintain the convention of the result being 0.0. */
1407 if (isnan(*target)) *target = 0.0;
e2641e09 1408 } else if (aggregate == REDIS_AGGR_MIN) {
1409 *target = val < *target ? val : *target;
1410 } else if (aggregate == REDIS_AGGR_MAX) {
1411 *target = val > *target ? val : *target;
1412 } else {
1413 /* safety net */
1414 redisPanic("Unknown ZUNION/INTER aggregate type");
1415 }
1416}
1417
1418void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
1419 int i, j, setnum;
1420 int aggregate = REDIS_AGGR_SUM;
1421 zsetopsrc *src;
56ce42fa
PN
1422 zsetopval zval;
1423 robj *tmp;
255eebe2 1424 unsigned int maxelelen = 0;
e2641e09 1425 robj *dstobj;
1426 zset *dstzset;
69ef89f2 1427 zskiplistNode *znode;
8c1420ff 1428 int touched = 0;
e2641e09 1429
1430 /* expect setnum input keys to be given */
1431 setnum = atoi(c->argv[2]->ptr);
1432 if (setnum < 1) {
3ab20376
PN
1433 addReplyError(c,
1434 "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE");
e2641e09 1435 return;
1436 }
1437
1438 /* test if the expected number of keys would overflow */
1439 if (3+setnum > c->argc) {
1440 addReply(c,shared.syntaxerr);
1441 return;
1442 }
1443
1444 /* read keys to be used for input */
56ce42fa 1445 src = zcalloc(sizeof(zsetopsrc) * setnum);
e2641e09 1446 for (i = 0, j = 3; i < setnum; i++, j++) {
1447 robj *obj = lookupKeyWrite(c->db,c->argv[j]);
56ce42fa
PN
1448 if (obj != NULL) {
1449 if (obj->type != REDIS_ZSET && obj->type != REDIS_SET) {
e2641e09 1450 zfree(src);
1451 addReply(c,shared.wrongtypeerr);
1452 return;
1453 }
56ce42fa
PN
1454
1455 src[i].subject = obj;
1456 src[i].type = obj->type;
1457 src[i].encoding = obj->encoding;
1458 } else {
1459 src[i].subject = NULL;
e2641e09 1460 }
1461
56ce42fa 1462 /* Default all weights to 1. */
e2641e09 1463 src[i].weight = 1.0;
1464 }
1465
1466 /* parse optional extra arguments */
1467 if (j < c->argc) {
1468 int remaining = c->argc - j;
1469
1470 while (remaining) {
1471 if (remaining >= (setnum + 1) && !strcasecmp(c->argv[j]->ptr,"weights")) {
1472 j++; remaining--;
1473 for (i = 0; i < setnum; i++, j++, remaining--) {
673e1fb7
PN
1474 if (getDoubleFromObjectOrReply(c,c->argv[j],&src[i].weight,
1475 "weight value is not a double") != REDIS_OK)
1476 {
1477 zfree(src);
e2641e09 1478 return;
673e1fb7 1479 }
e2641e09 1480 }
1481 } else if (remaining >= 2 && !strcasecmp(c->argv[j]->ptr,"aggregate")) {
1482 j++; remaining--;
1483 if (!strcasecmp(c->argv[j]->ptr,"sum")) {
1484 aggregate = REDIS_AGGR_SUM;
1485 } else if (!strcasecmp(c->argv[j]->ptr,"min")) {
1486 aggregate = REDIS_AGGR_MIN;
1487 } else if (!strcasecmp(c->argv[j]->ptr,"max")) {
1488 aggregate = REDIS_AGGR_MAX;
1489 } else {
1490 zfree(src);
1491 addReply(c,shared.syntaxerr);
1492 return;
1493 }
1494 j++; remaining--;
1495 } else {
1496 zfree(src);
1497 addReply(c,shared.syntaxerr);
1498 return;
1499 }
1500 }
1501 }
1502
56ce42fa
PN
1503 for (i = 0; i < setnum; i++)
1504 zuiInitIterator(&src[i]);
1505
e2641e09 1506 /* sort sets from the smallest to largest, this will improve our
1507 * algorithm's performance */
56ce42fa 1508 qsort(src,setnum,sizeof(zsetopsrc),zuiCompareByCardinality);
e2641e09 1509
1510 dstobj = createZsetObject();
1511 dstzset = dstobj->ptr;
02e60065 1512 memset(&zval, 0, sizeof(zval));
e2641e09 1513
1514 if (op == REDIS_OP_INTER) {
56ce42fa
PN
1515 /* Skip everything if the smallest input is empty. */
1516 if (zuiLength(&src[0]) > 0) {
1517 /* Precondition: as src[0] is non-empty and the inputs are ordered
1518 * by size, all src[i > 0] are non-empty too. */
1519 while (zuiNext(&src[0],&zval)) {
d433ebc6 1520 double score, value;
e2641e09 1521
56ce42fa 1522 score = src[0].weight * zval.score;
e2641e09 1523 for (j = 1; j < setnum; j++) {
d200342a 1524 /* It is not safe to access the zset we are
cb16b6c3 1525 * iterating, so explicitly check for equal object. */
d070abe4 1526 if (src[j].subject == src[0].subject) {
1527 value = zval.score*src[j].weight;
1528 zunionInterAggregate(&score,value,aggregate);
1529 } else if (zuiFind(&src[j],&zval,&value)) {
56ce42fa 1530 value *= src[j].weight;
d433ebc6 1531 zunionInterAggregate(&score,value,aggregate);
e2641e09 1532 } else {
1533 break;
1534 }
1535 }
1536
56ce42fa 1537 /* Only continue when present in every input. */
d433ebc6 1538 if (j == setnum) {
56ce42fa
PN
1539 tmp = zuiObjectFromValue(&zval);
1540 znode = zslInsert(dstzset->zsl,score,tmp);
1541 incrRefCount(tmp); /* added to skiplist */
1542 dictAdd(dstzset->dict,tmp,&znode->score);
1543 incrRefCount(tmp); /* added to dictionary */
255eebe2
PN
1544
1545 if (tmp->encoding == REDIS_ENCODING_RAW)
1546 if (sdslen(tmp->ptr) > maxelelen)
1547 maxelelen = sdslen(tmp->ptr);
e2641e09 1548 }
1549 }
e2641e09 1550 }
1551 } else if (op == REDIS_OP_UNION) {
1552 for (i = 0; i < setnum; i++) {
521ddcce 1553 if (zuiLength(&src[i]) == 0)
56ce42fa 1554 continue;
e2641e09 1555
56ce42fa 1556 while (zuiNext(&src[i],&zval)) {
d433ebc6
PN
1557 double score, value;
1558
56ce42fa
PN
1559 /* Skip key when already processed */
1560 if (dictFind(dstzset->dict,zuiObjectFromValue(&zval)) != NULL)
d433ebc6 1561 continue;
e2641e09 1562
56ce42fa
PN
1563 /* Initialize score */
1564 score = src[i].weight * zval.score;
e2641e09 1565
56ce42fa
PN
1566 /* Because the inputs are sorted by size, it's only possible
1567 * for sets at larger indices to hold this element. */
e2641e09 1568 for (j = (i+1); j < setnum; j++) {
d200342a 1569 /* It is not safe to access the zset we are
cb16b6c3 1570 * iterating, so explicitly check for equal object. */
1571 if(src[j].subject == src[i].subject) {
1572 value = zval.score*src[j].weight;
1573 zunionInterAggregate(&score,value,aggregate);
1574 } else if (zuiFind(&src[j],&zval,&value)) {
56ce42fa 1575 value *= src[j].weight;
d433ebc6 1576 zunionInterAggregate(&score,value,aggregate);
e2641e09 1577 }
1578 }
1579
56ce42fa
PN
1580 tmp = zuiObjectFromValue(&zval);
1581 znode = zslInsert(dstzset->zsl,score,tmp);
1582 incrRefCount(zval.ele); /* added to skiplist */
1583 dictAdd(dstzset->dict,tmp,&znode->score);
1584 incrRefCount(zval.ele); /* added to dictionary */
255eebe2
PN
1585
1586 if (tmp->encoding == REDIS_ENCODING_RAW)
1587 if (sdslen(tmp->ptr) > maxelelen)
1588 maxelelen = sdslen(tmp->ptr);
e2641e09 1589 }
e2641e09 1590 }
1591 } else {
56ce42fa 1592 redisPanic("Unknown operator");
e2641e09 1593 }
1594
56ce42fa
PN
1595 for (i = 0; i < setnum; i++)
1596 zuiClearIterator(&src[i]);
1597
8c1420ff 1598 if (dbDelete(c->db,dstkey)) {
cea8c5cd 1599 signalModifiedKey(c->db,dstkey);
8c1420ff 1600 touched = 1;
1601 server.dirty++;
1602 }
e2641e09 1603 if (dstzset->zsl->length) {
255eebe2
PN
1604 /* Convert to ziplist when in limits. */
1605 if (dstzset->zsl->length <= server.zset_max_ziplist_entries &&
1606 maxelelen <= server.zset_max_ziplist_value)
df26a0ae 1607 zsetConvert(dstobj,REDIS_ENCODING_ZIPLIST);
255eebe2 1608
e2641e09 1609 dbAdd(c->db,dstkey,dstobj);
df26a0ae 1610 addReplyLongLong(c,zsetLength(dstobj));
cea8c5cd 1611 if (!touched) signalModifiedKey(c->db,dstkey);
cbf7e107 1612 server.dirty++;
e2641e09 1613 } else {
1614 decrRefCount(dstobj);
255eebe2 1615 addReply(c,shared.czero);
e2641e09 1616 }
1617 zfree(src);
1618}
1619
1620void zunionstoreCommand(redisClient *c) {
1621 zunionInterGenericCommand(c,c->argv[1], REDIS_OP_UNION);
1622}
1623
1624void zinterstoreCommand(redisClient *c) {
1625 zunionInterGenericCommand(c,c->argv[1], REDIS_OP_INTER);
1626}
1627
1628void zrangeGenericCommand(redisClient *c, int reverse) {
5d1b4fb6
PN
1629 robj *key = c->argv[1];
1630 robj *zobj;
1631 int withscores = 0;
e2641e09 1632 long start;
1633 long end;
e2641e09 1634 int llen;
5d1b4fb6 1635 int rangelen;
e2641e09 1636
1637 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
1638 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
1639
1640 if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
1641 withscores = 1;
1642 } else if (c->argc >= 5) {
1643 addReply(c,shared.syntaxerr);
1644 return;
1645 }
1646
5d1b4fb6
PN
1647 if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL
1648 || checkType(c,zobj,REDIS_ZSET)) return;
e2641e09 1649
5d1b4fb6 1650 /* Sanitize indexes. */
df26a0ae 1651 llen = zsetLength(zobj);
e2641e09 1652 if (start < 0) start = llen+start;
1653 if (end < 0) end = llen+end;
1654 if (start < 0) start = 0;
e2641e09 1655
d0a4e24e
PN
1656 /* Invariant: start >= 0, so this test will be true when end < 0.
1657 * The range is empty when start > end or start >= length. */
e2641e09 1658 if (start > end || start >= llen) {
e2641e09 1659 addReply(c,shared.emptymultibulk);
1660 return;
1661 }
1662 if (end >= llen) end = llen-1;
1663 rangelen = (end-start)+1;
1664
e2641e09 1665 /* Return the result in form of a multi-bulk reply */
5d1b4fb6
PN
1666 addReplyMultiBulkLen(c, withscores ? (rangelen*2) : rangelen);
1667
1668 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1669 unsigned char *zl = zobj->ptr;
1670 unsigned char *eptr, *sptr;
1671 unsigned char *vstr;
1672 unsigned int vlen;
1673 long long vlong;
1674
1675 if (reverse)
1676 eptr = ziplistIndex(zl,-2-(2*start));
1677 else
1678 eptr = ziplistIndex(zl,2*start);
1679
4c5f0966
PN
1680 redisAssert(eptr != NULL);
1681 sptr = ziplistNext(zl,eptr);
1682
5d1b4fb6 1683 while (rangelen--) {
4c5f0966 1684 redisAssert(eptr != NULL && sptr != NULL);
5d1b4fb6
PN
1685 redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
1686 if (vstr == NULL)
1687 addReplyBulkLongLong(c,vlong);
1688 else
1689 addReplyBulkCBuffer(c,vstr,vlen);
1690
4c5f0966 1691 if (withscores)
5d1b4fb6 1692 addReplyDouble(c,zzlGetScore(sptr));
5d1b4fb6 1693
4c5f0966
PN
1694 if (reverse)
1695 zzlPrev(zl,&eptr,&sptr);
1696 else
1697 zzlNext(zl,&eptr,&sptr);
5d1b4fb6
PN
1698 }
1699
100ed062 1700 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
5d1b4fb6
PN
1701 zset *zs = zobj->ptr;
1702 zskiplist *zsl = zs->zsl;
1703 zskiplistNode *ln;
1704 robj *ele;
1705
1706 /* Check if starting point is trivial, before doing log(N) lookup. */
1707 if (reverse) {
1708 ln = zsl->tail;
1709 if (start > 0)
1710 ln = zslGetElementByRank(zsl,llen-start);
1711 } else {
1712 ln = zsl->header->level[0].forward;
1713 if (start > 0)
1714 ln = zslGetElementByRank(zsl,start+1);
1715 }
1716
1717 while(rangelen--) {
1718 redisAssert(ln != NULL);
1719 ele = ln->obj;
1720 addReplyBulk(c,ele);
1721 if (withscores)
1722 addReplyDouble(c,ln->score);
1723 ln = reverse ? ln->backward : ln->level[0].forward;
1724 }
1725 } else {
1726 redisPanic("Unknown sorted set encoding");
e2641e09 1727 }
1728}
1729
1730void zrangeCommand(redisClient *c) {
1731 zrangeGenericCommand(c,0);
1732}
1733
1734void zrevrangeCommand(redisClient *c) {
1735 zrangeGenericCommand(c,1);
1736}
1737
25bb8a44
PN
1738/* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE and ZCOUNT.
1739 * If "justcount", only the number of elements in the range is returned. */
1740void genericZrangebyscoreCommand(redisClient *c, int reverse, int justcount) {
1741 zrangespec range;
aff255c8
PN
1742 robj *key = c->argv[1];
1743 robj *emptyreply, *zobj;
e2641e09 1744 int offset = 0, limit = -1;
1745 int withscores = 0;
25bb8a44
PN
1746 unsigned long rangelen = 0;
1747 void *replylen = NULL;
22b9bf15 1748 int minidx, maxidx;
e2641e09 1749
25bb8a44 1750 /* Parse the range arguments. */
22b9bf15
PN
1751 if (reverse) {
1752 /* Range is given as [max,min] */
1753 maxidx = 2; minidx = 3;
1754 } else {
1755 /* Range is given as [min,max] */
1756 minidx = 2; maxidx = 3;
1757 }
1758
1759 if (zslParseRange(c->argv[minidx],c->argv[maxidx],&range) != REDIS_OK) {
7236fdb2
PN
1760 addReplyError(c,"min or max is not a double");
1761 return;
1762 }
25bb8a44
PN
1763
1764 /* Parse optional extra arguments. Note that ZCOUNT will exactly have
1765 * 4 arguments, so we'll never enter the following code path. */
1766 if (c->argc > 4) {
1767 int remaining = c->argc - 4;
1768 int pos = 4;
1769
1770 while (remaining) {
1771 if (remaining >= 1 && !strcasecmp(c->argv[pos]->ptr,"withscores")) {
1772 pos++; remaining--;
1773 withscores = 1;
1774 } else if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr,"limit")) {
1775 offset = atoi(c->argv[pos+1]->ptr);
1776 limit = atoi(c->argv[pos+2]->ptr);
1777 pos += 3; remaining -= 3;
1778 } else {
1779 addReply(c,shared.syntaxerr);
1780 return;
1781 }
1782 }
e2641e09 1783 }
25bb8a44
PN
1784
1785 /* Ok, lookup the key and get the range */
1786 emptyreply = justcount ? shared.czero : shared.emptymultibulk;
aff255c8
PN
1787 if ((zobj = lookupKeyReadOrReply(c,key,emptyreply)) == NULL ||
1788 checkType(c,zobj,REDIS_ZSET)) return;
25bb8a44 1789
aff255c8
PN
1790 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1791 unsigned char *zl = zobj->ptr;
1792 unsigned char *eptr, *sptr;
1793 unsigned char *vstr;
1794 unsigned int vlen;
1795 long long vlong;
1796 double score;
e2641e09 1797
aff255c8
PN
1798 /* If reversed, get the last node in range as starting point. */
1799 if (reverse)
8588bfa3 1800 eptr = zzlLastInRange(zl,range);
aff255c8 1801 else
8588bfa3 1802 eptr = zzlFirstInRange(zl,range);
e2641e09 1803
aff255c8
PN
1804 /* No "first" element in the specified interval. */
1805 if (eptr == NULL) {
1806 addReply(c,emptyreply);
1807 return;
1808 }
e2641e09 1809
aff255c8
PN
1810 /* Get score pointer for the first element. */
1811 redisAssert(eptr != NULL);
1812 sptr = ziplistNext(zl,eptr);
e2641e09 1813
aff255c8
PN
1814 /* We don't know in advance how many matching elements there are in the
1815 * list, so we push this object that will represent the multi-bulk
1816 * length in the output buffer, and will "fix" it later */
1817 if (!justcount)
1818 replylen = addDeferredMultiBulkLength(c);
1819
1820 /* If there is an offset, just traverse the number of elements without
1821 * checking the score because that is done in the next loop. */
1822 while (eptr && offset--)
1823 if (reverse)
1824 zzlPrev(zl,&eptr,&sptr);
1825 else
1826 zzlNext(zl,&eptr,&sptr);
1827
1828 while (eptr && limit--) {
1829 score = zzlGetScore(sptr);
1830
1831 /* Abort when the node is no longer in range. */
1832 if (reverse) {
1833 if (!zslValueGteMin(score,&range)) break;
1834 } else {
1835 if (!zslValueLteMax(score,&range)) break;
1836 }
1837
1838 /* Do our magic */
1839 rangelen++;
1840 if (!justcount) {
1841 redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
1842 if (vstr == NULL)
1843 addReplyBulkLongLong(c,vlong);
1844 else
1845 addReplyBulkCBuffer(c,vstr,vlen);
1846
1847 if (withscores)
1848 addReplyDouble(c,score);
1849 }
1850
1851 /* Move to next node */
1852 if (reverse)
1853 zzlPrev(zl,&eptr,&sptr);
1854 else
1855 zzlNext(zl,&eptr,&sptr);
e2641e09 1856 }
100ed062 1857 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
aff255c8
PN
1858 zset *zs = zobj->ptr;
1859 zskiplist *zsl = zs->zsl;
1860 zskiplistNode *ln;
25bb8a44 1861
aff255c8
PN
1862 /* If reversed, get the last node in range as starting point. */
1863 if (reverse)
1864 ln = zslLastInRange(zsl,range);
1865 else
1866 ln = zslFirstInRange(zsl,range);
1867
1868 /* No "first" element in the specified interval. */
1869 if (ln == NULL) {
1870 addReply(c,emptyreply);
1871 return;
25bb8a44
PN
1872 }
1873
aff255c8
PN
1874 /* We don't know in advance how many matching elements there are in the
1875 * list, so we push this object that will represent the multi-bulk
1876 * length in the output buffer, and will "fix" it later */
1877 if (!justcount)
1878 replylen = addDeferredMultiBulkLength(c);
1879
1880 /* If there is an offset, just traverse the number of elements without
1881 * checking the score because that is done in the next loop. */
1882 while (ln && offset--)
1883 ln = reverse ? ln->backward : ln->level[0].forward;
1884
1885 while (ln && limit--) {
1886 /* Abort when the node is no longer in range. */
1887 if (reverse) {
1888 if (!zslValueGteMin(ln->score,&range)) break;
1889 } else {
1890 if (!zslValueLteMax(ln->score,&range)) break;
1891 }
1892
1893 /* Do our magic */
1894 rangelen++;
1895 if (!justcount) {
1896 addReplyBulk(c,ln->obj);
1897 if (withscores)
1898 addReplyDouble(c,ln->score);
1899 }
1900
1901 /* Move to next node */
1902 ln = reverse ? ln->backward : ln->level[0].forward;
1903 }
1904 } else {
1905 redisPanic("Unknown sorted set encoding");
25bb8a44
PN
1906 }
1907
1908 if (justcount) {
1909 addReplyLongLong(c,(long)rangelen);
1910 } else {
aff255c8
PN
1911 if (withscores) rangelen *= 2;
1912 setDeferredMultiBulkLength(c,replylen,rangelen);
e2641e09 1913 }
1914}
1915
1916void zrangebyscoreCommand(redisClient *c) {
25bb8a44
PN
1917 genericZrangebyscoreCommand(c,0,0);
1918}
1919
1920void zrevrangebyscoreCommand(redisClient *c) {
1921 genericZrangebyscoreCommand(c,1,0);
e2641e09 1922}
1923
1924void zcountCommand(redisClient *c) {
25bb8a44 1925 genericZrangebyscoreCommand(c,0,1);
e2641e09 1926}
1927
1928void zcardCommand(redisClient *c) {
d1c920c5
PN
1929 robj *key = c->argv[1];
1930 robj *zobj;
e2641e09 1931
d1c920c5
PN
1932 if ((zobj = lookupKeyReadOrReply(c,key,shared.czero)) == NULL ||
1933 checkType(c,zobj,REDIS_ZSET)) return;
e2641e09 1934
df26a0ae 1935 addReplyLongLong(c,zsetLength(zobj));
e2641e09 1936}
1937
1938void zscoreCommand(redisClient *c) {
d1c920c5
PN
1939 robj *key = c->argv[1];
1940 robj *zobj;
1941 double score;
e2641e09 1942
d1c920c5
PN
1943 if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
1944 checkType(c,zobj,REDIS_ZSET)) return;
e2641e09 1945
d1c920c5 1946 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
8588bfa3 1947 if (zzlFind(zobj->ptr,c->argv[2],&score) != NULL)
d1c920c5
PN
1948 addReplyDouble(c,score);
1949 else
1950 addReply(c,shared.nullbulk);
100ed062 1951 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
d1c920c5
PN
1952 zset *zs = zobj->ptr;
1953 dictEntry *de;
e2641e09 1954
d1c920c5
PN
1955 c->argv[2] = tryObjectEncoding(c->argv[2]);
1956 de = dictFind(zs->dict,c->argv[2]);
1957 if (de != NULL) {
1958 score = *(double*)dictGetEntryVal(de);
1959 addReplyDouble(c,score);
1960 } else {
1961 addReply(c,shared.nullbulk);
1962 }
1963 } else {
1964 redisPanic("Unknown sorted set encoding");
e2641e09 1965 }
1966}
1967
1968void zrankGenericCommand(redisClient *c, int reverse) {
d1c920c5
PN
1969 robj *key = c->argv[1];
1970 robj *ele = c->argv[2];
1971 robj *zobj;
1972 unsigned long llen;
e2641e09 1973 unsigned long rank;
e2641e09 1974
d1c920c5
PN
1975 if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
1976 checkType(c,zobj,REDIS_ZSET)) return;
df26a0ae 1977 llen = zsetLength(zobj);
e2641e09 1978
d1c920c5
PN
1979 redisAssert(ele->encoding == REDIS_ENCODING_RAW);
1980 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1981 unsigned char *zl = zobj->ptr;
1982 unsigned char *eptr, *sptr;
e2641e09 1983
d1c920c5
PN
1984 eptr = ziplistIndex(zl,0);
1985 redisAssert(eptr != NULL);
1986 sptr = ziplistNext(zl,eptr);
1987 redisAssert(sptr != NULL);
1988
1989 rank = 1;
1990 while(eptr != NULL) {
1991 if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr)))
1992 break;
1993 rank++;
1994 zzlNext(zl,&eptr,&sptr);
1995 }
1996
1997 if (eptr != NULL) {
1998 if (reverse)
1999 addReplyLongLong(c,llen-rank);
2000 else
2001 addReplyLongLong(c,rank-1);
e2641e09 2002 } else {
d1c920c5
PN
2003 addReply(c,shared.nullbulk);
2004 }
100ed062 2005 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
d1c920c5
PN
2006 zset *zs = zobj->ptr;
2007 zskiplist *zsl = zs->zsl;
2008 dictEntry *de;
2009 double score;
2010
2011 ele = c->argv[2] = tryObjectEncoding(c->argv[2]);
2012 de = dictFind(zs->dict,ele);
2013 if (de != NULL) {
2014 score = *(double*)dictGetEntryVal(de);
2015 rank = zslGetRank(zsl,score,ele);
2016 redisAssert(rank); /* Existing elements always have a rank. */
2017 if (reverse)
2018 addReplyLongLong(c,llen-rank);
2019 else
2020 addReplyLongLong(c,rank-1);
2021 } else {
2022 addReply(c,shared.nullbulk);
e2641e09 2023 }
2024 } else {
d1c920c5 2025 redisPanic("Unknown sorted set encoding");
e2641e09 2026 }
2027}
2028
2029void zrankCommand(redisClient *c) {
2030 zrankGenericCommand(c, 0);
2031}
2032
2033void zrevrankCommand(redisClient *c) {
2034 zrankGenericCommand(c, 1);
2035}