]> git.saurik.com Git - redis.git/blobdiff - src/aof.c
Test: more MIGRATE tests.
[redis.git] / src / aof.c
index b1f3c69371b996c9bfe220a9d5928eccf8c48775..bc92cdb98b9c3a5b897bd07f808cd68098635989 100644 (file)
--- a/src/aof.c
+++ b/src/aof.c
@@ -1,3 +1,32 @@
+/*
+ * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ *   * Redistributions of source code must retain the above copyright notice,
+ *     this list of conditions and the following disclaimer.
+ *   * Redistributions in binary form must reproduce the above copyright
+ *     notice, this list of conditions and the following disclaimer in the
+ *     documentation and/or other materials provided with the distribution.
+ *   * Neither the name of Redis nor the names of its contributors may be used
+ *     to endorse or promote products derived from this software without
+ *     specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
 #include "redis.h"
 #include "bio.h"
 #include "rio.h"
 
 void aofUpdateCurrentSize(void);
 
+/* ----------------------------------------------------------------------------
+ * AOF rewrite buffer implementation.
+ *
+ * The following code implement a simple buffer used in order to accumulate
+ * changes while the background process is rewriting the AOF file.
+ *
+ * We only need to append, but can't just use realloc with a large block
+ * because 'huge' reallocs are not always handled as one could expect
+ * (via remapping of pages at OS level) but may involve copying data.
+ *
+ * For this reason we use a list of blocks, every block is
+ * AOF_RW_BUF_BLOCK_SIZE bytes.
+ * ------------------------------------------------------------------------- */
+
+#define AOF_RW_BUF_BLOCK_SIZE (1024*1024*10)    /* 10 MB per block */
+
+typedef struct aofrwblock {
+    unsigned long used, free;
+    char buf[AOF_RW_BUF_BLOCK_SIZE];
+} aofrwblock;
+
+/* This function free the old AOF rewrite buffer if needed, and initialize
+ * a fresh new one. It tests for server.aof_rewrite_buf_blocks equal to NULL
+ * so can be used for the first initialization as well. */
+void aofRewriteBufferReset(void) {
+    if (server.aof_rewrite_buf_blocks)
+        listRelease(server.aof_rewrite_buf_blocks);
+
+    server.aof_rewrite_buf_blocks = listCreate();
+    listSetFreeMethod(server.aof_rewrite_buf_blocks,zfree);
+}
+
+/* Return the current size of the AOF rerwite buffer. */
+unsigned long aofRewriteBufferSize(void) {
+    listNode *ln = listLast(server.aof_rewrite_buf_blocks);
+    aofrwblock *block = ln ? ln->value : NULL;
+
+    if (block == NULL) return 0;
+    unsigned long size =
+        (listLength(server.aof_rewrite_buf_blocks)-1) * AOF_RW_BUF_BLOCK_SIZE;
+    size += block->used;
+    return size;
+}
+
+/* Append data to the AOF rewrite buffer, allocating new blocks if needed. */
+void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
+    listNode *ln = listLast(server.aof_rewrite_buf_blocks);
+    aofrwblock *block = ln ? ln->value : NULL;
+
+    while(len) {
+        /* If we already got at least an allocated block, try appending
+         * at least some piece into it. */
+        if (block) {
+            unsigned long thislen = (block->free < len) ? block->free : len;
+            if (thislen) {  /* The current block is not already full. */
+                memcpy(block->buf+block->used, s, thislen);
+                block->used += thislen;
+                block->free -= thislen;
+                s += thislen;
+                len -= thislen;
+            }
+        }
+
+        if (len) { /* First block to allocate, or need another block. */
+            int numblocks;
+
+            block = zmalloc(sizeof(*block));
+            block->free = AOF_RW_BUF_BLOCK_SIZE;
+            block->used = 0;
+            listAddNodeTail(server.aof_rewrite_buf_blocks,block);
+
+            /* Log every time we cross more 10 or 100 blocks, respectively
+             * as a notice or warning. */
+            numblocks = listLength(server.aof_rewrite_buf_blocks);
+            if (((numblocks+1) % 10) == 0) {
+                int level = ((numblocks+1) % 100) == 0 ? REDIS_WARNING :
+                                                         REDIS_NOTICE;
+                redisLog(level,"Background AOF buffer size: %lu MB",
+                    aofRewriteBufferSize()/(1024*1024));
+            }
+        }
+    }
+}
+
+/* Write the buffer (possibly composed of multiple blocks) into the specified
+ * fd. If no short write or any other error happens -1 is returned,
+ * otherwise the number of bytes written is returned. */
+ssize_t aofRewriteBufferWrite(int fd) {
+    listNode *ln;
+    listIter li;
+    ssize_t count = 0;
+
+    listRewind(server.aof_rewrite_buf_blocks,&li);
+    while((ln = listNext(&li))) {
+        aofrwblock *block = listNodeValue(ln);
+        ssize_t nwritten;
+
+        if (block->used) {
+            nwritten = write(fd,block->buf,block->used);
+            if (nwritten != block->used) {
+                if (nwritten == 0) errno = EIO;
+                return -1;
+            }
+            count += nwritten;
+        }
+    }
+    return count;
+}
+
+/* ----------------------------------------------------------------------------
+ * AOF file implementation
+ * ------------------------------------------------------------------------- */
+
+/* Starts a background task that performs fsync() against the specified
+ * file descriptor (the one of the AOF file) in another thread. */
 void aof_background_fsync(int fd) {
     bioCreateBackgroundJob(REDIS_BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL);
 }
@@ -31,20 +175,22 @@ void stopAppendOnly(void) {
     if (server.aof_child_pid != -1) {
         int statloc;
 
+        redisLog(REDIS_NOTICE,"Killing running AOF rewrite child: %ld",
+            (long) server.aof_child_pid);
         if (kill(server.aof_child_pid,SIGKILL) != -1)
             wait3(&statloc,0,NULL);
         /* reset the buffer accumulating changes while the child saves */
-        sdsfree(server.aof_rewrite_buf);
-        server.aof_rewrite_buf = sdsempty();
+        aofRewriteBufferReset();
         aofRemoveTempFile(server.aof_child_pid);
         server.aof_child_pid = -1;
+        server.aof_rewrite_time_start = -1;
     }
 }
 
 /* Called when the user switches from "appendonly no" to "appendonly yes"
  * at runtime using the CONFIG command. */
 int startAppendOnly(void) {
-    server.aof_last_fsync = time(NULL);
+    server.aof_last_fsync = server.unixtime;
     server.aof_fd = open(server.aof_filename,O_WRONLY|O_APPEND|O_CREAT,0644);
     redisAssert(server.aof_state == REDIS_AOF_OFF);
     if (server.aof_fd == -1) {
@@ -106,6 +252,7 @@ void flushAppendOnlyFile(int force) {
             }
             /* Otherwise fall trough, and go write since we can't wait
              * over two seconds. */
+            server.aof_delayed_fsync++;
             redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
         }
     }
@@ -126,7 +273,19 @@ void flushAppendOnlyFile(int force) {
         if (nwritten == -1) {
             redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
         } else {
-            redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
+            redisLog(REDIS_WARNING,"Exiting on short write while writing to "
+                                   "the append-only file: %s (nwritten=%ld, "
+                                   "expected=%ld)",
+                                   strerror(errno),
+                                   (long)nwritten,
+                                   (long)sdslen(server.aof_buf));
+
+            if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
+                redisLog(REDIS_WARNING, "Could not remove short write "
+                         "from the append-only file.  Redis may refuse "
+                         "to load the AOF the next time it starts.  "
+                         "ftruncate: %s", strerror(errno));
+            }
         }
         exit(1);
     }
@@ -267,11 +426,15 @@ void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int a
      * in a buffer, so that when the child process will do its work we
      * can append the differences to the new append only file. */
     if (server.aof_child_pid != -1)
-        server.aof_rewrite_buf = sdscatlen(server.aof_rewrite_buf,buf,sdslen(buf));
+        aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));
 
     sdsfree(buf);
 }
 
+/* ----------------------------------------------------------------------------
+ * AOF loading
+ * ------------------------------------------------------------------------- */
+
 /* In Redis commands are always executed in the context of a client, so in
  * order to load the append only file we need to create a fake client. */
 struct redisClient *createFakeClient(void) {
@@ -280,6 +443,7 @@ struct redisClient *createFakeClient(void) {
     selectDb(c,0);
     c->fd = -1;
     c->querybuf = sdsempty();
+    c->querybuf_peak = 0;
     c->argc = 0;
     c->argv = NULL;
     c->bufpos = 0;
@@ -288,6 +452,8 @@ struct redisClient *createFakeClient(void) {
      * so that Redis will not try to send replies to this client. */
     c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
     c->reply = listCreate();
+    c->reply_bytes = 0;
+    c->obuf_soft_limit_reached_time = 0;
     c->watched_keys = listCreate();
     listSetFreeMethod(c->reply,decrRefCount);
     listSetDupMethod(c->reply,dupClientReplyValue);
@@ -413,6 +579,10 @@ fmterr:
     exit(1);
 }
 
+/* ----------------------------------------------------------------------------
+ * AOF rewrite
+ * ------------------------------------------------------------------------- */
+
 /* Delegate writing an object to writing a bulk string or bulk long long.
  * This is not placed in rio.c since that adds the redis.h dependency. */
 int rioWriteBulkObject(rio *r, robj *obj) {
@@ -600,53 +770,61 @@ int rewriteSortedSetObject(rio *r, robj *key, robj *o) {
     return 1;
 }
 
+/* Write either the key or the value of the currently selected item of an hash.
+ * The 'hi' argument passes a valid Redis hash iterator.
+ * The 'what' filed specifies if to write a key or a value and can be
+ * either REDIS_HASH_KEY or REDIS_HASH_VALUE.
+ *
+ * The function returns 0 on error, non-zero on success. */
+static int rioWriteHashIteratorCursor(rio *r, hashTypeIterator *hi, int what) {
+    if (hi->encoding == REDIS_ENCODING_ZIPLIST) {
+        unsigned char *vstr = NULL;
+        unsigned int vlen = UINT_MAX;
+        long long vll = LLONG_MAX;
+
+        hashTypeCurrentFromZiplist(hi, what, &vstr, &vlen, &vll);
+        if (vstr) {
+            return rioWriteBulkString(r, (char*)vstr, vlen);
+        } else {
+            return rioWriteBulkLongLong(r, vll);
+        }
+
+    } else if (hi->encoding == REDIS_ENCODING_HT) {
+        robj *value;
+
+        hashTypeCurrentFromHashTable(hi, what, &value);
+        return rioWriteBulkObject(r, value);
+    }
+
+    redisPanic("Unknown hash encoding");
+    return 0;
+}
+
 /* Emit the commands needed to rebuild a hash object.
  * The function returns 0 on error, 1 on success. */
 int rewriteHashObject(rio *r, robj *key, robj *o) {
+    hashTypeIterator *hi;
     long long count = 0, items = hashTypeLength(o);
 
-    if (o->encoding == REDIS_ENCODING_ZIPMAP) {
-        unsigned char *p = zipmapRewind(o->ptr);
-        unsigned char *field, *val;
-        unsigned int flen, vlen;
+    hi = hashTypeInitIterator(o);
+    while (hashTypeNext(hi) != REDIS_ERR) {
+        if (count == 0) {
+            int cmd_items = (items > REDIS_AOF_REWRITE_ITEMS_PER_CMD) ?
+                REDIS_AOF_REWRITE_ITEMS_PER_CMD : items;
 
-        while((p = zipmapNext(p,&field,&flen,&val,&vlen)) != NULL) {
-            if (count == 0) {
-                int cmd_items = (items > REDIS_AOF_REWRITE_ITEMS_PER_CMD) ?
-                    REDIS_AOF_REWRITE_ITEMS_PER_CMD : items;
-
-                if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0) return 0;
-                if (rioWriteBulkString(r,"HMSET",5) == 0) return 0;
-                if (rioWriteBulkObject(r,key) == 0) return 0;
-            }
-            if (rioWriteBulkString(r,(char*)field,flen) == 0) return 0;
-            if (rioWriteBulkString(r,(char*)val,vlen) == 0) return 0;
-            if (++count == REDIS_AOF_REWRITE_ITEMS_PER_CMD) count = 0;
-            items--;
+            if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0) return 0;
+            if (rioWriteBulkString(r,"HMSET",5) == 0) return 0;
+            if (rioWriteBulkObject(r,key) == 0) return 0;
         }
-    } else {
-        dictIterator *di = dictGetIterator(o->ptr);
-        dictEntry *de;
 
-        while((de = dictNext(di)) != NULL) {
-            robj *field = dictGetKey(de);
-            robj *val = dictGetVal(de);
+        if (rioWriteHashIteratorCursor(r, hi, REDIS_HASH_KEY) == 0) return 0;
+        if (rioWriteHashIteratorCursor(r, hi, REDIS_HASH_VALUE) == 0) return 0;
+        if (++count == REDIS_AOF_REWRITE_ITEMS_PER_CMD) count = 0;
+        items--;
+    }
 
-            if (count == 0) {
-                int cmd_items = (items > REDIS_AOF_REWRITE_ITEMS_PER_CMD) ?
-                    REDIS_AOF_REWRITE_ITEMS_PER_CMD : items;
+    hashTypeReleaseIterator(hi);
 
-                if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0) return 0;
-                if (rioWriteBulkString(r,"HMSET",5) == 0) return 0;
-                if (rioWriteBulkObject(r,key) == 0) return 0;
-            }
-            if (rioWriteBulkObject(r,field) == 0) return 0;
-            if (rioWriteBulkObject(r,val) == 0) return 0;
-            if (++count == REDIS_AOF_REWRITE_ITEMS_PER_CMD) count = 0;
-            items--;
-        }
-        dictReleaseIterator(di);
-    }
     return 1;
 }
 
@@ -671,7 +849,7 @@ int rewriteAppendOnlyFile(char *filename) {
     snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
     fp = fopen(tmpfile,"w");
     if (!fp) {
-        redisLog(REDIS_WARNING, "Failed rewriting the append only file: %s", strerror(errno));
+        redisLog(REDIS_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
         return REDIS_ERR;
     }
 
@@ -784,9 +962,9 @@ int rewriteAppendOnlyFileBackground(void) {
         if (server.sofd > 0) close(server.sofd);
         snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
         if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
-            _exit(0);
+            exitFromChild(0);
         } else {
-            _exit(1);
+            exitFromChild(1);
         }
     } else {
         /* Parent */
@@ -800,6 +978,7 @@ int rewriteAppendOnlyFileBackground(void) {
         redisLog(REDIS_NOTICE,
             "Background append only file rewriting started by pid %d",childpid);
         server.aof_rewrite_scheduled = 0;
+        server.aof_rewrite_time_start = time(NULL);
         server.aof_child_pid = childpid;
         updateDictResizePolicy();
         /* We set appendseldb to -1 in order to force the next call to the
@@ -835,12 +1014,12 @@ void aofRemoveTempFile(pid_t childpid) {
 /* Update the server.aof_current_size filed explicitly using stat(2)
  * to check the size of the file. This is useful after a rewrite or after
  * a restart, normally the size is updated just adding the write length
- * to the current lenght, that is much faster. */
+ * to the current length, that is much faster. */
 void aofUpdateCurrentSize(void) {
     struct redis_stat sb;
 
     if (redis_fstat(server.aof_fd,&sb) == -1) {
-        redisLog(REDIS_WARNING,"Unable to check the AOF length: %s",
+        redisLog(REDIS_WARNING,"Unable to obtain the AOF file length. stat: %s",
             strerror(errno));
     } else {
         server.aof_current_size = sb.st_size;
@@ -852,7 +1031,6 @@ void aofUpdateCurrentSize(void) {
 void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
     if (!bysignal && exitcode == 0) {
         int newfd, oldfd;
-        int nwritten;
         char tmpfile[256];
         long long now = ustime();
 
@@ -870,21 +1048,15 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
             goto cleanup;
         }
 
-        nwritten = write(newfd,server.aof_rewrite_buf,sdslen(server.aof_rewrite_buf));
-        if (nwritten != (signed)sdslen(server.aof_rewrite_buf)) {
-            if (nwritten == -1) {
-                redisLog(REDIS_WARNING,
-                    "Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
-            } else {
-                redisLog(REDIS_WARNING,
-                    "Short write trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
-            }
+        if (aofRewriteBufferWrite(newfd) == -1) {
+            redisLog(REDIS_WARNING,
+                "Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
             close(newfd);
             goto cleanup;
         }
 
         redisLog(REDIS_NOTICE,
-            "Parent diff successfully flushed to the rewritten AOF (%lu bytes)", nwritten);
+            "Parent diff successfully flushed to the rewritten AOF (%lu bytes)", aofRewriteBufferSize());
 
         /* The only remaining thing to do is to rename the temporary file to
          * the configured file and switch the file descriptor used to do AOF
@@ -929,7 +1101,7 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
          * it exists, because we reference it with "oldfd". */
         if (rename(tmpfile,server.aof_filename) == -1) {
             redisLog(REDIS_WARNING,
-                "Error trying to rename the temporary AOF: %s", strerror(errno));
+                "Error trying to rename the temporary AOF file: %s", strerror(errno));
             close(newfd);
             if (oldfd != -1) close(oldfd);
             goto cleanup;
@@ -957,7 +1129,9 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
             server.aof_buf = sdsempty();
         }
 
-        redisLog(REDIS_NOTICE, "Background AOF rewrite successful");
+        server.aof_lastbgrewrite_status = REDIS_OK;
+
+        redisLog(REDIS_NOTICE, "Background AOF rewrite finished successfully");
         /* Change state from WAIT_REWRITE to ON if needed */
         if (server.aof_state == REDIS_AOF_WAIT_REWRITE)
             server.aof_state = REDIS_AOF_ON;
@@ -968,18 +1142,23 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
         redisLog(REDIS_VERBOSE,
             "Background AOF rewrite signal handler took %lldus", ustime()-now);
     } else if (!bysignal && exitcode != 0) {
+        server.aof_lastbgrewrite_status = REDIS_ERR;
+
         redisLog(REDIS_WARNING,
             "Background AOF rewrite terminated with error");
     } else {
+        server.aof_lastbgrewrite_status = REDIS_ERR;
+
         redisLog(REDIS_WARNING,
             "Background AOF rewrite terminated by signal %d", bysignal);
     }
 
 cleanup:
-    sdsfree(server.aof_rewrite_buf);
-    server.aof_rewrite_buf = sdsempty();
+    aofRewriteBufferReset();
     aofRemoveTempFile(server.aof_child_pid);
     server.aof_child_pid = -1;
+    server.aof_rewrite_time_last = time(NULL)-server.aof_rewrite_time_start;
+    server.aof_rewrite_time_start = -1;
     /* Schedule a new rewrite if we are waiting for it to switch the AOF ON. */
     if (server.aof_state == REDIS_AOF_WAIT_REWRITE)
         server.aof_rewrite_scheduled = 1;