X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/ec7e138926b7b587adc247e8c64da6d3b1706434..9ad853ccde374ed371efee3b1654c6ebb39e06fa:/src/t_set.c diff --git a/src/t_set.c b/src/t_set.c index 68e13227..7d0811ed 100644 --- a/src/t_set.c +++ b/src/t_set.c @@ -47,17 +47,17 @@ int setTypeAdd(robj *subject, robj *value) { return 0; } -int setTypeRemove(robj *subject, robj *value) { +int setTypeRemove(robj *setobj, robj *value) { long long llval; - if (subject->encoding == REDIS_ENCODING_HT) { - if (dictDelete(subject->ptr,value) == DICT_OK) { - if (htNeedsResize(subject->ptr)) dictResize(subject->ptr); + if (setobj->encoding == REDIS_ENCODING_HT) { + if (dictDelete(setobj->ptr,value) == DICT_OK) { + if (htNeedsResize(setobj->ptr)) dictResize(setobj->ptr); return 1; } - } else if (subject->encoding == REDIS_ENCODING_INTSET) { + } else if (setobj->encoding == REDIS_ENCODING_INTSET) { if (isObjectRepresentableAsLongLong(value,&llval) == REDIS_OK) { - uint8_t success; - subject->ptr = intsetRemove(subject->ptr,llval,&success); + int success; + setobj->ptr = intsetRemove(setobj->ptr,llval,&success); if (success) return 1; } } else { @@ -101,40 +101,77 @@ void setTypeReleaseIterator(setTypeIterator *si) { } /* Move to the next entry in the set. Returns the object at the current - * position, or NULL when the end is reached. This object will have its - * refcount incremented, so the caller needs to take care of this. */ -robj *setTypeNext(setTypeIterator *si) { - robj *ret = NULL; + * position. + * + * Since set elements can internally be stored as redis objects or + * simple arrays of integers, setTypeNext returns the encoding of the + * set object you are iterating, and will populate the appropriate pointer + * (objele) or (llele) accordingly. + * + * When there are no more elements -1 is returned. + * Returned objects ref count is not incremented, so this function is + * copy on write friendly. 
*/ +int setTypeNext(setTypeIterator *si, robj **objele, int64_t *llele) { if (si->encoding == REDIS_ENCODING_HT) { dictEntry *de = dictNext(si->di); - if (de != NULL) { - ret = dictGetEntryKey(de); - incrRefCount(ret); - } + if (de == NULL) return -1; + *objele = dictGetEntryKey(de); } else if (si->encoding == REDIS_ENCODING_INTSET) { - int64_t llval; - if (intsetGet(si->subject->ptr,si->ii++,&llval)) - ret = createStringObjectFromLongLong(llval); + if (!intsetGet(si->subject->ptr,si->ii++,llele)) + return -1; } - return ret; + return si->encoding; } +/* The not copy on write friendly version but easy to use version + * of setTypeNext() is setTypeNextObject(), returning new objects + * or incrementing the ref count of returned objects. So if you don't + * retain a pointer to this object you should call decrRefCount() against it. + * + * This function is the way to go for write operations where COW is not + * an issue as the result will be anyway of incrementing the ref count. */ +robj *setTypeNextObject(setTypeIterator *si) { + int64_t intele; + robj *objele; + int encoding; + + encoding = setTypeNext(si,&objele,&intele); + switch(encoding) { + case -1: return NULL; + case REDIS_ENCODING_INTSET: + return createStringObjectFromLongLong(intele); + case REDIS_ENCODING_HT: + incrRefCount(objele); + return objele; + default: + redisPanic("Unsupported encoding"); + } + return NULL; /* just to suppress warnings */ +} -/* Return random element from set. The returned object will always have - * an incremented refcount. */ -robj *setTypeRandomElement(robj *subject) { - robj *ret = NULL; - if (subject->encoding == REDIS_ENCODING_HT) { - dictEntry *de = dictGetRandomKey(subject->ptr); - ret = dictGetEntryKey(de); - incrRefCount(ret); - } else if (subject->encoding == REDIS_ENCODING_INTSET) { - long long llval = intsetRandom(subject->ptr); - ret = createStringObjectFromLongLong(llval); +/* Return random element from a non empty set. 
+ * The returned element can be an int64_t value if the set is encoded + * as an "intset" blob of integers, or a redis object if the set + * is a regular set. + * + * The caller provides both pointers to be populated with the right + * object. The return value of the function is the object->encoding + * field of the object and is used by the caller to check if the + * int64_t pointer or the redis object pointer was populated. + * + * When an object is returned (the set was a real set) the ref count + * of the object is not incremented so this function can be considered + * copy on write friendly. */ +int setTypeRandomElement(robj *setobj, robj **objele, int64_t *llele) { + if (setobj->encoding == REDIS_ENCODING_HT) { + dictEntry *de = dictGetRandomKey(setobj->ptr); + *objele = dictGetEntryKey(de); + } else if (setobj->encoding == REDIS_ENCODING_INTSET) { + *llele = intsetRandom(setobj->ptr); } else { redisPanic("Unknown set encoding"); } - return ret; + return setobj->encoding; } unsigned long setTypeSize(robj *subject) { @@ -150,25 +187,30 @@ unsigned long setTypeSize(robj *subject) { /* Convert the set to specified encoding. The resulting dict (when converting * to a hashtable) is presized to hold the number of elements in the original * set. 
*/ -void setTypeConvert(robj *subject, int enc) { +void setTypeConvert(robj *setobj, int enc) { setTypeIterator *si; - robj *element; - redisAssert(subject->type == REDIS_SET); + redisAssert(setobj->type == REDIS_SET && + setobj->encoding == REDIS_ENCODING_INTSET); if (enc == REDIS_ENCODING_HT) { + int64_t intele; dict *d = dictCreate(&setDictType,NULL); + robj *element; + /* Presize the dict to avoid rehashing */ - dictExpand(d,intsetLen(subject->ptr)); + dictExpand(d,intsetLen(setobj->ptr)); - /* setTypeGet returns a robj with incremented refcount */ - si = setTypeInitIterator(subject); - while ((element = setTypeNext(si)) != NULL) + /* To add the elements we extract integers and create redis objects */ + si = setTypeInitIterator(setobj); + while (setTypeNext(si,NULL,&intele) != -1) { + element = createStringObjectFromLongLong(intele); redisAssert(dictAdd(d,element,NULL) == DICT_OK); + } setTypeReleaseIterator(si); - subject->encoding = REDIS_ENCODING_HT; - zfree(subject->ptr); - subject->ptr = d; + setobj->encoding = REDIS_ENCODING_HT; + zfree(setobj->ptr); + setobj->ptr = d; } else { redisPanic("Unsupported set conversion"); } @@ -178,6 +220,7 @@ void saddCommand(redisClient *c) { robj *set; set = lookupKeyWrite(c->db,c->argv[1]); + c->argv[2] = tryObjectEncoding(c->argv[2]); if (set == NULL) { set = setTypeCreate(c->argv[2]); dbAdd(c->db,c->argv[1],set); @@ -188,7 +231,7 @@ void saddCommand(redisClient *c) { } } if (setTypeAdd(set,c->argv[2])) { - touchWatchedKey(c->db,c->argv[1]); + signalModifiedKey(c->db,c->argv[1]); server.dirty++; addReply(c,shared.cone); } else { @@ -202,9 +245,10 @@ void sremCommand(redisClient *c) { if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,set,REDIS_SET)) return; + c->argv[2] = tryObjectEncoding(c->argv[2]); if (setTypeRemove(set,c->argv[2])) { if (setTypeSize(set) == 0) dbDelete(c->db,c->argv[1]); - touchWatchedKey(c->db,c->argv[1]); + signalModifiedKey(c->db,c->argv[1]); server.dirty++; 
addReply(c,shared.cone); } else { @@ -216,7 +260,7 @@ void smoveCommand(redisClient *c) { robj *srcset, *dstset, *ele; srcset = lookupKeyWrite(c->db,c->argv[1]); dstset = lookupKeyWrite(c->db,c->argv[2]); - ele = c->argv[3]; + ele = c->argv[3] = tryObjectEncoding(c->argv[3]); /* If the source key does not exist return 0 */ if (srcset == NULL) { @@ -243,8 +287,8 @@ void smoveCommand(redisClient *c) { /* Remove the src set from the database when empty */ if (setTypeSize(srcset) == 0) dbDelete(c->db,c->argv[1]); - touchWatchedKey(c->db,c->argv[1]); - touchWatchedKey(c->db,c->argv[2]); + signalModifiedKey(c->db,c->argv[1]); + signalModifiedKey(c->db,c->argv[2]); server.dirty++; /* Create the destination set when it doesn't exist */ @@ -264,6 +308,7 @@ void sismemberCommand(redisClient *c) { if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,set,REDIS_SET)) return; + c->argv[2] = tryObjectEncoding(c->argv[2]); if (setTypeIsMember(set,c->argv[2])) addReply(c,shared.cone); else @@ -276,40 +321,56 @@ void scardCommand(redisClient *c) { if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,o,REDIS_SET)) return; - addReplyUlong(c,setTypeSize(o)); + addReplyLongLong(c,setTypeSize(o)); } void spopCommand(redisClient *c) { robj *set, *ele; + int64_t llele; + int encoding; if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL || checkType(c,set,REDIS_SET)) return; - ele = setTypeRandomElement(set); - if (ele == NULL) { - addReply(c,shared.nullbulk); + encoding = setTypeRandomElement(set,&ele,&llele); + if (encoding == REDIS_ENCODING_INTSET) { + ele = createStringObjectFromLongLong(llele); + set->ptr = intsetRemove(set->ptr,llele,NULL); } else { + incrRefCount(ele); setTypeRemove(set,ele); - addReplyBulk(c,ele); - decrRefCount(ele); - if (setTypeSize(set) == 0) dbDelete(c->db,c->argv[1]); - touchWatchedKey(c->db,c->argv[1]); - server.dirty++; } + + /* Change argv to replicate as SREM */ + c->argc = 3; + 
c->argv = zrealloc(c->argv,sizeof(robj*)*(c->argc)); + + /* Overwrite SPOP with SREM (same length) */ + redisAssert(sdslen(c->argv[0]->ptr) == 4); + memcpy(c->argv[0]->ptr, "SREM", 4); + + /* Popped element already has incremented refcount */ + c->argv[2] = ele; + + addReplyBulk(c,ele); + if (setTypeSize(set) == 0) dbDelete(c->db,c->argv[1]); + signalModifiedKey(c->db,c->argv[1]); + server.dirty++; } void srandmemberCommand(redisClient *c) { robj *set, *ele; + int64_t llele; + int encoding; if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL || checkType(c,set,REDIS_SET)) return; - ele = setTypeRandomElement(set); - if (ele == NULL) { - addReply(c,shared.nullbulk); + encoding = setTypeRandomElement(set,&ele,&llele); + if (encoding == REDIS_ENCODING_INTSET) { + addReplyBulkLongLong(c,llele); } else { addReplyBulk(c,ele); - decrRefCount(ele); } } @@ -320,8 +381,11 @@ int qsortCompareSetsByCardinality(const void *s1, const void *s2) { void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum, robj *dstkey) { robj **sets = zmalloc(sizeof(robj*)*setnum); setTypeIterator *si; - robj *ele, *lenobj = NULL, *dstset = NULL; + robj *eleobj, *dstset = NULL; + int64_t intobj; + void *replylen = NULL; unsigned long j, cardinality = 0; + int encoding; for (j = 0; j < setnum; j++) { robj *setobj = dstkey ? 
@@ -331,7 +395,7 @@ void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum, zfree(sets); if (dstkey) { if (dbDelete(c->db,dstkey)) { - touchWatchedKey(c->db,dstkey); + signalModifiedKey(c->db,dstkey); server.dirty++; } addReply(c,shared.czero); @@ -356,9 +420,7 @@ void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum, * to the output list and save the pointer to later modify it with the * right length */ if (!dstkey) { - lenobj = createObject(REDIS_STRING,NULL); - addReply(c,lenobj); - decrRefCount(lenobj); + replylen = addDeferredMultiBulkLength(c); } else { /* If we have a target key where to store the resulting set * create this key with an empty set inside */ @@ -369,20 +431,60 @@ void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum, * the element against all the other sets, if at least one set does * not include the element it is discarded */ si = setTypeInitIterator(sets[0]); - while((ele = setTypeNext(si)) != NULL) { - for (j = 1; j < setnum; j++) - if (!setTypeIsMember(sets[j],ele)) break; + while((encoding = setTypeNext(si,&eleobj,&intobj)) != -1) { + for (j = 1; j < setnum; j++) { + if (encoding == REDIS_ENCODING_INTSET) { + /* intset with intset is simple... and fast */ + if (sets[j]->encoding == REDIS_ENCODING_INTSET && + !intsetFind((intset*)sets[j]->ptr,intobj)) + { + break; + /* in order to compare an integer with an object we + * have to use the generic function, creating an object + * for this */ + } else if (sets[j]->encoding == REDIS_ENCODING_HT) { + eleobj = createStringObjectFromLongLong(intobj); + if (!setTypeIsMember(sets[j],eleobj)) { + decrRefCount(eleobj); + break; + } + decrRefCount(eleobj); + } + } else if (encoding == REDIS_ENCODING_HT) { + /* Optimization... if the source object is integer + * encoded AND the target set is an intset, we can get + * a much faster path. 
*/ + if (eleobj->encoding == REDIS_ENCODING_INT && + sets[j]->encoding == REDIS_ENCODING_INTSET && + !intsetFind((intset*)sets[j]->ptr,(long)eleobj->ptr)) + { + break; + /* else... object to object check is easy as we use the + * type agnostic API here. */ + } else if (!setTypeIsMember(sets[j],eleobj)) { + break; + } + } + } /* Only take action when all sets contain the member */ if (j == setnum) { if (!dstkey) { - addReplyBulk(c,ele); + if (encoding == REDIS_ENCODING_HT) + addReplyBulk(c,eleobj); + else + addReplyBulkLongLong(c,intobj); cardinality++; } else { - setTypeAdd(dstset,ele); + if (encoding == REDIS_ENCODING_INTSET) { + eleobj = createStringObjectFromLongLong(intobj); + setTypeAdd(dstset,eleobj); + decrRefCount(eleobj); + } else { + setTypeAdd(dstset,eleobj); + } } } - decrRefCount(ele); } setTypeReleaseIterator(si); @@ -397,10 +499,10 @@ void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum, decrRefCount(dstset); addReply(c,shared.czero); } - touchWatchedKey(c->db,dstkey); + signalModifiedKey(c->db,dstkey); server.dirty++; } else { - lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",cardinality); + setDeferredMultiBulkLength(c,replylen,cardinality); } zfree(sets); } @@ -450,7 +552,7 @@ void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum, robj * if (!sets[j]) continue; /* non existing keys are like empty sets */ si = setTypeInitIterator(sets[j]); - while((ele = setTypeNext(si)) != NULL) { + while((ele = setTypeNextObject(si)) != NULL) { if (op == REDIS_OP_UNION || j == 0) { if (setTypeAdd(dstset,ele)) { cardinality++; @@ -470,9 +572,9 @@ void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum, robj * /* Output the content of the resulting set, if not in STORE mode */ if (!dstkey) { - addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality)); + addReplyMultiBulkLen(c,cardinality); si = setTypeInitIterator(dstset); - while((ele = setTypeNext(si)) != NULL) { + while((ele = 
setTypeNextObject(si)) != NULL) { addReplyBulk(c,ele); decrRefCount(ele); } @@ -489,7 +591,7 @@ void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum, robj * decrRefCount(dstset); addReply(c,shared.czero); } - touchWatchedKey(c->db,dstkey); + signalModifiedKey(c->db,dstkey); server.dirty++; } zfree(sets);