void clusterUpdateState(void);
int clusterNodeGetSlotBit(clusterNode *n, int slot);
sds clusterGenNodesDescription(void);
+clusterNode *clusterLookupNode(char *name);
+int clusterNodeAddSlave(clusterNode *master, clusterNode *slave);
+int clusterAddSlot(clusterNode *n, int slot);
/* -----------------------------------------------------------------------------
* Initialization
int clusterLoadConfig(char *filename) {
FILE *fp = fopen(filename,"r");
+ char *line;
+ int maxline, j;
- return REDIS_ERR;
if (fp == NULL) return REDIS_ERR;
+
+ /* Parse the file. Note that single lines of the cluster config file can
+ * be really long as they include all the hash slots of the node.
+ * This means in the worst possible case REDIS_CLUSTER_SLOTS/2 integers.
+ * To simplify we allocate 1024+REDIS_CLUSTER_SLOTS*16 bytes per line. */
+ maxline = 1024+REDIS_CLUSTER_SLOTS*16;
+ line = zmalloc(maxline);
+ while(fgets(line,maxline,fp) != NULL) {
+ int argc;
+ sds *argv = sdssplitargs(line,&argc);
+ clusterNode *n, *master;
+ char *p, *s;
+
+ /* Create this node if it does not exist */
+ n = clusterLookupNode(argv[0]);
+ if (!n) {
+ n = createClusterNode(argv[0],0);
+ clusterAddNode(n);
+ }
+ /* Address and port */
+ if ((p = strchr(argv[1],':')) == NULL) goto fmterr;
+ *p = '\0';
+ memcpy(n->ip,argv[1],strlen(argv[1])+1);
+ n->port = atoi(p+1);
+
+ /* Parse flags */
+ p = s = argv[2];
+ while(p) {
+ p = strchr(s,',');
+ if (p) *p = '\0';
+ if (!strcasecmp(s,"myself")) {
+ redisAssert(server.cluster.myself == NULL);
+ server.cluster.myself = n;
+ n->flags |= REDIS_NODE_MYSELF;
+ } else if (!strcasecmp(s,"master")) {
+ n->flags |= REDIS_NODE_MASTER;
+ } else if (!strcasecmp(s,"slave")) {
+ n->flags |= REDIS_NODE_SLAVE;
+ } else if (!strcasecmp(s,"fail?")) {
+ n->flags |= REDIS_NODE_PFAIL;
+ } else if (!strcasecmp(s,"fail")) {
+ n->flags |= REDIS_NODE_FAIL;
+ } else if (!strcasecmp(s,"handshake")) {
+ n->flags |= REDIS_NODE_HANDSHAKE;
+ } else if (!strcasecmp(s,"noaddr")) {
+ n->flags |= REDIS_NODE_NOADDR;
+ } else if (!strcasecmp(s,"noflags")) {
+ /* nothing to do */
+ } else {
+ redisPanic("Unknown flag in redis cluster config file");
+ }
+ if (p) s = p+1;
+ }
+
+ /* Get master if any. Set the master and populate master's
+ * slave list. */
+ if (argv[3][0] != '-') {
+ master = clusterLookupNode(argv[3]);
+ if (!master) {
+ master = createClusterNode(argv[3],0);
+ clusterAddNode(master);
+ }
+ n->slaveof = master;
+ clusterNodeAddSlave(master,n);
+ }
+
+ /* Populate hash slots served by this instance. */
+ for (j = 7; j < argc; j++) {
+ int start, stop;
+
+ if ((p = strchr(argv[j],'-')) != NULL) {
+ *p = '\0';
+ start = atoi(argv[j]);
+ stop = atoi(p+1);
+ } else {
+ start = stop = atoi(argv[j]);
+ }
+ while(start <= stop) clusterAddSlot(n, start++);
+ }
+
+ sdssplitargs_free(argv,argc);
+ }
+ zfree(line);
fclose(fp);
+ /* Config sanity check */
+ redisAssert(server.cluster.myself != NULL);
redisLog(REDIS_NOTICE,"Node configuration loaded, I'm %.40s",
server.cluster.myself->name);
return REDIS_OK;
sds ci = clusterGenNodesDescription();
int fd;
- if ((fd = open(server.cluster.configfile,O_WRONLY|O_CREAT,0644)) == -1)
- goto err;
+ if ((fd = open(server.cluster.configfile,O_WRONLY|O_CREAT|O_TRUNC,0644))
+ == -1) goto err;
if (write(fd,ci,sdslen(ci)) != (ssize_t)sdslen(ci)) goto err;
close(fd);
sdsfree(ci);
void clusterInit(void) {
int saveconf = 0;
- server.cluster.myself = createClusterNode(NULL,REDIS_NODE_MYSELF);
+ server.cluster.myself = NULL;
server.cluster.state = REDIS_CLUSTER_FAIL;
server.cluster.nodes = dictCreate(&clusterNodesDictType,NULL);
server.cluster.node_timeout = 15;
if (clusterLoadConfig(server.cluster.configfile) == REDIS_ERR) {
/* No configuration found. We will just use the random name provided
* by the createClusterNode() function. */
+ server.cluster.myself = createClusterNode(NULL,REDIS_NODE_MYSELF);
redisLog(REDIS_NOTICE,"No cluster configuration found, I'm %.40s",
server.cluster.myself->name);
clusterAddNode(server.cluster.myself);
/* Broadcast the failing node name to everybody */
clusterSendFail(node->name);
clusterUpdateState();
+ clusterSaveConfigOrDie();
}
} else {
/* If it's not in NOADDR state and we don't have it, we
sender = clusterLookupNode(hdr->sender);
if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
+ int update_config = 0;
redisLog(REDIS_DEBUG,"Ping packet received: %p", link->node);
/* Add this node if it is new for us and the msg type is MEET.
nodeIp2String(node->ip,link);
node->port = ntohs(hdr->port);
clusterAddNode(node);
+ update_config = 1;
}
/* Get info from the gossip section */
/* Anyway reply with a PONG */
clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
+
+ /* Update config if needed */
+ if (update_config) clusterSaveConfigOrDie();
} else if (type == CLUSTERMSG_TYPE_PONG) {
- int update = 0;
+ int update_state = 0;
+ int update_config = 0;
redisLog(REDIS_DEBUG,"Pong packet received: %p", link->node);
if (link->node) {
redisLog(REDIS_DEBUG,"Handshake with node %.40s completed.",
link->node->name);
link->node->flags &= ~REDIS_NODE_HANDSHAKE;
+ update_config = 1;
} else if (memcmp(link->node->name,hdr->sender,
REDIS_CLUSTER_NAMELEN) != 0)
{
redisLog(REDIS_DEBUG,"PONG contains mismatching sender ID");
link->node->flags |= REDIS_NODE_NOADDR;
freeClusterLink(link);
+ update_config = 1;
/* FIXME: remove this node if we already have it.
*
* If we already have it but the IP is different, use
server.cluster.slots[j]->flags & REDIS_NODE_FAIL)
{
server.cluster.slots[j] = sender;
- update = 1;
+ update_state = update_config = 1;
}
}
}
clusterProcessGossipSection(hdr,link);
/* Update the cluster state if needed */
- if (update) clusterUpdateState();
+ if (update_state) clusterUpdateState();
+ if (update_config) clusterSaveConfigOrDie();
} else if (type == CLUSTERMSG_TYPE_FAIL && sender) {
clusterNode *failing;
failing->flags |= REDIS_NODE_FAIL;
failing->flags &= ~REDIS_NODE_PFAIL;
clusterUpdateState();
+ clusterSaveConfigOrDie();
}
} else {
redisLog(REDIS_NOTICE,"Received unknown packet type: %d", type);
* normal PING packets. */
node->flags &= ~REDIS_NODE_MEET;
- redisLog(REDIS_NOTICE,"Connecting with Node %.40s at %s:%d\n", node->name, node->ip, node->port+REDIS_CLUSTER_PORT_INCR);
+ redisLog(REDIS_NOTICE,"Connecting with Node %.40s at %s:%d", node->name, node->ip, node->port+REDIS_CLUSTER_PORT_INCR);
}
}
dictReleaseIterator(di);
start = -1;
}
}
+ ci = sdscatlen(ci,"\n",1);
}
- ci = sdscatlen(ci,"\n",1);
dictReleaseIterator(di);
return ci;
}
}
zfree(slots);
clusterUpdateState();
+ clusterSaveConfigOrDie();
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) {
char *statestr[] = {"ok","fail","needhelp"};