X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/5171777bf112a6a930c4750949037ce741f3bf28..5244d6e54ec08666f953124739a498d0537a2bf9:/src/t_set.c diff --git a/src/t_set.c b/src/t_set.c index e2ac5ae5..3cf1cf00 100644 --- a/src/t_set.c +++ b/src/t_set.c @@ -37,7 +37,7 @@ int setTypeAdd(robj *subject, robj *value) { /* The set *was* an intset and this value is not integer * encodable, so dictAdd should always work. */ - redisAssert(dictAdd(subject->ptr,value,NULL) == DICT_OK); + redisAssertWithInfo(NULL,value,dictAdd(subject->ptr,value,NULL) == DICT_OK); incrRefCount(value); return 1; } @@ -47,17 +47,17 @@ int setTypeAdd(robj *subject, robj *value) { return 0; } -int setTypeRemove(robj *subject, robj *value) { +int setTypeRemove(robj *setobj, robj *value) { long long llval; - if (subject->encoding == REDIS_ENCODING_HT) { - if (dictDelete(subject->ptr,value) == DICT_OK) { - if (htNeedsResize(subject->ptr)) dictResize(subject->ptr); + if (setobj->encoding == REDIS_ENCODING_HT) { + if (dictDelete(setobj->ptr,value) == DICT_OK) { + if (htNeedsResize(setobj->ptr)) dictResize(setobj->ptr); return 1; } - } else if (subject->encoding == REDIS_ENCODING_INTSET) { + } else if (setobj->encoding == REDIS_ENCODING_INTSET) { if (isObjectRepresentableAsLongLong(value,&llval) == REDIS_OK) { - uint8_t success; - subject->ptr = intsetRemove(subject->ptr,llval,&success); + int success; + setobj->ptr = intsetRemove(setobj->ptr,llval,&success); if (success) return 1; } } else { @@ -101,40 +101,77 @@ void setTypeReleaseIterator(setTypeIterator *si) { } /* Move to the next entry in the set. Returns the object at the current - * position, or NULL when the end is reached. This object will have its - * refcount incremented, so the caller needs to take care of this. */ -robj *setTypeNext(setTypeIterator *si) { - robj *ret = NULL; + * position. + * + * Since set elements can be internally be stored as redis objects or + * simple arrays of integers, setTypeNext returns the encoding of the + * set object you are iterating, and will populate the appropriate pointer + * (eobj) or (llobj) accordingly. + * + * When there are no longer elements -1 is returned. + * Returned objects ref count is not incremented, so this function is + * copy on write friendly. */ +int setTypeNext(setTypeIterator *si, robj **objele, int64_t *llele) { if (si->encoding == REDIS_ENCODING_HT) { dictEntry *de = dictNext(si->di); - if (de != NULL) { - ret = dictGetEntryKey(de); - incrRefCount(ret); - } + if (de == NULL) return -1; + *objele = dictGetKey(de); } else if (si->encoding == REDIS_ENCODING_INTSET) { - int64_t llval; - if (intsetGet(si->subject->ptr,si->ii++,&llval)) - ret = createStringObjectFromLongLong(llval); + if (!intsetGet(si->subject->ptr,si->ii++,llele)) + return -1; } - return ret; + return si->encoding; } +/* The not copy on write friendly version but easy to use version + * of setTypeNext() is setTypeNextObject(), returning new objects + * or incrementing the ref count of returned objects. So if you don't + * retain a pointer to this object you should call decrRefCount() against it. + * + * This function is the way to go for write operations where COW is not + * an issue as the result will be anyway of incrementing the ref count. */ +robj *setTypeNextObject(setTypeIterator *si) { + int64_t intele; + robj *objele; + int encoding; + + encoding = setTypeNext(si,&objele,&intele); + switch(encoding) { + case -1: return NULL; + case REDIS_ENCODING_INTSET: + return createStringObjectFromLongLong(intele); + case REDIS_ENCODING_HT: + incrRefCount(objele); + return objele; + default: + redisPanic("Unsupported encoding"); + } + return NULL; /* just to suppress warnings */ +} -/* Return random element from set. The returned object will always have - * an incremented refcount. */ -robj *setTypeRandomElement(robj *subject) { - robj *ret = NULL; - if (subject->encoding == REDIS_ENCODING_HT) { - dictEntry *de = dictGetRandomKey(subject->ptr); - ret = dictGetEntryKey(de); - incrRefCount(ret); - } else if (subject->encoding == REDIS_ENCODING_INTSET) { - long long llval = intsetRandom(subject->ptr); - ret = createStringObjectFromLongLong(llval); +/* Return random element from a non empty set. + * The returned element can be a int64_t value if the set is encoded + * as an "intset" blob of integers, or a redis object if the set + * is a regular set. + * + * The caller provides both pointers to be populated with the right + * object. The return value of the function is the object->encoding + * field of the object and is used by the caller to check if the + * int64_t pointer or the redis object pointere was populated. + * + * When an object is returned (the set was a real set) the ref count + * of the object is not incremented so this function can be considered + * copy on write friendly. */ +int setTypeRandomElement(robj *setobj, robj **objele, int64_t *llele) { + if (setobj->encoding == REDIS_ENCODING_HT) { + dictEntry *de = dictGetRandomKey(setobj->ptr); + *objele = dictGetKey(de); + } else if (setobj->encoding == REDIS_ENCODING_INTSET) { + *llele = intsetRandom(setobj->ptr); } else { redisPanic("Unknown set encoding"); } - return ret; + return setobj->encoding; } unsigned long setTypeSize(robj *subject) { @@ -150,25 +187,30 @@ unsigned long setTypeSize(robj *subject) { /* Convert the set to specified encoding. The resulting dict (when converting * to a hashtable) is presized to hold the number of elements in the original * set. */ -void setTypeConvert(robj *subject, int enc) { +void setTypeConvert(robj *setobj, int enc) { setTypeIterator *si; - robj *element; - redisAssert(subject->type == REDIS_SET); + redisAssertWithInfo(NULL,setobj,setobj->type == REDIS_SET && + setobj->encoding == REDIS_ENCODING_INTSET); if (enc == REDIS_ENCODING_HT) { + int64_t intele; dict *d = dictCreate(&setDictType,NULL); + robj *element; + /* Presize the dict to avoid rehashing */ - dictExpand(d,intsetLen(subject->ptr)); + dictExpand(d,intsetLen(setobj->ptr)); - /* setTypeGet returns a robj with incremented refcount */ - si = setTypeInitIterator(subject); - while ((element = setTypeNext(si)) != NULL) - redisAssert(dictAdd(d,element,NULL) == DICT_OK); + /* To add the elements we extract integers and create redis objects */ + si = setTypeInitIterator(setobj); + while (setTypeNext(si,NULL,&intele) != -1) { + element = createStringObjectFromLongLong(intele); + redisAssertWithInfo(NULL,element,dictAdd(d,element,NULL) == DICT_OK); + } setTypeReleaseIterator(si); - subject->encoding = REDIS_ENCODING_HT; - zfree(subject->ptr); - subject->ptr = d; + setobj->encoding = REDIS_ENCODING_HT; + zfree(setobj->ptr); + setobj->ptr = d; } else { redisPanic("Unsupported set conversion"); } @@ -176,6 +218,7 @@ void setTypeConvert(robj *subject, int enc) { void saddCommand(redisClient *c) { robj *set; + int j, added = 0; set = lookupKeyWrite(c->db,c->argv[1]); if (set == NULL) { @@ -187,36 +230,44 @@ void saddCommand(redisClient *c) { return; } } - if (setTypeAdd(set,c->argv[2])) { - touchWatchedKey(c->db,c->argv[1]); - server.dirty++; - addReply(c,shared.cone); - } else { - addReply(c,shared.czero); + + for (j = 2; j < c->argc; j++) { + c->argv[j] = tryObjectEncoding(c->argv[j]); + if (setTypeAdd(set,c->argv[j])) added++; } + if (added) signalModifiedKey(c->db,c->argv[1]); + server.dirty += added; + addReplyLongLong(c,added); } void sremCommand(redisClient *c) { robj *set; + int j, deleted = 0; if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,set,REDIS_SET)) return; - if (setTypeRemove(set,c->argv[2])) { - if (setTypeSize(set) == 0) dbDelete(c->db,c->argv[1]); - touchWatchedKey(c->db,c->argv[1]); - server.dirty++; - addReply(c,shared.cone); - } else { - addReply(c,shared.czero); + for (j = 2; j < c->argc; j++) { + if (setTypeRemove(set,c->argv[j])) { + deleted++; + if (setTypeSize(set) == 0) { + dbDelete(c->db,c->argv[1]); + break; + } + } + } + if (deleted) { + signalModifiedKey(c->db,c->argv[1]); + server.dirty += deleted; } + addReplyLongLong(c,deleted); } void smoveCommand(redisClient *c) { robj *srcset, *dstset, *ele; srcset = lookupKeyWrite(c->db,c->argv[1]); dstset = lookupKeyWrite(c->db,c->argv[2]); - ele = c->argv[3]; + ele = c->argv[3] = tryObjectEncoding(c->argv[3]); /* If the source key does not exist return 0 */ if (srcset == NULL) { @@ -243,8 +294,8 @@ void smoveCommand(redisClient *c) { /* Remove the src set from the database when empty */ if (setTypeSize(srcset) == 0) dbDelete(c->db,c->argv[1]); - touchWatchedKey(c->db,c->argv[1]); - touchWatchedKey(c->db,c->argv[2]); + signalModifiedKey(c->db,c->argv[1]); + signalModifiedKey(c->db,c->argv[2]); server.dirty++; /* Create the destination set when it doesn't exist */ @@ -264,6 +315,7 @@ void sismemberCommand(redisClient *c) { if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,set,REDIS_SET)) return; + c->argv[2] = tryObjectEncoding(c->argv[2]); if (setTypeIsMember(set,c->argv[2])) addReply(c,shared.cone); else @@ -280,36 +332,47 @@ void scardCommand(redisClient *c) { } void spopCommand(redisClient *c) { - robj *set, *ele; + robj *set, *ele, *aux; + int64_t llele; + int encoding; if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL || checkType(c,set,REDIS_SET)) return; - ele = setTypeRandomElement(set); - if (ele == NULL) { - addReply(c,shared.nullbulk); + encoding = setTypeRandomElement(set,&ele,&llele); + if (encoding == REDIS_ENCODING_INTSET) { + ele = createStringObjectFromLongLong(llele); + set->ptr = intsetRemove(set->ptr,llele,NULL); } else { + incrRefCount(ele); setTypeRemove(set,ele); - addReplyBulk(c,ele); - decrRefCount(ele); - if (setTypeSize(set) == 0) dbDelete(c->db,c->argv[1]); - touchWatchedKey(c->db,c->argv[1]); - server.dirty++; } + + /* Replicate/AOF this command as an SREM operation */ + aux = createStringObject("SREM",4); + rewriteClientCommandVector(c,3,aux,c->argv[1],ele); + decrRefCount(ele); + decrRefCount(aux); + + addReplyBulk(c,ele); + if (setTypeSize(set) == 0) dbDelete(c->db,c->argv[1]); + signalModifiedKey(c->db,c->argv[1]); + server.dirty++; } void srandmemberCommand(redisClient *c) { robj *set, *ele; + int64_t llele; + int encoding; if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL || checkType(c,set,REDIS_SET)) return; - ele = setTypeRandomElement(set); - if (ele == NULL) { - addReply(c,shared.nullbulk); + encoding = setTypeRandomElement(set,&ele,&llele); + if (encoding == REDIS_ENCODING_INTSET) { + addReplyBulkLongLong(c,llele); } else { addReplyBulk(c,ele); - decrRefCount(ele); } } @@ -320,9 +383,11 @@ int qsortCompareSetsByCardinality(const void *s1, const void *s2) { void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum, robj *dstkey) { robj **sets = zmalloc(sizeof(robj*)*setnum); setTypeIterator *si; - robj *ele, *dstset = NULL; + robj *eleobj, *dstset = NULL; + int64_t intobj; void *replylen = NULL; unsigned long j, cardinality = 0; + int encoding; for (j = 0; j < setnum; j++) { robj *setobj = dstkey ? @@ -332,7 +397,7 @@ void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum, zfree(sets); if (dstkey) { if (dbDelete(c->db,dstkey)) { - touchWatchedKey(c->db,dstkey); + signalModifiedKey(c->db,dstkey); server.dirty++; } addReply(c,shared.czero); @@ -368,20 +433,61 @@ void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum, * the element against all the other sets, if at least one set does * not include the element it is discarded */ si = setTypeInitIterator(sets[0]); - while((ele = setTypeNext(si)) != NULL) { - for (j = 1; j < setnum; j++) - if (!setTypeIsMember(sets[j],ele)) break; + while((encoding = setTypeNext(si,&eleobj,&intobj)) != -1) { + for (j = 1; j < setnum; j++) { + if (sets[j] == sets[0]) continue; + if (encoding == REDIS_ENCODING_INTSET) { + /* intset with intset is simple... and fast */ + if (sets[j]->encoding == REDIS_ENCODING_INTSET && + !intsetFind((intset*)sets[j]->ptr,intobj)) + { + break; + /* in order to compare an integer with an object we + * have to use the generic function, creating an object + * for this */ + } else if (sets[j]->encoding == REDIS_ENCODING_HT) { + eleobj = createStringObjectFromLongLong(intobj); + if (!setTypeIsMember(sets[j],eleobj)) { + decrRefCount(eleobj); + break; + } + decrRefCount(eleobj); + } + } else if (encoding == REDIS_ENCODING_HT) { + /* Optimization... if the source object is integer + * encoded AND the target set is an intset, we can get + * a much faster path. */ + if (eleobj->encoding == REDIS_ENCODING_INT && + sets[j]->encoding == REDIS_ENCODING_INTSET && + !intsetFind((intset*)sets[j]->ptr,(long)eleobj->ptr)) + { + break; + /* else... object to object check is easy as we use the + * type agnostic API here. */ + } else if (!setTypeIsMember(sets[j],eleobj)) { + break; + } + } + } /* Only take action when all sets contain the member */ if (j == setnum) { if (!dstkey) { - addReplyBulk(c,ele); + if (encoding == REDIS_ENCODING_HT) + addReplyBulk(c,eleobj); + else + addReplyBulkLongLong(c,intobj); cardinality++; } else { - setTypeAdd(dstset,ele); + if (encoding == REDIS_ENCODING_INTSET) { + eleobj = createStringObjectFromLongLong(intobj); + setTypeAdd(dstset,eleobj); + decrRefCount(eleobj); + } else { + setTypeAdd(dstset,eleobj); + } } } - decrRefCount(ele); } setTypeReleaseIterator(si); @@ -396,7 +502,7 @@ void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum, decrRefCount(dstset); addReply(c,shared.czero); } - touchWatchedKey(c->db,dstkey); + signalModifiedKey(c->db,dstkey); server.dirty++; } else { setDeferredMultiBulkLength(c,replylen,cardinality); @@ -449,7 +555,7 @@ void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum, robj * if (!sets[j]) continue; /* non existing keys are like empty sets */ si = setTypeInitIterator(sets[j]); - while((ele = setTypeNext(si)) != NULL) { + while((ele = setTypeNextObject(si)) != NULL) { if (op == REDIS_OP_UNION || j == 0) { if (setTypeAdd(dstset,ele)) { cardinality++; @@ -471,7 +577,7 @@ void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum, robj * if (!dstkey) { addReplyMultiBulkLen(c,cardinality); si = setTypeInitIterator(dstset); - while((ele = setTypeNext(si)) != NULL) { + while((ele = setTypeNextObject(si)) != NULL) { addReplyBulk(c,ele); decrRefCount(ele); } @@ -488,7 +594,7 @@ void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum, robj * decrRefCount(dstset); addReply(c,shared.czero); } - touchWatchedKey(c->db,dstkey); + signalModifiedKey(c->db,dstkey); server.dirty++; } zfree(sets);