]> git.saurik.com Git - redis.git/blobdiff - src/cluster.c
zmalloc: kill unused __size parameter in update_zmalloc_stat_alloc() macro.
[redis.git] / src / cluster.c
index 308d3bf7ae1f8bf0e42e32be176914dbb2a9c545..cbcdf373df8d8b3ed69d13979537db9a2098d1b6 100644 (file)
@@ -1,9 +1,40 @@
+/* Redis Cluster implementation.
+ *
+ * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ *   * Redistributions of source code must retain the above copyright notice,
+ *     this list of conditions and the following disclaimer.
+ *   * Redistributions in binary form must reproduce the above copyright
+ *     notice, this list of conditions and the following disclaimer in the
+ *     documentation and/or other materials provided with the distribution.
+ *   * Neither the name of Redis nor the names of its contributors may be used
+ *     to endorse or promote products derived from this software without
+ *     specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
 #include "redis.h"
 #include "redis.h"
-#include "sha1.h"
+#include "endianconv.h"
 
 #include <arpa/inet.h>
 #include <fcntl.h>
 #include <unistd.h>
 
 #include <arpa/inet.h>
 #include <fcntl.h>
 #include <unistd.h>
+#include <sys/socket.h>
 
 void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
 void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask);
 
 void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
 void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask);
@@ -209,7 +240,7 @@ void clusterInit(void) {
         exit(1);
     }
     if (aeCreateFileEvent(server.el, server.cfd, AE_READABLE,
         exit(1);
     }
     if (aeCreateFileEvent(server.el, server.cfd, AE_READABLE,
-        clusterAcceptHandler, NULL) == AE_ERR) oom("creating file event");
+        clusterAcceptHandler, NULL) == AE_ERR) redisPanic("Unrecoverable error creating Redis Cluster file event.");
     server.cluster.slots_to_keys = zslCreate();
 }
 
     server.cluster.slots_to_keys = zslCreate();
 }
 
@@ -901,7 +932,7 @@ void clusterSendPublish(clusterLink *link, robj *channel, robj *message) {
     } else {
         payload = zmalloc(totlen);
         hdr = (clusterMsg*) payload;
     } else {
         payload = zmalloc(totlen);
         hdr = (clusterMsg*) payload;
-        memcpy(payload,hdr,sizeof(hdr));
+        memcpy(payload,hdr,sizeof(*hdr));
     }
     memcpy(hdr->data.publish.msg.bulk_data,channel->ptr,sdslen(channel->ptr));
     memcpy(hdr->data.publish.msg.bulk_data+sdslen(channel->ptr),
     }
     memcpy(hdr->data.publish.msg.bulk_data,channel->ptr,sdslen(channel->ptr));
     memcpy(hdr->data.publish.msg.bulk_data+sdslen(channel->ptr),
@@ -1464,8 +1495,8 @@ void clusterCommand(redisClient *c) {
 /* Generates a DUMP-format representation of the object 'o', adding it to the
  * io stream pointed by 'rio'. This function can't fail. */
 void createDumpPayload(rio *payload, robj *o) {
 /* Generates a DUMP-format representation of the object 'o', adding it to the
  * io stream pointed by 'rio'. This function can't fail. */
 void createDumpPayload(rio *payload, robj *o) {
-    unsigned char hash[20], buf[2];
-    SHA1_CTX ctx;
+    unsigned char buf[2];
+    uint64_t crc;
 
     /* Serialize the object in a RDB-like format. It consist of an object type
      * byte followed by the serialized object. This is understood by RESTORE. */
 
     /* Serialize the object in a RDB-like format. It consist of an object type
      * byte followed by the serialized object. This is understood by RESTORE. */
@@ -1474,35 +1505,34 @@ void createDumpPayload(rio *payload, robj *o) {
     redisAssert(rdbSaveObject(payload,o));
 
     /* Write the footer, this is how it looks like:
     redisAssert(rdbSaveObject(payload,o));
 
     /* Write the footer, this is how it looks like:
-     * ----------------+---------------------+--------------+
-     * ... RDB payload | 2 bytes RDB version | 8 bytes SHA1 |
-     * ----------------+---------------------+--------------+
-     * The SHA1 is just 8 bytes of truncated SHA1 of everything excluding itself.
-     * The 2 bytes RDB version is a little endian unsigned integer.  */
+     * ----------------+---------------------+---------------+
+     * ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 |
+     * ----------------+---------------------+---------------+
+     * RDB version and CRC are both in little endian.
+     */
 
     /* RDB version */
     buf[0] = REDIS_RDB_VERSION & 0xff;
     buf[1] = (REDIS_RDB_VERSION >> 8) & 0xff;
     payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,buf,2);
 
 
     /* RDB version */
     buf[0] = REDIS_RDB_VERSION & 0xff;
     buf[1] = (REDIS_RDB_VERSION >> 8) & 0xff;
     payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,buf,2);
 
-    /* Truncated SHA1 */
-    SHA1Init(&ctx);
-    SHA1Update(&ctx,(unsigned char*)payload->io.buffer.ptr,
-                    sdslen(payload->io.buffer.ptr));
-    SHA1Final(hash,&ctx);
-    payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,hash,8);
+    /* CRC64 */
+    crc = crc64(0,(unsigned char*)payload->io.buffer.ptr,
+                sdslen(payload->io.buffer.ptr));
+    memrev64ifbe(&crc);
+    payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,&crc,8);
 }
 
 /* Verify that the RDB version of the dump payload matches the one of this Redis
 }
 
 /* Verify that the RDB version of the dump payload matches the one of this Redis
- * instance and that the truncated SHA1 is ok.
+ * instance and that the checksum is ok.
  * If the DUMP payload looks valid REDIS_OK is returned, otherwise REDIS_ERR
  * is returned. */
 int verifyDumpPayload(unsigned char *p, size_t len) {
  * If the DUMP payload looks valid REDIS_OK is returned, otherwise REDIS_ERR
  * is returned. */
 int verifyDumpPayload(unsigned char *p, size_t len) {
-    unsigned char hash[20], *footer;
-    SHA1_CTX ctx;
+    unsigned char *footer;
     uint16_t rdbver;
     uint16_t rdbver;
+    uint64_t crc;
 
 
-    /* At least 2 bytes of RDB version and 8 of truncated SHA should be present. */
+    /* At least 2 bytes of RDB version and 8 of CRC64 should be present. */
     if (len < 10) return REDIS_ERR;
     footer = p+(len-10);
 
     if (len < 10) return REDIS_ERR;
     footer = p+(len-10);
 
@@ -1510,11 +1540,10 @@ int verifyDumpPayload(unsigned char *p, size_t len) {
     rdbver = (footer[1] << 8) | footer[0];
     if (rdbver != REDIS_RDB_VERSION) return REDIS_ERR;
 
     rdbver = (footer[1] << 8) | footer[0];
     if (rdbver != REDIS_RDB_VERSION) return REDIS_ERR;
 
-    /* Verify truncated SHA1 */
-    SHA1Init(&ctx);
-    SHA1Update(&ctx,p,len-8);
-    SHA1Final(hash,&ctx);
-    return (memcmp(hash,footer+2,8) == 0) ? REDIS_OK : REDIS_ERR;
+    /* Verify CRC64 */
+    crc = crc64(0,p,len-8);
+    memrev64ifbe(&crc);
+    return (memcmp(&crc,footer+2,8) == 0) ? REDIS_OK : REDIS_ERR;
 }
 
 /* DUMP keyname
 }
 
 /* DUMP keyname
@@ -1540,15 +1569,25 @@ void dumpCommand(redisClient *c) {
     return;
 }
 
     return;
 }
 
-/* RESTORE key ttl serialized-value */
+/* RESTORE key ttl serialized-value [REPLACE] */
 void restoreCommand(redisClient *c) {
     long ttl;
     rio payload;
 void restoreCommand(redisClient *c) {
     long ttl;
     rio payload;
-    int type;
+    int j, type, replace = 0;
     robj *obj;
 
     robj *obj;
 
+    /* Parse additional options */
+    for (j = 4; j < c->argc; j++) {
+        if (!strcasecmp(c->argv[j]->ptr,"replace")) {
+            replace = 1;
+        } else {
+            addReply(c,shared.syntaxerr);
+            return;
+        }
+    }
+
     /* Make sure this key does not already exist here... */
     /* Make sure this key does not already exist here... */
-    if (lookupKeyWrite(c->db,c->argv[1]) != NULL) {
+    if (!replace && lookupKeyWrite(c->db,c->argv[1]) != NULL) {
         addReplyError(c,"Target key name is busy.");
         return;
     }
         addReplyError(c,"Target key name is busy.");
         return;
     }
@@ -1561,7 +1600,7 @@ void restoreCommand(redisClient *c) {
         return;
     }
 
         return;
     }
 
-    /* Verify truncated SHA1 and RDB version. */
+    /* Verify RDB version and data checksum. */
     if (verifyDumpPayload(c->argv[3]->ptr,sdslen(c->argv[3]->ptr)) == REDIS_ERR) {
         addReplyError(c,"DUMP payload version or checksum are wrong");
         return;
     if (verifyDumpPayload(c->argv[3]->ptr,sdslen(c->argv[3]->ptr)) == REDIS_ERR) {
         addReplyError(c,"DUMP payload version or checksum are wrong");
         return;
@@ -1575,6 +1614,9 @@ void restoreCommand(redisClient *c) {
         return;
     }
 
         return;
     }
 
+    /* Remove the old key if needed. */
+    if (replace) dbDelete(c->db,c->argv[1]);
+
     /* Create the key and set the TTL if any */
     dbAdd(c->db,c->argv[1],obj);
     if (ttl) setExpire(c->db,c->argv[1],mstime()+ttl);
     /* Create the key and set the TTL if any */
     dbAdd(c->db,c->argv[1],obj);
     if (ttl) setExpire(c->db,c->argv[1],mstime()+ttl);
@@ -1583,21 +1625,154 @@ void restoreCommand(redisClient *c) {
     server.dirty++;
 }
 
     server.dirty++;
 }
 
-/* MIGRATE host port key dbid timeout */
-void migrateCommand(redisClient *c) {
+/* MIGRATE socket cache implementation.
+ *
+ * We take a map between host:ip and a TCP socket that we used to connect
+ * to this instance in recent time.
+ * This sockets are closed when the max number we cache is reached, and also
+ * in serverCron() when they are around for more than a few seconds. */
+#define MIGRATE_SOCKET_CACHE_ITEMS 64 /* max num of items in the cache. */
+#define MIGRATE_SOCKET_CACHE_TTL 10 /* close cached socekts after 10 sec. */
+
+typedef struct migrateCachedSocket {
+    int fd;
+    time_t last_use_time;
+} migrateCachedSocket;
+
+/* Return a TCP scoket connected with the target instance, possibly returning
+ * a cached one.
+ *
+ * This function is responsible of sending errors to the client if a
+ * connection can't be established. In this case -1 is returned.
+ * Otherwise on success the socket is returned, and the caller should not
+ * attempt to free it after usage.
+ *
+ * If the caller detects an error while using the socket, migrateCloseSocket()
+ * should be called so that the connection will be craeted from scratch
+ * the next time. */
+int migrateGetSocket(redisClient *c, robj *host, robj *port, long timeout) {
     int fd;
     int fd;
+    sds name = sdsempty();
+    migrateCachedSocket *cs;
+
+    /* Check if we have an already cached socket for this ip:port pair. */
+    name = sdscatlen(name,host->ptr,sdslen(host->ptr));
+    name = sdscatlen(name,":",1);
+    name = sdscatlen(name,port->ptr,sdslen(port->ptr));
+    cs = dictFetchValue(server.migrate_cached_sockets,name);
+    if (cs) {
+        sdsfree(name);
+        cs->last_use_time = server.unixtime;
+        return cs->fd;
+    }
+
+    /* No cached socket, create one. */
+    if (dictSize(server.migrate_cached_sockets) == MIGRATE_SOCKET_CACHE_ITEMS) {
+        /* Too many items, drop one at random. */
+        dictEntry *de = dictGetRandomKey(server.migrate_cached_sockets);
+        cs = dictGetVal(de);
+        close(cs->fd);
+        zfree(cs);
+        dictDelete(server.migrate_cached_sockets,dictGetKey(de));
+    }
+
+    /* Create the socket */
+    fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr,
+                atoi(c->argv[2]->ptr));
+    if (fd == -1) {
+        sdsfree(name);
+        addReplyErrorFormat(c,"Can't connect to target node: %s",
+            server.neterr);
+        return -1;
+    }
+    anetTcpNoDelay(server.neterr,fd);
+
+    /* Check if it connects within the specified timeout. */
+    if ((aeWait(fd,AE_WRITABLE,timeout) & AE_WRITABLE) == 0) {
+        sdsfree(name);
+        addReplySds(c,sdsnew("-IOERR error or timeout connecting to the client\r\n"));
+        close(fd);
+        return -1;
+    }
+
+    /* Add to the cache and return it to the caller. */
+    cs = zmalloc(sizeof(*cs));
+    cs->fd = fd;
+    cs->last_use_time = server.unixtime;
+    dictAdd(server.migrate_cached_sockets,name,cs);
+    return fd;
+}
+
+/* Free a migrate cached connection. */
+void migrateCloseSocket(robj *host, robj *port) {
+    sds name = sdsempty();
+    migrateCachedSocket *cs;
+
+    name = sdscatlen(name,host->ptr,sdslen(host->ptr));
+    name = sdscatlen(name,":",1);
+    name = sdscatlen(name,port->ptr,sdslen(port->ptr));
+    cs = dictFetchValue(server.migrate_cached_sockets,name);
+    if (!cs) {
+        sdsfree(name);
+        return;
+    }
+
+    close(cs->fd);
+    zfree(cs);
+    dictDelete(server.migrate_cached_sockets,name);
+    sdsfree(name);
+}
+
+void migrateCloseTimedoutSockets(void) {
+    dictIterator *di = dictGetSafeIterator(server.migrate_cached_sockets);
+    dictEntry *de;
+
+    while((de = dictNext(di)) != NULL) {
+        migrateCachedSocket *cs = dictGetVal(de);
+
+        if ((server.unixtime - cs->last_use_time) > MIGRATE_SOCKET_CACHE_TTL) {
+            close(cs->fd);
+            zfree(cs);
+            dictDelete(server.migrate_cached_sockets,dictGetKey(de));
+        }
+    }
+    dictReleaseIterator(di);
+}
+
+/* MIGRATE host port key dbid timeout [COPY | REPLACE] */
+void migrateCommand(redisClient *c) {
+    int fd, copy, replace, j;
     long timeout;
     long dbid;
     long timeout;
     long dbid;
-    time_t ttl;
+    long long ttl, expireat;
     robj *o;
     rio cmd, payload;
     robj *o;
     rio cmd, payload;
+    int retry_num = 0;
+
+try_again:
+    /* Initialization */
+    copy = 0;
+    replace = 0;
+    ttl = 0;
+
+    /* Parse additional options */
+    for (j = 6; j < c->argc; j++) {
+        if (!strcasecmp(c->argv[j]->ptr,"copy")) {
+            copy = 1;
+        } else if (!strcasecmp(c->argv[j]->ptr,"replace")) {
+            replace = 1;
+        } else {
+            addReply(c,shared.syntaxerr);
+            return;
+        }
+    }
 
     /* Sanity check */
     if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != REDIS_OK)
         return;
     if (getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != REDIS_OK)
         return;
 
     /* Sanity check */
     if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != REDIS_OK)
         return;
     if (getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != REDIS_OK)
         return;
-    if (timeout <= 0) timeout = 1;
+    if (timeout <= 0) timeout = 1000;
 
     /* Check if the key is here. If not we reply with success as there is
      * nothing to migrate (for instance the key expired in the meantime), but
 
     /* Check if the key is here. If not we reply with success as there is
      * nothing to migrate (for instance the key expired in the meantime), but
@@ -1608,17 +1783,8 @@ void migrateCommand(redisClient *c) {
     }
     
     /* Connect */
     }
     
     /* Connect */
-    fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr,
-                atoi(c->argv[2]->ptr));
-    if (fd == -1) {
-        addReplyErrorFormat(c,"Can't connect to target node: %s",
-            server.neterr);
-        return;
-    }
-    if ((aeWait(fd,AE_WRITABLE,timeout*1000) & AE_WRITABLE) == 0) {
-        addReplyError(c,"Timeout connecting to the client");
-        return;
-    }
+    fd = migrateGetSocket(c,c->argv[1],c->argv[2],timeout);
+    if (fd == -1) return; /* error sent to the client by migrateGetSocket() */
 
     /* Create RESTORE payload and generate the protocol to call the command. */
     rioInitWithBuffer(&cmd,sdsempty());
 
     /* Create RESTORE payload and generate the protocol to call the command. */
     rioInitWithBuffer(&cmd,sdsempty());
@@ -1626,21 +1792,31 @@ void migrateCommand(redisClient *c) {
     redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));
     redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));
 
     redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));
     redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));
 
-    ttl = getExpire(c->db,c->argv[3]);
-    redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',4));
+    expireat = getExpire(c->db,c->argv[3]);
+    if (expireat != -1) {
+        ttl = expireat-mstime();
+        if (ttl < 1) ttl = 1;
+    }
+    redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',replace ? 5 : 4));
     redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
     redisAssertWithInfo(c,NULL,c->argv[3]->encoding == REDIS_ENCODING_RAW);
     redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,c->argv[3]->ptr,sdslen(c->argv[3]->ptr)));
     redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
     redisAssertWithInfo(c,NULL,c->argv[3]->encoding == REDIS_ENCODING_RAW);
     redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,c->argv[3]->ptr,sdslen(c->argv[3]->ptr)));
-    redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,(ttl == -1) ? 0 : ttl));
+    redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl));
 
 
-    /* Finally the last argument that is the serailized object payload
-     * in the DUMP format. */
+    /* Emit the payload argument, that is the serailized object using
+     * the DUMP format. */
     createDumpPayload(&payload,o);
     redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,payload.io.buffer.ptr,
                                 sdslen(payload.io.buffer.ptr)));
     sdsfree(payload.io.buffer.ptr);
 
     createDumpPayload(&payload,o);
     redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,payload.io.buffer.ptr,
                                 sdslen(payload.io.buffer.ptr)));
     sdsfree(payload.io.buffer.ptr);
 
+    /* Add the REPLACE option to the RESTORE command if it was specified
+     * as a MIGRATE option. */
+    if (replace)
+        redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7));
+
     /* Tranfer the query to the other node in 64K chunks. */
     /* Tranfer the query to the other node in 64K chunks. */
+    errno = 0;
     {
         sds buf = cmd.io.buffer.ptr;
         size_t pos = 0, towrite;
     {
         sds buf = cmd.io.buffer.ptr;
         size_t pos = 0, towrite;
@@ -1648,7 +1824,7 @@ void migrateCommand(redisClient *c) {
 
         while ((towrite = sdslen(buf)-pos) > 0) {
             towrite = (towrite > (64*1024) ? (64*1024) : towrite);
 
         while ((towrite = sdslen(buf)-pos) > 0) {
             towrite = (towrite > (64*1024) ? (64*1024) : towrite);
-            nwritten = syncWrite(fd,buf+nwritten,towrite,timeout);
+            nwritten = syncWrite(fd,buf+pos,towrite,timeout);
             if (nwritten != (signed)towrite) goto socket_wr_err;
             pos += nwritten;
         }
             if (nwritten != (signed)towrite) goto socket_wr_err;
             pos += nwritten;
         }
@@ -1670,8 +1846,11 @@ void migrateCommand(redisClient *c) {
         } else {
             robj *aux;
 
         } else {
             robj *aux;
 
-            dbDelete(c->db,c->argv[3]);
-            signalModifiedKey(c->db,c->argv[3]);
+            if (!copy) {
+                /* No COPY option: remove the local key, signal the change. */
+                dbDelete(c->db,c->argv[3]);
+                signalModifiedKey(c->db,c->argv[3]);
+            }
             addReply(c,shared.ok);
             server.dirty++;
 
             addReply(c,shared.ok);
             server.dirty++;
 
@@ -1683,25 +1862,22 @@ void migrateCommand(redisClient *c) {
     }
 
     sdsfree(cmd.io.buffer.ptr);
     }
 
     sdsfree(cmd.io.buffer.ptr);
-    close(fd);
     return;
 
 socket_wr_err:
     return;
 
 socket_wr_err:
-    redisLog(REDIS_NOTICE,"Can't write to target node for MIGRATE: %s",
-        strerror(errno));
-    addReplyErrorFormat(c,"MIGRATE failed, writing to target node: %s.",
-        strerror(errno));
     sdsfree(cmd.io.buffer.ptr);
     sdsfree(cmd.io.buffer.ptr);
-    close(fd);
+    migrateCloseSocket(c->argv[1],c->argv[2]);
+    if (errno != ETIMEDOUT && retry_num++ == 0) goto try_again;
+    addReplySds(c,
+        sdsnew("-IOERR error or timeout writing to target instance\r\n"));
     return;
 
 socket_rd_err:
     return;
 
 socket_rd_err:
-    redisLog(REDIS_NOTICE,"Can't read from target node for MIGRATE: %s",
-        strerror(errno));
-    addReplyErrorFormat(c,"MIGRATE failed, reading from target node: %s.",
-        strerror(errno));
     sdsfree(cmd.io.buffer.ptr);
     sdsfree(cmd.io.buffer.ptr);
-    close(fd);
+    migrateCloseSocket(c->argv[1],c->argv[2]);
+    if (errno != ETIMEDOUT && retry_num++ == 0) goto try_again;
+    addReplySds(c,
+        sdsnew("-IOERR error or timeout reading from target node\r\n"));
     return;
 }
 
     return;
 }