]> git.saurik.com Git - redis.git/commitdiff
Merge remote branch 'pietern/networking-perf'
authorantirez <antirez@gmail.com>
Thu, 16 Sep 2010 09:38:40 +0000 (11:38 +0200)
committerantirez <antirez@gmail.com>
Thu, 16 Sep 2010 09:38:40 +0000 (11:38 +0200)
19 files changed:
src/aof.c
src/config.c
src/db.c
src/debug.c
src/multi.c
src/networking.c
src/object.c
src/redis-benchmark.c
src/redis.c
src/redis.h
src/replication.c
src/sds.c
src/sds.h
src/sort.c
src/t_hash.c
src/t_list.c
src/t_set.c
src/t_string.c
src/t_zset.c

index 25febb91fd3743ed744924b3556550d4f8c0df83..167134818ac3292c8095a205378fa9e15bf110da 100644 (file)
--- a/src/aof.c
+++ b/src/aof.c
@@ -189,6 +189,7 @@ struct redisClient *createFakeClient(void) {
     c->querybuf = sdsempty();
     c->argc = 0;
     c->argv = NULL;
+    c->bufpos = 0;
     c->flags = 0;
     /* We set the fake client as a slave waiting for the synchronization
      * so that Redis will not try to send replies to this client. */
@@ -272,12 +273,14 @@ int loadAppendOnlyFile(char *filename) {
         fakeClient->argc = argc;
         fakeClient->argv = argv;
         cmd->proc(fakeClient);
-        /* Discard the reply objects list from the fake client */
-        while(listLength(fakeClient->reply))
-            listDelNode(fakeClient->reply,listFirst(fakeClient->reply));
+
+        /* The fake client should not have a reply */
+        redisAssert(fakeClient->bufpos == 0 && listLength(fakeClient->reply) == 0);
+
         /* Clean up, ready for the next command */
         for (j = 0; j < argc; j++) decrRefCount(argv[j]);
         zfree(argv);
+
         /* Handle swapping while loading big datasets when VM is on */
         force_swapout = 0;
         if ((zmalloc_used_memory() - server.vm_max_memory) > 1024*1024*32)
@@ -629,12 +632,11 @@ int rewriteAppendOnlyFileBackground(void) {
 
 void bgrewriteaofCommand(redisClient *c) {
     if (server.bgrewritechildpid != -1) {
-        addReplySds(c,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
+        addReplyError(c,"Background append only file rewriting already in progress");
         return;
     }
     if (rewriteAppendOnlyFileBackground() == REDIS_OK) {
-        char *status = "+Background append only file rewriting started\r\n";
-        addReplySds(c,sdsnew(status));
+        addReplyStatus(c,"Background append only file rewriting started");
     } else {
         addReply(c,shared.err);
     }
index e1b743dbf41ace2c6b41a9b10ef1cd65aac0455b..8a5ad6c2bc8cd6d0671b2194dad057d23ae7ac7f 100644 (file)
@@ -270,8 +270,8 @@ void configSetCommand(redisClient *c) {
                 stopAppendOnly();
             } else {
                 if (startAppendOnly() == REDIS_ERR) {
-                    addReplySds(c,sdscatprintf(sdsempty(),
-                        "-ERR Unable to turn on AOF. Check server logs.\r\n"));
+                    addReplyError(c,
+                        "Unable to turn on AOF. Check server logs.");
                     decrRefCount(o);
                     return;
                 }
@@ -312,9 +312,8 @@ void configSetCommand(redisClient *c) {
         }
         sdsfreesplitres(v,vlen);
     } else {
-        addReplySds(c,sdscatprintf(sdsempty(),
-            "-ERR not supported CONFIG parameter %s\r\n",
-            (char*)c->argv[2]->ptr));
+        addReplyErrorFormat(c,"Unsupported CONFIG parameter: %s",
+            (char*)c->argv[2]->ptr);
         decrRefCount(o);
         return;
     }
@@ -323,22 +322,18 @@ void configSetCommand(redisClient *c) {
     return;
 
 badfmt: /* Bad format errors */
-    addReplySds(c,sdscatprintf(sdsempty(),
-        "-ERR invalid argument '%s' for CONFIG SET '%s'\r\n",
+    addReplyErrorFormat(c,"Invalid argument '%s' for CONFIG SET '%s'",
             (char*)o->ptr,
-            (char*)c->argv[2]->ptr));
+            (char*)c->argv[2]->ptr);
     decrRefCount(o);
 }
 
 void configGetCommand(redisClient *c) {
     robj *o = getDecodedObject(c->argv[2]);
-    robj *lenobj = createObject(REDIS_STRING,NULL);
+    void *replylen = addDeferredMultiBulkLength(c);
     char *pattern = o->ptr;
     int matches = 0;
 
-    addReply(c,lenobj);
-    decrRefCount(lenobj);
-
     if (stringmatch(pattern,"dbfilename",0)) {
         addReplyBulkCString(c,"dbfilename");
         addReplyBulkCString(c,server.dbfilename);
@@ -410,7 +405,7 @@ void configGetCommand(redisClient *c) {
         matches++;
     }
     decrRefCount(o);
-    lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",matches*2);
+    setDeferredMultiBulkLength(c,replylen,matches*2);
 }
 
 void configCommand(redisClient *c) {
@@ -428,13 +423,12 @@ void configCommand(redisClient *c) {
         server.stat_starttime = time(NULL);
         addReply(c,shared.ok);
     } else {
-        addReplySds(c,sdscatprintf(sdsempty(),
-            "-ERR CONFIG subcommand must be one of GET, SET, RESETSTAT\r\n"));
+        addReplyError(c,
+            "CONFIG subcommand must be one of GET, SET, RESETSTAT");
     }
     return;
 
 badarity:
-    addReplySds(c,sdscatprintf(sdsempty(),
-        "-ERR Wrong number of arguments for CONFIG %s\r\n",
-        (char*) c->argv[1]->ptr));
+    addReplyErrorFormat(c,"Wrong number of arguments for CONFIG %s",
+        (char*) c->argv[1]->ptr);
 }
index afca3cddd12eb2b9a3f5d3df6aa684c389b16d97..470310a304c279536efbf6ddc313a9a3eea05714 100644 (file)
--- a/src/db.c
+++ b/src/db.c
@@ -204,7 +204,7 @@ void selectCommand(redisClient *c) {
     int id = atoi(c->argv[1]->ptr);
 
     if (selectDb(c,id) == REDIS_ERR) {
-        addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
+        addReplyError(c,"invalid DB index");
     } else {
         addReply(c,shared.ok);
     }
@@ -228,11 +228,9 @@ void keysCommand(redisClient *c) {
     sds pattern = c->argv[1]->ptr;
     int plen = sdslen(pattern), allkeys;
     unsigned long numkeys = 0;
-    robj *lenobj = createObject(REDIS_STRING,NULL);
+    void *replylen = addDeferredMultiBulkLength(c);
 
     di = dictGetIterator(c->db->dict);
-    addReply(c,lenobj);
-    decrRefCount(lenobj);
     allkeys = (pattern[0] == '*' && pattern[1] == '\0');
     while((de = dictNext(di)) != NULL) {
         sds key = dictGetEntryKey(de);
@@ -248,17 +246,15 @@ void keysCommand(redisClient *c) {
         }
     }
     dictReleaseIterator(di);
-    lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",numkeys);
+    setDeferredMultiBulkLength(c,replylen,numkeys);
 }
 
 void dbsizeCommand(redisClient *c) {
-    addReplySds(c,
-        sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
+    addReplyLongLong(c,dictSize(c->db->dict));
 }
 
 void lastsaveCommand(redisClient *c) {
-    addReplySds(c,
-        sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
+    addReplyLongLong(c,server.lastsave);
 }
 
 void typeCommand(redisClient *c) {
@@ -267,24 +263,23 @@ void typeCommand(redisClient *c) {
 
     o = lookupKeyRead(c->db,c->argv[1]);
     if (o == NULL) {
-        type = "+none";
+        type = "none";
     } else {
         switch(o->type) {
-        case REDIS_STRING: type = "+string"; break;
-        case REDIS_LIST: type = "+list"; break;
-        case REDIS_SET: type = "+set"; break;
-        case REDIS_ZSET: type = "+zset"; break;
-        case REDIS_HASH: type = "+hash"; break;
-        default: type = "+unknown"; break;
+        case REDIS_STRING: type = "string"; break;
+        case REDIS_LIST: type = "list"; break;
+        case REDIS_SET: type = "set"; break;
+        case REDIS_ZSET: type = "zset"; break;
+        case REDIS_HASH: type = "hash"; break;
+        default: type = "unknown"; break;
         }
     }
-    addReplySds(c,sdsnew(type));
-    addReply(c,shared.crlf);
+    addReplyStatus(c,type);
 }
 
 void saveCommand(redisClient *c) {
     if (server.bgsavechildpid != -1) {
-        addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
+        addReplyError(c,"Background save already in progress");
         return;
     }
     if (rdbSave(server.dbfilename) == REDIS_OK) {
@@ -296,12 +291,11 @@ void saveCommand(redisClient *c) {
 
 void bgsaveCommand(redisClient *c) {
     if (server.bgsavechildpid != -1) {
-        addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
+        addReplyError(c,"Background save already in progress");
         return;
     }
     if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
-        char *status = "+Background saving started\r\n";
-        addReplySds(c,sdsnew(status));
+        addReplyStatus(c,"Background saving started");
     } else {
         addReply(c,shared.err);
     }
@@ -310,7 +304,7 @@ void bgsaveCommand(redisClient *c) {
 void shutdownCommand(redisClient *c) {
     if (prepareForShutdown() == REDIS_OK)
         exit(0);
-    addReplySds(c, sdsnew("-ERR Errors trying to SHUTDOWN. Check logs.\r\n"));
+    addReplyError(c,"Errors trying to SHUTDOWN. Check logs.");
 }
 
 void renameGenericCommand(redisClient *c, int nx) {
index 76d18b214964170a6162636f26cd591063f4ff04..2f7ab58f1f4b5926186627adc278ced985d4d327 100644 (file)
@@ -211,18 +211,18 @@ void debugCommand(redisClient *c) {
             char *strenc;
 
             strenc = strEncoding(val->encoding);
-            addReplySds(c,sdscatprintf(sdsempty(),
-                "+Value at:%p refcount:%d "
-                "encoding:%s serializedlength:%lld\r\n",
+            addReplyStatusFormat(c,
+                "Value at:%p refcount:%d "
+                "encoding:%s serializedlength:%lld",
                 (void*)val, val->refcount,
-                strenc, (long long) rdbSavedObjectLen(val,NULL)));
+                strenc, (long long) rdbSavedObjectLen(val,NULL));
         } else {
             vmpointer *vp = (vmpointer*) val;
-            addReplySds(c,sdscatprintf(sdsempty(),
-                "+Value swapped at: page %llu "
-                "using %llu pages\r\n",
+            addReplyStatusFormat(c,
+                "Value swapped at: page %llu "
+                "using %llu pages",
                 (unsigned long long) vp->page,
-                (unsigned long long) vp->usedpages));
+                (unsigned long long) vp->usedpages);
         }
     } else if (!strcasecmp(c->argv[1]->ptr,"swapin") && c->argc == 3) {
         lookupKeyRead(c->db,c->argv[2]);
@@ -233,7 +233,7 @@ void debugCommand(redisClient *c) {
         vmpointer *vp;
 
         if (!server.vm_enabled) {
-            addReplySds(c,sdsnew("-ERR Virtual Memory is disabled\r\n"));
+            addReplyError(c,"Virtual Memory is disabled");
             return;
         }
         if (!de) {
@@ -243,9 +243,9 @@ void debugCommand(redisClient *c) {
         val = dictGetEntryVal(de);
         /* Swap it */
         if (val->storage != REDIS_VM_MEMORY) {
-            addReplySds(c,sdsnew("-ERR This key is not in memory\r\n"));
+            addReplyError(c,"This key is not in memory");
         } else if (val->refcount != 1) {
-            addReplySds(c,sdsnew("-ERR Object is shared\r\n"));
+            addReplyError(c,"Object is shared");
         } else if ((vp = vmSwapObjectBlocking(val)) != NULL) {
             dictGetEntryVal(de) = vp;
             addReply(c,shared.ok);
@@ -274,18 +274,17 @@ void debugCommand(redisClient *c) {
         addReply(c,shared.ok);
     } else if (!strcasecmp(c->argv[1]->ptr,"digest") && c->argc == 2) {
         unsigned char digest[20];
-        sds d = sdsnew("+");
+        sds d = sdsempty();
         int j;
 
         computeDatasetDigest(digest);
         for (j = 0; j < 20; j++)
             d = sdscatprintf(d, "%02x",digest[j]);
-
-        d = sdscatlen(d,"\r\n",2);
-        addReplySds(c,d);
+        addReplyStatus(c,d);
+        sdsfree(d);
     } else {
-        addReplySds(c,sdsnew(
-            "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|SWAPIN <key>|SWAPOUT <key>|RELOAD]\r\n"));
+        addReplyError(c,
+            "Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|SWAPIN <key>|SWAPOUT <key>|RELOAD]");
     }
 }
 
index def1dd67326b546d4e96a7c88ee7ebf53fc06a51..47615eb04ff18c7872964600365580d6dec59ea1 100644 (file)
@@ -42,7 +42,7 @@ void queueMultiCommand(redisClient *c, struct redisCommand *cmd) {
 
 void multiCommand(redisClient *c) {
     if (c->flags & REDIS_MULTI) {
-        addReplySds(c,sdsnew("-ERR MULTI calls can not be nested\r\n"));
+        addReplyError(c,"MULTI calls can not be nested");
         return;
     }
     c->flags |= REDIS_MULTI;
@@ -51,7 +51,7 @@ void multiCommand(redisClient *c) {
 
 void discardCommand(redisClient *c) {
     if (!(c->flags & REDIS_MULTI)) {
-        addReplySds(c,sdsnew("-ERR DISCARD without MULTI\r\n"));
+        addReplyError(c,"DISCARD without MULTI");
         return;
     }
 
@@ -82,7 +82,7 @@ void execCommand(redisClient *c) {
     int orig_argc;
 
     if (!(c->flags & REDIS_MULTI)) {
-        addReplySds(c,sdsnew("-ERR EXEC without MULTI\r\n"));
+        addReplyError(c,"EXEC without MULTI");
         return;
     }
 
@@ -107,7 +107,7 @@ void execCommand(redisClient *c) {
     unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
     orig_argv = c->argv;
     orig_argc = c->argc;
-    addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->mstate.count));
+    addReplyMultiBulkLen(c,c->mstate.count);
     for (j = 0; j < c->mstate.count; j++) {
         c->argc = c->mstate.commands[j].argc;
         c->argv = c->mstate.commands[j].argv;
@@ -251,7 +251,7 @@ void watchCommand(redisClient *c) {
     int j;
 
     if (c->flags & REDIS_MULTI) {
-        addReplySds(c,sdsnew("-ERR WATCH inside MULTI is not allowed\r\n"));
+        addReplyError(c,"WATCH inside MULTI is not allowed");
         return;
     }
     for (j = 1; j < c->argc; j++)
index 104444f090545a1f56166a9dd8c852be8bd51b19..96ce5a99712ec3f21a655cd49cb77af2650b5386 100644 (file)
@@ -1,5 +1,4 @@
 #include "redis.h"
-
 #include <sys/uio.h>
 
 void *dupClientReplyValue(void *o) {
@@ -12,7 +11,12 @@ int listMatchObjects(void *a, void *b) {
 }
 
 redisClient *createClient(int fd) {
-    redisClient *c = zmalloc(sizeof(*c));
+    redisClient *c;
+
+    /* Allocate more space to hold a static write buffer. */
+    c = zmalloc(sizeof(redisClient)+REDIS_REPLY_CHUNK_BYTES);
+    c->buflen = REDIS_REPLY_CHUNK_BYTES;
+    c->bufpos = 0;
 
     anetNonBlock(NULL,fd);
     anetTcpNoDelay(NULL,fd);
@@ -56,70 +60,238 @@ redisClient *createClient(int fd) {
     return c;
 }
 
-void addReply(redisClient *c, robj *obj) {
-    if (listLength(c->reply) == 0 &&
+int _ensureFileEvent(redisClient *c) {
+    if (c->fd <= 0) return REDIS_ERR;
+    if (c->bufpos == 0 && listLength(c->reply) == 0 &&
         (c->replstate == REDIS_REPL_NONE ||
          c->replstate == REDIS_REPL_ONLINE) &&
         aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
-        sendReplyToClient, c) == AE_ERR) return;
+        sendReplyToClient, c) == AE_ERR) return REDIS_ERR;
+    return REDIS_OK;
+}
 
+/* Create a duplicate of the last object in the reply list when
+ * it is not exclusively owned by the reply list. */
+robj *dupLastObjectIfNeeded(list *reply) {
+    robj *new, *cur;
+    listNode *ln;
+    redisAssert(listLength(reply) > 0);
+    ln = listLast(reply);
+    cur = listNodeValue(ln);
+    if (cur->refcount > 1) {
+        new = dupStringObject(cur);
+        decrRefCount(cur);
+        listNodeValue(ln) = new;
+    }
+    return listNodeValue(ln);
+}
+
+int _addReplyToBuffer(redisClient *c, char *s, size_t len) {
+    size_t available = c->buflen-c->bufpos;
+
+    /* If there already are entries in the reply list, we cannot
+     * add anything more to the static buffer. */
+    if (listLength(c->reply) > 0) return REDIS_ERR;
+
+    /* Check that the buffer has enough space available for this string. */
+    if (len > available) return REDIS_ERR;
+
+    memcpy(c->buf+c->bufpos,s,len);
+    c->bufpos+=len;
+    return REDIS_OK;
+}
+
+void _addReplyObjectToList(redisClient *c, robj *o) {
+    robj *tail;
+    if (listLength(c->reply) == 0) {
+        incrRefCount(o);
+        listAddNodeTail(c->reply,o);
+    } else {
+        tail = listNodeValue(listLast(c->reply));
+
+        /* Append to this object when possible. */
+        if (tail->ptr != NULL &&
+            sdslen(tail->ptr)+sdslen(o->ptr) <= REDIS_REPLY_CHUNK_BYTES)
+        {
+            tail = dupLastObjectIfNeeded(c->reply);
+            tail->ptr = sdscatlen(tail->ptr,o->ptr,sdslen(o->ptr));
+        } else {
+            incrRefCount(o);
+            listAddNodeTail(c->reply,o);
+        }
+    }
+}
+
+/* This method takes responsibility over the sds. When it is no longer
+ * needed it will be free'd, otherwise it ends up in a robj. */
+void _addReplySdsToList(redisClient *c, sds s) {
+    robj *tail;
+    if (listLength(c->reply) == 0) {
+        listAddNodeTail(c->reply,createObject(REDIS_STRING,s));
+    } else {
+        tail = listNodeValue(listLast(c->reply));
+
+        /* Append to this object when possible. */
+        if (tail->ptr != NULL &&
+            sdslen(tail->ptr)+sdslen(s) <= REDIS_REPLY_CHUNK_BYTES)
+        {
+            tail = dupLastObjectIfNeeded(c->reply);
+            tail->ptr = sdscatlen(tail->ptr,s,sdslen(s));
+            sdsfree(s);
+        } else {
+            listAddNodeTail(c->reply,createObject(REDIS_STRING,s));
+        }
+    }
+}
+
+void _addReplyStringToList(redisClient *c, char *s, size_t len) {
+    robj *tail;
+    if (listLength(c->reply) == 0) {
+        listAddNodeTail(c->reply,createStringObject(s,len));
+    } else {
+        tail = listNodeValue(listLast(c->reply));
+
+        /* Append to this object when possible. */
+        if (tail->ptr != NULL &&
+            sdslen(tail->ptr)+len <= REDIS_REPLY_CHUNK_BYTES)
+        {
+            tail = dupLastObjectIfNeeded(c->reply);
+            tail->ptr = sdscatlen(tail->ptr,s,len);
+        } else {
+            listAddNodeTail(c->reply,createStringObject(s,len));
+        }
+    }
+}
+
+void addReply(redisClient *c, robj *obj) {
+    if (_ensureFileEvent(c) != REDIS_OK) return;
     if (server.vm_enabled && obj->storage != REDIS_VM_MEMORY) {
+        /* Returns a new object with refcount 1 */
         obj = dupStringObject(obj);
-        obj->refcount = 0; /* getDecodedObject() will increment the refcount */
+    } else {
+        /* This increments the refcount. */
+        obj = getDecodedObject(obj);
     }
-    listAddNodeTail(c->reply,getDecodedObject(obj));
+    if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
+        _addReplyObjectToList(c,obj);
+    decrRefCount(obj);
 }
 
 void addReplySds(redisClient *c, sds s) {
-    robj *o = createObject(REDIS_STRING,s);
-    addReply(c,o);
-    decrRefCount(o);
+    if (_ensureFileEvent(c) != REDIS_OK) {
+        /* The caller expects the sds to be free'd. */
+        sdsfree(s);
+        return;
+    }
+    if (_addReplyToBuffer(c,s,sdslen(s)) == REDIS_OK) {
+        sdsfree(s);
+    } else {
+        /* This method free's the sds when it is no longer needed. */
+        _addReplySdsToList(c,s);
+    }
 }
 
-void addReplyDouble(redisClient *c, double d) {
-    char buf[128];
+void addReplyString(redisClient *c, char *s, size_t len) {
+    if (_ensureFileEvent(c) != REDIS_OK) return;
+    if (_addReplyToBuffer(c,s,len) != REDIS_OK)
+        _addReplyStringToList(c,s,len);
+}
 
-    snprintf(buf,sizeof(buf),"%.17g",d);
-    addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
-        (unsigned long) strlen(buf),buf));
+void _addReplyError(redisClient *c, char *s, size_t len) {
+    addReplyString(c,"-ERR ",5);
+    addReplyString(c,s,len);
+    addReplyString(c,"\r\n",2);
 }
 
-void addReplyLongLong(redisClient *c, long long ll) {
-    char buf[128];
-    size_t len;
+void addReplyError(redisClient *c, char *err) {
+    _addReplyError(c,err,strlen(err));
+}
 
-    if (ll == 0) {
-        addReply(c,shared.czero);
-        return;
-    } else if (ll == 1) {
-        addReply(c,shared.cone);
-        return;
+void addReplyErrorFormat(redisClient *c, const char *fmt, ...) {
+    va_list ap;
+    va_start(ap,fmt);
+    sds s = sdscatvprintf(sdsempty(),fmt,ap);
+    va_end(ap);
+    _addReplyError(c,s,sdslen(s));
+    sdsfree(s);
+}
+
+void _addReplyStatus(redisClient *c, char *s, size_t len) {
+    addReplyString(c,"+",1);
+    addReplyString(c,s,len);
+    addReplyString(c,"\r\n",2);
+}
+
+void addReplyStatus(redisClient *c, char *status) {
+    _addReplyStatus(c,status,strlen(status));
+}
+
+void addReplyStatusFormat(redisClient *c, const char *fmt, ...) {
+    va_list ap;
+    va_start(ap,fmt);
+    sds s = sdscatvprintf(sdsempty(),fmt,ap);
+    va_end(ap);
+    _addReplyStatus(c,s,sdslen(s));
+    sdsfree(s);
+}
+
+/* Adds an empty object to the reply list that will contain the multi bulk
+ * length, which is not known when this function is called. */
+void *addDeferredMultiBulkLength(redisClient *c) {
+    if (_ensureFileEvent(c) != REDIS_OK) return NULL;
+    listAddNodeTail(c->reply,createObject(REDIS_STRING,NULL));
+    return listLast(c->reply);
+}
+
+/* Populate the length object and try glueing it to the next chunk. */
+void setDeferredMultiBulkLength(redisClient *c, void *node, long length) {
+    listNode *ln = (listNode*)node;
+    robj *len, *next;
+
+    /* Abort when *node is NULL (see addDeferredMultiBulkLength). */
+    if (node == NULL) return;
+
+    len = listNodeValue(ln);
+    len->ptr = sdscatprintf(sdsempty(),"*%ld\r\n",length);
+    if (ln->next != NULL) {
+        next = listNodeValue(ln->next);
+
+        /* Only glue when the next node is non-NULL (an sds in this case) */
+        if (next->ptr != NULL) {
+            len->ptr = sdscatlen(len->ptr,next->ptr,sdslen(next->ptr));
+            listDelNode(c->reply,ln->next);
+        }
     }
-    buf[0] = ':';
+}
+
+void addReplyDouble(redisClient *c, double d) {
+    char dbuf[128], sbuf[128];
+    int dlen, slen;
+    dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d);
+    slen = snprintf(sbuf,sizeof(sbuf),"$%d\r\n%s\r\n",dlen,dbuf);
+    addReplyString(c,sbuf,slen);
+}
+
+void _addReplyLongLong(redisClient *c, long long ll, char prefix) {
+    char buf[128];
+    int len;
+    buf[0] = prefix;
     len = ll2string(buf+1,sizeof(buf)-1,ll);
     buf[len+1] = '\r';
     buf[len+2] = '\n';
-    addReplySds(c,sdsnewlen(buf,len+3));
+    addReplyString(c,buf,len+3);
 }
 
-void addReplyUlong(redisClient *c, unsigned long ul) {
-    char buf[128];
-    size_t len;
+void addReplyLongLong(redisClient *c, long long ll) {
+    _addReplyLongLong(c,ll,':');
+}
 
-    if (ul == 0) {
-        addReply(c,shared.czero);
-        return;
-    } else if (ul == 1) {
-        addReply(c,shared.cone);
-        return;
-    }
-    len = snprintf(buf,sizeof(buf),":%lu\r\n",ul);
-    addReplySds(c,sdsnewlen(buf,len));
+void addReplyMultiBulkLen(redisClient *c, long length) {
+    _addReplyLongLong(c,length,'*');
 }
 
 void addReplyBulkLen(redisClient *c, robj *obj) {
-    size_t len, intlen;
-    char buf[128];
+    size_t len;
 
     if (obj->encoding == REDIS_ENCODING_RAW) {
         len = sdslen(obj->ptr);
@@ -136,11 +308,7 @@ void addReplyBulkLen(redisClient *c, robj *obj) {
             len++;
         }
     }
-    buf[0] = '$';
-    intlen = ll2string(buf+1,sizeof(buf)-1,(long long)len);
-    buf[intlen+1] = '\r';
-    buf[intlen+2] = '\n';
-    addReplySds(c,sdsnewlen(buf,intlen+3));
+    _addReplyLongLong(c,len,'$');
 }
 
 void addReplyBulk(redisClient *c, robj *obj) {
@@ -290,34 +458,6 @@ void freeClient(redisClient *c) {
     zfree(c);
 }
 
-#define GLUEREPLY_UP_TO (1024)
-static void glueReplyBuffersIfNeeded(redisClient *c) {
-    int copylen = 0;
-    char buf[GLUEREPLY_UP_TO];
-    listNode *ln;
-    listIter li;
-    robj *o;
-
-    listRewind(c->reply,&li);
-    while((ln = listNext(&li))) {
-        int objlen;
-
-        o = ln->value;
-        objlen = sdslen(o->ptr);
-        if (copylen + objlen <= GLUEREPLY_UP_TO) {
-            memcpy(buf+copylen,o->ptr,objlen);
-            copylen += objlen;
-            listDelNode(c->reply,ln);
-        } else {
-            if (copylen == 0) return;
-            break;
-        }
-    }
-    /* Now the output buffer is empty, add the new single element */
-    o = createObject(REDIS_STRING,sdsnewlen(buf,copylen));
-    listAddNodeHead(c->reply,o);
-}
-
 void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
     redisClient *c = privdata;
     int nwritten = 0, totwritten = 0, objlen;
@@ -334,31 +474,48 @@ void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
         return;
     }
 
-    while(listLength(c->reply)) {
-        if (server.glueoutputbuf && listLength(c->reply) > 1)
-            glueReplyBuffersIfNeeded(c);
+    while(c->bufpos > 0 || listLength(c->reply)) {
+        if (c->bufpos > 0) {
+            if (c->flags & REDIS_MASTER) {
+                /* Don't reply to a master */
+                nwritten = c->bufpos - c->sentlen;
+            } else {
+                nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
+                if (nwritten <= 0) break;
+            }
+            c->sentlen += nwritten;
+            totwritten += nwritten;
+
+            /* If the buffer was sent, set bufpos to zero to continue with
+             * the remainder of the reply. */
+            if (c->sentlen == c->bufpos) {
+                c->bufpos = 0;
+                c->sentlen = 0;
+            }
+        } else {
+            o = listNodeValue(listFirst(c->reply));
+            objlen = sdslen(o->ptr);
 
-        o = listNodeValue(listFirst(c->reply));
-        objlen = sdslen(o->ptr);
+            if (objlen == 0) {
+                listDelNode(c->reply,listFirst(c->reply));
+                continue;
+            }
 
-        if (objlen == 0) {
-            listDelNode(c->reply,listFirst(c->reply));
-            continue;
-        }
+            if (c->flags & REDIS_MASTER) {
+                /* Don't reply to a master */
+                nwritten = objlen - c->sentlen;
+            } else {
+                nwritten = write(fd, ((char*)o->ptr)+c->sentlen,objlen-c->sentlen);
+                if (nwritten <= 0) break;
+            }
+            c->sentlen += nwritten;
+            totwritten += nwritten;
 
-        if (c->flags & REDIS_MASTER) {
-            /* Don't reply to a master */
-            nwritten = objlen - c->sentlen;
-        } else {
-            nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
-            if (nwritten <= 0) break;
-        }
-        c->sentlen += nwritten;
-        totwritten += nwritten;
-        /* If we fully sent the object on head go to the next one */
-        if (c->sentlen == objlen) {
-            listDelNode(c->reply,listFirst(c->reply));
-            c->sentlen = 0;
+            /* If we fully sent the object on head go to the next one */
+            if (c->sentlen == objlen) {
+                listDelNode(c->reply,listFirst(c->reply));
+                c->sentlen = 0;
+            }
         }
         /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
          * bytes, in a single threaded server it's a good idea to serve
index 92af1d6a9e7ac6f2f5919d0c0ff3f51a856f9e55..c1a0824515bfaf12394520bb493efc7363f4396e 100644 (file)
@@ -354,9 +354,9 @@ int getDoubleFromObjectOrReply(redisClient *c, robj *o, double *target, const ch
     double value;
     if (getDoubleFromObject(o, &value) != REDIS_OK) {
         if (msg != NULL) {
-            addReplySds(c, sdscatprintf(sdsempty(), "-ERR %s\r\n", msg));
+            addReplyError(c,(char*)msg);
         } else {
-            addReplySds(c, sdsnew("-ERR value is not a double\r\n"));
+            addReplyError(c,"value is not a double");
         }
         return REDIS_ERR;
     }
@@ -393,9 +393,9 @@ int getLongLongFromObjectOrReply(redisClient *c, robj *o, long long *target, con
     long long value;
     if (getLongLongFromObject(o, &value) != REDIS_OK) {
         if (msg != NULL) {
-            addReplySds(c, sdscatprintf(sdsempty(), "-ERR %s\r\n", msg));
+            addReplyError(c,(char*)msg);
         } else {
-            addReplySds(c, sdsnew("-ERR value is not an integer or out of range\r\n"));
+            addReplyError(c,"value is not an integer or out of range");
         }
         return REDIS_ERR;
     }
@@ -410,9 +410,9 @@ int getLongFromObjectOrReply(redisClient *c, robj *o, long *target, const char *
     if (getLongLongFromObjectOrReply(c, o, &value, msg) != REDIS_OK) return REDIS_ERR;
     if (value < LONG_MIN || value > LONG_MAX) {
         if (msg != NULL) {
-            addReplySds(c, sdscatprintf(sdsempty(), "-ERR %s\r\n", msg));
+            addReplyError(c,(char*)msg);
         } else {
-            addReplySds(c, sdsnew("-ERR value is out of range\r\n"));
+            addReplyError(c,"value is out of range");
         }
         return REDIS_ERR;
     }
index 123d81180e76a76dca76fe298d81ff66f34dfe1a..297ecc6c5a87ab28cc8f135fcd7842de4c26f4d4 100644 (file)
@@ -75,6 +75,7 @@ static struct config {
     long long start;
     long long totlatency;
     int *latency;
+    char *title;
     list *clients;
     int quiet;
     int loop;
@@ -206,16 +207,27 @@ static void clientDone(client c) {
     }
 }
 
+/* Read a length from the buffer pointed to by *p, store the length in *len,
+ * and return the number of bytes that the cursor advanced. */
+static int readLen(char *p, int *len) {
+    char *tail = strstr(p,"\r\n");
+    if (tail == NULL)
+        return 0;
+    *tail = '\0';
+    *len = atoi(p+1);
+    return tail+2-p;
+}
+
 static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask)
 {
-    char buf[1024];
-    int nread;
+    char buf[1024], *p;
+    int nread, pos=0, len=0;
     client c = privdata;
     REDIS_NOTUSED(el);
     REDIS_NOTUSED(fd);
     REDIS_NOTUSED(mask);
 
-    nread = read(c->fd, buf, 1024);
+    nread = read(c->fd,buf,sizeof(buf));
     if (nread == -1) {
         fprintf(stderr, "Reading from socket: %s\n", strerror(errno));
         freeClient(c);
@@ -228,82 +240,89 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask)
     }
     c->totreceived += nread;
     c->ibuf = sdscatlen(c->ibuf,buf,nread);
+    len = sdslen(c->ibuf);
 
-processdata:
-    /* Are we waiting for the first line of the command of for  sdf 
-     * count in bulk or multi bulk operations? */
     if (c->replytype == REPLY_INT ||
-        c->replytype == REPLY_RETCODE ||
-        (c->replytype == REPLY_BULK && c->readlen == -1) ||
-        (c->replytype == REPLY_MBULK && c->readlen == -1) ||
-        (c->replytype == REPLY_MBULK && c->mbulk == -1)) {
-        char *p;
-
-        /* Check if the first line is complete. This is only true if
-         * there is a newline inside the buffer. */
-        if ((p = strchr(c->ibuf,'\n')) != NULL) {
-            if (c->replytype == REPLY_BULK ||
-                (c->replytype == REPLY_MBULK && c->mbulk != -1))
-            {
-                /* Read the count of a bulk reply (being it a single bulk or
-                 * a multi bulk reply). "$<count>" for the protocol spec. */
-                *p = '\0';
-                *(p-1) = '\0';
-                c->readlen = atoi(c->ibuf+1)+2;
-                // printf("BULK ATOI: %s\n", c->ibuf+1);
-                /* Handle null bulk reply "$-1" */
-                if (c->readlen-2 == -1) {
-                    clientDone(c);
-                    return;
-                }
-                /* Leave all the rest in the input buffer */
-                c->ibuf = sdsrange(c->ibuf,(p-c->ibuf)+1,-1);
-                /* fall through to reach the point where the code will try
-                 * to check if the bulk reply is complete. */
-            } else if (c->replytype == REPLY_MBULK && c->mbulk == -1) {
-                /* Read the count of a multi bulk reply. That is, how many
-                 * bulk replies we have to read next. "*<count>" protocol. */
-                *p = '\0';
-                *(p-1) = '\0';
-                c->mbulk = atoi(c->ibuf+1);
-                /* Handle null bulk reply "*-1" */
-                if (c->mbulk == -1) {
-                    clientDone(c);
-                    return;
+        c->replytype == REPLY_RETCODE)
+    {
+        /* Check if the first line is complete. This is everything we need
+         * when waiting for an integer or status code reply.*/
+        if ((p = strstr(c->ibuf,"\r\n")) != NULL)
+            goto done;
+    } else if (c->replytype == REPLY_BULK) {
+        int advance = 0;
+        if (c->readlen < 0) {
+            advance = readLen(c->ibuf+pos,&c->readlen);
+            if (advance) {
+                pos += advance;
+                if (c->readlen == -1) {
+                    goto done;
+                } else {
+                    /* include the trailing \r\n */
+                    c->readlen += 2;
                 }
-                // printf("%p) %d elements list\n", c, c->mbulk);
-                /* Leave all the rest in the input buffer */
-                c->ibuf = sdsrange(c->ibuf,(p-c->ibuf)+1,-1);
-                goto processdata;
             } else {
-                c->ibuf = sdstrim(c->ibuf,"\r\n");
-                clientDone(c);
-                return;
+                goto skip;
             }
         }
-    }
-    /* bulk read, did we read everything? */
-    if (((c->replytype == REPLY_MBULK && c->mbulk != -1) || 
-         (c->replytype == REPLY_BULK)) && c->readlen != -1 &&
-          (unsigned)c->readlen <= sdslen(c->ibuf))
-    {
-        // printf("BULKSTATUS mbulk:%d readlen:%d sdslen:%d\n",
-        //    c->mbulk,c->readlen,sdslen(c->ibuf));
-        if (c->replytype == REPLY_BULK) {
-            clientDone(c);
-        } else if (c->replytype == REPLY_MBULK) {
-            // printf("%p) %d (%d)) ",c, c->mbulk, c->readlen);
-            // fwrite(c->ibuf,c->readlen,1,stdout);
-            // printf("\n");
-            if (--c->mbulk == 0) {
-                clientDone(c);
+
+        int canconsume;
+        if (c->readlen > 0) {
+            canconsume = c->readlen > (len-pos) ? (len-pos) : c->readlen;
+            c->readlen -= canconsume;
+            pos += canconsume;
+        }
+
+        if (c->readlen == 0)
+            goto done;
+    } else if (c->replytype == REPLY_MBULK) {
+        int advance = 0;
+        if (c->mbulk == -1) {
+            advance = readLen(c->ibuf+pos,&c->mbulk);
+            if (advance) {
+                pos += advance;
+                if (c->mbulk == -1)
+                    goto done;
+            } else {
+                goto skip;
+            }
+        }
+
+        int canconsume;
+        while(c->mbulk > 0 && pos < len) {
+            if (c->readlen > 0) {
+                canconsume = c->readlen > (len-pos) ? (len-pos) : c->readlen;
+                c->readlen -= canconsume;
+                pos += canconsume;
+                if (c->readlen == 0)
+                    c->mbulk--;
             } else {
-                c->ibuf = sdsrange(c->ibuf,c->readlen,-1);
-                c->readlen = -1;
-                goto processdata;
+                advance = readLen(c->ibuf+pos,&c->readlen);
+                if (advance) {
+                    pos += advance;
+                    if (c->readlen == -1) {
+                        c->mbulk--;
+                        continue;
+                    } else {
+                        /* include the trailing \r\n */
+                        c->readlen += 2;
+                    }
+                } else {
+                    goto skip;
+                }
             }
         }
+
+        if (c->mbulk == 0)
+            goto done;
     }
+
+skip:
+    c->ibuf = sdsrange(c->ibuf,pos,-1);
+    return;
+done:
+    clientDone(c);
+    return;
 }
 
 static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask)
@@ -371,13 +390,13 @@ static void createMissingClients(client c) {
     }
 }
 
-static void showLatencyReport(char *title) {
+static void showLatencyReport(void) {
     int j, seen = 0;
     float perc, reqpersec;
 
     reqpersec = (float)config.donerequests/((float)config.totlatency/1000);
     if (!config.quiet) {
-        printf("====== %s ======\n", title);
+        printf("====== %s ======\n", config.title);
         printf("  %d requests completed in %.2f seconds\n", config.donerequests,
             (float)config.totlatency/1000);
         printf("  %d parallel clients\n", config.numclients);
@@ -393,20 +412,20 @@ static void showLatencyReport(char *title) {
         }
         printf("%.2f requests per second\n\n", reqpersec);
     } else {
-        printf("%s: %.2f requests per second\n", title, reqpersec);
+        printf("%s: %.2f requests per second\n", config.title, reqpersec);
     }
 }
 
-static void prepareForBenchmark(void)
-{
+static void prepareForBenchmark(char *title) {
     memset(config.latency,0,sizeof(int)*(MAX_LATENCY+1));
+    config.title = title;
     config.start = mstime();
     config.donerequests = 0;
 }
 
-static void endBenchmark(char *title) {
+static void endBenchmark(void) {
     config.totlatency = mstime()-config.start;
-    showLatencyReport(title);
+    showLatencyReport();
     freeAllClients();
 }
 
@@ -480,6 +499,18 @@ void parseOptions(int argc, char **argv) {
     }
 }
 
+int showThroughput(struct aeEventLoop *eventLoop, long long id, void *clientData) {
+    REDIS_NOTUSED(eventLoop);
+    REDIS_NOTUSED(id);
+    REDIS_NOTUSED(clientData);
+
+    float dt = (float)(mstime()-config.start)/1000.0;
+    float rps = (float)config.donerequests/dt;
+    printf("%s: %.2f\r", config.title, rps);
+    fflush(stdout);
+    return 250; /* every 250ms */
+}
+
 int main(int argc, char **argv) {
     client c;
 
@@ -491,6 +522,7 @@ int main(int argc, char **argv) {
     config.requests = 10000;
     config.liveclients = 0;
     config.el = aeCreateEventLoop();
+    aeCreateTimeEvent(config.el,1,showThroughput,NULL,NULL);
     config.keepalive = 1;
     config.donerequests = 0;
     config.datasize = 3;
@@ -514,7 +546,7 @@ int main(int argc, char **argv) {
 
     if (config.idlemode) {
         printf("Creating %d idle connections and waiting forever (Ctrl+C when done)\n", config.numclients);
-        prepareForBenchmark();
+        prepareForBenchmark("IDLE");
         c = createClient();
         if (!c) exit(1);
         c->obuf = sdsempty();
@@ -525,25 +557,25 @@ int main(int argc, char **argv) {
     }
 
     do {
-        prepareForBenchmark();
+        prepareForBenchmark("PING");
         c = createClient();
         if (!c) exit(1);
         c->obuf = sdscat(c->obuf,"PING\r\n");
         prepareClientForReply(c,REPLY_RETCODE);
         createMissingClients(c);
         aeMain(config.el);
-        endBenchmark("PING");
+        endBenchmark();
 
-        prepareForBenchmark();
+        prepareForBenchmark("PING (multi bulk)");
         c = createClient();
         if (!c) exit(1);
         c->obuf = sdscat(c->obuf,"*1\r\n$4\r\nPING\r\n");
         prepareClientForReply(c,REPLY_RETCODE);
         createMissingClients(c);
         aeMain(config.el);
-        endBenchmark("PING (multi bulk)");
+        endBenchmark();
 
-        prepareForBenchmark();
+        prepareForBenchmark("SET");
         c = createClient();
         if (!c) exit(1);
         c->obuf = sdscatprintf(c->obuf,"SET foo_rand000000000000 %d\r\n",config.datasize);
@@ -557,106 +589,106 @@ int main(int argc, char **argv) {
         prepareClientForReply(c,REPLY_RETCODE);
         createMissingClients(c);
         aeMain(config.el);
-        endBenchmark("SET");
+        endBenchmark();
 
-        prepareForBenchmark();
+        prepareForBenchmark("GET");
         c = createClient();
         if (!c) exit(1);
         c->obuf = sdscat(c->obuf,"GET foo_rand000000000000\r\n");
         prepareClientForReply(c,REPLY_BULK);
         createMissingClients(c);
         aeMain(config.el);
-        endBenchmark("GET");
+        endBenchmark();
 
-        prepareForBenchmark();
+        prepareForBenchmark("INCR");
         c = createClient();
         if (!c) exit(1);
         c->obuf = sdscat(c->obuf,"INCR counter_rand000000000000\r\n");
         prepareClientForReply(c,REPLY_INT);
         createMissingClients(c);
         aeMain(config.el);
-        endBenchmark("INCR");
+        endBenchmark();
 
-        prepareForBenchmark();
+        prepareForBenchmark("LPUSH");
         c = createClient();
         if (!c) exit(1);
         c->obuf = sdscat(c->obuf,"LPUSH mylist 3\r\nbar\r\n");
         prepareClientForReply(c,REPLY_INT);
         createMissingClients(c);
         aeMain(config.el);
-        endBenchmark("LPUSH");
+        endBenchmark();
 
-        prepareForBenchmark();
+        prepareForBenchmark("LPOP");
         c = createClient();
         if (!c) exit(1);
         c->obuf = sdscat(c->obuf,"LPOP mylist\r\n");
         prepareClientForReply(c,REPLY_BULK);
         createMissingClients(c);
         aeMain(config.el);
-        endBenchmark("LPOP");
+        endBenchmark();
 
-        prepareForBenchmark();
+        prepareForBenchmark("SADD");
         c = createClient();
         if (!c) exit(1);
         c->obuf = sdscat(c->obuf,"SADD myset 24\r\ncounter_rand000000000000\r\n");
         prepareClientForReply(c,REPLY_RETCODE);
         createMissingClients(c);
         aeMain(config.el);
-        endBenchmark("SADD");
+        endBenchmark();
 
-        prepareForBenchmark();
+        prepareForBenchmark("SPOP");
         c = createClient();
         if (!c) exit(1);
         c->obuf = sdscat(c->obuf,"SPOP myset\r\n");
         prepareClientForReply(c,REPLY_BULK);
         createMissingClients(c);
         aeMain(config.el);
-        endBenchmark("SPOP");
+        endBenchmark();
 
-        prepareForBenchmark();
+        prepareForBenchmark("LPUSH (again, in order to bench LRANGE)");
         c = createClient();
         if (!c) exit(1);
         c->obuf = sdscat(c->obuf,"LPUSH mylist 3\r\nbar\r\n");
         prepareClientForReply(c,REPLY_RETCODE);
         createMissingClients(c);
         aeMain(config.el);
-        endBenchmark("LPUSH (again, in order to bench LRANGE)");
+        endBenchmark();
 
-        prepareForBenchmark();
+        prepareForBenchmark("LRANGE (first 100 elements)");
         c = createClient();
         if (!c) exit(1);
         c->obuf = sdscat(c->obuf,"LRANGE mylist 0 99\r\n");
         prepareClientForReply(c,REPLY_MBULK);
         createMissingClients(c);
         aeMain(config.el);
-        endBenchmark("LRANGE (first 100 elements)");
+        endBenchmark();
 
-        prepareForBenchmark();
+        prepareForBenchmark("LRANGE (first 300 elements)");
         c = createClient();
         if (!c) exit(1);
         c->obuf = sdscat(c->obuf,"LRANGE mylist 0 299\r\n");
         prepareClientForReply(c,REPLY_MBULK);
         createMissingClients(c);
         aeMain(config.el);
-        endBenchmark("LRANGE (first 300 elements)");
+        endBenchmark();
 
-        prepareForBenchmark();
+        prepareForBenchmark("LRANGE (first 450 elements)");
         c = createClient();
         if (!c) exit(1);
         c->obuf = sdscat(c->obuf,"LRANGE mylist 0 449\r\n");
         prepareClientForReply(c,REPLY_MBULK);
         createMissingClients(c);
         aeMain(config.el);
-        endBenchmark("LRANGE (first 450 elements)");
+        endBenchmark();
 
-        prepareForBenchmark();
+        prepareForBenchmark("LRANGE (first 600 elements)");
         c = createClient();
         if (!c) exit(1);
         c->obuf = sdscat(c->obuf,"LRANGE mylist 0 599\r\n");
         prepareClientForReply(c,REPLY_MBULK);
         createMissingClients(c);
         aeMain(config.el);
-        endBenchmark("LRANGE (first 600 elements)");
+        endBenchmark();
 
         printf("\n");
     } while(config.loop);
index b6b425213b48042c2335bc4649b59ba9ac484c7f..66e088b0068a080b52705719c755b416a72da742 100644 (file)
@@ -909,7 +909,7 @@ int processCommand(redisClient *c) {
     } else if (c->multibulk) {
         if (c->bulklen == -1) {
             if (((char*)c->argv[0]->ptr)[0] != '$') {
-                addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
+                addReplyError(c,"multi bulk protocol error");
                 resetClient(c);
                 return 1;
             } else {
@@ -922,7 +922,7 @@ int processCommand(redisClient *c) {
                     bulklen < 0 || bulklen > 1024*1024*1024)
                 {
                     c->argc--;
-                    addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
+                    addReplyError(c,"invalid bulk write count");
                     resetClient(c);
                     return 1;
                 }
@@ -975,17 +975,14 @@ int processCommand(redisClient *c) {
      * such wrong arity, bad command name and so forth. */
     cmd = lookupCommand(c->argv[0]->ptr);
     if (!cmd) {
-        addReplySds(c,
-            sdscatprintf(sdsempty(), "-ERR unknown command '%s'\r\n",
-                (char*)c->argv[0]->ptr));
+        addReplyErrorFormat(c,"unknown command '%s'",
+            (char*)c->argv[0]->ptr);
         resetClient(c);
         return 1;
     } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
                (c->argc < -cmd->arity)) {
-        addReplySds(c,
-            sdscatprintf(sdsempty(),
-                "-ERR wrong number of arguments for '%s' command\r\n",
-                cmd->name));
+        addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
+            cmd->name);
         resetClient(c);
         return 1;
     } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
@@ -999,7 +996,7 @@ int processCommand(redisClient *c) {
             bulklen < 0 || bulklen > 1024*1024*1024)
         {
             c->argc--;
-            addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
+            addReplyError(c,"invalid bulk write count");
             resetClient(c);
             return 1;
         }
@@ -1026,7 +1023,7 @@ int processCommand(redisClient *c) {
 
     /* Check if the user is authenticated */
     if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
-        addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
+        addReplyError(c,"operation not permitted");
         resetClient(c);
         return 1;
     }
@@ -1035,7 +1032,7 @@ int processCommand(redisClient *c) {
     if (server.maxmemory && (cmd->flags & REDIS_CMD_DENYOOM) &&
         zmalloc_used_memory() > server.maxmemory)
     {
-        addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
+        addReplyError(c,"command not allowed when used memory > 'maxmemory'");
         resetClient(c);
         return 1;
     }
@@ -1045,7 +1042,7 @@ int processCommand(redisClient *c) {
         &&
         cmd->proc != subscribeCommand && cmd->proc != unsubscribeCommand &&
         cmd->proc != psubscribeCommand && cmd->proc != punsubscribeCommand) {
-        addReplySds(c,sdsnew("-ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context\r\n"));
+        addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context");
         resetClient(c);
         return 1;
     }
@@ -1109,7 +1106,7 @@ void authCommand(redisClient *c) {
       addReply(c,shared.ok);
     } else {
       c->authenticated = 0;
-      addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
+      addReplyError(c,"invalid password");
     }
 }
 
index 9e27d724854e1788199286f83e9437c81b7df954..1ef5628872b771d0f79e0321658eb110b6c0b806 100644 (file)
@@ -47,6 +47,7 @@
 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
 #define REDIS_SHARED_INTEGERS 10000
+#define REDIS_REPLY_CHUNK_BYTES (5*1500) /* 5 TCP packets with default MTU */
 
 /* If more then REDIS_WRITEV_THRESHOLD write packets are pending use writev */
 #define REDIS_WRITEV_THRESHOLD      3
@@ -309,6 +310,11 @@ typedef struct redisClient {
     list *watched_keys;     /* Keys WATCHED for MULTI/EXEC CAS */
     dict *pubsub_channels;  /* channels a client is interested in (SUBSCRIBE) */
     list *pubsub_patterns;  /* patterns a client is interested in (SUBSCRIBE) */
+
+    /* Response buffer */
+    int bufpos;
+    int buflen;
+    char buf[];
 } redisClient;
 
 struct saveparam {
@@ -588,6 +594,8 @@ void resetClient(redisClient *c);
 void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask);
 void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask);
 void addReply(redisClient *c, robj *obj);
+void *addDeferredMultiBulkLength(redisClient *c);
+void setDeferredMultiBulkLength(redisClient *c, void *node, long length);
 void addReplySds(redisClient *c, sds s);
 void processInputBuffer(redisClient *c);
 void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
@@ -597,11 +605,23 @@ void addReplyBulkCString(redisClient *c, char *s);
 void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
 void addReply(redisClient *c, robj *obj);
 void addReplySds(redisClient *c, sds s);
+void addReplyError(redisClient *c, char *err);
+void addReplyStatus(redisClient *c, char *status);
 void addReplyDouble(redisClient *c, double d);
 void addReplyLongLong(redisClient *c, long long ll);
-void addReplyUlong(redisClient *c, unsigned long ul);
+void addReplyMultiBulkLen(redisClient *c, long length);
 void *dupClientReplyValue(void *o);
 
+#ifdef __GNUC__
+void addReplyErrorFormat(redisClient *c, const char *fmt, ...)
+    __attribute__((format(printf, 2, 3)));
+void addReplyStatusFormat(redisClient *c, const char *fmt, ...)
+    __attribute__((format(printf, 2, 3)));
+#else
+void addReplyErrorFormat(redisClient *c, const char *fmt, ...);
+void addReplyStatusFormat(redisClient *c, const char *fmt, ...);
+#endif
+
 /* List data type */
 void listTypeTryConversion(robj *subject, robj *value);
 void listTypePush(robj *subject, robj *value, int where);
index c28460885ecf46c0fd3b4832adffcae254d57f8c..8c629006a14b7b02374ef64f8e6adcb72f664846 100644 (file)
@@ -179,7 +179,7 @@ void syncCommand(redisClient *c) {
     /* Refuse SYNC requests if we are a slave but the link with our master
      * is not ok... */
     if (server.masterhost && server.replstate != REDIS_REPL_CONNECTED) {
-        addReplySds(c,sdsnew("-ERR Can't SYNC while not connected with my master\r\n"));
+        addReplyError(c,"Can't SYNC while not connected with my master");
         return;
     }
 
@@ -188,7 +188,7 @@ void syncCommand(redisClient *c) {
      * buffer registering the differences between the BGSAVE and the current
      * dataset, so that we can copy to other slaves if needed. */
     if (listLength(c->reply) != 0) {
-        addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
+        addReplyError(c,"SYNC is invalid with pending input");
         return;
     }
 
@@ -226,7 +226,7 @@ void syncCommand(redisClient *c) {
         redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
         if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
             redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
-            addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
+            addReplyError(c,"Unable to perform background save");
             return;
         }
         c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
index a0ebb0591ee17ac9c181a5a89db1a920753227b0..2f3ffedcbf1d3ff8915ca033923e3fdbfa7269d2 100644 (file)
--- a/src/sds.c
+++ b/src/sds.c
@@ -33,7 +33,6 @@
 #include "sds.h"
 #include <stdio.h>
 #include <stdlib.h>
-#include <stdarg.h>
 #include <string.h>
 #include <ctype.h>
 #include "zmalloc.h"
@@ -156,8 +155,8 @@ sds sdscpy(sds s, char *t) {
     return sdscpylen(s, t, strlen(t));
 }
 
-sds sdscatprintf(sds s, const char *fmt, ...) {
-    va_list ap;
+sds sdscatvprintf(sds s, const char *fmt, va_list ap) {
+    va_list cpy;
     char *buf, *t;
     size_t buflen = 16;
 
@@ -169,9 +168,8 @@ sds sdscatprintf(sds s, const char *fmt, ...) {
         if (buf == NULL) return NULL;
 #endif
         buf[buflen-2] = '\0';
-        va_start(ap, fmt);
-        vsnprintf(buf, buflen, fmt, ap);
-        va_end(ap);
+        va_copy(cpy,ap);
+        vsnprintf(buf, buflen, fmt, cpy);
         if (buf[buflen-2] != '\0') {
             zfree(buf);
             buflen *= 2;
@@ -184,6 +182,15 @@ sds sdscatprintf(sds s, const char *fmt, ...) {
     return t;
 }
 
+sds sdscatprintf(sds s, const char *fmt, ...) {
+    va_list ap;
+    char *t;
+    va_start(ap, fmt);
+    t = sdscatvprintf(s,fmt,ap);
+    va_end(ap);
+    return t;
+}
+
 sds sdstrim(sds s, const char *cset) {
     struct sdshdr *sh = (void*) (s-(sizeof(struct sdshdr)));
     char *start, *end, *sp, *ep;
index a0e224f5ada8cc805478e8f863e4dc9dad5519c6..ae0f84fb5c4ec239698d09b9e167ccfce6dbc946 100644 (file)
--- a/src/sds.h
+++ b/src/sds.h
@@ -32,6 +32,7 @@
 #define __SDS_H
 
 #include <sys/types.h>
+#include <stdarg.h>
 
 typedef char *sds;
 
@@ -53,6 +54,7 @@ sds sdscat(sds s, char *t);
 sds sdscpylen(sds s, char *t, size_t len);
 sds sdscpy(sds s, char *t);
 
+sds sdscatvprintf(sds s, const char *fmt, va_list ap);
 #ifdef __GNUC__
 sds sdscatprintf(sds s, const char *fmt, ...)
     __attribute__((format(printf, 2, 3)));
index aa1ce929399dc64bfdbcccda95f1db3ae1b2c9c6..79f7901054c0971e6c52addfbb126bd53afb1854 100644 (file)
@@ -307,7 +307,7 @@ void sortCommand(redisClient *c) {
     outputlen = getop ? getop*(end-start+1) : end-start+1;
     if (storekey == NULL) {
         /* STORE option not specified, sent the sorting result to client */
-        addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
+        addReplyMultiBulkLen(c,outputlen);
         for (j = start; j <= end; j++) {
             listNode *ln;
             listIter li;
@@ -369,7 +369,7 @@ void sortCommand(redisClient *c) {
          * replaced. */
         server.dirty += 1+outputlen;
         touchWatchedKey(c->db,storekey);
-        addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",outputlen));
+        addReplyLongLong(c,outputlen);
     }
 
     /* Cleanup */
index b6be284fa12d58bce12ccc8808c7a481f20453e2..5cef1cabbcd86addfd6133fcbd2c6261e2750ae5 100644 (file)
@@ -249,7 +249,7 @@ void hmsetCommand(redisClient *c) {
     robj *o;
 
     if ((c->argc % 2) == 1) {
-        addReplySds(c,sdsnew("-ERR wrong number of arguments for HMSET\r\n"));
+        addReplyError(c,"wrong number of arguments for HMSET");
         return;
     }
 
@@ -315,7 +315,7 @@ void hmgetCommand(redisClient *c) {
     /* Note the check for o != NULL happens inside the loop. This is
      * done because objects that cannot be found are considered to be
      * an empty hash. The reply should then be a series of NULLs. */
-    addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-2));
+    addReplyMultiBulkLen(c,c->argc-2);
     for (i = 2; i < c->argc; i++) {
         if (o != NULL && (value = hashTypeGet(o,c->argv[i])) != NULL) {
             addReplyBulk(c,value);
@@ -346,21 +346,19 @@ void hlenCommand(redisClient *c) {
     if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
         checkType(c,o,REDIS_HASH)) return;
 
-    addReplyUlong(c,hashTypeLength(o));
+    addReplyLongLong(c,hashTypeLength(o));
 }
 
 void genericHgetallCommand(redisClient *c, int flags) {
-    robj *o, *lenobj, *obj;
+    robj *o, *obj;
     unsigned long count = 0;
     hashTypeIterator *hi;
+    void *replylen = NULL;
 
     if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
         || checkType(c,o,REDIS_HASH)) return;
 
-    lenobj = createObject(REDIS_STRING,NULL);
-    addReply(c,lenobj);
-    decrRefCount(lenobj);
-
+    replylen = addDeferredMultiBulkLength(c);
     hi = hashTypeInitIterator(o);
     while (hashTypeNext(hi) != REDIS_ERR) {
         if (flags & REDIS_HASH_KEY) {
@@ -377,8 +375,7 @@ void genericHgetallCommand(redisClient *c, int flags) {
         }
     }
     hashTypeReleaseIterator(hi);
-
-    lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",count);
+    setDeferredMultiBulkLength(c,replylen,count);
 }
 
 void hkeysCommand(redisClient *c) {
index add1bee167691b1fcc382c1292c728832f6c65bd..41d651f64543d144bc528f312adf2f61d63f7372 100644 (file)
@@ -342,7 +342,7 @@ void pushxGenericCommand(redisClient *c, robj *refval, robj *val, int where) {
         server.dirty++;
     }
 
-    addReplyUlong(c,listTypeLength(subject));
+    addReplyLongLong(c,listTypeLength(subject));
 }
 
 void lpushxCommand(redisClient *c) {
@@ -366,7 +366,7 @@ void linsertCommand(redisClient *c) {
 void llenCommand(redisClient *c) {
     robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.czero);
     if (o == NULL || checkType(c,o,REDIS_LIST)) return;
-    addReplyUlong(c,listTypeLength(o));
+    addReplyLongLong(c,listTypeLength(o));
 }
 
 void lindexCommand(redisClient *c) {
@@ -494,7 +494,7 @@ void lrangeCommand(redisClient *c) {
     rangelen = (end-start)+1;
 
     /* Return the result in form of a multi-bulk reply */
-    addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
+    addReplyMultiBulkLen(c,rangelen);
     listTypeIterator *li = listTypeInitIterator(o,start,REDIS_TAIL);
     for (j = 0; j < rangelen; j++) {
         redisAssert(listTypeNext(li,&entry));
@@ -594,7 +594,7 @@ void lremCommand(redisClient *c) {
         decrRefCount(obj);
 
     if (listTypeLength(subject) == 0) dbDelete(c->db,c->argv[1]);
-    addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
+    addReplyLongLong(c,removed);
     if (removed) touchWatchedKey(c->db,c->argv[1]);
 }
 
@@ -772,7 +772,7 @@ int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
     redisAssert(ln != NULL);
     receiver = ln->value;
 
-    addReplySds(receiver,sdsnew("*2\r\n"));
+    addReplyMultiBulkLen(receiver,2);
     addReplyBulk(receiver,key);
     addReplyBulk(receiver,ele);
     unblockClientWaitingData(receiver);
@@ -792,7 +792,7 @@ void blockingPopGenericCommand(redisClient *c, int where) {
 
     /* Make sure the timeout is not negative */
     if (lltimeout < 0) {
-        addReplySds(c,sdsnew("-ERR timeout is negative\r\n"));
+        addReplyError(c,"timeout is negative");
         return;
     }
 
@@ -822,7 +822,7 @@ void blockingPopGenericCommand(redisClient *c, int where) {
                      * "real" command will add the last element (the value)
                      * for us. If this souds like an hack to you it's just
                      * because it is... */
-                    addReplySds(c,sdsnew("*2\r\n"));
+                    addReplyMultiBulkLen(c,2);
                     addReplyBulk(c,argv[1]);
                     popGenericCommand(c,where);
 
index 68e132278f96c938e605490e6ed55aff46177ef8..e2ac5ae5378155f329f76d90d7f5c6680e959f05 100644 (file)
@@ -276,7 +276,7 @@ void scardCommand(redisClient *c) {
     if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
         checkType(c,o,REDIS_SET)) return;
 
-    addReplyUlong(c,setTypeSize(o));
+    addReplyLongLong(c,setTypeSize(o));
 }
 
 void spopCommand(redisClient *c) {
@@ -320,7 +320,8 @@ int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
 void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum, robj *dstkey) {
     robj **sets = zmalloc(sizeof(robj*)*setnum);
     setTypeIterator *si;
-    robj *ele, *lenobj = NULL, *dstset = NULL;
+    robj *ele, *dstset = NULL;
+    void *replylen = NULL;
     unsigned long j, cardinality = 0;
 
     for (j = 0; j < setnum; j++) {
@@ -356,9 +357,7 @@ void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum,
      * to the output list and save the pointer to later modify it with the
      * right length */
     if (!dstkey) {
-        lenobj = createObject(REDIS_STRING,NULL);
-        addReply(c,lenobj);
-        decrRefCount(lenobj);
+        replylen = addDeferredMultiBulkLength(c);
     } else {
         /* If we have a target key where to store the resulting set
          * create this key with an empty set inside */
@@ -400,7 +399,7 @@ void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum,
         touchWatchedKey(c->db,dstkey);
         server.dirty++;
     } else {
-        lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",cardinality);
+        setDeferredMultiBulkLength(c,replylen,cardinality);
     }
     zfree(sets);
 }
@@ -470,7 +469,7 @@ void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum, robj *
 
     /* Output the content of the resulting set, if not in STORE mode */
     if (!dstkey) {
-        addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
+        addReplyMultiBulkLen(c,cardinality);
         si = setTypeInitIterator(dstset);
         while((ele = setTypeNext(si)) != NULL) {
             addReplyBulk(c,ele);
index 3b8a39bbec7100d20622411c2376eda49565a2fa..509c630a49540163980ef3aba678a6d9efbb9623 100644 (file)
@@ -12,7 +12,7 @@ void setGenericCommand(redisClient *c, int nx, robj *key, robj *val, robj *expir
         if (getLongFromObjectOrReply(c, expire, &seconds, NULL) != REDIS_OK)
             return;
         if (seconds <= 0) {
-            addReplySds(c,sdsnew("-ERR invalid expire time in SETEX\r\n"));
+            addReplyError(c,"invalid expire time in SETEX");
             return;
         }
     }
@@ -79,7 +79,7 @@ void getsetCommand(redisClient *c) {
 void mgetCommand(redisClient *c) {
     int j;
 
-    addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
+    addReplyMultiBulkLen(c,c->argc-1);
     for (j = 1; j < c->argc; j++) {
         robj *o = lookupKeyRead(c->db,c->argv[j]);
         if (o == NULL) {
@@ -98,7 +98,7 @@ void msetGenericCommand(redisClient *c, int nx) {
     int j, busykeys = 0;
 
     if ((c->argc % 2) == 0) {
-        addReplySds(c,sdsnew("-ERR wrong number of arguments for MSET\r\n"));
+        addReplyError(c,"wrong number of arguments for MSET");
         return;
     }
     /* Handle the NX flag. The MSETNX semantic is to return zero and don't
@@ -211,7 +211,7 @@ void appendCommand(redisClient *c) {
     }
     touchWatchedKey(c->db,c->argv[1]);
     server.dirty++;
-    addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",(unsigned long)totlen));
+    addReplyLongLong(c,totlen);
 }
 
 void substrCommand(redisClient *c) {
index e93e5c406a5cb2088b134aeea7cbc249995ac83d..d944e92329fca9d8f8f0033216dbf7f0ae631706 100644 (file)
@@ -355,8 +355,7 @@ void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scoreval, i
             *score = scoreval;
         }
         if (isnan(*score)) {
-            addReplySds(c,
-                sdsnew("-ERR resulting score is not a number (NaN)\r\n"));
+            addReplyError(c,"resulting score is not a number (NaN)");
             zfree(score);
             /* Note that we don't need to check if the zset may be empty and
              * should be removed here, as we can only obtain Nan as score if
@@ -561,7 +560,8 @@ void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
     /* expect setnum input keys to be given */
     setnum = atoi(c->argv[2]->ptr);
     if (setnum < 1) {
-        addReplySds(c,sdsnew("-ERR at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE\r\n"));
+        addReplyError(c,
+            "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE");
         return;
     }
 
@@ -782,8 +782,7 @@ void zrangeGenericCommand(redisClient *c, int reverse) {
     }
 
     /* Return the result in form of a multi-bulk reply */
-    addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",
-        withscores ? (rangelen*2) : rangelen));
+    addReplyMultiBulkLen(c,withscores ? (rangelen*2) : rangelen);
     for (j = 0; j < rangelen; j++) {
         ele = ln->obj;
         addReplyBulk(c,ele);
@@ -840,8 +839,7 @@ void genericZrangebyscoreCommand(redisClient *c, int justcount) {
     if (c->argc != (4 + withscores) && c->argc != (7 + withscores))
         badsyntax = 1;
     if (badsyntax) {
-        addReplySds(c,
-            sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n"));
+        addReplyError(c,"wrong number of arguments for ZRANGEBYSCORE");
         return;
     }
 
@@ -866,7 +864,8 @@ void genericZrangebyscoreCommand(redisClient *c, int justcount) {
             zset *zsetobj = o->ptr;
             zskiplist *zsl = zsetobj->zsl;
             zskiplistNode *ln;
-            robj *ele, *lenobj = NULL;
+            robj *ele;
+            void *replylen = NULL;
             unsigned long rangelen = 0;
 
             /* Get the first node with the score >= min, or with
@@ -884,11 +883,8 @@ void genericZrangebyscoreCommand(redisClient *c, int justcount) {
              * are in the list, so we push this object that will represent
              * the multi-bulk length in the output buffer, and will "fix"
              * it later */
-            if (!justcount) {
-                lenobj = createObject(REDIS_STRING,NULL);
-                addReply(c,lenobj);
-                decrRefCount(lenobj);
-            }
+            if (!justcount)
+                replylen = addDeferredMultiBulkLength(c);
 
             while(ln && (maxex ? (ln->score < max) : (ln->score <= max))) {
                 if (offset) {
@@ -910,7 +906,7 @@ void genericZrangebyscoreCommand(redisClient *c, int justcount) {
             if (justcount) {
                 addReplyLongLong(c,(long)rangelen);
             } else {
-                lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",
+                setDeferredMultiBulkLength(c,replylen,
                      withscores ? (rangelen*2) : rangelen);
             }
         }
@@ -933,7 +929,7 @@ void zcardCommand(redisClient *c) {
         checkType(c,o,REDIS_ZSET)) return;
 
     zs = o->ptr;
-    addReplyUlong(c,zs->zsl->length);
+    addReplyLongLong(c,zs->zsl->length);
 }
 
 void zscoreCommand(redisClient *c) {