]>
git.saurik.com Git - redis.git/blob - src/t_zset.c
5 /*-----------------------------------------------------------------------------
7 *----------------------------------------------------------------------------*/
9 /* ZSETs are ordered sets using two data structures to hold the same elements
10 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
13 * The elements are added to an hash table mapping Redis objects to scores.
14 * At the same time the elements are added to a skip list mapping scores
15 * to Redis objects (so objects are sorted by scores in this "view"). */
17 /* This skiplist implementation is almost a C translation of the original
18 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
19 * Alternative to Balanced Trees", modified in three ways:
20 * a) this implementation allows for repeated scores.
21 * b) the comparison is not just by key (our 'score') but by satellite data.
22 * c) there is a back pointer, so it's a doubly linked list with the back
23 * pointers being only at "level 1". This allows to traverse the list
24 * from tail to head, useful for ZREVRANGE. */
26 zskiplistNode
*zslCreateNode(int level
, double score
, robj
*obj
) {
27 zskiplistNode
*zn
= zmalloc(sizeof(*zn
)+level
*sizeof(struct zskiplistLevel
));
33 zskiplist
*zslCreate(void) {
37 zsl
= zmalloc(sizeof(*zsl
));
40 zsl
->header
= zslCreateNode(ZSKIPLIST_MAXLEVEL
,0,NULL
);
41 for (j
= 0; j
< ZSKIPLIST_MAXLEVEL
; j
++) {
42 zsl
->header
->level
[j
].forward
= NULL
;
43 zsl
->header
->level
[j
].span
= 0;
45 zsl
->header
->backward
= NULL
;
50 void zslFreeNode(zskiplistNode
*node
) {
51 decrRefCount(node
->obj
);
55 void zslFree(zskiplist
*zsl
) {
56 zskiplistNode
*node
= zsl
->header
->level
[0].forward
, *next
;
60 next
= node
->level
[0].forward
;
67 /* Returns a random level for the new skiplist node we are going to create.
68 * The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL
69 * (both inclusive), with a powerlaw-alike distribution where higher
70 * levels are less likely to be returned. */
71 int zslRandomLevel(void) {
73 while ((random()&0xFFFF) < (ZSKIPLIST_P
* 0xFFFF))
75 return (level
<ZSKIPLIST_MAXLEVEL
) ? level
: ZSKIPLIST_MAXLEVEL
;
78 zskiplistNode
*zslInsert(zskiplist
*zsl
, double score
, robj
*obj
) {
79 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
80 unsigned int rank
[ZSKIPLIST_MAXLEVEL
];
83 redisAssert(!isnan(score
));
85 for (i
= zsl
->level
-1; i
>= 0; i
--) {
86 /* store rank that is crossed to reach the insert position */
87 rank
[i
] = i
== (zsl
->level
-1) ? 0 : rank
[i
+1];
88 while (x
->level
[i
].forward
&&
89 (x
->level
[i
].forward
->score
< score
||
90 (x
->level
[i
].forward
->score
== score
&&
91 compareStringObjects(x
->level
[i
].forward
->obj
,obj
) < 0))) {
92 rank
[i
] += x
->level
[i
].span
;
93 x
= x
->level
[i
].forward
;
97 /* we assume the key is not already inside, since we allow duplicated
98 * scores, and the re-insertion of score and redis object should never
99 * happpen since the caller of zslInsert() should test in the hash table
100 * if the element is already inside or not. */
101 level
= zslRandomLevel();
102 if (level
> zsl
->level
) {
103 for (i
= zsl
->level
; i
< level
; i
++) {
105 update
[i
] = zsl
->header
;
106 update
[i
]->level
[i
].span
= zsl
->length
;
110 x
= zslCreateNode(level
,score
,obj
);
111 for (i
= 0; i
< level
; i
++) {
112 x
->level
[i
].forward
= update
[i
]->level
[i
].forward
;
113 update
[i
]->level
[i
].forward
= x
;
115 /* update span covered by update[i] as x is inserted here */
116 x
->level
[i
].span
= update
[i
]->level
[i
].span
- (rank
[0] - rank
[i
]);
117 update
[i
]->level
[i
].span
= (rank
[0] - rank
[i
]) + 1;
120 /* increment span for untouched levels */
121 for (i
= level
; i
< zsl
->level
; i
++) {
122 update
[i
]->level
[i
].span
++;
125 x
->backward
= (update
[0] == zsl
->header
) ? NULL
: update
[0];
126 if (x
->level
[0].forward
)
127 x
->level
[0].forward
->backward
= x
;
134 /* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
135 void zslDeleteNode(zskiplist
*zsl
, zskiplistNode
*x
, zskiplistNode
**update
) {
137 for (i
= 0; i
< zsl
->level
; i
++) {
138 if (update
[i
]->level
[i
].forward
== x
) {
139 update
[i
]->level
[i
].span
+= x
->level
[i
].span
- 1;
140 update
[i
]->level
[i
].forward
= x
->level
[i
].forward
;
142 update
[i
]->level
[i
].span
-= 1;
145 if (x
->level
[0].forward
) {
146 x
->level
[0].forward
->backward
= x
->backward
;
148 zsl
->tail
= x
->backward
;
150 while(zsl
->level
> 1 && zsl
->header
->level
[zsl
->level
-1].forward
== NULL
)
155 /* Delete an element with matching score/object from the skiplist. */
156 int zslDelete(zskiplist
*zsl
, double score
, robj
*obj
) {
157 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
161 for (i
= zsl
->level
-1; i
>= 0; i
--) {
162 while (x
->level
[i
].forward
&&
163 (x
->level
[i
].forward
->score
< score
||
164 (x
->level
[i
].forward
->score
== score
&&
165 compareStringObjects(x
->level
[i
].forward
->obj
,obj
) < 0)))
166 x
= x
->level
[i
].forward
;
169 /* We may have multiple elements with the same score, what we need
170 * is to find the element with both the right score and object. */
171 x
= x
->level
[0].forward
;
172 if (x
&& score
== x
->score
&& equalStringObjects(x
->obj
,obj
)) {
173 zslDeleteNode(zsl
, x
, update
);
177 return 0; /* not found */
179 return 0; /* not found */
182 static int zslValueGteMin(double value
, zrangespec
*spec
) {
183 return spec
->minex
? (value
> spec
->min
) : (value
>= spec
->min
);
186 static int zslValueLteMax(double value
, zrangespec
*spec
) {
187 return spec
->maxex
? (value
< spec
->max
) : (value
<= spec
->max
);
190 /* Returns if there is a part of the zset is in range. */
191 int zslIsInRange(zskiplist
*zsl
, zrangespec
*range
) {
194 /* Test for ranges that will always be empty. */
195 if (range
->min
> range
->max
||
196 (range
->min
== range
->max
&& (range
->minex
|| range
->maxex
)))
199 if (x
== NULL
|| !zslValueGteMin(x
->score
,range
))
201 x
= zsl
->header
->level
[0].forward
;
202 if (x
== NULL
|| !zslValueLteMax(x
->score
,range
))
207 /* Find the first node that is contained in the specified range.
208 * Returns NULL when no element is contained in the range. */
209 zskiplistNode
*zslFirstInRange(zskiplist
*zsl
, zrangespec range
) {
213 /* If everything is out of range, return early. */
214 if (!zslIsInRange(zsl
,&range
)) return NULL
;
217 for (i
= zsl
->level
-1; i
>= 0; i
--) {
218 /* Go forward while *OUT* of range. */
219 while (x
->level
[i
].forward
&&
220 !zslValueGteMin(x
->level
[i
].forward
->score
,&range
))
221 x
= x
->level
[i
].forward
;
224 /* This is an inner range, so the next node cannot be NULL. */
225 x
= x
->level
[0].forward
;
226 redisAssert(x
!= NULL
);
228 /* Check if score <= max. */
229 if (!zslValueLteMax(x
->score
,&range
)) return NULL
;
233 /* Find the last node that is contained in the specified range.
234 * Returns NULL when no element is contained in the range. */
235 zskiplistNode
*zslLastInRange(zskiplist
*zsl
, zrangespec range
) {
239 /* If everything is out of range, return early. */
240 if (!zslIsInRange(zsl
,&range
)) return NULL
;
243 for (i
= zsl
->level
-1; i
>= 0; i
--) {
244 /* Go forward while *IN* range. */
245 while (x
->level
[i
].forward
&&
246 zslValueLteMax(x
->level
[i
].forward
->score
,&range
))
247 x
= x
->level
[i
].forward
;
250 /* This is an inner range, so this node cannot be NULL. */
251 redisAssert(x
!= NULL
);
253 /* Check if score >= min. */
254 if (!zslValueGteMin(x
->score
,&range
)) return NULL
;
258 /* Delete all the elements with score between min and max from the skiplist.
259 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
260 * Note that this function takes the reference to the hash table view of the
261 * sorted set, in order to remove the elements from the hash table too. */
262 unsigned long zslDeleteRangeByScore(zskiplist
*zsl
, zrangespec range
, dict
*dict
) {
263 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
264 unsigned long removed
= 0;
268 for (i
= zsl
->level
-1; i
>= 0; i
--) {
269 while (x
->level
[i
].forward
&& (range
.minex
?
270 x
->level
[i
].forward
->score
<= range
.min
:
271 x
->level
[i
].forward
->score
< range
.min
))
272 x
= x
->level
[i
].forward
;
276 /* Current node is the last with score < or <= min. */
277 x
= x
->level
[0].forward
;
279 /* Delete nodes while in range. */
280 while (x
&& (range
.maxex
? x
->score
< range
.max
: x
->score
<= range
.max
)) {
281 zskiplistNode
*next
= x
->level
[0].forward
;
282 zslDeleteNode(zsl
,x
,update
);
283 dictDelete(dict
,x
->obj
);
291 /* Delete all the elements with rank between start and end from the skiplist.
292 * Start and end are inclusive. Note that start and end need to be 1-based */
293 unsigned long zslDeleteRangeByRank(zskiplist
*zsl
, unsigned int start
, unsigned int end
, dict
*dict
) {
294 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
295 unsigned long traversed
= 0, removed
= 0;
299 for (i
= zsl
->level
-1; i
>= 0; i
--) {
300 while (x
->level
[i
].forward
&& (traversed
+ x
->level
[i
].span
) < start
) {
301 traversed
+= x
->level
[i
].span
;
302 x
= x
->level
[i
].forward
;
308 x
= x
->level
[0].forward
;
309 while (x
&& traversed
<= end
) {
310 zskiplistNode
*next
= x
->level
[0].forward
;
311 zslDeleteNode(zsl
,x
,update
);
312 dictDelete(dict
,x
->obj
);
321 /* Find the rank for an element by both score and key.
322 * Returns 0 when the element cannot be found, rank otherwise.
323 * Note that the rank is 1-based due to the span of zsl->header to the
325 unsigned long zslGetRank(zskiplist
*zsl
, double score
, robj
*o
) {
327 unsigned long rank
= 0;
331 for (i
= zsl
->level
-1; i
>= 0; i
--) {
332 while (x
->level
[i
].forward
&&
333 (x
->level
[i
].forward
->score
< score
||
334 (x
->level
[i
].forward
->score
== score
&&
335 compareStringObjects(x
->level
[i
].forward
->obj
,o
) <= 0))) {
336 rank
+= x
->level
[i
].span
;
337 x
= x
->level
[i
].forward
;
340 /* x might be equal to zsl->header, so test if obj is non-NULL */
341 if (x
->obj
&& equalStringObjects(x
->obj
,o
)) {
348 /* Finds an element by its rank. The rank argument needs to be 1-based. */
349 zskiplistNode
* zslGetElementByRank(zskiplist
*zsl
, unsigned long rank
) {
351 unsigned long traversed
= 0;
355 for (i
= zsl
->level
-1; i
>= 0; i
--) {
356 while (x
->level
[i
].forward
&& (traversed
+ x
->level
[i
].span
) <= rank
)
358 traversed
+= x
->level
[i
].span
;
359 x
= x
->level
[i
].forward
;
361 if (traversed
== rank
) {
368 /* Populate the rangespec according to the objects min and max. */
369 static int zslParseRange(robj
*min
, robj
*max
, zrangespec
*spec
) {
371 spec
->minex
= spec
->maxex
= 0;
373 /* Parse the min-max interval. If one of the values is prefixed
374 * by the "(" character, it's considered "open". For instance
375 * ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max
376 * ZRANGEBYSCORE zset 1.5 2.5 will instead match min <= x <= max */
377 if (min
->encoding
== REDIS_ENCODING_INT
) {
378 spec
->min
= (long)min
->ptr
;
380 if (((char*)min
->ptr
)[0] == '(') {
381 spec
->min
= strtod((char*)min
->ptr
+1,&eptr
);
382 if (eptr
[0] != '\0' || isnan(spec
->min
)) return REDIS_ERR
;
385 spec
->min
= strtod((char*)min
->ptr
,&eptr
);
386 if (eptr
[0] != '\0' || isnan(spec
->min
)) return REDIS_ERR
;
389 if (max
->encoding
== REDIS_ENCODING_INT
) {
390 spec
->max
= (long)max
->ptr
;
392 if (((char*)max
->ptr
)[0] == '(') {
393 spec
->max
= strtod((char*)max
->ptr
+1,&eptr
);
394 if (eptr
[0] != '\0' || isnan(spec
->max
)) return REDIS_ERR
;
397 spec
->max
= strtod((char*)max
->ptr
,&eptr
);
398 if (eptr
[0] != '\0' || isnan(spec
->max
)) return REDIS_ERR
;
405 /*-----------------------------------------------------------------------------
406 * Ziplist-backed sorted set API
407 *----------------------------------------------------------------------------*/
409 double zzlGetScore(unsigned char *sptr
) {
416 redisAssert(sptr
!= NULL
);
417 redisAssert(ziplistGet(sptr
,&vstr
,&vlen
,&vlong
));
420 memcpy(buf
,vstr
,vlen
);
422 score
= strtod(buf
,NULL
);
430 /* Compare element in sorted set with given element. */
431 int zzlCompareElements(unsigned char *eptr
, unsigned char *cstr
, unsigned int clen
) {
435 unsigned char vbuf
[32];
438 redisAssert(ziplistGet(eptr
,&vstr
,&vlen
,&vlong
));
440 /* Store string representation of long long in buf. */
441 vlen
= ll2string((char*)vbuf
,sizeof(vbuf
),vlong
);
445 minlen
= (vlen
< clen
) ? vlen
: clen
;
446 cmp
= memcmp(vstr
,cstr
,minlen
);
447 if (cmp
== 0) return vlen
-clen
;
451 unsigned int zzlLength(unsigned char *zl
) {
452 return ziplistLen(zl
)/2;
455 /* Move to next entry based on the values in eptr and sptr. Both are set to
456 * NULL when there is no next entry. */
457 void zzlNext(unsigned char *zl
, unsigned char **eptr
, unsigned char **sptr
) {
458 unsigned char *_eptr
, *_sptr
;
459 redisAssert(*eptr
!= NULL
&& *sptr
!= NULL
);
461 _eptr
= ziplistNext(zl
,*sptr
);
463 _sptr
= ziplistNext(zl
,_eptr
);
464 redisAssert(_sptr
!= NULL
);
474 /* Move to the previous entry based on the values in eptr and sptr. Both are
475 * set to NULL when there is no next entry. */
476 void zzlPrev(unsigned char *zl
, unsigned char **eptr
, unsigned char **sptr
) {
477 unsigned char *_eptr
, *_sptr
;
478 redisAssert(*eptr
!= NULL
&& *sptr
!= NULL
);
480 _sptr
= ziplistPrev(zl
,*eptr
);
482 _eptr
= ziplistPrev(zl
,_sptr
);
483 redisAssert(_eptr
!= NULL
);
485 /* No previous entry. */
493 /* Returns if there is a part of the zset is in range. Should only be used
494 * internally by zzlFirstInRange and zzlLastInRange. */
495 int zzlIsInRange(unsigned char *zl
, zrangespec
*range
) {
499 /* Test for ranges that will always be empty. */
500 if (range
->min
> range
->max
||
501 (range
->min
== range
->max
&& (range
->minex
|| range
->maxex
)))
504 p
= ziplistIndex(zl
,-1); /* Last score. */
505 redisAssert(p
!= NULL
);
506 score
= zzlGetScore(p
);
507 if (!zslValueGteMin(score
,range
))
510 p
= ziplistIndex(zl
,1); /* First score. */
511 redisAssert(p
!= NULL
);
512 score
= zzlGetScore(p
);
513 if (!zslValueLteMax(score
,range
))
519 /* Find pointer to the first element contained in the specified range.
520 * Returns NULL when no element is contained in the range. */
521 unsigned char *zzlFirstInRange(unsigned char *zl
, zrangespec range
) {
522 unsigned char *eptr
= ziplistIndex(zl
,0), *sptr
;
525 /* If everything is out of range, return early. */
526 if (!zzlIsInRange(zl
,&range
)) return NULL
;
528 while (eptr
!= NULL
) {
529 sptr
= ziplistNext(zl
,eptr
);
530 redisAssert(sptr
!= NULL
);
532 score
= zzlGetScore(sptr
);
533 if (zslValueGteMin(score
,&range
)) {
534 /* Check if score <= max. */
535 if (zslValueLteMax(score
,&range
))
540 /* Move to next element. */
541 eptr
= ziplistNext(zl
,sptr
);
547 /* Find pointer to the last element contained in the specified range.
548 * Returns NULL when no element is contained in the range. */
549 unsigned char *zzlLastInRange(unsigned char *zl
, zrangespec range
) {
550 unsigned char *eptr
= ziplistIndex(zl
,-2), *sptr
;
553 /* If everything is out of range, return early. */
554 if (!zzlIsInRange(zl
,&range
)) return NULL
;
556 while (eptr
!= NULL
) {
557 sptr
= ziplistNext(zl
,eptr
);
558 redisAssert(sptr
!= NULL
);
560 score
= zzlGetScore(sptr
);
561 if (zslValueLteMax(score
,&range
)) {
562 /* Check if score >= min. */
563 if (zslValueGteMin(score
,&range
))
568 /* Move to previous element by moving to the score of previous element.
569 * When this returns NULL, we know there also is no element. */
570 sptr
= ziplistPrev(zl
,eptr
);
572 redisAssert((eptr
= ziplistPrev(zl
,sptr
)) != NULL
);
580 unsigned char *zzlFind(unsigned char *zl
, robj
*ele
, double *score
) {
581 unsigned char *eptr
= ziplistIndex(zl
,0), *sptr
;
583 ele
= getDecodedObject(ele
);
584 while (eptr
!= NULL
) {
585 sptr
= ziplistNext(zl
,eptr
);
586 redisAssertWithInfo(NULL
,ele
,sptr
!= NULL
);
588 if (ziplistCompare(eptr
,ele
->ptr
,sdslen(ele
->ptr
))) {
589 /* Matching element, pull out score. */
590 if (score
!= NULL
) *score
= zzlGetScore(sptr
);
595 /* Move to next element. */
596 eptr
= ziplistNext(zl
,sptr
);
603 /* Delete (element,score) pair from ziplist. Use local copy of eptr because we
604 * don't want to modify the one given as argument. */
605 unsigned char *zzlDelete(unsigned char *zl
, unsigned char *eptr
) {
606 unsigned char *p
= eptr
;
608 /* TODO: add function to ziplist API to delete N elements from offset. */
609 zl
= ziplistDelete(zl
,&p
);
610 zl
= ziplistDelete(zl
,&p
);
614 unsigned char *zzlInsertAt(unsigned char *zl
, unsigned char *eptr
, robj
*ele
, double score
) {
620 redisAssertWithInfo(NULL
,ele
,ele
->encoding
== REDIS_ENCODING_RAW
);
621 scorelen
= d2string(scorebuf
,sizeof(scorebuf
),score
);
623 zl
= ziplistPush(zl
,ele
->ptr
,sdslen(ele
->ptr
),ZIPLIST_TAIL
);
624 zl
= ziplistPush(zl
,(unsigned char*)scorebuf
,scorelen
,ZIPLIST_TAIL
);
626 /* Keep offset relative to zl, as it might be re-allocated. */
628 zl
= ziplistInsert(zl
,eptr
,ele
->ptr
,sdslen(ele
->ptr
));
631 /* Insert score after the element. */
632 redisAssertWithInfo(NULL
,ele
,(sptr
= ziplistNext(zl
,eptr
)) != NULL
);
633 zl
= ziplistInsert(zl
,sptr
,(unsigned char*)scorebuf
,scorelen
);
639 /* Insert (element,score) pair in ziplist. This function assumes the element is
640 * not yet present in the list. */
641 unsigned char *zzlInsert(unsigned char *zl
, robj
*ele
, double score
) {
642 unsigned char *eptr
= ziplistIndex(zl
,0), *sptr
;
645 ele
= getDecodedObject(ele
);
646 while (eptr
!= NULL
) {
647 sptr
= ziplistNext(zl
,eptr
);
648 redisAssertWithInfo(NULL
,ele
,sptr
!= NULL
);
649 s
= zzlGetScore(sptr
);
652 /* First element with score larger than score for element to be
653 * inserted. This means we should take its spot in the list to
654 * maintain ordering. */
655 zl
= zzlInsertAt(zl
,eptr
,ele
,score
);
657 } else if (s
== score
) {
658 /* Ensure lexicographical ordering for elements. */
659 if (zzlCompareElements(eptr
,ele
->ptr
,sdslen(ele
->ptr
)) > 0) {
660 zl
= zzlInsertAt(zl
,eptr
,ele
,score
);
665 /* Move to next element. */
666 eptr
= ziplistNext(zl
,sptr
);
669 /* Push on tail of list when it was not yet inserted. */
671 zl
= zzlInsertAt(zl
,NULL
,ele
,score
);
677 unsigned char *zzlDeleteRangeByScore(unsigned char *zl
, zrangespec range
, unsigned long *deleted
) {
678 unsigned char *eptr
, *sptr
;
680 unsigned long num
= 0;
682 if (deleted
!= NULL
) *deleted
= 0;
684 eptr
= zzlFirstInRange(zl
,range
);
685 if (eptr
== NULL
) return zl
;
687 /* When the tail of the ziplist is deleted, eptr will point to the sentinel
688 * byte and ziplistNext will return NULL. */
689 while ((sptr
= ziplistNext(zl
,eptr
)) != NULL
) {
690 score
= zzlGetScore(sptr
);
691 if (zslValueLteMax(score
,&range
)) {
692 /* Delete both the element and the score. */
693 zl
= ziplistDelete(zl
,&eptr
);
694 zl
= ziplistDelete(zl
,&eptr
);
697 /* No longer in range. */
702 if (deleted
!= NULL
) *deleted
= num
;
706 /* Delete all the elements with rank between start and end from the skiplist.
707 * Start and end are inclusive. Note that start and end need to be 1-based */
708 unsigned char *zzlDeleteRangeByRank(unsigned char *zl
, unsigned int start
, unsigned int end
, unsigned long *deleted
) {
709 unsigned int num
= (end
-start
)+1;
710 if (deleted
) *deleted
= num
;
711 zl
= ziplistDeleteRange(zl
,2*(start
-1),2*num
);
715 /*-----------------------------------------------------------------------------
716 * Common sorted set API
717 *----------------------------------------------------------------------------*/
719 unsigned int zsetLength(robj
*zobj
) {
721 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
722 length
= zzlLength(zobj
->ptr
);
723 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
724 length
= ((zset
*)zobj
->ptr
)->zsl
->length
;
726 redisPanic("Unknown sorted set encoding");
731 void zsetConvert(robj
*zobj
, int encoding
) {
733 zskiplistNode
*node
, *next
;
737 if (zobj
->encoding
== encoding
) return;
738 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
739 unsigned char *zl
= zobj
->ptr
;
740 unsigned char *eptr
, *sptr
;
745 if (encoding
!= REDIS_ENCODING_SKIPLIST
)
746 redisPanic("Unknown target encoding");
748 zs
= zmalloc(sizeof(*zs
));
749 zs
->dict
= dictCreate(&zsetDictType
,NULL
);
750 zs
->zsl
= zslCreate();
752 eptr
= ziplistIndex(zl
,0);
753 redisAssertWithInfo(NULL
,zobj
,eptr
!= NULL
);
754 sptr
= ziplistNext(zl
,eptr
);
755 redisAssertWithInfo(NULL
,zobj
,sptr
!= NULL
);
757 while (eptr
!= NULL
) {
758 score
= zzlGetScore(sptr
);
759 redisAssertWithInfo(NULL
,zobj
,ziplistGet(eptr
,&vstr
,&vlen
,&vlong
));
761 ele
= createStringObjectFromLongLong(vlong
);
763 ele
= createStringObject((char*)vstr
,vlen
);
765 /* Has incremented refcount since it was just created. */
766 node
= zslInsert(zs
->zsl
,score
,ele
);
767 redisAssertWithInfo(NULL
,zobj
,dictAdd(zs
->dict
,ele
,&node
->score
) == DICT_OK
);
768 incrRefCount(ele
); /* Added to dictionary. */
769 zzlNext(zl
,&eptr
,&sptr
);
774 zobj
->encoding
= REDIS_ENCODING_SKIPLIST
;
775 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
776 unsigned char *zl
= ziplistNew();
778 if (encoding
!= REDIS_ENCODING_ZIPLIST
)
779 redisPanic("Unknown target encoding");
781 /* Approach similar to zslFree(), since we want to free the skiplist at
782 * the same time as creating the ziplist. */
784 dictRelease(zs
->dict
);
785 node
= zs
->zsl
->header
->level
[0].forward
;
786 zfree(zs
->zsl
->header
);
790 ele
= getDecodedObject(node
->obj
);
791 zl
= zzlInsertAt(zl
,NULL
,ele
,node
->score
);
794 next
= node
->level
[0].forward
;
801 zobj
->encoding
= REDIS_ENCODING_ZIPLIST
;
803 redisPanic("Unknown sorted set encoding");
807 /*-----------------------------------------------------------------------------
808 * Sorted set commands
809 *----------------------------------------------------------------------------*/
811 /* This generic command implements both ZADD and ZINCRBY. */
812 void zaddGenericCommand(redisClient
*c
, int incr
) {
813 static char *nanerr
= "resulting score is not a number (NaN)";
814 robj
*key
= c
->argv
[1];
818 double score
= 0, *scores
, curscore
= 0.0;
819 int j
, elements
= (c
->argc
-2)/2;
823 addReply(c
,shared
.syntaxerr
);
827 /* Start parsing all the scores, we need to emit any syntax error
828 * before executing additions to the sorted set, as the command should
829 * either execute fully or nothing at all. */
830 scores
= zmalloc(sizeof(double)*elements
);
831 for (j
= 0; j
< elements
; j
++) {
832 if (getDoubleFromObjectOrReply(c
,c
->argv
[2+j
*2],&scores
[j
],NULL
)
840 /* Lookup the key and create the sorted set if does not exist. */
841 zobj
= lookupKeyWrite(c
->db
,key
);
843 if (server
.zset_max_ziplist_entries
== 0 ||
844 server
.zset_max_ziplist_value
< sdslen(c
->argv
[3]->ptr
))
846 zobj
= createZsetObject();
848 zobj
= createZsetZiplistObject();
850 dbAdd(c
->db
,key
,zobj
);
852 if (zobj
->type
!= REDIS_ZSET
) {
853 addReply(c
,shared
.wrongtypeerr
);
859 for (j
= 0; j
< elements
; j
++) {
862 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
865 /* Prefer non-encoded element when dealing with ziplists. */
866 ele
= c
->argv
[3+j
*2];
867 if ((eptr
= zzlFind(zobj
->ptr
,ele
,&curscore
)) != NULL
) {
871 addReplyError(c
,nanerr
);
872 /* Don't need to check if the sorted set is empty
873 * because we know it has at least one element. */
879 /* Remove and re-insert when score changed. */
880 if (score
!= curscore
) {
881 zobj
->ptr
= zzlDelete(zobj
->ptr
,eptr
);
882 zobj
->ptr
= zzlInsert(zobj
->ptr
,ele
,score
);
884 signalModifiedKey(c
->db
,key
);
888 /* Optimize: check if the element is too large or the list
889 * becomes too long *before* executing zzlInsert. */
890 zobj
->ptr
= zzlInsert(zobj
->ptr
,ele
,score
);
891 if (zzlLength(zobj
->ptr
) > server
.zset_max_ziplist_entries
)
892 zsetConvert(zobj
,REDIS_ENCODING_SKIPLIST
);
893 if (sdslen(ele
->ptr
) > server
.zset_max_ziplist_value
)
894 zsetConvert(zobj
,REDIS_ENCODING_SKIPLIST
);
896 signalModifiedKey(c
->db
,key
);
900 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
901 zset
*zs
= zobj
->ptr
;
902 zskiplistNode
*znode
;
905 ele
= c
->argv
[3+j
*2] = tryObjectEncoding(c
->argv
[3+j
*2]);
906 de
= dictFind(zs
->dict
,ele
);
908 curobj
= dictGetKey(de
);
909 curscore
= *(double*)dictGetVal(de
);
914 addReplyError(c
,nanerr
);
915 /* Don't need to check if the sorted set is empty
916 * because we know it has at least one element. */
922 /* Remove and re-insert when score changed. We can safely
923 * delete the key object from the skiplist, since the
924 * dictionary still has a reference to it. */
925 if (score
!= curscore
) {
926 redisAssertWithInfo(c
,curobj
,zslDelete(zs
->zsl
,curscore
,curobj
));
927 znode
= zslInsert(zs
->zsl
,score
,curobj
);
928 incrRefCount(curobj
); /* Re-inserted in skiplist. */
929 dictGetVal(de
) = &znode
->score
; /* Update score ptr. */
931 signalModifiedKey(c
->db
,key
);
935 znode
= zslInsert(zs
->zsl
,score
,ele
);
936 incrRefCount(ele
); /* Inserted in skiplist. */
937 redisAssertWithInfo(c
,NULL
,dictAdd(zs
->dict
,ele
,&znode
->score
) == DICT_OK
);
938 incrRefCount(ele
); /* Added to dictionary. */
940 signalModifiedKey(c
->db
,key
);
945 redisPanic("Unknown sorted set encoding");
949 if (incr
) /* ZINCRBY */
950 addReplyDouble(c
,score
);
952 addReplyLongLong(c
,added
);
955 void zaddCommand(redisClient
*c
) {
956 zaddGenericCommand(c
,0);
959 void zincrbyCommand(redisClient
*c
) {
960 zaddGenericCommand(c
,1);
963 void zremCommand(redisClient
*c
) {
964 robj
*key
= c
->argv
[1];
968 if ((zobj
= lookupKeyWriteOrReply(c
,key
,shared
.czero
)) == NULL
||
969 checkType(c
,zobj
,REDIS_ZSET
)) return;
971 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
974 for (j
= 2; j
< c
->argc
; j
++) {
975 if ((eptr
= zzlFind(zobj
->ptr
,c
->argv
[j
],NULL
)) != NULL
) {
977 zobj
->ptr
= zzlDelete(zobj
->ptr
,eptr
);
978 if (zzlLength(zobj
->ptr
) == 0) {
984 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
985 zset
*zs
= zobj
->ptr
;
989 for (j
= 2; j
< c
->argc
; j
++) {
990 de
= dictFind(zs
->dict
,c
->argv
[j
]);
994 /* Delete from the skiplist */
995 score
= *(double*)dictGetVal(de
);
996 redisAssertWithInfo(c
,c
->argv
[j
],zslDelete(zs
->zsl
,score
,c
->argv
[j
]));
998 /* Delete from the hash table */
999 dictDelete(zs
->dict
,c
->argv
[j
]);
1000 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
1001 if (dictSize(zs
->dict
) == 0) {
1002 dbDelete(c
->db
,key
);
1008 redisPanic("Unknown sorted set encoding");
1012 signalModifiedKey(c
->db
,key
);
1013 server
.dirty
+= deleted
;
1015 addReplyLongLong(c
,deleted
);
1018 void zremrangebyscoreCommand(redisClient
*c
) {
1019 robj
*key
= c
->argv
[1];
1022 unsigned long deleted
;
1024 /* Parse the range arguments. */
1025 if (zslParseRange(c
->argv
[2],c
->argv
[3],&range
) != REDIS_OK
) {
1026 addReplyError(c
,"min or max is not a float");
1030 if ((zobj
= lookupKeyWriteOrReply(c
,key
,shared
.czero
)) == NULL
||
1031 checkType(c
,zobj
,REDIS_ZSET
)) return;
1033 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1034 zobj
->ptr
= zzlDeleteRangeByScore(zobj
->ptr
,range
,&deleted
);
1035 if (zzlLength(zobj
->ptr
) == 0) dbDelete(c
->db
,key
);
1036 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
1037 zset
*zs
= zobj
->ptr
;
1038 deleted
= zslDeleteRangeByScore(zs
->zsl
,range
,zs
->dict
);
1039 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
1040 if (dictSize(zs
->dict
) == 0) dbDelete(c
->db
,key
);
1042 redisPanic("Unknown sorted set encoding");
1045 if (deleted
) signalModifiedKey(c
->db
,key
);
1046 server
.dirty
+= deleted
;
1047 addReplyLongLong(c
,deleted
);
1050 void zremrangebyrankCommand(redisClient
*c
) {
1051 robj
*key
= c
->argv
[1];
1056 unsigned long deleted
;
1058 if ((getLongFromObjectOrReply(c
, c
->argv
[2], &start
, NULL
) != REDIS_OK
) ||
1059 (getLongFromObjectOrReply(c
, c
->argv
[3], &end
, NULL
) != REDIS_OK
)) return;
1061 if ((zobj
= lookupKeyWriteOrReply(c
,key
,shared
.czero
)) == NULL
||
1062 checkType(c
,zobj
,REDIS_ZSET
)) return;
1064 /* Sanitize indexes. */
1065 llen
= zsetLength(zobj
);
1066 if (start
< 0) start
= llen
+start
;
1067 if (end
< 0) end
= llen
+end
;
1068 if (start
< 0) start
= 0;
1070 /* Invariant: start >= 0, so this test will be true when end < 0.
1071 * The range is empty when start > end or start >= length. */
1072 if (start
> end
|| start
>= llen
) {
1073 addReply(c
,shared
.czero
);
1076 if (end
>= llen
) end
= llen
-1;
1078 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1079 /* Correct for 1-based rank. */
1080 zobj
->ptr
= zzlDeleteRangeByRank(zobj
->ptr
,start
+1,end
+1,&deleted
);
1081 if (zzlLength(zobj
->ptr
) == 0) dbDelete(c
->db
,key
);
1082 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
1083 zset
*zs
= zobj
->ptr
;
1085 /* Correct for 1-based rank. */
1086 deleted
= zslDeleteRangeByRank(zs
->zsl
,start
+1,end
+1,zs
->dict
);
1087 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
1088 if (dictSize(zs
->dict
) == 0) dbDelete(c
->db
,key
);
1090 redisPanic("Unknown sorted set encoding");
1093 if (deleted
) signalModifiedKey(c
->db
,key
);
1094 server
.dirty
+= deleted
;
1095 addReplyLongLong(c
,deleted
);
1100 int type
; /* Set, sorted set */
1105 /* Set iterators. */
1118 /* Sorted set iterators. */
1122 unsigned char *eptr
, *sptr
;
1126 zskiplistNode
*node
;
1133 /* Use dirty flags for pointers that need to be cleaned up in the next
1134 * iteration over the zsetopval. The dirty flag for the long long value is
1135 * special, since long long values don't need cleanup. Instead, it means that
1136 * we already checked that "ell" holds a long long, or tried to convert another
1137 * representation into a long long value. When this was successful,
1138 * OPVAL_VALID_LL is set as well. */
1139 #define OPVAL_DIRTY_ROBJ 1
1140 #define OPVAL_DIRTY_LL 2
1141 #define OPVAL_VALID_LL 4
1143 /* Store value retrieved from the iterator. */
1146 unsigned char _buf
[32]; /* Private buffer. */
1148 unsigned char *estr
;
1154 typedef union _iterset iterset
;
1155 typedef union _iterzset iterzset
;
1157 void zuiInitIterator(zsetopsrc
*op
) {
1158 if (op
->subject
== NULL
)
1161 if (op
->type
== REDIS_SET
) {
1162 iterset
*it
= &op
->iter
.set
;
1163 if (op
->encoding
== REDIS_ENCODING_INTSET
) {
1164 it
->is
.is
= op
->subject
->ptr
;
1166 } else if (op
->encoding
== REDIS_ENCODING_HT
) {
1167 it
->ht
.dict
= op
->subject
->ptr
;
1168 it
->ht
.di
= dictGetIterator(op
->subject
->ptr
);
1169 it
->ht
.de
= dictNext(it
->ht
.di
);
1171 redisPanic("Unknown set encoding");
1173 } else if (op
->type
== REDIS_ZSET
) {
1174 iterzset
*it
= &op
->iter
.zset
;
1175 if (op
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1176 it
->zl
.zl
= op
->subject
->ptr
;
1177 it
->zl
.eptr
= ziplistIndex(it
->zl
.zl
,0);
1178 if (it
->zl
.eptr
!= NULL
) {
1179 it
->zl
.sptr
= ziplistNext(it
->zl
.zl
,it
->zl
.eptr
);
1180 redisAssert(it
->zl
.sptr
!= NULL
);
1182 } else if (op
->encoding
== REDIS_ENCODING_SKIPLIST
) {
1183 it
->sl
.zs
= op
->subject
->ptr
;
1184 it
->sl
.node
= it
->sl
.zs
->zsl
->header
->level
[0].forward
;
1186 redisPanic("Unknown sorted set encoding");
1189 redisPanic("Unsupported type");
1193 void zuiClearIterator(zsetopsrc
*op
) {
1194 if (op
->subject
== NULL
)
1197 if (op
->type
== REDIS_SET
) {
1198 iterset
*it
= &op
->iter
.set
;
1199 if (op
->encoding
== REDIS_ENCODING_INTSET
) {
1200 REDIS_NOTUSED(it
); /* skip */
1201 } else if (op
->encoding
== REDIS_ENCODING_HT
) {
1202 dictReleaseIterator(it
->ht
.di
);
1204 redisPanic("Unknown set encoding");
1206 } else if (op
->type
== REDIS_ZSET
) {
1207 iterzset
*it
= &op
->iter
.zset
;
1208 if (op
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1209 REDIS_NOTUSED(it
); /* skip */
1210 } else if (op
->encoding
== REDIS_ENCODING_SKIPLIST
) {
1211 REDIS_NOTUSED(it
); /* skip */
1213 redisPanic("Unknown sorted set encoding");
1216 redisPanic("Unsupported type");
1220 int zuiLength(zsetopsrc
*op
) {
1221 if (op
->subject
== NULL
)
1224 if (op
->type
== REDIS_SET
) {
1225 iterset
*it
= &op
->iter
.set
;
1226 if (op
->encoding
== REDIS_ENCODING_INTSET
) {
1227 return intsetLen(it
->is
.is
);
1228 } else if (op
->encoding
== REDIS_ENCODING_HT
) {
1229 return dictSize(it
->ht
.dict
);
1231 redisPanic("Unknown set encoding");
1233 } else if (op
->type
== REDIS_ZSET
) {
1234 iterzset
*it
= &op
->iter
.zset
;
1235 if (op
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1236 return zzlLength(it
->zl
.zl
);
1237 } else if (op
->encoding
== REDIS_ENCODING_SKIPLIST
) {
1238 return it
->sl
.zs
->zsl
->length
;
1240 redisPanic("Unknown sorted set encoding");
1243 redisPanic("Unsupported type");
1247 /* Check if the current value is valid. If so, store it in the passed structure
1248 * and move to the next element. If not valid, this means we have reached the
1249 * end of the structure and can abort. */
1250 int zuiNext(zsetopsrc
*op
, zsetopval
*val
) {
1251 if (op
->subject
== NULL
)
1254 if (val
->flags
& OPVAL_DIRTY_ROBJ
)
1255 decrRefCount(val
->ele
);
1257 bzero(val
,sizeof(zsetopval
));
1259 if (op
->type
== REDIS_SET
) {
1260 iterset
*it
= &op
->iter
.set
;
1261 if (op
->encoding
== REDIS_ENCODING_INTSET
) {
1262 if (!intsetGet(it
->is
.is
,it
->is
.ii
,(int64_t*)&val
->ell
))
1266 /* Move to next element. */
1268 } else if (op
->encoding
== REDIS_ENCODING_HT
) {
1269 if (it
->ht
.de
== NULL
)
1271 val
->ele
= dictGetKey(it
->ht
.de
);
1274 /* Move to next element. */
1275 it
->ht
.de
= dictNext(it
->ht
.di
);
1277 redisPanic("Unknown set encoding");
1279 } else if (op
->type
== REDIS_ZSET
) {
1280 iterzset
*it
= &op
->iter
.zset
;
1281 if (op
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1282 /* No need to check both, but better be explicit. */
1283 if (it
->zl
.eptr
== NULL
|| it
->zl
.sptr
== NULL
)
1285 redisAssert(ziplistGet(it
->zl
.eptr
,&val
->estr
,&val
->elen
,&val
->ell
));
1286 val
->score
= zzlGetScore(it
->zl
.sptr
);
1288 /* Move to next element. */
1289 zzlNext(it
->zl
.zl
,&it
->zl
.eptr
,&it
->zl
.sptr
);
1290 } else if (op
->encoding
== REDIS_ENCODING_SKIPLIST
) {
1291 if (it
->sl
.node
== NULL
)
1293 val
->ele
= it
->sl
.node
->obj
;
1294 val
->score
= it
->sl
.node
->score
;
1296 /* Move to next element. */
1297 it
->sl
.node
= it
->sl
.node
->level
[0].forward
;
1299 redisPanic("Unknown sorted set encoding");
1302 redisPanic("Unsupported type");
1307 int zuiLongLongFromValue(zsetopval
*val
) {
1308 if (!(val
->flags
& OPVAL_DIRTY_LL
)) {
1309 val
->flags
|= OPVAL_DIRTY_LL
;
1311 if (val
->ele
!= NULL
) {
1312 if (val
->ele
->encoding
== REDIS_ENCODING_INT
) {
1313 val
->ell
= (long)val
->ele
->ptr
;
1314 val
->flags
|= OPVAL_VALID_LL
;
1315 } else if (val
->ele
->encoding
== REDIS_ENCODING_RAW
) {
1316 if (string2ll(val
->ele
->ptr
,sdslen(val
->ele
->ptr
),&val
->ell
))
1317 val
->flags
|= OPVAL_VALID_LL
;
1319 redisPanic("Unsupported element encoding");
1321 } else if (val
->estr
!= NULL
) {
1322 if (string2ll((char*)val
->estr
,val
->elen
,&val
->ell
))
1323 val
->flags
|= OPVAL_VALID_LL
;
1325 /* The long long was already set, flag as valid. */
1326 val
->flags
|= OPVAL_VALID_LL
;
1329 return val
->flags
& OPVAL_VALID_LL
;
1332 robj
*zuiObjectFromValue(zsetopval
*val
) {
1333 if (val
->ele
== NULL
) {
1334 if (val
->estr
!= NULL
) {
1335 val
->ele
= createStringObject((char*)val
->estr
,val
->elen
);
1337 val
->ele
= createStringObjectFromLongLong(val
->ell
);
1339 val
->flags
|= OPVAL_DIRTY_ROBJ
;
1344 int zuiBufferFromValue(zsetopval
*val
) {
1345 if (val
->estr
== NULL
) {
1346 if (val
->ele
!= NULL
) {
1347 if (val
->ele
->encoding
== REDIS_ENCODING_INT
) {
1348 val
->elen
= ll2string((char*)val
->_buf
,sizeof(val
->_buf
),(long)val
->ele
->ptr
);
1349 val
->estr
= val
->_buf
;
1350 } else if (val
->ele
->encoding
== REDIS_ENCODING_RAW
) {
1351 val
->elen
= sdslen(val
->ele
->ptr
);
1352 val
->estr
= val
->ele
->ptr
;
1354 redisPanic("Unsupported element encoding");
1357 val
->elen
= ll2string((char*)val
->_buf
,sizeof(val
->_buf
),val
->ell
);
1358 val
->estr
= val
->_buf
;
1364 /* Find value pointed to by val in the source pointer to by op. When found,
1365 * return 1 and store its score in target. Return 0 otherwise. */
1366 int zuiFind(zsetopsrc
*op
, zsetopval
*val
, double *score
) {
1367 if (op
->subject
== NULL
)
1370 if (op
->type
== REDIS_SET
) {
1371 iterset
*it
= &op
->iter
.set
;
1373 if (op
->encoding
== REDIS_ENCODING_INTSET
) {
1374 if (zuiLongLongFromValue(val
) && intsetFind(it
->is
.is
,val
->ell
)) {
1380 } else if (op
->encoding
== REDIS_ENCODING_HT
) {
1381 zuiObjectFromValue(val
);
1382 if (dictFind(it
->ht
.dict
,val
->ele
) != NULL
) {
1389 redisPanic("Unknown set encoding");
1391 } else if (op
->type
== REDIS_ZSET
) {
1392 iterzset
*it
= &op
->iter
.zset
;
1393 zuiObjectFromValue(val
);
1395 if (op
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1396 if (zzlFind(it
->zl
.zl
,val
->ele
,score
) != NULL
) {
1397 /* Score is already set by zzlFind. */
1402 } else if (op
->encoding
== REDIS_ENCODING_SKIPLIST
) {
1404 if ((de
= dictFind(it
->sl
.zs
->dict
,val
->ele
)) != NULL
) {
1405 *score
= *(double*)dictGetVal(de
);
1411 redisPanic("Unknown sorted set encoding");
1414 redisPanic("Unsupported type");
1418 int zuiCompareByCardinality(const void *s1
, const void *s2
) {
1419 return zuiLength((zsetopsrc
*)s1
) - zuiLength((zsetopsrc
*)s2
);
1422 #define REDIS_AGGR_SUM 1
1423 #define REDIS_AGGR_MIN 2
1424 #define REDIS_AGGR_MAX 3
1425 #define zunionInterDictValue(_e) (dictGetVal(_e) == NULL ? 1.0 : *(double*)dictGetVal(_e))
1427 inline static void zunionInterAggregate(double *target
, double val
, int aggregate
) {
1428 if (aggregate
== REDIS_AGGR_SUM
) {
1429 *target
= *target
+ val
;
1430 /* The result of adding two doubles is NaN when one variable
1431 * is +inf and the other is -inf. When these numbers are added,
1432 * we maintain the convention of the result being 0.0. */
1433 if (isnan(*target
)) *target
= 0.0;
1434 } else if (aggregate
== REDIS_AGGR_MIN
) {
1435 *target
= val
< *target
? val
: *target
;
1436 } else if (aggregate
== REDIS_AGGR_MAX
) {
1437 *target
= val
> *target
? val
: *target
;
1440 redisPanic("Unknown ZUNION/INTER aggregate type");
1444 void zunionInterGenericCommand(redisClient
*c
, robj
*dstkey
, int op
) {
1447 int aggregate
= REDIS_AGGR_SUM
;
1451 unsigned int maxelelen
= 0;
1454 zskiplistNode
*znode
;
1457 /* expect setnum input keys to be given */
1458 if ((getLongFromObjectOrReply(c
, c
->argv
[2], &setnum
, NULL
) != REDIS_OK
))
1463 "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE");
1467 /* test if the expected number of keys would overflow */
1468 if (3+setnum
> c
->argc
) {
1469 addReply(c
,shared
.syntaxerr
);
1473 /* read keys to be used for input */
1474 src
= zcalloc(sizeof(zsetopsrc
) * setnum
);
1475 for (i
= 0, j
= 3; i
< setnum
; i
++, j
++) {
1476 robj
*obj
= lookupKeyWrite(c
->db
,c
->argv
[j
]);
1478 if (obj
->type
!= REDIS_ZSET
&& obj
->type
!= REDIS_SET
) {
1480 addReply(c
,shared
.wrongtypeerr
);
1484 src
[i
].subject
= obj
;
1485 src
[i
].type
= obj
->type
;
1486 src
[i
].encoding
= obj
->encoding
;
1488 src
[i
].subject
= NULL
;
1491 /* Default all weights to 1. */
1492 src
[i
].weight
= 1.0;
1495 /* parse optional extra arguments */
1497 int remaining
= c
->argc
- j
;
1500 if (remaining
>= (setnum
+ 1) && !strcasecmp(c
->argv
[j
]->ptr
,"weights")) {
1502 for (i
= 0; i
< setnum
; i
++, j
++, remaining
--) {
1503 if (getDoubleFromObjectOrReply(c
,c
->argv
[j
],&src
[i
].weight
,
1504 "weight value is not a float") != REDIS_OK
)
1510 } else if (remaining
>= 2 && !strcasecmp(c
->argv
[j
]->ptr
,"aggregate")) {
1512 if (!strcasecmp(c
->argv
[j
]->ptr
,"sum")) {
1513 aggregate
= REDIS_AGGR_SUM
;
1514 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"min")) {
1515 aggregate
= REDIS_AGGR_MIN
;
1516 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"max")) {
1517 aggregate
= REDIS_AGGR_MAX
;
1520 addReply(c
,shared
.syntaxerr
);
1526 addReply(c
,shared
.syntaxerr
);
1532 for (i
= 0; i
< setnum
; i
++)
1533 zuiInitIterator(&src
[i
]);
1535 /* sort sets from the smallest to largest, this will improve our
1536 * algorithm's performance */
1537 qsort(src
,setnum
,sizeof(zsetopsrc
),zuiCompareByCardinality
);
1539 dstobj
= createZsetObject();
1540 dstzset
= dstobj
->ptr
;
1541 memset(&zval
, 0, sizeof(zval
));
1543 if (op
== REDIS_OP_INTER
) {
1544 /* Skip everything if the smallest input is empty. */
1545 if (zuiLength(&src
[0]) > 0) {
1546 /* Precondition: as src[0] is non-empty and the inputs are ordered
1547 * by size, all src[i > 0] are non-empty too. */
1548 while (zuiNext(&src
[0],&zval
)) {
1549 double score
, value
;
1551 score
= src
[0].weight
* zval
.score
;
1552 if (isnan(score
)) score
= 0;
1554 for (j
= 1; j
< setnum
; j
++) {
1555 /* It is not safe to access the zset we are
1556 * iterating, so explicitly check for equal object. */
1557 if (src
[j
].subject
== src
[0].subject
) {
1558 value
= zval
.score
*src
[j
].weight
;
1559 zunionInterAggregate(&score
,value
,aggregate
);
1560 } else if (zuiFind(&src
[j
],&zval
,&value
)) {
1561 value
*= src
[j
].weight
;
1562 zunionInterAggregate(&score
,value
,aggregate
);
1568 /* Only continue when present in every input. */
1570 tmp
= zuiObjectFromValue(&zval
);
1571 znode
= zslInsert(dstzset
->zsl
,score
,tmp
);
1572 incrRefCount(tmp
); /* added to skiplist */
1573 dictAdd(dstzset
->dict
,tmp
,&znode
->score
);
1574 incrRefCount(tmp
); /* added to dictionary */
1576 if (tmp
->encoding
== REDIS_ENCODING_RAW
)
1577 if (sdslen(tmp
->ptr
) > maxelelen
)
1578 maxelelen
= sdslen(tmp
->ptr
);
1582 } else if (op
== REDIS_OP_UNION
) {
1583 for (i
= 0; i
< setnum
; i
++) {
1584 if (zuiLength(&src
[i
]) == 0)
1587 while (zuiNext(&src
[i
],&zval
)) {
1588 double score
, value
;
1590 /* Skip key when already processed */
1591 if (dictFind(dstzset
->dict
,zuiObjectFromValue(&zval
)) != NULL
)
1594 /* Initialize score */
1595 score
= src
[i
].weight
* zval
.score
;
1596 if (isnan(score
)) score
= 0;
1598 /* Because the inputs are sorted by size, it's only possible
1599 * for sets at larger indices to hold this element. */
1600 for (j
= (i
+1); j
< setnum
; j
++) {
1601 /* It is not safe to access the zset we are
1602 * iterating, so explicitly check for equal object. */
1603 if(src
[j
].subject
== src
[i
].subject
) {
1604 value
= zval
.score
*src
[j
].weight
;
1605 zunionInterAggregate(&score
,value
,aggregate
);
1606 } else if (zuiFind(&src
[j
],&zval
,&value
)) {
1607 value
*= src
[j
].weight
;
1608 zunionInterAggregate(&score
,value
,aggregate
);
1612 tmp
= zuiObjectFromValue(&zval
);
1613 znode
= zslInsert(dstzset
->zsl
,score
,tmp
);
1614 incrRefCount(zval
.ele
); /* added to skiplist */
1615 dictAdd(dstzset
->dict
,tmp
,&znode
->score
);
1616 incrRefCount(zval
.ele
); /* added to dictionary */
1618 if (tmp
->encoding
== REDIS_ENCODING_RAW
)
1619 if (sdslen(tmp
->ptr
) > maxelelen
)
1620 maxelelen
= sdslen(tmp
->ptr
);
1624 redisPanic("Unknown operator");
1627 for (i
= 0; i
< setnum
; i
++)
1628 zuiClearIterator(&src
[i
]);
1630 if (dbDelete(c
->db
,dstkey
)) {
1631 signalModifiedKey(c
->db
,dstkey
);
1635 if (dstzset
->zsl
->length
) {
1636 /* Convert to ziplist when in limits. */
1637 if (dstzset
->zsl
->length
<= server
.zset_max_ziplist_entries
&&
1638 maxelelen
<= server
.zset_max_ziplist_value
)
1639 zsetConvert(dstobj
,REDIS_ENCODING_ZIPLIST
);
1641 dbAdd(c
->db
,dstkey
,dstobj
);
1642 addReplyLongLong(c
,zsetLength(dstobj
));
1643 if (!touched
) signalModifiedKey(c
->db
,dstkey
);
1646 decrRefCount(dstobj
);
1647 addReply(c
,shared
.czero
);
1652 void zunionstoreCommand(redisClient
*c
) {
1653 zunionInterGenericCommand(c
,c
->argv
[1], REDIS_OP_UNION
);
1656 void zinterstoreCommand(redisClient
*c
) {
1657 zunionInterGenericCommand(c
,c
->argv
[1], REDIS_OP_INTER
);
1660 void zrangeGenericCommand(redisClient
*c
, int reverse
) {
1661 robj
*key
= c
->argv
[1];
1669 if ((getLongFromObjectOrReply(c
, c
->argv
[2], &start
, NULL
) != REDIS_OK
) ||
1670 (getLongFromObjectOrReply(c
, c
->argv
[3], &end
, NULL
) != REDIS_OK
)) return;
1672 if (c
->argc
== 5 && !strcasecmp(c
->argv
[4]->ptr
,"withscores")) {
1674 } else if (c
->argc
>= 5) {
1675 addReply(c
,shared
.syntaxerr
);
1679 if ((zobj
= lookupKeyReadOrReply(c
,key
,shared
.emptymultibulk
)) == NULL
1680 || checkType(c
,zobj
,REDIS_ZSET
)) return;
1682 /* Sanitize indexes. */
1683 llen
= zsetLength(zobj
);
1684 if (start
< 0) start
= llen
+start
;
1685 if (end
< 0) end
= llen
+end
;
1686 if (start
< 0) start
= 0;
1688 /* Invariant: start >= 0, so this test will be true when end < 0.
1689 * The range is empty when start > end or start >= length. */
1690 if (start
> end
|| start
>= llen
) {
1691 addReply(c
,shared
.emptymultibulk
);
1694 if (end
>= llen
) end
= llen
-1;
1695 rangelen
= (end
-start
)+1;
1697 /* Return the result in form of a multi-bulk reply */
1698 addReplyMultiBulkLen(c
, withscores
? (rangelen
*2) : rangelen
);
1700 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1701 unsigned char *zl
= zobj
->ptr
;
1702 unsigned char *eptr
, *sptr
;
1703 unsigned char *vstr
;
1708 eptr
= ziplistIndex(zl
,-2-(2*start
));
1710 eptr
= ziplistIndex(zl
,2*start
);
1712 redisAssertWithInfo(c
,zobj
,eptr
!= NULL
);
1713 sptr
= ziplistNext(zl
,eptr
);
1715 while (rangelen
--) {
1716 redisAssertWithInfo(c
,zobj
,eptr
!= NULL
&& sptr
!= NULL
);
1717 redisAssertWithInfo(c
,zobj
,ziplistGet(eptr
,&vstr
,&vlen
,&vlong
));
1719 addReplyBulkLongLong(c
,vlong
);
1721 addReplyBulkCBuffer(c
,vstr
,vlen
);
1724 addReplyDouble(c
,zzlGetScore(sptr
));
1727 zzlPrev(zl
,&eptr
,&sptr
);
1729 zzlNext(zl
,&eptr
,&sptr
);
1732 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
1733 zset
*zs
= zobj
->ptr
;
1734 zskiplist
*zsl
= zs
->zsl
;
1738 /* Check if starting point is trivial, before doing log(N) lookup. */
1742 ln
= zslGetElementByRank(zsl
,llen
-start
);
1744 ln
= zsl
->header
->level
[0].forward
;
1746 ln
= zslGetElementByRank(zsl
,start
+1);
1750 redisAssertWithInfo(c
,zobj
,ln
!= NULL
);
1752 addReplyBulk(c
,ele
);
1754 addReplyDouble(c
,ln
->score
);
1755 ln
= reverse
? ln
->backward
: ln
->level
[0].forward
;
1758 redisPanic("Unknown sorted set encoding");
1762 void zrangeCommand(redisClient
*c
) {
1763 zrangeGenericCommand(c
,0);
1766 void zrevrangeCommand(redisClient
*c
) {
1767 zrangeGenericCommand(c
,1);
1770 /* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE. */
1771 void genericZrangebyscoreCommand(redisClient
*c
, int reverse
) {
1773 robj
*key
= c
->argv
[1];
1775 long offset
= 0, limit
= -1;
1777 unsigned long rangelen
= 0;
1778 void *replylen
= NULL
;
1781 /* Parse the range arguments. */
1783 /* Range is given as [max,min] */
1784 maxidx
= 2; minidx
= 3;
1786 /* Range is given as [min,max] */
1787 minidx
= 2; maxidx
= 3;
1790 if (zslParseRange(c
->argv
[minidx
],c
->argv
[maxidx
],&range
) != REDIS_OK
) {
1791 addReplyError(c
,"min or max is not a float");
1795 /* Parse optional extra arguments. Note that ZCOUNT will exactly have
1796 * 4 arguments, so we'll never enter the following code path. */
1798 int remaining
= c
->argc
- 4;
1802 if (remaining
>= 1 && !strcasecmp(c
->argv
[pos
]->ptr
,"withscores")) {
1805 } else if (remaining
>= 3 && !strcasecmp(c
->argv
[pos
]->ptr
,"limit")) {
1806 if ((getLongFromObjectOrReply(c
, c
->argv
[pos
+1], &offset
, NULL
) != REDIS_OK
) ||
1807 (getLongFromObjectOrReply(c
, c
->argv
[pos
+2], &limit
, NULL
) != REDIS_OK
)) return;
1808 pos
+= 3; remaining
-= 3;
1810 addReply(c
,shared
.syntaxerr
);
1816 /* Ok, lookup the key and get the range */
1817 if ((zobj
= lookupKeyReadOrReply(c
,key
,shared
.emptymultibulk
)) == NULL
||
1818 checkType(c
,zobj
,REDIS_ZSET
)) return;
1820 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1821 unsigned char *zl
= zobj
->ptr
;
1822 unsigned char *eptr
, *sptr
;
1823 unsigned char *vstr
;
1828 /* If reversed, get the last node in range as starting point. */
1830 eptr
= zzlLastInRange(zl
,range
);
1832 eptr
= zzlFirstInRange(zl
,range
);
1835 /* No "first" element in the specified interval. */
1837 addReply(c
, shared
.emptymultibulk
);
1841 /* Get score pointer for the first element. */
1842 redisAssertWithInfo(c
,zobj
,eptr
!= NULL
);
1843 sptr
= ziplistNext(zl
,eptr
);
1845 /* We don't know in advance how many matching elements there are in the
1846 * list, so we push this object that will represent the multi-bulk
1847 * length in the output buffer, and will "fix" it later */
1848 replylen
= addDeferredMultiBulkLength(c
);
1850 /* If there is an offset, just traverse the number of elements without
1851 * checking the score because that is done in the next loop. */
1852 while (eptr
&& offset
--) {
1854 zzlPrev(zl
,&eptr
,&sptr
);
1856 zzlNext(zl
,&eptr
,&sptr
);
1860 while (eptr
&& limit
--) {
1861 score
= zzlGetScore(sptr
);
1863 /* Abort when the node is no longer in range. */
1865 if (!zslValueGteMin(score
,&range
)) break;
1867 if (!zslValueLteMax(score
,&range
)) break;
1870 /* We know the element exists, so ziplistGet should always succeed */
1871 redisAssertWithInfo(c
,zobj
,ziplistGet(eptr
,&vstr
,&vlen
,&vlong
));
1875 addReplyBulkLongLong(c
,vlong
);
1877 addReplyBulkCBuffer(c
,vstr
,vlen
);
1881 addReplyDouble(c
,score
);
1884 /* Move to next node */
1886 zzlPrev(zl
,&eptr
,&sptr
);
1888 zzlNext(zl
,&eptr
,&sptr
);
1891 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
1892 zset
*zs
= zobj
->ptr
;
1893 zskiplist
*zsl
= zs
->zsl
;
1896 /* If reversed, get the last node in range as starting point. */
1898 ln
= zslLastInRange(zsl
,range
);
1900 ln
= zslFirstInRange(zsl
,range
);
1903 /* No "first" element in the specified interval. */
1905 addReply(c
, shared
.emptymultibulk
);
1909 /* We don't know in advance how many matching elements there are in the
1910 * list, so we push this object that will represent the multi-bulk
1911 * length in the output buffer, and will "fix" it later */
1912 replylen
= addDeferredMultiBulkLength(c
);
1914 /* If there is an offset, just traverse the number of elements without
1915 * checking the score because that is done in the next loop. */
1916 while (ln
&& offset
--) {
1920 ln
= ln
->level
[0].forward
;
1924 while (ln
&& limit
--) {
1925 /* Abort when the node is no longer in range. */
1927 if (!zslValueGteMin(ln
->score
,&range
)) break;
1929 if (!zslValueLteMax(ln
->score
,&range
)) break;
1933 addReplyBulk(c
,ln
->obj
);
1936 addReplyDouble(c
,ln
->score
);
1939 /* Move to next node */
1943 ln
= ln
->level
[0].forward
;
1947 redisPanic("Unknown sorted set encoding");
1954 setDeferredMultiBulkLength(c
, replylen
, rangelen
);
1957 void zrangebyscoreCommand(redisClient
*c
) {
1958 genericZrangebyscoreCommand(c
,0);
1961 void zrevrangebyscoreCommand(redisClient
*c
) {
1962 genericZrangebyscoreCommand(c
,1);
1965 void zcountCommand(redisClient
*c
) {
1966 robj
*key
= c
->argv
[1];
1971 /* Parse the range arguments */
1972 if (zslParseRange(c
->argv
[2],c
->argv
[3],&range
) != REDIS_OK
) {
1973 addReplyError(c
,"min or max is not a float");
1977 /* Lookup the sorted set */
1978 if ((zobj
= lookupKeyReadOrReply(c
, key
, shared
.czero
)) == NULL
||
1979 checkType(c
, zobj
, REDIS_ZSET
)) return;
1981 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1982 unsigned char *zl
= zobj
->ptr
;
1983 unsigned char *eptr
, *sptr
;
1986 /* Use the first element in range as the starting point */
1987 eptr
= zzlFirstInRange(zl
,range
);
1989 /* No "first" element */
1991 addReply(c
, shared
.czero
);
1995 /* First element is in range */
1996 sptr
= ziplistNext(zl
,eptr
);
1997 score
= zzlGetScore(sptr
);
1998 redisAssertWithInfo(c
,zobj
,zslValueLteMax(score
,&range
));
2000 /* Iterate over elements in range */
2002 score
= zzlGetScore(sptr
);
2004 /* Abort when the node is no longer in range. */
2005 if (!zslValueLteMax(score
,&range
)) {
2009 zzlNext(zl
,&eptr
,&sptr
);
2012 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
2013 zset
*zs
= zobj
->ptr
;
2014 zskiplist
*zsl
= zs
->zsl
;
2018 /* Find first element in range */
2019 zn
= zslFirstInRange(zsl
, range
);
2021 /* Use rank of first element, if any, to determine preliminary count */
2023 rank
= zslGetRank(zsl
, zn
->score
, zn
->obj
);
2024 count
= (zsl
->length
- (rank
- 1));
2026 /* Find last element in range */
2027 zn
= zslLastInRange(zsl
, range
);
2029 /* Use rank of last element, if any, to determine the actual count */
2031 rank
= zslGetRank(zsl
, zn
->score
, zn
->obj
);
2032 count
-= (zsl
->length
- rank
);
2036 redisPanic("Unknown sorted set encoding");
2039 addReplyLongLong(c
, count
);
2042 void zcardCommand(redisClient
*c
) {
2043 robj
*key
= c
->argv
[1];
2046 if ((zobj
= lookupKeyReadOrReply(c
,key
,shared
.czero
)) == NULL
||
2047 checkType(c
,zobj
,REDIS_ZSET
)) return;
2049 addReplyLongLong(c
,zsetLength(zobj
));
2052 void zscoreCommand(redisClient
*c
) {
2053 robj
*key
= c
->argv
[1];
2057 if ((zobj
= lookupKeyReadOrReply(c
,key
,shared
.nullbulk
)) == NULL
||
2058 checkType(c
,zobj
,REDIS_ZSET
)) return;
2060 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
2061 if (zzlFind(zobj
->ptr
,c
->argv
[2],&score
) != NULL
)
2062 addReplyDouble(c
,score
);
2064 addReply(c
,shared
.nullbulk
);
2065 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
2066 zset
*zs
= zobj
->ptr
;
2069 c
->argv
[2] = tryObjectEncoding(c
->argv
[2]);
2070 de
= dictFind(zs
->dict
,c
->argv
[2]);
2072 score
= *(double*)dictGetVal(de
);
2073 addReplyDouble(c
,score
);
2075 addReply(c
,shared
.nullbulk
);
2078 redisPanic("Unknown sorted set encoding");
2082 void zrankGenericCommand(redisClient
*c
, int reverse
) {
2083 robj
*key
= c
->argv
[1];
2084 robj
*ele
= c
->argv
[2];
2089 if ((zobj
= lookupKeyReadOrReply(c
,key
,shared
.nullbulk
)) == NULL
||
2090 checkType(c
,zobj
,REDIS_ZSET
)) return;
2091 llen
= zsetLength(zobj
);
2093 redisAssertWithInfo(c
,ele
,ele
->encoding
== REDIS_ENCODING_RAW
);
2094 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
2095 unsigned char *zl
= zobj
->ptr
;
2096 unsigned char *eptr
, *sptr
;
2098 eptr
= ziplistIndex(zl
,0);
2099 redisAssertWithInfo(c
,zobj
,eptr
!= NULL
);
2100 sptr
= ziplistNext(zl
,eptr
);
2101 redisAssertWithInfo(c
,zobj
,sptr
!= NULL
);
2104 while(eptr
!= NULL
) {
2105 if (ziplistCompare(eptr
,ele
->ptr
,sdslen(ele
->ptr
)))
2108 zzlNext(zl
,&eptr
,&sptr
);
2113 addReplyLongLong(c
,llen
-rank
);
2115 addReplyLongLong(c
,rank
-1);
2117 addReply(c
,shared
.nullbulk
);
2119 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
2120 zset
*zs
= zobj
->ptr
;
2121 zskiplist
*zsl
= zs
->zsl
;
2125 ele
= c
->argv
[2] = tryObjectEncoding(c
->argv
[2]);
2126 de
= dictFind(zs
->dict
,ele
);
2128 score
= *(double*)dictGetVal(de
);
2129 rank
= zslGetRank(zsl
,score
,ele
);
2130 redisAssertWithInfo(c
,ele
,rank
); /* Existing elements always have a rank. */
2132 addReplyLongLong(c
,llen
-rank
);
2134 addReplyLongLong(c
,rank
-1);
2136 addReply(c
,shared
.nullbulk
);
2139 redisPanic("Unknown sorted set encoding");
2143 void zrankCommand(redisClient
*c
) {
2144 zrankGenericCommand(c
, 0);
2147 void zrevrankCommand(redisClient
*c
) {
2148 zrankGenericCommand(c
, 1);