X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/96ffb2fe97c3e77879e7a4f6f7457397a18bf233..e53ca04b50b86ef158a75c54ae9ee8b17e31719c:/src/networking.c diff --git a/src/networking.c b/src/networking.c index 31844a09..166b44c9 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1,5 +1,4 @@ #include "redis.h" - #include <sys/uio.h> void *dupClientReplyValue(void *o) { @@ -12,20 +11,28 @@ int listMatchObjects(void *a, void *b) { } redisClient *createClient(int fd) { - redisClient *c = zmalloc(sizeof(*c)); + redisClient *c = zmalloc(sizeof(redisClient)); + c->bufpos = 0; anetNonBlock(NULL,fd); anetTcpNoDelay(NULL,fd); if (!c) return NULL; + if (aeCreateFileEvent(server.el,fd,AE_READABLE, + readQueryFromClient, c) == AE_ERR) + { + close(fd); + zfree(c); + return NULL; + } + selectDb(c,0); c->fd = fd; c->querybuf = sdsempty(); + c->reqtype = 0; c->argc = 0; c->argv = NULL; + c->multibulklen = 0; c->bulklen = -1; - c->multibulk = 0; - c->mbargc = 0; - c->mbargv = NULL; c->sentlen = 0; c->flags = 0; c->lastinteraction = time(NULL); @@ -34,8 +41,10 @@ redisClient *createClient(int fd) { c->reply = listCreate(); listSetFreeMethod(c->reply,decrRefCount); listSetDupMethod(c->reply,dupClientReplyValue); - c->blocking_keys = NULL; - c->blocking_keys_num = 0; + c->bpop.keys = NULL; + c->bpop.count = 0; + c->bpop.timeout = 0; + c->bpop.target = NULL; c->io_keys = listCreate(); c->watched_keys = listCreate(); listSetFreeMethod(c->io_keys,decrRefCount); @@ -43,80 +52,265 @@ redisClient *createClient(int fd) { c->pubsub_patterns = listCreate(); listSetFreeMethod(c->pubsub_patterns,decrRefCount); listSetMatchMethod(c->pubsub_patterns,listMatchObjects); - if (aeCreateFileEvent(server.el, c->fd, AE_READABLE, - readQueryFromClient, c) == AE_ERR) { - freeClient(c); - return NULL; - } listAddNodeTail(server.clients,c); initClientMultiState(c); return c; } -void addReply(redisClient *c, robj *obj) { - if (listLength(c->reply) == 0 && +/* Set the event loop to listen for write events on the 
client's socket. + * Typically gets called every time a reply is built. */ +int _installWriteEvent(redisClient *c) { + /* When CLOSE_AFTER_REPLY is set, no more replies may be added! */ + redisAssert(!(c->flags & REDIS_CLOSE_AFTER_REPLY)); + + if (c->fd <= 0) return REDIS_ERR; + if (c->bufpos == 0 && listLength(c->reply) == 0 && (c->replstate == REDIS_REPL_NONE || c->replstate == REDIS_REPL_ONLINE) && aeCreateFileEvent(server.el, c->fd, AE_WRITABLE, - sendReplyToClient, c) == AE_ERR) return; + sendReplyToClient, c) == AE_ERR) return REDIS_ERR; + return REDIS_OK; +} - if (server.vm_enabled && obj->storage != REDIS_VM_MEMORY) { - obj = dupStringObject(obj); - obj->refcount = 0; /* getDecodedObject() will increment the refcount */ +/* Create a duplicate of the last object in the reply list when + * it is not exclusively owned by the reply list. */ +robj *dupLastObjectIfNeeded(list *reply) { + robj *new, *cur; + listNode *ln; + redisAssert(listLength(reply) > 0); + ln = listLast(reply); + cur = listNodeValue(ln); + if (cur->refcount > 1) { + new = dupStringObject(cur); + decrRefCount(cur); + listNodeValue(ln) = new; } - listAddNodeTail(c->reply,getDecodedObject(obj)); + return listNodeValue(ln); } -void addReplySds(redisClient *c, sds s) { - robj *o = createObject(REDIS_STRING,s); - addReply(c,o); - decrRefCount(o); +int _addReplyToBuffer(redisClient *c, char *s, size_t len) { + size_t available = sizeof(c->buf)-c->bufpos; + + /* If there already are entries in the reply list, we cannot + * add anything more to the static buffer. */ + if (listLength(c->reply) > 0) return REDIS_ERR; + + /* Check that the buffer has enough space available for this string. 
*/ + if (len > available) return REDIS_ERR; + + memcpy(c->buf+c->bufpos,s,len); + c->bufpos+=len; + return REDIS_OK; } -void addReplyDouble(redisClient *c, double d) { - char buf[128]; +void _addReplyObjectToList(redisClient *c, robj *o) { + robj *tail; + if (listLength(c->reply) == 0) { + incrRefCount(o); + listAddNodeTail(c->reply,o); + } else { + tail = listNodeValue(listLast(c->reply)); - snprintf(buf,sizeof(buf),"%.17g",d); - addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n", - (unsigned long) strlen(buf),buf)); + /* Append to this object when possible. */ + if (tail->ptr != NULL && + sdslen(tail->ptr)+sdslen(o->ptr) <= REDIS_REPLY_CHUNK_BYTES) + { + tail = dupLastObjectIfNeeded(c->reply); + tail->ptr = sdscatlen(tail->ptr,o->ptr,sdslen(o->ptr)); + } else { + incrRefCount(o); + listAddNodeTail(c->reply,o); + } + } } -void addReplyLongLong(redisClient *c, long long ll) { - char buf[128]; - size_t len; +/* This method takes responsibility over the sds. When it is no longer + * needed it will be free'd, otherwise it ends up in a robj. */ +void _addReplySdsToList(redisClient *c, sds s) { + robj *tail; + if (listLength(c->reply) == 0) { + listAddNodeTail(c->reply,createObject(REDIS_STRING,s)); + } else { + tail = listNodeValue(listLast(c->reply)); - if (ll == 0) { - addReply(c,shared.czero); - return; - } else if (ll == 1) { - addReply(c,shared.cone); + /* Append to this object when possible. */ + if (tail->ptr != NULL && + sdslen(tail->ptr)+sdslen(s) <= REDIS_REPLY_CHUNK_BYTES) + { + tail = dupLastObjectIfNeeded(c->reply); + tail->ptr = sdscatlen(tail->ptr,s,sdslen(s)); + sdsfree(s); + } else { + listAddNodeTail(c->reply,createObject(REDIS_STRING,s)); + } + } +} + +void _addReplyStringToList(redisClient *c, char *s, size_t len) { + robj *tail; + if (listLength(c->reply) == 0) { + listAddNodeTail(c->reply,createStringObject(s,len)); + } else { + tail = listNodeValue(listLast(c->reply)); + + /* Append to this object when possible. 
*/ + if (tail->ptr != NULL && + sdslen(tail->ptr)+len <= REDIS_REPLY_CHUNK_BYTES) + { + tail = dupLastObjectIfNeeded(c->reply); + tail->ptr = sdscatlen(tail->ptr,s,len); + } else { + listAddNodeTail(c->reply,createStringObject(s,len)); + } + } +} + +void addReply(redisClient *c, robj *obj) { + if (_installWriteEvent(c) != REDIS_OK) return; + + /* This is an important place where we can avoid copy-on-write + * when there is a saving child running, avoiding touching the + * refcount field of the object if it's not needed. + * + * If the encoding is RAW and there is room in the static buffer + * we'll be able to send the object to the client without + * messing with its page. */ + if (obj->encoding == REDIS_ENCODING_RAW) { + if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK) + _addReplyObjectToList(c,obj); + } else { + /* FIXME: convert the long into string and use _addReplyToBuffer() + * instead of calling getDecodedObject. As this place in the + * code is too performance critical. */ + obj = getDecodedObject(obj); + if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK) + _addReplyObjectToList(c,obj); + decrRefCount(obj); + } +} + +void addReplySds(redisClient *c, sds s) { + if (_installWriteEvent(c) != REDIS_OK) { + /* The caller expects the sds to be free'd. */ + sdsfree(s); return; } - buf[0] = ':'; + if (_addReplyToBuffer(c,s,sdslen(s)) == REDIS_OK) { + sdsfree(s); + } else { + /* This method free's the sds when it is no longer needed. 
*/ + _addReplySdsToList(c,s); + } +} + +void addReplyString(redisClient *c, char *s, size_t len) { + if (_installWriteEvent(c) != REDIS_OK) return; + if (_addReplyToBuffer(c,s,len) != REDIS_OK) + _addReplyStringToList(c,s,len); +} + +void _addReplyError(redisClient *c, char *s, size_t len) { + addReplyString(c,"-ERR ",5); + addReplyString(c,s,len); + addReplyString(c,"\r\n",2); +} + +void addReplyError(redisClient *c, char *err) { + _addReplyError(c,err,strlen(err)); +} + +void addReplyErrorFormat(redisClient *c, const char *fmt, ...) { + va_list ap; + va_start(ap,fmt); + sds s = sdscatvprintf(sdsempty(),fmt,ap); + va_end(ap); + _addReplyError(c,s,sdslen(s)); + sdsfree(s); +} + +void _addReplyStatus(redisClient *c, char *s, size_t len) { + addReplyString(c,"+",1); + addReplyString(c,s,len); + addReplyString(c,"\r\n",2); +} + +void addReplyStatus(redisClient *c, char *status) { + _addReplyStatus(c,status,strlen(status)); +} + +void addReplyStatusFormat(redisClient *c, const char *fmt, ...) { + va_list ap; + va_start(ap,fmt); + sds s = sdscatvprintf(sdsempty(),fmt,ap); + va_end(ap); + _addReplyStatus(c,s,sdslen(s)); + sdsfree(s); +} + +/* Adds an empty object to the reply list that will contain the multi bulk + * length, which is not known when this function is called. */ +void *addDeferredMultiBulkLength(redisClient *c) { + /* Note that we install the write event here even if the object is not + * ready to be sent, since we are sure that before returning to the + * event loop setDeferredMultiBulkLength() will be called. */ + if (_installWriteEvent(c) != REDIS_OK) return NULL; + listAddNodeTail(c->reply,createObject(REDIS_STRING,NULL)); + return listLast(c->reply); +} + +/* Populate the length object and try glueing it to the next chunk. */ +void setDeferredMultiBulkLength(redisClient *c, void *node, long length) { + listNode *ln = (listNode*)node; + robj *len, *next; + + /* Abort when *node is NULL (see addDeferredMultiBulkLength). 
*/ + if (node == NULL) return; + + len = listNodeValue(ln); + len->ptr = sdscatprintf(sdsempty(),"*%ld\r\n",length); + if (ln->next != NULL) { + next = listNodeValue(ln->next); + + /* Only glue when the next node is non-NULL (an sds in this case) */ + if (next->ptr != NULL) { + len->ptr = sdscatlen(len->ptr,next->ptr,sdslen(next->ptr)); + listDelNode(c->reply,ln->next); + } + } +} + +/* Add a duble as a bulk reply */ +void addReplyDouble(redisClient *c, double d) { + char dbuf[128], sbuf[128]; + int dlen, slen; + dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d); + slen = snprintf(sbuf,sizeof(sbuf),"$%d\r\n%s\r\n",dlen,dbuf); + addReplyString(c,sbuf,slen); +} + +/* Add a long long as integer reply or bulk len / multi bulk count. + * Basically this is used to output <prefix><long long><crlf>. */ +void _addReplyLongLong(redisClient *c, long long ll, char prefix) { + char buf[128]; + int len; + buf[0] = prefix; len = ll2string(buf+1,sizeof(buf)-1,ll); buf[len+1] = '\r'; buf[len+2] = '\n'; - addReplySds(c,sdsnewlen(buf,len+3)); + addReplyString(c,buf,len+3); } -void addReplyUlong(redisClient *c, unsigned long ul) { - char buf[128]; - size_t len; +void addReplyLongLong(redisClient *c, long long ll) { + _addReplyLongLong(c,ll,':'); +} - if (ul == 0) { - addReply(c,shared.czero); - return; - } else if (ul == 1) { - addReply(c,shared.cone); - return; - } - len = snprintf(buf,sizeof(buf),":%lu\r\n",ul); - addReplySds(c,sdsnewlen(buf,len)); +void addReplyMultiBulkLen(redisClient *c, long length) { + _addReplyLongLong(c,length,'*'); } +/* Create the length prefix of a bulk reply, example: $2234 */ void addReplyBulkLen(redisClient *c, robj *obj) { - size_t len, intlen; - char buf[128]; + size_t len; if (obj->encoding == REDIS_ENCODING_RAW) { len = sdslen(obj->ptr); @@ -133,47 +327,46 @@ void addReplyBulkLen(redisClient *c, robj *obj) { len++; } } - buf[0] = '$'; - intlen = ll2string(buf+1,sizeof(buf)-1,(long long)len); - buf[intlen+1] = '\r'; - buf[intlen+2] = '\n'; - 
addReplySds(c,sdsnewlen(buf,intlen+3)); + _addReplyLongLong(c,len,'$'); } +/* Add a Redis Object as a bulk reply */ void addReplyBulk(redisClient *c, robj *obj) { addReplyBulkLen(c,obj); addReply(c,obj); addReply(c,shared.crlf); } -/* In the CONFIG command we need to add vanilla C string as bulk replies */ +/* Add a C buffer as bulk reply */ +void addReplyBulkCBuffer(redisClient *c, void *p, size_t len) { + _addReplyLongLong(c,len,'$'); + addReplyString(c,p,len); + addReply(c,shared.crlf); +} + +/* Add a C nul term string as bulk reply */ void addReplyBulkCString(redisClient *c, char *s) { if (s == NULL) { addReply(c,shared.nullbulk); } else { - robj *o = createStringObject(s,strlen(s)); - addReplyBulk(c,o); - decrRefCount(o); + addReplyBulkCBuffer(c,s,strlen(s)); } } -void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) { - int cport, cfd; - char cip[128]; - redisClient *c; - REDIS_NOTUSED(el); - REDIS_NOTUSED(mask); - REDIS_NOTUSED(privdata); +/* Add a long long as a bulk reply */ +void addReplyBulkLongLong(redisClient *c, long long ll) { + char buf[64]; + int len; - cfd = anetAccept(server.neterr, fd, cip, &cport); - if (cfd == AE_ERR) { - redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr); - return; - } - redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport); - if ((c = createClient(cfd)) == NULL) { + len = ll2string(buf,64,ll); + addReplyBulkCBuffer(c,buf,len); +} + +static void acceptCommonHandler(int fd) { + redisClient *c; + if ((c = createClient(fd)) == NULL) { redisLog(REDIS_WARNING,"Error allocating resoures for the client"); - close(cfd); /* May be already closed, just ingore errors */ + close(fd); /* May be already closed, just ingore errors */ return; } /* If maxclient directive is set and this is one client more... 
close the @@ -193,15 +386,43 @@ void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) { server.stat_numconnections++; } +void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { + int cport, cfd; + char cip[128]; + REDIS_NOTUSED(el); + REDIS_NOTUSED(mask); + REDIS_NOTUSED(privdata); + + cfd = anetTcpAccept(server.neterr, fd, cip, &cport); + if (cfd == AE_ERR) { + redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr); + return; + } + redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport); + acceptCommonHandler(cfd); +} + +void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) { + int cfd; + REDIS_NOTUSED(el); + REDIS_NOTUSED(mask); + REDIS_NOTUSED(privdata); + + cfd = anetUnixAccept(server.neterr, fd); + if (cfd == AE_ERR) { + redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr); + return; + } + redisLog(REDIS_VERBOSE,"Accepted connection to %s", server.unixsocket); + acceptCommonHandler(cfd); +} + + static void freeClientArgv(redisClient *c) { int j; - for (j = 0; j < c->argc; j++) decrRefCount(c->argv[j]); - for (j = 0; j < c->mbargc; j++) - decrRefCount(c->mbargv[j]); c->argc = 0; - c->mbargc = 0; } void freeClient(redisClient *c) { @@ -235,22 +456,35 @@ void freeClient(redisClient *c) { ln = listSearchKey(server.clients,c); redisAssert(ln != NULL); listDelNode(server.clients,ln); - /* Remove from the list of clients that are now ready to be restarted - * after waiting for swapped keys */ - if (c->flags & REDIS_IO_WAIT && listLength(c->io_keys) == 0) { - ln = listSearchKey(server.io_ready_clients,c); - if (ln) { + /* When client was just unblocked because of a blocking operation, + * remove it from the list with unblocked clients. 
*/ + if (c->flags & REDIS_UNBLOCKED) { + ln = listSearchKey(server.unblocked_clients,c); + redisAssert(ln != NULL); + listDelNode(server.unblocked_clients,ln); + } + /* Remove from the list of clients waiting for swapped keys, or ready + * to be restarted, but not yet woken up again. */ + if (c->flags & REDIS_IO_WAIT) { + redisAssert(server.ds_enabled); + if (listLength(c->io_keys) == 0) { + ln = listSearchKey(server.io_ready_clients,c); + + /* When this client is waiting to be woken up (REDIS_IO_WAIT), + * it should be present in the list io_ready_clients */ + redisAssert(ln != NULL); listDelNode(server.io_ready_clients,ln); - server.vm_blocked_clients--; + } else { + while (listLength(c->io_keys)) { + ln = listFirst(c->io_keys); + dontWaitForSwappedKey(c,ln->value); + } } - } - /* Remove from the list of clients waiting for swapped keys */ - while (server.vm_enabled && listLength(c->io_keys)) { - ln = listFirst(c->io_keys); - dontWaitForSwappedKey(c,ln->value); + server.cache_blocked_clients--; } listRelease(c->io_keys); - /* Master/slave cleanup */ + /* Master/slave cleanup. + * Case 1: we lost the connection with a slave. */ if (c->flags & REDIS_SLAVE) { if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1) close(c->repldbfd); @@ -259,45 +493,27 @@ void freeClient(redisClient *c) { redisAssert(ln != NULL); listDelNode(l,ln); } + + /* Case 2: we lost the connection with the master. */ if (c->flags & REDIS_MASTER) { server.master = NULL; server.replstate = REDIS_REPL_CONNECT; + /* Since we lost the connection with the master, we should also + * close the connection with all our slaves if we have any, so + * when we'll resync with the master the other slaves will sync again + * with us as well. Note that also when the slave is not connected + * to the master it will keep refusing connections by other slaves. 
*/ + while (listLength(server.slaves)) { + ln = listFirst(server.slaves); + freeClient((redisClient*)ln->value); + } } /* Release memory */ zfree(c->argv); - zfree(c->mbargv); freeClientMultiState(c); zfree(c); } -#define GLUEREPLY_UP_TO (1024) -static void glueReplyBuffersIfNeeded(redisClient *c) { - int copylen = 0; - char buf[GLUEREPLY_UP_TO]; - listNode *ln; - listIter li; - robj *o; - - listRewind(c->reply,&li); - while((ln = listNext(&li))) { - int objlen; - - o = ln->value; - objlen = sdslen(o->ptr); - if (copylen + objlen <= GLUEREPLY_UP_TO) { - memcpy(buf+copylen,o->ptr,objlen); - copylen += objlen; - listDelNode(c->reply,ln); - } else { - if (copylen == 0) return; - break; - } - } - /* Now the output buffer is empty, add the new single element */ - o = createObject(REDIS_STRING,sdsnewlen(buf,copylen)); - listAddNodeHead(c->reply,o); -} - void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) { redisClient *c = privdata; int nwritten = 0, totwritten = 0, objlen; @@ -305,40 +521,48 @@ void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) { REDIS_NOTUSED(el); REDIS_NOTUSED(mask); - /* Use writev() if we have enough buffers to send */ - if (!server.glueoutputbuf && - listLength(c->reply) > REDIS_WRITEV_THRESHOLD && - !(c->flags & REDIS_MASTER)) - { - sendReplyToClientWritev(el, fd, privdata, mask); - return; - } + while(c->bufpos > 0 || listLength(c->reply)) { + if (c->bufpos > 0) { + if (c->flags & REDIS_MASTER) { + /* Don't reply to a master */ + nwritten = c->bufpos - c->sentlen; + } else { + nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen); + if (nwritten <= 0) break; + } + c->sentlen += nwritten; + totwritten += nwritten; - while(listLength(c->reply)) { - if (server.glueoutputbuf && listLength(c->reply) > 1) - glueReplyBuffersIfNeeded(c); + /* If the buffer was sent, set bufpos to zero to continue with + * the remainder of the reply. 
*/ + if (c->sentlen == c->bufpos) { + c->bufpos = 0; + c->sentlen = 0; + } + } else { + o = listNodeValue(listFirst(c->reply)); + objlen = sdslen(o->ptr); - o = listNodeValue(listFirst(c->reply)); - objlen = sdslen(o->ptr); + if (objlen == 0) { + listDelNode(c->reply,listFirst(c->reply)); + continue; + } - if (objlen == 0) { - listDelNode(c->reply,listFirst(c->reply)); - continue; - } + if (c->flags & REDIS_MASTER) { + /* Don't reply to a master */ + nwritten = objlen - c->sentlen; + } else { + nwritten = write(fd, ((char*)o->ptr)+c->sentlen,objlen-c->sentlen); + if (nwritten <= 0) break; + } + c->sentlen += nwritten; + totwritten += nwritten; - if (c->flags & REDIS_MASTER) { - /* Don't reply to a master */ - nwritten = objlen - c->sentlen; - } else { - nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen); - if (nwritten <= 0) break; - } - c->sentlen += nwritten; - totwritten += nwritten; - /* If we fully sent the object on head go to the next one */ - if (c->sentlen == objlen) { - listDelNode(c->reply,listFirst(c->reply)); - c->sentlen = 0; + /* If we fully sent the object on head go to the next one */ + if (c->sentlen == objlen) { + listDelNode(c->reply,listFirst(c->reply)); + c->sentlen = 0; + } } /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT * bytes, in a single threaded server it's a good idea to serve @@ -361,92 +585,18 @@ void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) { if (listLength(c->reply) == 0) { c->sentlen = 0; aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE); - } -} -void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask) -{ - redisClient *c = privdata; - int nwritten = 0, totwritten = 0, objlen, willwrite; - robj *o; - struct iovec iov[REDIS_WRITEV_IOVEC_COUNT]; - int offset, ion = 0; - REDIS_NOTUSED(el); - REDIS_NOTUSED(mask); - - listNode *node; - while (listLength(c->reply)) { - offset = c->sentlen; - ion = 0; - willwrite = 0; - - /* fill-in the iov[] array */ 
- for(node = listFirst(c->reply); node; node = listNextNode(node)) { - o = listNodeValue(node); - objlen = sdslen(o->ptr); - - if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT) - break; - - if(ion == REDIS_WRITEV_IOVEC_COUNT) - break; /* no more iovecs */ - - iov[ion].iov_base = ((char*)o->ptr) + offset; - iov[ion].iov_len = objlen - offset; - willwrite += objlen - offset; - offset = 0; /* just for the first item */ - ion++; - } - - if(willwrite == 0) - break; - - /* write all collected blocks at once */ - if((nwritten = writev(fd, iov, ion)) < 0) { - if (errno != EAGAIN) { - redisLog(REDIS_VERBOSE, - "Error writing to client: %s", strerror(errno)); - freeClient(c); - return; - } - break; - } - - totwritten += nwritten; - offset = c->sentlen; - - /* remove written robjs from c->reply */ - while (nwritten && listLength(c->reply)) { - o = listNodeValue(listFirst(c->reply)); - objlen = sdslen(o->ptr); - - if(nwritten >= objlen - offset) { - listDelNode(c->reply, listFirst(c->reply)); - nwritten -= objlen - offset; - c->sentlen = 0; - } else { - /* partial write */ - c->sentlen += nwritten; - break; - } - offset = 0; - } - } - - if (totwritten > 0) - c->lastinteraction = time(NULL); - - if (listLength(c->reply) == 0) { - c->sentlen = 0; - aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE); + /* Close connection after entire reply has been sent. 
*/ + if (c->flags & REDIS_CLOSE_AFTER_REPLY) freeClient(c); } } /* resetClient prepare the client to process the next command */ void resetClient(redisClient *c) { freeClientArgv(c); + c->reqtype = 0; + c->multibulklen = 0; c->bulklen = -1; - c->multibulk = 0; } void closeTimedoutClients(void) { @@ -461,6 +611,7 @@ void closeTimedoutClients(void) { if (server.maxidletime && !(c->flags & REDIS_SLAVE) && /* no timeout for slaves */ !(c->flags & REDIS_MASTER) && /* no timeout for masters */ + !(c->flags & REDIS_BLOCKED) && /* no timeout for BLPOP */ dictSize(c->pubsub_channels) == 0 && /* no timeout for pubsub */ listLength(c->pubsub_patterns) == 0 && (now - c->lastinteraction > server.maxidletime)) @@ -468,7 +619,7 @@ void closeTimedoutClients(void) { redisLog(REDIS_VERBOSE,"Closing idle client"); freeClient(c); } else if (c->flags & REDIS_BLOCKED) { - if (c->blockingto != 0 && c->blockingto < now) { + if (c->bpop.timeout != 0 && c->bpop.timeout < now) { addReply(c,shared.nullmultibulk); unblockClientWaitingData(c); } @@ -476,84 +627,172 @@ void closeTimedoutClients(void) { } } -void processInputBuffer(redisClient *c) { -again: - /* Before to process the input buffer, make sure the client is not - * waitig for a blocking operation such as BLPOP. Note that the first - * iteration the client is never blocked, otherwise the processInputBuffer - * would not be called at all, but after the execution of the first commands - * in the input buffer the client may be blocked, and the "goto again" - * will try to reiterate. The following line will make it return asap. 
*/ - if (c->flags & REDIS_BLOCKED || c->flags & REDIS_IO_WAIT) return; - if (c->bulklen == -1) { - /* Read the first line of the query */ - char *p = strchr(c->querybuf,'\n'); - size_t querylen; - - if (p) { - sds query, *argv; - int argc, j; - - query = c->querybuf; - c->querybuf = sdsempty(); - querylen = 1+(p-(query)); - if (sdslen(query) > querylen) { - /* leave data after the first line of the query in the buffer */ - c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen); - } - *p = '\0'; /* remove "\n" */ - if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */ - sdsupdatelen(query); - - /* Now we can split the query in arguments */ - argv = sdssplitlen(query,sdslen(query)," ",1,&argc); - sdsfree(query); - - if (c->argv) zfree(c->argv); - c->argv = zmalloc(sizeof(robj*)*argc); - - for (j = 0; j < argc; j++) { - if (sdslen(argv[j])) { - c->argv[c->argc] = createObject(REDIS_STRING,argv[j]); - c->argc++; - } else { - sdsfree(argv[j]); +int processInlineBuffer(redisClient *c) { + char *newline = strstr(c->querybuf,"\r\n"); + int argc, j; + sds *argv; + size_t querylen; + + /* Nothing to do without a \r\n */ + if (newline == NULL) + return REDIS_ERR; + + /* Split the input buffer up to the \r\n */ + querylen = newline-(c->querybuf); + argv = sdssplitlen(c->querybuf,querylen," ",1,&argc); + + /* Leave data after the first line of the query in the buffer */ + c->querybuf = sdsrange(c->querybuf,querylen+2,-1); + + /* Setup argv array on client structure */ + if (c->argv) zfree(c->argv); + c->argv = zmalloc(sizeof(robj*)*argc); + + /* Create redis objects for all arguments. */ + for (c->argc = 0, j = 0; j < argc; j++) { + if (sdslen(argv[j])) { + c->argv[c->argc] = createObject(REDIS_STRING,argv[j]); + c->argc++; + } else { + sdsfree(argv[j]); + } + } + zfree(argv); + return REDIS_OK; +} + +/* Helper function. Trims query buffer to make the function that processes + * multi bulk requests idempotent. 
*/ +static void setProtocolError(redisClient *c, int pos) { + c->flags |= REDIS_CLOSE_AFTER_REPLY; + c->querybuf = sdsrange(c->querybuf,pos,-1); +} + +int processMultibulkBuffer(redisClient *c) { + char *newline = NULL; + char *eptr; + int pos = 0, tolerr; + long bulklen; + + if (c->multibulklen == 0) { + /* The client should have been reset */ + redisAssert(c->argc == 0); + + /* Multi bulk length cannot be read without a \r\n */ + newline = strstr(c->querybuf,"\r\n"); + if (newline == NULL) + return REDIS_ERR; + + /* We know for sure there is a whole line since newline != NULL, + * so go ahead and find out the multi bulk length. */ + redisAssert(c->querybuf[0] == '*'); + c->multibulklen = strtol(c->querybuf+1,&eptr,10); + pos = (newline-c->querybuf)+2; + if (c->multibulklen <= 0) { + c->querybuf = sdsrange(c->querybuf,pos,-1); + return REDIS_OK; + } else if (c->multibulklen > 1024*1024) { + addReplyError(c,"Protocol error: invalid multibulk length"); + setProtocolError(c,pos); + return REDIS_ERR; + } + + /* Setup argv array on client structure */ + if (c->argv) zfree(c->argv); + c->argv = zmalloc(sizeof(robj*)*c->multibulklen); + + /* Search new newline */ + newline = strstr(c->querybuf+pos,"\r\n"); + } + + redisAssert(c->multibulklen > 0); + while(c->multibulklen) { + /* Read bulk length if unknown */ + if (c->bulklen == -1) { + newline = strstr(c->querybuf+pos,"\r\n"); + if (newline != NULL) { + if (c->querybuf[pos] != '$') { + addReplyErrorFormat(c, + "Protocol error: expected '$', got '%c'", + c->querybuf[pos]); + setProtocolError(c,pos); + return REDIS_ERR; + } + + bulklen = strtol(c->querybuf+pos+1,&eptr,10); + tolerr = (eptr[0] != '\r'); + if (tolerr || bulklen == LONG_MIN || bulklen == LONG_MAX || + bulklen < 0 || bulklen > 512*1024*1024) + { + addReplyError(c,"Protocol error: invalid bulk length"); + setProtocolError(c,pos); + return REDIS_ERR; } + pos += eptr-(c->querybuf+pos)+2; + c->bulklen = bulklen; + } else { + /* No newline in current buffer, so 
wait for more data */ + break; } - zfree(argv); - if (c->argc) { - /* Execute the command. If the client is still valid - * after processCommand() return and there is something - * on the query buffer try to process the next command. */ - if (processCommand(c) && sdslen(c->querybuf)) goto again; + } + + /* Read bulk argument */ + if (sdslen(c->querybuf)-pos < (unsigned)(c->bulklen+2)) { + /* Not enough data (+2 == trailing \r\n) */ + break; + } else { + c->argv[c->argc++] = createStringObject(c->querybuf+pos,c->bulklen); + pos += c->bulklen+2; + c->bulklen = -1; + c->multibulklen--; + } + } + + /* Trim to pos */ + c->querybuf = sdsrange(c->querybuf,pos,-1); + + /* We're done when c->multibulk == 0 */ + if (c->multibulklen == 0) { + return REDIS_OK; + } + return REDIS_ERR; +} + +void processInputBuffer(redisClient *c) { + /* Keep processing while there is something in the input buffer */ + while(sdslen(c->querybuf)) { + /* Immediately abort if the client is in the middle of something. */ + if (c->flags & REDIS_BLOCKED || c->flags & REDIS_IO_WAIT) return; + + /* REDIS_CLOSE_AFTER_REPLY closes the connection once the reply is + * written to the client. Make sure to not let the reply grow after + * this flag has been set (i.e. don't process more commands). */ + if (c->flags & REDIS_CLOSE_AFTER_REPLY) return; + + /* Determine request type when unknown. */ + if (!c->reqtype) { + if (c->querybuf[0] == '*') { + c->reqtype = REDIS_REQ_MULTIBULK; } else { - /* Nothing to process, argc == 0. Just process the query - * buffer if it's not empty or return to the caller */ - if (sdslen(c->querybuf)) goto again; + c->reqtype = REDIS_REQ_INLINE; } - return; - } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) { - redisLog(REDIS_VERBOSE, "Client protocol error"); - freeClient(c); - return; } - } else { - /* Bulk read handling. 
Note that if we are at this point - the client already sent a command terminated with a newline, - we are reading the bulk data that is actually the last - argument of the command. */ - int qbl = sdslen(c->querybuf); - - if (c->bulklen <= qbl) { - /* Copy everything but the final CRLF as final argument */ - c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2); - c->argc++; - c->querybuf = sdsrange(c->querybuf,c->bulklen,-1); - /* Process the command. If the client is still valid after - * the processing and there is more data in the buffer - * try to parse it. */ - if (processCommand(c) && sdslen(c->querybuf)) goto again; - return; + + if (c->reqtype == REDIS_REQ_INLINE) { + if (processInlineBuffer(c) != REDIS_OK) break; + } else if (c->reqtype == REDIS_REQ_MULTIBULK) { + if (processMultibulkBuffer(c) != REDIS_OK) break; + } else { + redisPanic("Unknown request type"); + } + + /* Multibulk processing could see a <= 0 length. */ + if (c->argc == 0) { + resetClient(c); + } else { + /* Only reset the client when the command was executed. */ + if (processCommand(c) == REDIS_OK) + resetClient(c); } } } @@ -580,10 +819,29 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { return; } if (nread) { - c->querybuf = sdscatlen(c->querybuf, buf, nread); + c->querybuf = sdscatlen(c->querybuf,buf,nread); c->lastinteraction = time(NULL); } else { return; } processInputBuffer(c); } + +void getClientsMaxBuffers(unsigned long *longest_output_list, + unsigned long *biggest_input_buffer) { + redisClient *c; + listNode *ln; + listIter li; + unsigned long lol = 0, bib = 0; + + listRewind(server.clients,&li); + while ((ln = listNext(&li)) != NULL) { + c = listNodeValue(ln); + + if (listLength(c->reply) > lol) lol = listLength(c->reply); + if (sdslen(c->querybuf) > bib) bib = sdslen(c->querybuf); + } + *longest_output_list = lol; + *biggest_input_buffer = bib; +} +