X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/3b5289a04c474ce66df2ef410e053795b1f0f1d2..b4fb720b1014a05fb1e3238d1b7b73883e20cb00:/src/cluster.c?ds=inline diff --git a/src/cluster.c b/src/cluster.c index ed07a84f..4ccff657 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -377,7 +377,7 @@ clusterNode *clusterLookupNode(char *name) { de = dictFind(server.cluster.nodes,s); sdsfree(s); if (de == NULL) return NULL; - return dictGetEntryVal(de); + return dictGetVal(de); } /* This is only used after the handshake. When we connect a given IP/PORT @@ -439,7 +439,7 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) { * time PONG figure if it is newer than our figure. * Note that it's not a problem if we have a PING already * in progress against this node. */ - if (node->pong_received < ntohl(g->pong_received)) { + if (node->pong_received < (signed) ntohl(g->pong_received)) { redisLog(REDIS_DEBUG,"Node pong_received updated by gossip"); node->pong_received = ntohl(g->pong_received); } @@ -493,6 +493,7 @@ void nodeIp2String(char *buf, clusterLink *link) { /* Update the node address to the IP address that can be extracted * from link->fd, and at the specified port. */ void nodeUpdateAddress(clusterNode *node, clusterLink *link, int port) { + /* TODO */ } /* When this function is called, there is a packet to process starting @@ -510,8 +511,10 @@ int clusterProcessPacket(clusterLink *link) { uint16_t type = ntohs(hdr->type); clusterNode *sender; - redisLog(REDIS_DEBUG,"--- packet to process %lu bytes (%lu) ---", - (unsigned long) totlen, sdslen(link->rcvbuf)); + redisLog(REDIS_DEBUG,"--- Processing packet of type %d, %lu bytes", + type, (unsigned long) totlen); + + /* Perform sanity checks */ if (totlen < 8) return 1; if (totlen > sdslen(link->rcvbuf)) return 1; if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG || @@ -530,7 +533,16 @@ int clusterProcessPacket(clusterLink *link) { explen += sizeof(clusterMsgDataFail); if (totlen != explen) return 1; } + if (type == CLUSTERMSG_TYPE_PUBLISH) { + uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData); + + explen += sizeof(clusterMsgDataPublish) + + ntohl(hdr->data.publish.msg.channel_len) + + ntohl(hdr->data.publish.msg.message_len); + if (totlen != explen) return 1; + } + /* Ready to process the packet. Dispatch by type. */ sender = clusterLookupNode(hdr->sender); if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) { int update_config = 0; @@ -664,8 +676,24 @@ int clusterProcessPacket(clusterLink *link) { clusterUpdateState(); clusterSaveConfigOrDie(); } + } else if (type == CLUSTERMSG_TYPE_PUBLISH) { + robj *channel, *message; + uint32_t channel_len, message_len; + + /* Don't bother creating useless objects if there are no Pub/Sub subscribers. */ + if (dictSize(server.pubsub_channels) || listLength(server.pubsub_patterns)) { + channel_len = ntohl(hdr->data.publish.msg.channel_len); + message_len = ntohl(hdr->data.publish.msg.message_len); + channel = createStringObject( + (char*)hdr->data.publish.msg.bulk_data,channel_len); + message = createStringObject( + (char*)hdr->data.publish.msg.bulk_data+channel_len, message_len); + pubsubPublishMessage(channel,message); + decrRefCount(channel); + decrRefCount(message); + } } else { - redisLog(REDIS_NOTICE,"Received unknown packet type: %d", type); + redisLog(REDIS_WARNING,"Received unknown packet type: %d", type); } return 1; } @@ -758,6 +786,22 @@ void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) { link->sndbuf = sdscatlen(link->sndbuf, msg, msglen); } +/* Send a message to all the nodes with a reliable link */ +void clusterBroadcastMessage(void *buf, size_t len) { + dictIterator *di; + dictEntry *de; + + di = dictGetIterator(server.cluster.nodes); + while((de = dictNext(di)) != NULL) { + clusterNode *node = dictGetVal(de); + + if (!node->link) continue; + if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue; + clusterSendMessage(node->link,buf,len); + } + dictReleaseIterator(di); +} + /* Build the message header */ void clusterBuildMessageHdr(clusterMsg *hdr, int type) { int totlen; @@ -805,7 +849,7 @@ void clusterSendPing(clusterLink *link, int type) { /* Populate the gossip fields */ while(freshnodes > 0 && gossipcount < 3) { struct dictEntry *de = dictGetRandomKey(server.cluster.nodes); - clusterNode *this = dictGetEntryVal(de); + clusterNode *this = dictGetVal(de); clusterMsgDataGossip *gossip; int j; @@ -842,20 +886,48 @@ void clusterSendPing(clusterLink *link, int type) { clusterSendMessage(link,buf,totlen); } -/* Send a message to all the nodes with a reliable link */ -void clusterBroadcastMessage(void *buf, size_t len) { - dictIterator *di; - dictEntry *de; +/* Send a PUBLISH message. + * + * If link is NULL, then the message is broadcasted to the whole cluster. */ +void clusterSendPublish(clusterLink *link, robj *channel, robj *message) { + unsigned char buf[4096], *payload; + clusterMsg *hdr = (clusterMsg*) buf; + uint32_t totlen; + uint32_t channel_len, message_len; - di = dictGetIterator(server.cluster.nodes); - while((de = dictNext(di)) != NULL) { - clusterNode *node = dictGetEntryVal(de); + channel = getDecodedObject(channel); + message = getDecodedObject(message); + channel_len = sdslen(channel->ptr); + message_len = sdslen(message->ptr); - if (!node->link) continue; - if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue; - clusterSendMessage(node->link,buf,len); + clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_PUBLISH); + totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); + totlen += sizeof(clusterMsgDataPublish) + channel_len + message_len; + + hdr->data.publish.msg.channel_len = htonl(channel_len); + hdr->data.publish.msg.message_len = htonl(message_len); + hdr->totlen = htonl(totlen); + + /* Try to use the local buffer if possible */ + if (totlen < sizeof(buf)) { + payload = buf; + } else { + payload = zmalloc(totlen); + hdr = (clusterMsg*) payload; + memcpy(payload,hdr,sizeof(hdr)); } - dictReleaseIterator(di); + memcpy(hdr->data.publish.msg.bulk_data,channel->ptr,sdslen(channel->ptr)); + memcpy(hdr->data.publish.msg.bulk_data+sdslen(channel->ptr), + message->ptr,sdslen(message->ptr)); + + if (link) + clusterSendMessage(link,payload,totlen); + else + clusterBroadcastMessage(payload,totlen); + + decrRefCount(channel); + decrRefCount(message); + if (payload != buf) zfree(payload); } /* Send a FAIL message to all the nodes we are able to contact. @@ -872,6 +944,17 @@ void clusterSendFail(char *nodename) { clusterBroadcastMessage(buf,ntohl(hdr->totlen)); } +/* ----------------------------------------------------------------------------- + * CLUSTER Pub/Sub support + * + * For now we do very little, just propagating PUBLISH messages across the whole + * cluster. In the future we'll try to get smarter and avoiding propagating those + * messages to hosts without receives for a given channel. + * -------------------------------------------------------------------------- */ +void clusterPropagatePublish(robj *channel, robj *message) { + clusterSendPublish(NULL, channel, message); +} + /* ----------------------------------------------------------------------------- * CLUSTER cron job * -------------------------------------------------------------------------- */ @@ -887,7 +970,7 @@ void clusterCron(void) { /* Check if we have disconnected nodes and reestablish the connection. */ di = dictGetIterator(server.cluster.nodes); while((de = dictNext(di)) != NULL) { - clusterNode *node = dictGetEntryVal(de); + clusterNode *node = dictGetVal(de); if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR)) continue; if (node->link == NULL) { @@ -922,7 +1005,7 @@ void clusterCron(void) { * the oldest ping_sent time */ for (j = 0; j < 5; j++) { de = dictGetRandomKey(server.cluster.nodes); - clusterNode *this = dictGetEntryVal(de); + clusterNode *this = dictGetVal(de); if (this->link == NULL) continue; if (this->flags & (REDIS_NODE_MYSELF|REDIS_NODE_HANDSHAKE)) continue; @@ -939,7 +1022,7 @@ void clusterCron(void) { /* Iterate nodes to check if we need to flag something as failing */ di = dictGetIterator(server.cluster.nodes); while((de = dictNext(di)) != NULL) { - clusterNode *node = dictGetEntryVal(de); + clusterNode *node = dictGetVal(de); int delay; if (node->flags & @@ -1070,7 +1153,7 @@ sds clusterGenNodesDescription(void) { di = dictGetIterator(server.cluster.nodes); while((de = dictNext(di)) != NULL) { - clusterNode *node = dictGetEntryVal(de); + clusterNode *node = dictGetVal(de); /* Node coordinates */ ci = sdscatprintf(ci,"%.40s %s:%d ", @@ -1192,7 +1275,10 @@ void clusterCommand(redisClient *c) { addReplyBulk(c,o); decrRefCount(o); } else if ((!strcasecmp(c->argv[1]->ptr,"addslots") || - !strcasecmp(c->argv[1]->ptr,"delslots")) && c->argc >= 3) { + !strcasecmp(c->argv[1]->ptr,"delslots")) && c->argc >= 3) + { + /* CLUSTER ADDSLOTS [slot] ... */ + /* CLUSTER DELSLOTS [slot] ... */ int j, slot; unsigned char *slots = zmalloc(REDIS_CLUSTER_SLOTS); int del = !strcasecmp(c->argv[1]->ptr,"delslots"); @@ -1232,7 +1318,7 @@ void clusterCommand(redisClient *c) { retval = del ? clusterDelSlot(j) : clusterAddSlot(server.cluster.myself,j); - redisAssert(retval == REDIS_OK); + redisAssertWithInfo(c,NULL,retval == REDIS_OK); } } zfree(slots); @@ -1276,7 +1362,7 @@ void clusterCommand(redisClient *c) { /* CLUSTER SETSLOT STABLE */ server.cluster.importing_slots_from[slot] = NULL; server.cluster.migrating_slots_to[slot] = NULL; - } else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 4) { + } else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 5) { /* CLUSTER SETSLOT NODE */ clusterNode *n = clusterLookupNode(c->argv[4]->ptr); @@ -1293,7 +1379,7 @@ void clusterCommand(redisClient *c) { keys = zmalloc(sizeof(robj*)*1); numkeys = GetKeysInSlot(slot, keys, 1); zfree(keys); - if (numkeys == 0) { + if (numkeys != 0) { addReplyErrorFormat(c, "Can't assign hashslot %d to a different node while I still hold keys for this hash slot.", slot); return; } @@ -1305,6 +1391,11 @@ void clusterCommand(redisClient *c) { server.cluster.migrating_slots_to[slot]) server.cluster.migrating_slots_to[slot] = NULL; + /* If this node was importing this slot, assigning the slot to + * itself also clears the importing status. */ + if (n == server.cluster.myself && server.cluster.importing_slots_from[slot]) + server.cluster.importing_slots_from[slot] = NULL; + clusterDelSlot(slot); clusterAddSlot(n,slot); } else { @@ -1415,7 +1506,9 @@ void restoreCommand(redisClient *c) { /* Create the key and set the TTL if any */ dbAdd(c->db,c->argv[1],obj); if (ttl) setExpire(c->db,c->argv[1],time(NULL)+ttl); + signalModifiedKey(c->db,c->argv[1]); addReply(c,shared.ok); + server.dirty++; } /* MIGRATE host port key dbid timeout */ @@ -1438,7 +1531,7 @@ void migrateCommand(redisClient *c) { * nothing to migrate (for instance the key expired in the meantime), but * we include such information in the reply string. */ if ((o = lookupKeyRead(c->db,c->argv[3])) == NULL) { - addReplySds(c,sdsnew("+NOKEY")); + addReplySds(c,sdsnew("+NOKEY\r\n")); return; } @@ -1456,23 +1549,23 @@ void migrateCommand(redisClient *c) { } rioInitWithBuffer(&cmd,sdsempty()); - redisAssert(rioWriteBulkCount(&cmd,'*',2)); - redisAssert(rioWriteBulkString(&cmd,"SELECT",6)); - redisAssert(rioWriteBulkLongLong(&cmd,dbid)); + redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2)); + redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6)); + redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid)); ttl = getExpire(c->db,c->argv[3]); - redisAssert(rioWriteBulkCount(&cmd,'*',4)); - redisAssert(rioWriteBulkString(&cmd,"RESTORE",7)); - redisAssert(c->argv[3]->encoding == REDIS_ENCODING_RAW); - redisAssert(rioWriteBulkString(&cmd,c->argv[3]->ptr,sdslen(c->argv[3]->ptr))); - redisAssert(rioWriteBulkLongLong(&cmd,(ttl == -1) ? 0 : ttl)); + redisAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',4)); + redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7)); + redisAssertWithInfo(c,NULL,c->argv[3]->encoding == REDIS_ENCODING_RAW); + redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,c->argv[3]->ptr,sdslen(c->argv[3]->ptr))); + redisAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,(ttl == -1) ? 0 : ttl)); /* Finally the last argument that is the serailized object payload * in the form: . */ rioInitWithBuffer(&payload,sdsempty()); - redisAssert(rdbSaveObjectType(&payload,o)); - redisAssert(rdbSaveObject(&payload,o) != -1); - redisAssert(rioWriteBulkString(&cmd,payload.io.buffer.ptr,sdslen(payload.io.buffer.ptr))); + redisAssertWithInfo(c,NULL,rdbSaveObjectType(&payload,o)); + redisAssertWithInfo(c,NULL,rdbSaveObject(&payload,o) != -1); + redisAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,payload.io.buffer.ptr,sdslen(payload.io.buffer.ptr))); sdsfree(payload.io.buffer.ptr); /* Tranfer the query to the other node in 64K chunks. */ @@ -1503,8 +1596,17 @@ void migrateCommand(redisClient *c) { addReplyErrorFormat(c,"Target instance replied with error: %s", (buf1[0] == '-') ? buf1+1 : buf2+1); } else { + robj *aux; + dbDelete(c->db,c->argv[3]); + signalModifiedKey(c->db,c->argv[3]); addReply(c,shared.ok); + server.dirty++; + + /* Translate MIGRATE as DEL for replication/AOF. */ + aux = createStringObject("DEL",3); + rewriteClientCommandVector(c,2,aux,c->argv[3]); + decrRefCount(aux); } } @@ -1547,8 +1649,8 @@ void dumpCommand(redisClient *c) { /* Serialize the object in a RDB-like format. It consist of an object type * byte followed by the serialized object. This is understood by RESTORE. */ rioInitWithBuffer(&payload,sdsempty()); - redisAssert(rdbSaveObjectType(&payload,o)); - redisAssert(rdbSaveObject(&payload,o)); + redisAssertWithInfo(c,NULL,rdbSaveObjectType(&payload,o)); + redisAssertWithInfo(c,NULL,rdbSaveObject(&payload,o)); /* Transfer to the client */ dumpobj = createObject(REDIS_STRING,payload.io.buffer.ptr); @@ -1557,6 +1659,19 @@ void dumpCommand(redisClient *c) { return; } +/* The ASKING command is required after a -ASK redirection. + * The client should issue ASKING before to actualy send the command to + * the target instance. See the Redis Cluster specification for more + * information. */ +void askingCommand(redisClient *c) { + if (server.cluster_enabled == 0) { + addReplyError(c,"This instance has cluster support disabled"); + return; + } + c->flags |= REDIS_ASKING; + addReply(c,shared.ok); +} + /* ----------------------------------------------------------------------------- * Cluster functions related to serving / redirecting clients * -------------------------------------------------------------------------- */ @@ -1618,7 +1733,7 @@ clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **arg slot = keyHashSlot((char*)firstkey->ptr, sdslen(firstkey->ptr)); n = server.cluster.slots[slot]; - redisAssert(n != NULL); + redisAssertWithInfo(c,firstkey,n != NULL); } else { /* If it is not the first key, make sure it is exactly * the same key as the first we saw. */ @@ -1650,9 +1765,12 @@ clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **arg } /* Handle the case in which we are receiving this hash slot from * another instance, so we'll accept the query even if in the table - * it is assigned to a different node. */ - if (server.cluster.importing_slots_from[slot] != NULL) + * it is assigned to a different node, but only if the client + * issued an ASKING command before. */ + if (server.cluster.importing_slots_from[slot] != NULL && + c->flags & REDIS_ASKING) { return server.cluster.myself; + } /* It's not a -ASK case. Base case: just return the right node. */ return n; }