X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/994ed2bc552f4114b1f0c8dd3fd8aefaec6beeae..af0b220756571bc8faf57a0c7b7389dd86a60376:/src/cluster.c diff --git a/src/cluster.c b/src/cluster.c index e608c420..cbcdf373 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1,8 +1,40 @@ +/* Redis Cluster implementation. + * + * Copyright (c) 2009-2012, Salvatore Sanfilippo + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + #include "redis.h" +#include "endianconv.h" #include #include #include +#include void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask); void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask); @@ -19,20 +51,6 @@ int clusterAddSlot(clusterNode *n, int slot); * Initialization * -------------------------------------------------------------------------- */ -void clusterGetRandomName(char *p) { - FILE *fp = fopen("/dev/urandom","r"); - char *charset = "0123456789abcdef"; - int j; - - if (fp == NULL || fread(p,REDIS_CLUSTER_NAMELEN,1,fp) == 0) { - for (j = 0; j < REDIS_CLUSTER_NAMELEN; j++) - p[j] = rand(); - } - for (j = 0; j < REDIS_CLUSTER_NAMELEN; j++) - p[j] = charset[p[j] & 0x0F]; - fclose(fp); -} - int clusterLoadConfig(char *filename) { FILE *fp = fopen(filename,"r"); char *line; @@ -222,7 +240,7 @@ void clusterInit(void) { exit(1); } if (aeCreateFileEvent(server.el, server.cfd, AE_READABLE, - clusterAcceptHandler, NULL) == AE_ERR) oom("creating file event"); + clusterAcceptHandler, NULL) == AE_ERR) redisPanic("Unrecoverable error creating Redis Cluster file event."); server.cluster.slots_to_keys = zslCreate(); } @@ -304,7 +322,7 @@ clusterNode *createClusterNode(char *nodename, int flags) { if (nodename) memcpy(node->name, nodename, REDIS_CLUSTER_NAMELEN); else - clusterGetRandomName(node->name); + getRandomHexChars(node->name, REDIS_CLUSTER_NAMELEN); node->flags = flags; memset(node->slots,0,sizeof(node->slots)); node->numslaves = 0; @@ -377,7 +395,7 @@ clusterNode *clusterLookupNode(char *name) { de = dictFind(server.cluster.nodes,s); sdsfree(s); if (de == NULL) return NULL; - return dictGetEntryVal(de); + return dictGetVal(de); } /* This is only used after the handshake. When we connect a given IP/PORT @@ -439,7 +457,7 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) { * time PONG figure if it is newer than our figure. * Note that it's not a problem if we have a PING already * in progress against this node. */ - if (node->pong_received < ntohl(g->pong_received)) { + if (node->pong_received < (signed) ntohl(g->pong_received)) { redisLog(REDIS_DEBUG,"Node pong_received updated by gossip"); node->pong_received = ntohl(g->pong_received); } @@ -493,6 +511,7 @@ void nodeIp2String(char *buf, clusterLink *link) { /* Update the node address to the IP address that can be extracted * from link->fd, and at the specified port. */ void nodeUpdateAddress(clusterNode *node, clusterLink *link, int port) { + /* TODO */ } /* When this function is called, there is a packet to process starting @@ -510,8 +529,10 @@ int clusterProcessPacket(clusterLink *link) { uint16_t type = ntohs(hdr->type); clusterNode *sender; - redisLog(REDIS_DEBUG,"--- packet to process %lu bytes (%lu) ---", - (unsigned long) totlen, sdslen(link->rcvbuf)); + redisLog(REDIS_DEBUG,"--- Processing packet of type %d, %lu bytes", + type, (unsigned long) totlen); + + /* Perform sanity checks */ if (totlen < 8) return 1; if (totlen > sdslen(link->rcvbuf)) return 1; if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG || @@ -530,7 +551,16 @@ int clusterProcessPacket(clusterLink *link) { explen += sizeof(clusterMsgDataFail); if (totlen != explen) return 1; } + if (type == CLUSTERMSG_TYPE_PUBLISH) { + uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData); + + explen += sizeof(clusterMsgDataPublish) + + ntohl(hdr->data.publish.msg.channel_len) + + ntohl(hdr->data.publish.msg.message_len); + if (totlen != explen) return 1; + } + /* Ready to process the packet. Dispatch by type. */ sender = clusterLookupNode(hdr->sender); if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) { int update_config = 0; @@ -601,7 +631,7 @@ int clusterProcessPacket(clusterLink *link) { } } /* Update our info about the node */ - link->node->pong_received = time(NULL); + if (link->node) link->node->pong_received = time(NULL); /* Update master/slave info */ if (sender) { @@ -664,8 +694,24 @@ int clusterProcessPacket(clusterLink *link) { clusterUpdateState(); clusterSaveConfigOrDie(); } + } else if (type == CLUSTERMSG_TYPE_PUBLISH) { + robj *channel, *message; + uint32_t channel_len, message_len; + + /* Don't bother creating useless objects if there are no Pub/Sub subscribers. */ + if (dictSize(server.pubsub_channels) || listLength(server.pubsub_patterns)) { + channel_len = ntohl(hdr->data.publish.msg.channel_len); + message_len = ntohl(hdr->data.publish.msg.message_len); + channel = createStringObject( + (char*)hdr->data.publish.msg.bulk_data,channel_len); + message = createStringObject( + (char*)hdr->data.publish.msg.bulk_data+channel_len, message_len); + pubsubPublishMessage(channel,message); + decrRefCount(channel); + decrRefCount(message); + } } else { - redisLog(REDIS_NOTICE,"Received unknown packet type: %d", type); + redisLog(REDIS_WARNING,"Received unknown packet type: %d", type); } return 1; } @@ -758,9 +804,25 @@ void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) { link->sndbuf = sdscatlen(link->sndbuf, msg, msglen); } +/* Send a message to all the nodes with a reliable link */ +void clusterBroadcastMessage(void *buf, size_t len) { + dictIterator *di; + dictEntry *de; + + di = dictGetIterator(server.cluster.nodes); + while((de = dictNext(di)) != NULL) { + clusterNode *node = dictGetVal(de); + + if (!node->link) continue; + if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue; + clusterSendMessage(node->link,buf,len); + } + dictReleaseIterator(di); +} + /* Build the message header */ void clusterBuildMessageHdr(clusterMsg *hdr, int type) { - int totlen; + int totlen = 0; memset(hdr,0,sizeof(*hdr)); hdr->type = htons(type); @@ -805,7 +867,7 @@ void clusterSendPing(clusterLink *link, int type) { /* Populate the gossip fields */ while(freshnodes > 0 && gossipcount < 3) { struct dictEntry *de = dictGetRandomKey(server.cluster.nodes); - clusterNode *this = dictGetEntryVal(de); + clusterNode *this = dictGetVal(de); clusterMsgDataGossip *gossip; int j; @@ -842,20 +904,48 @@ void clusterSendPing(clusterLink *link, int type) { clusterSendMessage(link,buf,totlen); } -/* Send a message to all the nodes with a reliable link */ -void clusterBroadcastMessage(void *buf, size_t len) { - dictIterator *di; - dictEntry *de; +/* Send a PUBLISH message. + * + * If link is NULL, then the message is broadcasted to the whole cluster. */ +void clusterSendPublish(clusterLink *link, robj *channel, robj *message) { + unsigned char buf[4096], *payload; + clusterMsg *hdr = (clusterMsg*) buf; + uint32_t totlen; + uint32_t channel_len, message_len; - di = dictGetIterator(server.cluster.nodes); - while((de = dictNext(di)) != NULL) { - clusterNode *node = dictGetEntryVal(de); + channel = getDecodedObject(channel); + message = getDecodedObject(message); + channel_len = sdslen(channel->ptr); + message_len = sdslen(message->ptr); - if (!node->link) continue; - if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue; - clusterSendMessage(node->link,buf,len); + clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_PUBLISH); + totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); + totlen += sizeof(clusterMsgDataPublish) + channel_len + message_len; + + hdr->data.publish.msg.channel_len = htonl(channel_len); + hdr->data.publish.msg.message_len = htonl(message_len); + hdr->totlen = htonl(totlen); + + /* Try to use the local buffer if possible */ + if (totlen < sizeof(buf)) { + payload = buf; + } else { + payload = zmalloc(totlen); + hdr = (clusterMsg*) payload; + memcpy(payload,hdr,sizeof(*hdr)); } - dictReleaseIterator(di); + memcpy(hdr->data.publish.msg.bulk_data,channel->ptr,sdslen(channel->ptr)); + memcpy(hdr->data.publish.msg.bulk_data+sdslen(channel->ptr), + message->ptr,sdslen(message->ptr)); + + if (link) + clusterSendMessage(link,payload,totlen); + else + clusterBroadcastMessage(payload,totlen); + + decrRefCount(channel); + decrRefCount(message); + if (payload != buf) zfree(payload); } /* Send a FAIL message to all the nodes we are able to contact. @@ -872,6 +962,17 @@ void clusterSendFail(char *nodename) { clusterBroadcastMessage(buf,ntohl(hdr->totlen)); } +/* ----------------------------------------------------------------------------- + * CLUSTER Pub/Sub support + * + * For now we do very little, just propagating PUBLISH messages across the whole + * cluster. In the future we'll try to get smarter and avoiding propagating those + * messages to hosts without receives for a given channel. + * -------------------------------------------------------------------------- */ +void clusterPropagatePublish(robj *channel, robj *message) { + clusterSendPublish(NULL, channel, message); +} + /* ----------------------------------------------------------------------------- * CLUSTER cron job * -------------------------------------------------------------------------- */ @@ -887,7 +988,7 @@ void clusterCron(void) { /* Check if we have disconnected nodes and reestablish the connection. */ di = dictGetIterator(server.cluster.nodes); while((de = dictNext(di)) != NULL) { - clusterNode *node = dictGetEntryVal(de); + clusterNode *node = dictGetVal(de); if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue; if (node->link == NULL) { @@ -922,7 +1023,7 @@ void clusterCron(void) { * the oldest ping_sent time */ for (j = 0; j < 5; j++) { de = dictGetRandomKey(server.cluster.nodes); - clusterNode *this = dictGetEntryVal(de); + clusterNode *this = dictGetVal(de); if (this->link == NULL) continue; if (this->flags & (REDIS_NODE_MYSELF|REDIS_NODE_HANDSHAKE)) continue; @@ -939,7 +1040,7 @@ void clusterCron(void) { /* Iterate nodes to check if we need to flag something as failing */ di = dictGetIterator(server.cluster.nodes); while((de = dictNext(di)) != NULL) { - clusterNode *node = dictGetEntryVal(de); + clusterNode *node = dictGetVal(de); int delay; if (node->flags & @@ -1070,7 +1171,7 @@ sds clusterGenNodesDescription(void) { di = dictGetIterator(server.cluster.nodes); while((de = dictNext(di)) != NULL) { - clusterNode *node = dictGetEntryVal(de); + clusterNode *node = dictGetVal(de); /* Node coordinates */ ci = sdscatprintf(ci,"%.40s %s:%d ", @@ -1099,7 +1200,8 @@ sds clusterGenNodesDescription(void) { ci = sdscatprintf(ci,"%ld %ld %s", (long) node->ping_sent, (long) node->pong_received, - node->link ? "connected" : "disconnected"); + (node->link || node->flags & REDIS_NODE_MYSELF) ? + "connected" : "disconnected"); /* Slots served by this instance */ start = -1; @@ -1191,7 +1293,10 @@ void clusterCommand(redisClient *c) { addReplyBulk(c,o); decrRefCount(o); } else if ((!strcasecmp(c->argv[1]->ptr,"addslots") || - !strcasecmp(c->argv[1]->ptr,"delslots")) && c->argc >= 3) { + !strcasecmp(c->argv[1]->ptr,"delslots")) && c->argc >= 3) + { + /* CLUSTER ADDSLOTS [slot] ... */ + /* CLUSTER DELSLOTS [slot] ... */ int j, slot; unsigned char *slots = zmalloc(REDIS_CLUSTER_SLOTS); int del = !strcasecmp(c->argv[1]->ptr,"delslots"); @@ -1231,7 +1336,7 @@ void clusterCommand(redisClient *c) { retval = del ? clusterDelSlot(j) : clusterAddSlot(server.cluster.myself,j); - redisAssert(retval == REDIS_OK); + redisAssertWithInfo(c,NULL,retval == REDIS_OK); } } zfree(slots); @@ -1239,9 +1344,10 @@ void clusterCommand(redisClient *c) { clusterSaveConfigOrDie(); addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) { - /* SETSLOT 10 MIGRATING */ - /* SETSLOT 10 IMPORTING */ + /* SETSLOT 10 MIGRATING */ + /* SETSLOT 10 IMPORTING */ /* SETSLOT 10 STABLE */ + /* SETSLOT 10 NODE */ int slot; clusterNode *n; @@ -1274,7 +1380,7 @@ void clusterCommand(redisClient *c) { /* CLUSTER SETSLOT STABLE */ server.cluster.importing_slots_from[slot] = NULL; server.cluster.migrating_slots_to[slot] = NULL; - } else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 4) { + } else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 5) { /* CLUSTER SETSLOT NODE */ clusterNode *n = clusterLookupNode(c->argv[4]->ptr); @@ -1291,7 +1397,7 @@ void clusterCommand(redisClient *c) { keys = zmalloc(sizeof(robj*)*1); numkeys = GetKeysInSlot(slot, keys, 1); zfree(keys); - if (numkeys == 0) { + if (numkeys != 0) { addReplyErrorFormat(c, "Can't assign hashslot %d to a different node while I still hold keys for this hash slot.", slot); return; } @@ -1303,6 +1409,11 @@ void clusterCommand(redisClient *c) { server.cluster.migrating_slots_to[slot]) server.cluster.migrating_slots_to[slot] = NULL; + /* If this node was importing this slot, assigning the slot to + * itself also clears the importing status. */ + if (n == server.cluster.myself && server.cluster.importing_slots_from[slot]) + server.cluster.importing_slots_from[slot] = NULL; + clusterDelSlot(slot); clusterAddSlot(n,slot); } else { @@ -1378,19 +1489,105 @@ void clusterCommand(redisClient *c) { } /* ----------------------------------------------------------------------------- - * RESTORE and MIGRATE commands + * DUMP, RESTORE and MIGRATE commands * -------------------------------------------------------------------------- */ -/* RESTORE key ttl serialized-value */ +/* Generates a DUMP-format representation of the object 'o', adding it to the + * io stream pointed by 'rio'. This function can't fail. */ +void createDumpPayload(rio *payload, robj *o) { + unsigned char buf[2]; + uint64_t crc; + + /* Serialize the object in a RDB-like format. It consist of an object type + * byte followed by the serialized object. This is understood by RESTORE. */ + rioInitWithBuffer(payload,sdsempty()); + redisAssert(rdbSaveObjectType(payload,o)); + redisAssert(rdbSaveObject(payload,o)); + + /* Write the footer, this is how it looks like: + * ----------------+---------------------+---------------+ + * ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 | + * ----------------+---------------------+---------------+ + * RDB version and CRC are both in little endian. + */ + + /* RDB version */ + buf[0] = REDIS_RDB_VERSION & 0xff; + buf[1] = (REDIS_RDB_VERSION >> 8) & 0xff; + payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,buf,2); + + /* CRC64 */ + crc = crc64(0,(unsigned char*)payload->io.buffer.ptr, + sdslen(payload->io.buffer.ptr)); + memrev64ifbe(&crc); + payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,&crc,8); +} + +/* Verify that the RDB version of the dump payload matches the one of this Redis + * instance and that the checksum is ok. + * If the DUMP payload looks valid REDIS_OK is returned, otherwise REDIS_ERR + * is returned. */ +int verifyDumpPayload(unsigned char *p, size_t len) { + unsigned char *footer; + uint16_t rdbver; + uint64_t crc; + + /* At least 2 bytes of RDB version and 8 of CRC64 should be present. */ + if (len < 10) return REDIS_ERR; + footer = p+(len-10); + + /* Verify RDB version */ + rdbver = (footer[1] << 8) | footer[0]; + if (rdbver != REDIS_RDB_VERSION) return REDIS_ERR; + + /* Verify CRC64 */ + crc = crc64(0,p,len-8); + memrev64ifbe(&crc); + return (memcmp(&crc,footer+2,8) == 0) ? REDIS_OK : REDIS_ERR; +} + +/* DUMP keyname + * DUMP is actually not used by Redis Cluster but it is the obvious + * complement of RESTORE and can be useful for different applications. */ +void dumpCommand(redisClient *c) { + robj *o, *dumpobj; + rio payload; + + /* Check if the key is here. */ + if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) { + addReply(c,shared.nullbulk); + return; + } + + /* Create the DUMP encoded representation. */ + createDumpPayload(&payload,o); + + /* Transfer to the client */ + dumpobj = createObject(REDIS_STRING,payload.io.buffer.ptr); + addReplyBulk(c,dumpobj); + decrRefCount(dumpobj); + return; +} + +/* RESTORE key ttl serialized-value [REPLACE] */ void restoreCommand(redisClient *c) { - FILE *fp; - char buf[64]; - robj *o; - unsigned char *data; long ttl; + rio payload; + int j, type, replace = 0; + robj *obj; + + /* Parse additional options */ + for (j = 4; j < c->argc; j++) { + if (!strcasecmp(c->argv[j]->ptr,"replace")) { + replace = 1; + } else { + addReply(c,shared.syntaxerr); + return; + } + } /* Make sure this key does not already exist here... */ - if (lookupKeyWrite(c->db,c->argv[1]) != NULL) { + if (!replace && lookupKeyWrite(c->db,c->argv[1]) != NULL) { addReplyError(c,"Target key name is busy."); return; } @@ -1403,136 +1600,237 @@ void restoreCommand(redisClient *c) { return; } - /* rdbLoadObject() only works against file descriptors so we need to - * dump the serialized object into a file and reload. */ - snprintf(buf,sizeof(buf),"redis-restore-%d.tmp",getpid()); - fp = fopen(buf,"w+"); - if (!fp) { - redisLog(REDIS_WARNING,"Can't open tmp file for RESTORE: %s", - strerror(errno)); - addReplyErrorFormat(c,"RESTORE failed, tmp file creation error: %s", - strerror(errno)); - return; - } - unlink(buf); - - /* Write the actual data and rewind the file */ - data = (unsigned char*) c->argv[3]->ptr; - if (fwrite(data+1,sdslen((sds)data)-1,1,fp) != 1) { - redisLog(REDIS_WARNING,"Can't write against tmp file for RESTORE: %s", - strerror(errno)); - addReplyError(c,"RESTORE failed, tmp file I/O error."); - fclose(fp); + /* Verify RDB version and data checksum. */ + if (verifyDumpPayload(c->argv[3]->ptr,sdslen(c->argv[3]->ptr)) == REDIS_ERR) { + addReplyError(c,"DUMP payload version or checksum are wrong"); return; } - rewind(fp); - /* Finally create the object from the serialized dump and - * store it at the specified key. */ - if ((data[0] > 4 && data[0] < 9) || - data[0] > 11 || - (o = rdbLoadObject(data[0],fp)) == NULL) + rioInitWithBuffer(&payload,c->argv[3]->ptr); + if (((type = rdbLoadObjectType(&payload)) == -1) || + ((obj = rdbLoadObject(type,&payload)) == NULL)) { - addReplyError(c,"Bad data format."); - fclose(fp); + addReplyError(c,"Bad data format"); return; } - fclose(fp); + + /* Remove the old key if needed. */ + if (replace) dbDelete(c->db,c->argv[1]); /* Create the key and set the TTL if any */ - dbAdd(c->db,c->argv[1],o); - if (ttl) setExpire(c->db,c->argv[1],time(NULL)+ttl); + dbAdd(c->db,c->argv[1],obj); + if (ttl) setExpire(c->db,c->argv[1],mstime()+ttl); + signalModifiedKey(c->db,c->argv[1]); addReply(c,shared.ok); + server.dirty++; } -/* MIGRATE host port key dbid timeout */ -void migrateCommand(redisClient *c) { +/* MIGRATE socket cache implementation. + * + * We take a map between host:ip and a TCP socket that we used to connect + * to this instance in recent time. + * This sockets are closed when the max number we cache is reached, and also + * in serverCron() when they are around for more than a few seconds. */ +#define MIGRATE_SOCKET_CACHE_ITEMS 64 /* max num of items in the cache. */ +#define MIGRATE_SOCKET_CACHE_TTL 10 /* close cached socekts after 10 sec. */ + +typedef struct migrateCachedSocket { int fd; + time_t last_use_time; +} migrateCachedSocket; + +/* Return a TCP scoket connected with the target instance, possibly returning + * a cached one. + * + * This function is responsible of sending errors to the client if a + * connection can't be established. In this case -1 is returned. + * Otherwise on success the socket is returned, and the caller should not + * attempt to free it after usage. + * + * If the caller detects an error while using the socket, migrateCloseSocket() + * should be called so that the connection will be craeted from scratch + * the next time. */ +int migrateGetSocket(redisClient *c, robj *host, robj *port, long timeout) { + int fd; + sds name = sdsempty(); + migrateCachedSocket *cs; + + /* Check if we have an already cached socket for this ip:port pair. */ + name = sdscatlen(name,host->ptr,sdslen(host->ptr)); + name = sdscatlen(name,":",1); + name = sdscatlen(name,port->ptr,sdslen(port->ptr)); + cs = dictFetchValue(server.migrate_cached_sockets,name); + if (cs) { + sdsfree(name); + cs->last_use_time = server.unixtime; + return cs->fd; + } + + /* No cached socket, create one. */ + if (dictSize(server.migrate_cached_sockets) == MIGRATE_SOCKET_CACHE_ITEMS) { + /* Too many items, drop one at random. */ + dictEntry *de = dictGetRandomKey(server.migrate_cached_sockets); + cs = dictGetVal(de); + close(cs->fd); + zfree(cs); + dictDelete(server.migrate_cached_sockets,dictGetKey(de)); + } + + /* Create the socket */ + fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr, + atoi(c->argv[2]->ptr)); + if (fd == -1) { + sdsfree(name); + addReplyErrorFormat(c,"Can't connect to target node: %s", + server.neterr); + return -1; + } + anetTcpNoDelay(server.neterr,fd); + + /* Check if it connects within the specified timeout. */ + if ((aeWait(fd,AE_WRITABLE,timeout) & AE_WRITABLE) == 0) { + sdsfree(name); + addReplySds(c,sdsnew("-IOERR error or timeout connecting to the client\r\n")); + close(fd); + return -1; + } + + /* Add to the cache and return it to the caller. */ + cs = zmalloc(sizeof(*cs)); + cs->fd = fd; + cs->last_use_time = server.unixtime; + dictAdd(server.migrate_cached_sockets,name,cs); + return fd; +} + +/* Free a migrate cached connection. */ +void migrateCloseSocket(robj *host, robj *port) { + sds name = sdsempty(); + migrateCachedSocket *cs; + + name = sdscatlen(name,host->ptr,sdslen(host->ptr)); + name = sdscatlen(name,":",1); + name = sdscatlen(name,port->ptr,sdslen(port->ptr)); + cs = dictFetchValue(server.migrate_cached_sockets,name); + if (!cs) { + sdsfree(name); + return; + } + + close(cs->fd); + zfree(cs); + dictDelete(server.migrate_cached_sockets,name); + sdsfree(name); +} + +void migrateCloseTimedoutSockets(void) { + dictIterator *di = dictGetSafeIterator(server.migrate_cached_sockets); + dictEntry *de; + + while((de = dictNext(di)) != NULL) { + migrateCachedSocket *cs = dictGetVal(de); + + if ((server.unixtime - cs->last_use_time) > MIGRATE_SOCKET_CACHE_TTL) { + close(cs->fd); + zfree(cs); + dictDelete(server.migrate_cached_sockets,dictGetKey(de)); + } + } + dictReleaseIterator(di); +} + +/* MIGRATE host port key dbid timeout [COPY | REPLACE] */ +void migrateCommand(redisClient *c) { + int fd, copy, replace, j; long timeout; long dbid; - char buf[64]; - FILE *fp; - time_t ttl; + long long ttl, expireat; robj *o; - unsigned char type; - off_t payload_len; + rio cmd, payload; + int retry_num = 0; + +try_again: + /* Initialization */ + copy = 0; + replace = 0; + ttl = 0; + + /* Parse additional options */ + for (j = 6; j < c->argc; j++) { + if (!strcasecmp(c->argv[j]->ptr,"copy")) { + copy = 1; + } else if (!strcasecmp(c->argv[j]->ptr,"replace")) { + replace = 1; + } else { + addReply(c,shared.syntaxerr); + return; + } + } /* Sanity check */ if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != REDIS_OK) return; if (getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != REDIS_OK) return; - if (timeout <= 0) timeout = 1; + if (timeout <= 0) timeout = 1000; /* Check if the key is here. If not we reply with success as there is * nothing to migrate (for instance the key expired in the meantime), but * we include such information in the reply string. */ if ((o = lookupKeyRead(c->db,c->argv[3])) == NULL) { - addReplySds(c,sdsnew("+NOKEY")); + addReplySds(c,sdsnew("+NOKEY\r\n")); return; } /* Connect */ - fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr, - atoi(c->argv[2]->ptr)); - if (fd == -1) { - addReplyErrorFormat(c,"Can't connect to target node: %s", - server.neterr); - return; - } - if ((aeWait(fd,AE_WRITABLE,timeout*1000) & AE_WRITABLE) == 0) { - addReplyError(c,"Timeout connecting to the client"); - return; - } - - /* Create temp file */ - snprintf(buf,sizeof(buf),"redis-migrate-%d.tmp",getpid()); - fp = fopen(buf,"w+"); - if (!fp) { - redisLog(REDIS_WARNING,"Can't open tmp file for MIGRATE: %s", - strerror(errno)); - addReplyErrorFormat(c,"MIGRATE failed, tmp file creation error: %s.", - strerror(errno)); - return; + fd = migrateGetSocket(c,c->argv[1],c->argv[2],timeout); + if (fd == -1) return; /* error sent to the client by migrateGetSocket() */ + + /* Create RESTORE payload and generate the protocol to call the command. */ + rioInitWithBuffer(&cmd,sdsempty()); + redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2)); + redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6)); + redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid)); + + expireat = getExpire(c->db,c->argv[3]); + if (expireat != -1) { + ttl = expireat-mstime(); + if (ttl < 1) ttl = 1; } - unlink(buf); - - /* Build the SELECT + RESTORE query writing it in our temp file. */ - if (fwriteBulkCount(fp,'*',2) == 0) goto file_wr_err; - if (fwriteBulkString(fp,"SELECT",6) == 0) goto file_wr_err; - if (fwriteBulkLongLong(fp,dbid) == 0) goto file_wr_err; - - ttl = getExpire(c->db,c->argv[3]); - type = o->type; - if (fwriteBulkCount(fp,'*',4) == 0) goto file_wr_err; - if (fwriteBulkString(fp,"RESTORE",7) == 0) goto file_wr_err; - if (fwriteBulkObject(fp,c->argv[3]) == 0) goto file_wr_err; - if (fwriteBulkLongLong(fp, (ttl == -1) ? 0 : ttl) == 0) goto file_wr_err; - - /* Finally the last argument that is the serailized object payload - * in the form: . */ - payload_len = rdbSavedObjectLen(o); - if (fwriteBulkCount(fp,'$',payload_len+1) == 0) goto file_wr_err; - if (fwrite(&type,1,1,fp) == 0) goto file_wr_err; - if (rdbSaveObject(fp,o) == -1) goto file_wr_err; - if (fwrite("\r\n",2,1,fp) == 0) goto file_wr_err; - - /* Tranfer the query to the other node */ - rewind(fp); + redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',replace ? 5 : 4)); + redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7)); + redisAssertWithInfo(c,NULL,c->argv[3]->encoding == REDIS_ENCODING_RAW); + redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,c->argv[3]->ptr,sdslen(c->argv[3]->ptr))); + redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl)); + + /* Emit the payload argument, that is the serailized object using + * the DUMP format. */ + createDumpPayload(&payload,o); + redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,payload.io.buffer.ptr, + sdslen(payload.io.buffer.ptr))); + sdsfree(payload.io.buffer.ptr); + + /* Add the REPLACE option to the RESTORE command if it was specified + * as a MIGRATE option. */ + if (replace) + redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7)); + + /* Tranfer the query to the other node in 64K chunks. */ + errno = 0; { - char buf[4096]; - size_t nread; - - while ((nread = fread(buf,1,sizeof(buf),fp)) != 0) { - int nwritten; - - nwritten = syncWrite(fd,buf,nread,timeout); - if (nwritten != (signed)nread) goto socket_wr_err; + sds buf = cmd.io.buffer.ptr; + size_t pos = 0, towrite; + int nwritten = 0; + + while ((towrite = sdslen(buf)-pos) > 0) { + towrite = (towrite > (64*1024) ? (64*1024) : towrite); + nwritten = syncWrite(fd,buf+pos,towrite,timeout); + if (nwritten != (signed)towrite) goto socket_wr_err; + pos += nwritten; } - if (ferror(fp)) goto file_rd_err; } - /* Read back the reply */ + /* Read back the reply. */ { char buf1[1024]; char buf2[1024]; @@ -1541,128 +1839,59 @@ void migrateCommand(redisClient *c) { if (syncReadLine(fd, buf1, sizeof(buf1), timeout) <= 0) goto socket_rd_err; if (syncReadLine(fd, buf2, sizeof(buf2), timeout) <= 0) - goto socket_rd_err; + goto socket_rd_err; if (buf1[0] == '-' || buf2[0] == '-') { addReplyErrorFormat(c,"Target instance replied with error: %s", (buf1[0] == '-') ? buf1+1 : buf2+1); } else { - dbDelete(c->db,c->argv[3]); + robj *aux; + + if (!copy) { + /* No COPY option: remove the local key, signal the change. */ + dbDelete(c->db,c->argv[3]); + signalModifiedKey(c->db,c->argv[3]); + } addReply(c,shared.ok); + server.dirty++; + + /* Translate MIGRATE as DEL for replication/AOF. */ + aux = createStringObject("DEL",3); + rewriteClientCommandVector(c,2,aux,c->argv[3]); + decrRefCount(aux); } } - fclose(fp); - close(fd); - return; -file_wr_err: - redisLog(REDIS_WARNING,"Can't write on tmp file for MIGRATE: %s", - strerror(errno)); - addReplyErrorFormat(c,"MIGRATE failed, tmp file write error: %s.", - strerror(errno)); - fclose(fp); - close(fd); - return; - -file_rd_err: - redisLog(REDIS_WARNING,"Can't read from tmp file for MIGRATE: %s", - strerror(errno)); - addReplyErrorFormat(c,"MIGRATE failed, tmp file read error: %s.", - strerror(errno)); - fclose(fp); - close(fd); + sdsfree(cmd.io.buffer.ptr); return; socket_wr_err: - redisLog(REDIS_NOTICE,"Can't write to target node for MIGRATE: %s", - strerror(errno)); - addReplyErrorFormat(c,"MIGRATE failed, writing to target node: %s.", - strerror(errno)); - fclose(fp); - close(fd); + sdsfree(cmd.io.buffer.ptr); + migrateCloseSocket(c->argv[1],c->argv[2]); + if (errno != ETIMEDOUT && retry_num++ == 0) goto try_again; + addReplySds(c, + sdsnew("-IOERR error or timeout writing to target instance\r\n")); return; socket_rd_err: - redisLog(REDIS_NOTICE,"Can't read from target node for MIGRATE: %s", - strerror(errno)); - addReplyErrorFormat(c,"MIGRATE failed, reading from target node: %s.", - strerror(errno)); - fclose(fp); - close(fd); + sdsfree(cmd.io.buffer.ptr); + migrateCloseSocket(c->argv[1],c->argv[2]); + if (errno != ETIMEDOUT && retry_num++ == 0) goto try_again; + addReplySds(c, + sdsnew("-IOERR error or timeout reading from target node\r\n")); return; } -/* DUMP keyname - * DUMP is actually not used by Redis Cluster but it is the obvious - * complement of RESTORE and can be useful for different applications. */ -void dumpCommand(redisClient *c) { - char buf[64]; - FILE *fp; - robj *o, *dumpobj; - sds dump = NULL; - off_t payload_len; - unsigned int type; - - /* Check if the key is here. */ - if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) { - addReply(c,shared.nullbulk); - return; - } - - /* Create temp file */ - snprintf(buf,sizeof(buf),"redis-dump-%d.tmp",getpid()); - fp = fopen(buf,"w+"); - if (!fp) { - redisLog(REDIS_WARNING,"Can't open tmp file for MIGRATE: %s", - strerror(errno)); - addReplyErrorFormat(c,"DUMP failed, tmp file creation error: %s.", - strerror(errno)); +/* The ASKING command is required after a -ASK redirection. + * The client should issue ASKING before to actualy send the command to + * the target instance. See the Redis Cluster specification for more + * information. */ +void askingCommand(redisClient *c) { + if (server.cluster_enabled == 0) { + addReplyError(c,"This instance has cluster support disabled"); return; } - unlink(buf); - - /* Dump the serailized object and read it back in memory. - * We prefix it with a one byte containing the type ID. - * This is the serialization format understood by RESTORE. */ - if (rdbSaveObject(fp,o) == -1) goto file_wr_err; - payload_len = ftello(fp); - if (fseeko(fp,0,SEEK_SET) == -1) goto file_rd_err; - dump = sdsnewlen(NULL,payload_len+1); - if (payload_len && fread(dump+1,payload_len,1,fp) != 1) goto file_rd_err; - fclose(fp); - type = o->type; - if (type == REDIS_LIST && o->encoding == REDIS_ENCODING_ZIPLIST) - type = REDIS_LIST_ZIPLIST; - else if (type == REDIS_HASH && o->encoding == REDIS_ENCODING_ZIPMAP) - type = REDIS_HASH_ZIPMAP; - else if (type == REDIS_SET && o->encoding == REDIS_ENCODING_INTSET) - type = REDIS_SET_INTSET; - else - type = o->type; - dump[0] = type; - - /* Transfer to the client */ - dumpobj = createObject(REDIS_STRING,dump); - addReplyBulk(c,dumpobj); - decrRefCount(dumpobj); - return; - -file_wr_err: - redisLog(REDIS_WARNING,"Can't write on tmp file for DUMP: %s", - strerror(errno)); - addReplyErrorFormat(c,"DUMP failed, tmp file write error: %s.", - strerror(errno)); - sdsfree(dump); - fclose(fp); - return; - -file_rd_err: - redisLog(REDIS_WARNING,"Can't read from tmp file for DUMP: %s", - strerror(errno)); - addReplyErrorFormat(c,"DUMP failed, tmp file read error: %s.", - strerror(errno)); - sdsfree(dump); - fclose(fp); - return; + c->flags |= REDIS_ASKING; + addReply(c,shared.ok); } /* ----------------------------------------------------------------------------- @@ -1726,7 +1955,7 @@ clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **arg slot = keyHashSlot((char*)firstkey->ptr, sdslen(firstkey->ptr)); n = server.cluster.slots[slot]; - redisAssert(n != NULL); + redisAssertWithInfo(c,firstkey,n != NULL); } else { /* If it is not the first key, make sure it is exactly * the same key as the first we saw. */ @@ -1758,9 +1987,12 @@ clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **arg } /* Handle the case in which we are receiving this hash slot from * another instance, so we'll accept the query even if in the table - * it is assigned to a different node. */ - if (server.cluster.importing_slots_from[slot] != NULL) + * it is assigned to a different node, but only if the client + * issued an ASKING command before. */ + if (server.cluster.importing_slots_from[slot] != NULL && + c->flags & REDIS_ASKING) { return server.cluster.myself; + } /* It's not a -ASK case. Base case: just return the right node. */ return n; }