X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/794e6ce14ff85f00b0d3e86365e9cc45a90d3bcf..087f41409052da35270417c85010c6739fd28200:/src/aof.c diff --git a/src/aof.c b/src/aof.c index 4a463bde..15a456f9 100644 --- a/src/aof.c +++ b/src/aof.c @@ -26,6 +26,7 @@ void stopAppendOnly(void) { server.appendfd = -1; server.appendseldb = -1; server.appendonly = 0; + server.aof_wait_rewrite = 0; /* rewrite operation in progress? kill it, wait child exit */ if (server.bgrewritechildpid != -1) { int statloc; @@ -35,6 +36,7 @@ void stopAppendOnly(void) { /* reset the buffer accumulating changes while the child saves */ sdsfree(server.bgrewritebuf); server.bgrewritebuf = sdsempty(); + aofRemoveTempFile(server.bgrewritechildpid); server.bgrewritechildpid = -1; } } @@ -46,15 +48,18 @@ int startAppendOnly(void) { server.lastfsync = time(NULL); server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644); if (server.appendfd == -1) { - redisLog(REDIS_WARNING,"Used tried to switch on AOF via CONFIG, but I can't open the AOF file: %s",strerror(errno)); + redisLog(REDIS_WARNING,"Redis needs to enable the AOF but can't open the append only file: %s",strerror(errno)); return REDIS_ERR; } if (rewriteAppendOnlyFileBackground() == REDIS_ERR) { server.appendonly = 0; close(server.appendfd); - redisLog(REDIS_WARNING,"User tried turning on AOF with CONFIG SET but I can't trigger a background AOF rewrite operation. Check the above logs for more info about the error."); + redisLog(REDIS_WARNING,"Redis needs to enable the AOF but can't trigger a background AOF rewrite operation. Check the above logs for more info about the error."); return REDIS_ERR; } + /* We correctly switched on AOF, now wait for the rewrite to be complete + * in order to append data on disk. */ + server.aof_wait_rewrite = 1; return REDIS_OK; } @@ -254,8 +259,15 @@ void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int a /* Append to the AOF buffer.
This will be flushed on disk just before * of re-entering the event loop, so before the client will get a - * positive reply about the operation performed. */ - server.aofbuf = sdscatlen(server.aofbuf,buf,sdslen(buf)); + * positive reply about the operation performed. + * + * Note, we don't add stuff in the AOF buffer if aof_wait_rewrite is + * non zero, as this means we are starting with a new AOF and the + * current one is meaningless (this happens for instance after + * a slave resyncs with its master). */ + if (!server.aof_wait_rewrite) { + server.aofbuf = sdscatlen(server.aofbuf,buf,sdslen(buf)); + } /* If a background append only file rewriting is in progress we want to * accumulate the differences between the child DB and the current one @@ -422,8 +434,236 @@ int rioWriteBulkObject(rio *r, robj *obj) { } } +/* Emit the commands needed to rebuild a list object. + * The function returns 0 on error, 1 on success. */ +int rewriteListObject(rio *r, robj *key, robj *o) { + long long count = 0, items = listTypeLength(o); + + if (o->encoding == REDIS_ENCODING_ZIPLIST) { + unsigned char *zl = o->ptr; + unsigned char *p = ziplistIndex(zl,0); + unsigned char *vstr; + unsigned int vlen; + long long vlong; + + while(ziplistGet(p,&vstr,&vlen,&vlong)) { + if (count == 0) { + int cmd_items = (items > REDIS_AOFREWRITE_ITEMS_PER_CMD) ?
+ REDIS_AOFREWRITE_ITEMS_PER_CMD : items; + + if (rioWriteBulkCount(r,'*',2+cmd_items) == 0) return 0; + if (rioWriteBulkString(r,"RPUSH",5) == 0) return 0; + if (rioWriteBulkObject(r,key) == 0) return 0; + } + if (vstr) { + if (rioWriteBulkString(r,(char*)vstr,vlen) == 0) return 0; + } else { + if (rioWriteBulkLongLong(r,vlong) == 0) return 0; + } + p = ziplistNext(zl,p); + if (++count == REDIS_AOFREWRITE_ITEMS_PER_CMD) count = 0; + items--; + } + } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) { + list *list = o->ptr; + listNode *ln; + listIter li; + + listRewind(list,&li); + while((ln = listNext(&li))) { + robj *eleobj = listNodeValue(ln); + + if (count == 0) { + int cmd_items = (items > REDIS_AOFREWRITE_ITEMS_PER_CMD) ? + REDIS_AOFREWRITE_ITEMS_PER_CMD : items; + + if (rioWriteBulkCount(r,'*',2+cmd_items) == 0) return 0; + if (rioWriteBulkString(r,"RPUSH",5) == 0) return 0; + if (rioWriteBulkObject(r,key) == 0) return 0; + } + if (rioWriteBulkObject(r,eleobj) == 0) return 0; + if (++count == REDIS_AOFREWRITE_ITEMS_PER_CMD) count = 0; + items--; + } + } else { + redisPanic("Unknown list encoding"); + } + return 1; +} + +/* Emit the commands needed to rebuild a set object. + * The function returns 0 on error, 1 on success. */ +int rewriteSetObject(rio *r, robj *key, robj *o) { + long long count = 0, items = setTypeSize(o); + + if (o->encoding == REDIS_ENCODING_INTSET) { + int ii = 0; + int64_t llval; + + while(intsetGet(o->ptr,ii++,&llval)) { + if (count == 0) { + int cmd_items = (items > REDIS_AOFREWRITE_ITEMS_PER_CMD) ?
+ REDIS_AOFREWRITE_ITEMS_PER_CMD : items; + + if (rioWriteBulkCount(r,'*',2+cmd_items) == 0) return 0; + if (rioWriteBulkString(r,"SADD",4) == 0) return 0; + if (rioWriteBulkObject(r,key) == 0) return 0; + } + if (rioWriteBulkLongLong(r,llval) == 0) return 0; + if (++count == REDIS_AOFREWRITE_ITEMS_PER_CMD) count = 0; + items--; + } + } else if (o->encoding == REDIS_ENCODING_HT) { + dictIterator *di = dictGetIterator(o->ptr); + dictEntry *de; + + while((de = dictNext(di)) != NULL) { + robj *eleobj = dictGetKey(de); + if (count == 0) { + int cmd_items = (items > REDIS_AOFREWRITE_ITEMS_PER_CMD) ? + REDIS_AOFREWRITE_ITEMS_PER_CMD : items; + + if (rioWriteBulkCount(r,'*',2+cmd_items) == 0) return 0; + if (rioWriteBulkString(r,"SADD",4) == 0) return 0; + if (rioWriteBulkObject(r,key) == 0) return 0; + } + if (rioWriteBulkObject(r,eleobj) == 0) return 0; + if (++count == REDIS_AOFREWRITE_ITEMS_PER_CMD) count = 0; + items--; + } + dictReleaseIterator(di); + } else { + redisPanic("Unknown set encoding"); + } + return 1; +} + +/* Emit the commands needed to rebuild a sorted set object. + * The function returns 0 on error, 1 on success. */ +int rewriteSortedSetObject(rio *r, robj *key, robj *o) { + long long count = 0, items = zsetLength(o); + + if (o->encoding == REDIS_ENCODING_ZIPLIST) { + unsigned char *zl = o->ptr; + unsigned char *eptr, *sptr; + unsigned char *vstr; + unsigned int vlen; + long long vll; + double score; + + eptr = ziplistIndex(zl,0); + redisAssert(eptr != NULL); + sptr = ziplistNext(zl,eptr); + redisAssert(sptr != NULL); + + while (eptr != NULL) { + redisAssert(ziplistGet(eptr,&vstr,&vlen,&vll)); + score = zzlGetScore(sptr); + + if (count == 0) { + int cmd_items = (items > REDIS_AOFREWRITE_ITEMS_PER_CMD) ?
+ REDIS_AOFREWRITE_ITEMS_PER_CMD : items; + + if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0) return 0; + if (rioWriteBulkString(r,"ZADD",4) == 0) return 0; + if (rioWriteBulkObject(r,key) == 0) return 0; + } + if (rioWriteBulkDouble(r,score) == 0) return 0; + if (vstr != NULL) { + if (rioWriteBulkString(r,(char*)vstr,vlen) == 0) return 0; + } else { + if (rioWriteBulkLongLong(r,vll) == 0) return 0; + } + zzlNext(zl,&eptr,&sptr); + if (++count == REDIS_AOFREWRITE_ITEMS_PER_CMD) count = 0; + items--; + } + } else if (o->encoding == REDIS_ENCODING_SKIPLIST) { + zset *zs = o->ptr; + dictIterator *di = dictGetIterator(zs->dict); + dictEntry *de; + + while((de = dictNext(di)) != NULL) { + robj *eleobj = dictGetKey(de); + double *score = dictGetVal(de); + + if (count == 0) { + int cmd_items = (items > REDIS_AOFREWRITE_ITEMS_PER_CMD) ? + REDIS_AOFREWRITE_ITEMS_PER_CMD : items; + + if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0) return 0; + if (rioWriteBulkString(r,"ZADD",4) == 0) return 0; + if (rioWriteBulkObject(r,key) == 0) return 0; + } + if (rioWriteBulkDouble(r,*score) == 0) return 0; + if (rioWriteBulkObject(r,eleobj) == 0) return 0; + if (++count == REDIS_AOFREWRITE_ITEMS_PER_CMD) count = 0; + items--; + } + dictReleaseIterator(di); + } else { + redisPanic("Unknown sorted set encoding"); + } + return 1; +} + +/* Emit the commands needed to rebuild a hash object. + * The function returns 0 on error, 1 on success. */ +int rewriteHashObject(rio *r, robj *key, robj *o) { + long long count = 0, items = hashTypeLength(o); + + if (o->encoding == REDIS_ENCODING_ZIPMAP) { + unsigned char *p = zipmapRewind(o->ptr); + unsigned char *field, *val; + unsigned int flen, vlen; + + while((p = zipmapNext(p,&field,&flen,&val,&vlen)) != NULL) { + if (count == 0) { + int cmd_items = (items > REDIS_AOFREWRITE_ITEMS_PER_CMD) ?
+ REDIS_AOFREWRITE_ITEMS_PER_CMD : items; + + if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0) return 0; + if (rioWriteBulkString(r,"HMSET",5) == 0) return 0; + if (rioWriteBulkObject(r,key) == 0) return 0; + } + if (rioWriteBulkString(r,(char*)field,flen) == 0) return 0; + if (rioWriteBulkString(r,(char*)val,vlen) == 0) return 0; + if (++count == REDIS_AOFREWRITE_ITEMS_PER_CMD) count = 0; + items--; + } + } else { + dictIterator *di = dictGetIterator(o->ptr); + dictEntry *de; + + while((de = dictNext(di)) != NULL) { + robj *field = dictGetKey(de); + robj *val = dictGetVal(de); + + if (count == 0) { + int cmd_items = (items > REDIS_AOFREWRITE_ITEMS_PER_CMD) ? + REDIS_AOFREWRITE_ITEMS_PER_CMD : items; + + if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0) return 0; + if (rioWriteBulkString(r,"HMSET",5) == 0) return 0; + if (rioWriteBulkObject(r,key) == 0) return 0; + } + if (rioWriteBulkObject(r,field) == 0) return 0; + if (rioWriteBulkObject(r,val) == 0) return 0; + if (++count == REDIS_AOFREWRITE_ITEMS_PER_CMD) count = 0; + items--; + } + dictReleaseIterator(di); + } + return 1; +} + /* Write a sequence of commands able to fully rebuild the dataset into - * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */ + * "filename". Used both by REWRITEAOF and BGREWRITEAOF. + * + * In order to minimize the number of commands needed in the rewritten + * log Redis uses variadic commands when possible, such as RPUSH, SADD, + * ZADD and HMSET. However at most REDIS_AOFREWRITE_ITEMS_PER_CMD items at a time + * are inserted using a single command.
*/ int rewriteAppendOnlyFile(char *filename) { dictIterator *di = NULL; dictEntry *de; @@ -479,151 +719,13 @@ int rewriteAppendOnlyFile(char *filename) { if (rioWriteBulkObject(&aof,&key) == 0) goto werr; if (rioWriteBulkObject(&aof,o) == 0) goto werr; } else if (o->type == REDIS_LIST) { - /* Emit the RPUSHes needed to rebuild the list */ - char cmd[]="*3\r\n$5\r\nRPUSH\r\n"; - if (o->encoding == REDIS_ENCODING_ZIPLIST) { - unsigned char *zl = o->ptr; - unsigned char *p = ziplistIndex(zl,0); - unsigned char *vstr; - unsigned int vlen; - long long vlong; - - while(ziplistGet(p,&vstr,&vlen,&vlong)) { - if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr; - if (rioWriteBulkObject(&aof,&key) == 0) goto werr; - if (vstr) { - if (rioWriteBulkString(&aof,(char*)vstr,vlen) == 0) - goto werr; - } else { - if (rioWriteBulkLongLong(&aof,vlong) == 0) - goto werr; - } - p = ziplistNext(zl,p); - } - } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) { - list *list = o->ptr; - listNode *ln; - listIter li; - - listRewind(list,&li); - while((ln = listNext(&li))) { - robj *eleobj = listNodeValue(ln); - - if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr; - if (rioWriteBulkObject(&aof,&key) == 0) goto werr; - if (rioWriteBulkObject(&aof,eleobj) == 0) goto werr; - } - } else { - redisPanic("Unknown list encoding"); - } + if (rewriteListObject(&aof,&key,o) == 0) goto werr; } else if (o->type == REDIS_SET) { - char cmd[]="*3\r\n$4\r\nSADD\r\n"; - - /* Emit the SADDs needed to rebuild the set */ - if (o->encoding == REDIS_ENCODING_INTSET) { - int ii = 0; - int64_t llval; - while(intsetGet(o->ptr,ii++,&llval)) { - if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr; - if (rioWriteBulkObject(&aof,&key) == 0) goto werr; - if (rioWriteBulkLongLong(&aof,llval) == 0) goto werr; - } - } else if (o->encoding == REDIS_ENCODING_HT) { - dictIterator *di = dictGetIterator(o->ptr); - dictEntry *de; - while((de = dictNext(di)) != NULL) { - robj *eleobj = dictGetKey(de); - if
(rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr; - if (rioWriteBulkObject(&aof,&key) == 0) goto werr; - if (rioWriteBulkObject(&aof,eleobj) == 0) goto werr; - } - dictReleaseIterator(di); - } else { - redisPanic("Unknown set encoding"); - } + if (rewriteSetObject(&aof,&key,o) == 0) goto werr; } else if (o->type == REDIS_ZSET) { - /* Emit the ZADDs needed to rebuild the sorted set */ - char cmd[]="*4\r\n$4\r\nZADD\r\n"; - - if (o->encoding == REDIS_ENCODING_ZIPLIST) { - unsigned char *zl = o->ptr; - unsigned char *eptr, *sptr; - unsigned char *vstr; - unsigned int vlen; - long long vll; - double score; - - eptr = ziplistIndex(zl,0); - redisAssert(eptr != NULL); - sptr = ziplistNext(zl,eptr); - redisAssert(sptr != NULL); - - while (eptr != NULL) { - redisAssert(ziplistGet(eptr,&vstr,&vlen,&vll)); - score = zzlGetScore(sptr); - - if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr; - if (rioWriteBulkObject(&aof,&key) == 0) goto werr; - if (rioWriteBulkDouble(&aof,score) == 0) goto werr; - if (vstr != NULL) { - if (rioWriteBulkString(&aof,(char*)vstr,vlen) == 0) - goto werr; - } else { - if (rioWriteBulkLongLong(&aof,vll) == 0) - goto werr; - } - zzlNext(zl,&eptr,&sptr); - } - } else if (o->encoding == REDIS_ENCODING_SKIPLIST) { - zset *zs = o->ptr; - dictIterator *di = dictGetIterator(zs->dict); - dictEntry *de; - - while((de = dictNext(di)) != NULL) { - robj *eleobj = dictGetKey(de); - double *score = dictGetVal(de); - - if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr; - if (rioWriteBulkObject(&aof,&key) == 0) goto werr; - if (rioWriteBulkDouble(&aof,*score) == 0) goto werr; - if (rioWriteBulkObject(&aof,eleobj) == 0) goto werr; - } - dictReleaseIterator(di); - } else { - redisPanic("Unknown sorted set encoding"); - } + if (rewriteSortedSetObject(&aof,&key,o) == 0) goto werr; } else if (o->type == REDIS_HASH) { - char cmd[]="*4\r\n$4\r\nHSET\r\n"; - - /* Emit the HSETs needed to rebuild the hash */ - if (o->encoding == REDIS_ENCODING_ZIPMAP) { - unsigned char
*p = zipmapRewind(o->ptr); - unsigned char *field, *val; - unsigned int flen, vlen; - - while((p = zipmapNext(p,&field,&flen,&val,&vlen)) != NULL) { - if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr; - if (rioWriteBulkObject(&aof,&key) == 0) goto werr; - if (rioWriteBulkString(&aof,(char*)field,flen) == 0) - goto werr; - if (rioWriteBulkString(&aof,(char*)val,vlen) == 0) - goto werr; - } - } else { - dictIterator *di = dictGetIterator(o->ptr); - dictEntry *de; - - while((de = dictNext(di)) != NULL) { - robj *field = dictGetKey(de); - robj *val = dictGetVal(de); - - if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr; - if (rioWriteBulkObject(&aof,&key) == 0) goto werr; - if (rioWriteBulkObject(&aof,field) == 0) goto werr; - if (rioWriteBulkObject(&aof,val) == 0) goto werr; - } - dictReleaseIterator(di); - } + if (rewriteHashObject(&aof,&key,o) == 0) goto werr; } else { redisPanic("Unknown object type"); } @@ -863,6 +965,7 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) { } redisLog(REDIS_NOTICE, "Background AOF rewrite successful"); + server.aof_wait_rewrite = 0; /* Asynchronously close the overwritten AOF. */ if (oldfd != -1) bioCreateBackgroundJob(REDIS_BIO_CLOSE_FILE,(void*)(long)oldfd,NULL,NULL); @@ -882,4 +985,10 @@ cleanup: server.bgrewritebuf = sdsempty(); aofRemoveTempFile(server.bgrewritechildpid); server.bgrewritechildpid = -1; + /* If we were waiting for an AOF rewrite before starting to append + * to the AOF again (this happens both when the user switches on + * AOF with CONFIG SET, and after a slave with AOF enabled syncs with + * the master), but the rewrite failed (otherwise aof_wait_rewrite + * would be zero), we need to schedule a new one. */ + if (server.aof_wait_rewrite) server.aofrewrite_scheduled = 1; }