]> git.saurik.com Git - redis.git/blob - src/aof.c
server.replstate -> server.repl_state
[redis.git] / src / aof.c
1 #include "redis.h"
2 #include "bio.h"
3 #include "rio.h"
4
5 #include <signal.h>
6 #include <fcntl.h>
7 #include <sys/stat.h>
8 #include <sys/types.h>
9 #include <sys/time.h>
10 #include <sys/resource.h>
11 #include <sys/wait.h>
12
13 void aofUpdateCurrentSize(void);
14
15 void aof_background_fsync(int fd) {
16 bioCreateBackgroundJob(REDIS_BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL);
17 }
18
19 /* Called when the user switches from "appendonly yes" to "appendonly no"
20 * at runtime using the CONFIG command. */
21 void stopAppendOnly(void) {
22 redisAssert(server.aof_state != REDIS_AOF_OFF);
23 flushAppendOnlyFile(1);
24 aof_fsync(server.aof_fd);
25 close(server.aof_fd);
26
27 server.aof_fd = -1;
28 server.aof_selected_db = -1;
29 server.aof_state = REDIS_AOF_OFF;
30 /* rewrite operation in progress? kill it, wait child exit */
31 if (server.aof_child_pid != -1) {
32 int statloc;
33
34 if (kill(server.aof_child_pid,SIGKILL) != -1)
35 wait3(&statloc,0,NULL);
36 /* reset the buffer accumulating changes while the child saves */
37 sdsfree(server.aof_rewrite_buf);
38 server.aof_rewrite_buf = sdsempty();
39 aofRemoveTempFile(server.aof_child_pid);
40 server.aof_child_pid = -1;
41 }
42 }
43
44 /* Called when the user switches from "appendonly no" to "appendonly yes"
45 * at runtime using the CONFIG command. */
46 int startAppendOnly(void) {
47 server.aof_last_fsync = time(NULL);
48 server.aof_fd = open(server.aof_filename,O_WRONLY|O_APPEND|O_CREAT,0644);
49 redisAssert(server.aof_state == REDIS_AOF_OFF);
50 if (server.aof_fd == -1) {
51 redisLog(REDIS_WARNING,"Redis needs to enable the AOF but can't open the append only file: %s",strerror(errno));
52 return REDIS_ERR;
53 }
54 if (rewriteAppendOnlyFileBackground() == REDIS_ERR) {
55 close(server.aof_fd);
56 redisLog(REDIS_WARNING,"Redis needs to enable the AOF but can't trigger a background AOF rewrite operation. Check the above logs for more info about the error.");
57 return REDIS_ERR;
58 }
59 /* We correctly switched on AOF, now wait for the rerwite to be complete
60 * in order to append data on disk. */
61 server.aof_state = REDIS_AOF_WAIT_REWRITE;
62 return REDIS_OK;
63 }
64
65 /* Write the append only file buffer on disk.
66 *
67 * Since we are required to write the AOF before replying to the client,
68 * and the only way the client socket can get a write is entering when the
69 * the event loop, we accumulate all the AOF writes in a memory
70 * buffer and write it on disk using this function just before entering
71 * the event loop again.
72 *
73 * About the 'force' argument:
74 *
75 * When the fsync policy is set to 'everysec' we may delay the flush if there
76 * is still an fsync() going on in the background thread, since for instance
77 * on Linux write(2) will be blocked by the background fsync anyway.
78 * When this happens we remember that there is some aof buffer to be
79 * flushed ASAP, and will try to do that in the serverCron() function.
80 *
81 * However if force is set to 1 we'll write regardless of the background
82 * fsync. */
83 void flushAppendOnlyFile(int force) {
84 ssize_t nwritten;
85 int sync_in_progress = 0;
86
87 if (sdslen(server.aof_buf) == 0) return;
88
89 if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
90 sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0;
91
92 if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
93 /* With this append fsync policy we do background fsyncing.
94 * If the fsync is still in progress we can try to delay
95 * the write for a couple of seconds. */
96 if (sync_in_progress) {
97 if (server.aof_flush_postponed_start == 0) {
98 /* No previous write postponinig, remember that we are
99 * postponing the flush and return. */
100 server.aof_flush_postponed_start = server.unixtime;
101 return;
102 } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
103 /* We were already waiting for fsync to finish, but for less
104 * than two seconds this is still ok. Postpone again. */
105 return;
106 }
107 /* Otherwise fall trough, and go write since we can't wait
108 * over two seconds. */
109 redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
110 }
111 }
112 /* If you are following this code path, then we are going to write so
113 * set reset the postponed flush sentinel to zero. */
114 server.aof_flush_postponed_start = 0;
115
116 /* We want to perform a single write. This should be guaranteed atomic
117 * at least if the filesystem we are writing is a real physical one.
118 * While this will save us against the server being killed I don't think
119 * there is much to do about the whole server stopping for power problems
120 * or alike */
121 nwritten = write(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
122 if (nwritten != (signed)sdslen(server.aof_buf)) {
123 /* Ooops, we are in troubles. The best thing to do for now is
124 * aborting instead of giving the illusion that everything is
125 * working as expected. */
126 if (nwritten == -1) {
127 redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
128 } else {
129 redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
130 }
131 exit(1);
132 }
133 server.aof_current_size += nwritten;
134
135 /* Re-use AOF buffer when it is small enough. The maximum comes from the
136 * arena size of 4k minus some overhead (but is otherwise arbitrary). */
137 if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
138 sdsclear(server.aof_buf);
139 } else {
140 sdsfree(server.aof_buf);
141 server.aof_buf = sdsempty();
142 }
143
144 /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
145 * children doing I/O in the background. */
146 if (server.aof_no_fsync_on_rewrite &&
147 (server.aof_child_pid != -1 || server.rdb_child_pid != -1))
148 return;
149
150 /* Perform the fsync if needed. */
151 if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
152 /* aof_fsync is defined as fdatasync() for Linux in order to avoid
153 * flushing metadata. */
154 aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */
155 server.aof_last_fsync = server.unixtime;
156 } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
157 server.unixtime > server.aof_last_fsync)) {
158 if (!sync_in_progress) aof_background_fsync(server.aof_fd);
159 server.aof_last_fsync = server.unixtime;
160 }
161 }
162
163 sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) {
164 char buf[32];
165 int len, j;
166 robj *o;
167
168 buf[0] = '*';
169 len = 1+ll2string(buf+1,sizeof(buf)-1,argc);
170 buf[len++] = '\r';
171 buf[len++] = '\n';
172 dst = sdscatlen(dst,buf,len);
173
174 for (j = 0; j < argc; j++) {
175 o = getDecodedObject(argv[j]);
176 buf[0] = '$';
177 len = 1+ll2string(buf+1,sizeof(buf)-1,sdslen(o->ptr));
178 buf[len++] = '\r';
179 buf[len++] = '\n';
180 dst = sdscatlen(dst,buf,len);
181 dst = sdscatlen(dst,o->ptr,sdslen(o->ptr));
182 dst = sdscatlen(dst,"\r\n",2);
183 decrRefCount(o);
184 }
185 return dst;
186 }
187
188 /* Create the sds representation of an PEXPIREAT command, using
189 * 'seconds' as time to live and 'cmd' to understand what command
190 * we are translating into a PEXPIREAT.
191 *
192 * This command is used in order to translate EXPIRE and PEXPIRE commands
193 * into PEXPIREAT command so that we retain precision in the append only
194 * file, and the time is always absolute and not relative. */
195 sds catAppendOnlyExpireAtCommand(sds buf, struct redisCommand *cmd, robj *key, robj *seconds) {
196 long long when;
197 robj *argv[3];
198
199 /* Make sure we can use strtol */
200 seconds = getDecodedObject(seconds);
201 when = strtoll(seconds->ptr,NULL,10);
202 /* Convert argument into milliseconds for EXPIRE, SETEX, EXPIREAT */
203 if (cmd->proc == expireCommand || cmd->proc == setexCommand ||
204 cmd->proc == expireatCommand)
205 {
206 when *= 1000;
207 }
208 /* Convert into absolute time for EXPIRE, PEXPIRE, SETEX, PSETEX */
209 if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
210 cmd->proc == setexCommand || cmd->proc == psetexCommand)
211 {
212 when += mstime();
213 }
214 decrRefCount(seconds);
215
216 argv[0] = createStringObject("PEXPIREAT",9);
217 argv[1] = key;
218 argv[2] = createStringObjectFromLongLong(when);
219 buf = catAppendOnlyGenericCommand(buf, 3, argv);
220 decrRefCount(argv[0]);
221 decrRefCount(argv[2]);
222 return buf;
223 }
224
225 void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
226 sds buf = sdsempty();
227 robj *tmpargv[3];
228
229 /* The DB this command was targetting is not the same as the last command
230 * we appendend. To issue a SELECT command is needed. */
231 if (dictid != server.aof_selected_db) {
232 char seldb[64];
233
234 snprintf(seldb,sizeof(seldb),"%d",dictid);
235 buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
236 (unsigned long)strlen(seldb),seldb);
237 server.aof_selected_db = dictid;
238 }
239
240 if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
241 cmd->proc == expireatCommand) {
242 /* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */
243 buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
244 } else if (cmd->proc == setexCommand || cmd->proc == psetexCommand) {
245 /* Translate SETEX/PSETEX to SET and PEXPIREAT */
246 tmpargv[0] = createStringObject("SET",3);
247 tmpargv[1] = argv[1];
248 tmpargv[2] = argv[3];
249 buf = catAppendOnlyGenericCommand(buf,3,tmpargv);
250 decrRefCount(tmpargv[0]);
251 buf = catAppendOnlyExpireAtCommand(buf,cmd,argv[1],argv[2]);
252 } else {
253 /* All the other commands don't need translation or need the
254 * same translation already operated in the command vector
255 * for the replication itself. */
256 buf = catAppendOnlyGenericCommand(buf,argc,argv);
257 }
258
259 /* Append to the AOF buffer. This will be flushed on disk just before
260 * of re-entering the event loop, so before the client will get a
261 * positive reply about the operation performed. */
262 if (server.aof_state == REDIS_AOF_ON)
263 server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));
264
265 /* If a background append only file rewriting is in progress we want to
266 * accumulate the differences between the child DB and the current one
267 * in a buffer, so that when the child process will do its work we
268 * can append the differences to the new append only file. */
269 if (server.aof_child_pid != -1)
270 server.aof_rewrite_buf = sdscatlen(server.aof_rewrite_buf,buf,sdslen(buf));
271
272 sdsfree(buf);
273 }
274
275 /* In Redis commands are always executed in the context of a client, so in
276 * order to load the append only file we need to create a fake client. */
277 struct redisClient *createFakeClient(void) {
278 struct redisClient *c = zmalloc(sizeof(*c));
279
280 selectDb(c,0);
281 c->fd = -1;
282 c->querybuf = sdsempty();
283 c->argc = 0;
284 c->argv = NULL;
285 c->bufpos = 0;
286 c->flags = 0;
287 /* We set the fake client as a slave waiting for the synchronization
288 * so that Redis will not try to send replies to this client. */
289 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
290 c->reply = listCreate();
291 c->watched_keys = listCreate();
292 listSetFreeMethod(c->reply,decrRefCount);
293 listSetDupMethod(c->reply,dupClientReplyValue);
294 initClientMultiState(c);
295 return c;
296 }
297
298 void freeFakeClient(struct redisClient *c) {
299 sdsfree(c->querybuf);
300 listRelease(c->reply);
301 listRelease(c->watched_keys);
302 freeClientMultiState(c);
303 zfree(c);
304 }
305
306 /* Replay the append log file. On error REDIS_OK is returned. On non fatal
307 * error (the append only file is zero-length) REDIS_ERR is returned. On
308 * fatal error an error message is logged and the program exists. */
309 int loadAppendOnlyFile(char *filename) {
310 struct redisClient *fakeClient;
311 FILE *fp = fopen(filename,"r");
312 struct redis_stat sb;
313 int old_aof_state = server.aof_state;
314 long loops = 0;
315
316 if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
317 server.aof_current_size = 0;
318 fclose(fp);
319 return REDIS_ERR;
320 }
321
322 if (fp == NULL) {
323 redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
324 exit(1);
325 }
326
327 /* Temporarily disable AOF, to prevent EXEC from feeding a MULTI
328 * to the same file we're about to read. */
329 server.aof_state = REDIS_AOF_OFF;
330
331 fakeClient = createFakeClient();
332 startLoading(fp);
333
334 while(1) {
335 int argc, j;
336 unsigned long len;
337 robj **argv;
338 char buf[128];
339 sds argsds;
340 struct redisCommand *cmd;
341
342 /* Serve the clients from time to time */
343 if (!(loops++ % 1000)) {
344 loadingProgress(ftello(fp));
345 aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT);
346 }
347
348 if (fgets(buf,sizeof(buf),fp) == NULL) {
349 if (feof(fp))
350 break;
351 else
352 goto readerr;
353 }
354 if (buf[0] != '*') goto fmterr;
355 argc = atoi(buf+1);
356 if (argc < 1) goto fmterr;
357
358 argv = zmalloc(sizeof(robj*)*argc);
359 for (j = 0; j < argc; j++) {
360 if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
361 if (buf[0] != '$') goto fmterr;
362 len = strtol(buf+1,NULL,10);
363 argsds = sdsnewlen(NULL,len);
364 if (len && fread(argsds,len,1,fp) == 0) goto fmterr;
365 argv[j] = createObject(REDIS_STRING,argsds);
366 if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
367 }
368
369 /* Command lookup */
370 cmd = lookupCommand(argv[0]->ptr);
371 if (!cmd) {
372 redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
373 exit(1);
374 }
375 /* Run the command in the context of a fake client */
376 fakeClient->argc = argc;
377 fakeClient->argv = argv;
378 cmd->proc(fakeClient);
379
380 /* The fake client should not have a reply */
381 redisAssert(fakeClient->bufpos == 0 && listLength(fakeClient->reply) == 0);
382 /* The fake client should never get blocked */
383 redisAssert((fakeClient->flags & REDIS_BLOCKED) == 0);
384
385 /* Clean up. Command code may have changed argv/argc so we use the
386 * argv/argc of the client instead of the local variables. */
387 for (j = 0; j < fakeClient->argc; j++)
388 decrRefCount(fakeClient->argv[j]);
389 zfree(fakeClient->argv);
390 }
391
392 /* This point can only be reached when EOF is reached without errors.
393 * If the client is in the middle of a MULTI/EXEC, log error and quit. */
394 if (fakeClient->flags & REDIS_MULTI) goto readerr;
395
396 fclose(fp);
397 freeFakeClient(fakeClient);
398 server.aof_state = old_aof_state;
399 stopLoading();
400 aofUpdateCurrentSize();
401 server.aof_rewrite_base_size = server.aof_current_size;
402 return REDIS_OK;
403
404 readerr:
405 if (feof(fp)) {
406 redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
407 } else {
408 redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
409 }
410 exit(1);
411 fmterr:
412 redisLog(REDIS_WARNING,"Bad file format reading the append only file: make a backup of your AOF file, then use ./redis-check-aof --fix <filename>");
413 exit(1);
414 }
415
416 /* Delegate writing an object to writing a bulk string or bulk long long.
417 * This is not placed in rio.c since that adds the redis.h dependency. */
418 int rioWriteBulkObject(rio *r, robj *obj) {
419 /* Avoid using getDecodedObject to help copy-on-write (we are often
420 * in a child process when this function is called). */
421 if (obj->encoding == REDIS_ENCODING_INT) {
422 return rioWriteBulkLongLong(r,(long)obj->ptr);
423 } else if (obj->encoding == REDIS_ENCODING_RAW) {
424 return rioWriteBulkString(r,obj->ptr,sdslen(obj->ptr));
425 } else {
426 redisPanic("Unknown string encoding");
427 }
428 }
429
430 /* Emit the commands needed to rebuild a list object.
431 * The function returns 0 on error, 1 on success. */
432 int rewriteListObject(rio *r, robj *key, robj *o) {
433 long long count = 0, items = listTypeLength(o);
434
435 if (o->encoding == REDIS_ENCODING_ZIPLIST) {
436 unsigned char *zl = o->ptr;
437 unsigned char *p = ziplistIndex(zl,0);
438 unsigned char *vstr;
439 unsigned int vlen;
440 long long vlong;
441
442 while(ziplistGet(p,&vstr,&vlen,&vlong)) {
443 if (count == 0) {
444 int cmd_items = (items > REDIS_AOF_REWRITE_ITEMS_PER_CMD) ?
445 REDIS_AOF_REWRITE_ITEMS_PER_CMD : items;
446
447 if (rioWriteBulkCount(r,'*',2+cmd_items) == 0) return 0;
448 if (rioWriteBulkString(r,"RPUSH",5) == 0) return 0;
449 if (rioWriteBulkObject(r,key) == 0) return 0;
450 }
451 if (vstr) {
452 if (rioWriteBulkString(r,(char*)vstr,vlen) == 0) return 0;
453 } else {
454 if (rioWriteBulkLongLong(r,vlong) == 0) return 0;
455 }
456 p = ziplistNext(zl,p);
457 if (++count == REDIS_AOF_REWRITE_ITEMS_PER_CMD) count = 0;
458 items--;
459 }
460 } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
461 list *list = o->ptr;
462 listNode *ln;
463 listIter li;
464
465 listRewind(list,&li);
466 while((ln = listNext(&li))) {
467 robj *eleobj = listNodeValue(ln);
468
469 if (count == 0) {
470 int cmd_items = (items > REDIS_AOF_REWRITE_ITEMS_PER_CMD) ?
471 REDIS_AOF_REWRITE_ITEMS_PER_CMD : items;
472
473 if (rioWriteBulkCount(r,'*',2+cmd_items) == 0) return 0;
474 if (rioWriteBulkString(r,"RPUSH",5) == 0) return 0;
475 if (rioWriteBulkObject(r,key) == 0) return 0;
476 }
477 if (rioWriteBulkObject(r,eleobj) == 0) return 0;
478 if (++count == REDIS_AOF_REWRITE_ITEMS_PER_CMD) count = 0;
479 items--;
480 }
481 } else {
482 redisPanic("Unknown list encoding");
483 }
484 return 1;
485 }
486
487 /* Emit the commands needed to rebuild a set object.
488 * The function returns 0 on error, 1 on success. */
489 int rewriteSetObject(rio *r, robj *key, robj *o) {
490 long long count = 0, items = setTypeSize(o);
491
492 if (o->encoding == REDIS_ENCODING_INTSET) {
493 int ii = 0;
494 int64_t llval;
495
496 while(intsetGet(o->ptr,ii++,&llval)) {
497 if (count == 0) {
498 int cmd_items = (items > REDIS_AOF_REWRITE_ITEMS_PER_CMD) ?
499 REDIS_AOF_REWRITE_ITEMS_PER_CMD : items;
500
501 if (rioWriteBulkCount(r,'*',2+cmd_items) == 0) return 0;
502 if (rioWriteBulkString(r,"SADD",4) == 0) return 0;
503 if (rioWriteBulkObject(r,key) == 0) return 0;
504 }
505 if (rioWriteBulkLongLong(r,llval) == 0) return 0;
506 if (++count == REDIS_AOF_REWRITE_ITEMS_PER_CMD) count = 0;
507 items--;
508 }
509 } else if (o->encoding == REDIS_ENCODING_HT) {
510 dictIterator *di = dictGetIterator(o->ptr);
511 dictEntry *de;
512
513 while((de = dictNext(di)) != NULL) {
514 robj *eleobj = dictGetKey(de);
515 if (count == 0) {
516 int cmd_items = (items > REDIS_AOF_REWRITE_ITEMS_PER_CMD) ?
517 REDIS_AOF_REWRITE_ITEMS_PER_CMD : items;
518
519 if (rioWriteBulkCount(r,'*',2+cmd_items) == 0) return 0;
520 if (rioWriteBulkString(r,"SADD",4) == 0) return 0;
521 if (rioWriteBulkObject(r,key) == 0) return 0;
522 }
523 if (rioWriteBulkObject(r,eleobj) == 0) return 0;
524 if (++count == REDIS_AOF_REWRITE_ITEMS_PER_CMD) count = 0;
525 items--;
526 }
527 dictReleaseIterator(di);
528 } else {
529 redisPanic("Unknown set encoding");
530 }
531 return 1;
532 }
533
534 /* Emit the commands needed to rebuild a sorted set object.
535 * The function returns 0 on error, 1 on success. */
536 int rewriteSortedSetObject(rio *r, robj *key, robj *o) {
537 long long count = 0, items = zsetLength(o);
538
539 if (o->encoding == REDIS_ENCODING_ZIPLIST) {
540 unsigned char *zl = o->ptr;
541 unsigned char *eptr, *sptr;
542 unsigned char *vstr;
543 unsigned int vlen;
544 long long vll;
545 double score;
546
547 eptr = ziplistIndex(zl,0);
548 redisAssert(eptr != NULL);
549 sptr = ziplistNext(zl,eptr);
550 redisAssert(sptr != NULL);
551
552 while (eptr != NULL) {
553 redisAssert(ziplistGet(eptr,&vstr,&vlen,&vll));
554 score = zzlGetScore(sptr);
555
556 if (count == 0) {
557 int cmd_items = (items > REDIS_AOF_REWRITE_ITEMS_PER_CMD) ?
558 REDIS_AOF_REWRITE_ITEMS_PER_CMD : items;
559
560 if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0) return 0;
561 if (rioWriteBulkString(r,"ZADD",4) == 0) return 0;
562 if (rioWriteBulkObject(r,key) == 0) return 0;
563 }
564 if (rioWriteBulkDouble(r,score) == 0) return 0;
565 if (vstr != NULL) {
566 if (rioWriteBulkString(r,(char*)vstr,vlen) == 0) return 0;
567 } else {
568 if (rioWriteBulkLongLong(r,vll) == 0) return 0;
569 }
570 zzlNext(zl,&eptr,&sptr);
571 if (++count == REDIS_AOF_REWRITE_ITEMS_PER_CMD) count = 0;
572 items--;
573 }
574 } else if (o->encoding == REDIS_ENCODING_SKIPLIST) {
575 zset *zs = o->ptr;
576 dictIterator *di = dictGetIterator(zs->dict);
577 dictEntry *de;
578
579 while((de = dictNext(di)) != NULL) {
580 robj *eleobj = dictGetKey(de);
581 double *score = dictGetVal(de);
582
583 if (count == 0) {
584 int cmd_items = (items > REDIS_AOF_REWRITE_ITEMS_PER_CMD) ?
585 REDIS_AOF_REWRITE_ITEMS_PER_CMD : items;
586
587 if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0) return 0;
588 if (rioWriteBulkString(r,"ZADD",4) == 0) return 0;
589 if (rioWriteBulkObject(r,key) == 0) return 0;
590 }
591 if (rioWriteBulkDouble(r,*score) == 0) return 0;
592 if (rioWriteBulkObject(r,eleobj) == 0) return 0;
593 if (++count == REDIS_AOF_REWRITE_ITEMS_PER_CMD) count = 0;
594 items--;
595 }
596 dictReleaseIterator(di);
597 } else {
598 redisPanic("Unknown sorted zset encoding");
599 }
600 return 1;
601 }
602
603 /* Emit the commands needed to rebuild a hash object.
604 * The function returns 0 on error, 1 on success. */
605 int rewriteHashObject(rio *r, robj *key, robj *o) {
606 long long count = 0, items = hashTypeLength(o);
607
608 if (o->encoding == REDIS_ENCODING_ZIPMAP) {
609 unsigned char *p = zipmapRewind(o->ptr);
610 unsigned char *field, *val;
611 unsigned int flen, vlen;
612
613 while((p = zipmapNext(p,&field,&flen,&val,&vlen)) != NULL) {
614 if (count == 0) {
615 int cmd_items = (items > REDIS_AOF_REWRITE_ITEMS_PER_CMD) ?
616 REDIS_AOF_REWRITE_ITEMS_PER_CMD : items;
617
618 if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0) return 0;
619 if (rioWriteBulkString(r,"HMSET",5) == 0) return 0;
620 if (rioWriteBulkObject(r,key) == 0) return 0;
621 }
622 if (rioWriteBulkString(r,(char*)field,flen) == 0) return 0;
623 if (rioWriteBulkString(r,(char*)val,vlen) == 0) return 0;
624 if (++count == REDIS_AOF_REWRITE_ITEMS_PER_CMD) count = 0;
625 items--;
626 }
627 } else {
628 dictIterator *di = dictGetIterator(o->ptr);
629 dictEntry *de;
630
631 while((de = dictNext(di)) != NULL) {
632 robj *field = dictGetKey(de);
633 robj *val = dictGetVal(de);
634
635 if (count == 0) {
636 int cmd_items = (items > REDIS_AOF_REWRITE_ITEMS_PER_CMD) ?
637 REDIS_AOF_REWRITE_ITEMS_PER_CMD : items;
638
639 if (rioWriteBulkCount(r,'*',2+cmd_items*2) == 0) return 0;
640 if (rioWriteBulkString(r,"HMSET",5) == 0) return 0;
641 if (rioWriteBulkObject(r,key) == 0) return 0;
642 }
643 if (rioWriteBulkObject(r,field) == 0) return 0;
644 if (rioWriteBulkObject(r,val) == 0) return 0;
645 if (++count == REDIS_AOF_REWRITE_ITEMS_PER_CMD) count = 0;
646 items--;
647 }
648 dictReleaseIterator(di);
649 }
650 return 1;
651 }
652
653 /* Write a sequence of commands able to fully rebuild the dataset into
654 * "filename". Used both by REWRITEAOF and BGREWRITEAOF.
655 *
656 * In order to minimize the number of commands needed in the rewritten
657 * log Redis uses variadic commands when possible, such as RPUSH, SADD
658 * and ZADD. However at max REDIS_AOF_REWRITE_ITEMS_PER_CMD items per time
659 * are inserted using a single command. */
660 int rewriteAppendOnlyFile(char *filename) {
661 dictIterator *di = NULL;
662 dictEntry *de;
663 rio aof;
664 FILE *fp;
665 char tmpfile[256];
666 int j;
667 long long now = mstime();
668
669 /* Note that we have to use a different temp name here compared to the
670 * one used by rewriteAppendOnlyFileBackground() function. */
671 snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
672 fp = fopen(tmpfile,"w");
673 if (!fp) {
674 redisLog(REDIS_WARNING, "Failed rewriting the append only file: %s", strerror(errno));
675 return REDIS_ERR;
676 }
677
678 rioInitWithFile(&aof,fp);
679 for (j = 0; j < server.dbnum; j++) {
680 char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
681 redisDb *db = server.db+j;
682 dict *d = db->dict;
683 if (dictSize(d) == 0) continue;
684 di = dictGetSafeIterator(d);
685 if (!di) {
686 fclose(fp);
687 return REDIS_ERR;
688 }
689
690 /* SELECT the new DB */
691 if (rioWrite(&aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
692 if (rioWriteBulkLongLong(&aof,j) == 0) goto werr;
693
694 /* Iterate this DB writing every entry */
695 while((de = dictNext(di)) != NULL) {
696 sds keystr;
697 robj key, *o;
698 long long expiretime;
699
700 keystr = dictGetKey(de);
701 o = dictGetVal(de);
702 initStaticStringObject(key,keystr);
703
704 expiretime = getExpire(db,&key);
705
706 /* Save the key and associated value */
707 if (o->type == REDIS_STRING) {
708 /* Emit a SET command */
709 char cmd[]="*3\r\n$3\r\nSET\r\n";
710 if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
711 /* Key and value */
712 if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
713 if (rioWriteBulkObject(&aof,o) == 0) goto werr;
714 } else if (o->type == REDIS_LIST) {
715 if (rewriteListObject(&aof,&key,o) == 0) goto werr;
716 } else if (o->type == REDIS_SET) {
717 if (rewriteSetObject(&aof,&key,o) == 0) goto werr;
718 } else if (o->type == REDIS_ZSET) {
719 if (rewriteSortedSetObject(&aof,&key,o) == 0) goto werr;
720 } else if (o->type == REDIS_HASH) {
721 if (rewriteHashObject(&aof,&key,o) == 0) goto werr;
722 } else {
723 redisPanic("Unknown object type");
724 }
725 /* Save the expire time */
726 if (expiretime != -1) {
727 char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n";
728 /* If this key is already expired skip it */
729 if (expiretime < now) continue;
730 if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
731 if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
732 if (rioWriteBulkLongLong(&aof,expiretime) == 0) goto werr;
733 }
734 }
735 dictReleaseIterator(di);
736 }
737
738 /* Make sure data will not remain on the OS's output buffers */
739 fflush(fp);
740 aof_fsync(fileno(fp));
741 fclose(fp);
742
743 /* Use RENAME to make sure the DB file is changed atomically only
744 * if the generate DB file is ok. */
745 if (rename(tmpfile,filename) == -1) {
746 redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
747 unlink(tmpfile);
748 return REDIS_ERR;
749 }
750 redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
751 return REDIS_OK;
752
753 werr:
754 fclose(fp);
755 unlink(tmpfile);
756 redisLog(REDIS_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
757 if (di) dictReleaseIterator(di);
758 return REDIS_ERR;
759 }
760
761 /* This is how rewriting of the append only file in background works:
762 *
763 * 1) The user calls BGREWRITEAOF
764 * 2) Redis calls this function, that forks():
765 * 2a) the child rewrite the append only file in a temp file.
766 * 2b) the parent accumulates differences in server.aof_rewrite_buf.
767 * 3) When the child finished '2a' exists.
768 * 4) The parent will trap the exit code, if it's OK, will append the
769 * data accumulated into server.aof_rewrite_buf into the temp file, and
770 * finally will rename(2) the temp file in the actual file name.
771 * The the new file is reopened as the new append only file. Profit!
772 */
773 int rewriteAppendOnlyFileBackground(void) {
774 pid_t childpid;
775 long long start;
776
777 if (server.aof_child_pid != -1) return REDIS_ERR;
778 start = ustime();
779 if ((childpid = fork()) == 0) {
780 char tmpfile[256];
781
782 /* Child */
783 if (server.ipfd > 0) close(server.ipfd);
784 if (server.sofd > 0) close(server.sofd);
785 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
786 if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
787 _exit(0);
788 } else {
789 _exit(1);
790 }
791 } else {
792 /* Parent */
793 server.stat_fork_time = ustime()-start;
794 if (childpid == -1) {
795 redisLog(REDIS_WARNING,
796 "Can't rewrite append only file in background: fork: %s",
797 strerror(errno));
798 return REDIS_ERR;
799 }
800 redisLog(REDIS_NOTICE,
801 "Background append only file rewriting started by pid %d",childpid);
802 server.aof_rewrite_scheduled = 0;
803 server.aof_child_pid = childpid;
804 updateDictResizePolicy();
805 /* We set appendseldb to -1 in order to force the next call to the
806 * feedAppendOnlyFile() to issue a SELECT command, so the differences
807 * accumulated by the parent into server.aof_rewrite_buf will start
808 * with a SELECT statement and it will be safe to merge. */
809 server.aof_selected_db = -1;
810 return REDIS_OK;
811 }
812 return REDIS_OK; /* unreached */
813 }
814
815 void bgrewriteaofCommand(redisClient *c) {
816 if (server.aof_child_pid != -1) {
817 addReplyError(c,"Background append only file rewriting already in progress");
818 } else if (server.rdb_child_pid != -1) {
819 server.aof_rewrite_scheduled = 1;
820 addReplyStatus(c,"Background append only file rewriting scheduled");
821 } else if (rewriteAppendOnlyFileBackground() == REDIS_OK) {
822 addReplyStatus(c,"Background append only file rewriting started");
823 } else {
824 addReply(c,shared.err);
825 }
826 }
827
/* Remove the temporary file "temp-rewriteaof-bg-<pid>.aof" associated with
 * the background rewrite child 'childpid'. The result of unlink(2) is
 * ignored. */
void aofRemoveTempFile(pid_t childpid) {
    char path[256];
    int pid = (int) childpid;

    snprintf(path,sizeof(path),"temp-rewriteaof-bg-%d.aof",pid);
    unlink(path);
}
834
835 /* Update the server.aof_current_size filed explicitly using stat(2)
836 * to check the size of the file. This is useful after a rewrite or after
837 * a restart, normally the size is updated just adding the write length
838 * to the current lenght, that is much faster. */
839 void aofUpdateCurrentSize(void) {
840 struct redis_stat sb;
841
842 if (redis_fstat(server.aof_fd,&sb) == -1) {
843 redisLog(REDIS_WARNING,"Unable to check the AOF length: %s",
844 strerror(errno));
845 } else {
846 server.aof_current_size = sb.st_size;
847 }
848 }
849
850 /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
851 * Handle this. */
852 void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
853 if (!bysignal && exitcode == 0) {
854 int newfd, oldfd;
855 int nwritten;
856 char tmpfile[256];
857 long long now = ustime();
858
859 redisLog(REDIS_NOTICE,
860 "Background AOF rewrite terminated with success");
861
862 /* Flush the differences accumulated by the parent to the
863 * rewritten AOF. */
864 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof",
865 (int)server.aof_child_pid);
866 newfd = open(tmpfile,O_WRONLY|O_APPEND);
867 if (newfd == -1) {
868 redisLog(REDIS_WARNING,
869 "Unable to open the temporary AOF produced by the child: %s", strerror(errno));
870 goto cleanup;
871 }
872
873 nwritten = write(newfd,server.aof_rewrite_buf,sdslen(server.aof_rewrite_buf));
874 if (nwritten != (signed)sdslen(server.aof_rewrite_buf)) {
875 if (nwritten == -1) {
876 redisLog(REDIS_WARNING,
877 "Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
878 } else {
879 redisLog(REDIS_WARNING,
880 "Short write trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
881 }
882 close(newfd);
883 goto cleanup;
884 }
885
886 redisLog(REDIS_NOTICE,
887 "Parent diff successfully flushed to the rewritten AOF (%lu bytes)", nwritten);
888
889 /* The only remaining thing to do is to rename the temporary file to
890 * the configured file and switch the file descriptor used to do AOF
891 * writes. We don't want close(2) or rename(2) calls to block the
892 * server on old file deletion.
893 *
894 * There are two possible scenarios:
895 *
896 * 1) AOF is DISABLED and this was a one time rewrite. The temporary
897 * file will be renamed to the configured file. When this file already
898 * exists, it will be unlinked, which may block the server.
899 *
900 * 2) AOF is ENABLED and the rewritten AOF will immediately start
901 * receiving writes. After the temporary file is renamed to the
902 * configured file, the original AOF file descriptor will be closed.
903 * Since this will be the last reference to that file, closing it
904 * causes the underlying file to be unlinked, which may block the
905 * server.
906 *
907 * To mitigate the blocking effect of the unlink operation (either
908 * caused by rename(2) in scenario 1, or by close(2) in scenario 2), we
909 * use a background thread to take care of this. First, we
910 * make scenario 1 identical to scenario 2 by opening the target file
911 * when it exists. The unlink operation after the rename(2) will then
912 * be executed upon calling close(2) for its descriptor. Everything to
913 * guarantee atomicity for this switch has already happened by then, so
914 * we don't care what the outcome or duration of that close operation
915 * is, as long as the file descriptor is released again. */
916 if (server.aof_fd == -1) {
917 /* AOF disabled */
918
919 /* Don't care if this fails: oldfd will be -1 and we handle that.
920 * One notable case of -1 return is if the old file does
921 * not exist. */
922 oldfd = open(server.aof_filename,O_RDONLY|O_NONBLOCK);
923 } else {
924 /* AOF enabled */
925 oldfd = -1; /* We'll set this to the current AOF filedes later. */
926 }
927
928 /* Rename the temporary file. This will not unlink the target file if
929 * it exists, because we reference it with "oldfd". */
930 if (rename(tmpfile,server.aof_filename) == -1) {
931 redisLog(REDIS_WARNING,
932 "Error trying to rename the temporary AOF: %s", strerror(errno));
933 close(newfd);
934 if (oldfd != -1) close(oldfd);
935 goto cleanup;
936 }
937
938 if (server.aof_fd == -1) {
939 /* AOF disabled, we don't need to set the AOF file descriptor
940 * to this new file, so we can close it. */
941 close(newfd);
942 } else {
943 /* AOF enabled, replace the old fd with the new one. */
944 oldfd = server.aof_fd;
945 server.aof_fd = newfd;
946 if (server.aof_fsync == AOF_FSYNC_ALWAYS)
947 aof_fsync(newfd);
948 else if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
949 aof_background_fsync(newfd);
950 server.aof_selected_db = -1; /* Make sure SELECT is re-issued */
951 aofUpdateCurrentSize();
952 server.aof_rewrite_base_size = server.aof_current_size;
953
954 /* Clear regular AOF buffer since its contents was just written to
955 * the new AOF from the background rewrite buffer. */
956 sdsfree(server.aof_buf);
957 server.aof_buf = sdsempty();
958 }
959
960 redisLog(REDIS_NOTICE, "Background AOF rewrite successful");
961 /* Change state from WAIT_REWRITE to ON if needed */
962 if (server.aof_state == REDIS_AOF_WAIT_REWRITE)
963 server.aof_state = REDIS_AOF_ON;
964
965 /* Asynchronously close the overwritten AOF. */
966 if (oldfd != -1) bioCreateBackgroundJob(REDIS_BIO_CLOSE_FILE,(void*)(long)oldfd,NULL,NULL);
967
968 redisLog(REDIS_VERBOSE,
969 "Background AOF rewrite signal handler took %lldus", ustime()-now);
970 } else if (!bysignal && exitcode != 0) {
971 redisLog(REDIS_WARNING,
972 "Background AOF rewrite terminated with error");
973 } else {
974 redisLog(REDIS_WARNING,
975 "Background AOF rewrite terminated by signal %d", bysignal);
976 }
977
978 cleanup:
979 sdsfree(server.aof_rewrite_buf);
980 server.aof_rewrite_buf = sdsempty();
981 aofRemoveTempFile(server.aof_child_pid);
982 server.aof_child_pid = -1;
983 /* Schedule a new rewrite if we are waiting for it to switch the AOF ON. */
984 if (server.aof_state == REDIS_AOF_WAIT_REWRITE)
985 server.aof_rewrite_scheduled = 1;
986 }