From b2a7fd0cf7afef7e7ede9e46a317fcb9ae84768c Mon Sep 17 00:00:00 2001 From: Damian Janowski & Michel Martens Date: Mon, 8 Nov 2010 15:25:59 -0300 Subject: [PATCH] BRPOPLPUSH. --- src/redis.c | 1 + src/redis.h | 2 + src/t_list.c | 83 +++++++++++++++++++++++++++++----------- tests/unit/type/list.tcl | 49 ++++++++++++++++++++++++ 4 files changed, 112 insertions(+), 23 deletions(-) diff --git a/src/redis.c b/src/redis.c index 1f7e7d26..58a796c0 100644 --- a/src/redis.c +++ b/src/redis.c @@ -89,6 +89,7 @@ struct redisCommand readonlyCommandTable[] = { {"rpop",rpopCommand,2,0,NULL,1,1,1}, {"lpop",lpopCommand,2,0,NULL,1,1,1}, {"brpop",brpopCommand,-3,0,NULL,1,1,1}, + {"brpoplpush",brpoplpushCommand,4,REDIS_CMD_DENYOOM,NULL,1,2,1}, {"blpop",blpopCommand,-3,0,NULL,1,1,1}, {"llen",llenCommand,2,0,NULL,1,1,1}, {"lindex",lindexCommand,3,0,NULL,1,1,1}, diff --git a/src/redis.h b/src/redis.h index 6bd0fd5d..83a94483 100644 --- a/src/redis.h +++ b/src/redis.h @@ -321,6 +321,7 @@ typedef struct redisClient { int blocking_keys_num; /* Number of blocking keys */ time_t blockingto; /* Blocking operation timeout. If UNIX current time * is >= blockingto then the operation timed out. */ + robj *blocking_target; list *io_keys; /* Keys this client is waiting to be loaded from the * swap file in order to continue. */ list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */ @@ -961,6 +962,7 @@ void execCommand(redisClient *c); void discardCommand(redisClient *c); void blpopCommand(redisClient *c); void brpopCommand(redisClient *c); +void brpoplpushCommand(redisClient *c); void appendCommand(redisClient *c); void substrCommand(redisClient *c); void strlenCommand(redisClient *c); diff --git a/src/t_list.c b/src/t_list.c index 10e7f72c..437f4004 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -777,9 +777,28 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { redisAssert(ln != NULL); receiver = ln->value; - addReplyMultiBulkLen(receiver,2); - addReplyBulk(receiver,key); - addReplyBulk(receiver,ele); + if (receiver->blocking_target == NULL) { + addReplyMultiBulkLen(receiver,2); + addReplyBulk(receiver,key); + addReplyBulk(receiver,ele); + } + else { + receiver->argc++; + + robj *dobj = lookupKeyWrite(receiver->db,receiver->blocking_target); + if (dobj && checkType(receiver,dobj,REDIS_LIST)) return 0; + + addReplyBulk(receiver,ele); + + /* Create the list if the key does not exist */ + if (!dobj) { + dobj = createZiplistObject(); + dbAdd(receiver->db,receiver->blocking_target,dobj); + } + + listTypePush(dobj,ele,REDIS_HEAD); + } + unblockClientWaitingData(receiver); return 1; } @@ -814,26 +833,36 @@ void blockingPopGenericCommand(redisClient *c, int where) { robj *argv[2], **orig_argv; int orig_argc; - /* We need to alter the command arguments before to call - * popGenericCommand() as the command takes a single key. */ - orig_argv = c->argv; - orig_argc = c->argc; - argv[1] = c->argv[j]; - c->argv = argv; - c->argc = 2; - - /* Also the return value is different, we need to output - * the multi bulk reply header and the key name. The - * "real" command will add the last element (the value) - * for us. If this souds like an hack to you it's just - * because it is... */ - addReplyMultiBulkLen(c,2); - addReplyBulk(c,argv[1]); - popGenericCommand(c,where); - - /* Fix the client structure with the original stuff */ - c->argv = orig_argv; - c->argc = orig_argc; + if (c->blocking_target == NULL) { + /* We need to alter the command arguments before to call + * popGenericCommand() as the command takes a single key. */ + orig_argv = c->argv; + orig_argc = c->argc; + argv[1] = c->argv[j]; + c->argv = argv; + c->argc = 2; + + /* Also the return value is different, we need to output + * the multi bulk reply header and the key name. The + * "real" command will add the last element (the value) + * for us. If this souds like an hack to you it's just + * because it is... */ + addReplyMultiBulkLen(c,2); + addReplyBulk(c,argv[1]); + + popGenericCommand(c,where); + + /* Fix the client structure with the original stuff */ + c->argv = orig_argv; + c->argc = orig_argc; + } + else { + c->argv[2] = c->blocking_target; + c->blocking_target = NULL; + + rpoplpushCommand(c); + } + return; } } @@ -860,3 +889,11 @@ void blpopCommand(redisClient *c) { void brpopCommand(redisClient *c) { blockingPopGenericCommand(c,REDIS_TAIL); } + +void brpoplpushCommand(redisClient *c) { + c->blocking_target = c->argv[2]; + c->argv[2] = c->argv[3]; + c->argc--; + + blockingPopGenericCommand(c,REDIS_TAIL); +} diff --git a/tests/unit/type/list.tcl b/tests/unit/type/list.tcl index 4c131fc3..85cbe88e 100644 --- a/tests/unit/type/list.tcl +++ b/tests/unit/type/list.tcl @@ -127,6 +127,55 @@ start_server { assert_equal 0 [r llen blist1] assert_equal 1 [r llen blist2] } + + test "BRPOPLPUSH - $type" { + r del target + + set rd [redis_deferring_client] + create_$type blist "a b $large c d" + + $rd brpoplpush blist target 1 + assert_equal d [$rd read] + + assert_equal d [r rpop target] + assert_equal "a b $large c" [r lrange blist 0 -1] + } + } + + test "BRPOPLPUSH with zero timeout should block indefinitely" { + set rd [redis_deferring_client] + r del blist target + $rd brpoplpush blist target 0 + after 1000 + r rpush blist foo + assert_equal foo [$rd read] + assert_equal {foo} [r lrange target 0 -1] + } + + test "BRPOPLPUSH with wrong source type" { + set rd [redis_deferring_client] + r del blist target + r set blist nolist + $rd brpoplpush blist target 1 + assert_error "ERR*wrong kind*" {$rd read} + } + + test "BRPOPLPUSH with wrong destination type" { + set rd [redis_deferring_client] + r del blist target + r set target nolist + r lpush blist foo + $rd brpoplpush blist target 1 + assert_error "ERR*wrong kind*" {$rd read} + + set rd [redis_deferring_client] + r del blist target + r set target nolist + $rd brpoplpush blist target 0 + after 1000 + r rpush blist foo + assert_error "ERR*wrong kind*" {$rd read} + assert_equal {foo} [r lrange blist 0 -1] } foreach {pop} {BLPOP BRPOP} { -- 2.45.2