git.saurik.com Git - redis.git/blobdiff - src/pubsub.c
Better syncio.c with millisecond resolution.
[redis.git] / src / pubsub.c
index 728e559c9e4c563067c5fd2ce8417fc9ec0b5061..984013bbc46454ec69b534ddcad5cfe333bf309b 100644 (file)
@@ -36,12 +36,12 @@ int pubsubSubscribeChannel(redisClient *c, robj *channel) {
             dictAdd(server.pubsub_channels,channel,clients);
             incrRefCount(channel);
         } else {
             dictAdd(server.pubsub_channels,channel,clients);
             incrRefCount(channel);
         } else {
-            clients = dictGetEntryVal(de);
+            clients = dictGetVal(de);
         }
         listAddNodeTail(clients,c);
     }
     /* Notify the client */
         }
         listAddNodeTail(clients,c);
     }
     /* Notify the client */
-    addReply(c,shared.mbulk3);
+    addReply(c,shared.mbulkhdr[3]);
     addReply(c,shared.subscribebulk);
     addReplyBulk(c,channel);
     addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));
     addReply(c,shared.subscribebulk);
     addReplyBulk(c,channel);
     addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));
@@ -63,10 +63,10 @@ int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) {
         retval = 1;
         /* Remove the client from the channel -> clients list hash table */
         de = dictFind(server.pubsub_channels,channel);
         retval = 1;
         /* Remove the client from the channel -> clients list hash table */
         de = dictFind(server.pubsub_channels,channel);
-        redisAssert(de != NULL);
-        clients = dictGetEntryVal(de);
+        redisAssertWithInfo(c,NULL,de != NULL);
+        clients = dictGetVal(de);
         ln = listSearchKey(clients,c);
         ln = listSearchKey(clients,c);
-        redisAssert(ln != NULL);
+        redisAssertWithInfo(c,NULL,ln != NULL);
         listDelNode(clients,ln);
         if (listLength(clients) == 0) {
             /* Free the list and associated hash entry at all if this was
         listDelNode(clients,ln);
         if (listLength(clients) == 0) {
             /* Free the list and associated hash entry at all if this was
@@ -77,7 +77,7 @@ int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) {
     }
     /* Notify the client */
     if (notify) {
     }
     /* Notify the client */
     if (notify) {
-        addReply(c,shared.mbulk3);
+        addReply(c,shared.mbulkhdr[3]);
         addReply(c,shared.unsubscribebulk);
         addReplyBulk(c,channel);
         addReplyLongLong(c,dictSize(c->pubsub_channels)+
         addReply(c,shared.unsubscribebulk);
         addReplyBulk(c,channel);
         addReplyLongLong(c,dictSize(c->pubsub_channels)+
@@ -103,7 +103,7 @@ int pubsubSubscribePattern(redisClient *c, robj *pattern) {
         listAddNodeTail(server.pubsub_patterns,pat);
     }
     /* Notify the client */
         listAddNodeTail(server.pubsub_patterns,pat);
     }
     /* Notify the client */
-    addReply(c,shared.mbulk3);
+    addReply(c,shared.mbulkhdr[3]);
     addReply(c,shared.psubscribebulk);
     addReplyBulk(c,pattern);
     addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));
     addReply(c,shared.psubscribebulk);
     addReplyBulk(c,pattern);
     addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));
@@ -128,7 +128,7 @@ int pubsubUnsubscribePattern(redisClient *c, robj *pattern, int notify) {
     }
     /* Notify the client */
     if (notify) {
     }
     /* Notify the client */
     if (notify) {
-        addReply(c,shared.mbulk3);
+        addReply(c,shared.mbulkhdr[3]);
         addReply(c,shared.punsubscribebulk);
         addReplyBulk(c,pattern);
         addReplyLongLong(c,dictSize(c->pubsub_channels)+
         addReply(c,shared.punsubscribebulk);
         addReplyBulk(c,pattern);
         addReplyLongLong(c,dictSize(c->pubsub_channels)+
@@ -146,7 +146,7 @@ int pubsubUnsubscribeAllChannels(redisClient *c, int notify) {
     int count = 0;
 
     while((de = dictNext(di)) != NULL) {
     int count = 0;
 
     while((de = dictNext(di)) != NULL) {
-        robj *channel = dictGetEntryKey(de);
+        robj *channel = dictGetKey(de);
 
         count += pubsubUnsubscribeChannel(c,channel,notify);
     }
 
         count += pubsubUnsubscribeChannel(c,channel,notify);
     }
@@ -180,7 +180,7 @@ int pubsubPublishMessage(robj *channel, robj *message) {
     /* Send to clients listening for that channel */
     de = dictFind(server.pubsub_channels,channel);
     if (de) {
     /* Send to clients listening for that channel */
     de = dictFind(server.pubsub_channels,channel);
     if (de) {
-        list *list = dictGetEntryVal(de);
+        list *list = dictGetVal(de);
         listNode *ln;
         listIter li;
 
         listNode *ln;
         listIter li;
 
@@ -188,7 +188,7 @@ int pubsubPublishMessage(robj *channel, robj *message) {
         while ((ln = listNext(&li)) != NULL) {
             redisClient *c = ln->value;
 
         while ((ln = listNext(&li)) != NULL) {
             redisClient *c = ln->value;
 
-            addReply(c,shared.mbulk3);
+            addReply(c,shared.mbulkhdr[3]);
             addReply(c,shared.messagebulk);
             addReplyBulk(c,channel);
             addReplyBulk(c,message);
             addReply(c,shared.messagebulk);
             addReplyBulk(c,channel);
             addReplyBulk(c,message);
@@ -206,7 +206,7 @@ int pubsubPublishMessage(robj *channel, robj *message) {
                                 sdslen(pat->pattern->ptr),
                                 (char*)channel->ptr,
                                 sdslen(channel->ptr),0)) {
                                 sdslen(pat->pattern->ptr),
                                 (char*)channel->ptr,
                                 sdslen(channel->ptr),0)) {
-                addReply(pat->client,shared.mbulk4);
+                addReply(pat->client,shared.mbulkhdr[4]);
                 addReply(pat->client,shared.pmessagebulk);
                 addReplyBulk(pat->client,pat->pattern);
                 addReplyBulk(pat->client,channel);
                 addReply(pat->client,shared.pmessagebulk);
                 addReplyBulk(pat->client,pat->pattern);
                 addReplyBulk(pat->client,channel);
@@ -263,5 +263,6 @@ void punsubscribeCommand(redisClient *c) {
 
 void publishCommand(redisClient *c) {
     int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
 
 void publishCommand(redisClient *c) {
     int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
+    if (server.cluster_enabled) clusterPropagatePublish(c->argv[1],c->argv[2]);
     addReplyLongLong(c,receivers);
 }
     addReplyLongLong(c,receivers);
 }