From d38ef520852af3268dde9d254f04a000856f9b3c Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 7 Oct 2011 16:34:16 +0200 Subject: [PATCH] Redis Cluster: process node to node CLUSTERMSG_TYPE_PUBLISH messages and send it to the local clients. --- src/cluster.c | 27 +++++++++++++++++++++++++++ src/redis.h | 1 + 2 files changed, 28 insertions(+) diff --git a/src/cluster.c b/src/cluster.c index cfd891b4..c2d85e06 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -513,6 +513,8 @@ int clusterProcessPacket(clusterLink *link) { redisLog(REDIS_DEBUG,"--- Processing packet of type %d, %lu bytes", type, (unsigned long) totlen); + + /* Perform sanity checks */ if (totlen < 8) return 1; if (totlen > sdslen(link->rcvbuf)) return 1; if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG || @@ -531,7 +533,16 @@ int clusterProcessPacket(clusterLink *link) { explen += sizeof(clusterMsgDataFail); if (totlen != explen) return 1; } + if (type == CLUSTERMSG_TYPE_PUBLISH) { + uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData); + + explen += sizeof(clusterMsgDataPublish) + + ntohl(hdr->data.publish.msg.channel_len) + + ntohl(hdr->data.publish.msg.message_len); + if (totlen != explen) return 1; + } + /* Ready to process the packet. Dispatch by type. */ sender = clusterLookupNode(hdr->sender); if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) { int update_config = 0; @@ -665,6 +676,22 @@ int clusterProcessPacket(clusterLink *link) { clusterUpdateState(); clusterSaveConfigOrDie(); } + } else if (type == CLUSTERMSG_TYPE_PUBLISH) { + robj *channel, *message; + uint32_t channel_len, message_len; + + /* Don't bother creating useless objects if there are no Pub/Sub subscribers. */ + if (dictSize(server.pubsub_channels) || listLength(server.pubsub_patterns)) { + channel_len = ntohl(hdr->data.publish.msg.channel_len); + message_len = ntohl(hdr->data.publish.msg.message_len); + channel = createStringObject( + (char*)hdr->data.publish.msg.bulk_data,channel_len); + message = createStringObject( + (char*)hdr->data.publish.msg.bulk_data+channel_len, message_len); + pubsubPublishMessage(channel,message); + decrRefCount(channel); + decrRefCount(message); + } } else { redisLog(REDIS_WARNING,"Received unknown packet type: %d", type); } diff --git a/src/redis.h b/src/redis.h index e6058caa..6b33d128 100644 --- a/src/redis.h +++ b/src/redis.h @@ -943,6 +943,7 @@ int pubsubUnsubscribeAllChannels(redisClient *c, int notify); int pubsubUnsubscribeAllPatterns(redisClient *c, int notify); void freePubsubPattern(void *p); int listMatchPubsubPattern(void *a, void *b); +int pubsubPublishMessage(robj *channel, robj *message); /* Configuration */ void loadServerConfig(char *filename); -- 2.45.2