X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/2c861050c17237a61fdaff4da2777c5d18ce979a..89423052cab4700291f290819d53f4fbb8dabf95:/src/sort.c diff --git a/src/sort.c b/src/sort.c index 11b73ad3..d18a5295 100644 --- a/src/sort.c +++ b/src/sort.c @@ -2,6 +2,8 @@ #include "pqsort.h" /* Partial qsort for SORT+LIMIT */ #include <math.h> /* isnan() */ +zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank); + redisSortOperation *createSortOperation(int type, robj *pattern) { redisSortOperation *so = zmalloc(sizeof(*so)); so->type = type; @@ -9,21 +11,27 @@ redisSortOperation *createSortOperation(int type, robj *pattern) { return so; } -/* Return the value associated to the key with a name obtained - * substituting the first occurence of '*' in 'pattern' with 'subst'. +/* Return the value associated to the key with a name obtained using + * the following rules: + * + * 1) The first occurrence of '*' in 'pattern' is substituted with 'subst'. + * + * 2) If 'pattern' contains the "->" string, everything on the right of + * the arrow is treated as the name of a hash field, and the part on the + * left as the key name containing a hash. The value of the specified + * field is returned. + * + * 3) If 'pattern' equals "#", the function simply returns 'subst' itself so + * that the SORT command can be used like: SORT key GET # to retrieve + * the Set/List elements directly. + * * The returned object will always have its refcount increased by 1 * when it is non-NULL. 
*/ robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) { - char *p, *f; + char *p, *f, *k; sds spat, ssub; - robj keyobj, fieldobj, *o; + robj *keyobj, *fieldobj = NULL, *o; int prefixlen, sublen, postfixlen, fieldlen; - /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */ - struct { - int len; - int free; - char buf[REDIS_SORTKEY_MAX+1]; - } keyname, fieldname; /* If the pattern is "#" return the substitution object itself in order * to implement the "SORT ... GET #" feature. */ @@ -37,9 +45,10 @@ robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) { * a decoded object on the fly. Otherwise getDecodedObject will just * increment the ref count, that we'll decrement later. */ subst = getDecodedObject(subst); - ssub = subst->ptr; - if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL; + + /* If we can't find '*' in the pattern we return NULL as to GET a + * fixed key does not make sense. */ p = strchr(spat,'*'); if (!p) { decrRefCount(subst); @@ -47,46 +56,49 @@ robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) { } /* Find out if we're dealing with a hash dereference. */ - if ((f = strstr(p+1, "->")) != NULL) { - fieldlen = sdslen(spat)-(f-spat); - /* this also copies \0 character */ - memcpy(fieldname.buf,f+2,fieldlen-1); - fieldname.len = fieldlen-2; + if ((f = strstr(p+1, "->")) != NULL && *(f+2) != '\0') { + fieldlen = sdslen(spat)-(f-spat)-2; + fieldobj = createStringObject(f+2,fieldlen); } else { fieldlen = 0; } + /* Perform the '*' substitution. 
*/ prefixlen = p-spat; sublen = sdslen(ssub); - postfixlen = sdslen(spat)-(prefixlen+1)-fieldlen; - memcpy(keyname.buf,spat,prefixlen); - memcpy(keyname.buf+prefixlen,ssub,sublen); - memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen); - keyname.buf[prefixlen+sublen+postfixlen] = '\0'; - keyname.len = prefixlen+sublen+postfixlen; - decrRefCount(subst); + postfixlen = sdslen(spat)-(prefixlen+1)-(fieldlen ? fieldlen+2 : 0); + keyobj = createStringObject(NULL,prefixlen+sublen+postfixlen); + k = keyobj->ptr; + memcpy(k,spat,prefixlen); + memcpy(k+prefixlen,ssub,sublen); + memcpy(k+prefixlen+sublen,p+1,postfixlen); + decrRefCount(subst); /* Incremented by decodeObject() */ /* Lookup substituted key */ - initStaticStringObject(keyobj,((char*)&keyname)+(sizeof(struct sdshdr))); - o = lookupKeyRead(db,&keyobj); - if (o == NULL) return NULL; + o = lookupKeyRead(db,keyobj); + if (o == NULL) goto noobj; - if (fieldlen > 0) { - if (o->type != REDIS_HASH || fieldname.len < 1) return NULL; + if (fieldobj) { + if (o->type != REDIS_HASH) goto noobj; /* Retrieve value from hash by the field name. This operation * already increases the refcount of the returned object. */ - initStaticStringObject(fieldobj,((char*)&fieldname)+(sizeof(struct sdshdr))); - o = hashTypeGetObject(o, &fieldobj); + o = hashTypeGetObject(o, fieldobj); } else { - if (o->type != REDIS_STRING) return NULL; + if (o->type != REDIS_STRING) goto noobj; /* Every object that this function returns needs to have its refcount * increased. sortCommand decreases it again. */ incrRefCount(o); } - + decrRefCount(keyobj); + if (fieldobj) decrRefCount(fieldobj); return o; + +noobj: + decrRefCount(keyobj); + if (fieldlen) decrRefCount(fieldobj); + return NULL; } /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with @@ -146,8 +158,9 @@ void sortCommand(redisClient *c) { /* Lookup the key to sort. 
It must be of the right types */ sortval = lookupKeyRead(c->db,c->argv[1]); - if (sortval && sortval->type != REDIS_SET && sortval->type != REDIS_LIST && - sortval->type != REDIS_ZSET) + if (sortval && sortval->type != REDIS_SET && + sortval->type != REDIS_LIST && + sortval->type != REDIS_ZSET) { addReply(c,shared.wrongtypeerr); return; @@ -157,7 +170,7 @@ void sortCommand(redisClient *c) { * Operations can be GET/DEL/INCR/DECR */ operations = listCreate(); listSetFreeMethod(operations,zfree); - j = 2; + j = 2; /* options start at argv[2] */ /* Now we need to protect sortval incrementing its count, in the future * SORT may have options able to overwrite/delete keys during the sorting @@ -203,17 +216,62 @@ void sortCommand(redisClient *c) { j++; } + /* For the STORE option, or when SORT is called from a Lua script, + * we want to force a specific ordering even when no explicit ordering + * was asked (SORT BY nosort). This guarantees that replication / AOF + * is deterministic. + * + * However in the case 'dontsort' is true, but the type to sort is a + * sorted set, we don't need to do anything as ordering is guaranteed + * in this special case. */ + if ((storekey || c->flags & REDIS_LUA_CLIENT) && + (dontsort && sortval->type != REDIS_ZSET)) + { + /* Force ALPHA sorting */ + dontsort = 0; + alpha = 1; + sortby = NULL; + } + /* Destructively convert encoded sorted sets for SORT. */ if (sortval->type == REDIS_ZSET) zsetConvert(sortval, REDIS_ENCODING_SKIPLIST); - /* Load the sorting vector with all the objects to sort */ + /* Obtain the length of the object to sort. */ switch(sortval->type) { case REDIS_LIST: vectorlen = listTypeLength(sortval); break; case REDIS_SET: vectorlen = setTypeSize(sortval); break; case REDIS_ZSET: vectorlen = dictSize(((zset*)sortval->ptr)->dict); break; default: vectorlen = 0; redisPanic("Bad SORT type"); /* Avoid GCC warning */ } + + /* Perform LIMIT start,count sanity checking. */ + start = (limit_start < 0) ? 
0 : limit_start; + end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1; + if (start >= vectorlen) { + start = vectorlen-1; + end = vectorlen-2; + } + if (end >= vectorlen) end = vectorlen-1; + + /* Optimization: + * + * 1) if the object to sort is a sorted set. + * 2) There is nothing to sort as dontsort is true (BY <constant string>). + * 3) We have a LIMIT option that actually reduces the number of elements + * to fetch. + * + * In this case to load all the objects in the vector is a huge waste of + * resources. We just allocate a vector that is big enough for the selected + * range length, and make sure to load just this part in the vector. */ + if (sortval->type == REDIS_ZSET && + dontsort && + (start != 0 || end != vectorlen-1)) + { + vectorlen = end-start+1; + } + + /* Load the sorting vector with all the objects to sort */ vector = zmalloc(sizeof(redisSortObject)*vectorlen); j = 0; @@ -237,6 +295,48 @@ void sortCommand(redisClient *c) { j++; } setTypeReleaseIterator(si); + } else if (sortval->type == REDIS_ZSET && dontsort) { + /* Special handling for a sorted set, if 'dontsort' is true. + * This makes sure we return elements in the sorted set's original + * ordering, according to DESC / ASC options. + * + * Note that in this case we also handle LIMIT here in a direct + * way, just getting the required range, as an optimization. */ + + zset *zs = sortval->ptr; + zskiplist *zsl = zs->zsl; + zskiplistNode *ln; + robj *ele; + int rangelen = vectorlen; + + /* Check if starting point is trivial, before doing log(N) lookup. */ + if (desc) { + long zsetlen = dictSize(((zset*)sortval->ptr)->dict); + + ln = zsl->tail; + if (start > 0) + ln = zslGetElementByRank(zsl,zsetlen-start); + } else { + ln = zsl->header->level[0].forward; + if (start > 0) + ln = zslGetElementByRank(zsl,start+1); + } + + while(rangelen--) { + redisAssertWithInfo(c,sortval,ln != NULL); + ele = ln->obj; + vector[j].obj = ele; + vector[j].u.score = 0; + vector[j].u.cmpobj = NULL; + j++; + ln = desc ? 
ln->backward : ln->level[0].forward; + } + /* The code producing the output does not know that in the case of + * sorted set, 'dontsort', and LIMIT, we are able to get just the + * range, already sorted, so we need to adjust "start" and "end" + * to make sure start is set to 0. */ + end -= start; + start = 0; } else if (sortval->type == REDIS_ZSET) { dict *set = ((zset*)sortval->ptr)->dict; dictIterator *di; @@ -297,17 +397,6 @@ void sortCommand(redisClient *c) { } } - /* We are ready to sort the vector... perform a bit of sanity check - * on the LIMIT option too. We'll use a partial version of quicksort. */ - start = (limit_start < 0) ? 0 : limit_start; - end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1; - if (start >= vectorlen) { - start = vectorlen-1; - end = vectorlen-2; - } - if (end >= vectorlen) end = vectorlen-1; - - server.sort_dontsort = dontsort; if (dontsort == 0) { server.sort_desc = desc; server.sort_alpha = alpha;