One or more spaces separates arguments, so you can use the specifiers
anywhere in an argument:
- reply = redisCommand("SET key:%s %s", myid, value);
+ reply = redisCommand(context, "SET key:%s %s", myid, value);
### Using replies
int redisReaderFeed(redisReader *reader, const char *buf, size_t len);
int redisReaderGetReply(redisReader *reader, void **reply);
+The same set of functions is used internally by hiredis when creating a
+normal Redis context; the above API just exposes it to the user for direct
+use.
+
### Usage
The function `redisReaderCreate` creates a `redisReader` structure that holds a
For example, [hiredis-rb](https://github.com/pietern/hiredis-rb/blob/master/ext/hiredis_ext/reader.c)
uses customized reply object functions to create Ruby objects.
+### Reader max buffer
+
+Whether the Reader API is used directly or indirectly via a normal Redis
+context, the redisReader structure uses a buffer to accumulate data from
+the server.
+Usually this buffer is destroyed when it is empty and larger than 16
+KiB, in order to avoid wasting memory on unused buffers.
+
+However, when working with very big payloads, destroying the buffer may slow
+down performance considerably, so it is possible to modify the max size of
+an idle buffer by changing the value of the `maxbuf` field of the reader
+structure to the desired value. The special value of 0 means that there is no
+maximum value for an idle buffer, so the buffer will never get freed.
+
+For instance, if you have a normal Redis context, you can set the maximum idle
+buffer to zero (unlimited) with just:
+
+ context->reader->maxbuf = 0;
+
+This should be done only in order to maximize performance when working with
+large payloads. The `maxbuf` field should be set back to `REDIS_READER_MAX_BUF`
+as soon as possible in order to prevent allocation of useless memory.
+
## AUTHORS
Hiredis was written by Salvatore Sanfilippo (antirez at gmail) and
__redisAsyncDisconnect(ac);
return;
}
+
+ /* If monitor mode, repush callback */
+ if(c->flags & REDIS_MONITORING) {
+ __redisPushCallback(&ac->replies,&cb);
+ }
/* When the connection is not being disconnected, simply stop
* trying to get replies and wait for the next loop tick. */
/* Even if the context is subscribed, pending regular callbacks will
* get a reply before pub/sub messages arrive. */
if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) {
- /* A spontaneous reply in a not-subscribed context can only be the
- * error reply that is sent when a new connection exceeds the
- * maximum number of allowed connections on the server side. This
- * is seen as an error instead of a regular reply because the
- * server closes the connection after sending it. To prevent the
- * error from being overwritten by an EOF error the connection is
- * closed here. See issue #43. */
- if ( !(c->flags & REDIS_SUBSCRIBED) && ((redisReply*)reply)->type == REDIS_REPLY_ERROR ) {
+ /*
+ * A spontaneous reply in a not-subscribed context can be the error
+ * reply that is sent when a new connection exceeds the maximum
+ * number of allowed connections on the server side.
+ *
+ * This is seen as an error instead of a regular reply because the
+ * server closes the connection after sending it.
+ *
+ * To prevent the error from being overwritten by an EOF error the
+ * connection is closed here. See issue #43.
+ *
+ * Another possibility is that the server is loading its dataset.
+ * In this case we also want to close the connection, and have the
+ * user wait until the server is ready to take our request.
+ */
+ if (((redisReply*)reply)->type == REDIS_REPLY_ERROR) {
c->err = REDIS_ERR_OTHER;
snprintf(c->errstr,sizeof(c->errstr),"%s",((redisReply*)reply)->str);
__redisAsyncDisconnect(ac);
return;
}
- /* No more regular callbacks and no errors, the context *must* be subscribed. */
- assert(c->flags & REDIS_SUBSCRIBED);
- __redisGetSubscribeCallback(ac,reply,&cb);
+ /* No more regular callbacks and no errors, the context *must* be subscribed or monitoring. */
+ assert((c->flags & REDIS_SUBSCRIBED || c->flags & REDIS_MONITORING));
+ if(c->flags & REDIS_SUBSCRIBED)
+ __redisGetSubscribeCallback(ac,reply,&cb);
}
if (cb.fn != NULL) {
/* (P)UNSUBSCRIBE does not have its own response: every channel or
* pattern that is unsubscribed will receive a message. This means we
* should not append a callback function for this command. */
+ } else if(strncasecmp(cstr,"monitor\r\n",9) == 0) {
+ /* Set monitor flag and push callback */
+ c->flags |= REDIS_MONITORING;
+ __redisPushCallback(&ac->replies,&cb);
} else {
if (c->flags & REDIS_SUBSCRIBED)
/* This will likely result in an error reply, but it needs to be
long elements;
int root = 0;
- /* Set error for nested multi bulks with depth > 2 */
+ /* Set error for nested multi bulks with depth > 7 */
if (r->ridx == 8) {
__redisReaderSetError(r,REDIS_ERR_PROTOCOL,
"No support for nested multi bulk replies with depth > 7");
r->errstr[0] = '\0';
r->fn = &defaultFunctions;
r->buf = sdsempty();
+ r->maxbuf = REDIS_READER_MAX_BUF;
if (r->buf == NULL) {
free(r);
return NULL;
/* Copy the provided buffer. */
if (buf != NULL && len >= 1) {
-#if 0
/* Destroy internal buffer when it is empty and is quite large. */
- if (r->len == 0 && sdsavail(r->buf) > 16*1024) {
+ if (r->len == 0 && r->maxbuf != 0 && sdsavail(r->buf) > r->maxbuf) {
sdsfree(r->buf);
r->buf = sdsempty();
r->pos = 0;
/* r->buf should not be NULL since we just free'd a larger one. */
assert(r->buf != NULL);
}
-#endif
newbuf = sdscatlen(r->buf,buf,len);
if (newbuf == NULL) {
/* Flag that is set when the async context has one or more subscriptions. */
#define REDIS_SUBSCRIBED 0x20
+/* Flag that is set when monitor mode is active */
+#define REDIS_MONITORING 0x40
+
#define REDIS_REPLY_STRING 1
#define REDIS_REPLY_ARRAY 2
#define REDIS_REPLY_INTEGER 3
#define REDIS_REPLY_STATUS 5
#define REDIS_REPLY_ERROR 6
+#define REDIS_READER_MAX_BUF (1024*16) /* Default max unused reader buffer. */
+
#ifdef __cplusplus
extern "C" {
#endif
char *buf; /* Read buffer */
size_t pos; /* Buffer cursor */
size_t len; /* Buffer length */
+ size_t maxbuf; /* Max length of unused buffer */
redisReadTask rstack[9];
int ridx; /* Index of current read task */
#include <errno.h>
#include <stdarg.h>
#include <stdio.h>
+#include <poll.h>
+#include <limits.h>
#include "net.h"
#include "sds.h"
return REDIS_OK;
}
+#define __MAX_MSEC (((LONG_MAX) - 999) / 1000)
+
static int redisContextWaitReady(redisContext *c, int fd, const struct timeval *timeout) {
- struct timeval to;
- struct timeval *toptr = NULL;
- fd_set wfd;
+ struct pollfd wfd[1];
+ long msec;
+
+ msec = -1;
+ wfd[0].fd = fd;
+ wfd[0].events = POLLOUT;
/* Only use timeout when not NULL. */
if (timeout != NULL) {
- to = *timeout;
- toptr = &to;
+ if (timeout->tv_usec > 1000000 || timeout->tv_sec > __MAX_MSEC) {
+ close(fd);
+ return REDIS_ERR;
+ }
+
+ msec = (timeout->tv_sec * 1000) + ((timeout->tv_usec + 999) / 1000);
+
+ if (msec < 0 || msec > INT_MAX) {
+ msec = INT_MAX;
+ }
}
if (errno == EINPROGRESS) {
- FD_ZERO(&wfd);
- FD_SET(fd, &wfd);
+ int res;
- if (select(FD_SETSIZE, NULL, &wfd, NULL, toptr) == -1) {
- __redisSetErrorFromErrno(c,REDIS_ERR_IO,"select(2)");
+ if ((res = poll(wfd, 1, msec)) == -1) {
+ __redisSetErrorFromErrno(c, REDIS_ERR_IO, "poll(2)");
close(fd);
return REDIS_ERR;
- }
-
- if (!FD_ISSET(fd, &wfd)) {
+ } else if (res == 0) {
errno = ETIMEDOUT;
__redisSetErrorFromErrno(c,REDIS_ERR_IO,NULL);
close(fd);