X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/237194b76064c95028b14e9ff7d7abfb10abd63d..4b1f6ad3e7a5c7c28618e43e7539c9a937bf8521:/src/sort.c?ds=inline diff --git a/src/sort.c b/src/sort.c index 1a8376a8..e5178cd0 100644 --- a/src/sort.c +++ b/src/sort.c @@ -1,5 +1,6 @@ #include "redis.h" #include "pqsort.h" /* Partial qsort for SORT+LIMIT */ +#include /* isnan() */ redisSortOperation *createSortOperation(int type, robj *pattern) { redisSortOperation *so = zmalloc(sizeof(*so)); @@ -8,21 +9,27 @@ redisSortOperation *createSortOperation(int type, robj *pattern) { return so; } -/* Return the value associated to the key with a name obtained - * substituting the first occurence of '*' in 'pattern' with 'subst'. +/* Return the value associated to the key with a name obtained using + * the following rules: + * + * 1) The first occurence of '*' in 'pattern' is substituted with 'subst'. + * + * 2) If 'pattern' matches the "->" string, everything on the left of + * the arrow is treated as the name of an hash field, and the part on the + * left as the key name containing an hash. The value of the specified + * field is returned. + * + * 3) If 'pattern' equals "#", the function simply returns 'subst' itself so + * that the SORT command can be used like: SORT key GET # to retrieve + * the Set/List elements directly. + * * The returned object will always have its refcount increased by 1 * when it is non-NULL. */ robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) { - char *p, *f; + char *p, *f, *k; sds spat, ssub; - robj keyobj, fieldobj, *o; + robj *keyobj, *fieldobj = NULL, *o; int prefixlen, sublen, postfixlen, fieldlen; - /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */ - struct { - int len; - int free; - char buf[REDIS_SORTKEY_MAX+1]; - } keyname, fieldname; /* If the pattern is "#" return the substitution object itself in order * to implement the "SORT ... GET #" feature. */ @@ -36,9 +43,10 @@ robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) { * a decoded object on the fly. Otherwise getDecodedObject will just * increment the ref count, that we'll decrement later. */ subst = getDecodedObject(subst); - ssub = subst->ptr; - if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL; + + /* If we can't find '*' in the pattern we return NULL as to GET a + * fixed key does not make sense. */ p = strchr(spat,'*'); if (!p) { decrRefCount(subst); @@ -46,46 +54,49 @@ robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) { } /* Find out if we're dealing with a hash dereference. */ - if ((f = strstr(p+1, "->")) != NULL) { - fieldlen = sdslen(spat)-(f-spat); - /* this also copies \0 character */ - memcpy(fieldname.buf,f+2,fieldlen-1); - fieldname.len = fieldlen-2; + if ((f = strstr(p+1, "->")) != NULL && *(f+2) != '\0') { + fieldlen = sdslen(spat)-(f-spat)-2; + fieldobj = createStringObject(f+2,fieldlen); } else { fieldlen = 0; } + /* Perform the '*' substitution. */ prefixlen = p-spat; sublen = sdslen(ssub); - postfixlen = sdslen(spat)-(prefixlen+1)-fieldlen; - memcpy(keyname.buf,spat,prefixlen); - memcpy(keyname.buf+prefixlen,ssub,sublen); - memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen); - keyname.buf[prefixlen+sublen+postfixlen] = '\0'; - keyname.len = prefixlen+sublen+postfixlen; - decrRefCount(subst); + postfixlen = sdslen(spat)-(prefixlen+1)-(fieldlen ? fieldlen+2 : 0); + keyobj = createStringObject(NULL,prefixlen+sublen+postfixlen); + k = keyobj->ptr; + memcpy(k,spat,prefixlen); + memcpy(k+prefixlen,ssub,sublen); + memcpy(k+prefixlen+sublen,p+1,postfixlen); + decrRefCount(subst); /* Incremented by decodeObject() */ /* Lookup substituted key */ - initStaticStringObject(keyobj,((char*)&keyname)+(sizeof(struct sdshdr))); - o = lookupKeyRead(db,&keyobj); - if (o == NULL) return NULL; + o = lookupKeyRead(db,keyobj); + if (o == NULL) goto noobj; - if (fieldlen > 0) { - if (o->type != REDIS_HASH || fieldname.len < 1) return NULL; + if (fieldobj) { + if (o->type != REDIS_HASH) goto noobj; /* Retrieve value from hash by the field name. This operation * already increases the refcount of the returned object. */ - initStaticStringObject(fieldobj,((char*)&fieldname)+(sizeof(struct sdshdr))); - o = hashTypeGetObject(o, &fieldobj); + o = hashTypeGetObject(o, fieldobj); } else { - if (o->type != REDIS_STRING) return NULL; + if (o->type != REDIS_STRING) goto noobj; /* Every object that this function returns needs to have its refcount * increased. sortCommand decreases it again. */ incrRefCount(o); } - + decrRefCount(keyobj); + if (fieldobj) decrRefCount(fieldobj); return o; + +noobj: + decrRefCount(keyobj); + if (fieldlen) decrRefCount(fieldobj); + return NULL; } /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with @@ -102,7 +113,10 @@ int sortCompare(const void *s1, const void *s2) { } else if (so1->u.score < so2->u.score) { cmp = -1; } else { - cmp = 0; + /* Objects have the same score, but we don't want the comparison + * to be undefined, so we compare objects lexicographycally. + * This way the result of SORT is deterministic. */ + cmp = compareStringObjects(so1->obj,so2->obj); } } else { /* Alphanumeric sorting */ @@ -133,9 +147,10 @@ void sortCommand(redisClient *c) { list *operations; unsigned int outputlen = 0; int desc = 0, alpha = 0; - int limit_start = 0, limit_count = -1, start, end; + long limit_start = 0, limit_count = -1, start, end; int j, dontsort = 0, vectorlen; int getop = 0; /* GET operation counter */ + int int_convertion_error = 0; robj *sortval, *sortby = NULL, *storekey = NULL; redisSortObject *vector; /* Resulting vector to sort */ @@ -172,8 +187,8 @@ void sortCommand(redisClient *c) { } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) { alpha = 1; } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) { - limit_start = atoi(c->argv[j+1]->ptr); - limit_count = atoi(c->argv[j+2]->ptr); + if ((getLongFromObjectOrReply(c, c->argv[j+1], &limit_start, NULL) != REDIS_OK) || + (getLongFromObjectOrReply(c, c->argv[j+2], &limit_count, NULL) != REDIS_OK)) return; j+=2; } else if (!strcasecmp(c->argv[j]->ptr,"store") && leftargs >= 1) { storekey = c->argv[j+1]; @@ -198,6 +213,18 @@ void sortCommand(redisClient *c) { j++; } + /* If we have STORE we need to force sorting for deterministic output + * and replication. We use alpha sorting since this is guaranteed to + * work with any input. + * + * We also want determinism when SORT is called from Lua scripts, so + * in this case we also force alpha sorting. */ + if ((storekey || c->flags & REDIS_LUA_CLIENT) && dontsort) { + dontsort = 0; + alpha = 1; + sortby = NULL; + } + /* Destructively convert encoded sorted sets for SORT. */ if (sortval->type == REDIS_ZSET) zsetConvert(sortval, REDIS_ENCODING_SKIPLIST); @@ -266,7 +293,14 @@ void sortCommand(redisClient *c) { if (sortby) vector[j].u.cmpobj = getDecodedObject(byval); } else { if (byval->encoding == REDIS_ENCODING_RAW) { - vector[j].u.score = strtod(byval->ptr,NULL); + char *eptr; + + vector[j].u.score = strtod(byval->ptr,&eptr); + if (eptr[0] != '\0' || errno == ERANGE || + isnan(vector[j].u.score)) + { + int_convertion_error = 1; + } } else if (byval->encoding == REDIS_ENCODING_INT) { /* Don't need to decode the object if it's * integer-encoded (the only encoding supported) so @@ -308,7 +342,9 @@ void sortCommand(redisClient *c) { /* Send command output to the output buffer, performing the specified * GET/DEL/INCR/DECR operations if any. */ outputlen = getop ? getop*(end-start+1) : end-start+1; - if (storekey == NULL) { + if (int_convertion_error) { + addReplyError(c,"One or more scores can't be converted into double"); + } else if (storekey == NULL) { /* STORE option not specified, sent the sorting result to client */ addReplyMultiBulkLen(c,outputlen); for (j = start; j <= end; j++) { @@ -367,9 +403,14 @@ void sortCommand(redisClient *c) { } } } - if (outputlen) setKey(c->db,storekey,sobj); + if (outputlen) { + setKey(c->db,storekey,sobj); + server.dirty += outputlen; + } else if (dbDelete(c->db,storekey)) { + signalModifiedKey(c->db,storekey); + server.dirty++; + } decrRefCount(sobj); - server.dirty += outputlen; addReplyLongLong(c,outputlen); }