X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/5b4bff9c175a0189672d95eb953df5704c891d0c..c8852ebf191387779275a70b5540f30976b9ef1f:/src/t_set.c?ds=inline diff --git a/src/t_set.c b/src/t_set.c index 94b97633..46a0c6ee 100644 --- a/src/t_set.c +++ b/src/t_set.c @@ -1,15 +1,259 @@ +/* + * Copyright (c) 2009-2012, Salvatore Sanfilippo + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + #include "redis.h" /*----------------------------------------------------------------------------- * Set Commands *----------------------------------------------------------------------------*/ +void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum, robj *dstkey, int op); + +/* Factory method to return a set that *can* hold "value". When the object has + * an integer-encodable value, an intset will be returned. Otherwise a regular + * hash table. */ +robj *setTypeCreate(robj *value) { + if (isObjectRepresentableAsLongLong(value,NULL) == REDIS_OK) + return createIntsetObject(); + return createSetObject(); +} + +int setTypeAdd(robj *subject, robj *value) { + long long llval; + if (subject->encoding == REDIS_ENCODING_HT) { + if (dictAdd(subject->ptr,value,NULL) == DICT_OK) { + incrRefCount(value); + return 1; + } + } else if (subject->encoding == REDIS_ENCODING_INTSET) { + if (isObjectRepresentableAsLongLong(value,&llval) == REDIS_OK) { + uint8_t success = 0; + subject->ptr = intsetAdd(subject->ptr,llval,&success); + if (success) { + /* Convert to regular set when the intset contains + * too many entries. */ + if (intsetLen(subject->ptr) > server.set_max_intset_entries) + setTypeConvert(subject,REDIS_ENCODING_HT); + return 1; + } + } else { + /* Failed to get integer from object, convert to regular set. */ + setTypeConvert(subject,REDIS_ENCODING_HT); + + /* The set *was* an intset and this value is not integer + * encodable, so dictAdd should always work. */ + redisAssertWithInfo(NULL,value,dictAdd(subject->ptr,value,NULL) == DICT_OK); + incrRefCount(value); + return 1; + } + } else { + redisPanic("Unknown set encoding"); + } + return 0; +} + +int setTypeRemove(robj *setobj, robj *value) { + long long llval; + if (setobj->encoding == REDIS_ENCODING_HT) { + if (dictDelete(setobj->ptr,value) == DICT_OK) { + if (htNeedsResize(setobj->ptr)) dictResize(setobj->ptr); + return 1; + } + } else if (setobj->encoding == REDIS_ENCODING_INTSET) { + if (isObjectRepresentableAsLongLong(value,&llval) == REDIS_OK) { + int success; + setobj->ptr = intsetRemove(setobj->ptr,llval,&success); + if (success) return 1; + } + } else { + redisPanic("Unknown set encoding"); + } + return 0; +} + +int setTypeIsMember(robj *subject, robj *value) { + long long llval; + if (subject->encoding == REDIS_ENCODING_HT) { + return dictFind((dict*)subject->ptr,value) != NULL; + } else if (subject->encoding == REDIS_ENCODING_INTSET) { + if (isObjectRepresentableAsLongLong(value,&llval) == REDIS_OK) { + return intsetFind((intset*)subject->ptr,llval); + } + } else { + redisPanic("Unknown set encoding"); + } + return 0; +} + +setTypeIterator *setTypeInitIterator(robj *subject) { + setTypeIterator *si = zmalloc(sizeof(setTypeIterator)); + si->subject = subject; + si->encoding = subject->encoding; + if (si->encoding == REDIS_ENCODING_HT) { + si->di = dictGetIterator(subject->ptr); + } else if (si->encoding == REDIS_ENCODING_INTSET) { + si->ii = 0; + } else { + redisPanic("Unknown set encoding"); + } + return si; +} + +void setTypeReleaseIterator(setTypeIterator *si) { + if (si->encoding == REDIS_ENCODING_HT) + dictReleaseIterator(si->di); + zfree(si); +} + +/* Move to the next entry in the set. Returns the object at the current + * position. + * + * Since set elements can be internally be stored as redis objects or + * simple arrays of integers, setTypeNext returns the encoding of the + * set object you are iterating, and will populate the appropriate pointer + * (eobj) or (llobj) accordingly. + * + * When there are no longer elements -1 is returned. + * Returned objects ref count is not incremented, so this function is + * copy on write friendly. */ +int setTypeNext(setTypeIterator *si, robj **objele, int64_t *llele) { + if (si->encoding == REDIS_ENCODING_HT) { + dictEntry *de = dictNext(si->di); + if (de == NULL) return -1; + *objele = dictGetKey(de); + } else if (si->encoding == REDIS_ENCODING_INTSET) { + if (!intsetGet(si->subject->ptr,si->ii++,llele)) + return -1; + } + return si->encoding; +} + +/* The not copy on write friendly version but easy to use version + * of setTypeNext() is setTypeNextObject(), returning new objects + * or incrementing the ref count of returned objects. So if you don't + * retain a pointer to this object you should call decrRefCount() against it. + * + * This function is the way to go for write operations where COW is not + * an issue as the result will be anyway of incrementing the ref count. */ +robj *setTypeNextObject(setTypeIterator *si) { + int64_t intele; + robj *objele; + int encoding; + + encoding = setTypeNext(si,&objele,&intele); + switch(encoding) { + case -1: return NULL; + case REDIS_ENCODING_INTSET: + return createStringObjectFromLongLong(intele); + case REDIS_ENCODING_HT: + incrRefCount(objele); + return objele; + default: + redisPanic("Unsupported encoding"); + } + return NULL; /* just to suppress warnings */ +} + +/* Return random element from a non empty set. + * The returned element can be a int64_t value if the set is encoded + * as an "intset" blob of integers, or a redis object if the set + * is a regular set. + * + * The caller provides both pointers to be populated with the right + * object. The return value of the function is the object->encoding + * field of the object and is used by the caller to check if the + * int64_t pointer or the redis object pointere was populated. + * + * When an object is returned (the set was a real set) the ref count + * of the object is not incremented so this function can be considered + * copy on write friendly. */ +int setTypeRandomElement(robj *setobj, robj **objele, int64_t *llele) { + if (setobj->encoding == REDIS_ENCODING_HT) { + dictEntry *de = dictGetRandomKey(setobj->ptr); + *objele = dictGetKey(de); + } else if (setobj->encoding == REDIS_ENCODING_INTSET) { + *llele = intsetRandom(setobj->ptr); + } else { + redisPanic("Unknown set encoding"); + } + return setobj->encoding; +} + +unsigned long setTypeSize(robj *subject) { + if (subject->encoding == REDIS_ENCODING_HT) { + return dictSize((dict*)subject->ptr); + } else if (subject->encoding == REDIS_ENCODING_INTSET) { + return intsetLen((intset*)subject->ptr); + } else { + redisPanic("Unknown set encoding"); + } +} + +/* Convert the set to specified encoding. The resulting dict (when converting + * to a hash table) is presized to hold the number of elements in the original + * set. */ +void setTypeConvert(robj *setobj, int enc) { + setTypeIterator *si; + redisAssertWithInfo(NULL,setobj,setobj->type == REDIS_SET && + setobj->encoding == REDIS_ENCODING_INTSET); + + if (enc == REDIS_ENCODING_HT) { + int64_t intele; + dict *d = dictCreate(&setDictType,NULL); + robj *element; + + /* Presize the dict to avoid rehashing */ + dictExpand(d,intsetLen(setobj->ptr)); + + /* To add the elements we extract integers and create redis objects */ + si = setTypeInitIterator(setobj); + while (setTypeNext(si,NULL,&intele) != -1) { + element = createStringObjectFromLongLong(intele); + redisAssertWithInfo(NULL,element,dictAdd(d,element,NULL) == DICT_OK); + } + setTypeReleaseIterator(si); + + setobj->encoding = REDIS_ENCODING_HT; + zfree(setobj->ptr); + setobj->ptr = d; + } else { + redisPanic("Unsupported set conversion"); + } +} + void saddCommand(redisClient *c) { robj *set; + int j, added = 0; set = lookupKeyWrite(c->db,c->argv[1]); if (set == NULL) { - set = createSetObject(); + set = setTypeCreate(c->argv[2]); dbAdd(c->db,c->argv[1],set); } else { if (set->type != REDIS_SET) { @@ -17,68 +261,82 @@ void saddCommand(redisClient *c) { return; } } - if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) { - incrRefCount(c->argv[2]); - touchWatchedKey(c->db,c->argv[1]); - server.dirty++; - addReply(c,shared.cone); - } else { - addReply(c,shared.czero); + + for (j = 2; j < c->argc; j++) { + c->argv[j] = tryObjectEncoding(c->argv[j]); + if (setTypeAdd(set,c->argv[j])) added++; } + if (added) signalModifiedKey(c->db,c->argv[1]); + server.dirty += added; + addReplyLongLong(c,added); } void sremCommand(redisClient *c) { robj *set; + int j, deleted = 0; if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,set,REDIS_SET)) return; - if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) { - server.dirty++; - touchWatchedKey(c->db,c->argv[1]); - if (htNeedsResize(set->ptr)) dictResize(set->ptr); - if (dictSize((dict*)set->ptr) == 0) dbDelete(c->db,c->argv[1]); - addReply(c,shared.cone); - } else { - addReply(c,shared.czero); + for (j = 2; j < c->argc; j++) { + if (setTypeRemove(set,c->argv[j])) { + deleted++; + if (setTypeSize(set) == 0) { + dbDelete(c->db,c->argv[1]); + break; + } + } + } + if (deleted) { + signalModifiedKey(c->db,c->argv[1]); + server.dirty += deleted; } + addReplyLongLong(c,deleted); } void smoveCommand(redisClient *c) { - robj *srcset, *dstset; - + robj *srcset, *dstset, *ele; srcset = lookupKeyWrite(c->db,c->argv[1]); dstset = lookupKeyWrite(c->db,c->argv[2]); + ele = c->argv[3] = tryObjectEncoding(c->argv[3]); - /* If the source key does not exist return 0, if it's of the wrong type - * raise an error */ - if (srcset == NULL || srcset->type != REDIS_SET) { - addReply(c, srcset ? shared.wrongtypeerr : shared.czero); + /* If the source key does not exist return 0 */ + if (srcset == NULL) { + addReply(c,shared.czero); return; } - /* Error if the destination key is not a set as well */ - if (dstset && dstset->type != REDIS_SET) { - addReply(c,shared.wrongtypeerr); + + /* If the source key has the wrong type, or the destination key + * is set and has the wrong type, return with an error. */ + if (checkType(c,srcset,REDIS_SET) || + (dstset && checkType(c,dstset,REDIS_SET))) return; + + /* If srcset and dstset are equal, SMOVE is a no-op */ + if (srcset == dstset) { + addReply(c,shared.cone); return; } - /* Remove the element from the source set */ - if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) { - /* Key not found in the src set! return zero */ + + /* If the element cannot be removed from the src set, return 0. */ + if (!setTypeRemove(srcset,ele)) { addReply(c,shared.czero); return; } - if (dictSize((dict*)srcset->ptr) == 0 && srcset != dstset) - dbDelete(c->db,c->argv[1]); - touchWatchedKey(c->db,c->argv[1]); - touchWatchedKey(c->db,c->argv[2]); + + /* Remove the src set from the database when empty */ + if (setTypeSize(srcset) == 0) dbDelete(c->db,c->argv[1]); + signalModifiedKey(c->db,c->argv[1]); + signalModifiedKey(c->db,c->argv[2]); server.dirty++; - /* Add the element to the destination set */ + + /* Create the destination set when it doesn't exist */ if (!dstset) { - dstset = createSetObject(); + dstset = setTypeCreate(ele); dbAdd(c->db,c->argv[2],dstset); } - if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK) - incrRefCount(c->argv[3]); + + /* An extra key has changed when ele was successfully added to dstset */ + if (setTypeAdd(dstset,ele)) server.dirty++; addReply(c,shared.cone); } @@ -88,7 +346,8 @@ void sismemberCommand(redisClient *c) { if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,set,REDIS_SET)) return; - if (dictFind(set->ptr,c->argv[2])) + c->argv[2] = tryObjectEncoding(c->argv[2]); + if (setTypeIsMember(set,c->argv[2])) addReply(c,shared.cone); else addReply(c,shared.czero); @@ -96,78 +355,234 @@ void sismemberCommand(redisClient *c) { void scardCommand(redisClient *c) { robj *o; - dict *s; if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,o,REDIS_SET)) return; - s = o->ptr; - addReplyUlong(c,dictSize(s)); + addReplyLongLong(c,setTypeSize(o)); } void spopCommand(redisClient *c) { - robj *set; - dictEntry *de; + robj *set, *ele, *aux; + int64_t llele; + int encoding; if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL || checkType(c,set,REDIS_SET)) return; - de = dictGetRandomKey(set->ptr); - if (de == NULL) { - addReply(c,shared.nullbulk); + encoding = setTypeRandomElement(set,&ele,&llele); + if (encoding == REDIS_ENCODING_INTSET) { + ele = createStringObjectFromLongLong(llele); + set->ptr = intsetRemove(set->ptr,llele,NULL); } else { - robj *ele = dictGetEntryKey(de); + incrRefCount(ele); + setTypeRemove(set,ele); + } - addReplyBulk(c,ele); - dictDelete(set->ptr,ele); - if (htNeedsResize(set->ptr)) dictResize(set->ptr); - if (dictSize((dict*)set->ptr) == 0) dbDelete(c->db,c->argv[1]); - touchWatchedKey(c->db,c->argv[1]); - server.dirty++; + /* Replicate/AOF this command as an SREM operation */ + aux = createStringObject("SREM",4); + rewriteClientCommandVector(c,3,aux,c->argv[1],ele); + decrRefCount(ele); + decrRefCount(aux); + + addReplyBulk(c,ele); + if (setTypeSize(set) == 0) dbDelete(c->db,c->argv[1]); + signalModifiedKey(c->db,c->argv[1]); + server.dirty++; +} + +/* handle the "SRANDMEMBER key " variant. The normal version of the + * command is handled by the srandmemberCommand() function itself. */ + +/* How many times bigger should be the set compared to the requested size + * for us to don't use the "remove elements" strategy? Read later in the + * implementation for more info. */ +#define SRANDMEMBER_SUB_STRATEGY_MUL 3 + +void srandmemberWithCountCommand(redisClient *c) { + long l; + unsigned long count, size; + int uniq = 1; + robj *set, *ele; + int64_t llele; + int encoding; + + dict *d; + + if (getLongFromObjectOrReply(c,c->argv[2],&l,NULL) != REDIS_OK) return; + if (l >= 0) { + count = (unsigned) l; + } else { + /* A negative count means: return the same elements multiple times + * (i.e. don't remove the extracted element after every extraction). */ + count = -l; + uniq = 0; + } + + if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) + == NULL || checkType(c,set,REDIS_SET)) return; + size = setTypeSize(set); + + /* If count is zero, serve it ASAP to avoid special cases later. */ + if (count == 0) { + addReply(c,shared.emptymultibulk); + return; + } + + /* CASE 1: The count was negative, so the extraction method is just: + * "return N random elements" sampling the whole set every time. + * This case is trivial and can be served without auxiliary data + * structures. */ + if (!uniq) { + addReplyMultiBulkLen(c,count); + while(count--) { + encoding = setTypeRandomElement(set,&ele,&llele); + if (encoding == REDIS_ENCODING_INTSET) { + addReplyBulkLongLong(c,llele); + } else { + addReplyBulk(c,ele); + } + } + return; + } + + /* CASE 2: + * The number of requested elements is greater than the number of + * elements inside the set: simply return the whole set. */ + if (count >= size) { + sunionDiffGenericCommand(c,c->argv,c->argc-1,NULL,REDIS_OP_UNION); + return; + } + + /* For CASE 3 and CASE 4 we need an auxiliary dictionary. */ + d = dictCreate(&setDictType,NULL); + + /* CASE 3: + * The number of elements inside the set is not greater than + * SRANDMEMBER_SUB_STRATEGY_MUL times the number of requested elements. + * In this case we create a set from scratch with all the elements, and + * subtract random elements to reach the requested number of elements. + * + * This is done because if the number of requsted elements is just + * a bit less than the number of elements in the set, the natural approach + * used into CASE 3 is highly inefficient. */ + if (count*SRANDMEMBER_SUB_STRATEGY_MUL > size) { + setTypeIterator *si; + + /* Add all the elements into the temporary dictionary. */ + si = setTypeInitIterator(set); + while((encoding = setTypeNext(si,&ele,&llele)) != -1) { + int retval; + + if (encoding == REDIS_ENCODING_INTSET) { + retval = dictAdd(d,createStringObjectFromLongLong(llele),NULL); + } else if (ele->encoding == REDIS_ENCODING_RAW) { + retval = dictAdd(d,dupStringObject(ele),NULL); + } else if (ele->encoding == REDIS_ENCODING_INT) { + retval = dictAdd(d, + createStringObjectFromLongLong((long)ele->ptr),NULL); + } + redisAssert(retval == DICT_OK); + } + setTypeReleaseIterator(si); + redisAssert(dictSize(d) == size); + + /* Remove random elements to reach the right count. */ + while(size > count) { + dictEntry *de; + + de = dictGetRandomKey(d); + dictDelete(d,dictGetKey(de)); + size--; + } + } + + /* CASE 4: We have a big set compared to the requested number of elements. + * In this case we can simply get random elements from the set and add + * to the temporary set, trying to eventually get enough unique elements + * to reach the specified count. */ + else { + unsigned long added = 0; + + while(added < count) { + encoding = setTypeRandomElement(set,&ele,&llele); + if (encoding == REDIS_ENCODING_INTSET) { + ele = createStringObjectFromLongLong(llele); + } else if (ele->encoding == REDIS_ENCODING_RAW) { + ele = dupStringObject(ele); + } else if (ele->encoding == REDIS_ENCODING_INT) { + ele = createStringObjectFromLongLong((long)ele->ptr); + } + /* Try to add the object to the dictionary. If it already exists + * free it, otherwise increment the number of objects we have + * in the result dictionary. */ + if (dictAdd(d,ele,NULL) == DICT_OK) + added++; + else + decrRefCount(ele); + } + } + + /* CASE 3 & 4: send the result to the user. */ + { + dictIterator *di; + dictEntry *de; + + addReplyMultiBulkLen(c,count); + di = dictGetIterator(d); + while((de = dictNext(di)) != NULL) + addReplyBulk(c,dictGetKey(de)); + dictReleaseIterator(di); + dictRelease(d); } } void srandmemberCommand(redisClient *c) { - robj *set; - dictEntry *de; + robj *set, *ele; + int64_t llele; + int encoding; + + if (c->argc == 3) { + srandmemberWithCountCommand(c); + return; + } else if (c->argc > 3) { + addReply(c,shared.syntaxerr); + return; + } if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL || checkType(c,set,REDIS_SET)) return; - de = dictGetRandomKey(set->ptr); - if (de == NULL) { - addReply(c,shared.nullbulk); + encoding = setTypeRandomElement(set,&ele,&llele); + if (encoding == REDIS_ENCODING_INTSET) { + addReplyBulkLongLong(c,llele); } else { - robj *ele = dictGetEntryKey(de); - addReplyBulk(c,ele); } } int qsortCompareSetsByCardinality(const void *s1, const void *s2) { - dict **d1 = (void*) s1, **d2 = (void*) s2; - - return dictSize(*d1)-dictSize(*d2); + return setTypeSize(*(robj**)s1)-setTypeSize(*(robj**)s2); } -void sinterGenericCommand(redisClient *c, robj **setskeys, unsigned long setsnum, robj *dstkey) { - dict **dv = zmalloc(sizeof(dict*)*setsnum); - dictIterator *di; - dictEntry *de; - robj *lenobj = NULL, *dstset = NULL; +void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum, robj *dstkey) { + robj **sets = zmalloc(sizeof(robj*)*setnum); + setTypeIterator *si; + robj *eleobj, *dstset = NULL; + int64_t intobj; + void *replylen = NULL; unsigned long j, cardinality = 0; + int encoding; - for (j = 0; j < setsnum; j++) { - robj *setobj; - - setobj = dstkey ? - lookupKeyWrite(c->db,setskeys[j]) : - lookupKeyRead(c->db,setskeys[j]); + for (j = 0; j < setnum; j++) { + robj *setobj = dstkey ? + lookupKeyWrite(c->db,setkeys[j]) : + lookupKeyRead(c->db,setkeys[j]); if (!setobj) { - zfree(dv); + zfree(sets); if (dstkey) { if (dbDelete(c->db,dstkey)) { - touchWatchedKey(c->db,dstkey); + signalModifiedKey(c->db,dstkey); server.dirty++; } addReply(c,shared.czero); @@ -176,16 +591,15 @@ void sinterGenericCommand(redisClient *c, robj **setskeys, unsigned long setsnum } return; } - if (setobj->type != REDIS_SET) { - zfree(dv); - addReply(c,shared.wrongtypeerr); + if (checkType(c,setobj,REDIS_SET)) { + zfree(sets); return; } - dv[j] = setobj->ptr; + sets[j] = setobj; } /* Sort sets from the smallest to largest, this will improve our * algorithm's performace */ - qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality); + qsort(sets,setnum,sizeof(robj*),qsortCompareSetsByCardinality); /* The first thing we should output is the total number of elements... * since this is a multi-bulk write, but at this stage we don't know @@ -193,55 +607,92 @@ void sinterGenericCommand(redisClient *c, robj **setskeys, unsigned long setsnum * to the output list and save the pointer to later modify it with the * right length */ if (!dstkey) { - lenobj = createObject(REDIS_STRING,NULL); - addReply(c,lenobj); - decrRefCount(lenobj); + replylen = addDeferredMultiBulkLength(c); } else { /* If we have a target key where to store the resulting set * create this key with an empty set inside */ - dstset = createSetObject(); + dstset = createIntsetObject(); } /* Iterate all the elements of the first (smallest) set, and test * the element against all the other sets, if at least one set does * not include the element it is discarded */ - di = dictGetIterator(dv[0]); - - while((de = dictNext(di)) != NULL) { - robj *ele; + si = setTypeInitIterator(sets[0]); + while((encoding = setTypeNext(si,&eleobj,&intobj)) != -1) { + for (j = 1; j < setnum; j++) { + if (sets[j] == sets[0]) continue; + if (encoding == REDIS_ENCODING_INTSET) { + /* intset with intset is simple... and fast */ + if (sets[j]->encoding == REDIS_ENCODING_INTSET && + !intsetFind((intset*)sets[j]->ptr,intobj)) + { + break; + /* in order to compare an integer with an object we + * have to use the generic function, creating an object + * for this */ + } else if (sets[j]->encoding == REDIS_ENCODING_HT) { + eleobj = createStringObjectFromLongLong(intobj); + if (!setTypeIsMember(sets[j],eleobj)) { + decrRefCount(eleobj); + break; + } + decrRefCount(eleobj); + } + } else if (encoding == REDIS_ENCODING_HT) { + /* Optimization... if the source object is integer + * encoded AND the target set is an intset, we can get + * a much faster path. */ + if (eleobj->encoding == REDIS_ENCODING_INT && + sets[j]->encoding == REDIS_ENCODING_INTSET && + !intsetFind((intset*)sets[j]->ptr,(long)eleobj->ptr)) + { + break; + /* else... object to object check is easy as we use the + * type agnostic API here. */ + } else if (!setTypeIsMember(sets[j],eleobj)) { + break; + } + } + } - for (j = 1; j < setsnum; j++) - if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break; - if (j != setsnum) - continue; /* at least one set does not contain the member */ - ele = dictGetEntryKey(de); - if (!dstkey) { - addReplyBulk(c,ele); - cardinality++; - } else { - dictAdd(dstset->ptr,ele,NULL); - incrRefCount(ele); + /* Only take action when all sets contain the member */ + if (j == setnum) { + if (!dstkey) { + if (encoding == REDIS_ENCODING_HT) + addReplyBulk(c,eleobj); + else + addReplyBulkLongLong(c,intobj); + cardinality++; + } else { + if (encoding == REDIS_ENCODING_INTSET) { + eleobj = createStringObjectFromLongLong(intobj); + setTypeAdd(dstset,eleobj); + decrRefCount(eleobj); + } else { + setTypeAdd(dstset,eleobj); + } + } } } - dictReleaseIterator(di); + setTypeReleaseIterator(si); if (dstkey) { /* Store the resulting set into the target, if the intersection * is not an empty set. */ dbDelete(c->db,dstkey); - if (dictSize((dict*)dstset->ptr) > 0) { + if (setTypeSize(dstset) > 0) { dbAdd(c->db,dstkey,dstset); - addReplyLongLong(c,dictSize((dict*)dstset->ptr)); + addReplyLongLong(c,setTypeSize(dstset)); } else { decrRefCount(dstset); addReply(c,shared.czero); } - touchWatchedKey(c->db,dstkey); + signalModifiedKey(c->db,dstkey); server.dirty++; } else { - lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",cardinality); + setDeferredMultiBulkLength(c,replylen,cardinality); } - zfree(dv); + zfree(sets); } void sinterCommand(redisClient *c) { @@ -252,93 +703,86 @@ void sinterstoreCommand(redisClient *c) { sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]); } -void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) { - dict **dv = zmalloc(sizeof(dict*)*setsnum); - dictIterator *di; - dictEntry *de; - robj *dstset = NULL; - int j, cardinality = 0; +#define REDIS_OP_UNION 0 +#define REDIS_OP_DIFF 1 +#define REDIS_OP_INTER 2 - for (j = 0; j < setsnum; j++) { - robj *setobj; +void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum, robj *dstkey, int op) { + robj **sets = zmalloc(sizeof(robj*)*setnum); + setTypeIterator *si; + robj *ele, *dstset = NULL; + int j, cardinality = 0; - setobj = dstkey ? - lookupKeyWrite(c->db,setskeys[j]) : - lookupKeyRead(c->db,setskeys[j]); + for (j = 0; j < setnum; j++) { + robj *setobj = dstkey ? + lookupKeyWrite(c->db,setkeys[j]) : + lookupKeyRead(c->db,setkeys[j]); if (!setobj) { - dv[j] = NULL; + sets[j] = NULL; continue; } - if (setobj->type != REDIS_SET) { - zfree(dv); - addReply(c,shared.wrongtypeerr); + if (checkType(c,setobj,REDIS_SET)) { + zfree(sets); return; } - dv[j] = setobj->ptr; + sets[j] = setobj; } /* We need a temp set object to store our union. If the dstkey * is not NULL (that is, we are inside an SUNIONSTORE operation) then * this set object will be the resulting object to set into the target key*/ - dstset = createSetObject(); + dstset = createIntsetObject(); /* Iterate all the elements of all the sets, add every element a single * time to the result set */ - for (j = 0; j < setsnum; j++) { - if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */ - if (!dv[j]) continue; /* non existing keys are like empty sets */ - - di = dictGetIterator(dv[j]); + for (j = 0; j < setnum; j++) { + if (op == REDIS_OP_DIFF && j == 0 && !sets[j]) break; /* result set is empty */ + if (!sets[j]) continue; /* non existing keys are like empty sets */ - while((de = dictNext(di)) != NULL) { - robj *ele; - - /* dictAdd will not add the same element multiple times */ - ele = dictGetEntryKey(de); + si = setTypeInitIterator(sets[j]); + while((ele = setTypeNextObject(si)) != NULL) { if (op == REDIS_OP_UNION || j == 0) { - if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) { - incrRefCount(ele); + if (setTypeAdd(dstset,ele)) { cardinality++; } } else if (op == REDIS_OP_DIFF) { - if (dictDelete(dstset->ptr,ele) == DICT_OK) { + if (setTypeRemove(dstset,ele)) { cardinality--; } } + decrRefCount(ele); } - dictReleaseIterator(di); + setTypeReleaseIterator(si); - /* result set is empty? Exit asap. */ + /* Exit when result set is empty. */ if (op == REDIS_OP_DIFF && cardinality == 0) break; } /* Output the content of the resulting set, if not in STORE mode */ if (!dstkey) { - addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality)); - di = dictGetIterator(dstset->ptr); - while((de = dictNext(di)) != NULL) { - robj *ele; - - ele = dictGetEntryKey(de); + addReplyMultiBulkLen(c,cardinality); + si = setTypeInitIterator(dstset); + while((ele = setTypeNextObject(si)) != NULL) { addReplyBulk(c,ele); + decrRefCount(ele); } - dictReleaseIterator(di); + setTypeReleaseIterator(si); decrRefCount(dstset); } else { /* If we have a target key where to store the resulting set * create this key with the result set inside */ dbDelete(c->db,dstkey); - if (dictSize((dict*)dstset->ptr) > 0) { + if (setTypeSize(dstset) > 0) { dbAdd(c->db,dstkey,dstset); - addReplyLongLong(c,dictSize((dict*)dstset->ptr)); + addReplyLongLong(c,setTypeSize(dstset)); } else { decrRefCount(dstset); addReply(c,shared.czero); } - touchWatchedKey(c->db,dstkey); + signalModifiedKey(c->db,dstkey); server.dirty++; } - zfree(dv); + zfree(sets); } void sunionCommand(redisClient *c) {