return setTypeSize(*(robj**)s1)-setTypeSize(*(robj**)s2);
+/* This is used by SDIFF and in this case we can receive NULL that should
+ * be handled as empty sets. */
+int qsortCompareSetsByRevCardinality(const void *s1, const void *s2) {
+ robj *o1 = *(robj**)s1, *o2 = *(robj**)s2;
+ return (o2 ? setTypeSize(o2) : 0) - (o1 ? setTypeSize(o1) : 0);
void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum, robj *dstkey) {
robj **sets = zmalloc(sizeof(robj*)*setnum);
setTypeIterator *si;
setTypeIterator *si;
robj *ele, *dstset = NULL;
int j, cardinality = 0;
+ int diff_algo = 1;
for (j = 0; j < setnum; j++) {
robj *setobj = dstkey ?
sets[j] = setobj;
+ /* Select what DIFF algorithm to use.
+ *
+ * Algorithm 1 is O(N*M) where N is the size of the element first set
+ * and M the total number of sets.
+ *
+ * Algorithm 2 is O(N) where N is the total number of elements in all
+ * the sets.
+ *
+ * We compute what is the best bet with the current input here. */
+ if (op == REDIS_OP_DIFF && sets[0]) {
+ long long algo_one_work = 0, algo_two_work = 0;
+ for (j = 0; j < setnum; j++) {
+ if (sets[j] == NULL) continue;
+ algo_one_work += setTypeSize(sets[0]);
+ algo_two_work += setTypeSize(sets[j]);
+ }
+ /* Algorithm 1 has better constant times and performs less operations
+ * if there are elements in common. Give it some advantage. */
+ algo_one_work /= 2;
+ diff_algo = (algo_one_work <= algo_two_work) ? 1 : 2;
+ if (diff_algo == 1 && setnum > 1) {
+ /* With algorithm 1 it is better to order the sets to subtract
+ * by decreasing size, so that we are more likely to find
+ * duplicated elements ASAP. */
+ qsort(sets+1,setnum-1,sizeof(robj*),
+ qsortCompareSetsByRevCardinality);
+ }
+ }
/* We need a temp set object to store our union. If the dstkey
* is not NULL (that is, we are inside an SUNIONSTORE operation) then
* this set object will be the resulting object to set into the target key*/
dstset = createIntsetObject();
- /* Iterate all the elements of all the sets, add every element a single
- * time to the result set */
- for (j = 0; j < setnum; j++) {
- if (op == REDIS_OP_DIFF && j == 0 && !sets[j]) break; /* result set is empty */
- if (!sets[j]) continue; /* non existing keys are like empty sets */
+ if (op == REDIS_OP_UNION) {
+ /* Union is trivial, just add every element of every set to the
+ * temporary set. */
+ for (j = 0; j < setnum; j++) {
+ if (!sets[j]) continue; /* non existing keys are like empty sets */
- si = setTypeInitIterator(sets[j]);
+ si = setTypeInitIterator(sets[j]);
+ while((ele = setTypeNextObject(si)) != NULL) {
+ if (setTypeAdd(dstset,ele)) cardinality++;
+ decrRefCount(ele);
+ }
+ setTypeReleaseIterator(si);
+ }
+ } else if (op == REDIS_OP_DIFF && sets[0] && diff_algo == 1) {
+ /* DIFF Algorithm 1:
+ *
+ * We perform the diff by iterating all the elements of the first set,
+ * and only adding it to the target set if the element does not exist
+ * into all the other sets.
+ *
+ * This way we perform at max N*M operations, where N is the size of
+ * the first set, and M the number of sets. */
+ si = setTypeInitIterator(sets[0]);
while((ele = setTypeNextObject(si)) != NULL) {
- if (op == REDIS_OP_UNION || j == 0) {
- if (setTypeAdd(dstset,ele)) {
- cardinality++;
- }
- } else if (op == REDIS_OP_DIFF) {
- if (setTypeRemove(dstset,ele)) {
- cardinality--;
- }
+ for (j = 1; j < setnum; j++) {
+ if (!sets[j]) continue; /* no key is an empty set. */
+ if (setTypeIsMember(sets[j],ele)) break;
+ }
+ if (j == setnum) {
+ /* There is no other set with this element. Add it. */
+ setTypeAdd(dstset,ele);
+ cardinality++;
+ } else if (op == REDIS_OP_DIFF && sets[0] && diff_algo == 2) {
+ /* DIFF Algorithm 2:
+ *
+ * Add all the elements of the first set to the auxiliary set.
+ * Then remove all the elements of all the next sets from it.
+ *
+ * This is O(N) where N is the sum of all the elements in every
+ * set. */
+ for (j = 0; j < setnum; j++) {
+ if (!sets[j]) continue; /* non existing keys are like empty sets */
+ si = setTypeInitIterator(sets[j]);
+ while((ele = setTypeNextObject(si)) != NULL) {
+ if (j == 0) {
+ if (setTypeAdd(dstset,ele)) cardinality++;
+ } else {
+ if (setTypeRemove(dstset,ele)) cardinality--;
+ }
+ decrRefCount(ele);
+ }
+ setTypeReleaseIterator(si);
- /* Exit when result set is empty. */
- if (op == REDIS_OP_DIFF && cardinality == 0) break;
+ /* Exit if result set is empty as any additional removal
+ * of elements will have no effect. */
+ if (cardinality == 0) break;
+ }
/* Output the content of the resulting set, if not in STORE mode */