X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/a5be65f71c927601260f4518236cbc7bc3d87965..e69e76d44217c33c164e96725e941d43ad9a136c:/src/t_set.c diff --git a/src/t_set.c b/src/t_set.c index e15952c0..3cf1cf00 100644 --- a/src/t_set.c +++ b/src/t_set.c @@ -37,7 +37,7 @@ int setTypeAdd(robj *subject, robj *value) { /* The set *was* an intset and this value is not integer * encodable, so dictAdd should always work. */ - redisAssert(dictAdd(subject->ptr,value,NULL) == DICT_OK); + redisAssertWithInfo(NULL,value,dictAdd(subject->ptr,value,NULL) == DICT_OK); incrRefCount(value); return 1; } @@ -101,42 +101,71 @@ void setTypeReleaseIterator(setTypeIterator *si) { } /* Move to the next entry in the set. Returns the object at the current - * position, or NULL when the end is reached. This object will have its - * refcount incremented, so the caller needs to take care of this. */ -robj *setTypeNext(setTypeIterator *si) { - robj *ret = NULL; + * position. + * + * Since set elements can be internally be stored as redis objects or + * simple arrays of integers, setTypeNext returns the encoding of the + * set object you are iterating, and will populate the appropriate pointer + * (eobj) or (llobj) accordingly. + * + * When there are no longer elements -1 is returned. + * Returned objects ref count is not incremented, so this function is + * copy on write friendly. */ +int setTypeNext(setTypeIterator *si, robj **objele, int64_t *llele) { if (si->encoding == REDIS_ENCODING_HT) { dictEntry *de = dictNext(si->di); - if (de != NULL) { - ret = dictGetEntryKey(de); - incrRefCount(ret); - } + if (de == NULL) return -1; + *objele = dictGetKey(de); } else if (si->encoding == REDIS_ENCODING_INTSET) { - int64_t llval; - if (intsetGet(si->subject->ptr,si->ii++,&llval)) - ret = createStringObjectFromLongLong(llval); + if (!intsetGet(si->subject->ptr,si->ii++,llele)) + return -1; } - return ret; + return si->encoding; } +/* The not copy on write friendly version but easy to use version + * of setTypeNext() is setTypeNextObject(), returning new objects + * or incrementing the ref count of returned objects. So if you don't + * retain a pointer to this object you should call decrRefCount() against it. + * + * This function is the way to go for write operations where COW is not + * an issue as the result will be anyway of incrementing the ref count. */ +robj *setTypeNextObject(setTypeIterator *si) { + int64_t intele; + robj *objele; + int encoding; + + encoding = setTypeNext(si,&objele,&intele); + switch(encoding) { + case -1: return NULL; + case REDIS_ENCODING_INTSET: + return createStringObjectFromLongLong(intele); + case REDIS_ENCODING_HT: + incrRefCount(objele); + return objele; + default: + redisPanic("Unsupported encoding"); + } + return NULL; /* just to suppress warnings */ +} /* Return random element from a non empty set. - * The returned element can be a long long value if the set is encoded + * The returned element can be a int64_t value if the set is encoded * as an "intset" blob of integers, or a redis object if the set * is a regular set. * * The caller provides both pointers to be populated with the right * object. The return value of the function is the object->encoding * field of the object and is used by the caller to check if the - * long long pointer or the redis object pointere was populated. + * int64_t pointer or the redis object pointere was populated. * * When an object is returned (the set was a real set) the ref count * of the object is not incremented so this function can be considered - * copy-on-write friendly. */ -int setTypeRandomElement(robj *setobj, robj **objele, long long *llele) { + * copy on write friendly. */ +int setTypeRandomElement(robj *setobj, robj **objele, int64_t *llele) { if (setobj->encoding == REDIS_ENCODING_HT) { dictEntry *de = dictGetRandomKey(setobj->ptr); - *objele = dictGetEntryKey(de); + *objele = dictGetKey(de); } else if (setobj->encoding == REDIS_ENCODING_INTSET) { *llele = intsetRandom(setobj->ptr); } else { @@ -158,25 +187,30 @@ unsigned long setTypeSize(robj *subject) { /* Convert the set to specified encoding. The resulting dict (when converting * to a hashtable) is presized to hold the number of elements in the original * set. */ -void setTypeConvert(robj *subject, int enc) { +void setTypeConvert(robj *setobj, int enc) { setTypeIterator *si; - robj *element; - redisAssert(subject->type == REDIS_SET); + redisAssertWithInfo(NULL,setobj,setobj->type == REDIS_SET && + setobj->encoding == REDIS_ENCODING_INTSET); if (enc == REDIS_ENCODING_HT) { + int64_t intele; dict *d = dictCreate(&setDictType,NULL); + robj *element; + /* Presize the dict to avoid rehashing */ - dictExpand(d,intsetLen(subject->ptr)); + dictExpand(d,intsetLen(setobj->ptr)); - /* setTypeGet returns a robj with incremented refcount */ - si = setTypeInitIterator(subject); - while ((element = setTypeNext(si)) != NULL) - redisAssert(dictAdd(d,element,NULL) == DICT_OK); + /* To add the elements we extract integers and create redis objects */ + si = setTypeInitIterator(setobj); + while (setTypeNext(si,NULL,&intele) != -1) { + element = createStringObjectFromLongLong(intele); + redisAssertWithInfo(NULL,element,dictAdd(d,element,NULL) == DICT_OK); + } setTypeReleaseIterator(si); - subject->encoding = REDIS_ENCODING_HT; - zfree(subject->ptr); - subject->ptr = d; + setobj->encoding = REDIS_ENCODING_HT; + zfree(setobj->ptr); + setobj->ptr = d; } else { redisPanic("Unsupported set conversion"); } @@ -184,9 +218,9 @@ void setTypeConvert(robj *subject, int enc) { void saddCommand(redisClient *c) { robj *set; + int j, added = 0; set = lookupKeyWrite(c->db,c->argv[1]); - c->argv[2] = tryObjectEncoding(c->argv[2]); if (set == NULL) { set = setTypeCreate(c->argv[2]); dbAdd(c->db,c->argv[1],set); @@ -196,30 +230,37 @@ void saddCommand(redisClient *c) { return; } } - if (setTypeAdd(set,c->argv[2])) { - touchWatchedKey(c->db,c->argv[1]); - server.dirty++; - addReply(c,shared.cone); - } else { - addReply(c,shared.czero); + + for (j = 2; j < c->argc; j++) { + c->argv[j] = tryObjectEncoding(c->argv[j]); + if (setTypeAdd(set,c->argv[j])) added++; } + if (added) signalModifiedKey(c->db,c->argv[1]); + server.dirty += added; + addReplyLongLong(c,added); } void sremCommand(redisClient *c) { robj *set; + int j, deleted = 0; if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,set,REDIS_SET)) return; - c->argv[2] = tryObjectEncoding(c->argv[2]); - if (setTypeRemove(set,c->argv[2])) { - if (setTypeSize(set) == 0) dbDelete(c->db,c->argv[1]); - touchWatchedKey(c->db,c->argv[1]); - server.dirty++; - addReply(c,shared.cone); - } else { - addReply(c,shared.czero); + for (j = 2; j < c->argc; j++) { + if (setTypeRemove(set,c->argv[j])) { + deleted++; + if (setTypeSize(set) == 0) { + dbDelete(c->db,c->argv[1]); + break; + } + } } + if (deleted) { + signalModifiedKey(c->db,c->argv[1]); + server.dirty += deleted; + } + addReplyLongLong(c,deleted); } void smoveCommand(redisClient *c) { @@ -253,8 +294,8 @@ void smoveCommand(redisClient *c) { /* Remove the src set from the database when empty */ if (setTypeSize(srcset) == 0) dbDelete(c->db,c->argv[1]); - touchWatchedKey(c->db,c->argv[1]); - touchWatchedKey(c->db,c->argv[2]); + signalModifiedKey(c->db,c->argv[1]); + signalModifiedKey(c->db,c->argv[2]); server.dirty++; /* Create the destination set when it doesn't exist */ @@ -291,8 +332,8 @@ void scardCommand(redisClient *c) { } void spopCommand(redisClient *c) { - robj *set, *ele; - long long llele; + robj *set, *ele, *aux; + int64_t llele; int encoding; if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL || @@ -300,20 +341,28 @@ void spopCommand(redisClient *c) { encoding = setTypeRandomElement(set,&ele,&llele); if (encoding == REDIS_ENCODING_INTSET) { - addReplyBulkLongLong(c,llele); + ele = createStringObjectFromLongLong(llele); set->ptr = intsetRemove(set->ptr,llele,NULL); } else { - addReplyBulk(c,ele); + incrRefCount(ele); setTypeRemove(set,ele); } + + /* Replicate/AOF this command as an SREM operation */ + aux = createStringObject("SREM",4); + rewriteClientCommandVector(c,3,aux,c->argv[1],ele); + decrRefCount(ele); + decrRefCount(aux); + + addReplyBulk(c,ele); if (setTypeSize(set) == 0) dbDelete(c->db,c->argv[1]); - touchWatchedKey(c->db,c->argv[1]); + signalModifiedKey(c->db,c->argv[1]); server.dirty++; } void srandmemberCommand(redisClient *c) { robj *set, *ele; - long long llele; + int64_t llele; int encoding; if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL || @@ -334,9 +383,11 @@ int qsortCompareSetsByCardinality(const void *s1, const void *s2) { void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum, robj *dstkey) { robj **sets = zmalloc(sizeof(robj*)*setnum); setTypeIterator *si; - robj *ele, *dstset = NULL; + robj *eleobj, *dstset = NULL; + int64_t intobj; void *replylen = NULL; unsigned long j, cardinality = 0; + int encoding; for (j = 0; j < setnum; j++) { robj *setobj = dstkey ? @@ -346,7 +397,7 @@ void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum, zfree(sets); if (dstkey) { if (dbDelete(c->db,dstkey)) { - touchWatchedKey(c->db,dstkey); + signalModifiedKey(c->db,dstkey); server.dirty++; } addReply(c,shared.czero); @@ -382,20 +433,61 @@ void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum, * the element against all the other sets, if at least one set does * not include the element it is discarded */ si = setTypeInitIterator(sets[0]); - while((ele = setTypeNext(si)) != NULL) { - for (j = 1; j < setnum; j++) - if (!setTypeIsMember(sets[j],ele)) break; + while((encoding = setTypeNext(si,&eleobj,&intobj)) != -1) { + for (j = 1; j < setnum; j++) { + if (sets[j] == sets[0]) continue; + if (encoding == REDIS_ENCODING_INTSET) { + /* intset with intset is simple... and fast */ + if (sets[j]->encoding == REDIS_ENCODING_INTSET && + !intsetFind((intset*)sets[j]->ptr,intobj)) + { + break; + /* in order to compare an integer with an object we + * have to use the generic function, creating an object + * for this */ + } else if (sets[j]->encoding == REDIS_ENCODING_HT) { + eleobj = createStringObjectFromLongLong(intobj); + if (!setTypeIsMember(sets[j],eleobj)) { + decrRefCount(eleobj); + break; + } + decrRefCount(eleobj); + } + } else if (encoding == REDIS_ENCODING_HT) { + /* Optimization... if the source object is integer + * encoded AND the target set is an intset, we can get + * a much faster path. */ + if (eleobj->encoding == REDIS_ENCODING_INT && + sets[j]->encoding == REDIS_ENCODING_INTSET && + !intsetFind((intset*)sets[j]->ptr,(long)eleobj->ptr)) + { + break; + /* else... object to object check is easy as we use the + * type agnostic API here. */ + } else if (!setTypeIsMember(sets[j],eleobj)) { + break; + } + } + } /* Only take action when all sets contain the member */ if (j == setnum) { if (!dstkey) { - addReplyBulk(c,ele); + if (encoding == REDIS_ENCODING_HT) + addReplyBulk(c,eleobj); + else + addReplyBulkLongLong(c,intobj); cardinality++; } else { - setTypeAdd(dstset,ele); + if (encoding == REDIS_ENCODING_INTSET) { + eleobj = createStringObjectFromLongLong(intobj); + setTypeAdd(dstset,eleobj); + decrRefCount(eleobj); + } else { + setTypeAdd(dstset,eleobj); + } } } - decrRefCount(ele); } setTypeReleaseIterator(si); @@ -410,7 +502,7 @@ void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum, decrRefCount(dstset); addReply(c,shared.czero); } - touchWatchedKey(c->db,dstkey); + signalModifiedKey(c->db,dstkey); server.dirty++; } else { setDeferredMultiBulkLength(c,replylen,cardinality); @@ -463,7 +555,7 @@ void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum, robj * if (!sets[j]) continue; /* non existing keys are like empty sets */ si = setTypeInitIterator(sets[j]); - while((ele = setTypeNext(si)) != NULL) { + while((ele = setTypeNextObject(si)) != NULL) { if (op == REDIS_OP_UNION || j == 0) { if (setTypeAdd(dstset,ele)) { cardinality++; @@ -485,7 +577,7 @@ void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum, robj * if (!dstkey) { addReplyMultiBulkLen(c,cardinality); si = setTypeInitIterator(dstset); - while((ele = setTypeNext(si)) != NULL) { + while((ele = setTypeNextObject(si)) != NULL) { addReplyBulk(c,ele); decrRefCount(ele); } @@ -502,7 +594,7 @@ void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum, robj * decrRefCount(dstset); addReply(c,shared.czero); } - touchWatchedKey(c->db,dstkey); + signalModifiedKey(c->db,dstkey); server.dirty++; } zfree(sets);