From 9703b1b3dc59aa76a314373fff748e26bdd0466f Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Tue, 19 Apr 2011 23:07:36 +0200 Subject: [PATCH] Update hiredis --- deps/hiredis/Makefile | 11 +- deps/hiredis/README.md | 4 +- deps/hiredis/async.c | 334 +++++++++++++++++++++++++++++++++------- deps/hiredis/async.h | 34 +++-- deps/hiredis/dict.c | 338 +++++++++++++++++++++++++++++++++++++++++ deps/hiredis/dict.h | 126 +++++++++++++++ deps/hiredis/example.c | 3 +- deps/hiredis/fmacros.h | 9 +- deps/hiredis/hiredis.c | 123 +++++++++------ deps/hiredis/hiredis.h | 18 ++- deps/hiredis/net.c | 118 +++++++++++--- deps/hiredis/net.h | 7 +- deps/hiredis/sds.c | 145 ++++++++++++++++-- deps/hiredis/sds.h | 13 +- deps/hiredis/test.c | 83 ++++++++-- 15 files changed, 1187 insertions(+), 179 deletions(-) create mode 100644 deps/hiredis/dict.c create mode 100644 deps/hiredis/dict.h diff --git a/deps/hiredis/Makefile b/deps/hiredis/Makefile index ca3404a4..2a84b9b3 100644 --- a/deps/hiredis/Makefile +++ b/deps/hiredis/Makefile @@ -15,8 +15,9 @@ ifeq ($(uname_S),SunOS) DYLIB_MAKE_CMD?=$(CC) -G -o ${DYLIBNAME} ${OBJ} STLIBNAME?=libhiredis.a STLIB_MAKE_CMD?=ar rcs ${STLIBNAME} ${OBJ} -else ifeq ($(uname_S),Darwin) - CFLAGS?=-std=c99 -pedantic $(OPTIMIZATION) -fPIC -Wall -W -Wwrite-strings $(ARCH) $(PROF) +else +ifeq ($(uname_S),Darwin) + CFLAGS?=-std=c99 -pedantic $(OPTIMIZATION) -fPIC -Wall -W -Wstrict-prototypes -Wwrite-strings $(ARCH) $(PROF) CCLINK?=-lm -pthread LDFLAGS?=-L. -Wl,-rpath,. OBJARCH?=-arch i386 -arch x86_64 @@ -25,7 +26,7 @@ else ifeq ($(uname_S),Darwin) STLIBNAME?=libhiredis.a STLIB_MAKE_CMD?=libtool -static -o ${STLIBNAME} - ${OBJ} else - CFLAGS?=-std=c99 -pedantic $(OPTIMIZATION) -fPIC -Wall -W -Wwrite-strings $(ARCH) $(PROF) + CFLAGS?=-std=c99 -pedantic $(OPTIMIZATION) -fPIC -Wall -W -Wstrict-prototypes -Wwrite-strings $(ARCH) $(PROF) CCLINK?=-lm -pthread LDFLAGS?=-L. -Wl,-rpath,. DYLIBNAME?=libhiredis.so @@ -33,6 +34,8 @@ else STLIBNAME?=libhiredis.a STLIB_MAKE_CMD?=ar rcs ${STLIBNAME} ${OBJ} endif +endif + CCOPT= $(CFLAGS) $(CCLINK) DEBUG?= -g -ggdb @@ -45,7 +48,7 @@ all: ${DYLIBNAME} ${BINS} # Deps (use make dep to generate this) net.o: net.c fmacros.h net.h -async.o: async.c async.h hiredis.h sds.h util.h +async.o: async.c async.h hiredis.h sds.h util.h dict.c dict.h example.o: example.c hiredis.h hiredis.o: hiredis.c hiredis.h net.h sds.h util.h sds.o: sds.c sds.h diff --git a/deps/hiredis/README.md b/deps/hiredis/README.md index e39ff0c1..5a77cd38 100644 --- a/deps/hiredis/README.md +++ b/deps/hiredis/README.md @@ -108,7 +108,7 @@ was received: * **`REDIS_REPLY_ARRAY`**: * A multi bulk reply. The number of elements in the multi bulk reply is stored in `reply->elements`. Every element in the multi bulk reply is a `redisReply` object as well - and can be accessed via `reply->elements[..index..]`. + and can be accessed via `reply->element[..index..]`. Redis may reply with nested arrays but this is fully supported. Replies should be freed using the `freeReplyObject()` function. @@ -171,7 +171,7 @@ the latter means an error occurred while reading a reply. Just as with the other the `err` field in the context can be used to find out what the cause of this error is. The following examples shows a simple pipeline (resulting in only a single call to `write(2)` and -a single call to `write(2)`): +a single call to `read(2)`): redisReply *reply; redisAppendCommand(context,"SET foo bar"); diff --git a/deps/hiredis/async.c b/deps/hiredis/async.c index 5c11243e..76c4cc3a 100644 --- a/deps/hiredis/async.c +++ b/deps/hiredis/async.c @@ -30,14 +30,58 @@ */ #include +#include #include +#include #include "async.h" +#include "dict.c" #include "sds.h" #include "util.h" /* Forward declaration of function in hiredis.c */ void __redisAppendCommand(redisContext *c, char *cmd, size_t len); +/* Functions managing dictionary of callbacks for pub/sub. */ +static unsigned int callbackHash(const void *key) { + return dictGenHashFunction((unsigned char*)key,sdslen((char*)key)); +} + +static void *callbackValDup(void *privdata, const void *src) { + ((void) privdata); + redisCallback *dup = malloc(sizeof(*dup)); + memcpy(dup,src,sizeof(*dup)); + return dup; +} + +static int callbackKeyCompare(void *privdata, const void *key1, const void *key2) { + int l1, l2; + ((void) privdata); + + l1 = sdslen((sds)key1); + l2 = sdslen((sds)key2); + if (l1 != l2) return 0; + return memcmp(key1,key2,l1) == 0; +} + +static void callbackKeyDestructor(void *privdata, void *key) { + ((void) privdata); + sdsfree((sds)key); +} + +static void callbackValDestructor(void *privdata, void *val) { + ((void) privdata); + free(val); +} + +static dictType callbackDict = { + callbackHash, + NULL, + callbackValDup, + callbackKeyCompare, + callbackKeyDestructor, + callbackValDestructor +}; + static redisAsyncContext *redisAsyncInitialize(redisContext *c) { redisAsyncContext *ac = realloc(c,sizeof(redisAsyncContext)); c = &(ac->c); @@ -50,19 +94,23 @@ static redisAsyncContext *redisAsyncInitialize(redisContext *c) { ac->err = 0; ac->errstr = NULL; ac->data = NULL; - ac->_adapter_data = NULL; - ac->evAddRead = NULL; - ac->evDelRead = NULL; - ac->evAddWrite = NULL; - ac->evDelWrite = NULL; - ac->evCleanup = NULL; + ac->ev.data = NULL; + ac->ev.addRead = NULL; + ac->ev.delRead = NULL; + ac->ev.addWrite = NULL; + ac->ev.delWrite = NULL; + ac->ev.cleanup = NULL; ac->onConnect = NULL; ac->onDisconnect = NULL; ac->replies.head = NULL; ac->replies.tail = NULL; + ac->sub.invalid.head = NULL; + ac->sub.invalid.tail = NULL; + ac->sub.channels = dictCreate(&callbackDict,NULL); + ac->sub.patterns = dictCreate(&callbackDict,NULL); return ac; } @@ -96,6 +144,11 @@ int redisAsyncSetReplyObjectFunctions(redisAsyncContext *ac, redisReplyObjectFun int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn) { if (ac->onConnect == NULL) { ac->onConnect = fn; + + /* The common way to detect an established connection is to wait for + * the first write event to be fired. This assumes the related event + * library functions are already set. */ + if (ac->ev.addWrite) ac->ev.addWrite(ac->ev.data); return REDIS_OK; } return REDIS_ERR; @@ -114,11 +167,11 @@ static int __redisPushCallback(redisCallbackList *list, redisCallback *source) { redisCallback *cb; /* Copy callback from stack to heap */ - cb = calloc(1,sizeof(*cb)); + cb = malloc(sizeof(*cb)); if (!cb) redisOOM(); if (source != NULL) { - cb->fn = source->fn; - cb->privdata = source->privdata; + memcpy(cb,source,sizeof(*cb)); + cb->next = NULL; } /* Store callback in list */ @@ -146,51 +199,150 @@ static int __redisShiftCallback(redisCallbackList *list, redisCallback *target) return REDIS_ERR; } -/* Tries to do a clean disconnect from Redis, meaning it stops new commands - * from being issued, but tries to flush the output buffer and execute - * callbacks for all remaining replies. - * - * This functions is generally called from within a callback, so the - * processCallbacks function will pick up the flag when there are no - * more replies. */ -void redisAsyncDisconnect(redisAsyncContext *ac) { +static void __redisRunCallback(redisAsyncContext *ac, redisCallback *cb, redisReply *reply) { redisContext *c = &(ac->c); - c->flags |= REDIS_DISCONNECTING; + if (cb->fn != NULL) { + c->flags |= REDIS_IN_CALLBACK; + cb->fn(ac,reply,cb->privdata); + c->flags &= ~REDIS_IN_CALLBACK; + } +} + +/* Helper function to free the context. */ +static void __redisAsyncFree(redisAsyncContext *ac) { + redisContext *c = &(ac->c); + redisCallback cb; + dictIterator *it; + dictEntry *de; + + /* Execute pending callbacks with NULL reply. */ + while (__redisShiftCallback(&ac->replies,&cb) == REDIS_OK) + __redisRunCallback(ac,&cb,NULL); + + /* Execute callbacks for invalid commands */ + while (__redisShiftCallback(&ac->sub.invalid,&cb) == REDIS_OK) + __redisRunCallback(ac,&cb,NULL); + + /* Run subscription callbacks callbacks with NULL reply */ + it = dictGetIterator(ac->sub.channels); + while ((de = dictNext(it)) != NULL) + __redisRunCallback(ac,dictGetEntryVal(de),NULL); + dictReleaseIterator(it); + dictRelease(ac->sub.channels); + + it = dictGetIterator(ac->sub.patterns); + while ((de = dictNext(it)) != NULL) + __redisRunCallback(ac,dictGetEntryVal(de),NULL); + dictReleaseIterator(it); + dictRelease(ac->sub.patterns); + + /* Signal event lib to clean up */ + if (ac->ev.cleanup) ac->ev.cleanup(ac->ev.data); + + /* Execute disconnect callback. When redisAsyncFree() initiated destroying + * this context, the status will always be REDIS_OK. */ + if (ac->onDisconnect && (c->flags & REDIS_CONNECTED)) { + if (c->flags & REDIS_FREEING) { + ac->onDisconnect(ac,REDIS_OK); + } else { + ac->onDisconnect(ac,(ac->err == 0) ? REDIS_OK : REDIS_ERR); + } + } + + /* Cleanup self */ + redisFree(c); +} + +/* Free the async context. When this function is called from a callback, + * control needs to be returned to redisProcessCallbacks() before actual + * free'ing. To do so, a flag is set on the context which is picked up by + * redisProcessCallbacks(). Otherwise, the context is immediately free'd. */ +void redisAsyncFree(redisAsyncContext *ac) { + redisContext *c = &(ac->c); + c->flags |= REDIS_FREEING; + if (!(c->flags & REDIS_IN_CALLBACK)) + __redisAsyncFree(ac); } /* Helper function to make the disconnect happen and clean up. */ static void __redisAsyncDisconnect(redisAsyncContext *ac) { redisContext *c = &(ac->c); - redisCallback cb; - int status; /* Make sure error is accessible if there is any */ __redisAsyncCopyError(ac); - status = (ac->err == 0) ? REDIS_OK : REDIS_ERR; - if (status == REDIS_OK) { - /* When the connection is cleanly disconnected, there should not - * be pending callbacks. */ + if (ac->err == 0) { + /* For clean disconnects, there should be no pending callbacks. */ assert(__redisShiftCallback(&ac->replies,NULL) == REDIS_ERR); } else { - /* Callbacks should not be able to issue new commands. */ + /* Disconnection is caused by an error, make sure that pending + * callbacks cannot call new commands. */ c->flags |= REDIS_DISCONNECTING; - - /* Execute pending callbacks with NULL reply. */ - while (__redisShiftCallback(&ac->replies,&cb) == REDIS_OK) { - if (cb.fn != NULL) - cb.fn(ac,NULL,cb.privdata); - } } - /* Signal event lib to clean up */ - if (ac->evCleanup) ac->evCleanup(ac->_adapter_data); + /* For non-clean disconnects, __redisAsyncFree() will execute pending + * callbacks with a NULL-reply. */ + __redisAsyncFree(ac); +} - /* Execute callback with proper status */ - if (ac->onDisconnect) ac->onDisconnect(ac,status); +/* Tries to do a clean disconnect from Redis, meaning it stops new commands + * from being issued, but tries to flush the output buffer and execute + * callbacks for all remaining replies. When this function is called from a + * callback, there might be more replies and we can safely defer disconnecting + * to redisProcessCallbacks(). Otherwise, we can only disconnect immediately + * when there are no pending callbacks. */ +void redisAsyncDisconnect(redisAsyncContext *ac) { + redisContext *c = &(ac->c); + c->flags |= REDIS_DISCONNECTING; + if (!(c->flags & REDIS_IN_CALLBACK) && ac->replies.head == NULL) + __redisAsyncDisconnect(ac); +} - /* Cleanup self */ - redisFree(c); +static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, redisCallback *dstcb) { + redisContext *c = &(ac->c); + dict *callbacks; + dictEntry *de; + int pvariant; + char *stype; + sds sname; + + /* Custom reply functions are not supported for pub/sub. This will fail + * very hard when they are used... */ + if (reply->type == REDIS_REPLY_ARRAY) { + assert(reply->elements >= 2); + assert(reply->element[0]->type == REDIS_REPLY_STRING); + stype = reply->element[0]->str; + pvariant = (tolower(stype[0]) == 'p') ? 1 : 0; + + if (pvariant) + callbacks = ac->sub.patterns; + else + callbacks = ac->sub.channels; + + /* Locate the right callback */ + assert(reply->element[1]->type == REDIS_REPLY_STRING); + sname = sdsnewlen(reply->element[1]->str,reply->element[1]->len); + de = dictFind(callbacks,sname); + if (de != NULL) { + memcpy(dstcb,dictGetEntryVal(de),sizeof(*dstcb)); + + /* If this is an unsubscribe message, remove it. */ + if (strcasecmp(stype+pvariant,"unsubscribe") == 0) { + dictDelete(callbacks,sname); + + /* If this was the last unsubscribe message, revert to + * non-subscribe mode. */ + assert(reply->element[2]->type == REDIS_REPLY_INTEGER); + if (reply->element[2]->integer == 0) + c->flags &= ~REDIS_SUBSCRIBED; + } + } + sdsfree(sname); + } else { + /* Shift callback for invalid commands. */ + __redisShiftCallback(&ac->sub.invalid,dstcb); + } + return REDIS_OK; } void redisProcessCallbacks(redisAsyncContext *ac) { @@ -213,11 +365,28 @@ void redisProcessCallbacks(redisAsyncContext *ac) { break; } - /* Shift callback and execute it */ - assert(__redisShiftCallback(&ac->replies,&cb) == REDIS_OK); + /* Even if the context is subscribed, pending regular callbacks will + * get a reply before pub/sub messages arrive. */ + if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) { + /* No more regular callbacks, the context *must* be subscribed. */ + assert(c->flags & REDIS_SUBSCRIBED); + __redisGetSubscribeCallback(ac,reply,&cb); + } + if (cb.fn != NULL) { - cb.fn(ac,reply,cb.privdata); + __redisRunCallback(ac,&cb,reply); + c->fn->freeObject(reply); + + /* Proceed with free'ing when redisAsyncFree() was called. */ + if (c->flags & REDIS_FREEING) { + __redisAsyncFree(ac); + return; + } } else { + /* No callback for this reply. This can either be a NULL callback, + * or there were no callbacks to begin with. Either way, don't + * abort with an error, but simply ignore it because the client + * doesn't know what the server will spit out over the wire. */ c->fn->freeObject(reply); } } @@ -237,7 +406,7 @@ void redisAsyncHandleRead(redisAsyncContext *ac) { __redisAsyncDisconnect(ac); } else { /* Always re-schedule reads */ - if (ac->evAddRead) ac->evAddRead(ac->_adapter_data); + if (ac->ev.addRead) ac->ev.addRead(ac->ev.data); redisProcessCallbacks(ac); } } @@ -251,13 +420,13 @@ void redisAsyncHandleWrite(redisAsyncContext *ac) { } else { /* Continue writing when not done, stop writing otherwise */ if (!done) { - if (ac->evAddWrite) ac->evAddWrite(ac->_adapter_data); + if (ac->ev.addWrite) ac->ev.addWrite(ac->ev.data); } else { - if (ac->evDelWrite) ac->evDelWrite(ac->_adapter_data); + if (ac->ev.delWrite) ac->ev.delWrite(ac->ev.data); } /* Always schedule reads after writes */ - if (ac->evAddRead) ac->evAddRead(ac->_adapter_data); + if (ac->ev.addRead) ac->ev.addRead(ac->ev.data); /* Fire onConnect when this is the first write event. */ if (!(c->flags & REDIS_CONNECTED)) { @@ -267,26 +436,81 @@ void redisAsyncHandleWrite(redisAsyncContext *ac) { } } -/* Helper function for the redisAsyncCommand* family of functions. - * - * Write a formatted command to the output buffer and register the provided - * callback function with the context. - */ +/* Sets a pointer to the first argument and its length starting at p. Returns + * the number of bytes to skip to get to the following argument. */ +static char *nextArgument(char *start, char **str, size_t *len) { + char *p = start; + if (p[0] != '$') { + p = strchr(p,'$'); + if (p == NULL) return NULL; + } + + *len = (int)strtol(p+1,NULL,10); + p = strchr(p,'\r'); + assert(p); + *str = p+2; + return p+2+(*len)+2; +} + +/* Helper function for the redisAsyncCommand* family of functions. Writes a + * formatted command to the output buffer and registers the provided callback + * function with the context. */ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, char *cmd, size_t len) { redisContext *c = &(ac->c); redisCallback cb; + int pvariant, hasnext; + char *cstr, *astr; + size_t clen, alen; + char *p; + sds sname; - /* Don't accept new commands when the connection is lazily closed. */ - if (c->flags & REDIS_DISCONNECTING) return REDIS_ERR; - __redisAppendCommand(c,cmd,len); + /* Don't accept new commands when the connection is about to be closed. */ + if (c->flags & (REDIS_DISCONNECTING | REDIS_FREEING)) return REDIS_ERR; - /* Store callback */ + /* Setup callback */ cb.fn = fn; cb.privdata = privdata; - __redisPushCallback(&ac->replies,&cb); + + /* Find out which command will be appended. */ + p = nextArgument(cmd,&cstr,&clen); + assert(p != NULL); + hasnext = (p[0] == '$'); + pvariant = (tolower(cstr[0]) == 'p') ? 1 : 0; + cstr += pvariant; + clen -= pvariant; + + if (hasnext && strncasecmp(cstr,"subscribe\r\n",11) == 0) { + c->flags |= REDIS_SUBSCRIBED; + + /* Add every channel/pattern to the list of subscription callbacks. */ + while ((p = nextArgument(p,&astr,&alen)) != NULL) { + sname = sdsnewlen(astr,alen); + if (pvariant) + dictReplace(ac->sub.patterns,sname,&cb); + else + dictReplace(ac->sub.channels,sname,&cb); + } + } else if (strncasecmp(cstr,"unsubscribe\r\n",13) == 0) { + /* It is only useful to call (P)UNSUBSCRIBE when the context is + * subscribed to one or more channels or patterns. */ + if (!(c->flags & REDIS_SUBSCRIBED)) return REDIS_ERR; + + /* (P)UNSUBSCRIBE does not have its own response: every channel or + * pattern that is unsubscribed will receive a message. This means we + * should not append a callback function for this command. */ + } else { + if (c->flags & REDIS_SUBSCRIBED) + /* This will likely result in an error reply, but it needs to be + * received and passed to the callback. */ + __redisPushCallback(&ac->sub.invalid,&cb); + else + __redisPushCallback(&ac->replies,&cb); + } + + __redisAppendCommand(c,cmd,len); /* Always schedule a write when the write buffer is non-empty */ - if (ac->evAddWrite) ac->evAddWrite(ac->_adapter_data); + if (ac->ev.addWrite) ac->ev.addWrite(ac->ev.data); return REDIS_OK; } diff --git a/deps/hiredis/async.h b/deps/hiredis/async.h index 2ef0e21e..ba2b6f54 100644 --- a/deps/hiredis/async.h +++ b/deps/hiredis/async.h @@ -1,5 +1,7 @@ /* * Copyright (c) 2009-2010, Salvatore Sanfilippo + * Copyright (c) 2010, Pieter Noordhuis + * * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -36,6 +38,7 @@ extern "C" { #endif struct redisAsyncContext; /* need forward declaration of redisAsyncContext */ +struct dict; /* dictionary header is included in async.c */ /* Reply callback prototype and container */ typedef void (redisCallbackFn)(struct redisAsyncContext*, void*, void*); @@ -66,16 +69,18 @@ typedef struct redisAsyncContext { /* Not used by hiredis */ void *data; - /* Used by the different event lib adapters to store their private data */ - void *_adapter_data; + /* Event library data and hooks */ + struct { + void *data; - /* Called when the library expects to start reading/writing. - * The supplied functions should be idempotent. */ - void (*evAddRead)(void *privdata); - void (*evDelRead)(void *privdata); - void (*evAddWrite)(void *privdata); - void (*evDelWrite)(void *privdata); - void (*evCleanup)(void *privdata); + /* Hooks that are called when the library expects to start + * reading/writing. These functions should be idempotent. */ + void (*addRead)(void *privdata); + void (*delRead)(void *privdata); + void (*addWrite)(void *privdata); + void (*delWrite)(void *privdata); + void (*cleanup)(void *privdata); + } ev; /* Called when either the connection is terminated due to an error or per * user request. The status is set accordingly (REDIS_OK, REDIS_ERR). */ @@ -84,16 +89,25 @@ typedef struct redisAsyncContext { /* Called when the first write event was received. */ redisConnectCallback *onConnect; - /* Reply callbacks */ + /* Regular command callbacks */ redisCallbackList replies; + + /* Subscription callbacks */ + struct { + redisCallbackList invalid; + struct dict *channels; + struct dict *patterns; + } sub; } redisAsyncContext; /* Functions that proxy to hiredis */ redisAsyncContext *redisAsyncConnect(const char *ip, int port); +redisAsyncContext *redisAsyncConnectUnix(const char *path); int redisAsyncSetReplyObjectFunctions(redisAsyncContext *ac, redisReplyObjectFunctions *fn); int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn); int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn); void redisAsyncDisconnect(redisAsyncContext *ac); +void redisAsyncFree(redisAsyncContext *ac); /* Handle read/write events */ void redisAsyncHandleRead(redisAsyncContext *ac); diff --git a/deps/hiredis/dict.c b/deps/hiredis/dict.c new file mode 100644 index 00000000..79b1041c --- /dev/null +++ b/deps/hiredis/dict.c @@ -0,0 +1,338 @@ +/* Hash table implementation. + * + * This file implements in memory hash tables with insert/del/replace/find/ + * get-random-element operations. Hash tables will auto resize if needed + * tables of power of two in size are used, collisions are handled by + * chaining. See the source code for more information... :) + * + * Copyright (c) 2006-2010, Salvatore Sanfilippo + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "fmacros.h" +#include +#include +#include +#include "dict.h" + +/* -------------------------- private prototypes ---------------------------- */ + +static int _dictExpandIfNeeded(dict *ht); +static unsigned long _dictNextPower(unsigned long size); +static int _dictKeyIndex(dict *ht, const void *key); +static int _dictInit(dict *ht, dictType *type, void *privDataPtr); + +/* -------------------------- hash functions -------------------------------- */ + +/* Generic hash function (a popular one from Bernstein). + * I tested a few and this was the best. */ +static unsigned int dictGenHashFunction(const unsigned char *buf, int len) { + unsigned int hash = 5381; + + while (len--) + hash = ((hash << 5) + hash) + (*buf++); /* hash * 33 + c */ + return hash; +} + +/* ----------------------------- API implementation ------------------------- */ + +/* Reset an hashtable already initialized with ht_init(). + * NOTE: This function should only called by ht_destroy(). */ +static void _dictReset(dict *ht) { + ht->table = NULL; + ht->size = 0; + ht->sizemask = 0; + ht->used = 0; +} + +/* Create a new hash table */ +static dict *dictCreate(dictType *type, void *privDataPtr) { + dict *ht = malloc(sizeof(*ht)); + _dictInit(ht,type,privDataPtr); + return ht; +} + +/* Initialize the hash table */ +static int _dictInit(dict *ht, dictType *type, void *privDataPtr) { + _dictReset(ht); + ht->type = type; + ht->privdata = privDataPtr; + return DICT_OK; +} + +/* Expand or create the hashtable */ +static int dictExpand(dict *ht, unsigned long size) { + dict n; /* the new hashtable */ + unsigned long realsize = _dictNextPower(size), i; + + /* the size is invalid if it is smaller than the number of + * elements already inside the hashtable */ + if (ht->used > size) + return DICT_ERR; + + _dictInit(&n, ht->type, ht->privdata); + n.size = realsize; + n.sizemask = realsize-1; + n.table = calloc(realsize,sizeof(dictEntry*)); + + /* Copy all the elements from the old to the new table: + * note that if the old hash table is empty ht->size is zero, + * so dictExpand just creates an hash table. */ + n.used = ht->used; + for (i = 0; i < ht->size && ht->used > 0; i++) { + dictEntry *he, *nextHe; + + if (ht->table[i] == NULL) continue; + + /* For each hash entry on this slot... */ + he = ht->table[i]; + while(he) { + unsigned int h; + + nextHe = he->next; + /* Get the new element index */ + h = dictHashKey(ht, he->key) & n.sizemask; + he->next = n.table[h]; + n.table[h] = he; + ht->used--; + /* Pass to the next element */ + he = nextHe; + } + } + assert(ht->used == 0); + free(ht->table); + + /* Remap the new hashtable in the old */ + *ht = n; + return DICT_OK; +} + +/* Add an element to the target hash table */ +static int dictAdd(dict *ht, void *key, void *val) { + int index; + dictEntry *entry; + + /* Get the index of the new element, or -1 if + * the element already exists. */ + if ((index = _dictKeyIndex(ht, key)) == -1) + return DICT_ERR; + + /* Allocates the memory and stores key */ + entry = malloc(sizeof(*entry)); + entry->next = ht->table[index]; + ht->table[index] = entry; + + /* Set the hash entry fields. */ + dictSetHashKey(ht, entry, key); + dictSetHashVal(ht, entry, val); + ht->used++; + return DICT_OK; +} + +/* Add an element, discarding the old if the key already exists. + * Return 1 if the key was added from scratch, 0 if there was already an + * element with such key and dictReplace() just performed a value update + * operation. */ +static int dictReplace(dict *ht, void *key, void *val) { + dictEntry *entry, auxentry; + + /* Try to add the element. If the key + * does not exists dictAdd will suceed. */ + if (dictAdd(ht, key, val) == DICT_OK) + return 1; + /* It already exists, get the entry */ + entry = dictFind(ht, key); + /* Free the old value and set the new one */ + /* Set the new value and free the old one. Note that it is important + * to do that in this order, as the value may just be exactly the same + * as the previous one. In this context, think to reference counting, + * you want to increment (set), and then decrement (free), and not the + * reverse. */ + auxentry = *entry; + dictSetHashVal(ht, entry, val); + dictFreeEntryVal(ht, &auxentry); + return 0; +} + +/* Search and remove an element */ +static int dictDelete(dict *ht, const void *key) { + unsigned int h; + dictEntry *de, *prevde; + + if (ht->size == 0) + return DICT_ERR; + h = dictHashKey(ht, key) & ht->sizemask; + de = ht->table[h]; + + prevde = NULL; + while(de) { + if (dictCompareHashKeys(ht,key,de->key)) { + /* Unlink the element from the list */ + if (prevde) + prevde->next = de->next; + else + ht->table[h] = de->next; + + dictFreeEntryKey(ht,de); + dictFreeEntryVal(ht,de); + free(de); + ht->used--; + return DICT_OK; + } + prevde = de; + de = de->next; + } + return DICT_ERR; /* not found */ +} + +/* Destroy an entire hash table */ +static int _dictClear(dict *ht) { + unsigned long i; + + /* Free all the elements */ + for (i = 0; i < ht->size && ht->used > 0; i++) { + dictEntry *he, *nextHe; + + if ((he = ht->table[i]) == NULL) continue; + while(he) { + nextHe = he->next; + dictFreeEntryKey(ht, he); + dictFreeEntryVal(ht, he); + free(he); + ht->used--; + he = nextHe; + } + } + /* Free the table and the allocated cache structure */ + free(ht->table); + /* Re-initialize the table */ + _dictReset(ht); + return DICT_OK; /* never fails */ +} + +/* Clear & Release the hash table */ +static void dictRelease(dict *ht) { + _dictClear(ht); + free(ht); +} + +static dictEntry *dictFind(dict *ht, const void *key) { + dictEntry *he; + unsigned int h; + + if (ht->size == 0) return NULL; + h = dictHashKey(ht, key) & ht->sizemask; + he = ht->table[h]; + while(he) { + if (dictCompareHashKeys(ht, key, he->key)) + return he; + he = he->next; + } + return NULL; +} + +static dictIterator *dictGetIterator(dict *ht) { + dictIterator *iter = malloc(sizeof(*iter)); + + iter->ht = ht; + iter->index = -1; + iter->entry = NULL; + iter->nextEntry = NULL; + return iter; +} + +static dictEntry *dictNext(dictIterator *iter) { + while (1) { + if (iter->entry == NULL) { + iter->index++; + if (iter->index >= + (signed)iter->ht->size) break; + iter->entry = iter->ht->table[iter->index]; + } else { + iter->entry = iter->nextEntry; + } + if (iter->entry) { + /* We need to save the 'next' here, the iterator user + * may delete the entry we are returning. */ + iter->nextEntry = iter->entry->next; + return iter->entry; + } + } + return NULL; +} + +static void dictReleaseIterator(dictIterator *iter) { + free(iter); +} + +/* ------------------------- private functions ------------------------------ */ + +/* Expand the hash table if needed */ +static int _dictExpandIfNeeded(dict *ht) { + /* If the hash table is empty expand it to the intial size, + * if the table is "full" dobule its size. */ + if (ht->size == 0) + return dictExpand(ht, DICT_HT_INITIAL_SIZE); + if (ht->used == ht->size) + return dictExpand(ht, ht->size*2); + return DICT_OK; +} + +/* Our hash table capability is a power of two */ +static unsigned long _dictNextPower(unsigned long size) { + unsigned long i = DICT_HT_INITIAL_SIZE; + + if (size >= LONG_MAX) return LONG_MAX; + while(1) { + if (i >= size) + return i; + i *= 2; + } +} + +/* Returns the index of a free slot that can be populated with + * an hash entry for the given 'key'. + * If the key already exists, -1 is returned. */ +static int _dictKeyIndex(dict *ht, const void *key) { + unsigned int h; + dictEntry *he; + + /* Expand the hashtable if needed */ + if (_dictExpandIfNeeded(ht) == DICT_ERR) + return -1; + /* Compute the key hash value */ + h = dictHashKey(ht, key) & ht->sizemask; + /* Search if this slot does not already contain the given key */ + he = ht->table[h]; + while(he) { + if (dictCompareHashKeys(ht, key, he->key)) + return -1; + he = he->next; + } + return h; +} + diff --git a/deps/hiredis/dict.h b/deps/hiredis/dict.h new file mode 100644 index 00000000..95fcd280 --- /dev/null +++ b/deps/hiredis/dict.h @@ -0,0 +1,126 @@ +/* Hash table implementation. + * + * This file implements in memory hash tables with insert/del/replace/find/ + * get-random-element operations. Hash tables will auto resize if needed + * tables of power of two in size are used, collisions are handled by + * chaining. See the source code for more information... :) + * + * Copyright (c) 2006-2010, Salvatore Sanfilippo + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef __DICT_H +#define __DICT_H + +#define DICT_OK 0 +#define DICT_ERR 1 + +/* Unused arguments generate annoying warnings... */ +#define DICT_NOTUSED(V) ((void) V) + +typedef struct dictEntry { + void *key; + void *val; + struct dictEntry *next; +} dictEntry; + +typedef struct dictType { + unsigned int (*hashFunction)(const void *key); + void *(*keyDup)(void *privdata, const void *key); + void *(*valDup)(void *privdata, const void *obj); + int (*keyCompare)(void *privdata, const void *key1, const void *key2); + void (*keyDestructor)(void *privdata, void *key); + void (*valDestructor)(void *privdata, void *obj); +} dictType; + +typedef struct dict { + dictEntry **table; + dictType *type; + unsigned long size; + unsigned long sizemask; + unsigned long used; + void *privdata; +} dict; + +typedef struct dictIterator { + dict *ht; + int index; + dictEntry *entry, *nextEntry; +} dictIterator; + +/* This is the initial size of every hash table */ +#define DICT_HT_INITIAL_SIZE 4 + +/* ------------------------------- Macros ------------------------------------*/ +#define dictFreeEntryVal(ht, entry) \ + if ((ht)->type->valDestructor) \ + (ht)->type->valDestructor((ht)->privdata, (entry)->val) + +#define dictSetHashVal(ht, entry, _val_) do { \ + if ((ht)->type->valDup) \ + entry->val = (ht)->type->valDup((ht)->privdata, _val_); \ + else \ + entry->val = (_val_); \ +} while(0) + +#define dictFreeEntryKey(ht, entry) \ + if ((ht)->type->keyDestructor) \ + (ht)->type->keyDestructor((ht)->privdata, (entry)->key) + +#define dictSetHashKey(ht, entry, _key_) do { \ + if ((ht)->type->keyDup) \ + entry->key = (ht)->type->keyDup((ht)->privdata, _key_); \ + else \ + entry->key = (_key_); \ +} while(0) + +#define dictCompareHashKeys(ht, key1, key2) \ + (((ht)->type->keyCompare) ? \ + (ht)->type->keyCompare((ht)->privdata, key1, key2) : \ + (key1) == (key2)) + +#define dictHashKey(ht, key) (ht)->type->hashFunction(key) + +#define dictGetEntryKey(he) ((he)->key) +#define dictGetEntryVal(he) ((he)->val) +#define dictSlots(ht) ((ht)->size) +#define dictSize(ht) ((ht)->used) + +/* API */ +static unsigned int dictGenHashFunction(const unsigned char *buf, int len); +static dict *dictCreate(dictType *type, void *privDataPtr); +static int dictExpand(dict *ht, unsigned long size); +static int dictAdd(dict *ht, void *key, void *val); +static int dictReplace(dict *ht, void *key, void *val); +static int dictDelete(dict *ht, const void *key); +static void dictRelease(dict *ht); +static dictEntry * dictFind(dict *ht, const void *key); +static dictIterator *dictGetIterator(dict *ht); +static dictEntry *dictNext(dictIterator *iter); +static void dictReleaseIterator(dictIterator *iter); + +#endif /* __DICT_H */ diff --git a/deps/hiredis/example.c b/deps/hiredis/example.c index 2506f39c..90ff9ed5 100644 --- a/deps/hiredis/example.c +++ b/deps/hiredis/example.c @@ -9,7 +9,8 @@ int main(void) { redisContext *c; redisReply *reply; - c = redisConnect((char*)"127.0.0.1", 6379); + struct timeval timeout = { 1, 500000 }; // 1.5 seconds + c = redisConnectWithTimeout((char*)"127.0.0.2", 6379, timeout); if (c->err) { printf("Connection error: %s\n", c->errstr); exit(1); diff --git a/deps/hiredis/fmacros.h b/deps/hiredis/fmacros.h index 38f46482..65f9692c 100644 --- a/deps/hiredis/fmacros.h +++ b/deps/hiredis/fmacros.h @@ -1,7 +1,9 @@ -#ifndef _REDIS_FMACRO_H -#define _REDIS_FMACRO_H +#ifndef __HIREDIS_FMACRO_H +#define __HIREDIS_FMACRO_H +#ifndef _BSD_SOURCE #define _BSD_SOURCE +#endif #ifdef __linux__ #define _XOPEN_SOURCE 700 @@ -9,7 +11,4 @@ #define _XOPEN_SOURCE #endif -#define _LARGEFILE_SOURCE -#define _FILE_OFFSET_BITS 64 - #endif diff --git a/deps/hiredis/hiredis.c b/deps/hiredis/hiredis.c index d4cad7c2..f2135bac 100644 --- a/deps/hiredis/hiredis.c +++ b/deps/hiredis/hiredis.c @@ -271,14 +271,17 @@ static int processLineItem(redisReader *r) { int len; if ((p = readLine(r,&len)) != NULL) { - if (r->fn) { - if (cur->type == REDIS_REPLY_INTEGER) { + if (cur->type == REDIS_REPLY_INTEGER) { + if (r->fn && r->fn->createInteger) obj = r->fn->createInteger(cur,readLongLong(p)); - } else { - obj = r->fn->createString(cur,p,len); - } + else + obj = (void*)REDIS_REPLY_INTEGER; } else { - obj = (void*)(size_t)(cur->type); + /* Type will be error or status. */ + if (r->fn && r->fn->createString) + obj = r->fn->createString(cur,p,len); + else + obj = (void*)(size_t)(cur->type); } /* Set reply if this is the root object. */ @@ -306,15 +309,19 @@ static int processBulkItem(redisReader *r) { if (len < 0) { /* The nil object can always be created. */ - obj = r->fn ? r->fn->createNil(cur) : - (void*)REDIS_REPLY_NIL; + if (r->fn && r->fn->createNil) + obj = r->fn->createNil(cur); + else + obj = (void*)REDIS_REPLY_NIL; success = 1; } else { /* Only continue when the buffer contains the entire bulk item. */ bytelen += len+2; /* include \r\n */ if (r->pos+bytelen <= r->len) { - obj = r->fn ? r->fn->createString(cur,s+2,len) : - (void*)REDIS_REPLY_STRING; + if (r->fn && r->fn->createString) + obj = r->fn->createString(cur,s+2,len); + else + obj = (void*)REDIS_REPLY_STRING; success = 1; } } @@ -351,12 +358,16 @@ static int processMultiBulkItem(redisReader *r) { root = (r->ridx == 0); if (elements == -1) { - obj = r->fn ? r->fn->createNil(cur) : - (void*)REDIS_REPLY_NIL; + if (r->fn && r->fn->createNil) + obj = r->fn->createNil(cur); + else + obj = (void*)REDIS_REPLY_NIL; moveToNextTask(r); } else { - obj = r->fn ? r->fn->createArray(cur,elements) : - (void*)REDIS_REPLY_ARRAY; + if (r->fn && r->fn->createArray) + obj = r->fn->createArray(cur,elements); + else + obj = (void*)REDIS_REPLY_ARRAY; /* Modify task stack when there are more than 0 elements. */ if (elements > 0) { @@ -434,7 +445,7 @@ static int processItem(redisReader *r) { } } -void *redisReplyReaderCreate() { +void *redisReplyReaderCreate(void) { redisReader *r = calloc(sizeof(redisReader),1); r->error = NULL; r->fn = &defaultFunctions; @@ -493,7 +504,7 @@ static void redisSetReplyReaderError(redisReader *r, sds err) { if (r->buf != NULL) { sdsfree(r->buf); r->buf = sdsempty(); - r->pos = 0; + r->pos = r->len = 0; } r->ridx = -1; r->error = err; @@ -504,11 +515,18 @@ char *redisReplyReaderGetError(void *reader) { return r->error; } -void redisReplyReaderFeed(void *reader, char *buf, size_t len) { +void redisReplyReaderFeed(void *reader, const char *buf, size_t len) { redisReader *r = reader; /* Copy the provided buffer. */ if (buf != NULL && len >= 1) { + /* Destroy internal buffer when it is empty and is quite large. */ + if (r->len == 0 && sdsavail(r->buf) > 16*1024) { + sdsfree(r->buf); + r->buf = sdsempty(); + r->pos = 0; + } + r->buf = sdscatlen(r->buf,buf,len); r->len = sdslen(r->buf); } @@ -538,15 +556,10 @@ int redisReplyReaderGetReply(void *reader, void **reply) { if (processItem(r) < 0) break; - /* Discard the consumed part of the buffer. */ - if (r->pos > 0) { - if (r->pos == r->len) { - /* sdsrange has a quirck on this edge case. */ - sdsfree(r->buf); - r->buf = sdsempty(); - } else { - r->buf = sdsrange(r->buf,r->pos,r->len); - } + /* Discard part of the buffer when we've consumed at least 1k, to avoid + * doing unnecessary calls to memmove() in sds.c. */ + if (r->pos >= 1024) { + r->buf = sdsrange(r->buf,r->pos,-1); r->pos = 0; r->len = sdslen(r->buf); } @@ -556,13 +569,6 @@ int redisReplyReaderGetReply(void *reader, void **reply) { void *aux = r->reply; r->reply = NULL; - /* Destroy the buffer when it is empty and is quite large. */ - if (r->len == 0 && sdsavail(r->buf) > 16*1024) { - sdsfree(r->buf); - r->buf = sdsempty(); - r->pos = 0; - } - /* Check if there actually *is* a reply. */ if (r->error != NULL) { return REDIS_ERR; @@ -601,7 +607,7 @@ int redisvFormatCommand(char **target, const char *format, va_list ap) { char *cmd = NULL; /* final command */ int pos; /* position in final command */ sds current; /* current argument */ - int interpolated = 0; /* did we do interpolation on an argument? */ + int touched = 0; /* was the current argument touched? */ char **argv = NULL; int argc = 0, j; int totlen = 0; @@ -615,13 +621,14 @@ int redisvFormatCommand(char **target, const char *format, va_list ap) { while(*c != '\0') { if (*c != '%' || c[1] == '\0') { if (*c == ' ') { - if (sdslen(current) != 0) { + if (touched) { addArgument(current, &argv, &argc, &totlen); current = sdsempty(); - interpolated = 0; + touched = 0; } } else { current = sdscatlen(current,c,1); + touched = 1; } } else { switch(c[1]) { @@ -630,14 +637,12 @@ int redisvFormatCommand(char **target, const char *format, va_list ap) { size = strlen(arg); if (size > 0) current = sdscatlen(current,arg,size); - interpolated = 1; break; case 'b': arg = va_arg(ap,char*); size = va_arg(ap,size_t); if (size > 0) current = sdscatlen(current,arg,size); - interpolated = 1; break; case '%': current = sdscat(current,"%"); @@ -683,7 +688,6 @@ int redisvFormatCommand(char **target, const char *format, va_list ap) { _format[_l] = '\0'; va_copy(_cpy,ap); current = sdscatvprintf(current,_format,_cpy); - interpolated = 1; va_end(_cpy); /* Update current position (note: outer blocks @@ -696,13 +700,14 @@ int redisvFormatCommand(char **target, const char *format, va_list ap) { va_arg(ap,void); } } + touched = 1; c++; } c++; } /* Add the last argument if needed */ - if (interpolated || sdslen(current) != 0) { + if (touched) { addArgument(current, &argv, &argc, &totlen); } else { sdsfree(current); @@ -798,7 +803,7 @@ void __redisSetError(redisContext *c, int type, const sds errstr) { } } -static redisContext *redisContextInit() { +static redisContext *redisContextInit(void) { redisContext *c = calloc(sizeof(redisContext),1); c->err = 0; c->errstr = NULL; @@ -809,8 +814,7 @@ static redisContext *redisContextInit() { } void redisFree(redisContext *c) { - /* Disconnect before free'ing if not yet disconnected. */ - if (c->flags & REDIS_CONNECTED) + if (c->fd > 0) close(c->fd); if (c->errstr != NULL) sdsfree(c->errstr); @@ -827,31 +831,52 @@ void redisFree(redisContext *c) { redisContext *redisConnect(const char *ip, int port) { redisContext *c = redisContextInit(); c->flags |= REDIS_BLOCK; - redisContextConnectTcp(c,ip,port); + redisContextConnectTcp(c,ip,port,NULL); + return c; +} + +redisContext *redisConnectWithTimeout(const char *ip, int port, struct timeval tv) { + redisContext *c = redisContextInit(); + c->flags |= REDIS_BLOCK; + redisContextConnectTcp(c,ip,port,&tv); return c; } redisContext *redisConnectNonBlock(const char *ip, int port) { redisContext *c = redisContextInit(); c->flags &= ~REDIS_BLOCK; - redisContextConnectTcp(c,ip,port); + redisContextConnectTcp(c,ip,port,NULL); return c; } redisContext *redisConnectUnix(const char *path) { redisContext *c = redisContextInit(); c->flags |= REDIS_BLOCK; - redisContextConnectUnix(c,path); + redisContextConnectUnix(c,path,NULL); + return c; +} + +redisContext *redisConnectUnixWithTimeout(const char *path, struct timeval tv) { + redisContext *c = redisContextInit(); + c->flags |= REDIS_BLOCK; + redisContextConnectUnix(c,path,&tv); return c; } redisContext *redisConnectUnixNonBlock(const char *path) { redisContext *c = redisContextInit(); c->flags &= ~REDIS_BLOCK; - redisContextConnectUnix(c,path); + redisContextConnectUnix(c,path,NULL); return c; } +/* Set read/write timeout on a blocking socket. */ +int redisSetTimeout(redisContext *c, struct timeval tv) { + if (c->flags & REDIS_BLOCK) + return redisContextSetTimeout(c,tv); + return REDIS_ERR; +} + /* Set the replyObjectFunctions to use. Returns REDIS_ERR when the reader * was already initialized and the function set could not be re-set. * Return REDIS_OK when they could be set. */ @@ -879,7 +904,7 @@ int redisBufferRead(redisContext *c) { char buf[2048]; int nread = read(c->fd,buf,sizeof(buf)); if (nread == -1) { - if (errno == EAGAIN) { + if (errno == EAGAIN && !(c->flags & REDIS_BLOCK)) { /* Try again later */ } else { __redisSetError(c,REDIS_ERR_IO,NULL); @@ -910,7 +935,7 @@ int redisBufferWrite(redisContext *c, int *done) { if (sdslen(c->obuf) > 0) { nwritten = write(c->fd,c->obuf,sdslen(c->obuf)); if (nwritten == -1) { - if (errno == EAGAIN) { + if (errno == EAGAIN && !(c->flags & REDIS_BLOCK)) { /* Try again later */ } else { __redisSetError(c,REDIS_ERR_IO,NULL); diff --git a/deps/hiredis/hiredis.h b/deps/hiredis/hiredis.h index 1412a344..f4452091 100644 --- a/deps/hiredis/hiredis.h +++ b/deps/hiredis/hiredis.h @@ -33,6 +33,7 @@ #define __HIREDIS_H #include /* for size_t */ #include /* for va_list */ +#include /* for struct timeval */ #define HIREDIS_MAJOR 0 #define HIREDIS_MINOR 9 @@ -64,6 +65,16 @@ * should be terminated once all replies have been read. */ #define REDIS_DISCONNECTING 0x4 +/* Flag specific to the async API which means that the context should be clean + * up as soon as possible. */ +#define REDIS_FREEING 0x8 + +/* Flag that is set when an async callback is executed. */ +#define REDIS_IN_CALLBACK 0x10 + +/* Flag that is set when the async context has one or more subscriptions. */ +#define REDIS_SUBSCRIBED 0x20 + #define REDIS_REPLY_STRING 1 #define REDIS_REPLY_ARRAY 2 #define REDIS_REPLY_INTEGER 3 @@ -118,13 +129,13 @@ typedef struct redisContext { } redisContext; void freeReplyObject(void *reply); -void *redisReplyReaderCreate(); +void *redisReplyReaderCreate(void); int redisReplyReaderSetReplyObjectFunctions(void *reader, redisReplyObjectFunctions *fn); int redisReplyReaderSetPrivdata(void *reader, void *privdata); void *redisReplyReaderGetObject(void *reader); char *redisReplyReaderGetError(void *reader); void redisReplyReaderFree(void *ptr); -void redisReplyReaderFeed(void *reader, char *buf, size_t len); +void redisReplyReaderFeed(void *reader, const char *buf, size_t len); int redisReplyReaderGetReply(void *reader, void **reply); /* Functions to format a command according to the protocol. */ @@ -133,9 +144,12 @@ int redisFormatCommand(char **target, const char *format, ...); int redisFormatCommandArgv(char **target, int argc, const char **argv, const size_t *argvlen); redisContext *redisConnect(const char *ip, int port); +redisContext *redisConnectWithTimeout(const char *ip, int port, struct timeval tv); redisContext *redisConnectNonBlock(const char *ip, int port); redisContext *redisConnectUnix(const char *path); +redisContext *redisConnectUnixWithTimeout(const char *path, struct timeval tv); redisContext *redisConnectUnixNonBlock(const char *path); +int redisSetTimeout(redisContext *c, struct timeval tv); int redisSetReplyObjectFunctions(redisContext *c, redisReplyObjectFunctions *fn); void redisFree(redisContext *c); int redisBufferRead(redisContext *c); diff --git a/deps/hiredis/net.c b/deps/hiredis/net.c index 88171461..438a129b 100644 --- a/deps/hiredis/net.c +++ b/deps/hiredis/net.c @@ -4,6 +4,7 @@ * Copyright (c) 2010, Pieter Noordhuis * * All rights reserved. + * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * @@ -32,6 +33,7 @@ #include "fmacros.h" #include #include +#include #include #include #include @@ -66,7 +68,7 @@ static int redisCreateSocket(redisContext *c, int type) { return s; } -static int redisSetNonBlock(redisContext *c, int fd) { +static int redisSetBlocking(redisContext *c, int fd, int blocking) { int flags; /* Set the socket nonblocking. @@ -78,9 +80,15 @@ static int redisSetNonBlock(redisContext *c, int fd) { close(fd); return REDIS_ERR; } - if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) { + + if (blocking) + flags &= ~O_NONBLOCK; + else + flags |= O_NONBLOCK; + + if (fcntl(fd, F_SETFL, flags) == -1) { __redisSetError(c,REDIS_ERR_IO, - sdscatprintf(sdsempty(), "fcntl(F_SETFL,O_NONBLOCK): %s", strerror(errno))); + sdscatprintf(sdsempty(), "fcntl(F_SETFL): %s", strerror(errno))); close(fd); return REDIS_ERR; } @@ -92,19 +100,89 @@ static int redisSetTcpNoDelay(redisContext *c, int fd) { if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)) == -1) { __redisSetError(c,REDIS_ERR_IO, sdscatprintf(sdsempty(), "setsockopt(TCP_NODELAY): %s", strerror(errno))); + close(fd); return REDIS_ERR; } return REDIS_OK; } -int redisContextConnectTcp(redisContext *c, const char *addr, int port) { +static int redisContextWaitReady(redisContext *c, int fd, const struct timeval *timeout) { + struct timeval to; + struct timeval *toptr = NULL; + fd_set wfd; + int err; + socklen_t errlen; + + /* Only use timeout when not NULL. */ + if (timeout != NULL) { + to = *timeout; + toptr = &to; + } + + if (errno == EINPROGRESS) { + FD_ZERO(&wfd); + FD_SET(fd, &wfd); + + if (select(FD_SETSIZE, NULL, &wfd, NULL, toptr) == -1) { + __redisSetError(c,REDIS_ERR_IO, + sdscatprintf(sdsempty(), "select(2): %s", strerror(errno))); + close(fd); + return REDIS_ERR; + } + + if (!FD_ISSET(fd, &wfd)) { + errno = ETIMEDOUT; + __redisSetError(c,REDIS_ERR_IO,NULL); + close(fd); + return REDIS_ERR; + } + + err = 0; + errlen = sizeof(err); + if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &errlen) == -1) { + __redisSetError(c,REDIS_ERR_IO, + sdscatprintf(sdsempty(), "getsockopt(SO_ERROR): %s", strerror(errno))); + close(fd); + return REDIS_ERR; + } + + if (err) { + errno = err; + __redisSetError(c,REDIS_ERR_IO,NULL); + close(fd); + return REDIS_ERR; + } + + return REDIS_OK; + } + + __redisSetError(c,REDIS_ERR_IO,NULL); + close(fd); + return REDIS_ERR; +} + +int redisContextSetTimeout(redisContext *c, struct timeval tv) { + if (setsockopt(c->fd,SOL_SOCKET,SO_RCVTIMEO,&tv,sizeof(tv)) == -1) { + __redisSetError(c,REDIS_ERR_IO, + sdscatprintf(sdsempty(), "setsockopt(SO_RCVTIMEO): %s", strerror(errno))); + return REDIS_ERR; + } + if (setsockopt(c->fd,SOL_SOCKET,SO_SNDTIMEO,&tv,sizeof(tv)) == -1) { + __redisSetError(c,REDIS_ERR_IO, + sdscatprintf(sdsempty(), "setsockopt(SO_SNDTIMEO): %s", strerror(errno))); + return REDIS_ERR; + } + return REDIS_OK; +} + +int redisContextConnectTcp(redisContext *c, const char *addr, int port, struct timeval *timeout) { int s; int blocking = (c->flags & REDIS_BLOCK); struct sockaddr_in sa; - if ((s = redisCreateSocket(c,AF_INET)) == REDIS_ERR) + if ((s = redisCreateSocket(c,AF_INET)) < 0) return REDIS_ERR; - if (!blocking && redisSetNonBlock(c,s) == REDIS_ERR) + if (redisSetBlocking(c,s,0) != REDIS_OK) return REDIS_ERR; sa.sin_family = AF_INET; @@ -126,30 +204,31 @@ int redisContextConnectTcp(redisContext *c, const char *addr, int port) { if (errno == EINPROGRESS && !blocking) { /* This is ok. */ } else { - __redisSetError(c,REDIS_ERR_IO,NULL); - close(s); - return REDIS_ERR; + if (redisContextWaitReady(c,s,timeout) != REDIS_OK) + return REDIS_ERR; } } - if (redisSetTcpNoDelay(c,s) != REDIS_OK) { - close(s); + /* Reset socket to be blocking after connect(2). */ + if (blocking && redisSetBlocking(c,s,1) != REDIS_OK) + return REDIS_ERR; + + if (redisSetTcpNoDelay(c,s) != REDIS_OK) return REDIS_ERR; - } c->fd = s; c->flags |= REDIS_CONNECTED; return REDIS_OK; } -int redisContextConnectUnix(redisContext *c, const char *path) { +int redisContextConnectUnix(redisContext *c, const char *path, struct timeval *timeout) { int s; int blocking = (c->flags & REDIS_BLOCK); struct sockaddr_un sa; - if ((s = redisCreateSocket(c,AF_LOCAL)) == REDIS_ERR) + if ((s = redisCreateSocket(c,AF_LOCAL)) < 0) return REDIS_ERR; - if (!blocking && redisSetNonBlock(c,s) != REDIS_OK) + if (redisSetBlocking(c,s,0) != REDIS_OK) return REDIS_ERR; sa.sun_family = AF_LOCAL; @@ -158,12 +237,15 @@ int redisContextConnectUnix(redisContext *c, const char *path) { if (errno == EINPROGRESS && !blocking) { /* This is ok. */ } else { - __redisSetError(c,REDIS_ERR_IO,NULL); - close(s); - return REDIS_ERR; + if (redisContextWaitReady(c,s,timeout) != REDIS_OK) + return REDIS_ERR; } } + /* Reset socket to be blocking after connect(2). */ + if (blocking && redisSetBlocking(c,s,1) != REDIS_OK) + return REDIS_ERR; + c->fd = s; c->flags |= REDIS_CONNECTED; return REDIS_OK; diff --git a/deps/hiredis/net.h b/deps/hiredis/net.h index b052d97f..e149ceba 100644 --- a/deps/hiredis/net.h +++ b/deps/hiredis/net.h @@ -1,6 +1,8 @@ /* Extracted from anet.c to work properly with Hiredis error reporting. * * Copyright (c) 2006-2010, Salvatore Sanfilippo + * Copyright (c) 2010, Pieter Noordhuis + * * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -37,7 +39,8 @@ #define AF_LOCAL AF_UNIX #endif -int redisContextConnectTcp(redisContext *c, const char *addr, int port); -int redisContextConnectUnix(redisContext *c, const char *path); +int redisContextSetTimeout(redisContext *c, struct timeval tv); +int redisContextConnectTcp(redisContext *c, const char *addr, int port, struct timeval *timeout); +int redisContextConnectUnix(redisContext *c, const char *path, struct timeval *timeout); #endif diff --git a/deps/hiredis/sds.c b/deps/hiredis/sds.c index e290705a..2bd5a81c 100644 --- a/deps/hiredis/sds.c +++ b/deps/hiredis/sds.c @@ -30,11 +30,11 @@ #define SDS_ABORT_ON_OOM -#include "sds.h" #include #include #include #include +#include "sds.h" static void sdsOomAbort(void) { fprintf(stderr,"SDS: Out Of Memory (SDS_ABORT_ON_OOM defined)\n"); @@ -69,11 +69,6 @@ sds sdsnew(const char *init) { return sdsnewlen(init, initlen); } -size_t sdslen(const sds s) { - struct sdshdr *sh = (void*) (s-(sizeof(struct sdshdr))); - return sh->len; -} - sds sdsdup(const sds s) { return sdsnewlen(s, sdslen(s)); } @@ -83,11 +78,6 @@ void sdsfree(sds s) { free(s-sizeof(struct sdshdr)); } -size_t sdsavail(sds s) { - struct sdshdr *sh = (void*) (s-(sizeof(struct sdshdr))); - return sh->free; -} - void sdsupdatelen(sds s) { struct sdshdr *sh = (void*) (s-(sizeof(struct sdshdr))); int reallen = strlen(s); @@ -115,6 +105,25 @@ static sds sdsMakeRoomFor(sds s, size_t addlen) { return newsh->buf; } +/* Grow the sds to have the specified length. Bytes that were not part of + * the original length of the sds will be set to zero. */ +sds sdsgrowzero(sds s, size_t len) { + struct sdshdr *sh = (void*)(s-(sizeof(struct sdshdr))); + size_t totlen, curlen = sh->len; + + if (len <= curlen) return s; + s = sdsMakeRoomFor(s,len-curlen); + if (s == NULL) return NULL; + + /* Make sure added region doesn't contain garbage */ + sh = (void*)(s-(sizeof(struct sdshdr))); + memset(s+curlen,0,(len-curlen+1)); /* also set trailing \0 byte */ + totlen = sh->len+sh->free; + sh->len = len; + sh->free = totlen-sh->len; + return s; +} + sds sdscatlen(sds s, const void *t, size_t len) { struct sdshdr *sh; size_t curlen = sdslen(s); @@ -222,13 +231,16 @@ sds sdsrange(sds s, int start, int end) { } newlen = (start > end) ? 0 : (end-start)+1; if (newlen != 0) { - if (start >= (signed)len) start = len-1; - if (end >= (signed)len) end = len-1; - newlen = (start > end) ? 0 : (end-start)+1; + if (start >= (signed)len) { + newlen = 0; + } else if (end >= (signed)len) { + end = len-1; + newlen = (start > end) ? 0 : (end-start)+1; + } } else { start = 0; } - if (start != 0) memmove(sh->buf, sh->buf+start, newlen); + if (start && newlen) memmove(sh->buf, sh->buf+start, newlen); sh->buf[newlen] = 0; sh->free = sh->free+(sh->len-newlen); sh->len = newlen; @@ -477,3 +489,106 @@ err: if (current) sdsfree(current); return NULL; } + +#ifdef SDS_TEST_MAIN +#include + +int __failed_tests = 0; +int __test_num = 0; +#define test_cond(descr,_c) do { \ + __test_num++; printf("%d - %s: ", __test_num, descr); \ + if(_c) printf("PASSED\n"); else {printf("FAILED\n"); __failed_tests++;} \ +} while(0); +#define test_report() do { \ + printf("%d tests, %d passed, %d failed\n", __test_num, \ + __test_num-__failed_tests, __failed_tests); \ + if (__failed_tests) { \ + printf("=== WARNING === We have failed tests here...\n"); \ + } \ +} while(0); + +int main(void) { + { + sds x = sdsnew("foo"), y; + + test_cond("Create a string and obtain the length", + sdslen(x) == 3 && memcmp(x,"foo\0",4) == 0) + + sdsfree(x); + x = sdsnewlen("foo",2); + test_cond("Create a string with specified length", + sdslen(x) == 2 && memcmp(x,"fo\0",3) == 0) + + x = sdscat(x,"bar"); + test_cond("Strings concatenation", + sdslen(x) == 5 && memcmp(x,"fobar\0",6) == 0); + + x = sdscpy(x,"a"); + test_cond("sdscpy() against an originally longer string", + sdslen(x) == 1 && memcmp(x,"a\0",2) == 0) + + x = sdscpy(x,"xyzxxxxxxxxxxyyyyyyyyyykkkkkkkkkk"); + test_cond("sdscpy() against an originally shorter string", + sdslen(x) == 33 && + memcmp(x,"xyzxxxxxxxxxxyyyyyyyyyykkkkkkkkkk\0",33) == 0) + + sdsfree(x); + x = sdscatprintf(sdsempty(),"%d",123); + test_cond("sdscatprintf() seems working in the base case", + sdslen(x) == 3 && memcmp(x,"123\0",4) ==0) + + sdsfree(x); + x = sdstrim(sdsnew("xxciaoyyy"),"xy"); + test_cond("sdstrim() correctly trims characters", + sdslen(x) == 4 && memcmp(x,"ciao\0",5) == 0) + + y = sdsrange(sdsdup(x),1,1); + test_cond("sdsrange(...,1,1)", + sdslen(y) == 1 && memcmp(y,"i\0",2) == 0) + + sdsfree(y); + y = sdsrange(sdsdup(x),1,-1); + test_cond("sdsrange(...,1,-1)", + sdslen(y) == 3 && memcmp(y,"iao\0",4) == 0) + + sdsfree(y); + y = sdsrange(sdsdup(x),-2,-1); + test_cond("sdsrange(...,-2,-1)", + sdslen(y) == 2 && memcmp(y,"ao\0",3) == 0) + + sdsfree(y); + y = sdsrange(sdsdup(x),2,1); + test_cond("sdsrange(...,2,1)", + sdslen(y) == 0 && memcmp(y,"\0",1) == 0) + + sdsfree(y); + y = sdsrange(sdsdup(x),1,100); + test_cond("sdsrange(...,1,100)", + sdslen(y) == 3 && memcmp(y,"iao\0",4) == 0) + + sdsfree(y); + y = sdsrange(sdsdup(x),100,100); + test_cond("sdsrange(...,100,100)", + sdslen(y) == 0 && memcmp(y,"\0",1) == 0) + + sdsfree(y); + sdsfree(x); + x = sdsnew("foo"); + y = sdsnew("foa"); + test_cond("sdscmp(foo,foa)", sdscmp(x,y) > 0) + + sdsfree(y); + sdsfree(x); + x = sdsnew("bar"); + y = sdsnew("bar"); + test_cond("sdscmp(bar,bar)", sdscmp(x,y) == 0) + + sdsfree(y); + sdsfree(x); + x = sdsnew("aar"); + y = sdsnew("bar"); + test_cond("sdscmp(bar,bar)", sdscmp(x,y) < 0) + } + test_report() +} +#endif diff --git a/deps/hiredis/sds.h b/deps/hiredis/sds.h index 2c3fb52b..94f5871f 100644 --- a/deps/hiredis/sds.h +++ b/deps/hiredis/sds.h @@ -42,13 +42,24 @@ struct sdshdr { char buf[]; }; +static inline size_t sdslen(const sds s) { + struct sdshdr *sh = (void*)(s-(sizeof(struct sdshdr))); + return sh->len; +} + +static inline size_t sdsavail(const sds s) { + struct sdshdr *sh = (void*)(s-(sizeof(struct sdshdr))); + return sh->free; +} + sds sdsnewlen(const void *init, size_t initlen); sds sdsnew(const char *init); -sds sdsempty(); +sds sdsempty(void); size_t sdslen(const sds s); sds sdsdup(const sds s); void sdsfree(sds s); size_t sdsavail(sds s); +sds sdsgrowzero(sds s, size_t len); sds sdscatlen(sds s, const void *t, size_t len); sds sdscat(sds s, const char *t); sds sdscpylen(sds s, char *t, size_t len); diff --git a/deps/hiredis/test.c b/deps/hiredis/test.c index ed355a73..5724a3ea 100644 --- a/deps/hiredis/test.c +++ b/deps/hiredis/test.c @@ -6,6 +6,7 @@ #include #include #include +#include #include "hiredis.h" @@ -31,7 +32,7 @@ static void __connect(redisContext **target) { } } -static void test_format_commands() { +static void test_format_commands(void) { char *cmd; int len; @@ -53,6 +54,12 @@ static void test_format_commands() { len == 4+4+(3+2)+4+(3+2)+4+(0+2)); free(cmd); + test("Format command with an empty string in between proper interpolations: "); + len = redisFormatCommand(&cmd,"SET %s %s","","foo"); + test_cond(strncmp(cmd,"*3\r\n$3\r\nSET\r\n$0\r\n\r\n$3\r\nfoo\r\n",len) == 0 && + len == 4+4+(3+2)+4+(0+2)+4+(3+2)); + free(cmd); + test("Format command with %%b string interpolation: "); len = redisFormatCommand(&cmd,"SET %b %b","foo",3,"b\0r",3); test_cond(strncmp(cmd,"*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\nb\0r\r\n",len) == 0 && @@ -115,7 +122,7 @@ static void test_format_commands() { free(cmd); } -static void test_blocking_connection() { +static void test_blocking_connection(void) { redisContext *c; redisReply *reply; int major, minor; @@ -246,13 +253,21 @@ static void test_blocking_connection() { * conditions, the error will be set to EOF. */ assert(c->err == REDIS_ERR_EOF && strcmp(c->errstr,"Server closed the connection") == 0); + redisFree(c); - /* Clean up context and reconnect again */ + __connect(&c); + test("Returns I/O error on socket timeout: "); + struct timeval tv = { 0, 1000 }; + assert(redisSetTimeout(c,tv) == REDIS_OK); + test_cond(redisGetReply(c,(void**)&reply) == REDIS_ERR && + c->err == REDIS_ERR_IO && errno == EAGAIN); redisFree(c); + + /* Context should be connected */ __connect(&c); } -static void test_reply_reader() { +static void test_reply_reader(void) { void *reader; void *reply; char *err; @@ -309,10 +324,19 @@ static void test_reply_reader() { ret = redisReplyReaderGetReply(reader,&reply); test_cond(ret == REDIS_OK && reply == (void*)REDIS_REPLY_STATUS); redisReplyReaderFree(reader); + + test("Properly reset state after protocol error: "); + reader = redisReplyReaderCreate(); + redisReplyReaderSetReplyObjectFunctions(reader,NULL); + redisReplyReaderFeed(reader,(char*)"x",1); + ret = redisReplyReaderGetReply(reader,&reply); + assert(ret == REDIS_ERR); + ret = redisReplyReaderGetReply(reader,&reply); + test_cond(ret == REDIS_OK && reply == NULL) } -static void test_throughput() { - int i; +static void test_throughput(void) { + int i, num; long long t1, t2; redisContext *c = blocking_context; redisReply **replies; @@ -321,31 +345,60 @@ static void test_throughput() { for (i = 0; i < 500; i++) freeReplyObject(redisCommand(c,"LPUSH mylist foo")); - replies = malloc(sizeof(redisReply*)*1000); + num = 1000; + replies = malloc(sizeof(redisReply*)*num); t1 = usec(); - for (i = 0; i < 1000; i++) { + for (i = 0; i < num; i++) { replies[i] = redisCommand(c,"PING"); assert(replies[i] != NULL && replies[i]->type == REDIS_REPLY_STATUS); } t2 = usec(); - for (i = 0; i < 1000; i++) freeReplyObject(replies[i]); + for (i = 0; i < num; i++) freeReplyObject(replies[i]); free(replies); - printf("\t(1000x PING: %.2fs)\n", (t2-t1)/1000000.0); + printf("\t(%dx PING: %.3fs)\n", num, (t2-t1)/1000000.0); - replies = malloc(sizeof(redisReply*)*1000); + replies = malloc(sizeof(redisReply*)*num); t1 = usec(); - for (i = 0; i < 1000; i++) { + for (i = 0; i < num; i++) { replies[i] = redisCommand(c,"LRANGE mylist 0 499"); assert(replies[i] != NULL && replies[i]->type == REDIS_REPLY_ARRAY); assert(replies[i] != NULL && replies[i]->elements == 500); } t2 = usec(); - for (i = 0; i < 1000; i++) freeReplyObject(replies[i]); + for (i = 0; i < num; i++) freeReplyObject(replies[i]); + free(replies); + printf("\t(%dx LRANGE with 500 elements: %.3fs)\n", num, (t2-t1)/1000000.0); + + num = 10000; + replies = malloc(sizeof(redisReply*)*num); + for (i = 0; i < num; i++) + redisAppendCommand(c,"PING"); + t1 = usec(); + for (i = 0; i < num; i++) { + assert(redisGetReply(c, (void*)&replies[i]) == REDIS_OK); + assert(replies[i] != NULL && replies[i]->type == REDIS_REPLY_STATUS); + } + t2 = usec(); + for (i = 0; i < num; i++) freeReplyObject(replies[i]); + free(replies); + printf("\t(%dx PING (pipelined): %.3fs)\n", num, (t2-t1)/1000000.0); + + replies = malloc(sizeof(redisReply*)*num); + for (i = 0; i < num; i++) + redisAppendCommand(c,"LRANGE mylist 0 499"); + t1 = usec(); + for (i = 0; i < num; i++) { + assert(redisGetReply(c, (void*)&replies[i]) == REDIS_OK); + assert(replies[i] != NULL && replies[i]->type == REDIS_REPLY_ARRAY); + assert(replies[i] != NULL && replies[i]->elements == 500); + } + t2 = usec(); + for (i = 0; i < num; i++) freeReplyObject(replies[i]); free(replies); - printf("\t(1000x LRANGE with 500 elements: %.2fs)\n", (t2-t1)/1000000.0); + printf("\t(%dx LRANGE with 500 elements (pipelined): %.3fs)\n", num, (t2-t1)/1000000.0); } -static void cleanup() { +static void cleanup(void) { redisContext *c = blocking_context; redisReply *reply; -- 2.45.2