X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/b301c1fc2bbf977a7d9fd4718cd9914113541c75..30d31cc8bb416f67183a218f1511ef517eb9ae3b:/src/networking.c diff --git a/src/networking.c b/src/networking.c index 464d5e02..632fd047 100644 --- a/src/networking.c +++ b/src/networking.c @@ -11,23 +11,24 @@ int listMatchObjects(void *a, void *b) { } redisClient *createClient(int fd) { - redisClient *c; - - /* Make sure to allocate a multiple of the page size to prevent wasting - * memory. A page size of 4096 is assumed here. We need to compensate - * for the zmalloc overhead of sizeof(size_t) bytes. */ - size_t size = 8192-sizeof(size_t); - redisAssert(size > sizeof(redisClient)); - c = zmalloc(size); - c->buflen = size-sizeof(redisClient); + redisClient *c = zmalloc(sizeof(redisClient)); c->bufpos = 0; anetNonBlock(NULL,fd); anetTcpNoDelay(NULL,fd); if (!c) return NULL; + if (aeCreateFileEvent(server.el,fd,AE_READABLE, + readQueryFromClient, c) == AE_ERR) + { + close(fd); + zfree(c); + return NULL; + } + selectDb(c,0); c->fd = fd; c->querybuf = sdsempty(); + c->newline = NULL; c->argc = 0; c->argv = NULL; c->bulklen = -1; @@ -51,17 +52,13 @@ redisClient *createClient(int fd) { c->pubsub_patterns = listCreate(); listSetFreeMethod(c->pubsub_patterns,decrRefCount); listSetMatchMethod(c->pubsub_patterns,listMatchObjects); - if (aeCreateFileEvent(server.el, c->fd, AE_READABLE, - readQueryFromClient, c) == AE_ERR) { - freeClient(c); - return NULL; - } listAddNodeTail(server.clients,c); initClientMultiState(c); return c; } -int _ensureFileEvent(redisClient *c) { +int _installWriteEvent(redisClient *c) { + if (c->fd <= 0) return REDIS_ERR; if (c->bufpos == 0 && listLength(c->reply) == 0 && (c->replstate == REDIS_REPL_NONE || c->replstate == REDIS_REPL_ONLINE) && @@ -70,86 +67,187 @@ int _ensureFileEvent(redisClient *c) { return REDIS_OK; } -void _addReplyObjectToList(redisClient *c, robj *obj) { - redisAssert(obj->type == REDIS_STRING && - obj->encoding == REDIS_ENCODING_RAW); - listAddNodeTail(c->reply,obj); +/* Create a duplicate of the last object in the reply list when + * it is not exclusively owned by the reply list. */ +robj *dupLastObjectIfNeeded(list *reply) { + robj *new, *cur; + listNode *ln; + redisAssert(listLength(reply) > 0); + ln = listLast(reply); + cur = listNodeValue(ln); + if (cur->refcount > 1) { + new = dupStringObject(cur); + decrRefCount(cur); + listNodeValue(ln) = new; + } + return listNodeValue(ln); } -void _ensureBufferInReplyList(redisClient *c) { - sds buffer = sdsnewlen(NULL,REDIS_REPLY_CHUNK_SIZE); - sdsupdatelen(buffer); /* sdsnewlen expects non-empty string */ - listAddNodeTail(c->reply,createObject(REDIS_REPLY_NODE,buffer)); +int _addReplyToBuffer(redisClient *c, char *s, size_t len) { + size_t available = sizeof(c->buf)-c->bufpos; + + /* If there already are entries in the reply list, we cannot + * add anything more to the static buffer. */ + if (listLength(c->reply) > 0) return REDIS_ERR; + + /* Check that the buffer has enough space available for this string. */ + if (len > available) return REDIS_ERR; + + memcpy(c->buf+c->bufpos,s,len); + c->bufpos+=len; + return REDIS_OK; } -void _addReplyStringToBuffer(redisClient *c, char *s, size_t len) { - size_t available = 0; - redisAssert(len < REDIS_REPLY_CHUNK_THRESHOLD); - if (listLength(c->reply) > 0) { - robj *o = listNodeValue(listLast(c->reply)); +void _addReplyObjectToList(redisClient *c, robj *o) { + robj *tail; + if (listLength(c->reply) == 0) { + incrRefCount(o); + listAddNodeTail(c->reply,o); + } else { + tail = listNodeValue(listLast(c->reply)); - /* Make sure to append to a reply node with enough bytes available. */ - if (o->type == REDIS_REPLY_NODE) available = sdsavail(o->ptr); - if (o->type != REDIS_REPLY_NODE || len > available) { - _ensureBufferInReplyList(c); - _addReplyStringToBuffer(c,s,len); + /* Append to this object when possible. */ + if (tail->ptr != NULL && + sdslen(tail->ptr)+sdslen(o->ptr) <= REDIS_REPLY_CHUNK_BYTES) + { + tail = dupLastObjectIfNeeded(c->reply); + tail->ptr = sdscatlen(tail->ptr,o->ptr,sdslen(o->ptr)); } else { - o->ptr = sdscatlen(o->ptr,s,len); + incrRefCount(o); + listAddNodeTail(c->reply,o); } + } +} + +/* This method takes responsibility over the sds. When it is no longer + * needed it will be free'd, otherwise it ends up in a robj. */ +void _addReplySdsToList(redisClient *c, sds s) { + robj *tail; + if (listLength(c->reply) == 0) { + listAddNodeTail(c->reply,createObject(REDIS_STRING,s)); } else { - available = c->buflen-c->bufpos; - if (len > available) { - _ensureBufferInReplyList(c); - _addReplyStringToBuffer(c,s,len); + tail = listNodeValue(listLast(c->reply)); + + /* Append to this object when possible. */ + if (tail->ptr != NULL && + sdslen(tail->ptr)+sdslen(s) <= REDIS_REPLY_CHUNK_BYTES) + { + tail = dupLastObjectIfNeeded(c->reply); + tail->ptr = sdscatlen(tail->ptr,s,sdslen(s)); + sdsfree(s); } else { - memcpy(c->buf+c->bufpos,s,len); - c->bufpos += len; + listAddNodeTail(c->reply,createObject(REDIS_STRING,s)); } } } -void addReply(redisClient *c, robj *obj) { - if (_ensureFileEvent(c) != REDIS_OK) return; - if (server.vm_enabled && obj->storage != REDIS_VM_MEMORY) { - /* Returns a new object with refcount 1 */ - obj = dupStringObject(obj); +void _addReplyStringToList(redisClient *c, char *s, size_t len) { + robj *tail; + if (listLength(c->reply) == 0) { + listAddNodeTail(c->reply,createStringObject(s,len)); } else { - /* This increments the refcount. */ - obj = getDecodedObject(obj); + tail = listNodeValue(listLast(c->reply)); + + /* Append to this object when possible. */ + if (tail->ptr != NULL && + sdslen(tail->ptr)+len <= REDIS_REPLY_CHUNK_BYTES) + { + tail = dupLastObjectIfNeeded(c->reply); + tail->ptr = sdscatlen(tail->ptr,s,len); + } else { + listAddNodeTail(c->reply,createStringObject(s,len)); + } } +} - if (sdslen(obj->ptr) < REDIS_REPLY_CHUNK_THRESHOLD) { - _addReplyStringToBuffer(c,obj->ptr,sdslen(obj->ptr)); - decrRefCount(obj); +void addReply(redisClient *c, robj *obj) { + if (_installWriteEvent(c) != REDIS_OK) return; + redisAssert(!server.vm_enabled || obj->storage == REDIS_VM_MEMORY); + + /* This is an important place where we can avoid copy-on-write + * when there is a saving child running, avoiding touching the + * refcount field of the object if it's not needed. + * + * If the encoding is RAW and there is room in the static buffer + * we'll be able to send the object to the client without + * messing with its page. */ + if (obj->encoding == REDIS_ENCODING_RAW) { + if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK) + _addReplyObjectToList(c,obj); } else { - _addReplyObjectToList(c,obj); + obj = getDecodedObject(obj); + if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK) + _addReplyObjectToList(c,obj); + decrRefCount(obj); } } void addReplySds(redisClient *c, sds s) { - if (_ensureFileEvent(c) != REDIS_OK) return; - if (sdslen(s) < REDIS_REPLY_CHUNK_THRESHOLD) { - _addReplyStringToBuffer(c,s,sdslen(s)); + if (_installWriteEvent(c) != REDIS_OK) { + /* The caller expects the sds to be free'd. */ + sdsfree(s); + return; + } + if (_addReplyToBuffer(c,s,sdslen(s)) == REDIS_OK) { sdsfree(s); } else { - _addReplyObjectToList(c,createObject(REDIS_STRING,s)); + /* This method free's the sds when it is no longer needed. */ + _addReplySdsToList(c,s); } } void addReplyString(redisClient *c, char *s, size_t len) { - if (_ensureFileEvent(c) != REDIS_OK) return; - if (len < REDIS_REPLY_CHUNK_THRESHOLD) { - _addReplyStringToBuffer(c,s,len); - } else { - _addReplyObjectToList(c,createStringObject(s,len)); - } + if (_installWriteEvent(c) != REDIS_OK) return; + if (_addReplyToBuffer(c,s,len) != REDIS_OK) + _addReplyStringToList(c,s,len); +} + +void _addReplyError(redisClient *c, char *s, size_t len) { + addReplyString(c,"-ERR ",5); + addReplyString(c,s,len); + addReplyString(c,"\r\n",2); +} + +void addReplyError(redisClient *c, char *err) { + _addReplyError(c,err,strlen(err)); +} + +void addReplyErrorFormat(redisClient *c, const char *fmt, ...) { + va_list ap; + va_start(ap,fmt); + sds s = sdscatvprintf(sdsempty(),fmt,ap); + va_end(ap); + _addReplyError(c,s,sdslen(s)); + sdsfree(s); +} + +void _addReplyStatus(redisClient *c, char *s, size_t len) { + addReplyString(c,"+",1); + addReplyString(c,s,len); + addReplyString(c,"\r\n",2); +} + +void addReplyStatus(redisClient *c, char *status) { + _addReplyStatus(c,status,strlen(status)); +} + +void addReplyStatusFormat(redisClient *c, const char *fmt, ...) { + va_list ap; + va_start(ap,fmt); + sds s = sdscatvprintf(sdsempty(),fmt,ap); + va_end(ap); + _addReplyStatus(c,s,sdslen(s)); + sdsfree(s); } /* Adds an empty object to the reply list that will contain the multi bulk * length, which is not known when this function is called. */ void *addDeferredMultiBulkLength(redisClient *c) { - if (_ensureFileEvent(c) != REDIS_OK) return NULL; - _addReplyObjectToList(c,createObject(REDIS_STRING,NULL)); + /* Note that we install the write event here even if the object is not + * ready to be sent, since we are sure that before returning to the + * event loop setDeferredMultiBulkLength() will be called. */ + if (_installWriteEvent(c) != REDIS_OK) return NULL; + listAddNodeTail(c->reply,createObject(REDIS_STRING,NULL)); return listLast(c->reply); } @@ -165,8 +263,9 @@ void setDeferredMultiBulkLength(redisClient *c, void *node, long length) { len->ptr = sdscatprintf(sdsempty(),"*%ld\r\n",length); if (ln->next != NULL) { next = listNodeValue(ln->next); - /* Only glue when the next node is a reply chunk. */ - if (next->type == REDIS_REPLY_NODE) { + + /* Only glue when the next node is non-NULL (an sds in this case) */ + if (next->ptr != NULL) { len->ptr = sdscatlen(len->ptr,next->ptr,sdslen(next->ptr)); listDelNode(c->reply,ln->next); } @@ -195,8 +294,8 @@ void addReplyLongLong(redisClient *c, long long ll) { _addReplyLongLong(c,ll,':'); } -void addReplyUlong(redisClient *c, unsigned long ul) { - _addReplyLongLong(c,(long long)ul,':'); +void addReplyMultiBulkLen(redisClient *c, long length) { + _addReplyLongLong(c,length,'*'); } void addReplyBulkLen(redisClient *c, robj *obj) { @@ -533,6 +632,7 @@ void resetClient(redisClient *c) { freeClientArgv(c); c->bulklen = -1; c->multibulk = 0; + c->newline = NULL; } void closeTimedoutClients(void) { @@ -564,6 +664,8 @@ void closeTimedoutClients(void) { } void processInputBuffer(redisClient *c) { + int seeknewline = 0; + again: /* Before to process the input buffer, make sure the client is not * waitig for a blocking operation such as BLPOP. Note that the first @@ -572,15 +674,19 @@ again: * in the input buffer the client may be blocked, and the "goto again" * will try to reiterate. The following line will make it return asap. */ if (c->flags & REDIS_BLOCKED || c->flags & REDIS_IO_WAIT) return; + + if (seeknewline && c->bulklen == -1) c->newline = strchr(c->querybuf,'\n'); + seeknewline = 1; if (c->bulklen == -1) { /* Read the first line of the query */ - char *p = strchr(c->querybuf,'\n'); size_t querylen; - if (p) { + if (c->newline) { + char *p = c->newline; sds query, *argv; int argc, j; + c->newline = NULL; query = c->querybuf; c->querybuf = sdsempty(); querylen = 1+(p-(query)); @@ -667,8 +773,14 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { return; } if (nread) { + size_t oldlen = sdslen(c->querybuf); c->querybuf = sdscatlen(c->querybuf, buf, nread); c->lastinteraction = time(NULL); + /* Scan this new piece of the query for the newline. We do this + * here in order to make sure we perform this scan just one time + * per piece of buffer, leading to an O(N) scan instead of O(N*N) */ + if (c->bulklen == -1 && c->newline == NULL) + c->newline = strchr(c->querybuf+oldlen,'\n'); } else { return; }