X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/177727542c871330445700299aed803a1384b0ed..ca3f830b32a0a8303a5a761d6212925d9b5ac365:/redis.c diff --git a/redis.c b/redis.c index ad9ad81e..33db9a1a 100644 --- a/redis.c +++ b/redis.c @@ -27,7 +27,7 @@ * POSSIBILITY OF SUCH DAMAGE. */ -#define REDIS_VERSION "1.3.8" +#define REDIS_VERSION "2.1.1" #include "fmacros.h" #include "config.h" @@ -37,8 +37,6 @@ #include #include #include -#define __USE_POSIX199309 -#define __USE_UNIX98 #include #ifdef HAVE_BACKTRACE @@ -59,6 +57,7 @@ #include #include #include +#include #include #include @@ -75,7 +74,9 @@ #include "zmalloc.h" /* total memory usage aware version of malloc/free */ #include "lzf.h" /* LZF compression library */ #include "pqsort.h" /* Partial qsort for SORT+LIMIT */ -#include "zipmap.h" +#include "zipmap.h" /* Compact dictionary-alike data structure */ +#include "sha1.h" /* SHA1 is used for DEBUG DIGEST */ +#include "release.h" /* Release and/or git repository information */ /* Error codes */ #define REDIS_OK 0 @@ -189,6 +190,7 @@ static char* strencoding[] = { #define REDIS_MULTI 8 /* This client is in a MULTI context */ #define REDIS_BLOCKED 16 /* The client is waiting in a blocking operation */ #define REDIS_IO_WAIT 32 /* The client is waiting for Virtual Memory I/O */ +#define REDIS_DIRTY_CAS 64 /* Watched keys modified. EXEC will fail. 
*/ /* Slave replication state - slave side */ #define REDIS_REPL_NONE 0 /* No active replication */ @@ -284,8 +286,9 @@ typedef struct redisObject { typedef struct redisDb { dict *dict; /* The keyspace for this DB */ dict *expires; /* Timeout of keys with a timeout set */ - dict *blockingkeys; /* Keys with clients waiting for data (BLPOP) */ + dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */ dict *io_keys; /* Keys with clients waiting for VM I/O */ + dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */ int id; } redisDb; @@ -323,13 +326,14 @@ typedef struct redisClient { long repldboff; /* replication DB file offset */ off_t repldbsize; /* replication DB file size */ multiState mstate; /* MULTI/EXEC state */ - robj **blockingkeys; /* The key we are waiting to terminate a blocking + robj **blocking_keys; /* The key we are waiting to terminate a blocking * operation such as BLPOP. Otherwise NULL. */ - int blockingkeysnum; /* Number of blocking keys */ + int blocking_keys_num; /* Number of blocking keys */ time_t blockingto; /* Blocking operation timeout. If UNIX current time * is >= blockingto then the operation timed out. */ list *io_keys; /* Keys this client is waiting to be loaded from the * swap file in order to continue. 
*/ + list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */ dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */ list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */ } redisClient; @@ -365,6 +369,7 @@ struct redisServer { int daemonize; int appendonly; int appendfsync; + int shutdown_asap; time_t lastfsync; int appendfd; int appendseldb; @@ -372,6 +377,7 @@ struct redisServer { pid_t bgsavechildpid; pid_t bgrewritechildpid; sds bgrewritebuf; /* buffer taken by parent during oppend only rewrite */ + sds aofbuf; /* AOF buffer, written before entering the event loop */ struct saveparam *saveparams; int saveparamslen; char *logfile; @@ -379,7 +385,6 @@ struct redisServer { char *dbfilename; char *appendfilename; char *requirepass; - int shareobjects; int rdbcompression; int activerehashing; /* Replication related */ @@ -452,6 +457,7 @@ typedef struct pubsubPattern { } pubsubPattern; typedef void redisCommandProc(redisClient *c); +typedef void redisVmPreloadProc(redisClient *c, struct redisCommand *cmd, int argc, robj **argv); struct redisCommand { char *name; redisCommandProc *proc; @@ -460,7 +466,7 @@ struct redisCommand { /* Use a function to determine which keys need to be loaded * in the background prior to executing this command. Takes precedence * over vm_firstkey and others, ignored when NULL */ - redisCommandProc *vm_preload_proc; + redisVmPreloadProc *vm_preload_proc; /* What keys should be loaded in background when calling this command? 
*/ int vm_firstkey; /* The first argument that's a key (0 = no keys) */ int vm_lastkey; /* THe last argument that's a key */ @@ -516,8 +522,9 @@ struct sharedObjectsStruct { *outofrangeerr, *plus, *select0, *select1, *select2, *select3, *select4, *select5, *select6, *select7, *select8, *select9, - *messagebulk, *subscribebulk, *unsubscribebulk, *mbulk3, - *psubscribebulk, *punsubscribebulk, *integers[REDIS_SHARED_INTEGERS]; + *messagebulk, *pmessagebulk, *subscribebulk, *unsubscribebulk, *mbulk3, + *mbulk4, *psubscribebulk, *punsubscribebulk, + *integers[REDIS_SHARED_INTEGERS]; } shared; /* Global vars that are actally used as constants. The following double @@ -558,6 +565,8 @@ static int rdbSaveBackground(char *filename); static robj *createStringObject(char *ptr, size_t len); static robj *dupStringObject(robj *o); static void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc); +static void replicationFeedMonitors(list *monitors, int dictid, robj **argv, int argc); +static void flushAppendOnlyFile(void); static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc); static int syncWithMaster(void); static robj *tryObjectEncoding(robj *o); @@ -607,8 +616,9 @@ static robj *vmReadObjectFromSwap(off_t page, int type); static void waitEmptyIOJobsQueue(void); static void vmReopenSwapFile(void); static int vmFreePage(off_t page); -static void zunionInterBlockClientOnSwappedKeys(redisClient *c); -static int blockClientOnSwappedKeys(struct redisCommand *cmd, redisClient *c); +static void zunionInterBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv); +static void execBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv); +static int blockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd); static int dontWaitForSwappedKey(redisClient *c, robj *key); static void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key); static void 
readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask); @@ -621,13 +631,21 @@ static int pubsubUnsubscribeAllPatterns(redisClient *c, int notify); static void freePubsubPattern(void *p); static int listMatchPubsubPattern(void *a, void *b); static int compareStringObjects(robj *a, robj *b); +static int equalStringObjects(robj *a, robj *b); static void usage(); +static int rewriteAppendOnlyFileBackground(void); +static int vmSwapObjectBlocking(robj *key, robj *val); +static int prepareForShutdown(); +static void touchWatchedKey(redisDb *db, robj *key); +static void touchWatchedKeysOnFlush(int dbid); +static void unwatchAllKeys(redisClient *c); static void authCommand(redisClient *c); static void pingCommand(redisClient *c); static void echoCommand(redisClient *c); static void setCommand(redisClient *c); static void setnxCommand(redisClient *c); +static void setexCommand(redisClient *c); static void getCommand(redisClient *c); static void delCommand(redisClient *c); static void existsCommand(redisClient *c); @@ -714,8 +732,8 @@ static void hmgetCommand(redisClient *c); static void hdelCommand(redisClient *c); static void hlenCommand(redisClient *c); static void zremrangebyrankCommand(redisClient *c); -static void zunionCommand(redisClient *c); -static void zinterCommand(redisClient *c); +static void zunionstoreCommand(redisClient *c); +static void zinterstoreCommand(redisClient *c); static void hkeysCommand(redisClient *c); static void hvalsCommand(redisClient *c); static void hgetallCommand(redisClient *c); @@ -727,6 +745,8 @@ static void unsubscribeCommand(redisClient *c); static void psubscribeCommand(redisClient *c); static void punsubscribeCommand(redisClient *c); static void publishCommand(redisClient *c); +static void watchCommand(redisClient *c); +static void unwatchCommand(redisClient *c); /*================================= Globals ================================= */ @@ -736,6 +756,7 @@ static struct redisCommand cmdTable[] = { 
{"get",getCommand,2,REDIS_CMD_INLINE,NULL,1,1,1}, {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,0,0,0}, {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,0,0,0}, + {"setex",setexCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,0,0,0}, {"append",appendCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,1,1,1}, {"substr",substrCommand,4,REDIS_CMD_INLINE,NULL,1,1,1}, {"del",delCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0}, @@ -775,8 +796,8 @@ static struct redisCommand cmdTable[] = { {"zrem",zremCommand,3,REDIS_CMD_BULK,NULL,1,1,1}, {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE,NULL,1,1,1}, {"zremrangebyrank",zremrangebyrankCommand,4,REDIS_CMD_INLINE,NULL,1,1,1}, - {"zunion",zunionCommand,-4,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,zunionInterBlockClientOnSwappedKeys,0,0,0}, - {"zinter",zinterCommand,-4,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,zunionInterBlockClientOnSwappedKeys,0,0,0}, + {"zunionstore",zunionstoreCommand,-4,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,zunionInterBlockClientOnSwappedKeys,0,0,0}, + {"zinterstore",zinterstoreCommand,-4,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,zunionInterBlockClientOnSwappedKeys,0,0,0}, {"zrange",zrangeCommand,-4,REDIS_CMD_INLINE,NULL,1,1,1}, {"zrangebyscore",zrangebyscoreCommand,-4,REDIS_CMD_INLINE,NULL,1,1,1}, {"zcount",zcountCommand,4,REDIS_CMD_INLINE,NULL,1,1,1}, @@ -821,7 +842,7 @@ static struct redisCommand cmdTable[] = { {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE,NULL,0,0,0}, {"type",typeCommand,2,REDIS_CMD_INLINE,NULL,1,1,1}, {"multi",multiCommand,1,REDIS_CMD_INLINE,NULL,0,0,0}, - {"exec",execCommand,1,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,NULL,0,0,0}, + {"exec",execCommand,1,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,execBlockClientOnSwappedKeys,0,0,0}, {"discard",discardCommand,1,REDIS_CMD_INLINE,NULL,0,0,0}, {"sync",syncCommand,1,REDIS_CMD_INLINE,NULL,0,0,0}, {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE,NULL,0,0,0}, @@ -838,6 +859,8 @@ static struct redisCommand cmdTable[] = { 
{"psubscribe",psubscribeCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0}, {"punsubscribe",punsubscribeCommand,-1,REDIS_CMD_INLINE,NULL,0,0,0}, {"publish",publishCommand,3,REDIS_CMD_BULK|REDIS_CMD_FORCE_REPLICATION,NULL,0,0,0}, + {"watch",watchCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0}, + {"unwatch",unwatchCommand,1,REDIS_CMD_INLINE,NULL,0,0,0}, {NULL,NULL,0,0,NULL,0,0,0} }; @@ -970,6 +993,77 @@ static int stringmatch(const char *pattern, const char *string, int nocase) { return stringmatchlen(pattern,strlen(pattern),string,strlen(string),nocase); } +/* Convert a string representing an amount of memory into the number of + * bytes, so for instance memtoll("1Gb") will return 1073741824 that is + * (1024*1024*1024). + * + * The string is an optional '-' sign followed by decimal digits and an + * optional case-insensitive unit: "b" (or no unit) = 1, "k" = 1000, + * "kb" = 1024, "m" = 1000*1000, "mb" = 1024*1024, "g" = 1000*1000*1000, + * "gb" = 1024*1024*1024. + * + * If 'err' is not NULL, *err is set to 1 on parsing error and to 0 + * otherwise. An unknown unit is an error and the number is returned + * unscaled. A number part that does not fit the internal buffer is an + * error and LLONG_MAX is returned. If scaling by the unit would overflow + * a long long it is an error and LLONG_MAX (LLONG_MIN for a negative + * number) is returned. Out of range digits are left to strtoll(), which + * saturates without *err being set here. */ +static long long memtoll(const char *p, int *err) { + const char *u; + char buf[128]; + long mul; /* unit multiplier */ + long long val; + unsigned int digits; + + if (err) *err = 0; + /* Search the first non digit character. The cast is needed because + * isdigit() is undefined for negative char values. */ + u = p; + if (*u == '-') u++; + while(*u && isdigit((unsigned char)*u)) u++; + if (*u == '\0' || !strcasecmp(u,"b")) { + mul = 1; + } else if (!strcasecmp(u,"k")) { + mul = 1000; + } else if (!strcasecmp(u,"kb")) { + mul = 1024; + } else if (!strcasecmp(u,"m")) { + mul = 1000*1000; + } else if (!strcasecmp(u,"mb")) { + mul = 1024*1024; + } else if (!strcasecmp(u,"g")) { + mul = 1000L*1000*1000; + } else if (!strcasecmp(u,"gb")) { + mul = 1024L*1024*1024; + } else { + if (err) *err = 1; + mul = 1; + } + digits = u-p; /* length of the sign + digits prefix */ + if (digits >= sizeof(buf)) { + if (err) *err = 1; + return LLONG_MAX; + } + memcpy(buf,p,digits); + buf[digits] = '\0'; + val = strtoll(buf,NULL,10); + /* mul is always >= 1 here: refuse to scale when the product would + * overflow, as signed overflow is undefined behavior. */ + if (val > LLONG_MAX/mul || val < LLONG_MIN/mul) { + if (err) *err = 1; + return (val < 0) ? LLONG_MIN : LLONG_MAX; + } + return val*mul; +} + +/* Convert a long long into a string. Returns the number of + * characters needed to represent the number, that can be shorter if passed + * buffer length is not enough to store the whole number. 
*/ +static int ll2string(char *s, size_t len, long long value) { + char buf[32], *p; + unsigned long long v; + size_t l; + + if (len == 0) return 0; + /* Take the magnitude in unsigned arithmetic: negating LLONG_MIN as a + * signed long long would overflow. */ + v = (value < 0) ? 0ULL-(unsigned long long)value : (unsigned long long)value; + p = buf+31; /* point to the last character */ + /* Emit the digits right to left, least significant first. */ + do { + *p-- = '0'+(v%10); + v /= 10; + } while(v); + if (value < 0) *p-- = '-'; + p++; /* p now points to the first character of the number */ + l = 32-(p-buf); + if (l+1 > len) l = len-1; /* Make sure it fits, including the nul term */ + memcpy(s,p,l); + s[l] = '\0'; + return l; +} + static void redisLog(int level, const char *fmt, ...) { va_list ap; FILE *fp; @@ -1052,8 +1146,8 @@ static int dictEncObjKeyCompare(void *privdata, const void *key1, int cmp; if (o1->encoding == REDIS_ENCODING_INT && - o2->encoding == REDIS_ENCODING_INT && - o1->ptr == o2->ptr) return 1; + o2->encoding == REDIS_ENCODING_INT) + return o1->ptr == o2->ptr; o1 = getDecodedObject(o1); o2 = getDecodedObject(o2); @@ -1073,7 +1167,7 @@ static unsigned int dictEncObjHash(const void *key) { char buf[32]; int len; - len = snprintf(buf,32,"%ld",(long)o->ptr); + len = ll2string(buf,32,(long)o->ptr); return dictGenHashFunction((unsigned char*)buf, len); } else { unsigned int hash; @@ -1338,6 +1432,13 @@ static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientD * To access a global var is faster than calling time(NULL) */ server.unixtime = time(NULL); + /* We received a SIGTERM, shutting down here in a safe way, as it is + * not ok doing so inside the signal handler. 
*/ + if (server.shutdown_asap) { + if (prepareForShutdown() == REDIS_OK) exit(0); + redisLog(REDIS_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information"); + } + /* Show some info about non-empty databases */ for (j = 0; j < server.dbnum; j++) { long long size, used, vkeys; @@ -1467,6 +1568,7 @@ static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientD redisLog(REDIS_NOTICE,"Connecting to MASTER..."); if (syncWithMaster() == REDIS_OK) { redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded"); + if (server.appendonly) rewriteAppendOnlyFileBackground(); } } return 100; @@ -1478,6 +1580,7 @@ static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientD static void beforeSleep(struct aeEventLoop *eventLoop) { REDIS_NOTUSED(eventLoop); + /* Awake clients that got all the swapped keys they requested */ if (server.vm_enabled && listLength(server.io_ready_clients)) { listIter li; listNode *ln; @@ -1502,6 +1605,8 @@ static void beforeSleep(struct aeEventLoop *eventLoop) { processInputBuffer(c); } } + /* Write the AOF buffer on disk */ + flushAppendOnlyFile(); } static void createSharedObjects(void) { @@ -1542,11 +1647,13 @@ static void createSharedObjects(void) { shared.select8 = createStringObject("select 8\r\n",10); shared.select9 = createStringObject("select 9\r\n",10); shared.messagebulk = createStringObject("$7\r\nmessage\r\n",13); + shared.pmessagebulk = createStringObject("$8\r\npmessage\r\n",14); shared.subscribebulk = createStringObject("$9\r\nsubscribe\r\n",15); shared.unsubscribebulk = createStringObject("$11\r\nunsubscribe\r\n",18); shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n",17); shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n",19); shared.mbulk3 = createStringObject("*3\r\n",4); + shared.mbulk4 = createStringObject("*4\r\n",4); for (j = 0; j < REDIS_SHARED_INTEGERS; j++) { shared.integers[j] = 
createObject(REDIS_STRING,(void*)(long)j); shared.integers[j]->encoding = REDIS_ENCODING_INT; @@ -1577,7 +1684,7 @@ static void initServerConfig() { server.glueoutputbuf = 1; server.daemonize = 0; server.appendonly = 0; - server.appendfsync = APPENDFSYNC_ALWAYS; + server.appendfsync = APPENDFSYNC_EVERYSEC; server.lastfsync = time(NULL); server.appendfd = -1; server.appendseldb = -1; /* Make sure the first time will not match */ @@ -1585,7 +1692,6 @@ static void initServerConfig() { server.dbfilename = zstrdup("dump.rdb"); server.appendfilename = zstrdup("appendonly.aof"); server.requirepass = NULL; - server.shareobjects = 0; server.rdbcompression = 1; server.activerehashing = 1; server.maxclients = 0; @@ -1600,6 +1706,7 @@ static void initServerConfig() { server.vm_blocked_clients = 0; server.hash_max_zipmap_entries = REDIS_HASH_MAX_ZIPMAP_ENTRIES; server.hash_max_zipmap_value = REDIS_HASH_MAX_ZIPMAP_VALUE; + server.shutdown_asap = 0; resetServerSaveParams(); @@ -1648,7 +1755,8 @@ static void initServer() { for (j = 0; j < server.dbnum; j++) { server.db[j].dict = dictCreate(&dbDictType,NULL); server.db[j].expires = dictCreate(&keyptrDictType,NULL); - server.db[j].blockingkeys = dictCreate(&keylistDictType,NULL); + server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL); + server.db[j].watched_keys = dictCreate(&keylistDictType,NULL); if (server.vm_enabled) server.db[j].io_keys = dictCreate(&keylistDictType,NULL); server.db[j].id = j; @@ -1661,6 +1769,7 @@ static void initServer() { server.bgsavechildpid = -1; server.bgrewritechildpid = -1; server.bgrewritebuf = sdsempty(); + server.aofbuf = sdsempty(); server.lastsave = time(NULL); server.dirty = 0; server.stat_numcommands = 0; @@ -1802,7 +1911,7 @@ static void loadServerConfig(char *filename) { } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) { server.maxclients = atoi(argv[1]); } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) { - server.maxmemory = strtoll(argv[1], NULL, 10); + 
server.maxmemory = memtoll(argv[1],NULL); } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) { server.masterhost = sdsnew(argv[1]); server.masterport = atoi(argv[2]); @@ -1813,10 +1922,6 @@ static void loadServerConfig(char *filename) { if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; } - } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) { - if ((server.shareobjects = yesnotoi(argv[1])) == -1) { - err = "argument must be 'yes' or 'no'"; goto loaderr; - } } else if (!strcasecmp(argv[0],"rdbcompression") && argc == 2) { if ((server.rdbcompression = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; @@ -1833,6 +1938,9 @@ static void loadServerConfig(char *filename) { if ((server.appendonly = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; } + } else if (!strcasecmp(argv[0],"appendfilename") && argc == 2) { + zfree(server.appendfilename); + server.appendfilename = zstrdup(argv[1]); } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) { if (!strcasecmp(argv[1],"no")) { server.appendfsync = APPENDFSYNC_NO; @@ -1860,19 +1968,17 @@ static void loadServerConfig(char *filename) { zfree(server.vm_swap_file); server.vm_swap_file = zstrdup(argv[1]); } else if (!strcasecmp(argv[0],"vm-max-memory") && argc == 2) { - server.vm_max_memory = strtoll(argv[1], NULL, 10); + server.vm_max_memory = memtoll(argv[1],NULL); } else if (!strcasecmp(argv[0],"vm-page-size") && argc == 2) { - server.vm_page_size = strtoll(argv[1], NULL, 10); + server.vm_page_size = memtoll(argv[1], NULL); } else if (!strcasecmp(argv[0],"vm-pages") && argc == 2) { - server.vm_pages = strtoll(argv[1], NULL, 10); + server.vm_pages = memtoll(argv[1], NULL); } else if (!strcasecmp(argv[0],"vm-max-threads") && argc == 2) { server.vm_max_threads = strtoll(argv[1], NULL, 10); } else if (!strcasecmp(argv[0],"hash-max-zipmap-entries") && argc == 2){ - 
server.hash_max_zipmap_entries = strtol(argv[1], NULL, 10); + server.hash_max_zipmap_entries = memtoll(argv[1], NULL); } else if (!strcasecmp(argv[0],"hash-max-zipmap-value") && argc == 2){ - server.hash_max_zipmap_value = strtol(argv[1], NULL, 10); - } else if (!strcasecmp(argv[0],"vm-max-threads") && argc == 2) { - server.vm_max_threads = strtoll(argv[1], NULL, 10); + server.hash_max_zipmap_value = memtoll(argv[1], NULL); } else { err = "Bad directive or wrong number of arguments"; goto loaderr; } @@ -1916,6 +2022,9 @@ static void freeClient(redisClient *c) { if (c->flags & REDIS_BLOCKED) unblockClientWaitingData(c); + /* UNWATCH all the keys */ + unwatchAllKeys(c); + listRelease(c->watched_keys); /* Unsubscribe from all the pubsub channels */ pubsubUnsubscribeAllChannels(c,0); pubsubUnsubscribeAllPatterns(c,0); @@ -1931,7 +2040,8 @@ static void freeClient(redisClient *c) { ln = listSearchKey(server.clients,c); redisAssert(ln != NULL); listDelNode(server.clients,ln); - /* Remove from the list of clients waiting for swapped keys */ + /* Remove from the list of clients that are now ready to be restarted + * after waiting for swapped keys */ if (c->flags & REDIS_IO_WAIT && listLength(c->io_keys) == 0) { ln = listSearchKey(server.io_ready_clients,c); if (ln) { @@ -1939,6 +2049,7 @@ static void freeClient(redisClient *c) { server.vm_blocked_clients--; } } + /* Remove from the list of clients waiting for swapped keys */ while (server.vm_enabled && listLength(c->io_keys)) { ln = listFirst(c->io_keys); dontWaitForSwappedKey(c,ln->value); @@ -2166,7 +2277,7 @@ static void call(redisClient *c, struct redisCommand *cmd) { listLength(server.slaves)) replicationFeedSlaves(server.slaves,c->db->id,c->argv,c->argc); if (listLength(server.monitors)) - replicationFeedSlaves(server.monitors,c->db->id,c->argv,c->argc); + replicationFeedMonitors(server.monitors,c->db->id,c->argv,c->argc); server.stat_numcommands++; } @@ -2335,12 +2446,15 @@ static int processCommand(redisClient *c) { 
} /* Exec the command */ - if (c->flags & REDIS_MULTI && cmd->proc != execCommand && cmd->proc != discardCommand) { + if (c->flags & REDIS_MULTI && + cmd->proc != execCommand && cmd->proc != discardCommand && + cmd->proc != multiCommand && cmd->proc != watchCommand) + { queueMultiCommand(c,cmd); addReply(c,shared.queued); } else { if (server.vm_enabled && server.vm_max_threads > 0 && - blockClientOnSwappedKeys(cmd,c)) return 1; + blockClientOnSwappedKeys(c,cmd)) return 1; call(c,cmd); } @@ -2421,6 +2535,64 @@ static void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int arg if (outv != static_outv) zfree(outv); } +/* Append to the sds string 's' a double-quoted representation of the 'len' + * bytes starting at 'p'. Backslash and double quote are prefixed with a + * backslash; newline, carriage return, tab, bell and backspace are written + * as the two character sequences \n \r \t \a \b; any other byte that is not + * printable is written as \xHH (two hex digits). Printable bytes are copied + * as they are. The return value is the sds string returned by the last + * append call. */ +static sds sdscatrepr(sds s, char *p, size_t len) { + s = sdscatlen(s,"\"",1); + while(len--) { + switch(*p) { + case '\\': + case '"': + s = sdscatprintf(s,"\\%c",*p); + break; + /* Each escape below is two characters long: the backslash and + * the letter. */ + case '\n': s = sdscatlen(s,"\\n",2); break; + case '\r': s = sdscatlen(s,"\\r",2); break; + case '\t': s = sdscatlen(s,"\\t",2); break; + case '\a': s = sdscatlen(s,"\\a",2); break; + case '\b': s = sdscatlen(s,"\\b",2); break; + default: + /* isprint() is undefined for negative char values, so + * classify the byte as unsigned char. */ + if (isprint((unsigned char)*p)) + s = sdscatprintf(s,"%c",*p); + else + s = sdscatprintf(s,"\\x%02x",(unsigned char)*p); + break; + } + p++; + } + return sdscatlen(s,"\"",1); +} + +static void replicationFeedMonitors(list *monitors, int dictid, robj **argv, int argc) { + listNode *ln; + listIter li; + int j; + sds cmdrepr = sdsnew("+"); + robj *cmdobj; + struct timeval tv; + + gettimeofday(&tv,NULL); + /* tv_usec is zero padded to six digits so that the timestamp reads as + * seconds followed by a decimal fraction of a second. */ + cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec,(long)tv.tv_usec); + if (dictid != 0) cmdrepr = sdscatprintf(cmdrepr,"(db %d) ", dictid); + + for (j = 0; j < argc; j++) { + if (argv[j]->encoding == REDIS_ENCODING_INT) { + cmdrepr = sdscatprintf(cmdrepr, "%ld", (long)argv[j]->ptr); + } else { + cmdrepr = sdscatrepr(cmdrepr,(char*)argv[j]->ptr, + sdslen(argv[j]->ptr)); + } + if (j != argc-1) + cmdrepr = sdscatlen(cmdrepr," ",1); + } + cmdrepr = sdscatlen(cmdrepr,"\r\n",2); + cmdobj = createObject(REDIS_STRING,cmdrepr); + 
listRewind(monitors,&li); + while((ln = listNext(&li))) { + redisClient *monitor = ln->value; + addReply(monitor,cmdobj); + } + decrRefCount(cmdobj); +} + static void processInputBuffer(redisClient *c) { again: /* Before to process the input buffer, make sure the client is not @@ -2546,7 +2718,7 @@ static void *dupClientReplyValue(void *o) { } static int listMatchObjects(void *a, void *b) { - return compareStringObjects(a,b) == 0; + return equalStringObjects(a,b); } static redisClient *createClient(int fd) { @@ -2572,9 +2744,10 @@ static redisClient *createClient(int fd) { c->reply = listCreate(); listSetFreeMethod(c->reply,decrRefCount); listSetDupMethod(c->reply,dupClientReplyValue); - c->blockingkeys = NULL; - c->blockingkeysnum = 0; + c->blocking_keys = NULL; + c->blocking_keys_num = 0; c->io_keys = listCreate(); + c->watched_keys = listCreate(); listSetFreeMethod(c->io_keys,decrRefCount); c->pubsub_channels = dictCreate(&setDictType,NULL); c->pubsub_patterns = listCreate(); @@ -2618,21 +2791,6 @@ static void addReplyDouble(redisClient *c, double d) { (unsigned long) strlen(buf),buf)); } -static void addReplyLong(redisClient *c, long l) { - char buf[128]; - size_t len; - - if (l == 0) { - addReply(c,shared.czero); - return; - } else if (l == 1) { - addReply(c,shared.cone); - return; - } - len = snprintf(buf,sizeof(buf),":%ld\r\n",l); - addReplySds(c,sdsnewlen(buf,len)); -} - static void addReplyLongLong(redisClient *c, long long ll) { char buf[128]; size_t len; @@ -2644,8 +2802,11 @@ static void addReplyLongLong(redisClient *c, long long ll) { addReply(c,shared.cone); return; } - len = snprintf(buf,sizeof(buf),":%lld\r\n",ll); - addReplySds(c,sdsnewlen(buf,len)); + buf[0] = ':'; + len = ll2string(buf+1,sizeof(buf)-1,ll); + buf[len+1] = '\r'; + buf[len+2] = '\n'; + addReplySds(c,sdsnewlen(buf,len+3)); } static void addReplyUlong(redisClient *c, unsigned long ul) { @@ -2664,7 +2825,8 @@ static void addReplyUlong(redisClient *c, unsigned long ul) { } static void 
addReplyBulkLen(redisClient *c, robj *obj) { - size_t len; + size_t len, intlen; + char buf[128]; if (obj->encoding == REDIS_ENCODING_RAW) { len = sdslen(obj->ptr); @@ -2681,7 +2843,11 @@ static void addReplyBulkLen(redisClient *c, robj *obj) { len++; } } - addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",(unsigned long)len)); + buf[0] = '$'; + intlen = ll2string(buf+1,sizeof(buf)-1,(long long)len); + buf[intlen+1] = '\r'; + buf[intlen+2] = '\n'; + addReplySds(c,sdsnewlen(buf,intlen+3)); } static void addReplyBulk(redisClient *c, robj *obj) { @@ -2781,12 +2947,12 @@ static robj *createStringObjectFromLongLong(long long value) { incrRefCount(shared.integers[value]); o = shared.integers[value]; } else { - o = createObject(REDIS_STRING, NULL); if (value >= LONG_MIN && value <= LONG_MAX) { + o = createObject(REDIS_STRING, NULL); o->encoding = REDIS_ENCODING_INT; o->ptr = (void*)((long)value); } else { - o->ptr = sdscatprintf(sdsempty(),"%lld",value); + o = createObject(REDIS_STRING,sdsfromlonglong(value)); } } return o; @@ -2950,6 +3116,7 @@ static robj *lookupKeyRead(redisDb *db, robj *key) { static robj *lookupKeyWrite(redisDb *db, robj *key) { deleteIfVolatile(db,key); + touchWatchedKey(db,key); return lookupKey(db,key); } @@ -3001,7 +3168,7 @@ static int isStringRepresentableAsLong(sds s, long *longval) { value = strtol(s, &endptr, 10); if (endptr[0] != '\0') return REDIS_ERR; - slen = snprintf(buf,32,"%ld",value); + slen = ll2string(buf,32,value); /* If the number converted back into a string is not identical * then it's not possible to encode the string as integer */ @@ -3054,17 +3221,17 @@ static robj *getDecodedObject(robj *o) { if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) { char buf[32]; - snprintf(buf,32,"%ld",(long)o->ptr); + ll2string(buf,32,(long)o->ptr); dec = createStringObject(buf,strlen(buf)); return dec; } else { - redisAssert(1 != 1); + redisPanic("Unknown encoding type"); } } /* Compare two string objects via strcmp() or alike. 
* Note that the objects may be integer-encoded. In such a case we - * use snprintf() to get a string representation of the numbers on the stack + * use ll2string() to get a string representation of the numbers on the stack * and compare the strings, it's much faster than calling getDecodedObject(). * * Important note: if objects are not integer encoded, but binary-safe strings, @@ -3077,14 +3244,14 @@ static int compareStringObjects(robj *a, robj *b) { if (a == b) return 0; if (a->encoding != REDIS_ENCODING_RAW) { - snprintf(bufa,sizeof(bufa),"%ld",(long) a->ptr); + ll2string(bufa,sizeof(bufa),(long) a->ptr); astr = bufa; bothsds = 0; } else { astr = a->ptr; } if (b->encoding != REDIS_ENCODING_RAW) { - snprintf(bufb,sizeof(bufb),"%ld",(long) b->ptr); + ll2string(bufb,sizeof(bufb),(long) b->ptr); bstr = bufb; bothsds = 0; } else { @@ -3093,6 +3260,18 @@ static int compareStringObjects(robj *a, robj *b) { return bothsds ? sdscmp(astr,bstr) : strcmp(astr,bstr); } +/* Equal string objects return 1 if the two objects are the same from the + * point of view of a string comparison, otherwise 0 is returned. Note that + * this function is faster then checking for (compareStringObject(a,b) == 0) + * because it can perform some more optimization. 
*/ +static int equalStringObjects(robj *a, robj *b) { + if (a->encoding != REDIS_ENCODING_RAW && b->encoding != REDIS_ENCODING_RAW){ + return a->ptr == b->ptr; + } else { + return compareStringObjects(a,b) == 0; + } +} + static size_t stringObjectLen(robj *o) { redisAssert(o->type == REDIS_STRING); if (o->encoding == REDIS_ENCODING_RAW) { @@ -3100,7 +3279,7 @@ static size_t stringObjectLen(robj *o) { } else { char buf[32]; - return snprintf(buf,32,"%ld",(long)o->ptr); + return ll2string(buf,32,(long)o->ptr); } } @@ -3118,7 +3297,7 @@ static int getDoubleFromObject(robj *o, double *target) { } else if (o->encoding == REDIS_ENCODING_INT) { value = (long)o->ptr; } else { - redisAssert(1 != 1); + redisPanic("Unknown string encoding"); } } @@ -3155,7 +3334,7 @@ static int getLongLongFromObject(robj *o, long long *target) { } else if (o->encoding == REDIS_ENCODING_INT) { value = (long)o->ptr; } else { - redisAssert(1 != 1); + redisPanic("Unknown string encoding"); } } @@ -3231,22 +3410,12 @@ static int rdbSaveLen(FILE *fp, uint32_t len) { return 0; } -/* String objects in the form "2391" "-100" without any space and with a - * range of values that can fit in an 8, 16 or 32 bit signed value can be - * encoded as integers to save space */ -static int rdbTryIntegerEncoding(char *s, size_t len, unsigned char *enc) { - long long value; - char *endptr, buf[32]; - - /* Check if it's possible to encode this value as a number */ - value = strtoll(s, &endptr, 10); - if (endptr[0] != '\0') return 0; - snprintf(buf,32,"%lld",value); - - /* If the number converted back into a string is not identical - * then it's not possible to encode the string as integer */ - if (strlen(buf) != len || memcmp(buf,s,len)) return 0; - +/* Encode 'value' as an integer if possible (if integer will fit the + * supported range). If the function sucessful encoded the integer + * then the (up to 5 bytes) encoded representation is written in the + * string pointed by 'enc' and the length is returned. 
Otherwise + * 0 is returned. */ +static int rdbEncodeInteger(long long value, unsigned char *enc) { /* Finally check if it fits in our ranges */ if (value >= -(1<<7) && value <= (1<<7)-1) { enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8; @@ -3269,6 +3438,25 @@ static int rdbTryIntegerEncoding(char *s, size_t len, unsigned char *enc) { } } +/* String objects in the form "2391" "-100" without any space and with a + * range of values that can fit in an 8, 16 or 32 bit signed value can be + * encoded as integers to save space */ +static int rdbTryIntegerEncoding(char *s, size_t len, unsigned char *enc) { + long long value; + char *endptr, buf[32]; + + /* Check if it's possible to encode this value as a number */ + value = strtoll(s, &endptr, 10); + if (endptr[0] != '\0') return 0; + ll2string(buf,32,value); + + /* If the number converted back into a string is not identical + * then it's not possible to encode the string as integer */ + if (strlen(buf) != len || memcmp(buf,s,len)) return 0; + + return rdbEncodeInteger(value,enc); +} + static int rdbSaveLzfStringObject(FILE *fp, unsigned char *s, size_t len) { size_t comprlen, outlen; unsigned char byte; @@ -3332,6 +3520,21 @@ static int rdbSaveRawString(FILE *fp, unsigned char *s, size_t len) { static int rdbSaveStringObject(FILE *fp, robj *obj) { int retval; + /* Avoid to decode the object, then encode it again, if the + * object is alrady integer encoded. */ + if (obj->encoding == REDIS_ENCODING_INT) { + long val = (long) obj->ptr; + unsigned char buf[5]; + int enclen; + + if ((enclen = rdbEncodeInteger(val,buf)) > 0) { + if (fwrite(buf,enclen,1,fp) == 0) return -1; + return 0; + } + /* otherwise... fall throught and continue with the usual + * code path. */ + } + /* Avoid incr/decr ref count business when possible. * This plays well with copy-on-write given that we are probably * in a child process (BGSAVE). 
Also this makes sure key objects @@ -3366,7 +3569,23 @@ static int rdbSaveDoubleValue(FILE *fp, double val) { len = 1; buf[0] = (val < 0) ? 255 : 254; } else { - snprintf((char*)buf+1,sizeof(buf)-1,"%.17g",val); +#if (DBL_MANT_DIG >= 52) && (LLONG_MAX == 0x7fffffffffffffffLL) + /* Check if the float is in a safe range to be casted into a + * long long. We are assuming that long long is 64 bit here. + * Also we are assuming that there are no implementations around where + * double has precision < 52 bit. + * + * Under this assumptions we test if a double is inside an interval + * where casting to long long is safe. Then using two castings we + * make sure the decimal part is zero. If all this is true we use + * integer printing function that is much faster. */ + double min = -4503599627370495; /* (2^52)-1 */ + double max = 4503599627370496; /* -(2^52) */ + if (val > min && val < max && val == ((double)((long long)val))) + ll2string((char*)buf+1,sizeof(buf),(long long)val); + else +#endif + snprintf((char*)buf+1,sizeof(buf)-1,"%.17g",val); buf[0] = strlen((char*)buf+1); len = buf[0]+1; } @@ -3650,7 +3869,11 @@ static uint32_t rdbLoadLen(FILE *fp, int *isencoded) { } } -static robj *rdbLoadIntegerObject(FILE *fp, int enctype) { +/* Load an integer-encoded object from file 'fp', with the specified + * encoding type 'enctype'. If encode is true the function may return + * an integer-encoded object as reply, otherwise the returned object + * will always be encoded as a raw string. 
*/ +static robj *rdbLoadIntegerObject(FILE *fp, int enctype, int encode) { unsigned char enc[4]; long long val; @@ -3671,7 +3894,10 @@ static robj *rdbLoadIntegerObject(FILE *fp, int enctype) { val = 0; /* anti-warning */ redisPanic("Unknown RDB integer encoding type"); } - return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val)); + if (encode) + return createStringObjectFromLongLong(val); + else + return createObject(REDIS_STRING,sdsfromlonglong(val)); } static robj *rdbLoadLzfStringObject(FILE*fp) { @@ -3693,7 +3919,7 @@ err: return NULL; } -static robj *rdbLoadStringObject(FILE*fp) { +static robj *rdbGenericLoadStringObject(FILE*fp, int encode) { int isencoded; uint32_t len; sds val; @@ -3704,7 +3930,7 @@ static robj *rdbLoadStringObject(FILE*fp) { case REDIS_RDB_ENC_INT8: case REDIS_RDB_ENC_INT16: case REDIS_RDB_ENC_INT32: - return rdbLoadIntegerObject(fp,len); + return rdbLoadIntegerObject(fp,len,encode); case REDIS_RDB_ENC_LZF: return rdbLoadLzfStringObject(fp); default: @@ -3721,6 +3947,14 @@ static robj *rdbLoadStringObject(FILE*fp) { return createObject(REDIS_STRING,val); } +static robj *rdbLoadStringObject(FILE *fp) { + return rdbGenericLoadStringObject(fp,0); +} + +static robj *rdbLoadEncodedStringObject(FILE *fp) { + return rdbGenericLoadStringObject(fp,1); +} + /* For information about double serialization check rdbSaveDoubleValue() */ static int rdbLoadDoubleValue(FILE *fp, double *val) { char buf[128]; @@ -3747,7 +3981,7 @@ static robj *rdbLoadObject(int type, FILE *fp) { redisLog(REDIS_DEBUG,"LOADING OBJECT %d (at %d)\n",type,ftell(fp)); if (type == REDIS_STRING) { /* Read string value */ - if ((o = rdbLoadStringObject(fp)) == NULL) return NULL; + if ((o = rdbLoadEncodedStringObject(fp)) == NULL) return NULL; o = tryObjectEncoding(o); } else if (type == REDIS_LIST || type == REDIS_SET) { /* Read list/set value */ @@ -3763,7 +3997,7 @@ static robj *rdbLoadObject(int type, FILE *fp) { while(listlen--) { robj *ele; - if ((ele = 
rdbLoadStringObject(fp)) == NULL) return NULL; + if ((ele = rdbLoadEncodedStringObject(fp)) == NULL) return NULL; ele = tryObjectEncoding(ele); if (type == REDIS_LIST) { listAddNodeTail((list*)o->ptr,ele); @@ -3784,7 +4018,7 @@ static robj *rdbLoadObject(int type, FILE *fp) { robj *ele; double *score = zmalloc(sizeof(double)); - if ((ele = rdbLoadStringObject(fp)) == NULL) return NULL; + if ((ele = rdbLoadEncodedStringObject(fp)) == NULL) return NULL; ele = tryObjectEncoding(ele); if (rdbLoadDoubleValue(fp,score) == -1) return NULL; dictAdd(zs->dict,ele,score); @@ -3837,13 +4071,13 @@ static robj *rdbLoadObject(int type, FILE *fp) { static int rdbLoad(char *filename) { FILE *fp; - robj *keyobj = NULL; uint32_t dbid; int type, retval, rdbver; + int swap_all_values = 0; dict *d = server.db[0].dict; redisDb *db = server.db+0; char buf[1024]; - time_t expiretime = -1, now = time(NULL); + time_t expiretime, now = time(NULL); long long loadedkeys = 0; fp = fopen(filename,"r"); @@ -3862,8 +4096,9 @@ static int rdbLoad(char *filename) { return REDIS_ERR; } while(1) { - robj *o; + robj *key, *val; + expiretime = -1; /* Read type. */ if ((type = rdbLoadType(fp)) == -1) goto eoferr; if (type == REDIS_EXPIRETIME) { @@ -3885,41 +4120,101 @@ static int rdbLoad(char *filename) { continue; } /* Read key */ - if ((keyobj = rdbLoadStringObject(fp)) == NULL) goto eoferr; + if ((key = rdbLoadStringObject(fp)) == NULL) goto eoferr; /* Read value */ - if ((o = rdbLoadObject(type,fp)) == NULL) goto eoferr; + if ((val = rdbLoadObject(type,fp)) == NULL) goto eoferr; + /* Check if the key already expired */ + if (expiretime != -1 && expiretime < now) { + decrRefCount(key); + decrRefCount(val); + continue; + } /* Add the new object in the hash table */ - retval = dictAdd(d,keyobj,o); + retval = dictAdd(d,key,val); if (retval == DICT_ERR) { - redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! 
Unrecoverable error, exiting now.", keyobj->ptr); + redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", key->ptr); exit(1); } + loadedkeys++; /* Set the expire time if needed */ - if (expiretime != -1) { - setExpire(db,keyobj,expiretime); - /* Delete this key if already expired */ - if (expiretime < now) deleteKey(db,keyobj); - expiretime = -1; - } - keyobj = o = NULL; + if (expiretime != -1) setExpire(db,key,expiretime); + /* Handle swapping while loading big datasets when VM is on */ - loadedkeys++; - if (server.vm_enabled && (loadedkeys % 5000) == 0) { + + /* If we detecter we are hopeless about fitting something in memory + * we just swap every new key on disk. Directly... + * Note that's important to check for this condition before resorting + * to random sampling, otherwise we may try to swap already + * swapped keys. */ + if (swap_all_values) { + dictEntry *de = dictFind(d,key); + + /* de may be NULL since the key already expired */ + if (de) { + key = dictGetEntryKey(de); + val = dictGetEntryVal(de); + + if (vmSwapObjectBlocking(key,val) == REDIS_OK) { + dictGetEntryVal(de) = NULL; + } + } + continue; + } + + /* If we have still some hope of having some value fitting memory + * then we try random sampling. */ + if (!swap_all_values && server.vm_enabled && (loadedkeys % 5000) == 0) { while (zmalloc_used_memory() > server.vm_max_memory) { if (vmSwapOneObjectBlocking() == REDIS_ERR) break; } + if (zmalloc_used_memory() > server.vm_max_memory) + swap_all_values = 1; /* We are already using too much mem */ } } fclose(fp); return REDIS_OK; eoferr: /* unexpected end of file is handled here with a fatal exit */ - if (keyobj) decrRefCount(keyobj); redisLog(REDIS_WARNING,"Short read or OOM loading DB. 
Unrecoverable error, aborting now."); exit(1); return REDIS_ERR; /* Just to avoid warning */ } +/*================================== Shutdown =============================== */ +static int prepareForShutdown() { + redisLog(REDIS_WARNING,"User requested shutdown, saving DB..."); + /* Kill the saving child if there is a background saving in progress. + We want to avoid race conditions, for instance our saving child may + overwrite the synchronous saving did by SHUTDOWN. */ + if (server.bgsavechildpid != -1) { + redisLog(REDIS_WARNING,"There is a live saving child. Killing it!"); + kill(server.bgsavechildpid,SIGKILL); + rdbRemoveTempFile(server.bgsavechildpid); + } + if (server.appendonly) { + /* Append only file: fsync() the AOF and exit */ + fsync(server.appendfd); + if (server.vm_enabled) unlink(server.vm_swap_file); + } else { + /* Snapshotting. Perform a SYNC SAVE and exit */ + if (rdbSave(server.dbfilename) == REDIS_OK) { + if (server.daemonize) + unlink(server.pidfile); + redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory()); + } else { + /* Ooops.. error saving! The best we can do is to continue + * operating. Note that if there was a background saving process, + * in the next cron() Redis will be notified that the background + * saving aborted, handling special stuff like slaves pending for + * synchronization... 
*/ + redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit"); + return REDIS_ERR; + } + } + redisLog(REDIS_WARNING,"Server exit now, bye bye..."); + return REDIS_OK; +} + /*================================== Commands =============================== */ static void authCommand(redisClient *c) { @@ -3942,40 +4237,56 @@ static void echoCommand(redisClient *c) { /*=================================== Strings =============================== */ -static void setGenericCommand(redisClient *c, int nx) { +static void setGenericCommand(redisClient *c, int nx, robj *key, robj *val, robj *expire) { int retval; + long seconds = 0; /* initialized to avoid an harmness warning */ + + if (expire) { + if (getLongFromObjectOrReply(c, expire, &seconds, NULL) != REDIS_OK) + return; + if (seconds <= 0) { + addReplySds(c,sdsnew("-ERR invalid expire time in SETEX\r\n")); + return; + } + } - if (nx) deleteIfVolatile(c->db,c->argv[1]); - retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]); + touchWatchedKey(c->db,key); + if (nx) deleteIfVolatile(c->db,key); + retval = dictAdd(c->db->dict,key,val); if (retval == DICT_ERR) { if (!nx) { /* If the key is about a swapped value, we want a new key object * to overwrite the old. So we delete the old key in the database. * This will also make sure that swap pages about the old object * will be marked as free. */ - if (server.vm_enabled && deleteIfSwapped(c->db,c->argv[1])) - incrRefCount(c->argv[1]); - dictReplace(c->db->dict,c->argv[1],c->argv[2]); - incrRefCount(c->argv[2]); + if (server.vm_enabled && deleteIfSwapped(c->db,key)) + incrRefCount(key); + dictReplace(c->db->dict,key,val); + incrRefCount(val); } else { addReply(c,shared.czero); return; } } else { - incrRefCount(c->argv[1]); - incrRefCount(c->argv[2]); + incrRefCount(key); + incrRefCount(val); } server.dirty++; - removeExpire(c->db,c->argv[1]); + removeExpire(c->db,key); + if (expire) setExpire(c->db,key,time(NULL)+seconds); addReply(c, nx ? 
shared.cone : shared.ok); } static void setCommand(redisClient *c) { - setGenericCommand(c,0); + setGenericCommand(c,0,c->argv[1],c->argv[2],NULL); } static void setnxCommand(redisClient *c) { - setGenericCommand(c,1); + setGenericCommand(c,1,c->argv[1],c->argv[2],NULL); +} + +static void setexCommand(redisClient *c) { + setGenericCommand(c,0,c->argv[1],c->argv[3],c->argv[2]); } static int getGenericCommand(redisClient *c) { @@ -4080,12 +4391,11 @@ static void incrDecrCommand(redisClient *c, long long incr) { robj *o; o = lookupKeyWrite(c->db,c->argv[1]); - - if (getLongLongFromObjectOrReply(c, o, &value, NULL) != REDIS_OK) return; + if (o != NULL && checkType(c,o,REDIS_STRING)) return; + if (getLongLongFromObjectOrReply(c,o,&value,NULL) != REDIS_OK) return; value += incr; - o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value)); - o = tryObjectEncoding(o); + o = createStringObjectFromLongLong(value); retval = dictAdd(c->db->dict,c->argv[1],o); if (retval == DICT_ERR) { dictReplace(c->db->dict,c->argv[1],o); @@ -4211,15 +4521,21 @@ static void delCommand(redisClient *c) { for (j = 1; j < c->argc; j++) { if (deleteKey(c->db,c->argv[j])) { + touchWatchedKey(c->db,c->argv[j]); server.dirty++; deleted++; } } - addReplyLong(c,deleted); + addReplyLongLong(c,deleted); } static void existsCommand(redisClient *c) { - addReply(c,lookupKeyRead(c->db,c->argv[1]) ? 
shared.cone : shared.czero); + expireIfNeeded(c->db,c->argv[1]); + if (dictFind(c->db->dict,c->argv[1])) { + addReply(c, shared.cone); + } else { + addReply(c, shared.czero); + } } static void selectCommand(redisClient *c) { @@ -4234,18 +4550,25 @@ static void selectCommand(redisClient *c) { static void randomkeyCommand(redisClient *c) { dictEntry *de; + robj *key; while(1) { de = dictGetRandomKey(c->db->dict); if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break; } + if (de == NULL) { - addReply(c,shared.plus); - addReply(c,shared.crlf); + addReply(c,shared.nullbulk); + return; + } + + key = dictGetEntryKey(de); + if (server.vm_enabled) { + key = dupStringObject(key); + addReplyBulk(c,key); + decrRefCount(key); } else { - addReply(c,shared.plus); - addReply(c,dictGetEntryKey(de)); - addReply(c,shared.crlf); + addReplyBulk(c,key); } } @@ -4333,40 +4656,9 @@ static void bgsaveCommand(redisClient *c) { } static void shutdownCommand(redisClient *c) { - redisLog(REDIS_WARNING,"User requested shutdown, saving DB..."); - /* Kill the saving child if there is a background saving in progress. - We want to avoid race conditions, for instance our saving child may - overwrite the synchronous saving did by SHUTDOWN. */ - if (server.bgsavechildpid != -1) { - redisLog(REDIS_WARNING,"There is a live saving child. Killing it!"); - kill(server.bgsavechildpid,SIGKILL); - rdbRemoveTempFile(server.bgsavechildpid); - } - if (server.appendonly) { - /* Append only file: fsync() the AOF and exit */ - fsync(server.appendfd); - if (server.vm_enabled) unlink(server.vm_swap_file); + if (prepareForShutdown() == REDIS_OK) exit(0); - } else { - /* Snapshotting. 
Perform a SYNC SAVE and exit */ - if (rdbSave(server.dbfilename) == REDIS_OK) { - if (server.daemonize) - unlink(server.pidfile); - redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory()); - redisLog(REDIS_WARNING,"Server exit now, bye bye..."); - if (server.vm_enabled) unlink(server.vm_swap_file); - exit(0); - } else { - /* Ooops.. error saving! The best we can do is to continue - * operating. Note that if there was a background saving process, - * in the next cron() Redis will be notified that the background - * saving aborted, handling special stuff like slaves pending for - * synchronization... */ - redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit"); - addReplySds(c, - sdsnew("-ERR can't quit, problems saving the DB\r\n")); - } - } + addReplySds(c, sdsnew("-ERR Errors trying to SHUTDOWN. Check logs.\r\n")); } static void renameGenericCommand(redisClient *c, int nx) { @@ -4394,6 +4686,7 @@ static void renameGenericCommand(redisClient *c, int nx) { incrRefCount(c->argv[2]); } deleteKey(c->db,c->argv[1]); + touchWatchedKey(c->db,c->argv[2]); server.dirty++; addReply(c,nx ? shared.cone : shared.ok); } @@ -4489,7 +4782,7 @@ static void pushGenericCommand(redisClient *c, int where) { incrRefCount(c->argv[2]); } server.dirty++; - addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(list))); + addReplyLongLong(c,listLength(list)); } static void lpushCommand(redisClient *c) { @@ -4693,7 +4986,7 @@ static void lremCommand(redisClient *c) { robj *ele = listNodeValue(ln); next = fromtail ? 
ln->prev : ln->next; - if (compareStringObjects(ele,c->argv[3]) == 0) { + if (equalStringObjects(ele,c->argv[3])) { listDelNode(list,ln); server.dirty++; removed++; @@ -4991,7 +5284,7 @@ static void sinterGenericCommand(redisClient *c, robj **setskeys, unsigned long if (dictSize((dict*)dstset->ptr) > 0) { dictAdd(c->db->dict,dstkey,dstset); incrRefCount(dstkey); - addReplyLong(c,dictSize((dict*)dstset->ptr)); + addReplyLongLong(c,dictSize((dict*)dstset->ptr)); } else { decrRefCount(dstset); addReply(c,shared.czero); @@ -5094,7 +5387,7 @@ static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnu if (dictSize((dict*)dstset->ptr) > 0) { dictAdd(c->db->dict,dstkey,dstset); incrRefCount(dstkey); - addReplyLong(c,dictSize((dict*)dstset->ptr)); + addReplyLongLong(c,dictSize((dict*)dstset->ptr)); } else { decrRefCount(dstset); addReply(c,shared.czero); @@ -5143,8 +5436,10 @@ static zskiplistNode *zslCreateNode(int level, double score, robj *obj) { zskiplistNode *zn = zmalloc(sizeof(*zn)); zn->forward = zmalloc(sizeof(zskiplistNode*) * level); - if (level > 0) + if (level > 1) zn->span = zmalloc(sizeof(unsigned int) * (level - 1)); + else + zn->span = NULL; zn->score = score; zn->obj = obj; return zn; @@ -5297,7 +5592,7 @@ static int zslDelete(zskiplist *zsl, double score, robj *obj) { /* We may have multiple elements with the same score, what we need * is to find the element with both the right score and object. 
*/ x = x->forward[0]; - if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) { + if (x && score == x->score && equalStringObjects(x->obj,obj)) { zslDeleteNode(zsl, x, update); zslFreeNode(x); return 1; @@ -5402,7 +5697,7 @@ static unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) { } /* x might be equal to zsl->header, so test if obj is non-NULL */ - if (x->obj && compareStringObjects(x->obj,o) == 0) { + if (x->obj && equalStringObjects(x->obj,o)) { return rank; } } @@ -5573,7 +5868,7 @@ static void zremrangebyscoreCommand(redisClient *c) { if (htNeedsResize(zs->dict)) dictResize(zs->dict); if (dictSize(zs->dict) == 0) deleteKey(c->db,c->argv[1]); server.dirty += deleted; - addReplyLong(c,deleted); + addReplyLongLong(c,deleted); } static void zremrangebyrankCommand(redisClient *c) { @@ -5611,7 +5906,7 @@ static void zremrangebyrankCommand(redisClient *c) { if (htNeedsResize(zs->dict)) dictResize(zs->dict); if (dictSize(zs->dict) == 0) deleteKey(c->db,c->argv[1]); server.dirty += deleted; - addReplyLong(c, deleted); + addReplyLongLong(c, deleted); } typedef struct { @@ -5630,6 +5925,7 @@ static int qsortCompareZsetopsrcByCardinality(const void *s1, const void *s2) { #define REDIS_AGGR_SUM 1 #define REDIS_AGGR_MIN 2 #define REDIS_AGGR_MAX 3 +#define zunionInterDictValue(_e) (dictGetEntryVal(_e) == NULL ? 
1.0 : *(double*)dictGetEntryVal(_e)) inline static void zunionInterAggregate(double *target, double val, int aggregate) { if (aggregate == REDIS_AGGR_SUM) { @@ -5645,7 +5941,7 @@ inline static void zunionInterAggregate(double *target, double val, int aggregat } static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { - int i, j, zsetnum; + int i, j, setnum; int aggregate = REDIS_AGGR_SUM; zsetopsrc *src; robj *dstobj; @@ -5653,32 +5949,35 @@ static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { dictIterator *di; dictEntry *de; - /* expect zsetnum input keys to be given */ - zsetnum = atoi(c->argv[2]->ptr); - if (zsetnum < 1) { - addReplySds(c,sdsnew("-ERR at least 1 input key is needed for ZUNION/ZINTER\r\n")); + /* expect setnum input keys to be given */ + setnum = atoi(c->argv[2]->ptr); + if (setnum < 1) { + addReplySds(c,sdsnew("-ERR at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE\r\n")); return; } /* test if the expected number of keys would overflow */ - if (3+zsetnum > c->argc) { + if (3+setnum > c->argc) { addReply(c,shared.syntaxerr); return; } /* read keys to be used for input */ - src = zmalloc(sizeof(zsetopsrc) * zsetnum); - for (i = 0, j = 3; i < zsetnum; i++, j++) { - robj *zsetobj = lookupKeyWrite(c->db,c->argv[j]); - if (!zsetobj) { + src = zmalloc(sizeof(zsetopsrc) * setnum); + for (i = 0, j = 3; i < setnum; i++, j++) { + robj *obj = lookupKeyWrite(c->db,c->argv[j]); + if (!obj) { src[i].dict = NULL; } else { - if (zsetobj->type != REDIS_ZSET) { + if (obj->type == REDIS_ZSET) { + src[i].dict = ((zset*)obj->ptr)->dict; + } else if (obj->type == REDIS_SET) { + src[i].dict = (obj->ptr); + } else { zfree(src); addReply(c,shared.wrongtypeerr); return; } - src[i].dict = ((zset*)zsetobj->ptr)->dict; } /* default all weights to 1 */ @@ -5690,9 +5989,9 @@ static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { int remaining = c->argc - j; while (remaining) { - if (remaining >= 
(zsetnum + 1) && !strcasecmp(c->argv[j]->ptr,"weights")) { + if (remaining >= (setnum + 1) && !strcasecmp(c->argv[j]->ptr,"weights")) { j++; remaining--; - for (i = 0; i < zsetnum; i++, j++, remaining--) { + for (i = 0; i < setnum; i++, j++, remaining--) { if (getDoubleFromObjectOrReply(c, c->argv[j], &src[i].weight, NULL) != REDIS_OK) return; } @@ -5720,7 +6019,7 @@ static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { /* sort sets from the smallest to largest, this will improve our * algorithm's performance */ - qsort(src,zsetnum,sizeof(zsetopsrc), qsortCompareZsetopsrcByCardinality); + qsort(src,setnum,sizeof(zsetopsrc),qsortCompareZsetopsrcByCardinality); dstobj = createZsetObject(); dstzset = dstobj->ptr; @@ -5733,12 +6032,12 @@ static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { di = dictGetIterator(src[0].dict); while((de = dictNext(di)) != NULL) { double *score = zmalloc(sizeof(double)), value; - *score = src[0].weight * (*(double*)dictGetEntryVal(de)); + *score = src[0].weight * zunionInterDictValue(de); - for (j = 1; j < zsetnum; j++) { + for (j = 1; j < setnum; j++) { dictEntry *other = dictFind(src[j].dict,dictGetEntryKey(de)); if (other) { - value = src[j].weight * (*(double*)dictGetEntryVal(other)); + value = src[j].weight * zunionInterDictValue(other); zunionInterAggregate(score, value, aggregate); } else { break; @@ -5746,7 +6045,7 @@ static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { } /* skip entry when not present in every source dict */ - if (j != zsetnum) { + if (j != setnum) { zfree(score); } else { robj *o = dictGetEntryKey(de); @@ -5759,7 +6058,7 @@ static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { dictReleaseIterator(di); } } else if (op == REDIS_OP_UNION) { - for (i = 0; i < zsetnum; i++) { + for (i = 0; i < setnum; i++) { if (!src[i].dict) continue; di = dictGetIterator(src[i].dict); @@ -5768,14 +6067,14 @@ static void 
zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { if (dictFind(dstzset->dict,dictGetEntryKey(de)) != NULL) continue; double *score = zmalloc(sizeof(double)), value; - *score = src[i].weight * (*(double*)dictGetEntryVal(de)); + *score = src[i].weight * zunionInterDictValue(de); /* because the zsets are sorted by size, its only possible * for sets at larger indices to hold this entry */ - for (j = (i+1); j < zsetnum; j++) { + for (j = (i+1); j < setnum; j++) { dictEntry *other = dictFind(src[j].dict,dictGetEntryKey(de)); if (other) { - value = src[j].weight * (*(double*)dictGetEntryVal(other)); + value = src[j].weight * zunionInterDictValue(other); zunionInterAggregate(score, value, aggregate); } } @@ -5797,7 +6096,7 @@ static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { if (dstzset->zsl->length) { dictAdd(c->db->dict,dstkey,dstobj); incrRefCount(dstkey); - addReplyLong(c, dstzset->zsl->length); + addReplyLongLong(c, dstzset->zsl->length); server.dirty++; } else { decrRefCount(dstobj); @@ -5806,11 +6105,11 @@ static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) { zfree(src); } -static void zunionCommand(redisClient *c) { +static void zunionstoreCommand(redisClient *c) { zunionInterGenericCommand(c,c->argv[1], REDIS_OP_UNION); } -static void zinterCommand(redisClient *c) { +static void zinterstoreCommand(redisClient *c) { zunionInterGenericCommand(c,c->argv[1], REDIS_OP_INTER); } @@ -5993,7 +6292,7 @@ static void genericZrangebyscoreCommand(redisClient *c, int justcount) { if (limit > 0) limit--; } if (justcount) { - addReplyLong(c,(long)rangelen); + addReplyLongLong(c,(long)rangelen); } else { lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n", withscores ? 
(rangelen*2) : rangelen); @@ -6063,9 +6362,9 @@ static void zrankGenericCommand(redisClient *c, int reverse) { rank = zslGetRank(zsl, *score, c->argv[2]); if (rank) { if (reverse) { - addReplyLong(c, zsl->length - rank); + addReplyLongLong(c, zsl->length - rank); } else { - addReplyLong(c, rank-1); + addReplyLongLong(c, rank-1); } } else { addReply(c,shared.nullbulk); @@ -6337,12 +6636,11 @@ static void hincrbyCommand(redisClient *c) { if (getLongLongFromObjectOrReply(c,c->argv[3],&incr,NULL) != REDIS_OK) return; if ((o = hashLookupWriteOrCreate(c,c->argv[1])) == NULL) return; if ((current = hashGet(o,c->argv[2])) != NULL) { - if (current->encoding == REDIS_ENCODING_RAW) - value = strtoll(current->ptr,NULL,10); - else if (current->encoding == REDIS_ENCODING_INT) - value = (long)current->ptr; - else - redisAssert(1 != 1); + if (getLongLongFromObjectOrReply(c,current,&value, + "hash value is not an integer") != REDIS_OK) { + decrRefCount(current); + return; + } decrRefCount(current); } else { value = 0; @@ -6491,12 +6789,14 @@ static void convertToRealHash(robj *o) { static void flushdbCommand(redisClient *c) { server.dirty += dictSize(c->db->dict); + touchWatchedKeysOnFlush(c->db->id); dictEmpty(c->db->dict); dictEmpty(c->db->expires); addReply(c,shared.ok); } static void flushallCommand(redisClient *c) { + touchWatchedKeysOnFlush(-1); server.dirty += emptyDb(); addReply(c,shared.ok); if (server.bgsavechildpid != -1) { @@ -6626,9 +6926,8 @@ static int sortCompare(const void *s1, const void *s2) { cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr); } } else { - /* Compare elements directly. Note that these objects already - * need to be non-encoded (see sortCommand). */ - cmp = strcoll(so1->obj->ptr,so2->obj->ptr); + /* Compare elements directly. */ + cmp = compareStringObjects(so1->obj,so2->obj); } } return server.sort_desc ? 
-cmp : cmp; @@ -6766,7 +7065,7 @@ static void sortCommand(redisClient *c) { } if (alpha) { - vector[j].u.cmpobj = getDecodedObject(byval); + if (sortby) vector[j].u.cmpobj = getDecodedObject(byval); } else { if (byval->encoding == REDIS_ENCODING_RAW) { vector[j].u.score = strtod(byval->ptr,NULL); @@ -6924,6 +7223,8 @@ static sds genRedisInfoString(void) { bytesToHuman(hmem,zmalloc_used_memory()); info = sdscatprintf(sdsempty(), "redis_version:%s\r\n" + "redis_git_sha1:%s\r\n" + "redis_git_dirty:%d\r\n" "arch_bits:%s\r\n" "multiplexing_api:%s\r\n" "process_id:%ld\r\n" @@ -6941,13 +7242,15 @@ static sds genRedisInfoString(void) { "total_connections_received:%lld\r\n" "total_commands_processed:%lld\r\n" "expired_keys:%lld\r\n" - "hash_max_zipmap_entries:%ld\r\n" - "hash_max_zipmap_value:%ld\r\n" + "hash_max_zipmap_entries:%zu\r\n" + "hash_max_zipmap_value:%zu\r\n" "pubsub_channels:%ld\r\n" "pubsub_patterns:%u\r\n" "vm_enabled:%d\r\n" "role:%s\r\n" ,REDIS_VERSION, + REDIS_GIT_SHA1, + strtol(REDIS_GIT_DIRTY,NULL,10) > 0, (sizeof(long) == 8) ? "64" : "32", aeGetApiName(), (long) getpid(), @@ -7198,6 +7501,10 @@ static void queueMultiCommand(redisClient *c, struct redisCommand *cmd) { } static void multiCommand(redisClient *c) { + if (c->flags & REDIS_MULTI) { + addReplySds(c,sdsnew("-ERR MULTI calls can not be nested\r\n")); + return; + } c->flags |= REDIS_MULTI; addReply(c,shared.ok); } @@ -7214,6 +7521,20 @@ static void discardCommand(redisClient *c) { addReply(c,shared.ok); } +/* Send a MULTI command to all the slaves and AOF file. Check the execCommand + * implememntation for more information. 
*/ +static void execCommandReplicateMulti(redisClient *c) { + struct redisCommand *cmd; + robj *multistring = createStringObject("MULTI",5); + + cmd = lookupCommand("multi"); + if (server.appendonly) + feedAppendOnlyFile(cmd,c->db->id,&multistring,1); + if (listLength(server.slaves)) + replicationFeedSlaves(server.slaves,c->db->id,&multistring,1); + decrRefCount(multistring); +} + static void execCommand(redisClient *c) { int j; robj **orig_argv; @@ -7224,6 +7545,25 @@ static void execCommand(redisClient *c) { return; } + /* Check if we need to abort the EXEC if some WATCHed key was touched. + * A failed EXEC will return a multi bulk nil object. */ + if (c->flags & REDIS_DIRTY_CAS) { + freeClientMultiState(c); + initClientMultiState(c); + c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS); + unwatchAllKeys(c); + addReply(c,shared.nullmultibulk); + return; + } + + /* Replicate a MULTI request now that we are sure the block is executed. + * This way we'll deliver the MULTI/..../EXEC block as a whole and + * both the AOF and the replication link will have the same consistency + * and atomicity guarantees. */ + execCommandReplicateMulti(c); + + /* Exec all the queued commands */ + unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */ orig_argv = c->argv; orig_argc = c->argc; addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->mstate.count)); @@ -7236,7 +7576,11 @@ static void execCommand(redisClient *c) { c->argc = orig_argc; freeClientMultiState(c); initClientMultiState(c); - c->flags &= (~REDIS_MULTI); + c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS); + /* Make sure the EXEC command is always replicated / AOF, since we + * always send the MULTI command (we can't know beforehand if the + * next operations will contain at least a modification to the DB). */ + server.dirty++; } /* =========================== Blocking Operations ========================= */ @@ -7259,7 +7603,7 @@ static void execCommand(redisClient *c) { * empty we need to block. 
In order to do so we remove the notification for * new data to read in the client socket (so that we'll not serve new * requests if the blocking request is not served). Also we put the client - * in a dictionary (db->blockingkeys) mapping keys to a list of clients + * in a dictionary (db->blocking_keys) mapping keys to a list of clients * blocking for this keys. * - If a PUSH operation against a key with blocked clients waiting is * performed, we serve the first in the list: basically instead to push @@ -7277,22 +7621,22 @@ static void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeou list *l; int j; - c->blockingkeys = zmalloc(sizeof(robj*)*numkeys); - c->blockingkeysnum = numkeys; + c->blocking_keys = zmalloc(sizeof(robj*)*numkeys); + c->blocking_keys_num = numkeys; c->blockingto = timeout; for (j = 0; j < numkeys; j++) { /* Add the key in the client structure, to map clients -> keys */ - c->blockingkeys[j] = keys[j]; + c->blocking_keys[j] = keys[j]; incrRefCount(keys[j]); /* And in the other "side", to map keys -> clients */ - de = dictFind(c->db->blockingkeys,keys[j]); + de = dictFind(c->db->blocking_keys,keys[j]); if (de == NULL) { int retval; /* For every key we take a list of clients blocked for it */ l = listCreate(); - retval = dictAdd(c->db->blockingkeys,keys[j],l); + retval = dictAdd(c->db->blocking_keys,keys[j],l); incrRefCount(keys[j]); assert(retval == DICT_OK); } else { @@ -7311,22 +7655,22 @@ static void unblockClientWaitingData(redisClient *c) { list *l; int j; - assert(c->blockingkeys != NULL); + assert(c->blocking_keys != NULL); /* The client may wait for multiple keys, so unblock it for every key. */ - for (j = 0; j < c->blockingkeysnum; j++) { + for (j = 0; j < c->blocking_keys_num; j++) { /* Remove this client from the list of clients waiting for this key. 
*/ - de = dictFind(c->db->blockingkeys,c->blockingkeys[j]); + de = dictFind(c->db->blocking_keys,c->blocking_keys[j]); assert(de != NULL); l = dictGetEntryVal(de); listDelNode(l,listSearchKey(l,c)); /* If the list is empty we need to remove it to avoid wasting memory */ if (listLength(l) == 0) - dictDelete(c->db->blockingkeys,c->blockingkeys[j]); - decrRefCount(c->blockingkeys[j]); + dictDelete(c->db->blocking_keys,c->blocking_keys[j]); + decrRefCount(c->blocking_keys[j]); } /* Cleanup the client structure */ - zfree(c->blockingkeys); - c->blockingkeys = NULL; + zfree(c->blocking_keys); + c->blocking_keys = NULL; c->flags &= (~REDIS_BLOCKED); server.blpop_blocked_clients--; /* We want to process data if there is some command waiting @@ -7353,7 +7697,7 @@ static int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { list *l; listNode *ln; - de = dictFind(c->db->blockingkeys,key); + de = dictFind(c->db->blocking_keys,key); if (de == NULL) return 0; l = dictGetEntryVal(de); ln = listFirst(l); @@ -7859,65 +8203,28 @@ static void freeMemoryIfNeeded(void) { /* ============================== Append Only file ========================== */ -static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) { - sds buf = sdsempty(); - int j; - ssize_t nwritten; +/* Write the append only file buffer on disk. + * + * Since we are required to write the AOF before replying to the client, + * and the only way the client socket can get a write is entering when the + * the event loop, we accumulate all the AOF writes in a memory + * buffer and write it on disk using this function just before entering + * the event loop again. */ +static void flushAppendOnlyFile(void) { time_t now; - robj *tmpargv[3]; - - /* The DB this command was targetting is not the same as the last command - * we appendend. To issue a SELECT command is needed. 
*/ - if (dictid != server.appendseldb) { - char seldb[64]; - - snprintf(seldb,sizeof(seldb),"%d",dictid); - buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n", - (unsigned long)strlen(seldb),seldb); - server.appendseldb = dictid; - } - - /* "Fix" the argv vector if the command is EXPIRE. We want to translate - * EXPIREs into EXPIREATs calls */ - if (cmd->proc == expireCommand) { - long when; - - tmpargv[0] = createStringObject("EXPIREAT",8); - tmpargv[1] = argv[1]; - incrRefCount(argv[1]); - when = time(NULL)+strtol(argv[2]->ptr,NULL,10); - tmpargv[2] = createObject(REDIS_STRING, - sdscatprintf(sdsempty(),"%ld",when)); - argv = tmpargv; - } - - /* Append the actual command */ - buf = sdscatprintf(buf,"*%d\r\n",argc); - for (j = 0; j < argc; j++) { - robj *o = argv[j]; - - o = getDecodedObject(o); - buf = sdscatprintf(buf,"$%lu\r\n",(unsigned long)sdslen(o->ptr)); - buf = sdscatlen(buf,o->ptr,sdslen(o->ptr)); - buf = sdscatlen(buf,"\r\n",2); - decrRefCount(o); - } + ssize_t nwritten; - /* Free the objects from the modified argv for EXPIREAT */ - if (cmd->proc == expireCommand) { - for (j = 0; j < 3; j++) - decrRefCount(argv[j]); - } + if (sdslen(server.aofbuf) == 0) return; /* We want to perform a single write. This should be guaranteed atomic * at least if the filesystem we are writing is a real physical one. * While this will save us against the server being killed I don't think * there is much to do about the whole server stopping for power problems * or alike */ - nwritten = write(server.appendfd,buf,sdslen(buf)); - if (nwritten != (signed)sdslen(buf)) { + nwritten = write(server.appendfd,server.aofbuf,sdslen(server.aofbuf)); + if (nwritten != (signed)sdslen(server.aofbuf)) { /* Ooops, we are in troubles. The best thing to do for now is - * to simply exit instead to give the illusion that everything is + * aborting instead of giving the illusion that everything is * working as expected. 
*/ if (nwritten == -1) { redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno)); @@ -7926,24 +8233,100 @@ static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv } exit(1); } - /* If a background append only file rewriting is in progress we want to - * accumulate the differences between the child DB and the current one - * in a buffer, so that when the child process will do its work we - * can append the differences to the new append only file. */ - if (server.bgrewritechildpid != -1) - server.bgrewritebuf = sdscatlen(server.bgrewritebuf,buf,sdslen(buf)); + sdsfree(server.aofbuf); + server.aofbuf = sdsempty(); - sdsfree(buf); + /* Fsync if needed */ now = time(NULL); if (server.appendfsync == APPENDFSYNC_ALWAYS || (server.appendfsync == APPENDFSYNC_EVERYSEC && now-server.lastfsync > 1)) { - fsync(server.appendfd); /* Let's try to get this data on the disk */ + /* aof_fsync is defined as fdatasync() for Linux in order to avoid + * flushing metadata. 
*/ + aof_fsync(server.appendfd); /* Let's try to get this data on the disk */ server.lastfsync = now; } } +static sds catAppendOnlyGenericCommand(sds buf, int argc, robj **argv) { + int j; + buf = sdscatprintf(buf,"*%d\r\n",argc); + for (j = 0; j < argc; j++) { + robj *o = getDecodedObject(argv[j]); + buf = sdscatprintf(buf,"$%lu\r\n",(unsigned long)sdslen(o->ptr)); + buf = sdscatlen(buf,o->ptr,sdslen(o->ptr)); + buf = sdscatlen(buf,"\r\n",2); + decrRefCount(o); + } + return buf; +} + +static sds catAppendOnlyExpireAtCommand(sds buf, robj *key, robj *seconds) { + int argc = 3; + long when; + robj *argv[3]; + + /* Make sure we can use strtol */ + seconds = getDecodedObject(seconds); + when = time(NULL)+strtol(seconds->ptr,NULL,10); + decrRefCount(seconds); + + argv[0] = createStringObject("EXPIREAT",8); + argv[1] = key; + argv[2] = createObject(REDIS_STRING, + sdscatprintf(sdsempty(),"%ld",when)); + buf = catAppendOnlyGenericCommand(buf, argc, argv); + decrRefCount(argv[0]); + decrRefCount(argv[2]); + return buf; +} + +static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) { + sds buf = sdsempty(); + robj *tmpargv[3]; + + /* The DB this command was targetting is not the same as the last command + * we appendend. To issue a SELECT command is needed. 
*/ + if (dictid != server.appendseldb) { + char seldb[64]; + + snprintf(seldb,sizeof(seldb),"%d",dictid); + buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n", + (unsigned long)strlen(seldb),seldb); + server.appendseldb = dictid; + } + + if (cmd->proc == expireCommand) { + /* Translate EXPIRE into EXPIREAT */ + buf = catAppendOnlyExpireAtCommand(buf,argv[1],argv[2]); + } else if (cmd->proc == setexCommand) { + /* Translate SETEX to SET and EXPIREAT */ + tmpargv[0] = createStringObject("SET",3); + tmpargv[1] = argv[1]; + tmpargv[2] = argv[3]; + buf = catAppendOnlyGenericCommand(buf,3,tmpargv); + decrRefCount(tmpargv[0]); + buf = catAppendOnlyExpireAtCommand(buf,argv[1],argv[2]); + } else { + buf = catAppendOnlyGenericCommand(buf,argc,argv); + } + + /* Append to the AOF buffer. This will be flushed on disk just before + * of re-entering the event loop, so before the client will get a + * positive reply about the operation performed. */ + server.aofbuf = sdscatlen(server.aofbuf,buf,sdslen(buf)); + + /* If a background append only file rewriting is in progress we want to + * accumulate the differences between the child DB and the current one + * in a buffer, so that when the child process will do its work we + * can append the differences to the new append only file. */ + if (server.bgrewritechildpid != -1) + server.bgrewritebuf = sdscatlen(server.bgrewritebuf,buf,sdslen(buf)); + + sdsfree(buf); +} + /* In Redis commands are always executed in the context of a client, so in * order to load the append only file we need to create a fake client. 
*/ static struct redisClient *createFakeClient(void) { @@ -7961,12 +8344,14 @@ static struct redisClient *createFakeClient(void) { c->reply = listCreate(); listSetFreeMethod(c->reply,decrRefCount); listSetDupMethod(c->reply,dupClientReplyValue); + initClientMultiState(c); return c; } static void freeFakeClient(struct redisClient *c) { sdsfree(c->querybuf); listRelease(c->reply); + freeClientMultiState(c); zfree(c); } @@ -7978,6 +8363,7 @@ int loadAppendOnlyFile(char *filename) { FILE *fp = fopen(filename,"r"); struct redis_stat sb; unsigned long long loadedkeys = 0; + int appendonly = server.appendonly; if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) return REDIS_ERR; @@ -7987,6 +8373,10 @@ int loadAppendOnlyFile(char *filename) { exit(1); } + /* Temporarily disable AOF, to prevent EXEC from feeding a MULTI + * to the same file we're about to read. */ + server.appendonly = 0; + fakeClient = createFakeClient(); while(1) { int argc, j; @@ -8042,8 +8432,14 @@ int loadAppendOnlyFile(char *filename) { } } } + + /* This point can only be reached when EOF is reached without errors. + * If the client is in the middle of a MULTI/EXEC, log error and quit. */ + if (fakeClient->flags & REDIS_MULTI) goto readerr; + fclose(fp); freeFakeClient(fakeClient); + server.appendonly = appendonly; return REDIS_OK; readerr: @@ -8391,44 +8787,82 @@ static void aofRemoveTempFile(pid_t childpid) { * as a fully non-blocking VM. */ -/* =================== Virtual Memory - Blocking Side ====================== */ +/* Called when the user switches from "appendonly yes" to "appendonly no" + * at runtime using the CONFIG command. */ +static void stopAppendOnly(void) { + flushAppendOnlyFile(); + fsync(server.appendfd); + close(server.appendfd); -/* substitute the first occurrence of '%p' with the process pid in the - * swap file name. 
*/ -static void expandVmSwapFilename(void) { - char *p = strstr(server.vm_swap_file,"%p"); - sds new; + server.appendfd = -1; + server.appendseldb = -1; + server.appendonly = 0; + /* AOF rewrite in progress? Kill the rewrite child (not the RDB save child) and wait for it to exit */ + if (server.bgrewritechildpid != -1) { + int statloc; - if (!p) return; - new = sdsempty(); - *p = '\0'; - new = sdscat(new,server.vm_swap_file); - new = sdscatprintf(new,"%ld",(long) getpid()); - new = sdscat(new,p+2); - zfree(server.vm_swap_file); - server.vm_swap_file = new; + if (kill(server.bgrewritechildpid,SIGKILL) != -1) + wait3(&statloc,0,NULL); + /* reset the buffer accumulating changes while the child rewrites the AOF */ + sdsfree(server.bgrewritebuf); + server.bgrewritebuf = sdsempty(); + server.bgrewritechildpid = -1; + } } +/* Called when the user switches from "appendonly no" to "appendonly yes" + * at runtime using the CONFIG command. Returns REDIS_OK or REDIS_ERR; server.appendonly is set only once the AOF file is open, and cleared again if the background rewrite can't start. */ +static int startAppendOnly(void) { + server.lastfsync = time(NULL); + server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644); + if (server.appendfd == -1) { + redisLog(REDIS_WARNING,"User tried to switch on AOF via CONFIG, but I can't open the AOF file: %s",strerror(errno)); + return REDIS_ERR; + } + server.appendonly = 1; + if (rewriteAppendOnlyFileBackground() == REDIS_ERR) { + server.appendonly = 0; + close(server.appendfd); + redisLog(REDIS_WARNING,"User tried to switch on AOF via CONFIG, I can't trigger a background AOF rewrite operation. 
Check the above logs for more info about the error."); + return REDIS_ERR; + } + return REDIS_OK; +} + +/* =================== Virtual Memory - Blocking Side ====================== */ + static void vmInit(void) { off_t totsize; int pipefds[2]; size_t stacksize; + struct flock fl; if (server.vm_max_threads != 0) zmalloc_enable_thread_safeness(); /* we need thread safe zmalloc() */ - expandVmSwapFilename(); redisLog(REDIS_NOTICE,"Using '%s' as swap file",server.vm_swap_file); + /* Try to open the old swap file, otherwise create it */ if ((server.vm_fp = fopen(server.vm_swap_file,"r+b")) == NULL) { server.vm_fp = fopen(server.vm_swap_file,"w+b"); } if (server.vm_fp == NULL) { redisLog(REDIS_WARNING, - "Impossible to open the swap file: %s. Exiting.", + "Can't open the swap file: %s. Exiting.", strerror(errno)); exit(1); } server.vm_fd = fileno(server.vm_fp); + /* Lock the swap file for writing, this is useful in order to avoid + * another instance to use the same swap file for a config error. */ + fl.l_type = F_WRLCK; + fl.l_whence = SEEK_SET; + fl.l_start = fl.l_len = 0; + if (fcntl(server.vm_fd,F_SETLK,&fl) == -1) { + redisLog(REDIS_WARNING, + "Can't lock the swap file at '%s': %s. Make sure it is not used by another Redis instance.", server.vm_swap_file, strerror(errno)); + exit(1); + } + /* Initialize */ server.vm_next_page = 0; server.vm_near_pages = 0; server.vm_stats_used_pages = 0; @@ -9354,12 +9788,56 @@ static int waitForSwappedKey(redisClient *c, robj *key) { return 1; } -/* Preload keys needed for the ZUNION and ZINTER commands. */ -static void zunionInterBlockClientOnSwappedKeys(redisClient *c) { +/* Preload keys for any command with first, last and step values for + * the command keys prototype, as defined in the command table. 
*/ +static void waitForMultipleSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv) { + int j, last; + if (cmd->vm_firstkey == 0) return; + last = cmd->vm_lastkey; + if (last < 0) last = argc+last; + for (j = cmd->vm_firstkey; j <= last; j += cmd->vm_keystep) { + redisAssert(j < argc); + waitForSwappedKey(c,argv[j]); + } +} + +/* Preload keys needed for the ZUNIONSTORE and ZINTERSTORE commands. + * Note that the number of keys to preload is user-defined, so we need to + * apply a sanity check against argc. */ +static void zunionInterBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv) { int i, num; - num = atoi(c->argv[2]->ptr); + REDIS_NOTUSED(cmd); + + num = atoi(argv[2]->ptr); + if (num > (argc-3)) return; for (i = 0; i < num; i++) { - waitForSwappedKey(c,c->argv[3+i]); + waitForSwappedKey(c,argv[3+i]); + } +} + +/* Preload keys needed to execute the entire MULTI/EXEC block. + * + * This function is called by blockClientOnSwappedKeys when EXEC is issued, + * and will block the client when any command requires a swapped out value. */ +static void execBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv) { + int i, margc; + struct redisCommand *mcmd; + robj **margv; + REDIS_NOTUSED(cmd); + REDIS_NOTUSED(argc); + REDIS_NOTUSED(argv); + + if (!(c->flags & REDIS_MULTI)) return; + for (i = 0; i < c->mstate.count; i++) { + mcmd = c->mstate.commands[i].cmd; + margc = c->mstate.commands[i].argc; + margv = c->mstate.commands[i].argv; + + if (mcmd->vm_preload_proc != NULL) { + mcmd->vm_preload_proc(c,mcmd,margc,margv); + } else { + waitForMultipleSwappedKeys(c,mcmd,margc,margv); + } } } @@ -9373,17 +9851,11 @@ static void zunionInterBlockClientOnSwappedKeys(redisClient *c) { * * Return 1 if the client is marked as blocked, 0 if the client can * continue as the keys it is going to access appear to be in memory. 
*/ -static int blockClientOnSwappedKeys(struct redisCommand *cmd, redisClient *c) { - int j, last; - +static int blockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd) { if (cmd->vm_preload_proc != NULL) { - cmd->vm_preload_proc(c); + cmd->vm_preload_proc(c,cmd,c->argc,c->argv); } else { - if (cmd->vm_firstkey == 0) return 0; - last = cmd->vm_lastkey; - if (last < 0) last = c->argc+last; - for (j = cmd->vm_firstkey; j <= last; j += cmd->vm_keystep) - waitForSwappedKey(c,c->argv[j]); + waitForMultipleSwappedKeys(c,cmd,c->argc,c->argv); } /* If the client was blocked for at least one key, mark it as blocked. */ @@ -9410,7 +9882,7 @@ static int dontWaitForSwappedKey(redisClient *c, robj *key) { /* Remove the key from the list of keys this client is waiting for. */ listRewind(c->io_keys,&li); while ((ln = listNext(&li)) != NULL) { - if (compareStringObjects(ln->value,key) == 0) { + if (equalStringObjects(ln->value,key)) { listDelNode(c->io_keys,ln); break; } @@ -9459,6 +9931,8 @@ static void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key) { static void configSetCommand(redisClient *c) { robj *o = getDecodedObject(c->argv[3]); + long long ll; + if (!strcasecmp(c->argv[2]->ptr,"dbfilename")) { zfree(server.dbfilename); server.dbfilename = zstrdup(o->ptr); @@ -9469,7 +9943,74 @@ static void configSetCommand(redisClient *c) { zfree(server.masterauth); server.masterauth = zstrdup(o->ptr); } else if (!strcasecmp(c->argv[2]->ptr,"maxmemory")) { - server.maxmemory = strtoll(o->ptr, NULL, 10); + if (getLongLongFromObject(o,&ll) == REDIS_ERR || + ll < 0) goto badfmt; + server.maxmemory = ll; + } else if (!strcasecmp(c->argv[2]->ptr,"timeout")) { + if (getLongLongFromObject(o,&ll) == REDIS_ERR || + ll < 0 || ll > LONG_MAX) goto badfmt; + server.maxidletime = ll; + } else if (!strcasecmp(c->argv[2]->ptr,"appendfsync")) { + if (!strcasecmp(o->ptr,"no")) { + server.appendfsync = APPENDFSYNC_NO; + } else if (!strcasecmp(o->ptr,"everysec")) { + server.appendfsync 
= APPENDFSYNC_EVERYSEC; + } else if (!strcasecmp(o->ptr,"always")) { + server.appendfsync = APPENDFSYNC_ALWAYS; + } else { + goto badfmt; + } + } else if (!strcasecmp(c->argv[2]->ptr,"appendonly")) { + int old = server.appendonly; + int new = yesnotoi(o->ptr); + + if (new == -1) goto badfmt; + if (old != new) { + if (new == 0) { + stopAppendOnly(); + } else { + if (startAppendOnly() == REDIS_ERR) { + addReplySds(c,sdscatprintf(sdsempty(), + "-ERR Unable to turn on AOF. Check server logs.\r\n")); + decrRefCount(o); + return; + } + } + } + } else if (!strcasecmp(c->argv[2]->ptr,"save")) { + int vlen, j; + sds *v = sdssplitlen(o->ptr,sdslen(o->ptr)," ",1,&vlen); + + /* Perform sanity check before setting the new config: + * - Even number of args + * - Seconds >= 1, changes >= 0 */ + if (vlen & 1) { + sdsfreesplitres(v,vlen); + goto badfmt; + } + for (j = 0; j < vlen; j++) { + char *eptr; + long val; + + val = strtoll(v[j], &eptr, 10); + if (eptr[0] != '\0' || + ((j & 1) == 0 && val < 1) || + ((j & 1) == 1 && val < 0)) { + sdsfreesplitres(v,vlen); + goto badfmt; + } + } + /* Finally set the new config */ + resetServerSaveParams(); + for (j = 0; j < vlen; j += 2) { + time_t seconds; + int changes; + + seconds = strtoll(v[j],NULL,10); + changes = strtoll(v[j+1],NULL,10); + appendServerSaveParams(seconds, changes); + } + sdsfreesplitres(v,vlen); } else { addReplySds(c,sdscatprintf(sdsempty(), "-ERR not supported CONFIG parameter %s\r\n", @@ -9479,6 +10020,14 @@ static void configSetCommand(redisClient *c) { } decrRefCount(o); addReply(c,shared.ok); + return; + +badfmt: /* Bad format errors */ + addReplySds(c,sdscatprintf(sdsempty(), + "-ERR invalid argument '%s' for CONFIG SET '%s'\r\n", + (char*)o->ptr, + (char*)c->argv[2]->ptr)); + decrRefCount(o); } static void configGetCommand(redisClient *c) { @@ -9508,11 +10057,53 @@ static void configGetCommand(redisClient *c) { if (stringmatch(pattern,"maxmemory",0)) { char buf[128]; - snprintf(buf,128,"%llu\n",server.maxmemory); 
+ ll2string(buf,128,server.maxmemory); addReplyBulkCString(c,"maxmemory"); addReplyBulkCString(c,buf); matches++; } + if (stringmatch(pattern,"timeout",0)) { + char buf[128]; + + ll2string(buf,128,server.maxidletime); + addReplyBulkCString(c,"timeout"); + addReplyBulkCString(c,buf); + matches++; + } + if (stringmatch(pattern,"appendonly",0)) { + addReplyBulkCString(c,"appendonly"); + addReplyBulkCString(c,server.appendonly ? "yes" : "no"); + matches++; + } + if (stringmatch(pattern,"appendfsync",0)) { + char *policy; + + switch(server.appendfsync) { + case APPENDFSYNC_NO: policy = "no"; break; + case APPENDFSYNC_EVERYSEC: policy = "everysec"; break; + case APPENDFSYNC_ALWAYS: policy = "always"; break; + default: policy = "unknown"; break; /* too harmless to panic */ + } + addReplyBulkCString(c,"appendfsync"); + addReplyBulkCString(c,policy); + matches++; + } + if (stringmatch(pattern,"save",0)) { + sds buf = sdsempty(); + int j; + + for (j = 0; j < server.saveparamslen; j++) { + buf = sdscatprintf(buf,"%ld %d", + server.saveparams[j].seconds, + server.saveparams[j].changes); + if (j != server.saveparamslen-1) + buf = sdscatlen(buf," ",1); + } + addReplyBulkCString(c,"save"); + addReplyBulkCString(c,buf); + sdsfree(buf); + matches++; + } decrRefCount(o); lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",matches*2); } @@ -9556,7 +10147,7 @@ static int listMatchPubsubPattern(void *a, void *b) { pubsubPattern *pa = a, *pb = b; return (pa->client == pb->client) && - (compareStringObjects(pa->pattern,pb->pattern) == 0); + (equalStringObjects(pa->pattern,pb->pattern)); } /* Subscribe a client to a channel. 
Returns 1 if the operation succeeded, or @@ -9585,7 +10176,7 @@ static int pubsubSubscribeChannel(redisClient *c, robj *channel) { addReply(c,shared.mbulk3); addReply(c,shared.subscribebulk); addReplyBulk(c,channel); - addReplyLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns)); + addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns)); return retval; } @@ -9621,7 +10212,7 @@ static int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) { addReply(c,shared.mbulk3); addReply(c,shared.unsubscribebulk); addReplyBulk(c,channel); - addReplyLong(c,dictSize(c->pubsub_channels)+ + addReplyLongLong(c,dictSize(c->pubsub_channels)+ listLength(c->pubsub_patterns)); } @@ -9647,7 +10238,7 @@ static int pubsubSubscribePattern(redisClient *c, robj *pattern) { addReply(c,shared.mbulk3); addReply(c,shared.psubscribebulk); addReplyBulk(c,pattern); - addReplyLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns)); + addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns)); return retval; } @@ -9672,7 +10263,7 @@ static int pubsubUnsubscribePattern(redisClient *c, robj *pattern, int notify) { addReply(c,shared.mbulk3); addReply(c,shared.punsubscribebulk); addReplyBulk(c,pattern); - addReplyLong(c,dictSize(c->pubsub_channels)+ + addReplyLongLong(c,dictSize(c->pubsub_channels)+ listLength(c->pubsub_patterns)); } decrRefCount(pattern); @@ -9747,8 +10338,9 @@ static int pubsubPublishMessage(robj *channel, robj *message) { sdslen(pat->pattern->ptr), (char*)channel->ptr, sdslen(channel->ptr),0)) { - addReply(pat->client,shared.mbulk3); - addReply(pat->client,shared.messagebulk); + addReply(pat->client,shared.mbulk4); + addReply(pat->client,shared.pmessagebulk); + addReplyBulk(pat->client,pat->pattern); addReplyBulk(pat->client,channel); addReplyBulk(pat->client,message); receivers++; @@ -9799,11 +10391,330 @@ static void punsubscribeCommand(redisClient *c) { static void publishCommand(redisClient *c) 
{ int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]); - addReplyLong(c,receivers); + addReplyLongLong(c,receivers); +} + +/* ===================== WATCH (CAS alike for MULTI/EXEC) =================== + * + * The implementation uses a per-DB hash table mapping keys to list of clients + * WATCHing those keys, so that given a key that is going to be modified + * we can mark all the associated clients as dirty. + * + * Also every client contains a list of WATCHed keys so that's possible to + * un-watch such keys when the client is freed or when UNWATCH is called. */ + +/* In the client->watched_keys list we need to use watchedKey structures + * as in order to identify a key in Redis we need both the key name and the + * DB */ +typedef struct watchedKey { + robj *key; + redisDb *db; +} watchedKey; + +/* Watch for the specified key */ +static void watchForKey(redisClient *c, robj *key) { + list *clients = NULL; + listIter li; + listNode *ln; + watchedKey *wk; + + /* Check if we are already watching for this key */ + listRewind(c->watched_keys,&li); + while((ln = listNext(&li))) { + wk = listNodeValue(ln); + if (wk->db == c->db && equalStringObjects(key,wk->key)) + return; /* Key already watched */ + } + /* This key is not already watched in this DB. Let's add it */ + clients = dictFetchValue(c->db->watched_keys,key); + if (!clients) { + clients = listCreate(); + dictAdd(c->db->watched_keys,key,clients); + incrRefCount(key); + } + listAddNodeTail(clients,c); + /* Add the new key to the lits of keys watched by this client */ + wk = zmalloc(sizeof(*wk)); + wk->key = key; + wk->db = c->db; + incrRefCount(key); + listAddNodeTail(c->watched_keys,wk); +} + +/* Unwatch all the keys watched by this client. To clean the EXEC dirty + * flag is up to the caller. 
*/ +static void unwatchAllKeys(redisClient *c) { + listIter li; + listNode *ln; + + if (listLength(c->watched_keys) == 0) return; + listRewind(c->watched_keys,&li); + while((ln = listNext(&li))) { + list *clients; + watchedKey *wk; + + /* Lookup the watched key -> clients list and remove the client + * from the list */ + wk = listNodeValue(ln); + clients = dictFetchValue(wk->db->watched_keys, wk->key); + assert(clients != NULL); + listDelNode(clients,listSearchKey(clients,c)); + /* Kill the entry at all if this was the only client */ + if (listLength(clients) == 0) + dictDelete(wk->db->watched_keys, wk->key); + /* Remove this watched key from the client->watched list */ + listDelNode(c->watched_keys,ln); + decrRefCount(wk->key); + zfree(wk); + } +} + +/* "Touch" a key, so that if this key is being WATCHed by some client the + * next EXEC will fail. */ +static void touchWatchedKey(redisDb *db, robj *key) { + list *clients; + listIter li; + listNode *ln; + + if (dictSize(db->watched_keys) == 0) return; + clients = dictFetchValue(db->watched_keys, key); + if (!clients) return; + + /* Mark all the clients watching this key as REDIS_DIRTY_CAS */ + /* Check if we are already watching for this key */ + listRewind(clients,&li); + while((ln = listNext(&li))) { + redisClient *c = listNodeValue(ln); + + c->flags |= REDIS_DIRTY_CAS; + } +} + +/* On FLUSHDB or FLUSHALL all the watched keys that are present before the + * flush but will be deleted as effect of the flushing operation should + * be touched. "dbid" is the DB that's getting the flush. -1 if it is + * a FLUSHALL operation (all the DBs flushed). 
*/ +static void touchWatchedKeysOnFlush(int dbid) { + listIter li1, li2; + listNode *ln; + + /* For every client, check all the waited keys */ + listRewind(server.clients,&li1); + while((ln = listNext(&li1))) { + redisClient *c = listNodeValue(ln); + listRewind(c->watched_keys,&li2); + while((ln = listNext(&li2))) { + watchedKey *wk = listNodeValue(ln); + + /* For every watched key matching the specified DB, if the + * key exists, mark the client as dirty, as the key will be + * removed. */ + if (dbid == -1 || wk->db->id == dbid) { + if (dictFind(wk->db->dict, wk->key) != NULL) + c->flags |= REDIS_DIRTY_CAS; + } + } + } +} + +static void watchCommand(redisClient *c) { + int j; + + if (c->flags & REDIS_MULTI) { + addReplySds(c,sdsnew("-ERR WATCH inside MULTI is not allowed\r\n")); + return; + } + for (j = 1; j < c->argc; j++) + watchForKey(c,c->argv[j]); + addReply(c,shared.ok); +} + +static void unwatchCommand(redisClient *c) { + unwatchAllKeys(c); + c->flags &= (~REDIS_DIRTY_CAS); + addReply(c,shared.ok); } /* ================================= Debugging ============================== */ +/* Compute the sha1 of string at 's' with 'len' bytes long. + * The SHA1 is then xored againt the string pointed by digest. + * Since xor is commutative, this operation is used in order to + * "add" digests relative to unordered elements. 
+ * + * So digest(a,b,c,d) will be the same of digest(b,a,c,d) */ +static void xorDigest(unsigned char *digest, void *ptr, size_t len) { + SHA1_CTX ctx; + unsigned char hash[20], *s = ptr; + int j; + + SHA1Init(&ctx); + SHA1Update(&ctx,s,len); + SHA1Final(hash,&ctx); + + for (j = 0; j < 20; j++) + digest[j] ^= hash[j]; +} + +static void xorObjectDigest(unsigned char *digest, robj *o) { + o = getDecodedObject(o); + xorDigest(digest,o->ptr,sdslen(o->ptr)); + decrRefCount(o); +} + +/* This function instead of just computing the SHA1 and xoring it + * against diget, also perform the digest of "digest" itself and + * replace the old value with the new one. + * + * So the final digest will be: + * + * digest = SHA1(digest xor SHA1(data)) + * + * This function is used every time we want to preserve the order so + * that digest(a,b,c,d) will be different than digest(b,c,d,a) + * + * Also note that mixdigest("foo") followed by mixdigest("bar") + * will lead to a different digest compared to "fo", "obar". + */ +static void mixDigest(unsigned char *digest, void *ptr, size_t len) { + SHA1_CTX ctx; + char *s = ptr; + + xorDigest(digest,s,len); + SHA1Init(&ctx); + SHA1Update(&ctx,digest,20); + SHA1Final(digest,&ctx); +} + +static void mixObjectDigest(unsigned char *digest, robj *o) { + o = getDecodedObject(o); + mixDigest(digest,o->ptr,sdslen(o->ptr)); + decrRefCount(o); +} + +/* Compute the dataset digest. Since keys, sets elements, hashes elements + * are not ordered, we use a trick: every aggregate digest is the xor + * of the digests of their elements. This way the order will not change + * the result. For list instead we use a feedback entering the output digest + * as input in order to ensure that a different ordered list will result in + * a different digest. 
*/ +static void computeDatasetDigest(unsigned char *final) { + unsigned char digest[20]; + char buf[128]; + dictIterator *di = NULL; + dictEntry *de; + int j; + uint32_t aux; + + memset(final,0,20); /* Start with a clean result */ + + for (j = 0; j < server.dbnum; j++) { + redisDb *db = server.db+j; + + if (dictSize(db->dict) == 0) continue; + di = dictGetIterator(db->dict); + + /* hash the DB id, so the same dataset moved in a different + * DB will lead to a different digest */ + aux = htonl(j); + mixDigest(final,&aux,sizeof(aux)); + + /* Iterate this DB writing every entry */ + while((de = dictNext(di)) != NULL) { + robj *key, *o, *kcopy; + time_t expiretime; + + memset(digest,0,20); /* This key-val digest */ + key = dictGetEntryKey(de); + + if (!server.vm_enabled) { + mixObjectDigest(digest,key); + o = dictGetEntryVal(de); + } else { + /* Don't work with the key directly as when VM is active + * this is unsafe: TODO: fix decrRefCount to check if the + * count really reached 0 to avoid this mess */ + kcopy = dupStringObject(key); + mixObjectDigest(digest,kcopy); + o = lookupKeyRead(db,kcopy); + decrRefCount(kcopy); + } + aux = htonl(o->type); + mixDigest(digest,&aux,sizeof(aux)); + expiretime = getExpire(db,key); + + /* Save the key and associated value */ + if (o->type == REDIS_STRING) { + mixObjectDigest(digest,o); + } else if (o->type == REDIS_LIST) { + list *list = o->ptr; + listNode *ln; + listIter li; + + listRewind(list,&li); + while((ln = listNext(&li))) { + robj *eleobj = listNodeValue(ln); + + mixObjectDigest(digest,eleobj); + } + } else if (o->type == REDIS_SET) { + dict *set = o->ptr; + dictIterator *di = dictGetIterator(set); + dictEntry *de; + + while((de = dictNext(di)) != NULL) { + robj *eleobj = dictGetEntryKey(de); + + xorObjectDigest(digest,eleobj); + } + dictReleaseIterator(di); + } else if (o->type == REDIS_ZSET) { + zset *zs = o->ptr; + dictIterator *di = dictGetIterator(zs->dict); + dictEntry *de; + + while((de = dictNext(di)) != NULL) { + 
robj *eleobj = dictGetEntryKey(de); + double *score = dictGetEntryVal(de); + unsigned char eledigest[20]; + + snprintf(buf,sizeof(buf),"%.17g",*score); + memset(eledigest,0,20); + mixObjectDigest(eledigest,eleobj); + mixDigest(eledigest,buf,strlen(buf)); + xorDigest(digest,eledigest,20); + } + dictReleaseIterator(di); + } else if (o->type == REDIS_HASH) { + hashIterator *hi; + robj *obj; + + hi = hashInitIterator(o); + while (hashNext(hi) != REDIS_ERR) { + unsigned char eledigest[20]; + + memset(eledigest,0,20); + obj = hashCurrent(hi,REDIS_HASH_KEY); + mixObjectDigest(eledigest,obj); + decrRefCount(obj); + obj = hashCurrent(hi,REDIS_HASH_VALUE); + mixObjectDigest(eledigest,obj); + decrRefCount(obj); + xorDigest(digest,eledigest,20); + } + hashReleaseIterator(hi); + } else { + redisPanic("Unknown object type"); + } + /* If the key has an expire, add it to the mix */ + if (expiretime != -1) xorDigest(digest,"!!expire!!",10); + /* We can finally xor the key-val digest to the final digest */ + xorDigest(final,digest,20); + } + dictReleaseIterator(di); + } +} + static void debugCommand(redisClient *c) { if (!strcasecmp(c->argv[1]->ptr,"segfault")) { *((char*)-1) = 'x'; @@ -9892,6 +10803,36 @@ static void debugCommand(redisClient *c) { } else { addReply(c,shared.err); } + } else if (!strcasecmp(c->argv[1]->ptr,"populate") && c->argc == 3) { + long keys, j; + robj *key, *val; + char buf[128]; + + if (getLongFromObjectOrReply(c, c->argv[2], &keys, NULL) != REDIS_OK) + return; + for (j = 0; j < keys; j++) { + snprintf(buf,sizeof(buf),"key:%lu",j); + key = createStringObject(buf,strlen(buf)); + if (lookupKeyRead(c->db,key) != NULL) { + decrRefCount(key); + continue; + } + snprintf(buf,sizeof(buf),"value:%lu",j); + val = createStringObject(buf,strlen(buf)); + dictAdd(c->db->dict,key,val); + } + addReply(c,shared.ok); + } else if (!strcasecmp(c->argv[1]->ptr,"digest") && c->argc == 2) { + unsigned char digest[20]; + sds d = sdsnew("+"); + int j; + + 
computeDatasetDigest(digest); + for (j = 0; j < 20; j++) + d = sdscatprintf(d, "%02x",digest[j]); + + d = sdscatlen(d,"\r\n",2); + addReplySds(c,d); } else { addReplySds(c,sdsnew( "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT |SWAPIN |SWAPOUT |RELOAD]\r\n")); @@ -9900,7 +10841,7 @@ static void debugCommand(redisClient *c) { static void _redisAssert(char *estr, char *file, int line) { redisLog(REDIS_WARNING,"=== ASSERTION FAILED ==="); - redisLog(REDIS_WARNING,"==> %s:%d '%s' is not true\n",file,line,estr); + redisLog(REDIS_WARNING,"==> %s:%d '%s' is not true",file,line,estr); #ifdef HAVE_BACKTRACE redisLog(REDIS_WARNING,"(forcing SIGSEGV in order to print the stack trace)"); *((char*)-1) = 'x'; @@ -9935,7 +10876,7 @@ int linuxOvercommitMemoryValue(void) { void linuxOvercommitMemoryWarning(void) { if (linuxOvercommitMemoryValue() == 0) { - redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect."); + redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low memory condition. 
To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect."); } } #endif /* __linux__ */ @@ -10079,6 +11020,13 @@ static void segvHandler(int sig, siginfo_t *info, void *secret) { _exit(0); } +static void sigtermHandler(int sig) { + REDIS_NOTUSED(sig); + + redisLog(REDIS_WARNING,"SIGTERM received, scheduling shutting down..."); + server.shutdown_asap = 1; +} + static void setupSigSegvAction(void) { struct sigaction act; @@ -10092,6 +11040,10 @@ static void setupSigSegvAction(void) { sigaction (SIGFPE, &act, NULL); sigaction (SIGILL, &act, NULL); sigaction (SIGBUS, &act, NULL); + + act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND; + act.sa_handler = sigtermHandler; + sigaction (SIGTERM, &act, NULL); return; }