From: antirez Date: Thu, 6 May 2010 21:19:46 +0000 (+0200) Subject: Merge branch 'master' into aof-speedup X-Git-Url: https://git.saurik.com/redis.git/commitdiff_plain/f424d5f398dde0679ae2afd5063c80d845641a00?ds=sidebyside;hp=-c Merge branch 'master' into aof-speedup --- f424d5f398dde0679ae2afd5063c80d845641a00 diff --combined redis.c index e842b480,d8d024e7..d47dc60b --- a/redis.c +++ b/redis.c @@@ -370,7 -370,6 +370,7 @@@ struct redisServer pid_t bgsavechildpid; pid_t bgrewritechildpid; sds bgrewritebuf; /* buffer taken by parent during oppend only rewrite */ + sds aofbuf; /* AOF buffer, written before entering the event loop */ struct saveparam *saveparams; int saveparamslen; char *logfile; @@@ -558,7 -557,6 +558,7 @@@ static robj *createStringObject(char *p static robj *dupStringObject(robj *o); static void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc); static void replicationFeedMonitors(list *monitors, int dictid, robj **argv, int argc); +static void flushAppendOnlyFile(void); static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc); static int syncWithMaster(void); static robj *tryObjectEncoding(robj *o); @@@ -1530,7 -1528,6 +1530,7 @@@ static int serverCron(struct aeEventLoo static void beforeSleep(struct aeEventLoop *eventLoop) { REDIS_NOTUSED(eventLoop); + /* Awake clients that got all the swapped keys they requested */ if (server.vm_enabled && listLength(server.io_ready_clients)) { listIter li; listNode *ln; @@@ -1555,8 -1552,6 +1555,8 @@@ processInputBuffer(c); } } + /* Write the AOF buffer on disk */ + flushAppendOnlyFile(); } static void createSharedObjects(void) { @@@ -1717,7 -1712,6 +1717,7 @@@ static void initServer() server.bgsavechildpid = -1; server.bgrewritechildpid = -1; server.bgrewritebuf = sdsempty(); + server.aofbuf = sdsempty(); server.lastsave = time(NULL); server.dirty = 0; server.stat_numcommands = 0; @@@ -3227,7 -3221,7 +3227,7 @@@ static int getDoubleFromObject(robj *o } 
else if (o->encoding == REDIS_ENCODING_INT) { value = (long)o->ptr; } else { - redisAssert(1 != 1); + redisPanic("Unknown string encoding"); } } @@@ -3264,7 -3258,7 +3264,7 @@@ static int getLongLongFromObject(robj * } else if (o->encoding == REDIS_ENCODING_INT) { value = (long)o->ptr; } else { - redisAssert(1 != 1); + redisPanic("Unknown string encoding"); } } @@@ -6468,12 -6462,11 +6468,11 @@@ static void hincrbyCommand(redisClient if (getLongLongFromObjectOrReply(c,c->argv[3],&incr,NULL) != REDIS_OK) return; if ((o = hashLookupWriteOrCreate(c,c->argv[1])) == NULL) return; if ((current = hashGet(o,c->argv[2])) != NULL) { - if (current->encoding == REDIS_ENCODING_RAW) - value = strtoll(current->ptr,NULL,10); - else if (current->encoding == REDIS_ENCODING_INT) - value = (long)current->ptr; - else - redisAssert(1 != 1); + if (getLongLongFromObjectOrReply(c,current,&value, + "hash value is not an integer") != REDIS_OK) { + decrRefCount(current); + return; + } decrRefCount(current); } else { value = 0; @@@ -8014,55 -8007,11 +8013,55 @@@ static void freeMemoryIfNeeded(void) /* ============================== Append Only file ========================== */ +/* Write the append only file buffer on disk. + * + * Since we are required to write the AOF before replying to the client, + * and the only way the client socket can get a write is when entering + * the event loop, we accumulate all the AOF writes in a memory + * buffer and write it on disk using this function just before entering + * the event loop again. */ +static void flushAppendOnlyFile(void) { + time_t now; + ssize_t nwritten; + + if (sdslen(server.aofbuf) == 0) return; + + /* We want to perform a single write. This should be guaranteed atomic + * at least if the filesystem we are writing is a real physical one.
+ * While this will save us against the server being killed I don't think + * there is much to do about the whole server stopping for power problems + * or alike */ + nwritten = write(server.appendfd,server.aofbuf,sdslen(server.aofbuf)); + if (nwritten != (signed)sdslen(server.aofbuf)) { + /* Ooops, we are in troubles. The best thing to do for now is + * aborting instead of giving the illusion that everything is + * working as expected. */ + if (nwritten == -1) { + redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno)); + } else { + redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno)); + } + exit(1); + } + sdsfree(server.aofbuf); + server.aofbuf = sdsempty(); + + /* Fsync if needed */ + now = time(NULL); + if (server.appendfsync == APPENDFSYNC_ALWAYS || + (server.appendfsync == APPENDFSYNC_EVERYSEC && + now-server.lastfsync > 1)) + { + /* aof_fsync is defined as fdatasync() for Linux in order to avoid + * flushing metadata. */ + aof_fsync(server.appendfd); /* Let's try to get this data on the disk */ + server.lastfsync = now; + } +} + static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) { sds buf = sdsempty(); int j; - ssize_t nwritten; - time_t now; robj *tmpargv[3]; /* The DB this command was targetting is not the same as the last command @@@ -8108,11 -8057,23 +8107,11 @@@ decrRefCount(argv[j]); } - /* We want to perform a single write. This should be guaranteed atomic - * at least if the filesystem we are writing is a real physical one. - * While this will save us against the server being killed I don't think - * there is much to do about the whole server stopping for power problems - * or alike */ - nwritten = write(server.appendfd,buf,sdslen(buf)); - if (nwritten != (signed)sdslen(buf)) { - /* Ooops, we are in troubles. 
The best thing to do for now is - * to simply exit instead to give the illusion that everything is - * working as expected. */ - if (nwritten == -1) { - redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno)); - } else { - redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno)); - } - exit(1); - } + /* Append to the AOF buffer. This will be flushed on disk just before + * re-entering the event loop, i.e. before the client gets a + * positive reply about the operation performed. */ + server.aofbuf = sdscatlen(server.aofbuf,buf,sdslen(buf)); + /* If a background append only file rewriting is in progress we want to * accumulate the differences between the child DB and the current one * in a buffer, so that when the child process will do its work we @@@ -8121,6 -8082,16 +8120,6 @@@ server.bgrewritebuf = sdscatlen(server.bgrewritebuf,buf,sdslen(buf)); sdsfree(buf); - now = time(NULL); - if (server.appendfsync == APPENDFSYNC_ALWAYS || - (server.appendfsync == APPENDFSYNC_EVERYSEC && - now-server.lastfsync > 1)) - { - /* aof_fsync is defined as fdatasync() for Linux in order to avoid - * flushing metadata.
*/ - aof_fsync(server.appendfd); /* Let's try to get this data on the disk */ - server.lastfsync = now; - } } /* In Redis commands are always executed in the context of a client, so in @@@ -8140,12 -8111,14 +8139,14 @@@ static struct redisClient *createFakeCl c->reply = listCreate(); listSetFreeMethod(c->reply,decrRefCount); listSetDupMethod(c->reply,dupClientReplyValue); + initClientMultiState(c); return c; } static void freeFakeClient(struct redisClient *c) { sdsfree(c->querybuf); listRelease(c->reply); + freeClientMultiState(c); zfree(c); } @@@ -8157,6 -8130,7 +8158,7 @@@ int loadAppendOnlyFile(char *filename) FILE *fp = fopen(filename,"r"); struct redis_stat sb; unsigned long long loadedkeys = 0; + int appendonly = server.appendonly; if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) return REDIS_ERR; @@@ -8166,6 -8140,10 +8168,10 @@@ exit(1); } + /* Temporarily disable AOF, to prevent EXEC from feeding a MULTI + * to the same file we're about to read. */ + server.appendonly = 0; + fakeClient = createFakeClient(); while(1) { int argc, j; @@@ -8221,8 -8199,14 +8227,14 @@@ } } } + + /* This point can only be reached when EOF is reached without errors. + * If the client is in the middle of a MULTI/EXEC, log error and quit. */ + if (fakeClient->flags & REDIS_MULTI) goto readerr; + fclose(fp); freeFakeClient(fakeClient); + server.appendonly = appendonly; return REDIS_OK; readerr: