X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/02e600653216cebc0746bfbb86b353667a843591..38c06fa037dbb49b6e7c28df5b1fb566d504ba61:/src/t_zset.c diff --git a/src/t_zset.c b/src/t_zset.c index 7ce60349..ccf9962a 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -174,12 +174,6 @@ int zslDelete(zskiplist *zsl, double score, robj *obj) { return 0; /* not found */ } -/* Struct to hold a inclusive/exclusive range spec. */ -typedef struct { - double min, max; - int minex, maxex; /* are min or max exclusive? */ -} zrangespec; - static int zslValueGteMin(double value, zrangespec *spec) { return spec->minex ? (value > spec->min) : (value >= spec->min); } @@ -584,7 +578,7 @@ unsigned char *zzlFind(unsigned char *zl, robj *ele, double *score) { ele = getDecodedObject(ele); while (eptr != NULL) { sptr = ziplistNext(zl,eptr); - redisAssert(sptr != NULL); + redisAssertWithInfo(NULL,ele,sptr != NULL); if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr))) { /* Matching element, pull out score. */ @@ -618,7 +612,7 @@ unsigned char *zzlInsertAt(unsigned char *zl, unsigned char *eptr, robj *ele, do int scorelen; size_t offset; - redisAssert(ele->encoding == REDIS_ENCODING_RAW); + redisAssertWithInfo(NULL,ele,ele->encoding == REDIS_ENCODING_RAW); scorelen = d2string(scorebuf,sizeof(scorebuf),score); if (eptr == NULL) { zl = ziplistPush(zl,ele->ptr,sdslen(ele->ptr),ZIPLIST_TAIL); @@ -630,7 +624,7 @@ unsigned char *zzlInsertAt(unsigned char *zl, unsigned char *eptr, robj *ele, do eptr = zl+offset; /* Insert score after the element. */ - redisAssert((sptr = ziplistNext(zl,eptr)) != NULL); + redisAssertWithInfo(NULL,ele,(sptr = ziplistNext(zl,eptr)) != NULL); zl = ziplistInsert(zl,sptr,(unsigned char*)scorebuf,scorelen); } @@ -646,7 +640,7 @@ unsigned char *zzlInsert(unsigned char *zl, robj *ele, double score) { ele = getDecodedObject(ele); while (eptr != NULL) { sptr = ziplistNext(zl,eptr); - redisAssert(sptr != NULL); + redisAssertWithInfo(NULL,ele,sptr != NULL); s = zzlGetScore(sptr); if (s > score) { @@ -751,13 +745,13 @@ void zsetConvert(robj *zobj, int encoding) { zs->zsl = zslCreate(); eptr = ziplistIndex(zl,0); - redisAssert(eptr != NULL); + redisAssertWithInfo(NULL,zobj,eptr != NULL); sptr = ziplistNext(zl,eptr); - redisAssert(sptr != NULL); + redisAssertWithInfo(NULL,zobj,sptr != NULL); while (eptr != NULL) { score = zzlGetScore(sptr); - redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong)); + redisAssertWithInfo(NULL,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong)); if (vstr == NULL) ele = createStringObjectFromLongLong(vlong); else @@ -765,7 +759,7 @@ void zsetConvert(robj *zobj, int encoding) { /* Has incremented refcount since it was just created. */ node = zslInsert(zs->zsl,score,ele); - redisAssert(dictAdd(zs->dict,ele,&node->score) == DICT_OK); + redisAssertWithInfo(NULL,zobj,dictAdd(zs->dict,ele,&node->score) == DICT_OK); incrRefCount(ele); /* Added to dictionary. */ zzlNext(zl,&eptr,&sptr); } @@ -816,11 +810,29 @@ void zaddGenericCommand(redisClient *c, int incr) { robj *ele; robj *zobj; robj *curobj; - double score, curscore = 0.0; + double score = 0, *scores, curscore = 0.0; + int j, elements = (c->argc-2)/2; + int added = 0; - if (getDoubleFromObjectOrReply(c,c->argv[2],&score,NULL) != REDIS_OK) + if (c->argc % 2) { + addReply(c,shared.syntaxerr); return; + } + /* Start parsing all the scores, we need to emit any syntax error + * before executing additions to the sorted set, as the command should + * either execute fully or nothing at all. */ + scores = zmalloc(sizeof(double)*elements); + for (j = 0; j < elements; j++) { + if (getDoubleFromObjectOrReply(c,c->argv[2+j*2],&scores[j],NULL) + != REDIS_OK) + { + zfree(scores); + return; + } + } + + /* Lookup the key and create the sorted set if does not exist. */ zobj = lookupKeyWrite(c->db,key); if (zobj == NULL) { if (server.zset_max_ziplist_entries == 0 || @@ -834,111 +846,105 @@ void zaddGenericCommand(redisClient *c, int incr) { } else { if (zobj->type != REDIS_ZSET) { addReply(c,shared.wrongtypeerr); + zfree(scores); return; } } - if (zobj->encoding == REDIS_ENCODING_ZIPLIST) { - unsigned char *eptr; + for (j = 0; j < elements; j++) { + score = scores[j]; - /* Prefer non-encoded element when dealing with ziplists. */ - ele = c->argv[3]; - if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) { - if (incr) { - score += curscore; - if (isnan(score)) { - addReplyError(c,nanerr); - /* Don't need to check if the sorted set is empty, because - * we know it has at least one element. */ - return; + if (zobj->encoding == REDIS_ENCODING_ZIPLIST) { + unsigned char *eptr; + + /* Prefer non-encoded element when dealing with ziplists. */ + ele = c->argv[3+j*2]; + if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) { + if (incr) { + score += curscore; + if (isnan(score)) { + addReplyError(c,nanerr); + /* Don't need to check if the sorted set is empty + * because we know it has at least one element. */ + zfree(scores); + return; + } } - } - /* Remove and re-insert when score changed. */ - if (score != curscore) { - zobj->ptr = zzlDelete(zobj->ptr,eptr); + /* Remove and re-insert when score changed. */ + if (score != curscore) { + zobj->ptr = zzlDelete(zobj->ptr,eptr); + zobj->ptr = zzlInsert(zobj->ptr,ele,score); + + signalModifiedKey(c->db,key); + server.dirty++; + } + } else { + /* Optimize: check if the element is too large or the list + * becomes too long *before* executing zzlInsert. */ zobj->ptr = zzlInsert(zobj->ptr,ele,score); + if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries) + zsetConvert(zobj,REDIS_ENCODING_SKIPLIST); + if (sdslen(ele->ptr) > server.zset_max_ziplist_value) + zsetConvert(zobj,REDIS_ENCODING_SKIPLIST); signalModifiedKey(c->db,key); server.dirty++; + if (!incr) added++; } + } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) { + zset *zs = zobj->ptr; + zskiplistNode *znode; + dictEntry *de; - if (incr) /* ZINCRBY */ - addReplyDouble(c,score); - else /* ZADD */ - addReply(c,shared.czero); - } else { - /* Optimize: check if the element is too large or the list becomes - * too long *before* executing zzlInsert. */ - zobj->ptr = zzlInsert(zobj->ptr,ele,score); - if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries) - zsetConvert(zobj,REDIS_ENCODING_SKIPLIST); - if (sdslen(ele->ptr) > server.zset_max_ziplist_value) - zsetConvert(zobj,REDIS_ENCODING_SKIPLIST); - - signalModifiedKey(c->db,key); - server.dirty++; - - if (incr) /* ZINCRBY */ - addReplyDouble(c,score); - else /* ZADD */ - addReply(c,shared.cone); - } - } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) { - zset *zs = zobj->ptr; - zskiplistNode *znode; - dictEntry *de; - - ele = c->argv[3] = tryObjectEncoding(c->argv[3]); - de = dictFind(zs->dict,ele); - if (de != NULL) { - curobj = dictGetEntryKey(de); - curscore = *(double*)dictGetEntryVal(de); - - if (incr) { - score += curscore; - if (isnan(score)) { - addReplyError(c,nanerr); - /* Don't need to check if the sorted set is empty, because - * we know it has at least one element. */ - return; + ele = c->argv[3+j*2] = tryObjectEncoding(c->argv[3+j*2]); + de = dictFind(zs->dict,ele); + if (de != NULL) { + curobj = dictGetKey(de); + curscore = *(double*)dictGetVal(de); + + if (incr) { + score += curscore; + if (isnan(score)) { + addReplyError(c,nanerr); + /* Don't need to check if the sorted set is empty + * because we know it has at least one element. */ + zfree(scores); + return; + } } - } - /* Remove and re-insert when score changed. We can safely delete - * the key object from the skiplist, since the dictionary still has - * a reference to it. */ - if (score != curscore) { - redisAssert(zslDelete(zs->zsl,curscore,curobj)); - znode = zslInsert(zs->zsl,score,curobj); - incrRefCount(curobj); /* Re-inserted in skiplist. */ - dictGetEntryVal(de) = &znode->score; /* Update score ptr. */ + /* Remove and re-insert when score changed. We can safely + * delete the key object from the skiplist, since the + * dictionary still has a reference to it. */ + if (score != curscore) { + redisAssertWithInfo(c,curobj,zslDelete(zs->zsl,curscore,curobj)); + znode = zslInsert(zs->zsl,score,curobj); + incrRefCount(curobj); /* Re-inserted in skiplist. */ + dictGetVal(de) = &znode->score; /* Update score ptr. */ + + signalModifiedKey(c->db,key); + server.dirty++; + } + } else { + znode = zslInsert(zs->zsl,score,ele); + incrRefCount(ele); /* Inserted in skiplist. */ + redisAssertWithInfo(c,NULL,dictAdd(zs->dict,ele,&znode->score) == DICT_OK); + incrRefCount(ele); /* Added to dictionary. */ signalModifiedKey(c->db,key); server.dirty++; + if (!incr) added++; } - - if (incr) /* ZINCRBY */ - addReplyDouble(c,score); - else /* ZADD */ - addReply(c,shared.czero); } else { - znode = zslInsert(zs->zsl,score,ele); - incrRefCount(ele); /* Inserted in skiplist. */ - redisAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK); - incrRefCount(ele); /* Added to dictionary. */ - - signalModifiedKey(c->db,key); - server.dirty++; - - if (incr) /* ZINCRBY */ - addReplyDouble(c,score); - else /* ZADD */ - addReply(c,shared.cone); + redisPanic("Unknown sorted set encoding"); } - } else { - redisPanic("Unknown sorted set encoding"); } + zfree(scores); + if (incr) /* ZINCRBY */ + addReplyDouble(c,score); + else /* ZADD */ + addReplyLongLong(c,added); } void zaddCommand(redisClient *c) { @@ -951,8 +957,8 @@ void zincrbyCommand(redisClient *c) { void zremCommand(redisClient *c) { robj *key = c->argv[1]; - robj *ele = c->argv[2]; robj *zobj; + int deleted = 0, j; if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL || checkType(c,zobj,REDIS_ZSET)) return; @@ -960,39 +966,48 @@ void zremCommand(redisClient *c) { if (zobj->encoding == REDIS_ENCODING_ZIPLIST) { unsigned char *eptr; - if ((eptr = zzlFind(zobj->ptr,ele,NULL)) != NULL) { - zobj->ptr = zzlDelete(zobj->ptr,eptr); - if (zzlLength(zobj->ptr) == 0) dbDelete(c->db,key); - } else { - addReply(c,shared.czero); - return; + for (j = 2; j < c->argc; j++) { + if ((eptr = zzlFind(zobj->ptr,c->argv[j],NULL)) != NULL) { + deleted++; + zobj->ptr = zzlDelete(zobj->ptr,eptr); + if (zzlLength(zobj->ptr) == 0) { + dbDelete(c->db,key); + break; + } + } } } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) { zset *zs = zobj->ptr; dictEntry *de; double score; - de = dictFind(zs->dict,ele); - if (de != NULL) { - /* Delete from the skiplist */ - score = *(double*)dictGetEntryVal(de); - redisAssert(zslDelete(zs->zsl,score,ele)); - - /* Delete from the hash table */ - dictDelete(zs->dict,ele); - if (htNeedsResize(zs->dict)) dictResize(zs->dict); - if (dictSize(zs->dict) == 0) dbDelete(c->db,key); - } else { - addReply(c,shared.czero); - return; + for (j = 2; j < c->argc; j++) { + de = dictFind(zs->dict,c->argv[j]); + if (de != NULL) { + deleted++; + + /* Delete from the skiplist */ + score = *(double*)dictGetVal(de); + redisAssertWithInfo(c,c->argv[j],zslDelete(zs->zsl,score,c->argv[j])); + + /* Delete from the hash table */ + dictDelete(zs->dict,c->argv[j]); + if (htNeedsResize(zs->dict)) dictResize(zs->dict); + if (dictSize(zs->dict) == 0) { + dbDelete(c->db,key); + break; + } + } } } else { redisPanic("Unknown sorted set encoding"); } - signalModifiedKey(c->db,key); - server.dirty++; - addReply(c,shared.cone); + if (deleted) { + signalModifiedKey(c->db,key); + server.dirty += deleted; + } + addReplyLongLong(c,deleted); } void zremrangebyscoreCommand(redisClient *c) { @@ -1003,7 +1018,7 @@ void zremrangebyscoreCommand(redisClient *c) { /* Parse the range arguments. */ if (zslParseRange(c->argv[2],c->argv[3],&range) != REDIS_OK) { - addReplyError(c,"min or max is not a double"); + addReplyError(c,"min or max is not a float"); return; } @@ -1239,7 +1254,7 @@ int zuiNext(zsetopsrc *op, zsetopval *val) { if (op->type == REDIS_SET) { iterset *it = &op->iter.set; if (op->encoding == REDIS_ENCODING_INTSET) { - if (!intsetGet(it->is.is,it->is.ii,&val->ell)) + if (!intsetGet(it->is.is,it->is.ii,(int64_t*)&val->ell)) return 0; val->score = 1.0; @@ -1248,7 +1263,7 @@ int zuiNext(zsetopsrc *op, zsetopval *val) { } else if (op->encoding == REDIS_ENCODING_HT) { if (it->ht.de == NULL) return 0; - val->ele = dictGetEntryKey(it->ht.de); + val->ele = dictGetKey(it->ht.de); val->score = 1.0; /* Move to next element. */ @@ -1382,7 +1397,7 @@ int zuiFind(zsetopsrc *op, zsetopval *val, double *score) { } else if (op->encoding == REDIS_ENCODING_SKIPLIST) { dictEntry *de; if ((de = dictFind(it->sl.zs->dict,val->ele)) != NULL) { - *score = *(double*)dictGetEntryVal(de); + *score = *(double*)dictGetVal(de); return 1; } else { return 0; @@ -1402,7 +1417,7 @@ int zuiCompareByCardinality(const void *s1, const void *s2) { #define REDIS_AGGR_SUM 1 #define REDIS_AGGR_MIN 2 #define REDIS_AGGR_MAX 3 -#define zunionInterDictValue(_e) (dictGetEntryVal(_e) == NULL ? 1.0 : *(double*)dictGetEntryVal(_e)) +#define zunionInterDictValue(_e) (dictGetVal(_e) == NULL ? 1.0 : *(double*)dictGetVal(_e)) inline static void zunionInterAggregate(double *target, double val, int aggregate) { if (aggregate == REDIS_AGGR_SUM) { @@ -1478,7 +1493,7 @@ void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { j++; remaining--; for (i = 0; i < setnum; i++, j++, remaining--) { if (getDoubleFromObjectOrReply(c,c->argv[j],&src[i].weight, - "weight value is not a double") != REDIS_OK) + "weight value is not a float") != REDIS_OK) { zfree(src); return; @@ -1527,7 +1542,12 @@ void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { score = src[0].weight * zval.score; for (j = 1; j < setnum; j++) { - if (zuiFind(&src[j],&zval,&value)) { + /* It is not safe to access the zset we are + * iterating, so explicitly check for equal object. */ + if (src[j].subject == src[0].subject) { + value = zval.score*src[j].weight; + zunionInterAggregate(&score,value,aggregate); + } else if (zuiFind(&src[j],&zval,&value)) { value *= src[j].weight; zunionInterAggregate(&score,value,aggregate); } else { @@ -1551,7 +1571,7 @@ void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { } } else if (op == REDIS_OP_UNION) { for (i = 0; i < setnum; i++) { - if (zuiLength(&src[0]) == 0) + if (zuiLength(&src[i]) == 0) continue; while (zuiNext(&src[i],&zval)) { @@ -1567,7 +1587,12 @@ void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { /* Because the inputs are sorted by size, it's only possible * for sets at larger indices to hold this element. */ for (j = (i+1); j < setnum; j++) { - if (zuiFind(&src[j],&zval,&value)) { + /* It is not safe to access the zset we are + * iterating, so explicitly check for equal object. */ + if(src[j].subject == src[i].subject) { + value = zval.score*src[j].weight; + zunionInterAggregate(&score,value,aggregate); + } else if (zuiFind(&src[j],&zval,&value)) { value *= src[j].weight; zunionInterAggregate(&score,value,aggregate); } @@ -1673,12 +1698,12 @@ void zrangeGenericCommand(redisClient *c, int reverse) { else eptr = ziplistIndex(zl,2*start); - redisAssert(eptr != NULL); + redisAssertWithInfo(c,zobj,eptr != NULL); sptr = ziplistNext(zl,eptr); while (rangelen--) { - redisAssert(eptr != NULL && sptr != NULL); - redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong)); + redisAssertWithInfo(c,zobj,eptr != NULL && sptr != NULL); + redisAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong)); if (vstr == NULL) addReplyBulkLongLong(c,vlong); else @@ -1711,7 +1736,7 @@ void zrangeGenericCommand(redisClient *c, int reverse) { } while(rangelen--) { - redisAssert(ln != NULL); + redisAssertWithInfo(c,zobj,ln != NULL); ele = ln->obj; addReplyBulk(c,ele); if (withscores) @@ -1731,12 +1756,11 @@ void zrevrangeCommand(redisClient *c) { zrangeGenericCommand(c,1); } -/* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE and ZCOUNT. - * If "justcount", only the number of elements in the range is returned. */ -void genericZrangebyscoreCommand(redisClient *c, int reverse, int justcount) { +/* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE. */ +void genericZrangebyscoreCommand(redisClient *c, int reverse) { zrangespec range; robj *key = c->argv[1]; - robj *emptyreply, *zobj; + robj *zobj; int offset = 0, limit = -1; int withscores = 0; unsigned long rangelen = 0; @@ -1753,7 +1777,7 @@ void genericZrangebyscoreCommand(redisClient *c, int reverse, int justcount) { } if (zslParseRange(c->argv[minidx],c->argv[maxidx],&range) != REDIS_OK) { - addReplyError(c,"min or max is not a double"); + addReplyError(c,"min or max is not a float"); return; } @@ -1779,8 +1803,7 @@ void genericZrangebyscoreCommand(redisClient *c, int reverse, int justcount) { } /* Ok, lookup the key and get the range */ - emptyreply = justcount ? shared.czero : shared.emptymultibulk; - if ((zobj = lookupKeyReadOrReply(c,key,emptyreply)) == NULL || + if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL || checkType(c,zobj,REDIS_ZSET)) return; if (zobj->encoding == REDIS_ENCODING_ZIPLIST) { @@ -1792,34 +1815,36 @@ void genericZrangebyscoreCommand(redisClient *c, int reverse, int justcount) { double score; /* If reversed, get the last node in range as starting point. */ - if (reverse) + if (reverse) { eptr = zzlLastInRange(zl,range); - else + } else { eptr = zzlFirstInRange(zl,range); + } /* No "first" element in the specified interval. */ if (eptr == NULL) { - addReply(c,emptyreply); + addReply(c, shared.emptymultibulk); return; } /* Get score pointer for the first element. */ - redisAssert(eptr != NULL); + redisAssertWithInfo(c,zobj,eptr != NULL); sptr = ziplistNext(zl,eptr); /* We don't know in advance how many matching elements there are in the * list, so we push this object that will represent the multi-bulk * length in the output buffer, and will "fix" it later */ - if (!justcount) - replylen = addDeferredMultiBulkLength(c); + replylen = addDeferredMultiBulkLength(c); /* If there is an offset, just traverse the number of elements without * checking the score because that is done in the next loop. */ - while (eptr && offset--) - if (reverse) + while (eptr && offset--) { + if (reverse) { zzlPrev(zl,&eptr,&sptr); - else + } else { zzlNext(zl,&eptr,&sptr); + } + } while (eptr && limit--) { score = zzlGetScore(sptr); @@ -1831,24 +1856,26 @@ void genericZrangebyscoreCommand(redisClient *c, int reverse, int justcount) { if (!zslValueLteMax(score,&range)) break; } - /* Do our magic */ + /* We know the element exists, so ziplistGet should always succeed */ + redisAssertWithInfo(c,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong)); + rangelen++; - if (!justcount) { - redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong)); - if (vstr == NULL) - addReplyBulkLongLong(c,vlong); - else - addReplyBulkCBuffer(c,vstr,vlen); - - if (withscores) - addReplyDouble(c,score); + if (vstr == NULL) { + addReplyBulkLongLong(c,vlong); + } else { + addReplyBulkCBuffer(c,vstr,vlen); + } + + if (withscores) { + addReplyDouble(c,score); } /* Move to next node */ - if (reverse) + if (reverse) { zzlPrev(zl,&eptr,&sptr); - else + } else { zzlNext(zl,&eptr,&sptr); + } } } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) { zset *zs = zobj->ptr; @@ -1856,27 +1883,32 @@ void genericZrangebyscoreCommand(redisClient *c, int reverse, int justcount) { zskiplistNode *ln; /* If reversed, get the last node in range as starting point. */ - if (reverse) + if (reverse) { ln = zslLastInRange(zsl,range); - else + } else { ln = zslFirstInRange(zsl,range); + } /* No "first" element in the specified interval. */ if (ln == NULL) { - addReply(c,emptyreply); + addReply(c, shared.emptymultibulk); return; } /* We don't know in advance how many matching elements there are in the * list, so we push this object that will represent the multi-bulk * length in the output buffer, and will "fix" it later */ - if (!justcount) - replylen = addDeferredMultiBulkLength(c); + replylen = addDeferredMultiBulkLength(c); /* If there is an offset, just traverse the number of elements without * checking the score because that is done in the next loop. */ - while (ln && offset--) - ln = reverse ? ln->backward : ln->level[0].forward; + while (ln && offset--) { + if (reverse) { + ln = ln->backward; + } else { + ln = ln->level[0].forward; + } + } while (ln && limit--) { /* Abort when the node is no longer in range. */ @@ -1886,39 +1918,114 @@ void genericZrangebyscoreCommand(redisClient *c, int reverse, int justcount) { if (!zslValueLteMax(ln->score,&range)) break; } - /* Do our magic */ rangelen++; - if (!justcount) { - addReplyBulk(c,ln->obj); - if (withscores) - addReplyDouble(c,ln->score); + addReplyBulk(c,ln->obj); + + if (withscores) { + addReplyDouble(c,ln->score); } /* Move to next node */ - ln = reverse ? ln->backward : ln->level[0].forward; + if (reverse) { + ln = ln->backward; + } else { + ln = ln->level[0].forward; + } } } else { redisPanic("Unknown sorted set encoding"); } - if (justcount) { - addReplyLongLong(c,(long)rangelen); - } else { - if (withscores) rangelen *= 2; - setDeferredMultiBulkLength(c,replylen,rangelen); + if (withscores) { + rangelen *= 2; } + + setDeferredMultiBulkLength(c, replylen, rangelen); } void zrangebyscoreCommand(redisClient *c) { - genericZrangebyscoreCommand(c,0,0); + genericZrangebyscoreCommand(c,0); } void zrevrangebyscoreCommand(redisClient *c) { - genericZrangebyscoreCommand(c,1,0); + genericZrangebyscoreCommand(c,1); } void zcountCommand(redisClient *c) { - genericZrangebyscoreCommand(c,0,1); + robj *key = c->argv[1]; + robj *zobj; + zrangespec range; + int count = 0; + + /* Parse the range arguments */ + if (zslParseRange(c->argv[2],c->argv[3],&range) != REDIS_OK) { + addReplyError(c,"min or max is not a float"); + return; + } + + /* Lookup the sorted set */ + if ((zobj = lookupKeyReadOrReply(c, key, shared.czero)) == NULL || + checkType(c, zobj, REDIS_ZSET)) return; + + if (zobj->encoding == REDIS_ENCODING_ZIPLIST) { + unsigned char *zl = zobj->ptr; + unsigned char *eptr, *sptr; + double score; + + /* Use the first element in range as the starting point */ + eptr = zzlFirstInRange(zl,range); + + /* No "first" element */ + if (eptr == NULL) { + addReply(c, shared.czero); + return; + } + + /* First element is in range */ + sptr = ziplistNext(zl,eptr); + score = zzlGetScore(sptr); + redisAssertWithInfo(c,zobj,zslValueLteMax(score,&range)); + + /* Iterate over elements in range */ + while (eptr) { + score = zzlGetScore(sptr); + + /* Abort when the node is no longer in range. */ + if (!zslValueLteMax(score,&range)) { + break; + } else { + count++; + zzlNext(zl,&eptr,&sptr); + } + } + } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) { + zset *zs = zobj->ptr; + zskiplist *zsl = zs->zsl; + zskiplistNode *zn; + unsigned long rank; + + /* Find first element in range */ + zn = zslFirstInRange(zsl, range); + + /* Use rank of first element, if any, to determine preliminary count */ + if (zn != NULL) { + rank = zslGetRank(zsl, zn->score, zn->obj); + count = (zsl->length - (rank - 1)); + + /* Find last element in range */ + zn = zslLastInRange(zsl, range); + + /* Use rank of last element, if any, to determine the actual count */ + if (zn != NULL) { + rank = zslGetRank(zsl, zn->score, zn->obj); + count -= (zsl->length - rank); + } + } + } else { + redisPanic("Unknown sorted set encoding"); + } + + addReplyLongLong(c, count); } void zcardCommand(redisClient *c) { @@ -1951,7 +2058,7 @@ void zscoreCommand(redisClient *c) { c->argv[2] = tryObjectEncoding(c->argv[2]); de = dictFind(zs->dict,c->argv[2]); if (de != NULL) { - score = *(double*)dictGetEntryVal(de); + score = *(double*)dictGetVal(de); addReplyDouble(c,score); } else { addReply(c,shared.nullbulk); @@ -1972,15 +2079,15 @@ void zrankGenericCommand(redisClient *c, int reverse) { checkType(c,zobj,REDIS_ZSET)) return; llen = zsetLength(zobj); - redisAssert(ele->encoding == REDIS_ENCODING_RAW); + redisAssertWithInfo(c,ele,ele->encoding == REDIS_ENCODING_RAW); if (zobj->encoding == REDIS_ENCODING_ZIPLIST) { unsigned char *zl = zobj->ptr; unsigned char *eptr, *sptr; eptr = ziplistIndex(zl,0); - redisAssert(eptr != NULL); + redisAssertWithInfo(c,zobj,eptr != NULL); sptr = ziplistNext(zl,eptr); - redisAssert(sptr != NULL); + redisAssertWithInfo(c,zobj,sptr != NULL); rank = 1; while(eptr != NULL) { @@ -2007,9 +2114,9 @@ void zrankGenericCommand(redisClient *c, int reverse) { ele = c->argv[2] = tryObjectEncoding(c->argv[2]); de = dictFind(zs->dict,ele); if (de != NULL) { - score = *(double*)dictGetEntryVal(de); + score = *(double*)dictGetVal(de); rank = zslGetRank(zsl,score,ele); - redisAssert(rank); /* Existing elements always have a rank. */ + redisAssertWithInfo(c,ele,rank); /* Existing elements always have a rank. */ if (reverse) addReplyLongLong(c,llen-rank); else