| 1 | #include "redis.h" |
| 2 | |
| 3 | /*----------------------------------------------------------------------------- |
| 4 | * Pubsub low level API |
| 5 | *----------------------------------------------------------------------------*/ |
| 6 | |
| 7 | void freePubsubPattern(void *p) { |
| 8 | pubsubPattern *pat = p; |
| 9 | |
| 10 | decrRefCount(pat->pattern); |
| 11 | zfree(pat); |
| 12 | } |
| 13 | |
| 14 | int listMatchPubsubPattern(void *a, void *b) { |
| 15 | pubsubPattern *pa = a, *pb = b; |
| 16 | |
| 17 | return (pa->client == pb->client) && |
| 18 | (equalStringObjects(pa->pattern,pb->pattern)); |
| 19 | } |
| 20 | |
| 21 | /* Subscribe a client to a channel. Returns 1 if the operation succeeded, or |
| 22 | * 0 if the client was already subscribed to that channel. */ |
| 23 | int pubsubSubscribeChannel(redisClient *c, robj *channel) { |
| 24 | struct dictEntry *de; |
| 25 | list *clients = NULL; |
| 26 | int retval = 0; |
| 27 | |
| 28 | /* Add the channel to the client -> channels hash table */ |
| 29 | if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) { |
| 30 | retval = 1; |
| 31 | incrRefCount(channel); |
| 32 | /* Add the client to the channel -> list of clients hash table */ |
| 33 | de = dictFind(server.pubsub_channels,channel); |
| 34 | if (de == NULL) { |
| 35 | clients = listCreate(); |
| 36 | dictAdd(server.pubsub_channels,channel,clients); |
| 37 | incrRefCount(channel); |
| 38 | } else { |
| 39 | clients = dictGetVal(de); |
| 40 | } |
| 41 | listAddNodeTail(clients,c); |
| 42 | } |
| 43 | /* Notify the client */ |
| 44 | addReply(c,shared.mbulkhdr[3]); |
| 45 | addReply(c,shared.subscribebulk); |
| 46 | addReplyBulk(c,channel); |
| 47 | addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns)); |
| 48 | return retval; |
| 49 | } |
| 50 | |
| 51 | /* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or |
| 52 | * 0 if the client was not subscribed to the specified channel. */ |
| 53 | int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) { |
| 54 | struct dictEntry *de; |
| 55 | list *clients; |
| 56 | listNode *ln; |
| 57 | int retval = 0; |
| 58 | |
| 59 | /* Remove the channel from the client -> channels hash table */ |
| 60 | incrRefCount(channel); /* channel may be just a pointer to the same object |
| 61 | we have in the hash tables. Protect it... */ |
| 62 | if (dictDelete(c->pubsub_channels,channel) == DICT_OK) { |
| 63 | retval = 1; |
| 64 | /* Remove the client from the channel -> clients list hash table */ |
| 65 | de = dictFind(server.pubsub_channels,channel); |
| 66 | redisAssertWithInfo(c,NULL,de != NULL); |
| 67 | clients = dictGetVal(de); |
| 68 | ln = listSearchKey(clients,c); |
| 69 | redisAssertWithInfo(c,NULL,ln != NULL); |
| 70 | listDelNode(clients,ln); |
| 71 | if (listLength(clients) == 0) { |
| 72 | /* Free the list and associated hash entry at all if this was |
| 73 | * the latest client, so that it will be possible to abuse |
| 74 | * Redis PUBSUB creating millions of channels. */ |
| 75 | dictDelete(server.pubsub_channels,channel); |
| 76 | } |
| 77 | } |
| 78 | /* Notify the client */ |
| 79 | if (notify) { |
| 80 | addReply(c,shared.mbulkhdr[3]); |
| 81 | addReply(c,shared.unsubscribebulk); |
| 82 | addReplyBulk(c,channel); |
| 83 | addReplyLongLong(c,dictSize(c->pubsub_channels)+ |
| 84 | listLength(c->pubsub_patterns)); |
| 85 | |
| 86 | } |
| 87 | decrRefCount(channel); /* it is finally safe to release it */ |
| 88 | return retval; |
| 89 | } |
| 90 | |
| 91 | /* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the clinet was already subscribed to that pattern. */ |
| 92 | int pubsubSubscribePattern(redisClient *c, robj *pattern) { |
| 93 | int retval = 0; |
| 94 | |
| 95 | if (listSearchKey(c->pubsub_patterns,pattern) == NULL) { |
| 96 | retval = 1; |
| 97 | pubsubPattern *pat; |
| 98 | listAddNodeTail(c->pubsub_patterns,pattern); |
| 99 | incrRefCount(pattern); |
| 100 | pat = zmalloc(sizeof(*pat)); |
| 101 | pat->pattern = getDecodedObject(pattern); |
| 102 | pat->client = c; |
| 103 | listAddNodeTail(server.pubsub_patterns,pat); |
| 104 | } |
| 105 | /* Notify the client */ |
| 106 | addReply(c,shared.mbulkhdr[3]); |
| 107 | addReply(c,shared.psubscribebulk); |
| 108 | addReplyBulk(c,pattern); |
| 109 | addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns)); |
| 110 | return retval; |
| 111 | } |
| 112 | |
| 113 | /* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or |
| 114 | * 0 if the client was not subscribed to the specified channel. */ |
| 115 | int pubsubUnsubscribePattern(redisClient *c, robj *pattern, int notify) { |
| 116 | listNode *ln; |
| 117 | pubsubPattern pat; |
| 118 | int retval = 0; |
| 119 | |
| 120 | incrRefCount(pattern); /* Protect the object. May be the same we remove */ |
| 121 | if ((ln = listSearchKey(c->pubsub_patterns,pattern)) != NULL) { |
| 122 | retval = 1; |
| 123 | listDelNode(c->pubsub_patterns,ln); |
| 124 | pat.client = c; |
| 125 | pat.pattern = pattern; |
| 126 | ln = listSearchKey(server.pubsub_patterns,&pat); |
| 127 | listDelNode(server.pubsub_patterns,ln); |
| 128 | } |
| 129 | /* Notify the client */ |
| 130 | if (notify) { |
| 131 | addReply(c,shared.mbulkhdr[3]); |
| 132 | addReply(c,shared.punsubscribebulk); |
| 133 | addReplyBulk(c,pattern); |
| 134 | addReplyLongLong(c,dictSize(c->pubsub_channels)+ |
| 135 | listLength(c->pubsub_patterns)); |
| 136 | } |
| 137 | decrRefCount(pattern); |
| 138 | return retval; |
| 139 | } |
| 140 | |
| 141 | /* Unsubscribe from all the channels. Return the number of channels the |
| 142 | * client was subscribed from. */ |
| 143 | int pubsubUnsubscribeAllChannels(redisClient *c, int notify) { |
| 144 | dictIterator *di = dictGetSafeIterator(c->pubsub_channels); |
| 145 | dictEntry *de; |
| 146 | int count = 0; |
| 147 | |
| 148 | while((de = dictNext(di)) != NULL) { |
| 149 | robj *channel = dictGetKey(de); |
| 150 | |
| 151 | count += pubsubUnsubscribeChannel(c,channel,notify); |
| 152 | } |
| 153 | dictReleaseIterator(di); |
| 154 | return count; |
| 155 | } |
| 156 | |
| 157 | /* Unsubscribe from all the patterns. Return the number of patterns the |
| 158 | * client was subscribed from. */ |
| 159 | int pubsubUnsubscribeAllPatterns(redisClient *c, int notify) { |
| 160 | listNode *ln; |
| 161 | listIter li; |
| 162 | int count = 0; |
| 163 | |
| 164 | listRewind(c->pubsub_patterns,&li); |
| 165 | while ((ln = listNext(&li)) != NULL) { |
| 166 | robj *pattern = ln->value; |
| 167 | |
| 168 | count += pubsubUnsubscribePattern(c,pattern,notify); |
| 169 | } |
| 170 | return count; |
| 171 | } |
| 172 | |
| 173 | /* Publish a message */ |
| 174 | int pubsubPublishMessage(robj *channel, robj *message) { |
| 175 | int receivers = 0; |
| 176 | struct dictEntry *de; |
| 177 | listNode *ln; |
| 178 | listIter li; |
| 179 | |
| 180 | /* Send to clients listening for that channel */ |
| 181 | de = dictFind(server.pubsub_channels,channel); |
| 182 | if (de) { |
| 183 | list *list = dictGetVal(de); |
| 184 | listNode *ln; |
| 185 | listIter li; |
| 186 | |
| 187 | listRewind(list,&li); |
| 188 | while ((ln = listNext(&li)) != NULL) { |
| 189 | redisClient *c = ln->value; |
| 190 | |
| 191 | addReply(c,shared.mbulkhdr[3]); |
| 192 | addReply(c,shared.messagebulk); |
| 193 | addReplyBulk(c,channel); |
| 194 | addReplyBulk(c,message); |
| 195 | receivers++; |
| 196 | } |
| 197 | } |
| 198 | /* Send to clients listening to matching channels */ |
| 199 | if (listLength(server.pubsub_patterns)) { |
| 200 | listRewind(server.pubsub_patterns,&li); |
| 201 | channel = getDecodedObject(channel); |
| 202 | while ((ln = listNext(&li)) != NULL) { |
| 203 | pubsubPattern *pat = ln->value; |
| 204 | |
| 205 | if (stringmatchlen((char*)pat->pattern->ptr, |
| 206 | sdslen(pat->pattern->ptr), |
| 207 | (char*)channel->ptr, |
| 208 | sdslen(channel->ptr),0)) { |
| 209 | addReply(pat->client,shared.mbulkhdr[4]); |
| 210 | addReply(pat->client,shared.pmessagebulk); |
| 211 | addReplyBulk(pat->client,pat->pattern); |
| 212 | addReplyBulk(pat->client,channel); |
| 213 | addReplyBulk(pat->client,message); |
| 214 | receivers++; |
| 215 | } |
| 216 | } |
| 217 | decrRefCount(channel); |
| 218 | } |
| 219 | return receivers; |
| 220 | } |
| 221 | |
| 222 | /*----------------------------------------------------------------------------- |
| 223 | * Pubsub commands implementation |
| 224 | *----------------------------------------------------------------------------*/ |
| 225 | |
| 226 | void subscribeCommand(redisClient *c) { |
| 227 | int j; |
| 228 | |
| 229 | for (j = 1; j < c->argc; j++) |
| 230 | pubsubSubscribeChannel(c,c->argv[j]); |
| 231 | } |
| 232 | |
| 233 | void unsubscribeCommand(redisClient *c) { |
| 234 | if (c->argc == 1) { |
| 235 | pubsubUnsubscribeAllChannels(c,1); |
| 236 | return; |
| 237 | } else { |
| 238 | int j; |
| 239 | |
| 240 | for (j = 1; j < c->argc; j++) |
| 241 | pubsubUnsubscribeChannel(c,c->argv[j],1); |
| 242 | } |
| 243 | } |
| 244 | |
| 245 | void psubscribeCommand(redisClient *c) { |
| 246 | int j; |
| 247 | |
| 248 | for (j = 1; j < c->argc; j++) |
| 249 | pubsubSubscribePattern(c,c->argv[j]); |
| 250 | } |
| 251 | |
| 252 | void punsubscribeCommand(redisClient *c) { |
| 253 | if (c->argc == 1) { |
| 254 | pubsubUnsubscribeAllPatterns(c,1); |
| 255 | return; |
| 256 | } else { |
| 257 | int j; |
| 258 | |
| 259 | for (j = 1; j < c->argc; j++) |
| 260 | pubsubUnsubscribePattern(c,c->argv[j],1); |
| 261 | } |
| 262 | } |
| 263 | |
| 264 | void publishCommand(redisClient *c) { |
| 265 | int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]); |
| 266 | if (server.cluster_enabled) clusterPropagatePublish(c->argv[1],c->argv[2]); |
| 267 | addReplyLongLong(c,receivers); |
| 268 | } |