]>
git.saurik.com Git - redis.git/blob - src/t_zset.c
5 /*-----------------------------------------------------------------------------
7 *----------------------------------------------------------------------------*/
9 /* ZSETs are ordered sets using two data structures to hold the same elements
10 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
13 * The elements are added to a hash table mapping Redis objects to scores.
14 * At the same time the elements are added to a skip list mapping scores
15 * to Redis objects (so objects are sorted by scores in this "view"). */
17 /* This skiplist implementation is almost a C translation of the original
18 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
19 * Alternative to Balanced Trees", modified in three ways:
20 * a) this implementation allows for repeated scores.
21 * b) the comparison is not just by key (our 'score') but by satellite data.
22 * c) there is a back pointer, so it's a doubly linked list with the back
23 * pointers being only at "level 1". This allows to traverse the list
24 * from tail to head, useful for ZREVRANGE. */
26 zskiplistNode
*zslCreateNode(int level
, double score
, robj
*obj
) {
27 zskiplistNode
*zn
= zmalloc(sizeof(*zn
)+level
*sizeof(struct zskiplistLevel
));
33 zskiplist
*zslCreate(void) {
37 zsl
= zmalloc(sizeof(*zsl
));
40 zsl
->header
= zslCreateNode(ZSKIPLIST_MAXLEVEL
,0,NULL
);
41 for (j
= 0; j
< ZSKIPLIST_MAXLEVEL
; j
++) {
42 zsl
->header
->level
[j
].forward
= NULL
;
43 zsl
->header
->level
[j
].span
= 0;
45 zsl
->header
->backward
= NULL
;
50 void zslFreeNode(zskiplistNode
*node
) {
51 decrRefCount(node
->obj
);
55 void zslFree(zskiplist
*zsl
) {
56 zskiplistNode
*node
= zsl
->header
->level
[0].forward
, *next
;
60 next
= node
->level
[0].forward
;
67 /* Returns a random level for the new skiplist node we are going to create.
68 * The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL
69 * (both inclusive), with a powerlaw-alike distribution where higher
70 * levels are less likely to be returned. */
71 int zslRandomLevel(void) {
73 while ((random()&0xFFFF) < (ZSKIPLIST_P
* 0xFFFF))
75 return (level
<ZSKIPLIST_MAXLEVEL
) ? level
: ZSKIPLIST_MAXLEVEL
;
78 zskiplistNode
*zslInsert(zskiplist
*zsl
, double score
, robj
*obj
) {
79 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
80 unsigned int rank
[ZSKIPLIST_MAXLEVEL
];
83 redisAssert(!isnan(score
));
85 for (i
= zsl
->level
-1; i
>= 0; i
--) {
86 /* store rank that is crossed to reach the insert position */
87 rank
[i
] = i
== (zsl
->level
-1) ? 0 : rank
[i
+1];
88 while (x
->level
[i
].forward
&&
89 (x
->level
[i
].forward
->score
< score
||
90 (x
->level
[i
].forward
->score
== score
&&
91 compareStringObjects(x
->level
[i
].forward
->obj
,obj
) < 0))) {
92 rank
[i
] += x
->level
[i
].span
;
93 x
= x
->level
[i
].forward
;
97 /* we assume the key is not already inside, since we allow duplicated
98 * scores, and the re-insertion of score and redis object should never
99 * happen since the caller of zslInsert() should test in the hash table
100 * if the element is already inside or not. */
101 level
= zslRandomLevel();
102 if (level
> zsl
->level
) {
103 for (i
= zsl
->level
; i
< level
; i
++) {
105 update
[i
] = zsl
->header
;
106 update
[i
]->level
[i
].span
= zsl
->length
;
110 x
= zslCreateNode(level
,score
,obj
);
111 for (i
= 0; i
< level
; i
++) {
112 x
->level
[i
].forward
= update
[i
]->level
[i
].forward
;
113 update
[i
]->level
[i
].forward
= x
;
115 /* update span covered by update[i] as x is inserted here */
116 x
->level
[i
].span
= update
[i
]->level
[i
].span
- (rank
[0] - rank
[i
]);
117 update
[i
]->level
[i
].span
= (rank
[0] - rank
[i
]) + 1;
120 /* increment span for untouched levels */
121 for (i
= level
; i
< zsl
->level
; i
++) {
122 update
[i
]->level
[i
].span
++;
125 x
->backward
= (update
[0] == zsl
->header
) ? NULL
: update
[0];
126 if (x
->level
[0].forward
)
127 x
->level
[0].forward
->backward
= x
;
134 /* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
135 void zslDeleteNode(zskiplist
*zsl
, zskiplistNode
*x
, zskiplistNode
**update
) {
137 for (i
= 0; i
< zsl
->level
; i
++) {
138 if (update
[i
]->level
[i
].forward
== x
) {
139 update
[i
]->level
[i
].span
+= x
->level
[i
].span
- 1;
140 update
[i
]->level
[i
].forward
= x
->level
[i
].forward
;
142 update
[i
]->level
[i
].span
-= 1;
145 if (x
->level
[0].forward
) {
146 x
->level
[0].forward
->backward
= x
->backward
;
148 zsl
->tail
= x
->backward
;
150 while(zsl
->level
> 1 && zsl
->header
->level
[zsl
->level
-1].forward
== NULL
)
155 /* Delete an element with matching score/object from the skiplist. */
156 int zslDelete(zskiplist
*zsl
, double score
, robj
*obj
) {
157 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
161 for (i
= zsl
->level
-1; i
>= 0; i
--) {
162 while (x
->level
[i
].forward
&&
163 (x
->level
[i
].forward
->score
< score
||
164 (x
->level
[i
].forward
->score
== score
&&
165 compareStringObjects(x
->level
[i
].forward
->obj
,obj
) < 0)))
166 x
= x
->level
[i
].forward
;
169 /* We may have multiple elements with the same score, what we need
170 * is to find the element with both the right score and object. */
171 x
= x
->level
[0].forward
;
172 if (x
&& score
== x
->score
&& equalStringObjects(x
->obj
,obj
)) {
173 zslDeleteNode(zsl
, x
, update
);
177 return 0; /* not found */
179 return 0; /* not found */
182 static int zslValueGteMin(double value
, zrangespec
*spec
) {
183 return spec
->minex
? (value
> spec
->min
) : (value
>= spec
->min
);
186 static int zslValueLteMax(double value
, zrangespec
*spec
) {
187 return spec
->maxex
? (value
< spec
->max
) : (value
<= spec
->max
);
190 /* Returns whether any part of the zset is in range. */
191 int zslIsInRange(zskiplist
*zsl
, zrangespec
*range
) {
194 /* Test for ranges that will always be empty. */
195 if (range
->min
> range
->max
||
196 (range
->min
== range
->max
&& (range
->minex
|| range
->maxex
)))
199 if (x
== NULL
|| !zslValueGteMin(x
->score
,range
))
201 x
= zsl
->header
->level
[0].forward
;
202 if (x
== NULL
|| !zslValueLteMax(x
->score
,range
))
207 /* Find the first node that is contained in the specified range.
208 * Returns NULL when no element is contained in the range. */
209 zskiplistNode
*zslFirstInRange(zskiplist
*zsl
, zrangespec range
) {
213 /* If everything is out of range, return early. */
214 if (!zslIsInRange(zsl
,&range
)) return NULL
;
217 for (i
= zsl
->level
-1; i
>= 0; i
--) {
218 /* Go forward while *OUT* of range. */
219 while (x
->level
[i
].forward
&&
220 !zslValueGteMin(x
->level
[i
].forward
->score
,&range
))
221 x
= x
->level
[i
].forward
;
224 /* This is an inner range, so the next node cannot be NULL. */
225 x
= x
->level
[0].forward
;
226 redisAssert(x
!= NULL
);
228 /* Check if score <= max. */
229 if (!zslValueLteMax(x
->score
,&range
)) return NULL
;
233 /* Find the last node that is contained in the specified range.
234 * Returns NULL when no element is contained in the range. */
235 zskiplistNode
*zslLastInRange(zskiplist
*zsl
, zrangespec range
) {
239 /* If everything is out of range, return early. */
240 if (!zslIsInRange(zsl
,&range
)) return NULL
;
243 for (i
= zsl
->level
-1; i
>= 0; i
--) {
244 /* Go forward while *IN* range. */
245 while (x
->level
[i
].forward
&&
246 zslValueLteMax(x
->level
[i
].forward
->score
,&range
))
247 x
= x
->level
[i
].forward
;
250 /* This is an inner range, so this node cannot be NULL. */
251 redisAssert(x
!= NULL
);
253 /* Check if score >= min. */
254 if (!zslValueGteMin(x
->score
,&range
)) return NULL
;
258 /* Delete all the elements with score between min and max from the skiplist.
259 * Min and max are inclusive unless range.minex/range.maxex are set, so a score >= min && score <= max is deleted.
260 * Note that this function takes the reference to the hash table view of the
261 * sorted set, in order to remove the elements from the hash table too. */
262 unsigned long zslDeleteRangeByScore(zskiplist
*zsl
, zrangespec range
, dict
*dict
) {
263 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
264 unsigned long removed
= 0;
268 for (i
= zsl
->level
-1; i
>= 0; i
--) {
269 while (x
->level
[i
].forward
&& (range
.minex
?
270 x
->level
[i
].forward
->score
<= range
.min
:
271 x
->level
[i
].forward
->score
< range
.min
))
272 x
= x
->level
[i
].forward
;
276 /* Current node is the last with score < or <= min. */
277 x
= x
->level
[0].forward
;
279 /* Delete nodes while in range. */
280 while (x
&& (range
.maxex
? x
->score
< range
.max
: x
->score
<= range
.max
)) {
281 zskiplistNode
*next
= x
->level
[0].forward
;
282 zslDeleteNode(zsl
,x
,update
);
283 dictDelete(dict
,x
->obj
);
291 /* Delete all the elements with rank between start and end from the skiplist.
292 * Start and end are inclusive. Note that start and end need to be 1-based */
293 unsigned long zslDeleteRangeByRank(zskiplist
*zsl
, unsigned int start
, unsigned int end
, dict
*dict
) {
294 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
295 unsigned long traversed
= 0, removed
= 0;
299 for (i
= zsl
->level
-1; i
>= 0; i
--) {
300 while (x
->level
[i
].forward
&& (traversed
+ x
->level
[i
].span
) < start
) {
301 traversed
+= x
->level
[i
].span
;
302 x
= x
->level
[i
].forward
;
308 x
= x
->level
[0].forward
;
309 while (x
&& traversed
<= end
) {
310 zskiplistNode
*next
= x
->level
[0].forward
;
311 zslDeleteNode(zsl
,x
,update
);
312 dictDelete(dict
,x
->obj
);
321 /* Find the rank for an element by both score and key.
322 * Returns 0 when the element cannot be found, rank otherwise.
323 * Note that the rank is 1-based due to the span of zsl->header to the
325 unsigned long zslGetRank(zskiplist
*zsl
, double score
, robj
*o
) {
327 unsigned long rank
= 0;
331 for (i
= zsl
->level
-1; i
>= 0; i
--) {
332 while (x
->level
[i
].forward
&&
333 (x
->level
[i
].forward
->score
< score
||
334 (x
->level
[i
].forward
->score
== score
&&
335 compareStringObjects(x
->level
[i
].forward
->obj
,o
) <= 0))) {
336 rank
+= x
->level
[i
].span
;
337 x
= x
->level
[i
].forward
;
340 /* x might be equal to zsl->header, so test if obj is non-NULL */
341 if (x
->obj
&& equalStringObjects(x
->obj
,o
)) {
348 /* Finds an element by its rank. The rank argument needs to be 1-based. */
349 zskiplistNode
* zslGetElementByRank(zskiplist
*zsl
, unsigned long rank
) {
351 unsigned long traversed
= 0;
355 for (i
= zsl
->level
-1; i
>= 0; i
--) {
356 while (x
->level
[i
].forward
&& (traversed
+ x
->level
[i
].span
) <= rank
)
358 traversed
+= x
->level
[i
].span
;
359 x
= x
->level
[i
].forward
;
361 if (traversed
== rank
) {
368 /* Populate the rangespec according to the objects min and max. */
369 static int zslParseRange(robj
*min
, robj
*max
, zrangespec
*spec
) {
371 spec
->minex
= spec
->maxex
= 0;
373 /* Parse the min-max interval. If one of the values is prefixed
374 * by the "(" character, it's considered "open". For instance
375 * ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max
376 * ZRANGEBYSCORE zset 1.5 2.5 will instead match min <= x <= max */
377 if (min
->encoding
== REDIS_ENCODING_INT
) {
378 spec
->min
= (long)min
->ptr
;
380 if (((char*)min
->ptr
)[0] == '(') {
381 spec
->min
= strtod((char*)min
->ptr
+1,&eptr
);
382 if (eptr
[0] != '\0' || isnan(spec
->min
)) return REDIS_ERR
;
385 spec
->min
= strtod((char*)min
->ptr
,&eptr
);
386 if (eptr
[0] != '\0' || isnan(spec
->min
)) return REDIS_ERR
;
389 if (max
->encoding
== REDIS_ENCODING_INT
) {
390 spec
->max
= (long)max
->ptr
;
392 if (((char*)max
->ptr
)[0] == '(') {
393 spec
->max
= strtod((char*)max
->ptr
+1,&eptr
);
394 if (eptr
[0] != '\0' || isnan(spec
->max
)) return REDIS_ERR
;
397 spec
->max
= strtod((char*)max
->ptr
,&eptr
);
398 if (eptr
[0] != '\0' || isnan(spec
->max
)) return REDIS_ERR
;
405 /*-----------------------------------------------------------------------------
406 * Ziplist-backed sorted set API
407 *----------------------------------------------------------------------------*/
409 double zzlGetScore(unsigned char *sptr
) {
416 redisAssert(sptr
!= NULL
);
417 redisAssert(ziplistGet(sptr
,&vstr
,&vlen
,&vlong
));
420 memcpy(buf
,vstr
,vlen
);
422 score
= strtod(buf
,NULL
);
430 /* Compare element in sorted set with given element. */
431 int zzlCompareElements(unsigned char *eptr
, unsigned char *cstr
, unsigned int clen
) {
435 unsigned char vbuf
[32];
438 redisAssert(ziplistGet(eptr
,&vstr
,&vlen
,&vlong
));
440 /* Store string representation of long long in buf. */
441 vlen
= ll2string((char*)vbuf
,sizeof(vbuf
),vlong
);
445 minlen
= (vlen
< clen
) ? vlen
: clen
;
446 cmp
= memcmp(vstr
,cstr
,minlen
);
447 if (cmp
== 0) return vlen
-clen
;
451 unsigned int zzlLength(unsigned char *zl
) {
452 return ziplistLen(zl
)/2;
455 /* Move to next entry based on the values in eptr and sptr. Both are set to
456 * NULL when there is no next entry. */
457 void zzlNext(unsigned char *zl
, unsigned char **eptr
, unsigned char **sptr
) {
458 unsigned char *_eptr
, *_sptr
;
459 redisAssert(*eptr
!= NULL
&& *sptr
!= NULL
);
461 _eptr
= ziplistNext(zl
,*sptr
);
463 _sptr
= ziplistNext(zl
,_eptr
);
464 redisAssert(_sptr
!= NULL
);
474 /* Move to the previous entry based on the values in eptr and sptr. Both are
475 * set to NULL when there is no previous entry. */
476 void zzlPrev(unsigned char *zl
, unsigned char **eptr
, unsigned char **sptr
) {
477 unsigned char *_eptr
, *_sptr
;
478 redisAssert(*eptr
!= NULL
&& *sptr
!= NULL
);
480 _sptr
= ziplistPrev(zl
,*eptr
);
482 _eptr
= ziplistPrev(zl
,_sptr
);
483 redisAssert(_eptr
!= NULL
);
485 /* No previous entry. */
493 /* Returns whether any part of the zset is in range. Should only be used
494 * internally by zzlFirstInRange and zzlLastInRange. */
495 int zzlIsInRange(unsigned char *zl
, zrangespec
*range
) {
499 /* Test for ranges that will always be empty. */
500 if (range
->min
> range
->max
||
501 (range
->min
== range
->max
&& (range
->minex
|| range
->maxex
)))
504 p
= ziplistIndex(zl
,-1); /* Last score. */
505 if (p
== NULL
) return 0; /* Empty sorted set */
506 score
= zzlGetScore(p
);
507 if (!zslValueGteMin(score
,range
))
510 p
= ziplistIndex(zl
,1); /* First score. */
511 redisAssert(p
!= NULL
);
512 score
= zzlGetScore(p
);
513 if (!zslValueLteMax(score
,range
))
519 /* Find pointer to the first element contained in the specified range.
520 * Returns NULL when no element is contained in the range. */
521 unsigned char *zzlFirstInRange(unsigned char *zl
, zrangespec range
) {
522 unsigned char *eptr
= ziplistIndex(zl
,0), *sptr
;
525 /* If everything is out of range, return early. */
526 if (!zzlIsInRange(zl
,&range
)) return NULL
;
528 while (eptr
!= NULL
) {
529 sptr
= ziplistNext(zl
,eptr
);
530 redisAssert(sptr
!= NULL
);
532 score
= zzlGetScore(sptr
);
533 if (zslValueGteMin(score
,&range
)) {
534 /* Check if score <= max. */
535 if (zslValueLteMax(score
,&range
))
540 /* Move to next element. */
541 eptr
= ziplistNext(zl
,sptr
);
547 /* Find pointer to the last element contained in the specified range.
548 * Returns NULL when no element is contained in the range. */
549 unsigned char *zzlLastInRange(unsigned char *zl
, zrangespec range
) {
550 unsigned char *eptr
= ziplistIndex(zl
,-2), *sptr
;
553 /* If everything is out of range, return early. */
554 if (!zzlIsInRange(zl
,&range
)) return NULL
;
556 while (eptr
!= NULL
) {
557 sptr
= ziplistNext(zl
,eptr
);
558 redisAssert(sptr
!= NULL
);
560 score
= zzlGetScore(sptr
);
561 if (zslValueLteMax(score
,&range
)) {
562 /* Check if score >= min. */
563 if (zslValueGteMin(score
,&range
))
568 /* Move to previous element by moving to the score of previous element.
569 * When this returns NULL, we know there also is no element. */
570 sptr
= ziplistPrev(zl
,eptr
);
572 redisAssert((eptr
= ziplistPrev(zl
,sptr
)) != NULL
);
580 unsigned char *zzlFind(unsigned char *zl
, robj
*ele
, double *score
) {
581 unsigned char *eptr
= ziplistIndex(zl
,0), *sptr
;
583 ele
= getDecodedObject(ele
);
584 while (eptr
!= NULL
) {
585 sptr
= ziplistNext(zl
,eptr
);
586 redisAssertWithInfo(NULL
,ele
,sptr
!= NULL
);
588 if (ziplistCompare(eptr
,ele
->ptr
,sdslen(ele
->ptr
))) {
589 /* Matching element, pull out score. */
590 if (score
!= NULL
) *score
= zzlGetScore(sptr
);
595 /* Move to next element. */
596 eptr
= ziplistNext(zl
,sptr
);
603 /* Delete (element,score) pair from ziplist. Use local copy of eptr because we
604 * don't want to modify the one given as argument. */
605 unsigned char *zzlDelete(unsigned char *zl
, unsigned char *eptr
) {
606 unsigned char *p
= eptr
;
608 /* TODO: add function to ziplist API to delete N elements from offset. */
609 zl
= ziplistDelete(zl
,&p
);
610 zl
= ziplistDelete(zl
,&p
);
614 unsigned char *zzlInsertAt(unsigned char *zl
, unsigned char *eptr
, robj
*ele
, double score
) {
620 redisAssertWithInfo(NULL
,ele
,ele
->encoding
== REDIS_ENCODING_RAW
);
621 scorelen
= d2string(scorebuf
,sizeof(scorebuf
),score
);
623 zl
= ziplistPush(zl
,ele
->ptr
,sdslen(ele
->ptr
),ZIPLIST_TAIL
);
624 zl
= ziplistPush(zl
,(unsigned char*)scorebuf
,scorelen
,ZIPLIST_TAIL
);
626 /* Keep offset relative to zl, as it might be re-allocated. */
628 zl
= ziplistInsert(zl
,eptr
,ele
->ptr
,sdslen(ele
->ptr
));
631 /* Insert score after the element. */
632 redisAssertWithInfo(NULL
,ele
,(sptr
= ziplistNext(zl
,eptr
)) != NULL
);
633 zl
= ziplistInsert(zl
,sptr
,(unsigned char*)scorebuf
,scorelen
);
639 /* Insert (element,score) pair in ziplist. This function assumes the element is
640 * not yet present in the list. */
641 unsigned char *zzlInsert(unsigned char *zl
, robj
*ele
, double score
) {
642 unsigned char *eptr
= ziplistIndex(zl
,0), *sptr
;
645 ele
= getDecodedObject(ele
);
646 while (eptr
!= NULL
) {
647 sptr
= ziplistNext(zl
,eptr
);
648 redisAssertWithInfo(NULL
,ele
,sptr
!= NULL
);
649 s
= zzlGetScore(sptr
);
652 /* First element with score larger than score for element to be
653 * inserted. This means we should take its spot in the list to
654 * maintain ordering. */
655 zl
= zzlInsertAt(zl
,eptr
,ele
,score
);
657 } else if (s
== score
) {
658 /* Ensure lexicographical ordering for elements. */
659 if (zzlCompareElements(eptr
,ele
->ptr
,sdslen(ele
->ptr
)) > 0) {
660 zl
= zzlInsertAt(zl
,eptr
,ele
,score
);
665 /* Move to next element. */
666 eptr
= ziplistNext(zl
,sptr
);
669 /* Push on tail of list when it was not yet inserted. */
671 zl
= zzlInsertAt(zl
,NULL
,ele
,score
);
677 unsigned char *zzlDeleteRangeByScore(unsigned char *zl
, zrangespec range
, unsigned long *deleted
) {
678 unsigned char *eptr
, *sptr
;
680 unsigned long num
= 0;
682 if (deleted
!= NULL
) *deleted
= 0;
684 eptr
= zzlFirstInRange(zl
,range
);
685 if (eptr
== NULL
) return zl
;
687 /* When the tail of the ziplist is deleted, eptr will point to the sentinel
688 * byte and ziplistNext will return NULL. */
689 while ((sptr
= ziplistNext(zl
,eptr
)) != NULL
) {
690 score
= zzlGetScore(sptr
);
691 if (zslValueLteMax(score
,&range
)) {
692 /* Delete both the element and the score. */
693 zl
= ziplistDelete(zl
,&eptr
);
694 zl
= ziplistDelete(zl
,&eptr
);
697 /* No longer in range. */
702 if (deleted
!= NULL
) *deleted
= num
;
706 /* Delete all the elements with rank between start and end from the ziplist.
707 * Start and end are inclusive. Note that start and end need to be 1-based */
708 unsigned char *zzlDeleteRangeByRank(unsigned char *zl
, unsigned int start
, unsigned int end
, unsigned long *deleted
) {
709 unsigned int num
= (end
-start
)+1;
710 if (deleted
) *deleted
= num
;
711 zl
= ziplistDeleteRange(zl
,2*(start
-1),2*num
);
715 /*-----------------------------------------------------------------------------
716 * Common sorted set API
717 *----------------------------------------------------------------------------*/
719 unsigned int zsetLength(robj
*zobj
) {
721 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
722 length
= zzlLength(zobj
->ptr
);
723 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
724 length
= ((zset
*)zobj
->ptr
)->zsl
->length
;
726 redisPanic("Unknown sorted set encoding");
731 void zsetConvert(robj
*zobj
, int encoding
) {
733 zskiplistNode
*node
, *next
;
737 if (zobj
->encoding
== encoding
) return;
738 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
739 unsigned char *zl
= zobj
->ptr
;
740 unsigned char *eptr
, *sptr
;
745 if (encoding
!= REDIS_ENCODING_SKIPLIST
)
746 redisPanic("Unknown target encoding");
748 zs
= zmalloc(sizeof(*zs
));
749 zs
->dict
= dictCreate(&zsetDictType
,NULL
);
750 zs
->zsl
= zslCreate();
752 eptr
= ziplistIndex(zl
,0);
753 redisAssertWithInfo(NULL
,zobj
,eptr
!= NULL
);
754 sptr
= ziplistNext(zl
,eptr
);
755 redisAssertWithInfo(NULL
,zobj
,sptr
!= NULL
);
757 while (eptr
!= NULL
) {
758 score
= zzlGetScore(sptr
);
759 redisAssertWithInfo(NULL
,zobj
,ziplistGet(eptr
,&vstr
,&vlen
,&vlong
));
761 ele
= createStringObjectFromLongLong(vlong
);
763 ele
= createStringObject((char*)vstr
,vlen
);
765 /* Has incremented refcount since it was just created. */
766 node
= zslInsert(zs
->zsl
,score
,ele
);
767 redisAssertWithInfo(NULL
,zobj
,dictAdd(zs
->dict
,ele
,&node
->score
) == DICT_OK
);
768 incrRefCount(ele
); /* Added to dictionary. */
769 zzlNext(zl
,&eptr
,&sptr
);
774 zobj
->encoding
= REDIS_ENCODING_SKIPLIST
;
775 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
776 unsigned char *zl
= ziplistNew();
778 if (encoding
!= REDIS_ENCODING_ZIPLIST
)
779 redisPanic("Unknown target encoding");
781 /* Approach similar to zslFree(), since we want to free the skiplist at
782 * the same time as creating the ziplist. */
784 dictRelease(zs
->dict
);
785 node
= zs
->zsl
->header
->level
[0].forward
;
786 zfree(zs
->zsl
->header
);
790 ele
= getDecodedObject(node
->obj
);
791 zl
= zzlInsertAt(zl
,NULL
,ele
,node
->score
);
794 next
= node
->level
[0].forward
;
801 zobj
->encoding
= REDIS_ENCODING_ZIPLIST
;
803 redisPanic("Unknown sorted set encoding");
807 /*-----------------------------------------------------------------------------
808 * Sorted set commands
809 *----------------------------------------------------------------------------*/
811 /* This generic command implements both ZADD and ZINCRBY. */
812 void zaddGenericCommand(redisClient
*c
, int incr
) {
813 static char *nanerr
= "resulting score is not a number (NaN)";
814 robj
*key
= c
->argv
[1];
818 double score
= 0, *scores
, curscore
= 0.0;
819 int j
, elements
= (c
->argc
-2)/2;
823 addReply(c
,shared
.syntaxerr
);
827 /* Start parsing all the scores, we need to emit any syntax error
828 * before executing additions to the sorted set, as the command should
829 * either execute fully or nothing at all. */
830 scores
= zmalloc(sizeof(double)*elements
);
831 for (j
= 0; j
< elements
; j
++) {
832 if (getDoubleFromObjectOrReply(c
,c
->argv
[2+j
*2],&scores
[j
],NULL
)
840 /* Lookup the key and create the sorted set if it does not exist. */
841 zobj
= lookupKeyWrite(c
->db
,key
);
843 if (server
.zset_max_ziplist_entries
== 0 ||
844 server
.zset_max_ziplist_value
< sdslen(c
->argv
[3]->ptr
))
846 zobj
= createZsetObject();
848 zobj
= createZsetZiplistObject();
850 dbAdd(c
->db
,key
,zobj
);
852 if (zobj
->type
!= REDIS_ZSET
) {
853 addReply(c
,shared
.wrongtypeerr
);
859 for (j
= 0; j
< elements
; j
++) {
862 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
865 /* Prefer non-encoded element when dealing with ziplists. */
866 ele
= c
->argv
[3+j
*2];
867 if ((eptr
= zzlFind(zobj
->ptr
,ele
,&curscore
)) != NULL
) {
871 addReplyError(c
,nanerr
);
872 /* Don't need to check if the sorted set is empty
873 * because we know it has at least one element. */
879 /* Remove and re-insert when score changed. */
880 if (score
!= curscore
) {
881 zobj
->ptr
= zzlDelete(zobj
->ptr
,eptr
);
882 zobj
->ptr
= zzlInsert(zobj
->ptr
,ele
,score
);
884 signalModifiedKey(c
->db
,key
);
888 /* Optimize: check if the element is too large or the list
889 * becomes too long *before* executing zzlInsert. */
890 zobj
->ptr
= zzlInsert(zobj
->ptr
,ele
,score
);
891 if (zzlLength(zobj
->ptr
) > server
.zset_max_ziplist_entries
)
892 zsetConvert(zobj
,REDIS_ENCODING_SKIPLIST
);
893 if (sdslen(ele
->ptr
) > server
.zset_max_ziplist_value
)
894 zsetConvert(zobj
,REDIS_ENCODING_SKIPLIST
);
896 signalModifiedKey(c
->db
,key
);
900 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
901 zset
*zs
= zobj
->ptr
;
902 zskiplistNode
*znode
;
905 ele
= c
->argv
[3+j
*2] = tryObjectEncoding(c
->argv
[3+j
*2]);
906 de
= dictFind(zs
->dict
,ele
);
908 curobj
= dictGetKey(de
);
909 curscore
= *(double*)dictGetVal(de
);
914 addReplyError(c
,nanerr
);
915 /* Don't need to check if the sorted set is empty
916 * because we know it has at least one element. */
922 /* Remove and re-insert when score changed. We can safely
923 * delete the key object from the skiplist, since the
924 * dictionary still has a reference to it. */
925 if (score
!= curscore
) {
926 redisAssertWithInfo(c
,curobj
,zslDelete(zs
->zsl
,curscore
,curobj
));
927 znode
= zslInsert(zs
->zsl
,score
,curobj
);
928 incrRefCount(curobj
); /* Re-inserted in skiplist. */
929 dictGetVal(de
) = &znode
->score
; /* Update score ptr. */
931 signalModifiedKey(c
->db
,key
);
935 znode
= zslInsert(zs
->zsl
,score
,ele
);
936 incrRefCount(ele
); /* Inserted in skiplist. */
937 redisAssertWithInfo(c
,NULL
,dictAdd(zs
->dict
,ele
,&znode
->score
) == DICT_OK
);
938 incrRefCount(ele
); /* Added to dictionary. */
940 signalModifiedKey(c
->db
,key
);
945 redisPanic("Unknown sorted set encoding");
949 if (incr
) /* ZINCRBY */
950 addReplyDouble(c
,score
);
952 addReplyLongLong(c
,added
);
955 void zaddCommand(redisClient
*c
) {
956 zaddGenericCommand(c
,0);
959 void zincrbyCommand(redisClient
*c
) {
960 zaddGenericCommand(c
,1);
963 void zremCommand(redisClient
*c
) {
964 robj
*key
= c
->argv
[1];
968 if ((zobj
= lookupKeyWriteOrReply(c
,key
,shared
.czero
)) == NULL
||
969 checkType(c
,zobj
,REDIS_ZSET
)) return;
971 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
974 for (j
= 2; j
< c
->argc
; j
++) {
975 if ((eptr
= zzlFind(zobj
->ptr
,c
->argv
[j
],NULL
)) != NULL
) {
977 zobj
->ptr
= zzlDelete(zobj
->ptr
,eptr
);
978 if (zzlLength(zobj
->ptr
) == 0) {
984 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
985 zset
*zs
= zobj
->ptr
;
989 for (j
= 2; j
< c
->argc
; j
++) {
990 de
= dictFind(zs
->dict
,c
->argv
[j
]);
994 /* Delete from the skiplist */
995 score
= *(double*)dictGetVal(de
);
996 redisAssertWithInfo(c
,c
->argv
[j
],zslDelete(zs
->zsl
,score
,c
->argv
[j
]));
998 /* Delete from the hash table */
999 dictDelete(zs
->dict
,c
->argv
[j
]);
1000 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
1001 if (dictSize(zs
->dict
) == 0) {
1002 dbDelete(c
->db
,key
);
1008 redisPanic("Unknown sorted set encoding");
1012 signalModifiedKey(c
->db
,key
);
1013 server
.dirty
+= deleted
;
1015 addReplyLongLong(c
,deleted
);
1018 void zremrangebyscoreCommand(redisClient
*c
) {
1019 robj
*key
= c
->argv
[1];
1022 unsigned long deleted
;
1024 /* Parse the range arguments. */
1025 if (zslParseRange(c
->argv
[2],c
->argv
[3],&range
) != REDIS_OK
) {
1026 addReplyError(c
,"min or max is not a float");
1030 if ((zobj
= lookupKeyWriteOrReply(c
,key
,shared
.czero
)) == NULL
||
1031 checkType(c
,zobj
,REDIS_ZSET
)) return;
1033 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1034 zobj
->ptr
= zzlDeleteRangeByScore(zobj
->ptr
,range
,&deleted
);
1035 if (zzlLength(zobj
->ptr
) == 0) dbDelete(c
->db
,key
);
1036 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
1037 zset
*zs
= zobj
->ptr
;
1038 deleted
= zslDeleteRangeByScore(zs
->zsl
,range
,zs
->dict
);
1039 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
1040 if (dictSize(zs
->dict
) == 0) dbDelete(c
->db
,key
);
1042 redisPanic("Unknown sorted set encoding");
1045 if (deleted
) signalModifiedKey(c
->db
,key
);
1046 server
.dirty
+= deleted
;
1047 addReplyLongLong(c
,deleted
);
1050 void zremrangebyrankCommand(redisClient
*c
) {
1051 robj
*key
= c
->argv
[1];
1056 unsigned long deleted
;
1058 if ((getLongFromObjectOrReply(c
, c
->argv
[2], &start
, NULL
) != REDIS_OK
) ||
1059 (getLongFromObjectOrReply(c
, c
->argv
[3], &end
, NULL
) != REDIS_OK
)) return;
1061 if ((zobj
= lookupKeyWriteOrReply(c
,key
,shared
.czero
)) == NULL
||
1062 checkType(c
,zobj
,REDIS_ZSET
)) return;
1064 /* Sanitize indexes. */
1065 llen
= zsetLength(zobj
);
1066 if (start
< 0) start
= llen
+start
;
1067 if (end
< 0) end
= llen
+end
;
1068 if (start
< 0) start
= 0;
1070 /* Invariant: start >= 0, so this test will be true when end < 0.
1071 * The range is empty when start > end or start >= length. */
1072 if (start
> end
|| start
>= llen
) {
1073 addReply(c
,shared
.czero
);
1076 if (end
>= llen
) end
= llen
-1;
1078 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1079 /* Correct for 1-based rank. */
1080 zobj
->ptr
= zzlDeleteRangeByRank(zobj
->ptr
,start
+1,end
+1,&deleted
);
1081 if (zzlLength(zobj
->ptr
) == 0) dbDelete(c
->db
,key
);
1082 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
1083 zset
*zs
= zobj
->ptr
;
1085 /* Correct for 1-based rank. */
1086 deleted
= zslDeleteRangeByRank(zs
->zsl
,start
+1,end
+1,zs
->dict
);
1087 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
1088 if (dictSize(zs
->dict
) == 0) dbDelete(c
->db
,key
);
1090 redisPanic("Unknown sorted set encoding");
1093 if (deleted
) signalModifiedKey(c
->db
,key
);
1094 server
.dirty
+= deleted
;
1095 addReplyLongLong(c
,deleted
);
1100 int type
; /* Set, sorted set */
1105 /* Set iterators. */
1118 /* Sorted set iterators. */
1122 unsigned char *eptr
, *sptr
;
1126 zskiplistNode
*node
;
1133 /* Use dirty flags for pointers that need to be cleaned up in the next
1134 * iteration over the zsetopval. The dirty flag for the long long value is
1135 * special, since long long values don't need cleanup. Instead, it means that
1136 * we already checked that "ell" holds a long long, or tried to convert another
1137 * representation into a long long value. When this was successful,
1138 * OPVAL_VALID_LL is set as well. */
1139 #define OPVAL_DIRTY_ROBJ 1
1140 #define OPVAL_DIRTY_LL 2
1141 #define OPVAL_VALID_LL 4
1143 /* Store value retrieved from the iterator. */
1146 unsigned char _buf
[32]; /* Private buffer. */
1148 unsigned char *estr
;
1154 typedef union _iterset iterset
;
1155 typedef union _iterzset iterzset
;
1157 void zuiInitIterator(zsetopsrc
*op
) {
1158 if (op
->subject
== NULL
)
1161 if (op
->type
== REDIS_SET
) {
1162 iterset
*it
= &op
->iter
.set
;
1163 if (op
->encoding
== REDIS_ENCODING_INTSET
) {
1164 it
->is
.is
= op
->subject
->ptr
;
1166 } else if (op
->encoding
== REDIS_ENCODING_HT
) {
1167 it
->ht
.dict
= op
->subject
->ptr
;
1168 it
->ht
.di
= dictGetIterator(op
->subject
->ptr
);
1169 it
->ht
.de
= dictNext(it
->ht
.di
);
1171 redisPanic("Unknown set encoding");
1173 } else if (op
->type
== REDIS_ZSET
) {
1174 iterzset
*it
= &op
->iter
.zset
;
1175 if (op
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1176 it
->zl
.zl
= op
->subject
->ptr
;
1177 it
->zl
.eptr
= ziplistIndex(it
->zl
.zl
,0);
1178 if (it
->zl
.eptr
!= NULL
) {
1179 it
->zl
.sptr
= ziplistNext(it
->zl
.zl
,it
->zl
.eptr
);
1180 redisAssert(it
->zl
.sptr
!= NULL
);
1182 } else if (op
->encoding
== REDIS_ENCODING_SKIPLIST
) {
1183 it
->sl
.zs
= op
->subject
->ptr
;
1184 it
->sl
.node
= it
->sl
.zs
->zsl
->header
->level
[0].forward
;
1186 redisPanic("Unknown sorted set encoding");
1189 redisPanic("Unsupported type");
1193 void zuiClearIterator(zsetopsrc
*op
) {
1194 if (op
->subject
== NULL
)
1197 if (op
->type
== REDIS_SET
) {
1198 iterset
*it
= &op
->iter
.set
;
1199 if (op
->encoding
== REDIS_ENCODING_INTSET
) {
1200 REDIS_NOTUSED(it
); /* skip */
1201 } else if (op
->encoding
== REDIS_ENCODING_HT
) {
1202 dictReleaseIterator(it
->ht
.di
);
1204 redisPanic("Unknown set encoding");
1206 } else if (op
->type
== REDIS_ZSET
) {
1207 iterzset
*it
= &op
->iter
.zset
;
1208 if (op
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1209 REDIS_NOTUSED(it
); /* skip */
1210 } else if (op
->encoding
== REDIS_ENCODING_SKIPLIST
) {
1211 REDIS_NOTUSED(it
); /* skip */
1213 redisPanic("Unknown sorted set encoding");
1216 redisPanic("Unsupported type");
1220 int zuiLength(zsetopsrc
*op
) {
1221 if (op
->subject
== NULL
)
1224 if (op
->type
== REDIS_SET
) {
1225 iterset
*it
= &op
->iter
.set
;
1226 if (op
->encoding
== REDIS_ENCODING_INTSET
) {
1227 return intsetLen(it
->is
.is
);
1228 } else if (op
->encoding
== REDIS_ENCODING_HT
) {
1229 return dictSize(it
->ht
.dict
);
1231 redisPanic("Unknown set encoding");
1233 } else if (op
->type
== REDIS_ZSET
) {
1234 iterzset
*it
= &op
->iter
.zset
;
1235 if (op
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1236 return zzlLength(it
->zl
.zl
);
1237 } else if (op
->encoding
== REDIS_ENCODING_SKIPLIST
) {
1238 return it
->sl
.zs
->zsl
->length
;
1240 redisPanic("Unknown sorted set encoding");
1243 redisPanic("Unsupported type");
1247 /* Check if the current value is valid. If so, store it in the passed structure
1248 * and move to the next element. If not valid, this means we have reached the
1249 * end of the structure and can abort. */
1250 int zuiNext(zsetopsrc
*op
, zsetopval
*val
) {
1251 if (op
->subject
== NULL
)
1254 if (val
->flags
& OPVAL_DIRTY_ROBJ
)
1255 decrRefCount(val
->ele
);
1257 memset(val
,0,sizeof(zsetopval
));
1259 if (op
->type
== REDIS_SET
) {
1260 iterset
*it
= &op
->iter
.set
;
1261 if (op
->encoding
== REDIS_ENCODING_INTSET
) {
1264 if (!intsetGet(it
->is
.is
,it
->is
.ii
,&ell
))
1269 /* Move to next element. */
1271 } else if (op
->encoding
== REDIS_ENCODING_HT
) {
1272 if (it
->ht
.de
== NULL
)
1274 val
->ele
= dictGetKey(it
->ht
.de
);
1277 /* Move to next element. */
1278 it
->ht
.de
= dictNext(it
->ht
.di
);
1280 redisPanic("Unknown set encoding");
1282 } else if (op
->type
== REDIS_ZSET
) {
1283 iterzset
*it
= &op
->iter
.zset
;
1284 if (op
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1285 /* No need to check both, but better be explicit. */
1286 if (it
->zl
.eptr
== NULL
|| it
->zl
.sptr
== NULL
)
1288 redisAssert(ziplistGet(it
->zl
.eptr
,&val
->estr
,&val
->elen
,&val
->ell
));
1289 val
->score
= zzlGetScore(it
->zl
.sptr
);
1291 /* Move to next element. */
1292 zzlNext(it
->zl
.zl
,&it
->zl
.eptr
,&it
->zl
.sptr
);
1293 } else if (op
->encoding
== REDIS_ENCODING_SKIPLIST
) {
1294 if (it
->sl
.node
== NULL
)
1296 val
->ele
= it
->sl
.node
->obj
;
1297 val
->score
= it
->sl
.node
->score
;
1299 /* Move to next element. */
1300 it
->sl
.node
= it
->sl
.node
->level
[0].forward
;
1302 redisPanic("Unknown sorted set encoding");
1305 redisPanic("Unsupported type");
1310 int zuiLongLongFromValue(zsetopval
*val
) {
1311 if (!(val
->flags
& OPVAL_DIRTY_LL
)) {
1312 val
->flags
|= OPVAL_DIRTY_LL
;
1314 if (val
->ele
!= NULL
) {
1315 if (val
->ele
->encoding
== REDIS_ENCODING_INT
) {
1316 val
->ell
= (long)val
->ele
->ptr
;
1317 val
->flags
|= OPVAL_VALID_LL
;
1318 } else if (val
->ele
->encoding
== REDIS_ENCODING_RAW
) {
1319 if (string2ll(val
->ele
->ptr
,sdslen(val
->ele
->ptr
),&val
->ell
))
1320 val
->flags
|= OPVAL_VALID_LL
;
1322 redisPanic("Unsupported element encoding");
1324 } else if (val
->estr
!= NULL
) {
1325 if (string2ll((char*)val
->estr
,val
->elen
,&val
->ell
))
1326 val
->flags
|= OPVAL_VALID_LL
;
1328 /* The long long was already set, flag as valid. */
1329 val
->flags
|= OPVAL_VALID_LL
;
1332 return val
->flags
& OPVAL_VALID_LL
;
1335 robj
*zuiObjectFromValue(zsetopval
*val
) {
1336 if (val
->ele
== NULL
) {
1337 if (val
->estr
!= NULL
) {
1338 val
->ele
= createStringObject((char*)val
->estr
,val
->elen
);
1340 val
->ele
= createStringObjectFromLongLong(val
->ell
);
1342 val
->flags
|= OPVAL_DIRTY_ROBJ
;
1347 int zuiBufferFromValue(zsetopval
*val
) {
1348 if (val
->estr
== NULL
) {
1349 if (val
->ele
!= NULL
) {
1350 if (val
->ele
->encoding
== REDIS_ENCODING_INT
) {
1351 val
->elen
= ll2string((char*)val
->_buf
,sizeof(val
->_buf
),(long)val
->ele
->ptr
);
1352 val
->estr
= val
->_buf
;
1353 } else if (val
->ele
->encoding
== REDIS_ENCODING_RAW
) {
1354 val
->elen
= sdslen(val
->ele
->ptr
);
1355 val
->estr
= val
->ele
->ptr
;
1357 redisPanic("Unsupported element encoding");
1360 val
->elen
= ll2string((char*)val
->_buf
,sizeof(val
->_buf
),val
->ell
);
1361 val
->estr
= val
->_buf
;
1367 /* Find value pointed to by val in the source pointer to by op. When found,
1368 * return 1 and store its score in target. Return 0 otherwise. */
1369 int zuiFind(zsetopsrc
*op
, zsetopval
*val
, double *score
) {
1370 if (op
->subject
== NULL
)
1373 if (op
->type
== REDIS_SET
) {
1374 iterset
*it
= &op
->iter
.set
;
1376 if (op
->encoding
== REDIS_ENCODING_INTSET
) {
1377 if (zuiLongLongFromValue(val
) && intsetFind(it
->is
.is
,val
->ell
)) {
1383 } else if (op
->encoding
== REDIS_ENCODING_HT
) {
1384 zuiObjectFromValue(val
);
1385 if (dictFind(it
->ht
.dict
,val
->ele
) != NULL
) {
1392 redisPanic("Unknown set encoding");
1394 } else if (op
->type
== REDIS_ZSET
) {
1395 iterzset
*it
= &op
->iter
.zset
;
1396 zuiObjectFromValue(val
);
1398 if (op
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1399 if (zzlFind(it
->zl
.zl
,val
->ele
,score
) != NULL
) {
1400 /* Score is already set by zzlFind. */
1405 } else if (op
->encoding
== REDIS_ENCODING_SKIPLIST
) {
1407 if ((de
= dictFind(it
->sl
.zs
->dict
,val
->ele
)) != NULL
) {
1408 *score
= *(double*)dictGetVal(de
);
1414 redisPanic("Unknown sorted set encoding");
1417 redisPanic("Unsupported type");
1421 int zuiCompareByCardinality(const void *s1
, const void *s2
) {
1422 return zuiLength((zsetopsrc
*)s1
) - zuiLength((zsetopsrc
*)s2
);
/* Ways of combining the scores of an element present in several inputs. */
#define REDIS_AGGR_SUM 1
#define REDIS_AGGR_MIN 2
#define REDIS_AGGR_MAX 3
/* Score stored in a dict entry, or 1.0 when the entry has no value. */
#define zunionInterDictValue(_e) (dictGetVal(_e) == NULL ? 1.0 : *(double*)dictGetVal(_e))

/* Fold 'val' into the running score '*target' according to 'aggregate'
 * (one of the REDIS_AGGR_* constants). Any other value triggers a panic. */
inline static void zunionInterAggregate(double *target, double val, int aggregate) {
    switch (aggregate) {
    case REDIS_AGGR_SUM:
        *target += val;
        /* Adding +inf and -inf yields NaN: by convention the result of
         * such a sum is 0.0. */
        if (isnan(*target)) *target = 0.0;
        break;
    case REDIS_AGGR_MIN:
        if (val < *target) *target = val;
        break;
    case REDIS_AGGR_MAX:
        if (val > *target) *target = val;
        break;
    default:
        /* safety net */
        redisPanic("Unknown ZUNION/INTER aggregate type");
    }
}
1447 void zunionInterGenericCommand(redisClient
*c
, robj
*dstkey
, int op
) {
1450 int aggregate
= REDIS_AGGR_SUM
;
1454 unsigned int maxelelen
= 0;
1457 zskiplistNode
*znode
;
1460 /* expect setnum input keys to be given */
1461 if ((getLongFromObjectOrReply(c
, c
->argv
[2], &setnum
, NULL
) != REDIS_OK
))
1466 "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE");
1470 /* test if the expected number of keys would overflow */
1471 if (3+setnum
> c
->argc
) {
1472 addReply(c
,shared
.syntaxerr
);
1476 /* read keys to be used for input */
1477 src
= zcalloc(sizeof(zsetopsrc
) * setnum
);
1478 for (i
= 0, j
= 3; i
< setnum
; i
++, j
++) {
1479 robj
*obj
= lookupKeyWrite(c
->db
,c
->argv
[j
]);
1481 if (obj
->type
!= REDIS_ZSET
&& obj
->type
!= REDIS_SET
) {
1483 addReply(c
,shared
.wrongtypeerr
);
1487 src
[i
].subject
= obj
;
1488 src
[i
].type
= obj
->type
;
1489 src
[i
].encoding
= obj
->encoding
;
1491 src
[i
].subject
= NULL
;
1494 /* Default all weights to 1. */
1495 src
[i
].weight
= 1.0;
1498 /* parse optional extra arguments */
1500 int remaining
= c
->argc
- j
;
1503 if (remaining
>= (setnum
+ 1) && !strcasecmp(c
->argv
[j
]->ptr
,"weights")) {
1505 for (i
= 0; i
< setnum
; i
++, j
++, remaining
--) {
1506 if (getDoubleFromObjectOrReply(c
,c
->argv
[j
],&src
[i
].weight
,
1507 "weight value is not a float") != REDIS_OK
)
1513 } else if (remaining
>= 2 && !strcasecmp(c
->argv
[j
]->ptr
,"aggregate")) {
1515 if (!strcasecmp(c
->argv
[j
]->ptr
,"sum")) {
1516 aggregate
= REDIS_AGGR_SUM
;
1517 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"min")) {
1518 aggregate
= REDIS_AGGR_MIN
;
1519 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"max")) {
1520 aggregate
= REDIS_AGGR_MAX
;
1523 addReply(c
,shared
.syntaxerr
);
1529 addReply(c
,shared
.syntaxerr
);
1535 for (i
= 0; i
< setnum
; i
++)
1536 zuiInitIterator(&src
[i
]);
1538 /* sort sets from the smallest to largest, this will improve our
1539 * algorithm's performance */
1540 qsort(src
,setnum
,sizeof(zsetopsrc
),zuiCompareByCardinality
);
1542 dstobj
= createZsetObject();
1543 dstzset
= dstobj
->ptr
;
1544 memset(&zval
, 0, sizeof(zval
));
1546 if (op
== REDIS_OP_INTER
) {
1547 /* Skip everything if the smallest input is empty. */
1548 if (zuiLength(&src
[0]) > 0) {
1549 /* Precondition: as src[0] is non-empty and the inputs are ordered
1550 * by size, all src[i > 0] are non-empty too. */
1551 while (zuiNext(&src
[0],&zval
)) {
1552 double score
, value
;
1554 score
= src
[0].weight
* zval
.score
;
1555 if (isnan(score
)) score
= 0;
1557 for (j
= 1; j
< setnum
; j
++) {
1558 /* It is not safe to access the zset we are
1559 * iterating, so explicitly check for equal object. */
1560 if (src
[j
].subject
== src
[0].subject
) {
1561 value
= zval
.score
*src
[j
].weight
;
1562 zunionInterAggregate(&score
,value
,aggregate
);
1563 } else if (zuiFind(&src
[j
],&zval
,&value
)) {
1564 value
*= src
[j
].weight
;
1565 zunionInterAggregate(&score
,value
,aggregate
);
1571 /* Only continue when present in every input. */
1573 tmp
= zuiObjectFromValue(&zval
);
1574 znode
= zslInsert(dstzset
->zsl
,score
,tmp
);
1575 incrRefCount(tmp
); /* added to skiplist */
1576 dictAdd(dstzset
->dict
,tmp
,&znode
->score
);
1577 incrRefCount(tmp
); /* added to dictionary */
1579 if (tmp
->encoding
== REDIS_ENCODING_RAW
)
1580 if (sdslen(tmp
->ptr
) > maxelelen
)
1581 maxelelen
= sdslen(tmp
->ptr
);
1585 } else if (op
== REDIS_OP_UNION
) {
1586 for (i
= 0; i
< setnum
; i
++) {
1587 if (zuiLength(&src
[i
]) == 0)
1590 while (zuiNext(&src
[i
],&zval
)) {
1591 double score
, value
;
1593 /* Skip key when already processed */
1594 if (dictFind(dstzset
->dict
,zuiObjectFromValue(&zval
)) != NULL
)
1597 /* Initialize score */
1598 score
= src
[i
].weight
* zval
.score
;
1599 if (isnan(score
)) score
= 0;
1601 /* Because the inputs are sorted by size, it's only possible
1602 * for sets at larger indices to hold this element. */
1603 for (j
= (i
+1); j
< setnum
; j
++) {
1604 /* It is not safe to access the zset we are
1605 * iterating, so explicitly check for equal object. */
1606 if(src
[j
].subject
== src
[i
].subject
) {
1607 value
= zval
.score
*src
[j
].weight
;
1608 zunionInterAggregate(&score
,value
,aggregate
);
1609 } else if (zuiFind(&src
[j
],&zval
,&value
)) {
1610 value
*= src
[j
].weight
;
1611 zunionInterAggregate(&score
,value
,aggregate
);
1615 tmp
= zuiObjectFromValue(&zval
);
1616 znode
= zslInsert(dstzset
->zsl
,score
,tmp
);
1617 incrRefCount(zval
.ele
); /* added to skiplist */
1618 dictAdd(dstzset
->dict
,tmp
,&znode
->score
);
1619 incrRefCount(zval
.ele
); /* added to dictionary */
1621 if (tmp
->encoding
== REDIS_ENCODING_RAW
)
1622 if (sdslen(tmp
->ptr
) > maxelelen
)
1623 maxelelen
= sdslen(tmp
->ptr
);
1627 redisPanic("Unknown operator");
1630 for (i
= 0; i
< setnum
; i
++)
1631 zuiClearIterator(&src
[i
]);
1633 if (dbDelete(c
->db
,dstkey
)) {
1634 signalModifiedKey(c
->db
,dstkey
);
1638 if (dstzset
->zsl
->length
) {
1639 /* Convert to ziplist when in limits. */
1640 if (dstzset
->zsl
->length
<= server
.zset_max_ziplist_entries
&&
1641 maxelelen
<= server
.zset_max_ziplist_value
)
1642 zsetConvert(dstobj
,REDIS_ENCODING_ZIPLIST
);
1644 dbAdd(c
->db
,dstkey
,dstobj
);
1645 addReplyLongLong(c
,zsetLength(dstobj
));
1646 if (!touched
) signalModifiedKey(c
->db
,dstkey
);
1649 decrRefCount(dstobj
);
1650 addReply(c
,shared
.czero
);
1655 void zunionstoreCommand(redisClient
*c
) {
1656 zunionInterGenericCommand(c
,c
->argv
[1], REDIS_OP_UNION
);
1659 void zinterstoreCommand(redisClient
*c
) {
1660 zunionInterGenericCommand(c
,c
->argv
[1], REDIS_OP_INTER
);
1663 void zrangeGenericCommand(redisClient
*c
, int reverse
) {
1664 robj
*key
= c
->argv
[1];
1672 if ((getLongFromObjectOrReply(c
, c
->argv
[2], &start
, NULL
) != REDIS_OK
) ||
1673 (getLongFromObjectOrReply(c
, c
->argv
[3], &end
, NULL
) != REDIS_OK
)) return;
1675 if (c
->argc
== 5 && !strcasecmp(c
->argv
[4]->ptr
,"withscores")) {
1677 } else if (c
->argc
>= 5) {
1678 addReply(c
,shared
.syntaxerr
);
1682 if ((zobj
= lookupKeyReadOrReply(c
,key
,shared
.emptymultibulk
)) == NULL
1683 || checkType(c
,zobj
,REDIS_ZSET
)) return;
1685 /* Sanitize indexes. */
1686 llen
= zsetLength(zobj
);
1687 if (start
< 0) start
= llen
+start
;
1688 if (end
< 0) end
= llen
+end
;
1689 if (start
< 0) start
= 0;
1691 /* Invariant: start >= 0, so this test will be true when end < 0.
1692 * The range is empty when start > end or start >= length. */
1693 if (start
> end
|| start
>= llen
) {
1694 addReply(c
,shared
.emptymultibulk
);
1697 if (end
>= llen
) end
= llen
-1;
1698 rangelen
= (end
-start
)+1;
1700 /* Return the result in form of a multi-bulk reply */
1701 addReplyMultiBulkLen(c
, withscores
? (rangelen
*2) : rangelen
);
1703 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1704 unsigned char *zl
= zobj
->ptr
;
1705 unsigned char *eptr
, *sptr
;
1706 unsigned char *vstr
;
1711 eptr
= ziplistIndex(zl
,-2-(2*start
));
1713 eptr
= ziplistIndex(zl
,2*start
);
1715 redisAssertWithInfo(c
,zobj
,eptr
!= NULL
);
1716 sptr
= ziplistNext(zl
,eptr
);
1718 while (rangelen
--) {
1719 redisAssertWithInfo(c
,zobj
,eptr
!= NULL
&& sptr
!= NULL
);
1720 redisAssertWithInfo(c
,zobj
,ziplistGet(eptr
,&vstr
,&vlen
,&vlong
));
1722 addReplyBulkLongLong(c
,vlong
);
1724 addReplyBulkCBuffer(c
,vstr
,vlen
);
1727 addReplyDouble(c
,zzlGetScore(sptr
));
1730 zzlPrev(zl
,&eptr
,&sptr
);
1732 zzlNext(zl
,&eptr
,&sptr
);
1735 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
1736 zset
*zs
= zobj
->ptr
;
1737 zskiplist
*zsl
= zs
->zsl
;
1741 /* Check if starting point is trivial, before doing log(N) lookup. */
1745 ln
= zslGetElementByRank(zsl
,llen
-start
);
1747 ln
= zsl
->header
->level
[0].forward
;
1749 ln
= zslGetElementByRank(zsl
,start
+1);
1753 redisAssertWithInfo(c
,zobj
,ln
!= NULL
);
1755 addReplyBulk(c
,ele
);
1757 addReplyDouble(c
,ln
->score
);
1758 ln
= reverse
? ln
->backward
: ln
->level
[0].forward
;
1761 redisPanic("Unknown sorted set encoding");
1765 void zrangeCommand(redisClient
*c
) {
1766 zrangeGenericCommand(c
,0);
1769 void zrevrangeCommand(redisClient
*c
) {
1770 zrangeGenericCommand(c
,1);
1773 /* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE. */
1774 void genericZrangebyscoreCommand(redisClient
*c
, int reverse
) {
1776 robj
*key
= c
->argv
[1];
1778 long offset
= 0, limit
= -1;
1780 unsigned long rangelen
= 0;
1781 void *replylen
= NULL
;
1784 /* Parse the range arguments. */
1786 /* Range is given as [max,min] */
1787 maxidx
= 2; minidx
= 3;
1789 /* Range is given as [min,max] */
1790 minidx
= 2; maxidx
= 3;
1793 if (zslParseRange(c
->argv
[minidx
],c
->argv
[maxidx
],&range
) != REDIS_OK
) {
1794 addReplyError(c
,"min or max is not a float");
1798 /* Parse optional extra arguments. Note that ZCOUNT will exactly have
1799 * 4 arguments, so we'll never enter the following code path. */
1801 int remaining
= c
->argc
- 4;
1805 if (remaining
>= 1 && !strcasecmp(c
->argv
[pos
]->ptr
,"withscores")) {
1808 } else if (remaining
>= 3 && !strcasecmp(c
->argv
[pos
]->ptr
,"limit")) {
1809 if ((getLongFromObjectOrReply(c
, c
->argv
[pos
+1], &offset
, NULL
) != REDIS_OK
) ||
1810 (getLongFromObjectOrReply(c
, c
->argv
[pos
+2], &limit
, NULL
) != REDIS_OK
)) return;
1811 pos
+= 3; remaining
-= 3;
1813 addReply(c
,shared
.syntaxerr
);
1819 /* Ok, lookup the key and get the range */
1820 if ((zobj
= lookupKeyReadOrReply(c
,key
,shared
.emptymultibulk
)) == NULL
||
1821 checkType(c
,zobj
,REDIS_ZSET
)) return;
1823 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1824 unsigned char *zl
= zobj
->ptr
;
1825 unsigned char *eptr
, *sptr
;
1826 unsigned char *vstr
;
1831 /* If reversed, get the last node in range as starting point. */
1833 eptr
= zzlLastInRange(zl
,range
);
1835 eptr
= zzlFirstInRange(zl
,range
);
1838 /* No "first" element in the specified interval. */
1840 addReply(c
, shared
.emptymultibulk
);
1844 /* Get score pointer for the first element. */
1845 redisAssertWithInfo(c
,zobj
,eptr
!= NULL
);
1846 sptr
= ziplistNext(zl
,eptr
);
1848 /* We don't know in advance how many matching elements there are in the
1849 * list, so we push this object that will represent the multi-bulk
1850 * length in the output buffer, and will "fix" it later */
1851 replylen
= addDeferredMultiBulkLength(c
);
1853 /* If there is an offset, just traverse the number of elements without
1854 * checking the score because that is done in the next loop. */
1855 while (eptr
&& offset
--) {
1857 zzlPrev(zl
,&eptr
,&sptr
);
1859 zzlNext(zl
,&eptr
,&sptr
);
1863 while (eptr
&& limit
--) {
1864 score
= zzlGetScore(sptr
);
1866 /* Abort when the node is no longer in range. */
1868 if (!zslValueGteMin(score
,&range
)) break;
1870 if (!zslValueLteMax(score
,&range
)) break;
1873 /* We know the element exists, so ziplistGet should always succeed */
1874 redisAssertWithInfo(c
,zobj
,ziplistGet(eptr
,&vstr
,&vlen
,&vlong
));
1878 addReplyBulkLongLong(c
,vlong
);
1880 addReplyBulkCBuffer(c
,vstr
,vlen
);
1884 addReplyDouble(c
,score
);
1887 /* Move to next node */
1889 zzlPrev(zl
,&eptr
,&sptr
);
1891 zzlNext(zl
,&eptr
,&sptr
);
1894 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
1895 zset
*zs
= zobj
->ptr
;
1896 zskiplist
*zsl
= zs
->zsl
;
1899 /* If reversed, get the last node in range as starting point. */
1901 ln
= zslLastInRange(zsl
,range
);
1903 ln
= zslFirstInRange(zsl
,range
);
1906 /* No "first" element in the specified interval. */
1908 addReply(c
, shared
.emptymultibulk
);
1912 /* We don't know in advance how many matching elements there are in the
1913 * list, so we push this object that will represent the multi-bulk
1914 * length in the output buffer, and will "fix" it later */
1915 replylen
= addDeferredMultiBulkLength(c
);
1917 /* If there is an offset, just traverse the number of elements without
1918 * checking the score because that is done in the next loop. */
1919 while (ln
&& offset
--) {
1923 ln
= ln
->level
[0].forward
;
1927 while (ln
&& limit
--) {
1928 /* Abort when the node is no longer in range. */
1930 if (!zslValueGteMin(ln
->score
,&range
)) break;
1932 if (!zslValueLteMax(ln
->score
,&range
)) break;
1936 addReplyBulk(c
,ln
->obj
);
1939 addReplyDouble(c
,ln
->score
);
1942 /* Move to next node */
1946 ln
= ln
->level
[0].forward
;
1950 redisPanic("Unknown sorted set encoding");
1957 setDeferredMultiBulkLength(c
, replylen
, rangelen
);
1960 void zrangebyscoreCommand(redisClient
*c
) {
1961 genericZrangebyscoreCommand(c
,0);
1964 void zrevrangebyscoreCommand(redisClient
*c
) {
1965 genericZrangebyscoreCommand(c
,1);
1968 void zcountCommand(redisClient
*c
) {
1969 robj
*key
= c
->argv
[1];
1974 /* Parse the range arguments */
1975 if (zslParseRange(c
->argv
[2],c
->argv
[3],&range
) != REDIS_OK
) {
1976 addReplyError(c
,"min or max is not a float");
1980 /* Lookup the sorted set */
1981 if ((zobj
= lookupKeyReadOrReply(c
, key
, shared
.czero
)) == NULL
||
1982 checkType(c
, zobj
, REDIS_ZSET
)) return;
1984 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1985 unsigned char *zl
= zobj
->ptr
;
1986 unsigned char *eptr
, *sptr
;
1989 /* Use the first element in range as the starting point */
1990 eptr
= zzlFirstInRange(zl
,range
);
1992 /* No "first" element */
1994 addReply(c
, shared
.czero
);
1998 /* First element is in range */
1999 sptr
= ziplistNext(zl
,eptr
);
2000 score
= zzlGetScore(sptr
);
2001 redisAssertWithInfo(c
,zobj
,zslValueLteMax(score
,&range
));
2003 /* Iterate over elements in range */
2005 score
= zzlGetScore(sptr
);
2007 /* Abort when the node is no longer in range. */
2008 if (!zslValueLteMax(score
,&range
)) {
2012 zzlNext(zl
,&eptr
,&sptr
);
2015 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
2016 zset
*zs
= zobj
->ptr
;
2017 zskiplist
*zsl
= zs
->zsl
;
2021 /* Find first element in range */
2022 zn
= zslFirstInRange(zsl
, range
);
2024 /* Use rank of first element, if any, to determine preliminary count */
2026 rank
= zslGetRank(zsl
, zn
->score
, zn
->obj
);
2027 count
= (zsl
->length
- (rank
- 1));
2029 /* Find last element in range */
2030 zn
= zslLastInRange(zsl
, range
);
2032 /* Use rank of last element, if any, to determine the actual count */
2034 rank
= zslGetRank(zsl
, zn
->score
, zn
->obj
);
2035 count
-= (zsl
->length
- rank
);
2039 redisPanic("Unknown sorted set encoding");
2042 addReplyLongLong(c
, count
);
2045 void zcardCommand(redisClient
*c
) {
2046 robj
*key
= c
->argv
[1];
2049 if ((zobj
= lookupKeyReadOrReply(c
,key
,shared
.czero
)) == NULL
||
2050 checkType(c
,zobj
,REDIS_ZSET
)) return;
2052 addReplyLongLong(c
,zsetLength(zobj
));
2055 void zscoreCommand(redisClient
*c
) {
2056 robj
*key
= c
->argv
[1];
2060 if ((zobj
= lookupKeyReadOrReply(c
,key
,shared
.nullbulk
)) == NULL
||
2061 checkType(c
,zobj
,REDIS_ZSET
)) return;
2063 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
2064 if (zzlFind(zobj
->ptr
,c
->argv
[2],&score
) != NULL
)
2065 addReplyDouble(c
,score
);
2067 addReply(c
,shared
.nullbulk
);
2068 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
2069 zset
*zs
= zobj
->ptr
;
2072 c
->argv
[2] = tryObjectEncoding(c
->argv
[2]);
2073 de
= dictFind(zs
->dict
,c
->argv
[2]);
2075 score
= *(double*)dictGetVal(de
);
2076 addReplyDouble(c
,score
);
2078 addReply(c
,shared
.nullbulk
);
2081 redisPanic("Unknown sorted set encoding");
2085 void zrankGenericCommand(redisClient
*c
, int reverse
) {
2086 robj
*key
= c
->argv
[1];
2087 robj
*ele
= c
->argv
[2];
2092 if ((zobj
= lookupKeyReadOrReply(c
,key
,shared
.nullbulk
)) == NULL
||
2093 checkType(c
,zobj
,REDIS_ZSET
)) return;
2094 llen
= zsetLength(zobj
);
2096 redisAssertWithInfo(c
,ele
,ele
->encoding
== REDIS_ENCODING_RAW
);
2097 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
2098 unsigned char *zl
= zobj
->ptr
;
2099 unsigned char *eptr
, *sptr
;
2101 eptr
= ziplistIndex(zl
,0);
2102 redisAssertWithInfo(c
,zobj
,eptr
!= NULL
);
2103 sptr
= ziplistNext(zl
,eptr
);
2104 redisAssertWithInfo(c
,zobj
,sptr
!= NULL
);
2107 while(eptr
!= NULL
) {
2108 if (ziplistCompare(eptr
,ele
->ptr
,sdslen(ele
->ptr
)))
2111 zzlNext(zl
,&eptr
,&sptr
);
2116 addReplyLongLong(c
,llen
-rank
);
2118 addReplyLongLong(c
,rank
-1);
2120 addReply(c
,shared
.nullbulk
);
2122 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
2123 zset
*zs
= zobj
->ptr
;
2124 zskiplist
*zsl
= zs
->zsl
;
2128 ele
= c
->argv
[2] = tryObjectEncoding(c
->argv
[2]);
2129 de
= dictFind(zs
->dict
,ele
);
2131 score
= *(double*)dictGetVal(de
);
2132 rank
= zslGetRank(zsl
,score
,ele
);
2133 redisAssertWithInfo(c
,ele
,rank
); /* Existing elements always have a rank. */
2135 addReplyLongLong(c
,llen
-rank
);
2137 addReplyLongLong(c
,rank
-1);
2139 addReply(c
,shared
.nullbulk
);
2142 redisPanic("Unknown sorted set encoding");
2146 void zrankCommand(redisClient
*c
) {
2147 zrankGenericCommand(c
, 0);
2150 void zrevrankCommand(redisClient
*c
) {
2151 zrankGenericCommand(c
, 1);