From 571e257db12eaa6cdd47811f5663ac1003e32b1b Mon Sep 17 00:00:00 2001 From: antirez Date: Sat, 10 Mar 2012 12:26:37 +0100 Subject: [PATCH] Redis 2.6 branch obtained from unstable removing all the cluster related code. --- src/Makefile | 4 +- src/cluster.c | 1762 ------------------------------------------------- src/config.c | 7 - src/db.c | 43 -- src/migrate.c | 190 ++++++ src/pubsub.c | 1 - src/redis.c | 53 +- src/redis.h | 148 +---- 8 files changed, 193 insertions(+), 2015 deletions(-) delete mode 100644 src/cluster.c create mode 100644 src/migrate.c diff --git a/src/Makefile b/src/Makefile index cca785cb..8677b60e 100644 --- a/src/Makefile +++ b/src/Makefile @@ -73,7 +73,7 @@ QUIET_CC = @printf ' %b %b\n' $(CCCOLOR)CC$(ENDCOLOR) $(SRCCOLOR)$@$(ENDCOLOR QUIET_LINK = @printf ' %b %b\n' $(LINKCOLOR)LINK$(ENDCOLOR) $(BINCOLOR)$@$(ENDCOLOR); endif -OBJ = adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o +OBJ = adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o migrate.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o BENCHOBJ = ae.o anet.o redis-benchmark.o sds.o adlist.o zmalloc.o CLIOBJ = anet.o sds.o adlist.o redis-cli.o zmalloc.o release.o CHECKDUMPOBJ = redis-check-dump.o lzf_c.o lzf_d.o @@ -101,8 +101,6 @@ aof.o: aof.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \ zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h bio.o: bio.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \ zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h bio.h -cluster.o: cluster.c redis.h fmacros.h config.h ae.h sds.h dict.h \ - adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h config.o: config.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \ zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h crc16.o: crc16.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \ diff --git a/src/cluster.c b/src/cluster.c deleted file mode 100644 index f76e8ff5..00000000 --- a/src/cluster.c +++ /dev/null @@ -1,1762 +0,0 @@ -#include "redis.h" - -#include -#include -#include - -void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask); -void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask); -void clusterSendPing(clusterLink *link, int type); -void clusterSendFail(char *nodename); -void clusterUpdateState(void); -int clusterNodeGetSlotBit(clusterNode *n, int slot); -sds clusterGenNodesDescription(void); -clusterNode *clusterLookupNode(char *name); -int clusterNodeAddSlave(clusterNode *master, clusterNode *slave); -int clusterAddSlot(clusterNode *n, int slot); - -/* ----------------------------------------------------------------------------- - * Initialization - * -------------------------------------------------------------------------- */ - -int clusterLoadConfig(char *filename) { - FILE *fp = fopen(filename,"r"); - char *line; - int maxline, j; - - if (fp == NULL) return REDIS_ERR; - - /* Parse the file. Note that single liens of the cluster config file can - * be really long as they include all the hash slots of the node. - * This means in the worst possible case REDIS_CLUSTER_SLOTS/2 integers. - * To simplify we allocate 1024+REDIS_CLUSTER_SLOTS*16 bytes per line. */ - maxline = 1024+REDIS_CLUSTER_SLOTS*16; - line = zmalloc(maxline); - while(fgets(line,maxline,fp) != NULL) { - int argc; - sds *argv = sdssplitargs(line,&argc); - clusterNode *n, *master; - char *p, *s; - - /* Create this node if it does not exist */ - n = clusterLookupNode(argv[0]); - if (!n) { - n = createClusterNode(argv[0],0); - clusterAddNode(n); - } - /* Address and port */ - if ((p = strchr(argv[1],':')) == NULL) goto fmterr; - *p = '\0'; - memcpy(n->ip,argv[1],strlen(argv[1])+1); - n->port = atoi(p+1); - - /* Parse flags */ - p = s = argv[2]; - while(p) { - p = strchr(s,','); - if (p) *p = '\0'; - if (!strcasecmp(s,"myself")) { - redisAssert(server.cluster.myself == NULL); - server.cluster.myself = n; - n->flags |= REDIS_NODE_MYSELF; - } else if (!strcasecmp(s,"master")) { - n->flags |= REDIS_NODE_MASTER; - } else if (!strcasecmp(s,"slave")) { - n->flags |= REDIS_NODE_SLAVE; - } else if (!strcasecmp(s,"fail?")) { - n->flags |= REDIS_NODE_PFAIL; - } else if (!strcasecmp(s,"fail")) { - n->flags |= REDIS_NODE_FAIL; - } else if (!strcasecmp(s,"handshake")) { - n->flags |= REDIS_NODE_HANDSHAKE; - } else if (!strcasecmp(s,"noaddr")) { - n->flags |= REDIS_NODE_NOADDR; - } else if (!strcasecmp(s,"noflags")) { - /* nothing to do */ - } else { - redisPanic("Unknown flag in redis cluster config file"); - } - if (p) s = p+1; - } - - /* Get master if any. Set the master and populate master's - * slave list. */ - if (argv[3][0] != '-') { - master = clusterLookupNode(argv[3]); - if (!master) { - master = createClusterNode(argv[3],0); - clusterAddNode(master); - } - n->slaveof = master; - clusterNodeAddSlave(master,n); - } - - /* Set ping sent / pong received timestamps */ - if (atoi(argv[4])) n->ping_sent = time(NULL); - if (atoi(argv[5])) n->pong_received = time(NULL); - - /* Populate hash slots served by this instance. */ - for (j = 7; j < argc; j++) { - int start, stop; - - if (argv[j][0] == '[') { - /* Here we handle migrating / importing slots */ - int slot; - char direction; - clusterNode *cn; - - p = strchr(argv[j],'-'); - redisAssert(p != NULL); - *p = '\0'; - direction = p[1]; /* Either '>' or '<' */ - slot = atoi(argv[j]+1); - p += 3; - cn = clusterLookupNode(p); - if (!cn) { - cn = createClusterNode(p,0); - clusterAddNode(cn); - } - if (direction == '>') { - server.cluster.migrating_slots_to[slot] = cn; - } else { - server.cluster.importing_slots_from[slot] = cn; - } - continue; - } else if ((p = strchr(argv[j],'-')) != NULL) { - *p = '\0'; - start = atoi(argv[j]); - stop = atoi(p+1); - } else { - start = stop = atoi(argv[j]); - } - while(start <= stop) clusterAddSlot(n, start++); - } - - sdssplitargs_free(argv,argc); - } - zfree(line); - fclose(fp); - - /* Config sanity check */ - redisAssert(server.cluster.myself != NULL); - redisLog(REDIS_NOTICE,"Node configuration loaded, I'm %.40s", - server.cluster.myself->name); - clusterUpdateState(); - return REDIS_OK; - -fmterr: - redisLog(REDIS_WARNING,"Unrecovarable error: corrupted cluster config file."); - fclose(fp); - exit(1); -} - -/* Cluster node configuration is exactly the same as CLUSTER NODES output. - * - * This function writes the node config and returns 0, on error -1 - * is returned. */ -int clusterSaveConfig(void) { - sds ci = clusterGenNodesDescription(); - int fd; - - if ((fd = open(server.cluster.configfile,O_WRONLY|O_CREAT|O_TRUNC,0644)) - == -1) goto err; - if (write(fd,ci,sdslen(ci)) != (ssize_t)sdslen(ci)) goto err; - close(fd); - sdsfree(ci); - return 0; - -err: - sdsfree(ci); - return -1; -} - -void clusterSaveConfigOrDie(void) { - if (clusterSaveConfig() == -1) { - redisLog(REDIS_WARNING,"Fatal: can't update cluster config file."); - exit(1); - } -} - -void clusterInit(void) { - int saveconf = 0; - - server.cluster.myself = NULL; - server.cluster.state = REDIS_CLUSTER_FAIL; - server.cluster.nodes = dictCreate(&clusterNodesDictType,NULL); - server.cluster.node_timeout = 15; - memset(server.cluster.migrating_slots_to,0, - sizeof(server.cluster.migrating_slots_to)); - memset(server.cluster.importing_slots_from,0, - sizeof(server.cluster.importing_slots_from)); - memset(server.cluster.slots,0, - sizeof(server.cluster.slots)); - if (clusterLoadConfig(server.cluster.configfile) == REDIS_ERR) { - /* No configuration found. We will just use the random name provided - * by the createClusterNode() function. */ - server.cluster.myself = createClusterNode(NULL,REDIS_NODE_MYSELF); - redisLog(REDIS_NOTICE,"No cluster configuration found, I'm %.40s", - server.cluster.myself->name); - clusterAddNode(server.cluster.myself); - saveconf = 1; - } - if (saveconf) clusterSaveConfigOrDie(); - /* We need a listening TCP port for our cluster messaging needs */ - server.cfd = anetTcpServer(server.neterr, - server.port+REDIS_CLUSTER_PORT_INCR, server.bindaddr); - if (server.cfd == -1) { - redisLog(REDIS_WARNING, "Opening cluster TCP port: %s", server.neterr); - exit(1); - } - if (aeCreateFileEvent(server.el, server.cfd, AE_READABLE, - clusterAcceptHandler, NULL) == AE_ERR) oom("creating file event"); - server.cluster.slots_to_keys = zslCreate(); -} - -/* ----------------------------------------------------------------------------- - * CLUSTER communication link - * -------------------------------------------------------------------------- */ - -clusterLink *createClusterLink(clusterNode *node) { - clusterLink *link = zmalloc(sizeof(*link)); - link->sndbuf = sdsempty(); - link->rcvbuf = sdsempty(); - link->node = node; - link->fd = -1; - return link; -} - -/* Free a cluster link, but does not free the associated node of course. - * Just this function will make sure that the original node associated - * with this link will have the 'link' field set to NULL. */ -void freeClusterLink(clusterLink *link) { - if (link->fd != -1) { - aeDeleteFileEvent(server.el, link->fd, AE_WRITABLE); - aeDeleteFileEvent(server.el, link->fd, AE_READABLE); - } - sdsfree(link->sndbuf); - sdsfree(link->rcvbuf); - if (link->node) - link->node->link = NULL; - close(link->fd); - zfree(link); -} - -void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) { - int cport, cfd; - char cip[128]; - clusterLink *link; - REDIS_NOTUSED(el); - REDIS_NOTUSED(mask); - REDIS_NOTUSED(privdata); - - cfd = anetTcpAccept(server.neterr, fd, cip, &cport); - if (cfd == AE_ERR) { - redisLog(REDIS_VERBOSE,"Accepting cluster node: %s", server.neterr); - return; - } - redisLog(REDIS_VERBOSE,"Accepted cluster node %s:%d", cip, cport); - /* We need to create a temporary node in order to read the incoming - * packet in a valid contest. This node will be released once we - * read the packet and reply. */ - link = createClusterLink(NULL); - link->fd = cfd; - aeCreateFileEvent(server.el,cfd,AE_READABLE,clusterReadHandler,link); -} - -/* ----------------------------------------------------------------------------- - * Key space handling - * -------------------------------------------------------------------------- */ - -/* We have 4096 hash slots. The hash slot of a given key is obtained - * as the least significant 12 bits of the crc16 of the key. */ -unsigned int keyHashSlot(char *key, int keylen) { - return crc16(key,keylen) & 0x0FFF; -} - -/* ----------------------------------------------------------------------------- - * CLUSTER node API - * -------------------------------------------------------------------------- */ - -/* Create a new cluster node, with the specified flags. - * If "nodename" is NULL this is considered a first handshake and a random - * node name is assigned to this node (it will be fixed later when we'll - * receive the first pong). - * - * The node is created and returned to the user, but it is not automatically - * added to the nodes hash table. */ -clusterNode *createClusterNode(char *nodename, int flags) { - clusterNode *node = zmalloc(sizeof(*node)); - - if (nodename) - memcpy(node->name, nodename, REDIS_CLUSTER_NAMELEN); - else - getRandomHexChars(node->name, REDIS_CLUSTER_NAMELEN); - node->flags = flags; - memset(node->slots,0,sizeof(node->slots)); - node->numslaves = 0; - node->slaves = NULL; - node->slaveof = NULL; - node->ping_sent = node->pong_received = 0; - node->configdigest = NULL; - node->configdigest_ts = 0; - node->link = NULL; - return node; -} - -int clusterNodeRemoveSlave(clusterNode *master, clusterNode *slave) { - int j; - - for (j = 0; j < master->numslaves; j++) { - if (master->slaves[j] == slave) { - memmove(master->slaves+j,master->slaves+(j+1), - (master->numslaves-1)-j); - master->numslaves--; - return REDIS_OK; - } - } - return REDIS_ERR; -} - -int clusterNodeAddSlave(clusterNode *master, clusterNode *slave) { - int j; - - /* If it's already a slave, don't add it again. */ - for (j = 0; j < master->numslaves; j++) - if (master->slaves[j] == slave) return REDIS_ERR; - master->slaves = zrealloc(master->slaves, - sizeof(clusterNode*)*(master->numslaves+1)); - master->slaves[master->numslaves] = slave; - master->numslaves++; - return REDIS_OK; -} - -void clusterNodeResetSlaves(clusterNode *n) { - zfree(n->slaves); - n->numslaves = 0; -} - -void freeClusterNode(clusterNode *n) { - sds nodename; - - nodename = sdsnewlen(n->name, REDIS_CLUSTER_NAMELEN); - redisAssert(dictDelete(server.cluster.nodes,nodename) == DICT_OK); - sdsfree(nodename); - if (n->slaveof) clusterNodeRemoveSlave(n->slaveof, n); - if (n->link) freeClusterLink(n->link); - zfree(n); -} - -/* Add a node to the nodes hash table */ -int clusterAddNode(clusterNode *node) { - int retval; - - retval = dictAdd(server.cluster.nodes, - sdsnewlen(node->name,REDIS_CLUSTER_NAMELEN), node); - return (retval == DICT_OK) ? REDIS_OK : REDIS_ERR; -} - -/* Node lookup by name */ -clusterNode *clusterLookupNode(char *name) { - sds s = sdsnewlen(name, REDIS_CLUSTER_NAMELEN); - struct dictEntry *de; - - de = dictFind(server.cluster.nodes,s); - sdsfree(s); - if (de == NULL) return NULL; - return dictGetVal(de); -} - -/* This is only used after the handshake. When we connect a given IP/PORT - * as a result of CLUSTER MEET we don't have the node name yet, so we - * pick a random one, and will fix it when we receive the PONG request using - * this function. */ -void clusterRenameNode(clusterNode *node, char *newname) { - int retval; - sds s = sdsnewlen(node->name, REDIS_CLUSTER_NAMELEN); - - redisLog(REDIS_DEBUG,"Renaming node %.40s into %.40s", - node->name, newname); - retval = dictDelete(server.cluster.nodes, s); - sdsfree(s); - redisAssert(retval == DICT_OK); - memcpy(node->name, newname, REDIS_CLUSTER_NAMELEN); - clusterAddNode(node); -} - -/* ----------------------------------------------------------------------------- - * CLUSTER messages exchange - PING/PONG and gossip - * -------------------------------------------------------------------------- */ - -/* Process the gossip section of PING or PONG packets. - * Note that this function assumes that the packet is already sanity-checked - * by the caller, not in the content of the gossip section, but in the - * length. */ -void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) { - uint16_t count = ntohs(hdr->count); - clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip; - clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender); - - while(count--) { - sds ci = sdsempty(); - uint16_t flags = ntohs(g->flags); - clusterNode *node; - - if (flags == 0) ci = sdscat(ci,"noflags,"); - if (flags & REDIS_NODE_MYSELF) ci = sdscat(ci,"myself,"); - if (flags & REDIS_NODE_MASTER) ci = sdscat(ci,"master,"); - if (flags & REDIS_NODE_SLAVE) ci = sdscat(ci,"slave,"); - if (flags & REDIS_NODE_PFAIL) ci = sdscat(ci,"fail?,"); - if (flags & REDIS_NODE_FAIL) ci = sdscat(ci,"fail,"); - if (flags & REDIS_NODE_HANDSHAKE) ci = sdscat(ci,"handshake,"); - if (flags & REDIS_NODE_NOADDR) ci = sdscat(ci,"noaddr,"); - if (ci[sdslen(ci)-1] == ',') ci[sdslen(ci)-1] = ' '; - - redisLog(REDIS_DEBUG,"GOSSIP %.40s %s:%d %s", - g->nodename, - g->ip, - ntohs(g->port), - ci); - sdsfree(ci); - - /* Update our state accordingly to the gossip sections */ - node = clusterLookupNode(g->nodename); - if (node != NULL) { - /* We already know this node. Let's start updating the last - * time PONG figure if it is newer than our figure. - * Note that it's not a problem if we have a PING already - * in progress against this node. */ - if (node->pong_received < (signed) ntohl(g->pong_received)) { - redisLog(REDIS_DEBUG,"Node pong_received updated by gossip"); - node->pong_received = ntohl(g->pong_received); - } - /* Mark this node as FAILED if we think it is possibly failing - * and another node also thinks it's failing. */ - if (node->flags & REDIS_NODE_PFAIL && - (flags & (REDIS_NODE_FAIL|REDIS_NODE_PFAIL))) - { - redisLog(REDIS_NOTICE,"Received a PFAIL acknowledge from node %.40s, marking node %.40s as FAIL!", hdr->sender, node->name); - node->flags &= ~REDIS_NODE_PFAIL; - node->flags |= REDIS_NODE_FAIL; - /* Broadcast the failing node name to everybody */ - clusterSendFail(node->name); - clusterUpdateState(); - clusterSaveConfigOrDie(); - } - } else { - /* If it's not in NOADDR state and we don't have it, we - * start an handshake process against this IP/PORT pairs. - * - * Note that we require that the sender of this gossip message - * is a well known node in our cluster, otherwise we risk - * joining another cluster. */ - if (sender && !(flags & REDIS_NODE_NOADDR)) { - clusterNode *newnode; - - redisLog(REDIS_DEBUG,"Adding the new node"); - newnode = createClusterNode(NULL,REDIS_NODE_HANDSHAKE); - memcpy(newnode->ip,g->ip,sizeof(g->ip)); - newnode->port = ntohs(g->port); - clusterAddNode(newnode); - } - } - - /* Next node */ - g++; - } -} - -/* IP -> string conversion. 'buf' is supposed to at least be 16 bytes. */ -void nodeIp2String(char *buf, clusterLink *link) { - struct sockaddr_in sa; - socklen_t salen = sizeof(sa); - - if (getpeername(link->fd, (struct sockaddr*) &sa, &salen) == -1) - redisPanic("getpeername() failed."); - strncpy(buf,inet_ntoa(sa.sin_addr),sizeof(link->node->ip)); -} - - -/* Update the node address to the IP address that can be extracted - * from link->fd, and at the specified port. */ -void nodeUpdateAddress(clusterNode *node, clusterLink *link, int port) { - /* TODO */ -} - -/* When this function is called, there is a packet to process starting - * at node->rcvbuf. Releasing the buffer is up to the caller, so this - * function should just handle the higher level stuff of processing the - * packet, modifying the cluster state if needed. - * - * The function returns 1 if the link is still valid after the packet - * was processed, otherwise 0 if the link was freed since the packet - * processing lead to some inconsistency error (for instance a PONG - * received from the wrong sender ID). */ -int clusterProcessPacket(clusterLink *link) { - clusterMsg *hdr = (clusterMsg*) link->rcvbuf; - uint32_t totlen = ntohl(hdr->totlen); - uint16_t type = ntohs(hdr->type); - clusterNode *sender; - - redisLog(REDIS_DEBUG,"--- Processing packet of type %d, %lu bytes", - type, (unsigned long) totlen); - - /* Perform sanity checks */ - if (totlen < 8) return 1; - if (totlen > sdslen(link->rcvbuf)) return 1; - if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG || - type == CLUSTERMSG_TYPE_MEET) - { - uint16_t count = ntohs(hdr->count); - uint32_t explen; /* expected length of this packet */ - - explen = sizeof(clusterMsg)-sizeof(union clusterMsgData); - explen += (sizeof(clusterMsgDataGossip)*count); - if (totlen != explen) return 1; - } - if (type == CLUSTERMSG_TYPE_FAIL) { - uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData); - - explen += sizeof(clusterMsgDataFail); - if (totlen != explen) return 1; - } - if (type == CLUSTERMSG_TYPE_PUBLISH) { - uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData); - - explen += sizeof(clusterMsgDataPublish) + - ntohl(hdr->data.publish.msg.channel_len) + - ntohl(hdr->data.publish.msg.message_len); - if (totlen != explen) return 1; - } - - /* Ready to process the packet. Dispatch by type. */ - sender = clusterLookupNode(hdr->sender); - if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) { - int update_config = 0; - redisLog(REDIS_DEBUG,"Ping packet received: %p", link->node); - - /* Add this node if it is new for us and the msg type is MEET. - * In this stage we don't try to add the node with the right - * flags, slaveof pointer, and so forth, as this details will be - * resolved when we'll receive PONGs from the server. */ - if (!sender && type == CLUSTERMSG_TYPE_MEET) { - clusterNode *node; - - node = createClusterNode(NULL,REDIS_NODE_HANDSHAKE); - nodeIp2String(node->ip,link); - node->port = ntohs(hdr->port); - clusterAddNode(node); - update_config = 1; - } - - /* Get info from the gossip section */ - clusterProcessGossipSection(hdr,link); - - /* Anyway reply with a PONG */ - clusterSendPing(link,CLUSTERMSG_TYPE_PONG); - - /* Update config if needed */ - if (update_config) clusterSaveConfigOrDie(); - } else if (type == CLUSTERMSG_TYPE_PONG) { - int update_state = 0; - int update_config = 0; - - redisLog(REDIS_DEBUG,"Pong packet received: %p", link->node); - if (link->node) { - if (link->node->flags & REDIS_NODE_HANDSHAKE) { - /* If we already have this node, try to change the - * IP/port of the node with the new one. */ - if (sender) { - redisLog(REDIS_WARNING, - "Handshake error: we already know node %.40s, updating the address if needed.", sender->name); - nodeUpdateAddress(sender,link,ntohs(hdr->port)); - freeClusterNode(link->node); /* will free the link too */ - return 0; - } - - /* First thing to do is replacing the random name with the - * right node name if this was an handshake stage. */ - clusterRenameNode(link->node, hdr->sender); - redisLog(REDIS_DEBUG,"Handshake with node %.40s completed.", - link->node->name); - link->node->flags &= ~REDIS_NODE_HANDSHAKE; - update_config = 1; - } else if (memcmp(link->node->name,hdr->sender, - REDIS_CLUSTER_NAMELEN) != 0) - { - /* If the reply has a non matching node ID we - * disconnect this node and set it as not having an associated - * address. */ - redisLog(REDIS_DEBUG,"PONG contains mismatching sender ID"); - link->node->flags |= REDIS_NODE_NOADDR; - freeClusterLink(link); - update_config = 1; - /* FIXME: remove this node if we already have it. - * - * If we already have it but the IP is different, use - * the new one if the old node is in FAIL, PFAIL, or NOADDR - * status... */ - return 0; - } - } - /* Update our info about the node */ - if (link->node) link->node->pong_received = time(NULL); - - /* Update master/slave info */ - if (sender) { - if (!memcmp(hdr->slaveof,REDIS_NODE_NULL_NAME, - sizeof(hdr->slaveof))) - { - sender->flags &= ~REDIS_NODE_SLAVE; - sender->flags |= REDIS_NODE_MASTER; - sender->slaveof = NULL; - } else { - clusterNode *master = clusterLookupNode(hdr->slaveof); - - sender->flags &= ~REDIS_NODE_MASTER; - sender->flags |= REDIS_NODE_SLAVE; - if (sender->numslaves) clusterNodeResetSlaves(sender); - if (master) clusterNodeAddSlave(master,sender); - } - } - - /* Update our info about served slots if this new node is serving - * slots that are not served from our point of view. */ - if (sender && sender->flags & REDIS_NODE_MASTER) { - int newslots, j; - - newslots = - memcmp(sender->slots,hdr->myslots,sizeof(hdr->myslots)) != 0; - memcpy(sender->slots,hdr->myslots,sizeof(hdr->myslots)); - if (newslots) { - for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) { - if (clusterNodeGetSlotBit(sender,j)) { - if (server.cluster.slots[j] == sender) continue; - if (server.cluster.slots[j] == NULL || - server.cluster.slots[j]->flags & REDIS_NODE_FAIL) - { - server.cluster.slots[j] = sender; - update_state = update_config = 1; - } - } - } - } - } - - /* Get info from the gossip section */ - clusterProcessGossipSection(hdr,link); - - /* Update the cluster state if needed */ - if (update_state) clusterUpdateState(); - if (update_config) clusterSaveConfigOrDie(); - } else if (type == CLUSTERMSG_TYPE_FAIL && sender) { - clusterNode *failing; - - failing = clusterLookupNode(hdr->data.fail.about.nodename); - if (failing && !(failing->flags & (REDIS_NODE_FAIL|REDIS_NODE_MYSELF))) - { - redisLog(REDIS_NOTICE, - "FAIL message received from %.40s about %.40s", - hdr->sender, hdr->data.fail.about.nodename); - failing->flags |= REDIS_NODE_FAIL; - failing->flags &= ~REDIS_NODE_PFAIL; - clusterUpdateState(); - clusterSaveConfigOrDie(); - } - } else if (type == CLUSTERMSG_TYPE_PUBLISH) { - robj *channel, *message; - uint32_t channel_len, message_len; - - /* Don't bother creating useless objects if there are no Pub/Sub subscribers. */ - if (dictSize(server.pubsub_channels) || listLength(server.pubsub_patterns)) { - channel_len = ntohl(hdr->data.publish.msg.channel_len); - message_len = ntohl(hdr->data.publish.msg.message_len); - channel = createStringObject( - (char*)hdr->data.publish.msg.bulk_data,channel_len); - message = createStringObject( - (char*)hdr->data.publish.msg.bulk_data+channel_len, message_len); - pubsubPublishMessage(channel,message); - decrRefCount(channel); - decrRefCount(message); - } - } else { - redisLog(REDIS_WARNING,"Received unknown packet type: %d", type); - } - return 1; -} - -/* This function is called when we detect the link with this node is lost. - We set the node as no longer connected. The Cluster Cron will detect - this connection and will try to get it connected again. - - Instead if the node is a temporary node used to accept a query, we - completely free the node on error. */ -void handleLinkIOError(clusterLink *link) { - freeClusterLink(link); -} - -/* Send data. This is handled using a trivial send buffer that gets - * consumed by write(). We don't try to optimize this for speed too much - * as this is a very low traffic channel. */ -void clusterWriteHandler(aeEventLoop *el, int fd, void *privdata, int mask) { - clusterLink *link = (clusterLink*) privdata; - ssize_t nwritten; - REDIS_NOTUSED(el); - REDIS_NOTUSED(mask); - - nwritten = write(fd, link->sndbuf, sdslen(link->sndbuf)); - if (nwritten <= 0) { - redisLog(REDIS_NOTICE,"I/O error writing to node link: %s", - strerror(errno)); - handleLinkIOError(link); - return; - } - link->sndbuf = sdsrange(link->sndbuf,nwritten,-1); - if (sdslen(link->sndbuf) == 0) - aeDeleteFileEvent(server.el, link->fd, AE_WRITABLE); -} - -/* Read data. Try to read the first field of the header first to check the - * full length of the packet. When a whole packet is in memory this function - * will call the function to process the packet. And so forth. */ -void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) { - char buf[1024]; - ssize_t nread; - clusterMsg *hdr; - clusterLink *link = (clusterLink*) privdata; - int readlen; - REDIS_NOTUSED(el); - REDIS_NOTUSED(mask); - -again: - if (sdslen(link->rcvbuf) >= 4) { - hdr = (clusterMsg*) link->rcvbuf; - readlen = ntohl(hdr->totlen) - sdslen(link->rcvbuf); - } else { - readlen = 4 - sdslen(link->rcvbuf); - } - - nread = read(fd,buf,readlen); - if (nread == -1 && errno == EAGAIN) return; /* Just no data */ - - if (nread <= 0) { - /* I/O error... */ - redisLog(REDIS_NOTICE,"I/O error reading from node link: %s", - (nread == 0) ? "connection closed" : strerror(errno)); - handleLinkIOError(link); - return; - } else { - /* Read data and recast the pointer to the new buffer. */ - link->rcvbuf = sdscatlen(link->rcvbuf,buf,nread); - hdr = (clusterMsg*) link->rcvbuf; - } - - /* Total length obtained? read the payload now instead of burning - * cycles waiting for a new event to fire. */ - if (sdslen(link->rcvbuf) == 4) goto again; - - /* Whole packet in memory? We can process it. */ - if (sdslen(link->rcvbuf) == ntohl(hdr->totlen)) { - if (clusterProcessPacket(link)) { - sdsfree(link->rcvbuf); - link->rcvbuf = sdsempty(); - } - } -} - -/* Put stuff into the send buffer. */ -void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) { - if (sdslen(link->sndbuf) == 0 && msglen != 0) - aeCreateFileEvent(server.el,link->fd,AE_WRITABLE, - clusterWriteHandler,link); - - link->sndbuf = sdscatlen(link->sndbuf, msg, msglen); -} - -/* Send a message to all the nodes with a reliable link */ -void clusterBroadcastMessage(void *buf, size_t len) { - dictIterator *di; - dictEntry *de; - - di = dictGetIterator(server.cluster.nodes); - while((de = dictNext(di)) != NULL) { - clusterNode *node = dictGetVal(de); - - if (!node->link) continue; - if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue; - clusterSendMessage(node->link,buf,len); - } - dictReleaseIterator(di); -} - -/* Build the message header */ -void clusterBuildMessageHdr(clusterMsg *hdr, int type) { - int totlen = 0; - - memset(hdr,0,sizeof(*hdr)); - hdr->type = htons(type); - memcpy(hdr->sender,server.cluster.myself->name,REDIS_CLUSTER_NAMELEN); - memcpy(hdr->myslots,server.cluster.myself->slots, - sizeof(hdr->myslots)); - memset(hdr->slaveof,0,REDIS_CLUSTER_NAMELEN); - if (server.cluster.myself->slaveof != NULL) { - memcpy(hdr->slaveof,server.cluster.myself->slaveof->name, - REDIS_CLUSTER_NAMELEN); - } - hdr->port = htons(server.port); - hdr->state = server.cluster.state; - memset(hdr->configdigest,0,32); /* FIXME: set config digest */ - - if (type == CLUSTERMSG_TYPE_FAIL) { - totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); - totlen += sizeof(clusterMsgDataFail); - } - hdr->totlen = htonl(totlen); - /* For PING, PONG, and MEET, fixing the totlen field is up to the caller */ -} - -/* Send a PING or PONG packet to the specified node, making sure to add enough - * gossip informations. */ -void clusterSendPing(clusterLink *link, int type) { - unsigned char buf[1024]; - clusterMsg *hdr = (clusterMsg*) buf; - int gossipcount = 0, totlen; - /* freshnodes is the number of nodes we can still use to populate the - * gossip section of the ping packet. Basically we start with the nodes - * we have in memory minus two (ourself and the node we are sending the - * message to). Every time we add a node we decrement the counter, so when - * it will drop to <= zero we know there is no more gossip info we can - * send. */ - int freshnodes = dictSize(server.cluster.nodes)-2; - - if (link->node && type == CLUSTERMSG_TYPE_PING) - link->node->ping_sent = time(NULL); - clusterBuildMessageHdr(hdr,type); - - /* Populate the gossip fields */ - while(freshnodes > 0 && gossipcount < 3) { - struct dictEntry *de = dictGetRandomKey(server.cluster.nodes); - clusterNode *this = dictGetVal(de); - clusterMsgDataGossip *gossip; - int j; - - /* Not interesting to gossip about ourself. - * Nor to send gossip info about HANDSHAKE state nodes (zero info). */ - if (this == server.cluster.myself || - this->flags & REDIS_NODE_HANDSHAKE) { - freshnodes--; /* otherwise we may loop forever. */ - continue; - } - - /* Check if we already added this node */ - for (j = 0; j < gossipcount; j++) { - if (memcmp(hdr->data.ping.gossip[j].nodename,this->name, - REDIS_CLUSTER_NAMELEN) == 0) break; - } - if (j != gossipcount) continue; - - /* Add it */ - freshnodes--; - gossip = &(hdr->data.ping.gossip[gossipcount]); - memcpy(gossip->nodename,this->name,REDIS_CLUSTER_NAMELEN); - gossip->ping_sent = htonl(this->ping_sent); - gossip->pong_received = htonl(this->pong_received); - memcpy(gossip->ip,this->ip,sizeof(this->ip)); - gossip->port = htons(this->port); - gossip->flags = htons(this->flags); - gossipcount++; - } - totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); - totlen += (sizeof(clusterMsgDataGossip)*gossipcount); - hdr->count = htons(gossipcount); - hdr->totlen = htonl(totlen); - clusterSendMessage(link,buf,totlen); -} - -/* Send a PUBLISH message. - * - * If link is NULL, then the message is broadcasted to the whole cluster. */ -void clusterSendPublish(clusterLink *link, robj *channel, robj *message) { - unsigned char buf[4096], *payload; - clusterMsg *hdr = (clusterMsg*) buf; - uint32_t totlen; - uint32_t channel_len, message_len; - - channel = getDecodedObject(channel); - message = getDecodedObject(message); - channel_len = sdslen(channel->ptr); - message_len = sdslen(message->ptr); - - clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_PUBLISH); - totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); - totlen += sizeof(clusterMsgDataPublish) + channel_len + message_len; - - hdr->data.publish.msg.channel_len = htonl(channel_len); - hdr->data.publish.msg.message_len = htonl(message_len); - hdr->totlen = htonl(totlen); - - /* Try to use the local buffer if possible */ - if (totlen < sizeof(buf)) { - payload = buf; - } else { - payload = zmalloc(totlen); - hdr = (clusterMsg*) payload; - memcpy(payload,hdr,sizeof(hdr)); - } - memcpy(hdr->data.publish.msg.bulk_data,channel->ptr,sdslen(channel->ptr)); - memcpy(hdr->data.publish.msg.bulk_data+sdslen(channel->ptr), - message->ptr,sdslen(message->ptr)); - - if (link) - clusterSendMessage(link,payload,totlen); - else - clusterBroadcastMessage(payload,totlen); - - decrRefCount(channel); - decrRefCount(message); - if (payload != buf) zfree(payload); -} - -/* Send a FAIL message to all the nodes we are able to contact. - * The FAIL message is sent when we detect that a node is failing - * (REDIS_NODE_PFAIL) and we also receive a gossip confirmation of this: - * we switch the node state to REDIS_NODE_FAIL and ask all the other - * nodes to do the same ASAP. */ -void clusterSendFail(char *nodename) { - unsigned char buf[1024]; - clusterMsg *hdr = (clusterMsg*) buf; - - clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAIL); - memcpy(hdr->data.fail.about.nodename,nodename,REDIS_CLUSTER_NAMELEN); - clusterBroadcastMessage(buf,ntohl(hdr->totlen)); -} - -/* ----------------------------------------------------------------------------- - * CLUSTER Pub/Sub support - * - * For now we do very little, just propagating PUBLISH messages across the whole - * cluster. In the future we'll try to get smarter and avoiding propagating those - * messages to hosts without receives for a given channel. - * -------------------------------------------------------------------------- */ -void clusterPropagatePublish(robj *channel, robj *message) { - clusterSendPublish(NULL, channel, message); -} - -/* ----------------------------------------------------------------------------- - * CLUSTER cron job - * -------------------------------------------------------------------------- */ - -/* This is executed 1 time every second */ -void clusterCron(void) { - dictIterator *di; - dictEntry *de; - int j; - time_t min_ping_sent = 0; - clusterNode *min_ping_node = NULL; - - /* Check if we have disconnected nodes and reestablish the connection. */ - di = dictGetIterator(server.cluster.nodes); - while((de = dictNext(di)) != NULL) { - clusterNode *node = dictGetVal(de); - - if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue; - if (node->link == NULL) { - int fd; - clusterLink *link; - - fd = anetTcpNonBlockConnect(server.neterr, node->ip, - node->port+REDIS_CLUSTER_PORT_INCR); - if (fd == -1) continue; - link = createClusterLink(node); - link->fd = fd; - node->link = link; - aeCreateFileEvent(server.el,link->fd,AE_READABLE,clusterReadHandler,link); - /* If the node is flagged as MEET, we send a MEET message instead - * of a PING one, to force the receiver to add us in its node - * table. */ - clusterSendPing(link, node->flags & REDIS_NODE_MEET ? - CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING); - /* We can clear the flag after the first packet is sent. - * If we'll never receive a PONG, we'll never send new packets - * to this node. Instead after the PONG is received and we - * are no longer in meet/handshake status, we want to send - * normal PING packets. */ - node->flags &= ~REDIS_NODE_MEET; - - redisLog(REDIS_NOTICE,"Connecting with Node %.40s at %s:%d", node->name, node->ip, node->port+REDIS_CLUSTER_PORT_INCR); - } - } - dictReleaseIterator(di); - - /* Ping some random node. Check a few random nodes and ping the one with - * the oldest ping_sent time */ - for (j = 0; j < 5; j++) { - de = dictGetRandomKey(server.cluster.nodes); - clusterNode *this = dictGetVal(de); - - if (this->link == NULL) continue; - if (this->flags & (REDIS_NODE_MYSELF|REDIS_NODE_HANDSHAKE)) continue; - if (min_ping_node == NULL || min_ping_sent > this->ping_sent) { - min_ping_node = this; - min_ping_sent = this->ping_sent; - } - } - if (min_ping_node) { - redisLog(REDIS_DEBUG,"Pinging node %40s", min_ping_node->name); - clusterSendPing(min_ping_node->link, CLUSTERMSG_TYPE_PING); - } - - /* Iterate nodes to check if we need to flag something as failing */ - di = dictGetIterator(server.cluster.nodes); - while((de = dictNext(di)) != NULL) { - clusterNode *node = dictGetVal(de); - int delay; - - if (node->flags & - (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR|REDIS_NODE_HANDSHAKE)) - continue; - /* Check only if we already sent a ping and did not received - * a reply yet. */ - if (node->ping_sent == 0 || - node->ping_sent <= node->pong_received) continue; - - delay = time(NULL) - node->pong_received; - if (delay < server.cluster.node_timeout) { - /* The PFAIL condition can be reversed without external - * help if it is not transitive (that is, if it does not - * turn into a FAIL state). - * - * The FAIL condition is also reversible if there are no slaves - * for this host, so no slave election should be in progress. - * - * TODO: consider all the implications of resurrecting a - * FAIL node. */ - if (node->flags & REDIS_NODE_PFAIL) { - node->flags &= ~REDIS_NODE_PFAIL; - } else if (node->flags & REDIS_NODE_FAIL && !node->numslaves) { - node->flags &= ~REDIS_NODE_FAIL; - clusterUpdateState(); - } - } else { - /* Timeout reached. Set the noad se possibly failing if it is - * not already in this state. */ - if (!(node->flags & (REDIS_NODE_PFAIL|REDIS_NODE_FAIL))) { - redisLog(REDIS_DEBUG,"*** NODE %.40s possibly failing", - node->name); - node->flags |= REDIS_NODE_PFAIL; - } - } - } - dictReleaseIterator(di); -} - -/* ----------------------------------------------------------------------------- - * Slots management - * -------------------------------------------------------------------------- */ - -/* Set the slot bit and return the old value. */ -int clusterNodeSetSlotBit(clusterNode *n, int slot) { - off_t byte = slot/8; - int bit = slot&7; - int old = (n->slots[byte] & (1<slots[byte] |= 1<slots[byte] & (1<slots[byte] &= ~(1<slots[byte] & (1<flags & (REDIS_NODE_FAIL)) - { - ok = 0; - break; - } - } - if (ok) { - if (server.cluster.state == REDIS_CLUSTER_NEEDHELP) { - server.cluster.state = REDIS_CLUSTER_NEEDHELP; - } else { - server.cluster.state = REDIS_CLUSTER_OK; - } - } else { - server.cluster.state = REDIS_CLUSTER_FAIL; - } -} - -/* ----------------------------------------------------------------------------- - * CLUSTER command - * -------------------------------------------------------------------------- */ - -sds clusterGenNodesDescription(void) { - sds ci = sdsempty(); - dictIterator *di; - dictEntry *de; - int j, start; - - di = dictGetIterator(server.cluster.nodes); - while((de = dictNext(di)) != NULL) { - clusterNode *node = dictGetVal(de); - - /* Node coordinates */ - ci = sdscatprintf(ci,"%.40s %s:%d ", - node->name, - node->ip, - node->port); - - /* Flags */ - if (node->flags == 0) ci = sdscat(ci,"noflags,"); - if (node->flags & REDIS_NODE_MYSELF) ci = sdscat(ci,"myself,"); - if (node->flags & REDIS_NODE_MASTER) ci = sdscat(ci,"master,"); - if (node->flags & REDIS_NODE_SLAVE) ci = sdscat(ci,"slave,"); - if (node->flags & REDIS_NODE_PFAIL) ci = sdscat(ci,"fail?,"); - if (node->flags & REDIS_NODE_FAIL) ci = sdscat(ci,"fail,"); - if (node->flags & REDIS_NODE_HANDSHAKE) ci =sdscat(ci,"handshake,"); - if (node->flags & REDIS_NODE_NOADDR) ci = sdscat(ci,"noaddr,"); - if (ci[sdslen(ci)-1] == ',') ci[sdslen(ci)-1] = ' '; - - /* Slave of... or just "-" */ - if (node->slaveof) - ci = sdscatprintf(ci,"%.40s ",node->slaveof->name); - else - ci = sdscatprintf(ci,"- "); - - /* Latency from the POV of this node, link status */ - ci = sdscatprintf(ci,"%ld %ld %s", - (long) node->ping_sent, - (long) node->pong_received, - (node->link || node->flags & REDIS_NODE_MYSELF) ? - "connected" : "disconnected"); - - /* Slots served by this instance */ - start = -1; - for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) { - int bit; - - if ((bit = clusterNodeGetSlotBit(node,j)) != 0) { - if (start == -1) start = j; - } - if (start != -1 && (!bit || j == REDIS_CLUSTER_SLOTS-1)) { - if (j == REDIS_CLUSTER_SLOTS-1) j++; - - if (start == j-1) { - ci = sdscatprintf(ci," %d",start); - } else { - ci = sdscatprintf(ci," %d-%d",start,j-1); - } - start = -1; - } - } - - /* Just for MYSELF node we also dump info about slots that - * we are migrating to other instances or importing from other - * instances. */ - if (node->flags & REDIS_NODE_MYSELF) { - for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) { - if (server.cluster.migrating_slots_to[j]) { - ci = sdscatprintf(ci," [%d->-%.40s]",j, - server.cluster.migrating_slots_to[j]->name); - } else if (server.cluster.importing_slots_from[j]) { - ci = sdscatprintf(ci," [%d-<-%.40s]",j, - server.cluster.importing_slots_from[j]->name); - } - } - } - ci = sdscatlen(ci,"\n",1); - } - dictReleaseIterator(di); - return ci; -} - -int getSlotOrReply(redisClient *c, robj *o) { - long long slot; - - if (getLongLongFromObject(o,&slot) != REDIS_OK || - slot < 0 || slot > REDIS_CLUSTER_SLOTS) - { - addReplyError(c,"Invalid or out of range slot"); - return -1; - } - return (int) slot; -} - -void clusterCommand(redisClient *c) { - if (server.cluster_enabled == 0) { - addReplyError(c,"This instance has cluster support disabled"); - return; - } - - if (!strcasecmp(c->argv[1]->ptr,"meet") && c->argc == 4) { - clusterNode *n; - struct sockaddr_in sa; - long port; - - /* Perform sanity checks on IP/port */ - if (inet_aton(c->argv[2]->ptr,&sa.sin_addr) == 0) { - addReplyError(c,"Invalid IP address in MEET"); - return; - } - if (getLongFromObjectOrReply(c, c->argv[3], &port, NULL) != REDIS_OK || - port < 0 || port > (65535-REDIS_CLUSTER_PORT_INCR)) - { - addReplyError(c,"Invalid TCP port specified"); - return; - } - - /* Finally add the node to the cluster with a random name, this - * will get fixed in the first handshake (ping/pong). */ - n = createClusterNode(NULL,REDIS_NODE_HANDSHAKE|REDIS_NODE_MEET); - strncpy(n->ip,inet_ntoa(sa.sin_addr),sizeof(n->ip)); - n->port = port; - clusterAddNode(n); - addReply(c,shared.ok); - } else if (!strcasecmp(c->argv[1]->ptr,"nodes") && c->argc == 2) { - robj *o; - sds ci = clusterGenNodesDescription(); - - o = createObject(REDIS_STRING,ci); - addReplyBulk(c,o); - decrRefCount(o); - } else if ((!strcasecmp(c->argv[1]->ptr,"addslots") || - !strcasecmp(c->argv[1]->ptr,"delslots")) && c->argc >= 3) - { - /* CLUSTER ADDSLOTS [slot] ... */ - /* CLUSTER DELSLOTS [slot] ... */ - int j, slot; - unsigned char *slots = zmalloc(REDIS_CLUSTER_SLOTS); - int del = !strcasecmp(c->argv[1]->ptr,"delslots"); - - memset(slots,0,REDIS_CLUSTER_SLOTS); - /* Check that all the arguments are parsable and that all the - * slots are not already busy. */ - for (j = 2; j < c->argc; j++) { - if ((slot = getSlotOrReply(c,c->argv[j])) == -1) { - zfree(slots); - return; - } - if (del && server.cluster.slots[slot] == NULL) { - addReplyErrorFormat(c,"Slot %d is already unassigned", slot); - zfree(slots); - return; - } else if (!del && server.cluster.slots[slot]) { - addReplyErrorFormat(c,"Slot %d is already busy", slot); - zfree(slots); - return; - } - if (slots[slot]++ == 1) { - addReplyErrorFormat(c,"Slot %d specified multiple times", - (int)slot); - zfree(slots); - return; - } - } - for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) { - if (slots[j]) { - int retval; - - /* If this slot was set as importing we can clear this - * state as now we are the real owner of the slot. */ - if (server.cluster.importing_slots_from[j]) - server.cluster.importing_slots_from[j] = NULL; - - retval = del ? clusterDelSlot(j) : - clusterAddSlot(server.cluster.myself,j); - redisAssertWithInfo(c,NULL,retval == REDIS_OK); - } - } - zfree(slots); - clusterUpdateState(); - clusterSaveConfigOrDie(); - addReply(c,shared.ok); - } else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) { - /* SETSLOT 10 MIGRATING */ - /* SETSLOT 10 IMPORTING */ - /* SETSLOT 10 STABLE */ - /* SETSLOT 10 NODE */ - int slot; - clusterNode *n; - - if ((slot = getSlotOrReply(c,c->argv[2])) == -1) return; - - if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) { - if (server.cluster.slots[slot] != server.cluster.myself) { - addReplyErrorFormat(c,"I'm not the owner of hash slot %u",slot); - return; - } - if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) { - addReplyErrorFormat(c,"I don't know about node %s", - (char*)c->argv[4]->ptr); - return; - } - server.cluster.migrating_slots_to[slot] = n; - } else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) { - if (server.cluster.slots[slot] == server.cluster.myself) { - addReplyErrorFormat(c, - "I'm already the owner of hash slot %u",slot); - return; - } - if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) { - addReplyErrorFormat(c,"I don't know about node %s", - (char*)c->argv[3]->ptr); - return; - } - server.cluster.importing_slots_from[slot] = n; - } else if (!strcasecmp(c->argv[3]->ptr,"stable") && c->argc == 4) { - /* CLUSTER SETSLOT STABLE */ - server.cluster.importing_slots_from[slot] = NULL; - server.cluster.migrating_slots_to[slot] = NULL; - } else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 5) { - /* CLUSTER SETSLOT NODE */ - clusterNode *n = clusterLookupNode(c->argv[4]->ptr); - - if (!n) addReplyErrorFormat(c,"Unknown node %s", - (char*)c->argv[4]->ptr); - /* If this hash slot was served by 'myself' before to switch - * make sure there are no longer local keys for this hash slot. */ - if (server.cluster.slots[slot] == server.cluster.myself && - n != server.cluster.myself) - { - int numkeys; - robj **keys; - - keys = zmalloc(sizeof(robj*)*1); - numkeys = GetKeysInSlot(slot, keys, 1); - zfree(keys); - if (numkeys != 0) { - addReplyErrorFormat(c, "Can't assign hashslot %d to a different node while I still hold keys for this hash slot.", slot); - return; - } - } - /* If this node was the slot owner and the slot was marked as - * migrating, assigning the slot to another node will clear - * the migratig status. */ - if (server.cluster.slots[slot] == server.cluster.myself && - server.cluster.migrating_slots_to[slot]) - server.cluster.migrating_slots_to[slot] = NULL; - - /* If this node was importing this slot, assigning the slot to - * itself also clears the importing status. */ - if (n == server.cluster.myself && server.cluster.importing_slots_from[slot]) - server.cluster.importing_slots_from[slot] = NULL; - - clusterDelSlot(slot); - clusterAddSlot(n,slot); - } else { - addReplyError(c,"Invalid CLUSTER SETSLOT action or number of arguments"); - return; - } - clusterSaveConfigOrDie(); - addReply(c,shared.ok); - } else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) { - char *statestr[] = {"ok","fail","needhelp"}; - int slots_assigned = 0, slots_ok = 0, slots_pfail = 0, slots_fail = 0; - int j; - - for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) { - clusterNode *n = server.cluster.slots[j]; - - if (n == NULL) continue; - slots_assigned++; - if (n->flags & REDIS_NODE_FAIL) { - slots_fail++; - } else if (n->flags & REDIS_NODE_PFAIL) { - slots_pfail++; - } else { - slots_ok++; - } - } - - sds info = sdscatprintf(sdsempty(), - "cluster_state:%s\r\n" - "cluster_slots_assigned:%d\r\n" - "cluster_slots_ok:%d\r\n" - "cluster_slots_pfail:%d\r\n" - "cluster_slots_fail:%d\r\n" - "cluster_known_nodes:%lu\r\n" - , statestr[server.cluster.state], - slots_assigned, - slots_ok, - slots_pfail, - slots_fail, - dictSize(server.cluster.nodes) - ); - addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n", - (unsigned long)sdslen(info))); - addReplySds(c,info); - addReply(c,shared.crlf); - } else if (!strcasecmp(c->argv[1]->ptr,"keyslot") && c->argc == 3) { - sds key = c->argv[2]->ptr; - - addReplyLongLong(c,keyHashSlot(key,sdslen(key))); - } else if (!strcasecmp(c->argv[1]->ptr,"getkeysinslot") && c->argc == 4) { - long long maxkeys, slot; - unsigned int numkeys, j; - robj **keys; - - if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != REDIS_OK) - return; - if (getLongLongFromObjectOrReply(c,c->argv[3],&maxkeys,NULL) != REDIS_OK) - return; - if (slot < 0 || slot >= REDIS_CLUSTER_SLOTS || maxkeys < 0 || - maxkeys > 1024*1024) { - addReplyError(c,"Invalid slot or number of keys"); - return; - } - - keys = zmalloc(sizeof(robj*)*maxkeys); - numkeys = GetKeysInSlot(slot, keys, maxkeys); - addReplyMultiBulkLen(c,numkeys); - for (j = 0; j < numkeys; j++) addReplyBulk(c,keys[j]); - zfree(keys); - } else { - addReplyError(c,"Wrong CLUSTER subcommand or number of arguments"); - } -} - -/* ----------------------------------------------------------------------------- - * RESTORE and MIGRATE commands - * -------------------------------------------------------------------------- */ - -/* RESTORE key ttl serialized-value */ -void restoreCommand(redisClient *c) { - long ttl; - rio payload; - int type; - robj *obj; - - /* Make sure this key does not already exist here... */ - if (lookupKeyWrite(c->db,c->argv[1]) != NULL) { - addReplyError(c,"Target key name is busy."); - return; - } - - /* Check if the TTL value makes sense */ - if (getLongFromObjectOrReply(c,c->argv[2],&ttl,NULL) != REDIS_OK) { - return; - } else if (ttl < 0) { - addReplyError(c,"Invalid TTL value, must be >= 0"); - return; - } - - rioInitWithBuffer(&payload,c->argv[3]->ptr); - if (((type = rdbLoadObjectType(&payload)) == -1) || - ((obj = rdbLoadObject(type,&payload)) == NULL)) - { - addReplyError(c,"Bad data format"); - return; - } - - /* Create the key and set the TTL if any */ - dbAdd(c->db,c->argv[1],obj); - if (ttl) setExpire(c->db,c->argv[1],time(NULL)+ttl); - signalModifiedKey(c->db,c->argv[1]); - addReply(c,shared.ok); - server.dirty++; -} - -/* MIGRATE host port key dbid timeout */ -void migrateCommand(redisClient *c) { - int fd; - long timeout; - long dbid; - time_t ttl; - robj *o; - rio cmd, payload; - - /* Sanity check */ - if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != REDIS_OK) - return; - if (getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != REDIS_OK) - return; - if (timeout <= 0) timeout = 1; - - /* Check if the key is here. If not we reply with success as there is - * nothing to migrate (for instance the key expired in the meantime), but - * we include such information in the reply string. */ - if ((o = lookupKeyRead(c->db,c->argv[3])) == NULL) { - addReplySds(c,sdsnew("+NOKEY\r\n")); - return; - } - - /* Connect */ - fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr, - atoi(c->argv[2]->ptr)); - if (fd == -1) { - addReplyErrorFormat(c,"Can't connect to target node: %s", - server.neterr); - return; - } - if ((aeWait(fd,AE_WRITABLE,timeout*1000) & AE_WRITABLE) == 0) { - addReplyError(c,"Timeout connecting to the client"); - return; - } - - rioInitWithBuffer(&cmd,sdsempty()); - redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2)); - redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6)); - redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid)); - - ttl = getExpire(c->db,c->argv[3]); - redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',4)); - redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7)); - redisAssertWithInfo(c,NULL,c->argv[3]->encoding == REDIS_ENCODING_RAW); - redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,c->argv[3]->ptr,sdslen(c->argv[3]->ptr))); - redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,(ttl == -1) ? 0 : ttl)); - - /* Finally the last argument that is the serailized object payload - * in the form: . */ - rioInitWithBuffer(&payload,sdsempty()); - redisAssertWithInfo(c,NULL,rdbSaveObjectType(&payload,o)); - redisAssertWithInfo(c,NULL,rdbSaveObject(&payload,o) != -1); - redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,payload.io.buffer.ptr,sdslen(payload.io.buffer.ptr))); - sdsfree(payload.io.buffer.ptr); - - /* Tranfer the query to the other node in 64K chunks. */ - { - sds buf = cmd.io.buffer.ptr; - size_t pos = 0, towrite; - int nwritten = 0; - - while ((towrite = sdslen(buf)-pos) > 0) { - towrite = (towrite > (64*1024) ? (64*1024) : towrite); - nwritten = syncWrite(fd,buf+nwritten,towrite,timeout); - if (nwritten != (signed)towrite) goto socket_wr_err; - pos += nwritten; - } - } - - /* Read back the reply. */ - { - char buf1[1024]; - char buf2[1024]; - - /* Read the two replies */ - if (syncReadLine(fd, buf1, sizeof(buf1), timeout) <= 0) - goto socket_rd_err; - if (syncReadLine(fd, buf2, sizeof(buf2), timeout) <= 0) - goto socket_rd_err; - if (buf1[0] == '-' || buf2[0] == '-') { - addReplyErrorFormat(c,"Target instance replied with error: %s", - (buf1[0] == '-') ? buf1+1 : buf2+1); - } else { - robj *aux; - - dbDelete(c->db,c->argv[3]); - signalModifiedKey(c->db,c->argv[3]); - addReply(c,shared.ok); - server.dirty++; - - /* Translate MIGRATE as DEL for replication/AOF. */ - aux = createStringObject("DEL",3); - rewriteClientCommandVector(c,2,aux,c->argv[3]); - decrRefCount(aux); - } - } - - sdsfree(cmd.io.buffer.ptr); - close(fd); - return; - -socket_wr_err: - redisLog(REDIS_NOTICE,"Can't write to target node for MIGRATE: %s", - strerror(errno)); - addReplyErrorFormat(c,"MIGRATE failed, writing to target node: %s.", - strerror(errno)); - sdsfree(cmd.io.buffer.ptr); - close(fd); - return; - -socket_rd_err: - redisLog(REDIS_NOTICE,"Can't read from target node for MIGRATE: %s", - strerror(errno)); - addReplyErrorFormat(c,"MIGRATE failed, reading from target node: %s.", - strerror(errno)); - sdsfree(cmd.io.buffer.ptr); - close(fd); - return; -} - -/* DUMP keyname - * DUMP is actually not used by Redis Cluster but it is the obvious - * complement of RESTORE and can be useful for different applications. */ -void dumpCommand(redisClient *c) { - robj *o, *dumpobj; - rio payload; - - /* Check if the key is here. */ - if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) { - addReply(c,shared.nullbulk); - return; - } - - /* Serialize the object in a RDB-like format. It consist of an object type - * byte followed by the serialized object. This is understood by RESTORE. */ - rioInitWithBuffer(&payload,sdsempty()); - redisAssertWithInfo(c,NULL,rdbSaveObjectType(&payload,o)); - redisAssertWithInfo(c,NULL,rdbSaveObject(&payload,o)); - - /* Transfer to the client */ - dumpobj = createObject(REDIS_STRING,payload.io.buffer.ptr); - addReplyBulk(c,dumpobj); - decrRefCount(dumpobj); - return; -} - -/* The ASKING command is required after a -ASK redirection. - * The client should issue ASKING before to actualy send the command to - * the target instance. See the Redis Cluster specification for more - * information. */ -void askingCommand(redisClient *c) { - if (server.cluster_enabled == 0) { - addReplyError(c,"This instance has cluster support disabled"); - return; - } - c->flags |= REDIS_ASKING; - addReply(c,shared.ok); -} - -/* ----------------------------------------------------------------------------- - * Cluster functions related to serving / redirecting clients - * -------------------------------------------------------------------------- */ - -/* Return the pointer to the cluster node that is able to serve the query - * as all the keys belong to hash slots for which the node is in charge. - * - * If the returned node should be used only for this request, the *ask - * integer is set to '1', otherwise to '0'. This is used in order to - * let the caller know if we should reply with -MOVED or with -ASK. - * - * If the request contains more than a single key NULL is returned, - * however a request with more then a key argument where the key is always - * the same is valid, like in: RPOPLPUSH mylist mylist.*/ -clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *ask) { - clusterNode *n = NULL; - robj *firstkey = NULL; - multiState *ms, _ms; - multiCmd mc; - int i, slot = 0; - - /* We handle all the cases as if they were EXEC commands, so we have - * a common code path for everything */ - if (cmd->proc == execCommand) { - /* If REDIS_MULTI flag is not set EXEC is just going to return an - * error. */ - if (!(c->flags & REDIS_MULTI)) return server.cluster.myself; - ms = &c->mstate; - } else { - /* In order to have a single codepath create a fake Multi State - * structure if the client is not in MULTI/EXEC state, this way - * we have a single codepath below. */ - ms = &_ms; - _ms.commands = &mc; - _ms.count = 1; - mc.argv = argv; - mc.argc = argc; - mc.cmd = cmd; - } - - /* Check that all the keys are the same key, and get the slot and - * node for this key. */ - for (i = 0; i < ms->count; i++) { - struct redisCommand *mcmd; - robj **margv; - int margc, *keyindex, numkeys, j; - - mcmd = ms->commands[i].cmd; - margc = ms->commands[i].argc; - margv = ms->commands[i].argv; - - keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys, - REDIS_GETKEYS_ALL); - for (j = 0; j < numkeys; j++) { - if (firstkey == NULL) { - /* This is the first key we see. Check what is the slot - * and node. */ - firstkey = margv[keyindex[j]]; - - slot = keyHashSlot((char*)firstkey->ptr, sdslen(firstkey->ptr)); - n = server.cluster.slots[slot]; - redisAssertWithInfo(c,firstkey,n != NULL); - } else { - /* If it is not the first key, make sure it is exactly - * the same key as the first we saw. */ - if (!equalStringObjects(firstkey,margv[keyindex[j]])) { - decrRefCount(firstkey); - getKeysFreeResult(keyindex); - return NULL; - } - } - } - getKeysFreeResult(keyindex); - } - if (ask) *ask = 0; /* This is the default. Set to 1 if needed later. */ - /* No key at all in command? then we can serve the request - * without redirections. */ - if (n == NULL) return server.cluster.myself; - if (hashslot) *hashslot = slot; - /* This request is about a slot we are migrating into another instance? - * Then we need to check if we have the key. If we have it we can reply. - * If instead is a new key, we pass the request to the node that is - * receiving the slot. */ - if (n == server.cluster.myself && - server.cluster.migrating_slots_to[slot] != NULL) - { - if (lookupKeyRead(&server.db[0],firstkey) == NULL) { - if (ask) *ask = 1; - return server.cluster.migrating_slots_to[slot]; - } - } - /* Handle the case in which we are receiving this hash slot from - * another instance, so we'll accept the query even if in the table - * it is assigned to a different node, but only if the client - * issued an ASKING command before. */ - if (server.cluster.importing_slots_from[slot] != NULL && - c->flags & REDIS_ASKING) { - return server.cluster.myself; - } - /* It's not a -ASK case. Base case: just return the right node. */ - return n; -} diff --git a/src/config.c b/src/config.c index 533a2a57..71a800a1 100644 --- a/src/config.c +++ b/src/config.c @@ -298,13 +298,6 @@ void loadServerConfigFromString(char *config) { err = "Target command name already exists"; goto loaderr; } } - } else if (!strcasecmp(argv[0],"cluster-enabled") && argc == 2) { - if ((server.cluster_enabled = yesnotoi(argv[1])) == -1) { - err = "argument must be 'yes' or 'no'"; goto loaderr; - } - } else if (!strcasecmp(argv[0],"cluster-config-file") && argc == 2) { - zfree(server.cluster.configfile); - server.cluster.configfile = zstrdup(argv[1]); } else if (!strcasecmp(argv[0],"lua-time-limit") && argc == 2) { server.lua_time_limit = strtoll(argv[1],NULL,10); } else if (!strcasecmp(argv[0],"slowlog-log-slower-than") && diff --git a/src/db.c b/src/db.c index a0775af9..7ca2a5f6 100644 --- a/src/db.c +++ b/src/db.c @@ -86,7 +86,6 @@ void dbAdd(redisDb *db, robj *key, robj *val) { int retval = dictAdd(db->dict, copy, val); redisAssertWithInfo(NULL,key,retval == REDIS_OK); - if (server.cluster_enabled) SlotToKeyAdd(key); } /* Overwrite an existing key with a new value. Incrementing the reference @@ -154,7 +153,6 @@ int dbDelete(redisDb *db, robj *key) { * the key, because it is shared with the main dictionary. */ if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr); if (dictDelete(db->dict,key->ptr) == DICT_OK) { - if (server.cluster_enabled) SlotToKeyDel(key); return 1; } else { return 0; @@ -254,10 +252,6 @@ void existsCommand(redisClient *c) { void selectCommand(redisClient *c) { int id = atoi(c->argv[1]->ptr); - if (server.cluster_enabled && id != 0) { - addReplyError(c,"SELECT is not allowed in cluster mode"); - return; - } if (selectDb(c,id) == REDIS_ERR) { addReplyError(c,"invalid DB index"); } else { @@ -398,11 +392,6 @@ void moveCommand(redisClient *c) { redisDb *src, *dst; int srcid; - if (server.cluster_enabled) { - addReplyError(c,"MOVE is not allowed in cluster mode"); - return; - } - /* Obtain source and target DB pointers */ src = c->db; srcid = c->db->id; @@ -714,35 +703,3 @@ int *zunionInterGetKeys(struct redisCommand *cmd,robj **argv, int argc, int *num *numkeys = num; return keys; } - -/* Slot to Key API. This is used by Redis Cluster in order to obtain in - * a fast way a key that belongs to a specified hash slot. This is useful - * while rehashing the cluster. */ -void SlotToKeyAdd(robj *key) { - unsigned int hashslot = keyHashSlot(key->ptr,sdslen(key->ptr)); - - zslInsert(server.cluster.slots_to_keys,hashslot,key); - incrRefCount(key); -} - -void SlotToKeyDel(robj *key) { - unsigned int hashslot = keyHashSlot(key->ptr,sdslen(key->ptr)); - - zslDelete(server.cluster.slots_to_keys,hashslot,key); -} - -unsigned int GetKeysInSlot(unsigned int hashslot, robj **keys, unsigned int count) { - zskiplistNode *n; - zrangespec range; - int j = 0; - - range.min = range.max = hashslot; - range.minex = range.maxex = 0; - - n = zslFirstInRange(server.cluster.slots_to_keys, range); - while(n && n->score == hashslot && count--) { - keys[j++] = n->obj; - n = n->level[0].forward; - } - return j; -} diff --git a/src/migrate.c b/src/migrate.c new file mode 100644 index 00000000..f7a5b730 --- /dev/null +++ b/src/migrate.c @@ -0,0 +1,190 @@ +#include "redis.h" + +/* ----------------------------------------------------------------------------- + * RESTORE and MIGRATE commands + * -------------------------------------------------------------------------- */ + +/* RESTORE key ttl serialized-value */ +void restoreCommand(redisClient *c) { + long ttl; + rio payload; + int type; + robj *obj; + + /* Make sure this key does not already exist here... */ + if (lookupKeyWrite(c->db,c->argv[1]) != NULL) { + addReplyError(c,"Target key name is busy."); + return; + } + + /* Check if the TTL value makes sense */ + if (getLongFromObjectOrReply(c,c->argv[2],&ttl,NULL) != REDIS_OK) { + return; + } else if (ttl < 0) { + addReplyError(c,"Invalid TTL value, must be >= 0"); + return; + } + + rioInitWithBuffer(&payload,c->argv[3]->ptr); + if (((type = rdbLoadObjectType(&payload)) == -1) || + ((obj = rdbLoadObject(type,&payload)) == NULL)) + { + addReplyError(c,"Bad data format"); + return; + } + + /* Create the key and set the TTL if any */ + dbAdd(c->db,c->argv[1],obj); + if (ttl) setExpire(c->db,c->argv[1],time(NULL)+ttl); + signalModifiedKey(c->db,c->argv[1]); + addReply(c,shared.ok); + server.dirty++; +} + +/* MIGRATE host port key dbid timeout */ +void migrateCommand(redisClient *c) { + int fd; + long timeout; + long dbid; + time_t ttl; + robj *o; + rio cmd, payload; + + /* Sanity check */ + if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != REDIS_OK) + return; + if (getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != REDIS_OK) + return; + if (timeout <= 0) timeout = 1; + + /* Check if the key is here. If not we reply with success as there is + * nothing to migrate (for instance the key expired in the meantime), but + * we include such information in the reply string. */ + if ((o = lookupKeyRead(c->db,c->argv[3])) == NULL) { + addReplySds(c,sdsnew("+NOKEY\r\n")); + return; + } + + /* Connect */ + fd = anetTcpNonBlockConnect(server.neterr,c->argv[1]->ptr, + atoi(c->argv[2]->ptr)); + if (fd == -1) { + addReplyErrorFormat(c,"Can't connect to target node: %s", + server.neterr); + return; + } + if ((aeWait(fd,AE_WRITABLE,timeout*1000) & AE_WRITABLE) == 0) { + addReplyError(c,"Timeout connecting to the client"); + return; + } + + rioInitWithBuffer(&cmd,sdsempty()); + redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2)); + redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6)); + redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid)); + + ttl = getExpire(c->db,c->argv[3]); + redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',4)); + redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7)); + redisAssertWithInfo(c,NULL,c->argv[3]->encoding == REDIS_ENCODING_RAW); + redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,c->argv[3]->ptr,sdslen(c->argv[3]->ptr))); + redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,(ttl == -1) ? 0 : ttl)); + + /* Finally the last argument that is the serailized object payload + * in the form: . */ + rioInitWithBuffer(&payload,sdsempty()); + redisAssertWithInfo(c,NULL,rdbSaveObjectType(&payload,o)); + redisAssertWithInfo(c,NULL,rdbSaveObject(&payload,o) != -1); + redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,payload.io.buffer.ptr,sdslen(payload.io.buffer.ptr))); + sdsfree(payload.io.buffer.ptr); + + /* Tranfer the query to the other node in 64K chunks. */ + { + sds buf = cmd.io.buffer.ptr; + size_t pos = 0, towrite; + int nwritten = 0; + + while ((towrite = sdslen(buf)-pos) > 0) { + towrite = (towrite > (64*1024) ? (64*1024) : towrite); + nwritten = syncWrite(fd,buf+nwritten,towrite,timeout); + if (nwritten != (signed)towrite) goto socket_wr_err; + pos += nwritten; + } + } + + /* Read back the reply. */ + { + char buf1[1024]; + char buf2[1024]; + + /* Read the two replies */ + if (syncReadLine(fd, buf1, sizeof(buf1), timeout) <= 0) + goto socket_rd_err; + if (syncReadLine(fd, buf2, sizeof(buf2), timeout) <= 0) + goto socket_rd_err; + if (buf1[0] == '-' || buf2[0] == '-') { + addReplyErrorFormat(c,"Target instance replied with error: %s", + (buf1[0] == '-') ? buf1+1 : buf2+1); + } else { + robj *aux; + + dbDelete(c->db,c->argv[3]); + signalModifiedKey(c->db,c->argv[3]); + addReply(c,shared.ok); + server.dirty++; + + /* Translate MIGRATE as DEL for replication/AOF. */ + aux = createStringObject("DEL",3); + rewriteClientCommandVector(c,2,aux,c->argv[3]); + decrRefCount(aux); + } + } + + sdsfree(cmd.io.buffer.ptr); + close(fd); + return; + +socket_wr_err: + redisLog(REDIS_NOTICE,"Can't write to target node for MIGRATE: %s", + strerror(errno)); + addReplyErrorFormat(c,"MIGRATE failed, writing to target node: %s.", + strerror(errno)); + sdsfree(cmd.io.buffer.ptr); + close(fd); + return; + +socket_rd_err: + redisLog(REDIS_NOTICE,"Can't read from target node for MIGRATE: %s", + strerror(errno)); + addReplyErrorFormat(c,"MIGRATE failed, reading from target node: %s.", + strerror(errno)); + sdsfree(cmd.io.buffer.ptr); + close(fd); + return; +} + +/* DUMP keyname + * DUMP is actually not used by Redis Cluster but it is the obvious + * complement of RESTORE and can be useful for different applications. */ +void dumpCommand(redisClient *c) { + robj *o, *dumpobj; + rio payload; + + /* Check if the key is here. */ + if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) { + addReply(c,shared.nullbulk); + return; + } + + /* Serialize the object in a RDB-like format. It consist of an object type + * byte followed by the serialized object. This is understood by RESTORE. */ + rioInitWithBuffer(&payload,sdsempty()); + redisAssertWithInfo(c,NULL,rdbSaveObjectType(&payload,o)); + redisAssertWithInfo(c,NULL,rdbSaveObject(&payload,o)); + + /* Transfer to the client */ + dumpobj = createObject(REDIS_STRING,payload.io.buffer.ptr); + addReplyBulk(c,dumpobj); + decrRefCount(dumpobj); + return; +} diff --git a/src/pubsub.c b/src/pubsub.c index 984013bb..4e07dbba 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -263,6 +263,5 @@ void punsubscribeCommand(redisClient *c) { void publishCommand(redisClient *c) { int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]); - if (server.cluster_enabled) clusterPropagatePublish(c->argv[1],c->argv[2]); addReplyLongLong(c,receivers); } diff --git a/src/redis.c b/src/redis.c index 3294eea4..70c7cf0c 100644 --- a/src/redis.c +++ b/src/redis.c @@ -232,10 +232,8 @@ struct redisCommand redisCommandTable[] = { {"publish",publishCommand,3,"rpf",0,NULL,0,0,0,0,0}, {"watch",watchCommand,-2,"rs",0,noPreloadGetKeys,1,-1,1,0,0}, {"unwatch",unwatchCommand,1,"rs",0,NULL,0,0,0,0,0}, - {"cluster",clusterCommand,-2,"ar",0,NULL,0,0,0,0,0}, {"restore",restoreCommand,4,"awm",0,NULL,1,1,1,0,0}, {"migrate",migrateCommand,6,"aw",0,NULL,0,0,0,0,0}, - {"asking",askingCommand,1,"r",0,NULL,0,0,0,0,0}, {"dump",dumpCommand,2,"ar",0,NULL,1,1,1,0,0}, {"object",objectCommand,-2,"r",0,NULL,2,2,2,0,0}, {"client",clientCommand,-2,"ar",0,NULL,0,0,0,0,0}, @@ -507,17 +505,6 @@ dictType keylistDictType = { dictListDestructor /* val destructor */ }; -/* Cluster nodes hash table, mapping nodes addresses 1.2.3.4:6379 to - * clusterNode structures. */ -dictType clusterNodesDictType = { - dictSdsHash, /* hash function */ - NULL, /* key dup */ - NULL, /* val dup */ - dictSdsKeyCompare, /* key compare */ - dictSdsDestructor, /* key destructor */ - NULL /* val destructor */ -}; - int htNeedsResize(dict *dict) { long long size, used; @@ -792,9 +779,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { * to detect transfer failures. */ if (!(loops % 10)) replicationCron(); - /* Run other sub-systems specific cron jobs */ - if (server.cluster_enabled && !(loops % 10)) clusterCron(); - server.cronloops++; return 100; } @@ -949,8 +933,6 @@ void initServerConfig() { server.shutdown_asap = 0; server.repl_ping_slave_period = REDIS_REPL_PING_SLAVE_PERIOD; server.repl_timeout = REDIS_REPL_TIMEOUT; - server.cluster_enabled = 0; - server.cluster.configfile = zstrdup("nodes.conf"); server.lua_caller = NULL; server.lua_time_limit = REDIS_LUA_TIME_LIMIT; server.lua_client = NULL; @@ -1151,7 +1133,6 @@ void initServer() { server.maxmemory_policy = REDIS_MAXMEMORY_NO_EVICTION; } - if (server.cluster_enabled) clusterInit(); scriptingInit(); slowlogInit(); bioInit(); @@ -1374,29 +1355,6 @@ int processCommand(redisClient *c) { return REDIS_OK; } - /* If cluster is enabled, redirect here */ - if (server.cluster_enabled && - !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0)) { - int hashslot; - - if (server.cluster.state != REDIS_CLUSTER_OK) { - addReplyError(c,"The cluster is down. Check with CLUSTER INFO for more information"); - return REDIS_OK; - } else { - int ask; - clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&ask); - if (n == NULL) { - addReplyError(c,"Multi keys request invalid in cluster"); - return REDIS_OK; - } else if (n != server.cluster.myself) { - addReplySds(c,sdscatprintf(sdsempty(), - "-%s %d %s:%d\r\n", ask ? "ASK" : "MOVED", - hashslot,n->ip,n->port)); - return REDIS_OK; - } - } - } - /* Handle the maxmemory directive. * * First we try to free some memory if possible (if there are volatile @@ -1884,15 +1842,6 @@ sds genRedisInfoString(char *section) { } } - /* Clusetr */ - if (allsections || defsections || !strcasecmp(section,"cluster")) { - if (sections++) info = sdscat(info,"\r\n"); - info = sdscatprintf(info, - "# Cluster\r\n" - "cluster_enabled:%d\r\n", - server.cluster_enabled); - } - /* Key space */ if (allsections || defsections || !strcasecmp(section,"keyspace")) { if (sections++) info = sdscat(info,"\r\n"); @@ -2172,7 +2121,7 @@ void redisAsciiArt(void) { redisGitSHA1(), strtol(redisGitDirty(),NULL,10) > 0, (sizeof(long) == 8) ? "64" : "32", - server.cluster_enabled ? "cluster" : "stand alone", + "stand alone", server.port, (long) getpid() ); diff --git a/src/redis.h b/src/redis.h index 6ead029d..e47a41a4 100644 --- a/src/redis.h +++ b/src/redis.h @@ -425,134 +425,6 @@ typedef struct redisOpArray { int numops; } redisOpArray; -/*----------------------------------------------------------------------------- - * Redis cluster data structures - *----------------------------------------------------------------------------*/ - -#define REDIS_CLUSTER_SLOTS 4096 -#define REDIS_CLUSTER_OK 0 /* Everything looks ok */ -#define REDIS_CLUSTER_FAIL 1 /* The cluster can't work */ -#define REDIS_CLUSTER_NEEDHELP 2 /* The cluster works, but needs some help */ -#define REDIS_CLUSTER_NAMELEN 40 /* sha1 hex length */ -#define REDIS_CLUSTER_PORT_INCR 10000 /* Cluster port = baseport + PORT_INCR */ - -struct clusterNode; - -/* clusterLink encapsulates everything needed to talk with a remote node. */ -typedef struct clusterLink { - int fd; /* TCP socket file descriptor */ - sds sndbuf; /* Packet send buffer */ - sds rcvbuf; /* Packet reception buffer */ - struct clusterNode *node; /* Node related to this link if any, or NULL */ -} clusterLink; - -/* Node flags */ -#define REDIS_NODE_MASTER 1 /* The node is a master */ -#define REDIS_NODE_SLAVE 2 /* The node is a slave */ -#define REDIS_NODE_PFAIL 4 /* Failure? Need acknowledge */ -#define REDIS_NODE_FAIL 8 /* The node is believed to be malfunctioning */ -#define REDIS_NODE_MYSELF 16 /* This node is myself */ -#define REDIS_NODE_HANDSHAKE 32 /* We have still to exchange the first ping */ -#define REDIS_NODE_NOADDR 64 /* We don't know the address of this node */ -#define REDIS_NODE_MEET 128 /* Send a MEET message to this node */ -#define REDIS_NODE_NULL_NAME "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000" - -struct clusterNode { - char name[REDIS_CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */ - int flags; /* REDIS_NODE_... */ - unsigned char slots[REDIS_CLUSTER_SLOTS/8]; /* slots handled by this node */ - int numslaves; /* Number of slave nodes, if this is a master */ - struct clusterNode **slaves; /* pointers to slave nodes */ - struct clusterNode *slaveof; /* pointer to the master node */ - time_t ping_sent; /* Unix time we sent latest ping */ - time_t pong_received; /* Unix time we received the pong */ - char *configdigest; /* Configuration digest of this node */ - time_t configdigest_ts; /* Configuration digest timestamp */ - char ip[16]; /* Latest known IP address of this node */ - int port; /* Latest known port of this node */ - clusterLink *link; /* TCP/IP link with this node */ -}; -typedef struct clusterNode clusterNode; - -typedef struct { - char *configfile; - clusterNode *myself; /* This node */ - int state; /* REDIS_CLUSTER_OK, REDIS_CLUSTER_FAIL, ... */ - int node_timeout; - dict *nodes; /* Hash table of name -> clusterNode structures */ - clusterNode *migrating_slots_to[REDIS_CLUSTER_SLOTS]; - clusterNode *importing_slots_from[REDIS_CLUSTER_SLOTS]; - clusterNode *slots[REDIS_CLUSTER_SLOTS]; - zskiplist *slots_to_keys; -} clusterState; - -/* Redis cluster messages header */ - -/* Note that the PING, PONG and MEET messages are actually the same exact - * kind of packet. PONG is the reply to ping, in the extact format as a PING, - * while MEET is a special PING that forces the receiver to add the sender - * as a node (if it is not already in the list). */ -#define CLUSTERMSG_TYPE_PING 0 /* Ping */ -#define CLUSTERMSG_TYPE_PONG 1 /* Pong (reply to Ping) */ -#define CLUSTERMSG_TYPE_MEET 2 /* Meet "let's join" message */ -#define CLUSTERMSG_TYPE_FAIL 3 /* Mark node xxx as failing */ -#define CLUSTERMSG_TYPE_PUBLISH 4 /* Pub/Sub Publish propatagion */ - -/* Initially we don't know our "name", but we'll find it once we connect - * to the first node, using the getsockname() function. Then we'll use this - * address for all the next messages. */ -typedef struct { - char nodename[REDIS_CLUSTER_NAMELEN]; - uint32_t ping_sent; - uint32_t pong_received; - char ip[16]; /* IP address last time it was seen */ - uint16_t port; /* port last time it was seen */ - uint16_t flags; - uint32_t notused; /* for 64 bit alignment */ -} clusterMsgDataGossip; - -typedef struct { - char nodename[REDIS_CLUSTER_NAMELEN]; -} clusterMsgDataFail; - -typedef struct { - uint32_t channel_len; - uint32_t message_len; - unsigned char bulk_data[8]; /* defined as 8 just for alignment concerns. */ -} clusterMsgDataPublish; - -union clusterMsgData { - /* PING, MEET and PONG */ - struct { - /* Array of N clusterMsgDataGossip structures */ - clusterMsgDataGossip gossip[1]; - } ping; - - /* FAIL */ - struct { - clusterMsgDataFail about; - } fail; - - /* PUBLISH */ - struct { - clusterMsgDataPublish msg; - } publish; -}; - -typedef struct { - uint32_t totlen; /* Total length of this message */ - uint16_t type; /* Message type */ - uint16_t count; /* Only used for some kind of messages. */ - char sender[REDIS_CLUSTER_NAMELEN]; /* Name of the sender node */ - unsigned char myslots[REDIS_CLUSTER_SLOTS/8]; - char slaveof[REDIS_CLUSTER_NAMELEN]; - char configdigest[32]; - uint16_t port; /* Sender TCP base port */ - unsigned char state; /* Cluster state from the POV of the sender */ - unsigned char notused[5]; /* Reserved for future use. For alignment. */ - union clusterMsgData data; -} clusterMsg; - /*----------------------------------------------------------------------------- * Global server state *----------------------------------------------------------------------------*/ @@ -578,7 +450,6 @@ struct redisServer { mode_t unixsocketperm; /* UNIX socket permission */ int ipfd; /* TCP socket file descriptor */ int sofd; /* Unix socket file descriptor */ - int cfd; /* Cluster bus lisetning socket */ list *clients; /* List of active clients */ list *clients_to_close; /* Clients to close asynchronously */ list *slaves, *monitors; /* List of slaves and MONITORs */ @@ -696,9 +567,6 @@ struct redisServer { /* Pubsub */ dict *pubsub_channels; /* Map channels to list of subscribed clients */ list *pubsub_patterns; /* A list of pubsub_patterns */ - /* Cluster */ - int cluster_enabled; /* Is cluster enabled? */ - clusterState cluster; /* State of the cluster */ /* Scripting */ lua_State *lua; /* The Lua interpreter. We use just one for all clients */ redisClient *lua_client; /* The "fake client" to query Redis from Lua */ @@ -733,8 +601,7 @@ struct redisCommand { int arity; char *sflags; /* Flags as string represenation, one char per flag. */ int flags; /* The actual flags, obtained from the 'sflags' field. */ - /* Use a function to determine keys arguments in a command line. - * Used for Redis Cluster redirect. */ + /* Use a function to determine keys arguments in a command line. */ redisGetKeysProc *getkeys_proc; /* What keys should be loaded in background when calling this command? */ int firstkey; /* The first argument that's a key (0 = no keys) */ @@ -810,7 +677,6 @@ extern struct redisServer server; extern struct sharedObjectsStruct shared; extern dictType setDictType; extern dictType zsetDictType; -extern dictType clusterNodesDictType; extern dictType dbDictType; extern double R_Zero, R_PosInf, R_NegInf, R_Nan; dictType hashDictType; @@ -1082,16 +948,6 @@ int *noPreloadGetKeys(struct redisCommand *cmd,robj **argv, int argc, int *numke int *renameGetKeys(struct redisCommand *cmd,robj **argv, int argc, int *numkeys, int flags); int *zunionInterGetKeys(struct redisCommand *cmd,robj **argv, int argc, int *numkeys, int flags); -/* Cluster */ -void clusterInit(void); -unsigned short crc16(const char *buf, int len); -unsigned int keyHashSlot(char *key, int keylen); -clusterNode *createClusterNode(char *nodename, int flags); -int clusterAddNode(clusterNode *node); -void clusterCron(void); -clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *ask); -void clusterPropagatePublish(robj *channel, robj *message); - /* Scripting */ void scriptingInit(void); @@ -1223,10 +1079,8 @@ void punsubscribeCommand(redisClient *c); void publishCommand(redisClient *c); void watchCommand(redisClient *c); void unwatchCommand(redisClient *c); -void clusterCommand(redisClient *c); void restoreCommand(redisClient *c); void migrateCommand(redisClient *c); -void askingCommand(redisClient *c); void dumpCommand(redisClient *c); void objectCommand(redisClient *c); void clientCommand(redisClient *c); -- 2.45.2