8 /* ---------------------------------- MASTER -------------------------------- */ 
  10 void replicationFeedSlaves(list 
*slaves
, int dictid
, robj 
**argv
, int argc
) { 
  15     /* We need 1+(ARGS*3) objects since commands are using the new protocol 
  16      * and we need 1 object for the first "*<count>\r\n" multibulk count, then 
  17      * for every additional object we have "$<count>\r\n" + object + "\r\n". */ 
  18     robj 
*static_outv
[REDIS_STATIC_ARGS
*3+1]; 
  21     if (argc 
<= REDIS_STATIC_ARGS
) { 
  24         outv 
= zmalloc(sizeof(robj
*)*(argc
*3+1)); 
  27     lenobj 
= createObject(REDIS_STRING
, 
  28             sdscatprintf(sdsempty(), "*%d\r\n", argc
)); 
  30     outv
[outc
++] = lenobj
; 
  31     for (j 
= 0; j 
< argc
; j
++) { 
  32         lenobj 
= createObject(REDIS_STRING
, 
  33             sdscatprintf(sdsempty(),"$%lu\r\n", 
  34                 (unsigned long) stringObjectLen(argv
[j
]))); 
  36         outv
[outc
++] = lenobj
; 
  37         outv
[outc
++] = argv
[j
]; 
  38         outv
[outc
++] = shared
.crlf
; 
  41     /* Increment all the refcounts at start and decrement at end in order to 
  42      * be sure to free objects if there is no slave in a replication state 
  43      * able to be fed with commands */ 
  44     for (j 
= 0; j 
< outc
; j
++) incrRefCount(outv
[j
]); 
  45     listRewind(slaves
,&li
); 
  46     while((ln 
= listNext(&li
))) { 
  47         redisClient 
*slave 
= ln
->value
; 
  49         /* Don't feed slaves that are still waiting for BGSAVE to start */ 
  50         if (slave
->replstate 
== REDIS_REPL_WAIT_BGSAVE_START
) continue; 
  52         /* Feed all the other slaves, MONITORs and so on */ 
  53         if (slave
->slaveseldb 
!= dictid
) { 
  57             case 0: selectcmd 
= shared
.select0
; break; 
  58             case 1: selectcmd 
= shared
.select1
; break; 
  59             case 2: selectcmd 
= shared
.select2
; break; 
  60             case 3: selectcmd 
= shared
.select3
; break; 
  61             case 4: selectcmd 
= shared
.select4
; break; 
  62             case 5: selectcmd 
= shared
.select5
; break; 
  63             case 6: selectcmd 
= shared
.select6
; break; 
  64             case 7: selectcmd 
= shared
.select7
; break; 
  65             case 8: selectcmd 
= shared
.select8
; break; 
  66             case 9: selectcmd 
= shared
.select9
; break; 
  68                 selectcmd 
= createObject(REDIS_STRING
, 
  69                     sdscatprintf(sdsempty(),"select %d\r\n",dictid
)); 
  70                 selectcmd
->refcount 
= 0; 
  73             addReply(slave
,selectcmd
); 
  74             slave
->slaveseldb 
= dictid
; 
  76         for (j 
= 0; j 
< outc
; j
++) addReply(slave
,outv
[j
]); 
  78     for (j 
= 0; j 
< outc
; j
++) decrRefCount(outv
[j
]); 
  79     if (outv 
!= static_outv
) zfree(outv
); 
  82 void replicationFeedMonitors(list 
*monitors
, int dictid
, robj 
**argv
, int argc
) { 
  86     sds cmdrepr 
= sdsnew("+"); 
  90     gettimeofday(&tv
,NULL
); 
  91     cmdrepr 
= sdscatprintf(cmdrepr
,"%ld.%06ld ",(long)tv
.tv_sec
,(long)tv
.tv_usec
); 
  92     if (dictid 
!= 0) cmdrepr 
= sdscatprintf(cmdrepr
,"(db %d) ", dictid
); 
  94     for (j 
= 0; j 
< argc
; j
++) { 
  95         if (argv
[j
]->encoding 
== REDIS_ENCODING_INT
) { 
  96             cmdrepr 
= sdscatprintf(cmdrepr
, "\"%ld\"", (long)argv
[j
]->ptr
); 
  98             cmdrepr 
= sdscatrepr(cmdrepr
,(char*)argv
[j
]->ptr
, 
  99                         sdslen(argv
[j
]->ptr
)); 
 102             cmdrepr 
= sdscatlen(cmdrepr
," ",1); 
 104     cmdrepr 
= sdscatlen(cmdrepr
,"\r\n",2); 
 105     cmdobj 
= createObject(REDIS_STRING
,cmdrepr
); 
 107     listRewind(monitors
,&li
); 
 108     while((ln 
= listNext(&li
))) { 
 109         redisClient 
*monitor 
= ln
->value
; 
 110         addReply(monitor
,cmdobj
); 
 112     decrRefCount(cmdobj
); 
 115 void syncCommand(redisClient 
*c
) { 
 116     /* ignore SYNC if already slave or in monitor mode */ 
 117     if (c
->flags 
& REDIS_SLAVE
) return; 
 119     /* Refuse SYNC requests if we are a slave but the link with our master 
 121     if (server
.masterhost 
&& server
.replstate 
!= REDIS_REPL_CONNECTED
) { 
 122         addReplyError(c
,"Can't SYNC while not connected with my master"); 
 126     /* SYNC can't be issued when the server has pending data to send to 
 127      * the client about already issued commands. We need a fresh reply 
 128      * buffer registering the differences between the BGSAVE and the current 
 129      * dataset, so that we can copy to other slaves if needed. */ 
 130     if (listLength(c
->reply
) != 0) { 
 131         addReplyError(c
,"SYNC is invalid with pending input"); 
 135     redisLog(REDIS_NOTICE
,"Slave ask for synchronization"); 
 136     /* Here we need to check if there is a background saving operation 
 137      * in progress, or if it is required to start one */ 
 138     if (server
.bgsavechildpid 
!= -1) { 
 139         /* Ok a background save is in progress. Let's check if it is a good 
 140          * one for replication, i.e. if there is another slave that is 
 141          * registering differences since the server forked to save */ 
 146         listRewind(server
.slaves
,&li
); 
 147         while((ln 
= listNext(&li
))) { 
 149             if (slave
->replstate 
== REDIS_REPL_WAIT_BGSAVE_END
) break; 
 152             /* Perfect, the server is already registering differences for 
 153              * another slave. Set the right state, and copy the buffer. */ 
 154             listRelease(c
->reply
); 
 155             c
->reply 
= listDup(slave
->reply
); 
 156             c
->replstate 
= REDIS_REPL_WAIT_BGSAVE_END
; 
 157             redisLog(REDIS_NOTICE
,"Waiting for end of BGSAVE for SYNC"); 
 159             /* No way, we need to wait for the next BGSAVE in order to 
 160              * register differences */ 
 161             c
->replstate 
= REDIS_REPL_WAIT_BGSAVE_START
; 
 162             redisLog(REDIS_NOTICE
,"Waiting for next BGSAVE for SYNC"); 
 165         /* Ok we don't have a BGSAVE in progress, let's start one */ 
 166         redisLog(REDIS_NOTICE
,"Starting BGSAVE for SYNC"); 
 167         if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) { 
 168             redisLog(REDIS_NOTICE
,"Replication failed, can't BGSAVE"); 
 169             addReplyError(c
,"Unable to perform background save"); 
 172         c
->replstate 
= REDIS_REPL_WAIT_BGSAVE_END
; 
 175     c
->flags 
|= REDIS_SLAVE
; 
 177     listAddNodeTail(server
.slaves
,c
); 
 181 void sendBulkToSlave(aeEventLoop 
*el
, int fd
, void *privdata
, int mask
) { 
 182     redisClient 
*slave 
= privdata
; 
 185     char buf
[REDIS_IOBUF_LEN
]; 
 186     ssize_t nwritten
, buflen
; 
 188     if (slave
->repldboff 
== 0) { 
 189         /* Write the bulk write count before transferring the DB. In theory here 
 190          * we don't know how much room there is in the output buffer of the 
 191          * socket, but in practice SO_SNDLOWAT (the minimum count for output 
 192          * operations) will never be smaller than the few bytes we need. */ 
 195         bulkcount 
= sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long) 
 197         if (write(fd
,bulkcount
,sdslen(bulkcount
)) != (signed)sdslen(bulkcount
)) 
 205     lseek(slave
->repldbfd
,slave
->repldboff
,SEEK_SET
); 
 206     buflen 
= read(slave
->repldbfd
,buf
,REDIS_IOBUF_LEN
); 
 208         redisLog(REDIS_WARNING
,"Read error sending DB to slave: %s", 
 209             (buflen 
== 0) ? "premature EOF" : strerror(errno
)); 
 213     if ((nwritten 
= write(fd
,buf
,buflen
)) == -1) { 
 214         redisLog(REDIS_VERBOSE
,"Write error sending DB to slave: %s", 
 219     slave
->repldboff 
+= nwritten
; 
 220     if (slave
->repldboff 
== slave
->repldbsize
) { 
 221         close(slave
->repldbfd
); 
 222         slave
->repldbfd 
= -1; 
 223         aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
); 
 224         slave
->replstate 
= REDIS_REPL_ONLINE
; 
 225         if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
, 
 226             sendReplyToClient
, slave
) == AE_ERR
) { 
 230         addReplySds(slave
,sdsempty()); 
 231         redisLog(REDIS_NOTICE
,"Synchronization with slave succeeded"); 
 235 /* This function is called at the end of every background saving. 
 236  * The argument bgsaveerr is REDIS_OK if the background saving succeeded 
 237  * otherwise REDIS_ERR is passed to the function. 
 239  * The goal of this function is to handle slaves waiting for a successful 
 240  * background saving in order to perform non-blocking synchronization. */ 
 241 void updateSlavesWaitingBgsave(int bgsaveerr
) { 
 246     listRewind(server
.slaves
,&li
); 
 247     while((ln 
= listNext(&li
))) { 
 248         redisClient 
*slave 
= ln
->value
; 
 250         if (slave
->replstate 
== REDIS_REPL_WAIT_BGSAVE_START
) { 
 252             slave
->replstate 
= REDIS_REPL_WAIT_BGSAVE_END
; 
 253         } else if (slave
->replstate 
== REDIS_REPL_WAIT_BGSAVE_END
) { 
 254             struct redis_stat buf
; 
 256             if (bgsaveerr 
!= REDIS_OK
) { 
 258                 redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE child returned an error"); 
 261             if ((slave
->repldbfd 
= open(server
.dbfilename
,O_RDONLY
)) == -1 || 
 262                 redis_fstat(slave
->repldbfd
,&buf
) == -1) { 
 264                 redisLog(REDIS_WARNING
,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno
)); 
 267             slave
->repldboff 
= 0; 
 268             slave
->repldbsize 
= buf
.st_size
; 
 269             slave
->replstate 
= REDIS_REPL_SEND_BULK
; 
 270             aeDeleteFileEvent(server
.el
,slave
->fd
,AE_WRITABLE
); 
 271             if (aeCreateFileEvent(server
.el
, slave
->fd
, AE_WRITABLE
, sendBulkToSlave
, slave
) == AE_ERR
) { 
 278         if (rdbSaveBackground(server
.dbfilename
) != REDIS_OK
) { 
 281             listRewind(server
.slaves
,&li
); 
 282             redisLog(REDIS_WARNING
,"SYNC failed. BGSAVE failed"); 
 283             while((ln 
= listNext(&li
))) { 
 284                 redisClient 
*slave 
= ln
->value
; 
 286                 if (slave
->replstate 
== REDIS_REPL_WAIT_BGSAVE_START
) 
 293 /* ----------------------------------- SLAVE -------------------------------- */ 
 295 /* Abort the async download of the bulk dataset while SYNC-ing with master */ 
 296 void replicationAbortSyncTransfer(void) { 
 297     redisAssert(server
.replstate 
== REDIS_REPL_TRANSFER
); 
 299     aeDeleteFileEvent(server
.el
,server
.repl_transfer_s
,AE_READABLE
); 
 300     close(server
.repl_transfer_s
); 
 301     close(server
.repl_transfer_fd
); 
 302     unlink(server
.repl_transfer_tmpfile
); 
 303     zfree(server
.repl_transfer_tmpfile
); 
 304     server
.replstate 
= REDIS_REPL_CONNECT
; 
 307 /* Asynchronously read the SYNC payload we receive from a master */ 
 308 void readSyncBulkPayload(aeEventLoop 
*el
, int fd
, void *privdata
, int mask
) { 
 310     ssize_t nread
, readlen
; 
 312     REDIS_NOTUSED(privdata
); 
 315     /* If repl_transfer_left == -1 we still have to read the bulk length 
 316      * from the master reply. */ 
 317     if (server
.repl_transfer_left 
== -1) { 
 318         if (syncReadLine(fd
,buf
,1024,3600) == -1) { 
 319             redisLog(REDIS_WARNING
, 
 320                 "I/O error reading bulk count from MASTER: %s", 
 322             replicationAbortSyncTransfer(); 
 326             redisLog(REDIS_WARNING
, 
 327                 "MASTER aborted replication with an error: %s", 
 329             replicationAbortSyncTransfer(); 
 331         } else if (buf
[0] == '\0') { 
 332             /* At this stage just a newline works as a PING in order to keep 
 333              * the connection alive. So we refresh our last interaction 
 335             server
.repl_transfer_lastio 
= time(NULL
); 
 337         } else if (buf
[0] != '$') { 
 338             redisLog(REDIS_WARNING
,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?"); 
 339             replicationAbortSyncTransfer(); 
 342         server
.repl_transfer_left 
= strtol(buf
+1,NULL
,10); 
 343         redisLog(REDIS_NOTICE
, 
 344             "MASTER <-> SLAVE sync: receiving %ld bytes from master", 
 345             server
.repl_transfer_left
); 
 350     readlen 
= (server
.repl_transfer_left 
< (signed)sizeof(buf
)) ? 
 351         server
.repl_transfer_left 
: (signed)sizeof(buf
); 
 352     nread 
= read(fd
,buf
,readlen
); 
 354         redisLog(REDIS_WARNING
,"I/O error trying to sync with MASTER: %s", 
 355             (nread 
== -1) ? strerror(errno
) : "connection lost"); 
 356         replicationAbortSyncTransfer(); 
 359     server
.repl_transfer_lastio 
= time(NULL
); 
 360     if (write(server
.repl_transfer_fd
,buf
,nread
) != nread
) { 
 361         redisLog(REDIS_WARNING
,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno
)); 
 362         replicationAbortSyncTransfer(); 
 365     server
.repl_transfer_left 
-= nread
; 
 366     /* Check if the transfer is now complete */ 
 367     if (server
.repl_transfer_left 
== 0) { 
 368         if (rename(server
.repl_transfer_tmpfile
,server
.dbfilename
) == -1) { 
 369             redisLog(REDIS_WARNING
,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno
)); 
 370             replicationAbortSyncTransfer(); 
 373         redisLog(REDIS_NOTICE
, "MASTER <-> SLAVE sync: Loading DB in memory"); 
 375         /* Before loading the DB into memory we need to delete the readable 
 376          * handler, otherwise it will get called recursively since 
 377          * rdbLoad() will call the event loop to process events from time to 
 378          * time for non blocking loading. */ 
 379         aeDeleteFileEvent(server
.el
,server
.repl_transfer_s
,AE_READABLE
); 
 380         if (rdbLoad(server
.dbfilename
) != REDIS_OK
) { 
 381             redisLog(REDIS_WARNING
,"Failed trying to load the MASTER synchronization DB from disk"); 
 382             replicationAbortSyncTransfer(); 
 385         /* Final setup of the connected slave <- master link */ 
 386         zfree(server
.repl_transfer_tmpfile
); 
 387         close(server
.repl_transfer_fd
); 
 388         server
.master 
= createClient(server
.repl_transfer_s
); 
 389         server
.master
->flags 
|= REDIS_MASTER
; 
 390         server
.master
->authenticated 
= 1; 
 391         server
.replstate 
= REDIS_REPL_CONNECTED
; 
 392         redisLog(REDIS_NOTICE
, "MASTER <-> SLAVE sync: Finished with success"); 
 396 int syncWithMaster(void) { 
 397     char buf
[1024], tmpfile
[256], authcmd
[1024]; 
 398     int fd 
= anetTcpConnect(NULL
,server
.masterhost
,server
.masterport
); 
 399     int dfd
, maxtries 
= 5; 
 402         redisLog(REDIS_WARNING
,"Unable to connect to MASTER: %s", 
 407     /* AUTH with the master if required. */ 
 408     if(server
.masterauth
) { 
 409         snprintf(authcmd
, 1024, "AUTH %s\r\n", server
.masterauth
); 
 410         if (syncWrite(fd
, authcmd
, strlen(server
.masterauth
)+7, 5) == -1) { 
 412             redisLog(REDIS_WARNING
,"Unable to AUTH to MASTER: %s", 
 416         /* Read the AUTH result.  */ 
 417         if (syncReadLine(fd
,buf
,1024,3600) == -1) { 
 419             redisLog(REDIS_WARNING
,"I/O error reading auth result from MASTER: %s", 
 425             redisLog(REDIS_WARNING
,"Cannot AUTH to MASTER, is the masterauth password correct?"); 
 430     /* Issue the SYNC command */ 
 431     if (syncWrite(fd
,"SYNC \r\n",7,5) == -1) { 
 433         redisLog(REDIS_WARNING
,"I/O error writing to MASTER: %s", 
 438     /* Prepare a suitable temp file for bulk transfer */ 
 440         snprintf(tmpfile
,256, 
 441             "temp-%d.%ld.rdb",(int)time(NULL
),(long int)getpid()); 
 442         dfd 
= open(tmpfile
,O_CREAT
|O_WRONLY
|O_EXCL
,0644); 
 443         if (dfd 
!= -1) break; 
 448         redisLog(REDIS_WARNING
,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno
)); 
 452     /* Setup the non blocking download of the bulk file. */ 
 453     if (aeCreateFileEvent(server
.el
, fd
, AE_READABLE
, readSyncBulkPayload
, NULL
) 
 457         redisLog(REDIS_WARNING
,"Can't create readable event for SYNC"); 
 460     server
.replstate 
= REDIS_REPL_TRANSFER
; 
 461     server
.repl_transfer_left 
= -1; 
 462     server
.repl_transfer_s 
= fd
; 
 463     server
.repl_transfer_fd 
= dfd
; 
 464     server
.repl_transfer_lastio 
= time(NULL
); 
 465     server
.repl_transfer_tmpfile 
= zstrdup(tmpfile
); 
 469 void slaveofCommand(redisClient 
*c
) { 
 470     if (!strcasecmp(c
->argv
[1]->ptr
,"no") && 
 471         !strcasecmp(c
->argv
[2]->ptr
,"one")) { 
 472         if (server
.masterhost
) { 
 473             sdsfree(server
.masterhost
); 
 474             server
.masterhost 
= NULL
; 
 475             if (server
.master
) freeClient(server
.master
); 
 476             if (server
.replstate 
== REDIS_REPL_TRANSFER
) 
 477                 replicationAbortSyncTransfer(); 
 478             server
.replstate 
= REDIS_REPL_NONE
; 
 479             redisLog(REDIS_NOTICE
,"MASTER MODE enabled (user request)"); 
 482         sdsfree(server
.masterhost
); 
 483         server
.masterhost 
= sdsdup(c
->argv
[1]->ptr
); 
 484         server
.masterport 
= atoi(c
->argv
[2]->ptr
); 
 485         if (server
.master
) freeClient(server
.master
); 
 486         if (server
.replstate 
== REDIS_REPL_TRANSFER
) 
 487             replicationAbortSyncTransfer(); 
 488         server
.replstate 
= REDIS_REPL_CONNECT
; 
 489         redisLog(REDIS_NOTICE
,"SLAVE OF %s:%d enabled (user request)", 
 490             server
.masterhost
, server
.masterport
); 
 492     addReply(c
,shared
.ok
); 
 495 /* --------------------------- REPLICATION CRON  ---------------------------- */ 
 497 #define REDIS_REPL_TIMEOUT 60 
 498 #define REDIS_REPL_PING_SLAVE_PERIOD 10 
 500 void replicationCron(void) { 
 501     /* Bulk transfer I/O timeout? */ 
 502     if (server
.masterhost 
&& server
.replstate 
== REDIS_REPL_TRANSFER 
&& 
 503         (time(NULL
)-server
.repl_transfer_lastio
) > REDIS_REPL_TIMEOUT
) 
 505         redisLog(REDIS_WARNING
,"Timeout receiving bulk data from MASTER..."); 
 506         replicationAbortSyncTransfer(); 
 509     /* Timed out master when we are an already connected slave? */ 
 510     if (server
.masterhost 
&& server
.replstate 
== REDIS_REPL_CONNECTED 
&& 
 511         (time(NULL
)-server
.master
->lastinteraction
) > REDIS_REPL_TIMEOUT
) 
 513         redisLog(REDIS_WARNING
,"MASTER time out: no data nor PING received..."); 
 514         freeClient(server
.master
); 
 517     /* Check if we should connect to a MASTER */ 
 518     if (server
.replstate 
== REDIS_REPL_CONNECT
) { 
 519         redisLog(REDIS_NOTICE
,"Connecting to MASTER..."); 
 520         if (syncWithMaster() == REDIS_OK
) { 
 521             redisLog(REDIS_NOTICE
,"MASTER <-> SLAVE sync started: SYNC sent"); 
 522             if (server
.appendonly
) rewriteAppendOnlyFileBackground(); 
 526     /* If we have attached slaves, PING them from time to time. 
 527      * So slaves can implement an explicit timeout to masters, and will 
 528      * be able to detect a link disconnection even if the TCP connection 
 529      * will not actually go down. */ 
 530     if (!(server
.cronloops 
% (REDIS_REPL_PING_SLAVE_PERIOD
*10))) { 
 534         listRewind(server
.slaves
,&li
); 
 535         while((ln 
= listNext(&li
))) { 
 536             redisClient 
*slave 
= ln
->value
; 
 538             /* Don't ping slaves that are in the middle of a bulk transfer 
 539              * with the master for first synchronization. */ 
 540             if (slave
->replstate 
== REDIS_REPL_SEND_BULK
) continue; 
 541             if (slave
->replstate 
== REDIS_REPL_ONLINE
) { 
 542                 /* If the slave is online send a normal ping */ 
 543                 addReplySds(slave
,sdsnew("PING\r\n")); 
 545                 /* Otherwise we are in the pre-synchronization stage. 
 546                  * Just a newline will do the work of refreshing the 
 547                  * connection last interaction time, and at the same time 
 548                  * we'll be sure that being a single char there are no 
 549                  * short-write problems. */ 
 550                 if (write(slave
->fd
, "\n", 1) == -1) { 
 551                     /* Don't worry, it's just a ping. */