]>
git.saurik.com Git - redis.git/blob - src/pubsub.c
   3 /*----------------------------------------------------------------------------- 
   5  *----------------------------------------------------------------------------*/ 
   7 void freePubsubPattern(void *p
) { 
   8     pubsubPattern 
*pat 
= p
; 
  10     decrRefCount(pat
->pattern
); 
  14 int listMatchPubsubPattern(void *a
, void *b
) { 
  15     pubsubPattern 
*pa 
= a
, *pb 
= b
; 
  17     return (pa
->client 
== pb
->client
) && 
  18            (equalStringObjects(pa
->pattern
,pb
->pattern
)); 
  21 /* Subscribe a client to a channel. Returns 1 if the operation succeeded, or 
  22  * 0 if the client was already subscribed to that channel. */ 
  23 int pubsubSubscribeChannel(redisClient 
*c
, robj 
*channel
) { 
  28     /* Add the channel to the client -> channels hash table */ 
  29     if (dictAdd(c
->pubsub_channels
,channel
,NULL
) == DICT_OK
) { 
  31         incrRefCount(channel
); 
  32         /* Add the client to the channel -> list of clients hash table */ 
  33         de 
= dictFind(server
.pubsub_channels
,channel
); 
  35             clients 
= listCreate(); 
  36             dictAdd(server
.pubsub_channels
,channel
,clients
); 
  37             incrRefCount(channel
); 
  39             clients 
= dictGetVal(de
); 
  41         listAddNodeTail(clients
,c
); 
  43     /* Notify the client */ 
  44     addReply(c
,shared
.mbulk3
); 
  45     addReply(c
,shared
.subscribebulk
); 
  46     addReplyBulk(c
,channel
); 
  47     addReplyLongLong(c
,dictSize(c
->pubsub_channels
)+listLength(c
->pubsub_patterns
)); 
  51 /* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or 
  52  * 0 if the client was not subscribed to the specified channel. */ 
  53 int pubsubUnsubscribeChannel(redisClient 
*c
, robj 
*channel
, int notify
) { 
  59     /* Remove the channel from the client -> channels hash table */ 
  60     incrRefCount(channel
); /* channel may be just a pointer to the same object 
  61                             we have in the hash tables. Protect it... */ 
  62     if (dictDelete(c
->pubsub_channels
,channel
) == DICT_OK
) { 
  64         /* Remove the client from the channel -> clients list hash table */ 
  65         de 
= dictFind(server
.pubsub_channels
,channel
); 
  66         redisAssertWithInfo(c
,NULL
,de 
!= NULL
); 
  67         clients 
= dictGetVal(de
); 
  68         ln 
= listSearchKey(clients
,c
); 
  69         redisAssertWithInfo(c
,NULL
,ln 
!= NULL
); 
  70         listDelNode(clients
,ln
); 
  71         if (listLength(clients
) == 0) { 
  72             /* Free the list and associated hash entry at all if this was 
  73              * the latest client, so that it will be possible to abuse 
  74              * Redis PUBSUB creating millions of channels. */ 
  75             dictDelete(server
.pubsub_channels
,channel
); 
  78     /* Notify the client */ 
  80         addReply(c
,shared
.mbulk3
); 
  81         addReply(c
,shared
.unsubscribebulk
); 
  82         addReplyBulk(c
,channel
); 
  83         addReplyLongLong(c
,dictSize(c
->pubsub_channels
)+ 
  84                        listLength(c
->pubsub_patterns
)); 
  87     decrRefCount(channel
); /* it is finally safe to release it */ 
  91 /* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the clinet was already subscribed to that pattern. */ 
  92 int pubsubSubscribePattern(redisClient 
*c
, robj 
*pattern
) { 
  95     if (listSearchKey(c
->pubsub_patterns
,pattern
) == NULL
) { 
  98         listAddNodeTail(c
->pubsub_patterns
,pattern
); 
  99         incrRefCount(pattern
); 
 100         pat 
= zmalloc(sizeof(*pat
)); 
 101         pat
->pattern 
= getDecodedObject(pattern
); 
 103         listAddNodeTail(server
.pubsub_patterns
,pat
); 
 105     /* Notify the client */ 
 106     addReply(c
,shared
.mbulk3
); 
 107     addReply(c
,shared
.psubscribebulk
); 
 108     addReplyBulk(c
,pattern
); 
 109     addReplyLongLong(c
,dictSize(c
->pubsub_channels
)+listLength(c
->pubsub_patterns
)); 
 113 /* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or 
 114  * 0 if the client was not subscribed to the specified channel. */ 
 115 int pubsubUnsubscribePattern(redisClient 
*c
, robj 
*pattern
, int notify
) { 
 120     incrRefCount(pattern
); /* Protect the object. May be the same we remove */ 
 121     if ((ln 
= listSearchKey(c
->pubsub_patterns
,pattern
)) != NULL
) { 
 123         listDelNode(c
->pubsub_patterns
,ln
); 
 125         pat
.pattern 
= pattern
; 
 126         ln 
= listSearchKey(server
.pubsub_patterns
,&pat
); 
 127         listDelNode(server
.pubsub_patterns
,ln
); 
 129     /* Notify the client */ 
 131         addReply(c
,shared
.mbulk3
); 
 132         addReply(c
,shared
.punsubscribebulk
); 
 133         addReplyBulk(c
,pattern
); 
 134         addReplyLongLong(c
,dictSize(c
->pubsub_channels
)+ 
 135                        listLength(c
->pubsub_patterns
)); 
 137     decrRefCount(pattern
); 
 141 /* Unsubscribe from all the channels. Return the number of channels the 
 142  * client was subscribed from. */ 
 143 int pubsubUnsubscribeAllChannels(redisClient 
*c
, int notify
) { 
 144     dictIterator 
*di 
= dictGetSafeIterator(c
->pubsub_channels
); 
 148     while((de 
= dictNext(di
)) != NULL
) { 
 149         robj 
*channel 
= dictGetKey(de
); 
 151         count 
+= pubsubUnsubscribeChannel(c
,channel
,notify
); 
 153     dictReleaseIterator(di
); 
 157 /* Unsubscribe from all the patterns. Return the number of patterns the 
 158  * client was subscribed from. */ 
 159 int pubsubUnsubscribeAllPatterns(redisClient 
*c
, int notify
) { 
 164     listRewind(c
->pubsub_patterns
,&li
); 
 165     while ((ln 
= listNext(&li
)) != NULL
) { 
 166         robj 
*pattern 
= ln
->value
; 
 168         count 
+= pubsubUnsubscribePattern(c
,pattern
,notify
); 
 173 /* Publish a message */ 
 174 int pubsubPublishMessage(robj 
*channel
, robj 
*message
) { 
 176     struct dictEntry 
*de
; 
 180     /* Send to clients listening for that channel */ 
 181     de 
= dictFind(server
.pubsub_channels
,channel
); 
 183         list 
*list 
= dictGetVal(de
); 
 187         listRewind(list
,&li
); 
 188         while ((ln 
= listNext(&li
)) != NULL
) { 
 189             redisClient 
*c 
= ln
->value
; 
 191             addReply(c
,shared
.mbulk3
); 
 192             addReply(c
,shared
.messagebulk
); 
 193             addReplyBulk(c
,channel
); 
 194             addReplyBulk(c
,message
); 
 198     /* Send to clients listening to matching channels */ 
 199     if (listLength(server
.pubsub_patterns
)) { 
 200         listRewind(server
.pubsub_patterns
,&li
); 
 201         channel 
= getDecodedObject(channel
); 
 202         while ((ln 
= listNext(&li
)) != NULL
) { 
 203             pubsubPattern 
*pat 
= ln
->value
; 
 205             if (stringmatchlen((char*)pat
->pattern
->ptr
, 
 206                                 sdslen(pat
->pattern
->ptr
), 
 208                                 sdslen(channel
->ptr
),0)) { 
 209                 addReply(pat
->client
,shared
.mbulk4
); 
 210                 addReply(pat
->client
,shared
.pmessagebulk
); 
 211                 addReplyBulk(pat
->client
,pat
->pattern
); 
 212                 addReplyBulk(pat
->client
,channel
); 
 213                 addReplyBulk(pat
->client
,message
); 
 217         decrRefCount(channel
); 
 222 /*----------------------------------------------------------------------------- 
 223  * Pubsub commands implementation 
 224  *----------------------------------------------------------------------------*/ 
 226 void subscribeCommand(redisClient 
*c
) { 
 229     for (j 
= 1; j 
< c
->argc
; j
++) 
 230         pubsubSubscribeChannel(c
,c
->argv
[j
]); 
 233 void unsubscribeCommand(redisClient 
*c
) { 
 235         pubsubUnsubscribeAllChannels(c
,1); 
 240         for (j 
= 1; j 
< c
->argc
; j
++) 
 241             pubsubUnsubscribeChannel(c
,c
->argv
[j
],1); 
 245 void psubscribeCommand(redisClient 
*c
) { 
 248     for (j 
= 1; j 
< c
->argc
; j
++) 
 249         pubsubSubscribePattern(c
,c
->argv
[j
]); 
 252 void punsubscribeCommand(redisClient 
*c
) { 
 254         pubsubUnsubscribeAllPatterns(c
,1); 
 259         for (j 
= 1; j 
< c
->argc
; j
++) 
 260             pubsubUnsubscribePattern(c
,c
->argv
[j
],1); 
 264 void publishCommand(redisClient 
*c
) { 
 265     int receivers 
= pubsubPublishMessage(c
->argv
[1],c
->argv
[2]); 
 266     if (server
.cluster_enabled
) clusterPropagatePublish(c
->argv
[1],c
->argv
[2]); 
 267     addReplyLongLong(c
,receivers
);