]> git.saurik.com Git - redis.git/blobdiff - src/t_set.c
Merge remote-tracking branch 'origin/unstable' into unstable
[redis.git] / src / t_set.c
index 3fbf13a352affabf72ae6602e5194254d4fa752c..df8ade4773b31a8c39b49b0257378f6da270b6f0 100644 (file)
@@ -8,7 +8,7 @@
  * an integer-encodable value, an intset will be returned. Otherwise a regular
  * hash table. */
 robj *setTypeCreate(robj *value) {
-    if (getLongLongFromObject(value,NULL) == REDIS_OK)
+    if (isObjectRepresentableAsLongLong(value,NULL) == REDIS_OK)
         return createIntsetObject();
     return createSetObject();
 }
@@ -21,7 +21,7 @@ int setTypeAdd(robj *subject, robj *value) {
             return 1;
         }
     } else if (subject->encoding == REDIS_ENCODING_INTSET) {
-        if (getLongLongFromObject(value,&llval) == REDIS_OK) {
+        if (isObjectRepresentableAsLongLong(value,&llval) == REDIS_OK) {
             uint8_t success = 0;
             subject->ptr = intsetAdd(subject->ptr,llval,&success);
             if (success) {
@@ -37,7 +37,7 @@ int setTypeAdd(robj *subject, robj *value) {
 
             /* The set *was* an intset and this value is not integer
              * encodable, so dictAdd should always work. */
-            redisAssert(dictAdd(subject->ptr,value,NULL) == DICT_OK);
+            redisAssertWithInfo(NULL,value,dictAdd(subject->ptr,value,NULL) == DICT_OK);
             incrRefCount(value);
             return 1;
         }
@@ -47,17 +47,17 @@ int setTypeAdd(robj *subject, robj *value) {
     return 0;
 }
 
-int setTypeRemove(robj *subject, robj *value) {
+int setTypeRemove(robj *setobj, robj *value) {
     long long llval;
-    if (subject->encoding == REDIS_ENCODING_HT) {
-        if (dictDelete(subject->ptr,value) == DICT_OK) {
-            if (htNeedsResize(subject->ptr)) dictResize(subject->ptr);
+    if (setobj->encoding == REDIS_ENCODING_HT) {
+        if (dictDelete(setobj->ptr,value) == DICT_OK) {
+            if (htNeedsResize(setobj->ptr)) dictResize(setobj->ptr);
             return 1;
         }
-    } else if (subject->encoding == REDIS_ENCODING_INTSET) {
-        if (getLongLongFromObject(value,&llval) == REDIS_OK) {
-            uint8_t success;
-            subject->ptr = intsetRemove(subject->ptr,llval,&success);
+    } else if (setobj->encoding == REDIS_ENCODING_INTSET) {
+        if (isObjectRepresentableAsLongLong(value,&llval) == REDIS_OK) {
+            int success;
+            setobj->ptr = intsetRemove(setobj->ptr,llval,&success);
             if (success) return 1;
         }
     } else {
@@ -71,7 +71,7 @@ int setTypeIsMember(robj *subject, robj *value) {
     if (subject->encoding == REDIS_ENCODING_HT) {
         return dictFind((dict*)subject->ptr,value) != NULL;
     } else if (subject->encoding == REDIS_ENCODING_INTSET) {
-        if (getLongLongFromObject(value,&llval) == REDIS_OK) {
+        if (isObjectRepresentableAsLongLong(value,&llval) == REDIS_OK) {
             return intsetFind((intset*)subject->ptr,llval);
         }
     } else {
@@ -80,8 +80,8 @@ int setTypeIsMember(robj *subject, robj *value) {
     return 0;
 }
 
-setIterator *setTypeInitIterator(robj *subject) {
-    setIterator *si = zmalloc(sizeof(setIterator));
+setTypeIterator *setTypeInitIterator(robj *subject) {
+    setTypeIterator *si = zmalloc(sizeof(setTypeIterator));
     si->subject = subject;
     si->encoding = subject->encoding;
     if (si->encoding == REDIS_ENCODING_HT) {
@@ -94,47 +94,84 @@ setIterator *setTypeInitIterator(robj *subject) {
     return si;
 }
 
-void setTypeReleaseIterator(setIterator *si) {
+void setTypeReleaseIterator(setTypeIterator *si) {
     if (si->encoding == REDIS_ENCODING_HT)
         dictReleaseIterator(si->di);
     zfree(si);
 }
 
 /* Move to the next entry in the set. Returns the object at the current
- * position, or NULL when the end is reached. This object will have its
- * refcount incremented, so the caller needs to take care of this. */
-robj *setTypeNext(setIterator *si) {
-    robj *ret = NULL;
+ * position.
+ *
+ * Since set elements can be internally be stored as redis objects or
+ * simple arrays of integers, setTypeNext returns the encoding of the
+ * set object you are iterating, and will populate the appropriate pointer
+ * (eobj) or (llobj) accordingly.
+ *
+ * When there are no longer elements -1 is returned.
+ * Returned objects ref count is not incremented, so this function is
+ * copy on write friendly. */
+int setTypeNext(setTypeIterator *si, robj **objele, int64_t *llele) {
     if (si->encoding == REDIS_ENCODING_HT) {
         dictEntry *de = dictNext(si->di);
-        if (de != NULL) {
-            ret = dictGetEntryKey(de);
-            incrRefCount(ret);
-        }
+        if (de == NULL) return -1;
+        *objele = dictGetKey(de);
     } else if (si->encoding == REDIS_ENCODING_INTSET) {
-        long long llval;
-        if (intsetGet(si->subject->ptr,si->ii++,&llval))
-            ret = createStringObjectFromLongLong(llval);
+        if (!intsetGet(si->subject->ptr,si->ii++,llele))
+            return -1;
     }
-    return ret;
+    return si->encoding;
 }
 
+/* The not copy on write friendly version but easy to use version
+ * of setTypeNext() is setTypeNextObject(), returning new objects
+ * or incrementing the ref count of returned objects. So if you don't
+ * retain a pointer to this object you should call decrRefCount() against it.
+ *
+ * This function is the way to go for write operations where COW is not
+ * an issue as the result will be anyway of incrementing the ref count. */
+robj *setTypeNextObject(setTypeIterator *si) {
+    int64_t intele;
+    robj *objele;
+    int encoding;
+
+    encoding = setTypeNext(si,&objele,&intele);
+    switch(encoding) {
+        case -1:    return NULL;
+        case REDIS_ENCODING_INTSET:
+            return createStringObjectFromLongLong(intele);
+        case REDIS_ENCODING_HT:
+            incrRefCount(objele);
+            return objele;
+        default:
+            redisPanic("Unsupported encoding");
+    }
+    return NULL; /* just to suppress warnings */
+}
 
-/* Return random element from set. The returned object will always have
- * an incremented refcount. */
-robj *setTypeRandomElement(robj *subject) {
-    robj *ret = NULL;
-    if (subject->encoding == REDIS_ENCODING_HT) {
-        dictEntry *de = dictGetRandomKey(subject->ptr);
-        ret = dictGetEntryKey(de);
-        incrRefCount(ret);
-    } else if (subject->encoding == REDIS_ENCODING_INTSET) {
-        long long llval = intsetRandom(subject->ptr);
-        ret = createStringObjectFromLongLong(llval);
+/* Return random element from a non empty set.
+ * The returned element can be a int64_t value if the set is encoded
+ * as an "intset" blob of integers, or a redis object if the set
+ * is a regular set.
+ *
+ * The caller provides both pointers to be populated with the right
+ * object. The return value of the function is the object->encoding
+ * field of the object and is used by the caller to check if the
+ * int64_t pointer or the redis object pointere was populated.
+ *
+ * When an object is returned (the set was a real set) the ref count
+ * of the object is not incremented so this function can be considered
+ * copy on write friendly. */
+int setTypeRandomElement(robj *setobj, robj **objele, int64_t *llele) {
+    if (setobj->encoding == REDIS_ENCODING_HT) {
+        dictEntry *de = dictGetRandomKey(setobj->ptr);
+        *objele = dictGetKey(de);
+    } else if (setobj->encoding == REDIS_ENCODING_INTSET) {
+        *llele = intsetRandom(setobj->ptr);
     } else {
         redisPanic("Unknown set encoding");
     }
-    return ret;
+    return setobj->encoding;
 }
 
 unsigned long setTypeSize(robj *subject) {
@@ -148,27 +185,32 @@ unsigned long setTypeSize(robj *subject) {
 }
 
 /* Convert the set to specified encoding. The resulting dict (when converting
- * to a hashtable) is presized to hold the number of elements in the original
+ * to a hash table) is presized to hold the number of elements in the original
  * set. */
-void setTypeConvert(robj *subject, int enc) {
-    setIterator *si;
-    robj *element;
-    redisAssert(subject->type == REDIS_SET);
+void setTypeConvert(robj *setobj, int enc) {
+    setTypeIterator *si;
+    redisAssertWithInfo(NULL,setobj,setobj->type == REDIS_SET &&
+                             setobj->encoding == REDIS_ENCODING_INTSET);
 
     if (enc == REDIS_ENCODING_HT) {
+        int64_t intele;
         dict *d = dictCreate(&setDictType,NULL);
+        robj *element;
+
         /* Presize the dict to avoid rehashing */
-        dictExpand(d,intsetLen(subject->ptr));
+        dictExpand(d,intsetLen(setobj->ptr));
 
-        /* setTypeGet returns a robj with incremented refcount */
-        si = setTypeInitIterator(subject);
-        while ((element = setTypeNext(si)) != NULL)
-            redisAssert(dictAdd(d,element,NULL) == DICT_OK);
+        /* To add the elements we extract integers and create redis objects */
+        si = setTypeInitIterator(setobj);
+        while (setTypeNext(si,NULL,&intele) != -1) {
+            element = createStringObjectFromLongLong(intele);
+            redisAssertWithInfo(NULL,element,dictAdd(d,element,NULL) == DICT_OK);
+        }
         setTypeReleaseIterator(si);
 
-        subject->encoding = REDIS_ENCODING_HT;
-        zfree(subject->ptr);
-        subject->ptr = d;
+        setobj->encoding = REDIS_ENCODING_HT;
+        zfree(setobj->ptr);
+        setobj->ptr = d;
     } else {
         redisPanic("Unsupported set conversion");
     }
@@ -176,6 +218,7 @@ void setTypeConvert(robj *subject, int enc) {
 
 void saddCommand(redisClient *c) {
     robj *set;
+    int j, added = 0;
 
     set = lookupKeyWrite(c->db,c->argv[1]);
     if (set == NULL) {
@@ -187,34 +230,44 @@ void saddCommand(redisClient *c) {
             return;
         }
     }
-    if (setTypeAdd(set,c->argv[2])) {
-        server.dirty++;
-        addReply(c,shared.cone);
-    } else {
-        addReply(c,shared.czero);
+
+    for (j = 2; j < c->argc; j++) {
+        c->argv[j] = tryObjectEncoding(c->argv[j]);
+        if (setTypeAdd(set,c->argv[j])) added++;
     }
+    if (added) signalModifiedKey(c->db,c->argv[1]);
+    server.dirty += added;
+    addReplyLongLong(c,added);
 }
 
 void sremCommand(redisClient *c) {
     robj *set;
+    int j, deleted = 0;
 
     if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL ||
         checkType(c,set,REDIS_SET)) return;
 
-    if (setTypeRemove(set,c->argv[2])) {
-        if (setTypeSize(set) == 0) dbDelete(c->db,c->argv[1]);
-        server.dirty++;
-        addReply(c,shared.cone);
-    } else {
-        addReply(c,shared.czero);
+    for (j = 2; j < c->argc; j++) {
+        if (setTypeRemove(set,c->argv[j])) {
+            deleted++;
+            if (setTypeSize(set) == 0) {
+                dbDelete(c->db,c->argv[1]);
+                break;
+            }
+        }
+    }
+    if (deleted) {
+        signalModifiedKey(c->db,c->argv[1]);
+        server.dirty += deleted;
     }
+    addReplyLongLong(c,deleted);
 }
 
 void smoveCommand(redisClient *c) {
     robj *srcset, *dstset, *ele;
     srcset = lookupKeyWrite(c->db,c->argv[1]);
     dstset = lookupKeyWrite(c->db,c->argv[2]);
-    ele = c->argv[3];
+    ele = c->argv[3] = tryObjectEncoding(c->argv[3]);
 
     /* If the source key does not exist return 0 */
     if (srcset == NULL) {
@@ -241,6 +294,8 @@ void smoveCommand(redisClient *c) {
 
     /* Remove the src set from the database when empty */
     if (setTypeSize(srcset) == 0) dbDelete(c->db,c->argv[1]);
+    signalModifiedKey(c->db,c->argv[1]);
+    signalModifiedKey(c->db,c->argv[2]);
     server.dirty++;
 
     /* Create the destination set when it doesn't exist */
@@ -260,6 +315,7 @@ void sismemberCommand(redisClient *c) {
     if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
         checkType(c,set,REDIS_SET)) return;
 
+    c->argv[2] = tryObjectEncoding(c->argv[2]);
     if (setTypeIsMember(set,c->argv[2]))
         addReply(c,shared.cone);
     else
@@ -272,39 +328,51 @@ void scardCommand(redisClient *c) {
     if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
         checkType(c,o,REDIS_SET)) return;
 
-    addReplyUlong(c,setTypeSize(o));
+    addReplyLongLong(c,setTypeSize(o));
 }
 
 void spopCommand(redisClient *c) {
-    robj *set, *ele;
+    robj *set, *ele, *aux;
+    int64_t llele;
+    int encoding;
 
     if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
         checkType(c,set,REDIS_SET)) return;
 
-    ele = setTypeRandomElement(set);
-    if (ele == NULL) {
-        addReply(c,shared.nullbulk);
+    encoding = setTypeRandomElement(set,&ele,&llele);
+    if (encoding == REDIS_ENCODING_INTSET) {
+        ele = createStringObjectFromLongLong(llele);
+        set->ptr = intsetRemove(set->ptr,llele,NULL);
     } else {
+        incrRefCount(ele);
         setTypeRemove(set,ele);
-        addReplyBulk(c,ele);
-        decrRefCount(ele);
-        if (setTypeSize(set) == 0) dbDelete(c->db,c->argv[1]);
-        server.dirty++;
     }
+
+    /* Replicate/AOF this command as an SREM operation */
+    aux = createStringObject("SREM",4);
+    rewriteClientCommandVector(c,3,aux,c->argv[1],ele);
+    decrRefCount(ele);
+    decrRefCount(aux);
+
+    addReplyBulk(c,ele);
+    if (setTypeSize(set) == 0) dbDelete(c->db,c->argv[1]);
+    signalModifiedKey(c->db,c->argv[1]);
+    server.dirty++;
 }
 
 void srandmemberCommand(redisClient *c) {
     robj *set, *ele;
+    int64_t llele;
+    int encoding;
 
     if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
         checkType(c,set,REDIS_SET)) return;
 
-    ele = setTypeRandomElement(set);
-    if (ele == NULL) {
-        addReply(c,shared.nullbulk);
+    encoding = setTypeRandomElement(set,&ele,&llele);
+    if (encoding == REDIS_ENCODING_INTSET) {
+        addReplyBulkLongLong(c,llele);
     } else {
         addReplyBulk(c,ele);
-        decrRefCount(ele);
     }
 }
 
@@ -314,9 +382,12 @@ int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
 
 void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum, robj *dstkey) {
     robj **sets = zmalloc(sizeof(robj*)*setnum);
-    setIterator *si;
-    robj *ele, *lenobj = NULL, *dstset = NULL;
+    setTypeIterator *si;
+    robj *eleobj, *dstset = NULL;
+    int64_t intobj;
+    void *replylen = NULL;
     unsigned long j, cardinality = 0;
+    int encoding;
 
     for (j = 0; j < setnum; j++) {
         robj *setobj = dstkey ?
@@ -325,8 +396,10 @@ void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum,
         if (!setobj) {
             zfree(sets);
             if (dstkey) {
-                if (dbDelete(c->db,dstkey))
+                if (dbDelete(c->db,dstkey)) {
+                    signalModifiedKey(c->db,dstkey);
                     server.dirty++;
+                }
                 addReply(c,shared.czero);
             } else {
                 addReply(c,shared.emptymultibulk);
@@ -349,9 +422,7 @@ void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum,
      * to the output list and save the pointer to later modify it with the
      * right length */
     if (!dstkey) {
-        lenobj = createObject(REDIS_STRING,NULL);
-        addReply(c,lenobj);
-        decrRefCount(lenobj);
+        replylen = addDeferredMultiBulkLength(c);
     } else {
         /* If we have a target key where to store the resulting set
          * create this key with an empty set inside */
@@ -362,20 +433,61 @@ void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum,
      * the element against all the other sets, if at least one set does
      * not include the element it is discarded */
     si = setTypeInitIterator(sets[0]);
-    while((ele = setTypeNext(si)) != NULL) {
-        for (j = 1; j < setnum; j++)
-            if (!setTypeIsMember(sets[j],ele)) break;
+    while((encoding = setTypeNext(si,&eleobj,&intobj)) != -1) {
+        for (j = 1; j < setnum; j++) {
+            if (sets[j] == sets[0]) continue;
+            if (encoding == REDIS_ENCODING_INTSET) {
+                /* intset with intset is simple... and fast */
+                if (sets[j]->encoding == REDIS_ENCODING_INTSET &&
+                    !intsetFind((intset*)sets[j]->ptr,intobj))
+                {
+                    break;
+                /* in order to compare an integer with an object we
+                 * have to use the generic function, creating an object
+                 * for this */
+                } else if (sets[j]->encoding == REDIS_ENCODING_HT) {
+                    eleobj = createStringObjectFromLongLong(intobj);
+                    if (!setTypeIsMember(sets[j],eleobj)) {
+                        decrRefCount(eleobj);
+                        break;
+                    }
+                    decrRefCount(eleobj);
+                }
+            } else if (encoding == REDIS_ENCODING_HT) {
+                /* Optimization... if the source object is integer
+                 * encoded AND the target set is an intset, we can get
+                 * a much faster path. */
+                if (eleobj->encoding == REDIS_ENCODING_INT &&
+                    sets[j]->encoding == REDIS_ENCODING_INTSET &&
+                    !intsetFind((intset*)sets[j]->ptr,(long)eleobj->ptr))
+                {
+                    break;
+                /* else... object to object check is easy as we use the
+                 * type agnostic API here. */
+                } else if (!setTypeIsMember(sets[j],eleobj)) {
+                    break;
+                }
+            }
+        }
 
         /* Only take action when all sets contain the member */
         if (j == setnum) {
             if (!dstkey) {
-                addReplyBulk(c,ele);
+                if (encoding == REDIS_ENCODING_HT)
+                    addReplyBulk(c,eleobj);
+                else
+                    addReplyBulkLongLong(c,intobj);
                 cardinality++;
             } else {
-                setTypeAdd(dstset,ele);
+                if (encoding == REDIS_ENCODING_INTSET) {
+                    eleobj = createStringObjectFromLongLong(intobj);
+                    setTypeAdd(dstset,eleobj);
+                    decrRefCount(eleobj);
+                } else {
+                    setTypeAdd(dstset,eleobj);
+                }
             }
         }
-        decrRefCount(ele);
     }
     setTypeReleaseIterator(si);
 
@@ -390,9 +502,10 @@ void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum,
             decrRefCount(dstset);
             addReply(c,shared.czero);
         }
+        signalModifiedKey(c->db,dstkey);
         server.dirty++;
     } else {
-        lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",cardinality);
+        setDeferredMultiBulkLength(c,replylen,cardinality);
     }
     zfree(sets);
 }
@@ -411,7 +524,7 @@ void sinterstoreCommand(redisClient *c) {
 
 void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum, robj *dstkey, int op) {
     robj **sets = zmalloc(sizeof(robj*)*setnum);
-    setIterator *si;
+    setTypeIterator *si;
     robj *ele, *dstset = NULL;
     int j, cardinality = 0;
 
@@ -442,7 +555,7 @@ void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum, robj *
         if (!sets[j]) continue; /* non existing keys are like empty sets */
 
         si = setTypeInitIterator(sets[j]);
-        while((ele = setTypeNext(si)) != NULL) {
+        while((ele = setTypeNextObject(si)) != NULL) {
             if (op == REDIS_OP_UNION || j == 0) {
                 if (setTypeAdd(dstset,ele)) {
                     cardinality++;
@@ -462,9 +575,9 @@ void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum, robj *
 
     /* Output the content of the resulting set, if not in STORE mode */
     if (!dstkey) {
-        addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
+        addReplyMultiBulkLen(c,cardinality);
         si = setTypeInitIterator(dstset);
-        while((ele = setTypeNext(si)) != NULL) {
+        while((ele = setTypeNextObject(si)) != NULL) {
             addReplyBulk(c,ele);
             decrRefCount(ele);
         }
@@ -481,6 +594,7 @@ void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum, robj *
             decrRefCount(dstset);
             addReply(c,shared.czero);
         }
+        signalModifiedKey(c->db,dstkey);
         server.dirty++;
     }
     zfree(sets);