From d6704c9bd0f03f277ee23a4e3e1fc86a74e130b3 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 21 Aug 2012 17:27:01 +0200 Subject: [PATCH] hiredis library updated. This version of hiredis merges modifications of the Redis fork with latest changes in the hiredis repository. The same version was pushed on the hiredis repository and will probably merged into the master branch in short time. --- deps/hiredis/README.md | 29 ++++++++++++++++++++++++++++- deps/hiredis/async.c | 40 +++++++++++++++++++++++++++++----------- deps/hiredis/hiredis.c | 7 +++---- deps/hiredis/hiredis.h | 6 ++++++ deps/hiredis/net.c | 36 ++++++++++++++++++++++++------------ 5 files changed, 90 insertions(+), 28 deletions(-) diff --git a/deps/hiredis/README.md b/deps/hiredis/README.md index a58101cc..62fe1067 100644 --- a/deps/hiredis/README.md +++ b/deps/hiredis/README.md @@ -73,7 +73,7 @@ convert it to the protocol used to communicate with Redis. One or more spaces separates arguments, so you can use the specifiers anywhere in an argument: - reply = redisCommand("SET key:%s %s", myid, value); + reply = redisCommand(context, "SET key:%s %s", myid, value); ### Using replies @@ -320,6 +320,10 @@ The reply parsing API consists of the following functions: int redisReaderFeed(redisReader *reader, const char *buf, size_t len); int redisReaderGetReply(redisReader *reader, void **reply); +The same set of functions are used internally by hiredis when creating a +normal Redis context, the above API just exposes it to the user for a direct +usage. + ### Usage The function `redisReaderCreate` creates a `redisReader` structure that holds a @@ -346,6 +350,29 @@ immediately after creating the `redisReader`. For example, [hiredis-rb](https://github.com/pietern/hiredis-rb/blob/master/ext/hiredis_ext/reader.c) uses customized reply object functions to create Ruby objects. +### Reader max buffer + +Both when using the Reader API directly or when using it indirectly via a +normal Redis context, the redisReader structure uses a buffer in order to +accumulate data from the server. +Usually this buffer is destroyed when it is empty and is larger than 16 +kb in order to avoid wasting memory in unused buffers + +However when working with very big payloads destroying the buffer may slow +down performances considerably, so it is possible to modify the max size of +an idle buffer changing the value of the `maxbuf` field of the reader structure +to the desired value. The special value of 0 means that there is no maximum +value for an idle buffer, so the buffer will never get freed. + +For instance if you have a normal Redis context you can set the maximum idle +buffer to zero (unlimited) just with: + + context->reader->maxbuf = 0; + +This should be done only in order to maximize performances when working with +large payloads. The context should be set back to `REDIS_READER_MAX_BUF` again +as soon as possible in order to prevent allocation of useless memory. + ## AUTHORS Hiredis was written by Salvatore Sanfilippo (antirez at gmail) and diff --git a/deps/hiredis/async.c b/deps/hiredis/async.c index f83e2f51..f65f8694 100644 --- a/deps/hiredis/async.c +++ b/deps/hiredis/async.c @@ -372,6 +372,11 @@ void redisProcessCallbacks(redisAsyncContext *ac) { __redisAsyncDisconnect(ac); return; } + + /* If monitor mode, repush callback */ + if(c->flags & REDIS_MONITORING) { + __redisPushCallback(&ac->replies,&cb); + } /* When the connection is not being disconnected, simply stop * trying to get replies and wait for the next loop tick. */ @@ -381,22 +386,31 @@ void redisProcessCallbacks(redisAsyncContext *ac) { /* Even if the context is subscribed, pending regular callbacks will * get a reply before pub/sub messages arrive. */ if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) { - /* A spontaneous reply in a not-subscribed context can only be the - * error reply that is sent when a new connection exceeds the - * maximum number of allowed connections on the server side. This - * is seen as an error instead of a regular reply because the - * server closes the connection after sending it. To prevent the - * error from being overwritten by an EOF error the connection is - * closed here. See issue #43. */ - if ( !(c->flags & REDIS_SUBSCRIBED) && ((redisReply*)reply)->type == REDIS_REPLY_ERROR ) { + /* + * A spontaneous reply in a not-subscribed context can be the error + * reply that is sent when a new connection exceeds the maximum + * number of allowed connections on the server side. + * + * This is seen as an error instead of a regular reply because the + * server closes the connection after sending it. + * + * To prevent the error from being overwritten by an EOF error the + * connection is closed here. See issue #43. + * + * Another possibility is that the server is loading its dataset. + * In this case we also want to close the connection, and have the + * user wait until the server is ready to take our request. + */ + if (((redisReply*)reply)->type == REDIS_REPLY_ERROR) { c->err = REDIS_ERR_OTHER; snprintf(c->errstr,sizeof(c->errstr),"%s",((redisReply*)reply)->str); __redisAsyncDisconnect(ac); return; } - /* No more regular callbacks and no errors, the context *must* be subscribed. */ - assert(c->flags & REDIS_SUBSCRIBED); - __redisGetSubscribeCallback(ac,reply,&cb); + /* No more regular callbacks and no errors, the context *must* be subscribed or monitoring. */ + assert((c->flags & REDIS_SUBSCRIBED || c->flags & REDIS_MONITORING)); + if(c->flags & REDIS_SUBSCRIBED) + __redisGetSubscribeCallback(ac,reply,&cb); } if (cb.fn != NULL) { @@ -557,6 +571,10 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void /* (P)UNSUBSCRIBE does not have its own response: every channel or * pattern that is unsubscribed will receive a message. This means we * should not append a callback function for this command. */ + } else if(strncasecmp(cstr,"monitor\r\n",9) == 0) { + /* Set monitor flag and push callback */ + c->flags |= REDIS_MONITORING; + __redisPushCallback(&ac->replies,&cb); } else { if (c->flags & REDIS_SUBSCRIBED) /* This will likely result in an error reply, but it needs to be diff --git a/deps/hiredis/hiredis.c b/deps/hiredis/hiredis.c index e6109db8..4709ee32 100644 --- a/deps/hiredis/hiredis.c +++ b/deps/hiredis/hiredis.c @@ -446,7 +446,7 @@ static int processMultiBulkItem(redisReader *r) { long elements; int root = 0; - /* Set error for nested multi bulks with depth > 2 */ + /* Set error for nested multi bulks with depth > 7 */ if (r->ridx == 8) { __redisReaderSetError(r,REDIS_ERR_PROTOCOL, "No support for nested multi bulk replies with depth > 7"); @@ -564,6 +564,7 @@ redisReader *redisReaderCreate(void) { r->errstr[0] = '\0'; r->fn = &defaultFunctions; r->buf = sdsempty(); + r->maxbuf = REDIS_READER_MAX_BUF; if (r->buf == NULL) { free(r); return NULL; @@ -590,9 +591,8 @@ int redisReaderFeed(redisReader *r, const char *buf, size_t len) { /* Copy the provided buffer. */ if (buf != NULL && len >= 1) { -#if 0 /* Destroy internal buffer when it is empty and is quite large. */ - if (r->len == 0 && sdsavail(r->buf) > 16*1024) { + if (r->len == 0 && r->maxbuf != 0 && sdsavail(r->buf) > r->maxbuf) { sdsfree(r->buf); r->buf = sdsempty(); r->pos = 0; @@ -600,7 +600,6 @@ int redisReaderFeed(redisReader *r, const char *buf, size_t len) { /* r->buf should not be NULL since we just free'd a larger one. */ assert(r->buf != NULL); } -#endif newbuf = sdscatlen(r->buf,buf,len); if (newbuf == NULL) { diff --git a/deps/hiredis/hiredis.h b/deps/hiredis/hiredis.h index a73f50e9..b922831e 100644 --- a/deps/hiredis/hiredis.h +++ b/deps/hiredis/hiredis.h @@ -76,6 +76,9 @@ /* Flag that is set when the async context has one or more subscriptions. */ #define REDIS_SUBSCRIBED 0x20 +/* Flag that is set when monitor mode is active */ +#define REDIS_MONITORING 0x40 + #define REDIS_REPLY_STRING 1 #define REDIS_REPLY_ARRAY 2 #define REDIS_REPLY_INTEGER 3 @@ -83,6 +86,8 @@ #define REDIS_REPLY_STATUS 5 #define REDIS_REPLY_ERROR 6 +#define REDIS_READER_MAX_BUF (1024*16) /* Default max unused reader buffer. */ + #ifdef __cplusplus extern "C" { #endif @@ -122,6 +127,7 @@ typedef struct redisReader { char *buf; /* Read buffer */ size_t pos; /* Buffer cursor */ size_t len; /* Buffer length */ + size_t maxbuf; /* Max length of unused buffer */ redisReadTask rstack[9]; int ridx; /* Index of current read task */ diff --git a/deps/hiredis/net.c b/deps/hiredis/net.c index 158e1dd8..82ab2b46 100644 --- a/deps/hiredis/net.c +++ b/deps/hiredis/net.c @@ -45,6 +45,8 @@ #include #include #include +#include +#include #include "net.h" #include "sds.h" @@ -121,28 +123,38 @@ static int redisSetTcpNoDelay(redisContext *c, int fd) { return REDIS_OK; } +#define __MAX_MSEC (((LONG_MAX) - 999) / 1000) + static int redisContextWaitReady(redisContext *c, int fd, const struct timeval *timeout) { - struct timeval to; - struct timeval *toptr = NULL; - fd_set wfd; + struct pollfd wfd[1]; + long msec; + + msec = -1; + wfd[0].fd = fd; + wfd[0].events = POLLOUT; /* Only use timeout when not NULL. */ if (timeout != NULL) { - to = *timeout; - toptr = &to; + if (timeout->tv_usec > 1000000 || timeout->tv_sec > __MAX_MSEC) { + close(fd); + return REDIS_ERR; + } + + msec = (timeout->tv_sec * 1000) + ((timeout->tv_usec + 999) / 1000); + + if (msec < 0 || msec > INT_MAX) { + msec = INT_MAX; + } } if (errno == EINPROGRESS) { - FD_ZERO(&wfd); - FD_SET(fd, &wfd); + int res; - if (select(FD_SETSIZE, NULL, &wfd, NULL, toptr) == -1) { - __redisSetErrorFromErrno(c,REDIS_ERR_IO,"select(2)"); + if ((res = poll(wfd, 1, msec)) == -1) { + __redisSetErrorFromErrno(c, REDIS_ERR_IO, "poll(2)"); close(fd); return REDIS_ERR; - } - - if (!FD_ISSET(fd, &wfd)) { + } else if (res == 0) { errno = ETIMEDOUT; __redisSetErrorFromErrno(c,REDIS_ERR_IO,NULL); close(fd); -- 2.45.2