]> git.saurik.com Git - redis.git/blobdiff - src/aof.c
Allow Pub/Sub in contexts where other commands are blocked.
[redis.git] / src / aof.c
index 742af9052f017b3c34a165e524c0a3c2f90a8476..09bfb04924520db75201a66382cf409f24a5b088 100644 (file)
--- a/src/aof.c
+++ b/src/aof.c
 
 void aofUpdateCurrentSize(void);
 
+/* ----------------------------------------------------------------------------
+ * AOF rewrite buffer implementation.
+ *
+ * The following code implement a simple buffer used in order to accumulate
+ * changes while the background process is rewriting the AOF file.
+ *
+ * We only need to append, but can't just use realloc with a large block
+ * because 'huge' reallocs are not always handled as one could expect
+ * (via remapping of pages at OS level) but may involve copying data.
+ *
+ * For this reason we use a list of blocks, every block is
+ * AOF_RW_BUF_BLOCK_SIZE bytes.
+ * ------------------------------------------------------------------------- */
+
+#define AOF_RW_BUF_BLOCK_SIZE (1024*1024*10)    /* 10 MB per block */
+
+typedef struct aofrwblock {
+    unsigned long used, free;
+    char buf[AOF_RW_BUF_BLOCK_SIZE];
+} aofrwblock;
+
+/* This function free the old AOF rewrite buffer if needed, and initialize
+ * a fresh new one. It tests for server.aof_rewrite_buf_blocks equal to NULL
+ * so can be used for the first initialization as well. */
+void aofRewriteBufferReset(void) {
+    if (server.aof_rewrite_buf_blocks)
+        listRelease(server.aof_rewrite_buf_blocks);
+
+    server.aof_rewrite_buf_blocks = listCreate();
+    listSetFreeMethod(server.aof_rewrite_buf_blocks,zfree);
+}
+
+/* Return the current size of the AOF rerwite buffer. */
+unsigned long aofRewriteBufferSize(void) {
+    listNode *ln = listLast(server.aof_rewrite_buf_blocks);
+    aofrwblock *block = ln ? ln->value : NULL;
+
+    if (block == NULL) return 0;
+    unsigned long size =
+        (listLength(server.aof_rewrite_buf_blocks)-1) * AOF_RW_BUF_BLOCK_SIZE;
+    size += block->used;
+    return size;
+}
+
+/* Append data to the AOF rewrite buffer, allocating new blocks if needed. */
+void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
+    listNode *ln = listLast(server.aof_rewrite_buf_blocks);
+    aofrwblock *block = ln ? ln->value : NULL;
+
+    while(len) {
+        /* If we already got at least an allocated block, try appending
+         * at least some piece into it. */
+        if (block) {
+            unsigned long thislen = (block->free < len) ? block->free : len;
+            if (thislen) {  /* The current block is not already full. */
+                memcpy(block->buf+block->used, s, thislen);
+                block->used += thislen;
+                block->free -= thislen;
+                s += thislen;
+                len -= thislen;
+            }
+        }
+
+        if (len) { /* First block to allocate, or need another block. */
+            int numblocks;
+
+            block = zmalloc(sizeof(*block));
+            block->free = AOF_RW_BUF_BLOCK_SIZE;
+            block->used = 0;
+            listAddNodeTail(server.aof_rewrite_buf_blocks,block);
+
+            /* Log every time we cross more 10 or 100 blocks, respectively
+             * as a notice or warning. */
+            numblocks = listLength(server.aof_rewrite_buf_blocks);
+            if (((numblocks+1) % 10) == 0) {
+                int level = ((numblocks+1) % 100) == 0 ? REDIS_WARNING :
+                                                         REDIS_NOTICE;
+                redisLog(level,"Background AOF buffer size: %lu MB",
+                    aofRewriteBufferSize()/(1024*1024));
+            }
+        }
+    }
+}
+
+/* Write the buffer (possibly composed of multiple blocks) into the specified
+ * fd. If no short write or any other error happens -1 is returned,
+ * otherwise the number of bytes written is returned. */
+ssize_t aofRewriteBufferWrite(int fd) {
+    listNode *ln;
+    listIter li;
+    ssize_t count = 0;
+
+    listRewind(server.aof_rewrite_buf_blocks,&li);
+    while((ln = listNext(&li))) {
+        aofrwblock *block = listNodeValue(ln);
+        ssize_t nwritten;
+
+        if (block->used) {
+            nwritten = write(fd,block->buf,block->used);
+            if (nwritten != block->used) {
+                if (nwritten == 0) errno = EIO;
+                return -1;
+            }
+            count += nwritten;
+        }
+    }
+    return count;
+}
+
+/* ----------------------------------------------------------------------------
+ * AOF file implementation
+ * ------------------------------------------------------------------------- */
+
+/* Starts a background task that performs fsync() against the specified
+ * file descriptor (the one of the AOF file) in another thread. */
 void aof_background_fsync(int fd) {
     bioCreateBackgroundJob(REDIS_BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL);
 }
@@ -36,17 +151,17 @@ void stopAppendOnly(void) {
         if (kill(server.aof_child_pid,SIGKILL) != -1)
             wait3(&statloc,0,NULL);
         /* reset the buffer accumulating changes while the child saves */
-        sdsfree(server.aof_rewrite_buf);
-        server.aof_rewrite_buf = sdsempty();
+        aofRewriteBufferReset();
         aofRemoveTempFile(server.aof_child_pid);
         server.aof_child_pid = -1;
+        server.aof_rewrite_time_start = -1;
     }
 }
 
 /* Called when the user switches from "appendonly no" to "appendonly yes"
  * at runtime using the CONFIG command. */
 int startAppendOnly(void) {
-    server.aof_last_fsync = time(NULL);
+    server.aof_last_fsync = server.unixtime;
     server.aof_fd = open(server.aof_filename,O_WRONLY|O_APPEND|O_CREAT,0644);
     redisAssert(server.aof_state == REDIS_AOF_OFF);
     if (server.aof_fd == -1) {
@@ -108,6 +223,7 @@ void flushAppendOnlyFile(int force) {
             }
             /* Otherwise fall trough, and go write since we can't wait
              * over two seconds. */
+            server.aof_delayed_fsync++;
             redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
         }
     }
@@ -274,11 +390,15 @@ void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int a
      * in a buffer, so that when the child process will do its work we
      * can append the differences to the new append only file. */
     if (server.aof_child_pid != -1)
-        server.aof_rewrite_buf = sdscatlen(server.aof_rewrite_buf,buf,sdslen(buf));
+        aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));
 
     sdsfree(buf);
 }
 
+/* ----------------------------------------------------------------------------
+ * AOF loading
+ * ------------------------------------------------------------------------- */
+
 /* In Redis commands are always executed in the context of a client, so in
  * order to load the append only file we need to create a fake client. */
 struct redisClient *createFakeClient(void) {
@@ -287,6 +407,7 @@ struct redisClient *createFakeClient(void) {
     selectDb(c,0);
     c->fd = -1;
     c->querybuf = sdsempty();
+    c->querybuf_peak = 0;
     c->argc = 0;
     c->argv = NULL;
     c->bufpos = 0;
@@ -295,6 +416,8 @@ struct redisClient *createFakeClient(void) {
      * so that Redis will not try to send replies to this client. */
     c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
     c->reply = listCreate();
+    c->reply_bytes = 0;
+    c->obuf_soft_limit_reached_time = 0;
     c->watched_keys = listCreate();
     listSetFreeMethod(c->reply,decrRefCount);
     listSetDupMethod(c->reply,dupClientReplyValue);
@@ -420,6 +543,10 @@ fmterr:
     exit(1);
 }
 
+/* ----------------------------------------------------------------------------
+ * AOF rewrite
+ * ------------------------------------------------------------------------- */
+
 /* Delegate writing an object to writing a bulk string or bulk long long.
  * This is not placed in rio.c since that adds the redis.h dependency. */
 int rioWriteBulkObject(rio *r, robj *obj) {
@@ -607,6 +734,12 @@ int rewriteSortedSetObject(rio *r, robj *key, robj *o) {
     return 1;
 }
 
+/* Write either the key or the value of the currently selected item of an hash.
+ * The 'hi' argument passes a valid Redis hash iterator.
+ * The 'what' filed specifies if to write a key or a value and can be
+ * either REDIS_HASH_KEY or REDIS_HASH_VALUE.
+ *
+ * The function returns 0 on error, non-zero on success. */
 static int rioWriteHashIteratorCursor(rio *r, hashTypeIterator *hi, int what) {
     if (hi->encoding == REDIS_ENCODING_ZIPLIST) {
         unsigned char *vstr = NULL;
@@ -793,9 +926,9 @@ int rewriteAppendOnlyFileBackground(void) {
         if (server.sofd > 0) close(server.sofd);
         snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
         if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
-            _exit(0);
+            exitFromChild(0);
         } else {
-            _exit(1);
+            exitFromChild(1);
         }
     } else {
         /* Parent */
@@ -809,6 +942,7 @@ int rewriteAppendOnlyFileBackground(void) {
         redisLog(REDIS_NOTICE,
             "Background append only file rewriting started by pid %d",childpid);
         server.aof_rewrite_scheduled = 0;
+        server.aof_rewrite_time_start = time(NULL);
         server.aof_child_pid = childpid;
         updateDictResizePolicy();
         /* We set appendseldb to -1 in order to force the next call to the
@@ -844,7 +978,7 @@ void aofRemoveTempFile(pid_t childpid) {
 /* Update the server.aof_current_size filed explicitly using stat(2)
  * to check the size of the file. This is useful after a rewrite or after
  * a restart, normally the size is updated just adding the write length
- * to the current lenght, that is much faster. */
+ * to the current length, that is much faster. */
 void aofUpdateCurrentSize(void) {
     struct redis_stat sb;
 
@@ -861,7 +995,6 @@ void aofUpdateCurrentSize(void) {
 void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
     if (!bysignal && exitcode == 0) {
         int newfd, oldfd;
-        int nwritten;
         char tmpfile[256];
         long long now = ustime();
 
@@ -879,21 +1012,15 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
             goto cleanup;
         }
 
-        nwritten = write(newfd,server.aof_rewrite_buf,sdslen(server.aof_rewrite_buf));
-        if (nwritten != (signed)sdslen(server.aof_rewrite_buf)) {
-            if (nwritten == -1) {
-                redisLog(REDIS_WARNING,
-                    "Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
-            } else {
-                redisLog(REDIS_WARNING,
-                    "Short write trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
-            }
+        if (aofRewriteBufferWrite(newfd) == -1) {
+            redisLog(REDIS_WARNING,
+                "Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
             close(newfd);
             goto cleanup;
         }
 
         redisLog(REDIS_NOTICE,
-            "Parent diff successfully flushed to the rewritten AOF (%lu bytes)", nwritten);
+            "Parent diff successfully flushed to the rewritten AOF (%lu bytes)", aofRewriteBufferSize());
 
         /* The only remaining thing to do is to rename the temporary file to
          * the configured file and switch the file descriptor used to do AOF
@@ -985,10 +1112,11 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
     }
 
 cleanup:
-    sdsfree(server.aof_rewrite_buf);
-    server.aof_rewrite_buf = sdsempty();
+    aofRewriteBufferReset();
     aofRemoveTempFile(server.aof_child_pid);
     server.aof_child_pid = -1;
+    server.aof_rewrite_time_last = time(NULL)-server.aof_rewrite_time_start;
+    server.aof_rewrite_time_start = -1;
     /* Schedule a new rewrite if we are waiting for it to switch the AOF ON. */
     if (server.aof_state == REDIS_AOF_WAIT_REWRITE)
         server.aof_rewrite_scheduled = 1;