]> git.saurik.com Git - redis.git/blobdiff - src/t_zset.c
Fix used function in ZCARD
[redis.git] / src / t_zset.c
index 5fe10883cb039edee51a0ed1cc9295dc33e6ff98..3c9ede1c0971ff6e8747ab02834c1abe3f80582f 100644 (file)
@@ -451,6 +451,7 @@ int zzlCompareElements(unsigned char *eptr, unsigned char *cstr, unsigned int cl
 
 unsigned int zzlLength(robj *zobj) {
     unsigned char *zl = zobj->ptr;
+    redisAssert(zobj->encoding == REDIS_ENCODING_ZIPLIST);
     return ziplistLen(zl)/2;
 }
 
@@ -658,7 +659,7 @@ int zzlInsert(robj *zobj, robj *ele, double score) {
             break;
         } else if (s == score) {
             /* Ensure lexicographical ordering for elements. */
-            if (zzlCompareElements(eptr,ele->ptr,sdslen(ele->ptr)) < 0) {
+            if (zzlCompareElements(eptr,ele->ptr,sdslen(ele->ptr)) > 0) {
                 zzlInsertAt(zobj,ele,score,eptr);
                 break;
             }
@@ -728,6 +729,85 @@ int zsLength(robj *zobj) {
     return length;
 }
 
+void zsConvert(robj *zobj, int encoding) {
+    zset *zs;
+    zskiplistNode *node, *next;
+    robj *ele;
+    double score;
+
+    if (zobj->encoding == encoding) return;
+    if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
+        unsigned char *zl = zobj->ptr;
+        unsigned char *eptr, *sptr;
+        unsigned char *vstr;
+        unsigned int vlen;
+        long long vlong;
+
+        if (encoding != REDIS_ENCODING_RAW)
+            redisPanic("Unknown target encoding");
+
+        zs = zmalloc(sizeof(*zs));
+        zs->dict = dictCreate(&zsetDictType,NULL);
+        zs->zsl = zslCreate();
+
+        eptr = ziplistIndex(zl,0);
+        redisAssert(eptr != NULL);
+        sptr = ziplistNext(zl,eptr);
+        redisAssert(sptr != NULL);
+
+        while (eptr != NULL) {
+            score = zzlGetScore(sptr);
+            redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
+            if (vstr == NULL)
+                ele = createStringObjectFromLongLong(vlong);
+            else
+                ele = createStringObject((char*)vstr,vlen);
+
+            /* Has incremented refcount since it was just created. */
+            node = zslInsert(zs->zsl,score,ele);
+            redisAssert(dictAdd(zs->dict,ele,&node->score) == DICT_OK);
+            incrRefCount(ele); /* Added to dictionary. */
+            zzlNext(zl,&eptr,&sptr);
+        }
+
+        zfree(zobj->ptr);
+        zobj->ptr = zs;
+        zobj->encoding = REDIS_ENCODING_RAW;
+    } else if (zobj->encoding == REDIS_ENCODING_RAW) {
+        unsigned char *zl = ziplistNew();
+
+        if (encoding != REDIS_ENCODING_ZIPLIST)
+            redisPanic("Unknown target encoding");
+
+        /* Approach similar to zslFree(), since we want to free the skiplist at
+         * the same time as creating the ziplist. */
+        zs = zobj->ptr;
+        dictRelease(zs->dict);
+        node = zs->zsl->header->level[0].forward;
+        zfree(zs->zsl->header);
+        zfree(zs->zsl);
+
+        /* Immediately store pointer to ziplist in object because it will
+         * change because of reallocations when pushing to the ziplist. */
+        zobj->ptr = zl;
+
+        while (node) {
+            ele = getDecodedObject(node->obj);
+            redisAssert(zzlInsertAt(zobj,ele,node->score,NULL) == REDIS_OK);
+            decrRefCount(ele);
+
+            next = node->level[0].forward;
+            zslFreeNode(node);
+            node = next;
+        }
+
+        zfree(zs);
+        zobj->encoding = REDIS_ENCODING_ZIPLIST;
+    } else {
+        redisPanic("Unknown sorted set encoding");
+    }
+}
+
 /*-----------------------------------------------------------------------------
  * Sorted set commands 
  *----------------------------------------------------------------------------*/
@@ -746,7 +826,13 @@ void zaddGenericCommand(redisClient *c, int incr) {
 
     zobj = lookupKeyWrite(c->db,key);
     if (zobj == NULL) {
-        zobj = createZsetZiplistObject();
+        if (server.zset_max_ziplist_entries == 0 ||
+            server.zset_max_ziplist_value < sdslen(c->argv[3]->ptr))
+        {
+            zobj = createZsetObject();
+        } else {
+            zobj = createZsetZiplistObject();
+        }
         dbAdd(c->db,key,zobj);
     } else {
         if (zobj->type != REDIS_ZSET) {
@@ -785,7 +871,13 @@ void zaddGenericCommand(redisClient *c, int incr) {
             else /* ZADD */
                 addReply(c,shared.czero);
         } else {
+            /* Optimize: check if the element is too large or the list becomes
+             * too long *before* executing zzlInsert. */
             redisAssert(zzlInsert(zobj,ele,score) == REDIS_OK);
+            if (zzlLength(zobj) > server.zset_max_ziplist_entries)
+                zsConvert(zobj,REDIS_ENCODING_RAW);
+            if (sdslen(ele->ptr) > server.zset_max_ziplist_value)
+                zsConvert(zobj,REDIS_ENCODING_RAW);
 
             signalModifiedKey(c->db,key);
             server.dirty++;
@@ -1505,66 +1597,103 @@ void zcountCommand(redisClient *c) {
 }
 
 void zcardCommand(redisClient *c) {
-    robj *o;
-    zset *zs;
+    robj *key = c->argv[1];
+    robj *zobj;
 
-    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
-        checkType(c,o,REDIS_ZSET)) return;
+    if ((zobj = lookupKeyReadOrReply(c,key,shared.czero)) == NULL ||
+        checkType(c,zobj,REDIS_ZSET)) return;
 
-    zs = o->ptr;
-    addReplyLongLong(c,zs->zsl->length);
+    addReplyLongLong(c,zsLength(zobj));
 }
 
 void zscoreCommand(redisClient *c) {
-    robj *o;
-    zset *zs;
-    dictEntry *de;
+    robj *key = c->argv[1];
+    robj *zobj;
+    double score;
 
-    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
-        checkType(c,o,REDIS_ZSET)) return;
+    if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
+        checkType(c,zobj,REDIS_ZSET)) return;
 
-    zs = o->ptr;
-    c->argv[2] = tryObjectEncoding(c->argv[2]);
-    de = dictFind(zs->dict,c->argv[2]);
-    if (!de) {
-        addReply(c,shared.nullbulk);
-    } else {
-        double *score = dictGetEntryVal(de);
+    if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
+        if (zzlFind(zobj,c->argv[2],&score) != NULL)
+            addReplyDouble(c,score);
+        else
+            addReply(c,shared.nullbulk);
+    } else if (zobj->encoding == REDIS_ENCODING_RAW) {
+        zset *zs = zobj->ptr;
+        dictEntry *de;
 
-        addReplyDouble(c,*score);
+        c->argv[2] = tryObjectEncoding(c->argv[2]);
+        de = dictFind(zs->dict,c->argv[2]);
+        if (de != NULL) {
+            score = *(double*)dictGetEntryVal(de);
+            addReplyDouble(c,score);
+        } else {
+            addReply(c,shared.nullbulk);
+        }
+    } else {
+        redisPanic("Unknown sorted set encoding");
     }
 }
 
 void zrankGenericCommand(redisClient *c, int reverse) {
-    robj *o;
-    zset *zs;
-    zskiplist *zsl;
-    dictEntry *de;
+    robj *key = c->argv[1];
+    robj *ele = c->argv[2];
+    robj *zobj;
+    unsigned long llen;
     unsigned long rank;
-    double *score;
 
-    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
-        checkType(c,o,REDIS_ZSET)) return;
+    if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
+        checkType(c,zobj,REDIS_ZSET)) return;
+    llen = zsLength(zobj);
 
-    zs = o->ptr;
-    zsl = zs->zsl;
-    c->argv[2] = tryObjectEncoding(c->argv[2]);
-    de = dictFind(zs->dict,c->argv[2]);
-    if (!de) {
-        addReply(c,shared.nullbulk);
-        return;
-    }
+    redisAssert(ele->encoding == REDIS_ENCODING_RAW);
+    if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
+        unsigned char *zl = zobj->ptr;
+        unsigned char *eptr, *sptr;
 
-    score = dictGetEntryVal(de);
-    rank = zslGetRank(zsl, *score, c->argv[2]);
-    if (rank) {
-        if (reverse) {
-            addReplyLongLong(c, zsl->length - rank);
+        eptr = ziplistIndex(zl,0);
+        redisAssert(eptr != NULL);
+        sptr = ziplistNext(zl,eptr);
+        redisAssert(sptr != NULL);
+
+        rank = 1;
+        while(eptr != NULL) {
+            if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr)))
+                break;
+            rank++;
+            zzlNext(zl,&eptr,&sptr);
+        }
+
+        if (eptr != NULL) {
+            if (reverse)
+                addReplyLongLong(c,llen-rank);
+            else
+                addReplyLongLong(c,rank-1);
+        } else {
+            addReply(c,shared.nullbulk);
+        }
+    } else if (zobj->encoding == REDIS_ENCODING_RAW) {
+        zset *zs = zobj->ptr;
+        zskiplist *zsl = zs->zsl;
+        dictEntry *de;
+        double score;
+
+        ele = c->argv[2] = tryObjectEncoding(c->argv[2]);
+        de = dictFind(zs->dict,ele);
+        if (de != NULL) {
+            score = *(double*)dictGetEntryVal(de);
+            rank = zslGetRank(zsl,score,ele);
+            redisAssert(rank); /* Existing elements always have a rank. */
+            if (reverse)
+                addReplyLongLong(c,llen-rank);
+            else
+                addReplyLongLong(c,rank-1);
         } else {
-            addReplyLongLong(c, rank-1);
+            addReply(c,shared.nullbulk);
         }
     } else {
-        addReply(c,shared.nullbulk);
+        redisPanic("Unknown sorted set encoding");
     }
 }