]> git.saurik.com Git - redis.git/commitdiff
source reshaped a bit to play well with a bgsaving thread, still work to do, does...
authorantirez <antirez@gmail.com>
Fri, 7 Jan 2011 17:15:14 +0000 (18:15 +0100)
committerantirez <antirez@gmail.com>
Fri, 7 Jan 2011 17:15:14 +0000 (18:15 +0100)
TODO
src/aof.c
src/db.c
src/diskstore.c
src/dscache.c
src/rdb.c
src/redis.c
src/redis.h

diff --git a/TODO b/TODO
index b5dabd3a2b1af1e29072d5ce8b4f4efa396a52b4..2402a9d413034a345e33a0a08531c71c4ac6b3eb 100644 (file)
--- a/TODO
+++ b/TODO
@@ -15,6 +15,8 @@ DISKSTORE TODO
 * Check that 00/00 and ff/ff exist at startup, otherwise exit with error.
 * Implement sync flush option, where data is written synchronously on disk when a command is executed.
 * Implement MULTI/EXEC as transaction abstract API to diskstore.c, with transaction_start, transaction_end, and a journal to recover.
+* Stop BGSAVE thread on shutdown and any other condition where the child is killed during normal bgsave.
+* Use a mutex to log on the file, so that we don't get overlapping messages, or even better make sure to use a single write against it.
 
 REPLICATION
 ===========
index f5d04a62b4d493d1cbb82b5b7f2b98a55adf0a10..723d845f64297a5c3b84203fac5694560e828cbc 100644 (file)
--- a/src/aof.c
+++ b/src/aof.c
@@ -585,10 +585,7 @@ void aofRemoveTempFile(pid_t childpid) {
 
 /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
  * Handle this. */
-void backgroundRewriteDoneHandler(int statloc) {
-    int exitcode = WEXITSTATUS(statloc);
-    int bysignal = WIFSIGNALED(statloc);
-
+void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
     if (!bysignal && exitcode == 0) {
         int fd;
         char tmpfile[256];
@@ -636,7 +633,7 @@ void backgroundRewriteDoneHandler(int statloc) {
     } else {
         redisLog(REDIS_WARNING,
             "Background append only file rewriting terminated by signal %d",
-            WTERMSIG(statloc));
+            bysitnal);
     }
 cleanup:
     sdsfree(server.bgrewritebuf);
index 1242c88983f23dc5234a408d6bcf2618e69eb135..6276c992b27be23db0b15801fc96fdfbd674729a 100644 (file)
--- a/src/db.c
+++ b/src/db.c
@@ -379,30 +379,6 @@ void typeCommand(redisClient *c) {
     addReplyStatus(c,type);
 }
 
-void saveCommand(redisClient *c) {
-    if (server.bgsavechildpid != -1) {
-        addReplyError(c,"Background save already in progress");
-        return;
-    }
-    if (rdbSave(server.dbfilename) == REDIS_OK) {
-        addReply(c,shared.ok);
-    } else {
-        addReply(c,shared.err);
-    }
-}
-
-void bgsaveCommand(redisClient *c) {
-    if (server.bgsavechildpid != -1) {
-        addReplyError(c,"Background save already in progress");
-        return;
-    }
-    if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
-        addReplyStatus(c,"Background saving started");
-    } else {
-        addReply(c,shared.err);
-    }
-}
-
 void shutdownCommand(redisClient *c) {
     if (prepareForShutdown() == REDIS_OK)
         exit(0);
index 26f3af6076927ed051b005e137f2d212a8a781e5..0aa8e37fe60b35cf0de4885910793789181695bc 100644 (file)
@@ -349,11 +349,16 @@ void dsFlushDb(int dbid) {
     }
 }
 
-int dsRdbSave(char *filename) {
-    char tmpfile[256];
+void *dsRdbSave_thread(void *arg) {
+    char tmpfile[256], *filename = (char*)arg;
     int j, i;
     time_t now = time(NULL);
 
+    /* Change state to ACTIVE, to signal there is a saving thead working. */
+    pthread_mutex_lock(&server.bgsavethread_mutex);
+    server.bgsavethread_state = REDIS_BGSAVE_THREAD_ACTIVE;
+    pthread_mutex_unlock(&server.bgsavethread_mutex);
+
     snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
     fp = fopen(tmpfile,"w");
     if (!fp) {
@@ -377,6 +382,7 @@ int dsRdbSave(char *filename) {
     fflush(fp);
     fsync(fileno(fp));
     fclose(fp);
+    zfree(filename);
 
     /* Use RENAME to make sure the DB file is changed atomically only
      * if the generate DB file is ok. */
@@ -386,12 +392,24 @@ int dsRdbSave(char *filename) {
         return REDIS_ERR;
     }
     redisLog(REDIS_NOTICE,"DB saved on disk");
-    server.dirty = 0;
-    server.lastsave = time(NULL);
     return REDIS_OK;
 
 werr:
+    zfree(filename);
     fclose(fp);
     unlink(tmpfile);
     redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
 }
+
+int dsRdbSave(char *filename) {
+    pthread_t thread;
+
+    if (pthread_create(&thread,NULL,dsRdbSave_thread,zstrdup(filename)) != 0) {
+        redisLog(REDIS_WARNING,"Can't create diskstore BGSAVE thread: %s",
+            strerror(errno));
+        return REDIS_ERR;
+    } else {
+        server.bgsavethread = thread;
+        return REDIS_OK;
+    }
+}
index 1adba6f56c90af9ae3c48890394e6be7cb0a8f5a..1c419c6a77fb582ae4586286f9e212c06d9eaefc 100644 (file)
@@ -132,6 +132,7 @@ void dsInit(void) {
     server.io_ready_clients = listCreate();
     pthread_mutex_init(&server.io_mutex,NULL);
     pthread_cond_init(&server.io_condvar,NULL);
+    pthread_mutex_init(&server.bgsavethread_mutex,NULL);
     server.io_active_threads = 0;
     if (pipe(pipefds) == -1) {
         redisLog(REDIS_WARNING,"Unable to intialized DS: pipe(2): %s. Exiting."
index 6b6b6ab643b232d6f13c278216111edcb6569a16..62756d3047252c9ac04149ce4384c1a3f1f77985 100644 (file)
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -496,22 +496,23 @@ werr:
 int rdbSaveBackground(char *filename) {
     pid_t childpid;
 
-    if (server.bgsavechildpid != -1) return REDIS_ERR;
+    if (server.bgsavechildpid != -1 ||
+        server.bgsavethread != (pthread_t) -1) return REDIS_ERR;
 
     server.dirty_before_bgsave = server.dirty;
 
+    if (server.ds_enabled) {
+        cacheForcePointInTime();
+        return dsRdbSave(filename);
+    }
+
     if ((childpid = fork()) == 0) {
         int retval;
 
         /* Child */
         if (server.ipfd > 0) close(server.ipfd);
         if (server.sofd > 0) close(server.sofd);
-        if (server.ds_enabled) {
-            cacheForcePointInTime();
-            dsRdbSave(filename);
-        } else {
-            rdbSave(filename);
-        }
+        retval = rdbSave(filename);
         _exit((retval == REDIS_OK) ? 0 : 1);
     } else {
         /* Parent */
@@ -950,10 +951,7 @@ eoferr: /* unexpected end of file is handled here with a fatal exit */
 }
 
 /* A background saving child (BGSAVE) terminated its work. Handle this. */
-void backgroundSaveDoneHandler(int statloc) {
-    int exitcode = WEXITSTATUS(statloc);
-    int bysignal = WIFSIGNALED(statloc);
-
+void backgroundSaveDoneHandler(int exitcode, int bysignal) {
     if (!bysignal && exitcode == 0) {
         redisLog(REDIS_NOTICE,
             "Background saving terminated with success");
@@ -963,11 +961,37 @@ void backgroundSaveDoneHandler(int statloc) {
         redisLog(REDIS_WARNING, "Background saving error");
     } else {
         redisLog(REDIS_WARNING,
-            "Background saving terminated by signal %d", WTERMSIG(statloc));
+            "Background saving terminated by signal %d", bysignal);
         rdbRemoveTempFile(server.bgsavechildpid);
     }
     server.bgsavechildpid = -1;
+    server.bgsavethread = (pthread_t) -1;
+    server.bgsavethread_state = REDIS_BGSAVE_THREAD_UNACTIVE;
     /* Possibly there are slaves waiting for a BGSAVE in order to be served
      * (the first stage of SYNC is a bulk transfer of dump.rdb) */
     updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
 }
+
+void saveCommand(redisClient *c) {
+    if (server.bgsavechildpid != -1 || server.bgsavethread != (pthread-t)-1) {
+        addReplyError(c,"Background save already in progress");
+        return;
+    }
+    if (rdbSave(server.dbfilename) == REDIS_OK) {
+        addReply(c,shared.ok);
+    } else {
+        addReply(c,shared.err);
+    }
+}
+
+void bgsaveCommand(redisClient *c) {
+    if (server.bgsavechildpid != -1 || server.bgsavethread != (pthread-t)-1) {
+        addReplyError(c,"Background save already in progress");
+        return;
+    }
+    if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
+        addReplyStatus(c,"Background saving started");
+    } else {
+        addReply(c,shared.err);
+    }
+}
index c0dac05fe6c8c4789ede397e02b4cd7cb6d3f95d..2fd3ee39e3c8843f1f5e56c991e195c11b8aaf10 100644 (file)
@@ -589,13 +589,31 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
         pid_t pid;
 
         if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
+            int exitcode = WEXITSTATUS(statloc);
+            int bysignal = 0;
+            
+            if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
+
             if (pid == server.bgsavechildpid) {
-                backgroundSaveDoneHandler(statloc);
+                backgroundSaveDoneHandler(exitcode,bysignal);
             } else {
-                backgroundRewriteDoneHandler(statloc);
+                backgroundRewriteDoneHandler(exitcode,bysignal);
             }
             updateDictResizePolicy();
         }
+        if (server.bgsavethread != (pthread_t) -1) {
+            int state;
+
+            pthread_mutex_lock(&server.bgsavethread_mutex);
+            state = server.bgsavethread_state;
+            pthread_mutex_unlock(&server.bgsavethread_mutex);
+
+            if (state == REDIS_BGSAVE_DONE_OK || state == REDIS_BGSAVE_DONE_ERR)
+            {
+                backgroundSaveDoneHandler(
+                    (state == REDIS_BGSAVE_DONE_OK) ? 0 : 1, 0);
+            }
+        }
     } else if (!server.ds_enabled) {
         /* If there is not a background saving in progress check if
          * we have to save now */
@@ -867,6 +885,8 @@ void initServer() {
     server.cronloops = 0;
     server.bgsavechildpid = -1;
     server.bgrewritechildpid = -1;
+    server.bgsavethread_state = REDIS_BGSAVE_THREAD_UNACTIVE;
+    server.bgsavethread = (pthread_t) -1;
     server.bgrewritebuf = sdsempty();
     server.aofbuf = sdsempty();
     server.lastsave = time(NULL);
index 495de985916f91f5b55840da33d621a173718df2..c87613349deff2e6e1db99d17a497bde95b3c33d 100644 (file)
 #define REDIS_MAXMEMORY_ALLKEYS_RANDOM 4
 #define REDIS_MAXMEMORY_NO_EVICTION 5
 
+/* Diskstore background saving thread states */
+#define REDIS_BGSAVE_THREAD_UNACTIVE 0
+#define REDIS_BGSAVE_THREAD_ACTIVE 1
+#define REDIS_BGSAVE_THREAD_DONE_OK 2
+#define REDIS_BGSAVE_THREAD_DONE_ERR 3
+
 /* We can print the stacktrace, so our assert is defined this way: */
 #define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e,__FILE__,__LINE__),_exit(1)))
 #define redisPanic(_e) _redisPanic(#_e,__FILE__,__LINE__),_exit(1)
@@ -390,25 +396,30 @@ struct redisServer {
     int appendfsync;
     int no_appendfsync_on_rewrite;
     int shutdown_asap;
+    int activerehashing;
+    char *requirepass;
+    /* Persistence */
     time_t lastfsync;
     int appendfd;
     int appendseldb;
     char *pidfile;
     pid_t bgsavechildpid;
     pid_t bgrewritechildpid;
+    int bgsavethread_state;
+    pthread_mutex_t bgsavethread_mutex;
+    pthread_t bgsavethread;
     sds bgrewritebuf; /* buffer taken by parent during oppend only rewrite */
     sds aofbuf;       /* AOF buffer, written before entering the event loop */
     struct saveparam *saveparams;
     int saveparamslen;
+    char *dbfilename;
+    int rdbcompression;
+    char *appendfilename;
+    /* Logging */
     char *logfile;
     int syslog_enabled;
     char *syslog_ident;
     int syslog_facility;
-    char *dbfilename;
-    char *appendfilename;
-    char *requirepass;
-    int rdbcompression;
-    int activerehashing;
     /* Replication related */
     int isslave;
     /* Slave specific fields */
@@ -745,7 +756,7 @@ int rdbSaveObject(FILE *fp, robj *o);
 off_t rdbSavedObjectLen(robj *o);
 off_t rdbSavedObjectPages(robj *o);
 robj *rdbLoadObject(int type, FILE *fp);
-void backgroundSaveDoneHandler(int statloc);
+void backgroundSaveDoneHandler(int exitcode, int bysignal) {
 int rdbSaveKeyValuePair(FILE *fp, redisDb *db, robj *key, robj *val, time_t now);
 int rdbLoadType(FILE *fp);
 time_t rdbLoadTime(FILE *fp);
@@ -759,7 +770,7 @@ int rewriteAppendOnlyFileBackground(void);
 int loadAppendOnlyFile(char *filename);
 void stopAppendOnly(void);
 int startAppendOnly(void);
-void backgroundRewriteDoneHandler(int statloc);
+void backgroundRewriteDoneHandler(int exitcode, int bysignal);
 
 /* Sorted sets data type */
 zskiplist *zslCreate(void);