X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/f797c7dc176c833e6aa412c557d7fedd59dc1124..1793752d97d72c82ce237b461165d5a06c44587e:/src/cluster.c diff --git a/src/cluster.c b/src/cluster.c index 783c658d..6d117aca 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -11,6 +11,9 @@ void clusterSendFail(char *nodename); void clusterUpdateState(void); int clusterNodeGetSlotBit(clusterNode *n, int slot); sds clusterGenNodesDescription(void); +clusterNode *clusterLookupNode(char *name); +int clusterNodeAddSlave(clusterNode *master, clusterNode *slave); +int clusterAddSlot(clusterNode *n, int slot); /* ----------------------------------------------------------------------------- * Initialization @@ -34,11 +37,97 @@ void clusterGetRandomName(char *p) { int clusterLoadConfig(char *filename) { FILE *fp = fopen(filename,"r"); + char *line; + int maxline, j; - return REDIS_ERR; if (fp == NULL) return REDIS_ERR; + + /* Parse the file. Note that single liens of the cluster config file can + * be really long as they include all the hash slots of the node. + * This means in the worst possible case REDIS_CLUSTER_SLOTS/2 integers. + * To simplify we allocate 1024+REDIS_CLUSTER_SLOTS*16 bytes per line. */ + maxline = 1024+REDIS_CLUSTER_SLOTS*16; + line = zmalloc(maxline); + while(fgets(line,maxline,fp) != NULL) { + int argc; + sds *argv = sdssplitargs(line,&argc); + clusterNode *n, *master; + char *p, *s; + + /* Create this node if it does not exist */ + n = clusterLookupNode(argv[0]); + if (!n) { + n = createClusterNode(argv[0],0); + clusterAddNode(n); + } + /* Address and port */ + if ((p = strchr(argv[1],':')) == NULL) goto fmterr; + *p = '\0'; + memcpy(n->ip,argv[1],strlen(argv[1])+1); + n->port = atoi(p+1); + + /* Parse flags */ + p = s = argv[2]; + while(p) { + p = strchr(s,','); + if (p) *p = '\0'; + if (!strcasecmp(s,"myself")) { + redisAssert(server.cluster.myself == NULL); + server.cluster.myself = n; + n->flags |= REDIS_NODE_MYSELF; + } else if (!strcasecmp(s,"master")) { + n->flags |= REDIS_NODE_MASTER; + } else if (!strcasecmp(s,"slave")) { + n->flags |= REDIS_NODE_SLAVE; + } else if (!strcasecmp(s,"fail?")) { + n->flags |= REDIS_NODE_PFAIL; + } else if (!strcasecmp(s,"fail")) { + n->flags |= REDIS_NODE_FAIL; + } else if (!strcasecmp(s,"handshake")) { + n->flags |= REDIS_NODE_HANDSHAKE; + } else if (!strcasecmp(s,"noaddr")) { + n->flags |= REDIS_NODE_NOADDR; + } else if (!strcasecmp(s,"noflags")) { + /* nothing to do */ + } else { + redisPanic("Unknown flag in redis cluster config file"); + } + if (p) s = p+1; + } + + /* Get master if any. Set the master and populate master's + * slave list. */ + if (argv[3][0] != '-') { + master = clusterLookupNode(argv[3]); + if (!master) { + master = createClusterNode(argv[3],0); + clusterAddNode(master); + } + n->slaveof = master; + clusterNodeAddSlave(master,n); + } + + /* Populate hash slots served by this instance. */ + for (j = 7; j < argc; j++) { + int start, stop; + + if ((p = strchr(argv[j],'-')) != NULL) { + *p = '\0'; + start = atoi(argv[j]); + stop = atoi(p+1); + } else { + start = stop = atoi(argv[j]); + } + while(start <= stop) clusterAddSlot(n, start++); + } + + sdssplitargs_free(argv,argc); + } + zfree(line); fclose(fp); + /* Config sanity check */ + redisAssert(server.cluster.myself != NULL); redisLog(REDIS_NOTICE,"Node configuration loaded, I'm %.40s", server.cluster.myself->name); return REDIS_OK; @@ -57,8 +146,8 @@ int clusterSaveConfig(void) { sds ci = clusterGenNodesDescription(); int fd; - if ((fd = open(server.cluster.configfile,O_WRONLY|O_CREAT,0644)) == -1) - goto err; + if ((fd = open(server.cluster.configfile,O_WRONLY|O_CREAT|O_TRUNC,0644)) + == -1) goto err; if (write(fd,ci,sdslen(ci)) != (ssize_t)sdslen(ci)) goto err; close(fd); sdsfree(ci); @@ -79,7 +168,7 @@ void clusterSaveConfigOrDie(void) { void clusterInit(void) { int saveconf = 0; - server.cluster.myself = createClusterNode(NULL,REDIS_NODE_MYSELF); + server.cluster.myself = NULL; server.cluster.state = REDIS_CLUSTER_FAIL; server.cluster.nodes = dictCreate(&clusterNodesDictType,NULL); server.cluster.node_timeout = 15; @@ -92,6 +181,7 @@ void clusterInit(void) { if (clusterLoadConfig(server.cluster.configfile) == REDIS_ERR) { /* No configuration found. We will just use the random name provided * by the createClusterNode() function. */ + server.cluster.myself = createClusterNode(NULL,REDIS_NODE_MYSELF); redisLog(REDIS_NOTICE,"No cluster configuration found, I'm %.40s", server.cluster.myself->name); clusterAddNode(server.cluster.myself); @@ -337,6 +427,7 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) { /* Broadcast the failing node name to everybody */ clusterSendFail(node->name); clusterUpdateState(); + clusterSaveConfigOrDie(); } } else { /* If it's not in NOADDR state and we don't have it, we @@ -415,6 +506,7 @@ int clusterProcessPacket(clusterLink *link) { sender = clusterLookupNode(hdr->sender); if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) { + int update_config = 0; redisLog(REDIS_DEBUG,"Ping packet received: %p", link->node); /* Add this node if it is new for us and the msg type is MEET. @@ -428,6 +520,7 @@ int clusterProcessPacket(clusterLink *link) { nodeIp2String(node->ip,link); node->port = ntohs(hdr->port); clusterAddNode(node); + update_config = 1; } /* Get info from the gossip section */ @@ -435,8 +528,12 @@ int clusterProcessPacket(clusterLink *link) { /* Anyway reply with a PONG */ clusterSendPing(link,CLUSTERMSG_TYPE_PONG); + + /* Update config if needed */ + if (update_config) clusterSaveConfigOrDie(); } else if (type == CLUSTERMSG_TYPE_PONG) { - int update = 0; + int update_state = 0; + int update_config = 0; redisLog(REDIS_DEBUG,"Pong packet received: %p", link->node); if (link->node) { @@ -457,6 +554,7 @@ int clusterProcessPacket(clusterLink *link) { redisLog(REDIS_DEBUG,"Handshake with node %.40s completed.", link->node->name); link->node->flags &= ~REDIS_NODE_HANDSHAKE; + update_config = 1; } else if (memcmp(link->node->name,hdr->sender, REDIS_CLUSTER_NAMELEN) != 0) { @@ -466,6 +564,7 @@ int clusterProcessPacket(clusterLink *link) { redisLog(REDIS_DEBUG,"PONG contains mismatching sender ID"); link->node->flags |= REDIS_NODE_NOADDR; freeClusterLink(link); + update_config = 1; /* FIXME: remove this node if we already have it. * * If we already have it but the IP is different, use @@ -511,7 +610,7 @@ int clusterProcessPacket(clusterLink *link) { server.cluster.slots[j]->flags & REDIS_NODE_FAIL) { server.cluster.slots[j] = sender; - update = 1; + update_state = update_config = 1; } } } @@ -522,7 +621,8 @@ int clusterProcessPacket(clusterLink *link) { clusterProcessGossipSection(hdr,link); /* Update the cluster state if needed */ - if (update) clusterUpdateState(); + if (update_state) clusterUpdateState(); + if (update_config) clusterSaveConfigOrDie(); } else if (type == CLUSTERMSG_TYPE_FAIL && sender) { clusterNode *failing; @@ -534,6 +634,7 @@ int clusterProcessPacket(clusterLink *link) { failing->flags |= REDIS_NODE_FAIL; failing->flags &= ~REDIS_NODE_PFAIL; clusterUpdateState(); + clusterSaveConfigOrDie(); } } else { redisLog(REDIS_NOTICE,"Received unknown packet type: %d", type); @@ -784,7 +885,7 @@ void clusterCron(void) { * normal PING packets. */ node->flags &= ~REDIS_NODE_MEET; - redisLog(REDIS_NOTICE,"Connecting with Node %.40s at %s:%d\n", node->name, node->ip, node->port+REDIS_CLUSTER_PORT_INCR); + redisLog(REDIS_NOTICE,"Connecting with Node %.40s at %s:%d", node->name, node->ip, node->port+REDIS_CLUSTER_PORT_INCR); } } dictReleaseIterator(di); @@ -967,8 +1068,8 @@ sds clusterGenNodesDescription(void) { start = -1; } } + ci = sdscatlen(ci,"\n",1); } - ci = sdscatlen(ci,"\n",1); dictReleaseIterator(di); return ci; } @@ -1047,6 +1148,7 @@ void clusterCommand(redisClient *c) { } zfree(slots); clusterUpdateState(); + clusterSaveConfigOrDie(); addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) { char *statestr[] = {"ok","fail","needhelp"};