return ziplistLen(zl)/2;
}
+/* Advance the cursor (*eptr, *sptr) to the following element/score pair.
+ * The entry after the current score is taken as the next element and the
+ * entry after that as its score. When ziplistNext() finds nothing after the
+ * current score, both pointers are set to NULL. */
+void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
+    unsigned char *nextele, *nextscore = NULL;
+    redisAssert(*eptr != NULL && *sptr != NULL);
+
+    nextele = ziplistNext(zl,*sptr);
+    if (nextele != NULL) {
+        /* An element entry must always be followed by its score entry. */
+        nextscore = ziplistNext(zl,nextele);
+        redisAssert(nextscore != NULL);
+    }
+
+    *eptr = nextele;
+    *sptr = nextscore;
+}
+
+/* Step the cursor (*eptr, *sptr) back to the preceding element/score pair.
+ * The entry before the current element is taken as the previous score and
+ * the entry before that as its element. When ziplistPrev() finds nothing
+ * before the current element, both pointers are set to NULL. */
+void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr) {
+    unsigned char *prevele = NULL, *prevscore;
+    redisAssert(*eptr != NULL && *sptr != NULL);
+
+    prevscore = ziplistPrev(zl,*eptr);
+    if (prevscore != NULL) {
+        /* A score entry must always be preceded by its element entry. */
+        prevele = ziplistPrev(zl,prevscore);
+        redisAssert(prevele != NULL);
+    }
+
+    *eptr = prevele;
+    *sptr = prevscore;
+}
+
/* Returns if there is a part of the zset is in range. Should only be used
* internally by zzlFirstInRange and zzlLastInRange. */
int zzlIsInRange(unsigned char *zl, zrangespec *range) {
break;
} else if (s == score) {
/* Ensure lexicographical ordering for elements. */
- if (zzlCompareElements(eptr,ele->ptr,sdslen(ele->ptr)) < 0) {
+ if (zzlCompareElements(eptr,ele->ptr,sdslen(ele->ptr)) > 0) {
zzlInsertAt(zobj,ele,score,eptr);
break;
}
return deleted;
}
+/* Delete all the elements with rank between start and end from the ziplist.
+ * Start and end are inclusive and must be 1-based. Each element is stored as
+ * two consecutive ziplist entries (element, then score), hence the doubled
+ * index and count. Returns the size of the range, (end-start)+1. */
+unsigned long zzlDeleteRangeByRank(robj *zobj, unsigned int start, unsigned int end) {
+    unsigned int count = (end-start)+1;
+    unsigned int first = 2*(start-1);
+
+    zobj->ptr = ziplistDeleteRange(zobj->ptr,first,2*count);
+    return count;
+}
+
+/*-----------------------------------------------------------------------------
+ * Common sorted set API
+ *----------------------------------------------------------------------------*/
+
+/* Return the number of elements in the sorted set, whatever its encoding.
+ * An unknown encoding triggers redisPanic(); -1 is only returned should
+ * redisPanic() ever return. */
+int zsLength(robj *zobj) {
+    if (zobj->encoding == REDIS_ENCODING_ZIPLIST)
+        return zzlLength(zobj);
+    if (zobj->encoding == REDIS_ENCODING_RAW)
+        return ((zset*)zobj->ptr)->zsl->length;
+
+    redisPanic("Unknown sorted set encoding");
+    return -1;
+}
+
/*-----------------------------------------------------------------------------
* Sorted set commands
*----------------------------------------------------------------------------*/
}
+/* Remove from the sorted set at argv[1] every element whose 0-based rank is
+ * between argv[2] and argv[3], inclusive (negative ranks count from the
+ * end), and reply with the number of removed elements. The key is deleted
+ * from the database as soon as the sorted set becomes empty, in both
+ * encodings. */
void zremrangebyrankCommand(redisClient *c) {
+ robj *key = c->argv[1];
+ robj *zobj;
long start;
long end;
int llen;
- long deleted;
- robj *zsetobj;
- zset *zs;
+ unsigned long deleted;
if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
(getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
- if ((zsetobj = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL ||
- checkType(c,zsetobj,REDIS_ZSET)) return;
- zs = zsetobj->ptr;
- llen = zs->zsl->length;
+ if ((zobj = lookupKeyWriteOrReply(c,key,shared.czero)) == NULL ||
+ checkType(c,zobj,REDIS_ZSET)) return;
- /* convert negative indexes */
+ /* Sanitize indexes. */
+ llen = zsLength(zobj);
if (start < 0) start = llen+start;
if (end < 0) end = llen+end;
if (start < 0) start = 0;
}
if (end >= llen) end = llen-1;
- /* increment start and end because zsl*Rank functions
- * use 1-based rank */
- deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);
- if (htNeedsResize(zs->dict)) dictResize(zs->dict);
- if (dictSize(zs->dict) == 0) dbDelete(c->db,c->argv[1]);
- if (deleted) signalModifiedKey(c->db,c->argv[1]);
+ if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
+ /* Correct for 1-based rank. */
+ deleted = zzlDeleteRangeByRank(zobj,start+1,end+1);
+ /* Never leave an empty sorted set behind, same as the skiplist case. */
+ if (zzlLength(zobj) == 0) dbDelete(c->db,key);
+ } else if (zobj->encoding == REDIS_ENCODING_RAW) {
+ zset *zs = zobj->ptr;
+
+ /* Correct for 1-based rank. */
+ deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);
+ if (htNeedsResize(zs->dict)) dictResize(zs->dict);
+ if (dictSize(zs->dict) == 0) dbDelete(c->db,key);
+ } else {
+ redisPanic("Unknown sorted set encoding");
+ }
+
+ if (deleted) signalModifiedKey(c->db,key);
server.dirty += deleted;
- addReplyLongLong(c, deleted);
+ addReplyLongLong(c,deleted);
}
typedef struct {
}
+/* Reply with the elements of the sorted set at argv[1] whose 0-based rank is
+ * between argv[2] and argv[3] (negative ranks count from the end), walking
+ * backwards from the tail when "reverse" is non-zero. When "withscores" is
+ * set, every element in the reply is followed by its score. */
void zrangeGenericCommand(redisClient *c, int reverse) {
- robj *o;
+ robj *key = c->argv[1];
+ robj *zobj;
+ int withscores = 0;
long start;
long end;
- int withscores = 0;
int llen;
- int rangelen, j;
- zset *zsetobj;
- zskiplist *zsl;
- zskiplistNode *ln;
- robj *ele;
+ int rangelen;
if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
(getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
return;
}
- if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
- || checkType(c,o,REDIS_ZSET)) return;
- zsetobj = o->ptr;
- zsl = zsetobj->zsl;
- llen = zsl->length;
+ if ((zobj = lookupKeyReadOrReply(c,key,shared.emptymultibulk)) == NULL
+ || checkType(c,zobj,REDIS_ZSET)) return;
- /* convert negative indexes */
+ /* Sanitize indexes. */
+ llen = zsLength(zobj);
if (start < 0) start = llen+start;
if (end < 0) end = llen+end;
if (start < 0) start = 0;
if (end >= llen) end = llen-1;
rangelen = (end-start)+1;
- /* check if starting point is trivial, before searching
- * the element in log(N) time */
- if (reverse) {
- ln = start == 0 ? zsl->tail : zslGetElementByRank(zsl, llen-start);
- } else {
- ln = start == 0 ?
- zsl->header->level[0].forward : zslGetElementByRank(zsl, start+1);
- }
-
/* Return the result in form of a multi-bulk reply */
- addReplyMultiBulkLen(c,withscores ? (rangelen*2) : rangelen);
- for (j = 0; j < rangelen; j++) {
- ele = ln->obj;
- addReplyBulk(c,ele);
- if (withscores)
- addReplyDouble(c,ln->score);
- ln = reverse ? ln->backward : ln->level[0].forward;
+ addReplyMultiBulkLen(c, withscores ? (rangelen*2) : rangelen);
+
+ if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
+ unsigned char *zl = zobj->ptr;
+ unsigned char *eptr, *sptr;
+ unsigned char *vstr;
+ unsigned int vlen;
+ long long vlong;
+
+ /* Each element takes two ziplist entries (element, then score), so
+ * the element with 0-based rank N sits at entry 2*N.
+ * NOTE(review): the reverse case relies on ziplistIndex() accepting
+ * negative indexes counted from the tail -- confirm. */
+ if (reverse)
+ eptr = ziplistIndex(zl,-2-(2*start));
+ else
+ eptr = ziplistIndex(zl,2*start);
+
+ redisAssert(eptr != NULL);
+ sptr = ziplistNext(zl,eptr);
+
+ while (rangelen--) {
+ redisAssert(eptr != NULL && sptr != NULL);
+ redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
+ /* When vstr is NULL the value is replied from vlong as an integer. */
+ if (vstr == NULL)
+ addReplyBulkLongLong(c,vlong);
+ else
+ addReplyBulkCBuffer(c,vstr,vlen);
+
+ if (withscores)
+ addReplyDouble(c,zzlGetScore(sptr));
+
+ if (reverse)
+ zzlPrev(zl,&eptr,&sptr);
+ else
+ zzlNext(zl,&eptr,&sptr);
+ }
+
+ } else if (zobj->encoding == REDIS_ENCODING_RAW) {
+ zset *zs = zobj->ptr;
+ zskiplist *zsl = zs->zsl;
+ zskiplistNode *ln;
+ robj *ele;
+
+ /* Check if starting point is trivial, before doing log(N) lookup. */
+ if (reverse) {
+ ln = zsl->tail;
+ if (start > 0)
+ ln = zslGetElementByRank(zsl,llen-start);
+ } else {
+ ln = zsl->header->level[0].forward;
+ if (start > 0)
+ ln = zslGetElementByRank(zsl,start+1);
+ }
+
+ while(rangelen--) {
+ redisAssert(ln != NULL);
+ ele = ln->obj;
+ addReplyBulk(c,ele);
+ if (withscores)
+ addReplyDouble(c,ln->score);
+ ln = reverse ? ln->backward : ln->level[0].forward;
+ }
+ } else {
+ redisPanic("Unknown sorted set encoding");
}
}
* If "justcount", only the number of elements in the range is returned. */
void genericZrangebyscoreCommand(redisClient *c, int reverse, int justcount) {
zrangespec range;
- robj *o, *emptyreply;
- zset *zsetobj;
- zskiplist *zsl;
- zskiplistNode *ln;
+ robj *key = c->argv[1];
+ robj *emptyreply, *zobj;
int offset = 0, limit = -1;
int withscores = 0;
unsigned long rangelen = 0;
/* Ok, lookup the key and get the range */
emptyreply = justcount ? shared.czero : shared.emptymultibulk;
- if ((o = lookupKeyReadOrReply(c,c->argv[1],emptyreply)) == NULL ||
- checkType(c,o,REDIS_ZSET)) return;
- zsetobj = o->ptr;
- zsl = zsetobj->zsl;
+ if ((zobj = lookupKeyReadOrReply(c,key,emptyreply)) == NULL ||
+ checkType(c,zobj,REDIS_ZSET)) return;
- /* If reversed, get the last node in range as starting point. */
- if (reverse) {
- ln = zslLastInRange(zsl,range);
- } else {
- ln = zslFirstInRange(zsl,range);
- }
+ if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
+ /* Ziplist encoding: iterate over (element, score) entry pairs. */
+ unsigned char *zl = zobj->ptr;
+ unsigned char *eptr, *sptr;
+ unsigned char *vstr;
+ unsigned int vlen;
+ long long vlong;
+ double score;
- /* No "first" element in the specified interval. */
- if (ln == NULL) {
- addReply(c,emptyreply);
- return;
- }
+ /* If reversed, get the last node in range as starting point. */
+ if (reverse)
+ eptr = zzlLastInRange(zobj,range);
+ else
+ eptr = zzlFirstInRange(zobj,range);
- /* We don't know in advance how many matching elements there are in the
- * list, so we push this object that will represent the multi-bulk length
- * in the output buffer, and will "fix" it later */
- if (!justcount)
- replylen = addDeferredMultiBulkLength(c);
+ /* No "first" element in the specified interval. */
+ if (eptr == NULL) {
+ addReply(c,emptyreply);
+ return;
+ }
- /* If there is an offset, just traverse the number of elements without
- * checking the score because that is done in the next loop. */
- while(ln && offset--) {
- ln = reverse ? ln->backward : ln->level[0].forward;
- }
+ /* Get score pointer for the first element. */
+ redisAssert(eptr != NULL);
+ sptr = ziplistNext(zl,eptr);
- while (ln && limit--) {
- /* Abort when the node is no longer in range. */
- if (reverse) {
- if (!zslValueGteMin(ln->score,&range)) break;
- } else {
- if (!zslValueLteMax(ln->score,&range)) break;
+ /* We don't know in advance how many matching elements there are in the
+ * list, so we push this object that will represent the multi-bulk
+ * length in the output buffer, and will "fix" it later */
+ if (!justcount)
+ replylen = addDeferredMultiBulkLength(c);
+
+ /* If there is an offset, just traverse the number of elements without
+ * checking the score because that is done in the next loop. */
+ while (eptr && offset--)
+ if (reverse)
+ zzlPrev(zl,&eptr,&sptr);
+ else
+ zzlNext(zl,&eptr,&sptr);
+
+ while (eptr && limit--) {
+ score = zzlGetScore(sptr);
+
+ /* Abort when the node is no longer in range. */
+ if (reverse) {
+ if (!zslValueGteMin(score,&range)) break;
+ } else {
+ if (!zslValueLteMax(score,&range)) break;
+ }
+
+ /* Do our magic */
+ rangelen++;
+ if (!justcount) {
+ redisAssert(ziplistGet(eptr,&vstr,&vlen,&vlong));
+ /* When vstr is NULL the value is replied from vlong as an
+ * integer. */
+ if (vstr == NULL)
+ addReplyBulkLongLong(c,vlong);
+ else
+ addReplyBulkCBuffer(c,vstr,vlen);
+
+ if (withscores)
+ addReplyDouble(c,score);
+ }
+
+ /* Move to next node */
+ if (reverse)
+ zzlPrev(zl,&eptr,&sptr);
+ else
+ zzlNext(zl,&eptr,&sptr);
}
+ } else if (zobj->encoding == REDIS_ENCODING_RAW) {
+ /* Skiplist encoding: iterate over the nodes, following the backward
+ * or the level-0 forward pointers depending on "reverse". */
+ zset *zs = zobj->ptr;
+ zskiplist *zsl = zs->zsl;
+ zskiplistNode *ln;
- /* Do our magic */
- rangelen++;
- if (!justcount) {
- addReplyBulk(c,ln->obj);
- if (withscores)
- addReplyDouble(c,ln->score);
+ /* If reversed, get the last node in range as starting point. */
+ if (reverse)
+ ln = zslLastInRange(zsl,range);
+ else
+ ln = zslFirstInRange(zsl,range);
+
+ /* No "first" element in the specified interval. */
+ if (ln == NULL) {
+ addReply(c,emptyreply);
+ return;
}
- /* Move to next node */
- ln = reverse ? ln->backward : ln->level[0].forward;
+ /* We don't know in advance how many matching elements there are in the
+ * list, so we push this object that will represent the multi-bulk
+ * length in the output buffer, and will "fix" it later */
+ if (!justcount)
+ replylen = addDeferredMultiBulkLength(c);
+
+ /* If there is an offset, just traverse the number of elements without
+ * checking the score because that is done in the next loop. */
+ while (ln && offset--)
+ ln = reverse ? ln->backward : ln->level[0].forward;
+
+ while (ln && limit--) {
+ /* Abort when the node is no longer in range. */
+ if (reverse) {
+ if (!zslValueGteMin(ln->score,&range)) break;
+ } else {
+ if (!zslValueLteMax(ln->score,&range)) break;
+ }
+
+ /* Do our magic */
+ rangelen++;
+ if (!justcount) {
+ addReplyBulk(c,ln->obj);
+ if (withscores)
+ addReplyDouble(c,ln->score);
+ }
+
+ /* Move to next node */
+ ln = reverse ? ln->backward : ln->level[0].forward;
+ }
+ } else {
+ redisPanic("Unknown sorted set encoding");
}
if (justcount) {
addReplyLongLong(c,(long)rangelen);
} else {
- setDeferredMultiBulkLength(c,replylen,
- withscores ? (rangelen*2) : rangelen);
+ /* With scores, every element contributes two items to the reply. */
+ if (withscores) rangelen *= 2;
+ setDeferredMultiBulkLength(c,replylen,rangelen);
}
}
}
+/* Reply with the number of elements in the sorted set at argv[1].
+ * shared.czero is handed to lookupKeyReadOrReply() as the reply to send
+ * when the lookup yields no object. */
void zcardCommand(redisClient *c) {
- robj *o;
- zset *zs;
+ robj *key = c->argv[1];
+ robj *zobj;
- if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
- checkType(c,o,REDIS_ZSET)) return;
+ if ((zobj = lookupKeyReadOrReply(c,key,shared.czero)) == NULL ||
+ checkType(c,zobj,REDIS_ZSET)) return;
- zs = o->ptr;
- addReplyLongLong(c,zs->zsl->length);
+ /* Use the encoding-aware length: zzlLength() is only valid for the
+ * ziplist encoding and would misread a skiplist-encoded object. */
+ addReplyLongLong(c,zsLength(zobj));
}
+/* Reply with the score of element argv[2] in the sorted set at argv[1], or
+ * with a null bulk when the element is not found. */
void zscoreCommand(redisClient *c) {
- robj *o;
- zset *zs;
- dictEntry *de;
+ robj *key = c->argv[1];
+ robj *zobj;
+ double score;
+
+ if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
+ checkType(c,zobj,REDIS_ZSET)) return;
- if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
- checkType(c,o,REDIS_ZSET)) return;
+ if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
+ /* zzlFind() fills in "score" when it returns a non-NULL pointer. */
+ if (zzlFind(zobj,c->argv[2],&score) != NULL)
+ addReplyDouble(c,score);
+ else
+ addReply(c,shared.nullbulk);
+ } else if (zobj->encoding == REDIS_ENCODING_RAW) {
+ zset *zs = zobj->ptr;
+ dictEntry *de;
- zs = o->ptr;
- c->argv[2] = tryObjectEncoding(c->argv[2]);
- de = dictFind(zs->dict,c->argv[2]);
- if (!de) {
- addReply(c,shared.nullbulk);
+ /* NOTE(review): presumably dict keys are stored encoded, so the
+ * lookup key is encoded the same way first -- confirm. */
+ c->argv[2] = tryObjectEncoding(c->argv[2]);
+ de = dictFind(zs->dict,c->argv[2]);
+ if (de != NULL) {
+ score = *(double*)dictGetEntryVal(de);
+ addReplyDouble(c,score);
+ } else {
+ addReply(c,shared.nullbulk);
+ }
} else {
- double *score = dictGetEntryVal(de);
-
- addReplyDouble(c,*score);
+ redisPanic("Unknown sorted set encoding");
}
}
+/* Reply with the 0-based rank of element argv[2] in the sorted set at
+ * argv[1], counted from the end when "reverse" is non-zero, or with a null
+ * bulk when the element is not found. */
void zrankGenericCommand(redisClient *c, int reverse) {
- robj *o;
- zset *zs;
- zskiplist *zsl;
- dictEntry *de;
+ robj *key = c->argv[1];
+ robj *ele = c->argv[2];
+ robj *zobj;
+ unsigned long llen;
unsigned long rank;
- double *score;
- if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
- checkType(c,o,REDIS_ZSET)) return;
+ if ((zobj = lookupKeyReadOrReply(c,key,shared.nullbulk)) == NULL ||
+ checkType(c,zobj,REDIS_ZSET)) return;
+ llen = zsLength(zobj);
- zs = o->ptr;
- zsl = zs->zsl;
- c->argv[2] = tryObjectEncoding(c->argv[2]);
- de = dictFind(zs->dict,c->argv[2]);
- if (!de) {
- addReply(c,shared.nullbulk);
- return;
- }
+ redisAssert(ele->encoding == REDIS_ENCODING_RAW);
+ if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
+ unsigned char *zl = zobj->ptr;
+ unsigned char *eptr, *sptr;
- score = dictGetEntryVal(de);
- rank = zslGetRank(zsl, *score, c->argv[2]);
- if (rank) {
- if (reverse) {
- addReplyLongLong(c, zsl->length - rank);
+ eptr = ziplistIndex(zl,0);
+ redisAssert(eptr != NULL);
+ sptr = ziplistNext(zl,eptr);
+ redisAssert(sptr != NULL);
+
+ /* Linear scan over the element entries. "rank" is kept 1-based so
+ * that the reply arithmetic below matches the skiplist branch. */
+ rank = 1;
+ while(eptr != NULL) {
+ if (ziplistCompare(eptr,ele->ptr,sdslen(ele->ptr)))
+ break;
+ rank++;
+ zzlNext(zl,&eptr,&sptr);
+ }
+
+ /* eptr is NULL only when the scan ran off the end without a match. */
+ if (eptr != NULL) {
+ if (reverse)
+ addReplyLongLong(c,llen-rank);
+ else
+ addReplyLongLong(c,rank-1);
+ } else {
+ addReply(c,shared.nullbulk);
+ }
+ } else if (zobj->encoding == REDIS_ENCODING_RAW) {
+ zset *zs = zobj->ptr;
+ zskiplist *zsl = zs->zsl;
+ dictEntry *de;
+ double score;
+
+ ele = c->argv[2] = tryObjectEncoding(c->argv[2]);
+ de = dictFind(zs->dict,ele);
+ if (de != NULL) {
+ score = *(double*)dictGetEntryVal(de);
+ rank = zslGetRank(zsl,score,ele);
+ redisAssert(rank); /* Existing elements always have a rank. */
+ if (reverse)
+ addReplyLongLong(c,llen-rank);
+ else
+ addReplyLongLong(c,rank-1);
} else {
- addReplyLongLong(c, rank-1);
+ addReply(c,shared.nullbulk);
}
} else {
- addReply(c,shared.nullbulk);
+ redisPanic("Unknown sorted set encoding");
}
}