8 /* ---------------------------------- MASTER -------------------------------- */ 
  /* Propagate one command (argv/argc), executed against database `dictid`,
   * to every client in the `slaves` list.
   *
   * Behavior visible in this listing:
   *  - slaves whose replstate is REDIS_REPL_WAIT_BGSAVE_START are skipped;
   *  - when slave->slaveseldb differs from dictid a SELECT command is sent
   *    first (shared.select0 .. shared.select9 for ids 0-9, otherwise a newly
   *    created "select %d\r\n" string object) and slaveseldb is updated;
   *  - the command itself is emitted as a multi bulk: the argument count
   *    followed by one bulk reply per argument.
   *
   * NOTE(review): the embedded numbering has gaps (e.g. 10 -> 15, 25 -> 29),
   * so the local declarations, the switch header / default label and the
   * closing braces are not visible here. */
  10 void replicationFeedSlaves(list 
*slaves
, int dictid
, robj 
**argv
, int argc
) { 
  15     listRewind(slaves
,&li
); 
  16     while((ln 
= listNext(&li
))) { 
  17         redisClient 
*slave 
= ln
->value
; 
  19         /* Don't feed slaves that are still waiting for BGSAVE to start */ 
  20         if (slave
->replstate 
== REDIS_REPL_WAIT_BGSAVE_START
) continue; 
  22         /* Feed slaves that are waiting for the initial SYNC (so these commands 
  23          * are queued in the output buffer until the initial SYNC completes), 
  24          * or are already in sync with the master. */ 
  25         if (slave
->slaveseldb 
!= dictid
) { 
  29             case 0: selectcmd 
= shared
.select0
; break; 
  30             case 1: selectcmd 
= shared
.select1
; break; 
  31             case 2: selectcmd 
= shared
.select2
; break; 
  32             case 3: selectcmd 
= shared
.select3
; break; 
  33             case 4: selectcmd 
= shared
.select4
; break; 
  34             case 5: selectcmd 
= shared
.select5
; break; 
  35             case 6: selectcmd 
= shared
.select6
; break; 
  36             case 7: selectcmd 
= shared
.select7
; break; 
  37             case 8: selectcmd 
= shared
.select8
; break; 
  38             case 9: selectcmd 
= shared
.select9
; break; 
                 /* No preallocated shared object for this database id: build
                  * the SELECT command on the fly (presumably under a
                  * `default:` label that is not visible in this listing). */
  40                 selectcmd 
= createObject(REDIS_STRING
, 
  41                     sdscatprintf(sdsempty(),"select %d\r\n",dictid
)); 
                 /* NOTE(review): refcount is forced to 0 for the temporary
                  * object; presumably addReply() takes the only reference
                  * so the object dies with the reply - TODO confirm. */
  42                 selectcmd
->refcount 
= 0; 
  45             addReply(slave
,selectcmd
); 
  46             slave
->slaveseldb 
= dictid
; 
  48         addReplyMultiBulkLen(slave
,argc
); 
  49         for (j 
= 0; j 
< argc
; j
++) addReplyBulk(slave
,argv
[j
]); 
  /* Send a human readable representation of a command (argv/argc) to every
   * client in the `monitors` list.
   *
   * The line is built in `cmdrepr` as:
   *   "+" <seconds>.<microseconds> " " [ "(db <dictid>) " ] <args...> "\r\n"
   * where the "(db N) " part is only added when dictid != 0. Arguments with
   * REDIS_ENCODING_INT are printed as a quoted long taken from the ptr field
   * itself; every other argument goes through sdscatrepr(). The resulting
   * sds is wrapped in a REDIS_STRING object and handed to addReply() for each
   * monitor.
   *
   * NOTE(review): the embedded numbering has gaps, so the declarations, the
   * else branch, the condition guarding the separating space and the final
   * release of `cmdobj` (if any) are not visible here. */
  53 void replicationFeedMonitors(list 
*monitors
, int dictid
, robj 
**argv
, int argc
) { 
  57     sds cmdrepr 
= sdsnew("+"); 
  61     gettimeofday(&tv
,NULL
); 
  62     cmdrepr 
= sdscatprintf(cmdrepr
,"%ld.%06ld ",(long)tv
.tv_sec
,(long)tv
.tv_usec
); 
  63     if (dictid 
!= 0) cmdrepr 
= sdscatprintf(cmdrepr
,"(db %d) ", dictid
); 
  65     for (j 
= 0; j 
< argc
; j
++) { 
  66         if (argv
[j
]->encoding 
== REDIS_ENCODING_INT
) { 
             /* Integer encoded object: the value is read straight from ptr. */
  67             cmdrepr 
= sdscatprintf(cmdrepr
, "\"%ld\"", (long)argv
[j
]->ptr
); 
             /* Any other encoding: ptr is treated as an sds and escaped. */
  69             cmdrepr 
= sdscatrepr(cmdrepr
,(char*)argv
[j
]->ptr
, 
  70                         sdslen(argv
[j
]->ptr
)); 
  73             cmdrepr 
= sdscatlen(cmdrepr
," ",1); 
  75     cmdrepr 
= sdscatlen(cmdrepr
,"\r\n",2); 
  76     cmdobj 
= createObject(REDIS_STRING
,cmdrepr
); 
  78     listRewind(monitors
,&li
); 
  79     while((ln 
= listNext(&li
))) { 
  80         redisClient 
*monitor 
= ln
->value
; 
  81         addReply(monitor
,cmdobj
); 
  /* SYNC command handler: register client `c` as a replication slave.
   *
   * Steps visible in this listing:
   *  - a client already flagged REDIS_SLAVE is ignored;
   *  - an error is replied when this instance has server.masterhost set but
   *    its own replstate is not REDIS_REPL_CONNECTED;
   *  - an error is replied when c->reply is not empty (the message says
   *    "pending input", while the check is on the reply list);
   *  - if a BGSAVE child exists (bgsavechildpid != -1), server.slaves is
   *    scanned for a slave in REDIS_REPL_WAIT_BGSAVE_END; its reply list is
   *    duplicated into `c`, which then also waits for the end of that BGSAVE.
   *    Apparently, when no such slave is found, `c` is put in
   *    REDIS_REPL_WAIT_BGSAVE_START instead;
   *  - with no BGSAVE in progress one is started via rdbSaveBackground() and
   *    `c` is put in REDIS_REPL_WAIT_BGSAVE_END;
   *  - finally `c` gets the REDIS_SLAVE flag and is appended to server.slaves.
   *
   * NOTE(review): the embedded numbering has gaps, so the early returns, the
   * else branches and the closing braces are not visible here. The comment
   * opened at embedded line 90 is also left unterminated in this view. */
  86 void syncCommand(redisClient 
*c
) { 
  87     /* ignore SYNC if already slave or in monitor mode */ 
  88     if (c
->flags 
& REDIS_SLAVE
) return; 
  90     /* Refuse SYNC requests if we are a slave but the link with our master 
  92     if (server
.masterhost 
&& server
.replstate 
!= REDIS_REPL_CONNECTED
) { 
  93         addReplyError(c
,"Can't SYNC while not connected with my master"); 
  97     /* SYNC can't be issued when the server has pending data to send to 
  98      * the client about already issued commands. We need a fresh reply 
  99      * buffer registering the differences between the BGSAVE and the current 
 100      * dataset, so that we can copy to other slaves if needed. */ 
 101     if (listLength(c
->reply
) != 0) { 
 102         addReplyError(c
,"SYNC is invalid with pending input"); 
 106     redisLog(REDIS_NOTICE
,"Slave ask for synchronization"); 
 107     /* Here we need to check if there is a background saving operation 
 108      * in progress, or if it is required to start one */ 
 109     if (server
.bgsavechildpid 
!= -1) { 
 110         /* Ok a background save is in progress. Let's check if it is a good 
 111          * one for replication, i.e. if there is another slave that is 
 112          * registering differences since the server forked to save */ 
 117         listRewind(server
.slaves
,&li
); 
 118         while((ln 
= listNext(&li
))) { 
 120             if (slave
->replstate 
== REDIS_REPL_WAIT_BGSAVE_END
) break; 
 123             /* Perfect, the server is already registering differences for 
 124              * another slave. Set the right state, and copy the buffer. */ 
 125             listRelease(c
->reply
); 
 126             c
->reply 
= listDup(slave
->reply
); 
 127             c
->replstate 
= REDIS_REPL_WAIT_BGSAVE_END
; 
 128             redisLog(REDIS_NOTICE
,"Waiting for end of BGSAVE for SYNC"); 
 130             /* No way, we need to wait for the next BGSAVE in order to 
 131              * register differences */ 
 132             c
->replstate 
= REDIS_REPL_WAIT_BGSAVE_START
; 
 133             redisLog(REDIS_NOTICE
,"Waiting for next BGSAVE for SYNC"); 
 136         /* Ok we don't have a BGSAVE in progress, let's start one */ 
 137         redisLog(REDIS_NOTICE
,"Starting BGSAVE for SYNC"); 
 138         if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) { 
 139             redisLog(REDIS_NOTICE
,"Replication failed, can't BGSAVE"); 
 140             addReplyError(c
,"Unable to perform background save"); 
 143         c
->replstate 
= REDIS_REPL_WAIT_BGSAVE_END
; 
 146     c
->flags 
|= REDIS_SLAVE
; 
 148     listAddNodeTail(server
.slaves
,c
); 
 /* Writable-event handler that streams the dump file to one slave.
  * `privdata` is the slave client; `fd` is the socket being written.
  *
  * Behavior visible in this listing:
  *  - on the first call (slave->repldboff == 0) a "$<n>\r\n" bulk header is
  *    written to the socket before any payload;
  *  - each call seeks slave->repldbfd to repldboff, reads at most
  *    REDIS_IOBUF_LEN bytes and writes them to `fd`; repldboff advances by
  *    the number of bytes actually written, so a short write is retried from
  *    the right file offset on the next call;
  *  - once repldboff == repldbsize the dump fd is closed, this handler is
  *    replaced by sendReplyToClient, the slave becomes REDIS_REPL_ONLINE and
  *    an empty reply is queued (presumably to trigger the new handler).
  *
  * NOTE(review): the embedded numbering has gaps, so the declarations, the
  * error-path cleanup / returns and the closing braces are not visible. */
 152 void sendBulkToSlave(aeEventLoop 
*el
, int fd
, void *privdata
, int mask
) { 
 153     redisClient 
*slave 
= privdata
; 
 156     char buf
[REDIS_IOBUF_LEN
]; 
 157     ssize_t nwritten
, buflen
; 
 159     if (slave
->repldboff 
== 0) { 
 160         /* Write the bulk write count before to transfer the DB. In theory here 
 161          * we don't know how much room there is in the output buffer of the 
 162          * socket, but in practice SO_SNDLOWAT (the minimum count for output 
 163          * operations) will never be smaller than the few bytes we need. */ 
 166         bulkcount 
= sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long) 
 168         if (write(fd
,bulkcount
,sdslen(bulkcount
)) != (signed)sdslen(bulkcount
)) 
     /* Position the dump file at the first byte not yet sent, then read the
      * next chunk. */
 176     lseek(slave
->repldbfd
,slave
->repldboff
,SEEK_SET
); 
 177     buflen 
= read(slave
->repldbfd
,buf
,REDIS_IOBUF_LEN
); 
 179         redisLog(REDIS_WARNING
,"Read error sending DB to slave: %s", 
 180             (buflen 
== 0) ? "premature EOF" : strerror(errno
)); 
 184     if ((nwritten 
= write(fd
,buf
,buflen
)) == -1) { 
 185         redisLog(REDIS_VERBOSE
,"Write error sending DB to slave: %s", 
 190     slave
->repldboff 
+= nwritten
; 
     /* Whole file sent: switch the slave to the normal reply path. */
 191     if (slave
->repldboff 
== slave
->repldbsize
) { 
 192         close(slave
->repldbfd
); 
 193         slave
->repldbfd 
= -1; 
 194         aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
); 
 195         slave
->replstate 
= REDIS_REPL_ONLINE
; 
 196         if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
, 
 197             sendReplyToClient
, slave
) == AE_ERR
) { 
 201         addReplySds(slave
,sdsempty()); 
 202         redisLog(REDIS_NOTICE
,"Synchronization with slave succeeded"); 
 206 /* This function is called at the end of every background saving. 
 207  * The argument bgsaveerr is REDIS_OK if the background saving succeeded 
 208  * otherwise REDIS_ERR is passed to the function. 
 210  * The goal of this function is to handle slaves waiting for a successful 
 211  * background saving in order to perform non-blocking synchronization. 
      *
      * Behavior visible in this listing, for every client in server.slaves:
      *  - REDIS_REPL_WAIT_BGSAVE_START becomes REDIS_REPL_WAIT_BGSAVE_END
      *    (these slaves need a new BGSAVE);
      *  - REDIS_REPL_WAIT_BGSAVE_END: when bgsaveerr != REDIS_OK a warning is
      *    logged; otherwise server.dbfilename is opened read-only and
      *    fstat()ed, repldboff / repldbsize are initialised, the state
      *    becomes REDIS_REPL_SEND_BULK and sendBulkToSlave is installed as
      *    the writable handler of the slave socket.
      * A new rdbSaveBackground() is then attempted; if it fails the slaves
      * list is walked again looking for REDIS_REPL_WAIT_BGSAVE_START clients.
      *
      * NOTE(review): the embedded numbering has gaps, so the declarations,
      * the per-slave error handling, the condition guarding the new BGSAVE
      * and the closing braces are not visible here. */ 
 212 void updateSlavesWaitingBgsave(int bgsaveerr
) { 
 217     listRewind(server
.slaves
,&li
); 
 218     while((ln 
= listNext(&li
))) { 
 219         redisClient 
*slave 
= ln
->value
; 
 221         if (slave
->replstate 
== REDIS_REPL_WAIT_BGSAVE_START
) { 
 223             slave
->replstate 
= REDIS_REPL_WAIT_BGSAVE_END
; 
 224         } else if (slave
->replstate 
== REDIS_REPL_WAIT_BGSAVE_END
) { 
 225             struct redis_stat buf
; 
 227             if (bgsaveerr 
!= REDIS_OK
) { 
 229                 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE child returned an error"); 
             /* Open the freshly written dump and fetch its size; the size is
              * what sendBulkToSlave compares repldboff against. */
 232             if ((slave
->repldbfd 
= open(server
.dbfilename
,O_RDONLY
)) == -1 || 
 233                 redis_fstat(slave
->repldbfd
,&buf
) == -1) { 
 235                 redisLog(REDIS_WARNING
,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno
)); 
 238             slave
->repldboff 
= 0; 
 239             slave
->repldbsize 
= buf
.st_size
; 
 240             slave
->replstate 
= REDIS_REPL_SEND_BULK
; 
 241             aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
); 
 242             if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
, sendBulkToSlave
, slave
) == AE_ERR
) { 
 249         if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) { 
 252             listRewind(server
.slaves
,&li
); 
 253             redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE failed"); 
 254             while((ln 
= listNext(&li
))) { 
 255                 redisClient 
*slave 
= ln
->value
; 
 257                 if (slave
->replstate 
== REDIS_REPL_WAIT_BGSAVE_START
) 
 264 /* ----------------------------------- SLAVE -------------------------------- */ 
 266 /* Abort the async download of the bulk dataset while SYNC-ing with master.
      *
      * Must only be called while server.replstate is REDIS_REPL_TRANSFER (this
      * is asserted). It removes the readable handler from the master socket,
      * closes both the socket (repl_transfer_s) and the temp file descriptor
      * (repl_transfer_fd), unlinks the temp file and frees its name, then
      * sets the state back to REDIS_REPL_CONNECT. */ 
 267 void replicationAbortSyncTransfer(void) { 
 268     redisAssert(server
.replstate 
== REDIS_REPL_TRANSFER
); 
 270     aeDeleteFileEvent(server
.el
,server
.repl_transfer_s
,AE_READABLE
); 
 271     close(server
.repl_transfer_s
); 
 272     close(server
.repl_transfer_fd
); 
 273     unlink(server
.repl_transfer_tmpfile
); 
 274     zfree(server
.repl_transfer_tmpfile
); 
 275     server
.replstate 
= REDIS_REPL_CONNECT
; 
 278 /* Asynchronously read the SYNC payload we receive from a master.
      *
      * Readable-event handler on the master socket `fd`.
      *
      * Behavior visible in this listing:
      *  - while server.repl_transfer_left == -1 the bulk length line is read
      *    with syncReadLine(): an empty line only refreshes
      *    repl_transfer_lastio, a line not starting with '$' is logged as a
      *    protocol error, otherwise the length is parsed from buf+1;
      *  - afterwards each call reads at most min(repl_transfer_left,
      *    sizeof(buf)) bytes from the socket, appends them to
      *    repl_transfer_fd and decrements repl_transfer_left;
      *  - when repl_transfer_left reaches 0 the temp file is renamed to
      *    server.dbfilename, the readable handler is removed, the file is
      *    loaded with rdbLoad() and server.master is created on the same
      *    socket (REDIS_MASTER flag, authenticated = 1) with the state set to
      *    REDIS_REPL_CONNECTED;
      *  - failures call replicationAbortSyncTransfer().
      *
      * NOTE(review): the embedded numbering has gaps, so the declarations,
      * several conditions / returns and the closing braces are not visible.
      * The comment opened at embedded line 302 is also left unterminated in
      * this view. */ 
 279 void readSyncBulkPayload(aeEventLoop 
*el
, int fd
, void *privdata
, int mask
) { 
 281     ssize_t nread
, readlen
; 
 283     REDIS_NOTUSED(privdata
); 
 286     /* If repl_transfer_left == -1 we still have to read the bulk length 
 287      * from the master reply. */ 
 288     if (server
.repl_transfer_left 
== -1) { 
 289         if (syncReadLine(fd
,buf
,1024,server
.repl_syncio_timeout
) == -1) { 
 290             redisLog(REDIS_WARNING
, 
 291                 "I/O error reading bulk count from MASTER: %s", 
 297             redisLog(REDIS_WARNING
, 
 298                 "MASTER aborted replication with an error: %s", 
 301         } else if (buf
[0] == '\0') { 
 302             /* At this stage just a newline works as a PING in order to take 
 303              * the connection live. So we refresh our last interaction 
 305             server
.repl_transfer_lastio 
= time(NULL
); 
 307         } else if (buf
[0] != '$') { 
 308             redisLog(REDIS_WARNING
,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?"); 
 311         server
.repl_transfer_left 
= strtol(buf
+1,NULL
,10); 
 312         redisLog(REDIS_NOTICE
, 
 313             "MASTER <-> SLAVE sync: receiving %ld bytes from master", 
 314             server
.repl_transfer_left
); 
 319     readlen 
= (server
.repl_transfer_left 
< (signed)sizeof(buf
)) ? 
 320         server
.repl_transfer_left 
: (signed)sizeof(buf
); 
 321     nread 
= read(fd
,buf
,readlen
); 
 323         redisLog(REDIS_WARNING
,"I/O error trying to sync with MASTER: %s", 
 324             (nread 
== -1) ? strerror(errno
) : "connection lost"); 
 325         replicationAbortSyncTransfer(); 
 328     server
.repl_transfer_lastio 
= time(NULL
); 
 329     if (write(server
.repl_transfer_fd
,buf
,nread
) != nread
) { 
 330         redisLog(REDIS_WARNING
,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno
)); 
 333     server
.repl_transfer_left 
-= nread
; 
 334     /* Check if the transfer is now complete */ 
 335     if (server
.repl_transfer_left 
== 0) { 
 336         if (rename(server
.repl_transfer_tmpfile
,server
.dbfilename
) == -1) { 
 337             redisLog(REDIS_WARNING
,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno
)); 
 338             replicationAbortSyncTransfer(); 
 341         redisLog(REDIS_NOTICE
, "MASTER <-> SLAVE sync: Loading DB in memory"); 
 343         /* Before loading the DB into memory we need to delete the readable 
 344          * handler, otherwise it will get called recursively since 
 345          * rdbLoad() will call the event loop to process events from time to 
 346          * time for non blocking loading. */ 
 347         aeDeleteFileEvent(server
.el
,server
.repl_transfer_s
,AE_READABLE
); 
 348         if (rdbLoad(server
.dbfilename
) != REDIS_OK
) { 
 349             redisLog(REDIS_WARNING
,"Failed trying to load the MASTER synchronization DB from disk"); 
 350             replicationAbortSyncTransfer(); 
 353         /* Final setup of the connected slave <- master link */ 
 354         zfree(server
.repl_transfer_tmpfile
); 
 355         close(server
.repl_transfer_fd
); 
 356         server
.master 
= createClient(server
.repl_transfer_s
); 
 357         server
.master
->flags 
|= REDIS_MASTER
; 
 358         server
.master
->authenticated 
= 1; 
 359         server
.replstate 
= REDIS_REPL_CONNECTED
; 
 360         redisLog(REDIS_NOTICE
, "MASTER <-> SLAVE sync: Finished with success"); 
     /* Presumably the shared error exit (its label is not visible here). */
 366     replicationAbortSyncTransfer(); 
 /* Writable-event handler fired when the non-blocking connect(2) to the
  * master is triggered on `fd`; it starts the synchronization handshake.
  *
  * Behavior visible in this listing:
  *  - the writable event is removed (it must fire only once);
  *  - if server.masterauth is set, "AUTH <password>\r\n" is sent with
  *    syncWrite() and the reply line is read with syncReadLine();
  *  - "SYNC \r\n" is sent to the master;
  *  - a temp file named "temp-<unixtime>.<pid>.rdb" is created with
  *    O_CREAT|O_WRONLY|O_EXCL (`maxtries` suggests a retry loop whose
  *    header is not visible here);
  *  - readSyncBulkPayload is installed as the readable handler and the
  *    transfer state is initialised (REDIS_REPL_TRANSFER,
  *    repl_transfer_left = -1, fd, lastio, tmpfile name).
  * The last visible statement resets the state to REDIS_REPL_CONNECT,
  * presumably on the error path.
  *
  * NOTE(review): the embedded numbering has gaps, so some declarations, the
  * error returns / labels and the closing braces are not visible here. */
 370 void syncWithMaster(aeEventLoop 
*el
, int fd
, void *privdata
, int mask
) { 
 371     char buf
[1024], tmpfile
[256]; 
 372     int dfd
, maxtries 
= 5; 
 374     REDIS_NOTUSED(privdata
); 
 377     /* This event should only be triggered once since it is used to have a 
 378      * non-blocking connect(2) to the master. It has been triggered when this 
 379      * function is called, so we can delete it. */ 
 380     aeDeleteFileEvent(server
.el
,fd
,AE_WRITABLE
); 
 382     /* AUTH with the master if required. */ 
 383     if(server
.masterauth
) { 
 387         authlen 
= snprintf(authcmd
,sizeof(authcmd
),"AUTH %s\r\n",server
.masterauth
); 
 388         if (syncWrite(fd
,authcmd
,authlen
,server
.repl_syncio_timeout
) == -1) { 
 389             redisLog(REDIS_WARNING
,"Unable to AUTH to MASTER: %s", 
 393         /* Read the AUTH result.  */ 
 394         if (syncReadLine(fd
,buf
,1024,server
.repl_syncio_timeout
) == -1) { 
 395             redisLog(REDIS_WARNING
,"I/O error reading auth result from MASTER: %s", 
 400             redisLog(REDIS_WARNING
,"Cannot AUTH to MASTER, is the masterauth password correct?"); 
 405     /* Issue the SYNC command */ 
 406     if (syncWrite(fd
,"SYNC \r\n",7,server
.repl_syncio_timeout
) == -1) { 
 407         redisLog(REDIS_WARNING
,"I/O error writing to MASTER: %s", 
 412     /* Prepare a suitable temp file for bulk transfer */ 
 414         snprintf(tmpfile
,256, 
 415             "temp-%d.%ld.rdb",(int)time(NULL
),(long int)getpid()); 
         /* O_EXCL makes the open fail if the name already exists. */
 416         dfd 
= open(tmpfile
,O_CREAT
|O_WRONLY
|O_EXCL
,0644); 
 417         if (dfd 
!= -1) break; 
 421         redisLog(REDIS_WARNING
,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno
)); 
 425     /* Setup the non blocking download of the bulk file. */ 
 426     if (aeCreateFileEvent(server
.el
,fd
, AE_READABLE
,readSyncBulkPayload
,NULL
) 
 429         redisLog(REDIS_WARNING
,"Can't create readable event for SYNC"); 
     /* repl_transfer_left == -1 tells readSyncBulkPayload that the bulk
      * length line has not been read yet. */
 433     server
.replstate 
= REDIS_REPL_TRANSFER
; 
 434     server
.repl_transfer_left 
= -1; 
 435     server
.repl_transfer_fd 
= dfd
; 
 436     server
.repl_transfer_lastio 
= time(NULL
); 
 437     server
.repl_transfer_tmpfile 
= zstrdup(tmpfile
); 
 441     server
.replstate 
= REDIS_REPL_CONNECT
; 
 /* Start a non-blocking TCP connection to server.masterhost:masterport.
  *
  * On success of the visible steps, syncWithMaster is registered as the
  * writable handler of the new socket, the socket is stored in
  * server.repl_transfer_s and the state becomes REDIS_REPL_CONNECTING.
  * The caller in this file compares the result against REDIS_OK.
  *
  * NOTE(review): the second warning says "readable event" although the
  * event being created is AE_WRITABLE - looks like a copy/paste in the log
  * text; confirm before changing it. The embedded numbering has gaps, so
  * the declaration of `fd`, the error checks / returns and the closing
  * brace are not visible here. */
 446 int connectWithMaster(void) { 
 449     fd 
= anetTcpNonBlockConnect(NULL
,server
.masterhost
,server
.masterport
); 
 451         redisLog(REDIS_WARNING
,"Unable to connect to MASTER: %s", 
 456     if (aeCreateFileEvent(server
.el
,fd
,AE_WRITABLE
,syncWithMaster
,NULL
) == 
 460         redisLog(REDIS_WARNING
,"Can't create readable event for SYNC"); 
 464     server
.repl_transfer_s 
= fd
; 
 465     server
.replstate 
= REDIS_REPL_CONNECTING
; 
 /* SLAVEOF command handler.
  *
  * "SLAVEOF no one" (both words compared case-insensitively): if a master
  * host is configured it is freed and cleared, the master client (if any)
  * is released, an in-progress bulk transfer is aborted and the state
  * becomes REDIS_REPL_NONE.
  *
  * Otherwise (the `else` line is not visible in this listing): the previous
  * master host is freed, argv[1] / argv[2] become the new master host and
  * port (the port is parsed with atoi(), so no validation is visible), the
  * current master client is released, an in-progress transfer is aborted
  * and the state becomes REDIS_REPL_CONNECT.
  *
  * The client always receives shared.ok. */
 469 void slaveofCommand(redisClient 
*c
) { 
 470     if (!strcasecmp(c
->argv
[1]->ptr
,"no") && 
 471         !strcasecmp(c
->argv
[2]->ptr
,"one")) { 
 472         if (server
.masterhost
) { 
 473             sdsfree(server
.masterhost
); 
 474             server
.masterhost 
= NULL
; 
 475             if (server
.master
) freeClient(server
.master
); 
 476             if (server
.replstate 
== REDIS_REPL_TRANSFER
) 
 477                 replicationAbortSyncTransfer(); 
 478             server
.replstate 
= REDIS_REPL_NONE
; 
 479             redisLog(REDIS_NOTICE
,"MASTER MODE enabled (user request)"); 
 482         sdsfree(server
.masterhost
); 
 483         server
.masterhost 
= sdsdup(c
->argv
[1]->ptr
); 
 484         server
.masterport 
= atoi(c
->argv
[2]->ptr
); 
 485         if (server
.master
) freeClient(server
.master
); 
 486         if (server
.replstate 
== REDIS_REPL_TRANSFER
) 
 487             replicationAbortSyncTransfer(); 
 488         server
.replstate 
= REDIS_REPL_CONNECT
; 
 489         redisLog(REDIS_NOTICE
,"SLAVE OF %s:%d enabled (user request)", 
 490             server
.masterhost
, server
.masterport
); 
 492     addReply(c
,shared
.ok
); 
 495 /* --------------------------- REPLICATION CRON  ---------------------------- */ 
 497 #define REDIS_REPL_TIMEOUT 60 
 498 #define REDIS_REPL_PING_SLAVE_PERIOD 10 
 500 void replicationCron(void) { 
 501     /* Bulk transfer I/O timeout? */ 
 502     if (server
.masterhost 
&& server
.replstate 
== REDIS_REPL_TRANSFER 
&& 
 503         (time(NULL
)-server
.repl_transfer_lastio
) > REDIS_REPL_TIMEOUT
) 
 505         redisLog(REDIS_WARNING
,"Timeout receiving bulk data from MASTER..."); 
 506         replicationAbortSyncTransfer(); 
 509     /* Timed out master when we are an already connected slave? */ 
 510     if (server
.masterhost 
&& server
.replstate 
== REDIS_REPL_CONNECTED 
&& 
 511         (time(NULL
)-server
.master
->lastinteraction
) > REDIS_REPL_TIMEOUT
) 
 513         redisLog(REDIS_WARNING
,"MASTER time out: no data nor PING received..."); 
 514         freeClient(server
.master
); 
 517     /* Check if we should connect to a MASTER */ 
 518     if (server
.replstate 
== REDIS_REPL_CONNECT
) { 
 519         redisLog(REDIS_NOTICE
,"Connecting to MASTER..."); 
 520         if (connectWithMaster() == REDIS_OK
) { 
 521             redisLog(REDIS_NOTICE
,"MASTER <-> SLAVE sync started"); 
 522             if (server
.appendonly
) rewriteAppendOnlyFileBackground(); 
 526     /* If we have attached slaves, PING them from time to time. 
 527      * So slaves can implement an explicit timeout to masters, and will 
 528      * be able to detect a link disconnection even if the TCP connection 
 529      * will not actually go down. */ 
 530     if (!(server
.cronloops 
% (REDIS_REPL_PING_SLAVE_PERIOD
*10))) { 
 534         listRewind(server
.slaves
,&li
); 
 535         while((ln 
= listNext(&li
))) { 
 536             redisClient 
*slave 
= ln
->value
; 
 538             /* Don't ping slaves that are in the middle of a bulk transfer 
 539              * with the master for first synchronization. */ 
 540             if (slave
->replstate 
== REDIS_REPL_SEND_BULK
) continue; 
 541             if (slave
->replstate 
== REDIS_REPL_ONLINE
) { 
 542                 /* If the slave is online send a normal ping */ 
 543                 addReplySds(slave
,sdsnew("PING\r\n")); 
 545                 /* Otherwise we are in the pre-synchronization stage. 
 546                  * Just a newline will do the work of refreshing the 
 547                  * connection last interaction time, and at the same time 
 548                  * we'll be sure that being a single char there are no 
 549                  * short-write problems. */ 
 550                 if (write(slave
->fd
, "\n", 1) == -1) { 
 551                     /* Don't worry, it's just a ping. */