* Set Commands
*----------------------------------------------------------------------------*/
+void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum, robj *dstkey, int op);
+
/* Factory method to return a set that *can* hold "value". When the object has
* an integer-encodable value, an intset will be returned. Otherwise a regular
* hash table. */
robj *setTypeCreate(robj *value) {
- if (getLongLongFromObject(value,NULL) == REDIS_OK)
+ if (isObjectRepresentableAsLongLong(value,NULL) == REDIS_OK)
return createIntsetObject();
return createSetObject();
}
return 1;
}
} else if (subject->encoding == REDIS_ENCODING_INTSET) {
- if (getLongLongFromObject(value,&llval) == REDIS_OK) {
+ if (isObjectRepresentableAsLongLong(value,&llval) == REDIS_OK) {
uint8_t success = 0;
subject->ptr = intsetAdd(subject->ptr,llval,&success);
if (success) {
/* The set *was* an intset and this value is not integer
* encodable, so dictAdd should always work. */
- redisAssert(dictAdd(subject->ptr,value,NULL) == DICT_OK);
+ redisAssertWithInfo(NULL,value,dictAdd(subject->ptr,value,NULL) == DICT_OK);
incrRefCount(value);
return 1;
}
return 0;
}
-int setTypeRemove(robj *subject, robj *value) {
+int setTypeRemove(robj *setobj, robj *value) {
long long llval;
- if (subject->encoding == REDIS_ENCODING_HT) {
- if (dictDelete(subject->ptr,value) == DICT_OK) {
- if (htNeedsResize(subject->ptr)) dictResize(subject->ptr);
+ if (setobj->encoding == REDIS_ENCODING_HT) {
+ if (dictDelete(setobj->ptr,value) == DICT_OK) {
+ if (htNeedsResize(setobj->ptr)) dictResize(setobj->ptr);
return 1;
}
- } else if (subject->encoding == REDIS_ENCODING_INTSET) {
- if (getLongLongFromObject(value,&llval) == REDIS_OK) {
- uint8_t success;
- subject->ptr = intsetRemove(subject->ptr,llval,&success);
+ } else if (setobj->encoding == REDIS_ENCODING_INTSET) {
+ if (isObjectRepresentableAsLongLong(value,&llval) == REDIS_OK) {
+ int success;
+ setobj->ptr = intsetRemove(setobj->ptr,llval,&success);
if (success) return 1;
}
} else {
if (subject->encoding == REDIS_ENCODING_HT) {
return dictFind((dict*)subject->ptr,value) != NULL;
} else if (subject->encoding == REDIS_ENCODING_INTSET) {
- if (getLongLongFromObject(value,&llval) == REDIS_OK) {
+ if (isObjectRepresentableAsLongLong(value,&llval) == REDIS_OK) {
return intsetFind((intset*)subject->ptr,llval);
}
} else {
}
/* Move to the next entry in the set. Returns the object at the current
- * position, or NULL when the end is reached. This object will have its
- * refcount incremented, so the caller needs to take care of this. */
-robj *setTypeNext(setTypeIterator *si) {
- robj *ret = NULL;
+ * position.
+ *
+ * Since set elements can be internally be stored as redis objects or
+ * simple arrays of integers, setTypeNext returns the encoding of the
+ * set object you are iterating, and will populate the appropriate pointer
+ * (eobj) or (llobj) accordingly.
+ *
+ * When there are no longer elements -1 is returned.
+ * Returned objects ref count is not incremented, so this function is
+ * copy on write friendly. */
+int setTypeNext(setTypeIterator *si, robj **objele, int64_t *llele) {
if (si->encoding == REDIS_ENCODING_HT) {
dictEntry *de = dictNext(si->di);
- if (de != NULL) {
- ret = dictGetEntryKey(de);
- incrRefCount(ret);
- }
+ if (de == NULL) return -1;
+ *objele = dictGetKey(de);
} else if (si->encoding == REDIS_ENCODING_INTSET) {
- long long llval;
- if (intsetGet(si->subject->ptr,si->ii++,&llval))
- ret = createStringObjectFromLongLong(llval);
+ if (!intsetGet(si->subject->ptr,si->ii++,llele))
+ return -1;
}
- return ret;
+ return si->encoding;
}
+/* The not copy on write friendly version but easy to use version
+ * of setTypeNext() is setTypeNextObject(), returning new objects
+ * or incrementing the ref count of returned objects. So if you don't
+ * retain a pointer to this object you should call decrRefCount() against it.
+ *
+ * This function is the way to go for write operations where COW is not
+ * an issue as the result will be anyway of incrementing the ref count. */
+robj *setTypeNextObject(setTypeIterator *si) {
+ int64_t intele;
+ robj *objele;
+ int encoding;
+
+ encoding = setTypeNext(si,&objele,&intele);
+ switch(encoding) {
+ case -1: return NULL;
+ case REDIS_ENCODING_INTSET:
+ return createStringObjectFromLongLong(intele);
+ case REDIS_ENCODING_HT:
+ incrRefCount(objele);
+ return objele;
+ default:
+ redisPanic("Unsupported encoding");
+ }
+ return NULL; /* just to suppress warnings */
+}
-/* Return random element from set. The returned object will always have
- * an incremented refcount. */
-robj *setTypeRandomElement(robj *subject) {
- robj *ret = NULL;
- if (subject->encoding == REDIS_ENCODING_HT) {
- dictEntry *de = dictGetRandomKey(subject->ptr);
- ret = dictGetEntryKey(de);
- incrRefCount(ret);
- } else if (subject->encoding == REDIS_ENCODING_INTSET) {
- long long llval = intsetRandom(subject->ptr);
- ret = createStringObjectFromLongLong(llval);
+/* Return random element from a non empty set.
+ * The returned element can be a int64_t value if the set is encoded
+ * as an "intset" blob of integers, or a redis object if the set
+ * is a regular set.
+ *
+ * The caller provides both pointers to be populated with the right
+ * object. The return value of the function is the object->encoding
+ * field of the object and is used by the caller to check if the
+ * int64_t pointer or the redis object pointere was populated.
+ *
+ * When an object is returned (the set was a real set) the ref count
+ * of the object is not incremented so this function can be considered
+ * copy on write friendly. */
+int setTypeRandomElement(robj *setobj, robj **objele, int64_t *llele) {
+ if (setobj->encoding == REDIS_ENCODING_HT) {
+ dictEntry *de = dictGetRandomKey(setobj->ptr);
+ *objele = dictGetKey(de);
+ } else if (setobj->encoding == REDIS_ENCODING_INTSET) {
+ *llele = intsetRandom(setobj->ptr);
} else {
redisPanic("Unknown set encoding");
}
- return ret;
+ return setobj->encoding;
}
unsigned long setTypeSize(robj *subject) {
}
/* Convert the set to specified encoding. The resulting dict (when converting
- * to a hashtable) is presized to hold the number of elements in the original
+ * to a hash table) is presized to hold the number of elements in the original
* set. */
-void setTypeConvert(robj *subject, int enc) {
+void setTypeConvert(robj *setobj, int enc) {
setTypeIterator *si;
- robj *element;
- redisAssert(subject->type == REDIS_SET);
+ redisAssertWithInfo(NULL,setobj,setobj->type == REDIS_SET &&
+ setobj->encoding == REDIS_ENCODING_INTSET);
if (enc == REDIS_ENCODING_HT) {
+ int64_t intele;
dict *d = dictCreate(&setDictType,NULL);
+ robj *element;
+
/* Presize the dict to avoid rehashing */
- dictExpand(d,intsetLen(subject->ptr));
+ dictExpand(d,intsetLen(setobj->ptr));
- /* setTypeGet returns a robj with incremented refcount */
- si = setTypeInitIterator(subject);
- while ((element = setTypeNext(si)) != NULL)
- redisAssert(dictAdd(d,element,NULL) == DICT_OK);
+ /* To add the elements we extract integers and create redis objects */
+ si = setTypeInitIterator(setobj);
+ while (setTypeNext(si,NULL,&intele) != -1) {
+ element = createStringObjectFromLongLong(intele);
+ redisAssertWithInfo(NULL,element,dictAdd(d,element,NULL) == DICT_OK);
+ }
setTypeReleaseIterator(si);
- subject->encoding = REDIS_ENCODING_HT;
- zfree(subject->ptr);
- subject->ptr = d;
+ setobj->encoding = REDIS_ENCODING_HT;
+ zfree(setobj->ptr);
+ setobj->ptr = d;
} else {
redisPanic("Unsupported set conversion");
}
void saddCommand(redisClient *c) {
robj *set;
+ int j, added = 0;
set = lookupKeyWrite(c->db,c->argv[1]);
if (set == NULL) {
return;
}
}
- if (setTypeAdd(set,c->argv[2])) {
- touchWatchedKey(c->db,c->argv[1]);
- server.dirty++;
- addReply(c,shared.cone);
- } else {
- addReply(c,shared.czero);
+
+ for (j = 2; j < c->argc; j++) {
+ c->argv[j] = tryObjectEncoding(c->argv[j]);
+ if (setTypeAdd(set,c->argv[j])) added++;
}
+ if (added) signalModifiedKey(c->db,c->argv[1]);
+ server.dirty += added;
+ addReplyLongLong(c,added);
}
void sremCommand(redisClient *c) {
robj *set;
+ int j, deleted = 0;
if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL ||
checkType(c,set,REDIS_SET)) return;
- if (setTypeRemove(set,c->argv[2])) {
- if (setTypeSize(set) == 0) dbDelete(c->db,c->argv[1]);
- touchWatchedKey(c->db,c->argv[1]);
- server.dirty++;
- addReply(c,shared.cone);
- } else {
- addReply(c,shared.czero);
+ for (j = 2; j < c->argc; j++) {
+ if (setTypeRemove(set,c->argv[j])) {
+ deleted++;
+ if (setTypeSize(set) == 0) {
+ dbDelete(c->db,c->argv[1]);
+ break;
+ }
+ }
+ }
+ if (deleted) {
+ signalModifiedKey(c->db,c->argv[1]);
+ server.dirty += deleted;
}
+ addReplyLongLong(c,deleted);
}
void smoveCommand(redisClient *c) {
robj *srcset, *dstset, *ele;
srcset = lookupKeyWrite(c->db,c->argv[1]);
dstset = lookupKeyWrite(c->db,c->argv[2]);
- ele = c->argv[3];
+ ele = c->argv[3] = tryObjectEncoding(c->argv[3]);
/* If the source key does not exist return 0 */
if (srcset == NULL) {
/* Remove the src set from the database when empty */
if (setTypeSize(srcset) == 0) dbDelete(c->db,c->argv[1]);
- touchWatchedKey(c->db,c->argv[1]);
- touchWatchedKey(c->db,c->argv[2]);
+ signalModifiedKey(c->db,c->argv[1]);
+ signalModifiedKey(c->db,c->argv[2]);
server.dirty++;
/* Create the destination set when it doesn't exist */
if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
checkType(c,set,REDIS_SET)) return;
+ c->argv[2] = tryObjectEncoding(c->argv[2]);
if (setTypeIsMember(set,c->argv[2]))
addReply(c,shared.cone);
else
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
checkType(c,o,REDIS_SET)) return;
- addReplyUlong(c,setTypeSize(o));
+ addReplyLongLong(c,setTypeSize(o));
}
void spopCommand(redisClient *c) {
- robj *set, *ele;
+ robj *set, *ele, *aux;
+ int64_t llele;
+ int encoding;
if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
checkType(c,set,REDIS_SET)) return;
- ele = setTypeRandomElement(set);
- if (ele == NULL) {
- addReply(c,shared.nullbulk);
+ encoding = setTypeRandomElement(set,&ele,&llele);
+ if (encoding == REDIS_ENCODING_INTSET) {
+ ele = createStringObjectFromLongLong(llele);
+ set->ptr = intsetRemove(set->ptr,llele,NULL);
} else {
+ incrRefCount(ele);
setTypeRemove(set,ele);
- addReplyBulk(c,ele);
- decrRefCount(ele);
- if (setTypeSize(set) == 0) dbDelete(c->db,c->argv[1]);
- touchWatchedKey(c->db,c->argv[1]);
- server.dirty++;
+ }
+
+ /* Replicate/AOF this command as an SREM operation */
+ aux = createStringObject("SREM",4);
+ rewriteClientCommandVector(c,3,aux,c->argv[1],ele);
+ decrRefCount(ele);
+ decrRefCount(aux);
+
+ addReplyBulk(c,ele);
+ if (setTypeSize(set) == 0) dbDelete(c->db,c->argv[1]);
+ signalModifiedKey(c->db,c->argv[1]);
+ server.dirty++;
+}
+
+/* handle the "SRANDMEMBER key <count>" variant. The normal version of the
+ * command is handled by the srandmemberCommand() function itself. */
+
+/* How many times bigger should be the set compared to the requested size
+ * for us to don't use the "remove elements" strategy? Read later in the
+ * implementation for more info. */
+#define SRANDMEMBER_SUB_STRATEGY_MUL 3
+
+void srandmemberWithCountCommand(redisClient *c) {
+ long l;
+ unsigned long count, size;
+ int uniq = 1;
+ robj *set, *ele;
+ int64_t llele;
+ int encoding;
+
+ dict *d;
+
+ if (getLongFromObjectOrReply(c,c->argv[2],&l,NULL) != REDIS_OK) return;
+ if (l >= 0) {
+ count = (unsigned) l;
+ } else {
+ /* A negative count means: return the same elements multiple times
+ * (i.e. don't remove the extracted element after every extraction). */
+ count = -l;
+ uniq = 0;
+ }
+
+ if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk))
+ == NULL || checkType(c,set,REDIS_SET)) return;
+ size = setTypeSize(set);
+
+ /* If count is zero, serve it ASAP to avoid special cases later. */
+ if (count == 0) {
+ addReply(c,shared.emptymultibulk);
+ return;
+ }
+
+ /* CASE 1: The count was negative, so the extraction method is just:
+ * "return N random elements" sampling the whole set every time.
+ * This case is trivial and can be served without auxiliary data
+ * structures. */
+ if (!uniq) {
+ addReplyMultiBulkLen(c,count);
+ while(count--) {
+ encoding = setTypeRandomElement(set,&ele,&llele);
+ if (encoding == REDIS_ENCODING_INTSET) {
+ addReplyBulkLongLong(c,llele);
+ } else {
+ addReplyBulk(c,ele);
+ }
+ }
+ return;
+ }
+
+ /* CASE 2:
+ * The number of requested elements is greater than the number of
+ * elements inside the set: simply return the whole set. */
+ if (count >= size) {
+ sunionDiffGenericCommand(c,c->argv,c->argc-1,NULL,REDIS_OP_UNION);
+ return;
+ }
+
+ /* For CASE 3 and CASE 4 we need an auxiliary dictionary. */
+ d = dictCreate(&setDictType,NULL);
+
+ /* CASE 3:
+ * The number of elements inside the set is not greater than
+ * SRANDMEMBER_SUB_STRATEGY_MUL times the number of requested elements.
+ * In this case we create a set from scratch with all the elements, and
+ * subtract random elements to reach the requested number of elements.
+ *
+ * This is done because if the number of requsted elements is just
+ * a bit less than the number of elements in the set, the natural approach
+ * used into CASE 3 is highly inefficient. */
+ if (count*SRANDMEMBER_SUB_STRATEGY_MUL > size) {
+ setTypeIterator *si;
+
+ /* Add all the elements into the temporary dictionary. */
+ si = setTypeInitIterator(set);
+ while((encoding = setTypeNext(si,&ele,&llele)) != -1) {
+ int retval;
+
+ if (encoding == REDIS_ENCODING_INTSET) {
+ retval = dictAdd(d,createStringObjectFromLongLong(llele),NULL);
+ } else if (ele->encoding == REDIS_ENCODING_RAW) {
+ retval = dictAdd(d,dupStringObject(ele),NULL);
+ } else if (ele->encoding == REDIS_ENCODING_INT) {
+ retval = dictAdd(d,
+ createStringObjectFromLongLong((long)ele->ptr),NULL);
+ }
+ redisAssert(retval == DICT_OK);
+ }
+ setTypeReleaseIterator(si);
+ redisAssert(dictSize(d) == size);
+
+ /* Remove random elements to reach the right count. */
+ while(size > count) {
+ dictEntry *de;
+
+ de = dictGetRandomKey(d);
+ dictDelete(d,dictGetKey(de));
+ size--;
+ }
+ }
+
+ /* CASE 4: We have a big set compared to the requested number of elements.
+ * In this case we can simply get random elements from the set and add
+ * to the temporary set, trying to eventually get enough unique elements
+ * to reach the specified count. */
+ else {
+ unsigned long added = 0;
+
+ while(added < count) {
+ encoding = setTypeRandomElement(set,&ele,&llele);
+ if (encoding == REDIS_ENCODING_INTSET) {
+ ele = createStringObjectFromLongLong(llele);
+ } else if (ele->encoding == REDIS_ENCODING_RAW) {
+ ele = dupStringObject(ele);
+ } else if (ele->encoding == REDIS_ENCODING_INT) {
+ ele = createStringObjectFromLongLong((long)ele->ptr);
+ }
+ /* Try to add the object to the dictionary. If it already exists
+ * free it, otherwise increment the number of objects we have
+ * in the result dictionary. */
+ if (dictAdd(d,ele,NULL) == DICT_OK)
+ added++;
+ else
+ decrRefCount(ele);
+ }
+ }
+
+ /* CASE 3 & 4: send the result to the user. */
+ {
+ dictIterator *di;
+ dictEntry *de;
+
+ addReplyMultiBulkLen(c,count);
+ di = dictGetIterator(d);
+ while((de = dictNext(di)) != NULL)
+ addReplyBulk(c,dictGetKey(de));
+ dictReleaseIterator(di);
+ dictRelease(d);
}
}
void srandmemberCommand(redisClient *c) {
robj *set, *ele;
+ int64_t llele;
+ int encoding;
+
+ if (c->argc == 3) {
+ srandmemberWithCountCommand(c);
+ return;
+ } else if (c->argc > 3) {
+ addReply(c,shared.syntaxerr);
+ return;
+ }
if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
checkType(c,set,REDIS_SET)) return;
- ele = setTypeRandomElement(set);
- if (ele == NULL) {
- addReply(c,shared.nullbulk);
+ encoding = setTypeRandomElement(set,&ele,&llele);
+ if (encoding == REDIS_ENCODING_INTSET) {
+ addReplyBulkLongLong(c,llele);
} else {
addReplyBulk(c,ele);
- decrRefCount(ele);
}
}
void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum, robj *dstkey) {
robj **sets = zmalloc(sizeof(robj*)*setnum);
setTypeIterator *si;
- robj *ele, *lenobj = NULL, *dstset = NULL;
+ robj *eleobj, *dstset = NULL;
+ int64_t intobj;
+ void *replylen = NULL;
unsigned long j, cardinality = 0;
+ int encoding;
for (j = 0; j < setnum; j++) {
robj *setobj = dstkey ?
zfree(sets);
if (dstkey) {
if (dbDelete(c->db,dstkey)) {
- touchWatchedKey(c->db,dstkey);
+ signalModifiedKey(c->db,dstkey);
server.dirty++;
}
addReply(c,shared.czero);
* to the output list and save the pointer to later modify it with the
* right length */
if (!dstkey) {
- lenobj = createObject(REDIS_STRING,NULL);
- addReply(c,lenobj);
- decrRefCount(lenobj);
+ replylen = addDeferredMultiBulkLength(c);
} else {
/* If we have a target key where to store the resulting set
* create this key with an empty set inside */
* the element against all the other sets, if at least one set does
* not include the element it is discarded */
si = setTypeInitIterator(sets[0]);
- while((ele = setTypeNext(si)) != NULL) {
- for (j = 1; j < setnum; j++)
- if (!setTypeIsMember(sets[j],ele)) break;
+ while((encoding = setTypeNext(si,&eleobj,&intobj)) != -1) {
+ for (j = 1; j < setnum; j++) {
+ if (sets[j] == sets[0]) continue;
+ if (encoding == REDIS_ENCODING_INTSET) {
+ /* intset with intset is simple... and fast */
+ if (sets[j]->encoding == REDIS_ENCODING_INTSET &&
+ !intsetFind((intset*)sets[j]->ptr,intobj))
+ {
+ break;
+ /* in order to compare an integer with an object we
+ * have to use the generic function, creating an object
+ * for this */
+ } else if (sets[j]->encoding == REDIS_ENCODING_HT) {
+ eleobj = createStringObjectFromLongLong(intobj);
+ if (!setTypeIsMember(sets[j],eleobj)) {
+ decrRefCount(eleobj);
+ break;
+ }
+ decrRefCount(eleobj);
+ }
+ } else if (encoding == REDIS_ENCODING_HT) {
+ /* Optimization... if the source object is integer
+ * encoded AND the target set is an intset, we can get
+ * a much faster path. */
+ if (eleobj->encoding == REDIS_ENCODING_INT &&
+ sets[j]->encoding == REDIS_ENCODING_INTSET &&
+ !intsetFind((intset*)sets[j]->ptr,(long)eleobj->ptr))
+ {
+ break;
+ /* else... object to object check is easy as we use the
+ * type agnostic API here. */
+ } else if (!setTypeIsMember(sets[j],eleobj)) {
+ break;
+ }
+ }
+ }
/* Only take action when all sets contain the member */
if (j == setnum) {
if (!dstkey) {
- addReplyBulk(c,ele);
+ if (encoding == REDIS_ENCODING_HT)
+ addReplyBulk(c,eleobj);
+ else
+ addReplyBulkLongLong(c,intobj);
cardinality++;
} else {
- setTypeAdd(dstset,ele);
+ if (encoding == REDIS_ENCODING_INTSET) {
+ eleobj = createStringObjectFromLongLong(intobj);
+ setTypeAdd(dstset,eleobj);
+ decrRefCount(eleobj);
+ } else {
+ setTypeAdd(dstset,eleobj);
+ }
}
}
- decrRefCount(ele);
}
setTypeReleaseIterator(si);
decrRefCount(dstset);
addReply(c,shared.czero);
}
- touchWatchedKey(c->db,dstkey);
+ signalModifiedKey(c->db,dstkey);
server.dirty++;
} else {
- lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",cardinality);
+ setDeferredMultiBulkLength(c,replylen,cardinality);
}
zfree(sets);
}
if (!sets[j]) continue; /* non existing keys are like empty sets */
si = setTypeInitIterator(sets[j]);
- while((ele = setTypeNext(si)) != NULL) {
+ while((ele = setTypeNextObject(si)) != NULL) {
if (op == REDIS_OP_UNION || j == 0) {
if (setTypeAdd(dstset,ele)) {
cardinality++;
/* Output the content of the resulting set, if not in STORE mode */
if (!dstkey) {
- addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
+ addReplyMultiBulkLen(c,cardinality);
si = setTypeInitIterator(dstset);
- while((ele = setTypeNext(si)) != NULL) {
+ while((ele = setTypeNextObject(si)) != NULL) {
addReplyBulk(c,ele);
decrRefCount(ele);
}
decrRefCount(dstset);
addReply(c,shared.czero);
}
- touchWatchedKey(c->db,dstkey);
+ signalModifiedKey(c->db,dstkey);
server.dirty++;
}
zfree(sets);