X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/f7f12a606c39fcb09a203faaaa12c49882409d8f..ba3b474111a79bfa378a8c77d89fff600cd5b23a:/src/t_list.c diff --git a/src/t_list.c b/src/t_list.c index add1bee1..2dedf481 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -260,6 +260,7 @@ void listTypeConvert(robj *subject, int enc) { void pushGenericCommand(redisClient *c, int where) { robj *lobj = lookupKeyWrite(c->db,c->argv[1]); + c->argv[2] = tryObjectEncoding(c->argv[2]); if (lobj == NULL) { if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) { addReply(c,shared.cone); @@ -342,18 +343,21 @@ void pushxGenericCommand(redisClient *c, robj *refval, robj *val, int where) { server.dirty++; } - addReplyUlong(c,listTypeLength(subject)); + addReplyLongLong(c,listTypeLength(subject)); } void lpushxCommand(redisClient *c) { + c->argv[2] = tryObjectEncoding(c->argv[2]); pushxGenericCommand(c,NULL,c->argv[2],REDIS_HEAD); } void rpushxCommand(redisClient *c) { + c->argv[2] = tryObjectEncoding(c->argv[2]); pushxGenericCommand(c,NULL,c->argv[2],REDIS_TAIL); } void linsertCommand(redisClient *c) { + c->argv[4] = tryObjectEncoding(c->argv[4]); if (strcasecmp(c->argv[2]->ptr,"after") == 0) { pushxGenericCommand(c,c->argv[3],c->argv[4],REDIS_TAIL); } else if (strcasecmp(c->argv[2]->ptr,"before") == 0) { @@ -366,7 +370,7 @@ void linsertCommand(redisClient *c) { void llenCommand(redisClient *c) { robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.czero); if (o == NULL || checkType(c,o,REDIS_LIST)) return; - addReplyUlong(c,listTypeLength(o)); + addReplyLongLong(c,listTypeLength(o)); } void lindexCommand(redisClient *c) { @@ -409,7 +413,7 @@ void lsetCommand(redisClient *c) { robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr); if (o == NULL || checkType(c,o,REDIS_LIST)) return; int index = atoi(c->argv[2]->ptr); - robj *value = c->argv[3]; + robj *value = (c->argv[3] = tryObjectEncoding(c->argv[3])); listTypeTryConversion(o,value); if (o->encoding == REDIS_ENCODING_ZIPLIST) { @@ -494,7 +498,7 @@ void lrangeCommand(redisClient *c) { rangelen = (end-start)+1; /* Return the result in form of a multi-bulk reply */ - addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen)); + addReplyMultiBulkLen(c,rangelen); listTypeIterator *li = listTypeInitIterator(o,start,REDIS_TAIL); for (j = 0; j < rangelen; j++) { redisAssert(listTypeNext(li,&entry)); @@ -559,7 +563,8 @@ void ltrimCommand(redisClient *c) { } void lremCommand(redisClient *c) { - robj *subject, *obj = c->argv[3]; + robj *subject, *obj; + obj = c->argv[3] = tryObjectEncoding(c->argv[3]); int toremove = atoi(c->argv[2]->ptr); int removed = 0; listTypeEntry entry; @@ -594,7 +599,7 @@ void lremCommand(redisClient *c) { decrRefCount(obj); if (listTypeLength(subject) == 0) dbDelete(c->db,c->argv[1]); - addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed)); + addReplyLongLong(c,removed); if (removed) touchWatchedKey(c->db,c->argv[1]); } @@ -613,7 +618,7 @@ void lremCommand(redisClient *c) { * since the element is not just returned but pushed against another list * as well. This command was originally proposed by Ezra Zygmuntowicz. */ -void rpoplpushcommand(redisClient *c) { +void rpoplpushCommand(redisClient *c) { robj *sobj, *value; if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL || checkType(c,sobj,REDIS_LIST)) return; @@ -684,17 +689,23 @@ void rpoplpushcommand(redisClient *c) { /* Set a client in blocking mode for the specified key, with the specified * timeout */ -void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout) { +void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj *target) { dictEntry *de; list *l; int j; - c->blocking_keys = zmalloc(sizeof(robj*)*numkeys); - c->blocking_keys_num = numkeys; - c->blockingto = timeout; + c->bstate.keys = zmalloc(sizeof(robj*)*numkeys); + c->bstate.count = numkeys; + c->bstate.timeout = timeout; + c->bstate.target = target; + + if (target != NULL) { + incrRefCount(target); + } + for (j = 0; j < numkeys; j++) { /* Add the key in the client structure, to map clients -> keys */ - c->blocking_keys[j] = keys[j]; + c->bstate.keys[j] = keys[j]; incrRefCount(keys[j]); /* And in the other "side", to map keys -> clients */ @@ -723,22 +734,28 @@ void unblockClientWaitingData(redisClient *c) { list *l; int j; - redisAssert(c->blocking_keys != NULL); + redisAssert(c->bstate.keys != NULL); /* The client may wait for multiple keys, so unblock it for every key. */ - for (j = 0; j < c->blocking_keys_num; j++) { + for (j = 0; j < c->bstate.count; j++) { /* Remove this client from the list of clients waiting for this key. */ - de = dictFind(c->db->blocking_keys,c->blocking_keys[j]); + de = dictFind(c->db->blocking_keys,c->bstate.keys[j]); redisAssert(de != NULL); l = dictGetEntryVal(de); listDelNode(l,listSearchKey(l,c)); /* If the list is empty we need to remove it to avoid wasting memory */ if (listLength(l) == 0) - dictDelete(c->db->blocking_keys,c->blocking_keys[j]); - decrRefCount(c->blocking_keys[j]); + dictDelete(c->db->blocking_keys,c->bstate.keys[j]); + decrRefCount(c->bstate.keys[j]); + } + + if (c->bstate.target != NULL) { + decrRefCount(c->bstate.target); } + /* Cleanup the client structure */ - zfree(c->blocking_keys); - c->blocking_keys = NULL; + zfree(c->bstate.keys); + c->bstate.keys = NULL; + c->bstate.target = NULL; c->flags &= (~REDIS_BLOCKED); server.blpop_blocked_clients--; /* We want to process data if there is some command waiting @@ -772,9 +789,26 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { redisAssert(ln != NULL); receiver = ln->value; - addReplySds(receiver,sdsnew("*2\r\n")); - addReplyBulk(receiver,key); - addReplyBulk(receiver,ele); + if (receiver->bstate.target == NULL) { + addReplyMultiBulkLen(receiver,2); + addReplyBulk(receiver,key); + addReplyBulk(receiver,ele); + } + else { + robj *dobj = lookupKeyWrite(receiver->db,receiver->bstate.target); + if (dobj && checkType(receiver,dobj,REDIS_LIST)) return 0; + + addReplyBulk(receiver,ele); + + /* Create the list if the key does not exist */ + if (!dobj) { + dobj = createZiplistObject(); + dbAdd(receiver->db,receiver->bstate.target,dobj); + } + + listTypePush(dobj,ele,REDIS_HEAD); + } + unblockClientWaitingData(receiver); return 1; } @@ -782,17 +816,10 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { /* Blocking RPOP/LPOP */ void blockingPopGenericCommand(redisClient *c, int where) { robj *o; - long long lltimeout; time_t timeout; int j; - /* Make sure timeout is an integer value */ - if (getLongLongFromObjectOrReply(c,c->argv[c->argc-1],&lltimeout, - "timeout is not an integer") != REDIS_OK) return; - - /* Make sure the timeout is not negative */ - if (lltimeout < 0) { - addReplySds(c,sdsnew("-ERR timeout is negative\r\n")); + if (checkTimeout(c, c->argv[c->argc - 1], &timeout) != REDIS_OK) { return; } @@ -822,13 +849,15 @@ void blockingPopGenericCommand(redisClient *c, int where) { * "real" command will add the last element (the value) * for us. If this souds like an hack to you it's just * because it is... */ - addReplySds(c,sdsnew("*2\r\n")); + addReplyMultiBulkLen(c,2); addReplyBulk(c,argv[1]); + popGenericCommand(c,where); /* Fix the client structure with the original stuff */ c->argv = orig_argv; c->argc = orig_argc; + return; } } @@ -843,9 +872,26 @@ void blockingPopGenericCommand(redisClient *c, int where) { } /* If the list is empty or the key does not exists we must block */ - timeout = lltimeout; if (timeout > 0) timeout += time(NULL); - blockForKeys(c,c->argv+1,c->argc-2,timeout); + blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL); +} + +int checkTimeout(redisClient *c, robj *object, time_t *timeout) { + long long lltimeout; + + if (getLongLongFromObject(object, &lltimeout) != REDIS_OK) { + addReplyError(c, "timeout is not an integer"); + return REDIS_ERR; + } + + if (lltimeout < 0) { + addReplyError(c, "timeout is negative"); + return REDIS_ERR; + } + + *timeout = lltimeout; + + return REDIS_OK; } void blpopCommand(redisClient *c) { @@ -855,3 +901,30 @@ void blpopCommand(redisClient *c) { void brpopCommand(redisClient *c) { blockingPopGenericCommand(c,REDIS_TAIL); } + +void brpoplpushCommand(redisClient *c) { + time_t timeout; + + if (checkTimeout(c, c->argv[3], &timeout) != REDIS_OK) { + return; + } + + robj *key = lookupKeyWrite(c->db, c->argv[1]); + + + if (key == NULL) { + // block + if (c->flags & REDIS_MULTI) { + addReply(c,shared.nullmultibulk); + } else { + if (timeout > 0) timeout += time(NULL); + blockForKeys(c, c->argv + 1, 1, timeout, c->argv[2]); + } + } else if (key->type != REDIS_LIST) { + addReply(c, shared.wrongtypeerr); + } else { + // The list exists and has elements. + redisAssert(listTypeLength(key) > 0); + rpoplpushCommand(c); + } +}