X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/5436146c8dcd95c5ef3809f2830fb9c08edc4177..d52e5888694cb65918f2b0d00691c198deb5643f:/redis.c diff --git a/redis.c b/redis.c index 8ace1898..f6a765da 100644 --- a/redis.c +++ b/redis.c @@ -27,7 +27,7 @@ * POSSIBILITY OF SUCH DAMAGE. */ -#define REDIS_VERSION "1.3.12" +#define REDIS_VERSION "2.1.1" #include "fmacros.h" #include "config.h" @@ -75,8 +75,8 @@ #include "lzf.h" /* LZF compression library */ #include "pqsort.h" /* Partial qsort for SORT+LIMIT */ #include "zipmap.h" /* Compact dictionary-alike data structure */ +#include "ziplist.h" /* Compact list data structure */ #include "sha1.h" /* SHA1 is used for DEBUG DIGEST */ -#include "release.h" /* Release and/or git repository information */ /* Error codes */ #define REDIS_OK 0 @@ -120,17 +120,20 @@ #define REDIS_SET 2 #define REDIS_ZSET 3 #define REDIS_HASH 4 +#define REDIS_VMPOINTER 8 /* Objects encoding. Some kind of objects like Strings and Hashes can be * internally represented in multiple ways. The 'encoding' field of the object * is set to one of this fields for this object. 
*/ -#define REDIS_ENCODING_RAW 0 /* Raw representation */ -#define REDIS_ENCODING_INT 1 /* Encoded as integer */ -#define REDIS_ENCODING_ZIPMAP 2 /* Encoded as zipmap */ -#define REDIS_ENCODING_HT 3 /* Encoded as an hash table */ +#define REDIS_ENCODING_RAW 0 /* Raw representation */ +#define REDIS_ENCODING_INT 1 /* Encoded as integer */ +#define REDIS_ENCODING_HT 2 /* Encoded as hash table */ +#define REDIS_ENCODING_ZIPMAP 3 /* Encoded as zipmap */ +#define REDIS_ENCODING_LINKEDLIST 4 /* Encoded as regular linked list */ +#define REDIS_ENCODING_ZIPLIST 5 /* Encoded as ziplist */ static char* strencoding[] = { - "raw", "int", "zipmap", "hashtable" + "raw", "int", "hashtable", "zipmap", "linkedlist", "ziplist" }; /* Object types only used for dumping to disk */ @@ -190,6 +193,7 @@ static char* strencoding[] = { #define REDIS_MULTI 8 /* This client is in a MULTI context */ #define REDIS_BLOCKED 16 /* The client is waiting in a blocking operation */ #define REDIS_IO_WAIT 32 /* The client is waiting for Virtual Memory I/O */ +#define REDIS_DIRTY_CAS 64 /* Watched keys modified. EXEC will fail. 
*/ /* Slave replication state - slave side */ #define REDIS_REPL_NONE 0 /* No active replication */ @@ -232,9 +236,11 @@ static char* strencoding[] = { #define APPENDFSYNC_ALWAYS 1 #define APPENDFSYNC_EVERYSEC 2 -/* Hashes related defaults */ +/* Zip structure related defaults */ #define REDIS_HASH_MAX_ZIPMAP_ENTRIES 64 #define REDIS_HASH_MAX_ZIPMAP_VALUE 512 +#define REDIS_LIST_MAX_ZIPLIST_ENTRIES 1024 +#define REDIS_LIST_MAX_ZIPLIST_VALUE 32 /* We can print the stacktrace, so our assert is defined this way: */ #define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e,__FILE__,__LINE__),_exit(1))) @@ -246,30 +252,41 @@ static void _redisPanic(char *msg, char *file, int line); /* A redis object, that is a type able to hold a string / list / set */ -/* The VM object structure */ -struct redisObjectVM { - off_t page; /* the page at witch the object is stored on disk */ - off_t usedpages; /* number of pages used on disk */ - time_t atime; /* Last access time */ -} vm; - /* The actual Redis Object */ typedef struct redisObject { - void *ptr; - unsigned char type; - unsigned char encoding; - unsigned char storage; /* If this object is a key, where is the value? - * REDIS_VM_MEMORY, REDIS_VM_SWAPPED, ... */ - unsigned char vtype; /* If this object is a key, and value is swapped out, - * this is the type of the swapped out object. */ + unsigned type:4; + unsigned storage:2; /* REDIS_VM_MEMORY or REDIS_VM_SWAPPING */ + unsigned encoding:4; + unsigned lru:22; /* lru time (relative to server.lruclock) */ int refcount; - /* VM fields, this are only allocated if VM is active, otherwise the + void *ptr; + /* VM fields are only allocated if VM is active, otherwise the * object allocation function will just allocate * sizeof(redisObjct) minus sizeof(redisObjectVM), so using * Redis without VM active will not have any overhead. */ - struct redisObjectVM vm; } robj; +/* The VM pointer structure - identifies an object in the swap file. 
+ * + * This object is stored in place of the value + * object in the main key->value hash table representing a database. + * Note that the first fields (type, storage) are the same as the redisObject + * structure so that vmPointer structures can be accessed even when cast + * as redisObject structures. + * + * This is useful as we don't know if a value object is on disk or not, but we + * are always able to read obj->storage to check this. For vmPointer + * structures "type" is set to REDIS_VMPOINTER (even if without this field + * it is still possible to check the kind of object from the value of 'storage').*/ +typedef struct vmPointer { + unsigned type:4; + unsigned storage:2; /* REDIS_VM_SWAPPED or REDIS_VM_LOADING */ + unsigned notused:26; + unsigned int vtype; /* type of the object stored in the swap file */ + off_t page; /* the page at which the object is stored on disk */ + off_t usedpages; /* number of pages used on disk */ +} vmpointer; + /* Macro used to initalize a Redis object allocated on the stack. 
* Note that this macro is taken near the structure definition to make sure * we'll update it when the structure is changed, to avoid bugs like @@ -279,14 +296,15 @@ typedef struct redisObject { _var.type = REDIS_STRING; \ _var.encoding = REDIS_ENCODING_RAW; \ _var.ptr = _ptr; \ - if (server.vm_enabled) _var.storage = REDIS_VM_MEMORY; \ + _var.storage = REDIS_VM_MEMORY; \ } while(0); typedef struct redisDb { dict *dict; /* The keyspace for this DB */ dict *expires; /* Timeout of keys with a timeout set */ - dict *blockingkeys; /* Keys with clients waiting for data (BLPOP) */ + dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */ dict *io_keys; /* Keys with clients waiting for VM I/O */ + dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */ int id; } redisDb; @@ -324,13 +342,14 @@ typedef struct redisClient { long repldboff; /* replication DB file offset */ off_t repldbsize; /* replication DB file size */ multiState mstate; /* MULTI/EXEC state */ - robj **blockingkeys; /* The key we are waiting to terminate a blocking + robj **blocking_keys; /* The key we are waiting to terminate a blocking * operation such as BLPOP. Otherwise NULL. */ - int blockingkeysnum; /* Number of blocking keys */ + int blocking_keys_num; /* Number of blocking keys */ time_t blockingto; /* Blocking operation timeout. If UNIX current time * is >= blockingto then the operation timed out. */ list *io_keys; /* Keys this client is waiting to be loaded from the * swap file in order to continue. 
*/ + list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */ dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */ list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */ } redisClient; @@ -366,6 +385,8 @@ struct redisServer { int daemonize; int appendonly; int appendfsync; + int no_appendfsync_on_rewrite; + int shutdown_asap; time_t lastfsync; int appendfd; int appendseldb; @@ -405,9 +426,11 @@ struct redisServer { off_t vm_page_size; off_t vm_pages; unsigned long long vm_max_memory; - /* Hashes config */ + /* Zip structure config */ size_t hash_max_zipmap_entries; size_t hash_max_zipmap_value; + size_t list_max_ziplist_entries; + size_t list_max_ziplist_value; /* Virtual memory state */ FILE *vm_fp; int vm_fd; @@ -445,6 +468,8 @@ struct redisServer { list *pubsub_patterns; /* A list of pubsub_patterns */ /* Misc */ FILE *devnull; + unsigned lruclock:22; /* clock incrementing every minute, for LRU */ + unsigned lruclock_padding:10; }; typedef struct pubsubPattern { @@ -512,7 +537,7 @@ typedef struct zset { #define REDIS_SHARED_INTEGERS 10000 struct sharedObjectsStruct { - robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space, + robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *cnegone, *pong, *space, *colon, *nullbulk, *nullmultibulk, *queued, *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr, *outofrangeerr, *plus, @@ -537,6 +562,9 @@ typedef struct iojob { int type; /* Request type, REDIS_IOJOB_* */ redisDb *db;/* Redis database */ robj *key; /* This I/O request is about swapping this key */ + robj *id; /* Unique identifier of this job: + this is the object to swap for REDIS_IOREQ_*_SWAP, or the + vmpointer object for REDIS_IOREQ_LOAD. */ robj *val; /* the value to swap for REDIS_IOREQ_*_SWAP, otherwise this * field is populated by the I/O thread for REDIS_IOREQ_LOAD. 
*/ off_t page; /* Swap page where to read/write the object */ @@ -546,6 +574,8 @@ typedef struct iojob { } iojob; /*================================ Prototypes =============================== */ +char *redisGitSHA1(void); +char *redisGitDirty(void); static void freeStringObject(robj *o); static void freeListObject(robj *o); @@ -570,8 +600,7 @@ static robj *getDecodedObject(robj *o); static int removeExpire(redisDb *db, robj *key); static int expireIfNeeded(redisDb *db, robj *key); static int deleteIfVolatile(redisDb *db, robj *key); -static int deleteIfSwapped(redisDb *db, robj *key); -static int deleteKey(redisDb *db, robj *key); +static int dbDelete(redisDb *db, robj *key); static time_t getExpire(redisDb *db, robj *key); static int setExpire(redisDb *db, robj *key, time_t when); static void updateSlavesWaitingBgsave(int bgsaveerr); @@ -593,8 +622,8 @@ static void unblockClientWaitingData(redisClient *c); static int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele); static void vmInit(void); static void vmMarkPagesFree(off_t page, off_t count); -static robj *vmLoadObject(robj *key); -static robj *vmPreviewObject(robj *key); +static robj *vmLoadObject(robj *o); +static robj *vmPreviewObject(robj *o); static int vmSwapOneObjectBlocking(void); static int vmSwapOneObjectThreaded(void); static int vmCanSwapOut(void); @@ -622,6 +651,7 @@ static struct redisCommand *lookupCommand(char *name); static void call(redisClient *c, struct redisCommand *cmd); static void resetClient(redisClient *c); static void convertToRealHash(robj *o); +static void listTypeConvert(robj *o, int enc); static int pubsubUnsubscribeAllChannels(redisClient *c, int notify); static int pubsubUnsubscribeAllPatterns(redisClient *c, int notify); static void freePubsubPattern(void *p); @@ -630,7 +660,11 @@ static int compareStringObjects(robj *a, robj *b); static int equalStringObjects(robj *a, robj *b); static void usage(); static int rewriteAppendOnlyFileBackground(void); -static int 
vmSwapObjectBlocking(robj *key, robj *val); +static vmpointer *vmSwapObjectBlocking(robj *val); +static int prepareForShutdown(); +static void touchWatchedKey(redisDb *db, robj *key); +static void touchWatchedKeysOnFlush(int dbid); +static void unwatchAllKeys(redisClient *c); static void authCommand(redisClient *c); static void pingCommand(redisClient *c); @@ -659,6 +693,9 @@ static void renameCommand(redisClient *c); static void renamenxCommand(redisClient *c); static void lpushCommand(redisClient *c); static void rpushCommand(redisClient *c); +static void lpushxCommand(redisClient *c); +static void rpushxCommand(redisClient *c); +static void linsertCommand(redisClient *c); static void lpopCommand(redisClient *c); static void rpopCommand(redisClient *c); static void llenCommand(redisClient *c); @@ -737,12 +774,15 @@ static void unsubscribeCommand(redisClient *c); static void psubscribeCommand(redisClient *c); static void punsubscribeCommand(redisClient *c); static void publishCommand(redisClient *c); +static void watchCommand(redisClient *c); +static void unwatchCommand(redisClient *c); /*================================= Globals ================================= */ /* Global vars */ static struct redisServer server; /* server global state */ -static struct redisCommand cmdTable[] = { +static struct redisCommand *commandTable; +static struct redisCommand readonlyCommandTable[] = { {"get",getCommand,2,REDIS_CMD_INLINE,NULL,1,1,1}, {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,0,0,0}, {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,0,0,0}, @@ -756,6 +796,9 @@ static struct redisCommand cmdTable[] = { {"mget",mgetCommand,-2,REDIS_CMD_INLINE,NULL,1,-1,1}, {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,1,1,1}, {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,1,1,1}, + {"rpushx",rpushxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,1,1,1}, + {"lpushx",lpushxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,1,1,1}, + 
{"linsert",linsertCommand,5,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,1,1,1}, {"rpop",rpopCommand,2,REDIS_CMD_INLINE,NULL,1,1,1}, {"lpop",lpopCommand,2,REDIS_CMD_INLINE,NULL,1,1,1}, {"brpop",brpopCommand,-3,REDIS_CMD_INLINE,NULL,1,1,1}, @@ -849,7 +892,8 @@ static struct redisCommand cmdTable[] = { {"psubscribe",psubscribeCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0}, {"punsubscribe",punsubscribeCommand,-1,REDIS_CMD_INLINE,NULL,0,0,0}, {"publish",publishCommand,3,REDIS_CMD_BULK|REDIS_CMD_FORCE_REPLICATION,NULL,0,0,0}, - {NULL,NULL,0,0,NULL,0,0,0} + {"watch",watchCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0}, + {"unwatch",unwatchCommand,1,REDIS_CMD_INLINE,NULL,0,0,0} }; /*============================ Utility functions ============================ */ @@ -1095,7 +1139,7 @@ static void dictListDestructor(void *privdata, void *val) listRelease((list*)val); } -static int sdsDictKeyCompare(void *privdata, const void *key1, +static int dictSdsKeyCompare(void *privdata, const void *key1, const void *key2) { int l1,l2; @@ -1115,11 +1159,18 @@ static void dictRedisObjectDestructor(void *privdata, void *val) decrRefCount(val); } +static void dictSdsDestructor(void *privdata, void *val) +{ + DICT_NOTUSED(privdata); + + sdsfree(val); +} + static int dictObjKeyCompare(void *privdata, const void *key1, const void *key2) { const robj *o1 = key1, *o2 = key2; - return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr); + return dictSdsKeyCompare(privdata,o1->ptr,o2->ptr); } static unsigned int dictObjHash(const void *key) { @@ -1127,6 +1178,10 @@ static unsigned int dictObjHash(const void *key) { return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr)); } +static unsigned int dictSdsHash(const void *key) { + return dictGenHashFunction((unsigned char*)key, sdslen((char*)key)); +} + static int dictEncObjKeyCompare(void *privdata, const void *key1, const void *key2) { @@ -1139,7 +1194,7 @@ static int dictEncObjKeyCompare(void *privdata, const void *key1, o1 = getDecodedObject(o1); o2 = getDecodedObject(o2); - 
cmp = sdsDictKeyCompare(privdata,o1->ptr,o2->ptr); + cmp = dictSdsKeyCompare(privdata,o1->ptr,o2->ptr); decrRefCount(o1); decrRefCount(o2); return cmp; @@ -1168,7 +1223,7 @@ static unsigned int dictEncObjHash(const void *key) { } } -/* Sets type and expires */ +/* Sets type */ static dictType setDictType = { dictEncObjHash, /* hash function */ NULL, /* key dup */ @@ -1188,23 +1243,23 @@ static dictType zsetDictType = { dictVanillaFree /* val destructor of malloc(sizeof(double)) */ }; -/* Db->dict */ +/* Db->dict, keys are sds strings, vals are Redis objects. */ static dictType dbDictType = { - dictObjHash, /* hash function */ + dictSdsHash, /* hash function */ NULL, /* key dup */ NULL, /* val dup */ - dictObjKeyCompare, /* key compare */ - dictRedisObjectDestructor, /* key destructor */ + dictSdsKeyCompare, /* key compare */ + dictSdsDestructor, /* key destructor */ dictRedisObjectDestructor /* val destructor */ }; /* Db->expires */ static dictType keyptrDictType = { - dictObjHash, /* hash function */ + dictSdsHash, /* hash function */ NULL, /* key dup */ NULL, /* val dup */ - dictObjKeyCompare, /* key compare */ - dictRedisObjectDestructor, /* key destructor */ + dictSdsKeyCompare, /* key compare */ + NULL, /* key destructor */ NULL /* val destructor */ }; @@ -1373,7 +1428,7 @@ void backgroundRewriteDoneHandler(int statloc) { /* If append only is actually enabled... */ close(server.appendfd); server.appendfd = fd; - fsync(fd); + if (server.appendfsync != APPENDFSYNC_NO) aof_fsync(fd); server.appendseldb = -1; /* Make sure it will issue SELECT */ redisLog(REDIS_NOTICE,"The new append only file was selected for future appends."); } else { @@ -1419,6 +1474,26 @@ static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientD * in objects at every object access, and accuracy is not needed. * To access a global var is faster than calling time(NULL) */ server.unixtime = time(NULL); + /* We have just 21 bits per object for LRU information. 
+ * So we use an (eventually wrapping) LRU clock with minutes resolution. + * + * When we need to select what object to swap, we compute the minimum + * time distance between the current lruclock and the object last access + * lruclock info. Even if clocks will wrap on overflow, there is + * the interesting property that we are sure that at least + * ABS(A-B) minutes passed between current time and timestamp B. + * + * This is not precise but we don't need at all precision, but just + * something statistically reasonable. + */ + server.lruclock = (time(NULL)/60)&((1<<21)-1); + + /* We received a SIGTERM, shutting down here in a safe way, as it is + * not ok doing so inside the signal handler. */ + if (server.shutdown_asap) { + if (prepareForShutdown() == REDIS_OK) exit(0); + redisLog(REDIS_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information"); + } /* Show some info about non-empty databases */ for (j = 0; j < server.dbnum; j++) { @@ -1510,7 +1585,11 @@ static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientD if ((de = dictGetRandomKey(db->expires)) == NULL) break; t = (time_t) dictGetEntryVal(de); if (now > t) { - deleteKey(db,dictGetEntryKey(de)); + sds key = dictGetEntryKey(de); + robj *keyobj = createStringObject(key,sdslen(key)); + + dbDelete(db,keyobj); + decrRefCount(keyobj); expired++; server.stat_expiredkeys++; } @@ -1599,6 +1678,7 @@ static void createSharedObjects(void) { shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n")); shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n")); shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n")); + shared.cnegone = createObject(REDIS_STRING,sdsnew(":-1\r\n")); shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n")); shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n")); shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n")); @@ -1666,6 +1746,7 @@ static void initServerConfig() { 
server.daemonize = 0; server.appendonly = 0; server.appendfsync = APPENDFSYNC_EVERYSEC; + server.no_appendfsync_on_rewrite = 0; server.lastfsync = time(NULL); server.appendfd = -1; server.appendseldb = -1; /* Make sure the first time will not match */ @@ -1687,6 +1768,9 @@ static void initServerConfig() { server.vm_blocked_clients = 0; server.hash_max_zipmap_entries = REDIS_HASH_MAX_ZIPMAP_ENTRIES; server.hash_max_zipmap_value = REDIS_HASH_MAX_ZIPMAP_VALUE; + server.list_max_ziplist_entries = REDIS_LIST_MAX_ZIPLIST_ENTRIES; + server.list_max_ziplist_value = REDIS_LIST_MAX_ZIPLIST_VALUE; + server.shutdown_asap = 0; resetServerSaveParams(); @@ -1735,7 +1819,8 @@ static void initServer() { for (j = 0; j < server.dbnum; j++) { server.db[j].dict = dictCreate(&dbDictType,NULL); server.db[j].expires = dictCreate(&keyptrDictType,NULL); - server.db[j].blockingkeys = dictCreate(&keylistDictType,NULL); + server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL); + server.db[j].watched_keys = dictCreate(&keylistDictType,NULL); if (server.vm_enabled) server.db[j].io_keys = dictCreate(&keylistDictType,NULL); server.db[j].id = j; @@ -1920,6 +2005,11 @@ static void loadServerConfig(char *filename) { } else if (!strcasecmp(argv[0],"appendfilename") && argc == 2) { zfree(server.appendfilename); server.appendfilename = zstrdup(argv[1]); + } else if (!strcasecmp(argv[0],"no-appendfsync-on-rewrite") + && argc == 2) { + if ((server.no_appendfsync_on_rewrite= yesnotoi(argv[1])) == -1) { + err = "argument must be 'yes' or 'no'"; goto loaderr; + } } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) { if (!strcasecmp(argv[1],"no")) { server.appendfsync = APPENDFSYNC_NO; @@ -1958,6 +2048,10 @@ static void loadServerConfig(char *filename) { server.hash_max_zipmap_entries = memtoll(argv[1], NULL); } else if (!strcasecmp(argv[0],"hash-max-zipmap-value") && argc == 2){ server.hash_max_zipmap_value = memtoll(argv[1], NULL); + } else if (!strcasecmp(argv[0],"list-max-ziplist-entries") 
&& argc == 2){ + server.list_max_ziplist_entries = memtoll(argv[1], NULL); + } else if (!strcasecmp(argv[0],"list-max-ziplist-value") && argc == 2){ + server.list_max_ziplist_value = memtoll(argv[1], NULL); } else { err = "Bad directive or wrong number of arguments"; goto loaderr; } @@ -2001,6 +2095,9 @@ static void freeClient(redisClient *c) { if (c->flags & REDIS_BLOCKED) unblockClientWaitingData(c); + /* UNWATCH all the keys */ + unwatchAllKeys(c); + listRelease(c->watched_keys); /* Unsubscribe from all the pubsub channels */ pubsubUnsubscribeAllChannels(c,0); pubsubUnsubscribeAllPatterns(c,0); @@ -2016,7 +2113,8 @@ static void freeClient(redisClient *c) { ln = listSearchKey(server.clients,c); redisAssert(ln != NULL); listDelNode(server.clients,ln); - /* Remove from the list of clients waiting for swapped keys */ + /* Remove from the list of clients that are now ready to be restarted + * after waiting for swapped keys */ if (c->flags & REDIS_IO_WAIT && listLength(c->io_keys) == 0) { ln = listSearchKey(server.io_ready_clients,c); if (ln) { @@ -2024,6 +2122,7 @@ static void freeClient(redisClient *c) { server.vm_blocked_clients--; } } + /* Remove from the list of clients waiting for swapped keys */ while (server.vm_enabled && listLength(c->io_keys)) { ln = listFirst(c->io_keys); dontWaitForSwappedKey(c,ln->value); @@ -2221,13 +2320,29 @@ static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int } } +static int qsortRedisCommands(const void *r1, const void *r2) { + return strcasecmp( + ((struct redisCommand*)r1)->name, + ((struct redisCommand*)r2)->name); +} + +static void sortCommandTable() { + /* Copy and sort the read-only version of the command table */ + commandTable = (struct redisCommand*)malloc(sizeof(readonlyCommandTable)); + memcpy(commandTable,readonlyCommandTable,sizeof(readonlyCommandTable)); + qsort(commandTable, + sizeof(readonlyCommandTable)/sizeof(struct redisCommand), + sizeof(struct redisCommand),qsortRedisCommands); +} + 
static struct redisCommand *lookupCommand(char *name) { - int j = 0; - while(cmdTable[j].name != NULL) { - if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j]; - j++; - } - return NULL; + struct redisCommand tmp = {name,NULL,0,0,NULL,0,0,0}; + return bsearch( + &tmp, + commandTable, + sizeof(readonlyCommandTable)/sizeof(struct redisCommand), + sizeof(struct redisCommand), + qsortRedisCommands); } /* resetClient prepare the client to process the next command */ @@ -2420,7 +2535,10 @@ static int processCommand(redisClient *c) { } /* Exec the command */ - if (c->flags & REDIS_MULTI && cmd->proc != execCommand && cmd->proc != discardCommand) { + if (c->flags & REDIS_MULTI && + cmd->proc != execCommand && cmd->proc != discardCommand && + cmd->proc != multiCommand && cmd->proc != watchCommand) + { queueMultiCommand(c,cmd); addReply(c,shared.queued); } else { @@ -2715,9 +2833,10 @@ static redisClient *createClient(int fd) { c->reply = listCreate(); listSetFreeMethod(c->reply,decrRefCount); listSetDupMethod(c->reply,dupClientReplyValue); - c->blockingkeys = NULL; - c->blockingkeysnum = 0; + c->blocking_keys = NULL; + c->blocking_keys_num = 0; c->io_keys = listCreate(); + c->watched_keys = listCreate(); listSetFreeMethod(c->io_keys,decrRefCount); c->pubsub_channels = dictCreate(&setDictType,NULL); c->pubsub_patterns = listCreate(); @@ -2826,6 +2945,12 @@ static void addReplyBulk(redisClient *c, robj *obj) { addReply(c,shared.crlf); } +static void addReplyBulkSds(redisClient *c, sds s) { + robj *o = createStringObject(s, sdslen(s)); + addReplyBulk(c,o); + decrRefCount(o); +} + /* In the CONFIG command we need to add vanilla C string as bulk replies */ static void addReplyBulkCString(redisClient *c, char *s) { if (s == NULL) { @@ -2885,12 +3010,9 @@ static robj *createObject(int type, void *ptr) { listDelNode(server.objfreelist,head); if (server.vm_enabled) pthread_mutex_unlock(&server.obj_freelist_mutex); } else { - if (server.vm_enabled) { + if (server.vm_enabled) 
pthread_mutex_unlock(&server.obj_freelist_mutex); - o = zmalloc(sizeof(*o)); - } else { - o = zmalloc(sizeof(*o)-sizeof(struct redisObjectVM)); - } + o = zmalloc(sizeof(*o)); } o->type = type; o->encoding = REDIS_ENCODING_RAW; @@ -2898,10 +3020,10 @@ static robj *createObject(int type, void *ptr) { o->refcount = 1; if (server.vm_enabled) { /* Note that this code may run in the context of an I/O thread - * and accessing to server.unixtime in theory is an error + * and accessing server.lruclock in theory is an error * (no locks). But in practice this is safe, and even if we read - * garbage Redis will not fail, as it's just a statistical info */ - o->vm.atime = server.unixtime; + * garbage Redis will not fail. */ + o->lru = server.lruclock; o->storage = REDIS_VM_MEMORY; } return o; @@ -2917,8 +3039,8 @@ static robj *createStringObjectFromLongLong(long long value) { incrRefCount(shared.integers[value]); o = shared.integers[value]; } else { - o = createObject(REDIS_STRING, NULL); if (value >= LONG_MIN && value <= LONG_MAX) { + o = createObject(REDIS_STRING, NULL); o->encoding = REDIS_ENCODING_INT; o->ptr = (void*)((long)value); } else { @@ -2935,9 +3057,17 @@ static robj *dupStringObject(robj *o) { static robj *createListObject(void) { list *l = listCreate(); - + robj *o = createObject(REDIS_LIST,l); listSetFreeMethod(l,decrRefCount); - return createObject(REDIS_LIST,l); + o->encoding = REDIS_ENCODING_LINKEDLIST; + return o; +} + +static robj *createZiplistObject(void) { + unsigned char *zl = ziplistNew(); + robj *o = createObject(REDIS_LIST,zl); + o->encoding = REDIS_ENCODING_ZIPLIST; + return o; } static robj *createSetObject(void) { @@ -2970,7 +3100,16 @@ static void freeStringObject(robj *o) { } static void freeListObject(robj *o) { - listRelease((list*) o->ptr); + switch (o->encoding) { + case REDIS_ENCODING_LINKEDLIST: + listRelease((list*) o->ptr); + break; + case REDIS_ENCODING_ZIPLIST: + zfree(o->ptr); + break; + default: + redisPanic("Unknown list encoding 
type"); + } } static void freeSetObject(robj *o) { @@ -3006,28 +3145,31 @@ static void incrRefCount(robj *o) { static void decrRefCount(void *obj) { robj *o = obj; - if (o->refcount <= 0) redisPanic("decrRefCount against refcount <= 0"); - /* Object is a key of a swapped out value, or in the process of being - * loaded. */ + /* Object is a swapped out value, or in the process of being loaded. */ if (server.vm_enabled && (o->storage == REDIS_VM_SWAPPED || o->storage == REDIS_VM_LOADING)) { - if (o->storage == REDIS_VM_LOADING) vmCancelThreadedIOJob(obj); - redisAssert(o->type == REDIS_STRING); - freeStringObject(o); - vmMarkPagesFree(o->vm.page,o->vm.usedpages); - pthread_mutex_lock(&server.obj_freelist_mutex); - if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX || - !listAddNodeHead(server.objfreelist,o)) - zfree(o); - pthread_mutex_unlock(&server.obj_freelist_mutex); + vmpointer *vp = obj; + if (o->storage == REDIS_VM_LOADING) vmCancelThreadedIOJob(o); + vmMarkPagesFree(vp->page,vp->usedpages); server.vm_stats_swapped_objects--; + zfree(vp); return; } - /* Object is in memory, or in the process of being swapped out. */ + + if (o->refcount <= 0) redisPanic("decrRefCount against refcount <= 0"); + /* Object is in memory, or in the process of being swapped out. + * + * If the object is being swapped out, abort the operation on + * decrRefCount even if the refcount does not drop to 0: the object + * is referenced at least two times, as value of the key AND as + * job->val in the iojob. So if we don't invalidate the iojob, when it is + * done but the relevant key was removed in the meantime, the + * complete jobs handler will not find the key about the job and the + * assert will fail. 
*/ + if (server.vm_enabled && o->storage == REDIS_VM_SWAPPING) + vmCancelThreadedIOJob(o); if (--(o->refcount) == 0) { - if (server.vm_enabled && o->storage == REDIS_VM_SWAPPING) - vmCancelThreadedIOJob(obj); switch(o->type) { case REDIS_STRING: freeStringObject(o); break; case REDIS_LIST: freeListObject(o); break; @@ -3044,63 +3186,6 @@ static void decrRefCount(void *obj) { } } -static robj *lookupKey(redisDb *db, robj *key) { - dictEntry *de = dictFind(db->dict,key); - if (de) { - robj *key = dictGetEntryKey(de); - robj *val = dictGetEntryVal(de); - - if (server.vm_enabled) { - if (key->storage == REDIS_VM_MEMORY || - key->storage == REDIS_VM_SWAPPING) - { - /* If we were swapping the object out, stop it, this key - * was requested. */ - if (key->storage == REDIS_VM_SWAPPING) - vmCancelThreadedIOJob(key); - /* Update the access time of the key for the aging algorithm. */ - key->vm.atime = server.unixtime; - } else { - int notify = (key->storage == REDIS_VM_LOADING); - - /* Our value was swapped on disk. Bring it at home. */ - redisAssert(val == NULL); - val = vmLoadObject(key); - dictGetEntryVal(de) = val; - - /* Clients blocked by the VM subsystem may be waiting for - * this key... 
*/ - if (notify) handleClientsBlockedOnSwappedKey(db,key); - } - } - return val; - } else { - return NULL; - } -} - -static robj *lookupKeyRead(redisDb *db, robj *key) { - expireIfNeeded(db,key); - return lookupKey(db,key); -} - -static robj *lookupKeyWrite(redisDb *db, robj *key) { - deleteIfVolatile(db,key); - return lookupKey(db,key); -} - -static robj *lookupKeyReadOrReply(redisClient *c, robj *key, robj *reply) { - robj *o = lookupKeyRead(c->db, key); - if (!o) addReply(c,reply); - return o; -} - -static robj *lookupKeyWriteOrReply(redisClient *c, robj *key, robj *reply) { - robj *o = lookupKeyWrite(c->db, key); - if (!o) addReply(c,reply); - return o; -} - static int checkType(redisClient *c, robj *o, int type) { if (o->type != type) { addReply(c,shared.wrongtypeerr); @@ -3109,21 +3194,6 @@ static int checkType(redisClient *c, robj *o, int type) { return 0; } -static int deleteKey(redisDb *db, robj *key) { - int retval; - - /* We need to protect key from destruction: after the first dictDelete() - * it may happen that 'key' is no longer valid if we don't increment - * it's count. This may happen when we get the object reference directly - * from the hash table with dictRandomKey() or dict iterators */ - incrRefCount(key); - if (dictSize(db->expires)) dictDelete(db->expires,key); - retval = dictDelete(db->dict,key); - decrRefCount(key); - - return retval == DICT_OK; -} - /* Check if the nul-terminated string 's' can be represented by a long * (that is, is a number that fits into long without any other space or * character before or after the digits). 
@@ -3343,6 +3413,132 @@ static int getLongFromObjectOrReply(redisClient *c, robj *o, long *target, const return REDIS_OK; } +/* =========================== Keyspace access API ========================== */ + +static robj *lookupKey(redisDb *db, robj *key) { + dictEntry *de = dictFind(db->dict,key->ptr); + if (de) { + robj *val = dictGetEntryVal(de); + + if (server.vm_enabled) { + if (val->storage == REDIS_VM_MEMORY || + val->storage == REDIS_VM_SWAPPING) + { + /* If we were swapping the object out, cancel the operation */ + if (val->storage == REDIS_VM_SWAPPING) + vmCancelThreadedIOJob(val); + /* Update the access time for the aging algorithm. */ + val->lru = server.lruclock; + } else { + int notify = (val->storage == REDIS_VM_LOADING); + + /* Our value was swapped on disk. Bring it at home. */ + redisAssert(val->type == REDIS_VMPOINTER); + val = vmLoadObject(val); + dictGetEntryVal(de) = val; + + /* Clients blocked by the VM subsystem may be waiting for + * this key... */ + if (notify) handleClientsBlockedOnSwappedKey(db,key); + } + } + return val; + } else { + return NULL; + } +} + +static robj *lookupKeyRead(redisDb *db, robj *key) { + expireIfNeeded(db,key); + return lookupKey(db,key); +} + +static robj *lookupKeyWrite(redisDb *db, robj *key) { + deleteIfVolatile(db,key); + touchWatchedKey(db,key); + return lookupKey(db,key); +} + +static robj *lookupKeyReadOrReply(redisClient *c, robj *key, robj *reply) { + robj *o = lookupKeyRead(c->db, key); + if (!o) addReply(c,reply); + return o; +} + +static robj *lookupKeyWriteOrReply(redisClient *c, robj *key, robj *reply) { + robj *o = lookupKeyWrite(c->db, key); + if (!o) addReply(c,reply); + return o; +} + +/* Add the key to the DB. If the key already exists REDIS_ERR is returned, + * otherwise REDIS_OK is returned, and the caller should increment the + * refcount of 'val'. */ +static int dbAdd(redisDb *db, robj *key, robj *val) { + /* Perform a lookup before adding the key, as we need to copy the + * key value. 
*/ + if (dictFind(db->dict, key->ptr) != NULL) { + return REDIS_ERR; + } else { + sds copy = sdsdup(key->ptr); + dictAdd(db->dict, copy, val); + return REDIS_OK; + } +} + +/* If the key does not exist, this is just like dbAdd(). Otherwise + * the value associated to the key is replaced with the new one. + * + * On update (key already existed) 0 is returned. Otherwise 1. */ +static int dbReplace(redisDb *db, robj *key, robj *val) { + if (dictFind(db->dict,key->ptr) == NULL) { + sds copy = sdsdup(key->ptr); + dictAdd(db->dict, copy, val); + return 1; + } else { + dictReplace(db->dict, key->ptr, val); + return 0; + } +} + +static int dbExists(redisDb *db, robj *key) { + return dictFind(db->dict,key->ptr) != NULL; +} + +/* Return a random key, in form of a Redis object. + * If there are no keys, NULL is returned. + * + * The function makes sure to return keys not already expired. */ +static robj *dbRandomKey(redisDb *db) { + struct dictEntry *de; + + while(1) { + sds key; + robj *keyobj; + + de = dictGetRandomKey(db->dict); + if (de == NULL) return NULL; + + key = dictGetEntryKey(de); + keyobj = createStringObject(key,sdslen(key)); + if (dictFind(db->expires,key)) { + if (expireIfNeeded(db,keyobj)) { + decrRefCount(keyobj); + continue; /* search for another key. This expired. */ + } + } + return keyobj; + } +} + +/* Delete a key, value, and associated expiration entry if any, from the DB */ +static int dbDelete(redisDb *db, robj *key) { + /* Deleting an entry from the expires dict will not free the sds of + * the key, because it is shared with the main dictionary. 
*/ + if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr); + return dictDelete(db->dict,key->ptr) == DICT_OK; +} + /*============================ RDB saving/loading =========================== */ static int rdbSaveType(FILE *fp, unsigned char type) { @@ -3485,38 +3681,32 @@ static int rdbSaveRawString(FILE *fp, unsigned char *s, size_t len) { return 0; } +/* Save a long long value as either an encoded string or a string. */ +static int rdbSaveLongLongAsStringObject(FILE *fp, long long value) { + unsigned char buf[32]; + int enclen = rdbEncodeInteger(value,buf); + if (enclen > 0) { + if (fwrite(buf,enclen,1,fp) == 0) return -1; + } else { + /* Encode as string */ + enclen = ll2string((char*)buf,32,value); + redisAssert(enclen < 32); + if (rdbSaveLen(fp,enclen) == -1) return -1; + if (fwrite(buf,enclen,1,fp) == 0) return -1; + } + return 0; +} + /* Like rdbSaveStringObjectRaw() but handle encoded objects */ static int rdbSaveStringObject(FILE *fp, robj *obj) { - int retval; - /* Avoid to decode the object, then encode it again, if the * object is alrady integer encoded. */ if (obj->encoding == REDIS_ENCODING_INT) { - long val = (long) obj->ptr; - unsigned char buf[5]; - int enclen; - - if ((enclen = rdbEncodeInteger(val,buf)) > 0) { - if (fwrite(buf,enclen,1,fp) == 0) return -1; - return 0; - } - /* otherwise... fall throught and continue with the usual - * code path. */ - } - - /* Avoid incr/decr ref count business when possible. - * This plays well with copy-on-write given that we are probably - * in a child process (BGSAVE). 
Also this makes sure key objects - * of swapped objects are not incRefCount-ed (an assert does not allow - * this in order to avoid bugs) */ - if (obj->encoding != REDIS_ENCODING_RAW) { - obj = getDecodedObject(obj); - retval = rdbSaveRawString(fp,obj->ptr,sdslen(obj->ptr)); - decrRefCount(obj); + return rdbSaveLongLongAsStringObject(fp,(long)obj->ptr); } else { - retval = rdbSaveRawString(fp,obj->ptr,sdslen(obj->ptr)); + redisAssert(obj->encoding == REDIS_ENCODING_RAW); + return rdbSaveRawString(fp,obj->ptr,sdslen(obj->ptr)); } - return retval; } /* Save a double value. Doubles are saved as strings prefixed by an unsigned @@ -3569,16 +3759,37 @@ static int rdbSaveObject(FILE *fp, robj *o) { if (rdbSaveStringObject(fp,o) == -1) return -1; } else if (o->type == REDIS_LIST) { /* Save a list value */ - list *list = o->ptr; - listIter li; - listNode *ln; - - if (rdbSaveLen(fp,listLength(list)) == -1) return -1; - listRewind(list,&li); - while((ln = listNext(&li))) { - robj *eleobj = listNodeValue(ln); + if (o->encoding == REDIS_ENCODING_ZIPLIST) { + unsigned char *p; + unsigned char *vstr; + unsigned int vlen; + long long vlong; + + if (rdbSaveLen(fp,ziplistLen(o->ptr)) == -1) return -1; + p = ziplistIndex(o->ptr,0); + while(ziplistGet(p,&vstr,&vlen,&vlong)) { + if (vstr) { + if (rdbSaveRawString(fp,vstr,vlen) == -1) + return -1; + } else { + if (rdbSaveLongLongAsStringObject(fp,vlong) == -1) + return -1; + } + p = ziplistNext(o->ptr,p); + } + } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) { + list *list = o->ptr; + listIter li; + listNode *ln; - if (rdbSaveStringObject(fp,eleobj) == -1) return -1; + if (rdbSaveLen(fp,listLength(list)) == -1) return -1; + listRewind(list,&li); + while((ln = listNext(&li))) { + robj *eleobj = listNodeValue(ln); + if (rdbSaveStringObject(fp,eleobj) == -1) return -1; + } + } else { + redisPanic("Unknown list encoding"); } } else if (o->type == REDIS_SET) { /* Save a set value */ @@ -3697,9 +3908,12 @@ static int rdbSave(char 
*filename) { /* Iterate this DB writing every entry */ while((de = dictNext(di)) != NULL) { - robj *key = dictGetEntryKey(de); - robj *o = dictGetEntryVal(de); - time_t expiretime = getExpire(db,key); + sds keystr = dictGetEntryKey(de); + robj key, *o = dictGetEntryVal(de); + time_t expiretime; + + initStaticStringObject(key,keystr); + expiretime = getExpire(db,&key); /* Save the expire time */ if (expiretime != -1) { @@ -3710,20 +3924,20 @@ static int rdbSave(char *filename) { } /* Save the key and associated value. This requires special * handling if the value is swapped out. */ - if (!server.vm_enabled || key->storage == REDIS_VM_MEMORY || - key->storage == REDIS_VM_SWAPPING) { + if (!server.vm_enabled || o->storage == REDIS_VM_MEMORY || + o->storage == REDIS_VM_SWAPPING) { /* Save type, key, value */ if (rdbSaveType(fp,o->type) == -1) goto werr; - if (rdbSaveStringObject(fp,key) == -1) goto werr; + if (rdbSaveStringObject(fp,&key) == -1) goto werr; if (rdbSaveObject(fp,o) == -1) goto werr; } else { /* REDIS_VM_SWAPPED or REDIS_VM_LOADING */ robj *po; /* Get a preview of the object in memory */ - po = vmPreviewObject(key); + po = vmPreviewObject(o); /* Save type, key, value */ - if (rdbSaveType(fp,key->vtype) == -1) goto werr; - if (rdbSaveStringObject(fp,key) == -1) goto werr; + if (rdbSaveType(fp,po->type) == -1) goto werr; + if (rdbSaveStringObject(fp,&key) == -1) goto werr; if (rdbSaveObject(fp,po) == -1) goto werr; /* Remove the loaded object from memory */ decrRefCount(po); @@ -3945,34 +4159,59 @@ static int rdbLoadDoubleValue(FILE *fp, double *val) { /* Load a Redis object of the specified type from the specified file. * On success a newly allocated object is returned, otherwise NULL. 
*/ static robj *rdbLoadObject(int type, FILE *fp) { - robj *o; + robj *o, *ele, *dec; + size_t len; redisLog(REDIS_DEBUG,"LOADING OBJECT %d (at %d)\n",type,ftell(fp)); if (type == REDIS_STRING) { /* Read string value */ if ((o = rdbLoadEncodedStringObject(fp)) == NULL) return NULL; o = tryObjectEncoding(o); - } else if (type == REDIS_LIST || type == REDIS_SET) { - /* Read list/set value */ - uint32_t listlen; + } else if (type == REDIS_LIST) { + /* Read list value */ + if ((len = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL; - if ((listlen = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL; - o = (type == REDIS_LIST) ? createListObject() : createSetObject(); + /* Use a real list when there are too many entries */ + if (len > server.list_max_ziplist_entries) { + o = createListObject(); + } else { + o = createZiplistObject(); + } + + /* Load every single element of the list */ + while(len--) { + if ((ele = rdbLoadEncodedStringObject(fp)) == NULL) return NULL; + + /* If we are using a ziplist and the value is too big, convert + * the object to a real list. 
*/ + if (o->encoding == REDIS_ENCODING_ZIPLIST && + ele->encoding == REDIS_ENCODING_RAW && + sdslen(ele->ptr) > server.list_max_ziplist_value) + listTypeConvert(o,REDIS_ENCODING_LINKEDLIST); + + if (o->encoding == REDIS_ENCODING_ZIPLIST) { + dec = getDecodedObject(ele); + o->ptr = ziplistPush(o->ptr,dec->ptr,sdslen(dec->ptr),REDIS_TAIL); + decrRefCount(dec); + decrRefCount(ele); + } else { + ele = tryObjectEncoding(ele); + listAddNodeTail(o->ptr,ele); + } + } + } else if (type == REDIS_SET) { + /* Read list/set value */ + if ((len = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL; + o = createSetObject(); /* It's faster to expand the dict to the right size asap in order * to avoid rehashing */ - if (type == REDIS_SET && listlen > DICT_HT_INITIAL_SIZE) - dictExpand(o->ptr,listlen); + if (len > DICT_HT_INITIAL_SIZE) + dictExpand(o->ptr,len); /* Load every single element of the list/set */ - while(listlen--) { - robj *ele; - + while(len--) { if ((ele = rdbLoadEncodedStringObject(fp)) == NULL) return NULL; ele = tryObjectEncoding(ele); - if (type == REDIS_LIST) { - listAddNodeTail((list*)o->ptr,ele); - } else { - dictAdd((dict*)o->ptr,ele,NULL); - } + dictAdd((dict*)o->ptr,ele,NULL); } } else if (type == REDIS_ZSET) { /* Read list/set value */ @@ -4007,23 +4246,31 @@ static robj *rdbLoadObject(int type, FILE *fp) { while(hashlen--) { robj *key, *val; - if ((key = rdbLoadStringObject(fp)) == NULL) return NULL; - if ((val = rdbLoadStringObject(fp)) == NULL) return NULL; + if ((key = rdbLoadEncodedStringObject(fp)) == NULL) return NULL; + if ((val = rdbLoadEncodedStringObject(fp)) == NULL) return NULL; /* If we are using a zipmap and there are too big values * the object is converted to real hash table encoding. 
*/ if (o->encoding != REDIS_ENCODING_HT && - (sdslen(key->ptr) > server.hash_max_zipmap_value || - sdslen(val->ptr) > server.hash_max_zipmap_value)) + ((key->encoding == REDIS_ENCODING_RAW && + sdslen(key->ptr) > server.hash_max_zipmap_value) || + (val->encoding == REDIS_ENCODING_RAW && + sdslen(val->ptr) > server.hash_max_zipmap_value))) { convertToRealHash(o); } if (o->encoding == REDIS_ENCODING_ZIPMAP) { unsigned char *zm = o->ptr; + robj *deckey, *decval; - zm = zipmapSet(zm,key->ptr,sdslen(key->ptr), - val->ptr,sdslen(val->ptr),NULL); + /* We need raw string objects to add them to the zipmap */ + deckey = getDecodedObject(key); + decval = getDecodedObject(val); + zm = zipmapSet(zm,deckey->ptr,sdslen(deckey->ptr), + decval->ptr,sdslen(decval->ptr),NULL); o->ptr = zm; + decrRefCount(deckey); + decrRefCount(decval); decrRefCount(key); decrRefCount(val); } else { @@ -4043,11 +4290,9 @@ static int rdbLoad(char *filename) { uint32_t dbid; int type, retval, rdbver; int swap_all_values = 0; - dict *d = server.db[0].dict; redisDb *db = server.db+0; char buf[1024]; time_t expiretime, now = time(NULL); - long long loadedkeys = 0; fp = fopen(filename,"r"); if (!fp) return REDIS_ERR; @@ -4066,6 +4311,7 @@ static int rdbLoad(char *filename) { } while(1) { robj *key, *val; + int force_swapout; expiretime = -1; /* Read type. */ @@ -4085,7 +4331,6 @@ static int rdbLoad(char *filename) { exit(1); } db = server.db+dbid; - d = db->dict; continue; } /* Read key */ @@ -4099,12 +4344,11 @@ static int rdbLoad(char *filename) { continue; } /* Add the new object in the hash table */ - retval = dictAdd(d,key,val); - if (retval == DICT_ERR) { + retval = dbAdd(db,key,val); + if (retval == REDIS_ERR) { redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! 
Unrecoverable error, exiting now.", key->ptr); exit(1); } - loadedkeys++; /* Set the expire time if needed */ if (expiretime != -1) setExpire(db,key,expiretime); @@ -4116,23 +4360,30 @@ static int rdbLoad(char *filename) { * to random sampling, otherwise we may try to swap already * swapped keys. */ if (swap_all_values) { - dictEntry *de = dictFind(d,key); + dictEntry *de = dictFind(db->dict,key->ptr); /* de may be NULL since the key already expired */ if (de) { - key = dictGetEntryKey(de); + vmpointer *vp; val = dictGetEntryVal(de); - if (vmSwapObjectBlocking(key,val) == REDIS_OK) { - dictGetEntryVal(de) = NULL; - } + if (val->refcount == 1 && + (vp = vmSwapObjectBlocking(val)) != NULL) + dictGetEntryVal(de) = vp; } + decrRefCount(key); continue; } + decrRefCount(key); + + /* Flush data on disk once 32 MB of additional RAM are used... */ + force_swapout = 0; + if ((zmalloc_used_memory() - server.vm_max_memory) > 1024*1024*32) + force_swapout = 1; /* If we have still some hope of having some value fitting memory * then we try random sampling. 
*/ - if (!swap_all_values && server.vm_enabled && (loadedkeys % 5000) == 0) { + if (!swap_all_values && server.vm_enabled && force_swapout) { while (zmalloc_used_memory() > server.vm_max_memory) { if (vmSwapOneObjectBlocking() == REDIS_ERR) break; } @@ -4149,21 +4400,56 @@ eoferr: /* unexpected end of file is handled here with a fatal exit */ return REDIS_ERR; /* Just to avoid warning */ } -/*================================== Commands =============================== */ - -static void authCommand(redisClient *c) { - if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) { - c->authenticated = 1; - addReply(c,shared.ok); - } else { - c->authenticated = 0; - addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n")); - } -} - -static void pingCommand(redisClient *c) { - addReply(c,shared.pong); -} +/*================================== Shutdown =============================== */ +static int prepareForShutdown() { + redisLog(REDIS_WARNING,"User requested shutdown, saving DB..."); + /* Kill the saving child if there is a background saving in progress. + We want to avoid race conditions, for instance our saving child may + overwrite the synchronous saving did by SHUTDOWN. */ + if (server.bgsavechildpid != -1) { + redisLog(REDIS_WARNING,"There is a live saving child. Killing it!"); + kill(server.bgsavechildpid,SIGKILL); + rdbRemoveTempFile(server.bgsavechildpid); + } + if (server.appendonly) { + /* Append only file: fsync() the AOF and exit */ + aof_fsync(server.appendfd); + if (server.vm_enabled) unlink(server.vm_swap_file); + } else { + /* Snapshotting. Perform a SYNC SAVE and exit */ + if (rdbSave(server.dbfilename) == REDIS_OK) { + if (server.daemonize) + unlink(server.pidfile); + redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory()); + } else { + /* Ooops.. error saving! The best we can do is to continue + * operating. 
Note that if there was a background saving process, + * in the next cron() Redis will be notified that the background + * saving aborted, handling special stuff like slaves pending for + * synchronization... */ + redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit"); + return REDIS_ERR; + } + } + redisLog(REDIS_WARNING,"Server exit now, bye bye..."); + return REDIS_OK; +} + +/*================================== Commands =============================== */ + +static void authCommand(redisClient *c) { + if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) { + c->authenticated = 1; + addReply(c,shared.ok); + } else { + c->authenticated = 0; + addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n")); + } +} + +static void pingCommand(redisClient *c) { + addReply(c,shared.pong); +} static void echoCommand(redisClient *c) { addReplyBulk(c,c->argv[1]); @@ -4184,24 +4470,18 @@ static void setGenericCommand(redisClient *c, int nx, robj *key, robj *val, robj } } + touchWatchedKey(c->db,key); if (nx) deleteIfVolatile(c->db,key); - retval = dictAdd(c->db->dict,key,val); - if (retval == DICT_ERR) { + retval = dbAdd(c->db,key,val); + if (retval == REDIS_ERR) { if (!nx) { - /* If the key is about a swapped value, we want a new key object - * to overwrite the old. So we delete the old key in the database. - * This will also make sure that swap pages about the old object - * will be marked as free. 
*/ - if (server.vm_enabled && deleteIfSwapped(c->db,key)) - incrRefCount(key); - dictReplace(c->db->dict,key,val); + dbReplace(c->db,key,val); incrRefCount(val); } else { addReply(c,shared.czero); return; } } else { - incrRefCount(key); incrRefCount(val); } server.dirty++; @@ -4243,11 +4523,7 @@ static void getCommand(redisClient *c) { static void getsetCommand(redisClient *c) { if (getGenericCommand(c) == REDIS_ERR) return; - if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) { - dictReplace(c->db->dict,c->argv[1],c->argv[2]); - } else { - incrRefCount(c->argv[1]); - } + dbReplace(c->db,c->argv[1],c->argv[2]); incrRefCount(c->argv[2]); server.dirty++; removeExpire(c->db,c->argv[1]); @@ -4293,17 +4569,9 @@ static void msetGenericCommand(redisClient *c, int nx) { } for (j = 1; j < c->argc; j += 2) { - int retval; - c->argv[j+1] = tryObjectEncoding(c->argv[j+1]); - retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]); - if (retval == DICT_ERR) { - dictReplace(c->db->dict,c->argv[j],c->argv[j+1]); - incrRefCount(c->argv[j+1]); - } else { - incrRefCount(c->argv[j]); - incrRefCount(c->argv[j+1]); - } + dbReplace(c->db,c->argv[j],c->argv[j+1]); + incrRefCount(c->argv[j+1]); removeExpire(c->db,c->argv[j]); } server.dirty += (c->argc-1)/2; @@ -4320,7 +4588,6 @@ static void msetnxCommand(redisClient *c) { static void incrDecrCommand(redisClient *c, long long incr) { long long value; - int retval; robj *o; o = lookupKeyWrite(c->db,c->argv[1]); @@ -4329,13 +4596,7 @@ static void incrDecrCommand(redisClient *c, long long incr) { value += incr; o = createStringObjectFromLongLong(value); - retval = dictAdd(c->db->dict,c->argv[1],o); - if (retval == DICT_ERR) { - dictReplace(c->db->dict,c->argv[1],o); - removeExpire(c->db,c->argv[1]); - } else { - incrRefCount(c->argv[1]); - } + dbReplace(c->db,c->argv[1],o); server.dirty++; addReply(c,shared.colon); addReply(c,o); @@ -4372,17 +4633,10 @@ static void appendCommand(redisClient *c) { o = lookupKeyWrite(c->db,c->argv[1]); 
if (o == NULL) { /* Create the key */ - retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]); - incrRefCount(c->argv[1]); + retval = dbAdd(c->db,c->argv[1],c->argv[2]); incrRefCount(c->argv[2]); totlen = stringObjectLen(c->argv[2]); } else { - dictEntry *de; - - de = dictFind(c->db->dict,c->argv[1]); - assert(de != NULL); - - o = dictGetEntryVal(de); if (o->type != REDIS_STRING) { addReply(c,shared.wrongtypeerr); return; @@ -4394,7 +4648,7 @@ static void appendCommand(redisClient *c) { o = createStringObject(decoded->ptr, sdslen(decoded->ptr)); decrRefCount(decoded); - dictReplace(c->db->dict,c->argv[1],o); + dbReplace(c->db,c->argv[1],o); } /* APPEND! */ if (c->argv[2]->encoding == REDIS_ENCODING_RAW) { @@ -4453,7 +4707,8 @@ static void delCommand(redisClient *c) { int deleted = 0, j; for (j = 1; j < c->argc; j++) { - if (deleteKey(c->db,c->argv[j])) { + if (dbDelete(c->db,c->argv[j])) { + touchWatchedKey(c->db,c->argv[j]); server.dirty++; deleted++; } @@ -4463,7 +4718,7 @@ static void delCommand(redisClient *c) { static void existsCommand(redisClient *c) { expireIfNeeded(c->db,c->argv[1]); - if (dictFind(c->db->dict,c->argv[1])) { + if (dbExists(c->db,c->argv[1])) { addReply(c, shared.cone); } else { addReply(c, shared.czero); @@ -4481,27 +4736,15 @@ static void selectCommand(redisClient *c) { } static void randomkeyCommand(redisClient *c) { - dictEntry *de; robj *key; - while(1) { - de = dictGetRandomKey(c->db->dict); - if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break; - } - - if (de == NULL) { + if ((key = dbRandomKey(c->db)) == NULL) { addReply(c,shared.nullbulk); return; } - key = dictGetEntryKey(de); - if (server.vm_enabled) { - key = dupStringObject(key); - addReplyBulk(c,key); - decrRefCount(key); - } else { - addReplyBulk(c,key); - } + addReplyBulk(c,key); + decrRefCount(key); } static void keysCommand(redisClient *c) { @@ -4516,15 +4759,17 @@ static void keysCommand(redisClient *c) { addReply(c,lenobj); decrRefCount(lenobj); while((de = 
dictNext(di)) != NULL) { - robj *keyobj = dictGetEntryKey(de); + sds key = dictGetEntryKey(de); + robj *keyobj; - sds key = keyobj->ptr; if ((pattern[0] == '*' && pattern[1] == '\0') || stringmatchlen(pattern,plen,key,sdslen(key),0)) { + keyobj = createStringObject(key,sdslen(key)); if (expireIfNeeded(c->db,keyobj) == 0) { addReplyBulk(c,keyobj); numkeys++; } + decrRefCount(keyobj); } } dictReleaseIterator(di); @@ -4588,39 +4833,9 @@ static void bgsaveCommand(redisClient *c) { } static void shutdownCommand(redisClient *c) { - redisLog(REDIS_WARNING,"User requested shutdown, saving DB..."); - /* Kill the saving child if there is a background saving in progress. - We want to avoid race conditions, for instance our saving child may - overwrite the synchronous saving did by SHUTDOWN. */ - if (server.bgsavechildpid != -1) { - redisLog(REDIS_WARNING,"There is a live saving child. Killing it!"); - kill(server.bgsavechildpid,SIGKILL); - rdbRemoveTempFile(server.bgsavechildpid); - } - if (server.appendonly) { - /* Append only file: fsync() the AOF and exit */ - fsync(server.appendfd); - if (server.vm_enabled) unlink(server.vm_swap_file); + if (prepareForShutdown() == REDIS_OK) exit(0); - } else { - /* Snapshotting. Perform a SYNC SAVE and exit */ - if (rdbSave(server.dbfilename) == REDIS_OK) { - if (server.daemonize) - unlink(server.pidfile); - redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory()); - redisLog(REDIS_WARNING,"Server exit now, bye bye..."); - exit(0); - } else { - /* Ooops.. error saving! The best we can do is to continue - * operating. Note that if there was a background saving process, - * in the next cron() Redis will be notified that the background - * saving aborted, handling special stuff like slaves pending for - * synchronization... 
*/ - redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit"); - addReplySds(c, - sdsnew("-ERR can't quit, problems saving the DB\r\n")); - } - } + addReplySds(c, sdsnew("-ERR Errors trying to SHUTDOWN. Check logs.\r\n")); } static void renameGenericCommand(redisClient *c, int nx) { @@ -4637,17 +4852,16 @@ static void renameGenericCommand(redisClient *c, int nx) { incrRefCount(o); deleteIfVolatile(c->db,c->argv[2]); - if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) { + if (dbAdd(c->db,c->argv[2],o) == REDIS_ERR) { if (nx) { decrRefCount(o); addReply(c,shared.czero); return; } - dictReplace(c->db->dict,c->argv[2],o); - } else { - incrRefCount(c->argv[2]); + dbReplace(c->db,c->argv[2],o); } - deleteKey(c->db,c->argv[1]); + dbDelete(c->db,c->argv[1]); + touchWatchedKey(c->db,c->argv[2]); server.dirty++; addReply(c,nx ? shared.cone : shared.ok); } @@ -4691,40 +4905,296 @@ static void moveCommand(redisClient *c) { /* Try to add the element to the target DB */ deleteIfVolatile(dst,c->argv[1]); - if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) { + if (dbAdd(dst,c->argv[1],o) == REDIS_ERR) { addReply(c,shared.czero); return; } - incrRefCount(c->argv[1]); incrRefCount(o); /* OK! key moved, free the entry in the source DB */ - deleteKey(src,c->argv[1]); + dbDelete(src,c->argv[1]); server.dirty++; addReply(c,shared.cone); } /* =================================== Lists ================================ */ -static void pushGenericCommand(redisClient *c, int where) { - robj *lobj; - list *list; - lobj = lookupKeyWrite(c->db,c->argv[1]); + +/* Check the argument length to see if it requires us to convert the ziplist + * to a real list. Only check raw-encoded objects because integer encoded + * objects are never too long. 
*/ +static void listTypeTryConversion(robj *subject, robj *value) { + if (subject->encoding != REDIS_ENCODING_ZIPLIST) return; + if (value->encoding == REDIS_ENCODING_RAW && + sdslen(value->ptr) > server.list_max_ziplist_value) + listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST); +} + +static void listTypePush(robj *subject, robj *value, int where) { + /* Check if we need to convert the ziplist */ + listTypeTryConversion(subject,value); + if (subject->encoding == REDIS_ENCODING_ZIPLIST && + ziplistLen(subject->ptr) >= server.list_max_ziplist_entries) + listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST); + + if (subject->encoding == REDIS_ENCODING_ZIPLIST) { + int pos = (where == REDIS_HEAD) ? ZIPLIST_HEAD : ZIPLIST_TAIL; + value = getDecodedObject(value); + subject->ptr = ziplistPush(subject->ptr,value->ptr,sdslen(value->ptr),pos); + decrRefCount(value); + } else if (subject->encoding == REDIS_ENCODING_LINKEDLIST) { + if (where == REDIS_HEAD) { + listAddNodeHead(subject->ptr,value); + } else { + listAddNodeTail(subject->ptr,value); + } + incrRefCount(value); + } else { + redisPanic("Unknown list encoding"); + } +} + +static robj *listTypePop(robj *subject, int where) { + robj *value = NULL; + if (subject->encoding == REDIS_ENCODING_ZIPLIST) { + unsigned char *p; + unsigned char *vstr; + unsigned int vlen; + long long vlong; + int pos = (where == REDIS_HEAD) ? 
0 : -1; + p = ziplistIndex(subject->ptr,pos); + if (ziplistGet(p,&vstr,&vlen,&vlong)) { + if (vstr) { + value = createStringObject((char*)vstr,vlen); + } else { + value = createStringObjectFromLongLong(vlong); + } + /* We only need to delete an element when it exists */ + subject->ptr = ziplistDelete(subject->ptr,&p); + } + } else if (subject->encoding == REDIS_ENCODING_LINKEDLIST) { + list *list = subject->ptr; + listNode *ln; + if (where == REDIS_HEAD) { + ln = listFirst(list); + } else { + ln = listLast(list); + } + if (ln != NULL) { + value = listNodeValue(ln); + incrRefCount(value); + listDelNode(list,ln); + } + } else { + redisPanic("Unknown list encoding"); + } + return value; +} + +static unsigned long listTypeLength(robj *subject) { + if (subject->encoding == REDIS_ENCODING_ZIPLIST) { + return ziplistLen(subject->ptr); + } else if (subject->encoding == REDIS_ENCODING_LINKEDLIST) { + return listLength((list*)subject->ptr); + } else { + redisPanic("Unknown list encoding"); + } +} + +/* Structure to hold set iteration abstraction. */ +typedef struct { + robj *subject; + unsigned char encoding; + unsigned char direction; /* Iteration direction */ + unsigned char *zi; + listNode *ln; +} listTypeIterator; + +/* Structure for an entry while iterating over a list. */ +typedef struct { + listTypeIterator *li; + unsigned char *zi; /* Entry in ziplist */ + listNode *ln; /* Entry in linked list */ +} listTypeEntry; + +/* Initialize an iterator at the specified index. 
*/ +static listTypeIterator *listTypeInitIterator(robj *subject, int index, unsigned char direction) { + listTypeIterator *li = zmalloc(sizeof(listTypeIterator)); + li->subject = subject; + li->encoding = subject->encoding; + li->direction = direction; + if (li->encoding == REDIS_ENCODING_ZIPLIST) { + li->zi = ziplistIndex(subject->ptr,index); + } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) { + li->ln = listIndex(subject->ptr,index); + } else { + redisPanic("Unknown list encoding"); + } + return li; +} + +/* Clean up the iterator. */ +static void listTypeReleaseIterator(listTypeIterator *li) { + zfree(li); +} + +/* Stores pointer to current the entry in the provided entry structure + * and advances the position of the iterator. Returns 1 when the current + * entry is in fact an entry, 0 otherwise. */ +static int listTypeNext(listTypeIterator *li, listTypeEntry *entry) { + /* Protect from converting when iterating */ + redisAssert(li->subject->encoding == li->encoding); + + entry->li = li; + if (li->encoding == REDIS_ENCODING_ZIPLIST) { + entry->zi = li->zi; + if (entry->zi != NULL) { + if (li->direction == REDIS_TAIL) + li->zi = ziplistNext(li->subject->ptr,li->zi); + else + li->zi = ziplistPrev(li->subject->ptr,li->zi); + return 1; + } + } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) { + entry->ln = li->ln; + if (entry->ln != NULL) { + if (li->direction == REDIS_TAIL) + li->ln = li->ln->next; + else + li->ln = li->ln->prev; + return 1; + } + } else { + redisPanic("Unknown list encoding"); + } + return 0; +} + +/* Return entry or NULL at the current position of the iterator. 
*/ +static robj *listTypeGet(listTypeEntry *entry) { + listTypeIterator *li = entry->li; + robj *value = NULL; + if (li->encoding == REDIS_ENCODING_ZIPLIST) { + unsigned char *vstr; + unsigned int vlen; + long long vlong; + redisAssert(entry->zi != NULL); + if (ziplistGet(entry->zi,&vstr,&vlen,&vlong)) { + if (vstr) { + value = createStringObject((char*)vstr,vlen); + } else { + value = createStringObjectFromLongLong(vlong); + } + } + } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) { + redisAssert(entry->ln != NULL); + value = listNodeValue(entry->ln); + incrRefCount(value); + } else { + redisPanic("Unknown list encoding"); + } + return value; +} + +static void listTypeInsert(listTypeEntry *entry, robj *value, int where) { + robj *subject = entry->li->subject; + if (entry->li->encoding == REDIS_ENCODING_ZIPLIST) { + value = getDecodedObject(value); + if (where == REDIS_TAIL) { + unsigned char *next = ziplistNext(subject->ptr,entry->zi); + + /* When we insert after the current element, but the current element + * is the tail of the list, we need to do a push. */ + if (next == NULL) { + subject->ptr = ziplistPush(subject->ptr,value->ptr,sdslen(value->ptr),REDIS_TAIL); + } else { + subject->ptr = ziplistInsert(subject->ptr,next,value->ptr,sdslen(value->ptr)); + } + } else { + subject->ptr = ziplistInsert(subject->ptr,entry->zi,value->ptr,sdslen(value->ptr)); + } + decrRefCount(value); + } else if (entry->li->encoding == REDIS_ENCODING_LINKEDLIST) { + if (where == REDIS_TAIL) { + listInsertNode(subject->ptr,entry->ln,value,AL_START_TAIL); + } else { + listInsertNode(subject->ptr,entry->ln,value,AL_START_HEAD); + } + incrRefCount(value); + } else { + redisPanic("Unknown list encoding"); + } +} + +/* Compare the given object with the entry at the current position. 
*/ +static int listTypeEqual(listTypeEntry *entry, robj *o) { + listTypeIterator *li = entry->li; + if (li->encoding == REDIS_ENCODING_ZIPLIST) { + redisAssert(o->encoding == REDIS_ENCODING_RAW); + return ziplistCompare(entry->zi,o->ptr,sdslen(o->ptr)); + } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) { + return equalStringObjects(o,listNodeValue(entry->ln)); + } else { + redisPanic("Unknown list encoding"); + } +} + +/* Delete the element pointed to. */ +static void listTypeDelete(listTypeEntry *entry) { + listTypeIterator *li = entry->li; + if (li->encoding == REDIS_ENCODING_ZIPLIST) { + unsigned char *p = entry->zi; + li->subject->ptr = ziplistDelete(li->subject->ptr,&p); + + /* Update position of the iterator depending on the direction */ + if (li->direction == REDIS_TAIL) + li->zi = p; + else + li->zi = ziplistPrev(li->subject->ptr,p); + } else if (entry->li->encoding == REDIS_ENCODING_LINKEDLIST) { + listNode *next; + if (li->direction == REDIS_TAIL) + next = entry->ln->next; + else + next = entry->ln->prev; + listDelNode(li->subject->ptr,entry->ln); + li->ln = next; + } else { + redisPanic("Unknown list encoding"); + } +} + +static void listTypeConvert(robj *subject, int enc) { + listTypeIterator *li; + listTypeEntry entry; + redisAssert(subject->type == REDIS_LIST); + + if (enc == REDIS_ENCODING_LINKEDLIST) { + list *l = listCreate(); + listSetFreeMethod(l,decrRefCount); + + /* listTypeGet returns a robj with incremented refcount */ + li = listTypeInitIterator(subject,0,REDIS_TAIL); + while (listTypeNext(li,&entry)) listAddNodeTail(l,listTypeGet(&entry)); + listTypeReleaseIterator(li); + + subject->encoding = REDIS_ENCODING_LINKEDLIST; + zfree(subject->ptr); + subject->ptr = l; + } else { + redisPanic("Unsupported list conversion"); + } +} + +static void pushGenericCommand(redisClient *c, int where) { + robj *lobj = lookupKeyWrite(c->db,c->argv[1]); if (lobj == NULL) { if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) { 
addReply(c,shared.cone); return; } - lobj = createListObject(); - list = lobj->ptr; - if (where == REDIS_HEAD) { - listAddNodeHead(list,c->argv[2]); - } else { - listAddNodeTail(list,c->argv[2]); - } - dictAdd(c->db->dict,c->argv[1],lobj); - incrRefCount(c->argv[1]); - incrRefCount(c->argv[2]); + lobj = createZiplistObject(); + dbAdd(c->db,c->argv[1],lobj); } else { if (lobj->type != REDIS_LIST) { addReply(c,shared.wrongtypeerr); @@ -4734,16 +5204,10 @@ static void pushGenericCommand(redisClient *c, int where) { addReply(c,shared.cone); return; } - list = lobj->ptr; - if (where == REDIS_HEAD) { - listAddNodeHead(list,c->argv[2]); - } else { - listAddNodeTail(list,c->argv[2]); - } - incrRefCount(c->argv[2]); } + listTypePush(lobj,c->argv[2],where); + addReplyLongLong(c,listTypeLength(lobj)); server.dirty++; - addReplyLongLong(c,listLength(list)); } static void lpushCommand(redisClient *c) { @@ -4754,81 +5218,164 @@ static void rpushCommand(redisClient *c) { pushGenericCommand(c,REDIS_TAIL); } -static void llenCommand(redisClient *c) { - robj *o; - list *l; +static void pushxGenericCommand(redisClient *c, robj *refval, robj *val, int where) { + robj *subject; + listTypeIterator *iter; + listTypeEntry entry; + int inserted = 0; + + if ((subject = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || + checkType(c,subject,REDIS_LIST)) return; + + if (refval != NULL) { + /* Note: we expect refval to be string-encoded because it is *not* the + * last argument of the multi-bulk LINSERT. */ + redisAssert(refval->encoding == REDIS_ENCODING_RAW); + + /* We're not sure if this value can be inserted yet, but we cannot + * convert the list inside the iterator. We don't want to loop over + * the list twice (once to see if the value can be inserted and once + * to do the actual insert), so we assume this value can be inserted + * and convert the ziplist to a regular list if necessary. 
*/ + listTypeTryConversion(subject,val); + + /* Seek refval from head to tail */ + iter = listTypeInitIterator(subject,0,REDIS_TAIL); + while (listTypeNext(iter,&entry)) { + if (listTypeEqual(&entry,refval)) { + listTypeInsert(&entry,val,where); + inserted = 1; + break; + } + } + listTypeReleaseIterator(iter); - if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || - checkType(c,o,REDIS_LIST)) return; + if (inserted) { + /* Check if the length exceeds the ziplist length threshold. */ + if (subject->encoding == REDIS_ENCODING_ZIPLIST && + ziplistLen(subject->ptr) > server.list_max_ziplist_entries) + listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST); + server.dirty++; + } else { + /* Notify client of a failed insert */ + addReply(c,shared.cnegone); + return; + } + } else { + listTypePush(subject,val,where); + server.dirty++; + } - l = o->ptr; - addReplyUlong(c,listLength(l)); + addReplyUlong(c,listTypeLength(subject)); } -static void lindexCommand(redisClient *c) { - robj *o; - int index = atoi(c->argv[2]->ptr); - list *list; - listNode *ln; +static void lpushxCommand(redisClient *c) { + pushxGenericCommand(c,NULL,c->argv[2],REDIS_HEAD); +} - if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL || - checkType(c,o,REDIS_LIST)) return; - list = o->ptr; +static void rpushxCommand(redisClient *c) { + pushxGenericCommand(c,NULL,c->argv[2],REDIS_TAIL); +} - ln = listIndex(list, index); - if (ln == NULL) { - addReply(c,shared.nullbulk); +static void linsertCommand(redisClient *c) { + if (strcasecmp(c->argv[2]->ptr,"after") == 0) { + pushxGenericCommand(c,c->argv[3],c->argv[4],REDIS_TAIL); + } else if (strcasecmp(c->argv[2]->ptr,"before") == 0) { + pushxGenericCommand(c,c->argv[3],c->argv[4],REDIS_HEAD); } else { - robj *ele = listNodeValue(ln); - addReplyBulk(c,ele); + addReply(c,shared.syntaxerr); } } -static void lsetCommand(redisClient *c) { - robj *o; - int index = atoi(c->argv[2]->ptr); - list *list; - listNode *ln; +static void 
llenCommand(redisClient *c) { + robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.czero); + if (o == NULL || checkType(c,o,REDIS_LIST)) return; + addReplyUlong(c,listTypeLength(o)); +} - if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr)) == NULL || - checkType(c,o,REDIS_LIST)) return; - list = o->ptr; +static void lindexCommand(redisClient *c) { + robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk); + if (o == NULL || checkType(c,o,REDIS_LIST)) return; + int index = atoi(c->argv[2]->ptr); + robj *value = NULL; - ln = listIndex(list, index); - if (ln == NULL) { - addReply(c,shared.outofrangeerr); + if (o->encoding == REDIS_ENCODING_ZIPLIST) { + unsigned char *p; + unsigned char *vstr; + unsigned int vlen; + long long vlong; + p = ziplistIndex(o->ptr,index); + if (ziplistGet(p,&vstr,&vlen,&vlong)) { + if (vstr) { + value = createStringObject((char*)vstr,vlen); + } else { + value = createStringObjectFromLongLong(vlong); + } + addReplyBulk(c,value); + decrRefCount(value); + } else { + addReply(c,shared.nullbulk); + } + } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) { + listNode *ln = listIndex(o->ptr,index); + if (ln != NULL) { + value = listNodeValue(ln); + addReplyBulk(c,value); + } else { + addReply(c,shared.nullbulk); + } } else { - robj *ele = listNodeValue(ln); + redisPanic("Unknown list encoding"); + } +} - decrRefCount(ele); - listNodeValue(ln) = c->argv[3]; - incrRefCount(c->argv[3]); - addReply(c,shared.ok); - server.dirty++; +static void lsetCommand(redisClient *c) { + robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr); + if (o == NULL || checkType(c,o,REDIS_LIST)) return; + int index = atoi(c->argv[2]->ptr); + robj *value = c->argv[3]; + + listTypeTryConversion(o,value); + if (o->encoding == REDIS_ENCODING_ZIPLIST) { + unsigned char *p, *zl = o->ptr; + p = ziplistIndex(zl,index); + if (p == NULL) { + addReply(c,shared.outofrangeerr); + } else { + o->ptr = ziplistDelete(o->ptr,&p); + value = getDecodedObject(value); + 
o->ptr = ziplistInsert(o->ptr,p,value->ptr,sdslen(value->ptr)); + decrRefCount(value); + addReply(c,shared.ok); + server.dirty++; + } + } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) { + listNode *ln = listIndex(o->ptr,index); + if (ln == NULL) { + addReply(c,shared.outofrangeerr); + } else { + decrRefCount((robj*)listNodeValue(ln)); + listNodeValue(ln) = value; + incrRefCount(value); + addReply(c,shared.ok); + server.dirty++; + } + } else { + redisPanic("Unknown list encoding"); } } static void popGenericCommand(redisClient *c, int where) { - robj *o; - list *list; - listNode *ln; + robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk); + if (o == NULL || checkType(c,o,REDIS_LIST)) return; - if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL || - checkType(c,o,REDIS_LIST)) return; - list = o->ptr; - - if (where == REDIS_HEAD) - ln = listFirst(list); - else - ln = listLast(list); - - if (ln == NULL) { + robj *value = listTypePop(o,where); + if (value == NULL) { addReply(c,shared.nullbulk); } else { - robj *ele = listNodeValue(ln); - addReplyBulk(c,ele); - listDelNode(list,ln); - if (listLength(list) == 0) deleteKey(c->db,c->argv[1]); + addReplyBulk(c,value); + decrRefCount(value); + if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[1]); server.dirty++; } } @@ -4842,19 +5389,16 @@ static void rpopCommand(redisClient *c) { } static void lrangeCommand(redisClient *c) { - robj *o; + robj *o, *value; int start = atoi(c->argv[2]->ptr); int end = atoi(c->argv[3]->ptr); int llen; int rangelen, j; - list *list; - listNode *ln; - robj *ele; + listTypeEntry entry; if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL || checkType(c,o,REDIS_LIST)) return; - list = o->ptr; - llen = listLength(list); + llen = listTypeLength(o); /* convert negative indexes */ if (start < 0) start = llen+start; @@ -4872,13 +5416,15 @@ static void lrangeCommand(redisClient *c) { rangelen = (end-start)+1; /* Return the result in form of a 
multi-bulk reply */ - ln = listIndex(list, start); addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen)); + listTypeIterator *li = listTypeInitIterator(o,start,REDIS_TAIL); for (j = 0; j < rangelen; j++) { - ele = listNodeValue(ln); - addReplyBulk(c,ele); - ln = ln->next; + redisAssert(listTypeNext(li,&entry)); + value = listTypeGet(&entry); + addReplyBulk(c,value); + decrRefCount(value); } + listTypeReleaseIterator(li); } static void ltrimCommand(redisClient *c) { @@ -4892,8 +5438,7 @@ static void ltrimCommand(redisClient *c) { if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.ok)) == NULL || checkType(c,o,REDIS_LIST)) return; - list = o->ptr; - llen = listLength(list); + llen = listTypeLength(o); /* convert negative indexes */ if (start < 0) start = llen+start; @@ -4913,49 +5458,63 @@ static void ltrimCommand(redisClient *c) { } /* Remove list elements to perform the trim */ - for (j = 0; j < ltrim; j++) { - ln = listFirst(list); - listDelNode(list,ln); - } - for (j = 0; j < rtrim; j++) { - ln = listLast(list); - listDelNode(list,ln); + if (o->encoding == REDIS_ENCODING_ZIPLIST) { + o->ptr = ziplistDeleteRange(o->ptr,0,ltrim); + o->ptr = ziplistDeleteRange(o->ptr,-rtrim,rtrim); + } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) { + list = o->ptr; + for (j = 0; j < ltrim; j++) { + ln = listFirst(list); + listDelNode(list,ln); + } + for (j = 0; j < rtrim; j++) { + ln = listLast(list); + listDelNode(list,ln); + } + } else { + redisPanic("Unknown list encoding"); } - if (listLength(list) == 0) deleteKey(c->db,c->argv[1]); + if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[1]); server.dirty++; addReply(c,shared.ok); } static void lremCommand(redisClient *c) { - robj *o; - list *list; - listNode *ln, *next; + robj *subject, *obj = c->argv[3]; int toremove = atoi(c->argv[2]->ptr); int removed = 0; - int fromtail = 0; + listTypeEntry entry; - if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL || - checkType(c,o,REDIS_LIST)) return; - list = 
o->ptr; + subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero); + if (subject == NULL || checkType(c,subject,REDIS_LIST)) return; + + /* Make sure obj is raw when we're dealing with a ziplist */ + if (subject->encoding == REDIS_ENCODING_ZIPLIST) + obj = getDecodedObject(obj); + listTypeIterator *li; if (toremove < 0) { toremove = -toremove; - fromtail = 1; + li = listTypeInitIterator(subject,-1,REDIS_HEAD); + } else { + li = listTypeInitIterator(subject,0,REDIS_TAIL); } - ln = fromtail ? list->tail : list->head; - while (ln) { - robj *ele = listNodeValue(ln); - next = fromtail ? ln->prev : ln->next; - if (equalStringObjects(ele,c->argv[3])) { - listDelNode(list,ln); + while (listTypeNext(li,&entry)) { + if (listTypeEqual(&entry,obj)) { + listTypeDelete(&entry); server.dirty++; removed++; if (toremove && removed == toremove) break; } - ln = next; } - if (listLength(list) == 0) deleteKey(c->db,c->argv[1]); + listTypeReleaseIterator(li); + + /* Clean up raw encoded object */ + if (subject->encoding == REDIS_ENCODING_ZIPLIST) + decrRefCount(obj); + + if (listTypeLength(subject) == 0) dbDelete(c->db,c->argv[1]); addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed)); } @@ -4975,47 +5534,36 @@ static void lremCommand(redisClient *c) { * as well. This command was originally proposed by Ezra Zygmuntowicz. 
*/ static void rpoplpushcommand(redisClient *c) { - robj *sobj; - list *srclist; - listNode *ln; - + robj *sobj, *value; if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL || checkType(c,sobj,REDIS_LIST)) return; - srclist = sobj->ptr; - ln = listLast(srclist); - if (ln == NULL) { + if (listTypeLength(sobj) == 0) { addReply(c,shared.nullbulk); } else { robj *dobj = lookupKeyWrite(c->db,c->argv[2]); - robj *ele = listNodeValue(ln); - list *dstlist; - - if (dobj && dobj->type != REDIS_LIST) { - addReply(c,shared.wrongtypeerr); - return; - } + if (dobj && checkType(c,dobj,REDIS_LIST)) return; + value = listTypePop(sobj,REDIS_TAIL); /* Add the element to the target list (unless it's directly * passed to some BLPOP-ing client */ - if (!handleClientsWaitingListPush(c,c->argv[2],ele)) { - if (dobj == NULL) { - /* Create the list if the key does not exist */ - dobj = createListObject(); - dictAdd(c->db->dict,c->argv[2],dobj); - incrRefCount(c->argv[2]); + if (!handleClientsWaitingListPush(c,c->argv[2],value)) { + /* Create the list if the key does not exist */ + if (!dobj) { + dobj = createZiplistObject(); + dbAdd(c->db,c->argv[2],dobj); } - dstlist = dobj->ptr; - listAddNodeHead(dstlist,ele); - incrRefCount(ele); + listTypePush(dobj,value,REDIS_HEAD); } /* Send the element to the client as reply as well */ - addReplyBulk(c,ele); + addReplyBulk(c,value); + + /* listTypePop returns an object with its refcount incremented */ + decrRefCount(value); - /* Finally remove the element from the source list */ - listDelNode(srclist,ln); - if (listLength(srclist) == 0) deleteKey(c->db,c->argv[1]); + /* Delete the source list when it is empty */ + if (listTypeLength(sobj) == 0) dbDelete(c->db,c->argv[1]); server.dirty++; } } @@ -5028,8 +5576,7 @@ static void saddCommand(redisClient *c) { set = lookupKeyWrite(c->db,c->argv[1]); if (set == NULL) { set = createSetObject(); - dictAdd(c->db->dict,c->argv[1],set); - incrRefCount(c->argv[1]); + 
dbAdd(c->db,c->argv[1],set); } else { if (set->type != REDIS_SET) { addReply(c,shared.wrongtypeerr); @@ -5054,7 +5601,7 @@ static void sremCommand(redisClient *c) { if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) { server.dirty++; if (htNeedsResize(set->ptr)) dictResize(set->ptr); - if (dictSize((dict*)set->ptr) == 0) deleteKey(c->db,c->argv[1]); + if (dictSize((dict*)set->ptr) == 0) dbDelete(c->db,c->argv[1]); addReply(c,shared.cone); } else { addReply(c,shared.czero); @@ -5085,13 +5632,12 @@ static void smoveCommand(redisClient *c) { return; } if (dictSize((dict*)srcset->ptr) == 0 && srcset != dstset) - deleteKey(c->db,c->argv[1]); + dbDelete(c->db,c->argv[1]); server.dirty++; /* Add the element to the destination set */ if (!dstset) { dstset = createSetObject(); - dictAdd(c->db->dict,c->argv[2],dstset); - incrRefCount(c->argv[2]); + dbAdd(c->db,c->argv[2],dstset); } if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK) incrRefCount(c->argv[3]); @@ -5137,7 +5683,7 @@ static void spopCommand(redisClient *c) { addReplyBulk(c,ele); dictDelete(set->ptr,ele); if (htNeedsResize(set->ptr)) dictResize(set->ptr); - if (dictSize((dict*)set->ptr) == 0) deleteKey(c->db,c->argv[1]); + if (dictSize((dict*)set->ptr) == 0) dbDelete(c->db,c->argv[1]); server.dirty++; } } @@ -5181,7 +5727,7 @@ static void sinterGenericCommand(redisClient *c, robj **setskeys, unsigned long if (!setobj) { zfree(dv); if (dstkey) { - if (deleteKey(c->db,dstkey)) + if (dbDelete(c->db,dstkey)) server.dirty++; addReply(c,shared.czero); } else { @@ -5241,10 +5787,9 @@ static void sinterGenericCommand(redisClient *c, robj **setskeys, unsigned long if (dstkey) { /* Store the resulting set into the target, if the intersection * is not an empty set. 
*/ - deleteKey(c->db,dstkey); + dbDelete(c->db,dstkey); if (dictSize((dict*)dstset->ptr) > 0) { - dictAdd(c->db->dict,dstkey,dstset); - incrRefCount(dstkey); + dbAdd(c->db,dstkey,dstset); addReplyLongLong(c,dictSize((dict*)dstset->ptr)); } else { decrRefCount(dstset); @@ -5344,10 +5889,9 @@ static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnu } else { /* If we have a target key where to store the resulting set * create this key with the result set inside */ - deleteKey(c->db,dstkey); + dbDelete(c->db,dstkey); if (dictSize((dict*)dstset->ptr) > 0) { - dictAdd(c->db->dict,dstkey,dstset); - incrRefCount(dstkey); + dbAdd(c->db,dstkey,dstset); addReplyLongLong(c,dictSize((dict*)dstset->ptr)); } else { decrRefCount(dstset); @@ -5397,8 +5941,10 @@ static zskiplistNode *zslCreateNode(int level, double score, robj *obj) { zskiplistNode *zn = zmalloc(sizeof(*zn)); zn->forward = zmalloc(sizeof(zskiplistNode*) * level); - if (level > 0) + if (level > 1) zn->span = zmalloc(sizeof(unsigned int) * (level - 1)); + else + zn->span = NULL; zn->score = score; zn->obj = obj; return zn; @@ -5640,7 +6186,7 @@ static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) { * Returns 0 when the element cannot be found, rank otherwise. * Note that the rank is 1-based due to the span of zsl->header to the * first element. */ -static unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) { +static unsigned long zslistTypeGetRank(zskiplist *zsl, double score, robj *o) { zskiplistNode *x; unsigned long rank = 0; int i; @@ -5664,7 +6210,7 @@ static unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) { } /* Finds an element by its rank. The rank argument needs to be 1-based. 
*/ -zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) { +zskiplistNode* zslistTypeGetElementByRank(zskiplist *zsl, unsigned long rank) { zskiplistNode *x; unsigned long traversed = 0; int i; @@ -5693,11 +6239,15 @@ static void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scor zset *zs; double *score; + if (isnan(scoreval)) { + addReplySds(c,sdsnew("-ERR provide score is Not A Number (nan)\r\n")); + return; + } + zsetobj = lookupKeyWrite(c->db,key); if (zsetobj == NULL) { zsetobj = createZsetObject(); - dictAdd(c->db->dict,key,zsetobj); - incrRefCount(key); + dbAdd(c->db,key,zsetobj); } else { if (zsetobj->type != REDIS_ZSET) { addReply(c,shared.wrongtypeerr); @@ -5721,6 +6271,15 @@ static void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scor } else { *score = scoreval; } + if (isnan(*score)) { + addReplySds(c, + sdsnew("-ERR resulting score is Not A Number (nan)\r\n")); + zfree(score); + /* Note that we don't need to check if the zset may be empty and + * should be removed here, as we can only obtain Nan as score if + * there was already an element in the sorted set. 
*/ + return; + } } else { *score = scoreval; } @@ -5804,7 +6363,7 @@ static void zremCommand(redisClient *c) { /* Delete from the hash table */ dictDelete(zs->dict,c->argv[2]); if (htNeedsResize(zs->dict)) dictResize(zs->dict); - if (dictSize(zs->dict) == 0) deleteKey(c->db,c->argv[1]); + if (dictSize(zs->dict) == 0) dbDelete(c->db,c->argv[1]); server.dirty++; addReply(c,shared.cone); } @@ -5825,7 +6384,7 @@ static void zremrangebyscoreCommand(redisClient *c) { zs = zsetobj->ptr; deleted = zslDeleteRangeByScore(zs->zsl,min,max,zs->dict); if (htNeedsResize(zs->dict)) dictResize(zs->dict); - if (dictSize(zs->dict) == 0) deleteKey(c->db,c->argv[1]); + if (dictSize(zs->dict) == 0) dbDelete(c->db,c->argv[1]); server.dirty += deleted; addReplyLongLong(c,deleted); } @@ -5863,7 +6422,7 @@ static void zremrangebyrankCommand(redisClient *c) { * use 1-based rank */ deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict); if (htNeedsResize(zs->dict)) dictResize(zs->dict); - if (dictSize(zs->dict) == 0) deleteKey(c->db,c->argv[1]); + if (dictSize(zs->dict) == 0) dbDelete(c->db,c->argv[1]); server.dirty += deleted; addReplyLongLong(c, deleted); } @@ -5884,6 +6443,7 @@ static int qsortCompareZsetopsrcByCardinality(const void *s1, const void *s2) { #define REDIS_AGGR_SUM 1 #define REDIS_AGGR_MIN 2 #define REDIS_AGGR_MAX 3 +#define zunionInterDictValue(_e) (dictGetEntryVal(_e) == NULL ? 
1.0 : *(double*)dictGetEntryVal(_e)) inline static void zunionInterAggregate(double *target, double val, int aggregate) { if (aggregate == REDIS_AGGR_SUM) { @@ -5899,7 +6459,7 @@ inline static void zunionInterAggregate(double *target, double val, int aggregat } static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { - int i, j, zsetnum; + int i, j, setnum; int aggregate = REDIS_AGGR_SUM; zsetopsrc *src; robj *dstobj; @@ -5907,32 +6467,35 @@ static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { dictIterator *di; dictEntry *de; - /* expect zsetnum input keys to be given */ - zsetnum = atoi(c->argv[2]->ptr); - if (zsetnum < 1) { + /* expect setnum input keys to be given */ + setnum = atoi(c->argv[2]->ptr); + if (setnum < 1) { addReplySds(c,sdsnew("-ERR at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE\r\n")); return; } /* test if the expected number of keys would overflow */ - if (3+zsetnum > c->argc) { + if (3+setnum > c->argc) { addReply(c,shared.syntaxerr); return; } /* read keys to be used for input */ - src = zmalloc(sizeof(zsetopsrc) * zsetnum); - for (i = 0, j = 3; i < zsetnum; i++, j++) { - robj *zsetobj = lookupKeyWrite(c->db,c->argv[j]); - if (!zsetobj) { + src = zmalloc(sizeof(zsetopsrc) * setnum); + for (i = 0, j = 3; i < setnum; i++, j++) { + robj *obj = lookupKeyWrite(c->db,c->argv[j]); + if (!obj) { src[i].dict = NULL; } else { - if (zsetobj->type != REDIS_ZSET) { + if (obj->type == REDIS_ZSET) { + src[i].dict = ((zset*)obj->ptr)->dict; + } else if (obj->type == REDIS_SET) { + src[i].dict = (obj->ptr); + } else { zfree(src); addReply(c,shared.wrongtypeerr); return; } - src[i].dict = ((zset*)zsetobj->ptr)->dict; } /* default all weights to 1 */ @@ -5944,9 +6507,9 @@ static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { int remaining = c->argc - j; while (remaining) { - if (remaining >= (zsetnum + 1) && !strcasecmp(c->argv[j]->ptr,"weights")) { + if (remaining >= (setnum + 1) 
&& !strcasecmp(c->argv[j]->ptr,"weights")) { j++; remaining--; - for (i = 0; i < zsetnum; i++, j++, remaining--) { + for (i = 0; i < setnum; i++, j++, remaining--) { if (getDoubleFromObjectOrReply(c, c->argv[j], &src[i].weight, NULL) != REDIS_OK) return; } @@ -5974,7 +6537,7 @@ static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { /* sort sets from the smallest to largest, this will improve our * algorithm's performance */ - qsort(src,zsetnum,sizeof(zsetopsrc), qsortCompareZsetopsrcByCardinality); + qsort(src,setnum,sizeof(zsetopsrc),qsortCompareZsetopsrcByCardinality); dstobj = createZsetObject(); dstzset = dstobj->ptr; @@ -5987,12 +6550,12 @@ static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { di = dictGetIterator(src[0].dict); while((de = dictNext(di)) != NULL) { double *score = zmalloc(sizeof(double)), value; - *score = src[0].weight * (*(double*)dictGetEntryVal(de)); + *score = src[0].weight * zunionInterDictValue(de); - for (j = 1; j < zsetnum; j++) { + for (j = 1; j < setnum; j++) { dictEntry *other = dictFind(src[j].dict,dictGetEntryKey(de)); if (other) { - value = src[j].weight * (*(double*)dictGetEntryVal(other)); + value = src[j].weight * zunionInterDictValue(other); zunionInterAggregate(score, value, aggregate); } else { break; @@ -6000,7 +6563,7 @@ static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { } /* skip entry when not present in every source dict */ - if (j != zsetnum) { + if (j != setnum) { zfree(score); } else { robj *o = dictGetEntryKey(de); @@ -6013,7 +6576,7 @@ static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { dictReleaseIterator(di); } } else if (op == REDIS_OP_UNION) { - for (i = 0; i < zsetnum; i++) { + for (i = 0; i < setnum; i++) { if (!src[i].dict) continue; di = dictGetIterator(src[i].dict); @@ -6022,14 +6585,14 @@ static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { if 
(dictFind(dstzset->dict,dictGetEntryKey(de)) != NULL) continue; double *score = zmalloc(sizeof(double)), value; - *score = src[i].weight * (*(double*)dictGetEntryVal(de)); + *score = src[i].weight * zunionInterDictValue(de); /* because the zsets are sorted by size, its only possible * for sets at larger indices to hold this entry */ - for (j = (i+1); j < zsetnum; j++) { + for (j = (i+1); j < setnum; j++) { dictEntry *other = dictFind(src[j].dict,dictGetEntryKey(de)); if (other) { - value = src[j].weight * (*(double*)dictGetEntryVal(other)); + value = src[j].weight * zunionInterDictValue(other); zunionInterAggregate(score, value, aggregate); } } @@ -6047,10 +6610,9 @@ static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { redisAssert(op == REDIS_OP_INTER || op == REDIS_OP_UNION); } - deleteKey(c->db,dstkey); + dbDelete(c->db,dstkey); if (dstzset->zsl->length) { - dictAdd(c->db->dict,dstkey,dstobj); - incrRefCount(dstkey); + dbAdd(c->db,dstkey,dstobj); addReplyLongLong(c, dstzset->zsl->length); server.dirty++; } else { @@ -6114,10 +6676,10 @@ static void zrangeGenericCommand(redisClient *c, int reverse) { /* check if starting point is trivial, before searching * the element in log(N) time */ if (reverse) { - ln = start == 0 ? zsl->tail : zslGetElementByRank(zsl, llen-start); + ln = start == 0 ? zsl->tail : zslistTypeGetElementByRank(zsl, llen-start); } else { ln = start == 0 ? 
- zsl->header->forward[0] : zslGetElementByRank(zsl, start+1); + zsl->header->forward[0] : zslistTypeGetElementByRank(zsl, start+1); } /* Return the result in form of a multi-bulk reply */ @@ -6314,7 +6876,7 @@ static void zrankGenericCommand(redisClient *c, int reverse) { } score = dictGetEntryVal(de); - rank = zslGetRank(zsl, *score, c->argv[2]); + rank = zslistTypeGetRank(zsl, *score, c->argv[2]); if (rank) { if (reverse) { addReplyLongLong(c, zsl->length - rank); @@ -6341,7 +6903,7 @@ static void zrevrankCommand(redisClient *c) { /* Check the length of a number of objects to see if we need to convert a * zipmap to a real hash. Note that we only check string encoded objects * as their string length can be queried in constant time. */ -static void hashTryConversion(robj *subject, robj **argv, int start, int end) { +static void hashTypeTryConversion(robj *subject, robj **argv, int start, int end) { int i; if (subject->encoding != REDIS_ENCODING_ZIPMAP) return; @@ -6356,7 +6918,7 @@ static void hashTryConversion(robj *subject, robj **argv, int start, int end) { } /* Encode given objects in-place when the hash uses a dict. */ -static void hashTryObjectEncoding(robj *subject, robj **o1, robj **o2) { +static void hashTypeTryObjectEncoding(robj *subject, robj **o1, robj **o2) { if (subject->encoding == REDIS_ENCODING_HT) { if (o1) *o1 = tryObjectEncoding(*o1); if (o2) *o2 = tryObjectEncoding(*o2); @@ -6366,7 +6928,7 @@ static void hashTryObjectEncoding(robj *subject, robj **o1, robj **o2) { /* Get the value from a hash identified by key. Returns either a string * object or NULL if the value cannot be found. The refcount of the object * is always increased by 1 when the value was found. */ -static robj *hashGet(robj *o, robj *key) { +static robj *hashTypeGet(robj *o, robj *key) { robj *value = NULL; if (o->encoding == REDIS_ENCODING_ZIPMAP) { unsigned char *v; @@ -6388,7 +6950,7 @@ static robj *hashGet(robj *o, robj *key) { /* Test if the key exists in the given hash. 
Returns 1 if the key * exists and 0 when it doesn't. */ -static int hashExists(robj *o, robj *key) { +static int hashTypeExists(robj *o, robj *key) { if (o->encoding == REDIS_ENCODING_ZIPMAP) { key = getDecodedObject(key); if (zipmapExists(o->ptr,key->ptr,sdslen(key->ptr))) { @@ -6406,7 +6968,7 @@ static int hashExists(robj *o, robj *key) { /* Add an element, discard the old if the key already exists. * Return 0 on insert and 1 on update. */ -static int hashSet(robj *o, robj *key, robj *value) { +static int hashTypeSet(robj *o, robj *key, robj *value) { int update = 0; if (o->encoding == REDIS_ENCODING_ZIPMAP) { key = getDecodedObject(key); @@ -6435,7 +6997,7 @@ static int hashSet(robj *o, robj *key, robj *value) { /* Delete an element from a hash. * Return 1 on deleted and 0 on not found. */ -static int hashDelete(robj *o, robj *key) { +static int hashTypeDelete(robj *o, robj *key) { int deleted = 0; if (o->encoding == REDIS_ENCODING_ZIPMAP) { key = getDecodedObject(key); @@ -6450,7 +7012,7 @@ static int hashDelete(robj *o, robj *key) { } /* Return the number of elements in a hash. */ -static unsigned long hashLength(robj *o) { +static unsigned long hashTypeLength(robj *o) { return (o->encoding == REDIS_ENCODING_ZIPMAP) ? 
zipmapLen((unsigned char*)o->ptr) : dictSize((dict*)o->ptr); } @@ -6467,10 +7029,10 @@ typedef struct { dictIterator *di; dictEntry *de; -} hashIterator; +} hashTypeIterator; -static hashIterator *hashInitIterator(robj *subject) { - hashIterator *hi = zmalloc(sizeof(hashIterator)); +static hashTypeIterator *hashTypeInitIterator(robj *subject) { + hashTypeIterator *hi = zmalloc(sizeof(hashTypeIterator)); hi->encoding = subject->encoding; if (hi->encoding == REDIS_ENCODING_ZIPMAP) { hi->zi = zipmapRewind(subject->ptr); @@ -6482,7 +7044,7 @@ static hashIterator *hashInitIterator(robj *subject) { return hi; } -static void hashReleaseIterator(hashIterator *hi) { +static void hashTypeReleaseIterator(hashTypeIterator *hi) { if (hi->encoding == REDIS_ENCODING_HT) { dictReleaseIterator(hi->di); } @@ -6491,7 +7053,7 @@ static void hashReleaseIterator(hashIterator *hi) { /* Move to the next entry in the hash. Return REDIS_OK when the next entry * could be found and REDIS_ERR when the iterator reaches the end. */ -static int hashNext(hashIterator *hi) { +static int hashTypeNext(hashTypeIterator *hi) { if (hi->encoding == REDIS_ENCODING_ZIPMAP) { if ((hi->zi = zipmapNext(hi->zi, &hi->zk, &hi->zklen, &hi->zv, &hi->zvlen)) == NULL) return REDIS_ERR; @@ -6503,7 +7065,7 @@ static int hashNext(hashIterator *hi) { /* Get key or value object at current iteration position. * This increases the refcount of the field object by 1. 
*/ -static robj *hashCurrent(hashIterator *hi, int what) { +static robj *hashTypeCurrent(hashTypeIterator *hi, int what) { robj *o; if (hi->encoding == REDIS_ENCODING_ZIPMAP) { if (what & REDIS_HASH_KEY) { @@ -6522,12 +7084,11 @@ static robj *hashCurrent(hashIterator *hi, int what) { return o; } -static robj *hashLookupWriteOrCreate(redisClient *c, robj *key) { +static robj *hashTypeLookupWriteOrCreate(redisClient *c, robj *key) { robj *o = lookupKeyWrite(c->db,key); if (o == NULL) { o = createHashObject(); - dictAdd(c->db->dict,key,o); - incrRefCount(key); + dbAdd(c->db,key,o); } else { if (o->type != REDIS_HASH) { addReply(c,shared.wrongtypeerr); @@ -6542,24 +7103,24 @@ static void hsetCommand(redisClient *c) { int update; robj *o; - if ((o = hashLookupWriteOrCreate(c,c->argv[1])) == NULL) return; - hashTryConversion(o,c->argv,2,3); - hashTryObjectEncoding(o,&c->argv[2], &c->argv[3]); - update = hashSet(o,c->argv[2],c->argv[3]); + if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return; + hashTypeTryConversion(o,c->argv,2,3); + hashTypeTryObjectEncoding(o,&c->argv[2], &c->argv[3]); + update = hashTypeSet(o,c->argv[2],c->argv[3]); addReply(c, update ? 
shared.czero : shared.cone); server.dirty++; } static void hsetnxCommand(redisClient *c) { robj *o; - if ((o = hashLookupWriteOrCreate(c,c->argv[1])) == NULL) return; - hashTryConversion(o,c->argv,2,3); + if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return; + hashTypeTryConversion(o,c->argv,2,3); - if (hashExists(o, c->argv[2])) { + if (hashTypeExists(o, c->argv[2])) { addReply(c, shared.czero); } else { - hashTryObjectEncoding(o,&c->argv[2], &c->argv[3]); - hashSet(o,c->argv[2],c->argv[3]); + hashTypeTryObjectEncoding(o,&c->argv[2], &c->argv[3]); + hashTypeSet(o,c->argv[2],c->argv[3]); addReply(c, shared.cone); server.dirty++; } @@ -6574,11 +7135,11 @@ static void hmsetCommand(redisClient *c) { return; } - if ((o = hashLookupWriteOrCreate(c,c->argv[1])) == NULL) return; - hashTryConversion(o,c->argv,2,c->argc-1); + if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return; + hashTypeTryConversion(o,c->argv,2,c->argc-1); for (i = 2; i < c->argc; i += 2) { - hashTryObjectEncoding(o,&c->argv[i], &c->argv[i+1]); - hashSet(o,c->argv[i],c->argv[i+1]); + hashTypeTryObjectEncoding(o,&c->argv[i], &c->argv[i+1]); + hashTypeSet(o,c->argv[i],c->argv[i+1]); } addReply(c, shared.ok); server.dirty++; @@ -6589,8 +7150,8 @@ static void hincrbyCommand(redisClient *c) { robj *o, *current, *new; if (getLongLongFromObjectOrReply(c,c->argv[3],&incr,NULL) != REDIS_OK) return; - if ((o = hashLookupWriteOrCreate(c,c->argv[1])) == NULL) return; - if ((current = hashGet(o,c->argv[2])) != NULL) { + if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return; + if ((current = hashTypeGet(o,c->argv[2])) != NULL) { if (getLongLongFromObjectOrReply(c,current,&value, "hash value is not an integer") != REDIS_OK) { decrRefCount(current); @@ -6603,8 +7164,8 @@ static void hincrbyCommand(redisClient *c) { value += incr; new = createStringObjectFromLongLong(value); - hashTryObjectEncoding(o,&c->argv[2],NULL); - hashSet(o,c->argv[2],new); + 
hashTypeTryObjectEncoding(o,&c->argv[2],NULL); + hashTypeSet(o,c->argv[2],new); decrRefCount(new); addReplyLongLong(c,value); server.dirty++; @@ -6615,7 +7176,7 @@ static void hgetCommand(redisClient *c) { if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL || checkType(c,o,REDIS_HASH)) return; - if ((value = hashGet(o,c->argv[2])) != NULL) { + if ((value = hashTypeGet(o,c->argv[2])) != NULL) { addReplyBulk(c,value); decrRefCount(value); } else { @@ -6636,7 +7197,7 @@ static void hmgetCommand(redisClient *c) { * an empty hash. The reply should then be a series of NULLs. */ addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-2)); for (i = 2; i < c->argc; i++) { - if (o != NULL && (value = hashGet(o,c->argv[i])) != NULL) { + if (o != NULL && (value = hashTypeGet(o,c->argv[i])) != NULL) { addReplyBulk(c,value); decrRefCount(value); } else { @@ -6650,8 +7211,8 @@ static void hdelCommand(redisClient *c) { if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,o,REDIS_HASH)) return; - if (hashDelete(o,c->argv[2])) { - if (hashLength(o) == 0) deleteKey(c->db,c->argv[1]); + if (hashTypeDelete(o,c->argv[2])) { + if (hashTypeLength(o) == 0) dbDelete(c->db,c->argv[1]); addReply(c,shared.cone); server.dirty++; } else { @@ -6664,13 +7225,13 @@ static void hlenCommand(redisClient *c) { if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,o,REDIS_HASH)) return; - addReplyUlong(c,hashLength(o)); + addReplyUlong(c,hashTypeLength(o)); } static void genericHgetallCommand(redisClient *c, int flags) { robj *o, *lenobj, *obj; unsigned long count = 0; - hashIterator *hi; + hashTypeIterator *hi; if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL || checkType(c,o,REDIS_HASH)) return; @@ -6679,22 +7240,22 @@ static void genericHgetallCommand(redisClient *c, int flags) { addReply(c,lenobj); decrRefCount(lenobj); - hi = hashInitIterator(o); - while (hashNext(hi) != REDIS_ERR) { + hi = 
hashTypeInitIterator(o); + while (hashTypeNext(hi) != REDIS_ERR) { if (flags & REDIS_HASH_KEY) { - obj = hashCurrent(hi,REDIS_HASH_KEY); + obj = hashTypeCurrent(hi,REDIS_HASH_KEY); addReplyBulk(c,obj); decrRefCount(obj); count++; } if (flags & REDIS_HASH_VALUE) { - obj = hashCurrent(hi,REDIS_HASH_VALUE); + obj = hashTypeCurrent(hi,REDIS_HASH_VALUE); addReplyBulk(c,obj); decrRefCount(obj); count++; } } - hashReleaseIterator(hi); + hashTypeReleaseIterator(hi); lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",count); } @@ -6716,7 +7277,7 @@ static void hexistsCommand(redisClient *c) { if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,o,REDIS_HASH)) return; - addReply(c, hashExists(o,c->argv[2]) ? shared.cone : shared.czero); + addReply(c, hashTypeExists(o,c->argv[2]) ? shared.cone : shared.czero); } static void convertToRealHash(robj *o) { @@ -6744,12 +7305,14 @@ static void convertToRealHash(robj *o) { static void flushdbCommand(redisClient *c) { server.dirty += dictSize(c->db->dict); + touchWatchedKeysOnFlush(c->db->id); dictEmpty(c->db->dict); dictEmpty(c->db->expires); addReply(c,shared.ok); } static void flushallCommand(redisClient *c) { + touchWatchedKeysOnFlush(-1); server.dirty += emptyDb(); addReply(c,shared.ok); if (server.bgsavechildpid != -1) { @@ -6835,7 +7398,7 @@ static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) { /* Retrieve value from hash by the field name. This operation * already increases the refcount of the returned object. 
*/ initStaticStringObject(fieldobj,((char*)&fieldname)+(sizeof(long)*2)); - o = hashGet(o, &fieldobj); + o = hashTypeGet(o, &fieldobj); } else { if (o->type != REDIS_STRING) return NULL; @@ -6890,7 +7453,7 @@ static int sortCompare(const void *s1, const void *s2) { * is optimized for speed and a bit less for readability */ static void sortCommand(redisClient *c) { list *operations; - int outputlen = 0; + unsigned int outputlen = 0; int desc = 0, alpha = 0; int limit_start = 0, limit_count = -1, start, end; int j, dontsort = 0, vectorlen; @@ -6960,7 +7523,7 @@ static void sortCommand(redisClient *c) { /* Load the sorting vector with all the objects to sort */ switch(sortval->type) { - case REDIS_LIST: vectorlen = listLength((list*)sortval->ptr); break; + case REDIS_LIST: vectorlen = listTypeLength(sortval); break; case REDIS_SET: vectorlen = dictSize((dict*)sortval->ptr); break; case REDIS_ZSET: vectorlen = dictSize(((zset*)sortval->ptr)->dict); break; default: vectorlen = 0; redisPanic("Bad SORT type"); /* Avoid GCC warning */ @@ -6969,18 +7532,15 @@ static void sortCommand(redisClient *c) { j = 0; if (sortval->type == REDIS_LIST) { - list *list = sortval->ptr; - listNode *ln; - listIter li; - - listRewind(list,&li); - while((ln = listNext(&li))) { - robj *ele = ln->value; - vector[j].obj = ele; + listTypeIterator *li = listTypeInitIterator(sortval,0,REDIS_TAIL); + listTypeEntry entry; + while(listTypeNext(li,&entry)) { + vector[j].obj = listTypeGet(&entry); vector[j].u.score = 0; vector[j].u.cmpobj = NULL; j++; } + listTypeReleaseIterator(li); } else { dict *set; dictIterator *di; @@ -7090,8 +7650,7 @@ static void sortCommand(redisClient *c) { } } } else { - robj *listObject = createListObject(); - list *listPtr = (list*) listObject->ptr; + robj *sobj = createZiplistObject(); /* STORE option specified, set the sorting result as a List object */ for (j = start; j <= end; j++) { @@ -7099,33 +7658,30 @@ static void sortCommand(redisClient *c) { listIter li; if 
(!getop) { - listAddNodeTail(listPtr,vector[j].obj); - incrRefCount(vector[j].obj); - } - listRewind(operations,&li); - while((ln = listNext(&li))) { - redisSortOperation *sop = ln->value; - robj *val = lookupKeyByPattern(c->db,sop->pattern, - vector[j].obj); + listTypePush(sobj,vector[j].obj,REDIS_TAIL); + } else { + listRewind(operations,&li); + while((ln = listNext(&li))) { + redisSortOperation *sop = ln->value; + robj *val = lookupKeyByPattern(c->db,sop->pattern, + vector[j].obj); - if (sop->type == REDIS_SORT_GET) { - if (!val) { - listAddNodeTail(listPtr,createStringObject("",0)); + if (sop->type == REDIS_SORT_GET) { + if (!val) val = createStringObject("",0); + + /* listTypePush does an incrRefCount, so we should take care + * care of the incremented refcount caused by either + * lookupKeyByPattern or createStringObject("",0) */ + listTypePush(sobj,val,REDIS_TAIL); + decrRefCount(val); } else { - /* We should do a incrRefCount on val because it is - * added to the list, but also a decrRefCount because - * it is returned by lookupKeyByPattern. This results - * in doing nothing at all. */ - listAddNodeTail(listPtr,val); + /* always fails */ + redisAssert(sop->type == REDIS_SORT_GET); } - } else { - redisAssert(sop->type == REDIS_SORT_GET); /* always fails */ } } } - if (dictReplace(c->db->dict,storekey,listObject)) { - incrRefCount(storekey); - } + dbReplace(c->db,storekey,sobj); /* Note: we add 1 because the DB is dirty anyway since even if the * SORT result is empty a new key is set and maybe the old content * replaced. 
*/ @@ -7134,6 +7690,9 @@ static void sortCommand(redisClient *c) { } /* Cleanup */ + if (sortval->type == REDIS_LIST) + for (j = 0; j < vectorlen; j++) + decrRefCount(vector[j].obj); decrRefCount(sortval); listRelease(operations); for (j = 0; j < vectorlen; j++) { @@ -7202,8 +7761,8 @@ static sds genRedisInfoString(void) { "vm_enabled:%d\r\n" "role:%s\r\n" ,REDIS_VERSION, - REDIS_GIT_SHA1, - REDIS_GIT_DIRTY > 0, + redisGitSHA1(), + strtol(redisGitDirty(),NULL,10) > 0, (sizeof(long) == 8) ? "64" : "32", aeGetApiName(), (long) getpid(), @@ -7304,7 +7863,10 @@ static void monitorCommand(redisClient *c) { /* ================================= Expire ================================= */ static int removeExpire(redisDb *db, robj *key) { - if (dictDelete(db->expires,key) == DICT_OK) { + /* An expire may only be removed if there is a corresponding entry in the + * main dict. Otherwise, the key will never be freed. */ + redisAssert(dictFind(db->dict,key->ptr) != NULL); + if (dictDelete(db->expires,key->ptr) == DICT_OK) { return 1; } else { return 0; @@ -7312,10 +7874,13 @@ static int removeExpire(redisDb *db, robj *key) { } static int setExpire(redisDb *db, robj *key, time_t when) { - if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) { + dictEntry *de; + + /* Reuse the sds from the main dict in the expire dict */ + redisAssert((de = dictFind(db->dict,key->ptr)) != NULL); + if (dictAdd(db->expires,dictGetEntryKey(de),(void*)when) == DICT_ERR) { return 0; } else { - incrRefCount(key); return 1; } } @@ -7327,41 +7892,34 @@ static time_t getExpire(redisDb *db, robj *key) { /* No expire? return ASAP */ if (dictSize(db->expires) == 0 || - (de = dictFind(db->expires,key)) == NULL) return -1; + (de = dictFind(db->expires,key->ptr)) == NULL) return -1; + /* The entry was found in the expire dict, this means it should also + * be present in the main dict (safety check). 
*/ + redisAssert(dictFind(db->dict,key->ptr) != NULL); return (time_t) dictGetEntryVal(de); } static int expireIfNeeded(redisDb *db, robj *key) { - time_t when; - dictEntry *de; - - /* No expire? return ASAP */ - if (dictSize(db->expires) == 0 || - (de = dictFind(db->expires,key)) == NULL) return 0; + time_t when = getExpire(db,key); + if (when < 0) return 0; - /* Lookup the expire */ - when = (time_t) dictGetEntryVal(de); + /* Return when this key has not expired */ if (time(NULL) <= when) return 0; /* Delete the key */ - dictDelete(db->expires,key); server.stat_expiredkeys++; - return dictDelete(db->dict,key) == DICT_OK; + server.dirty++; + return dbDelete(db,key); } static int deleteIfVolatile(redisDb *db, robj *key) { - dictEntry *de; - - /* No expire? return ASAP */ - if (dictSize(db->expires) == 0 || - (de = dictFind(db->expires,key)) == NULL) return 0; + if (getExpire(db,key) < 0) return 0; /* Delete the key */ - server.dirty++; server.stat_expiredkeys++; - dictDelete(db->expires,key); - return dictDelete(db->dict,key) == DICT_OK; + server.dirty++; + return dbDelete(db,key); } static void expireGenericCommand(redisClient *c, robj *key, robj *param, long offset) { @@ -7372,13 +7930,13 @@ static void expireGenericCommand(redisClient *c, robj *key, robj *param, long of seconds -= offset; - de = dictFind(c->db->dict,key); + de = dictFind(c->db->dict,key->ptr); if (de == NULL) { addReply(c,shared.czero); return; } if (seconds <= 0) { - if (deleteKey(c->db,key)) server.dirty++; + if (dbDelete(c->db,key)) server.dirty++; addReply(c, shared.cone); return; } else { @@ -7454,6 +8012,10 @@ static void queueMultiCommand(redisClient *c, struct redisCommand *cmd) { } static void multiCommand(redisClient *c) { + if (c->flags & REDIS_MULTI) { + addReplySds(c,sdsnew("-ERR MULTI calls can not be nested\r\n")); + return; + } c->flags |= REDIS_MULTI; addReply(c,shared.ok); } @@ -7467,6 +8029,7 @@ static void discardCommand(redisClient *c) { freeClientMultiState(c); 
initClientMultiState(c); c->flags &= (~REDIS_MULTI); + unwatchAllKeys(c); addReply(c,shared.ok); } @@ -7494,6 +8057,17 @@ static void execCommand(redisClient *c) { return; } + /* Check if we need to abort the EXEC if some WATCHed key was touched. + * A failed EXEC will return a multi bulk nil object. */ + if (c->flags & REDIS_DIRTY_CAS) { + freeClientMultiState(c); + initClientMultiState(c); + c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS); + unwatchAllKeys(c); + addReply(c,shared.nullmultibulk); + return; + } + /* Replicate a MULTI request now that we are sure the block is executed. * This way we'll deliver the MULTI/..../EXEC block as a whole and * both the AOF and the replication link will have the same consistency @@ -7501,6 +8075,7 @@ static void execCommand(redisClient *c) { execCommandReplicateMulti(c); /* Exec all the queued commands */ + unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */ orig_argv = c->argv; orig_argc = c->argc; addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->mstate.count)); @@ -7513,7 +8088,7 @@ static void execCommand(redisClient *c) { c->argc = orig_argc; freeClientMultiState(c); initClientMultiState(c); - c->flags &= (~REDIS_MULTI); + c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS); /* Make sure the EXEC command is always replicated / AOF, since we * always send the MULTI command (we can't know beforehand if the * next operations will contain at least a modification to the DB). */ @@ -7540,7 +8115,7 @@ static void execCommand(redisClient *c) { * empty we need to block. In order to do so we remove the notification for * new data to read in the client socket (so that we'll not serve new * requests if the blocking request is not served). Also we put the client - * in a dictionary (db->blockingkeys) mapping keys to a list of clients + * in a dictionary (db->blocking_keys) mapping keys to a list of clients * blocking for this keys. 
* - If a PUSH operation against a key with blocked clients waiting is * performed, we serve the first in the list: basically instead to push @@ -7558,22 +8133,22 @@ static void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeou list *l; int j; - c->blockingkeys = zmalloc(sizeof(robj*)*numkeys); - c->blockingkeysnum = numkeys; + c->blocking_keys = zmalloc(sizeof(robj*)*numkeys); + c->blocking_keys_num = numkeys; c->blockingto = timeout; for (j = 0; j < numkeys; j++) { /* Add the key in the client structure, to map clients -> keys */ - c->blockingkeys[j] = keys[j]; + c->blocking_keys[j] = keys[j]; incrRefCount(keys[j]); /* And in the other "side", to map keys -> clients */ - de = dictFind(c->db->blockingkeys,keys[j]); + de = dictFind(c->db->blocking_keys,keys[j]); if (de == NULL) { int retval; /* For every key we take a list of clients blocked for it */ l = listCreate(); - retval = dictAdd(c->db->blockingkeys,keys[j],l); + retval = dictAdd(c->db->blocking_keys,keys[j],l); incrRefCount(keys[j]); assert(retval == DICT_OK); } else { @@ -7592,22 +8167,22 @@ static void unblockClientWaitingData(redisClient *c) { list *l; int j; - assert(c->blockingkeys != NULL); + assert(c->blocking_keys != NULL); /* The client may wait for multiple keys, so unblock it for every key. */ - for (j = 0; j < c->blockingkeysnum; j++) { + for (j = 0; j < c->blocking_keys_num; j++) { /* Remove this client from the list of clients waiting for this key. 
*/ - de = dictFind(c->db->blockingkeys,c->blockingkeys[j]); + de = dictFind(c->db->blocking_keys,c->blocking_keys[j]); assert(de != NULL); l = dictGetEntryVal(de); listDelNode(l,listSearchKey(l,c)); /* If the list is empty we need to remove it to avoid wasting memory */ if (listLength(l) == 0) - dictDelete(c->db->blockingkeys,c->blockingkeys[j]); - decrRefCount(c->blockingkeys[j]); + dictDelete(c->db->blocking_keys,c->blocking_keys[j]); + decrRefCount(c->blocking_keys[j]); } /* Cleanup the client structure */ - zfree(c->blockingkeys); - c->blockingkeys = NULL; + zfree(c->blocking_keys); + c->blocking_keys = NULL; c->flags &= (~REDIS_BLOCKED); server.blpop_blocked_clients--; /* We want to process data if there is some command waiting @@ -7634,7 +8209,7 @@ static int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { list *l; listNode *ln; - de = dictFind(c->db->blockingkeys,key); + de = dictFind(c->db->blocking_keys,key); if (de == NULL) return 0; l = dictGetEntryVal(de); ln = listFirst(l); @@ -8131,7 +8706,7 @@ static void freeMemoryIfNeeded(void) { minttl = t; } } - deleteKey(server.db+j,minkey); + dbDelete(server.db+j,minkey); } } if (!freed) return; /* nothing to free... */ @@ -8140,6 +8715,48 @@ static void freeMemoryIfNeeded(void) { /* ============================== Append Only file ========================== */ +/* Called when the user switches from "appendonly yes" to "appendonly no" + * at runtime using the CONFIG command. */ +static void stopAppendOnly(void) { + flushAppendOnlyFile(); + aof_fsync(server.appendfd); + close(server.appendfd); + + server.appendfd = -1; + server.appendseldb = -1; + server.appendonly = 0; + /* rewrite operation in progress? 
kill it, wait child exit */ + if (server.bgsavechildpid != -1) { + int statloc; + + if (kill(server.bgsavechildpid,SIGKILL) != -1) + wait3(&statloc,0,NULL); + /* reset the buffer accumulating changes while the child saves */ + sdsfree(server.bgrewritebuf); + server.bgrewritebuf = sdsempty(); + server.bgsavechildpid = -1; + } +} + +/* Called when the user switches from "appendonly no" to "appendonly yes" + * at runtime using the CONFIG command. */ +static int startAppendOnly(void) { + server.appendonly = 1; + server.lastfsync = time(NULL); + server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644); + if (server.appendfd == -1) { + redisLog(REDIS_WARNING,"Used tried to switch on AOF via CONFIG, but I can't open the AOF file: %s",strerror(errno)); + return REDIS_ERR; + } + if (rewriteAppendOnlyFileBackground() == REDIS_ERR) { + server.appendonly = 0; + close(server.appendfd); + redisLog(REDIS_WARNING,"Used tried to switch on AOF via CONFIG, I can't trigger a background AOF rewrite operation. Check the above logs for more info about the error.",strerror(errno)); + return REDIS_ERR; + } + return REDIS_OK; +} + /* Write the append only file buffer on disk. * * Since we are required to write the AOF before replying to the client, @@ -8173,6 +8790,11 @@ static void flushAppendOnlyFile(void) { sdsfree(server.aofbuf); server.aofbuf = sdsempty(); + /* Don't Fsync if no-appendfsync-on-rewrite is set to yes and we have + * childs performing heavy I/O on disk. 
*/ + if (server.no_appendfsync_on_rewrite && + (server.bgrewritechildpid != -1 || server.bgsavechildpid != -1)) + return; /* Fsync if needed */ now = time(NULL); if (server.appendfsync == APPENDFSYNC_ALWAYS || @@ -8299,7 +8921,6 @@ int loadAppendOnlyFile(char *filename) { struct redisClient *fakeClient; FILE *fp = fopen(filename,"r"); struct redis_stat sb; - unsigned long long loadedkeys = 0; int appendonly = server.appendonly; if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) @@ -8322,6 +8943,7 @@ int loadAppendOnlyFile(char *filename) { char buf[128]; sds argsds; struct redisCommand *cmd; + int force_swapout; if (fgets(buf,sizeof(buf),fp) == NULL) { if (feof(fp)) @@ -8362,8 +8984,11 @@ int loadAppendOnlyFile(char *filename) { for (j = 0; j < argc; j++) decrRefCount(argv[j]); zfree(argv); /* Handle swapping while loading big datasets when VM is on */ - loadedkeys++; - if (server.vm_enabled && (loadedkeys % 5000) == 0) { + force_swapout = 0; + if ((zmalloc_used_memory() - server.vm_max_memory) > 1024*1024*32) + force_swapout = 1; + + if (server.vm_enabled && force_swapout) { while (zmalloc_used_memory() > server.vm_max_memory) { if (vmSwapOneObjectBlocking() == REDIS_ERR) break; } @@ -8391,40 +9016,17 @@ fmterr: exit(1); } -/* Write an object into a file in the bulk format $\r\n\r\n */ -static int fwriteBulkObject(FILE *fp, robj *obj) { - char buf[128]; - int decrrc = 0; - - /* Avoid the incr/decr ref count business if possible to help - * copy-on-write (we are often in a child process when this function - * is called). 
- * Also makes sure that key objects don't get incrRefCount-ed when VM - * is enabled */ - if (obj->encoding != REDIS_ENCODING_RAW) { - obj = getDecodedObject(obj); - decrrc = 1; - } - snprintf(buf,sizeof(buf),"$%ld\r\n",(long)sdslen(obj->ptr)); - if (fwrite(buf,strlen(buf),1,fp) == 0) goto err; - if (sdslen(obj->ptr) && fwrite(obj->ptr,sdslen(obj->ptr),1,fp) == 0) - goto err; - if (fwrite("\r\n",2,1,fp) == 0) goto err; - if (decrrc) decrRefCount(obj); - return 1; -err: - if (decrrc) decrRefCount(obj); - return 0; -} - /* Write binary-safe string into a file in the bulkformat * $\r\n\r\n */ static int fwriteBulkString(FILE *fp, char *s, unsigned long len) { - char buf[128]; - - snprintf(buf,sizeof(buf),"$%ld\r\n",(unsigned long)len); - if (fwrite(buf,strlen(buf),1,fp) == 0) return 0; - if (len && fwrite(s,len,1,fp) == 0) return 0; + char cbuf[128]; + int clen; + cbuf[0] = '$'; + clen = 1+ll2string(cbuf+1,sizeof(cbuf)-1,len); + cbuf[clen++] = '\r'; + cbuf[clen++] = '\n'; + if (fwrite(cbuf,clen,1,fp) == 0) return 0; + if (len > 0 && fwrite(s,len,1,fp) == 0) return 0; if (fwrite("\r\n",2,1,fp) == 0) return 0; return 1; } @@ -8441,16 +9043,28 @@ static int fwriteBulkDouble(FILE *fp, double d) { } /* Write a long value in bulk format $\r\n\r\n */ -static int fwriteBulkLong(FILE *fp, long l) { - char buf[128], lbuf[128]; - - snprintf(lbuf,sizeof(lbuf),"%ld\r\n",l); - snprintf(buf,sizeof(buf),"$%lu\r\n",(unsigned long)strlen(lbuf)-2); - if (fwrite(buf,strlen(buf),1,fp) == 0) return 0; - if (fwrite(lbuf,strlen(lbuf),1,fp) == 0) return 0; +static int fwriteBulkLongLong(FILE *fp, long long l) { + char bbuf[128], lbuf[128]; + unsigned int blen, llen; + llen = ll2string(lbuf,32,l); + blen = snprintf(bbuf,sizeof(bbuf),"$%u\r\n%s\r\n",llen,lbuf); + if (fwrite(bbuf,blen,1,fp) == 0) return 0; return 1; } +/* Delegate writing an object to writing a bulk string or bulk long long. 
*/ +static int fwriteBulkObject(FILE *fp, robj *obj) { + /* Avoid using getDecodedObject to help copy-on-write (we are often + * in a child process when this function is called). */ + if (obj->encoding == REDIS_ENCODING_INT) { + return fwriteBulkLongLong(fp,(long)obj->ptr); + } else if (obj->encoding == REDIS_ENCODING_RAW) { + return fwriteBulkString(fp,obj->ptr,sdslen(obj->ptr)); + } else { + redisPanic("Unknown string encoding"); + } +} + /* Write a sequence of commands able to fully rebuild the dataset into * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */ static int rewriteAppendOnlyFile(char *filename) { @@ -8482,28 +9096,30 @@ static int rewriteAppendOnlyFile(char *filename) { /* SELECT the new DB */ if (fwrite(selectcmd,sizeof(selectcmd)-1,1,fp) == 0) goto werr; - if (fwriteBulkLong(fp,j) == 0) goto werr; + if (fwriteBulkLongLong(fp,j) == 0) goto werr; /* Iterate this DB writing every entry */ while((de = dictNext(di)) != NULL) { - robj *key, *o; + sds keystr = dictGetEntryKey(de); + robj key, *o; time_t expiretime; int swapped; - key = dictGetEntryKey(de); + keystr = dictGetEntryKey(de); + o = dictGetEntryVal(de); + initStaticStringObject(key,keystr); /* If the value for this key is swapped, load a preview in memory. 
* We use a "swapped" flag to remember if we need to free the * value object instead to just increment the ref count anyway * in order to avoid copy-on-write of pages if we are forked() */ - if (!server.vm_enabled || key->storage == REDIS_VM_MEMORY || - key->storage == REDIS_VM_SWAPPING) { - o = dictGetEntryVal(de); + if (!server.vm_enabled || o->storage == REDIS_VM_MEMORY || + o->storage == REDIS_VM_SWAPPING) { swapped = 0; } else { - o = vmPreviewObject(key); + o = vmPreviewObject(o); swapped = 1; } - expiretime = getExpire(db,key); + expiretime = getExpire(db,&key); /* Save the key and associated value */ if (o->type == REDIS_STRING) { @@ -8511,22 +9127,45 @@ static int rewriteAppendOnlyFile(char *filename) { char cmd[]="*3\r\n$3\r\nSET\r\n"; if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; /* Key and value */ - if (fwriteBulkObject(fp,key) == 0) goto werr; + if (fwriteBulkObject(fp,&key) == 0) goto werr; if (fwriteBulkObject(fp,o) == 0) goto werr; } else if (o->type == REDIS_LIST) { /* Emit the RPUSHes needed to rebuild the list */ - list *list = o->ptr; - listNode *ln; - listIter li; + char cmd[]="*3\r\n$5\r\nRPUSH\r\n"; + if (o->encoding == REDIS_ENCODING_ZIPLIST) { + unsigned char *zl = o->ptr; + unsigned char *p = ziplistIndex(zl,0); + unsigned char *vstr; + unsigned int vlen; + long long vlong; + + while(ziplistGet(p,&vstr,&vlen,&vlong)) { + if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; + if (fwriteBulkObject(fp,&key) == 0) goto werr; + if (vstr) { + if (fwriteBulkString(fp,(char*)vstr,vlen) == 0) + goto werr; + } else { + if (fwriteBulkLongLong(fp,vlong) == 0) + goto werr; + } + p = ziplistNext(zl,p); + } + } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) { + list *list = o->ptr; + listNode *ln; + listIter li; - listRewind(list,&li); - while((ln = listNext(&li))) { - char cmd[]="*3\r\n$5\r\nRPUSH\r\n"; - robj *eleobj = listNodeValue(ln); + listRewind(list,&li); + while((ln = listNext(&li))) { + robj *eleobj = listNodeValue(ln); - if 
(fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; - if (fwriteBulkObject(fp,key) == 0) goto werr; - if (fwriteBulkObject(fp,eleobj) == 0) goto werr; + if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; + if (fwriteBulkObject(fp,&key) == 0) goto werr; + if (fwriteBulkObject(fp,eleobj) == 0) goto werr; + } + } else { + redisPanic("Unknown list encoding"); } } else if (o->type == REDIS_SET) { /* Emit the SADDs needed to rebuild the set */ @@ -8539,7 +9178,7 @@ static int rewriteAppendOnlyFile(char *filename) { robj *eleobj = dictGetEntryKey(de); if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; - if (fwriteBulkObject(fp,key) == 0) goto werr; + if (fwriteBulkObject(fp,&key) == 0) goto werr; if (fwriteBulkObject(fp,eleobj) == 0) goto werr; } dictReleaseIterator(di); @@ -8555,7 +9194,7 @@ static int rewriteAppendOnlyFile(char *filename) { double *score = dictGetEntryVal(de); if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; - if (fwriteBulkObject(fp,key) == 0) goto werr; + if (fwriteBulkObject(fp,&key) == 0) goto werr; if (fwriteBulkDouble(fp,*score) == 0) goto werr; if (fwriteBulkObject(fp,eleobj) == 0) goto werr; } @@ -8571,7 +9210,7 @@ static int rewriteAppendOnlyFile(char *filename) { while((p = zipmapNext(p,&field,&flen,&val,&vlen)) != NULL) { if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; - if (fwriteBulkObject(fp,key) == 0) goto werr; + if (fwriteBulkObject(fp,&key) == 0) goto werr; if (fwriteBulkString(fp,(char*)field,flen) == -1) return -1; if (fwriteBulkString(fp,(char*)val,vlen) == -1) @@ -8586,7 +9225,7 @@ static int rewriteAppendOnlyFile(char *filename) { robj *val = dictGetEntryVal(de); if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; - if (fwriteBulkObject(fp,key) == 0) goto werr; + if (fwriteBulkObject(fp,&key) == 0) goto werr; if (fwriteBulkObject(fp,field) == -1) return -1; if (fwriteBulkObject(fp,val) == -1) return -1; } @@ -8601,8 +9240,8 @@ static int rewriteAppendOnlyFile(char *filename) { /* If this key is already expired skip it */ if 
(expiretime < now) continue; if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; - if (fwriteBulkObject(fp,key) == 0) goto werr; - if (fwriteBulkLong(fp,expiretime) == 0) goto werr; + if (fwriteBulkObject(fp,&key) == 0) goto werr; + if (fwriteBulkLongLong(fp,expiretime) == 0) goto werr; } if (swapped) decrRefCount(o); } @@ -8611,7 +9250,7 @@ static int rewriteAppendOnlyFile(char *filename) { /* Make sure data will not remain on the OS's output buffers */ fflush(fp); - fsync(fileno(fp)); + aof_fsync(fileno(fp)); fclose(fp); /* Use RENAME to make sure the DB file is changed atomically only @@ -8726,6 +9365,17 @@ static void aofRemoveTempFile(pid_t childpid) { /* =================== Virtual Memory - Blocking Side ====================== */ +/* Create a VM pointer object. This kind of objects are used in place of + * values in the key -> value hash table, for swapped out objects. */ +static vmpointer *createVmPointer(int vtype) { + vmpointer *vp = zmalloc(sizeof(vmpointer)); + + vp->type = REDIS_VMPOINTER; + vp->storage = REDIS_VM_SWAPPED; + vp->vtype = vtype; + return vp; +} + static void vmInit(void) { off_t totsize; int pipefds[2]; @@ -8940,30 +9590,33 @@ static int vmWriteObjectOnSwap(robj *o, off_t page) { return REDIS_OK; } -/* Swap the 'val' object relative to 'key' into disk. Store all the information - * needed to later retrieve the object into the key object. +/* Transfers the 'val' object to disk. Store all the information + * a 'vmpointer' object containing all the information needed to load the + * object back later is returned. + * * If we can't find enough contiguous empty pages to swap the object on disk - * REDIS_ERR is returned. */ -static int vmSwapObjectBlocking(robj *key, robj *val) { + * NULL is returned. 
*/ +static vmpointer *vmSwapObjectBlocking(robj *val) { off_t pages = rdbSavedObjectPages(val,NULL); off_t page; + vmpointer *vp; + + assert(val->storage == REDIS_VM_MEMORY); + assert(val->refcount == 1); + if (vmFindContiguousPages(&page,pages) == REDIS_ERR) return NULL; + if (vmWriteObjectOnSwap(val,page) == REDIS_ERR) return NULL; - assert(key->storage == REDIS_VM_MEMORY); - assert(key->refcount == 1); - if (vmFindContiguousPages(&page,pages) == REDIS_ERR) return REDIS_ERR; - if (vmWriteObjectOnSwap(val,page) == REDIS_ERR) return REDIS_ERR; - key->vm.page = page; - key->vm.usedpages = pages; - key->storage = REDIS_VM_SWAPPED; - key->vtype = val->type; + vp = createVmPointer(val->type); + vp->page = page; + vp->usedpages = pages; decrRefCount(val); /* Deallocate the object from memory. */ vmMarkPagesUsed(page,pages); - redisLog(REDIS_DEBUG,"VM: object %s swapped out at %lld (%lld pages)", - (unsigned char*) key->ptr, + redisLog(REDIS_DEBUG,"VM: object %p swapped out at %lld (%lld pages)", + (void*) val, (unsigned long long) page, (unsigned long long) pages); server.vm_stats_swapped_objects++; server.vm_stats_swapouts++; - return REDIS_OK; + return vp; } static robj *vmReadObjectFromSwap(off_t page, int type) { @@ -8985,46 +9638,47 @@ static robj *vmReadObjectFromSwap(off_t page, int type) { return o; } -/* Load the value object relative to the 'key' object from swap to memory. +/* Load the specified object from swap to memory. * The newly allocated object is returned. * * If preview is true the unserialized object is returned to the caller but - * no changes are made to the key object, nor the pages are marked as freed */ -static robj *vmGenericLoadObject(robj *key, int preview) { + * the pages are not marked as freed, nor the vp object is freed. 
*/ +static robj *vmGenericLoadObject(vmpointer *vp, int preview) { robj *val; - redisAssert(key->storage == REDIS_VM_SWAPPED || key->storage == REDIS_VM_LOADING); - val = vmReadObjectFromSwap(key->vm.page,key->vtype); + redisAssert(vp->type == REDIS_VMPOINTER && + (vp->storage == REDIS_VM_SWAPPED || vp->storage == REDIS_VM_LOADING)); + val = vmReadObjectFromSwap(vp->page,vp->vtype); if (!preview) { - key->storage = REDIS_VM_MEMORY; - key->vm.atime = server.unixtime; - vmMarkPagesFree(key->vm.page,key->vm.usedpages); - redisLog(REDIS_DEBUG, "VM: object %s loaded from disk", - (unsigned char*) key->ptr); + redisLog(REDIS_DEBUG, "VM: object %p loaded from disk", (void*)vp); + vmMarkPagesFree(vp->page,vp->usedpages); + zfree(vp); server.vm_stats_swapped_objects--; } else { - redisLog(REDIS_DEBUG, "VM: object %s previewed from disk", - (unsigned char*) key->ptr); + redisLog(REDIS_DEBUG, "VM: object %p previewed from disk", (void*)vp); } server.vm_stats_swapins++; return val; } -/* Plain object loading, from swap to memory */ -static robj *vmLoadObject(robj *key) { +/* Plain object loading, from swap to memory. + * + * 'o' is actually a redisVmPointer structure that will be freed by the call. + * The return value is the loaded object. */ +static robj *vmLoadObject(robj *o) { /* If we are loading the object in background, stop it, we * need to load this object synchronously ASAP. */ - if (key->storage == REDIS_VM_LOADING) - vmCancelThreadedIOJob(key); - return vmGenericLoadObject(key,0); + if (o->storage == REDIS_VM_LOADING) + vmCancelThreadedIOJob(o); + return vmGenericLoadObject((vmpointer*)o,0); } /* Just load the value on disk, without to modify the key. * This is useful when we want to perform some operation on the value * without to really bring it from swap to memory, like while saving the * dataset or rewriting the append only log. 
*/ -static robj *vmPreviewObject(robj *key) { - return vmGenericLoadObject(key,1); +static robj *vmPreviewObject(robj *o) { + return vmGenericLoadObject((vmpointer*)o,1); } /* How a good candidate is this object for swapping? @@ -9039,14 +9693,18 @@ static robj *vmPreviewObject(robj *key) { * proportionally, this is why we use the logarithm. This algorithm is * just a first try and will probably be tuned later. */ static double computeObjectSwappability(robj *o) { - time_t age = server.unixtime - o->vm.atime; - long asize = 0; + /* actual age can be >= minage, but not < minage. As we use wrapping + * 21 bit clocks with minutes resolution for the LRU. */ + time_t minage = abs(server.lruclock - o->lru); + long asize = 0, elesize; + robj *ele; list *l; + listNode *ln; dict *d; struct dictEntry *de; int z; - if (age <= 0) return 0; + if (minage <= 0) return 0; switch(o->type) { case REDIS_STRING: if (o->encoding != REDIS_ENCODING_RAW) { @@ -9056,18 +9714,18 @@ static double computeObjectSwappability(robj *o) { } break; case REDIS_LIST: - l = o->ptr; - listNode *ln = listFirst(l); - - asize = sizeof(list); - if (ln) { - robj *ele = ln->value; - long elesize; - - elesize = (ele->encoding == REDIS_ENCODING_RAW) ? - (sizeof(*o)+sdslen(ele->ptr)) : - sizeof(*o); - asize += (sizeof(listNode)+elesize)*listLength(l); + if (o->encoding == REDIS_ENCODING_ZIPLIST) { + asize = sizeof(*o)+ziplistSize(o->ptr); + } else { + l = o->ptr; + ln = listFirst(l); + asize = sizeof(list); + if (ln) { + ele = ln->value; + elesize = (ele->encoding == REDIS_ENCODING_RAW) ? 
+ (sizeof(*o)+sdslen(ele->ptr)) : sizeof(*o); + asize += (sizeof(listNode)+elesize)*listLength(l); + } } break; case REDIS_SET: @@ -9078,14 +9736,10 @@ static double computeObjectSwappability(robj *o) { asize = sizeof(dict)+(sizeof(struct dictEntry*)*dictSlots(d)); if (z) asize += sizeof(zset)-sizeof(dict); if (dictSize(d)) { - long elesize; - robj *ele; - de = dictGetRandomKey(d); ele = dictGetEntryKey(de); elesize = (ele->encoding == REDIS_ENCODING_RAW) ? - (sizeof(*o)+sdslen(ele->ptr)) : - sizeof(*o); + (sizeof(*o)+sdslen(ele->ptr)) : sizeof(*o); asize += (sizeof(struct dictEntry)+elesize)*dictSize(d); if (z) asize += sizeof(zskiplistNode)*dictSize(d); } @@ -9106,24 +9760,19 @@ static double computeObjectSwappability(robj *o) { d = o->ptr; asize = sizeof(dict)+(sizeof(struct dictEntry*)*dictSlots(d)); if (dictSize(d)) { - long elesize; - robj *ele; - de = dictGetRandomKey(d); ele = dictGetEntryKey(de); elesize = (ele->encoding == REDIS_ENCODING_RAW) ? - (sizeof(*o)+sdslen(ele->ptr)) : - sizeof(*o); + (sizeof(*o)+sdslen(ele->ptr)) : sizeof(*o); ele = dictGetEntryVal(de); elesize = (ele->encoding == REDIS_ENCODING_RAW) ? - (sizeof(*o)+sdslen(ele->ptr)) : - sizeof(*o); + (sizeof(*o)+sdslen(ele->ptr)) : sizeof(*o); asize += (sizeof(struct dictEntry)+elesize)*dictSize(d); } } break; } - return (double)age*log(1+asize); + return (double)minage*log(1+asize); } /* Try to swap an object that's a good candidate for swapping. @@ -9137,7 +9786,8 @@ static int vmSwapOneObject(int usethreads) { struct dictEntry *best = NULL; double best_swappability = 0; redisDb *best_db = NULL; - robj *key, *val; + robj *val; + sds key; for (j = 0; j < server.dbnum; j++) { redisDb *db = server.db+j; @@ -9153,16 +9803,14 @@ static int vmSwapOneObject(int usethreads) { if (maxtries) maxtries--; de = dictGetRandomKey(db->dict); - key = dictGetEntryKey(de); val = dictGetEntryVal(de); /* Only swap objects that are currently in memory. 
* - * Also don't swap shared objects if threaded VM is on, as we - * try to ensure that the main thread does not touch the + * Also don't swap shared objects: not a good idea in general and + * we need to ensure that the main thread does not touch the * object while the I/O thread is using it, but we can't * control other keys without adding additional mutex. */ - if (key->storage != REDIS_VM_MEMORY || - (server.vm_max_threads != 0 && val->refcount != 1)) { + if (val->storage != REDIS_VM_MEMORY || val->refcount != 1) { if (maxtries) i--; /* don't count this try */ continue; } @@ -9179,21 +9827,19 @@ static int vmSwapOneObject(int usethreads) { val = dictGetEntryVal(best); redisLog(REDIS_DEBUG,"Key with best swappability: %s, %f", - key->ptr, best_swappability); + key, best_swappability); - /* Unshare the key if needed */ - if (key->refcount > 1) { - robj *newkey = dupStringObject(key); - decrRefCount(key); - key = dictGetEntryKey(best) = newkey; - } /* Swap it */ if (usethreads) { - vmSwapObjectThreaded(key,val,best_db); + robj *keyobj = createStringObject(key,sdslen(key)); + vmSwapObjectThreaded(keyobj,val,best_db); + decrRefCount(keyobj); return REDIS_OK; } else { - if (vmSwapObjectBlocking(key,val) == REDIS_OK) { - dictGetEntryVal(best) = NULL; + vmpointer *vp; + + if ((vp = vmSwapObjectBlocking(val)) != NULL) { + dictGetEntryVal(best) = vp; return REDIS_OK; } else { return REDIS_ERR; @@ -9216,30 +9862,20 @@ static int vmCanSwapOut(void) { return (server.bgsavechildpid == -1 && server.bgrewritechildpid == -1); } -/* Delete a key if swapped. Returns 1 if the key was found, was swapped - * and was deleted. Otherwise 0 is returned. 
*/ -static int deleteIfSwapped(redisDb *db, robj *key) { - dictEntry *de; - robj *foundkey; - - if ((de = dictFind(db->dict,key)) == NULL) return 0; - foundkey = dictGetEntryKey(de); - if (foundkey->storage == REDIS_VM_MEMORY) return 0; - deleteKey(db,key); - return 1; -} - /* =================== Virtual Memory - Threaded I/O ======================= */ static void freeIOJob(iojob *j) { if ((j->type == REDIS_IOJOB_PREPARE_SWAP || j->type == REDIS_IOJOB_DO_SWAP || j->type == REDIS_IOJOB_LOAD) && j->val != NULL) + { + /* we fix the storage type, otherwise decrRefCount() will try to + * kill the I/O thread Job (that does no longer exists). */ + if (j->val->storage == REDIS_VM_SWAPPING) + j->val->storage = REDIS_VM_MEMORY; decrRefCount(j->val); - /* We don't decrRefCount the j->key field as we did't incremented - * the count creating IO Jobs. This is because the key field here is - * just used as an indentifier and if a key is removed the Job should - * never be touched again. */ + } + decrRefCount(j->key); zfree(j); } @@ -9260,7 +9896,6 @@ static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, while((retval = read(fd,buf,1)) == 1) { iojob *j; listNode *ln; - robj *key; struct dictEntry *de; redisLog(REDIS_DEBUG,"Processing I/O completed job"); @@ -9283,27 +9918,26 @@ static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, } /* Post process it in the main thread, as there are things we * can do just here to avoid race conditions and/or invasive locks */ - redisLog(REDIS_DEBUG,"Job %p type: %d, key at %p (%s) refcount: %d\n", (void*) j, j->type, (void*)j->key, (char*)j->key->ptr, j->key->refcount); - de = dictFind(j->db->dict,j->key); - assert(de != NULL); - key = dictGetEntryKey(de); + redisLog(REDIS_DEBUG,"COMPLETED Job type: %d, ID %p, key: %s", j->type, (void*)j->id, (unsigned char*)j->key->ptr); + de = dictFind(j->db->dict,j->key->ptr); + redisAssert(de != NULL); if (j->type == REDIS_IOJOB_LOAD) { redisDb *db; + vmpointer 
*vp = dictGetEntryVal(de); /* Key loaded, bring it at home */ - key->storage = REDIS_VM_MEMORY; - key->vm.atime = server.unixtime; - vmMarkPagesFree(key->vm.page,key->vm.usedpages); + vmMarkPagesFree(vp->page,vp->usedpages); redisLog(REDIS_DEBUG, "VM: object %s loaded from disk (threaded)", - (unsigned char*) key->ptr); + (unsigned char*) j->key->ptr); server.vm_stats_swapped_objects--; server.vm_stats_swapins++; dictGetEntryVal(de) = j->val; incrRefCount(j->val); db = j->db; - freeIOJob(j); /* Handle clients waiting for this key to be loaded. */ - handleClientsBlockedOnSwappedKey(db,key); + handleClientsBlockedOnSwappedKey(db,j->key); + freeIOJob(j); + zfree(vp); } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) { /* Now we know the amount of pages required to swap this object. * Let's find some space for it, and queue this task again @@ -9313,8 +9947,8 @@ static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, { /* Ooops... no space or we can't swap as there is * a fork()ed Redis trying to save stuff on disk. */ + j->val->storage = REDIS_VM_MEMORY; /* undo operation */ freeIOJob(j); - key->storage = REDIS_VM_MEMORY; /* undo operation */ } else { /* Note that we need to mark this pages as used now, * if the job will be canceled, we'll mark them as freed @@ -9326,28 +9960,29 @@ static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, unlockThreadedIO(); } } else if (j->type == REDIS_IOJOB_DO_SWAP) { - robj *val; + vmpointer *vp; /* Key swapped. We can finally free some memory. 
*/ - if (key->storage != REDIS_VM_SWAPPING) { - printf("key->storage: %d\n",key->storage); - printf("key->name: %s\n",(char*)key->ptr); - printf("key->refcount: %d\n",key->refcount); + if (j->val->storage != REDIS_VM_SWAPPING) { + vmpointer *vp = (vmpointer*) j->id; + printf("storage: %d\n",vp->storage); + printf("key->name: %s\n",(char*)j->key->ptr); printf("val: %p\n",(void*)j->val); printf("val->type: %d\n",j->val->type); printf("val->ptr: %s\n",(char*)j->val->ptr); } - redisAssert(key->storage == REDIS_VM_SWAPPING); - val = dictGetEntryVal(de); - key->vm.page = j->page; - key->vm.usedpages = j->pages; - key->storage = REDIS_VM_SWAPPED; - key->vtype = j->val->type; - decrRefCount(val); /* Deallocate the object from memory. */ - dictGetEntryVal(de) = NULL; + redisAssert(j->val->storage == REDIS_VM_SWAPPING); + vp = createVmPointer(j->val->type); + vp->page = j->page; + vp->usedpages = j->pages; + dictGetEntryVal(de) = vp; + /* Fix the storage otherwise decrRefCount will attempt to + * remove the associated I/O job */ + j->val->storage = REDIS_VM_MEMORY; + decrRefCount(j->val); redisLog(REDIS_DEBUG, "VM: object %s swapped out at %lld (%lld pages) (threaded)", - (unsigned char*) key->ptr, + (unsigned char*) j->key->ptr, (unsigned long long) j->page, (unsigned long long) j->pages); server.vm_stats_swapped_objects++; server.vm_stats_swapouts++; @@ -9402,7 +10037,7 @@ static void vmCancelThreadedIOJob(robj *o) { assert(o->storage == REDIS_VM_LOADING || o->storage == REDIS_VM_SWAPPING); again: lockThreadedIO(); - /* Search for a matching key in one of the queues */ + /* Search for a matching object in one of the queues */ for (i = 0; i < 3; i++) { listNode *ln; listIter li; @@ -9412,9 +10047,9 @@ again: iojob *job = ln->value; if (job->canceled) continue; /* Skip this, already canceled. 
*/ - if (job->key == o) { - redisLog(REDIS_DEBUG,"*** CANCELED %p (%s) (type %d) (LIST ID %d)\n", - (void*)job, (char*)o->ptr, job->type, i); + if (job->id == o) { + redisLog(REDIS_DEBUG,"*** CANCELED %p (key %s) (type %d) (LIST ID %d)\n", + (void*)job, (char*)job->key->ptr, job->type, i); /* Mark the pages as free since the swap didn't happened * or happened but is now discarded. */ if (i != 1 && job->type == REDIS_IOJOB_DO_SWAP) @@ -9462,12 +10097,14 @@ again: else if (o->storage == REDIS_VM_SWAPPING) o->storage = REDIS_VM_MEMORY; unlockThreadedIO(); + redisLog(REDIS_DEBUG,"*** DONE"); return; } } } unlockThreadedIO(); - assert(1 != 1); /* We should never reach this */ + printf("Not found: %p\n", (void*)o); + redisAssert(1 != 1); /* We should never reach this */ } static void *IOThreadEntryPoint(void *arg) { @@ -9500,7 +10137,8 @@ static void *IOThreadEntryPoint(void *arg) { /* Process the Job */ if (j->type == REDIS_IOJOB_LOAD) { - j->val = vmReadObjectFromSwap(j->page,j->key->vtype); + vmpointer *vp = (vmpointer*)j->id; + j->val = vmReadObjectFromSwap(j->page,vp->vtype); } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) { FILE *fp = fopen("/dev/null","w+"); j->pages = rdbSavedObjectPages(j->val,fp); @@ -9596,18 +10234,16 @@ static void queueIOJob(iojob *j) { static int vmSwapObjectThreaded(robj *key, robj *val, redisDb *db) { iojob *j; - assert(key->storage == REDIS_VM_MEMORY); - assert(key->refcount == 1); - j = zmalloc(sizeof(*j)); j->type = REDIS_IOJOB_PREPARE_SWAP; j->db = db; j->key = key; - j->val = val; + incrRefCount(key); + j->id = j->val = val; incrRefCount(val); j->canceled = 0; j->thread = (pthread_t) -1; - key->storage = REDIS_VM_SWAPPING; + val->storage = REDIS_VM_SWAPPING; lockThreadedIO(); queueIOJob(j); @@ -9629,9 +10265,9 @@ static int waitForSwappedKey(redisClient *c, robj *key) { /* If the key does not exist or is already in RAM we don't need to * block the client at all. 
*/ - de = dictFind(c->db->dict,key); + de = dictFind(c->db->dict,key->ptr); if (de == NULL) return 0; - o = dictGetEntryKey(de); + o = dictGetEntryVal(de); if (o->storage == REDIS_VM_MEMORY) { return 0; } else if (o->storage == REDIS_VM_SWAPPING) { @@ -9665,14 +10301,16 @@ static int waitForSwappedKey(redisClient *c, robj *key) { /* Are we already loading the key from disk? If not create a job */ if (o->storage == REDIS_VM_SWAPPED) { iojob *j; + vmpointer *vp = (vmpointer*)o; o->storage = REDIS_VM_LOADING; j = zmalloc(sizeof(*j)); j->type = REDIS_IOJOB_LOAD; j->db = c->db; - j->key = o; - j->key->vtype = o->vtype; - j->page = o->vm.page; + j->id = (robj*)vp; + j->key = key; + incrRefCount(key); + j->page = vp->page; j->val = NULL; j->canceled = 0; j->thread = (pthread_t) -1; @@ -9797,6 +10435,8 @@ static int dontWaitForSwappedKey(redisClient *c, robj *key) { return listLength(c->io_keys) == 0; } +/* Every time we now a key was loaded back in memory, we handle clients + * waiting for this key if any. 
*/ static void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key) { struct dictEntry *de; list *l; @@ -9826,6 +10466,8 @@ static void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key) { static void configSetCommand(redisClient *c) { robj *o = getDecodedObject(c->argv[3]); + long long ll; + if (!strcasecmp(c->argv[2]->ptr,"dbfilename")) { zfree(server.dbfilename); server.dbfilename = zstrdup(o->ptr); @@ -9836,7 +10478,13 @@ static void configSetCommand(redisClient *c) { zfree(server.masterauth); server.masterauth = zstrdup(o->ptr); } else if (!strcasecmp(c->argv[2]->ptr,"maxmemory")) { - server.maxmemory = strtoll(o->ptr, NULL, 10); + if (getLongLongFromObject(o,&ll) == REDIS_ERR || + ll < 0) goto badfmt; + server.maxmemory = ll; + } else if (!strcasecmp(c->argv[2]->ptr,"timeout")) { + if (getLongLongFromObject(o,&ll) == REDIS_ERR || + ll < 0 || ll > LONG_MAX) goto badfmt; + server.maxidletime = ll; } else if (!strcasecmp(c->argv[2]->ptr,"appendfsync")) { if (!strcasecmp(o->ptr,"no")) { server.appendfsync = APPENDFSYNC_NO; @@ -9847,6 +10495,28 @@ static void configSetCommand(redisClient *c) { } else { goto badfmt; } + } else if (!strcasecmp(c->argv[2]->ptr,"no-appendfsync-on-rewrite")) { + int yn = yesnotoi(o->ptr); + + if (yn == -1) goto badfmt; + server.no_appendfsync_on_rewrite = yn; + } else if (!strcasecmp(c->argv[2]->ptr,"appendonly")) { + int old = server.appendonly; + int new = yesnotoi(o->ptr); + + if (new == -1) goto badfmt; + if (old != new) { + if (new == 0) { + stopAppendOnly(); + } else { + if (startAppendOnly() == REDIS_ERR) { + addReplySds(c,sdscatprintf(sdsempty(), + "-ERR Unable to turn on AOF. 
Check server logs.\r\n")); + decrRefCount(o); + return; + } + } + } } else if (!strcasecmp(c->argv[2]->ptr,"save")) { int vlen, j; sds *v = sdssplitlen(o->ptr,sdslen(o->ptr)," ",1,&vlen); @@ -9927,11 +10597,29 @@ static void configGetCommand(redisClient *c) { if (stringmatch(pattern,"maxmemory",0)) { char buf[128]; - snprintf(buf,128,"%llu\n",server.maxmemory); + ll2string(buf,128,server.maxmemory); addReplyBulkCString(c,"maxmemory"); addReplyBulkCString(c,buf); matches++; } + if (stringmatch(pattern,"timeout",0)) { + char buf[128]; + + ll2string(buf,128,server.maxidletime); + addReplyBulkCString(c,"timeout"); + addReplyBulkCString(c,buf); + matches++; + } + if (stringmatch(pattern,"appendonly",0)) { + addReplyBulkCString(c,"appendonly"); + addReplyBulkCString(c,server.appendonly ? "yes" : "no"); + matches++; + } + if (stringmatch(pattern,"no-appendfsync-on-rewrite",0)) { + addReplyBulkCString(c,"no-appendfsync-on-rewrite"); + addReplyBulkCString(c,server.no_appendfsync_on_rewrite ? "yes" : "no"); + matches++; + } if (stringmatch(pattern,"appendfsync",0)) { char *policy; @@ -10251,6 +10939,147 @@ static void publishCommand(redisClient *c) { addReplyLongLong(c,receivers); } +/* ===================== WATCH (CAS alike for MULTI/EXEC) =================== + * + * The implementation uses a per-DB hash table mapping keys to list of clients + * WATCHing those keys, so that given a key that is going to be modified + * we can mark all the associated clients as dirty. + * + * Also every client contains a list of WATCHed keys so that's possible to + * un-watch such keys when the client is freed or when UNWATCH is called. 
*/ + +/* In the client->watched_keys list we need to use watchedKey structures + * as in order to identify a key in Redis we need both the key name and the + * DB */ +typedef struct watchedKey { + robj *key; + redisDb *db; +} watchedKey; + +/* Watch for the specified key */ +static void watchForKey(redisClient *c, robj *key) { + list *clients = NULL; + listIter li; + listNode *ln; + watchedKey *wk; + + /* Check if we are already watching for this key */ + listRewind(c->watched_keys,&li); + while((ln = listNext(&li))) { + wk = listNodeValue(ln); + if (wk->db == c->db && equalStringObjects(key,wk->key)) + return; /* Key already watched */ + } + /* This key is not already watched in this DB. Let's add it */ + clients = dictFetchValue(c->db->watched_keys,key); + if (!clients) { + clients = listCreate(); + dictAdd(c->db->watched_keys,key,clients); + incrRefCount(key); + } + listAddNodeTail(clients,c); + /* Add the new key to the lits of keys watched by this client */ + wk = zmalloc(sizeof(*wk)); + wk->key = key; + wk->db = c->db; + incrRefCount(key); + listAddNodeTail(c->watched_keys,wk); +} + +/* Unwatch all the keys watched by this client. To clean the EXEC dirty + * flag is up to the caller. 
*/ +static void unwatchAllKeys(redisClient *c) { + listIter li; + listNode *ln; + + if (listLength(c->watched_keys) == 0) return; + listRewind(c->watched_keys,&li); + while((ln = listNext(&li))) { + list *clients; + watchedKey *wk; + + /* Lookup the watched key -> clients list and remove the client + * from the list */ + wk = listNodeValue(ln); + clients = dictFetchValue(wk->db->watched_keys, wk->key); + assert(clients != NULL); + listDelNode(clients,listSearchKey(clients,c)); + /* Kill the entry at all if this was the only client */ + if (listLength(clients) == 0) + dictDelete(wk->db->watched_keys, wk->key); + /* Remove this watched key from the client->watched list */ + listDelNode(c->watched_keys,ln); + decrRefCount(wk->key); + zfree(wk); + } +} + +/* "Touch" a key, so that if this key is being WATCHed by some client the + * next EXEC will fail. */ +static void touchWatchedKey(redisDb *db, robj *key) { + list *clients; + listIter li; + listNode *ln; + + if (dictSize(db->watched_keys) == 0) return; + clients = dictFetchValue(db->watched_keys, key); + if (!clients) return; + + /* Mark all the clients watching this key as REDIS_DIRTY_CAS */ + /* Check if we are already watching for this key */ + listRewind(clients,&li); + while((ln = listNext(&li))) { + redisClient *c = listNodeValue(ln); + + c->flags |= REDIS_DIRTY_CAS; + } +} + +/* On FLUSHDB or FLUSHALL all the watched keys that are present before the + * flush but will be deleted as effect of the flushing operation should + * be touched. "dbid" is the DB that's getting the flush. -1 if it is + * a FLUSHALL operation (all the DBs flushed). 
*/ +static void touchWatchedKeysOnFlush(int dbid) { + listIter li1, li2; + listNode *ln; + + /* For every client, check all the waited keys */ + listRewind(server.clients,&li1); + while((ln = listNext(&li1))) { + redisClient *c = listNodeValue(ln); + listRewind(c->watched_keys,&li2); + while((ln = listNext(&li2))) { + watchedKey *wk = listNodeValue(ln); + + /* For every watched key matching the specified DB, if the + * key exists, mark the client as dirty, as the key will be + * removed. */ + if (dbid == -1 || wk->db->id == dbid) { + if (dictFind(wk->db->dict, wk->key->ptr) != NULL) + c->flags |= REDIS_DIRTY_CAS; + } + } + } +} + +static void watchCommand(redisClient *c) { + int j; + + if (c->flags & REDIS_MULTI) { + addReplySds(c,sdsnew("-ERR WATCH inside MULTI is not allowed\r\n")); + return; + } + for (j = 1; j < c->argc; j++) + watchForKey(c,c->argv[j]); + addReply(c,shared.ok); +} + +static void unwatchCommand(redisClient *c) { + unwatchAllKeys(c); + c->flags &= (~REDIS_DIRTY_CAS); + addReply(c,shared.ok); +} + /* ================================= Debugging ============================== */ /* Compute the sha1 of string at 's' with 'len' bytes long. 
@@ -10337,37 +11166,35 @@ static void computeDatasetDigest(unsigned char *final) { /* Iterate this DB writing every entry */ while((de = dictNext(di)) != NULL) { - robj *key, *o; + sds key; + robj *keyobj, *o; time_t expiretime; memset(digest,0,20); /* This key-val digest */ key = dictGetEntryKey(de); - mixObjectDigest(digest,key); - if (!server.vm_enabled || key->storage == REDIS_VM_MEMORY || - key->storage == REDIS_VM_SWAPPING) { - o = dictGetEntryVal(de); - incrRefCount(o); - } else { - o = vmPreviewObject(key); - } + keyobj = createStringObject(key,sdslen(key)); + + mixDigest(digest,key,sdslen(key)); + + /* Make sure the key is loaded if VM is active */ + o = lookupKeyRead(db,keyobj); + aux = htonl(o->type); mixDigest(digest,&aux,sizeof(aux)); - expiretime = getExpire(db,key); + expiretime = getExpire(db,keyobj); /* Save the key and associated value */ if (o->type == REDIS_STRING) { mixObjectDigest(digest,o); } else if (o->type == REDIS_LIST) { - list *list = o->ptr; - listNode *ln; - listIter li; - - listRewind(list,&li); - while((ln = listNext(&li))) { - robj *eleobj = listNodeValue(ln); - + listTypeIterator *li = listTypeInitIterator(o,0,REDIS_TAIL); + listTypeEntry entry; + while(listTypeNext(li,&entry)) { + robj *eleobj = listTypeGet(&entry); mixObjectDigest(digest,eleobj); + decrRefCount(eleobj); } + listTypeReleaseIterator(li); } else if (o->type == REDIS_SET) { dict *set = o->ptr; dictIterator *di = dictGetIterator(set); @@ -10397,31 +11224,31 @@ static void computeDatasetDigest(unsigned char *final) { } dictReleaseIterator(di); } else if (o->type == REDIS_HASH) { - hashIterator *hi; + hashTypeIterator *hi; robj *obj; - hi = hashInitIterator(o); - while (hashNext(hi) != REDIS_ERR) { + hi = hashTypeInitIterator(o); + while (hashTypeNext(hi) != REDIS_ERR) { unsigned char eledigest[20]; memset(eledigest,0,20); - obj = hashCurrent(hi,REDIS_HASH_KEY); + obj = hashTypeCurrent(hi,REDIS_HASH_KEY); mixObjectDigest(eledigest,obj); decrRefCount(obj); - obj = 
hashCurrent(hi,REDIS_HASH_VALUE); + obj = hashTypeCurrent(hi,REDIS_HASH_VALUE); mixObjectDigest(eledigest,obj); decrRefCount(obj); xorDigest(digest,eledigest,20); } - hashReleaseIterator(hi); + hashTypeReleaseIterator(hi); } else { redisPanic("Unknown object type"); } - decrRefCount(o); /* If the key has an expire, add it to the mix */ if (expiretime != -1) xorDigest(digest,"!!expire!!",10); /* We can finally xor the key-val digest to the final digest */ xorDigest(final,digest,20); + decrRefCount(keyobj); } dictReleaseIterator(di); } @@ -10451,17 +11278,16 @@ static void debugCommand(redisClient *c) { redisLog(REDIS_WARNING,"Append Only File loaded by DEBUG LOADAOF"); addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) { - dictEntry *de = dictFind(c->db->dict,c->argv[2]); - robj *key, *val; + dictEntry *de = dictFind(c->db->dict,c->argv[2]->ptr); + robj *val; if (!de) { addReply(c,shared.nokeyerr); return; } - key = dictGetEntryKey(de); val = dictGetEntryVal(de); - if (!server.vm_enabled || (key->storage == REDIS_VM_MEMORY || - key->storage == REDIS_VM_SWAPPING)) { + if (!server.vm_enabled || (val->storage == REDIS_VM_MEMORY || + val->storage == REDIS_VM_SWAPPING)) { char *strenc; char buf[128]; @@ -10472,23 +11298,25 @@ static void debugCommand(redisClient *c) { strenc = buf; } addReplySds(c,sdscatprintf(sdsempty(), - "+Key at:%p refcount:%d, value at:%p refcount:%d " + "+Value at:%p refcount:%d " "encoding:%s serializedlength:%lld\r\n", - (void*)key, key->refcount, (void*)val, val->refcount, + (void*)val, val->refcount, strenc, (long long) rdbSavedObjectLen(val,NULL))); } else { + vmpointer *vp = (vmpointer*) val; addReplySds(c,sdscatprintf(sdsempty(), - "+Key at:%p refcount:%d, value swapped at: page %llu " + "+Value swapped at: page %llu " "using %llu pages\r\n", - (void*)key, key->refcount, (unsigned long long) key->vm.page, - (unsigned long long) key->vm.usedpages)); + (unsigned long long) vp->page, + (unsigned long long) 
vp->usedpages)); } } else if (!strcasecmp(c->argv[1]->ptr,"swapin") && c->argc == 3) { lookupKeyRead(c->db,c->argv[2]); addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"swapout") && c->argc == 3) { - dictEntry *de = dictFind(c->db->dict,c->argv[2]); - robj *key, *val; + dictEntry *de = dictFind(c->db->dict,c->argv[2]->ptr); + robj *val; + vmpointer *vp; if (!server.vm_enabled) { addReplySds(c,sdsnew("-ERR Virtual Memory is disabled\r\n")); @@ -10498,19 +11326,14 @@ static void debugCommand(redisClient *c) { addReply(c,shared.nokeyerr); return; } - key = dictGetEntryKey(de); val = dictGetEntryVal(de); - /* If the key is shared we want to create a copy */ - if (key->refcount > 1) { - robj *newkey = dupStringObject(key); - decrRefCount(key); - key = dictGetEntryKey(de) = newkey; - } /* Swap it */ - if (key->storage != REDIS_VM_MEMORY) { + if (val->storage != REDIS_VM_MEMORY) { addReplySds(c,sdsnew("-ERR This key is not in memory\r\n")); - } else if (vmSwapObjectBlocking(key,val) == REDIS_OK) { - dictGetEntryVal(de) = NULL; + } else if (val->refcount != 1) { + addReplySds(c,sdsnew("-ERR Object is shared\r\n")); + } else if ((vp = vmSwapObjectBlocking(val)) != NULL) { + dictGetEntryVal(de) = vp; addReply(c,shared.ok); } else { addReply(c,shared.err); @@ -10531,7 +11354,8 @@ static void debugCommand(redisClient *c) { } snprintf(buf,sizeof(buf),"value:%lu",j); val = createStringObject(buf,strlen(buf)); - dictAdd(c->db->dict,key,val); + dbAdd(c->db,key,val); + decrRefCount(key); } addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"digest") && c->argc == 2) { @@ -10553,7 +11377,7 @@ static void debugCommand(redisClient *c) { static void _redisAssert(char *estr, char *file, int line) { redisLog(REDIS_WARNING,"=== ASSERTION FAILED ==="); - redisLog(REDIS_WARNING,"==> %s:%d '%s' is not true\n",file,line,estr); + redisLog(REDIS_WARNING,"==> %s:%d '%s' is not true",file,line,estr); #ifdef HAVE_BACKTRACE redisLog(REDIS_WARNING,"(forcing SIGSEGV in order 
to print the stack trace)"); *((char*)-1) = 'x'; @@ -10618,7 +11442,8 @@ static void daemonize(void) { } static void version() { - printf("Redis server version %s\n", REDIS_VERSION); + printf("Redis server version %s (%s:%d)\n", REDIS_VERSION, + redisGitSHA1(), atoi(redisGitDirty()) > 0); exit(0); } @@ -10632,6 +11457,7 @@ int main(int argc, char **argv) { time_t start; initServerConfig(); + sortCommandTable(); if (argc == 2) { if (strcmp(argv[1], "-v") == 0 || strcmp(argv[1], "--version") == 0) version(); @@ -10732,6 +11558,13 @@ static void segvHandler(int sig, siginfo_t *info, void *secret) { _exit(0); } +static void sigtermHandler(int sig) { + REDIS_NOTUSED(sig); + + redisLog(REDIS_WARNING,"SIGTERM received, scheduling shutting down..."); + server.shutdown_asap = 1; +} + static void setupSigSegvAction(void) { struct sigaction act; @@ -10745,6 +11578,10 @@ static void setupSigSegvAction(void) { sigaction (SIGFPE, &act, NULL); sigaction (SIGILL, &act, NULL); sigaction (SIGBUS, &act, NULL); + + act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND; + act.sa_handler = sigtermHandler; + sigaction (SIGTERM, &act, NULL); return; }