git.saurik.com Git - redis.git/blobdiff - src/pubsub.c
Merge remote-tracking branch 'origin/unstable' into unstable
[redis.git] / src / pubsub.c
index c9f5f310e55d1894430283017ff828cf240e014b..27e6f9a5819b49cddb4bd4766c9af2fb2e076d6a 100644 (file)
@@ -1,5 +1,9 @@
 #include "redis.h"
 
+/*-----------------------------------------------------------------------------
+ * Pubsub low level API
+ *----------------------------------------------------------------------------*/
+
 void freePubsubPattern(void *p) {
     pubsubPattern *pat = p;
 
@@ -32,7 +36,7 @@ int pubsubSubscribeChannel(redisClient *c, robj *channel) {
             dictAdd(server.pubsub_channels,channel,clients);
             incrRefCount(channel);
         } else {
-            clients = dictGetEntryVal(de);
+            clients = dictGetVal(de);
         }
         listAddNodeTail(clients,c);
     }
@@ -59,10 +63,10 @@ int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) {
         retval = 1;
         /* Remove the client from the channel -> clients list hash table */
         de = dictFind(server.pubsub_channels,channel);
-        redisAssert(de != NULL);
-        clients = dictGetEntryVal(de);
+        redisAssertWithInfo(c,NULL,de != NULL);
+        clients = dictGetVal(de);
         ln = listSearchKey(clients,c);
-        redisAssert(ln != NULL);
+        redisAssertWithInfo(c,NULL,ln != NULL);
         listDelNode(clients,ln);
         if (listLength(clients) == 0) {
             /* Free the list and associated hash entry at all if this was
@@ -137,12 +141,12 @@ int pubsubUnsubscribePattern(redisClient *c, robj *pattern, int notify) {
 /* Unsubscribe from all the channels. Return the number of channels the
  * client was subscribed from. */
 int pubsubUnsubscribeAllChannels(redisClient *c, int notify) {
-    dictIterator *di = dictGetIterator(c->pubsub_channels);
+    dictIterator *di = dictGetSafeIterator(c->pubsub_channels);
     dictEntry *de;
     int count = 0;
 
     while((de = dictNext(di)) != NULL) {
-        robj *channel = dictGetEntryKey(de);
+        robj *channel = dictGetKey(de);
 
         count += pubsubUnsubscribeChannel(c,channel,notify);
     }
@@ -176,7 +180,7 @@ int pubsubPublishMessage(robj *channel, robj *message) {
     /* Send to clients listening for that channel */
     de = dictFind(server.pubsub_channels,channel);
     if (de) {
-        list *list = dictGetEntryVal(de);
+        list *list = dictGetVal(de);
         listNode *ln;
         listIter li;
 
@@ -215,6 +219,10 @@ int pubsubPublishMessage(robj *channel, robj *message) {
     return receivers;
 }
 
+/*-----------------------------------------------------------------------------
+ * Pubsub commands implementation
+ *----------------------------------------------------------------------------*/
+
 void subscribeCommand(redisClient *c) {
     int j;
 
@@ -255,5 +263,6 @@ void punsubscribeCommand(redisClient *c) {
 
 void publishCommand(redisClient *c) {
     int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
+    if (server.cluster_enabled) clusterPropagatePublish(c->argv[1],c->argv[2]); /* cluster mode only: also hand channel (argv[1]) and message (argv[2]) to clusterPropagatePublish(); the reply below still carries only pubsubPublishMessage()'s count */
     addReplyLongLong(c,receivers);
 }