]> git.saurik.com Git - redis.git/commitdiff
Allow an AOF rewrite buffer > 2GB (Fix for issue #504).
authorantirez <antirez@gmail.com>
Tue, 22 May 2012 11:03:41 +0000 (13:03 +0200)
committerantirez <antirez@gmail.com>
Thu, 24 May 2012 13:22:35 +0000 (15:22 +0200)
During the AOF rewrite process, the parent process needs to accumulate
the new writes in an in-memory buffer: when the child will terminate the
AOF rewriting process this buffer (that ist the difference between the
dataset when the rewrite was started, and the current dataset) is
flushed to the new AOF file.

We used to implement this buffer using an sds.c string, but sds.c has a
2GB limit. Sometimes the dataset can be big enough, the amount of writes
so high, and the rewrite process slow enough that we overflow the 2GB
limit, causing a crash, documented on github by issue #504.

In order to prevent this from happening, this commit introduces a new
system to accumulate writes, implemented by a linked list of blocks of
10 MB each, so that we also avoid paying the reallocation cost.

Note that theoretically modern operating systems may implement realloc()
simply as a remaping of the old pages, thus with very good performances,
see for instance the mremap() syscall on Linux. However this is not
always true, and jemalloc by default avoids doing this because there are
issues with the current implementation of mremap().

For this reason we are using a linked list of blocks instead of a single
block that gets reallocated again and again.

The changes in this commit lacks testing, that will be performed before
merging into the unstable branch. This fix will not enter 2.4 because it
is too invasive. However 2.4 will log a warning when the AOF rewrite
buffer is near to the 2GB limit.

src/aof.c
src/redis.c
src/redis.h

index 59b5ab89f1c710ed3863764708b6081f858ac62e..607bdce8259c2e151fc440aeabc61e49a4fb1726 100644 (file)
--- a/src/aof.c
+++ b/src/aof.c
 
 void aofUpdateCurrentSize(void);
 
+/* ----------------------------------------------------------------------------
+ * AOF rewrite buffer implementation.
+ *
+ * The following code implement a simple buffer used in order to accumulate
+ * changes while the background process is rewriting the AOF file.
+ *
+ * We only need to append, but can't just use realloc with a large block
+ * because 'huge' reallocs are not always handled as one could expect
+ * (via remapping of pages at OS level) but may involve copying data.
+ *
+ * For this reason we use a list of blocks, every block is
+ * AOF_RW_BUF_BLOCK_SIZE bytes.
+ * ------------------------------------------------------------------------- */
+
+#define AOF_RW_BUF_BLOCK_SIZE (1024*1024*10)    /* 10 MB per block */
+
+typedef struct aofrwblock {
+    unsigned long used, free;
+    char buf[AOF_RW_BUF_BLOCK_SIZE];
+} aofrwblock;
+
+/* This function free the old AOF rewrite buffer if needed, and initialize
+ * a fresh new one. It tests for server.aof_rewrite_buf_blocks equal to NULL
+ * so can be used for the first initialization as well. */
+void aofRewriteBufferReset(void) {
+    if (server.aof_rewrite_buf_blocks)
+        listRelease(server.aof_rewrite_buf_blocks);
+
+    server.aof_rewrite_buf_blocks = listCreate();
+    listSetFreeMethod(server.aof_rewrite_buf_blocks,zfree);
+}
+
+/* Return the current size of the AOF rerwite buffer. */
+unsigned long aofRewriteBufferSize(void) {
+    listNode *ln = listLast(server.aof_rewrite_buf_blocks);
+    aofrwblock *block = ln ? ln->value : NULL;
+
+    if (block == NULL) return 0;
+    unsigned long size =
+        (listLength(server.aof_rewrite_buf_blocks)-1) * AOF_RW_BUF_BLOCK_SIZE;
+    size += block->used;
+    return size;
+}
+
+/* Append data to the AOF rewrite buffer, allocating new blocks if needed. */
+void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
+    listNode *ln = listLast(server.aof_rewrite_buf_blocks);
+    aofrwblock *block = ln ? ln->value : NULL;
+
+    while(len) {
+        /* If we already got at least an allocated block, try appending
+         * at least some piece into it. */
+        if (block) {
+            unsigned long thislen = (block->free < len) ? block->free : len;
+            if (thislen) {  /* The current block is not already full. */
+                memcpy(block->buf+block->used, s, thislen);
+                block->used += thislen;
+                block->free -= thislen;
+                s += thislen;
+                len -= thislen;
+            }
+        }
+
+        if (len) { /* First block to allocate, or need another block. */
+            int numblocks;
+
+            block = zmalloc(sizeof(*block));
+            block->free = AOF_RW_BUF_BLOCK_SIZE;
+            block->used = 0;
+            listAddNodeTail(server.aof_rewrite_buf_blocks,block);
+
+            /* Log every time we cross more 10 or 100 blocks, respectively
+             * as a notice or warning. */
+            numblocks = listLength(server.aof_rewrite_buf_blocks);
+            if (((numblocks+1) % 10) == 0) {
+                int level = ((numblocks+1) % 100) == 0 ? REDIS_WARNING :
+                                                         REDIS_NOTICE;
+                redisLog(level,"Background AOF buffer size: %lu MB",
+                    aofRewriteBufferSize()/(1024*1024));
+            }
+        }
+    }
+}
+
+/* Write the buffer (possibly composed of multiple blocks) into the specified
+ * fd. If no short write or any other error happens -1 is returned,
+ * otherwise the number of bytes written is returned. */
+ssize_t aofRewriteBufferWrite(int fd) {
+    listNode *ln;
+    listIter li;
+    ssize_t count = 0;
+
+    listRewind(server.aof_rewrite_buf_blocks,&li);
+    while((ln = listNext(&li))) {
+        aofrwblock *block = listNodeValue(ln);
+        ssize_t nwritten;
+
+        if (block->used) {
+            nwritten = write(fd,block->buf,block->used);
+            if (nwritten != block->used) {
+                if (nwritten == 0) errno = EIO;
+                return -1;
+            }
+            count += nwritten;
+        }
+    }
+    return count;
+}
+
 /* ----------------------------------------------------------------------------
  * AOF file implementation
  * ------------------------------------------------------------------------- */
@@ -42,8 +151,7 @@ void stopAppendOnly(void) {
         if (kill(server.aof_child_pid,SIGKILL) != -1)
             wait3(&statloc,0,NULL);
         /* reset the buffer accumulating changes while the child saves */
-        sdsfree(server.aof_rewrite_buf);
-        server.aof_rewrite_buf = sdsempty();
+        aofRewriteBufferReset();
         aofRemoveTempFile(server.aof_child_pid);
         server.aof_child_pid = -1;
     }
@@ -281,7 +389,7 @@ void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int a
      * in a buffer, so that when the child process will do its work we
      * can append the differences to the new append only file. */
     if (server.aof_child_pid != -1)
-        server.aof_rewrite_buf = sdscatlen(server.aof_rewrite_buf,buf,sdslen(buf));
+        aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));
 
     sdsfree(buf);
 }
@@ -885,7 +993,6 @@ void aofUpdateCurrentSize(void) {
 void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
     if (!bysignal && exitcode == 0) {
         int newfd, oldfd;
-        int nwritten;
         char tmpfile[256];
         long long now = ustime();
 
@@ -903,21 +1010,15 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
             goto cleanup;
         }
 
-        nwritten = write(newfd,server.aof_rewrite_buf,sdslen(server.aof_rewrite_buf));
-        if (nwritten != (signed)sdslen(server.aof_rewrite_buf)) {
-            if (nwritten == -1) {
-                redisLog(REDIS_WARNING,
-                    "Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
-            } else {
-                redisLog(REDIS_WARNING,
-                    "Short write trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
-            }
+        if (aofRewriteBufferWrite(newfd) == -1) {
+            redisLog(REDIS_WARNING,
+                "Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
             close(newfd);
             goto cleanup;
         }
 
         redisLog(REDIS_NOTICE,
-            "Parent diff successfully flushed to the rewritten AOF (%lu bytes)", nwritten);
+            "Parent diff successfully flushed to the rewritten AOF (%lu bytes)", aofRewriteBufferSize());
 
         /* The only remaining thing to do is to rename the temporary file to
          * the configured file and switch the file descriptor used to do AOF
@@ -1009,8 +1110,7 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
     }
 
 cleanup:
-    sdsfree(server.aof_rewrite_buf);
-    server.aof_rewrite_buf = sdsempty();
+    aofRewriteBufferReset();
     aofRemoveTempFile(server.aof_child_pid);
     server.aof_child_pid = -1;
     /* Schedule a new rewrite if we are waiting for it to switch the AOF ON. */
index e12d27cc339c35bd96c8c4ce0ac1b7b6341035b0..73d485a168156e6653eb7aa093a9acb1547675f7 100644 (file)
@@ -1279,7 +1279,7 @@ void initServer() {
     server.cronloops = 0;
     server.rdb_child_pid = -1;
     server.aof_child_pid = -1;
-    server.aof_rewrite_buf = sdsempty();
+    aofRewriteBufferReset();
     server.aof_buf = sdsempty();
     server.lastsave = time(NULL);
     server.dirty = 0;
@@ -2134,7 +2134,7 @@ int freeMemoryIfNeeded(void) {
     }
     if (server.aof_state != REDIS_AOF_OFF) {
         mem_used -= sdslen(server.aof_buf);
-        mem_used -= sdslen(server.aof_rewrite_buf);
+        mem_used -= aofRewriteBufferSize();
     }
 
     /* Check if we are over the memory limit. */
index 895241606617494d0c2cdf18d6e91e09c781704e..5d11799ab848c7eca7f8284d654c9f3ada05f9f4 100644 (file)
@@ -505,7 +505,7 @@ struct redisServer {
     off_t aof_current_size;         /* AOF current size. */
     int aof_rewrite_scheduled;      /* Rewrite once BGSAVE terminates. */
     pid_t aof_child_pid;            /* PID if rewriting process */
-    sds aof_rewrite_buf; /* buffer taken by parent during oppend only rewrite */
+    list *aof_rewrite_buf_blocks;   /* Hold changes during an AOF rewrite. */
     sds aof_buf;      /* AOF buffer, written before entering the event loop */
     int aof_fd;       /* File descriptor of currently selected AOF file */
     int aof_selected_db; /* Currently selected DB in AOF */
@@ -843,6 +843,8 @@ int loadAppendOnlyFile(char *filename);
 void stopAppendOnly(void);
 int startAppendOnly(void);
 void backgroundRewriteDoneHandler(int exitcode, int bysignal);
+void aofRewriteBufferReset(void);
+unsigned long aofRewriteBufferSize(void);
 
 /* Sorted sets data type */