return cmp;
}
+/* Return the number of members held by a ziplist-encoded sorted set.
+ * The ziplist entry count is halved because each member is stored as an
+ * element entry followed by a score entry. */
+unsigned int zzlLength(robj *zobj) {
+    return ziplistLen((unsigned char *)zobj->ptr) / 2;
+}
+
+/* Advance the (element, score) cursor to the following pair in the ziplist.
+ * *eptr and *sptr must both be non-NULL on entry. When the current pair is
+ * the last one, both are set to NULL. */
+void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
+    unsigned char *nextele, *nextscore = NULL;
+
+    redisAssert(*eptr != NULL && *sptr != NULL);
+
+    /* Whatever follows the current score is the next element, if any. */
+    nextele = ziplistNext(zl,*sptr);
+    if (nextele != NULL) {
+        /* An element must be followed by its score entry. */
+        nextscore = ziplistNext(zl,nextele);
+        redisAssert(nextscore != NULL);
+    }
+
+    *eptr = nextele;
+    *sptr = nextscore;
+}
+
+/* Step the (element, score) cursor back to the preceding pair in the
+ * ziplist. *eptr and *sptr must both be non-NULL on entry. When the current
+ * pair is the first one, both are set to NULL. */
+void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
+    unsigned char *prevele = NULL, *prevscore;
+
+    redisAssert(*eptr != NULL && *sptr != NULL);
+
+    /* Whatever precedes the current element is the previous score, if any. */
+    prevscore = ziplistPrev(zl,*eptr);
+    if (prevscore != NULL) {
+        /* A score must be preceded by its element entry. */
+        prevele = ziplistPrev(zl,prevscore);
+        redisAssert(prevele != NULL);
+    }
+
+    *eptr = prevele;
+    *sptr = prevscore;
+}
+
+/* Return 1 when the score span of the ziplist may overlap the given range,
+ * 0 when it certainly does not. Intended for internal use by
+ * zzlFirstInRange and zzlLastInRange only. */
+int zzlIsInRange(unsigned char *zl, zrangespec *range) {
+    unsigned char *scoreptr;
+    double score;
+    int always_empty;
+
+    /* Ranges that can never contain anything. */
+    always_empty = range->min > range->max ||
+        (range->min == range->max && (range->minex || range->maxex));
+    if (always_empty) return 0;
+
+    /* The last score has to reach the lower bound. */
+    scoreptr = ziplistIndex(zl,-1);
+    redisAssert(scoreptr != NULL);
+    score = zzlGetScore(scoreptr);
+    if (!zslValueGteMin(score,range)) return 0;
+
+    /* The first score has to stay within the upper bound. */
+    scoreptr = ziplistIndex(zl,1);
+    redisAssert(scoreptr != NULL);
+    score = zzlGetScore(scoreptr);
+    if (!zslValueLteMax(score,range)) return 0;
+
+    return 1;
+}
+
+/* Find pointer to the first element contained in the specified range.
+ * Returns NULL when no element is contained in the range.
+ *
+ * The list is scanned from the head. The first element whose score
+ * satisfies the lower bound is the only possible answer, so it is also
+ * checked against the upper bound: when it exceeds it, the range falls in a
+ * gap between stored scores and NULL is returned. */
+unsigned char *zzlFirstInRange(robj *zobj, zrangespec range) {
+    unsigned char *zl = zobj->ptr;
+    unsigned char *eptr, *sptr;
+    double score;
+
+    /* If everything is out of range, return early. */
+    if (!zzlIsInRange(zl,&range)) return NULL;
+
+    eptr = ziplistIndex(zl,0);
+    while (eptr != NULL) {
+        sptr = ziplistNext(zl,eptr);
+        redisAssert(sptr != NULL);
+
+        score = zzlGetScore(sptr);
+        if (zslValueGteMin(score,&range)) {
+            /* Only a hit when the score is also <= max. */
+            if (zslValueLteMax(score,&range))
+                return eptr;
+            return NULL;
+        }
+
+        /* Move to next element. */
+        eptr = ziplistNext(zl,sptr);
+    }
+
+    return NULL;
+}
+
+/* Find pointer to the last element contained in the specified range.
+ * Returns NULL when no element is contained in the range.
+ *
+ * The list is scanned backwards from the tail. The first element whose
+ * score satisfies the upper bound is the only possible answer, so it is
+ * also checked against the lower bound: when it is below it, the range
+ * falls in a gap between stored scores and NULL is returned. */
+unsigned char *zzlLastInRange(robj *zobj, zrangespec range) {
+    unsigned char *zl = zobj->ptr;
+    unsigned char *eptr, *sptr;
+    double score;
+
+    /* If everything is out of range, return early. */
+    if (!zzlIsInRange(zl,&range)) return NULL;
+
+    /* Index -2 is the element entry of the last (element, score) pair. */
+    eptr = ziplistIndex(zl,-2);
+    while (eptr != NULL) {
+        sptr = ziplistNext(zl,eptr);
+        redisAssert(sptr != NULL);
+
+        score = zzlGetScore(sptr);
+        if (zslValueLteMax(score,&range)) {
+            /* Only a hit when the score is also >= min. */
+            if (zslValueGteMin(score,&range))
+                return eptr;
+            return NULL;
+        }
+
+        /* Move to previous element by moving to the score of previous
+         * element. When this returns NULL, we know there also is no
+         * element. The assignment is kept outside the assertion so the
+         * traversal does not depend on the assert expression being
+         * evaluated. */
+        sptr = ziplistPrev(zl,eptr);
+        if (sptr != NULL) {
+            eptr = ziplistPrev(zl,sptr);
+            redisAssert(eptr != NULL);
+        } else {
+            eptr = NULL;
+        }
+    }
+
+    return NULL;
+}
+
unsigned char *zzlFind(robj *zobj, robj *ele, double *score) {
unsigned char *zl = zobj->ptr;
unsigned char *eptr = ziplistIndex(zl,0), *sptr;
if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr))) {
/* Matching element, pull out score. */
- *score = zzlGetScore(sptr);
+ if (score != NULL) *score = zzlGetScore(sptr);
decrRefCount(ele);
return eptr;
}
unsigned char *zl = zobj->ptr;
unsigned char *eptr = ziplistIndex(zl,0), *sptr;
double s;
- int insert = 0;
ele = getDecodedObject(ele);
while (eptr != NULL) {
/* First element with score larger than score for element to be
* inserted. This means we should take its spot in the list to
* maintain ordering. */
- insert = 1;
- } else if (s == score) {
- /* Ensure lexicographical ordering for elements. */
- if (zzlCompareElements(eptr,ele->ptr,sdslen(ele->ptr)) < 0)
- insert = 1;
- }
-
- if (insert) {
zzlInsertAt(zobj,ele,score,eptr);
break;
+ } else if (s == score) {
+ /* Ensure lexicographical ordering for elements. */
+ if (zzlCompareElements(eptr,ele->ptr,sdslen(ele->ptr)) > 0) {
+ zzlInsertAt(zobj,ele,score,eptr);
+ break;
+ }
}
/* Move to next element. */
}
/* Push on tail of list when it was not yet inserted. */
- if (!insert)
- zzlInsertAt(zobj,ele,score,eptr);
+ if (eptr == NULL)
+ zzlInsertAt(zobj,ele,score,NULL);
decrRefCount(ele);
return REDIS_OK;
}
+/* Delete every element whose score lies inside the given range from a
+ * ziplist-encoded sorted set. Returns the number of elements removed.
+ *
+ * ziplistDelete hands back the list pointer to use after each deletion, so
+ * the final pointer is stored in zobj->ptr before returning; otherwise the
+ * object could keep referring to a stale list. */
+unsigned long zzlDeleteRangeByScore(robj *zobj, zrangespec range) {
+    unsigned char *zl = zobj->ptr;
+    unsigned char *eptr, *sptr;
+    double score;
+    unsigned long deleted = 0;
+
+    eptr = zzlFirstInRange(zobj,range);
+    if (eptr == NULL) return deleted;
+
+    /* When the tail of the ziplist is deleted, eptr will point to the sentinel
+     * byte and ziplistNext will return NULL. */
+    while ((sptr = ziplistNext(zl,eptr)) != NULL) {
+        score = zzlGetScore(sptr);
+
+        /* No longer in range. */
+        if (!zslValueLteMax(score,&range)) break;
+
+        /* Delete both the element and the score. After the first call eptr
+         * refers to the entry that followed the element, i.e. its score. */
+        zl = ziplistDelete(zl,&eptr);
+        zl = ziplistDelete(zl,&eptr);
+        deleted++;
+    }
+
+    /* Publish the list pointer returned by the last deletion. */
+    zobj->ptr = zl;
+    return deleted;
+}
+
+/* Remove the elements ranked start..end from a ziplist-encoded sorted set
+ * and return how many were removed. Both bounds are inclusive and 1-based.
+ * Ranks are converted to ziplist entry offsets by doubling, since each
+ * member takes two entries. */
+unsigned long zzlDeleteRangeByRank(robj *zobj, unsigned int start, unsigned int end) {
+    unsigned int count = (end-start)+1;
+    unsigned int first_entry = 2*(start-1);
+
+    zobj->ptr = ziplistDeleteRange(zobj->ptr,first_entry,2*count);
+    return count;
+}
+
+/*-----------------------------------------------------------------------------
+ * Common sorted set API
+ *----------------------------------------------------------------------------*/
+
+/* Return the number of elements in a sorted set object, dispatching on its
+ * encoding. An unrecognized encoding triggers redisPanic. */
+int zsLength(robj *zobj) {
+    if (zobj->encoding == REDIS_ENCODING_ZIPLIST)
+        return zzlLength(zobj);
+
+    if (zobj->encoding == REDIS_ENCODING_RAW)
+        return ((zset*)zobj->ptr)->zsl->length;
+
+    redisPanic("Unknown sorted set encoding");
+    return -1;
+}
+
/*-----------------------------------------------------------------------------
* Sorted set commands
*----------------------------------------------------------------------------*/
/* This generic command implements both ZADD and ZINCRBY. */
-void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double score, int incr) {
+void zaddGenericCommand(redisClient *c, int incr) {
static char *nanerr = "resulting score is not a number (NaN)";
+ robj *key = c->argv[1];
+ robj *ele;
robj *zobj;
robj *curobj;
- double curscore = 0.0;
+ double score, curscore = 0.0;
+
+ if (getDoubleFromObjectOrReply(c,c->argv[2],&score,NULL) != REDIS_OK)
+ return;
zobj = lookupKeyWrite(c->db,key);
if (zobj == NULL) {
if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
unsigned char *eptr;
+ /* Prefer non-encoded element when dealing with ziplists. */
+ ele = c->argv[3];
if ((eptr = zzlFind(zobj,ele,&curscore)) != NULL) {
if (incr) {
score += curscore;
zskiplistNode *znode;
dictEntry *de;
+ ele = c->argv[3] = tryObjectEncoding(c->argv[3]);
de = dictFind(zs->dict,ele);
if (de != NULL) {
curobj = dictGetEntryKey(de);
}
+/* Thin wrapper: runs the generic add/increment command with incr == 0.
+ * Argument parsing now happens inside zaddGenericCommand. */
void zaddCommand(redisClient *c) {
- double scoreval;
- if (getDoubleFromObjectOrReply(c,c->argv[2],&scoreval,NULL) != REDIS_OK) return;
- c->argv[3] = tryObjectEncoding(c->argv[3]);
- zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,0);
+ zaddGenericCommand(c,0);
}
+/* Thin wrapper: runs the generic add/increment command with incr == 1.
+ * Argument parsing now happens inside zaddGenericCommand. */
void zincrbyCommand(redisClient *c) {
- double scoreval;
- if (getDoubleFromObjectOrReply(c,c->argv[2],&scoreval,NULL) != REDIS_OK) return;
- c->argv[3] = tryObjectEncoding(c->argv[3]);
- zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,1);
+ zaddGenericCommand(c,1);
}
+/* Remove the member given in argv[2] from the sorted set at argv[1].
+ * Replies shared.czero when the member is not present and shared.cone after
+ * a successful removal; the key is deleted from the db when the set becomes
+ * empty. Handles both the ziplist and the RAW (dict + skiplist) encodings.
+ * NOTE(review): the missing-key / wrong-type replies are produced by
+ * lookupKeyWriteOrReply and checkType, which are not visible here. */
void zremCommand(redisClient *c) {
- robj *zsetobj;
- zset *zs;
- dictEntry *de;
- double curscore;
- int deleted;
+ robj *key = c->argv[1];
+ robj *ele = c->argv[2];
+ robj *zobj;
- if ((zsetobj = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL ||
- checkType(c,zsetobj,REDIS_ZSET)) return;
+ if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
+ checkType(c,zobj,REDIS_ZSET)) return;
- zs = zsetobj->ptr;
- c->argv[2] = tryObjectEncoding(c->argv[2]);
- de = dictFind(zs->dict,c->argv[2]);
- if (de == NULL) {
- addReply(c,shared.czero);
- return;
+ if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
+ unsigned char *eptr;
+
+ /* The score is not needed here, hence the NULL score argument. */
+ if ((eptr = zzlFind(zobj,ele,NULL)) != NULL) {
+ redisAssert(zzlDelete(zobj,eptr) == REDIS_OK);
+ if (zzlLength(zobj) == 0) dbDelete(c->db,key);
+ } else {
+ addReply(c,shared.czero);
+ return;
+ }
+ } else if (zobj->encoding == REDIS_ENCODING_RAW) {
+ zset *zs = zobj->ptr;
+ dictEntry *de;
+ double score;
+
+ de = dictFind(zs->dict,ele);
+ if (de != NULL) {
+ /* Delete from the skiplist */
+ score = *(double*)dictGetEntryVal(de);
+ redisAssert(zslDelete(zs->zsl,score,ele));
+
+ /* Delete from the hash table */
+ dictDelete(zs->dict,ele);
+ if (htNeedsResize(zs->dict)) dictResize(zs->dict);
+ if (dictSize(zs->dict) == 0) dbDelete(c->db,key);
+ } else {
+ addReply(c,shared.czero);
+ return;
+ }
+ } else {
+ redisPanic("Unknown sorted set encoding");
}
- /* Delete from the skiplist */
- curscore = *(double*)dictGetEntryVal(de);
- deleted = zslDelete(zs->zsl,curscore,c->argv[2]);
- redisAssert(deleted != 0);
-
- /* Delete from the hash table */
- dictDelete(zs->dict,c->argv[2]);
- if (htNeedsResize(zs->dict)) dictResize(zs->dict);
- if (dictSize(zs->dict) == 0) dbDelete(c->db,c->argv[1]);
- signalModifiedKey(c->db,c->argv[1]);
+
+ /* Only reached when a member was actually removed. */
+ signalModifiedKey(c->db,key);
server.dirty++;
addReply(c,shared.cone);
}
+/* Remove all members of the sorted set at argv[1] whose score falls in the
+ * range parsed from argv[2] and argv[3], and reply with the number of
+ * members removed. For both encodings the key is deleted from the db once
+ * the set has become empty. */
void zremrangebyscoreCommand(redisClient *c) {
+ robj *key = c->argv[1];
+ robj *zobj;
zrangespec range;
- long deleted;
- robj *o;
- zset *zs;
+ unsigned long deleted;
/* Parse the range arguments. */
if (zslParseRange(c->argv[2],c->argv[3],&range) != REDIS_OK) {
return;
}
- if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL ||
- checkType(c,o,REDIS_ZSET)) return;
+ if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
+ checkType(c,zobj,REDIS_ZSET)) return;
- zs = o->ptr;
- deleted = zslDeleteRangeByScore(zs->zsl,range,zs->dict);
- if (htNeedsResize(zs->dict)) dictResize(zs->dict);
- if (dictSize(zs->dict) == 0) dbDelete(c->db,c->argv[1]);
- if (deleted) signalModifiedKey(c->db,c->argv[1]);
+ if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
+ deleted = zzlDeleteRangeByScore(zobj,range);
+ /* Drop the key when its last member is gone, as the RAW branch does. */
+ if (zzlLength(zobj) == 0) dbDelete(c->db,key);
+ } else if (zobj->encoding == REDIS_ENCODING_RAW) {
+ zset *zs = zobj->ptr;
+ deleted = zslDeleteRangeByScore(zs->zsl,range,zs->dict);
+ if (htNeedsResize(zs->dict)) dictResize(zs->dict);
+ if (dictSize(zs->dict) == 0) dbDelete(c->db,key);
+ } else {
+ redisPanic("Unknown sorted set encoding");
+ }
+
+ if (deleted) signalModifiedKey(c->db,key);
server.dirty += deleted;
addReplyLongLong(c,deleted);
}
void zremrangebyrankCommand(redisClient *c) {
+ robj *key = c->argv[1];
+ robj *zobj;
long start;
long end;
int llen;
- long deleted;
- robj *zsetobj;
- zset *zs;
+ unsigned long deleted;
if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
(getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
- if ((zsetobj = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL ||
- checkType(c,zsetobj,REDIS_ZSET)) return;
- zs = zsetobj->ptr;
- llen = zs->zsl->length;
+ if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
+ checkType(c,zobj,REDIS_ZSET)) return;
- /* convert negative indexes */
+ /* Sanitize indexes. */
+ llen = zsLength(zobj);
if (start < 0) start = llen+start;
if (end < 0) end = llen+end;
if (start < 0) start = 0;
}
if (end >= llen) end = llen-1;
- /* increment start and end because zsl*Rank functions
- * use 1-based rank */
- deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);
- if (htNeedsResize(zs->dict)) dictResize(zs->dict);
- if (dictSize(zs->dict) == 0) dbDelete(c->db,c->argv[1]);
- if (deleted) signalModifiedKey(c->db,c->argv[1]);
+ if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
+ /* Correct for 1-based rank. */
+ deleted = zzlDeleteRangeByRank(zobj,start+1,end+1);
+ } else if (zobj->encoding == REDIS_ENCODING_RAW) {
+ zset *zs = zobj->ptr;
+
+ /* Correct for 1-based rank. */
+ deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);
+ if (htNeedsResize(zs->dict)) dictResize(zs->dict);
+ if (dictSize(zs->dict) == 0) dbDelete(c->db,key);
+ } else {
+ redisPanic("Unknown sorted set encoding");
+ }
+
+ if (deleted) signalModifiedKey(c->db,key);
server.dirty += deleted;
- addReplyLongLong(c, deleted);
+ addReplyLongLong(c,deleted);
}
typedef struct {
}
void zrangeGenericCommand(redisClient *c, int reverse) {
- robj *o;
+ robj *key = c->argv[1];
+ robj *zobj;
+ int withscores = 0;
long start;
long end;
- int withscores = 0;
int llen;
- int rangelen, j;
- zset *zsetobj;
- zskiplist *zsl;
- zskiplistNode *ln;
- robj *ele;
+ int rangelen;
if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
(getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
return;
}
- if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
- || checkType(c,o,REDIS_ZSET)) return;
- zsetobj = o->ptr;
- zsl = zsetobj->zsl;
- llen = zsl->length;
+ if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL
+ || checkType(c,zobj,REDIS_ZSET)) return;
- /* convert negative indexes */
+ /* Sanitize indexes. */
+ llen = zsLength(zobj);
if (start < 0) start = llen+start;
if (end < 0) end = llen+end;
if (start < 0) start = 0;
if (end >= llen) end = llen-1;
rangelen = (end-start)+1;
- /* check if starting point is trivial, before searching
- * the element in log(N) time */
- if (reverse) {
- ln = start == 0 ? zsl->tail : zslGetElementByRank(zsl, llen-start);
- } else {
- ln = start == 0 ?
- zsl->header->level[0].forward : zslGetElementByRank(zsl, start+1);
- }
-
/* Return the result in form of a multi-bulk reply */
- addReplyMultiBulkLen(c,withscores ? (rangelen*2) : rangelen);
- for (j = 0; j < rangelen; j++) {
- ele = ln->obj;
- addReplyBulk(c,ele);
- if (withscores)
- addReplyDouble(c,ln->score);
- ln = reverse ? ln->backward : ln->level[0].forward;
+ addReplyMultiBulkLen(c, withscores ? (rangelen*2) : rangelen);
+
+ if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
+ unsigned char *zl = zobj->ptr;
+ unsigned char *eptr, *sptr;
+ unsigned char *vstr;
+ unsigned int vlen;
+ long long vlong;
+
+ if (reverse)
+ eptr = ziplistIndex(zl,-2-(2*start));
+ else
+ eptr = ziplistIndex(zl,2*start);
+
+ redisAssert(eptr != NULL);
+ sptr = ziplistNext(zl,eptr);
+
+ while (rangelen--) {
+ redisAssert(eptr != NULL && sptr != NULL);
+ redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
+ if (vstr == NULL)
+ addReplyBulkLongLong(c,vlong);
+ else
+ addReplyBulkCBuffer(c,vstr,vlen);
+
+ if (withscores)
+ addReplyDouble(c,zzlGetScore(sptr));
+
+ if (reverse)
+ zzlPrev(zl,&eptr,&sptr);
+ else
+ zzlNext(zl,&eptr,&sptr);
+ }
+
+ } else if (zobj->encoding == REDIS_ENCODING_RAW) {
+ zset *zs = zobj->ptr;
+ zskiplist *zsl = zs->zsl;
+ zskiplistNode *ln;
+ robj *ele;
+
+ /* Check if starting point is trivial, before doing log(N) lookup. */
+ if (reverse) {
+ ln = zsl->tail;
+ if (start > 0)
+ ln = zslGetElementByRank(zsl,llen-start);
+ } else {
+ ln = zsl->header->level[0].forward;
+ if (start > 0)
+ ln = zslGetElementByRank(zsl,start+1);
+ }
+
+ while(rangelen--) {
+ redisAssert(ln != NULL);
+ ele = ln->obj;
+ addReplyBulk(c,ele);
+ if (withscores)
+ addReplyDouble(c,ln->score);
+ ln = reverse ? ln->backward : ln->level[0].forward;
+ }
+ } else {
+ redisPanic("Unknown sorted set encoding");
}
}
* If "justcount", only the number of elements in the range is returned. */
void genericZrangebyscoreCommand(redisClient *c, int reverse, int justcount) {
zrangespec range;
- robj *o, *emptyreply;
- zset *zsetobj;
- zskiplist *zsl;
- zskiplistNode *ln;
+ robj *key = c->argv[1];
+ robj *emptyreply, *zobj;
int offset = 0, limit = -1;
int withscores = 0;
unsigned long rangelen = 0;
/* Ok, lookup the key and get the range */
emptyreply = justcount ? shared.czero : shared.emptymultibulk;
- if ((o = lookupKeyReadOrReply(c,c->argv[1],emptyreply)) == NULL ||
- checkType(c,o,REDIS_ZSET)) return;
- zsetobj = o->ptr;
- zsl = zsetobj->zsl;
+ if ((zobj = lookupKeyReadOrReply(c,key,emptyreply)) == NULL ||
+ checkType(c,zobj,REDIS_ZSET)) return;
- /* If reversed, get the last node in range as starting point. */
- if (reverse) {
- ln = zslLastInRange(zsl,range);
- } else {
- ln = zslFirstInRange(zsl,range);
- }
+ if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
+ unsigned char *zl = zobj->ptr;
+ unsigned char *eptr, *sptr;
+ unsigned char *vstr;
+ unsigned int vlen;
+ long long vlong;
+ double score;
+
+ /* If reversed, get the last node in range as starting point. */
+ if (reverse)
+ eptr = zzlLastInRange(zobj,range);
+ else
+ eptr = zzlFirstInRange(zobj,range);
+
+ /* No "first" element in the specified interval. */
+ if (eptr == NULL) {
+ addReply(c,emptyreply);
+ return;
+ }
- /* No "first" element in the specified interval. */
- if (ln == NULL) {
- addReply(c,emptyreply);
- return;
- }
+ /* Get score pointer for the first element. */
+ redisAssert(eptr != NULL);
+ sptr = ziplistNext(zl,eptr);
- /* We don't know in advance how many matching elements there are in the
- * list, so we push this object that will represent the multi-bulk length
- * in the output buffer, and will "fix" it later */
- if (!justcount)
- replylen = addDeferredMultiBulkLength(c);
+ /* We don't know in advance how many matching elements there are in the
+ * list, so we push this object that will represent the multi-bulk
+ * length in the output buffer, and will "fix" it later */
+ if (!justcount)
+ replylen = addDeferredMultiBulkLength(c);
+
+ /* If there is an offset, just traverse the number of elements without
+ * checking the score because that is done in the next loop. */
+ while (eptr && offset--)
+ if (reverse)
+ zzlPrev(zl,&eptr,&sptr);
+ else
+ zzlNext(zl,&eptr,&sptr);
+
+ while (eptr && limit--) {
+ score = zzlGetScore(sptr);
+
+ /* Abort when the node is no longer in range. */
+ if (reverse) {
+ if (!zslValueGteMin(score,&range)) break;
+ } else {
+ if (!zslValueLteMax(score,&range)) break;
+ }
- /* If there is an offset, just traverse the number of elements without
- * checking the score because that is done in the next loop. */
- while(ln && offset--) {
- ln = reverse ? ln->backward : ln->level[0].forward;
- }
+ /* Do our magic */
+ rangelen++;
+ if (!justcount) {
+ redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
+ if (vstr == NULL)
+ addReplyBulkLongLong(c,vlong);
+ else
+ addReplyBulkCBuffer(c,vstr,vlen);
+
+ if (withscores)
+ addReplyDouble(c,score);
+ }
- while (ln && limit--) {
- /* Abort when the node is no longer in range. */
- if (reverse) {
- if (!zslValueGteMin(ln->score,&range)) break;
- } else {
- if (!zslValueLteMax(ln->score,&range)) break;
+ /* Move to next node */
+ if (reverse)
+ zzlPrev(zl,&eptr,&sptr);
+ else
+ zzlNext(zl,&eptr,&sptr);
}
-
- /* Do our magic */
- rangelen++;
- if (!justcount) {
- addReplyBulk(c,ln->obj);
- if (withscores)
- addReplyDouble(c,ln->score);
+ } else if (zobj->encoding == REDIS_ENCODING_RAW) {
+ zset *zs = zobj->ptr;
+ zskiplist *zsl = zs->zsl;
+ zskiplistNode *ln;
+
+ /* If reversed, get the last node in range as starting point. */
+ if (reverse)
+ ln = zslLastInRange(zsl,range);
+ else
+ ln = zslFirstInRange(zsl,range);
+
+ /* No "first" element in the specified interval. */
+ if (ln == NULL) {
+ addReply(c,emptyreply);
+ return;
}
- /* Move to next node */
- ln = reverse ? ln->backward : ln->level[0].forward;
+ /* We don't know in advance how many matching elements there are in the
+ * list, so we push this object that will represent the multi-bulk
+ * length in the output buffer, and will "fix" it later */
+ if (!justcount)
+ replylen = addDeferredMultiBulkLength(c);
+
+ /* If there is an offset, just traverse the number of elements without
+ * checking the score because that is done in the next loop. */
+ while (ln && offset--)
+ ln = reverse ? ln->backward : ln->level[0].forward;
+
+ while (ln && limit--) {
+ /* Abort when the node is no longer in range. */
+ if (reverse) {
+ if (!zslValueGteMin(ln->score,&range)) break;
+ } else {
+ if (!zslValueLteMax(ln->score,&range)) break;
+ }
+
+ /* Do our magic */
+ rangelen++;
+ if (!justcount) {
+ addReplyBulk(c,ln->obj);
+ if (withscores)
+ addReplyDouble(c,ln->score);
+ }
+
+ /* Move to next node */
+ ln = reverse ? ln->backward : ln->level[0].forward;
+ }
+ } else {
+ redisPanic("Unknown sorted set encoding");
}
if (justcount) {
addReplyLongLong(c,(long)rangelen);
} else {
- setDeferredMultiBulkLength(c,replylen,
- withscores ? (rangelen*2) : rangelen);
+ if (withscores) rangelen *= 2;
+ setDeferredMultiBulkLength(c,replylen,rangelen);
}
}
}
+/* Reply with the number of members in the sorted set at argv[1].
+ * The count goes through zsLength so that both the ziplist and the RAW
+ * encodings are handled; zzlLength alone would interpret a RAW-encoded
+ * zset structure as a ziplist. */
void zcardCommand(redisClient *c) {
- robj *o;
- zset *zs;
+ robj *key = c->argv[1];
+ robj *zobj;
- if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
- checkType(c,o,REDIS_ZSET)) return;
+ if ((zobj = lookupKeyReadOrReply(c,key,shared.czero)) == NULL ||
+ checkType(c,zobj,REDIS_ZSET)) return;
- zs = o->ptr;
- addReplyLongLong(c,zs->zsl->length);
+ addReplyLongLong(c,zsLength(zobj));
}
+/* Reply with the score of member argv[2] in the sorted set at argv[1], or
+ * with shared.nullbulk when the member is not present. The ziplist encoding
+ * is searched with zzlFind; the RAW encoding is looked up in the dict after
+ * trying to encode the argument. */
void zscoreCommand(redisClient *c) {
- robj *o;
- zset *zs;
- dictEntry *de;
+ robj *key = c->argv[1];
+ robj *zobj;
+ double score;
- if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
- checkType(c,o,REDIS_ZSET)) return;
+ if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
+ checkType(c,zobj,REDIS_ZSET)) return;
- zs = o->ptr;
- c->argv[2] = tryObjectEncoding(c->argv[2]);
- de = dictFind(zs->dict,c->argv[2]);
- if (!de) {
- addReply(c,shared.nullbulk);
- } else {
- double *score = dictGetEntryVal(de);
+ if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
+ /* zzlFind fills in score when the member is found. */
+ if (zzlFind(zobj,c->argv[2],&score) != NULL)
+ addReplyDouble(c,score);
+ else
+ addReply(c,shared.nullbulk);
+ } else if (zobj->encoding == REDIS_ENCODING_RAW) {
+ zset *zs = zobj->ptr;
+ dictEntry *de;
- addReplyDouble(c,*score);
+ c->argv[2] = tryObjectEncoding(c->argv[2]);
+ de = dictFind(zs->dict,c->argv[2]);
+ if (de != NULL) {
+ /* The dict entry value points at the member's score. */
+ score = *(double*)dictGetEntryVal(de);
+ addReplyDouble(c,score);
+ } else {
+ addReply(c,shared.nullbulk);
+ }
+ } else {
+ redisPanic("Unknown sorted set encoding");
}
}
+/* Reply with the 0-based rank of member argv[2] in the sorted set at
+ * argv[1], or with shared.nullbulk when the member is not present. When
+ * reverse is non-zero the rank is counted from the other end (llen-rank).
+ * Internally rank is 1-based in both encodings: the ziplist branch counts
+ * pairs while scanning from the head, the RAW branch asks zslGetRank. */
void zrankGenericCommand(redisClient *c, int reverse) {
- robj *o;
- zset *zs;
- zskiplist *zsl;
- dictEntry *de;
+ robj *key = c->argv[1];
+ robj *ele = c->argv[2];
+ robj *zobj;
+ unsigned long llen;
unsigned long rank;
- double *score;
- if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
- checkType(c,o,REDIS_ZSET)) return;
+ if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
+ checkType(c,zobj,REDIS_ZSET)) return;
+ llen = zsLength(zobj);
- zs = o->ptr;
- zsl = zs->zsl;
- c->argv[2] = tryObjectEncoding(c->argv[2]);
- de = dictFind(zs->dict,c->argv[2]);
- if (!de) {
- addReply(c,shared.nullbulk);
- return;
- }
+ /* The ziplist scan below reads ele->ptr with sdslen, so the argument
+ * must still be a raw string at this point. */
+ redisAssert(ele->encoding == REDIS_ENCODING_RAW);
+ if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
+ unsigned char *zl = zobj->ptr;
+ unsigned char *eptr, *sptr;
- score = dictGetEntryVal(de);
- rank = zslGetRank(zsl, *score, c->argv[2]);
- if (rank) {
- if (reverse) {
- addReplyLongLong(c, zsl->length - rank);
+ eptr = ziplistIndex(zl,0);
+ redisAssert(eptr != NULL);
+ sptr = ziplistNext(zl,eptr);
+ redisAssert(sptr != NULL);
+
+ /* Linear scan: rank is the 1-based position of the current pair. */
+ rank = 1;
+ while(eptr != NULL) {
+ if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr)))
+ break;
+ rank++;
+ zzlNext(zl,&eptr,&sptr);
+ }
+
+ /* eptr is NULL when the scan ran off the end without a match. */
+ if (eptr != NULL) {
+ if (reverse)
+ addReplyLongLong(c,llen-rank);
+ else
+ addReplyLongLong(c,rank-1);
+ } else {
+ addReply(c,shared.nullbulk);
+ }
+ } else if (zobj->encoding == REDIS_ENCODING_RAW) {
+ zset *zs = zobj->ptr;
+ zskiplist *zsl = zs->zsl;
+ dictEntry *de;
+ double score;
+
+ ele = c->argv[2] = tryObjectEncoding(c->argv[2]);
+ de = dictFind(zs->dict,ele);
+ if (de != NULL) {
+ score = *(double*)dictGetEntryVal(de);
+ rank = zslGetRank(zsl,score,ele);
+ redisAssert(rank); /* Existing elements always have a rank. */
+ if (reverse)
+ addReplyLongLong(c,llen-rank);
+ else
+ addReplyLongLong(c,rank-1);
} else {
- addReplyLongLong(c, rank-1);
+ addReply(c,shared.nullbulk);
}
} else {
- addReply(c,shared.nullbulk);
+ redisPanic("Unknown sorted set encoding");
}
}