X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/af4e866dbb1455a50d51b3d5f46832f1a36e2080..143d0077ba07fba8d662092c309d51b99270e648:/src/t_zset.c?ds=sidebyside diff --git a/src/t_zset.c b/src/t_zset.c index de32a8ee..27522367 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -24,13 +24,7 @@ * from tail to head, useful for ZREVRANGE. */ zskiplistNode *zslCreateNode(int level, double score, robj *obj) { - zskiplistNode *zn = zmalloc(sizeof(*zn)); - - zn->forward = zmalloc(sizeof(zskiplistNode*) * level); - if (level > 1) - zn->span = zmalloc(sizeof(unsigned int) * (level - 1)); - else - zn->span = NULL; + zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel)); zn->score = score; zn->obj = obj; return zn; @@ -45,11 +39,8 @@ zskiplist *zslCreate(void) { zsl->length = 0; zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL); for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) { - zsl->header->forward[j] = NULL; - - /* span has space for ZSKIPLIST_MAXLEVEL-1 elements */ - if (j < ZSKIPLIST_MAXLEVEL-1) - zsl->header->span[j] = 0; + zsl->header->level[j].forward = NULL; + zsl->header->level[j].span = 0; } zsl->header->backward = NULL; zsl->tail = NULL; @@ -58,19 +49,15 @@ zskiplist *zslCreate(void) { void zslFreeNode(zskiplistNode *node) { decrRefCount(node->obj); - zfree(node->forward); - zfree(node->span); zfree(node); } void zslFree(zskiplist *zsl) { - zskiplistNode *node = zsl->header->forward[0], *next; + zskiplistNode *node = zsl->header->level[0].forward, *next; - zfree(zsl->header->forward); - zfree(zsl->header->span); zfree(zsl->header); while(node) { - next = node->forward[0]; + next = node->level[0].forward; zslFreeNode(node); node = next; } @@ -84,7 +71,7 @@ int zslRandomLevel(void) { return (levellevel-1; i >= 0; i--) { /* store rank that is crossed to reach the insert position */ rank[i] = i == (zsl->level-1) ? 0 : rank[i+1]; - - while (x->forward[i] && - (x->forward[i]->score < score || - (x->forward[i]->score == score && - compareStringObjects(x->forward[i]->obj,obj) < 0))) { - rank[i] += i > 0 ? x->span[i-1] : 1; - x = x->forward[i]; + while (x->level[i].forward && + (x->level[i].forward->score < score || + (x->level[i].forward->score == score && + compareStringObjects(x->level[i].forward->obj,obj) < 0))) { + rank[i] += x->level[i].span; + x = x->level[i].forward; } update[i] = x; } @@ -112,56 +98,51 @@ void zslInsert(zskiplist *zsl, double score, robj *obj) { for (i = zsl->level; i < level; i++) { rank[i] = 0; update[i] = zsl->header; - update[i]->span[i-1] = zsl->length; + update[i]->level[i].span = zsl->length; } zsl->level = level; } x = zslCreateNode(level,score,obj); for (i = 0; i < level; i++) { - x->forward[i] = update[i]->forward[i]; - update[i]->forward[i] = x; + x->level[i].forward = update[i]->level[i].forward; + update[i]->level[i].forward = x; /* update span covered by update[i] as x is inserted here */ - if (i > 0) { - x->span[i-1] = update[i]->span[i-1] - (rank[0] - rank[i]); - update[i]->span[i-1] = (rank[0] - rank[i]) + 1; - } + x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]); + update[i]->level[i].span = (rank[0] - rank[i]) + 1; } /* increment span for untouched levels */ for (i = level; i < zsl->level; i++) { - update[i]->span[i-1]++; + update[i]->level[i].span++; } x->backward = (update[0] == zsl->header) ? NULL : update[0]; - if (x->forward[0]) - x->forward[0]->backward = x; + if (x->level[0].forward) + x->level[0].forward->backward = x; else zsl->tail = x; zsl->length++; + return x; } /* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */ void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) { int i; for (i = 0; i < zsl->level; i++) { - if (update[i]->forward[i] == x) { - if (i > 0) { - update[i]->span[i-1] += x->span[i-1] - 1; - } - update[i]->forward[i] = x->forward[i]; + if (update[i]->level[i].forward == x) { + update[i]->level[i].span += x->level[i].span - 1; + update[i]->level[i].forward = x->level[i].forward; } else { - /* invariant: i > 0, because update[0]->forward[0] - * is always equal to x */ - update[i]->span[i-1] -= 1; + update[i]->level[i].span -= 1; } } - if (x->forward[0]) { - x->forward[0]->backward = x->backward; + if (x->level[0].forward) { + x->level[0].forward->backward = x->backward; } else { zsl->tail = x->backward; } - while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL) + while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL) zsl->level--; zsl->length--; } @@ -173,16 +154,16 @@ int zslDelete(zskiplist *zsl, double score, robj *obj) { x = zsl->header; for (i = zsl->level-1; i >= 0; i--) { - while (x->forward[i] && - (x->forward[i]->score < score || - (x->forward[i]->score == score && - compareStringObjects(x->forward[i]->obj,obj) < 0))) - x = x->forward[i]; + while (x->level[i].forward && + (x->level[i].forward->score < score || + (x->level[i].forward->score == score && + compareStringObjects(x->level[i].forward->obj,obj) < 0))) + x = x->level[i].forward; update[i] = x; } /* We may have multiple elements with the same score, what we need * is to find the element with both the right score and object. */ - x = x->forward[0]; + x = x->level[0].forward; if (x && score == x->score && equalStringObjects(x->obj,obj)) { zslDeleteNode(zsl, x, update); zslFreeNode(x); @@ -193,33 +174,43 @@ int zslDelete(zskiplist *zsl, double score, robj *obj) { return 0; /* not found */ } +/* Struct to hold a inclusive/exclusive range spec. */ +typedef struct { + double min, max; + int minex, maxex; /* are min or max exclusive? */ +} zrangespec; + /* Delete all the elements with score between min and max from the skiplist. * Min and mx are inclusive, so a score >= min || score <= max is deleted. * Note that this function takes the reference to the hash table view of the * sorted set, in order to remove the elements from the hash table too. */ -unsigned long zslDeleteRangeByScore(zskiplist *zsl, double min, double max, dict *dict) { +unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec range, dict *dict) { zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x; unsigned long removed = 0; int i; x = zsl->header; for (i = zsl->level-1; i >= 0; i--) { - while (x->forward[i] && x->forward[i]->score < min) - x = x->forward[i]; + while (x->level[i].forward && (range.minex ? + x->level[i].forward->score <= range.min : + x->level[i].forward->score < range.min)) + x = x->level[i].forward; update[i] = x; } - /* We may have multiple elements with the same score, what we need - * is to find the element with both the right score and object. */ - x = x->forward[0]; - while (x && x->score <= max) { - zskiplistNode *next = x->forward[0]; - zslDeleteNode(zsl, x, update); + + /* Current node is the last with score < or <= min. */ + x = x->level[0].forward; + + /* Delete nodes while in range. */ + while (x && (range.maxex ? x->score < range.max : x->score <= range.max)) { + zskiplistNode *next = x->level[0].forward; + zslDeleteNode(zsl,x,update); dictDelete(dict,x->obj); zslFreeNode(x); removed++; x = next; } - return removed; /* not found */ + return removed; } /* Delete all the elements with rank between start and end from the skiplist. @@ -231,18 +222,18 @@ unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned x = zsl->header; for (i = zsl->level-1; i >= 0; i--) { - while (x->forward[i] && (traversed + (i > 0 ? x->span[i-1] : 1)) < start) { - traversed += i > 0 ? x->span[i-1] : 1; - x = x->forward[i]; + while (x->level[i].forward && (traversed + x->level[i].span) < start) { + traversed += x->level[i].span; + x = x->level[i].forward; } update[i] = x; } traversed++; - x = x->forward[0]; + x = x->level[0].forward; while (x && traversed <= end) { - zskiplistNode *next = x->forward[0]; - zslDeleteNode(zsl, x, update); + zskiplistNode *next = x->level[0].forward; + zslDeleteNode(zsl,x,update); dictDelete(dict,x->obj); zslFreeNode(x); removed++; @@ -260,12 +251,12 @@ zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) { x = zsl->header; for (i = zsl->level-1; i >= 0; i--) { - while (x->forward[i] && x->forward[i]->score < score) - x = x->forward[i]; + while (x->level[i].forward && x->level[i].forward->score < score) + x = x->level[i].forward; } /* We may have multiple elements with the same score, what we need * is to find the element with both the right score and object. */ - return x->forward[0]; + return x->level[0].forward; } /* Find the rank for an element by both score and key. @@ -279,12 +270,12 @@ unsigned long zslistTypeGetRank(zskiplist *zsl, double score, robj *o) { x = zsl->header; for (i = zsl->level-1; i >= 0; i--) { - while (x->forward[i] && - (x->forward[i]->score < score || - (x->forward[i]->score == score && - compareStringObjects(x->forward[i]->obj,o) <= 0))) { - rank += i > 0 ? x->span[i-1] : 1; - x = x->forward[i]; + while (x->level[i].forward && + (x->level[i].forward->score < score || + (x->level[i].forward->score == score && + compareStringObjects(x->level[i].forward->obj,o) <= 0))) { + rank += x->level[i].span; + x = x->level[i].forward; } /* x might be equal to zsl->header, so test if obj is non-NULL */ @@ -303,10 +294,10 @@ zskiplistNode* zslistTypeGetElementByRank(zskiplist *zsl, unsigned long rank) { x = zsl->header; for (i = zsl->level-1; i >= 0; i--) { - while (x->forward[i] && (traversed + (i>0 ? x->span[i-1] : 1)) <= rank) + while (x->level[i].forward && (traversed + x->level[i].span) <= rank) { - traversed += i > 0 ? x->span[i-1] : 1; - x = x->forward[i]; + traversed += x->level[i].span; + x = x->level[i].forward; } if (traversed == rank) { return x; @@ -315,22 +306,53 @@ zskiplistNode* zslistTypeGetElementByRank(zskiplist *zsl, unsigned long rank) { return NULL; } +/* Populate the rangespec according to the objects min and max. */ +static int zslParseRange(robj *min, robj *max, zrangespec *spec) { + char *eptr; + spec->minex = spec->maxex = 0; + + /* Parse the min-max interval. If one of the values is prefixed + * by the "(" character, it's considered "open". For instance + * ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max + * ZRANGEBYSCORE zset 1.5 2.5 will instead match min <= x <= max */ + if (min->encoding == REDIS_ENCODING_INT) { + spec->min = (long)min->ptr; + } else { + if (((char*)min->ptr)[0] == '(') { + spec->min = strtod((char*)min->ptr+1,&eptr); + if (eptr[0] != '\0' || isnan(spec->min)) return REDIS_ERR; + spec->minex = 1; + } else { + spec->min = strtod((char*)min->ptr,&eptr); + if (eptr[0] != '\0' || isnan(spec->min)) return REDIS_ERR; + } + } + if (max->encoding == REDIS_ENCODING_INT) { + spec->max = (long)max->ptr; + } else { + if (((char*)max->ptr)[0] == '(') { + spec->max = strtod((char*)max->ptr+1,&eptr); + if (eptr[0] != '\0' || isnan(spec->max)) return REDIS_ERR; + spec->maxex = 1; + } else { + spec->max = strtod((char*)max->ptr,&eptr); + if (eptr[0] != '\0' || isnan(spec->max)) return REDIS_ERR; + } + } + + return REDIS_OK; +} + + /*----------------------------------------------------------------------------- * Sorted set commands *----------------------------------------------------------------------------*/ -/* This generic command implements both ZADD and ZINCRBY. - * scoreval is the score if the operation is a ZADD (doincrement == 0) or - * the increment if the operation is a ZINCRBY (doincrement == 1). */ -void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scoreval, int doincrement) { +/* This generic command implements both ZADD and ZINCRBY. */ +void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double score, int incr) { robj *zsetobj; zset *zs; - double *score; - - if (isnan(scoreval)) { - addReplySds(c,sdsnew("-ERR provide score is Not A Number (nan)\r\n")); - return; - } + zskiplistNode *znode; zsetobj = lookupKeyWrite(c->db,key); if (zsetobj == NULL) { @@ -344,70 +366,72 @@ void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scoreval, i } zs = zsetobj->ptr; - /* Ok now since we implement both ZADD and ZINCRBY here the code - * needs to handle the two different conditions. It's all about setting - * '*score', that is, the new score to set, to the right value. */ - score = zmalloc(sizeof(double)); - if (doincrement) { - dictEntry *de; - + /* Since both ZADD and ZINCRBY are implemented here, we need to increment + * the score first by the current score if ZINCRBY is called. */ + if (incr) { /* Read the old score. If the element was not present starts from 0 */ - de = dictFind(zs->dict,ele); - if (de) { - double *oldscore = dictGetEntryVal(de); - *score = *oldscore + scoreval; - } else { - *score = scoreval; - } - if (isnan(*score)) { - addReplySds(c, - sdsnew("-ERR resulting score is Not A Number (nan)\r\n")); - zfree(score); + dictEntry *de = dictFind(zs->dict,ele); + if (de != NULL) + score += *(double*)dictGetEntryVal(de); + + if (isnan(score)) { + addReplyError(c,"resulting score is not a number (NaN)"); /* Note that we don't need to check if the zset may be empty and * should be removed here, as we can only obtain Nan as score if * there was already an element in the sorted set. */ return; } - } else { - *score = scoreval; } - /* What follows is a simple remove and re-insert operation that is common - * to both ZADD and ZINCRBY... */ - if (dictAdd(zs->dict,ele,score) == DICT_OK) { - /* case 1: New element */ + /* We need to remove and re-insert the element when it was already present + * in the dictionary, to update the skiplist. Note that we delay adding a + * pointer to the score because we want to reference the score in the + * skiplist node. */ + if (dictAdd(zs->dict,ele,NULL) == DICT_OK) { + dictEntry *de; + + /* New element */ incrRefCount(ele); /* added to hash */ - zslInsert(zs->zsl,*score,ele); + znode = zslInsert(zs->zsl,score,ele); incrRefCount(ele); /* added to skiplist */ + + /* Update the score in the dict entry */ + de = dictFind(zs->dict,ele); + redisAssert(de != NULL); + dictGetEntryVal(de) = &znode->score; + signalModifiedKey(c->db,c->argv[1]); server.dirty++; - if (doincrement) - addReplyDouble(c,*score); + if (incr) + addReplyDouble(c,score); else addReply(c,shared.cone); } else { dictEntry *de; - double *oldscore; + robj *curobj; + double *curscore; + int deleted; - /* case 2: Score update operation */ + /* Update score */ de = dictFind(zs->dict,ele); redisAssert(de != NULL); - oldscore = dictGetEntryVal(de); - if (*score != *oldscore) { - int deleted; + curobj = dictGetEntryKey(de); + curscore = dictGetEntryVal(de); - /* Remove and insert the element in the skip list with new score */ - deleted = zslDelete(zs->zsl,*oldscore,ele); + /* When the score is updated, reuse the existing string object to + * prevent extra alloc/dealloc of strings on ZINCRBY. */ + if (score != *curscore) { + deleted = zslDelete(zs->zsl,*curscore,curobj); redisAssert(deleted != 0); - zslInsert(zs->zsl,*score,ele); - incrRefCount(ele); - /* Update the score in the hash table */ - dictReplace(zs->dict,ele,score); + znode = zslInsert(zs->zsl,score,curobj); + incrRefCount(curobj); + + /* Update the score in the current dict entry */ + dictGetEntryVal(de) = &znode->score; + signalModifiedKey(c->db,c->argv[1]); server.dirty++; - } else { - zfree(score); } - if (doincrement) - addReplyDouble(c,*score); + if (incr) + addReplyDouble(c,score); else addReply(c,shared.czero); } @@ -415,15 +439,15 @@ void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scoreval, i void zaddCommand(redisClient *c) { double scoreval; - - if (getDoubleFromObjectOrReply(c, c->argv[2], &scoreval, NULL) != REDIS_OK) return; + if (getDoubleFromObjectOrReply(c,c->argv[2],&scoreval,NULL) != REDIS_OK) return; + c->argv[3] = tryObjectEncoding(c->argv[3]); zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,0); } void zincrbyCommand(redisClient *c) { double scoreval; - - if (getDoubleFromObjectOrReply(c, c->argv[2], &scoreval, NULL) != REDIS_OK) return; + if (getDoubleFromObjectOrReply(c,c->argv[2],&scoreval,NULL) != REDIS_OK) return; + c->argv[3] = tryObjectEncoding(c->argv[3]); zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,1); } @@ -431,48 +455,53 @@ void zremCommand(redisClient *c) { robj *zsetobj; zset *zs; dictEntry *de; - double *oldscore; + double curscore; int deleted; if ((zsetobj = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,zsetobj,REDIS_ZSET)) return; zs = zsetobj->ptr; + c->argv[2] = tryObjectEncoding(c->argv[2]); de = dictFind(zs->dict,c->argv[2]); if (de == NULL) { addReply(c,shared.czero); return; } /* Delete from the skiplist */ - oldscore = dictGetEntryVal(de); - deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]); + curscore = *(double*)dictGetEntryVal(de); + deleted = zslDelete(zs->zsl,curscore,c->argv[2]); redisAssert(deleted != 0); /* Delete from the hash table */ dictDelete(zs->dict,c->argv[2]); if (htNeedsResize(zs->dict)) dictResize(zs->dict); if (dictSize(zs->dict) == 0) dbDelete(c->db,c->argv[1]); + signalModifiedKey(c->db,c->argv[1]); server.dirty++; addReply(c,shared.cone); } void zremrangebyscoreCommand(redisClient *c) { - double min; - double max; + zrangespec range; long deleted; - robj *zsetobj; + robj *o; zset *zs; - if ((getDoubleFromObjectOrReply(c, c->argv[2], &min, NULL) != REDIS_OK) || - (getDoubleFromObjectOrReply(c, c->argv[3], &max, NULL) != REDIS_OK)) return; + /* Parse the range arguments. */ + if (zslParseRange(c->argv[2],c->argv[3],&range) != REDIS_OK) { + addReplyError(c,"min or max is not a double"); + return; + } - if ((zsetobj = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL || - checkType(c,zsetobj,REDIS_ZSET)) return; + if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL || + checkType(c,o,REDIS_ZSET)) return; - zs = zsetobj->ptr; - deleted = zslDeleteRangeByScore(zs->zsl,min,max,zs->dict); + zs = o->ptr; + deleted = zslDeleteRangeByScore(zs->zsl,range,zs->dict); if (htNeedsResize(zs->dict)) dictResize(zs->dict); if (dictSize(zs->dict) == 0) dbDelete(c->db,c->argv[1]); + if (deleted) signalModifiedKey(c->db,c->argv[1]); server.dirty += deleted; addReplyLongLong(c,deleted); } @@ -497,9 +526,9 @@ void zremrangebyrankCommand(redisClient *c) { if (start < 0) start = llen+start; if (end < 0) end = llen+end; if (start < 0) start = 0; - if (end < 0) end = 0; - /* indexes sanity checks */ + /* Invariant: start >= 0, so this test will be true when end < 0. + * The range is empty when start > end or start >= length. */ if (start > end || start >= llen) { addReply(c,shared.czero); return; @@ -511,6 +540,7 @@ void zremrangebyrankCommand(redisClient *c) { deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict); if (htNeedsResize(zs->dict)) dictResize(zs->dict); if (dictSize(zs->dict) == 0) dbDelete(c->db,c->argv[1]); + if (deleted) signalModifiedKey(c->db,c->argv[1]); server.dirty += deleted; addReplyLongLong(c, deleted); } @@ -536,6 +566,10 @@ int qsortCompareZsetopsrcByCardinality(const void *s1, const void *s2) { inline static void zunionInterAggregate(double *target, double val, int aggregate) { if (aggregate == REDIS_AGGR_SUM) { *target = *target + val; + /* The result of adding two doubles is NaN when one variable + * is +inf and the other is -inf. When these numbers are added, + * we maintain the convention of the result being 0.0. */ + if (isnan(*target)) *target = 0.0; } else if (aggregate == REDIS_AGGR_MIN) { *target = val < *target ? val : *target; } else if (aggregate == REDIS_AGGR_MAX) { @@ -552,13 +586,16 @@ void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { zsetopsrc *src; robj *dstobj; zset *dstzset; + zskiplistNode *znode; dictIterator *di; dictEntry *de; + int touched = 0; /* expect setnum input keys to be given */ setnum = atoi(c->argv[2]->ptr); if (setnum < 1) { - addReplySds(c,sdsnew("-ERR at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE\r\n")); + addReplyError(c, + "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE"); return; } @@ -598,8 +635,12 @@ void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { if (remaining >= (setnum + 1) && !strcasecmp(c->argv[j]->ptr,"weights")) { j++; remaining--; for (i = 0; i < setnum; i++, j++, remaining--) { - if (getDoubleFromObjectOrReply(c, c->argv[j], &src[i].weight, NULL) != REDIS_OK) + if (getDoubleFromObjectOrReply(c,c->argv[j],&src[i].weight, + "weight value is not a double") != REDIS_OK) + { + zfree(src); return; + } } } else if (remaining >= 2 && !strcasecmp(c->argv[j]->ptr,"aggregate")) { j++; remaining--; @@ -637,28 +678,26 @@ void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { * from small to large, all src[i > 0].dict are non-empty too */ di = dictGetIterator(src[0].dict); while((de = dictNext(di)) != NULL) { - double *score = zmalloc(sizeof(double)), value; - *score = src[0].weight * zunionInterDictValue(de); + double score, value; + score = src[0].weight * zunionInterDictValue(de); for (j = 1; j < setnum; j++) { dictEntry *other = dictFind(src[j].dict,dictGetEntryKey(de)); if (other) { value = src[j].weight * zunionInterDictValue(other); - zunionInterAggregate(score, value, aggregate); + zunionInterAggregate(&score,value,aggregate); } else { break; } } - /* skip entry when not present in every source dict */ - if (j != setnum) { - zfree(score); - } else { + /* Only continue when present in every source dict. */ + if (j == setnum) { robj *o = dictGetEntryKey(de); - dictAdd(dstzset->dict,o,score); - incrRefCount(o); /* added to dictionary */ - zslInsert(dstzset->zsl,*score,o); + znode = zslInsert(dstzset->zsl,score,o); incrRefCount(o); /* added to skiplist */ + dictAdd(dstzset->dict,o,&znode->score); + incrRefCount(o); /* added to dictionary */ } } dictReleaseIterator(di); @@ -669,11 +708,14 @@ void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { di = dictGetIterator(src[i].dict); while((de = dictNext(di)) != NULL) { + double score, value; + /* skip key when already processed */ - if (dictFind(dstzset->dict,dictGetEntryKey(de)) != NULL) continue; + if (dictFind(dstzset->dict,dictGetEntryKey(de)) != NULL) + continue; - double *score = zmalloc(sizeof(double)), value; - *score = src[i].weight * zunionInterDictValue(de); + /* initialize score */ + score = src[i].weight * zunionInterDictValue(de); /* because the zsets are sorted by size, its only possible * for sets at larger indices to hold this entry */ @@ -681,15 +723,15 @@ void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { dictEntry *other = dictFind(src[j].dict,dictGetEntryKey(de)); if (other) { value = src[j].weight * zunionInterDictValue(other); - zunionInterAggregate(score, value, aggregate); + zunionInterAggregate(&score,value,aggregate); } } robj *o = dictGetEntryKey(de); - dictAdd(dstzset->dict,o,score); - incrRefCount(o); /* added to dictionary */ - zslInsert(dstzset->zsl,*score,o); + znode = zslInsert(dstzset->zsl,score,o); incrRefCount(o); /* added to skiplist */ + dictAdd(dstzset->dict,o,&znode->score); + incrRefCount(o); /* added to dictionary */ } dictReleaseIterator(di); } @@ -698,10 +740,15 @@ void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { redisAssert(op == REDIS_OP_INTER || op == REDIS_OP_UNION); } - dbDelete(c->db,dstkey); + if (dbDelete(c->db,dstkey)) { + signalModifiedKey(c->db,dstkey); + touched = 1; + server.dirty++; + } if (dstzset->zsl->length) { dbAdd(c->db,dstkey,dstobj); addReplyLongLong(c, dstzset->zsl->length); + if (!touched) signalModifiedKey(c->db,dstkey); server.dirty++; } else { decrRefCount(dstobj); @@ -750,11 +797,10 @@ void zrangeGenericCommand(redisClient *c, int reverse) { if (start < 0) start = llen+start; if (end < 0) end = llen+end; if (start < 0) start = 0; - if (end < 0) end = 0; - /* indexes sanity checks */ + /* Invariant: start >= 0, so this test will be true when end < 0. + * The range is empty when start > end or start >= length. */ if (start > end || start >= llen) { - /* Out of range start or start > end result in empty list */ addReply(c,shared.emptymultibulk); return; } @@ -767,18 +813,17 @@ void zrangeGenericCommand(redisClient *c, int reverse) { ln = start == 0 ? zsl->tail : zslistTypeGetElementByRank(zsl, llen-start); } else { ln = start == 0 ? - zsl->header->forward[0] : zslistTypeGetElementByRank(zsl, start+1); + zsl->header->level[0].forward : zslistTypeGetElementByRank(zsl, start+1); } /* Return the result in form of a multi-bulk reply */ - addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n", - withscores ? (rangelen*2) : rangelen)); + addReplyMultiBulkLen(c,withscores ? (rangelen*2) : rangelen); for (j = 0; j < rangelen; j++) { ele = ln->obj; addReplyBulk(c,ele); if (withscores) addReplyDouble(c,ln->score); - ln = reverse ? ln->backward : ln->forward[0]; + ln = reverse ? ln->backward : ln->level[0].forward; } } @@ -790,128 +835,156 @@ void zrevrangeCommand(redisClient *c) { zrangeGenericCommand(c,1); } -/* This command implements both ZRANGEBYSCORE and ZCOUNT. - * If justcount is non-zero, just the count is returned. */ -void genericZrangebyscoreCommand(redisClient *c, int justcount) { - robj *o; - double min, max; - int minex = 0, maxex = 0; /* are min or max exclusive? */ +/* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE and ZCOUNT. + * If "justcount", only the number of elements in the range is returned. */ +void genericZrangebyscoreCommand(redisClient *c, int reverse, int justcount) { + zrangespec range; + robj *o, *emptyreply; + zset *zsetobj; + zskiplist *zsl; + zskiplistNode *ln; int offset = 0, limit = -1; int withscores = 0; - int badsyntax = 0; + unsigned long rangelen = 0; + void *replylen = NULL; - /* Parse the min-max interval. If one of the values is prefixed - * by the "(" character, it's considered "open". For instance - * ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max - * ZRANGEBYSCORE zset 1.5 2.5 will instead match min <= x <= max */ - if (((char*)c->argv[2]->ptr)[0] == '(') { - min = strtod((char*)c->argv[2]->ptr+1,NULL); - minex = 1; - } else { - min = strtod(c->argv[2]->ptr,NULL); - } - if (((char*)c->argv[3]->ptr)[0] == '(') { - max = strtod((char*)c->argv[3]->ptr+1,NULL); - maxex = 1; - } else { - max = strtod(c->argv[3]->ptr,NULL); - } - - /* Parse "WITHSCORES": note that if the command was called with - * the name ZCOUNT then we are sure that c->argc == 4, so we'll never - * enter the following paths to parse WITHSCORES and LIMIT. */ - if (c->argc == 5 || c->argc == 8) { - if (strcasecmp(c->argv[c->argc-1]->ptr,"withscores") == 0) - withscores = 1; - else - badsyntax = 1; - } - if (c->argc != (4 + withscores) && c->argc != (7 + withscores)) - badsyntax = 1; - if (badsyntax) { - addReplySds(c, - sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n")); + /* Parse the range arguments. */ + if (zslParseRange(c->argv[2],c->argv[3],&range) != REDIS_OK) { + addReplyError(c,"min or max is not a double"); return; } - /* Parse "LIMIT" */ - if (c->argc == (7 + withscores) && strcasecmp(c->argv[4]->ptr,"limit")) { - addReply(c,shared.syntaxerr); - return; - } else if (c->argc == (7 + withscores)) { - offset = atoi(c->argv[5]->ptr); - limit = atoi(c->argv[6]->ptr); - if (offset < 0) offset = 0; + /* Parse optional extra arguments. Note that ZCOUNT will exactly have + * 4 arguments, so we'll never enter the following code path. */ + if (c->argc > 4) { + int remaining = c->argc - 4; + int pos = 4; + + while (remaining) { + if (remaining >= 1 && !strcasecmp(c->argv[pos]->ptr,"withscores")) { + pos++; remaining--; + withscores = 1; + } else if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr,"limit")) { + offset = atoi(c->argv[pos+1]->ptr); + limit = atoi(c->argv[pos+2]->ptr); + pos += 3; remaining -= 3; + } else { + addReply(c,shared.syntaxerr); + return; + } + } } /* Ok, lookup the key and get the range */ - o = lookupKeyRead(c->db,c->argv[1]); - if (o == NULL) { - addReply(c,justcount ? shared.czero : shared.emptymultibulk); - } else { - if (o->type != REDIS_ZSET) { - addReply(c,shared.wrongtypeerr); + emptyreply = justcount ? shared.czero : shared.emptymultibulk; + if ((o = lookupKeyReadOrReply(c,c->argv[1],emptyreply)) == NULL || + checkType(c,o,REDIS_ZSET)) return; + zsetobj = o->ptr; + zsl = zsetobj->zsl; + + /* If reversed, assume the elements are sorted from high to low score. */ + ln = zslFirstWithScore(zsl,range.min); + if (reverse) { + /* If range.min is out of range, ln will be NULL and we need to use + * the tail of the skiplist as first node of the range. */ + if (ln == NULL) ln = zsl->tail; + + /* zslFirstWithScore returns the first element with where with + * score >= range.min, so backtrack to make sure the element we use + * here has score <= range.min. */ + while (ln && ln->score > range.min) ln = ln->backward; + + /* Move to the right element according to the range spec. */ + if (range.minex) { + /* Find last element with score < range.min */ + while (ln && ln->score == range.min) ln = ln->backward; } else { - zset *zsetobj = o->ptr; - zskiplist *zsl = zsetobj->zsl; - zskiplistNode *ln; - robj *ele, *lenobj = NULL; - unsigned long rangelen = 0; - - /* Get the first node with the score >= min, or with - * score > min if 'minex' is true. */ - ln = zslFirstWithScore(zsl,min); - while (minex && ln && ln->score == min) ln = ln->forward[0]; - - if (ln == NULL) { - /* No element matching the speciifed interval */ - addReply(c,justcount ? shared.czero : shared.emptymultibulk); - return; - } + /* Find last element with score <= range.min */ + while (ln && ln->level[0].forward && + ln->level[0].forward->score == range.min) + ln = ln->level[0].forward; + } + } else { + if (range.minex) { + /* Find first element with score > range.min */ + while (ln && ln->score == range.min) ln = ln->level[0].forward; + } + } - /* We don't know in advance how many matching elements there - * are in the list, so we push this object that will represent - * the multi-bulk length in the output buffer, and will "fix" - * it later */ - if (!justcount) { - lenobj = createObject(REDIS_STRING,NULL); - addReply(c,lenobj); - decrRefCount(lenobj); - } + /* No "first" element in the specified interval. */ + if (ln == NULL) { + addReply(c,emptyreply); + return; + } - while(ln && (maxex ? (ln->score < max) : (ln->score <= max))) { - if (offset) { - offset--; - ln = ln->forward[0]; - continue; - } - if (limit == 0) break; - if (!justcount) { - ele = ln->obj; - addReplyBulk(c,ele); - if (withscores) - addReplyDouble(c,ln->score); - } - ln = ln->forward[0]; - rangelen++; - if (limit > 0) limit--; + /* We don't know in advance how many matching elements there + * are in the list, so we push this object that will represent + * the multi-bulk length in the output buffer, and will "fix" + * it later */ + if (!justcount) + replylen = addDeferredMultiBulkLength(c); + + /* If there is an offset, just traverse the number of elements without + * checking the score because that is done in the next loop. */ + while(ln && offset--) { + if (reverse) + ln = ln->backward; + else + ln = ln->level[0].forward; + } + + while (ln && limit--) { + /* Check if this this element is in range. */ + if (reverse) { + if (range.maxex) { + /* Element should have score > range.max */ + if (ln->score <= range.max) break; + } else { + /* Element should have score >= range.max */ + if (ln->score < range.max) break; } - if (justcount) { - addReplyLongLong(c,(long)rangelen); + } else { + if (range.maxex) { + /* Element should have score < range.max */ + if (ln->score >= range.max) break; } else { - lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n", - withscores ? (rangelen*2) : rangelen); + /* Element should have score <= range.max */ + if (ln->score > range.max) break; } } + + /* Do our magic */ + rangelen++; + if (!justcount) { + addReplyBulk(c,ln->obj); + if (withscores) + addReplyDouble(c,ln->score); + } + + if (reverse) + ln = ln->backward; + else + ln = ln->level[0].forward; + } + + if (justcount) { + addReplyLongLong(c,(long)rangelen); + } else { + setDeferredMultiBulkLength(c,replylen, + withscores ? (rangelen*2) : rangelen); } } void zrangebyscoreCommand(redisClient *c) { - genericZrangebyscoreCommand(c,0); + genericZrangebyscoreCommand(c,0,0); +} + +void zrevrangebyscoreCommand(redisClient *c) { + genericZrangebyscoreCommand(c,1,0); } void zcountCommand(redisClient *c) { - genericZrangebyscoreCommand(c,1); + genericZrangebyscoreCommand(c,0,1); } void zcardCommand(redisClient *c) { @@ -922,7 +995,7 @@ void zcardCommand(redisClient *c) { checkType(c,o,REDIS_ZSET)) return; zs = o->ptr; - addReplyUlong(c,zs->zsl->length); + addReplyLongLong(c,zs->zsl->length); } void zscoreCommand(redisClient *c) { @@ -934,6 +1007,7 @@ void zscoreCommand(redisClient *c) { checkType(c,o,REDIS_ZSET)) return; zs = o->ptr; + c->argv[2] = tryObjectEncoding(c->argv[2]); de = dictFind(zs->dict,c->argv[2]); if (!de) { addReply(c,shared.nullbulk); @@ -957,6 +1031,7 @@ void zrankGenericCommand(redisClient *c, int reverse) { zs = o->ptr; zsl = zs->zsl; + c->argv[2] = tryObjectEncoding(c->argv[2]); de = dictFind(zs->dict,c->argv[2]); if (!de) { addReply(c,shared.nullbulk);