]> git.saurik.com Git - redis.git/blobdiff - src/t_set.c
Fast memory test on Redis crash.
[redis.git] / src / t_set.c
index 0b4128adf4c804053b9e237b0a7a9b9f354e8f1c..46a0c6ee502417eb2688b1aa3958a2a97ab0880a 100644 (file)
@@ -1,9 +1,40 @@
+/*
+ * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ *   * Redistributions of source code must retain the above copyright notice,
+ *     this list of conditions and the following disclaimer.
+ *   * Redistributions in binary form must reproduce the above copyright
+ *     notice, this list of conditions and the following disclaimer in the
+ *     documentation and/or other materials provided with the distribution.
+ *   * Neither the name of Redis nor the names of its contributors may be used
+ *     to endorse or promote products derived from this software without
+ *     specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
 #include "redis.h"
 
 /*-----------------------------------------------------------------------------
  * Set Commands
  *----------------------------------------------------------------------------*/
 
 #include "redis.h"
 
 /*-----------------------------------------------------------------------------
  * Set Commands
  *----------------------------------------------------------------------------*/
 
+void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum, robj *dstkey, int op);
+
 /* Factory method to return a set that *can* hold "value". When the object has
  * an integer-encodable value, an intset will be returned. Otherwise a regular
  * hash table. */
 /* Factory method to return a set that *can* hold "value". When the object has
  * an integer-encodable value, an intset will be returned. Otherwise a regular
  * hash table. */
@@ -37,7 +68,7 @@ int setTypeAdd(robj *subject, robj *value) {
 
             /* The set *was* an intset and this value is not integer
              * encodable, so dictAdd should always work. */
 
             /* The set *was* an intset and this value is not integer
              * encodable, so dictAdd should always work. */
-            redisAssert(dictAdd(subject->ptr,value,NULL) == DICT_OK);
+            redisAssertWithInfo(NULL,value,dictAdd(subject->ptr,value,NULL) == DICT_OK);
             incrRefCount(value);
             return 1;
         }
             incrRefCount(value);
             return 1;
         }
@@ -115,7 +146,7 @@ int setTypeNext(setTypeIterator *si, robj **objele, int64_t *llele) {
     if (si->encoding == REDIS_ENCODING_HT) {
         dictEntry *de = dictNext(si->di);
         if (de == NULL) return -1;
     if (si->encoding == REDIS_ENCODING_HT) {
         dictEntry *de = dictNext(si->di);
         if (de == NULL) return -1;
-        *objele = dictGetEntryKey(de);
+        *objele = dictGetKey(de);
     } else if (si->encoding == REDIS_ENCODING_INTSET) {
         if (!intsetGet(si->subject->ptr,si->ii++,llele))
             return -1;
     } else if (si->encoding == REDIS_ENCODING_INTSET) {
         if (!intsetGet(si->subject->ptr,si->ii++,llele))
             return -1;
@@ -165,7 +196,7 @@ robj *setTypeNextObject(setTypeIterator *si) {
 int setTypeRandomElement(robj *setobj, robj **objele, int64_t *llele) {
     if (setobj->encoding == REDIS_ENCODING_HT) {
         dictEntry *de = dictGetRandomKey(setobj->ptr);
 int setTypeRandomElement(robj *setobj, robj **objele, int64_t *llele) {
     if (setobj->encoding == REDIS_ENCODING_HT) {
         dictEntry *de = dictGetRandomKey(setobj->ptr);
-        *objele = dictGetEntryKey(de);
+        *objele = dictGetKey(de);
     } else if (setobj->encoding == REDIS_ENCODING_INTSET) {
         *llele = intsetRandom(setobj->ptr);
     } else {
     } else if (setobj->encoding == REDIS_ENCODING_INTSET) {
         *llele = intsetRandom(setobj->ptr);
     } else {
@@ -185,12 +216,12 @@ unsigned long setTypeSize(robj *subject) {
 }
 
 /* Convert the set to specified encoding. The resulting dict (when converting
 }
 
 /* Convert the set to specified encoding. The resulting dict (when converting
- * to a hashtable) is presized to hold the number of elements in the original
+ * to a hash table) is presized to hold the number of elements in the original
  * set. */
 void setTypeConvert(robj *setobj, int enc) {
     setTypeIterator *si;
  * set. */
 void setTypeConvert(robj *setobj, int enc) {
     setTypeIterator *si;
-    redisAssert(setobj->type == REDIS_SET &&
-                setobj->encoding == REDIS_ENCODING_INTSET);
+    redisAssertWithInfo(NULL,setobj,setobj->type == REDIS_SET &&
+                             setobj->encoding == REDIS_ENCODING_INTSET);
 
     if (enc == REDIS_ENCODING_HT) {
         int64_t intele;
 
     if (enc == REDIS_ENCODING_HT) {
         int64_t intele;
@@ -204,7 +235,7 @@ void setTypeConvert(robj *setobj, int enc) {
         si = setTypeInitIterator(setobj);
         while (setTypeNext(si,NULL,&intele) != -1) {
             element = createStringObjectFromLongLong(intele);
         si = setTypeInitIterator(setobj);
         while (setTypeNext(si,NULL,&intele) != -1) {
             element = createStringObjectFromLongLong(intele);
-            redisAssert(dictAdd(d,element,NULL) == DICT_OK);
+            redisAssertWithInfo(NULL,element,dictAdd(d,element,NULL) == DICT_OK);
         }
         setTypeReleaseIterator(si);
 
         }
         setTypeReleaseIterator(si);
 
@@ -218,9 +249,9 @@ void setTypeConvert(robj *setobj, int enc) {
 
 void saddCommand(redisClient *c) {
     robj *set;
 
 void saddCommand(redisClient *c) {
     robj *set;
+    int j, added = 0;
 
     set = lookupKeyWrite(c->db,c->argv[1]);
 
     set = lookupKeyWrite(c->db,c->argv[1]);
-    c->argv[2] = tryObjectEncoding(c->argv[2]);
     if (set == NULL) {
         set = setTypeCreate(c->argv[2]);
         dbAdd(c->db,c->argv[1],set);
     if (set == NULL) {
         set = setTypeCreate(c->argv[2]);
         dbAdd(c->db,c->argv[1],set);
@@ -230,30 +261,37 @@ void saddCommand(redisClient *c) {
             return;
         }
     }
             return;
         }
     }
-    if (setTypeAdd(set,c->argv[2])) {
-        touchWatchedKey(c->db,c->argv[1]);
-        server.dirty++;
-        addReply(c,shared.cone);
-    } else {
-        addReply(c,shared.czero);
+
+    for (j = 2; j < c->argc; j++) {
+        c->argv[j] = tryObjectEncoding(c->argv[j]);
+        if (setTypeAdd(set,c->argv[j])) added++;
     }
     }
+    if (added) signalModifiedKey(c->db,c->argv[1]);
+    server.dirty += added;
+    addReplyLongLong(c,added);
 }
 
 void sremCommand(redisClient *c) {
     robj *set;
 }
 
 void sremCommand(redisClient *c) {
     robj *set;
+    int j, deleted = 0;
 
     if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL ||
         checkType(c,set,REDIS_SET)) return;
 
 
     if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL ||
         checkType(c,set,REDIS_SET)) return;
 
-    c->argv[2] = tryObjectEncoding(c->argv[2]);
-    if (setTypeRemove(set,c->argv[2])) {
-        if (setTypeSize(set) == 0) dbDelete(c->db,c->argv[1]);
-        touchWatchedKey(c->db,c->argv[1]);
-        server.dirty++;
-        addReply(c,shared.cone);
-    } else {
-        addReply(c,shared.czero);
+    for (j = 2; j < c->argc; j++) {
+        if (setTypeRemove(set,c->argv[j])) {
+            deleted++;
+            if (setTypeSize(set) == 0) {
+                dbDelete(c->db,c->argv[1]);
+                break;
+            }
+        }
     }
     }
+    if (deleted) {
+        signalModifiedKey(c->db,c->argv[1]);
+        server.dirty += deleted;
+    }
+    addReplyLongLong(c,deleted);
 }
 
 void smoveCommand(redisClient *c) {
 }
 
 void smoveCommand(redisClient *c) {
@@ -287,8 +325,8 @@ void smoveCommand(redisClient *c) {
 
     /* Remove the src set from the database when empty */
     if (setTypeSize(srcset) == 0) dbDelete(c->db,c->argv[1]);
 
     /* Remove the src set from the database when empty */
     if (setTypeSize(srcset) == 0) dbDelete(c->db,c->argv[1]);
-    touchWatchedKey(c->db,c->argv[1]);
-    touchWatchedKey(c->db,c->argv[2]);
+    signalModifiedKey(c->db,c->argv[1]);
+    signalModifiedKey(c->db,c->argv[2]);
     server.dirty++;
 
     /* Create the destination set when it doesn't exist */
     server.dirty++;
 
     /* Create the destination set when it doesn't exist */
@@ -325,7 +363,7 @@ void scardCommand(redisClient *c) {
 }
 
 void spopCommand(redisClient *c) {
 }
 
 void spopCommand(redisClient *c) {
-    robj *set, *ele;
+    robj *set, *ele, *aux;
     int64_t llele;
     int encoding;
 
     int64_t llele;
     int encoding;
 
@@ -334,22 +372,184 @@ void spopCommand(redisClient *c) {
 
     encoding = setTypeRandomElement(set,&ele,&llele);
     if (encoding == REDIS_ENCODING_INTSET) {
 
     encoding = setTypeRandomElement(set,&ele,&llele);
     if (encoding == REDIS_ENCODING_INTSET) {
-        addReplyBulkLongLong(c,llele);
+        ele = createStringObjectFromLongLong(llele);
         set->ptr = intsetRemove(set->ptr,llele,NULL);
     } else {
         set->ptr = intsetRemove(set->ptr,llele,NULL);
     } else {
-        addReplyBulk(c,ele);
+        incrRefCount(ele);
         setTypeRemove(set,ele);
     }
         setTypeRemove(set,ele);
     }
+
+    /* Replicate/AOF this command as an SREM operation */
+    aux = createStringObject("SREM",4);
+    rewriteClientCommandVector(c,3,aux,c->argv[1],ele);
+    decrRefCount(ele);
+    decrRefCount(aux);
+
+    addReplyBulk(c,ele);
     if (setTypeSize(set) == 0) dbDelete(c->db,c->argv[1]);
     if (setTypeSize(set) == 0) dbDelete(c->db,c->argv[1]);
-    touchWatchedKey(c->db,c->argv[1]);
+    signalModifiedKey(c->db,c->argv[1]);
     server.dirty++;
 }
 
     server.dirty++;
 }
 
+/* handle the "SRANDMEMBER key <count>" variant. The normal version of the
+ * command is handled by the srandmemberCommand() function itself. */
+
+/* How many times bigger should be the set compared to the requested size
+ * for us to don't use the "remove elements" strategy? Read later in the
+ * implementation for more info. */
+#define SRANDMEMBER_SUB_STRATEGY_MUL 3
+
+void srandmemberWithCountCommand(redisClient *c) {
+    long l;
+    unsigned long count, size;
+    int uniq = 1;
+    robj *set, *ele;
+    int64_t llele;
+    int encoding;
+
+    dict *d;
+
+    if (getLongFromObjectOrReply(c,c->argv[2],&l,NULL) != REDIS_OK) return;
+    if (l >= 0) {
+        count = (unsigned) l;
+    } else {
+        /* A negative count means: return the same elements multiple times
+         * (i.e. don't remove the extracted element after every extraction). */
+        count = -l;
+        uniq = 0;
+    }
+
+    if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk))
+        == NULL || checkType(c,set,REDIS_SET)) return;
+    size = setTypeSize(set);
+
+    /* If count is zero, serve it ASAP to avoid special cases later. */
+    if (count == 0) {
+        addReply(c,shared.emptymultibulk);
+        return;
+    }
+
+    /* CASE 1: The count was negative, so the extraction method is just:
+     * "return N random elements" sampling the whole set every time.
+     * This case is trivial and can be served without auxiliary data
+     * structures. */
+    if (!uniq) {
+        addReplyMultiBulkLen(c,count);
+        while(count--) {
+            encoding = setTypeRandomElement(set,&ele,&llele);
+            if (encoding == REDIS_ENCODING_INTSET) {
+                addReplyBulkLongLong(c,llele);
+            } else {
+                addReplyBulk(c,ele);
+            }
+        }
+        return;
+    }
+
+    /* CASE 2:
+     * The number of requested elements is greater than the number of
+     * elements inside the set: simply return the whole set. */
+    if (count >= size) {
+        sunionDiffGenericCommand(c,c->argv,c->argc-1,NULL,REDIS_OP_UNION);
+        return;
+    }
+
+    /* For CASE 3 and CASE 4 we need an auxiliary dictionary. */
+    d = dictCreate(&setDictType,NULL);
+
+    /* CASE 3:
+     * The number of elements inside the set is not greater than
+     * SRANDMEMBER_SUB_STRATEGY_MUL times the number of requested elements.
+     * In this case we create a set from scratch with all the elements, and
+     * subtract random elements to reach the requested number of elements.
+     *
+     * This is done because if the number of requsted elements is just
+     * a bit less than the number of elements in the set, the natural approach
+     * used into CASE 3 is highly inefficient. */
+    if (count*SRANDMEMBER_SUB_STRATEGY_MUL > size) {
+        setTypeIterator *si;
+
+        /* Add all the elements into the temporary dictionary. */
+        si = setTypeInitIterator(set);
+        while((encoding = setTypeNext(si,&ele,&llele)) != -1) {
+            int retval;
+
+            if (encoding == REDIS_ENCODING_INTSET) {
+                retval = dictAdd(d,createStringObjectFromLongLong(llele),NULL);
+            } else if (ele->encoding == REDIS_ENCODING_RAW) {
+                retval = dictAdd(d,dupStringObject(ele),NULL);
+            } else if (ele->encoding == REDIS_ENCODING_INT) {
+                retval = dictAdd(d,
+                    createStringObjectFromLongLong((long)ele->ptr),NULL);
+            }
+            redisAssert(retval == DICT_OK);
+        }
+        setTypeReleaseIterator(si);
+        redisAssert(dictSize(d) == size);
+
+        /* Remove random elements to reach the right count. */
+        while(size > count) {
+            dictEntry *de;
+
+            de = dictGetRandomKey(d);
+            dictDelete(d,dictGetKey(de));
+            size--;
+        }
+    }
+    
+    /* CASE 4: We have a big set compared to the requested number of elements.
+     * In this case we can simply get random elements from the set and add
+     * to the temporary set, trying to eventually get enough unique elements
+     * to reach the specified count. */
+    else {
+        unsigned long added = 0;
+
+        while(added < count) {
+            encoding = setTypeRandomElement(set,&ele,&llele);
+            if (encoding == REDIS_ENCODING_INTSET) {
+                ele = createStringObjectFromLongLong(llele);
+            } else if (ele->encoding == REDIS_ENCODING_RAW) {
+                ele = dupStringObject(ele);
+            } else if (ele->encoding == REDIS_ENCODING_INT) {
+                ele = createStringObjectFromLongLong((long)ele->ptr);
+            }
+            /* Try to add the object to the dictionary. If it already exists
+             * free it, otherwise increment the number of objects we have
+             * in the result dictionary. */
+            if (dictAdd(d,ele,NULL) == DICT_OK)
+                added++;
+            else
+                decrRefCount(ele);
+        }
+    }
+
+    /* CASE 3 & 4: send the result to the user. */
+    {
+        dictIterator *di;
+        dictEntry *de;
+
+        addReplyMultiBulkLen(c,count);
+        di = dictGetIterator(d);
+        while((de = dictNext(di)) != NULL)
+            addReplyBulk(c,dictGetKey(de));
+        dictReleaseIterator(di);
+        dictRelease(d);
+    }
+}
+
 void srandmemberCommand(redisClient *c) {
     robj *set, *ele;
     int64_t llele;
     int encoding;
 
 void srandmemberCommand(redisClient *c) {
     robj *set, *ele;
     int64_t llele;
     int encoding;
 
+    if (c->argc == 3) {
+        srandmemberWithCountCommand(c);
+        return;
+    } else if (c->argc > 3) {
+        addReply(c,shared.syntaxerr);
+        return;
+    }
+
     if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
         checkType(c,set,REDIS_SET)) return;
 
     if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
         checkType(c,set,REDIS_SET)) return;
 
@@ -382,7 +582,7 @@ void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum,
             zfree(sets);
             if (dstkey) {
                 if (dbDelete(c->db,dstkey)) {
             zfree(sets);
             if (dstkey) {
                 if (dbDelete(c->db,dstkey)) {
-                    touchWatchedKey(c->db,dstkey);
+                    signalModifiedKey(c->db,dstkey);
                     server.dirty++;
                 }
                 addReply(c,shared.czero);
                     server.dirty++;
                 }
                 addReply(c,shared.czero);
@@ -420,6 +620,7 @@ void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum,
     si = setTypeInitIterator(sets[0]);
     while((encoding = setTypeNext(si,&eleobj,&intobj)) != -1) {
         for (j = 1; j < setnum; j++) {
     si = setTypeInitIterator(sets[0]);
     while((encoding = setTypeNext(si,&eleobj,&intobj)) != -1) {
         for (j = 1; j < setnum; j++) {
+            if (sets[j] == sets[0]) continue;
             if (encoding == REDIS_ENCODING_INTSET) {
                 /* intset with intset is simple... and fast */
                 if (sets[j]->encoding == REDIS_ENCODING_INTSET &&
             if (encoding == REDIS_ENCODING_INTSET) {
                 /* intset with intset is simple... and fast */
                 if (sets[j]->encoding == REDIS_ENCODING_INTSET &&
@@ -486,7 +687,7 @@ void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum,
             decrRefCount(dstset);
             addReply(c,shared.czero);
         }
             decrRefCount(dstset);
             addReply(c,shared.czero);
         }
-        touchWatchedKey(c->db,dstkey);
+        signalModifiedKey(c->db,dstkey);
         server.dirty++;
     } else {
         setDeferredMultiBulkLength(c,replylen,cardinality);
         server.dirty++;
     } else {
         setDeferredMultiBulkLength(c,replylen,cardinality);
@@ -578,7 +779,7 @@ void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum, robj *
             decrRefCount(dstset);
             addReply(c,shared.czero);
         }
             decrRefCount(dstset);
             addReply(c,shared.czero);
         }
-        touchWatchedKey(c->db,dstkey);
+        signalModifiedKey(c->db,dstkey);
         server.dirty++;
     }
     zfree(sets);
         server.dirty++;
     }
     zfree(sets);