git.saurik.com Git - redis.git/commitdiff
Redis Cluster: process node to node CLUSTERMSG_TYPE_PUBLISH messages and send it...
author antirez <antirez@gmail.com>
Fri, 7 Oct 2011 14:34:16 +0000 (16:34 +0200)
committer antirez <antirez@gmail.com>
Fri, 7 Oct 2011 14:34:16 +0000 (16:34 +0200)
src/cluster.c
src/redis.h

index cfd891b4f573c229ee69da5a876b0e9f9bbca210..c2d85e0654f379552aaf56b0971be9aeb5fef9b5 100644 (file)
@@ -513,6 +513,8 @@ int clusterProcessPacket(clusterLink *link) {
 
     redisLog(REDIS_DEBUG,"--- Processing packet of type %d, %lu bytes",
         type, (unsigned long) totlen);
+
+    /* Perform sanity checks */
     if (totlen < 8) return 1;
     if (totlen > sdslen(link->rcvbuf)) return 1;
     if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
@@ -531,7 +533,16 @@ int clusterProcessPacket(clusterLink *link) {
         explen += sizeof(clusterMsgDataFail);
         if (totlen != explen) return 1;
     }
+    if (type == CLUSTERMSG_TYPE_PUBLISH) {
+        uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
+
+        explen += sizeof(clusterMsgDataPublish) +
+                ntohl(hdr->data.publish.msg.channel_len) +
+                ntohl(hdr->data.publish.msg.message_len);
+        if (totlen != explen) return 1;
+    }
 
+    /* Ready to process the packet. Dispatch by type. */
     sender = clusterLookupNode(hdr->sender);
     if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
         int update_config = 0;
@@ -665,6 +676,22 @@ int clusterProcessPacket(clusterLink *link) {
             clusterUpdateState();
             clusterSaveConfigOrDie();
         }
+    } else if (type == CLUSTERMSG_TYPE_PUBLISH) {
+        robj *channel, *message;
+        uint32_t channel_len, message_len;
+
+        /* Don't bother creating useless objects if there are no Pub/Sub subscribers. */
+        if (dictSize(server.pubsub_channels) || listLength(server.pubsub_patterns)) {
+            channel_len = ntohl(hdr->data.publish.msg.channel_len);
+            message_len = ntohl(hdr->data.publish.msg.message_len);
+            channel = createStringObject(
+                        (char*)hdr->data.publish.msg.bulk_data,channel_len);
+            message = createStringObject(
+                        (char*)hdr->data.publish.msg.bulk_data+channel_len, message_len);
+            pubsubPublishMessage(channel,message);
+            decrRefCount(channel);
+            decrRefCount(message);
+        }
     } else {
         redisLog(REDIS_WARNING,"Received unknown packet type: %d", type);
     }
index e6058caada7f156a69f79e5975b1223eed3f2508..6b33d128f61d4886d65ad8e83736b595ca1f6a42 100644 (file)
@@ -943,6 +943,7 @@ int pubsubUnsubscribeAllChannels(redisClient *c, int notify);
 int pubsubUnsubscribeAllPatterns(redisClient *c, int notify);
 void freePubsubPattern(void *p);
 int listMatchPubsubPattern(void *a, void *b);
+int pubsubPublishMessage(robj *channel, robj *message);
 
 /* Configuration */
 void loadServerConfig(char *filename);