X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/d51ebef5090624a3ba0824fa697410deaff1d1fa..4c5f0966b2e582981d9fdaf3b511c6cf4ac4d4d5:/src/networking.c diff --git a/src/networking.c b/src/networking.c index 90d157e1..166b44c9 100644 --- a/src/networking.c +++ b/src/networking.c @@ -41,8 +41,10 @@ redisClient *createClient(int fd) { c->reply = listCreate(); listSetFreeMethod(c->reply,decrRefCount); listSetDupMethod(c->reply,dupClientReplyValue); - c->blocking_keys = NULL; - c->blocking_keys_num = 0; + c->bpop.keys = NULL; + c->bpop.count = 0; + c->bpop.timeout = 0; + c->bpop.target = NULL; c->io_keys = listCreate(); c->watched_keys = listCreate(); listSetFreeMethod(c->io_keys,decrRefCount); @@ -165,7 +167,6 @@ void _addReplyStringToList(redisClient *c, char *s, size_t len) { void addReply(redisClient *c, robj *obj) { if (_installWriteEvent(c) != REDIS_OK) return; - redisAssert(!server.vm_enabled || obj->storage == REDIS_VM_MEMORY); /* This is an important place where we can avoid copy-on-write * when there is a saving child running, avoiding touching the @@ -455,10 +456,17 @@ void freeClient(redisClient *c) { ln = listSearchKey(server.clients,c); redisAssert(ln != NULL); listDelNode(server.clients,ln); + /* When client was just unblocked because of a blocking operation, + * remove it from the list with unblocked clients. */ + if (c->flags & REDIS_UNBLOCKED) { + ln = listSearchKey(server.unblocked_clients,c); + redisAssert(ln != NULL); + listDelNode(server.unblocked_clients,ln); + } /* Remove from the list of clients waiting for swapped keys, or ready * to be restarted, but not yet woken up again. 
*/ if (c->flags & REDIS_IO_WAIT) { - redisAssert(server.vm_enabled); + redisAssert(server.ds_enabled); if (listLength(c->io_keys) == 0) { ln = listSearchKey(server.io_ready_clients,c); @@ -472,7 +480,7 @@ void freeClient(redisClient *c) { dontWaitForSwappedKey(c,ln->value); } } - server.vm_blocked_clients--; + server.cache_blocked_clients--; } listRelease(c->io_keys); /* Master/slave cleanup. @@ -489,7 +497,6 @@ void freeClient(redisClient *c) { /* Case 2: we lost the connection with the master. */ if (c->flags & REDIS_MASTER) { server.master = NULL; - /* FIXME */ server.replstate = REDIS_REPL_CONNECT; /* Since we lost the connection with the master, we should also * close the connection with all our slaves if we have any, so @@ -514,15 +521,6 @@ void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) { REDIS_NOTUSED(el); REDIS_NOTUSED(mask); - /* Use writev() if we have enough buffers to send */ - if (!server.glueoutputbuf && - listLength(c->reply) > REDIS_WRITEV_THRESHOLD && - !(c->flags & REDIS_MASTER)) - { - sendReplyToClientWritev(el, fd, privdata, mask); - return; - } - while(c->bufpos > 0 || listLength(c->reply)) { if (c->bufpos > 0) { if (c->flags & REDIS_MASTER) { @@ -593,84 +591,6 @@ void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) { } } -void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask) -{ - redisClient *c = privdata; - int nwritten = 0, totwritten = 0, objlen, willwrite; - robj *o; - struct iovec iov[REDIS_WRITEV_IOVEC_COUNT]; - int offset, ion = 0; - REDIS_NOTUSED(el); - REDIS_NOTUSED(mask); - - listNode *node; - while (listLength(c->reply)) { - offset = c->sentlen; - ion = 0; - willwrite = 0; - - /* fill-in the iov[] array */ - for(node = listFirst(c->reply); node; node = listNextNode(node)) { - o = listNodeValue(node); - objlen = sdslen(o->ptr); - - if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT) - break; - - if(ion == REDIS_WRITEV_IOVEC_COUNT) - break; /* no more 
iovecs */ - - iov[ion].iov_base = ((char*)o->ptr) + offset; - iov[ion].iov_len = objlen - offset; - willwrite += objlen - offset; - offset = 0; /* just for the first item */ - ion++; - } - - if(willwrite == 0) - break; - - /* write all collected blocks at once */ - if((nwritten = writev(fd, iov, ion)) < 0) { - if (errno != EAGAIN) { - redisLog(REDIS_VERBOSE, - "Error writing to client: %s", strerror(errno)); - freeClient(c); - return; - } - break; - } - - totwritten += nwritten; - offset = c->sentlen; - - /* remove written robjs from c->reply */ - while (nwritten && listLength(c->reply)) { - o = listNodeValue(listFirst(c->reply)); - objlen = sdslen(o->ptr); - - if(nwritten >= objlen - offset) { - listDelNode(c->reply, listFirst(c->reply)); - nwritten -= objlen - offset; - c->sentlen = 0; - } else { - /* partial write */ - c->sentlen += nwritten; - break; - } - offset = 0; - } - } - - if (totwritten > 0) - c->lastinteraction = time(NULL); - - if (listLength(c->reply) == 0) { - c->sentlen = 0; - aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE); - } -} - /* resetClient prepare the client to process the next command */ void resetClient(redisClient *c) { freeClientArgv(c); @@ -699,7 +619,7 @@ void closeTimedoutClients(void) { redisLog(REDIS_VERBOSE,"Closing idle client"); freeClient(c); } else if (c->flags & REDIS_BLOCKED) { - if (c->blockingto != 0 && c->blockingto < now) { + if (c->bpop.timeout != 0 && c->bpop.timeout < now) { addReply(c,shared.nullmultibulk); unblockClientWaitingData(c); } @@ -802,7 +722,7 @@ int processMultibulkBuffer(redisClient *c) { bulklen = strtol(c->querybuf+pos+1,&eptr,10); tolerr = (eptr[0] != '\r'); if (tolerr || bulklen == LONG_MIN || bulklen == LONG_MAX || - bulklen < 0 || bulklen > 1024*1024*1024) + bulklen < 0 || bulklen > 512*1024*1024) { addReplyError(c,"Protocol error: invalid bulk length"); setProtocolError(c,pos); @@ -906,3 +826,22 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { } 
processInputBuffer(c); } + /* Report the longest reply list and the largest query buffer found among the clients in server.clients. */ +void getClientsMaxBuffers(unsigned long *longest_output_list, /* out: maximum listLength(c->reply) seen */ + unsigned long *biggest_input_buffer) { /* out: maximum sdslen(c->querybuf) seen */ + redisClient *c; + listNode *ln; + listIter li; + unsigned long lol = 0, bib = 0; /* running maxima for the reply list length (lol) and query buffer length (bib), starting at 0 */ + + listRewind(server.clients,&li); + while ((ln = listNext(&li)) != NULL) { /* iterate server.clients through the listIter until listNext yields NULL */ + c = listNodeValue(ln); + + if (listLength(c->reply) > lol) lol = listLength(c->reply); + if (sdslen(c->querybuf) > bib) bib = sdslen(c->querybuf); + } + *longest_output_list = lol; /* results are handed back only through the two pointers; the function itself returns void */ + *biggest_input_buffer = bib; +} +