]> git.saurik.com Git - redis.git/commitdiff
Merge pull request #628 from pietern/unstable-zip
authorSalvatore Sanfilippo <antirez@gmail.com>
Wed, 22 Aug 2012 09:32:27 +0000 (02:32 -0700)
committerSalvatore Sanfilippo <antirez@gmail.com>
Wed, 22 Aug 2012 09:32:27 +0000 (02:32 -0700)
Fix ziplist edge case

deps/hiredis/README.md
deps/hiredis/async.c
deps/hiredis/hiredis.c
deps/hiredis/hiredis.h
deps/hiredis/net.c
src/redis-benchmark.c

index a58101cc6e503bc4e14d44fc6b7e40408c641113..62fe1067b7cf2f20fba08ae5d98edb6e75bc6cdf 100644 (file)
@@ -73,7 +73,7 @@ convert it to the protocol used to communicate with Redis.
 One or more spaces separates arguments, so you can use the specifiers
 anywhere in an argument:
 
-    reply = redisCommand("SET key:%s %s", myid, value);
+    reply = redisCommand(context, "SET key:%s %s", myid, value);
 
 ### Using replies
 
@@ -320,6 +320,10 @@ The reply parsing API consists of the following functions:
     int redisReaderFeed(redisReader *reader, const char *buf, size_t len);
     int redisReaderGetReply(redisReader *reader, void **reply);
 
+The same set of functions are used internally by hiredis when creating a
+normal Redis context, the above API just exposes it to the user for a direct
+usage.
+
 ### Usage
 
 The function `redisReaderCreate` creates a `redisReader` structure that holds a
@@ -346,6 +350,29 @@ immediately after creating the `redisReader`.
 For example, [hiredis-rb](https://github.com/pietern/hiredis-rb/blob/master/ext/hiredis_ext/reader.c)
 uses customized reply object functions to create Ruby objects.
 
+### Reader max buffer
+
+Both when using the Reader API directly or when using it indirectly via a
+normal Redis context, the redisReader structure uses a buffer in order to
+accumulate data from the server.
+Usually this buffer is destroyed when it is empty and is larger than 16
+kb in order to avoid wasting memory in unused buffers
+
+However when working with very big payloads destroying the buffer may slow
+down performances considerably, so it is possible to modify the max size of
+an idle buffer changing the value of the `maxbuf` field of the reader structure
+to the desired value. The special value of 0 means that there is no maximum
+value for an idle buffer, so the buffer will never get freed.
+
+For instance if you have a normal Redis context you can set the maximum idle
+buffer to zero (unlimited) just with:
+
+    context->reader->maxbuf = 0;
+
+This should be done only in order to maximize performances when working with
+large payloads. The context should be set back to `REDIS_READER_MAX_BUF` again
+as soon as possible in order to prevent allocation of useless memory.
+
 ## AUTHORS
 
 Hiredis was written by Salvatore Sanfilippo (antirez at gmail) and
index f83e2f51af03c77e5864e88a304339d244d02650..f65f8694ccfabd440e15393946deef1bbac27144 100644 (file)
@@ -372,6 +372,11 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
                 __redisAsyncDisconnect(ac);
                 return;
             }
+            
+            /* If monitor mode, repush callback */
+            if(c->flags & REDIS_MONITORING) {
+                __redisPushCallback(&ac->replies,&cb);
+            }
 
             /* When the connection is not being disconnected, simply stop
              * trying to get replies and wait for the next loop tick. */
@@ -381,22 +386,31 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
         /* Even if the context is subscribed, pending regular callbacks will
          * get a reply before pub/sub messages arrive. */
         if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) {
-            /* A spontaneous reply in a not-subscribed context can only be the
-             * error reply that is sent when a new connection exceeds the
-             * maximum number of allowed connections on the server side. This
-             * is seen as an error instead of a regular reply because the
-             * server closes the connection after sending it. To prevent the
-             * error from being overwritten by an EOF error the connection is
-             * closed here. See issue #43. */
-            if ( !(c->flags & REDIS_SUBSCRIBED) && ((redisReply*)reply)->type == REDIS_REPLY_ERROR ) {
+            /*
+             * A spontaneous reply in a not-subscribed context can be the error
+             * reply that is sent when a new connection exceeds the maximum
+             * number of allowed connections on the server side.
+             *
+             * This is seen as an error instead of a regular reply because the
+             * server closes the connection after sending it.
+             *
+             * To prevent the error from being overwritten by an EOF error the
+             * connection is closed here. See issue #43.
+             *
+             * Another possibility is that the server is loading its dataset.
+             * In this case we also want to close the connection, and have the
+             * user wait until the server is ready to take our request.
+             */
+            if (((redisReply*)reply)->type == REDIS_REPLY_ERROR) {
                 c->err = REDIS_ERR_OTHER;
                 snprintf(c->errstr,sizeof(c->errstr),"%s",((redisReply*)reply)->str);
                 __redisAsyncDisconnect(ac);
                 return;
             }
-            /* No more regular callbacks and no errors, the context *must* be subscribed. */
-            assert(c->flags & REDIS_SUBSCRIBED);
-            __redisGetSubscribeCallback(ac,reply,&cb);
+            /* No more regular callbacks and no errors, the context *must* be subscribed or monitoring. */
+            assert((c->flags & REDIS_SUBSCRIBED || c->flags & REDIS_MONITORING));
+            if(c->flags & REDIS_SUBSCRIBED)
+                __redisGetSubscribeCallback(ac,reply,&cb);
         }
 
         if (cb.fn != NULL) {
@@ -557,6 +571,10 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
         /* (P)UNSUBSCRIBE does not have its own response: every channel or
          * pattern that is unsubscribed will receive a message. This means we
          * should not append a callback function for this command. */
+     } else if(strncasecmp(cstr,"monitor\r\n",9) == 0) {
+         /* Set monitor flag and push callback */
+         c->flags |= REDIS_MONITORING;
+         __redisPushCallback(&ac->replies,&cb);
     } else {
         if (c->flags & REDIS_SUBSCRIBED)
             /* This will likely result in an error reply, but it needs to be
index e6109db847c494086691614cc555038888ab6bf5..4709ee325309d39ee68a41159548ba7441359171 100644 (file)
@@ -446,7 +446,7 @@ static int processMultiBulkItem(redisReader *r) {
     long elements;
     int root = 0;
 
-    /* Set error for nested multi bulks with depth > 2 */
+    /* Set error for nested multi bulks with depth > 7 */
     if (r->ridx == 8) {
         __redisReaderSetError(r,REDIS_ERR_PROTOCOL,
             "No support for nested multi bulk replies with depth > 7");
@@ -564,6 +564,7 @@ redisReader *redisReaderCreate(void) {
     r->errstr[0] = '\0';
     r->fn = &defaultFunctions;
     r->buf = sdsempty();
+    r->maxbuf = REDIS_READER_MAX_BUF;
     if (r->buf == NULL) {
         free(r);
         return NULL;
@@ -590,9 +591,8 @@ int redisReaderFeed(redisReader *r, const char *buf, size_t len) {
 
     /* Copy the provided buffer. */
     if (buf != NULL && len >= 1) {
-#if 0
         /* Destroy internal buffer when it is empty and is quite large. */
-        if (r->len == 0 && sdsavail(r->buf) > 16*1024) {
+        if (r->len == 0 && r->maxbuf != 0 && sdsavail(r->buf) > r->maxbuf) {
             sdsfree(r->buf);
             r->buf = sdsempty();
             r->pos = 0;
@@ -600,7 +600,6 @@ int redisReaderFeed(redisReader *r, const char *buf, size_t len) {
             /* r->buf should not be NULL since we just free'd a larger one. */
             assert(r->buf != NULL);
         }
-#endif
 
         newbuf = sdscatlen(r->buf,buf,len);
         if (newbuf == NULL) {
index a73f50e957b916883e087b6067ee8b79cc1eec3c..b922831e3e23232d28c79419c28fbdaeb4a2eff5 100644 (file)
@@ -76,6 +76,9 @@
 /* Flag that is set when the async context has one or more subscriptions. */
 #define REDIS_SUBSCRIBED 0x20
 
+/* Flag that is set when monitor mode is active */
+#define REDIS_MONITORING 0x40
+
 #define REDIS_REPLY_STRING 1
 #define REDIS_REPLY_ARRAY 2
 #define REDIS_REPLY_INTEGER 3
@@ -83,6 +86,8 @@
 #define REDIS_REPLY_STATUS 5
 #define REDIS_REPLY_ERROR 6
 
+#define REDIS_READER_MAX_BUF (1024*16)  /* Default max unused reader buffer. */
+
 #ifdef __cplusplus
 extern "C" {
 #endif
@@ -122,6 +127,7 @@ typedef struct redisReader {
     char *buf; /* Read buffer */
     size_t pos; /* Buffer cursor */
     size_t len; /* Buffer length */
+    size_t maxbuf; /* Max length of unused buffer */
 
     redisReadTask rstack[9];
     int ridx; /* Index of current read task */
index 158e1dd8af8ce37885dc12699e1bd12c0fe1f588..82ab2b468c8ebfdd8e6ca2fb62bd65a9d56892c7 100644 (file)
@@ -45,6 +45,8 @@
 #include <errno.h>
 #include <stdarg.h>
 #include <stdio.h>
+#include <poll.h>
+#include <limits.h>
 
 #include "net.h"
 #include "sds.h"
@@ -121,28 +123,38 @@ static int redisSetTcpNoDelay(redisContext *c, int fd) {
     return REDIS_OK;
 }
 
+#define __MAX_MSEC (((LONG_MAX) - 999) / 1000)
+
 static int redisContextWaitReady(redisContext *c, int fd, const struct timeval *timeout) {
-    struct timeval to;
-    struct timeval *toptr = NULL;
-    fd_set wfd;
+    struct pollfd   wfd[1];
+    long msec;
+
+    msec          = -1;
+    wfd[0].fd     = fd;
+    wfd[0].events = POLLOUT;
 
     /* Only use timeout when not NULL. */
     if (timeout != NULL) {
-        to = *timeout;
-        toptr = &to;
+        if (timeout->tv_usec > 1000000 || timeout->tv_sec > __MAX_MSEC) {
+            close(fd);
+            return REDIS_ERR;
+        }
+
+        msec = (timeout->tv_sec * 1000) + ((timeout->tv_usec + 999) / 1000);
+
+        if (msec < 0 || msec > INT_MAX) {
+            msec = INT_MAX;
+        }
     }
 
     if (errno == EINPROGRESS) {
-        FD_ZERO(&wfd);
-        FD_SET(fd, &wfd);
+        int res;
 
-        if (select(FD_SETSIZE, NULL, &wfd, NULL, toptr) == -1) {
-            __redisSetErrorFromErrno(c,REDIS_ERR_IO,"select(2)");
+        if ((res = poll(wfd, 1, msec)) == -1) {
+            __redisSetErrorFromErrno(c, REDIS_ERR_IO, "poll(2)");
             close(fd);
             return REDIS_ERR;
-        }
-
-        if (!FD_ISSET(fd, &wfd)) {
+        } else if (res == 0) {
             errno = ETIMEDOUT;
             __redisSetErrorFromErrno(c,REDIS_ERR_IO,NULL);
             close(fd);
index 19eb49152c749a00774dda1a3a4d9d5681024865..1be4c07d913dcee9937b90c2f1457e6d32d8f966 100644 (file)
@@ -263,6 +263,8 @@ static client createClient(char *cmd, size_t len) {
             fprintf(stderr,"%s: %s\n",config.hostsocket,c->context->errstr);
         exit(1);
     }
+    /* Suppress hiredis cleanup of unused buffers for max speed. */
+    c->context->reader->maxbuf = 0;
     /* Queue N requests accordingly to the pipeline size. */
     c->obuf = sdsempty();
     for (j = 0; j < config.pipeline; j++)