#include "redis.h"
#include <arpa/inet.h>
+#include <fcntl.h>
+#include <unistd.h>
void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void clusterSendFail(char *nodename);
void clusterUpdateState(void);
int clusterNodeGetSlotBit(clusterNode *n, int slot);
+sds clusterGenNodesDescription(void);
/* -----------------------------------------------------------------------------
* Initialization
int clusterLoadConfig(char *filename) {
FILE *fp = fopen(filename,"r");
-
+
+ return REDIS_ERR;
if (fp == NULL) return REDIS_ERR;
fclose(fp);
return REDIS_OK;
fmterr:
- redisLog(REDIS_WARNING,"Unrecovarable error: corrupted cluster.conf file.");
+ redisLog(REDIS_WARNING,"Unrecovarable error: corrupted cluster config file.");
fclose(fp);
exit(1);
}
+/* Cluster node configuration is exactly the same as CLUSTER NODES output.
+ *
+ * This function serializes the node table with clusterGenNodesDescription()
+ * and writes it to server.cluster.configfile. It returns 0 on success and
+ * -1 on error (the file could not be opened, or the write failed or was
+ * short).
+ *
+ * The file is opened with O_TRUNC so that a description shorter than the
+ * one previously stored does not leave stale bytes at the end of the file.
+ * The descriptor is closed on every path, including the write error one. */
+int clusterSaveConfig(void) {
+    sds ci = clusterGenNodesDescription();
+    size_t len = sdslen(ci);
+    int retval = -1;
+    int fd;
+
+    fd = open(server.cluster.configfile,O_WRONLY|O_CREAT|O_TRUNC,0644);
+    if (fd == -1) goto cleanup;
+
+    /* A short write is handled as a failure, like any other write error. */
+    if (write(fd,ci,len) == (ssize_t)len) retval = 0;
+    close(fd);
+
+cleanup:
+    sdsfree(ci);
+    return retval;
+}
+
+/* Save the cluster configuration via clusterSaveConfig(). If it reports
+ * failure (-1), log a warning and terminate the process with exit code 1. */
+void clusterSaveConfigOrDie(void) {
+    if (clusterSaveConfig() != -1) return;
+
+    redisLog(REDIS_WARNING,"Fatal: can't update cluster config file.");
+    exit(1);
+}
+
void clusterInit(void) {
+ int saveconf = 0;
+
server.cluster.myself = createClusterNode(NULL,REDIS_NODE_MYSELF);
server.cluster.state = REDIS_CLUSTER_FAIL;
server.cluster.nodes = dictCreate(&clusterNodesDictType,NULL);
sizeof(server.cluster.importing_slots_from));
memset(server.cluster.slots,0,
sizeof(server.cluster.slots));
- if (clusterLoadConfig("cluster.conf") == REDIS_ERR) {
+ if (clusterLoadConfig(server.cluster.configfile) == REDIS_ERR) {
/* No configuration found. We will just use the random name provided
* by the createClusterNode() function. */
redisLog(REDIS_NOTICE,"No cluster configuration found, I'm %.40s",
server.cluster.myself->name);
+ saveconf = 1;
}
clusterAddNode(server.cluster.myself);
+ if (saveconf) clusterSaveConfigOrDie();
/* We need a listening TCP port for our cluster messaging needs */
server.cfd = anetTcpServer(server.neterr,
server.port+REDIS_CLUSTER_PORT_INCR, server.bindaddr);
* CLUSTER command
* -------------------------------------------------------------------------- */
+/* Build the textual description of every node in server.cluster.nodes,
+ * one node per line. The result is used both as the CLUSTER NODES reply
+ * and as the content written by clusterSaveConfig().
+ *
+ * Every line has the following space separated fields:
+ *
+ *   <name> <ip>:<port> <flags> <master name or "-"> <ping_sent>
+ *   <pong_received> <connected|disconnected> [<slot> | <first>-<last>] ...
+ *
+ * where <flags> is a comma separated list (or "noflags") and the trailing
+ * fields list the slots for which clusterNodeGetSlotBit() is set, with
+ * consecutive slots collapsed into inclusive ranges.
+ *
+ * The caller is responsible for releasing the returned sds string. */
+sds clusterGenNodesDescription(void) {
+    sds ci = sdsempty();
+    dictIterator *di;
+    dictEntry *de;
+    int j, start, end;
+
+    di = dictGetIterator(server.cluster.nodes);
+    while((de = dictNext(di)) != NULL) {
+        clusterNode *node = dictGetEntryVal(de);
+
+        /* Node coordinates */
+        ci = sdscatprintf(ci,"%.40s %s:%d ",
+            node->name,
+            node->ip,
+            node->port);
+
+        /* Flags */
+        if (node->flags == 0) ci = sdscat(ci,"noflags,");
+        if (node->flags & REDIS_NODE_MYSELF) ci = sdscat(ci,"myself,");
+        if (node->flags & REDIS_NODE_MASTER) ci = sdscat(ci,"master,");
+        if (node->flags & REDIS_NODE_SLAVE) ci = sdscat(ci,"slave,");
+        if (node->flags & REDIS_NODE_PFAIL) ci = sdscat(ci,"fail?,");
+        if (node->flags & REDIS_NODE_FAIL) ci = sdscat(ci,"fail,");
+        if (node->flags & REDIS_NODE_HANDSHAKE) ci = sdscat(ci,"handshake,");
+        if (node->flags & REDIS_NODE_NOADDR) ci = sdscat(ci,"noaddr,");
+        /* Turn the trailing comma of the flags list into the separator. */
+        if (ci[sdslen(ci)-1] == ',') ci[sdslen(ci)-1] = ' ';
+
+        /* Slave of... or just "-" */
+        if (node->slaveof)
+            ci = sdscatprintf(ci,"%.40s ",node->slaveof->name);
+        else
+            ci = sdscatprintf(ci,"- ");
+
+        /* Latency from the POV of this node, link status */
+        ci = sdscatprintf(ci,"%ld %ld %s",
+            (long) node->ping_sent,
+            (long) node->pong_received,
+            node->link ? "connected" : "disconnected");
+
+        /* Slots served by this instance. 'start' is the first slot of the
+         * run currently being scanned, or -1 when no run is open. */
+        start = -1;
+        for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
+            int bit = clusterNodeGetSlotBit(node,j);
+
+            if (bit && start == -1) start = j;
+            if (start == -1) continue;
+            /* The run is still open unless this slot is clear or it is
+             * the very last slot. */
+            if (bit && j != REDIS_CLUSTER_SLOTS-1) continue;
+
+            /* The run ends at 'j' only when slot 'j' itself is set (this
+             * happens just for the last slot), otherwise at 'j-1'. */
+            end = bit ? j : j-1;
+            if (start == end) {
+                ci = sdscatprintf(ci," %d",start);
+            } else {
+                ci = sdscatprintf(ci," %d-%d",start,end);
+            }
+            start = -1;
+        }
+
+        /* Terminate the line of this node. */
+        ci = sdscatlen(ci,"\n",1);
+    }
+    dictReleaseIterator(di);
+    return ci;
+}
+
void clusterCommand(redisClient *c) {
if (server.cluster_enabled == 0) {
addReplyError(c,"This instance has cluster support disabled");
clusterAddNode(n);
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"nodes") && c->argc == 2) {
- sds ci = sdsempty();
- dictIterator *di;
- dictEntry *de;
robj *o;
+ sds ci = clusterGenNodesDescription();
- di = dictGetIterator(server.cluster.nodes);
- while((de = dictNext(di)) != NULL) {
- clusterNode *node = dictGetEntryVal(de);
-
- /* Node coordinates */
- ci = sdscatprintf(ci,"%.40s %s:%d ",
- node->name,
- node->ip,
- node->port);
-
- /* Flags */
- if (node->flags == 0) ci = sdscat(ci,"noflags,");
- if (node->flags & REDIS_NODE_MYSELF) ci = sdscat(ci,"myself,");
- if (node->flags & REDIS_NODE_MASTER) ci = sdscat(ci,"master,");
- if (node->flags & REDIS_NODE_SLAVE) ci = sdscat(ci,"slave,");
- if (node->flags & REDIS_NODE_PFAIL) ci = sdscat(ci,"fail?,");
- if (node->flags & REDIS_NODE_FAIL) ci = sdscat(ci,"fail,");
- if (node->flags & REDIS_NODE_HANDSHAKE) ci =sdscat(ci,"handshake,");
- if (node->flags & REDIS_NODE_NOADDR) ci = sdscat(ci,"noaddr,");
- if (ci[sdslen(ci)-1] == ',') ci[sdslen(ci)-1] = ' ';
-
- /* Slave of... or just "-" */
- if (node->slaveof)
- ci = sdscatprintf(ci,"%.40s ",node->slaveof->name);
- else
- ci = sdscatprintf(ci,"- ");
-
- /* Latency from the POV of this node, link status */
- ci = sdscatprintf(ci,"%ld %ld %s\n",
- (long) node->ping_sent,
- (long) node->pong_received,
- node->link ? "connected" : "disconnected");
- }
- dictReleaseIterator(di);
o = createObject(REDIS_STRING,ci);
addReplyBulk(c,o);
decrRefCount(o);