From 68ee18558ab23066b0273ae59b3d6ab63db2dd15 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 17 Apr 2012 13:05:09 +0200 Subject: [PATCH] lookupKeyByPattern() used by SORT GET/BY rewritten. Fixes issue #460. lookupKeyByPattern() was implemented with a trick to speedup the lookup process allocating two fake Redis obejcts on the stack. However now that we propagate expires to the slave as DEL operations the lookup of the key may result into a call to expireIfNeeded() having the stack allocated object as argument, that may in turn use it to create the protocol to send to the slave. But since this fake obejcts are inherently read-only this is a problem. As a side effect of this fix there are no longer size limits in the pattern to be used with GET/BY option of SORT. See https://github.com/antirez/redis/issues/460 for bug details. --- src/sort.c | 74 +++++++++++++++++++++++++++++++----------------------- 1 file changed, 42 insertions(+), 32 deletions(-) diff --git a/src/sort.c b/src/sort.c index 3f02e49a..ff655c7e 100644 --- a/src/sort.c +++ b/src/sort.c @@ -9,21 +9,27 @@ redisSortOperation *createSortOperation(int type, robj *pattern) { return so; } -/* Return the value associated to the key with a name obtained - * substituting the first occurence of '*' in 'pattern' with 'subst'. +/* Return the value associated to the key with a name obtained using + * the following rules: + * + * 1) The first occurence of '*' in 'pattern' is substituted with 'subst'. + * + * 2) If 'pattern' matches the "->" string, everything on the left of + * the arrow is treated as the name of an hash field, and the part on the + * left as the key name containing an hash. The value of the specified + * field is returned. + * + * 3) If 'pattern' equals "#", the function simply returns 'subst' itself so + * that the SORT command can be used like: SORT key GET # to retrieve + * the Set/List elements directly. + * * The returned object will always have its refcount increased by 1 * when it is non-NULL. */ robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) { - char *p, *f; + char *p, *f, *k; sds spat, ssub; - robj keyobj, fieldobj, *o; + robj *keyobj, *fieldobj, *o; int prefixlen, sublen, postfixlen, fieldlen; - /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */ - struct { - int len; - int free; - char buf[REDIS_SORTKEY_MAX+1]; - } keyname, fieldname; /* If the pattern is "#" return the substitution object itself in order * to implement the "SORT ... GET #" feature. */ @@ -37,9 +43,10 @@ robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) { * a decoded object on the fly. Otherwise getDecodedObject will just * increment the ref count, that we'll decrement later. */ subst = getDecodedObject(subst); - ssub = subst->ptr; - if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL; + + /* If we can't find '*' in the pattern we return NULL as to GET a + * fixed key does not make sense. */ p = strchr(spat,'*'); if (!p) { decrRefCount(subst); @@ -47,46 +54,49 @@ robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) { } /* Find out if we're dealing with a hash dereference. */ - if ((f = strstr(p+1, "->")) != NULL) { - fieldlen = sdslen(spat)-(f-spat); - /* this also copies \0 character */ - memcpy(fieldname.buf,f+2,fieldlen-1); - fieldname.len = fieldlen-2; + if ((f = strstr(p+1, "->")) != NULL && *(f+2) != '\0') { + fieldlen = sdslen(spat)-(f-spat)-2; + fieldobj = createStringObject(f+2,fieldlen); } else { fieldlen = 0; } + /* Perform the '*' substitution. */ prefixlen = p-spat; sublen = sdslen(ssub); - postfixlen = sdslen(spat)-(prefixlen+1)-fieldlen; - memcpy(keyname.buf,spat,prefixlen); - memcpy(keyname.buf+prefixlen,ssub,sublen); - memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen); - keyname.buf[prefixlen+sublen+postfixlen] = '\0'; - keyname.len = prefixlen+sublen+postfixlen; - decrRefCount(subst); + postfixlen = sdslen(spat)-(prefixlen+1)-(fieldlen ? fieldlen+2 : 0); + keyobj = createStringObject(NULL,prefixlen+sublen+postfixlen); + k = keyobj->ptr; + memcpy(k,spat,prefixlen); + memcpy(k+prefixlen,ssub,sublen); + memcpy(k+prefixlen+sublen,p+1,postfixlen); + decrRefCount(subst); /* Incremented by decodeObject() */ /* Lookup substituted key */ - initStaticStringObject(keyobj,((char*)&keyname)+(sizeof(struct sdshdr))); - o = lookupKeyRead(db,&keyobj); - if (o == NULL) return NULL; + o = lookupKeyRead(db,keyobj); + if (o == NULL) goto noobj; if (fieldlen > 0) { - if (o->type != REDIS_HASH || fieldname.len < 1) return NULL; + if (o->type != REDIS_HASH) goto noobj; /* Retrieve value from hash by the field name. This operation * already increases the refcount of the returned object. */ - initStaticStringObject(fieldobj,((char*)&fieldname)+(sizeof(struct sdshdr))); - o = hashTypeGetObject(o, &fieldobj); + o = hashTypeGetObject(o, fieldobj); } else { - if (o->type != REDIS_STRING) return NULL; + if (o->type != REDIS_STRING) goto noobj; /* Every object that this function returns needs to have its refcount * increased. sortCommand decreases it again. */ incrRefCount(o); } - + decrRefCount(keyobj); + if (fieldlen) decrRefCount(fieldobj); return o; + +noobj: + decrRefCount(keyobj); + if (fieldlen) decrRefCount(fieldobj); + return NULL; } /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with -- 2.45.2