X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/5d1b4fb6983f5acd9cfb6ee5f5715547688448d2..497fc8775fd4b85289a6998bb4eaddbe657e6be7:/src/t_zset.c diff --git a/src/t_zset.c b/src/t_zset.c index 6b2b6d9f..a5dc27c7 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -174,12 +174,6 @@ int zslDelete(zskiplist *zsl, double score, robj *obj) { return 0; /* not found */ } -/* Struct to hold a inclusive/exclusive range spec. */ -typedef struct { - double min, max; - int minex, maxex; /* are min or max exclusive? */ -} zrangespec; - static int zslValueGteMin(double value, zrangespec *spec) { return spec->minex ? (value > spec->min) : (value >= spec->min); } @@ -188,10 +182,6 @@ static int zslValueLteMax(double value, zrangespec *spec) { return spec->maxex ? (value < spec->max) : (value <= spec->max); } -static int zslValueInRange(double value, zrangespec *spec) { - return zslValueGteMin(value,spec) && zslValueLteMax(value,spec); -} - /* Returns if there is a part of the zset is in range. */ int zslIsInRange(zskiplist *zsl, zrangespec *range) { zskiplistNode *x; @@ -226,10 +216,12 @@ zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec range) { x = x->level[i].forward; } - /* The tail is in range, so the previous block should always return a - * node that is non-NULL and the last one to be out of range. */ + /* This is an inner range, so the next node cannot be NULL. */ x = x->level[0].forward; - redisAssert(x != NULL && zslValueInRange(x->score,&range)); + redisAssert(x != NULL); + + /* Check if score <= max. */ + if (!zslValueLteMax(x->score,&range)) return NULL; return x; } @@ -250,9 +242,11 @@ zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec range) { x = x->level[i].forward; } - /* The header is in range, so the previous block should always return a - * node that is non-NULL and in range. */ - redisAssert(x != NULL && zslValueInRange(x->score,&range)); + /* This is an inner range, so this node cannot be NULL. */ + redisAssert(x != NULL); + + /* Check if score >= min. */ + if (!zslValueGteMin(x->score,&range)) return NULL; return x; } @@ -449,11 +443,48 @@ int zzlCompareElements(unsigned char *eptr, unsigned char *cstr, unsigned int cl return cmp; } -unsigned int zzlLength(robj *zobj) { - unsigned char *zl = zobj->ptr; +unsigned int zzlLength(unsigned char *zl) { return ziplistLen(zl)/2; } +/* Move to next entry based on the values in eptr and sptr. Both are set to + * NULL when there is no next entry. */ +void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) { + unsigned char *_eptr, *_sptr; + redisAssert(*eptr != NULL && *sptr != NULL); + + _eptr = ziplistNext(zl,*sptr); + if (_eptr != NULL) { + _sptr = ziplistNext(zl,_eptr); + redisAssert(_sptr != NULL); + } else { + /* No next entry. */ + _sptr = NULL; + } + + *eptr = _eptr; + *sptr = _sptr; +} + +/* Move to the previous entry based on the values in eptr and sptr. Both are + * set to NULL when there is no next entry. */ +void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) { + unsigned char *_eptr, *_sptr; + redisAssert(*eptr != NULL && *sptr != NULL); + + _sptr = ziplistPrev(zl,*eptr); + if (_sptr != NULL) { + _eptr = ziplistPrev(zl,_sptr); + redisAssert(_eptr != NULL); + } else { + /* No previous entry. */ + _eptr = NULL; + } + + *eptr = _eptr; + *sptr = _sptr; +} + /* Returns if there is a part of the zset is in range. Should only be used * internally by zzlFirstInRange and zzlLastInRange. */ int zzlIsInRange(unsigned char *zl, zrangespec *range) { @@ -482,8 +513,7 @@ int zzlIsInRange(unsigned char *zl, zrangespec *range) { /* Find pointer to the first element contained in the specified range. * Returns NULL when no element is contained in the range. */ -unsigned char *zzlFirstInRange(robj *zobj, zrangespec range) { - unsigned char *zl = zobj->ptr; +unsigned char *zzlFirstInRange(unsigned char *zl, zrangespec range) { unsigned char *eptr = ziplistIndex(zl,0), *sptr; double score; @@ -495,8 +525,12 @@ unsigned char *zzlFirstInRange(robj *zobj, zrangespec range) { redisAssert(sptr != NULL); score = zzlGetScore(sptr); - if (zslValueGteMin(score,&range)) - return eptr; + if (zslValueGteMin(score,&range)) { + /* Check if score <= max. */ + if (zslValueLteMax(score,&range)) + return eptr; + return NULL; + } /* Move to next element. */ eptr = ziplistNext(zl,sptr); @@ -507,8 +541,7 @@ unsigned char *zzlFirstInRange(robj *zobj, zrangespec range) { /* Find pointer to the last element contained in the specified range. * Returns NULL when no element is contained in the range. */ -unsigned char *zzlLastInRange(robj *zobj, zrangespec range) { - unsigned char *zl = zobj->ptr; +unsigned char *zzlLastInRange(unsigned char *zl, zrangespec range) { unsigned char *eptr = ziplistIndex(zl,-2), *sptr; double score; @@ -520,8 +553,12 @@ unsigned char *zzlLastInRange(robj *zobj, zrangespec range) { redisAssert(sptr != NULL); score = zzlGetScore(sptr); - if (zslValueLteMax(score,&range)) - return eptr; + if (zslValueLteMax(score,&range)) { + /* Check if score >= min. */ + if (zslValueGteMin(score,&range)) + return eptr; + return NULL; + } /* Move to previous element by moving to the score of previous element. * When this returns NULL, we know there also is no element. */ @@ -535,8 +572,7 @@ unsigned char *zzlLastInRange(robj *zobj, zrangespec range) { return NULL; } -unsigned char *zzlFind(robj *zobj, robj *ele, double *score) { - unsigned char *zl = zobj->ptr; +unsigned char *zzlFind(unsigned char *zl, robj *ele, double *score) { unsigned char *eptr = ziplistIndex(zl,0), *sptr; ele = getDecodedObject(ele); @@ -561,23 +597,20 @@ unsigned char *zzlFind(robj *zobj, robj *ele, double *score) { /* Delete (element,score) pair from ziplist. Use local copy of eptr because we * don't want to modify the one given as argument. */ -int zzlDelete(robj *zobj, unsigned char *eptr) { - unsigned char *zl = zobj->ptr; +unsigned char *zzlDelete(unsigned char *zl, unsigned char *eptr) { unsigned char *p = eptr; /* TODO: add function to ziplist API to delete N elements from offset. */ zl = ziplistDelete(zl,&p); zl = ziplistDelete(zl,&p); - zobj->ptr = zl; - return REDIS_OK; + return zl; } -int zzlInsertAt(robj *zobj, robj *ele, double score, unsigned char *eptr) { - unsigned char *zl = zobj->ptr; +unsigned char *zzlInsertAt(unsigned char *zl, unsigned char *eptr, robj *ele, double score) { unsigned char *sptr; char scorebuf[128]; int scorelen; - int offset; + size_t offset; redisAssert(ele->encoding == REDIS_ENCODING_RAW); scorelen = d2string(scorebuf,sizeof(scorebuf),score); @@ -595,14 +628,12 @@ int zzlInsertAt(robj *zobj, robj *ele, double score, unsigned char *eptr) { zl = ziplistInsert(zl,sptr,(unsigned char*)scorebuf,scorelen); } - zobj->ptr = zl; - return REDIS_OK; + return zl; } /* Insert (element,score) pair in ziplist. This function assumes the element is * not yet present in the list. */ -int zzlInsert(robj *zobj, robj *ele, double score) { - unsigned char *zl = zobj->ptr; +unsigned char *zzlInsert(unsigned char *zl, robj *ele, double score) { unsigned char *eptr = ziplistIndex(zl,0), *sptr; double s; @@ -616,12 +647,12 @@ int zzlInsert(robj *zobj, robj *ele, double score) { /* First element with score larger than score for element to be * inserted. This means we should take its spot in the list to * maintain ordering. */ - zzlInsertAt(zobj,ele,score,eptr); + zl = zzlInsertAt(zl,eptr,ele,score); break; } else if (s == score) { /* Ensure lexicographical ordering for elements. */ - if (zzlCompareElements(eptr,ele->ptr,sdslen(ele->ptr)) < 0) { - zzlInsertAt(zobj,ele,score,eptr); + if (zzlCompareElements(eptr,ele->ptr,sdslen(ele->ptr)) > 0) { + zl = zzlInsertAt(zl,eptr,ele,score); break; } } @@ -632,21 +663,21 @@ int zzlInsert(robj *zobj, robj *ele, double score) { /* Push on tail of list when it was not yet inserted. */ if (eptr == NULL) - zzlInsertAt(zobj,ele,score,NULL); + zl = zzlInsertAt(zl,NULL,ele,score); decrRefCount(ele); - return REDIS_OK; + return zl; } -unsigned long zzlDeleteRangeByScore(robj *zobj, zrangespec range) { - unsigned char *zl = zobj->ptr; +unsigned char *zzlDeleteRangeByScore(unsigned char *zl, zrangespec range, unsigned long *deleted) { unsigned char *eptr, *sptr; double score; - unsigned long deleted = 0; + unsigned long num = 0; - eptr = zzlFirstInRange(zobj,range); - if (eptr == NULL) return deleted; + if (deleted != NULL) *deleted = 0; + eptr = zzlFirstInRange(zl,range); + if (eptr == NULL) return zl; /* When the tail of the ziplist is deleted, eptr will point to the sentinel * byte and ziplistNext will return NULL. */ @@ -656,25 +687,35 @@ unsigned long zzlDeleteRangeByScore(robj *zobj, zrangespec range) { /* Delete both the element and the score. */ zl = ziplistDelete(zl,&eptr); zl = ziplistDelete(zl,&eptr); - deleted++; + num++; } else { /* No longer in range. */ break; } } - return deleted; + if (deleted != NULL) *deleted = num; + return zl; +} + +/* Delete all the elements with rank between start and end from the skiplist. + * Start and end are inclusive. Note that start and end need to be 1-based */ +unsigned char *zzlDeleteRangeByRank(unsigned char *zl, unsigned int start, unsigned int end, unsigned long *deleted) { + unsigned int num = (end-start)+1; + if (deleted) *deleted = num; + zl = ziplistDeleteRange(zl,2*(start-1),2*num); + return zl; } /*----------------------------------------------------------------------------- * Common sorted set API *----------------------------------------------------------------------------*/ -int zsLength(robj *zobj) { +unsigned int zsetLength(robj *zobj) { int length = -1; if (zobj->encoding == REDIS_ENCODING_ZIPLIST) { - length = zzlLength(zobj); - } else if (zobj->encoding == REDIS_ENCODING_RAW) { + length = zzlLength(zobj->ptr); + } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) { length = ((zset*)zobj->ptr)->zsl->length; } else { redisPanic("Unknown sorted set encoding"); @@ -682,6 +723,82 @@ int zsLength(robj *zobj) { return length; } +void zsetConvert(robj *zobj, int encoding) { + zset *zs; + zskiplistNode *node, *next; + robj *ele; + double score; + + if (zobj->encoding == encoding) return; + if (zobj->encoding == REDIS_ENCODING_ZIPLIST) { + unsigned char *zl = zobj->ptr; + unsigned char *eptr, *sptr; + unsigned char *vstr; + unsigned int vlen; + long long vlong; + + if (encoding != REDIS_ENCODING_SKIPLIST) + redisPanic("Unknown target encoding"); + + zs = zmalloc(sizeof(*zs)); + zs->dict = dictCreate(&zsetDictType,NULL); + zs->zsl = zslCreate(); + + eptr = ziplistIndex(zl,0); + redisAssert(eptr != NULL); + sptr = ziplistNext(zl,eptr); + redisAssert(sptr != NULL); + + while (eptr != NULL) { + score = zzlGetScore(sptr); + redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong)); + if (vstr == NULL) + ele = createStringObjectFromLongLong(vlong); + else + ele = createStringObject((char*)vstr,vlen); + + /* Has incremented refcount since it was just created. */ + node = zslInsert(zs->zsl,score,ele); + redisAssert(dictAdd(zs->dict,ele,&node->score) == DICT_OK); + incrRefCount(ele); /* Added to dictionary. */ + zzlNext(zl,&eptr,&sptr); + } + + zfree(zobj->ptr); + zobj->ptr = zs; + zobj->encoding = REDIS_ENCODING_SKIPLIST; + } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) { + unsigned char *zl = ziplistNew(); + + if (encoding != REDIS_ENCODING_ZIPLIST) + redisPanic("Unknown target encoding"); + + /* Approach similar to zslFree(), since we want to free the skiplist at + * the same time as creating the ziplist. */ + zs = zobj->ptr; + dictRelease(zs->dict); + node = zs->zsl->header->level[0].forward; + zfree(zs->zsl->header); + zfree(zs->zsl); + + while (node) { + ele = getDecodedObject(node->obj); + zl = zzlInsertAt(zl,NULL,ele,node->score); + decrRefCount(ele); + + next = node->level[0].forward; + zslFreeNode(node); + node = next; + } + + zfree(zs); + zobj->ptr = zl; + zobj->encoding = REDIS_ENCODING_ZIPLIST; + } else { + redisPanic("Unknown sorted set encoding"); + } +} + /*----------------------------------------------------------------------------- * Sorted set commands *----------------------------------------------------------------------------*/ @@ -700,7 +817,13 @@ void zaddGenericCommand(redisClient *c, int incr) { zobj = lookupKeyWrite(c->db,key); if (zobj == NULL) { - zobj = createZsetZiplistObject(); + if (server.zset_max_ziplist_entries == 0 || + server.zset_max_ziplist_value < sdslen(c->argv[3]->ptr)) + { + zobj = createZsetObject(); + } else { + zobj = createZsetZiplistObject(); + } dbAdd(c->db,key,zobj); } else { if (zobj->type != REDIS_ZSET) { @@ -714,7 +837,7 @@ void zaddGenericCommand(redisClient *c, int incr) { /* Prefer non-encoded element when dealing with ziplists. */ ele = c->argv[3]; - if ((eptr = zzlFind(zobj,ele,&curscore)) != NULL) { + if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) { if (incr) { score += curscore; if (isnan(score)) { @@ -727,8 +850,8 @@ void zaddGenericCommand(redisClient *c, int incr) { /* Remove and re-insert when score changed. */ if (score != curscore) { - redisAssert(zzlDelete(zobj,eptr) == REDIS_OK); - redisAssert(zzlInsert(zobj,ele,score) == REDIS_OK); + zobj->ptr = zzlDelete(zobj->ptr,eptr); + zobj->ptr = zzlInsert(zobj->ptr,ele,score); signalModifiedKey(c->db,key); server.dirty++; @@ -739,7 +862,13 @@ void zaddGenericCommand(redisClient *c, int incr) { else /* ZADD */ addReply(c,shared.czero); } else { - redisAssert(zzlInsert(zobj,ele,score) == REDIS_OK); + /* Optimize: check if the element is too large or the list becomes + * too long *before* executing zzlInsert. */ + zobj->ptr = zzlInsert(zobj->ptr,ele,score); + if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries) + zsetConvert(zobj,REDIS_ENCODING_SKIPLIST); + if (sdslen(ele->ptr) > server.zset_max_ziplist_value) + zsetConvert(zobj,REDIS_ENCODING_SKIPLIST); signalModifiedKey(c->db,key); server.dirty++; @@ -749,7 +878,7 @@ void zaddGenericCommand(redisClient *c, int incr) { else /* ZADD */ addReply(c,shared.cone); } - } else if (zobj->encoding == REDIS_ENCODING_RAW) { + } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) { zset *zs = zobj->ptr; zskiplistNode *znode; dictEntry *de; @@ -825,14 +954,14 @@ void zremCommand(redisClient *c) { if (zobj->encoding == REDIS_ENCODING_ZIPLIST) { unsigned char *eptr; - if ((eptr = zzlFind(zobj,ele,NULL)) != NULL) { - redisAssert(zzlDelete(zobj,eptr) == REDIS_OK); - if (zzlLength(zobj) == 0) dbDelete(c->db,key); + if ((eptr = zzlFind(zobj->ptr,ele,NULL)) != NULL) { + zobj->ptr = zzlDelete(zobj->ptr,eptr); + if (zzlLength(zobj->ptr) == 0) dbDelete(c->db,key); } else { addReply(c,shared.czero); return; } - } else if (zobj->encoding == REDIS_ENCODING_RAW) { + } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) { zset *zs = zobj->ptr; dictEntry *de; double score; @@ -876,8 +1005,9 @@ void zremrangebyscoreCommand(redisClient *c) { checkType(c,zobj,REDIS_ZSET)) return; if (zobj->encoding == REDIS_ENCODING_ZIPLIST) { - deleted = zzlDeleteRangeByScore(zobj,range); - } else if (zobj->encoding == REDIS_ENCODING_RAW) { + zobj->ptr = zzlDeleteRangeByScore(zobj->ptr,range,&deleted); + if (zzlLength(zobj->ptr) == 0) dbDelete(c->db,key); + } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) { zset *zs = zobj->ptr; deleted = zslDeleteRangeByScore(zs->zsl,range,zs->dict); if (htNeedsResize(zs->dict)) dictResize(zs->dict); @@ -892,22 +1022,21 @@ void zremrangebyscoreCommand(redisClient *c) { } void zremrangebyrankCommand(redisClient *c) { + robj *key = c->argv[1]; + robj *zobj; long start; long end; int llen; - long deleted; - robj *zsetobj; - zset *zs; + unsigned long deleted; if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) || (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return; - if ((zsetobj = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL || - checkType(c,zsetobj,REDIS_ZSET)) return; - zs = zsetobj->ptr; - llen = zs->zsl->length; + if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL || + checkType(c,zobj,REDIS_ZSET)) return; - /* convert negative indexes */ + /* Sanitize indexes. */ + llen = zsetLength(zobj); if (start < 0) start = llen+start; if (end < 0) end = llen+end; if (start < 0) start = 0; @@ -920,27 +1049,348 @@ void zremrangebyrankCommand(redisClient *c) { } if (end >= llen) end = llen-1; - /* increment start and end because zsl*Rank functions - * use 1-based rank */ - deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict); - if (htNeedsResize(zs->dict)) dictResize(zs->dict); - if (dictSize(zs->dict) == 0) dbDelete(c->db,c->argv[1]); - if (deleted) signalModifiedKey(c->db,c->argv[1]); + if (zobj->encoding == REDIS_ENCODING_ZIPLIST) { + /* Correct for 1-based rank. */ + zobj->ptr = zzlDeleteRangeByRank(zobj->ptr,start+1,end+1,&deleted); + if (zzlLength(zobj->ptr) == 0) dbDelete(c->db,key); + } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) { + zset *zs = zobj->ptr; + + /* Correct for 1-based rank. */ + deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict); + if (htNeedsResize(zs->dict)) dictResize(zs->dict); + if (dictSize(zs->dict) == 0) dbDelete(c->db,key); + } else { + redisPanic("Unknown sorted set encoding"); + } + + if (deleted) signalModifiedKey(c->db,key); server.dirty += deleted; - addReplyLongLong(c, deleted); + addReplyLongLong(c,deleted); } typedef struct { - dict *dict; + robj *subject; + int type; /* Set, sorted set */ + int encoding; double weight; + + union { + /* Set iterators. */ + union _iterset { + struct { + intset *is; + int ii; + } is; + struct { + dict *dict; + dictIterator *di; + dictEntry *de; + } ht; + } set; + + /* Sorted set iterators. */ + union _iterzset { + struct { + unsigned char *zl; + unsigned char *eptr, *sptr; + } zl; + struct { + zset *zs; + zskiplistNode *node; + } sl; + } zset; + } iter; } zsetopsrc; -int qsortCompareZsetopsrcByCardinality(const void *s1, const void *s2) { - zsetopsrc *d1 = (void*) s1, *d2 = (void*) s2; - unsigned long size1, size2; - size1 = d1->dict ? dictSize(d1->dict) : 0; - size2 = d2->dict ? dictSize(d2->dict) : 0; - return size1 - size2; + +/* Use dirty flags for pointers that need to be cleaned up in the next + * iteration over the zsetopval. The dirty flag for the long long value is + * special, since long long values don't need cleanup. Instead, it means that + * we already checked that "ell" holds a long long, or tried to convert another + * representation into a long long value. When this was successful, + * OPVAL_VALID_LL is set as well. */ +#define OPVAL_DIRTY_ROBJ 1 +#define OPVAL_DIRTY_LL 2 +#define OPVAL_VALID_LL 4 + +/* Store value retrieved from the iterator. */ +typedef struct { + int flags; + unsigned char _buf[32]; /* Private buffer. */ + robj *ele; + unsigned char *estr; + unsigned int elen; + long long ell; + double score; +} zsetopval; + +typedef union _iterset iterset; +typedef union _iterzset iterzset; + +void zuiInitIterator(zsetopsrc *op) { + if (op->subject == NULL) + return; + + if (op->type == REDIS_SET) { + iterset *it = &op->iter.set; + if (op->encoding == REDIS_ENCODING_INTSET) { + it->is.is = op->subject->ptr; + it->is.ii = 0; + } else if (op->encoding == REDIS_ENCODING_HT) { + it->ht.dict = op->subject->ptr; + it->ht.di = dictGetIterator(op->subject->ptr); + it->ht.de = dictNext(it->ht.di); + } else { + redisPanic("Unknown set encoding"); + } + } else if (op->type == REDIS_ZSET) { + iterzset *it = &op->iter.zset; + if (op->encoding == REDIS_ENCODING_ZIPLIST) { + it->zl.zl = op->subject->ptr; + it->zl.eptr = ziplistIndex(it->zl.zl,0); + if (it->zl.eptr != NULL) { + it->zl.sptr = ziplistNext(it->zl.zl,it->zl.eptr); + redisAssert(it->zl.sptr != NULL); + } + } else if (op->encoding == REDIS_ENCODING_SKIPLIST) { + it->sl.zs = op->subject->ptr; + it->sl.node = it->sl.zs->zsl->header->level[0].forward; + } else { + redisPanic("Unknown sorted set encoding"); + } + } else { + redisPanic("Unsupported type"); + } +} + +void zuiClearIterator(zsetopsrc *op) { + if (op->subject == NULL) + return; + + if (op->type == REDIS_SET) { + iterset *it = &op->iter.set; + if (op->encoding == REDIS_ENCODING_INTSET) { + REDIS_NOTUSED(it); /* skip */ + } else if (op->encoding == REDIS_ENCODING_HT) { + dictReleaseIterator(it->ht.di); + } else { + redisPanic("Unknown set encoding"); + } + } else if (op->type == REDIS_ZSET) { + iterzset *it = &op->iter.zset; + if (op->encoding == REDIS_ENCODING_ZIPLIST) { + REDIS_NOTUSED(it); /* skip */ + } else if (op->encoding == REDIS_ENCODING_SKIPLIST) { + REDIS_NOTUSED(it); /* skip */ + } else { + redisPanic("Unknown sorted set encoding"); + } + } else { + redisPanic("Unsupported type"); + } +} + +int zuiLength(zsetopsrc *op) { + if (op->subject == NULL) + return 0; + + if (op->type == REDIS_SET) { + iterset *it = &op->iter.set; + if (op->encoding == REDIS_ENCODING_INTSET) { + return intsetLen(it->is.is); + } else if (op->encoding == REDIS_ENCODING_HT) { + return dictSize(it->ht.dict); + } else { + redisPanic("Unknown set encoding"); + } + } else if (op->type == REDIS_ZSET) { + iterzset *it = &op->iter.zset; + if (op->encoding == REDIS_ENCODING_ZIPLIST) { + return zzlLength(it->zl.zl); + } else if (op->encoding == REDIS_ENCODING_SKIPLIST) { + return it->sl.zs->zsl->length; + } else { + redisPanic("Unknown sorted set encoding"); + } + } else { + redisPanic("Unsupported type"); + } +} + +/* Check if the current value is valid. If so, store it in the passed structure + * and move to the next element. If not valid, this means we have reached the + * end of the structure and can abort. */ +int zuiNext(zsetopsrc *op, zsetopval *val) { + if (op->subject == NULL) + return 0; + + if (val->flags & OPVAL_DIRTY_ROBJ) + decrRefCount(val->ele); + + bzero(val,sizeof(zsetopval)); + + if (op->type == REDIS_SET) { + iterset *it = &op->iter.set; + if (op->encoding == REDIS_ENCODING_INTSET) { + if (!intsetGet(it->is.is,it->is.ii,(int64_t*)&val->ell)) + return 0; + val->score = 1.0; + + /* Move to next element. */ + it->is.ii++; + } else if (op->encoding == REDIS_ENCODING_HT) { + if (it->ht.de == NULL) + return 0; + val->ele = dictGetEntryKey(it->ht.de); + val->score = 1.0; + + /* Move to next element. */ + it->ht.de = dictNext(it->ht.di); + } else { + redisPanic("Unknown set encoding"); + } + } else if (op->type == REDIS_ZSET) { + iterzset *it = &op->iter.zset; + if (op->encoding == REDIS_ENCODING_ZIPLIST) { + /* No need to check both, but better be explicit. */ + if (it->zl.eptr == NULL || it->zl.sptr == NULL) + return 0; + redisAssert(ziplistGet(it->zl.eptr,&val->estr,&val->elen,&val->ell)); + val->score = zzlGetScore(it->zl.sptr); + + /* Move to next element. */ + zzlNext(it->zl.zl,&it->zl.eptr,&it->zl.sptr); + } else if (op->encoding == REDIS_ENCODING_SKIPLIST) { + if (it->sl.node == NULL) + return 0; + val->ele = it->sl.node->obj; + val->score = it->sl.node->score; + + /* Move to next element. */ + it->sl.node = it->sl.node->level[0].forward; + } else { + redisPanic("Unknown sorted set encoding"); + } + } else { + redisPanic("Unsupported type"); + } + return 1; +} + +int zuiLongLongFromValue(zsetopval *val) { + if (!(val->flags & OPVAL_DIRTY_LL)) { + val->flags |= OPVAL_DIRTY_LL; + + if (val->ele != NULL) { + if (val->ele->encoding == REDIS_ENCODING_INT) { + val->ell = (long)val->ele->ptr; + val->flags |= OPVAL_VALID_LL; + } else if (val->ele->encoding == REDIS_ENCODING_RAW) { + if (string2ll(val->ele->ptr,sdslen(val->ele->ptr),&val->ell)) + val->flags |= OPVAL_VALID_LL; + } else { + redisPanic("Unsupported element encoding"); + } + } else if (val->estr != NULL) { + if (string2ll((char*)val->estr,val->elen,&val->ell)) + val->flags |= OPVAL_VALID_LL; + } else { + /* The long long was already set, flag as valid. */ + val->flags |= OPVAL_VALID_LL; + } + } + return val->flags & OPVAL_VALID_LL; +} + +robj *zuiObjectFromValue(zsetopval *val) { + if (val->ele == NULL) { + if (val->estr != NULL) { + val->ele = createStringObject((char*)val->estr,val->elen); + } else { + val->ele = createStringObjectFromLongLong(val->ell); + } + val->flags |= OPVAL_DIRTY_ROBJ; + } + return val->ele; +} + +int zuiBufferFromValue(zsetopval *val) { + if (val->estr == NULL) { + if (val->ele != NULL) { + if (val->ele->encoding == REDIS_ENCODING_INT) { + val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),(long)val->ele->ptr); + val->estr = val->_buf; + } else if (val->ele->encoding == REDIS_ENCODING_RAW) { + val->elen = sdslen(val->ele->ptr); + val->estr = val->ele->ptr; + } else { + redisPanic("Unsupported element encoding"); + } + } else { + val->elen = ll2string((char*)val->_buf,sizeof(val->_buf),val->ell); + val->estr = val->_buf; + } + } + return 1; +} + +/* Find value pointed to by val in the source pointer to by op. When found, + * return 1 and store its score in target. Return 0 otherwise. */ +int zuiFind(zsetopsrc *op, zsetopval *val, double *score) { + if (op->subject == NULL) + return 0; + + if (op->type == REDIS_SET) { + iterset *it = &op->iter.set; + + if (op->encoding == REDIS_ENCODING_INTSET) { + if (zuiLongLongFromValue(val) && intsetFind(it->is.is,val->ell)) { + *score = 1.0; + return 1; + } else { + return 0; + } + } else if (op->encoding == REDIS_ENCODING_HT) { + zuiObjectFromValue(val); + if (dictFind(it->ht.dict,val->ele) != NULL) { + *score = 1.0; + return 1; + } else { + return 0; + } + } else { + redisPanic("Unknown set encoding"); + } + } else if (op->type == REDIS_ZSET) { + iterzset *it = &op->iter.zset; + zuiObjectFromValue(val); + + if (op->encoding == REDIS_ENCODING_ZIPLIST) { + if (zzlFind(it->zl.zl,val->ele,score) != NULL) { + /* Score is already set by zzlFind. */ + return 1; + } else { + return 0; + } + } else if (op->encoding == REDIS_ENCODING_SKIPLIST) { + dictEntry *de; + if ((de = dictFind(it->sl.zs->dict,val->ele)) != NULL) { + *score = *(double*)dictGetEntryVal(de); + return 1; + } else { + return 0; + } + } else { + redisPanic("Unknown sorted set encoding"); + } + } else { + redisPanic("Unsupported type"); + } +} + +int zuiCompareByCardinality(const void *s1, const void *s2) { + return zuiLength((zsetopsrc*)s1) - zuiLength((zsetopsrc*)s2); } #define REDIS_AGGR_SUM 1 @@ -969,11 +1419,12 @@ void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { int i, j, setnum; int aggregate = REDIS_AGGR_SUM; zsetopsrc *src; + zsetopval zval; + robj *tmp; + unsigned int maxelelen = 0; robj *dstobj; zset *dstzset; zskiplistNode *znode; - dictIterator *di; - dictEntry *de; int touched = 0; /* expect setnum input keys to be given */ @@ -991,24 +1442,24 @@ void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { } /* read keys to be used for input */ - src = zmalloc(sizeof(zsetopsrc) * setnum); + src = zcalloc(sizeof(zsetopsrc) * setnum); for (i = 0, j = 3; i < setnum; i++, j++) { robj *obj = lookupKeyWrite(c->db,c->argv[j]); - if (!obj) { - src[i].dict = NULL; - } else { - if (obj->type == REDIS_ZSET) { - src[i].dict = ((zset*)obj->ptr)->dict; - } else if (obj->type == REDIS_SET) { - src[i].dict = (obj->ptr); - } else { + if (obj != NULL) { + if (obj->type != REDIS_ZSET && obj->type != REDIS_SET) { zfree(src); addReply(c,shared.wrongtypeerr); return; } + + src[i].subject = obj; + src[i].type = obj->type; + src[i].encoding = obj->encoding; + } else { + src[i].subject = NULL; } - /* default all weights to 1 */ + /* Default all weights to 1. */ src[i].weight = 1.0; } @@ -1049,95 +1500,119 @@ void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { } } + for (i = 0; i < setnum; i++) + zuiInitIterator(&src[i]); + /* sort sets from the smallest to largest, this will improve our * algorithm's performance */ - qsort(src,setnum,sizeof(zsetopsrc),qsortCompareZsetopsrcByCardinality); + qsort(src,setnum,sizeof(zsetopsrc),zuiCompareByCardinality); dstobj = createZsetObject(); dstzset = dstobj->ptr; + memset(&zval, 0, sizeof(zval)); if (op == REDIS_OP_INTER) { - /* skip going over all entries if the smallest zset is NULL or empty */ - if (src[0].dict && dictSize(src[0].dict) > 0) { - /* precondition: as src[0].dict is non-empty and the zsets are ordered - * from small to large, all src[i > 0].dict are non-empty too */ - di = dictGetIterator(src[0].dict); - while((de = dictNext(di)) != NULL) { + /* Skip everything if the smallest input is empty. */ + if (zuiLength(&src[0]) > 0) { + /* Precondition: as src[0] is non-empty and the inputs are ordered + * by size, all src[i > 0] are non-empty too. */ + while (zuiNext(&src[0],&zval)) { double score, value; - score = src[0].weight * zunionInterDictValue(de); + score = src[0].weight * zval.score; for (j = 1; j < setnum; j++) { - dictEntry *other = dictFind(src[j].dict,dictGetEntryKey(de)); - if (other) { - value = src[j].weight * zunionInterDictValue(other); + /* It is not safe to access the zset we are + * iterating, so explicitly check for equal object. */ + if (src[j].subject == src[0].subject) { + value = zval.score*src[j].weight; + zunionInterAggregate(&score,value,aggregate); + } else if (zuiFind(&src[j],&zval,&value)) { + value *= src[j].weight; zunionInterAggregate(&score,value,aggregate); } else { break; } } - /* Only continue when present in every source dict. */ + /* Only continue when present in every input. */ if (j == setnum) { - robj *o = dictGetEntryKey(de); - znode = zslInsert(dstzset->zsl,score,o); - incrRefCount(o); /* added to skiplist */ - dictAdd(dstzset->dict,o,&znode->score); - incrRefCount(o); /* added to dictionary */ + tmp = zuiObjectFromValue(&zval); + znode = zslInsert(dstzset->zsl,score,tmp); + incrRefCount(tmp); /* added to skiplist */ + dictAdd(dstzset->dict,tmp,&znode->score); + incrRefCount(tmp); /* added to dictionary */ + + if (tmp->encoding == REDIS_ENCODING_RAW) + if (sdslen(tmp->ptr) > maxelelen) + maxelelen = sdslen(tmp->ptr); } } - dictReleaseIterator(di); } } else if (op == REDIS_OP_UNION) { for (i = 0; i < setnum; i++) { - if (!src[i].dict) continue; + if (zuiLength(&src[i]) == 0) + continue; - di = dictGetIterator(src[i].dict); - while((de = dictNext(di)) != NULL) { + while (zuiNext(&src[i],&zval)) { double score, value; - /* skip key when already processed */ - if (dictFind(dstzset->dict,dictGetEntryKey(de)) != NULL) + /* Skip key when already processed */ + if (dictFind(dstzset->dict,zuiObjectFromValue(&zval)) != NULL) continue; - /* initialize score */ - score = src[i].weight * zunionInterDictValue(de); + /* Initialize score */ + score = src[i].weight * zval.score; - /* because the zsets are sorted by size, its only possible - * for sets at larger indices to hold this entry */ + /* Because the inputs are sorted by size, it's only possible + * for sets at larger indices to hold this element. */ for (j = (i+1); j < setnum; j++) { - dictEntry *other = dictFind(src[j].dict,dictGetEntryKey(de)); - if (other) { - value = src[j].weight * zunionInterDictValue(other); + /* It is not safe to access the zset we are + * iterating, so explicitly check for equal object. */ + if(src[j].subject == src[i].subject) { + value = zval.score*src[j].weight; + zunionInterAggregate(&score,value,aggregate); + } else if (zuiFind(&src[j],&zval,&value)) { + value *= src[j].weight; zunionInterAggregate(&score,value,aggregate); } } - robj *o = dictGetEntryKey(de); - znode = zslInsert(dstzset->zsl,score,o); - incrRefCount(o); /* added to skiplist */ - dictAdd(dstzset->dict,o,&znode->score); - incrRefCount(o); /* added to dictionary */ + tmp = zuiObjectFromValue(&zval); + znode = zslInsert(dstzset->zsl,score,tmp); + incrRefCount(zval.ele); /* added to skiplist */ + dictAdd(dstzset->dict,tmp,&znode->score); + incrRefCount(zval.ele); /* added to dictionary */ + + if (tmp->encoding == REDIS_ENCODING_RAW) + if (sdslen(tmp->ptr) > maxelelen) + maxelelen = sdslen(tmp->ptr); } - dictReleaseIterator(di); } } else { - /* unknown operator */ - redisAssert(op == REDIS_OP_INTER || op == REDIS_OP_UNION); + redisPanic("Unknown operator"); } + for (i = 0; i < setnum; i++) + zuiClearIterator(&src[i]); + if (dbDelete(c->db,dstkey)) { signalModifiedKey(c->db,dstkey); touched = 1; server.dirty++; } if (dstzset->zsl->length) { + /* Convert to ziplist when in limits. */ + if (dstzset->zsl->length <= server.zset_max_ziplist_entries && + maxelelen <= server.zset_max_ziplist_value) + zsetConvert(dstobj,REDIS_ENCODING_ZIPLIST); + dbAdd(c->db,dstkey,dstobj); - addReplyLongLong(c, dstzset->zsl->length); + addReplyLongLong(c,zsetLength(dstobj)); if (!touched) signalModifiedKey(c->db,dstkey); server.dirty++; } else { decrRefCount(dstobj); - addReply(c, shared.czero); + addReply(c,shared.czero); } zfree(src); } @@ -1173,7 +1648,7 @@ void zrangeGenericCommand(redisClient *c, int reverse) { || checkType(c,zobj,REDIS_ZSET)) return; /* Sanitize indexes. */ - llen = zsLength(zobj); + llen = zsetLength(zobj); if (start < 0) start = llen+start; if (end < 0) end = llen+end; if (start < 0) start = 0; @@ -1202,38 +1677,27 @@ void zrangeGenericCommand(redisClient *c, int reverse) { else eptr = ziplistIndex(zl,2*start); + redisAssert(eptr != NULL); + sptr = ziplistNext(zl,eptr); + while (rangelen--) { - redisAssert(eptr != NULL); + redisAssert(eptr != NULL && sptr != NULL); redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong)); if (vstr == NULL) addReplyBulkLongLong(c,vlong); else addReplyBulkCBuffer(c,vstr,vlen); - if (withscores) { - sptr = ziplistNext(zl,eptr); - redisAssert(sptr != NULL); + if (withscores) addReplyDouble(c,zzlGetScore(sptr)); - } - if (reverse) { - /* Move to previous element by moving to the score of previous - * element. When NULL, we know there also is no element. */ - sptr = ziplistPrev(zl,eptr); - if (sptr != NULL) { - eptr = ziplistPrev(zl,sptr); - redisAssert(eptr != NULL); - } else { - eptr = NULL; - } - } else { - sptr = ziplistNext(zl,eptr); - redisAssert(sptr != NULL); - eptr = ziplistNext(zl,sptr); - } + if (reverse) + zzlPrev(zl,&eptr,&sptr); + else + zzlNext(zl,&eptr,&sptr); } - } else if (zobj->encoding == REDIS_ENCODING_RAW) { + } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) { zset *zs = zobj->ptr; zskiplist *zsl = zs->zsl; zskiplistNode *ln; @@ -1275,10 +1739,8 @@ void zrevrangeCommand(redisClient *c) { * If "justcount", only the number of elements in the range is returned. */ void genericZrangebyscoreCommand(redisClient *c, int reverse, int justcount) { zrangespec range; - robj *o, *emptyreply; - zset *zsetobj; - zskiplist *zsl; - zskiplistNode *ln; + robj *key = c->argv[1]; + robj *emptyreply, *zobj; int offset = 0, limit = -1; int withscores = 0; unsigned long rangelen = 0; @@ -1322,61 +1784,132 @@ void genericZrangebyscoreCommand(redisClient *c, int reverse, int justcount) { /* Ok, lookup the key and get the range */ emptyreply = justcount ? shared.czero : shared.emptymultibulk; - if ((o = lookupKeyReadOrReply(c,c->argv[1],emptyreply)) == NULL || - checkType(c,o,REDIS_ZSET)) return; - zsetobj = o->ptr; - zsl = zsetobj->zsl; + if ((zobj = lookupKeyReadOrReply(c,key,emptyreply)) == NULL || + checkType(c,zobj,REDIS_ZSET)) return; - /* If reversed, get the last node in range as starting point. */ - if (reverse) { - ln = zslLastInRange(zsl,range); - } else { - ln = zslFirstInRange(zsl,range); - } + if (zobj->encoding == REDIS_ENCODING_ZIPLIST) { + unsigned char *zl = zobj->ptr; + unsigned char *eptr, *sptr; + unsigned char *vstr; + unsigned int vlen; + long long vlong; + double score; - /* No "first" element in the specified interval. */ - if (ln == NULL) { - addReply(c,emptyreply); - return; - } + /* If reversed, get the last node in range as starting point. */ + if (reverse) + eptr = zzlLastInRange(zl,range); + else + eptr = zzlFirstInRange(zl,range); - /* We don't know in advance how many matching elements there are in the - * list, so we push this object that will represent the multi-bulk length - * in the output buffer, and will "fix" it later */ - if (!justcount) - replylen = addDeferredMultiBulkLength(c); + /* No "first" element in the specified interval. */ + if (eptr == NULL) { + addReply(c,emptyreply); + return; + } - /* If there is an offset, just traverse the number of elements without - * checking the score because that is done in the next loop. */ - while(ln && offset--) { - ln = reverse ? ln->backward : ln->level[0].forward; - } + /* Get score pointer for the first element. */ + redisAssert(eptr != NULL); + sptr = ziplistNext(zl,eptr); - while (ln && limit--) { - /* Abort when the node is no longer in range. */ - if (reverse) { - if (!zslValueGteMin(ln->score,&range)) break; - } else { - if (!zslValueLteMax(ln->score,&range)) break; + /* We don't know in advance how many matching elements there are in the + * list, so we push this object that will represent the multi-bulk + * length in the output buffer, and will "fix" it later */ + if (!justcount) + replylen = addDeferredMultiBulkLength(c); + + /* If there is an offset, just traverse the number of elements without + * checking the score because that is done in the next loop. */ + while (eptr && offset--) + if (reverse) + zzlPrev(zl,&eptr,&sptr); + else + zzlNext(zl,&eptr,&sptr); + + while (eptr && limit--) { + score = zzlGetScore(sptr); + + /* Abort when the node is no longer in range. */ + if (reverse) { + if (!zslValueGteMin(score,&range)) break; + } else { + if (!zslValueLteMax(score,&range)) break; + } + + /* Do our magic */ + rangelen++; + if (!justcount) { + redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong)); + if (vstr == NULL) + addReplyBulkLongLong(c,vlong); + else + addReplyBulkCBuffer(c,vstr,vlen); + + if (withscores) + addReplyDouble(c,score); + } + + /* Move to next node */ + if (reverse) + zzlPrev(zl,&eptr,&sptr); + else + zzlNext(zl,&eptr,&sptr); } + } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) { + zset *zs = zobj->ptr; + zskiplist *zsl = zs->zsl; + zskiplistNode *ln; - /* Do our magic */ - rangelen++; - if (!justcount) { - addReplyBulk(c,ln->obj); - if (withscores) - addReplyDouble(c,ln->score); + /* If reversed, get the last node in range as starting point. */ + if (reverse) + ln = zslLastInRange(zsl,range); + else + ln = zslFirstInRange(zsl,range); + + /* No "first" element in the specified interval. */ + if (ln == NULL) { + addReply(c,emptyreply); + return; } - /* Move to next node */ - ln = reverse ? ln->backward : ln->level[0].forward; + /* We don't know in advance how many matching elements there are in the + * list, so we push this object that will represent the multi-bulk + * length in the output buffer, and will "fix" it later */ + if (!justcount) + replylen = addDeferredMultiBulkLength(c); + + /* If there is an offset, just traverse the number of elements without + * checking the score because that is done in the next loop. */ + while (ln && offset--) + ln = reverse ? ln->backward : ln->level[0].forward; + + while (ln && limit--) { + /* Abort when the node is no longer in range. */ + if (reverse) { + if (!zslValueGteMin(ln->score,&range)) break; + } else { + if (!zslValueLteMax(ln->score,&range)) break; + } + + /* Do our magic */ + rangelen++; + if (!justcount) { + addReplyBulk(c,ln->obj); + if (withscores) + addReplyDouble(c,ln->score); + } + + /* Move to next node */ + ln = reverse ? ln->backward : ln->level[0].forward; + } + } else { + redisPanic("Unknown sorted set encoding"); } if (justcount) { addReplyLongLong(c,(long)rangelen); } else { - setDeferredMultiBulkLength(c,replylen, - withscores ? (rangelen*2) : rangelen); + if (withscores) rangelen *= 2; + setDeferredMultiBulkLength(c,replylen,rangelen); } } @@ -1393,66 +1926,103 @@ void zcountCommand(redisClient *c) { } void zcardCommand(redisClient *c) { - robj *o; - zset *zs; + robj *key = c->argv[1]; + robj *zobj; - if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || - checkType(c,o,REDIS_ZSET)) return; + if ((zobj = lookupKeyReadOrReply(c,key,shared.czero)) == NULL || + checkType(c,zobj,REDIS_ZSET)) return; - zs = o->ptr; - addReplyLongLong(c,zs->zsl->length); + addReplyLongLong(c,zsetLength(zobj)); } void zscoreCommand(redisClient *c) { - robj *o; - zset *zs; - dictEntry *de; + robj *key = c->argv[1]; + robj *zobj; + double score; - if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL || - checkType(c,o,REDIS_ZSET)) return; + if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL || + checkType(c,zobj,REDIS_ZSET)) return; - zs = o->ptr; - c->argv[2] = tryObjectEncoding(c->argv[2]); - de = dictFind(zs->dict,c->argv[2]); - if (!de) { - addReply(c,shared.nullbulk); - } else { - double *score = dictGetEntryVal(de); + if (zobj->encoding == REDIS_ENCODING_ZIPLIST) { + if (zzlFind(zobj->ptr,c->argv[2],&score) != NULL) + addReplyDouble(c,score); + else + addReply(c,shared.nullbulk); + } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) { + zset *zs = zobj->ptr; + dictEntry *de; - addReplyDouble(c,*score); + c->argv[2] = tryObjectEncoding(c->argv[2]); + de = dictFind(zs->dict,c->argv[2]); + if (de != NULL) { + score = *(double*)dictGetEntryVal(de); + addReplyDouble(c,score); + } else { + addReply(c,shared.nullbulk); + } + } else { + redisPanic("Unknown sorted set encoding"); } } void zrankGenericCommand(redisClient *c, int reverse) { - robj *o; - zset *zs; - zskiplist *zsl; - dictEntry *de; + robj *key = c->argv[1]; + robj *ele = c->argv[2]; + robj *zobj; + unsigned long llen; unsigned long rank; - double *score; - if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL || - checkType(c,o,REDIS_ZSET)) return; + if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL || + checkType(c,zobj,REDIS_ZSET)) return; + llen = zsetLength(zobj); - zs = o->ptr; - zsl = zs->zsl; - c->argv[2] = tryObjectEncoding(c->argv[2]); - de = dictFind(zs->dict,c->argv[2]); - if (!de) { - addReply(c,shared.nullbulk); - return; - } + redisAssert(ele->encoding == REDIS_ENCODING_RAW); + if (zobj->encoding == REDIS_ENCODING_ZIPLIST) { + unsigned char *zl = zobj->ptr; + unsigned char *eptr, *sptr; - score = dictGetEntryVal(de); - rank = zslGetRank(zsl, *score, c->argv[2]); - if (rank) { - if (reverse) { - addReplyLongLong(c, zsl->length - rank); + eptr = ziplistIndex(zl,0); + redisAssert(eptr != NULL); + sptr = ziplistNext(zl,eptr); + redisAssert(sptr != NULL); + + rank = 1; + while(eptr != NULL) { + if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr))) + break; + rank++; + zzlNext(zl,&eptr,&sptr); + } + + if (eptr != NULL) { + if (reverse) + addReplyLongLong(c,llen-rank); + else + addReplyLongLong(c,rank-1); } else { - addReplyLongLong(c, rank-1); + addReply(c,shared.nullbulk); + } + } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) { + zset *zs = zobj->ptr; + zskiplist *zsl = zs->zsl; + dictEntry *de; + double score; + + ele = c->argv[2] = tryObjectEncoding(c->argv[2]); + de = dictFind(zs->dict,ele); + if (de != NULL) { + score = *(double*)dictGetEntryVal(de); + rank = zslGetRank(zsl,score,ele); + redisAssert(rank); /* Existing elements always have a rank. */ + if (reverse) + addReplyLongLong(c,llen-rank); + else + addReplyLongLong(c,rank-1); + } else { + addReply(c,shared.nullbulk); } } else { - addReply(c,shared.nullbulk); + redisPanic("Unknown sorted set encoding"); } }