X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/357a841714d50f3d3c293b9bb03839bac28cb05a..e3c51c4b1bb60069bbd6552fe9109885b886aa86:/src/t_list.c diff --git a/src/t_list.c b/src/t_list.c index 17015acf..f5739792 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -689,17 +689,23 @@ void rpoplpushCommand(redisClient *c) { /* Set a client in blocking mode for the specified key, with the specified * timeout */ -void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout) { +void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj *target) { dictEntry *de; list *l; int j; - c->bstate.keys = zmalloc(sizeof(robj*)*numkeys); - c->bstate.count = numkeys; - c->bstate.timeout = timeout; + c->bpop.keys = zmalloc(sizeof(robj*)*numkeys); + c->bpop.count = numkeys; + c->bpop.timeout = timeout; + c->bpop.target = target; + + if (target != NULL) { + incrRefCount(target); + } + for (j = 0; j < numkeys; j++) { /* Add the key in the client structure, to map clients -> keys */ - c->bstate.keys[j] = keys[j]; + c->bpop.keys[j] = keys[j]; incrRefCount(keys[j]); /* And in the other "side", to map keys -> clients */ @@ -728,22 +734,28 @@ void unblockClientWaitingData(redisClient *c) { list *l; int j; - redisAssert(c->bstate.keys != NULL); + redisAssert(c->bpop.keys != NULL); /* The client may wait for multiple keys, so unblock it for every key. */ - for (j = 0; j < c->bstate.count; j++) { + for (j = 0; j < c->bpop.count; j++) { /* Remove this client from the list of clients waiting for this key. */ - de = dictFind(c->db->blocking_keys,c->bstate.keys[j]); + de = dictFind(c->db->blocking_keys,c->bpop.keys[j]); redisAssert(de != NULL); l = dictGetEntryVal(de); listDelNode(l,listSearchKey(l,c)); /* If the list is empty we need to remove it to avoid wasting memory */ if (listLength(l) == 0) - dictDelete(c->db->blocking_keys,c->bstate.keys[j]); - decrRefCount(c->bstate.keys[j]); + dictDelete(c->db->blocking_keys,c->bpop.keys[j]); + decrRefCount(c->bpop.keys[j]); } + + if (c->bpop.target != NULL) { + decrRefCount(c->bpop.target); + } + /* Cleanup the client structure */ - zfree(c->bstate.keys); - c->bstate.keys = NULL; + zfree(c->bpop.keys); + c->bpop.keys = NULL; + c->bpop.target = NULL; c->flags &= (~REDIS_BLOCKED); server.blpop_blocked_clients--; /* We want to process data if there is some command waiting @@ -777,46 +789,60 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { redisAssert(ln != NULL); receiver = ln->value; - if (receiver->bstate.target == NULL) { - addReplyMultiBulkLen(receiver,2); - addReplyBulk(receiver,key); - addReplyBulk(receiver,ele); + if (receiver->bpop.target == NULL) { + /* BRPOP/BLPOP return a multi-bulk with the name + * of the popped list */ + addReplyMultiBulkLen(receiver,2); + addReplyBulk(receiver,key); + addReplyBulk(receiver,ele); } else { - receiver->argc++; + /* BRPOPLPUSH */ + robj *dobj = lookupKeyWrite(receiver->db,receiver->bpop.target); + if (dobj && checkType(receiver,dobj,REDIS_LIST)) return 0; - robj *dobj = lookupKeyWrite(receiver->db,receiver->bstate.target); - if (dobj && checkType(receiver,dobj,REDIS_LIST)) return 0; + addReplyBulk(receiver,ele); - addReplyBulk(receiver,ele); - - /* Create the list if the key does not exist */ - if (!dobj) { - dobj = createZiplistObject(); - dbAdd(receiver->db,receiver->bstate.target,dobj); - } + if (!handleClientsWaitingListPush(receiver, receiver->bpop.target, ele)) { + /* Create the list if the key does not exist */ + if (!dobj) { + dobj = createZiplistObject(); + dbAdd(receiver->db, receiver->bpop.target, dobj); + } - listTypePush(dobj,ele,REDIS_HEAD); + listTypePush(dobj, ele, REDIS_HEAD); + } } unblockClientWaitingData(receiver); return 1; } +int checkTimeout(redisClient *c, robj *object, time_t *timeout) { + long long lltimeout; + + if (getLongLongFromObject(object, &lltimeout) != REDIS_OK) { + addReplyError(c, "timeout is not an integer"); + return REDIS_ERR; + } + + if (lltimeout < 0) { + addReplyError(c, "timeout is negative"); + return REDIS_ERR; + } + + *timeout = lltimeout; + + return REDIS_OK; +} + /* Blocking RPOP/LPOP */ void blockingPopGenericCommand(redisClient *c, int where) { robj *o; - long long lltimeout; time_t timeout; int j; - /* Make sure timeout is an integer value */ - if (getLongLongFromObjectOrReply(c,c->argv[c->argc-1],&lltimeout, - "timeout is not an integer") != REDIS_OK) return; - - /* Make sure the timeout is not negative */ - if (lltimeout < 0) { - addReplyError(c,"timeout is negative"); + if (checkTimeout(c, c->argv[c->argc - 1], &timeout) != REDIS_OK) { return; } @@ -833,35 +859,27 @@ void blockingPopGenericCommand(redisClient *c, int where) { robj *argv[2], **orig_argv; int orig_argc; - if (c->bstate.target == NULL) { - /* We need to alter the command arguments before to call - * popGenericCommand() as the command takes a single key. */ - orig_argv = c->argv; - orig_argc = c->argc; - argv[1] = c->argv[j]; - c->argv = argv; - c->argc = 2; - - /* Also the return value is different, we need to output - * the multi bulk reply header and the key name. The - * "real" command will add the last element (the value) - * for us. If this souds like an hack to you it's just - * because it is... */ - addReplyMultiBulkLen(c,2); - addReplyBulk(c,argv[1]); - - popGenericCommand(c,where); - - /* Fix the client structure with the original stuff */ - c->argv = orig_argv; - c->argc = orig_argc; - } - else { - c->argv[2] = c->bstate.target; - c->bstate.target = NULL; - - rpoplpushCommand(c); - } + /* We need to alter the command arguments before to call + * popGenericCommand() as the command takes a single key. */ + orig_argv = c->argv; + orig_argc = c->argc; + argv[1] = c->argv[j]; + c->argv = argv; + c->argc = 2; + + /* Also the return value is different, we need to output + * the multi bulk reply header and the key name. The + * "real" command will add the last element (the value) + * for us. If this souds like an hack to you it's just + * because it is... */ + addReplyMultiBulkLen(c,2); + addReplyBulk(c,argv[1]); + + popGenericCommand(c,where); + + /* Fix the client structure with the original stuff */ + c->argv = orig_argv; + c->argc = orig_argc; return; } @@ -877,9 +895,8 @@ void blockingPopGenericCommand(redisClient *c, int where) { } /* If the list is empty or the key does not exists we must block */ - timeout = lltimeout; if (timeout > 0) timeout += time(NULL); - blockForKeys(c,c->argv+1,c->argc-2,timeout); + blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL); } void blpopCommand(redisClient *c) { @@ -891,9 +908,35 @@ void brpopCommand(redisClient *c) { } void brpoplpushCommand(redisClient *c) { - c->bstate.target = c->argv[2]; - c->argv[2] = c->argv[3]; - c->argc--; + time_t timeout; - blockingPopGenericCommand(c,REDIS_TAIL); + if (checkTimeout(c, c->argv[3], &timeout) != REDIS_OK) { + return; + } + + robj *key = lookupKeyWrite(c->db, c->argv[1]); + + if (key == NULL) { + if (c->flags & REDIS_MULTI) { + + /* Blocking against an empty list in a multi state + * returns immediately. */ + addReply(c, shared.nullmultibulk); + } else { + if (timeout > 0) timeout += time(NULL); + + /* The list is empty and the client blocks. */ + blockForKeys(c, c->argv + 1, 1, timeout, c->argv[2]); + } + } else { + if (key->type != REDIS_LIST) { + addReply(c, shared.wrongtypeerr); + } else { + + /* The list exists and has elements, so + * the regular rpoplpushCommand is executed. */ + redisAssert(listTypeLength(key) > 0); + rpoplpushCommand(c); + } + } }