]> git.saurik.com Git - redis.git/commitdiff
Merge remote branch 'pietern/intset-split'
authorantirez <antirez@gmail.com>
Thu, 26 Aug 2010 10:04:24 +0000 (12:04 +0200)
committerantirez <antirez@gmail.com>
Thu, 26 Aug 2010 10:04:24 +0000 (12:04 +0200)
15 files changed:
src/Makefile
src/aof.c
src/config.c
src/debug.c
src/intset.c [new file with mode: 0644]
src/intset.h [new file with mode: 0644]
src/object.c
src/rdb.c
src/redis.c
src/redis.h
src/sort.c
src/t_set.c
tests/support/server.tcl
tests/unit/sort.tcl
tests/unit/type/set.tcl

index 0af70f1741ee47e4475c63d1ab763a4a2faaa8fa..5fe3971ed4171afdbc589a1e57e2d4f927c4ed7d 100644 (file)
@@ -19,7 +19,7 @@ INSTALL_TOP= /usr/local
 INSTALL_BIN= $(INSTALL_TOP)/bin
 INSTALL= cp -p
 
-OBJ = adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o vm.o pubsub.o multi.o debug.o sort.o
+OBJ = adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o vm.o pubsub.o multi.o debug.o sort.o intset.o
 BENCHOBJ = ae.o anet.o redis-benchmark.o sds.o adlist.o zmalloc.o
 CLIOBJ = anet.o sds.o adlist.o redis-cli.o zmalloc.o linenoise.o
 CHECKDUMPOBJ = redis-check-dump.o lzf_c.o lzf_d.o
@@ -58,6 +58,7 @@ sds.o: sds.c sds.h zmalloc.h
 sha1.o: sha1.c sha1.h
 ziplist.o: ziplist.c zmalloc.h ziplist.h
 zipmap.o: zipmap.c zmalloc.h
+intset.o: intset.c zmalloc.h
 zmalloc.o: zmalloc.c config.h
 
 redis-server: $(OBJ)
index f8b92d2d3301ebee556c02612c0123461b195728..dc806969d5ba5d517e96e38dab67583263bfaf14 100644 (file)
--- a/src/aof.c
+++ b/src/aof.c
@@ -463,20 +463,30 @@ int rewriteAppendOnlyFile(char *filename) {
                     redisPanic("Unknown list encoding");
                 }
             } else if (o->type == REDIS_SET) {
-                /* Emit the SADDs needed to rebuild the set */
-                dict *set = o->ptr;
-                dictIterator *di = dictGetIterator(set);
-                dictEntry *de;
-
-                while((de = dictNext(di)) != NULL) {
-                    char cmd[]="*3\r\n$4\r\nSADD\r\n";
-                    robj *eleobj = dictGetEntryKey(de);
+                char cmd[]="*3\r\n$4\r\nSADD\r\n";
 
-                    if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
-                    if (fwriteBulkObject(fp,&key) == 0) goto werr;
-                    if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
+                /* Emit the SADDs needed to rebuild the set */
+                if (o->encoding == REDIS_ENCODING_INTSET) {
+                    int ii = 0;
+                    long long llval;
+                    while(intsetGet(o->ptr,ii++,&llval)) {
+                        if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
+                        if (fwriteBulkObject(fp,&key) == 0) goto werr;
+                        if (fwriteBulkLongLong(fp,llval) == 0) goto werr;
+                    }
+                } else if (o->encoding == REDIS_ENCODING_HT) {
+                    dictIterator *di = dictGetIterator(o->ptr);
+                    dictEntry *de;
+                    while((de = dictNext(di)) != NULL) {
+                        robj *eleobj = dictGetEntryKey(de);
+                        if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
+                        if (fwriteBulkObject(fp,&key) == 0) goto werr;
+                        if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
+                    }
+                    dictReleaseIterator(di);
+                } else {
+                    redisPanic("Unknown set encoding");
                 }
-                dictReleaseIterator(di);
             } else if (o->type == REDIS_ZSET) {
                 /* Emit the ZADDs needed to rebuild the sorted set */
                 zset *zs = o->ptr;
index 6d946ee0cbe237e0b485d5fcdd97377f48908ddd..e1b743dbf41ace2c6b41a9b10ef1cd65aac0455b 100644 (file)
@@ -199,6 +199,8 @@ void loadServerConfig(char *filename) {
             server.list_max_ziplist_entries = memtoll(argv[1], NULL);
         } else if (!strcasecmp(argv[0],"list-max-ziplist-value") && argc == 2){
             server.list_max_ziplist_value = memtoll(argv[1], NULL);
+        } else if (!strcasecmp(argv[0],"set-max-intset-entries") && argc == 2){
+            server.set_max_intset_entries = memtoll(argv[1], NULL);
         } else {
             err = "Bad directive or wrong number of arguments"; goto loaderr;
         }
index ba183d7275414bc1ab70f1de56f39049412e2447..76d18b214964170a6162636f26cd591063f4ff04 100644 (file)
@@ -119,16 +119,13 @@ void computeDatasetDigest(unsigned char *final) {
                 }
                 listTypeReleaseIterator(li);
             } else if (o->type == REDIS_SET) {
-                dict *set = o->ptr;
-                dictIterator *di = dictGetIterator(set);
-                dictEntry *de;
-
-                while((de = dictNext(di)) != NULL) {
-                    robj *eleobj = dictGetEntryKey(de);
-
-                    xorObjectDigest(digest,eleobj);
+                setTypeIterator *si = setTypeInitIterator(o);
+                robj *ele;
+                while((ele = setTypeNext(si)) != NULL) {
+                    xorObjectDigest(digest,ele);
+                    decrRefCount(ele);
                 }
-                dictReleaseIterator(di);
+                setTypeReleaseIterator(si);
             } else if (o->type == REDIS_ZSET) {
                 zset *zs = o->ptr;
                 dictIterator *di = dictGetIterator(zs->dict);
diff --git a/src/intset.c b/src/intset.c
new file mode 100644 (file)
index 0000000..2b082b9
--- /dev/null
@@ -0,0 +1,422 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include "intset.h"
+#include "zmalloc.h"
+
+/* Note that these encodings are ordered, so:
+ * INTSET_ENC_INT16 < INTSET_ENC_INT32 < INTSET_ENC_INT64. */
+#define INTSET_ENC_INT16 (sizeof(int16_t))
+#define INTSET_ENC_INT32 (sizeof(int32_t))
+#define INTSET_ENC_INT64 (sizeof(int64_t))
+
+/* Return the required encoding for the provided value. */
+static uint8_t _intsetValueEncoding(int64_t v) {
+    if (v < INT32_MIN || v > INT32_MAX)
+        return INTSET_ENC_INT64;
+    else if (v < INT16_MIN || v > INT16_MAX)
+        return INTSET_ENC_INT32;
+    return INTSET_ENC_INT16;
+}
+
+/* Return the value at pos, given an encoding. */
+static int64_t _intsetGetEncoded(intset *is, int pos, uint8_t enc) {
+    if (enc == INTSET_ENC_INT64)
+        return ((int64_t*)is->contents)[pos];
+    else if (enc == INTSET_ENC_INT32)
+        return ((int32_t*)is->contents)[pos];
+    return ((int16_t*)is->contents)[pos];
+}
+
+/* Return the value at pos, using the configured encoding. */
+static int64_t _intsetGet(intset *is, int pos) {
+    return _intsetGetEncoded(is,pos,is->encoding);
+}
+
+/* Set the value at pos, using the configured encoding. */
+static void _intsetSet(intset *is, int pos, int64_t value) {
+    if (is->encoding == INTSET_ENC_INT64)
+        ((int64_t*)is->contents)[pos] = value;
+    else if (is->encoding == INTSET_ENC_INT32)
+        ((int32_t*)is->contents)[pos] = value;
+    else
+        ((int16_t*)is->contents)[pos] = value;
+}
+
+/* Create an empty intset. */
+intset *intsetNew(void) {
+    intset *is = zmalloc(sizeof(intset));
+    is->encoding = INTSET_ENC_INT16;
+    is->length = 0;
+    return is;
+}
+
+/* Resize the intset */
+static intset *intsetResize(intset *is, uint32_t len) {
+    uint32_t size = len*is->encoding;
+    is = zrealloc(is,sizeof(intset)+size);
+    return is;
+}
+
+/* Search for the position of "value". Return 1 when the value was found and
+ * sets "pos" to the position of the value within the intset. Return 0 when
+ * the value is not present in the intset and sets "pos" to the position
+ * where "value" can be inserted. */
+static uint8_t intsetSearch(intset *is, int64_t value, uint32_t *pos) {
+    int min = 0, max = is->length-1, mid = -1;
+    int64_t cur = -1;
+
+    /* The value can never be found when the set is empty */
+    if (is->length == 0) {
+        if (pos) *pos = 0;
+        return 0;
+    } else {
+        /* Check for the case where we know we cannot find the value,
+         * but do know the insert position. */
+        if (value > _intsetGet(is,is->length-1)) {
+            if (pos) *pos = is->length;
+            return 0;
+        } else if (value < _intsetGet(is,0)) {
+            if (pos) *pos = 0;
+            return 0;
+        }
+    }
+
+    while(max >= min) {
+        mid = (min+max)/2;
+        cur = _intsetGet(is,mid);
+        if (value > cur) {
+            min = mid+1;
+        } else if (value < cur) {
+            max = mid-1;
+        } else {
+            break;
+        }
+    }
+
+    if (value == cur) {
+        if (pos) *pos = mid;
+        return 1;
+    } else {
+        if (pos) *pos = min;
+        return 0;
+    }
+}
+
+/* Upgrades the intset to a larger encoding and inserts the given integer. */
+static intset *intsetUpgradeAndAdd(intset *is, int64_t value) {
+    uint8_t curenc = is->encoding;
+    uint8_t newenc = _intsetValueEncoding(value);
+    int length = is->length;
+    int prepend = value < 0 ? 1 : 0;
+
+    /* First set new encoding and resize */
+    is->encoding = newenc;
+    is = intsetResize(is,is->length+1);
+
+    /* Upgrade back-to-front so we don't overwrite values.
+     * Note that the "prepend" variable is used to make sure we have an empty
+     * space at either the beginning or the end of the intset. */
+    while(length--)
+        _intsetSet(is,length+prepend,_intsetGetEncoded(is,length,curenc));
+
+    /* Set the value at the beginning or the end. */
+    if (prepend)
+        _intsetSet(is,0,value);
+    else
+        _intsetSet(is,is->length,value);
+    is->length++;
+    return is;
+}
+
+static void intsetMoveTail(intset *is, uint32_t from, uint32_t to) {
+    void *src, *dst;
+    uint32_t bytes = is->length-from;
+    if (is->encoding == INTSET_ENC_INT64) {
+        src = (int64_t*)is->contents+from;
+        dst = (int64_t*)is->contents+to;
+        bytes *= sizeof(int64_t);
+    } else if (is->encoding == INTSET_ENC_INT32) {
+        src = (int32_t*)is->contents+from;
+        dst = (int32_t*)is->contents+to;
+        bytes *= sizeof(int32_t);
+    } else {
+        src = (int16_t*)is->contents+from;
+        dst = (int16_t*)is->contents+to;
+        bytes *= sizeof(int16_t);
+    }
+    memmove(dst,src,bytes);
+}
+
+/* Insert an integer in the intset */
+intset *intsetAdd(intset *is, int64_t value, uint8_t *success) {
+    uint8_t valenc = _intsetValueEncoding(value);
+    uint32_t pos, offset;
+    if (success) *success = 1;
+
+    /* Upgrade encoding if necessary. If we need to upgrade, we know that
+     * this value should be either appended (if > 0) or prepended (if < 0),
+     * because it lies outside the range of existing values. */
+    if (valenc > is->encoding) {
+        /* This always succeeds, so we don't need to curry *success. */
+        return intsetUpgradeAndAdd(is,value);
+    } else {
+        /* Abort if the value is already present in the set.
+         * This call will populate "pos" with the right position to insert
+         * the value when it cannot be found. */
+        if (intsetSearch(is,value,&pos)) {
+            if (success) *success = 0;
+            return is;
+        }
+
+        is = intsetResize(is,is->length+1);
+        if (pos < is->length) intsetMoveTail(is,pos,pos+1);
+    }
+
+    _intsetSet(is,pos,value);
+    is->length++;
+    return is;
+}
+
+/* Delete integer from intset */
+intset *intsetRemove(intset *is, int64_t value, uint8_t *success) {
+    uint8_t valenc = _intsetValueEncoding(value);
+    uint32_t pos;
+    if (success) *success = 0;
+
+    if (valenc <= is->encoding && intsetSearch(is,value,&pos)) {
+        /* We know we can delete */
+        if (success) *success = 1;
+
+        /* Overwrite value with tail and update length */
+        if (pos < (is->length-1)) intsetMoveTail(is,pos+1,pos);
+        is = intsetResize(is,is->length-1);
+        is->length--;
+    }
+    return is;
+}
+
+/* Determine whether a value belongs to this set */
+uint8_t intsetFind(intset *is, int64_t value) {
+    uint8_t valenc = _intsetValueEncoding(value);
+    return valenc <= is->encoding && intsetSearch(is,value,NULL);
+}
+
+/* Return random member */
+int64_t intsetRandom(intset *is) {
+    return _intsetGet(is,rand()%is->length);
+}
+
+/* Sets the value to the value at the given position. When this position is
+ * out of range the function returns 0, when in range it returns 1. */
+uint8_t intsetGet(intset *is, uint32_t pos, int64_t *value) {
+    if (pos < is->length) {
+        *value = _intsetGet(is,pos);
+        return 1;
+    }
+    return 0;
+}
+
+/* Return intset length */
+uint32_t intsetLen(intset *is) {
+    return is->length;
+}
+
+#ifdef INTSET_TEST_MAIN
+#include <sys/time.h>
+
+void intsetRepr(intset *is) {
+    int i;
+    for (i = 0; i < is->length; i++) {
+        printf("%lld\n", (uint64_t)_intsetGet(is,i));
+    }
+    printf("\n");
+}
+
+void error(char *err) {
+    printf("%s\n", err);
+    exit(1);
+}
+
+void ok(void) {
+    printf("OK\n");
+}
+
+long long usec(void) {
+    struct timeval tv;
+    gettimeofday(&tv,NULL);
+    return (((long long)tv.tv_sec)*1000000)+tv.tv_usec;
+}
+
+#define assert(_e) ((_e)?(void)0:(_assert(#_e,__FILE__,__LINE__),exit(1)))
+void _assert(char *estr, char *file, int line) {
+    printf("\n\n=== ASSERTION FAILED ===\n");
+    printf("==> %s:%d '%s' is not true\n",file,line,estr);
+}
+
+intset *createSet(int bits, int size) {
+    uint64_t mask = (1<<bits)-1;
+    uint64_t i, value;
+    intset *is = intsetNew();
+
+    for (i = 0; i < size; i++) {
+        if (bits > 32) {
+            value = (rand()*rand()) & mask;
+        } else {
+            value = rand() & mask;
+        }
+        is = intsetAdd(is,value,NULL);
+    }
+    return is;
+}
+
+void checkConsistency(intset *is) {
+    int i;
+
+    for (i = 0; i < (is->length-1); i++) {
+        if (is->encoding == INTSET_ENC_INT16) {
+            int16_t *i16 = (int16_t*)is->contents;
+            assert(i16[i] < i16[i+1]);
+        } else if (is->encoding == INTSET_ENC_INT32) {
+            int32_t *i32 = (int32_t*)is->contents;
+            assert(i32[i] < i32[i+1]);
+        } else {
+            int64_t *i64 = (int64_t*)is->contents;
+            assert(i64[i] < i64[i+1]);
+        }
+    }
+}
+
+int main(int argc, char **argv) {
+    uint8_t success;
+    int i;
+    intset *is;
+    sranddev();
+
+    printf("Value encodings: "); {
+        assert(_intsetValueEncoding(-32768) == INTSET_ENC_INT16);
+        assert(_intsetValueEncoding(+32767) == INTSET_ENC_INT16);
+        assert(_intsetValueEncoding(-32769) == INTSET_ENC_INT32);
+        assert(_intsetValueEncoding(+32768) == INTSET_ENC_INT32);
+        assert(_intsetValueEncoding(-2147483648) == INTSET_ENC_INT32);
+        assert(_intsetValueEncoding(+2147483647) == INTSET_ENC_INT32);
+        assert(_intsetValueEncoding(-2147483649) == INTSET_ENC_INT64);
+        assert(_intsetValueEncoding(+2147483648) == INTSET_ENC_INT64);
+        assert(_intsetValueEncoding(-9223372036854775808ull) == INTSET_ENC_INT64);
+        assert(_intsetValueEncoding(+9223372036854775807ull) == INTSET_ENC_INT64);
+        ok();
+    }
+
+    printf("Basic adding: "); {
+        is = intsetNew();
+        is = intsetAdd(is,5,&success); assert(success);
+        is = intsetAdd(is,6,&success); assert(success);
+        is = intsetAdd(is,4,&success); assert(success);
+        is = intsetAdd(is,4,&success); assert(!success);
+        ok();
+    }
+
+    printf("Large number of random adds: "); {
+        int inserts = 0;
+        is = intsetNew();
+        for (i = 0; i < 1024; i++) {
+            is = intsetAdd(is,rand()%0x800,&success);
+            if (success) inserts++;
+        }
+        assert(is->length == inserts);
+        checkConsistency(is);
+        ok();
+    }
+
+    printf("Upgrade from int16 to int32: "); {
+        is = intsetNew();
+        is = intsetAdd(is,32,NULL);
+        assert(is->encoding == INTSET_ENC_INT16);
+        is = intsetAdd(is,65535,NULL);
+        assert(is->encoding == INTSET_ENC_INT32);
+        assert(intsetFind(is,32));
+        assert(intsetFind(is,65535));
+        checkConsistency(is);
+
+        is = intsetNew();
+        is = intsetAdd(is,32,NULL);
+        assert(is->encoding == INTSET_ENC_INT16);
+        is = intsetAdd(is,-65535,NULL);
+        assert(is->encoding == INTSET_ENC_INT32);
+        assert(intsetFind(is,32));
+        assert(intsetFind(is,-65535));
+        checkConsistency(is);
+        ok();
+    }
+
+    printf("Upgrade from int16 to int64: "); {
+        is = intsetNew();
+        is = intsetAdd(is,32,NULL);
+        assert(is->encoding == INTSET_ENC_INT16);
+        is = intsetAdd(is,4294967295,NULL);
+        assert(is->encoding == INTSET_ENC_INT64);
+        assert(intsetFind(is,32));
+        assert(intsetFind(is,4294967295));
+        checkConsistency(is);
+
+        is = intsetNew();
+        is = intsetAdd(is,32,NULL);
+        assert(is->encoding == INTSET_ENC_INT16);
+        is = intsetAdd(is,-4294967295,NULL);
+        assert(is->encoding == INTSET_ENC_INT64);
+        assert(intsetFind(is,32));
+        assert(intsetFind(is,-4294967295));
+        checkConsistency(is);
+        ok();
+    }
+
+    printf("Upgrade from int32 to int64: "); {
+        is = intsetNew();
+        is = intsetAdd(is,65535,NULL);
+        assert(is->encoding == INTSET_ENC_INT32);
+        is = intsetAdd(is,4294967295,NULL);
+        assert(is->encoding == INTSET_ENC_INT64);
+        assert(intsetFind(is,65535));
+        assert(intsetFind(is,4294967295));
+        checkConsistency(is);
+
+        is = intsetNew();
+        is = intsetAdd(is,65535,NULL);
+        assert(is->encoding == INTSET_ENC_INT32);
+        is = intsetAdd(is,-4294967295,NULL);
+        assert(is->encoding == INTSET_ENC_INT64);
+        assert(intsetFind(is,65535));
+        assert(intsetFind(is,-4294967295));
+        checkConsistency(is);
+        ok();
+    }
+
+    printf("Stress lookups: "); {
+        long num = 100000, size = 10000;
+        int i, bits = 20;
+        long long start;
+        is = createSet(bits,size);
+        checkConsistency(is);
+
+        start = usec();
+        for (i = 0; i < num; i++) intsetSearch(is,rand() % ((1<<bits)-1),NULL);
+        printf("%ld lookups, %ld element set, %lldusec\n",num,size,usec()-start);
+    }
+
+    printf("Stress add+delete: "); {
+        int i, v1, v2;
+        is = intsetNew();
+        for (i = 0; i < 0xffff; i++) {
+            v1 = rand() % 0xfff;
+            is = intsetAdd(is,v1,NULL);
+            assert(intsetFind(is,v1));
+
+            v2 = rand() % 0xfff;
+            is = intsetRemove(is,v2,NULL);
+            assert(!intsetFind(is,v2));
+        }
+        checkConsistency(is);
+        ok();
+    }
+}
+#endif
diff --git a/src/intset.h b/src/intset.h
new file mode 100644 (file)
index 0000000..25afc18
--- /dev/null
@@ -0,0 +1,19 @@
+#ifndef __INTSET_H
+#define __INTSET_H
+#include <stdint.h>
+
+typedef struct intset {
+    uint32_t encoding;
+    uint32_t length;
+    int8_t contents[];
+} intset;
+
+intset *intsetNew(void);
+intset *intsetAdd(intset *is, int64_t value, uint8_t *success);
+intset *intsetRemove(intset *is, int64_t value, uint8_t *success);
+uint8_t intsetFind(intset *is, int64_t value);
+int64_t intsetRandom(intset *is);
+uint8_t intsetGet(intset *is, uint32_t pos, int64_t *value);
+uint32_t intsetLen(intset *is);
+
+#endif // __INTSET_H
index 429ac0ecc6cd7eea9cabd7b987701dd9153278a0..dae7f97b21924fdda844e4e07a1b276cba0d7350 100644 (file)
@@ -74,7 +74,16 @@ robj *createZiplistObject(void) {
 
 robj *createSetObject(void) {
     dict *d = dictCreate(&setDictType,NULL);
-    return createObject(REDIS_SET,d);
+    robj *o = createObject(REDIS_SET,d);
+    o->encoding = REDIS_ENCODING_HT;
+    return o;
+}
+
+robj *createIntsetObject(void) {
+    intset *is = intsetNew();
+    robj *o = createObject(REDIS_SET,is);
+    o->encoding = REDIS_ENCODING_INTSET;
+    return o;
 }
 
 robj *createHashObject(void) {
@@ -115,7 +124,16 @@ void freeListObject(robj *o) {
 }
 
 void freeSetObject(robj *o) {
-    dictRelease((dict*) o->ptr);
+    switch (o->encoding) {
+    case REDIS_ENCODING_HT:
+        dictRelease((dict*) o->ptr);
+        break;
+    case REDIS_ENCODING_INTSET:
+        zfree(o->ptr);
+        break;
+    default:
+        redisPanic("Unknown set encoding type");
+    }
 }
 
 void freeZsetObject(robj *o) {
@@ -357,6 +375,7 @@ int getLongLongFromObject(robj *o, long long *target) {
         redisAssert(o->type == REDIS_STRING);
         if (o->encoding == REDIS_ENCODING_RAW) {
             value = strtoll(o->ptr, &eptr, 10);
+            if (errno == ERANGE) return REDIS_ERR;
             if (eptr[0] != '\0') return REDIS_ERR;
             if (errno == ERANGE && (value == LLONG_MIN || value == LLONG_MAX))
                 return REDIS_ERR;
@@ -367,7 +386,7 @@ int getLongLongFromObject(robj *o, long long *target) {
         }
     }
 
-    *target = value;
+    if (target) *target = value;
     return REDIS_OK;
 }
 
@@ -411,6 +430,7 @@ char *strEncoding(int encoding) {
     case REDIS_ENCODING_ZIPMAP: return "zipmap";
     case REDIS_ENCODING_LINKEDLIST: return "linkedlist";
     case REDIS_ENCODING_ZIPLIST: return "ziplist";
+    case REDIS_ENCODING_INTSET: return "intset";
     default: return "unknown";
     }
 }
index 509c70c307f378d417dedc4032ba4c59e9edcb7e..019aa9a00da0a0cf4c6b8031f61b38904b500fd0 100644 (file)
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -260,17 +260,29 @@ int rdbSaveObject(FILE *fp, robj *o) {
         }
     } else if (o->type == REDIS_SET) {
         /* Save a set value */
-        dict *set = o->ptr;
-        dictIterator *di = dictGetIterator(set);
-        dictEntry *de;
-
-        if (rdbSaveLen(fp,dictSize(set)) == -1) return -1;
-        while((de = dictNext(di)) != NULL) {
-            robj *eleobj = dictGetEntryKey(de);
+        if (o->encoding == REDIS_ENCODING_HT) {
+            dict *set = o->ptr;
+            dictIterator *di = dictGetIterator(set);
+            dictEntry *de;
 
-            if (rdbSaveStringObject(fp,eleobj) == -1) return -1;
+            if (rdbSaveLen(fp,dictSize(set)) == -1) return -1;
+            while((de = dictNext(di)) != NULL) {
+                robj *eleobj = dictGetEntryKey(de);
+                if (rdbSaveStringObject(fp,eleobj) == -1) return -1;
+            }
+            dictReleaseIterator(di);
+        } else if (o->encoding == REDIS_ENCODING_INTSET) {
+            intset *is = o->ptr;
+            long long llval;
+            int i = 0;
+
+            if (rdbSaveLen(fp,intsetLen(is)) == -1) return -1;
+            while(intsetGet(is,i++,&llval)) {
+                if (rdbSaveLongLongAsStringObject(fp,llval) == -1) return -1;
+            }
+        } else {
+            redisPanic("Unknown set encoding");
         }
-        dictReleaseIterator(di);
     } else if (o->type == REDIS_ZSET) {
         /* Save a set value */
         zset *zs = o->ptr;
@@ -628,6 +640,7 @@ int rdbLoadDoubleValue(FILE *fp, double *val) {
 robj *rdbLoadObject(int type, FILE *fp) {
     robj *o, *ele, *dec;
     size_t len;
+    unsigned int i;
 
     redisLog(REDIS_DEBUG,"LOADING OBJECT %d (at %d)\n",type,ftell(fp));
     if (type == REDIS_STRING) {
@@ -669,16 +682,39 @@ robj *rdbLoadObject(int type, FILE *fp) {
     } else if (type == REDIS_SET) {
         /* Read list/set value */
         if ((len = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
-        o = createSetObject();
-        /* It's faster to expand the dict to the right size asap in order
-         * to avoid rehashing */
-        if (len > DICT_HT_INITIAL_SIZE)
-            dictExpand(o->ptr,len);
+
+        /* Use a regular set when there are too many entries. */
+        if (len > server.set_max_intset_entries) {
+            o = createSetObject();
+            /* It's faster to expand the dict to the right size asap in order
+             * to avoid rehashing */
+            if (len > DICT_HT_INITIAL_SIZE)
+                dictExpand(o->ptr,len);
+        } else {
+            o = createIntsetObject();
+        }
+
         /* Load every single element of the list/set */
-        while(len--) {
+        for (i = 0; i < len; i++) {
+            long long llval;
             if ((ele = rdbLoadEncodedStringObject(fp)) == NULL) return NULL;
             ele = tryObjectEncoding(ele);
-            dictAdd((dict*)o->ptr,ele,NULL);
+
+            if (o->encoding == REDIS_ENCODING_INTSET) {
+                /* Fetch integer value from element */
+                if (getLongLongFromObject(ele,&llval) == REDIS_OK) {
+                    o->ptr = intsetAdd(o->ptr,llval,NULL);
+                } else {
+                    setTypeConvert(o,REDIS_ENCODING_HT);
+                    dictExpand(o->ptr,len);
+                }
+            }
+
+            /* This will also be called when the set was just converted
+             * to regular hashtable encoded set */
+            if (o->encoding == REDIS_ENCODING_HT) {
+                dictAdd((dict*)o->ptr,ele,NULL);
+            }
         }
     } else if (type == REDIS_ZSET) {
         /* Read list/set value */
index 0ee7a20ba6bbf9ec1e563b5dc729aaf2f6291424..9fbd52f2e19ca1b557fc74912449df67a1ffcf7a 100644 (file)
@@ -743,6 +743,7 @@ void initServerConfig() {
     server.hash_max_zipmap_value = REDIS_HASH_MAX_ZIPMAP_VALUE;
     server.list_max_ziplist_entries = REDIS_LIST_MAX_ZIPLIST_ENTRIES;
     server.list_max_ziplist_value = REDIS_LIST_MAX_ZIPLIST_VALUE;
+    server.set_max_intset_entries = REDIS_SET_MAX_INTSET_ENTRIES;
     server.shutdown_asap = 0;
 
     resetServerSaveParams();
index c35fe53a2caffe39479ba17db2edd28a20fea0b8..6156a6ca74187dbb1ee9460f4cf941594e0c48f9 100644 (file)
@@ -26,6 +26,7 @@
 #include "anet.h"   /* Networking the easy way */
 #include "zipmap.h" /* Compact string -> string data structure */
 #include "ziplist.h" /* Compact list data structure */
+#include "intset.h" /* Compact integer set structure */
 #include "version.h"
 
 /* Error codes */
@@ -82,6 +83,7 @@
 #define REDIS_ENCODING_ZIPMAP 3  /* Encoded as zipmap */
 #define REDIS_ENCODING_LINKEDLIST 4 /* Encoded as regular linked list */
 #define REDIS_ENCODING_ZIPLIST 5 /* Encoded as ziplist */
+#define REDIS_ENCODING_INTSET 6  /* Encoded as intset */
 
 /* Object types only used for dumping to disk */
 #define REDIS_EXPIRETIME 253
 #define REDIS_HASH_MAX_ZIPMAP_VALUE 512
 #define REDIS_LIST_MAX_ZIPLIST_ENTRIES 1024
 #define REDIS_LIST_MAX_ZIPLIST_VALUE 32
+#define REDIS_SET_MAX_INTSET_ENTRIES 4096
 
 /* Sets operations codes */
 #define REDIS_OP_UNION 0
@@ -398,6 +401,7 @@ struct redisServer {
     size_t hash_max_zipmap_value;
     size_t list_max_ziplist_entries;
     size_t list_max_ziplist_value;
+    size_t set_max_intset_entries;
     /* Virtual memory state */
     FILE *vm_fp;
     int vm_fd;
@@ -535,6 +539,14 @@ typedef struct {
     listNode *ln;       /* Entry in linked list */
 } listTypeEntry;
 
+/* Structure to hold set iteration abstraction. */
+typedef struct {
+    robj *subject;
+    int encoding;
+    int ii; /* intset iterator */
+    dictIterator *di;
+} setTypeIterator;
+
 /* Structure to hold hash iteration abstration. Note that iteration over
  * hashes involves both fields and values. Because it is possible that
  * not both are required, store pointers in the iterator to avoid
@@ -633,6 +645,7 @@ robj *createStringObjectFromLongLong(long long value);
 robj *createListObject(void);
 robj *createZiplistObject(void);
 robj *createSetObject(void);
+robj *createIntsetObject(void);
 robj *createHashObject(void);
 robj *createZsetObject(void);
 int getLongFromObjectOrReply(redisClient *c, robj *o, long *target, const char *msg);
@@ -716,6 +729,18 @@ int dontWaitForSwappedKey(redisClient *c, robj *key);
 void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key);
 vmpointer *vmSwapObjectBlocking(robj *val);
 
+/* Set data type */
+robj *setTypeCreate(robj *value);
+int setTypeAdd(robj *subject, robj *value);
+int setTypeRemove(robj *subject, robj *value);
+int setTypeIsMember(robj *subject, robj *value);
+setTypeIterator *setTypeInitIterator(robj *subject);
+void setTypeReleaseIterator(setTypeIterator *si);
+robj *setTypeNext(setTypeIterator *si);
+robj *setTypeRandomElement(robj *subject);
+unsigned long setTypeSize(robj *subject);
+void setTypeConvert(robj *subject, int enc);
+
 /* Hash data type */
 void convertToRealHash(robj *o);
 void hashTypeTryConversion(robj *subject, robj **argv, int start, int end);
index 4295a6ecca4da627725697700c1b79a93636db79..aa1ce929399dc64bfdbcccda95f1db3ae1b2c9c6 100644 (file)
@@ -202,7 +202,7 @@ void sortCommand(redisClient *c) {
     /* Load the sorting vector with all the objects to sort */
     switch(sortval->type) {
     case REDIS_LIST: vectorlen = listTypeLength(sortval); break;
-    case REDIS_SET: vectorlen =  dictSize((dict*)sortval->ptr); break;
+    case REDIS_SET: vectorlen =  setTypeSize(sortval); break;
     case REDIS_ZSET: vectorlen = dictSize(((zset*)sortval->ptr)->dict); break;
     default: vectorlen = 0; redisPanic("Bad SORT type"); /* Avoid GCC warning */
     }
@@ -219,18 +219,20 @@ void sortCommand(redisClient *c) {
             j++;
         }
         listTypeReleaseIterator(li);
-    } else {
-        dict *set;
+    } else if (sortval->type == REDIS_SET) {
+        setTypeIterator *si = setTypeInitIterator(sortval);
+        robj *ele;
+        while((ele = setTypeNext(si)) != NULL) {
+            vector[j].obj = ele;
+            vector[j].u.score = 0;
+            vector[j].u.cmpobj = NULL;
+            j++;
+        }
+        setTypeReleaseIterator(si);
+    } else if (sortval->type == REDIS_ZSET) {
+        dict *set = ((zset*)sortval->ptr)->dict;
         dictIterator *di;
         dictEntry *setele;
-
-        if (sortval->type == REDIS_SET) {
-            set = sortval->ptr;
-        } else {
-            zset *zs = sortval->ptr;
-            set = zs->dict;
-        }
-
         di = dictGetIterator(set);
         while((setele = dictNext(di)) != NULL) {
             vector[j].obj = dictGetEntryKey(setele);
@@ -239,6 +241,8 @@ void sortCommand(redisClient *c) {
             j++;
         }
         dictReleaseIterator(di);
+    } else {
+        redisPanic("Unknown type");
     }
     redisAssert(j == vectorlen);
 
@@ -369,7 +373,7 @@ void sortCommand(redisClient *c) {
     }
 
     /* Cleanup */
-    if (sortval->type == REDIS_LIST)
+    if (sortval->type == REDIS_LIST || sortval->type == REDIS_SET)
         for (j = 0; j < vectorlen; j++)
             decrRefCount(vector[j].obj);
     decrRefCount(sortval);
index 94b97633e83f0e1cfaeac0cb39d27beaba6ed270..01c851ba9ffe2c6d239ede95bf01a94509748d12 100644 (file)
  * Set Commands
  *----------------------------------------------------------------------------*/
 
+/* Factory method to return a set that *can* hold "value". When the object has
+ * an integer-encodable value, an intset will be returned. Otherwise a regular
+ * hash table. */
+robj *setTypeCreate(robj *value) {
+    if (getLongLongFromObject(value,NULL) == REDIS_OK)
+        return createIntsetObject();
+    return createSetObject();
+}
+
+int setTypeAdd(robj *subject, robj *value) {
+    long long llval;
+    if (subject->encoding == REDIS_ENCODING_HT) {
+        if (dictAdd(subject->ptr,value,NULL) == DICT_OK) {
+            incrRefCount(value);
+            return 1;
+        }
+    } else if (subject->encoding == REDIS_ENCODING_INTSET) {
+        if (getLongLongFromObject(value,&llval) == REDIS_OK) {
+            uint8_t success = 0;
+            subject->ptr = intsetAdd(subject->ptr,llval,&success);
+            if (success) {
+                /* Convert to regular set when the intset contains
+                 * too many entries. */
+                if (intsetLen(subject->ptr) > server.set_max_intset_entries)
+                    setTypeConvert(subject,REDIS_ENCODING_HT);
+                return 1;
+            }
+        } else {
+            /* Failed to get integer from object, convert to regular set. */
+            setTypeConvert(subject,REDIS_ENCODING_HT);
+
+            /* The set *was* an intset and this value is not integer
+             * encodable, so dictAdd should always work. */
+            redisAssert(dictAdd(subject->ptr,value,NULL) == DICT_OK);
+            incrRefCount(value);
+            return 1;
+        }
+    } else {
+        redisPanic("Unknown set encoding");
+    }
+    return 0;
+}
+
+int setTypeRemove(robj *subject, robj *value) {
+    long long llval;
+    if (subject->encoding == REDIS_ENCODING_HT) {
+        if (dictDelete(subject->ptr,value) == DICT_OK) {
+            if (htNeedsResize(subject->ptr)) dictResize(subject->ptr);
+            return 1;
+        }
+    } else if (subject->encoding == REDIS_ENCODING_INTSET) {
+        if (getLongLongFromObject(value,&llval) == REDIS_OK) {
+            uint8_t success;
+            subject->ptr = intsetRemove(subject->ptr,llval,&success);
+            if (success) return 1;
+        }
+    } else {
+        redisPanic("Unknown set encoding");
+    }
+    return 0;
+}
+
+int setTypeIsMember(robj *subject, robj *value) {
+    long long llval;
+    if (subject->encoding == REDIS_ENCODING_HT) {
+        return dictFind((dict*)subject->ptr,value) != NULL;
+    } else if (subject->encoding == REDIS_ENCODING_INTSET) {
+        if (getLongLongFromObject(value,&llval) == REDIS_OK) {
+            return intsetFind((intset*)subject->ptr,llval);
+        }
+    } else {
+        redisPanic("Unknown set encoding");
+    }
+    return 0;
+}
+
+setTypeIterator *setTypeInitIterator(robj *subject) {
+    setTypeIterator *si = zmalloc(sizeof(setIterator));
+    si->subject = subject;
+    si->encoding = subject->encoding;
+    if (si->encoding == REDIS_ENCODING_HT) {
+        si->di = dictGetIterator(subject->ptr);
+    } else if (si->encoding == REDIS_ENCODING_INTSET) {
+        si->ii = 0;
+    } else {
+        redisPanic("Unknown set encoding");
+    }
+    return si;
+}
+
+void setTypeReleaseIterator(setTypeIterator *si) {
+    if (si->encoding == REDIS_ENCODING_HT)
+        dictReleaseIterator(si->di);
+    zfree(si);
+}
+
+/* Move to the next entry in the set. Returns the object at the current
+ * position, or NULL when the end is reached. This object will have its
+ * refcount incremented, so the caller needs to take care of this. */
+robj *setTypeNext(setTypeIterator *si) {
+    robj *ret = NULL;
+    if (si->encoding == REDIS_ENCODING_HT) {
+        dictEntry *de = dictNext(si->di);
+        if (de != NULL) {
+            ret = dictGetEntryKey(de);
+            incrRefCount(ret);
+        }
+    } else if (si->encoding == REDIS_ENCODING_INTSET) {
+        long long llval;
+        if (intsetGet(si->subject->ptr,si->ii++,&llval))
+            ret = createStringObjectFromLongLong(llval);
+    }
+    return ret;
+}
+
+
+/* Return random element from set. The returned object will always have
+ * an incremented refcount. */
+robj *setTypeRandomElement(robj *subject) {
+    robj *ret = NULL;
+    if (subject->encoding == REDIS_ENCODING_HT) {
+        dictEntry *de = dictGetRandomKey(subject->ptr);
+        ret = dictGetEntryKey(de);
+        incrRefCount(ret);
+    } else if (subject->encoding == REDIS_ENCODING_INTSET) {
+        long long llval = intsetRandom(subject->ptr);
+        ret = createStringObjectFromLongLong(llval);
+    } else {
+        redisPanic("Unknown set encoding");
+    }
+    return ret;
+}
+
+unsigned long setTypeSize(robj *subject) {
+    if (subject->encoding == REDIS_ENCODING_HT) {
+        return dictSize((dict*)subject->ptr);
+    } else if (subject->encoding == REDIS_ENCODING_INTSET) {
+        return intsetLen((intset*)subject->ptr);
+    } else {
+        redisPanic("Unknown set encoding");
+    }
+}
+
+/* Convert the set to specified encoding. The resulting dict (when converting
+ * to a hashtable) is presized to hold the number of elements in the original
+ * set. */
+void setTypeConvert(robj *subject, int enc) {
+    setTypeIterator *si;
+    robj *element;
+    redisAssert(subject->type == REDIS_SET);
+
+    if (enc == REDIS_ENCODING_HT) {
+        dict *d = dictCreate(&setDictType,NULL);
+        /* Presize the dict to avoid rehashing */
+        dictExpand(d,intsetLen(subject->ptr));
+
+        /* setTypeGet returns a robj with incremented refcount */
+        si = setTypeInitIterator(subject);
+        while ((element = setTypeNext(si)) != NULL)
+            redisAssert(dictAdd(d,element,NULL) == DICT_OK);
+        setTypeReleaseIterator(si);
+
+        subject->encoding = REDIS_ENCODING_HT;
+        zfree(subject->ptr);
+        subject->ptr = d;
+    } else {
+        redisPanic("Unsupported set conversion");
+    }
+}
+
 void saddCommand(redisClient *c) {
     robj *set;
 
     set = lookupKeyWrite(c->db,c->argv[1]);
     if (set == NULL) {
-        set = createSetObject();
+        set = setTypeCreate(c->argv[2]);
         dbAdd(c->db,c->argv[1],set);
     } else {
         if (set->type != REDIS_SET) {
@@ -17,8 +187,7 @@ void saddCommand(redisClient *c) {
             return;
         }
     }
-    if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
-        incrRefCount(c->argv[2]);
+    if (setTypeAdd(set,c->argv[2])) {
         touchWatchedKey(c->db,c->argv[1]);
         server.dirty++;
         addReply(c,shared.cone);
@@ -33,11 +202,10 @@ void sremCommand(redisClient *c) {
     if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL ||
         checkType(c,set,REDIS_SET)) return;
 
-    if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
-        server.dirty++;
+    if (setTypeRemove(set,c->argv[2])) {
+        if (setTypeSize(set) == 0) dbDelete(c->db,c->argv[1]);
         touchWatchedKey(c->db,c->argv[1]);
-        if (htNeedsResize(set->ptr)) dictResize(set->ptr);
-        if (dictSize((dict*)set->ptr) == 0) dbDelete(c->db,c->argv[1]);
+        server.dirty++;
         addReply(c,shared.cone);
     } else {
         addReply(c,shared.czero);
@@ -45,40 +213,48 @@ void sremCommand(redisClient *c) {
 }
 
 void smoveCommand(redisClient *c) {
-    robj *srcset, *dstset;
-
+    robj *srcset, *dstset, *ele;
     srcset = lookupKeyWrite(c->db,c->argv[1]);
     dstset = lookupKeyWrite(c->db,c->argv[2]);
+    ele = c->argv[3];
 
-    /* If the source key does not exist return 0, if it's of the wrong type
-     * raise an error */
-    if (srcset == NULL || srcset->type != REDIS_SET) {
-        addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
+    /* If the source key does not exist return 0 */
+    if (srcset == NULL) {
+        addReply(c,shared.czero);
         return;
     }
-    /* Error if the destination key is not a set as well */
-    if (dstset && dstset->type != REDIS_SET) {
-        addReply(c,shared.wrongtypeerr);
+
+    /* If the source key has the wrong type, or the destination key
+     * is set and has the wrong type, return with an error. */
+    if (checkType(c,srcset,REDIS_SET) ||
+        (dstset && checkType(c,dstset,REDIS_SET))) return;
+
+    /* If srcset and dstset are equal, SMOVE is a no-op */
+    if (srcset == dstset) {
+        addReply(c,shared.cone);
         return;
     }
-    /* Remove the element from the source set */
-    if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
-        /* Key not found in the src set! return zero */
+
+    /* If the element cannot be removed from the src set, return 0. */
+    if (!setTypeRemove(srcset,ele)) {
         addReply(c,shared.czero);
         return;
     }
-    if (dictSize((dict*)srcset->ptr) == 0 && srcset != dstset)
-        dbDelete(c->db,c->argv[1]);
+
+    /* Remove the src set from the database when empty */
+    if (setTypeSize(srcset) == 0) dbDelete(c->db,c->argv[1]);
     touchWatchedKey(c->db,c->argv[1]);
     touchWatchedKey(c->db,c->argv[2]);
     server.dirty++;
-    /* Add the element to the destination set */
+
+    /* Create the destination set when it doesn't exist */
     if (!dstset) {
-        dstset = createSetObject();
+        dstset = setTypeCreate(ele);
         dbAdd(c->db,c->argv[2],dstset);
     }
-    if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
-        incrRefCount(c->argv[3]);
+
+    /* An extra key has changed when ele was successfully added to dstset */
+    if (setTypeAdd(dstset,ele)) server.dirty++;
     addReply(c,shared.cone);
 }
 
@@ -88,7 +264,7 @@ void sismemberCommand(redisClient *c) {
     if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
         checkType(c,set,REDIS_SET)) return;
 
-    if (dictFind(set->ptr,c->argv[2]))
+    if (setTypeIsMember(set,c->argv[2]))
         addReply(c,shared.cone);
     else
         addReply(c,shared.czero);
@@ -96,75 +272,63 @@ void sismemberCommand(redisClient *c) {
 
 void scardCommand(redisClient *c) {
     robj *o;
-    dict *s;
 
     if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
         checkType(c,o,REDIS_SET)) return;
 
-    s = o->ptr;
-    addReplyUlong(c,dictSize(s));
+    addReplyUlong(c,setTypeSize(o));
 }
 
 void spopCommand(redisClient *c) {
-    robj *set;
-    dictEntry *de;
+    robj *set, *ele;
 
     if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
         checkType(c,set,REDIS_SET)) return;
 
-    de = dictGetRandomKey(set->ptr);
-    if (de == NULL) {
+    ele = setTypeRandomElement(set);
+    if (ele == NULL) {
         addReply(c,shared.nullbulk);
     } else {
-        robj *ele = dictGetEntryKey(de);
-
+        setTypeRemove(set,ele);
         addReplyBulk(c,ele);
-        dictDelete(set->ptr,ele);
-        if (htNeedsResize(set->ptr)) dictResize(set->ptr);
-        if (dictSize((dict*)set->ptr) == 0) dbDelete(c->db,c->argv[1]);
+        decrRefCount(ele);
+        if (setTypeSize(set) == 0) dbDelete(c->db,c->argv[1]);
         touchWatchedKey(c->db,c->argv[1]);
         server.dirty++;
     }
 }
 
 void srandmemberCommand(redisClient *c) {
-    robj *set;
-    dictEntry *de;
+    robj *set, *ele;
 
     if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
         checkType(c,set,REDIS_SET)) return;
 
-    de = dictGetRandomKey(set->ptr);
-    if (de == NULL) {
+    ele = setTypeRandomElement(set);
+    if (ele == NULL) {
         addReply(c,shared.nullbulk);
     } else {
-        robj *ele = dictGetEntryKey(de);
-
         addReplyBulk(c,ele);
+        decrRefCount(ele);
     }
 }
 
 int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
-    dict **d1 = (void*) s1, **d2 = (void*) s2;
-
-    return dictSize(*d1)-dictSize(*d2);
+    return setTypeSize(*(robj**)s1)-setTypeSize(*(robj**)s2);
 }
 
-void sinterGenericCommand(redisClient *c, robj **setskeys, unsigned long setsnum, robj *dstkey) {
-    dict **dv = zmalloc(sizeof(dict*)*setsnum);
-    dictIterator *di;
-    dictEntry *de;
-    robj *lenobj = NULL, *dstset = NULL;
+void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum, robj *dstkey) {
+    robj **sets = zmalloc(sizeof(robj*)*setnum);
+    setTypeIterator *si;
+    robj *ele, *lenobj = NULL, *dstset = NULL;
     unsigned long j, cardinality = 0;
 
-    for (j = 0; j < setsnum; j++) {
-        robj *setobj;
-
-        setobj = dstkey ?
-                    lookupKeyWrite(c->db,setskeys[j]) :
-                    lookupKeyRead(c->db,setskeys[j]);
+    for (j = 0; j < setnum; j++) {
+        robj *setobj = dstkey ?
+            lookupKeyWrite(c->db,setkeys[j]) :
+            lookupKeyRead(c->db,setkeys[j]);
         if (!setobj) {
-            zfree(dv);
+            zfree(sets);
             if (dstkey) {
                 if (dbDelete(c->db,dstkey)) {
                     touchWatchedKey(c->db,dstkey);
@@ -176,16 +340,15 @@ void sinterGenericCommand(redisClient *c, robj **setskeys, unsigned long setsnum
             }
             return;
         }
-        if (setobj->type != REDIS_SET) {
-            zfree(dv);
-            addReply(c,shared.wrongtypeerr);
+        if (checkType(c,setobj,REDIS_SET)) {
+            zfree(sets);
             return;
         }
-        dv[j] = setobj->ptr;
+        sets[j] = setobj;
     }
     /* Sort sets from the smallest to largest, this will improve our
      * algorithm's performace */
-    qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
+    qsort(sets,setnum,sizeof(robj*),qsortCompareSetsByCardinality);
 
     /* The first thing we should output is the total number of elements...
      * since this is a multi-bulk write, but at this stage we don't know
@@ -199,39 +362,37 @@ void sinterGenericCommand(redisClient *c, robj **setskeys, unsigned long setsnum
     } else {
         /* If we have a target key where to store the resulting set
          * create this key with an empty set inside */
-        dstset = createSetObject();
+        dstset = createIntsetObject();
     }
 
     /* Iterate all the elements of the first (smallest) set, and test
      * the element against all the other sets, if at least one set does
      * not include the element it is discarded */
-    di = dictGetIterator(dv[0]);
-
-    while((de = dictNext(di)) != NULL) {
-        robj *ele;
-
-        for (j = 1; j < setsnum; j++)
-            if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
-        if (j != setsnum)
-            continue; /* at least one set does not contain the member */
-        ele = dictGetEntryKey(de);
-        if (!dstkey) {
-            addReplyBulk(c,ele);
-            cardinality++;
-        } else {
-            dictAdd(dstset->ptr,ele,NULL);
-            incrRefCount(ele);
+    si = setTypeInitIterator(sets[0]);
+    while((ele = setTypeNext(si)) != NULL) {
+        for (j = 1; j < setnum; j++)
+            if (!setTypeIsMember(sets[j],ele)) break;
+
+        /* Only take action when all sets contain the member */
+        if (j == setnum) {
+            if (!dstkey) {
+                addReplyBulk(c,ele);
+                cardinality++;
+            } else {
+                setTypeAdd(dstset,ele);
+            }
         }
+        decrRefCount(ele);
     }
-    dictReleaseIterator(di);
+    setTypeReleaseIterator(si);
 
     if (dstkey) {
         /* Store the resulting set into the target, if the intersection
          * is not an empty set. */
         dbDelete(c->db,dstkey);
-        if (dictSize((dict*)dstset->ptr) > 0) {
+        if (setTypeSize(dstset) > 0) {
             dbAdd(c->db,dstkey,dstset);
-            addReplyLongLong(c,dictSize((dict*)dstset->ptr));
+            addReplyLongLong(c,setTypeSize(dstset));
         } else {
             decrRefCount(dstset);
             addReply(c,shared.czero);
@@ -241,7 +402,7 @@ void sinterGenericCommand(redisClient *c, robj **setskeys, unsigned long setsnum
     } else {
         lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",cardinality);
     }
-    zfree(dv);
+    zfree(sets);
 }
 
 void sinterCommand(redisClient *c) {
@@ -252,85 +413,78 @@ void sinterstoreCommand(redisClient *c) {
     sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
 }
 
-void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
-    dict **dv = zmalloc(sizeof(dict*)*setsnum);
-    dictIterator *di;
-    dictEntry *de;
-    robj *dstset = NULL;
-    int j, cardinality = 0;
+#define REDIS_OP_UNION 0
+#define REDIS_OP_DIFF 1
+#define REDIS_OP_INTER 2
 
-    for (j = 0; j < setsnum; j++) {
-        robj *setobj;
+void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum, robj *dstkey, int op) {
+    robj **sets = zmalloc(sizeof(robj*)*setnum);
+    setTypeIterator *si;
+    robj *ele, *dstset = NULL;
+    int j, cardinality = 0;
 
-        setobj = dstkey ?
-                    lookupKeyWrite(c->db,setskeys[j]) :
-                    lookupKeyRead(c->db,setskeys[j]);
+    for (j = 0; j < setnum; j++) {
+        robj *setobj = dstkey ?
+            lookupKeyWrite(c->db,setkeys[j]) :
+            lookupKeyRead(c->db,setkeys[j]);
         if (!setobj) {
-            dv[j] = NULL;
+            sets[j] = NULL;
             continue;
         }
-        if (setobj->type != REDIS_SET) {
-            zfree(dv);
-            addReply(c,shared.wrongtypeerr);
+        if (checkType(c,setobj,REDIS_SET)) {
+            zfree(sets);
             return;
         }
-        dv[j] = setobj->ptr;
+        sets[j] = setobj;
     }
 
     /* We need a temp set object to store our union. If the dstkey
      * is not NULL (that is, we are inside an SUNIONSTORE operation) then
      * this set object will be the resulting object to set into the target key*/
-    dstset = createSetObject();
+    dstset = createIntsetObject();
 
     /* Iterate all the elements of all the sets, add every element a single
      * time to the result set */
-    for (j = 0; j < setsnum; j++) {
-        if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
-        if (!dv[j]) continue; /* non existing keys are like empty sets */
-
-        di = dictGetIterator(dv[j]);
+    for (j = 0; j < setnum; j++) {
+        if (op == REDIS_OP_DIFF && j == 0 && !sets[j]) break; /* result set is empty */
+        if (!sets[j]) continue; /* non existing keys are like empty sets */
 
-        while((de = dictNext(di)) != NULL) {
-            robj *ele;
-
-            /* dictAdd will not add the same element multiple times */
-            ele = dictGetEntryKey(de);
+        si = setTypeInitIterator(sets[j]);
+        while((ele = setTypeNext(si)) != NULL) {
             if (op == REDIS_OP_UNION || j == 0) {
-                if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
-                    incrRefCount(ele);
+                if (setTypeAdd(dstset,ele)) {
                     cardinality++;
                 }
             } else if (op == REDIS_OP_DIFF) {
-                if (dictDelete(dstset->ptr,ele) == DICT_OK) {
+                if (setTypeRemove(dstset,ele)) {
                     cardinality--;
                 }
             }
+            decrRefCount(ele);
         }
-        dictReleaseIterator(di);
+        setTypeReleaseIterator(si);
 
-        /* result set is empty? Exit asap. */
+        /* Exit when result set is empty. */
         if (op == REDIS_OP_DIFF && cardinality == 0) break;
     }
 
     /* Output the content of the resulting set, if not in STORE mode */
     if (!dstkey) {
         addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
-        di = dictGetIterator(dstset->ptr);
-        while((de = dictNext(di)) != NULL) {
-            robj *ele;
-
-            ele = dictGetEntryKey(de);
+        si = setTypeInitIterator(dstset);
+        while((ele = setTypeNext(si)) != NULL) {
             addReplyBulk(c,ele);
+            decrRefCount(ele);
         }
-        dictReleaseIterator(di);
+        setTypeReleaseIterator(si);
         decrRefCount(dstset);
     } else {
         /* If we have a target key where to store the resulting set
          * create this key with the result set inside */
         dbDelete(c->db,dstkey);
-        if (dictSize((dict*)dstset->ptr) > 0) {
+        if (setTypeSize(dstset) > 0) {
             dbAdd(c->db,dstkey,dstset);
-            addReplyLongLong(c,dictSize((dict*)dstset->ptr));
+            addReplyLongLong(c,setTypeSize(dstset));
         } else {
             decrRefCount(dstset);
             addReply(c,shared.czero);
@@ -338,7 +492,7 @@ void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj
         touchWatchedKey(c->db,dstkey);
         server.dirty++;
     }
-    zfree(dv);
+    zfree(sets);
 }
 
 void sunionCommand(redisClient *c) {
index 8e226a7dd22a3638c149efaedcf17726700bb03e..24fef4677175cd4202836e48dfdb9a615a8bfacc 100644 (file)
@@ -230,7 +230,11 @@ proc start_server {options {code undefined}} {
         
         # execute provided block
         set curnum $::testnum
-        catch { uplevel 1 $code } err
+        if {![catch { uplevel 1 $code } err]} {
+            # zero exit status is good
+            unset err
+        }
+
         if {$curnum == $::testnum} {
             # don't check for leaks when no tests were executed
             dict set srv "skipleaks" 1
@@ -241,22 +245,24 @@ proc start_server {options {code undefined}} {
         
         # allow an exception to bubble up the call chain but still kill this
         # server, because we want to reuse the ports when the tests are re-run
-        if {$err eq "exception"} {
-            puts [format "Logged warnings (pid %d):" [dict get $srv "pid"]]
-            set warnings [warnings_from_file [dict get $srv "stdout"]]
-            if {[string length $warnings] > 0} {
-                puts "$warnings"
-            } else {
-                puts "(none)"
+        if {[info exists err]} {
+            if {$err eq "exception"} {
+                puts [format "Logged warnings (pid %d):" [dict get $srv "pid"]]
+                set warnings [warnings_from_file [dict get $srv "stdout"]]
+                if {[string length $warnings] > 0} {
+                    puts "$warnings"
+                } else {
+                    puts "(none)"
+                }
+                # kill this server without checking for leaks
+                dict set srv "skipleaks" 1
+                kill_server $srv
+                error "exception"
+            } elseif {[string length $err] > 0} {
+                puts "Error executing the suite, aborting..."
+                puts $err
+                exit 1
             }
-            # kill this server without checking for leaks
-            dict set srv "skipleaks" 1
-            kill_server $srv
-            error "exception"
-        } elseif {[string length $err] > 0} {
-            puts "Error executing the suite, aborting..."
-            puts $err
-            exit 1
         }
 
         set ::tags [lrange $::tags 0 end-[llength $tags]]
index 16a02b3a9ac9e4472db91eef26b314dec98ea55a..bca0173799d7ea7fbe46210e6eb8da3cb0509d14 100644 (file)
@@ -1,5 +1,99 @@
-start_server {tags {"sort"}} {
-    test {SORT ALPHA against integer encoded strings} {
+start_server {
+    tags {"sort"}
+    overrides {
+        "list-max-ziplist-value" 16
+        "list-max-ziplist-entries" 32
+        "set-max-intset-entries" 32
+    }
+} {
+    proc create_random_dataset {num cmd} {
+        set tosort {}
+        set result {}
+        array set seenrand {}
+        r del tosort
+        for {set i 0} {$i < $num} {incr i} {
+            # Make sure all the weights are different because
+            # Redis does not use a stable sort but Tcl does.
+            while 1 {
+                randpath {
+                    set rint [expr int(rand()*1000000)]
+                } {
+                    set rint [expr rand()]
+                }
+                if {![info exists seenrand($rint)]} break
+            }
+            set seenrand($rint) x
+            r $cmd tosort $i
+            r set weight_$i $rint
+            r hset wobj_$i weight $rint
+            lappend tosort [list $i $rint]
+        }
+        set sorted [lsort -index 1 -real $tosort]
+        for {set i 0} {$i < $num} {incr i} {
+            lappend result [lindex $sorted $i 0]
+        }
+        set _ $result
+    }
+
+    foreach {num cmd enc title} {
+        16 lpush ziplist "Ziplist"
+        64 lpush linkedlist "Linked list"
+        16 sadd intset "Intset"
+        64 sadd hashtable "Hash table"
+    } {
+        set result [create_random_dataset $num $cmd]
+        assert_encoding $enc tosort
+
+        test "$title: SORT BY key" {
+            assert_equal $result [r sort tosort {BY weight_*}]
+        }
+
+        test "$title: SORT BY hash field" {
+            assert_equal $result [r sort tosort {BY wobj_*->weight}]
+        }
+    }
+
+    set result [create_random_dataset 16 lpush]
+    test "SORT GET #" {
+        assert_equal [lsort -integer $result] [r sort tosort GET #]
+    }
+
+    test "SORT GET <const>" {
+        r del foo
+        set res [r sort tosort GET foo]
+        assert_equal 16 [llength $res]
+        foreach item $res { assert_equal {} $item }
+    }
+
+    test "SORT GET (key and hash) with sanity check" {
+        set l1 [r sort tosort GET # GET weight_*]
+        set l2 [r sort tosort GET # GET wobj_*->weight]
+        foreach {id1 w1} $l1 {id2 w2} $l2 {
+            assert_equal $id1 $id2
+            assert_equal $w1 [r get weight_$id1]
+            assert_equal $w2 [r get weight_$id1]
+        }
+    }
+
+    test "SORT BY key STORE" {
+        r sort tosort {BY weight_*} store sort-res
+        assert_equal $result [r lrange sort-res 0 -1]
+        assert_equal 16 [r llen sort-res]
+        assert_encoding ziplist sort-res
+    }
+
+    test "SORT BY hash field STORE" {
+        r sort tosort {BY wobj_*->weight} store sort-res
+        assert_equal $result [r lrange sort-res 0 -1]
+        assert_equal 16 [r llen sort-res]
+        assert_encoding ziplist sort-res
+    }
+
+    test "SORT DESC" {
+        assert_equal [lsort -decreasing -integer $result] [r sort tosort {DESC}]
+    }
+
+    test "SORT ALPHA against integer encoded strings" {
         r del mylist
         r lpush mylist 2
         r lpush mylist 1
@@ -8,89 +102,41 @@ start_server {tags {"sort"}} {
         r sort mylist alpha
     } {1 10 2 3}
 
-    tags {"slow"} {
-        set res {}
-        test {Create a random list and a random set} {
-            set tosort {}
-            array set seenrand {}
-            for {set i 0} {$i < 10000} {incr i} {
-                while 1 {
-                    # Make sure all the weights are different because
-                    # Redis does not use a stable sort but Tcl does.
-                    randpath {
-                        set rint [expr int(rand()*1000000)]
-                    } {
-                        set rint [expr rand()]
-                    }
-                    if {![info exists seenrand($rint)]} break
-                }
-                set seenrand($rint) x
-                r lpush tosort $i
-                r sadd tosort-set $i
-                r set weight_$i $rint
-                r hset wobj_$i weight $rint
-                lappend tosort [list $i $rint]
-            }
-            set sorted [lsort -index 1 -real $tosort]
-            for {set i 0} {$i < 10000} {incr i} {
-                lappend res [lindex $sorted $i 0]
-            }
-            format {}
-        } {}
-
-        test {SORT with BY against the newly created list} {
-            r sort tosort {BY weight_*}
-        } $res
-
-        test {SORT with BY (hash field) against the newly created list} {
-            r sort tosort {BY wobj_*->weight}
-        } $res
-
-        test {SORT with GET (key+hash) with sanity check of each element (list)} {
-            set err {}
-            set l1 [r sort tosort GET # GET weight_*]
-            set l2 [r sort tosort GET # GET wobj_*->weight]
-            foreach {id1 w1} $l1 {id2 w2} $l2 {
-                set realweight [r get weight_$id1]
-                if {$id1 != $id2} {
-                    set err "ID mismatch $id1 != $id2"
-                    break
-                }
-                if {$realweight != $w1 || $realweight != $w2} {
-                    set err "Weights mismatch! w1: $w1 w2: $w2 real: $realweight"
-                    break
-                }
-            }
-            set _ $err
-        } {}
-
-        test {SORT with BY, but against the newly created set} {
-            r sort tosort-set {BY weight_*}
-        } $res
-
-        test {SORT with BY (hash field), but against the newly created set} {
-            r sort tosort-set {BY wobj_*->weight}
-        } $res
-
-        test {SORT with BY and STORE against the newly created list} {
-            r sort tosort {BY weight_*} store sort-res
-            r lrange sort-res 0 -1
-        } $res
+    test "SORT sorted set" {
+        r del zset
+        r zadd zset 1 a
+        r zadd zset 5 b
+        r zadd zset 2 c
+        r zadd zset 10 d
+        r zadd zset 3 e
+        r sort zset alpha desc
+    } {e d c b a}
 
-        test {SORT with BY (hash field) and STORE against the newly created list} {
-            r sort tosort {BY wobj_*->weight} store sort-res
-            r lrange sort-res 0 -1
-        } $res
+    test "SORT sorted set: +inf and -inf handling" {
+        r del zset
+        r zadd zset -100 a
+        r zadd zset 200 b
+        r zadd zset -300 c
+        r zadd zset 1000000 d
+        r zadd zset +inf max
+        r zadd zset -inf min
+        r zrange zset 0 -1
+    } {min c a b d max}
 
-        test {SORT direct, numeric, against the newly created list} {
-            r sort tosort
-        } [lsort -integer $res]
+    test "SORT regression for issue #19, sorting floats" {
+        r flushdb
+        set floats {1.1 5.10 3.10 7.44 2.1 5.75 6.12 0.25 1.15}
+        foreach x $floats {
+            r lpush mylist $x
+        }
+        assert_equal [lsort -real $floats] [r sort mylist]
+    }
 
-        test {SORT decreasing sort} {
-            r sort tosort {DESC}
-        } [lsort -decreasing -integer $res]
+    tags {"slow"} {
+        set num 100
+        set res [create_random_dataset $num lpush]
 
-        test {SORT speed, sorting 10000 elements list using BY, 100 times} {
+        test "SORT speed, $num element list BY key, 100 times" {
             set start [clock clicks -milliseconds]
             for {set i 0} {$i < 100} {incr i} {
                 set sorted [r sort tosort {BY weight_* LIMIT 0 10}]
@@ -98,10 +144,9 @@ start_server {tags {"sort"}} {
             set elapsed [expr [clock clicks -milliseconds]-$start]
             puts -nonewline "\n  Average time to sort: [expr double($elapsed)/100] milliseconds "
             flush stdout
-            format {}
-        } {}
+        }
 
-        test {SORT speed, as above but against hash field} {
+        test "SORT speed, $num element list BY hash field, 100 times" {
             set start [clock clicks -milliseconds]
             for {set i 0} {$i < 100} {incr i} {
                 set sorted [r sort tosort {BY wobj_*->weight LIMIT 0 10}]
@@ -109,10 +154,9 @@ start_server {tags {"sort"}} {
             set elapsed [expr [clock clicks -milliseconds]-$start]
             puts -nonewline "\n  Average time to sort: [expr double($elapsed)/100] milliseconds "
             flush stdout
-            format {}
-        } {}
+        }
 
-        test {SORT speed, sorting 10000 elements list directly, 100 times} {
+        test "SORT speed, $num element list directly, 100 times" {
             set start [clock clicks -milliseconds]
             for {set i 0} {$i < 100} {incr i} {
                 set sorted [r sort tosort {LIMIT 0 10}]
@@ -120,10 +164,9 @@ start_server {tags {"sort"}} {
             set elapsed [expr [clock clicks -milliseconds]-$start]
             puts -nonewline "\n  Average time to sort: [expr double($elapsed)/100] milliseconds "
             flush stdout
-            format {}
-        } {}
+        }
 
-        test {SORT speed, pseudo-sorting 10000 elements list, BY <const>, 100 times} {
+        test "SORT speed, $num element list BY <const>, 100 times" {
             set start [clock clicks -milliseconds]
             for {set i 0} {$i < 100} {incr i} {
                 set sorted [r sort tosort {BY nokey LIMIT 0 10}]
@@ -131,49 +174,6 @@ start_server {tags {"sort"}} {
             set elapsed [expr [clock clicks -milliseconds]-$start]
             puts -nonewline "\n  Average time to sort: [expr double($elapsed)/100] milliseconds "
             flush stdout
-            format {}
-        } {}
-    }
-
-    test {SORT regression for issue #19, sorting floats} {
-        r flushdb
-        foreach x {1.1 5.10 3.10 7.44 2.1 5.75 6.12 0.25 1.15} {
-            r lpush mylist $x
         }
-        r sort mylist
-    } [lsort -real {1.1 5.10 3.10 7.44 2.1 5.75 6.12 0.25 1.15}]
-
-    test {SORT with GET #} {
-        r del mylist
-        r lpush mylist 1
-        r lpush mylist 2
-        r lpush mylist 3
-        r mset weight_1 10 weight_2 5 weight_3 30
-        r sort mylist BY weight_* GET #
-    } {2 1 3}
-
-    test {SORT with constant GET} {
-        r sort mylist GET foo
-    } {{} {} {}}
-    
-    test {SORT against sorted sets} {
-        r del zset
-        r zadd zset 1 a
-        r zadd zset 5 b
-        r zadd zset 2 c
-        r zadd zset 10 d
-        r zadd zset 3 e
-        r sort zset alpha desc
-    } {e d c b a}
-
-    test {Sorted sets +inf and -inf handling} {
-        r del zset
-        r zadd zset -100 a
-        r zadd zset 200 b
-        r zadd zset -300 c
-        r zadd zset 1000000 d
-        r zadd zset +inf max
-        r zadd zset -inf min
-        r zrange zset 0 -1
-    } {min c a b d max}
+    }
 }
index 58ea2b5b21499c3151a0f99aceba425992d0db00..056ed27c824ce888cec0952693e2ed0f0384707b 100644 (file)
-start_server {tags {"set"}} {
-    test {SADD, SCARD, SISMEMBER, SMEMBERS basics} {
-        r sadd myset foo
-        r sadd myset bar
-        list [r scard myset] [r sismember myset foo] \
-            [r sismember myset bar] [r sismember myset bla] \
-            [lsort [r smembers myset]]
-    } {2 1 1 0 {bar foo}}
-
-    test {SADD adding the same element multiple times} {
-        r sadd myset foo
-        r sadd myset foo
-        r sadd myset foo
-        r scard myset
-    } {2}
+start_server {
+    tags {"set"}
+    overrides {
+        "set-max-intset-entries" 512
+    }
+} {
+    proc create_set {key entries} {
+        r del $key
+        foreach entry $entries { r sadd $key $entry }
+    }
+
+    test {SADD, SCARD, SISMEMBER, SMEMBERS basics - regular set} {
+        create_set myset {foo}
+        assert_encoding hashtable myset
+        assert_equal 1 [r sadd myset bar]
+        assert_equal 0 [r sadd myset bar]
+        assert_equal 2 [r scard myset]
+        assert_equal 1 [r sismember myset foo]
+        assert_equal 1 [r sismember myset bar]
+        assert_equal 0 [r sismember myset bla]
+        assert_equal {bar foo} [lsort [r smembers myset]]
+    }
+
+    test {SADD, SCARD, SISMEMBER, SMEMBERS basics - intset} {
+        create_set myset {17}
+        assert_encoding intset myset
+        assert_equal 1 [r sadd myset 16]
+        assert_equal 0 [r sadd myset 16]
+        assert_equal 2 [r scard myset]
+        assert_equal 1 [r sismember myset 16]
+        assert_equal 1 [r sismember myset 17]
+        assert_equal 0 [r sismember myset 18]
+        assert_equal {16 17} [lsort [r smembers myset]]
+    }
 
     test {SADD against non set} {
         r lpush mylist foo
-        catch {r sadd mylist bar} err
-        format $err
-    } {ERR*kind*}
-
-    test {SREM basics} {
-        r sadd myset ciao
-        r srem myset foo
-        lsort [r smembers myset]
-    } {bar ciao}
-
-    test {Mass SADD and SINTER with two sets} {
-        for {set i 0} {$i < 1000} {incr i} {
+        assert_error ERR*kind* {r sadd mylist bar}
+    }
+
+    test "SADD a non-integer against an intset" {
+        create_set myset {1 2 3}
+        assert_encoding intset myset
+        assert_equal 1 [r sadd myset a]
+        assert_encoding hashtable myset
+    }
+
+    test "SADD an integer larger than 64 bits" {
+        create_set myset {213244124402402314402033402}
+        assert_encoding hashtable myset
+        assert_equal 1 [r sismember myset 213244124402402314402033402]
+    }
+
+    test "SADD overflows the maximum allowed integers in an intset" {
+        r del myset
+        for {set i 0} {$i < 512} {incr i} { r sadd myset $i }
+        assert_encoding intset myset
+        assert_equal 1 [r sadd myset 512]
+        assert_encoding hashtable myset
+    }
+
+    test "Set encoding after DEBUG RELOAD" {
+        r del myintset myhashset mylargeintset
+        for {set i 0} {$i <  100} {incr i} { r sadd myintset $i }
+        for {set i 0} {$i < 1280} {incr i} { r sadd mylargeintset $i }
+        for {set i 0} {$i <  256} {incr i} { r sadd myhashset [format "i%03d" $i] }
+        assert_encoding intset myintset
+        assert_encoding hashtable mylargeintset
+        assert_encoding hashtable myhashset
+
+        r debug reload
+        assert_encoding intset myintset
+        assert_encoding hashtable mylargeintset
+        assert_encoding hashtable myhashset
+    }
+
+    test {SREM basics - regular set} {
+        create_set myset {foo bar ciao}
+        assert_encoding hashtable myset
+        assert_equal 0 [r srem myset qux]
+        assert_equal 1 [r srem myset foo]
+        assert_equal {bar ciao} [lsort [r smembers myset]]
+    }
+
+    test {SREM basics - intset} {
+        create_set myset {3 4 5}
+        assert_encoding intset myset
+        assert_equal 0 [r srem myset 6]
+        assert_equal 1 [r srem myset 4]
+        assert_equal {3 5} [lsort [r smembers myset]]
+    }
+
+    foreach {type} {hashtable intset} {
+        for {set i 1} {$i <= 5} {incr i} {
+            r del [format "set%d" $i]
+        }
+        for {set i 0} {$i < 200} {incr i} {
             r sadd set1 $i
-            r sadd set2 [expr $i+995]
+            r sadd set2 [expr $i+195]
+        }
+        foreach i {199 195 1000 2000} {
+            r sadd set3 $i
         }
-        lsort [r sinter set1 set2]
-    } {995 996 997 998 999}
+        for {set i 5} {$i < 200} {incr i} {
+            r sadd set4 $i
+        }
+        r sadd set5 0
 
-    test {SUNION with two sets} {
-        lsort [r sunion set1 set2]
-    } [lsort -uniq "[r smembers set1] [r smembers set2]"]
+        # it is possible that a hashtable encoded only contains integers,
+        # because it is converted from an intset to a hashtable when a
+        # non-integer element is added and then removed.
+        if {$type eq "hashtable"} {
+            for {set i 1} {$i <= 5} {incr i} {
+                r sadd [format "set%d" $i] foo
+                r srem [format "set%d" $i] foo
+            }
+        }
 
-    test {SINTERSTORE with two sets} {
-        r sinterstore setres set1 set2
-        lsort [r smembers setres]
-    } {995 996 997 998 999}
+        test "Generated sets must be encoded as $type" {
+            for {set i 1} {$i <= 5} {incr i} {
+                assert_encoding $type [format "set%d" $i]
+            }
+        }
 
-    test {SINTERSTORE with two sets, after a DEBUG RELOAD} {
-        r debug reload
-        r sinterstore setres set1 set2
-        lsort [r smembers setres]
-    } {995 996 997 998 999}
+        test "SINTER with two sets - $type" {
+            assert_equal {195 196 197 198 199} [lsort [r sinter set1 set2]]
+        }
+
+        test "SINTERSTORE with two sets - $type" {
+            r sinterstore setres set1 set2
+            assert_encoding intset setres
+            assert_equal {195 196 197 198 199} [lsort [r smembers setres]]
+        }
+
+        test "SINTERSTORE with two sets, after a DEBUG RELOAD - $type" {
+            r debug reload
+            r sinterstore setres set1 set2
+            assert_encoding intset setres
+            assert_equal {195 196 197 198 199} [lsort [r smembers setres]]
+        }
 
-    test {SUNIONSTORE with two sets} {
-        r sunionstore setres set1 set2
-        lsort [r smembers setres]
-    } [lsort -uniq "[r smembers set1] [r smembers set2]"]
+        test "SUNION with two sets - $type" {
+            set expected [lsort -uniq "[r smembers set1] [r smembers set2]"]
+            assert_equal $expected [lsort [r sunion set1 set2]]
+        }
 
-    test {SUNIONSTORE against non existing keys} {
+        test "SUNIONSTORE with two sets - $type" {
+            r sunionstore setres set1 set2
+            assert_encoding intset setres
+            set expected [lsort -uniq "[r smembers set1] [r smembers set2]"]
+            assert_equal $expected [lsort [r smembers setres]]
+        }
+
+        test "SINTER against three sets - $type" {
+            assert_equal {195 199} [lsort [r sinter set1 set2 set3]]
+        }
+
+        test "SINTERSTORE with three sets - $type" {
+            r sinterstore setres set1 set2 set3
+            assert_equal {195 199} [r smembers setres]
+        }
+
+        test "SUNION with non existing keys - $type" {
+            set expected [lsort -uniq "[r smembers set1] [r smembers set2]"]
+            assert_equal $expected [lsort [r sunion nokey1 set1 set2 nokey2]]
+        }
+
+        test "SDIFF with two sets - $type" {
+            assert_equal {0 1 2 3 4} [lsort [r sdiff set1 set4]]
+        }
+
+        test "SDIFF with three sets - $type" {
+            assert_equal {1 2 3 4} [lsort [r sdiff set1 set4 set5]]
+        }
+
+        test "SDIFFSTORE with three sets - $type" {
+            r sdiffstore setres set1 set4 set5
+            assert_encoding intset setres
+            assert_equal {1 2 3 4} [lsort [r smembers setres]]
+        }
+    }
+
+    test "SINTER against non-set should throw error" {
+        r set key1 x
+        assert_error "ERR*wrong kind*" {r sinter key1 noset}
+    }
+
+    test "SUNION against non-set should throw error" {
+        r set key1 x
+        assert_error "ERR*wrong kind*" {r sunion key1 noset}
+    }
+
+    test "SINTERSTORE against non existing keys should delete dstkey" {
         r set setres xxx
-        list [r sunionstore setres foo111 bar222] [r exists xxx]
-    } {0 0}
-
-    test {SINTER against three sets} {
-        r sadd set3 999
-        r sadd set3 995
-        r sadd set3 1000
-        r sadd set3 2000
-        lsort [r sinter set1 set2 set3]
-    } {995 999}
-
-    test {SINTERSTORE with three sets} {
-        r sinterstore setres set1 set2 set3
-        lsort [r smembers setres]
-    } {995 999}
-
-    test {SUNION with non existing keys} {
-        lsort [r sunion nokey1 set1 set2 nokey2]
-    } [lsort -uniq "[r smembers set1] [r smembers set2]"]
-
-    test {SDIFF with two sets} {
-        for {set i 5} {$i < 1000} {incr i} {
-            r sadd set4 $i
+        assert_equal 0 [r sinterstore setres foo111 bar222]
+        assert_equal 0 [r exists setres]
+    }
+
+    test "SUNIONSTORE against non existing keys should delete dstkey" {
+        r set setres xxx
+        assert_equal 0 [r sunionstore setres foo111 bar222]
+        assert_equal 0 [r exists setres]
+    }
+
+    foreach {type contents} {hashtable {a b c} intset {1 2 3}} {
+        test "SPOP basics - $type" {
+            create_set myset $contents
+            assert_encoding $type myset
+            assert_equal $contents [lsort [list [r spop myset] [r spop myset] [r spop myset]]]
+            assert_equal 0 [r scard myset]
         }
-        lsort [r sdiff set1 set4]
-    } {0 1 2 3 4}
 
-    test {SDIFF with three sets} {
-        r sadd set5 0
-        lsort [r sdiff set1 set4 set5]
-    } {1 2 3 4}
+        test "SRANDMEMBER - $type" {
+            create_set myset $contents
+            unset -nocomplain myset
+            array set myset {}
+            for {set i 0} {$i < 100} {incr i} {
+                set myset([r srandmember myset]) 1
+            }
+            assert_equal $contents [lsort [array names myset]]
+        }
+    }
 
-    test {SDIFFSTORE with three sets} {
-        r sdiffstore sres set1 set4 set5
-        lsort [r smembers sres]
-    } {1 2 3 4}
+    proc setup_move {} {
+        r del myset3 myset4
+        create_set myset1 {1 a b}
+        create_set myset2 {2 3 4}
+        assert_encoding hashtable myset1
+        assert_encoding intset myset2
+    }
 
-    test {SPOP basics} {
-        r del myset
-        r sadd myset 1
-        r sadd myset 2
-        r sadd myset 3
-        list [lsort [list [r spop myset] [r spop myset] [r spop myset]]] [r scard myset]
-    } {{1 2 3} 0}
-    
-    test {SRANDMEMBER} {
-        r del myset
-        r sadd myset a
-        r sadd myset b
-        r sadd myset c
-        unset -nocomplain myset
-        array set myset {}
-        for {set i 0} {$i < 100} {incr i} {
-            set myset([r srandmember myset]) 1
-        }
-        lsort [array names myset]
-    } {a b c}
-    
-    test {SMOVE basics} {
-        r sadd myset1 a
-        r sadd myset1 b
-        r sadd myset1 c
-        r sadd myset2 x
-        r sadd myset2 y
-        r sadd myset2 z
-        r smove myset1 myset2 a
-        list [lsort [r smembers myset2]] [lsort [r smembers myset1]]
-    } {{a x y z} {b c}}
-
-    test {SMOVE non existing key} {
-        list [r smove myset1 myset2 foo] [lsort [r smembers myset2]] [lsort [r smembers myset1]]
-    } {0 {a x y z} {b c}}
-
-    test {SMOVE non existing src set} {
-        list [r smove noset myset2 foo] [lsort [r smembers myset2]]
-    } {0 {a x y z}}
-
-    test {SMOVE non existing dst set} {
-        list [r smove myset2 myset3 y] [lsort [r smembers myset2]] [lsort [r smembers myset3]]
-    } {1 {a x z} y}
-
-    test {SMOVE wrong src key type} {
+    test "SMOVE basics - from regular set to intset" {
+        # move a non-integer element to an intset should convert encoding
+        setup_move
+        assert_equal 1 [r smove myset1 myset2 a]
+        assert_equal {1 b} [lsort [r smembers myset1]]
+        assert_equal {2 3 4 a} [lsort [r smembers myset2]]
+        assert_encoding hashtable myset2
+
+        # move an integer element should not convert the encoding
+        setup_move
+        assert_equal 1 [r smove myset1 myset2 1]
+        assert_equal {a b} [lsort [r smembers myset1]]
+        assert_equal {1 2 3 4} [lsort [r smembers myset2]]
+        assert_encoding intset myset2
+    }
+
+    test "SMOVE basics - from intset to regular set" {
+        setup_move
+        assert_equal 1 [r smove myset2 myset1 2]
+        assert_equal {1 2 a b} [lsort [r smembers myset1]]
+        assert_equal {3 4} [lsort [r smembers myset2]]
+    }
+
+    test "SMOVE non existing key" {
+        setup_move
+        assert_equal 0 [r smove myset1 myset2 foo]
+        assert_equal {1 a b} [lsort [r smembers myset1]]
+        assert_equal {2 3 4} [lsort [r smembers myset2]]
+    }
+
+    test "SMOVE non existing src set" {
+        setup_move
+        assert_equal 0 [r smove noset myset2 foo]
+        assert_equal {2 3 4} [lsort [r smembers myset2]]
+    }
+
+    test "SMOVE from regular set to non existing destination set" {
+        setup_move
+        assert_equal 1 [r smove myset1 myset3 a]
+        assert_equal {1 b} [lsort [r smembers myset1]]
+        assert_equal {a} [lsort [r smembers myset3]]
+        assert_encoding hashtable myset3
+    }
+
+    test "SMOVE from intset to non existing destination set" {
+        setup_move
+        assert_equal 1 [r smove myset2 myset3 2]
+        assert_equal {3 4} [lsort [r smembers myset2]]
+        assert_equal {2} [lsort [r smembers myset3]]
+        assert_encoding intset myset3
+    }
+
+    test "SMOVE wrong src key type" {
         r set x 10
-        catch {r smove x myset2 foo} err
-        format $err
-    } {ERR*}
+        assert_error "ERR*wrong kind*" {r smove x myset2 foo}
+    }
 
-    test {SMOVE wrong dst key type} {
+    test "SMOVE wrong dst key type" {
         r set x 10
-        catch {r smove myset2 x foo} err
-        format $err
-    } {ERR*}
+        assert_error "ERR*wrong kind*" {r smove myset2 x foo}
+    }
 }