X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/0d83011a11b42fe1b0d1bebb7b9a3318fd44f0e5..af0b220756571bc8faf57a0c7b7389dd86a60376:/src/cluster.c diff --git a/src/cluster.c b/src/cluster.c index 93f095c3..cbcdf373 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1,9 +1,40 @@ +/* Redis Cluster implementation. + * + * Copyright (c) 2009-2012, Salvatore Sanfilippo + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. 
+ */ + #include "redis.h" #include "endianconv.h" #include #include #include +#include void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask); void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask); @@ -209,7 +240,7 @@ void clusterInit(void) { exit(1); } if (aeCreateFileEvent(server.el, server.cfd, AE_READABLE, - clusterAcceptHandler, NULL) == AE_ERR) oom("creating file event"); + clusterAcceptHandler, NULL) == AE_ERR) redisPanic("Unrecoverable error creating Redis Cluster file event."); server.cluster.slots_to_keys = zslCreate(); } @@ -901,7 +932,7 @@ void clusterSendPublish(clusterLink *link, robj *channel, robj *message) { } else { payload = zmalloc(totlen); hdr = (clusterMsg*) payload; - memcpy(payload,hdr,sizeof(hdr)); + memcpy(payload,hdr,sizeof(*hdr)); } memcpy(hdr->data.publish.msg.bulk_data,channel->ptr,sdslen(channel->ptr)); memcpy(hdr->data.publish.msg.bulk_data+sdslen(channel->ptr), @@ -1538,15 +1569,25 @@ void dumpCommand(redisClient *c) { return; } -/* RESTORE key ttl serialized-value */ +/* RESTORE key ttl serialized-value [REPLACE] */ void restoreCommand(redisClient *c) { long ttl; rio payload; - int type; + int j, type, replace = 0; robj *obj; + /* Parse additional options */ + for (j = 4; j < c->argc; j++) { + if (!strcasecmp(c->argv[j]->ptr,"replace")) { + replace = 1; + } else { + addReply(c,shared.syntaxerr); + return; + } + } + /* Make sure this key does not already exist here... */ - if (lookupKeyWrite(c->db,c->argv[1]) != NULL) { + if (!replace && lookupKeyWrite(c->db,c->argv[1]) != NULL) { addReplyError(c,"Target key name is busy."); return; } @@ -1573,6 +1614,9 @@ void restoreCommand(redisClient *c) { return; } + /* Remove the old key if needed. 
*/ + if (replace) dbDelete(c->db,c->argv[1]); + /* Create the key and set the TTL if any */ dbAdd(c->db,c->argv[1],obj); if (ttl) setExpire(c->db,c->argv[1],mstime()+ttl); @@ -1581,21 +1625,154 @@ void restoreCommand(redisClient *c) { server.dirty++; } -/* MIGRATE host port key dbid timeout */ -void migrateCommand(redisClient *c) { +/* MIGRATE socket cache implementation. + * + * We keep a map between host:port and the TCP socket that we recently used + * to connect to that instance. + * These sockets are closed when the max number we cache is reached, and also + * in serverCron() when they are around for more than a few seconds. */ +#define MIGRATE_SOCKET_CACHE_ITEMS 64 /* max num of items in the cache. */ +#define MIGRATE_SOCKET_CACHE_TTL 10 /* close cached sockets after 10 sec. */ + +typedef struct migrateCachedSocket { int fd; + time_t last_use_time; /* server.unixtime when created or last handed out. */ +} migrateCachedSocket; + +/* Return a TCP socket connected with the target instance, possibly returning + * a cached one. + * + * This function is responsible for sending errors to the client if a + * connection can't be established. In this case -1 is returned. + * Otherwise on success the socket is returned, and the caller should not + * attempt to free it after usage. + * + * If the caller detects an error while using the socket, migrateCloseSocket() + * should be called so that the connection will be created from scratch + * the next time. */ +int migrateGetSocket(redisClient *c, robj *host, robj *port, long timeout) { + int fd; + sds name = sdsempty(); + migrateCachedSocket *cs; + + /* Build the host:port cache key and check if we already have a cached socket for it. On a hit, refresh its last-use time and reuse it. */ + name = sdscatlen(name,host->ptr,sdslen(host->ptr)); + name = sdscatlen(name,":",1); + name = sdscatlen(name,port->ptr,sdslen(port->ptr)); + cs = dictFetchValue(server.migrate_cached_sockets,name); + if (cs) { + sdsfree(name); + cs->last_use_time = server.unixtime; + return cs->fd; + } + + /* No cached socket, create one. 
*/ + if (dictSize(server.migrate_cached_sockets) == MIGRATE_SOCKET_CACHE_ITEMS) { + /* Cache is full: close and evict one randomly chosen entry. */ + dictEntry *de = dictGetRandomKey(server.migrate_cached_sockets); + cs = dictGetVal(de); + close(cs->fd); + zfree(cs); + dictDelete(server.migrate_cached_sockets,dictGetKey(de)); + } + + /* Create the socket. NOTE(review): this connects using c->argv[1] and c->argv[2] rather than the host and port parameters; the visible caller passes those same objects, so the result is the same there -- confirm for any other caller. */ + fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr, + atoi(c->argv[2]->ptr)); + if (fd == -1) { + sdsfree(name); + addReplyErrorFormat(c,"Can't connect to target node: %s", + server.neterr); + return -1; + } + anetTcpNoDelay(server.neterr,fd); + + /* Check if it connects within the specified timeout (passed to aeWait() unchanged). On failure the new socket is closed and never cached. */ + if ((aeWait(fd,AE_WRITABLE,timeout) & AE_WRITABLE) == 0) { + sdsfree(name); + addReplySds(c,sdsnew("-IOERR error or timeout connecting to the client\r\n")); + close(fd); + return -1; + } + + /* Add to the cache and return it to the caller. name is handed to dictAdd() as the key and is not freed on this path. */ + cs = zmalloc(sizeof(*cs)); + cs->fd = fd; + cs->last_use_time = server.unixtime; + dictAdd(server.migrate_cached_sockets,name,cs); + return fd; +} + +/* Free a migrate cached connection. 
*/ +void migrateCloseSocket(robj *host, robj *port) { + sds name = sdsempty(); /* Rebuilt below as the host:port cache key. */ + migrateCachedSocket *cs; + + name = sdscatlen(name,host->ptr,sdslen(host->ptr)); + name = sdscatlen(name,":",1); + name = sdscatlen(name,port->ptr,sdslen(port->ptr)); + cs = dictFetchValue(server.migrate_cached_sockets,name); + if (!cs) { /* No socket cached under this key: only the temporary key needs freeing. */ + sdsfree(name); + return; + } + + close(cs->fd); /* Close the descriptor, free the entry, then drop it from the dict. */ + zfree(cs); + dictDelete(server.migrate_cached_sockets,name); + sdsfree(name); +} + +void migrateCloseTimedoutSockets(void) { /* Close and evict every cached socket whose last use is more than MIGRATE_SOCKET_CACHE_TTL behind server.unixtime. */ + dictIterator *di = dictGetSafeIterator(server.migrate_cached_sockets); /* Entries are deleted inside the loop, hence the safe iterator. */ + dictEntry *de; + + while((de = dictNext(di)) != NULL) { + migrateCachedSocket *cs = dictGetVal(de); + + if ((server.unixtime - cs->last_use_time) > MIGRATE_SOCKET_CACHE_TTL) { + close(cs->fd); + zfree(cs); + dictDelete(server.migrate_cached_sockets,dictGetKey(de)); + } + } + dictReleaseIterator(di); +} + +/* MIGRATE host port key dbid timeout [COPY | REPLACE] */ +void migrateCommand(redisClient *c) { + int fd, copy, replace, j; long timeout; long dbid; - long long ttl = 0, expireat; + long long ttl, expireat; robj *o; rio cmd, payload; + int retry_num = 0; + +try_again: + /* Initialization */ + copy = 0; + replace = 0; + ttl = 0; + + /* Parse additional options */ + for (j = 6; j < c->argc; j++) { + if (!strcasecmp(c->argv[j]->ptr,"copy")) { + copy = 1; + } else if (!strcasecmp(c->argv[j]->ptr,"replace")) { + replace = 1; + } else { + addReply(c,shared.syntaxerr); + return; + } + } /* Sanity check */ if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != REDIS_OK) return; if (getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != REDIS_OK) return; - if (timeout <= 0) timeout = 1; + if (timeout <= 0) timeout = 1000; /* Check if the key is here. 
If not we reply with success as there is * nothing to migrate (for instance the key expired in the meantime), but @@ -1606,17 +1783,8 @@ void migrateCommand(redisClient *c) { } /* Connect */ - fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr, - atoi(c->argv[2]->ptr)); - if (fd == -1) { - addReplyErrorFormat(c,"Can't connect to target node: %s", - server.neterr); - return; - } - if ((aeWait(fd,AE_WRITABLE,timeout*1000) & AE_WRITABLE) == 0) { - addReplySds(c,sdsnew("-IOERR error or timeout connecting to the client\r\n")); - return; - } + fd = migrateGetSocket(c,c->argv[1],c->argv[2],timeout); + if (fd == -1) return; /* error sent to the client by migrateGetSocket() */ /* Create RESTORE payload and generate the protocol to call the command. */ rioInitWithBuffer(&cmd,sdsempty()); @@ -1629,20 +1797,26 @@ void migrateCommand(redisClient *c) { ttl = expireat-mstime(); if (ttl < 1) ttl = 1; } - redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',4)); + redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',replace ? 5 : 4)); redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7)); redisAssertWithInfo(c,NULL,c->argv[3]->encoding == REDIS_ENCODING_RAW); redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,c->argv[3]->ptr,sdslen(c->argv[3]->ptr))); redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl)); - /* Finally the last argument that is the serailized object payload - * in the DUMP format. */ + /* Emit the payload argument, that is the serailized object using + * the DUMP format. */ createDumpPayload(&payload,o); redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,payload.io.buffer.ptr, sdslen(payload.io.buffer.ptr))); sdsfree(payload.io.buffer.ptr); + /* Add the REPLACE option to the RESTORE command if it was specified + * as a MIGRATE option. */ + if (replace) + redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7)); + /* Tranfer the query to the other node in 64K chunks. 
*/ + errno = 0; { sds buf = cmd.io.buffer.ptr; size_t pos = 0, towrite; @@ -1672,8 +1846,11 @@ void migrateCommand(redisClient *c) { } else { robj *aux; - dbDelete(c->db,c->argv[3]); - signalModifiedKey(c->db,c->argv[3]); + if (!copy) { + /* No COPY option: remove the local key, signal the change. */ + dbDelete(c->db,c->argv[3]); + signalModifiedKey(c->db,c->argv[3]); + } addReply(c,shared.ok); server.dirty++; @@ -1685,19 +1862,22 @@ void migrateCommand(redisClient *c) { } sdsfree(cmd.io.buffer.ptr); - close(fd); return; socket_wr_err: - addReplySds(c,sdsnew("-IOERR error or timeout writing to target instance\r\n")); sdsfree(cmd.io.buffer.ptr); - close(fd); + migrateCloseSocket(c->argv[1],c->argv[2]); + if (errno != ETIMEDOUT && retry_num++ == 0) goto try_again; + addReplySds(c, + sdsnew("-IOERR error or timeout writing to target instance\r\n")); return; socket_rd_err: - addReplySds(c,sdsnew("-IOERR error or timeout reading from target node\r\n")); sdsfree(cmd.io.buffer.ptr); - close(fd); + migrateCloseSocket(c->argv[1],c->argv[2]); + if (errno != ETIMEDOUT && retry_num++ == 0) goto try_again; + addReplySds(c, + sdsnew("-IOERR error or timeout reading from target node\r\n")); return; }