+/* Redis Cluster implementation.
+ *
+ * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of Redis nor the names of its contributors may be used
+ * to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
#include "redis.h"
#include "endianconv.h"
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
+#include <sys/socket.h>
void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask);
exit(1);
}
if (aeCreateFileEvent(server.el, server.cfd, AE_READABLE,
- clusterAcceptHandler, NULL) == AE_ERR) oom("creating file event");
+ clusterAcceptHandler, NULL) == AE_ERR) redisPanic("Unrecoverable error creating Redis Cluster file event.");
server.cluster.slots_to_keys = zslCreate();
}
} else {
payload = zmalloc(totlen);
hdr = (clusterMsg*) payload;
- memcpy(payload,hdr,sizeof(hdr));
+ memcpy(payload,hdr,sizeof(*hdr));
}
memcpy(hdr->data.publish.msg.bulk_data,channel->ptr,sdslen(channel->ptr));
memcpy(hdr->data.publish.msg.bulk_data+sdslen(channel->ptr),
return;
}
-/* RESTORE key ttl serialized-value */
+/* RESTORE key ttl serialized-value [REPLACE] */
void restoreCommand(redisClient *c) {
long ttl;
rio payload;
- int type;
+ int j, type, replace = 0;
robj *obj;
+ /* Parse additional options */
+ for (j = 4; j < c->argc; j++) {
+ if (!strcasecmp(c->argv[j]->ptr,"replace")) {
+ replace = 1;
+ } else {
+ addReply(c,shared.syntaxerr);
+ return;
+ }
+ }
+
/* Make sure this key does not already exist here... */
- if (lookupKeyWrite(c->db,c->argv[1]) != NULL) {
+ if (!replace && lookupKeyWrite(c->db,c->argv[1]) != NULL) {
addReplyError(c,"Target key name is busy.");
return;
}
return;
}
+ /* Remove the old key if needed. */
+ if (replace) dbDelete(c->db,c->argv[1]);
+
/* Create the key and set the TTL if any */
dbAdd(c->db,c->argv[1],obj);
if (ttl) setExpire(c->db,c->argv[1],mstime()+ttl);
server.dirty++;
}
-/* MIGRATE host port key dbid timeout */
-void migrateCommand(redisClient *c) {
+/* MIGRATE socket cache implementation.
+ *
+ * We keep a map between host:port and a TCP socket that we used to connect
+ * to this instance in recent time.
+ * These sockets are closed when the max number we cache is reached, and also
+ * in serverCron() when they are around for more than a few seconds. */
+#define MIGRATE_SOCKET_CACHE_ITEMS 64 /* max num of items in the cache. */
+#define MIGRATE_SOCKET_CACHE_TTL 10 /* close cached sockets after 10 sec. */
+
+/* One entry of the MIGRATE socket cache: the file descriptor of the cached
+ * socket plus the time it was last handed out, which is what the TTL check
+ * compares against. */
+typedef struct migrateCachedSocket {
int fd;
+ time_t last_use_time; /* set from server.unixtime on creation and reuse. */
+} migrateCachedSocket;
+
+/* Return a TCP socket connected with the target instance, possibly returning
+ * a cached one.
+ *
+ * c       - client issuing the command; it receives the error reply when
+ *           the connection can't be established.
+ * host    - target host; used both to build the cache key and to connect.
+ * port    - target port as a string object; used both to build the cache
+ *           key and (converted with atoi()) to connect.
+ * timeout - passed unchanged to aeWait() as the maximum time to wait for
+ *           the new socket to become writable.
+ *
+ * This function is responsible of sending errors to the client if a
+ * connection can't be established. In this case -1 is returned.
+ * Otherwise on success the socket is returned, and the caller should not
+ * attempt to free it after usage.
+ *
+ * If the caller detects an error while using the socket, migrateCloseSocket()
+ * should be called so that the connection will be created from scratch
+ * the next time. */
+int migrateGetSocket(redisClient *c, robj *host, robj *port, long timeout) {
+    int fd;
+    sds name = sdsempty();
+    migrateCachedSocket *cs;
+
+    /* Check if we have an already cached socket for this host:port pair. */
+    name = sdscatlen(name,host->ptr,sdslen(host->ptr));
+    name = sdscatlen(name,":",1);
+    name = sdscatlen(name,port->ptr,sdslen(port->ptr));
+    cs = dictFetchValue(server.migrate_cached_sockets,name);
+    if (cs) {
+        /* Cache hit: the lookup key is no longer needed. Refresh the last
+         * use time so the entry is not considered idle. */
+        sdsfree(name);
+        cs->last_use_time = server.unixtime;
+        return cs->fd;
+    }
+
+    /* No cached socket, create one. */
+    if (dictSize(server.migrate_cached_sockets) == MIGRATE_SOCKET_CACHE_ITEMS) {
+        /* Too many items, drop one at random. */
+        dictEntry *de = dictGetRandomKey(server.migrate_cached_sockets);
+        cs = dictGetVal(de);
+        close(cs->fd);
+        zfree(cs);
+        dictDelete(server.migrate_cached_sockets,dictGetKey(de));
+    }
+
+    /* Create the socket. Connect to the host/port we were asked for (and
+     * not to whatever the client argument vector holds), so that the socket
+     * always matches the key it is going to be cached under. */
+    fd = anetTcpNonBlockConnect(server.neterr,host->ptr,atoi(port->ptr));
+    if (fd == -1) {
+        sdsfree(name);
+        addReplyErrorFormat(c,"Can't connect to target node: %s",
+            server.neterr);
+        return -1;
+    }
+    anetTcpNoDelay(server.neterr,fd);
+
+    /* Check if it connects within the specified timeout. */
+    if ((aeWait(fd,AE_WRITABLE,timeout) & AE_WRITABLE) == 0) {
+        sdsfree(name);
+        addReplySds(c,sdsnew("-IOERR error or timeout connecting to the client\r\n"));
+        close(fd);
+        return -1;
+    }
+
+    /* Add to the cache and return it to the caller. The name string is not
+     * freed here because it is stored as the key of the new entry. */
+    cs = zmalloc(sizeof(*cs));
+    cs->fd = fd;
+    cs->last_use_time = server.unixtime;
+    dictAdd(server.migrate_cached_sockets,name,cs);
+    return fd;
+}
+
+/* Close and forget the cached MIGRATE connection for the given host and
+ * port, if there is one. When no socket is cached under that host:port key
+ * the function does nothing. */
+void migrateCloseSocket(robj *host, robj *port) {
+    migrateCachedSocket *cached;
+    sds key = sdsempty();
+
+    /* Build the same "host:port" key used when the socket was cached. */
+    key = sdscatlen(key,host->ptr,sdslen(host->ptr));
+    key = sdscatlen(key,":",1);
+    key = sdscatlen(key,port->ptr,sdslen(port->ptr));
+
+    cached = dictFetchValue(server.migrate_cached_sockets,key);
+    if (cached != NULL) {
+        /* Release the descriptor and the entry value, then drop the entry
+         * from the cache. */
+        close(cached->fd);
+        zfree(cached);
+        dictDelete(server.migrate_cached_sockets,key);
+    }
+
+    /* The lookup key is a temporary in both cases. */
+    sdsfree(key);
+}
+
+/* Scan the MIGRATE socket cache and close every socket whose last use is
+ * more than MIGRATE_SOCKET_CACHE_TTL seconds older than server.unixtime.
+ * A safe iterator is used because entries are deleted while iterating. */
+void migrateCloseTimedoutSockets(void) {
+    dictIterator *iter = dictGetSafeIterator(server.migrate_cached_sockets);
+    dictEntry *entry;
+
+    for (entry = dictNext(iter); entry != NULL; entry = dictNext(iter)) {
+        migrateCachedSocket *cached = dictGetVal(entry);
+
+        /* Still within the TTL: keep it. */
+        if ((server.unixtime - cached->last_use_time) <=
+            MIGRATE_SOCKET_CACHE_TTL) continue;
+
+        close(cached->fd);
+        zfree(cached);
+        dictDelete(server.migrate_cached_sockets,dictGetKey(entry));
+    }
+    dictReleaseIterator(iter);
+}
+
+/* MIGRATE host port key dbid timeout [COPY | REPLACE] */
+void migrateCommand(redisClient *c) {
+ int fd, copy, replace, j;
long timeout;
long dbid;
- long long ttl = 0, expireat;
+ long long ttl, expireat;
robj *o;
rio cmd, payload;
+ int retry_num = 0;
+
+try_again:
+ /* Initialization */
+ copy = 0;
+ replace = 0;
+ ttl = 0;
+
+ /* Parse additional options */
+ for (j = 6; j < c->argc; j++) {
+ if (!strcasecmp(c->argv[j]->ptr,"copy")) {
+ copy = 1;
+ } else if (!strcasecmp(c->argv[j]->ptr,"replace")) {
+ replace = 1;
+ } else {
+ addReply(c,shared.syntaxerr);
+ return;
+ }
+ }
/* Sanity check */
if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != REDIS_OK)
return;
if (getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != REDIS_OK)
return;
- if (timeout <= 0) timeout = 1;
+ if (timeout <= 0) timeout = 1000;
/* Check if the key is here. If not we reply with success as there is
* nothing to migrate (for instance the key expired in the meantime), but
}
/* Connect */
- fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr,
- atoi(c->argv[2]->ptr));
- if (fd == -1) {
- addReplyErrorFormat(c,"Can't connect to target node: %s",
- server.neterr);
- return;
- }
- if ((aeWait(fd,AE_WRITABLE,timeout*1000) & AE_WRITABLE) == 0) {
- addReplySds(c,sdsnew("-IOERR error or timeout connecting to the client\r\n"));
- return;
- }
+ fd = migrateGetSocket(c,c->argv[1],c->argv[2],timeout);
+ if (fd == -1) return; /* error sent to the client by migrateGetSocket() */
/* Create RESTORE payload and generate the protocol to call the command. */
rioInitWithBuffer(&cmd,sdsempty());
ttl = expireat-mstime();
if (ttl < 1) ttl = 1;
}
- redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',4));
+ redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',replace ? 5 : 4));
redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
redisAssertWithInfo(c,NULL,c->argv[3]->encoding == REDIS_ENCODING_RAW);
redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,c->argv[3]->ptr,sdslen(c->argv[3]->ptr)));
redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl));
- /* Finally the last argument that is the serailized object payload
- * in the DUMP format. */
+ /* Emit the payload argument, that is the serialized object using
+ * the DUMP format. */
createDumpPayload(&payload,o);
redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,payload.io.buffer.ptr,
sdslen(payload.io.buffer.ptr)));
sdsfree(payload.io.buffer.ptr);
+ /* Add the REPLACE option to the RESTORE command if it was specified
+ * as a MIGRATE option. */
+ if (replace)
+ redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7));
+
/* Tranfer the query to the other node in 64K chunks. */
+ errno = 0;
{
sds buf = cmd.io.buffer.ptr;
size_t pos = 0, towrite;
} else {
robj *aux;
- dbDelete(c->db,c->argv[3]);
- signalModifiedKey(c->db,c->argv[3]);
+ if (!copy) {
+ /* No COPY option: remove the local key, signal the change. */
+ dbDelete(c->db,c->argv[3]);
+ signalModifiedKey(c->db,c->argv[3]);
+ }
addReply(c,shared.ok);
server.dirty++;
}
sdsfree(cmd.io.buffer.ptr);
- close(fd);
return;
socket_wr_err:
- addReplySds(c,sdsnew("-IOERR error or timeout writing to target instance\r\n"));
sdsfree(cmd.io.buffer.ptr);
- close(fd);
+ migrateCloseSocket(c->argv[1],c->argv[2]);
+ if (errno != ETIMEDOUT && retry_num++ == 0) goto try_again;
+ addReplySds(c,
+ sdsnew("-IOERR error or timeout writing to target instance\r\n"));
return;
socket_rd_err:
- addReplySds(c,sdsnew("-IOERR error or timeout reading from target node\r\n"));
sdsfree(cmd.io.buffer.ptr);
- close(fd);
+ migrateCloseSocket(c->argv[1],c->argv[2]);
+ if (errno != ETIMEDOUT && retry_num++ == 0) goto try_again;
+ addReplySds(c,
+ sdsnew("-IOERR error or timeout reading from target node\r\n"));
return;
}