]> git.saurik.com Git - redis.git/commitdiff
Merge pull request #74 from kmerenkov/issue_620
authorSalvatore Sanfilippo <antirez@gmail.com>
Tue, 20 Sep 2011 08:55:34 +0000 (01:55 -0700)
committerSalvatore Sanfilippo <antirez@gmail.com>
Tue, 20 Sep 2011 08:55:34 +0000 (01:55 -0700)
[issue 620] don't segfault if number of arguments is less than 1

21 files changed:
README
TODO
src/Makefile
src/aof.c
src/bio.c [new file with mode: 0644]
src/bio.h [new file with mode: 0644]
src/config.c
src/networking.c
src/redis-benchmark.c
src/redis-cli.c
src/redis.c
src/redis.h
src/sds.c
src/sds.h
src/t_list.c
tests/integration/aof-race.tcl [new file with mode: 0644]
tests/support/test.tcl
tests/test_helper.tcl
tests/unit/maxmemory.tcl [new file with mode: 0644]
tests/unit/protocol.tcl
tests/unit/type/list.tcl

diff --git a/README b/README
index ad232e11870304583ea213a2386af2c6e47f9e9d..088844b795747c4d75e524df3c975545c338317d 100644 (file)
--- a/README
+++ b/README
@@ -23,7 +23,7 @@ You can run a 32 bit Redis binary using:
 
     % make 32bit
 
-After you build Redis is a good idea to test it, using:
+After you build Redis is a good idea to test it (which require Tcl), using:
 
     % make test
 
diff --git a/TODO b/TODO
index d45b35047becca1f9445bfb73692a67e72fdf582..f24cec23b8d3786a560e41564456948c0af5bb03 100644 (file)
--- a/TODO
+++ b/TODO
@@ -17,6 +17,14 @@ API CHANGES
   the variable number of arguments represent values, and there is no conflict
   with the return value of the command.
 
+2.6
+===
+
+* Everything under the "SCRIPTING" section.
+* Float increments (INCRBYFLOAT).
+* Fix BRPOPLPUSH + vararg LPUSH semantics.
+* AOF everysec fsync in background (either the aof-bg branch or something else).
+
 CLUSTER
 =======
 
@@ -34,7 +42,8 @@ SCRIPTING
 * MULTI/EXEC/...: should we do more than simply ignoring it?
 * Prevent Lua from calling itself with redis("eval",...)
 * SCRIPT FLUSH or alike to start a fresh interpreter?
-* http://redis.io/topics/sponsors
+* Check better the replication handling.
+* Prevent execution of writes if random commands are used.
 
 APPEND ONLY FILE
 ================
@@ -52,42 +61,18 @@ OPTIMIZATIONS
 * Redis big lists as linked lists of small ziplists?
   Possibly a simple heuristic that join near nodes when some node gets smaller than the low_level, and split it into two if gets bigger than high_level.
 
-REPORTING
-=========
-
-* Better INFO output with sections.
-
 RANDOM
 ======
 
+* Server should abort when getcwd() fails if there is some kind of persistence configured. Check this in the cron loop.
 * Clients should be closed as far as the output buffer list is bigger than a given number of elements (configurable in redis.conf)
 * Should the redis default configuration, and the default redis.conf, just bind 127.0.0.1?
 
 KNOWN BUGS
 ==========
 
-* What happens in the following scenario:
-    1) We are reading an AOF file.
-    2) SETEX FOO 5 BAR
-    3) APPEND FOO ZAP
-    What happens if between 1 and 2 for some reason (system under huge load
-    or alike) too many time passes? We should prevent expires while the
-    AOF is loading.
 * #519: Slave may have expired keys that were never read in the master (so a DEL
   is not sent in the replication channel) but are already expired since
   a lot of time. Maybe after a given delay that is undoubltly greater than
   the replication link latency we should expire this key on the slave on
   access?
-
-DISKSTORE TODO
-==============
-
-* Fix FLUSHALL/FLUSHDB: the queue of pending reads/writes should be handled.
-* Check that 00/00 and ff/ff exist at startup, otherwise exit with error.
-* Implement sync flush option, where data is written synchronously on disk when a command is executed.
-* Implement MULTI/EXEC as transaction abstract API to diskstore.c, with transaction_start, transaction_end, and a journal to recover.
-* Stop BGSAVE thread on shutdown and any other condition where the child is killed during normal bgsave.
-* Fix RANDOMKEY to really do something interesting
-* Fix DBSIZE to really do something interesting
-* Add a DEBUG command to check if an entry is or not in memory currently
-* dscache.c near 236, kobj = createStringObject... we could use static obj.
index ee4bfc5001986fa6dee246fcb33b63a43a246273..36bba34c426d68838e42d38469709329fef67a29 100644 (file)
@@ -61,7 +61,7 @@ QUIET_CC = @printf '    %b %b\n' $(CCCOLOR)CC$(ENDCOLOR) $(SRCCOLOR)$@$(ENDCOLOR
 QUIET_LINK = @printf '    %b %b\n' $(LINKCOLOR)LINK$(ENDCOLOR) $(BINCOLOR)$@$(ENDCOLOR);
 endif
 
-OBJ = adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endian.o slowlog.o scripting.o
+OBJ = adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endian.o slowlog.o scripting.o bio.o
 BENCHOBJ = ae.o anet.o redis-benchmark.o sds.o adlist.o zmalloc.o
 CLIOBJ = anet.o sds.o adlist.o redis-cli.o zmalloc.o release.o
 CHECKDUMPOBJ = redis-check-dump.o lzf_c.o lzf_d.o
@@ -86,37 +86,35 @@ ae_kqueue.o: ae_kqueue.c
 ae_select.o: ae_select.c
 anet.o: anet.c fmacros.h anet.h
 aof.o: aof.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
-  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h slowlog.h
+  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
+bio.o: bio.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
+  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h bio.h
 cluster.o: cluster.c redis.h fmacros.h config.h ae.h sds.h dict.h \
-  adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h \
-  slowlog.h
+  adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
 config.o: config.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
-  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h slowlog.h
+  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
 crc16.o: crc16.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
-  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h slowlog.h
+  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
 db.o: db.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
-  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h slowlog.h
+  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
 debug.o: debug.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
-  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h slowlog.h \
-  sha1.h
+  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h sha1.h
 dict.o: dict.c fmacros.h dict.h zmalloc.h
 endian.o: endian.c
 intset.o: intset.c intset.h zmalloc.h endian.h
 lzf_c.o: lzf_c.c lzfP.h
 lzf_d.o: lzf_d.c lzfP.h
 multi.o: multi.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
-  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h slowlog.h
+  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
 networking.o: networking.c redis.h fmacros.h config.h ae.h sds.h dict.h \
-  adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h \
-  slowlog.h
+  adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
 object.o: object.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
-  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h slowlog.h
+  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
 pqsort.o: pqsort.c
 pubsub.o: pubsub.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
-  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h slowlog.h
+  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
 rdb.o: rdb.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
-  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h slowlog.h \
-  lzf.h
+  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h lzf.h
 redis-benchmark.o: redis-benchmark.c fmacros.h ae.h \
   ../deps/hiredis/hiredis.h sds.h adlist.h zmalloc.h
 redis-check-aof.o: redis-check-aof.c fmacros.h config.h
@@ -125,32 +123,32 @@ redis-cli.o: redis-cli.c fmacros.h version.h ../deps/hiredis/hiredis.h \
   sds.h zmalloc.h ../deps/linenoise/linenoise.h help.h
 redis.o: redis.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
   zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h slowlog.h \
-  asciilogo.h
+  bio.h asciilogo.h
 release.o: release.c release.h
 replication.o: replication.c redis.h fmacros.h config.h ae.h sds.h dict.h \
+  adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
+scripting.o: scripting.c redis.h fmacros.h config.h ae.h sds.h dict.h \
   adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h \
-  slowlog.h
+  sha1.h
 sds.o: sds.c sds.h zmalloc.h
 sha1.o: sha1.c sha1.h config.h
 slowlog.o: slowlog.c redis.h fmacros.h config.h ae.h sds.h dict.h \
   adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h \
   slowlog.h
 sort.o: sort.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
-  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h slowlog.h \
-  pqsort.h
+  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h pqsort.h
 syncio.o: syncio.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
-  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h slowlog.h
+  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
 t_hash.o: t_hash.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
-  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h slowlog.h
+  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
 t_list.o: t_list.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
-  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h slowlog.h
+  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
 t_set.o: t_set.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
-  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h slowlog.h
+  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
 t_string.o: t_string.c redis.h fmacros.h config.h ae.h sds.h dict.h \
-  adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h \
-  slowlog.h
+  adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
 t_zset.o: t_zset.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
-  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h slowlog.h
+  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
 util.o: util.c fmacros.h util.h
 ziplist.o: ziplist.c zmalloc.h util.h ziplist.h endian.h
 zipmap.o: zipmap.c zmalloc.h endian.h
@@ -169,7 +167,7 @@ dependencies:
 ../deps/jemalloc/lib/libjemalloc.a:
        cd ../deps/jemalloc && ./configure $(JEMALLOC_CFLAGS) --with-jemalloc-prefix=je_ --enable-cc-silence && $(MAKE) lib/libjemalloc.a
 
-redis-server: $(OBJ)
+redis-server: dependencies $(OBJ)
        $(QUIET_LINK)$(CC) -o $(PRGNAME) $(CCOPT) $(DEBUG) $(OBJ) $(CCLINK) $(ALLOC_LINK) ../deps/lua/src/liblua.a
 
 redis-benchmark: dependencies $(BENCHOBJ)
index 517b55fbbb18dbccfe9a2fb7924e03e65f7f16d0..8d65428182d6f32dea7749df6981205c85697640 100644 (file)
--- a/src/aof.c
+++ b/src/aof.c
@@ -1,4 +1,5 @@
 #include "redis.h"
+#include "bio.h"
 
 #include <signal.h>
 #include <fcntl.h>
 
 void aofUpdateCurrentSize(void);
 
+void aof_background_fsync(int fd) {
+    bioCreateBackgroundJob(REDIS_BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL);
+}
+
 /* Called when the user switches from "appendonly yes" to "appendonly no"
  * at runtime using the CONFIG command. */
 void stopAppendOnly(void) {
-    flushAppendOnlyFile();
+    flushAppendOnlyFile(1);
     aof_fsync(server.appendfd);
     close(server.appendfd);
 
@@ -58,63 +63,121 @@ int startAppendOnly(void) {
  * and the only way the client socket can get a write is entering when the
  * the event loop, we accumulate all the AOF writes in a memory
  * buffer and write it on disk using this function just before entering
- * the event loop again. */
-void flushAppendOnlyFile(void) {
-    time_t now;
+ * the event loop again.
+ *
+ * About the 'force' argument:
+ *
+ * When the fsync policy is set to 'everysec' we may delay the flush if there
+ * is still an fsync() going on in the background thread, since for instance
+ * on Linux write(2) will be blocked by the background fsync anyway.
+ * When this happens we remember that there is some aof buffer to be
+ * flushed ASAP, and will try to do that in the serverCron() function.
+ *
+ * However if force is set to 1 we'll write regardless of the background
+ * fsync. */
+void flushAppendOnlyFile(int force) {
     ssize_t nwritten;
+    int sync_in_progress = 0;
 
     if (sdslen(server.aofbuf) == 0) return;
 
+    if (server.appendfsync == APPENDFSYNC_EVERYSEC)
+        sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0;
+
+    if (server.appendfsync == APPENDFSYNC_EVERYSEC && !force) {
+        /* With this append fsync policy we do background fsyncing.
+         * If the fsync is still in progress we can try to delay
+         * the write for a couple of seconds. */
+        if (sync_in_progress) {
+            if (server.aof_flush_postponed_start == 0) {
+                /* No previous write postponinig, remember that we are
+                 * postponing the flush and return. */
+                server.aof_flush_postponed_start = server.unixtime;
+                return;
+            } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
+                /* We were already waiting for fsync to finish, but for less
+                 * than two seconds this is still ok. Postpone again. */
+                return;
+            }
+            /* Otherwise fall trough, and go write since we can't wait
+             * over two seconds. */
+            redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
+        }
+    }
+    /* If you are following this code path, then we are going to write so
+     * set reset the postponed flush sentinel to zero. */
+    server.aof_flush_postponed_start = 0;
+
     /* We want to perform a single write. This should be guaranteed atomic
      * at least if the filesystem we are writing is a real physical one.
      * While this will save us against the server being killed I don't think
      * there is much to do about the whole server stopping for power problems
      * or alike */
-     nwritten = write(server.appendfd,server.aofbuf,sdslen(server.aofbuf));
-     if (nwritten != (signed)sdslen(server.aofbuf)) {
+    nwritten = write(server.appendfd,server.aofbuf,sdslen(server.aofbuf));
+    if (nwritten != (signed)sdslen(server.aofbuf)) {
         /* Ooops, we are in troubles. The best thing to do for now is
          * aborting instead of giving the illusion that everything is
          * working as expected. */
-         if (nwritten == -1) {
+        if (nwritten == -1) {
             redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
-         } else {
+        } else {
             redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
-         }
-         exit(1);
+        }
+        exit(1);
     }
-    sdsfree(server.aofbuf);
-    server.aofbuf = sdsempty();
     server.appendonly_current_size += nwritten;
 
-    /* Don't Fsync if no-appendfsync-on-rewrite is set to yes and we have
-     * childs performing heavy I/O on disk. */
+    /* Re-use AOF buffer when it is small enough. The maximum comes from the
+     * arena size of 4k minus some overhead (but is otherwise arbitrary). */
+    if ((sdslen(server.aofbuf)+sdsavail(server.aofbuf)) < 4000) {
+        sdsclear(server.aofbuf);
+    } else {
+        sdsfree(server.aofbuf);
+        server.aofbuf = sdsempty();
+    }
+
+    /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
+     * children doing I/O in the background. */
     if (server.no_appendfsync_on_rewrite &&
         (server.bgrewritechildpid != -1 || server.bgsavechildpid != -1))
             return;
-    /* Fsync if needed */
-    now = time(NULL);
-    if (server.appendfsync == APPENDFSYNC_ALWAYS ||
-        (server.appendfsync == APPENDFSYNC_EVERYSEC &&
-         now-server.lastfsync > 1))
-    {
+
+    /* Perform the fsync if needed. */
+    if (server.appendfsync == APPENDFSYNC_ALWAYS) {
         /* aof_fsync is defined as fdatasync() for Linux in order to avoid
          * flushing metadata. */
         aof_fsync(server.appendfd); /* Let's try to get this data on the disk */
-        server.lastfsync = now;
+        server.lastfsync = server.unixtime;
+    } else if ((server.appendfsync == APPENDFSYNC_EVERYSEC &&
+                server.unixtime > server.lastfsync)) {
+        if (!sync_in_progress) aof_background_fsync(server.appendfd);
+        server.lastfsync = server.unixtime;
     }
 }
 
-sds catAppendOnlyGenericCommand(sds buf, int argc, robj **argv) {
-    int j;
-    buf = sdscatprintf(buf,"*%d\r\n",argc);
+sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) {
+    char buf[32];
+    int len, j;
+    robj *o;
+
+    buf[0] = '*';
+    len = 1+ll2string(buf+1,sizeof(buf)-1,argc);
+    buf[len++] = '\r';
+    buf[len++] = '\n';
+    dst = sdscatlen(dst,buf,len);
+
     for (j = 0; j < argc; j++) {
-        robj *o = getDecodedObject(argv[j]);
-        buf = sdscatprintf(buf,"$%lu\r\n",(unsigned long)sdslen(o->ptr));
-        buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
-        buf = sdscatlen(buf,"\r\n",2);
+        o = getDecodedObject(argv[j]);
+        buf[0] = '$';
+        len = 1+ll2string(buf+1,sizeof(buf)-1,sdslen(o->ptr));
+        buf[len++] = '\r';
+        buf[len++] = '\n';
+        dst = sdscatlen(dst,buf,len);
+        dst = sdscatlen(dst,o->ptr,sdslen(o->ptr));
+        dst = sdscatlen(dst,"\r\n",2);
         decrRefCount(o);
     }
-    return buf;
+    return dst;
 }
 
 sds catAppendOnlyExpireAtCommand(sds buf, robj *key, robj *seconds) {
@@ -653,56 +716,127 @@ void aofUpdateCurrentSize(void) {
  * Handle this. */
 void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
     if (!bysignal && exitcode == 0) {
-        int fd;
+        int newfd, oldfd;
+        int nwritten;
         char tmpfile[256];
+        long long now = ustime();
 
         redisLog(REDIS_NOTICE,
-            "Background append only file rewriting terminated with success");
-        /* Now it's time to flush the differences accumulated by the parent */
-        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) server.bgrewritechildpid);
-        fd = open(tmpfile,O_WRONLY|O_APPEND);
-        if (fd == -1) {
-            redisLog(REDIS_WARNING, "Not able to open the temp append only file produced by the child: %s", strerror(errno));
+            "Background AOF rewrite terminated with success");
+
+        /* Flush the differences accumulated by the parent to the
+         * rewritten AOF. */
+        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof",
+            (int)server.bgrewritechildpid);
+        newfd = open(tmpfile,O_WRONLY|O_APPEND);
+        if (newfd == -1) {
+            redisLog(REDIS_WARNING,
+                "Unable to open the temporary AOF produced by the child: %s", strerror(errno));
             goto cleanup;
         }
-        /* Flush our data... */
-        if (write(fd,server.bgrewritebuf,sdslen(server.bgrewritebuf)) !=
-                (signed) sdslen(server.bgrewritebuf)) {
-            redisLog(REDIS_WARNING, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno));
-            close(fd);
+
+        nwritten = write(newfd,server.bgrewritebuf,sdslen(server.bgrewritebuf));
+        if (nwritten != (signed)sdslen(server.bgrewritebuf)) {
+            if (nwritten == -1) {
+                redisLog(REDIS_WARNING,
+                    "Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
+            } else {
+                redisLog(REDIS_WARNING,
+                    "Short write trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
+            }
+            close(newfd);
             goto cleanup;
         }
-        redisLog(REDIS_NOTICE,"Parent diff flushed into the new append log file with success (%lu bytes)",sdslen(server.bgrewritebuf));
-        /* Now our work is to rename the temp file into the stable file. And
-         * switch the file descriptor used by the server for append only. */
+
+        redisLog(REDIS_NOTICE,
+            "Parent diff successfully flushed to the rewritten AOF (%lu bytes)", nwritten);
+
+        /* The only remaining thing to do is to rename the temporary file to
+         * the configured file and switch the file descriptor used to do AOF
+         * writes. We don't want close(2) or rename(2) calls to block the
+         * server on old file deletion.
+         *
+         * There are two possible scenarios:
+         *
+         * 1) AOF is DISABLED and this was a one time rewrite. The temporary
+         * file will be renamed to the configured file. When this file already
+         * exists, it will be unlinked, which may block the server.
+         *
+         * 2) AOF is ENABLED and the rewritten AOF will immediately start
+         * receiving writes. After the temporary file is renamed to the
+         * configured file, the original AOF file descriptor will be closed.
+         * Since this will be the last reference to that file, closing it
+         * causes the underlying file to be unlinked, which may block the
+         * server.
+         *
+         * To mitigate the blocking effect of the unlink operation (either
+         * caused by rename(2) in scenario 1, or by close(2) in scenario 2), we
+         * use a background thread to take care of this. First, we
+         * make scenario 1 identical to scenario 2 by opening the target file
+         * when it exists. The unlink operation after the rename(2) will then
+         * be executed upon calling close(2) for its descriptor. Everything to
+         * guarantee atomicity for this switch has already happened by then, so
+         * we don't care what the outcome or duration of that close operation
+         * is, as long as the file descriptor is released again. */
+        if (server.appendfd == -1) {
+            /* AOF disabled */
+
+             /* Don't care if this fails: oldfd will be -1 and we handle that.
+              * One notable case of -1 return is if the old file does
+              * not exist. */
+             oldfd = open(server.appendfilename,O_RDONLY|O_NONBLOCK);
+        } else {
+            /* AOF enabled */
+            oldfd = -1; /* We'll set this to the current AOF filedes later. */
+        }
+
+        /* Rename the temporary file. This will not unlink the target file if
+         * it exists, because we reference it with "oldfd". */
         if (rename(tmpfile,server.appendfilename) == -1) {
-            redisLog(REDIS_WARNING,"Can't rename the temp append only file into the stable one: %s", strerror(errno));
-            close(fd);
+            redisLog(REDIS_WARNING,
+                "Error trying to rename the temporary AOF: %s", strerror(errno));
+            close(newfd);
+            if (oldfd != -1) close(oldfd);
             goto cleanup;
         }
-        /* Mission completed... almost */
-        redisLog(REDIS_NOTICE,"Append only file successfully rewritten.");
-        if (server.appendfd != -1) {
-            /* If append only is actually enabled... */
-            close(server.appendfd);
-            server.appendfd = fd;
-            if (server.appendfsync != APPENDFSYNC_NO) aof_fsync(fd);
-            server.appendseldb = -1; /* Make sure it will issue SELECT */
-            redisLog(REDIS_NOTICE,"The new append only file was selected for future appends.");
+
+        if (server.appendfd == -1) {
+            /* AOF disabled, we don't need to set the AOF file descriptor
+             * to this new file, so we can close it. */
+            close(newfd);
+        } else {
+            /* AOF enabled, replace the old fd with the new one. */
+            oldfd = server.appendfd;
+            server.appendfd = newfd;
+            if (server.appendfsync == APPENDFSYNC_ALWAYS)
+                aof_fsync(newfd);
+            else if (server.appendfsync == APPENDFSYNC_EVERYSEC)
+                aof_background_fsync(newfd);
+            server.appendseldb = -1; /* Make sure SELECT is re-issued */
             aofUpdateCurrentSize();
             server.auto_aofrewrite_base_size = server.appendonly_current_size;
-        } else {
-            /* If append only is disabled we just generate a dump in this
-             * format. Why not? */
-            close(fd);
+
+            /* Clear regular AOF buffer since its contents was just written to
+             * the new AOF from the background rewrite buffer. */
+            sdsfree(server.aofbuf);
+            server.aofbuf = sdsempty();
         }
+
+        redisLog(REDIS_NOTICE, "Background AOF rewrite successful");
+
+        /* Asynchronously close the overwritten AOF. */
+        if (oldfd != -1) bioCreateBackgroundJob(REDIS_BIO_CLOSE_FILE,(void*)(long)oldfd,NULL,NULL);
+
+        redisLog(REDIS_VERBOSE,
+            "Background AOF rewrite signal handler took %lldus", ustime()-now);
     } else if (!bysignal && exitcode != 0) {
-        redisLog(REDIS_WARNING, "Background append only file rewriting error");
+        redisLog(REDIS_WARNING,
+            "Background AOF rewrite terminated with error");
     } else {
         redisLog(REDIS_WARNING,
-            "Background append only file rewriting terminated by signal %d",
-            bysignal);
+            "Background AOF rewrite terminated by signal %d", bysignal);
     }
+
 cleanup:
     sdsfree(server.bgrewritebuf);
     server.bgrewritebuf = sdsempty();
diff --git a/src/bio.c b/src/bio.c
new file mode 100644 (file)
index 0000000..eaac8e4
--- /dev/null
+++ b/src/bio.c
@@ -0,0 +1,208 @@
+/* Background I/O service for Redis.
+ *
+ * This file implements operations that we need to perform in the background.
+ * Currently there is only a single operation, that is a background close(2)
+ * system call. This is needed as when the process is the last owner of a
+ * reference to a file closing it means unlinking it, and the deletion of the
+ * file is slow, blocking the server.
+ *
+ * In the future we'll either continue implementing new things we need or
+ * we'll switch to libeio. However there are probably long term uses for this
+ * file as we may want to put here Redis specific background tasks (for instance
+ * it is not impossible that we'll need a non blocking FLUSHDB/FLUSHALL
+ * implementation).
+ *
+ * DESIGN
+ * ------
+ *
+ * The design is trivial, we have a structure representing a job to perform
+ * and a different thread and job queue for every job type.
+ * Every thread wait for new jobs in its queue, and process every job
+ * sequentially.
+ *
+ * Jobs of the same type are guaranteed to be processed from the least
+ * recently inserted to the most recently inserted (older jobs processed
+ * first).
+ *
+ * Currently there is no way for the creator of the job to be notified about
+ * the completion of the operation, this will only be added when/if needed.
+ */
+
+#include "redis.h"
+#include "bio.h"
+
+static pthread_mutex_t bio_mutex[REDIS_BIO_NUM_OPS];
+static pthread_cond_t bio_condvar[REDIS_BIO_NUM_OPS];
+static list *bio_jobs[REDIS_BIO_NUM_OPS];
+/* The following array is used to hold the number of pending jobs for every
+ * OP type. This allows us to export the bioPendingJobsOfType() API that is
+ * useful when the main thread wants to perform some operation that may involve
+ * objects shared with the background thread. The main thread will just wait
+ * that there are no longer jobs of this type to be executed before performing
+ * the sensible operation. This data is also useful for reporting. */
+static unsigned long long bio_pending[REDIS_BIO_NUM_OPS];
+
+/* This structure represents a background Job. It is only used locally to this
+ * file as the API deos not expose the internals at all. */
+struct bio_job {
+    time_t time; /* Time at which the job was created. */
+    /* Job specific arguments pointers. If we need to pass more than three
+     * arguments we can just pass a pointer to a structure or alike. */
+    void *arg1, *arg2, *arg3;
+};
+
+void *bioProcessBackgroundJobs(void *arg);
+
+/* Make sure we have enough stack to perform all the things we do in the
+ * main thread. */
+#define REDIS_THREAD_STACK_SIZE (1024*1024*4)
+
+/* Initialize the background system, spawning the thread. */
+void bioInit(void) {
+    pthread_attr_t attr;
+    pthread_t thread;
+    size_t stacksize;
+    int j;
+
+    /* Initialization of state vars and objects */
+    for (j = 0; j < REDIS_BIO_NUM_OPS; j++) {
+        pthread_mutex_init(&bio_mutex[j],NULL);
+        pthread_cond_init(&bio_condvar[j],NULL);
+        bio_jobs[j] = listCreate();
+        bio_pending[j] = 0;
+    }
+
+    /* Set the stack size as by default it may be small in some system */
+    pthread_attr_init(&attr);
+    pthread_attr_getstacksize(&attr,&stacksize);
+    if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */
+    while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
+    pthread_attr_setstacksize(&attr, stacksize);
+
+    /* Ready to spawn our threads. We use the single argument the thread
+     * function accepts in order to pass the job ID the thread is
+     * responsible of. */
+    for (j = 0; j < REDIS_BIO_NUM_OPS; j++) {
+        void *arg = (void*)(unsigned long) j;
+        if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
+            redisLog(REDIS_WARNING,"Fatal: Can't initialize Background Jobs.");
+            exit(1);
+        }
+    }
+}
+
+void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
+    struct bio_job *job = zmalloc(sizeof(*job));
+
+    job->time = time(NULL);
+    job->arg1 = arg1;
+    job->arg2 = arg2;
+    job->arg3 = arg3;
+    pthread_mutex_lock(&bio_mutex[type]);
+    listAddNodeTail(bio_jobs[type],job);
+    bio_pending[type]++;
+    pthread_cond_signal(&bio_condvar[type]);
+    pthread_mutex_unlock(&bio_mutex[type]);
+}
+
+void *bioProcessBackgroundJobs(void *arg) {
+    struct bio_job *job;
+    unsigned long type = (unsigned long) arg;
+
+    pthread_detach(pthread_self());
+    pthread_mutex_lock(&bio_mutex[type]);
+    while(1) {
+        listNode *ln;
+
+        /* The loop always starts with the lock hold. */
+        if (listLength(bio_jobs[type]) == 0) {
+            pthread_cond_wait(&bio_condvar[type],&bio_mutex[type]);
+            continue;
+        }
+        /* Pop the job from the queue. */
+        ln = listFirst(bio_jobs[type]);
+        job = ln->value;
+        /* It is now possible to unlock the background system as we know have
+         * a stand alone job structure to process.*/
+        pthread_mutex_unlock(&bio_mutex[type]);
+
+        /* Process the job accordingly to its type. */
+        if (type == REDIS_BIO_CLOSE_FILE) {
+            close((long)job->arg1);
+        } else if (type == REDIS_BIO_AOF_FSYNC) {
+            aof_fsync((long)job->arg1);
+        } else {
+            redisPanic("Wrong job type in bioProcessBackgroundJobs().");
+        }
+        zfree(job);
+
+        /* Lock again before reiterating the loop, if there are no longer
+         * jobs to process we'll block again in pthread_cond_wait(). */
+        pthread_mutex_lock(&bio_mutex[type]);
+        listDelNode(bio_jobs[type],ln);
+        bio_pending[type]--;
+    }
+}
+
+/* Return the number of pending jobs of the specified type. */
+unsigned long long bioPendingJobsOfType(int type) {
+    unsigned long long val;
+    pthread_mutex_lock(&bio_mutex[type]);
+    val = bio_pending[type];
+    pthread_mutex_unlock(&bio_mutex[type]);
+    return val;
+}
+
+#if 0 /* We don't use the following code for now, and bioWaitPendingJobsLE
+         probably needs a rewrite using conditional variables instead of the
+         current implementation. */
+         
+
+/* Wait until the number of pending jobs of the specified type are
+ * less or equal to the specified number.
+ *
+ * This function may block for long time, it should only be used to perform
+ * the following tasks:
+ *
+ * 1) To avoid that the main thread is pushing jobs of a given time so fast
+ *    that the background thread can't process them at the same speed.
+ *    So before creating a new job of a given type the main thread should
+ *    call something like: bioWaitPendingJobsLE(job_type,10000);
+ * 2) In order to perform special operations that make it necessary to be sure
+ *    no one is touching shared resourced in the background.
+ */
+void bioWaitPendingJobsLE(int type, unsigned long long num) {
+    unsigned long long iteration = 0;
+
+    /* We poll the jobs queue aggressively to start, and gradually relax
+     * the polling speed if it is going to take too much time. */
+    while(1) {
+        iteration++;
+        if (iteration > 1000 && iteration <= 10000) {
+            usleep(100);
+        } else if (iteration > 10000) {
+            usleep(1000);
+        }
+        if (bioPendingJobsOfType(type) <= num) break;
+    }
+}
+
+/* Return the older job of the specified type. */
+time_t bioOlderJobOfType(int type) {
+    time_t time;
+    listNode *ln;
+    struct bio_job *job;
+
+    pthread_mutex_lock(&bio_mutex[type]);
+    ln = listFirst(bio_jobs[type]);
+    if (ln == NULL) {
+        pthread_mutex_unlock(&bio_mutex[type]);
+        return 0;
+    }
+    job = ln->value;
+    time = job->time;
+    pthread_mutex_unlock(&bio_mutex[type]);
+    return time;
+}
+
+#endif
diff --git a/src/bio.h b/src/bio.h
new file mode 100644 (file)
index 0000000..22a9b33
--- /dev/null
+++ b/src/bio.h
@@ -0,0 +1,11 @@
+/* Exported API */
+void bioInit(void);
+void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3);
+unsigned long long bioPendingJobsOfType(int type);
+void bioWaitPendingJobsLE(int type, unsigned long long num);
+time_t bioOlderJobOfType(int type);
+
+/* Background job opcodes */
+#define REDIS_BIO_CLOSE_FILE    0 /* Deferred close(2) syscall. */
+#define REDIS_BIO_AOF_FSYNC     1 /* Deferred AOF fsync. */
+#define REDIS_BIO_NUM_OPS       2
index 6f9657dde8ffde20b0b794f96dc81a1fdc5a171b..d470dab1a1d80614be08ab53557a26c1c7b91eb8 100644 (file)
@@ -508,12 +508,11 @@ void configGetCommand(redisClient *c) {
     if (stringmatch(pattern,"dir",0)) {
         char buf[1024];
 
-        addReplyBulkCString(c,"dir");
-        if (getcwd(buf,sizeof(buf)) == NULL) {
+        if (getcwd(buf,sizeof(buf)) == NULL)
             buf[0] = '\0';
-        } else {
-            addReplyBulkCString(c,buf);
-        }
+
+        addReplyBulkCString(c,"dir");
+        addReplyBulkCString(c,buf);
         matches++;
     }
     if (stringmatch(pattern,"dbfilename",0)) {
index 629267d1cad2d025fe958407569c1a88043a810c..3979ab6225c4ca24ca64f557db46e03a0edc6f9f 100644 (file)
@@ -610,7 +610,7 @@ void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
         }
     }
     if (totwritten > 0) c->lastinteraction = time(NULL);
-    if (listLength(c->reply) == 0) {
+    if (c->bufpos == 0 && listLength(c->reply) == 0) {
         c->sentlen = 0;
         aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
 
@@ -793,6 +793,9 @@ int processMultibulkBuffer(redisClient *c) {
 void processInputBuffer(redisClient *c) {
     /* Keep processing while there is something in the input buffer */
     while(sdslen(c->querybuf)) {
+        /* Immediately abort if the client is in the middle of something. */
+        if (c->flags & REDIS_BLOCKED) return;
+
         /* REDIS_CLOSE_AFTER_REPLY closes the connection once the reply is
          * written to the client. Make sure to not let the reply grow after
          * this flag has been set (i.e. don't process more commands). */
index 7295dc32a6d31cf4ef6fa03183ab163ce5e2a409..e4a40e13ac90acb31eda899df1ec63c5d48b596f 100644 (file)
@@ -53,9 +53,10 @@ static struct config {
     int hostport;
     const char *hostsocket;
     int numclients;
-    int requests;
     int liveclients;
-    int donerequests;
+    int requests;
+    int requests_issued;
+    int requests_finished;
     int keysize;
     int datasize;
     int randomkeys;
@@ -148,7 +149,7 @@ static void randomizeClientKey(client c) {
 }
 
 static void clientDone(client c) {
-    if (config.donerequests == config.requests) {
+    if (config.requests_finished == config.requests) {
         freeClient(c);
         aeStop(config.el);
         return;
@@ -189,8 +190,8 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
                 exit(1);
             }
 
-            if (config.donerequests < config.requests)
-                config.latency[config.donerequests++] = c->latency;
+            if (config.requests_finished < config.requests)
+                config.latency[config.requests_finished++] = c->latency;
             clientDone(c);
         }
     }
@@ -202,8 +203,15 @@ static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
     REDIS_NOTUSED(fd);
     REDIS_NOTUSED(mask);
 
-    /* When nothing was written yet, randomize keys and set start time. */
+    /* Initialize request when nothing was written. */
     if (c->written == 0) {
+        /* Enforce upper bound to number of requests. */
+        if (config.requests_issued++ >= config.requests) {
+            freeClient(c);
+            return;
+        }
+
+        /* Really initialize: randomize keys and set start time. */
         if (config.randomkeys) randomizeClientKey(c);
         c->start = ustime();
         c->latency = -1;
@@ -286,10 +294,10 @@ static void showLatencyReport(void) {
     int i, curlat = 0;
     float perc, reqpersec;
 
-    reqpersec = (float)config.donerequests/((float)config.totlatency/1000);
+    reqpersec = (float)config.requests_finished/((float)config.totlatency/1000);
     if (!config.quiet) {
         printf("====== %s ======\n", config.title);
-        printf("  %d requests completed in %.2f seconds\n", config.donerequests,
+        printf("  %d requests completed in %.2f seconds\n", config.requests_finished,
             (float)config.totlatency/1000);
         printf("  %d parallel clients\n", config.numclients);
         printf("  %d bytes payload\n", config.datasize);
@@ -314,7 +322,8 @@ static void benchmark(const char *title, const char *cmd, int len) {
     client c;
 
     config.title = title;
-    config.donerequests = 0;
+    config.requests_issued = 0;
+    config.requests_finished = 0;
 
     c = createClient(cmd,len);
     createMissingClients(c);
@@ -416,7 +425,7 @@ int showThroughput(struct aeEventLoop *eventLoop, long long id, void *clientData
     REDIS_NOTUSED(clientData);
 
     float dt = (float)(mstime()-config.start)/1000.0;
-    float rps = (float)config.donerequests/dt;
+    float rps = (float)config.requests_finished/dt;
     printf("%s: %.2f\r", config.title, rps);
     fflush(stdout);
     return 250; /* every 250ms */
@@ -438,7 +447,6 @@ int main(int argc, const char **argv) {
     config.el = aeCreateEventLoop();
     aeCreateTimeEvent(config.el,1,showThroughput,NULL,NULL);
     config.keepalive = 1;
-    config.donerequests = 0;
     config.datasize = 3;
     config.randomkeys = 0;
     config.randomkeys_keyspacelen = 0;
index d0c9d979f014a769e88e78e311bf484f651e8f08..328cd3df207e1561e2e5f9bc3e517b75c94b9534 100644 (file)
@@ -61,6 +61,7 @@ static struct config {
     int shutdown;
     int monitor_mode;
     int pubsub_mode;
+    int latency_mode;
     int stdinarg; /* get last arg from stdin. (-x option) */
     char *auth;
     int raw_output; /* output mode per command */
@@ -567,6 +568,8 @@ static int parseOptions(int argc, char **argv) {
             i++;
         } else if (!strcmp(argv[i],"--raw")) {
             config.raw_output = 1;
+        } else if (!strcmp(argv[i],"--latency")) {
+            config.latency_mode = 1;
         } else if (!strcmp(argv[i],"-d") && !lastarg) {
             sdsfree(config.mb_delim);
             config.mb_delim = sdsnew(argv[i+1]);
@@ -617,6 +620,7 @@ static void usage() {
 "  -x               Read last argument from STDIN\n"
 "  -d <delimiter>   Multi-bulk delimiter in for raw formatting (default: \\n)\n"
 "  --raw            Use raw formatting for replies (default when STDOUT is not a tty)\n"
+"  --latency        Enter a special mode continuously sampling latency.\n"
 "  --help           Output this help and exit\n"
 "  --version        Output version and exit\n"
 "\n"
@@ -739,6 +743,38 @@ static int noninteractive(int argc, char **argv) {
     return retval;
 }
 
+static void latencyMode(void) {
+    redisReply *reply;
+    long long start, latency, min, max, tot, count = 0;
+    double avg;
+
+    if (!context) exit(1);
+    while(1) {
+        start = mstime();
+        reply = redisCommand(context,"PING");
+        if (reply == NULL) {
+            fprintf(stderr,"\nI/O error\n");
+            exit(1);
+        }
+        latency = mstime()-start;
+        freeReplyObject(reply);
+        count++;
+        if (count == 1) {
+            min = max = tot = latency;
+            avg = (double) latency;
+        } else {
+            if (latency < min) min = latency;
+            if (latency > max) max = latency;
+            tot += latency;
+            avg = (double) tot/count;
+        }
+        printf("\x1b[0G\x1b[2Kmin: %lld, max: %lld, avg: %.2f (%lld samples)",
+            min, max, avg, count);
+        fflush(stdout);
+        usleep(10000);
+    }
+}
+
 int main(int argc, char **argv) {
     int firstarg;
 
@@ -752,6 +788,7 @@ int main(int argc, char **argv) {
     config.shutdown = 0;
     config.monitor_mode = 0;
     config.pubsub_mode = 0;
+    config.latency_mode = 0;
     config.stdinarg = 0;
     config.auth = NULL;
     config.raw_output = !isatty(fileno(stdout)) && (getenv("FAKETTY") == NULL);
@@ -762,6 +799,12 @@ int main(int argc, char **argv) {
     argc -= firstarg;
     argv += firstarg;
 
+    /* Start in latency mode if appropriate */
+    if (config.latency_mode) {
+        cliConnect(0);
+        latencyMode();
+    }
+
     /* Start interactive mode when no command is provided */
     if (argc == 0) {
         /* Note that in repl mode we don't abort on connection error.
index 0bedc25b38c8c6250f882e0fa07e08b9986548a7..2018d13e49158c31206e6681ec58fb88e0038a2c 100644 (file)
@@ -29,6 +29,7 @@
 
 #include "redis.h"
 #include "slowlog.h"
+#include "bio.h"
 
 #ifdef HAVE_BACKTRACE
 #include <execinfo.h>
@@ -575,6 +576,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
      * in objects at every object access, and accuracy is not needed.
      * To access a global var is faster than calling time(NULL) */
     server.unixtime = time(NULL);
+
     /* We have just 22 bits per object for LRU information.
      * So we use an (eventually wrapping) LRU clock with 10 seconds resolution.
      * 2^22 bits with 10 seconds resoluton is more or less 1.5 years.
@@ -685,7 +687,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
              server.auto_aofrewrite_perc &&
              server.appendonly_current_size > server.auto_aofrewrite_min_size)
          {
-            int base = server.auto_aofrewrite_base_size ?
+            long long base = server.auto_aofrewrite_base_size ?
                             server.auto_aofrewrite_base_size : 1;
             long long growth = (server.appendonly_current_size*100/base) - 100;
             if (growth >= server.auto_aofrewrite_perc) {
@@ -695,6 +697,11 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
          }
     }
 
+
+    /* If we postponed an AOF buffer flush, let's try to do it every time the
+     * cron function is called. */
+    if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);
+
     /* Expire a few keys per cycle, only if this is a master.
      * On slaves we wait for DEL operations synthesized by the master
      * in order to guarantee a strict consistency. */
@@ -733,7 +740,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
     }
 
     /* Write the AOF buffer on disk */
-    flushAppendOnlyFile();
+    flushAppendOnlyFile(0);
 }
 
 /* =========================== Server initialization ======================== */
@@ -820,6 +827,7 @@ void initServerConfig() {
     server.lastfsync = time(NULL);
     server.appendfd = -1;
     server.appendseldb = -1; /* Make sure the first time will not match */
+    server.aof_flush_postponed_start = 0;
     server.pidfile = zstrdup("/var/run/redis.pid");
     server.dbfilename = zstrdup("dump.rdb");
     server.appendfilename = zstrdup("appendonly.aof");
@@ -903,7 +911,8 @@ void initServer() {
     if (server.port != 0) {
         server.ipfd = anetTcpServer(server.neterr,server.port,server.bindaddr);
         if (server.ipfd == ANET_ERR) {
-            redisLog(REDIS_WARNING, "Opening port: %s", server.neterr);
+            redisLog(REDIS_WARNING, "Opening port %d: %s",
+                server.port, server.neterr);
             exit(1);
         }
     }
@@ -965,6 +974,7 @@ void initServer() {
     if (server.cluster_enabled) clusterInit();
     scriptingInit();
     slowlogInit();
+    bioInit();
     srand(time(NULL)^getpid());
 }
 
@@ -1022,9 +1032,9 @@ void call(redisClient *c) {
     slowlogPushEntryIfNeeded(c->argv,c->argc,duration);
     c->cmd->calls++;
 
-    if (server.appendonly && dirty)
+    if (server.appendonly && dirty > 0)
         feedAppendOnlyFile(c->cmd,c->db->id,c->argv,c->argc);
-    if ((dirty || c->cmd->flags & REDIS_CMD_FORCE_REPLICATION) &&
+    if ((dirty > 0 || c->cmd->flags & REDIS_CMD_FORCE_REPLICATION) &&
         listLength(server.slaves))
         replicationFeedSlaves(server.slaves,c->db->id,c->argv,c->argc);
     if (listLength(server.monitors))
@@ -1726,6 +1736,7 @@ void redisAsciiArt(void) {
 int main(int argc, char **argv) {
     long long start;
 
+    zmalloc_enable_thread_safeness();
     initServerConfig();
     if (argc == 2) {
         if (strcmp(argv[1], "-v") == 0 ||
index 1a45cc8cd52260d525a8388fa52f36adcf06eb07..e754918deabe296cb89eeef3989db94e50afee52 100644 (file)
@@ -559,6 +559,7 @@ struct redisServer {
     time_t lastfsync;
     int appendfd;
     int appendseldb;
+    time_t aof_flush_postponed_start;
     char *pidfile;
     pid_t bgsavechildpid;
     pid_t bgrewritechildpid;
@@ -612,30 +613,6 @@ struct redisServer {
     size_t zset_max_ziplist_entries;
     size_t zset_max_ziplist_value;
     time_t unixtime;    /* Unix time sampled every second. */
-    /* Virtual memory I/O threads stuff */
-    /* An I/O thread process an element taken from the io_jobs queue and
-     * put the result of the operation in the io_done list. While the
-     * job is being processed, it's put on io_processing queue. */
-    list *io_newjobs; /* List of VM I/O jobs yet to be processed */
-    list *io_processing; /* List of VM I/O jobs being processed */
-    list *io_processed; /* List of VM I/O jobs already processed */
-    list *io_ready_clients; /* Clients ready to be unblocked. All keys loaded */
-    pthread_mutex_t io_mutex; /* lock to access io_jobs/io_done/io_thread_job */
-    pthread_cond_t io_condvar; /* I/O threads conditional variable */
-    pthread_attr_t io_threads_attr; /* attributes for threads creation */
-    int io_active_threads; /* Number of running I/O threads */
-    int vm_max_threads; /* Max number of I/O threads running at the same time */
-    /* Our main thread is blocked on the event loop, locking for sockets ready
-     * to be read or written, so when a threaded I/O operation is ready to be
-     * processed by the main thread, the I/O thread will use a unix pipe to
-     * awake the main thread. The followings are the two pipe FDs. */
-    int io_ready_pipe_read;
-    int io_ready_pipe_write;
-    /* Virtual memory stats */
-    unsigned long long vm_stats_used_pages;
-    unsigned long long vm_stats_swapped_objects;
-    unsigned long long vm_stats_swapouts;
-    unsigned long long vm_stats_swapins;
     /* Pubsub */
     dict *pubsub_channels; /* Map channels to list of subscribed clients */
     list *pubsub_patterns; /* A list of pubsub_patterns */
@@ -894,7 +871,7 @@ int rdbSaveType(FILE *fp, unsigned char type);
 int rdbSaveLen(FILE *fp, uint32_t len);
 
 /* AOF persistence */
-void flushAppendOnlyFile(void);
+void flushAppendOnlyFile(int force);
 void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
 void aofRemoveTempFile(pid_t childpid);
 int rewriteAppendOnlyFileBackground(void);
index 2ec7c3cb76b3a2d6f5afdb3be8bd6aa793329a1c..77052966ceb79e72355df042f400648d35131829 100644 (file)
--- a/src/sds.c
+++ b/src/sds.c
@@ -94,6 +94,13 @@ void sdsupdatelen(sds s) {
     sh->len = reallen;
 }
 
+void sdsclear(sds s) {
+    struct sdshdr *sh = (void*) (s-(sizeof(struct sdshdr)));
+    sh->free += sh->len;
+    sh->len = 0;
+    sh->buf[0] = '\0';
+}
+
 static sds sdsMakeRoomFor(sds s, size_t addlen) {
     struct sdshdr *sh, *newsh;
     size_t free = sdsavail(s);
index af5c4910bdae766730c776691fdd2019d18d920c..6e5684eeb913e370f2c0907ac15262622d19873e 100644 (file)
--- a/src/sds.h
+++ b/src/sds.h
@@ -76,6 +76,7 @@ sds sdscatprintf(sds s, const char *fmt, ...);
 sds sdstrim(sds s, const char *cset);
 sds sdsrange(sds s, int start, int end);
 void sdsupdatelen(sds s);
+void sdsclear(sds s);
 int sdscmp(sds s1, sds s2);
 sds *sdssplitlen(char *s, int len, char *sep, int seplen, int *count);
 void sdsfreesplitres(sds *tokens, int count);
index 5427293f962a7d821933da970772209929a9e6f3..71436198d7859ccff9e8c27f3ccf4376232dea98 100644 (file)
@@ -519,7 +519,12 @@ void lrangeCommand(redisClient *c) {
             p = ziplistNext(o->ptr,p);
         }
     } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
-        listNode *ln = listIndex(o->ptr,start);
+        listNode *ln;
+
+        /* If we are nearest to the end of the list, reach the element
+         * starting from tail and going backward, as it is faster. */
+        if (start > llen/2) start -= llen;
+        ln = listIndex(o->ptr,start);
 
         while(rangelen--) {
             addReplyBulk(c,ln->value);
@@ -643,7 +648,7 @@ void lremCommand(redisClient *c) {
 void rpoplpushHandlePush(redisClient *origclient, redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
     robj *aux;
 
-    if (!handleClientsWaitingListPush(c,dstkey,value)) {
+    if (!handleClientsWaitingListPush(origclient,dstkey,value)) {
         /* Create the list if the key does not exist */
         if (!dstobj) {
             dstobj = createZiplistObject();
@@ -653,10 +658,12 @@ void rpoplpushHandlePush(redisClient *origclient, redisClient *c, robj *dstkey,
         }
         listTypePush(dstobj,value,REDIS_HEAD);
         /* If we are pushing as a result of LPUSH against a key
-         * watched by BLPOPLPUSH, we need to rewrite the command vector.
-         * But if this is called directly by RPOPLPUSH (either directly
+         * watched by BRPOPLPUSH, we need to rewrite the command vector
+         * as an LPUSH.
+         *
+         * If this is called directly by RPOPLPUSH (either directly
          * or via a BRPOPLPUSH where the popped list exists)
-         * we should replicate the BRPOPLPUSH command itself. */
+         * we should replicate the RPOPLPUSH command itself. */
         if (c != origclient) {
             aux = createStringObject("LPUSH",5);
             rewriteClientCommandVector(origclient,3,aux,dstkey,value);
diff --git a/tests/integration/aof-race.tcl b/tests/integration/aof-race.tcl
new file mode 100644 (file)
index 0000000..207f207
--- /dev/null
@@ -0,0 +1,35 @@
+set defaults { appendonly {yes} appendfilename {appendonly.aof} }
+set server_path [tmpdir server.aof]
+set aof_path "$server_path/appendonly.aof"
+
+proc start_server_aof {overrides code} {
+    upvar defaults defaults srv srv server_path server_path
+    set config [concat $defaults $overrides]
+    start_server [list overrides $config] $code
+}
+
+tags {"aof"} {
+    # Specific test for a regression where internal buffers were not properly
+    # cleaned after a child responsible for an AOF rewrite exited. This buffer
+    # was subsequently appended to the new AOF, resulting in duplicate commands.
+    start_server_aof [list dir $server_path] {
+        set client [redis [srv host] [srv port]]
+        set bench [open "|src/redis-benchmark -q -p [srv port] -c 20 -n 20000 incr foo" "r+"]
+        after 100
+
+        # Benchmark should be running by now: start background rewrite
+        $client bgrewriteaof
+
+        # Read until benchmark pipe reaches EOF
+        while {[string length [read $bench]] > 0} {}
+
+        # Check contents of foo
+        assert_equal 20000 [$client get foo]
+    }
+
+    # Restart server to replay AOF
+    start_server_aof [list dir $server_path] {
+        set client [redis [srv host] [srv port]]
+        assert_equal 20000 [$client get foo]
+    }
+}
index 4e68905a5a245304ace9020c140c4d18ae89dd84..c875cfd8030755c752f9aa72a9aa034419393512 100644 (file)
@@ -5,7 +5,7 @@ set ::tests_failed {}
 
 proc assert {condition} {
     if {![uplevel 1 expr $condition]} {
-        error "assertion:Expected '$value' to be true"
+        error "assertion:Expected condition '$condition' to be true"
     }
 }
 
index 559d026471073977eba18815d190eb1e9569b3ab..4f3cf01ec27eca0c5fb47751e870bc08273056e7 100644 (file)
@@ -32,6 +32,7 @@ set ::all_tests {
     unit/pubsub
     unit/slowlog
     unit/scripting
+    unit/maxmemory
 }
 # Index to the next test to run in the ::all_tests list.
 set ::next_test 0
diff --git a/tests/unit/maxmemory.tcl b/tests/unit/maxmemory.tcl
new file mode 100644 (file)
index 0000000..2cde1d8
--- /dev/null
@@ -0,0 +1,120 @@
+start_server {tags {"maxmemory"}} {
+    foreach policy {
+        allkeys-random allkeys-lru volatile-lru volatile-random volatile-ttl
+    } {
+        test "maxmemory - is the memory limit honoured? (policy $policy)" {
+            # make sure to start with a blank instance
+            r flushall 
+            # Get the current memory limit and calculate a new limit.
+            # We just add 100k to the current memory size so that it is
+            # fast for us to reach that limit.
+            set used [s used_memory]
+            set limit [expr {$used+100*1024}]
+            r config set maxmemory $limit
+            r config set maxmemory-policy $policy
+            # Now add keys until the limit is almost reached.
+            set numkeys 0
+            while 1 {
+                r setex [randomKey] 10000 x
+                incr numkeys
+                if {[s used_memory]+4096 > $limit} {
+                    assert {$numkeys > 10}
+                    break
+                }
+            }
+            # If we add the same number of keys already added again, we
+            # should still be under the limit.
+            for {set j 0} {$j < $numkeys} {incr j} {
+                r setex [randomKey] 10000 x
+            }
+            assert {[s used_memory] < ($limit+4096)}
+        }
+    }
+
+    foreach policy {
+        allkeys-random allkeys-lru volatile-lru volatile-random volatile-ttl
+    } {
+        test "maxmemory - only allkeys-* should remove non-volatile keys ($policy)" {
+            # make sure to start with a blank instance
+            r flushall 
+            # Get the current memory limit and calculate a new limit.
+            # We just add 100k to the current memory size so that it is
+            # fast for us to reach that limit.
+            set used [s used_memory]
+            set limit [expr {$used+100*1024}]
+            r config set maxmemory $limit
+            r config set maxmemory-policy $policy
+            # Now add keys until the limit is almost reached.
+            set numkeys 0
+            while 1 {
+                r set [randomKey] x
+                incr numkeys
+                if {[s used_memory]+4096 > $limit} {
+                    assert {$numkeys > 10}
+                    break
+                }
+            }
+            # If we add the same number of keys already added again and
+            # the policy is allkeys-* we should still be under the limit.
+            # Otherwise we should see an error reported by Redis.
+            set err 0
+            for {set j 0} {$j < $numkeys} {incr j} {
+                if {[catch {r set [randomKey] x} e]} {
+                    if {[string match {*used memory*} $e]} {
+                        set err 1
+                    }
+                }
+            }
+            if {[string match allkeys-* $policy]} {
+                assert {[s used_memory] < ($limit+4096)}
+            } else {
+                assert {$err == 1}
+            }
+        }
+    }
+
+    foreach policy {
+        volatile-lru volatile-random volatile-ttl
+    } {
+        test "maxmemory - policy $policy should only remove volatile keys." {
+            # make sure to start with a blank instance
+            r flushall 
+            # Get the current memory limit and calculate a new limit.
+            # We just add 100k to the current memory size so that it is
+            # fast for us to reach that limit.
+            set used [s used_memory]
+            set limit [expr {$used+100*1024}]
+            r config set maxmemory $limit
+            r config set maxmemory-policy $policy
+            # Now add keys until the limit is almost reached.
+            set numkeys 0
+            while 1 {
+                # Odd keys are volatile
+                # Even keys are non volatile
+                if {$numkeys % 2} {
+                    r setex "key:$numkeys" 10000 x
+                } else {
+                    r set "key:$numkeys" x
+                }
+                if {[s used_memory]+4096 > $limit} {
+                    assert {$numkeys > 10}
+                    break
+                }
+                incr numkeys
+            }
+            # Now we add the same number of volatile keys already added.
+            # We expect Redis to evict only volatile keys in order to make
+            # space.
+            set err 0
+            for {set j 0} {$j < $numkeys} {incr j} {
+                catch {r setex "foo:$j" 10000 x}
+            }
+            # We should still be under the limit.
+            assert {[s used_memory] < ($limit+4096)}
+            # However all our non volatile keys should be here.
+            for {set j 0} {$j < $numkeys} {incr j 2} {
+                assert {[r exists "key:$j"]}
+            }
+        }
+    }
+}
index b0faf5dd74ed0dfe33f7620203560d5b52983fb8..516a834a858c14138503d6c447139ce0e0cd89d6 100644 (file)
@@ -60,3 +60,19 @@ start_server {tags {"protocol"}} {
         assert_error "*wrong*arguments*ping*" {r ping x y z}
     }
 }
+
+start_server {tags {"regression"}} {
+    test "Regression for a crash with blocking ops and pipelining" {
+        set rd [redis_deferring_client]
+        set fd [r channel]
+        set proto "*3\r\n\$5\r\nBLPOP\r\n\$6\r\nnolist\r\n\$1\r\n0\r\n"
+        puts -nonewline $fd $proto$proto
+        flush $fd
+        set res {}
+
+        $rd rpush nolist a
+        $rd read
+        $rd rpush nolist a
+        $rd read
+    }
+}
index ff178db4176b2bf68e489f1dc8024ec90d05b2e5..970e3ee7fb8fcf65719f8f6129a616aaa7f1e5ba 100644 (file)
@@ -728,4 +728,18 @@ start_server {
             assert_equal 3 [r llen myotherlist]
         }
     }
+
+    test "Regression for bug 593 - chaining BRPOPLPUSH with other blocking cmds" {
+        set rd1 [redis_deferring_client]
+        set rd2 [redis_deferring_client]
+
+        $rd1 brpoplpush a b 0
+        $rd1 brpoplpush a b 0
+        $rd2 brpoplpush b c 0
+        after 1000
+        r lpush a data
+        $rd1 close
+        $rd2 close
+        r ping
+    } {PONG}
 }