]>
Commit | Line | Data |
---|---|---|
e2641e09 | 1 | #include "redis.h" |
2 | ||
0f49d6b0 | 3 | /*----------------------------------------------------------------------------- |
4 | * Pubsub low level API | |
5 | *----------------------------------------------------------------------------*/ | |
6 | ||
e2641e09 | 7 | void freePubsubPattern(void *p) { |
8 | pubsubPattern *pat = p; | |
9 | ||
10 | decrRefCount(pat->pattern); | |
11 | zfree(pat); | |
12 | } | |
13 | ||
14 | int listMatchPubsubPattern(void *a, void *b) { | |
15 | pubsubPattern *pa = a, *pb = b; | |
16 | ||
17 | return (pa->client == pb->client) && | |
18 | (equalStringObjects(pa->pattern,pb->pattern)); | |
19 | } | |
20 | ||
21 | /* Subscribe a client to a channel. Returns 1 if the operation succeeded, or | |
22 | * 0 if the client was already subscribed to that channel. */ | |
23 | int pubsubSubscribeChannel(redisClient *c, robj *channel) { | |
24 | struct dictEntry *de; | |
25 | list *clients = NULL; | |
26 | int retval = 0; | |
27 | ||
28 | /* Add the channel to the client -> channels hash table */ | |
29 | if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) { | |
30 | retval = 1; | |
31 | incrRefCount(channel); | |
32 | /* Add the client to the channel -> list of clients hash table */ | |
33 | de = dictFind(server.pubsub_channels,channel); | |
34 | if (de == NULL) { | |
35 | clients = listCreate(); | |
36 | dictAdd(server.pubsub_channels,channel,clients); | |
37 | incrRefCount(channel); | |
38 | } else { | |
c0ba9ebe | 39 | clients = dictGetVal(de); |
e2641e09 | 40 | } |
41 | listAddNodeTail(clients,c); | |
42 | } | |
43 | /* Notify the client */ | |
355f8591 | 44 | addReply(c,shared.mbulkhdr[3]); |
e2641e09 | 45 | addReply(c,shared.subscribebulk); |
46 | addReplyBulk(c,channel); | |
47 | addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns)); | |
48 | return retval; | |
49 | } | |
50 | ||
51 | /* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or | |
52 | * 0 if the client was not subscribed to the specified channel. */ | |
53 | int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) { | |
54 | struct dictEntry *de; | |
55 | list *clients; | |
56 | listNode *ln; | |
57 | int retval = 0; | |
58 | ||
59 | /* Remove the channel from the client -> channels hash table */ | |
60 | incrRefCount(channel); /* channel may be just a pointer to the same object | |
61 | we have in the hash tables. Protect it... */ | |
62 | if (dictDelete(c->pubsub_channels,channel) == DICT_OK) { | |
63 | retval = 1; | |
64 | /* Remove the client from the channel -> clients list hash table */ | |
65 | de = dictFind(server.pubsub_channels,channel); | |
eab0e26e | 66 | redisAssertWithInfo(c,NULL,de != NULL); |
c0ba9ebe | 67 | clients = dictGetVal(de); |
e2641e09 | 68 | ln = listSearchKey(clients,c); |
eab0e26e | 69 | redisAssertWithInfo(c,NULL,ln != NULL); |
e2641e09 | 70 | listDelNode(clients,ln); |
71 | if (listLength(clients) == 0) { | |
72 | /* Free the list and associated hash entry at all if this was | |
73 | * the latest client, so that it will be possible to abuse | |
74 | * Redis PUBSUB creating millions of channels. */ | |
75 | dictDelete(server.pubsub_channels,channel); | |
76 | } | |
77 | } | |
78 | /* Notify the client */ | |
79 | if (notify) { | |
355f8591 | 80 | addReply(c,shared.mbulkhdr[3]); |
e2641e09 | 81 | addReply(c,shared.unsubscribebulk); |
82 | addReplyBulk(c,channel); | |
83 | addReplyLongLong(c,dictSize(c->pubsub_channels)+ | |
84 | listLength(c->pubsub_patterns)); | |
85 | ||
86 | } | |
87 | decrRefCount(channel); /* it is finally safe to release it */ | |
88 | return retval; | |
89 | } | |
90 | ||
91 | /* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the clinet was already subscribed to that pattern. */ | |
92 | int pubsubSubscribePattern(redisClient *c, robj *pattern) { | |
93 | int retval = 0; | |
94 | ||
95 | if (listSearchKey(c->pubsub_patterns,pattern) == NULL) { | |
96 | retval = 1; | |
97 | pubsubPattern *pat; | |
98 | listAddNodeTail(c->pubsub_patterns,pattern); | |
99 | incrRefCount(pattern); | |
100 | pat = zmalloc(sizeof(*pat)); | |
101 | pat->pattern = getDecodedObject(pattern); | |
102 | pat->client = c; | |
103 | listAddNodeTail(server.pubsub_patterns,pat); | |
104 | } | |
105 | /* Notify the client */ | |
355f8591 | 106 | addReply(c,shared.mbulkhdr[3]); |
e2641e09 | 107 | addReply(c,shared.psubscribebulk); |
108 | addReplyBulk(c,pattern); | |
109 | addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns)); | |
110 | return retval; | |
111 | } | |
112 | ||
113 | /* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or | |
114 | * 0 if the client was not subscribed to the specified channel. */ | |
115 | int pubsubUnsubscribePattern(redisClient *c, robj *pattern, int notify) { | |
116 | listNode *ln; | |
117 | pubsubPattern pat; | |
118 | int retval = 0; | |
119 | ||
120 | incrRefCount(pattern); /* Protect the object. May be the same we remove */ | |
121 | if ((ln = listSearchKey(c->pubsub_patterns,pattern)) != NULL) { | |
122 | retval = 1; | |
123 | listDelNode(c->pubsub_patterns,ln); | |
124 | pat.client = c; | |
125 | pat.pattern = pattern; | |
126 | ln = listSearchKey(server.pubsub_patterns,&pat); | |
127 | listDelNode(server.pubsub_patterns,ln); | |
128 | } | |
129 | /* Notify the client */ | |
130 | if (notify) { | |
355f8591 | 131 | addReply(c,shared.mbulkhdr[3]); |
e2641e09 | 132 | addReply(c,shared.punsubscribebulk); |
133 | addReplyBulk(c,pattern); | |
134 | addReplyLongLong(c,dictSize(c->pubsub_channels)+ | |
135 | listLength(c->pubsub_patterns)); | |
136 | } | |
137 | decrRefCount(pattern); | |
138 | return retval; | |
139 | } | |
140 | ||
141 | /* Unsubscribe from all the channels. Return the number of channels the | |
142 | * client was subscribed from. */ | |
143 | int pubsubUnsubscribeAllChannels(redisClient *c, int notify) { | |
efc34087 | 144 | dictIterator *di = dictGetSafeIterator(c->pubsub_channels); |
e2641e09 | 145 | dictEntry *de; |
146 | int count = 0; | |
147 | ||
148 | while((de = dictNext(di)) != NULL) { | |
c0ba9ebe | 149 | robj *channel = dictGetKey(de); |
e2641e09 | 150 | |
151 | count += pubsubUnsubscribeChannel(c,channel,notify); | |
152 | } | |
153 | dictReleaseIterator(di); | |
154 | return count; | |
155 | } | |
156 | ||
157 | /* Unsubscribe from all the patterns. Return the number of patterns the | |
158 | * client was subscribed from. */ | |
159 | int pubsubUnsubscribeAllPatterns(redisClient *c, int notify) { | |
160 | listNode *ln; | |
161 | listIter li; | |
162 | int count = 0; | |
163 | ||
164 | listRewind(c->pubsub_patterns,&li); | |
165 | while ((ln = listNext(&li)) != NULL) { | |
166 | robj *pattern = ln->value; | |
167 | ||
168 | count += pubsubUnsubscribePattern(c,pattern,notify); | |
169 | } | |
170 | return count; | |
171 | } | |
172 | ||
173 | /* Publish a message */ | |
174 | int pubsubPublishMessage(robj *channel, robj *message) { | |
175 | int receivers = 0; | |
176 | struct dictEntry *de; | |
177 | listNode *ln; | |
178 | listIter li; | |
179 | ||
180 | /* Send to clients listening for that channel */ | |
181 | de = dictFind(server.pubsub_channels,channel); | |
182 | if (de) { | |
c0ba9ebe | 183 | list *list = dictGetVal(de); |
e2641e09 | 184 | listNode *ln; |
185 | listIter li; | |
186 | ||
187 | listRewind(list,&li); | |
188 | while ((ln = listNext(&li)) != NULL) { | |
189 | redisClient *c = ln->value; | |
190 | ||
355f8591 | 191 | addReply(c,shared.mbulkhdr[3]); |
e2641e09 | 192 | addReply(c,shared.messagebulk); |
193 | addReplyBulk(c,channel); | |
194 | addReplyBulk(c,message); | |
195 | receivers++; | |
196 | } | |
197 | } | |
198 | /* Send to clients listening to matching channels */ | |
199 | if (listLength(server.pubsub_patterns)) { | |
200 | listRewind(server.pubsub_patterns,&li); | |
201 | channel = getDecodedObject(channel); | |
202 | while ((ln = listNext(&li)) != NULL) { | |
203 | pubsubPattern *pat = ln->value; | |
204 | ||
205 | if (stringmatchlen((char*)pat->pattern->ptr, | |
206 | sdslen(pat->pattern->ptr), | |
207 | (char*)channel->ptr, | |
208 | sdslen(channel->ptr),0)) { | |
355f8591 | 209 | addReply(pat->client,shared.mbulkhdr[4]); |
e2641e09 | 210 | addReply(pat->client,shared.pmessagebulk); |
211 | addReplyBulk(pat->client,pat->pattern); | |
212 | addReplyBulk(pat->client,channel); | |
213 | addReplyBulk(pat->client,message); | |
214 | receivers++; | |
215 | } | |
216 | } | |
217 | decrRefCount(channel); | |
218 | } | |
219 | return receivers; | |
220 | } | |
221 | ||
0f49d6b0 | 222 | /*----------------------------------------------------------------------------- |
223 | * Pubsub commands implementation | |
224 | *----------------------------------------------------------------------------*/ | |
225 | ||
e2641e09 | 226 | void subscribeCommand(redisClient *c) { |
227 | int j; | |
228 | ||
229 | for (j = 1; j < c->argc; j++) | |
230 | pubsubSubscribeChannel(c,c->argv[j]); | |
231 | } | |
232 | ||
233 | void unsubscribeCommand(redisClient *c) { | |
234 | if (c->argc == 1) { | |
235 | pubsubUnsubscribeAllChannels(c,1); | |
236 | return; | |
237 | } else { | |
238 | int j; | |
239 | ||
240 | for (j = 1; j < c->argc; j++) | |
241 | pubsubUnsubscribeChannel(c,c->argv[j],1); | |
242 | } | |
243 | } | |
244 | ||
245 | void psubscribeCommand(redisClient *c) { | |
246 | int j; | |
247 | ||
248 | for (j = 1; j < c->argc; j++) | |
249 | pubsubSubscribePattern(c,c->argv[j]); | |
250 | } | |
251 | ||
252 | void punsubscribeCommand(redisClient *c) { | |
253 | if (c->argc == 1) { | |
254 | pubsubUnsubscribeAllPatterns(c,1); | |
255 | return; | |
256 | } else { | |
257 | int j; | |
258 | ||
259 | for (j = 1; j < c->argc; j++) | |
260 | pubsubUnsubscribePattern(c,c->argv[j],1); | |
261 | } | |
262 | } | |
263 | ||
264 | void publishCommand(redisClient *c) { | |
265 | int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]); | |
c563ce46 | 266 | if (server.cluster_enabled) clusterPropagatePublish(c->argv[1],c->argv[2]); |
e2641e09 | 267 | addReplyLongLong(c,receivers); |
268 | } |