]> git.saurik.com Git - redis.git/commitdiff
New vararg BLPOP able to block against multiple keys
authorantirez <antirez@gmail.com>
Sat, 2 Jan 2010 14:06:44 +0000 (09:06 -0500)
committerantirez <antirez@gmail.com>
Sat, 2 Jan 2010 14:06:44 +0000 (09:06 -0500)
TODO
redis-cli.c
redis.c
staticsymbols.h

diff --git a/TODO b/TODO
index f3eaeb21908bc12bb58abf7c63e374a7947b69b3..6e17188736d07f4b2734fc43cbad045a11d00c0c 100644 (file)
--- a/TODO
+++ b/TODO
@@ -3,14 +3,19 @@ Redis TODO and Roadmap
 VERSION 1.4 TODO (Hash type)
 ============================
 
-* Blocking LPOP (BLPOP).
-* Hashes (HSET, HGET, HEXISTS, HLEN, ...).
+* BRPOPLPUSH
+* RPOPLPUSH should notify blocking POP operations
 * List ops like L/RPUSH L/RPOP should return the new list length.
+* Save dataset / fsync() on SIGTERM
+* MULTI/EXEC should support the "EXEC FSYNC" form
+* Synchronous Virtual Memory
+* BLPOP & C. tests (write a non blocking Tcl client as first step)
 
 VERSION 1.6 TODO (Virtual memory)
 =================================
 
-* Redis Virtual Memory for datasets bigger than RAM (http://groups.google.com/group/redis-db/msg/752997c7b38553cd)
+* Asynchronous Virtual Memory
+* Hashes (HSET, HGET, HEXISTS, HLEN, ...).
 
 VERSION 1.8 TODO (Fault tollerant sharding)
 ===========================================
index fbf90fd87dd91ec0a693f7aaae0b30b850662d85..a082a9bb9b22d042a02ba297b5534e1877f1b91d 100644 (file)
@@ -71,8 +71,8 @@ static struct redisCommand cmdTable[] = {
     {"lpush",3,REDIS_CMD_BULK},
     {"rpop",2,REDIS_CMD_INLINE},
     {"lpop",2,REDIS_CMD_INLINE},
-    {"brpop",3,REDIS_CMD_INLINE},
-    {"blpop",3,REDIS_CMD_INLINE},
+    {"brpop",-3,REDIS_CMD_INLINE},
+    {"blpop",-3,REDIS_CMD_INLINE},
     {"llen",2,REDIS_CMD_INLINE},
     {"lindex",3,REDIS_CMD_INLINE},
     {"lset",4,REDIS_CMD_BULK},
diff --git a/redis.c b/redis.c
index 8b771c029c6c33df534b3767a987112c8065dfa2..9f1eb7a2507d0ba4d3d466ce7e67d4880bd038fa 100644 (file)
--- a/redis.c
+++ b/redis.c
@@ -268,8 +268,9 @@ typedef struct redisClient {
     long repldboff;         /* replication DB file offset */
     off_t repldbsize;       /* replication DB file size */
     multiState mstate;      /* MULTI/EXEC state */
-    robj *blockingkey;      /* The key we waiting to terminate a blocking
+    robj **blockingkeys;    /* The keys we are waiting for in a blocking
                              * operation such as BLPOP. Otherwise NULL. */
+    int blockingkeysnum;    /* Number of blocking keys */
     time_t blockingto;      /* Blocking operation timeout. If UNIX current time
                              * is >= blockingto then the operation timed out. */
 } redisClient;
@@ -542,8 +543,8 @@ static struct redisCommand cmdTable[] = {
     {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
     {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
     {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
-    {"brpop",brpopCommand,3,REDIS_CMD_INLINE},
-    {"blpop",blpopCommand,3,REDIS_CMD_INLINE},
+    {"brpop",brpopCommand,-3,REDIS_CMD_INLINE},
+    {"blpop",blpopCommand,-3,REDIS_CMD_INLINE},
     {"llen",llenCommand,2,REDIS_CMD_INLINE},
     {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
     {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
@@ -902,7 +903,7 @@ static void closeTimedoutClients(void) {
             freeClient(c);
         } else if (c->flags & REDIS_BLOCKED) {
             if (c->blockingto != 0 && c->blockingto < now) {
-                addReply(c,shared.nullbulk);
+                addReply(c,shared.nullmultibulk);
                 unblockClient(c);
             }
         }
@@ -2089,7 +2090,8 @@ static redisClient *createClient(int fd) {
     c->authenticated = 0;
     c->replstate = REDIS_REPL_NONE;
     c->reply = listCreate();
-    c->blockingkey = NULL;
+    c->blockingkeys = NULL;
+    c->blockingkeysnum = 0;
     listSetFreeMethod(c->reply,decrRefCount);
     listSetDupMethod(c->reply,dupClientReplyValue);
     if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
@@ -5492,27 +5494,35 @@ static void execCommand(redisClient *c) {
 
 /* Set a client in blocking mode for the specified key, with the specified
  * timeout */
-static void blockForKey(redisClient *c, robj *key, time_t timeout) {
+static void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout) {
     dictEntry *de;
     list *l;
+    int j;
 
-    c->blockingkey = key;
-    incrRefCount(key);
+    c->blockingkeys = zmalloc(sizeof(robj*)*numkeys);
+    c->blockingkeysnum = numkeys;
     c->blockingto = timeout;
-    de = dictFind(c->db->blockingkeys,key);
-    if (de == NULL) {
-        int retval;
+    for (j = 0; j < numkeys; j++) {
+        /* Add the key in the client structure, to map clients -> keys */
+        c->blockingkeys[j] = keys[j];
+        incrRefCount(keys[j]);
 
-        /* We take a list of clients blocked for a given key */
-        l = listCreate();
-        retval = dictAdd(c->db->blockingkeys,key,l);
-        incrRefCount(key);
-        assert(retval == DICT_OK);
-    } else {
-        l = dictGetEntryVal(de);
+        /* And in the other "side", to map keys -> clients */
+        de = dictFind(c->db->blockingkeys,keys[j]);
+        if (de == NULL) {
+            int retval;
+
+            /* For every key we take a list of clients blocked for it */
+            l = listCreate();
+            retval = dictAdd(c->db->blockingkeys,keys[j],l);
+            incrRefCount(keys[j]);
+            assert(retval == DICT_OK);
+        } else {
+            l = dictGetEntryVal(de);
+        }
+        listAddNodeTail(l,c);
     }
-    /* Add this client to the list, and mark it as blocked */
-    listAddNodeTail(l,c);
+    /* Mark the client as a blocked client */
     c->flags |= REDIS_BLOCKED;
     aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
     server.blockedclients++;
@@ -5522,19 +5532,24 @@ static void blockForKey(redisClient *c, robj *key, time_t timeout) {
 static void unblockClient(redisClient *c) {
     dictEntry *de;
     list *l;
+    int j;
 
-    /* Remove this client from the list of clients waiting for this key. */
-    assert(c->blockingkey != NULL);
-    de = dictFind(c->db->blockingkeys,c->blockingkey);
-    assert(de != NULL);
-    l = dictGetEntryVal(de);
-    listDelNode(l,listSearchKey(l,c));
-    /* If the list is empty we need to remove it to avoid wasting memory */
-    if (listLength(l) == 0)
-        dictDelete(c->db->blockingkeys,c->blockingkey);
-    /* Finally set the right flags in the client structure */
-    decrRefCount(c->blockingkey);
-    c->blockingkey = NULL;
+    assert(c->blockingkeys != NULL);
+    /* The client may wait for multiple keys, so unblock it for every key. */
+    for (j = 0; j < c->blockingkeysnum; j++) {
+        /* Remove this client from the list of clients waiting for this key. */
+        de = dictFind(c->db->blockingkeys,c->blockingkeys[j]);
+        assert(de != NULL);
+        l = dictGetEntryVal(de);
+        listDelNode(l,listSearchKey(l,c));
+        /* If the list is empty we need to remove it to avoid wasting memory */
+        if (listLength(l) == 0)
+            dictDelete(c->db->blockingkeys,c->blockingkeys[j]);
+        decrRefCount(c->blockingkeys[j]);
+    }
+    /* Cleanup the client structure */
+    zfree(c->blockingkeys);
+    c->blockingkeys = NULL;
     c->flags &= (~REDIS_BLOCKED);
     server.blockedclients--;
     /* Ok now we are ready to get read events from socket, note that we
@@ -5574,6 +5589,10 @@ static int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
     assert(ln != NULL);
     receiver = ln->value;
 
+    addReplySds(receiver,sdsnew("*2\r\n"));
+    addReplyBulkLen(receiver,key);
+    addReply(receiver,key);
+    addReply(receiver,shared.crlf);
     addReplyBulkLen(receiver,ele);
     addReply(receiver,ele);
     addReply(receiver,shared.crlf);
@@ -5585,26 +5604,53 @@ static int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
 static void blockingPopGenericCommand(redisClient *c, int where) {
     robj *o;
     time_t timeout;
+    int j;
 
-    o = lookupKeyWrite(c->db,c->argv[1]);
-    if (o != NULL) {
-        if (o->type != REDIS_LIST) {
-            popGenericCommand(c,where);
-            return;
-        } else {
-            list *list = o->ptr;
-            if (listLength(list) != 0) {
-                /* If the list contains elements fall back to the usual
-                 * non-blocking POP operation */
-                popGenericCommand(c,where);
+    for (j = 1; j < c->argc-1; j++) {
+        o = lookupKeyWrite(c->db,c->argv[j]);
+        if (o != NULL) {
+            if (o->type != REDIS_LIST) {
+                addReply(c,shared.wrongtypeerr);
                 return;
+            } else {
+                list *list = o->ptr;
+                if (listLength(list) != 0) {
+                    /* If the list contains elements fall back to the usual
+                     * non-blocking POP operation */
+                    robj *argv[2], **orig_argv;
+                    int orig_argc;
+                   
+                    /* We need to alter the command arguments before calling
+                     * popGenericCommand() as the command takes a single key. */
+                    orig_argv = c->argv;
+                    orig_argc = c->argc;
+                    argv[1] = c->argv[j];
+                    c->argv = argv;
+                    c->argc = 2;
+
+                    /* Also the return value is different, we need to output
+                     * the multi bulk reply header and the key name. The
+                     * "real" command will add the last element (the value)
+                     * for us. If this sounds like a hack to you it's just
+                     * because it is... */
+                    addReplySds(c,sdsnew("*2\r\n"));
+                    addReplyBulkLen(c,argv[1]);
+                    addReply(c,argv[1]);
+                    addReply(c,shared.crlf);
+                    popGenericCommand(c,where);
+
+                    /* Fix the client structure with the original stuff */
+                    c->argv = orig_argv;
+                    c->argc = orig_argc;
+                    return;
+                }
             }
         }
     }
     /* If the list is empty or the key does not exists we must block */
-    timeout = strtol(c->argv[2]->ptr,NULL,10);
+    timeout = strtol(c->argv[c->argc-1]->ptr,NULL,10);
     if (timeout > 0) timeout += time(NULL);
-    blockForKey(c,c->argv[1],timeout);
+    blockForKeys(c,c->argv+1,c->argc-2,timeout);
 }
 
 static void blpopCommand(redisClient *c) {
@@ -6659,7 +6705,7 @@ static void segvHandler(int sig, siginfo_t *info, void *secret) {
             redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
         }
     }
-    // free(messages); Don't call free() with possibly corrupted memory.
+    /* free(messages); Don't call free() with possibly corrupted memory. */
     exit(0);
 }
 
index d22663dad911d970d8fa49014f3fcb1429bc024a..0d48d7013a1cfe4e823539dfb08f12d329aa1c31 100644 (file)
@@ -10,7 +10,7 @@ static struct redisFunctionSym symsTable[] = {
 {"authCommand",(unsigned long)authCommand},
 {"bgrewriteaofCommand",(unsigned long)bgrewriteaofCommand},
 {"bgsaveCommand",(unsigned long)bgsaveCommand},
-{"blockForKey",(unsigned long)blockForKey},
+{"blockForKeys",(unsigned long)blockForKeys},
 {"blockingPopGenericCommand",(unsigned long)blockingPopGenericCommand},
 {"blpopCommand",(unsigned long)blpopCommand},
 {"brpopCommand",(unsigned long)brpopCommand},