X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/dbb27a0a90ca3800f5be1d8170e404b9e7b9bc44..f0d962c0eca870e3fd2a4529de321db217c2e9fe:/src/aof.c?ds=inline diff --git a/src/aof.c b/src/aof.c index 0bdcd9ed..607bdce8 100644 --- a/src/aof.c +++ b/src/aof.c @@ -12,6 +12,121 @@ void aofUpdateCurrentSize(void); +/* ---------------------------------------------------------------------------- + * AOF rewrite buffer implementation. + * + * The following code implements a simple buffer used in order to accumulate + * changes while the background process is rewriting the AOF file. + * + * We only need to append, but can't just use realloc with a large block + * because 'huge' reallocs are not always handled as one could expect + * (via remapping of pages at OS level) but may involve copying data. + * + * For this reason we use a list of blocks, every block is + * AOF_RW_BUF_BLOCK_SIZE bytes. + * ------------------------------------------------------------------------- */ + +#define AOF_RW_BUF_BLOCK_SIZE (1024*1024*10) /* 10 MB per block */ + +typedef struct aofrwblock { + unsigned long used, free; + char buf[AOF_RW_BUF_BLOCK_SIZE]; +} aofrwblock; + +/* This function frees the old AOF rewrite buffer if needed, and initializes + * a fresh new one. It tests for server.aof_rewrite_buf_blocks equal to NULL + * so can be used for the first initialization as well. */ +void aofRewriteBufferReset(void) { + if (server.aof_rewrite_buf_blocks) + listRelease(server.aof_rewrite_buf_blocks); + + server.aof_rewrite_buf_blocks = listCreate(); + listSetFreeMethod(server.aof_rewrite_buf_blocks,zfree); +} + +/* Return the current size of the AOF rewrite buffer. */ +unsigned long aofRewriteBufferSize(void) { + listNode *ln = listLast(server.aof_rewrite_buf_blocks); + aofrwblock *block = ln ? 
ln->value : NULL; + + if (block == NULL) return 0; + unsigned long size = + (listLength(server.aof_rewrite_buf_blocks)-1) * AOF_RW_BUF_BLOCK_SIZE; + size += block->used; + return size; +} + +/* Append data to the AOF rewrite buffer, allocating new blocks if needed. */ +void aofRewriteBufferAppend(unsigned char *s, unsigned long len) { + listNode *ln = listLast(server.aof_rewrite_buf_blocks); + aofrwblock *block = ln ? ln->value : NULL; + + while(len) { + /* If we already got at least an allocated block, try appending + * at least some piece into it. */ + if (block) { + unsigned long thislen = (block->free < len) ? block->free : len; + if (thislen) { /* The current block is not already full. */ + memcpy(block->buf+block->used, s, thislen); + block->used += thislen; + block->free -= thislen; + s += thislen; + len -= thislen; + } + } + + if (len) { /* First block to allocate, or need another block. */ + int numblocks; + + block = zmalloc(sizeof(*block)); + block->free = AOF_RW_BUF_BLOCK_SIZE; + block->used = 0; + listAddNodeTail(server.aof_rewrite_buf_blocks,block); + + /* Log every time we cross 10 or 100 more blocks, respectively + * as a notice or warning. */ + numblocks = listLength(server.aof_rewrite_buf_blocks); + if (((numblocks+1) % 10) == 0) { + int level = ((numblocks+1) % 100) == 0 ? REDIS_WARNING : + REDIS_NOTICE; + redisLog(level,"Background AOF buffer size: %lu MB", + aofRewriteBufferSize()/(1024*1024)); + } + } + } +} + +/* Write the buffer (possibly composed of multiple blocks) into the specified + * fd. If a short write or any other error happens -1 is returned, + * otherwise the number of bytes written is returned. 
*/ +ssize_t aofRewriteBufferWrite(int fd) { + listNode *ln; + listIter li; + ssize_t count = 0; + + listRewind(server.aof_rewrite_buf_blocks,&li); + while((ln = listNext(&li))) { + aofrwblock *block = listNodeValue(ln); + ssize_t nwritten; + + if (block->used) { + nwritten = write(fd,block->buf,block->used); + if (nwritten != block->used) { + if (nwritten == 0) errno = EIO; + return -1; + } + count += nwritten; + } + } + return count; +} + +/* ---------------------------------------------------------------------------- + * AOF file implementation + * ------------------------------------------------------------------------- */ + +/* Starts a background task that performs fsync() against the specified + * file descriptor (the one of the AOF file) in another thread. */ void aof_background_fsync(int fd) { bioCreateBackgroundJob(REDIS_BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL); } @@ -36,8 +151,7 @@ void stopAppendOnly(void) { if (kill(server.aof_child_pid,SIGKILL) != -1) wait3(&statloc,0,NULL); /* reset the buffer accumulating changes while the child saves */ - sdsfree(server.aof_rewrite_buf); - server.aof_rewrite_buf = sdsempty(); + aofRewriteBufferReset(); aofRemoveTempFile(server.aof_child_pid); server.aof_child_pid = -1; } @@ -46,7 +160,7 @@ void stopAppendOnly(void) { /* Called when the user switches from "appendonly no" to "appendonly yes" * at runtime using the CONFIG command. */ int startAppendOnly(void) { - server.aof_last_fsync = time(NULL); + server.aof_last_fsync = server.unixtime; server.aof_fd = open(server.aof_filename,O_WRONLY|O_APPEND|O_CREAT,0644); redisAssert(server.aof_state == REDIS_AOF_OFF); if (server.aof_fd == -1) { @@ -108,6 +222,7 @@ void flushAppendOnlyFile(int force) { } /* Otherwise fall trough, and go write since we can't wait * over two seconds. */ + server.aof_delayed_fsync++; redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). 
Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis."); } } @@ -274,11 +389,15 @@ void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int a * in a buffer, so that when the child process will do its work we * can append the differences to the new append only file. */ if (server.aof_child_pid != -1) - server.aof_rewrite_buf = sdscatlen(server.aof_rewrite_buf,buf,sdslen(buf)); + aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf)); sdsfree(buf); } +/* ---------------------------------------------------------------------------- + * AOF loading + * ------------------------------------------------------------------------- */ + /* In Redis commands are always executed in the context of a client, so in * order to load the append only file we need to create a fake client. */ struct redisClient *createFakeClient(void) { @@ -287,6 +406,7 @@ struct redisClient *createFakeClient(void) { selectDb(c,0); c->fd = -1; c->querybuf = sdsempty(); + c->querybuf_peak = 0; c->argc = 0; c->argv = NULL; c->bufpos = 0; @@ -422,6 +542,10 @@ fmterr: exit(1); } +/* ---------------------------------------------------------------------------- + * AOF rewrite + * ------------------------------------------------------------------------- */ + /* Delegate writing an object to writing a bulk string or bulk long long. * This is not placed in rio.c since that adds the redis.h dependency. */ int rioWriteBulkObject(rio *r, robj *obj) { @@ -609,53 +733,61 @@ int rewriteSortedSetObject(rio *r, robj *key, robj *o) { return 1; } +/* Write either the key or the value of the currently selected item of a hash. + * The 'hi' argument passes a valid Redis hash iterator. + * The 'what' argument specifies whether to write a key or a value and can be + * either REDIS_HASH_KEY or REDIS_HASH_VALUE. + * + * The function returns 0 on error, non-zero on success. 
*/ +static int rioWriteHashIteratorCursor(rio *r, hashTypeIterator *hi, int what) { + if (hi->encoding == REDIS_ENCODING_ZIPLIST) { + unsigned char *vstr = NULL; + unsigned int vlen = UINT_MAX; + long long vll = LLONG_MAX; + + hashTypeCurrentFromZiplist(hi, what, &vstr, &vlen, &vll); + if (vstr) { + return rioWriteBulkString(r, (char*)vstr, vlen); + } else { + return rioWriteBulkLongLong(r, vll); + } + + } else if (hi->encoding == REDIS_ENCODING_HT) { + robj *value; + + hashTypeCurrentFromHashTable(hi, what, &value); + return rioWriteBulkObject(r, value); + } + + redisPanic("Unknown hash encoding"); + return 0; +} + /* Emit the commands needed to rebuild a hash object. * The function returns 0 on error, 1 on success. */ int rewriteHashObject(rio *r, robj *key, robj *o) { + hashTypeIterator *hi; long long count = 0, items = hashTypeLength(o); - if (o->encoding == REDIS_ENCODING_ZIPMAP) { - unsigned char *p = zipmapRewind(o->ptr); - unsigned char *field, *val; - unsigned int flen, vlen; - - while((p = zipmapNext(p,&field,&flen,&val,&vlen)) != NULL) { - if (count == 0) { - int cmd_items = (items > REDIS_AOF_REWRITE_ITEMS_PER_CMD) ? - REDIS_AOF_REWRITE_ITEMS_PER_CMD : items; + hi = hashTypeInitIterator(o); + while (hashTypeNext(hi) != REDIS_ERR) { + if (count == 0) { + int cmd_items = (items > REDIS_AOF_REWRITE_ITEMS_PER_CMD) ? 
+ REDIS_AOF_REWRITE_ITEMS_PER_CMD : items; - if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0) return 0; - if (rioWriteBulkString(r,"HMSET",5) == 0) return 0; - if (rioWriteBulkObject(r,key) == 0) return 0; - } - if (rioWriteBulkString(r,(char*)field,flen) == 0) return 0; - if (rioWriteBulkString(r,(char*)val,vlen) == 0) return 0; - if (++count == REDIS_AOF_REWRITE_ITEMS_PER_CMD) count = 0; - items--; + if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0) return 0; + if (rioWriteBulkString(r,"HMSET",5) == 0) return 0; + if (rioWriteBulkObject(r,key) == 0) return 0; } - } else { - dictIterator *di = dictGetIterator(o->ptr); - dictEntry *de; - while((de = dictNext(di)) != NULL) { - robj *field = dictGetKey(de); - robj *val = dictGetVal(de); + if (rioWriteHashIteratorCursor(r, hi, REDIS_HASH_KEY) == 0) return 0; + if (rioWriteHashIteratorCursor(r, hi, REDIS_HASH_VALUE) == 0) return 0; + if (++count == REDIS_AOF_REWRITE_ITEMS_PER_CMD) count = 0; + items--; + } - if (count == 0) { - int cmd_items = (items > REDIS_AOF_REWRITE_ITEMS_PER_CMD) ? 
- REDIS_AOF_REWRITE_ITEMS_PER_CMD : items; + hashTypeReleaseIterator(hi); - if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0) return 0; - if (rioWriteBulkString(r,"HMSET",5) == 0) return 0; - if (rioWriteBulkObject(r,key) == 0) return 0; - } - if (rioWriteBulkObject(r,field) == 0) return 0; - if (rioWriteBulkObject(r,val) == 0) return 0; - if (++count == REDIS_AOF_REWRITE_ITEMS_PER_CMD) count = 0; - items--; - } - dictReleaseIterator(di); - } return 1; } @@ -793,9 +925,9 @@ int rewriteAppendOnlyFileBackground(void) { if (server.sofd > 0) close(server.sofd); snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid()); if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) { - _exit(0); + exitFromChild(0); } else { - _exit(1); + exitFromChild(1); } } else { /* Parent */ @@ -861,7 +993,6 @@ void aofUpdateCurrentSize(void) { void backgroundRewriteDoneHandler(int exitcode, int bysignal) { if (!bysignal && exitcode == 0) { int newfd, oldfd; - int nwritten; char tmpfile[256]; long long now = ustime(); @@ -879,21 +1010,15 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) { goto cleanup; } - nwritten = write(newfd,server.aof_rewrite_buf,sdslen(server.aof_rewrite_buf)); - if (nwritten != (signed)sdslen(server.aof_rewrite_buf)) { - if (nwritten == -1) { - redisLog(REDIS_WARNING, - "Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno)); - } else { - redisLog(REDIS_WARNING, - "Short write trying to flush the parent diff to the rewritten AOF: %s", strerror(errno)); - } + if (aofRewriteBufferWrite(newfd) == -1) { + redisLog(REDIS_WARNING, + "Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno)); close(newfd); goto cleanup; } redisLog(REDIS_NOTICE, - "Parent diff successfully flushed to the rewritten AOF (%lu bytes)", nwritten); + "Parent diff successfully flushed to the rewritten AOF (%lu bytes)", aofRewriteBufferSize()); /* The only remaining thing to do is to rename the temporary file to * the configured file 
and switch the file descriptor used to do AOF @@ -985,8 +1110,7 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) { } cleanup: - sdsfree(server.aof_rewrite_buf); - server.aof_rewrite_buf = sdsempty(); + aofRewriteBufferReset(); aofRemoveTempFile(server.aof_child_pid); server.aof_child_pid = -1; /* Schedule a new rewrite if we are waiting for it to switch the AOF ON. */