]> git.saurik.com Git - redis.git/blob - src/t_zset.c
MIGRATE count of cached sockets in INFO output.
[redis.git] / src / t_zset.c
1 /*
2 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
3 * Copyright (c) 2009-2012, Pieter Noordhuis <pcnoordhuis at gmail dot com>
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
8 *
9 * * Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * * Neither the name of Redis nor the names of its contributors may be used
15 * to endorse or promote products derived from this software without
16 * specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 /*-----------------------------------------------------------------------------
32 * Sorted set API
33 *----------------------------------------------------------------------------*/
34
35 /* ZSETs are ordered sets using two data structures to hold the same elements
36 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
37 * data structure.
38 *
39 * The elements are added to an hash table mapping Redis objects to scores.
40 * At the same time the elements are added to a skip list mapping scores
41 * to Redis objects (so objects are sorted by scores in this "view"). */
42
43 /* This skiplist implementation is almost a C translation of the original
44 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
45 * Alternative to Balanced Trees", modified in three ways:
46 * a) this implementation allows for repeated scores.
47 * b) the comparison is not just by key (our 'score') but by satellite data.
48 * c) there is a back pointer, so it's a doubly linked list with the back
49 * pointers being only at "level 1". This allows to traverse the list
50 * from tail to head, useful for ZREVRANGE. */
51
52 #include "redis.h"
53 #include <math.h>
54
55 zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
56 zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
57 zn->score = score;
58 zn->obj = obj;
59 return zn;
60 }
61
62 zskiplist *zslCreate(void) {
63 int j;
64 zskiplist *zsl;
65
66 zsl = zmalloc(sizeof(*zsl));
67 zsl->level = 1;
68 zsl->length = 0;
69 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
70 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
71 zsl->header->level[j].forward = NULL;
72 zsl->header->level[j].span = 0;
73 }
74 zsl->header->backward = NULL;
75 zsl->tail = NULL;
76 return zsl;
77 }
78
79 void zslFreeNode(zskiplistNode *node) {
80 decrRefCount(node->obj);
81 zfree(node);
82 }
83
84 void zslFree(zskiplist *zsl) {
85 zskiplistNode *node = zsl->header->level[0].forward, *next;
86
87 zfree(zsl->header);
88 while(node) {
89 next = node->level[0].forward;
90 zslFreeNode(node);
91 node = next;
92 }
93 zfree(zsl);
94 }
95
96 /* Returns a random level for the new skiplist node we are going to create.
97 * The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL
98 * (both inclusive), with a powerlaw-alike distribution where higher
99 * levels are less likely to be returned. */
100 int zslRandomLevel(void) {
101 int level = 1;
102 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
103 level += 1;
104 return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
105 }
106
107 zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
108 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
109 unsigned int rank[ZSKIPLIST_MAXLEVEL];
110 int i, level;
111
112 redisAssert(!isnan(score));
113 x = zsl->header;
114 for (i = zsl->level-1; i >= 0; i--) {
115 /* store rank that is crossed to reach the insert position */
116 rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
117 while (x->level[i].forward &&
118 (x->level[i].forward->score < score ||
119 (x->level[i].forward->score == score &&
120 compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
121 rank[i] += x->level[i].span;
122 x = x->level[i].forward;
123 }
124 update[i] = x;
125 }
126 /* we assume the key is not already inside, since we allow duplicated
127 * scores, and the re-insertion of score and redis object should never
128 * happpen since the caller of zslInsert() should test in the hash table
129 * if the element is already inside or not. */
130 level = zslRandomLevel();
131 if (level > zsl->level) {
132 for (i = zsl->level; i < level; i++) {
133 rank[i] = 0;
134 update[i] = zsl->header;
135 update[i]->level[i].span = zsl->length;
136 }
137 zsl->level = level;
138 }
139 x = zslCreateNode(level,score,obj);
140 for (i = 0; i < level; i++) {
141 x->level[i].forward = update[i]->level[i].forward;
142 update[i]->level[i].forward = x;
143
144 /* update span covered by update[i] as x is inserted here */
145 x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
146 update[i]->level[i].span = (rank[0] - rank[i]) + 1;
147 }
148
149 /* increment span for untouched levels */
150 for (i = level; i < zsl->level; i++) {
151 update[i]->level[i].span++;
152 }
153
154 x->backward = (update[0] == zsl->header) ? NULL : update[0];
155 if (x->level[0].forward)
156 x->level[0].forward->backward = x;
157 else
158 zsl->tail = x;
159 zsl->length++;
160 return x;
161 }
162
163 /* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
164 void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
165 int i;
166 for (i = 0; i < zsl->level; i++) {
167 if (update[i]->level[i].forward == x) {
168 update[i]->level[i].span += x->level[i].span - 1;
169 update[i]->level[i].forward = x->level[i].forward;
170 } else {
171 update[i]->level[i].span -= 1;
172 }
173 }
174 if (x->level[0].forward) {
175 x->level[0].forward->backward = x->backward;
176 } else {
177 zsl->tail = x->backward;
178 }
179 while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
180 zsl->level--;
181 zsl->length--;
182 }
183
184 /* Delete an element with matching score/object from the skiplist. */
185 int zslDelete(zskiplist *zsl, double score, robj *obj) {
186 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
187 int i;
188
189 x = zsl->header;
190 for (i = zsl->level-1; i >= 0; i--) {
191 while (x->level[i].forward &&
192 (x->level[i].forward->score < score ||
193 (x->level[i].forward->score == score &&
194 compareStringObjects(x->level[i].forward->obj,obj) < 0)))
195 x = x->level[i].forward;
196 update[i] = x;
197 }
198 /* We may have multiple elements with the same score, what we need
199 * is to find the element with both the right score and object. */
200 x = x->level[0].forward;
201 if (x && score == x->score && equalStringObjects(x->obj,obj)) {
202 zslDeleteNode(zsl, x, update);
203 zslFreeNode(x);
204 return 1;
205 } else {
206 return 0; /* not found */
207 }
208 return 0; /* not found */
209 }
210
211 static int zslValueGteMin(double value, zrangespec *spec) {
212 return spec->minex ? (value > spec->min) : (value >= spec->min);
213 }
214
215 static int zslValueLteMax(double value, zrangespec *spec) {
216 return spec->maxex ? (value < spec->max) : (value <= spec->max);
217 }
218
219 /* Returns if there is a part of the zset is in range. */
220 int zslIsInRange(zskiplist *zsl, zrangespec *range) {
221 zskiplistNode *x;
222
223 /* Test for ranges that will always be empty. */
224 if (range->min > range->max ||
225 (range->min == range->max && (range->minex || range->maxex)))
226 return 0;
227 x = zsl->tail;
228 if (x == NULL || !zslValueGteMin(x->score,range))
229 return 0;
230 x = zsl->header->level[0].forward;
231 if (x == NULL || !zslValueLteMax(x->score,range))
232 return 0;
233 return 1;
234 }
235
236 /* Find the first node that is contained in the specified range.
237 * Returns NULL when no element is contained in the range. */
238 zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec range) {
239 zskiplistNode *x;
240 int i;
241
242 /* If everything is out of range, return early. */
243 if (!zslIsInRange(zsl,&range)) return NULL;
244
245 x = zsl->header;
246 for (i = zsl->level-1; i >= 0; i--) {
247 /* Go forward while *OUT* of range. */
248 while (x->level[i].forward &&
249 !zslValueGteMin(x->level[i].forward->score,&range))
250 x = x->level[i].forward;
251 }
252
253 /* This is an inner range, so the next node cannot be NULL. */
254 x = x->level[0].forward;
255 redisAssert(x != NULL);
256
257 /* Check if score <= max. */
258 if (!zslValueLteMax(x->score,&range)) return NULL;
259 return x;
260 }
261
262 /* Find the last node that is contained in the specified range.
263 * Returns NULL when no element is contained in the range. */
264 zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec range) {
265 zskiplistNode *x;
266 int i;
267
268 /* If everything is out of range, return early. */
269 if (!zslIsInRange(zsl,&range)) return NULL;
270
271 x = zsl->header;
272 for (i = zsl->level-1; i >= 0; i--) {
273 /* Go forward while *IN* range. */
274 while (x->level[i].forward &&
275 zslValueLteMax(x->level[i].forward->score,&range))
276 x = x->level[i].forward;
277 }
278
279 /* This is an inner range, so this node cannot be NULL. */
280 redisAssert(x != NULL);
281
282 /* Check if score >= min. */
283 if (!zslValueGteMin(x->score,&range)) return NULL;
284 return x;
285 }
286
287 /* Delete all the elements with score between min and max from the skiplist.
288 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
289 * Note that this function takes the reference to the hash table view of the
290 * sorted set, in order to remove the elements from the hash table too. */
291 unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec range, dict *dict) {
292 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
293 unsigned long removed = 0;
294 int i;
295
296 x = zsl->header;
297 for (i = zsl->level-1; i >= 0; i--) {
298 while (x->level[i].forward && (range.minex ?
299 x->level[i].forward->score <= range.min :
300 x->level[i].forward->score < range.min))
301 x = x->level[i].forward;
302 update[i] = x;
303 }
304
305 /* Current node is the last with score < or <= min. */
306 x = x->level[0].forward;
307
308 /* Delete nodes while in range. */
309 while (x && (range.maxex ? x->score < range.max : x->score <= range.max)) {
310 zskiplistNode *next = x->level[0].forward;
311 zslDeleteNode(zsl,x,update);
312 dictDelete(dict,x->obj);
313 zslFreeNode(x);
314 removed++;
315 x = next;
316 }
317 return removed;
318 }
319
320 /* Delete all the elements with rank between start and end from the skiplist.
321 * Start and end are inclusive. Note that start and end need to be 1-based */
322 unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict) {
323 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
324 unsigned long traversed = 0, removed = 0;
325 int i;
326
327 x = zsl->header;
328 for (i = zsl->level-1; i >= 0; i--) {
329 while (x->level[i].forward && (traversed + x->level[i].span) < start) {
330 traversed += x->level[i].span;
331 x = x->level[i].forward;
332 }
333 update[i] = x;
334 }
335
336 traversed++;
337 x = x->level[0].forward;
338 while (x && traversed <= end) {
339 zskiplistNode *next = x->level[0].forward;
340 zslDeleteNode(zsl,x,update);
341 dictDelete(dict,x->obj);
342 zslFreeNode(x);
343 removed++;
344 traversed++;
345 x = next;
346 }
347 return removed;
348 }
349
350 /* Find the rank for an element by both score and key.
351 * Returns 0 when the element cannot be found, rank otherwise.
352 * Note that the rank is 1-based due to the span of zsl->header to the
353 * first element. */
354 unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) {
355 zskiplistNode *x;
356 unsigned long rank = 0;
357 int i;
358
359 x = zsl->header;
360 for (i = zsl->level-1; i >= 0; i--) {
361 while (x->level[i].forward &&
362 (x->level[i].forward->score < score ||
363 (x->level[i].forward->score == score &&
364 compareStringObjects(x->level[i].forward->obj,o) <= 0))) {
365 rank += x->level[i].span;
366 x = x->level[i].forward;
367 }
368
369 /* x might be equal to zsl->header, so test if obj is non-NULL */
370 if (x->obj && equalStringObjects(x->obj,o)) {
371 return rank;
372 }
373 }
374 return 0;
375 }
376
377 /* Finds an element by its rank. The rank argument needs to be 1-based. */
378 zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
379 zskiplistNode *x;
380 unsigned long traversed = 0;
381 int i;
382
383 x = zsl->header;
384 for (i = zsl->level-1; i >= 0; i--) {
385 while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
386 {
387 traversed += x->level[i].span;
388 x = x->level[i].forward;
389 }
390 if (traversed == rank) {
391 return x;
392 }
393 }
394 return NULL;
395 }
396
397 /* Populate the rangespec according to the objects min and max. */
398 static int zslParseRange(robj *min, robj *max, zrangespec *spec) {
399 char *eptr;
400 spec->minex = spec->maxex = 0;
401
402 /* Parse the min-max interval. If one of the values is prefixed
403 * by the "(" character, it's considered "open". For instance
404 * ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max
405 * ZRANGEBYSCORE zset 1.5 2.5 will instead match min <= x <= max */
406 if (min->encoding == REDIS_ENCODING_INT) {
407 spec->min = (long)min->ptr;
408 } else {
409 if (((char*)min->ptr)[0] == '(') {
410 spec->min = strtod((char*)min->ptr+1,&eptr);
411 if (eptr[0] != '\0' || isnan(spec->min)) return REDIS_ERR;
412 spec->minex = 1;
413 } else {
414 spec->min = strtod((char*)min->ptr,&eptr);
415 if (eptr[0] != '\0' || isnan(spec->min)) return REDIS_ERR;
416 }
417 }
418 if (max->encoding == REDIS_ENCODING_INT) {
419 spec->max = (long)max->ptr;
420 } else {
421 if (((char*)max->ptr)[0] == '(') {
422 spec->max = strtod((char*)max->ptr+1,&eptr);
423 if (eptr[0] != '\0' || isnan(spec->max)) return REDIS_ERR;
424 spec->maxex = 1;
425 } else {
426 spec->max = strtod((char*)max->ptr,&eptr);
427 if (eptr[0] != '\0' || isnan(spec->max)) return REDIS_ERR;
428 }
429 }
430
431 return REDIS_OK;
432 }
433
434 /*-----------------------------------------------------------------------------
435 * Ziplist-backed sorted set API
436 *----------------------------------------------------------------------------*/
437
438 double zzlGetScore(unsigned char *sptr) {
439 unsigned char *vstr;
440 unsigned int vlen;
441 long long vlong;
442 char buf[128];
443 double score;
444
445 redisAssert(sptr != NULL);
446 redisAssert(ziplistGet(sptr,&vstr,&vlen,&vlong));
447
448 if (vstr) {
449 memcpy(buf,vstr,vlen);
450 buf[vlen] = '\0';
451 score = strtod(buf,NULL);
452 } else {
453 score = vlong;
454 }
455
456 return score;
457 }
458
459 /* Compare element in sorted set with given element. */
460 int zzlCompareElements(unsigned char *eptr, unsigned char *cstr, unsigned int clen) {
461 unsigned char *vstr;
462 unsigned int vlen;
463 long long vlong;
464 unsigned char vbuf[32];
465 int minlen, cmp;
466
467 redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
468 if (vstr == NULL) {
469 /* Store string representation of long long in buf. */
470 vlen = ll2string((char*)vbuf,sizeof(vbuf),vlong);
471 vstr = vbuf;
472 }
473
474 minlen = (vlen < clen) ? vlen : clen;
475 cmp = memcmp(vstr,cstr,minlen);
476 if (cmp == 0) return vlen-clen;
477 return cmp;
478 }
479
480 unsigned int zzlLength(unsigned char *zl) {
481 return ziplistLen(zl)/2;
482 }
483
484 /* Move to next entry based on the values in eptr and sptr. Both are set to
485 * NULL when there is no next entry. */
486 void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
487 unsigned char *_eptr, *_sptr;
488 redisAssert(*eptr != NULL && *sptr != NULL);
489
490 _eptr = ziplistNext(zl,*sptr);
491 if (_eptr != NULL) {
492 _sptr = ziplistNext(zl,_eptr);
493 redisAssert(_sptr != NULL);
494 } else {
495 /* No next entry. */
496 _sptr = NULL;
497 }
498
499 *eptr = _eptr;
500 *sptr = _sptr;
501 }
502
503 /* Move to the previous entry based on the values in eptr and sptr. Both are
504 * set to NULL when there is no next entry. */
505 void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
506 unsigned char *_eptr, *_sptr;
507 redisAssert(*eptr != NULL && *sptr != NULL);
508
509 _sptr = ziplistPrev(zl,*eptr);
510 if (_sptr != NULL) {
511 _eptr = ziplistPrev(zl,_sptr);
512 redisAssert(_eptr != NULL);
513 } else {
514 /* No previous entry. */
515 _eptr = NULL;
516 }
517
518 *eptr = _eptr;
519 *sptr = _sptr;
520 }
521
522 /* Returns if there is a part of the zset is in range. Should only be used
523 * internally by zzlFirstInRange and zzlLastInRange. */
524 int zzlIsInRange(unsigned char *zl, zrangespec *range) {
525 unsigned char *p;
526 double score;
527
528 /* Test for ranges that will always be empty. */
529 if (range->min > range->max ||
530 (range->min == range->max && (range->minex || range->maxex)))
531 return 0;
532
533 p = ziplistIndex(zl,-1); /* Last score. */
534 if (p == NULL) return 0; /* Empty sorted set */
535 score = zzlGetScore(p);
536 if (!zslValueGteMin(score,range))
537 return 0;
538
539 p = ziplistIndex(zl,1); /* First score. */
540 redisAssert(p != NULL);
541 score = zzlGetScore(p);
542 if (!zslValueLteMax(score,range))
543 return 0;
544
545 return 1;
546 }
547
548 /* Find pointer to the first element contained in the specified range.
549 * Returns NULL when no element is contained in the range. */
550 unsigned char *zzlFirstInRange(unsigned char *zl, zrangespec range) {
551 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
552 double score;
553
554 /* If everything is out of range, return early. */
555 if (!zzlIsInRange(zl,&range)) return NULL;
556
557 while (eptr != NULL) {
558 sptr = ziplistNext(zl,eptr);
559 redisAssert(sptr != NULL);
560
561 score = zzlGetScore(sptr);
562 if (zslValueGteMin(score,&range)) {
563 /* Check if score <= max. */
564 if (zslValueLteMax(score,&range))
565 return eptr;
566 return NULL;
567 }
568
569 /* Move to next element. */
570 eptr = ziplistNext(zl,sptr);
571 }
572
573 return NULL;
574 }
575
576 /* Find pointer to the last element contained in the specified range.
577 * Returns NULL when no element is contained in the range. */
578 unsigned char *zzlLastInRange(unsigned char *zl, zrangespec range) {
579 unsigned char *eptr = ziplistIndex(zl,-2), *sptr;
580 double score;
581
582 /* If everything is out of range, return early. */
583 if (!zzlIsInRange(zl,&range)) return NULL;
584
585 while (eptr != NULL) {
586 sptr = ziplistNext(zl,eptr);
587 redisAssert(sptr != NULL);
588
589 score = zzlGetScore(sptr);
590 if (zslValueLteMax(score,&range)) {
591 /* Check if score >= min. */
592 if (zslValueGteMin(score,&range))
593 return eptr;
594 return NULL;
595 }
596
597 /* Move to previous element by moving to the score of previous element.
598 * When this returns NULL, we know there also is no element. */
599 sptr = ziplistPrev(zl,eptr);
600 if (sptr != NULL)
601 redisAssert((eptr = ziplistPrev(zl,sptr)) != NULL);
602 else
603 eptr = NULL;
604 }
605
606 return NULL;
607 }
608
609 unsigned char *zzlFind(unsigned char *zl, robj *ele, double *score) {
610 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
611
612 ele = getDecodedObject(ele);
613 while (eptr != NULL) {
614 sptr = ziplistNext(zl,eptr);
615 redisAssertWithInfo(NULL,ele,sptr != NULL);
616
617 if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr))) {
618 /* Matching element, pull out score. */
619 if (score != NULL) *score = zzlGetScore(sptr);
620 decrRefCount(ele);
621 return eptr;
622 }
623
624 /* Move to next element. */
625 eptr = ziplistNext(zl,sptr);
626 }
627
628 decrRefCount(ele);
629 return NULL;
630 }
631
632 /* Delete (element,score) pair from ziplist. Use local copy of eptr because we
633 * don't want to modify the one given as argument. */
634 unsigned char *zzlDelete(unsigned char *zl, unsigned char *eptr) {
635 unsigned char *p = eptr;
636
637 /* TODO: add function to ziplist API to delete N elements from offset. */
638 zl = ziplistDelete(zl,&p);
639 zl = ziplistDelete(zl,&p);
640 return zl;
641 }
642
643 unsigned char *zzlInsertAt(unsigned char *zl, unsigned char *eptr, robj *ele, double score) {
644 unsigned char *sptr;
645 char scorebuf[128];
646 int scorelen;
647 size_t offset;
648
649 redisAssertWithInfo(NULL,ele,ele->encoding == REDIS_ENCODING_RAW);
650 scorelen = d2string(scorebuf,sizeof(scorebuf),score);
651 if (eptr == NULL) {
652 zl = ziplistPush(zl,ele->ptr,sdslen(ele->ptr),ZIPLIST_TAIL);
653 zl = ziplistPush(zl,(unsigned char*)scorebuf,scorelen,ZIPLIST_TAIL);
654 } else {
655 /* Keep offset relative to zl, as it might be re-allocated. */
656 offset = eptr-zl;
657 zl = ziplistInsert(zl,eptr,ele->ptr,sdslen(ele->ptr));
658 eptr = zl+offset;
659
660 /* Insert score after the element. */
661 redisAssertWithInfo(NULL,ele,(sptr = ziplistNext(zl,eptr)) != NULL);
662 zl = ziplistInsert(zl,sptr,(unsigned char*)scorebuf,scorelen);
663 }
664
665 return zl;
666 }
667
668 /* Insert (element,score) pair in ziplist. This function assumes the element is
669 * not yet present in the list. */
670 unsigned char *zzlInsert(unsigned char *zl, robj *ele, double score) {
671 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
672 double s;
673
674 ele = getDecodedObject(ele);
675 while (eptr != NULL) {
676 sptr = ziplistNext(zl,eptr);
677 redisAssertWithInfo(NULL,ele,sptr != NULL);
678 s = zzlGetScore(sptr);
679
680 if (s > score) {
681 /* First element with score larger than score for element to be
682 * inserted. This means we should take its spot in the list to
683 * maintain ordering. */
684 zl = zzlInsertAt(zl,eptr,ele,score);
685 break;
686 } else if (s == score) {
687 /* Ensure lexicographical ordering for elements. */
688 if (zzlCompareElements(eptr,ele->ptr,sdslen(ele->ptr)) > 0) {
689 zl = zzlInsertAt(zl,eptr,ele,score);
690 break;
691 }
692 }
693
694 /* Move to next element. */
695 eptr = ziplistNext(zl,sptr);
696 }
697
698 /* Push on tail of list when it was not yet inserted. */
699 if (eptr == NULL)
700 zl = zzlInsertAt(zl,NULL,ele,score);
701
702 decrRefCount(ele);
703 return zl;
704 }
705
706 unsigned char *zzlDeleteRangeByScore(unsigned char *zl, zrangespec range, unsigned long *deleted) {
707 unsigned char *eptr, *sptr;
708 double score;
709 unsigned long num = 0;
710
711 if (deleted != NULL) *deleted = 0;
712
713 eptr = zzlFirstInRange(zl,range);
714 if (eptr == NULL) return zl;
715
716 /* When the tail of the ziplist is deleted, eptr will point to the sentinel
717 * byte and ziplistNext will return NULL. */
718 while ((sptr = ziplistNext(zl,eptr)) != NULL) {
719 score = zzlGetScore(sptr);
720 if (zslValueLteMax(score,&range)) {
721 /* Delete both the element and the score. */
722 zl = ziplistDelete(zl,&eptr);
723 zl = ziplistDelete(zl,&eptr);
724 num++;
725 } else {
726 /* No longer in range. */
727 break;
728 }
729 }
730
731 if (deleted != NULL) *deleted = num;
732 return zl;
733 }
734
735 /* Delete all the elements with rank between start and end from the skiplist.
736 * Start and end are inclusive. Note that start and end need to be 1-based */
737 unsigned char *zzlDeleteRangeByRank(unsigned char *zl, unsigned int start, unsigned int end, unsigned long *deleted) {
738 unsigned int num = (end-start)+1;
739 if (deleted) *deleted = num;
740 zl = ziplistDeleteRange(zl,2*(start-1),2*num);
741 return zl;
742 }
743
744 /*-----------------------------------------------------------------------------
745 * Common sorted set API
746 *----------------------------------------------------------------------------*/
747
748 unsigned int zsetLength(robj *zobj) {
749 int length = -1;
750 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
751 length = zzlLength(zobj->ptr);
752 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
753 length = ((zset*)zobj->ptr)->zsl->length;
754 } else {
755 redisPanic("Unknown sorted set encoding");
756 }
757 return length;
758 }
759
760 void zsetConvert(robj *zobj, int encoding) {
761 zset *zs;
762 zskiplistNode *node, *next;
763 robj *ele;
764 double score;
765
766 if (zobj->encoding == encoding) return;
767 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
768 unsigned char *zl = zobj->ptr;
769 unsigned char *eptr, *sptr;
770 unsigned char *vstr;
771 unsigned int vlen;
772 long long vlong;
773
774 if (encoding != REDIS_ENCODING_SKIPLIST)
775 redisPanic("Unknown target encoding");
776
777 zs = zmalloc(sizeof(*zs));
778 zs->dict = dictCreate(&zsetDictType,NULL);
779 zs->zsl = zslCreate();
780
781 eptr = ziplistIndex(zl,0);
782 redisAssertWithInfo(NULL,zobj,eptr != NULL);
783 sptr = ziplistNext(zl,eptr);
784 redisAssertWithInfo(NULL,zobj,sptr != NULL);
785
786 while (eptr != NULL) {
787 score = zzlGetScore(sptr);
788 redisAssertWithInfo(NULL,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
789 if (vstr == NULL)
790 ele = createStringObjectFromLongLong(vlong);
791 else
792 ele = createStringObject((char*)vstr,vlen);
793
794 /* Has incremented refcount since it was just created. */
795 node = zslInsert(zs->zsl,score,ele);
796 redisAssertWithInfo(NULL,zobj,dictAdd(zs->dict,ele,&node->score) == DICT_OK);
797 incrRefCount(ele); /* Added to dictionary. */
798 zzlNext(zl,&eptr,&sptr);
799 }
800
801 zfree(zobj->ptr);
802 zobj->ptr = zs;
803 zobj->encoding = REDIS_ENCODING_SKIPLIST;
804 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
805 unsigned char *zl = ziplistNew();
806
807 if (encoding != REDIS_ENCODING_ZIPLIST)
808 redisPanic("Unknown target encoding");
809
810 /* Approach similar to zslFree(), since we want to free the skiplist at
811 * the same time as creating the ziplist. */
812 zs = zobj->ptr;
813 dictRelease(zs->dict);
814 node = zs->zsl->header->level[0].forward;
815 zfree(zs->zsl->header);
816 zfree(zs->zsl);
817
818 while (node) {
819 ele = getDecodedObject(node->obj);
820 zl = zzlInsertAt(zl,NULL,ele,node->score);
821 decrRefCount(ele);
822
823 next = node->level[0].forward;
824 zslFreeNode(node);
825 node = next;
826 }
827
828 zfree(zs);
829 zobj->ptr = zl;
830 zobj->encoding = REDIS_ENCODING_ZIPLIST;
831 } else {
832 redisPanic("Unknown sorted set encoding");
833 }
834 }
835
836 /*-----------------------------------------------------------------------------
837 * Sorted set commands
838 *----------------------------------------------------------------------------*/
839
840 /* This generic command implements both ZADD and ZINCRBY. */
841 void zaddGenericCommand(redisClient *c, int incr) {
842 static char *nanerr = "resulting score is not a number (NaN)";
843 robj *key = c->argv[1];
844 robj *ele;
845 robj *zobj;
846 robj *curobj;
847 double score = 0, *scores, curscore = 0.0;
848 int j, elements = (c->argc-2)/2;
849 int added = 0;
850
851 if (c->argc % 2) {
852 addReply(c,shared.syntaxerr);
853 return;
854 }
855
856 /* Start parsing all the scores, we need to emit any syntax error
857 * before executing additions to the sorted set, as the command should
858 * either execute fully or nothing at all. */
859 scores = zmalloc(sizeof(double)*elements);
860 for (j = 0; j < elements; j++) {
861 if (getDoubleFromObjectOrReply(c,c->argv[2+j*2],&scores[j],NULL)
862 != REDIS_OK)
863 {
864 zfree(scores);
865 return;
866 }
867 }
868
869 /* Lookup the key and create the sorted set if does not exist. */
870 zobj = lookupKeyWrite(c->db,key);
871 if (zobj == NULL) {
872 if (server.zset_max_ziplist_entries == 0 ||
873 server.zset_max_ziplist_value < sdslen(c->argv[3]->ptr))
874 {
875 zobj = createZsetObject();
876 } else {
877 zobj = createZsetZiplistObject();
878 }
879 dbAdd(c->db,key,zobj);
880 } else {
881 if (zobj->type != REDIS_ZSET) {
882 addReply(c,shared.wrongtypeerr);
883 zfree(scores);
884 return;
885 }
886 }
887
888 for (j = 0; j < elements; j++) {
889 score = scores[j];
890
891 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
892 unsigned char *eptr;
893
894 /* Prefer non-encoded element when dealing with ziplists. */
895 ele = c->argv[3+j*2];
896 if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
897 if (incr) {
898 score += curscore;
899 if (isnan(score)) {
900 addReplyError(c,nanerr);
901 /* Don't need to check if the sorted set is empty
902 * because we know it has at least one element. */
903 zfree(scores);
904 return;
905 }
906 }
907
908 /* Remove and re-insert when score changed. */
909 if (score != curscore) {
910 zobj->ptr = zzlDelete(zobj->ptr,eptr);
911 zobj->ptr = zzlInsert(zobj->ptr,ele,score);
912
913 signalModifiedKey(c->db,key);
914 server.dirty++;
915 }
916 } else {
917 /* Optimize: check if the element is too large or the list
918 * becomes too long *before* executing zzlInsert. */
919 zobj->ptr = zzlInsert(zobj->ptr,ele,score);
920 if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries)
921 zsetConvert(zobj,REDIS_ENCODING_SKIPLIST);
922 if (sdslen(ele->ptr) > server.zset_max_ziplist_value)
923 zsetConvert(zobj,REDIS_ENCODING_SKIPLIST);
924
925 signalModifiedKey(c->db,key);
926 server.dirty++;
927 if (!incr) added++;
928 }
929 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
930 zset *zs = zobj->ptr;
931 zskiplistNode *znode;
932 dictEntry *de;
933
934 ele = c->argv[3+j*2] = tryObjectEncoding(c->argv[3+j*2]);
935 de = dictFind(zs->dict,ele);
936 if (de != NULL) {
937 curobj = dictGetKey(de);
938 curscore = *(double*)dictGetVal(de);
939
940 if (incr) {
941 score += curscore;
942 if (isnan(score)) {
943 addReplyError(c,nanerr);
944 /* Don't need to check if the sorted set is empty
945 * because we know it has at least one element. */
946 zfree(scores);
947 return;
948 }
949 }
950
951 /* Remove and re-insert when score changed. We can safely
952 * delete the key object from the skiplist, since the
953 * dictionary still has a reference to it. */
954 if (score != curscore) {
955 redisAssertWithInfo(c,curobj,zslDelete(zs->zsl,curscore,curobj));
956 znode = zslInsert(zs->zsl,score,curobj);
957 incrRefCount(curobj); /* Re-inserted in skiplist. */
958 dictGetVal(de) = &znode->score; /* Update score ptr. */
959
960 signalModifiedKey(c->db,key);
961 server.dirty++;
962 }
963 } else {
964 znode = zslInsert(zs->zsl,score,ele);
965 incrRefCount(ele); /* Inserted in skiplist. */
966 redisAssertWithInfo(c,NULL,dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
967 incrRefCount(ele); /* Added to dictionary. */
968
969 signalModifiedKey(c->db,key);
970 server.dirty++;
971 if (!incr) added++;
972 }
973 } else {
974 redisPanic("Unknown sorted set encoding");
975 }
976 }
977 zfree(scores);
978 if (incr) /* ZINCRBY */
979 addReplyDouble(c,score);
980 else /* ZADD */
981 addReplyLongLong(c,added);
982 }
983
984 void zaddCommand(redisClient *c) {
985 zaddGenericCommand(c,0);
986 }
987
988 void zincrbyCommand(redisClient *c) {
989 zaddGenericCommand(c,1);
990 }
991
992 void zremCommand(redisClient *c) {
993 robj *key = c->argv[1];
994 robj *zobj;
995 int deleted = 0, j;
996
997 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
998 checkType(c,zobj,REDIS_ZSET)) return;
999
1000 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1001 unsigned char *eptr;
1002
1003 for (j = 2; j < c->argc; j++) {
1004 if ((eptr = zzlFind(zobj->ptr,c->argv[j],NULL)) != NULL) {
1005 deleted++;
1006 zobj->ptr = zzlDelete(zobj->ptr,eptr);
1007 if (zzlLength(zobj->ptr) == 0) {
1008 dbDelete(c->db,key);
1009 break;
1010 }
1011 }
1012 }
1013 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
1014 zset *zs = zobj->ptr;
1015 dictEntry *de;
1016 double score;
1017
1018 for (j = 2; j < c->argc; j++) {
1019 de = dictFind(zs->dict,c->argv[j]);
1020 if (de != NULL) {
1021 deleted++;
1022
1023 /* Delete from the skiplist */
1024 score = *(double*)dictGetVal(de);
1025 redisAssertWithInfo(c,c->argv[j],zslDelete(zs->zsl,score,c->argv[j]));
1026
1027 /* Delete from the hash table */
1028 dictDelete(zs->dict,c->argv[j]);
1029 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
1030 if (dictSize(zs->dict) == 0) {
1031 dbDelete(c->db,key);
1032 break;
1033 }
1034 }
1035 }
1036 } else {
1037 redisPanic("Unknown sorted set encoding");
1038 }
1039
1040 if (deleted) {
1041 signalModifiedKey(c->db,key);
1042 server.dirty += deleted;
1043 }
1044 addReplyLongLong(c,deleted);
1045 }
1046
1047 void zremrangebyscoreCommand(redisClient *c) {
1048 robj *key = c->argv[1];
1049 robj *zobj;
1050 zrangespec range;
1051 unsigned long deleted;
1052
1053 /* Parse the range arguments. */
1054 if (zslParseRange(c->argv[2],c->argv[3],&range) != REDIS_OK) {
1055 addReplyError(c,"min or max is not a float");
1056 return;
1057 }
1058
1059 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
1060 checkType(c,zobj,REDIS_ZSET)) return;
1061
1062 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1063 zobj->ptr = zzlDeleteRangeByScore(zobj->ptr,range,&deleted);
1064 if (zzlLength(zobj->ptr) == 0) dbDelete(c->db,key);
1065 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
1066 zset *zs = zobj->ptr;
1067 deleted = zslDeleteRangeByScore(zs->zsl,range,zs->dict);
1068 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
1069 if (dictSize(zs->dict) == 0) dbDelete(c->db,key);
1070 } else {
1071 redisPanic("Unknown sorted set encoding");
1072 }
1073
1074 if (deleted) signalModifiedKey(c->db,key);
1075 server.dirty += deleted;
1076 addReplyLongLong(c,deleted);
1077 }
1078
1079 void zremrangebyrankCommand(redisClient *c) {
1080 robj *key = c->argv[1];
1081 robj *zobj;
1082 long start;
1083 long end;
1084 int llen;
1085 unsigned long deleted;
1086
1087 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
1088 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
1089
1090 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
1091 checkType(c,zobj,REDIS_ZSET)) return;
1092
1093 /* Sanitize indexes. */
1094 llen = zsetLength(zobj);
1095 if (start < 0) start = llen+start;
1096 if (end < 0) end = llen+end;
1097 if (start < 0) start = 0;
1098
1099 /* Invariant: start >= 0, so this test will be true when end < 0.
1100 * The range is empty when start > end or start >= length. */
1101 if (start > end || start >= llen) {
1102 addReply(c,shared.czero);
1103 return;
1104 }
1105 if (end >= llen) end = llen-1;
1106
1107 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1108 /* Correct for 1-based rank. */
1109 zobj->ptr = zzlDeleteRangeByRank(zobj->ptr,start+1,end+1,&deleted);
1110 if (zzlLength(zobj->ptr) == 0) dbDelete(c->db,key);
1111 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
1112 zset *zs = zobj->ptr;
1113
1114 /* Correct for 1-based rank. */
1115 deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);
1116 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
1117 if (dictSize(zs->dict) == 0) dbDelete(c->db,key);
1118 } else {
1119 redisPanic("Unknown sorted set encoding");
1120 }
1121
1122 if (deleted) signalModifiedKey(c->db,key);
1123 server.dirty += deleted;
1124 addReplyLongLong(c,deleted);
1125 }
1126
1127 typedef struct {
1128 robj *subject;
1129 int type; /* Set, sorted set */
1130 int encoding;
1131 double weight;
1132
1133 union {
1134 /* Set iterators. */
1135 union _iterset {
1136 struct {
1137 intset *is;
1138 int ii;
1139 } is;
1140 struct {
1141 dict *dict;
1142 dictIterator *di;
1143 dictEntry *de;
1144 } ht;
1145 } set;
1146
1147 /* Sorted set iterators. */
1148 union _iterzset {
1149 struct {
1150 unsigned char *zl;
1151 unsigned char *eptr, *sptr;
1152 } zl;
1153 struct {
1154 zset *zs;
1155 zskiplistNode *node;
1156 } sl;
1157 } zset;
1158 } iter;
1159 } zsetopsrc;
1160
1161
1162 /* Use dirty flags for pointers that need to be cleaned up in the next
1163 * iteration over the zsetopval. The dirty flag for the long long value is
1164 * special, since long long values don't need cleanup. Instead, it means that
1165 * we already checked that "ell" holds a long long, or tried to convert another
1166 * representation into a long long value. When this was successful,
1167 * OPVAL_VALID_LL is set as well. */
1168 #define OPVAL_DIRTY_ROBJ 1
1169 #define OPVAL_DIRTY_LL 2
1170 #define OPVAL_VALID_LL 4
1171
1172 /* Store value retrieved from the iterator. */
1173 typedef struct {
1174 int flags;
1175 unsigned char _buf[32]; /* Private buffer. */
1176 robj *ele;
1177 unsigned char *estr;
1178 unsigned int elen;
1179 long long ell;
1180 double score;
1181 } zsetopval;
1182
1183 typedef union _iterset iterset;
1184 typedef union _iterzset iterzset;
1185
1186 void zuiInitIterator(zsetopsrc *op) {
1187 if (op->subject == NULL)
1188 return;
1189
1190 if (op->type == REDIS_SET) {
1191 iterset *it = &op->iter.set;
1192 if (op->encoding == REDIS_ENCODING_INTSET) {
1193 it->is.is = op->subject->ptr;
1194 it->is.ii = 0;
1195 } else if (op->encoding == REDIS_ENCODING_HT) {
1196 it->ht.dict = op->subject->ptr;
1197 it->ht.di = dictGetIterator(op->subject->ptr);
1198 it->ht.de = dictNext(it->ht.di);
1199 } else {
1200 redisPanic("Unknown set encoding");
1201 }
1202 } else if (op->type == REDIS_ZSET) {
1203 iterzset *it = &op->iter.zset;
1204 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1205 it->zl.zl = op->subject->ptr;
1206 it->zl.eptr = ziplistIndex(it->zl.zl,0);
1207 if (it->zl.eptr != NULL) {
1208 it->zl.sptr = ziplistNext(it->zl.zl,it->zl.eptr);
1209 redisAssert(it->zl.sptr != NULL);
1210 }
1211 } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
1212 it->sl.zs = op->subject->ptr;
1213 it->sl.node = it->sl.zs->zsl->header->level[0].forward;
1214 } else {
1215 redisPanic("Unknown sorted set encoding");
1216 }
1217 } else {
1218 redisPanic("Unsupported type");
1219 }
1220 }
1221
1222 void zuiClearIterator(zsetopsrc *op) {
1223 if (op->subject == NULL)
1224 return;
1225
1226 if (op->type == REDIS_SET) {
1227 iterset *it = &op->iter.set;
1228 if (op->encoding == REDIS_ENCODING_INTSET) {
1229 REDIS_NOTUSED(it); /* skip */
1230 } else if (op->encoding == REDIS_ENCODING_HT) {
1231 dictReleaseIterator(it->ht.di);
1232 } else {
1233 redisPanic("Unknown set encoding");
1234 }
1235 } else if (op->type == REDIS_ZSET) {
1236 iterzset *it = &op->iter.zset;
1237 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1238 REDIS_NOTUSED(it); /* skip */
1239 } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
1240 REDIS_NOTUSED(it); /* skip */
1241 } else {
1242 redisPanic("Unknown sorted set encoding");
1243 }
1244 } else {
1245 redisPanic("Unsupported type");
1246 }
1247 }
1248
1249 int zuiLength(zsetopsrc *op) {
1250 if (op->subject == NULL)
1251 return 0;
1252
1253 if (op->type == REDIS_SET) {
1254 iterset *it = &op->iter.set;
1255 if (op->encoding == REDIS_ENCODING_INTSET) {
1256 return intsetLen(it->is.is);
1257 } else if (op->encoding == REDIS_ENCODING_HT) {
1258 return dictSize(it->ht.dict);
1259 } else {
1260 redisPanic("Unknown set encoding");
1261 }
1262 } else if (op->type == REDIS_ZSET) {
1263 iterzset *it = &op->iter.zset;
1264 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1265 return zzlLength(it->zl.zl);
1266 } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
1267 return it->sl.zs->zsl->length;
1268 } else {
1269 redisPanic("Unknown sorted set encoding");
1270 }
1271 } else {
1272 redisPanic("Unsupported type");
1273 }
1274 }
1275
1276 /* Check if the current value is valid. If so, store it in the passed structure
1277 * and move to the next element. If not valid, this means we have reached the
1278 * end of the structure and can abort. */
1279 int zuiNext(zsetopsrc *op, zsetopval *val) {
1280 if (op->subject == NULL)
1281 return 0;
1282
1283 if (val->flags & OPVAL_DIRTY_ROBJ)
1284 decrRefCount(val->ele);
1285
1286 memset(val,0,sizeof(zsetopval));
1287
1288 if (op->type == REDIS_SET) {
1289 iterset *it = &op->iter.set;
1290 if (op->encoding == REDIS_ENCODING_INTSET) {
1291 int64_t ell;
1292
1293 if (!intsetGet(it->is.is,it->is.ii,&ell))
1294 return 0;
1295 val->ell = ell;
1296 val->score = 1.0;
1297
1298 /* Move to next element. */
1299 it->is.ii++;
1300 } else if (op->encoding == REDIS_ENCODING_HT) {
1301 if (it->ht.de == NULL)
1302 return 0;
1303 val->ele = dictGetKey(it->ht.de);
1304 val->score = 1.0;
1305
1306 /* Move to next element. */
1307 it->ht.de = dictNext(it->ht.di);
1308 } else {
1309 redisPanic("Unknown set encoding");
1310 }
1311 } else if (op->type == REDIS_ZSET) {
1312 iterzset *it = &op->iter.zset;
1313 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1314 /* No need to check both, but better be explicit. */
1315 if (it->zl.eptr == NULL || it->zl.sptr == NULL)
1316 return 0;
1317 redisAssert(ziplistGet(it->zl.eptr,&val->estr,&val->elen,&val->ell));
1318 val->score = zzlGetScore(it->zl.sptr);
1319
1320 /* Move to next element. */
1321 zzlNext(it->zl.zl,&it->zl.eptr,&it->zl.sptr);
1322 } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
1323 if (it->sl.node == NULL)
1324 return 0;
1325 val->ele = it->sl.node->obj;
1326 val->score = it->sl.node->score;
1327
1328 /* Move to next element. */
1329 it->sl.node = it->sl.node->level[0].forward;
1330 } else {
1331 redisPanic("Unknown sorted set encoding");
1332 }
1333 } else {
1334 redisPanic("Unsupported type");
1335 }
1336 return 1;
1337 }
1338
1339 int zuiLongLongFromValue(zsetopval *val) {
1340 if (!(val->flags & OPVAL_DIRTY_LL)) {
1341 val->flags |= OPVAL_DIRTY_LL;
1342
1343 if (val->ele != NULL) {
1344 if (val->ele->encoding == REDIS_ENCODING_INT) {
1345 val->ell = (long)val->ele->ptr;
1346 val->flags |= OPVAL_VALID_LL;
1347 } else if (val->ele->encoding == REDIS_ENCODING_RAW) {
1348 if (string2ll(val->ele->ptr,sdslen(val->ele->ptr),&val->ell))
1349 val->flags |= OPVAL_VALID_LL;
1350 } else {
1351 redisPanic("Unsupported element encoding");
1352 }
1353 } else if (val->estr != NULL) {
1354 if (string2ll((char*)val->estr,val->elen,&val->ell))
1355 val->flags |= OPVAL_VALID_LL;
1356 } else {
1357 /* The long long was already set, flag as valid. */
1358 val->flags |= OPVAL_VALID_LL;
1359 }
1360 }
1361 return val->flags & OPVAL_VALID_LL;
1362 }
1363
1364 robj *zuiObjectFromValue(zsetopval *val) {
1365 if (val->ele == NULL) {
1366 if (val->estr != NULL) {
1367 val->ele = createStringObject((char*)val->estr,val->elen);
1368 } else {
1369 val->ele = createStringObjectFromLongLong(val->ell);
1370 }
1371 val->flags |= OPVAL_DIRTY_ROBJ;
1372 }
1373 return val->ele;
1374 }
1375
1376 int zuiBufferFromValue(zsetopval *val) {
1377 if (val->estr == NULL) {
1378 if (val->ele != NULL) {
1379 if (val->ele->encoding == REDIS_ENCODING_INT) {
1380 val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),(long)val->ele->ptr);
1381 val->estr = val->_buf;
1382 } else if (val->ele->encoding == REDIS_ENCODING_RAW) {
1383 val->elen = sdslen(val->ele->ptr);
1384 val->estr = val->ele->ptr;
1385 } else {
1386 redisPanic("Unsupported element encoding");
1387 }
1388 } else {
1389 val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),val->ell);
1390 val->estr = val->_buf;
1391 }
1392 }
1393 return 1;
1394 }
1395
1396 /* Find value pointed to by val in the source pointer to by op. When found,
1397 * return 1 and store its score in target. Return 0 otherwise. */
1398 int zuiFind(zsetopsrc *op, zsetopval *val, double *score) {
1399 if (op->subject == NULL)
1400 return 0;
1401
1402 if (op->type == REDIS_SET) {
1403 iterset *it = &op->iter.set;
1404
1405 if (op->encoding == REDIS_ENCODING_INTSET) {
1406 if (zuiLongLongFromValue(val) && intsetFind(it->is.is,val->ell)) {
1407 *score = 1.0;
1408 return 1;
1409 } else {
1410 return 0;
1411 }
1412 } else if (op->encoding == REDIS_ENCODING_HT) {
1413 zuiObjectFromValue(val);
1414 if (dictFind(it->ht.dict,val->ele) != NULL) {
1415 *score = 1.0;
1416 return 1;
1417 } else {
1418 return 0;
1419 }
1420 } else {
1421 redisPanic("Unknown set encoding");
1422 }
1423 } else if (op->type == REDIS_ZSET) {
1424 iterzset *it = &op->iter.zset;
1425 zuiObjectFromValue(val);
1426
1427 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1428 if (zzlFind(it->zl.zl,val->ele,score) != NULL) {
1429 /* Score is already set by zzlFind. */
1430 return 1;
1431 } else {
1432 return 0;
1433 }
1434 } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
1435 dictEntry *de;
1436 if ((de = dictFind(it->sl.zs->dict,val->ele)) != NULL) {
1437 *score = *(double*)dictGetVal(de);
1438 return 1;
1439 } else {
1440 return 0;
1441 }
1442 } else {
1443 redisPanic("Unknown sorted set encoding");
1444 }
1445 } else {
1446 redisPanic("Unsupported type");
1447 }
1448 }
1449
1450 int zuiCompareByCardinality(const void *s1, const void *s2) {
1451 return zuiLength((zsetopsrc*)s1) - zuiLength((zsetopsrc*)s2);
1452 }
1453
1454 #define REDIS_AGGR_SUM 1
1455 #define REDIS_AGGR_MIN 2
1456 #define REDIS_AGGR_MAX 3
1457 #define zunionInterDictValue(_e) (dictGetVal(_e) == NULL ? 1.0 : *(double*)dictGetVal(_e))
1458
1459 inline static void zunionInterAggregate(double *target, double val, int aggregate) {
1460 if (aggregate == REDIS_AGGR_SUM) {
1461 *target = *target + val;
1462 /* The result of adding two doubles is NaN when one variable
1463 * is +inf and the other is -inf. When these numbers are added,
1464 * we maintain the convention of the result being 0.0. */
1465 if (isnan(*target)) *target = 0.0;
1466 } else if (aggregate == REDIS_AGGR_MIN) {
1467 *target = val < *target ? val : *target;
1468 } else if (aggregate == REDIS_AGGR_MAX) {
1469 *target = val > *target ? val : *target;
1470 } else {
1471 /* safety net */
1472 redisPanic("Unknown ZUNION/INTER aggregate type");
1473 }
1474 }
1475
1476 void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
1477 int i, j;
1478 long setnum;
1479 int aggregate = REDIS_AGGR_SUM;
1480 zsetopsrc *src;
1481 zsetopval zval;
1482 robj *tmp;
1483 unsigned int maxelelen = 0;
1484 robj *dstobj;
1485 zset *dstzset;
1486 zskiplistNode *znode;
1487 int touched = 0;
1488
1489 /* expect setnum input keys to be given */
1490 if ((getLongFromObjectOrReply(c, c->argv[2], &setnum, NULL) != REDIS_OK))
1491 return;
1492
1493 if (setnum < 1) {
1494 addReplyError(c,
1495 "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE");
1496 return;
1497 }
1498
1499 /* test if the expected number of keys would overflow */
1500 if (3+setnum > c->argc) {
1501 addReply(c,shared.syntaxerr);
1502 return;
1503 }
1504
1505 /* read keys to be used for input */
1506 src = zcalloc(sizeof(zsetopsrc) * setnum);
1507 for (i = 0, j = 3; i < setnum; i++, j++) {
1508 robj *obj = lookupKeyWrite(c->db,c->argv[j]);
1509 if (obj != NULL) {
1510 if (obj->type != REDIS_ZSET && obj->type != REDIS_SET) {
1511 zfree(src);
1512 addReply(c,shared.wrongtypeerr);
1513 return;
1514 }
1515
1516 src[i].subject = obj;
1517 src[i].type = obj->type;
1518 src[i].encoding = obj->encoding;
1519 } else {
1520 src[i].subject = NULL;
1521 }
1522
1523 /* Default all weights to 1. */
1524 src[i].weight = 1.0;
1525 }
1526
1527 /* parse optional extra arguments */
1528 if (j < c->argc) {
1529 int remaining = c->argc - j;
1530
1531 while (remaining) {
1532 if (remaining >= (setnum + 1) && !strcasecmp(c->argv[j]->ptr,"weights")) {
1533 j++; remaining--;
1534 for (i = 0; i < setnum; i++, j++, remaining--) {
1535 if (getDoubleFromObjectOrReply(c,c->argv[j],&src[i].weight,
1536 "weight value is not a float") != REDIS_OK)
1537 {
1538 zfree(src);
1539 return;
1540 }
1541 }
1542 } else if (remaining >= 2 && !strcasecmp(c->argv[j]->ptr,"aggregate")) {
1543 j++; remaining--;
1544 if (!strcasecmp(c->argv[j]->ptr,"sum")) {
1545 aggregate = REDIS_AGGR_SUM;
1546 } else if (!strcasecmp(c->argv[j]->ptr,"min")) {
1547 aggregate = REDIS_AGGR_MIN;
1548 } else if (!strcasecmp(c->argv[j]->ptr,"max")) {
1549 aggregate = REDIS_AGGR_MAX;
1550 } else {
1551 zfree(src);
1552 addReply(c,shared.syntaxerr);
1553 return;
1554 }
1555 j++; remaining--;
1556 } else {
1557 zfree(src);
1558 addReply(c,shared.syntaxerr);
1559 return;
1560 }
1561 }
1562 }
1563
1564 for (i = 0; i < setnum; i++)
1565 zuiInitIterator(&src[i]);
1566
1567 /* sort sets from the smallest to largest, this will improve our
1568 * algorithm's performance */
1569 qsort(src,setnum,sizeof(zsetopsrc),zuiCompareByCardinality);
1570
1571 dstobj = createZsetObject();
1572 dstzset = dstobj->ptr;
1573 memset(&zval, 0, sizeof(zval));
1574
1575 if (op == REDIS_OP_INTER) {
1576 /* Skip everything if the smallest input is empty. */
1577 if (zuiLength(&src[0]) > 0) {
1578 /* Precondition: as src[0] is non-empty and the inputs are ordered
1579 * by size, all src[i > 0] are non-empty too. */
1580 while (zuiNext(&src[0],&zval)) {
1581 double score, value;
1582
1583 score = src[0].weight * zval.score;
1584 if (isnan(score)) score = 0;
1585
1586 for (j = 1; j < setnum; j++) {
1587 /* It is not safe to access the zset we are
1588 * iterating, so explicitly check for equal object. */
1589 if (src[j].subject == src[0].subject) {
1590 value = zval.score*src[j].weight;
1591 zunionInterAggregate(&score,value,aggregate);
1592 } else if (zuiFind(&src[j],&zval,&value)) {
1593 value *= src[j].weight;
1594 zunionInterAggregate(&score,value,aggregate);
1595 } else {
1596 break;
1597 }
1598 }
1599
1600 /* Only continue when present in every input. */
1601 if (j == setnum) {
1602 tmp = zuiObjectFromValue(&zval);
1603 znode = zslInsert(dstzset->zsl,score,tmp);
1604 incrRefCount(tmp); /* added to skiplist */
1605 dictAdd(dstzset->dict,tmp,&znode->score);
1606 incrRefCount(tmp); /* added to dictionary */
1607
1608 if (tmp->encoding == REDIS_ENCODING_RAW)
1609 if (sdslen(tmp->ptr) > maxelelen)
1610 maxelelen = sdslen(tmp->ptr);
1611 }
1612 }
1613 }
1614 } else if (op == REDIS_OP_UNION) {
1615 for (i = 0; i < setnum; i++) {
1616 if (zuiLength(&src[i]) == 0)
1617 continue;
1618
1619 while (zuiNext(&src[i],&zval)) {
1620 double score, value;
1621
1622 /* Skip key when already processed */
1623 if (dictFind(dstzset->dict,zuiObjectFromValue(&zval)) != NULL)
1624 continue;
1625
1626 /* Initialize score */
1627 score = src[i].weight * zval.score;
1628 if (isnan(score)) score = 0;
1629
1630 /* Because the inputs are sorted by size, it's only possible
1631 * for sets at larger indices to hold this element. */
1632 for (j = (i+1); j < setnum; j++) {
1633 /* It is not safe to access the zset we are
1634 * iterating, so explicitly check for equal object. */
1635 if(src[j].subject == src[i].subject) {
1636 value = zval.score*src[j].weight;
1637 zunionInterAggregate(&score,value,aggregate);
1638 } else if (zuiFind(&src[j],&zval,&value)) {
1639 value *= src[j].weight;
1640 zunionInterAggregate(&score,value,aggregate);
1641 }
1642 }
1643
1644 tmp = zuiObjectFromValue(&zval);
1645 znode = zslInsert(dstzset->zsl,score,tmp);
1646 incrRefCount(zval.ele); /* added to skiplist */
1647 dictAdd(dstzset->dict,tmp,&znode->score);
1648 incrRefCount(zval.ele); /* added to dictionary */
1649
1650 if (tmp->encoding == REDIS_ENCODING_RAW)
1651 if (sdslen(tmp->ptr) > maxelelen)
1652 maxelelen = sdslen(tmp->ptr);
1653 }
1654 }
1655 } else {
1656 redisPanic("Unknown operator");
1657 }
1658
1659 for (i = 0; i < setnum; i++)
1660 zuiClearIterator(&src[i]);
1661
1662 if (dbDelete(c->db,dstkey)) {
1663 signalModifiedKey(c->db,dstkey);
1664 touched = 1;
1665 server.dirty++;
1666 }
1667 if (dstzset->zsl->length) {
1668 /* Convert to ziplist when in limits. */
1669 if (dstzset->zsl->length <= server.zset_max_ziplist_entries &&
1670 maxelelen <= server.zset_max_ziplist_value)
1671 zsetConvert(dstobj,REDIS_ENCODING_ZIPLIST);
1672
1673 dbAdd(c->db,dstkey,dstobj);
1674 addReplyLongLong(c,zsetLength(dstobj));
1675 if (!touched) signalModifiedKey(c->db,dstkey);
1676 server.dirty++;
1677 } else {
1678 decrRefCount(dstobj);
1679 addReply(c,shared.czero);
1680 }
1681 zfree(src);
1682 }
1683
1684 void zunionstoreCommand(redisClient *c) {
1685 zunionInterGenericCommand(c,c->argv[1], REDIS_OP_UNION);
1686 }
1687
1688 void zinterstoreCommand(redisClient *c) {
1689 zunionInterGenericCommand(c,c->argv[1], REDIS_OP_INTER);
1690 }
1691
1692 void zrangeGenericCommand(redisClient *c, int reverse) {
1693 robj *key = c->argv[1];
1694 robj *zobj;
1695 int withscores = 0;
1696 long start;
1697 long end;
1698 int llen;
1699 int rangelen;
1700
1701 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
1702 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
1703
1704 if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
1705 withscores = 1;
1706 } else if (c->argc >= 5) {
1707 addReply(c,shared.syntaxerr);
1708 return;
1709 }
1710
1711 if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL
1712 || checkType(c,zobj,REDIS_ZSET)) return;
1713
1714 /* Sanitize indexes. */
1715 llen = zsetLength(zobj);
1716 if (start < 0) start = llen+start;
1717 if (end < 0) end = llen+end;
1718 if (start < 0) start = 0;
1719
1720 /* Invariant: start >= 0, so this test will be true when end < 0.
1721 * The range is empty when start > end or start >= length. */
1722 if (start > end || start >= llen) {
1723 addReply(c,shared.emptymultibulk);
1724 return;
1725 }
1726 if (end >= llen) end = llen-1;
1727 rangelen = (end-start)+1;
1728
1729 /* Return the result in form of a multi-bulk reply */
1730 addReplyMultiBulkLen(c, withscores ? (rangelen*2) : rangelen);
1731
1732 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1733 unsigned char *zl = zobj->ptr;
1734 unsigned char *eptr, *sptr;
1735 unsigned char *vstr;
1736 unsigned int vlen;
1737 long long vlong;
1738
1739 if (reverse)
1740 eptr = ziplistIndex(zl,-2-(2*start));
1741 else
1742 eptr = ziplistIndex(zl,2*start);
1743
1744 redisAssertWithInfo(c,zobj,eptr != NULL);
1745 sptr = ziplistNext(zl,eptr);
1746
1747 while (rangelen--) {
1748 redisAssertWithInfo(c,zobj,eptr != NULL && sptr != NULL);
1749 redisAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
1750 if (vstr == NULL)
1751 addReplyBulkLongLong(c,vlong);
1752 else
1753 addReplyBulkCBuffer(c,vstr,vlen);
1754
1755 if (withscores)
1756 addReplyDouble(c,zzlGetScore(sptr));
1757
1758 if (reverse)
1759 zzlPrev(zl,&eptr,&sptr);
1760 else
1761 zzlNext(zl,&eptr,&sptr);
1762 }
1763
1764 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
1765 zset *zs = zobj->ptr;
1766 zskiplist *zsl = zs->zsl;
1767 zskiplistNode *ln;
1768 robj *ele;
1769
1770 /* Check if starting point is trivial, before doing log(N) lookup. */
1771 if (reverse) {
1772 ln = zsl->tail;
1773 if (start > 0)
1774 ln = zslGetElementByRank(zsl,llen-start);
1775 } else {
1776 ln = zsl->header->level[0].forward;
1777 if (start > 0)
1778 ln = zslGetElementByRank(zsl,start+1);
1779 }
1780
1781 while(rangelen--) {
1782 redisAssertWithInfo(c,zobj,ln != NULL);
1783 ele = ln->obj;
1784 addReplyBulk(c,ele);
1785 if (withscores)
1786 addReplyDouble(c,ln->score);
1787 ln = reverse ? ln->backward : ln->level[0].forward;
1788 }
1789 } else {
1790 redisPanic("Unknown sorted set encoding");
1791 }
1792 }
1793
1794 void zrangeCommand(redisClient *c) {
1795 zrangeGenericCommand(c,0);
1796 }
1797
1798 void zrevrangeCommand(redisClient *c) {
1799 zrangeGenericCommand(c,1);
1800 }
1801
1802 /* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE. */
1803 void genericZrangebyscoreCommand(redisClient *c, int reverse) {
1804 zrangespec range;
1805 robj *key = c->argv[1];
1806 robj *zobj;
1807 long offset = 0, limit = -1;
1808 int withscores = 0;
1809 unsigned long rangelen = 0;
1810 void *replylen = NULL;
1811 int minidx, maxidx;
1812
1813 /* Parse the range arguments. */
1814 if (reverse) {
1815 /* Range is given as [max,min] */
1816 maxidx = 2; minidx = 3;
1817 } else {
1818 /* Range is given as [min,max] */
1819 minidx = 2; maxidx = 3;
1820 }
1821
1822 if (zslParseRange(c->argv[minidx],c->argv[maxidx],&range) != REDIS_OK) {
1823 addReplyError(c,"min or max is not a float");
1824 return;
1825 }
1826
1827 /* Parse optional extra arguments. Note that ZCOUNT will exactly have
1828 * 4 arguments, so we'll never enter the following code path. */
1829 if (c->argc > 4) {
1830 int remaining = c->argc - 4;
1831 int pos = 4;
1832
1833 while (remaining) {
1834 if (remaining >= 1 && !strcasecmp(c->argv[pos]->ptr,"withscores")) {
1835 pos++; remaining--;
1836 withscores = 1;
1837 } else if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr,"limit")) {
1838 if ((getLongFromObjectOrReply(c, c->argv[pos+1], &offset, NULL) != REDIS_OK) ||
1839 (getLongFromObjectOrReply(c, c->argv[pos+2], &limit, NULL) != REDIS_OK)) return;
1840 pos += 3; remaining -= 3;
1841 } else {
1842 addReply(c,shared.syntaxerr);
1843 return;
1844 }
1845 }
1846 }
1847
1848 /* Ok, lookup the key and get the range */
1849 if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL ||
1850 checkType(c,zobj,REDIS_ZSET)) return;
1851
1852 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1853 unsigned char *zl = zobj->ptr;
1854 unsigned char *eptr, *sptr;
1855 unsigned char *vstr;
1856 unsigned int vlen;
1857 long long vlong;
1858 double score;
1859
1860 /* If reversed, get the last node in range as starting point. */
1861 if (reverse) {
1862 eptr = zzlLastInRange(zl,range);
1863 } else {
1864 eptr = zzlFirstInRange(zl,range);
1865 }
1866
1867 /* No "first" element in the specified interval. */
1868 if (eptr == NULL) {
1869 addReply(c, shared.emptymultibulk);
1870 return;
1871 }
1872
1873 /* Get score pointer for the first element. */
1874 redisAssertWithInfo(c,zobj,eptr != NULL);
1875 sptr = ziplistNext(zl,eptr);
1876
1877 /* We don't know in advance how many matching elements there are in the
1878 * list, so we push this object that will represent the multi-bulk
1879 * length in the output buffer, and will "fix" it later */
1880 replylen = addDeferredMultiBulkLength(c);
1881
1882 /* If there is an offset, just traverse the number of elements without
1883 * checking the score because that is done in the next loop. */
1884 while (eptr && offset--) {
1885 if (reverse) {
1886 zzlPrev(zl,&eptr,&sptr);
1887 } else {
1888 zzlNext(zl,&eptr,&sptr);
1889 }
1890 }
1891
1892 while (eptr && limit--) {
1893 score = zzlGetScore(sptr);
1894
1895 /* Abort when the node is no longer in range. */
1896 if (reverse) {
1897 if (!zslValueGteMin(score,&range)) break;
1898 } else {
1899 if (!zslValueLteMax(score,&range)) break;
1900 }
1901
1902 /* We know the element exists, so ziplistGet should always succeed */
1903 redisAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
1904
1905 rangelen++;
1906 if (vstr == NULL) {
1907 addReplyBulkLongLong(c,vlong);
1908 } else {
1909 addReplyBulkCBuffer(c,vstr,vlen);
1910 }
1911
1912 if (withscores) {
1913 addReplyDouble(c,score);
1914 }
1915
1916 /* Move to next node */
1917 if (reverse) {
1918 zzlPrev(zl,&eptr,&sptr);
1919 } else {
1920 zzlNext(zl,&eptr,&sptr);
1921 }
1922 }
1923 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
1924 zset *zs = zobj->ptr;
1925 zskiplist *zsl = zs->zsl;
1926 zskiplistNode *ln;
1927
1928 /* If reversed, get the last node in range as starting point. */
1929 if (reverse) {
1930 ln = zslLastInRange(zsl,range);
1931 } else {
1932 ln = zslFirstInRange(zsl,range);
1933 }
1934
1935 /* No "first" element in the specified interval. */
1936 if (ln == NULL) {
1937 addReply(c, shared.emptymultibulk);
1938 return;
1939 }
1940
1941 /* We don't know in advance how many matching elements there are in the
1942 * list, so we push this object that will represent the multi-bulk
1943 * length in the output buffer, and will "fix" it later */
1944 replylen = addDeferredMultiBulkLength(c);
1945
1946 /* If there is an offset, just traverse the number of elements without
1947 * checking the score because that is done in the next loop. */
1948 while (ln && offset--) {
1949 if (reverse) {
1950 ln = ln->backward;
1951 } else {
1952 ln = ln->level[0].forward;
1953 }
1954 }
1955
1956 while (ln && limit--) {
1957 /* Abort when the node is no longer in range. */
1958 if (reverse) {
1959 if (!zslValueGteMin(ln->score,&range)) break;
1960 } else {
1961 if (!zslValueLteMax(ln->score,&range)) break;
1962 }
1963
1964 rangelen++;
1965 addReplyBulk(c,ln->obj);
1966
1967 if (withscores) {
1968 addReplyDouble(c,ln->score);
1969 }
1970
1971 /* Move to next node */
1972 if (reverse) {
1973 ln = ln->backward;
1974 } else {
1975 ln = ln->level[0].forward;
1976 }
1977 }
1978 } else {
1979 redisPanic("Unknown sorted set encoding");
1980 }
1981
1982 if (withscores) {
1983 rangelen *= 2;
1984 }
1985
1986 setDeferredMultiBulkLength(c, replylen, rangelen);
1987 }
1988
1989 void zrangebyscoreCommand(redisClient *c) {
1990 genericZrangebyscoreCommand(c,0);
1991 }
1992
1993 void zrevrangebyscoreCommand(redisClient *c) {
1994 genericZrangebyscoreCommand(c,1);
1995 }
1996
1997 void zcountCommand(redisClient *c) {
1998 robj *key = c->argv[1];
1999 robj *zobj;
2000 zrangespec range;
2001 int count = 0;
2002
2003 /* Parse the range arguments */
2004 if (zslParseRange(c->argv[2],c->argv[3],&range) != REDIS_OK) {
2005 addReplyError(c,"min or max is not a float");
2006 return;
2007 }
2008
2009 /* Lookup the sorted set */
2010 if ((zobj = lookupKeyReadOrReply(c, key, shared.czero)) == NULL ||
2011 checkType(c, zobj, REDIS_ZSET)) return;
2012
2013 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
2014 unsigned char *zl = zobj->ptr;
2015 unsigned char *eptr, *sptr;
2016 double score;
2017
2018 /* Use the first element in range as the starting point */
2019 eptr = zzlFirstInRange(zl,range);
2020
2021 /* No "first" element */
2022 if (eptr == NULL) {
2023 addReply(c, shared.czero);
2024 return;
2025 }
2026
2027 /* First element is in range */
2028 sptr = ziplistNext(zl,eptr);
2029 score = zzlGetScore(sptr);
2030 redisAssertWithInfo(c,zobj,zslValueLteMax(score,&range));
2031
2032 /* Iterate over elements in range */
2033 while (eptr) {
2034 score = zzlGetScore(sptr);
2035
2036 /* Abort when the node is no longer in range. */
2037 if (!zslValueLteMax(score,&range)) {
2038 break;
2039 } else {
2040 count++;
2041 zzlNext(zl,&eptr,&sptr);
2042 }
2043 }
2044 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
2045 zset *zs = zobj->ptr;
2046 zskiplist *zsl = zs->zsl;
2047 zskiplistNode *zn;
2048 unsigned long rank;
2049
2050 /* Find first element in range */
2051 zn = zslFirstInRange(zsl, range);
2052
2053 /* Use rank of first element, if any, to determine preliminary count */
2054 if (zn != NULL) {
2055 rank = zslGetRank(zsl, zn->score, zn->obj);
2056 count = (zsl->length - (rank - 1));
2057
2058 /* Find last element in range */
2059 zn = zslLastInRange(zsl, range);
2060
2061 /* Use rank of last element, if any, to determine the actual count */
2062 if (zn != NULL) {
2063 rank = zslGetRank(zsl, zn->score, zn->obj);
2064 count -= (zsl->length - rank);
2065 }
2066 }
2067 } else {
2068 redisPanic("Unknown sorted set encoding");
2069 }
2070
2071 addReplyLongLong(c, count);
2072 }
2073
2074 void zcardCommand(redisClient *c) {
2075 robj *key = c->argv[1];
2076 robj *zobj;
2077
2078 if ((zobj = lookupKeyReadOrReply(c,key,shared.czero)) == NULL ||
2079 checkType(c,zobj,REDIS_ZSET)) return;
2080
2081 addReplyLongLong(c,zsetLength(zobj));
2082 }
2083
2084 void zscoreCommand(redisClient *c) {
2085 robj *key = c->argv[1];
2086 robj *zobj;
2087 double score;
2088
2089 if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
2090 checkType(c,zobj,REDIS_ZSET)) return;
2091
2092 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
2093 if (zzlFind(zobj->ptr,c->argv[2],&score) != NULL)
2094 addReplyDouble(c,score);
2095 else
2096 addReply(c,shared.nullbulk);
2097 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
2098 zset *zs = zobj->ptr;
2099 dictEntry *de;
2100
2101 c->argv[2] = tryObjectEncoding(c->argv[2]);
2102 de = dictFind(zs->dict,c->argv[2]);
2103 if (de != NULL) {
2104 score = *(double*)dictGetVal(de);
2105 addReplyDouble(c,score);
2106 } else {
2107 addReply(c,shared.nullbulk);
2108 }
2109 } else {
2110 redisPanic("Unknown sorted set encoding");
2111 }
2112 }
2113
2114 void zrankGenericCommand(redisClient *c, int reverse) {
2115 robj *key = c->argv[1];
2116 robj *ele = c->argv[2];
2117 robj *zobj;
2118 unsigned long llen;
2119 unsigned long rank;
2120
2121 if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
2122 checkType(c,zobj,REDIS_ZSET)) return;
2123 llen = zsetLength(zobj);
2124
2125 redisAssertWithInfo(c,ele,ele->encoding == REDIS_ENCODING_RAW);
2126 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
2127 unsigned char *zl = zobj->ptr;
2128 unsigned char *eptr, *sptr;
2129
2130 eptr = ziplistIndex(zl,0);
2131 redisAssertWithInfo(c,zobj,eptr != NULL);
2132 sptr = ziplistNext(zl,eptr);
2133 redisAssertWithInfo(c,zobj,sptr != NULL);
2134
2135 rank = 1;
2136 while(eptr != NULL) {
2137 if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr)))
2138 break;
2139 rank++;
2140 zzlNext(zl,&eptr,&sptr);
2141 }
2142
2143 if (eptr != NULL) {
2144 if (reverse)
2145 addReplyLongLong(c,llen-rank);
2146 else
2147 addReplyLongLong(c,rank-1);
2148 } else {
2149 addReply(c,shared.nullbulk);
2150 }
2151 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
2152 zset *zs = zobj->ptr;
2153 zskiplist *zsl = zs->zsl;
2154 dictEntry *de;
2155 double score;
2156
2157 ele = c->argv[2] = tryObjectEncoding(c->argv[2]);
2158 de = dictFind(zs->dict,ele);
2159 if (de != NULL) {
2160 score = *(double*)dictGetVal(de);
2161 rank = zslGetRank(zsl,score,ele);
2162 redisAssertWithInfo(c,ele,rank); /* Existing elements always have a rank. */
2163 if (reverse)
2164 addReplyLongLong(c,llen-rank);
2165 else
2166 addReplyLongLong(c,rank-1);
2167 } else {
2168 addReply(c,shared.nullbulk);
2169 }
2170 } else {
2171 redisPanic("Unknown sorted set encoding");
2172 }
2173 }
2174
2175 void zrankCommand(redisClient *c) {
2176 zrankGenericCommand(c, 0);
2177 }
2178
2179 void zrevrankCommand(redisClient *c) {
2180 zrankGenericCommand(c, 1);
2181 }