]> git.saurik.com Git - redis.git/blob - src/pubsub.c
Test: more MIGRATE tests.
[redis.git] / src / pubsub.c
1 /*
2 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #include "redis.h"
31
32 /*-----------------------------------------------------------------------------
33 * Pubsub low level API
34 *----------------------------------------------------------------------------*/
35
36 void freePubsubPattern(void *p) {
37 pubsubPattern *pat = p;
38
39 decrRefCount(pat->pattern);
40 zfree(pat);
41 }
42
43 int listMatchPubsubPattern(void *a, void *b) {
44 pubsubPattern *pa = a, *pb = b;
45
46 return (pa->client == pb->client) &&
47 (equalStringObjects(pa->pattern,pb->pattern));
48 }
49
50 /* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
51 * 0 if the client was already subscribed to that channel. */
52 int pubsubSubscribeChannel(redisClient *c, robj *channel) {
53 struct dictEntry *de;
54 list *clients = NULL;
55 int retval = 0;
56
57 /* Add the channel to the client -> channels hash table */
58 if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
59 retval = 1;
60 incrRefCount(channel);
61 /* Add the client to the channel -> list of clients hash table */
62 de = dictFind(server.pubsub_channels,channel);
63 if (de == NULL) {
64 clients = listCreate();
65 dictAdd(server.pubsub_channels,channel,clients);
66 incrRefCount(channel);
67 } else {
68 clients = dictGetVal(de);
69 }
70 listAddNodeTail(clients,c);
71 }
72 /* Notify the client */
73 addReply(c,shared.mbulkhdr[3]);
74 addReply(c,shared.subscribebulk);
75 addReplyBulk(c,channel);
76 addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));
77 return retval;
78 }
79
80 /* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
81 * 0 if the client was not subscribed to the specified channel. */
82 int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) {
83 struct dictEntry *de;
84 list *clients;
85 listNode *ln;
86 int retval = 0;
87
88 /* Remove the channel from the client -> channels hash table */
89 incrRefCount(channel); /* channel may be just a pointer to the same object
90 we have in the hash tables. Protect it... */
91 if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {
92 retval = 1;
93 /* Remove the client from the channel -> clients list hash table */
94 de = dictFind(server.pubsub_channels,channel);
95 redisAssertWithInfo(c,NULL,de != NULL);
96 clients = dictGetVal(de);
97 ln = listSearchKey(clients,c);
98 redisAssertWithInfo(c,NULL,ln != NULL);
99 listDelNode(clients,ln);
100 if (listLength(clients) == 0) {
101 /* Free the list and associated hash entry at all if this was
102 * the latest client, so that it will be possible to abuse
103 * Redis PUBSUB creating millions of channels. */
104 dictDelete(server.pubsub_channels,channel);
105 }
106 }
107 /* Notify the client */
108 if (notify) {
109 addReply(c,shared.mbulkhdr[3]);
110 addReply(c,shared.unsubscribebulk);
111 addReplyBulk(c,channel);
112 addReplyLongLong(c,dictSize(c->pubsub_channels)+
113 listLength(c->pubsub_patterns));
114
115 }
116 decrRefCount(channel); /* it is finally safe to release it */
117 return retval;
118 }
119
120 /* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the clinet was already subscribed to that pattern. */
121 int pubsubSubscribePattern(redisClient *c, robj *pattern) {
122 int retval = 0;
123
124 if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
125 retval = 1;
126 pubsubPattern *pat;
127 listAddNodeTail(c->pubsub_patterns,pattern);
128 incrRefCount(pattern);
129 pat = zmalloc(sizeof(*pat));
130 pat->pattern = getDecodedObject(pattern);
131 pat->client = c;
132 listAddNodeTail(server.pubsub_patterns,pat);
133 }
134 /* Notify the client */
135 addReply(c,shared.mbulkhdr[3]);
136 addReply(c,shared.psubscribebulk);
137 addReplyBulk(c,pattern);
138 addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));
139 return retval;
140 }
141
142 /* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
143 * 0 if the client was not subscribed to the specified channel. */
144 int pubsubUnsubscribePattern(redisClient *c, robj *pattern, int notify) {
145 listNode *ln;
146 pubsubPattern pat;
147 int retval = 0;
148
149 incrRefCount(pattern); /* Protect the object. May be the same we remove */
150 if ((ln = listSearchKey(c->pubsub_patterns,pattern)) != NULL) {
151 retval = 1;
152 listDelNode(c->pubsub_patterns,ln);
153 pat.client = c;
154 pat.pattern = pattern;
155 ln = listSearchKey(server.pubsub_patterns,&pat);
156 listDelNode(server.pubsub_patterns,ln);
157 }
158 /* Notify the client */
159 if (notify) {
160 addReply(c,shared.mbulkhdr[3]);
161 addReply(c,shared.punsubscribebulk);
162 addReplyBulk(c,pattern);
163 addReplyLongLong(c,dictSize(c->pubsub_channels)+
164 listLength(c->pubsub_patterns));
165 }
166 decrRefCount(pattern);
167 return retval;
168 }
169
170 /* Unsubscribe from all the channels. Return the number of channels the
171 * client was subscribed from. */
172 int pubsubUnsubscribeAllChannels(redisClient *c, int notify) {
173 dictIterator *di = dictGetSafeIterator(c->pubsub_channels);
174 dictEntry *de;
175 int count = 0;
176
177 while((de = dictNext(di)) != NULL) {
178 robj *channel = dictGetKey(de);
179
180 count += pubsubUnsubscribeChannel(c,channel,notify);
181 }
182 dictReleaseIterator(di);
183 return count;
184 }
185
186 /* Unsubscribe from all the patterns. Return the number of patterns the
187 * client was subscribed from. */
188 int pubsubUnsubscribeAllPatterns(redisClient *c, int notify) {
189 listNode *ln;
190 listIter li;
191 int count = 0;
192
193 listRewind(c->pubsub_patterns,&li);
194 while ((ln = listNext(&li)) != NULL) {
195 robj *pattern = ln->value;
196
197 count += pubsubUnsubscribePattern(c,pattern,notify);
198 }
199 return count;
200 }
201
202 /* Publish a message */
203 int pubsubPublishMessage(robj *channel, robj *message) {
204 int receivers = 0;
205 struct dictEntry *de;
206 listNode *ln;
207 listIter li;
208
209 /* Send to clients listening for that channel */
210 de = dictFind(server.pubsub_channels,channel);
211 if (de) {
212 list *list = dictGetVal(de);
213 listNode *ln;
214 listIter li;
215
216 listRewind(list,&li);
217 while ((ln = listNext(&li)) != NULL) {
218 redisClient *c = ln->value;
219
220 addReply(c,shared.mbulkhdr[3]);
221 addReply(c,shared.messagebulk);
222 addReplyBulk(c,channel);
223 addReplyBulk(c,message);
224 receivers++;
225 }
226 }
227 /* Send to clients listening to matching channels */
228 if (listLength(server.pubsub_patterns)) {
229 listRewind(server.pubsub_patterns,&li);
230 channel = getDecodedObject(channel);
231 while ((ln = listNext(&li)) != NULL) {
232 pubsubPattern *pat = ln->value;
233
234 if (stringmatchlen((char*)pat->pattern->ptr,
235 sdslen(pat->pattern->ptr),
236 (char*)channel->ptr,
237 sdslen(channel->ptr),0)) {
238 addReply(pat->client,shared.mbulkhdr[4]);
239 addReply(pat->client,shared.pmessagebulk);
240 addReplyBulk(pat->client,pat->pattern);
241 addReplyBulk(pat->client,channel);
242 addReplyBulk(pat->client,message);
243 receivers++;
244 }
245 }
246 decrRefCount(channel);
247 }
248 return receivers;
249 }
250
251 /*-----------------------------------------------------------------------------
252 * Pubsub commands implementation
253 *----------------------------------------------------------------------------*/
254
255 void subscribeCommand(redisClient *c) {
256 int j;
257
258 for (j = 1; j < c->argc; j++)
259 pubsubSubscribeChannel(c,c->argv[j]);
260 }
261
262 void unsubscribeCommand(redisClient *c) {
263 if (c->argc == 1) {
264 pubsubUnsubscribeAllChannels(c,1);
265 return;
266 } else {
267 int j;
268
269 for (j = 1; j < c->argc; j++)
270 pubsubUnsubscribeChannel(c,c->argv[j],1);
271 }
272 }
273
274 void psubscribeCommand(redisClient *c) {
275 int j;
276
277 for (j = 1; j < c->argc; j++)
278 pubsubSubscribePattern(c,c->argv[j]);
279 }
280
281 void punsubscribeCommand(redisClient *c) {
282 if (c->argc == 1) {
283 pubsubUnsubscribeAllPatterns(c,1);
284 return;
285 } else {
286 int j;
287
288 for (j = 1; j < c->argc; j++)
289 pubsubUnsubscribePattern(c,c->argv[j],1);
290 }
291 }
292
293 void publishCommand(redisClient *c) {
294 int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
295 if (server.cluster_enabled) clusterPropagatePublish(c->argv[1],c->argv[2]);
296 addReplyLongLong(c,receivers);
297 }