return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
-void zslInsert(zskiplist *zsl, double score, robj *obj) {
+zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
else
zsl->tail = x;
zsl->length++;
+ return x;
}
/* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
x = x->level[0].forward;
while (x && x->score <= max) {
zskiplistNode *next = x->level[0].forward;
- zslDeleteNode(zsl, x, update);
+ zslDeleteNode(zsl,x,update);
dictDelete(dict,x->obj);
zslFreeNode(x);
removed++;
x = x->level[0].forward;
while (x && traversed <= end) {
zskiplistNode *next = x->level[0].forward;
- zslDeleteNode(zsl, x, update);
+ zslDeleteNode(zsl,x,update);
dictDelete(dict,x->obj);
zslFreeNode(x);
removed++;
* Sorted set commands
*----------------------------------------------------------------------------*/
-/* This generic command implements both ZADD and ZINCRBY.
- * scoreval is the score if the operation is a ZADD (doincrement == 0) or
- * the increment if the operation is a ZINCRBY (doincrement == 1). */
-void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scoreval, int doincrement) {
+/* This generic command implements both ZADD and ZINCRBY. */
+void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double score, int incr) {
robj *zsetobj;
zset *zs;
- double *score;
+ zskiplistNode *znode;
zsetobj = lookupKeyWrite(c->db,key);
if (zsetobj == NULL) {
}
zs = zsetobj->ptr;
- /* Ok now since we implement both ZADD and ZINCRBY here the code
- * needs to handle the two different conditions. It's all about setting
- * '*score', that is, the new score to set, to the right value. */
- score = zmalloc(sizeof(double));
- if (doincrement) {
- dictEntry *de;
-
+ /* Both ZADD and ZINCRBY are implemented here: for ZINCRBY, 'score' holds the
+ * increment, so add the element's current score (if any) to it first. */
+ if (incr) {
/* Read the old score. If the element was not present starts from 0 */
- de = dictFind(zs->dict,ele);
- if (de) {
- double *oldscore = dictGetEntryVal(de);
- *score = *oldscore + scoreval;
- } else {
- *score = scoreval;
- }
- if (isnan(*score)) {
- addReplySds(c,
- sdsnew("-ERR resulting score is not a number (NaN)\r\n"));
- zfree(score);
+ dictEntry *de = dictFind(zs->dict,ele);
+ if (de != NULL)
+ score += *(double*)dictGetEntryVal(de);
+
+ if (isnan(score)) {
+ addReplyError(c,"resulting score is not a number (NaN)");
/* Note that we don't need to check if the zset may be empty and
* should be removed here, as we can only obtain Nan as score if
* there was already an element in the sorted set. */
return;
}
- } else {
- *score = scoreval;
}
- /* What follows is a simple remove and re-insert operation that is common
- * to both ZADD and ZINCRBY... */
- if (dictAdd(zs->dict,ele,score) == DICT_OK) {
- /* case 1: New element */
+ /* We need to remove and re-insert the element when it was already present
+ * in the dictionary, to update the skiplist. Note that we delay adding a
+ * pointer to the score because we want to reference the score in the
+ * skiplist node. */
+ if (dictAdd(zs->dict,ele,NULL) == DICT_OK) {
+ dictEntry *de;
+
+ /* New element */
incrRefCount(ele); /* added to hash */
- zslInsert(zs->zsl,*score,ele);
+ znode = zslInsert(zs->zsl,score,ele);
incrRefCount(ele); /* added to skiplist */
+
+ /* Update the score in the dict entry */
+ de = dictFind(zs->dict,ele);
+ redisAssert(de != NULL);
+ dictGetEntryVal(de) = &znode->score;
touchWatchedKey(c->db,c->argv[1]);
server.dirty++;
- if (doincrement)
- addReplyDouble(c,*score);
+ if (incr)
+ addReplyDouble(c,score);
else
addReply(c,shared.cone);
} else {
dictEntry *de;
- double *oldscore;
+ robj *curobj;
+ double *curscore;
+ int deleted;
- /* case 2: Score update operation */
+ /* Update score */
de = dictFind(zs->dict,ele);
redisAssert(de != NULL);
- oldscore = dictGetEntryVal(de);
- if (*score != *oldscore) {
- int deleted;
+ curobj = dictGetEntryKey(de);
+ curscore = dictGetEntryVal(de);
- /* Remove and insert the element in the skip list with new score */
- deleted = zslDelete(zs->zsl,*oldscore,ele);
+ /* When the score is updated, reuse the existing string object to
+ * prevent extra alloc/dealloc of strings on ZINCRBY. */
+ if (score != *curscore) {
+ deleted = zslDelete(zs->zsl,*curscore,curobj);
redisAssert(deleted != 0);
- zslInsert(zs->zsl,*score,ele);
- incrRefCount(ele);
- /* Update the score in the hash table */
- dictReplace(zs->dict,ele,score);
+ znode = zslInsert(zs->zsl,score,curobj);
+ incrRefCount(curobj);
+
+ /* Update the score in the current dict entry */
+ dictGetEntryVal(de) = &znode->score;
touchWatchedKey(c->db,c->argv[1]);
server.dirty++;
- } else {
- zfree(score);
}
- if (doincrement)
- addReplyDouble(c,*score);
+ if (incr)
+ addReplyDouble(c,score);
else
addReply(c,shared.czero);
}
robj *zsetobj;
zset *zs;
dictEntry *de;
- double *oldscore;
+ double curscore;
int deleted;
if ((zsetobj = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL ||
return;
}
/* Delete from the skiplist */
- oldscore = dictGetEntryVal(de);
- deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
+ curscore = *(double*)dictGetEntryVal(de);
+ deleted = zslDelete(zs->zsl,curscore,c->argv[2]);
redisAssert(deleted != 0);
/* Delete from the hash table */
zsetopsrc *src;
robj *dstobj;
zset *dstzset;
+ zskiplistNode *znode;
dictIterator *di;
dictEntry *de;
int touched = 0;
/* expect setnum input keys to be given */
setnum = atoi(c->argv[2]->ptr);
if (setnum < 1) {
- addReplySds(c,sdsnew("-ERR at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE\r\n"));
+ addReplyError(c,
+ "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE");
return;
}
* from small to large, all src[i > 0].dict are non-empty too */
di = dictGetIterator(src[0].dict);
while((de = dictNext(di)) != NULL) {
- double *score = zmalloc(sizeof(double)), value;
- *score = src[0].weight * zunionInterDictValue(de);
+ double score, value;
+ score = src[0].weight * zunionInterDictValue(de);
for (j = 1; j < setnum; j++) {
dictEntry *other = dictFind(src[j].dict,dictGetEntryKey(de));
if (other) {
value = src[j].weight * zunionInterDictValue(other);
- zunionInterAggregate(score, value, aggregate);
+ zunionInterAggregate(&score, value, aggregate);
} else {
break;
}
}
- /* skip entry when not present in every source dict */
- if (j != setnum) {
- zfree(score);
- } else {
+ /* accept entry only when present in every source dict */
+ if (j == setnum) {
robj *o = dictGetEntryKey(de);
- dictAdd(dstzset->dict,o,score);
- incrRefCount(o); /* added to dictionary */
- zslInsert(dstzset->zsl,*score,o);
+ znode = zslInsert(dstzset->zsl,score,o);
incrRefCount(o); /* added to skiplist */
+ dictAdd(dstzset->dict,o,&znode->score);
+ incrRefCount(o); /* added to dictionary */
}
}
dictReleaseIterator(di);
di = dictGetIterator(src[i].dict);
while((de = dictNext(di)) != NULL) {
- /* skip key when already processed */
- if (dictFind(dstzset->dict,dictGetEntryKey(de)) != NULL) continue;
+ double score, value;
- double *score = zmalloc(sizeof(double)), value;
- *score = src[i].weight * zunionInterDictValue(de);
+ /* skip key when already processed */
+ if (dictFind(dstzset->dict,dictGetEntryKey(de)) != NULL)
+ continue;
+ score = src[i].weight * zunionInterDictValue(de);
/* because the zsets are sorted by size, its only possible
* for sets at larger indices to hold this entry */
dictEntry *other = dictFind(src[j].dict,dictGetEntryKey(de));
if (other) {
value = src[j].weight * zunionInterDictValue(other);
- zunionInterAggregate(score, value, aggregate);
+ zunionInterAggregate(&score, value, aggregate);
}
}
robj *o = dictGetEntryKey(de);
- dictAdd(dstzset->dict,o,score);
- incrRefCount(o); /* added to dictionary */
- zslInsert(dstzset->zsl,*score,o);
+ znode = zslInsert(dstzset->zsl,score,o);
incrRefCount(o); /* added to skiplist */
+ dictAdd(dstzset->dict,o,&znode->score);
+ incrRefCount(o); /* added to dictionary */
}
dictReleaseIterator(di);
}
}
/* Return the result in form of a multi-bulk reply */
- addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",
- withscores ? (rangelen*2) : rangelen));
+ addReplyMultiBulkLen(c,withscores ? (rangelen*2) : rangelen);
for (j = 0; j < rangelen; j++) {
ele = ln->obj;
addReplyBulk(c,ele);
if (c->argc != (4 + withscores) && c->argc != (7 + withscores))
badsyntax = 1;
if (badsyntax) {
- addReplySds(c,
- sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n"));
+ addReplyError(c,"wrong number of arguments for ZRANGEBYSCORE");
return;
}
zset *zsetobj = o->ptr;
zskiplist *zsl = zsetobj->zsl;
zskiplistNode *ln;
- robj *ele, *lenobj = NULL;
+ robj *ele;
+ void *replylen = NULL;
unsigned long rangelen = 0;
/* Get the first node with the score >= min, or with
* are in the list, so we push this object that will represent
* the multi-bulk length in the output buffer, and will "fix"
* it later */
- if (!justcount) {
- lenobj = createObject(REDIS_STRING,NULL);
- addReply(c,lenobj);
- decrRefCount(lenobj);
- }
+ if (!justcount)
+ replylen = addDeferredMultiBulkLength(c);
while(ln && (maxex ? (ln->score < max) : (ln->score <= max))) {
if (offset) {
if (justcount) {
addReplyLongLong(c,(long)rangelen);
} else {
- lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",
+ setDeferredMultiBulkLength(c,replylen,
withscores ? (rangelen*2) : rangelen);
}
}
checkType(c,o,REDIS_ZSET)) return;
zs = o->ptr;
- addReplyUlong(c,zs->zsl->length);
+ addReplyLongLong(c,zs->zsl->length);
}
void zscoreCommand(redisClient *c) {