git.saurik.com Git - redis.git/blobdiff - src/t_zset.c
correctly set AOF base size field in server structure
[redis.git] / src / t_zset.c
index b8a961eb775022e46ef2468ecbb54334a6ce6520..6a56c3b4eaf8fd25dfccfd598c4778893bba4d38 100644 (file)
@@ -174,12 +174,6 @@ int zslDelete(zskiplist *zsl, double score, robj *obj) {
     return 0; /* not found */
 }
 
-/* Struct to hold a inclusive/exclusive range spec. */
-typedef struct {
-    double min, max;
-    int minex, maxex; /* are min or max exclusive? */
-} zrangespec;
-
 static int zslValueGteMin(double value, zrangespec *spec) {
     return spec->minex ? (value > spec->min) : (value >= spec->min);
 }
@@ -188,10 +182,6 @@ static int zslValueLteMax(double value, zrangespec *spec) {
     return spec->maxex ? (value < spec->max) : (value <= spec->max);
 }
 
-static int zslValueInRange(double value, zrangespec *spec) {
-    return zslValueGteMin(value,spec) && zslValueLteMax(value,spec);
-}
-
 /* Returns if there is a part of the zset is in range. */
 int zslIsInRange(zskiplist *zsl, zrangespec *range) {
     zskiplistNode *x;
@@ -620,7 +610,7 @@ unsigned char *zzlInsertAt(unsigned char *zl, unsigned char *eptr, robj *ele, do
     unsigned char *sptr;
     char scorebuf[128];
     int scorelen;
-    int offset;
+    size_t offset;
 
     redisAssert(ele->encoding == REDIS_ENCODING_RAW);
     scorelen = d2string(scorebuf,sizeof(scorebuf),score);
@@ -725,7 +715,7 @@ unsigned int zsetLength(robj *zobj) {
     int length = -1;
     if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
         length = zzlLength(zobj->ptr);
-    } else if (zobj->encoding == REDIS_ENCODING_RAW) {
+    } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
         length = ((zset*)zobj->ptr)->zsl->length;
     } else {
         redisPanic("Unknown sorted set encoding");
@@ -747,7 +737,7 @@ void zsetConvert(robj *zobj, int encoding) {
         unsigned int vlen;
         long long vlong;
 
-        if (encoding != REDIS_ENCODING_RAW)
+        if (encoding != REDIS_ENCODING_SKIPLIST)
             redisPanic("Unknown target encoding");
 
         zs = zmalloc(sizeof(*zs));
@@ -776,8 +766,8 @@ void zsetConvert(robj *zobj, int encoding) {
 
         zfree(zobj->ptr);
         zobj->ptr = zs;
-        zobj->encoding = REDIS_ENCODING_RAW;
-    } else if (zobj->encoding == REDIS_ENCODING_RAW) {
+        zobj->encoding = REDIS_ENCODING_SKIPLIST;
+    } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
         unsigned char *zl = ziplistNew();
 
         if (encoding != REDIS_ENCODING_ZIPLIST)
@@ -820,11 +810,29 @@ void zaddGenericCommand(redisClient *c, int incr) {
     robj *ele;
     robj *zobj;
     robj *curobj;
-    double score, curscore = 0.0;
+    double score = 0, *scores, curscore = 0.0;
+    int j, elements = (c->argc-2)/2;
+    int added = 0;
 
-    if (getDoubleFromObjectOrReply(c,c->argv[2],&score,NULL) != REDIS_OK)
+    if (c->argc % 2) {
+        addReply(c,shared.syntaxerr);
         return;
+    }
+
+    /* Start parsing all the scores, we need to emit any syntax error
+     * before executing additions to the sorted set, as the command should
+     * either execute fully or nothing at all. */
+    scores = zmalloc(sizeof(double)*elements);
+    for (j = 0; j < elements; j++) {
+        if (getDoubleFromObjectOrReply(c,c->argv[2+j*2],&scores[j],NULL)
+            != REDIS_OK)
+        {
+            zfree(scores);
+            return;
+        }
+    }
 
+    /* Lookup the key and create the sorted set if does not exist. */
     zobj = lookupKeyWrite(c->db,key);
     if (zobj == NULL) {
         if (server.zset_max_ziplist_entries == 0 ||
@@ -838,111 +846,105 @@ void zaddGenericCommand(redisClient *c, int incr) {
     } else {
         if (zobj->type != REDIS_ZSET) {
             addReply(c,shared.wrongtypeerr);
+            zfree(scores);
             return;
         }
     }
 
-    if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
-        unsigned char *eptr;
+    for (j = 0; j < elements; j++) {
+        score = scores[j];
 
-        /* Prefer non-encoded element when dealing with ziplists. */
-        ele = c->argv[3];
-        if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
-            if (incr) {
-                score += curscore;
-                if (isnan(score)) {
-                    addReplyError(c,nanerr);
-                    /* Don't need to check if the sorted set is empty, because
-                     * we know it has at least one element. */
-                    return;
+        if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
+            unsigned char *eptr;
+
+            /* Prefer non-encoded element when dealing with ziplists. */
+            ele = c->argv[3+j*2];
+            if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
+                if (incr) {
+                    score += curscore;
+                    if (isnan(score)) {
+                        addReplyError(c,nanerr);
+                        /* Don't need to check if the sorted set is empty
+                         * because we know it has at least one element. */
+                        zfree(scores);
+                        return;
+                    }
                 }
-            }
 
-            /* Remove and re-insert when score changed. */
-            if (score != curscore) {
-                zobj->ptr = zzlDelete(zobj->ptr,eptr);
+                /* Remove and re-insert when score changed. */
+                if (score != curscore) {
+                    zobj->ptr = zzlDelete(zobj->ptr,eptr);
+                    zobj->ptr = zzlInsert(zobj->ptr,ele,score);
+
+                    signalModifiedKey(c->db,key);
+                    server.dirty++;
+                }
+            } else {
+                /* Optimize: check if the element is too large or the list
+                 * becomes too long *before* executing zzlInsert. */
                 zobj->ptr = zzlInsert(zobj->ptr,ele,score);
+                if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries)
+                    zsetConvert(zobj,REDIS_ENCODING_SKIPLIST);
+                if (sdslen(ele->ptr) > server.zset_max_ziplist_value)
+                    zsetConvert(zobj,REDIS_ENCODING_SKIPLIST);
 
                 signalModifiedKey(c->db,key);
                 server.dirty++;
+                if (!incr) added++;
             }
+        } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
+            zset *zs = zobj->ptr;
+            zskiplistNode *znode;
+            dictEntry *de;
 
-            if (incr) /* ZINCRBY */
-                addReplyDouble(c,score);
-            else /* ZADD */
-                addReply(c,shared.czero);
-        } else {
-            /* Optimize: check if the element is too large or the list becomes
-             * too long *before* executing zzlInsert. */
-            zobj->ptr = zzlInsert(zobj->ptr,ele,score);
-            if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries)
-                zsetConvert(zobj,REDIS_ENCODING_RAW);
-            if (sdslen(ele->ptr) > server.zset_max_ziplist_value)
-                zsetConvert(zobj,REDIS_ENCODING_RAW);
-
-            signalModifiedKey(c->db,key);
-            server.dirty++;
-
-            if (incr) /* ZINCRBY */
-                addReplyDouble(c,score);
-            else /* ZADD */
-                addReply(c,shared.cone);
-        }
-    } else if (zobj->encoding == REDIS_ENCODING_RAW) {
-        zset *zs = zobj->ptr;
-        zskiplistNode *znode;
-        dictEntry *de;
-
-        ele = c->argv[3] = tryObjectEncoding(c->argv[3]);
-        de = dictFind(zs->dict,ele);
-        if (de != NULL) {
-            curobj = dictGetEntryKey(de);
-            curscore = *(double*)dictGetEntryVal(de);
-
-            if (incr) {
-                score += curscore;
-                if (isnan(score)) {
-                    addReplyError(c,nanerr);
-                    /* Don't need to check if the sorted set is empty, because
-                     * we know it has at least one element. */
-                    return;
+            ele = c->argv[3+j*2] = tryObjectEncoding(c->argv[3+j*2]);
+            de = dictFind(zs->dict,ele);
+            if (de != NULL) {
+                curobj = dictGetEntryKey(de);
+                curscore = *(double*)dictGetEntryVal(de);
+
+                if (incr) {
+                    score += curscore;
+                    if (isnan(score)) {
+                        addReplyError(c,nanerr);
+                        /* Don't need to check if the sorted set is empty
+                         * because we know it has at least one element. */
+                        zfree(scores);
+                        return;
+                    }
                 }
-            }
 
-            /* Remove and re-insert when score changed. We can safely delete
-             * the key object from the skiplist, since the dictionary still has
-             * a reference to it. */
-            if (score != curscore) {
-                redisAssert(zslDelete(zs->zsl,curscore,curobj));
-                znode = zslInsert(zs->zsl,score,curobj);
-                incrRefCount(curobj); /* Re-inserted in skiplist. */
-                dictGetEntryVal(de) = &znode->score; /* Update score ptr. */
+                /* Remove and re-insert when score changed. We can safely
+                 * delete the key object from the skiplist, since the
+                 * dictionary still has a reference to it. */
+                if (score != curscore) {
+                    redisAssert(zslDelete(zs->zsl,curscore,curobj));
+                    znode = zslInsert(zs->zsl,score,curobj);
+                    incrRefCount(curobj); /* Re-inserted in skiplist. */
+                    dictGetEntryVal(de) = &znode->score; /* Update score ptr. */
+
+                    signalModifiedKey(c->db,key);
+                    server.dirty++;
+                }
+            } else {
+                znode = zslInsert(zs->zsl,score,ele);
+                incrRefCount(ele); /* Inserted in skiplist. */
+                redisAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
+                incrRefCount(ele); /* Added to dictionary. */
 
                 signalModifiedKey(c->db,key);
                 server.dirty++;
+                if (!incr) added++;
             }
-
-            if (incr) /* ZINCRBY */
-                addReplyDouble(c,score);
-            else /* ZADD */
-                addReply(c,shared.czero);
         } else {
-            znode = zslInsert(zs->zsl,score,ele);
-            incrRefCount(ele); /* Inserted in skiplist. */
-            redisAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
-            incrRefCount(ele); /* Added to dictionary. */
-
-            signalModifiedKey(c->db,key);
-            server.dirty++;
-
-            if (incr) /* ZINCRBY */
-                addReplyDouble(c,score);
-            else /* ZADD */
-                addReply(c,shared.cone);
+            redisPanic("Unknown sorted set encoding");
         }
-    } else {
-        redisPanic("Unknown sorted set encoding");
     }
+    zfree(scores);
+    if (incr) /* ZINCRBY */
+        addReplyDouble(c,score);
+    else /* ZADD */
+        addReplyLongLong(c,added);
 }
 
 void zaddCommand(redisClient *c) {
@@ -955,8 +957,8 @@ void zincrbyCommand(redisClient *c) {
 
 void zremCommand(redisClient *c) {
     robj *key = c->argv[1];
-    robj *ele = c->argv[2];
     robj *zobj;
+    int deleted = 0, j;
 
     if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
         checkType(c,zobj,REDIS_ZSET)) return;
@@ -964,39 +966,48 @@ void zremCommand(redisClient *c) {
     if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
         unsigned char *eptr;
 
-        if ((eptr = zzlFind(zobj->ptr,ele,NULL)) != NULL) {
-            zobj->ptr = zzlDelete(zobj->ptr,eptr);
-            if (zzlLength(zobj->ptr) == 0) dbDelete(c->db,key);
-        } else {
-            addReply(c,shared.czero);
-            return;
+        for (j = 2; j < c->argc; j++) {
+            if ((eptr = zzlFind(zobj->ptr,c->argv[j],NULL)) != NULL) {
+                deleted++;
+                zobj->ptr = zzlDelete(zobj->ptr,eptr);
+                if (zzlLength(zobj->ptr) == 0) {
+                    dbDelete(c->db,key);
+                    break;
+                }
+            }
         }
-    } else if (zobj->encoding == REDIS_ENCODING_RAW) {
+    } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
         zset *zs = zobj->ptr;
         dictEntry *de;
         double score;
 
-        de = dictFind(zs->dict,ele);
-        if (de != NULL) {
-            /* Delete from the skiplist */
-            score = *(double*)dictGetEntryVal(de);
-            redisAssert(zslDelete(zs->zsl,score,ele));
-
-            /* Delete from the hash table */
-            dictDelete(zs->dict,ele);
-            if (htNeedsResize(zs->dict)) dictResize(zs->dict);
-            if (dictSize(zs->dict) == 0) dbDelete(c->db,key);
-        } else {
-            addReply(c,shared.czero);
-            return;
+        for (j = 2; j < c->argc; j++) {
+            de = dictFind(zs->dict,c->argv[j]);
+            if (de != NULL) {
+                deleted++;
+
+                /* Delete from the skiplist */
+                score = *(double*)dictGetEntryVal(de);
+                redisAssert(zslDelete(zs->zsl,score,c->argv[j]));
+
+                /* Delete from the hash table */
+                dictDelete(zs->dict,c->argv[j]);
+                if (htNeedsResize(zs->dict)) dictResize(zs->dict);
+                if (dictSize(zs->dict) == 0) {
+                    dbDelete(c->db,key);
+                    break;
+                }
+            }
         }
     } else {
         redisPanic("Unknown sorted set encoding");
     }
 
-    signalModifiedKey(c->db,key);
-    server.dirty++;
-    addReply(c,shared.cone);
+    if (deleted) {
+        signalModifiedKey(c->db,key);
+        server.dirty += deleted;
+    }
+    addReplyLongLong(c,deleted);
 }
 
 void zremrangebyscoreCommand(redisClient *c) {
@@ -1016,7 +1027,8 @@ void zremrangebyscoreCommand(redisClient *c) {
 
     if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
         zobj->ptr = zzlDeleteRangeByScore(zobj->ptr,range,&deleted);
-    } else if (zobj->encoding == REDIS_ENCODING_RAW) {
+        if (zzlLength(zobj->ptr) == 0) dbDelete(c->db,key);
+    } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
         zset *zs = zobj->ptr;
         deleted = zslDeleteRangeByScore(zs->zsl,range,zs->dict);
         if (htNeedsResize(zs->dict)) dictResize(zs->dict);
@@ -1061,7 +1073,8 @@ void zremrangebyrankCommand(redisClient *c) {
     if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
         /* Correct for 1-based rank. */
         zobj->ptr = zzlDeleteRangeByRank(zobj->ptr,start+1,end+1,&deleted);
-    } else if (zobj->encoding == REDIS_ENCODING_RAW) {
+        if (zzlLength(zobj->ptr) == 0) dbDelete(c->db,key);
+    } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
         zset *zs = zobj->ptr;
 
         /* Correct for 1-based rank. */
@@ -1161,7 +1174,7 @@ void zuiInitIterator(zsetopsrc *op) {
                 it->zl.sptr = ziplistNext(it->zl.zl,it->zl.eptr);
                 redisAssert(it->zl.sptr != NULL);
             }
-        } else if (op->encoding == REDIS_ENCODING_RAW) {
+        } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
             it->sl.zs = op->subject->ptr;
             it->sl.node = it->sl.zs->zsl->header->level[0].forward;
         } else {
@@ -1189,7 +1202,7 @@ void zuiClearIterator(zsetopsrc *op) {
         iterzset *it = &op->iter.zset;
         if (op->encoding == REDIS_ENCODING_ZIPLIST) {
             REDIS_NOTUSED(it); /* skip */
-        } else if (op->encoding == REDIS_ENCODING_RAW) {
+        } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
             REDIS_NOTUSED(it); /* skip */
         } else {
             redisPanic("Unknown sorted set encoding");
@@ -1216,7 +1229,7 @@ int zuiLength(zsetopsrc *op) {
         iterzset *it = &op->iter.zset;
         if (op->encoding == REDIS_ENCODING_ZIPLIST) {
             return zzlLength(it->zl.zl);
-        } else if (op->encoding == REDIS_ENCODING_RAW) {
+        } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
             return it->sl.zs->zsl->length;
         } else {
             redisPanic("Unknown sorted set encoding");
@@ -1241,7 +1254,7 @@ int zuiNext(zsetopsrc *op, zsetopval *val) {
     if (op->type == REDIS_SET) {
         iterset *it = &op->iter.set;
         if (op->encoding == REDIS_ENCODING_INTSET) {
-            if (!intsetGet(it->is.is,it->is.ii,&val->ell))
+            if (!intsetGet(it->is.is,it->is.ii,(int64_t*)&val->ell))
                 return 0;
             val->score = 1.0;
 
@@ -1269,7 +1282,7 @@ int zuiNext(zsetopsrc *op, zsetopval *val) {
 
             /* Move to next element. */
             zzlNext(it->zl.zl,&it->zl.eptr,&it->zl.sptr);
-        } else if (op->encoding == REDIS_ENCODING_RAW) {
+        } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
             if (it->sl.node == NULL)
                 return 0;
             val->ele = it->sl.node->obj;
@@ -1381,7 +1394,7 @@ int zuiFind(zsetopsrc *op, zsetopval *val, double *score) {
             } else {
                 return 0;
             }
-        } else if (op->encoding == REDIS_ENCODING_RAW) {
+        } else if (op->encoding == REDIS_ENCODING_SKIPLIST) {
             dictEntry *de;
             if ((de = dictFind(it->sl.zs->dict,val->ele)) != NULL) {
                 *score = *(double*)dictGetEntryVal(de);
@@ -1517,6 +1530,7 @@ void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
 
     dstobj = createZsetObject();
     dstzset = dstobj->ptr;
+    memset(&zval, 0, sizeof(zval));
 
     if (op == REDIS_OP_INTER) {
         /* Skip everything if the smallest input is empty. */
@@ -1528,7 +1542,12 @@ void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
 
                 score = src[0].weight * zval.score;
                 for (j = 1; j < setnum; j++) {
-                    if (zuiFind(&src[j],&zval,&value)) {
+                    /* It is not safe to access the zset we are
+                     * iterating, so explicitly check for equal object. */
+                    if (src[j].subject == src[0].subject) {
+                        value = zval.score*src[j].weight;
+                        zunionInterAggregate(&score,value,aggregate);
+                    } else if (zuiFind(&src[j],&zval,&value)) {
                         value *= src[j].weight;
                         zunionInterAggregate(&score,value,aggregate);
                     } else {
@@ -1552,7 +1571,7 @@ void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
         }
     } else if (op == REDIS_OP_UNION) {
         for (i = 0; i < setnum; i++) {
-            if (zuiLength(&src[0]) == 0)
+            if (zuiLength(&src[i]) == 0)
                 continue;
 
             while (zuiNext(&src[i],&zval)) {
@@ -1568,7 +1587,12 @@ void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
                 /* Because the inputs are sorted by size, it's only possible
                  * for sets at larger indices to hold this element. */
                 for (j = (i+1); j < setnum; j++) {
-                    if (zuiFind(&src[j],&zval,&value)) {
+                    /* It is not safe to access the zset we are
+                     * iterating, so explicitly check for equal object. */
+                    if(src[j].subject == src[i].subject) {
+                        value = zval.score*src[j].weight;
+                        zunionInterAggregate(&score,value,aggregate);
+                    } else if (zuiFind(&src[j],&zval,&value)) {
                         value *= src[j].weight;
                         zunionInterAggregate(&score,value,aggregate);
                     }
@@ -1694,7 +1718,7 @@ void zrangeGenericCommand(redisClient *c, int reverse) {
                 zzlNext(zl,&eptr,&sptr);
         }
 
-    } else if (zobj->encoding == REDIS_ENCODING_RAW) {
+    } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
         zset *zs = zobj->ptr;
         zskiplist *zsl = zs->zsl;
         zskiplistNode *ln;
@@ -1851,7 +1875,7 @@ void genericZrangebyscoreCommand(redisClient *c, int reverse, int justcount) {
             else
                 zzlNext(zl,&eptr,&sptr);
         }
-    } else if (zobj->encoding == REDIS_ENCODING_RAW) {
+    } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
         zset *zs = zobj->ptr;
         zskiplist *zsl = zs->zsl;
         zskiplistNode *ln;
@@ -1945,7 +1969,7 @@ void zscoreCommand(redisClient *c) {
             addReplyDouble(c,score);
         else
             addReply(c,shared.nullbulk);
-    } else if (zobj->encoding == REDIS_ENCODING_RAW) {
+    } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
         zset *zs = zobj->ptr;
         dictEntry *de;
 
@@ -1999,7 +2023,7 @@ void zrankGenericCommand(redisClient *c, int reverse) {
         } else {
             addReply(c,shared.nullbulk);
         }
-    } else if (zobj->encoding == REDIS_ENCODING_RAW) {
+    } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
         zset *zs = zobj->ptr;
         zskiplist *zsl = zs->zsl;
         dictEntry *de;