server.dirty++;
}
+/* MIGRATE socket cache implementation.
+ *
+ * We take a map between host:ip and a TCP socket that we used to connect
+ * to this instance in recent time.
+ * This sockets are closed when the max number we cache is reached, and also
+ * in serverCron() when they are around for more than a few seconds. */
+#define MIGRATE_SOCKET_CACHE_ITEMS 64 /* max num of items in the cache. */
+#define MIGRATE_SOCKET_CACHE_TTL 10 /* close cached socekts after 10 sec. */
+
+typedef struct migrateCachedSocket {
+ int fd;
+ time_t last_use_time;
+} migrateCachedSocket;
+
+/* Return a TCP scoket connected with the target instance, possibly returning
+ * a cached one.
+ *
+ * This function is responsible of sending errors to the client if a
+ * connection can't be established. In this case -1 is returned.
+ * Otherwise on success the socket is returned, and the caller should not
+ * attempt to free it after usage.
+ *
+ * If the caller detects an error while using the socket, migrateCloseSocket()
+ * should be called so that the connection will be craeted from scratch
+ * the next time. */
+int migrateGetSocket(redisClient *c, robj *host, robj *port, long timeout) {
+ int fd;
+ sds name = sdsempty();
+ migrateCachedSocket *cs;
+
+ /* Check if we have an already cached socket for this ip:port pair. */
+ name = sdscatlen(name,host->ptr,sdslen(host->ptr));
+ name = sdscatlen(name,":",1);
+ name = sdscatlen(name,port->ptr,sdslen(port->ptr));
+ cs = dictFetchValue(server.migrate_cached_sockets,name);
+ if (cs) {
+ sdsfree(name);
+ cs->last_use_time = server.unixtime;
+ return cs->fd;
+ }
+
+ /* No cached socket, create one. */
+ if (dictSize(server.migrate_cached_sockets) == MIGRATE_SOCKET_CACHE_ITEMS) {
+ /* Too many items, drop one at random. */
+ dictEntry *de = dictGetRandomKey(server.migrate_cached_sockets);
+ cs = dictGetVal(de);
+ close(cs->fd);
+ zfree(cs);
+ dictDelete(server.migrate_cached_sockets,dictGetKey(de));
+ }
+
+ /* Create the socket */
+ fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr,
+ atoi(c->argv[2]->ptr));
+ if (fd == -1) {
+ sdsfree(name);
+ addReplyErrorFormat(c,"Can't connect to target node: %s",
+ server.neterr);
+ return -1;
+ }
+ anetTcpNoDelay(server.neterr,fd);
+
+ /* Check if it connects within the specified timeout. */
+ if ((aeWait(fd,AE_WRITABLE,timeout*1000) & AE_WRITABLE) == 0) {
+ sdsfree(name);
+ addReplySds(c,sdsnew("-IOERR error or timeout connecting to the client\r\n"));
+ close(fd);
+ return -1;
+ }
+
+ /* Add to the cache and return it to the caller. */
+ cs = zmalloc(sizeof(*cs));
+ cs->fd = fd;
+ cs->last_use_time = server.unixtime;
+ dictAdd(server.migrate_cached_sockets,name,cs);
+ return fd;
+}
+
+/* Free a migrate cached connection. */
+void migrateCloseSocket(robj *host, robj *port) {
+ sds name = sdsempty();
+ migrateCachedSocket *cs;
+
+ name = sdscatlen(name,host->ptr,sdslen(host->ptr));
+ name = sdscatlen(name,":",1);
+ name = sdscatlen(name,port->ptr,sdslen(port->ptr));
+ cs = dictFetchValue(server.migrate_cached_sockets,name);
+ if (!cs) {
+ sdsfree(name);
+ return;
+ }
+
+ close(cs->fd);
+ zfree(cs);
+ dictDelete(server.migrate_cached_sockets,name);
+ sdsfree(name);
+}
+
+void migrateCloseTimedoutSockets(void) {
+ dictIterator *di = dictGetSafeIterator(server.migrate_cached_sockets);
+ dictEntry *de;
+
+ while((de = dictNext(di)) != NULL) {
+ migrateCachedSocket *cs = dictGetVal(de);
+
+ if ((server.unixtime - cs->last_use_time) > MIGRATE_SOCKET_CACHE_TTL) {
+ close(cs->fd);
+ zfree(cs);
+ dictDelete(server.migrate_cached_sockets,dictGetKey(de));
+ }
+ }
+ dictReleaseIterator(di);
+}
+
/* MIGRATE host port key dbid timeout [COPY | REPLACE] */
void migrateCommand(redisClient *c) {
int fd, copy = 0, replace = 0, j;
}
/* Connect */
- fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr,
- atoi(c->argv[2]->ptr));
- if (fd == -1) {
- addReplyErrorFormat(c,"Can't connect to target node: %s",
- server.neterr);
- return;
- }
- if ((aeWait(fd,AE_WRITABLE,timeout*1000) & AE_WRITABLE) == 0) {
- addReplySds(c,sdsnew("-IOERR error or timeout connecting to the client\r\n"));
- return;
- }
+ fd = migrateGetSocket(c,c->argv[1],c->argv[2],timeout);
+ if (fd == -1) return; /* error sent to the client by migrateGetSocket() */
/* Create RESTORE payload and generate the protocol to call the command. */
rioInitWithBuffer(&cmd,sdsempty());
}
sdsfree(cmd.io.buffer.ptr);
- close(fd);
return;
socket_wr_err:
addReplySds(c,sdsnew("-IOERR error or timeout writing to target instance\r\n"));
sdsfree(cmd.io.buffer.ptr);
- close(fd);
+ migrateCloseSocket(c->argv[1],c->argv[2]);
return;
socket_rd_err:
addReplySds(c,sdsnew("-IOERR error or timeout reading from target node\r\n"));
sdsfree(cmd.io.buffer.ptr);
- close(fd);
+ migrateCloseSocket(c->argv[1],c->argv[2]);
return;
}
NULL /* val destructor */
};
+/* Migrate cache dict type. */
+dictType migrateCacheDictType = {
+ dictSdsHash, /* hash function */
+ NULL, /* key dup */
+ NULL, /* val dup */
+ dictSdsKeyCompare, /* key compare */
+ dictSdsDestructor, /* key destructor */
+ NULL /* val destructor */
+};
+
int htNeedsResize(dict *dict) {
long long size, used;
* to detect transfer failures. */
run_with_period(1000) replicationCron();
- /* Run other sub-systems specific cron jobs */
+ /* Run the Redis Cluster cron. */
run_with_period(1000) {
if (server.cluster_enabled) clusterCron();
}
- /* Run the sentinel timer if we are in sentinel mode. */
+ /* Run the Sentinel timer if we are in sentinel mode. */
run_with_period(100) {
if (server.sentinel_mode) sentinelTimer();
}
+ /* Cleanup expired MIGRATE cached sockets. */
+ run_with_period(1000) {
+ migrateCloseTimedoutSockets();
+ }
+
server.cronloops++;
return 1000/REDIS_HZ;
}
server.lua_time_limit = REDIS_LUA_TIME_LIMIT;
server.lua_client = NULL;
server.lua_timedout = 0;
+ server.migrate_cached_sockets = dictCreate(&migrateCacheDictType,NULL);
updateLRUClock();
resetServerSaveParams();