X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/f7f12a606c39fcb09a203faaaa12c49882409d8f..c23c657cdd8696c66295962947b85c793c5d7b93:/src/t_set.c diff --git a/src/t_set.c b/src/t_set.c index 68e13227..fada1095 100644 --- a/src/t_set.c +++ b/src/t_set.c @@ -4,6 +4,8 @@ * Set Commands *----------------------------------------------------------------------------*/ +void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum, robj *dstkey, int op); + /* Factory method to return a set that *can* hold "value". When the object has * an integer-encodable value, an intset will be returned. Otherwise a regular * hash table. */ @@ -37,7 +39,7 @@ int setTypeAdd(robj *subject, robj *value) { /* The set *was* an intset and this value is not integer * encodable, so dictAdd should always work. */ - redisAssert(dictAdd(subject->ptr,value,NULL) == DICT_OK); + redisAssertWithInfo(NULL,value,dictAdd(subject->ptr,value,NULL) == DICT_OK); incrRefCount(value); return 1; } @@ -47,17 +49,17 @@ int setTypeAdd(robj *subject, robj *value) { return 0; } -int setTypeRemove(robj *subject, robj *value) { +int setTypeRemove(robj *setobj, robj *value) { long long llval; - if (subject->encoding == REDIS_ENCODING_HT) { - if (dictDelete(subject->ptr,value) == DICT_OK) { - if (htNeedsResize(subject->ptr)) dictResize(subject->ptr); + if (setobj->encoding == REDIS_ENCODING_HT) { + if (dictDelete(setobj->ptr,value) == DICT_OK) { + if (htNeedsResize(setobj->ptr)) dictResize(setobj->ptr); return 1; } - } else if (subject->encoding == REDIS_ENCODING_INTSET) { + } else if (setobj->encoding == REDIS_ENCODING_INTSET) { if (isObjectRepresentableAsLongLong(value,&llval) == REDIS_OK) { - uint8_t success; - subject->ptr = intsetRemove(subject->ptr,llval,&success); + int success; + setobj->ptr = intsetRemove(setobj->ptr,llval,&success); if (success) return 1; } } else { @@ -101,40 +103,77 @@ void setTypeReleaseIterator(setTypeIterator *si) { } /* Move to the next entry in the set. Returns the object at the current - * position, or NULL when the end is reached. This object will have its - * refcount incremented, so the caller needs to take care of this. */ -robj *setTypeNext(setTypeIterator *si) { - robj *ret = NULL; + * position. + * + * Since set elements can be internally be stored as redis objects or + * simple arrays of integers, setTypeNext returns the encoding of the + * set object you are iterating, and will populate the appropriate pointer + * (eobj) or (llobj) accordingly. + * + * When there are no longer elements -1 is returned. + * Returned objects ref count is not incremented, so this function is + * copy on write friendly. */ +int setTypeNext(setTypeIterator *si, robj **objele, int64_t *llele) { if (si->encoding == REDIS_ENCODING_HT) { dictEntry *de = dictNext(si->di); - if (de != NULL) { - ret = dictGetEntryKey(de); - incrRefCount(ret); - } + if (de == NULL) return -1; + *objele = dictGetKey(de); } else if (si->encoding == REDIS_ENCODING_INTSET) { - int64_t llval; - if (intsetGet(si->subject->ptr,si->ii++,&llval)) - ret = createStringObjectFromLongLong(llval); + if (!intsetGet(si->subject->ptr,si->ii++,llele)) + return -1; } - return ret; + return si->encoding; } +/* The not copy on write friendly version but easy to use version + * of setTypeNext() is setTypeNextObject(), returning new objects + * or incrementing the ref count of returned objects. So if you don't + * retain a pointer to this object you should call decrRefCount() against it. + * + * This function is the way to go for write operations where COW is not + * an issue as the result will be anyway of incrementing the ref count. */ +robj *setTypeNextObject(setTypeIterator *si) { + int64_t intele; + robj *objele; + int encoding; + + encoding = setTypeNext(si,&objele,&intele); + switch(encoding) { + case -1: return NULL; + case REDIS_ENCODING_INTSET: + return createStringObjectFromLongLong(intele); + case REDIS_ENCODING_HT: + incrRefCount(objele); + return objele; + default: + redisPanic("Unsupported encoding"); + } + return NULL; /* just to suppress warnings */ +} -/* Return random element from set. The returned object will always have - * an incremented refcount. */ -robj *setTypeRandomElement(robj *subject) { - robj *ret = NULL; - if (subject->encoding == REDIS_ENCODING_HT) { - dictEntry *de = dictGetRandomKey(subject->ptr); - ret = dictGetEntryKey(de); - incrRefCount(ret); - } else if (subject->encoding == REDIS_ENCODING_INTSET) { - long long llval = intsetRandom(subject->ptr); - ret = createStringObjectFromLongLong(llval); +/* Return random element from a non empty set. + * The returned element can be a int64_t value if the set is encoded + * as an "intset" blob of integers, or a redis object if the set + * is a regular set. + * + * The caller provides both pointers to be populated with the right + * object. The return value of the function is the object->encoding + * field of the object and is used by the caller to check if the + * int64_t pointer or the redis object pointere was populated. + * + * When an object is returned (the set was a real set) the ref count + * of the object is not incremented so this function can be considered + * copy on write friendly. */ +int setTypeRandomElement(robj *setobj, robj **objele, int64_t *llele) { + if (setobj->encoding == REDIS_ENCODING_HT) { + dictEntry *de = dictGetRandomKey(setobj->ptr); + *objele = dictGetKey(de); + } else if (setobj->encoding == REDIS_ENCODING_INTSET) { + *llele = intsetRandom(setobj->ptr); } else { redisPanic("Unknown set encoding"); } - return ret; + return setobj->encoding; } unsigned long setTypeSize(robj *subject) { @@ -148,27 +187,32 @@ unsigned long setTypeSize(robj *subject) { } /* Convert the set to specified encoding. The resulting dict (when converting - * to a hashtable) is presized to hold the number of elements in the original + * to a hash table) is presized to hold the number of elements in the original * set. */ -void setTypeConvert(robj *subject, int enc) { +void setTypeConvert(robj *setobj, int enc) { setTypeIterator *si; - robj *element; - redisAssert(subject->type == REDIS_SET); + redisAssertWithInfo(NULL,setobj,setobj->type == REDIS_SET && + setobj->encoding == REDIS_ENCODING_INTSET); if (enc == REDIS_ENCODING_HT) { + int64_t intele; dict *d = dictCreate(&setDictType,NULL); + robj *element; + /* Presize the dict to avoid rehashing */ - dictExpand(d,intsetLen(subject->ptr)); + dictExpand(d,intsetLen(setobj->ptr)); - /* setTypeGet returns a robj with incremented refcount */ - si = setTypeInitIterator(subject); - while ((element = setTypeNext(si)) != NULL) - redisAssert(dictAdd(d,element,NULL) == DICT_OK); + /* To add the elements we extract integers and create redis objects */ + si = setTypeInitIterator(setobj); + while (setTypeNext(si,NULL,&intele) != -1) { + element = createStringObjectFromLongLong(intele); + redisAssertWithInfo(NULL,element,dictAdd(d,element,NULL) == DICT_OK); + } setTypeReleaseIterator(si); - subject->encoding = REDIS_ENCODING_HT; - zfree(subject->ptr); - subject->ptr = d; + setobj->encoding = REDIS_ENCODING_HT; + zfree(setobj->ptr); + setobj->ptr = d; } else { redisPanic("Unsupported set conversion"); } @@ -176,6 +220,7 @@ void setTypeConvert(robj *subject, int enc) { void saddCommand(redisClient *c) { robj *set; + int j, added = 0; set = lookupKeyWrite(c->db,c->argv[1]); if (set == NULL) { @@ -187,36 +232,44 @@ void saddCommand(redisClient *c) { return; } } - if (setTypeAdd(set,c->argv[2])) { - touchWatchedKey(c->db,c->argv[1]); - server.dirty++; - addReply(c,shared.cone); - } else { - addReply(c,shared.czero); + + for (j = 2; j < c->argc; j++) { + c->argv[j] = tryObjectEncoding(c->argv[j]); + if (setTypeAdd(set,c->argv[j])) added++; } + if (added) signalModifiedKey(c->db,c->argv[1]); + server.dirty += added; + addReplyLongLong(c,added); } void sremCommand(redisClient *c) { robj *set; + int j, deleted = 0; if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,set,REDIS_SET)) return; - if (setTypeRemove(set,c->argv[2])) { - if (setTypeSize(set) == 0) dbDelete(c->db,c->argv[1]); - touchWatchedKey(c->db,c->argv[1]); - server.dirty++; - addReply(c,shared.cone); - } else { - addReply(c,shared.czero); + for (j = 2; j < c->argc; j++) { + if (setTypeRemove(set,c->argv[j])) { + deleted++; + if (setTypeSize(set) == 0) { + dbDelete(c->db,c->argv[1]); + break; + } + } + } + if (deleted) { + signalModifiedKey(c->db,c->argv[1]); + server.dirty += deleted; } + addReplyLongLong(c,deleted); } void smoveCommand(redisClient *c) { robj *srcset, *dstset, *ele; srcset = lookupKeyWrite(c->db,c->argv[1]); dstset = lookupKeyWrite(c->db,c->argv[2]); - ele = c->argv[3]; + ele = c->argv[3] = tryObjectEncoding(c->argv[3]); /* If the source key does not exist return 0 */ if (srcset == NULL) { @@ -243,8 +296,8 @@ void smoveCommand(redisClient *c) { /* Remove the src set from the database when empty */ if (setTypeSize(srcset) == 0) dbDelete(c->db,c->argv[1]); - touchWatchedKey(c->db,c->argv[1]); - touchWatchedKey(c->db,c->argv[2]); + signalModifiedKey(c->db,c->argv[1]); + signalModifiedKey(c->db,c->argv[2]); server.dirty++; /* Create the destination set when it doesn't exist */ @@ -264,6 +317,7 @@ void sismemberCommand(redisClient *c) { if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,set,REDIS_SET)) return; + c->argv[2] = tryObjectEncoding(c->argv[2]); if (setTypeIsMember(set,c->argv[2])) addReply(c,shared.cone); else @@ -276,40 +330,205 @@ void scardCommand(redisClient *c) { if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,o,REDIS_SET)) return; - addReplyUlong(c,setTypeSize(o)); + addReplyLongLong(c,setTypeSize(o)); } void spopCommand(redisClient *c) { - robj *set, *ele; + robj *set, *ele, *aux; + int64_t llele; + int encoding; if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL || checkType(c,set,REDIS_SET)) return; - ele = setTypeRandomElement(set); - if (ele == NULL) { - addReply(c,shared.nullbulk); + encoding = setTypeRandomElement(set,&ele,&llele); + if (encoding == REDIS_ENCODING_INTSET) { + ele = createStringObjectFromLongLong(llele); + set->ptr = intsetRemove(set->ptr,llele,NULL); } else { + incrRefCount(ele); setTypeRemove(set,ele); - addReplyBulk(c,ele); - decrRefCount(ele); - if (setTypeSize(set) == 0) dbDelete(c->db,c->argv[1]); - touchWatchedKey(c->db,c->argv[1]); - server.dirty++; + } + + /* Replicate/AOF this command as an SREM operation */ + aux = createStringObject("SREM",4); + rewriteClientCommandVector(c,3,aux,c->argv[1],ele); + decrRefCount(ele); + decrRefCount(aux); + + addReplyBulk(c,ele); + if (setTypeSize(set) == 0) dbDelete(c->db,c->argv[1]); + signalModifiedKey(c->db,c->argv[1]); + server.dirty++; +} + +/* handle the "SRANDMEMBER key " variant. The normal version of the + * command is handled by the srandmemberCommand() function itself. */ + +/* How many times bigger should be the set compared to the requested size + * for us to don't use the "remove elements" strategy? Read later in the + * implementation for more info. */ +#define SRANDMEMBER_SUB_STRATEGY_MUL 3 + +void srandmemberWithCountCommand(redisClient *c) { + long l; + unsigned long count, size; + int uniq = 1; + robj *set, *ele; + int64_t llele; + int encoding; + + dict *d; + + if (getLongFromObjectOrReply(c,c->argv[2],&l,NULL) != REDIS_OK) return; + if (l >= 0) { + count = (unsigned) l; + } else { + /* A negative count means: return the same elements multiple times + * (i.e. don't remove the extracted element after every extraction). */ + count = -l; + uniq = 0; + } + + if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) + == NULL || checkType(c,set,REDIS_SET)) return; + size = setTypeSize(set); + + /* If count is zero, serve it ASAP to avoid special cases later. */ + if (count == 0) { + addReply(c,shared.emptymultibulk); + return; + } + + /* CASE 1: The count was negative, so the extraction method is just: + * "return N random elements" sampling the whole set every time. + * This case is trivial and can be served without auxiliary data + * structures. */ + if (!uniq) { + addReplyMultiBulkLen(c,count); + while(count--) { + encoding = setTypeRandomElement(set,&ele,&llele); + if (encoding == REDIS_ENCODING_INTSET) { + addReplyBulkLongLong(c,llele); + } else { + addReplyBulk(c,ele); + } + } + return; + } + + /* CASE 2: + * The number of requested elements is greater than the number of + * elements inside the set: simply return the whole set. */ + if (count >= size) { + sunionDiffGenericCommand(c,c->argv,c->argc-1,NULL,REDIS_OP_UNION); + return; + } + + /* For CASE 3 and CASE 4 we need an auxiliary dictionary. */ + d = dictCreate(&setDictType,NULL); + + /* CASE 3: + * The number of elements inside the set is not greater than + * SRANDMEMBER_SUB_STRATEGY_MUL times the number of requested elements. + * In this case we create a set from scratch with all the elements, and + * subtract random elements to reach the requested number of elements. + * + * This is done because if the number of requsted elements is just + * a bit less than the number of elements in the set, the natural approach + * used into CASE 3 is highly inefficient. */ + if (count*SRANDMEMBER_SUB_STRATEGY_MUL > size) { + setTypeIterator *si; + + /* Add all the elements into the temporary dictionary. */ + si = setTypeInitIterator(set); + while((encoding = setTypeNext(si,&ele,&llele)) != -1) { + int retval; + + if (encoding == REDIS_ENCODING_INTSET) { + retval = dictAdd(d,createStringObjectFromLongLong(llele),NULL); + } else if (ele->encoding == REDIS_ENCODING_RAW) { + retval = dictAdd(d,dupStringObject(ele),NULL); + } else if (ele->encoding == REDIS_ENCODING_INT) { + retval = dictAdd(d, + createStringObjectFromLongLong((long)ele->ptr),NULL); + } + redisAssert(retval == DICT_OK); + } + setTypeReleaseIterator(si); + redisAssert(dictSize(d) == size); + + /* Remove random elements to reach the right count. */ + while(size > count) { + dictEntry *de; + + de = dictGetRandomKey(d); + dictDelete(d,dictGetKey(de)); + size--; + } + } + + /* CASE 4: We have a big set compared to the requested number of elements. + * In this case we can simply get random elements from the set and add + * to the temporary set, trying to eventually get enough unique elements + * to reach the specified count. */ + else { + unsigned long added = 0; + + while(added < count) { + encoding = setTypeRandomElement(set,&ele,&llele); + if (encoding == REDIS_ENCODING_INTSET) { + ele = createStringObjectFromLongLong(llele); + } else if (ele->encoding == REDIS_ENCODING_RAW) { + ele = dupStringObject(ele); + } else if (ele->encoding == REDIS_ENCODING_INT) { + ele = createStringObjectFromLongLong((long)ele->ptr); + } + /* Try to add the object to the dictionary. If it already exists + * free it, otherwise increment the number of objects we have + * in the result dictionary. */ + if (dictAdd(d,ele,NULL) == DICT_OK) + added++; + else + decrRefCount(ele); + } + } + + /* CASE 3 & 4: send the result to the user. */ + { + dictIterator *di; + dictEntry *de; + + addReplyMultiBulkLen(c,count); + di = dictGetIterator(d); + while((de = dictNext(di)) != NULL) + addReplyBulk(c,dictGetKey(de)); + dictReleaseIterator(di); + dictRelease(d); } } void srandmemberCommand(redisClient *c) { robj *set, *ele; + int64_t llele; + int encoding; + + if (c->argc == 3) { + srandmemberWithCountCommand(c); + return; + } else if (c->argc > 3) { + addReply(c,shared.syntaxerr); + return; + } if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL || checkType(c,set,REDIS_SET)) return; - ele = setTypeRandomElement(set); - if (ele == NULL) { - addReply(c,shared.nullbulk); + encoding = setTypeRandomElement(set,&ele,&llele); + if (encoding == REDIS_ENCODING_INTSET) { + addReplyBulkLongLong(c,llele); } else { addReplyBulk(c,ele); - decrRefCount(ele); } } @@ -320,8 +539,11 @@ int qsortCompareSetsByCardinality(const void *s1, const void *s2) { void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum, robj *dstkey) { robj **sets = zmalloc(sizeof(robj*)*setnum); setTypeIterator *si; - robj *ele, *lenobj = NULL, *dstset = NULL; + robj *eleobj, *dstset = NULL; + int64_t intobj; + void *replylen = NULL; unsigned long j, cardinality = 0; + int encoding; for (j = 0; j < setnum; j++) { robj *setobj = dstkey ? @@ -331,7 +553,7 @@ void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum, zfree(sets); if (dstkey) { if (dbDelete(c->db,dstkey)) { - touchWatchedKey(c->db,dstkey); + signalModifiedKey(c->db,dstkey); server.dirty++; } addReply(c,shared.czero); @@ -356,9 +578,7 @@ void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum, * to the output list and save the pointer to later modify it with the * right length */ if (!dstkey) { - lenobj = createObject(REDIS_STRING,NULL); - addReply(c,lenobj); - decrRefCount(lenobj); + replylen = addDeferredMultiBulkLength(c); } else { /* If we have a target key where to store the resulting set * create this key with an empty set inside */ @@ -369,20 +589,61 @@ void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum, * the element against all the other sets, if at least one set does * not include the element it is discarded */ si = setTypeInitIterator(sets[0]); - while((ele = setTypeNext(si)) != NULL) { - for (j = 1; j < setnum; j++) - if (!setTypeIsMember(sets[j],ele)) break; + while((encoding = setTypeNext(si,&eleobj,&intobj)) != -1) { + for (j = 1; j < setnum; j++) { + if (sets[j] == sets[0]) continue; + if (encoding == REDIS_ENCODING_INTSET) { + /* intset with intset is simple... and fast */ + if (sets[j]->encoding == REDIS_ENCODING_INTSET && + !intsetFind((intset*)sets[j]->ptr,intobj)) + { + break; + /* in order to compare an integer with an object we + * have to use the generic function, creating an object + * for this */ + } else if (sets[j]->encoding == REDIS_ENCODING_HT) { + eleobj = createStringObjectFromLongLong(intobj); + if (!setTypeIsMember(sets[j],eleobj)) { + decrRefCount(eleobj); + break; + } + decrRefCount(eleobj); + } + } else if (encoding == REDIS_ENCODING_HT) { + /* Optimization... if the source object is integer + * encoded AND the target set is an intset, we can get + * a much faster path. */ + if (eleobj->encoding == REDIS_ENCODING_INT && + sets[j]->encoding == REDIS_ENCODING_INTSET && + !intsetFind((intset*)sets[j]->ptr,(long)eleobj->ptr)) + { + break; + /* else... object to object check is easy as we use the + * type agnostic API here. */ + } else if (!setTypeIsMember(sets[j],eleobj)) { + break; + } + } + } /* Only take action when all sets contain the member */ if (j == setnum) { if (!dstkey) { - addReplyBulk(c,ele); + if (encoding == REDIS_ENCODING_HT) + addReplyBulk(c,eleobj); + else + addReplyBulkLongLong(c,intobj); cardinality++; } else { - setTypeAdd(dstset,ele); + if (encoding == REDIS_ENCODING_INTSET) { + eleobj = createStringObjectFromLongLong(intobj); + setTypeAdd(dstset,eleobj); + decrRefCount(eleobj); + } else { + setTypeAdd(dstset,eleobj); + } } } - decrRefCount(ele); } setTypeReleaseIterator(si); @@ -397,10 +658,10 @@ void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum, decrRefCount(dstset); addReply(c,shared.czero); } - touchWatchedKey(c->db,dstkey); + signalModifiedKey(c->db,dstkey); server.dirty++; } else { - lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",cardinality); + setDeferredMultiBulkLength(c,replylen,cardinality); } zfree(sets); } @@ -450,7 +711,7 @@ void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum, robj * if (!sets[j]) continue; /* non existing keys are like empty sets */ si = setTypeInitIterator(sets[j]); - while((ele = setTypeNext(si)) != NULL) { + while((ele = setTypeNextObject(si)) != NULL) { if (op == REDIS_OP_UNION || j == 0) { if (setTypeAdd(dstset,ele)) { cardinality++; @@ -470,9 +731,9 @@ void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum, robj * /* Output the content of the resulting set, if not in STORE mode */ if (!dstkey) { - addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality)); + addReplyMultiBulkLen(c,cardinality); si = setTypeInitIterator(dstset); - while((ele = setTypeNext(si)) != NULL) { + while((ele = setTypeNextObject(si)) != NULL) { addReplyBulk(c,ele); decrRefCount(ele); } @@ -489,7 +750,7 @@ void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum, robj * decrRefCount(dstset); addReply(c,shared.czero); } - touchWatchedKey(c->db,dstkey); + signalModifiedKey(c->db,dstkey); server.dirty++; } zfree(sets);