X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/dba3a153a7ac9a1ec81e8f3034714d4900235a00..d4d3a70da2c9be4c5aa67a0be735568dbe436568:/src/t_zset.c diff --git a/src/t_zset.c b/src/t_zset.c index b77a3424..929714ca 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -188,10 +188,6 @@ static int zslValueLteMax(double value, zrangespec *spec) { return spec->maxex ? (value < spec->max) : (value <= spec->max); } -static int zslValueInRange(double value, zrangespec *spec) { - return zslValueGteMin(value,spec) && zslValueLteMax(value,spec); -} - /* Returns if there is a part of the zset is in range. */ int zslIsInRange(zskiplist *zsl, zrangespec *range) { zskiplistNode *x; @@ -226,10 +222,12 @@ zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec range) { x = x->level[i].forward; } - /* The tail is in range, so the previous block should always return a - * node that is non-NULL and the last one to be out of range. */ + /* This is an inner range, so the next node cannot be NULL. */ x = x->level[0].forward; - redisAssert(x != NULL && zslValueInRange(x->score,&range)); + redisAssert(x != NULL); + + /* Check if score <= max. */ + if (!zslValueLteMax(x->score,&range)) return NULL; return x; } @@ -250,9 +248,11 @@ zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec range) { x = x->level[i].forward; } - /* The header is in range, so the previous block should always return a - * node that is non-NULL and in range. */ - redisAssert(x != NULL && zslValueInRange(x->score,&range)); + /* This is an inner range, so this node cannot be NULL. */ + redisAssert(x != NULL); + + /* Check if score >= min. */ + if (!zslValueGteMin(x->score,&range)) return NULL; return x; } @@ -519,8 +519,7 @@ int zzlIsInRange(unsigned char *zl, zrangespec *range) { /* Find pointer to the first element contained in the specified range. * Returns NULL when no element is contained in the range. */ -unsigned char *zzlFirstInRange(robj *zobj, zrangespec range) { - unsigned char *zl = zobj->ptr; +unsigned char *zzlFirstInRange(unsigned char *zl, zrangespec range) { unsigned char *eptr = ziplistIndex(zl,0), *sptr; double score; @@ -532,8 +531,12 @@ unsigned char *zzlFirstInRange(robj *zobj, zrangespec range) { redisAssert(sptr != NULL); score = zzlGetScore(sptr); - if (zslValueGteMin(score,&range)) - return eptr; + if (zslValueGteMin(score,&range)) { + /* Check if score <= max. */ + if (zslValueLteMax(score,&range)) + return eptr; + return NULL; + } /* Move to next element. */ eptr = ziplistNext(zl,sptr); @@ -544,8 +547,7 @@ unsigned char *zzlFirstInRange(robj *zobj, zrangespec range) { /* Find pointer to the last element contained in the specified range. * Returns NULL when no element is contained in the range. */ -unsigned char *zzlLastInRange(robj *zobj, zrangespec range) { - unsigned char *zl = zobj->ptr; +unsigned char *zzlLastInRange(unsigned char *zl, zrangespec range) { unsigned char *eptr = ziplistIndex(zl,-2), *sptr; double score; @@ -557,8 +559,12 @@ unsigned char *zzlLastInRange(robj *zobj, zrangespec range) { redisAssert(sptr != NULL); score = zzlGetScore(sptr); - if (zslValueLteMax(score,&range)) - return eptr; + if (zslValueLteMax(score,&range)) { + /* Check if score >= min. */ + if (zslValueGteMin(score,&range)) + return eptr; + return NULL; + } /* Move to previous element by moving to the score of previous element. * When this returns NULL, we know there also is no element. */ @@ -572,8 +578,7 @@ unsigned char *zzlLastInRange(robj *zobj, zrangespec range) { return NULL; } -unsigned char *zzlFind(robj *zobj, robj *ele, double *score) { - unsigned char *zl = zobj->ptr; +unsigned char *zzlFind(unsigned char *zl, robj *ele, double *score) { unsigned char *eptr = ziplistIndex(zl,0), *sptr; ele = getDecodedObject(ele); @@ -598,23 +603,20 @@ unsigned char *zzlFind(robj *zobj, robj *ele, double *score) { /* Delete (element,score) pair from ziplist. Use local copy of eptr because we * don't want to modify the one given as argument. */ -int zzlDelete(robj *zobj, unsigned char *eptr) { - unsigned char *zl = zobj->ptr; +unsigned char *zzlDelete(unsigned char *zl, unsigned char *eptr) { unsigned char *p = eptr; /* TODO: add function to ziplist API to delete N elements from offset. */ zl = ziplistDelete(zl,&p); zl = ziplistDelete(zl,&p); - zobj->ptr = zl; - return REDIS_OK; + return zl; } -int zzlInsertAt(robj *zobj, robj *ele, double score, unsigned char *eptr) { - unsigned char *zl = zobj->ptr; +unsigned char *zzlInsertAt(unsigned char *zl, unsigned char *eptr, robj *ele, double score) { unsigned char *sptr; char scorebuf[128]; int scorelen; - int offset; + size_t offset; redisAssert(ele->encoding == REDIS_ENCODING_RAW); scorelen = d2string(scorebuf,sizeof(scorebuf),score); @@ -632,14 +634,12 @@ int zzlInsertAt(robj *zobj, robj *ele, double score, unsigned char *eptr) { zl = ziplistInsert(zl,sptr,(unsigned char*)scorebuf,scorelen); } - zobj->ptr = zl; - return REDIS_OK; + return zl; } /* Insert (element,score) pair in ziplist. This function assumes the element is * not yet present in the list. */ -int zzlInsert(robj *zobj, robj *ele, double score) { - unsigned char *zl = zobj->ptr; +unsigned char *zzlInsert(unsigned char *zl, robj *ele, double score) { unsigned char *eptr = ziplistIndex(zl,0), *sptr; double s; @@ -653,12 +653,12 @@ int zzlInsert(robj *zobj, robj *ele, double score) { /* First element with score larger than score for element to be * inserted. This means we should take its spot in the list to * maintain ordering. */ - zzlInsertAt(zobj,ele,score,eptr); + zl = zzlInsertAt(zl,eptr,ele,score); break; } else if (s == score) { /* Ensure lexicographical ordering for elements. */ if (zzlCompareElements(eptr,ele->ptr,sdslen(ele->ptr)) > 0) { - zzlInsertAt(zobj,ele,score,eptr); + zl = zzlInsertAt(zl,eptr,ele,score); break; } } @@ -669,21 +669,21 @@ int zzlInsert(robj *zobj, robj *ele, double score) { /* Push on tail of list when it was not yet inserted. */ if (eptr == NULL) - zzlInsertAt(zobj,ele,score,NULL); + zl = zzlInsertAt(zl,NULL,ele,score); decrRefCount(ele); - return REDIS_OK; + return zl; } -unsigned long zzlDeleteRangeByScore(robj *zobj, zrangespec range) { - unsigned char *zl = zobj->ptr; +unsigned char *zzlDeleteRangeByScore(unsigned char *zl, zrangespec range, unsigned long *deleted) { unsigned char *eptr, *sptr; double score; - unsigned long deleted = 0; + unsigned long num = 0; - eptr = zzlFirstInRange(zobj,range); - if (eptr == NULL) return deleted; + if (deleted != NULL) *deleted = 0; + eptr = zzlFirstInRange(zl,range); + if (eptr == NULL) return zl; /* When the tail of the ziplist is deleted, eptr will point to the sentinel * byte and ziplistNext will return NULL. */ @@ -693,33 +693,35 @@ unsigned long zzlDeleteRangeByScore(robj *zobj, zrangespec range) { /* Delete both the element and the score. */ zl = ziplistDelete(zl,&eptr); zl = ziplistDelete(zl,&eptr); - deleted++; + num++; } else { /* No longer in range. */ break; } } - return deleted; + if (deleted != NULL) *deleted = num; + return zl; } /* Delete all the elements with rank between start and end from the skiplist. * Start and end are inclusive. Note that start and end need to be 1-based */ -unsigned long zzlDeleteRangeByRank(robj *zobj, unsigned int start, unsigned int end) { +unsigned char *zzlDeleteRangeByRank(unsigned char *zl, unsigned int start, unsigned int end, unsigned long *deleted) { unsigned int num = (end-start)+1; - zobj->ptr = ziplistDeleteRange(zobj->ptr,2*(start-1),2*num); - return num; + if (deleted) *deleted = num; + zl = ziplistDeleteRange(zl,2*(start-1),2*num); + return zl; } /*----------------------------------------------------------------------------- * Common sorted set API *----------------------------------------------------------------------------*/ -int zsLength(robj *zobj) { +unsigned int zsetLength(robj *zobj) { int length = -1; if (zobj->encoding == REDIS_ENCODING_ZIPLIST) { length = zzlLength(zobj->ptr); - } else if (zobj->encoding == REDIS_ENCODING_RAW) { + } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) { length = ((zset*)zobj->ptr)->zsl->length; } else { redisPanic("Unknown sorted set encoding"); @@ -727,7 +729,7 @@ int zsLength(robj *zobj) { return length; } -void zsConvert(robj *zobj, int encoding) { +void zsetConvert(robj *zobj, int encoding) { zset *zs; zskiplistNode *node, *next; robj *ele; @@ -741,7 +743,7 @@ void zsConvert(robj *zobj, int encoding) { unsigned int vlen; long long vlong; - if (encoding != REDIS_ENCODING_RAW) + if (encoding != REDIS_ENCODING_SKIPLIST) redisPanic("Unknown target encoding"); zs = zmalloc(sizeof(*zs)); @@ -770,8 +772,8 @@ void zsConvert(robj *zobj, int encoding) { zfree(zobj->ptr); zobj->ptr = zs; - zobj->encoding = REDIS_ENCODING_RAW; - } else if (zobj->encoding == REDIS_ENCODING_RAW) { + zobj->encoding = REDIS_ENCODING_SKIPLIST; + } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) { unsigned char *zl = ziplistNew(); if (encoding != REDIS_ENCODING_ZIPLIST) @@ -785,13 +787,9 @@ void zsConvert(robj *zobj, int encoding) { zfree(zs->zsl->header); zfree(zs->zsl); - /* Immediately store pointer to ziplist in object because it will - * change because of reallocations when pushing to the ziplist. */ - zobj->ptr = zl; - while (node) { ele = getDecodedObject(node->obj); - redisAssert(zzlInsertAt(zobj,ele,node->score,NULL) == REDIS_OK); + zl = zzlInsertAt(zl,NULL,ele,node->score); decrRefCount(ele); next = node->level[0].forward; @@ -800,6 +798,7 @@ void zsConvert(robj *zobj, int encoding) { } zfree(zs); + zobj->ptr = zl; zobj->encoding = REDIS_ENCODING_ZIPLIST; } else { redisPanic("Unknown sorted set encoding"); @@ -844,7 +843,7 @@ void zaddGenericCommand(redisClient *c, int incr) { /* Prefer non-encoded element when dealing with ziplists. */ ele = c->argv[3]; - if ((eptr = zzlFind(zobj,ele,&curscore)) != NULL) { + if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) { if (incr) { score += curscore; if (isnan(score)) { @@ -857,8 +856,8 @@ void zaddGenericCommand(redisClient *c, int incr) { /* Remove and re-insert when score changed. */ if (score != curscore) { - redisAssert(zzlDelete(zobj,eptr) == REDIS_OK); - redisAssert(zzlInsert(zobj,ele,score) == REDIS_OK); + zobj->ptr = zzlDelete(zobj->ptr,eptr); + zobj->ptr = zzlInsert(zobj->ptr,ele,score); signalModifiedKey(c->db,key); server.dirty++; @@ -871,11 +870,11 @@ void zaddGenericCommand(redisClient *c, int incr) { } else { /* Optimize: check if the element is too large or the list becomes * too long *before* executing zzlInsert. */ - redisAssert(zzlInsert(zobj,ele,score) == REDIS_OK); + zobj->ptr = zzlInsert(zobj->ptr,ele,score); if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries) - zsConvert(zobj,REDIS_ENCODING_RAW); + zsetConvert(zobj,REDIS_ENCODING_SKIPLIST); if (sdslen(ele->ptr) > server.zset_max_ziplist_value) - zsConvert(zobj,REDIS_ENCODING_RAW); + zsetConvert(zobj,REDIS_ENCODING_SKIPLIST); signalModifiedKey(c->db,key); server.dirty++; @@ -885,7 +884,7 @@ void zaddGenericCommand(redisClient *c, int incr) { else /* ZADD */ addReply(c,shared.cone); } - } else if (zobj->encoding == REDIS_ENCODING_RAW) { + } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) { zset *zs = zobj->ptr; zskiplistNode *znode; dictEntry *de; @@ -961,14 +960,14 @@ void zremCommand(redisClient *c) { if (zobj->encoding == REDIS_ENCODING_ZIPLIST) { unsigned char *eptr; - if ((eptr = zzlFind(zobj,ele,NULL)) != NULL) { - redisAssert(zzlDelete(zobj,eptr) == REDIS_OK); + if ((eptr = zzlFind(zobj->ptr,ele,NULL)) != NULL) { + zobj->ptr = zzlDelete(zobj->ptr,eptr); if (zzlLength(zobj->ptr) == 0) dbDelete(c->db,key); } else { addReply(c,shared.czero); return; } - } else if (zobj->encoding == REDIS_ENCODING_RAW) { + } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) { zset *zs = zobj->ptr; dictEntry *de; double score; @@ -1012,8 +1011,9 @@ void zremrangebyscoreCommand(redisClient *c) { checkType(c,zobj,REDIS_ZSET)) return; if (zobj->encoding == REDIS_ENCODING_ZIPLIST) { - deleted = zzlDeleteRangeByScore(zobj,range); - } else if (zobj->encoding == REDIS_ENCODING_RAW) { + zobj->ptr = zzlDeleteRangeByScore(zobj->ptr,range,&deleted); + if (zzlLength(zobj->ptr) == 0) dbDelete(c->db,key); + } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) { zset *zs = zobj->ptr; deleted = zslDeleteRangeByScore(zs->zsl,range,zs->dict); if (htNeedsResize(zs->dict)) dictResize(zs->dict); @@ -1042,7 +1042,7 @@ void zremrangebyrankCommand(redisClient *c) { checkType(c,zobj,REDIS_ZSET)) return; /* Sanitize indexes. */ - llen = zsLength(zobj); + llen = zsetLength(zobj); if (start < 0) start = llen+start; if (end < 0) end = llen+end; if (start < 0) start = 0; @@ -1057,8 +1057,9 @@ void zremrangebyrankCommand(redisClient *c) { if (zobj->encoding == REDIS_ENCODING_ZIPLIST) { /* Correct for 1-based rank. */ - deleted = zzlDeleteRangeByRank(zobj,start+1,end+1); - } else if (zobj->encoding == REDIS_ENCODING_RAW) { + zobj->ptr = zzlDeleteRangeByRank(zobj->ptr,start+1,end+1,&deleted); + if (zzlLength(zobj->ptr) == 0) dbDelete(c->db,key); + } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) { zset *zs = zobj->ptr; /* Correct for 1-based rank. */ @@ -1158,7 +1159,7 @@ void zuiInitIterator(zsetopsrc *op) { it->zl.sptr = ziplistNext(it->zl.zl,it->zl.eptr); redisAssert(it->zl.sptr != NULL); } - } else if (op->encoding == REDIS_ENCODING_RAW) { + } else if (op->encoding == REDIS_ENCODING_SKIPLIST) { it->sl.zs = op->subject->ptr; it->sl.node = it->sl.zs->zsl->header->level[0].forward; } else { @@ -1186,7 +1187,7 @@ void zuiClearIterator(zsetopsrc *op) { iterzset *it = &op->iter.zset; if (op->encoding == REDIS_ENCODING_ZIPLIST) { REDIS_NOTUSED(it); /* skip */ - } else if (op->encoding == REDIS_ENCODING_RAW) { + } else if (op->encoding == REDIS_ENCODING_SKIPLIST) { REDIS_NOTUSED(it); /* skip */ } else { redisPanic("Unknown sorted set encoding"); @@ -1213,7 +1214,7 @@ int zuiLength(zsetopsrc *op) { iterzset *it = &op->iter.zset; if (op->encoding == REDIS_ENCODING_ZIPLIST) { return zzlLength(it->zl.zl); - } else if (op->encoding == REDIS_ENCODING_RAW) { + } else if (op->encoding == REDIS_ENCODING_SKIPLIST) { return it->sl.zs->zsl->length; } else { redisPanic("Unknown sorted set encoding"); @@ -1266,7 +1267,7 @@ int zuiNext(zsetopsrc *op, zsetopval *val) { /* Move to next element. */ zzlNext(it->zl.zl,&it->zl.eptr,&it->zl.sptr); - } else if (op->encoding == REDIS_ENCODING_RAW) { + } else if (op->encoding == REDIS_ENCODING_SKIPLIST) { if (it->sl.node == NULL) return 0; val->ele = it->sl.node->obj; @@ -1372,13 +1373,13 @@ int zuiFind(zsetopsrc *op, zsetopval *val, double *score) { zuiObjectFromValue(val); if (op->encoding == REDIS_ENCODING_ZIPLIST) { - if (zzlFind(op->subject,val->ele,score) != NULL) { + if (zzlFind(it->zl.zl,val->ele,score) != NULL) { /* Score is already set by zzlFind. */ return 1; } else { return 0; } - } else if (op->encoding == REDIS_ENCODING_RAW) { + } else if (op->encoding == REDIS_ENCODING_SKIPLIST) { dictEntry *de; if ((de = dictFind(it->sl.zs->dict,val->ele)) != NULL) { *score = *(double*)dictGetEntryVal(de); @@ -1426,6 +1427,7 @@ void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { zsetopsrc *src; zsetopval zval; robj *tmp; + unsigned int maxelelen = 0; robj *dstobj; zset *dstzset; zskiplistNode *znode; @@ -1539,6 +1541,10 @@ void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { incrRefCount(tmp); /* added to skiplist */ dictAdd(dstzset->dict,tmp,&znode->score); incrRefCount(tmp); /* added to dictionary */ + + if (tmp->encoding == REDIS_ENCODING_RAW) + if (sdslen(tmp->ptr) > maxelelen) + maxelelen = sdslen(tmp->ptr); } } } @@ -1571,6 +1577,10 @@ void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { incrRefCount(zval.ele); /* added to skiplist */ dictAdd(dstzset->dict,tmp,&znode->score); incrRefCount(zval.ele); /* added to dictionary */ + + if (tmp->encoding == REDIS_ENCODING_RAW) + if (sdslen(tmp->ptr) > maxelelen) + maxelelen = sdslen(tmp->ptr); } } } else { @@ -1586,13 +1596,18 @@ void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { server.dirty++; } if (dstzset->zsl->length) { + /* Convert to ziplist when in limits. */ + if (dstzset->zsl->length <= server.zset_max_ziplist_entries && + maxelelen <= server.zset_max_ziplist_value) + zsetConvert(dstobj,REDIS_ENCODING_ZIPLIST); + dbAdd(c->db,dstkey,dstobj); - addReplyLongLong(c, dstzset->zsl->length); + addReplyLongLong(c,zsetLength(dstobj)); if (!touched) signalModifiedKey(c->db,dstkey); server.dirty++; } else { decrRefCount(dstobj); - addReply(c, shared.czero); + addReply(c,shared.czero); } zfree(src); } @@ -1628,7 +1643,7 @@ void zrangeGenericCommand(redisClient *c, int reverse) { || checkType(c,zobj,REDIS_ZSET)) return; /* Sanitize indexes. */ - llen = zsLength(zobj); + llen = zsetLength(zobj); if (start < 0) start = llen+start; if (end < 0) end = llen+end; if (start < 0) start = 0; @@ -1677,7 +1692,7 @@ void zrangeGenericCommand(redisClient *c, int reverse) { zzlNext(zl,&eptr,&sptr); } - } else if (zobj->encoding == REDIS_ENCODING_RAW) { + } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) { zset *zs = zobj->ptr; zskiplist *zsl = zs->zsl; zskiplistNode *ln; @@ -1777,9 +1792,9 @@ void genericZrangebyscoreCommand(redisClient *c, int reverse, int justcount) { /* If reversed, get the last node in range as starting point. */ if (reverse) - eptr = zzlLastInRange(zobj,range); + eptr = zzlLastInRange(zl,range); else - eptr = zzlFirstInRange(zobj,range); + eptr = zzlFirstInRange(zl,range); /* No "first" element in the specified interval. */ if (eptr == NULL) { @@ -1834,7 +1849,7 @@ void genericZrangebyscoreCommand(redisClient *c, int reverse, int justcount) { else zzlNext(zl,&eptr,&sptr); } - } else if (zobj->encoding == REDIS_ENCODING_RAW) { + } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) { zset *zs = zobj->ptr; zskiplist *zsl = zs->zsl; zskiplistNode *ln; @@ -1912,7 +1927,7 @@ void zcardCommand(redisClient *c) { if ((zobj = lookupKeyReadOrReply(c,key,shared.czero)) == NULL || checkType(c,zobj,REDIS_ZSET)) return; - addReplyLongLong(c,zsLength(zobj)); + addReplyLongLong(c,zsetLength(zobj)); } void zscoreCommand(redisClient *c) { @@ -1924,11 +1939,11 @@ void zscoreCommand(redisClient *c) { checkType(c,zobj,REDIS_ZSET)) return; if (zobj->encoding == REDIS_ENCODING_ZIPLIST) { - if (zzlFind(zobj,c->argv[2],&score) != NULL) + if (zzlFind(zobj->ptr,c->argv[2],&score) != NULL) addReplyDouble(c,score); else addReply(c,shared.nullbulk); - } else if (zobj->encoding == REDIS_ENCODING_RAW) { + } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) { zset *zs = zobj->ptr; dictEntry *de; @@ -1954,7 +1969,7 @@ void zrankGenericCommand(redisClient *c, int reverse) { if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL || checkType(c,zobj,REDIS_ZSET)) return; - llen = zsLength(zobj); + llen = zsetLength(zobj); redisAssert(ele->encoding == REDIS_ENCODING_RAW); if (zobj->encoding == REDIS_ENCODING_ZIPLIST) { @@ -1982,7 +1997,7 @@ void zrankGenericCommand(redisClient *c, int reverse) { } else { addReply(c,shared.nullbulk); } - } else if (zobj->encoding == REDIS_ENCODING_RAW) { + } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) { zset *zs = zobj->ptr; zskiplist *zsl = zs->zsl; dictEntry *de;