5 /*-----------------------------------------------------------------------------
7 *----------------------------------------------------------------------------*/
9 /* ZSETs are ordered sets using two data structures to hold the same elements
10 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
13 * The elements are added to a hash table mapping Redis objects to scores.
14 * At the same time the elements are added to a skip list mapping scores
15 * to Redis objects (so objects are sorted by scores in this "view"). */
17 /* This skiplist implementation is almost a C translation of the original
18 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
19 * Alternative to Balanced Trees", modified in three ways:
20 * a) this implementation allows for repeated scores.
21 * b) the comparison is not just by key (our 'score') but by satellite data.
22 * c) there is a back pointer, so it's a doubly linked list with the back
23 * pointers being only at "level 1". This allows to traverse the list
24 * from tail to head, useful for ZREVRANGE. */
26 zskiplistNode
*zslCreateNode(int level
, double score
, robj
*obj
) {
27 zskiplistNode
*zn
= zmalloc(sizeof(*zn
)+level
*sizeof(struct zskiplistLevel
));
33 zskiplist
*zslCreate(void) {
37 zsl
= zmalloc(sizeof(*zsl
));
40 zsl
->header
= zslCreateNode(ZSKIPLIST_MAXLEVEL
,0,NULL
);
41 for (j
= 0; j
< ZSKIPLIST_MAXLEVEL
; j
++) {
42 zsl
->header
->level
[j
].forward
= NULL
;
43 zsl
->header
->level
[j
].span
= 0;
45 zsl
->header
->backward
= NULL
;
50 void zslFreeNode(zskiplistNode
*node
) {
51 decrRefCount(node
->obj
);
55 void zslFree(zskiplist
*zsl
) {
56 zskiplistNode
*node
= zsl
->header
->level
[0].forward
, *next
;
60 next
= node
->level
[0].forward
;
67 /* Returns a random level for the new skiplist node we are going to create.
68 * The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL
69 * (both inclusive), with a powerlaw-alike distribution where higher
70 * levels are less likely to be returned. */
71 int zslRandomLevel(void) {
73 while ((random()&0xFFFF) < (ZSKIPLIST_P
* 0xFFFF))
75 return (level
<ZSKIPLIST_MAXLEVEL
) ? level
: ZSKIPLIST_MAXLEVEL
;
78 zskiplistNode
*zslInsert(zskiplist
*zsl
, double score
, robj
*obj
) {
79 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
80 unsigned int rank
[ZSKIPLIST_MAXLEVEL
];
83 redisAssert(!isnan(score
));
85 for (i
= zsl
->level
-1; i
>= 0; i
--) {
86 /* store rank that is crossed to reach the insert position */
87 rank
[i
] = i
== (zsl
->level
-1) ? 0 : rank
[i
+1];
88 while (x
->level
[i
].forward
&&
89 (x
->level
[i
].forward
->score
< score
||
90 (x
->level
[i
].forward
->score
== score
&&
91 compareStringObjects(x
->level
[i
].forward
->obj
,obj
) < 0))) {
92 rank
[i
] += x
->level
[i
].span
;
93 x
= x
->level
[i
].forward
;
97 /* we assume the key is not already inside, since we allow duplicated
98 * scores, and the re-insertion of score and redis object should never
99 * happen since the caller of zslInsert() should test in the hash table
100 * if the element is already inside or not. */
101 level
= zslRandomLevel();
102 if (level
> zsl
->level
) {
103 for (i
= zsl
->level
; i
< level
; i
++) {
105 update
[i
] = zsl
->header
;
106 update
[i
]->level
[i
].span
= zsl
->length
;
110 x
= zslCreateNode(level
,score
,obj
);
111 for (i
= 0; i
< level
; i
++) {
112 x
->level
[i
].forward
= update
[i
]->level
[i
].forward
;
113 update
[i
]->level
[i
].forward
= x
;
115 /* update span covered by update[i] as x is inserted here */
116 x
->level
[i
].span
= update
[i
]->level
[i
].span
- (rank
[0] - rank
[i
]);
117 update
[i
]->level
[i
].span
= (rank
[0] - rank
[i
]) + 1;
120 /* increment span for untouched levels */
121 for (i
= level
; i
< zsl
->level
; i
++) {
122 update
[i
]->level
[i
].span
++;
125 x
->backward
= (update
[0] == zsl
->header
) ? NULL
: update
[0];
126 if (x
->level
[0].forward
)
127 x
->level
[0].forward
->backward
= x
;
134 /* Internal function used by zslDelete, zslDeleteRangeByScore and zslDeleteRangeByRank */
135 void zslDeleteNode(zskiplist
*zsl
, zskiplistNode
*x
, zskiplistNode
**update
) {
137 for (i
= 0; i
< zsl
->level
; i
++) {
138 if (update
[i
]->level
[i
].forward
== x
) {
139 update
[i
]->level
[i
].span
+= x
->level
[i
].span
- 1;
140 update
[i
]->level
[i
].forward
= x
->level
[i
].forward
;
142 update
[i
]->level
[i
].span
-= 1;
145 if (x
->level
[0].forward
) {
146 x
->level
[0].forward
->backward
= x
->backward
;
148 zsl
->tail
= x
->backward
;
150 while(zsl
->level
> 1 && zsl
->header
->level
[zsl
->level
-1].forward
== NULL
)
155 /* Delete an element with matching score/object from the skiplist. */
156 int zslDelete(zskiplist
*zsl
, double score
, robj
*obj
) {
157 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
161 for (i
= zsl
->level
-1; i
>= 0; i
--) {
162 while (x
->level
[i
].forward
&&
163 (x
->level
[i
].forward
->score
< score
||
164 (x
->level
[i
].forward
->score
== score
&&
165 compareStringObjects(x
->level
[i
].forward
->obj
,obj
) < 0)))
166 x
= x
->level
[i
].forward
;
169 /* We may have multiple elements with the same score, what we need
170 * is to find the element with both the right score and object. */
171 x
= x
->level
[0].forward
;
172 if (x
&& score
== x
->score
&& equalStringObjects(x
->obj
,obj
)) {
173 zslDeleteNode(zsl
, x
, update
);
177 return 0; /* not found */
179 return 0; /* not found */
182 static int zslValueGteMin(double value
, zrangespec
*spec
) {
183 return spec
->minex
? (value
> spec
->min
) : (value
>= spec
->min
);
186 static int zslValueLteMax(double value
, zrangespec
*spec
) {
187 return spec
->maxex
? (value
< spec
->max
) : (value
<= spec
->max
);
190 /* Returns whether any part of the zset is in range. */
191 int zslIsInRange(zskiplist
*zsl
, zrangespec
*range
) {
194 /* Test for ranges that will always be empty. */
195 if (range
->min
> range
->max
||
196 (range
->min
== range
->max
&& (range
->minex
|| range
->maxex
)))
199 if (x
== NULL
|| !zslValueGteMin(x
->score
,range
))
201 x
= zsl
->header
->level
[0].forward
;
202 if (x
== NULL
|| !zslValueLteMax(x
->score
,range
))
207 /* Find the first node that is contained in the specified range.
208 * Returns NULL when no element is contained in the range. */
209 zskiplistNode
*zslFirstInRange(zskiplist
*zsl
, zrangespec range
) {
213 /* If everything is out of range, return early. */
214 if (!zslIsInRange(zsl
,&range
)) return NULL
;
217 for (i
= zsl
->level
-1; i
>= 0; i
--) {
218 /* Go forward while *OUT* of range. */
219 while (x
->level
[i
].forward
&&
220 !zslValueGteMin(x
->level
[i
].forward
->score
,&range
))
221 x
= x
->level
[i
].forward
;
224 /* This is an inner range, so the next node cannot be NULL. */
225 x
= x
->level
[0].forward
;
226 redisAssert(x
!= NULL
);
228 /* Check if score <= max. */
229 if (!zslValueLteMax(x
->score
,&range
)) return NULL
;
233 /* Find the last node that is contained in the specified range.
234 * Returns NULL when no element is contained in the range. */
235 zskiplistNode
*zslLastInRange(zskiplist
*zsl
, zrangespec range
) {
239 /* If everything is out of range, return early. */
240 if (!zslIsInRange(zsl
,&range
)) return NULL
;
243 for (i
= zsl
->level
-1; i
>= 0; i
--) {
244 /* Go forward while *IN* range. */
245 while (x
->level
[i
].forward
&&
246 zslValueLteMax(x
->level
[i
].forward
->score
,&range
))
247 x
= x
->level
[i
].forward
;
250 /* This is an inner range, so this node cannot be NULL. */
251 redisAssert(x
!= NULL
);
253 /* Check if score >= min. */
254 if (!zslValueGteMin(x
->score
,&range
)) return NULL
;
258 /* Delete all the elements with score between min and max from the skiplist.
259 * Min and max are inclusive unless range.minex/range.maxex mark them exclusive, so a score >= min && score <= max is deleted.
260 * Note that this function takes the reference to the hash table view of the
261 * sorted set, in order to remove the elements from the hash table too. */
262 unsigned long zslDeleteRangeByScore(zskiplist
*zsl
, zrangespec range
, dict
*dict
) {
263 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
264 unsigned long removed
= 0;
268 for (i
= zsl
->level
-1; i
>= 0; i
--) {
269 while (x
->level
[i
].forward
&& (range
.minex
?
270 x
->level
[i
].forward
->score
<= range
.min
:
271 x
->level
[i
].forward
->score
< range
.min
))
272 x
= x
->level
[i
].forward
;
276 /* Current node is the last with score < or <= min. */
277 x
= x
->level
[0].forward
;
279 /* Delete nodes while in range. */
280 while (x
&& (range
.maxex
? x
->score
< range
.max
: x
->score
<= range
.max
)) {
281 zskiplistNode
*next
= x
->level
[0].forward
;
282 zslDeleteNode(zsl
,x
,update
);
283 dictDelete(dict
,x
->obj
);
291 /* Delete all the elements with rank between start and end from the skiplist.
292 * Start and end are inclusive. Note that start and end need to be 1-based */
293 unsigned long zslDeleteRangeByRank(zskiplist
*zsl
, unsigned int start
, unsigned int end
, dict
*dict
) {
294 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
295 unsigned long traversed
= 0, removed
= 0;
299 for (i
= zsl
->level
-1; i
>= 0; i
--) {
300 while (x
->level
[i
].forward
&& (traversed
+ x
->level
[i
].span
) < start
) {
301 traversed
+= x
->level
[i
].span
;
302 x
= x
->level
[i
].forward
;
308 x
= x
->level
[0].forward
;
309 while (x
&& traversed
<= end
) {
310 zskiplistNode
*next
= x
->level
[0].forward
;
311 zslDeleteNode(zsl
,x
,update
);
312 dictDelete(dict
,x
->obj
);
321 /* Find the rank for an element by both score and key.
322 * Returns 0 when the element cannot be found, rank otherwise.
323 * Note that the rank is 1-based due to the span of zsl->header to the
325 unsigned long zslGetRank(zskiplist
*zsl
, double score
, robj
*o
) {
327 unsigned long rank
= 0;
331 for (i
= zsl
->level
-1; i
>= 0; i
--) {
332 while (x
->level
[i
].forward
&&
333 (x
->level
[i
].forward
->score
< score
||
334 (x
->level
[i
].forward
->score
== score
&&
335 compareStringObjects(x
->level
[i
].forward
->obj
,o
) <= 0))) {
336 rank
+= x
->level
[i
].span
;
337 x
= x
->level
[i
].forward
;
340 /* x might be equal to zsl->header, so test if obj is non-NULL */
341 if (x
->obj
&& equalStringObjects(x
->obj
,o
)) {
348 /* Finds an element by its rank. The rank argument needs to be 1-based. */
349 zskiplistNode
* zslGetElementByRank(zskiplist
*zsl
, unsigned long rank
) {
351 unsigned long traversed
= 0;
355 for (i
= zsl
->level
-1; i
>= 0; i
--) {
356 while (x
->level
[i
].forward
&& (traversed
+ x
->level
[i
].span
) <= rank
)
358 traversed
+= x
->level
[i
].span
;
359 x
= x
->level
[i
].forward
;
361 if (traversed
== rank
) {
368 /* Populate the rangespec according to the objects min and max. */
369 static int zslParseRange(robj
*min
, robj
*max
, zrangespec
*spec
) {
371 spec
->minex
= spec
->maxex
= 0;
373 /* Parse the min-max interval. If one of the values is prefixed
374 * by the "(" character, it's considered "open". For instance
375 * ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max
376 * ZRANGEBYSCORE zset 1.5 2.5 will instead match min <= x <= max */
377 if (min
->encoding
== REDIS_ENCODING_INT
) {
378 spec
->min
= (long)min
->ptr
;
380 if (((char*)min
->ptr
)[0] == '(') {
381 spec
->min
= strtod((char*)min
->ptr
+1,&eptr
);
382 if (eptr
[0] != '\0' || isnan(spec
->min
)) return REDIS_ERR
;
385 spec
->min
= strtod((char*)min
->ptr
,&eptr
);
386 if (eptr
[0] != '\0' || isnan(spec
->min
)) return REDIS_ERR
;
389 if (max
->encoding
== REDIS_ENCODING_INT
) {
390 spec
->max
= (long)max
->ptr
;
392 if (((char*)max
->ptr
)[0] == '(') {
393 spec
->max
= strtod((char*)max
->ptr
+1,&eptr
);
394 if (eptr
[0] != '\0' || isnan(spec
->max
)) return REDIS_ERR
;
397 spec
->max
= strtod((char*)max
->ptr
,&eptr
);
398 if (eptr
[0] != '\0' || isnan(spec
->max
)) return REDIS_ERR
;
405 /*-----------------------------------------------------------------------------
406 * Ziplist-backed sorted set API
407 *----------------------------------------------------------------------------*/
409 double zzlGetScore(unsigned char *sptr
) {
416 redisAssert(sptr
!= NULL
);
417 redisAssert(ziplistGet(sptr
,&vstr
,&vlen
,&vlong
));
420 memcpy(buf
,vstr
,vlen
);
422 score
= strtod(buf
,NULL
);
430 /* Compare element in sorted set with given element. */
431 int zzlCompareElements(unsigned char *eptr
, unsigned char *cstr
, unsigned int clen
) {
435 unsigned char vbuf
[32];
438 redisAssert(ziplistGet(eptr
,&vstr
,&vlen
,&vlong
));
440 /* Store string representation of long long in buf. */
441 vlen
= ll2string((char*)vbuf
,sizeof(vbuf
),vlong
);
445 minlen
= (vlen
< clen
) ? vlen
: clen
;
446 cmp
= memcmp(vstr
,cstr
,minlen
);
447 if (cmp
== 0) return vlen
-clen
;
451 unsigned int zzlLength(unsigned char *zl
) {
452 return ziplistLen(zl
)/2;
455 /* Move to next entry based on the values in eptr and sptr. Both are set to
456 * NULL when there is no next entry. */
457 void zzlNext(unsigned char *zl
, unsigned char **eptr
, unsigned char **sptr
) {
458 unsigned char *_eptr
, *_sptr
;
459 redisAssert(*eptr
!= NULL
&& *sptr
!= NULL
);
461 _eptr
= ziplistNext(zl
,*sptr
);
463 _sptr
= ziplistNext(zl
,_eptr
);
464 redisAssert(_sptr
!= NULL
);
474 /* Move to the previous entry based on the values in eptr and sptr. Both are
475 * set to NULL when there is no previous entry. */
476 void zzlPrev(unsigned char *zl
, unsigned char **eptr
, unsigned char **sptr
) {
477 unsigned char *_eptr
, *_sptr
;
478 redisAssert(*eptr
!= NULL
&& *sptr
!= NULL
);
480 _sptr
= ziplistPrev(zl
,*eptr
);
482 _eptr
= ziplistPrev(zl
,_sptr
);
483 redisAssert(_eptr
!= NULL
);
485 /* No previous entry. */
493 /* Returns whether any part of the zset is in range. Should only be used
494 * internally by zzlFirstInRange and zzlLastInRange. */
495 int zzlIsInRange(unsigned char *zl
, zrangespec
*range
) {
499 /* Test for ranges that will always be empty. */
500 if (range
->min
> range
->max
||
501 (range
->min
== range
->max
&& (range
->minex
|| range
->maxex
)))
504 p
= ziplistIndex(zl
,-1); /* Last score. */
505 if (p
== NULL
) return 0; /* Empty sorted set */
506 score
= zzlGetScore(p
);
507 if (!zslValueGteMin(score
,range
))
510 p
= ziplistIndex(zl
,1); /* First score. */
511 redisAssert(p
!= NULL
);
512 score
= zzlGetScore(p
);
513 if (!zslValueLteMax(score
,range
))
519 /* Find pointer to the first element contained in the specified range.
520 * Returns NULL when no element is contained in the range. */
521 unsigned char *zzlFirstInRange(unsigned char *zl
, zrangespec range
) {
522 unsigned char *eptr
= ziplistIndex(zl
,0), *sptr
;
525 /* If everything is out of range, return early. */
526 if (!zzlIsInRange(zl
,&range
)) return NULL
;
528 while (eptr
!= NULL
) {
529 sptr
= ziplistNext(zl
,eptr
);
530 redisAssert(sptr
!= NULL
);
532 score
= zzlGetScore(sptr
);
533 if (zslValueGteMin(score
,&range
)) {
534 /* Check if score <= max. */
535 if (zslValueLteMax(score
,&range
))
540 /* Move to next element. */
541 eptr
= ziplistNext(zl
,sptr
);
547 /* Find pointer to the last element contained in the specified range.
548 * Returns NULL when no element is contained in the range. */
549 unsigned char *zzlLastInRange(unsigned char *zl
, zrangespec range
) {
550 unsigned char *eptr
= ziplistIndex(zl
,-2), *sptr
;
553 /* If everything is out of range, return early. */
554 if (!zzlIsInRange(zl
,&range
)) return NULL
;
556 while (eptr
!= NULL
) {
557 sptr
= ziplistNext(zl
,eptr
);
558 redisAssert(sptr
!= NULL
);
560 score
= zzlGetScore(sptr
);
561 if (zslValueLteMax(score
,&range
)) {
562 /* Check if score >= min. */
563 if (zslValueGteMin(score
,&range
))
568 /* Move to previous element by moving to the score of previous element.
569 * When this returns NULL, we know there also is no element. */
570 sptr
= ziplistPrev(zl
,eptr
);
572 redisAssert((eptr
= ziplistPrev(zl
,sptr
)) != NULL
);
580 unsigned char *zzlFind(unsigned char *zl
, robj
*ele
, double *score
) {
581 unsigned char *eptr
= ziplistIndex(zl
,0), *sptr
;
583 ele
= getDecodedObject(ele
);
584 while (eptr
!= NULL
) {
585 sptr
= ziplistNext(zl
,eptr
);
586 redisAssertWithInfo(NULL
,ele
,sptr
!= NULL
);
588 if (ziplistCompare(eptr
,ele
->ptr
,sdslen(ele
->ptr
))) {
589 /* Matching element, pull out score. */
590 if (score
!= NULL
) *score
= zzlGetScore(sptr
);
595 /* Move to next element. */
596 eptr
= ziplistNext(zl
,sptr
);
603 /* Delete (element,score) pair from ziplist. Use local copy of eptr because we
604 * don't want to modify the one given as argument. */
605 unsigned char *zzlDelete(unsigned char *zl
, unsigned char *eptr
) {
606 unsigned char *p
= eptr
;
608 /* TODO: add function to ziplist API to delete N elements from offset. */
609 zl
= ziplistDelete(zl
,&p
);
610 zl
= ziplistDelete(zl
,&p
);
614 unsigned char *zzlInsertAt(unsigned char *zl
, unsigned char *eptr
, robj
*ele
, double score
) {
620 redisAssertWithInfo(NULL
,ele
,ele
->encoding
== REDIS_ENCODING_RAW
);
621 scorelen
= d2string(scorebuf
,sizeof(scorebuf
),score
);
623 zl
= ziplistPush(zl
,ele
->ptr
,sdslen(ele
->ptr
),ZIPLIST_TAIL
);
624 zl
= ziplistPush(zl
,(unsigned char*)scorebuf
,scorelen
,ZIPLIST_TAIL
);
626 /* Keep offset relative to zl, as it might be re-allocated. */
628 zl
= ziplistInsert(zl
,eptr
,ele
->ptr
,sdslen(ele
->ptr
));
631 /* Insert score after the element. */
632 redisAssertWithInfo(NULL
,ele
,(sptr
= ziplistNext(zl
,eptr
)) != NULL
);
633 zl
= ziplistInsert(zl
,sptr
,(unsigned char*)scorebuf
,scorelen
);
639 /* Insert (element,score) pair in ziplist. This function assumes the element is
640 * not yet present in the list. */
641 unsigned char *zzlInsert(unsigned char *zl
, robj
*ele
, double score
) {
642 unsigned char *eptr
= ziplistIndex(zl
,0), *sptr
;
645 ele
= getDecodedObject(ele
);
646 while (eptr
!= NULL
) {
647 sptr
= ziplistNext(zl
,eptr
);
648 redisAssertWithInfo(NULL
,ele
,sptr
!= NULL
);
649 s
= zzlGetScore(sptr
);
652 /* First element with score larger than score for element to be
653 * inserted. This means we should take its spot in the list to
654 * maintain ordering. */
655 zl
= zzlInsertAt(zl
,eptr
,ele
,score
);
657 } else if (s
== score
) {
658 /* Ensure lexicographical ordering for elements. */
659 if (zzlCompareElements(eptr
,ele
->ptr
,sdslen(ele
->ptr
)) > 0) {
660 zl
= zzlInsertAt(zl
,eptr
,ele
,score
);
665 /* Move to next element. */
666 eptr
= ziplistNext(zl
,sptr
);
669 /* Push on tail of list when it was not yet inserted. */
671 zl
= zzlInsertAt(zl
,NULL
,ele
,score
);
677 unsigned char *zzlDeleteRangeByScore(unsigned char *zl
, zrangespec range
, unsigned long *deleted
) {
678 unsigned char *eptr
, *sptr
;
680 unsigned long num
= 0;
682 if (deleted
!= NULL
) *deleted
= 0;
684 eptr
= zzlFirstInRange(zl
,range
);
685 if (eptr
== NULL
) return zl
;
687 /* When the tail of the ziplist is deleted, eptr will point to the sentinel
688 * byte and ziplistNext will return NULL. */
689 while ((sptr
= ziplistNext(zl
,eptr
)) != NULL
) {
690 score
= zzlGetScore(sptr
);
691 if (zslValueLteMax(score
,&range
)) {
692 /* Delete both the element and the score. */
693 zl
= ziplistDelete(zl
,&eptr
);
694 zl
= ziplistDelete(zl
,&eptr
);
697 /* No longer in range. */
702 if (deleted
!= NULL
) *deleted
= num
;
706 /* Delete all the elements with rank between start and end from the ziplist.
707 * Start and end are inclusive. Note that start and end need to be 1-based */
708 unsigned char *zzlDeleteRangeByRank(unsigned char *zl
, unsigned int start
, unsigned int end
, unsigned long *deleted
) {
709 unsigned int num
= (end
-start
)+1;
710 if (deleted
) *deleted
= num
;
711 zl
= ziplistDeleteRange(zl
,2*(start
-1),2*num
);
715 /*-----------------------------------------------------------------------------
716 * Common sorted set API
717 *----------------------------------------------------------------------------*/
719 unsigned int zsetLength(robj
*zobj
) {
721 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
722 length
= zzlLength(zobj
->ptr
);
723 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
724 length
= ((zset
*)zobj
->ptr
)->zsl
->length
;
726 redisPanic("Unknown sorted set encoding");
731 void zsetConvert(robj
*zobj
, int encoding
) {
733 zskiplistNode
*node
, *next
;
737 if (zobj
->encoding
== encoding
) return;
738 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
739 unsigned char *zl
= zobj
->ptr
;
740 unsigned char *eptr
, *sptr
;
745 if (encoding
!= REDIS_ENCODING_SKIPLIST
)
746 redisPanic("Unknown target encoding");
748 zs
= zmalloc(sizeof(*zs
));
749 zs
->dict
= dictCreate(&zsetDictType
,NULL
);
750 zs
->zsl
= zslCreate();
752 eptr
= ziplistIndex(zl
,0);
753 redisAssertWithInfo(NULL
,zobj
,eptr
!= NULL
);
754 sptr
= ziplistNext(zl
,eptr
);
755 redisAssertWithInfo(NULL
,zobj
,sptr
!= NULL
);
757 while (eptr
!= NULL
) {
758 score
= zzlGetScore(sptr
);
759 redisAssertWithInfo(NULL
,zobj
,ziplistGet(eptr
,&vstr
,&vlen
,&vlong
));
761 ele
= createStringObjectFromLongLong(vlong
);
763 ele
= createStringObject((char*)vstr
,vlen
);
765 /* Has incremented refcount since it was just created. */
766 node
= zslInsert(zs
->zsl
,score
,ele
);
767 redisAssertWithInfo(NULL
,zobj
,dictAdd(zs
->dict
,ele
,&node
->score
) == DICT_OK
);
768 incrRefCount(ele
); /* Added to dictionary. */
769 zzlNext(zl
,&eptr
,&sptr
);
774 zobj
->encoding
= REDIS_ENCODING_SKIPLIST
;
775 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
776 unsigned char *zl
= ziplistNew();
778 if (encoding
!= REDIS_ENCODING_ZIPLIST
)
779 redisPanic("Unknown target encoding");
781 /* Approach similar to zslFree(), since we want to free the skiplist at
782 * the same time as creating the ziplist. */
784 dictRelease(zs
->dict
);
785 node
= zs
->zsl
->header
->level
[0].forward
;
786 zfree(zs
->zsl
->header
);
790 ele
= getDecodedObject(node
->obj
);
791 zl
= zzlInsertAt(zl
,NULL
,ele
,node
->score
);
794 next
= node
->level
[0].forward
;
801 zobj
->encoding
= REDIS_ENCODING_ZIPLIST
;
803 redisPanic("Unknown sorted set encoding");
807 /*-----------------------------------------------------------------------------
808 * Sorted set commands
809 *----------------------------------------------------------------------------*/
811 /* This generic command implements both ZADD and ZINCRBY. */
812 void zaddGenericCommand(redisClient
*c
, int incr
) {
813 static char *nanerr
= "resulting score is not a number (NaN)";
814 robj
*key
= c
->argv
[1];
818 double score
= 0, *scores
, curscore
= 0.0;
819 int j
, elements
= (c
->argc
-2)/2;
823 addReply(c
,shared
.syntaxerr
);
827 /* Start parsing all the scores, we need to emit any syntax error
828 * before executing additions to the sorted set, as the command should
829 * either execute fully or nothing at all. */
830 scores
= zmalloc(sizeof(double)*elements
);
831 for (j
= 0; j
< elements
; j
++) {
832 if (getDoubleFromObjectOrReply(c
,c
->argv
[2+j
*2],&scores
[j
],NULL
)
840 /* Lookup the key and create the sorted set if it does not exist. */
841 zobj
= lookupKeyWrite(c
->db
,key
);
843 if (server
.zset_max_ziplist_entries
== 0 ||
844 server
.zset_max_ziplist_value
< sdslen(c
->argv
[3]->ptr
))
846 zobj
= createZsetObject();
848 zobj
= createZsetZiplistObject();
850 dbAdd(c
->db
,key
,zobj
);
852 if (zobj
->type
!= REDIS_ZSET
) {
853 addReply(c
,shared
.wrongtypeerr
);
859 for (j
= 0; j
< elements
; j
++) {
862 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
865 /* Prefer non-encoded element when dealing with ziplists. */
866 ele
= c
->argv
[3+j
*2];
867 if ((eptr
= zzlFind(zobj
->ptr
,ele
,&curscore
)) != NULL
) {
871 addReplyError(c
,nanerr
);
872 /* Don't need to check if the sorted set is empty
873 * because we know it has at least one element. */
879 /* Remove and re-insert when score changed. */
880 if (score
!= curscore
) {
881 zobj
->ptr
= zzlDelete(zobj
->ptr
,eptr
);
882 zobj
->ptr
= zzlInsert(zobj
->ptr
,ele
,score
);
884 signalModifiedKey(c
->db
,key
);
888 /* Optimize: check if the element is too large or the list
889 * becomes too long *before* executing zzlInsert. */
890 zobj
->ptr
= zzlInsert(zobj
->ptr
,ele
,score
);
891 if (zzlLength(zobj
->ptr
) > server
.zset_max_ziplist_entries
)
892 zsetConvert(zobj
,REDIS_ENCODING_SKIPLIST
);
893 if (sdslen(ele
->ptr
) > server
.zset_max_ziplist_value
)
894 zsetConvert(zobj
,REDIS_ENCODING_SKIPLIST
);
896 signalModifiedKey(c
->db
,key
);
900 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
901 zset
*zs
= zobj
->ptr
;
902 zskiplistNode
*znode
;
905 ele
= c
->argv
[3+j
*2] = tryObjectEncoding(c
->argv
[3+j
*2]);
906 de
= dictFind(zs
->dict
,ele
);
908 curobj
= dictGetKey(de
);
909 curscore
= *(double*)dictGetVal(de
);
914 addReplyError(c
,nanerr
);
915 /* Don't need to check if the sorted set is empty
916 * because we know it has at least one element. */
922 /* Remove and re-insert when score changed. We can safely
923 * delete the key object from the skiplist, since the
924 * dictionary still has a reference to it. */
925 if (score
!= curscore
) {
926 redisAssertWithInfo(c
,curobj
,zslDelete(zs
->zsl
,curscore
,curobj
));
927 znode
= zslInsert(zs
->zsl
,score
,curobj
);
928 incrRefCount(curobj
); /* Re-inserted in skiplist. */
929 dictGetVal(de
) = &znode
->score
; /* Update score ptr. */
931 signalModifiedKey(c
->db
,key
);
935 znode
= zslInsert(zs
->zsl
,score
,ele
);
936 incrRefCount(ele
); /* Inserted in skiplist. */
937 redisAssertWithInfo(c
,NULL
,dictAdd(zs
->dict
,ele
,&znode
->score
) == DICT_OK
);
938 incrRefCount(ele
); /* Added to dictionary. */
940 signalModifiedKey(c
->db
,key
);
945 redisPanic("Unknown sorted set encoding");
949 if (incr
) /* ZINCRBY */
950 addReplyDouble(c
,score
);
952 addReplyLongLong(c
,added
);
955 void zaddCommand(redisClient
*c
) {
956 zaddGenericCommand(c
,0);
959 void zincrbyCommand(redisClient
*c
) {
960 zaddGenericCommand(c
,1);
963 void zremCommand(redisClient
*c
) {
964 robj
*key
= c
->argv
[1];
968 if ((zobj
= lookupKeyWriteOrReply(c
,key
,shared
.czero
)) == NULL
||
969 checkType(c
,zobj
,REDIS_ZSET
)) return;
971 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
974 for (j
= 2; j
< c
->argc
; j
++) {
975 if ((eptr
= zzlFind(zobj
->ptr
,c
->argv
[j
],NULL
)) != NULL
) {
977 zobj
->ptr
= zzlDelete(zobj
->ptr
,eptr
);
978 if (zzlLength(zobj
->ptr
) == 0) {
984 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
985 zset
*zs
= zobj
->ptr
;
989 for (j
= 2; j
< c
->argc
; j
++) {
990 de
= dictFind(zs
->dict
,c
->argv
[j
]);
994 /* Delete from the skiplist */
995 score
= *(double*)dictGetVal(de
);
996 redisAssertWithInfo(c
,c
->argv
[j
],zslDelete(zs
->zsl
,score
,c
->argv
[j
]));
998 /* Delete from the hash table */
999 dictDelete(zs
->dict
,c
->argv
[j
]);
1000 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
1001 if (dictSize(zs
->dict
) == 0) {
1002 dbDelete(c
->db
,key
);
1008 redisPanic("Unknown sorted set encoding");
1012 signalModifiedKey(c
->db
,key
);
1013 server
.dirty
+= deleted
;
1015 addReplyLongLong(c
,deleted
);
1018 void zremrangebyscoreCommand(redisClient
*c
) {
1019 robj
*key
= c
->argv
[1];
1022 unsigned long deleted
;
1024 /* Parse the range arguments. */
1025 if (zslParseRange(c
->argv
[2],c
->argv
[3],&range
) != REDIS_OK
) {
1026 addReplyError(c
,"min or max is not a float");
1030 if ((zobj
= lookupKeyWriteOrReply(c
,key
,shared
.czero
)) == NULL
||
1031 checkType(c
,zobj
,REDIS_ZSET
)) return;
1033 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1034 zobj
->ptr
= zzlDeleteRangeByScore(zobj
->ptr
,range
,&deleted
);
1035 if (zzlLength(zobj
->ptr
) == 0) dbDelete(c
->db
,key
);
1036 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
1037 zset
*zs
= zobj
->ptr
;
1038 deleted
= zslDeleteRangeByScore(zs
->zsl
,range
,zs
->dict
);
1039 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
1040 if (dictSize(zs
->dict
) == 0) dbDelete(c
->db
,key
);
1042 redisPanic("Unknown sorted set encoding");
1045 if (deleted
) signalModifiedKey(c
->db
,key
);
1046 server
.dirty
+= deleted
;
1047 addReplyLongLong(c
,deleted
);
1050 void zremrangebyrankCommand(redisClient
*c
) {
1051 robj
*key
= c
->argv
[1];
1056 unsigned long deleted
;
1058 if ((getLongFromObjectOrReply(c
, c
->argv
[2], &start
, NULL
) != REDIS_OK
) ||
1059 (getLongFromObjectOrReply(c
, c
->argv
[3], &end
, NULL
) != REDIS_OK
)) return;
1061 if ((zobj
= lookupKeyWriteOrReply(c
,key
,shared
.czero
)) == NULL
||
1062 checkType(c
,zobj
,REDIS_ZSET
)) return;
1064 /* Sanitize indexes. */
1065 llen
= zsetLength(zobj
);
1066 if (start
< 0) start
= llen
+start
;
1067 if (end
< 0) end
= llen
+end
;
1068 if (start
< 0) start
= 0;
1070 /* Invariant: start >= 0, so this test will be true when end < 0.
1071 * The range is empty when start > end or start >= length. */
1072 if (start
> end
|| start
>= llen
) {
1073 addReply(c
,shared
.czero
);
1076 if (end
>= llen
) end
= llen
-1;
1078 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1079 /* Correct for 1-based rank. */
1080 zobj
->ptr
= zzlDeleteRangeByRank(zobj
->ptr
,start
+1,end
+1,&deleted
);
1081 if (zzlLength(zobj
->ptr
) == 0) dbDelete(c
->db
,key
);
1082 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
1083 zset
*zs
= zobj
->ptr
;
1085 /* Correct for 1-based rank. */
1086 deleted
= zslDeleteRangeByRank(zs
->zsl
,start
+1,end
+1,zs
->dict
);
1087 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
1088 if (dictSize(zs
->dict
) == 0) dbDelete(c
->db
,key
);
1090 redisPanic("Unknown sorted set encoding");
1093 if (deleted
) signalModifiedKey(c
->db
,key
);
1094 server
.dirty
+= deleted
;
1095 addReplyLongLong(c
,deleted
);
1100 int type
; /* Set, sorted set */
1105 /* Set iterators. */
1118 /* Sorted set iterators. */
1122 unsigned char *eptr
, *sptr
;
1126 zskiplistNode
*node
;
1133 /* Use dirty flags for pointers that need to be cleaned up in the next
1134 * iteration over the zsetopval. The dirty flag for the long long value is
1135 * special, since long long values don't need cleanup. Instead, it means that
1136 * we already checked that "ell" holds a long long, or tried to convert another
1137 * representation into a long long value. When this was successful,
1138 * OPVAL_VALID_LL is set as well. */
1139 #define OPVAL_DIRTY_ROBJ 1
1140 #define OPVAL_DIRTY_LL 2
1141 #define OPVAL_VALID_LL 4
1143 /* Store value retrieved from the iterator. */
1146 unsigned char _buf
[32]; /* Private buffer. */
1148 unsigned char *estr
;
1154 typedef union _iterset iterset
;
1155 typedef union _iterzset iterzset
;
1157 void zuiInitIterator(zsetopsrc
*op
) {
1158 if (op
->subject
== NULL
)
1161 if (op
->type
== REDIS_SET
) {
1162 iterset
*it
= &op
->iter
.set
;
1163 if (op
->encoding
== REDIS_ENCODING_INTSET
) {
1164 it
->is
.is
= op
->subject
->ptr
;
1166 } else if (op
->encoding
== REDIS_ENCODING_HT
) {
1167 it
->ht
.dict
= op
->subject
->ptr
;
1168 it
->ht
.di
= dictGetIterator(op
->subject
->ptr
);
1169 it
->ht
.de
= dictNext(it
->ht
.di
);
1171 redisPanic("Unknown set encoding");
1173 } else if (op
->type
== REDIS_ZSET
) {
1174 iterzset
*it
= &op
->iter
.zset
;
1175 if (op
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1176 it
->zl
.zl
= op
->subject
->ptr
;
1177 it
->zl
.eptr
= ziplistIndex(it
->zl
.zl
,0);
1178 if (it
->zl
.eptr
!= NULL
) {
1179 it
->zl
.sptr
= ziplistNext(it
->zl
.zl
,it
->zl
.eptr
);
1180 redisAssert(it
->zl
.sptr
!= NULL
);
1182 } else if (op
->encoding
== REDIS_ENCODING_SKIPLIST
) {
1183 it
->sl
.zs
= op
->subject
->ptr
;
1184 it
->sl
.node
= it
->sl
.zs
->zsl
->header
->level
[0].forward
;
1186 redisPanic("Unknown sorted set encoding");
1189 redisPanic("Unsupported type");
1193 void zuiClearIterator(zsetopsrc
*op
) {
1194 if (op
->subject
== NULL
)
1197 if (op
->type
== REDIS_SET
) {
1198 iterset
*it
= &op
->iter
.set
;
1199 if (op
->encoding
== REDIS_ENCODING_INTSET
) {
1200 REDIS_NOTUSED(it
); /* skip */
1201 } else if (op
->encoding
== REDIS_ENCODING_HT
) {
1202 dictReleaseIterator(it
->ht
.di
);
1204 redisPanic("Unknown set encoding");
1206 } else if (op
->type
== REDIS_ZSET
) {
1207 iterzset
*it
= &op
->iter
.zset
;
1208 if (op
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1209 REDIS_NOTUSED(it
); /* skip */
1210 } else if (op
->encoding
== REDIS_ENCODING_SKIPLIST
) {
1211 REDIS_NOTUSED(it
); /* skip */
1213 redisPanic("Unknown sorted set encoding");
1216 redisPanic("Unsupported type");
1220 int zuiLength(zsetopsrc
*op
) {
1221 if (op
->subject
== NULL
)
1224 if (op
->type
== REDIS_SET
) {
1225 iterset
*it
= &op
->iter
.set
;
1226 if (op
->encoding
== REDIS_ENCODING_INTSET
) {
1227 return intsetLen(it
->is
.is
);
1228 } else if (op
->encoding
== REDIS_ENCODING_HT
) {
1229 return dictSize(it
->ht
.dict
);
1231 redisPanic("Unknown set encoding");
1233 } else if (op
->type
== REDIS_ZSET
) {
1234 iterzset
*it
= &op
->iter
.zset
;
1235 if (op
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1236 return zzlLength(it
->zl
.zl
);
1237 } else if (op
->encoding
== REDIS_ENCODING_SKIPLIST
) {
1238 return it
->sl
.zs
->zsl
->length
;
1240 redisPanic("Unknown sorted set encoding");
1243 redisPanic("Unsupported type");
1247 /* Check if the current value is valid. If so, store it in the passed structure
1248 * and move to the next element. If not valid, this means we have reached the
1249 * end of the structure and can abort. */
1250 int zuiNext(zsetopsrc
*op
, zsetopval
*val
) {
1251 if (op
->subject
== NULL
)
1254 if (val
->flags
& OPVAL_DIRTY_ROBJ
)
1255 decrRefCount(val
->ele
);
1257 memset(val
,0,sizeof(zsetopval
));
1259 if (op
->type
== REDIS_SET
) {
1260 iterset
*it
= &op
->iter
.set
;
1261 if (op
->encoding
== REDIS_ENCODING_INTSET
) {
1262 int64_t ell
= val
->ell
;
1264 if (!intsetGet(it
->is
.is
,it
->is
.ii
,&ell
))
1268 /* Move to next element. */
1270 } else if (op
->encoding
== REDIS_ENCODING_HT
) {
1271 if (it
->ht
.de
== NULL
)
1273 val
->ele
= dictGetKey(it
->ht
.de
);
1276 /* Move to next element. */
1277 it
->ht
.de
= dictNext(it
->ht
.di
);
1279 redisPanic("Unknown set encoding");
1281 } else if (op
->type
== REDIS_ZSET
) {
1282 iterzset
*it
= &op
->iter
.zset
;
1283 if (op
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1284 /* No need to check both, but better be explicit. */
1285 if (it
->zl
.eptr
== NULL
|| it
->zl
.sptr
== NULL
)
1287 redisAssert(ziplistGet(it
->zl
.eptr
,&val
->estr
,&val
->elen
,&val
->ell
));
1288 val
->score
= zzlGetScore(it
->zl
.sptr
);
1290 /* Move to next element. */
1291 zzlNext(it
->zl
.zl
,&it
->zl
.eptr
,&it
->zl
.sptr
);
1292 } else if (op
->encoding
== REDIS_ENCODING_SKIPLIST
) {
1293 if (it
->sl
.node
== NULL
)
1295 val
->ele
= it
->sl
.node
->obj
;
1296 val
->score
= it
->sl
.node
->score
;
1298 /* Move to next element. */
1299 it
->sl
.node
= it
->sl
.node
->level
[0].forward
;
1301 redisPanic("Unknown sorted set encoding");
1304 redisPanic("Unsupported type");
1309 int zuiLongLongFromValue(zsetopval
*val
) {
1310 if (!(val
->flags
& OPVAL_DIRTY_LL
)) {
1311 val
->flags
|= OPVAL_DIRTY_LL
;
1313 if (val
->ele
!= NULL
) {
1314 if (val
->ele
->encoding
== REDIS_ENCODING_INT
) {
1315 val
->ell
= (long)val
->ele
->ptr
;
1316 val
->flags
|= OPVAL_VALID_LL
;
1317 } else if (val
->ele
->encoding
== REDIS_ENCODING_RAW
) {
1318 if (string2ll(val
->ele
->ptr
,sdslen(val
->ele
->ptr
),&val
->ell
))
1319 val
->flags
|= OPVAL_VALID_LL
;
1321 redisPanic("Unsupported element encoding");
1323 } else if (val
->estr
!= NULL
) {
1324 if (string2ll((char*)val
->estr
,val
->elen
,&val
->ell
))
1325 val
->flags
|= OPVAL_VALID_LL
;
1327 /* The long long was already set, flag as valid. */
1328 val
->flags
|= OPVAL_VALID_LL
;
1331 return val
->flags
& OPVAL_VALID_LL
;
1334 robj
*zuiObjectFromValue(zsetopval
*val
) {
1335 if (val
->ele
== NULL
) {
1336 if (val
->estr
!= NULL
) {
1337 val
->ele
= createStringObject((char*)val
->estr
,val
->elen
);
1339 val
->ele
= createStringObjectFromLongLong(val
->ell
);
1341 val
->flags
|= OPVAL_DIRTY_ROBJ
;
1346 int zuiBufferFromValue(zsetopval
*val
) {
1347 if (val
->estr
== NULL
) {
1348 if (val
->ele
!= NULL
) {
1349 if (val
->ele
->encoding
== REDIS_ENCODING_INT
) {
1350 val
->elen
= ll2string((char*)val
->_buf
,sizeof(val
->_buf
),(long)val
->ele
->ptr
);
1351 val
->estr
= val
->_buf
;
1352 } else if (val
->ele
->encoding
== REDIS_ENCODING_RAW
) {
1353 val
->elen
= sdslen(val
->ele
->ptr
);
1354 val
->estr
= val
->ele
->ptr
;
1356 redisPanic("Unsupported element encoding");
1359 val
->elen
= ll2string((char*)val
->_buf
,sizeof(val
->_buf
),val
->ell
);
1360 val
->estr
= val
->_buf
;
1366 /* Find value pointed to by val in the source pointer to by op. When found,
1367 * return 1 and store its score in target. Return 0 otherwise. */
1368 int zuiFind(zsetopsrc
*op
, zsetopval
*val
, double *score
) {
1369 if (op
->subject
== NULL
)
1372 if (op
->type
== REDIS_SET
) {
1373 iterset
*it
= &op
->iter
.set
;
1375 if (op
->encoding
== REDIS_ENCODING_INTSET
) {
1376 if (zuiLongLongFromValue(val
) && intsetFind(it
->is
.is
,val
->ell
)) {
1382 } else if (op
->encoding
== REDIS_ENCODING_HT
) {
1383 zuiObjectFromValue(val
);
1384 if (dictFind(it
->ht
.dict
,val
->ele
) != NULL
) {
1391 redisPanic("Unknown set encoding");
1393 } else if (op
->type
== REDIS_ZSET
) {
1394 iterzset
*it
= &op
->iter
.zset
;
1395 zuiObjectFromValue(val
);
1397 if (op
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1398 if (zzlFind(it
->zl
.zl
,val
->ele
,score
) != NULL
) {
1399 /* Score is already set by zzlFind. */
1404 } else if (op
->encoding
== REDIS_ENCODING_SKIPLIST
) {
1406 if ((de
= dictFind(it
->sl
.zs
->dict
,val
->ele
)) != NULL
) {
1407 *score
= *(double*)dictGetVal(de
);
1413 redisPanic("Unknown sorted set encoding");
1416 redisPanic("Unsupported type");
1420 int zuiCompareByCardinality(const void *s1
, const void *s2
) {
1421 return zuiLength((zsetopsrc
*)s1
) - zuiLength((zsetopsrc
*)s2
);
/* Ways of combining the scores of an element found in several inputs. */
#define REDIS_AGGR_SUM 1
#define REDIS_AGGR_MIN 2
#define REDIS_AGGR_MAX 3
/* Score stored in a dict entry, or 1.0 when the entry carries no value. */
#define zunionInterDictValue(_e) (dictGetVal(_e) == NULL ? 1.0 : *(double*)dictGetVal(_e))

/* Fold 'val' into '*target' according to 'aggregate' (one of the
 * REDIS_AGGR_* constants). */
inline static void zunionInterAggregate(double *target, double val, int aggregate) {
    switch (aggregate) {
    case REDIS_AGGR_SUM:
        *target = *target + val;
        /* +inf plus -inf yields NaN; by convention that sum is stored
         * as 0.0 instead. */
        if (isnan(*target)) *target = 0.0;
        break;
    case REDIS_AGGR_MIN:
        if (val < *target) *target = val;
        break;
    case REDIS_AGGR_MAX:
        if (val > *target) *target = val;
        break;
    default:
        redisPanic("Unknown ZUNION/INTER aggregate type");
        break;
    }
}
1446 void zunionInterGenericCommand(redisClient
*c
, robj
*dstkey
, int op
) {
1449 int aggregate
= REDIS_AGGR_SUM
;
1453 unsigned int maxelelen
= 0;
1456 zskiplistNode
*znode
;
1459 /* expect setnum input keys to be given */
1460 if ((getLongFromObjectOrReply(c
, c
->argv
[2], &setnum
, NULL
) != REDIS_OK
))
1465 "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE");
1469 /* test if the expected number of keys would overflow */
1470 if (3+setnum
> c
->argc
) {
1471 addReply(c
,shared
.syntaxerr
);
1475 /* read keys to be used for input */
1476 src
= zcalloc(sizeof(zsetopsrc
) * setnum
);
1477 for (i
= 0, j
= 3; i
< setnum
; i
++, j
++) {
1478 robj
*obj
= lookupKeyWrite(c
->db
,c
->argv
[j
]);
1480 if (obj
->type
!= REDIS_ZSET
&& obj
->type
!= REDIS_SET
) {
1482 addReply(c
,shared
.wrongtypeerr
);
1486 src
[i
].subject
= obj
;
1487 src
[i
].type
= obj
->type
;
1488 src
[i
].encoding
= obj
->encoding
;
1490 src
[i
].subject
= NULL
;
1493 /* Default all weights to 1. */
1494 src
[i
].weight
= 1.0;
1497 /* parse optional extra arguments */
1499 int remaining
= c
->argc
- j
;
1502 if (remaining
>= (setnum
+ 1) && !strcasecmp(c
->argv
[j
]->ptr
,"weights")) {
1504 for (i
= 0; i
< setnum
; i
++, j
++, remaining
--) {
1505 if (getDoubleFromObjectOrReply(c
,c
->argv
[j
],&src
[i
].weight
,
1506 "weight value is not a float") != REDIS_OK
)
1512 } else if (remaining
>= 2 && !strcasecmp(c
->argv
[j
]->ptr
,"aggregate")) {
1514 if (!strcasecmp(c
->argv
[j
]->ptr
,"sum")) {
1515 aggregate
= REDIS_AGGR_SUM
;
1516 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"min")) {
1517 aggregate
= REDIS_AGGR_MIN
;
1518 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"max")) {
1519 aggregate
= REDIS_AGGR_MAX
;
1522 addReply(c
,shared
.syntaxerr
);
1528 addReply(c
,shared
.syntaxerr
);
1534 for (i
= 0; i
< setnum
; i
++)
1535 zuiInitIterator(&src
[i
]);
1537 /* sort sets from the smallest to largest, this will improve our
1538 * algorithm's performance */
1539 qsort(src
,setnum
,sizeof(zsetopsrc
),zuiCompareByCardinality
);
1541 dstobj
= createZsetObject();
1542 dstzset
= dstobj
->ptr
;
1543 memset(&zval
, 0, sizeof(zval
));
1545 if (op
== REDIS_OP_INTER
) {
1546 /* Skip everything if the smallest input is empty. */
1547 if (zuiLength(&src
[0]) > 0) {
1548 /* Precondition: as src[0] is non-empty and the inputs are ordered
1549 * by size, all src[i > 0] are non-empty too. */
1550 while (zuiNext(&src
[0],&zval
)) {
1551 double score
, value
;
1553 score
= src
[0].weight
* zval
.score
;
1554 if (isnan(score
)) score
= 0;
1556 for (j
= 1; j
< setnum
; j
++) {
1557 /* It is not safe to access the zset we are
1558 * iterating, so explicitly check for equal object. */
1559 if (src
[j
].subject
== src
[0].subject
) {
1560 value
= zval
.score
*src
[j
].weight
;
1561 zunionInterAggregate(&score
,value
,aggregate
);
1562 } else if (zuiFind(&src
[j
],&zval
,&value
)) {
1563 value
*= src
[j
].weight
;
1564 zunionInterAggregate(&score
,value
,aggregate
);
1570 /* Only continue when present in every input. */
1572 tmp
= zuiObjectFromValue(&zval
);
1573 znode
= zslInsert(dstzset
->zsl
,score
,tmp
);
1574 incrRefCount(tmp
); /* added to skiplist */
1575 dictAdd(dstzset
->dict
,tmp
,&znode
->score
);
1576 incrRefCount(tmp
); /* added to dictionary */
1578 if (tmp
->encoding
== REDIS_ENCODING_RAW
)
1579 if (sdslen(tmp
->ptr
) > maxelelen
)
1580 maxelelen
= sdslen(tmp
->ptr
);
1584 } else if (op
== REDIS_OP_UNION
) {
1585 for (i
= 0; i
< setnum
; i
++) {
1586 if (zuiLength(&src
[i
]) == 0)
1589 while (zuiNext(&src
[i
],&zval
)) {
1590 double score
, value
;
1592 /* Skip key when already processed */
1593 if (dictFind(dstzset
->dict
,zuiObjectFromValue(&zval
)) != NULL
)
1596 /* Initialize score */
1597 score
= src
[i
].weight
* zval
.score
;
1598 if (isnan(score
)) score
= 0;
1600 /* Because the inputs are sorted by size, it's only possible
1601 * for sets at larger indices to hold this element. */
1602 for (j
= (i
+1); j
< setnum
; j
++) {
1603 /* It is not safe to access the zset we are
1604 * iterating, so explicitly check for equal object. */
1605 if(src
[j
].subject
== src
[i
].subject
) {
1606 value
= zval
.score
*src
[j
].weight
;
1607 zunionInterAggregate(&score
,value
,aggregate
);
1608 } else if (zuiFind(&src
[j
],&zval
,&value
)) {
1609 value
*= src
[j
].weight
;
1610 zunionInterAggregate(&score
,value
,aggregate
);
1614 tmp
= zuiObjectFromValue(&zval
);
1615 znode
= zslInsert(dstzset
->zsl
,score
,tmp
);
1616 incrRefCount(zval
.ele
); /* added to skiplist */
1617 dictAdd(dstzset
->dict
,tmp
,&znode
->score
);
1618 incrRefCount(zval
.ele
); /* added to dictionary */
1620 if (tmp
->encoding
== REDIS_ENCODING_RAW
)
1621 if (sdslen(tmp
->ptr
) > maxelelen
)
1622 maxelelen
= sdslen(tmp
->ptr
);
1626 redisPanic("Unknown operator");
1629 for (i
= 0; i
< setnum
; i
++)
1630 zuiClearIterator(&src
[i
]);
1632 if (dbDelete(c
->db
,dstkey
)) {
1633 signalModifiedKey(c
->db
,dstkey
);
1637 if (dstzset
->zsl
->length
) {
1638 /* Convert to ziplist when in limits. */
1639 if (dstzset
->zsl
->length
<= server
.zset_max_ziplist_entries
&&
1640 maxelelen
<= server
.zset_max_ziplist_value
)
1641 zsetConvert(dstobj
,REDIS_ENCODING_ZIPLIST
);
1643 dbAdd(c
->db
,dstkey
,dstobj
);
1644 addReplyLongLong(c
,zsetLength(dstobj
));
1645 if (!touched
) signalModifiedKey(c
->db
,dstkey
);
1648 decrRefCount(dstobj
);
1649 addReply(c
,shared
.czero
);
1654 void zunionstoreCommand(redisClient
*c
) {
1655 zunionInterGenericCommand(c
,c
->argv
[1], REDIS_OP_UNION
);
1658 void zinterstoreCommand(redisClient
*c
) {
1659 zunionInterGenericCommand(c
,c
->argv
[1], REDIS_OP_INTER
);
1662 void zrangeGenericCommand(redisClient
*c
, int reverse
) {
1663 robj
*key
= c
->argv
[1];
1671 if ((getLongFromObjectOrReply(c
, c
->argv
[2], &start
, NULL
) != REDIS_OK
) ||
1672 (getLongFromObjectOrReply(c
, c
->argv
[3], &end
, NULL
) != REDIS_OK
)) return;
1674 if (c
->argc
== 5 && !strcasecmp(c
->argv
[4]->ptr
,"withscores")) {
1676 } else if (c
->argc
>= 5) {
1677 addReply(c
,shared
.syntaxerr
);
1681 if ((zobj
= lookupKeyReadOrReply(c
,key
,shared
.emptymultibulk
)) == NULL
1682 || checkType(c
,zobj
,REDIS_ZSET
)) return;
1684 /* Sanitize indexes. */
1685 llen
= zsetLength(zobj
);
1686 if (start
< 0) start
= llen
+start
;
1687 if (end
< 0) end
= llen
+end
;
1688 if (start
< 0) start
= 0;
1690 /* Invariant: start >= 0, so this test will be true when end < 0.
1691 * The range is empty when start > end or start >= length. */
1692 if (start
> end
|| start
>= llen
) {
1693 addReply(c
,shared
.emptymultibulk
);
1696 if (end
>= llen
) end
= llen
-1;
1697 rangelen
= (end
-start
)+1;
1699 /* Return the result in form of a multi-bulk reply */
1700 addReplyMultiBulkLen(c
, withscores
? (rangelen
*2) : rangelen
);
1702 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1703 unsigned char *zl
= zobj
->ptr
;
1704 unsigned char *eptr
, *sptr
;
1705 unsigned char *vstr
;
1710 eptr
= ziplistIndex(zl
,-2-(2*start
));
1712 eptr
= ziplistIndex(zl
,2*start
);
1714 redisAssertWithInfo(c
,zobj
,eptr
!= NULL
);
1715 sptr
= ziplistNext(zl
,eptr
);
1717 while (rangelen
--) {
1718 redisAssertWithInfo(c
,zobj
,eptr
!= NULL
&& sptr
!= NULL
);
1719 redisAssertWithInfo(c
,zobj
,ziplistGet(eptr
,&vstr
,&vlen
,&vlong
));
1721 addReplyBulkLongLong(c
,vlong
);
1723 addReplyBulkCBuffer(c
,vstr
,vlen
);
1726 addReplyDouble(c
,zzlGetScore(sptr
));
1729 zzlPrev(zl
,&eptr
,&sptr
);
1731 zzlNext(zl
,&eptr
,&sptr
);
1734 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
1735 zset
*zs
= zobj
->ptr
;
1736 zskiplist
*zsl
= zs
->zsl
;
1740 /* Check if starting point is trivial, before doing log(N) lookup. */
1744 ln
= zslGetElementByRank(zsl
,llen
-start
);
1746 ln
= zsl
->header
->level
[0].forward
;
1748 ln
= zslGetElementByRank(zsl
,start
+1);
1752 redisAssertWithInfo(c
,zobj
,ln
!= NULL
);
1754 addReplyBulk(c
,ele
);
1756 addReplyDouble(c
,ln
->score
);
1757 ln
= reverse
? ln
->backward
: ln
->level
[0].forward
;
1760 redisPanic("Unknown sorted set encoding");
1764 void zrangeCommand(redisClient
*c
) {
1765 zrangeGenericCommand(c
,0);
1768 void zrevrangeCommand(redisClient
*c
) {
1769 zrangeGenericCommand(c
,1);
1772 /* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE. */
1773 void genericZrangebyscoreCommand(redisClient
*c
, int reverse
) {
1775 robj
*key
= c
->argv
[1];
1777 long offset
= 0, limit
= -1;
1779 unsigned long rangelen
= 0;
1780 void *replylen
= NULL
;
1783 /* Parse the range arguments. */
1785 /* Range is given as [max,min] */
1786 maxidx
= 2; minidx
= 3;
1788 /* Range is given as [min,max] */
1789 minidx
= 2; maxidx
= 3;
1792 if (zslParseRange(c
->argv
[minidx
],c
->argv
[maxidx
],&range
) != REDIS_OK
) {
1793 addReplyError(c
,"min or max is not a float");
1797 /* Parse optional extra arguments. Note that ZCOUNT will exactly have
1798 * 4 arguments, so we'll never enter the following code path. */
1800 int remaining
= c
->argc
- 4;
1804 if (remaining
>= 1 && !strcasecmp(c
->argv
[pos
]->ptr
,"withscores")) {
1807 } else if (remaining
>= 3 && !strcasecmp(c
->argv
[pos
]->ptr
,"limit")) {
1808 if ((getLongFromObjectOrReply(c
, c
->argv
[pos
+1], &offset
, NULL
) != REDIS_OK
) ||
1809 (getLongFromObjectOrReply(c
, c
->argv
[pos
+2], &limit
, NULL
) != REDIS_OK
)) return;
1810 pos
+= 3; remaining
-= 3;
1812 addReply(c
,shared
.syntaxerr
);
1818 /* Ok, lookup the key and get the range */
1819 if ((zobj
= lookupKeyReadOrReply(c
,key
,shared
.emptymultibulk
)) == NULL
||
1820 checkType(c
,zobj
,REDIS_ZSET
)) return;
1822 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1823 unsigned char *zl
= zobj
->ptr
;
1824 unsigned char *eptr
, *sptr
;
1825 unsigned char *vstr
;
1830 /* If reversed, get the last node in range as starting point. */
1832 eptr
= zzlLastInRange(zl
,range
);
1834 eptr
= zzlFirstInRange(zl
,range
);
1837 /* No "first" element in the specified interval. */
1839 addReply(c
, shared
.emptymultibulk
);
1843 /* Get score pointer for the first element. */
1844 redisAssertWithInfo(c
,zobj
,eptr
!= NULL
);
1845 sptr
= ziplistNext(zl
,eptr
);
1847 /* We don't know in advance how many matching elements there are in the
1848 * list, so we push this object that will represent the multi-bulk
1849 * length in the output buffer, and will "fix" it later */
1850 replylen
= addDeferredMultiBulkLength(c
);
1852 /* If there is an offset, just traverse the number of elements without
1853 * checking the score because that is done in the next loop. */
1854 while (eptr
&& offset
--) {
1856 zzlPrev(zl
,&eptr
,&sptr
);
1858 zzlNext(zl
,&eptr
,&sptr
);
1862 while (eptr
&& limit
--) {
1863 score
= zzlGetScore(sptr
);
1865 /* Abort when the node is no longer in range. */
1867 if (!zslValueGteMin(score
,&range
)) break;
1869 if (!zslValueLteMax(score
,&range
)) break;
1872 /* We know the element exists, so ziplistGet should always succeed */
1873 redisAssertWithInfo(c
,zobj
,ziplistGet(eptr
,&vstr
,&vlen
,&vlong
));
1877 addReplyBulkLongLong(c
,vlong
);
1879 addReplyBulkCBuffer(c
,vstr
,vlen
);
1883 addReplyDouble(c
,score
);
1886 /* Move to next node */
1888 zzlPrev(zl
,&eptr
,&sptr
);
1890 zzlNext(zl
,&eptr
,&sptr
);
1893 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
1894 zset
*zs
= zobj
->ptr
;
1895 zskiplist
*zsl
= zs
->zsl
;
1898 /* If reversed, get the last node in range as starting point. */
1900 ln
= zslLastInRange(zsl
,range
);
1902 ln
= zslFirstInRange(zsl
,range
);
1905 /* No "first" element in the specified interval. */
1907 addReply(c
, shared
.emptymultibulk
);
1911 /* We don't know in advance how many matching elements there are in the
1912 * list, so we push this object that will represent the multi-bulk
1913 * length in the output buffer, and will "fix" it later */
1914 replylen
= addDeferredMultiBulkLength(c
);
1916 /* If there is an offset, just traverse the number of elements without
1917 * checking the score because that is done in the next loop. */
1918 while (ln
&& offset
--) {
1922 ln
= ln
->level
[0].forward
;
1926 while (ln
&& limit
--) {
1927 /* Abort when the node is no longer in range. */
1929 if (!zslValueGteMin(ln
->score
,&range
)) break;
1931 if (!zslValueLteMax(ln
->score
,&range
)) break;
1935 addReplyBulk(c
,ln
->obj
);
1938 addReplyDouble(c
,ln
->score
);
1941 /* Move to next node */
1945 ln
= ln
->level
[0].forward
;
1949 redisPanic("Unknown sorted set encoding");
1956 setDeferredMultiBulkLength(c
, replylen
, rangelen
);
1959 void zrangebyscoreCommand(redisClient
*c
) {
1960 genericZrangebyscoreCommand(c
,0);
1963 void zrevrangebyscoreCommand(redisClient
*c
) {
1964 genericZrangebyscoreCommand(c
,1);
1967 void zcountCommand(redisClient
*c
) {
1968 robj
*key
= c
->argv
[1];
1973 /* Parse the range arguments */
1974 if (zslParseRange(c
->argv
[2],c
->argv
[3],&range
) != REDIS_OK
) {
1975 addReplyError(c
,"min or max is not a float");
1979 /* Lookup the sorted set */
1980 if ((zobj
= lookupKeyReadOrReply(c
, key
, shared
.czero
)) == NULL
||
1981 checkType(c
, zobj
, REDIS_ZSET
)) return;
1983 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1984 unsigned char *zl
= zobj
->ptr
;
1985 unsigned char *eptr
, *sptr
;
1988 /* Use the first element in range as the starting point */
1989 eptr
= zzlFirstInRange(zl
,range
);
1991 /* No "first" element */
1993 addReply(c
, shared
.czero
);
1997 /* First element is in range */
1998 sptr
= ziplistNext(zl
,eptr
);
1999 score
= zzlGetScore(sptr
);
2000 redisAssertWithInfo(c
,zobj
,zslValueLteMax(score
,&range
));
2002 /* Iterate over elements in range */
2004 score
= zzlGetScore(sptr
);
2006 /* Abort when the node is no longer in range. */
2007 if (!zslValueLteMax(score
,&range
)) {
2011 zzlNext(zl
,&eptr
,&sptr
);
2014 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
2015 zset
*zs
= zobj
->ptr
;
2016 zskiplist
*zsl
= zs
->zsl
;
2020 /* Find first element in range */
2021 zn
= zslFirstInRange(zsl
, range
);
2023 /* Use rank of first element, if any, to determine preliminary count */
2025 rank
= zslGetRank(zsl
, zn
->score
, zn
->obj
);
2026 count
= (zsl
->length
- (rank
- 1));
2028 /* Find last element in range */
2029 zn
= zslLastInRange(zsl
, range
);
2031 /* Use rank of last element, if any, to determine the actual count */
2033 rank
= zslGetRank(zsl
, zn
->score
, zn
->obj
);
2034 count
-= (zsl
->length
- rank
);
2038 redisPanic("Unknown sorted set encoding");
2041 addReplyLongLong(c
, count
);
2044 void zcardCommand(redisClient
*c
) {
2045 robj
*key
= c
->argv
[1];
2048 if ((zobj
= lookupKeyReadOrReply(c
,key
,shared
.czero
)) == NULL
||
2049 checkType(c
,zobj
,REDIS_ZSET
)) return;
2051 addReplyLongLong(c
,zsetLength(zobj
));
2054 void zscoreCommand(redisClient
*c
) {
2055 robj
*key
= c
->argv
[1];
2059 if ((zobj
= lookupKeyReadOrReply(c
,key
,shared
.nullbulk
)) == NULL
||
2060 checkType(c
,zobj
,REDIS_ZSET
)) return;
2062 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
2063 if (zzlFind(zobj
->ptr
,c
->argv
[2],&score
) != NULL
)
2064 addReplyDouble(c
,score
);
2066 addReply(c
,shared
.nullbulk
);
2067 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
2068 zset
*zs
= zobj
->ptr
;
2071 c
->argv
[2] = tryObjectEncoding(c
->argv
[2]);
2072 de
= dictFind(zs
->dict
,c
->argv
[2]);
2074 score
= *(double*)dictGetVal(de
);
2075 addReplyDouble(c
,score
);
2077 addReply(c
,shared
.nullbulk
);
2080 redisPanic("Unknown sorted set encoding");
2084 void zrankGenericCommand(redisClient
*c
, int reverse
) {
2085 robj
*key
= c
->argv
[1];
2086 robj
*ele
= c
->argv
[2];
2091 if ((zobj
= lookupKeyReadOrReply(c
,key
,shared
.nullbulk
)) == NULL
||
2092 checkType(c
,zobj
,REDIS_ZSET
)) return;
2093 llen
= zsetLength(zobj
);
2095 redisAssertWithInfo(c
,ele
,ele
->encoding
== REDIS_ENCODING_RAW
);
2096 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
2097 unsigned char *zl
= zobj
->ptr
;
2098 unsigned char *eptr
, *sptr
;
2100 eptr
= ziplistIndex(zl
,0);
2101 redisAssertWithInfo(c
,zobj
,eptr
!= NULL
);
2102 sptr
= ziplistNext(zl
,eptr
);
2103 redisAssertWithInfo(c
,zobj
,sptr
!= NULL
);
2106 while(eptr
!= NULL
) {
2107 if (ziplistCompare(eptr
,ele
->ptr
,sdslen(ele
->ptr
)))
2110 zzlNext(zl
,&eptr
,&sptr
);
2115 addReplyLongLong(c
,llen
-rank
);
2117 addReplyLongLong(c
,rank
-1);
2119 addReply(c
,shared
.nullbulk
);
2121 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
2122 zset
*zs
= zobj
->ptr
;
2123 zskiplist
*zsl
= zs
->zsl
;
2127 ele
= c
->argv
[2] = tryObjectEncoding(c
->argv
[2]);
2128 de
= dictFind(zs
->dict
,ele
);
2130 score
= *(double*)dictGetVal(de
);
2131 rank
= zslGetRank(zsl
,score
,ele
);
2132 redisAssertWithInfo(c
,ele
,rank
); /* Existing elements always have a rank. */
2134 addReplyLongLong(c
,llen
-rank
);
2136 addReplyLongLong(c
,rank
-1);
2138 addReply(c
,shared
.nullbulk
);
2141 redisPanic("Unknown sorted set encoding");
2145 void zrankCommand(redisClient
*c
) {
2146 zrankGenericCommand(c
, 0);
2149 void zrevrankCommand(redisClient
*c
) {
2150 zrankGenericCommand(c
, 1);