]>
Commit | Line | Data |
---|---|---|
4365e5b2 | 1 | /* |
2 | * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com> | |
3 | * All rights reserved. | |
4 | * | |
5 | * Redistribution and use in source and binary forms, with or without | |
6 | * modification, are permitted provided that the following conditions are met: | |
7 | * | |
8 | * * Redistributions of source code must retain the above copyright notice, | |
9 | * this list of conditions and the following disclaimer. | |
10 | * * Redistributions in binary form must reproduce the above copyright | |
11 | * notice, this list of conditions and the following disclaimer in the | |
12 | * documentation and/or other materials provided with the distribution. | |
13 | * * Neither the name of Redis nor the names of its contributors may be used | |
14 | * to endorse or promote products derived from this software without | |
15 | * specific prior written permission. | |
16 | * | |
17 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" | |
18 | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE | |
19 | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE | |
20 | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE | |
21 | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR | |
22 | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF | |
23 | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS | |
24 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN | |
25 | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) | |
26 | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE | |
27 | * POSSIBILITY OF SUCH DAMAGE. | |
28 | */ | |
29 | ||
e2641e09 | 30 | #include "redis.h" |
31 | ||
0f49d6b0 | 32 | /*----------------------------------------------------------------------------- |
33 | * Pubsub low level API | |
34 | *----------------------------------------------------------------------------*/ | |
35 | ||
e2641e09 | 36 | void freePubsubPattern(void *p) { |
37 | pubsubPattern *pat = p; | |
38 | ||
39 | decrRefCount(pat->pattern); | |
40 | zfree(pat); | |
41 | } | |
42 | ||
43 | int listMatchPubsubPattern(void *a, void *b) { | |
44 | pubsubPattern *pa = a, *pb = b; | |
45 | ||
46 | return (pa->client == pb->client) && | |
47 | (equalStringObjects(pa->pattern,pb->pattern)); | |
48 | } | |
49 | ||
50 | /* Subscribe a client to a channel. Returns 1 if the operation succeeded, or | |
51 | * 0 if the client was already subscribed to that channel. */ | |
52 | int pubsubSubscribeChannel(redisClient *c, robj *channel) { | |
53 | struct dictEntry *de; | |
54 | list *clients = NULL; | |
55 | int retval = 0; | |
56 | ||
57 | /* Add the channel to the client -> channels hash table */ | |
58 | if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) { | |
59 | retval = 1; | |
60 | incrRefCount(channel); | |
61 | /* Add the client to the channel -> list of clients hash table */ | |
62 | de = dictFind(server.pubsub_channels,channel); | |
63 | if (de == NULL) { | |
64 | clients = listCreate(); | |
65 | dictAdd(server.pubsub_channels,channel,clients); | |
66 | incrRefCount(channel); | |
67 | } else { | |
c0ba9ebe | 68 | clients = dictGetVal(de); |
e2641e09 | 69 | } |
70 | listAddNodeTail(clients,c); | |
71 | } | |
72 | /* Notify the client */ | |
355f8591 | 73 | addReply(c,shared.mbulkhdr[3]); |
e2641e09 | 74 | addReply(c,shared.subscribebulk); |
75 | addReplyBulk(c,channel); | |
76 | addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns)); | |
77 | return retval; | |
78 | } | |
79 | ||
80 | /* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or | |
81 | * 0 if the client was not subscribed to the specified channel. */ | |
82 | int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) { | |
83 | struct dictEntry *de; | |
84 | list *clients; | |
85 | listNode *ln; | |
86 | int retval = 0; | |
87 | ||
88 | /* Remove the channel from the client -> channels hash table */ | |
89 | incrRefCount(channel); /* channel may be just a pointer to the same object | |
90 | we have in the hash tables. Protect it... */ | |
91 | if (dictDelete(c->pubsub_channels,channel) == DICT_OK) { | |
92 | retval = 1; | |
93 | /* Remove the client from the channel -> clients list hash table */ | |
94 | de = dictFind(server.pubsub_channels,channel); | |
eab0e26e | 95 | redisAssertWithInfo(c,NULL,de != NULL); |
c0ba9ebe | 96 | clients = dictGetVal(de); |
e2641e09 | 97 | ln = listSearchKey(clients,c); |
eab0e26e | 98 | redisAssertWithInfo(c,NULL,ln != NULL); |
e2641e09 | 99 | listDelNode(clients,ln); |
100 | if (listLength(clients) == 0) { | |
101 | /* Free the list and associated hash entry at all if this was | |
102 | * the latest client, so that it will be possible to abuse | |
103 | * Redis PUBSUB creating millions of channels. */ | |
104 | dictDelete(server.pubsub_channels,channel); | |
105 | } | |
106 | } | |
107 | /* Notify the client */ | |
108 | if (notify) { | |
355f8591 | 109 | addReply(c,shared.mbulkhdr[3]); |
e2641e09 | 110 | addReply(c,shared.unsubscribebulk); |
111 | addReplyBulk(c,channel); | |
112 | addReplyLongLong(c,dictSize(c->pubsub_channels)+ | |
113 | listLength(c->pubsub_patterns)); | |
114 | ||
115 | } | |
116 | decrRefCount(channel); /* it is finally safe to release it */ | |
117 | return retval; | |
118 | } | |
119 | ||
120 | /* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the clinet was already subscribed to that pattern. */ | |
121 | int pubsubSubscribePattern(redisClient *c, robj *pattern) { | |
122 | int retval = 0; | |
123 | ||
124 | if (listSearchKey(c->pubsub_patterns,pattern) == NULL) { | |
125 | retval = 1; | |
126 | pubsubPattern *pat; | |
127 | listAddNodeTail(c->pubsub_patterns,pattern); | |
128 | incrRefCount(pattern); | |
129 | pat = zmalloc(sizeof(*pat)); | |
130 | pat->pattern = getDecodedObject(pattern); | |
131 | pat->client = c; | |
132 | listAddNodeTail(server.pubsub_patterns,pat); | |
133 | } | |
134 | /* Notify the client */ | |
355f8591 | 135 | addReply(c,shared.mbulkhdr[3]); |
e2641e09 | 136 | addReply(c,shared.psubscribebulk); |
137 | addReplyBulk(c,pattern); | |
138 | addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns)); | |
139 | return retval; | |
140 | } | |
141 | ||
142 | /* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or | |
143 | * 0 if the client was not subscribed to the specified channel. */ | |
144 | int pubsubUnsubscribePattern(redisClient *c, robj *pattern, int notify) { | |
145 | listNode *ln; | |
146 | pubsubPattern pat; | |
147 | int retval = 0; | |
148 | ||
149 | incrRefCount(pattern); /* Protect the object. May be the same we remove */ | |
150 | if ((ln = listSearchKey(c->pubsub_patterns,pattern)) != NULL) { | |
151 | retval = 1; | |
152 | listDelNode(c->pubsub_patterns,ln); | |
153 | pat.client = c; | |
154 | pat.pattern = pattern; | |
155 | ln = listSearchKey(server.pubsub_patterns,&pat); | |
156 | listDelNode(server.pubsub_patterns,ln); | |
157 | } | |
158 | /* Notify the client */ | |
159 | if (notify) { | |
355f8591 | 160 | addReply(c,shared.mbulkhdr[3]); |
e2641e09 | 161 | addReply(c,shared.punsubscribebulk); |
162 | addReplyBulk(c,pattern); | |
163 | addReplyLongLong(c,dictSize(c->pubsub_channels)+ | |
164 | listLength(c->pubsub_patterns)); | |
165 | } | |
166 | decrRefCount(pattern); | |
167 | return retval; | |
168 | } | |
169 | ||
170 | /* Unsubscribe from all the channels. Return the number of channels the | |
171 | * client was subscribed from. */ | |
172 | int pubsubUnsubscribeAllChannels(redisClient *c, int notify) { | |
efc34087 | 173 | dictIterator *di = dictGetSafeIterator(c->pubsub_channels); |
e2641e09 | 174 | dictEntry *de; |
175 | int count = 0; | |
176 | ||
177 | while((de = dictNext(di)) != NULL) { | |
c0ba9ebe | 178 | robj *channel = dictGetKey(de); |
e2641e09 | 179 | |
180 | count += pubsubUnsubscribeChannel(c,channel,notify); | |
181 | } | |
182 | dictReleaseIterator(di); | |
183 | return count; | |
184 | } | |
185 | ||
186 | /* Unsubscribe from all the patterns. Return the number of patterns the | |
187 | * client was subscribed from. */ | |
188 | int pubsubUnsubscribeAllPatterns(redisClient *c, int notify) { | |
189 | listNode *ln; | |
190 | listIter li; | |
191 | int count = 0; | |
192 | ||
193 | listRewind(c->pubsub_patterns,&li); | |
194 | while ((ln = listNext(&li)) != NULL) { | |
195 | robj *pattern = ln->value; | |
196 | ||
197 | count += pubsubUnsubscribePattern(c,pattern,notify); | |
198 | } | |
199 | return count; | |
200 | } | |
201 | ||
202 | /* Publish a message */ | |
203 | int pubsubPublishMessage(robj *channel, robj *message) { | |
204 | int receivers = 0; | |
205 | struct dictEntry *de; | |
206 | listNode *ln; | |
207 | listIter li; | |
208 | ||
209 | /* Send to clients listening for that channel */ | |
210 | de = dictFind(server.pubsub_channels,channel); | |
211 | if (de) { | |
c0ba9ebe | 212 | list *list = dictGetVal(de); |
e2641e09 | 213 | listNode *ln; |
214 | listIter li; | |
215 | ||
216 | listRewind(list,&li); | |
217 | while ((ln = listNext(&li)) != NULL) { | |
218 | redisClient *c = ln->value; | |
219 | ||
355f8591 | 220 | addReply(c,shared.mbulkhdr[3]); |
e2641e09 | 221 | addReply(c,shared.messagebulk); |
222 | addReplyBulk(c,channel); | |
223 | addReplyBulk(c,message); | |
224 | receivers++; | |
225 | } | |
226 | } | |
227 | /* Send to clients listening to matching channels */ | |
228 | if (listLength(server.pubsub_patterns)) { | |
229 | listRewind(server.pubsub_patterns,&li); | |
230 | channel = getDecodedObject(channel); | |
231 | while ((ln = listNext(&li)) != NULL) { | |
232 | pubsubPattern *pat = ln->value; | |
233 | ||
234 | if (stringmatchlen((char*)pat->pattern->ptr, | |
235 | sdslen(pat->pattern->ptr), | |
236 | (char*)channel->ptr, | |
237 | sdslen(channel->ptr),0)) { | |
355f8591 | 238 | addReply(pat->client,shared.mbulkhdr[4]); |
e2641e09 | 239 | addReply(pat->client,shared.pmessagebulk); |
240 | addReplyBulk(pat->client,pat->pattern); | |
241 | addReplyBulk(pat->client,channel); | |
242 | addReplyBulk(pat->client,message); | |
243 | receivers++; | |
244 | } | |
245 | } | |
246 | decrRefCount(channel); | |
247 | } | |
248 | return receivers; | |
249 | } | |
250 | ||
0f49d6b0 | 251 | /*----------------------------------------------------------------------------- |
252 | * Pubsub commands implementation | |
253 | *----------------------------------------------------------------------------*/ | |
254 | ||
e2641e09 | 255 | void subscribeCommand(redisClient *c) { |
256 | int j; | |
257 | ||
258 | for (j = 1; j < c->argc; j++) | |
259 | pubsubSubscribeChannel(c,c->argv[j]); | |
260 | } | |
261 | ||
262 | void unsubscribeCommand(redisClient *c) { | |
263 | if (c->argc == 1) { | |
264 | pubsubUnsubscribeAllChannels(c,1); | |
265 | return; | |
266 | } else { | |
267 | int j; | |
268 | ||
269 | for (j = 1; j < c->argc; j++) | |
270 | pubsubUnsubscribeChannel(c,c->argv[j],1); | |
271 | } | |
272 | } | |
273 | ||
274 | void psubscribeCommand(redisClient *c) { | |
275 | int j; | |
276 | ||
277 | for (j = 1; j < c->argc; j++) | |
278 | pubsubSubscribePattern(c,c->argv[j]); | |
279 | } | |
280 | ||
281 | void punsubscribeCommand(redisClient *c) { | |
282 | if (c->argc == 1) { | |
283 | pubsubUnsubscribeAllPatterns(c,1); | |
284 | return; | |
285 | } else { | |
286 | int j; | |
287 | ||
288 | for (j = 1; j < c->argc; j++) | |
289 | pubsubUnsubscribePattern(c,c->argv[j],1); | |
290 | } | |
291 | } | |
292 | ||
293 | void publishCommand(redisClient *c) { | |
294 | int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]); | |
c563ce46 | 295 | if (server.cluster_enabled) clusterPropagatePublish(c->argv[1],c->argv[2]); |
e2641e09 | 296 | addReplyLongLong(c,receivers); |
297 | } |