]> git.saurik.com Git - redis.git/blob - src/pubsub.c
gitignore modified
[redis.git] / src / pubsub.c
1 #include "redis.h"
2
3 void freePubsubPattern(void *p) {
4 pubsubPattern *pat = p;
5
6 decrRefCount(pat->pattern);
7 zfree(pat);
8 }
9
10 int listMatchPubsubPattern(void *a, void *b) {
11 pubsubPattern *pa = a, *pb = b;
12
13 return (pa->client == pb->client) &&
14 (equalStringObjects(pa->pattern,pb->pattern));
15 }
16
17 /* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
18 * 0 if the client was already subscribed to that channel. */
19 int pubsubSubscribeChannel(redisClient *c, robj *channel) {
20 struct dictEntry *de;
21 list *clients = NULL;
22 int retval = 0;
23
24 /* Add the channel to the client -> channels hash table */
25 if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
26 retval = 1;
27 incrRefCount(channel);
28 /* Add the client to the channel -> list of clients hash table */
29 de = dictFind(server.pubsub_channels,channel);
30 if (de == NULL) {
31 clients = listCreate();
32 dictAdd(server.pubsub_channels,channel,clients);
33 incrRefCount(channel);
34 } else {
35 clients = dictGetEntryVal(de);
36 }
37 listAddNodeTail(clients,c);
38 }
39 /* Notify the client */
40 addReply(c,shared.mbulk3);
41 addReply(c,shared.subscribebulk);
42 addReplyBulk(c,channel);
43 addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));
44 return retval;
45 }
46
47 /* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
48 * 0 if the client was not subscribed to the specified channel. */
49 int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) {
50 struct dictEntry *de;
51 list *clients;
52 listNode *ln;
53 int retval = 0;
54
55 /* Remove the channel from the client -> channels hash table */
56 incrRefCount(channel); /* channel may be just a pointer to the same object
57 we have in the hash tables. Protect it... */
58 if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {
59 retval = 1;
60 /* Remove the client from the channel -> clients list hash table */
61 de = dictFind(server.pubsub_channels,channel);
62 redisAssert(de != NULL);
63 clients = dictGetEntryVal(de);
64 ln = listSearchKey(clients,c);
65 redisAssert(ln != NULL);
66 listDelNode(clients,ln);
67 if (listLength(clients) == 0) {
68 /* Free the list and associated hash entry at all if this was
69 * the latest client, so that it will be possible to abuse
70 * Redis PUBSUB creating millions of channels. */
71 dictDelete(server.pubsub_channels,channel);
72 }
73 }
74 /* Notify the client */
75 if (notify) {
76 addReply(c,shared.mbulk3);
77 addReply(c,shared.unsubscribebulk);
78 addReplyBulk(c,channel);
79 addReplyLongLong(c,dictSize(c->pubsub_channels)+
80 listLength(c->pubsub_patterns));
81
82 }
83 decrRefCount(channel); /* it is finally safe to release it */
84 return retval;
85 }
86
87 /* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the clinet was already subscribed to that pattern. */
88 int pubsubSubscribePattern(redisClient *c, robj *pattern) {
89 int retval = 0;
90
91 if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
92 retval = 1;
93 pubsubPattern *pat;
94 listAddNodeTail(c->pubsub_patterns,pattern);
95 incrRefCount(pattern);
96 pat = zmalloc(sizeof(*pat));
97 pat->pattern = getDecodedObject(pattern);
98 pat->client = c;
99 listAddNodeTail(server.pubsub_patterns,pat);
100 }
101 /* Notify the client */
102 addReply(c,shared.mbulk3);
103 addReply(c,shared.psubscribebulk);
104 addReplyBulk(c,pattern);
105 addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));
106 return retval;
107 }
108
109 /* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
110 * 0 if the client was not subscribed to the specified channel. */
111 int pubsubUnsubscribePattern(redisClient *c, robj *pattern, int notify) {
112 listNode *ln;
113 pubsubPattern pat;
114 int retval = 0;
115
116 incrRefCount(pattern); /* Protect the object. May be the same we remove */
117 if ((ln = listSearchKey(c->pubsub_patterns,pattern)) != NULL) {
118 retval = 1;
119 listDelNode(c->pubsub_patterns,ln);
120 pat.client = c;
121 pat.pattern = pattern;
122 ln = listSearchKey(server.pubsub_patterns,&pat);
123 listDelNode(server.pubsub_patterns,ln);
124 }
125 /* Notify the client */
126 if (notify) {
127 addReply(c,shared.mbulk3);
128 addReply(c,shared.punsubscribebulk);
129 addReplyBulk(c,pattern);
130 addReplyLongLong(c,dictSize(c->pubsub_channels)+
131 listLength(c->pubsub_patterns));
132 }
133 decrRefCount(pattern);
134 return retval;
135 }
136
137 /* Unsubscribe from all the channels. Return the number of channels the
138 * client was subscribed from. */
139 int pubsubUnsubscribeAllChannels(redisClient *c, int notify) {
140 dictIterator *di = dictGetIterator(c->pubsub_channels);
141 dictEntry *de;
142 int count = 0;
143
144 while((de = dictNext(di)) != NULL) {
145 robj *channel = dictGetEntryKey(de);
146
147 count += pubsubUnsubscribeChannel(c,channel,notify);
148 }
149 dictReleaseIterator(di);
150 return count;
151 }
152
153 /* Unsubscribe from all the patterns. Return the number of patterns the
154 * client was subscribed from. */
155 int pubsubUnsubscribeAllPatterns(redisClient *c, int notify) {
156 listNode *ln;
157 listIter li;
158 int count = 0;
159
160 listRewind(c->pubsub_patterns,&li);
161 while ((ln = listNext(&li)) != NULL) {
162 robj *pattern = ln->value;
163
164 count += pubsubUnsubscribePattern(c,pattern,notify);
165 }
166 return count;
167 }
168
169 /* Publish a message */
170 int pubsubPublishMessage(robj *channel, robj *message) {
171 int receivers = 0;
172 struct dictEntry *de;
173 listNode *ln;
174 listIter li;
175
176 /* Send to clients listening for that channel */
177 de = dictFind(server.pubsub_channels,channel);
178 if (de) {
179 list *list = dictGetEntryVal(de);
180 listNode *ln;
181 listIter li;
182
183 listRewind(list,&li);
184 while ((ln = listNext(&li)) != NULL) {
185 redisClient *c = ln->value;
186
187 addReply(c,shared.mbulk3);
188 addReply(c,shared.messagebulk);
189 addReplyBulk(c,channel);
190 addReplyBulk(c,message);
191 receivers++;
192 }
193 }
194 /* Send to clients listening to matching channels */
195 if (listLength(server.pubsub_patterns)) {
196 listRewind(server.pubsub_patterns,&li);
197 channel = getDecodedObject(channel);
198 while ((ln = listNext(&li)) != NULL) {
199 pubsubPattern *pat = ln->value;
200
201 if (stringmatchlen((char*)pat->pattern->ptr,
202 sdslen(pat->pattern->ptr),
203 (char*)channel->ptr,
204 sdslen(channel->ptr),0)) {
205 addReply(pat->client,shared.mbulk4);
206 addReply(pat->client,shared.pmessagebulk);
207 addReplyBulk(pat->client,pat->pattern);
208 addReplyBulk(pat->client,channel);
209 addReplyBulk(pat->client,message);
210 receivers++;
211 }
212 }
213 decrRefCount(channel);
214 }
215 return receivers;
216 }
217
218 void subscribeCommand(redisClient *c) {
219 int j;
220
221 for (j = 1; j < c->argc; j++)
222 pubsubSubscribeChannel(c,c->argv[j]);
223 }
224
225 void unsubscribeCommand(redisClient *c) {
226 if (c->argc == 1) {
227 pubsubUnsubscribeAllChannels(c,1);
228 return;
229 } else {
230 int j;
231
232 for (j = 1; j < c->argc; j++)
233 pubsubUnsubscribeChannel(c,c->argv[j],1);
234 }
235 }
236
237 void psubscribeCommand(redisClient *c) {
238 int j;
239
240 for (j = 1; j < c->argc; j++)
241 pubsubSubscribePattern(c,c->argv[j]);
242 }
243
244 void punsubscribeCommand(redisClient *c) {
245 if (c->argc == 1) {
246 pubsubUnsubscribeAllPatterns(c,1);
247 return;
248 } else {
249 int j;
250
251 for (j = 1; j < c->argc; j++)
252 pubsubUnsubscribePattern(c,c->argv[j],1);
253 }
254 }
255
256 void publishCommand(redisClient *c) {
257 int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
258 addReplyLongLong(c,receivers);
259 }