]>
Commit | Line | Data |
---|---|---|
1 | /* | |
2 | * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com> | |
3 | * All rights reserved. | |
4 | * | |
5 | * Redistribution and use in source and binary forms, with or without | |
6 | * modification, are permitted provided that the following conditions are met: | |
7 | * | |
8 | * * Redistributions of source code must retain the above copyright notice, | |
9 | * this list of conditions and the following disclaimer. | |
10 | * * Redistributions in binary form must reproduce the above copyright | |
11 | * notice, this list of conditions and the following disclaimer in the | |
12 | * documentation and/or other materials provided with the distribution. | |
13 | * * Neither the name of Redis nor the names of its contributors may be used | |
14 | * to endorse or promote products derived from this software without | |
15 | * specific prior written permission. | |
16 | * | |
17 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" | |
18 | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE | |
19 | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE | |
20 | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE | |
21 | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR | |
22 | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF | |
23 | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS | |
24 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN | |
25 | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) | |
26 | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE | |
27 | * POSSIBILITY OF SUCH DAMAGE. | |
28 | */ | |
29 | ||
30 | #include "redis.h" | |
31 | ||
32 | /*----------------------------------------------------------------------------- | |
33 | * Pubsub low level API | |
34 | *----------------------------------------------------------------------------*/ | |
35 | ||
36 | void freePubsubPattern(void *p) { | |
37 | pubsubPattern *pat = p; | |
38 | ||
39 | decrRefCount(pat->pattern); | |
40 | zfree(pat); | |
41 | } | |
42 | ||
43 | int listMatchPubsubPattern(void *a, void *b) { | |
44 | pubsubPattern *pa = a, *pb = b; | |
45 | ||
46 | return (pa->client == pb->client) && | |
47 | (equalStringObjects(pa->pattern,pb->pattern)); | |
48 | } | |
49 | ||
50 | /* Subscribe a client to a channel. Returns 1 if the operation succeeded, or | |
51 | * 0 if the client was already subscribed to that channel. */ | |
52 | int pubsubSubscribeChannel(redisClient *c, robj *channel) { | |
53 | struct dictEntry *de; | |
54 | list *clients = NULL; | |
55 | int retval = 0; | |
56 | ||
57 | /* Add the channel to the client -> channels hash table */ | |
58 | if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) { | |
59 | retval = 1; | |
60 | incrRefCount(channel); | |
61 | /* Add the client to the channel -> list of clients hash table */ | |
62 | de = dictFind(server.pubsub_channels,channel); | |
63 | if (de == NULL) { | |
64 | clients = listCreate(); | |
65 | dictAdd(server.pubsub_channels,channel,clients); | |
66 | incrRefCount(channel); | |
67 | } else { | |
68 | clients = dictGetVal(de); | |
69 | } | |
70 | listAddNodeTail(clients,c); | |
71 | } | |
72 | /* Notify the client */ | |
73 | addReply(c,shared.mbulkhdr[3]); | |
74 | addReply(c,shared.subscribebulk); | |
75 | addReplyBulk(c,channel); | |
76 | addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns)); | |
77 | return retval; | |
78 | } | |
79 | ||
80 | /* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or | |
81 | * 0 if the client was not subscribed to the specified channel. */ | |
82 | int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) { | |
83 | struct dictEntry *de; | |
84 | list *clients; | |
85 | listNode *ln; | |
86 | int retval = 0; | |
87 | ||
88 | /* Remove the channel from the client -> channels hash table */ | |
89 | incrRefCount(channel); /* channel may be just a pointer to the same object | |
90 | we have in the hash tables. Protect it... */ | |
91 | if (dictDelete(c->pubsub_channels,channel) == DICT_OK) { | |
92 | retval = 1; | |
93 | /* Remove the client from the channel -> clients list hash table */ | |
94 | de = dictFind(server.pubsub_channels,channel); | |
95 | redisAssertWithInfo(c,NULL,de != NULL); | |
96 | clients = dictGetVal(de); | |
97 | ln = listSearchKey(clients,c); | |
98 | redisAssertWithInfo(c,NULL,ln != NULL); | |
99 | listDelNode(clients,ln); | |
100 | if (listLength(clients) == 0) { | |
101 | /* Free the list and associated hash entry at all if this was | |
102 | * the latest client, so that it will be possible to abuse | |
103 | * Redis PUBSUB creating millions of channels. */ | |
104 | dictDelete(server.pubsub_channels,channel); | |
105 | } | |
106 | } | |
107 | /* Notify the client */ | |
108 | if (notify) { | |
109 | addReply(c,shared.mbulkhdr[3]); | |
110 | addReply(c,shared.unsubscribebulk); | |
111 | addReplyBulk(c,channel); | |
112 | addReplyLongLong(c,dictSize(c->pubsub_channels)+ | |
113 | listLength(c->pubsub_patterns)); | |
114 | ||
115 | } | |
116 | decrRefCount(channel); /* it is finally safe to release it */ | |
117 | return retval; | |
118 | } | |
119 | ||
120 | /* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the clinet was already subscribed to that pattern. */ | |
121 | int pubsubSubscribePattern(redisClient *c, robj *pattern) { | |
122 | int retval = 0; | |
123 | ||
124 | if (listSearchKey(c->pubsub_patterns,pattern) == NULL) { | |
125 | retval = 1; | |
126 | pubsubPattern *pat; | |
127 | listAddNodeTail(c->pubsub_patterns,pattern); | |
128 | incrRefCount(pattern); | |
129 | pat = zmalloc(sizeof(*pat)); | |
130 | pat->pattern = getDecodedObject(pattern); | |
131 | pat->client = c; | |
132 | listAddNodeTail(server.pubsub_patterns,pat); | |
133 | } | |
134 | /* Notify the client */ | |
135 | addReply(c,shared.mbulkhdr[3]); | |
136 | addReply(c,shared.psubscribebulk); | |
137 | addReplyBulk(c,pattern); | |
138 | addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns)); | |
139 | return retval; | |
140 | } | |
141 | ||
142 | /* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or | |
143 | * 0 if the client was not subscribed to the specified channel. */ | |
144 | int pubsubUnsubscribePattern(redisClient *c, robj *pattern, int notify) { | |
145 | listNode *ln; | |
146 | pubsubPattern pat; | |
147 | int retval = 0; | |
148 | ||
149 | incrRefCount(pattern); /* Protect the object. May be the same we remove */ | |
150 | if ((ln = listSearchKey(c->pubsub_patterns,pattern)) != NULL) { | |
151 | retval = 1; | |
152 | listDelNode(c->pubsub_patterns,ln); | |
153 | pat.client = c; | |
154 | pat.pattern = pattern; | |
155 | ln = listSearchKey(server.pubsub_patterns,&pat); | |
156 | listDelNode(server.pubsub_patterns,ln); | |
157 | } | |
158 | /* Notify the client */ | |
159 | if (notify) { | |
160 | addReply(c,shared.mbulkhdr[3]); | |
161 | addReply(c,shared.punsubscribebulk); | |
162 | addReplyBulk(c,pattern); | |
163 | addReplyLongLong(c,dictSize(c->pubsub_channels)+ | |
164 | listLength(c->pubsub_patterns)); | |
165 | } | |
166 | decrRefCount(pattern); | |
167 | return retval; | |
168 | } | |
169 | ||
170 | /* Unsubscribe from all the channels. Return the number of channels the | |
171 | * client was subscribed from. */ | |
172 | int pubsubUnsubscribeAllChannels(redisClient *c, int notify) { | |
173 | dictIterator *di = dictGetSafeIterator(c->pubsub_channels); | |
174 | dictEntry *de; | |
175 | int count = 0; | |
176 | ||
177 | while((de = dictNext(di)) != NULL) { | |
178 | robj *channel = dictGetKey(de); | |
179 | ||
180 | count += pubsubUnsubscribeChannel(c,channel,notify); | |
181 | } | |
182 | dictReleaseIterator(di); | |
183 | return count; | |
184 | } | |
185 | ||
186 | /* Unsubscribe from all the patterns. Return the number of patterns the | |
187 | * client was subscribed from. */ | |
188 | int pubsubUnsubscribeAllPatterns(redisClient *c, int notify) { | |
189 | listNode *ln; | |
190 | listIter li; | |
191 | int count = 0; | |
192 | ||
193 | listRewind(c->pubsub_patterns,&li); | |
194 | while ((ln = listNext(&li)) != NULL) { | |
195 | robj *pattern = ln->value; | |
196 | ||
197 | count += pubsubUnsubscribePattern(c,pattern,notify); | |
198 | } | |
199 | return count; | |
200 | } | |
201 | ||
202 | /* Publish a message */ | |
203 | int pubsubPublishMessage(robj *channel, robj *message) { | |
204 | int receivers = 0; | |
205 | struct dictEntry *de; | |
206 | listNode *ln; | |
207 | listIter li; | |
208 | ||
209 | /* Send to clients listening for that channel */ | |
210 | de = dictFind(server.pubsub_channels,channel); | |
211 | if (de) { | |
212 | list *list = dictGetVal(de); | |
213 | listNode *ln; | |
214 | listIter li; | |
215 | ||
216 | listRewind(list,&li); | |
217 | while ((ln = listNext(&li)) != NULL) { | |
218 | redisClient *c = ln->value; | |
219 | ||
220 | addReply(c,shared.mbulkhdr[3]); | |
221 | addReply(c,shared.messagebulk); | |
222 | addReplyBulk(c,channel); | |
223 | addReplyBulk(c,message); | |
224 | receivers++; | |
225 | } | |
226 | } | |
227 | /* Send to clients listening to matching channels */ | |
228 | if (listLength(server.pubsub_patterns)) { | |
229 | listRewind(server.pubsub_patterns,&li); | |
230 | channel = getDecodedObject(channel); | |
231 | while ((ln = listNext(&li)) != NULL) { | |
232 | pubsubPattern *pat = ln->value; | |
233 | ||
234 | if (stringmatchlen((char*)pat->pattern->ptr, | |
235 | sdslen(pat->pattern->ptr), | |
236 | (char*)channel->ptr, | |
237 | sdslen(channel->ptr),0)) { | |
238 | addReply(pat->client,shared.mbulkhdr[4]); | |
239 | addReply(pat->client,shared.pmessagebulk); | |
240 | addReplyBulk(pat->client,pat->pattern); | |
241 | addReplyBulk(pat->client,channel); | |
242 | addReplyBulk(pat->client,message); | |
243 | receivers++; | |
244 | } | |
245 | } | |
246 | decrRefCount(channel); | |
247 | } | |
248 | return receivers; | |
249 | } | |
250 | ||
251 | /*----------------------------------------------------------------------------- | |
252 | * Pubsub commands implementation | |
253 | *----------------------------------------------------------------------------*/ | |
254 | ||
255 | void subscribeCommand(redisClient *c) { | |
256 | int j; | |
257 | ||
258 | for (j = 1; j < c->argc; j++) | |
259 | pubsubSubscribeChannel(c,c->argv[j]); | |
260 | } | |
261 | ||
262 | void unsubscribeCommand(redisClient *c) { | |
263 | if (c->argc == 1) { | |
264 | pubsubUnsubscribeAllChannels(c,1); | |
265 | return; | |
266 | } else { | |
267 | int j; | |
268 | ||
269 | for (j = 1; j < c->argc; j++) | |
270 | pubsubUnsubscribeChannel(c,c->argv[j],1); | |
271 | } | |
272 | } | |
273 | ||
274 | void psubscribeCommand(redisClient *c) { | |
275 | int j; | |
276 | ||
277 | for (j = 1; j < c->argc; j++) | |
278 | pubsubSubscribePattern(c,c->argv[j]); | |
279 | } | |
280 | ||
281 | void punsubscribeCommand(redisClient *c) { | |
282 | if (c->argc == 1) { | |
283 | pubsubUnsubscribeAllPatterns(c,1); | |
284 | return; | |
285 | } else { | |
286 | int j; | |
287 | ||
288 | for (j = 1; j < c->argc; j++) | |
289 | pubsubUnsubscribePattern(c,c->argv[j],1); | |
290 | } | |
291 | } | |
292 | ||
293 | void publishCommand(redisClient *c) { | |
294 | int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]); | |
295 | if (server.cluster_enabled) clusterPropagatePublish(c->argv[1],c->argv[2]); | |
296 | addReplyLongLong(c,receivers); | |
297 | } |