]>
git.saurik.com Git - redis.git/blob - src/t_zset.c
2 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
3 * Copyright (c) 2009-2012, Pieter Noordhuis <pcnoordhuis at gmail dot com>
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
9 * * Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * * Neither the name of Redis nor the names of its contributors may be used
15 * to endorse or promote products derived from this software without
16 * specific prior written permission.
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
31 /*-----------------------------------------------------------------------------
33 *----------------------------------------------------------------------------*/
35 /* ZSETs are ordered sets using two data structures to hold the same elements
36 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
39 * The elements are added to an hash table mapping Redis objects to scores.
40 * At the same time the elements are added to a skip list mapping scores
41 * to Redis objects (so objects are sorted by scores in this "view"). */
43 /* This skiplist implementation is almost a C translation of the original
44 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
45 * Alternative to Balanced Trees", modified in three ways:
46 * a) this implementation allows for repeated scores.
47 * b) the comparison is not just by key (our 'score') but by satellite data.
48 * c) there is a back pointer, so it's a doubly linked list with the back
49 * pointers being only at "level 1". This allows to traverse the list
50 * from tail to head, useful for ZREVRANGE. */
55 zskiplistNode
*zslCreateNode(int level
, double score
, robj
*obj
) {
56 zskiplistNode
*zn
= zmalloc(sizeof(*zn
)+level
*sizeof(struct zskiplistLevel
));
62 zskiplist
*zslCreate(void) {
66 zsl
= zmalloc(sizeof(*zsl
));
69 zsl
->header
= zslCreateNode(ZSKIPLIST_MAXLEVEL
,0,NULL
);
70 for (j
= 0; j
< ZSKIPLIST_MAXLEVEL
; j
++) {
71 zsl
->header
->level
[j
].forward
= NULL
;
72 zsl
->header
->level
[j
].span
= 0;
74 zsl
->header
->backward
= NULL
;
79 void zslFreeNode(zskiplistNode
*node
) {
80 decrRefCount(node
->obj
);
84 void zslFree(zskiplist
*zsl
) {
85 zskiplistNode
*node
= zsl
->header
->level
[0].forward
, *next
;
89 next
= node
->level
[0].forward
;
96 /* Returns a random level for the new skiplist node we are going to create.
97 * The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL
98 * (both inclusive), with a powerlaw-alike distribution where higher
99 * levels are less likely to be returned. */
100 int zslRandomLevel(void) {
102 while ((random()&0xFFFF) < (ZSKIPLIST_P
* 0xFFFF))
104 return (level
<ZSKIPLIST_MAXLEVEL
) ? level
: ZSKIPLIST_MAXLEVEL
;
107 zskiplistNode
*zslInsert(zskiplist
*zsl
, double score
, robj
*obj
) {
108 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
109 unsigned int rank
[ZSKIPLIST_MAXLEVEL
];
112 redisAssert(!isnan(score
));
114 for (i
= zsl
->level
-1; i
>= 0; i
--) {
115 /* store rank that is crossed to reach the insert position */
116 rank
[i
] = i
== (zsl
->level
-1) ? 0 : rank
[i
+1];
117 while (x
->level
[i
].forward
&&
118 (x
->level
[i
].forward
->score
< score
||
119 (x
->level
[i
].forward
->score
== score
&&
120 compareStringObjects(x
->level
[i
].forward
->obj
,obj
) < 0))) {
121 rank
[i
] += x
->level
[i
].span
;
122 x
= x
->level
[i
].forward
;
126 /* we assume the key is not already inside, since we allow duplicated
127 * scores, and the re-insertion of score and redis object should never
128 * happpen since the caller of zslInsert() should test in the hash table
129 * if the element is already inside or not. */
130 level
= zslRandomLevel();
131 if (level
> zsl
->level
) {
132 for (i
= zsl
->level
; i
< level
; i
++) {
134 update
[i
] = zsl
->header
;
135 update
[i
]->level
[i
].span
= zsl
->length
;
139 x
= zslCreateNode(level
,score
,obj
);
140 for (i
= 0; i
< level
; i
++) {
141 x
->level
[i
].forward
= update
[i
]->level
[i
].forward
;
142 update
[i
]->level
[i
].forward
= x
;
144 /* update span covered by update[i] as x is inserted here */
145 x
->level
[i
].span
= update
[i
]->level
[i
].span
- (rank
[0] - rank
[i
]);
146 update
[i
]->level
[i
].span
= (rank
[0] - rank
[i
]) + 1;
149 /* increment span for untouched levels */
150 for (i
= level
; i
< zsl
->level
; i
++) {
151 update
[i
]->level
[i
].span
++;
154 x
->backward
= (update
[0] == zsl
->header
) ? NULL
: update
[0];
155 if (x
->level
[0].forward
)
156 x
->level
[0].forward
->backward
= x
;
163 /* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
164 void zslDeleteNode(zskiplist
*zsl
, zskiplistNode
*x
, zskiplistNode
**update
) {
166 for (i
= 0; i
< zsl
->level
; i
++) {
167 if (update
[i
]->level
[i
].forward
== x
) {
168 update
[i
]->level
[i
].span
+= x
->level
[i
].span
- 1;
169 update
[i
]->level
[i
].forward
= x
->level
[i
].forward
;
171 update
[i
]->level
[i
].span
-= 1;
174 if (x
->level
[0].forward
) {
175 x
->level
[0].forward
->backward
= x
->backward
;
177 zsl
->tail
= x
->backward
;
179 while(zsl
->level
> 1 && zsl
->header
->level
[zsl
->level
-1].forward
== NULL
)
184 /* Delete an element with matching score/object from the skiplist. */
185 int zslDelete(zskiplist
*zsl
, double score
, robj
*obj
) {
186 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
190 for (i
= zsl
->level
-1; i
>= 0; i
--) {
191 while (x
->level
[i
].forward
&&
192 (x
->level
[i
].forward
->score
< score
||
193 (x
->level
[i
].forward
->score
== score
&&
194 compareStringObjects(x
->level
[i
].forward
->obj
,obj
) < 0)))
195 x
= x
->level
[i
].forward
;
198 /* We may have multiple elements with the same score, what we need
199 * is to find the element with both the right score and object. */
200 x
= x
->level
[0].forward
;
201 if (x
&& score
== x
->score
&& equalStringObjects(x
->obj
,obj
)) {
202 zslDeleteNode(zsl
, x
, update
);
206 return 0; /* not found */
208 return 0; /* not found */
211 static int zslValueGteMin(double value
, zrangespec
*spec
) {
212 return spec
->minex
? (value
> spec
->min
) : (value
>= spec
->min
);
215 static int zslValueLteMax(double value
, zrangespec
*spec
) {
216 return spec
->maxex
? (value
< spec
->max
) : (value
<= spec
->max
);
219 /* Returns if there is a part of the zset is in range. */
220 int zslIsInRange(zskiplist
*zsl
, zrangespec
*range
) {
223 /* Test for ranges that will always be empty. */
224 if (range
->min
> range
->max
||
225 (range
->min
== range
->max
&& (range
->minex
|| range
->maxex
)))
228 if (x
== NULL
|| !zslValueGteMin(x
->score
,range
))
230 x
= zsl
->header
->level
[0].forward
;
231 if (x
== NULL
|| !zslValueLteMax(x
->score
,range
))
236 /* Find the first node that is contained in the specified range.
237 * Returns NULL when no element is contained in the range. */
238 zskiplistNode
*zslFirstInRange(zskiplist
*zsl
, zrangespec range
) {
242 /* If everything is out of range, return early. */
243 if (!zslIsInRange(zsl
,&range
)) return NULL
;
246 for (i
= zsl
->level
-1; i
>= 0; i
--) {
247 /* Go forward while *OUT* of range. */
248 while (x
->level
[i
].forward
&&
249 !zslValueGteMin(x
->level
[i
].forward
->score
,&range
))
250 x
= x
->level
[i
].forward
;
253 /* This is an inner range, so the next node cannot be NULL. */
254 x
= x
->level
[0].forward
;
255 redisAssert(x
!= NULL
);
257 /* Check if score <= max. */
258 if (!zslValueLteMax(x
->score
,&range
)) return NULL
;
262 /* Find the last node that is contained in the specified range.
263 * Returns NULL when no element is contained in the range. */
264 zskiplistNode
*zslLastInRange(zskiplist
*zsl
, zrangespec range
) {
268 /* If everything is out of range, return early. */
269 if (!zslIsInRange(zsl
,&range
)) return NULL
;
272 for (i
= zsl
->level
-1; i
>= 0; i
--) {
273 /* Go forward while *IN* range. */
274 while (x
->level
[i
].forward
&&
275 zslValueLteMax(x
->level
[i
].forward
->score
,&range
))
276 x
= x
->level
[i
].forward
;
279 /* This is an inner range, so this node cannot be NULL. */
280 redisAssert(x
!= NULL
);
282 /* Check if score >= min. */
283 if (!zslValueGteMin(x
->score
,&range
)) return NULL
;
287 /* Delete all the elements with score between min and max from the skiplist.
288 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
289 * Note that this function takes the reference to the hash table view of the
290 * sorted set, in order to remove the elements from the hash table too. */
291 unsigned long zslDeleteRangeByScore(zskiplist
*zsl
, zrangespec range
, dict
*dict
) {
292 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
293 unsigned long removed
= 0;
297 for (i
= zsl
->level
-1; i
>= 0; i
--) {
298 while (x
->level
[i
].forward
&& (range
.minex
?
299 x
->level
[i
].forward
->score
<= range
.min
:
300 x
->level
[i
].forward
->score
< range
.min
))
301 x
= x
->level
[i
].forward
;
305 /* Current node is the last with score < or <= min. */
306 x
= x
->level
[0].forward
;
308 /* Delete nodes while in range. */
309 while (x
&& (range
.maxex
? x
->score
< range
.max
: x
->score
<= range
.max
)) {
310 zskiplistNode
*next
= x
->level
[0].forward
;
311 zslDeleteNode(zsl
,x
,update
);
312 dictDelete(dict
,x
->obj
);
320 /* Delete all the elements with rank between start and end from the skiplist.
321 * Start and end are inclusive. Note that start and end need to be 1-based */
322 unsigned long zslDeleteRangeByRank(zskiplist
*zsl
, unsigned int start
, unsigned int end
, dict
*dict
) {
323 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
324 unsigned long traversed
= 0, removed
= 0;
328 for (i
= zsl
->level
-1; i
>= 0; i
--) {
329 while (x
->level
[i
].forward
&& (traversed
+ x
->level
[i
].span
) < start
) {
330 traversed
+= x
->level
[i
].span
;
331 x
= x
->level
[i
].forward
;
337 x
= x
->level
[0].forward
;
338 while (x
&& traversed
<= end
) {
339 zskiplistNode
*next
= x
->level
[0].forward
;
340 zslDeleteNode(zsl
,x
,update
);
341 dictDelete(dict
,x
->obj
);
350 /* Find the rank for an element by both score and key.
351 * Returns 0 when the element cannot be found, rank otherwise.
352 * Note that the rank is 1-based due to the span of zsl->header to the
354 unsigned long zslGetRank(zskiplist
*zsl
, double score
, robj
*o
) {
356 unsigned long rank
= 0;
360 for (i
= zsl
->level
-1; i
>= 0; i
--) {
361 while (x
->level
[i
].forward
&&
362 (x
->level
[i
].forward
->score
< score
||
363 (x
->level
[i
].forward
->score
== score
&&
364 compareStringObjects(x
->level
[i
].forward
->obj
,o
) <= 0))) {
365 rank
+= x
->level
[i
].span
;
366 x
= x
->level
[i
].forward
;
369 /* x might be equal to zsl->header, so test if obj is non-NULL */
370 if (x
->obj
&& equalStringObjects(x
->obj
,o
)) {
377 /* Finds an element by its rank. The rank argument needs to be 1-based. */
378 zskiplistNode
* zslGetElementByRank(zskiplist
*zsl
, unsigned long rank
) {
380 unsigned long traversed
= 0;
384 for (i
= zsl
->level
-1; i
>= 0; i
--) {
385 while (x
->level
[i
].forward
&& (traversed
+ x
->level
[i
].span
) <= rank
)
387 traversed
+= x
->level
[i
].span
;
388 x
= x
->level
[i
].forward
;
390 if (traversed
== rank
) {
397 /* Populate the rangespec according to the objects min and max. */
398 static int zslParseRange(robj
*min
, robj
*max
, zrangespec
*spec
) {
400 spec
->minex
= spec
->maxex
= 0;
402 /* Parse the min-max interval. If one of the values is prefixed
403 * by the "(" character, it's considered "open". For instance
404 * ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max
405 * ZRANGEBYSCORE zset 1.5 2.5 will instead match min <= x <= max */
406 if (min
->encoding
== REDIS_ENCODING_INT
) {
407 spec
->min
= (long)min
->ptr
;
409 if (((char*)min
->ptr
)[0] == '(') {
410 spec
->min
= strtod((char*)min
->ptr
+1,&eptr
);
411 if (eptr
[0] != '\0' || isnan(spec
->min
)) return REDIS_ERR
;
414 spec
->min
= strtod((char*)min
->ptr
,&eptr
);
415 if (eptr
[0] != '\0' || isnan(spec
->min
)) return REDIS_ERR
;
418 if (max
->encoding
== REDIS_ENCODING_INT
) {
419 spec
->max
= (long)max
->ptr
;
421 if (((char*)max
->ptr
)[0] == '(') {
422 spec
->max
= strtod((char*)max
->ptr
+1,&eptr
);
423 if (eptr
[0] != '\0' || isnan(spec
->max
)) return REDIS_ERR
;
426 spec
->max
= strtod((char*)max
->ptr
,&eptr
);
427 if (eptr
[0] != '\0' || isnan(spec
->max
)) return REDIS_ERR
;
434 /*-----------------------------------------------------------------------------
435 * Ziplist-backed sorted set API
436 *----------------------------------------------------------------------------*/
438 double zzlGetScore(unsigned char *sptr
) {
445 redisAssert(sptr
!= NULL
);
446 redisAssert(ziplistGet(sptr
,&vstr
,&vlen
,&vlong
));
449 memcpy(buf
,vstr
,vlen
);
451 score
= strtod(buf
,NULL
);
459 /* Compare element in sorted set with given element. */
460 int zzlCompareElements(unsigned char *eptr
, unsigned char *cstr
, unsigned int clen
) {
464 unsigned char vbuf
[32];
467 redisAssert(ziplistGet(eptr
,&vstr
,&vlen
,&vlong
));
469 /* Store string representation of long long in buf. */
470 vlen
= ll2string((char*)vbuf
,sizeof(vbuf
),vlong
);
474 minlen
= (vlen
< clen
) ? vlen
: clen
;
475 cmp
= memcmp(vstr
,cstr
,minlen
);
476 if (cmp
== 0) return vlen
-clen
;
480 unsigned int zzlLength(unsigned char *zl
) {
481 return ziplistLen(zl
)/2;
484 /* Move to next entry based on the values in eptr and sptr. Both are set to
485 * NULL when there is no next entry. */
486 void zzlNext(unsigned char *zl
, unsigned char **eptr
, unsigned char **sptr
) {
487 unsigned char *_eptr
, *_sptr
;
488 redisAssert(*eptr
!= NULL
&& *sptr
!= NULL
);
490 _eptr
= ziplistNext(zl
,*sptr
);
492 _sptr
= ziplistNext(zl
,_eptr
);
493 redisAssert(_sptr
!= NULL
);
503 /* Move to the previous entry based on the values in eptr and sptr. Both are
504 * set to NULL when there is no next entry. */
505 void zzlPrev(unsigned char *zl
, unsigned char **eptr
, unsigned char **sptr
) {
506 unsigned char *_eptr
, *_sptr
;
507 redisAssert(*eptr
!= NULL
&& *sptr
!= NULL
);
509 _sptr
= ziplistPrev(zl
,*eptr
);
511 _eptr
= ziplistPrev(zl
,_sptr
);
512 redisAssert(_eptr
!= NULL
);
514 /* No previous entry. */
522 /* Returns if there is a part of the zset is in range. Should only be used
523 * internally by zzlFirstInRange and zzlLastInRange. */
524 int zzlIsInRange(unsigned char *zl
, zrangespec
*range
) {
528 /* Test for ranges that will always be empty. */
529 if (range
->min
> range
->max
||
530 (range
->min
== range
->max
&& (range
->minex
|| range
->maxex
)))
533 p
= ziplistIndex(zl
,-1); /* Last score. */
534 if (p
== NULL
) return 0; /* Empty sorted set */
535 score
= zzlGetScore(p
);
536 if (!zslValueGteMin(score
,range
))
539 p
= ziplistIndex(zl
,1); /* First score. */
540 redisAssert(p
!= NULL
);
541 score
= zzlGetScore(p
);
542 if (!zslValueLteMax(score
,range
))
548 /* Find pointer to the first element contained in the specified range.
549 * Returns NULL when no element is contained in the range. */
550 unsigned char *zzlFirstInRange(unsigned char *zl
, zrangespec range
) {
551 unsigned char *eptr
= ziplistIndex(zl
,0), *sptr
;
554 /* If everything is out of range, return early. */
555 if (!zzlIsInRange(zl
,&range
)) return NULL
;
557 while (eptr
!= NULL
) {
558 sptr
= ziplistNext(zl
,eptr
);
559 redisAssert(sptr
!= NULL
);
561 score
= zzlGetScore(sptr
);
562 if (zslValueGteMin(score
,&range
)) {
563 /* Check if score <= max. */
564 if (zslValueLteMax(score
,&range
))
569 /* Move to next element. */
570 eptr
= ziplistNext(zl
,sptr
);
576 /* Find pointer to the last element contained in the specified range.
577 * Returns NULL when no element is contained in the range. */
578 unsigned char *zzlLastInRange(unsigned char *zl
, zrangespec range
) {
579 unsigned char *eptr
= ziplistIndex(zl
,-2), *sptr
;
582 /* If everything is out of range, return early. */
583 if (!zzlIsInRange(zl
,&range
)) return NULL
;
585 while (eptr
!= NULL
) {
586 sptr
= ziplistNext(zl
,eptr
);
587 redisAssert(sptr
!= NULL
);
589 score
= zzlGetScore(sptr
);
590 if (zslValueLteMax(score
,&range
)) {
591 /* Check if score >= min. */
592 if (zslValueGteMin(score
,&range
))
597 /* Move to previous element by moving to the score of previous element.
598 * When this returns NULL, we know there also is no element. */
599 sptr
= ziplistPrev(zl
,eptr
);
601 redisAssert((eptr
= ziplistPrev(zl
,sptr
)) != NULL
);
609 unsigned char *zzlFind(unsigned char *zl
, robj
*ele
, double *score
) {
610 unsigned char *eptr
= ziplistIndex(zl
,0), *sptr
;
612 ele
= getDecodedObject(ele
);
613 while (eptr
!= NULL
) {
614 sptr
= ziplistNext(zl
,eptr
);
615 redisAssertWithInfo(NULL
,ele
,sptr
!= NULL
);
617 if (ziplistCompare(eptr
,ele
->ptr
,sdslen(ele
->ptr
))) {
618 /* Matching element, pull out score. */
619 if (score
!= NULL
) *score
= zzlGetScore(sptr
);
624 /* Move to next element. */
625 eptr
= ziplistNext(zl
,sptr
);
632 /* Delete (element,score) pair from ziplist. Use local copy of eptr because we
633 * don't want to modify the one given as argument. */
634 unsigned char *zzlDelete(unsigned char *zl
, unsigned char *eptr
) {
635 unsigned char *p
= eptr
;
637 /* TODO: add function to ziplist API to delete N elements from offset. */
638 zl
= ziplistDelete(zl
,&p
);
639 zl
= ziplistDelete(zl
,&p
);
643 unsigned char *zzlInsertAt(unsigned char *zl
, unsigned char *eptr
, robj
*ele
, double score
) {
649 redisAssertWithInfo(NULL
,ele
,ele
->encoding
== REDIS_ENCODING_RAW
);
650 scorelen
= d2string(scorebuf
,sizeof(scorebuf
),score
);
652 zl
= ziplistPush(zl
,ele
->ptr
,sdslen(ele
->ptr
),ZIPLIST_TAIL
);
653 zl
= ziplistPush(zl
,(unsigned char*)scorebuf
,scorelen
,ZIPLIST_TAIL
);
655 /* Keep offset relative to zl, as it might be re-allocated. */
657 zl
= ziplistInsert(zl
,eptr
,ele
->ptr
,sdslen(ele
->ptr
));
660 /* Insert score after the element. */
661 redisAssertWithInfo(NULL
,ele
,(sptr
= ziplistNext(zl
,eptr
)) != NULL
);
662 zl
= ziplistInsert(zl
,sptr
,(unsigned char*)scorebuf
,scorelen
);
668 /* Insert (element,score) pair in ziplist. This function assumes the element is
669 * not yet present in the list. */
670 unsigned char *zzlInsert(unsigned char *zl
, robj
*ele
, double score
) {
671 unsigned char *eptr
= ziplistIndex(zl
,0), *sptr
;
674 ele
= getDecodedObject(ele
);
675 while (eptr
!= NULL
) {
676 sptr
= ziplistNext(zl
,eptr
);
677 redisAssertWithInfo(NULL
,ele
,sptr
!= NULL
);
678 s
= zzlGetScore(sptr
);
681 /* First element with score larger than score for element to be
682 * inserted. This means we should take its spot in the list to
683 * maintain ordering. */
684 zl
= zzlInsertAt(zl
,eptr
,ele
,score
);
686 } else if (s
== score
) {
687 /* Ensure lexicographical ordering for elements. */
688 if (zzlCompareElements(eptr
,ele
->ptr
,sdslen(ele
->ptr
)) > 0) {
689 zl
= zzlInsertAt(zl
,eptr
,ele
,score
);
694 /* Move to next element. */
695 eptr
= ziplistNext(zl
,sptr
);
698 /* Push on tail of list when it was not yet inserted. */
700 zl
= zzlInsertAt(zl
,NULL
,ele
,score
);
706 unsigned char *zzlDeleteRangeByScore(unsigned char *zl
, zrangespec range
, unsigned long *deleted
) {
707 unsigned char *eptr
, *sptr
;
709 unsigned long num
= 0;
711 if (deleted
!= NULL
) *deleted
= 0;
713 eptr
= zzlFirstInRange(zl
,range
);
714 if (eptr
== NULL
) return zl
;
716 /* When the tail of the ziplist is deleted, eptr will point to the sentinel
717 * byte and ziplistNext will return NULL. */
718 while ((sptr
= ziplistNext(zl
,eptr
)) != NULL
) {
719 score
= zzlGetScore(sptr
);
720 if (zslValueLteMax(score
,&range
)) {
721 /* Delete both the element and the score. */
722 zl
= ziplistDelete(zl
,&eptr
);
723 zl
= ziplistDelete(zl
,&eptr
);
726 /* No longer in range. */
731 if (deleted
!= NULL
) *deleted
= num
;
735 /* Delete all the elements with rank between start and end from the skiplist.
736 * Start and end are inclusive. Note that start and end need to be 1-based */
737 unsigned char *zzlDeleteRangeByRank(unsigned char *zl
, unsigned int start
, unsigned int end
, unsigned long *deleted
) {
738 unsigned int num
= (end
-start
)+1;
739 if (deleted
) *deleted
= num
;
740 zl
= ziplistDeleteRange(zl
,2*(start
-1),2*num
);
744 /*-----------------------------------------------------------------------------
745 * Common sorted set API
746 *----------------------------------------------------------------------------*/
748 unsigned int zsetLength(robj
*zobj
) {
750 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
751 length
= zzlLength(zobj
->ptr
);
752 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
753 length
= ((zset
*)zobj
->ptr
)->zsl
->length
;
755 redisPanic("Unknown sorted set encoding");
760 void zsetConvert(robj
*zobj
, int encoding
) {
762 zskiplistNode
*node
, *next
;
766 if (zobj
->encoding
== encoding
) return;
767 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
768 unsigned char *zl
= zobj
->ptr
;
769 unsigned char *eptr
, *sptr
;
774 if (encoding
!= REDIS_ENCODING_SKIPLIST
)
775 redisPanic("Unknown target encoding");
777 zs
= zmalloc(sizeof(*zs
));
778 zs
->dict
= dictCreate(&zsetDictType
,NULL
);
779 zs
->zsl
= zslCreate();
781 eptr
= ziplistIndex(zl
,0);
782 redisAssertWithInfo(NULL
,zobj
,eptr
!= NULL
);
783 sptr
= ziplistNext(zl
,eptr
);
784 redisAssertWithInfo(NULL
,zobj
,sptr
!= NULL
);
786 while (eptr
!= NULL
) {
787 score
= zzlGetScore(sptr
);
788 redisAssertWithInfo(NULL
,zobj
,ziplistGet(eptr
,&vstr
,&vlen
,&vlong
));
790 ele
= createStringObjectFromLongLong(vlong
);
792 ele
= createStringObject((char*)vstr
,vlen
);
794 /* Has incremented refcount since it was just created. */
795 node
= zslInsert(zs
->zsl
,score
,ele
);
796 redisAssertWithInfo(NULL
,zobj
,dictAdd(zs
->dict
,ele
,&node
->score
) == DICT_OK
);
797 incrRefCount(ele
); /* Added to dictionary. */
798 zzlNext(zl
,&eptr
,&sptr
);
803 zobj
->encoding
= REDIS_ENCODING_SKIPLIST
;
804 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
805 unsigned char *zl
= ziplistNew();
807 if (encoding
!= REDIS_ENCODING_ZIPLIST
)
808 redisPanic("Unknown target encoding");
810 /* Approach similar to zslFree(), since we want to free the skiplist at
811 * the same time as creating the ziplist. */
813 dictRelease(zs
->dict
);
814 node
= zs
->zsl
->header
->level
[0].forward
;
815 zfree(zs
->zsl
->header
);
819 ele
= getDecodedObject(node
->obj
);
820 zl
= zzlInsertAt(zl
,NULL
,ele
,node
->score
);
823 next
= node
->level
[0].forward
;
830 zobj
->encoding
= REDIS_ENCODING_ZIPLIST
;
832 redisPanic("Unknown sorted set encoding");
836 /*-----------------------------------------------------------------------------
837 * Sorted set commands
838 *----------------------------------------------------------------------------*/
840 /* This generic command implements both ZADD and ZINCRBY. */
841 void zaddGenericCommand(redisClient
*c
, int incr
) {
842 static char *nanerr
= "resulting score is not a number (NaN)";
843 robj
*key
= c
->argv
[1];
847 double score
= 0, *scores
, curscore
= 0.0;
848 int j
, elements
= (c
->argc
-2)/2;
852 addReply(c
,shared
.syntaxerr
);
856 /* Start parsing all the scores, we need to emit any syntax error
857 * before executing additions to the sorted set, as the command should
858 * either execute fully or nothing at all. */
859 scores
= zmalloc(sizeof(double)*elements
);
860 for (j
= 0; j
< elements
; j
++) {
861 if (getDoubleFromObjectOrReply(c
,c
->argv
[2+j
*2],&scores
[j
],NULL
)
869 /* Lookup the key and create the sorted set if does not exist. */
870 zobj
= lookupKeyWrite(c
->db
,key
);
872 if (server
.zset_max_ziplist_entries
== 0 ||
873 server
.zset_max_ziplist_value
< sdslen(c
->argv
[3]->ptr
))
875 zobj
= createZsetObject();
877 zobj
= createZsetZiplistObject();
879 dbAdd(c
->db
,key
,zobj
);
881 if (zobj
->type
!= REDIS_ZSET
) {
882 addReply(c
,shared
.wrongtypeerr
);
888 for (j
= 0; j
< elements
; j
++) {
891 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
894 /* Prefer non-encoded element when dealing with ziplists. */
895 ele
= c
->argv
[3+j
*2];
896 if ((eptr
= zzlFind(zobj
->ptr
,ele
,&curscore
)) != NULL
) {
900 addReplyError(c
,nanerr
);
901 /* Don't need to check if the sorted set is empty
902 * because we know it has at least one element. */
908 /* Remove and re-insert when score changed. */
909 if (score
!= curscore
) {
910 zobj
->ptr
= zzlDelete(zobj
->ptr
,eptr
);
911 zobj
->ptr
= zzlInsert(zobj
->ptr
,ele
,score
);
913 signalModifiedKey(c
->db
,key
);
917 /* Optimize: check if the element is too large or the list
918 * becomes too long *before* executing zzlInsert. */
919 zobj
->ptr
= zzlInsert(zobj
->ptr
,ele
,score
);
920 if (zzlLength(zobj
->ptr
) > server
.zset_max_ziplist_entries
)
921 zsetConvert(zobj
,REDIS_ENCODING_SKIPLIST
);
922 if (sdslen(ele
->ptr
) > server
.zset_max_ziplist_value
)
923 zsetConvert(zobj
,REDIS_ENCODING_SKIPLIST
);
925 signalModifiedKey(c
->db
,key
);
929 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
930 zset
*zs
= zobj
->ptr
;
931 zskiplistNode
*znode
;
934 ele
= c
->argv
[3+j
*2] = tryObjectEncoding(c
->argv
[3+j
*2]);
935 de
= dictFind(zs
->dict
,ele
);
937 curobj
= dictGetKey(de
);
938 curscore
= *(double*)dictGetVal(de
);
943 addReplyError(c
,nanerr
);
944 /* Don't need to check if the sorted set is empty
945 * because we know it has at least one element. */
951 /* Remove and re-insert when score changed. We can safely
952 * delete the key object from the skiplist, since the
953 * dictionary still has a reference to it. */
954 if (score
!= curscore
) {
955 redisAssertWithInfo(c
,curobj
,zslDelete(zs
->zsl
,curscore
,curobj
));
956 znode
= zslInsert(zs
->zsl
,score
,curobj
);
957 incrRefCount(curobj
); /* Re-inserted in skiplist. */
958 dictGetVal(de
) = &znode
->score
; /* Update score ptr. */
960 signalModifiedKey(c
->db
,key
);
964 znode
= zslInsert(zs
->zsl
,score
,ele
);
965 incrRefCount(ele
); /* Inserted in skiplist. */
966 redisAssertWithInfo(c
,NULL
,dictAdd(zs
->dict
,ele
,&znode
->score
) == DICT_OK
);
967 incrRefCount(ele
); /* Added to dictionary. */
969 signalModifiedKey(c
->db
,key
);
974 redisPanic("Unknown sorted set encoding");
978 if (incr
) /* ZINCRBY */
979 addReplyDouble(c
,score
);
981 addReplyLongLong(c
,added
);
984 void zaddCommand(redisClient
*c
) {
985 zaddGenericCommand(c
,0);
988 void zincrbyCommand(redisClient
*c
) {
989 zaddGenericCommand(c
,1);
992 void zremCommand(redisClient
*c
) {
993 robj
*key
= c
->argv
[1];
997 if ((zobj
= lookupKeyWriteOrReply(c
,key
,shared
.czero
)) == NULL
||
998 checkType(c
,zobj
,REDIS_ZSET
)) return;
1000 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1001 unsigned char *eptr
;
1003 for (j
= 2; j
< c
->argc
; j
++) {
1004 if ((eptr
= zzlFind(zobj
->ptr
,c
->argv
[j
],NULL
)) != NULL
) {
1006 zobj
->ptr
= zzlDelete(zobj
->ptr
,eptr
);
1007 if (zzlLength(zobj
->ptr
) == 0) {
1008 dbDelete(c
->db
,key
);
1013 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
1014 zset
*zs
= zobj
->ptr
;
1018 for (j
= 2; j
< c
->argc
; j
++) {
1019 de
= dictFind(zs
->dict
,c
->argv
[j
]);
1023 /* Delete from the skiplist */
1024 score
= *(double*)dictGetVal(de
);
1025 redisAssertWithInfo(c
,c
->argv
[j
],zslDelete(zs
->zsl
,score
,c
->argv
[j
]));
1027 /* Delete from the hash table */
1028 dictDelete(zs
->dict
,c
->argv
[j
]);
1029 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
1030 if (dictSize(zs
->dict
) == 0) {
1031 dbDelete(c
->db
,key
);
1037 redisPanic("Unknown sorted set encoding");
1041 signalModifiedKey(c
->db
,key
);
1042 server
.dirty
+= deleted
;
1044 addReplyLongLong(c
,deleted
);
1047 void zremrangebyscoreCommand(redisClient
*c
) {
1048 robj
*key
= c
->argv
[1];
1051 unsigned long deleted
;
1053 /* Parse the range arguments. */
1054 if (zslParseRange(c
->argv
[2],c
->argv
[3],&range
) != REDIS_OK
) {
1055 addReplyError(c
,"min or max is not a float");
1059 if ((zobj
= lookupKeyWriteOrReply(c
,key
,shared
.czero
)) == NULL
||
1060 checkType(c
,zobj
,REDIS_ZSET
)) return;
1062 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1063 zobj
->ptr
= zzlDeleteRangeByScore(zobj
->ptr
,range
,&deleted
);
1064 if (zzlLength(zobj
->ptr
) == 0) dbDelete(c
->db
,key
);
1065 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
1066 zset
*zs
= zobj
->ptr
;
1067 deleted
= zslDeleteRangeByScore(zs
->zsl
,range
,zs
->dict
);
1068 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
1069 if (dictSize(zs
->dict
) == 0) dbDelete(c
->db
,key
);
1071 redisPanic("Unknown sorted set encoding");
1074 if (deleted
) signalModifiedKey(c
->db
,key
);
1075 server
.dirty
+= deleted
;
1076 addReplyLongLong(c
,deleted
);
1079 void zremrangebyrankCommand(redisClient
*c
) {
1080 robj
*key
= c
->argv
[1];
1085 unsigned long deleted
;
1087 if ((getLongFromObjectOrReply(c
, c
->argv
[2], &start
, NULL
) != REDIS_OK
) ||
1088 (getLongFromObjectOrReply(c
, c
->argv
[3], &end
, NULL
) != REDIS_OK
)) return;
1090 if ((zobj
= lookupKeyWriteOrReply(c
,key
,shared
.czero
)) == NULL
||
1091 checkType(c
,zobj
,REDIS_ZSET
)) return;
1093 /* Sanitize indexes. */
1094 llen
= zsetLength(zobj
);
1095 if (start
< 0) start
= llen
+start
;
1096 if (end
< 0) end
= llen
+end
;
1097 if (start
< 0) start
= 0;
1099 /* Invariant: start >= 0, so this test will be true when end < 0.
1100 * The range is empty when start > end or start >= length. */
1101 if (start
> end
|| start
>= llen
) {
1102 addReply(c
,shared
.czero
);
1105 if (end
>= llen
) end
= llen
-1;
1107 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1108 /* Correct for 1-based rank. */
1109 zobj
->ptr
= zzlDeleteRangeByRank(zobj
->ptr
,start
+1,end
+1,&deleted
);
1110 if (zzlLength(zobj
->ptr
) == 0) dbDelete(c
->db
,key
);
1111 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
1112 zset
*zs
= zobj
->ptr
;
1114 /* Correct for 1-based rank. */
1115 deleted
= zslDeleteRangeByRank(zs
->zsl
,start
+1,end
+1,zs
->dict
);
1116 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
1117 if (dictSize(zs
->dict
) == 0) dbDelete(c
->db
,key
);
1119 redisPanic("Unknown sorted set encoding");
1122 if (deleted
) signalModifiedKey(c
->db
,key
);
1123 server
.dirty
+= deleted
;
1124 addReplyLongLong(c
,deleted
);
1129 int type
; /* Set, sorted set */
1134 /* Set iterators. */
1147 /* Sorted set iterators. */
1151 unsigned char *eptr
, *sptr
;
1155 zskiplistNode
*node
;
1162 /* Use dirty flags for pointers that need to be cleaned up in the next
1163 * iteration over the zsetopval. The dirty flag for the long long value is
1164 * special, since long long values don't need cleanup. Instead, it means that
1165 * we already checked that "ell" holds a long long, or tried to convert another
1166 * representation into a long long value. When this was successful,
1167 * OPVAL_VALID_LL is set as well. */
1168 #define OPVAL_DIRTY_ROBJ 1
1169 #define OPVAL_DIRTY_LL 2
1170 #define OPVAL_VALID_LL 4
1172 /* Store value retrieved from the iterator. */
1175 unsigned char _buf
[32]; /* Private buffer. */
1177 unsigned char *estr
;
1183 typedef union _iterset iterset
;
1184 typedef union _iterzset iterzset
;
1186 void zuiInitIterator(zsetopsrc
*op
) {
1187 if (op
->subject
== NULL
)
1190 if (op
->type
== REDIS_SET
) {
1191 iterset
*it
= &op
->iter
.set
;
1192 if (op
->encoding
== REDIS_ENCODING_INTSET
) {
1193 it
->is
.is
= op
->subject
->ptr
;
1195 } else if (op
->encoding
== REDIS_ENCODING_HT
) {
1196 it
->ht
.dict
= op
->subject
->ptr
;
1197 it
->ht
.di
= dictGetIterator(op
->subject
->ptr
);
1198 it
->ht
.de
= dictNext(it
->ht
.di
);
1200 redisPanic("Unknown set encoding");
1202 } else if (op
->type
== REDIS_ZSET
) {
1203 iterzset
*it
= &op
->iter
.zset
;
1204 if (op
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1205 it
->zl
.zl
= op
->subject
->ptr
;
1206 it
->zl
.eptr
= ziplistIndex(it
->zl
.zl
,0);
1207 if (it
->zl
.eptr
!= NULL
) {
1208 it
->zl
.sptr
= ziplistNext(it
->zl
.zl
,it
->zl
.eptr
);
1209 redisAssert(it
->zl
.sptr
!= NULL
);
1211 } else if (op
->encoding
== REDIS_ENCODING_SKIPLIST
) {
1212 it
->sl
.zs
= op
->subject
->ptr
;
1213 it
->sl
.node
= it
->sl
.zs
->zsl
->header
->level
[0].forward
;
1215 redisPanic("Unknown sorted set encoding");
1218 redisPanic("Unsupported type");
1222 void zuiClearIterator(zsetopsrc
*op
) {
1223 if (op
->subject
== NULL
)
1226 if (op
->type
== REDIS_SET
) {
1227 iterset
*it
= &op
->iter
.set
;
1228 if (op
->encoding
== REDIS_ENCODING_INTSET
) {
1229 REDIS_NOTUSED(it
); /* skip */
1230 } else if (op
->encoding
== REDIS_ENCODING_HT
) {
1231 dictReleaseIterator(it
->ht
.di
);
1233 redisPanic("Unknown set encoding");
1235 } else if (op
->type
== REDIS_ZSET
) {
1236 iterzset
*it
= &op
->iter
.zset
;
1237 if (op
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1238 REDIS_NOTUSED(it
); /* skip */
1239 } else if (op
->encoding
== REDIS_ENCODING_SKIPLIST
) {
1240 REDIS_NOTUSED(it
); /* skip */
1242 redisPanic("Unknown sorted set encoding");
1245 redisPanic("Unsupported type");
1249 int zuiLength(zsetopsrc
*op
) {
1250 if (op
->subject
== NULL
)
1253 if (op
->type
== REDIS_SET
) {
1254 iterset
*it
= &op
->iter
.set
;
1255 if (op
->encoding
== REDIS_ENCODING_INTSET
) {
1256 return intsetLen(it
->is
.is
);
1257 } else if (op
->encoding
== REDIS_ENCODING_HT
) {
1258 return dictSize(it
->ht
.dict
);
1260 redisPanic("Unknown set encoding");
1262 } else if (op
->type
== REDIS_ZSET
) {
1263 iterzset
*it
= &op
->iter
.zset
;
1264 if (op
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1265 return zzlLength(it
->zl
.zl
);
1266 } else if (op
->encoding
== REDIS_ENCODING_SKIPLIST
) {
1267 return it
->sl
.zs
->zsl
->length
;
1269 redisPanic("Unknown sorted set encoding");
1272 redisPanic("Unsupported type");
1276 /* Check if the current value is valid. If so, store it in the passed structure
1277 * and move to the next element. If not valid, this means we have reached the
1278 * end of the structure and can abort. */
1279 int zuiNext(zsetopsrc
*op
, zsetopval
*val
) {
1280 if (op
->subject
== NULL
)
1283 if (val
->flags
& OPVAL_DIRTY_ROBJ
)
1284 decrRefCount(val
->ele
);
1286 memset(val
,0,sizeof(zsetopval
));
1288 if (op
->type
== REDIS_SET
) {
1289 iterset
*it
= &op
->iter
.set
;
1290 if (op
->encoding
== REDIS_ENCODING_INTSET
) {
1293 if (!intsetGet(it
->is
.is
,it
->is
.ii
,&ell
))
1298 /* Move to next element. */
1300 } else if (op
->encoding
== REDIS_ENCODING_HT
) {
1301 if (it
->ht
.de
== NULL
)
1303 val
->ele
= dictGetKey(it
->ht
.de
);
1306 /* Move to next element. */
1307 it
->ht
.de
= dictNext(it
->ht
.di
);
1309 redisPanic("Unknown set encoding");
1311 } else if (op
->type
== REDIS_ZSET
) {
1312 iterzset
*it
= &op
->iter
.zset
;
1313 if (op
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1314 /* No need to check both, but better be explicit. */
1315 if (it
->zl
.eptr
== NULL
|| it
->zl
.sptr
== NULL
)
1317 redisAssert(ziplistGet(it
->zl
.eptr
,&val
->estr
,&val
->elen
,&val
->ell
));
1318 val
->score
= zzlGetScore(it
->zl
.sptr
);
1320 /* Move to next element. */
1321 zzlNext(it
->zl
.zl
,&it
->zl
.eptr
,&it
->zl
.sptr
);
1322 } else if (op
->encoding
== REDIS_ENCODING_SKIPLIST
) {
1323 if (it
->sl
.node
== NULL
)
1325 val
->ele
= it
->sl
.node
->obj
;
1326 val
->score
= it
->sl
.node
->score
;
1328 /* Move to next element. */
1329 it
->sl
.node
= it
->sl
.node
->level
[0].forward
;
1331 redisPanic("Unknown sorted set encoding");
1334 redisPanic("Unsupported type");
1339 int zuiLongLongFromValue(zsetopval
*val
) {
1340 if (!(val
->flags
& OPVAL_DIRTY_LL
)) {
1341 val
->flags
|= OPVAL_DIRTY_LL
;
1343 if (val
->ele
!= NULL
) {
1344 if (val
->ele
->encoding
== REDIS_ENCODING_INT
) {
1345 val
->ell
= (long)val
->ele
->ptr
;
1346 val
->flags
|= OPVAL_VALID_LL
;
1347 } else if (val
->ele
->encoding
== REDIS_ENCODING_RAW
) {
1348 if (string2ll(val
->ele
->ptr
,sdslen(val
->ele
->ptr
),&val
->ell
))
1349 val
->flags
|= OPVAL_VALID_LL
;
1351 redisPanic("Unsupported element encoding");
1353 } else if (val
->estr
!= NULL
) {
1354 if (string2ll((char*)val
->estr
,val
->elen
,&val
->ell
))
1355 val
->flags
|= OPVAL_VALID_LL
;
1357 /* The long long was already set, flag as valid. */
1358 val
->flags
|= OPVAL_VALID_LL
;
1361 return val
->flags
& OPVAL_VALID_LL
;
1364 robj
*zuiObjectFromValue(zsetopval
*val
) {
1365 if (val
->ele
== NULL
) {
1366 if (val
->estr
!= NULL
) {
1367 val
->ele
= createStringObject((char*)val
->estr
,val
->elen
);
1369 val
->ele
= createStringObjectFromLongLong(val
->ell
);
1371 val
->flags
|= OPVAL_DIRTY_ROBJ
;
1376 int zuiBufferFromValue(zsetopval
*val
) {
1377 if (val
->estr
== NULL
) {
1378 if (val
->ele
!= NULL
) {
1379 if (val
->ele
->encoding
== REDIS_ENCODING_INT
) {
1380 val
->elen
= ll2string((char*)val
->_buf
,sizeof(val
->_buf
),(long)val
->ele
->ptr
);
1381 val
->estr
= val
->_buf
;
1382 } else if (val
->ele
->encoding
== REDIS_ENCODING_RAW
) {
1383 val
->elen
= sdslen(val
->ele
->ptr
);
1384 val
->estr
= val
->ele
->ptr
;
1386 redisPanic("Unsupported element encoding");
1389 val
->elen
= ll2string((char*)val
->_buf
,sizeof(val
->_buf
),val
->ell
);
1390 val
->estr
= val
->_buf
;
1396 /* Find value pointed to by val in the source pointer to by op. When found,
1397 * return 1 and store its score in target. Return 0 otherwise. */
1398 int zuiFind(zsetopsrc
*op
, zsetopval
*val
, double *score
) {
1399 if (op
->subject
== NULL
)
1402 if (op
->type
== REDIS_SET
) {
1403 iterset
*it
= &op
->iter
.set
;
1405 if (op
->encoding
== REDIS_ENCODING_INTSET
) {
1406 if (zuiLongLongFromValue(val
) && intsetFind(it
->is
.is
,val
->ell
)) {
1412 } else if (op
->encoding
== REDIS_ENCODING_HT
) {
1413 zuiObjectFromValue(val
);
1414 if (dictFind(it
->ht
.dict
,val
->ele
) != NULL
) {
1421 redisPanic("Unknown set encoding");
1423 } else if (op
->type
== REDIS_ZSET
) {
1424 iterzset
*it
= &op
->iter
.zset
;
1425 zuiObjectFromValue(val
);
1427 if (op
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1428 if (zzlFind(it
->zl
.zl
,val
->ele
,score
) != NULL
) {
1429 /* Score is already set by zzlFind. */
1434 } else if (op
->encoding
== REDIS_ENCODING_SKIPLIST
) {
1436 if ((de
= dictFind(it
->sl
.zs
->dict
,val
->ele
)) != NULL
) {
1437 *score
= *(double*)dictGetVal(de
);
1443 redisPanic("Unknown sorted set encoding");
1446 redisPanic("Unsupported type");
1450 int zuiCompareByCardinality(const void *s1
, const void *s2
) {
1451 return zuiLength((zsetopsrc
*)s1
) - zuiLength((zsetopsrc
*)s2
);
1454 #define REDIS_AGGR_SUM 1
1455 #define REDIS_AGGR_MIN 2
1456 #define REDIS_AGGR_MAX 3
1457 #define zunionInterDictValue(_e) (dictGetVal(_e) == NULL ? 1.0 : *(double*)dictGetVal(_e))
1459 inline static void zunionInterAggregate(double *target
, double val
, int aggregate
) {
1460 if (aggregate
== REDIS_AGGR_SUM
) {
1461 *target
= *target
+ val
;
1462 /* The result of adding two doubles is NaN when one variable
1463 * is +inf and the other is -inf. When these numbers are added,
1464 * we maintain the convention of the result being 0.0. */
1465 if (isnan(*target
)) *target
= 0.0;
1466 } else if (aggregate
== REDIS_AGGR_MIN
) {
1467 *target
= val
< *target
? val
: *target
;
1468 } else if (aggregate
== REDIS_AGGR_MAX
) {
1469 *target
= val
> *target
? val
: *target
;
1472 redisPanic("Unknown ZUNION/INTER aggregate type");
1476 void zunionInterGenericCommand(redisClient
*c
, robj
*dstkey
, int op
) {
1479 int aggregate
= REDIS_AGGR_SUM
;
1483 unsigned int maxelelen
= 0;
1486 zskiplistNode
*znode
;
1489 /* expect setnum input keys to be given */
1490 if ((getLongFromObjectOrReply(c
, c
->argv
[2], &setnum
, NULL
) != REDIS_OK
))
1495 "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE");
1499 /* test if the expected number of keys would overflow */
1500 if (3+setnum
> c
->argc
) {
1501 addReply(c
,shared
.syntaxerr
);
1505 /* read keys to be used for input */
1506 src
= zcalloc(sizeof(zsetopsrc
) * setnum
);
1507 for (i
= 0, j
= 3; i
< setnum
; i
++, j
++) {
1508 robj
*obj
= lookupKeyWrite(c
->db
,c
->argv
[j
]);
1510 if (obj
->type
!= REDIS_ZSET
&& obj
->type
!= REDIS_SET
) {
1512 addReply(c
,shared
.wrongtypeerr
);
1516 src
[i
].subject
= obj
;
1517 src
[i
].type
= obj
->type
;
1518 src
[i
].encoding
= obj
->encoding
;
1520 src
[i
].subject
= NULL
;
1523 /* Default all weights to 1. */
1524 src
[i
].weight
= 1.0;
1527 /* parse optional extra arguments */
1529 int remaining
= c
->argc
- j
;
1532 if (remaining
>= (setnum
+ 1) && !strcasecmp(c
->argv
[j
]->ptr
,"weights")) {
1534 for (i
= 0; i
< setnum
; i
++, j
++, remaining
--) {
1535 if (getDoubleFromObjectOrReply(c
,c
->argv
[j
],&src
[i
].weight
,
1536 "weight value is not a float") != REDIS_OK
)
1542 } else if (remaining
>= 2 && !strcasecmp(c
->argv
[j
]->ptr
,"aggregate")) {
1544 if (!strcasecmp(c
->argv
[j
]->ptr
,"sum")) {
1545 aggregate
= REDIS_AGGR_SUM
;
1546 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"min")) {
1547 aggregate
= REDIS_AGGR_MIN
;
1548 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"max")) {
1549 aggregate
= REDIS_AGGR_MAX
;
1552 addReply(c
,shared
.syntaxerr
);
1558 addReply(c
,shared
.syntaxerr
);
1564 for (i
= 0; i
< setnum
; i
++)
1565 zuiInitIterator(&src
[i
]);
1567 /* sort sets from the smallest to largest, this will improve our
1568 * algorithm's performance */
1569 qsort(src
,setnum
,sizeof(zsetopsrc
),zuiCompareByCardinality
);
1571 dstobj
= createZsetObject();
1572 dstzset
= dstobj
->ptr
;
1573 memset(&zval
, 0, sizeof(zval
));
1575 if (op
== REDIS_OP_INTER
) {
1576 /* Skip everything if the smallest input is empty. */
1577 if (zuiLength(&src
[0]) > 0) {
1578 /* Precondition: as src[0] is non-empty and the inputs are ordered
1579 * by size, all src[i > 0] are non-empty too. */
1580 while (zuiNext(&src
[0],&zval
)) {
1581 double score
, value
;
1583 score
= src
[0].weight
* zval
.score
;
1584 if (isnan(score
)) score
= 0;
1586 for (j
= 1; j
< setnum
; j
++) {
1587 /* It is not safe to access the zset we are
1588 * iterating, so explicitly check for equal object. */
1589 if (src
[j
].subject
== src
[0].subject
) {
1590 value
= zval
.score
*src
[j
].weight
;
1591 zunionInterAggregate(&score
,value
,aggregate
);
1592 } else if (zuiFind(&src
[j
],&zval
,&value
)) {
1593 value
*= src
[j
].weight
;
1594 zunionInterAggregate(&score
,value
,aggregate
);
1600 /* Only continue when present in every input. */
1602 tmp
= zuiObjectFromValue(&zval
);
1603 znode
= zslInsert(dstzset
->zsl
,score
,tmp
);
1604 incrRefCount(tmp
); /* added to skiplist */
1605 dictAdd(dstzset
->dict
,tmp
,&znode
->score
);
1606 incrRefCount(tmp
); /* added to dictionary */
1608 if (tmp
->encoding
== REDIS_ENCODING_RAW
)
1609 if (sdslen(tmp
->ptr
) > maxelelen
)
1610 maxelelen
= sdslen(tmp
->ptr
);
1614 } else if (op
== REDIS_OP_UNION
) {
1615 for (i
= 0; i
< setnum
; i
++) {
1616 if (zuiLength(&src
[i
]) == 0)
1619 while (zuiNext(&src
[i
],&zval
)) {
1620 double score
, value
;
1622 /* Skip key when already processed */
1623 if (dictFind(dstzset
->dict
,zuiObjectFromValue(&zval
)) != NULL
)
1626 /* Initialize score */
1627 score
= src
[i
].weight
* zval
.score
;
1628 if (isnan(score
)) score
= 0;
1630 /* Because the inputs are sorted by size, it's only possible
1631 * for sets at larger indices to hold this element. */
1632 for (j
= (i
+1); j
< setnum
; j
++) {
1633 /* It is not safe to access the zset we are
1634 * iterating, so explicitly check for equal object. */
1635 if(src
[j
].subject
== src
[i
].subject
) {
1636 value
= zval
.score
*src
[j
].weight
;
1637 zunionInterAggregate(&score
,value
,aggregate
);
1638 } else if (zuiFind(&src
[j
],&zval
,&value
)) {
1639 value
*= src
[j
].weight
;
1640 zunionInterAggregate(&score
,value
,aggregate
);
1644 tmp
= zuiObjectFromValue(&zval
);
1645 znode
= zslInsert(dstzset
->zsl
,score
,tmp
);
1646 incrRefCount(zval
.ele
); /* added to skiplist */
1647 dictAdd(dstzset
->dict
,tmp
,&znode
->score
);
1648 incrRefCount(zval
.ele
); /* added to dictionary */
1650 if (tmp
->encoding
== REDIS_ENCODING_RAW
)
1651 if (sdslen(tmp
->ptr
) > maxelelen
)
1652 maxelelen
= sdslen(tmp
->ptr
);
1656 redisPanic("Unknown operator");
1659 for (i
= 0; i
< setnum
; i
++)
1660 zuiClearIterator(&src
[i
]);
1662 if (dbDelete(c
->db
,dstkey
)) {
1663 signalModifiedKey(c
->db
,dstkey
);
1667 if (dstzset
->zsl
->length
) {
1668 /* Convert to ziplist when in limits. */
1669 if (dstzset
->zsl
->length
<= server
.zset_max_ziplist_entries
&&
1670 maxelelen
<= server
.zset_max_ziplist_value
)
1671 zsetConvert(dstobj
,REDIS_ENCODING_ZIPLIST
);
1673 dbAdd(c
->db
,dstkey
,dstobj
);
1674 addReplyLongLong(c
,zsetLength(dstobj
));
1675 if (!touched
) signalModifiedKey(c
->db
,dstkey
);
1678 decrRefCount(dstobj
);
1679 addReply(c
,shared
.czero
);
1684 void zunionstoreCommand(redisClient
*c
) {
1685 zunionInterGenericCommand(c
,c
->argv
[1], REDIS_OP_UNION
);
1688 void zinterstoreCommand(redisClient
*c
) {
1689 zunionInterGenericCommand(c
,c
->argv
[1], REDIS_OP_INTER
);
1692 void zrangeGenericCommand(redisClient
*c
, int reverse
) {
1693 robj
*key
= c
->argv
[1];
1701 if ((getLongFromObjectOrReply(c
, c
->argv
[2], &start
, NULL
) != REDIS_OK
) ||
1702 (getLongFromObjectOrReply(c
, c
->argv
[3], &end
, NULL
) != REDIS_OK
)) return;
1704 if (c
->argc
== 5 && !strcasecmp(c
->argv
[4]->ptr
,"withscores")) {
1706 } else if (c
->argc
>= 5) {
1707 addReply(c
,shared
.syntaxerr
);
1711 if ((zobj
= lookupKeyReadOrReply(c
,key
,shared
.emptymultibulk
)) == NULL
1712 || checkType(c
,zobj
,REDIS_ZSET
)) return;
1714 /* Sanitize indexes. */
1715 llen
= zsetLength(zobj
);
1716 if (start
< 0) start
= llen
+start
;
1717 if (end
< 0) end
= llen
+end
;
1718 if (start
< 0) start
= 0;
1720 /* Invariant: start >= 0, so this test will be true when end < 0.
1721 * The range is empty when start > end or start >= length. */
1722 if (start
> end
|| start
>= llen
) {
1723 addReply(c
,shared
.emptymultibulk
);
1726 if (end
>= llen
) end
= llen
-1;
1727 rangelen
= (end
-start
)+1;
1729 /* Return the result in form of a multi-bulk reply */
1730 addReplyMultiBulkLen(c
, withscores
? (rangelen
*2) : rangelen
);
1732 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1733 unsigned char *zl
= zobj
->ptr
;
1734 unsigned char *eptr
, *sptr
;
1735 unsigned char *vstr
;
1740 eptr
= ziplistIndex(zl
,-2-(2*start
));
1742 eptr
= ziplistIndex(zl
,2*start
);
1744 redisAssertWithInfo(c
,zobj
,eptr
!= NULL
);
1745 sptr
= ziplistNext(zl
,eptr
);
1747 while (rangelen
--) {
1748 redisAssertWithInfo(c
,zobj
,eptr
!= NULL
&& sptr
!= NULL
);
1749 redisAssertWithInfo(c
,zobj
,ziplistGet(eptr
,&vstr
,&vlen
,&vlong
));
1751 addReplyBulkLongLong(c
,vlong
);
1753 addReplyBulkCBuffer(c
,vstr
,vlen
);
1756 addReplyDouble(c
,zzlGetScore(sptr
));
1759 zzlPrev(zl
,&eptr
,&sptr
);
1761 zzlNext(zl
,&eptr
,&sptr
);
1764 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
1765 zset
*zs
= zobj
->ptr
;
1766 zskiplist
*zsl
= zs
->zsl
;
1770 /* Check if starting point is trivial, before doing log(N) lookup. */
1774 ln
= zslGetElementByRank(zsl
,llen
-start
);
1776 ln
= zsl
->header
->level
[0].forward
;
1778 ln
= zslGetElementByRank(zsl
,start
+1);
1782 redisAssertWithInfo(c
,zobj
,ln
!= NULL
);
1784 addReplyBulk(c
,ele
);
1786 addReplyDouble(c
,ln
->score
);
1787 ln
= reverse
? ln
->backward
: ln
->level
[0].forward
;
1790 redisPanic("Unknown sorted set encoding");
1794 void zrangeCommand(redisClient
*c
) {
1795 zrangeGenericCommand(c
,0);
1798 void zrevrangeCommand(redisClient
*c
) {
1799 zrangeGenericCommand(c
,1);
1802 /* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE. */
1803 void genericZrangebyscoreCommand(redisClient
*c
, int reverse
) {
1805 robj
*key
= c
->argv
[1];
1807 long offset
= 0, limit
= -1;
1809 unsigned long rangelen
= 0;
1810 void *replylen
= NULL
;
1813 /* Parse the range arguments. */
1815 /* Range is given as [max,min] */
1816 maxidx
= 2; minidx
= 3;
1818 /* Range is given as [min,max] */
1819 minidx
= 2; maxidx
= 3;
1822 if (zslParseRange(c
->argv
[minidx
],c
->argv
[maxidx
],&range
) != REDIS_OK
) {
1823 addReplyError(c
,"min or max is not a float");
1827 /* Parse optional extra arguments. Note that ZCOUNT will exactly have
1828 * 4 arguments, so we'll never enter the following code path. */
1830 int remaining
= c
->argc
- 4;
1834 if (remaining
>= 1 && !strcasecmp(c
->argv
[pos
]->ptr
,"withscores")) {
1837 } else if (remaining
>= 3 && !strcasecmp(c
->argv
[pos
]->ptr
,"limit")) {
1838 if ((getLongFromObjectOrReply(c
, c
->argv
[pos
+1], &offset
, NULL
) != REDIS_OK
) ||
1839 (getLongFromObjectOrReply(c
, c
->argv
[pos
+2], &limit
, NULL
) != REDIS_OK
)) return;
1840 pos
+= 3; remaining
-= 3;
1842 addReply(c
,shared
.syntaxerr
);
1848 /* Ok, lookup the key and get the range */
1849 if ((zobj
= lookupKeyReadOrReply(c
,key
,shared
.emptymultibulk
)) == NULL
||
1850 checkType(c
,zobj
,REDIS_ZSET
)) return;
1852 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1853 unsigned char *zl
= zobj
->ptr
;
1854 unsigned char *eptr
, *sptr
;
1855 unsigned char *vstr
;
1860 /* If reversed, get the last node in range as starting point. */
1862 eptr
= zzlLastInRange(zl
,range
);
1864 eptr
= zzlFirstInRange(zl
,range
);
1867 /* No "first" element in the specified interval. */
1869 addReply(c
, shared
.emptymultibulk
);
1873 /* Get score pointer for the first element. */
1874 redisAssertWithInfo(c
,zobj
,eptr
!= NULL
);
1875 sptr
= ziplistNext(zl
,eptr
);
1877 /* We don't know in advance how many matching elements there are in the
1878 * list, so we push this object that will represent the multi-bulk
1879 * length in the output buffer, and will "fix" it later */
1880 replylen
= addDeferredMultiBulkLength(c
);
1882 /* If there is an offset, just traverse the number of elements without
1883 * checking the score because that is done in the next loop. */
1884 while (eptr
&& offset
--) {
1886 zzlPrev(zl
,&eptr
,&sptr
);
1888 zzlNext(zl
,&eptr
,&sptr
);
1892 while (eptr
&& limit
--) {
1893 score
= zzlGetScore(sptr
);
1895 /* Abort when the node is no longer in range. */
1897 if (!zslValueGteMin(score
,&range
)) break;
1899 if (!zslValueLteMax(score
,&range
)) break;
1902 /* We know the element exists, so ziplistGet should always succeed */
1903 redisAssertWithInfo(c
,zobj
,ziplistGet(eptr
,&vstr
,&vlen
,&vlong
));
1907 addReplyBulkLongLong(c
,vlong
);
1909 addReplyBulkCBuffer(c
,vstr
,vlen
);
1913 addReplyDouble(c
,score
);
1916 /* Move to next node */
1918 zzlPrev(zl
,&eptr
,&sptr
);
1920 zzlNext(zl
,&eptr
,&sptr
);
1923 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
1924 zset
*zs
= zobj
->ptr
;
1925 zskiplist
*zsl
= zs
->zsl
;
1928 /* If reversed, get the last node in range as starting point. */
1930 ln
= zslLastInRange(zsl
,range
);
1932 ln
= zslFirstInRange(zsl
,range
);
1935 /* No "first" element in the specified interval. */
1937 addReply(c
, shared
.emptymultibulk
);
1941 /* We don't know in advance how many matching elements there are in the
1942 * list, so we push this object that will represent the multi-bulk
1943 * length in the output buffer, and will "fix" it later */
1944 replylen
= addDeferredMultiBulkLength(c
);
1946 /* If there is an offset, just traverse the number of elements without
1947 * checking the score because that is done in the next loop. */
1948 while (ln
&& offset
--) {
1952 ln
= ln
->level
[0].forward
;
1956 while (ln
&& limit
--) {
1957 /* Abort when the node is no longer in range. */
1959 if (!zslValueGteMin(ln
->score
,&range
)) break;
1961 if (!zslValueLteMax(ln
->score
,&range
)) break;
1965 addReplyBulk(c
,ln
->obj
);
1968 addReplyDouble(c
,ln
->score
);
1971 /* Move to next node */
1975 ln
= ln
->level
[0].forward
;
1979 redisPanic("Unknown sorted set encoding");
1986 setDeferredMultiBulkLength(c
, replylen
, rangelen
);
1989 void zrangebyscoreCommand(redisClient
*c
) {
1990 genericZrangebyscoreCommand(c
,0);
1993 void zrevrangebyscoreCommand(redisClient
*c
) {
1994 genericZrangebyscoreCommand(c
,1);
1997 void zcountCommand(redisClient
*c
) {
1998 robj
*key
= c
->argv
[1];
2003 /* Parse the range arguments */
2004 if (zslParseRange(c
->argv
[2],c
->argv
[3],&range
) != REDIS_OK
) {
2005 addReplyError(c
,"min or max is not a float");
2009 /* Lookup the sorted set */
2010 if ((zobj
= lookupKeyReadOrReply(c
, key
, shared
.czero
)) == NULL
||
2011 checkType(c
, zobj
, REDIS_ZSET
)) return;
2013 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
2014 unsigned char *zl
= zobj
->ptr
;
2015 unsigned char *eptr
, *sptr
;
2018 /* Use the first element in range as the starting point */
2019 eptr
= zzlFirstInRange(zl
,range
);
2021 /* No "first" element */
2023 addReply(c
, shared
.czero
);
2027 /* First element is in range */
2028 sptr
= ziplistNext(zl
,eptr
);
2029 score
= zzlGetScore(sptr
);
2030 redisAssertWithInfo(c
,zobj
,zslValueLteMax(score
,&range
));
2032 /* Iterate over elements in range */
2034 score
= zzlGetScore(sptr
);
2036 /* Abort when the node is no longer in range. */
2037 if (!zslValueLteMax(score
,&range
)) {
2041 zzlNext(zl
,&eptr
,&sptr
);
2044 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
2045 zset
*zs
= zobj
->ptr
;
2046 zskiplist
*zsl
= zs
->zsl
;
2050 /* Find first element in range */
2051 zn
= zslFirstInRange(zsl
, range
);
2053 /* Use rank of first element, if any, to determine preliminary count */
2055 rank
= zslGetRank(zsl
, zn
->score
, zn
->obj
);
2056 count
= (zsl
->length
- (rank
- 1));
2058 /* Find last element in range */
2059 zn
= zslLastInRange(zsl
, range
);
2061 /* Use rank of last element, if any, to determine the actual count */
2063 rank
= zslGetRank(zsl
, zn
->score
, zn
->obj
);
2064 count
-= (zsl
->length
- rank
);
2068 redisPanic("Unknown sorted set encoding");
2071 addReplyLongLong(c
, count
);
2074 void zcardCommand(redisClient
*c
) {
2075 robj
*key
= c
->argv
[1];
2078 if ((zobj
= lookupKeyReadOrReply(c
,key
,shared
.czero
)) == NULL
||
2079 checkType(c
,zobj
,REDIS_ZSET
)) return;
2081 addReplyLongLong(c
,zsetLength(zobj
));
2084 void zscoreCommand(redisClient
*c
) {
2085 robj
*key
= c
->argv
[1];
2089 if ((zobj
= lookupKeyReadOrReply(c
,key
,shared
.nullbulk
)) == NULL
||
2090 checkType(c
,zobj
,REDIS_ZSET
)) return;
2092 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
2093 if (zzlFind(zobj
->ptr
,c
->argv
[2],&score
) != NULL
)
2094 addReplyDouble(c
,score
);
2096 addReply(c
,shared
.nullbulk
);
2097 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
2098 zset
*zs
= zobj
->ptr
;
2101 c
->argv
[2] = tryObjectEncoding(c
->argv
[2]);
2102 de
= dictFind(zs
->dict
,c
->argv
[2]);
2104 score
= *(double*)dictGetVal(de
);
2105 addReplyDouble(c
,score
);
2107 addReply(c
,shared
.nullbulk
);
2110 redisPanic("Unknown sorted set encoding");
2114 void zrankGenericCommand(redisClient
*c
, int reverse
) {
2115 robj
*key
= c
->argv
[1];
2116 robj
*ele
= c
->argv
[2];
2121 if ((zobj
= lookupKeyReadOrReply(c
,key
,shared
.nullbulk
)) == NULL
||
2122 checkType(c
,zobj
,REDIS_ZSET
)) return;
2123 llen
= zsetLength(zobj
);
2125 redisAssertWithInfo(c
,ele
,ele
->encoding
== REDIS_ENCODING_RAW
);
2126 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
2127 unsigned char *zl
= zobj
->ptr
;
2128 unsigned char *eptr
, *sptr
;
2130 eptr
= ziplistIndex(zl
,0);
2131 redisAssertWithInfo(c
,zobj
,eptr
!= NULL
);
2132 sptr
= ziplistNext(zl
,eptr
);
2133 redisAssertWithInfo(c
,zobj
,sptr
!= NULL
);
2136 while(eptr
!= NULL
) {
2137 if (ziplistCompare(eptr
,ele
->ptr
,sdslen(ele
->ptr
)))
2140 zzlNext(zl
,&eptr
,&sptr
);
2145 addReplyLongLong(c
,llen
-rank
);
2147 addReplyLongLong(c
,rank
-1);
2149 addReply(c
,shared
.nullbulk
);
2151 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
2152 zset
*zs
= zobj
->ptr
;
2153 zskiplist
*zsl
= zs
->zsl
;
2157 ele
= c
->argv
[2] = tryObjectEncoding(c
->argv
[2]);
2158 de
= dictFind(zs
->dict
,ele
);
2160 score
= *(double*)dictGetVal(de
);
2161 rank
= zslGetRank(zsl
,score
,ele
);
2162 redisAssertWithInfo(c
,ele
,rank
); /* Existing elements always have a rank. */
2164 addReplyLongLong(c
,llen
-rank
);
2166 addReplyLongLong(c
,rank
-1);
2168 addReply(c
,shared
.nullbulk
);
2171 redisPanic("Unknown sorted set encoding");
2175 void zrankCommand(redisClient
*c
) {
2176 zrankGenericCommand(c
, 0);
2179 void zrevrankCommand(redisClient
*c
) {
2180 zrankGenericCommand(c
, 1);