X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/92690d29fee4822a0e6280c0fe8afb8a6ff04671..744f34d834df8424fd201305fb469e540bb29020:/src/cluster.c diff --git a/src/cluster.c b/src/cluster.c index ef61ff4c..4e3cf746 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -13,6 +13,7 @@ int clusterNodeGetSlotBit(clusterNode *n, int slot); sds clusterGenNodesDescription(void); clusterNode *clusterLookupNode(char *name); int clusterNodeAddSlave(clusterNode *master, clusterNode *slave); +int clusterAddSlot(clusterNode *n, int slot); /* ----------------------------------------------------------------------------- * Initialization @@ -86,6 +87,8 @@ int clusterLoadConfig(char *filename) { n->flags |= REDIS_NODE_HANDSHAKE; } else if (!strcasecmp(s,"noaddr")) { n->flags |= REDIS_NODE_NOADDR; + } else if (!strcasecmp(s,"noflags")) { + /* nothing to do */ } else { redisPanic("Unknown flag in redis cluster config file"); } @@ -104,6 +107,10 @@ int clusterLoadConfig(char *filename) { clusterNodeAddSlave(master,n); } + /* Set ping sent / pong received timestamps */ + if (atoi(argv[4])) n->ping_sent = time(NULL); + if (atoi(argv[5])) n->pong_received = time(NULL); + /* Populate hash slots served by this instance. */ for (j = 7; j < argc; j++) { int start, stop; @@ -127,6 +134,7 @@ int clusterLoadConfig(char *filename) { redisAssert(server.cluster.myself != NULL); redisLog(REDIS_NOTICE,"Node configuration loaded, I'm %.40s", server.cluster.myself->name); + clusterUpdateState(); return REDIS_OK; fmterr: @@ -503,6 +511,7 @@ int clusterProcessPacket(clusterLink *link) { sender = clusterLookupNode(hdr->sender); if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) { + int update_config = 0; redisLog(REDIS_DEBUG,"Ping packet received: %p", link->node); /* Add this node if it is new for us and the msg type is MEET. 
@@ -516,6 +525,7 @@ int clusterProcessPacket(clusterLink *link) { nodeIp2String(node->ip,link); node->port = ntohs(hdr->port); clusterAddNode(node); + update_config = 1; } /* Get info from the gossip section */ @@ -523,8 +533,12 @@ int clusterProcessPacket(clusterLink *link) { /* Anyway reply with a PONG */ clusterSendPing(link,CLUSTERMSG_TYPE_PONG); + + /* Update config if needed */ + if (update_config) clusterSaveConfigOrDie(); } else if (type == CLUSTERMSG_TYPE_PONG) { - int update = 0; + int update_state = 0; + int update_config = 0; redisLog(REDIS_DEBUG,"Pong packet received: %p", link->node); if (link->node) { @@ -545,6 +559,7 @@ int clusterProcessPacket(clusterLink *link) { redisLog(REDIS_DEBUG,"Handshake with node %.40s completed.", link->node->name); link->node->flags &= ~REDIS_NODE_HANDSHAKE; + update_config = 1; } else if (memcmp(link->node->name,hdr->sender, REDIS_CLUSTER_NAMELEN) != 0) { @@ -554,6 +569,7 @@ int clusterProcessPacket(clusterLink *link) { redisLog(REDIS_DEBUG,"PONG contains mismatching sender ID"); link->node->flags |= REDIS_NODE_NOADDR; freeClusterLink(link); + update_config = 1; /* FIXME: remove this node if we already have it. 
* * If we already have it but the IP is different, use @@ -599,7 +615,7 @@ int clusterProcessPacket(clusterLink *link) { server.cluster.slots[j]->flags & REDIS_NODE_FAIL) { server.cluster.slots[j] = sender; - update = 1; + update_state = update_config = 1; } } } @@ -610,15 +626,14 @@ int clusterProcessPacket(clusterLink *link) { clusterProcessGossipSection(hdr,link); /* Update the cluster state if needed */ - if (update) { - clusterUpdateState(); - clusterSaveConfigOrDie(); - } + if (update_state) clusterUpdateState(); + if (update_config) clusterSaveConfigOrDie(); } else if (type == CLUSTERMSG_TYPE_FAIL && sender) { clusterNode *failing; failing = clusterLookupNode(hdr->data.fail.about.nodename); - if (failing && !(failing->flags & REDIS_NODE_FAIL)) { + if (failing && !(failing->flags & (REDIS_NODE_FAIL|REDIS_NODE_MYSELF))) + { redisLog(REDIS_NOTICE, "FAIL message received from %.40s about %.40s", hdr->sender, hdr->data.fail.about.nodename); @@ -876,7 +891,7 @@ void clusterCron(void) { * normal PING packets. */ node->flags &= ~REDIS_NODE_MEET; - redisLog(REDIS_NOTICE,"Connecting with Node %.40s at %s:%d\n", node->name, node->ip, node->port+REDIS_CLUSTER_PORT_INCR); + redisLog(REDIS_NOTICE,"Connecting with Node %.40s at %s:%d", node->name, node->ip, node->port+REDIS_CLUSTER_PORT_INCR); } } dictReleaseIterator(di); @@ -906,22 +921,34 @@ void clusterCron(void) { int delay; if (node->flags & - (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR|REDIS_NODE_HANDSHAKE| - REDIS_NODE_FAIL)) continue; + (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR|REDIS_NODE_HANDSHAKE)) + continue; /* Check only if we already sent a ping and did not received * a reply yet. 
*/ if (node->ping_sent == 0 || node->ping_sent <= node->pong_received) continue; delay = time(NULL) - node->pong_received; - if (node->flags & REDIS_NODE_PFAIL) { + if (delay < server.cluster.node_timeout) { /* The PFAIL condition can be reversed without external * help if it is not transitive (that is, if it does not - * turn into a FAIL state). */ - if (delay < server.cluster.node_timeout) + * turn into a FAIL state). + * + * The FAIL condition is also reversible if there are no slaves + * for this host, so no slave election should be in progress. + * + * TODO: consider all the implications of resurrecting a + * FAIL node. */ + if (node->flags & REDIS_NODE_PFAIL) { node->flags &= ~REDIS_NODE_PFAIL; + } else if (node->flags & REDIS_NODE_FAIL && !node->numslaves) { + node->flags &= ~REDIS_NODE_FAIL; + clusterUpdateState(); + } } else { - if (delay >= server.cluster.node_timeout) { + /* Timeout reached. Set the node as possibly failing if it is + * not already in this state. */ + if (!(node->flags & (REDIS_NODE_PFAIL|REDIS_NODE_FAIL))) { redisLog(REDIS_DEBUG,"*** NODE %.40s possibly failing", node->name); node->flags |= REDIS_NODE_PFAIL; @@ -966,7 +993,7 @@ int clusterNodeGetSlotBit(clusterNode *n, int slot) { * an error and REDIS_ERR is returned. 
*/ int clusterAddSlot(clusterNode *n, int slot) { redisAssert(clusterNodeSetSlotBit(n,slot) == 0); - server.cluster.slots[slot] = server.cluster.myself; + server.cluster.slots[slot] = n; printf("SLOT %d added to %.40s\n", slot, n->name); return REDIS_OK; } @@ -1059,8 +1086,8 @@ sds clusterGenNodesDescription(void) { start = -1; } } + ci = sdscatlen(ci,"\n",1); } - ci = sdscatlen(ci,"\n",1); dictReleaseIterator(di); return ci; } @@ -1166,11 +1193,13 @@ void clusterCommand(redisClient *c) { "cluster_slots_ok:%d\r\n" "cluster_slots_pfail:%d\r\n" "cluster_slots_fail:%d\r\n" + "cluster_known_nodes:%lu\r\n" , statestr[server.cluster.state], slots_assigned, slots_ok, slots_pfail, - slots_fail + slots_fail, + dictSize(server.cluster.nodes) ); addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n", (unsigned long)sdslen(info)));