]>
git.saurik.com Git - redis.git/blob - src/pubsub.c
2 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
32 /*-----------------------------------------------------------------------------
33 * Pubsub low level API
34 *----------------------------------------------------------------------------*/
36 void freePubsubPattern(void *p
) {
37 pubsubPattern
*pat
= p
;
39 decrRefCount(pat
->pattern
);
43 int listMatchPubsubPattern(void *a
, void *b
) {
44 pubsubPattern
*pa
= a
, *pb
= b
;
46 return (pa
->client
== pb
->client
) &&
47 (equalStringObjects(pa
->pattern
,pb
->pattern
));
50 /* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
51 * 0 if the client was already subscribed to that channel. */
52 int pubsubSubscribeChannel(redisClient
*c
, robj
*channel
) {
57 /* Add the channel to the client -> channels hash table */
58 if (dictAdd(c
->pubsub_channels
,channel
,NULL
) == DICT_OK
) {
60 incrRefCount(channel
);
61 /* Add the client to the channel -> list of clients hash table */
62 de
= dictFind(server
.pubsub_channels
,channel
);
64 clients
= listCreate();
65 dictAdd(server
.pubsub_channels
,channel
,clients
);
66 incrRefCount(channel
);
68 clients
= dictGetVal(de
);
70 listAddNodeTail(clients
,c
);
72 /* Notify the client */
73 addReply(c
,shared
.mbulkhdr
[3]);
74 addReply(c
,shared
.subscribebulk
);
75 addReplyBulk(c
,channel
);
76 addReplyLongLong(c
,dictSize(c
->pubsub_channels
)+listLength(c
->pubsub_patterns
));
80 /* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
81 * 0 if the client was not subscribed to the specified channel. */
82 int pubsubUnsubscribeChannel(redisClient
*c
, robj
*channel
, int notify
) {
88 /* Remove the channel from the client -> channels hash table */
89 incrRefCount(channel
); /* channel may be just a pointer to the same object
90 we have in the hash tables. Protect it... */
91 if (dictDelete(c
->pubsub_channels
,channel
) == DICT_OK
) {
93 /* Remove the client from the channel -> clients list hash table */
94 de
= dictFind(server
.pubsub_channels
,channel
);
95 redisAssertWithInfo(c
,NULL
,de
!= NULL
);
96 clients
= dictGetVal(de
);
97 ln
= listSearchKey(clients
,c
);
98 redisAssertWithInfo(c
,NULL
,ln
!= NULL
);
99 listDelNode(clients
,ln
);
100 if (listLength(clients
) == 0) {
101 /* Free the list and associated hash entry at all if this was
102 * the latest client, so that it will be possible to abuse
103 * Redis PUBSUB creating millions of channels. */
104 dictDelete(server
.pubsub_channels
,channel
);
107 /* Notify the client */
109 addReply(c
,shared
.mbulkhdr
[3]);
110 addReply(c
,shared
.unsubscribebulk
);
111 addReplyBulk(c
,channel
);
112 addReplyLongLong(c
,dictSize(c
->pubsub_channels
)+
113 listLength(c
->pubsub_patterns
));
116 decrRefCount(channel
); /* it is finally safe to release it */
120 /* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the clinet was already subscribed to that pattern. */
121 int pubsubSubscribePattern(redisClient
*c
, robj
*pattern
) {
124 if (listSearchKey(c
->pubsub_patterns
,pattern
) == NULL
) {
127 listAddNodeTail(c
->pubsub_patterns
,pattern
);
128 incrRefCount(pattern
);
129 pat
= zmalloc(sizeof(*pat
));
130 pat
->pattern
= getDecodedObject(pattern
);
132 listAddNodeTail(server
.pubsub_patterns
,pat
);
134 /* Notify the client */
135 addReply(c
,shared
.mbulkhdr
[3]);
136 addReply(c
,shared
.psubscribebulk
);
137 addReplyBulk(c
,pattern
);
138 addReplyLongLong(c
,dictSize(c
->pubsub_channels
)+listLength(c
->pubsub_patterns
));
142 /* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
143 * 0 if the client was not subscribed to the specified channel. */
144 int pubsubUnsubscribePattern(redisClient
*c
, robj
*pattern
, int notify
) {
149 incrRefCount(pattern
); /* Protect the object. May be the same we remove */
150 if ((ln
= listSearchKey(c
->pubsub_patterns
,pattern
)) != NULL
) {
152 listDelNode(c
->pubsub_patterns
,ln
);
154 pat
.pattern
= pattern
;
155 ln
= listSearchKey(server
.pubsub_patterns
,&pat
);
156 listDelNode(server
.pubsub_patterns
,ln
);
158 /* Notify the client */
160 addReply(c
,shared
.mbulkhdr
[3]);
161 addReply(c
,shared
.punsubscribebulk
);
162 addReplyBulk(c
,pattern
);
163 addReplyLongLong(c
,dictSize(c
->pubsub_channels
)+
164 listLength(c
->pubsub_patterns
));
166 decrRefCount(pattern
);
170 /* Unsubscribe from all the channels. Return the number of channels the
171 * client was subscribed from. */
172 int pubsubUnsubscribeAllChannels(redisClient
*c
, int notify
) {
173 dictIterator
*di
= dictGetSafeIterator(c
->pubsub_channels
);
177 while((de
= dictNext(di
)) != NULL
) {
178 robj
*channel
= dictGetKey(de
);
180 count
+= pubsubUnsubscribeChannel(c
,channel
,notify
);
182 dictReleaseIterator(di
);
186 /* Unsubscribe from all the patterns. Return the number of patterns the
187 * client was subscribed from. */
188 int pubsubUnsubscribeAllPatterns(redisClient
*c
, int notify
) {
193 listRewind(c
->pubsub_patterns
,&li
);
194 while ((ln
= listNext(&li
)) != NULL
) {
195 robj
*pattern
= ln
->value
;
197 count
+= pubsubUnsubscribePattern(c
,pattern
,notify
);
202 /* Publish a message */
203 int pubsubPublishMessage(robj
*channel
, robj
*message
) {
205 struct dictEntry
*de
;
209 /* Send to clients listening for that channel */
210 de
= dictFind(server
.pubsub_channels
,channel
);
212 list
*list
= dictGetVal(de
);
216 listRewind(list
,&li
);
217 while ((ln
= listNext(&li
)) != NULL
) {
218 redisClient
*c
= ln
->value
;
220 addReply(c
,shared
.mbulkhdr
[3]);
221 addReply(c
,shared
.messagebulk
);
222 addReplyBulk(c
,channel
);
223 addReplyBulk(c
,message
);
227 /* Send to clients listening to matching channels */
228 if (listLength(server
.pubsub_patterns
)) {
229 listRewind(server
.pubsub_patterns
,&li
);
230 channel
= getDecodedObject(channel
);
231 while ((ln
= listNext(&li
)) != NULL
) {
232 pubsubPattern
*pat
= ln
->value
;
234 if (stringmatchlen((char*)pat
->pattern
->ptr
,
235 sdslen(pat
->pattern
->ptr
),
237 sdslen(channel
->ptr
),0)) {
238 addReply(pat
->client
,shared
.mbulkhdr
[4]);
239 addReply(pat
->client
,shared
.pmessagebulk
);
240 addReplyBulk(pat
->client
,pat
->pattern
);
241 addReplyBulk(pat
->client
,channel
);
242 addReplyBulk(pat
->client
,message
);
246 decrRefCount(channel
);
251 /*-----------------------------------------------------------------------------
252 * Pubsub commands implementation
253 *----------------------------------------------------------------------------*/
255 void subscribeCommand(redisClient
*c
) {
258 for (j
= 1; j
< c
->argc
; j
++)
259 pubsubSubscribeChannel(c
,c
->argv
[j
]);
262 void unsubscribeCommand(redisClient
*c
) {
264 pubsubUnsubscribeAllChannels(c
,1);
269 for (j
= 1; j
< c
->argc
; j
++)
270 pubsubUnsubscribeChannel(c
,c
->argv
[j
],1);
274 void psubscribeCommand(redisClient
*c
) {
277 for (j
= 1; j
< c
->argc
; j
++)
278 pubsubSubscribePattern(c
,c
->argv
[j
]);
281 void punsubscribeCommand(redisClient
*c
) {
283 pubsubUnsubscribeAllPatterns(c
,1);
288 for (j
= 1; j
< c
->argc
; j
++)
289 pubsubUnsubscribePattern(c
,c
->argv
[j
],1);
293 void publishCommand(redisClient
*c
) {
294 int receivers
= pubsubPublishMessage(c
->argv
[1],c
->argv
[2]);
295 if (server
.cluster_enabled
) clusterPropagatePublish(c
->argv
[1],c
->argv
[2]);
296 addReplyLongLong(c
,receivers
);