X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/188714211af246b96f46a2e598c66fee95c52375..739ba0d21119778438c36ea60fdb263271783468:/redis.c diff --git a/redis.c b/redis.c index 1d3958ab..46f97273 100644 --- a/redis.c +++ b/redis.c @@ -37,8 +37,6 @@ #include #include #include -#define __USE_POSIX199309 -#define __USE_UNIX98 #include #ifdef HAVE_BACKTRACE @@ -372,6 +370,7 @@ struct redisServer { pid_t bgsavechildpid; pid_t bgrewritechildpid; sds bgrewritebuf; /* buffer taken by parent during oppend only rewrite */ + sds aofbuf; /* AOF buffer, written before entering the event loop */ struct saveparam *saveparams; int saveparamslen; char *logfile; @@ -379,7 +378,6 @@ struct redisServer { char *dbfilename; char *appendfilename; char *requirepass; - int shareobjects; int rdbcompression; int activerehashing; /* Replication related */ @@ -452,6 +450,7 @@ typedef struct pubsubPattern { } pubsubPattern; typedef void redisCommandProc(redisClient *c); +typedef void redisVmPreloadProc(redisClient *c, struct redisCommand *cmd, int argc, robj **argv); struct redisCommand { char *name; redisCommandProc *proc; @@ -460,7 +459,7 @@ struct redisCommand { /* Use a function to determine which keys need to be loaded * in the background prior to executing this command. Takes precedence * over vm_firstkey and others, ignored when NULL */ - redisCommandProc *vm_preload_proc; + redisVmPreloadProc *vm_preload_proc; /* What keys should be loaded in background when calling this command? */ int vm_firstkey; /* The first argument that's a key (0 = no keys) */ int vm_lastkey; /* THe last argument that's a key */ @@ -516,8 +515,9 @@ struct sharedObjectsStruct { *outofrangeerr, *plus, *select0, *select1, *select2, *select3, *select4, *select5, *select6, *select7, *select8, *select9, - *messagebulk, *subscribebulk, *unsubscribebulk, *mbulk3, - *psubscribebulk, *punsubscribebulk, *integers[REDIS_SHARED_INTEGERS]; + *messagebulk, *pmessagebulk, *subscribebulk, *unsubscribebulk, *mbulk3, + *mbulk4, *psubscribebulk, *punsubscribebulk, + *integers[REDIS_SHARED_INTEGERS]; } shared; /* Global vars that are actally used as constants. The following double @@ -558,6 +558,8 @@ static int rdbSaveBackground(char *filename); static robj *createStringObject(char *ptr, size_t len); static robj *dupStringObject(robj *o); static void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc); +static void replicationFeedMonitors(list *monitors, int dictid, robj **argv, int argc); +static void flushAppendOnlyFile(void); static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc); static int syncWithMaster(void); static robj *tryObjectEncoding(robj *o); @@ -607,7 +609,7 @@ static robj *vmReadObjectFromSwap(off_t page, int type); static void waitEmptyIOJobsQueue(void); static void vmReopenSwapFile(void); static int vmFreePage(off_t page); -static void zunionInterBlockClientOnSwappedKeys(redisClient *c); +static void zunionInterBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv); static int blockClientOnSwappedKeys(struct redisCommand *cmd, redisClient *c); static int dontWaitForSwappedKey(redisClient *c, robj *key); static void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key); @@ -622,12 +624,14 @@ static void freePubsubPattern(void *p); static int listMatchPubsubPattern(void *a, void *b); static int compareStringObjects(robj *a, robj *b); static void usage(); +static int rewriteAppendOnlyFileBackground(void); static void authCommand(redisClient *c); static void pingCommand(redisClient *c); static void echoCommand(redisClient *c); static void setCommand(redisClient *c); static void setnxCommand(redisClient *c); +static void setexCommand(redisClient *c); static void getCommand(redisClient *c); static void delCommand(redisClient *c); static void existsCommand(redisClient *c); @@ -736,6 +740,7 @@ static struct redisCommand cmdTable[] = { {"get",getCommand,2,REDIS_CMD_INLINE,NULL,1,1,1}, {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,0,0,0}, {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,0,0,0}, + {"setex",setexCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,0,0,0}, {"append",appendCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,1,1,1}, {"substr",substrCommand,4,REDIS_CMD_INLINE,NULL,1,1,1}, {"del",delCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0}, @@ -970,6 +975,53 @@ static int stringmatch(const char *pattern, const char *string, int nocase) { return stringmatchlen(pattern,strlen(pattern),string,strlen(string),nocase); } +/* Convert a string representing an amount of memory into the number of + * bytes, so for instance memtoll("1Gi") will return 1073741824 that is + * (1024*1024*1024). + * + * On parsing error, if *err is not NULL, it's set to 1, otherwise it's + * set to 0 */ +static long long memtoll(const char *p, int *err) { + const char *u; + char buf[128]; + long mul; /* unit multiplier */ + long long val; + unsigned int digits; + + if (err) *err = 0; + /* Search the first non digit character. */ + u = p; + if (*u == '-') u++; + while(*u && isdigit(*u)) u++; + if (*u == '\0' || !strcasecmp(u,"b")) { + mul = 1; + } else if (!strcasecmp(u,"k")) { + mul = 1000; + } else if (!strcasecmp(u,"kb")) { + mul = 1024; + } else if (!strcasecmp(u,"m")) { + mul = 1000*1000; + } else if (!strcasecmp(u,"mb")) { + mul = 1024*1024; + } else if (!strcasecmp(u,"g")) { + mul = 1000L*1000*1000; + } else if (!strcasecmp(u,"gb")) { + mul = 1024L*1024*1024; + } else { + if (err) *err = 1; + mul = 1; + } + digits = u-p; + if (digits >= sizeof(buf)) { + if (err) *err = 1; + return LLONG_MAX; + } + memcpy(buf,p,digits); + buf[digits] = '\0'; + val = strtoll(buf,NULL,10); + return val*mul; +} + static void redisLog(int level, const char *fmt, ...) { va_list ap; FILE *fp; @@ -1467,6 +1519,7 @@ static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientD redisLog(REDIS_NOTICE,"Connecting to MASTER..."); if (syncWithMaster() == REDIS_OK) { redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded"); + if (server.appendonly) rewriteAppendOnlyFileBackground(); } } return 100; @@ -1478,6 +1531,7 @@ static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientD static void beforeSleep(struct aeEventLoop *eventLoop) { REDIS_NOTUSED(eventLoop); + /* Awake clients that got all the swapped keys they requested */ if (server.vm_enabled && listLength(server.io_ready_clients)) { listIter li; listNode *ln; @@ -1502,6 +1556,8 @@ static void beforeSleep(struct aeEventLoop *eventLoop) { processInputBuffer(c); } } + /* Write the AOF buffer on disk */ + flushAppendOnlyFile(); } static void createSharedObjects(void) { @@ -1542,11 +1598,13 @@ static void createSharedObjects(void) { shared.select8 = createStringObject("select 8\r\n",10); shared.select9 = createStringObject("select 9\r\n",10); shared.messagebulk = createStringObject("$7\r\nmessage\r\n",13); + shared.pmessagebulk = createStringObject("$8\r\npmessage\r\n",14); shared.subscribebulk = createStringObject("$9\r\nsubscribe\r\n",15); shared.unsubscribebulk = createStringObject("$11\r\nunsubscribe\r\n",18); shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n",17); shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n",19); shared.mbulk3 = createStringObject("*3\r\n",4); + shared.mbulk4 = createStringObject("*4\r\n",4); for (j = 0; j < REDIS_SHARED_INTEGERS; j++) { shared.integers[j] = createObject(REDIS_STRING,(void*)(long)j); shared.integers[j]->encoding = REDIS_ENCODING_INT; @@ -1585,7 +1643,6 @@ static void initServerConfig() { server.dbfilename = zstrdup("dump.rdb"); server.appendfilename = zstrdup("appendonly.aof"); server.requirepass = NULL; - server.shareobjects = 0; server.rdbcompression = 1; server.activerehashing = 1; server.maxclients = 0; @@ -1661,6 +1718,7 @@ static void initServer() { server.bgsavechildpid = -1; server.bgrewritechildpid = -1; server.bgrewritebuf = sdsempty(); + server.aofbuf = sdsempty(); server.lastsave = time(NULL); server.dirty = 0; server.stat_numcommands = 0; @@ -1802,7 +1860,7 @@ static void loadServerConfig(char *filename) { } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) { server.maxclients = atoi(argv[1]); } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) { - server.maxmemory = strtoll(argv[1], NULL, 10); + server.maxmemory = memtoll(argv[1],NULL); } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) { server.masterhost = sdsnew(argv[1]); server.masterport = atoi(argv[2]); @@ -1813,10 +1871,6 @@ static void loadServerConfig(char *filename) { if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; } - } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) { - if ((server.shareobjects = yesnotoi(argv[1])) == -1) { - err = "argument must be 'yes' or 'no'"; goto loaderr; - } } else if (!strcasecmp(argv[0],"rdbcompression") && argc == 2) { if ((server.rdbcompression = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; @@ -1833,6 +1887,9 @@ static void loadServerConfig(char *filename) { if ((server.appendonly = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; } + } else if (!strcasecmp(argv[0],"appendfilename") && argc == 2) { + zfree(server.appendfilename); + server.appendfilename = zstrdup(argv[1]); } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) { if (!strcasecmp(argv[1],"no")) { server.appendfsync = APPENDFSYNC_NO; @@ -1860,19 +1917,17 @@ static void loadServerConfig(char *filename) { zfree(server.vm_swap_file); server.vm_swap_file = zstrdup(argv[1]); } else if (!strcasecmp(argv[0],"vm-max-memory") && argc == 2) { - server.vm_max_memory = strtoll(argv[1], NULL, 10); + server.vm_max_memory = memtoll(argv[1],NULL); } else if (!strcasecmp(argv[0],"vm-page-size") && argc == 2) { - server.vm_page_size = strtoll(argv[1], NULL, 10); + server.vm_page_size = memtoll(argv[1], NULL); } else if (!strcasecmp(argv[0],"vm-pages") && argc == 2) { - server.vm_pages = strtoll(argv[1], NULL, 10); + server.vm_pages = memtoll(argv[1], NULL); } else if (!strcasecmp(argv[0],"vm-max-threads") && argc == 2) { server.vm_max_threads = strtoll(argv[1], NULL, 10); } else if (!strcasecmp(argv[0],"hash-max-zipmap-entries") && argc == 2){ - server.hash_max_zipmap_entries = strtol(argv[1], NULL, 10); + server.hash_max_zipmap_entries = memtoll(argv[1], NULL); } else if (!strcasecmp(argv[0],"hash-max-zipmap-value") && argc == 2){ - server.hash_max_zipmap_value = strtol(argv[1], NULL, 10); - } else if (!strcasecmp(argv[0],"vm-max-threads") && argc == 2) { - server.vm_max_threads = strtoll(argv[1], NULL, 10); + server.hash_max_zipmap_value = memtoll(argv[1], NULL); } else { err = "Bad directive or wrong number of arguments"; goto loaderr; } @@ -2166,7 +2221,7 @@ static void call(redisClient *c, struct redisCommand *cmd) { listLength(server.slaves)) replicationFeedSlaves(server.slaves,c->db->id,c->argv,c->argc); if (listLength(server.monitors)) - replicationFeedSlaves(server.monitors,c->db->id,c->argv,c->argc); + replicationFeedMonitors(server.monitors,c->db->id,c->argv,c->argc); server.stat_numcommands++; } @@ -2421,6 +2476,64 @@ static void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int arg if (outv != static_outv) zfree(outv); } +static sds sdscatrepr(sds s, char *p, size_t len) { + s = sdscatlen(s,"\"",1); + while(len--) { + switch(*p) { + case '\\': + case '"': + s = sdscatprintf(s,"\\%c",*p); + break; + case '\n': s = sdscatlen(s,"\\n",1); break; + case '\r': s = sdscatlen(s,"\\r",1); break; + case '\t': s = sdscatlen(s,"\\t",1); break; + case '\a': s = sdscatlen(s,"\\a",1); break; + case '\b': s = sdscatlen(s,"\\b",1); break; + default: + if (isprint(*p)) + s = sdscatprintf(s,"%c",*p); + else + s = sdscatprintf(s,"\\x%02x",(unsigned char)*p); + break; + } + p++; + } + return sdscatlen(s,"\"",1); +} + +static void replicationFeedMonitors(list *monitors, int dictid, robj **argv, int argc) { + listNode *ln; + listIter li; + int j; + sds cmdrepr = sdsnew("+"); + robj *cmdobj; + struct timeval tv; + + gettimeofday(&tv,NULL); + cmdrepr = sdscatprintf(cmdrepr,"%ld.%ld ",(long)tv.tv_sec,(long)tv.tv_usec); + if (dictid != 0) cmdrepr = sdscatprintf(cmdrepr,"(db %d) ", dictid); + + for (j = 0; j < argc; j++) { + if (argv[j]->encoding == REDIS_ENCODING_INT) { + cmdrepr = sdscatprintf(cmdrepr, "%ld", (long)argv[j]->ptr); + } else { + cmdrepr = sdscatrepr(cmdrepr,(char*)argv[j]->ptr, + sdslen(argv[j]->ptr)); + } + if (j != argc-1) + cmdrepr = sdscatlen(cmdrepr," ",1); + } + cmdrepr = sdscatlen(cmdrepr,"\r\n",2); + cmdobj = createObject(REDIS_STRING,cmdrepr); + + listRewind(monitors,&li); + while((ln = listNext(&li))) { + redisClient *monitor = ln->value; + addReply(monitor,cmdobj); + } + decrRefCount(cmdobj); +} + static void processInputBuffer(redisClient *c) { again: /* Before to process the input buffer, make sure the client is not @@ -3118,7 +3231,7 @@ static int getDoubleFromObject(robj *o, double *target) { } else if (o->encoding == REDIS_ENCODING_INT) { value = (long)o->ptr; } else { - redisAssert(1 != 1); + redisPanic("Unknown string encoding"); } } @@ -3155,7 +3268,7 @@ static int getLongLongFromObject(robj *o, long long *target) { } else if (o->encoding == REDIS_ENCODING_INT) { value = (long)o->ptr; } else { - redisAssert(1 != 1); + redisPanic("Unknown string encoding"); } } @@ -3942,40 +4055,55 @@ static void echoCommand(redisClient *c) { /*=================================== Strings =============================== */ -static void setGenericCommand(redisClient *c, int nx) { +static void setGenericCommand(redisClient *c, int nx, robj *key, robj *val, robj *expire) { int retval; + long seconds = 0; /* initialized to avoid an harmness warning */ - if (nx) deleteIfVolatile(c->db,c->argv[1]); - retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]); + if (expire) { + if (getLongFromObjectOrReply(c, expire, &seconds, NULL) != REDIS_OK) + return; + if (seconds <= 0) { + addReplySds(c,sdsnew("-ERR invalid expire time in SETEX\r\n")); + return; + } + } + + if (nx) deleteIfVolatile(c->db,key); + retval = dictAdd(c->db->dict,key,val); if (retval == DICT_ERR) { if (!nx) { /* If the key is about a swapped value, we want a new key object * to overwrite the old. So we delete the old key in the database. * This will also make sure that swap pages about the old object * will be marked as free. */ - if (server.vm_enabled && deleteIfSwapped(c->db,c->argv[1])) - incrRefCount(c->argv[1]); - dictReplace(c->db->dict,c->argv[1],c->argv[2]); - incrRefCount(c->argv[2]); + if (server.vm_enabled && deleteIfSwapped(c->db,key)) + incrRefCount(key); + dictReplace(c->db->dict,key,val); + incrRefCount(val); } else { addReply(c,shared.czero); return; } } else { - incrRefCount(c->argv[1]); - incrRefCount(c->argv[2]); + incrRefCount(key); + incrRefCount(val); } server.dirty++; - removeExpire(c->db,c->argv[1]); + removeExpire(c->db,key); + if (expire) setExpire(c->db,key,time(NULL)+seconds); addReply(c, nx ? shared.cone : shared.ok); } static void setCommand(redisClient *c) { - setGenericCommand(c,0); + setGenericCommand(c,0,c->argv[1],c->argv[2],NULL); } static void setnxCommand(redisClient *c) { - setGenericCommand(c,1); + setGenericCommand(c,1,c->argv[1],c->argv[2],NULL); +} + +static void setexCommand(redisClient *c) { + setGenericCommand(c,0,c->argv[1],c->argv[3],c->argv[2]); } static int getGenericCommand(redisClient *c) { @@ -4219,7 +4347,12 @@ static void delCommand(redisClient *c) { } static void existsCommand(redisClient *c) { - addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero); + expireIfNeeded(c->db,c->argv[1]); + if (dictFind(c->db->dict,c->argv[1])) { + addReply(c, shared.cone); + } else { + addReply(c, shared.czero); + } } static void selectCommand(redisClient *c) { @@ -4234,18 +4367,25 @@ static void selectCommand(redisClient *c) { static void randomkeyCommand(redisClient *c) { dictEntry *de; + robj *key; while(1) { de = dictGetRandomKey(c->db->dict); if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break; } + if (de == NULL) { - addReply(c,shared.plus); - addReply(c,shared.crlf); + addReply(c,shared.nullbulk); + return; + } + + key = dictGetEntryKey(de); + if (server.vm_enabled) { + key = dupStringObject(key); + addReplyBulk(c,key); + decrRefCount(key); } else { - addReply(c,shared.plus); - addReply(c,dictGetEntryKey(de)); - addReply(c,shared.crlf); + addReplyBulk(c,key); } } @@ -4354,7 +4494,6 @@ static void shutdownCommand(redisClient *c) { unlink(server.pidfile); redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory()); redisLog(REDIS_WARNING,"Server exit now, bye bye..."); - if (server.vm_enabled) unlink(server.vm_swap_file); exit(0); } else { /* Ooops.. error saving! The best we can do is to continue @@ -6337,12 +6476,11 @@ static void hincrbyCommand(redisClient *c) { if (getLongLongFromObjectOrReply(c,c->argv[3],&incr,NULL) != REDIS_OK) return; if ((o = hashLookupWriteOrCreate(c,c->argv[1])) == NULL) return; if ((current = hashGet(o,c->argv[2])) != NULL) { - if (current->encoding == REDIS_ENCODING_RAW) - value = strtoll(current->ptr,NULL,10); - else if (current->encoding == REDIS_ENCODING_INT) - value = (long)current->ptr; - else - redisAssert(1 != 1); + if (getLongLongFromObjectOrReply(c,current,&value, + "hash value is not an integer") != REDIS_OK) { + decrRefCount(current); + return; + } decrRefCount(current); } else { value = 0; @@ -7883,11 +8021,55 @@ static void freeMemoryIfNeeded(void) { /* ============================== Append Only file ========================== */ +/* Write the append only file buffer on disk. + * + * Since we are required to write the AOF before replying to the client, + * and the only way the client socket can get a write is entering when the + * the event loop, we accumulate all the AOF writes in a memory + * buffer and write it on disk using this function just before entering + * the event loop again. */ +static void flushAppendOnlyFile(void) { + time_t now; + ssize_t nwritten; + + if (sdslen(server.aofbuf) == 0) return; + + /* We want to perform a single write. This should be guaranteed atomic + * at least if the filesystem we are writing is a real physical one. + * While this will save us against the server being killed I don't think + * there is much to do about the whole server stopping for power problems + * or alike */ + nwritten = write(server.appendfd,server.aofbuf,sdslen(server.aofbuf)); + if (nwritten != (signed)sdslen(server.aofbuf)) { + /* Ooops, we are in troubles. The best thing to do for now is + * aborting instead of giving the illusion that everything is + * working as expected. */ + if (nwritten == -1) { + redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno)); + } else { + redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno)); + } + exit(1); + } + sdsfree(server.aofbuf); + server.aofbuf = sdsempty(); + + /* Fsync if needed */ + now = time(NULL); + if (server.appendfsync == APPENDFSYNC_ALWAYS || + (server.appendfsync == APPENDFSYNC_EVERYSEC && + now-server.lastfsync > 1)) + { + /* aof_fsync is defined as fdatasync() for Linux in order to avoid + * flushing metadata. */ + aof_fsync(server.appendfd); /* Let's try to get this data on the disk */ + server.lastfsync = now; + } +} + static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) { sds buf = sdsempty(); int j; - ssize_t nwritten; - time_t now; robj *tmpargv[3]; /* The DB this command was targetting is not the same as the last command @@ -7933,23 +8115,11 @@ static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv decrRefCount(argv[j]); } - /* We want to perform a single write. This should be guaranteed atomic - * at least if the filesystem we are writing is a real physical one. - * While this will save us against the server being killed I don't think - * there is much to do about the whole server stopping for power problems - * or alike */ - nwritten = write(server.appendfd,buf,sdslen(buf)); - if (nwritten != (signed)sdslen(buf)) { - /* Ooops, we are in troubles. The best thing to do for now is - * to simply exit instead to give the illusion that everything is - * working as expected. */ - if (nwritten == -1) { - redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno)); - } else { - redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno)); - } - exit(1); - } + /* Append to the AOF buffer. This will be flushed on disk just before + * of re-entering the event loop, so before the client will get a + * positive reply about the operation performed. */ + server.aofbuf = sdscatlen(server.aofbuf,buf,sdslen(buf)); + /* If a background append only file rewriting is in progress we want to * accumulate the differences between the child DB and the current one * in a buffer, so that when the child process will do its work we @@ -7958,14 +8128,6 @@ static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv server.bgrewritebuf = sdscatlen(server.bgrewritebuf,buf,sdslen(buf)); sdsfree(buf); - now = time(NULL); - if (server.appendfsync == APPENDFSYNC_ALWAYS || - (server.appendfsync == APPENDFSYNC_EVERYSEC && - now-server.lastfsync > 1)) - { - fsync(server.appendfd); /* Let's try to get this data on the disk */ - server.lastfsync = now; - } } /* In Redis commands are always executed in the context of a client, so in @@ -7985,12 +8147,14 @@ static struct redisClient *createFakeClient(void) { c->reply = listCreate(); listSetFreeMethod(c->reply,decrRefCount); listSetDupMethod(c->reply,dupClientReplyValue); + initClientMultiState(c); return c; } static void freeFakeClient(struct redisClient *c) { sdsfree(c->querybuf); listRelease(c->reply); + freeClientMultiState(c); zfree(c); } @@ -8002,6 +8166,7 @@ int loadAppendOnlyFile(char *filename) { FILE *fp = fopen(filename,"r"); struct redis_stat sb; unsigned long long loadedkeys = 0; + int appendonly = server.appendonly; if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) return REDIS_ERR; @@ -8011,6 +8176,10 @@ int loadAppendOnlyFile(char *filename) { exit(1); } + /* Temporarily disable AOF, to prevent EXEC from feeding a MULTI + * to the same file we're about to read. */ + server.appendonly = 0; + fakeClient = createFakeClient(); while(1) { int argc, j; @@ -8066,8 +8235,14 @@ int loadAppendOnlyFile(char *filename) { } } } + + /* This point can only be reached when EOF is reached without errors. + * If the client is in the middle of a MULTI/EXEC, log error and quit. */ + if (fakeClient->flags & REDIS_MULTI) goto readerr; + fclose(fp); freeFakeClient(fakeClient); + server.appendonly = appendonly; return REDIS_OK; readerr: @@ -8417,42 +8592,38 @@ static void aofRemoveTempFile(pid_t childpid) { /* =================== Virtual Memory - Blocking Side ====================== */ -/* substitute the first occurrence of '%p' with the process pid in the - * swap file name. */ -static void expandVmSwapFilename(void) { - char *p = strstr(server.vm_swap_file,"%p"); - sds new; - - if (!p) return; - new = sdsempty(); - *p = '\0'; - new = sdscat(new,server.vm_swap_file); - new = sdscatprintf(new,"%ld",(long) getpid()); - new = sdscat(new,p+2); - zfree(server.vm_swap_file); - server.vm_swap_file = new; -} - static void vmInit(void) { off_t totsize; int pipefds[2]; size_t stacksize; + struct flock fl; if (server.vm_max_threads != 0) zmalloc_enable_thread_safeness(); /* we need thread safe zmalloc() */ - expandVmSwapFilename(); redisLog(REDIS_NOTICE,"Using '%s' as swap file",server.vm_swap_file); + /* Try to open the old swap file, otherwise create it */ if ((server.vm_fp = fopen(server.vm_swap_file,"r+b")) == NULL) { server.vm_fp = fopen(server.vm_swap_file,"w+b"); } if (server.vm_fp == NULL) { redisLog(REDIS_WARNING, - "Impossible to open the swap file: %s. Exiting.", + "Can't open the swap file: %s. Exiting.", strerror(errno)); exit(1); } server.vm_fd = fileno(server.vm_fp); + /* Lock the swap file for writing, this is useful in order to avoid + * another instance to use the same swap file for a config error. */ + fl.l_type = F_WRLCK; + fl.l_whence = SEEK_SET; + fl.l_start = fl.l_len = 0; + if (fcntl(server.vm_fd,F_SETLK,&fl) == -1) { + redisLog(REDIS_WARNING, + "Can't lock the swap file at '%s': %s. Make sure it is not used by another Redis instance.", server.vm_swap_file, strerror(errno)); + exit(1); + } + /* Initialize */ server.vm_next_page = 0; server.vm_near_pages = 0; server.vm_stats_used_pages = 0; @@ -9378,12 +9549,30 @@ static int waitForSwappedKey(redisClient *c, robj *key) { return 1; } -/* Preload keys needed for the ZUNION and ZINTER commands. */ -static void zunionInterBlockClientOnSwappedKeys(redisClient *c) { +/* Preload keys for any command with first, last and step values for + * the command keys prototype, as defined in the command table. */ +static void waitForMultipleSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv) { + int j, last; + if (cmd->vm_firstkey == 0) return; + last = cmd->vm_lastkey; + if (last < 0) last = argc+last; + for (j = cmd->vm_firstkey; j <= last; j += cmd->vm_keystep) { + redisAssert(j < argc); + waitForSwappedKey(c,argv[j]); + } +} + +/* Preload keys needed for the ZUNION and ZINTER commands. + * Note that the number of keys to preload is user-defined, so we need to + * apply a sanity check against argc. */ +static void zunionInterBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv) { int i, num; - num = atoi(c->argv[2]->ptr); + REDIS_NOTUSED(cmd); + + num = atoi(argv[2]->ptr); + if (num > (argc-3)) return; for (i = 0; i < num; i++) { - waitForSwappedKey(c,c->argv[3+i]); + waitForSwappedKey(c,argv[3+i]); } } @@ -9398,16 +9587,10 @@ static void zunionInterBlockClientOnSwappedKeys(redisClient *c) { * Return 1 if the client is marked as blocked, 0 if the client can * continue as the keys it is going to access appear to be in memory. */ static int blockClientOnSwappedKeys(struct redisCommand *cmd, redisClient *c) { - int j, last; - if (cmd->vm_preload_proc != NULL) { - cmd->vm_preload_proc(c); + cmd->vm_preload_proc(c,cmd,c->argc,c->argv); } else { - if (cmd->vm_firstkey == 0) return 0; - last = cmd->vm_lastkey; - if (last < 0) last = c->argc+last; - for (j = cmd->vm_firstkey; j <= last; j += cmd->vm_keystep) - waitForSwappedKey(c,c->argv[j]); + waitForMultipleSwappedKeys(c,cmd,c->argc,c->argv); } /* If the client was blocked for at least one key, mark it as blocked. */ @@ -9771,8 +9954,9 @@ static int pubsubPublishMessage(robj *channel, robj *message) { sdslen(pat->pattern->ptr), (char*)channel->ptr, sdslen(channel->ptr),0)) { - addReply(pat->client,shared.mbulk3); - addReply(pat->client,shared.messagebulk); + addReply(pat->client,shared.mbulk4); + addReply(pat->client,shared.pmessagebulk); + addReplyBulk(pat->client,pat->pattern); addReplyBulk(pat->client,channel); addReplyBulk(pat->client,message); receivers++;