e2641e09 |
1 | #include "redis.h" |
2 | |
0f49d6b0 |
3 | /*----------------------------------------------------------------------------- |
4 | * Pubsub low level API |
5 | *----------------------------------------------------------------------------*/ |
6 | |
e2641e09 |
7 | void freePubsubPattern(void *p) { |
8 | pubsubPattern *pat = p; |
9 | |
10 | decrRefCount(pat->pattern); |
11 | zfree(pat); |
12 | } |
13 | |
14 | int listMatchPubsubPattern(void *a, void *b) { |
15 | pubsubPattern *pa = a, *pb = b; |
16 | |
17 | return (pa->client == pb->client) && |
18 | (equalStringObjects(pa->pattern,pb->pattern)); |
19 | } |
20 | |
21 | /* Subscribe a client to a channel. Returns 1 if the operation succeeded, or |
22 | * 0 if the client was already subscribed to that channel. */ |
23 | int pubsubSubscribeChannel(redisClient *c, robj *channel) { |
24 | struct dictEntry *de; |
25 | list *clients = NULL; |
26 | int retval = 0; |
27 | |
28 | /* Add the channel to the client -> channels hash table */ |
29 | if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) { |
30 | retval = 1; |
31 | incrRefCount(channel); |
32 | /* Add the client to the channel -> list of clients hash table */ |
33 | de = dictFind(server.pubsub_channels,channel); |
34 | if (de == NULL) { |
35 | clients = listCreate(); |
36 | dictAdd(server.pubsub_channels,channel,clients); |
37 | incrRefCount(channel); |
38 | } else { |
c0ba9ebe |
39 | clients = dictGetVal(de); |
e2641e09 |
40 | } |
41 | listAddNodeTail(clients,c); |
42 | } |
43 | /* Notify the client */ |
355f8591 |
44 | addReply(c,shared.mbulkhdr[3]); |
e2641e09 |
45 | addReply(c,shared.subscribebulk); |
46 | addReplyBulk(c,channel); |
47 | addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns)); |
48 | return retval; |
49 | } |
50 | |
51 | /* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or |
52 | * 0 if the client was not subscribed to the specified channel. */ |
53 | int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) { |
54 | struct dictEntry *de; |
55 | list *clients; |
56 | listNode *ln; |
57 | int retval = 0; |
58 | |
59 | /* Remove the channel from the client -> channels hash table */ |
60 | incrRefCount(channel); /* channel may be just a pointer to the same object |
61 | we have in the hash tables. Protect it... */ |
62 | if (dictDelete(c->pubsub_channels,channel) == DICT_OK) { |
63 | retval = 1; |
64 | /* Remove the client from the channel -> clients list hash table */ |
65 | de = dictFind(server.pubsub_channels,channel); |
eab0e26e |
66 | redisAssertWithInfo(c,NULL,de != NULL); |
c0ba9ebe |
67 | clients = dictGetVal(de); |
e2641e09 |
68 | ln = listSearchKey(clients,c); |
eab0e26e |
69 | redisAssertWithInfo(c,NULL,ln != NULL); |
e2641e09 |
70 | listDelNode(clients,ln); |
71 | if (listLength(clients) == 0) { |
72 | /* Free the list and associated hash entry at all if this was |
73 | * the latest client, so that it will be possible to abuse |
74 | * Redis PUBSUB creating millions of channels. */ |
75 | dictDelete(server.pubsub_channels,channel); |
76 | } |
77 | } |
78 | /* Notify the client */ |
79 | if (notify) { |
355f8591 |
80 | addReply(c,shared.mbulkhdr[3]); |
e2641e09 |
81 | addReply(c,shared.unsubscribebulk); |
82 | addReplyBulk(c,channel); |
83 | addReplyLongLong(c,dictSize(c->pubsub_channels)+ |
84 | listLength(c->pubsub_patterns)); |
85 | |
86 | } |
87 | decrRefCount(channel); /* it is finally safe to release it */ |
88 | return retval; |
89 | } |
90 | |
91 | /* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the clinet was already subscribed to that pattern. */ |
92 | int pubsubSubscribePattern(redisClient *c, robj *pattern) { |
93 | int retval = 0; |
94 | |
95 | if (listSearchKey(c->pubsub_patterns,pattern) == NULL) { |
96 | retval = 1; |
97 | pubsubPattern *pat; |
98 | listAddNodeTail(c->pubsub_patterns,pattern); |
99 | incrRefCount(pattern); |
100 | pat = zmalloc(sizeof(*pat)); |
101 | pat->pattern = getDecodedObject(pattern); |
102 | pat->client = c; |
103 | listAddNodeTail(server.pubsub_patterns,pat); |
104 | } |
105 | /* Notify the client */ |
355f8591 |
106 | addReply(c,shared.mbulkhdr[3]); |
e2641e09 |
107 | addReply(c,shared.psubscribebulk); |
108 | addReplyBulk(c,pattern); |
109 | addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns)); |
110 | return retval; |
111 | } |
112 | |
113 | /* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or |
114 | * 0 if the client was not subscribed to the specified channel. */ |
115 | int pubsubUnsubscribePattern(redisClient *c, robj *pattern, int notify) { |
116 | listNode *ln; |
117 | pubsubPattern pat; |
118 | int retval = 0; |
119 | |
120 | incrRefCount(pattern); /* Protect the object. May be the same we remove */ |
121 | if ((ln = listSearchKey(c->pubsub_patterns,pattern)) != NULL) { |
122 | retval = 1; |
123 | listDelNode(c->pubsub_patterns,ln); |
124 | pat.client = c; |
125 | pat.pattern = pattern; |
126 | ln = listSearchKey(server.pubsub_patterns,&pat); |
127 | listDelNode(server.pubsub_patterns,ln); |
128 | } |
129 | /* Notify the client */ |
130 | if (notify) { |
355f8591 |
131 | addReply(c,shared.mbulkhdr[3]); |
e2641e09 |
132 | addReply(c,shared.punsubscribebulk); |
133 | addReplyBulk(c,pattern); |
134 | addReplyLongLong(c,dictSize(c->pubsub_channels)+ |
135 | listLength(c->pubsub_patterns)); |
136 | } |
137 | decrRefCount(pattern); |
138 | return retval; |
139 | } |
140 | |
141 | /* Unsubscribe from all the channels. Return the number of channels the |
142 | * client was subscribed from. */ |
143 | int pubsubUnsubscribeAllChannels(redisClient *c, int notify) { |
efc34087 |
144 | dictIterator *di = dictGetSafeIterator(c->pubsub_channels); |
e2641e09 |
145 | dictEntry *de; |
146 | int count = 0; |
147 | |
148 | while((de = dictNext(di)) != NULL) { |
c0ba9ebe |
149 | robj *channel = dictGetKey(de); |
e2641e09 |
150 | |
151 | count += pubsubUnsubscribeChannel(c,channel,notify); |
152 | } |
153 | dictReleaseIterator(di); |
154 | return count; |
155 | } |
156 | |
157 | /* Unsubscribe from all the patterns. Return the number of patterns the |
158 | * client was subscribed from. */ |
159 | int pubsubUnsubscribeAllPatterns(redisClient *c, int notify) { |
160 | listNode *ln; |
161 | listIter li; |
162 | int count = 0; |
163 | |
164 | listRewind(c->pubsub_patterns,&li); |
165 | while ((ln = listNext(&li)) != NULL) { |
166 | robj *pattern = ln->value; |
167 | |
168 | count += pubsubUnsubscribePattern(c,pattern,notify); |
169 | } |
170 | return count; |
171 | } |
172 | |
173 | /* Publish a message */ |
174 | int pubsubPublishMessage(robj *channel, robj *message) { |
175 | int receivers = 0; |
176 | struct dictEntry *de; |
177 | listNode *ln; |
178 | listIter li; |
179 | |
180 | /* Send to clients listening for that channel */ |
181 | de = dictFind(server.pubsub_channels,channel); |
182 | if (de) { |
c0ba9ebe |
183 | list *list = dictGetVal(de); |
e2641e09 |
184 | listNode *ln; |
185 | listIter li; |
186 | |
187 | listRewind(list,&li); |
188 | while ((ln = listNext(&li)) != NULL) { |
189 | redisClient *c = ln->value; |
190 | |
355f8591 |
191 | addReply(c,shared.mbulkhdr[3]); |
e2641e09 |
192 | addReply(c,shared.messagebulk); |
193 | addReplyBulk(c,channel); |
194 | addReplyBulk(c,message); |
195 | receivers++; |
196 | } |
197 | } |
198 | /* Send to clients listening to matching channels */ |
199 | if (listLength(server.pubsub_patterns)) { |
200 | listRewind(server.pubsub_patterns,&li); |
201 | channel = getDecodedObject(channel); |
202 | while ((ln = listNext(&li)) != NULL) { |
203 | pubsubPattern *pat = ln->value; |
204 | |
205 | if (stringmatchlen((char*)pat->pattern->ptr, |
206 | sdslen(pat->pattern->ptr), |
207 | (char*)channel->ptr, |
208 | sdslen(channel->ptr),0)) { |
355f8591 |
209 | addReply(pat->client,shared.mbulkhdr[4]); |
e2641e09 |
210 | addReply(pat->client,shared.pmessagebulk); |
211 | addReplyBulk(pat->client,pat->pattern); |
212 | addReplyBulk(pat->client,channel); |
213 | addReplyBulk(pat->client,message); |
214 | receivers++; |
215 | } |
216 | } |
217 | decrRefCount(channel); |
218 | } |
219 | return receivers; |
220 | } |
221 | |
0f49d6b0 |
222 | /*----------------------------------------------------------------------------- |
223 | * Pubsub commands implementation |
224 | *----------------------------------------------------------------------------*/ |
225 | |
e2641e09 |
226 | void subscribeCommand(redisClient *c) { |
227 | int j; |
228 | |
229 | for (j = 1; j < c->argc; j++) |
230 | pubsubSubscribeChannel(c,c->argv[j]); |
231 | } |
232 | |
233 | void unsubscribeCommand(redisClient *c) { |
234 | if (c->argc == 1) { |
235 | pubsubUnsubscribeAllChannels(c,1); |
236 | return; |
237 | } else { |
238 | int j; |
239 | |
240 | for (j = 1; j < c->argc; j++) |
241 | pubsubUnsubscribeChannel(c,c->argv[j],1); |
242 | } |
243 | } |
244 | |
245 | void psubscribeCommand(redisClient *c) { |
246 | int j; |
247 | |
248 | for (j = 1; j < c->argc; j++) |
249 | pubsubSubscribePattern(c,c->argv[j]); |
250 | } |
251 | |
252 | void punsubscribeCommand(redisClient *c) { |
253 | if (c->argc == 1) { |
254 | pubsubUnsubscribeAllPatterns(c,1); |
255 | return; |
256 | } else { |
257 | int j; |
258 | |
259 | for (j = 1; j < c->argc; j++) |
260 | pubsubUnsubscribePattern(c,c->argv[j],1); |
261 | } |
262 | } |
263 | |
264 | void publishCommand(redisClient *c) { |
265 | int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]); |
266 | addReplyLongLong(c,receivers); |
267 | } |