]> git.saurik.com Git - redis.git/blobdiff - deps/hiredis/hiredis.c
Better implementation for BRPOP/BLPOP in the non blocking case.
[redis.git] / deps / hiredis / hiredis.c
index f0d78a0dac06c3466a762674d75442624f6580bc..e6109db847c494086691614cc555038888ab6bf5 100644 (file)
@@ -1,5 +1,7 @@
 /*
- * Copyright (c) 2009-2010, Salvatore Sanfilippo <antirez at gmail dot com>
+ * Copyright (c) 2009-2011, Salvatore Sanfilippo <antirez at gmail dot com>
+ * Copyright (c) 2010-2011, Pieter Noordhuis <pcnoordhuis at gmail dot com>
+ *
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
  * POSSIBILITY OF SUCH DAMAGE.
  */
 
+#include "fmacros.h"
 #include <string.h>
 #include <stdlib.h>
 #include <unistd.h>
 #include <assert.h>
 #include <errno.h>
+#include <ctype.h>
 
 #include "hiredis.h"
 #include "net.h"
 #include "sds.h"
-#include "util.h"
-
-typedef struct redisReader {
-    struct redisReplyObjectFunctions *fn;
-    sds error; /* holds optional error */
-    void *reply; /* holds temporary reply */
-
-    sds buf; /* read buffer */
-    unsigned int pos; /* buffer cursor */
-
-    redisReadTask rstack[3]; /* stack of read tasks */
-    int ridx; /* index of stack */
-} redisReader;
 
 static redisReply *createReplyObject(int type);
 static void *createStringObject(const redisReadTask *task, char *str, size_t len);
 static void *createArrayObject(const redisReadTask *task, int elements);
 static void *createIntegerObject(const redisReadTask *task, long long value);
 static void *createNilObject(const redisReadTask *task);
-static void redisSetReplyReaderError(redisReader *r, sds err);
 
-/* Default set of functions to build the reply. */
+/* Default set of functions to build the reply. Keep in mind that such a
+ * function returning NULL is interpreted as OOM. */
 static redisReplyObjectFunctions defaultFunctions = {
     createStringObject,
     createArrayObject,
@@ -68,9 +59,11 @@ static redisReplyObjectFunctions defaultFunctions = {
 
 /* Create a reply object */
 static redisReply *createReplyObject(int type) {
-    redisReply *r = calloc(sizeof(*r),1);
+    redisReply *r = calloc(1,sizeof(*r));
+
+    if (r == NULL)
+        return NULL;
 
-    if (!r) redisOOM();
     r->type = type;
     return r;
 }
@@ -84,11 +77,16 @@ void freeReplyObject(void *reply) {
     case REDIS_REPLY_INTEGER:
         break; /* Nothing to free */
     case REDIS_REPLY_ARRAY:
-        for (j = 0; j < r->elements; j++)
-            if (r->element[j]) freeReplyObject(r->element[j]);
-        free(r->element);
+        if (r->element != NULL) {
+            for (j = 0; j < r->elements; j++)
+                if (r->element[j] != NULL)
+                    freeReplyObject(r->element[j]);
+            free(r->element);
+        }
         break;
-    default:
+    case REDIS_REPLY_ERROR:
+    case REDIS_REPLY_STATUS:
+    case REDIS_REPLY_STRING:
         if (r->str != NULL)
             free(r->str);
         break;
@@ -97,21 +95,31 @@ void freeReplyObject(void *reply) {
 }
 
 static void *createStringObject(const redisReadTask *task, char *str, size_t len) {
-    redisReply *r = createReplyObject(task->type);
-    char *value = malloc(len+1);
-    if (!value) redisOOM();
-    assert(task->type == REDIS_REPLY_ERROR ||
+    redisReply *r, *parent;
+    char *buf;
+
+    r = createReplyObject(task->type);
+    if (r == NULL)
+        return NULL;
+
+    buf = malloc(len+1);
+    if (buf == NULL) {
+        freeReplyObject(r);
+        return NULL;
+    }
+
+    assert(task->type == REDIS_REPLY_ERROR  ||
            task->type == REDIS_REPLY_STATUS ||
            task->type == REDIS_REPLY_STRING);
 
     /* Copy string value */
-    memcpy(value,str,len);
-    value[len] = '\0';
-    r->str = value;
+    memcpy(buf,str,len);
+    buf[len] = '\0';
+    r->str = buf;
     r->len = len;
 
     if (task->parent) {
-        redisReply *parent = task->parent;
+        parent = task->parent->obj;
         assert(parent->type == REDIS_REPLY_ARRAY);
         parent->element[task->idx] = r;
     }
@@ -119,12 +127,24 @@ static void *createStringObject(const redisReadTask *task, char *str, size_t len
 }
 
 static void *createArrayObject(const redisReadTask *task, int elements) {
-    redisReply *r = createReplyObject(REDIS_REPLY_ARRAY);
+    redisReply *r, *parent;
+
+    r = createReplyObject(REDIS_REPLY_ARRAY);
+    if (r == NULL)
+        return NULL;
+
+    if (elements > 0) {
+        r->element = calloc(elements,sizeof(redisReply*));
+        if (r->element == NULL) {
+            freeReplyObject(r);
+            return NULL;
+        }
+    }
+
     r->elements = elements;
-    if ((r->element = calloc(sizeof(redisReply*),elements)) == NULL)
-        redisOOM();
+
     if (task->parent) {
-        redisReply *parent = task->parent;
+        parent = task->parent->obj;
         assert(parent->type == REDIS_REPLY_ARRAY);
         parent->element[task->idx] = r;
     }
@@ -132,10 +152,16 @@ static void *createArrayObject(const redisReadTask *task, int elements) {
 }
 
 static void *createIntegerObject(const redisReadTask *task, long long value) {
-    redisReply *r = createReplyObject(REDIS_REPLY_INTEGER);
+    redisReply *r, *parent;
+
+    r = createReplyObject(REDIS_REPLY_INTEGER);
+    if (r == NULL)
+        return NULL;
+
     r->integer = value;
+
     if (task->parent) {
-        redisReply *parent = task->parent;
+        parent = task->parent->obj;
         assert(parent->type == REDIS_REPLY_ARRAY);
         parent->element[task->idx] = r;
     }
@@ -143,18 +169,86 @@ static void *createIntegerObject(const redisReadTask *task, long long value) {
 }
 
 static void *createNilObject(const redisReadTask *task) {
-    redisReply *r = createReplyObject(REDIS_REPLY_NIL);
+    redisReply *r, *parent;
+
+    r = createReplyObject(REDIS_REPLY_NIL);
+    if (r == NULL)
+        return NULL;
+
     if (task->parent) {
-        redisReply *parent = task->parent;
+        parent = task->parent->obj;
         assert(parent->type == REDIS_REPLY_ARRAY);
         parent->element[task->idx] = r;
     }
     return r;
 }
 
+static void __redisReaderSetError(redisReader *r, int type, const char *str) {
+    size_t len;
+
+    if (r->reply != NULL && r->fn && r->fn->freeObject) {
+        r->fn->freeObject(r->reply);
+        r->reply = NULL;
+    }
+
+    /* Clear input buffer on errors. */
+    if (r->buf != NULL) {
+        sdsfree(r->buf);
+        r->buf = NULL;
+        r->pos = r->len = 0;
+    }
+
+    /* Reset task stack. */
+    r->ridx = -1;
+
+    /* Set error. */
+    r->err = type;
+    len = strlen(str);
+    len = len < (sizeof(r->errstr)-1) ? len : (sizeof(r->errstr)-1);
+    memcpy(r->errstr,str,len);
+    r->errstr[len] = '\0';
+}
+
+static size_t chrtos(char *buf, size_t size, char byte) {
+    size_t len = 0;
+
+    switch(byte) {
+    case '\\':
+    case '"':
+        len = snprintf(buf,size,"\"\\%c\"",byte);
+        break;
+    case '\n': len = snprintf(buf,size,"\"\\n\""); break;
+    case '\r': len = snprintf(buf,size,"\"\\r\""); break;
+    case '\t': len = snprintf(buf,size,"\"\\t\""); break;
+    case '\a': len = snprintf(buf,size,"\"\\a\""); break;
+    case '\b': len = snprintf(buf,size,"\"\\b\""); break;
+    default:
+        if (isprint(byte))
+            len = snprintf(buf,size,"\"%c\"",byte);
+        else
+            len = snprintf(buf,size,"\"\\x%02x\"",(unsigned char)byte);
+        break;
+    }
+
+    return len;
+}
+
+static void __redisReaderSetErrorProtocolByte(redisReader *r, char byte) {
+    char cbuf[8], sbuf[128];
+
+    chrtos(cbuf,sizeof(cbuf),byte);
+    snprintf(sbuf,sizeof(sbuf),
+        "Protocol error, got %s as reply type byte", cbuf);
+    __redisReaderSetError(r,REDIS_ERR_PROTOCOL,sbuf);
+}
+
+static void __redisReaderSetErrorOOM(redisReader *r) {
+    __redisReaderSetError(r,REDIS_ERR_OOM,"Out of memory");
+}
+
 static char *readBytes(redisReader *r, unsigned int bytes) {
     char *p;
-    if (sdslen(r->buf)-r->pos >= bytes) {
+    if (r->len-r->pos >= bytes) {
         p = r->buf+r->pos;
         r->pos += bytes;
         return p;
@@ -162,20 +256,60 @@ static char *readBytes(redisReader *r, unsigned int bytes) {
     return NULL;
 }
 
-static char *seekNewline(char *s) {
-    /* Find pointer to \r\n without strstr */
-    while (s != NULL) {
-        s = strchr(s,'\r');
-        if (s != NULL) {
-            if (s[1] == '\n')
-                break;
-            else
-                s++;
+/* Find pointer to \r\n. */
+static char *seekNewline(char *s, size_t len) {
+    int pos = 0;
+    int _len = len-1;
+
+    /* Position should be < len-1 because the character at "pos" should be
+     * followed by a \n. Note that strchr cannot be used because it doesn't
+     * allow to search a limited length and the buffer that is being searched
+     * might not have a trailing NULL character. */
+    while (pos < _len) {
+        while(pos < _len && s[pos] != '\r') pos++;
+        if (s[pos] != '\r') {
+            /* Not found. */
+            return NULL;
         } else {
-            break;
+            if (s[pos+1] == '\n') {
+                /* Found. */
+                return s+pos;
+            } else {
+                /* Continue searching. */
+                pos++;
+            }
         }
     }
-    return s;
+    return NULL;
+}
+
+/* Read a long long value starting at *s, under the assumption that it will be
+ * terminated by \r\n. Ambiguously returns -1 for unexpected input. */
+static long long readLongLong(char *s) {
+    long long v = 0;
+    int dec, mult = 1;
+    char c;
+
+    if (*s == '-') {
+        mult = -1;
+        s++;
+    } else if (*s == '+') {
+        mult = 1;
+        s++;
+    }
+
+    while ((c = *(s++)) != '\r') {
+        dec = c - '0';
+        if (dec >= 0 && dec < 10) {
+            v *= 10;
+            v += dec;
+        } else {
+            /* Should not happen... */
+            return -1;
+        }
+    }
+
+    return mult*v;
 }
 
 static char *readLine(redisReader *r, int *_len) {
@@ -183,7 +317,7 @@ static char *readLine(redisReader *r, int *_len) {
     int len;
 
     p = r->buf+r->pos;
-    s = seekNewline(p);
+    s = seekNewline(p,(r->len-r->pos));
     if (s != NULL) {
         len = s-(r->buf+r->pos);
         r->pos += len+2; /* skip \r\n */
@@ -225,23 +359,31 @@ static int processLineItem(redisReader *r) {
     int len;
 
     if ((p = readLine(r,&len)) != NULL) {
-        if (r->fn) {
-            if (cur->type == REDIS_REPLY_INTEGER) {
-                obj = r->fn->createInteger(cur,strtoll(p,NULL,10));
-            } else {
-                obj = r->fn->createString(cur,p,len);
-            }
+        if (cur->type == REDIS_REPLY_INTEGER) {
+            if (r->fn && r->fn->createInteger)
+                obj = r->fn->createInteger(cur,readLongLong(p));
+            else
+                obj = (void*)REDIS_REPLY_INTEGER;
         } else {
-            obj = (void*)(size_t)(cur->type);
+            /* Type will be error or status. */
+            if (r->fn && r->fn->createString)
+                obj = r->fn->createString(cur,p,len);
+            else
+                obj = (void*)(size_t)(cur->type);
         }
 
-        /* If there is no root yet, register this object as root. */
-        if (r->reply == NULL)
-            r->reply = obj;
+        if (obj == NULL) {
+            __redisReaderSetErrorOOM(r);
+            return REDIS_ERR;
+        }
+
+        /* Set reply if this is the root object. */
+        if (r->ridx == 0) r->reply = obj;
         moveToNextTask(r);
-        return 0;
+        return REDIS_OK;
     }
-    return -1;
+
+    return REDIS_ERR;
 }
 
 static int processBulkItem(redisReader *r) {
@@ -250,37 +392,51 @@ static int processBulkItem(redisReader *r) {
     char *p, *s;
     long len;
     unsigned long bytelen;
+    int success = 0;
 
     p = r->buf+r->pos;
-    s = seekNewline(p);
+    s = seekNewline(p,r->len-r->pos);
     if (s != NULL) {
         p = r->buf+r->pos;
         bytelen = s-(r->buf+r->pos)+2; /* include \r\n */
-        len = strtol(p,NULL,10);
+        len = readLongLong(p);
 
         if (len < 0) {
             /* The nil object can always be created. */
-            obj = r->fn ? r->fn->createNil(cur) :
-                (void*)REDIS_REPLY_NIL;
+            if (r->fn && r->fn->createNil)
+                obj = r->fn->createNil(cur);
+            else
+                obj = (void*)REDIS_REPLY_NIL;
+            success = 1;
         } else {
             /* Only continue when the buffer contains the entire bulk item. */
             bytelen += len+2; /* include \r\n */
-            if (r->pos+bytelen <= sdslen(r->buf)) {
-                obj = r->fn ? r->fn->createString(cur,s+2,len) :
-                    (void*)REDIS_REPLY_STRING;
+            if (r->pos+bytelen <= r->len) {
+                if (r->fn && r->fn->createString)
+                    obj = r->fn->createString(cur,s+2,len);
+                else
+                    obj = (void*)REDIS_REPLY_STRING;
+                success = 1;
             }
         }
 
         /* Proceed when obj was created. */
-        if (obj != NULL) {
+        if (success) {
+            if (obj == NULL) {
+                __redisReaderSetErrorOOM(r);
+                return REDIS_ERR;
+            }
+
             r->pos += bytelen;
-            if (r->reply == NULL)
-                r->reply = obj;
+
+            /* Set reply if this is the root object. */
+            if (r->ridx == 0) r->reply = obj;
             moveToNextTask(r);
-            return 0;
+            return REDIS_OK;
         }
     }
-    return -1;
+
+    return REDIS_ERR;
 }
 
 static int processMultiBulkItem(redisReader *r) {
@@ -288,42 +444,69 @@ static int processMultiBulkItem(redisReader *r) {
     void *obj;
     char *p;
     long elements;
+    int root = 0;
+
+    /* Set error for nested multi bulks with depth > 2 */
+    if (r->ridx == 8) {
+        __redisReaderSetError(r,REDIS_ERR_PROTOCOL,
+            "No support for nested multi bulk replies with depth > 7");
+        return REDIS_ERR;
+    }
 
     if ((p = readLine(r,NULL)) != NULL) {
-        elements = strtol(p,NULL,10);
+        elements = readLongLong(p);
+        root = (r->ridx == 0);
+
         if (elements == -1) {
-            obj = r->fn ? r->fn->createNil(cur) :
-                (void*)REDIS_REPLY_NIL;
+            if (r->fn && r->fn->createNil)
+                obj = r->fn->createNil(cur);
+            else
+                obj = (void*)REDIS_REPLY_NIL;
+
+            if (obj == NULL) {
+                __redisReaderSetErrorOOM(r);
+                return REDIS_ERR;
+            }
+
             moveToNextTask(r);
         } else {
-            obj = r->fn ? r->fn->createArray(cur,elements) :
-                (void*)REDIS_REPLY_ARRAY;
+            if (r->fn && r->fn->createArray)
+                obj = r->fn->createArray(cur,elements);
+            else
+                obj = (void*)REDIS_REPLY_ARRAY;
+
+            if (obj == NULL) {
+                __redisReaderSetErrorOOM(r);
+                return REDIS_ERR;
+            }
 
             /* Modify task stack when there are more than 0 elements. */
             if (elements > 0) {
                 cur->elements = elements;
+                cur->obj = obj;
                 r->ridx++;
                 r->rstack[r->ridx].type = -1;
                 r->rstack[r->ridx].elements = -1;
-                r->rstack[r->ridx].parent = obj;
                 r->rstack[r->ridx].idx = 0;
+                r->rstack[r->ridx].obj = NULL;
+                r->rstack[r->ridx].parent = cur;
+                r->rstack[r->ridx].privdata = r->privdata;
             } else {
                 moveToNextTask(r);
             }
         }
 
-        /* Object was created, so we can always continue. */
-        if (r->reply == NULL)
-            r->reply = obj;
-        return 0;
+        /* Set reply if this is the root object. */
+        if (root) r->reply = obj;
+        return REDIS_OK;
     }
-    return -1;
+
+    return REDIS_ERR;
 }
 
 static int processItem(redisReader *r) {
     redisReadTask *cur = &(r->rstack[r->ridx]);
     char *p;
-    sds byte;
 
     /* check if we need to read type */
     if (cur->type < 0) {
@@ -345,15 +528,12 @@ static int processItem(redisReader *r) {
                 cur->type = REDIS_REPLY_ARRAY;
                 break;
             default:
-                byte = sdscatrepr(sdsempty(),p,1);
-                redisSetReplyReaderError(r,sdscatprintf(sdsempty(),
-                    "protocol error, got %s as reply type byte", byte));
-                sdsfree(byte);
-                return -1;
+                __redisReaderSetErrorProtocolByte(r,*p);
+                return REDIS_ERR;
             }
         } else {
             /* could not consume 1 byte */
-            return -1;
+            return REDIS_ERR;
         }
     }
 
@@ -368,131 +548,119 @@ static int processItem(redisReader *r) {
     case REDIS_REPLY_ARRAY:
         return processMultiBulkItem(r);
     default:
-        redisSetReplyReaderError(r,sdscatprintf(sdsempty(),
-            "unknown item type '%d'", cur->type));
-        return -1;
+        assert(NULL);
+        return REDIS_ERR; /* Avoid warning. */
     }
 }
 
-void *redisReplyReaderCreate() {
-    redisReader *r = calloc(sizeof(redisReader),1);
-    r->error = NULL;
+redisReader *redisReaderCreate(void) {
+    redisReader *r;
+
+    r = calloc(sizeof(redisReader),1);
+    if (r == NULL)
+        return NULL;
+
+    r->err = 0;
+    r->errstr[0] = '\0';
     r->fn = &defaultFunctions;
     r->buf = sdsempty();
-    r->ridx = -1;
-    return r;
-}
-
-/* Set the function set to build the reply. Returns REDIS_OK when there
- * is no temporary object and it can be set, REDIS_ERR otherwise. */
-int redisReplyReaderSetReplyObjectFunctions(void *reader, redisReplyObjectFunctions *fn) {
-    redisReader *r = reader;
-    if (r->reply == NULL) {
-        r->fn = fn;
-        return REDIS_OK;
+    if (r->buf == NULL) {
+        free(r);
+        return NULL;
     }
-    return REDIS_ERR;
-}
 
-/* External libraries wrapping hiredis might need access to the temporary
- * variable while the reply is built up. When the reader contains an
- * object in between receiving some bytes to parse, this object might
- * otherwise be free'd by garbage collection. */
-void *redisReplyReaderGetObject(void *reader) {
-    redisReader *r = reader;
-    return r->reply;
+    r->ridx = -1;
+    return r;
 }
 
-void redisReplyReaderFree(void *reader) {
-    redisReader *r = reader;
-    if (r->error != NULL)
-        sdsfree(r->error);
-    if (r->reply != NULL && r->fn)
+void redisReaderFree(redisReader *r) {
+    if (r->reply != NULL && r->fn && r->fn->freeObject)
         r->fn->freeObject(r->reply);
     if (r->buf != NULL)
         sdsfree(r->buf);
     free(r);
 }
 
-static void redisSetReplyReaderError(redisReader *r, sds err) {
-    if (r->reply != NULL)
-        r->fn->freeObject(r->reply);
+int redisReaderFeed(redisReader *r, const char *buf, size_t len) {
+    sds newbuf;
 
-    /* Clear remaining buffer when we see a protocol error. */
-    if (r->buf != NULL) {
-        sdsfree(r->buf);
-        r->buf = sdsempty();
-        r->pos = 0;
-    }
-    r->ridx = -1;
-    r->error = err;
-}
+    /* Return early when this reader is in an erroneous state. */
+    if (r->err)
+        return REDIS_ERR;
 
-char *redisReplyReaderGetError(void *reader) {
-    redisReader *r = reader;
-    return r->error;
-}
+    /* Copy the provided buffer. */
+    if (buf != NULL && len >= 1) {
+#if 0
+        /* Destroy internal buffer when it is empty and is quite large. */
+        if (r->len == 0 && sdsavail(r->buf) > 16*1024) {
+            sdsfree(r->buf);
+            r->buf = sdsempty();
+            r->pos = 0;
 
-void redisReplyReaderFeed(void *reader, char *buf, size_t len) {
-    redisReader *r = reader;
+            /* r->buf should not be NULL since we just free'd a larger one. */
+            assert(r->buf != NULL);
+        }
+#endif
 
-    /* Copy the provided buffer. */
-    if (buf != NULL && len >= 1)
-        r->buf = sdscatlen(r->buf,buf,len);
+        newbuf = sdscatlen(r->buf,buf,len);
+        if (newbuf == NULL) {
+            __redisReaderSetErrorOOM(r);
+            return REDIS_ERR;
+        }
+
+        r->buf = newbuf;
+        r->len = sdslen(r->buf);
+    }
+
+    return REDIS_OK;
 }
 
-int redisReplyReaderGetReply(void *reader, void **reply) {
-    redisReader *r = reader;
-    if (reply != NULL) *reply = NULL;
+int redisReaderGetReply(redisReader *r, void **reply) {
+    /* Default target pointer to NULL. */
+    if (reply != NULL)
+        *reply = NULL;
+
+    /* Return early when this reader is in an erroneous state. */
+    if (r->err)
+        return REDIS_ERR;
 
     /* When the buffer is empty, there will never be a reply. */
-    if (sdslen(r->buf) == 0)
+    if (r->len == 0)
         return REDIS_OK;
 
     /* Set first item to process when the stack is empty. */
     if (r->ridx == -1) {
         r->rstack[0].type = -1;
         r->rstack[0].elements = -1;
-        r->rstack[0].parent = NULL;
         r->rstack[0].idx = -1;
+        r->rstack[0].obj = NULL;
+        r->rstack[0].parent = NULL;
+        r->rstack[0].privdata = r->privdata;
         r->ridx = 0;
     }
 
     /* Process items in reply. */
     while (r->ridx >= 0)
-        if (processItem(r) < 0)
+        if (processItem(r) != REDIS_OK)
             break;
 
-    /* Discard the consumed part of the buffer. */
-    if (r->pos > 0) {
-        if (r->pos == sdslen(r->buf)) {
-            /* sdsrange has a quirck on this edge case. */
-            sdsfree(r->buf);
-            r->buf = sdsempty();
-        } else {
-            r->buf = sdsrange(r->buf,r->pos,sdslen(r->buf));
-        }
+    /* Return ASAP when an error occurred. */
+    if (r->err)
+        return REDIS_ERR;
+
+    /* Discard part of the buffer when we've consumed at least 1k, to avoid
+     * doing unnecessary calls to memmove() in sds.c. */
+    if (r->pos >= 1024) {
+        r->buf = sdsrange(r->buf,r->pos,-1);
         r->pos = 0;
+        r->len = sdslen(r->buf);
     }
 
     /* Emit a reply when there is one. */
     if (r->ridx == -1) {
-        void *aux = r->reply;
+        if (reply != NULL)
+            *reply = r->reply;
         r->reply = NULL;
-
-        /* Destroy the buffer when it is empty and is quite large. */
-        if (sdslen(r->buf) == 0 && sdsavail(r->buf) > 16*1024) {
-            sdsfree(r->buf);
-            r->buf = sdsempty();
-            r->pos = 0;
-        }
-
-        /* Check if there actually *is* a reply. */
-        if (r->error != NULL) {
-            return REDIS_ERR;
-        } else {
-            if (reply != NULL) *reply = aux;
-        }
     }
     return REDIS_OK;
 }
@@ -511,87 +679,236 @@ static int intlen(int i) {
     return len;
 }
 
-/* Helper function for redisvFormatCommand(). */
-static void addArgument(sds a, char ***argv, int *argc, int *totlen) {
-    (*argc)++;
-    if ((*argv = realloc(*argv, sizeof(char*)*(*argc))) == NULL) redisOOM();
-    if (totlen) *totlen = *totlen+1+intlen(sdslen(a))+2+sdslen(a)+2;
-    (*argv)[(*argc)-1] = a;
+/* Helper that calculates the bulk length given a certain string length. */
+static size_t bulklen(size_t len) {
+    return 1+intlen(len)+2+len+2;
 }
 
 int redisvFormatCommand(char **target, const char *format, va_list ap) {
-    size_t size;
-    const char *arg, *c = format;
+    const char *c = format;
     char *cmd = NULL; /* final command */
     int pos; /* position in final command */
-    sds current; /* current argument */
-    char **argv = NULL;
-    int argc = 0, j;
+    sds curarg, newarg; /* current argument */
+    int touched = 0; /* was the current argument touched? */
+    char **curargv = NULL, **newargv = NULL;
+    int argc = 0;
     int totlen = 0;
+    int j;
 
     /* Abort if there is not target to set */
     if (target == NULL)
         return -1;
 
     /* Build the command string accordingly to protocol */
-    current = sdsempty();
+    curarg = sdsempty();
+    if (curarg == NULL)
+        return -1;
+
     while(*c != '\0') {
         if (*c != '%' || c[1] == '\0') {
             if (*c == ' ') {
-                if (sdslen(current) != 0) {
-                    addArgument(current, &argv, &argc, &totlen);
-                    current = sdsempty();
+                if (touched) {
+                    newargv = realloc(curargv,sizeof(char*)*(argc+1));
+                    if (newargv == NULL) goto err;
+                    curargv = newargv;
+                    curargv[argc++] = curarg;
+                    totlen += bulklen(sdslen(curarg));
+
+                    /* curarg is put in argv so it can be overwritten. */
+                    curarg = sdsempty();
+                    if (curarg == NULL) goto err;
+                    touched = 0;
                 }
             } else {
-                current = sdscatlen(current,c,1);
+                newarg = sdscatlen(curarg,c,1);
+                if (newarg == NULL) goto err;
+                curarg = newarg;
+                touched = 1;
             }
         } else {
+            char *arg;
+            size_t size;
+
+            /* Set newarg so it can be checked even if it is not touched. */
+            newarg = curarg;
+
             switch(c[1]) {
             case 's':
                 arg = va_arg(ap,char*);
-                current = sdscat(current,arg);
+                size = strlen(arg);
+                if (size > 0)
+                    newarg = sdscatlen(curarg,arg,size);
                 break;
             case 'b':
                 arg = va_arg(ap,char*);
                 size = va_arg(ap,size_t);
-                current = sdscatlen(current,arg,size);
+                if (size > 0)
+                    newarg = sdscatlen(curarg,arg,size);
                 break;
             case '%':
-                cmd = sdscat(cmd,"%");
+                newarg = sdscat(curarg,"%");
                 break;
+            default:
+                /* Try to detect printf format */
+                {
+                    static const char intfmts[] = "diouxX";
+                    char _format[16];
+                    const char *_p = c+1;
+                    size_t _l = 0;
+                    va_list _cpy;
+
+                    /* Flags */
+                    if (*_p != '\0' && *_p == '#') _p++;
+                    if (*_p != '\0' && *_p == '0') _p++;
+                    if (*_p != '\0' && *_p == '-') _p++;
+                    if (*_p != '\0' && *_p == ' ') _p++;
+                    if (*_p != '\0' && *_p == '+') _p++;
+
+                    /* Field width */
+                    while (*_p != '\0' && isdigit(*_p)) _p++;
+
+                    /* Precision */
+                    if (*_p == '.') {
+                        _p++;
+                        while (*_p != '\0' && isdigit(*_p)) _p++;
+                    }
+
+                    /* Copy va_list before consuming with va_arg */
+                    va_copy(_cpy,ap);
+
+                    /* Integer conversion (without modifiers) */
+                    if (strchr(intfmts,*_p) != NULL) {
+                        va_arg(ap,int);
+                        goto fmt_valid;
+                    }
+
+                    /* Double conversion (without modifiers) */
+                    if (strchr("eEfFgGaA",*_p) != NULL) {
+                        va_arg(ap,double);
+                        goto fmt_valid;
+                    }
+
+                    /* Size: char */
+                    if (_p[0] == 'h' && _p[1] == 'h') {
+                        _p += 2;
+                        if (*_p != '\0' && strchr(intfmts,*_p) != NULL) {
+                            va_arg(ap,int); /* char gets promoted to int */
+                            goto fmt_valid;
+                        }
+                        goto fmt_invalid;
+                    }
+
+                    /* Size: short */
+                    if (_p[0] == 'h') {
+                        _p += 1;
+                        if (*_p != '\0' && strchr(intfmts,*_p) != NULL) {
+                            va_arg(ap,int); /* short gets promoted to int */
+                            goto fmt_valid;
+                        }
+                        goto fmt_invalid;
+                    }
+
+                    /* Size: long long */
+                    if (_p[0] == 'l' && _p[1] == 'l') {
+                        _p += 2;
+                        if (*_p != '\0' && strchr(intfmts,*_p) != NULL) {
+                            va_arg(ap,long long);
+                            goto fmt_valid;
+                        }
+                        goto fmt_invalid;
+                    }
+
+                    /* Size: long */
+                    if (_p[0] == 'l') {
+                        _p += 1;
+                        if (*_p != '\0' && strchr(intfmts,*_p) != NULL) {
+                            va_arg(ap,long);
+                            goto fmt_valid;
+                        }
+                        goto fmt_invalid;
+                    }
+
+                fmt_invalid:
+                    va_end(_cpy);
+                    goto err;
+
+                fmt_valid:
+                    _l = (_p+1)-c;
+                    if (_l < sizeof(_format)-2) {
+                        memcpy(_format,c,_l);
+                        _format[_l] = '\0';
+                        newarg = sdscatvprintf(curarg,_format,_cpy);
+
+                        /* Update current position (note: outer blocks
+                         * increment c twice so compensate here) */
+                        c = _p-1;
+                    }
+
+                    va_end(_cpy);
+                    break;
+                }
             }
+
+            if (newarg == NULL) goto err;
+            curarg = newarg;
+
+            touched = 1;
             c++;
         }
         c++;
     }
 
     /* Add the last argument if needed */
-    if (sdslen(current) != 0) {
-        addArgument(current, &argv, &argc, &totlen);
+    if (touched) {
+        newargv = realloc(curargv,sizeof(char*)*(argc+1));
+        if (newargv == NULL) goto err;
+        curargv = newargv;
+        curargv[argc++] = curarg;
+        totlen += bulklen(sdslen(curarg));
     } else {
-        sdsfree(current);
+        sdsfree(curarg);
     }
 
+    /* Clear curarg because it was put in curargv or was free'd. */
+    curarg = NULL;
+
     /* Add bytes needed to hold multi bulk count */
     totlen += 1+intlen(argc)+2;
 
     /* Build the command at protocol level */
     cmd = malloc(totlen+1);
-    if (!cmd) redisOOM();
+    if (cmd == NULL) goto err;
+
     pos = sprintf(cmd,"*%d\r\n",argc);
     for (j = 0; j < argc; j++) {
-        pos += sprintf(cmd+pos,"$%zu\r\n",sdslen(argv[j]));
-        memcpy(cmd+pos,argv[j],sdslen(argv[j]));
-        pos += sdslen(argv[j]);
-        sdsfree(argv[j]);
+        pos += sprintf(cmd+pos,"$%zu\r\n",sdslen(curargv[j]));
+        memcpy(cmd+pos,curargv[j],sdslen(curargv[j]));
+        pos += sdslen(curargv[j]);
+        sdsfree(curargv[j]);
         cmd[pos++] = '\r';
         cmd[pos++] = '\n';
     }
     assert(pos == totlen);
-    free(argv);
-    cmd[totlen] = '\0';
+    cmd[pos] = '\0';
+
+    free(curargv);
     *target = cmd;
     return totlen;
+
+err:
+    while(argc--)
+        sdsfree(curargv[argc]);
+    free(curargv);
+
+    if (curarg != NULL)
+        sdsfree(curarg);
+
+    /* No need to check cmd since it is the last statement that can fail,
+     * but do it anyway to be as defensive as possible. */
+    if (cmd != NULL)
+        free(cmd);
+
+    return -1;
 }
 
 /* Format a command according to the Redis protocol. This function
@@ -630,12 +947,14 @@ int redisFormatCommandArgv(char **target, int argc, const char **argv, const siz
     totlen = 1+intlen(argc)+2;
     for (j = 0; j < argc; j++) {
         len = argvlen ? argvlen[j] : strlen(argv[j]);
-        totlen += 1+intlen(len)+2+len+2;
+        totlen += bulklen(len);
     }
 
     /* Build the command at protocol level */
     cmd = malloc(totlen+1);
-    if (!cmd) redisOOM();
+    if (cmd == NULL)
+        return -1;
+
     pos = sprintf(cmd,"*%d\r\n",argc);
     for (j = 0; j < argc; j++) {
         len = argvlen ? argvlen[j] : strlen(argv[j]);
@@ -646,42 +965,49 @@ int redisFormatCommandArgv(char **target, int argc, const char **argv, const siz
         cmd[pos++] = '\n';
     }
     assert(pos == totlen);
-    cmd[totlen] = '\0';
+    cmd[pos] = '\0';
+
     *target = cmd;
     return totlen;
 }
 
-void __redisSetError(redisContext *c, int type, const sds errstr) {
+void __redisSetError(redisContext *c, int type, const char *str) {
+    size_t len;
+
     c->err = type;
-    if (errstr != NULL) {
-        c->errstr = errstr;
+    if (str != NULL) {
+        len = strlen(str);
+        len = len < (sizeof(c->errstr)-1) ? len : (sizeof(c->errstr)-1);
+        memcpy(c->errstr,str,len);
+        c->errstr[len] = '\0';
     } else {
         /* Only REDIS_ERR_IO may lack a description! */
         assert(type == REDIS_ERR_IO);
-        c->errstr = sdsnew(strerror(errno));
+        strerror_r(errno,c->errstr,sizeof(c->errstr));
     }
 }
 
-static redisContext *redisContextInit() {
-    redisContext *c = calloc(sizeof(redisContext),1);
+static redisContext *redisContextInit(void) {
+    redisContext *c;
+
+    c = calloc(1,sizeof(redisContext));
+    if (c == NULL)
+        return NULL;
+
     c->err = 0;
-    c->errstr = NULL;
+    c->errstr[0] = '\0';
     c->obuf = sdsempty();
-    c->fn = &defaultFunctions;
-    c->reader = NULL;
+    c->reader = redisReaderCreate();
     return c;
 }
 
 void redisFree(redisContext *c) {
-    /* Disconnect before free'ing if not yet disconnected. */
-    if (c->flags & REDIS_CONNECTED)
+    if (c->fd > 0)
         close(c->fd);
-    if (c->errstr != NULL)
-        sdsfree(c->errstr);
     if (c->obuf != NULL)
         sdsfree(c->obuf);
     if (c->reader != NULL)
-        redisReplyReaderFree(c->reader);
+        redisReaderFree(c->reader);
     free(c);
 }
 
@@ -691,51 +1017,50 @@ void redisFree(redisContext *c) {
 redisContext *redisConnect(const char *ip, int port) {
     redisContext *c = redisContextInit();
     c->flags |= REDIS_BLOCK;
-    c->flags |= REDIS_CONNECTED;
-    redisContextConnectTcp(c,ip,port);
+    redisContextConnectTcp(c,ip,port,NULL);
+    return c;
+}
+
+redisContext *redisConnectWithTimeout(const char *ip, int port, struct timeval tv) {
+    redisContext *c = redisContextInit();
+    c->flags |= REDIS_BLOCK;
+    redisContextConnectTcp(c,ip,port,&tv);
     return c;
 }
 
 redisContext *redisConnectNonBlock(const char *ip, int port) {
     redisContext *c = redisContextInit();
     c->flags &= ~REDIS_BLOCK;
-    c->flags |= REDIS_CONNECTED;
-    redisContextConnectTcp(c,ip,port);
+    redisContextConnectTcp(c,ip,port,NULL);
     return c;
 }
 
 redisContext *redisConnectUnix(const char *path) {
     redisContext *c = redisContextInit();
     c->flags |= REDIS_BLOCK;
-    c->flags |= REDIS_CONNECTED;
-    redisContextConnectUnix(c,path);
+    redisContextConnectUnix(c,path,NULL);
     return c;
 }
 
-redisContext *redisConnectUnixNonBlock(const char *path) {
+redisContext *redisConnectUnixWithTimeout(const char *path, struct timeval tv) {
     redisContext *c = redisContextInit();
-    c->flags &= ~REDIS_BLOCK;
-    c->flags |= REDIS_CONNECTED;
-    redisContextConnectUnix(c,path);
+    c->flags |= REDIS_BLOCK;
+    redisContextConnectUnix(c,path,&tv);
     return c;
 }
 
-/* Set the replyObjectFunctions to use. Returns REDIS_ERR when the reader
- * was already initialized and the function set could not be re-set.
- * Return REDIS_OK when they could be set. */
-int redisSetReplyObjectFunctions(redisContext *c, redisReplyObjectFunctions *fn) {
-    if (c->reader != NULL)
-        return REDIS_ERR;
-    c->fn = fn;
-    return REDIS_OK;
+redisContext *redisConnectUnixNonBlock(const char *path) {
+    redisContext *c = redisContextInit();
+    c->flags &= ~REDIS_BLOCK;
+    redisContextConnectUnix(c,path,NULL);
+    return c;
 }
 
-/* Helper function to lazily create a reply reader. */
-static void __redisCreateReplyReader(redisContext *c) {
-    if (c->reader == NULL) {
-        c->reader = redisReplyReaderCreate();
-        assert(redisReplyReaderSetReplyObjectFunctions(c->reader,c->fn) == REDIS_OK);
-    }
+/* Set read/write timeout on a blocking socket. */
+int redisSetTimeout(redisContext *c, struct timeval tv) {
+    if (c->flags & REDIS_BLOCK)
+        return redisContextSetTimeout(c,tv);
+    return REDIS_ERR;
 }
 
 /* Use this function to handle a read event on the descriptor. It will try
@@ -744,22 +1069,29 @@ static void __redisCreateReplyReader(redisContext *c) {
  * After this function is called, you may use redisContextReadReply to
  * see if there is a reply available. */
 int redisBufferRead(redisContext *c) {
-    char buf[2048];
-    int nread = read(c->fd,buf,sizeof(buf));
+    char buf[1024*16];
+    int nread;
+
+    /* Return early when the context has seen an error. */
+    if (c->err)
+        return REDIS_ERR;
+
+    nread = read(c->fd,buf,sizeof(buf));
     if (nread == -1) {
-        if (errno == EAGAIN) {
+        if (errno == EAGAIN && !(c->flags & REDIS_BLOCK)) {
             /* Try again later */
         } else {
             __redisSetError(c,REDIS_ERR_IO,NULL);
             return REDIS_ERR;
         }
     } else if (nread == 0) {
-        __redisSetError(c,REDIS_ERR_EOF,
-            sdsnew("Server closed the connection"));
+        __redisSetError(c,REDIS_ERR_EOF,"Server closed the connection");
         return REDIS_ERR;
     } else {
-        __redisCreateReplyReader(c);
-        redisReplyReaderFeed(c->reader,buf,nread);
+        if (redisReaderFeed(c->reader,buf,nread) != REDIS_OK) {
+            __redisSetError(c,c->reader->err,c->reader->errstr);
+            return REDIS_ERR;
+        }
     }
     return REDIS_OK;
 }
@@ -768,17 +1100,22 @@ int redisBufferRead(redisContext *c) {
  *
  * Returns REDIS_OK when the buffer is empty, or (a part of) the buffer was
  * succesfully written to the socket. When the buffer is empty after the
- * write operation, "wdone" is set to 1 (if given).
+ * write operation, "done" is set to 1 (if given).
  *
  * Returns REDIS_ERR if an error occured trying to write and sets
- * c->error to hold the appropriate error string.
+ * c->errstr to hold the appropriate error string.
  */
 int redisBufferWrite(redisContext *c, int *done) {
     int nwritten;
+
+    /* Return early when the context has seen an error. */
+    if (c->err)
+        return REDIS_ERR;
+
     if (sdslen(c->obuf) > 0) {
         nwritten = write(c->fd,c->obuf,sdslen(c->obuf));
         if (nwritten == -1) {
-            if (errno == EAGAIN) {
+            if (errno == EAGAIN && !(c->flags & REDIS_BLOCK)) {
                 /* Try again later */
             } else {
                 __redisSetError(c,REDIS_ERR_IO,NULL);
@@ -800,10 +1137,8 @@ int redisBufferWrite(redisContext *c, int *done) {
 /* Internal helper function to try and get a reply from the reader,
  * or set an error in the context otherwise. */
 int redisGetReplyFromReader(redisContext *c, void **reply) {
-    __redisCreateReplyReader(c);
-    if (redisReplyReaderGetReply(c->reader,reply) == REDIS_ERR) {
-        __redisSetError(c,REDIS_ERR_PROTOCOL,
-            sdsnew(((redisReader*)c->reader)->error));
+    if (redisReaderGetReply(c->reader,reply) == REDIS_ERR) {
+        __redisSetError(c,c->reader->err,c->reader->errstr);
         return REDIS_ERR;
     }
     return REDIS_OK;
@@ -846,31 +1181,65 @@ int redisGetReply(redisContext *c, void **reply) {
  * is used, you need to call redisGetReply yourself to retrieve
  * the reply (or replies in pub/sub).
  */
-void __redisAppendCommand(redisContext *c, char *cmd, size_t len) {
-    c->obuf = sdscatlen(c->obuf,cmd,len);
+int __redisAppendCommand(redisContext *c, char *cmd, size_t len) {
+    sds newbuf;
+
+    newbuf = sdscatlen(c->obuf,cmd,len);
+    if (newbuf == NULL) {
+        __redisSetError(c,REDIS_ERR_OOM,"Out of memory");
+        return REDIS_ERR;
+    }
+
+    c->obuf = newbuf;
+    return REDIS_OK;
 }
 
-void redisvAppendCommand(redisContext *c, const char *format, va_list ap) {
+int redisvAppendCommand(redisContext *c, const char *format, va_list ap) {
     char *cmd;
     int len;
+
     len = redisvFormatCommand(&cmd,format,ap);
-    __redisAppendCommand(c,cmd,len);
+    if (len == -1) {
+        __redisSetError(c,REDIS_ERR_OOM,"Out of memory");
+        return REDIS_ERR;
+    }
+
+    if (__redisAppendCommand(c,cmd,len) != REDIS_OK) {
+        free(cmd);
+        return REDIS_ERR;
+    }
+
     free(cmd);
+    return REDIS_OK;
 }
 
-void redisAppendCommand(redisContext *c, const char *format, ...) {
+int redisAppendCommand(redisContext *c, const char *format, ...) {
     va_list ap;
+    int ret;
+
     va_start(ap,format);
-    redisvAppendCommand(c,format,ap);
+    ret = redisvAppendCommand(c,format,ap);
     va_end(ap);
+    return ret;
 }
 
-void redisAppendCommandArgv(redisContext *c, int argc, const char **argv, const size_t *argvlen) {
+int redisAppendCommandArgv(redisContext *c, int argc, const char **argv, const size_t *argvlen) {
     char *cmd;
     int len;
+
     len = redisFormatCommandArgv(&cmd,argc,argv,argvlen);
-    __redisAppendCommand(c,cmd,len);
+    if (len == -1) {
+        __redisSetError(c,REDIS_ERR_OOM,"Out of memory");
+        return REDIS_ERR;
+    }
+
+    if (__redisAppendCommand(c,cmd,len) != REDIS_OK) {
+        free(cmd);
+        return REDIS_ERR;
+    }
+
     free(cmd);
+    return REDIS_OK;
 }
 
 /* Helper function for the redisCommand* family of functions.
@@ -884,26 +1253,21 @@ void redisAppendCommandArgv(redisContext *c, int argc, const char **argv, const
  * otherwise. When NULL is returned in a blocking context, the error field
  * in the context will be set.
  */
-static void *__redisCommand(redisContext *c, char *cmd, size_t len) {
-    void *aux = NULL;
-    __redisAppendCommand(c,cmd,len);
+static void *__redisBlockForReply(redisContext *c) {
+    void *reply;
 
     if (c->flags & REDIS_BLOCK) {
-        if (redisGetReply(c,&aux) == REDIS_OK)
-            return aux;
-        return NULL;
+        if (redisGetReply(c,&reply) != REDIS_OK)
+            return NULL;
+        return reply;
     }
     return NULL;
 }
 
 void *redisvCommand(redisContext *c, const char *format, va_list ap) {
-    char *cmd;
-    int len;
-    void *reply = NULL;
-    len = redisvFormatCommand(&cmd,format,ap);
-    reply = __redisCommand(c,cmd,len);
-    free(cmd);
-    return reply;
+    if (redisvAppendCommand(c,format,ap) != REDIS_OK)
+        return NULL;
+    return __redisBlockForReply(c);
 }
 
 void *redisCommand(redisContext *c, const char *format, ...) {
@@ -916,11 +1280,7 @@ void *redisCommand(redisContext *c, const char *format, ...) {
 }
 
 void *redisCommandArgv(redisContext *c, int argc, const char **argv, const size_t *argvlen) {
-    char *cmd;
-    int len;
-    void *reply = NULL;
-    len = redisFormatCommandArgv(&cmd,argc,argv,argvlen);
-    reply = __redisCommand(c,cmd,len);
-    free(cmd);
-    return reply;
+    if (redisAppendCommandArgv(c,argc,argv,argvlen) != REDIS_OK)
+        return NULL;
+    return __redisBlockForReply(c);
 }