]> git.saurik.com Git - redis.git/blobdiff - deps/hiredis/hiredis.c
TODO updated
[redis.git] / deps / hiredis / hiredis.c
index 898b4d6afa950184bc5bf8f1e05182e318313364..b27c63b83868980542287be9988152b5f5f71759 100644 (file)
@@ -1,5 +1,7 @@
 /*
  * Copyright (c) 2009-2010, Salvatore Sanfilippo <antirez at gmail dot com>
+ * Copyright (c) 2010, Pieter Noordhuis <pcnoordhuis at gmail dot com>
+ *
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -32,6 +34,7 @@
 #include <unistd.h>
 #include <assert.h>
 #include <errno.h>
+#include <ctype.h>
 
 #include "hiredis.h"
 #include "net.h"
@@ -44,10 +47,12 @@ typedef struct redisReader {
     void *reply; /* holds temporary reply */
 
     sds buf; /* read buffer */
-    unsigned int pos; /* buffer cursor */
+    size_t pos; /* buffer cursor */
+    size_t len; /* buffer length */
 
-    redisReadTask rstack[3]; /* stack of read tasks */
+    redisReadTask rstack[9]; /* stack of read tasks */
     int ridx; /* index of stack */
+    void *privdata; /* user-settable arbitrary field */
 } redisReader;
 
 static redisReply *createReplyObject(int type);
@@ -68,7 +73,7 @@ static redisReplyObjectFunctions defaultFunctions = {
 
 /* Create a reply object */
 static redisReply *createReplyObject(int type) {
-    redisReply *r = calloc(sizeof(*r),1);
+    redisReply *r = malloc(sizeof(*r));
 
     if (!r) redisOOM();
     r->type = type;
@@ -88,9 +93,10 @@ void freeReplyObject(void *reply) {
             if (r->element[j]) freeReplyObject(r->element[j]);
         free(r->element);
         break;
-    default:
-        if (r->str != NULL)
-            free(r->str);
+    case REDIS_REPLY_ERROR:
+    case REDIS_REPLY_STATUS:
+    case REDIS_REPLY_STRING:
+        free(r->str);
         break;
     }
     free(r);
@@ -111,7 +117,7 @@ static void *createStringObject(const redisReadTask *task, char *str, size_t len
     r->len = len;
 
     if (task->parent) {
-        redisReply *parent = task->parent;
+        redisReply *parent = task->parent->obj;
         assert(parent->type == REDIS_REPLY_ARRAY);
         parent->element[task->idx] = r;
     }
@@ -124,7 +130,7 @@ static void *createArrayObject(const redisReadTask *task, int elements) {
     if ((r->element = calloc(sizeof(redisReply*),elements)) == NULL)
         redisOOM();
     if (task->parent) {
-        redisReply *parent = task->parent;
+        redisReply *parent = task->parent->obj;
         assert(parent->type == REDIS_REPLY_ARRAY);
         parent->element[task->idx] = r;
     }
@@ -135,7 +141,7 @@ static void *createIntegerObject(const redisReadTask *task, long long value) {
     redisReply *r = createReplyObject(REDIS_REPLY_INTEGER);
     r->integer = value;
     if (task->parent) {
-        redisReply *parent = task->parent;
+        redisReply *parent = task->parent->obj;
         assert(parent->type == REDIS_REPLY_ARRAY);
         parent->element[task->idx] = r;
     }
@@ -145,7 +151,7 @@ static void *createIntegerObject(const redisReadTask *task, long long value) {
 static void *createNilObject(const redisReadTask *task) {
     redisReply *r = createReplyObject(REDIS_REPLY_NIL);
     if (task->parent) {
-        redisReply *parent = task->parent;
+        redisReply *parent = task->parent->obj;
         assert(parent->type == REDIS_REPLY_ARRAY);
         parent->element[task->idx] = r;
     }
@@ -154,7 +160,7 @@ static void *createNilObject(const redisReadTask *task) {
 
 static char *readBytes(redisReader *r, unsigned int bytes) {
     char *p;
-    if (sdslen(r->buf)-r->pos >= bytes) {
+    if (r->len-r->pos >= bytes) {
         p = r->buf+r->pos;
         r->pos += bytes;
         return p;
@@ -162,20 +168,60 @@ static char *readBytes(redisReader *r, unsigned int bytes) {
     return NULL;
 }
 
-static char *seekNewline(char *s) {
-    /* Find pointer to \r\n without strstr */
-    while (s != NULL) {
-        s = strchr(s,'\r');
-        if (s != NULL) {
-            if (s[1] == '\n')
-                break;
-            else
-                s++;
+/* Find pointer to \r\n. */
+static char *seekNewline(char *s, size_t len) {
+    int pos = 0;
+    int _len = len-1;
+
+    /* Position should be < len-1 because the character at "pos" should be
+     * followed by a \n. Note that strchr cannot be used because it doesn't
+     * allow to search a limited length and the buffer that is being searched
+     * might not have a trailing NULL character. */
+    while (pos < _len) {
+        while(pos < _len && s[pos] != '\r') pos++;
+        if (s[pos] != '\r') {
+            /* Not found. */
+            return NULL;
         } else {
-            break;
+            if (s[pos+1] == '\n') {
+                /* Found. */
+                return s+pos;
+            } else {
+                /* Continue searching. */
+                pos++;
+            }
+        }
+    }
+    return NULL;
+}
+
+/* Read a long long value starting at *s, under the assumption that it will be
+ * terminated by \r\n. Ambiguously returns -1 for unexpected input. */
+static long long readLongLong(char *s) {
+    long long v = 0;
+    int dec, mult = 1;
+    char c;
+
+    if (*s == '-') {
+        mult = -1;
+        s++;
+    } else if (*s == '+') {
+        mult = 1;
+        s++;
+    }
+
+    while ((c = *(s++)) != '\r') {
+        dec = c - '0';
+        if (dec >= 0 && dec < 10) {
+            v *= 10;
+            v += dec;
+        } else {
+            /* Should not happen... */
+            return -1;
         }
     }
-    return s;
+
+    return mult*v;
 }
 
 static char *readLine(redisReader *r, int *_len) {
@@ -183,7 +229,7 @@ static char *readLine(redisReader *r, int *_len) {
     int len;
 
     p = r->buf+r->pos;
-    s = seekNewline(p);
+    s = seekNewline(p,(r->len-r->pos));
     if (s != NULL) {
         len = s-(r->buf+r->pos);
         r->pos += len+2; /* skip \r\n */
@@ -225,19 +271,21 @@ static int processLineItem(redisReader *r) {
     int len;
 
     if ((p = readLine(r,&len)) != NULL) {
-        if (r->fn) {
-            if (cur->type == REDIS_REPLY_INTEGER) {
-                obj = r->fn->createInteger(cur,strtoll(p,NULL,10));
-            } else {
-                obj = r->fn->createString(cur,p,len);
-            }
+        if (cur->type == REDIS_REPLY_INTEGER) {
+            if (r->fn && r->fn->createInteger)
+                obj = r->fn->createInteger(cur,readLongLong(p));
+            else
+                obj = (void*)REDIS_REPLY_INTEGER;
         } else {
-            obj = (void*)(size_t)(cur->type);
+            /* Type will be error or status. */
+            if (r->fn && r->fn->createString)
+                obj = r->fn->createString(cur,p,len);
+            else
+                obj = (void*)(size_t)(cur->type);
         }
 
-        /* If there is no root yet, register this object as root. */
-        if (r->reply == NULL)
-            r->reply = obj;
+        /* Set reply if this is the root object. */
+        if (r->ridx == 0) r->reply = obj;
         moveToNextTask(r);
         return 0;
     }
@@ -250,32 +298,40 @@ static int processBulkItem(redisReader *r) {
     char *p, *s;
     long len;
     unsigned long bytelen;
+    int success = 0;
 
     p = r->buf+r->pos;
-    s = seekNewline(p);
+    s = seekNewline(p,r->len-r->pos);
     if (s != NULL) {
         p = r->buf+r->pos;
         bytelen = s-(r->buf+r->pos)+2; /* include \r\n */
-        len = strtol(p,NULL,10);
+        len = readLongLong(p);
 
         if (len < 0) {
             /* The nil object can always be created. */
-            obj = r->fn ? r->fn->createNil(cur) :
-                (void*)REDIS_REPLY_NIL;
+            if (r->fn && r->fn->createNil)
+                obj = r->fn->createNil(cur);
+            else
+                obj = (void*)REDIS_REPLY_NIL;
+            success = 1;
         } else {
             /* Only continue when the buffer contains the entire bulk item. */
             bytelen += len+2; /* include \r\n */
-            if (r->pos+bytelen <= sdslen(r->buf)) {
-                obj = r->fn ? r->fn->createString(cur,s+2,len) :
-                    (void*)REDIS_REPLY_STRING;
+            if (r->pos+bytelen <= r->len) {
+                if (r->fn && r->fn->createString)
+                    obj = r->fn->createString(cur,s+2,len);
+                else
+                    obj = (void*)REDIS_REPLY_STRING;
+                success = 1;
             }
         }
 
         /* Proceed when obj was created. */
-        if (obj != NULL) {
+        if (success) {
             r->pos += bytelen;
-            if (r->reply == NULL)
-                r->reply = obj;
+
+            /* Set reply if this is the root object. */
+            if (r->ridx == 0) r->reply = obj;
             moveToNextTask(r);
             return 0;
         }
@@ -288,33 +344,49 @@ static int processMultiBulkItem(redisReader *r) {
     void *obj;
     char *p;
     long elements;
+    int root = 0;
+
+    /* Set error for nested multi bulks with depth > 1 */
+    if (r->ridx == 8) {
+        redisSetReplyReaderError(r,sdscatprintf(sdsempty(),
+            "No support for nested multi bulk replies with depth > 7"));
+        return -1;
+    }
 
     if ((p = readLine(r,NULL)) != NULL) {
-        elements = strtol(p,NULL,10);
+        elements = readLongLong(p);
+        root = (r->ridx == 0);
+
         if (elements == -1) {
-            obj = r->fn ? r->fn->createNil(cur) :
-                (void*)REDIS_REPLY_NIL;
+            if (r->fn && r->fn->createNil)
+                obj = r->fn->createNil(cur);
+            else
+                obj = (void*)REDIS_REPLY_NIL;
             moveToNextTask(r);
         } else {
-            obj = r->fn ? r->fn->createArray(cur,elements) :
-                (void*)REDIS_REPLY_ARRAY;
+            if (r->fn && r->fn->createArray)
+                obj = r->fn->createArray(cur,elements);
+            else
+                obj = (void*)REDIS_REPLY_ARRAY;
 
             /* Modify task stack when there are more than 0 elements. */
             if (elements > 0) {
                 cur->elements = elements;
+                cur->obj = obj;
                 r->ridx++;
                 r->rstack[r->ridx].type = -1;
                 r->rstack[r->ridx].elements = -1;
-                r->rstack[r->ridx].parent = obj;
                 r->rstack[r->ridx].idx = 0;
+                r->rstack[r->ridx].obj = NULL;
+                r->rstack[r->ridx].parent = cur;
+                r->rstack[r->ridx].privdata = r->privdata;
             } else {
                 moveToNextTask(r);
             }
         }
 
-        /* Object was created, so we can always continue. */
-        if (r->reply == NULL)
-            r->reply = obj;
+        /* Set reply if this is the root object. */
+        if (root) r->reply = obj;
         return 0;
     }
     return -1;
@@ -347,7 +419,7 @@ static int processItem(redisReader *r) {
             default:
                 byte = sdscatrepr(sdsempty(),p,1);
                 redisSetReplyReaderError(r,sdscatprintf(sdsempty(),
-                    "protocol error, got %s as reply type byte", byte));
+                    "Protocol error, got %s as reply type byte", byte));
                 sdsfree(byte);
                 return -1;
             }
@@ -368,13 +440,12 @@ static int processItem(redisReader *r) {
     case REDIS_REPLY_ARRAY:
         return processMultiBulkItem(r);
     default:
-        redisSetReplyReaderError(r,sdscatprintf(sdsempty(),
-            "unknown item type '%d'", cur->type));
+        assert(NULL);
         return -1;
     }
 }
 
-void *redisReplyReaderCreate() {
+void *redisReplyReaderCreate(void) {
     redisReader *r = calloc(sizeof(redisReader),1);
     r->error = NULL;
     r->fn = &defaultFunctions;
@@ -394,6 +465,17 @@ int redisReplyReaderSetReplyObjectFunctions(void *reader, redisReplyObjectFuncti
     return REDIS_ERR;
 }
 
+/* Set the private data field that is used in the read tasks. This argument can
+ * be used to curry arbitrary data to the custom reply object functions. */
+int redisReplyReaderSetPrivdata(void *reader, void *privdata) {
+    redisReader *r = reader;
+    if (r->reply == NULL) {
+        r->privdata = privdata;
+        return REDIS_OK;
+    }
+    return REDIS_ERR;
+}
+
 /* External libraries wrapping hiredis might need access to the temporary
  * variable while the reply is built up. When the reader contains an
  * object in between receiving some bytes to parse, this object might
@@ -422,7 +504,7 @@ static void redisSetReplyReaderError(redisReader *r, sds err) {
     if (r->buf != NULL) {
         sdsfree(r->buf);
         r->buf = sdsempty();
-        r->pos = 0;
+        r->pos = r->len = 0;
     }
     r->ridx = -1;
     r->error = err;
@@ -433,12 +515,21 @@ char *redisReplyReaderGetError(void *reader) {
     return r->error;
 }
 
-void redisReplyReaderFeed(void *reader, char *buf, size_t len) {
+void redisReplyReaderFeed(void *reader, const char *buf, size_t len) {
     redisReader *r = reader;
 
     /* Copy the provided buffer. */
-    if (buf != NULL && len >= 1)
+    if (buf != NULL && len >= 1) {
+        /* Destroy internal buffer when it is empty and is quite large. */
+        if (r->len == 0 && sdsavail(r->buf) > 16*1024) {
+            sdsfree(r->buf);
+            r->buf = sdsempty();
+            r->pos = 0;
+        }
+
         r->buf = sdscatlen(r->buf,buf,len);
+        r->len = sdslen(r->buf);
+    }
 }
 
 int redisReplyReaderGetReply(void *reader, void **reply) {
@@ -446,15 +537,17 @@ int redisReplyReaderGetReply(void *reader, void **reply) {
     if (reply != NULL) *reply = NULL;
 
     /* When the buffer is empty, there will never be a reply. */
-    if (sdslen(r->buf) == 0)
+    if (r->len == 0)
         return REDIS_OK;
 
     /* Set first item to process when the stack is empty. */
     if (r->ridx == -1) {
         r->rstack[0].type = -1;
         r->rstack[0].elements = -1;
-        r->rstack[0].parent = NULL;
         r->rstack[0].idx = -1;
+        r->rstack[0].obj = NULL;
+        r->rstack[0].parent = NULL;
+        r->rstack[0].privdata = r->privdata;
         r->ridx = 0;
     }
 
@@ -463,16 +556,12 @@ int redisReplyReaderGetReply(void *reader, void **reply) {
         if (processItem(r) < 0)
             break;
 
-    /* Discard the consumed part of the buffer. */
-    if (r->pos > 0) {
-        if (r->pos == sdslen(r->buf)) {
-            /* sdsrange has a quirck on this edge case. */
-            sdsfree(r->buf);
-            r->buf = sdsempty();
-        } else {
-            r->buf = sdsrange(r->buf,r->pos,sdslen(r->buf));
-        }
+    /* Discard part of the buffer when we've consumed at least 1k, to avoid
+     * doing unnecessary calls to memmove() in sds.c. */
+    if (r->pos >= 1024) {
+        r->buf = sdsrange(r->buf,r->pos,-1);
         r->pos = 0;
+        r->len = sdslen(r->buf);
     }
 
     /* Emit a reply when there is one. */
@@ -480,13 +569,6 @@ int redisReplyReaderGetReply(void *reader, void **reply) {
         void *aux = r->reply;
         r->reply = NULL;
 
-        /* Destroy the buffer when it is empty and is quite large. */
-        if (sdslen(r->buf) == 0 && sdsavail(r->buf) > 16*1024) {
-            sdsfree(r->buf);
-            r->buf = sdsempty();
-            r->pos = 0;
-        }
-
         /* Check if there actually *is* a reply. */
         if (r->error != NULL) {
             return REDIS_ERR;
@@ -525,6 +607,7 @@ int redisvFormatCommand(char **target, const char *format, va_list ap) {
     char *cmd = NULL; /* final command */
     int pos; /* position in final command */
     sds current; /* current argument */
+    int touched = 0; /* was the current argument touched? */
     char **argv = NULL;
     int argc = 0, j;
     int totlen = 0;
@@ -538,35 +621,93 @@ int redisvFormatCommand(char **target, const char *format, va_list ap) {
     while(*c != '\0') {
         if (*c != '%' || c[1] == '\0') {
             if (*c == ' ') {
-                if (sdslen(current) != 0) {
+                if (touched) {
                     addArgument(current, &argv, &argc, &totlen);
                     current = sdsempty();
+                    touched = 0;
                 }
             } else {
                 current = sdscatlen(current,c,1);
+                touched = 1;
             }
         } else {
             switch(c[1]) {
             case 's':
                 arg = va_arg(ap,char*);
-                current = sdscat(current,arg);
+                size = strlen(arg);
+                if (size > 0)
+                    current = sdscatlen(current,arg,size);
                 break;
             case 'b':
                 arg = va_arg(ap,char*);
                 size = va_arg(ap,size_t);
-                current = sdscatlen(current,arg,size);
+                if (size > 0)
+                    current = sdscatlen(current,arg,size);
                 break;
             case '%':
-                cmd = sdscat(cmd,"%");
+                current = sdscat(current,"%");
                 break;
+            default:
+                /* Try to detect printf format */
+                {
+                    char _format[16];
+                    const char *_p = c+1;
+                    size_t _l = 0;
+                    va_list _cpy;
+
+                    /* Flags */
+                    if (*_p != '\0' && *_p == '#') _p++;
+                    if (*_p != '\0' && *_p == '0') _p++;
+                    if (*_p != '\0' && *_p == '-') _p++;
+                    if (*_p != '\0' && *_p == ' ') _p++;
+                    if (*_p != '\0' && *_p == '+') _p++;
+
+                    /* Field width */
+                    while (*_p != '\0' && isdigit(*_p)) _p++;
+
+                    /* Precision */
+                    if (*_p == '.') {
+                        _p++;
+                        while (*_p != '\0' && isdigit(*_p)) _p++;
+                    }
+
+                    /* Modifiers */
+                    if (*_p != '\0') {
+                        if (*_p == 'h' || *_p == 'l') {
+                            /* Allow a single repetition for these modifiers */
+                            if (_p[0] == _p[1]) _p++;
+                            _p++;
+                        }
+                    }
+
+                    /* Conversion specifier */
+                    if (*_p != '\0' && strchr("diouxXeEfFgGaA",*_p) != NULL) {
+                        _l = (_p+1)-c;
+                        if (_l < sizeof(_format)-2) {
+                            memcpy(_format,c,_l);
+                            _format[_l] = '\0';
+                            va_copy(_cpy,ap);
+                            current = sdscatvprintf(current,_format,_cpy);
+                            va_end(_cpy);
+
+                            /* Update current position (note: outer blocks
+                             * increment c twice so compensate here) */
+                            c = _p-1;
+                        }
+                    }
+
+                    /* Consume and discard vararg */
+                    va_arg(ap,void);
+                }
             }
+            touched = 1;
             c++;
         }
         c++;
     }
 
     /* Add the last argument if needed */
-    if (sdslen(current) != 0) {
+    if (touched) {
         addArgument(current, &argv, &argc, &totlen);
     } else {
         sdsfree(current);
@@ -662,9 +803,8 @@ void __redisSetError(redisContext *c, int type, const sds errstr) {
     }
 }
 
-static redisContext *redisContextInit() {
+static redisContext *redisContextInit(void) {
     redisContext *c = calloc(sizeof(redisContext),1);
-    c->fd = -1; /* quick fix for a bug that should be addressed differently */
     c->err = 0;
     c->errstr = NULL;
     c->obuf = sdsempty();
@@ -674,8 +814,7 @@ static redisContext *redisContextInit() {
 }
 
 void redisFree(redisContext *c) {
-    /* Disconnect before free'ing if not yet disconnected. */
-    if (c->flags & REDIS_CONNECTED)
+    if (c->fd > 0)
         close(c->fd);
     if (c->errstr != NULL)
         sdsfree(c->errstr);
@@ -692,35 +831,52 @@ void redisFree(redisContext *c) {
 redisContext *redisConnect(const char *ip, int port) {
     redisContext *c = redisContextInit();
     c->flags |= REDIS_BLOCK;
-    c->flags |= REDIS_CONNECTED;
-    redisContextConnectTcp(c,ip,port);
+    redisContextConnectTcp(c,ip,port,NULL);
+    return c;
+}
+
+redisContext *redisConnectWithTimeout(const char *ip, int port, struct timeval tv) {
+    redisContext *c = redisContextInit();
+    c->flags |= REDIS_BLOCK;
+    redisContextConnectTcp(c,ip,port,&tv);
     return c;
 }
 
 redisContext *redisConnectNonBlock(const char *ip, int port) {
     redisContext *c = redisContextInit();
     c->flags &= ~REDIS_BLOCK;
-    c->flags |= REDIS_CONNECTED;
-    redisContextConnectTcp(c,ip,port);
+    redisContextConnectTcp(c,ip,port,NULL);
     return c;
 }
 
 redisContext *redisConnectUnix(const char *path) {
     redisContext *c = redisContextInit();
     c->flags |= REDIS_BLOCK;
-    c->flags |= REDIS_CONNECTED;
-    redisContextConnectUnix(c,path);
+    redisContextConnectUnix(c,path,NULL);
+    return c;
+}
+
+redisContext *redisConnectUnixWithTimeout(const char *path, struct timeval tv) {
+    redisContext *c = redisContextInit();
+    c->flags |= REDIS_BLOCK;
+    redisContextConnectUnix(c,path,&tv);
     return c;
 }
 
 redisContext *redisConnectUnixNonBlock(const char *path) {
     redisContext *c = redisContextInit();
     c->flags &= ~REDIS_BLOCK;
-    c->flags |= REDIS_CONNECTED;
-    redisContextConnectUnix(c,path);
+    redisContextConnectUnix(c,path,NULL);
     return c;
 }
 
+/* Set read/write timeout on a blocking socket. */
+int redisSetTimeout(redisContext *c, struct timeval tv) {
+    if (c->flags & REDIS_BLOCK)
+        return redisContextSetTimeout(c,tv);
+    return REDIS_ERR;
+}
+
 /* Set the replyObjectFunctions to use. Returns REDIS_ERR when the reader
  * was already initialized and the function set could not be re-set.
  * Return REDIS_OK when they could be set. */
@@ -748,7 +904,7 @@ int redisBufferRead(redisContext *c) {
     char buf[2048];
     int nread = read(c->fd,buf,sizeof(buf));
     if (nread == -1) {
-        if (errno == EAGAIN) {
+        if (errno == EAGAIN && !(c->flags & REDIS_BLOCK)) {
             /* Try again later */
         } else {
             __redisSetError(c,REDIS_ERR_IO,NULL);
@@ -779,7 +935,7 @@ int redisBufferWrite(redisContext *c, int *done) {
     if (sdslen(c->obuf) > 0) {
         nwritten = write(c->fd,c->obuf,sdslen(c->obuf));
         if (nwritten == -1) {
-            if (errno == EAGAIN) {
+            if (errno == EAGAIN && !(c->flags & REDIS_BLOCK)) {
                 /* Try again later */
             } else {
                 __redisSetError(c,REDIS_ERR_IO,NULL);