+/*
+ * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of Redis nor the names of its contributors may be used
+ * to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
#include "redis.h"
+/*-----------------------------------------------------------------------------
+ * Pubsub low level API
+ *----------------------------------------------------------------------------*/
+
void freePubsubPattern(void *p) {
pubsubPattern *pat = p;
dictAdd(server.pubsub_channels,channel,clients);
incrRefCount(channel);
} else {
- clients = dictGetEntryVal(de);
+ clients = dictGetVal(de);
}
listAddNodeTail(clients,c);
}
/* Notify the client */
- addReply(c,shared.mbulk3);
+ addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.subscribebulk);
addReplyBulk(c,channel);
addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));
retval = 1;
/* Remove the client from the channel -> clients list hash table */
de = dictFind(server.pubsub_channels,channel);
- redisAssert(de != NULL);
- clients = dictGetEntryVal(de);
+ redisAssertWithInfo(c,NULL,de != NULL);
+ clients = dictGetVal(de);
ln = listSearchKey(clients,c);
- redisAssert(ln != NULL);
+ redisAssertWithInfo(c,NULL,ln != NULL);
listDelNode(clients,ln);
if (listLength(clients) == 0) {
/* Free the list and associated hash entry at all if this was
}
/* Notify the client */
if (notify) {
- addReply(c,shared.mbulk3);
+ addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.unsubscribebulk);
addReplyBulk(c,channel);
addReplyLongLong(c,dictSize(c->pubsub_channels)+
listAddNodeTail(server.pubsub_patterns,pat);
}
/* Notify the client */
- addReply(c,shared.mbulk3);
+ addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.psubscribebulk);
addReplyBulk(c,pattern);
addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));
}
/* Notify the client */
if (notify) {
- addReply(c,shared.mbulk3);
+ addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.punsubscribebulk);
addReplyBulk(c,pattern);
addReplyLongLong(c,dictSize(c->pubsub_channels)+
/* Unsubscribe from all the channels. Return the number of channels the
* client was subscribed to. */
int pubsubUnsubscribeAllChannels(redisClient *c, int notify) {
- dictIterator *di = dictGetIterator(c->pubsub_channels);
+ dictIterator *di = dictGetSafeIterator(c->pubsub_channels);
dictEntry *de;
int count = 0;
while((de = dictNext(di)) != NULL) {
- robj *channel = dictGetEntryKey(de);
+ robj *channel = dictGetKey(de);
count += pubsubUnsubscribeChannel(c,channel,notify);
}
/* Send to clients listening for that channel */
de = dictFind(server.pubsub_channels,channel);
if (de) {
- list *list = dictGetEntryVal(de);
+ list *list = dictGetVal(de);
listNode *ln;
listIter li;
while ((ln = listNext(&li)) != NULL) {
redisClient *c = ln->value;
- addReply(c,shared.mbulk3);
+ addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.messagebulk);
addReplyBulk(c,channel);
addReplyBulk(c,message);
sdslen(pat->pattern->ptr),
(char*)channel->ptr,
sdslen(channel->ptr),0)) {
- addReply(pat->client,shared.mbulk4);
+ addReply(pat->client,shared.mbulkhdr[4]);
addReply(pat->client,shared.pmessagebulk);
addReplyBulk(pat->client,pat->pattern);
addReplyBulk(pat->client,channel);
return receivers;
}
+/*-----------------------------------------------------------------------------
+ * Pubsub commands implementation
+ *----------------------------------------------------------------------------*/
+
void subscribeCommand(redisClient *c) {
int j;
/* PUBLISH command handler.
 *
 * Passes c->argv[1] and c->argv[2] to pubsubPublishMessage() (presumably the
 * channel and the message payload, in that order -- confirm against that
 * function's signature) and replies to the caller with the integer it
 * returns. */
void publishCommand(redisClient *c) {
int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
/* When server.cluster_enabled is set, the same two arguments are also handed
 * to clusterPropagatePublish(). NOTE(review): presumably this forwards the
 * message to the other cluster nodes -- confirm. The reply below is not
 * affected by this call: it carries only the value returned by
 * pubsubPublishMessage(). */
+ if (server.cluster_enabled) clusterPropagatePublish(c->argv[1],c->argv[2]);
addReplyLongLong(c,receivers);
}