]> git.saurik.com Git - redis.git/commitdiff
PUBSUB implemented
author: antirez <antirez@gmail.com>
Mon, 29 Mar 2010 09:47:58 +0000 (11:47 +0200)
committer: antirez <antirez@gmail.com>
Mon, 29 Mar 2010 09:47:58 +0000 (11:47 +0200)
redis-cli.c
redis.c

index 3f63aeaa665870bd3bc3c4f998f49ef19ee4901c..5760da15a803b548ab25621cace86861daa7500f 100644 (file)
@@ -159,6 +159,9 @@ static struct redisCommand cmdTable[] = {
     {"hgetall",2,REDIS_CMD_INLINE},
     {"hexists",3,REDIS_CMD_BULK},
     {"config",-2,REDIS_CMD_BULK},
+    {"subscribe",-2,REDIS_CMD_INLINE},
+    {"unsubscribe",-1,REDIS_CMD_INLINE},
+    {"publish",3,REDIS_CMD_BULK},
     {NULL,0,0}
 };
 
diff --git a/redis.c b/redis.c
index e84ae6b5fad6642e77769e0a29fcca30ba4e7718..44db9d683681914715900e4b3641e0d9f4eda751 100644 (file)
--- a/redis.c
+++ b/redis.c
@@ -327,6 +327,7 @@ typedef struct redisClient {
                              * is >= blockingto then the operation timed out. */
     list *io_keys;          /* Keys this client is waiting to be loaded from the
                              * swap file in order to continue. */
+    dict *pubsub_classes;   /* Classes a client is interested in (SUBSCRIBE) */
 } redisClient;
 
 struct saveparam {
@@ -435,6 +436,9 @@ struct redisServer {
     unsigned long long vm_stats_swapped_objects;
     unsigned long long vm_stats_swapouts;
     unsigned long long vm_stats_swapins;
+    /* Pubsub */
+    dict *pubsub_classes; /* Associate classes to list of subscribed clients */
+    /* Misc */
     FILE *devnull;
 };
 
@@ -501,7 +505,8 @@ struct sharedObjectsStruct {
     *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
     *outofrangeerr, *plus,
     *select0, *select1, *select2, *select3, *select4,
-    *select5, *select6, *select7, *select8, *select9;
+    *select5, *select6, *select7, *select8, *select9,
+    *messagebulk, *subscribebulk, *unsubscribebulk, *mbulk3;
 } shared;
 
 /* Global vars that are actally used as constants. The following double
@@ -601,6 +606,8 @@ static struct redisCommand *lookupCommand(char *name);
 static void call(redisClient *c, struct redisCommand *cmd);
 static void resetClient(redisClient *c);
 static void convertToRealHash(robj *o);
+static int pubsubUnsubscribeAll(redisClient *c, int notify);
+static void usage();
 
 static void authCommand(redisClient *c);
 static void pingCommand(redisClient *c);
@@ -698,6 +705,9 @@ static void hgetallCommand(redisClient *c);
 static void hexistsCommand(redisClient *c);
 static void configCommand(redisClient *c);
 static void hincrbyCommand(redisClient *c);
+static void subscribeCommand(redisClient *c);
+static void unsubscribeCommand(redisClient *c);
+static void publishCommand(redisClient *c);
 
 /*================================= Globals ================================= */
 
@@ -801,11 +811,12 @@ static struct redisCommand cmdTable[] = {
     {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE,NULL,0,0,0},
     {"debug",debugCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0},
     {"config",configCommand,-2,REDIS_CMD_BULK,NULL,0,0,0},
+    {"subscribe",subscribeCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0},
+    {"unsubscribe",unsubscribeCommand,-1,REDIS_CMD_INLINE,NULL,0,0,0},
+    {"publish",publishCommand,3,REDIS_CMD_BULK,NULL,0,0,0},
     {NULL,NULL,0,0,NULL,0,0,0}
 };
 
-static void usage();
-
 /*============================ Utility functions ============================ */
 
 /* Glob-style pattern matching. */
@@ -1473,6 +1484,10 @@ static void createSharedObjects(void) {
     shared.select7 = createStringObject("select 7\r\n",10);
     shared.select8 = createStringObject("select 8\r\n",10);
     shared.select9 = createStringObject("select 9\r\n",10);
+    /* Pre-built protocol fragments used by the Pubsub replies. The second
+     * argument must be the exact byte length of the literal, including both
+     * "\r\n" terminators. */
+    shared.messagebulk = createStringObject("$7\r\nmessage\r\n",13);
+    shared.subscribebulk = createStringObject("$9\r\nsubscribe\r\n",15);
+    /* "$11" is three bytes, so this fragment is 3+2+11+2 = 18 bytes long.
+     * Passing 17 would cut the final "\n" and corrupt the reply framing. */
+    shared.unsubscribebulk = createStringObject("$11\r\nunsubscribe\r\n",18);
+    shared.mbulk3 = createStringObject("*3\r\n",4);
 }
 
 static void appendServerSaveParams(time_t seconds, int changes) {
@@ -1576,6 +1591,7 @@ static void initServer() {
             server.db[j].io_keys = dictCreate(&keylistDictType,NULL);
         server.db[j].id = j;
     }
+    server.pubsub_classes = dictCreate(&keylistDictType,NULL);
     server.cronloops = 0;
     server.bgsavechildpid = -1;
     server.bgrewritechildpid = -1;
@@ -1839,6 +1855,10 @@ static void freeClient(redisClient *c) {
     if (c->flags & REDIS_BLOCKED)
         unblockClientWaitingData(c);
 
+    /* Unsubscribe from all the pubsub classes */
+    pubsubUnsubscribeAll(c,0);
+    dictRelease(c->pubsub_classes);
+    /* Obvious cleanup */
     aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
     aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
     listRelease(c->reply);
@@ -1861,7 +1881,7 @@ static void freeClient(redisClient *c) {
         dontWaitForSwappedKey(c,ln->value);
     }
     listRelease(c->io_keys);
-    /* Other cleanup */
+    /* Master/slave cleanup */
     if (c->flags & REDIS_SLAVE) {
         if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
             close(c->repldbfd);
@@ -1874,6 +1894,7 @@ static void freeClient(redisClient *c) {
         server.master = NULL;
         server.replstate = REDIS_REPL_CONNECT;
     }
+    /* Release memory */
     zfree(c->argv);
     zfree(c->mbargv);
     freeClientMultiState(c);
@@ -2480,6 +2501,7 @@ static redisClient *createClient(int fd) {
     c->blockingkeys = NULL;
     c->blockingkeysnum = 0;
     c->io_keys = listCreate();
+    c->pubsub_classes = dictCreate(&setDictType,NULL);
     listSetFreeMethod(c->io_keys,decrRefCount);
     if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
         readQueryFromClient, c) == AE_ERR) {
@@ -9236,6 +9258,132 @@ badarity:
         (char*) c->argv[1]->ptr));
 }
 
+/* =========================== Pubsub implementation ======================== */
+
+/* Register client 'c' as a subscriber of 'class'.
+ *
+ * The class is recorded in the per-client dictionary and the client is
+ * appended to the server-wide list of subscribers of that class; the list
+ * is created the first time a class is seen. A reply made of the "*3"
+ * header, the "subscribe" bulk, the class name and the number of classes
+ * the client is currently subscribed to is always queued, even when the
+ * subscription was already in place.
+ *
+ * Returns 1 if the subscription is new, 0 if the client was already
+ * subscribed to the class. */
+static int pubsubSubscribe(redisClient *c, robj *class) {
+    int added = 0;
+
+    if (dictAdd(c->pubsub_classes,class,NULL) == DICT_OK) {
+        struct dictEntry *entry;
+        list *subscribers;
+
+        added = 1;
+        /* The per-client dictionary now holds a reference to the key */
+        incrRefCount(class);
+        entry = dictFind(server.pubsub_classes,class);
+        if (entry != NULL) {
+            subscribers = dictGetEntryVal(entry);
+        } else {
+            /* First subscriber for this class: create the list of clients
+             * and account for the reference held by the server dictionary */
+            subscribers = listCreate();
+            dictAdd(server.pubsub_classes,class,subscribers);
+            incrRefCount(class);
+        }
+        listAddNodeTail(subscribers,c);
+    }
+
+    /* Acknowledge the command to the client */
+    addReply(c,shared.mbulk3);
+    addReply(c,shared.subscribebulk);
+    addReplyBulk(c,class);
+    addReplyLong(c,dictSize(c->pubsub_classes));
+    return added;
+}
+
+/* Unsubscribe client 'c' from 'class'.
+ *
+ * The class is removed from the per-client dictionary and the client is
+ * removed from the server-wide list of subscribers of that class. When
+ * 'notify' is non-zero a reply made of the "*3" header, the "unsubscribe"
+ * bulk, the class name and the number of classes the client is still
+ * subscribed to is queued, whether or not the client was subscribed.
+ *
+ * Returns 1 if the operation succeeded, or 0 if the client was not
+ * subscribed to the specified class. */
+static int pubsubUnsubscribe(redisClient *c, robj *class, int notify) {
+    struct dictEntry *de;
+    list *clients;
+    listNode *ln;
+    int retval = 0;
+
+    /* 'class' may be the very object stored as key in c->pubsub_classes
+     * (pubsubUnsubscribeAll() passes the key of the entry it is visiting).
+     * Deleting that entry may drop the reference the dictionary holds, so
+     * take a temporary reference to make sure the object is still valid for
+     * the lookup and the notification below. */
+    incrRefCount(class);
+
+    /* Remove the class from the client -> classes hash table */
+    if (dictDelete(c->pubsub_classes,class) == DICT_OK) {
+        retval = 1;
+        /* Remove the client from the class -> clients list hash table.
+         * A subscribed client must always be found there. */
+        de = dictFind(server.pubsub_classes,class);
+        assert(de != NULL);
+        clients = dictGetEntryVal(de);
+        ln = listSearchKey(clients,c);
+        assert(ln != NULL);
+        listDelNode(clients,ln);
+    }
+    /* Notify the client */
+    if (notify) {
+        addReply(c,shared.mbulk3);
+        addReply(c,shared.unsubscribebulk);
+        addReplyBulk(c,class);
+        addReplyLong(c,dictSize(c->pubsub_classes));
+    }
+    /* It is finally safe to drop the temporary reference */
+    decrRefCount(class);
+    return retval;
+}
+
+/* Drop every subscription of client 'c' by calling pubsubUnsubscribe() on
+ * each key of its classes dictionary, forwarding 'notify' unchanged.
+ *
+ * Returns the number of classes the client was subscribed to.
+ *
+ * NOTE(review): entries are removed from c->pubsub_classes while the
+ * iterator is still walking it; this relies on the dict iterator tolerating
+ * deletion of the entry it just returned -- confirm against dict.c. */
+static int pubsubUnsubscribeAll(redisClient *c, int notify) {
+    dictIterator *iter = dictGetIterator(c->pubsub_classes);
+    dictEntry *entry;
+    int removed = 0;
+
+    for (entry = dictNext(iter); entry != NULL; entry = dictNext(iter)) {
+        robj *subscribed = dictGetEntryKey(entry);
+
+        removed += pubsubUnsubscribe(c,subscribed,notify);
+    }
+    dictReleaseIterator(iter);
+    return removed;
+}
+
+/* Deliver 'message' to every client subscribed to 'class'.
+ *
+ * Each subscriber gets a reply made of the "*3" header, the "message" bulk,
+ * the class name and the message payload.
+ *
+ * Returns the number of clients the message was queued for, which is 0 when
+ * the class has no entry in the server dictionary. */
+static int pubsubPublishMessage(robj *class, robj *message) {
+    struct dictEntry *entry = dictFind(server.pubsub_classes,class);
+    list *subscribers;
+    listIter iter;
+    listNode *node;
+    int delivered = 0;
+
+    /* Nobody ever subscribed to this class */
+    if (entry == NULL) return 0;
+
+    subscribers = dictGetEntryVal(entry);
+    listRewind(subscribers,&iter);
+    while ((node = listNext(&iter)) != NULL) {
+        redisClient *receiver = node->value;
+
+        addReply(receiver,shared.mbulk3);
+        addReply(receiver,shared.messagebulk);
+        addReplyBulk(receiver,class);
+        addReplyBulk(receiver,message);
+        delivered++;
+    }
+    return delivered;
+}
+
+/* SUBSCRIBE command: subscribe the client to every class given as argument
+ * after the command name. pubsubSubscribe() queues one reply per class. */
+static void subscribeCommand(redisClient *c) {
+    int idx = 1;
+
+    while (idx < c->argc) {
+        pubsubSubscribe(c,c->argv[idx]);
+        idx++;
+    }
+}
+
+/* UNSUBSCRIBE command: without arguments drop all the subscriptions of the
+ * client, otherwise unsubscribe it from every class given as argument.
+ * Notifications are always requested, one per class processed.
+ *
+ * Note that with no arguments and no active subscription nothing is
+ * processed, so no reply is queued by this function. */
+static void unsubscribeCommand(redisClient *c) {
+    int idx;
+
+    if (c->argc == 1) {
+        pubsubUnsubscribeAll(c,1);
+        return;
+    }
+
+    for (idx = 1; idx < c->argc; idx++) {
+        pubsubUnsubscribe(c,c->argv[idx],1);
+    }
+}
+
+/* PUBLISH command: argv[1] is the class, argv[2] the message. The reply to
+ * the publisher is the number of clients the message was queued for. */
+static void publishCommand(redisClient *c) {
+    robj *class = c->argv[1];
+    robj *message = c->argv[2];
+
+    addReplyLong(c,pubsubPublishMessage(class,message));
+}
+
 /* ================================= Debugging ============================== */
 
 static void debugCommand(redisClient *c) {