X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/56ce42faf168cafeb9dee681ab269b1fb98b197d..d310fbedabd3101505b694f5c25a2e48480a3c2b:/src/t_zset.c diff --git a/src/t_zset.c b/src/t_zset.c index 1ef5e991..4812709e 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -17,7 +17,7 @@ /* This skiplist implementation is almost a C translation of the original * algorithm described by William Pugh in "Skip Lists: A Probabilistic * Alternative to Balanced Trees", modified in three ways: - * a) this implementation allows for repeated values. + * a) this implementation allows for repeated scores. * b) the comparison is not just by key (our 'score') but by satellite data. * c) there is a back pointer, so it's a doubly linked list with the back * pointers being only at "level 1". This allows to traverse the list @@ -64,6 +64,10 @@ void zslFree(zskiplist *zsl) { zfree(zsl); } +/* Returns a random level for the new skiplist node we are going to create. + * The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL + * (both inclusive), with a powerlaw-alike distribution where higher + * levels are less likely to be returned. */ int zslRandomLevel(void) { int level = 1; while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF)) @@ -76,6 +80,7 @@ zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) { unsigned int rank[ZSKIPLIST_MAXLEVEL]; int i, level; + redisAssert(!isnan(score)); x = zsl->header; for (i = zsl->level-1; i >= 0; i--) { /* store rank that is crossed to reach the insert position */ @@ -174,12 +179,6 @@ int zslDelete(zskiplist *zsl, double score, robj *obj) { return 0; /* not found */ } -/* Struct to hold a inclusive/exclusive range spec. */ -typedef struct { - double min, max; - int minex, maxex; /* are min or max exclusive? */ -} zrangespec; - static int zslValueGteMin(double value, zrangespec *spec) { return spec->minex ? (value > spec->min) : (value >= spec->min); } @@ -188,10 +187,6 @@ static int zslValueLteMax(double value, zrangespec *spec) { return spec->maxex ? (value < spec->max) : (value <= spec->max); } -static int zslValueInRange(double value, zrangespec *spec) { - return zslValueGteMin(value,spec) && zslValueLteMax(value,spec); -} - /* Returns if there is a part of the zset is in range. */ int zslIsInRange(zskiplist *zsl, zrangespec *range) { zskiplistNode *x; @@ -226,10 +221,12 @@ zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec range) { x = x->level[i].forward; } - /* The tail is in range, so the previous block should always return a - * node that is non-NULL and the last one to be out of range. */ + /* This is an inner range, so the next node cannot be NULL. */ x = x->level[0].forward; - redisAssert(x != NULL && zslValueInRange(x->score,&range)); + redisAssert(x != NULL); + + /* Check if score <= max. */ + if (!zslValueLteMax(x->score,&range)) return NULL; return x; } @@ -250,9 +247,11 @@ zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec range) { x = x->level[i].forward; } - /* The header is in range, so the previous block should always return a - * node that is non-NULL and in range. */ - redisAssert(x != NULL && zslValueInRange(x->score,&range)); + /* This is an inner range, so this node cannot be NULL. */ + redisAssert(x != NULL); + + /* Check if score >= min. */ + if (!zslValueGteMin(x->score,&range)) return NULL; return x; } @@ -503,7 +502,7 @@ int zzlIsInRange(unsigned char *zl, zrangespec *range) { return 0; p = ziplistIndex(zl,-1); /* Last score. */ - redisAssert(p != NULL); + if (p == NULL) return 0; /* Empty sorted set */ score = zzlGetScore(p); if (!zslValueGteMin(score,range)) return 0; @@ -519,8 +518,7 @@ int zzlIsInRange(unsigned char *zl, zrangespec *range) { /* Find pointer to the first element contained in the specified range. * Returns NULL when no element is contained in the range. */ -unsigned char *zzlFirstInRange(robj *zobj, zrangespec range) { - unsigned char *zl = zobj->ptr; +unsigned char *zzlFirstInRange(unsigned char *zl, zrangespec range) { unsigned char *eptr = ziplistIndex(zl,0), *sptr; double score; @@ -532,8 +530,12 @@ unsigned char *zzlFirstInRange(robj *zobj, zrangespec range) { redisAssert(sptr != NULL); score = zzlGetScore(sptr); - if (zslValueGteMin(score,&range)) - return eptr; + if (zslValueGteMin(score,&range)) { + /* Check if score <= max. */ + if (zslValueLteMax(score,&range)) + return eptr; + return NULL; + } /* Move to next element. */ eptr = ziplistNext(zl,sptr); @@ -544,8 +546,7 @@ unsigned char *zzlFirstInRange(robj *zobj, zrangespec range) { /* Find pointer to the last element contained in the specified range. * Returns NULL when no element is contained in the range. */ -unsigned char *zzlLastInRange(robj *zobj, zrangespec range) { - unsigned char *zl = zobj->ptr; +unsigned char *zzlLastInRange(unsigned char *zl, zrangespec range) { unsigned char *eptr = ziplistIndex(zl,-2), *sptr; double score; @@ -557,8 +558,12 @@ unsigned char *zzlLastInRange(robj *zobj, zrangespec range) { redisAssert(sptr != NULL); score = zzlGetScore(sptr); - if (zslValueLteMax(score,&range)) - return eptr; + if (zslValueLteMax(score,&range)) { + /* Check if score >= min. */ + if (zslValueGteMin(score,&range)) + return eptr; + return NULL; + } /* Move to previous element by moving to the score of previous element. * When this returns NULL, we know there also is no element. */ @@ -572,14 +577,13 @@ unsigned char *zzlLastInRange(robj *zobj, zrangespec range) { return NULL; } -unsigned char *zzlFind(robj *zobj, robj *ele, double *score) { - unsigned char *zl = zobj->ptr; +unsigned char *zzlFind(unsigned char *zl, robj *ele, double *score) { unsigned char *eptr = ziplistIndex(zl,0), *sptr; ele = getDecodedObject(ele); while (eptr != NULL) { sptr = ziplistNext(zl,eptr); - redisAssert(sptr != NULL); + redisAssertWithInfo(NULL,ele,sptr != NULL); if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr))) { /* Matching element, pull out score. */ @@ -598,25 +602,22 @@ unsigned char *zzlFind(robj *zobj, robj *ele, double *score) { /* Delete (element,score) pair from ziplist. Use local copy of eptr because we * don't want to modify the one given as argument. */ -int zzlDelete(robj *zobj, unsigned char *eptr) { - unsigned char *zl = zobj->ptr; +unsigned char *zzlDelete(unsigned char *zl, unsigned char *eptr) { unsigned char *p = eptr; /* TODO: add function to ziplist API to delete N elements from offset. */ zl = ziplistDelete(zl,&p); zl = ziplistDelete(zl,&p); - zobj->ptr = zl; - return REDIS_OK; + return zl; } -int zzlInsertAt(robj *zobj, robj *ele, double score, unsigned char *eptr) { - unsigned char *zl = zobj->ptr; +unsigned char *zzlInsertAt(unsigned char *zl, unsigned char *eptr, robj *ele, double score) { unsigned char *sptr; char scorebuf[128]; int scorelen; - int offset; + size_t offset; - redisAssert(ele->encoding == REDIS_ENCODING_RAW); + redisAssertWithInfo(NULL,ele,ele->encoding == REDIS_ENCODING_RAW); scorelen = d2string(scorebuf,sizeof(scorebuf),score); if (eptr == NULL) { zl = ziplistPush(zl,ele->ptr,sdslen(ele->ptr),ZIPLIST_TAIL); @@ -628,37 +629,35 @@ int zzlInsertAt(robj *zobj, robj *ele, double score, unsigned char *eptr) { eptr = zl+offset; /* Insert score after the element. */ - redisAssert((sptr = ziplistNext(zl,eptr)) != NULL); + redisAssertWithInfo(NULL,ele,(sptr = ziplistNext(zl,eptr)) != NULL); zl = ziplistInsert(zl,sptr,(unsigned char*)scorebuf,scorelen); } - zobj->ptr = zl; - return REDIS_OK; + return zl; } /* Insert (element,score) pair in ziplist. This function assumes the element is * not yet present in the list. */ -int zzlInsert(robj *zobj, robj *ele, double score) { - unsigned char *zl = zobj->ptr; +unsigned char *zzlInsert(unsigned char *zl, robj *ele, double score) { unsigned char *eptr = ziplistIndex(zl,0), *sptr; double s; ele = getDecodedObject(ele); while (eptr != NULL) { sptr = ziplistNext(zl,eptr); - redisAssert(sptr != NULL); + redisAssertWithInfo(NULL,ele,sptr != NULL); s = zzlGetScore(sptr); if (s > score) { /* First element with score larger than score for element to be * inserted. This means we should take its spot in the list to * maintain ordering. */ - zzlInsertAt(zobj,ele,score,eptr); + zl = zzlInsertAt(zl,eptr,ele,score); break; } else if (s == score) { /* Ensure lexicographical ordering for elements. */ if (zzlCompareElements(eptr,ele->ptr,sdslen(ele->ptr)) > 0) { - zzlInsertAt(zobj,ele,score,eptr); + zl = zzlInsertAt(zl,eptr,ele,score); break; } } @@ -669,21 +668,21 @@ int zzlInsert(robj *zobj, robj *ele, double score) { /* Push on tail of list when it was not yet inserted. */ if (eptr == NULL) - zzlInsertAt(zobj,ele,score,NULL); + zl = zzlInsertAt(zl,NULL,ele,score); decrRefCount(ele); - return REDIS_OK; + return zl; } -unsigned long zzlDeleteRangeByScore(robj *zobj, zrangespec range) { - unsigned char *zl = zobj->ptr; +unsigned char *zzlDeleteRangeByScore(unsigned char *zl, zrangespec range, unsigned long *deleted) { unsigned char *eptr, *sptr; double score; - unsigned long deleted = 0; + unsigned long num = 0; - eptr = zzlFirstInRange(zobj,range); - if (eptr == NULL) return deleted; + if (deleted != NULL) *deleted = 0; + eptr = zzlFirstInRange(zl,range); + if (eptr == NULL) return zl; /* When the tail of the ziplist is deleted, eptr will point to the sentinel * byte and ziplistNext will return NULL. */ @@ -693,33 +692,35 @@ unsigned long zzlDeleteRangeByScore(robj *zobj, zrangespec range) { /* Delete both the element and the score. */ zl = ziplistDelete(zl,&eptr); zl = ziplistDelete(zl,&eptr); - deleted++; + num++; } else { /* No longer in range. */ break; } } - return deleted; + if (deleted != NULL) *deleted = num; + return zl; } /* Delete all the elements with rank between start and end from the skiplist. * Start and end are inclusive. Note that start and end need to be 1-based */ -unsigned long zzlDeleteRangeByRank(robj *zobj, unsigned int start, unsigned int end) { +unsigned char *zzlDeleteRangeByRank(unsigned char *zl, unsigned int start, unsigned int end, unsigned long *deleted) { unsigned int num = (end-start)+1; - zobj->ptr = ziplistDeleteRange(zobj->ptr,2*(start-1),2*num); - return num; + if (deleted) *deleted = num; + zl = ziplistDeleteRange(zl,2*(start-1),2*num); + return zl; } /*----------------------------------------------------------------------------- * Common sorted set API *----------------------------------------------------------------------------*/ -int zsLength(robj *zobj) { +unsigned int zsetLength(robj *zobj) { int length = -1; if (zobj->encoding == REDIS_ENCODING_ZIPLIST) { length = zzlLength(zobj->ptr); - } else if (zobj->encoding == REDIS_ENCODING_RAW) { + } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) { length = ((zset*)zobj->ptr)->zsl->length; } else { redisPanic("Unknown sorted set encoding"); @@ -727,7 +728,7 @@ int zsLength(robj *zobj) { return length; } -void zsConvert(robj *zobj, int encoding) { +void zsetConvert(robj *zobj, int encoding) { zset *zs; zskiplistNode *node, *next; robj *ele; @@ -741,7 +742,7 @@ void zsConvert(robj *zobj, int encoding) { unsigned int vlen; long long vlong; - if (encoding != REDIS_ENCODING_RAW) + if (encoding != REDIS_ENCODING_SKIPLIST) redisPanic("Unknown target encoding"); zs = zmalloc(sizeof(*zs)); @@ -749,13 +750,13 @@ void zsConvert(robj *zobj, int encoding) { zs->zsl = zslCreate(); eptr = ziplistIndex(zl,0); - redisAssert(eptr != NULL); + redisAssertWithInfo(NULL,zobj,eptr != NULL); sptr = ziplistNext(zl,eptr); - redisAssert(sptr != NULL); + redisAssertWithInfo(NULL,zobj,sptr != NULL); while (eptr != NULL) { score = zzlGetScore(sptr); - redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong)); + redisAssertWithInfo(NULL,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong)); if (vstr == NULL) ele = createStringObjectFromLongLong(vlong); else @@ -763,15 +764,15 @@ void zsConvert(robj *zobj, int encoding) { /* Has incremented refcount since it was just created. */ node = zslInsert(zs->zsl,score,ele); - redisAssert(dictAdd(zs->dict,ele,&node->score) == DICT_OK); + redisAssertWithInfo(NULL,zobj,dictAdd(zs->dict,ele,&node->score) == DICT_OK); incrRefCount(ele); /* Added to dictionary. */ zzlNext(zl,&eptr,&sptr); } zfree(zobj->ptr); zobj->ptr = zs; - zobj->encoding = REDIS_ENCODING_RAW; - } else if (zobj->encoding == REDIS_ENCODING_RAW) { + zobj->encoding = REDIS_ENCODING_SKIPLIST; + } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) { unsigned char *zl = ziplistNew(); if (encoding != REDIS_ENCODING_ZIPLIST) @@ -785,13 +786,9 @@ void zsConvert(robj *zobj, int encoding) { zfree(zs->zsl->header); zfree(zs->zsl); - /* Immediately store pointer to ziplist in object because it will - * change because of reallocations when pushing to the ziplist. */ - zobj->ptr = zl; - while (node) { ele = getDecodedObject(node->obj); - redisAssert(zzlInsertAt(zobj,ele,node->score,NULL) == REDIS_OK); + zl = zzlInsertAt(zl,NULL,ele,node->score); decrRefCount(ele); next = node->level[0].forward; @@ -800,6 +797,7 @@ void zsConvert(robj *zobj, int encoding) { } zfree(zs); + zobj->ptr = zl; zobj->encoding = REDIS_ENCODING_ZIPLIST; } else { redisPanic("Unknown sorted set encoding"); @@ -810,12 +808,6 @@ void zsConvert(robj *zobj, int encoding) { * Sorted set commands *----------------------------------------------------------------------------*/ -// if (zobj->encoding == REDIS_ENCODING_ZIPLIST) { -// } else if (zobj->encoding == REDIS_ENCODING_RAW) { -// } else { -// redisPanic("Unknown sorted set encoding"); -// } - /* This generic command implements both ZADD and ZINCRBY. */ void zaddGenericCommand(redisClient *c, int incr) { static char *nanerr = "resulting score is not a number (NaN)"; @@ -823,11 +815,29 @@ void zaddGenericCommand(redisClient *c, int incr) { robj *ele; robj *zobj; robj *curobj; - double score, curscore = 0.0; + double score = 0, *scores, curscore = 0.0; + int j, elements = (c->argc-2)/2; + int added = 0; - if (getDoubleFromObjectOrReply(c,c->argv[2],&score,NULL) != REDIS_OK) + if (c->argc % 2) { + addReply(c,shared.syntaxerr); return; + } + /* Start parsing all the scores, we need to emit any syntax error + * before executing additions to the sorted set, as the command should + * either execute fully or nothing at all. */ + scores = zmalloc(sizeof(double)*elements); + for (j = 0; j < elements; j++) { + if (getDoubleFromObjectOrReply(c,c->argv[2+j*2],&scores[j],NULL) + != REDIS_OK) + { + zfree(scores); + return; + } + } + + /* Lookup the key and create the sorted set if does not exist. */ zobj = lookupKeyWrite(c->db,key); if (zobj == NULL) { if (server.zset_max_ziplist_entries == 0 || @@ -841,111 +851,105 @@ void zaddGenericCommand(redisClient *c, int incr) { } else { if (zobj->type != REDIS_ZSET) { addReply(c,shared.wrongtypeerr); + zfree(scores); return; } } - if (zobj->encoding == REDIS_ENCODING_ZIPLIST) { - unsigned char *eptr; + for (j = 0; j < elements; j++) { + score = scores[j]; - /* Prefer non-encoded element when dealing with ziplists. */ - ele = c->argv[3]; - if ((eptr = zzlFind(zobj,ele,&curscore)) != NULL) { - if (incr) { - score += curscore; - if (isnan(score)) { - addReplyError(c,nanerr); - /* Don't need to check if the sorted set is empty, because - * we know it has at least one element. */ - return; + if (zobj->encoding == REDIS_ENCODING_ZIPLIST) { + unsigned char *eptr; + + /* Prefer non-encoded element when dealing with ziplists. */ + ele = c->argv[3+j*2]; + if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) { + if (incr) { + score += curscore; + if (isnan(score)) { + addReplyError(c,nanerr); + /* Don't need to check if the sorted set is empty + * because we know it has at least one element. */ + zfree(scores); + return; + } } - } - /* Remove and re-insert when score changed. */ - if (score != curscore) { - redisAssert(zzlDelete(zobj,eptr) == REDIS_OK); - redisAssert(zzlInsert(zobj,ele,score) == REDIS_OK); + /* Remove and re-insert when score changed. */ + if (score != curscore) { + zobj->ptr = zzlDelete(zobj->ptr,eptr); + zobj->ptr = zzlInsert(zobj->ptr,ele,score); + + signalModifiedKey(c->db,key); + server.dirty++; + } + } else { + /* Optimize: check if the element is too large or the list + * becomes too long *before* executing zzlInsert. */ + zobj->ptr = zzlInsert(zobj->ptr,ele,score); + if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries) + zsetConvert(zobj,REDIS_ENCODING_SKIPLIST); + if (sdslen(ele->ptr) > server.zset_max_ziplist_value) + zsetConvert(zobj,REDIS_ENCODING_SKIPLIST); signalModifiedKey(c->db,key); server.dirty++; + if (!incr) added++; } + } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) { + zset *zs = zobj->ptr; + zskiplistNode *znode; + dictEntry *de; - if (incr) /* ZINCRBY */ - addReplyDouble(c,score); - else /* ZADD */ - addReply(c,shared.czero); - } else { - /* Optimize: check if the element is too large or the list becomes - * too long *before* executing zzlInsert. */ - redisAssert(zzlInsert(zobj,ele,score) == REDIS_OK); - if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries) - zsConvert(zobj,REDIS_ENCODING_RAW); - if (sdslen(ele->ptr) > server.zset_max_ziplist_value) - zsConvert(zobj,REDIS_ENCODING_RAW); - - signalModifiedKey(c->db,key); - server.dirty++; - - if (incr) /* ZINCRBY */ - addReplyDouble(c,score); - else /* ZADD */ - addReply(c,shared.cone); - } - } else if (zobj->encoding == REDIS_ENCODING_RAW) { - zset *zs = zobj->ptr; - zskiplistNode *znode; - dictEntry *de; - - ele = c->argv[3] = tryObjectEncoding(c->argv[3]); - de = dictFind(zs->dict,ele); - if (de != NULL) { - curobj = dictGetEntryKey(de); - curscore = *(double*)dictGetEntryVal(de); - - if (incr) { - score += curscore; - if (isnan(score)) { - addReplyError(c,nanerr); - /* Don't need to check if the sorted set is empty, because - * we know it has at least one element. */ - return; + ele = c->argv[3+j*2] = tryObjectEncoding(c->argv[3+j*2]); + de = dictFind(zs->dict,ele); + if (de != NULL) { + curobj = dictGetKey(de); + curscore = *(double*)dictGetVal(de); + + if (incr) { + score += curscore; + if (isnan(score)) { + addReplyError(c,nanerr); + /* Don't need to check if the sorted set is empty + * because we know it has at least one element. */ + zfree(scores); + return; + } } - } - /* Remove and re-insert when score changed. We can safely delete - * the key object from the skiplist, since the dictionary still has - * a reference to it. */ - if (score != curscore) { - redisAssert(zslDelete(zs->zsl,curscore,curobj)); - znode = zslInsert(zs->zsl,score,curobj); - incrRefCount(curobj); /* Re-inserted in skiplist. */ - dictGetEntryVal(de) = &znode->score; /* Update score ptr. */ + /* Remove and re-insert when score changed. We can safely + * delete the key object from the skiplist, since the + * dictionary still has a reference to it. */ + if (score != curscore) { + redisAssertWithInfo(c,curobj,zslDelete(zs->zsl,curscore,curobj)); + znode = zslInsert(zs->zsl,score,curobj); + incrRefCount(curobj); /* Re-inserted in skiplist. */ + dictGetVal(de) = &znode->score; /* Update score ptr. */ + + signalModifiedKey(c->db,key); + server.dirty++; + } + } else { + znode = zslInsert(zs->zsl,score,ele); + incrRefCount(ele); /* Inserted in skiplist. */ + redisAssertWithInfo(c,NULL,dictAdd(zs->dict,ele,&znode->score) == DICT_OK); + incrRefCount(ele); /* Added to dictionary. */ signalModifiedKey(c->db,key); server.dirty++; + if (!incr) added++; } - - if (incr) /* ZINCRBY */ - addReplyDouble(c,score); - else /* ZADD */ - addReply(c,shared.czero); } else { - znode = zslInsert(zs->zsl,score,ele); - incrRefCount(ele); /* Inserted in skiplist. */ - redisAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK); - incrRefCount(ele); /* Added to dictionary. */ - - signalModifiedKey(c->db,key); - server.dirty++; - - if (incr) /* ZINCRBY */ - addReplyDouble(c,score); - else /* ZADD */ - addReply(c,shared.cone); + redisPanic("Unknown sorted set encoding"); } - } else { - redisPanic("Unknown sorted set encoding"); } + zfree(scores); + if (incr) /* ZINCRBY */ + addReplyDouble(c,score); + else /* ZADD */ + addReplyLongLong(c,added); } void zaddCommand(redisClient *c) { @@ -958,8 +962,8 @@ void zincrbyCommand(redisClient *c) { void zremCommand(redisClient *c) { robj *key = c->argv[1]; - robj *ele = c->argv[2]; robj *zobj; + int deleted = 0, j; if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL || checkType(c,zobj,REDIS_ZSET)) return; @@ -967,39 +971,48 @@ void zremCommand(redisClient *c) { if (zobj->encoding == REDIS_ENCODING_ZIPLIST) { unsigned char *eptr; - if ((eptr = zzlFind(zobj,ele,NULL)) != NULL) { - redisAssert(zzlDelete(zobj,eptr) == REDIS_OK); - if (zzlLength(zobj->ptr) == 0) dbDelete(c->db,key); - } else { - addReply(c,shared.czero); - return; + for (j = 2; j < c->argc; j++) { + if ((eptr = zzlFind(zobj->ptr,c->argv[j],NULL)) != NULL) { + deleted++; + zobj->ptr = zzlDelete(zobj->ptr,eptr); + if (zzlLength(zobj->ptr) == 0) { + dbDelete(c->db,key); + break; + } + } } - } else if (zobj->encoding == REDIS_ENCODING_RAW) { + } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) { zset *zs = zobj->ptr; dictEntry *de; double score; - de = dictFind(zs->dict,ele); - if (de != NULL) { - /* Delete from the skiplist */ - score = *(double*)dictGetEntryVal(de); - redisAssert(zslDelete(zs->zsl,score,ele)); - - /* Delete from the hash table */ - dictDelete(zs->dict,ele); - if (htNeedsResize(zs->dict)) dictResize(zs->dict); - if (dictSize(zs->dict) == 0) dbDelete(c->db,key); - } else { - addReply(c,shared.czero); - return; + for (j = 2; j < c->argc; j++) { + de = dictFind(zs->dict,c->argv[j]); + if (de != NULL) { + deleted++; + + /* Delete from the skiplist */ + score = *(double*)dictGetVal(de); + redisAssertWithInfo(c,c->argv[j],zslDelete(zs->zsl,score,c->argv[j])); + + /* Delete from the hash table */ + dictDelete(zs->dict,c->argv[j]); + if (htNeedsResize(zs->dict)) dictResize(zs->dict); + if (dictSize(zs->dict) == 0) { + dbDelete(c->db,key); + break; + } + } } } else { redisPanic("Unknown sorted set encoding"); } - signalModifiedKey(c->db,key); - server.dirty++; - addReply(c,shared.cone); + if (deleted) { + signalModifiedKey(c->db,key); + server.dirty += deleted; + } + addReplyLongLong(c,deleted); } void zremrangebyscoreCommand(redisClient *c) { @@ -1010,7 +1023,7 @@ void zremrangebyscoreCommand(redisClient *c) { /* Parse the range arguments. */ if (zslParseRange(c->argv[2],c->argv[3],&range) != REDIS_OK) { - addReplyError(c,"min or max is not a double"); + addReplyError(c,"min or max is not a float"); return; } @@ -1018,8 +1031,9 @@ void zremrangebyscoreCommand(redisClient *c) { checkType(c,zobj,REDIS_ZSET)) return; if (zobj->encoding == REDIS_ENCODING_ZIPLIST) { - deleted = zzlDeleteRangeByScore(zobj,range); - } else if (zobj->encoding == REDIS_ENCODING_RAW) { + zobj->ptr = zzlDeleteRangeByScore(zobj->ptr,range,&deleted); + if (zzlLength(zobj->ptr) == 0) dbDelete(c->db,key); + } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) { zset *zs = zobj->ptr; deleted = zslDeleteRangeByScore(zs->zsl,range,zs->dict); if (htNeedsResize(zs->dict)) dictResize(zs->dict); @@ -1048,7 +1062,7 @@ void zremrangebyrankCommand(redisClient *c) { checkType(c,zobj,REDIS_ZSET)) return; /* Sanitize indexes. */ - llen = zsLength(zobj); + llen = zsetLength(zobj); if (start < 0) start = llen+start; if (end < 0) end = llen+end; if (start < 0) start = 0; @@ -1063,8 +1077,9 @@ void zremrangebyrankCommand(redisClient *c) { if (zobj->encoding == REDIS_ENCODING_ZIPLIST) { /* Correct for 1-based rank. */ - deleted = zzlDeleteRangeByRank(zobj,start+1,end+1); - } else if (zobj->encoding == REDIS_ENCODING_RAW) { + zobj->ptr = zzlDeleteRangeByRank(zobj->ptr,start+1,end+1,&deleted); + if (zzlLength(zobj->ptr) == 0) dbDelete(c->db,key); + } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) { zset *zs = zobj->ptr; /* Correct for 1-based rank. */ @@ -1164,7 +1179,7 @@ void zuiInitIterator(zsetopsrc *op) { it->zl.sptr = ziplistNext(it->zl.zl,it->zl.eptr); redisAssert(it->zl.sptr != NULL); } - } else if (op->encoding == REDIS_ENCODING_RAW) { + } else if (op->encoding == REDIS_ENCODING_SKIPLIST) { it->sl.zs = op->subject->ptr; it->sl.node = it->sl.zs->zsl->header->level[0].forward; } else { @@ -1192,7 +1207,7 @@ void zuiClearIterator(zsetopsrc *op) { iterzset *it = &op->iter.zset; if (op->encoding == REDIS_ENCODING_ZIPLIST) { REDIS_NOTUSED(it); /* skip */ - } else if (op->encoding == REDIS_ENCODING_RAW) { + } else if (op->encoding == REDIS_ENCODING_SKIPLIST) { REDIS_NOTUSED(it); /* skip */ } else { redisPanic("Unknown sorted set encoding"); @@ -1219,7 +1234,7 @@ int zuiLength(zsetopsrc *op) { iterzset *it = &op->iter.zset; if (op->encoding == REDIS_ENCODING_ZIPLIST) { return zzlLength(it->zl.zl); - } else if (op->encoding == REDIS_ENCODING_RAW) { + } else if (op->encoding == REDIS_ENCODING_SKIPLIST) { return it->sl.zs->zsl->length; } else { redisPanic("Unknown sorted set encoding"); @@ -1239,13 +1254,16 @@ int zuiNext(zsetopsrc *op, zsetopval *val) { if (val->flags & OPVAL_DIRTY_ROBJ) decrRefCount(val->ele); - bzero(val,sizeof(zsetopval)); + memset(val,0,sizeof(zsetopval)); if (op->type == REDIS_SET) { iterset *it = &op->iter.set; if (op->encoding == REDIS_ENCODING_INTSET) { - if (!intsetGet(it->is.is,it->is.ii,&val->ell)) + int64_t ell; + + if (!intsetGet(it->is.is,it->is.ii,&ell)) return 0; + val->ell = ell; val->score = 1.0; /* Move to next element. */ @@ -1253,7 +1271,7 @@ int zuiNext(zsetopsrc *op, zsetopval *val) { } else if (op->encoding == REDIS_ENCODING_HT) { if (it->ht.de == NULL) return 0; - val->ele = dictGetEntryKey(it->ht.de); + val->ele = dictGetKey(it->ht.de); val->score = 1.0; /* Move to next element. */ @@ -1272,7 +1290,7 @@ int zuiNext(zsetopsrc *op, zsetopval *val) { /* Move to next element. */ zzlNext(it->zl.zl,&it->zl.eptr,&it->zl.sptr); - } else if (op->encoding == REDIS_ENCODING_RAW) { + } else if (op->encoding == REDIS_ENCODING_SKIPLIST) { if (it->sl.node == NULL) return 0; val->ele = it->sl.node->obj; @@ -1378,16 +1396,16 @@ int zuiFind(zsetopsrc *op, zsetopval *val, double *score) { zuiObjectFromValue(val); if (op->encoding == REDIS_ENCODING_ZIPLIST) { - if (zzlFind(op->subject,val->ele,score) != NULL) { + if (zzlFind(it->zl.zl,val->ele,score) != NULL) { /* Score is already set by zzlFind. */ return 1; } else { return 0; } - } else if (op->encoding == REDIS_ENCODING_RAW) { + } else if (op->encoding == REDIS_ENCODING_SKIPLIST) { dictEntry *de; if ((de = dictFind(it->sl.zs->dict,val->ele)) != NULL) { - *score = *(double*)dictGetEntryVal(de); + *score = *(double*)dictGetVal(de); return 1; } else { return 0; @@ -1407,7 +1425,7 @@ int zuiCompareByCardinality(const void *s1, const void *s2) { #define REDIS_AGGR_SUM 1 #define REDIS_AGGR_MIN 2 #define REDIS_AGGR_MAX 3 -#define zunionInterDictValue(_e) (dictGetEntryVal(_e) == NULL ? 1.0 : *(double*)dictGetEntryVal(_e)) +#define zunionInterDictValue(_e) (dictGetVal(_e) == NULL ? 1.0 : *(double*)dictGetVal(_e)) inline static void zunionInterAggregate(double *target, double val, int aggregate) { if (aggregate == REDIS_AGGR_SUM) { @@ -1427,18 +1445,22 @@ inline static void zunionInterAggregate(double *target, double val, int aggregat } void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { - int i, j, setnum; + int i, j; + long setnum; int aggregate = REDIS_AGGR_SUM; zsetopsrc *src; zsetopval zval; robj *tmp; + unsigned int maxelelen = 0; robj *dstobj; zset *dstzset; zskiplistNode *znode; int touched = 0; /* expect setnum input keys to be given */ - setnum = atoi(c->argv[2]->ptr); + if ((getLongFromObjectOrReply(c, c->argv[2], &setnum, NULL) != REDIS_OK)) + return; + if (setnum < 1) { addReplyError(c, "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE"); @@ -1482,7 +1504,7 @@ void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { j++; remaining--; for (i = 0; i < setnum; i++, j++, remaining--) { if (getDoubleFromObjectOrReply(c,c->argv[j],&src[i].weight, - "weight value is not a double") != REDIS_OK) + "weight value is not a float") != REDIS_OK) { zfree(src); return; @@ -1519,6 +1541,7 @@ void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { dstobj = createZsetObject(); dstzset = dstobj->ptr; + memset(&zval, 0, sizeof(zval)); if (op == REDIS_OP_INTER) { /* Skip everything if the smallest input is empty. */ @@ -1529,8 +1552,15 @@ void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { double score, value; score = src[0].weight * zval.score; + if (isnan(score)) score = 0; + for (j = 1; j < setnum; j++) { - if (zuiFind(&src[j],&zval,&value)) { + /* It is not safe to access the zset we are + * iterating, so explicitly check for equal object. */ + if (src[j].subject == src[0].subject) { + value = zval.score*src[j].weight; + zunionInterAggregate(&score,value,aggregate); + } else if (zuiFind(&src[j],&zval,&value)) { value *= src[j].weight; zunionInterAggregate(&score,value,aggregate); } else { @@ -1545,12 +1575,16 @@ void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { incrRefCount(tmp); /* added to skiplist */ dictAdd(dstzset->dict,tmp,&znode->score); incrRefCount(tmp); /* added to dictionary */ + + if (tmp->encoding == REDIS_ENCODING_RAW) + if (sdslen(tmp->ptr) > maxelelen) + maxelelen = sdslen(tmp->ptr); } } } } else if (op == REDIS_OP_UNION) { for (i = 0; i < setnum; i++) { - if (zuiLength(&src[0]) == 0) + if (zuiLength(&src[i]) == 0) continue; while (zuiNext(&src[i],&zval)) { @@ -1562,11 +1596,17 @@ void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { /* Initialize score */ score = src[i].weight * zval.score; + if (isnan(score)) score = 0; /* Because the inputs are sorted by size, it's only possible * for sets at larger indices to hold this element. */ for (j = (i+1); j < setnum; j++) { - if (zuiFind(&src[j],&zval,&value)) { + /* It is not safe to access the zset we are + * iterating, so explicitly check for equal object. */ + if(src[j].subject == src[i].subject) { + value = zval.score*src[j].weight; + zunionInterAggregate(&score,value,aggregate); + } else if (zuiFind(&src[j],&zval,&value)) { value *= src[j].weight; zunionInterAggregate(&score,value,aggregate); } @@ -1577,6 +1617,10 @@ void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { incrRefCount(zval.ele); /* added to skiplist */ dictAdd(dstzset->dict,tmp,&znode->score); incrRefCount(zval.ele); /* added to dictionary */ + + if (tmp->encoding == REDIS_ENCODING_RAW) + if (sdslen(tmp->ptr) > maxelelen) + maxelelen = sdslen(tmp->ptr); } } } else { @@ -1592,13 +1636,18 @@ void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { server.dirty++; } if (dstzset->zsl->length) { + /* Convert to ziplist when in limits. */ + if (dstzset->zsl->length <= server.zset_max_ziplist_entries && + maxelelen <= server.zset_max_ziplist_value) + zsetConvert(dstobj,REDIS_ENCODING_ZIPLIST); + dbAdd(c->db,dstkey,dstobj); - addReplyLongLong(c, dstzset->zsl->length); + addReplyLongLong(c,zsetLength(dstobj)); if (!touched) signalModifiedKey(c->db,dstkey); server.dirty++; } else { decrRefCount(dstobj); - addReply(c, shared.czero); + addReply(c,shared.czero); } zfree(src); } @@ -1634,7 +1683,7 @@ void zrangeGenericCommand(redisClient *c, int reverse) { || checkType(c,zobj,REDIS_ZSET)) return; /* Sanitize indexes. */ - llen = zsLength(zobj); + llen = zsetLength(zobj); if (start < 0) start = llen+start; if (end < 0) end = llen+end; if (start < 0) start = 0; @@ -1663,12 +1712,12 @@ void zrangeGenericCommand(redisClient *c, int reverse) { else eptr = ziplistIndex(zl,2*start); - redisAssert(eptr != NULL); + redisAssertWithInfo(c,zobj,eptr != NULL); sptr = ziplistNext(zl,eptr); while (rangelen--) { - redisAssert(eptr != NULL && sptr != NULL); - redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong)); + redisAssertWithInfo(c,zobj,eptr != NULL && sptr != NULL); + redisAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong)); if (vstr == NULL) addReplyBulkLongLong(c,vlong); else @@ -1683,7 +1732,7 @@ void zrangeGenericCommand(redisClient *c, int reverse) { zzlNext(zl,&eptr,&sptr); } - } else if (zobj->encoding == REDIS_ENCODING_RAW) { + } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) { zset *zs = zobj->ptr; zskiplist *zsl = zs->zsl; zskiplistNode *ln; @@ -1701,7 +1750,7 @@ void zrangeGenericCommand(redisClient *c, int reverse) { } while(rangelen--) { - redisAssert(ln != NULL); + redisAssertWithInfo(c,zobj,ln != NULL); ele = ln->obj; addReplyBulk(c,ele); if (withscores) @@ -1721,13 +1770,12 @@ void zrevrangeCommand(redisClient *c) { zrangeGenericCommand(c,1); } -/* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE and ZCOUNT. - * If "justcount", only the number of elements in the range is returned. */ -void genericZrangebyscoreCommand(redisClient *c, int reverse, int justcount) { +/* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE. */ +void genericZrangebyscoreCommand(redisClient *c, int reverse) { zrangespec range; robj *key = c->argv[1]; - robj *emptyreply, *zobj; - int offset = 0, limit = -1; + robj *zobj; + long offset = 0, limit = -1; int withscores = 0; unsigned long rangelen = 0; void *replylen = NULL; @@ -1743,7 +1791,7 @@ void genericZrangebyscoreCommand(redisClient *c, int reverse, int justcount) { } if (zslParseRange(c->argv[minidx],c->argv[maxidx],&range) != REDIS_OK) { - addReplyError(c,"min or max is not a double"); + addReplyError(c,"min or max is not a float"); return; } @@ -1758,8 +1806,8 @@ void genericZrangebyscoreCommand(redisClient *c, int reverse, int justcount) { pos++; remaining--; withscores = 1; } else if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr,"limit")) { - offset = atoi(c->argv[pos+1]->ptr); - limit = atoi(c->argv[pos+2]->ptr); + if ((getLongFromObjectOrReply(c, c->argv[pos+1], &offset, NULL) != REDIS_OK) || + (getLongFromObjectOrReply(c, c->argv[pos+2], &limit, NULL) != REDIS_OK)) return; pos += 3; remaining -= 3; } else { addReply(c,shared.syntaxerr); @@ -1769,8 +1817,7 @@ void genericZrangebyscoreCommand(redisClient *c, int reverse, int justcount) { } /* Ok, lookup the key and get the range */ - emptyreply = justcount ? shared.czero : shared.emptymultibulk; - if ((zobj = lookupKeyReadOrReply(c,key,emptyreply)) == NULL || + if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL || checkType(c,zobj,REDIS_ZSET)) return; if (zobj->encoding == REDIS_ENCODING_ZIPLIST) { @@ -1782,34 +1829,36 @@ void genericZrangebyscoreCommand(redisClient *c, int reverse, int justcount) { double score; /* If reversed, get the last node in range as starting point. */ - if (reverse) - eptr = zzlLastInRange(zobj,range); - else - eptr = zzlFirstInRange(zobj,range); + if (reverse) { + eptr = zzlLastInRange(zl,range); + } else { + eptr = zzlFirstInRange(zl,range); + } /* No "first" element in the specified interval. */ if (eptr == NULL) { - addReply(c,emptyreply); + addReply(c, shared.emptymultibulk); return; } /* Get score pointer for the first element. */ - redisAssert(eptr != NULL); + redisAssertWithInfo(c,zobj,eptr != NULL); sptr = ziplistNext(zl,eptr); /* We don't know in advance how many matching elements there are in the * list, so we push this object that will represent the multi-bulk * length in the output buffer, and will "fix" it later */ - if (!justcount) - replylen = addDeferredMultiBulkLength(c); + replylen = addDeferredMultiBulkLength(c); /* If there is an offset, just traverse the number of elements without * checking the score because that is done in the next loop. */ - while (eptr && offset--) - if (reverse) + while (eptr && offset--) { + if (reverse) { zzlPrev(zl,&eptr,&sptr); - else + } else { zzlNext(zl,&eptr,&sptr); + } + } while (eptr && limit--) { score = zzlGetScore(sptr); @@ -1821,52 +1870,59 @@ void genericZrangebyscoreCommand(redisClient *c, int reverse, int justcount) { if (!zslValueLteMax(score,&range)) break; } - /* Do our magic */ + /* We know the element exists, so ziplistGet should always succeed */ + redisAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong)); + rangelen++; - if (!justcount) { - redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong)); - if (vstr == NULL) - addReplyBulkLongLong(c,vlong); - else - addReplyBulkCBuffer(c,vstr,vlen); - - if (withscores) - addReplyDouble(c,score); + if (vstr == NULL) { + addReplyBulkLongLong(c,vlong); + } else { + addReplyBulkCBuffer(c,vstr,vlen); + } + + if (withscores) { + addReplyDouble(c,score); } /* Move to next node */ - if (reverse) + if (reverse) { zzlPrev(zl,&eptr,&sptr); - else + } else { zzlNext(zl,&eptr,&sptr); + } } - } else if (zobj->encoding == REDIS_ENCODING_RAW) { + } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) { zset *zs = zobj->ptr; zskiplist *zsl = zs->zsl; zskiplistNode *ln; /* If reversed, get the last node in range as starting point. */ - if (reverse) + if (reverse) { ln = zslLastInRange(zsl,range); - else + } else { ln = zslFirstInRange(zsl,range); + } /* No "first" element in the specified interval. */ if (ln == NULL) { - addReply(c,emptyreply); + addReply(c, shared.emptymultibulk); return; } /* We don't know in advance how many matching elements there are in the * list, so we push this object that will represent the multi-bulk * length in the output buffer, and will "fix" it later */ - if (!justcount) - replylen = addDeferredMultiBulkLength(c); + replylen = addDeferredMultiBulkLength(c); /* If there is an offset, just traverse the number of elements without * checking the score because that is done in the next loop. */ - while (ln && offset--) - ln = reverse ? ln->backward : ln->level[0].forward; + while (ln && offset--) { + if (reverse) { + ln = ln->backward; + } else { + ln = ln->level[0].forward; + } + } while (ln && limit--) { /* Abort when the node is no longer in range. */ @@ -1876,39 +1932,114 @@ void genericZrangebyscoreCommand(redisClient *c, int reverse, int justcount) { if (!zslValueLteMax(ln->score,&range)) break; } - /* Do our magic */ rangelen++; - if (!justcount) { - addReplyBulk(c,ln->obj); - if (withscores) - addReplyDouble(c,ln->score); + addReplyBulk(c,ln->obj); + + if (withscores) { + addReplyDouble(c,ln->score); } /* Move to next node */ - ln = reverse ? ln->backward : ln->level[0].forward; + if (reverse) { + ln = ln->backward; + } else { + ln = ln->level[0].forward; + } } } else { redisPanic("Unknown sorted set encoding"); } - if (justcount) { - addReplyLongLong(c,(long)rangelen); - } else { - if (withscores) rangelen *= 2; - setDeferredMultiBulkLength(c,replylen,rangelen); + if (withscores) { + rangelen *= 2; } + + setDeferredMultiBulkLength(c, replylen, rangelen); } void zrangebyscoreCommand(redisClient *c) { - genericZrangebyscoreCommand(c,0,0); + genericZrangebyscoreCommand(c,0); } void zrevrangebyscoreCommand(redisClient *c) { - genericZrangebyscoreCommand(c,1,0); + genericZrangebyscoreCommand(c,1); } void zcountCommand(redisClient *c) { - genericZrangebyscoreCommand(c,0,1); + robj *key = c->argv[1]; + robj *zobj; + zrangespec range; + int count = 0; + + /* Parse the range arguments */ + if (zslParseRange(c->argv[2],c->argv[3],&range) != REDIS_OK) { + addReplyError(c,"min or max is not a float"); + return; + } + + /* Lookup the sorted set */ + if ((zobj = lookupKeyReadOrReply(c, key, shared.czero)) == NULL || + checkType(c, zobj, REDIS_ZSET)) return; + + if (zobj->encoding == REDIS_ENCODING_ZIPLIST) { + unsigned char *zl = zobj->ptr; + unsigned char *eptr, *sptr; + double score; + + /* Use the first element in range as the starting point */ + eptr = zzlFirstInRange(zl,range); + + /* No "first" element */ + if (eptr == NULL) { + addReply(c, shared.czero); + return; + } + + /* First element is in range */ + sptr = ziplistNext(zl,eptr); + score = zzlGetScore(sptr); + redisAssertWithInfo(c,zobj,zslValueLteMax(score,&range)); + + /* Iterate over elements in range */ + while (eptr) { + score = zzlGetScore(sptr); + + /* Abort when the node is no longer in range. */ + if (!zslValueLteMax(score,&range)) { + break; + } else { + count++; + zzlNext(zl,&eptr,&sptr); + } + } + } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) { + zset *zs = zobj->ptr; + zskiplist *zsl = zs->zsl; + zskiplistNode *zn; + unsigned long rank; + + /* Find first element in range */ + zn = zslFirstInRange(zsl, range); + + /* Use rank of first element, if any, to determine preliminary count */ + if (zn != NULL) { + rank = zslGetRank(zsl, zn->score, zn->obj); + count = (zsl->length - (rank - 1)); + + /* Find last element in range */ + zn = zslLastInRange(zsl, range); + + /* Use rank of last element, if any, to determine the actual count */ + if (zn != NULL) { + rank = zslGetRank(zsl, zn->score, zn->obj); + count -= (zsl->length - rank); + } + } + } else { + redisPanic("Unknown sorted set encoding"); + } + + addReplyLongLong(c, count); } void zcardCommand(redisClient *c) { @@ -1918,7 +2049,7 @@ void zcardCommand(redisClient *c) { if ((zobj = lookupKeyReadOrReply(c,key,shared.czero)) == NULL || checkType(c,zobj,REDIS_ZSET)) return; - addReplyLongLong(c,zsLength(zobj)); + addReplyLongLong(c,zsetLength(zobj)); } void zscoreCommand(redisClient *c) { @@ -1930,18 +2061,18 @@ void zscoreCommand(redisClient *c) { checkType(c,zobj,REDIS_ZSET)) return; if (zobj->encoding == REDIS_ENCODING_ZIPLIST) { - if (zzlFind(zobj,c->argv[2],&score) != NULL) + if (zzlFind(zobj->ptr,c->argv[2],&score) != NULL) addReplyDouble(c,score); else addReply(c,shared.nullbulk); - } else if (zobj->encoding == REDIS_ENCODING_RAW) { + } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) { zset *zs = zobj->ptr; dictEntry *de; c->argv[2] = tryObjectEncoding(c->argv[2]); de = dictFind(zs->dict,c->argv[2]); if (de != NULL) { - score = *(double*)dictGetEntryVal(de); + score = *(double*)dictGetVal(de); addReplyDouble(c,score); } else { addReply(c,shared.nullbulk); @@ -1960,17 +2091,17 @@ void zrankGenericCommand(redisClient *c, int reverse) { if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL || checkType(c,zobj,REDIS_ZSET)) return; - llen = zsLength(zobj); + llen = zsetLength(zobj); - redisAssert(ele->encoding == REDIS_ENCODING_RAW); + redisAssertWithInfo(c,ele,ele->encoding == REDIS_ENCODING_RAW); if (zobj->encoding == REDIS_ENCODING_ZIPLIST) { unsigned char *zl = zobj->ptr; unsigned char *eptr, *sptr; eptr = ziplistIndex(zl,0); - redisAssert(eptr != NULL); + redisAssertWithInfo(c,zobj,eptr != NULL); sptr = ziplistNext(zl,eptr); - redisAssert(sptr != NULL); + redisAssertWithInfo(c,zobj,sptr != NULL); rank = 1; while(eptr != NULL) { @@ -1988,7 +2119,7 @@ void zrankGenericCommand(redisClient *c, int reverse) { } else { addReply(c,shared.nullbulk); } - } else if (zobj->encoding == REDIS_ENCODING_RAW) { + } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) { zset *zs = zobj->ptr; zskiplist *zsl = zs->zsl; dictEntry *de; @@ -1997,9 +2128,9 @@ void zrankGenericCommand(redisClient *c, int reverse) { ele = c->argv[2] = tryObjectEncoding(c->argv[2]); de = dictFind(zs->dict,ele); if (de != NULL) { - score = *(double*)dictGetEntryVal(de); + score = *(double*)dictGetVal(de); rank = zslGetRank(zsl,score,ele); - redisAssert(rank); /* Existing elements always have a rank. */ + redisAssertWithInfo(c,ele,rank); /* Existing elements always have a rank. */ if (reverse) addReplyLongLong(c,llen-rank); else