/* Source: git.saurik.com Git - redis.git/blob - src/pubsub.c */
/*-----------------------------------------------------------------------------
 * Pubsub low level API
 *----------------------------------------------------------------------------*/
7 void freePubsubPattern(void *p
) {
8 pubsubPattern
*pat
= p
;
10 decrRefCount(pat
->pattern
);
14 int listMatchPubsubPattern(void *a
, void *b
) {
15 pubsubPattern
*pa
= a
, *pb
= b
;
17 return (pa
->client
== pb
->client
) &&
18 (equalStringObjects(pa
->pattern
,pb
->pattern
));
21 /* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
22 * 0 if the client was already subscribed to that channel. */
23 int pubsubSubscribeChannel(redisClient
*c
, robj
*channel
) {
28 /* Add the channel to the client -> channels hash table */
29 if (dictAdd(c
->pubsub_channels
,channel
,NULL
) == DICT_OK
) {
31 incrRefCount(channel
);
32 /* Add the client to the channel -> list of clients hash table */
33 de
= dictFind(server
.pubsub_channels
,channel
);
35 clients
= listCreate();
36 dictAdd(server
.pubsub_channels
,channel
,clients
);
37 incrRefCount(channel
);
39 clients
= dictGetEntryVal(de
);
41 listAddNodeTail(clients
,c
);
43 /* Notify the client */
44 addReply(c
,shared
.mbulk3
);
45 addReply(c
,shared
.subscribebulk
);
46 addReplyBulk(c
,channel
);
47 addReplyLongLong(c
,dictSize(c
->pubsub_channels
)+listLength(c
->pubsub_patterns
));
51 /* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
52 * 0 if the client was not subscribed to the specified channel. */
53 int pubsubUnsubscribeChannel(redisClient
*c
, robj
*channel
, int notify
) {
59 /* Remove the channel from the client -> channels hash table */
60 incrRefCount(channel
); /* channel may be just a pointer to the same object
61 we have in the hash tables. Protect it... */
62 if (dictDelete(c
->pubsub_channels
,channel
) == DICT_OK
) {
64 /* Remove the client from the channel -> clients list hash table */
65 de
= dictFind(server
.pubsub_channels
,channel
);
66 redisAssertWithInfo(c
,NULL
,de
!= NULL
);
67 clients
= dictGetEntryVal(de
);
68 ln
= listSearchKey(clients
,c
);
69 redisAssertWithInfo(c
,NULL
,ln
!= NULL
);
70 listDelNode(clients
,ln
);
71 if (listLength(clients
) == 0) {
72 /* Free the list and associated hash entry at all if this was
73 * the latest client, so that it will be possible to abuse
74 * Redis PUBSUB creating millions of channels. */
75 dictDelete(server
.pubsub_channels
,channel
);
78 /* Notify the client */
80 addReply(c
,shared
.mbulk3
);
81 addReply(c
,shared
.unsubscribebulk
);
82 addReplyBulk(c
,channel
);
83 addReplyLongLong(c
,dictSize(c
->pubsub_channels
)+
84 listLength(c
->pubsub_patterns
));
87 decrRefCount(channel
); /* it is finally safe to release it */
91 /* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the clinet was already subscribed to that pattern. */
92 int pubsubSubscribePattern(redisClient
*c
, robj
*pattern
) {
95 if (listSearchKey(c
->pubsub_patterns
,pattern
) == NULL
) {
98 listAddNodeTail(c
->pubsub_patterns
,pattern
);
99 incrRefCount(pattern
);
100 pat
= zmalloc(sizeof(*pat
));
101 pat
->pattern
= getDecodedObject(pattern
);
103 listAddNodeTail(server
.pubsub_patterns
,pat
);
105 /* Notify the client */
106 addReply(c
,shared
.mbulk3
);
107 addReply(c
,shared
.psubscribebulk
);
108 addReplyBulk(c
,pattern
);
109 addReplyLongLong(c
,dictSize(c
->pubsub_channels
)+listLength(c
->pubsub_patterns
));
113 /* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
114 * 0 if the client was not subscribed to the specified channel. */
115 int pubsubUnsubscribePattern(redisClient
*c
, robj
*pattern
, int notify
) {
120 incrRefCount(pattern
); /* Protect the object. May be the same we remove */
121 if ((ln
= listSearchKey(c
->pubsub_patterns
,pattern
)) != NULL
) {
123 listDelNode(c
->pubsub_patterns
,ln
);
125 pat
.pattern
= pattern
;
126 ln
= listSearchKey(server
.pubsub_patterns
,&pat
);
127 listDelNode(server
.pubsub_patterns
,ln
);
129 /* Notify the client */
131 addReply(c
,shared
.mbulk3
);
132 addReply(c
,shared
.punsubscribebulk
);
133 addReplyBulk(c
,pattern
);
134 addReplyLongLong(c
,dictSize(c
->pubsub_channels
)+
135 listLength(c
->pubsub_patterns
));
137 decrRefCount(pattern
);
141 /* Unsubscribe from all the channels. Return the number of channels the
142 * client was subscribed from. */
143 int pubsubUnsubscribeAllChannels(redisClient
*c
, int notify
) {
144 dictIterator
*di
= dictGetSafeIterator(c
->pubsub_channels
);
148 while((de
= dictNext(di
)) != NULL
) {
149 robj
*channel
= dictGetEntryKey(de
);
151 count
+= pubsubUnsubscribeChannel(c
,channel
,notify
);
153 dictReleaseIterator(di
);
157 /* Unsubscribe from all the patterns. Return the number of patterns the
158 * client was subscribed from. */
159 int pubsubUnsubscribeAllPatterns(redisClient
*c
, int notify
) {
164 listRewind(c
->pubsub_patterns
,&li
);
165 while ((ln
= listNext(&li
)) != NULL
) {
166 robj
*pattern
= ln
->value
;
168 count
+= pubsubUnsubscribePattern(c
,pattern
,notify
);
173 /* Publish a message */
174 int pubsubPublishMessage(robj
*channel
, robj
*message
) {
176 struct dictEntry
*de
;
180 /* Send to clients listening for that channel */
181 de
= dictFind(server
.pubsub_channels
,channel
);
183 list
*list
= dictGetEntryVal(de
);
187 listRewind(list
,&li
);
188 while ((ln
= listNext(&li
)) != NULL
) {
189 redisClient
*c
= ln
->value
;
191 addReply(c
,shared
.mbulk3
);
192 addReply(c
,shared
.messagebulk
);
193 addReplyBulk(c
,channel
);
194 addReplyBulk(c
,message
);
198 /* Send to clients listening to matching channels */
199 if (listLength(server
.pubsub_patterns
)) {
200 listRewind(server
.pubsub_patterns
,&li
);
201 channel
= getDecodedObject(channel
);
202 while ((ln
= listNext(&li
)) != NULL
) {
203 pubsubPattern
*pat
= ln
->value
;
205 if (stringmatchlen((char*)pat
->pattern
->ptr
,
206 sdslen(pat
->pattern
->ptr
),
208 sdslen(channel
->ptr
),0)) {
209 addReply(pat
->client
,shared
.mbulk4
);
210 addReply(pat
->client
,shared
.pmessagebulk
);
211 addReplyBulk(pat
->client
,pat
->pattern
);
212 addReplyBulk(pat
->client
,channel
);
213 addReplyBulk(pat
->client
,message
);
217 decrRefCount(channel
);
/*-----------------------------------------------------------------------------
 * Pubsub commands implementation
 *----------------------------------------------------------------------------*/
226 void subscribeCommand(redisClient
*c
) {
229 for (j
= 1; j
< c
->argc
; j
++)
230 pubsubSubscribeChannel(c
,c
->argv
[j
]);
233 void unsubscribeCommand(redisClient
*c
) {
235 pubsubUnsubscribeAllChannels(c
,1);
240 for (j
= 1; j
< c
->argc
; j
++)
241 pubsubUnsubscribeChannel(c
,c
->argv
[j
],1);
245 void psubscribeCommand(redisClient
*c
) {
248 for (j
= 1; j
< c
->argc
; j
++)
249 pubsubSubscribePattern(c
,c
->argv
[j
]);
252 void punsubscribeCommand(redisClient
*c
) {
254 pubsubUnsubscribeAllPatterns(c
,1);
259 for (j
= 1; j
< c
->argc
; j
++)
260 pubsubUnsubscribePattern(c
,c
->argv
[j
],1);
264 void publishCommand(redisClient
*c
) {
265 int receivers
= pubsubPublishMessage(c
->argv
[1],c
->argv
[2]);
266 if (server
.cluster_enabled
) clusterPropagatePublish(c
->argv
[1],c
->argv
[2]);
267 addReplyLongLong(c
,receivers
);