From a669d5e99945b873279eadfcf289181956cb62c3 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Wed, 9 Mar 2011 16:13:06 +0100 Subject: [PATCH] Convert encoding when thresholds overflow --- src/t_zset.c | 93 +++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 92 insertions(+), 1 deletion(-) diff --git a/src/t_zset.c b/src/t_zset.c index 25da7871..35d95ba7 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -728,6 +728,85 @@ int zsLength(robj *zobj) { return length; } +void zsConvert(robj *zobj, int encoding) { + zset *zs; + zskiplistNode *node, *next; + robj *ele; + double score; + + if (zobj->encoding == encoding) return; + if (zobj->encoding == REDIS_ENCODING_ZIPLIST) { + unsigned char *zl = zobj->ptr; + unsigned char *eptr, *sptr; + unsigned char *vstr; + unsigned int vlen; + long long vlong; + + if (encoding != REDIS_ENCODING_RAW) + redisPanic("Unknown target encoding"); + + zs = zmalloc(sizeof(*zs)); + zs->dict = dictCreate(&zsetDictType,NULL); + zs->zsl = zslCreate(); + + eptr = ziplistIndex(zl,0); + redisAssert(eptr != NULL); + sptr = ziplistNext(zl,eptr); + redisAssert(sptr != NULL); + + while (eptr != NULL) { + score = zzlGetScore(sptr); + redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong)); + if (vstr == NULL) + ele = createStringObjectFromLongLong(vlong); + else + ele = createStringObject((char*)vstr,vlen); + + /* Has incremented refcount since it was just created. */ + node = zslInsert(zs->zsl,score,ele); + redisAssert(dictAdd(zs->dict,ele,&node->score) == DICT_OK); + incrRefCount(ele); /* Added to dictionary. */ + zzlNext(zl,&eptr,&sptr); + } + + zfree(zobj->ptr); + zobj->ptr = zs; + zobj->encoding = REDIS_ENCODING_RAW; + } else if (zobj->encoding == REDIS_ENCODING_RAW) { + unsigned char *zl = ziplistNew(); + + if (encoding != REDIS_ENCODING_ZIPLIST) + redisPanic("Unknown target encoding"); + + /* Approach similar to zslFree(), since we want to free the skiplist at + * the same time as creating the ziplist. */ + zs = zobj->ptr; + dictRelease(zs->dict); + node = zs->zsl->header->level[0].forward; + zfree(zs->zsl->header); + zfree(zs->zsl); + + /* Immediately store pointer to ziplist in object because it will + * change because of reallocations when pushing to the ziplist. */ + zobj->ptr = zl; + + while (node) { + ele = getDecodedObject(node->obj); + redisAssert(zzlInsertAt(zobj,ele,node->score,NULL) == REDIS_OK); + decrRefCount(ele); + + next = node->level[0].forward; + zslFreeNode(node); + node = next; + } + + zfree(zs); + zobj->encoding = REDIS_ENCODING_ZIPLIST; + } else { + redisPanic("Unknown sorted set encoding"); + } +} + /*----------------------------------------------------------------------------- * Sorted set commands *----------------------------------------------------------------------------*/ @@ -746,7 +825,13 @@ void zaddGenericCommand(redisClient *c, int incr) { zobj = lookupKeyWrite(c->db,key); if (zobj == NULL) { - zobj = createZsetZiplistObject(); + if (server.zset_max_ziplist_entries == 0 || + server.zset_max_ziplist_value < sdslen(c->argv[3]->ptr)) + { + zobj = createZsetObject(); + } else { + zobj = createZsetZiplistObject(); + } dbAdd(c->db,key,zobj); } else { if (zobj->type != REDIS_ZSET) { @@ -785,7 +870,13 @@ void zaddGenericCommand(redisClient *c, int incr) { else /* ZADD */ addReply(c,shared.czero); } else { + /* Optimize: check if the element is too large or the list becomes + * too long *before* executing zzlInsert. */ redisAssert(zzlInsert(zobj,ele,score) == REDIS_OK); + if (zzlLength(zobj) > server.zset_max_ziplist_entries) + zsConvert(zobj,REDIS_ENCODING_RAW); + if (sdslen(ele->ptr) > server.zset_max_ziplist_value) + zsConvert(zobj,REDIS_ENCODING_RAW); signalModifiedKey(c->db,key); server.dirty++; -- 2.47.2