From e23d281e489c3cda134803b2e6302d1fc27e5948 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 12 Nov 2012 00:45:10 +0100 Subject: [PATCH] MIGRATE TCP connections caching. By caching TCP connections used by MIGRATE to chat with other Redis instances a 5x performance improvement was measured with redis-benchmark against small keys. This can dramatically speedup cluster resharding and other processes where an high load of MIGRATE commands are used. --- src/cluster.c | 132 ++++++++++++++++++++++++++++++++++++++++++++------ src/redis.c | 20 +++++++- src/redis.h | 4 +- 3 files changed, 139 insertions(+), 17 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index 49871578..3b560cb0 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1625,6 +1625,120 @@ void restoreCommand(redisClient *c) { server.dirty++; } +/* MIGRATE socket cache implementation. + * + * We take a map between host:ip and a TCP socket that we used to connect + * to this instance in recent time. + * This sockets are closed when the max number we cache is reached, and also + * in serverCron() when they are around for more than a few seconds. */ +#define MIGRATE_SOCKET_CACHE_ITEMS 64 /* max num of items in the cache. */ +#define MIGRATE_SOCKET_CACHE_TTL 10 /* close cached socekts after 10 sec. */ + +typedef struct migrateCachedSocket { + int fd; + time_t last_use_time; +} migrateCachedSocket; + +/* Return a TCP scoket connected with the target instance, possibly returning + * a cached one. + * + * This function is responsible of sending errors to the client if a + * connection can't be established. In this case -1 is returned. + * Otherwise on success the socket is returned, and the caller should not + * attempt to free it after usage. + * + * If the caller detects an error while using the socket, migrateCloseSocket() + * should be called so that the connection will be craeted from scratch + * the next time. */ +int migrateGetSocket(redisClient *c, robj *host, robj *port, long timeout) { + int fd; + sds name = sdsempty(); + migrateCachedSocket *cs; + + /* Check if we have an already cached socket for this ip:port pair. */ + name = sdscatlen(name,host->ptr,sdslen(host->ptr)); + name = sdscatlen(name,":",1); + name = sdscatlen(name,port->ptr,sdslen(port->ptr)); + cs = dictFetchValue(server.migrate_cached_sockets,name); + if (cs) { + sdsfree(name); + cs->last_use_time = server.unixtime; + return cs->fd; + } + + /* No cached socket, create one. */ + if (dictSize(server.migrate_cached_sockets) == MIGRATE_SOCKET_CACHE_ITEMS) { + /* Too many items, drop one at random. */ + dictEntry *de = dictGetRandomKey(server.migrate_cached_sockets); + cs = dictGetVal(de); + close(cs->fd); + zfree(cs); + dictDelete(server.migrate_cached_sockets,dictGetKey(de)); + } + + /* Create the socket */ + fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr, + atoi(c->argv[2]->ptr)); + if (fd == -1) { + sdsfree(name); + addReplyErrorFormat(c,"Can't connect to target node: %s", + server.neterr); + return -1; + } + anetTcpNoDelay(server.neterr,fd); + + /* Check if it connects within the specified timeout. */ + if ((aeWait(fd,AE_WRITABLE,timeout*1000) & AE_WRITABLE) == 0) { + sdsfree(name); + addReplySds(c,sdsnew("-IOERR error or timeout connecting to the client\r\n")); + close(fd); + return -1; + } + + /* Add to the cache and return it to the caller. */ + cs = zmalloc(sizeof(*cs)); + cs->fd = fd; + cs->last_use_time = server.unixtime; + dictAdd(server.migrate_cached_sockets,name,cs); + return fd; +} + +/* Free a migrate cached connection. */ +void migrateCloseSocket(robj *host, robj *port) { + sds name = sdsempty(); + migrateCachedSocket *cs; + + name = sdscatlen(name,host->ptr,sdslen(host->ptr)); + name = sdscatlen(name,":",1); + name = sdscatlen(name,port->ptr,sdslen(port->ptr)); + cs = dictFetchValue(server.migrate_cached_sockets,name); + if (!cs) { + sdsfree(name); + return; + } + + close(cs->fd); + zfree(cs); + dictDelete(server.migrate_cached_sockets,name); + sdsfree(name); +} + +void migrateCloseTimedoutSockets(void) { + dictIterator *di = dictGetSafeIterator(server.migrate_cached_sockets); + dictEntry *de; + + while((de = dictNext(di)) != NULL) { + migrateCachedSocket *cs = dictGetVal(de); + + if ((server.unixtime - cs->last_use_time) > MIGRATE_SOCKET_CACHE_TTL) { + close(cs->fd); + zfree(cs); + dictDelete(server.migrate_cached_sockets,dictGetKey(de)); + } + } + dictReleaseIterator(di); +} + /* MIGRATE host port key dbid timeout [COPY | REPLACE] */ void migrateCommand(redisClient *c) { int fd, copy = 0, replace = 0, j; @@ -1662,17 +1776,8 @@ void migrateCommand(redisClient *c) { } /* Connect */ - fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr, - atoi(c->argv[2]->ptr)); - if (fd == -1) { - addReplyErrorFormat(c,"Can't connect to target node: %s", - server.neterr); - return; - } - if ((aeWait(fd,AE_WRITABLE,timeout*1000) & AE_WRITABLE) == 0) { - addReplySds(c,sdsnew("-IOERR error or timeout connecting to the client\r\n")); - return; - } + fd = migrateGetSocket(c,c->argv[1],c->argv[2],timeout); + if (fd == -1) return; /* error sent to the client by migrateGetSocket() */ /* Create RESTORE payload and generate the protocol to call the command. */ rioInitWithBuffer(&cmd,sdsempty()); @@ -1749,19 +1854,18 @@ void migrateCommand(redisClient *c) { } sdsfree(cmd.io.buffer.ptr); - close(fd); return; socket_wr_err: addReplySds(c,sdsnew("-IOERR error or timeout writing to target instance\r\n")); sdsfree(cmd.io.buffer.ptr); - close(fd); + migrateCloseSocket(c->argv[1],c->argv[2]); return; socket_rd_err: addReplySds(c,sdsnew("-IOERR error or timeout reading from target node\r\n")); sdsfree(cmd.io.buffer.ptr); - close(fd); + migrateCloseSocket(c->argv[1],c->argv[2]); return; } diff --git a/src/redis.c b/src/redis.c index 1e90edef..f0f3b6c0 100644 --- a/src/redis.c +++ b/src/redis.c @@ -561,6 +561,16 @@ dictType clusterNodesDictType = { NULL /* val destructor */ }; +/* Migrate cache dict type. */ +dictType migrateCacheDictType = { + dictSdsHash, /* hash function */ + NULL, /* key dup */ + NULL, /* val dup */ + dictSdsKeyCompare, /* key compare */ + dictSdsDestructor, /* key destructor */ + NULL /* val destructor */ +}; + int htNeedsResize(dict *dict) { long long size, used; @@ -972,16 +982,21 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { * to detect transfer failures. */ run_with_period(1000) replicationCron(); - /* Run other sub-systems specific cron jobs */ + /* Run the Redis Cluster cron. */ run_with_period(1000) { if (server.cluster_enabled) clusterCron(); } - /* Run the sentinel timer if we are in sentinel mode. */ + /* Run the Sentinel timer if we are in sentinel mode. */ run_with_period(100) { if (server.sentinel_mode) sentinelTimer(); } + /* Cleanup expired MIGRATE cached sockets. */ + run_with_period(1000) { + migrateCloseTimedoutSockets(); + } + server.cronloops++; return 1000/REDIS_HZ; } @@ -1149,6 +1164,7 @@ void initServerConfig() { server.lua_time_limit = REDIS_LUA_TIME_LIMIT; server.lua_client = NULL; server.lua_timedout = 0; + server.migrate_cached_sockets = dictCreate(&migrateCacheDictType,NULL); updateLRUClock(); resetServerSaveParams(); diff --git a/src/redis.h b/src/redis.h index 6b3d58e5..44ff03b6 100644 --- a/src/redis.h +++ b/src/redis.h @@ -646,7 +646,8 @@ struct redisServer { list *clients_to_close; /* Clients to close asynchronously */ list *slaves, *monitors; /* List of slaves and MONITORs */ redisClient *current_client; /* Current client, only used on crash report */ - char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ + char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ + dict *migrate_cached_sockets;/* MIGRATE cached sockets */ /* RDB / AOF loading information */ int loading; /* We are loading data from disk if true */ off_t loading_total_bytes; @@ -1174,6 +1175,7 @@ int clusterAddNode(clusterNode *node); void clusterCron(void); clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *ask); void clusterPropagatePublish(robj *channel, robj *message); +void migrateCloseTimedoutSockets(void); /* Sentinel */ void initSentinelConfig(void); -- 2.47.2