* Initialization
* -------------------------------------------------------------------------- */
-void clusterGetRandomName(char *p) {
- FILE *fp = fopen("/dev/urandom","r");
- char *charset = "0123456789abcdef";
- int j;
-
- if (fp == NULL || fread(p,REDIS_CLUSTER_NAMELEN,1,fp) == 0) {
- for (j = 0; j < REDIS_CLUSTER_NAMELEN; j++)
- p[j] = rand();
- }
- for (j = 0; j < REDIS_CLUSTER_NAMELEN; j++)
- p[j] = charset[p[j] & 0x0F];
- fclose(fp);
-}
-
int clusterLoadConfig(char *filename) {
FILE *fp = fopen(filename,"r");
char *line;
if (nodename)
memcpy(node->name, nodename, REDIS_CLUSTER_NAMELEN);
else
- clusterGetRandomName(node->name);
+ getRandomHexChars(node->name, REDIS_CLUSTER_NAMELEN);
node->flags = flags;
memset(node->slots,0,sizeof(node->slots));
node->numslaves = 0;
de = dictFind(server.cluster.nodes,s);
sdsfree(s);
if (de == NULL) return NULL;
- return dictGetEntryVal(de);
+ return dictGetVal(de);
}
/* This is only used after the handshake. When we connect a given IP/PORT
* time PONG figure if it is newer than our figure.
* Note that it's not a problem if we have a PING already
* in progress against this node. */
- if (node->pong_received < ntohl(g->pong_received)) {
+ if (node->pong_received < (signed) ntohl(g->pong_received)) {
redisLog(REDIS_DEBUG,"Node pong_received updated by gossip");
node->pong_received = ntohl(g->pong_received);
}
redisLog(REDIS_DEBUG,"--- Processing packet of type %d, %lu bytes",
type, (unsigned long) totlen);
+
+ /* Perform sanity checks */
if (totlen < 8) return 1;
if (totlen > sdslen(link->rcvbuf)) return 1;
if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
explen += sizeof(clusterMsgDataFail);
if (totlen != explen) return 1;
}
+ if (type == CLUSTERMSG_TYPE_PUBLISH) {
+ uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
+
+ explen += sizeof(clusterMsgDataPublish) +
+ ntohl(hdr->data.publish.msg.channel_len) +
+ ntohl(hdr->data.publish.msg.message_len);
+ if (totlen != explen) return 1;
+ }
+ /* Ready to process the packet. Dispatch by type. */
sender = clusterLookupNode(hdr->sender);
if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
int update_config = 0;
}
}
/* Update our info about the node */
- link->node->pong_received = time(NULL);
+ if (link->node) link->node->pong_received = time(NULL);
/* Update master/slave info */
if (sender) {
clusterUpdateState();
clusterSaveConfigOrDie();
}
+ } else if (type == CLUSTERMSG_TYPE_PUBLISH) {
+ robj *channel, *message;
+ uint32_t channel_len, message_len;
+
+ /* Don't bother creating useless objects if there are no Pub/Sub subscribers. */
+ if (dictSize(server.pubsub_channels) || listLength(server.pubsub_patterns)) {
+ channel_len = ntohl(hdr->data.publish.msg.channel_len);
+ message_len = ntohl(hdr->data.publish.msg.message_len);
+ channel = createStringObject(
+ (char*)hdr->data.publish.msg.bulk_data,channel_len);
+ message = createStringObject(
+ (char*)hdr->data.publish.msg.bulk_data+channel_len, message_len);
+ pubsubPublishMessage(channel,message);
+ decrRefCount(channel);
+ decrRefCount(message);
+ }
} else {
redisLog(REDIS_WARNING,"Received unknown packet type: %d", type);
}
di = dictGetIterator(server.cluster.nodes);
while((de = dictNext(di)) != NULL) {
- clusterNode *node = dictGetEntryVal(de);
+ clusterNode *node = dictGetVal(de);
if (!node->link) continue;
if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue;
/* Build the message header */
void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
- int totlen;
+ int totlen = 0;
memset(hdr,0,sizeof(*hdr));
hdr->type = htons(type);
/* Populate the gossip fields */
while(freshnodes > 0 && gossipcount < 3) {
struct dictEntry *de = dictGetRandomKey(server.cluster.nodes);
- clusterNode *this = dictGetEntryVal(de);
+ clusterNode *this = dictGetVal(de);
clusterMsgDataGossip *gossip;
int j;
/* Check if we have disconnected nodes and reestablish the connection. */
di = dictGetIterator(server.cluster.nodes);
while((de = dictNext(di)) != NULL) {
- clusterNode *node = dictGetEntryVal(de);
+ clusterNode *node = dictGetVal(de);
if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue;
if (node->link == NULL) {
* the oldest ping_sent time */
for (j = 0; j < 5; j++) {
de = dictGetRandomKey(server.cluster.nodes);
- clusterNode *this = dictGetEntryVal(de);
+ clusterNode *this = dictGetVal(de);
if (this->link == NULL) continue;
if (this->flags & (REDIS_NODE_MYSELF|REDIS_NODE_HANDSHAKE)) continue;
/* Iterate nodes to check if we need to flag something as failing */
di = dictGetIterator(server.cluster.nodes);
while((de = dictNext(di)) != NULL) {
- clusterNode *node = dictGetEntryVal(de);
+ clusterNode *node = dictGetVal(de);
int delay;
if (node->flags &
di = dictGetIterator(server.cluster.nodes);
while((de = dictNext(di)) != NULL) {
- clusterNode *node = dictGetEntryVal(de);
+ clusterNode *node = dictGetVal(de);
/* Node coordinates */
ci = sdscatprintf(ci,"%.40s %s:%d ",
addReplyBulk(c,o);
decrRefCount(o);
} else if ((!strcasecmp(c->argv[1]->ptr,"addslots") ||
- !strcasecmp(c->argv[1]->ptr,"delslots")) && c->argc >= 3) {
+ !strcasecmp(c->argv[1]->ptr,"delslots")) && c->argc >= 3)
+ {
+ /* CLUSTER ADDSLOTS <slot> [slot] ... */
+ /* CLUSTER DELSLOTS <slot> [slot] ... */
int j, slot;
unsigned char *slots = zmalloc(REDIS_CLUSTER_SLOTS);
int del = !strcasecmp(c->argv[1]->ptr,"delslots");
/* Create the key and set the TTL if any */
dbAdd(c->db,c->argv[1],obj);
if (ttl) setExpire(c->db,c->argv[1],time(NULL)+ttl);
+ signalModifiedKey(c->db,c->argv[1]);
addReply(c,shared.ok);
server.dirty++;
}
* nothing to migrate (for instance the key expired in the meantime), but
* we include such information in the reply string. */
if ((o = lookupKeyRead(c->db,c->argv[3])) == NULL) {
- addReplySds(c,sdsnew("+NOKEY"));
+ addReplySds(c,sdsnew("+NOKEY\r\n"));
return;
}
robj *aux;
dbDelete(c->db,c->argv[3]);
+ signalModifiedKey(c->db,c->argv[3]);
addReply(c,shared.ok);
server.dirty++;
/* Translate MIGRATE as DEL for replication/AOF. */
- aux = createStringObject("DEL",2);
+ aux = createStringObject("DEL",3);
rewriteClientCommandVector(c,2,aux,c->argv[3]);
decrRefCount(aux);
}
return;
}
+/* The ASKING command is required after a -ASK redirection.
+ * The client should issue ASKING before actually sending the command to
+ * the target instance. See the Redis Cluster specification for more
+ * information.
+ *
+ * Replies with an error when server.cluster_enabled is 0; otherwise sets
+ * REDIS_ASKING in c->flags and replies with shared.ok. */
+void askingCommand(redisClient *c) {
+ if (server.cluster_enabled == 0) {
+ addReplyError(c,"This instance has cluster support disabled");
+ return;
+ }
+ c->flags |= REDIS_ASKING;
+ addReply(c,shared.ok);
+}
+
/* -----------------------------------------------------------------------------
* Cluster functions related to serving / redirecting clients
* -------------------------------------------------------------------------- */
}
/* Handle the case in which we are receiving this hash slot from
* another instance, so we'll accept the query even if in the table
- * it is assigned to a different node. */
- if (server.cluster.importing_slots_from[slot] != NULL)
+ * it is assigned to a different node, but only if the client
+ * issued an ASKING command before. */
+ if (server.cluster.importing_slots_from[slot] != NULL &&
+ c->flags & REDIS_ASKING) {
return server.cluster.myself;
+ }
/* It's not a -ASK case. Base case: just return the right node. */
return n;
}