void clusterUpdateState(void);
int clusterNodeGetSlotBit(clusterNode *n, int slot);
sds clusterGenNodesDescription(void);
+clusterNode *clusterLookupNode(char *name); /* callers in this file treat a NULL result as "no such node" */
+int clusterNodeAddSlave(clusterNode *master, clusterNode *slave); /* clusterLoadConfig() calls this right after setting slave->slaveof and ignores the result */
+int clusterAddSlot(clusterNode *n, int slot); /* clusterLoadConfig() calls this once per slot number read from the file and ignores the result */
/* -----------------------------------------------------------------------------
* Initialization
int clusterLoadConfig(char *filename) {
FILE *fp = fopen(filename,"r");
char *line;
- int maxline;
+ int maxline, j; /* j indexes the slot fields (argv[7] onwards) of each line */
if (fp == NULL) return REDIS_ERR;
while(fgets(line,maxline,fp) != NULL) {
int argc;
sds *argv = sdssplitargs(line,&argc);
+ clusterNode *n, *master;
+ char *p, *s;
+
+ /* One config line describes one node. Fields used below: argv[0] is
+ * the node name, argv[1] is "ip:port", argv[2] is a comma separated
+ * list of flags, argv[3] is the master name (a leading '-' means no
+ * master) and argv[7..argc-1] are slot numbers or "start-stop" ranges.
+ * NOTE(review): no check that argc covers argv[0..3] is visible in
+ * this excerpt -- confirm one exists before these fields are read.
+ *
+ * Create this node if it does not exist */
+ n = clusterLookupNode(argv[0]);
+ if (!n) {
+ n = createClusterNode(argv[0],0);
+ clusterAddNode(n);
+ }
+ /* Address and port: argv[1] is split in place at the ':' so that it
+ * becomes the NUL terminated IP and p+1 points at the port digits.
+ * The memcpy() below copies the terminator as well.
+ * NOTE(review): the copied length is not checked against the size of
+ * n->ip in the visible lines -- confirm the field is large enough. */
+ if ((p = strchr(argv[1],':')) == NULL) goto fmterr;
+ *p = '\0';
+ memcpy(n->ip,argv[1],strlen(argv[1])+1);
+ n->port = atoi(p+1);
+
+ /* Parse flags: argv[2] is tokenized in place. s is the current token
+ * and p the comma that ends it; p becomes NULL on the last token,
+ * which also terminates the loop. */
+ p = s = argv[2];
+ while(p) {
+ p = strchr(s,',');
+ if (p) *p = '\0';
+ if (!strcasecmp(s,"myself")) {
+ redisAssert(server.cluster.myself == NULL); /* a second "myself" line aborts here */
+ server.cluster.myself = n;
+ n->flags |= REDIS_NODE_MYSELF;
+ } else if (!strcasecmp(s,"master")) {
+ n->flags |= REDIS_NODE_MASTER;
+ } else if (!strcasecmp(s,"slave")) {
+ n->flags |= REDIS_NODE_SLAVE;
+ } else if (!strcasecmp(s,"fail?")) {
+ n->flags |= REDIS_NODE_PFAIL;
+ } else if (!strcasecmp(s,"fail")) {
+ n->flags |= REDIS_NODE_FAIL;
+ } else if (!strcasecmp(s,"handshake")) {
+ n->flags |= REDIS_NODE_HANDSHAKE;
+ } else if (!strcasecmp(s,"noaddr")) {
+ n->flags |= REDIS_NODE_NOADDR;
+ } else if (!strcasecmp(s,"noflags")) {
+ /* nothing to do */
+ } else {
+ redisPanic("Unknown flag in redis cluster config file");
+ }
+ if (p) s = p+1; /* step to the token after the comma */
+ }
- printf("Node: %s\n", argv[0]);
+ /* Get master if any. Set the master and populate master's
+ * slave list. A master that is not known yet is created here from
+ * its name alone, presumably so that its own line can find it later
+ * through clusterLookupNode(). */
+ if (argv[3][0] != '-') {
+ master = clusterLookupNode(argv[3]);
+ if (!master) {
+ master = createClusterNode(argv[3],0);
+ clusterAddNode(master);
+ }
+ n->slaveof = master;
+ clusterNodeAddSlave(master,n);
+ }
+
+ /* Populate hash slots served by this instance. Every field from
+ * argv[7] on is either a single slot number or an inclusive
+ * "start-stop" range, split in place at the '-'.
+ * NOTE(review): the values come from atoi() and reach clusterAddSlot()
+ * without a range check in the visible lines. */
+ for (j = 7; j < argc; j++) {
+ int start, stop;
+
+ if ((p = strchr(argv[j],'-')) != NULL) {
+ *p = '\0';
+ start = atoi(argv[j]);
+ stop = atoi(p+1);
+ } else {
+ start = stop = atoi(argv[j]);
+ }
+ while(start <= stop) clusterAddSlot(n, start++);
+ }
sdssplitargs_free(argv,argc);
}
fclose(fp);
/* Config sanity check */
- /* TODO: check that myself is set. */
- return REDIS_ERR;
-
+ redisAssert(server.cluster.myself != NULL); /* a loaded config must contain a line flagged "myself" */
redisLog(REDIS_NOTICE,"Node configuration loaded, I'm %.40s",
server.cluster.myself->name);
return REDIS_OK;
void clusterInit(void) {
int saveconf = 0;
- server.cluster.myself = createClusterNode(NULL,REDIS_NODE_MYSELF);
+ server.cluster.myself = NULL; /* must start as NULL: clusterLoadConfig() asserts this before assigning the node flagged "myself" */
server.cluster.state = REDIS_CLUSTER_FAIL;
server.cluster.nodes = dictCreate(&clusterNodesDictType,NULL);
server.cluster.node_timeout = 15;
if (clusterLoadConfig(server.cluster.configfile) == REDIS_ERR) {
/* No configuration found. We will just use the random name provided
* by the createClusterNode() function. */
+ server.cluster.myself = createClusterNode(NULL,REDIS_NODE_MYSELF); /* created only on this path, so a loaded config keeps the node it read from the file */
redisLog(REDIS_NOTICE,"No cluster configuration found, I'm %.40s",
server.cluster.myself->name);
clusterAddNode(server.cluster.myself);
sender = clusterLookupNode(hdr->sender);
if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
+ int update_config = 0;
redisLog(REDIS_DEBUG,"Ping packet received: %p", link->node);
/* Add this node if it is new for us and the msg type is MEET.
nodeIp2String(node->ip,link);
node->port = ntohs(hdr->port);
clusterAddNode(node);
+ update_config = 1;
}
/* Get info from the gossip section */
/* Anyway reply with a PONG */
clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
+
+ /* Update config if needed */
+ if (update_config) clusterSaveConfigOrDie();
} else if (type == CLUSTERMSG_TYPE_PONG) {
- int update = 0;
+ int update_state = 0;
+ int update_config = 0;
redisLog(REDIS_DEBUG,"Pong packet received: %p", link->node);
if (link->node) {
redisLog(REDIS_DEBUG,"Handshake with node %.40s completed.",
link->node->name);
link->node->flags &= ~REDIS_NODE_HANDSHAKE;
+ update_config = 1;
} else if (memcmp(link->node->name,hdr->sender,
REDIS_CLUSTER_NAMELEN) != 0)
{
redisLog(REDIS_DEBUG,"PONG contains mismatching sender ID");
link->node->flags |= REDIS_NODE_NOADDR;
freeClusterLink(link);
+ update_config = 1;
/* FIXME: remove this node if we already have it.
*
* If we already have it but the IP is different, use
server.cluster.slots[j]->flags & REDIS_NODE_FAIL)
{
server.cluster.slots[j] = sender;
- update = 1;
+ update_state = update_config = 1;
}
}
}
clusterProcessGossipSection(hdr,link);
/* Update the cluster state if needed */
- if (update) {
- clusterUpdateState();
- clusterSaveConfigOrDie();
- }
+ if (update_state) clusterUpdateState();
+ if (update_config) clusterSaveConfigOrDie();
} else if (type == CLUSTERMSG_TYPE_FAIL && sender) {
clusterNode *failing;
* normal PING packets. */
node->flags &= ~REDIS_NODE_MEET;
- redisLog(REDIS_NOTICE,"Connecting with Node %.40s at %s:%d\n", node->name, node->ip, node->port+REDIS_CLUSTER_PORT_INCR);
+ redisLog(REDIS_NOTICE,"Connecting with Node %.40s at %s:%d", node->name, node->ip, node->port+REDIS_CLUSTER_PORT_INCR);
}
}
dictReleaseIterator(di);
start = -1;
}
}
+ ci = sdscatlen(ci,"\n",1);
}
- ci = sdscatlen(ci,"\n",1);
dictReleaseIterator(di);
return ci;
}