]> git.saurik.com Git - redis.git/commitdiff
hiredis library updated.
authorantirez <antirez@gmail.com>
Tue, 21 Aug 2012 15:27:01 +0000 (17:27 +0200)
committerantirez <antirez@gmail.com>
Wed, 22 Aug 2012 09:33:57 +0000 (11:33 +0200)
This version of hiredis merges modifications of the Redis fork with
latest changes in the hiredis repository.

The same version was pushed on the hiredis repository and will probably
merged into the master branch in short time.

deps/hiredis/README.md
deps/hiredis/async.c
deps/hiredis/hiredis.c
deps/hiredis/hiredis.h
deps/hiredis/net.c

index a58101cc6e503bc4e14d44fc6b7e40408c641113..62fe1067b7cf2f20fba08ae5d98edb6e75bc6cdf 100644 (file)
@@ -73,7 +73,7 @@ convert it to the protocol used to communicate with Redis.
 One or more spaces separates arguments, so you can use the specifiers
 anywhere in an argument:
 
-    reply = redisCommand("SET key:%s %s", myid, value);
+    reply = redisCommand(context, "SET key:%s %s", myid, value);
 
 ### Using replies
 
@@ -320,6 +320,10 @@ The reply parsing API consists of the following functions:
     int redisReaderFeed(redisReader *reader, const char *buf, size_t len);
     int redisReaderGetReply(redisReader *reader, void **reply);
 
+The same set of functions are used internally by hiredis when creating a
+normal Redis context, the above API just exposes it to the user for a direct
+usage.
+
 ### Usage
 
 The function `redisReaderCreate` creates a `redisReader` structure that holds a
@@ -346,6 +350,29 @@ immediately after creating the `redisReader`.
 For example, [hiredis-rb](https://github.com/pietern/hiredis-rb/blob/master/ext/hiredis_ext/reader.c)
 uses customized reply object functions to create Ruby objects.
 
+### Reader max buffer
+
+Both when using the Reader API directly or when using it indirectly via a
+normal Redis context, the redisReader structure uses a buffer in order to
+accumulate data from the server.
+Usually this buffer is destroyed when it is empty and is larger than 16
+kb in order to avoid wasting memory in unused buffers
+
+However when working with very big payloads destroying the buffer may slow
+down performances considerably, so it is possible to modify the max size of
+an idle buffer changing the value of the `maxbuf` field of the reader structure
+to the desired value. The special value of 0 means that there is no maximum
+value for an idle buffer, so the buffer will never get freed.
+
+For instance if you have a normal Redis context you can set the maximum idle
+buffer to zero (unlimited) just with:
+
+    context->reader->maxbuf = 0;
+
+This should be done only in order to maximize performances when working with
+large payloads. The context should be set back to `REDIS_READER_MAX_BUF` again
+as soon as possible in order to prevent allocation of useless memory.
+
 ## AUTHORS
 
 Hiredis was written by Salvatore Sanfilippo (antirez at gmail) and
index f83e2f51af03c77e5864e88a304339d244d02650..f65f8694ccfabd440e15393946deef1bbac27144 100644 (file)
@@ -372,6 +372,11 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
                 __redisAsyncDisconnect(ac);
                 return;
             }
+            
+            /* If monitor mode, repush callback */
+            if(c->flags & REDIS_MONITORING) {
+                __redisPushCallback(&ac->replies,&cb);
+            }
 
             /* When the connection is not being disconnected, simply stop
              * trying to get replies and wait for the next loop tick. */
@@ -381,22 +386,31 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
         /* Even if the context is subscribed, pending regular callbacks will
          * get a reply before pub/sub messages arrive. */
         if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) {
-            /* A spontaneous reply in a not-subscribed context can only be the
-             * error reply that is sent when a new connection exceeds the
-             * maximum number of allowed connections on the server side. This
-             * is seen as an error instead of a regular reply because the
-             * server closes the connection after sending it. To prevent the
-             * error from being overwritten by an EOF error the connection is
-             * closed here. See issue #43. */
-            if ( !(c->flags & REDIS_SUBSCRIBED) && ((redisReply*)reply)->type == REDIS_REPLY_ERROR ) {
+            /*
+             * A spontaneous reply in a not-subscribed context can be the error
+             * reply that is sent when a new connection exceeds the maximum
+             * number of allowed connections on the server side.
+             *
+             * This is seen as an error instead of a regular reply because the
+             * server closes the connection after sending it.
+             *
+             * To prevent the error from being overwritten by an EOF error the
+             * connection is closed here. See issue #43.
+             *
+             * Another possibility is that the server is loading its dataset.
+             * In this case we also want to close the connection, and have the
+             * user wait until the server is ready to take our request.
+             */
+            if (((redisReply*)reply)->type == REDIS_REPLY_ERROR) {
                 c->err = REDIS_ERR_OTHER;
                 snprintf(c->errstr,sizeof(c->errstr),"%s",((redisReply*)reply)->str);
                 __redisAsyncDisconnect(ac);
                 return;
             }
-            /* No more regular callbacks and no errors, the context *must* be subscribed. */
-            assert(c->flags & REDIS_SUBSCRIBED);
-            __redisGetSubscribeCallback(ac,reply,&cb);
+            /* No more regular callbacks and no errors, the context *must* be subscribed or monitoring. */
+            assert((c->flags & REDIS_SUBSCRIBED || c->flags & REDIS_MONITORING));
+            if(c->flags & REDIS_SUBSCRIBED)
+                __redisGetSubscribeCallback(ac,reply,&cb);
         }
 
         if (cb.fn != NULL) {
@@ -557,6 +571,10 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
         /* (P)UNSUBSCRIBE does not have its own response: every channel or
          * pattern that is unsubscribed will receive a message. This means we
          * should not append a callback function for this command. */
+     } else if(strncasecmp(cstr,"monitor\r\n",9) == 0) {
+         /* Set monitor flag and push callback */
+         c->flags |= REDIS_MONITORING;
+         __redisPushCallback(&ac->replies,&cb);
     } else {
         if (c->flags & REDIS_SUBSCRIBED)
             /* This will likely result in an error reply, but it needs to be
index e6109db847c494086691614cc555038888ab6bf5..4709ee325309d39ee68a41159548ba7441359171 100644 (file)
@@ -446,7 +446,7 @@ static int processMultiBulkItem(redisReader *r) {
     long elements;
     int root = 0;
 
-    /* Set error for nested multi bulks with depth > 2 */
+    /* Set error for nested multi bulks with depth > 7 */
     if (r->ridx == 8) {
         __redisReaderSetError(r,REDIS_ERR_PROTOCOL,
             "No support for nested multi bulk replies with depth > 7");
@@ -564,6 +564,7 @@ redisReader *redisReaderCreate(void) {
     r->errstr[0] = '\0';
     r->fn = &defaultFunctions;
     r->buf = sdsempty();
+    r->maxbuf = REDIS_READER_MAX_BUF;
     if (r->buf == NULL) {
         free(r);
         return NULL;
@@ -590,9 +591,8 @@ int redisReaderFeed(redisReader *r, const char *buf, size_t len) {
 
     /* Copy the provided buffer. */
     if (buf != NULL && len >= 1) {
-#if 0
         /* Destroy internal buffer when it is empty and is quite large. */
-        if (r->len == 0 && sdsavail(r->buf) > 16*1024) {
+        if (r->len == 0 && r->maxbuf != 0 && sdsavail(r->buf) > r->maxbuf) {
             sdsfree(r->buf);
             r->buf = sdsempty();
             r->pos = 0;
@@ -600,7 +600,6 @@ int redisReaderFeed(redisReader *r, const char *buf, size_t len) {
             /* r->buf should not be NULL since we just free'd a larger one. */
             assert(r->buf != NULL);
         }
-#endif
 
         newbuf = sdscatlen(r->buf,buf,len);
         if (newbuf == NULL) {
index a73f50e957b916883e087b6067ee8b79cc1eec3c..b922831e3e23232d28c79419c28fbdaeb4a2eff5 100644 (file)
@@ -76,6 +76,9 @@
 /* Flag that is set when the async context has one or more subscriptions. */
 #define REDIS_SUBSCRIBED 0x20
 
+/* Flag that is set when monitor mode is active */
+#define REDIS_MONITORING 0x40
+
 #define REDIS_REPLY_STRING 1
 #define REDIS_REPLY_ARRAY 2
 #define REDIS_REPLY_INTEGER 3
@@ -83,6 +86,8 @@
 #define REDIS_REPLY_STATUS 5
 #define REDIS_REPLY_ERROR 6
 
+#define REDIS_READER_MAX_BUF (1024*16)  /* Default max unused reader buffer. */
+
 #ifdef __cplusplus
 extern "C" {
 #endif
@@ -122,6 +127,7 @@ typedef struct redisReader {
     char *buf; /* Read buffer */
     size_t pos; /* Buffer cursor */
     size_t len; /* Buffer length */
+    size_t maxbuf; /* Max length of unused buffer */
 
     redisReadTask rstack[9];
     int ridx; /* Index of current read task */
index 158e1dd8af8ce37885dc12699e1bd12c0fe1f588..82ab2b468c8ebfdd8e6ca2fb62bd65a9d56892c7 100644 (file)
@@ -45,6 +45,8 @@
 #include <errno.h>
 #include <stdarg.h>
 #include <stdio.h>
+#include <poll.h>
+#include <limits.h>
 
 #include "net.h"
 #include "sds.h"
@@ -121,28 +123,38 @@ static int redisSetTcpNoDelay(redisContext *c, int fd) {
     return REDIS_OK;
 }
 
+#define __MAX_MSEC (((LONG_MAX) - 999) / 1000)
+
 static int redisContextWaitReady(redisContext *c, int fd, const struct timeval *timeout) {
-    struct timeval to;
-    struct timeval *toptr = NULL;
-    fd_set wfd;
+    struct pollfd   wfd[1];
+    long msec;
+
+    msec          = -1;
+    wfd[0].fd     = fd;
+    wfd[0].events = POLLOUT;
 
     /* Only use timeout when not NULL. */
     if (timeout != NULL) {
-        to = *timeout;
-        toptr = &to;
+        if (timeout->tv_usec > 1000000 || timeout->tv_sec > __MAX_MSEC) {
+            close(fd);
+            return REDIS_ERR;
+        }
+
+        msec = (timeout->tv_sec * 1000) + ((timeout->tv_usec + 999) / 1000);
+
+        if (msec < 0 || msec > INT_MAX) {
+            msec = INT_MAX;
+        }
     }
 
     if (errno == EINPROGRESS) {
-        FD_ZERO(&wfd);
-        FD_SET(fd, &wfd);
+        int res;
 
-        if (select(FD_SETSIZE, NULL, &wfd, NULL, toptr) == -1) {
-            __redisSetErrorFromErrno(c,REDIS_ERR_IO,"select(2)");
+        if ((res = poll(wfd, 1, msec)) == -1) {
+            __redisSetErrorFromErrno(c, REDIS_ERR_IO, "poll(2)");
             close(fd);
             return REDIS_ERR;
-        }
-
-        if (!FD_ISSET(fd, &wfd)) {
+        } else if (res == 0) {
             errno = ETIMEDOUT;
             __redisSetErrorFromErrno(c,REDIS_ERR_IO,NULL);
             close(fd);