]>
git.saurik.com Git - redis.git/blob - src/pubsub.c
3 void freePubsubPattern(void *p
) {
4 pubsubPattern
*pat
= p
;
6 decrRefCount(pat
->pattern
);
10 int listMatchPubsubPattern(void *a
, void *b
) {
11 pubsubPattern
*pa
= a
, *pb
= b
;
13 return (pa
->client
== pb
->client
) &&
14 (equalStringObjects(pa
->pattern
,pb
->pattern
));
17 /* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
18 * 0 if the client was already subscribed to that channel. */
19 int pubsubSubscribeChannel(redisClient
*c
, robj
*channel
) {
24 /* Add the channel to the client -> channels hash table */
25 if (dictAdd(c
->pubsub_channels
,channel
,NULL
) == DICT_OK
) {
27 incrRefCount(channel
);
28 /* Add the client to the channel -> list of clients hash table */
29 de
= dictFind(server
.pubsub_channels
,channel
);
31 clients
= listCreate();
32 dictAdd(server
.pubsub_channels
,channel
,clients
);
33 incrRefCount(channel
);
35 clients
= dictGetEntryVal(de
);
37 listAddNodeTail(clients
,c
);
39 /* Notify the client */
40 addReply(c
,shared
.mbulk3
);
41 addReply(c
,shared
.subscribebulk
);
42 addReplyBulk(c
,channel
);
43 addReplyLongLong(c
,dictSize(c
->pubsub_channels
)+listLength(c
->pubsub_patterns
));
47 /* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
48 * 0 if the client was not subscribed to the specified channel. */
49 int pubsubUnsubscribeChannel(redisClient
*c
, robj
*channel
, int notify
) {
55 /* Remove the channel from the client -> channels hash table */
56 incrRefCount(channel
); /* channel may be just a pointer to the same object
57 we have in the hash tables. Protect it... */
58 if (dictDelete(c
->pubsub_channels
,channel
) == DICT_OK
) {
60 /* Remove the client from the channel -> clients list hash table */
61 de
= dictFind(server
.pubsub_channels
,channel
);
62 redisAssert(de
!= NULL
);
63 clients
= dictGetEntryVal(de
);
64 ln
= listSearchKey(clients
,c
);
65 redisAssert(ln
!= NULL
);
66 listDelNode(clients
,ln
);
67 if (listLength(clients
) == 0) {
68 /* Free the list and the associated hash entry if this was
69 * the last client, so that it will not be possible to abuse
70 * Redis PUBSUB by creating millions of channels. */
71 dictDelete(server
.pubsub_channels
,channel
);
74 /* Notify the client */
76 addReply(c
,shared
.mbulk3
);
77 addReply(c
,shared
.unsubscribebulk
);
78 addReplyBulk(c
,channel
);
79 addReplyLongLong(c
,dictSize(c
->pubsub_channels
)+
80 listLength(c
->pubsub_patterns
));
83 decrRefCount(channel
); /* it is finally safe to release it */
87 /* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the client was already subscribed to that pattern. */
88 int pubsubSubscribePattern(redisClient
*c
, robj
*pattern
) {
91 if (listSearchKey(c
->pubsub_patterns
,pattern
) == NULL
) {
94 listAddNodeTail(c
->pubsub_patterns
,pattern
);
95 incrRefCount(pattern
);
96 pat
= zmalloc(sizeof(*pat
));
97 pat
->pattern
= getDecodedObject(pattern
);
99 listAddNodeTail(server
.pubsub_patterns
,pat
);
101 /* Notify the client */
102 addReply(c
,shared
.mbulk3
);
103 addReply(c
,shared
.psubscribebulk
);
104 addReplyBulk(c
,pattern
);
105 addReplyLongLong(c
,dictSize(c
->pubsub_channels
)+listLength(c
->pubsub_patterns
));
109 /* Unsubscribe a client from a pattern. Returns 1 if the operation succeeded, or
110 * 0 if the client was not subscribed to the specified pattern. */
111 int pubsubUnsubscribePattern(redisClient
*c
, robj
*pattern
, int notify
) {
116 incrRefCount(pattern
); /* Protect the object. May be the same we remove */
117 if ((ln
= listSearchKey(c
->pubsub_patterns
,pattern
)) != NULL
) {
119 listDelNode(c
->pubsub_patterns
,ln
);
121 pat
.pattern
= pattern
;
122 ln
= listSearchKey(server
.pubsub_patterns
,&pat
);
123 listDelNode(server
.pubsub_patterns
,ln
);
125 /* Notify the client */
127 addReply(c
,shared
.mbulk3
);
128 addReply(c
,shared
.punsubscribebulk
);
129 addReplyBulk(c
,pattern
);
130 addReplyLongLong(c
,dictSize(c
->pubsub_channels
)+
131 listLength(c
->pubsub_patterns
));
133 decrRefCount(pattern
);
137 /* Unsubscribe from all the channels. Return the number of channels the
138 * client was subscribed to. */
139 int pubsubUnsubscribeAllChannels(redisClient
*c
, int notify
) {
140 dictIterator
*di
= dictGetIterator(c
->pubsub_channels
);
144 while((de
= dictNext(di
)) != NULL
) {
145 robj
*channel
= dictGetEntryKey(de
);
147 count
+= pubsubUnsubscribeChannel(c
,channel
,notify
);
149 dictReleaseIterator(di
);
153 /* Unsubscribe from all the patterns. Return the number of patterns the
154 * client was subscribed to. */
155 int pubsubUnsubscribeAllPatterns(redisClient
*c
, int notify
) {
160 listRewind(c
->pubsub_patterns
,&li
);
161 while ((ln
= listNext(&li
)) != NULL
) {
162 robj
*pattern
= ln
->value
;
164 count
+= pubsubUnsubscribePattern(c
,pattern
,notify
);
169 /* Publish a message */
170 int pubsubPublishMessage(robj
*channel
, robj
*message
) {
172 struct dictEntry
*de
;
176 /* Send to clients listening for that channel */
177 de
= dictFind(server
.pubsub_channels
,channel
);
179 list
*list
= dictGetEntryVal(de
);
183 listRewind(list
,&li
);
184 while ((ln
= listNext(&li
)) != NULL
) {
185 redisClient
*c
= ln
->value
;
187 addReply(c
,shared
.mbulk3
);
188 addReply(c
,shared
.messagebulk
);
189 addReplyBulk(c
,channel
);
190 addReplyBulk(c
,message
);
194 /* Send to clients subscribed to patterns that match the channel */
195 if (listLength(server
.pubsub_patterns
)) {
196 listRewind(server
.pubsub_patterns
,&li
);
197 channel
= getDecodedObject(channel
);
198 while ((ln
= listNext(&li
)) != NULL
) {
199 pubsubPattern
*pat
= ln
->value
;
201 if (stringmatchlen((char*)pat
->pattern
->ptr
,
202 sdslen(pat
->pattern
->ptr
),
204 sdslen(channel
->ptr
),0)) {
205 addReply(pat
->client
,shared
.mbulk4
);
206 addReply(pat
->client
,shared
.pmessagebulk
);
207 addReplyBulk(pat
->client
,pat
->pattern
);
208 addReplyBulk(pat
->client
,channel
);
209 addReplyBulk(pat
->client
,message
);
213 decrRefCount(channel
);
218 void subscribeCommand(redisClient
*c
) {
221 for (j
= 1; j
< c
->argc
; j
++)
222 pubsubSubscribeChannel(c
,c
->argv
[j
]);
225 void unsubscribeCommand(redisClient
*c
) {
227 pubsubUnsubscribeAllChannels(c
,1);
232 for (j
= 1; j
< c
->argc
; j
++)
233 pubsubUnsubscribeChannel(c
,c
->argv
[j
],1);
237 void psubscribeCommand(redisClient
*c
) {
240 for (j
= 1; j
< c
->argc
; j
++)
241 pubsubSubscribePattern(c
,c
->argv
[j
]);
244 void punsubscribeCommand(redisClient
*c
) {
246 pubsubUnsubscribeAllPatterns(c
,1);
251 for (j
= 1; j
< c
->argc
; j
++)
252 pubsubUnsubscribePattern(c
,c
->argv
[j
],1);
256 void publishCommand(redisClient
*c
) {
257 int receivers
= pubsubPublishMessage(c
->argv
[1],c
->argv
[2]);
258 addReplyLongLong(c
,receivers
);