+/* Abort the async download of the bulk dataset while SYNC-ing with master */
+void replicationAbortSyncTransfer(void) {
+ redisAssert(server.replstate == REDIS_REPL_TRANSFER);
+
+ aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
+ close(server.repl_transfer_s);
+ close(server.repl_transfer_fd);
+ unlink(server.repl_transfer_tmpfile);
+ zfree(server.repl_transfer_tmpfile);
+ server.replstate = REDIS_REPL_CONNECT;
+}
+
+/* Asynchronously read the SYNC payload we receive from a master */
+void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
+ char buf[4096];
+ ssize_t nread, readlen;
+ REDIS_NOTUSED(el);
+ REDIS_NOTUSED(privdata);
+ REDIS_NOTUSED(mask);
+
+ /* If repl_transfer_left == -1 we still have to read the bulk length
+ * from the master reply. */
+ if (server.repl_transfer_left == -1) {
+ if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout) == -1) {
+ redisLog(REDIS_WARNING,
+ "I/O error reading bulk count from MASTER: %s",
+ strerror(errno));
+ goto error;
+ }
+
+ if (buf[0] == '-') {
+ redisLog(REDIS_WARNING,
+ "MASTER aborted replication with an error: %s",
+ buf+1);
+ goto error;
+ } else if (buf[0] == '\0') {
+ /* At this stage just a newline works as a PING in order to take
+ * the connection live. So we refresh our last interaction
+ * timestamp. */
+ server.repl_transfer_lastio = time(NULL);
+ return;
+ } else if (buf[0] != '$') {
+ redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
+ goto error;
+ }
+ server.repl_transfer_left = strtol(buf+1,NULL,10);
+ redisLog(REDIS_NOTICE,
+ "MASTER <-> SLAVE sync: receiving %ld bytes from master",
+ server.repl_transfer_left);
+ return;
+ }
+
+ /* Read bulk data */
+ readlen = (server.repl_transfer_left < (signed)sizeof(buf)) ?
+ server.repl_transfer_left : (signed)sizeof(buf);
+ nread = read(fd,buf,readlen);
+ if (nread <= 0) {
+ redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
+ (nread == -1) ? strerror(errno) : "connection lost");
+ replicationAbortSyncTransfer();
+ return;
+ }
+ server.repl_transfer_lastio = time(NULL);
+ if (write(server.repl_transfer_fd,buf,nread) != nread) {
+ redisLog(REDIS_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
+ goto error;
+ }
+ server.repl_transfer_left -= nread;
+ /* Check if the transfer is now complete */
+ if (server.repl_transfer_left == 0) {
+ if (rename(server.repl_transfer_tmpfile,server.dbfilename) == -1) {
+ redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
+ replicationAbortSyncTransfer();
+ return;
+ }
+ redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Loading DB in memory");
+ emptyDb();
+ /* Before loading the DB into memory we need to delete the readable
+ * handler, otherwise it will get called recursively since
+ * rdbLoad() will call the event loop to process events from time to
+ * time for non blocking loading. */
+ aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
+ if (rdbLoad(server.dbfilename) != REDIS_OK) {
+ redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
+ replicationAbortSyncTransfer();
+ return;
+ }
+ /* Final setup of the connected slave <- master link */
+ zfree(server.repl_transfer_tmpfile);
+ close(server.repl_transfer_fd);
+ server.master = createClient(server.repl_transfer_s);
+ server.master->flags |= REDIS_MASTER;
+ server.master->authenticated = 1;
+ server.replstate = REDIS_REPL_CONNECTED;
+ redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync: Finished with success");
+ /* Rewrite the AOF file now that the dataset changed. */
+ if (server.appendonly) rewriteAppendOnlyFileBackground();