]> git.saurik.com Git - redis.git/commitdiff
Incrementally flush RDB on disk while loading it from a master.
authorantirez <antirez@gmail.com>
Fri, 24 Aug 2012 17:28:44 +0000 (19:28 +0200)
committerantirez <antirez@gmail.com>
Tue, 28 Aug 2012 10:47:33 +0000 (12:47 +0200)
This fixes issue #539.

Basically if there is enough free memory the OS may buffer the RDB file
that the slave transfers on disk from the master. The file may
actually be flused on disk at once by the operating system when it gets
closed by Redis, causing the close system call to block for a long time.

This patch is a modified version of one provided by yoav-steinberg of
@garantiadata (the original version was posted in the issue #539
comments), and tries to flush the OS buffers incrementally (every 8 MB
of loaded data).

src/config.h
src/fmacros.h
src/redis.c
src/redis.h
src/replication.c

index 28ef37d6ed01043a6000856de653608604218b51..617682fcf45247d07ae734f3724cc0a4fde600a5 100644 (file)
 #define aof_fsync fsync
 #endif
 
 #define aof_fsync fsync
 #endif
 
+/* Define rdb_fsync_range to sync_file_range() on Linux, otherwise we use
+ * the plain fsync() call. */
+#ifdef __linux__
+#define rdb_fsync_range(fd,off,size) sync_file_range(fd,off,size,SYNC_FILE_RANGE_WAIT_BEFORE|SYNC_FILE_RANGE_WRITE)
+#else
+#define rdb_fsync_range(fd,off,size) fsync(fd)
+#endif
+
 /* Byte ordering detection */
 #include <sys/types.h> /* This will likely define BYTE_ORDER */
 
 /* Byte ordering detection */
 #include <sys/types.h> /* This will likely define BYTE_ORDER */
 
index 866a9afa816163dfc54415619367a5ea8759a59e..3e54876575d44b53ec7dcf8c077e39c9f39afcdf 100644 (file)
@@ -3,6 +3,10 @@
 
 #define _BSD_SOURCE
 
 
 #define _BSD_SOURCE
 
+#if defined(__linux__)
+#define _GNU_SOURCE
+#endif
+
 #if defined(__linux__) || defined(__OpenBSD__)
 #define _XOPEN_SOURCE 700
 #else
 #if defined(__linux__) || defined(__OpenBSD__)
 #define _XOPEN_SOURCE 700
 #else
index 92f1323b3f5cca7838b148bc9be3d9cbec259ca0..0d7432d23c4886329c19a26151b4e6481561b313 100644 (file)
@@ -2079,9 +2079,10 @@ sds genRedisInfoString(char *section) {
 
             if (server.repl_state == REDIS_REPL_TRANSFER) {
                 info = sdscatprintf(info,
 
             if (server.repl_state == REDIS_REPL_TRANSFER) {
                 info = sdscatprintf(info,
-                    "master_sync_left_bytes:%ld\r\n"
+                    "master_sync_left_bytes:%lld\r\n"
                     "master_sync_last_io_seconds_ago:%d\r\n"
                     "master_sync_last_io_seconds_ago:%d\r\n"
-                    ,(long)server.repl_transfer_left,
+                    , (long long)
+                        (server.repl_transfer_size - server.repl_transfer_read),
                     (int)(server.unixtime-server.repl_transfer_lastio)
                 );
             }
                     (int)(server.unixtime-server.repl_transfer_lastio)
                 );
             }
index d12e944586d4f710c09f19e07b83b603f444ebb0..52bf5b8d549792d4df2e304c31a67f2f67f533eb 100644 (file)
@@ -684,7 +684,9 @@ struct redisServer {
     redisClient *master;     /* Client that is master for this slave */
     int repl_syncio_timeout; /* Timeout for synchronous I/O calls */
     int repl_state;          /* Replication status if the instance is a slave */
     redisClient *master;     /* Client that is master for this slave */
     int repl_syncio_timeout; /* Timeout for synchronous I/O calls */
     int repl_state;          /* Replication status if the instance is a slave */
-    off_t repl_transfer_left;  /* Bytes left reading .rdb  */
+    off_t repl_transfer_size; /* Size of RDB to read from master during sync. */
+    off_t repl_transfer_read; /* Amount of RDB read from master during sync. */
+    off_t repl_transfer_last_fsync_off; /* Offset when we fsync-ed last time. */
     int repl_transfer_s;     /* Slave -> Master SYNC socket */
     int repl_transfer_fd;    /* Slave -> Master SYNC temp file descriptor */
     char *repl_transfer_tmpfile; /* Slave-> master SYNC temp file name */
     int repl_transfer_s;     /* Slave -> Master SYNC socket */
     int repl_transfer_fd;    /* Slave -> Master SYNC temp file descriptor */
     char *repl_transfer_tmpfile; /* Slave-> master SYNC temp file name */
index 45a223b8e36d4dbc333d83122052442ce22c747d..543477b661a2813e31603fd35267f08311c7d845 100644 (file)
@@ -311,16 +311,18 @@ void replicationAbortSyncTransfer(void) {
 }
 
 /* Asynchronously read the SYNC payload we receive from a master */
 }
 
 /* Asynchronously read the SYNC payload we receive from a master */
+#define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */
 void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
     char buf[4096];
     ssize_t nread, readlen;
 void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
     char buf[4096];
     ssize_t nread, readlen;
+    off_t left;
     REDIS_NOTUSED(el);
     REDIS_NOTUSED(privdata);
     REDIS_NOTUSED(mask);
 
     REDIS_NOTUSED(el);
     REDIS_NOTUSED(privdata);
     REDIS_NOTUSED(mask);
 
-    /* If repl_transfer_left == -1 we still have to read the bulk length
+    /* If repl_transfer_size == -1 we still have to read the bulk length
      * from the master reply. */
      * from the master reply. */
-    if (server.repl_transfer_left == -1) {
+    if (server.repl_transfer_size == -1) {
         if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) {
             redisLog(REDIS_WARNING,
                 "I/O error reading bulk count from MASTER: %s",
         if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) {
             redisLog(REDIS_WARNING,
                 "I/O error reading bulk count from MASTER: %s",
@@ -343,16 +345,16 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
             redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
             goto error;
         }
             redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
             goto error;
         }
-        server.repl_transfer_left = strtol(buf+1,NULL,10);
+        server.repl_transfer_size = strtol(buf+1,NULL,10);
         redisLog(REDIS_NOTICE,
             "MASTER <-> SLAVE sync: receiving %ld bytes from master",
         redisLog(REDIS_NOTICE,
             "MASTER <-> SLAVE sync: receiving %ld bytes from master",
-            server.repl_transfer_left);
+            server.repl_transfer_size);
         return;
     }
 
     /* Read bulk data */
         return;
     }
 
     /* Read bulk data */
-    readlen = (server.repl_transfer_left < (signed)sizeof(buf)) ?
-        server.repl_transfer_left : (signed)sizeof(buf);
+    left = server.repl_transfer_size - server.repl_transfer_read;
+    readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
     nread = read(fd,buf,readlen);
     if (nread <= 0) {
         redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
     nread = read(fd,buf,readlen);
     if (nread <= 0) {
         redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
@@ -365,9 +367,23 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
         redisLog(REDIS_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s", strerror(errno));
         goto error;
     }
         redisLog(REDIS_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s", strerror(errno));
         goto error;
     }
-    server.repl_transfer_left -= nread;
+    server.repl_transfer_read += nread;
+
+    /* Sync data on disk from time to time, otherwise at the end of the transfer
+     * we may suffer a big delay as the memory buffers are copied into the
+     * actual disk. */
+    if (server.repl_transfer_read >=
+        server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC)
+    {
+        off_t sync_size = server.repl_transfer_read -
+                          server.repl_transfer_last_fsync_off;
+        rdb_fsync_range(server.repl_transfer_fd,
+            server.repl_transfer_last_fsync_off, sync_size);
+        server.repl_transfer_last_fsync_off += sync_size;
+    }
+
     /* Check if the transfer is now complete */
     /* Check if the transfer is now complete */
-    if (server.repl_transfer_left == 0) {
+    if (server.repl_transfer_read == server.repl_transfer_size) {
         if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
             redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
             replicationAbortSyncTransfer();
         if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
             redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
             replicationAbortSyncTransfer();
@@ -538,7 +554,9 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
     }
 
     server.repl_state = REDIS_REPL_TRANSFER;
     }
 
     server.repl_state = REDIS_REPL_TRANSFER;
-    server.repl_transfer_left = -1;
+    server.repl_transfer_size = -1;
+    server.repl_transfer_read = 0;
+    server.repl_transfer_last_fsync_off = 0;
     server.repl_transfer_fd = dfd;
     server.repl_transfer_lastio = server.unixtime;
     server.repl_transfer_tmpfile = zstrdup(tmpfile);
     server.repl_transfer_fd = dfd;
     server.repl_transfer_lastio = server.unixtime;
     server.repl_transfer_tmpfile = zstrdup(tmpfile);