X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/e2641e09cc0daf44f63f654230f72d22acf3a9af..af0b220756571bc8faf57a0c7b7389dd86a60376:/src/pubsub.c diff --git a/src/pubsub.c b/src/pubsub.c index c9f5f310..5f91334c 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -1,5 +1,38 @@ +/* + * Copyright (c) 2009-2012, Salvatore Sanfilippo + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + #include "redis.h" +/*----------------------------------------------------------------------------- + * Pubsub low level API + *----------------------------------------------------------------------------*/ + void freePubsubPattern(void *p) { pubsubPattern *pat = p; @@ -32,12 +65,12 @@ int pubsubSubscribeChannel(redisClient *c, robj *channel) { dictAdd(server.pubsub_channels,channel,clients); incrRefCount(channel); } else { - clients = dictGetEntryVal(de); + clients = dictGetVal(de); } listAddNodeTail(clients,c); } /* Notify the client */ - addReply(c,shared.mbulk3); + addReply(c,shared.mbulkhdr[3]); addReply(c,shared.subscribebulk); addReplyBulk(c,channel); addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns)); @@ -59,10 +92,10 @@ int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) { retval = 1; /* Remove the client from the channel -> clients list hash table */ de = dictFind(server.pubsub_channels,channel); - redisAssert(de != NULL); - clients = dictGetEntryVal(de); + redisAssertWithInfo(c,NULL,de != NULL); + clients = dictGetVal(de); ln = listSearchKey(clients,c); - redisAssert(ln != NULL); + redisAssertWithInfo(c,NULL,ln != NULL); listDelNode(clients,ln); if (listLength(clients) == 0) { /* Free the list and associated hash entry at all if this was @@ -73,7 +106,7 @@ int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) { } /* Notify the client */ if (notify) { - addReply(c,shared.mbulk3); + addReply(c,shared.mbulkhdr[3]); addReply(c,shared.unsubscribebulk); addReplyBulk(c,channel); addReplyLongLong(c,dictSize(c->pubsub_channels)+ @@ -99,7 +132,7 @@ int pubsubSubscribePattern(redisClient *c, robj *pattern) { listAddNodeTail(server.pubsub_patterns,pat); } /* Notify the client */ - addReply(c,shared.mbulk3); + addReply(c,shared.mbulkhdr[3]); addReply(c,shared.psubscribebulk); addReplyBulk(c,pattern); addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns)); @@ -124,7 +157,7 @@ int pubsubUnsubscribePattern(redisClient *c, robj *pattern, int notify) { } /* Notify the client */ if (notify) { - addReply(c,shared.mbulk3); + addReply(c,shared.mbulkhdr[3]); addReply(c,shared.punsubscribebulk); addReplyBulk(c,pattern); addReplyLongLong(c,dictSize(c->pubsub_channels)+ @@ -137,12 +170,12 @@ int pubsubUnsubscribePattern(redisClient *c, robj *pattern, int notify) { /* Unsubscribe from all the channels. Return the number of channels the * client was subscribed from. */ int pubsubUnsubscribeAllChannels(redisClient *c, int notify) { - dictIterator *di = dictGetIterator(c->pubsub_channels); + dictIterator *di = dictGetSafeIterator(c->pubsub_channels); dictEntry *de; int count = 0; while((de = dictNext(di)) != NULL) { - robj *channel = dictGetEntryKey(de); + robj *channel = dictGetKey(de); count += pubsubUnsubscribeChannel(c,channel,notify); } @@ -176,7 +209,7 @@ int pubsubPublishMessage(robj *channel, robj *message) { /* Send to clients listening for that channel */ de = dictFind(server.pubsub_channels,channel); if (de) { - list *list = dictGetEntryVal(de); + list *list = dictGetVal(de); listNode *ln; listIter li; @@ -184,7 +217,7 @@ int pubsubPublishMessage(robj *channel, robj *message) { while ((ln = listNext(&li)) != NULL) { redisClient *c = ln->value; - addReply(c,shared.mbulk3); + addReply(c,shared.mbulkhdr[3]); addReply(c,shared.messagebulk); addReplyBulk(c,channel); addReplyBulk(c,message); @@ -202,7 +235,7 @@ int pubsubPublishMessage(robj *channel, robj *message) { sdslen(pat->pattern->ptr), (char*)channel->ptr, sdslen(channel->ptr),0)) { - addReply(pat->client,shared.mbulk4); + addReply(pat->client,shared.mbulkhdr[4]); addReply(pat->client,shared.pmessagebulk); addReplyBulk(pat->client,pat->pattern); addReplyBulk(pat->client,channel); @@ -215,6 +248,10 @@ int pubsubPublishMessage(robj *channel, robj *message) { return receivers; } +/*----------------------------------------------------------------------------- + * Pubsub commands implementation + *----------------------------------------------------------------------------*/ + void subscribeCommand(redisClient *c) { int j; @@ -255,5 +292,6 @@ void punsubscribeCommand(redisClient *c) { void publishCommand(redisClient *c) { int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]); + if (server.cluster_enabled) clusterPropagatePublish(c->argv[1],c->argv[2]); addReplyLongLong(c,receivers); }