X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/8a979f039011a4672b1052ee84ca56f214e6a681..e3c51c4b1bb60069bbd6552fe9109885b886aa86:/src/t_list.c?ds=inline diff --git a/src/t_list.c b/src/t_list.c index 10e7f72c..f5739792 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -689,17 +689,23 @@ void rpoplpushCommand(redisClient *c) { /* Set a client in blocking mode for the specified key, with the specified * timeout */ -void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout) { +void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj *target) { dictEntry *de; list *l; int j; - c->blocking_keys = zmalloc(sizeof(robj*)*numkeys); - c->blocking_keys_num = numkeys; - c->blockingto = timeout; + c->bpop.keys = zmalloc(sizeof(robj*)*numkeys); + c->bpop.count = numkeys; + c->bpop.timeout = timeout; + c->bpop.target = target; + + if (target != NULL) { + incrRefCount(target); + } + for (j = 0; j < numkeys; j++) { /* Add the key in the client structure, to map clients -> keys */ - c->blocking_keys[j] = keys[j]; + c->bpop.keys[j] = keys[j]; incrRefCount(keys[j]); /* And in the other "side", to map keys -> clients */ @@ -728,22 +734,28 @@ void unblockClientWaitingData(redisClient *c) { list *l; int j; - redisAssert(c->blocking_keys != NULL); + redisAssert(c->bpop.keys != NULL); /* The client may wait for multiple keys, so unblock it for every key. */ - for (j = 0; j < c->blocking_keys_num; j++) { + for (j = 0; j < c->bpop.count; j++) { /* Remove this client from the list of clients waiting for this key. */ - de = dictFind(c->db->blocking_keys,c->blocking_keys[j]); + de = dictFind(c->db->blocking_keys,c->bpop.keys[j]); redisAssert(de != NULL); l = dictGetEntryVal(de); listDelNode(l,listSearchKey(l,c)); /* If the list is empty we need to remove it to avoid wasting memory */ if (listLength(l) == 0) - dictDelete(c->db->blocking_keys,c->blocking_keys[j]); - decrRefCount(c->blocking_keys[j]); + dictDelete(c->db->blocking_keys,c->bpop.keys[j]); + decrRefCount(c->bpop.keys[j]); } + + if (c->bpop.target != NULL) { + decrRefCount(c->bpop.target); + } + /* Cleanup the client structure */ - zfree(c->blocking_keys); - c->blocking_keys = NULL; + zfree(c->bpop.keys); + c->bpop.keys = NULL; + c->bpop.target = NULL; c->flags &= (~REDIS_BLOCKED); server.blpop_blocked_clients--; /* We want to process data if there is some command waiting @@ -777,27 +789,60 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { redisAssert(ln != NULL); receiver = ln->value; - addReplyMultiBulkLen(receiver,2); - addReplyBulk(receiver,key); - addReplyBulk(receiver,ele); + if (receiver->bpop.target == NULL) { + /* BRPOP/BLPOP return a multi-bulk with the name + * of the popped list */ + addReplyMultiBulkLen(receiver,2); + addReplyBulk(receiver,key); + addReplyBulk(receiver,ele); + } + else { + /* BRPOPLPUSH */ + robj *dobj = lookupKeyWrite(receiver->db,receiver->bpop.target); + if (dobj && checkType(receiver,dobj,REDIS_LIST)) return 0; + + addReplyBulk(receiver,ele); + + if (!handleClientsWaitingListPush(receiver, receiver->bpop.target, ele)) { + /* Create the list if the key does not exist */ + if (!dobj) { + dobj = createZiplistObject(); + dbAdd(receiver->db, receiver->bpop.target, dobj); + } + + listTypePush(dobj, ele, REDIS_HEAD); + } + } + unblockClientWaitingData(receiver); return 1; } +int checkTimeout(redisClient *c, robj *object, time_t *timeout) { + long long lltimeout; + + if (getLongLongFromObject(object, &lltimeout) != REDIS_OK) { + addReplyError(c, "timeout is not an integer"); + return REDIS_ERR; + } + + if (lltimeout < 0) { + addReplyError(c, "timeout is negative"); + return REDIS_ERR; + } + + *timeout = lltimeout; + + return REDIS_OK; +} + /* Blocking RPOP/LPOP */ void blockingPopGenericCommand(redisClient *c, int where) { robj *o; - long long lltimeout; time_t timeout; int j; - /* Make sure timeout is an integer value */ - if (getLongLongFromObjectOrReply(c,c->argv[c->argc-1],&lltimeout, - "timeout is not an integer") != REDIS_OK) return; - - /* Make sure the timeout is not negative */ - if (lltimeout < 0) { - addReplyError(c,"timeout is negative"); + if (checkTimeout(c, c->argv[c->argc - 1], &timeout) != REDIS_OK) { return; } @@ -829,11 +874,13 @@ void blockingPopGenericCommand(redisClient *c, int where) { * because it is... */ addReplyMultiBulkLen(c,2); addReplyBulk(c,argv[1]); + popGenericCommand(c,where); /* Fix the client structure with the original stuff */ c->argv = orig_argv; c->argc = orig_argc; + return; } } @@ -848,9 +895,8 @@ void blockingPopGenericCommand(redisClient *c, int where) { } /* If the list is empty or the key does not exists we must block */ - timeout = lltimeout; if (timeout > 0) timeout += time(NULL); - blockForKeys(c,c->argv+1,c->argc-2,timeout); + blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL); } void blpopCommand(redisClient *c) { @@ -860,3 +906,37 @@ void blpopCommand(redisClient *c) { void brpopCommand(redisClient *c) { blockingPopGenericCommand(c,REDIS_TAIL); } + +void brpoplpushCommand(redisClient *c) { + time_t timeout; + + if (checkTimeout(c, c->argv[3], &timeout) != REDIS_OK) { + return; + } + + robj *key = lookupKeyWrite(c->db, c->argv[1]); + + if (key == NULL) { + if (c->flags & REDIS_MULTI) { + + /* Blocking against an empty list in a multi state + * returns immediately. */ + addReply(c, shared.nullmultibulk); + } else { + if (timeout > 0) timeout += time(NULL); + + /* The list is empty and the client blocks. */ + blockForKeys(c, c->argv + 1, 1, timeout, c->argv[2]); + } + } else { + if (key->type != REDIS_LIST) { + addReply(c, shared.wrongtypeerr); + } else { + + /* The list exists and has elements, so + * the regular rpoplpushCommand is executed. */ + redisAssert(listTypeLength(key) > 0); + rpoplpushCommand(c); + } + } +}