]>
Commit | Line | Data |
---|---|---|
1 | #include "redis.h" | |
2 | ||
3 | /*----------------------------------------------------------------------------- | |
4 | * Pubsub low level API | |
5 | *----------------------------------------------------------------------------*/ | |
6 | ||
7 | void freePubsubPattern(void *p) { | |
8 | pubsubPattern *pat = p; | |
9 | ||
10 | decrRefCount(pat->pattern); | |
11 | zfree(pat); | |
12 | } | |
13 | ||
14 | int listMatchPubsubPattern(void *a, void *b) { | |
15 | pubsubPattern *pa = a, *pb = b; | |
16 | ||
17 | return (pa->client == pb->client) && | |
18 | (equalStringObjects(pa->pattern,pb->pattern)); | |
19 | } | |
20 | ||
21 | /* Subscribe a client to a channel. Returns 1 if the operation succeeded, or | |
22 | * 0 if the client was already subscribed to that channel. */ | |
23 | int pubsubSubscribeChannel(redisClient *c, robj *channel) { | |
24 | struct dictEntry *de; | |
25 | list *clients = NULL; | |
26 | int retval = 0; | |
27 | ||
28 | /* Add the channel to the client -> channels hash table */ | |
29 | if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) { | |
30 | retval = 1; | |
31 | incrRefCount(channel); | |
32 | /* Add the client to the channel -> list of clients hash table */ | |
33 | de = dictFind(server.pubsub_channels,channel); | |
34 | if (de == NULL) { | |
35 | clients = listCreate(); | |
36 | dictAdd(server.pubsub_channels,channel,clients); | |
37 | incrRefCount(channel); | |
38 | } else { | |
39 | clients = dictGetVal(de); | |
40 | } | |
41 | listAddNodeTail(clients,c); | |
42 | } | |
43 | /* Notify the client */ | |
44 | addReply(c,shared.mbulk3); | |
45 | addReply(c,shared.subscribebulk); | |
46 | addReplyBulk(c,channel); | |
47 | addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns)); | |
48 | return retval; | |
49 | } | |
50 | ||
51 | /* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or | |
52 | * 0 if the client was not subscribed to the specified channel. */ | |
53 | int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) { | |
54 | struct dictEntry *de; | |
55 | list *clients; | |
56 | listNode *ln; | |
57 | int retval = 0; | |
58 | ||
59 | /* Remove the channel from the client -> channels hash table */ | |
60 | incrRefCount(channel); /* channel may be just a pointer to the same object | |
61 | we have in the hash tables. Protect it... */ | |
62 | if (dictDelete(c->pubsub_channels,channel) == DICT_OK) { | |
63 | retval = 1; | |
64 | /* Remove the client from the channel -> clients list hash table */ | |
65 | de = dictFind(server.pubsub_channels,channel); | |
66 | redisAssertWithInfo(c,NULL,de != NULL); | |
67 | clients = dictGetVal(de); | |
68 | ln = listSearchKey(clients,c); | |
69 | redisAssertWithInfo(c,NULL,ln != NULL); | |
70 | listDelNode(clients,ln); | |
71 | if (listLength(clients) == 0) { | |
72 | /* Free the list and associated hash entry at all if this was | |
73 | * the latest client, so that it will be possible to abuse | |
74 | * Redis PUBSUB creating millions of channels. */ | |
75 | dictDelete(server.pubsub_channels,channel); | |
76 | } | |
77 | } | |
78 | /* Notify the client */ | |
79 | if (notify) { | |
80 | addReply(c,shared.mbulk3); | |
81 | addReply(c,shared.unsubscribebulk); | |
82 | addReplyBulk(c,channel); | |
83 | addReplyLongLong(c,dictSize(c->pubsub_channels)+ | |
84 | listLength(c->pubsub_patterns)); | |
85 | ||
86 | } | |
87 | decrRefCount(channel); /* it is finally safe to release it */ | |
88 | return retval; | |
89 | } | |
90 | ||
91 | /* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the clinet was already subscribed to that pattern. */ | |
92 | int pubsubSubscribePattern(redisClient *c, robj *pattern) { | |
93 | int retval = 0; | |
94 | ||
95 | if (listSearchKey(c->pubsub_patterns,pattern) == NULL) { | |
96 | retval = 1; | |
97 | pubsubPattern *pat; | |
98 | listAddNodeTail(c->pubsub_patterns,pattern); | |
99 | incrRefCount(pattern); | |
100 | pat = zmalloc(sizeof(*pat)); | |
101 | pat->pattern = getDecodedObject(pattern); | |
102 | pat->client = c; | |
103 | listAddNodeTail(server.pubsub_patterns,pat); | |
104 | } | |
105 | /* Notify the client */ | |
106 | addReply(c,shared.mbulk3); | |
107 | addReply(c,shared.psubscribebulk); | |
108 | addReplyBulk(c,pattern); | |
109 | addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns)); | |
110 | return retval; | |
111 | } | |
112 | ||
113 | /* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or | |
114 | * 0 if the client was not subscribed to the specified channel. */ | |
115 | int pubsubUnsubscribePattern(redisClient *c, robj *pattern, int notify) { | |
116 | listNode *ln; | |
117 | pubsubPattern pat; | |
118 | int retval = 0; | |
119 | ||
120 | incrRefCount(pattern); /* Protect the object. May be the same we remove */ | |
121 | if ((ln = listSearchKey(c->pubsub_patterns,pattern)) != NULL) { | |
122 | retval = 1; | |
123 | listDelNode(c->pubsub_patterns,ln); | |
124 | pat.client = c; | |
125 | pat.pattern = pattern; | |
126 | ln = listSearchKey(server.pubsub_patterns,&pat); | |
127 | listDelNode(server.pubsub_patterns,ln); | |
128 | } | |
129 | /* Notify the client */ | |
130 | if (notify) { | |
131 | addReply(c,shared.mbulk3); | |
132 | addReply(c,shared.punsubscribebulk); | |
133 | addReplyBulk(c,pattern); | |
134 | addReplyLongLong(c,dictSize(c->pubsub_channels)+ | |
135 | listLength(c->pubsub_patterns)); | |
136 | } | |
137 | decrRefCount(pattern); | |
138 | return retval; | |
139 | } | |
140 | ||
141 | /* Unsubscribe from all the channels. Return the number of channels the | |
142 | * client was subscribed from. */ | |
143 | int pubsubUnsubscribeAllChannels(redisClient *c, int notify) { | |
144 | dictIterator *di = dictGetSafeIterator(c->pubsub_channels); | |
145 | dictEntry *de; | |
146 | int count = 0; | |
147 | ||
148 | while((de = dictNext(di)) != NULL) { | |
149 | robj *channel = dictGetKey(de); | |
150 | ||
151 | count += pubsubUnsubscribeChannel(c,channel,notify); | |
152 | } | |
153 | dictReleaseIterator(di); | |
154 | return count; | |
155 | } | |
156 | ||
157 | /* Unsubscribe from all the patterns. Return the number of patterns the | |
158 | * client was subscribed from. */ | |
159 | int pubsubUnsubscribeAllPatterns(redisClient *c, int notify) { | |
160 | listNode *ln; | |
161 | listIter li; | |
162 | int count = 0; | |
163 | ||
164 | listRewind(c->pubsub_patterns,&li); | |
165 | while ((ln = listNext(&li)) != NULL) { | |
166 | robj *pattern = ln->value; | |
167 | ||
168 | count += pubsubUnsubscribePattern(c,pattern,notify); | |
169 | } | |
170 | return count; | |
171 | } | |
172 | ||
173 | /* Publish a message */ | |
174 | int pubsubPublishMessage(robj *channel, robj *message) { | |
175 | int receivers = 0; | |
176 | struct dictEntry *de; | |
177 | listNode *ln; | |
178 | listIter li; | |
179 | ||
180 | /* Send to clients listening for that channel */ | |
181 | de = dictFind(server.pubsub_channels,channel); | |
182 | if (de) { | |
183 | list *list = dictGetVal(de); | |
184 | listNode *ln; | |
185 | listIter li; | |
186 | ||
187 | listRewind(list,&li); | |
188 | while ((ln = listNext(&li)) != NULL) { | |
189 | redisClient *c = ln->value; | |
190 | ||
191 | addReply(c,shared.mbulk3); | |
192 | addReply(c,shared.messagebulk); | |
193 | addReplyBulk(c,channel); | |
194 | addReplyBulk(c,message); | |
195 | receivers++; | |
196 | } | |
197 | } | |
198 | /* Send to clients listening to matching channels */ | |
199 | if (listLength(server.pubsub_patterns)) { | |
200 | listRewind(server.pubsub_patterns,&li); | |
201 | channel = getDecodedObject(channel); | |
202 | while ((ln = listNext(&li)) != NULL) { | |
203 | pubsubPattern *pat = ln->value; | |
204 | ||
205 | if (stringmatchlen((char*)pat->pattern->ptr, | |
206 | sdslen(pat->pattern->ptr), | |
207 | (char*)channel->ptr, | |
208 | sdslen(channel->ptr),0)) { | |
209 | addReply(pat->client,shared.mbulk4); | |
210 | addReply(pat->client,shared.pmessagebulk); | |
211 | addReplyBulk(pat->client,pat->pattern); | |
212 | addReplyBulk(pat->client,channel); | |
213 | addReplyBulk(pat->client,message); | |
214 | receivers++; | |
215 | } | |
216 | } | |
217 | decrRefCount(channel); | |
218 | } | |
219 | return receivers; | |
220 | } | |
221 | ||
222 | /*----------------------------------------------------------------------------- | |
223 | * Pubsub commands implementation | |
224 | *----------------------------------------------------------------------------*/ | |
225 | ||
226 | void subscribeCommand(redisClient *c) { | |
227 | int j; | |
228 | ||
229 | for (j = 1; j < c->argc; j++) | |
230 | pubsubSubscribeChannel(c,c->argv[j]); | |
231 | } | |
232 | ||
233 | void unsubscribeCommand(redisClient *c) { | |
234 | if (c->argc == 1) { | |
235 | pubsubUnsubscribeAllChannels(c,1); | |
236 | return; | |
237 | } else { | |
238 | int j; | |
239 | ||
240 | for (j = 1; j < c->argc; j++) | |
241 | pubsubUnsubscribeChannel(c,c->argv[j],1); | |
242 | } | |
243 | } | |
244 | ||
245 | void psubscribeCommand(redisClient *c) { | |
246 | int j; | |
247 | ||
248 | for (j = 1; j < c->argc; j++) | |
249 | pubsubSubscribePattern(c,c->argv[j]); | |
250 | } | |
251 | ||
252 | void punsubscribeCommand(redisClient *c) { | |
253 | if (c->argc == 1) { | |
254 | pubsubUnsubscribeAllPatterns(c,1); | |
255 | return; | |
256 | } else { | |
257 | int j; | |
258 | ||
259 | for (j = 1; j < c->argc; j++) | |
260 | pubsubUnsubscribePattern(c,c->argv[j],1); | |
261 | } | |
262 | } | |
263 | ||
264 | void publishCommand(redisClient *c) { | |
265 | int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]); | |
266 | if (server.cluster_enabled) clusterPropagatePublish(c->argv[1],c->argv[2]); | |
267 | addReplyLongLong(c,receivers); | |
268 | } |