6 #include <sys/socket.h>
9 /* ---------------------------------- MASTER -------------------------------- */
11 void replicationFeedSlaves(list
*slaves
, int dictid
, robj
**argv
, int argc
) {
16 listRewind(slaves
,&li
);
17 while((ln
= listNext(&li
))) {
18 redisClient
*slave
= ln
->value
;
20 /* Don't feed slaves that are still waiting for BGSAVE to start */
21 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) continue;
23 /* Feed slaves that are waiting for the initial SYNC (so these commands
24 * are queued in the output buffer until the intial SYNC completes),
25 * or are already in sync with the master. */
26 if (slave
->slaveseldb
!= dictid
) {
29 if (dictid
>= 0 && dictid
< REDIS_SHARED_SELECT_CMDS
) {
30 selectcmd
= shared
.select
[dictid
];
31 incrRefCount(selectcmd
);
33 selectcmd
= createObject(REDIS_STRING
,
34 sdscatprintf(sdsempty(),"select %d\r\n",dictid
));
36 addReply(slave
,selectcmd
);
37 decrRefCount(selectcmd
);
38 slave
->slaveseldb
= dictid
;
40 addReplyMultiBulkLen(slave
,argc
);
41 for (j
= 0; j
< argc
; j
++) addReplyBulk(slave
,argv
[j
]);
/* Build one textual trace line describing the command argv/argc as issued
 * by client c against database dictid, and queue it to every client in the
 * monitors list.
 *
 * The line starts with "+", followed by the current time printed as
 * seconds.microseconds, a bracketed origin tag, the command arguments, and
 * a terminating CRLF. */
45 void replicationFeedMonitors(redisClient
*c
, list
*monitors
, int dictid
, robj
**argv
, int argc
) {
49 sds cmdrepr
= sdsnew("+");
54 gettimeofday(&tv
,NULL
);
55 cmdrepr
= sdscatprintf(cmdrepr
,"%ld.%06ld ",(long)tv
.tv_sec
,(long)tv
.tv_usec
);
/* Origin tag: the database id plus either "lua", the configured Unix
 * socket path, or the ip:port obtained from anetPeerToString(). */
56 if (c
->flags
& REDIS_LUA_CLIENT
) {
57 cmdrepr
= sdscatprintf(cmdrepr
,"[%d lua] ",dictid
);
58 } else if (c
->flags
& REDIS_UNIX_SOCKET
) {
59 cmdrepr
= sdscatprintf(cmdrepr
,"[%d unix:%s] ",dictid
,server
.unixsocket
);
61 anetPeerToString(c
->fd
,ip
,&port
);
62 cmdrepr
= sdscatprintf(cmdrepr
,"[%d %s:%d] ",dictid
,ip
,port
);
/* Integer-encoded arguments carry their value in the ptr field itself and
 * are printed as a quoted decimal; the other form is appended through
 * sdscatrepr() using the sds length of ptr. */
65 for (j
= 0; j
< argc
; j
++) {
66 if (argv
[j
]->encoding
== REDIS_ENCODING_INT
) {
67 cmdrepr
= sdscatprintf(cmdrepr
, "\"%ld\"", (long)argv
[j
]->ptr
);
69 cmdrepr
= sdscatrepr(cmdrepr
,(char*)argv
[j
]->ptr
,
70 sdslen(argv
[j
]->ptr
));
/* NOTE(review): any condition guarding this separator is not visible in
 * this listing - confirm whether it is skipped after the last argument. */
73 cmdrepr
= sdscatlen(cmdrepr
," ",1);
75 cmdrepr
= sdscatlen(cmdrepr
,"\r\n",2);
/* Wrap the finished line in a string object; the same object is handed to
 * addReply() for every monitor. */
76 cmdobj
= createObject(REDIS_STRING
,cmdrepr
);
/* NOTE(review): no release of cmdobj is visible in this listing - confirm
 * that the reference created above is dropped after the loop. */
78 listRewind(monitors
,&li
);
79 while((ln
= listNext(&li
))) {
80 redisClient
*monitor
= ln
->value
;
81 addReply(monitor
,cmdobj
);
/* SYNC command handler. After the visible sanity checks, the client is
 * given a replication state that depends on whether a background save is
 * running (server.rdb_child_pid != -1), is flagged REDIS_SLAVE and is
 * appended to server.slaves. */
86 void syncCommand(redisClient
*c
) {
87 /* ignore SYNC if already slave or in monitor mode */
88 if (c
->flags
& REDIS_SLAVE
) return;
90 /* Refuse SYNC requests if we are a slave but the link with our master
92 if (server
.masterhost
&& server
.repl_state
!= REDIS_REPL_CONNECTED
) {
93 addReplyError(c
,"Can't SYNC while not connected with my master");
97 /* SYNC can't be issued when the server has pending data to send to
98 * the client about already issued commands. We need a fresh reply
99 * buffer registering the differences between the BGSAVE and the current
100 * dataset, so that we can copy to other slaves if needed. */
/* NOTE(review): the test below inspects c->reply, the client's reply list,
 * while the error text speaks of pending "input". */
101 if (listLength(c
->reply
) != 0) {
102 addReplyError(c
,"SYNC is invalid with pending input");
106 redisLog(REDIS_NOTICE
,"Slave ask for synchronization");
107 /* Here we need to check if there is a background saving operation
108 * in progress, or if it is required to start one */
109 if (server
.rdb_child_pid
!= -1) {
110 /* Ok a background save is in progress. Let's check if it is a good
111 * one for replication, i.e. if there is another slave that is
112 * registering differences since the server forked to save */
117 listRewind(server
.slaves
,&li
);
118 while((ln
= listNext(&li
))) {
120 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) break;
123 /* Perfect, the server is already registering differences for
124 * another slave. Set the right state, and copy the buffer. */
125 copyClientOutputBuffer(c
,slave
);
126 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
127 redisLog(REDIS_NOTICE
,"Waiting for end of BGSAVE for SYNC");
129 /* No way, we need to wait for the next BGSAVE in order to
130 * register differences */
131 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_START
;
132 redisLog(REDIS_NOTICE
,"Waiting for next BGSAVE for SYNC");
135 /* Ok we don't have a BGSAVE in progress, let's start one */
136 redisLog(REDIS_NOTICE
,"Starting BGSAVE for SYNC");
137 if (rdbSaveBackground(server
.rdb_filename
) != REDIS_OK
) {
138 redisLog(REDIS_NOTICE
,"Replication failed, can't BGSAVE");
139 addReplyError(c
,"Unable to perform background save");
142 c
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
/* From here on the client is treated as a slave: flag it and register it
 * in the list of slaves. */
145 c
->flags
|= REDIS_SLAVE
;
147 listAddNodeTail(server
.slaves
,c
);
151 /* REPLCONF <option> <value> <option> <value> ...
152 * This command is used by a slave in order to configure the replication
153 * process before starting it with the SYNC command.
155 * Currently the only use of this command is to communicate to the master
156 * what is the listening port of the Slave redis instance, so that the
157 * master can accurately list slaves and their listening ports in
160 * In the future the same command can be used in order to configure
161 * the replication to initiate an incremental replication instead of a
163 void replconfCommand(redisClient
*c
) {
166 if ((c
->argc
% 2) == 0) {
167 /* Number of arguments must be odd to make sure that every
168 * option has a corresponding value. */
169 addReply(c
,shared
.syntaxerr
);
173 /* Process every option-value pair. */
174 for (j
= 1; j
< c
->argc
; j
+=2) {
175 if (!strcasecmp(c
->argv
[j
]->ptr
,"listening-port")) {
178 if ((getLongFromObjectOrReply(c
,c
->argv
[j
+1],
179 &port
,NULL
) != REDIS_OK
))
181 c
->slave_listening_port
= port
;
183 addReplyErrorFormat(c
,"Unrecognized REPLCONF option: %s",
184 (char*)c
->argv
[j
]->ptr
);
188 addReply(c
,shared
.ok
);
/* Writable-event handler that streams the dump file to a slave; privdata is
 * the slave client.
 *
 * When repldboff is still 0 a "$<count>\r\n" header is written first. Each
 * call then reads up to REDIS_IOBUF_LEN bytes from repldbfd at offset
 * repldboff, writes them to fd and advances repldboff by the number of
 * bytes written. Once repldboff equals repldbsize the file is closed, the
 * slave state becomes REDIS_REPL_ONLINE and sendReplyToClient is installed
 * as the writable handler. */
191 void sendBulkToSlave(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
192 redisClient
*slave
= privdata
;
195 char buf
[REDIS_IOBUF_LEN
];
196 ssize_t nwritten
, buflen
;
198 if (slave
->repldboff
== 0) {
199 /* Write the bulk write count before to transfer the DB. In theory here
200 * we don't know how much room there is in the output buffer of the
201 * socket, but in practice SO_SNDLOWAT (the minimum count for output
202 * operations) will never be smaller than the few bytes we need. */
/* NOTE(review): the format below uses %lld while the argument is cast to
 * unsigned long long. */
205 bulkcount
= sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
207 if (write(fd
,bulkcount
,sdslen(bulkcount
)) != (signed)sdslen(bulkcount
))
/* Seek on every call: the offset only advances by what write() accepted,
 * so after a short write the next read restarts at the first unsent byte. */
215 lseek(slave
->repldbfd
,slave
->repldboff
,SEEK_SET
);
216 buflen
= read(slave
->repldbfd
,buf
,REDIS_IOBUF_LEN
);
218 redisLog(REDIS_WARNING
,"Read error sending DB to slave: %s",
219 (buflen
== 0) ? "premature EOF" : strerror(errno
));
223 if ((nwritten
= write(fd
,buf
,buflen
)) == -1) {
224 redisLog(REDIS_VERBOSE
,"Write error sending DB to slave: %s",
229 slave
->repldboff
+= nwritten
;
/* Whole file sent: release the dump descriptor and switch the slave to the
 * regular reply handler. */
230 if (slave
->repldboff
== slave
->repldbsize
) {
231 close(slave
->repldbfd
);
232 slave
->repldbfd
= -1;
233 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
234 slave
->replstate
= REDIS_REPL_ONLINE
;
235 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
,
236 sendReplyToClient
, slave
) == AE_ERR
) {
240 redisLog(REDIS_NOTICE
,"Synchronization with slave succeeded");
244 /* This function is called at the end of every background saving.
245 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
246 * otherwise REDIS_ERR is passed to the function.
248 * The goal of this function is to handle slaves waiting for a successful
249 * background saving in order to perform non-blocking synchronization. */
250 void updateSlavesWaitingBgsave(int bgsaveerr
) {
/* Walk every attached slave and advance it according to its state. */
255 listRewind(server
.slaves
,&li
);
256 while((ln
= listNext(&li
))) {
257 redisClient
*slave
= ln
->value
;
/* A slave that was waiting for a BGSAVE to start is moved to the
 * WAIT_BGSAVE_END state. */
259 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
) {
261 slave
->replstate
= REDIS_REPL_WAIT_BGSAVE_END
;
262 } else if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_END
) {
263 struct redis_stat buf
;
/* The save this slave was waiting for has just finished: a failed save
 * only produces the log line visible here. */
265 if (bgsaveerr
!= REDIS_OK
) {
267 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE child returned an error");
/* Open the dump file and stat it; its size is needed for the transfer. */
270 if ((slave
->repldbfd
= open(server
.rdb_filename
,O_RDONLY
)) == -1 ||
271 redis_fstat(slave
->repldbfd
,&buf
) == -1) {
273 redisLog(REDIS_WARNING
,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno
));
/* Start the bulk transfer: reset the offset, remember the file size and
 * replace the writable handler with sendBulkToSlave. */
276 slave
->repldboff
= 0;
277 slave
->repldbsize
= buf
.st_size
;
278 slave
->replstate
= REDIS_REPL_SEND_BULK
;
279 aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
);
280 if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
, sendBulkToSlave
, slave
) == AE_ERR
) {
/* NOTE(review): the condition guarding this new BGSAVE is not visible in
 * this listing. When it fails, the slave list is walked again looking at
 * the slaves still in REDIS_REPL_WAIT_BGSAVE_START. */
287 if (rdbSaveBackground(server
.rdb_filename
) != REDIS_OK
) {
290 listRewind(server
.slaves
,&li
);
291 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE failed");
292 while((ln
= listNext(&li
))) {
293 redisClient
*slave
= ln
->value
;
295 if (slave
->replstate
== REDIS_REPL_WAIT_BGSAVE_START
)
302 /* ----------------------------------- SLAVE -------------------------------- */
304 /* Abort the async download of the bulk dataset while SYNC-ing with master */
305 void replicationAbortSyncTransfer(void) {
306 redisAssert(server
.repl_state
== REDIS_REPL_TRANSFER
);
308 aeDeleteFileEvent(server
.el
,server
.repl_transfer_s
,AE_READABLE
);
309 close(server
.repl_transfer_s
);
310 close(server
.repl_transfer_fd
);
311 unlink(server
.repl_transfer_tmpfile
);
312 zfree(server
.repl_transfer_tmpfile
);
313 server
.repl_state
= REDIS_REPL_CONNECT
;
316 /* Asynchronously read the SYNC payload we receive from a master */
317 #define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */
/* Readable-event handler installed on the master socket for the bulk
 * transfer.
 *
 * While repl_transfer_size is -1 it reads the line announcing the payload
 * length: it must start with '$', and an empty line only refreshes
 * repl_transfer_lastio. Afterwards each call reads a chunk from fd, writes
 * it to repl_transfer_fd and adds it to repl_transfer_read, syncing the
 * file range every REPL_MAX_WRITTEN_BEFORE_FSYNC bytes. When
 * repl_transfer_read equals repl_transfer_size the temp file is renamed to
 * rdb_filename, loaded with rdbLoad(), and the socket becomes
 * server.master. The visible failure paths call
 * replicationAbortSyncTransfer(). */
318 void readSyncBulkPayload(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
320 ssize_t nread
, readlen
;
323 REDIS_NOTUSED(privdata
);
326 /* If repl_transfer_size == -1 we still have to read the bulk length
327 * from the master reply. */
328 if (server
.repl_transfer_size
== -1) {
329 if (syncReadLine(fd
,buf
,1024,server
.repl_syncio_timeout
*1000) == -1) {
330 redisLog(REDIS_WARNING
,
331 "I/O error reading bulk count from MASTER: %s",
337 redisLog(REDIS_WARNING
,
338 "MASTER aborted replication with an error: %s",
341 } else if (buf
[0] == '\0') {
342 /* At this stage just a newline works as a PING in order to take
343 * the connection live. So we refresh our last interaction
345 server
.repl_transfer_lastio
= server
.unixtime
;
347 } else if (buf
[0] != '$') {
348 redisLog(REDIS_WARNING
,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
351 server
.repl_transfer_size
= strtol(buf
+1,NULL
,10);
352 redisLog(REDIS_NOTICE
,
353 "MASTER <-> SLAVE sync: receiving %ld bytes from master",
354 server
.repl_transfer_size
);
359 left
= server
.repl_transfer_size
- server
.repl_transfer_read
;
360 readlen
= (left
< (signed)sizeof(buf
)) ? left
: (signed)sizeof(buf
);
361 nread
= read(fd
,buf
,readlen
);
363 redisLog(REDIS_WARNING
,"I/O error trying to sync with MASTER: %s",
364 (nread
== -1) ? strerror(errno
) : "connection lost");
365 replicationAbortSyncTransfer();
368 server
.repl_transfer_lastio
= server
.unixtime
;
369 if (write(server
.repl_transfer_fd
,buf
,nread
) != nread
) {
370 redisLog(REDIS_WARNING
,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s", strerror(errno
));
373 server
.repl_transfer_read
+= nread
;
375 /* Sync data on disk from time to time, otherwise at the end of the transfer
376 * we may suffer a big delay as the memory buffers are copied into the
378 if (server
.repl_transfer_read
>=
379 server
.repl_transfer_last_fsync_off
+ REPL_MAX_WRITTEN_BEFORE_FSYNC
)
381 off_t sync_size
= server
.repl_transfer_read
-
382 server
.repl_transfer_last_fsync_off
;
383 rdb_fsync_range(server
.repl_transfer_fd
,
384 server
.repl_transfer_last_fsync_off
, sync_size
);
385 server
.repl_transfer_last_fsync_off
+= sync_size
;
388 /* Check if the transfer is now complete */
389 if (server
.repl_transfer_read
== server
.repl_transfer_size
) {
390 if (rename(server
.repl_transfer_tmpfile
,server
.rdb_filename
) == -1) {
391 redisLog(REDIS_WARNING
,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno
));
392 replicationAbortSyncTransfer();
395 redisLog(REDIS_NOTICE
, "MASTER <-> SLAVE sync: Loading DB in memory");
397 /* Before loading the DB into memory we need to delete the readable
398 * handler, otherwise it will get called recursively since
399 * rdbLoad() will call the event loop to process events from time to
400 * time for non blocking loading. */
401 aeDeleteFileEvent(server
.el
,server
.repl_transfer_s
,AE_READABLE
);
402 if (rdbLoad(server
.rdb_filename
) != REDIS_OK
) {
403 redisLog(REDIS_WARNING
,"Failed trying to load the MASTER synchronization DB from disk");
404 replicationAbortSyncTransfer();
407 /* Final setup of the connected slave <- master link */
408 zfree(server
.repl_transfer_tmpfile
);
409 close(server
.repl_transfer_fd
);
/* The transfer socket becomes the regular master client, flagged
 * REDIS_MASTER and marked as authenticated. */
410 server
.master
= createClient(server
.repl_transfer_s
);
411 server
.master
->flags
|= REDIS_MASTER
;
412 server
.master
->authenticated
= 1;
413 server
.repl_state
= REDIS_REPL_CONNECTED
;
414 redisLog(REDIS_NOTICE
, "MASTER <-> SLAVE sync: Finished with success");
415 /* Restart the AOF subsystem now that we finished the sync. This
416 * will trigger an AOF rewrite, and when done will start appending
417 * to the new file. */
418 if (server
.aof_state
!= REDIS_AOF_OFF
) {
/* startAppendOnly() is retried while the retry counter lasts; the counter's
 * declaration and initial value are not visible in this listing. */
422 while (retry
-- && startAppendOnly() == REDIS_ERR
) {
423 redisLog(REDIS_WARNING
,"Failed enabling the AOF after successful master synchrnization! Trying it again in one second.");
427 redisLog(REDIS_WARNING
,"FATAL: this slave instance finished the synchronization with its master, but the AOF can't be turned on. Exiting now.");
436 replicationAbortSyncTransfer();
440 /* Send a synchronous command to the master. Used to send AUTH and
441 * REPLCONF commands before starting the replication with SYNC.
443 * On success NULL is returned.
444 * On error an sds string describing the error is returned.
446 char *sendSynchronousCommand(int fd
, ...) {
448 sds cmd
= sdsempty();
451 /* Create the command to send to the master, we use simple inline
452 * protocol for simplicity as currently we only send simple strings. */
/* Each variadic argument is read as a char pointer and appended to cmd,
 * separated by a single space; a NULL argument ends the list. */
455 arg
= va_arg(ap
, char*);
456 if (arg
== NULL
) break;
458 if (sdslen(cmd
) != 0) cmd
= sdscatlen(cmd
," ",1);
459 cmd
= sdscat(cmd
,arg
);
/* The inline command is terminated with CRLF. */
461 cmd
= sdscatlen(cmd
,"\r\n",2);
463 /* Transfer command to the server. */
/* Both the write and the read of the reply line are bounded by
 * repl_syncio_timeout, multiplied by 1000 at the call sites. */
464 if (syncWrite(fd
,cmd
,sdslen(cmd
),server
.repl_syncio_timeout
*1000) == -1) {
466 return sdscatprintf(sdsempty(),"Writing to master: %s",
471 /* Read the reply from the server. */
472 if (syncReadLine(fd
,buf
,sizeof(buf
),server
.repl_syncio_timeout
*1000) == -1)
474 return sdscatprintf(sdsempty(),"Reading from master: %s",
478 /* Check for errors from the server. */
/* NOTE(review): the test that selects this error return is not visible in
 * this listing. The returned string is a newly built sds. */
480 return sdscatprintf(sdsempty(),"Error from master: %s", buf
);
483 return NULL
; /* No errors. */
/* Event handler driving the slave side of the handshake with the master on
 * socket fd (registered by connectWithMaster for readable and writable
 * events).
 *
 * Steps visible in this listing, in order: do nothing useful if repl_state
 * is REDIS_REPL_NONE; query SO_ERROR on the socket; in
 * REDIS_REPL_CONNECTING send a PING and move to REDIS_REPL_RECEIVE_PONG;
 * in REDIS_REPL_RECEIVE_PONG read the reply line; send AUTH when
 * masterauth is set; send REPLCONF listening-port; send SYNC; create the
 * temp dump file; install readSyncBulkPayload as the readable handler and
 * initialize the repl_transfer_* fields with repl_state set to
 * REDIS_REPL_TRANSFER. */
486 void syncWithMaster(aeEventLoop
*el
, int fd
, void *privdata
, int mask
) {
487 char tmpfile
[256], *err
;
488 int dfd
, maxtries
= 5;
490 socklen_t errlen
= sizeof(sockerr
);
492 REDIS_NOTUSED(privdata
);
495 /* If this event fired after the user turned the instance into a master
496 * with SLAVEOF NO ONE we must just return ASAP. */
497 if (server
.repl_state
== REDIS_REPL_NONE
) {
502 /* Check for errors in the socket. */
503 if (getsockopt(fd
, SOL_SOCKET
, SO_ERROR
, &sockerr
, &errlen
) == -1)
506 aeDeleteFileEvent(server
.el
,fd
,AE_READABLE
|AE_WRITABLE
);
507 redisLog(REDIS_WARNING
,"Error condition on socket for SYNC: %s",
512 /* If we were connecting, it's time to send a non blocking PING, we want to
513 * make sure the master is able to reply before going into the actual
514 * replication process where we have long timeouts in the order of
515 * seconds (in the meantime the slave would block). */
516 if (server
.repl_state
== REDIS_REPL_CONNECTING
) {
517 redisLog(REDIS_NOTICE
,"Non blocking connect for SYNC fired the event.");
518 /* Delete the writable event so that the readable event remains
519 * registered and we can wait for the PONG reply. */
520 aeDeleteFileEvent(server
.el
,fd
,AE_WRITABLE
);
521 server
.repl_state
= REDIS_REPL_RECEIVE_PONG
;
522 /* Send the PING, don't check for errors at all, we have the timeout
523 * that will take care about this. */
524 syncWrite(fd
,"PING\r\n",6,100);
528 /* Receive the PONG command. */
529 if (server
.repl_state
== REDIS_REPL_RECEIVE_PONG
) {
532 /* Delete the readable event, we no longer need it now that there is
533 * the PING reply to read. */
534 aeDeleteFileEvent(server
.el
,fd
,AE_READABLE
);
536 /* Read the reply with explicit timeout. */
538 if (syncReadLine(fd
,buf
,sizeof(buf
),
539 server
.repl_syncio_timeout
*1000) == -1)
541 redisLog(REDIS_WARNING
,
542 "I/O error reading PING reply from master: %s",
547 /* We don't care about the reply, it can be +PONG or an error since
548 * the server requires AUTH. As long as it replies correctly, it's
549 * fine from our point of view. */
550 if (buf
[0] != '-' && buf
[0] != '+') {
551 redisLog(REDIS_WARNING
,"Unexpected reply to PING from master.");
554 redisLog(REDIS_NOTICE
,
555 "Master replied to PING, replication can continue...");
559 /* AUTH with the master if required. */
560 if(server
.masterauth
) {
561 err
= sendSynchronousCommand(fd
,"AUTH",server
.masterauth
,NULL
);
563 redisLog(REDIS_WARNING
,"Unable to AUTH to MASTER: %s",err
);
569 /* Set the slave port, so that Master's INFO command can list the
570 * slave listening port correctly. */
572 sds port
= sdsfromlonglong(server
.port
);
573 err
= sendSynchronousCommand(fd
,"REPLCONF","listening-port",port
,
576 /* Ignore the error if any, not all the Redis versions support
577 * REPLCONF listening-port. */
579 redisLog(REDIS_NOTICE
,"(non critical): Master does not understand REPLCONF listening-port: %s", err
);
584 /* Issue the SYNC command */
585 if (syncWrite(fd
,"SYNC\r\n",6,server
.repl_syncio_timeout
*1000) == -1) {
586 redisLog(REDIS_WARNING
,"I/O error writing to MASTER: %s",
591 /* Prepare a suitable temp file for bulk transfer */
/* The name combines server.unixtime and the process id, and O_EXCL makes
 * open() fail when that file already exists.
 * NOTE(review): the loop that uses maxtries around this open() is not
 * visible in this listing. */
593 snprintf(tmpfile
,256,
594 "temp-%d.%ld.rdb",(int)server
.unixtime
,(long int)getpid());
595 dfd
= open(tmpfile
,O_CREAT
|O_WRONLY
|O_EXCL
,0644);
596 if (dfd
!= -1) break;
600 redisLog(REDIS_WARNING
,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno
));
604 /* Setup the non blocking download of the bulk file. */
605 if (aeCreateFileEvent(server
.el
,fd
, AE_READABLE
,readSyncBulkPayload
,NULL
)
608 redisLog(REDIS_WARNING
,"Can't create readable event for SYNC");
/* Transfer bookkeeping: a size of -1 tells readSyncBulkPayload that the
 * bulk length line has not been read yet. */
612 server
.repl_state
= REDIS_REPL_TRANSFER
;
613 server
.repl_transfer_size
= -1;
614 server
.repl_transfer_read
= 0;
615 server
.repl_transfer_last_fsync_off
= 0;
616 server
.repl_transfer_fd
= dfd
;
617 server
.repl_transfer_lastio
= server
.unixtime
;
618 server
.repl_transfer_tmpfile
= zstrdup(tmpfile
);
/* NOTE(review): presumably the shared failure exit, going back to
 * REDIS_REPL_CONNECT - the label and any close of fd are not visible in
 * this listing. */
623 server
.repl_transfer_s
= -1;
624 server
.repl_state
= REDIS_REPL_CONNECT
;
/* Start a non blocking TCP connection to masterhost:masterport and register
 * syncWithMaster for both readable and writable events on the new socket.
 *
 * On the visible success path the socket is stored in repl_transfer_s,
 * repl_transfer_lastio is refreshed and repl_state becomes
 * REDIS_REPL_CONNECTING. replicationCron() compares the return value
 * against REDIS_OK. */
628 int connectWithMaster(void) {
631 fd
= anetTcpNonBlockConnect(NULL
,server
.masterhost
,server
.masterport
);
633 redisLog(REDIS_WARNING
,"Unable to connect to MASTER: %s",
/* NOTE(review): the log text below speaks of a readable event although the
 * mask passed here is AE_READABLE|AE_WRITABLE. */
638 if (aeCreateFileEvent(server
.el
,fd
,AE_READABLE
|AE_WRITABLE
,syncWithMaster
,NULL
) ==
642 redisLog(REDIS_WARNING
,"Can't create readable event for SYNC");
/* lastio is the reference time used by replicationCron() for the
 * connection timeout check. */
646 server
.repl_transfer_lastio
= server
.unixtime
;
647 server
.repl_transfer_s
= fd
;
648 server
.repl_state
= REDIS_REPL_CONNECTING
;
652 /* This function can be called when a non blocking connection is currently
653 * in progress to undo it. */
654 void undoConnectWithMaster(void) {
655 int fd
= server
.repl_transfer_s
;
657 redisAssert(server
.repl_state
== REDIS_REPL_CONNECTING
||
658 server
.repl_state
== REDIS_REPL_RECEIVE_PONG
);
659 aeDeleteFileEvent(server
.el
,fd
,AE_READABLE
|AE_WRITABLE
);
661 server
.repl_transfer_s
= -1;
662 server
.repl_state
= REDIS_REPL_CONNECT
;
665 void slaveofCommand(redisClient
*c
) {
666 if (!strcasecmp(c
->argv
[1]->ptr
,"no") &&
667 !strcasecmp(c
->argv
[2]->ptr
,"one")) {
668 if (server
.masterhost
) {
669 sdsfree(server
.masterhost
);
670 server
.masterhost
= NULL
;
671 if (server
.master
) freeClient(server
.master
);
672 if (server
.repl_state
== REDIS_REPL_TRANSFER
)
673 replicationAbortSyncTransfer();
674 else if (server
.repl_state
== REDIS_REPL_CONNECTING
||
675 server
.repl_state
== REDIS_REPL_RECEIVE_PONG
)
676 undoConnectWithMaster();
677 server
.repl_state
= REDIS_REPL_NONE
;
678 redisLog(REDIS_NOTICE
,"MASTER MODE enabled (user request)");
683 if ((getLongFromObjectOrReply(c
, c
->argv
[2], &port
, NULL
) != REDIS_OK
))
686 /* Check if we are already attached to the specified slave */
687 if (server
.masterhost
&& !strcasecmp(server
.masterhost
,c
->argv
[1]->ptr
)
688 && server
.masterport
== port
) {
689 redisLog(REDIS_NOTICE
,"SLAVE OF would result into synchronization with the master we are already connected with. No operation performed.");
690 addReplySds(c
,sdsnew("+OK Already connected to specified master\r\n"));
693 /* There was no previous master or the user specified a different one,
694 * we can continue. */
695 sdsfree(server
.masterhost
);
696 server
.masterhost
= sdsdup(c
->argv
[1]->ptr
);
697 server
.masterport
= port
;
698 if (server
.master
) freeClient(server
.master
);
699 disconnectSlaves(); /* Force our slaves to resync with us as well. */
700 if (server
.repl_state
== REDIS_REPL_TRANSFER
)
701 replicationAbortSyncTransfer();
702 server
.repl_state
= REDIS_REPL_CONNECT
;
703 redisLog(REDIS_NOTICE
,"SLAVE OF %s:%d enabled (user request)",
704 server
.masterhost
, server
.masterport
);
706 addReply(c
,shared
.ok
);
709 /* --------------------------- REPLICATION CRON ---------------------------- */
711 void replicationCron(void) {
712 /* Non blocking connection timeout? */
713 if (server
.masterhost
&&
714 (server
.repl_state
== REDIS_REPL_CONNECTING
||
715 server
.repl_state
== REDIS_REPL_RECEIVE_PONG
) &&
716 (time(NULL
)-server
.repl_transfer_lastio
) > server
.repl_timeout
)
718 redisLog(REDIS_WARNING
,"Timeout connecting to the MASTER...");
719 undoConnectWithMaster();
722 /* Bulk transfer I/O timeout? */
723 if (server
.masterhost
&& server
.repl_state
== REDIS_REPL_TRANSFER
&&
724 (time(NULL
)-server
.repl_transfer_lastio
) > server
.repl_timeout
)
726 redisLog(REDIS_WARNING
,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value.");
727 replicationAbortSyncTransfer();
730 /* Timed out master when we are an already connected slave? */
731 if (server
.masterhost
&& server
.repl_state
== REDIS_REPL_CONNECTED
&&
732 (time(NULL
)-server
.master
->lastinteraction
) > server
.repl_timeout
)
734 redisLog(REDIS_WARNING
,"MASTER time out: no data nor PING received...");
735 freeClient(server
.master
);
738 /* Check if we should connect to a MASTER */
739 if (server
.repl_state
== REDIS_REPL_CONNECT
) {
740 redisLog(REDIS_NOTICE
,"Connecting to MASTER...");
741 if (connectWithMaster() == REDIS_OK
) {
742 redisLog(REDIS_NOTICE
,"MASTER <-> SLAVE sync started");
746 /* If we have attached slaves, PING them from time to time.
747 * So slaves can implement an explicit timeout to masters, and will
748 * be able to detect a link disconnection even if the TCP connection
749 * will not actually go down. */
750 if (!(server
.cronloops
% (server
.repl_ping_slave_period
* REDIS_HZ
))) {
754 listRewind(server
.slaves
,&li
);
755 while((ln
= listNext(&li
))) {
756 redisClient
*slave
= ln
->value
;
758 /* Don't ping slaves that are in the middle of a bulk transfer
759 * with the master for first synchronization. */
760 if (slave
->replstate
== REDIS_REPL_SEND_BULK
) continue;
761 if (slave
->replstate
== REDIS_REPL_ONLINE
) {
762 /* If the slave is online send a normal ping */
763 addReplySds(slave
,sdsnew("*1\r\n$4\r\nPING\r\n"));
765 /* Otherwise we are in the pre-synchronization stage.
766 * Just a newline will do the work of refreshing the
767 * connection last interaction time, and at the same time
768 * we'll be sure that being a single char there are no
769 * short-write problems. */
770 if (write(slave
->fd
, "\n", 1) == -1) {
771 /* Don't worry, it's just a ping. */