From: antirez Date: Tue, 14 Dec 2010 15:26:37 +0000 (+0100) Subject: Merge remote branch 'pietern/brpoplpush' X-Git-Url: https://git.saurik.com/redis.git/commitdiff_plain/f858c11d7d9ca79010dd46a9c8c625e63a9a3ec0?hp=-c Merge remote branch 'pietern/brpoplpush' --- f858c11d7d9ca79010dd46a9c8c625e63a9a3ec0 diff --combined src/networking.c index 90d157e1,7d14ac53..1dab8927 --- a/src/networking.c +++ b/src/networking.c @@@ -41,8 -41,10 +41,10 @@@ redisClient *createClient(int fd) c->reply = listCreate(); listSetFreeMethod(c->reply,decrRefCount); listSetDupMethod(c->reply,dupClientReplyValue); - c->blocking_keys = NULL; - c->blocking_keys_num = 0; + c->bpop.keys = NULL; + c->bpop.count = 0; + c->bpop.timeout = 0; + c->bpop.target = NULL; c->io_keys = listCreate(); c->watched_keys = listCreate(); listSetFreeMethod(c->io_keys,decrRefCount); @@@ -178,9 -180,6 +180,9 @@@ void addReply(redisClient *c, robj *obj if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK) _addReplyObjectToList(c,obj); } else { + /* FIXME: convert the long into string and use _addReplyToBuffer() + * instead of calling getDecodedObject. As this place in the + * code is too performance critical. */ obj = getDecodedObject(obj); if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK) _addReplyObjectToList(c,obj); @@@ -278,7 -277,6 +280,7 @@@ void setDeferredMultiBulkLength(redisCl } } +/* Add a duble as a bulk reply */ void addReplyDouble(redisClient *c, double d) { char dbuf[128], sbuf[128]; int dlen, slen; @@@ -287,8 -285,6 +289,8 @@@ addReplyString(c,sbuf,slen); } +/* Add a long long as integer reply or bulk len / multi bulk count. + * Basically this is used to output . */ void _addReplyLongLong(redisClient *c, long long ll, char prefix) { char buf[128]; int len; @@@ -307,7 -303,6 +309,7 @@@ void addReplyMultiBulkLen(redisClient * _addReplyLongLong(c,length,'*'); } +/* Create the length prefix of a bulk reply, example: $2234 */ void addReplyBulkLen(redisClient *c, robj *obj) { size_t len; @@@ -329,38 -324,23 +331,38 @@@ _addReplyLongLong(c,len,'$'); } +/* Add a Redis Object as a bulk reply */ void addReplyBulk(redisClient *c, robj *obj) { addReplyBulkLen(c,obj); addReply(c,obj); addReply(c,shared.crlf); } -/* In the CONFIG command we need to add vanilla C string as bulk replies */ +/* Add a C buffer as bulk reply */ +void addReplyBulkCBuffer(redisClient *c, void *p, size_t len) { + _addReplyLongLong(c,len,'$'); + addReplyString(c,p,len); + addReply(c,shared.crlf); +} + +/* Add a C nul term string as bulk reply */ void addReplyBulkCString(redisClient *c, char *s) { if (s == NULL) { addReply(c,shared.nullbulk); } else { - robj *o = createStringObject(s,strlen(s)); - addReplyBulk(c,o); - decrRefCount(o); + addReplyBulkCBuffer(c,s,strlen(s)); } } +/* Add a long long as a bulk reply */ +void addReplyBulkLongLong(redisClient *c, long long ll) { + char buf[64]; + int len; + + len = ll2string(buf,64,ll); + addReplyBulkCBuffer(c,buf,len); +} + static void acceptCommonHandler(int fd) { redisClient *c; if ((c = createClient(fd)) == NULL) { @@@ -699,7 -679,7 +701,7 @@@ void closeTimedoutClients(void) redisLog(REDIS_VERBOSE,"Closing idle client"); freeClient(c); } else if (c->flags & REDIS_BLOCKED) { - if (c->blockingto != 0 && c->blockingto < now) { + if (c->bpop.timeout != 0 && c->bpop.timeout < now) { addReply(c,shared.nullmultibulk); unblockClientWaitingData(c); } diff --combined src/redis.h index 67b76a47,3639f062..860e77a1 --- a/src/redis.h +++ b/src/redis.h @@@ -293,6 -293,16 +293,16 @@@ typedef struct multiState int count; /* Total number of MULTI commands */ } multiState; + typedef struct blockingState { + robj **keys; /* The key we are waiting to terminate a blocking + * operation such as BLPOP. Otherwise NULL. */ + int count; /* Number of blocking keys */ + time_t timeout; /* Blocking operation timeout. If UNIX current time + * is >= timeout then the operation timed out. */ + robj *target; /* The key that should receive the element, + * for BRPOPLPUSH. */ + } blockingState; + /* With multiplexing we need to take per-clinet state. * Clients are taken in a liked list. */ typedef struct redisClient { @@@ -316,11 -326,7 +326,7 @@@ long repldboff; /* replication DB file offset */ off_t repldbsize; /* replication DB file size */ multiState mstate; /* MULTI/EXEC state */ - robj **blocking_keys; /* The key we are waiting to terminate a blocking - * operation such as BLPOP. Otherwise NULL. */ - int blocking_keys_num; /* Number of blocking keys */ - time_t blockingto; /* Blocking operation timeout. If UNIX current time - * is >= blockingto then the operation timed out. */ + blockingState bpop; /* blocking state */ list *io_keys; /* Keys this client is waiting to be loaded from the * swap file in order to continue. */ list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */ @@@ -427,8 -433,9 +433,9 @@@ struct redisServer int maxmemory_policy; int maxmemory_samples; /* Blocked clients */ - unsigned int blpop_blocked_clients; + unsigned int bpop_blocked_clients; unsigned int vm_blocked_clients; + list *unblocked_clients; /* Sort parameters - qsort_r() is only available under BSD so we * have to take this state global, in order to pass it to sortCompare() */ int sort_desc; @@@ -639,8 -646,6 +646,8 @@@ void acceptUnixHandler(aeEventLoop *el void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask); void addReplyBulk(redisClient *c, robj *obj); void addReplyBulkCString(redisClient *c, char *s); +void addReplyBulkCBuffer(redisClient *c, void *p, size_t len); +void addReplyBulkLongLong(redisClient *c, long long ll); void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask); void addReply(redisClient *c, robj *obj); void addReplySds(redisClient *c, sds s); @@@ -813,9 -818,8 +820,9 @@@ int setTypeRemove(robj *subject, robj * int setTypeIsMember(robj *subject, robj *value); setTypeIterator *setTypeInitIterator(robj *subject); void setTypeReleaseIterator(setTypeIterator *si); -robj *setTypeNext(setTypeIterator *si); -robj *setTypeRandomElement(robj *subject); +int setTypeNext(setTypeIterator *si, robj **objele, int64_t *llele); +robj *setTypeNextObject(setTypeIterator *si); +int setTypeRandomElement(robj *setobj, robj **objele, int64_t *llele); unsigned long setTypeSize(robj *subject); void setTypeConvert(robj *subject, int enc); @@@ -823,8 -827,7 +830,8 @@@ void convertToRealHash(robj *o); void hashTypeTryConversion(robj *subject, robj **argv, int start, int end); void hashTypeTryObjectEncoding(robj *subject, robj **o1, robj **o2); -robj *hashTypeGet(robj *o, robj *key); +int hashTypeGet(robj *o, robj *key, robj **objval, unsigned char **v, unsigned int *vlen); +robj *hashTypeGetObject(robj *o, robj *key); int hashTypeExists(robj *o, robj *key); int hashTypeSet(robj *o, robj *key, robj *value); int hashTypeDelete(robj *o, robj *key); @@@ -832,8 -835,7 +839,8 @@@ unsigned long hashTypeLength(robj *o) hashTypeIterator *hashTypeInitIterator(robj *subject); void hashTypeReleaseIterator(hashTypeIterator *hi); int hashTypeNext(hashTypeIterator *hi); -robj *hashTypeCurrent(hashTypeIterator *hi, int what); +int hashTypeCurrent(hashTypeIterator *hi, int what, robj **objval, unsigned char **v, unsigned int *vlen); +robj *hashTypeCurrentObject(hashTypeIterator *hi, int what); robj *hashTypeLookupWriteOrCreate(redisClient *c, robj *key); /* Pub / Sub */ @@@ -937,7 -939,7 +944,7 @@@ void flushdbCommand(redisClient *c) void flushallCommand(redisClient *c); void sortCommand(redisClient *c); void lremCommand(redisClient *c); - void rpoplpushcommand(redisClient *c); + void rpoplpushCommand(redisClient *c); void infoCommand(redisClient *c); void mgetCommand(redisClient *c); void monitorCommand(redisClient *c); @@@ -966,6 -968,7 +973,7 @@@ void execCommand(redisClient *c) void discardCommand(redisClient *c); void blpopCommand(redisClient *c); void brpopCommand(redisClient *c); + void brpoplpushCommand(redisClient *c); void appendCommand(redisClient *c); void substrCommand(redisClient *c); void strlenCommand(redisClient *c); diff --combined src/t_list.c index d1ec3db9,7dc3f139..8ee3b987 --- a/src/t_list.c +++ b/src/t_list.c @@@ -472,11 -472,12 +472,11 @@@ void rpopCommand(redisClient *c) } void lrangeCommand(redisClient *c) { - robj *o, *value; + robj *o; int start = atoi(c->argv[2]->ptr); int end = atoi(c->argv[3]->ptr); int llen; - int rangelen, j; - listTypeEntry entry; + int rangelen; if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL || checkType(c,o,REDIS_LIST)) return; @@@ -498,31 -499,14 +498,31 @@@ /* Return the result in form of a multi-bulk reply */ addReplyMultiBulkLen(c,rangelen); - listTypeIterator *li = listTypeInitIterator(o,start,REDIS_TAIL); - for (j = 0; j < rangelen; j++) { - redisAssert(listTypeNext(li,&entry)); - value = listTypeGet(&entry); - addReplyBulk(c,value); - decrRefCount(value); + if (o->encoding == REDIS_ENCODING_ZIPLIST) { + unsigned char *p = ziplistIndex(o->ptr,start); + unsigned char *vstr; + unsigned int vlen; + long long vlong; + + while(rangelen--) { + ziplistGet(p,&vstr,&vlen,&vlong); + if (vstr) { + addReplyBulkCBuffer(c,vstr,vlen); + } else { + addReplyBulkLongLong(c,vlong); + } + p = ziplistNext(o->ptr,p); + } + } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) { + listNode *ln = listIndex(o->ptr,start); + + while(rangelen--) { + addReplyBulk(c,ln->value); + ln = ln->next; + } + } else { + redisPanic("List encoding is not LINKEDLIST nor ZIPLIST!"); } - listTypeReleaseIterator(li); } void ltrimCommand(redisClient *c) { @@@ -621,20 -605,38 +621,38 @@@ void lremCommand(redisClient *c) /* This is the semantic of this command: * RPOPLPUSH srclist dstlist: - * IF LLEN(srclist) > 0 - * element = RPOP srclist - * LPUSH dstlist element - * RETURN element - * ELSE - * RETURN nil - * END + * IF LLEN(srclist) > 0 + * element = RPOP srclist + * LPUSH dstlist element + * RETURN element + * ELSE + * RETURN nil + * END * END * * The idea is to be able to get an element from a list in a reliable way * since the element is not just returned but pushed against another list * as well. This command was originally proposed by Ezra Zygmuntowicz. */ - void rpoplpushcommand(redisClient *c) { + + void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value) { + if (!handleClientsWaitingListPush(c,dstkey,value)) { + /* Create the list if the key does not exist */ + if (!dstobj) { + dstobj = createZiplistObject(); + dbAdd(c->db,dstkey,dstobj); + } else { + touchWatchedKey(c->db,dstkey); + server.dirty++; + } + listTypePush(dstobj,value,REDIS_HEAD); + } + + /* Always send the pushed value to the client. */ + addReplyBulk(c,value); + } + + void rpoplpushCommand(redisClient *c) { robj *sobj, *value; if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL || checkType(c,sobj,REDIS_LIST)) return; @@@ -645,20 -647,7 +663,7 @@@ robj *dobj = lookupKeyWrite(c->db,c->argv[2]); if (dobj && checkType(c,dobj,REDIS_LIST)) return; value = listTypePop(sobj,REDIS_TAIL); - - /* Add the element to the target list (unless it's directly - * passed to some BLPOP-ing client */ - if (!handleClientsWaitingListPush(c,c->argv[2],value)) { - /* Create the list if the key does not exist */ - if (!dobj) { - dobj = createZiplistObject(); - dbAdd(c->db,c->argv[2],dobj); - } - listTypePush(dobj,value,REDIS_HEAD); - } - - /* Send the element to the client as reply as well */ - addReplyBulk(c,value); + rpoplpushHandlePush(c,c->argv[2],dobj,value); /* listTypePop returns an object with its refcount incremented */ decrRefCount(value); @@@ -705,17 -694,23 +710,23 @@@ /* Set a client in blocking mode for the specified key, with the specified * timeout */ - void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout) { + void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj *target) { dictEntry *de; list *l; int j; - c->blocking_keys = zmalloc(sizeof(robj*)*numkeys); - c->blocking_keys_num = numkeys; - c->blockingto = timeout; + c->bpop.keys = zmalloc(sizeof(robj*)*numkeys); + c->bpop.count = numkeys; + c->bpop.timeout = timeout; + c->bpop.target = target; + + if (target != NULL) { + incrRefCount(target); + } + for (j = 0; j < numkeys; j++) { /* Add the key in the client structure, to map clients -> keys */ - c->blocking_keys[j] = keys[j]; + c->bpop.keys[j] = keys[j]; incrRefCount(keys[j]); /* And in the other "side", to map keys -> clients */ @@@ -735,7 -730,7 +746,7 @@@ } /* Mark the client as a blocked client */ c->flags |= REDIS_BLOCKED; - server.blpop_blocked_clients++; + server.bpop_blocked_clients++; } /* Unblock a client that's waiting in a blocking operation such as BLPOP */ @@@ -744,30 -739,27 +755,27 @@@ void unblockClientWaitingData(redisClie list *l; int j; - redisAssert(c->blocking_keys != NULL); + redisAssert(c->bpop.keys != NULL); /* The client may wait for multiple keys, so unblock it for every key. */ - for (j = 0; j < c->blocking_keys_num; j++) { + for (j = 0; j < c->bpop.count; j++) { /* Remove this client from the list of clients waiting for this key. */ - de = dictFind(c->db->blocking_keys,c->blocking_keys[j]); + de = dictFind(c->db->blocking_keys,c->bpop.keys[j]); redisAssert(de != NULL); l = dictGetEntryVal(de); listDelNode(l,listSearchKey(l,c)); /* If the list is empty we need to remove it to avoid wasting memory */ if (listLength(l) == 0) - dictDelete(c->db->blocking_keys,c->blocking_keys[j]); - decrRefCount(c->blocking_keys[j]); + dictDelete(c->db->blocking_keys,c->bpop.keys[j]); + decrRefCount(c->bpop.keys[j]); } + /* Cleanup the client structure */ - zfree(c->blocking_keys); - c->blocking_keys = NULL; + zfree(c->bpop.keys); + c->bpop.keys = NULL; + c->bpop.target = NULL; c->flags &= (~REDIS_BLOCKED); - server.blpop_blocked_clients--; - /* We want to process data if there is some command waiting - * in the input buffer. Note that this is safe even if - * unblockClientWaitingData() gets called from freeClient() because - * freeClient() will be smart enough to call this function - * *after* c->querybuf was set to NULL. */ - if (c->querybuf && sdslen(c->querybuf) > 0) processInputBuffer(c); + server.bpop_blocked_clients--; + listAddNodeTail(server.unblocked_clients,c); } /* This should be called from any function PUSHing into lists. @@@ -783,39 -775,81 +791,81 @@@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) { struct dictEntry *de; redisClient *receiver; - list *l; + int numclients; + list *clients; listNode *ln; + robj *dstkey, *dstobj; de = dictFind(c->db->blocking_keys,key); if (de == NULL) return 0; - l = dictGetEntryVal(de); - ln = listFirst(l); - redisAssert(ln != NULL); - receiver = ln->value; + clients = dictGetEntryVal(de); + numclients = listLength(clients); + + /* Try to handle the push as long as there are clients waiting for a push. + * Note that "numclients" is used because the list of clients waiting for a + * push on "key" is deleted by unblockClient() when empty. + * + * This loop will have more than 1 iteration when there is a BRPOPLPUSH + * that cannot push the target list because it does not contain a list. If + * this happens, it simply tries the next client waiting for a push. */ + while (numclients--) { + ln = listFirst(clients); + redisAssert(ln != NULL); + receiver = ln->value; + dstkey = receiver->bpop.target; + + /* This should remove the first element of the "clients" list. */ + unblockClientWaitingData(receiver); + redisAssert(ln != listFirst(clients)); + + if (dstkey == NULL) { + /* BRPOP/BLPOP */ + addReplyMultiBulkLen(receiver,2); + addReplyBulk(receiver,key); + addReplyBulk(receiver,ele); + return 1; + } else { + /* BRPOPLPUSH */ + dstobj = lookupKeyWrite(receiver->db,dstkey); + if (dstobj && checkType(receiver,dstobj,REDIS_LIST)) { + decrRefCount(dstkey); + } else { + rpoplpushHandlePush(receiver,dstkey,dstobj,ele); + decrRefCount(dstkey); + return 1; + } + } + } - addReplyMultiBulkLen(receiver,2); - addReplyBulk(receiver,key); - addReplyBulk(receiver,ele); - unblockClientWaitingData(receiver); - return 1; + return 0; + } + + int getTimeoutFromObjectOrReply(redisClient *c, robj *object, time_t *timeout) { + long tval; + + if (getLongFromObjectOrReply(c,object,&tval, + "timeout is not an integer or out of range") != REDIS_OK) + return REDIS_ERR; + + if (tval < 0) { + addReplyError(c,"timeout is negative"); + return REDIS_ERR; + } + + if (tval > 0) tval += time(NULL); + *timeout = tval; + + return REDIS_OK; } /* Blocking RPOP/LPOP */ void blockingPopGenericCommand(redisClient *c, int where) { robj *o; - long long lltimeout; time_t timeout; int j; - /* Make sure timeout is an integer value */ - if (getLongLongFromObjectOrReply(c,c->argv[c->argc-1],&lltimeout, - "timeout is not an integer") != REDIS_OK) return; - - /* Make sure the timeout is not negative */ - if (lltimeout < 0) { - addReplyError(c,"timeout is negative"); + if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout) != REDIS_OK) return; - } for (j = 1; j < c->argc-1; j++) { o = lookupKeyWrite(c->db,c->argv[j]); @@@ -845,11 -879,13 +895,13 @@@ * because it is... */ addReplyMultiBulkLen(c,2); addReplyBulk(c,argv[1]); + popGenericCommand(c,where); /* Fix the client structure with the original stuff */ c->argv = orig_argv; c->argc = orig_argc; + return; } } @@@ -864,9 -900,7 +916,7 @@@ } /* If the list is empty or the key does not exists we must block */ - timeout = lltimeout; - if (timeout > 0) timeout += time(NULL); - blockForKeys(c,c->argv+1,c->argc-2,timeout); + blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL); } void blpopCommand(redisClient *c) { @@@ -876,3 -910,34 +926,34 @@@ void brpopCommand(redisClient *c) { blockingPopGenericCommand(c,REDIS_TAIL); } + + void brpoplpushCommand(redisClient *c) { + time_t timeout; + + if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout) != REDIS_OK) + return; + + robj *key = lookupKeyWrite(c->db, c->argv[1]); + + if (key == NULL) { + if (c->flags & REDIS_MULTI) { + + /* Blocking against an empty list in a multi state + * returns immediately. */ + addReply(c, shared.nullmultibulk); + } else { + /* The list is empty and the client blocks. */ + blockForKeys(c, c->argv + 1, 1, timeout, c->argv[2]); + } + } else { + if (key->type != REDIS_LIST) { + addReply(c, shared.wrongtypeerr); + } else { + + /* The list exists and has elements, so + * the regular rpoplpushCommand is executed. */ + redisAssert(listTypeLength(key) > 0); + rpoplpushCommand(c); + } + } + }