X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/c563ce463b169c768ced856167d3ee09652207b1..9157549fad46025548b6f500a0202d2720779524:/src/cluster.c diff --git a/src/cluster.c b/src/cluster.c index cfd891b4..f76e8ff5 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -19,20 +19,6 @@ int clusterAddSlot(clusterNode *n, int slot); * Initialization * -------------------------------------------------------------------------- */ -void clusterGetRandomName(char *p) { - FILE *fp = fopen("/dev/urandom","r"); - char *charset = "0123456789abcdef"; - int j; - - if (fp == NULL || fread(p,REDIS_CLUSTER_NAMELEN,1,fp) == 0) { - for (j = 0; j < REDIS_CLUSTER_NAMELEN; j++) - p[j] = rand(); - } - for (j = 0; j < REDIS_CLUSTER_NAMELEN; j++) - p[j] = charset[p[j] & 0x0F]; - fclose(fp); -} - int clusterLoadConfig(char *filename) { FILE *fp = fopen(filename,"r"); char *line; @@ -304,7 +290,7 @@ clusterNode *createClusterNode(char *nodename, int flags) { if (nodename) memcpy(node->name, nodename, REDIS_CLUSTER_NAMELEN); else - clusterGetRandomName(node->name); + getRandomHexChars(node->name, REDIS_CLUSTER_NAMELEN); node->flags = flags; memset(node->slots,0,sizeof(node->slots)); node->numslaves = 0; @@ -377,7 +363,7 @@ clusterNode *clusterLookupNode(char *name) { de = dictFind(server.cluster.nodes,s); sdsfree(s); if (de == NULL) return NULL; - return dictGetEntryVal(de); + return dictGetVal(de); } /* This is only used after the handshake. When we connect a given IP/PORT @@ -439,7 +425,7 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) { * time PONG figure if it is newer than our figure. * Note that it's not a problem if we have a PING already * in progress against this node. 
*/ - if (node->pong_received < ntohl(g->pong_received)) { + if (node->pong_received < (signed) ntohl(g->pong_received)) { redisLog(REDIS_DEBUG,"Node pong_received updated by gossip"); node->pong_received = ntohl(g->pong_received); } @@ -513,6 +499,8 @@ int clusterProcessPacket(clusterLink *link) { redisLog(REDIS_DEBUG,"--- Processing packet of type %d, %lu bytes", type, (unsigned long) totlen); + + /* Perform sanity checks */ if (totlen < 8) return 1; if (totlen > sdslen(link->rcvbuf)) return 1; if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG || @@ -531,7 +519,16 @@ int clusterProcessPacket(clusterLink *link) { explen += sizeof(clusterMsgDataFail); if (totlen != explen) return 1; } + if (type == CLUSTERMSG_TYPE_PUBLISH) { + uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData); + + explen += sizeof(clusterMsgDataPublish) + + ntohl(hdr->data.publish.msg.channel_len) + + ntohl(hdr->data.publish.msg.message_len); + if (totlen != explen) return 1; + } + /* Ready to process the packet. Dispatch by type. */ sender = clusterLookupNode(hdr->sender); if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) { int update_config = 0; @@ -602,7 +599,7 @@ int clusterProcessPacket(clusterLink *link) { } } /* Update our info about the node */ - link->node->pong_received = time(NULL); + if (link->node) link->node->pong_received = time(NULL); /* Update master/slave info */ if (sender) { @@ -665,6 +662,22 @@ int clusterProcessPacket(clusterLink *link) { clusterUpdateState(); clusterSaveConfigOrDie(); } + } else if (type == CLUSTERMSG_TYPE_PUBLISH) { + robj *channel, *message; + uint32_t channel_len, message_len; + + /* Don't bother creating useless objects if there are no Pub/Sub subscribers. 
*/ + if (dictSize(server.pubsub_channels) || listLength(server.pubsub_patterns)) { + channel_len = ntohl(hdr->data.publish.msg.channel_len); + message_len = ntohl(hdr->data.publish.msg.message_len); + channel = createStringObject( + (char*)hdr->data.publish.msg.bulk_data,channel_len); + message = createStringObject( + (char*)hdr->data.publish.msg.bulk_data+channel_len, message_len); + pubsubPublishMessage(channel,message); + decrRefCount(channel); + decrRefCount(message); + } } else { redisLog(REDIS_WARNING,"Received unknown packet type: %d", type); } @@ -766,7 +779,7 @@ void clusterBroadcastMessage(void *buf, size_t len) { di = dictGetIterator(server.cluster.nodes); while((de = dictNext(di)) != NULL) { - clusterNode *node = dictGetEntryVal(de); + clusterNode *node = dictGetVal(de); if (!node->link) continue; if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue; @@ -777,7 +790,7 @@ void clusterBroadcastMessage(void *buf, size_t len) { /* Build the message header */ void clusterBuildMessageHdr(clusterMsg *hdr, int type) { - int totlen; + int totlen = 0; memset(hdr,0,sizeof(*hdr)); hdr->type = htons(type); @@ -822,7 +835,7 @@ void clusterSendPing(clusterLink *link, int type) { /* Populate the gossip fields */ while(freshnodes > 0 && gossipcount < 3) { struct dictEntry *de = dictGetRandomKey(server.cluster.nodes); - clusterNode *this = dictGetEntryVal(de); + clusterNode *this = dictGetVal(de); clusterMsgDataGossip *gossip; int j; @@ -943,7 +956,7 @@ void clusterCron(void) { /* Check if we have disconnected nodes and reestablish the connection. 
*/ di = dictGetIterator(server.cluster.nodes); while((de = dictNext(di)) != NULL) { - clusterNode *node = dictGetEntryVal(de); + clusterNode *node = dictGetVal(de); if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue; if (node->link == NULL) { @@ -978,7 +991,7 @@ void clusterCron(void) { * the oldest ping_sent time */ for (j = 0; j < 5; j++) { de = dictGetRandomKey(server.cluster.nodes); - clusterNode *this = dictGetEntryVal(de); + clusterNode *this = dictGetVal(de); if (this->link == NULL) continue; if (this->flags & (REDIS_NODE_MYSELF|REDIS_NODE_HANDSHAKE)) continue; @@ -995,7 +1008,7 @@ void clusterCron(void) { /* Iterate nodes to check if we need to flag something as failing */ di = dictGetIterator(server.cluster.nodes); while((de = dictNext(di)) != NULL) { - clusterNode *node = dictGetEntryVal(de); + clusterNode *node = dictGetVal(de); int delay; if (node->flags & @@ -1126,7 +1139,7 @@ sds clusterGenNodesDescription(void) { di = dictGetIterator(server.cluster.nodes); while((de = dictNext(di)) != NULL) { - clusterNode *node = dictGetEntryVal(de); + clusterNode *node = dictGetVal(de); /* Node coordinates */ ci = sdscatprintf(ci,"%.40s %s:%d ", @@ -1248,7 +1261,10 @@ void clusterCommand(redisClient *c) { addReplyBulk(c,o); decrRefCount(o); } else if ((!strcasecmp(c->argv[1]->ptr,"addslots") || - !strcasecmp(c->argv[1]->ptr,"delslots")) && c->argc >= 3) { + !strcasecmp(c->argv[1]->ptr,"delslots")) && c->argc >= 3) + { + /* CLUSTER ADDSLOTS [slot] ... */ + /* CLUSTER DELSLOTS [slot] ... 
*/ int j, slot; unsigned char *slots = zmalloc(REDIS_CLUSTER_SLOTS); int del = !strcasecmp(c->argv[1]->ptr,"delslots"); @@ -1476,6 +1492,7 @@ void restoreCommand(redisClient *c) { /* Create the key and set the TTL if any */ dbAdd(c->db,c->argv[1],obj); if (ttl) setExpire(c->db,c->argv[1],time(NULL)+ttl); + signalModifiedKey(c->db,c->argv[1]); addReply(c,shared.ok); server.dirty++; } @@ -1500,7 +1517,7 @@ void migrateCommand(redisClient *c) { * nothing to migrate (for instance the key expired in the meantime), but * we include such information in the reply string. */ if ((o = lookupKeyRead(c->db,c->argv[3])) == NULL) { - addReplySds(c,sdsnew("+NOKEY")); + addReplySds(c,sdsnew("+NOKEY\r\n")); return; } @@ -1568,11 +1585,12 @@ void migrateCommand(redisClient *c) { robj *aux; dbDelete(c->db,c->argv[3]); + signalModifiedKey(c->db,c->argv[3]); addReply(c,shared.ok); server.dirty++; /* Translate MIGRATE as DEL for replication/AOF. */ - aux = createStringObject("DEL",2); + aux = createStringObject("DEL",3); rewriteClientCommandVector(c,2,aux,c->argv[3]); decrRefCount(aux); } @@ -1627,6 +1645,19 @@ void dumpCommand(redisClient *c) { return; } +/* The ASKING command is required after a -ASK redirection. + * The client should issue ASKING before actually sending the command to + * the target instance. See the Redis Cluster specification for more + * information. 
*/ +void askingCommand(redisClient *c) { + if (server.cluster_enabled == 0) { + addReplyError(c,"This instance has cluster support disabled"); + return; + } + c->flags |= REDIS_ASKING; + addReply(c,shared.ok); +} + /* ----------------------------------------------------------------------------- * Cluster functions related to serving / redirecting clients * -------------------------------------------------------------------------- */ @@ -1720,9 +1751,12 @@ clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **arg } /* Handle the case in which we are receiving this hash slot from * another instance, so we'll accept the query even if in the table - * it is assigned to a different node. */ - if (server.cluster.importing_slots_from[slot] != NULL) + * it is assigned to a different node, but only if the client + * issued an ASKING command before. */ + if (server.cluster.importing_slots_from[slot] != NULL && + c->flags & REDIS_ASKING) { return server.cluster.myself; + } /* It's not a -ASK case. Base case: just return the right node. */ return n; }