]> git.saurik.com Git - redis.git/blame - src/t_zset.c
When closing the MDB DBI, do it in a transaction.
[redis.git] / src / t_zset.c
CommitLineData
d288ee65 1/*
2 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
3 * Copyright (c) 2009-2012, Pieter Noordhuis <pcnoordhuis at gmail dot com>
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
8 *
9 * * Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * * Neither the name of Redis nor the names of its contributors may be used
15 * to endorse or promote products derived from this software without
16 * specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
29 */
e2641e09 30
31/*-----------------------------------------------------------------------------
32 * Sorted set API
33 *----------------------------------------------------------------------------*/
34
35/* ZSETs are ordered sets using two data structures to hold the same elements
36 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
37 * data structure.
38 *
39 * The elements are added to a hash table mapping Redis objects to scores.
40 * At the same time the elements are added to a skip list mapping scores
41 * to Redis objects (so objects are sorted by scores in this "view"). */
42
43/* This skiplist implementation is almost a C translation of the original
44 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
45 * Alternative to Balanced Trees", modified in three ways:
d8bd12f9 46 * a) this implementation allows for repeated scores.
e2641e09 47 * b) the comparison is not just by key (our 'score') but by satellite data.
48 * c) there is a back pointer, so it's a doubly linked list with the back
49 * pointers being only at "level 1". This allows to traverse the list
50 * from tail to head, useful for ZREVRANGE. */
51
d288ee65 52#include "redis.h"
53#include <math.h>
54
e2641e09 55zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
2159782b 56 zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
e2641e09 57 zn->score = score;
58 zn->obj = obj;
59 return zn;
60}
61
62zskiplist *zslCreate(void) {
63 int j;
64 zskiplist *zsl;
65
66 zsl = zmalloc(sizeof(*zsl));
67 zsl->level = 1;
68 zsl->length = 0;
69 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
70 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
2159782b
PN
71 zsl->header->level[j].forward = NULL;
72 zsl->header->level[j].span = 0;
e2641e09 73 }
74 zsl->header->backward = NULL;
75 zsl->tail = NULL;
76 return zsl;
77}
78
79void zslFreeNode(zskiplistNode *node) {
80 decrRefCount(node->obj);
e2641e09 81 zfree(node);
82}
83
84void zslFree(zskiplist *zsl) {
2159782b 85 zskiplistNode *node = zsl->header->level[0].forward, *next;
e2641e09 86
e2641e09 87 zfree(zsl->header);
88 while(node) {
2159782b 89 next = node->level[0].forward;
e2641e09 90 zslFreeNode(node);
91 node = next;
92 }
93 zfree(zsl);
94}
95
7faa1f07 96/* Returns a random level for the new skiplist node we are going to create.
97 * The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL
98 * (both inclusive), with a powerlaw-alike distribution where higher
99 * levels are less likely to be returned. */
e2641e09 100int zslRandomLevel(void) {
101 int level = 1;
102 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
103 level += 1;
104 return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
105}
106
69ef89f2 107zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
e2641e09 108 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
109 unsigned int rank[ZSKIPLIST_MAXLEVEL];
110 int i, level;
111
a244a13b 112 redisAssert(!isnan(score));
e2641e09 113 x = zsl->header;
114 for (i = zsl->level-1; i >= 0; i--) {
115 /* store rank that is crossed to reach the insert position */
116 rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
2159782b
PN
117 while (x->level[i].forward &&
118 (x->level[i].forward->score < score ||
119 (x->level[i].forward->score == score &&
120 compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
121 rank[i] += x->level[i].span;
122 x = x->level[i].forward;
e2641e09 123 }
124 update[i] = x;
125 }
126 /* we assume the key is not already inside, since we allow duplicated
127 * scores, and the re-insertion of score and redis object should never
128 * happpen since the caller of zslInsert() should test in the hash table
129 * if the element is already inside or not. */
130 level = zslRandomLevel();
131 if (level > zsl->level) {
132 for (i = zsl->level; i < level; i++) {
133 rank[i] = 0;
134 update[i] = zsl->header;
2159782b 135 update[i]->level[i].span = zsl->length;
e2641e09 136 }
137 zsl->level = level;
138 }
139 x = zslCreateNode(level,score,obj);
140 for (i = 0; i < level; i++) {
2159782b
PN
141 x->level[i].forward = update[i]->level[i].forward;
142 update[i]->level[i].forward = x;
e2641e09 143
144 /* update span covered by update[i] as x is inserted here */
2159782b
PN
145 x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
146 update[i]->level[i].span = (rank[0] - rank[i]) + 1;
e2641e09 147 }
148
149 /* increment span for untouched levels */
150 for (i = level; i < zsl->level; i++) {
2159782b 151 update[i]->level[i].span++;
e2641e09 152 }
153
154 x->backward = (update[0] == zsl->header) ? NULL : update[0];
2159782b
PN
155 if (x->level[0].forward)
156 x->level[0].forward->backward = x;
e2641e09 157 else
158 zsl->tail = x;
159 zsl->length++;
69ef89f2 160 return x;
e2641e09 161}
162
163/* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
164void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
165 int i;
166 for (i = 0; i < zsl->level; i++) {
2159782b
PN
167 if (update[i]->level[i].forward == x) {
168 update[i]->level[i].span += x->level[i].span - 1;
169 update[i]->level[i].forward = x->level[i].forward;
e2641e09 170 } else {
2159782b 171 update[i]->level[i].span -= 1;
e2641e09 172 }
173 }
2159782b
PN
174 if (x->level[0].forward) {
175 x->level[0].forward->backward = x->backward;
e2641e09 176 } else {
177 zsl->tail = x->backward;
178 }
2159782b 179 while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
e2641e09 180 zsl->level--;
181 zsl->length--;
182}
183
184/* Delete an element with matching score/object from the skiplist. */
185int zslDelete(zskiplist *zsl, double score, robj *obj) {
186 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
187 int i;
188
189 x = zsl->header;
190 for (i = zsl->level-1; i >= 0; i--) {
2159782b
PN
191 while (x->level[i].forward &&
192 (x->level[i].forward->score < score ||
193 (x->level[i].forward->score == score &&
194 compareStringObjects(x->level[i].forward->obj,obj) < 0)))
195 x = x->level[i].forward;
e2641e09 196 update[i] = x;
197 }
198 /* We may have multiple elements with the same score, what we need
199 * is to find the element with both the right score and object. */
2159782b 200 x = x->level[0].forward;
e2641e09 201 if (x && score == x->score && equalStringObjects(x->obj,obj)) {
202 zslDeleteNode(zsl, x, update);
203 zslFreeNode(x);
204 return 1;
205 } else {
206 return 0; /* not found */
207 }
208 return 0; /* not found */
209}
210
45290ad9 211static int zslValueGteMin(double value, zrangespec *spec) {
22b9bf15
PN
212 return spec->minex ? (value > spec->min) : (value >= spec->min);
213}
214
45290ad9 215static int zslValueLteMax(double value, zrangespec *spec) {
22b9bf15
PN
216 return spec->maxex ? (value < spec->max) : (value <= spec->max);
217}
218
22b9bf15
PN
219/* Returns if there is a part of the zset is in range. */
220int zslIsInRange(zskiplist *zsl, zrangespec *range) {
221 zskiplistNode *x;
222
8e1b3277
PN
223 /* Test for ranges that will always be empty. */
224 if (range->min > range->max ||
225 (range->min == range->max && (range->minex || range->maxex)))
226 return 0;
22b9bf15 227 x = zsl->tail;
45290ad9 228 if (x == NULL || !zslValueGteMin(x->score,range))
22b9bf15
PN
229 return 0;
230 x = zsl->header->level[0].forward;
45290ad9 231 if (x == NULL || !zslValueLteMax(x->score,range))
22b9bf15
PN
232 return 0;
233 return 1;
234}
235
236/* Find the first node that is contained in the specified range.
237 * Returns NULL when no element is contained in the range. */
238zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec range) {
239 zskiplistNode *x;
240 int i;
241
242 /* If everything is out of range, return early. */
243 if (!zslIsInRange(zsl,&range)) return NULL;
244
245 x = zsl->header;
246 for (i = zsl->level-1; i >= 0; i--) {
247 /* Go forward while *OUT* of range. */
248 while (x->level[i].forward &&
45290ad9 249 !zslValueGteMin(x->level[i].forward->score,&range))
22b9bf15
PN
250 x = x->level[i].forward;
251 }
252
e53ca04b 253 /* This is an inner range, so the next node cannot be NULL. */
22b9bf15 254 x = x->level[0].forward;
e53ca04b
PN
255 redisAssert(x != NULL);
256
257 /* Check if score <= max. */
258 if (!zslValueLteMax(x->score,&range)) return NULL;
22b9bf15
PN
259 return x;
260}
261
262/* Find the last node that is contained in the specified range.
263 * Returns NULL when no element is contained in the range. */
264zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec range) {
265 zskiplistNode *x;
266 int i;
267
268 /* If everything is out of range, return early. */
269 if (!zslIsInRange(zsl,&range)) return NULL;
270
271 x = zsl->header;
272 for (i = zsl->level-1; i >= 0; i--) {
273 /* Go forward while *IN* range. */
274 while (x->level[i].forward &&
45290ad9 275 zslValueLteMax(x->level[i].forward->score,&range))
22b9bf15
PN
276 x = x->level[i].forward;
277 }
278
e53ca04b
PN
279 /* This is an inner range, so this node cannot be NULL. */
280 redisAssert(x != NULL);
281
282 /* Check if score >= min. */
283 if (!zslValueGteMin(x->score,&range)) return NULL;
22b9bf15
PN
284 return x;
285}
286
e2641e09 287/* Delete all the elements with score between min and max from the skiplist.
288 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
289 * Note that this function takes the reference to the hash table view of the
290 * sorted set, in order to remove the elements from the hash table too. */
91504b6c 291unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec range, dict *dict) {
e2641e09 292 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
293 unsigned long removed = 0;
294 int i;
295
296 x = zsl->header;
297 for (i = zsl->level-1; i >= 0; i--) {
91504b6c
PN
298 while (x->level[i].forward && (range.minex ?
299 x->level[i].forward->score <= range.min :
300 x->level[i].forward->score < range.min))
301 x = x->level[i].forward;
e2641e09 302 update[i] = x;
303 }
91504b6c
PN
304
305 /* Current node is the last with score < or <= min. */
2159782b 306 x = x->level[0].forward;
91504b6c
PN
307
308 /* Delete nodes while in range. */
309 while (x && (range.maxex ? x->score < range.max : x->score <= range.max)) {
2159782b 310 zskiplistNode *next = x->level[0].forward;
69ef89f2 311 zslDeleteNode(zsl,x,update);
e2641e09 312 dictDelete(dict,x->obj);
313 zslFreeNode(x);
314 removed++;
315 x = next;
316 }
91504b6c 317 return removed;
e2641e09 318}
319
320/* Delete all the elements with rank between start and end from the skiplist.
321 * Start and end are inclusive. Note that start and end need to be 1-based */
322unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict) {
323 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
324 unsigned long traversed = 0, removed = 0;
325 int i;
326
327 x = zsl->header;
328 for (i = zsl->level-1; i >= 0; i--) {
2159782b
PN
329 while (x->level[i].forward && (traversed + x->level[i].span) < start) {
330 traversed += x->level[i].span;
331 x = x->level[i].forward;
e2641e09 332 }
333 update[i] = x;
334 }
335
336 traversed++;
2159782b 337 x = x->level[0].forward;
e2641e09 338 while (x && traversed <= end) {
2159782b 339 zskiplistNode *next = x->level[0].forward;
69ef89f2 340 zslDeleteNode(zsl,x,update);
e2641e09 341 dictDelete(dict,x->obj);
342 zslFreeNode(x);
343 removed++;
344 traversed++;
345 x = next;
346 }
347 return removed;
348}
349
e2641e09 350/* Find the rank for an element by both score and key.
351 * Returns 0 when the element cannot be found, rank otherwise.
352 * Note that the rank is 1-based due to the span of zsl->header to the
353 * first element. */
a3004773 354unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) {
e2641e09 355 zskiplistNode *x;
356 unsigned long rank = 0;
357 int i;
358
359 x = zsl->header;
360 for (i = zsl->level-1; i >= 0; i--) {
2159782b
PN
361 while (x->level[i].forward &&
362 (x->level[i].forward->score < score ||
363 (x->level[i].forward->score == score &&
364 compareStringObjects(x->level[i].forward->obj,o) <= 0))) {
365 rank += x->level[i].span;
366 x = x->level[i].forward;
e2641e09 367 }
368
369 /* x might be equal to zsl->header, so test if obj is non-NULL */
370 if (x->obj && equalStringObjects(x->obj,o)) {
371 return rank;
372 }
373 }
374 return 0;
375}
376
377/* Finds an element by its rank. The rank argument needs to be 1-based. */
a3004773 378zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
e2641e09 379 zskiplistNode *x;
380 unsigned long traversed = 0;
381 int i;
382
383 x = zsl->header;
384 for (i = zsl->level-1; i >= 0; i--) {
2159782b 385 while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
e2641e09 386 {
2159782b
PN
387 traversed += x->level[i].span;
388 x = x->level[i].forward;
e2641e09 389 }
390 if (traversed == rank) {
391 return x;
392 }
393 }
394 return NULL;
395}
396
25bb8a44 397/* Populate the rangespec according to the objects min and max. */
7236fdb2
PN
398static int zslParseRange(robj *min, robj *max, zrangespec *spec) {
399 char *eptr;
25bb8a44
PN
400 spec->minex = spec->maxex = 0;
401
402 /* Parse the min-max interval. If one of the values is prefixed
403 * by the "(" character, it's considered "open". For instance
404 * ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max
405 * ZRANGEBYSCORE zset 1.5 2.5 will instead match min <= x <= max */
406 if (min->encoding == REDIS_ENCODING_INT) {
407 spec->min = (long)min->ptr;
408 } else {
409 if (((char*)min->ptr)[0] == '(') {
7236fdb2
PN
410 spec->min = strtod((char*)min->ptr+1,&eptr);
411 if (eptr[0] != '\0' || isnan(spec->min)) return REDIS_ERR;
25bb8a44
PN
412 spec->minex = 1;
413 } else {
7236fdb2
PN
414 spec->min = strtod((char*)min->ptr,&eptr);
415 if (eptr[0] != '\0' || isnan(spec->min)) return REDIS_ERR;
25bb8a44
PN
416 }
417 }
418 if (max->encoding == REDIS_ENCODING_INT) {
419 spec->max = (long)max->ptr;
420 } else {
421 if (((char*)max->ptr)[0] == '(') {
7236fdb2
PN
422 spec->max = strtod((char*)max->ptr+1,&eptr);
423 if (eptr[0] != '\0' || isnan(spec->max)) return REDIS_ERR;
25bb8a44
PN
424 spec->maxex = 1;
425 } else {
7236fdb2
PN
426 spec->max = strtod((char*)max->ptr,&eptr);
427 if (eptr[0] != '\0' || isnan(spec->max)) return REDIS_ERR;
25bb8a44
PN
428 }
429 }
430
431 return REDIS_OK;
432}
433
21c5b508
PN
434/*-----------------------------------------------------------------------------
435 * Ziplist-backed sorted set API
436 *----------------------------------------------------------------------------*/
437
/* Read the score stored in the ziplist entry pointed to by 'sptr' and
 * return it as a double. When the entry holds a string it is copied into a
 * local buffer, NUL-terminated and parsed with strtod(); when it holds an
 * integer, that value is converted to double directly. */
double zzlGetScore(unsigned char *sptr) {
    unsigned char *vstr;
    unsigned int vlen;
    long long vlong;
    char buf[128];
    double score;

    redisAssert(sptr != NULL);
    redisAssert(ziplistGet(sptr,&vstr,&vlen,&vlong));

    if (vstr) {
        /* The string plus its terminator must fit in buf; assert instead
         * of writing past the end of the stack buffer. */
        redisAssert(vlen < sizeof(buf));
        memcpy(buf,vstr,vlen);
        buf[vlen] = '\0';
        score = strtod(buf,NULL);
    } else {
        score = vlong;
    }

    return score;
}
458
/* Compare the ziplist element at 'eptr' with the buffer 'cstr' of length
 * 'clen'. The common prefix is compared with memcmp(); when it is equal,
 * the difference between the two lengths is returned. An integer entry is
 * compared through its string representation. */
int zzlCompareElements(unsigned char *eptr, unsigned char *cstr, unsigned int clen) {
    unsigned char digits[32];
    unsigned char *vstr;
    unsigned int vlen;
    long long vlong;
    int shortest, order;

    redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
    if (vstr == NULL) {
        /* Integer entry: render it as a string into the local buffer. */
        vlen = ll2string((char*)digits,sizeof(digits),vlong);
        vstr = digits;
    }

    if (vlen < clen)
        shortest = vlen;
    else
        shortest = clen;

    order = memcmp(vstr,cstr,shortest);
    if (order != 0) return order;
    return vlen-clen;
}
479
/* Number of (element,score) pairs in the ziplist: each pair takes two
 * ziplist entries. */
unsigned int zzlLength(unsigned char *zl) {
    unsigned int entries = ziplistLen(zl);

    return entries/2;
}
483
4c5f0966
PN
/* Advance *eptr and *sptr to the next (element,score) pair, starting from
 * the pair they currently point to. Both are set to NULL when there is no
 * next pair. */
void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
    unsigned char *_eptr, *_sptr = NULL;

    redisAssert(*eptr != NULL && *sptr != NULL);

    /* The next element follows the current score. */
    _eptr = ziplistNext(zl,*sptr);
    if (_eptr != NULL) {
        /* An element is always followed by its score. */
        _sptr = ziplistNext(zl,_eptr);
        redisAssert(_sptr != NULL);
    }

    *eptr = _eptr;
    *sptr = _sptr;
}
502
/* Move *eptr and *sptr back to the previous (element,score) pair, starting
 * from the pair they currently point to. Both are set to NULL when there
 * is no previous pair. */
void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
    unsigned char *_eptr = NULL, *_sptr;

    redisAssert(*eptr != NULL && *sptr != NULL);

    /* The previous score precedes the current element. */
    _sptr = ziplistPrev(zl,*eptr);
    if (_sptr != NULL) {
        /* A score is always preceded by its element. */
        _eptr = ziplistPrev(zl,_sptr);
        redisAssert(_eptr != NULL);
    }

    *eptr = _eptr;
    *sptr = _sptr;
}
521
4a14dbba
PN
522/* Returns if there is a part of the zset is in range. Should only be used
523 * internally by zzlFirstInRange and zzlLastInRange. */
524int zzlIsInRange(unsigned char *zl, zrangespec *range) {
525 unsigned char *p;
526 double score;
527
528 /* Test for ranges that will always be empty. */
529 if (range->min > range->max ||
530 (range->min == range->max && (range->minex || range->maxex)))
531 return 0;
532
533 p = ziplistIndex(zl,-1); /* Last score. */
feb28288 534 if (p == NULL) return 0; /* Empty sorted set */
4a14dbba
PN
535 score = zzlGetScore(p);
536 if (!zslValueGteMin(score,range))
537 return 0;
538
539 p = ziplistIndex(zl,1); /* First score. */
540 redisAssert(p != NULL);
541 score = zzlGetScore(p);
542 if (!zslValueLteMax(score,range))
543 return 0;
544
545 return 1;
546}
547
548/* Find pointer to the first element contained in the specified range.
549 * Returns NULL when no element is contained in the range. */
8588bfa3 550unsigned char *zzlFirstInRange(unsigned char *zl, zrangespec range) {
4a14dbba
PN
551 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
552 double score;
553
554 /* If everything is out of range, return early. */
555 if (!zzlIsInRange(zl,&range)) return NULL;
556
557 while (eptr != NULL) {
558 sptr = ziplistNext(zl,eptr);
559 redisAssert(sptr != NULL);
560
561 score = zzlGetScore(sptr);
e53ca04b
PN
562 if (zslValueGteMin(score,&range)) {
563 /* Check if score <= max. */
564 if (zslValueLteMax(score,&range))
565 return eptr;
566 return NULL;
567 }
4a14dbba
PN
568
569 /* Move to next element. */
570 eptr = ziplistNext(zl,sptr);
571 }
572
573 return NULL;
574}
575
576/* Find pointer to the last element contained in the specified range.
577 * Returns NULL when no element is contained in the range. */
8588bfa3 578unsigned char *zzlLastInRange(unsigned char *zl, zrangespec range) {
4a14dbba
PN
579 unsigned char *eptr = ziplistIndex(zl,-2), *sptr;
580 double score;
581
582 /* If everything is out of range, return early. */
583 if (!zzlIsInRange(zl,&range)) return NULL;
584
585 while (eptr != NULL) {
586 sptr = ziplistNext(zl,eptr);
587 redisAssert(sptr != NULL);
588
589 score = zzlGetScore(sptr);
e53ca04b
PN
590 if (zslValueLteMax(score,&range)) {
591 /* Check if score >= min. */
592 if (zslValueGteMin(score,&range))
593 return eptr;
594 return NULL;
595 }
4a14dbba
PN
596
597 /* Move to previous element by moving to the score of previous element.
598 * When this returns NULL, we know there also is no element. */
599 sptr = ziplistPrev(zl,eptr);
600 if (sptr != NULL)
601 redisAssert((eptr = ziplistPrev(zl,sptr)) != NULL);
602 else
603 eptr = NULL;
604 }
605
606 return NULL;
607}
608
8588bfa3 609unsigned char *zzlFind(unsigned char *zl, robj *ele, double *score) {
21c5b508
PN
610 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
611
612 ele = getDecodedObject(ele);
613 while (eptr != NULL) {
614 sptr = ziplistNext(zl,eptr);
eab0e26e 615 redisAssertWithInfo(NULL,ele,sptr != NULL);
21c5b508
PN
616
617 if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr))) {
618 /* Matching element, pull out score. */
0b10e104 619 if (score != NULL) *score = zzlGetScore(sptr);
21c5b508
PN
620 decrRefCount(ele);
621 return eptr;
622 }
623
624 /* Move to next element. */
625 eptr = ziplistNext(zl,sptr);
626 }
627
628 decrRefCount(ele);
629 return NULL;
630}
631
/* Delete the (element,score) pair starting at 'eptr' from the ziplist and
 * return the resulting ziplist pointer. A local cursor is used so that the
 * pointer received as argument is not modified. */
unsigned char *zzlDelete(unsigned char *zl, unsigned char *eptr) {
    unsigned char *cursor = eptr;
    int remaining;

    /* TODO: add function to ziplist API to delete N elements from offset. */
    for (remaining = 2; remaining > 0; remaining--)
        zl = ziplistDelete(zl,&cursor);
    return zl;
}
642
8588bfa3 643unsigned char *zzlInsertAt(unsigned char *zl, unsigned char *eptr, robj *ele, double score) {
21c5b508
PN
644 unsigned char *sptr;
645 char scorebuf[128];
646 int scorelen;
69298a05 647 size_t offset;
21c5b508 648
eab0e26e 649 redisAssertWithInfo(NULL,ele,ele->encoding == REDIS_ENCODING_RAW);
21c5b508
PN
650 scorelen = d2string(scorebuf,sizeof(scorebuf),score);
651 if (eptr == NULL) {
652 zl = ziplistPush(zl,ele->ptr,sdslen(ele->ptr),ZIPLIST_TAIL);
653 zl = ziplistPush(zl,(unsigned char*)scorebuf,scorelen,ZIPLIST_TAIL);
654 } else {
655 /* Keep offset relative to zl, as it might be re-allocated. */
656 offset = eptr-zl;
657 zl = ziplistInsert(zl,eptr,ele->ptr,sdslen(ele->ptr));
658 eptr = zl+offset;
659
660 /* Insert score after the element. */
eab0e26e 661 redisAssertWithInfo(NULL,ele,(sptr = ziplistNext(zl,eptr)) != NULL);
21c5b508
PN
662 zl = ziplistInsert(zl,sptr,(unsigned char*)scorebuf,scorelen);
663 }
664
8588bfa3 665 return zl;
21c5b508
PN
666}
667
668/* Insert (element,score) pair in ziplist. This function assumes the element is
669 * not yet present in the list. */
8588bfa3 670unsigned char *zzlInsert(unsigned char *zl, robj *ele, double score) {
21c5b508
PN
671 unsigned char *eptr = ziplistIndex(zl,0), *sptr;
672 double s;
21c5b508
PN
673
674 ele = getDecodedObject(ele);
675 while (eptr != NULL) {
676 sptr = ziplistNext(zl,eptr);
eab0e26e 677 redisAssertWithInfo(NULL,ele,sptr != NULL);
21c5b508
PN
678 s = zzlGetScore(sptr);
679
680 if (s > score) {
681 /* First element with score larger than score for element to be
682 * inserted. This means we should take its spot in the list to
683 * maintain ordering. */
8588bfa3 684 zl = zzlInsertAt(zl,eptr,ele,score);
21c5b508 685 break;
8218db3d
PN
686 } else if (s == score) {
687 /* Ensure lexicographical ordering for elements. */
d1c920c5 688 if (zzlCompareElements(eptr,ele->ptr,sdslen(ele->ptr)) > 0) {
8588bfa3 689 zl = zzlInsertAt(zl,eptr,ele,score);
8218db3d
PN
690 break;
691 }
21c5b508
PN
692 }
693
694 /* Move to next element. */
695 eptr = ziplistNext(zl,sptr);
696 }
697
698 /* Push on tail of list when it was not yet inserted. */
8218db3d 699 if (eptr == NULL)
8588bfa3 700 zl = zzlInsertAt(zl,NULL,ele,score);
21c5b508
PN
701
702 decrRefCount(ele);
8588bfa3 703 return zl;
21c5b508 704}
25bb8a44 705
8588bfa3 706unsigned char *zzlDeleteRangeByScore(unsigned char *zl, zrangespec range, unsigned long *deleted) {
4a14dbba
PN
707 unsigned char *eptr, *sptr;
708 double score;
8588bfa3 709 unsigned long num = 0;
4a14dbba 710
8588bfa3 711 if (deleted != NULL) *deleted = 0;
4a14dbba 712
8588bfa3
PN
713 eptr = zzlFirstInRange(zl,range);
714 if (eptr == NULL) return zl;
4a14dbba
PN
715
716 /* When the tail of the ziplist is deleted, eptr will point to the sentinel
717 * byte and ziplistNext will return NULL. */
718 while ((sptr = ziplistNext(zl,eptr)) != NULL) {
719 score = zzlGetScore(sptr);
720 if (zslValueLteMax(score,&range)) {
721 /* Delete both the element and the score. */
722 zl = ziplistDelete(zl,&eptr);
723 zl = ziplistDelete(zl,&eptr);
8588bfa3 724 num++;
4a14dbba
PN
725 } else {
726 /* No longer in range. */
727 break;
728 }
729 }
730
8588bfa3
PN
731 if (deleted != NULL) *deleted = num;
732 return zl;
4a14dbba
PN
733}
734
63b7b7fb
PN
/* Delete from the ziplist every (element,score) pair with rank between
 * 'start' and 'end', both inclusive and both 1-based. Returns the
 * resulting ziplist pointer; when 'deleted' is not NULL it receives
 * (end-start)+1. */
unsigned char *zzlDeleteRangeByRank(unsigned char *zl, unsigned int start, unsigned int end, unsigned long *deleted) {
    unsigned int count = (end-start)+1;

    if (deleted != NULL) *deleted = count;

    /* Every pair takes two ziplist entries, hence the doubled index and
     * entry count. */
    return ziplistDeleteRange(zl,2*(start-1),2*count);
}
743
5d1b4fb6
PN
744/*-----------------------------------------------------------------------------
745 * Common sorted set API
746 *----------------------------------------------------------------------------*/
747
df26a0ae 748unsigned int zsetLength(robj *zobj) {
5d1b4fb6
PN
749 int length = -1;
750 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
bbfe232f 751 length = zzlLength(zobj->ptr);
100ed062 752 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
5d1b4fb6
PN
753 length = ((zset*)zobj->ptr)->zsl->length;
754 } else {
755 redisPanic("Unknown sorted set encoding");
756 }
757 return length;
758}
759
df26a0ae 760void zsetConvert(robj *zobj, int encoding) {
a669d5e9
PN
761 zset *zs;
762 zskiplistNode *node, *next;
763 robj *ele;
764 double score;
765
766 if (zobj->encoding == encoding) return;
767 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
768 unsigned char *zl = zobj->ptr;
769 unsigned char *eptr, *sptr;
770 unsigned char *vstr;
771 unsigned int vlen;
772 long long vlong;
773
100ed062 774 if (encoding != REDIS_ENCODING_SKIPLIST)
a669d5e9
PN
775 redisPanic("Unknown target encoding");
776
777 zs = zmalloc(sizeof(*zs));
778 zs->dict = dictCreate(&zsetDictType,NULL);
779 zs->zsl = zslCreate();
780
781 eptr = ziplistIndex(zl,0);
eab0e26e 782 redisAssertWithInfo(NULL,zobj,eptr != NULL);
a669d5e9 783 sptr = ziplistNext(zl,eptr);
eab0e26e 784 redisAssertWithInfo(NULL,zobj,sptr != NULL);
a669d5e9
PN
785
786 while (eptr != NULL) {
787 score = zzlGetScore(sptr);
eab0e26e 788 redisAssertWithInfo(NULL,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
a669d5e9
PN
789 if (vstr == NULL)
790 ele = createStringObjectFromLongLong(vlong);
791 else
792 ele = createStringObject((char*)vstr,vlen);
793
794 /* Has incremented refcount since it was just created. */
795 node = zslInsert(zs->zsl,score,ele);
eab0e26e 796 redisAssertWithInfo(NULL,zobj,dictAdd(zs->dict,ele,&node->score) == DICT_OK);
a669d5e9
PN
797 incrRefCount(ele); /* Added to dictionary. */
798 zzlNext(zl,&eptr,&sptr);
799 }
800
801 zfree(zobj->ptr);
802 zobj->ptr = zs;
100ed062
PN
803 zobj->encoding = REDIS_ENCODING_SKIPLIST;
804 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
a669d5e9
PN
805 unsigned char *zl = ziplistNew();
806
807 if (encoding != REDIS_ENCODING_ZIPLIST)
808 redisPanic("Unknown target encoding");
809
810 /* Approach similar to zslFree(), since we want to free the skiplist at
811 * the same time as creating the ziplist. */
812 zs = zobj->ptr;
813 dictRelease(zs->dict);
814 node = zs->zsl->header->level[0].forward;
815 zfree(zs->zsl->header);
816 zfree(zs->zsl);
817
a669d5e9
PN
818 while (node) {
819 ele = getDecodedObject(node->obj);
8588bfa3 820 zl = zzlInsertAt(zl,NULL,ele,node->score);
a669d5e9
PN
821 decrRefCount(ele);
822
823 next = node->level[0].forward;
824 zslFreeNode(node);
825 node = next;
826 }
827
828 zfree(zs);
8588bfa3 829 zobj->ptr = zl;
a669d5e9
PN
830 zobj->encoding = REDIS_ENCODING_ZIPLIST;
831 } else {
832 redisPanic("Unknown sorted set encoding");
833 }
834}
835
e2641e09 836/*-----------------------------------------------------------------------------
837 * Sorted set commands
838 *----------------------------------------------------------------------------*/
839
69ef89f2 840/* This generic command implements both ZADD and ZINCRBY. */
3ca7532a 841void zaddGenericCommand(redisClient *c, int incr) {
21c5b508 842 static char *nanerr = "resulting score is not a number (NaN)";
3ca7532a
PN
843 robj *key = c->argv[1];
844 robj *ele;
21c5b508
PN
845 robj *zobj;
846 robj *curobj;
ef231a7c 847 double score = 0, *scores, curscore = 0.0;
848 int j, elements = (c->argc-2)/2;
849 int added = 0;
3ca7532a 850
ef231a7c 851 if (c->argc % 2) {
852 addReply(c,shared.syntaxerr);
3ca7532a 853 return;
ef231a7c 854 }
855
856 /* Start parsing all the scores, we need to emit any syntax error
857 * before executing additions to the sorted set, as the command should
858 * either execute fully or nothing at all. */
859 scores = zmalloc(sizeof(double)*elements);
860 for (j = 0; j < elements; j++) {
861 if (getDoubleFromObjectOrReply(c,c->argv[2+j*2],&scores[j],NULL)
862 != REDIS_OK)
863 {
864 zfree(scores);
865 return;
866 }
867 }
21c5b508 868
ef231a7c 869 /* Lookup the key and create the sorted set if does not exist. */
21c5b508
PN
870 zobj = lookupKeyWrite(c->db,key);
871 if (zobj == NULL) {
a669d5e9
PN
872 if (server.zset_max_ziplist_entries == 0 ||
873 server.zset_max_ziplist_value < sdslen(c->argv[3]->ptr))
874 {
875 zobj = createZsetObject();
876 } else {
877 zobj = createZsetZiplistObject();
878 }
21c5b508 879 dbAdd(c->db,key,zobj);
e2641e09 880 } else {
21c5b508 881 if (zobj->type != REDIS_ZSET) {
e2641e09 882 addReply(c,shared.wrongtypeerr);
ef231a7c 883 zfree(scores);
e2641e09 884 return;
885 }
886 }
e2641e09 887
ef231a7c 888 for (j = 0; j < elements; j++) {
889 score = scores[j];
890
891 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
892 unsigned char *eptr;
893
894 /* Prefer non-encoded element when dealing with ziplists. */
895 ele = c->argv[3+j*2];
896 if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
897 if (incr) {
898 score += curscore;
899 if (isnan(score)) {
900 addReplyError(c,nanerr);
901 /* Don't need to check if the sorted set is empty
902 * because we know it has at least one element. */
903 zfree(scores);
904 return;
905 }
21c5b508 906 }
21c5b508 907
ef231a7c 908 /* Remove and re-insert when score changed. */
909 if (score != curscore) {
910 zobj->ptr = zzlDelete(zobj->ptr,eptr);
911 zobj->ptr = zzlInsert(zobj->ptr,ele,score);
912
913 signalModifiedKey(c->db,key);
914 server.dirty++;
915 }
916 } else {
917 /* Optimize: check if the element is too large or the list
918 * becomes too long *before* executing zzlInsert. */
8588bfa3 919 zobj->ptr = zzlInsert(zobj->ptr,ele,score);
ef231a7c 920 if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries)
921 zsetConvert(zobj,REDIS_ENCODING_SKIPLIST);
922 if (sdslen(ele->ptr) > server.zset_max_ziplist_value)
923 zsetConvert(zobj,REDIS_ENCODING_SKIPLIST);
21c5b508
PN
924
925 signalModifiedKey(c->db,key);
926 server.dirty++;
ef231a7c 927 if (!incr) added++;
21c5b508 928 }
ef231a7c 929 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
930 zset *zs = zobj->ptr;
931 zskiplistNode *znode;
932 dictEntry *de;
933
934 ele = c->argv[3+j*2] = tryObjectEncoding(c->argv[3+j*2]);
935 de = dictFind(zs->dict,ele);
936 if (de != NULL) {
c0ba9ebe 937 curobj = dictGetKey(de);
938 curscore = *(double*)dictGetVal(de);
ef231a7c 939
940 if (incr) {
941 score += curscore;
942 if (isnan(score)) {
943 addReplyError(c,nanerr);
944 /* Don't need to check if the sorted set is empty
945 * because we know it has at least one element. */
946 zfree(scores);
947 return;
948 }
21c5b508 949 }
21c5b508 950
ef231a7c 951 /* Remove and re-insert when score changed. We can safely
952 * delete the key object from the skiplist, since the
953 * dictionary still has a reference to it. */
954 if (score != curscore) {
eab0e26e 955 redisAssertWithInfo(c,curobj,zslDelete(zs->zsl,curscore,curobj));
ef231a7c 956 znode = zslInsert(zs->zsl,score,curobj);
957 incrRefCount(curobj); /* Re-inserted in skiplist. */
c0ba9ebe 958 dictGetVal(de) = &znode->score; /* Update score ptr. */
ef231a7c 959
960 signalModifiedKey(c->db,key);
961 server.dirty++;
962 }
963 } else {
964 znode = zslInsert(zs->zsl,score,ele);
965 incrRefCount(ele); /* Inserted in skiplist. */
f013f400 966 redisAssertWithInfo(c,NULL,dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
ef231a7c 967 incrRefCount(ele); /* Added to dictionary. */
21c5b508
PN
968
969 signalModifiedKey(c->db,key);
970 server.dirty++;
ef231a7c 971 if (!incr) added++;
21c5b508 972 }
21c5b508 973 } else {
ef231a7c 974 redisPanic("Unknown sorted set encoding");
e2641e09 975 }
e2641e09 976 }
ef231a7c 977 zfree(scores);
978 if (incr) /* ZINCRBY */
979 addReplyDouble(c,score);
980 else /* ZADD */
981 addReplyLongLong(c,added);
e2641e09 982}
983
984void zaddCommand(redisClient *c) {
3ca7532a 985 zaddGenericCommand(c,0);
e2641e09 986}
987
988void zincrbyCommand(redisClient *c) {
3ca7532a 989 zaddGenericCommand(c,1);
e2641e09 990}
991
992void zremCommand(redisClient *c) {
0b10e104 993 robj *key = c->argv[1];
0b10e104 994 robj *zobj;
3f7b2b1f 995 int deleted = 0, j;
0b10e104
PN
996
997 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
998 checkType(c,zobj,REDIS_ZSET)) return;
999
1000 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1001 unsigned char *eptr;
1002
3f7b2b1f 1003 for (j = 2; j < c->argc; j++) {
1004 if ((eptr = zzlFind(zobj->ptr,c->argv[j],NULL)) != NULL) {
1005 deleted++;
1006 zobj->ptr = zzlDelete(zobj->ptr,eptr);
1007 if (zzlLength(zobj->ptr) == 0) {
1008 dbDelete(c->db,key);
1009 break;
1010 }
1011 }
0b10e104 1012 }
100ed062 1013 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
0b10e104
PN
1014 zset *zs = zobj->ptr;
1015 dictEntry *de;
1016 double score;
1017
3f7b2b1f 1018 for (j = 2; j < c->argc; j++) {
1019 de = dictFind(zs->dict,c->argv[j]);
1020 if (de != NULL) {
1021 deleted++;
1022
1023 /* Delete from the skiplist */
c0ba9ebe 1024 score = *(double*)dictGetVal(de);
eab0e26e 1025 redisAssertWithInfo(c,c->argv[j],zslDelete(zs->zsl,score,c->argv[j]));
3f7b2b1f 1026
1027 /* Delete from the hash table */
1028 dictDelete(zs->dict,c->argv[j]);
1029 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
1030 if (dictSize(zs->dict) == 0) {
1031 dbDelete(c->db,key);
1032 break;
1033 }
1034 }
0b10e104
PN
1035 }
1036 } else {
1037 redisPanic("Unknown sorted set encoding");
e2641e09 1038 }
e2641e09 1039
3f7b2b1f 1040 if (deleted) {
1041 signalModifiedKey(c->db,key);
1042 server.dirty += deleted;
1043 }
1044 addReplyLongLong(c,deleted);
e2641e09 1045}
1046
1047void zremrangebyscoreCommand(redisClient *c) {
4a14dbba
PN
1048 robj *key = c->argv[1];
1049 robj *zobj;
91504b6c 1050 zrangespec range;
4a14dbba 1051 unsigned long deleted;
e2641e09 1052
91504b6c 1053 /* Parse the range arguments. */
7236fdb2 1054 if (zslParseRange(c->argv[2],c->argv[3],&range) != REDIS_OK) {
d93f9a86 1055 addReplyError(c,"min or max is not a float");
7236fdb2
PN
1056 return;
1057 }
e2641e09 1058
4a14dbba
PN
1059 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
1060 checkType(c,zobj,REDIS_ZSET)) return;
1061
1062 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
8588bfa3 1063 zobj->ptr = zzlDeleteRangeByScore(zobj->ptr,range,&deleted);
48991620 1064 if (zzlLength(zobj->ptr) == 0) dbDelete(c->db,key);
100ed062 1065 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
4a14dbba
PN
1066 zset *zs = zobj->ptr;
1067 deleted = zslDeleteRangeByScore(zs->zsl,range,zs->dict);
1068 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
1069 if (dictSize(zs->dict) == 0) dbDelete(c->db,key);
1070 } else {
1071 redisPanic("Unknown sorted set encoding");
1072 }
e2641e09 1073
4a14dbba 1074 if (deleted) signalModifiedKey(c->db,key);
e2641e09 1075 server.dirty += deleted;
1076 addReplyLongLong(c,deleted);
1077}
1078
1079void zremrangebyrankCommand(redisClient *c) {
63b7b7fb
PN
1080 robj *key = c->argv[1];
1081 robj *zobj;
e2641e09 1082 long start;
1083 long end;
1084 int llen;
63b7b7fb 1085 unsigned long deleted;
e2641e09 1086
1087 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
1088 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
1089
63b7b7fb
PN
1090 if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
1091 checkType(c,zobj,REDIS_ZSET)) return;
e2641e09 1092
63b7b7fb 1093 /* Sanitize indexes. */
df26a0ae 1094 llen = zsetLength(zobj);
e2641e09 1095 if (start < 0) start = llen+start;
1096 if (end < 0) end = llen+end;
1097 if (start < 0) start = 0;
e2641e09 1098
d0a4e24e
PN
1099 /* Invariant: start >= 0, so this test will be true when end < 0.
1100 * The range is empty when start > end or start >= length. */
e2641e09 1101 if (start > end || start >= llen) {
1102 addReply(c,shared.czero);
1103 return;
1104 }
1105 if (end >= llen) end = llen-1;
1106
63b7b7fb
PN
1107 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1108 /* Correct for 1-based rank. */
8588bfa3 1109 zobj->ptr = zzlDeleteRangeByRank(zobj->ptr,start+1,end+1,&deleted);
48991620 1110 if (zzlLength(zobj->ptr) == 0) dbDelete(c->db,key);
100ed062 1111 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
63b7b7fb
PN
1112 zset *zs = zobj->ptr;
1113
1114 /* Correct for 1-based rank. */
1115 deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);
1116 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
1117 if (dictSize(zs->dict) == 0) dbDelete(c->db,key);
1118 } else {
1119 redisPanic("Unknown sorted set encoding");
1120 }
1121
1122 if (deleted) signalModifiedKey(c->db,key);
e2641e09 1123 server.dirty += deleted;
63b7b7fb 1124 addReplyLongLong(c,deleted);
e2641e09 1125}
1126
1127typedef struct {
56ce42fa
PN
1128 robj *subject;
1129 int type; /* Set, sorted set */
1130 int encoding;
e2641e09 1131 double weight;
56ce42fa
PN
1132
1133 union {
1134 /* Set iterators. */
1135 union _iterset {
1136 struct {
1137 intset *is;
1138 int ii;
1139 } is;
1140 struct {
1141 dict *dict;
1142 dictIterator *di;
1143 dictEntry *de;
1144 } ht;
1145 } set;
1146
1147 /* Sorted set iterators. */
1148 union _iterzset {
1149 struct {
1150 unsigned char *zl;
1151 unsigned char *eptr, *sptr;
1152 } zl;
1153 struct {
1154 zset *zs;
1155 zskiplistNode *node;
1156 } sl;
1157 } zset;
1158 } iter;
e2641e09 1159} zsetopsrc;
1160
56ce42fa
PN
1161
/* Flags stored in zsetopval.flags.
 *
 * OPVAL_DIRTY_ROBJ marks a pointer that must be released before the
 * zsetopval is reused for the next element. OPVAL_DIRTY_LL is different,
 * because a long long needs no cleanup: it records that we already checked
 * whether "ell" holds a long long, or already tried to convert another
 * representation into one. OPVAL_VALID_LL is set in addition when that
 * check or conversion succeeded. */
#define OPVAL_DIRTY_ROBJ (1<<0)
#define OPVAL_DIRTY_LL   (1<<1)
#define OPVAL_VALID_LL   (1<<2)
1171
1172/* Store value retrieved from the iterator. */
1173typedef struct {
1174 int flags;
1175 unsigned char _buf[32]; /* Private buffer. */
1176 robj *ele;
1177 unsigned char *estr;
1178 unsigned int elen;
1179 long long ell;
1180 double score;
1181} zsetopval;
1182
1183typedef union _iterset iterset;
1184typedef union _iterzset iterzset;
1185
1186void zuiInitIterator(zsetopsrc *op) {
1187 if (op->subject == NULL)
1188 return;
1189
1190 if (op->type == REDIS_SET) {
1191 iterset *it = &op->iter.set;
1192 if (op->encoding == REDIS_ENCODING_INTSET) {
1193 it->is.is = op->subject->ptr;
1194 it->is.ii = 0;
1195 } else if (op->encoding == REDIS_ENCODING_HT) {
1196 it->ht.dict = op->subject->ptr;
1197 it->ht.di = dictGetIterator(op->subject->ptr);
1198 it->ht.de = dictNext(it->ht.di);
1199 } else {
1200 redisPanic("Unknown set encoding");
1201 }
1202 } else if (op->type == REDIS_ZSET) {
1203 iterzset *it = &op->iter.zset;
1204 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1205 it->zl.zl = op->subject->ptr;
1206 it->zl.eptr = ziplistIndex(it->zl.zl,0);
1207 if (it->zl.eptr != NULL) {
1208 it->zl.sptr = ziplistNext(it->zl.zl,it->zl.eptr);
1209 redisAssert(it->zl.sptr != NULL);
1210 }
100ed062 1211 } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
56ce42fa
PN
1212 it->sl.zs = op->subject->ptr;
1213 it->sl.node = it->sl.zs->zsl->header->level[0].forward;
1214 } else {
1215 redisPanic("Unknown sorted set encoding");
1216 }
1217 } else {
1218 redisPanic("Unsupported type");
1219 }
1220}
1221
1222void zuiClearIterator(zsetopsrc *op) {
1223 if (op->subject == NULL)
1224 return;
1225
1226 if (op->type == REDIS_SET) {
1227 iterset *it = &op->iter.set;
1228 if (op->encoding == REDIS_ENCODING_INTSET) {
1229 REDIS_NOTUSED(it); /* skip */
1230 } else if (op->encoding == REDIS_ENCODING_HT) {
1231 dictReleaseIterator(it->ht.di);
1232 } else {
1233 redisPanic("Unknown set encoding");
1234 }
1235 } else if (op->type == REDIS_ZSET) {
1236 iterzset *it = &op->iter.zset;
1237 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1238 REDIS_NOTUSED(it); /* skip */
100ed062 1239 } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
56ce42fa
PN
1240 REDIS_NOTUSED(it); /* skip */
1241 } else {
1242 redisPanic("Unknown sorted set encoding");
1243 }
1244 } else {
1245 redisPanic("Unsupported type");
1246 }
1247}
1248
1249int zuiLength(zsetopsrc *op) {
1250 if (op->subject == NULL)
1251 return 0;
1252
1253 if (op->type == REDIS_SET) {
1254 iterset *it = &op->iter.set;
1255 if (op->encoding == REDIS_ENCODING_INTSET) {
1256 return intsetLen(it->is.is);
1257 } else if (op->encoding == REDIS_ENCODING_HT) {
1258 return dictSize(it->ht.dict);
1259 } else {
1260 redisPanic("Unknown set encoding");
1261 }
1262 } else if (op->type == REDIS_ZSET) {
1263 iterzset *it = &op->iter.zset;
1264 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1265 return zzlLength(it->zl.zl);
100ed062 1266 } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
56ce42fa
PN
1267 return it->sl.zs->zsl->length;
1268 } else {
1269 redisPanic("Unknown sorted set encoding");
1270 }
1271 } else {
1272 redisPanic("Unsupported type");
1273 }
1274}
1275
1276/* Check if the current value is valid. If so, store it in the passed structure
1277 * and move to the next element. If not valid, this means we have reached the
1278 * end of the structure and can abort. */
1279int zuiNext(zsetopsrc *op, zsetopval *val) {
1280 if (op->subject == NULL)
1281 return 0;
1282
1283 if (val->flags & OPVAL_DIRTY_ROBJ)
1284 decrRefCount(val->ele);
1285
fe7be460 1286 memset(val,0,sizeof(zsetopval));
56ce42fa
PN
1287
1288 if (op->type == REDIS_SET) {
1289 iterset *it = &op->iter.set;
1290 if (op->encoding == REDIS_ENCODING_INTSET) {
4dada1b5 1291 int64_t ell;
9de5d460 1292
1293 if (!intsetGet(it->is.is,it->is.ii,&ell))
56ce42fa 1294 return 0;
4dada1b5 1295 val->ell = ell;
56ce42fa
PN
1296 val->score = 1.0;
1297
1298 /* Move to next element. */
1299 it->is.ii++;
1300 } else if (op->encoding == REDIS_ENCODING_HT) {
1301 if (it->ht.de == NULL)
1302 return 0;
c0ba9ebe 1303 val->ele = dictGetKey(it->ht.de);
56ce42fa
PN
1304 val->score = 1.0;
1305
1306 /* Move to next element. */
1307 it->ht.de = dictNext(it->ht.di);
1308 } else {
1309 redisPanic("Unknown set encoding");
1310 }
1311 } else if (op->type == REDIS_ZSET) {
1312 iterzset *it = &op->iter.zset;
1313 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
1314 /* No need to check both, but better be explicit. */
1315 if (it->zl.eptr == NULL || it->zl.sptr == NULL)
1316 return 0;
1317 redisAssert(ziplistGet(it->zl.eptr,&val->estr,&val->elen,&val->ell));
1318 val->score = zzlGetScore(it->zl.sptr);
1319
1320 /* Move to next element. */
1321 zzlNext(it->zl.zl,&it->zl.eptr,&it->zl.sptr);
100ed062 1322 } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
56ce42fa
PN
1323 if (it->sl.node == NULL)
1324 return 0;
1325 val->ele = it->sl.node->obj;
1326 val->score = it->sl.node->score;
1327
1328 /* Move to next element. */
1329 it->sl.node = it->sl.node->level[0].forward;
1330 } else {
1331 redisPanic("Unknown sorted set encoding");
1332 }
1333 } else {
1334 redisPanic("Unsupported type");
1335 }
1336 return 1;
1337}
1338
1339int zuiLongLongFromValue(zsetopval *val) {
1340 if (!(val->flags & OPVAL_DIRTY_LL)) {
1341 val->flags |= OPVAL_DIRTY_LL;
1342
1343 if (val->ele != NULL) {
1344 if (val->ele->encoding == REDIS_ENCODING_INT) {
1345 val->ell = (long)val->ele->ptr;
1346 val->flags |= OPVAL_VALID_LL;
1347 } else if (val->ele->encoding == REDIS_ENCODING_RAW) {
1348 if (string2ll(val->ele->ptr,sdslen(val->ele->ptr),&val->ell))
1349 val->flags |= OPVAL_VALID_LL;
1350 } else {
1351 redisPanic("Unsupported element encoding");
1352 }
1353 } else if (val->estr != NULL) {
1354 if (string2ll((char*)val->estr,val->elen,&val->ell))
1355 val->flags |= OPVAL_VALID_LL;
1356 } else {
1357 /* The long long was already set, flag as valid. */
1358 val->flags |= OPVAL_VALID_LL;
1359 }
1360 }
1361 return val->flags & OPVAL_VALID_LL;
1362}
1363
1364robj *zuiObjectFromValue(zsetopval *val) {
1365 if (val->ele == NULL) {
1366 if (val->estr != NULL) {
1367 val->ele = createStringObject((char*)val->estr,val->elen);
1368 } else {
1369 val->ele = createStringObjectFromLongLong(val->ell);
1370 }
1371 val->flags |= OPVAL_DIRTY_ROBJ;
1372 }
1373 return val->ele;
1374}
1375
1376int zuiBufferFromValue(zsetopval *val) {
1377 if (val->estr == NULL) {
1378 if (val->ele != NULL) {
1379 if (val->ele->encoding == REDIS_ENCODING_INT) {
1380 val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),(long)val->ele->ptr);
1381 val->estr = val->_buf;
1382 } else if (val->ele->encoding == REDIS_ENCODING_RAW) {
1383 val->elen = sdslen(val->ele->ptr);
1384 val->estr = val->ele->ptr;
1385 } else {
1386 redisPanic("Unsupported element encoding");
1387 }
1388 } else {
1389 val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),val->ell);
1390 val->estr = val->_buf;
1391 }
1392 }
1393 return 1;
1394}
1395
1396/* Find value pointed to by val in the source pointer to by op. When found,
1397 * return 1 and store its score in target. Return 0 otherwise. */
1398int zuiFind(zsetopsrc *op, zsetopval *val, double *score) {
1399 if (op->subject == NULL)
1400 return 0;
1401
1402 if (op->type == REDIS_SET) {
1403 iterset *it = &op->iter.set;
1404
1405 if (op->encoding == REDIS_ENCODING_INTSET) {
1406 if (zuiLongLongFromValue(val) && intsetFind(it->is.is,val->ell)) {
1407 *score = 1.0;
1408 return 1;
1409 } else {
1410 return 0;
1411 }
1412 } else if (op->encoding == REDIS_ENCODING_HT) {
1413 zuiObjectFromValue(val);
1414 if (dictFind(it->ht.dict,val->ele) != NULL) {
1415 *score = 1.0;
1416 return 1;
1417 } else {
1418 return 0;
1419 }
1420 } else {
1421 redisPanic("Unknown set encoding");
1422 }
1423 } else if (op->type == REDIS_ZSET) {
1424 iterzset *it = &op->iter.zset;
1425 zuiObjectFromValue(val);
1426
1427 if (op->encoding == REDIS_ENCODING_ZIPLIST) {
8588bfa3 1428 if (zzlFind(it->zl.zl,val->ele,score) != NULL) {
56ce42fa
PN
1429 /* Score is already set by zzlFind. */
1430 return 1;
1431 } else {
1432 return 0;
1433 }
100ed062 1434 } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
56ce42fa
PN
1435 dictEntry *de;
1436 if ((de = dictFind(it->sl.zs->dict,val->ele)) != NULL) {
c0ba9ebe 1437 *score = *(double*)dictGetVal(de);
56ce42fa
PN
1438 return 1;
1439 } else {
1440 return 0;
1441 }
1442 } else {
1443 redisPanic("Unknown sorted set encoding");
1444 }
1445 } else {
1446 redisPanic("Unsupported type");
1447 }
1448}
1449
1450int zuiCompareByCardinality(const void *s1, const void *s2) {
1451 return zuiLength((zsetopsrc*)s1) - zuiLength((zsetopsrc*)s2);
e2641e09 1452}
1453
/* Aggregation modes used when combining the scores of one element. */
#define REDIS_AGGR_SUM 1
#define REDIS_AGGR_MIN 2
#define REDIS_AGGR_MAX 3

/* Score stored in a dictionary entry; entries without a value count as 1.0. */
#define zunionInterDictValue(_e) \
    (dictGetVal(_e) != NULL ? *(double*)dictGetVal(_e) : 1.0)
e2641e09 1458
1459inline static void zunionInterAggregate(double *target, double val, int aggregate) {
1460 if (aggregate == REDIS_AGGR_SUM) {
1461 *target = *target + val;
d9e28bcf
PN
1462 /* The result of adding two doubles is NaN when one variable
1463 * is +inf and the other is -inf. When these numbers are added,
1464 * we maintain the convention of the result being 0.0. */
1465 if (isnan(*target)) *target = 0.0;
e2641e09 1466 } else if (aggregate == REDIS_AGGR_MIN) {
1467 *target = val < *target ? val : *target;
1468 } else if (aggregate == REDIS_AGGR_MAX) {
1469 *target = val > *target ? val : *target;
1470 } else {
1471 /* safety net */
1472 redisPanic("Unknown ZUNION/INTER aggregate type");
1473 }
1474}
1475
1476void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
706b32e0
B
1477 int i, j;
1478 long setnum;
e2641e09 1479 int aggregate = REDIS_AGGR_SUM;
1480 zsetopsrc *src;
56ce42fa
PN
1481 zsetopval zval;
1482 robj *tmp;
255eebe2 1483 unsigned int maxelelen = 0;
e2641e09 1484 robj *dstobj;
1485 zset *dstzset;
69ef89f2 1486 zskiplistNode *znode;
8c1420ff 1487 int touched = 0;
e2641e09 1488
1489 /* expect setnum input keys to be given */
706b32e0
B
1490 if ((getLongFromObjectOrReply(c, c->argv[2], &setnum, NULL) != REDIS_OK))
1491 return;
1492
e2641e09 1493 if (setnum < 1) {
3ab20376
PN
1494 addReplyError(c,
1495 "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE");
e2641e09 1496 return;
1497 }
1498
1499 /* test if the expected number of keys would overflow */
de00a5a0 1500 if (setnum > c->argc-3) {
e2641e09 1501 addReply(c,shared.syntaxerr);
1502 return;
1503 }
1504
1505 /* read keys to be used for input */
56ce42fa 1506 src = zcalloc(sizeof(zsetopsrc) * setnum);
e2641e09 1507 for (i = 0, j = 3; i < setnum; i++, j++) {
1508 robj *obj = lookupKeyWrite(c->db,c->argv[j]);
56ce42fa
PN
1509 if (obj != NULL) {
1510 if (obj->type != REDIS_ZSET && obj->type != REDIS_SET) {
e2641e09 1511 zfree(src);
1512 addReply(c,shared.wrongtypeerr);
1513 return;
1514 }
56ce42fa
PN
1515
1516 src[i].subject = obj;
1517 src[i].type = obj->type;
1518 src[i].encoding = obj->encoding;
1519 } else {
1520 src[i].subject = NULL;
e2641e09 1521 }
1522
56ce42fa 1523 /* Default all weights to 1. */
e2641e09 1524 src[i].weight = 1.0;
1525 }
1526
1527 /* parse optional extra arguments */
1528 if (j < c->argc) {
1529 int remaining = c->argc - j;
1530
1531 while (remaining) {
1532 if (remaining >= (setnum + 1) && !strcasecmp(c->argv[j]->ptr,"weights")) {
1533 j++; remaining--;
1534 for (i = 0; i < setnum; i++, j++, remaining--) {
673e1fb7 1535 if (getDoubleFromObjectOrReply(c,c->argv[j],&src[i].weight,
d93f9a86 1536 "weight value is not a float") != REDIS_OK)
673e1fb7
PN
1537 {
1538 zfree(src);
e2641e09 1539 return;
673e1fb7 1540 }
e2641e09 1541 }
1542 } else if (remaining >= 2 && !strcasecmp(c->argv[j]->ptr,"aggregate")) {
1543 j++; remaining--;
1544 if (!strcasecmp(c->argv[j]->ptr,"sum")) {
1545 aggregate = REDIS_AGGR_SUM;
1546 } else if (!strcasecmp(c->argv[j]->ptr,"min")) {
1547 aggregate = REDIS_AGGR_MIN;
1548 } else if (!strcasecmp(c->argv[j]->ptr,"max")) {
1549 aggregate = REDIS_AGGR_MAX;
1550 } else {
1551 zfree(src);
1552 addReply(c,shared.syntaxerr);
1553 return;
1554 }
1555 j++; remaining--;
1556 } else {
1557 zfree(src);
1558 addReply(c,shared.syntaxerr);
1559 return;
1560 }
1561 }
1562 }
1563
56ce42fa
PN
1564 for (i = 0; i < setnum; i++)
1565 zuiInitIterator(&src[i]);
1566
e2641e09 1567 /* sort sets from the smallest to largest, this will improve our
1568 * algorithm's performance */
56ce42fa 1569 qsort(src,setnum,sizeof(zsetopsrc),zuiCompareByCardinality);
e2641e09 1570
1571 dstobj = createZsetObject();
1572 dstzset = dstobj->ptr;
02e60065 1573 memset(&zval, 0, sizeof(zval));
e2641e09 1574
1575 if (op == REDIS_OP_INTER) {
56ce42fa
PN
1576 /* Skip everything if the smallest input is empty. */
1577 if (zuiLength(&src[0]) > 0) {
1578 /* Precondition: as src[0] is non-empty and the inputs are ordered
1579 * by size, all src[i > 0] are non-empty too. */
1580 while (zuiNext(&src[0],&zval)) {
d433ebc6 1581 double score, value;
e2641e09 1582
56ce42fa 1583 score = src[0].weight * zval.score;
256356ff 1584 if (isnan(score)) score = 0;
1585
e2641e09 1586 for (j = 1; j < setnum; j++) {
d200342a 1587 /* It is not safe to access the zset we are
cb16b6c3 1588 * iterating, so explicitly check for equal object. */
d070abe4 1589 if (src[j].subject == src[0].subject) {
1590 value = zval.score*src[j].weight;
1591 zunionInterAggregate(&score,value,aggregate);
1592 } else if (zuiFind(&src[j],&zval,&value)) {
56ce42fa 1593 value *= src[j].weight;
d433ebc6 1594 zunionInterAggregate(&score,value,aggregate);
e2641e09 1595 } else {
1596 break;
1597 }
1598 }
1599
56ce42fa 1600 /* Only continue when present in every input. */
d433ebc6 1601 if (j == setnum) {
56ce42fa
PN
1602 tmp = zuiObjectFromValue(&zval);
1603 znode = zslInsert(dstzset->zsl,score,tmp);
1604 incrRefCount(tmp); /* added to skiplist */
1605 dictAdd(dstzset->dict,tmp,&znode->score);
1606 incrRefCount(tmp); /* added to dictionary */
255eebe2
PN
1607
1608 if (tmp->encoding == REDIS_ENCODING_RAW)
1609 if (sdslen(tmp->ptr) > maxelelen)
1610 maxelelen = sdslen(tmp->ptr);
e2641e09 1611 }
1612 }
e2641e09 1613 }
1614 } else if (op == REDIS_OP_UNION) {
1615 for (i = 0; i < setnum; i++) {
521ddcce 1616 if (zuiLength(&src[i]) == 0)
56ce42fa 1617 continue;
e2641e09 1618
56ce42fa 1619 while (zuiNext(&src[i],&zval)) {
d433ebc6
PN
1620 double score, value;
1621
56ce42fa
PN
1622 /* Skip key when already processed */
1623 if (dictFind(dstzset->dict,zuiObjectFromValue(&zval)) != NULL)
d433ebc6 1624 continue;
e2641e09 1625
56ce42fa
PN
1626 /* Initialize score */
1627 score = src[i].weight * zval.score;
256356ff 1628 if (isnan(score)) score = 0;
e2641e09 1629
56ce42fa
PN
1630 /* Because the inputs are sorted by size, it's only possible
1631 * for sets at larger indices to hold this element. */
e2641e09 1632 for (j = (i+1); j < setnum; j++) {
d200342a 1633 /* It is not safe to access the zset we are
cb16b6c3 1634 * iterating, so explicitly check for equal object. */
1635 if(src[j].subject == src[i].subject) {
1636 value = zval.score*src[j].weight;
1637 zunionInterAggregate(&score,value,aggregate);
1638 } else if (zuiFind(&src[j],&zval,&value)) {
56ce42fa 1639 value *= src[j].weight;
d433ebc6 1640 zunionInterAggregate(&score,value,aggregate);
e2641e09 1641 }
1642 }
1643
56ce42fa
PN
1644 tmp = zuiObjectFromValue(&zval);
1645 znode = zslInsert(dstzset->zsl,score,tmp);
1646 incrRefCount(zval.ele); /* added to skiplist */
1647 dictAdd(dstzset->dict,tmp,&znode->score);
1648 incrRefCount(zval.ele); /* added to dictionary */
255eebe2
PN
1649
1650 if (tmp->encoding == REDIS_ENCODING_RAW)
1651 if (sdslen(tmp->ptr) > maxelelen)
1652 maxelelen = sdslen(tmp->ptr);
e2641e09 1653 }
e2641e09 1654 }
1655 } else {
56ce42fa 1656 redisPanic("Unknown operator");
e2641e09 1657 }
1658
56ce42fa
PN
1659 for (i = 0; i < setnum; i++)
1660 zuiClearIterator(&src[i]);
1661
8c1420ff 1662 if (dbDelete(c->db,dstkey)) {
cea8c5cd 1663 signalModifiedKey(c->db,dstkey);
8c1420ff 1664 touched = 1;
1665 server.dirty++;
1666 }
e2641e09 1667 if (dstzset->zsl->length) {
255eebe2
PN
1668 /* Convert to ziplist when in limits. */
1669 if (dstzset->zsl->length <= server.zset_max_ziplist_entries &&
1670 maxelelen <= server.zset_max_ziplist_value)
df26a0ae 1671 zsetConvert(dstobj,REDIS_ENCODING_ZIPLIST);
255eebe2 1672
e2641e09 1673 dbAdd(c->db,dstkey,dstobj);
df26a0ae 1674 addReplyLongLong(c,zsetLength(dstobj));
cea8c5cd 1675 if (!touched) signalModifiedKey(c->db,dstkey);
cbf7e107 1676 server.dirty++;
e2641e09 1677 } else {
1678 decrRefCount(dstobj);
255eebe2 1679 addReply(c,shared.czero);
e2641e09 1680 }
1681 zfree(src);
1682}
1683
1684void zunionstoreCommand(redisClient *c) {
1685 zunionInterGenericCommand(c,c->argv[1], REDIS_OP_UNION);
1686}
1687
1688void zinterstoreCommand(redisClient *c) {
1689 zunionInterGenericCommand(c,c->argv[1], REDIS_OP_INTER);
1690}
1691
1692void zrangeGenericCommand(redisClient *c, int reverse) {
5d1b4fb6
PN
1693 robj *key = c->argv[1];
1694 robj *zobj;
1695 int withscores = 0;
e2641e09 1696 long start;
1697 long end;
e2641e09 1698 int llen;
5d1b4fb6 1699 int rangelen;
e2641e09 1700
1701 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
1702 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
1703
1704 if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
1705 withscores = 1;
1706 } else if (c->argc >= 5) {
1707 addReply(c,shared.syntaxerr);
1708 return;
1709 }
1710
5d1b4fb6
PN
1711 if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL
1712 || checkType(c,zobj,REDIS_ZSET)) return;
e2641e09 1713
5d1b4fb6 1714 /* Sanitize indexes. */
df26a0ae 1715 llen = zsetLength(zobj);
e2641e09 1716 if (start < 0) start = llen+start;
1717 if (end < 0) end = llen+end;
1718 if (start < 0) start = 0;
e2641e09 1719
d0a4e24e
PN
1720 /* Invariant: start >= 0, so this test will be true when end < 0.
1721 * The range is empty when start > end or start >= length. */
e2641e09 1722 if (start > end || start >= llen) {
e2641e09 1723 addReply(c,shared.emptymultibulk);
1724 return;
1725 }
1726 if (end >= llen) end = llen-1;
1727 rangelen = (end-start)+1;
1728
e2641e09 1729 /* Return the result in form of a multi-bulk reply */
5d1b4fb6
PN
1730 addReplyMultiBulkLen(c, withscores ? (rangelen*2) : rangelen);
1731
1732 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1733 unsigned char *zl = zobj->ptr;
1734 unsigned char *eptr, *sptr;
1735 unsigned char *vstr;
1736 unsigned int vlen;
1737 long long vlong;
1738
1739 if (reverse)
1740 eptr = ziplistIndex(zl,-2-(2*start));
1741 else
1742 eptr = ziplistIndex(zl,2*start);
1743
eab0e26e 1744 redisAssertWithInfo(c,zobj,eptr != NULL);
4c5f0966
PN
1745 sptr = ziplistNext(zl,eptr);
1746
5d1b4fb6 1747 while (rangelen--) {
eab0e26e 1748 redisAssertWithInfo(c,zobj,eptr != NULL && sptr != NULL);
1749 redisAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
5d1b4fb6
PN
1750 if (vstr == NULL)
1751 addReplyBulkLongLong(c,vlong);
1752 else
1753 addReplyBulkCBuffer(c,vstr,vlen);
1754
4c5f0966 1755 if (withscores)
5d1b4fb6 1756 addReplyDouble(c,zzlGetScore(sptr));
5d1b4fb6 1757
4c5f0966
PN
1758 if (reverse)
1759 zzlPrev(zl,&eptr,&sptr);
1760 else
1761 zzlNext(zl,&eptr,&sptr);
5d1b4fb6
PN
1762 }
1763
100ed062 1764 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
5d1b4fb6
PN
1765 zset *zs = zobj->ptr;
1766 zskiplist *zsl = zs->zsl;
1767 zskiplistNode *ln;
1768 robj *ele;
1769
1770 /* Check if starting point is trivial, before doing log(N) lookup. */
1771 if (reverse) {
1772 ln = zsl->tail;
1773 if (start > 0)
1774 ln = zslGetElementByRank(zsl,llen-start);
1775 } else {
1776 ln = zsl->header->level[0].forward;
1777 if (start > 0)
1778 ln = zslGetElementByRank(zsl,start+1);
1779 }
1780
1781 while(rangelen--) {
eab0e26e 1782 redisAssertWithInfo(c,zobj,ln != NULL);
5d1b4fb6
PN
1783 ele = ln->obj;
1784 addReplyBulk(c,ele);
1785 if (withscores)
1786 addReplyDouble(c,ln->score);
1787 ln = reverse ? ln->backward : ln->level[0].forward;
1788 }
1789 } else {
1790 redisPanic("Unknown sorted set encoding");
e2641e09 1791 }
1792}
1793
1794void zrangeCommand(redisClient *c) {
1795 zrangeGenericCommand(c,0);
1796}
1797
1798void zrevrangeCommand(redisClient *c) {
1799 zrangeGenericCommand(c,1);
1800}
1801
0cfc8940
PN
1802/* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE. */
1803void genericZrangebyscoreCommand(redisClient *c, int reverse) {
25bb8a44 1804 zrangespec range;
aff255c8 1805 robj *key = c->argv[1];
0cfc8940 1806 robj *zobj;
706b32e0 1807 long offset = 0, limit = -1;
e2641e09 1808 int withscores = 0;
25bb8a44
PN
1809 unsigned long rangelen = 0;
1810 void *replylen = NULL;
22b9bf15 1811 int minidx, maxidx;
e2641e09 1812
25bb8a44 1813 /* Parse the range arguments. */
22b9bf15
PN
1814 if (reverse) {
1815 /* Range is given as [max,min] */
1816 maxidx = 2; minidx = 3;
1817 } else {
1818 /* Range is given as [min,max] */
1819 minidx = 2; maxidx = 3;
1820 }
1821
1822 if (zslParseRange(c->argv[minidx],c->argv[maxidx],&range) != REDIS_OK) {
d93f9a86 1823 addReplyError(c,"min or max is not a float");
7236fdb2
PN
1824 return;
1825 }
25bb8a44
PN
1826
1827 /* Parse optional extra arguments. Note that ZCOUNT will exactly have
1828 * 4 arguments, so we'll never enter the following code path. */
1829 if (c->argc > 4) {
1830 int remaining = c->argc - 4;
1831 int pos = 4;
1832
1833 while (remaining) {
1834 if (remaining >= 1 && !strcasecmp(c->argv[pos]->ptr,"withscores")) {
1835 pos++; remaining--;
1836 withscores = 1;
1837 } else if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr,"limit")) {
706b32e0
B
1838 if ((getLongFromObjectOrReply(c, c->argv[pos+1], &offset, NULL) != REDIS_OK) ||
1839 (getLongFromObjectOrReply(c, c->argv[pos+2], &limit, NULL) != REDIS_OK)) return;
25bb8a44
PN
1840 pos += 3; remaining -= 3;
1841 } else {
1842 addReply(c,shared.syntaxerr);
1843 return;
1844 }
1845 }
e2641e09 1846 }
25bb8a44
PN
1847
1848 /* Ok, lookup the key and get the range */
0cfc8940 1849 if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL ||
aff255c8 1850 checkType(c,zobj,REDIS_ZSET)) return;
25bb8a44 1851
aff255c8
PN
1852 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
1853 unsigned char *zl = zobj->ptr;
1854 unsigned char *eptr, *sptr;
1855 unsigned char *vstr;
1856 unsigned int vlen;
1857 long long vlong;
1858 double score;
e2641e09 1859
aff255c8 1860 /* If reversed, get the last node in range as starting point. */
0cfc8940 1861 if (reverse) {
8588bfa3 1862 eptr = zzlLastInRange(zl,range);
0cfc8940 1863 } else {
8588bfa3 1864 eptr = zzlFirstInRange(zl,range);
0cfc8940 1865 }
e2641e09 1866
aff255c8
PN
1867 /* No "first" element in the specified interval. */
1868 if (eptr == NULL) {
0cfc8940 1869 addReply(c, shared.emptymultibulk);
aff255c8
PN
1870 return;
1871 }
e2641e09 1872
aff255c8 1873 /* Get score pointer for the first element. */
eab0e26e 1874 redisAssertWithInfo(c,zobj,eptr != NULL);
aff255c8 1875 sptr = ziplistNext(zl,eptr);
e2641e09 1876
aff255c8
PN
1877 /* We don't know in advance how many matching elements there are in the
1878 * list, so we push this object that will represent the multi-bulk
1879 * length in the output buffer, and will "fix" it later */
0cfc8940 1880 replylen = addDeferredMultiBulkLength(c);
aff255c8
PN
1881
1882 /* If there is an offset, just traverse the number of elements without
1883 * checking the score because that is done in the next loop. */
0cfc8940
PN
1884 while (eptr && offset--) {
1885 if (reverse) {
aff255c8 1886 zzlPrev(zl,&eptr,&sptr);
0cfc8940 1887 } else {
aff255c8 1888 zzlNext(zl,&eptr,&sptr);
0cfc8940
PN
1889 }
1890 }
aff255c8
PN
1891
1892 while (eptr && limit--) {
1893 score = zzlGetScore(sptr);
1894
1895 /* Abort when the node is no longer in range. */
1896 if (reverse) {
1897 if (!zslValueGteMin(score,&range)) break;
1898 } else {
1899 if (!zslValueLteMax(score,&range)) break;
1900 }
1901
0cfc8940 1902 /* We know the element exists, so ziplistGet should always succeed */
eab0e26e 1903 redisAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
0cfc8940 1904
aff255c8 1905 rangelen++;
0cfc8940
PN
1906 if (vstr == NULL) {
1907 addReplyBulkLongLong(c,vlong);
1908 } else {
1909 addReplyBulkCBuffer(c,vstr,vlen);
1910 }
1911
1912 if (withscores) {
1913 addReplyDouble(c,score);
aff255c8
PN
1914 }
1915
1916 /* Move to next node */
0cfc8940 1917 if (reverse) {
aff255c8 1918 zzlPrev(zl,&eptr,&sptr);
0cfc8940 1919 } else {
aff255c8 1920 zzlNext(zl,&eptr,&sptr);
0cfc8940 1921 }
e2641e09 1922 }
100ed062 1923 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
aff255c8
PN
1924 zset *zs = zobj->ptr;
1925 zskiplist *zsl = zs->zsl;
1926 zskiplistNode *ln;
25bb8a44 1927
aff255c8 1928 /* If reversed, get the last node in range as starting point. */
0cfc8940 1929 if (reverse) {
aff255c8 1930 ln = zslLastInRange(zsl,range);
0cfc8940 1931 } else {
aff255c8 1932 ln = zslFirstInRange(zsl,range);
0cfc8940 1933 }
aff255c8
PN
1934
1935 /* No "first" element in the specified interval. */
1936 if (ln == NULL) {
0cfc8940 1937 addReply(c, shared.emptymultibulk);
aff255c8 1938 return;
25bb8a44
PN
1939 }
1940
aff255c8
PN
1941 /* We don't know in advance how many matching elements there are in the
1942 * list, so we push this object that will represent the multi-bulk
1943 * length in the output buffer, and will "fix" it later */
0cfc8940 1944 replylen = addDeferredMultiBulkLength(c);
aff255c8
PN
1945
1946 /* If there is an offset, just traverse the number of elements without
1947 * checking the score because that is done in the next loop. */
0cfc8940
PN
1948 while (ln && offset--) {
1949 if (reverse) {
1950 ln = ln->backward;
1951 } else {
1952 ln = ln->level[0].forward;
1953 }
1954 }
aff255c8
PN
1955
1956 while (ln && limit--) {
1957 /* Abort when the node is no longer in range. */
1958 if (reverse) {
1959 if (!zslValueGteMin(ln->score,&range)) break;
1960 } else {
1961 if (!zslValueLteMax(ln->score,&range)) break;
1962 }
1963
aff255c8 1964 rangelen++;
0cfc8940
PN
1965 addReplyBulk(c,ln->obj);
1966
1967 if (withscores) {
1968 addReplyDouble(c,ln->score);
aff255c8
PN
1969 }
1970
1971 /* Move to next node */
0cfc8940
PN
1972 if (reverse) {
1973 ln = ln->backward;
1974 } else {
1975 ln = ln->level[0].forward;
1976 }
aff255c8
PN
1977 }
1978 } else {
1979 redisPanic("Unknown sorted set encoding");
25bb8a44
PN
1980 }
1981
0cfc8940
PN
1982 if (withscores) {
1983 rangelen *= 2;
e2641e09 1984 }
0cfc8940
PN
1985
1986 setDeferredMultiBulkLength(c, replylen, rangelen);
e2641e09 1987}
1988
1989void zrangebyscoreCommand(redisClient *c) {
0cfc8940 1990 genericZrangebyscoreCommand(c,0);
25bb8a44
PN
1991}
1992
1993void zrevrangebyscoreCommand(redisClient *c) {
0cfc8940 1994 genericZrangebyscoreCommand(c,1);
e2641e09 1995}
1996
1997void zcountCommand(redisClient *c) {
62d774e5
PN
1998 robj *key = c->argv[1];
1999 robj *zobj;
2000 zrangespec range;
2001 int count = 0;
2002
2003 /* Parse the range arguments */
2004 if (zslParseRange(c->argv[2],c->argv[3],&range) != REDIS_OK) {
d93f9a86 2005 addReplyError(c,"min or max is not a float");
62d774e5
PN
2006 return;
2007 }
2008
2009 /* Lookup the sorted set */
2010 if ((zobj = lookupKeyReadOrReply(c, key, shared.czero)) == NULL ||
2011 checkType(c, zobj, REDIS_ZSET)) return;
2012
2013 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
2014 unsigned char *zl = zobj->ptr;
2015 unsigned char *eptr, *sptr;
2016 double score;
2017
2018 /* Use the first element in range as the starting point */
2019 eptr = zzlFirstInRange(zl,range);
2020
2021 /* No "first" element */
2022 if (eptr == NULL) {
2023 addReply(c, shared.czero);
2024 return;
2025 }
2026
2027 /* First element is in range */
2028 sptr = ziplistNext(zl,eptr);
2029 score = zzlGetScore(sptr);
eab0e26e 2030 redisAssertWithInfo(c,zobj,zslValueLteMax(score,&range));
62d774e5
PN
2031
2032 /* Iterate over elements in range */
2033 while (eptr) {
2034 score = zzlGetScore(sptr);
2035
2036 /* Abort when the node is no longer in range. */
2037 if (!zslValueLteMax(score,&range)) {
2038 break;
2039 } else {
2040 count++;
2041 zzlNext(zl,&eptr,&sptr);
2042 }
2043 }
2044 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
2045 zset *zs = zobj->ptr;
2046 zskiplist *zsl = zs->zsl;
2047 zskiplistNode *zn;
2048 unsigned long rank;
2049
2050 /* Find first element in range */
2051 zn = zslFirstInRange(zsl, range);
2052
2053 /* Use rank of first element, if any, to determine preliminary count */
2054 if (zn != NULL) {
2055 rank = zslGetRank(zsl, zn->score, zn->obj);
2056 count = (zsl->length - (rank - 1));
2057
2058 /* Find last element in range */
2059 zn = zslLastInRange(zsl, range);
2060
2061 /* Use rank of last element, if any, to determine the actual count */
2062 if (zn != NULL) {
2063 rank = zslGetRank(zsl, zn->score, zn->obj);
2064 count -= (zsl->length - rank);
2065 }
2066 }
2067 } else {
2068 redisPanic("Unknown sorted set encoding");
2069 }
2070
2071 addReplyLongLong(c, count);
e2641e09 2072}
2073
2074void zcardCommand(redisClient *c) {
d1c920c5
PN
2075 robj *key = c->argv[1];
2076 robj *zobj;
e2641e09 2077
d1c920c5
PN
2078 if ((zobj = lookupKeyReadOrReply(c,key,shared.czero)) == NULL ||
2079 checkType(c,zobj,REDIS_ZSET)) return;
e2641e09 2080
df26a0ae 2081 addReplyLongLong(c,zsetLength(zobj));
e2641e09 2082}
2083
2084void zscoreCommand(redisClient *c) {
d1c920c5
PN
2085 robj *key = c->argv[1];
2086 robj *zobj;
2087 double score;
e2641e09 2088
d1c920c5
PN
2089 if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
2090 checkType(c,zobj,REDIS_ZSET)) return;
e2641e09 2091
d1c920c5 2092 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
8588bfa3 2093 if (zzlFind(zobj->ptr,c->argv[2],&score) != NULL)
d1c920c5
PN
2094 addReplyDouble(c,score);
2095 else
2096 addReply(c,shared.nullbulk);
100ed062 2097 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
d1c920c5
PN
2098 zset *zs = zobj->ptr;
2099 dictEntry *de;
e2641e09 2100
d1c920c5
PN
2101 c->argv[2] = tryObjectEncoding(c->argv[2]);
2102 de = dictFind(zs->dict,c->argv[2]);
2103 if (de != NULL) {
c0ba9ebe 2104 score = *(double*)dictGetVal(de);
d1c920c5
PN
2105 addReplyDouble(c,score);
2106 } else {
2107 addReply(c,shared.nullbulk);
2108 }
2109 } else {
2110 redisPanic("Unknown sorted set encoding");
e2641e09 2111 }
2112}
2113
2114void zrankGenericCommand(redisClient *c, int reverse) {
d1c920c5
PN
2115 robj *key = c->argv[1];
2116 robj *ele = c->argv[2];
2117 robj *zobj;
2118 unsigned long llen;
e2641e09 2119 unsigned long rank;
e2641e09 2120
d1c920c5
PN
2121 if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
2122 checkType(c,zobj,REDIS_ZSET)) return;
df26a0ae 2123 llen = zsetLength(zobj);
e2641e09 2124
eab0e26e 2125 redisAssertWithInfo(c,ele,ele->encoding == REDIS_ENCODING_RAW);
d1c920c5
PN
2126 if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
2127 unsigned char *zl = zobj->ptr;
2128 unsigned char *eptr, *sptr;
e2641e09 2129
d1c920c5 2130 eptr = ziplistIndex(zl,0);
eab0e26e 2131 redisAssertWithInfo(c,zobj,eptr != NULL);
d1c920c5 2132 sptr = ziplistNext(zl,eptr);
eab0e26e 2133 redisAssertWithInfo(c,zobj,sptr != NULL);
d1c920c5
PN
2134
2135 rank = 1;
2136 while(eptr != NULL) {
2137 if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr)))
2138 break;
2139 rank++;
2140 zzlNext(zl,&eptr,&sptr);
2141 }
2142
2143 if (eptr != NULL) {
2144 if (reverse)
2145 addReplyLongLong(c,llen-rank);
2146 else
2147 addReplyLongLong(c,rank-1);
e2641e09 2148 } else {
d1c920c5
PN
2149 addReply(c,shared.nullbulk);
2150 }
100ed062 2151 } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
d1c920c5
PN
2152 zset *zs = zobj->ptr;
2153 zskiplist *zsl = zs->zsl;
2154 dictEntry *de;
2155 double score;
2156
2157 ele = c->argv[2] = tryObjectEncoding(c->argv[2]);
2158 de = dictFind(zs->dict,ele);
2159 if (de != NULL) {
c0ba9ebe 2160 score = *(double*)dictGetVal(de);
d1c920c5 2161 rank = zslGetRank(zsl,score,ele);
eab0e26e 2162 redisAssertWithInfo(c,ele,rank); /* Existing elements always have a rank. */
d1c920c5
PN
2163 if (reverse)
2164 addReplyLongLong(c,llen-rank);
2165 else
2166 addReplyLongLong(c,rank-1);
2167 } else {
2168 addReply(c,shared.nullbulk);
e2641e09 2169 }
2170 } else {
d1c920c5 2171 redisPanic("Unknown sorted set encoding");
e2641e09 2172 }
2173}
2174
2175void zrankCommand(redisClient *c) {
2176 zrankGenericCommand(c, 0);
2177}
2178
2179void zrevrankCommand(redisClient *c) {
2180 zrankGenericCommand(c, 1);
2181}