]> git.saurik.com Git - redis.git/blob - src/pubsub.c
A reimplementation of blocking operation internals.
[redis.git] / src / pubsub.c
1 #include "redis.h"
2
3 /*-----------------------------------------------------------------------------
4 * Pubsub low level API
5 *----------------------------------------------------------------------------*/
6
7 void freePubsubPattern(void *p) {
8 pubsubPattern *pat = p;
9
10 decrRefCount(pat->pattern);
11 zfree(pat);
12 }
13
14 int listMatchPubsubPattern(void *a, void *b) {
15 pubsubPattern *pa = a, *pb = b;
16
17 return (pa->client == pb->client) &&
18 (equalStringObjects(pa->pattern,pb->pattern));
19 }
20
21 /* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
22 * 0 if the client was already subscribed to that channel. */
23 int pubsubSubscribeChannel(redisClient *c, robj *channel) {
24 struct dictEntry *de;
25 list *clients = NULL;
26 int retval = 0;
27
28 /* Add the channel to the client -> channels hash table */
29 if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
30 retval = 1;
31 incrRefCount(channel);
32 /* Add the client to the channel -> list of clients hash table */
33 de = dictFind(server.pubsub_channels,channel);
34 if (de == NULL) {
35 clients = listCreate();
36 dictAdd(server.pubsub_channels,channel,clients);
37 incrRefCount(channel);
38 } else {
39 clients = dictGetVal(de);
40 }
41 listAddNodeTail(clients,c);
42 }
43 /* Notify the client */
44 addReply(c,shared.mbulkhdr[3]);
45 addReply(c,shared.subscribebulk);
46 addReplyBulk(c,channel);
47 addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));
48 return retval;
49 }
50
51 /* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
52 * 0 if the client was not subscribed to the specified channel. */
53 int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) {
54 struct dictEntry *de;
55 list *clients;
56 listNode *ln;
57 int retval = 0;
58
59 /* Remove the channel from the client -> channels hash table */
60 incrRefCount(channel); /* channel may be just a pointer to the same object
61 we have in the hash tables. Protect it... */
62 if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {
63 retval = 1;
64 /* Remove the client from the channel -> clients list hash table */
65 de = dictFind(server.pubsub_channels,channel);
66 redisAssertWithInfo(c,NULL,de != NULL);
67 clients = dictGetVal(de);
68 ln = listSearchKey(clients,c);
69 redisAssertWithInfo(c,NULL,ln != NULL);
70 listDelNode(clients,ln);
71 if (listLength(clients) == 0) {
72 /* Free the list and associated hash entry at all if this was
73 * the latest client, so that it will be possible to abuse
74 * Redis PUBSUB creating millions of channels. */
75 dictDelete(server.pubsub_channels,channel);
76 }
77 }
78 /* Notify the client */
79 if (notify) {
80 addReply(c,shared.mbulkhdr[3]);
81 addReply(c,shared.unsubscribebulk);
82 addReplyBulk(c,channel);
83 addReplyLongLong(c,dictSize(c->pubsub_channels)+
84 listLength(c->pubsub_patterns));
85
86 }
87 decrRefCount(channel); /* it is finally safe to release it */
88 return retval;
89 }
90
91 /* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the clinet was already subscribed to that pattern. */
92 int pubsubSubscribePattern(redisClient *c, robj *pattern) {
93 int retval = 0;
94
95 if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
96 retval = 1;
97 pubsubPattern *pat;
98 listAddNodeTail(c->pubsub_patterns,pattern);
99 incrRefCount(pattern);
100 pat = zmalloc(sizeof(*pat));
101 pat->pattern = getDecodedObject(pattern);
102 pat->client = c;
103 listAddNodeTail(server.pubsub_patterns,pat);
104 }
105 /* Notify the client */
106 addReply(c,shared.mbulkhdr[3]);
107 addReply(c,shared.psubscribebulk);
108 addReplyBulk(c,pattern);
109 addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));
110 return retval;
111 }
112
113 /* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
114 * 0 if the client was not subscribed to the specified channel. */
115 int pubsubUnsubscribePattern(redisClient *c, robj *pattern, int notify) {
116 listNode *ln;
117 pubsubPattern pat;
118 int retval = 0;
119
120 incrRefCount(pattern); /* Protect the object. May be the same we remove */
121 if ((ln = listSearchKey(c->pubsub_patterns,pattern)) != NULL) {
122 retval = 1;
123 listDelNode(c->pubsub_patterns,ln);
124 pat.client = c;
125 pat.pattern = pattern;
126 ln = listSearchKey(server.pubsub_patterns,&pat);
127 listDelNode(server.pubsub_patterns,ln);
128 }
129 /* Notify the client */
130 if (notify) {
131 addReply(c,shared.mbulkhdr[3]);
132 addReply(c,shared.punsubscribebulk);
133 addReplyBulk(c,pattern);
134 addReplyLongLong(c,dictSize(c->pubsub_channels)+
135 listLength(c->pubsub_patterns));
136 }
137 decrRefCount(pattern);
138 return retval;
139 }
140
141 /* Unsubscribe from all the channels. Return the number of channels the
142 * client was subscribed from. */
143 int pubsubUnsubscribeAllChannels(redisClient *c, int notify) {
144 dictIterator *di = dictGetSafeIterator(c->pubsub_channels);
145 dictEntry *de;
146 int count = 0;
147
148 while((de = dictNext(di)) != NULL) {
149 robj *channel = dictGetKey(de);
150
151 count += pubsubUnsubscribeChannel(c,channel,notify);
152 }
153 dictReleaseIterator(di);
154 return count;
155 }
156
157 /* Unsubscribe from all the patterns. Return the number of patterns the
158 * client was subscribed from. */
159 int pubsubUnsubscribeAllPatterns(redisClient *c, int notify) {
160 listNode *ln;
161 listIter li;
162 int count = 0;
163
164 listRewind(c->pubsub_patterns,&li);
165 while ((ln = listNext(&li)) != NULL) {
166 robj *pattern = ln->value;
167
168 count += pubsubUnsubscribePattern(c,pattern,notify);
169 }
170 return count;
171 }
172
173 /* Publish a message */
174 int pubsubPublishMessage(robj *channel, robj *message) {
175 int receivers = 0;
176 struct dictEntry *de;
177 listNode *ln;
178 listIter li;
179
180 /* Send to clients listening for that channel */
181 de = dictFind(server.pubsub_channels,channel);
182 if (de) {
183 list *list = dictGetVal(de);
184 listNode *ln;
185 listIter li;
186
187 listRewind(list,&li);
188 while ((ln = listNext(&li)) != NULL) {
189 redisClient *c = ln->value;
190
191 addReply(c,shared.mbulkhdr[3]);
192 addReply(c,shared.messagebulk);
193 addReplyBulk(c,channel);
194 addReplyBulk(c,message);
195 receivers++;
196 }
197 }
198 /* Send to clients listening to matching channels */
199 if (listLength(server.pubsub_patterns)) {
200 listRewind(server.pubsub_patterns,&li);
201 channel = getDecodedObject(channel);
202 while ((ln = listNext(&li)) != NULL) {
203 pubsubPattern *pat = ln->value;
204
205 if (stringmatchlen((char*)pat->pattern->ptr,
206 sdslen(pat->pattern->ptr),
207 (char*)channel->ptr,
208 sdslen(channel->ptr),0)) {
209 addReply(pat->client,shared.mbulkhdr[4]);
210 addReply(pat->client,shared.pmessagebulk);
211 addReplyBulk(pat->client,pat->pattern);
212 addReplyBulk(pat->client,channel);
213 addReplyBulk(pat->client,message);
214 receivers++;
215 }
216 }
217 decrRefCount(channel);
218 }
219 return receivers;
220 }
221
222 /*-----------------------------------------------------------------------------
223 * Pubsub commands implementation
224 *----------------------------------------------------------------------------*/
225
226 void subscribeCommand(redisClient *c) {
227 int j;
228
229 for (j = 1; j < c->argc; j++)
230 pubsubSubscribeChannel(c,c->argv[j]);
231 }
232
233 void unsubscribeCommand(redisClient *c) {
234 if (c->argc == 1) {
235 pubsubUnsubscribeAllChannels(c,1);
236 return;
237 } else {
238 int j;
239
240 for (j = 1; j < c->argc; j++)
241 pubsubUnsubscribeChannel(c,c->argv[j],1);
242 }
243 }
244
245 void psubscribeCommand(redisClient *c) {
246 int j;
247
248 for (j = 1; j < c->argc; j++)
249 pubsubSubscribePattern(c,c->argv[j]);
250 }
251
252 void punsubscribeCommand(redisClient *c) {
253 if (c->argc == 1) {
254 pubsubUnsubscribeAllPatterns(c,1);
255 return;
256 } else {
257 int j;
258
259 for (j = 1; j < c->argc; j++)
260 pubsubUnsubscribePattern(c,c->argv[j],1);
261 }
262 }
263
264 void publishCommand(redisClient *c) {
265 int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
266 if (server.cluster_enabled) clusterPropagatePublish(c->argv[1],c->argv[2]);
267 addReplyLongLong(c,receivers);
268 }