]> git.saurik.com Git - redis.git/commitdiff
MIGRATE TCP connections caching.
authorantirez <antirez@gmail.com>
Sun, 11 Nov 2012 23:45:10 +0000 (00:45 +0100)
committerantirez <antirez@gmail.com>
Sun, 11 Nov 2012 23:47:24 +0000 (00:47 +0100)
By caching TCP connections used by MIGRATE to chat with other Redis
instances a 5x performance improvement was measured with
redis-benchmark against small keys.

This can dramatically speedup cluster resharding and other processes
where an high load of MIGRATE commands are used.

src/cluster.c
src/redis.c
src/redis.h

index 498715782f2a81cbf03992c229dcc1973315b2e9..3b560cb0da0302fddffb7c8a432e66c0686b36cb 100644 (file)
@@ -1625,6 +1625,120 @@ void restoreCommand(redisClient *c) {
     server.dirty++;
 }
 
+/* MIGRATE socket cache implementation.
+ *
+ * We take a map between host:ip and a TCP socket that we used to connect
+ * to this instance in recent time.
+ * This sockets are closed when the max number we cache is reached, and also
+ * in serverCron() when they are around for more than a few seconds. */
+#define MIGRATE_SOCKET_CACHE_ITEMS 64 /* max num of items in the cache. */
+#define MIGRATE_SOCKET_CACHE_TTL 10 /* close cached socekts after 10 sec. */
+
+typedef struct migrateCachedSocket {
+    int fd;
+    time_t last_use_time;
+} migrateCachedSocket;
+
+/* Return a TCP scoket connected with the target instance, possibly returning
+ * a cached one.
+ *
+ * This function is responsible of sending errors to the client if a
+ * connection can't be established. In this case -1 is returned.
+ * Otherwise on success the socket is returned, and the caller should not
+ * attempt to free it after usage.
+ *
+ * If the caller detects an error while using the socket, migrateCloseSocket()
+ * should be called so that the connection will be craeted from scratch
+ * the next time. */
+int migrateGetSocket(redisClient *c, robj *host, robj *port, long timeout) {
+    int fd;
+    sds name = sdsempty();
+    migrateCachedSocket *cs;
+
+    /* Check if we have an already cached socket for this ip:port pair. */
+    name = sdscatlen(name,host->ptr,sdslen(host->ptr));
+    name = sdscatlen(name,":",1);
+    name = sdscatlen(name,port->ptr,sdslen(port->ptr));
+    cs = dictFetchValue(server.migrate_cached_sockets,name);
+    if (cs) {
+        sdsfree(name);
+        cs->last_use_time = server.unixtime;
+        return cs->fd;
+    }
+
+    /* No cached socket, create one. */
+    if (dictSize(server.migrate_cached_sockets) == MIGRATE_SOCKET_CACHE_ITEMS) {
+        /* Too many items, drop one at random. */
+        dictEntry *de = dictGetRandomKey(server.migrate_cached_sockets);
+        cs = dictGetVal(de);
+        close(cs->fd);
+        zfree(cs);
+        dictDelete(server.migrate_cached_sockets,dictGetKey(de));
+    }
+
+    /* Create the socket */
+    fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr,
+                atoi(c->argv[2]->ptr));
+    if (fd == -1) {
+        sdsfree(name);
+        addReplyErrorFormat(c,"Can't connect to target node: %s",
+            server.neterr);
+        return -1;
+    }
+    anetTcpNoDelay(server.neterr,fd);
+
+    /* Check if it connects within the specified timeout. */
+    if ((aeWait(fd,AE_WRITABLE,timeout*1000) & AE_WRITABLE) == 0) {
+        sdsfree(name);
+        addReplySds(c,sdsnew("-IOERR error or timeout connecting to the client\r\n"));
+        close(fd);
+        return -1;
+    }
+
+    /* Add to the cache and return it to the caller. */
+    cs = zmalloc(sizeof(*cs));
+    cs->fd = fd;
+    cs->last_use_time = server.unixtime;
+    dictAdd(server.migrate_cached_sockets,name,cs);
+    return fd;
+}
+
+/* Free a migrate cached connection. */
+void migrateCloseSocket(robj *host, robj *port) {
+    sds name = sdsempty();
+    migrateCachedSocket *cs;
+
+    name = sdscatlen(name,host->ptr,sdslen(host->ptr));
+    name = sdscatlen(name,":",1);
+    name = sdscatlen(name,port->ptr,sdslen(port->ptr));
+    cs = dictFetchValue(server.migrate_cached_sockets,name);
+    if (!cs) {
+        sdsfree(name);
+        return;
+    }
+
+    close(cs->fd);
+    zfree(cs);
+    dictDelete(server.migrate_cached_sockets,name);
+    sdsfree(name);
+}
+
+void migrateCloseTimedoutSockets(void) {
+    dictIterator *di = dictGetSafeIterator(server.migrate_cached_sockets);
+    dictEntry *de;
+
+    while((de = dictNext(di)) != NULL) {
+        migrateCachedSocket *cs = dictGetVal(de);
+
+        if ((server.unixtime - cs->last_use_time) > MIGRATE_SOCKET_CACHE_TTL) {
+            close(cs->fd);
+            zfree(cs);
+            dictDelete(server.migrate_cached_sockets,dictGetKey(de));
+        }
+    }
+    dictReleaseIterator(di);
+}
+
 /* MIGRATE host port key dbid timeout [COPY | REPLACE] */
 void migrateCommand(redisClient *c) {
     int fd, copy = 0, replace = 0, j;
@@ -1662,17 +1776,8 @@ void migrateCommand(redisClient *c) {
     }
     
     /* Connect */
-    fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr,
-                atoi(c->argv[2]->ptr));
-    if (fd == -1) {
-        addReplyErrorFormat(c,"Can't connect to target node: %s",
-            server.neterr);
-        return;
-    }
-    if ((aeWait(fd,AE_WRITABLE,timeout*1000) & AE_WRITABLE) == 0) {
-        addReplySds(c,sdsnew("-IOERR error or timeout connecting to the client\r\n"));
-        return;
-    }
+    fd = migrateGetSocket(c,c->argv[1],c->argv[2],timeout);
+    if (fd == -1) return; /* error sent to the client by migrateGetSocket() */
 
     /* Create RESTORE payload and generate the protocol to call the command. */
     rioInitWithBuffer(&cmd,sdsempty());
@@ -1749,19 +1854,18 @@ void migrateCommand(redisClient *c) {
     }
 
     sdsfree(cmd.io.buffer.ptr);
-    close(fd);
     return;
 
 socket_wr_err:
     addReplySds(c,sdsnew("-IOERR error or timeout writing to target instance\r\n"));
     sdsfree(cmd.io.buffer.ptr);
-    close(fd);
+    migrateCloseSocket(c->argv[1],c->argv[2]);
     return;
 
 socket_rd_err:
     addReplySds(c,sdsnew("-IOERR error or timeout reading from target node\r\n"));
     sdsfree(cmd.io.buffer.ptr);
-    close(fd);
+    migrateCloseSocket(c->argv[1],c->argv[2]);
     return;
 }
 
index 1e90edef688c9f3c4c8c284e82a2984a662b2a34..f0f3b6c0a0d1ba0f5be548e8cb666de304f219cd 100644 (file)
@@ -561,6 +561,16 @@ dictType clusterNodesDictType = {
     NULL                        /* val destructor */
 };
 
+/* Migrate cache dict type. */
+dictType migrateCacheDictType = {
+    dictSdsHash,                /* hash function */
+    NULL,                       /* key dup */
+    NULL,                       /* val dup */
+    dictSdsKeyCompare,          /* key compare */
+    dictSdsDestructor,          /* key destructor */
+    NULL                        /* val destructor */
+};
+
 int htNeedsResize(dict *dict) {
     long long size, used;
 
@@ -972,16 +982,21 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
      * to detect transfer failures. */
     run_with_period(1000) replicationCron();
 
-    /* Run other sub-systems specific cron jobs */
+    /* Run the Redis Cluster cron. */
     run_with_period(1000) {
         if (server.cluster_enabled) clusterCron();
     }
 
-    /* Run the sentinel timer if we are in sentinel mode. */
+    /* Run the Sentinel timer if we are in sentinel mode. */
     run_with_period(100) {
         if (server.sentinel_mode) sentinelTimer();
     }
 
+    /* Cleanup expired MIGRATE cached sockets. */
+    run_with_period(1000) {
+        migrateCloseTimedoutSockets();
+    }
+
     server.cronloops++;
     return 1000/REDIS_HZ;
 }
@@ -1149,6 +1164,7 @@ void initServerConfig() {
     server.lua_time_limit = REDIS_LUA_TIME_LIMIT;
     server.lua_client = NULL;
     server.lua_timedout = 0;
+    server.migrate_cached_sockets = dictCreate(&migrateCacheDictType,NULL);
 
     updateLRUClock();
     resetServerSaveParams();
index 6b3d58e534ec4958eb5c2b7487e6e1b14379ce66..44ff03b67f0e5c3735369583b63466129c6246ec 100644 (file)
@@ -646,7 +646,8 @@ struct redisServer {
     list *clients_to_close;     /* Clients to close asynchronously */
     list *slaves, *monitors;    /* List of slaves and MONITORs */
     redisClient *current_client; /* Current client, only used on crash report */
-    char neterr[ANET_ERR_LEN];  /* Error buffer for anet.c */
+    char neterr[ANET_ERR_LEN];   /* Error buffer for anet.c */
+    dict *migrate_cached_sockets;/* MIGRATE cached sockets */
     /* RDB / AOF loading information */
     int loading;                /* We are loading data from disk if true */
     off_t loading_total_bytes;
@@ -1174,6 +1175,7 @@ int clusterAddNode(clusterNode *node);
 void clusterCron(void);
 clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *ask);
 void clusterPropagatePublish(robj *channel, robj *message);
+void migrateCloseTimedoutSockets(void);
 
 /* Sentinel */
 void initSentinelConfig(void);