From: Pieter Noordhuis Date: Fri, 20 Aug 2010 10:40:29 +0000 (+0200) Subject: Merge branch 'master' into intset-split X-Git-Url: https://git.saurik.com/redis.git/commitdiff_plain/aaada3f962a9f87fb239e55e3d26c1e794d411d5?ds=sidebyside;hp=-c Merge branch 'master' into intset-split Conflicts: src/Makefile src/t_set.c --- aaada3f962a9f87fb239e55e3d26c1e794d411d5 diff --combined src/Makefile index fb343e80,0af70f17..5fe3971e --- a/src/Makefile +++ b/src/Makefile @@@ -15,7 -15,11 +15,11 @@@ endi CCOPT= $(CFLAGS) $(CCLINK) $(ARCH) $(PROF) DEBUG?= -g -rdynamic -ggdb + INSTALL_TOP= /usr/local + INSTALL_BIN= $(INSTALL_TOP)/bin + INSTALL= cp -p + -OBJ = adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o vm.o pubsub.o multi.o debug.o sort.o +OBJ = adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o vm.o pubsub.o multi.o debug.o sort.o intset.o BENCHOBJ = ae.o anet.o redis-benchmark.o sds.o adlist.o zmalloc.o CLIOBJ = anet.o sds.o adlist.o redis-cli.o zmalloc.o linenoise.o CHECKDUMPOBJ = redis-check-dump.o lzf_c.o lzf_d.o @@@ -54,7 -58,6 +58,7 @@@ sds.o: sds.c sds.h zmalloc. sha1.o: sha1.c sha1.h ziplist.o: ziplist.c zmalloc.h ziplist.h zipmap.o: zipmap.c zmalloc.h +intset.o: intset.c zmalloc.h zmalloc.o: zmalloc.c config.h redis-server: $(OBJ) @@@ -110,3 -113,10 +114,10 @@@ noopt 32bitgprof: make PROF="-pg" ARCH="-arch i386" + + install: all + $(INSTALL) $(PRGNAME) $(INSTALL_BIN) + $(INSTALL) $(BENCHPRGNAME) $(INSTALL_BIN) + $(INSTALL) $(CLIPRGNAME) $(INSTALL_BIN) + $(INSTALL) $(CHECKDUMPPRGNAME) $(INSTALL_BIN) + $(INSTALL) $(CHECKAOFPRGNAME) $(INSTALL_BIN) diff --combined src/aof.c index a2e732d2,f8b92d2d..dc806969 --- a/src/aof.c +++ b/src/aof.c @@@ -194,6 -194,7 +194,7 @@@ struct redisClient *createFakeClient(vo * so that Redis will not try to send replies to this client. */ c->replstate = REDIS_REPL_WAIT_BGSAVE_START; c->reply = listCreate(); + c->watched_keys = listCreate(); listSetFreeMethod(c->reply,decrRefCount); listSetDupMethod(c->reply,dupClientReplyValue); initClientMultiState(c); @@@ -203,6 -204,7 +204,7 @@@ void freeFakeClient(struct redisClient *c) { sdsfree(c->querybuf); listRelease(c->reply); + listRelease(c->watched_keys); freeClientMultiState(c); zfree(c); } @@@ -461,30 -463,20 +463,30 @@@ int rewriteAppendOnlyFile(char *filenam redisPanic("Unknown list encoding"); } } else if (o->type == REDIS_SET) { - /* Emit the SADDs needed to rebuild the set */ - dict *set = o->ptr; - dictIterator *di = dictGetIterator(set); - dictEntry *de; - - while((de = dictNext(di)) != NULL) { - char cmd[]="*3\r\n$4\r\nSADD\r\n"; - robj *eleobj = dictGetEntryKey(de); + char cmd[]="*3\r\n$4\r\nSADD\r\n"; - if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; - if (fwriteBulkObject(fp,&key) == 0) goto werr; - if (fwriteBulkObject(fp,eleobj) == 0) goto werr; + /* Emit the SADDs needed to rebuild the set */ + if (o->encoding == REDIS_ENCODING_INTSET) { + int ii = 0; + long long llval; + while(intsetGet(o->ptr,ii++,&llval)) { + if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; + if (fwriteBulkObject(fp,&key) == 0) goto werr; + if (fwriteBulkLongLong(fp,llval) == 0) goto werr; + } + } else if (o->encoding == REDIS_ENCODING_HT) { + dictIterator *di = dictGetIterator(o->ptr); + dictEntry *de; + while((de = dictNext(di)) != NULL) { + robj *eleobj = dictGetEntryKey(de); + if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; + if (fwriteBulkObject(fp,&key) == 0) goto werr; + if (fwriteBulkObject(fp,eleobj) == 0) goto werr; + } + dictReleaseIterator(di); + } else { + redisPanic("Unknown set encoding"); } - dictReleaseIterator(di); } else if (o->type == REDIS_ZSET) { /* Emit the ZADDs needed to rebuild the sorted set */ zset *zs = o->ptr; diff --combined src/object.c index 16c4d74c,21268340..45dde52b --- a/src/object.c +++ b/src/object.c @@@ -1,5 -1,6 +1,6 @@@ #include "redis.h" #include + #include robj *createObject(int type, void *ptr) { robj *o; @@@ -11,8 -12,7 +12,7 @@@ listDelNode(server.objfreelist,head); if (server.vm_enabled) pthread_mutex_unlock(&server.obj_freelist_mutex); } else { - if (server.vm_enabled) - pthread_mutex_unlock(&server.obj_freelist_mutex); + if (server.vm_enabled) pthread_mutex_unlock(&server.obj_freelist_mutex); o = zmalloc(sizeof(*o)); } o->type = type; @@@ -36,7 -36,8 +36,8 @@@ robj *createStringObject(char *ptr, siz robj *createStringObjectFromLongLong(long long value) { robj *o; - if (value >= 0 && value < REDIS_SHARED_INTEGERS) { + if (value >= 0 && value < REDIS_SHARED_INTEGERS && + pthread_equal(pthread_self(),server.mainthread)) { incrRefCount(shared.integers[value]); o = shared.integers[value]; } else { @@@ -73,16 -74,7 +74,16 @@@ robj *createZiplistObject(void) robj *createSetObject(void) { dict *d = dictCreate(&setDictType,NULL); - return createObject(REDIS_SET,d); + robj *o = createObject(REDIS_SET,d); + o->encoding = REDIS_ENCODING_HT; + return o; +} + +robj *createIntsetObject(void) { + intset *is = intsetNew(); + robj *o = createObject(REDIS_SET,is); + o->encoding = REDIS_ENCODING_INTSET; + return o; } robj *createHashObject(void) { @@@ -123,16 -115,7 +124,16 @@@ void freeListObject(robj *o) } void freeSetObject(robj *o) { - dictRelease((dict*) o->ptr); + switch (o->encoding) { + case REDIS_ENCODING_HT: + dictRelease((dict*) o->ptr); + break; + case REDIS_ENCODING_INTSET: + zfree(o->ptr); + break; + default: + redisPanic("Unknown set encoding type"); + } } void freeZsetObject(robj *o) { @@@ -197,6 -180,7 +198,7 @@@ void decrRefCount(void *obj) case REDIS_HASH: freeHashObject(o); break; default: redisPanic("Unknown object type"); break; } + o->ptr = NULL; /* defensive programming. We'll see NULL in traces. */ if (server.vm_enabled) pthread_mutex_lock(&server.obj_freelist_mutex); if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX || !listAddNodeHead(server.objfreelist,o)) @@@ -232,8 -216,15 +234,15 @@@ robj *tryObjectEncoding(robj *o) /* Check if we can represent this string as a long integer */ if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return o; - /* Ok, this object can be encoded */ - if (value >= 0 && value < REDIS_SHARED_INTEGERS) { + /* Ok, this object can be encoded... + * + * Can I use a shared object? Only if the object is inside a given + * range and if this is the main thread, since when VM is enabled we + * have the constraint that I/O thread should only handle non-shared + * objects, in order to avoid race conditions (we don't have per-object + * locking). */ + if (value >= 0 && value < REDIS_SHARED_INTEGERS && + pthread_equal(pthread_self(),server.mainthread)) { decrRefCount(o); incrRefCount(shared.integers[value]); return shared.integers[value]; @@@ -329,7 -320,7 +338,7 @@@ int getDoubleFromObject(robj *o, doubl redisAssert(o->type == REDIS_STRING); if (o->encoding == REDIS_ENCODING_RAW) { value = strtod(o->ptr, &eptr); - if (eptr[0] != '\0') return REDIS_ERR; + if (eptr[0] != '\0' || isnan(value)) return REDIS_ERR; } else if (o->encoding == REDIS_ENCODING_INT) { value = (long)o->ptr; } else { @@@ -374,7 -365,7 +383,7 @@@ int getLongLongFromObject(robj *o, lon } } - *target = value; + if (target) *target = value; return REDIS_OK; } @@@ -418,7 -409,6 +427,7 @@@ char *strEncoding(int encoding) case REDIS_ENCODING_ZIPMAP: return "zipmap"; case REDIS_ENCODING_LINKEDLIST: return "linkedlist"; case REDIS_ENCODING_ZIPLIST: return "ziplist"; + case REDIS_ENCODING_INTSET: return "intset"; default: return "unknown"; } } diff --combined src/redis.c index 7809d057,1a581a92..7b2ed42e --- a/src/redis.c +++ b/src/redis.c @@@ -74,6 -74,7 +74,7 @@@ struct redisCommand readonlyCommandTabl {"setex",setexCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,0,0,0}, {"append",appendCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,1,1,1}, {"substr",substrCommand,4,REDIS_CMD_INLINE,NULL,1,1,1}, + {"strlen",strlenCommand,2,REDIS_CMD_INLINE,NULL,1,1,1}, {"del",delCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0}, {"exists",existsCommand,2,REDIS_CMD_INLINE,NULL,1,1,1}, {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,NULL,1,1,1}, @@@ -169,6 -170,7 +170,7 @@@ {"info",infoCommand,1,REDIS_CMD_INLINE,NULL,0,0,0}, {"monitor",monitorCommand,1,REDIS_CMD_INLINE,NULL,0,0,0}, {"ttl",ttlCommand,2,REDIS_CMD_INLINE,NULL,1,1,1}, + {"persist",persistCommand,2,REDIS_CMD_INLINE,NULL,1,1,1}, {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE,NULL,0,0,0}, {"debug",debugCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0}, {"config",configCommand,-2,REDIS_CMD_BULK,NULL,0,0,0}, @@@ -186,23 -188,22 +188,22 @@@ void redisLog(int level, const char *fmt, ...) { va_list ap; FILE *fp; + char *c = ".-*#"; + char buf[64]; + time_t now; + + if (level < server.verbosity) return; fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a"); if (!fp) return; va_start(ap, fmt); - if (level >= server.verbosity) { - char *c = ".-*#"; - char buf[64]; - time_t now; - - now = time(NULL); - strftime(buf,64,"%d %b %H:%M:%S",localtime(&now)); - fprintf(fp,"[%d] %s %c ",(int)getpid(),buf,c[level]); - vfprintf(fp, fmt, ap); - fprintf(fp,"\n"); - fflush(fp); - } + now = time(NULL); + strftime(buf,64,"%d %b %H:%M:%S",localtime(&now)); + fprintf(fp,"[%d] %s %c ",(int)getpid(),buf,c[level]); + vfprintf(fp, fmt, ap); + fprintf(fp,"\n"); + fflush(fp); va_end(ap); if (server.logfile) fclose(fp); @@@ -435,6 -436,48 +436,48 @@@ void updateDictResizePolicy(void) /* ======================= Cron: called every 100 ms ======================== */ + /* Try to expire a few timed out keys. The algorithm used is adaptive and + * will use few CPU cycles if there are few expiring keys, otherwise + * it will get more aggressive to avoid that too much memory is used by + * keys that can be removed from the keyspace. */ + void activeExpireCycle(void) { + int j; + + for (j = 0; j < server.dbnum; j++) { + int expired; + redisDb *db = server.db+j; + + /* Continue to expire if at the end of the cycle more than 25% + * of the keys were expired. */ + do { + long num = dictSize(db->expires); + time_t now = time(NULL); + + expired = 0; + if (num > REDIS_EXPIRELOOKUPS_PER_CRON) + num = REDIS_EXPIRELOOKUPS_PER_CRON; + while (num--) { + dictEntry *de; + time_t t; + + if ((de = dictGetRandomKey(db->expires)) == NULL) break; + t = (time_t) dictGetEntryVal(de); + if (now > t) { + sds key = dictGetEntryKey(de); + robj *keyobj = createStringObject(key,sdslen(key)); + + propagateExpire(db,keyobj); + dbDelete(db,keyobj); + decrRefCount(keyobj); + expired++; + server.stat_expiredkeys++; + } + } + } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4); + } + } + + int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { int j, loops = server.cronloops++; REDIS_NOTUSED(eventLoop); @@@ -533,41 -576,10 +576,10 @@@ } } - /* Try to expire a few timed out keys. The algorithm used is adaptive and - * will use few CPU cycles if there are few expiring keys, otherwise - * it will get more aggressive to avoid that too much memory is used by - * keys that can be removed from the keyspace. */ - for (j = 0; j < server.dbnum; j++) { - int expired; - redisDb *db = server.db+j; - - /* Continue to expire if at the end of the cycle more than 25% - * of the keys were expired. */ - do { - long num = dictSize(db->expires); - time_t now = time(NULL); - - expired = 0; - if (num > REDIS_EXPIRELOOKUPS_PER_CRON) - num = REDIS_EXPIRELOOKUPS_PER_CRON; - while (num--) { - dictEntry *de; - time_t t; - - if ((de = dictGetRandomKey(db->expires)) == NULL) break; - t = (time_t) dictGetEntryVal(de); - if (now > t) { - sds key = dictGetEntryKey(de); - robj *keyobj = createStringObject(key,sdslen(key)); - - dbDelete(db,keyobj); - decrRefCount(keyobj); - expired++; - server.stat_expiredkeys++; - } - } - } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4); - } + /* Expire a few keys per cycle, only if this is a master. + * On slaves we wait for DEL operations synthesized by the master + * in order to guarantee a strict consistency. */ + if (server.masterhost == NULL) activeExpireCycle(); /* Swap a few keys on disk if we are over the memory limit and VM * is enbled. Try to free objects from the free list first. */ @@@ -731,7 -743,6 +743,7 @@@ void initServerConfig() server.hash_max_zipmap_value = REDIS_HASH_MAX_ZIPMAP_VALUE; server.list_max_ziplist_entries = REDIS_LIST_MAX_ZIPLIST_ENTRIES; server.list_max_ziplist_value = REDIS_LIST_MAX_ZIPLIST_VALUE; + server.set_max_intset_entries = REDIS_SET_MAX_INTSET_ENTRIES; server.shutdown_asap = 0; resetServerSaveParams(); @@@ -761,6 -772,7 +773,7 @@@ void initServer() signal(SIGPIPE, SIG_IGN); setupSigSegvAction(); + server.mainthread = pthread_self(); server.devnull = fopen("/dev/null","w"); if (server.devnull == NULL) { redisLog(REDIS_WARNING, "Can't open /dev/null: %s", server.neterr); @@@ -827,7 -839,7 +840,7 @@@ int qsortRedisCommands(const void *r1, void sortCommandTable() { /* Copy and sort the read-only version of the command table */ - commandTable = (struct redisCommand*)malloc(sizeof(readonlyCommandTable)); + commandTable = (struct redisCommand*)zmalloc(sizeof(readonlyCommandTable)); memcpy(commandTable,readonlyCommandTable,sizeof(readonlyCommandTable)); qsort(commandTable, sizeof(readonlyCommandTable)/sizeof(struct redisCommand), diff --combined src/redis.h index 463db704,781fb209..288c9069 --- a/src/redis.h +++ b/src/redis.h @@@ -16,6 -16,7 +16,7 @@@ #include #include #include + #include #include "ae.h" /* Event driven programming library */ #include "sds.h" /* Dynamic safe strings */ @@@ -25,7 -26,6 +26,7 @@@ #include "anet.h" /* Networking the easy way */ #include "zipmap.h" /* Compact string -> string data structure */ #include "ziplist.h" /* Compact list data structure */ +#include "intset.h" /* Compact integer set structure */ #include "version.h" /* Error codes */ @@@ -82,7 -82,6 +83,7 @@@ #define REDIS_ENCODING_ZIPMAP 3 /* Encoded as zipmap */ #define REDIS_ENCODING_LINKEDLIST 4 /* Encoded as regular linked list */ #define REDIS_ENCODING_ZIPLIST 5 /* Encoded as ziplist */ +#define REDIS_ENCODING_INTSET 6 /* Encoded as intset */ /* Object types only used for dumping to disk */ #define REDIS_EXPIRETIME 253 @@@ -189,7 -188,6 +190,7 @@@ #define REDIS_HASH_MAX_ZIPMAP_VALUE 512 #define REDIS_LIST_MAX_ZIPLIST_ENTRIES 1024 #define REDIS_LIST_MAX_ZIPLIST_VALUE 32 +#define REDIS_SET_MAX_INTSET_ENTRIES 4096 /* Sets operations codes */ #define REDIS_OP_UNION 0 @@@ -329,6 -327,7 +330,7 @@@ struct sharedObjectsStruct /* Global server state structure */ struct redisServer { + pthread_t mainthread; int port; int fd; redisDb *db; @@@ -399,7 -398,6 +401,7 @@@ size_t hash_max_zipmap_value; size_t list_max_ziplist_entries; size_t list_max_ziplist_value; + size_t set_max_intset_entries; /* Virtual memory state */ FILE *vm_fp; int vm_fd; @@@ -537,14 -535,6 +539,14 @@@ typedef struct listNode *ln; /* Entry in linked list */ } listTypeEntry; +/* Structure to hold set iteration abstraction. */ +typedef struct { + robj *subject; + int encoding; + int ii; /* intset iterator */ + dictIterator *di; +} setIterator; + /* Structure to hold hash iteration abstration. Note that iteration over * hashes involves both fields and values. Because it is possible that * not both are required, store pointers in the iterator to avoid @@@ -643,7 -633,6 +645,7 @@@ robj *createStringObjectFromLongLong(lo robj *createListObject(void); robj *createZiplistObject(void); robj *createSetObject(void); +robj *createIntsetObject(void); robj *createHashObject(void); robj *createZsetObject(void); int getLongFromObjectOrReply(redisClient *c, robj *o, long *target, const char *msg); @@@ -727,18 -716,6 +729,18 @@@ int dontWaitForSwappedKey(redisClient * void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key); vmpointer *vmSwapObjectBlocking(robj *val); +/* Set data type */ +robj *setTypeCreate(robj *value); +int setTypeAdd(robj *subject, robj *value); +int setTypeRemove(robj *subject, robj *value); +int setTypeIsMember(robj *subject, robj *value); +setIterator *setTypeInitIterator(robj *subject); +void setTypeReleaseIterator(setIterator *si); +robj *setTypeNext(setIterator *si); +robj *setTypeRandomElement(robj *subject); +unsigned long setTypeSize(robj *subject); +void setTypeConvert(robj *subject, int enc); + /* Hash data type */ void convertToRealHash(robj *o); void hashTypeTryConversion(robj *subject, robj **argv, int start, int end); @@@ -775,10 -752,10 +777,10 @@@ void resetServerSaveParams() /* db.c -- Keyspace access API */ int removeExpire(redisDb *db, robj *key); + void propagateExpire(redisDb *db, robj *key); int expireIfNeeded(redisDb *db, robj *key); - int deleteIfVolatile(redisDb *db, robj *key); time_t getExpire(redisDb *db, robj *key); - int setExpire(redisDb *db, robj *key, time_t when); + void setExpire(redisDb *db, robj *key, time_t when); robj *lookupKey(redisDb *db, robj *key); robj *lookupKeyRead(redisDb *db, robj *key); robj *lookupKeyWrite(redisDb *db, robj *key); @@@ -861,6 -838,7 +863,7 @@@ void expireCommand(redisClient *c) void expireatCommand(redisClient *c); void getsetCommand(redisClient *c); void ttlCommand(redisClient *c); + void persistCommand(redisClient *c); void slaveofCommand(redisClient *c); void debugCommand(redisClient *c); void msetCommand(redisClient *c); @@@ -882,6 -860,7 +885,7 @@@ void blpopCommand(redisClient *c) void brpopCommand(redisClient *c); void appendCommand(redisClient *c); void substrCommand(redisClient *c); + void strlenCommand(redisClient *c); void zrankCommand(redisClient *c); void zrevrankCommand(redisClient *c); void hsetCommand(redisClient *c); @@@ -908,4 -887,11 +912,11 @@@ void publishCommand(redisClient *c) void watchCommand(redisClient *c); void unwatchCommand(redisClient *c); + #if defined(__GNUC__) + void *calloc(size_t count, size_t size) __attribute__ ((deprecated)); + void free(void *ptr) __attribute__ ((deprecated)); + void *malloc(size_t size) __attribute__ ((deprecated)); + void *realloc(void *ptr, size_t size) __attribute__ ((deprecated)); + #endif + #endif diff --combined src/t_set.c index 3fbf13a3,94b97633..bcb8dd3f --- a/src/t_set.c +++ b/src/t_set.c @@@ -4,182 -4,12 +4,182 @@@ * Set Commands *----------------------------------------------------------------------------*/ +/* Factory method to return a set that *can* hold "value". When the object has + * an integer-encodable value, an intset will be returned. Otherwise a regular + * hash table. */ +robj *setTypeCreate(robj *value) { + if (getLongLongFromObject(value,NULL) == REDIS_OK) + return createIntsetObject(); + return createSetObject(); +} + +int setTypeAdd(robj *subject, robj *value) { + long long llval; + if (subject->encoding == REDIS_ENCODING_HT) { + if (dictAdd(subject->ptr,value,NULL) == DICT_OK) { + incrRefCount(value); + return 1; + } + } else if (subject->encoding == REDIS_ENCODING_INTSET) { + if (getLongLongFromObject(value,&llval) == REDIS_OK) { + uint8_t success = 0; + subject->ptr = intsetAdd(subject->ptr,llval,&success); + if (success) { + /* Convert to regular set when the intset contains + * too many entries. */ + if (intsetLen(subject->ptr) > server.set_max_intset_entries) + setTypeConvert(subject,REDIS_ENCODING_HT); + return 1; + } + } else { + /* Failed to get integer from object, convert to regular set. */ + setTypeConvert(subject,REDIS_ENCODING_HT); + + /* The set *was* an intset and this value is not integer + * encodable, so dictAdd should always work. */ + redisAssert(dictAdd(subject->ptr,value,NULL) == DICT_OK); + incrRefCount(value); + return 1; + } + } else { + redisPanic("Unknown set encoding"); + } + return 0; +} + +int setTypeRemove(robj *subject, robj *value) { + long long llval; + if (subject->encoding == REDIS_ENCODING_HT) { + if (dictDelete(subject->ptr,value) == DICT_OK) { + if (htNeedsResize(subject->ptr)) dictResize(subject->ptr); + return 1; + } + } else if (subject->encoding == REDIS_ENCODING_INTSET) { + if (getLongLongFromObject(value,&llval) == REDIS_OK) { + uint8_t success; + subject->ptr = intsetRemove(subject->ptr,llval,&success); + if (success) return 1; + } + } else { + redisPanic("Unknown set encoding"); + } + return 0; +} + +int setTypeIsMember(robj *subject, robj *value) { + long long llval; + if (subject->encoding == REDIS_ENCODING_HT) { + return dictFind((dict*)subject->ptr,value) != NULL; + } else if (subject->encoding == REDIS_ENCODING_INTSET) { + if (getLongLongFromObject(value,&llval) == REDIS_OK) { + return intsetFind((intset*)subject->ptr,llval); + } + } else { + redisPanic("Unknown set encoding"); + } + return 0; +} + +setIterator *setTypeInitIterator(robj *subject) { + setIterator *si = zmalloc(sizeof(setIterator)); + si->subject = subject; + si->encoding = subject->encoding; + if (si->encoding == REDIS_ENCODING_HT) { + si->di = dictGetIterator(subject->ptr); + } else if (si->encoding == REDIS_ENCODING_INTSET) { + si->ii = 0; + } else { + redisPanic("Unknown set encoding"); + } + return si; +} + +void setTypeReleaseIterator(setIterator *si) { + if (si->encoding == REDIS_ENCODING_HT) + dictReleaseIterator(si->di); + zfree(si); +} + +/* Move to the next entry in the set. Returns the object at the current + * position, or NULL when the end is reached. This object will have its + * refcount incremented, so the caller needs to take care of this. */ +robj *setTypeNext(setIterator *si) { + robj *ret = NULL; + if (si->encoding == REDIS_ENCODING_HT) { + dictEntry *de = dictNext(si->di); + if (de != NULL) { + ret = dictGetEntryKey(de); + incrRefCount(ret); + } + } else if (si->encoding == REDIS_ENCODING_INTSET) { + long long llval; + if (intsetGet(si->subject->ptr,si->ii++,&llval)) + ret = createStringObjectFromLongLong(llval); + } + return ret; +} + + +/* Return random element from set. The returned object will always have + * an incremented refcount. */ +robj *setTypeRandomElement(robj *subject) { + robj *ret = NULL; + if (subject->encoding == REDIS_ENCODING_HT) { + dictEntry *de = dictGetRandomKey(subject->ptr); + ret = dictGetEntryKey(de); + incrRefCount(ret); + } else if (subject->encoding == REDIS_ENCODING_INTSET) { + long long llval = intsetRandom(subject->ptr); + ret = createStringObjectFromLongLong(llval); + } else { + redisPanic("Unknown set encoding"); + } + return ret; +} + +unsigned long setTypeSize(robj *subject) { + if (subject->encoding == REDIS_ENCODING_HT) { + return dictSize((dict*)subject->ptr); + } else if (subject->encoding == REDIS_ENCODING_INTSET) { + return intsetLen((intset*)subject->ptr); + } else { + redisPanic("Unknown set encoding"); + } +} + +/* Convert the set to specified encoding. The resulting dict (when converting + * to a hashtable) is presized to hold the number of elements in the original + * set. */ +void setTypeConvert(robj *subject, int enc) { + setIterator *si; + robj *element; + redisAssert(subject->type == REDIS_SET); + + if (enc == REDIS_ENCODING_HT) { + dict *d = dictCreate(&setDictType,NULL); + /* Presize the dict to avoid rehashing */ + dictExpand(d,intsetLen(subject->ptr)); + + /* setTypeGet returns a robj with incremented refcount */ + si = setTypeInitIterator(subject); + while ((element = setTypeNext(si)) != NULL) + redisAssert(dictAdd(d,element,NULL) == DICT_OK); + setTypeReleaseIterator(si); + + subject->encoding = REDIS_ENCODING_HT; + zfree(subject->ptr); + subject->ptr = d; + } else { + redisPanic("Unsupported set conversion"); + } +} + void saddCommand(redisClient *c) { robj *set; set = lookupKeyWrite(c->db,c->argv[1]); if (set == NULL) { - set = createSetObject(); + set = setTypeCreate(c->argv[2]); dbAdd(c->db,c->argv[1],set); } else { if (set->type != REDIS_SET) { @@@ -187,7 -17,9 +187,8 @@@ return; } } - if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) { - incrRefCount(c->argv[2]); + if (setTypeAdd(set,c->argv[2])) { + touchWatchedKey(c->db,c->argv[1]); server.dirty++; addReply(c,shared.cone); } else { @@@ -201,9 -33,11 +202,10 @@@ void sremCommand(redisClient *c) if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,set,REDIS_SET)) return; - if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) { - server.dirty++; + if (setTypeRemove(set,c->argv[2])) { + if (setTypeSize(set) == 0) dbDelete(c->db,c->argv[1]); + touchWatchedKey(c->db,c->argv[1]); - if (htNeedsResize(set->ptr)) dictResize(set->ptr); - if (dictSize((dict*)set->ptr) == 0) dbDelete(c->db,c->argv[1]); + server.dirty++; addReply(c,shared.cone); } else { addReply(c,shared.czero); @@@ -211,46 -45,40 +213,48 @@@ } void smoveCommand(redisClient *c) { - robj *srcset, *dstset; - + robj *srcset, *dstset, *ele; srcset = lookupKeyWrite(c->db,c->argv[1]); dstset = lookupKeyWrite(c->db,c->argv[2]); + ele = c->argv[3]; - /* If the source key does not exist return 0, if it's of the wrong type - * raise an error */ - if (srcset == NULL || srcset->type != REDIS_SET) { - addReply(c, srcset ? shared.wrongtypeerr : shared.czero); + /* If the source key does not exist return 0 */ + if (srcset == NULL) { + addReply(c,shared.czero); return; } - /* Error if the destination key is not a set as well */ - if (dstset && dstset->type != REDIS_SET) { - addReply(c,shared.wrongtypeerr); + + /* If the source key has the wrong type, or the destination key + * is set and has the wrong type, return with an error. */ + if (checkType(c,srcset,REDIS_SET) || + (dstset && checkType(c,dstset,REDIS_SET))) return; + + /* If srcset and dstset are equal, SMOVE is a no-op */ + if (srcset == dstset) { + addReply(c,shared.cone); return; } - /* Remove the element from the source set */ - if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) { - /* Key not found in the src set! return zero */ + + /* If the element cannot be removed from the src set, return 0. */ + if (!setTypeRemove(srcset,ele)) { addReply(c,shared.czero); return; } - if (dictSize((dict*)srcset->ptr) == 0 && srcset != dstset) - dbDelete(c->db,c->argv[1]); + + /* Remove the src set from the database when empty */ + if (setTypeSize(srcset) == 0) dbDelete(c->db,c->argv[1]); + touchWatchedKey(c->db,c->argv[1]); + touchWatchedKey(c->db,c->argv[2]); server.dirty++; - /* Add the element to the destination set */ + + /* Create the destination set when it doesn't exist */ if (!dstset) { - dstset = createSetObject(); + dstset = setTypeCreate(ele); dbAdd(c->db,c->argv[2],dstset); } - if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK) - incrRefCount(c->argv[3]); + + /* An extra key has changed when ele was successfully added to dstset */ + if (setTypeAdd(dstset,ele)) server.dirty++; addReply(c,shared.cone); } @@@ -260,7 -88,7 +264,7 @@@ void sismemberCommand(redisClient *c) if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,set,REDIS_SET)) return; - if (dictFind(set->ptr,c->argv[2])) + if (setTypeIsMember(set,c->argv[2])) addReply(c,shared.cone); else addReply(c,shared.czero); @@@ -268,80 -96,96 +272,83 @@@ void scardCommand(redisClient *c) { robj *o; - dict *s; if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,o,REDIS_SET)) return; - s = o->ptr; - addReplyUlong(c,dictSize(s)); + addReplyUlong(c,setTypeSize(o)); } void spopCommand(redisClient *c) { - robj *set; - dictEntry *de; + robj *set, *ele; if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL || checkType(c,set,REDIS_SET)) return; - de = dictGetRandomKey(set->ptr); - if (de == NULL) { + ele = setTypeRandomElement(set); + if (ele == NULL) { addReply(c,shared.nullbulk); } else { - robj *ele = dictGetEntryKey(de); - + setTypeRemove(set,ele); addReplyBulk(c,ele); - dictDelete(set->ptr,ele); - if (htNeedsResize(set->ptr)) dictResize(set->ptr); - if (dictSize((dict*)set->ptr) == 0) dbDelete(c->db,c->argv[1]); + decrRefCount(ele); + if (setTypeSize(set) == 0) dbDelete(c->db,c->argv[1]); + touchWatchedKey(c->db,c->argv[1]); server.dirty++; } } void srandmemberCommand(redisClient *c) { - robj *set; - dictEntry *de; + robj *set, *ele; if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL || checkType(c,set,REDIS_SET)) return; - de = dictGetRandomKey(set->ptr); - if (de == NULL) { + ele = setTypeRandomElement(set); + if (ele == NULL) { addReply(c,shared.nullbulk); } else { - robj *ele = dictGetEntryKey(de); - addReplyBulk(c,ele); + decrRefCount(ele); } } int qsortCompareSetsByCardinality(const void *s1, const void *s2) { - dict **d1 = (void*) s1, **d2 = (void*) s2; - - return dictSize(*d1)-dictSize(*d2); + return setTypeSize(*(robj**)s1)-setTypeSize(*(robj**)s2); } -void sinterGenericCommand(redisClient *c, robj **setskeys, unsigned long setsnum, robj *dstkey) { - dict **dv = zmalloc(sizeof(dict*)*setsnum); - dictIterator *di; - dictEntry *de; - robj *lenobj = NULL, *dstset = NULL; +void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum, robj *dstkey) { + robj **sets = zmalloc(sizeof(robj*)*setnum); + setIterator *si; + robj *ele, *lenobj = NULL, *dstset = NULL; unsigned long j, cardinality = 0; - for (j = 0; j < setsnum; j++) { - robj *setobj; - - setobj = dstkey ? - lookupKeyWrite(c->db,setskeys[j]) : - lookupKeyRead(c->db,setskeys[j]); + for (j = 0; j < setnum; j++) { + robj *setobj = dstkey ? + lookupKeyWrite(c->db,setkeys[j]) : + lookupKeyRead(c->db,setkeys[j]); if (!setobj) { - zfree(dv); + zfree(sets); if (dstkey) { - if (dbDelete(c->db,dstkey)) + if (dbDelete(c->db,dstkey)) { + touchWatchedKey(c->db,dstkey); server.dirty++; + } addReply(c,shared.czero); } else { addReply(c,shared.emptymultibulk); } return; } - if (setobj->type != REDIS_SET) { - zfree(dv); - addReply(c,shared.wrongtypeerr); + if (checkType(c,setobj,REDIS_SET)) { + zfree(sets); return; } - dv[j] = setobj->ptr; + sets[j] = setobj; } /* Sort sets from the smallest to largest, this will improve our * algorithm's performace */ - qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality); + qsort(sets,setnum,sizeof(robj*),qsortCompareSetsByCardinality); /* The first thing we should output is the total number of elements... * since this is a multi-bulk write, but at this stage we don't know @@@ -355,46 -199,49 +362,47 @@@ } else { /* If we have a target key where to store the resulting set * create this key with an empty set inside */ - dstset = createSetObject(); + dstset = createIntsetObject(); } /* Iterate all the elements of the first (smallest) set, and test * the element against all the other sets, if at least one set does * not include the element it is discarded */ - di = dictGetIterator(dv[0]); - - while((de = dictNext(di)) != NULL) { - robj *ele; - - for (j = 1; j < setsnum; j++) - if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break; - if (j != setsnum) - continue; /* at least one set does not contain the member */ - ele = dictGetEntryKey(de); - if (!dstkey) { - addReplyBulk(c,ele); - cardinality++; - } else { - dictAdd(dstset->ptr,ele,NULL); - incrRefCount(ele); + si = setTypeInitIterator(sets[0]); + while((ele = setTypeNext(si)) != NULL) { + for (j = 1; j < setnum; j++) + if (!setTypeIsMember(sets[j],ele)) break; + + /* Only take action when all sets contain the member */ + if (j == setnum) { + if (!dstkey) { + addReplyBulk(c,ele); + cardinality++; + } else { + setTypeAdd(dstset,ele); + } } + decrRefCount(ele); } - dictReleaseIterator(di); + setTypeReleaseIterator(si); if (dstkey) { /* Store the resulting set into the target, if the intersection * is not an empty set. */ dbDelete(c->db,dstkey); - if (dictSize((dict*)dstset->ptr) > 0) { + if (setTypeSize(dstset) > 0) { dbAdd(c->db,dstkey,dstset); - addReplyLongLong(c,dictSize((dict*)dstset->ptr)); + addReplyLongLong(c,setTypeSize(dstset)); } else { decrRefCount(dstset); addReply(c,shared.czero); } + touchWatchedKey(c->db,dstkey); server.dirty++; } else { lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",cardinality); } - zfree(dv); + zfree(sets); } void sinterCommand(redisClient *c) { @@@ -405,85 -252,93 +413,86 @@@ void sinterstoreCommand(redisClient *c sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]); } -void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) { - dict **dv = zmalloc(sizeof(dict*)*setsnum); - dictIterator *di; - dictEntry *de; - robj *dstset = NULL; - int j, cardinality = 0; +#define REDIS_OP_UNION 0 +#define REDIS_OP_DIFF 1 +#define REDIS_OP_INTER 2 - for (j = 0; j < setsnum; j++) { - robj *setobj; +void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum, robj *dstkey, int op) { + robj **sets = zmalloc(sizeof(robj*)*setnum); + setIterator *si; + robj *ele, *dstset = NULL; + int j, cardinality = 0; - setobj = dstkey ? - lookupKeyWrite(c->db,setskeys[j]) : - lookupKeyRead(c->db,setskeys[j]); + for (j = 0; j < setnum; j++) { + robj *setobj = dstkey ? + lookupKeyWrite(c->db,setkeys[j]) : + lookupKeyRead(c->db,setkeys[j]); if (!setobj) { - dv[j] = NULL; + sets[j] = NULL; continue; } - if (setobj->type != REDIS_SET) { - zfree(dv); - addReply(c,shared.wrongtypeerr); + if (checkType(c,setobj,REDIS_SET)) { + zfree(sets); return; } - dv[j] = setobj->ptr; + sets[j] = setobj; } /* We need a temp set object to store our union. If the dstkey * is not NULL (that is, we are inside an SUNIONSTORE operation) then * this set object will be the resulting object to set into the target key*/ - dstset = createSetObject(); + dstset = createIntsetObject(); /* Iterate all the elements of all the sets, add every element a single * time to the result set */ - for (j = 0; j < setsnum; j++) { - if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */ - if (!dv[j]) continue; /* non existing keys are like empty sets */ - - di = dictGetIterator(dv[j]); + for (j = 0; j < setnum; j++) { + if (op == REDIS_OP_DIFF && j == 0 && !sets[j]) break; /* result set is empty */ + if (!sets[j]) continue; /* non existing keys are like empty sets */ - while((de = dictNext(di)) != NULL) { - robj *ele; - - /* dictAdd will not add the same element multiple times */ - ele = dictGetEntryKey(de); + si = setTypeInitIterator(sets[j]); + while((ele = setTypeNext(si)) != NULL) { if (op == REDIS_OP_UNION || j == 0) { - if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) { - incrRefCount(ele); + if (setTypeAdd(dstset,ele)) { cardinality++; } } else if (op == REDIS_OP_DIFF) { - if (dictDelete(dstset->ptr,ele) == DICT_OK) { + if (setTypeRemove(dstset,ele)) { cardinality--; } } + decrRefCount(ele); } - dictReleaseIterator(di); + setTypeReleaseIterator(si); - /* result set is empty? Exit asap. */ + /* Exit when result set is empty. */ if (op == REDIS_OP_DIFF && cardinality == 0) break; } /* Output the content of the resulting set, if not in STORE mode */ if (!dstkey) { addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality)); - di = dictGetIterator(dstset->ptr); - while((de = dictNext(di)) != NULL) { - robj *ele; - - ele = dictGetEntryKey(de); + si = setTypeInitIterator(dstset); + while((ele = setTypeNext(si)) != NULL) { addReplyBulk(c,ele); + decrRefCount(ele); } - dictReleaseIterator(di); + setTypeReleaseIterator(si); decrRefCount(dstset); } else { /* If we have a target key where to store the resulting set * create this key with the result set inside */ dbDelete(c->db,dstkey); - if (dictSize((dict*)dstset->ptr) > 0) { + if (setTypeSize(dstset) > 0) { dbAdd(c->db,dstkey,dstset); - addReplyLongLong(c,dictSize((dict*)dstset->ptr)); + addReplyLongLong(c,setTypeSize(dstset)); } else { decrRefCount(dstset); addReply(c,shared.czero); } + touchWatchedKey(c->db,dstkey); server.dirty++; } - zfree(dv); + zfree(sets); } void sunionCommand(redisClient *c) {