]> git.saurik.com Git - redis.git/blob - src/t_zset.c
string to number API is now more strict not accepting spaces before or after the...
[redis.git] / src / t_zset.c
1 #include "redis.h"
2
3 #include <math.h>
4
5 /*-----------------------------------------------------------------------------
6 * Sorted set API
7 *----------------------------------------------------------------------------*/
8
9 /* ZSETs are ordered sets using two data structures to hold the same elements
10 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
11 * data structure.
12 *
13 * The elements are added to a hash table mapping Redis objects to scores.
14 * At the same time the elements are added to a skip list mapping scores
15 * to Redis objects (so objects are sorted by scores in this "view"). */
16
17 /* This skiplist implementation is almost a C translation of the original
18 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
19 * Alternative to Balanced Trees", modified in three ways:
20 * a) this implementation allows for repeated values.
21 * b) the comparison is not just by key (our 'score') but by satellite data.
22 * c) there is a back pointer, so it's a doubly linked list with the back
23 * pointers being only at "level 1". This allows to traverse the list
24 * from tail to head, useful for ZREVRANGE. */
25
26 zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
27 zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
28 zn->score = score;
29 zn->obj = obj;
30 return zn;
31 }
32
33 zskiplist *zslCreate(void) {
34 int j;
35 zskiplist *zsl;
36
37 zsl = zmalloc(sizeof(*zsl));
38 zsl->level = 1;
39 zsl->length = 0;
40 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
41 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
42 zsl->header->level[j].forward = NULL;
43 zsl->header->level[j].span = 0;
44 }
45 zsl->header->backward = NULL;
46 zsl->tail = NULL;
47 return zsl;
48 }
49
50 void zslFreeNode(zskiplistNode *node) {
51 decrRefCount(node->obj);
52 zfree(node);
53 }
54
55 void zslFree(zskiplist *zsl) {
56 zskiplistNode *node = zsl->header->level[0].forward, *next;
57
58 zfree(zsl->header);
59 while(node) {
60 next = node->level[0].forward;
61 zslFreeNode(node);
62 node = next;
63 }
64 zfree(zsl);
65 }
66
67 int zslRandomLevel(void) {
68 int level = 1;
69 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
70 level += 1;
71 return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
72 }
73
74 zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
75 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
76 unsigned int rank[ZSKIPLIST_MAXLEVEL];
77 int i, level;
78
79 x = zsl->header;
80 for (i = zsl->level-1; i >= 0; i--) {
81 /* store rank that is crossed to reach the insert position */
82 rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
83 while (x->level[i].forward &&
84 (x->level[i].forward->score < score ||
85 (x->level[i].forward->score == score &&
86 compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
87 rank[i] += x->level[i].span;
88 x = x->level[i].forward;
89 }
90 update[i] = x;
91 }
92 /* we assume the key is not already inside, since we allow duplicated
93 * scores, and the re-insertion of score and redis object should never
94 * happpen since the caller of zslInsert() should test in the hash table
95 * if the element is already inside or not. */
96 level = zslRandomLevel();
97 if (level > zsl->level) {
98 for (i = zsl->level; i < level; i++) {
99 rank[i] = 0;
100 update[i] = zsl->header;
101 update[i]->level[i].span = zsl->length;
102 }
103 zsl->level = level;
104 }
105 x = zslCreateNode(level,score,obj);
106 for (i = 0; i < level; i++) {
107 x->level[i].forward = update[i]->level[i].forward;
108 update[i]->level[i].forward = x;
109
110 /* update span covered by update[i] as x is inserted here */
111 x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
112 update[i]->level[i].span = (rank[0] - rank[i]) + 1;
113 }
114
115 /* increment span for untouched levels */
116 for (i = level; i < zsl->level; i++) {
117 update[i]->level[i].span++;
118 }
119
120 x->backward = (update[0] == zsl->header) ? NULL : update[0];
121 if (x->level[0].forward)
122 x->level[0].forward->backward = x;
123 else
124 zsl->tail = x;
125 zsl->length++;
126 return x;
127 }
128
129 /* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
130 void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
131 int i;
132 for (i = 0; i < zsl->level; i++) {
133 if (update[i]->level[i].forward == x) {
134 update[i]->level[i].span += x->level[i].span - 1;
135 update[i]->level[i].forward = x->level[i].forward;
136 } else {
137 update[i]->level[i].span -= 1;
138 }
139 }
140 if (x->level[0].forward) {
141 x->level[0].forward->backward = x->backward;
142 } else {
143 zsl->tail = x->backward;
144 }
145 while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
146 zsl->level--;
147 zsl->length--;
148 }
149
150 /* Delete an element with matching score/object from the skiplist. */
151 int zslDelete(zskiplist *zsl, double score, robj *obj) {
152 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
153 int i;
154
155 x = zsl->header;
156 for (i = zsl->level-1; i >= 0; i--) {
157 while (x->level[i].forward &&
158 (x->level[i].forward->score < score ||
159 (x->level[i].forward->score == score &&
160 compareStringObjects(x->level[i].forward->obj,obj) < 0)))
161 x = x->level[i].forward;
162 update[i] = x;
163 }
164 /* We may have multiple elements with the same score, what we need
165 * is to find the element with both the right score and object. */
166 x = x->level[0].forward;
167 if (x && score == x->score && equalStringObjects(x->obj,obj)) {
168 zslDeleteNode(zsl, x, update);
169 zslFreeNode(x);
170 return 1;
171 } else {
172 return 0; /* not found */
173 }
174 return 0; /* not found */
175 }
176
177 static int zslValueGteMin(double value, zrangespec *spec) {
178 return spec->minex ? (value > spec->min) : (value >= spec->min);
179 }
180
181 static int zslValueLteMax(double value, zrangespec *spec) {
182 return spec->maxex ? (value < spec->max) : (value <= spec->max);
183 }
184
185 /* Returns if there is a part of the zset is in range. */
186 int zslIsInRange(zskiplist *zsl, zrangespec *range) {
187 zskiplistNode *x;
188
189 /* Test for ranges that will always be empty. */
190 if (range->min > range->max ||
191 (range->min == range->max && (range->minex || range->maxex)))
192 return 0;
193 x = zsl->tail;
194 if (x == NULL || !zslValueGteMin(x->score,range))
195 return 0;
196 x = zsl->header->level[0].forward;
197 if (x == NULL || !zslValueLteMax(x->score,range))
198 return 0;
199 return 1;
200 }
201
202 /* Find the first node that is contained in the specified range.
203 * Returns NULL when no element is contained in the range. */
204 zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec range) {
205 zskiplistNode *x;
206 int i;
207
208 /* If everything is out of range, return early. */
209 if (!zslIsInRange(zsl,&range)) return NULL;
210
211 x = zsl->header;
212 for (i = zsl->level-1; i >= 0; i--) {
213 /* Go forward while *OUT* of range. */
214 while (x->level[i].forward &&
215 !zslValueGteMin(x->level[i].forward->score,&range))
216 x = x->level[i].forward;
217 }
218
219 /* This is an inner range, so the next node cannot be NULL. */
220 x = x->level[0].forward;
221 redisAssert(x != NULL);
222
223 /* Check if score <= max. */
224 if (!zslValueLteMax(x->score,&range)) return NULL;
225 return x;
226 }
227
228 /* Find the last node that is contained in the specified range.
229 * Returns NULL when no element is contained in the range. */
230 zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec range) {
231 zskiplistNode *x;
232 int i;
233
234 /* If everything is out of range, return early. */
235 if (!zslIsInRange(zsl,&range)) return NULL;
236
237 x = zsl->header;
238 for (i = zsl->level-1; i >= 0; i--) {
239 /* Go forward while *IN* range. */
240 while (x->level[i].forward &&
241 zslValueLteMax(x->level[i].forward->score,&range))
242 x = x->level[i].forward;
243 }
244
245 /* This is an inner range, so this node cannot be NULL. */
246 redisAssert(x != NULL);
247
248 /* Check if score >= min. */
249 if (!zslValueGteMin(x->score,&range)) return NULL;
250 return x;
251 }
252
253 /* Delete all the elements with score between min and max from the skiplist.
254 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
255 * Note that this function takes the reference to the hash table view of the
256 * sorted set, in order to remove the elements from the hash table too. */
257 unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec range, dict *dict) {
258 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
259 unsigned long removed = 0;
260 int i;
261
262 x = zsl->header;
263 for (i = zsl->level-1; i >= 0; i--) {
264 while (x->level[i].forward && (range.minex ?
265 x->level[i].forward->score <= range.min :
266 x->level[i].forward->score < range.min))
267 x = x->level[i].forward;
268 update[i] = x;
269 }
270
271 /* Current node is the last with score < or <= min. */
272 x = x->level[0].forward;
273
274 /* Delete nodes while in range. */
275 while (x && (range.maxex ? x->score < range.max : x->score <= range.max)) {
276 zskiplistNode *next = x->level[0].forward;
277 zslDeleteNode(zsl,x,update);
278 dictDelete(dict,x->obj);
279 zslFreeNode(x);
280 removed++;
281 x = next;
282 }
283 return removed;
284 }
285
286 /* Delete all the elements with rank between start and end from the skiplist.
287 * Start and end are inclusive. Note that start and end need to be 1-based */
288 unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict) {
289 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
290 unsigned long traversed = 0, removed = 0;
291 int i;
292
293 x = zsl->header;
294 for (i = zsl->level-1; i >= 0; i--) {
295 while (x->level[i].forward && (traversed + x->level[i].span) < start) {
296 traversed += x->level[i].span;
297 x = x->level[i].forward;
298 }
299 update[i] = x;
300 }
301
302 traversed++;
303 x = x->level[0].forward;
304 while (x && traversed <= end) {
305 zskiplistNode *next = x->level[0].forward;
306 zslDeleteNode(zsl,x,update);
307 dictDelete(dict,x->obj);
308 zslFreeNode(x);
309 removed++;
310 traversed++;
311 x = next;
312 }
313 return removed;
314 }
315
316 /* Find the rank for an element by both score and key.
317 * Returns 0 when the element cannot be found, rank otherwise.
318 * Note that the rank is 1-based due to the span of zsl->header to the
319 * first element. */
320 unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) {
321 zskiplistNode *x;
322 unsigned long rank = 0;
323 int i;
324
325 x = zsl->header;
326 for (i = zsl->level-1; i >= 0; i--) {
327 while (x->level[i].forward &&
328 (x->level[i].forward->score < score ||
329 (x->level[i].forward->score == score &&
330 compareStringObjects(x->level[i].forward->obj,o) <= 0))) {
331 rank += x->level[i].span;
332 x = x->level[i].forward;
333 }
334
335 /* x might be equal to zsl->header, so test if obj is non-NULL */
336 if (x->obj && equalStringObjects(x->obj,o)) {
337 return rank;
338 }
339 }
340 return 0;
341 }
342
343 /* Finds an element by its rank. The rank argument needs to be 1-based. */
344 zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
345 zskiplistNode *x;
346 unsigned long traversed = 0;
347 int i;
348
349 x = zsl->header;
350 for (i = zsl->level-1; i >= 0; i--) {
351 while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
352 {
353 traversed += x->level[i].span;
354 x = x->level[i].forward;
355 }
356 if (traversed == rank) {
357 return x;
358 }
359 }
360 return NULL;
361 }
362
363 /* Populate the rangespec according to the objects min and max. */
364 static int zslParseRange(robj *min, robj *max, zrangespec *spec) {
365 char *eptr;
366 spec->minex = spec->maxex = 0;
367
368 /* Parse the min-max interval. If one of the values is prefixed
369 * by the "(" character, it's considered "open". For instance
370 * ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max
371 * ZRANGEBYSCORE zset 1.5 2.5 will instead match min <= x <= max */
372 if (min->encoding == REDIS_ENCODING_INT) {
373 spec->min = (long)min->ptr;
374 } else {
375 if (((char*)min->ptr)[0] == '(') {
376 spec->min = strtod((char*)min->ptr+1,&eptr);
377 if (eptr[0] != '\0' || isnan(spec->min)) return REDIS_ERR;
378 spec->minex = 1;
379 } else {
380 spec->min = strtod((char*)min->ptr,&eptr);
381 if (eptr[0] != '\0' || isnan(spec->min)) return REDIS_ERR;
382 }
383 }
384 if (max->encoding == REDIS_ENCODING_INT) {
385 spec->max = (long)max->ptr;
386 } else {
387 if (((char*)max->ptr)[0] == '(') {
388 spec->max = strtod((char*)max->ptr+1,&eptr);
389 if (eptr[0] != '\0' || isnan(spec->max)) return REDIS_ERR;
390 spec->maxex = 1;
391 } else {
392 spec->max = strtod((char*)max->ptr,&eptr);
393 if (eptr[0] != '\0' || isnan(spec->max)) return REDIS_ERR;
394 }
395 }
396
397 return REDIS_OK;
398 }
399
400 /*-----------------------------------------------------------------------------
401 * Ziplist-backed sorted set API
402 *----------------------------------------------------------------------------*/
403
/* Return the score stored in the ziplist entry 'sptr' as a double.
 *
 * An entry that ziplistGet() returns as a string is copied into a local
 * buffer, NUL terminated and parsed with strtod(). An entry returned as an
 * integer is converted directly.
 *
 * 'sptr' must not be NULL. A string entry must be shorter than the local
 * buffer: the length is asserted before the copy, since an unchecked
 * memcpy() would write past the end of the stack buffer. */
double zzlGetScore(unsigned char *sptr) {
    unsigned char *vstr;
    unsigned int vlen;
    long long vlong;
    char buf[128];
    double score;

    redisAssert(sptr != NULL);
    redisAssert(ziplistGet(sptr,&vstr,&vlen,&vlong));

    if (vstr) {
        /* Leave room for the terminating NUL byte. */
        redisAssert(vlen < sizeof(buf));
        memcpy(buf,vstr,vlen);
        buf[vlen] = '\0';
        score = strtod(buf,NULL);
    } else {
        score = vlong;
    }

    return score;
}
424
/* Compare the ziplist entry 'eptr' with the string 'cstr' of 'clen' bytes.
 * The common prefix is compared with memcmp(); when it is equal the
 * difference between the two lengths decides. An integer entry is
 * compared through its decimal string representation. */
int zzlCompareElements(unsigned char *eptr, unsigned char *cstr, unsigned int clen) {
    unsigned char *vstr;
    unsigned int vlen;
    long long vlong;
    unsigned char numbuf[32];
    int common, order;

    redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
    if (vstr == NULL) {
        /* Integer entry: render it as a string into the local buffer. */
        vlen = ll2string((char*)numbuf,sizeof(numbuf),vlong);
        vstr = numbuf;
    }

    common = (clen > vlen) ? vlen : clen;
    order = memcmp(vstr,cstr,common);
    if (order != 0) return order;
    return vlen-clen;
}
445
/* Number of (element,score) pairs in the ziplist: every pair takes two
 * ziplist entries, so this is half of the ziplist length. */
unsigned int zzlLength(unsigned char *zl) {
    return ziplistLen(zl) / 2;
}
449
450 /* Move to next entry based on the values in eptr and sptr. Both are set to
451 * NULL when there is no next entry. */
452 void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
453 unsigned char *_eptr, *_sptr;
454 redisAssert(*eptr != NULL && *sptr != NULL);
455
456 _eptr = ziplistNext(zl,*sptr);
457 if (_eptr != NULL) {
458 _sptr = ziplistNext(zl,_eptr);
459 redisAssert(_sptr != NULL);
460 } else {
461 /* No next entry. */
462 _sptr = NULL;
463 }
464
465 *eptr = _eptr;
466 *sptr = _sptr;
467 }
468
469 /* Move to the previous entry based on the values in eptr and sptr. Both are
470 * set to NULL when there is no next entry. */
471 void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
472 unsigned char *_eptr, *_sptr;
473 redisAssert(*eptr != NULL && *sptr != NULL);
474
475 _sptr = ziplistPrev(zl,*eptr);
476 if (_sptr != NULL) {
477 _eptr = ziplistPrev(zl,_sptr);
478 redisAssert(_eptr != NULL);
479 } else {
480 /* No previous entry. */
481 _eptr = NULL;
482 }
483
484 *eptr = _eptr;
485 *sptr = _sptr;
486 }
487
488 /* Returns if there is a part of the zset is in range. Should only be used
489 * internally by zzlFirstInRange and zzlLastInRange. */
490 int zzlIsInRange(unsigned char *zl, zrangespec *range) {
491 unsigned char *p;
492 double score;
493
494 /* Test for ranges that will always be empty. */
495 if (range->min > range->max ||
496 (range->min == range->max && (range->minex || range->maxex)))
497 return 0;
498
499 p = ziplistIndex(zl,-1); /* Last score. */
500 redisAssert(p != NULL);
501 score = zzlGetScore(p);
502 if (!zslValueGteMin(score,range))
503 return 0;
504
505 p = ziplistIndex(zl,1); /* First score. */
506 redisAssert(p != NULL);
507 score = zzlGetScore(p);
508 if (!zslValueLteMax(score,range))
509 return 0;
510
511 return 1;
512 }
513
514 /* Find pointer to the first element contained in the specified range.
515 * Returns NULL when no element is contained in the range. */
516 unsigned char *zzlFirstInRange(unsigned char *zl, zrangespec range) {
517 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
518 double score;
519
520 /* If everything is out of range, return early. */
521 if (!zzlIsInRange(zl,&range)) return NULL;
522
523 while (eptr != NULL) {
524 sptr = ziplistNext(zl,eptr);
525 redisAssert(sptr != NULL);
526
527 score = zzlGetScore(sptr);
528 if (zslValueGteMin(score,&range)) {
529 /* Check if score <= max. */
530 if (zslValueLteMax(score,&range))
531 return eptr;
532 return NULL;
533 }
534
535 /* Move to next element. */
536 eptr = ziplistNext(zl,sptr);
537 }
538
539 return NULL;
540 }
541
542 /* Find pointer to the last element contained in the specified range.
543 * Returns NULL when no element is contained in the range. */
544 unsigned char *zzlLastInRange(unsigned char *zl, zrangespec range) {
545 unsigned char *eptr = ziplistIndex(zl,-2), *sptr;
546 double score;
547
548 /* If everything is out of range, return early. */
549 if (!zzlIsInRange(zl,&range)) return NULL;
550
551 while (eptr != NULL) {
552 sptr = ziplistNext(zl,eptr);
553 redisAssert(sptr != NULL);
554
555 score = zzlGetScore(sptr);
556 if (zslValueLteMax(score,&range)) {
557 /* Check if score >= min. */
558 if (zslValueGteMin(score,&range))
559 return eptr;
560 return NULL;
561 }
562
563 /* Move to previous element by moving to the score of previous element.
564 * When this returns NULL, we know there also is no element. */
565 sptr = ziplistPrev(zl,eptr);
566 if (sptr != NULL)
567 redisAssert((eptr = ziplistPrev(zl,sptr)) != NULL);
568 else
569 eptr = NULL;
570 }
571
572 return NULL;
573 }
574
575 unsigned char *zzlFind(unsigned char *zl, robj *ele, double *score) {
576 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
577
578 ele = getDecodedObject(ele);
579 while (eptr != NULL) {
580 sptr = ziplistNext(zl,eptr);
581 redisAssertWithInfo(NULL,ele,sptr != NULL);
582
583 if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr))) {
584 /* Matching element, pull out score. */
585 if (score != NULL) *score = zzlGetScore(sptr);
586 decrRefCount(ele);
587 return eptr;
588 }
589
590 /* Move to next element. */
591 eptr = ziplistNext(zl,sptr);
592 }
593
594 decrRefCount(ele);
595 return NULL;
596 }
597
/* Delete the (element,score) pair starting at 'eptr' from the ziplist and
 * return the resulting ziplist. A local cursor is used so that the pointer
 * given as argument is not modified. */
unsigned char *zzlDelete(unsigned char *zl, unsigned char *eptr) {
    unsigned char *cursor = eptr;
    int i;

    /* TODO: add function to ziplist API to delete N elements from offset. */
    for (i = 0; i < 2; i++)
        zl = ziplistDelete(zl,&cursor);
    return zl;
}
608
609 unsigned char *zzlInsertAt(unsigned char *zl, unsigned char *eptr, robj *ele, double score) {
610 unsigned char *sptr;
611 char scorebuf[128];
612 int scorelen;
613 size_t offset;
614
615 redisAssertWithInfo(NULL,ele,ele->encoding == REDIS_ENCODING_RAW);
616 scorelen = d2string(scorebuf,sizeof(scorebuf),score);
617 if (eptr == NULL) {
618 zl = ziplistPush(zl,ele->ptr,sdslen(ele->ptr),ZIPLIST_TAIL);
619 zl = ziplistPush(zl,(unsigned char*)scorebuf,scorelen,ZIPLIST_TAIL);
620 } else {
621 /* Keep offset relative to zl, as it might be re-allocated. */
622 offset = eptr-zl;
623 zl = ziplistInsert(zl,eptr,ele->ptr,sdslen(ele->ptr));
624 eptr = zl+offset;
625
626 /* Insert score after the element. */
627 redisAssertWithInfo(NULL,ele,(sptr = ziplistNext(zl,eptr)) != NULL);
628 zl = ziplistInsert(zl,sptr,(unsigned char*)scorebuf,scorelen);
629 }
630
631 return zl;
632 }
633
634 /* Insert (element,score) pair in ziplist. This function assumes the element is
635 * not yet present in the list. */
636 unsigned char *zzlInsert(unsigned char *zl, robj *ele, double score) {
637 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
638 double s;
639
640 ele = getDecodedObject(ele);
641 while (eptr != NULL) {
642 sptr = ziplistNext(zl,eptr);
643 redisAssertWithInfo(NULL,ele,sptr != NULL);
644 s = zzlGetScore(sptr);
645
646 if (s > score) {
647 /* First element with score larger than score for element to be
648 * inserted. This means we should take its spot in the list to
649 * maintain ordering. */
650 zl = zzlInsertAt(zl,eptr,ele,score);
651 break;
652 } else if (s == score) {
653 /* Ensure lexicographical ordering for elements. */
654 if (zzlCompareElements(eptr,ele->ptr,sdslen(ele->ptr)) > 0) {
655 zl = zzlInsertAt(zl,eptr,ele,score);
656 break;
657 }
658 }
659
660 /* Move to next element. */
661 eptr = ziplistNext(zl,sptr);
662 }
663
664 /* Push on tail of list when it was not yet inserted. */
665 if (eptr == NULL)
666 zl = zzlInsertAt(zl,NULL,ele,score);
667
668 decrRefCount(ele);
669 return zl;
670 }
671
672 unsigned char *zzlDeleteRangeByScore(unsigned char *zl, zrangespec range, unsigned long *deleted) {
673 unsigned char *eptr, *sptr;
674 double score;
675 unsigned long num = 0;
676
677 if (deleted != NULL) *deleted = 0;
678
679 eptr = zzlFirstInRange(zl,range);
680 if (eptr == NULL) return zl;
681
682 /* When the tail of the ziplist is deleted, eptr will point to the sentinel
683 * byte and ziplistNext will return NULL. */
684 while ((sptr = ziplistNext(zl,eptr)) != NULL) {
685 score = zzlGetScore(sptr);
686 if (zslValueLteMax(score,&range)) {
687 /* Delete both the element and the score. */
688 zl = ziplistDelete(zl,&eptr);
689 zl = ziplistDelete(zl,&eptr);
690 num++;
691 } else {
692 /* No longer in range. */
693 break;
694 }
695 }
696
697 if (deleted != NULL) *deleted = num;
698 return zl;
699 }
700
701 /* Delete all the elements with rank between start and end from the skiplist.
702 * Start and end are inclusive. Note that start and end need to be 1-based */
703 unsigned char *zzlDeleteRangeByRank(unsigned char *zl, unsigned int start, unsigned int end, unsigned long *deleted) {
704 unsigned int num = (end-start)+1;
705 if (deleted) *deleted = num;
706 zl = ziplistDeleteRange(zl,2*(start-1),2*num);
707 return zl;
708 }
709
710 /*-----------------------------------------------------------------------------
711 * Common sorted set API
712 *----------------------------------------------------------------------------*/
713
714 unsigned int zsetLength(robj *zobj) {
715 int length = -1;
716 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
717 length = zzlLength(zobj->ptr);
718 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
719 length = ((zset*)zobj->ptr)->zsl->length;
720 } else {
721 redisPanic("Unknown sorted set encoding");
722 }
723 return length;
724 }
725
726 void zsetConvert(robj *zobj, int encoding) {
727 zset *zs;
728 zskiplistNode *node, *next;
729 robj *ele;
730 double score;
731
732 if (zobj->encoding == encoding) return;
733 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
734 unsigned char *zl = zobj->ptr;
735 unsigned char *eptr, *sptr;
736 unsigned char *vstr;
737 unsigned int vlen;
738 long long vlong;
739
740 if (encoding != REDIS_ENCODING_SKIPLIST)
741 redisPanic("Unknown target encoding");
742
743 zs = zmalloc(sizeof(*zs));
744 zs->dict = dictCreate(&zsetDictType,NULL);
745 zs->zsl = zslCreate();
746
747 eptr = ziplistIndex(zl,0);
748 redisAssertWithInfo(NULL,zobj,eptr != NULL);
749 sptr = ziplistNext(zl,eptr);
750 redisAssertWithInfo(NULL,zobj,sptr != NULL);
751
752 while (eptr != NULL) {
753 score = zzlGetScore(sptr);
754 redisAssertWithInfo(NULL,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
755 if (vstr == NULL)
756 ele = createStringObjectFromLongLong(vlong);
757 else
758 ele = createStringObject((char*)vstr,vlen);
759
760 /* Has incremented refcount since it was just created. */
761 node = zslInsert(zs->zsl,score,ele);
762 redisAssertWithInfo(NULL,zobj,dictAdd(zs->dict,ele,&node->score) == DICT_OK);
763 incrRefCount(ele); /* Added to dictionary. */
764 zzlNext(zl,&eptr,&sptr);
765 }
766
767 zfree(zobj->ptr);
768 zobj->ptr = zs;
769 zobj->encoding = REDIS_ENCODING_SKIPLIST;
770 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
771 unsigned char *zl = ziplistNew();
772
773 if (encoding != REDIS_ENCODING_ZIPLIST)
774 redisPanic("Unknown target encoding");
775
776 /* Approach similar to zslFree(), since we want to free the skiplist at
777 * the same time as creating the ziplist. */
778 zs = zobj->ptr;
779 dictRelease(zs->dict);
780 node = zs->zsl->header->level[0].forward;
781 zfree(zs->zsl->header);
782 zfree(zs->zsl);
783
784 while (node) {
785 ele = getDecodedObject(node->obj);
786 zl = zzlInsertAt(zl,NULL,ele,node->score);
787 decrRefCount(ele);
788
789 next = node->level[0].forward;
790 zslFreeNode(node);
791 node = next;
792 }
793
794 zfree(zs);
795 zobj->ptr = zl;
796 zobj->encoding = REDIS_ENCODING_ZIPLIST;
797 } else {
798 redisPanic("Unknown sorted set encoding");
799 }
800 }
801
802 /*-----------------------------------------------------------------------------
803 * Sorted set commands
804 *----------------------------------------------------------------------------*/
805
806 /* This generic command implements both ZADD and ZINCRBY. */
807 void zaddGenericCommand(redisClient *c, int incr) {
808 static char *nanerr = "resulting score is not a number (NaN)";
809 robj *key = c->argv[1];
810 robj *ele;
811 robj *zobj;
812 robj *curobj;
813 double score = 0, *scores, curscore = 0.0;
814 int j, elements = (c->argc-2)/2;
815 int added = 0;
816
817 if (c->argc % 2) {
818 addReply(c,shared.syntaxerr);
819 return;
820 }
821
822 /* Start parsing all the scores, we need to emit any syntax error
823 * before executing additions to the sorted set, as the command should
824 * either execute fully or nothing at all. */
825 scores = zmalloc(sizeof(double)*elements);
826 for (j = 0; j < elements; j++) {
827 if (getDoubleFromObjectOrReply(c,c->argv[2+j*2],&scores[j],NULL)
828 != REDIS_OK)
829 {
830 zfree(scores);
831 return;
832 }
833 }
834
835 /* Lookup the key and create the sorted set if does not exist. */
836 zobj = lookupKeyWrite(c->db,key);
837 if (zobj == NULL) {
838 if (server.zset_max_ziplist_entries == 0 ||
839 server.zset_max_ziplist_value < sdslen(c->argv[3]->ptr))
840 {
841 zobj = createZsetObject();
842 } else {
843 zobj = createZsetZiplistObject();
844 }
845 dbAdd(c->db,key,zobj);
846 } else {
847 if (zobj->type != REDIS_ZSET) {
848 addReply(c,shared.wrongtypeerr);
849 zfree(scores);
850 return;
851 }
852 }
853
854 for (j = 0; j < elements; j++) {
855 score = scores[j];
856
857 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
858 unsigned char *eptr;
859
860 /* Prefer non-encoded element when dealing with ziplists. */
861 ele = c->argv[3+j*2];
862 if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
863 if (incr) {
864 score += curscore;
865 if (isnan(score)) {
866 addReplyError(c,nanerr);
867 /* Don't need to check if the sorted set is empty
868 * because we know it has at least one element. */
869 zfree(scores);
870 return;
871 }
872 }
873
874 /* Remove and re-insert when score changed. */
875 if (score != curscore) {
876 zobj->ptr = zzlDelete(zobj->ptr,eptr);
877 zobj->ptr = zzlInsert(zobj->ptr,ele,score);
878
879 signalModifiedKey(c->db,key);
880 server.dirty++;
881 }
882 } else {
883 /* Optimize: check if the element is too large or the list
884 * becomes too long *before* executing zzlInsert. */
885 zobj->ptr = zzlInsert(zobj->ptr,ele,score);
886 if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries)
887 zsetConvert(zobj,REDIS_ENCODING_SKIPLIST);
888 if (sdslen(ele->ptr) > server.zset_max_ziplist_value)
889 zsetConvert(zobj,REDIS_ENCODING_SKIPLIST);
890
891 signalModifiedKey(c->db,key);
892 server.dirty++;
893 if (!incr) added++;
894 }
895 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
896 zset *zs = zobj->ptr;
897 zskiplistNode *znode;
898 dictEntry *de;
899
900 ele = c->argv[3+j*2] = tryObjectEncoding(c->argv[3+j*2]);
901 de = dictFind(zs->dict,ele);
902 if (de != NULL) {
903 curobj = dictGetKey(de);
904 curscore = *(double*)dictGetVal(de);
905
906 if (incr) {
907 score += curscore;
908 if (isnan(score)) {
909 addReplyError(c,nanerr);
910 /* Don't need to check if the sorted set is empty
911 * because we know it has at least one element. */
912 zfree(scores);
913 return;
914 }
915 }
916
917 /* Remove and re-insert when score changed. We can safely
918 * delete the key object from the skiplist, since the
919 * dictionary still has a reference to it. */
920 if (score != curscore) {
921 redisAssertWithInfo(c,curobj,zslDelete(zs->zsl,curscore,curobj));
922 znode = zslInsert(zs->zsl,score,curobj);
923 incrRefCount(curobj); /* Re-inserted in skiplist. */
924 dictGetVal(de) = &znode->score; /* Update score ptr. */
925
926 signalModifiedKey(c->db,key);
927 server.dirty++;
928 }
929 } else {
930 znode = zslInsert(zs->zsl,score,ele);
931 incrRefCount(ele); /* Inserted in skiplist. */
932 redisAssertWithInfo(c,NULL,dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
933 incrRefCount(ele); /* Added to dictionary. */
934
935 signalModifiedKey(c->db,key);
936 server.dirty++;
937 if (!incr) added++;
938 }
939 } else {
940 redisPanic("Unknown sorted set encoding");
941 }
942 }
943 zfree(scores);
944 if (incr) /* ZINCRBY */
945 addReplyDouble(c,score);
946 else /* ZADD */
947 addReplyLongLong(c,added);
948 }
949
950 void zaddCommand(redisClient *c) {
951 zaddGenericCommand(c,0);
952 }
953
954 void zincrbyCommand(redisClient *c) {
955 zaddGenericCommand(c,1);
956 }
957
958 void zremCommand(redisClient *c) {
959 robj *key = c->argv[1];
960 robj *zobj;
961 int deleted = 0, j;
962
963 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
964 checkType(c,zobj,REDIS_ZSET)) return;
965
966 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
967 unsigned char *eptr;
968
969 for (j = 2; j < c->argc; j++) {
970 if ((eptr = zzlFind(zobj->ptr,c->argv[j],NULL)) != NULL) {
971 deleted++;
972 zobj->ptr = zzlDelete(zobj->ptr,eptr);
973 if (zzlLength(zobj->ptr) == 0) {
974 dbDelete(c->db,key);
975 break;
976 }
977 }
978 }
979 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
980 zset *zs = zobj->ptr;
981 dictEntry *de;
982 double score;
983
984 for (j = 2; j < c->argc; j++) {
985 de = dictFind(zs->dict,c->argv[j]);
986 if (de != NULL) {
987 deleted++;
988
989 /* Delete from the skiplist */
990 score = *(double*)dictGetVal(de);
991 redisAssertWithInfo(c,c->argv[j],zslDelete(zs->zsl,score,c->argv[j]));
992
993 /* Delete from the hash table */
994 dictDelete(zs->dict,c->argv[j]);
995 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
996 if (dictSize(zs->dict) == 0) {
997 dbDelete(c->db,key);
998 break;
999 }
1000 }
1001 }
1002 } else {
1003 redisPanic("Unknown sorted set encoding");
1004 }
1005
1006 if (deleted) {
1007 signalModifiedKey(c->db,key);
1008 server.dirty += deleted;
1009 }
1010 addReplyLongLong(c,deleted);
1011 }
1012
1013 void zremrangebyscoreCommand(redisClient *c) {
1014 robj *key = c->argv[1];
1015 robj *zobj;
1016 zrangespec range;
1017 unsigned long deleted;
1018
1019 /* Parse the range arguments. */
1020 if (zslParseRange(c->argv[2],c->argv[3],&range) != REDIS_OK) {
1021 addReplyError(c,"min or max is not a float");
1022 return;
1023 }
1024
1025 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
1026 checkType(c,zobj,REDIS_ZSET)) return;
1027
1028 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1029 zobj->ptr = zzlDeleteRangeByScore(zobj->ptr,range,&deleted);
1030 if (zzlLength(zobj->ptr) == 0) dbDelete(c->db,key);
1031 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
1032 zset *zs = zobj->ptr;
1033 deleted = zslDeleteRangeByScore(zs->zsl,range,zs->dict);
1034 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
1035 if (dictSize(zs->dict) == 0) dbDelete(c->db,key);
1036 } else {
1037 redisPanic("Unknown sorted set encoding");
1038 }
1039
1040 if (deleted) signalModifiedKey(c->db,key);
1041 server.dirty += deleted;
1042 addReplyLongLong(c,deleted);
1043 }
1044
1045 void zremrangebyrankCommand(redisClient *c) {
1046 robj *key = c->argv[1];
1047 robj *zobj;
1048 long start;
1049 long end;
1050 int llen;
1051 unsigned long deleted;
1052
1053 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
1054 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
1055
1056 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
1057 checkType(c,zobj,REDIS_ZSET)) return;
1058
1059 /* Sanitize indexes. */
1060 llen = zsetLength(zobj);
1061 if (start < 0) start = llen+start;
1062 if (end < 0) end = llen+end;
1063 if (start < 0) start = 0;
1064
1065 /* Invariant: start >= 0, so this test will be true when end < 0.
1066 * The range is empty when start > end or start >= length. */
1067 if (start > end || start >= llen) {
1068 addReply(c,shared.czero);
1069 return;
1070 }
1071 if (end >= llen) end = llen-1;
1072
1073 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1074 /* Correct for 1-based rank. */
1075 zobj->ptr = zzlDeleteRangeByRank(zobj->ptr,start+1,end+1,&deleted);
1076 if (zzlLength(zobj->ptr) == 0) dbDelete(c->db,key);
1077 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
1078 zset *zs = zobj->ptr;
1079
1080 /* Correct for 1-based rank. */
1081 deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);
1082 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
1083 if (dictSize(zs->dict) == 0) dbDelete(c->db,key);
1084 } else {
1085 redisPanic("Unknown sorted set encoding");
1086 }
1087
1088 if (deleted) signalModifiedKey(c->db,key);
1089 server.dirty += deleted;
1090 addReplyLongLong(c,deleted);
1091 }
1092
1093 typedef struct {
1094 robj *subject;
1095 int type; /* Set, sorted set */
1096 int encoding;
1097 double weight;
1098
1099 union {
1100 /* Set iterators. */
1101 union _iterset {
1102 struct {
1103 intset *is;
1104 int ii;
1105 } is;
1106 struct {
1107 dict *dict;
1108 dictIterator *di;
1109 dictEntry *de;
1110 } ht;
1111 } set;
1112
1113 /* Sorted set iterators. */
1114 union _iterzset {
1115 struct {
1116 unsigned char *zl;
1117 unsigned char *eptr, *sptr;
1118 } zl;
1119 struct {
1120 zset *zs;
1121 zskiplistNode *node;
1122 } sl;
1123 } zset;
1124 } iter;
1125 } zsetopsrc;
1126
1127
1128 /* Use dirty flags for pointers that need to be cleaned up in the next
1129 * iteration over the zsetopval. The dirty flag for the long long value is
1130 * special, since long long values don't need cleanup. Instead, it means that
1131 * we already checked that "ell" holds a long long, or tried to convert another
1132 * representation into a long long value. When this was successful,
1133 * OPVAL_VALID_LL is set as well. */
1134 #define OPVAL_DIRTY_ROBJ 1
1135 #define OPVAL_DIRTY_LL 2
1136 #define OPVAL_VALID_LL 4
1137
1138 /* Store value retrieved from the iterator. */
1139 typedef struct {
1140 int flags;
1141 unsigned char _buf[32]; /* Private buffer. */
1142 robj *ele;
1143 unsigned char *estr;
1144 unsigned int elen;
1145 long long ell;
1146 double score;
1147 } zsetopval;
1148
1149 typedef union _iterset iterset;
1150 typedef union _iterzset iterzset;
1151
1152 void zuiInitIterator(zsetopsrc *op) {
1153 if (op->subject == NULL)
1154 return;
1155
1156 if (op->type == REDIS_SET) {
1157 iterset *it = &op->iter.set;
1158 if (op->encoding == REDIS_ENCODING_INTSET) {
1159 it->is.is = op->subject->ptr;
1160 it->is.ii = 0;
1161 } else if (op->encoding == REDIS_ENCODING_HT) {
1162 it->ht.dict = op->subject->ptr;
1163 it->ht.di = dictGetIterator(op->subject->ptr);
1164 it->ht.de = dictNext(it->ht.di);
1165 } else {
1166 redisPanic("Unknown set encoding");
1167 }
1168 } else if (op->type == REDIS_ZSET) {
1169 iterzset *it = &op->iter.zset;
1170 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1171 it->zl.zl = op->subject->ptr;
1172 it->zl.eptr = ziplistIndex(it->zl.zl,0);
1173 if (it->zl.eptr != NULL) {
1174 it->zl.sptr = ziplistNext(it->zl.zl,it->zl.eptr);
1175 redisAssert(it->zl.sptr != NULL);
1176 }
1177 } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
1178 it->sl.zs = op->subject->ptr;
1179 it->sl.node = it->sl.zs->zsl->header->level[0].forward;
1180 } else {
1181 redisPanic("Unknown sorted set encoding");
1182 }
1183 } else {
1184 redisPanic("Unsupported type");
1185 }
1186 }
1187
1188 void zuiClearIterator(zsetopsrc *op) {
1189 if (op->subject == NULL)
1190 return;
1191
1192 if (op->type == REDIS_SET) {
1193 iterset *it = &op->iter.set;
1194 if (op->encoding == REDIS_ENCODING_INTSET) {
1195 REDIS_NOTUSED(it); /* skip */
1196 } else if (op->encoding == REDIS_ENCODING_HT) {
1197 dictReleaseIterator(it->ht.di);
1198 } else {
1199 redisPanic("Unknown set encoding");
1200 }
1201 } else if (op->type == REDIS_ZSET) {
1202 iterzset *it = &op->iter.zset;
1203 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1204 REDIS_NOTUSED(it); /* skip */
1205 } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
1206 REDIS_NOTUSED(it); /* skip */
1207 } else {
1208 redisPanic("Unknown sorted set encoding");
1209 }
1210 } else {
1211 redisPanic("Unsupported type");
1212 }
1213 }
1214
1215 int zuiLength(zsetopsrc *op) {
1216 if (op->subject == NULL)
1217 return 0;
1218
1219 if (op->type == REDIS_SET) {
1220 iterset *it = &op->iter.set;
1221 if (op->encoding == REDIS_ENCODING_INTSET) {
1222 return intsetLen(it->is.is);
1223 } else if (op->encoding == REDIS_ENCODING_HT) {
1224 return dictSize(it->ht.dict);
1225 } else {
1226 redisPanic("Unknown set encoding");
1227 }
1228 } else if (op->type == REDIS_ZSET) {
1229 iterzset *it = &op->iter.zset;
1230 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1231 return zzlLength(it->zl.zl);
1232 } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
1233 return it->sl.zs->zsl->length;
1234 } else {
1235 redisPanic("Unknown sorted set encoding");
1236 }
1237 } else {
1238 redisPanic("Unsupported type");
1239 }
1240 }
1241
1242 /* Check if the current value is valid. If so, store it in the passed structure
1243 * and move to the next element. If not valid, this means we have reached the
1244 * end of the structure and can abort. */
1245 int zuiNext(zsetopsrc *op, zsetopval *val) {
1246 if (op->subject == NULL)
1247 return 0;
1248
1249 if (val->flags & OPVAL_DIRTY_ROBJ)
1250 decrRefCount(val->ele);
1251
1252 bzero(val,sizeof(zsetopval));
1253
1254 if (op->type == REDIS_SET) {
1255 iterset *it = &op->iter.set;
1256 if (op->encoding == REDIS_ENCODING_INTSET) {
1257 if (!intsetGet(it->is.is,it->is.ii,(int64_t*)&val->ell))
1258 return 0;
1259 val->score = 1.0;
1260
1261 /* Move to next element. */
1262 it->is.ii++;
1263 } else if (op->encoding == REDIS_ENCODING_HT) {
1264 if (it->ht.de == NULL)
1265 return 0;
1266 val->ele = dictGetKey(it->ht.de);
1267 val->score = 1.0;
1268
1269 /* Move to next element. */
1270 it->ht.de = dictNext(it->ht.di);
1271 } else {
1272 redisPanic("Unknown set encoding");
1273 }
1274 } else if (op->type == REDIS_ZSET) {
1275 iterzset *it = &op->iter.zset;
1276 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1277 /* No need to check both, but better be explicit. */
1278 if (it->zl.eptr == NULL || it->zl.sptr == NULL)
1279 return 0;
1280 redisAssert(ziplistGet(it->zl.eptr,&val->estr,&val->elen,&val->ell));
1281 val->score = zzlGetScore(it->zl.sptr);
1282
1283 /* Move to next element. */
1284 zzlNext(it->zl.zl,&it->zl.eptr,&it->zl.sptr);
1285 } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
1286 if (it->sl.node == NULL)
1287 return 0;
1288 val->ele = it->sl.node->obj;
1289 val->score = it->sl.node->score;
1290
1291 /* Move to next element. */
1292 it->sl.node = it->sl.node->level[0].forward;
1293 } else {
1294 redisPanic("Unknown sorted set encoding");
1295 }
1296 } else {
1297 redisPanic("Unsupported type");
1298 }
1299 return 1;
1300 }
1301
1302 int zuiLongLongFromValue(zsetopval *val) {
1303 if (!(val->flags & OPVAL_DIRTY_LL)) {
1304 val->flags |= OPVAL_DIRTY_LL;
1305
1306 if (val->ele != NULL) {
1307 if (val->ele->encoding == REDIS_ENCODING_INT) {
1308 val->ell = (long)val->ele->ptr;
1309 val->flags |= OPVAL_VALID_LL;
1310 } else if (val->ele->encoding == REDIS_ENCODING_RAW) {
1311 if (string2ll(val->ele->ptr,sdslen(val->ele->ptr),&val->ell))
1312 val->flags |= OPVAL_VALID_LL;
1313 } else {
1314 redisPanic("Unsupported element encoding");
1315 }
1316 } else if (val->estr != NULL) {
1317 if (string2ll((char*)val->estr,val->elen,&val->ell))
1318 val->flags |= OPVAL_VALID_LL;
1319 } else {
1320 /* The long long was already set, flag as valid. */
1321 val->flags |= OPVAL_VALID_LL;
1322 }
1323 }
1324 return val->flags & OPVAL_VALID_LL;
1325 }
1326
1327 robj *zuiObjectFromValue(zsetopval *val) {
1328 if (val->ele == NULL) {
1329 if (val->estr != NULL) {
1330 val->ele = createStringObject((char*)val->estr,val->elen);
1331 } else {
1332 val->ele = createStringObjectFromLongLong(val->ell);
1333 }
1334 val->flags |= OPVAL_DIRTY_ROBJ;
1335 }
1336 return val->ele;
1337 }
1338
1339 int zuiBufferFromValue(zsetopval *val) {
1340 if (val->estr == NULL) {
1341 if (val->ele != NULL) {
1342 if (val->ele->encoding == REDIS_ENCODING_INT) {
1343 val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),(long)val->ele->ptr);
1344 val->estr = val->_buf;
1345 } else if (val->ele->encoding == REDIS_ENCODING_RAW) {
1346 val->elen = sdslen(val->ele->ptr);
1347 val->estr = val->ele->ptr;
1348 } else {
1349 redisPanic("Unsupported element encoding");
1350 }
1351 } else {
1352 val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),val->ell);
1353 val->estr = val->_buf;
1354 }
1355 }
1356 return 1;
1357 }
1358
1359 /* Find value pointed to by val in the source pointer to by op. When found,
1360 * return 1 and store its score in target. Return 0 otherwise. */
1361 int zuiFind(zsetopsrc *op, zsetopval *val, double *score) {
1362 if (op->subject == NULL)
1363 return 0;
1364
1365 if (op->type == REDIS_SET) {
1366 iterset *it = &op->iter.set;
1367
1368 if (op->encoding == REDIS_ENCODING_INTSET) {
1369 if (zuiLongLongFromValue(val) && intsetFind(it->is.is,val->ell)) {
1370 *score = 1.0;
1371 return 1;
1372 } else {
1373 return 0;
1374 }
1375 } else if (op->encoding == REDIS_ENCODING_HT) {
1376 zuiObjectFromValue(val);
1377 if (dictFind(it->ht.dict,val->ele) != NULL) {
1378 *score = 1.0;
1379 return 1;
1380 } else {
1381 return 0;
1382 }
1383 } else {
1384 redisPanic("Unknown set encoding");
1385 }
1386 } else if (op->type == REDIS_ZSET) {
1387 iterzset *it = &op->iter.zset;
1388 zuiObjectFromValue(val);
1389
1390 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1391 if (zzlFind(it->zl.zl,val->ele,score) != NULL) {
1392 /* Score is already set by zzlFind. */
1393 return 1;
1394 } else {
1395 return 0;
1396 }
1397 } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
1398 dictEntry *de;
1399 if ((de = dictFind(it->sl.zs->dict,val->ele)) != NULL) {
1400 *score = *(double*)dictGetVal(de);
1401 return 1;
1402 } else {
1403 return 0;
1404 }
1405 } else {
1406 redisPanic("Unknown sorted set encoding");
1407 }
1408 } else {
1409 redisPanic("Unsupported type");
1410 }
1411 }
1412
1413 int zuiCompareByCardinality(const void *s1, const void *s2) {
1414 return zuiLength((zsetopsrc*)s1) - zuiLength((zsetopsrc*)s2);
1415 }
1416
/* How the scores of an element found in several inputs are combined. */
#define REDIS_AGGR_SUM 1
#define REDIS_AGGR_MIN 2
#define REDIS_AGGR_MAX 3
#define zunionInterDictValue(_e) (dictGetVal(_e) == NULL ? 1.0 : *(double*)dictGetVal(_e))

/* Fold 'val' into '*target' according to 'aggregate', which is one of the
 * REDIS_AGGR_* values. Any other value triggers a panic. */
inline static void zunionInterAggregate(double *target, double val, int aggregate) {
    switch (aggregate) {
    case REDIS_AGGR_SUM:
        *target = *target + val;
        /* Adding +inf and -inf gives NaN: by convention the result of such
         * a sum is 0.0. */
        if (isnan(*target)) *target = 0.0;
        break;
    case REDIS_AGGR_MIN:
        if (val < *target) *target = val;
        break;
    case REDIS_AGGR_MAX:
        if (val > *target) *target = val;
        break;
    default:
        /* safety net */
        redisPanic("Unknown ZUNION/INTER aggregate type");
    }
}
1438
1439 void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
1440 int i, j, setnum;
1441 int aggregate = REDIS_AGGR_SUM;
1442 zsetopsrc *src;
1443 zsetopval zval;
1444 robj *tmp;
1445 unsigned int maxelelen = 0;
1446 robj *dstobj;
1447 zset *dstzset;
1448 zskiplistNode *znode;
1449 int touched = 0;
1450
1451 /* expect setnum input keys to be given */
1452 setnum = atoi(c->argv[2]->ptr);
1453 if (setnum < 1) {
1454 addReplyError(c,
1455 "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE");
1456 return;
1457 }
1458
1459 /* test if the expected number of keys would overflow */
1460 if (3+setnum > c->argc) {
1461 addReply(c,shared.syntaxerr);
1462 return;
1463 }
1464
1465 /* read keys to be used for input */
1466 src = zcalloc(sizeof(zsetopsrc) * setnum);
1467 for (i = 0, j = 3; i < setnum; i++, j++) {
1468 robj *obj = lookupKeyWrite(c->db,c->argv[j]);
1469 if (obj != NULL) {
1470 if (obj->type != REDIS_ZSET && obj->type != REDIS_SET) {
1471 zfree(src);
1472 addReply(c,shared.wrongtypeerr);
1473 return;
1474 }
1475
1476 src[i].subject = obj;
1477 src[i].type = obj->type;
1478 src[i].encoding = obj->encoding;
1479 } else {
1480 src[i].subject = NULL;
1481 }
1482
1483 /* Default all weights to 1. */
1484 src[i].weight = 1.0;
1485 }
1486
1487 /* parse optional extra arguments */
1488 if (j < c->argc) {
1489 int remaining = c->argc - j;
1490
1491 while (remaining) {
1492 if (remaining >= (setnum + 1) && !strcasecmp(c->argv[j]->ptr,"weights")) {
1493 j++; remaining--;
1494 for (i = 0; i < setnum; i++, j++, remaining--) {
1495 if (getDoubleFromObjectOrReply(c,c->argv[j],&src[i].weight,
1496 "weight value is not a float") != REDIS_OK)
1497 {
1498 zfree(src);
1499 return;
1500 }
1501 }
1502 } else if (remaining >= 2 && !strcasecmp(c->argv[j]->ptr,"aggregate")) {
1503 j++; remaining--;
1504 if (!strcasecmp(c->argv[j]->ptr,"sum")) {
1505 aggregate = REDIS_AGGR_SUM;
1506 } else if (!strcasecmp(c->argv[j]->ptr,"min")) {
1507 aggregate = REDIS_AGGR_MIN;
1508 } else if (!strcasecmp(c->argv[j]->ptr,"max")) {
1509 aggregate = REDIS_AGGR_MAX;
1510 } else {
1511 zfree(src);
1512 addReply(c,shared.syntaxerr);
1513 return;
1514 }
1515 j++; remaining--;
1516 } else {
1517 zfree(src);
1518 addReply(c,shared.syntaxerr);
1519 return;
1520 }
1521 }
1522 }
1523
1524 for (i = 0; i < setnum; i++)
1525 zuiInitIterator(&src[i]);
1526
1527 /* sort sets from the smallest to largest, this will improve our
1528 * algorithm's performance */
1529 qsort(src,setnum,sizeof(zsetopsrc),zuiCompareByCardinality);
1530
1531 dstobj = createZsetObject();
1532 dstzset = dstobj->ptr;
1533 memset(&zval, 0, sizeof(zval));
1534
1535 if (op == REDIS_OP_INTER) {
1536 /* Skip everything if the smallest input is empty. */
1537 if (zuiLength(&src[0]) > 0) {
1538 /* Precondition: as src[0] is non-empty and the inputs are ordered
1539 * by size, all src[i > 0] are non-empty too. */
1540 while (zuiNext(&src[0],&zval)) {
1541 double score, value;
1542
1543 score = src[0].weight * zval.score;
1544 for (j = 1; j < setnum; j++) {
1545 /* It is not safe to access the zset we are
1546 * iterating, so explicitly check for equal object. */
1547 if (src[j].subject == src[0].subject) {
1548 value = zval.score*src[j].weight;
1549 zunionInterAggregate(&score,value,aggregate);
1550 } else if (zuiFind(&src[j],&zval,&value)) {
1551 value *= src[j].weight;
1552 zunionInterAggregate(&score,value,aggregate);
1553 } else {
1554 break;
1555 }
1556 }
1557
1558 /* Only continue when present in every input. */
1559 if (j == setnum) {
1560 tmp = zuiObjectFromValue(&zval);
1561 znode = zslInsert(dstzset->zsl,score,tmp);
1562 incrRefCount(tmp); /* added to skiplist */
1563 dictAdd(dstzset->dict,tmp,&znode->score);
1564 incrRefCount(tmp); /* added to dictionary */
1565
1566 if (tmp->encoding == REDIS_ENCODING_RAW)
1567 if (sdslen(tmp->ptr) > maxelelen)
1568 maxelelen = sdslen(tmp->ptr);
1569 }
1570 }
1571 }
1572 } else if (op == REDIS_OP_UNION) {
1573 for (i = 0; i < setnum; i++) {
1574 if (zuiLength(&src[i]) == 0)
1575 continue;
1576
1577 while (zuiNext(&src[i],&zval)) {
1578 double score, value;
1579
1580 /* Skip key when already processed */
1581 if (dictFind(dstzset->dict,zuiObjectFromValue(&zval)) != NULL)
1582 continue;
1583
1584 /* Initialize score */
1585 score = src[i].weight * zval.score;
1586
1587 /* Because the inputs are sorted by size, it's only possible
1588 * for sets at larger indices to hold this element. */
1589 for (j = (i+1); j < setnum; j++) {
1590 /* It is not safe to access the zset we are
1591 * iterating, so explicitly check for equal object. */
1592 if(src[j].subject == src[i].subject) {
1593 value = zval.score*src[j].weight;
1594 zunionInterAggregate(&score,value,aggregate);
1595 } else if (zuiFind(&src[j],&zval,&value)) {
1596 value *= src[j].weight;
1597 zunionInterAggregate(&score,value,aggregate);
1598 }
1599 }
1600
1601 tmp = zuiObjectFromValue(&zval);
1602 znode = zslInsert(dstzset->zsl,score,tmp);
1603 incrRefCount(zval.ele); /* added to skiplist */
1604 dictAdd(dstzset->dict,tmp,&znode->score);
1605 incrRefCount(zval.ele); /* added to dictionary */
1606
1607 if (tmp->encoding == REDIS_ENCODING_RAW)
1608 if (sdslen(tmp->ptr) > maxelelen)
1609 maxelelen = sdslen(tmp->ptr);
1610 }
1611 }
1612 } else {
1613 redisPanic("Unknown operator");
1614 }
1615
1616 for (i = 0; i < setnum; i++)
1617 zuiClearIterator(&src[i]);
1618
1619 if (dbDelete(c->db,dstkey)) {
1620 signalModifiedKey(c->db,dstkey);
1621 touched = 1;
1622 server.dirty++;
1623 }
1624 if (dstzset->zsl->length) {
1625 /* Convert to ziplist when in limits. */
1626 if (dstzset->zsl->length <= server.zset_max_ziplist_entries &&
1627 maxelelen <= server.zset_max_ziplist_value)
1628 zsetConvert(dstobj,REDIS_ENCODING_ZIPLIST);
1629
1630 dbAdd(c->db,dstkey,dstobj);
1631 addReplyLongLong(c,zsetLength(dstobj));
1632 if (!touched) signalModifiedKey(c->db,dstkey);
1633 server.dirty++;
1634 } else {
1635 decrRefCount(dstobj);
1636 addReply(c,shared.czero);
1637 }
1638 zfree(src);
1639 }
1640
1641 void zunionstoreCommand(redisClient *c) {
1642 zunionInterGenericCommand(c,c->argv[1], REDIS_OP_UNION);
1643 }
1644
1645 void zinterstoreCommand(redisClient *c) {
1646 zunionInterGenericCommand(c,c->argv[1], REDIS_OP_INTER);
1647 }
1648
1649 void zrangeGenericCommand(redisClient *c, int reverse) {
1650 robj *key = c->argv[1];
1651 robj *zobj;
1652 int withscores = 0;
1653 long start;
1654 long end;
1655 int llen;
1656 int rangelen;
1657
1658 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
1659 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
1660
1661 if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
1662 withscores = 1;
1663 } else if (c->argc >= 5) {
1664 addReply(c,shared.syntaxerr);
1665 return;
1666 }
1667
1668 if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL
1669 || checkType(c,zobj,REDIS_ZSET)) return;
1670
1671 /* Sanitize indexes. */
1672 llen = zsetLength(zobj);
1673 if (start < 0) start = llen+start;
1674 if (end < 0) end = llen+end;
1675 if (start < 0) start = 0;
1676
1677 /* Invariant: start >= 0, so this test will be true when end < 0.
1678 * The range is empty when start > end or start >= length. */
1679 if (start > end || start >= llen) {
1680 addReply(c,shared.emptymultibulk);
1681 return;
1682 }
1683 if (end >= llen) end = llen-1;
1684 rangelen = (end-start)+1;
1685
1686 /* Return the result in form of a multi-bulk reply */
1687 addReplyMultiBulkLen(c, withscores ? (rangelen*2) : rangelen);
1688
1689 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1690 unsigned char *zl = zobj->ptr;
1691 unsigned char *eptr, *sptr;
1692 unsigned char *vstr;
1693 unsigned int vlen;
1694 long long vlong;
1695
1696 if (reverse)
1697 eptr = ziplistIndex(zl,-2-(2*start));
1698 else
1699 eptr = ziplistIndex(zl,2*start);
1700
1701 redisAssertWithInfo(c,zobj,eptr != NULL);
1702 sptr = ziplistNext(zl,eptr);
1703
1704 while (rangelen--) {
1705 redisAssertWithInfo(c,zobj,eptr != NULL && sptr != NULL);
1706 redisAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
1707 if (vstr == NULL)
1708 addReplyBulkLongLong(c,vlong);
1709 else
1710 addReplyBulkCBuffer(c,vstr,vlen);
1711
1712 if (withscores)
1713 addReplyDouble(c,zzlGetScore(sptr));
1714
1715 if (reverse)
1716 zzlPrev(zl,&eptr,&sptr);
1717 else
1718 zzlNext(zl,&eptr,&sptr);
1719 }
1720
1721 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
1722 zset *zs = zobj->ptr;
1723 zskiplist *zsl = zs->zsl;
1724 zskiplistNode *ln;
1725 robj *ele;
1726
1727 /* Check if starting point is trivial, before doing log(N) lookup. */
1728 if (reverse) {
1729 ln = zsl->tail;
1730 if (start > 0)
1731 ln = zslGetElementByRank(zsl,llen-start);
1732 } else {
1733 ln = zsl->header->level[0].forward;
1734 if (start > 0)
1735 ln = zslGetElementByRank(zsl,start+1);
1736 }
1737
1738 while(rangelen--) {
1739 redisAssertWithInfo(c,zobj,ln != NULL);
1740 ele = ln->obj;
1741 addReplyBulk(c,ele);
1742 if (withscores)
1743 addReplyDouble(c,ln->score);
1744 ln = reverse ? ln->backward : ln->level[0].forward;
1745 }
1746 } else {
1747 redisPanic("Unknown sorted set encoding");
1748 }
1749 }
1750
1751 void zrangeCommand(redisClient *c) {
1752 zrangeGenericCommand(c,0);
1753 }
1754
1755 void zrevrangeCommand(redisClient *c) {
1756 zrangeGenericCommand(c,1);
1757 }
1758
1759 /* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE. */
1760 void genericZrangebyscoreCommand(redisClient *c, int reverse) {
1761 zrangespec range;
1762 robj *key = c->argv[1];
1763 robj *zobj;
1764 int offset = 0, limit = -1;
1765 int withscores = 0;
1766 unsigned long rangelen = 0;
1767 void *replylen = NULL;
1768 int minidx, maxidx;
1769
1770 /* Parse the range arguments. */
1771 if (reverse) {
1772 /* Range is given as [max,min] */
1773 maxidx = 2; minidx = 3;
1774 } else {
1775 /* Range is given as [min,max] */
1776 minidx = 2; maxidx = 3;
1777 }
1778
1779 if (zslParseRange(c->argv[minidx],c->argv[maxidx],&range) != REDIS_OK) {
1780 addReplyError(c,"min or max is not a float");
1781 return;
1782 }
1783
1784 /* Parse optional extra arguments. Note that ZCOUNT will exactly have
1785 * 4 arguments, so we'll never enter the following code path. */
1786 if (c->argc > 4) {
1787 int remaining = c->argc - 4;
1788 int pos = 4;
1789
1790 while (remaining) {
1791 if (remaining >= 1 && !strcasecmp(c->argv[pos]->ptr,"withscores")) {
1792 pos++; remaining--;
1793 withscores = 1;
1794 } else if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr,"limit")) {
1795 offset = atoi(c->argv[pos+1]->ptr);
1796 limit = atoi(c->argv[pos+2]->ptr);
1797 pos += 3; remaining -= 3;
1798 } else {
1799 addReply(c,shared.syntaxerr);
1800 return;
1801 }
1802 }
1803 }
1804
1805 /* Ok, lookup the key and get the range */
1806 if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL ||
1807 checkType(c,zobj,REDIS_ZSET)) return;
1808
1809 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1810 unsigned char *zl = zobj->ptr;
1811 unsigned char *eptr, *sptr;
1812 unsigned char *vstr;
1813 unsigned int vlen;
1814 long long vlong;
1815 double score;
1816
1817 /* If reversed, get the last node in range as starting point. */
1818 if (reverse) {
1819 eptr = zzlLastInRange(zl,range);
1820 } else {
1821 eptr = zzlFirstInRange(zl,range);
1822 }
1823
1824 /* No "first" element in the specified interval. */
1825 if (eptr == NULL) {
1826 addReply(c, shared.emptymultibulk);
1827 return;
1828 }
1829
1830 /* Get score pointer for the first element. */
1831 redisAssertWithInfo(c,zobj,eptr != NULL);
1832 sptr = ziplistNext(zl,eptr);
1833
1834 /* We don't know in advance how many matching elements there are in the
1835 * list, so we push this object that will represent the multi-bulk
1836 * length in the output buffer, and will "fix" it later */
1837 replylen = addDeferredMultiBulkLength(c);
1838
1839 /* If there is an offset, just traverse the number of elements without
1840 * checking the score because that is done in the next loop. */
1841 while (eptr && offset--) {
1842 if (reverse) {
1843 zzlPrev(zl,&eptr,&sptr);
1844 } else {
1845 zzlNext(zl,&eptr,&sptr);
1846 }
1847 }
1848
1849 while (eptr && limit--) {
1850 score = zzlGetScore(sptr);
1851
1852 /* Abort when the node is no longer in range. */
1853 if (reverse) {
1854 if (!zslValueGteMin(score,&range)) break;
1855 } else {
1856 if (!zslValueLteMax(score,&range)) break;
1857 }
1858
1859 /* We know the element exists, so ziplistGet should always succeed */
1860 redisAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
1861
1862 rangelen++;
1863 if (vstr == NULL) {
1864 addReplyBulkLongLong(c,vlong);
1865 } else {
1866 addReplyBulkCBuffer(c,vstr,vlen);
1867 }
1868
1869 if (withscores) {
1870 addReplyDouble(c,score);
1871 }
1872
1873 /* Move to next node */
1874 if (reverse) {
1875 zzlPrev(zl,&eptr,&sptr);
1876 } else {
1877 zzlNext(zl,&eptr,&sptr);
1878 }
1879 }
1880 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
1881 zset *zs = zobj->ptr;
1882 zskiplist *zsl = zs->zsl;
1883 zskiplistNode *ln;
1884
1885 /* If reversed, get the last node in range as starting point. */
1886 if (reverse) {
1887 ln = zslLastInRange(zsl,range);
1888 } else {
1889 ln = zslFirstInRange(zsl,range);
1890 }
1891
1892 /* No "first" element in the specified interval. */
1893 if (ln == NULL) {
1894 addReply(c, shared.emptymultibulk);
1895 return;
1896 }
1897
1898 /* We don't know in advance how many matching elements there are in the
1899 * list, so we push this object that will represent the multi-bulk
1900 * length in the output buffer, and will "fix" it later */
1901 replylen = addDeferredMultiBulkLength(c);
1902
1903 /* If there is an offset, just traverse the number of elements without
1904 * checking the score because that is done in the next loop. */
1905 while (ln && offset--) {
1906 if (reverse) {
1907 ln = ln->backward;
1908 } else {
1909 ln = ln->level[0].forward;
1910 }
1911 }
1912
1913 while (ln && limit--) {
1914 /* Abort when the node is no longer in range. */
1915 if (reverse) {
1916 if (!zslValueGteMin(ln->score,&range)) break;
1917 } else {
1918 if (!zslValueLteMax(ln->score,&range)) break;
1919 }
1920
1921 rangelen++;
1922 addReplyBulk(c,ln->obj);
1923
1924 if (withscores) {
1925 addReplyDouble(c,ln->score);
1926 }
1927
1928 /* Move to next node */
1929 if (reverse) {
1930 ln = ln->backward;
1931 } else {
1932 ln = ln->level[0].forward;
1933 }
1934 }
1935 } else {
1936 redisPanic("Unknown sorted set encoding");
1937 }
1938
1939 if (withscores) {
1940 rangelen *= 2;
1941 }
1942
1943 setDeferredMultiBulkLength(c, replylen, rangelen);
1944 }
1945
1946 void zrangebyscoreCommand(redisClient *c) {
1947 genericZrangebyscoreCommand(c,0);
1948 }
1949
1950 void zrevrangebyscoreCommand(redisClient *c) {
1951 genericZrangebyscoreCommand(c,1);
1952 }
1953
1954 void zcountCommand(redisClient *c) {
1955 robj *key = c->argv[1];
1956 robj *zobj;
1957 zrangespec range;
1958 int count = 0;
1959
1960 /* Parse the range arguments */
1961 if (zslParseRange(c->argv[2],c->argv[3],&range) != REDIS_OK) {
1962 addReplyError(c,"min or max is not a float");
1963 return;
1964 }
1965
1966 /* Lookup the sorted set */
1967 if ((zobj = lookupKeyReadOrReply(c, key, shared.czero)) == NULL ||
1968 checkType(c, zobj, REDIS_ZSET)) return;
1969
1970 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1971 unsigned char *zl = zobj->ptr;
1972 unsigned char *eptr, *sptr;
1973 double score;
1974
1975 /* Use the first element in range as the starting point */
1976 eptr = zzlFirstInRange(zl,range);
1977
1978 /* No "first" element */
1979 if (eptr == NULL) {
1980 addReply(c, shared.czero);
1981 return;
1982 }
1983
1984 /* First element is in range */
1985 sptr = ziplistNext(zl,eptr);
1986 score = zzlGetScore(sptr);
1987 redisAssertWithInfo(c,zobj,zslValueLteMax(score,&range));
1988
1989 /* Iterate over elements in range */
1990 while (eptr) {
1991 score = zzlGetScore(sptr);
1992
1993 /* Abort when the node is no longer in range. */
1994 if (!zslValueLteMax(score,&range)) {
1995 break;
1996 } else {
1997 count++;
1998 zzlNext(zl,&eptr,&sptr);
1999 }
2000 }
2001 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
2002 zset *zs = zobj->ptr;
2003 zskiplist *zsl = zs->zsl;
2004 zskiplistNode *zn;
2005 unsigned long rank;
2006
2007 /* Find first element in range */
2008 zn = zslFirstInRange(zsl, range);
2009
2010 /* Use rank of first element, if any, to determine preliminary count */
2011 if (zn != NULL) {
2012 rank = zslGetRank(zsl, zn->score, zn->obj);
2013 count = (zsl->length - (rank - 1));
2014
2015 /* Find last element in range */
2016 zn = zslLastInRange(zsl, range);
2017
2018 /* Use rank of last element, if any, to determine the actual count */
2019 if (zn != NULL) {
2020 rank = zslGetRank(zsl, zn->score, zn->obj);
2021 count -= (zsl->length - rank);
2022 }
2023 }
2024 } else {
2025 redisPanic("Unknown sorted set encoding");
2026 }
2027
2028 addReplyLongLong(c, count);
2029 }
2030
2031 void zcardCommand(redisClient *c) {
2032 robj *key = c->argv[1];
2033 robj *zobj;
2034
2035 if ((zobj = lookupKeyReadOrReply(c,key,shared.czero)) == NULL ||
2036 checkType(c,zobj,REDIS_ZSET)) return;
2037
2038 addReplyLongLong(c,zsetLength(zobj));
2039 }
2040
2041 void zscoreCommand(redisClient *c) {
2042 robj *key = c->argv[1];
2043 robj *zobj;
2044 double score;
2045
2046 if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
2047 checkType(c,zobj,REDIS_ZSET)) return;
2048
2049 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
2050 if (zzlFind(zobj->ptr,c->argv[2],&score) != NULL)
2051 addReplyDouble(c,score);
2052 else
2053 addReply(c,shared.nullbulk);
2054 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
2055 zset *zs = zobj->ptr;
2056 dictEntry *de;
2057
2058 c->argv[2] = tryObjectEncoding(c->argv[2]);
2059 de = dictFind(zs->dict,c->argv[2]);
2060 if (de != NULL) {
2061 score = *(double*)dictGetVal(de);
2062 addReplyDouble(c,score);
2063 } else {
2064 addReply(c,shared.nullbulk);
2065 }
2066 } else {
2067 redisPanic("Unknown sorted set encoding");
2068 }
2069 }
2070
2071 void zrankGenericCommand(redisClient *c, int reverse) {
2072 robj *key = c->argv[1];
2073 robj *ele = c->argv[2];
2074 robj *zobj;
2075 unsigned long llen;
2076 unsigned long rank;
2077
2078 if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
2079 checkType(c,zobj,REDIS_ZSET)) return;
2080 llen = zsetLength(zobj);
2081
2082 redisAssertWithInfo(c,ele,ele->encoding == REDIS_ENCODING_RAW);
2083 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
2084 unsigned char *zl = zobj->ptr;
2085 unsigned char *eptr, *sptr;
2086
2087 eptr = ziplistIndex(zl,0);
2088 redisAssertWithInfo(c,zobj,eptr != NULL);
2089 sptr = ziplistNext(zl,eptr);
2090 redisAssertWithInfo(c,zobj,sptr != NULL);
2091
2092 rank = 1;
2093 while(eptr != NULL) {
2094 if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr)))
2095 break;
2096 rank++;
2097 zzlNext(zl,&eptr,&sptr);
2098 }
2099
2100 if (eptr != NULL) {
2101 if (reverse)
2102 addReplyLongLong(c,llen-rank);
2103 else
2104 addReplyLongLong(c,rank-1);
2105 } else {
2106 addReply(c,shared.nullbulk);
2107 }
2108 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
2109 zset *zs = zobj->ptr;
2110 zskiplist *zsl = zs->zsl;
2111 dictEntry *de;
2112 double score;
2113
2114 ele = c->argv[2] = tryObjectEncoding(c->argv[2]);
2115 de = dictFind(zs->dict,ele);
2116 if (de != NULL) {
2117 score = *(double*)dictGetVal(de);
2118 rank = zslGetRank(zsl,score,ele);
2119 redisAssertWithInfo(c,ele,rank); /* Existing elements always have a rank. */
2120 if (reverse)
2121 addReplyLongLong(c,llen-rank);
2122 else
2123 addReplyLongLong(c,rank-1);
2124 } else {
2125 addReply(c,shared.nullbulk);
2126 }
2127 } else {
2128 redisPanic("Unknown sorted set encoding");
2129 }
2130 }
2131
2132 void zrankCommand(redisClient *c) {
2133 zrankGenericCommand(c, 0);
2134 }
2135
2136 void zrevrankCommand(redisClient *c) {
2137 zrankGenericCommand(c, 1);
2138 }