]> git.saurik.com Git - redis.git/commitdiff
Merge branch 'unstable' of github.com:antirez/redis-private into unstable
authorantirez <antirez@gmail.com>
Fri, 7 Jan 2011 15:43:51 +0000 (16:43 +0100)
committerantirez <antirez@gmail.com>
Fri, 7 Jan 2011 15:43:51 +0000 (16:43 +0100)
redis.conf
src/anet.c
src/config.c
src/diskstore.c
src/dscache.c
src/networking.c
src/rdb.c
src/redis.c
src/redis.h
utils/whatisdoing.sh [new file with mode: 0755]

index 3425a59cdc87c90122b1402a762f53f5f0191a89..150eb6907b64fc6f286c0182798a7482a6c59ac4 100644 (file)
@@ -319,11 +319,6 @@ cache-flush-delay 0
 
 ############################### ADVANCED CONFIG ###############################
 
-# Glue small output buffers together in order to send small replies in a
-# single TCP packet. Uses a bit more CPU but most of the times it is a win
-# in terms of number of queries per second. Use 'yes' if unsure.
-glueoutputbuf yes
-
 # Hashes are encoded in a special way (much more memory efficient) when they
 # have at max a given numer of elements, and the biggest element does not
 # exceed a given threshold. You can configure this limits with the following
index e7763e4c63b64ac54e6290ffbad3e9b4a710b233..4e16f2e4c9d45c88254fb04a4dee94f1c838e435 100644 (file)
@@ -64,11 +64,11 @@ int anetNonBlock(char *err, int fd)
      * Note that fcntl(2) for F_GETFL and F_SETFL can't be
      * interrupted by a signal. */
     if ((flags = fcntl(fd, F_GETFL)) == -1) {
-        anetSetError(err, "fcntl(F_GETFL): %s\n", strerror(errno));
+        anetSetError(err, "fcntl(F_GETFL): %s", strerror(errno));
         return ANET_ERR;
     }
     if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
-        anetSetError(err, "fcntl(F_SETFL,O_NONBLOCK): %s\n", strerror(errno));
+        anetSetError(err, "fcntl(F_SETFL,O_NONBLOCK): %s", strerror(errno));
         return ANET_ERR;
     }
     return ANET_OK;
@@ -79,7 +79,7 @@ int anetTcpNoDelay(char *err, int fd)
     int yes = 1;
     if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)) == -1)
     {
-        anetSetError(err, "setsockopt TCP_NODELAY: %s\n", strerror(errno));
+        anetSetError(err, "setsockopt TCP_NODELAY: %s", strerror(errno));
         return ANET_ERR;
     }
     return ANET_OK;
@@ -89,7 +89,7 @@ int anetSetSendBuffer(char *err, int fd, int buffsize)
 {
     if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &buffsize, sizeof(buffsize)) == -1)
     {
-        anetSetError(err, "setsockopt SO_SNDBUF: %s\n", strerror(errno));
+        anetSetError(err, "setsockopt SO_SNDBUF: %s", strerror(errno));
         return ANET_ERR;
     }
     return ANET_OK;
@@ -99,7 +99,7 @@ int anetTcpKeepAlive(char *err, int fd)
 {
     int yes = 1;
     if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &yes, sizeof(yes)) == -1) {
-        anetSetError(err, "setsockopt SO_KEEPALIVE: %s\n", strerror(errno));
+        anetSetError(err, "setsockopt SO_KEEPALIVE: %s", strerror(errno));
         return ANET_ERR;
     }
     return ANET_OK;
@@ -115,7 +115,7 @@ int anetResolve(char *err, char *host, char *ipbuf)
 
         he = gethostbyname(host);
         if (he == NULL) {
-            anetSetError(err, "can't resolve: %s\n", host);
+            anetSetError(err, "can't resolve: %s", host);
             return ANET_ERR;
         }
         memcpy(&sa.sin_addr, he->h_addr, sizeof(struct in_addr));
@@ -127,14 +127,14 @@ int anetResolve(char *err, char *host, char *ipbuf)
 static int anetCreateSocket(char *err, int domain) {
     int s, on = 1;
     if ((s = socket(domain, SOCK_STREAM, 0)) == -1) {
-        anetSetError(err, "creating socket: %s\n", strerror(errno));
+        anetSetError(err, "creating socket: %s", strerror(errno));
         return ANET_ERR;
     }
 
     /* Make sure connection-intensive things like the redis benckmark
      * will be able to close/open sockets a zillion of times */
     if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) {
-        anetSetError(err, "setsockopt SO_REUSEADDR: %s\n", strerror(errno));
+        anetSetError(err, "setsockopt SO_REUSEADDR: %s", strerror(errno));
         return ANET_ERR;
     }
     return s;
@@ -157,7 +157,7 @@ static int anetTcpGenericConnect(char *err, char *addr, int port, int flags)
 
         he = gethostbyname(addr);
         if (he == NULL) {
-            anetSetError(err, "can't resolve: %s\n", addr);
+            anetSetError(err, "can't resolve: %s", addr);
             close(s);
             return ANET_ERR;
         }
@@ -172,7 +172,7 @@ static int anetTcpGenericConnect(char *err, char *addr, int port, int flags)
             flags & ANET_CONNECT_NONBLOCK)
             return s;
 
-        anetSetError(err, "connect: %s\n", strerror(errno));
+        anetSetError(err, "connect: %s", strerror(errno));
         close(s);
         return ANET_ERR;
     }
@@ -208,7 +208,7 @@ int anetUnixGenericConnect(char *err, char *path, int flags)
             flags & ANET_CONNECT_NONBLOCK)
             return s;
 
-        anetSetError(err, "connect: %s\n", strerror(errno));
+        anetSetError(err, "connect: %s", strerror(errno));
         close(s);
         return ANET_ERR;
     }
@@ -257,12 +257,12 @@ int anetWrite(int fd, char *buf, int count)
 
 static int anetListen(char *err, int s, struct sockaddr *sa, socklen_t len) {
     if (bind(s,sa,len) == -1) {
-        anetSetError(err, "bind: %s\n", strerror(errno));
+        anetSetError(err, "bind: %s", strerror(errno));
         close(s);
         return ANET_ERR;
     }
     if (listen(s, 511) == -1) { /* the magic 511 constant is from nginx */
-        anetSetError(err, "listen: %s\n", strerror(errno));
+        anetSetError(err, "listen: %s", strerror(errno));
         close(s);
         return ANET_ERR;
     }
@@ -282,7 +282,7 @@ int anetTcpServer(char *err, int port, char *bindaddr)
     sa.sin_port = htons(port);
     sa.sin_addr.s_addr = htonl(INADDR_ANY);
     if (bindaddr && inet_aton(bindaddr, &sa.sin_addr) == 0) {
-        anetSetError(err, "Invalid bind address\n");
+        anetSetError(err, "invalid bind address");
         close(s);
         return ANET_ERR;
     }
@@ -315,7 +315,7 @@ static int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *l
             if (errno == EINTR)
                 continue;
             else {
-                anetSetError(err, "accept: %s\n", strerror(errno));
+                anetSetError(err, "accept: %s", strerror(errno));
                 return ANET_ERR;
             }
         }
index 3ba87c738c6b781f638453fd741723bfb70bb79d..219c99ca0936a35c34bb5209aa3b43a9b58ec85b 100644 (file)
@@ -194,10 +194,8 @@ void loadServerConfig(char *filename) {
             if ((server.repl_serve_stale_data = yesnotoi(argv[1])) == -1) {
                 err = "argument must be 'yes' or 'no'"; goto loaderr;
             }
-        } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
-            if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
-                err = "argument must be 'yes' or 'no'"; goto loaderr;
-            }
+        } else if (!strcasecmp(argv[0],"glueoutputbuf")) {
+            redisLog(REDIS_WARNING, "Deprecated configuration directive: \"%s\"", argv[0]);
         } else if (!strcasecmp(argv[0],"rdbcompression") && argc == 2) {
             if ((server.rdbcompression = yesnotoi(argv[1])) == -1) {
                 err = "argument must be 'yes' or 'no'"; goto loaderr;
index 7250390eb87d752681912ddb181181babee0a00c..26f3af6076927ed051b005e137f2d212a8a781e5 100644 (file)
@@ -348,3 +348,50 @@ void dsFlushDb(int dbid) {
         }
     }
 }
+
+int dsRdbSave(char *filename) {
+    char tmpfile[256];
+    int j, i;
+    time_t now = time(NULL);
+
+    snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
+    fp = fopen(tmpfile,"w");
+    if (!fp) {
+        redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
+        return REDIS_ERR;
+    }
+    if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
+
+    /* Scan all diskstore dirs looking for keys */
+    for (j = 0; j < 256; j++) {
+        for (i = 0; i < 256; i++) {
+            snprintf(buf,1024,"%s/%02x/%02x",server.ds_path,j,i);
+
+            /* Write the SELECT DB opcode */
+            if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
+            if (rdbSaveLen(fp,j) == -1) goto werr;
+        }
+    }
+
+    /* Make sure data will not remain on the OS's output buffers */
+    fflush(fp);
+    fsync(fileno(fp));
+    fclose(fp);
+
+    /* Use RENAME to make sure the DB file is changed atomically only
+     * if the generate DB file is ok. */
+    if (rename(tmpfile,filename) == -1) {
+        redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
+        unlink(tmpfile);
+        return REDIS_ERR;
+    }
+    redisLog(REDIS_NOTICE,"DB saved on disk");
+    server.dirty = 0;
+    server.lastsave = time(NULL);
+    return REDIS_OK;
+
+werr:
+    fclose(fp);
+    unlink(tmpfile);
+    redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
+}
index 4ebca7087d218a76aa5a65378120f38daa9d0c66..1adba6f56c90af9ae3c48890394e6be7cb0a8f5a 100644 (file)
@@ -512,7 +512,7 @@ int processActiveIOJobs(int max) {
 
 #if 0
         /* If there are new jobs we need to signal the thread to
-         * process the next one. */
+         * process the next one. FIXME: drop this if useless. */
         redisLog(REDIS_DEBUG,"waitEmptyIOJobsQueue: new %d, processing %d",
             listLength(server.io_newjobs),
             listLength(server.io_processing));
@@ -576,7 +576,21 @@ void queueIOJob(iojob *j) {
         spawnIOThread();
 }
 
-void dsCreateIOJob(int type, redisDb *db, robj *key, robj *val) {
+/* Consume all the IO scheduled operations, and all the thread IO jobs
+ * so that eventually the state of diskstore is a point-in-time snapshot.
+ *
+ * This is useful when we need to BGSAVE with diskstore enabled. */
+void cacheForcePointInTime(void) {
+    redisLog(REDIS_NOTICE,"Diskstore: synching on disk to reach point-in-time state.");
+    while (listLength(server.cache_io_queue) != 0) {
+        cacheScheduleIOPushJobs(REDIS_IO_ASAP);
+        processActiveIOJobs(1);
+    }
+    waitEmptyIOJobsQueue();
+    processAllPendingIOJobs();
+}
+
+void cacheCreateIOJob(int type, redisDb *db, robj *key, robj *val) {
     iojob *j;
 
     j = zmalloc(sizeof(*j));
@@ -762,7 +776,7 @@ int cacheScheduleIOPushJobs(int flags) {
             op->type == REDIS_IO_LOAD ? "load" : "save", op->key->ptr);
 
         if (op->type == REDIS_IO_LOAD) {
-            dsCreateIOJob(REDIS_IOJOB_LOAD,op->db,op->key,NULL);
+            cacheCreateIOJob(REDIS_IOJOB_LOAD,op->db,op->key,NULL);
         } else {
             /* Lookup the key, in order to put the current value in the IO
              * Job. Otherwise if the key does not exists we schedule a disk
@@ -775,7 +789,7 @@ int cacheScheduleIOPushJobs(int flags) {
                  * the key on disk. */
                 val = NULL;
             }
-            dsCreateIOJob(REDIS_IOJOB_SAVE,op->db,op->key,val);
+            cacheCreateIOJob(REDIS_IOJOB_SAVE,op->db,op->key,val);
         }
         /* Mark the operation as in progress. */
         cacheScheduleIODelFlag(op->db,op->key,op->type);
index cd3e787eb8913e0b68b4f1d70848a04e2e520ffe..6d232ecf7691b0e52ffbaaee3cae3cbdc9a1b95d 100644 (file)
@@ -515,15 +515,6 @@ void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
     REDIS_NOTUSED(el);
     REDIS_NOTUSED(mask);
 
-    /* Use writev() if we have enough buffers to send */
-    if (!server.glueoutputbuf &&
-        listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
-        !(c->flags & REDIS_MASTER))
-    {
-        sendReplyToClientWritev(el, fd, privdata, mask);
-        return;
-    }
-
     while(c->bufpos > 0 || listLength(c->reply)) {
         if (c->bufpos > 0) {
             if (c->flags & REDIS_MASTER) {
@@ -594,84 +585,6 @@ void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
     }
 }
 
-void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
-{
-    redisClient *c = privdata;
-    int nwritten = 0, totwritten = 0, objlen, willwrite;
-    robj *o;
-    struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
-    int offset, ion = 0;
-    REDIS_NOTUSED(el);
-    REDIS_NOTUSED(mask);
-
-    listNode *node;
-    while (listLength(c->reply)) {
-        offset = c->sentlen;
-        ion = 0;
-        willwrite = 0;
-
-        /* fill-in the iov[] array */
-        for(node = listFirst(c->reply); node; node = listNextNode(node)) {
-            o = listNodeValue(node);
-            objlen = sdslen(o->ptr);
-
-            if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
-                break;
-
-            if(ion == REDIS_WRITEV_IOVEC_COUNT)
-                break; /* no more iovecs */
-
-            iov[ion].iov_base = ((char*)o->ptr) + offset;
-            iov[ion].iov_len = objlen - offset;
-            willwrite += objlen - offset;
-            offset = 0; /* just for the first item */
-            ion++;
-        }
-
-        if(willwrite == 0)
-            break;
-
-        /* write all collected blocks at once */
-        if((nwritten = writev(fd, iov, ion)) < 0) {
-            if (errno != EAGAIN) {
-                redisLog(REDIS_VERBOSE,
-                         "Error writing to client: %s", strerror(errno));
-                freeClient(c);
-                return;
-            }
-            break;
-        }
-
-        totwritten += nwritten;
-        offset = c->sentlen;
-
-        /* remove written robjs from c->reply */
-        while (nwritten && listLength(c->reply)) {
-            o = listNodeValue(listFirst(c->reply));
-            objlen = sdslen(o->ptr);
-
-            if(nwritten >= objlen - offset) {
-                listDelNode(c->reply, listFirst(c->reply));
-                nwritten -= objlen - offset;
-                c->sentlen = 0;
-            } else {
-                /* partial write */
-                c->sentlen += nwritten;
-                break;
-            }
-            offset = 0;
-        }
-    }
-
-    if (totwritten > 0)
-        c->lastinteraction = time(NULL);
-
-    if (listLength(c->reply) == 0) {
-        c->sentlen = 0;
-        aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
-    }
-}
-
 /* resetClient prepare the client to process the next command */
 void resetClient(redisClient *c) {
     freeClientArgv(c);
index 60d0a6ce2606a6199bda66b48521ffd7d5626b18..6b6b6ab643b232d6f13c278216111edcb6569a16 100644 (file)
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -429,8 +429,10 @@ int rdbSave(char *filename) {
     int j;
     time_t now = time(NULL);
 
-    /* FIXME: implement .rdb save for disk store properly */
-    redisAssert(server.ds_enabled == 0);
+    if (server.ds_enabled) {
+        cacheForcePointInTime();
+        return dsRdbSave(filename);
+    }
 
     snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
     fp = fopen(tmpfile,"w");
@@ -495,17 +497,22 @@ int rdbSaveBackground(char *filename) {
     pid_t childpid;
 
     if (server.bgsavechildpid != -1) return REDIS_ERR;
-    redisAssert(server.ds_enabled == 0);
+
     server.dirty_before_bgsave = server.dirty;
+
     if ((childpid = fork()) == 0) {
+        int retval;
+
         /* Child */
         if (server.ipfd > 0) close(server.ipfd);
         if (server.sofd > 0) close(server.sofd);
-        if (rdbSave(filename) == REDIS_OK) {
-            _exit(0);
+        if (server.ds_enabled) {
+            cacheForcePointInTime();
+            dsRdbSave(filename);
         } else {
-            _exit(1);
+            rdbSave(filename);
         }
+        _exit((retval == REDIS_OK) ? 0 : 1);
     } else {
         /* Parent */
         if (childpid == -1) {
index 91371a007f73f536925b6d54e85215531ad3f687..c0dac05fe6c8c4789ede397e02b4cd7cb6d3f95d 100644 (file)
@@ -749,7 +749,6 @@ void initServerConfig() {
     server.syslog_enabled = 0;
     server.syslog_ident = zstrdup("redis");
     server.syslog_facility = LOG_LOCAL0;
-    server.glueoutputbuf = 1;
     server.daemonize = 0;
     server.appendonly = 0;
     server.appendfsync = APPENDFSYNC_EVERYSEC;
index 54d82cb6bafdd08b9ac769d61df9c25d82a222b2..495de985916f91f5b55840da33d621a173718df2 100644 (file)
 #define REDIS_REPLY_CHUNK_BYTES (5*1500) /* 5 TCP packets with default MTU */
 #define REDIS_MAX_LOGMSG_LEN    1024 /* Default maximum length of syslog messages */
 
-/* If more then REDIS_WRITEV_THRESHOLD write packets are pending use writev */
-#define REDIS_WRITEV_THRESHOLD      3
-/* Max number of iovecs used for each writev call */
-#define REDIS_WRITEV_IOVEC_COUNT    256
-
 /* Hash table parameters */
 #define REDIS_HT_MINFILL        10      /* Minimal hash table fill 10% */
 
@@ -388,7 +383,6 @@ struct redisServer {
     long long stat_keyspace_misses; /* number of failed lookups of keys */
     /* Configuration */
     int verbosity;
-    int glueoutputbuf;
     int maxidletime;
     int dbnum;
     int daemonize;
@@ -633,7 +627,6 @@ void closeTimedoutClients(void);
 void freeClient(redisClient *c);
 void resetClient(redisClient *c);
 void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask);
-void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask);
 void addReply(redisClient *c, robj *obj);
 void *addDeferredMultiBulkLength(redisClient *c);
 void setDeferredMultiBulkLength(redisClient *c, void *node, long length);
@@ -820,6 +813,7 @@ void cacheCron(void);
 int cacheKeyMayExist(redisDb *db, robj *key);
 void cacheSetKeyMayExist(redisDb *db, robj *key);
 void cacheSetKeyDoesNotExist(redisDb *db, robj *key);
+void cacheForcePointInTime(void);
 
 /* Set data type */
 robj *setTypeCreate(robj *value);
diff --git a/utils/whatisdoing.sh b/utils/whatisdoing.sh
new file mode 100755 (executable)
index 0000000..8f441cf
--- /dev/null
@@ -0,0 +1,18 @@
+# This script is from http://poormansprofiler.org/
+
+#!/bin/bash
+nsamples=1
+sleeptime=0
+pid=$(pidof redis-server)
+
+for x in $(seq 1 $nsamples)
+  do
+    gdb -ex "set pagination 0" -ex "thread apply all bt" -batch -p $pid
+    sleep $sleeptime
+  done | \
+awk '
+  BEGIN { s = ""; } 
+  /Thread/ { print s; s = ""; } 
+  /^\#/ { if (s != "" ) { s = s "," $4} else { s = $4 } } 
+  END { print s }' | \
+sort | uniq -c | sort -r -n -k 1,1