]> git.saurik.com Git - redis.git/blame - src/aof.c
Merge pull request #74 from kmerenkov/issue_620
[redis.git] / src / aof.c
CommitLineData
e2641e09 1#include "redis.h"
986630af 2#include "bio.h"
e2641e09 3
4#include <signal.h>
5#include <fcntl.h>
6#include <sys/stat.h>
3688d7f3 7#include <sys/types.h>
8#include <sys/time.h>
9#include <sys/resource.h>
10#include <sys/wait.h>
e2641e09 11
b333e239 12void aofUpdateCurrentSize(void);
13
4b77700a 14void aof_background_fsync(int fd) {
9a35eb22 15 bioCreateBackgroundJob(REDIS_BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL);
4b77700a 16}
17
e2641e09 18/* Called when the user switches from "appendonly yes" to "appendonly no"
19 * at runtime using the CONFIG command. */
20void stopAppendOnly(void) {
db3c2a4f 21 flushAppendOnlyFile(1);
e2641e09 22 aof_fsync(server.appendfd);
23 close(server.appendfd);
24
25 server.appendfd = -1;
26 server.appendseldb = -1;
27 server.appendonly = 0;
28 /* rewrite operation in progress? kill it, wait child exit */
b333e239 29 if (server.bgrewritechildpid != -1) {
e2641e09 30 int statloc;
31
b333e239 32 if (kill(server.bgrewritechildpid,SIGKILL) != -1)
e2641e09 33 wait3(&statloc,0,NULL);
34 /* reset the buffer accumulating changes while the child saves */
35 sdsfree(server.bgrewritebuf);
36 server.bgrewritebuf = sdsempty();
b333e239 37 server.bgrewritechildpid = -1;
e2641e09 38 }
39}
40
41/* Called when the user switches from "appendonly no" to "appendonly yes"
42 * at runtime using the CONFIG command. */
43int startAppendOnly(void) {
44 server.appendonly = 1;
45 server.lastfsync = time(NULL);
46 server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
47 if (server.appendfd == -1) {
48 redisLog(REDIS_WARNING,"Used tried to switch on AOF via CONFIG, but I can't open the AOF file: %s",strerror(errno));
49 return REDIS_ERR;
50 }
51 if (rewriteAppendOnlyFileBackground() == REDIS_ERR) {
52 server.appendonly = 0;
53 close(server.appendfd);
54 redisLog(REDIS_WARNING,"Used tried to switch on AOF via CONFIG, I can't trigger a background AOF rewrite operation. Check the above logs for more info about the error.",strerror(errno));
55 return REDIS_ERR;
56 }
57 return REDIS_OK;
58}
59
60/* Write the append only file buffer on disk.
61 *
62 * Since we are required to write the AOF before replying to the client,
63 * and the only way the client socket can get a write is entering when the
64 * the event loop, we accumulate all the AOF writes in a memory
65 * buffer and write it on disk using this function just before entering
db3c2a4f 66 * the event loop again.
67 *
68 * About the 'force' argument:
69 *
70 * When the fsync policy is set to 'everysec' we may delay the flush if there
71 * is still an fsync() going on in the background thread, since for instance
72 * on Linux write(2) will be blocked by the background fsync anyway.
73 * When this happens we remember that there is some aof buffer to be
74 * flushed ASAP, and will try to do that in the serverCron() function.
75 *
76 * However if force is set to 1 we'll write regardless of the background
77 * fsync. */
78void flushAppendOnlyFile(int force) {
e2641e09 79 ssize_t nwritten;
db3c2a4f 80 int sync_in_progress = 0;
e2641e09 81
82 if (sdslen(server.aofbuf) == 0) return;
83
db3c2a4f 84 if (server.appendfsync == APPENDFSYNC_EVERYSEC)
85 sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0;
86
87 if (server.appendfsync == APPENDFSYNC_EVERYSEC && !force) {
88 /* With this append fsync policy we do background fsyncing.
89 * If the fsync is still in progress we can try to delay
90 * the write for a couple of seconds. */
91 if (sync_in_progress) {
92 if (server.aof_flush_postponed_start == 0) {
93 /* No previous write postponinig, remember that we are
94 * postponing the flush and return. */
95 server.aof_flush_postponed_start = server.unixtime;
96 return;
97 } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
e7aec180 98 /* We were already waiting for fsync to finish, but for less
db3c2a4f 99 * than two seconds this is still ok. Postpone again. */
100 return;
101 }
102 /* Otherwise fall trough, and go write since we can't wait
103 * over two seconds. */
77ca5fcb 104 redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
db3c2a4f 105 }
106 }
107 /* If you are following this code path, then we are going to write so
108 * set reset the postponed flush sentinel to zero. */
109 server.aof_flush_postponed_start = 0;
110
e2641e09 111 /* We want to perform a single write. This should be guaranteed atomic
112 * at least if the filesystem we are writing is a real physical one.
113 * While this will save us against the server being killed I don't think
114 * there is much to do about the whole server stopping for power problems
115 * or alike */
a57225c2
PN
116 nwritten = write(server.appendfd,server.aofbuf,sdslen(server.aofbuf));
117 if (nwritten != (signed)sdslen(server.aofbuf)) {
e2641e09 118 /* Ooops, we are in troubles. The best thing to do for now is
119 * aborting instead of giving the illusion that everything is
120 * working as expected. */
a57225c2 121 if (nwritten == -1) {
e2641e09 122 redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
a57225c2 123 } else {
e2641e09 124 redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
a57225c2
PN
125 }
126 exit(1);
e2641e09 127 }
b333e239 128 server.appendonly_current_size += nwritten;
e2641e09 129
f990782f
PN
130 /* Re-use AOF buffer when it is small enough. The maximum comes from the
131 * arena size of 4k minus some overhead (but is otherwise arbitrary). */
132 if ((sdslen(server.aofbuf)+sdsavail(server.aofbuf)) < 4000) {
133 sdsclear(server.aofbuf);
134 } else {
135 sdsfree(server.aofbuf);
136 server.aofbuf = sdsempty();
137 }
138
29732248
PN
139 /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
140 * children doing I/O in the background. */
e2641e09 141 if (server.no_appendfsync_on_rewrite &&
142 (server.bgrewritechildpid != -1 || server.bgsavechildpid != -1))
143 return;
29732248
PN
144
145 /* Perform the fsync if needed. */
db3c2a4f 146 if (server.appendfsync == APPENDFSYNC_ALWAYS) {
e2641e09 147 /* aof_fsync is defined as fdatasync() for Linux in order to avoid
148 * flushing metadata. */
149 aof_fsync(server.appendfd); /* Let's try to get this data on the disk */
29732248 150 server.lastfsync = server.unixtime;
db3c2a4f 151 } else if ((server.appendfsync == APPENDFSYNC_EVERYSEC &&
152 server.unixtime > server.lastfsync)) {
153 if (!sync_in_progress) aof_background_fsync(server.appendfd);
154 server.lastfsync = server.unixtime;
e2641e09 155 }
156}
157
d1ec6c8b
PN
158sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) {
159 char buf[32];
160 int len, j;
161 robj *o;
162
163 buf[0] = '*';
164 len = 1+ll2string(buf+1,sizeof(buf)-1,argc);
165 buf[len++] = '\r';
166 buf[len++] = '\n';
167 dst = sdscatlen(dst,buf,len);
168
e2641e09 169 for (j = 0; j < argc; j++) {
d1ec6c8b
PN
170 o = getDecodedObject(argv[j]);
171 buf[0] = '$';
172 len = 1+ll2string(buf+1,sizeof(buf)-1,sdslen(o->ptr));
173 buf[len++] = '\r';
174 buf[len++] = '\n';
175 dst = sdscatlen(dst,buf,len);
176 dst = sdscatlen(dst,o->ptr,sdslen(o->ptr));
177 dst = sdscatlen(dst,"\r\n",2);
e2641e09 178 decrRefCount(o);
179 }
d1ec6c8b 180 return dst;
e2641e09 181}
182
183sds catAppendOnlyExpireAtCommand(sds buf, robj *key, robj *seconds) {
184 int argc = 3;
185 long when;
186 robj *argv[3];
187
188 /* Make sure we can use strtol */
189 seconds = getDecodedObject(seconds);
190 when = time(NULL)+strtol(seconds->ptr,NULL,10);
191 decrRefCount(seconds);
192
193 argv[0] = createStringObject("EXPIREAT",8);
194 argv[1] = key;
195 argv[2] = createObject(REDIS_STRING,
196 sdscatprintf(sdsempty(),"%ld",when));
197 buf = catAppendOnlyGenericCommand(buf, argc, argv);
198 decrRefCount(argv[0]);
199 decrRefCount(argv[2]);
200 return buf;
201}
202
203void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
204 sds buf = sdsempty();
205 robj *tmpargv[3];
206
207 /* The DB this command was targetting is not the same as the last command
208 * we appendend. To issue a SELECT command is needed. */
209 if (dictid != server.appendseldb) {
210 char seldb[64];
211
212 snprintf(seldb,sizeof(seldb),"%d",dictid);
213 buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
214 (unsigned long)strlen(seldb),seldb);
215 server.appendseldb = dictid;
216 }
217
218 if (cmd->proc == expireCommand) {
219 /* Translate EXPIRE into EXPIREAT */
220 buf = catAppendOnlyExpireAtCommand(buf,argv[1],argv[2]);
221 } else if (cmd->proc == setexCommand) {
222 /* Translate SETEX to SET and EXPIREAT */
223 tmpargv[0] = createStringObject("SET",3);
224 tmpargv[1] = argv[1];
225 tmpargv[2] = argv[3];
226 buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
227 decrRefCount(tmpargv[0]);
228 buf = catAppendOnlyExpireAtCommand(buf,argv[1],argv[2]);
229 } else {
230 buf = catAppendOnlyGenericCommand(buf,argc,argv);
231 }
232
233 /* Append to the AOF buffer. This will be flushed on disk just before
234 * of re-entering the event loop, so before the client will get a
235 * positive reply about the operation performed. */
236 server.aofbuf = sdscatlen(server.aofbuf,buf,sdslen(buf));
237
238 /* If a background append only file rewriting is in progress we want to
239 * accumulate the differences between the child DB and the current one
240 * in a buffer, so that when the child process will do its work we
241 * can append the differences to the new append only file. */
242 if (server.bgrewritechildpid != -1)
243 server.bgrewritebuf = sdscatlen(server.bgrewritebuf,buf,sdslen(buf));
244
245 sdsfree(buf);
246}
247
248/* In Redis commands are always executed in the context of a client, so in
249 * order to load the append only file we need to create a fake client. */
250struct redisClient *createFakeClient(void) {
251 struct redisClient *c = zmalloc(sizeof(*c));
252
253 selectDb(c,0);
254 c->fd = -1;
255 c->querybuf = sdsempty();
256 c->argc = 0;
257 c->argv = NULL;
2403fc9f 258 c->bufpos = 0;
e2641e09 259 c->flags = 0;
260 /* We set the fake client as a slave waiting for the synchronization
261 * so that Redis will not try to send replies to this client. */
262 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
263 c->reply = listCreate();
b67d2345 264 c->watched_keys = listCreate();
e2641e09 265 listSetFreeMethod(c->reply,decrRefCount);
266 listSetDupMethod(c->reply,dupClientReplyValue);
267 initClientMultiState(c);
268 return c;
269}
270
271void freeFakeClient(struct redisClient *c) {
272 sdsfree(c->querybuf);
273 listRelease(c->reply);
b67d2345 274 listRelease(c->watched_keys);
e2641e09 275 freeClientMultiState(c);
276 zfree(c);
277}
278
279/* Replay the append log file. On error REDIS_OK is returned. On non fatal
280 * error (the append only file is zero-length) REDIS_ERR is returned. On
281 * fatal error an error message is logged and the program exists. */
282int loadAppendOnlyFile(char *filename) {
283 struct redisClient *fakeClient;
284 FILE *fp = fopen(filename,"r");
285 struct redis_stat sb;
286 int appendonly = server.appendonly;
97e7f8ae 287 long loops = 0;
e2641e09 288
4aec2ec8 289 if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
b333e239 290 server.appendonly_current_size = 0;
4aec2ec8 291 fclose(fp);
e2641e09 292 return REDIS_ERR;
4aec2ec8 293 }
e2641e09 294
295 if (fp == NULL) {
296 redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
297 exit(1);
298 }
299
300 /* Temporarily disable AOF, to prevent EXEC from feeding a MULTI
301 * to the same file we're about to read. */
302 server.appendonly = 0;
303
304 fakeClient = createFakeClient();
97e7f8ae 305 startLoading(fp);
306
e2641e09 307 while(1) {
308 int argc, j;
309 unsigned long len;
310 robj **argv;
311 char buf[128];
312 sds argsds;
313 struct redisCommand *cmd;
e2641e09 314
97e7f8ae 315 /* Serve the clients from time to time */
316 if (!(loops++ % 1000)) {
317 loadingProgress(ftello(fp));
318 aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT);
319 }
320
e2641e09 321 if (fgets(buf,sizeof(buf),fp) == NULL) {
322 if (feof(fp))
323 break;
324 else
325 goto readerr;
326 }
327 if (buf[0] != '*') goto fmterr;
328 argc = atoi(buf+1);
be6f6395
KM
329 if (argc < 1) goto fmterr;
330
e2641e09 331 argv = zmalloc(sizeof(robj*)*argc);
332 for (j = 0; j < argc; j++) {
333 if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
334 if (buf[0] != '$') goto fmterr;
335 len = strtol(buf+1,NULL,10);
336 argsds = sdsnewlen(NULL,len);
337 if (len && fread(argsds,len,1,fp) == 0) goto fmterr;
338 argv[j] = createObject(REDIS_STRING,argsds);
339 if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
340 }
341
342 /* Command lookup */
343 cmd = lookupCommand(argv[0]->ptr);
344 if (!cmd) {
345 redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
346 exit(1);
347 }
e2641e09 348 /* Run the command in the context of a fake client */
349 fakeClient->argc = argc;
350 fakeClient->argv = argv;
351 cmd->proc(fakeClient);
57b07380
PN
352
353 /* The fake client should not have a reply */
354 redisAssert(fakeClient->bufpos == 0 && listLength(fakeClient->reply) == 0);
ef67a2fc 355 /* The fake client should never get blocked */
356 redisAssert((fakeClient->flags & REDIS_BLOCKED) == 0);
57b07380 357
45b0f6fb
PN
358 /* Clean up. Command code may have changed argv/argc so we use the
359 * argv/argc of the client instead of the local variables. */
360 for (j = 0; j < fakeClient->argc; j++)
361 decrRefCount(fakeClient->argv[j]);
362 zfree(fakeClient->argv);
e2641e09 363 }
364
365 /* This point can only be reached when EOF is reached without errors.
366 * If the client is in the middle of a MULTI/EXEC, log error and quit. */
367 if (fakeClient->flags & REDIS_MULTI) goto readerr;
368
369 fclose(fp);
370 freeFakeClient(fakeClient);
371 server.appendonly = appendonly;
97e7f8ae 372 stopLoading();
b333e239 373 aofUpdateCurrentSize();
c66bf1fa 374 server.auto_aofrewrite_base_size = server.appendonly_current_size;
e2641e09 375 return REDIS_OK;
376
377readerr:
378 if (feof(fp)) {
379 redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
380 } else {
381 redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
382 }
383 exit(1);
384fmterr:
412e457c 385 redisLog(REDIS_WARNING,"Bad file format reading the append only file: make a backup of your AOF file, then use ./redis-check-aof --fix <filename>");
e2641e09 386 exit(1);
387}
388
e2641e09 389/* Write a sequence of commands able to fully rebuild the dataset into
390 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
391int rewriteAppendOnlyFile(char *filename) {
392 dictIterator *di = NULL;
393 dictEntry *de;
394 FILE *fp;
395 char tmpfile[256];
396 int j;
397 time_t now = time(NULL);
398
399 /* Note that we have to use a different temp name here compared to the
400 * one used by rewriteAppendOnlyFileBackground() function. */
401 snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
402 fp = fopen(tmpfile,"w");
403 if (!fp) {
404 redisLog(REDIS_WARNING, "Failed rewriting the append only file: %s", strerror(errno));
405 return REDIS_ERR;
406 }
407 for (j = 0; j < server.dbnum; j++) {
408 char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
409 redisDb *db = server.db+j;
410 dict *d = db->dict;
411 if (dictSize(d) == 0) continue;
591f29e0 412 di = dictGetSafeIterator(d);
e2641e09 413 if (!di) {
414 fclose(fp);
415 return REDIS_ERR;
416 }
417
418 /* SELECT the new DB */
419 if (fwrite(selectcmd,sizeof(selectcmd)-1,1,fp) == 0) goto werr;
420 if (fwriteBulkLongLong(fp,j) == 0) goto werr;
421
422 /* Iterate this DB writing every entry */
423 while((de = dictNext(di)) != NULL) {
6901fe77 424 sds keystr;
e2641e09 425 robj key, *o;
426 time_t expiretime;
e2641e09 427
428 keystr = dictGetEntryKey(de);
429 o = dictGetEntryVal(de);
430 initStaticStringObject(key,keystr);
16d77878 431
e2641e09 432 expiretime = getExpire(db,&key);
433
434 /* Save the key and associated value */
435 if (o->type == REDIS_STRING) {
436 /* Emit a SET command */
437 char cmd[]="*3\r\n$3\r\nSET\r\n";
438 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
439 /* Key and value */
440 if (fwriteBulkObject(fp,&key) == 0) goto werr;
441 if (fwriteBulkObject(fp,o) == 0) goto werr;
442 } else if (o->type == REDIS_LIST) {
443 /* Emit the RPUSHes needed to rebuild the list */
444 char cmd[]="*3\r\n$5\r\nRPUSH\r\n";
445 if (o->encoding == REDIS_ENCODING_ZIPLIST) {
446 unsigned char *zl = o->ptr;
447 unsigned char *p = ziplistIndex(zl,0);
448 unsigned char *vstr;
449 unsigned int vlen;
450 long long vlong;
451
452 while(ziplistGet(p,&vstr,&vlen,&vlong)) {
453 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
454 if (fwriteBulkObject(fp,&key) == 0) goto werr;
455 if (vstr) {
456 if (fwriteBulkString(fp,(char*)vstr,vlen) == 0)
457 goto werr;
458 } else {
459 if (fwriteBulkLongLong(fp,vlong) == 0)
460 goto werr;
461 }
462 p = ziplistNext(zl,p);
463 }
464 } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
465 list *list = o->ptr;
466 listNode *ln;
467 listIter li;
468
469 listRewind(list,&li);
470 while((ln = listNext(&li))) {
471 robj *eleobj = listNodeValue(ln);
472
473 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
474 if (fwriteBulkObject(fp,&key) == 0) goto werr;
475 if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
476 }
477 } else {
478 redisPanic("Unknown list encoding");
479 }
480 } else if (o->type == REDIS_SET) {
2767f1c0 481 char cmd[]="*3\r\n$4\r\nSADD\r\n";
e2641e09 482
2767f1c0
PN
483 /* Emit the SADDs needed to rebuild the set */
484 if (o->encoding == REDIS_ENCODING_INTSET) {
485 int ii = 0;
23c64fe5 486 int64_t llval;
2767f1c0
PN
487 while(intsetGet(o->ptr,ii++,&llval)) {
488 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
489 if (fwriteBulkObject(fp,&key) == 0) goto werr;
490 if (fwriteBulkLongLong(fp,llval) == 0) goto werr;
491 }
492 } else if (o->encoding == REDIS_ENCODING_HT) {
493 dictIterator *di = dictGetIterator(o->ptr);
494 dictEntry *de;
495 while((de = dictNext(di)) != NULL) {
496 robj *eleobj = dictGetEntryKey(de);
497 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
498 if (fwriteBulkObject(fp,&key) == 0) goto werr;
499 if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
500 }
501 dictReleaseIterator(di);
502 } else {
503 redisPanic("Unknown set encoding");
e2641e09 504 }
e2641e09 505 } else if (o->type == REDIS_ZSET) {
506 /* Emit the ZADDs needed to rebuild the sorted set */
dddf5335
PN
507 char cmd[]="*4\r\n$4\r\nZADD\r\n";
508
509 if (o->encoding == REDIS_ENCODING_ZIPLIST) {
510 unsigned char *zl = o->ptr;
511 unsigned char *eptr, *sptr;
512 unsigned char *vstr;
513 unsigned int vlen;
514 long long vll;
515 double score;
516
517 eptr = ziplistIndex(zl,0);
518 redisAssert(eptr != NULL);
519 sptr = ziplistNext(zl,eptr);
520 redisAssert(sptr != NULL);
521
522 while (eptr != NULL) {
523 redisAssert(ziplistGet(eptr,&vstr,&vlen,&vll));
524 score = zzlGetScore(sptr);
525
526 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
527 if (fwriteBulkObject(fp,&key) == 0) goto werr;
528 if (fwriteBulkDouble(fp,score) == 0) goto werr;
529 if (vstr != NULL) {
530 if (fwriteBulkString(fp,(char*)vstr,vlen) == 0)
531 goto werr;
532 } else {
533 if (fwriteBulkLongLong(fp,vll) == 0)
534 goto werr;
535 }
536 zzlNext(zl,&eptr,&sptr);
537 }
100ed062 538 } else if (o->encoding == REDIS_ENCODING_SKIPLIST) {
dddf5335
PN
539 zset *zs = o->ptr;
540 dictIterator *di = dictGetIterator(zs->dict);
541 dictEntry *de;
542
543 while((de = dictNext(di)) != NULL) {
544 robj *eleobj = dictGetEntryKey(de);
545 double *score = dictGetEntryVal(de);
546
547 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
548 if (fwriteBulkObject(fp,&key) == 0) goto werr;
549 if (fwriteBulkDouble(fp,*score) == 0) goto werr;
550 if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
551 }
552 dictReleaseIterator(di);
553 } else {
554 redisPanic("Unknown sorted set encoding");
e2641e09 555 }
e2641e09 556 } else if (o->type == REDIS_HASH) {
557 char cmd[]="*4\r\n$4\r\nHSET\r\n";
558
559 /* Emit the HSETs needed to rebuild the hash */
560 if (o->encoding == REDIS_ENCODING_ZIPMAP) {
561 unsigned char *p = zipmapRewind(o->ptr);
562 unsigned char *field, *val;
563 unsigned int flen, vlen;
564
565 while((p = zipmapNext(p,&field,&flen,&val,&vlen)) != NULL) {
566 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
567 if (fwriteBulkObject(fp,&key) == 0) goto werr;
daf2049d 568 if (fwriteBulkString(fp,(char*)field,flen) == 0)
5bd09cd4 569 goto werr;
daf2049d 570 if (fwriteBulkString(fp,(char*)val,vlen) == 0)
5bd09cd4 571 goto werr;
e2641e09 572 }
573 } else {
574 dictIterator *di = dictGetIterator(o->ptr);
575 dictEntry *de;
576
577 while((de = dictNext(di)) != NULL) {
578 robj *field = dictGetEntryKey(de);
579 robj *val = dictGetEntryVal(de);
580
581 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
582 if (fwriteBulkObject(fp,&key) == 0) goto werr;
5bd09cd4 583 if (fwriteBulkObject(fp,field) == 0) goto werr;
584 if (fwriteBulkObject(fp,val) == 0) goto werr;
e2641e09 585 }
586 dictReleaseIterator(di);
587 }
588 } else {
589 redisPanic("Unknown object type");
590 }
591 /* Save the expire time */
592 if (expiretime != -1) {
593 char cmd[]="*3\r\n$8\r\nEXPIREAT\r\n";
594 /* If this key is already expired skip it */
595 if (expiretime < now) continue;
596 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
597 if (fwriteBulkObject(fp,&key) == 0) goto werr;
598 if (fwriteBulkLongLong(fp,expiretime) == 0) goto werr;
599 }
e2641e09 600 }
601 dictReleaseIterator(di);
602 }
603
604 /* Make sure data will not remain on the OS's output buffers */
605 fflush(fp);
606 aof_fsync(fileno(fp));
607 fclose(fp);
608
609 /* Use RENAME to make sure the DB file is changed atomically only
610 * if the generate DB file is ok. */
611 if (rename(tmpfile,filename) == -1) {
612 redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
613 unlink(tmpfile);
614 return REDIS_ERR;
615 }
616 redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
617 return REDIS_OK;
618
619werr:
620 fclose(fp);
621 unlink(tmpfile);
622 redisLog(REDIS_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
623 if (di) dictReleaseIterator(di);
624 return REDIS_ERR;
625}
626
627/* This is how rewriting of the append only file in background works:
628 *
629 * 1) The user calls BGREWRITEAOF
630 * 2) Redis calls this function, that forks():
631 * 2a) the child rewrite the append only file in a temp file.
632 * 2b) the parent accumulates differences in server.bgrewritebuf.
633 * 3) When the child finished '2a' exists.
634 * 4) The parent will trap the exit code, if it's OK, will append the
635 * data accumulated into server.bgrewritebuf into the temp file, and
636 * finally will rename(2) the temp file in the actual file name.
637 * The the new file is reopened as the new append only file. Profit!
638 */
639int rewriteAppendOnlyFileBackground(void) {
640 pid_t childpid;
615e414c 641 long long start;
e2641e09 642
643 if (server.bgrewritechildpid != -1) return REDIS_ERR;
615e414c 644 start = ustime();
e2641e09 645 if ((childpid = fork()) == 0) {
e2641e09 646 char tmpfile[256];
647
615e414c 648 /* Child */
a5639e7d
PN
649 if (server.ipfd > 0) close(server.ipfd);
650 if (server.sofd > 0) close(server.sofd);
e2641e09 651 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
652 if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
653 _exit(0);
654 } else {
655 _exit(1);
656 }
657 } else {
658 /* Parent */
615e414c 659 server.stat_fork_time = ustime()-start;
e2641e09 660 if (childpid == -1) {
661 redisLog(REDIS_WARNING,
662 "Can't rewrite append only file in background: fork: %s",
663 strerror(errno));
664 return REDIS_ERR;
665 }
666 redisLog(REDIS_NOTICE,
667 "Background append only file rewriting started by pid %d",childpid);
668 server.bgrewritechildpid = childpid;
669 updateDictResizePolicy();
670 /* We set appendseldb to -1 in order to force the next call to the
671 * feedAppendOnlyFile() to issue a SELECT command, so the differences
672 * accumulated by the parent into server.bgrewritebuf will start
673 * with a SELECT statement and it will be safe to merge. */
674 server.appendseldb = -1;
675 return REDIS_OK;
676 }
677 return REDIS_OK; /* unreached */
678}
679
680void bgrewriteaofCommand(redisClient *c) {
681 if (server.bgrewritechildpid != -1) {
3ab20376 682 addReplyError(c,"Background append only file rewriting already in progress");
b333e239 683 } else if (server.bgsavechildpid != -1) {
684 server.aofrewrite_scheduled = 1;
9e40bce3 685 addReplyStatus(c,"Background append only file rewriting scheduled");
b333e239 686 } else if (rewriteAppendOnlyFileBackground() == REDIS_OK) {
3ab20376 687 addReplyStatus(c,"Background append only file rewriting started");
e2641e09 688 } else {
689 addReply(c,shared.err);
690 }
691}
692
/* Delete the temporary file written by the BGREWRITEAOF child whose pid
 * is 'childpid' (named "temp-rewriteaof-bg-<pid>.aof"). The result of
 * unlink(2) is ignored, so a missing file is not an error. */
void aofRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-rewriteaof-bg-%d.aof",(int)childpid);
    unlink(path);
}
699
b333e239 700/* Update the server.appendonly_current_size filed explicitly using stat(2)
701 * to check the size of the file. This is useful after a rewrite or after
702 * a restart, normally the size is updated just adding the write length
703 * to the current lenght, that is much faster. */
704void aofUpdateCurrentSize(void) {
705 struct redis_stat sb;
706
707 if (redis_fstat(server.appendfd,&sb) == -1) {
708 redisLog(REDIS_WARNING,"Unable to check the AOF length: %s",
709 strerror(errno));
710 } else {
711 server.appendonly_current_size = sb.st_size;
712 }
713}
714
e2641e09 715/* A background append only file rewriting (BGREWRITEAOF) terminated its work.
716 * Handle this. */
36c17a53 717void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
e2641e09 718 if (!bysignal && exitcode == 0) {
b454056d
PN
719 int newfd, oldfd;
720 int nwritten;
e2641e09 721 char tmpfile[256];
b454056d 722 long long now = ustime();
e2641e09 723
724 redisLog(REDIS_NOTICE,
b454056d
PN
725 "Background AOF rewrite terminated with success");
726
986630af 727 /* Flush the differences accumulated by the parent to the
728 * rewritten AOF. */
729 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof",
730 (int)server.bgrewritechildpid);
b454056d
PN
731 newfd = open(tmpfile,O_WRONLY|O_APPEND);
732 if (newfd == -1) {
733 redisLog(REDIS_WARNING,
734 "Unable to open the temporary AOF produced by the child: %s", strerror(errno));
e2641e09 735 goto cleanup;
736 }
b454056d
PN
737
738 nwritten = write(newfd,server.bgrewritebuf,sdslen(server.bgrewritebuf));
739 if (nwritten != (signed)sdslen(server.bgrewritebuf)) {
740 if (nwritten == -1) {
741 redisLog(REDIS_WARNING,
742 "Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
743 } else {
744 redisLog(REDIS_WARNING,
745 "Short write trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
746 }
747 close(newfd);
e2641e09 748 goto cleanup;
749 }
b454056d
PN
750
751 redisLog(REDIS_NOTICE,
752 "Parent diff successfully flushed to the rewritten AOF (%lu bytes)", nwritten);
753
754 /* The only remaining thing to do is to rename the temporary file to
755 * the configured file and switch the file descriptor used to do AOF
986630af 756 * writes. We don't want close(2) or rename(2) calls to block the
757 * server on old file deletion.
758 *
759 * There are two possible scenarios:
b454056d
PN
760 *
761 * 1) AOF is DISABLED and this was a one time rewrite. The temporary
762 * file will be renamed to the configured file. When this file already
763 * exists, it will be unlinked, which may block the server.
764 *
765 * 2) AOF is ENABLED and the rewritten AOF will immediately start
766 * receiving writes. After the temporary file is renamed to the
767 * configured file, the original AOF file descriptor will be closed.
768 * Since this will be the last reference to that file, closing it
769 * causes the underlying file to be unlinked, which may block the
770 * server.
771 *
772 * To mitigate the blocking effect of the unlink operation (either
773 * caused by rename(2) in scenario 1, or by close(2) in scenario 2), we
986630af 774 * use a background thread to take care of this. First, we
b454056d
PN
775 * make scenario 1 identical to scenario 2 by opening the target file
776 * when it exists. The unlink operation after the rename(2) will then
777 * be executed upon calling close(2) for its descriptor. Everything to
778 * guarantee atomicity for this switch has already happened by then, so
779 * we don't care what the outcome or duration of that close operation
780 * is, as long as the file descriptor is released again. */
781 if (server.appendfd == -1) {
782 /* AOF disabled */
b454056d 783
986630af 784 /* Don't care if this fails: oldfd will be -1 and we handle that.
785 * One notable case of -1 return is if the old file does
786 * not exist. */
787 oldfd = open(server.appendfilename,O_RDONLY|O_NONBLOCK);
b454056d
PN
788 } else {
789 /* AOF enabled */
986630af 790 oldfd = -1; /* We'll set this to the current AOF filedes later. */
b454056d
PN
791 }
792
793 /* Rename the temporary file. This will not unlink the target file if
794 * it exists, because we reference it with "oldfd". */
e2641e09 795 if (rename(tmpfile,server.appendfilename) == -1) {
b454056d
PN
796 redisLog(REDIS_WARNING,
797 "Error trying to rename the temporary AOF: %s", strerror(errno));
798 close(newfd);
986630af 799 if (oldfd != -1) close(oldfd);
e2641e09 800 goto cleanup;
801 }
b454056d
PN
802
803 if (server.appendfd == -1) {
986630af 804 /* AOF disabled, we don't need to set the AOF file descriptor
805 * to this new file, so we can close it. */
b454056d
PN
806 close(newfd);
807 } else {
986630af 808 /* AOF enabled, replace the old fd with the new one. */
b454056d
PN
809 oldfd = server.appendfd;
810 server.appendfd = newfd;
4b77700a 811 if (server.appendfsync == APPENDFSYNC_ALWAYS)
812 aof_fsync(newfd);
813 else if (server.appendfsync == APPENDFSYNC_EVERYSEC)
814 aof_background_fsync(newfd);
b454056d 815 server.appendseldb = -1; /* Make sure SELECT is re-issued */
b333e239 816 aofUpdateCurrentSize();
c66bf1fa 817 server.auto_aofrewrite_base_size = server.appendonly_current_size;
5f54a5e6
PN
818
819 /* Clear regular AOF buffer since its contents was just written to
820 * the new AOF from the background rewrite buffer. */
821 sdsfree(server.aofbuf);
822 server.aofbuf = sdsempty();
e2641e09 823 }
b454056d
PN
824
825 redisLog(REDIS_NOTICE, "Background AOF rewrite successful");
826
827 /* Asynchronously close the overwritten AOF. */
50be9b97 828 if (oldfd != -1) bioCreateBackgroundJob(REDIS_BIO_CLOSE_FILE,(void*)(long)oldfd,NULL,NULL);
b454056d
PN
829
830 redisLog(REDIS_VERBOSE,
831 "Background AOF rewrite signal handler took %lldus", ustime()-now);
e2641e09 832 } else if (!bysignal && exitcode != 0) {
b454056d
PN
833 redisLog(REDIS_WARNING,
834 "Background AOF rewrite terminated with error");
e2641e09 835 } else {
836 redisLog(REDIS_WARNING,
b454056d 837 "Background AOF rewrite terminated by signal %d", bysignal);
e2641e09 838 }
b454056d 839
e2641e09 840cleanup:
841 sdsfree(server.bgrewritebuf);
842 server.bgrewritebuf = sdsempty();
843 aofRemoveTempFile(server.bgrewritechildpid);
844 server.bgrewritechildpid = -1;
845}