]> git.saurik.com Git - redis.git/blobdiff - redis.c
redis-benchmark hopefully last bug with multi bulk reply fixed
[redis.git] / redis.c
diff --git a/redis.c b/redis.c
index 90063525a199ace4d74137b9007147f77051e16c..df3b413192bc98cda7c2460968fdb7e37eb785ca 100644 (file)
--- a/redis.c
+++ b/redis.c
@@ -56,6 +56,7 @@
 #include <fcntl.h>
 #include <sys/time.h>
 #include <sys/resource.h>
+#include <sys/uio.h>
 #include <limits.h>
 #include <math.h>
 
 #define REDIS_MAX_SYNC_TIME     60      /* Slave can't take more to sync */
 #define REDIS_EXPIRELOOKUPS_PER_CRON    100 /* try to expire 100 keys/second */
 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
-#define REDIS_REQUEST_MAX_SIZE  (1024*1024*256) /* max bytes in inline command */
+#define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
+
+/* If more then REDIS_WRITEV_THRESHOLD write packets are pending use writev */
+#define REDIS_WRITEV_THRESHOLD      3
+/* Max number of iovecs used for each writev call */
+#define REDIS_WRITEV_IOVEC_COUNT    256
 
 /* Hash table parameters */
 #define REDIS_HT_MINFILL        10      /* Minimal hash table fill 10% */
@@ -394,6 +400,7 @@ static void processInputBuffer(redisClient *c);
 static zskiplist *zslCreate(void);
 static void zslFree(zskiplist *zsl);
 static void zslInsert(zskiplist *zsl, double score, robj *obj);
+static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask);
 
 static void authCommand(redisClient *c);
 static void pingCommand(redisClient *c);
@@ -446,7 +453,7 @@ static void flushdbCommand(redisClient *c);
 static void flushallCommand(redisClient *c);
 static void sortCommand(redisClient *c);
 static void lremCommand(redisClient *c);
-static void lpoppushCommand(redisClient *c);
+static void rpoplpushcommand(redisClient *c);
 static void infoCommand(redisClient *c);
 static void mgetCommand(redisClient *c);
 static void monitorCommand(redisClient *c);
@@ -490,7 +497,7 @@ static struct redisCommand cmdTable[] = {
     {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
     {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
     {"lrem",lremCommand,4,REDIS_CMD_BULK},
-    {"lpoppush",lpoppushCommand,3,REDIS_CMD_BULK},
+    {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_BULK},
     {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
     {"srem",sremCommand,3,REDIS_CMD_BULK},
     {"smove",smoveCommand,4,REDIS_CMD_BULK},
@@ -906,7 +913,7 @@ static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientD
     /* Check if a background saving in progress terminated */
     if (server.bgsaveinprogress) {
         int statloc;
-        if (wait4(-1,&statloc,WNOHANG,NULL)) {
+        if (wait3(&statloc,WNOHANG,NULL)) {
             int exitcode = WEXITSTATUS(statloc);
             int bysignal = WIFSIGNALED(statloc);
 
@@ -1342,34 +1349,31 @@ static void freeClient(redisClient *c) {
     zfree(c);
 }
 
+#define GLUEREPLY_UP_TO (1024)
 static void glueReplyBuffersIfNeeded(redisClient *c) {
-    int totlen = 0;
+    int copylen = 0;
+    char buf[GLUEREPLY_UP_TO];
     listNode *ln;
     robj *o;
 
     listRewind(c->reply);
     while((ln = listYield(c->reply))) {
+        int objlen;
+
         o = ln->value;
-        totlen += sdslen(o->ptr);
-        /* This optimization makes more sense if we don't have to copy
-         * too much data */
-        if (totlen > 1024) return;
-    }
-    if (totlen > 0) {
-        char buf[1024];
-        int copylen = 0;
-
-        listRewind(c->reply);
-        while((ln = listYield(c->reply))) {
-            o = ln->value;
-            memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
-            copylen += sdslen(o->ptr);
+        objlen = sdslen(o->ptr);
+        if (copylen + objlen <= GLUEREPLY_UP_TO) {
+            memcpy(buf+copylen,o->ptr,objlen);
+            copylen += objlen;
             listDelNode(c->reply,ln);
+        } else {
+            if (copylen == 0) return;
+            break;
         }
-        /* Now the output buffer is empty, add the new single element */
-        o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
-        listAddNodeTail(c->reply,o);
     }
+    /* Now the output buffer is empty, add the new single element */
+    o = createObject(REDIS_STRING,sdsnewlen(buf,copylen));
+    listAddNodeHead(c->reply,o);
 }
 
 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
@@ -1379,9 +1383,22 @@ static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask)
     REDIS_NOTUSED(el);
     REDIS_NOTUSED(mask);
 
-    if (server.glueoutputbuf && listLength(c->reply) > 1)
-        glueReplyBuffersIfNeeded(c);
+
+    /* Use writev() if we have enough buffers to send */
+    #if 0
+    if (!server.glueoutputbuf &&
+        listLength(c->reply) > REDIS_WRITEV_THRESHOLD && 
+        !(c->flags & REDIS_MASTER))
+    {
+        sendReplyToClientWritev(el, fd, privdata, mask);
+        return;
+    }
+    #endif
+
     while(listLength(c->reply)) {
+        if (server.glueoutputbuf && listLength(c->reply) > 1)
+            glueReplyBuffersIfNeeded(c);
+
         o = listNodeValue(listFirst(c->reply));
         objlen = sdslen(o->ptr);
 
@@ -1428,6 +1445,84 @@ static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask)
     }
 }
 
+static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
+{
+    redisClient *c = privdata;
+    int nwritten = 0, totwritten = 0, objlen, willwrite;
+    robj *o;
+    struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
+    int offset, ion = 0;
+    REDIS_NOTUSED(el);
+    REDIS_NOTUSED(mask);
+
+    listNode *node;
+    while (listLength(c->reply)) {
+        offset = c->sentlen;
+        ion = 0;
+        willwrite = 0;
+
+        /* fill-in the iov[] array */
+        for(node = listFirst(c->reply); node; node = listNextNode(node)) {
+            o = listNodeValue(node);
+            objlen = sdslen(o->ptr);
+
+            if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT) 
+                break;
+
+            if(ion == REDIS_WRITEV_IOVEC_COUNT)
+                break; /* no more iovecs */
+
+            iov[ion].iov_base = ((char*)o->ptr) + offset;
+            iov[ion].iov_len = objlen - offset;
+            willwrite += objlen - offset;
+            offset = 0; /* just for the first item */
+            ion++;
+        }
+
+        if(willwrite == 0)
+            break;
+
+        /* write all collected blocks at once */
+        if((nwritten = writev(fd, iov, ion)) < 0) {
+            if (errno != EAGAIN) {
+                redisLog(REDIS_DEBUG,
+                         "Error writing to client: %s", strerror(errno));
+                freeClient(c);
+                return;
+            }
+            break;
+        }
+
+        totwritten += nwritten;
+        offset = c->sentlen;
+
+        /* remove written robjs from c->reply */
+        while (nwritten && listLength(c->reply)) {
+            o = listNodeValue(listFirst(c->reply));
+            objlen = sdslen(o->ptr);
+
+            if(nwritten >= objlen - offset) {
+                listDelNode(c->reply, listFirst(c->reply));
+                nwritten -= objlen - offset;
+                c->sentlen = 0;
+            } else {
+                /* partial write */
+                c->sentlen += nwritten;
+                break;
+            }
+            offset = 0;
+        }
+    }
+
+    if (totwritten > 0) 
+        c->lastinteraction = time(NULL);
+
+    if (listLength(c->reply) == 0) {
+        c->sentlen = 0;
+        aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
+    }
+}
+
 static struct redisCommand *lookupCommand(char *name) {
     int j = 0;
     while(cmdTable[j].name != NULL) {
@@ -3473,7 +3568,7 @@ static void lremCommand(redisClient *c) {
 }
 
 /* This is the semantic of this command:
- *  LPOPPUSH srclist dstlist:
+ *  RPOPLPUSH srclist dstlist:
  *   IF LLEN(srclist) > 0
  *     element = RPOP srclist
  *     LPUSH dstlist element
@@ -3487,7 +3582,7 @@ static void lremCommand(redisClient *c) {
  * since the element is not just returned but pushed against another list
  * as well. This command was originally proposed by Ezra Zygmuntowicz.
  */
-static void lpoppushCommand(redisClient *c) {
+static void rpoplpushcommand(redisClient *c) {
     robj *sobj;
 
     sobj = lookupKeyWrite(c->db,c->argv[1]);
@@ -4372,7 +4467,7 @@ static void zscoreCommand(redisClient *c) {
     
     o = lookupKeyRead(c->db,c->argv[1]);
     if (o == NULL) {
-        addReply(c,shared.czero);
+        addReply(c,shared.nullbulk);
         return;
     } else {
         if (o->type != REDIS_ZSET) {