]> git.saurik.com Git - redis.git/blame - src/pubsub.c
Make bio.c threads killable ASAP if needed.
[redis.git] / src / pubsub.c
CommitLineData
4365e5b2 1/*
2 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
e2641e09 30#include "redis.h"
31
0f49d6b0 32/*-----------------------------------------------------------------------------
33 * Pubsub low level API
34 *----------------------------------------------------------------------------*/
35
e2641e09 36void freePubsubPattern(void *p) {
37 pubsubPattern *pat = p;
38
39 decrRefCount(pat->pattern);
40 zfree(pat);
41}
42
43int listMatchPubsubPattern(void *a, void *b) {
44 pubsubPattern *pa = a, *pb = b;
45
46 return (pa->client == pb->client) &&
47 (equalStringObjects(pa->pattern,pb->pattern));
48}
49
50/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
51 * 0 if the client was already subscribed to that channel. */
52int pubsubSubscribeChannel(redisClient *c, robj *channel) {
53 struct dictEntry *de;
54 list *clients = NULL;
55 int retval = 0;
56
57 /* Add the channel to the client -> channels hash table */
58 if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
59 retval = 1;
60 incrRefCount(channel);
61 /* Add the client to the channel -> list of clients hash table */
62 de = dictFind(server.pubsub_channels,channel);
63 if (de == NULL) {
64 clients = listCreate();
65 dictAdd(server.pubsub_channels,channel,clients);
66 incrRefCount(channel);
67 } else {
c0ba9ebe 68 clients = dictGetVal(de);
e2641e09 69 }
70 listAddNodeTail(clients,c);
71 }
72 /* Notify the client */
355f8591 73 addReply(c,shared.mbulkhdr[3]);
e2641e09 74 addReply(c,shared.subscribebulk);
75 addReplyBulk(c,channel);
76 addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));
77 return retval;
78}
79
80/* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
81 * 0 if the client was not subscribed to the specified channel. */
82int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) {
83 struct dictEntry *de;
84 list *clients;
85 listNode *ln;
86 int retval = 0;
87
88 /* Remove the channel from the client -> channels hash table */
89 incrRefCount(channel); /* channel may be just a pointer to the same object
90 we have in the hash tables. Protect it... */
91 if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {
92 retval = 1;
93 /* Remove the client from the channel -> clients list hash table */
94 de = dictFind(server.pubsub_channels,channel);
eab0e26e 95 redisAssertWithInfo(c,NULL,de != NULL);
c0ba9ebe 96 clients = dictGetVal(de);
e2641e09 97 ln = listSearchKey(clients,c);
eab0e26e 98 redisAssertWithInfo(c,NULL,ln != NULL);
e2641e09 99 listDelNode(clients,ln);
100 if (listLength(clients) == 0) {
101 /* Free the list and associated hash entry at all if this was
102 * the latest client, so that it will be possible to abuse
103 * Redis PUBSUB creating millions of channels. */
104 dictDelete(server.pubsub_channels,channel);
105 }
106 }
107 /* Notify the client */
108 if (notify) {
355f8591 109 addReply(c,shared.mbulkhdr[3]);
e2641e09 110 addReply(c,shared.unsubscribebulk);
111 addReplyBulk(c,channel);
112 addReplyLongLong(c,dictSize(c->pubsub_channels)+
113 listLength(c->pubsub_patterns));
114
115 }
116 decrRefCount(channel); /* it is finally safe to release it */
117 return retval;
118}
119
120/* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the clinet was already subscribed to that pattern. */
121int pubsubSubscribePattern(redisClient *c, robj *pattern) {
122 int retval = 0;
123
124 if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
125 retval = 1;
126 pubsubPattern *pat;
127 listAddNodeTail(c->pubsub_patterns,pattern);
128 incrRefCount(pattern);
129 pat = zmalloc(sizeof(*pat));
130 pat->pattern = getDecodedObject(pattern);
131 pat->client = c;
132 listAddNodeTail(server.pubsub_patterns,pat);
133 }
134 /* Notify the client */
355f8591 135 addReply(c,shared.mbulkhdr[3]);
e2641e09 136 addReply(c,shared.psubscribebulk);
137 addReplyBulk(c,pattern);
138 addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));
139 return retval;
140}
141
142/* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
143 * 0 if the client was not subscribed to the specified channel. */
144int pubsubUnsubscribePattern(redisClient *c, robj *pattern, int notify) {
145 listNode *ln;
146 pubsubPattern pat;
147 int retval = 0;
148
149 incrRefCount(pattern); /* Protect the object. May be the same we remove */
150 if ((ln = listSearchKey(c->pubsub_patterns,pattern)) != NULL) {
151 retval = 1;
152 listDelNode(c->pubsub_patterns,ln);
153 pat.client = c;
154 pat.pattern = pattern;
155 ln = listSearchKey(server.pubsub_patterns,&pat);
156 listDelNode(server.pubsub_patterns,ln);
157 }
158 /* Notify the client */
159 if (notify) {
355f8591 160 addReply(c,shared.mbulkhdr[3]);
e2641e09 161 addReply(c,shared.punsubscribebulk);
162 addReplyBulk(c,pattern);
163 addReplyLongLong(c,dictSize(c->pubsub_channels)+
164 listLength(c->pubsub_patterns));
165 }
166 decrRefCount(pattern);
167 return retval;
168}
169
170/* Unsubscribe from all the channels. Return the number of channels the
171 * client was subscribed from. */
172int pubsubUnsubscribeAllChannels(redisClient *c, int notify) {
efc34087 173 dictIterator *di = dictGetSafeIterator(c->pubsub_channels);
e2641e09 174 dictEntry *de;
175 int count = 0;
176
177 while((de = dictNext(di)) != NULL) {
c0ba9ebe 178 robj *channel = dictGetKey(de);
e2641e09 179
180 count += pubsubUnsubscribeChannel(c,channel,notify);
181 }
182 dictReleaseIterator(di);
183 return count;
184}
185
186/* Unsubscribe from all the patterns. Return the number of patterns the
187 * client was subscribed from. */
188int pubsubUnsubscribeAllPatterns(redisClient *c, int notify) {
189 listNode *ln;
190 listIter li;
191 int count = 0;
192
193 listRewind(c->pubsub_patterns,&li);
194 while ((ln = listNext(&li)) != NULL) {
195 robj *pattern = ln->value;
196
197 count += pubsubUnsubscribePattern(c,pattern,notify);
198 }
199 return count;
200}
201
202/* Publish a message */
203int pubsubPublishMessage(robj *channel, robj *message) {
204 int receivers = 0;
205 struct dictEntry *de;
206 listNode *ln;
207 listIter li;
208
209 /* Send to clients listening for that channel */
210 de = dictFind(server.pubsub_channels,channel);
211 if (de) {
c0ba9ebe 212 list *list = dictGetVal(de);
e2641e09 213 listNode *ln;
214 listIter li;
215
216 listRewind(list,&li);
217 while ((ln = listNext(&li)) != NULL) {
218 redisClient *c = ln->value;
219
355f8591 220 addReply(c,shared.mbulkhdr[3]);
e2641e09 221 addReply(c,shared.messagebulk);
222 addReplyBulk(c,channel);
223 addReplyBulk(c,message);
224 receivers++;
225 }
226 }
227 /* Send to clients listening to matching channels */
228 if (listLength(server.pubsub_patterns)) {
229 listRewind(server.pubsub_patterns,&li);
230 channel = getDecodedObject(channel);
231 while ((ln = listNext(&li)) != NULL) {
232 pubsubPattern *pat = ln->value;
233
234 if (stringmatchlen((char*)pat->pattern->ptr,
235 sdslen(pat->pattern->ptr),
236 (char*)channel->ptr,
237 sdslen(channel->ptr),0)) {
355f8591 238 addReply(pat->client,shared.mbulkhdr[4]);
e2641e09 239 addReply(pat->client,shared.pmessagebulk);
240 addReplyBulk(pat->client,pat->pattern);
241 addReplyBulk(pat->client,channel);
242 addReplyBulk(pat->client,message);
243 receivers++;
244 }
245 }
246 decrRefCount(channel);
247 }
248 return receivers;
249}
250
0f49d6b0 251/*-----------------------------------------------------------------------------
252 * Pubsub commands implementation
253 *----------------------------------------------------------------------------*/
254
e2641e09 255void subscribeCommand(redisClient *c) {
256 int j;
257
258 for (j = 1; j < c->argc; j++)
259 pubsubSubscribeChannel(c,c->argv[j]);
260}
261
262void unsubscribeCommand(redisClient *c) {
263 if (c->argc == 1) {
264 pubsubUnsubscribeAllChannels(c,1);
265 return;
266 } else {
267 int j;
268
269 for (j = 1; j < c->argc; j++)
270 pubsubUnsubscribeChannel(c,c->argv[j],1);
271 }
272}
273
274void psubscribeCommand(redisClient *c) {
275 int j;
276
277 for (j = 1; j < c->argc; j++)
278 pubsubSubscribePattern(c,c->argv[j]);
279}
280
281void punsubscribeCommand(redisClient *c) {
282 if (c->argc == 1) {
283 pubsubUnsubscribeAllPatterns(c,1);
284 return;
285 } else {
286 int j;
287
288 for (j = 1; j < c->argc; j++)
289 pubsubUnsubscribePattern(c,c->argv[j],1);
290 }
291}
292
293void publishCommand(redisClient *c) {
294 int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
c563ce46 295 if (server.cluster_enabled) clusterPropagatePublish(c->argv[1],c->argv[2]);
e2641e09 296 addReplyLongLong(c,receivers);
297}