git.saurik.com Git - redis.git/commitdiff
Convert encoding when thresholds overflow
author Pieter Noordhuis <pcnoordhuis@gmail.com>
Wed, 9 Mar 2011 15:13:06 +0000 (16:13 +0100)
committer Pieter Noordhuis <pcnoordhuis@gmail.com>
Wed, 9 Mar 2011 15:13:06 +0000 (16:13 +0100)
src/t_zset.c

index 25da7871adb5146fac7a4614348afa94a1e066c1..35d95ba750229fb8a34247552bcde8fe3cb29a01 100644 (file)
@@ -728,6 +728,85 @@ int zsLength(robj *zobj) {
     return length;
 }
 
+void zsConvert(robj *zobj, int encoding) { /* Re-encode zobj in place (ptr and encoding are rewritten) between ZIPLIST and RAW. */
+    zset *zs;
+    zskiplistNode *node, *next;
+    robj *ele;
+    double score;
+
+    if (zobj->encoding == encoding) return; /* Already in the requested encoding: nothing to do. */
+    if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
+        unsigned char *zl = zobj->ptr;
+        unsigned char *eptr, *sptr; /* eptr: entry read as the element; sptr: entry read as its score. */
+        unsigned char *vstr;
+        unsigned int vlen;
+        long long vlong;
+
+        if (encoding != REDIS_ENCODING_RAW) /* RAW is the only target accepted when starting from a ziplist. */
+            redisPanic("Unknown target encoding");
+
+        zs = zmalloc(sizeof(*zs));
+        zs->dict = dictCreate(&zsetDictType,NULL);
+        zs->zsl = zslCreate();
+
+        eptr = ziplistIndex(zl,0); /* First entry; the assert below means an empty ziplist is not accepted here. */
+        redisAssert(eptr != NULL);
+        sptr = ziplistNext(zl,eptr); /* The entry right after the element is the one the score is read from. */
+        redisAssert(sptr != NULL);
+
+        while (eptr != NULL) {
+            score = zzlGetScore(sptr);
+            redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
+            if (vstr == NULL) /* No string pointer returned: build the element from the integer in vlong. */
+                ele = createStringObjectFromLongLong(vlong);
+            else
+                ele = createStringObject((char*)vstr,vlen);
+
+            /* Has incremented refcount since it was just created. That
+             * reference is the one handed to the skiplist node below. */
+            node = zslInsert(zs->zsl,score,ele);
+            redisAssert(dictAdd(zs->dict,ele,&node->score) == DICT_OK); /* Dict value points at the score stored in the node. */
+            incrRefCount(ele); /* Added to dictionary. */
+            zzlNext(zl,&eptr,&sptr); /* Advance both cursors; the loop stops once eptr comes back NULL. */
+        }
+
+        zfree(zobj->ptr); /* zobj->ptr is still the old ziplist (zl) at this point. */
+        zobj->ptr = zs;
+        zobj->encoding = REDIS_ENCODING_RAW;
+    } else if (zobj->encoding == REDIS_ENCODING_RAW) {
+        unsigned char *zl = ziplistNew();
+
+        if (encoding != REDIS_ENCODING_ZIPLIST) /* ZIPLIST is the only target accepted when starting from RAW. */
+            redisPanic("Unknown target encoding");
+
+        /* Approach similar to zslFree(), since we want to free the skiplist at
+         * the same time as creating the ziplist. */
+        zs = zobj->ptr;
+        dictRelease(zs->dict); /* NOTE(review): presumably drops only the dict's references; the nodes walked below are still used - confirm. */
+        node = zs->zsl->header->level[0].forward; /* Grab the first level-0 node before the header is freed. */
+        zfree(zs->zsl->header);
+        zfree(zs->zsl);
+
+        /* Immediately store pointer to ziplist in object because it will
+         * change because of reallocations when pushing to the ziplist. */
+        zobj->ptr = zl;
+
+        while (node) {
+            ele = getDecodedObject(node->obj);
+            redisAssert(zzlInsertAt(zobj,ele,node->score,NULL) == REDIS_OK); /* NOTE(review): NULL position presumably means append at the tail - confirm. */
+            decrRefCount(ele); /* NOTE(review): presumably releases the reference obtained from getDecodedObject() - confirm. */
+
+            next = node->level[0].forward; /* Read the successor before this node is freed. */
+            zslFreeNode(node);
+            node = next;
+        }
+
+        zfree(zs); /* Dict, skiplist header/struct and nodes are already gone; only the zset container is left. */
+        zobj->encoding = REDIS_ENCODING_ZIPLIST;
+    } else {
+        redisPanic("Unknown sorted set encoding");
+    }
+}
+
 /*-----------------------------------------------------------------------------
  * Sorted set commands 
  *----------------------------------------------------------------------------*/
@@ -746,7 +825,13 @@ void zaddGenericCommand(redisClient *c, int incr) {
 
     zobj = lookupKeyWrite(c->db,key);
     if (zobj == NULL) {
-        zobj = createZsetZiplistObject();
+        if (server.zset_max_ziplist_entries == 0 ||
+            server.zset_max_ziplist_value < sdslen(c->argv[3]->ptr))
+        {
+            zobj = createZsetObject();
+        } else {
+            zobj = createZsetZiplistObject();
+        }
         dbAdd(c->db,key,zobj);
     } else {
         if (zobj->type != REDIS_ZSET) {
@@ -785,7 +870,13 @@ void zaddGenericCommand(redisClient *c, int incr) {
             else /* ZADD */
                 addReply(c,shared.czero);
         } else {
+            /* Optimize: check if the element is too large or the list becomes
+             * too long *before* executing zzlInsert. */
             redisAssert(zzlInsert(zobj,ele,score) == REDIS_OK);
+            if (zzlLength(zobj) > server.zset_max_ziplist_entries)
+                zsConvert(zobj,REDIS_ENCODING_RAW);
+            if (sdslen(ele->ptr) > server.zset_max_ziplist_value)
+                zsConvert(zobj,REDIS_ENCODING_RAW);
 
             signalModifiedKey(c->db,key);
             server.dirty++;