]> git.saurik.com Git - redis.git/blob - src/replication.c
Test: more MIGRATE tests.
[redis.git] / src / replication.c
1 /* Asynchronous replication implementation.
2 *
3 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
8 *
9 * * Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * * Neither the name of Redis nor the names of its contributors may be used
15 * to endorse or promote products derived from this software without
16 * specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
29 */
30
31
32 #include "redis.h"
33
34 #include <sys/time.h>
35 #include <unistd.h>
36 #include <fcntl.h>
37 #include <sys/socket.h>
38 #include <sys/stat.h>
39
40 /* ---------------------------------- MASTER -------------------------------- */
41
42 void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
43 listNode *ln;
44 listIter li;
45 int j;
46
47 listRewind(slaves,&li);
48 while((ln = listNext(&li))) {
49 redisClient *slave = ln->value;
50
51 /* Don't feed slaves that are still waiting for BGSAVE to start */
52 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
53
54 /* Feed slaves that are waiting for the initial SYNC (so these commands
55 * are queued in the output buffer until the intial SYNC completes),
56 * or are already in sync with the master. */
57 if (slave->slaveseldb != dictid) {
58 robj *selectcmd;
59
60 if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) {
61 selectcmd = shared.select[dictid];
62 incrRefCount(selectcmd);
63 } else {
64 selectcmd = createObject(REDIS_STRING,
65 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
66 }
67 addReply(slave,selectcmd);
68 decrRefCount(selectcmd);
69 slave->slaveseldb = dictid;
70 }
71 addReplyMultiBulkLen(slave,argc);
72 for (j = 0; j < argc; j++) addReplyBulk(slave,argv[j]);
73 }
74 }
75
76 void replicationFeedMonitors(redisClient *c, list *monitors, int dictid, robj **argv, int argc) {
77 listNode *ln;
78 listIter li;
79 int j, port;
80 sds cmdrepr = sdsnew("+");
81 robj *cmdobj;
82 char ip[32];
83 struct timeval tv;
84
85 gettimeofday(&tv,NULL);
86 cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec,(long)tv.tv_usec);
87 if (c->flags & REDIS_LUA_CLIENT) {
88 cmdrepr = sdscatprintf(cmdrepr,"[%d lua] ",dictid);
89 } else if (c->flags & REDIS_UNIX_SOCKET) {
90 cmdrepr = sdscatprintf(cmdrepr,"[%d unix:%s] ",dictid,server.unixsocket);
91 } else {
92 anetPeerToString(c->fd,ip,&port);
93 cmdrepr = sdscatprintf(cmdrepr,"[%d %s:%d] ",dictid,ip,port);
94 }
95
96 for (j = 0; j < argc; j++) {
97 if (argv[j]->encoding == REDIS_ENCODING_INT) {
98 cmdrepr = sdscatprintf(cmdrepr, "\"%ld\"", (long)argv[j]->ptr);
99 } else {
100 cmdrepr = sdscatrepr(cmdrepr,(char*)argv[j]->ptr,
101 sdslen(argv[j]->ptr));
102 }
103 if (j != argc-1)
104 cmdrepr = sdscatlen(cmdrepr," ",1);
105 }
106 cmdrepr = sdscatlen(cmdrepr,"\r\n",2);
107 cmdobj = createObject(REDIS_STRING,cmdrepr);
108
109 listRewind(monitors,&li);
110 while((ln = listNext(&li))) {
111 redisClient *monitor = ln->value;
112 addReply(monitor,cmdobj);
113 }
114 decrRefCount(cmdobj);
115 }
116
117 void syncCommand(redisClient *c) {
118 /* ignore SYNC if aleady slave or in monitor mode */
119 if (c->flags & REDIS_SLAVE) return;
120
121 /* Refuse SYNC requests if we are a slave but the link with our master
122 * is not ok... */
123 if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED) {
124 addReplyError(c,"Can't SYNC while not connected with my master");
125 return;
126 }
127
128 /* SYNC can't be issued when the server has pending data to send to
129 * the client about already issued commands. We need a fresh reply
130 * buffer registering the differences between the BGSAVE and the current
131 * dataset, so that we can copy to other slaves if needed. */
132 if (listLength(c->reply) != 0) {
133 addReplyError(c,"SYNC is invalid with pending input");
134 return;
135 }
136
137 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
138 /* Here we need to check if there is a background saving operation
139 * in progress, or if it is required to start one */
140 if (server.rdb_child_pid != -1) {
141 /* Ok a background save is in progress. Let's check if it is a good
142 * one for replication, i.e. if there is another slave that is
143 * registering differences since the server forked to save */
144 redisClient *slave;
145 listNode *ln;
146 listIter li;
147
148 listRewind(server.slaves,&li);
149 while((ln = listNext(&li))) {
150 slave = ln->value;
151 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
152 }
153 if (ln) {
154 /* Perfect, the server is already registering differences for
155 * another slave. Set the right state, and copy the buffer. */
156 copyClientOutputBuffer(c,slave);
157 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
158 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
159 } else {
160 /* No way, we need to wait for the next BGSAVE in order to
161 * register differences */
162 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
163 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
164 }
165 } else {
166 /* Ok we don't have a BGSAVE in progress, let's start one */
167 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
168 if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
169 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
170 addReplyError(c,"Unable to perform background save");
171 return;
172 }
173 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
174 }
175 c->repldbfd = -1;
176 c->flags |= REDIS_SLAVE;
177 c->slaveseldb = 0;
178 listAddNodeTail(server.slaves,c);
179 return;
180 }
181
182 /* REPLCONF <option> <value> <option> <value> ...
183 * This command is used by a slave in order to configure the replication
184 * process before starting it with the SYNC command.
185 *
186 * Currently the only use of this command is to communicate to the master
187 * what is the listening port of the Slave redis instance, so that the
188 * master can accurately list slaves and their listening ports in
189 * the INFO output.
190 *
191 * In the future the same command can be used in order to configure
192 * the replication to initiate an incremental replication instead of a
193 * full resync. */
194 void replconfCommand(redisClient *c) {
195 int j;
196
197 if ((c->argc % 2) == 0) {
198 /* Number of arguments must be odd to make sure that every
199 * option has a corresponding value. */
200 addReply(c,shared.syntaxerr);
201 return;
202 }
203
204 /* Process every option-value pair. */
205 for (j = 1; j < c->argc; j+=2) {
206 if (!strcasecmp(c->argv[j]->ptr,"listening-port")) {
207 long port;
208
209 if ((getLongFromObjectOrReply(c,c->argv[j+1],
210 &port,NULL) != REDIS_OK))
211 return;
212 c->slave_listening_port = port;
213 } else {
214 addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s",
215 (char*)c->argv[j]->ptr);
216 return;
217 }
218 }
219 addReply(c,shared.ok);
220 }
221
222 void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
223 redisClient *slave = privdata;
224 REDIS_NOTUSED(el);
225 REDIS_NOTUSED(mask);
226 char buf[REDIS_IOBUF_LEN];
227 ssize_t nwritten, buflen;
228
229 if (slave->repldboff == 0) {
230 /* Write the bulk write count before to transfer the DB. In theory here
231 * we don't know how much room there is in the output buffer of the
232 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
233 * operations) will never be smaller than the few bytes we need. */
234 sds bulkcount;
235
236 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
237 slave->repldbsize);
238 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
239 {
240 sdsfree(bulkcount);
241 freeClient(slave);
242 return;
243 }
244 sdsfree(bulkcount);
245 }
246 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
247 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
248 if (buflen <= 0) {
249 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
250 (buflen == 0) ? "premature EOF" : strerror(errno));
251 freeClient(slave);
252 return;
253 }
254 if ((nwritten = write(fd,buf,buflen)) == -1) {
255 redisLog(REDIS_VERBOSE,"Write error sending DB to slave: %s",
256 strerror(errno));
257 freeClient(slave);
258 return;
259 }
260 slave->repldboff += nwritten;
261 if (slave->repldboff == slave->repldbsize) {
262 close(slave->repldbfd);
263 slave->repldbfd = -1;
264 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
265 slave->replstate = REDIS_REPL_ONLINE;
266 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
267 sendReplyToClient, slave) == AE_ERR) {
268 freeClient(slave);
269 return;
270 }
271 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
272 }
273 }
274
275 /* This function is called at the end of every backgrond saving.
276 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
277 * otherwise REDIS_ERR is passed to the function.
278 *
279 * The goal of this function is to handle slaves waiting for a successful
280 * background saving in order to perform non-blocking synchronization. */
281 void updateSlavesWaitingBgsave(int bgsaveerr) {
282 listNode *ln;
283 int startbgsave = 0;
284 listIter li;
285
286 listRewind(server.slaves,&li);
287 while((ln = listNext(&li))) {
288 redisClient *slave = ln->value;
289
290 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
291 startbgsave = 1;
292 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
293 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
294 struct redis_stat buf;
295
296 if (bgsaveerr != REDIS_OK) {
297 freeClient(slave);
298 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
299 continue;
300 }
301 if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
302 redis_fstat(slave->repldbfd,&buf) == -1) {
303 freeClient(slave);
304 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
305 continue;
306 }
307 slave->repldboff = 0;
308 slave->repldbsize = buf.st_size;
309 slave->replstate = REDIS_REPL_SEND_BULK;
310 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
311 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
312 freeClient(slave);
313 continue;
314 }
315 }
316 }
317 if (startbgsave) {
318 if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {
319 listIter li;
320
321 listRewind(server.slaves,&li);
322 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
323 while((ln = listNext(&li))) {
324 redisClient *slave = ln->value;
325
326 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
327 freeClient(slave);
328 }
329 }
330 }
331 }
332
333 /* ----------------------------------- SLAVE -------------------------------- */
334
335 /* Abort the async download of the bulk dataset while SYNC-ing with master */
336 void replicationAbortSyncTransfer(void) {
337 redisAssert(server.repl_state == REDIS_REPL_TRANSFER);
338
339 aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
340 close(server.repl_transfer_s);
341 close(server.repl_transfer_fd);
342 unlink(server.repl_transfer_tmpfile);
343 zfree(server.repl_transfer_tmpfile);
344 server.repl_state = REDIS_REPL_CONNECT;
345 }
346
347 /* Asynchronously read the SYNC payload we receive from a master */
348 #define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */
349 void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
350 char buf[4096];
351 ssize_t nread, readlen;
352 off_t left;
353 REDIS_NOTUSED(el);
354 REDIS_NOTUSED(privdata);
355 REDIS_NOTUSED(mask);
356
357 /* If repl_transfer_size == -1 we still have to read the bulk length
358 * from the master reply. */
359 if (server.repl_transfer_size == -1) {
360 if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) {
361 redisLog(REDIS_WARNING,
362 "I/O error reading bulk count from MASTER: %s",
363 strerror(errno));
364 goto error;
365 }
366
367 if (buf[0] == '-') {
368 redisLog(REDIS_WARNING,
369 "MASTER aborted replication with an error: %s",
370 buf+1);
371 goto error;
372 } else if (buf[0] == '\0') {
373 /* At this stage just a newline works as a PING in order to take
374 * the connection live. So we refresh our last interaction
375 * timestamp. */
376 server.repl_transfer_lastio = server.unixtime;
377 return;
378 } else if (buf[0] != '$') {
379 redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
380 goto error;
381 }
382 server.repl_transfer_size = strtol(buf+1,NULL,10);
383 redisLog(REDIS_NOTICE,
384 "MASTER <-> SLAVE sync: receiving %ld bytes from master",
385 server.repl_transfer_size);
386 return;
387 }
388
389 /* Read bulk data */
390 left = server.repl_transfer_size - server.repl_transfer_read;
391 readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
392 nread = read(fd,buf,readlen);
393 if (nread <= 0) {
394 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
395 (nread == -1) ? strerror(errno) : "connection lost");
396 replicationAbortSyncTransfer();
397 return;
398 }
399 server.repl_transfer_lastio = server.unixtime;
400 if (write(server.repl_transfer_fd,buf,nread) != nread) {
401 redisLog(REDIS_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s", strerror(errno));
402 goto error;
403 }
404 server.repl_transfer_read += nread;
405
406 /* Sync data on disk from time to time, otherwise at the end of the transfer
407 * we may suffer a big delay as the memory buffers are copied into the
408 * actual disk. */
409 if (server.repl_transfer_read >=
410 server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC)
411 {
412 off_t sync_size = server.repl_transfer_read -
413 server.repl_transfer_last_fsync_off;
414 rdb_fsync_range(server.repl_transfer_fd,
415 server.repl_transfer_last_fsync_off, sync_size);
416 server.repl_transfer_last_fsync_off += sync_size;
417 }
418
419 /* Check if the transfer is now complete */
420 if (server.repl_transfer_read == server.repl_transfer_size) {
421 if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
422 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
423 replicationAbortSyncTransfer();
424 return;
425 }
426 redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Loading DB in memory");
427 emptyDb();
428 /* Before loading the DB into memory we need to delete the readable
429 * handler, otherwise it will get called recursively since
430 * rdbLoad() will call the event loop to process events from time to
431 * time for non blocking loading. */
432 aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
433 if (rdbLoad(server.rdb_filename) != REDIS_OK) {
434 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
435 replicationAbortSyncTransfer();
436 return;
437 }
438 /* Final setup of the connected slave <- master link */
439 zfree(server.repl_transfer_tmpfile);
440 close(server.repl_transfer_fd);
441 server.master = createClient(server.repl_transfer_s);
442 server.master->flags |= REDIS_MASTER;
443 server.master->authenticated = 1;
444 server.repl_state = REDIS_REPL_CONNECTED;
445 redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Finished with success");
446 /* Restart the AOF subsystem now that we finished the sync. This
447 * will trigger an AOF rewrite, and when done will start appending
448 * to the new file. */
449 if (server.aof_state != REDIS_AOF_OFF) {
450 int retry = 10;
451
452 stopAppendOnly();
453 while (retry-- && startAppendOnly() == REDIS_ERR) {
454 redisLog(REDIS_WARNING,"Failed enabling the AOF after successful master synchrnization! Trying it again in one second.");
455 sleep(1);
456 }
457 if (!retry) {
458 redisLog(REDIS_WARNING,"FATAL: this slave instance finished the synchronization with its master, but the AOF can't be turned on. Exiting now.");
459 exit(1);
460 }
461 }
462 }
463
464 return;
465
466 error:
467 replicationAbortSyncTransfer();
468 return;
469 }
470
471 /* Send a synchronous command to the master. Used to send AUTH and
472 * REPLCONF commands before starting the replication with SYNC.
473 *
474 * On success NULL is returned.
475 * On error an sds string describing the error is returned.
476 */
477 char *sendSynchronousCommand(int fd, ...) {
478 va_list ap;
479 sds cmd = sdsempty();
480 char *arg, buf[256];
481
482 /* Create the command to send to the master, we use simple inline
483 * protocol for simplicity as currently we only send simple strings. */
484 va_start(ap,fd);
485 while(1) {
486 arg = va_arg(ap, char*);
487 if (arg == NULL) break;
488
489 if (sdslen(cmd) != 0) cmd = sdscatlen(cmd," ",1);
490 cmd = sdscat(cmd,arg);
491 }
492 cmd = sdscatlen(cmd,"\r\n",2);
493
494 /* Transfer command to the server. */
495 if (syncWrite(fd,cmd,sdslen(cmd),server.repl_syncio_timeout*1000) == -1) {
496 sdsfree(cmd);
497 return sdscatprintf(sdsempty(),"Writing to master: %s",
498 strerror(errno));
499 }
500 sdsfree(cmd);
501
502 /* Read the reply from the server. */
503 if (syncReadLine(fd,buf,sizeof(buf),server.repl_syncio_timeout*1000) == -1)
504 {
505 return sdscatprintf(sdsempty(),"Reading from master: %s",
506 strerror(errno));
507 }
508
509 /* Check for errors from the server. */
510 if (buf[0] != '+') {
511 return sdscatprintf(sdsempty(),"Error from master: %s", buf);
512 }
513
514 return NULL; /* No errors. */
515 }
516
517 void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
518 char tmpfile[256], *err;
519 int dfd, maxtries = 5;
520 int sockerr = 0;
521 socklen_t errlen = sizeof(sockerr);
522 REDIS_NOTUSED(el);
523 REDIS_NOTUSED(privdata);
524 REDIS_NOTUSED(mask);
525
526 /* If this event fired after the user turned the instance into a master
527 * with SLAVEOF NO ONE we must just return ASAP. */
528 if (server.repl_state == REDIS_REPL_NONE) {
529 close(fd);
530 return;
531 }
532
533 /* Check for errors in the socket. */
534 if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1)
535 sockerr = errno;
536 if (sockerr) {
537 aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
538 redisLog(REDIS_WARNING,"Error condition on socket for SYNC: %s",
539 strerror(sockerr));
540 goto error;
541 }
542
543 /* If we were connecting, it's time to send a non blocking PING, we want to
544 * make sure the master is able to reply before going into the actual
545 * replication process where we have long timeouts in the order of
546 * seconds (in the meantime the slave would block). */
547 if (server.repl_state == REDIS_REPL_CONNECTING) {
548 redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event.");
549 /* Delete the writable event so that the readable event remains
550 * registered and we can wait for the PONG reply. */
551 aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
552 server.repl_state = REDIS_REPL_RECEIVE_PONG;
553 /* Send the PING, don't check for errors at all, we have the timeout
554 * that will take care about this. */
555 syncWrite(fd,"PING\r\n",6,100);
556 return;
557 }
558
559 /* Receive the PONG command. */
560 if (server.repl_state == REDIS_REPL_RECEIVE_PONG) {
561 char buf[1024];
562
563 /* Delete the readable event, we no longer need it now that there is
564 * the PING reply to read. */
565 aeDeleteFileEvent(server.el,fd,AE_READABLE);
566
567 /* Read the reply with explicit timeout. */
568 buf[0] = '\0';
569 if (syncReadLine(fd,buf,sizeof(buf),
570 server.repl_syncio_timeout*1000) == -1)
571 {
572 redisLog(REDIS_WARNING,
573 "I/O error reading PING reply from master: %s",
574 strerror(errno));
575 goto error;
576 }
577
578 /* We don't care about the reply, it can be +PONG or an error since
579 * the server requires AUTH. As long as it replies correctly, it's
580 * fine from our point of view. */
581 if (buf[0] != '-' && buf[0] != '+') {
582 redisLog(REDIS_WARNING,"Unexpected reply to PING from master.");
583 goto error;
584 } else {
585 redisLog(REDIS_NOTICE,
586 "Master replied to PING, replication can continue...");
587 }
588 }
589
590 /* AUTH with the master if required. */
591 if(server.masterauth) {
592 err = sendSynchronousCommand(fd,"AUTH",server.masterauth,NULL);
593 if (err) {
594 redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",err);
595 sdsfree(err);
596 goto error;
597 }
598 }
599
600 /* Set the slave port, so that Master's INFO command can list the
601 * slave listening port correctly. */
602 {
603 sds port = sdsfromlonglong(server.port);
604 err = sendSynchronousCommand(fd,"REPLCONF","listening-port",port,
605 NULL);
606 sdsfree(port);
607 /* Ignore the error if any, not all the Redis versions support
608 * REPLCONF listening-port. */
609 if (err) {
610 redisLog(REDIS_NOTICE,"(non critical): Master does not understand REPLCONF listening-port: %s", err);
611 sdsfree(err);
612 }
613 }
614
615 /* Issue the SYNC command */
616 if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
617 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
618 strerror(errno));
619 goto error;
620 }
621
622 /* Prepare a suitable temp file for bulk transfer */
623 while(maxtries--) {
624 snprintf(tmpfile,256,
625 "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
626 dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
627 if (dfd != -1) break;
628 sleep(1);
629 }
630 if (dfd == -1) {
631 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
632 goto error;
633 }
634
635 /* Setup the non blocking download of the bulk file. */
636 if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
637 == AE_ERR)
638 {
639 redisLog(REDIS_WARNING,"Can't create readable event for SYNC");
640 goto error;
641 }
642
643 server.repl_state = REDIS_REPL_TRANSFER;
644 server.repl_transfer_size = -1;
645 server.repl_transfer_read = 0;
646 server.repl_transfer_last_fsync_off = 0;
647 server.repl_transfer_fd = dfd;
648 server.repl_transfer_lastio = server.unixtime;
649 server.repl_transfer_tmpfile = zstrdup(tmpfile);
650 return;
651
652 error:
653 close(fd);
654 server.repl_transfer_s = -1;
655 server.repl_state = REDIS_REPL_CONNECT;
656 return;
657 }
658
659 int connectWithMaster(void) {
660 int fd;
661
662 fd = anetTcpNonBlockConnect(NULL,server.masterhost,server.masterport);
663 if (fd == -1) {
664 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
665 strerror(errno));
666 return REDIS_ERR;
667 }
668
669 if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) ==
670 AE_ERR)
671 {
672 close(fd);
673 redisLog(REDIS_WARNING,"Can't create readable event for SYNC");
674 return REDIS_ERR;
675 }
676
677 server.repl_transfer_lastio = server.unixtime;
678 server.repl_transfer_s = fd;
679 server.repl_state = REDIS_REPL_CONNECTING;
680 return REDIS_OK;
681 }
682
683 /* This function can be called when a non blocking connection is currently
684 * in progress to undo it. */
685 void undoConnectWithMaster(void) {
686 int fd = server.repl_transfer_s;
687
688 redisAssert(server.repl_state == REDIS_REPL_CONNECTING ||
689 server.repl_state == REDIS_REPL_RECEIVE_PONG);
690 aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
691 close(fd);
692 server.repl_transfer_s = -1;
693 server.repl_state = REDIS_REPL_CONNECT;
694 }
695
696 void slaveofCommand(redisClient *c) {
697 if (!strcasecmp(c->argv[1]->ptr,"no") &&
698 !strcasecmp(c->argv[2]->ptr,"one")) {
699 if (server.masterhost) {
700 sdsfree(server.masterhost);
701 server.masterhost = NULL;
702 if (server.master) freeClient(server.master);
703 if (server.repl_state == REDIS_REPL_TRANSFER)
704 replicationAbortSyncTransfer();
705 else if (server.repl_state == REDIS_REPL_CONNECTING ||
706 server.repl_state == REDIS_REPL_RECEIVE_PONG)
707 undoConnectWithMaster();
708 server.repl_state = REDIS_REPL_NONE;
709 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
710 }
711 } else {
712 long port;
713
714 if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != REDIS_OK))
715 return;
716
717 /* Check if we are already attached to the specified slave */
718 if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)
719 && server.masterport == port) {
720 redisLog(REDIS_NOTICE,"SLAVE OF would result into synchronization with the master we are already connected with. No operation performed.");
721 addReplySds(c,sdsnew("+OK Already connected to specified master\r\n"));
722 return;
723 }
724 /* There was no previous master or the user specified a different one,
725 * we can continue. */
726 sdsfree(server.masterhost);
727 server.masterhost = sdsdup(c->argv[1]->ptr);
728 server.masterport = port;
729 if (server.master) freeClient(server.master);
730 disconnectSlaves(); /* Force our slaves to resync with us as well. */
731 if (server.repl_state == REDIS_REPL_TRANSFER)
732 replicationAbortSyncTransfer();
733 server.repl_state = REDIS_REPL_CONNECT;
734 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
735 server.masterhost, server.masterport);
736 }
737 addReply(c,shared.ok);
738 }
739
740 /* --------------------------- REPLICATION CRON ---------------------------- */
741
742 void replicationCron(void) {
743 /* Non blocking connection timeout? */
744 if (server.masterhost &&
745 (server.repl_state == REDIS_REPL_CONNECTING ||
746 server.repl_state == REDIS_REPL_RECEIVE_PONG) &&
747 (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
748 {
749 redisLog(REDIS_WARNING,"Timeout connecting to the MASTER...");
750 undoConnectWithMaster();
751 }
752
753 /* Bulk transfer I/O timeout? */
754 if (server.masterhost && server.repl_state == REDIS_REPL_TRANSFER &&
755 (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
756 {
757 redisLog(REDIS_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value.");
758 replicationAbortSyncTransfer();
759 }
760
761 /* Timed out master when we are an already connected slave? */
762 if (server.masterhost && server.repl_state == REDIS_REPL_CONNECTED &&
763 (time(NULL)-server.master->lastinteraction) > server.repl_timeout)
764 {
765 redisLog(REDIS_WARNING,"MASTER time out: no data nor PING received...");
766 freeClient(server.master);
767 }
768
769 /* Check if we should connect to a MASTER */
770 if (server.repl_state == REDIS_REPL_CONNECT) {
771 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
772 if (connectWithMaster() == REDIS_OK) {
773 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync started");
774 }
775 }
776
777 /* If we have attached slaves, PING them from time to time.
778 * So slaves can implement an explicit timeout to masters, and will
779 * be able to detect a link disconnection even if the TCP connection
780 * will not actually go down. */
781 if (!(server.cronloops % (server.repl_ping_slave_period * REDIS_HZ))) {
782 listIter li;
783 listNode *ln;
784
785 listRewind(server.slaves,&li);
786 while((ln = listNext(&li))) {
787 redisClient *slave = ln->value;
788
789 /* Don't ping slaves that are in the middle of a bulk transfer
790 * with the master for first synchronization. */
791 if (slave->replstate == REDIS_REPL_SEND_BULK) continue;
792 if (slave->replstate == REDIS_REPL_ONLINE) {
793 /* If the slave is online send a normal ping */
794 addReplySds(slave,sdsnew("*1\r\n$4\r\nPING\r\n"));
795 } else {
796 /* Otherwise we are in the pre-synchronization stage.
797 * Just a newline will do the work of refreshing the
798 * connection last interaction time, and at the same time
799 * we'll be sure that being a single char there are no
800 * short-write problems. */
801 if (write(slave->fd, "\n", 1) == -1) {
802 /* Don't worry, it's just a ping. */
803 }
804 }
805 }
806 }
807 }