* is >= blockingto then the operation timed out. */
list *io_keys; /* Keys this client is waiting to be loaded from the
* swap file in order to continue. */
+ dict *pubsub_classes; /* Classes a client is interested in (SUBSCRIBE) */
} redisClient;
struct saveparam {
unsigned long long vm_stats_swapped_objects;
unsigned long long vm_stats_swapouts;
unsigned long long vm_stats_swapins;
+ /* Pubsub */
+ dict *pubsub_classes; /* Associate classes to list of subscribed clients */
+ /* Misc */
FILE *devnull;
};
*emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
*outofrangeerr, *plus,
*select0, *select1, *select2, *select3, *select4,
- *select5, *select6, *select7, *select8, *select9;
+ *select5, *select6, *select7, *select8, *select9,
+ *messagebulk, *subscribebulk, *unsubscribebulk, *mbulk3;
} shared;
/* Global vars that are actually used as constants. The following double
static void call(redisClient *c, struct redisCommand *cmd);
static void resetClient(redisClient *c);
static void convertToRealHash(robj *o);
+static int pubsubUnsubscribeAll(redisClient *c, int notify);
+static void usage();
static void authCommand(redisClient *c);
static void pingCommand(redisClient *c);
static void hexistsCommand(redisClient *c);
static void configCommand(redisClient *c);
static void hincrbyCommand(redisClient *c);
+static void subscribeCommand(redisClient *c);
+static void unsubscribeCommand(redisClient *c);
+static void publishCommand(redisClient *c);
/*================================= Globals ================================= */
{"slaveof",slaveofCommand,3,REDIS_CMD_INLINE,NULL,0,0,0},
{"debug",debugCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0},
{"config",configCommand,-2,REDIS_CMD_BULK,NULL,0,0,0},
+ {"subscribe",subscribeCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0},
+ {"unsubscribe",unsubscribeCommand,-1,REDIS_CMD_INLINE,NULL,0,0,0},
+ {"publish",publishCommand,3,REDIS_CMD_BULK,NULL,0,0,0},
{NULL,NULL,0,0,NULL,0,0,0}
};
-static void usage();
-
/*============================ Utility functions ============================ */
/* Glob-style pattern matching. */
shared.select7 = createStringObject("select 7\r\n",10);
shared.select8 = createStringObject("select 8\r\n",10);
shared.select9 = createStringObject("select 9\r\n",10);
+ shared.messagebulk = createStringObject("$7\r\nmessage\r\n",13);
+ shared.subscribebulk = createStringObject("$9\r\nsubscribe\r\n",15);
+ shared.unsubscribebulk = createStringObject("$11\r\nunsubscribe\r\n",17);
+ shared.mbulk3 = createStringObject("*3\r\n",4);
}
static void appendServerSaveParams(time_t seconds, int changes) {
server.db[j].io_keys = dictCreate(&keylistDictType,NULL);
server.db[j].id = j;
}
+ server.pubsub_classes = dictCreate(&keylistDictType,NULL);
server.cronloops = 0;
server.bgsavechildpid = -1;
server.bgrewritechildpid = -1;
if (c->flags & REDIS_BLOCKED)
unblockClientWaitingData(c);
+ /* Unsubscribe from all the pubsub classes */
+ pubsubUnsubscribeAll(c,0);
+ dictRelease(c->pubsub_classes);
+ /* Obvious cleanup */
aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
listRelease(c->reply);
dontWaitForSwappedKey(c,ln->value);
}
listRelease(c->io_keys);
- /* Other cleanup */
+ /* Master/slave cleanup */
if (c->flags & REDIS_SLAVE) {
if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
close(c->repldbfd);
server.master = NULL;
server.replstate = REDIS_REPL_CONNECT;
}
+ /* Release memory */
zfree(c->argv);
zfree(c->mbargv);
freeClientMultiState(c);
c->blockingkeys = NULL;
c->blockingkeysnum = 0;
c->io_keys = listCreate();
+ c->pubsub_classes = dictCreate(&setDictType,NULL);
listSetFreeMethod(c->io_keys,decrRefCount);
if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
readQueryFromClient, c) == AE_ERR) {
(char*) c->argv[1]->ptr));
}
+/* =========================== Pubsub implementation ======================== */
+
+/* Register client 'c' as a subscriber of 'class'.
+ *
+ * The class is recorded in the per-client dict (c->pubsub_classes) and the
+ * client is appended to the per-class list kept in server.pubsub_classes,
+ * creating that list on first use. A reference to 'class' is taken for every
+ * dict that ends up storing it as a key.
+ *
+ * A three-element reply ("subscribe", the class name, the number of classes
+ * the client is now subscribed to) is always queued, even if the client was
+ * already subscribed.
+ *
+ * Returns 1 if a new subscription was created, 0 if it already existed. */
+static int pubsubSubscribe(redisClient *c, robj *class) {
+    int added = 0;
+
+    if (dictAdd(c->pubsub_classes,class,NULL) == DICT_OK) {
+        struct dictEntry *entry;
+        list *subscribers;
+
+        added = 1;
+        incrRefCount(class);
+        /* Find, or create, the list of clients subscribed to this class */
+        entry = dictFind(server.pubsub_classes,class);
+        if (entry != NULL) {
+            subscribers = dictGetEntryVal(entry);
+        } else {
+            subscribers = listCreate();
+            dictAdd(server.pubsub_classes,class,subscribers);
+            incrRefCount(class);
+        }
+        listAddNodeTail(subscribers,c);
+    }
+
+    /* Confirmation sent to the client */
+    addReply(c,shared.mbulk3);
+    addReply(c,shared.subscribebulk);
+    addReplyBulk(c,class);
+    addReplyLong(c,dictSize(c->pubsub_classes));
+    return added;
+}
+
+/* Unsubscribe a client from a class. Returns 1 if the operation succeeded, or
+ * 0 if the client was not subscribed to the specified class.
+ *
+ * When 'notify' is non-zero a three-element reply ("unsubscribe", the class
+ * name, the number of classes the client is still subscribed to) is queued
+ * to the client, whether or not it was actually subscribed. */
+static int pubsubUnsubscribe(redisClient *c, robj *class, int notify) {
+    struct dictEntry *de;
+    list *clients;
+    listNode *ln;
+    int retval = 0;
+
+    /* 'class' may be the very key object stored inside c->pubsub_classes
+     * (this is what pubsubUnsubscribeAll() passes). Hold an extra reference
+     * so that removing the dict entries below cannot release the object
+     * while it is still needed for the lookups and for the reply. */
+    incrRefCount(class);
+
+    /* Remove the class from the client -> classes hash table */
+    if (dictDelete(c->pubsub_classes,class) == DICT_OK) {
+        retval = 1;
+        /* Remove the client from the class -> clients list hash table */
+        de = dictFind(server.pubsub_classes,class);
+        assert(de != NULL);
+        clients = dictGetEntryVal(de);
+        ln = listSearchKey(clients,c);
+        assert(ln != NULL);
+        listDelNode(clients,ln);
+        /* Drop the whole entry once the last subscriber is gone, otherwise
+         * every class name ever subscribed to would stay in
+         * server.pubsub_classes, with an empty list, forever.
+         * NOTE(review): relies on keylistDictType releasing both the key
+         * and the list value on delete -- confirm. */
+        if (listLength(clients) == 0)
+            dictDelete(server.pubsub_classes,class);
+    }
+    /* Notify the client */
+    if (notify) {
+        addReply(c,shared.mbulk3);
+        addReply(c,shared.unsubscribebulk);
+        addReplyBulk(c,class);
+        addReplyLong(c,dictSize(c->pubsub_classes));
+    }
+    /* Release the temporary reference taken at the top */
+    decrRefCount(class);
+    return retval;
+}
+
+/* Walk every class stored in c->pubsub_classes and unsubscribe the client
+ * from it, forwarding 'notify' to pubsubUnsubscribe() for each one.
+ *
+ * Returns how many subscriptions were actually removed. */
+static int pubsubUnsubscribeAll(redisClient *c, int notify) {
+    int removed = 0;
+    dictEntry *entry;
+    dictIterator *iter = dictGetIterator(c->pubsub_classes);
+
+    for (entry = dictNext(iter); entry != NULL; entry = dictNext(iter)) {
+        robj *subscribed = dictGetEntryKey(entry);
+
+        removed += pubsubUnsubscribe(c,subscribed,notify);
+    }
+    dictReleaseIterator(iter);
+    return removed;
+}
+
+/* Deliver 'message' to every client found in the list that
+ * server.pubsub_classes associates with 'class'.
+ *
+ * Each receiver gets a three-element reply: "message", the class name and
+ * the message itself.
+ *
+ * Returns the number of clients the message was queued for (0 when the
+ * class has no entry in server.pubsub_classes). */
+static int pubsubPublishMessage(robj *class, robj *message) {
+    struct dictEntry *entry = dictFind(server.pubsub_classes,class);
+    list *subscribers;
+    listIter iter;
+    listNode *node;
+    int delivered = 0;
+
+    /* Nobody ever subscribed to this class */
+    if (!entry) return 0;
+
+    subscribers = dictGetEntryVal(entry);
+    listRewind(subscribers,&iter);
+    while ((node = listNext(&iter)) != NULL) {
+        redisClient *target = node->value;
+
+        addReply(target,shared.mbulk3);
+        addReply(target,shared.messagebulk);
+        addReplyBulk(target,class);
+        addReplyBulk(target,message);
+        delivered++;
+    }
+    return delivered;
+}
+
+/* SUBSCRIBE: subscribe the client to every class given as argument
+ * (argv[1] up to argv[argc-1]). pubsubSubscribe() queues one reply per
+ * class. */
+static void subscribeCommand(redisClient *c) {
+    int idx = 1;
+
+    while (idx < c->argc) {
+        pubsubSubscribe(c,c->argv[idx]);
+        idx++;
+    }
+}
+
+/* UNSUBSCRIBE: with arguments, unsubscribe the client from each listed
+ * class; with no arguments, unsubscribe it from all of its classes.
+ * Notifications are requested in both cases. */
+static void unsubscribeCommand(redisClient *c) {
+    int idx;
+
+    if (c->argc != 1) {
+        for (idx = 1; idx < c->argc; idx++)
+            pubsubUnsubscribe(c,c->argv[idx],1);
+        return;
+    }
+    pubsubUnsubscribeAll(c,1);
+}
+
+/* PUBLISH: send argv[2] to the subscribers of class argv[1] and reply to
+ * the caller with the number of receivers. */
+static void publishCommand(redisClient *c) {
+    robj *class = c->argv[1];
+    robj *message = c->argv[2];
+
+    addReplyLong(c,pubsubPublishMessage(class,message));
+}
+
/* ================================= Debugging ============================== */
static void debugCommand(redisClient *c) {