X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/ca3f830b32a0a8303a5a761d6212925d9b5ac365..35cabcb50557eae1433a68c7730ad8aff478b68a:/redis.c diff --git a/redis.c b/redis.c index 33db9a1a..cb7bca8e 100644 --- a/redis.c +++ b/redis.c @@ -75,6 +75,7 @@ #include "lzf.h" /* LZF compression library */ #include "pqsort.h" /* Partial qsort for SORT+LIMIT */ #include "zipmap.h" /* Compact dictionary-alike data structure */ +#include "ziplist.h" /* Compact list data structure */ #include "sha1.h" /* SHA1 is used for DEBUG DIGEST */ #include "release.h" /* Release and/or git repository information */ @@ -120,17 +121,20 @@ #define REDIS_SET 2 #define REDIS_ZSET 3 #define REDIS_HASH 4 +#define REDIS_VMPOINTER 8 /* Objects encoding. Some kind of objects like Strings and Hashes can be * internally represented in multiple ways. The 'encoding' field of the object * is set to one of this fields for this object. */ -#define REDIS_ENCODING_RAW 0 /* Raw representation */ -#define REDIS_ENCODING_INT 1 /* Encoded as integer */ -#define REDIS_ENCODING_ZIPMAP 2 /* Encoded as zipmap */ -#define REDIS_ENCODING_HT 3 /* Encoded as an hash table */ +#define REDIS_ENCODING_RAW 0 /* Raw representation */ +#define REDIS_ENCODING_INT 1 /* Encoded as integer */ +#define REDIS_ENCODING_HT 2 /* Encoded as hash table */ +#define REDIS_ENCODING_ZIPMAP 3 /* Encoded as zipmap */ +#define REDIS_ENCODING_LIST 4 /* Encoded as zipmap */ +#define REDIS_ENCODING_ZIPLIST 5 /* Encoded as ziplist */ static char* strencoding[] = { - "raw", "int", "zipmap", "hashtable" + "raw", "int", "hashtable", "zipmap", "list", "ziplist" }; /* Object types only used for dumping to disk */ @@ -233,9 +237,11 @@ static char* strencoding[] = { #define APPENDFSYNC_ALWAYS 1 #define APPENDFSYNC_EVERYSEC 2 -/* Hashes related defaults */ +/* Zip structure related defaults */ #define REDIS_HASH_MAX_ZIPMAP_ENTRIES 64 #define REDIS_HASH_MAX_ZIPMAP_VALUE 512 +#define REDIS_LIST_MAX_ZIPLIST_ENTRIES 1024 +#define REDIS_LIST_MAX_ZIPLIST_VALUE 32 /* We can print the stacktrace, so our assert is defined this way: */ #define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e,__FILE__,__LINE__),_exit(1))) @@ -247,30 +253,41 @@ static void _redisPanic(char *msg, char *file, int line); /* A redis object, that is a type able to hold a string / list / set */ -/* The VM object structure */ -struct redisObjectVM { - off_t page; /* the page at witch the object is stored on disk */ - off_t usedpages; /* number of pages used on disk */ - time_t atime; /* Last access time */ -} vm; - /* The actual Redis Object */ typedef struct redisObject { - void *ptr; - unsigned char type; - unsigned char encoding; - unsigned char storage; /* If this object is a key, where is the value? - * REDIS_VM_MEMORY, REDIS_VM_SWAPPED, ... */ - unsigned char vtype; /* If this object is a key, and value is swapped out, - * this is the type of the swapped out object. */ + unsigned type:4; + unsigned storage:2; /* REDIS_VM_MEMORY or REDIS_VM_SWAPPING */ + unsigned encoding:4; + unsigned lru:22; /* lru time (relative to server.lruclock) */ int refcount; + void *ptr; /* VM fields, this are only allocated if VM is active, otherwise the * object allocation function will just allocate * sizeof(redisObjct) minus sizeof(redisObjectVM), so using * Redis without VM active will not have any overhead. */ - struct redisObjectVM vm; } robj; +/* The VM pointer structure - identifies an object in the swap file. + * + * This object is stored in place of the value + * object in the main key->value hash table representing a database. + * Note that the first fields (type, storage) are the same as the redisObject + * structure so that vmPointer strucuters can be accessed even when casted + * as redisObject structures. + * + * This is useful as we don't know if a value object is or not on disk, but we + * are always able to read obj->storage to check this. For vmPointer + * structures "type" is set to REDIS_VMPOINTER (even if without this field + * is still possible to check the kind of object from the value of 'storage').*/ +typedef struct vmPointer { + unsigned type:4; + unsigned storage:2; /* REDIS_VM_SWAPPED or REDIS_VM_LOADING */ + unsigned notused:26; + unsigned int vtype; /* type of the object stored in the swap file */ + off_t page; /* the page at witch the object is stored on disk */ + off_t usedpages; /* number of pages used on disk */ +} vmpointer; + /* Macro used to initalize a Redis object allocated on the stack. * Note that this macro is taken near the structure definition to make sure * we'll update it when the structure is changed, to avoid bugs like @@ -280,7 +297,7 @@ typedef struct redisObject { _var.type = REDIS_STRING; \ _var.encoding = REDIS_ENCODING_RAW; \ _var.ptr = _ptr; \ - if (server.vm_enabled) _var.storage = REDIS_VM_MEMORY; \ + _var.storage = REDIS_VM_MEMORY; \ } while(0); typedef struct redisDb { @@ -369,6 +386,7 @@ struct redisServer { int daemonize; int appendonly; int appendfsync; + int no_appendfsync_on_rewrite; int shutdown_asap; time_t lastfsync; int appendfd; @@ -409,9 +427,11 @@ struct redisServer { off_t vm_page_size; off_t vm_pages; unsigned long long vm_max_memory; - /* Hashes config */ + /* Zip structure config */ size_t hash_max_zipmap_entries; size_t hash_max_zipmap_value; + size_t list_max_ziplist_entries; + size_t list_max_ziplist_value; /* Virtual memory state */ FILE *vm_fp; int vm_fd; @@ -449,6 +469,8 @@ struct redisServer { list *pubsub_patterns; /* A list of pubsub_patterns */ /* Misc */ FILE *devnull; + unsigned lruclock:22; /* clock incrementing every minute, for LRU */ + unsigned lruclock_padding:10; }; typedef struct pubsubPattern { @@ -541,6 +563,9 @@ typedef struct iojob { int type; /* Request type, REDIS_IOJOB_* */ redisDb *db;/* Redis database */ robj *key; /* This I/O request is about swapping this key */ + robj *id; /* Unique identifier of this job: + this is the object to swap for REDIS_IOREQ_*_SWAP, or the + vmpointer objct for REDIS_IOREQ_LOAD. */ robj *val; /* the value to swap for REDIS_IOREQ_*_SWAP, otherwise this * field is populated by the I/O thread for REDIS_IOREQ_LOAD. */ off_t page; /* Swap page where to read/write the object */ @@ -574,8 +599,7 @@ static robj *getDecodedObject(robj *o); static int removeExpire(redisDb *db, robj *key); static int expireIfNeeded(redisDb *db, robj *key); static int deleteIfVolatile(redisDb *db, robj *key); -static int deleteIfSwapped(redisDb *db, robj *key); -static int deleteKey(redisDb *db, robj *key); +static int dbDelete(redisDb *db, robj *key); static time_t getExpire(redisDb *db, robj *key); static int setExpire(redisDb *db, robj *key, time_t when); static void updateSlavesWaitingBgsave(int bgsaveerr); @@ -597,8 +621,8 @@ static void unblockClientWaitingData(redisClient *c); static int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele); static void vmInit(void); static void vmMarkPagesFree(off_t page, off_t count); -static robj *vmLoadObject(robj *key); -static robj *vmPreviewObject(robj *key); +static robj *vmLoadObject(robj *o); +static robj *vmPreviewObject(robj *o); static int vmSwapOneObjectBlocking(void); static int vmSwapOneObjectThreaded(void); static int vmCanSwapOut(void); @@ -626,6 +650,7 @@ static struct redisCommand *lookupCommand(char *name); static void call(redisClient *c, struct redisCommand *cmd); static void resetClient(redisClient *c); static void convertToRealHash(robj *o); +static void listTypeConvert(robj *o, int enc); static int pubsubUnsubscribeAllChannels(redisClient *c, int notify); static int pubsubUnsubscribeAllPatterns(redisClient *c, int notify); static void freePubsubPattern(void *p); @@ -634,7 +659,7 @@ static int compareStringObjects(robj *a, robj *b); static int equalStringObjects(robj *a, robj *b); static void usage(); static int rewriteAppendOnlyFileBackground(void); -static int vmSwapObjectBlocking(robj *key, robj *val); +static vmpointer *vmSwapObjectBlocking(robj *val); static int prepareForShutdown(); static void touchWatchedKey(redisDb *db, robj *key); static void touchWatchedKeysOnFlush(int dbid); @@ -752,7 +777,8 @@ static void unwatchCommand(redisClient *c); /* Global vars */ static struct redisServer server; /* server global state */ -static struct redisCommand cmdTable[] = { +static struct redisCommand *commandTable; +static struct redisCommand readonlyCommandTable[] = { {"get",getCommand,2,REDIS_CMD_INLINE,NULL,1,1,1}, {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,0,0,0}, {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,0,0,0}, @@ -860,8 +886,7 @@ static struct redisCommand cmdTable[] = { {"punsubscribe",punsubscribeCommand,-1,REDIS_CMD_INLINE,NULL,0,0,0}, {"publish",publishCommand,3,REDIS_CMD_BULK|REDIS_CMD_FORCE_REPLICATION,NULL,0,0,0}, {"watch",watchCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0}, - {"unwatch",unwatchCommand,1,REDIS_CMD_INLINE,NULL,0,0,0}, - {NULL,NULL,0,0,NULL,0,0,0} + {"unwatch",unwatchCommand,1,REDIS_CMD_INLINE,NULL,0,0,0} }; /*============================ Utility functions ============================ */ @@ -1107,7 +1132,7 @@ static void dictListDestructor(void *privdata, void *val) listRelease((list*)val); } -static int sdsDictKeyCompare(void *privdata, const void *key1, +static int dictSdsKeyCompare(void *privdata, const void *key1, const void *key2) { int l1,l2; @@ -1127,11 +1152,18 @@ static void dictRedisObjectDestructor(void *privdata, void *val) decrRefCount(val); } +static void dictSdsDestructor(void *privdata, void *val) +{ + DICT_NOTUSED(privdata); + + sdsfree(val); +} + static int dictObjKeyCompare(void *privdata, const void *key1, const void *key2) { const robj *o1 = key1, *o2 = key2; - return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr); + return dictSdsKeyCompare(privdata,o1->ptr,o2->ptr); } static unsigned int dictObjHash(const void *key) { @@ -1139,6 +1171,10 @@ static unsigned int dictObjHash(const void *key) { return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr)); } +static unsigned int dictSdsHash(const void *key) { + return dictGenHashFunction((unsigned char*)key, sdslen((char*)key)); +} + static int dictEncObjKeyCompare(void *privdata, const void *key1, const void *key2) { @@ -1151,7 +1187,7 @@ static int dictEncObjKeyCompare(void *privdata, const void *key1, o1 = getDecodedObject(o1); o2 = getDecodedObject(o2); - cmp = sdsDictKeyCompare(privdata,o1->ptr,o2->ptr); + cmp = dictSdsKeyCompare(privdata,o1->ptr,o2->ptr); decrRefCount(o1); decrRefCount(o2); return cmp; @@ -1180,7 +1216,7 @@ static unsigned int dictEncObjHash(const void *key) { } } -/* Sets type and expires */ +/* Sets type */ static dictType setDictType = { dictEncObjHash, /* hash function */ NULL, /* key dup */ @@ -1200,23 +1236,23 @@ static dictType zsetDictType = { dictVanillaFree /* val destructor of malloc(sizeof(double)) */ }; -/* Db->dict */ +/* Db->dict, keys are sds strings, vals are Redis objects. */ static dictType dbDictType = { - dictObjHash, /* hash function */ + dictSdsHash, /* hash function */ NULL, /* key dup */ NULL, /* val dup */ - dictObjKeyCompare, /* key compare */ - dictRedisObjectDestructor, /* key destructor */ + dictSdsKeyCompare, /* key compare */ + dictSdsDestructor, /* key destructor */ dictRedisObjectDestructor /* val destructor */ }; /* Db->expires */ static dictType keyptrDictType = { - dictObjHash, /* hash function */ + dictSdsHash, /* hash function */ NULL, /* key dup */ NULL, /* val dup */ - dictObjKeyCompare, /* key compare */ - dictRedisObjectDestructor, /* key destructor */ + dictSdsKeyCompare, /* key compare */ + dictSdsDestructor, /* key destructor */ NULL /* val destructor */ }; @@ -1385,7 +1421,7 @@ void backgroundRewriteDoneHandler(int statloc) { /* If append only is actually enabled... */ close(server.appendfd); server.appendfd = fd; - fsync(fd); + if (server.appendfsync != APPENDFSYNC_NO) aof_fsync(fd); server.appendseldb = -1; /* Make sure it will issue SELECT */ redisLog(REDIS_NOTICE,"The new append only file was selected for future appends."); } else { @@ -1431,6 +1467,19 @@ static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientD * in objects at every object access, and accuracy is not needed. * To access a global var is faster than calling time(NULL) */ server.unixtime = time(NULL); + /* We have just 21 bits per object for LRU information. + * So we use an (eventually wrapping) LRU clock with minutes resolution. + * + * When we need to select what object to swap, we compute the minimum + * time distance between the current lruclock and the object last access + * lruclock info. Even if clocks will wrap on overflow, there is + * the interesting property that we are sure that at least + * ABS(A-B) minutes passed between current time and timestamp B. + * + * This is not precise but we don't need at all precision, but just + * something statistically reasonable. + */ + server.lruclock = (time(NULL)/60)&((1<<21)-1); /* We received a SIGTERM, shutting down here in a safe way, as it is * not ok doing so inside the signal handler. */ @@ -1529,7 +1578,11 @@ static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientD if ((de = dictGetRandomKey(db->expires)) == NULL) break; t = (time_t) dictGetEntryVal(de); if (now > t) { - deleteKey(db,dictGetEntryKey(de)); + sds key = dictGetEntryKey(de); + robj *keyobj = createStringObject(key,sdslen(key)); + + dbDelete(db,keyobj); + decrRefCount(keyobj); expired++; server.stat_expiredkeys++; } @@ -1685,6 +1738,7 @@ static void initServerConfig() { server.daemonize = 0; server.appendonly = 0; server.appendfsync = APPENDFSYNC_EVERYSEC; + server.no_appendfsync_on_rewrite = 0; server.lastfsync = time(NULL); server.appendfd = -1; server.appendseldb = -1; /* Make sure the first time will not match */ @@ -1706,6 +1760,8 @@ static void initServerConfig() { server.vm_blocked_clients = 0; server.hash_max_zipmap_entries = REDIS_HASH_MAX_ZIPMAP_ENTRIES; server.hash_max_zipmap_value = REDIS_HASH_MAX_ZIPMAP_VALUE; + server.list_max_ziplist_entries = REDIS_LIST_MAX_ZIPLIST_ENTRIES; + server.list_max_ziplist_value = REDIS_LIST_MAX_ZIPLIST_VALUE; server.shutdown_asap = 0; resetServerSaveParams(); @@ -1941,6 +1997,11 @@ static void loadServerConfig(char *filename) { } else if (!strcasecmp(argv[0],"appendfilename") && argc == 2) { zfree(server.appendfilename); server.appendfilename = zstrdup(argv[1]); + } else if (!strcasecmp(argv[0],"no-appendfsync-on-rewrite") + && argc == 2) { + if ((server.no_appendfsync_on_rewrite= yesnotoi(argv[1])) == -1) { + err = "argument must be 'yes' or 'no'"; goto loaderr; + } } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) { if (!strcasecmp(argv[1],"no")) { server.appendfsync = APPENDFSYNC_NO; @@ -1979,6 +2040,10 @@ static void loadServerConfig(char *filename) { server.hash_max_zipmap_entries = memtoll(argv[1], NULL); } else if (!strcasecmp(argv[0],"hash-max-zipmap-value") && argc == 2){ server.hash_max_zipmap_value = memtoll(argv[1], NULL); + } else if (!strcasecmp(argv[0],"list-max-ziplist-entries") && argc == 2){ + server.list_max_ziplist_entries = memtoll(argv[1], NULL); + } else if (!strcasecmp(argv[0],"list-max-ziplist-value") && argc == 2){ + server.list_max_ziplist_value = memtoll(argv[1], NULL); } else { err = "Bad directive or wrong number of arguments"; goto loaderr; } @@ -2247,13 +2312,29 @@ static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int } } +static int qsortRedisCommands(const void *r1, const void *r2) { + return strcasecmp( + ((struct redisCommand*)r1)->name, + ((struct redisCommand*)r2)->name); +} + +static void sortCommandTable() { + /* Copy and sort the read-only version of the command table */ + commandTable = (struct redisCommand*)malloc(sizeof(readonlyCommandTable)); + memcpy(commandTable,readonlyCommandTable,sizeof(readonlyCommandTable)); + qsort(commandTable, + sizeof(readonlyCommandTable)/sizeof(struct redisCommand), + sizeof(struct redisCommand),qsortRedisCommands); +} + static struct redisCommand *lookupCommand(char *name) { - int j = 0; - while(cmdTable[j].name != NULL) { - if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j]; - j++; - } - return NULL; + struct redisCommand tmp = {name,NULL,0,0,NULL,0,0,0}; + return bsearch( + &tmp, + commandTable, + sizeof(readonlyCommandTable)/sizeof(struct redisCommand), + sizeof(struct redisCommand), + qsortRedisCommands); } /* resetClient prepare the client to process the next command */ @@ -2856,6 +2937,12 @@ static void addReplyBulk(redisClient *c, robj *obj) { addReply(c,shared.crlf); } +static void addReplyBulkSds(redisClient *c, sds s) { + robj *o = createStringObject(s, sdslen(s)); + addReplyBulk(c,o); + decrRefCount(o); +} + /* In the CONFIG command we need to add vanilla C string as bulk replies */ static void addReplyBulkCString(redisClient *c, char *s) { if (s == NULL) { @@ -2915,12 +3002,9 @@ static robj *createObject(int type, void *ptr) { listDelNode(server.objfreelist,head); if (server.vm_enabled) pthread_mutex_unlock(&server.obj_freelist_mutex); } else { - if (server.vm_enabled) { + if (server.vm_enabled) pthread_mutex_unlock(&server.obj_freelist_mutex); - o = zmalloc(sizeof(*o)); - } else { - o = zmalloc(sizeof(*o)-sizeof(struct redisObjectVM)); - } + o = zmalloc(sizeof(*o)); } o->type = type; o->encoding = REDIS_ENCODING_RAW; @@ -2928,10 +3012,10 @@ static robj *createObject(int type, void *ptr) { o->refcount = 1; if (server.vm_enabled) { /* Note that this code may run in the context of an I/O thread - * and accessing to server.unixtime in theory is an error + * and accessing server.lruclock in theory is an error * (no locks). But in practice this is safe, and even if we read - * garbage Redis will not fail, as it's just a statistical info */ - o->vm.atime = server.unixtime; + * garbage Redis will not fail. */ + o->lru = server.lruclock; o->storage = REDIS_VM_MEMORY; } return o; @@ -2965,14 +3049,24 @@ static robj *dupStringObject(robj *o) { static robj *createListObject(void) { list *l = listCreate(); - + robj *o = createObject(REDIS_LIST,l); listSetFreeMethod(l,decrRefCount); - return createObject(REDIS_LIST,l); + o->encoding = REDIS_ENCODING_LIST; + return o; +} + +static robj *createZiplistObject(void) { + unsigned char *zl = ziplistNew(); + robj *o = createObject(REDIS_LIST,zl); + o->encoding = REDIS_ENCODING_ZIPLIST; + return o; } static robj *createSetObject(void) { dict *d = dictCreate(&setDictType,NULL); - return createObject(REDIS_SET,d); + robj *o = createObject(REDIS_SET,d); + o->encoding = REDIS_ENCODING_HT; + return o; } static robj *createHashObject(void) { @@ -3000,7 +3094,16 @@ static void freeStringObject(robj *o) { } static void freeListObject(robj *o) { - listRelease((list*) o->ptr); + switch (o->encoding) { + case REDIS_ENCODING_LIST: + listRelease((list*) o->ptr); + break; + case REDIS_ENCODING_ZIPLIST: + zfree(o->ptr); + break; + default: + redisPanic("Unknown list encoding type"); + } } static void freeSetObject(robj *o) { @@ -3036,28 +3139,31 @@ static void incrRefCount(robj *o) { static void decrRefCount(void *obj) { robj *o = obj; - if (o->refcount <= 0) redisPanic("decrRefCount against refcount <= 0"); - /* Object is a key of a swapped out value, or in the process of being - * loaded. */ + /* Object is a swapped out value, or in the process of being loaded. */ if (server.vm_enabled && (o->storage == REDIS_VM_SWAPPED || o->storage == REDIS_VM_LOADING)) { - if (o->storage == REDIS_VM_LOADING) vmCancelThreadedIOJob(obj); - redisAssert(o->type == REDIS_STRING); - freeStringObject(o); - vmMarkPagesFree(o->vm.page,o->vm.usedpages); - pthread_mutex_lock(&server.obj_freelist_mutex); - if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX || - !listAddNodeHead(server.objfreelist,o)) - zfree(o); - pthread_mutex_unlock(&server.obj_freelist_mutex); + vmpointer *vp = obj; + if (o->storage == REDIS_VM_LOADING) vmCancelThreadedIOJob(o); + vmMarkPagesFree(vp->page,vp->usedpages); server.vm_stats_swapped_objects--; + zfree(vp); return; } - /* Object is in memory, or in the process of being swapped out. */ + + if (o->refcount <= 0) redisPanic("decrRefCount against refcount <= 0"); + /* Object is in memory, or in the process of being swapped out. + * + * If the object is being swapped out, abort the operation on + * decrRefCount even if the refcount does not drop to 0: the object + * is referenced at least two times, as value of the key AND as + * job->val in the iojob. So if we don't invalidate the iojob, when it is + * done but the relevant key was removed in the meantime, the + * complete jobs handler will not find the key about the job and the + * assert will fail. */ + if (server.vm_enabled && o->storage == REDIS_VM_SWAPPING) + vmCancelThreadedIOJob(o); if (--(o->refcount) == 0) { - if (server.vm_enabled && o->storage == REDIS_VM_SWAPPING) - vmCancelThreadedIOJob(obj); switch(o->type) { case REDIS_STRING: freeStringObject(o); break; case REDIS_LIST: freeListObject(o); break; @@ -3074,64 +3180,6 @@ static void decrRefCount(void *obj) { } } -static robj *lookupKey(redisDb *db, robj *key) { - dictEntry *de = dictFind(db->dict,key); - if (de) { - robj *key = dictGetEntryKey(de); - robj *val = dictGetEntryVal(de); - - if (server.vm_enabled) { - if (key->storage == REDIS_VM_MEMORY || - key->storage == REDIS_VM_SWAPPING) - { - /* If we were swapping the object out, stop it, this key - * was requested. */ - if (key->storage == REDIS_VM_SWAPPING) - vmCancelThreadedIOJob(key); - /* Update the access time of the key for the aging algorithm. */ - key->vm.atime = server.unixtime; - } else { - int notify = (key->storage == REDIS_VM_LOADING); - - /* Our value was swapped on disk. Bring it at home. */ - redisAssert(val == NULL); - val = vmLoadObject(key); - dictGetEntryVal(de) = val; - - /* Clients blocked by the VM subsystem may be waiting for - * this key... */ - if (notify) handleClientsBlockedOnSwappedKey(db,key); - } - } - return val; - } else { - return NULL; - } -} - -static robj *lookupKeyRead(redisDb *db, robj *key) { - expireIfNeeded(db,key); - return lookupKey(db,key); -} - -static robj *lookupKeyWrite(redisDb *db, robj *key) { - deleteIfVolatile(db,key); - touchWatchedKey(db,key); - return lookupKey(db,key); -} - -static robj *lookupKeyReadOrReply(redisClient *c, robj *key, robj *reply) { - robj *o = lookupKeyRead(c->db, key); - if (!o) addReply(c,reply); - return o; -} - -static robj *lookupKeyWriteOrReply(redisClient *c, robj *key, robj *reply) { - robj *o = lookupKeyWrite(c->db, key); - if (!o) addReply(c,reply); - return o; -} - static int checkType(redisClient *c, robj *o, int type) { if (o->type != type) { addReply(c,shared.wrongtypeerr); @@ -3140,21 +3188,6 @@ static int checkType(redisClient *c, robj *o, int type) { return 0; } -static int deleteKey(redisDb *db, robj *key) { - int retval; - - /* We need to protect key from destruction: after the first dictDelete() - * it may happen that 'key' is no longer valid if we don't increment - * it's count. This may happen when we get the object reference directly - * from the hash table with dictRandomKey() or dict iterators */ - incrRefCount(key); - if (dictSize(db->expires)) dictDelete(db->expires,key); - retval = dictDelete(db->dict,key); - decrRefCount(key); - - return retval == DICT_OK; -} - /* Check if the nul-terminated string 's' can be represented by a long * (that is, is a number that fits into long without any other space or * character before or after the digits). @@ -3374,6 +3407,134 @@ static int getLongFromObjectOrReply(redisClient *c, robj *o, long *target, const return REDIS_OK; } +/* =========================== Keyspace access API ========================== */ + +static robj *lookupKey(redisDb *db, robj *key) { + dictEntry *de = dictFind(db->dict,key->ptr); + if (de) { + robj *val = dictGetEntryVal(de); + + if (server.vm_enabled) { + if (val->storage == REDIS_VM_MEMORY || + val->storage == REDIS_VM_SWAPPING) + { + /* If we were swapping the object out, cancel the operation */ + if (val->storage == REDIS_VM_SWAPPING) + vmCancelThreadedIOJob(val); + /* Update the access time for the aging algorithm. */ + val->lru = server.lruclock; + } else { + int notify = (val->storage == REDIS_VM_LOADING); + + /* Our value was swapped on disk. Bring it at home. */ + redisAssert(val->type == REDIS_VMPOINTER); + val = vmLoadObject(val); + dictGetEntryVal(de) = val; + + /* Clients blocked by the VM subsystem may be waiting for + * this key... */ + if (notify) handleClientsBlockedOnSwappedKey(db,key); + } + } + return val; + } else { + return NULL; + } +} + +static robj *lookupKeyRead(redisDb *db, robj *key) { + expireIfNeeded(db,key); + return lookupKey(db,key); +} + +static robj *lookupKeyWrite(redisDb *db, robj *key) { + deleteIfVolatile(db,key); + touchWatchedKey(db,key); + return lookupKey(db,key); +} + +static robj *lookupKeyReadOrReply(redisClient *c, robj *key, robj *reply) { + robj *o = lookupKeyRead(c->db, key); + if (!o) addReply(c,reply); + return o; +} + +static robj *lookupKeyWriteOrReply(redisClient *c, robj *key, robj *reply) { + robj *o = lookupKeyWrite(c->db, key); + if (!o) addReply(c,reply); + return o; +} + +/* Add the key to the DB. If the key already exists REDIS_ERR is returned, + * otherwise REDIS_OK is returned, and the caller should increment the + * refcount of 'val'. */ +static int dbAdd(redisDb *db, robj *key, robj *val) { + /* Perform a lookup before adding the key, as we need to copy the + * key value. */ + if (dictFind(db->dict, key->ptr) != NULL) { + return REDIS_ERR; + } else { + sds copy = sdsdup(key->ptr); + dictAdd(db->dict, copy, val); + return REDIS_OK; + } +} + +/* If the key does not exist, this is just like dbAdd(). Otherwise + * the value associated to the key is replaced with the new one. + * + * On update (key already existed) 0 is returned. Otherwise 1. */ +static int dbReplace(redisDb *db, robj *key, robj *val) { + if (dictFind(db->dict,key->ptr) == NULL) { + sds copy = sdsdup(key->ptr); + dictAdd(db->dict, copy, val); + return 1; + } else { + dictReplace(db->dict, key->ptr, val); + return 0; + } +} + +static int dbExists(redisDb *db, robj *key) { + return dictFind(db->dict,key->ptr) != NULL; +} + +/* Return a random key, in form of a Redis object. + * If there are no keys, NULL is returned. + * + * The function makes sure to return keys not already expired. */ +static robj *dbRandomKey(redisDb *db) { + struct dictEntry *de; + + while(1) { + sds key; + robj *keyobj; + + de = dictGetRandomKey(db->dict); + if (de == NULL) return NULL; + + key = dictGetEntryKey(de); + keyobj = createStringObject(key,sdslen(key)); + if (dictFind(db->expires,key)) { + if (expireIfNeeded(db,keyobj)) { + decrRefCount(keyobj); + continue; /* search for another key. This expired. */ + } + } + return keyobj; + } +} + +/* Delete a key, value, and associated expiration entry if any, from the DB */ +static int dbDelete(redisDb *db, robj *key) { + int retval; + + if (dictSize(db->expires)) dictDelete(db->expires,key->ptr); + retval = dictDelete(db->dict,key->ptr); + + return retval == DICT_OK; +} + /*============================ RDB saving/loading =========================== */ static int rdbSaveType(FILE *fp, unsigned char type) { @@ -3516,38 +3677,32 @@ static int rdbSaveRawString(FILE *fp, unsigned char *s, size_t len) { return 0; } +/* Save a long long value as either an encoded string or a string. */ +static int rdbSaveLongLongAsStringObject(FILE *fp, long long value) { + unsigned char buf[32]; + int enclen = rdbEncodeInteger(value,buf); + if (enclen > 0) { + if (fwrite(buf,enclen,1,fp) == 0) return -1; + } else { + /* Encode as string */ + enclen = ll2string((char*)buf,32,value); + redisAssert(enclen < 32); + if (rdbSaveLen(fp,enclen) == -1) return -1; + if (fwrite(buf,enclen,1,fp) == 0) return -1; + } + return 0; +} + /* Like rdbSaveStringObjectRaw() but handle encoded objects */ static int rdbSaveStringObject(FILE *fp, robj *obj) { - int retval; - /* Avoid to decode the object, then encode it again, if the * object is alrady integer encoded. */ if (obj->encoding == REDIS_ENCODING_INT) { - long val = (long) obj->ptr; - unsigned char buf[5]; - int enclen; - - if ((enclen = rdbEncodeInteger(val,buf)) > 0) { - if (fwrite(buf,enclen,1,fp) == 0) return -1; - return 0; - } - /* otherwise... fall throught and continue with the usual - * code path. */ - } - - /* Avoid incr/decr ref count business when possible. - * This plays well with copy-on-write given that we are probably - * in a child process (BGSAVE). Also this makes sure key objects - * of swapped objects are not incRefCount-ed (an assert does not allow - * this in order to avoid bugs) */ - if (obj->encoding != REDIS_ENCODING_RAW) { - obj = getDecodedObject(obj); - retval = rdbSaveRawString(fp,obj->ptr,sdslen(obj->ptr)); - decrRefCount(obj); + return rdbSaveLongLongAsStringObject(fp,(long)obj->ptr); } else { - retval = rdbSaveRawString(fp,obj->ptr,sdslen(obj->ptr)); + redisAssert(obj->encoding == REDIS_ENCODING_RAW); + return rdbSaveRawString(fp,obj->ptr,sdslen(obj->ptr)); } - return retval; } /* Save a double value. Doubles are saved as strings prefixed by an unsigned @@ -3600,16 +3755,37 @@ static int rdbSaveObject(FILE *fp, robj *o) { if (rdbSaveStringObject(fp,o) == -1) return -1; } else if (o->type == REDIS_LIST) { /* Save a list value */ - list *list = o->ptr; - listIter li; - listNode *ln; - - if (rdbSaveLen(fp,listLength(list)) == -1) return -1; - listRewind(list,&li); - while((ln = listNext(&li))) { - robj *eleobj = listNodeValue(ln); + if (o->encoding == REDIS_ENCODING_ZIPLIST) { + unsigned char *p; + unsigned char *vstr; + unsigned int vlen; + long long vlong; + + if (rdbSaveLen(fp,ziplistLen(o->ptr)) == -1) return -1; + p = ziplistIndex(o->ptr,0); + while(ziplistGet(p,&vstr,&vlen,&vlong)) { + if (vstr) { + if (rdbSaveRawString(fp,vstr,vlen) == -1) + return -1; + } else { + if (rdbSaveLongLongAsStringObject(fp,vlong) == -1) + return -1; + } + p = ziplistNext(o->ptr,p); + } + } else if (o->encoding == REDIS_ENCODING_LIST) { + list *list = o->ptr; + listIter li; + listNode *ln; - if (rdbSaveStringObject(fp,eleobj) == -1) return -1; + if (rdbSaveLen(fp,listLength(list)) == -1) return -1; + listRewind(list,&li); + while((ln = listNext(&li))) { + robj *eleobj = listNodeValue(ln); + if (rdbSaveStringObject(fp,eleobj) == -1) return -1; + } + } else { + redisPanic("Unknown list encoding"); } } else if (o->type == REDIS_SET) { /* Save a set value */ @@ -3728,9 +3904,12 @@ static int rdbSave(char *filename) { /* Iterate this DB writing every entry */ while((de = dictNext(di)) != NULL) { - robj *key = dictGetEntryKey(de); - robj *o = dictGetEntryVal(de); - time_t expiretime = getExpire(db,key); + sds keystr = dictGetEntryKey(de); + robj key, *o = dictGetEntryVal(de); + time_t expiretime; + + initStaticStringObject(key,keystr); + expiretime = getExpire(db,&key); /* Save the expire time */ if (expiretime != -1) { @@ -3741,20 +3920,20 @@ static int rdbSave(char *filename) { } /* Save the key and associated value. This requires special * handling if the value is swapped out. */ - if (!server.vm_enabled || key->storage == REDIS_VM_MEMORY || - key->storage == REDIS_VM_SWAPPING) { + if (!server.vm_enabled || o->storage == REDIS_VM_MEMORY || + o->storage == REDIS_VM_SWAPPING) { /* Save type, key, value */ if (rdbSaveType(fp,o->type) == -1) goto werr; - if (rdbSaveStringObject(fp,key) == -1) goto werr; + if (rdbSaveStringObject(fp,&key) == -1) goto werr; if (rdbSaveObject(fp,o) == -1) goto werr; } else { /* REDIS_VM_SWAPPED or REDIS_VM_LOADING */ robj *po; /* Get a preview of the object in memory */ - po = vmPreviewObject(key); + po = vmPreviewObject(o); /* Save type, key, value */ - if (rdbSaveType(fp,key->vtype) == -1) goto werr; - if (rdbSaveStringObject(fp,key) == -1) goto werr; + if (rdbSaveType(fp,po->type) == -1) goto werr; + if (rdbSaveStringObject(fp,&key) == -1) goto werr; if (rdbSaveObject(fp,po) == -1) goto werr; /* Remove the loaded object from memory */ decrRefCount(po); @@ -3976,34 +4155,59 @@ static int rdbLoadDoubleValue(FILE *fp, double *val) { /* Load a Redis object of the specified type from the specified file. * On success a newly allocated object is returned, otherwise NULL. */ static robj *rdbLoadObject(int type, FILE *fp) { - robj *o; + robj *o, *ele, *dec; + size_t len; redisLog(REDIS_DEBUG,"LOADING OBJECT %d (at %d)\n",type,ftell(fp)); if (type == REDIS_STRING) { /* Read string value */ if ((o = rdbLoadEncodedStringObject(fp)) == NULL) return NULL; o = tryObjectEncoding(o); - } else if (type == REDIS_LIST || type == REDIS_SET) { - /* Read list/set value */ - uint32_t listlen; + } else if (type == REDIS_LIST) { + /* Read list value */ + if ((len = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL; + + /* Use a real list when there are too many entries */ + if (len > server.list_max_ziplist_entries) { + o = createListObject(); + } else { + o = createZiplistObject(); + } - if ((listlen = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL; - o = (type == REDIS_LIST) ? createListObject() : createSetObject(); + /* Load every single element of the list */ + while(len--) { + if ((ele = rdbLoadEncodedStringObject(fp)) == NULL) return NULL; + + /* If we are using a ziplist and the value is too big, convert + * the object to a real list. */ + if (o->encoding == REDIS_ENCODING_ZIPLIST && + ele->encoding == REDIS_ENCODING_RAW && + sdslen(ele->ptr) > server.list_max_ziplist_value) + listTypeConvert(o,REDIS_ENCODING_LIST); + + if (o->encoding == REDIS_ENCODING_ZIPLIST) { + dec = getDecodedObject(ele); + o->ptr = ziplistPush(o->ptr,dec->ptr,sdslen(dec->ptr),REDIS_TAIL); + decrRefCount(dec); + decrRefCount(ele); + } else { + ele = tryObjectEncoding(ele); + listAddNodeTail(o->ptr,ele); + } + } + } else if (type == REDIS_SET) { + /* Read list/set value */ + if ((len = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL; + o = createSetObject(); /* It's faster to expand the dict to the right size asap in order * to avoid rehashing */ - if (type == REDIS_SET && listlen > DICT_HT_INITIAL_SIZE) - dictExpand(o->ptr,listlen); + if (len > DICT_HT_INITIAL_SIZE) + dictExpand(o->ptr,len); /* Load every single element of the list/set */ - while(listlen--) { - robj *ele; - + while(len--) { if ((ele = rdbLoadEncodedStringObject(fp)) == NULL) return NULL; ele = tryObjectEncoding(ele); - if (type == REDIS_LIST) { - listAddNodeTail((list*)o->ptr,ele); - } else { - dictAdd((dict*)o->ptr,ele,NULL); - } + dictAdd((dict*)o->ptr,ele,NULL); } } else if (type == REDIS_ZSET) { /* Read list/set value */ @@ -4038,8 +4242,8 @@ static robj *rdbLoadObject(int type, FILE *fp) { while(hashlen--) { robj *key, *val; - if ((key = rdbLoadStringObject(fp)) == NULL) return NULL; - if ((val = rdbLoadStringObject(fp)) == NULL) return NULL; + if ((key = rdbLoadEncodedStringObject(fp)) == NULL) return NULL; + if ((val = rdbLoadEncodedStringObject(fp)) == NULL) return NULL; /* If we are using a zipmap and there are too big values * the object is converted to real hash table encoding. */ if (o->encoding != REDIS_ENCODING_HT && @@ -4074,11 +4278,9 @@ static int rdbLoad(char *filename) { uint32_t dbid; int type, retval, rdbver; int swap_all_values = 0; - dict *d = server.db[0].dict; redisDb *db = server.db+0; char buf[1024]; time_t expiretime, now = time(NULL); - long long loadedkeys = 0; fp = fopen(filename,"r"); if (!fp) return REDIS_ERR; @@ -4097,6 +4299,7 @@ static int rdbLoad(char *filename) { } while(1) { robj *key, *val; + int force_swapout; expiretime = -1; /* Read type. */ @@ -4116,7 +4319,6 @@ static int rdbLoad(char *filename) { exit(1); } db = server.db+dbid; - d = db->dict; continue; } /* Read key */ @@ -4130,12 +4332,11 @@ static int rdbLoad(char *filename) { continue; } /* Add the new object in the hash table */ - retval = dictAdd(d,key,val); - if (retval == DICT_ERR) { + retval = dbAdd(db,key,val); + if (retval == REDIS_ERR) { redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", key->ptr); exit(1); } - loadedkeys++; /* Set the expire time if needed */ if (expiretime != -1) setExpire(db,key,expiretime); @@ -4147,23 +4348,30 @@ static int rdbLoad(char *filename) { * to random sampling, otherwise we may try to swap already * swapped keys. */ if (swap_all_values) { - dictEntry *de = dictFind(d,key); + dictEntry *de = dictFind(db->dict,key->ptr); /* de may be NULL since the key already expired */ if (de) { - key = dictGetEntryKey(de); + vmpointer *vp; val = dictGetEntryVal(de); - if (vmSwapObjectBlocking(key,val) == REDIS_OK) { - dictGetEntryVal(de) = NULL; - } + if (val->refcount == 1 && + (vp = vmSwapObjectBlocking(val)) != NULL) + dictGetEntryVal(de) = vp; } + decrRefCount(key); continue; } + decrRefCount(key); + + /* Flush data on disk once 32 MB of additional RAM are used... */ + force_swapout = 0; + if ((zmalloc_used_memory() - server.vm_max_memory) > 1024*1024*32) + force_swapout = 1; /* If we have still some hope of having some value fitting memory * then we try random sampling. */ - if (!swap_all_values && server.vm_enabled && (loadedkeys % 5000) == 0) { + if (!swap_all_values && server.vm_enabled && force_swapout) { while (zmalloc_used_memory() > server.vm_max_memory) { if (vmSwapOneObjectBlocking() == REDIS_ERR) break; } @@ -4193,7 +4401,7 @@ static int prepareForShutdown() { } if (server.appendonly) { /* Append only file: fsync() the AOF and exit */ - fsync(server.appendfd); + aof_fsync(server.appendfd); if (server.vm_enabled) unlink(server.vm_swap_file); } else { /* Snapshotting. Perform a SYNC SAVE and exit */ @@ -4252,23 +4460,16 @@ static void setGenericCommand(redisClient *c, int nx, robj *key, robj *val, robj touchWatchedKey(c->db,key); if (nx) deleteIfVolatile(c->db,key); - retval = dictAdd(c->db->dict,key,val); - if (retval == DICT_ERR) { + retval = dbAdd(c->db,key,val); + if (retval == REDIS_ERR) { if (!nx) { - /* If the key is about a swapped value, we want a new key object - * to overwrite the old. So we delete the old key in the database. - * This will also make sure that swap pages about the old object - * will be marked as free. */ - if (server.vm_enabled && deleteIfSwapped(c->db,key)) - incrRefCount(key); - dictReplace(c->db->dict,key,val); + dbReplace(c->db,key,val); incrRefCount(val); } else { addReply(c,shared.czero); return; } } else { - incrRefCount(key); incrRefCount(val); } server.dirty++; @@ -4310,11 +4511,7 @@ static void getCommand(redisClient *c) { static void getsetCommand(redisClient *c) { if (getGenericCommand(c) == REDIS_ERR) return; - if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) { - dictReplace(c->db->dict,c->argv[1],c->argv[2]); - } else { - incrRefCount(c->argv[1]); - } + dbReplace(c->db,c->argv[1],c->argv[2]); incrRefCount(c->argv[2]); server.dirty++; removeExpire(c->db,c->argv[1]); @@ -4360,17 +4557,9 @@ static void msetGenericCommand(redisClient *c, int nx) { } for (j = 1; j < c->argc; j += 2) { - int retval; - c->argv[j+1] = tryObjectEncoding(c->argv[j+1]); - retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]); - if (retval == DICT_ERR) { - dictReplace(c->db->dict,c->argv[j],c->argv[j+1]); - incrRefCount(c->argv[j+1]); - } else { - incrRefCount(c->argv[j]); - incrRefCount(c->argv[j+1]); - } + dbReplace(c->db,c->argv[j],c->argv[j+1]); + incrRefCount(c->argv[j+1]); removeExpire(c->db,c->argv[j]); } server.dirty += (c->argc-1)/2; @@ -4387,7 +4576,6 @@ static void msetnxCommand(redisClient *c) { static void incrDecrCommand(redisClient *c, long long incr) { long long value; - int retval; robj *o; o = lookupKeyWrite(c->db,c->argv[1]); @@ -4396,13 +4584,7 @@ static void incrDecrCommand(redisClient *c, long long incr) { value += incr; o = createStringObjectFromLongLong(value); - retval = dictAdd(c->db->dict,c->argv[1],o); - if (retval == DICT_ERR) { - dictReplace(c->db->dict,c->argv[1],o); - removeExpire(c->db,c->argv[1]); - } else { - incrRefCount(c->argv[1]); - } + dbReplace(c->db,c->argv[1],o); server.dirty++; addReply(c,shared.colon); addReply(c,o); @@ -4439,17 +4621,10 @@ static void appendCommand(redisClient *c) { o = lookupKeyWrite(c->db,c->argv[1]); if (o == NULL) { /* Create the key */ - retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]); - incrRefCount(c->argv[1]); + retval = dbAdd(c->db,c->argv[1],c->argv[2]); incrRefCount(c->argv[2]); totlen = stringObjectLen(c->argv[2]); } else { - dictEntry *de; - - de = dictFind(c->db->dict,c->argv[1]); - assert(de != NULL); - - o = dictGetEntryVal(de); if (o->type != REDIS_STRING) { addReply(c,shared.wrongtypeerr); return; @@ -4461,7 +4636,7 @@ static void appendCommand(redisClient *c) { o = createStringObject(decoded->ptr, sdslen(decoded->ptr)); decrRefCount(decoded); - dictReplace(c->db->dict,c->argv[1],o); + dbReplace(c->db,c->argv[1],o); } /* APPEND! */ if (c->argv[2]->encoding == REDIS_ENCODING_RAW) { @@ -4520,7 +4695,7 @@ static void delCommand(redisClient *c) { int deleted = 0, j; for (j = 1; j < c->argc; j++) { - if (deleteKey(c->db,c->argv[j])) { + if (dbDelete(c->db,c->argv[j])) { touchWatchedKey(c->db,c->argv[j]); server.dirty++; deleted++; @@ -4531,7 +4706,7 @@ static void delCommand(redisClient *c) { static void existsCommand(redisClient *c) { expireIfNeeded(c->db,c->argv[1]); - if (dictFind(c->db->dict,c->argv[1])) { + if (dbExists(c->db,c->argv[1])) { addReply(c, shared.cone); } else { addReply(c, shared.czero); @@ -4549,27 +4724,15 @@ static void selectCommand(redisClient *c) { } static void randomkeyCommand(redisClient *c) { - dictEntry *de; robj *key; - while(1) { - de = dictGetRandomKey(c->db->dict); - if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break; - } - - if (de == NULL) { + if ((key = dbRandomKey(c->db)) == NULL) { addReply(c,shared.nullbulk); return; } - key = dictGetEntryKey(de); - if (server.vm_enabled) { - key = dupStringObject(key); - addReplyBulk(c,key); - decrRefCount(key); - } else { - addReplyBulk(c,key); - } + addReplyBulk(c,key); + decrRefCount(key); } static void keysCommand(redisClient *c) { @@ -4584,15 +4747,17 @@ static void keysCommand(redisClient *c) { addReply(c,lenobj); decrRefCount(lenobj); while((de = dictNext(di)) != NULL) { - robj *keyobj = dictGetEntryKey(de); + sds key = dictGetEntryKey(de); + robj *keyobj; - sds key = keyobj->ptr; if ((pattern[0] == '*' && pattern[1] == '\0') || stringmatchlen(pattern,plen,key,sdslen(key),0)) { + keyobj = createStringObject(key,sdslen(key)); if (expireIfNeeded(c->db,keyobj) == 0) { addReplyBulk(c,keyobj); numkeys++; } + decrRefCount(keyobj); } } dictReleaseIterator(di); @@ -4675,17 +4840,15 @@ static void renameGenericCommand(redisClient *c, int nx) { incrRefCount(o); deleteIfVolatile(c->db,c->argv[2]); - if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) { + if (dbAdd(c->db,c->argv[2],o) == REDIS_ERR) { if (nx) { decrRefCount(o); addReply(c,shared.czero); return; } - dictReplace(c->db->dict,c->argv[2],o); - } else { - incrRefCount(c->argv[2]); + dbReplace(c->db,c->argv[2],o); } - deleteKey(c->db,c->argv[1]); + dbDelete(c->db,c->argv[1]); touchWatchedKey(c->db,c->argv[2]); server.dirty++; addReply(c,nx ? shared.cone : shared.ok); @@ -4730,40 +4893,266 @@ static void moveCommand(redisClient *c) { /* Try to add the element to the target DB */ deleteIfVolatile(dst,c->argv[1]); - if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) { + if (dbAdd(dst,c->argv[1],o) == REDIS_ERR) { addReply(c,shared.czero); return; } - incrRefCount(c->argv[1]); incrRefCount(o); /* OK! key moved, free the entry in the source DB */ - deleteKey(src,c->argv[1]); + dbDelete(src,c->argv[1]); server.dirty++; addReply(c,shared.cone); } /* =================================== Lists ================================ */ -static void pushGenericCommand(redisClient *c, int where) { - robj *lobj; - list *list; - lobj = lookupKeyWrite(c->db,c->argv[1]); + +/* Check the argument length to see if it requires us to convert the ziplist + * to a real list. Only check raw-encoded objects because integer encoded + * objects are never too long. */ +static void listTypeTryConversion(robj *subject, robj *value) { + if (subject->encoding != REDIS_ENCODING_ZIPLIST) return; + if (value->encoding == REDIS_ENCODING_RAW && + sdslen(value->ptr) > server.list_max_ziplist_value) + listTypeConvert(subject,REDIS_ENCODING_LIST); +} + +static void listTypePush(robj *subject, robj *value, int where) { + /* Check if we need to convert the ziplist */ + listTypeTryConversion(subject,value); + if (subject->encoding == REDIS_ENCODING_ZIPLIST && + ziplistLen(subject->ptr) > server.list_max_ziplist_entries) + listTypeConvert(subject,REDIS_ENCODING_LIST); + + if (subject->encoding == REDIS_ENCODING_ZIPLIST) { + int pos = (where == REDIS_HEAD) ? ZIPLIST_HEAD : ZIPLIST_TAIL; + value = getDecodedObject(value); + subject->ptr = ziplistPush(subject->ptr,value->ptr,sdslen(value->ptr),pos); + decrRefCount(value); + } else if (subject->encoding == REDIS_ENCODING_LIST) { + if (where == REDIS_HEAD) { + listAddNodeHead(subject->ptr,value); + } else { + listAddNodeTail(subject->ptr,value); + } + incrRefCount(value); + } else { + redisPanic("Unknown list encoding"); + } +} + +static robj *listTypePop(robj *subject, int where) { + robj *value = NULL; + if (subject->encoding == REDIS_ENCODING_ZIPLIST) { + unsigned char *p; + unsigned char *vstr; + unsigned int vlen; + long long vlong; + int pos = (where == REDIS_HEAD) ? 0 : -1; + p = ziplistIndex(subject->ptr,pos); + if (ziplistGet(p,&vstr,&vlen,&vlong)) { + if (vstr) { + value = createStringObject((char*)vstr,vlen); + } else { + value = createStringObjectFromLongLong(vlong); + } + /* We only need to delete an element when it exists */ + subject->ptr = ziplistDelete(subject->ptr,&p); + } + } else if (subject->encoding == REDIS_ENCODING_LIST) { + list *list = subject->ptr; + listNode *ln; + if (where == REDIS_HEAD) { + ln = listFirst(list); + } else { + ln = listLast(list); + } + if (ln != NULL) { + value = listNodeValue(ln); + incrRefCount(value); + listDelNode(list,ln); + } + } else { + redisPanic("Unknown list encoding"); + } + return value; +} + +static unsigned long listTypeLength(robj *subject) { + if (subject->encoding == REDIS_ENCODING_ZIPLIST) { + return ziplistLen(subject->ptr); + } else if (subject->encoding == REDIS_ENCODING_LIST) { + return listLength((list*)subject->ptr); + } else { + redisPanic("Unknown list encoding"); + } +} + +/* Structure to hold set iteration abstraction. */ +typedef struct { + robj *subject; + unsigned char encoding; + unsigned char direction; /* Iteration direction */ + unsigned char *zi; + listNode *ln; +} listTypeIterator; + +/* Structure for an entry while iterating over a list. */ +typedef struct { + listTypeIterator *li; + unsigned char *zi; /* Entry in ziplist */ + listNode *ln; /* Entry in linked list */ +} listTypeEntry; + +/* Initialize an iterator at the specified index. */ +static listTypeIterator *listTypeInitIterator(robj *subject, int index, unsigned char direction) { + listTypeIterator *li = zmalloc(sizeof(listTypeIterator)); + li->subject = subject; + li->encoding = subject->encoding; + li->direction = direction; + if (li->encoding == REDIS_ENCODING_ZIPLIST) { + li->zi = ziplistIndex(subject->ptr,index); + } else if (li->encoding == REDIS_ENCODING_LIST) { + li->ln = listIndex(subject->ptr,index); + } else { + redisPanic("Unknown list encoding"); + } + return li; +} + +/* Clean up the iterator. */ +static void listTypeReleaseIterator(listTypeIterator *li) { + zfree(li); +} + +/* Stores pointer to current the entry in the provided entry structure + * and advances the position of the iterator. Returns 1 when the current + * entry is in fact an entry, 0 otherwise. */ +static int listTypeNext(listTypeIterator *li, listTypeEntry *entry) { + /* Protect from converting when iterating */ + redisAssert(li->subject->encoding == li->encoding); + + entry->li = li; + if (li->encoding == REDIS_ENCODING_ZIPLIST) { + entry->zi = li->zi; + if (entry->zi != NULL) { + if (li->direction == REDIS_TAIL) + li->zi = ziplistNext(li->subject->ptr,li->zi); + else + li->zi = ziplistPrev(li->subject->ptr,li->zi); + return 1; + } + } else if (li->encoding == REDIS_ENCODING_LIST) { + entry->ln = li->ln; + if (entry->ln != NULL) { + if (li->direction == REDIS_TAIL) + li->ln = li->ln->next; + else + li->ln = li->ln->prev; + return 1; + } + } else { + redisPanic("Unknown list encoding"); + } + return 0; +} + +/* Return entry or NULL at the current position of the iterator. */ +static robj *listTypeGet(listTypeEntry *entry) { + listTypeIterator *li = entry->li; + robj *value = NULL; + if (li->encoding == REDIS_ENCODING_ZIPLIST) { + unsigned char *vstr; + unsigned int vlen; + long long vlong; + redisAssert(entry->zi != NULL); + if (ziplistGet(entry->zi,&vstr,&vlen,&vlong)) { + if (vstr) { + value = createStringObject((char*)vstr,vlen); + } else { + value = createStringObjectFromLongLong(vlong); + } + } + } else if (li->encoding == REDIS_ENCODING_LIST) { + redisAssert(entry->ln != NULL); + value = listNodeValue(entry->ln); + incrRefCount(value); + } else { + redisPanic("Unknown list encoding"); + } + return value; +} + +/* Compare the given object with the entry at the current position. */ +static int listTypeEqual(listTypeEntry *entry, robj *o) { + listTypeIterator *li = entry->li; + if (li->encoding == REDIS_ENCODING_ZIPLIST) { + redisAssert(o->encoding == REDIS_ENCODING_RAW); + return ziplistCompare(entry->zi,o->ptr,sdslen(o->ptr)); + } else if (li->encoding == REDIS_ENCODING_LIST) { + return equalStringObjects(o,listNodeValue(entry->ln)); + } else { + redisPanic("Unknown list encoding"); + } +} + +/* Delete the element pointed to. */ +static void listTypeDelete(listTypeEntry *entry) { + listTypeIterator *li = entry->li; + if (li->encoding == REDIS_ENCODING_ZIPLIST) { + unsigned char *p = entry->zi; + li->subject->ptr = ziplistDelete(li->subject->ptr,&p); + + /* Update position of the iterator depending on the direction */ + if (li->direction == REDIS_TAIL) + li->zi = p; + else + li->zi = ziplistPrev(li->subject->ptr,p); + } else if (entry->li->encoding == REDIS_ENCODING_LIST) { + listNode *next; + if (li->direction == REDIS_TAIL) + next = entry->ln->next; + else + next = entry->ln->prev; + listDelNode(li->subject->ptr,entry->ln); + li->ln = next; + } else { + redisPanic("Unknown list encoding"); + } +} + +static void listTypeConvert(robj *subject, int enc) { + listTypeIterator *li; + listTypeEntry entry; + redisAssert(subject->type == REDIS_LIST); + + if (enc == REDIS_ENCODING_LIST) { + list *l = listCreate(); + listSetFreeMethod(l,decrRefCount); + + /* listTypeGet returns a robj with incremented refcount */ + li = listTypeInitIterator(subject,0,REDIS_TAIL); + while (listTypeNext(li,&entry)) listAddNodeTail(l,listTypeGet(&entry)); + listTypeReleaseIterator(li); + + subject->encoding = REDIS_ENCODING_LIST; + zfree(subject->ptr); + subject->ptr = l; + } else { + redisPanic("Unsupported list conversion"); + } +} + +static void pushGenericCommand(redisClient *c, int where) { + robj *lobj = lookupKeyWrite(c->db,c->argv[1]); if (lobj == NULL) { if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) { addReply(c,shared.cone); return; } - lobj = createListObject(); - list = lobj->ptr; - if (where == REDIS_HEAD) { - listAddNodeHead(list,c->argv[2]); - } else { - listAddNodeTail(list,c->argv[2]); - } - dictAdd(c->db->dict,c->argv[1],lobj); - incrRefCount(c->argv[1]); - incrRefCount(c->argv[2]); + lobj = createZiplistObject(); + dbAdd(c->db,c->argv[1],lobj); } else { if (lobj->type != REDIS_LIST) { addReply(c,shared.wrongtypeerr); @@ -4773,16 +5162,10 @@ static void pushGenericCommand(redisClient *c, int where) { addReply(c,shared.cone); return; } - list = lobj->ptr; - if (where == REDIS_HEAD) { - listAddNodeHead(list,c->argv[2]); - } else { - listAddNodeTail(list,c->argv[2]); - } - incrRefCount(c->argv[2]); } + listTypePush(lobj,c->argv[2],where); + addReplyLongLong(c,listTypeLength(lobj)); server.dirty++; - addReplyLongLong(c,listLength(list)); } static void lpushCommand(redisClient *c) { @@ -4794,80 +5177,94 @@ static void rpushCommand(redisClient *c) { } static void llenCommand(redisClient *c) { - robj *o; - list *l; - - if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || - checkType(c,o,REDIS_LIST)) return; - - l = o->ptr; - addReplyUlong(c,listLength(l)); + robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.czero); + if (o == NULL || checkType(c,o,REDIS_LIST)) return; + addReplyUlong(c,listTypeLength(o)); } static void lindexCommand(redisClient *c) { - robj *o; + robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk); + if (o == NULL || checkType(c,o,REDIS_LIST)) return; int index = atoi(c->argv[2]->ptr); - list *list; - listNode *ln; - - if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL || - checkType(c,o,REDIS_LIST)) return; - list = o->ptr; + robj *value = NULL; - ln = listIndex(list, index); - if (ln == NULL) { - addReply(c,shared.nullbulk); + if (o->encoding == REDIS_ENCODING_ZIPLIST) { + unsigned char *p; + unsigned char *vstr; + unsigned int vlen; + long long vlong; + p = ziplistIndex(o->ptr,index); + if (ziplistGet(p,&vstr,&vlen,&vlong)) { + if (vstr) { + value = createStringObject((char*)vstr,vlen); + } else { + value = createStringObjectFromLongLong(vlong); + } + addReplyBulk(c,value); + decrRefCount(value); + } else { + addReply(c,shared.nullbulk); + } + } else if (o->encoding == REDIS_ENCODING_LIST) { + listNode *ln = listIndex(o->ptr,index); + if (ln != NULL) { + value = listNodeValue(ln); + addReplyBulk(c,value); + } else { + addReply(c,shared.nullbulk); + } } else { - robj *ele = listNodeValue(ln); - addReplyBulk(c,ele); + redisPanic("Unknown list encoding"); } } static void lsetCommand(redisClient *c) { - robj *o; + robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr); + if (o == NULL || checkType(c,o,REDIS_LIST)) return; int index = atoi(c->argv[2]->ptr); - list *list; - listNode *ln; - - if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr)) == NULL || - checkType(c,o,REDIS_LIST)) return; - list = o->ptr; - - ln = listIndex(list, index); - if (ln == NULL) { - addReply(c,shared.outofrangeerr); + robj *value = c->argv[3]; + + listTypeTryConversion(o,value); + if (o->encoding == REDIS_ENCODING_ZIPLIST) { + unsigned char *p, *zl = o->ptr; + p = ziplistIndex(zl,index); + if (p == NULL) { + addReply(c,shared.outofrangeerr); + } else { + o->ptr = ziplistDelete(o->ptr,&p); + value = getDecodedObject(value); + o->ptr = ziplistInsert(o->ptr,p,value->ptr,sdslen(value->ptr)); + decrRefCount(value); + addReply(c,shared.ok); + server.dirty++; + } + } else if (o->encoding == REDIS_ENCODING_LIST) { + listNode *ln = listIndex(o->ptr,index); + if (ln == NULL) { + addReply(c,shared.outofrangeerr); + } else { + decrRefCount((robj*)listNodeValue(ln)); + listNodeValue(ln) = value; + incrRefCount(value); + addReply(c,shared.ok); + server.dirty++; + } } else { - robj *ele = listNodeValue(ln); - - decrRefCount(ele); - listNodeValue(ln) = c->argv[3]; - incrRefCount(c->argv[3]); - addReply(c,shared.ok); - server.dirty++; + redisPanic("Unknown list encoding"); } } static void popGenericCommand(redisClient *c, int where) { - robj *o; - list *list; - listNode *ln; - - if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL || - checkType(c,o,REDIS_LIST)) return; - list = o->ptr; + robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk); + if (o == NULL || checkType(c,o,REDIS_LIST)) return; - if (where == REDIS_HEAD) - ln = listFirst(list); - else - ln = listLast(list); - - if (ln == NULL) { + robj *value = listTypePop(o,where); + if (value == NULL) { addReply(c,shared.nullbulk); } else { - robj *ele = listNodeValue(ln); - addReplyBulk(c,ele); - listDelNode(list,ln); - if (listLength(list) == 0) deleteKey(c->db,c->argv[1]); + addReplyBulk(c,value); + decrRefCount(value); + if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[1]); server.dirty++; } } @@ -4881,19 +5278,16 @@ static void rpopCommand(redisClient *c) { } static void lrangeCommand(redisClient *c) { - robj *o; + robj *o, *value; int start = atoi(c->argv[2]->ptr); int end = atoi(c->argv[3]->ptr); int llen; int rangelen, j; - list *list; - listNode *ln; - robj *ele; + listTypeEntry entry; if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL || checkType(c,o,REDIS_LIST)) return; - list = o->ptr; - llen = listLength(list); + llen = listTypeLength(o); /* convert negative indexes */ if (start < 0) start = llen+start; @@ -4911,13 +5305,15 @@ static void lrangeCommand(redisClient *c) { rangelen = (end-start)+1; /* Return the result in form of a multi-bulk reply */ - ln = listIndex(list, start); addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen)); + listTypeIterator *li = listTypeInitIterator(o,start,REDIS_TAIL); for (j = 0; j < rangelen; j++) { - ele = listNodeValue(ln); - addReplyBulk(c,ele); - ln = ln->next; + redisAssert(listTypeNext(li,&entry)); + value = listTypeGet(&entry); + addReplyBulk(c,value); + decrRefCount(value); } + listTypeReleaseIterator(li); } static void ltrimCommand(redisClient *c) { @@ -4931,8 +5327,7 @@ static void ltrimCommand(redisClient *c) { if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.ok)) == NULL || checkType(c,o,REDIS_LIST)) return; - list = o->ptr; - llen = listLength(list); + llen = listTypeLength(o); /* convert negative indexes */ if (start < 0) start = llen+start; @@ -4952,49 +5347,63 @@ static void ltrimCommand(redisClient *c) { } /* Remove list elements to perform the trim */ - for (j = 0; j < ltrim; j++) { - ln = listFirst(list); - listDelNode(list,ln); - } - for (j = 0; j < rtrim; j++) { - ln = listLast(list); - listDelNode(list,ln); + if (o->encoding == REDIS_ENCODING_ZIPLIST) { + o->ptr = ziplistDeleteRange(o->ptr,0,ltrim); + o->ptr = ziplistDeleteRange(o->ptr,-rtrim,rtrim); + } else if (o->encoding == REDIS_ENCODING_LIST) { + list = o->ptr; + for (j = 0; j < ltrim; j++) { + ln = listFirst(list); + listDelNode(list,ln); + } + for (j = 0; j < rtrim; j++) { + ln = listLast(list); + listDelNode(list,ln); + } + } else { + redisPanic("Unknown list encoding"); } - if (listLength(list) == 0) deleteKey(c->db,c->argv[1]); + if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[1]); server.dirty++; addReply(c,shared.ok); } static void lremCommand(redisClient *c) { - robj *o; - list *list; - listNode *ln, *next; + robj *subject, *obj = c->argv[3]; int toremove = atoi(c->argv[2]->ptr); int removed = 0; - int fromtail = 0; + listTypeEntry entry; - if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL || - checkType(c,o,REDIS_LIST)) return; - list = o->ptr; + subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero); + if (subject == NULL || checkType(c,subject,REDIS_LIST)) return; + /* Make sure obj is raw when we're dealing with a ziplist */ + if (subject->encoding == REDIS_ENCODING_ZIPLIST) + obj = getDecodedObject(obj); + + listTypeIterator *li; if (toremove < 0) { toremove = -toremove; - fromtail = 1; + li = listTypeInitIterator(subject,-1,REDIS_HEAD); + } else { + li = listTypeInitIterator(subject,0,REDIS_TAIL); } - ln = fromtail ? list->tail : list->head; - while (ln) { - robj *ele = listNodeValue(ln); - next = fromtail ? ln->prev : ln->next; - if (equalStringObjects(ele,c->argv[3])) { - listDelNode(list,ln); + while (listTypeNext(li,&entry)) { + if (listTypeEqual(&entry,obj)) { + listTypeDelete(&entry); server.dirty++; removed++; if (toremove && removed == toremove) break; } - ln = next; } - if (listLength(list) == 0) deleteKey(c->db,c->argv[1]); + listTypeReleaseIterator(li); + + /* Clean up raw encoded object */ + if (subject->encoding == REDIS_ENCODING_ZIPLIST) + decrRefCount(obj); + + if (listTypeLength(subject) == 0) dbDelete(c->db,c->argv[1]); addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed)); } @@ -5014,52 +5423,134 @@ static void lremCommand(redisClient *c) { * as well. This command was originally proposed by Ezra Zygmuntowicz. */ static void rpoplpushcommand(redisClient *c) { - robj *sobj; - list *srclist; - listNode *ln; - + robj *sobj, *value; if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL || checkType(c,sobj,REDIS_LIST)) return; - srclist = sobj->ptr; - ln = listLast(srclist); - if (ln == NULL) { + if (listTypeLength(sobj) == 0) { addReply(c,shared.nullbulk); } else { - robj *dobj = lookupKeyWrite(c->db,c->argv[2]); - robj *ele = listNodeValue(ln); - list *dstlist; + robj *dobj = lookupKeyWrite(c->db,c->argv[2]); + if (dobj && checkType(c,dobj,REDIS_LIST)) return; + value = listTypePop(sobj,REDIS_TAIL); + + /* Add the element to the target list (unless it's directly + * passed to some BLPOP-ing client */ + if (!handleClientsWaitingListPush(c,c->argv[2],value)) { + /* Create the list if the key does not exist */ + if (!dobj) { + dobj = createZiplistObject(); + dbAdd(c->db,c->argv[2],dobj); + } + listTypePush(dobj,value,REDIS_HEAD); + } + + /* Send the element to the client as reply as well */ + addReplyBulk(c,value); + + /* listTypePop returns an object with its refcount incremented */ + decrRefCount(value); + + /* Delete the source list when it is empty */ + if (listTypeLength(sobj) == 0) dbDelete(c->db,c->argv[1]); + server.dirty++; + } +} + +/* ==================================== Sets ================================ */ + +static int setTypeAdd(robj *subject, robj *value) { + if (subject->encoding == REDIS_ENCODING_HT) { + if (dictAdd(subject->ptr,value,NULL) == DICT_OK) { + incrRefCount(value); + return 1; + } + } else { + redisPanic("Unknown set encoding"); + } + return 0; +} + +static int setTypeRemove(robj *subject, robj *value) { + if (subject->encoding == REDIS_ENCODING_HT) { + if (dictDelete(subject->ptr,value) == DICT_OK) { + if (htNeedsResize(subject->ptr)) dictResize(subject->ptr); + return 1; + } + } else { + redisPanic("Unknown set encoding"); + } + return 0; +} + +static int setTypeIsMember(robj *subject, robj *value) { + if (subject->encoding == REDIS_ENCODING_HT) { + return dictFind((dict*)subject->ptr,value) != NULL; + } else { + redisPanic("Unknown set encoding"); + } +} + +/* Structure to hold set iteration abstraction. */ +typedef struct { + int encoding; + dictIterator *di; +} setIterator; + +static setIterator *setTypeInitIterator(robj *subject) { + setIterator *si = zmalloc(sizeof(setIterator)); + si->encoding = subject->encoding; + if (si->encoding == REDIS_ENCODING_HT) { + si->di = dictGetIterator(subject->ptr); + } else { + redisPanic("Unknown set encoding"); + } + return si; +} - if (dobj && dobj->type != REDIS_LIST) { - addReply(c,shared.wrongtypeerr); - return; - } +static void setTypeReleaseIterator(setIterator *si) { + if (si->encoding == REDIS_ENCODING_HT) + dictReleaseIterator(si->di); + zfree(si); +} - /* Add the element to the target list (unless it's directly - * passed to some BLPOP-ing client */ - if (!handleClientsWaitingListPush(c,c->argv[2],ele)) { - if (dobj == NULL) { - /* Create the list if the key does not exist */ - dobj = createListObject(); - dictAdd(c->db->dict,c->argv[2],dobj); - incrRefCount(c->argv[2]); - } - dstlist = dobj->ptr; - listAddNodeHead(dstlist,ele); - incrRefCount(ele); +/* Move to the next entry in the set. Returns the object at the current + * position, or NULL when the end is reached. This object will have its + * refcount incremented, so the caller needs to take care of this. */ +static robj *setTypeNext(setIterator *si) { + robj *ret = NULL; + if (si->encoding == REDIS_ENCODING_HT) { + dictEntry *de = dictNext(si->di); + if (de != NULL) { + ret = dictGetEntryKey(de); + incrRefCount(ret); } + } + return ret; +} - /* Send the element to the client as reply as well */ - addReplyBulk(c,ele); - /* Finally remove the element from the source list */ - listDelNode(srclist,ln); - if (listLength(srclist) == 0) deleteKey(c->db,c->argv[1]); - server.dirty++; +/* Return random element from set. The returned object will always have + * an incremented refcount. */ +robj *setTypeRandomElement(robj *subject) { + robj *ret = NULL; + if (subject->encoding == REDIS_ENCODING_HT) { + dictEntry *de = dictGetRandomKey(subject->ptr); + ret = dictGetEntryKey(de); + incrRefCount(ret); + } else { + redisPanic("Unknown set encoding"); } + return ret; } -/* ==================================== Sets ================================ */ +static unsigned long setTypeSize(robj *subject) { + if (subject->encoding == REDIS_ENCODING_HT) { + return dictSize((dict*)subject->ptr); + } else { + redisPanic("Unknown set encoding"); + } +} static void saddCommand(redisClient *c) { robj *set; @@ -5067,16 +5558,14 @@ static void saddCommand(redisClient *c) { set = lookupKeyWrite(c->db,c->argv[1]); if (set == NULL) { set = createSetObject(); - dictAdd(c->db->dict,c->argv[1],set); - incrRefCount(c->argv[1]); + dbAdd(c->db,c->argv[1],set); } else { if (set->type != REDIS_SET) { addReply(c,shared.wrongtypeerr); return; } } - if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) { - incrRefCount(c->argv[2]); + if (setTypeAdd(set,c->argv[2])) { server.dirty++; addReply(c,shared.cone); } else { @@ -5090,10 +5579,9 @@ static void sremCommand(redisClient *c) { if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,set,REDIS_SET)) return; - if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) { + if (setTypeRemove(set,c->argv[2])) { + if (setTypeSize(set) == 0) dbDelete(c->db,c->argv[1]); server.dirty++; - if (htNeedsResize(set->ptr)) dictResize(set->ptr); - if (dictSize((dict*)set->ptr) == 0) deleteKey(c->db,c->argv[1]); addReply(c,shared.cone); } else { addReply(c,shared.czero); @@ -5118,22 +5606,20 @@ static void smoveCommand(redisClient *c) { return; } /* Remove the element from the source set */ - if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) { + if (!setTypeRemove(srcset,c->argv[3])) { /* Key not found in the src set! return zero */ addReply(c,shared.czero); return; } - if (dictSize((dict*)srcset->ptr) == 0 && srcset != dstset) - deleteKey(c->db,c->argv[1]); + if (setTypeSize(srcset) == 0 && srcset != dstset) + dbDelete(c->db,c->argv[1]); server.dirty++; /* Add the element to the destination set */ if (!dstset) { dstset = createSetObject(); - dictAdd(c->db->dict,c->argv[2],dstset); - incrRefCount(c->argv[2]); + dbAdd(c->db,c->argv[2],dstset); } - if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK) - incrRefCount(c->argv[3]); + setTypeAdd(dstset,c->argv[3]); addReply(c,shared.cone); } @@ -5143,7 +5629,7 @@ static void sismemberCommand(redisClient *c) { if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,set,REDIS_SET)) return; - if (dictFind(set->ptr,c->argv[2])) + if (setTypeIsMember(set,c->argv[2])) addReply(c,shared.cone); else addReply(c,shared.czero); @@ -5151,76 +5637,64 @@ static void sismemberCommand(redisClient *c) { static void scardCommand(redisClient *c) { robj *o; - dict *s; if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,o,REDIS_SET)) return; - s = o->ptr; - addReplyUlong(c,dictSize(s)); + addReplyUlong(c,setTypeSize(o)); } static void spopCommand(redisClient *c) { - robj *set; - dictEntry *de; + robj *set, *ele; if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL || checkType(c,set,REDIS_SET)) return; - de = dictGetRandomKey(set->ptr); - if (de == NULL) { + ele = setTypeRandomElement(set); + if (ele == NULL) { addReply(c,shared.nullbulk); } else { - robj *ele = dictGetEntryKey(de); - + setTypeRemove(set,ele); addReplyBulk(c,ele); - dictDelete(set->ptr,ele); - if (htNeedsResize(set->ptr)) dictResize(set->ptr); - if (dictSize((dict*)set->ptr) == 0) deleteKey(c->db,c->argv[1]); + decrRefCount(ele); + if (setTypeSize(set) == 0) dbDelete(c->db,c->argv[1]); server.dirty++; } } static void srandmemberCommand(redisClient *c) { - robj *set; - dictEntry *de; + robj *set, *ele; if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL || checkType(c,set,REDIS_SET)) return; - de = dictGetRandomKey(set->ptr); - if (de == NULL) { + ele = setTypeRandomElement(set); + if (ele == NULL) { addReply(c,shared.nullbulk); } else { - robj *ele = dictGetEntryKey(de); - addReplyBulk(c,ele); + decrRefCount(ele); } } static int qsortCompareSetsByCardinality(const void *s1, const void *s2) { - dict **d1 = (void*) s1, **d2 = (void*) s2; - - return dictSize(*d1)-dictSize(*d2); + return setTypeSize(*(robj**)s1)-setTypeSize(*(robj**)s2); } -static void sinterGenericCommand(redisClient *c, robj **setskeys, unsigned long setsnum, robj *dstkey) { - dict **dv = zmalloc(sizeof(dict*)*setsnum); - dictIterator *di; - dictEntry *de; - robj *lenobj = NULL, *dstset = NULL; +static void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum, robj *dstkey) { + robj **sets = zmalloc(sizeof(robj*)*setnum); + setIterator *si; + robj *ele, *lenobj = NULL, *dstset = NULL; unsigned long j, cardinality = 0; - for (j = 0; j < setsnum; j++) { - robj *setobj; - - setobj = dstkey ? - lookupKeyWrite(c->db,setskeys[j]) : - lookupKeyRead(c->db,setskeys[j]); + for (j = 0; j < setnum; j++) { + robj *setobj = dstkey ? + lookupKeyWrite(c->db,setkeys[j]) : + lookupKeyRead(c->db,setkeys[j]); if (!setobj) { - zfree(dv); + zfree(sets); if (dstkey) { - if (deleteKey(c->db,dstkey)) + if (dbDelete(c->db,dstkey)) server.dirty++; addReply(c,shared.czero); } else { @@ -5228,16 +5702,15 @@ static void sinterGenericCommand(redisClient *c, robj **setskeys, unsigned long } return; } - if (setobj->type != REDIS_SET) { - zfree(dv); - addReply(c,shared.wrongtypeerr); + if (checkType(c,setobj,REDIS_SET)) { + zfree(sets); return; } - dv[j] = setobj->ptr; + sets[j] = setobj; } /* Sort sets from the smallest to largest, this will improve our * algorithm's performace */ - qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality); + qsort(sets,setnum,sizeof(robj*),qsortCompareSetsByCardinality); /* The first thing we should output is the total number of elements... * since this is a multi-bulk write, but at this stage we don't know @@ -5257,34 +5730,31 @@ static void sinterGenericCommand(redisClient *c, robj **setskeys, unsigned long /* Iterate all the elements of the first (smallest) set, and test * the element against all the other sets, if at least one set does * not include the element it is discarded */ - di = dictGetIterator(dv[0]); - - while((de = dictNext(di)) != NULL) { - robj *ele; - - for (j = 1; j < setsnum; j++) - if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break; - if (j != setsnum) - continue; /* at least one set does not contain the member */ - ele = dictGetEntryKey(de); - if (!dstkey) { - addReplyBulk(c,ele); - cardinality++; - } else { - dictAdd(dstset->ptr,ele,NULL); - incrRefCount(ele); + si = setTypeInitIterator(sets[0]); + while((ele = setTypeNext(si)) != NULL) { + for (j = 1; j < setnum; j++) + if (!setTypeIsMember(sets[j],ele)) break; + + /* Only take action when all sets contain the member */ + if (j == setnum) { + if (!dstkey) { + addReplyBulk(c,ele); + cardinality++; + } else { + setTypeAdd(dstset,ele); + } } + decrRefCount(ele); } - dictReleaseIterator(di); + setTypeReleaseIterator(si); if (dstkey) { /* Store the resulting set into the target, if the intersection * is not an empty set. */ - deleteKey(c->db,dstkey); - if (dictSize((dict*)dstset->ptr) > 0) { - dictAdd(c->db->dict,dstkey,dstset); - incrRefCount(dstkey); - addReplyLongLong(c,dictSize((dict*)dstset->ptr)); + dbDelete(c->db,dstkey); + if (setTypeSize(dstset) > 0) { + dbAdd(c->db,dstkey,dstset); + addReplyLongLong(c,setTypeSize(dstset)); } else { decrRefCount(dstset); addReply(c,shared.czero); @@ -5293,7 +5763,7 @@ static void sinterGenericCommand(redisClient *c, robj **setskeys, unsigned long } else { lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",cardinality); } - zfree(dv); + zfree(sets); } static void sinterCommand(redisClient *c) { @@ -5308,29 +5778,25 @@ static void sinterstoreCommand(redisClient *c) { #define REDIS_OP_DIFF 1 #define REDIS_OP_INTER 2 -static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) { - dict **dv = zmalloc(sizeof(dict*)*setsnum); - dictIterator *di; - dictEntry *de; - robj *dstset = NULL; +static void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum, robj *dstkey, int op) { + robj **sets = zmalloc(sizeof(robj*)*setnum); + setIterator *si; + robj *ele, *dstset = NULL; int j, cardinality = 0; - for (j = 0; j < setsnum; j++) { - robj *setobj; - - setobj = dstkey ? - lookupKeyWrite(c->db,setskeys[j]) : - lookupKeyRead(c->db,setskeys[j]); + for (j = 0; j < setnum; j++) { + robj *setobj = dstkey ? + lookupKeyWrite(c->db,setkeys[j]) : + lookupKeyRead(c->db,setkeys[j]); if (!setobj) { - dv[j] = NULL; + sets[j] = NULL; continue; } - if (setobj->type != REDIS_SET) { - zfree(dv); - addReply(c,shared.wrongtypeerr); + if (checkType(c,setobj,REDIS_SET)) { + zfree(sets); return; } - dv[j] = setobj->ptr; + sets[j] = setobj; } /* We need a temp set object to store our union. If the dstkey @@ -5340,61 +5806,53 @@ static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnu /* Iterate all the elements of all the sets, add every element a single * time to the result set */ - for (j = 0; j < setsnum; j++) { - if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */ - if (!dv[j]) continue; /* non existing keys are like empty sets */ + for (j = 0; j < setnum; j++) { + if (op == REDIS_OP_DIFF && j == 0 && !sets[j]) break; /* result set is empty */ + if (!sets[j]) continue; /* non existing keys are like empty sets */ - di = dictGetIterator(dv[j]); - - while((de = dictNext(di)) != NULL) { - robj *ele; - - /* dictAdd will not add the same element multiple times */ - ele = dictGetEntryKey(de); + si = setTypeInitIterator(sets[j]); + while((ele = setTypeNext(si)) != NULL) { if (op == REDIS_OP_UNION || j == 0) { - if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) { - incrRefCount(ele); + if (setTypeAdd(dstset,ele)) { cardinality++; } } else if (op == REDIS_OP_DIFF) { - if (dictDelete(dstset->ptr,ele) == DICT_OK) { + if (setTypeRemove(dstset,ele)) { cardinality--; } } + decrRefCount(ele); } - dictReleaseIterator(di); + setTypeReleaseIterator(si); - /* result set is empty? Exit asap. */ + /* Exit when result set is empty. */ if (op == REDIS_OP_DIFF && cardinality == 0) break; } /* Output the content of the resulting set, if not in STORE mode */ if (!dstkey) { addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality)); - di = dictGetIterator(dstset->ptr); - while((de = dictNext(di)) != NULL) { - robj *ele; - - ele = dictGetEntryKey(de); + si = setTypeInitIterator(dstset); + while((ele = setTypeNext(si)) != NULL) { addReplyBulk(c,ele); + decrRefCount(ele); } - dictReleaseIterator(di); + setTypeReleaseIterator(si); decrRefCount(dstset); } else { /* If we have a target key where to store the resulting set * create this key with the result set inside */ - deleteKey(c->db,dstkey); - if (dictSize((dict*)dstset->ptr) > 0) { - dictAdd(c->db->dict,dstkey,dstset); - incrRefCount(dstkey); - addReplyLongLong(c,dictSize((dict*)dstset->ptr)); + dbDelete(c->db,dstkey); + if (setTypeSize(dstset) > 0) { + dbAdd(c->db,dstkey,dstset); + addReplyLongLong(c,setTypeSize(dstset)); } else { decrRefCount(dstset); addReply(c,shared.czero); } server.dirty++; } - zfree(dv); + zfree(sets); } static void sunionCommand(redisClient *c) { @@ -5681,7 +6139,7 @@ static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) { * Returns 0 when the element cannot be found, rank otherwise. * Note that the rank is 1-based due to the span of zsl->header to the * first element. */ -static unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) { +static unsigned long zslistTypeGetRank(zskiplist *zsl, double score, robj *o) { zskiplistNode *x; unsigned long rank = 0; int i; @@ -5705,7 +6163,7 @@ static unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) { } /* Finds an element by its rank. The rank argument needs to be 1-based. */ -zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) { +zskiplistNode* zslistTypeGetElementByRank(zskiplist *zsl, unsigned long rank) { zskiplistNode *x; unsigned long traversed = 0; int i; @@ -5734,11 +6192,15 @@ static void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scor zset *zs; double *score; + if (isnan(scoreval)) { + addReplySds(c,sdsnew("-ERR provide score is Not A Number (nan)\r\n")); + return; + } + zsetobj = lookupKeyWrite(c->db,key); if (zsetobj == NULL) { zsetobj = createZsetObject(); - dictAdd(c->db->dict,key,zsetobj); - incrRefCount(key); + dbAdd(c->db,key,zsetobj); } else { if (zsetobj->type != REDIS_ZSET) { addReply(c,shared.wrongtypeerr); @@ -5762,6 +6224,15 @@ static void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scor } else { *score = scoreval; } + if (isnan(*score)) { + addReplySds(c, + sdsnew("-ERR resulting score is Not A Number (nan)\r\n")); + zfree(score); + /* Note that we don't need to check if the zset may be empty and + * should be removed here, as we can only obtain Nan as score if + * there was already an element in the sorted set. */ + return; + } } else { *score = scoreval; } @@ -5845,7 +6316,7 @@ static void zremCommand(redisClient *c) { /* Delete from the hash table */ dictDelete(zs->dict,c->argv[2]); if (htNeedsResize(zs->dict)) dictResize(zs->dict); - if (dictSize(zs->dict) == 0) deleteKey(c->db,c->argv[1]); + if (dictSize(zs->dict) == 0) dbDelete(c->db,c->argv[1]); server.dirty++; addReply(c,shared.cone); } @@ -5866,7 +6337,7 @@ static void zremrangebyscoreCommand(redisClient *c) { zs = zsetobj->ptr; deleted = zslDeleteRangeByScore(zs->zsl,min,max,zs->dict); if (htNeedsResize(zs->dict)) dictResize(zs->dict); - if (dictSize(zs->dict) == 0) deleteKey(c->db,c->argv[1]); + if (dictSize(zs->dict) == 0) dbDelete(c->db,c->argv[1]); server.dirty += deleted; addReplyLongLong(c,deleted); } @@ -5904,7 +6375,7 @@ static void zremrangebyrankCommand(redisClient *c) { * use 1-based rank */ deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict); if (htNeedsResize(zs->dict)) dictResize(zs->dict); - if (dictSize(zs->dict) == 0) deleteKey(c->db,c->argv[1]); + if (dictSize(zs->dict) == 0) dbDelete(c->db,c->argv[1]); server.dirty += deleted; addReplyLongLong(c, deleted); } @@ -6092,10 +6563,9 @@ static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { redisAssert(op == REDIS_OP_INTER || op == REDIS_OP_UNION); } - deleteKey(c->db,dstkey); + dbDelete(c->db,dstkey); if (dstzset->zsl->length) { - dictAdd(c->db->dict,dstkey,dstobj); - incrRefCount(dstkey); + dbAdd(c->db,dstkey,dstobj); addReplyLongLong(c, dstzset->zsl->length); server.dirty++; } else { @@ -6159,10 +6629,10 @@ static void zrangeGenericCommand(redisClient *c, int reverse) { /* check if starting point is trivial, before searching * the element in log(N) time */ if (reverse) { - ln = start == 0 ? zsl->tail : zslGetElementByRank(zsl, llen-start); + ln = start == 0 ? zsl->tail : zslistTypeGetElementByRank(zsl, llen-start); } else { ln = start == 0 ? - zsl->header->forward[0] : zslGetElementByRank(zsl, start+1); + zsl->header->forward[0] : zslistTypeGetElementByRank(zsl, start+1); } /* Return the result in form of a multi-bulk reply */ @@ -6359,7 +6829,7 @@ static void zrankGenericCommand(redisClient *c, int reverse) { } score = dictGetEntryVal(de); - rank = zslGetRank(zsl, *score, c->argv[2]); + rank = zslistTypeGetRank(zsl, *score, c->argv[2]); if (rank) { if (reverse) { addReplyLongLong(c, zsl->length - rank); @@ -6386,7 +6856,7 @@ static void zrevrankCommand(redisClient *c) { /* Check the length of a number of objects to see if we need to convert a * zipmap to a real hash. Note that we only check string encoded objects * as their string length can be queried in constant time. */ -static void hashTryConversion(robj *subject, robj **argv, int start, int end) { +static void hashTypeTryConversion(robj *subject, robj **argv, int start, int end) { int i; if (subject->encoding != REDIS_ENCODING_ZIPMAP) return; @@ -6401,7 +6871,7 @@ static void hashTryConversion(robj *subject, robj **argv, int start, int end) { } /* Encode given objects in-place when the hash uses a dict. */ -static void hashTryObjectEncoding(robj *subject, robj **o1, robj **o2) { +static void hashTypeTryObjectEncoding(robj *subject, robj **o1, robj **o2) { if (subject->encoding == REDIS_ENCODING_HT) { if (o1) *o1 = tryObjectEncoding(*o1); if (o2) *o2 = tryObjectEncoding(*o2); @@ -6411,7 +6881,7 @@ static void hashTryObjectEncoding(robj *subject, robj **o1, robj **o2) { /* Get the value from a hash identified by key. Returns either a string * object or NULL if the value cannot be found. The refcount of the object * is always increased by 1 when the value was found. */ -static robj *hashGet(robj *o, robj *key) { +static robj *hashTypeGet(robj *o, robj *key) { robj *value = NULL; if (o->encoding == REDIS_ENCODING_ZIPMAP) { unsigned char *v; @@ -6433,7 +6903,7 @@ static robj *hashGet(robj *o, robj *key) { /* Test if the key exists in the given hash. Returns 1 if the key * exists and 0 when it doesn't. */ -static int hashExists(robj *o, robj *key) { +static int hashTypeExists(robj *o, robj *key) { if (o->encoding == REDIS_ENCODING_ZIPMAP) { key = getDecodedObject(key); if (zipmapExists(o->ptr,key->ptr,sdslen(key->ptr))) { @@ -6451,7 +6921,7 @@ static int hashExists(robj *o, robj *key) { /* Add an element, discard the old if the key already exists. * Return 0 on insert and 1 on update. */ -static int hashSet(robj *o, robj *key, robj *value) { +static int hashTypeSet(robj *o, robj *key, robj *value) { int update = 0; if (o->encoding == REDIS_ENCODING_ZIPMAP) { key = getDecodedObject(key); @@ -6480,7 +6950,7 @@ static int hashSet(robj *o, robj *key, robj *value) { /* Delete an element from a hash. * Return 1 on deleted and 0 on not found. */ -static int hashDelete(robj *o, robj *key) { +static int hashTypeDelete(robj *o, robj *key) { int deleted = 0; if (o->encoding == REDIS_ENCODING_ZIPMAP) { key = getDecodedObject(key); @@ -6495,7 +6965,7 @@ static int hashDelete(robj *o, robj *key) { } /* Return the number of elements in a hash. */ -static unsigned long hashLength(robj *o) { +static unsigned long hashTypeLength(robj *o) { return (o->encoding == REDIS_ENCODING_ZIPMAP) ? zipmapLen((unsigned char*)o->ptr) : dictSize((dict*)o->ptr); } @@ -6512,10 +6982,10 @@ typedef struct { dictIterator *di; dictEntry *de; -} hashIterator; +} hashTypeIterator; -static hashIterator *hashInitIterator(robj *subject) { - hashIterator *hi = zmalloc(sizeof(hashIterator)); +static hashTypeIterator *hashTypeInitIterator(robj *subject) { + hashTypeIterator *hi = zmalloc(sizeof(hashTypeIterator)); hi->encoding = subject->encoding; if (hi->encoding == REDIS_ENCODING_ZIPMAP) { hi->zi = zipmapRewind(subject->ptr); @@ -6527,7 +6997,7 @@ static hashIterator *hashInitIterator(robj *subject) { return hi; } -static void hashReleaseIterator(hashIterator *hi) { +static void hashTypeReleaseIterator(hashTypeIterator *hi) { if (hi->encoding == REDIS_ENCODING_HT) { dictReleaseIterator(hi->di); } @@ -6536,7 +7006,7 @@ static void hashReleaseIterator(hashIterator *hi) { /* Move to the next entry in the hash. Return REDIS_OK when the next entry * could be found and REDIS_ERR when the iterator reaches the end. */ -static int hashNext(hashIterator *hi) { +static int hashTypeNext(hashTypeIterator *hi) { if (hi->encoding == REDIS_ENCODING_ZIPMAP) { if ((hi->zi = zipmapNext(hi->zi, &hi->zk, &hi->zklen, &hi->zv, &hi->zvlen)) == NULL) return REDIS_ERR; @@ -6548,7 +7018,7 @@ static int hashNext(hashIterator *hi) { /* Get key or value object at current iteration position. * This increases the refcount of the field object by 1. */ -static robj *hashCurrent(hashIterator *hi, int what) { +static robj *hashTypeCurrent(hashTypeIterator *hi, int what) { robj *o; if (hi->encoding == REDIS_ENCODING_ZIPMAP) { if (what & REDIS_HASH_KEY) { @@ -6567,12 +7037,11 @@ static robj *hashCurrent(hashIterator *hi, int what) { return o; } -static robj *hashLookupWriteOrCreate(redisClient *c, robj *key) { +static robj *hashTypeLookupWriteOrCreate(redisClient *c, robj *key) { robj *o = lookupKeyWrite(c->db,key); if (o == NULL) { o = createHashObject(); - dictAdd(c->db->dict,key,o); - incrRefCount(key); + dbAdd(c->db,key,o); } else { if (o->type != REDIS_HASH) { addReply(c,shared.wrongtypeerr); @@ -6587,24 +7056,24 @@ static void hsetCommand(redisClient *c) { int update; robj *o; - if ((o = hashLookupWriteOrCreate(c,c->argv[1])) == NULL) return; - hashTryConversion(o,c->argv,2,3); - hashTryObjectEncoding(o,&c->argv[2], &c->argv[3]); - update = hashSet(o,c->argv[2],c->argv[3]); + if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return; + hashTypeTryConversion(o,c->argv,2,3); + hashTypeTryObjectEncoding(o,&c->argv[2], &c->argv[3]); + update = hashTypeSet(o,c->argv[2],c->argv[3]); addReply(c, update ? shared.czero : shared.cone); server.dirty++; } static void hsetnxCommand(redisClient *c) { robj *o; - if ((o = hashLookupWriteOrCreate(c,c->argv[1])) == NULL) return; - hashTryConversion(o,c->argv,2,3); + if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return; + hashTypeTryConversion(o,c->argv,2,3); - if (hashExists(o, c->argv[2])) { + if (hashTypeExists(o, c->argv[2])) { addReply(c, shared.czero); } else { - hashTryObjectEncoding(o,&c->argv[2], &c->argv[3]); - hashSet(o,c->argv[2],c->argv[3]); + hashTypeTryObjectEncoding(o,&c->argv[2], &c->argv[3]); + hashTypeSet(o,c->argv[2],c->argv[3]); addReply(c, shared.cone); server.dirty++; } @@ -6619,11 +7088,11 @@ static void hmsetCommand(redisClient *c) { return; } - if ((o = hashLookupWriteOrCreate(c,c->argv[1])) == NULL) return; - hashTryConversion(o,c->argv,2,c->argc-1); + if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return; + hashTypeTryConversion(o,c->argv,2,c->argc-1); for (i = 2; i < c->argc; i += 2) { - hashTryObjectEncoding(o,&c->argv[i], &c->argv[i+1]); - hashSet(o,c->argv[i],c->argv[i+1]); + hashTypeTryObjectEncoding(o,&c->argv[i], &c->argv[i+1]); + hashTypeSet(o,c->argv[i],c->argv[i+1]); } addReply(c, shared.ok); server.dirty++; @@ -6634,8 +7103,8 @@ static void hincrbyCommand(redisClient *c) { robj *o, *current, *new; if (getLongLongFromObjectOrReply(c,c->argv[3],&incr,NULL) != REDIS_OK) return; - if ((o = hashLookupWriteOrCreate(c,c->argv[1])) == NULL) return; - if ((current = hashGet(o,c->argv[2])) != NULL) { + if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return; + if ((current = hashTypeGet(o,c->argv[2])) != NULL) { if (getLongLongFromObjectOrReply(c,current,&value, "hash value is not an integer") != REDIS_OK) { decrRefCount(current); @@ -6648,8 +7117,8 @@ static void hincrbyCommand(redisClient *c) { value += incr; new = createStringObjectFromLongLong(value); - hashTryObjectEncoding(o,&c->argv[2],NULL); - hashSet(o,c->argv[2],new); + hashTypeTryObjectEncoding(o,&c->argv[2],NULL); + hashTypeSet(o,c->argv[2],new); decrRefCount(new); addReplyLongLong(c,value); server.dirty++; @@ -6660,7 +7129,7 @@ static void hgetCommand(redisClient *c) { if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL || checkType(c,o,REDIS_HASH)) return; - if ((value = hashGet(o,c->argv[2])) != NULL) { + if ((value = hashTypeGet(o,c->argv[2])) != NULL) { addReplyBulk(c,value); decrRefCount(value); } else { @@ -6681,7 +7150,7 @@ static void hmgetCommand(redisClient *c) { * an empty hash. The reply should then be a series of NULLs. */ addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-2)); for (i = 2; i < c->argc; i++) { - if (o != NULL && (value = hashGet(o,c->argv[i])) != NULL) { + if (o != NULL && (value = hashTypeGet(o,c->argv[i])) != NULL) { addReplyBulk(c,value); decrRefCount(value); } else { @@ -6695,8 +7164,8 @@ static void hdelCommand(redisClient *c) { if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,o,REDIS_HASH)) return; - if (hashDelete(o,c->argv[2])) { - if (hashLength(o) == 0) deleteKey(c->db,c->argv[1]); + if (hashTypeDelete(o,c->argv[2])) { + if (hashTypeLength(o) == 0) dbDelete(c->db,c->argv[1]); addReply(c,shared.cone); server.dirty++; } else { @@ -6709,13 +7178,13 @@ static void hlenCommand(redisClient *c) { if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,o,REDIS_HASH)) return; - addReplyUlong(c,hashLength(o)); + addReplyUlong(c,hashTypeLength(o)); } static void genericHgetallCommand(redisClient *c, int flags) { robj *o, *lenobj, *obj; unsigned long count = 0; - hashIterator *hi; + hashTypeIterator *hi; if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL || checkType(c,o,REDIS_HASH)) return; @@ -6724,22 +7193,22 @@ static void genericHgetallCommand(redisClient *c, int flags) { addReply(c,lenobj); decrRefCount(lenobj); - hi = hashInitIterator(o); - while (hashNext(hi) != REDIS_ERR) { + hi = hashTypeInitIterator(o); + while (hashTypeNext(hi) != REDIS_ERR) { if (flags & REDIS_HASH_KEY) { - obj = hashCurrent(hi,REDIS_HASH_KEY); + obj = hashTypeCurrent(hi,REDIS_HASH_KEY); addReplyBulk(c,obj); decrRefCount(obj); count++; } if (flags & REDIS_HASH_VALUE) { - obj = hashCurrent(hi,REDIS_HASH_VALUE); + obj = hashTypeCurrent(hi,REDIS_HASH_VALUE); addReplyBulk(c,obj); decrRefCount(obj); count++; } } - hashReleaseIterator(hi); + hashTypeReleaseIterator(hi); lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",count); } @@ -6761,7 +7230,7 @@ static void hexistsCommand(redisClient *c) { if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,o,REDIS_HASH)) return; - addReply(c, hashExists(o,c->argv[2]) ? shared.cone : shared.czero); + addReply(c, hashTypeExists(o,c->argv[2]) ? shared.cone : shared.czero); } static void convertToRealHash(robj *o) { @@ -6882,7 +7351,7 @@ static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) { /* Retrieve value from hash by the field name. This operation * already increases the refcount of the returned object. */ initStaticStringObject(fieldobj,((char*)&fieldname)+(sizeof(long)*2)); - o = hashGet(o, &fieldobj); + o = hashTypeGet(o, &fieldobj); } else { if (o->type != REDIS_STRING) return NULL; @@ -6937,7 +7406,7 @@ static int sortCompare(const void *s1, const void *s2) { * is optimized for speed and a bit less for readability */ static void sortCommand(redisClient *c) { list *operations; - int outputlen = 0; + unsigned int outputlen = 0; int desc = 0, alpha = 0; int limit_start = 0, limit_count = -1, start, end; int j, dontsort = 0, vectorlen; @@ -7007,7 +7476,7 @@ static void sortCommand(redisClient *c) { /* Load the sorting vector with all the objects to sort */ switch(sortval->type) { - case REDIS_LIST: vectorlen = listLength((list*)sortval->ptr); break; + case REDIS_LIST: vectorlen = listTypeLength(sortval); break; case REDIS_SET: vectorlen = dictSize((dict*)sortval->ptr); break; case REDIS_ZSET: vectorlen = dictSize(((zset*)sortval->ptr)->dict); break; default: vectorlen = 0; redisPanic("Bad SORT type"); /* Avoid GCC warning */ @@ -7016,18 +7485,15 @@ static void sortCommand(redisClient *c) { j = 0; if (sortval->type == REDIS_LIST) { - list *list = sortval->ptr; - listNode *ln; - listIter li; - - listRewind(list,&li); - while((ln = listNext(&li))) { - robj *ele = ln->value; - vector[j].obj = ele; + listTypeIterator *li = listTypeInitIterator(sortval,0,REDIS_TAIL); + listTypeEntry entry; + while(listTypeNext(li,&entry)) { + vector[j].obj = listTypeGet(&entry); vector[j].u.score = 0; vector[j].u.cmpobj = NULL; j++; } + listTypeReleaseIterator(li); } else { dict *set; dictIterator *di; @@ -7137,8 +7603,7 @@ static void sortCommand(redisClient *c) { } } } else { - robj *listObject = createListObject(); - list *listPtr = (list*) listObject->ptr; + robj *sobj = createZiplistObject(); /* STORE option specified, set the sorting result as a List object */ for (j = start; j <= end; j++) { @@ -7146,33 +7611,30 @@ static void sortCommand(redisClient *c) { listIter li; if (!getop) { - listAddNodeTail(listPtr,vector[j].obj); - incrRefCount(vector[j].obj); - } - listRewind(operations,&li); - while((ln = listNext(&li))) { - redisSortOperation *sop = ln->value; - robj *val = lookupKeyByPattern(c->db,sop->pattern, - vector[j].obj); + listTypePush(sobj,vector[j].obj,REDIS_TAIL); + } else { + listRewind(operations,&li); + while((ln = listNext(&li))) { + redisSortOperation *sop = ln->value; + robj *val = lookupKeyByPattern(c->db,sop->pattern, + vector[j].obj); - if (sop->type == REDIS_SORT_GET) { - if (!val) { - listAddNodeTail(listPtr,createStringObject("",0)); + if (sop->type == REDIS_SORT_GET) { + if (!val) val = createStringObject("",0); + + /* listTypePush does an incrRefCount, so we should take care + * care of the incremented refcount caused by either + * lookupKeyByPattern or createStringObject("",0) */ + listTypePush(sobj,val,REDIS_TAIL); + decrRefCount(val); } else { - /* We should do a incrRefCount on val because it is - * added to the list, but also a decrRefCount because - * it is returned by lookupKeyByPattern. This results - * in doing nothing at all. */ - listAddNodeTail(listPtr,val); + /* always fails */ + redisAssert(sop->type == REDIS_SORT_GET); } - } else { - redisAssert(sop->type == REDIS_SORT_GET); /* always fails */ } } } - if (dictReplace(c->db->dict,storekey,listObject)) { - incrRefCount(storekey); - } + dbReplace(c->db,storekey,sobj); /* Note: we add 1 because the DB is dirty anyway since even if the * SORT result is empty a new key is set and maybe the old content * replaced. */ @@ -7181,6 +7643,9 @@ static void sortCommand(redisClient *c) { } /* Cleanup */ + if (sortval->type == REDIS_LIST) + for (j = 0; j < vectorlen; j++) + decrRefCount(vector[j].obj); decrRefCount(sortval); listRelease(operations); for (j = 0; j < vectorlen; j++) { @@ -7351,7 +7816,7 @@ static void monitorCommand(redisClient *c) { /* ================================= Expire ================================= */ static int removeExpire(redisDb *db, robj *key) { - if (dictDelete(db->expires,key) == DICT_OK) { + if (dictDelete(db->expires,key->ptr) == DICT_OK) { return 1; } else { return 0; @@ -7359,10 +7824,11 @@ static int removeExpire(redisDb *db, robj *key) { } static int setExpire(redisDb *db, robj *key, time_t when) { - if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) { + sds copy = sdsdup(key->ptr); + if (dictAdd(db->expires,copy,(void*)when) == DICT_ERR) { + sdsfree(copy); return 0; } else { - incrRefCount(key); return 1; } } @@ -7374,7 +7840,7 @@ static time_t getExpire(redisDb *db, robj *key) { /* No expire? return ASAP */ if (dictSize(db->expires) == 0 || - (de = dictFind(db->expires,key)) == NULL) return -1; + (de = dictFind(db->expires,key->ptr)) == NULL) return -1; return (time_t) dictGetEntryVal(de); } @@ -7385,16 +7851,16 @@ static int expireIfNeeded(redisDb *db, robj *key) { /* No expire? return ASAP */ if (dictSize(db->expires) == 0 || - (de = dictFind(db->expires,key)) == NULL) return 0; + (de = dictFind(db->expires,key->ptr)) == NULL) return 0; /* Lookup the expire */ when = (time_t) dictGetEntryVal(de); if (time(NULL) <= when) return 0; /* Delete the key */ - dictDelete(db->expires,key); + dbDelete(db,key); server.stat_expiredkeys++; - return dictDelete(db->dict,key) == DICT_OK; + return 1; } static int deleteIfVolatile(redisDb *db, robj *key) { @@ -7402,13 +7868,13 @@ static int deleteIfVolatile(redisDb *db, robj *key) { /* No expire? return ASAP */ if (dictSize(db->expires) == 0 || - (de = dictFind(db->expires,key)) == NULL) return 0; + (de = dictFind(db->expires,key->ptr)) == NULL) return 0; /* Delete the key */ server.dirty++; server.stat_expiredkeys++; - dictDelete(db->expires,key); - return dictDelete(db->dict,key) == DICT_OK; + dictDelete(db->expires,key->ptr); + return dictDelete(db->dict,key->ptr) == DICT_OK; } static void expireGenericCommand(redisClient *c, robj *key, robj *param, long offset) { @@ -7419,13 +7885,13 @@ static void expireGenericCommand(redisClient *c, robj *key, robj *param, long of seconds -= offset; - de = dictFind(c->db->dict,key); + de = dictFind(c->db->dict,key->ptr); if (de == NULL) { addReply(c,shared.czero); return; } if (seconds <= 0) { - if (deleteKey(c->db,key)) server.dirty++; + if (dbDelete(c->db,key)) server.dirty++; addReply(c, shared.cone); return; } else { @@ -7518,6 +7984,7 @@ static void discardCommand(redisClient *c) { freeClientMultiState(c); initClientMultiState(c); c->flags &= (~REDIS_MULTI); + unwatchAllKeys(c); addReply(c,shared.ok); } @@ -8194,7 +8661,7 @@ static void freeMemoryIfNeeded(void) { minttl = t; } } - deleteKey(server.db+j,minkey); + dbDelete(server.db+j,minkey); } } if (!freed) return; /* nothing to free... */ @@ -8203,6 +8670,48 @@ static void freeMemoryIfNeeded(void) { /* ============================== Append Only file ========================== */ +/* Called when the user switches from "appendonly yes" to "appendonly no" + * at runtime using the CONFIG command. */ +static void stopAppendOnly(void) { + flushAppendOnlyFile(); + aof_fsync(server.appendfd); + close(server.appendfd); + + server.appendfd = -1; + server.appendseldb = -1; + server.appendonly = 0; + /* rewrite operation in progress? kill it, wait child exit */ + if (server.bgsavechildpid != -1) { + int statloc; + + if (kill(server.bgsavechildpid,SIGKILL) != -1) + wait3(&statloc,0,NULL); + /* reset the buffer accumulating changes while the child saves */ + sdsfree(server.bgrewritebuf); + server.bgrewritebuf = sdsempty(); + server.bgsavechildpid = -1; + } +} + +/* Called when the user switches from "appendonly no" to "appendonly yes" + * at runtime using the CONFIG command. */ +static int startAppendOnly(void) { + server.appendonly = 1; + server.lastfsync = time(NULL); + server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644); + if (server.appendfd == -1) { + redisLog(REDIS_WARNING,"Used tried to switch on AOF via CONFIG, but I can't open the AOF file: %s",strerror(errno)); + return REDIS_ERR; + } + if (rewriteAppendOnlyFileBackground() == REDIS_ERR) { + server.appendonly = 0; + close(server.appendfd); + redisLog(REDIS_WARNING,"Used tried to switch on AOF via CONFIG, I can't trigger a background AOF rewrite operation. Check the above logs for more info about the error.",strerror(errno)); + return REDIS_ERR; + } + return REDIS_OK; +} + /* Write the append only file buffer on disk. * * Since we are required to write the AOF before replying to the client, @@ -8236,6 +8745,11 @@ static void flushAppendOnlyFile(void) { sdsfree(server.aofbuf); server.aofbuf = sdsempty(); + /* Don't Fsync if no-appendfsync-on-rewrite is set to yes and we have + * childs performing heavy I/O on disk. */ + if (server.no_appendfsync_on_rewrite && + (server.bgrewritechildpid != -1 || server.bgsavechildpid != -1)) + return; /* Fsync if needed */ now = time(NULL); if (server.appendfsync == APPENDFSYNC_ALWAYS || @@ -8362,7 +8876,6 @@ int loadAppendOnlyFile(char *filename) { struct redisClient *fakeClient; FILE *fp = fopen(filename,"r"); struct redis_stat sb; - unsigned long long loadedkeys = 0; int appendonly = server.appendonly; if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) @@ -8385,6 +8898,7 @@ int loadAppendOnlyFile(char *filename) { char buf[128]; sds argsds; struct redisCommand *cmd; + int force_swapout; if (fgets(buf,sizeof(buf),fp) == NULL) { if (feof(fp)) @@ -8425,8 +8939,11 @@ int loadAppendOnlyFile(char *filename) { for (j = 0; j < argc; j++) decrRefCount(argv[j]); zfree(argv); /* Handle swapping while loading big datasets when VM is on */ - loadedkeys++; - if (server.vm_enabled && (loadedkeys % 5000) == 0) { + force_swapout = 0; + if ((zmalloc_used_memory() - server.vm_max_memory) > 1024*1024*32) + force_swapout = 1; + + if (server.vm_enabled && force_swapout) { while (zmalloc_used_memory() > server.vm_max_memory) { if (vmSwapOneObjectBlocking() == REDIS_ERR) break; } @@ -8454,40 +8971,17 @@ fmterr: exit(1); } -/* Write an object into a file in the bulk format $\r\n\r\n */ -static int fwriteBulkObject(FILE *fp, robj *obj) { - char buf[128]; - int decrrc = 0; - - /* Avoid the incr/decr ref count business if possible to help - * copy-on-write (we are often in a child process when this function - * is called). - * Also makes sure that key objects don't get incrRefCount-ed when VM - * is enabled */ - if (obj->encoding != REDIS_ENCODING_RAW) { - obj = getDecodedObject(obj); - decrrc = 1; - } - snprintf(buf,sizeof(buf),"$%ld\r\n",(long)sdslen(obj->ptr)); - if (fwrite(buf,strlen(buf),1,fp) == 0) goto err; - if (sdslen(obj->ptr) && fwrite(obj->ptr,sdslen(obj->ptr),1,fp) == 0) - goto err; - if (fwrite("\r\n",2,1,fp) == 0) goto err; - if (decrrc) decrRefCount(obj); - return 1; -err: - if (decrrc) decrRefCount(obj); - return 0; -} - /* Write binary-safe string into a file in the bulkformat * $\r\n\r\n */ static int fwriteBulkString(FILE *fp, char *s, unsigned long len) { - char buf[128]; - - snprintf(buf,sizeof(buf),"$%ld\r\n",(unsigned long)len); - if (fwrite(buf,strlen(buf),1,fp) == 0) return 0; - if (len && fwrite(s,len,1,fp) == 0) return 0; + char cbuf[128]; + int clen; + cbuf[0] = '$'; + clen = 1+ll2string(cbuf+1,sizeof(cbuf)-1,len); + cbuf[clen++] = '\r'; + cbuf[clen++] = '\n'; + if (fwrite(cbuf,clen,1,fp) == 0) return 0; + if (len > 0 && fwrite(s,len,1,fp) == 0) return 0; if (fwrite("\r\n",2,1,fp) == 0) return 0; return 1; } @@ -8504,16 +8998,28 @@ static int fwriteBulkDouble(FILE *fp, double d) { } /* Write a long value in bulk format $\r\n\r\n */ -static int fwriteBulkLong(FILE *fp, long l) { - char buf[128], lbuf[128]; - - snprintf(lbuf,sizeof(lbuf),"%ld\r\n",l); - snprintf(buf,sizeof(buf),"$%lu\r\n",(unsigned long)strlen(lbuf)-2); - if (fwrite(buf,strlen(buf),1,fp) == 0) return 0; - if (fwrite(lbuf,strlen(lbuf),1,fp) == 0) return 0; +static int fwriteBulkLongLong(FILE *fp, long long l) { + char bbuf[128], lbuf[128]; + unsigned int blen, llen; + llen = ll2string(lbuf,32,l); + blen = snprintf(bbuf,sizeof(bbuf),"$%u\r\n%s\r\n",llen,lbuf); + if (fwrite(bbuf,blen,1,fp) == 0) return 0; return 1; } +/* Delegate writing an object to writing a bulk string or bulk long long. */ +static int fwriteBulkObject(FILE *fp, robj *obj) { + /* Avoid using getDecodedObject to help copy-on-write (we are often + * in a child process when this function is called). */ + if (obj->encoding == REDIS_ENCODING_INT) { + return fwriteBulkLongLong(fp,(long)obj->ptr); + } else if (obj->encoding == REDIS_ENCODING_RAW) { + return fwriteBulkString(fp,obj->ptr,sdslen(obj->ptr)); + } else { + redisPanic("Unknown string encoding"); + } +} + /* Write a sequence of commands able to fully rebuild the dataset into * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */ static int rewriteAppendOnlyFile(char *filename) { @@ -8545,28 +9051,30 @@ static int rewriteAppendOnlyFile(char *filename) { /* SELECT the new DB */ if (fwrite(selectcmd,sizeof(selectcmd)-1,1,fp) == 0) goto werr; - if (fwriteBulkLong(fp,j) == 0) goto werr; + if (fwriteBulkLongLong(fp,j) == 0) goto werr; /* Iterate this DB writing every entry */ while((de = dictNext(di)) != NULL) { - robj *key, *o; + sds keystr = dictGetEntryKey(de); + robj key, *o; time_t expiretime; int swapped; - key = dictGetEntryKey(de); + keystr = dictGetEntryKey(de); + o = dictGetEntryVal(de); + initStaticStringObject(key,keystr); /* If the value for this key is swapped, load a preview in memory. * We use a "swapped" flag to remember if we need to free the * value object instead to just increment the ref count anyway * in order to avoid copy-on-write of pages if we are forked() */ - if (!server.vm_enabled || key->storage == REDIS_VM_MEMORY || - key->storage == REDIS_VM_SWAPPING) { - o = dictGetEntryVal(de); + if (!server.vm_enabled || o->storage == REDIS_VM_MEMORY || + o->storage == REDIS_VM_SWAPPING) { swapped = 0; } else { - o = vmPreviewObject(key); + o = vmPreviewObject(o); swapped = 1; } - expiretime = getExpire(db,key); + expiretime = getExpire(db,&key); /* Save the key and associated value */ if (o->type == REDIS_STRING) { @@ -8574,22 +9082,45 @@ static int rewriteAppendOnlyFile(char *filename) { char cmd[]="*3\r\n$3\r\nSET\r\n"; if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; /* Key and value */ - if (fwriteBulkObject(fp,key) == 0) goto werr; + if (fwriteBulkObject(fp,&key) == 0) goto werr; if (fwriteBulkObject(fp,o) == 0) goto werr; } else if (o->type == REDIS_LIST) { /* Emit the RPUSHes needed to rebuild the list */ - list *list = o->ptr; - listNode *ln; - listIter li; + char cmd[]="*3\r\n$5\r\nRPUSH\r\n"; + if (o->encoding == REDIS_ENCODING_ZIPLIST) { + unsigned char *zl = o->ptr; + unsigned char *p = ziplistIndex(zl,0); + unsigned char *vstr; + unsigned int vlen; + long long vlong; + + while(ziplistGet(p,&vstr,&vlen,&vlong)) { + if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; + if (fwriteBulkObject(fp,&key) == 0) goto werr; + if (vstr) { + if (fwriteBulkString(fp,(char*)vstr,vlen) == 0) + goto werr; + } else { + if (fwriteBulkLongLong(fp,vlong) == 0) + goto werr; + } + p = ziplistNext(zl,p); + } + } else if (o->encoding == REDIS_ENCODING_LIST) { + list *list = o->ptr; + listNode *ln; + listIter li; - listRewind(list,&li); - while((ln = listNext(&li))) { - char cmd[]="*3\r\n$5\r\nRPUSH\r\n"; - robj *eleobj = listNodeValue(ln); + listRewind(list,&li); + while((ln = listNext(&li))) { + robj *eleobj = listNodeValue(ln); - if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; - if (fwriteBulkObject(fp,key) == 0) goto werr; - if (fwriteBulkObject(fp,eleobj) == 0) goto werr; + if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; + if (fwriteBulkObject(fp,&key) == 0) goto werr; + if (fwriteBulkObject(fp,eleobj) == 0) goto werr; + } + } else { + redisPanic("Unknown list encoding"); } } else if (o->type == REDIS_SET) { /* Emit the SADDs needed to rebuild the set */ @@ -8602,7 +9133,7 @@ static int rewriteAppendOnlyFile(char *filename) { robj *eleobj = dictGetEntryKey(de); if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; - if (fwriteBulkObject(fp,key) == 0) goto werr; + if (fwriteBulkObject(fp,&key) == 0) goto werr; if (fwriteBulkObject(fp,eleobj) == 0) goto werr; } dictReleaseIterator(di); @@ -8618,7 +9149,7 @@ static int rewriteAppendOnlyFile(char *filename) { double *score = dictGetEntryVal(de); if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; - if (fwriteBulkObject(fp,key) == 0) goto werr; + if (fwriteBulkObject(fp,&key) == 0) goto werr; if (fwriteBulkDouble(fp,*score) == 0) goto werr; if (fwriteBulkObject(fp,eleobj) == 0) goto werr; } @@ -8634,7 +9165,7 @@ static int rewriteAppendOnlyFile(char *filename) { while((p = zipmapNext(p,&field,&flen,&val,&vlen)) != NULL) { if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; - if (fwriteBulkObject(fp,key) == 0) goto werr; + if (fwriteBulkObject(fp,&key) == 0) goto werr; if (fwriteBulkString(fp,(char*)field,flen) == -1) return -1; if (fwriteBulkString(fp,(char*)val,vlen) == -1) @@ -8649,7 +9180,7 @@ static int rewriteAppendOnlyFile(char *filename) { robj *val = dictGetEntryVal(de); if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; - if (fwriteBulkObject(fp,key) == 0) goto werr; + if (fwriteBulkObject(fp,&key) == 0) goto werr; if (fwriteBulkObject(fp,field) == -1) return -1; if (fwriteBulkObject(fp,val) == -1) return -1; } @@ -8664,8 +9195,8 @@ static int rewriteAppendOnlyFile(char *filename) { /* If this key is already expired skip it */ if (expiretime < now) continue; if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; - if (fwriteBulkObject(fp,key) == 0) goto werr; - if (fwriteBulkLong(fp,expiretime) == 0) goto werr; + if (fwriteBulkObject(fp,&key) == 0) goto werr; + if (fwriteBulkLongLong(fp,expiretime) == 0) goto werr; } if (swapped) decrRefCount(o); } @@ -8674,7 +9205,7 @@ static int rewriteAppendOnlyFile(char *filename) { /* Make sure data will not remain on the OS's output buffers */ fflush(fp); - fsync(fileno(fp)); + aof_fsync(fileno(fp)); fclose(fp); /* Use RENAME to make sure the DB file is changed atomically only @@ -8787,50 +9318,19 @@ static void aofRemoveTempFile(pid_t childpid) { * as a fully non-blocking VM. */ -/* Called when the user switches from "appendonly yes" to "appendonly no" - * at runtime using the CONFIG command. */ -static void stopAppendOnly(void) { - flushAppendOnlyFile(); - fsync(server.appendfd); - close(server.appendfd); - - server.appendfd = -1; - server.appendseldb = -1; - server.appendonly = 0; - /* rewrite operation in progress? kill it, wait child exit */ - if (server.bgsavechildpid != -1) { - int statloc; +/* =================== Virtual Memory - Blocking Side ====================== */ - if (kill(server.bgsavechildpid,SIGKILL) != -1) - wait3(&statloc,0,NULL); - /* reset the buffer accumulating changes while the child saves */ - sdsfree(server.bgrewritebuf); - server.bgrewritebuf = sdsempty(); - server.bgsavechildpid = -1; - } -} +/* Create a VM pointer object. This kind of objects are used in place of + * values in the key -> value hash table, for swapped out objects. */ +static vmpointer *createVmPointer(int vtype) { + vmpointer *vp = zmalloc(sizeof(vmpointer)); -/* Called when the user switches from "appendonly no" to "appendonly yes" - * at runtime using the CONFIG command. */ -static int startAppendOnly(void) { - server.appendonly = 1; - server.lastfsync = time(NULL); - server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644); - if (server.appendfd == -1) { - redisLog(REDIS_WARNING,"Used tried to switch on AOF via CONFIG, but I can't open the AOF file: %s",strerror(errno)); - return REDIS_ERR; - } - if (rewriteAppendOnlyFileBackground() == REDIS_ERR) { - server.appendonly = 0; - close(server.appendfd); - redisLog(REDIS_WARNING,"Used tried to switch on AOF via CONFIG, I can't trigger a background AOF rewrite operation. Check the above logs for more info about the error.",strerror(errno)); - return REDIS_ERR; - } - return REDIS_OK; + vp->type = REDIS_VMPOINTER; + vp->storage = REDIS_VM_SWAPPED; + vp->vtype = vtype; + return vp; } -/* =================== Virtual Memory - Blocking Side ====================== */ - static void vmInit(void) { off_t totsize; int pipefds[2]; @@ -9045,30 +9545,33 @@ static int vmWriteObjectOnSwap(robj *o, off_t page) { return REDIS_OK; } -/* Swap the 'val' object relative to 'key' into disk. Store all the information - * needed to later retrieve the object into the key object. +/* Transfers the 'val' object to disk. Store all the information + * a 'vmpointer' object containing all the information needed to load the + * object back later is returned. + * * If we can't find enough contiguous empty pages to swap the object on disk - * REDIS_ERR is returned. */ -static int vmSwapObjectBlocking(robj *key, robj *val) { + * NULL is returned. */ +static vmpointer *vmSwapObjectBlocking(robj *val) { off_t pages = rdbSavedObjectPages(val,NULL); off_t page; + vmpointer *vp; + + assert(val->storage == REDIS_VM_MEMORY); + assert(val->refcount == 1); + if (vmFindContiguousPages(&page,pages) == REDIS_ERR) return NULL; + if (vmWriteObjectOnSwap(val,page) == REDIS_ERR) return NULL; - assert(key->storage == REDIS_VM_MEMORY); - assert(key->refcount == 1); - if (vmFindContiguousPages(&page,pages) == REDIS_ERR) return REDIS_ERR; - if (vmWriteObjectOnSwap(val,page) == REDIS_ERR) return REDIS_ERR; - key->vm.page = page; - key->vm.usedpages = pages; - key->storage = REDIS_VM_SWAPPED; - key->vtype = val->type; + vp = createVmPointer(val->type); + vp->page = page; + vp->usedpages = pages; decrRefCount(val); /* Deallocate the object from memory. */ vmMarkPagesUsed(page,pages); - redisLog(REDIS_DEBUG,"VM: object %s swapped out at %lld (%lld pages)", - (unsigned char*) key->ptr, + redisLog(REDIS_DEBUG,"VM: object %p swapped out at %lld (%lld pages)", + (void*) val, (unsigned long long) page, (unsigned long long) pages); server.vm_stats_swapped_objects++; server.vm_stats_swapouts++; - return REDIS_OK; + return vp; } static robj *vmReadObjectFromSwap(off_t page, int type) { @@ -9090,46 +9593,47 @@ static robj *vmReadObjectFromSwap(off_t page, int type) { return o; } -/* Load the value object relative to the 'key' object from swap to memory. +/* Load the specified object from swap to memory. * The newly allocated object is returned. * * If preview is true the unserialized object is returned to the caller but - * no changes are made to the key object, nor the pages are marked as freed */ -static robj *vmGenericLoadObject(robj *key, int preview) { + * the pages are not marked as freed, nor the vp object is freed. */ +static robj *vmGenericLoadObject(vmpointer *vp, int preview) { robj *val; - redisAssert(key->storage == REDIS_VM_SWAPPED || key->storage == REDIS_VM_LOADING); - val = vmReadObjectFromSwap(key->vm.page,key->vtype); + redisAssert(vp->type == REDIS_VMPOINTER && + (vp->storage == REDIS_VM_SWAPPED || vp->storage == REDIS_VM_LOADING)); + val = vmReadObjectFromSwap(vp->page,vp->vtype); if (!preview) { - key->storage = REDIS_VM_MEMORY; - key->vm.atime = server.unixtime; - vmMarkPagesFree(key->vm.page,key->vm.usedpages); - redisLog(REDIS_DEBUG, "VM: object %s loaded from disk", - (unsigned char*) key->ptr); + redisLog(REDIS_DEBUG, "VM: object %p loaded from disk", (void*)vp); + vmMarkPagesFree(vp->page,vp->usedpages); + zfree(vp); server.vm_stats_swapped_objects--; } else { - redisLog(REDIS_DEBUG, "VM: object %s previewed from disk", - (unsigned char*) key->ptr); + redisLog(REDIS_DEBUG, "VM: object %p previewed from disk", (void*)vp); } server.vm_stats_swapins++; return val; } -/* Plain object loading, from swap to memory */ -static robj *vmLoadObject(robj *key) { +/* Plain object loading, from swap to memory. + * + * 'o' is actually a redisVmPointer structure that will be freed by the call. + * The return value is the loaded object. */ +static robj *vmLoadObject(robj *o) { /* If we are loading the object in background, stop it, we * need to load this object synchronously ASAP. */ - if (key->storage == REDIS_VM_LOADING) - vmCancelThreadedIOJob(key); - return vmGenericLoadObject(key,0); + if (o->storage == REDIS_VM_LOADING) + vmCancelThreadedIOJob(o); + return vmGenericLoadObject((vmpointer*)o,0); } /* Just load the value on disk, without to modify the key. * This is useful when we want to perform some operation on the value * without to really bring it from swap to memory, like while saving the * dataset or rewriting the append only log. */ -static robj *vmPreviewObject(robj *key) { - return vmGenericLoadObject(key,1); +static robj *vmPreviewObject(robj *o) { + return vmGenericLoadObject((vmpointer*)o,1); } /* How a good candidate is this object for swapping? @@ -9144,14 +9648,16 @@ static robj *vmPreviewObject(robj *key) { * proportionally, this is why we use the logarithm. This algorithm is * just a first try and will probably be tuned later. */ static double computeObjectSwappability(robj *o) { - time_t age = server.unixtime - o->vm.atime; + /* actual age can be >= minage, but not < minage. As we use wrapping + * 21 bit clocks with minutes resolution for the LRU. */ + time_t minage = abs(server.lruclock - o->lru); long asize = 0; list *l; dict *d; struct dictEntry *de; int z; - if (age <= 0) return 0; + if (minage <= 0) return 0; switch(o->type) { case REDIS_STRING: if (o->encoding != REDIS_ENCODING_RAW) { @@ -9170,8 +9676,7 @@ static double computeObjectSwappability(robj *o) { long elesize; elesize = (ele->encoding == REDIS_ENCODING_RAW) ? - (sizeof(*o)+sdslen(ele->ptr)) : - sizeof(*o); + (sizeof(*o)+sdslen(ele->ptr)) : sizeof(*o); asize += (sizeof(listNode)+elesize)*listLength(l); } break; @@ -9189,8 +9694,7 @@ static double computeObjectSwappability(robj *o) { de = dictGetRandomKey(d); ele = dictGetEntryKey(de); elesize = (ele->encoding == REDIS_ENCODING_RAW) ? - (sizeof(*o)+sdslen(ele->ptr)) : - sizeof(*o); + (sizeof(*o)+sdslen(ele->ptr)) : sizeof(*o); asize += (sizeof(struct dictEntry)+elesize)*dictSize(d); if (z) asize += sizeof(zskiplistNode)*dictSize(d); } @@ -9217,18 +9721,16 @@ static double computeObjectSwappability(robj *o) { de = dictGetRandomKey(d); ele = dictGetEntryKey(de); elesize = (ele->encoding == REDIS_ENCODING_RAW) ? - (sizeof(*o)+sdslen(ele->ptr)) : - sizeof(*o); + (sizeof(*o)+sdslen(ele->ptr)) : sizeof(*o); ele = dictGetEntryVal(de); elesize = (ele->encoding == REDIS_ENCODING_RAW) ? - (sizeof(*o)+sdslen(ele->ptr)) : - sizeof(*o); + (sizeof(*o)+sdslen(ele->ptr)) : sizeof(*o); asize += (sizeof(struct dictEntry)+elesize)*dictSize(d); } } break; } - return (double)age*log(1+asize); + return (double)minage*log(1+asize); } /* Try to swap an object that's a good candidate for swapping. @@ -9242,7 +9744,8 @@ static int vmSwapOneObject(int usethreads) { struct dictEntry *best = NULL; double best_swappability = 0; redisDb *best_db = NULL; - robj *key, *val; + robj *val; + sds key; for (j = 0; j < server.dbnum; j++) { redisDb *db = server.db+j; @@ -9258,16 +9761,14 @@ static int vmSwapOneObject(int usethreads) { if (maxtries) maxtries--; de = dictGetRandomKey(db->dict); - key = dictGetEntryKey(de); val = dictGetEntryVal(de); /* Only swap objects that are currently in memory. * - * Also don't swap shared objects if threaded VM is on, as we - * try to ensure that the main thread does not touch the + * Also don't swap shared objects: not a good idea in general and + * we need to ensure that the main thread does not touch the * object while the I/O thread is using it, but we can't * control other keys without adding additional mutex. */ - if (key->storage != REDIS_VM_MEMORY || - (server.vm_max_threads != 0 && val->refcount != 1)) { + if (val->storage != REDIS_VM_MEMORY || val->refcount != 1) { if (maxtries) i--; /* don't count this try */ continue; } @@ -9284,21 +9785,19 @@ static int vmSwapOneObject(int usethreads) { val = dictGetEntryVal(best); redisLog(REDIS_DEBUG,"Key with best swappability: %s, %f", - key->ptr, best_swappability); + key, best_swappability); - /* Unshare the key if needed */ - if (key->refcount > 1) { - robj *newkey = dupStringObject(key); - decrRefCount(key); - key = dictGetEntryKey(best) = newkey; - } /* Swap it */ if (usethreads) { - vmSwapObjectThreaded(key,val,best_db); + robj *keyobj = createStringObject(key,sdslen(key)); + vmSwapObjectThreaded(keyobj,val,best_db); + decrRefCount(keyobj); return REDIS_OK; } else { - if (vmSwapObjectBlocking(key,val) == REDIS_OK) { - dictGetEntryVal(best) = NULL; + vmpointer *vp; + + if ((vp = vmSwapObjectBlocking(val)) != NULL) { + dictGetEntryVal(best) = vp; return REDIS_OK; } else { return REDIS_ERR; @@ -9321,30 +9820,20 @@ static int vmCanSwapOut(void) { return (server.bgsavechildpid == -1 && server.bgrewritechildpid == -1); } -/* Delete a key if swapped. Returns 1 if the key was found, was swapped - * and was deleted. Otherwise 0 is returned. */ -static int deleteIfSwapped(redisDb *db, robj *key) { - dictEntry *de; - robj *foundkey; - - if ((de = dictFind(db->dict,key)) == NULL) return 0; - foundkey = dictGetEntryKey(de); - if (foundkey->storage == REDIS_VM_MEMORY) return 0; - deleteKey(db,key); - return 1; -} - /* =================== Virtual Memory - Threaded I/O ======================= */ static void freeIOJob(iojob *j) { if ((j->type == REDIS_IOJOB_PREPARE_SWAP || j->type == REDIS_IOJOB_DO_SWAP || j->type == REDIS_IOJOB_LOAD) && j->val != NULL) + { + /* we fix the storage type, otherwise decrRefCount() will try to + * kill the I/O thread Job (that does no longer exists). */ + if (j->val->storage == REDIS_VM_SWAPPING) + j->val->storage = REDIS_VM_MEMORY; decrRefCount(j->val); - /* We don't decrRefCount the j->key field as we did't incremented - * the count creating IO Jobs. This is because the key field here is - * just used as an indentifier and if a key is removed the Job should - * never be touched again. */ + } + decrRefCount(j->key); zfree(j); } @@ -9365,7 +9854,6 @@ static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, while((retval = read(fd,buf,1)) == 1) { iojob *j; listNode *ln; - robj *key; struct dictEntry *de; redisLog(REDIS_DEBUG,"Processing I/O completed job"); @@ -9388,27 +9876,26 @@ static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, } /* Post process it in the main thread, as there are things we * can do just here to avoid race conditions and/or invasive locks */ - redisLog(REDIS_DEBUG,"Job %p type: %d, key at %p (%s) refcount: %d\n", (void*) j, j->type, (void*)j->key, (char*)j->key->ptr, j->key->refcount); - de = dictFind(j->db->dict,j->key); - assert(de != NULL); - key = dictGetEntryKey(de); + redisLog(REDIS_DEBUG,"COMPLETED Job type: %d, ID %p, key: %s", j->type, (void*)j->id, (unsigned char*)j->key->ptr); + de = dictFind(j->db->dict,j->key->ptr); + redisAssert(de != NULL); if (j->type == REDIS_IOJOB_LOAD) { redisDb *db; + vmpointer *vp = dictGetEntryVal(de); /* Key loaded, bring it at home */ - key->storage = REDIS_VM_MEMORY; - key->vm.atime = server.unixtime; - vmMarkPagesFree(key->vm.page,key->vm.usedpages); + vmMarkPagesFree(vp->page,vp->usedpages); redisLog(REDIS_DEBUG, "VM: object %s loaded from disk (threaded)", - (unsigned char*) key->ptr); + (unsigned char*) j->key->ptr); server.vm_stats_swapped_objects--; server.vm_stats_swapins++; dictGetEntryVal(de) = j->val; incrRefCount(j->val); db = j->db; - freeIOJob(j); /* Handle clients waiting for this key to be loaded. */ - handleClientsBlockedOnSwappedKey(db,key); + handleClientsBlockedOnSwappedKey(db,j->key); + freeIOJob(j); + zfree(vp); } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) { /* Now we know the amount of pages required to swap this object. * Let's find some space for it, and queue this task again @@ -9418,8 +9905,8 @@ static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, { /* Ooops... no space or we can't swap as there is * a fork()ed Redis trying to save stuff on disk. */ + j->val->storage = REDIS_VM_MEMORY; /* undo operation */ freeIOJob(j); - key->storage = REDIS_VM_MEMORY; /* undo operation */ } else { /* Note that we need to mark this pages as used now, * if the job will be canceled, we'll mark them as freed @@ -9431,28 +9918,29 @@ static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, unlockThreadedIO(); } } else if (j->type == REDIS_IOJOB_DO_SWAP) { - robj *val; + vmpointer *vp; /* Key swapped. We can finally free some memory. */ - if (key->storage != REDIS_VM_SWAPPING) { - printf("key->storage: %d\n",key->storage); - printf("key->name: %s\n",(char*)key->ptr); - printf("key->refcount: %d\n",key->refcount); + if (j->val->storage != REDIS_VM_SWAPPING) { + vmpointer *vp = (vmpointer*) j->id; + printf("storage: %d\n",vp->storage); + printf("key->name: %s\n",(char*)j->key->ptr); printf("val: %p\n",(void*)j->val); printf("val->type: %d\n",j->val->type); printf("val->ptr: %s\n",(char*)j->val->ptr); } - redisAssert(key->storage == REDIS_VM_SWAPPING); - val = dictGetEntryVal(de); - key->vm.page = j->page; - key->vm.usedpages = j->pages; - key->storage = REDIS_VM_SWAPPED; - key->vtype = j->val->type; - decrRefCount(val); /* Deallocate the object from memory. */ - dictGetEntryVal(de) = NULL; + redisAssert(j->val->storage == REDIS_VM_SWAPPING); + vp = createVmPointer(j->val->type); + vp->page = j->page; + vp->usedpages = j->pages; + dictGetEntryVal(de) = vp; + /* Fix the storage otherwise decrRefCount will attempt to + * remove the associated I/O job */ + j->val->storage = REDIS_VM_MEMORY; + decrRefCount(j->val); redisLog(REDIS_DEBUG, "VM: object %s swapped out at %lld (%lld pages) (threaded)", - (unsigned char*) key->ptr, + (unsigned char*) j->key->ptr, (unsigned long long) j->page, (unsigned long long) j->pages); server.vm_stats_swapped_objects++; server.vm_stats_swapouts++; @@ -9507,7 +9995,7 @@ static void vmCancelThreadedIOJob(robj *o) { assert(o->storage == REDIS_VM_LOADING || o->storage == REDIS_VM_SWAPPING); again: lockThreadedIO(); - /* Search for a matching key in one of the queues */ + /* Search for a matching object in one of the queues */ for (i = 0; i < 3; i++) { listNode *ln; listIter li; @@ -9517,9 +10005,9 @@ again: iojob *job = ln->value; if (job->canceled) continue; /* Skip this, already canceled. */ - if (job->key == o) { - redisLog(REDIS_DEBUG,"*** CANCELED %p (%s) (type %d) (LIST ID %d)\n", - (void*)job, (char*)o->ptr, job->type, i); + if (job->id == o) { + redisLog(REDIS_DEBUG,"*** CANCELED %p (key %s) (type %d) (LIST ID %d)\n", + (void*)job, (char*)job->key->ptr, job->type, i); /* Mark the pages as free since the swap didn't happened * or happened but is now discarded. */ if (i != 1 && job->type == REDIS_IOJOB_DO_SWAP) @@ -9567,12 +10055,14 @@ again: else if (o->storage == REDIS_VM_SWAPPING) o->storage = REDIS_VM_MEMORY; unlockThreadedIO(); + redisLog(REDIS_DEBUG,"*** DONE"); return; } } } unlockThreadedIO(); - assert(1 != 1); /* We should never reach this */ + printf("Not found: %p\n", (void*)o); + redisAssert(1 != 1); /* We should never reach this */ } static void *IOThreadEntryPoint(void *arg) { @@ -9605,7 +10095,8 @@ static void *IOThreadEntryPoint(void *arg) { /* Process the Job */ if (j->type == REDIS_IOJOB_LOAD) { - j->val = vmReadObjectFromSwap(j->page,j->key->vtype); + vmpointer *vp = (vmpointer*)j->id; + j->val = vmReadObjectFromSwap(j->page,vp->vtype); } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) { FILE *fp = fopen("/dev/null","w+"); j->pages = rdbSavedObjectPages(j->val,fp); @@ -9701,18 +10192,16 @@ static void queueIOJob(iojob *j) { static int vmSwapObjectThreaded(robj *key, robj *val, redisDb *db) { iojob *j; - assert(key->storage == REDIS_VM_MEMORY); - assert(key->refcount == 1); - j = zmalloc(sizeof(*j)); j->type = REDIS_IOJOB_PREPARE_SWAP; j->db = db; j->key = key; - j->val = val; + incrRefCount(key); + j->id = j->val = val; incrRefCount(val); j->canceled = 0; j->thread = (pthread_t) -1; - key->storage = REDIS_VM_SWAPPING; + val->storage = REDIS_VM_SWAPPING; lockThreadedIO(); queueIOJob(j); @@ -9734,9 +10223,9 @@ static int waitForSwappedKey(redisClient *c, robj *key) { /* If the key does not exist or is already in RAM we don't need to * block the client at all. */ - de = dictFind(c->db->dict,key); + de = dictFind(c->db->dict,key->ptr); if (de == NULL) return 0; - o = dictGetEntryKey(de); + o = dictGetEntryVal(de); if (o->storage == REDIS_VM_MEMORY) { return 0; } else if (o->storage == REDIS_VM_SWAPPING) { @@ -9770,14 +10259,16 @@ static int waitForSwappedKey(redisClient *c, robj *key) { /* Are we already loading the key from disk? If not create a job */ if (o->storage == REDIS_VM_SWAPPED) { iojob *j; + vmpointer *vp = (vmpointer*)o; o->storage = REDIS_VM_LOADING; j = zmalloc(sizeof(*j)); j->type = REDIS_IOJOB_LOAD; j->db = c->db; - j->key = o; - j->key->vtype = o->vtype; - j->page = o->vm.page; + j->id = (robj*)vp; + j->key = key; + incrRefCount(key); + j->page = vp->page; j->val = NULL; j->canceled = 0; j->thread = (pthread_t) -1; @@ -9902,6 +10393,8 @@ static int dontWaitForSwappedKey(redisClient *c, robj *key) { return listLength(c->io_keys) == 0; } +/* Every time we now a key was loaded back in memory, we handle clients + * waiting for this key if any. */ static void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key) { struct dictEntry *de; list *l; @@ -9960,6 +10453,11 @@ static void configSetCommand(redisClient *c) { } else { goto badfmt; } + } else if (!strcasecmp(c->argv[2]->ptr,"no-appendfsync-on-rewrite")) { + int yn = yesnotoi(o->ptr); + + if (yn == -1) goto badfmt; + server.no_appendfsync_on_rewrite = yn; } else if (!strcasecmp(c->argv[2]->ptr,"appendonly")) { int old = server.appendonly; int new = yesnotoi(o->ptr); @@ -10075,6 +10573,11 @@ static void configGetCommand(redisClient *c) { addReplyBulkCString(c,server.appendonly ? "yes" : "no"); matches++; } + if (stringmatch(pattern,"no-appendfsync-on-rewrite",0)) { + addReplyBulkCString(c,"no-appendfsync-on-rewrite"); + addReplyBulkCString(c,server.no_appendfsync_on_rewrite ? "yes" : "no"); + matches++; + } if (stringmatch(pattern,"appendfsync",0)) { char *policy; @@ -10510,7 +11013,7 @@ static void touchWatchedKeysOnFlush(int dbid) { * key exists, mark the client as dirty, as the key will be * removed. */ if (dbid == -1 || wk->db->id == dbid) { - if (dictFind(wk->db->dict, wk->key) != NULL) + if (dictFind(wk->db->dict, wk->key->ptr) != NULL) c->flags |= REDIS_DIRTY_CAS; } } @@ -10621,42 +11124,35 @@ static void computeDatasetDigest(unsigned char *final) { /* Iterate this DB writing every entry */ while((de = dictNext(di)) != NULL) { - robj *key, *o, *kcopy; + sds key; + robj *keyobj, *o; time_t expiretime; memset(digest,0,20); /* This key-val digest */ key = dictGetEntryKey(de); + keyobj = createStringObject(key,sdslen(key)); + + mixDigest(digest,key,sdslen(key)); + + /* Make sure the key is loaded if VM is active */ + o = lookupKeyRead(db,keyobj); - if (!server.vm_enabled) { - mixObjectDigest(digest,key); - o = dictGetEntryVal(de); - } else { - /* Don't work with the key directly as when VM is active - * this is unsafe: TODO: fix decrRefCount to check if the - * count really reached 0 to avoid this mess */ - kcopy = dupStringObject(key); - mixObjectDigest(digest,kcopy); - o = lookupKeyRead(db,kcopy); - decrRefCount(kcopy); - } aux = htonl(o->type); mixDigest(digest,&aux,sizeof(aux)); - expiretime = getExpire(db,key); + expiretime = getExpire(db,keyobj); /* Save the key and associated value */ if (o->type == REDIS_STRING) { mixObjectDigest(digest,o); } else if (o->type == REDIS_LIST) { - list *list = o->ptr; - listNode *ln; - listIter li; - - listRewind(list,&li); - while((ln = listNext(&li))) { - robj *eleobj = listNodeValue(ln); - + listTypeIterator *li = listTypeInitIterator(o,0,REDIS_TAIL); + listTypeEntry entry; + while(listTypeNext(li,&entry)) { + robj *eleobj = listTypeGet(&entry); mixObjectDigest(digest,eleobj); + decrRefCount(eleobj); } + listTypeReleaseIterator(li); } else if (o->type == REDIS_SET) { dict *set = o->ptr; dictIterator *di = dictGetIterator(set); @@ -10686,23 +11182,23 @@ static void computeDatasetDigest(unsigned char *final) { } dictReleaseIterator(di); } else if (o->type == REDIS_HASH) { - hashIterator *hi; + hashTypeIterator *hi; robj *obj; - hi = hashInitIterator(o); - while (hashNext(hi) != REDIS_ERR) { + hi = hashTypeInitIterator(o); + while (hashTypeNext(hi) != REDIS_ERR) { unsigned char eledigest[20]; memset(eledigest,0,20); - obj = hashCurrent(hi,REDIS_HASH_KEY); + obj = hashTypeCurrent(hi,REDIS_HASH_KEY); mixObjectDigest(eledigest,obj); decrRefCount(obj); - obj = hashCurrent(hi,REDIS_HASH_VALUE); + obj = hashTypeCurrent(hi,REDIS_HASH_VALUE); mixObjectDigest(eledigest,obj); decrRefCount(obj); xorDigest(digest,eledigest,20); } - hashReleaseIterator(hi); + hashTypeReleaseIterator(hi); } else { redisPanic("Unknown object type"); } @@ -10710,6 +11206,7 @@ static void computeDatasetDigest(unsigned char *final) { if (expiretime != -1) xorDigest(digest,"!!expire!!",10); /* We can finally xor the key-val digest to the final digest */ xorDigest(final,digest,20); + decrRefCount(keyobj); } dictReleaseIterator(di); } @@ -10739,17 +11236,16 @@ static void debugCommand(redisClient *c) { redisLog(REDIS_WARNING,"Append Only File loaded by DEBUG LOADAOF"); addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) { - dictEntry *de = dictFind(c->db->dict,c->argv[2]); - robj *key, *val; + dictEntry *de = dictFind(c->db->dict,c->argv[2]->ptr); + robj *val; if (!de) { addReply(c,shared.nokeyerr); return; } - key = dictGetEntryKey(de); val = dictGetEntryVal(de); - if (!server.vm_enabled || (key->storage == REDIS_VM_MEMORY || - key->storage == REDIS_VM_SWAPPING)) { + if (!server.vm_enabled || (val->storage == REDIS_VM_MEMORY || + val->storage == REDIS_VM_SWAPPING)) { char *strenc; char buf[128]; @@ -10760,23 +11256,25 @@ static void debugCommand(redisClient *c) { strenc = buf; } addReplySds(c,sdscatprintf(sdsempty(), - "+Key at:%p refcount:%d, value at:%p refcount:%d " + "+Value at:%p refcount:%d " "encoding:%s serializedlength:%lld\r\n", - (void*)key, key->refcount, (void*)val, val->refcount, + (void*)val, val->refcount, strenc, (long long) rdbSavedObjectLen(val,NULL))); } else { + vmpointer *vp = (vmpointer*) val; addReplySds(c,sdscatprintf(sdsempty(), - "+Key at:%p refcount:%d, value swapped at: page %llu " + "+Value swapped at: page %llu " "using %llu pages\r\n", - (void*)key, key->refcount, (unsigned long long) key->vm.page, - (unsigned long long) key->vm.usedpages)); + (unsigned long long) vp->page, + (unsigned long long) vp->usedpages)); } } else if (!strcasecmp(c->argv[1]->ptr,"swapin") && c->argc == 3) { lookupKeyRead(c->db,c->argv[2]); addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"swapout") && c->argc == 3) { - dictEntry *de = dictFind(c->db->dict,c->argv[2]); - robj *key, *val; + dictEntry *de = dictFind(c->db->dict,c->argv[2]->ptr); + robj *val; + vmpointer *vp; if (!server.vm_enabled) { addReplySds(c,sdsnew("-ERR Virtual Memory is disabled\r\n")); @@ -10786,19 +11284,14 @@ static void debugCommand(redisClient *c) { addReply(c,shared.nokeyerr); return; } - key = dictGetEntryKey(de); val = dictGetEntryVal(de); - /* If the key is shared we want to create a copy */ - if (key->refcount > 1) { - robj *newkey = dupStringObject(key); - decrRefCount(key); - key = dictGetEntryKey(de) = newkey; - } /* Swap it */ - if (key->storage != REDIS_VM_MEMORY) { + if (val->storage != REDIS_VM_MEMORY) { addReplySds(c,sdsnew("-ERR This key is not in memory\r\n")); - } else if (vmSwapObjectBlocking(key,val) == REDIS_OK) { - dictGetEntryVal(de) = NULL; + } else if (val->refcount != 1) { + addReplySds(c,sdsnew("-ERR Object is shared\r\n")); + } else if ((vp = vmSwapObjectBlocking(val)) != NULL) { + dictGetEntryVal(de) = vp; addReply(c,shared.ok); } else { addReply(c,shared.err); @@ -10819,7 +11312,8 @@ static void debugCommand(redisClient *c) { } snprintf(buf,sizeof(buf),"value:%lu",j); val = createStringObject(buf,strlen(buf)); - dictAdd(c->db->dict,key,val); + dbAdd(c->db,key,val); + decrRefCount(key); } addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"digest") && c->argc == 2) { @@ -10906,7 +11400,8 @@ static void daemonize(void) { } static void version() { - printf("Redis server version %s\n", REDIS_VERSION); + printf("Redis server version %s (%s:%d)\n", REDIS_VERSION, + REDIS_GIT_SHA1, atoi(REDIS_GIT_DIRTY) > 0); exit(0); } @@ -10920,6 +11415,7 @@ int main(int argc, char **argv) { time_t start; initServerConfig(); + sortCommandTable(); if (argc == 2) { if (strcmp(argv[1], "-v") == 0 || strcmp(argv[1], "--version") == 0) version();