git.saurik.com Git - redis.git/blobdiff - src/t_zset.c
Fix used function in ZCARD
[redis.git] / src / t_zset.c
index 25da7871adb5146fac7a4614348afa94a1e066c1..3c9ede1c0971ff6e8747ab02834c1abe3f80582f 100644 (file)
@@ -451,6 +451,7 @@ int zzlCompareElements(unsigned char *eptr, unsigned char *cstr, unsigned int cl
 
 unsigned int zzlLength(robj *zobj) {
     unsigned char *zl = zobj->ptr;
+    redisAssert(zobj->encoding == REDIS_ENCODING_ZIPLIST); /* ptr is read as a ziplist below */
     return ziplistLen(zl)/2;
 }
 
@@ -728,6 +729,85 @@ int zsLength(robj *zobj) {
     return length;
 }
 
+void zsConvert(robj *zobj, int encoding) { /* convert zobj in place */
+    zset *zs; /* RAW representation: dict + skiplist */
+    zskiplistNode *node, *next;
+    robj *ele;
+    double score;
+    /* Only ZIPLIST -> RAW and RAW -> ZIPLIST are handled; others panic. */
+    if (zobj->encoding == encoding) return; /* already there: no-op */
+    if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
+        unsigned char *zl = zobj->ptr;
+        unsigned char *eptr, *sptr;
+        unsigned char *vstr;
+        unsigned int vlen;
+        long long vlong;
+        /* From a ziplist the only accepted target is RAW. */
+        if (encoding != REDIS_ENCODING_RAW)
+            redisPanic("Unknown target encoding");
+        /* Build the empty dict + skiplist that will replace the ziplist. */
+        zs = zmalloc(sizeof(*zs));
+        zs->dict = dictCreate(&zsetDictType,NULL);
+        zs->zsl = zslCreate();
+        /* eptr is the element entry, sptr the score entry right after it. */
+        eptr = ziplistIndex(zl,0);
+        redisAssert(eptr != NULL);
+        sptr = ziplistNext(zl,eptr);
+        redisAssert(sptr != NULL);
+        /* NOTE(review): the asserts above require at least one pair. */
+        while (eptr != NULL) {
+            score = zzlGetScore(sptr);
+            redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
+            if (vstr == NULL) /* no string returned: use vlong instead */
+                ele = createStringObjectFromLongLong(vlong);
+            else
+                ele = createStringObject((char*)vstr,vlen);
+            /* ele was just created, so the reference it already holds goes
+             * to the skiplist node; the dict gets its own one below. */
+            node = zslInsert(zs->zsl,score,ele);
+            redisAssert(dictAdd(zs->dict,ele,&node->score) == DICT_OK);
+            incrRefCount(ele); /* Second reference, for the dictionary. */
+            zzlNext(zl,&eptr,&sptr); /* loop ends once eptr is NULL */
+        }
+        /* Drop the old ziplist and install the new representation. */
+        zfree(zobj->ptr);
+        zobj->ptr = zs;
+        zobj->encoding = REDIS_ENCODING_RAW;
+    } else if (zobj->encoding == REDIS_ENCODING_RAW) {
+        unsigned char *zl = ziplistNew();
+        /* From RAW the only accepted target is ZIPLIST. */
+        if (encoding != REDIS_ENCODING_ZIPLIST)
+            redisPanic("Unknown target encoding");
+
+        /* Approach similar to zslFree(), since we want to free the skiplist at
+         * the same time as creating the ziplist. */
+        zs = zobj->ptr;
+        dictRelease(zs->dict);
+        node = zs->zsl->header->level[0].forward; /* saved before header is freed */
+        zfree(zs->zsl->header);
+        zfree(zs->zsl);
+
+        /* Immediately store pointer to ziplist in object because it will
+         * change because of reallocations when pushing to the ziplist. */
+        zobj->ptr = zl;
+        /* Move each element into the ziplist, freeing its node as we go. */
+        while (node) {
+            ele = getDecodedObject(node->obj); /* released after the insert */
+            redisAssert(zzlInsertAt(zobj,ele,node->score,NULL) == REDIS_OK);
+            decrRefCount(ele);
+            /* Grab the successor before the current node is freed. */
+            next = node->level[0].forward;
+            zslFreeNode(node);
+            node = next;
+        }
+
+        zfree(zs); /* dict and skiplist are gone; free the container */
+        zobj->encoding = REDIS_ENCODING_ZIPLIST;
+    } else {
+        redisPanic("Unknown sorted set encoding");
+    }
+}
+
 /*-----------------------------------------------------------------------------
  * Sorted set commands 
  *----------------------------------------------------------------------------*/
@@ -746,7 +826,13 @@ void zaddGenericCommand(redisClient *c, int incr) {
 
     zobj = lookupKeyWrite(c->db,key);
     if (zobj == NULL) {
-        zobj = createZsetZiplistObject();
+        if (server.zset_max_ziplist_entries == 0 ||
+            server.zset_max_ziplist_value < sdslen(c->argv[3]->ptr))
+        {
+            zobj = createZsetObject();
+        } else {
+            zobj = createZsetZiplistObject();
+        }
         dbAdd(c->db,key,zobj);
     } else {
         if (zobj->type != REDIS_ZSET) {
@@ -785,7 +871,13 @@ void zaddGenericCommand(redisClient *c, int incr) {
             else /* ZADD */
                 addReply(c,shared.czero);
         } else {
+            /* Optimize: check if the element is too large or the list becomes
+             * too long *before* executing zzlInsert. */
             redisAssert(zzlInsert(zobj,ele,score) == REDIS_OK);
+            if (zzlLength(zobj) > server.zset_max_ziplist_entries)
+                zsConvert(zobj,REDIS_ENCODING_RAW);
+            if (sdslen(ele->ptr) > server.zset_max_ziplist_value)
+                zsConvert(zobj,REDIS_ENCODING_RAW);
 
             signalModifiedKey(c->db,key);
             server.dirty++;
@@ -1511,7 +1603,7 @@ void zcardCommand(redisClient *c) {
     if ((zobj = lookupKeyReadOrReply(c,key,shared.czero)) == NULL ||
         checkType(c,zobj,REDIS_ZSET)) return;
 
-    addReplyLongLong(c,zzlLength(zobj));
+    addReplyLongLong(c,zsLength(zobj));
 }
 
 void zscoreCommand(redisClient *c) {