]>
git.saurik.com Git - redis.git/blob - src/t_zset.c
5 /*-----------------------------------------------------------------------------
7 *----------------------------------------------------------------------------*/
9 /* ZSETs are ordered sets using two data structures to hold the same elements
10 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
13 * The elements are added to an hash table mapping Redis objects to scores.
14 * At the same time the elements are added to a skip list mapping scores
15 * to Redis objects (so objects are sorted by scores in this "view"). */
17 /* This skiplist implementation is almost a C translation of the original
18 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
19 * Alternative to Balanced Trees", modified in three ways:
20 * a) this implementation allows for repeated scores.
21 * b) the comparison is not just by key (our 'score') but by satellite data.
22 * c) there is a back pointer, so it's a doubly linked list with the back
23 * pointers being only at "level 1". This allows to traverse the list
24 * from tail to head, useful for ZREVRANGE. */
26 zskiplistNode
*zslCreateNode(int level
, double score
, robj
*obj
) {
27 zskiplistNode
*zn
= zmalloc(sizeof(*zn
)+level
*sizeof(struct zskiplistLevel
));
33 zskiplist
*zslCreate(void) {
37 zsl
= zmalloc(sizeof(*zsl
));
40 zsl
->header
= zslCreateNode(ZSKIPLIST_MAXLEVEL
,0,NULL
);
41 for (j
= 0; j
< ZSKIPLIST_MAXLEVEL
; j
++) {
42 zsl
->header
->level
[j
].forward
= NULL
;
43 zsl
->header
->level
[j
].span
= 0;
45 zsl
->header
->backward
= NULL
;
50 void zslFreeNode(zskiplistNode
*node
) {
51 decrRefCount(node
->obj
);
55 void zslFree(zskiplist
*zsl
) {
56 zskiplistNode
*node
= zsl
->header
->level
[0].forward
, *next
;
60 next
= node
->level
[0].forward
;
67 int zslRandomLevel(void) {
69 while ((random()&0xFFFF) < (ZSKIPLIST_P
* 0xFFFF))
71 return (level
<ZSKIPLIST_MAXLEVEL
) ? level
: ZSKIPLIST_MAXLEVEL
;
74 zskiplistNode
*zslInsert(zskiplist
*zsl
, double score
, robj
*obj
) {
75 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
76 unsigned int rank
[ZSKIPLIST_MAXLEVEL
];
79 redisAssert(!isnan(score
));
81 for (i
= zsl
->level
-1; i
>= 0; i
--) {
82 /* store rank that is crossed to reach the insert position */
83 rank
[i
] = i
== (zsl
->level
-1) ? 0 : rank
[i
+1];
84 while (x
->level
[i
].forward
&&
85 (x
->level
[i
].forward
->score
< score
||
86 (x
->level
[i
].forward
->score
== score
&&
87 compareStringObjects(x
->level
[i
].forward
->obj
,obj
) < 0))) {
88 rank
[i
] += x
->level
[i
].span
;
89 x
= x
->level
[i
].forward
;
93 /* we assume the key is not already inside, since we allow duplicated
94 * scores, and the re-insertion of score and redis object should never
95 * happpen since the caller of zslInsert() should test in the hash table
96 * if the element is already inside or not. */
97 level
= zslRandomLevel();
98 if (level
> zsl
->level
) {
99 for (i
= zsl
->level
; i
< level
; i
++) {
101 update
[i
] = zsl
->header
;
102 update
[i
]->level
[i
].span
= zsl
->length
;
106 x
= zslCreateNode(level
,score
,obj
);
107 for (i
= 0; i
< level
; i
++) {
108 x
->level
[i
].forward
= update
[i
]->level
[i
].forward
;
109 update
[i
]->level
[i
].forward
= x
;
111 /* update span covered by update[i] as x is inserted here */
112 x
->level
[i
].span
= update
[i
]->level
[i
].span
- (rank
[0] - rank
[i
]);
113 update
[i
]->level
[i
].span
= (rank
[0] - rank
[i
]) + 1;
116 /* increment span for untouched levels */
117 for (i
= level
; i
< zsl
->level
; i
++) {
118 update
[i
]->level
[i
].span
++;
121 x
->backward
= (update
[0] == zsl
->header
) ? NULL
: update
[0];
122 if (x
->level
[0].forward
)
123 x
->level
[0].forward
->backward
= x
;
130 /* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
131 void zslDeleteNode(zskiplist
*zsl
, zskiplistNode
*x
, zskiplistNode
**update
) {
133 for (i
= 0; i
< zsl
->level
; i
++) {
134 if (update
[i
]->level
[i
].forward
== x
) {
135 update
[i
]->level
[i
].span
+= x
->level
[i
].span
- 1;
136 update
[i
]->level
[i
].forward
= x
->level
[i
].forward
;
138 update
[i
]->level
[i
].span
-= 1;
141 if (x
->level
[0].forward
) {
142 x
->level
[0].forward
->backward
= x
->backward
;
144 zsl
->tail
= x
->backward
;
146 while(zsl
->level
> 1 && zsl
->header
->level
[zsl
->level
-1].forward
== NULL
)
151 /* Delete an element with matching score/object from the skiplist. */
152 int zslDelete(zskiplist
*zsl
, double score
, robj
*obj
) {
153 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
157 for (i
= zsl
->level
-1; i
>= 0; i
--) {
158 while (x
->level
[i
].forward
&&
159 (x
->level
[i
].forward
->score
< score
||
160 (x
->level
[i
].forward
->score
== score
&&
161 compareStringObjects(x
->level
[i
].forward
->obj
,obj
) < 0)))
162 x
= x
->level
[i
].forward
;
165 /* We may have multiple elements with the same score, what we need
166 * is to find the element with both the right score and object. */
167 x
= x
->level
[0].forward
;
168 if (x
&& score
== x
->score
&& equalStringObjects(x
->obj
,obj
)) {
169 zslDeleteNode(zsl
, x
, update
);
173 return 0; /* not found */
175 return 0; /* not found */
178 static int zslValueGteMin(double value
, zrangespec
*spec
) {
179 return spec
->minex
? (value
> spec
->min
) : (value
>= spec
->min
);
182 static int zslValueLteMax(double value
, zrangespec
*spec
) {
183 return spec
->maxex
? (value
< spec
->max
) : (value
<= spec
->max
);
186 /* Returns if there is a part of the zset is in range. */
187 int zslIsInRange(zskiplist
*zsl
, zrangespec
*range
) {
190 /* Test for ranges that will always be empty. */
191 if (range
->min
> range
->max
||
192 (range
->min
== range
->max
&& (range
->minex
|| range
->maxex
)))
195 if (x
== NULL
|| !zslValueGteMin(x
->score
,range
))
197 x
= zsl
->header
->level
[0].forward
;
198 if (x
== NULL
|| !zslValueLteMax(x
->score
,range
))
203 /* Find the first node that is contained in the specified range.
204 * Returns NULL when no element is contained in the range. */
205 zskiplistNode
*zslFirstInRange(zskiplist
*zsl
, zrangespec range
) {
209 /* If everything is out of range, return early. */
210 if (!zslIsInRange(zsl
,&range
)) return NULL
;
213 for (i
= zsl
->level
-1; i
>= 0; i
--) {
214 /* Go forward while *OUT* of range. */
215 while (x
->level
[i
].forward
&&
216 !zslValueGteMin(x
->level
[i
].forward
->score
,&range
))
217 x
= x
->level
[i
].forward
;
220 /* This is an inner range, so the next node cannot be NULL. */
221 x
= x
->level
[0].forward
;
222 redisAssert(x
!= NULL
);
224 /* Check if score <= max. */
225 if (!zslValueLteMax(x
->score
,&range
)) return NULL
;
229 /* Find the last node that is contained in the specified range.
230 * Returns NULL when no element is contained in the range. */
231 zskiplistNode
*zslLastInRange(zskiplist
*zsl
, zrangespec range
) {
235 /* If everything is out of range, return early. */
236 if (!zslIsInRange(zsl
,&range
)) return NULL
;
239 for (i
= zsl
->level
-1; i
>= 0; i
--) {
240 /* Go forward while *IN* range. */
241 while (x
->level
[i
].forward
&&
242 zslValueLteMax(x
->level
[i
].forward
->score
,&range
))
243 x
= x
->level
[i
].forward
;
246 /* This is an inner range, so this node cannot be NULL. */
247 redisAssert(x
!= NULL
);
249 /* Check if score >= min. */
250 if (!zslValueGteMin(x
->score
,&range
)) return NULL
;
254 /* Delete all the elements with score between min and max from the skiplist.
255 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
256 * Note that this function takes the reference to the hash table view of the
257 * sorted set, in order to remove the elements from the hash table too. */
258 unsigned long zslDeleteRangeByScore(zskiplist
*zsl
, zrangespec range
, dict
*dict
) {
259 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
260 unsigned long removed
= 0;
264 for (i
= zsl
->level
-1; i
>= 0; i
--) {
265 while (x
->level
[i
].forward
&& (range
.minex
?
266 x
->level
[i
].forward
->score
<= range
.min
:
267 x
->level
[i
].forward
->score
< range
.min
))
268 x
= x
->level
[i
].forward
;
272 /* Current node is the last with score < or <= min. */
273 x
= x
->level
[0].forward
;
275 /* Delete nodes while in range. */
276 while (x
&& (range
.maxex
? x
->score
< range
.max
: x
->score
<= range
.max
)) {
277 zskiplistNode
*next
= x
->level
[0].forward
;
278 zslDeleteNode(zsl
,x
,update
);
279 dictDelete(dict
,x
->obj
);
287 /* Delete all the elements with rank between start and end from the skiplist.
288 * Start and end are inclusive. Note that start and end need to be 1-based */
289 unsigned long zslDeleteRangeByRank(zskiplist
*zsl
, unsigned int start
, unsigned int end
, dict
*dict
) {
290 zskiplistNode
*update
[ZSKIPLIST_MAXLEVEL
], *x
;
291 unsigned long traversed
= 0, removed
= 0;
295 for (i
= zsl
->level
-1; i
>= 0; i
--) {
296 while (x
->level
[i
].forward
&& (traversed
+ x
->level
[i
].span
) < start
) {
297 traversed
+= x
->level
[i
].span
;
298 x
= x
->level
[i
].forward
;
304 x
= x
->level
[0].forward
;
305 while (x
&& traversed
<= end
) {
306 zskiplistNode
*next
= x
->level
[0].forward
;
307 zslDeleteNode(zsl
,x
,update
);
308 dictDelete(dict
,x
->obj
);
317 /* Find the rank for an element by both score and key.
318 * Returns 0 when the element cannot be found, rank otherwise.
319 * Note that the rank is 1-based due to the span of zsl->header to the
321 unsigned long zslGetRank(zskiplist
*zsl
, double score
, robj
*o
) {
323 unsigned long rank
= 0;
327 for (i
= zsl
->level
-1; i
>= 0; i
--) {
328 while (x
->level
[i
].forward
&&
329 (x
->level
[i
].forward
->score
< score
||
330 (x
->level
[i
].forward
->score
== score
&&
331 compareStringObjects(x
->level
[i
].forward
->obj
,o
) <= 0))) {
332 rank
+= x
->level
[i
].span
;
333 x
= x
->level
[i
].forward
;
336 /* x might be equal to zsl->header, so test if obj is non-NULL */
337 if (x
->obj
&& equalStringObjects(x
->obj
,o
)) {
344 /* Finds an element by its rank. The rank argument needs to be 1-based. */
345 zskiplistNode
* zslGetElementByRank(zskiplist
*zsl
, unsigned long rank
) {
347 unsigned long traversed
= 0;
351 for (i
= zsl
->level
-1; i
>= 0; i
--) {
352 while (x
->level
[i
].forward
&& (traversed
+ x
->level
[i
].span
) <= rank
)
354 traversed
+= x
->level
[i
].span
;
355 x
= x
->level
[i
].forward
;
357 if (traversed
== rank
) {
364 /* Populate the rangespec according to the objects min and max. */
365 static int zslParseRange(robj
*min
, robj
*max
, zrangespec
*spec
) {
367 spec
->minex
= spec
->maxex
= 0;
369 /* Parse the min-max interval. If one of the values is prefixed
370 * by the "(" character, it's considered "open". For instance
371 * ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max
372 * ZRANGEBYSCORE zset 1.5 2.5 will instead match min <= x <= max */
373 if (min
->encoding
== REDIS_ENCODING_INT
) {
374 spec
->min
= (long)min
->ptr
;
376 if (((char*)min
->ptr
)[0] == '(') {
377 spec
->min
= strtod((char*)min
->ptr
+1,&eptr
);
378 if (eptr
[0] != '\0' || isnan(spec
->min
)) return REDIS_ERR
;
381 spec
->min
= strtod((char*)min
->ptr
,&eptr
);
382 if (eptr
[0] != '\0' || isnan(spec
->min
)) return REDIS_ERR
;
385 if (max
->encoding
== REDIS_ENCODING_INT
) {
386 spec
->max
= (long)max
->ptr
;
388 if (((char*)max
->ptr
)[0] == '(') {
389 spec
->max
= strtod((char*)max
->ptr
+1,&eptr
);
390 if (eptr
[0] != '\0' || isnan(spec
->max
)) return REDIS_ERR
;
393 spec
->max
= strtod((char*)max
->ptr
,&eptr
);
394 if (eptr
[0] != '\0' || isnan(spec
->max
)) return REDIS_ERR
;
401 /*-----------------------------------------------------------------------------
402 * Ziplist-backed sorted set API
403 *----------------------------------------------------------------------------*/
405 double zzlGetScore(unsigned char *sptr
) {
412 redisAssert(sptr
!= NULL
);
413 redisAssert(ziplistGet(sptr
,&vstr
,&vlen
,&vlong
));
416 memcpy(buf
,vstr
,vlen
);
418 score
= strtod(buf
,NULL
);
426 /* Compare element in sorted set with given element. */
427 int zzlCompareElements(unsigned char *eptr
, unsigned char *cstr
, unsigned int clen
) {
431 unsigned char vbuf
[32];
434 redisAssert(ziplistGet(eptr
,&vstr
,&vlen
,&vlong
));
436 /* Store string representation of long long in buf. */
437 vlen
= ll2string((char*)vbuf
,sizeof(vbuf
),vlong
);
441 minlen
= (vlen
< clen
) ? vlen
: clen
;
442 cmp
= memcmp(vstr
,cstr
,minlen
);
443 if (cmp
== 0) return vlen
-clen
;
447 unsigned int zzlLength(unsigned char *zl
) {
448 return ziplistLen(zl
)/2;
451 /* Move to next entry based on the values in eptr and sptr. Both are set to
452 * NULL when there is no next entry. */
453 void zzlNext(unsigned char *zl
, unsigned char **eptr
, unsigned char **sptr
) {
454 unsigned char *_eptr
, *_sptr
;
455 redisAssert(*eptr
!= NULL
&& *sptr
!= NULL
);
457 _eptr
= ziplistNext(zl
,*sptr
);
459 _sptr
= ziplistNext(zl
,_eptr
);
460 redisAssert(_sptr
!= NULL
);
470 /* Move to the previous entry based on the values in eptr and sptr. Both are
471 * set to NULL when there is no next entry. */
472 void zzlPrev(unsigned char *zl
, unsigned char **eptr
, unsigned char **sptr
) {
473 unsigned char *_eptr
, *_sptr
;
474 redisAssert(*eptr
!= NULL
&& *sptr
!= NULL
);
476 _sptr
= ziplistPrev(zl
,*eptr
);
478 _eptr
= ziplistPrev(zl
,_sptr
);
479 redisAssert(_eptr
!= NULL
);
481 /* No previous entry. */
489 /* Returns if there is a part of the zset is in range. Should only be used
490 * internally by zzlFirstInRange and zzlLastInRange. */
491 int zzlIsInRange(unsigned char *zl
, zrangespec
*range
) {
495 /* Test for ranges that will always be empty. */
496 if (range
->min
> range
->max
||
497 (range
->min
== range
->max
&& (range
->minex
|| range
->maxex
)))
500 p
= ziplistIndex(zl
,-1); /* Last score. */
501 redisAssert(p
!= NULL
);
502 score
= zzlGetScore(p
);
503 if (!zslValueGteMin(score
,range
))
506 p
= ziplistIndex(zl
,1); /* First score. */
507 redisAssert(p
!= NULL
);
508 score
= zzlGetScore(p
);
509 if (!zslValueLteMax(score
,range
))
515 /* Find pointer to the first element contained in the specified range.
516 * Returns NULL when no element is contained in the range. */
517 unsigned char *zzlFirstInRange(unsigned char *zl
, zrangespec range
) {
518 unsigned char *eptr
= ziplistIndex(zl
,0), *sptr
;
521 /* If everything is out of range, return early. */
522 if (!zzlIsInRange(zl
,&range
)) return NULL
;
524 while (eptr
!= NULL
) {
525 sptr
= ziplistNext(zl
,eptr
);
526 redisAssert(sptr
!= NULL
);
528 score
= zzlGetScore(sptr
);
529 if (zslValueGteMin(score
,&range
)) {
530 /* Check if score <= max. */
531 if (zslValueLteMax(score
,&range
))
536 /* Move to next element. */
537 eptr
= ziplistNext(zl
,sptr
);
543 /* Find pointer to the last element contained in the specified range.
544 * Returns NULL when no element is contained in the range. */
545 unsigned char *zzlLastInRange(unsigned char *zl
, zrangespec range
) {
546 unsigned char *eptr
= ziplistIndex(zl
,-2), *sptr
;
549 /* If everything is out of range, return early. */
550 if (!zzlIsInRange(zl
,&range
)) return NULL
;
552 while (eptr
!= NULL
) {
553 sptr
= ziplistNext(zl
,eptr
);
554 redisAssert(sptr
!= NULL
);
556 score
= zzlGetScore(sptr
);
557 if (zslValueLteMax(score
,&range
)) {
558 /* Check if score >= min. */
559 if (zslValueGteMin(score
,&range
))
564 /* Move to previous element by moving to the score of previous element.
565 * When this returns NULL, we know there also is no element. */
566 sptr
= ziplistPrev(zl
,eptr
);
568 redisAssert((eptr
= ziplistPrev(zl
,sptr
)) != NULL
);
576 unsigned char *zzlFind(unsigned char *zl
, robj
*ele
, double *score
) {
577 unsigned char *eptr
= ziplistIndex(zl
,0), *sptr
;
579 ele
= getDecodedObject(ele
);
580 while (eptr
!= NULL
) {
581 sptr
= ziplistNext(zl
,eptr
);
582 redisAssertWithInfo(NULL
,ele
,sptr
!= NULL
);
584 if (ziplistCompare(eptr
,ele
->ptr
,sdslen(ele
->ptr
))) {
585 /* Matching element, pull out score. */
586 if (score
!= NULL
) *score
= zzlGetScore(sptr
);
591 /* Move to next element. */
592 eptr
= ziplistNext(zl
,sptr
);
599 /* Delete (element,score) pair from ziplist. Use local copy of eptr because we
600 * don't want to modify the one given as argument. */
601 unsigned char *zzlDelete(unsigned char *zl
, unsigned char *eptr
) {
602 unsigned char *p
= eptr
;
604 /* TODO: add function to ziplist API to delete N elements from offset. */
605 zl
= ziplistDelete(zl
,&p
);
606 zl
= ziplistDelete(zl
,&p
);
610 unsigned char *zzlInsertAt(unsigned char *zl
, unsigned char *eptr
, robj
*ele
, double score
) {
616 redisAssertWithInfo(NULL
,ele
,ele
->encoding
== REDIS_ENCODING_RAW
);
617 scorelen
= d2string(scorebuf
,sizeof(scorebuf
),score
);
619 zl
= ziplistPush(zl
,ele
->ptr
,sdslen(ele
->ptr
),ZIPLIST_TAIL
);
620 zl
= ziplistPush(zl
,(unsigned char*)scorebuf
,scorelen
,ZIPLIST_TAIL
);
622 /* Keep offset relative to zl, as it might be re-allocated. */
624 zl
= ziplistInsert(zl
,eptr
,ele
->ptr
,sdslen(ele
->ptr
));
627 /* Insert score after the element. */
628 redisAssertWithInfo(NULL
,ele
,(sptr
= ziplistNext(zl
,eptr
)) != NULL
);
629 zl
= ziplistInsert(zl
,sptr
,(unsigned char*)scorebuf
,scorelen
);
635 /* Insert (element,score) pair in ziplist. This function assumes the element is
636 * not yet present in the list. */
637 unsigned char *zzlInsert(unsigned char *zl
, robj
*ele
, double score
) {
638 unsigned char *eptr
= ziplistIndex(zl
,0), *sptr
;
641 ele
= getDecodedObject(ele
);
642 while (eptr
!= NULL
) {
643 sptr
= ziplistNext(zl
,eptr
);
644 redisAssertWithInfo(NULL
,ele
,sptr
!= NULL
);
645 s
= zzlGetScore(sptr
);
648 /* First element with score larger than score for element to be
649 * inserted. This means we should take its spot in the list to
650 * maintain ordering. */
651 zl
= zzlInsertAt(zl
,eptr
,ele
,score
);
653 } else if (s
== score
) {
654 /* Ensure lexicographical ordering for elements. */
655 if (zzlCompareElements(eptr
,ele
->ptr
,sdslen(ele
->ptr
)) > 0) {
656 zl
= zzlInsertAt(zl
,eptr
,ele
,score
);
661 /* Move to next element. */
662 eptr
= ziplistNext(zl
,sptr
);
665 /* Push on tail of list when it was not yet inserted. */
667 zl
= zzlInsertAt(zl
,NULL
,ele
,score
);
673 unsigned char *zzlDeleteRangeByScore(unsigned char *zl
, zrangespec range
, unsigned long *deleted
) {
674 unsigned char *eptr
, *sptr
;
676 unsigned long num
= 0;
678 if (deleted
!= NULL
) *deleted
= 0;
680 eptr
= zzlFirstInRange(zl
,range
);
681 if (eptr
== NULL
) return zl
;
683 /* When the tail of the ziplist is deleted, eptr will point to the sentinel
684 * byte and ziplistNext will return NULL. */
685 while ((sptr
= ziplistNext(zl
,eptr
)) != NULL
) {
686 score
= zzlGetScore(sptr
);
687 if (zslValueLteMax(score
,&range
)) {
688 /* Delete both the element and the score. */
689 zl
= ziplistDelete(zl
,&eptr
);
690 zl
= ziplistDelete(zl
,&eptr
);
693 /* No longer in range. */
698 if (deleted
!= NULL
) *deleted
= num
;
702 /* Delete all the elements with rank between start and end from the skiplist.
703 * Start and end are inclusive. Note that start and end need to be 1-based */
704 unsigned char *zzlDeleteRangeByRank(unsigned char *zl
, unsigned int start
, unsigned int end
, unsigned long *deleted
) {
705 unsigned int num
= (end
-start
)+1;
706 if (deleted
) *deleted
= num
;
707 zl
= ziplistDeleteRange(zl
,2*(start
-1),2*num
);
711 /*-----------------------------------------------------------------------------
712 * Common sorted set API
713 *----------------------------------------------------------------------------*/
715 unsigned int zsetLength(robj
*zobj
) {
717 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
718 length
= zzlLength(zobj
->ptr
);
719 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
720 length
= ((zset
*)zobj
->ptr
)->zsl
->length
;
722 redisPanic("Unknown sorted set encoding");
727 void zsetConvert(robj
*zobj
, int encoding
) {
729 zskiplistNode
*node
, *next
;
733 if (zobj
->encoding
== encoding
) return;
734 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
735 unsigned char *zl
= zobj
->ptr
;
736 unsigned char *eptr
, *sptr
;
741 if (encoding
!= REDIS_ENCODING_SKIPLIST
)
742 redisPanic("Unknown target encoding");
744 zs
= zmalloc(sizeof(*zs
));
745 zs
->dict
= dictCreate(&zsetDictType
,NULL
);
746 zs
->zsl
= zslCreate();
748 eptr
= ziplistIndex(zl
,0);
749 redisAssertWithInfo(NULL
,zobj
,eptr
!= NULL
);
750 sptr
= ziplistNext(zl
,eptr
);
751 redisAssertWithInfo(NULL
,zobj
,sptr
!= NULL
);
753 while (eptr
!= NULL
) {
754 score
= zzlGetScore(sptr
);
755 redisAssertWithInfo(NULL
,zobj
,ziplistGet(eptr
,&vstr
,&vlen
,&vlong
));
757 ele
= createStringObjectFromLongLong(vlong
);
759 ele
= createStringObject((char*)vstr
,vlen
);
761 /* Has incremented refcount since it was just created. */
762 node
= zslInsert(zs
->zsl
,score
,ele
);
763 redisAssertWithInfo(NULL
,zobj
,dictAdd(zs
->dict
,ele
,&node
->score
) == DICT_OK
);
764 incrRefCount(ele
); /* Added to dictionary. */
765 zzlNext(zl
,&eptr
,&sptr
);
770 zobj
->encoding
= REDIS_ENCODING_SKIPLIST
;
771 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
772 unsigned char *zl
= ziplistNew();
774 if (encoding
!= REDIS_ENCODING_ZIPLIST
)
775 redisPanic("Unknown target encoding");
777 /* Approach similar to zslFree(), since we want to free the skiplist at
778 * the same time as creating the ziplist. */
780 dictRelease(zs
->dict
);
781 node
= zs
->zsl
->header
->level
[0].forward
;
782 zfree(zs
->zsl
->header
);
786 ele
= getDecodedObject(node
->obj
);
787 zl
= zzlInsertAt(zl
,NULL
,ele
,node
->score
);
790 next
= node
->level
[0].forward
;
797 zobj
->encoding
= REDIS_ENCODING_ZIPLIST
;
799 redisPanic("Unknown sorted set encoding");
803 /*-----------------------------------------------------------------------------
804 * Sorted set commands
805 *----------------------------------------------------------------------------*/
807 /* This generic command implements both ZADD and ZINCRBY. */
808 void zaddGenericCommand(redisClient
*c
, int incr
) {
809 static char *nanerr
= "resulting score is not a number (NaN)";
810 robj
*key
= c
->argv
[1];
814 double score
= 0, *scores
, curscore
= 0.0;
815 int j
, elements
= (c
->argc
-2)/2;
819 addReply(c
,shared
.syntaxerr
);
823 /* Start parsing all the scores, we need to emit any syntax error
824 * before executing additions to the sorted set, as the command should
825 * either execute fully or nothing at all. */
826 scores
= zmalloc(sizeof(double)*elements
);
827 for (j
= 0; j
< elements
; j
++) {
828 if (getDoubleFromObjectOrReply(c
,c
->argv
[2+j
*2],&scores
[j
],NULL
)
836 /* Lookup the key and create the sorted set if does not exist. */
837 zobj
= lookupKeyWrite(c
->db
,key
);
839 if (server
.zset_max_ziplist_entries
== 0 ||
840 server
.zset_max_ziplist_value
< sdslen(c
->argv
[3]->ptr
))
842 zobj
= createZsetObject();
844 zobj
= createZsetZiplistObject();
846 dbAdd(c
->db
,key
,zobj
);
848 if (zobj
->type
!= REDIS_ZSET
) {
849 addReply(c
,shared
.wrongtypeerr
);
855 for (j
= 0; j
< elements
; j
++) {
858 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
861 /* Prefer non-encoded element when dealing with ziplists. */
862 ele
= c
->argv
[3+j
*2];
863 if ((eptr
= zzlFind(zobj
->ptr
,ele
,&curscore
)) != NULL
) {
867 addReplyError(c
,nanerr
);
868 /* Don't need to check if the sorted set is empty
869 * because we know it has at least one element. */
875 /* Remove and re-insert when score changed. */
876 if (score
!= curscore
) {
877 zobj
->ptr
= zzlDelete(zobj
->ptr
,eptr
);
878 zobj
->ptr
= zzlInsert(zobj
->ptr
,ele
,score
);
880 signalModifiedKey(c
->db
,key
);
884 /* Optimize: check if the element is too large or the list
885 * becomes too long *before* executing zzlInsert. */
886 zobj
->ptr
= zzlInsert(zobj
->ptr
,ele
,score
);
887 if (zzlLength(zobj
->ptr
) > server
.zset_max_ziplist_entries
)
888 zsetConvert(zobj
,REDIS_ENCODING_SKIPLIST
);
889 if (sdslen(ele
->ptr
) > server
.zset_max_ziplist_value
)
890 zsetConvert(zobj
,REDIS_ENCODING_SKIPLIST
);
892 signalModifiedKey(c
->db
,key
);
896 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
897 zset
*zs
= zobj
->ptr
;
898 zskiplistNode
*znode
;
901 ele
= c
->argv
[3+j
*2] = tryObjectEncoding(c
->argv
[3+j
*2]);
902 de
= dictFind(zs
->dict
,ele
);
904 curobj
= dictGetKey(de
);
905 curscore
= *(double*)dictGetVal(de
);
910 addReplyError(c
,nanerr
);
911 /* Don't need to check if the sorted set is empty
912 * because we know it has at least one element. */
918 /* Remove and re-insert when score changed. We can safely
919 * delete the key object from the skiplist, since the
920 * dictionary still has a reference to it. */
921 if (score
!= curscore
) {
922 redisAssertWithInfo(c
,curobj
,zslDelete(zs
->zsl
,curscore
,curobj
));
923 znode
= zslInsert(zs
->zsl
,score
,curobj
);
924 incrRefCount(curobj
); /* Re-inserted in skiplist. */
925 dictGetVal(de
) = &znode
->score
; /* Update score ptr. */
927 signalModifiedKey(c
->db
,key
);
931 znode
= zslInsert(zs
->zsl
,score
,ele
);
932 incrRefCount(ele
); /* Inserted in skiplist. */
933 redisAssertWithInfo(c
,NULL
,dictAdd(zs
->dict
,ele
,&znode
->score
) == DICT_OK
);
934 incrRefCount(ele
); /* Added to dictionary. */
936 signalModifiedKey(c
->db
,key
);
941 redisPanic("Unknown sorted set encoding");
945 if (incr
) /* ZINCRBY */
946 addReplyDouble(c
,score
);
948 addReplyLongLong(c
,added
);
951 void zaddCommand(redisClient
*c
) {
952 zaddGenericCommand(c
,0);
955 void zincrbyCommand(redisClient
*c
) {
956 zaddGenericCommand(c
,1);
959 void zremCommand(redisClient
*c
) {
960 robj
*key
= c
->argv
[1];
964 if ((zobj
= lookupKeyWriteOrReply(c
,key
,shared
.czero
)) == NULL
||
965 checkType(c
,zobj
,REDIS_ZSET
)) return;
967 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
970 for (j
= 2; j
< c
->argc
; j
++) {
971 if ((eptr
= zzlFind(zobj
->ptr
,c
->argv
[j
],NULL
)) != NULL
) {
973 zobj
->ptr
= zzlDelete(zobj
->ptr
,eptr
);
974 if (zzlLength(zobj
->ptr
) == 0) {
980 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
981 zset
*zs
= zobj
->ptr
;
985 for (j
= 2; j
< c
->argc
; j
++) {
986 de
= dictFind(zs
->dict
,c
->argv
[j
]);
990 /* Delete from the skiplist */
991 score
= *(double*)dictGetVal(de
);
992 redisAssertWithInfo(c
,c
->argv
[j
],zslDelete(zs
->zsl
,score
,c
->argv
[j
]));
994 /* Delete from the hash table */
995 dictDelete(zs
->dict
,c
->argv
[j
]);
996 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
997 if (dictSize(zs
->dict
) == 0) {
1004 redisPanic("Unknown sorted set encoding");
1008 signalModifiedKey(c
->db
,key
);
1009 server
.dirty
+= deleted
;
1011 addReplyLongLong(c
,deleted
);
1014 void zremrangebyscoreCommand(redisClient
*c
) {
1015 robj
*key
= c
->argv
[1];
1018 unsigned long deleted
;
1020 /* Parse the range arguments. */
1021 if (zslParseRange(c
->argv
[2],c
->argv
[3],&range
) != REDIS_OK
) {
1022 addReplyError(c
,"min or max is not a float");
1026 if ((zobj
= lookupKeyWriteOrReply(c
,key
,shared
.czero
)) == NULL
||
1027 checkType(c
,zobj
,REDIS_ZSET
)) return;
1029 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1030 zobj
->ptr
= zzlDeleteRangeByScore(zobj
->ptr
,range
,&deleted
);
1031 if (zzlLength(zobj
->ptr
) == 0) dbDelete(c
->db
,key
);
1032 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
1033 zset
*zs
= zobj
->ptr
;
1034 deleted
= zslDeleteRangeByScore(zs
->zsl
,range
,zs
->dict
);
1035 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
1036 if (dictSize(zs
->dict
) == 0) dbDelete(c
->db
,key
);
1038 redisPanic("Unknown sorted set encoding");
1041 if (deleted
) signalModifiedKey(c
->db
,key
);
1042 server
.dirty
+= deleted
;
1043 addReplyLongLong(c
,deleted
);
1046 void zremrangebyrankCommand(redisClient
*c
) {
1047 robj
*key
= c
->argv
[1];
1052 unsigned long deleted
;
1054 if ((getLongFromObjectOrReply(c
, c
->argv
[2], &start
, NULL
) != REDIS_OK
) ||
1055 (getLongFromObjectOrReply(c
, c
->argv
[3], &end
, NULL
) != REDIS_OK
)) return;
1057 if ((zobj
= lookupKeyWriteOrReply(c
,key
,shared
.czero
)) == NULL
||
1058 checkType(c
,zobj
,REDIS_ZSET
)) return;
1060 /* Sanitize indexes. */
1061 llen
= zsetLength(zobj
);
1062 if (start
< 0) start
= llen
+start
;
1063 if (end
< 0) end
= llen
+end
;
1064 if (start
< 0) start
= 0;
1066 /* Invariant: start >= 0, so this test will be true when end < 0.
1067 * The range is empty when start > end or start >= length. */
1068 if (start
> end
|| start
>= llen
) {
1069 addReply(c
,shared
.czero
);
1072 if (end
>= llen
) end
= llen
-1;
1074 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1075 /* Correct for 1-based rank. */
1076 zobj
->ptr
= zzlDeleteRangeByRank(zobj
->ptr
,start
+1,end
+1,&deleted
);
1077 if (zzlLength(zobj
->ptr
) == 0) dbDelete(c
->db
,key
);
1078 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
1079 zset
*zs
= zobj
->ptr
;
1081 /* Correct for 1-based rank. */
1082 deleted
= zslDeleteRangeByRank(zs
->zsl
,start
+1,end
+1,zs
->dict
);
1083 if (htNeedsResize(zs
->dict
)) dictResize(zs
->dict
);
1084 if (dictSize(zs
->dict
) == 0) dbDelete(c
->db
,key
);
1086 redisPanic("Unknown sorted set encoding");
1089 if (deleted
) signalModifiedKey(c
->db
,key
);
1090 server
.dirty
+= deleted
;
1091 addReplyLongLong(c
,deleted
);
1096 int type
; /* Set, sorted set */
1101 /* Set iterators. */
1114 /* Sorted set iterators. */
1118 unsigned char *eptr
, *sptr
;
1122 zskiplistNode
*node
;
1129 /* Use dirty flags for pointers that need to be cleaned up in the next
1130 * iteration over the zsetopval. The dirty flag for the long long value is
1131 * special, since long long values don't need cleanup. Instead, it means that
1132 * we already checked that "ell" holds a long long, or tried to convert another
1133 * representation into a long long value. When this was successful,
1134 * OPVAL_VALID_LL is set as well. */
1135 #define OPVAL_DIRTY_ROBJ 1
1136 #define OPVAL_DIRTY_LL 2
1137 #define OPVAL_VALID_LL 4
1139 /* Store value retrieved from the iterator. */
1142 unsigned char _buf
[32]; /* Private buffer. */
1144 unsigned char *estr
;
1150 typedef union _iterset iterset
;
1151 typedef union _iterzset iterzset
;
1153 void zuiInitIterator(zsetopsrc
*op
) {
1154 if (op
->subject
== NULL
)
1157 if (op
->type
== REDIS_SET
) {
1158 iterset
*it
= &op
->iter
.set
;
1159 if (op
->encoding
== REDIS_ENCODING_INTSET
) {
1160 it
->is
.is
= op
->subject
->ptr
;
1162 } else if (op
->encoding
== REDIS_ENCODING_HT
) {
1163 it
->ht
.dict
= op
->subject
->ptr
;
1164 it
->ht
.di
= dictGetIterator(op
->subject
->ptr
);
1165 it
->ht
.de
= dictNext(it
->ht
.di
);
1167 redisPanic("Unknown set encoding");
1169 } else if (op
->type
== REDIS_ZSET
) {
1170 iterzset
*it
= &op
->iter
.zset
;
1171 if (op
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1172 it
->zl
.zl
= op
->subject
->ptr
;
1173 it
->zl
.eptr
= ziplistIndex(it
->zl
.zl
,0);
1174 if (it
->zl
.eptr
!= NULL
) {
1175 it
->zl
.sptr
= ziplistNext(it
->zl
.zl
,it
->zl
.eptr
);
1176 redisAssert(it
->zl
.sptr
!= NULL
);
1178 } else if (op
->encoding
== REDIS_ENCODING_SKIPLIST
) {
1179 it
->sl
.zs
= op
->subject
->ptr
;
1180 it
->sl
.node
= it
->sl
.zs
->zsl
->header
->level
[0].forward
;
1182 redisPanic("Unknown sorted set encoding");
1185 redisPanic("Unsupported type");
1189 void zuiClearIterator(zsetopsrc
*op
) {
1190 if (op
->subject
== NULL
)
1193 if (op
->type
== REDIS_SET
) {
1194 iterset
*it
= &op
->iter
.set
;
1195 if (op
->encoding
== REDIS_ENCODING_INTSET
) {
1196 REDIS_NOTUSED(it
); /* skip */
1197 } else if (op
->encoding
== REDIS_ENCODING_HT
) {
1198 dictReleaseIterator(it
->ht
.di
);
1200 redisPanic("Unknown set encoding");
1202 } else if (op
->type
== REDIS_ZSET
) {
1203 iterzset
*it
= &op
->iter
.zset
;
1204 if (op
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1205 REDIS_NOTUSED(it
); /* skip */
1206 } else if (op
->encoding
== REDIS_ENCODING_SKIPLIST
) {
1207 REDIS_NOTUSED(it
); /* skip */
1209 redisPanic("Unknown sorted set encoding");
1212 redisPanic("Unsupported type");
1216 int zuiLength(zsetopsrc
*op
) {
1217 if (op
->subject
== NULL
)
1220 if (op
->type
== REDIS_SET
) {
1221 iterset
*it
= &op
->iter
.set
;
1222 if (op
->encoding
== REDIS_ENCODING_INTSET
) {
1223 return intsetLen(it
->is
.is
);
1224 } else if (op
->encoding
== REDIS_ENCODING_HT
) {
1225 return dictSize(it
->ht
.dict
);
1227 redisPanic("Unknown set encoding");
1229 } else if (op
->type
== REDIS_ZSET
) {
1230 iterzset
*it
= &op
->iter
.zset
;
1231 if (op
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1232 return zzlLength(it
->zl
.zl
);
1233 } else if (op
->encoding
== REDIS_ENCODING_SKIPLIST
) {
1234 return it
->sl
.zs
->zsl
->length
;
1236 redisPanic("Unknown sorted set encoding");
1239 redisPanic("Unsupported type");
1243 /* Check if the current value is valid. If so, store it in the passed structure
1244 * and move to the next element. If not valid, this means we have reached the
1245 * end of the structure and can abort. */
1246 int zuiNext(zsetopsrc
*op
, zsetopval
*val
) {
1247 if (op
->subject
== NULL
)
1250 if (val
->flags
& OPVAL_DIRTY_ROBJ
)
1251 decrRefCount(val
->ele
);
1253 bzero(val
,sizeof(zsetopval
));
1255 if (op
->type
== REDIS_SET
) {
1256 iterset
*it
= &op
->iter
.set
;
1257 if (op
->encoding
== REDIS_ENCODING_INTSET
) {
1258 if (!intsetGet(it
->is
.is
,it
->is
.ii
,(int64_t*)&val
->ell
))
1262 /* Move to next element. */
1264 } else if (op
->encoding
== REDIS_ENCODING_HT
) {
1265 if (it
->ht
.de
== NULL
)
1267 val
->ele
= dictGetKey(it
->ht
.de
);
1270 /* Move to next element. */
1271 it
->ht
.de
= dictNext(it
->ht
.di
);
1273 redisPanic("Unknown set encoding");
1275 } else if (op
->type
== REDIS_ZSET
) {
1276 iterzset
*it
= &op
->iter
.zset
;
1277 if (op
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1278 /* No need to check both, but better be explicit. */
1279 if (it
->zl
.eptr
== NULL
|| it
->zl
.sptr
== NULL
)
1281 redisAssert(ziplistGet(it
->zl
.eptr
,&val
->estr
,&val
->elen
,&val
->ell
));
1282 val
->score
= zzlGetScore(it
->zl
.sptr
);
1284 /* Move to next element. */
1285 zzlNext(it
->zl
.zl
,&it
->zl
.eptr
,&it
->zl
.sptr
);
1286 } else if (op
->encoding
== REDIS_ENCODING_SKIPLIST
) {
1287 if (it
->sl
.node
== NULL
)
1289 val
->ele
= it
->sl
.node
->obj
;
1290 val
->score
= it
->sl
.node
->score
;
1292 /* Move to next element. */
1293 it
->sl
.node
= it
->sl
.node
->level
[0].forward
;
1295 redisPanic("Unknown sorted set encoding");
1298 redisPanic("Unsupported type");
1303 int zuiLongLongFromValue(zsetopval
*val
) {
1304 if (!(val
->flags
& OPVAL_DIRTY_LL
)) {
1305 val
->flags
|= OPVAL_DIRTY_LL
;
1307 if (val
->ele
!= NULL
) {
1308 if (val
->ele
->encoding
== REDIS_ENCODING_INT
) {
1309 val
->ell
= (long)val
->ele
->ptr
;
1310 val
->flags
|= OPVAL_VALID_LL
;
1311 } else if (val
->ele
->encoding
== REDIS_ENCODING_RAW
) {
1312 if (string2ll(val
->ele
->ptr
,sdslen(val
->ele
->ptr
),&val
->ell
))
1313 val
->flags
|= OPVAL_VALID_LL
;
1315 redisPanic("Unsupported element encoding");
1317 } else if (val
->estr
!= NULL
) {
1318 if (string2ll((char*)val
->estr
,val
->elen
,&val
->ell
))
1319 val
->flags
|= OPVAL_VALID_LL
;
1321 /* The long long was already set, flag as valid. */
1322 val
->flags
|= OPVAL_VALID_LL
;
1325 return val
->flags
& OPVAL_VALID_LL
;
1328 robj
*zuiObjectFromValue(zsetopval
*val
) {
1329 if (val
->ele
== NULL
) {
1330 if (val
->estr
!= NULL
) {
1331 val
->ele
= createStringObject((char*)val
->estr
,val
->elen
);
1333 val
->ele
= createStringObjectFromLongLong(val
->ell
);
1335 val
->flags
|= OPVAL_DIRTY_ROBJ
;
1340 int zuiBufferFromValue(zsetopval
*val
) {
1341 if (val
->estr
== NULL
) {
1342 if (val
->ele
!= NULL
) {
1343 if (val
->ele
->encoding
== REDIS_ENCODING_INT
) {
1344 val
->elen
= ll2string((char*)val
->_buf
,sizeof(val
->_buf
),(long)val
->ele
->ptr
);
1345 val
->estr
= val
->_buf
;
1346 } else if (val
->ele
->encoding
== REDIS_ENCODING_RAW
) {
1347 val
->elen
= sdslen(val
->ele
->ptr
);
1348 val
->estr
= val
->ele
->ptr
;
1350 redisPanic("Unsupported element encoding");
1353 val
->elen
= ll2string((char*)val
->_buf
,sizeof(val
->_buf
),val
->ell
);
1354 val
->estr
= val
->_buf
;
1360 /* Find value pointed to by val in the source pointer to by op. When found,
1361 * return 1 and store its score in target. Return 0 otherwise. */
1362 int zuiFind(zsetopsrc
*op
, zsetopval
*val
, double *score
) {
1363 if (op
->subject
== NULL
)
1366 if (op
->type
== REDIS_SET
) {
1367 iterset
*it
= &op
->iter
.set
;
1369 if (op
->encoding
== REDIS_ENCODING_INTSET
) {
1370 if (zuiLongLongFromValue(val
) && intsetFind(it
->is
.is
,val
->ell
)) {
1376 } else if (op
->encoding
== REDIS_ENCODING_HT
) {
1377 zuiObjectFromValue(val
);
1378 if (dictFind(it
->ht
.dict
,val
->ele
) != NULL
) {
1385 redisPanic("Unknown set encoding");
1387 } else if (op
->type
== REDIS_ZSET
) {
1388 iterzset
*it
= &op
->iter
.zset
;
1389 zuiObjectFromValue(val
);
1391 if (op
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1392 if (zzlFind(it
->zl
.zl
,val
->ele
,score
) != NULL
) {
1393 /* Score is already set by zzlFind. */
1398 } else if (op
->encoding
== REDIS_ENCODING_SKIPLIST
) {
1400 if ((de
= dictFind(it
->sl
.zs
->dict
,val
->ele
)) != NULL
) {
1401 *score
= *(double*)dictGetVal(de
);
1407 redisPanic("Unknown sorted set encoding");
1410 redisPanic("Unsupported type");
1414 int zuiCompareByCardinality(const void *s1
, const void *s2
) {
1415 return zuiLength((zsetopsrc
*)s1
) - zuiLength((zsetopsrc
*)s2
);
1418 #define REDIS_AGGR_SUM 1
1419 #define REDIS_AGGR_MIN 2
1420 #define REDIS_AGGR_MAX 3
1421 #define zunionInterDictValue(_e) (dictGetVal(_e) == NULL ? 1.0 : *(double*)dictGetVal(_e))
1423 inline static void zunionInterAggregate(double *target
, double val
, int aggregate
) {
1424 if (aggregate
== REDIS_AGGR_SUM
) {
1425 *target
= *target
+ val
;
1426 /* The result of adding two doubles is NaN when one variable
1427 * is +inf and the other is -inf. When these numbers are added,
1428 * we maintain the convention of the result being 0.0. */
1429 if (isnan(*target
)) *target
= 0.0;
1430 } else if (aggregate
== REDIS_AGGR_MIN
) {
1431 *target
= val
< *target
? val
: *target
;
1432 } else if (aggregate
== REDIS_AGGR_MAX
) {
1433 *target
= val
> *target
? val
: *target
;
1436 redisPanic("Unknown ZUNION/INTER aggregate type");
1440 void zunionInterGenericCommand(redisClient
*c
, robj
*dstkey
, int op
) {
1443 int aggregate
= REDIS_AGGR_SUM
;
1447 unsigned int maxelelen
= 0;
1450 zskiplistNode
*znode
;
1453 /* expect setnum input keys to be given */
1454 if ((getLongFromObjectOrReply(c
, c
->argv
[2], &setnum
, NULL
) != REDIS_OK
))
1459 "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE");
1463 /* test if the expected number of keys would overflow */
1464 if (3+setnum
> c
->argc
) {
1465 addReply(c
,shared
.syntaxerr
);
1469 /* read keys to be used for input */
1470 src
= zcalloc(sizeof(zsetopsrc
) * setnum
);
1471 for (i
= 0, j
= 3; i
< setnum
; i
++, j
++) {
1472 robj
*obj
= lookupKeyWrite(c
->db
,c
->argv
[j
]);
1474 if (obj
->type
!= REDIS_ZSET
&& obj
->type
!= REDIS_SET
) {
1476 addReply(c
,shared
.wrongtypeerr
);
1480 src
[i
].subject
= obj
;
1481 src
[i
].type
= obj
->type
;
1482 src
[i
].encoding
= obj
->encoding
;
1484 src
[i
].subject
= NULL
;
1487 /* Default all weights to 1. */
1488 src
[i
].weight
= 1.0;
1491 /* parse optional extra arguments */
1493 int remaining
= c
->argc
- j
;
1496 if (remaining
>= (setnum
+ 1) && !strcasecmp(c
->argv
[j
]->ptr
,"weights")) {
1498 for (i
= 0; i
< setnum
; i
++, j
++, remaining
--) {
1499 if (getDoubleFromObjectOrReply(c
,c
->argv
[j
],&src
[i
].weight
,
1500 "weight value is not a float") != REDIS_OK
)
1506 } else if (remaining
>= 2 && !strcasecmp(c
->argv
[j
]->ptr
,"aggregate")) {
1508 if (!strcasecmp(c
->argv
[j
]->ptr
,"sum")) {
1509 aggregate
= REDIS_AGGR_SUM
;
1510 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"min")) {
1511 aggregate
= REDIS_AGGR_MIN
;
1512 } else if (!strcasecmp(c
->argv
[j
]->ptr
,"max")) {
1513 aggregate
= REDIS_AGGR_MAX
;
1516 addReply(c
,shared
.syntaxerr
);
1522 addReply(c
,shared
.syntaxerr
);
1528 for (i
= 0; i
< setnum
; i
++)
1529 zuiInitIterator(&src
[i
]);
1531 /* sort sets from the smallest to largest, this will improve our
1532 * algorithm's performance */
1533 qsort(src
,setnum
,sizeof(zsetopsrc
),zuiCompareByCardinality
);
1535 dstobj
= createZsetObject();
1536 dstzset
= dstobj
->ptr
;
1537 memset(&zval
, 0, sizeof(zval
));
1539 if (op
== REDIS_OP_INTER
) {
1540 /* Skip everything if the smallest input is empty. */
1541 if (zuiLength(&src
[0]) > 0) {
1542 /* Precondition: as src[0] is non-empty and the inputs are ordered
1543 * by size, all src[i > 0] are non-empty too. */
1544 while (zuiNext(&src
[0],&zval
)) {
1545 double score
, value
;
1547 score
= src
[0].weight
* zval
.score
;
1548 if (isnan(score
)) score
= 0;
1550 for (j
= 1; j
< setnum
; j
++) {
1551 /* It is not safe to access the zset we are
1552 * iterating, so explicitly check for equal object. */
1553 if (src
[j
].subject
== src
[0].subject
) {
1554 value
= zval
.score
*src
[j
].weight
;
1555 zunionInterAggregate(&score
,value
,aggregate
);
1556 } else if (zuiFind(&src
[j
],&zval
,&value
)) {
1557 value
*= src
[j
].weight
;
1558 zunionInterAggregate(&score
,value
,aggregate
);
1564 /* Only continue when present in every input. */
1566 tmp
= zuiObjectFromValue(&zval
);
1567 znode
= zslInsert(dstzset
->zsl
,score
,tmp
);
1568 incrRefCount(tmp
); /* added to skiplist */
1569 dictAdd(dstzset
->dict
,tmp
,&znode
->score
);
1570 incrRefCount(tmp
); /* added to dictionary */
1572 if (tmp
->encoding
== REDIS_ENCODING_RAW
)
1573 if (sdslen(tmp
->ptr
) > maxelelen
)
1574 maxelelen
= sdslen(tmp
->ptr
);
1578 } else if (op
== REDIS_OP_UNION
) {
1579 for (i
= 0; i
< setnum
; i
++) {
1580 if (zuiLength(&src
[i
]) == 0)
1583 while (zuiNext(&src
[i
],&zval
)) {
1584 double score
, value
;
1586 /* Skip key when already processed */
1587 if (dictFind(dstzset
->dict
,zuiObjectFromValue(&zval
)) != NULL
)
1590 /* Initialize score */
1591 score
= src
[i
].weight
* zval
.score
;
1592 if (isnan(score
)) score
= 0;
1594 /* Because the inputs are sorted by size, it's only possible
1595 * for sets at larger indices to hold this element. */
1596 for (j
= (i
+1); j
< setnum
; j
++) {
1597 /* It is not safe to access the zset we are
1598 * iterating, so explicitly check for equal object. */
1599 if(src
[j
].subject
== src
[i
].subject
) {
1600 value
= zval
.score
*src
[j
].weight
;
1601 zunionInterAggregate(&score
,value
,aggregate
);
1602 } else if (zuiFind(&src
[j
],&zval
,&value
)) {
1603 value
*= src
[j
].weight
;
1604 zunionInterAggregate(&score
,value
,aggregate
);
1608 tmp
= zuiObjectFromValue(&zval
);
1609 znode
= zslInsert(dstzset
->zsl
,score
,tmp
);
1610 incrRefCount(zval
.ele
); /* added to skiplist */
1611 dictAdd(dstzset
->dict
,tmp
,&znode
->score
);
1612 incrRefCount(zval
.ele
); /* added to dictionary */
1614 if (tmp
->encoding
== REDIS_ENCODING_RAW
)
1615 if (sdslen(tmp
->ptr
) > maxelelen
)
1616 maxelelen
= sdslen(tmp
->ptr
);
1620 redisPanic("Unknown operator");
1623 for (i
= 0; i
< setnum
; i
++)
1624 zuiClearIterator(&src
[i
]);
1626 if (dbDelete(c
->db
,dstkey
)) {
1627 signalModifiedKey(c
->db
,dstkey
);
1631 if (dstzset
->zsl
->length
) {
1632 /* Convert to ziplist when in limits. */
1633 if (dstzset
->zsl
->length
<= server
.zset_max_ziplist_entries
&&
1634 maxelelen
<= server
.zset_max_ziplist_value
)
1635 zsetConvert(dstobj
,REDIS_ENCODING_ZIPLIST
);
1637 dbAdd(c
->db
,dstkey
,dstobj
);
1638 addReplyLongLong(c
,zsetLength(dstobj
));
1639 if (!touched
) signalModifiedKey(c
->db
,dstkey
);
1642 decrRefCount(dstobj
);
1643 addReply(c
,shared
.czero
);
1648 void zunionstoreCommand(redisClient
*c
) {
1649 zunionInterGenericCommand(c
,c
->argv
[1], REDIS_OP_UNION
);
1652 void zinterstoreCommand(redisClient
*c
) {
1653 zunionInterGenericCommand(c
,c
->argv
[1], REDIS_OP_INTER
);
1656 void zrangeGenericCommand(redisClient
*c
, int reverse
) {
1657 robj
*key
= c
->argv
[1];
1665 if ((getLongFromObjectOrReply(c
, c
->argv
[2], &start
, NULL
) != REDIS_OK
) ||
1666 (getLongFromObjectOrReply(c
, c
->argv
[3], &end
, NULL
) != REDIS_OK
)) return;
1668 if (c
->argc
== 5 && !strcasecmp(c
->argv
[4]->ptr
,"withscores")) {
1670 } else if (c
->argc
>= 5) {
1671 addReply(c
,shared
.syntaxerr
);
1675 if ((zobj
= lookupKeyReadOrReply(c
,key
,shared
.emptymultibulk
)) == NULL
1676 || checkType(c
,zobj
,REDIS_ZSET
)) return;
1678 /* Sanitize indexes. */
1679 llen
= zsetLength(zobj
);
1680 if (start
< 0) start
= llen
+start
;
1681 if (end
< 0) end
= llen
+end
;
1682 if (start
< 0) start
= 0;
1684 /* Invariant: start >= 0, so this test will be true when end < 0.
1685 * The range is empty when start > end or start >= length. */
1686 if (start
> end
|| start
>= llen
) {
1687 addReply(c
,shared
.emptymultibulk
);
1690 if (end
>= llen
) end
= llen
-1;
1691 rangelen
= (end
-start
)+1;
1693 /* Return the result in form of a multi-bulk reply */
1694 addReplyMultiBulkLen(c
, withscores
? (rangelen
*2) : rangelen
);
1696 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1697 unsigned char *zl
= zobj
->ptr
;
1698 unsigned char *eptr
, *sptr
;
1699 unsigned char *vstr
;
1704 eptr
= ziplistIndex(zl
,-2-(2*start
));
1706 eptr
= ziplistIndex(zl
,2*start
);
1708 redisAssertWithInfo(c
,zobj
,eptr
!= NULL
);
1709 sptr
= ziplistNext(zl
,eptr
);
1711 while (rangelen
--) {
1712 redisAssertWithInfo(c
,zobj
,eptr
!= NULL
&& sptr
!= NULL
);
1713 redisAssertWithInfo(c
,zobj
,ziplistGet(eptr
,&vstr
,&vlen
,&vlong
));
1715 addReplyBulkLongLong(c
,vlong
);
1717 addReplyBulkCBuffer(c
,vstr
,vlen
);
1720 addReplyDouble(c
,zzlGetScore(sptr
));
1723 zzlPrev(zl
,&eptr
,&sptr
);
1725 zzlNext(zl
,&eptr
,&sptr
);
1728 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
1729 zset
*zs
= zobj
->ptr
;
1730 zskiplist
*zsl
= zs
->zsl
;
1734 /* Check if starting point is trivial, before doing log(N) lookup. */
1738 ln
= zslGetElementByRank(zsl
,llen
-start
);
1740 ln
= zsl
->header
->level
[0].forward
;
1742 ln
= zslGetElementByRank(zsl
,start
+1);
1746 redisAssertWithInfo(c
,zobj
,ln
!= NULL
);
1748 addReplyBulk(c
,ele
);
1750 addReplyDouble(c
,ln
->score
);
1751 ln
= reverse
? ln
->backward
: ln
->level
[0].forward
;
1754 redisPanic("Unknown sorted set encoding");
1758 void zrangeCommand(redisClient
*c
) {
1759 zrangeGenericCommand(c
,0);
1762 void zrevrangeCommand(redisClient
*c
) {
1763 zrangeGenericCommand(c
,1);
1766 /* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE. */
1767 void genericZrangebyscoreCommand(redisClient
*c
, int reverse
) {
1769 robj
*key
= c
->argv
[1];
1771 long offset
= 0, limit
= -1;
1773 unsigned long rangelen
= 0;
1774 void *replylen
= NULL
;
1777 /* Parse the range arguments. */
1779 /* Range is given as [max,min] */
1780 maxidx
= 2; minidx
= 3;
1782 /* Range is given as [min,max] */
1783 minidx
= 2; maxidx
= 3;
1786 if (zslParseRange(c
->argv
[minidx
],c
->argv
[maxidx
],&range
) != REDIS_OK
) {
1787 addReplyError(c
,"min or max is not a float");
1791 /* Parse optional extra arguments. Note that ZCOUNT will exactly have
1792 * 4 arguments, so we'll never enter the following code path. */
1794 int remaining
= c
->argc
- 4;
1798 if (remaining
>= 1 && !strcasecmp(c
->argv
[pos
]->ptr
,"withscores")) {
1801 } else if (remaining
>= 3 && !strcasecmp(c
->argv
[pos
]->ptr
,"limit")) {
1802 if ((getLongFromObjectOrReply(c
, c
->argv
[pos
+1], &offset
, NULL
) != REDIS_OK
) ||
1803 (getLongFromObjectOrReply(c
, c
->argv
[pos
+2], &limit
, NULL
) != REDIS_OK
)) return;
1804 pos
+= 3; remaining
-= 3;
1806 addReply(c
,shared
.syntaxerr
);
1812 /* Ok, lookup the key and get the range */
1813 if ((zobj
= lookupKeyReadOrReply(c
,key
,shared
.emptymultibulk
)) == NULL
||
1814 checkType(c
,zobj
,REDIS_ZSET
)) return;
1816 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1817 unsigned char *zl
= zobj
->ptr
;
1818 unsigned char *eptr
, *sptr
;
1819 unsigned char *vstr
;
1824 /* If reversed, get the last node in range as starting point. */
1826 eptr
= zzlLastInRange(zl
,range
);
1828 eptr
= zzlFirstInRange(zl
,range
);
1831 /* No "first" element in the specified interval. */
1833 addReply(c
, shared
.emptymultibulk
);
1837 /* Get score pointer for the first element. */
1838 redisAssertWithInfo(c
,zobj
,eptr
!= NULL
);
1839 sptr
= ziplistNext(zl
,eptr
);
1841 /* We don't know in advance how many matching elements there are in the
1842 * list, so we push this object that will represent the multi-bulk
1843 * length in the output buffer, and will "fix" it later */
1844 replylen
= addDeferredMultiBulkLength(c
);
1846 /* If there is an offset, just traverse the number of elements without
1847 * checking the score because that is done in the next loop. */
1848 while (eptr
&& offset
--) {
1850 zzlPrev(zl
,&eptr
,&sptr
);
1852 zzlNext(zl
,&eptr
,&sptr
);
1856 while (eptr
&& limit
--) {
1857 score
= zzlGetScore(sptr
);
1859 /* Abort when the node is no longer in range. */
1861 if (!zslValueGteMin(score
,&range
)) break;
1863 if (!zslValueLteMax(score
,&range
)) break;
1866 /* We know the element exists, so ziplistGet should always succeed */
1867 redisAssertWithInfo(c
,zobj
,ziplistGet(eptr
,&vstr
,&vlen
,&vlong
));
1871 addReplyBulkLongLong(c
,vlong
);
1873 addReplyBulkCBuffer(c
,vstr
,vlen
);
1877 addReplyDouble(c
,score
);
1880 /* Move to next node */
1882 zzlPrev(zl
,&eptr
,&sptr
);
1884 zzlNext(zl
,&eptr
,&sptr
);
1887 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
1888 zset
*zs
= zobj
->ptr
;
1889 zskiplist
*zsl
= zs
->zsl
;
1892 /* If reversed, get the last node in range as starting point. */
1894 ln
= zslLastInRange(zsl
,range
);
1896 ln
= zslFirstInRange(zsl
,range
);
1899 /* No "first" element in the specified interval. */
1901 addReply(c
, shared
.emptymultibulk
);
1905 /* We don't know in advance how many matching elements there are in the
1906 * list, so we push this object that will represent the multi-bulk
1907 * length in the output buffer, and will "fix" it later */
1908 replylen
= addDeferredMultiBulkLength(c
);
1910 /* If there is an offset, just traverse the number of elements without
1911 * checking the score because that is done in the next loop. */
1912 while (ln
&& offset
--) {
1916 ln
= ln
->level
[0].forward
;
1920 while (ln
&& limit
--) {
1921 /* Abort when the node is no longer in range. */
1923 if (!zslValueGteMin(ln
->score
,&range
)) break;
1925 if (!zslValueLteMax(ln
->score
,&range
)) break;
1929 addReplyBulk(c
,ln
->obj
);
1932 addReplyDouble(c
,ln
->score
);
1935 /* Move to next node */
1939 ln
= ln
->level
[0].forward
;
1943 redisPanic("Unknown sorted set encoding");
1950 setDeferredMultiBulkLength(c
, replylen
, rangelen
);
1953 void zrangebyscoreCommand(redisClient
*c
) {
1954 genericZrangebyscoreCommand(c
,0);
1957 void zrevrangebyscoreCommand(redisClient
*c
) {
1958 genericZrangebyscoreCommand(c
,1);
1961 void zcountCommand(redisClient
*c
) {
1962 robj
*key
= c
->argv
[1];
1967 /* Parse the range arguments */
1968 if (zslParseRange(c
->argv
[2],c
->argv
[3],&range
) != REDIS_OK
) {
1969 addReplyError(c
,"min or max is not a float");
1973 /* Lookup the sorted set */
1974 if ((zobj
= lookupKeyReadOrReply(c
, key
, shared
.czero
)) == NULL
||
1975 checkType(c
, zobj
, REDIS_ZSET
)) return;
1977 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
1978 unsigned char *zl
= zobj
->ptr
;
1979 unsigned char *eptr
, *sptr
;
1982 /* Use the first element in range as the starting point */
1983 eptr
= zzlFirstInRange(zl
,range
);
1985 /* No "first" element */
1987 addReply(c
, shared
.czero
);
1991 /* First element is in range */
1992 sptr
= ziplistNext(zl
,eptr
);
1993 score
= zzlGetScore(sptr
);
1994 redisAssertWithInfo(c
,zobj
,zslValueLteMax(score
,&range
));
1996 /* Iterate over elements in range */
1998 score
= zzlGetScore(sptr
);
2000 /* Abort when the node is no longer in range. */
2001 if (!zslValueLteMax(score
,&range
)) {
2005 zzlNext(zl
,&eptr
,&sptr
);
2008 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
2009 zset
*zs
= zobj
->ptr
;
2010 zskiplist
*zsl
= zs
->zsl
;
2014 /* Find first element in range */
2015 zn
= zslFirstInRange(zsl
, range
);
2017 /* Use rank of first element, if any, to determine preliminary count */
2019 rank
= zslGetRank(zsl
, zn
->score
, zn
->obj
);
2020 count
= (zsl
->length
- (rank
- 1));
2022 /* Find last element in range */
2023 zn
= zslLastInRange(zsl
, range
);
2025 /* Use rank of last element, if any, to determine the actual count */
2027 rank
= zslGetRank(zsl
, zn
->score
, zn
->obj
);
2028 count
-= (zsl
->length
- rank
);
2032 redisPanic("Unknown sorted set encoding");
2035 addReplyLongLong(c
, count
);
2038 void zcardCommand(redisClient
*c
) {
2039 robj
*key
= c
->argv
[1];
2042 if ((zobj
= lookupKeyReadOrReply(c
,key
,shared
.czero
)) == NULL
||
2043 checkType(c
,zobj
,REDIS_ZSET
)) return;
2045 addReplyLongLong(c
,zsetLength(zobj
));
2048 void zscoreCommand(redisClient
*c
) {
2049 robj
*key
= c
->argv
[1];
2053 if ((zobj
= lookupKeyReadOrReply(c
,key
,shared
.nullbulk
)) == NULL
||
2054 checkType(c
,zobj
,REDIS_ZSET
)) return;
2056 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
2057 if (zzlFind(zobj
->ptr
,c
->argv
[2],&score
) != NULL
)
2058 addReplyDouble(c
,score
);
2060 addReply(c
,shared
.nullbulk
);
2061 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
2062 zset
*zs
= zobj
->ptr
;
2065 c
->argv
[2] = tryObjectEncoding(c
->argv
[2]);
2066 de
= dictFind(zs
->dict
,c
->argv
[2]);
2068 score
= *(double*)dictGetVal(de
);
2069 addReplyDouble(c
,score
);
2071 addReply(c
,shared
.nullbulk
);
2074 redisPanic("Unknown sorted set encoding");
2078 void zrankGenericCommand(redisClient
*c
, int reverse
) {
2079 robj
*key
= c
->argv
[1];
2080 robj
*ele
= c
->argv
[2];
2085 if ((zobj
= lookupKeyReadOrReply(c
,key
,shared
.nullbulk
)) == NULL
||
2086 checkType(c
,zobj
,REDIS_ZSET
)) return;
2087 llen
= zsetLength(zobj
);
2089 redisAssertWithInfo(c
,ele
,ele
->encoding
== REDIS_ENCODING_RAW
);
2090 if (zobj
->encoding
== REDIS_ENCODING_ZIPLIST
) {
2091 unsigned char *zl
= zobj
->ptr
;
2092 unsigned char *eptr
, *sptr
;
2094 eptr
= ziplistIndex(zl
,0);
2095 redisAssertWithInfo(c
,zobj
,eptr
!= NULL
);
2096 sptr
= ziplistNext(zl
,eptr
);
2097 redisAssertWithInfo(c
,zobj
,sptr
!= NULL
);
2100 while(eptr
!= NULL
) {
2101 if (ziplistCompare(eptr
,ele
->ptr
,sdslen(ele
->ptr
)))
2104 zzlNext(zl
,&eptr
,&sptr
);
2109 addReplyLongLong(c
,llen
-rank
);
2111 addReplyLongLong(c
,rank
-1);
2113 addReply(c
,shared
.nullbulk
);
2115 } else if (zobj
->encoding
== REDIS_ENCODING_SKIPLIST
) {
2116 zset
*zs
= zobj
->ptr
;
2117 zskiplist
*zsl
= zs
->zsl
;
2121 ele
= c
->argv
[2] = tryObjectEncoding(c
->argv
[2]);
2122 de
= dictFind(zs
->dict
,ele
);
2124 score
= *(double*)dictGetVal(de
);
2125 rank
= zslGetRank(zsl
,score
,ele
);
2126 redisAssertWithInfo(c
,ele
,rank
); /* Existing elements always have a rank. */
2128 addReplyLongLong(c
,llen
-rank
);
2130 addReplyLongLong(c
,rank
-1);
2132 addReply(c
,shared
.nullbulk
);
2135 redisPanic("Unknown sorted set encoding");
2139 void zrankCommand(redisClient
*c
) {
2140 zrankGenericCommand(c
, 0);
2143 void zrevrankCommand(redisClient
*c
) {
2144 zrankGenericCommand(c
, 1);