]> git.saurik.com Git - redis.git/blame - src/aof.c
postpone the AOF fsync if policy is everysec and there is a background fsync already...
[redis.git] / src / aof.c
CommitLineData
e2641e09 1#include "redis.h"
986630af 2#include "bio.h"
e2641e09 3
4#include <signal.h>
5#include <fcntl.h>
6#include <sys/stat.h>
3688d7f3 7#include <sys/types.h>
8#include <sys/time.h>
9#include <sys/resource.h>
10#include <sys/wait.h>
e2641e09 11
b333e239 12void aofUpdateCurrentSize(void);
13
4b77700a 14void aof_background_fsync(int fd) {
9a35eb22 15 bioCreateBackgroundJob(REDIS_BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL);
4b77700a 16}
17
e2641e09 18/* Called when the user switches from "appendonly yes" to "appendonly no"
19 * at runtime using the CONFIG command. */
20void stopAppendOnly(void) {
db3c2a4f 21 flushAppendOnlyFile(1);
e2641e09 22 aof_fsync(server.appendfd);
23 close(server.appendfd);
24
25 server.appendfd = -1;
26 server.appendseldb = -1;
27 server.appendonly = 0;
28 /* rewrite operation in progress? kill it, wait child exit */
b333e239 29 if (server.bgrewritechildpid != -1) {
e2641e09 30 int statloc;
31
b333e239 32 if (kill(server.bgrewritechildpid,SIGKILL) != -1)
e2641e09 33 wait3(&statloc,0,NULL);
34 /* reset the buffer accumulating changes while the child saves */
35 sdsfree(server.bgrewritebuf);
36 server.bgrewritebuf = sdsempty();
b333e239 37 server.bgrewritechildpid = -1;
e2641e09 38 }
39}
40
41/* Called when the user switches from "appendonly no" to "appendonly yes"
42 * at runtime using the CONFIG command. */
43int startAppendOnly(void) {
44 server.appendonly = 1;
45 server.lastfsync = time(NULL);
46 server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
47 if (server.appendfd == -1) {
48 redisLog(REDIS_WARNING,"Used tried to switch on AOF via CONFIG, but I can't open the AOF file: %s",strerror(errno));
49 return REDIS_ERR;
50 }
51 if (rewriteAppendOnlyFileBackground() == REDIS_ERR) {
52 server.appendonly = 0;
53 close(server.appendfd);
54 redisLog(REDIS_WARNING,"Used tried to switch on AOF via CONFIG, I can't trigger a background AOF rewrite operation. Check the above logs for more info about the error.",strerror(errno));
55 return REDIS_ERR;
56 }
57 return REDIS_OK;
58}
59
60/* Write the append only file buffer on disk.
61 *
62 * Since we are required to write the AOF before replying to the client,
63 * and the only way the client socket can get a write is entering when the
64 * the event loop, we accumulate all the AOF writes in a memory
65 * buffer and write it on disk using this function just before entering
db3c2a4f 66 * the event loop again.
67 *
68 * About the 'force' argument:
69 *
70 * When the fsync policy is set to 'everysec' we may delay the flush if there
71 * is still an fsync() going on in the background thread, since for instance
72 * on Linux write(2) will be blocked by the background fsync anyway.
73 * When this happens we remember that there is some aof buffer to be
74 * flushed ASAP, and will try to do that in the serverCron() function.
75 *
76 * However if force is set to 1 we'll write regardless of the background
77 * fsync. */
78void flushAppendOnlyFile(int force) {
e2641e09 79 ssize_t nwritten;
db3c2a4f 80 int sync_in_progress = 0;
e2641e09 81
82 if (sdslen(server.aofbuf) == 0) return;
83
db3c2a4f 84 if (server.appendfsync == APPENDFSYNC_EVERYSEC)
85 sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0;
86
87 if (server.appendfsync == APPENDFSYNC_EVERYSEC && !force) {
88 /* With this append fsync policy we do background fsyncing.
89 * If the fsync is still in progress we can try to delay
90 * the write for a couple of seconds. */
91 if (sync_in_progress) {
92 if (server.aof_flush_postponed_start == 0) {
93 /* No previous write postponinig, remember that we are
94 * postponing the flush and return. */
95 server.aof_flush_postponed_start = server.unixtime;
96 return;
97 } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
98 /* We were already writing for fsync to finish, but for less
99 * than two seconds this is still ok. Postpone again. */
100 return;
101 }
102 /* Otherwise fall trough, and go write since we can't wait
103 * over two seconds. */
104 }
105 }
106 /* If you are following this code path, then we are going to write so
107 * set reset the postponed flush sentinel to zero. */
108 server.aof_flush_postponed_start = 0;
109
e2641e09 110 /* We want to perform a single write. This should be guaranteed atomic
111 * at least if the filesystem we are writing is a real physical one.
112 * While this will save us against the server being killed I don't think
113 * there is much to do about the whole server stopping for power problems
114 * or alike */
a57225c2
PN
115 nwritten = write(server.appendfd,server.aofbuf,sdslen(server.aofbuf));
116 if (nwritten != (signed)sdslen(server.aofbuf)) {
e2641e09 117 /* Ooops, we are in troubles. The best thing to do for now is
118 * aborting instead of giving the illusion that everything is
119 * working as expected. */
a57225c2 120 if (nwritten == -1) {
e2641e09 121 redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
a57225c2 122 } else {
e2641e09 123 redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
a57225c2
PN
124 }
125 exit(1);
e2641e09 126 }
b333e239 127 server.appendonly_current_size += nwritten;
e2641e09 128
f990782f
PN
129 /* Re-use AOF buffer when it is small enough. The maximum comes from the
130 * arena size of 4k minus some overhead (but is otherwise arbitrary). */
131 if ((sdslen(server.aofbuf)+sdsavail(server.aofbuf)) < 4000) {
132 sdsclear(server.aofbuf);
133 } else {
134 sdsfree(server.aofbuf);
135 server.aofbuf = sdsempty();
136 }
137
29732248
PN
138 /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
139 * children doing I/O in the background. */
e2641e09 140 if (server.no_appendfsync_on_rewrite &&
141 (server.bgrewritechildpid != -1 || server.bgsavechildpid != -1))
142 return;
29732248
PN
143
144 /* Perform the fsync if needed. */
db3c2a4f 145 if (server.appendfsync == APPENDFSYNC_ALWAYS) {
e2641e09 146 /* aof_fsync is defined as fdatasync() for Linux in order to avoid
147 * flushing metadata. */
148 aof_fsync(server.appendfd); /* Let's try to get this data on the disk */
29732248 149 server.lastfsync = server.unixtime;
db3c2a4f 150 } else if ((server.appendfsync == APPENDFSYNC_EVERYSEC &&
151 server.unixtime > server.lastfsync)) {
152 if (!sync_in_progress) aof_background_fsync(server.appendfd);
153 server.lastfsync = server.unixtime;
e2641e09 154 }
155}
156
d1ec6c8b
PN
157sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) {
158 char buf[32];
159 int len, j;
160 robj *o;
161
162 buf[0] = '*';
163 len = 1+ll2string(buf+1,sizeof(buf)-1,argc);
164 buf[len++] = '\r';
165 buf[len++] = '\n';
166 dst = sdscatlen(dst,buf,len);
167
e2641e09 168 for (j = 0; j < argc; j++) {
d1ec6c8b
PN
169 o = getDecodedObject(argv[j]);
170 buf[0] = '$';
171 len = 1+ll2string(buf+1,sizeof(buf)-1,sdslen(o->ptr));
172 buf[len++] = '\r';
173 buf[len++] = '\n';
174 dst = sdscatlen(dst,buf,len);
175 dst = sdscatlen(dst,o->ptr,sdslen(o->ptr));
176 dst = sdscatlen(dst,"\r\n",2);
e2641e09 177 decrRefCount(o);
178 }
d1ec6c8b 179 return dst;
e2641e09 180}
181
182sds catAppendOnlyExpireAtCommand(sds buf, robj *key, robj *seconds) {
183 int argc = 3;
184 long when;
185 robj *argv[3];
186
187 /* Make sure we can use strtol */
188 seconds = getDecodedObject(seconds);
189 when = time(NULL)+strtol(seconds->ptr,NULL,10);
190 decrRefCount(seconds);
191
192 argv[0] = createStringObject("EXPIREAT",8);
193 argv[1] = key;
194 argv[2] = createObject(REDIS_STRING,
195 sdscatprintf(sdsempty(),"%ld",when));
196 buf = catAppendOnlyGenericCommand(buf, argc, argv);
197 decrRefCount(argv[0]);
198 decrRefCount(argv[2]);
199 return buf;
200}
201
202void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
203 sds buf = sdsempty();
204 robj *tmpargv[3];
205
206 /* The DB this command was targetting is not the same as the last command
207 * we appendend. To issue a SELECT command is needed. */
208 if (dictid != server.appendseldb) {
209 char seldb[64];
210
211 snprintf(seldb,sizeof(seldb),"%d",dictid);
212 buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
213 (unsigned long)strlen(seldb),seldb);
214 server.appendseldb = dictid;
215 }
216
217 if (cmd->proc == expireCommand) {
218 /* Translate EXPIRE into EXPIREAT */
219 buf = catAppendOnlyExpireAtCommand(buf,argv[1],argv[2]);
220 } else if (cmd->proc == setexCommand) {
221 /* Translate SETEX to SET and EXPIREAT */
222 tmpargv[0] = createStringObject("SET",3);
223 tmpargv[1] = argv[1];
224 tmpargv[2] = argv[3];
225 buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
226 decrRefCount(tmpargv[0]);
227 buf = catAppendOnlyExpireAtCommand(buf,argv[1],argv[2]);
228 } else {
229 buf = catAppendOnlyGenericCommand(buf,argc,argv);
230 }
231
232 /* Append to the AOF buffer. This will be flushed on disk just before
233 * of re-entering the event loop, so before the client will get a
234 * positive reply about the operation performed. */
235 server.aofbuf = sdscatlen(server.aofbuf,buf,sdslen(buf));
236
237 /* If a background append only file rewriting is in progress we want to
238 * accumulate the differences between the child DB and the current one
239 * in a buffer, so that when the child process will do its work we
240 * can append the differences to the new append only file. */
241 if (server.bgrewritechildpid != -1)
242 server.bgrewritebuf = sdscatlen(server.bgrewritebuf,buf,sdslen(buf));
243
244 sdsfree(buf);
245}
246
247/* In Redis commands are always executed in the context of a client, so in
248 * order to load the append only file we need to create a fake client. */
249struct redisClient *createFakeClient(void) {
250 struct redisClient *c = zmalloc(sizeof(*c));
251
252 selectDb(c,0);
253 c->fd = -1;
254 c->querybuf = sdsempty();
255 c->argc = 0;
256 c->argv = NULL;
2403fc9f 257 c->bufpos = 0;
e2641e09 258 c->flags = 0;
259 /* We set the fake client as a slave waiting for the synchronization
260 * so that Redis will not try to send replies to this client. */
261 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
262 c->reply = listCreate();
b67d2345 263 c->watched_keys = listCreate();
e2641e09 264 listSetFreeMethod(c->reply,decrRefCount);
265 listSetDupMethod(c->reply,dupClientReplyValue);
266 initClientMultiState(c);
267 return c;
268}
269
270void freeFakeClient(struct redisClient *c) {
271 sdsfree(c->querybuf);
272 listRelease(c->reply);
b67d2345 273 listRelease(c->watched_keys);
e2641e09 274 freeClientMultiState(c);
275 zfree(c);
276}
277
278/* Replay the append log file. On error REDIS_OK is returned. On non fatal
279 * error (the append only file is zero-length) REDIS_ERR is returned. On
280 * fatal error an error message is logged and the program exists. */
281int loadAppendOnlyFile(char *filename) {
282 struct redisClient *fakeClient;
283 FILE *fp = fopen(filename,"r");
284 struct redis_stat sb;
285 int appendonly = server.appendonly;
97e7f8ae 286 long loops = 0;
e2641e09 287
4aec2ec8 288 if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
b333e239 289 server.appendonly_current_size = 0;
4aec2ec8 290 fclose(fp);
e2641e09 291 return REDIS_ERR;
4aec2ec8 292 }
e2641e09 293
294 if (fp == NULL) {
295 redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
296 exit(1);
297 }
298
299 /* Temporarily disable AOF, to prevent EXEC from feeding a MULTI
300 * to the same file we're about to read. */
301 server.appendonly = 0;
302
303 fakeClient = createFakeClient();
97e7f8ae 304 startLoading(fp);
305
e2641e09 306 while(1) {
307 int argc, j;
308 unsigned long len;
309 robj **argv;
310 char buf[128];
311 sds argsds;
312 struct redisCommand *cmd;
e2641e09 313
97e7f8ae 314 /* Serve the clients from time to time */
315 if (!(loops++ % 1000)) {
316 loadingProgress(ftello(fp));
317 aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT);
318 }
319
e2641e09 320 if (fgets(buf,sizeof(buf),fp) == NULL) {
321 if (feof(fp))
322 break;
323 else
324 goto readerr;
325 }
326 if (buf[0] != '*') goto fmterr;
327 argc = atoi(buf+1);
328 argv = zmalloc(sizeof(robj*)*argc);
329 for (j = 0; j < argc; j++) {
330 if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
331 if (buf[0] != '$') goto fmterr;
332 len = strtol(buf+1,NULL,10);
333 argsds = sdsnewlen(NULL,len);
334 if (len && fread(argsds,len,1,fp) == 0) goto fmterr;
335 argv[j] = createObject(REDIS_STRING,argsds);
336 if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
337 }
338
339 /* Command lookup */
340 cmd = lookupCommand(argv[0]->ptr);
341 if (!cmd) {
342 redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
343 exit(1);
344 }
e2641e09 345 /* Run the command in the context of a fake client */
346 fakeClient->argc = argc;
347 fakeClient->argv = argv;
348 cmd->proc(fakeClient);
57b07380
PN
349
350 /* The fake client should not have a reply */
351 redisAssert(fakeClient->bufpos == 0 && listLength(fakeClient->reply) == 0);
ef67a2fc 352 /* The fake client should never get blocked */
353 redisAssert((fakeClient->flags & REDIS_BLOCKED) == 0);
57b07380 354
45b0f6fb
PN
355 /* Clean up. Command code may have changed argv/argc so we use the
356 * argv/argc of the client instead of the local variables. */
357 for (j = 0; j < fakeClient->argc; j++)
358 decrRefCount(fakeClient->argv[j]);
359 zfree(fakeClient->argv);
e2641e09 360 }
361
362 /* This point can only be reached when EOF is reached without errors.
363 * If the client is in the middle of a MULTI/EXEC, log error and quit. */
364 if (fakeClient->flags & REDIS_MULTI) goto readerr;
365
366 fclose(fp);
367 freeFakeClient(fakeClient);
368 server.appendonly = appendonly;
97e7f8ae 369 stopLoading();
b333e239 370 aofUpdateCurrentSize();
c66bf1fa 371 server.auto_aofrewrite_base_size = server.appendonly_current_size;
e2641e09 372 return REDIS_OK;
373
374readerr:
375 if (feof(fp)) {
376 redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
377 } else {
378 redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
379 }
380 exit(1);
381fmterr:
412e457c 382 redisLog(REDIS_WARNING,"Bad file format reading the append only file: make a backup of your AOF file, then use ./redis-check-aof --fix <filename>");
e2641e09 383 exit(1);
384}
385
e2641e09 386/* Write a sequence of commands able to fully rebuild the dataset into
387 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
388int rewriteAppendOnlyFile(char *filename) {
389 dictIterator *di = NULL;
390 dictEntry *de;
391 FILE *fp;
392 char tmpfile[256];
393 int j;
394 time_t now = time(NULL);
395
396 /* Note that we have to use a different temp name here compared to the
397 * one used by rewriteAppendOnlyFileBackground() function. */
398 snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
399 fp = fopen(tmpfile,"w");
400 if (!fp) {
401 redisLog(REDIS_WARNING, "Failed rewriting the append only file: %s", strerror(errno));
402 return REDIS_ERR;
403 }
404 for (j = 0; j < server.dbnum; j++) {
405 char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
406 redisDb *db = server.db+j;
407 dict *d = db->dict;
408 if (dictSize(d) == 0) continue;
591f29e0 409 di = dictGetSafeIterator(d);
e2641e09 410 if (!di) {
411 fclose(fp);
412 return REDIS_ERR;
413 }
414
415 /* SELECT the new DB */
416 if (fwrite(selectcmd,sizeof(selectcmd)-1,1,fp) == 0) goto werr;
417 if (fwriteBulkLongLong(fp,j) == 0) goto werr;
418
419 /* Iterate this DB writing every entry */
420 while((de = dictNext(di)) != NULL) {
6901fe77 421 sds keystr;
e2641e09 422 robj key, *o;
423 time_t expiretime;
e2641e09 424
425 keystr = dictGetEntryKey(de);
426 o = dictGetEntryVal(de);
427 initStaticStringObject(key,keystr);
16d77878 428
e2641e09 429 expiretime = getExpire(db,&key);
430
431 /* Save the key and associated value */
432 if (o->type == REDIS_STRING) {
433 /* Emit a SET command */
434 char cmd[]="*3\r\n$3\r\nSET\r\n";
435 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
436 /* Key and value */
437 if (fwriteBulkObject(fp,&key) == 0) goto werr;
438 if (fwriteBulkObject(fp,o) == 0) goto werr;
439 } else if (o->type == REDIS_LIST) {
440 /* Emit the RPUSHes needed to rebuild the list */
441 char cmd[]="*3\r\n$5\r\nRPUSH\r\n";
442 if (o->encoding == REDIS_ENCODING_ZIPLIST) {
443 unsigned char *zl = o->ptr;
444 unsigned char *p = ziplistIndex(zl,0);
445 unsigned char *vstr;
446 unsigned int vlen;
447 long long vlong;
448
449 while(ziplistGet(p,&vstr,&vlen,&vlong)) {
450 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
451 if (fwriteBulkObject(fp,&key) == 0) goto werr;
452 if (vstr) {
453 if (fwriteBulkString(fp,(char*)vstr,vlen) == 0)
454 goto werr;
455 } else {
456 if (fwriteBulkLongLong(fp,vlong) == 0)
457 goto werr;
458 }
459 p = ziplistNext(zl,p);
460 }
461 } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
462 list *list = o->ptr;
463 listNode *ln;
464 listIter li;
465
466 listRewind(list,&li);
467 while((ln = listNext(&li))) {
468 robj *eleobj = listNodeValue(ln);
469
470 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
471 if (fwriteBulkObject(fp,&key) == 0) goto werr;
472 if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
473 }
474 } else {
475 redisPanic("Unknown list encoding");
476 }
477 } else if (o->type == REDIS_SET) {
2767f1c0 478 char cmd[]="*3\r\n$4\r\nSADD\r\n";
e2641e09 479
2767f1c0
PN
480 /* Emit the SADDs needed to rebuild the set */
481 if (o->encoding == REDIS_ENCODING_INTSET) {
482 int ii = 0;
23c64fe5 483 int64_t llval;
2767f1c0
PN
484 while(intsetGet(o->ptr,ii++,&llval)) {
485 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
486 if (fwriteBulkObject(fp,&key) == 0) goto werr;
487 if (fwriteBulkLongLong(fp,llval) == 0) goto werr;
488 }
489 } else if (o->encoding == REDIS_ENCODING_HT) {
490 dictIterator *di = dictGetIterator(o->ptr);
491 dictEntry *de;
492 while((de = dictNext(di)) != NULL) {
493 robj *eleobj = dictGetEntryKey(de);
494 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
495 if (fwriteBulkObject(fp,&key) == 0) goto werr;
496 if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
497 }
498 dictReleaseIterator(di);
499 } else {
500 redisPanic("Unknown set encoding");
e2641e09 501 }
e2641e09 502 } else if (o->type == REDIS_ZSET) {
503 /* Emit the ZADDs needed to rebuild the sorted set */
dddf5335
PN
504 char cmd[]="*4\r\n$4\r\nZADD\r\n";
505
506 if (o->encoding == REDIS_ENCODING_ZIPLIST) {
507 unsigned char *zl = o->ptr;
508 unsigned char *eptr, *sptr;
509 unsigned char *vstr;
510 unsigned int vlen;
511 long long vll;
512 double score;
513
514 eptr = ziplistIndex(zl,0);
515 redisAssert(eptr != NULL);
516 sptr = ziplistNext(zl,eptr);
517 redisAssert(sptr != NULL);
518
519 while (eptr != NULL) {
520 redisAssert(ziplistGet(eptr,&vstr,&vlen,&vll));
521 score = zzlGetScore(sptr);
522
523 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
524 if (fwriteBulkObject(fp,&key) == 0) goto werr;
525 if (fwriteBulkDouble(fp,score) == 0) goto werr;
526 if (vstr != NULL) {
527 if (fwriteBulkString(fp,(char*)vstr,vlen) == 0)
528 goto werr;
529 } else {
530 if (fwriteBulkLongLong(fp,vll) == 0)
531 goto werr;
532 }
533 zzlNext(zl,&eptr,&sptr);
534 }
100ed062 535 } else if (o->encoding == REDIS_ENCODING_SKIPLIST) {
dddf5335
PN
536 zset *zs = o->ptr;
537 dictIterator *di = dictGetIterator(zs->dict);
538 dictEntry *de;
539
540 while((de = dictNext(di)) != NULL) {
541 robj *eleobj = dictGetEntryKey(de);
542 double *score = dictGetEntryVal(de);
543
544 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
545 if (fwriteBulkObject(fp,&key) == 0) goto werr;
546 if (fwriteBulkDouble(fp,*score) == 0) goto werr;
547 if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
548 }
549 dictReleaseIterator(di);
550 } else {
551 redisPanic("Unknown sorted set encoding");
e2641e09 552 }
e2641e09 553 } else if (o->type == REDIS_HASH) {
554 char cmd[]="*4\r\n$4\r\nHSET\r\n";
555
556 /* Emit the HSETs needed to rebuild the hash */
557 if (o->encoding == REDIS_ENCODING_ZIPMAP) {
558 unsigned char *p = zipmapRewind(o->ptr);
559 unsigned char *field, *val;
560 unsigned int flen, vlen;
561
562 while((p = zipmapNext(p,&field,&flen,&val,&vlen)) != NULL) {
563 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
564 if (fwriteBulkObject(fp,&key) == 0) goto werr;
daf2049d 565 if (fwriteBulkString(fp,(char*)field,flen) == 0)
5bd09cd4 566 goto werr;
daf2049d 567 if (fwriteBulkString(fp,(char*)val,vlen) == 0)
5bd09cd4 568 goto werr;
e2641e09 569 }
570 } else {
571 dictIterator *di = dictGetIterator(o->ptr);
572 dictEntry *de;
573
574 while((de = dictNext(di)) != NULL) {
575 robj *field = dictGetEntryKey(de);
576 robj *val = dictGetEntryVal(de);
577
578 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
579 if (fwriteBulkObject(fp,&key) == 0) goto werr;
5bd09cd4 580 if (fwriteBulkObject(fp,field) == 0) goto werr;
581 if (fwriteBulkObject(fp,val) == 0) goto werr;
e2641e09 582 }
583 dictReleaseIterator(di);
584 }
585 } else {
586 redisPanic("Unknown object type");
587 }
588 /* Save the expire time */
589 if (expiretime != -1) {
590 char cmd[]="*3\r\n$8\r\nEXPIREAT\r\n";
591 /* If this key is already expired skip it */
592 if (expiretime < now) continue;
593 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
594 if (fwriteBulkObject(fp,&key) == 0) goto werr;
595 if (fwriteBulkLongLong(fp,expiretime) == 0) goto werr;
596 }
e2641e09 597 }
598 dictReleaseIterator(di);
599 }
600
601 /* Make sure data will not remain on the OS's output buffers */
602 fflush(fp);
603 aof_fsync(fileno(fp));
604 fclose(fp);
605
606 /* Use RENAME to make sure the DB file is changed atomically only
607 * if the generate DB file is ok. */
608 if (rename(tmpfile,filename) == -1) {
609 redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
610 unlink(tmpfile);
611 return REDIS_ERR;
612 }
613 redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
614 return REDIS_OK;
615
616werr:
617 fclose(fp);
618 unlink(tmpfile);
619 redisLog(REDIS_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
620 if (di) dictReleaseIterator(di);
621 return REDIS_ERR;
622}
623
624/* This is how rewriting of the append only file in background works:
625 *
626 * 1) The user calls BGREWRITEAOF
627 * 2) Redis calls this function, that forks():
628 * 2a) the child rewrite the append only file in a temp file.
629 * 2b) the parent accumulates differences in server.bgrewritebuf.
630 * 3) When the child finished '2a' exists.
631 * 4) The parent will trap the exit code, if it's OK, will append the
632 * data accumulated into server.bgrewritebuf into the temp file, and
633 * finally will rename(2) the temp file in the actual file name.
634 * The the new file is reopened as the new append only file. Profit!
635 */
636int rewriteAppendOnlyFileBackground(void) {
637 pid_t childpid;
615e414c 638 long long start;
e2641e09 639
640 if (server.bgrewritechildpid != -1) return REDIS_ERR;
615e414c 641 start = ustime();
e2641e09 642 if ((childpid = fork()) == 0) {
e2641e09 643 char tmpfile[256];
644
615e414c 645 /* Child */
a5639e7d
PN
646 if (server.ipfd > 0) close(server.ipfd);
647 if (server.sofd > 0) close(server.sofd);
e2641e09 648 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
649 if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
650 _exit(0);
651 } else {
652 _exit(1);
653 }
654 } else {
655 /* Parent */
615e414c 656 server.stat_fork_time = ustime()-start;
e2641e09 657 if (childpid == -1) {
658 redisLog(REDIS_WARNING,
659 "Can't rewrite append only file in background: fork: %s",
660 strerror(errno));
661 return REDIS_ERR;
662 }
663 redisLog(REDIS_NOTICE,
664 "Background append only file rewriting started by pid %d",childpid);
665 server.bgrewritechildpid = childpid;
666 updateDictResizePolicy();
667 /* We set appendseldb to -1 in order to force the next call to the
668 * feedAppendOnlyFile() to issue a SELECT command, so the differences
669 * accumulated by the parent into server.bgrewritebuf will start
670 * with a SELECT statement and it will be safe to merge. */
671 server.appendseldb = -1;
672 return REDIS_OK;
673 }
674 return REDIS_OK; /* unreached */
675}
676
677void bgrewriteaofCommand(redisClient *c) {
678 if (server.bgrewritechildpid != -1) {
3ab20376 679 addReplyError(c,"Background append only file rewriting already in progress");
b333e239 680 } else if (server.bgsavechildpid != -1) {
681 server.aofrewrite_scheduled = 1;
9e40bce3 682 addReplyStatus(c,"Background append only file rewriting scheduled");
b333e239 683 } else if (rewriteAppendOnlyFileBackground() == REDIS_OK) {
3ab20376 684 addReplyStatus(c,"Background append only file rewriting started");
e2641e09 685 } else {
686 addReply(c,shared.err);
687 }
688}
689
/* Remove the temp file that the background rewrite child with pid
 * 'childpid' writes to. The unlink(2) result is ignored. */
void aofRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-rewriteaof-bg-%d.aof", (int) childpid);
    unlink(path);
}
696
b333e239 697/* Update the server.appendonly_current_size filed explicitly using stat(2)
698 * to check the size of the file. This is useful after a rewrite or after
699 * a restart, normally the size is updated just adding the write length
700 * to the current lenght, that is much faster. */
701void aofUpdateCurrentSize(void) {
702 struct redis_stat sb;
703
704 if (redis_fstat(server.appendfd,&sb) == -1) {
705 redisLog(REDIS_WARNING,"Unable to check the AOF length: %s",
706 strerror(errno));
707 } else {
708 server.appendonly_current_size = sb.st_size;
709 }
710}
711
e2641e09 712/* A background append only file rewriting (BGREWRITEAOF) terminated its work.
713 * Handle this. */
36c17a53 714void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
e2641e09 715 if (!bysignal && exitcode == 0) {
b454056d
PN
716 int newfd, oldfd;
717 int nwritten;
e2641e09 718 char tmpfile[256];
b454056d 719 long long now = ustime();
e2641e09 720
721 redisLog(REDIS_NOTICE,
b454056d
PN
722 "Background AOF rewrite terminated with success");
723
986630af 724 /* Flush the differences accumulated by the parent to the
725 * rewritten AOF. */
726 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof",
727 (int)server.bgrewritechildpid);
b454056d
PN
728 newfd = open(tmpfile,O_WRONLY|O_APPEND);
729 if (newfd == -1) {
730 redisLog(REDIS_WARNING,
731 "Unable to open the temporary AOF produced by the child: %s", strerror(errno));
e2641e09 732 goto cleanup;
733 }
b454056d
PN
734
735 nwritten = write(newfd,server.bgrewritebuf,sdslen(server.bgrewritebuf));
736 if (nwritten != (signed)sdslen(server.bgrewritebuf)) {
737 if (nwritten == -1) {
738 redisLog(REDIS_WARNING,
739 "Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
740 } else {
741 redisLog(REDIS_WARNING,
742 "Short write trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
743 }
744 close(newfd);
e2641e09 745 goto cleanup;
746 }
b454056d
PN
747
748 redisLog(REDIS_NOTICE,
749 "Parent diff successfully flushed to the rewritten AOF (%lu bytes)", nwritten);
750
751 /* The only remaining thing to do is to rename the temporary file to
752 * the configured file and switch the file descriptor used to do AOF
986630af 753 * writes. We don't want close(2) or rename(2) calls to block the
754 * server on old file deletion.
755 *
756 * There are two possible scenarios:
b454056d
PN
757 *
758 * 1) AOF is DISABLED and this was a one time rewrite. The temporary
759 * file will be renamed to the configured file. When this file already
760 * exists, it will be unlinked, which may block the server.
761 *
762 * 2) AOF is ENABLED and the rewritten AOF will immediately start
763 * receiving writes. After the temporary file is renamed to the
764 * configured file, the original AOF file descriptor will be closed.
765 * Since this will be the last reference to that file, closing it
766 * causes the underlying file to be unlinked, which may block the
767 * server.
768 *
769 * To mitigate the blocking effect of the unlink operation (either
770 * caused by rename(2) in scenario 1, or by close(2) in scenario 2), we
986630af 771 * use a background thread to take care of this. First, we
b454056d
PN
772 * make scenario 1 identical to scenario 2 by opening the target file
773 * when it exists. The unlink operation after the rename(2) will then
774 * be executed upon calling close(2) for its descriptor. Everything to
775 * guarantee atomicity for this switch has already happened by then, so
776 * we don't care what the outcome or duration of that close operation
777 * is, as long as the file descriptor is released again. */
778 if (server.appendfd == -1) {
779 /* AOF disabled */
b454056d 780
986630af 781 /* Don't care if this fails: oldfd will be -1 and we handle that.
782 * One notable case of -1 return is if the old file does
783 * not exist. */
784 oldfd = open(server.appendfilename,O_RDONLY|O_NONBLOCK);
b454056d
PN
785 } else {
786 /* AOF enabled */
986630af 787 oldfd = -1; /* We'll set this to the current AOF filedes later. */
b454056d
PN
788 }
789
790 /* Rename the temporary file. This will not unlink the target file if
791 * it exists, because we reference it with "oldfd". */
e2641e09 792 if (rename(tmpfile,server.appendfilename) == -1) {
b454056d
PN
793 redisLog(REDIS_WARNING,
794 "Error trying to rename the temporary AOF: %s", strerror(errno));
795 close(newfd);
986630af 796 if (oldfd != -1) close(oldfd);
e2641e09 797 goto cleanup;
798 }
b454056d
PN
799
800 if (server.appendfd == -1) {
986630af 801 /* AOF disabled, we don't need to set the AOF file descriptor
802 * to this new file, so we can close it. */
b454056d
PN
803 close(newfd);
804 } else {
986630af 805 /* AOF enabled, replace the old fd with the new one. */
b454056d
PN
806 oldfd = server.appendfd;
807 server.appendfd = newfd;
4b77700a 808 if (server.appendfsync == APPENDFSYNC_ALWAYS)
809 aof_fsync(newfd);
810 else if (server.appendfsync == APPENDFSYNC_EVERYSEC)
811 aof_background_fsync(newfd);
b454056d 812 server.appendseldb = -1; /* Make sure SELECT is re-issued */
b333e239 813 aofUpdateCurrentSize();
c66bf1fa 814 server.auto_aofrewrite_base_size = server.appendonly_current_size;
e2641e09 815 }
b454056d
PN
816
817 redisLog(REDIS_NOTICE, "Background AOF rewrite successful");
818
819 /* Asynchronously close the overwritten AOF. */
50be9b97 820 if (oldfd != -1) bioCreateBackgroundJob(REDIS_BIO_CLOSE_FILE,(void*)(long)oldfd,NULL,NULL);
b454056d
PN
821
822 redisLog(REDIS_VERBOSE,
823 "Background AOF rewrite signal handler took %lldus", ustime()-now);
e2641e09 824 } else if (!bysignal && exitcode != 0) {
b454056d
PN
825 redisLog(REDIS_WARNING,
826 "Background AOF rewrite terminated with error");
e2641e09 827 } else {
828 redisLog(REDIS_WARNING,
b454056d 829 "Background AOF rewrite terminated by signal %d", bysignal);
e2641e09 830 }
b454056d 831
e2641e09 832cleanup:
833 sdsfree(server.bgrewritebuf);
834 server.bgrewritebuf = sdsempty();
835 aofRemoveTempFile(server.bgrewritechildpid);
836 server.bgrewritechildpid = -1;
837}