]> git.saurik.com Git - redis.git/commitdiff
merge conflicts resolved
authorantirez <antirez@gmail.com>
Thu, 22 Sep 2011 13:15:26 +0000 (15:15 +0200)
committerantirez <antirez@gmail.com>
Thu, 22 Sep 2011 13:15:26 +0000 (15:15 +0200)
1  2 
src/Makefile
src/aof.c
src/cluster.c
src/rdb.c
src/redis.h

diff --combined src/Makefile
index 36bba34c426d68838e42d38469709329fef67a29,bce8b6e514d432a7faed45849614357328cea37d..dac6deaf9a5d31bb955b170bcbabeaabdbdeb0d4
@@@ -5,50 -5,41 +5,50 @@@
  release_hdr := $(shell sh -c './mkreleasehdr.sh')
  uname_S := $(shell sh -c 'uname -s 2>/dev/null || echo not')
  OPTIMIZATION?=-O2
 +
 +ifeq ($(uname_S),Linux)
 +  ifneq ($(FORCE_LIBC_MALLOC),yes)
 +    USE_JEMALLOC=yes
 +  endif
 +endif
 +
  ifeq ($(uname_S),SunOS)
 -  CFLAGS?= -std=c99 -pedantic $(OPTIMIZATION) -Wall -W -D__EXTENSIONS__ -D_XPG6
 -  CCLINK?= -ldl -lnsl -lsocket -lm -lpthread
 -  DEBUG?= -g -ggdb 
 +  CFLAGS?=-std=c99 -pedantic $(OPTIMIZATION) -Wall -W -D__EXTENSIONS__ -D_XPG6
 +  CCLINK?=-ldl -lnsl -lsocket -lm -lpthread
 +  DEBUG?=-g -ggdb 
  else
 -  CFLAGS?= -std=c99 -pedantic $(OPTIMIZATION) -Wall -W $(ARCH) $(PROF)
 -  CCLINK?= -lm -pthread
 -  DEBUG?= -g -rdynamic -ggdb 
 +  CFLAGS?=-std=c99 -pedantic $(OPTIMIZATION) -Wall -W $(ARCH) $(PROF)
 +  CCLINK?=-lm -pthread
 +  DEBUG?=-g -rdynamic -ggdb 
  endif
  
  ifeq ($(USE_TCMALLOC),yes)
 +  ALLOD_DEPS=
    ALLOC_LINK=-ltcmalloc
    ALLOC_FLAGS=-DUSE_TCMALLOC
  endif
  
  ifeq ($(USE_TCMALLOC_MINIMAL),yes)
 +  ALLOD_DEPS=
    ALLOC_LINK=-ltcmalloc_minimal
    ALLOC_FLAGS=-DUSE_TCMALLOC
  endif
  
  ifeq ($(USE_JEMALLOC),yes)
 -  ALLOC_LINK=-ljemalloc
 -  ALLOC_FLAGS=-DUSE_JEMALLOC
 +  ALLOC_DEP=../deps/jemalloc/lib/libjemalloc.a
 +  ALLOC_LINK=$(ALLOC_DEP) -ldl
 +  ALLOC_FLAGS=-DUSE_JEMALLOC -I../deps/jemalloc/include
  endif
  
  CCLINK+= $(ALLOC_LINK)
  CFLAGS+= $(ALLOC_FLAGS)
  
 -CCOPT= $(CFLAGS) $(CCLINK) $(ARCH) $(PROF)
 +CCOPT= $(CFLAGS) $(ARCH) $(PROF)
  
  PREFIX= /usr/local
  INSTALL_BIN= $(PREFIX)/bin
  INSTALL= cp -p
  
 -
  CCCOLOR="\033[34m"
  LINKCOLOR="\033[34;1m"
  SRCCOLOR="\033[33m"
@@@ -56,12 -47,7 +56,12 @@@ BINCOLOR="\033[37;1m
  MAKECOLOR="\033[32;1m"
  ENDCOLOR="\033[0m"
  
 -OBJ = adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o dscache.o pubsub.o multi.o debug.o sort.o intset.o syncio.o diskstore.o cluster.o crc16.o endian.o rio.o
 +ifndef V
 +QUIET_CC = @printf '    %b %b\n' $(CCCOLOR)CC$(ENDCOLOR) $(SRCCOLOR)$@$(ENDCOLOR);
 +QUIET_LINK = @printf '    %b %b\n' $(LINKCOLOR)LINK$(ENDCOLOR) $(BINCOLOR)$@$(ENDCOLOR);
 +endif
 +
- OBJ = adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endian.o slowlog.o scripting.o bio.o
++OBJ = adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endian.o slowlog.o scripting.o bio.o rio.o
  BENCHOBJ = ae.o anet.o redis-benchmark.o sds.o adlist.o zmalloc.o
  CLIOBJ = anet.o sds.o adlist.o redis-cli.o zmalloc.o release.o
  CHECKDUMPOBJ = redis-check-dump.o lzf_c.o lzf_d.o
@@@ -86,35 -72,32 +86,35 @@@ ae_kqueue.o: ae_kqueue.
  ae_select.o: ae_select.c
  anet.o: anet.c fmacros.h anet.h
  aof.o: aof.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
 -  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h
 +  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
 +bio.o: bio.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
 +  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h bio.h
 +cluster.o: cluster.c redis.h fmacros.h config.h ae.h sds.h dict.h \
 +  adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
  config.o: config.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
 -  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h
 +  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
 +crc16.o: crc16.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
 +  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
  db.o: db.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
 -  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h
 +  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
  debug.o: debug.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
 -  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h sha1.h
 +  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h sha1.h
  dict.o: dict.c fmacros.h dict.h zmalloc.h
 -diskstore.o: diskstore.c redis.h fmacros.h config.h ae.h sds.h dict.h \
 -  adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h
 -dscache.o: dscache.c redis.h fmacros.h config.h ae.h sds.h dict.h \
 -  adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h
 -intset.o: intset.c intset.h zmalloc.h
 +endian.o: endian.c
 +intset.o: intset.c intset.h zmalloc.h endian.h
  lzf_c.o: lzf_c.c lzfP.h
  lzf_d.o: lzf_d.c lzfP.h
  multi.o: multi.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
 -  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h
 +  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
  networking.o: networking.c redis.h fmacros.h config.h ae.h sds.h dict.h \
 -  adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h
 +  adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
  object.o: object.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
 -  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h
 +  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
  pqsort.o: pqsort.c
  pubsub.o: pubsub.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
 -  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h
 +  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
  rdb.o: rdb.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
 -  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h lzf.h
 +  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h lzf.h
  redis-benchmark.o: redis-benchmark.c fmacros.h ae.h \
    ../deps/hiredis/hiredis.h sds.h adlist.h zmalloc.h
  redis-check-aof.o: redis-check-aof.c fmacros.h config.h
@@@ -122,90 -105,81 +122,91 @@@ redis-check-dump.o: redis-check-dump.c 
  redis-cli.o: redis-cli.c fmacros.h version.h ../deps/hiredis/hiredis.h \
    sds.h zmalloc.h ../deps/linenoise/linenoise.h help.h
  redis.o: redis.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
 -  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h asciilogo.h
 +  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h slowlog.h \
 +  bio.h asciilogo.h
  release.o: release.c release.h
  replication.o: replication.c redis.h fmacros.h config.h ae.h sds.h dict.h \
 -  adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h
 +  adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
 +scripting.o: scripting.c redis.h fmacros.h config.h ae.h sds.h dict.h \
 +  adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h \
 +  sha1.h
+ rio.o: rio.c sds.h
  sds.o: sds.c sds.h zmalloc.h
 -sha1.o: sha1.c sha1.h
 +sha1.o: sha1.c sha1.h config.h
 +slowlog.o: slowlog.c redis.h fmacros.h config.h ae.h sds.h dict.h \
 +  adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h \
 +  slowlog.h
  sort.o: sort.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
 -  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h pqsort.h
 +  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h pqsort.h
  syncio.o: syncio.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
 -  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h
 +  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
  t_hash.o: t_hash.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
 -  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h
 +  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
  t_list.o: t_list.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
 -  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h
 +  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
  t_set.o: t_set.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
 -  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h
 +  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
  t_string.o: t_string.c redis.h fmacros.h config.h ae.h sds.h dict.h \
 -  adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h
 +  adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
  t_zset.o: t_zset.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
 -  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h
 -util.o: util.c util.h
 -cluster.o: redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
 -  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h
 -ziplist.o: ziplist.c zmalloc.h ziplist.h
 -zipmap.o: zipmap.c zmalloc.h
 -zmalloc.o: zmalloc.c config.h
 +  zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
 +util.o: util.c fmacros.h util.h
 +ziplist.o: ziplist.c zmalloc.h util.h ziplist.h endian.h
 +zipmap.o: zipmap.c zmalloc.h endian.h
 +zmalloc.o: zmalloc.c config.h zmalloc.h
 +
 +.PHONY: dependencies
  
  dependencies:
 -      @echo $(MAKECOLOR)MAKE$(ENDCOLOR) $(BINCOLOR)hiredis$(ENDCOLOR)
 +      @printf '%b %b\n' $(MAKECOLOR)MAKE$(ENDCOLOR) $(BINCOLOR)hiredis$(ENDCOLOR)
        @cd ../deps/hiredis && $(MAKE) static ARCH="$(ARCH)"
 -      @echo $(MAKECOLOR)MAKE$(ENDCOLOR) $(BINCOLOR)linenoise$(ENDCOLOR)
 +      @printf '%b %b\n' $(MAKECOLOR)MAKE$(ENDCOLOR) $(BINCOLOR)linenoise$(ENDCOLOR)
        @cd ../deps/linenoise && $(MAKE) ARCH="$(ARCH)"
 +      @echo $(MAKECOLOR)MAKE$(ENDCOLOR) $(BINCOLOR)Lua ansi$(ENDCOLOR)
 +      @cd ../deps/lua && $(MAKE) ARCH="$(ARCH)" ansi
  
 -redis-server: $(OBJ)
 -      @$(CC) -o $(PRGNAME) $(CCOPT) $(DEBUG) $(OBJ)
 -      @echo $(LINKCOLOR)LINK$(ENDCOLOR) $(BINCOLOR)$(@)$(ENDCOLOR)
 +../deps/jemalloc/lib/libjemalloc.a:
 +      cd ../deps/jemalloc && ./configure $(JEMALLOC_CFLAGS) --with-jemalloc-prefix=je_ --enable-cc-silence && $(MAKE) lib/libjemalloc.a
 +
 +redis-server: dependencies $(OBJ)
 +      $(QUIET_LINK)$(CC) -o $(PRGNAME) $(CCOPT) $(DEBUG) $(OBJ) $(CCLINK) $(ALLOC_LINK) ../deps/lua/src/liblua.a
  
  redis-benchmark: dependencies $(BENCHOBJ)
        @cd ../deps/hiredis && $(MAKE) static
 -      @$(CC) -o $(BENCHPRGNAME) $(CCOPT) $(DEBUG) $(BENCHOBJ) ../deps/hiredis/libhiredis.a
 -      @echo $(LINKCOLOR)LINK$(ENDCOLOR) $(BINCOLOR)$(@)$(ENDCOLOR)
 +      $(QUIET_LINK)$(CC) -o $(BENCHPRGNAME) $(CCOPT) $(DEBUG) $(BENCHOBJ) ../deps/hiredis/libhiredis.a $(CCLINK) $(ALLOC_LINK)
  
  redis-benchmark.o:
 -      @$(CC) -c $(CFLAGS) -I../deps/hiredis $(DEBUG) $(COMPILE_TIME) $<
 -      @echo $(CCCOLOR)CC$(ENDCOLOR) $(SRCCOLOR)$(<)$(ENDCOLOR)
 +      $(QUIET_CC)$(CC) -c $(CFLAGS) -I../deps/hiredis $(DEBUG) $(COMPILE_TIME) $<
  
  redis-cli: dependencies $(CLIOBJ)
 -      @$(CC) -o $(CLIPRGNAME) $(CCOPT) $(DEBUG) $(CLIOBJ) ../deps/hiredis/libhiredis.a ../deps/linenoise/linenoise.o
 -      @echo $(LINKCOLOR)LINK$(ENDCOLOR) $(BINCOLOR)$(@)$(ENDCOLOR)
 +      $(QUIET_LINK)$(CC) -o $(CLIPRGNAME) $(CCOPT) $(DEBUG) $(CLIOBJ) ../deps/hiredis/libhiredis.a ../deps/linenoise/linenoise.o $(CCLINK) $(ALLOC_LINK)
  
  redis-cli.o:
 -      @$(CC) -c $(CFLAGS) -I../deps/hiredis -I../deps/linenoise $(DEBUG) $(COMPILE_TIME) $<
 -      @echo $(CCCOLOR)CC$(ENDCOLOR) $(SRCCOLOR)$(<)$(ENDCOLOR)
 +      $(QUIET_CC)$(CC) -c $(CFLAGS) -I../deps/hiredis -I../deps/linenoise $(DEBUG) $(COMPILE_TIME) $<
  
  redis-check-dump: $(CHECKDUMPOBJ)
 -      @$(CC) -o $(CHECKDUMPPRGNAME) $(CCOPT) $(DEBUG) $(CHECKDUMPOBJ)
 -      @echo $(LINKCOLOR)LINK$(ENDCOLOR) $(BINCOLOR)$(@)$(ENDCOLOR)
 +      $(QUIET_LINK)$(CC) -o $(CHECKDUMPPRGNAME) $(CCOPT) $(DEBUG) $(CHECKDUMPOBJ) $(CCLINK) $(ALLOC_LINK)
  
  redis-check-aof: $(CHECKAOFOBJ)
 -      @$(CC) -o $(CHECKAOFPRGNAME) $(CCOPT) $(DEBUG) $(CHECKAOFOBJ)
 -      @echo $(LINKCOLOR)LINK$(ENDCOLOR) $(BINCOLOR)$(@)$(ENDCOLOR)
 +      $(QUIET_LINK)$(CC) -o $(CHECKAOFPRGNAME) $(CCOPT) $(DEBUG) $(CHECKAOFOBJ) $(CCLINK) $(ALLOC_LINK)
  
 -.c.o:
 -      @$(CC) -c $(CFLAGS) $(DEBUG) $(COMPILE_TIME) $<
 -      @echo $(CCCOLOR)CC$(ENDCOLOR) $(SRCCOLOR)$(<)$(ENDCOLOR)
 +# Because the jemalloc.h header is generated as a part of the jemalloc build
 +# process, building it should complete before building any other object.
 +%.o: %.c $(ALLOC_DEP)
 +      $(QUIET_CC)$(CC) -c $(CFLAGS) $(DEBUG) $(COMPILE_TIME) -I../deps/lua/src $<
  
  clean:
        rm -rf $(PRGNAME) $(BENCHPRGNAME) $(CLIPRGNAME) $(CHECKDUMPPRGNAME) $(CHECKAOFPRGNAME) *.o *.gcda *.gcno *.gcov
 +      cd ../deps/hiredis && $(MAKE) $@
 +      cd ../deps/linenoise && $(MAKE) $@
 +      cd ../deps/lua && $(MAKE) $@
 +      -(cd ../deps/jemalloc && $(MAKE) distclean)
  
  dep:
        $(CC) -MM *.c -I ../deps/hiredis -I ../deps/linenoise
  
 -test: redis-server
 -      (cd ..; tclsh8.5 tests/test_helper.tcl --tags "${TAGS}" --file "${FILE}")
 +test: redis-server redis-check-aof
 +      @(cd ..; ./runtest)
  
  bench:
        ./redis-benchmark
@@@ -217,7 -191,7 +218,7 @@@ log
        @echo ""
        @echo "WARNING: if it fails under Linux you probably need to install libc6-dev-i386"
        @echo ""
 -      $(MAKE) ARCH="-m32"
 +      $(MAKE) ARCH="-m32" JEMALLOC_CFLAGS='CFLAGS="-std=gnu99 -Wall -pipe -g3 -fvisibility=hidden -O3 -funroll-loops -m32"'
  
  gprof:
        $(MAKE) PROF="-pg"
@@@ -231,9 -205,6 +232,9 @@@ noopt
  32bitgprof:
        $(MAKE) PROF="-pg" ARCH="-arch i386"
  
 +src/help.h:
 +      @../utils/generate-command-help.rb > help.h
 +
  install: all
        mkdir -p $(INSTALL_BIN)
        $(INSTALL) $(PRGNAME) $(INSTALL_BIN)
diff --combined src/aof.c
index 8d65428182d6f32dea7749df6981205c85697640,aadf644811bdf76c924ed9518413b11c14b4d1fe..b86357de59ebabe1e73c9167ba54e8b667cd9bb3
+++ b/src/aof.c
@@@ -1,6 -1,4 +1,7 @@@
  #include "redis.h"
 +#include "bio.h"
++#include "rio.h"
 +
  #include <signal.h>
  #include <fcntl.h>
  #include <sys/stat.h>
@@@ -8,17 -6,12 +9,17 @@@
  #include <sys/time.h>
  #include <sys/resource.h>
  #include <sys/wait.h>
 -#include "rio.h"
 +
 +void aofUpdateCurrentSize(void);
 +
 +void aof_background_fsync(int fd) {
 +    bioCreateBackgroundJob(REDIS_BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL);
 +}
  
  /* Called when the user switches from "appendonly yes" to "appendonly no"
   * at runtime using the CONFIG command. */
  void stopAppendOnly(void) {
 -    flushAppendOnlyFile();
 +    flushAppendOnlyFile(1);
      aof_fsync(server.appendfd);
      close(server.appendfd);
  
      server.appendseldb = -1;
      server.appendonly = 0;
      /* rewrite operation in progress? kill it, wait child exit */
 -    if (server.bgsavechildpid != -1) {
 +    if (server.bgrewritechildpid != -1) {
          int statloc;
  
 -        if (kill(server.bgsavechildpid,SIGKILL) != -1)
 +        if (kill(server.bgrewritechildpid,SIGKILL) != -1)
              wait3(&statloc,0,NULL);
          /* reset the buffer accumulating changes while the child saves */
          sdsfree(server.bgrewritebuf);
          server.bgrewritebuf = sdsempty();
 -        server.bgsavechildpid = -1;
 +        server.bgrewritechildpid = -1;
      }
  }
  
@@@ -63,121 -56,62 +64,121 @@@ int startAppendOnly(void) 
   * and the only way the client socket can get a write is entering when the
   * the event loop, we accumulate all the AOF writes in a memory
   * buffer and write it on disk using this function just before entering
 - * the event loop again. */
 -void flushAppendOnlyFile(void) {
 -    time_t now;
 + * the event loop again.
 + *
 + * About the 'force' argument:
 + *
 + * When the fsync policy is set to 'everysec' we may delay the flush if there
 + * is still an fsync() going on in the background thread, since for instance
 + * on Linux write(2) will be blocked by the background fsync anyway.
 + * When this happens we remember that there is some aof buffer to be
 + * flushed ASAP, and will try to do that in the serverCron() function.
 + *
 + * However if force is set to 1 we'll write regardless of the background
 + * fsync. */
 +void flushAppendOnlyFile(int force) {
      ssize_t nwritten;
 +    int sync_in_progress = 0;
  
      if (sdslen(server.aofbuf) == 0) return;
  
 +    if (server.appendfsync == APPENDFSYNC_EVERYSEC)
 +        sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0;
 +
 +    if (server.appendfsync == APPENDFSYNC_EVERYSEC && !force) {
 +        /* With this append fsync policy we do background fsyncing.
 +         * If the fsync is still in progress we can try to delay
 +         * the write for a couple of seconds. */
 +        if (sync_in_progress) {
 +            if (server.aof_flush_postponed_start == 0) {
 +                /* No previous write postponinig, remember that we are
 +                 * postponing the flush and return. */
 +                server.aof_flush_postponed_start = server.unixtime;
 +                return;
 +            } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
 +                /* We were already waiting for fsync to finish, but for less
 +                 * than two seconds this is still ok. Postpone again. */
 +                return;
 +            }
 +            /* Otherwise fall trough, and go write since we can't wait
 +             * over two seconds. */
 +            redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
 +        }
 +    }
 +    /* If you are following this code path, then we are going to write so
 +     * set reset the postponed flush sentinel to zero. */
 +    server.aof_flush_postponed_start = 0;
 +
      /* We want to perform a single write. This should be guaranteed atomic
       * at least if the filesystem we are writing is a real physical one.
       * While this will save us against the server being killed I don't think
       * there is much to do about the whole server stopping for power problems
       * or alike */
 -     nwritten = write(server.appendfd,server.aofbuf,sdslen(server.aofbuf));
 -     if (nwritten != (signed)sdslen(server.aofbuf)) {
 +    nwritten = write(server.appendfd,server.aofbuf,sdslen(server.aofbuf));
 +    if (nwritten != (signed)sdslen(server.aofbuf)) {
          /* Ooops, we are in troubles. The best thing to do for now is
           * aborting instead of giving the illusion that everything is
           * working as expected. */
 -         if (nwritten == -1) {
 +        if (nwritten == -1) {
              redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
 -         } else {
 +        } else {
              redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
 -         }
 -         exit(1);
 +        }
 +        exit(1);
      }
 -    sdsfree(server.aofbuf);
 -    server.aofbuf = sdsempty();
 +    server.appendonly_current_size += nwritten;
  
 -    /* Don't Fsync if no-appendfsync-on-rewrite is set to yes and we have
 -     * childs performing heavy I/O on disk. */
 +    /* Re-use AOF buffer when it is small enough. The maximum comes from the
 +     * arena size of 4k minus some overhead (but is otherwise arbitrary). */
 +    if ((sdslen(server.aofbuf)+sdsavail(server.aofbuf)) < 4000) {
 +        sdsclear(server.aofbuf);
 +    } else {
 +        sdsfree(server.aofbuf);
 +        server.aofbuf = sdsempty();
 +    }
 +
 +    /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
 +     * children doing I/O in the background. */
      if (server.no_appendfsync_on_rewrite &&
          (server.bgrewritechildpid != -1 || server.bgsavechildpid != -1))
              return;
 -    /* Fsync if needed */
 -    now = time(NULL);
 -    if (server.appendfsync == APPENDFSYNC_ALWAYS ||
 -        (server.appendfsync == APPENDFSYNC_EVERYSEC &&
 -         now-server.lastfsync > 1))
 -    {
 +
 +    /* Perform the fsync if needed. */
 +    if (server.appendfsync == APPENDFSYNC_ALWAYS) {
          /* aof_fsync is defined as fdatasync() for Linux in order to avoid
           * flushing metadata. */
          aof_fsync(server.appendfd); /* Let's try to get this data on the disk */
 -        server.lastfsync = now;
 +        server.lastfsync = server.unixtime;
 +    } else if ((server.appendfsync == APPENDFSYNC_EVERYSEC &&
 +                server.unixtime > server.lastfsync)) {
 +        if (!sync_in_progress) aof_background_fsync(server.appendfd);
 +        server.lastfsync = server.unixtime;
      }
  }
  
 -sds catAppendOnlyGenericCommand(sds buf, int argc, robj **argv) {
 -    int j;
 -    buf = sdscatprintf(buf,"*%d\r\n",argc);
 +sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) {
 +    char buf[32];
 +    int len, j;
 +    robj *o;
 +
 +    buf[0] = '*';
 +    len = 1+ll2string(buf+1,sizeof(buf)-1,argc);
 +    buf[len++] = '\r';
 +    buf[len++] = '\n';
 +    dst = sdscatlen(dst,buf,len);
 +
      for (j = 0; j < argc; j++) {
 -        robj *o = getDecodedObject(argv[j]);
 -        buf = sdscatprintf(buf,"$%lu\r\n",(unsigned long)sdslen(o->ptr));
 -        buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
 -        buf = sdscatlen(buf,"\r\n",2);
 +        o = getDecodedObject(argv[j]);
 +        buf[0] = '$';
 +        len = 1+ll2string(buf+1,sizeof(buf)-1,sdslen(o->ptr));
 +        buf[len++] = '\r';
 +        buf[len++] = '\n';
 +        dst = sdscatlen(dst,buf,len);
 +        dst = sdscatlen(dst,o->ptr,sdslen(o->ptr));
 +        dst = sdscatlen(dst,"\r\n",2);
          decrRefCount(o);
      }
 -    return buf;
 +    return dst;
  }
  
  sds catAppendOnlyExpireAtCommand(sds buf, robj *key, robj *seconds) {
@@@ -287,7 -221,6 +288,7 @@@ int loadAppendOnlyFile(char *filename) 
      long loops = 0;
  
      if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
 +        server.appendonly_current_size = 0;
          fclose(fp);
          return REDIS_ERR;
      }
          }
          if (buf[0] != '*') goto fmterr;
          argc = atoi(buf+1);
 +        if (argc < 1) goto fmterr;
 +
          argv = zmalloc(sizeof(robj*)*argc);
          for (j = 0; j < argc; j++) {
              if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
  
          /* The fake client should not have a reply */
          redisAssert(fakeClient->bufpos == 0 && listLength(fakeClient->reply) == 0);
 +        /* The fake client should never get blocked */
 +        redisAssert((fakeClient->flags & REDIS_BLOCKED) == 0);
  
          /* Clean up. Command code may have changed argv/argc so we use the
           * argv/argc of the client instead of the local variables. */
      freeFakeClient(fakeClient);
      server.appendonly = appendonly;
      stopLoading();
 +    aofUpdateCurrentSize();
 +    server.auto_aofrewrite_base_size = server.appendonly_current_size;
      return REDIS_OK;
  
  readerr:
@@@ -386,11 -313,26 +387,26 @@@ fmterr
      exit(1);
  }
  
+ /* Delegate writing an object to writing a bulk string or bulk long long.
+  * This is not placed in rio.c since that adds the redis.h dependency. */
+ int rioWriteBulkObject(rio *r, robj *obj) {
+     /* Avoid using getDecodedObject to help copy-on-write (we are often
+      * in a child process when this function is called). */
+     if (obj->encoding == REDIS_ENCODING_INT) {
+         return rioWriteBulkLongLong(r,(long)obj->ptr);
+     } else if (obj->encoding == REDIS_ENCODING_RAW) {
+         return rioWriteBulkString(r,obj->ptr,sdslen(obj->ptr));
+     } else {
+         redisPanic("Unknown string encoding");
+     }
+ }
  /* Write a sequence of commands able to fully rebuild the dataset into
   * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
  int rewriteAppendOnlyFile(char *filename) {
      dictIterator *di = NULL;
      dictEntry *de;
+     rio aof;
      FILE *fp;
      char tmpfile[256];
      int j;
          redisLog(REDIS_WARNING, "Failed rewriting the append only file: %s", strerror(errno));
          return REDIS_ERR;
      }
+     aof = rioInitWithFile(fp);
      for (j = 0; j < server.dbnum; j++) {
          char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
          redisDb *db = server.db+j;
          dict *d = db->dict;
          if (dictSize(d) == 0) continue;
 -        di = dictGetIterator(d);
 +        di = dictGetSafeIterator(d);
          if (!di) {
              fclose(fp);
              return REDIS_ERR;
          }
  
          /* SELECT the new DB */
-         if (fwrite(selectcmd,sizeof(selectcmd)-1,1,fp) == 0) goto werr;
-         if (fwriteBulkLongLong(fp,j) == 0) goto werr;
+         if (rioWrite(&aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
+         if (rioWriteBulkLongLong(&aof,j) == 0) goto werr;
  
          /* Iterate this DB writing every entry */
          while((de = dictNext(di)) != NULL) {
              if (o->type == REDIS_STRING) {
                  /* Emit a SET command */
                  char cmd[]="*3\r\n$3\r\nSET\r\n";
-                 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
+                 if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
                  /* Key and value */
-                 if (fwriteBulkObject(fp,&key) == 0) goto werr;
-                 if (fwriteBulkObject(fp,o) == 0) goto werr;
+                 if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
+                 if (rioWriteBulkObject(&aof,o) == 0) goto werr;
              } else if (o->type == REDIS_LIST) {
                  /* Emit the RPUSHes needed to rebuild the list */
                  char cmd[]="*3\r\n$5\r\nRPUSH\r\n";
                      long long vlong;
  
                      while(ziplistGet(p,&vstr,&vlen,&vlong)) {
-                         if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
-                         if (fwriteBulkObject(fp,&key) == 0) goto werr;
+                         if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
+                         if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
                          if (vstr) {
-                             if (fwriteBulkString(fp,(char*)vstr,vlen) == 0)
+                             if (rioWriteBulkString(&aof,(char*)vstr,vlen) == 0)
                                  goto werr;
                          } else {
-                             if (fwriteBulkLongLong(fp,vlong) == 0)
+                             if (rioWriteBulkLongLong(&aof,vlong) == 0)
                                  goto werr;
                          }
                          p = ziplistNext(zl,p);
                      while((ln = listNext(&li))) {
                          robj *eleobj = listNodeValue(ln);
  
-                         if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
-                         if (fwriteBulkObject(fp,&key) == 0) goto werr;
-                         if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
+                         if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
+                         if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
+                         if (rioWriteBulkObject(&aof,eleobj) == 0) goto werr;
                      }
                  } else {
                      redisPanic("Unknown list encoding");
                      int ii = 0;
                      int64_t llval;
                      while(intsetGet(o->ptr,ii++,&llval)) {
-                         if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
-                         if (fwriteBulkObject(fp,&key) == 0) goto werr;
-                         if (fwriteBulkLongLong(fp,llval) == 0) goto werr;
+                         if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
+                         if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
+                         if (rioWriteBulkLongLong(&aof,llval) == 0) goto werr;
                      }
                  } else if (o->encoding == REDIS_ENCODING_HT) {
                      dictIterator *di = dictGetIterator(o->ptr);
                      dictEntry *de;
                      while((de = dictNext(di)) != NULL) {
                          robj *eleobj = dictGetEntryKey(de);
-                         if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
-                         if (fwriteBulkObject(fp,&key) == 0) goto werr;
-                         if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
+                         if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
+                         if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
+                         if (rioWriteBulkObject(&aof,eleobj) == 0) goto werr;
                      }
                      dictReleaseIterator(di);
                  } else {
                          redisAssert(ziplistGet(eptr,&vstr,&vlen,&vll));
                          score = zzlGetScore(sptr);
  
-                         if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
-                         if (fwriteBulkObject(fp,&key) == 0) goto werr;
-                         if (fwriteBulkDouble(fp,score) == 0) goto werr;
+                         if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
+                         if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
+                         if (rioWriteBulkDouble(&aof,score) == 0) goto werr;
                          if (vstr != NULL) {
-                             if (fwriteBulkString(fp,(char*)vstr,vlen) == 0)
+                             if (rioWriteBulkString(&aof,(char*)vstr,vlen) == 0)
                                  goto werr;
                          } else {
-                             if (fwriteBulkLongLong(fp,vll) == 0)
+                             if (rioWriteBulkLongLong(&aof,vll) == 0)
                                  goto werr;
                          }
                          zzlNext(zl,&eptr,&sptr);
                          robj *eleobj = dictGetEntryKey(de);
                          double *score = dictGetEntryVal(de);
  
-                         if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
-                         if (fwriteBulkObject(fp,&key) == 0) goto werr;
-                         if (fwriteBulkDouble(fp,*score) == 0) goto werr;
-                         if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
+                         if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
+                         if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
+                         if (rioWriteBulkDouble(&aof,*score) == 0) goto werr;
+                         if (rioWriteBulkObject(&aof,eleobj) == 0) goto werr;
                      }
                      dictReleaseIterator(di);
                  } else {
                      unsigned int flen, vlen;
  
                      while((p = zipmapNext(p,&field,&flen,&val,&vlen)) != NULL) {
-                         if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
-                         if (fwriteBulkObject(fp,&key) == 0) goto werr;
-                         if (fwriteBulkString(fp,(char*)field,flen) == 0)
+                         if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
+                         if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
+                         if (rioWriteBulkString(&aof,(char*)field,flen) == 0)
                              goto werr;
-                         if (fwriteBulkString(fp,(char*)val,vlen) == 0)
+                         if (rioWriteBulkString(&aof,(char*)val,vlen) == 0)
                              goto werr;
                      }
                  } else {
                          robj *field = dictGetEntryKey(de);
                          robj *val = dictGetEntryVal(de);
  
-                         if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
-                         if (fwriteBulkObject(fp,&key) == 0) goto werr;
-                         if (fwriteBulkObject(fp,field) == 0) goto werr;
-                         if (fwriteBulkObject(fp,val) == 0) goto werr;
+                         if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
+                         if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
+                         if (rioWriteBulkObject(&aof,field) == 0) goto werr;
+                         if (rioWriteBulkObject(&aof,val) == 0) goto werr;
                      }
                      dictReleaseIterator(di);
                  }
                  char cmd[]="*3\r\n$8\r\nEXPIREAT\r\n";
                  /* If this key is already expired skip it */
                  if (expiretime < now) continue;
-                 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
-                 if (fwriteBulkObject(fp,&key) == 0) goto werr;
-                 if (fwriteBulkLongLong(fp,expiretime) == 0) goto werr;
+                 if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr;
+                 if (rioWriteBulkObject(&aof,&key) == 0) goto werr;
+                 if (rioWriteBulkLongLong(&aof,expiretime) == 0) goto werr;
              }
          }
          dictReleaseIterator(di);
@@@ -638,14 -582,16 +656,14 @@@ werr
   */
  int rewriteAppendOnlyFileBackground(void) {
      pid_t childpid;
 +    long long start;
  
      if (server.bgrewritechildpid != -1) return REDIS_ERR;
 -    if (server.ds_enabled != 0) {
 -        redisLog(REDIS_WARNING,"BGREWRITEAOF called with diskstore enabled: AOF is not supported when diskstore is enabled. Operation not performed.");
 -        return REDIS_ERR;
 -    }
 +    start = ustime();
      if ((childpid = fork()) == 0) {
 -        /* Child */
          char tmpfile[256];
  
 +        /* Child */
          if (server.ipfd > 0) close(server.ipfd);
          if (server.sofd > 0) close(server.sofd);
          snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
          }
      } else {
          /* Parent */
 +        server.stat_fork_time = ustime()-start;
          if (childpid == -1) {
              redisLog(REDIS_WARNING,
                  "Can't rewrite append only file in background: fork: %s",
  void bgrewriteaofCommand(redisClient *c) {
      if (server.bgrewritechildpid != -1) {
          addReplyError(c,"Background append only file rewriting already in progress");
 -        return;
 -    }
 -    if (rewriteAppendOnlyFileBackground() == REDIS_OK) {
 +    } else if (server.bgsavechildpid != -1) {
 +        server.aofrewrite_scheduled = 1;
 +        addReplyStatus(c,"Background append only file rewriting scheduled");
 +    } else if (rewriteAppendOnlyFileBackground() == REDIS_OK) {
          addReplyStatus(c,"Background append only file rewriting started");
      } else {
          addReply(c,shared.err);
@@@ -697,146 -641,58 +715,146 @@@ void aofRemoveTempFile(pid_t childpid) 
      unlink(tmpfile);
  }
  
 +/* Update the server.appendonly_current_size filed explicitly using stat(2)
 + * to check the size of the file. This is useful after a rewrite or after
 + * a restart, normally the size is updated just adding the write length
 + * to the current lenght, that is much faster. */
 +void aofUpdateCurrentSize(void) {
 +    struct redis_stat sb;
 +
 +    if (redis_fstat(server.appendfd,&sb) == -1) {
 +        redisLog(REDIS_WARNING,"Unable to check the AOF length: %s",
 +            strerror(errno));
 +    } else {
 +        server.appendonly_current_size = sb.st_size;
 +    }
 +}
 +
  /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
   * Handle this. */
  void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
      if (!bysignal && exitcode == 0) {
 -        int fd;
 +        int newfd, oldfd;
 +        int nwritten;
          char tmpfile[256];
 +        long long now = ustime();
  
          redisLog(REDIS_NOTICE,
 -            "Background append only file rewriting terminated with success");
 -        /* Now it's time to flush the differences accumulated by the parent */
 -        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) server.bgrewritechildpid);
 -        fd = open(tmpfile,O_WRONLY|O_APPEND);
 -        if (fd == -1) {
 -            redisLog(REDIS_WARNING, "Not able to open the temp append only file produced by the child: %s", strerror(errno));
 +            "Background AOF rewrite terminated with success");
 +
 +        /* Flush the differences accumulated by the parent to the
 +         * rewritten AOF. */
 +        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof",
 +            (int)server.bgrewritechildpid);
 +        newfd = open(tmpfile,O_WRONLY|O_APPEND);
 +        if (newfd == -1) {
 +            redisLog(REDIS_WARNING,
 +                "Unable to open the temporary AOF produced by the child: %s", strerror(errno));
              goto cleanup;
          }
 -        /* Flush our data... */
 -        if (write(fd,server.bgrewritebuf,sdslen(server.bgrewritebuf)) !=
 -                (signed) sdslen(server.bgrewritebuf)) {
 -            redisLog(REDIS_WARNING, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno));
 -            close(fd);
 +
 +        nwritten = write(newfd,server.bgrewritebuf,sdslen(server.bgrewritebuf));
 +        if (nwritten != (signed)sdslen(server.bgrewritebuf)) {
 +            if (nwritten == -1) {
 +                redisLog(REDIS_WARNING,
 +                    "Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
 +            } else {
 +                redisLog(REDIS_WARNING,
 +                    "Short write trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
 +            }
 +            close(newfd);
              goto cleanup;
          }
 -        redisLog(REDIS_NOTICE,"Parent diff flushed into the new append log file with success (%lu bytes)",sdslen(server.bgrewritebuf));
 -        /* Now our work is to rename the temp file into the stable file. And
 -         * switch the file descriptor used by the server for append only. */
 +
 +        redisLog(REDIS_NOTICE,
 +            "Parent diff successfully flushed to the rewritten AOF (%lu bytes)", nwritten);
 +
 +        /* The only remaining thing to do is to rename the temporary file to
 +         * the configured file and switch the file descriptor used to do AOF
 +         * writes. We don't want close(2) or rename(2) calls to block the
 +         * server on old file deletion.
 +         *
 +         * There are two possible scenarios:
 +         *
 +         * 1) AOF is DISABLED and this was a one time rewrite. The temporary
 +         * file will be renamed to the configured file. When this file already
 +         * exists, it will be unlinked, which may block the server.
 +         *
 +         * 2) AOF is ENABLED and the rewritten AOF will immediately start
 +         * receiving writes. After the temporary file is renamed to the
 +         * configured file, the original AOF file descriptor will be closed.
 +         * Since this will be the last reference to that file, closing it
 +         * causes the underlying file to be unlinked, which may block the
 +         * server.
 +         *
 +         * To mitigate the blocking effect of the unlink operation (either
 +         * caused by rename(2) in scenario 1, or by close(2) in scenario 2), we
 +         * use a background thread to take care of this. First, we
 +         * make scenario 1 identical to scenario 2 by opening the target file
 +         * when it exists. The unlink operation after the rename(2) will then
 +         * be executed upon calling close(2) for its descriptor. Everything to
 +         * guarantee atomicity for this switch has already happened by then, so
 +         * we don't care what the outcome or duration of that close operation
 +         * is, as long as the file descriptor is released again. */
 +        if (server.appendfd == -1) {
 +            /* AOF disabled */
 +
 +             /* Don't care if this fails: oldfd will be -1 and we handle that.
 +              * One notable case of -1 return is if the old file does
 +              * not exist. */
 +             oldfd = open(server.appendfilename,O_RDONLY|O_NONBLOCK);
 +        } else {
 +            /* AOF enabled */
 +            oldfd = -1; /* We'll set this to the current AOF filedes later. */
 +        }
 +
 +        /* Rename the temporary file. This will not unlink the target file if
 +         * it exists, because we reference it with "oldfd". */
          if (rename(tmpfile,server.appendfilename) == -1) {
 -            redisLog(REDIS_WARNING,"Can't rename the temp append only file into the stable one: %s", strerror(errno));
 -            close(fd);
 +            redisLog(REDIS_WARNING,
 +                "Error trying to rename the temporary AOF: %s", strerror(errno));
 +            close(newfd);
 +            if (oldfd != -1) close(oldfd);
              goto cleanup;
          }
 -        /* Mission completed... almost */
 -        redisLog(REDIS_NOTICE,"Append only file successfully rewritten.");
 -        if (server.appendfd != -1) {
 -            /* If append only is actually enabled... */
 -            close(server.appendfd);
 -            server.appendfd = fd;
 -            if (server.appendfsync != APPENDFSYNC_NO) aof_fsync(fd);
 -            server.appendseldb = -1; /* Make sure it will issue SELECT */
 -            redisLog(REDIS_NOTICE,"The new append only file was selected for future appends.");
 +
 +        if (server.appendfd == -1) {
 +            /* AOF disabled, we don't need to set the AOF file descriptor
 +             * to this new file, so we can close it. */
 +            close(newfd);
          } else {
 -            /* If append only is disabled we just generate a dump in this
 -             * format. Why not? */
 -            close(fd);
 +            /* AOF enabled, replace the old fd with the new one. */
 +            oldfd = server.appendfd;
 +            server.appendfd = newfd;
 +            if (server.appendfsync == APPENDFSYNC_ALWAYS)
 +                aof_fsync(newfd);
 +            else if (server.appendfsync == APPENDFSYNC_EVERYSEC)
 +                aof_background_fsync(newfd);
 +            server.appendseldb = -1; /* Make sure SELECT is re-issued */
 +            aofUpdateCurrentSize();
 +            server.auto_aofrewrite_base_size = server.appendonly_current_size;
 +
 +            /* Clear regular AOF buffer since its contents was just written to
 +             * the new AOF from the background rewrite buffer. */
 +            sdsfree(server.aofbuf);
 +            server.aofbuf = sdsempty();
          }
 +
 +        redisLog(REDIS_NOTICE, "Background AOF rewrite successful");
 +
 +        /* Asynchronously close the overwritten AOF. */
 +        if (oldfd != -1) bioCreateBackgroundJob(REDIS_BIO_CLOSE_FILE,(void*)(long)oldfd,NULL,NULL);
 +
 +        redisLog(REDIS_VERBOSE,
 +            "Background AOF rewrite signal handler took %lldus", ustime()-now);
      } else if (!bysignal && exitcode != 0) {
 -        redisLog(REDIS_WARNING, "Background append only file rewriting error");
 +        redisLog(REDIS_WARNING,
 +            "Background AOF rewrite terminated with error");
      } else {
          redisLog(REDIS_WARNING,
 -            "Background append only file rewriting terminated by signal %d",
 -            bysignal);
 +            "Background AOF rewrite terminated by signal %d", bysignal);
      }
 +
  cleanup:
      sdsfree(server.bgrewritebuf);
      server.bgrewritebuf = sdsempty();
diff --combined src/cluster.c
index e608c420c5df66fe2bdaba848a61e79fb5ccb599,1c0b1c23e54a3b7b7401fb9a9f99fc48d4aaf8a4..c6fc44b1a9f4ce52c7ea250c6c7cf0415ed97faa
@@@ -1383,14 -1383,13 +1383,13 @@@ void clusterCommand(redisClient *c) 
  
  /* RESTORE key ttl serialized-value */
  void restoreCommand(redisClient *c) {
-     FILE *fp;
-     char buf[64];
-     robj *o;
-     unsigned char *data;
      long ttl;
+     rio payload;
+     int type;
+     robj *obj;
  
      /* Make sure this key does not already exist here... */
 -    if (dbExists(c->db,c->argv[1])) {
 +    if (lookupKeyWrite(c->db,c->argv[1]) != NULL) {
          addReplyError(c,"Target key name is busy.");
          return;
      }
          return;
      }
  
-     /* rdbLoadObject() only works against file descriptors so we need to
-      * dump the serialized object into a file and reload. */
-     snprintf(buf,sizeof(buf),"redis-restore-%d.tmp",getpid());
-     fp = fopen(buf,"w+");
-     if (!fp) {
-         redisLog(REDIS_WARNING,"Can't open tmp file for RESTORE: %s",
-             strerror(errno));
-         addReplyErrorFormat(c,"RESTORE failed, tmp file creation error: %s",
-             strerror(errno));
-         return;
-     }
-     unlink(buf);
-     /* Write the actual data and rewind the file */
-     data = (unsigned char*) c->argv[3]->ptr;
-     if (fwrite(data+1,sdslen((sds)data)-1,1,fp) != 1) {
-         redisLog(REDIS_WARNING,"Can't write against tmp file for RESTORE: %s",
-             strerror(errno));
-         addReplyError(c,"RESTORE failed, tmp file I/O error.");
-         fclose(fp);
-         return;
-     }
-     rewind(fp);
-     /* Finally create the object from the serialized dump and
-      * store it at the specified key. */
-     if ((data[0] > 4 && data[0] < 9) ||
-          data[0] > 11 ||
-         (o = rdbLoadObject(data[0],fp)) == NULL)
+     payload = rioInitWithBuffer(c->argv[3]->ptr);
+     if (((type = rdbLoadObjectType(&payload)) == -1) ||
+         ((obj = rdbLoadObject(type,&payload)) == NULL))
      {
-         addReplyError(c,"Bad data format.");
-         fclose(fp);
+         addReplyError(c,"Bad data format");
          return;
      }
-     fclose(fp);
  
      /* Create the key and set the TTL if any */
-     dbAdd(c->db,c->argv[1],o);
+     dbAdd(c->db,c->argv[1],obj);
      if (ttl) setExpire(c->db,c->argv[1],time(NULL)+ttl);
      addReply(c,shared.ok);
  }
@@@ -1450,12 -1421,9 +1421,9 @@@ void migrateCommand(redisClient *c) 
      int fd;
      long timeout;
      long dbid;
-     char buf[64];
-     FILE *fp;
      time_t ttl;
      robj *o;
-     unsigned char type;
-     off_t payload_len;
+     rio cmd, payload;
  
      /* Sanity check */
      if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != REDIS_OK)
          return;
      }
  
-     /* Create temp file */
-     snprintf(buf,sizeof(buf),"redis-migrate-%d.tmp",getpid());
-     fp = fopen(buf,"w+");
-     if (!fp) {
-         redisLog(REDIS_WARNING,"Can't open tmp file for MIGRATE: %s",
-             strerror(errno));
-         addReplyErrorFormat(c,"MIGRATE failed, tmp file creation error: %s.",
-             strerror(errno));
-         return;
-     }
-     unlink(buf);
-     /* Build the SELECT + RESTORE query writing it in our temp file. */
-     if (fwriteBulkCount(fp,'*',2) == 0) goto file_wr_err;
-     if (fwriteBulkString(fp,"SELECT",6) == 0) goto file_wr_err;
-     if (fwriteBulkLongLong(fp,dbid) == 0) goto file_wr_err;
+     cmd = rioInitWithBuffer(sdsempty());
+     redisAssert(rioWriteBulkCount(&cmd,'*',2));
+     redisAssert(rioWriteBulkString(&cmd,"SELECT",6));
+     redisAssert(rioWriteBulkLongLong(&cmd,dbid));
  
      ttl = getExpire(c->db,c->argv[3]);
-     type = o->type;
-     if (fwriteBulkCount(fp,'*',4) == 0) goto file_wr_err;
-     if (fwriteBulkString(fp,"RESTORE",7) == 0) goto file_wr_err;
-     if (fwriteBulkObject(fp,c->argv[3]) == 0) goto file_wr_err;
-     if (fwriteBulkLongLong(fp, (ttl == -1) ? 0 : ttl) == 0) goto file_wr_err;
+     redisAssert(rioWriteBulkCount(&cmd,'*',4));
+     redisAssert(rioWriteBulkString(&cmd,"RESTORE",7));
+     redisAssert(c->argv[3]->encoding == REDIS_ENCODING_RAW);
+     redisAssert(rioWriteBulkString(&cmd,c->argv[3]->ptr,sdslen(c->argv[3]->ptr)));
+     redisAssert(rioWriteBulkLongLong(&cmd,(ttl == -1) ? 0 : ttl));
  
      /* Finally the last argument that is the serailized object payload
-      * in the form: <type><rdb-serailized-object>. */
-     payload_len = rdbSavedObjectLen(o);
-     if (fwriteBulkCount(fp,'$',payload_len+1) == 0) goto file_wr_err;
-     if (fwrite(&type,1,1,fp) == 0) goto file_wr_err;
-     if (rdbSaveObject(fp,o) == -1) goto file_wr_err;
-     if (fwrite("\r\n",2,1,fp) == 0) goto file_wr_err;
-     /* Tranfer the query to the other node */
-     rewind(fp);
+      * in the form: <type><rdb-serialized-object>. */
+     payload = rioInitWithBuffer(sdsempty());
+     redisAssert(rdbSaveObjectType(&payload,o));
+     redisAssert(rdbSaveObject(&payload,o) != -1);
+     redisAssert(rioWriteBulkString(&cmd,payload.io.buffer.ptr,sdslen(payload.io.buffer.ptr)));
+     sdsfree(payload.io.buffer.ptr);
+     /* Tranfer the query to the other node in 64K chunks. */
      {
-         char buf[4096];
-         size_t nread;
-         while ((nread = fread(buf,1,sizeof(buf),fp)) != 0) {
-             int nwritten;
-             nwritten = syncWrite(fd,buf,nread,timeout);
-             if (nwritten != (signed)nread) goto socket_wr_err;
+         sds buf = cmd.io.buffer.ptr;
+         size_t pos = 0, towrite;
+         int nwritten = 0;
+         while ((towrite = sdslen(buf)-pos) > 0) {
+             towrite = (towrite > (64*1024) ? (64*1024) : towrite);
+             nwritten = syncWrite(fd,buf+nwritten,towrite,timeout);
+             if (nwritten != (signed)towrite) goto socket_wr_err;
+             pos += nwritten;
          }
-         if (ferror(fp)) goto file_rd_err;
      }
  
-     /* Read back the reply */
+     /* Read back the reply. */
      {
          char buf1[1024];
          char buf2[1024];
          if (syncReadLine(fd, buf1, sizeof(buf1), timeout) <= 0)
              goto socket_rd_err;
          if (syncReadLine(fd, buf2, sizeof(buf2), timeout) <= 0)
-                 goto socket_rd_err;
+             goto socket_rd_err;
          if (buf1[0] == '-' || buf2[0] == '-') {
              addReplyErrorFormat(c,"Target instance replied with error: %s",
                  (buf1[0] == '-') ? buf1+1 : buf2+1);
              addReply(c,shared.ok);
          }
      }
-     fclose(fp);
-     close(fd);
-     return;
  
- file_wr_err:
-     redisLog(REDIS_WARNING,"Can't write on tmp file for MIGRATE: %s",
-         strerror(errno));
-     addReplyErrorFormat(c,"MIGRATE failed, tmp file write error: %s.",
-         strerror(errno));
-     fclose(fp);
-     close(fd);
-     return;
- file_rd_err:
-     redisLog(REDIS_WARNING,"Can't read from tmp file for MIGRATE: %s",
-         strerror(errno));
-     addReplyErrorFormat(c,"MIGRATE failed, tmp file read error: %s.",
-         strerror(errno));
-     fclose(fp);
+     sdsfree(cmd.io.buffer.ptr);
      close(fd);
      return;
  
@@@ -1577,7 -1515,7 +1515,7 @@@ socket_wr_err
          strerror(errno));
      addReplyErrorFormat(c,"MIGRATE failed, writing to target node: %s.",
          strerror(errno));
-     fclose(fp);
+     sdsfree(cmd.io.buffer.ptr);
      close(fd);
      return;
  
@@@ -1586,7 -1524,7 +1524,7 @@@ socket_rd_err
          strerror(errno));
      addReplyErrorFormat(c,"MIGRATE failed, reading from target node: %s.",
          strerror(errno));
-     fclose(fp);
+     sdsfree(cmd.io.buffer.ptr);
      close(fd);
      return;
  }
   * DUMP is actually not used by Redis Cluster but it is the obvious
   * complement of RESTORE and can be useful for different applications. */
  void dumpCommand(redisClient *c) {
-     char buf[64];
-     FILE *fp;
      robj *o, *dumpobj;
-     sds dump = NULL;
-     off_t payload_len;
-     unsigned int type;
+     rio payload;
  
      /* Check if the key is here. */
      if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) {
          addReply(c,shared.nullbulk);
          return;
      }
-     
-     /* Create temp file */
-     snprintf(buf,sizeof(buf),"redis-dump-%d.tmp",getpid());
-     fp = fopen(buf,"w+");
-     if (!fp) {
-         redisLog(REDIS_WARNING,"Can't open tmp file for MIGRATE: %s",
-             strerror(errno));
-         addReplyErrorFormat(c,"DUMP failed, tmp file creation error: %s.",
-             strerror(errno));
-         return;
-     }
-     unlink(buf);
-     /* Dump the serailized object and read it back in memory.
-      * We prefix it with a one byte containing the type ID.
-      * This is the serialization format understood by RESTORE. */
-     if (rdbSaveObject(fp,o) == -1) goto file_wr_err;
-     payload_len = ftello(fp);
-     if (fseeko(fp,0,SEEK_SET) == -1) goto file_rd_err;
-     dump = sdsnewlen(NULL,payload_len+1);
-     if (payload_len && fread(dump+1,payload_len,1,fp) != 1) goto file_rd_err;
-     fclose(fp);
-     type = o->type;
-     if (type == REDIS_LIST && o->encoding == REDIS_ENCODING_ZIPLIST)
-         type = REDIS_LIST_ZIPLIST;
-     else if (type == REDIS_HASH && o->encoding == REDIS_ENCODING_ZIPMAP)
-         type = REDIS_HASH_ZIPMAP;
-     else if (type == REDIS_SET && o->encoding == REDIS_ENCODING_INTSET)
-         type = REDIS_SET_INTSET;
-     else
-         type = o->type;
-     dump[0] = type;
+     /* Serialize the object in a RDB-like format. It consist of an object type
+      * byte followed by the serialized object. This is understood by RESTORE. */
+     payload = rioInitWithBuffer(sdsempty());
+     redisAssert(rdbSaveObjectType(&payload,o));
+     redisAssert(rdbSaveObject(&payload,o));
  
      /* Transfer to the client */
-     dumpobj = createObject(REDIS_STRING,dump);
+     dumpobj = createObject(REDIS_STRING,payload.io.buffer.ptr);
      addReplyBulk(c,dumpobj);
      decrRefCount(dumpobj);
      return;
- file_wr_err:
-     redisLog(REDIS_WARNING,"Can't write on tmp file for DUMP: %s",
-         strerror(errno));
-     addReplyErrorFormat(c,"DUMP failed, tmp file write error: %s.",
-         strerror(errno));
-     sdsfree(dump);
-     fclose(fp);
-     return;
- file_rd_err:
-     redisLog(REDIS_WARNING,"Can't read from tmp file for DUMP: %s",
-         strerror(errno));
-     addReplyErrorFormat(c,"DUMP failed, tmp file read error: %s.",
-         strerror(errno));
-     sdsfree(dump);
-     fclose(fp);
-     return;
  }
  
  /* -----------------------------------------------------------------------------
diff --combined src/rdb.c
index cfbec3e8672350b0d106d4839587a80b2b332c01,6d99375bc4c2a770241b65a1dfc1fc26ef932063..9bf470aa26e6a79aba46680c26a0a9a6cbd70d48
+++ b/src/rdb.c
@@@ -1,6 -1,3 +1,3 @@@
- #include "redis.h"
- #include "lzf.h"    /* LZF compression library */
  #include <math.h>
  #include <sys/types.h>
  #include <sys/time.h>
@@@ -8,59 -5,99 +5,99 @@@
  #include <sys/wait.h>
  #include <arpa/inet.h>
  #include <sys/stat.h>
+ #include "rdb.h"
+ #include "lzf.h" /* LZF compression library */
  
- /* Convenience wrapper around fwrite, that returns the number of bytes written
-  * to the file instead of the number of objects (see fwrite(3)) and -1 in the
-  * case of an error. It also supports a NULL *fp to skip writing altogether
-  * instead of writing to /dev/null. */
- static int rdbWriteRaw(FILE *fp, void *p, size_t len) {
-     if (fp != NULL && fwrite(p,len,1,fp) == 0) return -1;
+ static int rdbWriteRaw(rio *rdb, void *p, size_t len) {
+     if (rdb && rioWrite(rdb,p,len) == 0)
+         return -1;
      return len;
  }
  
- int rdbSaveType(FILE *fp, unsigned char type) {
-     return rdbWriteRaw(fp,&type,1);
+ int rdbSaveType(rio *rdb, unsigned char type) {
+     return rdbWriteRaw(rdb,&type,1);
+ }
+ int rdbLoadType(rio *rdb) {
+     unsigned char type;
+     if (rioRead(rdb,&type,1) == 0) return -1;
+     return type;
  }
  
- int rdbSaveTime(FILE *fp, time_t t) {
+ int rdbSaveTime(rio *rdb, time_t t) {
      int32_t t32 = (int32_t) t;
-     return rdbWriteRaw(fp,&t32,4);
+     return rdbWriteRaw(rdb,&t32,4);
+ }
+ time_t rdbLoadTime(rio *rdb) {
+     int32_t t32;
+     if (rioRead(rdb,&t32,4) == 0) return -1;
+     return (time_t)t32;
  }
  
- /* check rdbLoadLen() comments for more info */
- int rdbSaveLen(FILE *fp, uint32_t len) {
+ /* Saves an encoded length. The first two bits in the first byte are used to
+  * hold the encoding type. See the REDIS_RDB_* definitions for more information
+  * on the types of encoding. */
+ int rdbSaveLen(rio *rdb, uint32_t len) {
      unsigned char buf[2];
-     int nwritten;
+     size_t nwritten;
  
      if (len < (1<<6)) {
          /* Save a 6 bit len */
          buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
-         if (rdbWriteRaw(fp,buf,1) == -1) return -1;
+         if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
          nwritten = 1;
      } else if (len < (1<<14)) {
          /* Save a 14 bit len */
          buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
          buf[1] = len&0xFF;
-         if (rdbWriteRaw(fp,buf,2) == -1) return -1;
+         if (rdbWriteRaw(rdb,buf,2) == -1) return -1;
          nwritten = 2;
      } else {
          /* Save a 32 bit len */
          buf[0] = (REDIS_RDB_32BITLEN<<6);
-         if (rdbWriteRaw(fp,buf,1) == -1) return -1;
+         if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
          len = htonl(len);
-         if (rdbWriteRaw(fp,&len,4) == -1) return -1;
+         if (rdbWriteRaw(rdb,&len,4) == -4) return -1;
          nwritten = 1+4;
      }
      return nwritten;
  }
  
- /* Encode 'value' as an integer if possible (if integer will fit the
-  * supported range). If the function sucessful encoded the integer
-  * then the (up to 5 bytes) encoded representation is written in the
-  * string pointed by 'enc' and the length is returned. Otherwise
-  * 0 is returned. */
+ /* Load an encoded length. The "isencoded" argument is set to 1 if the length
+  * is not actually a length but an "encoding type". See the REDIS_RDB_ENC_*
+  * definitions in rdb.h for more information. */
+ uint32_t rdbLoadLen(rio *rdb, int *isencoded) {
+     unsigned char buf[2];
+     uint32_t len;
+     int type;
+     if (isencoded) *isencoded = 0;
+     if (rioRead(rdb,buf,1) == 0) return REDIS_RDB_LENERR;
+     type = (buf[0]&0xC0)>>6;
+     if (type == REDIS_RDB_ENCVAL) {
+         /* Read a 6 bit encoding type. */
+         if (isencoded) *isencoded = 1;
+         return buf[0]&0x3F;
+     } else if (type == REDIS_RDB_6BITLEN) {
+         /* Read a 6 bit len. */
+         return buf[0]&0x3F;
+     } else if (type == REDIS_RDB_14BITLEN) {
+         /* Read a 14 bit len. */
+         if (rioRead(rdb,buf+1,1) == 0) return REDIS_RDB_LENERR;
+         return ((buf[0]&0x3F)<<8)|buf[1];
+     } else {
+         /* Read a 32 bit len. */
+         if (rioRead(rdb,&len,4) == 0) return REDIS_RDB_LENERR;
+         return ntohl(len);
+     }
+ }
+ /* Encodes the "value" argument as integer when it fits in the supported ranges
+  * for encoded types. If the function successfully encodes the integer, the
+  * representation is stored in the buffer pointer to by "enc" and the string
+  * length is returned. Otherwise 0 is returned. */
  int rdbEncodeInteger(long long value, unsigned char *enc) {
-     /* Finally check if it fits in our ranges */
      if (value >= -(1<<7) && value <= (1<<7)-1) {
          enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
          enc[1] = value&0xFF;
      }
  }
  
+ /* Loads an integer-encoded object with the specified encoding type "enctype".
+  * If the "encode" argument is set the function may return an integer-encoded
+  * string object, otherwise it always returns a raw string object. */
+ robj *rdbLoadIntegerObject(rio *rdb, int enctype, int encode) {
+     unsigned char enc[4];
+     long long val;
+     if (enctype == REDIS_RDB_ENC_INT8) {
+         if (rioRead(rdb,enc,1) == 0) return NULL;
+         val = (signed char)enc[0];
+     } else if (enctype == REDIS_RDB_ENC_INT16) {
+         uint16_t v;
+         if (rioRead(rdb,enc,2) == 0) return NULL;
+         v = enc[0]|(enc[1]<<8);
+         val = (int16_t)v;
+     } else if (enctype == REDIS_RDB_ENC_INT32) {
+         uint32_t v;
+         if (rioRead(rdb,enc,4) == 0) return NULL;
+         v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
+         val = (int32_t)v;
+     } else {
+         val = 0; /* anti-warning */
+         redisPanic("Unknown RDB integer encoding type");
+     }
+     if (encode)
+         return createStringObjectFromLongLong(val);
+     else
+         return createObject(REDIS_STRING,sdsfromlonglong(val));
+ }
  /* String objects in the form "2391" "-100" without any space and with a
   * range of values that can fit in an 8, 16 or 32 bit signed value can be
   * encoded as integers to save space */
@@@ -101,7 -168,7 +168,7 @@@ int rdbTryIntegerEncoding(char *s, size
      return rdbEncodeInteger(value,enc);
  }
  
- int rdbSaveLzfStringObject(FILE *fp, unsigned char *s, size_t len) {
+ int rdbSaveLzfStringObject(rio *rdb, unsigned char *s, size_t len) {
      size_t comprlen, outlen;
      unsigned char byte;
      int n, nwritten = 0;
      }
      /* Data compressed! Let's save it on disk */
      byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
-     if ((n = rdbWriteRaw(fp,&byte,1)) == -1) goto writeerr;
+     if ((n = rdbWriteRaw(rdb,&byte,1)) == -1) goto writeerr;
      nwritten += n;
  
-     if ((n = rdbSaveLen(fp,comprlen)) == -1) goto writeerr;
+     if ((n = rdbSaveLen(rdb,comprlen)) == -1) goto writeerr;
      nwritten += n;
  
-     if ((n = rdbSaveLen(fp,len)) == -1) goto writeerr;
+     if ((n = rdbSaveLen(rdb,len)) == -1) goto writeerr;
      nwritten += n;
  
-     if ((n = rdbWriteRaw(fp,out,comprlen)) == -1) goto writeerr;
+     if ((n = rdbWriteRaw(rdb,out,comprlen)) == -1) goto writeerr;
      nwritten += n;
  
      zfree(out);
@@@ -138,9 -205,28 +205,28 @@@ writeerr
      return -1;
  }
  
+ robj *rdbLoadLzfStringObject(rio *rdb) {
+     unsigned int len, clen;
+     unsigned char *c = NULL;
+     sds val = NULL;
+     if ((clen = rdbLoadLen(rdb,NULL)) == REDIS_RDB_LENERR) return NULL;
+     if ((len = rdbLoadLen(rdb,NULL)) == REDIS_RDB_LENERR) return NULL;
+     if ((c = zmalloc(clen)) == NULL) goto err;
+     if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
+     if (rioRead(rdb,c,clen) == 0) goto err;
+     if (lzf_decompress(c,clen,val,len) == 0) goto err;
+     zfree(c);
+     return createObject(REDIS_STRING,val);
+ err:
+     zfree(c);
+     sdsfree(val);
+     return NULL;
+ }
  /* Save a string objet as [len][data] on disk. If the object is a string
   * representation of an integer value we try to save it in a special form */
- int rdbSaveRawString(FILE *fp, unsigned char *s, size_t len) {
+ int rdbSaveRawString(rio *rdb, unsigned char *s, size_t len) {
      int enclen;
      int n, nwritten = 0;
  
      if (len <= 11) {
          unsigned char buf[5];
          if ((enclen = rdbTryIntegerEncoding((char*)s,len,buf)) > 0) {
-             if (rdbWriteRaw(fp,buf,enclen) == -1) return -1;
+             if (rdbWriteRaw(rdb,buf,enclen) == -1) return -1;
              return enclen;
          }
      }
      /* Try LZF compression - under 20 bytes it's unable to compress even
       * aaaaaaaaaaaaaaaaaa so skip it */
      if (server.rdbcompression && len > 20) {
-         n = rdbSaveLzfStringObject(fp,s,len);
+         n = rdbSaveLzfStringObject(rdb,s,len);
          if (n == -1) return -1;
          if (n > 0) return n;
          /* Return value of 0 means data can't be compressed, save the old way */
      }
  
      /* Store verbatim */
-     if ((n = rdbSaveLen(fp,len)) == -1) return -1;
+     if ((n = rdbSaveLen(rdb,len)) == -1) return -1;
      nwritten += n;
      if (len > 0) {
-         if (rdbWriteRaw(fp,s,len) == -1) return -1;
+         if (rdbWriteRaw(rdb,s,len) == -1) return -1;
          nwritten += len;
      }
      return nwritten;
  }
  
  /* Save a long long value as either an encoded string or a string. */
- int rdbSaveLongLongAsStringObject(FILE *fp, long long value) {
+ int rdbSaveLongLongAsStringObject(rio *rdb, long long value) {
      unsigned char buf[32];
      int n, nwritten = 0;
      int enclen = rdbEncodeInteger(value,buf);
      if (enclen > 0) {
-         return rdbWriteRaw(fp,buf,enclen);
+         return rdbWriteRaw(rdb,buf,enclen);
      } else {
          /* Encode as string */
          enclen = ll2string((char*)buf,32,value);
          redisAssert(enclen < 32);
-         if ((n = rdbSaveLen(fp,enclen)) == -1) return -1;
+         if ((n = rdbSaveLen(rdb,enclen)) == -1) return -1;
          nwritten += n;
-         if ((n = rdbWriteRaw(fp,buf,enclen)) == -1) return -1;
+         if ((n = rdbWriteRaw(rdb,buf,enclen)) == -1) return -1;
          nwritten += n;
      }
      return nwritten;
  }
  
  /* Like rdbSaveStringObjectRaw() but handle encoded objects */
- int rdbSaveStringObject(FILE *fp, robj *obj) {
+ int rdbSaveStringObject(rio *rdb, robj *obj) {
      /* Avoid to decode the object, then encode it again, if the
       * object is alrady integer encoded. */
      if (obj->encoding == REDIS_ENCODING_INT) {
-         return rdbSaveLongLongAsStringObject(fp,(long)obj->ptr);
+         return rdbSaveLongLongAsStringObject(rdb,(long)obj->ptr);
      } else {
          redisAssert(obj->encoding == REDIS_ENCODING_RAW);
-         return rdbSaveRawString(fp,obj->ptr,sdslen(obj->ptr));
+         return rdbSaveRawString(rdb,obj->ptr,sdslen(obj->ptr));
      }
  }
  
+ robj *rdbGenericLoadStringObject(rio *rdb, int encode) {
+     int isencoded;
+     uint32_t len;
+     sds val;
+     len = rdbLoadLen(rdb,&isencoded);
+     if (isencoded) {
+         switch(len) {
+         case REDIS_RDB_ENC_INT8:
+         case REDIS_RDB_ENC_INT16:
+         case REDIS_RDB_ENC_INT32:
+             return rdbLoadIntegerObject(rdb,len,encode);
+         case REDIS_RDB_ENC_LZF:
+             return rdbLoadLzfStringObject(rdb);
+         default:
+             redisPanic("Unknown RDB encoding type");
+         }
+     }
+     if (len == REDIS_RDB_LENERR) return NULL;
+     val = sdsnewlen(NULL,len);
+     if (len && rioRead(rdb,val,len) == 0) {
+         sdsfree(val);
+         return NULL;
+     }
+     return createObject(REDIS_STRING,val);
+ }
+ robj *rdbLoadStringObject(rio *rdb) {
+     return rdbGenericLoadStringObject(rdb,0);
+ }
+ robj *rdbLoadEncodedStringObject(rio *rdb) {
+     return rdbGenericLoadStringObject(rdb,1);
+ }
  /* Save a double value. Doubles are saved as strings prefixed by an unsigned
   * 8 bit integer specifing the length of the representation.
   * This 8 bit integer has special values in order to specify the following
   * 254: + inf
   * 255: - inf
   */
- int rdbSaveDoubleValue(FILE *fp, double val) {
+ int rdbSaveDoubleValue(rio *rdb, double val) {
      unsigned char buf[128];
      int len;
  
          buf[0] = strlen((char*)buf+1);
          len = buf[0]+1;
      }
-     return rdbWriteRaw(fp,buf,len);
+     return rdbWriteRaw(rdb,buf,len);
+ }
+ /* For information about double serialization check rdbSaveDoubleValue() */
+ int rdbLoadDoubleValue(rio *rdb, double *val) {
+     char buf[128];
+     unsigned char len;
+     if (rioRead(rdb,&len,1) == 0) return -1;
+     switch(len) {
+     case 255: *val = R_NegInf; return 0;
+     case 254: *val = R_PosInf; return 0;
+     case 253: *val = R_Nan; return 0;
+     default:
+         if (rioRead(rdb,buf,len) == 0) return -1;
+         buf[len] = '\0';
+         sscanf(buf, "%lg", val);
+         return 0;
+     }
+ }
+ /* Save the object type of object "o". */
+ int rdbSaveObjectType(rio *rdb, robj *o) {
+     switch (o->type) {
+     case REDIS_STRING:
+         return rdbSaveType(rdb,REDIS_RDB_TYPE_STRING);
+     case REDIS_LIST:
+         if (o->encoding == REDIS_ENCODING_ZIPLIST)
+             return rdbSaveType(rdb,REDIS_RDB_TYPE_LIST_ZIPLIST);
+         else if (o->encoding == REDIS_ENCODING_LINKEDLIST)
+             return rdbSaveType(rdb,REDIS_RDB_TYPE_LIST);
+         else
+             redisPanic("Unknown list encoding");
+     case REDIS_SET:
+         if (o->encoding == REDIS_ENCODING_INTSET)
+             return rdbSaveType(rdb,REDIS_RDB_TYPE_SET_INTSET);
+         else if (o->encoding == REDIS_ENCODING_HT)
+             return rdbSaveType(rdb,REDIS_RDB_TYPE_SET);
+         else
+             redisPanic("Unknown set encoding");
+     case REDIS_ZSET:
+         if (o->encoding == REDIS_ENCODING_ZIPLIST)
+             return rdbSaveType(rdb,REDIS_RDB_TYPE_ZSET_ZIPLIST);
+         else if (o->encoding == REDIS_ENCODING_SKIPLIST)
+             return rdbSaveType(rdb,REDIS_RDB_TYPE_ZSET);
+         else
+             redisPanic("Unknown sorted set encoding");
+     case REDIS_HASH:
+         if (o->encoding == REDIS_ENCODING_ZIPMAP)
+             return rdbSaveType(rdb,REDIS_RDB_TYPE_HASH_ZIPMAP);
+         else if (o->encoding == REDIS_ENCODING_HT)
+             return rdbSaveType(rdb,REDIS_RDB_TYPE_HASH);
+         else
+             redisPanic("Unknown hash encoding");
+     default:
+         redisPanic("Unknown object type");
+     }
+     return -1; /* avoid warning */
+ }
+ /* Load object type. Return -1 when the byte doesn't contain an object type. */
+ int rdbLoadObjectType(rio *rdb) {
+     int type;
+     if ((type = rdbLoadType(rdb)) == -1) return -1;
+     if (!rdbIsObjectType(type)) return -1;
+     return type;
  }
  
  /* Save a Redis object. Returns -1 on error, 0 on success. */
- int rdbSaveObject(FILE *fp, robj *o) {
+ int rdbSaveObject(rio *rdb, robj *o) {
      int n, nwritten = 0;
  
      if (o->type == REDIS_STRING) {
          /* Save a string value */
-         if ((n = rdbSaveStringObject(fp,o)) == -1) return -1;
+         if ((n = rdbSaveStringObject(rdb,o)) == -1) return -1;
          nwritten += n;
      } else if (o->type == REDIS_LIST) {
          /* Save a list value */
          if (o->encoding == REDIS_ENCODING_ZIPLIST) {
              size_t l = ziplistBlobLen((unsigned char*)o->ptr);
  
-             if ((n = rdbSaveRawString(fp,o->ptr,l)) == -1) return -1;
+             if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
              nwritten += n;
          } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
              list *list = o->ptr;
              listIter li;
              listNode *ln;
  
-             if ((n = rdbSaveLen(fp,listLength(list))) == -1) return -1;
+             if ((n = rdbSaveLen(rdb,listLength(list))) == -1) return -1;
              nwritten += n;
  
              listRewind(list,&li);
              while((ln = listNext(&li))) {
                  robj *eleobj = listNodeValue(ln);
-                 if ((n = rdbSaveStringObject(fp,eleobj)) == -1) return -1;
+                 if ((n = rdbSaveStringObject(rdb,eleobj)) == -1) return -1;
                  nwritten += n;
              }
          } else {
              dictIterator *di = dictGetIterator(set);
              dictEntry *de;
  
-             if ((n = rdbSaveLen(fp,dictSize(set))) == -1) return -1;
+             if ((n = rdbSaveLen(rdb,dictSize(set))) == -1) return -1;
              nwritten += n;
  
              while((de = dictNext(di)) != NULL) {
                  robj *eleobj = dictGetEntryKey(de);
-                 if ((n = rdbSaveStringObject(fp,eleobj)) == -1) return -1;
+                 if ((n = rdbSaveStringObject(rdb,eleobj)) == -1) return -1;
                  nwritten += n;
              }
              dictReleaseIterator(di);
          } else if (o->encoding == REDIS_ENCODING_INTSET) {
              size_t l = intsetBlobLen((intset*)o->ptr);
  
-             if ((n = rdbSaveRawString(fp,o->ptr,l)) == -1) return -1;
+             if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
              nwritten += n;
          } else {
              redisPanic("Unknown set encoding");
          if (o->encoding == REDIS_ENCODING_ZIPLIST) {
              size_t l = ziplistBlobLen((unsigned char*)o->ptr);
  
-             if ((n = rdbSaveRawString(fp,o->ptr,l)) == -1) return -1;
+             if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
              nwritten += n;
          } else if (o->encoding == REDIS_ENCODING_SKIPLIST) {
              zset *zs = o->ptr;
              dictIterator *di = dictGetIterator(zs->dict);
              dictEntry *de;
  
-             if ((n = rdbSaveLen(fp,dictSize(zs->dict))) == -1) return -1;
+             if ((n = rdbSaveLen(rdb,dictSize(zs->dict))) == -1) return -1;
              nwritten += n;
  
              while((de = dictNext(di)) != NULL) {
                  robj *eleobj = dictGetEntryKey(de);
                  double *score = dictGetEntryVal(de);
  
-                 if ((n = rdbSaveStringObject(fp,eleobj)) == -1) return -1;
+                 if ((n = rdbSaveStringObject(rdb,eleobj)) == -1) return -1;
                  nwritten += n;
-                 if ((n = rdbSaveDoubleValue(fp,*score)) == -1) return -1;
+                 if ((n = rdbSaveDoubleValue(rdb,*score)) == -1) return -1;
                  nwritten += n;
              }
              dictReleaseIterator(di);
          if (o->encoding == REDIS_ENCODING_ZIPMAP) {
              size_t l = zipmapBlobLen((unsigned char*)o->ptr);
  
-             if ((n = rdbSaveRawString(fp,o->ptr,l)) == -1) return -1;
+             if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
              nwritten += n;
          } else {
              dictIterator *di = dictGetIterator(o->ptr);
              dictEntry *de;
  
-             if ((n = rdbSaveLen(fp,dictSize((dict*)o->ptr))) == -1) return -1;
+             if ((n = rdbSaveLen(rdb,dictSize((dict*)o->ptr))) == -1) return -1;
              nwritten += n;
  
              while((de = dictNext(di)) != NULL) {
                  robj *key = dictGetEntryKey(de);
                  robj *val = dictGetEntryVal(de);
  
-                 if ((n = rdbSaveStringObject(fp,key)) == -1) return -1;
+                 if ((n = rdbSaveStringObject(rdb,key)) == -1) return -1;
                  nwritten += n;
-                 if ((n = rdbSaveStringObject(fp,val)) == -1) return -1;
+                 if ((n = rdbSaveStringObject(rdb,val)) == -1) return -1;
                  nwritten += n;
              }
              dictReleaseIterator(di);
@@@ -374,33 -561,21 +561,21 @@@ off_t rdbSavedObjectLen(robj *o) 
   * On error -1 is returned.
   * On success if the key was actaully saved 1 is returned, otherwise 0
   * is returned (the key was already expired). */
- int rdbSaveKeyValuePair(FILE *fp, robj *key, robj *val,
+ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val,
                          time_t expiretime, time_t now)
  {
-     int vtype;
      /* Save the expire time */
      if (expiretime != -1) {
          /* If this key is already expired skip it */
          if (expiretime < now) return 0;
-         if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) return -1;
-         if (rdbSaveTime(fp,expiretime) == -1) return -1;
+         if (rdbSaveType(rdb,REDIS_RDB_OPCODE_EXPIRETIME) == -1) return -1;
+         if (rdbSaveTime(rdb,expiretime) == -1) return -1;
      }
-     /* Fix the object type if needed, to support saving zipmaps, ziplists,
-      * and intsets, directly as blobs of bytes: they are already serialized. */
-     vtype = val->type;
-     if (vtype == REDIS_HASH && val->encoding == REDIS_ENCODING_ZIPMAP)
-         vtype = REDIS_HASH_ZIPMAP;
-     else if (vtype == REDIS_LIST && val->encoding == REDIS_ENCODING_ZIPLIST)
-         vtype = REDIS_LIST_ZIPLIST;
-     else if (vtype == REDIS_SET && val->encoding == REDIS_ENCODING_INTSET)
-         vtype = REDIS_SET_INTSET;
-     else if (vtype == REDIS_ZSET && val->encoding == REDIS_ENCODING_ZIPLIST)
-         vtype = REDIS_ZSET_ZIPLIST;
      /* Save type, key, value */
-     if (rdbSaveType(fp,vtype) == -1) return -1;
-     if (rdbSaveStringObject(fp,key) == -1) return -1;
-     if (rdbSaveObject(fp,val) == -1) return -1;
+     if (rdbSaveObjectType(rdb,val) == -1) return -1;
+     if (rdbSaveStringObject(rdb,key) == -1) return -1;
+     if (rdbSaveObject(rdb,val) == -1) return -1;
      return 1;
  }
  
  int rdbSave(char *filename) {
      dictIterator *di = NULL;
      dictEntry *de;
-     FILE *fp;
      char tmpfile[256];
      int j;
      time_t now = time(NULL);
+     FILE *fp;
+     rio rdb;
  
 -    if (server.ds_enabled) {
 -        cacheForcePointInTime();
 -        return dsRdbSave(filename);
 -    }
 -
      snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
      fp = fopen(tmpfile,"w");
      if (!fp) {
              strerror(errno));
          return REDIS_ERR;
      }
-     if (fwrite("REDIS0002",9,1,fp) == 0) goto werr;
+     rdb = rioInitWithFile(fp);
+     if (rdbWriteRaw(&rdb,"REDIS0002",9) == -1) goto werr;
      for (j = 0; j < server.dbnum; j++) {
          redisDb *db = server.db+j;
          dict *d = db->dict;
          if (dictSize(d) == 0) continue;
 -        di = dictGetIterator(d);
 +        di = dictGetSafeIterator(d);
          if (!di) {
              fclose(fp);
              return REDIS_ERR;
          }
  
          /* Write the SELECT DB opcode */
-         if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
-         if (rdbSaveLen(fp,j) == -1) goto werr;
+         if (rdbSaveType(&rdb,REDIS_RDB_OPCODE_SELECTDB) == -1) goto werr;
+         if (rdbSaveLen(&rdb,j) == -1) goto werr;
  
          /* Iterate this DB writing every entry */
          while((de = dictNext(di)) != NULL) {
              
              initStaticStringObject(key,keystr);
              expire = getExpire(db,&key);
-             if (rdbSaveKeyValuePair(fp,&key,o,expire,now) == -1) goto werr;
+             if (rdbSaveKeyValuePair(&rdb,&key,o,expire,now) == -1) goto werr;
          }
          dictReleaseIterator(di);
      }
      /* EOF opcode */
-     if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
+     if (rdbSaveType(&rdb,REDIS_RDB_OPCODE_EOF) == -1) goto werr;
  
      /* Make sure data will not remain on the OS's output buffers */
      fflush(fp);
@@@ -477,13 -661,17 +656,13 @@@ werr
  
  int rdbSaveBackground(char *filename) {
      pid_t childpid;
 +    long long start;
  
 -    if (server.bgsavechildpid != -1 ||
 -        server.bgsavethread != (pthread_t) -1) return REDIS_ERR;
 +    if (server.bgsavechildpid != -1) return REDIS_ERR;
  
      server.dirty_before_bgsave = server.dirty;
  
 -    if (server.ds_enabled) {
 -        cacheForcePointInTime();
 -        return dsRdbSaveBackground(filename);
 -    }
 -
 +    start = ustime();
      if ((childpid = fork()) == 0) {
          int retval;
  
          _exit((retval == REDIS_OK) ? 0 : 1);
      } else {
          /* Parent */
 +        server.stat_fork_time = ustime()-start;
          if (childpid == -1) {
              redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
                  strerror(errno));
@@@ -515,168 -702,21 +694,21 @@@ void rdbRemoveTempFile(pid_t childpid) 
      unlink(tmpfile);
  }
  
- int rdbLoadType(FILE *fp) {
-     unsigned char type;
-     if (fread(&type,1,1,fp) == 0) return -1;
-     return type;
- }
- time_t rdbLoadTime(FILE *fp) {
-     int32_t t32;
-     if (fread(&t32,4,1,fp) == 0) return -1;
-     return (time_t) t32;
- }
- /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
-  * of this file for a description of how this are stored on disk.
-  *
-  * isencoded is set to 1 if the readed length is not actually a length but
-  * an "encoding type", check the above comments for more info */
- uint32_t rdbLoadLen(FILE *fp, int *isencoded) {
-     unsigned char buf[2];
-     uint32_t len;
-     int type;
-     if (isencoded) *isencoded = 0;
-     if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
-     type = (buf[0]&0xC0)>>6;
-     if (type == REDIS_RDB_6BITLEN) {
-         /* Read a 6 bit len */
-         return buf[0]&0x3F;
-     } else if (type == REDIS_RDB_ENCVAL) {
-         /* Read a 6 bit len encoding type */
-         if (isencoded) *isencoded = 1;
-         return buf[0]&0x3F;
-     } else if (type == REDIS_RDB_14BITLEN) {
-         /* Read a 14 bit len */
-         if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
-         return ((buf[0]&0x3F)<<8)|buf[1];
-     } else {
-         /* Read a 32 bit len */
-         if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
-         return ntohl(len);
-     }
- }
- /* Load an integer-encoded object from file 'fp', with the specified
-  * encoding type 'enctype'. If encode is true the function may return
-  * an integer-encoded object as reply, otherwise the returned object
-  * will always be encoded as a raw string. */
- robj *rdbLoadIntegerObject(FILE *fp, int enctype, int encode) {
-     unsigned char enc[4];
-     long long val;
-     if (enctype == REDIS_RDB_ENC_INT8) {
-         if (fread(enc,1,1,fp) == 0) return NULL;
-         val = (signed char)enc[0];
-     } else if (enctype == REDIS_RDB_ENC_INT16) {
-         uint16_t v;
-         if (fread(enc,2,1,fp) == 0) return NULL;
-         v = enc[0]|(enc[1]<<8);
-         val = (int16_t)v;
-     } else if (enctype == REDIS_RDB_ENC_INT32) {
-         uint32_t v;
-         if (fread(enc,4,1,fp) == 0) return NULL;
-         v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
-         val = (int32_t)v;
-     } else {
-         val = 0; /* anti-warning */
-         redisPanic("Unknown RDB integer encoding type");
-     }
-     if (encode)
-         return createStringObjectFromLongLong(val);
-     else
-         return createObject(REDIS_STRING,sdsfromlonglong(val));
- }
- robj *rdbLoadLzfStringObject(FILE*fp) {
-     unsigned int len, clen;
-     unsigned char *c = NULL;
-     sds val = NULL;
-     if ((clen = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
-     if ((len = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
-     if ((c = zmalloc(clen)) == NULL) goto err;
-     if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
-     if (fread(c,clen,1,fp) == 0) goto err;
-     if (lzf_decompress(c,clen,val,len) == 0) goto err;
-     zfree(c);
-     return createObject(REDIS_STRING,val);
- err:
-     zfree(c);
-     sdsfree(val);
-     return NULL;
- }
- robj *rdbGenericLoadStringObject(FILE*fp, int encode) {
-     int isencoded;
-     uint32_t len;
-     sds val;
-     len = rdbLoadLen(fp,&isencoded);
-     if (isencoded) {
-         switch(len) {
-         case REDIS_RDB_ENC_INT8:
-         case REDIS_RDB_ENC_INT16:
-         case REDIS_RDB_ENC_INT32:
-             return rdbLoadIntegerObject(fp,len,encode);
-         case REDIS_RDB_ENC_LZF:
-             return rdbLoadLzfStringObject(fp);
-         default:
-             redisPanic("Unknown RDB encoding type");
-         }
-     }
-     if (len == REDIS_RDB_LENERR) return NULL;
-     val = sdsnewlen(NULL,len);
-     if (len && fread(val,len,1,fp) == 0) {
-         sdsfree(val);
-         return NULL;
-     }
-     return createObject(REDIS_STRING,val);
- }
- robj *rdbLoadStringObject(FILE *fp) {
-     return rdbGenericLoadStringObject(fp,0);
- }
- robj *rdbLoadEncodedStringObject(FILE *fp) {
-     return rdbGenericLoadStringObject(fp,1);
- }
- /* For information about double serialization check rdbSaveDoubleValue() */
- int rdbLoadDoubleValue(FILE *fp, double *val) {
-     char buf[128];
-     unsigned char len;
-     if (fread(&len,1,1,fp) == 0) return -1;
-     switch(len) {
-     case 255: *val = R_NegInf; return 0;
-     case 254: *val = R_PosInf; return 0;
-     case 253: *val = R_Nan; return 0;
-     default:
-         if (fread(buf,len,1,fp) == 0) return -1;
-         buf[len] = '\0';
-         sscanf(buf, "%lg", val);
-         return 0;
-     }
- }
  /* Load a Redis object of the specified type from the specified file.
   * On success a newly allocated object is returned, otherwise NULL. */
- robj *rdbLoadObject(int type, FILE *fp) {
+ robj *rdbLoadObject(int rdbtype, rio *rdb) {
      robj *o, *ele, *dec;
      size_t len;
      unsigned int i;
  
-     redisLog(REDIS_DEBUG,"LOADING OBJECT %d (at %d)\n",type,ftell(fp));
-     if (type == REDIS_STRING) {
+     redisLog(REDIS_DEBUG,"LOADING OBJECT %d (at %d)\n",rdbtype,rdb->tell(rdb));
+     if (rdbtype == REDIS_RDB_TYPE_STRING) {
          /* Read string value */
-         if ((o = rdbLoadEncodedStringObject(fp)) == NULL) return NULL;
+         if ((o = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL;
          o = tryObjectEncoding(o);
-     } else if (type == REDIS_LIST) {
+     } else if (rdbtype == REDIS_RDB_TYPE_LIST) {
          /* Read list value */
-         if ((len = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
+         if ((len = rdbLoadLen(rdb,NULL)) == REDIS_RDB_LENERR) return NULL;
  
          /* Use a real list when there are too many entries */
          if (len > server.list_max_ziplist_entries) {
  
          /* Load every single element of the list */
          while(len--) {
-             if ((ele = rdbLoadEncodedStringObject(fp)) == NULL) return NULL;
+             if ((ele = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL;
  
              /* If we are using a ziplist and the value is too big, convert
               * the object to a real list. */
                  listAddNodeTail(o->ptr,ele);
              }
          }
-     } else if (type == REDIS_SET) {
+     } else if (rdbtype == REDIS_RDB_TYPE_SET) {
          /* Read list/set value */
-         if ((len = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
+         if ((len = rdbLoadLen(rdb,NULL)) == REDIS_RDB_LENERR) return NULL;
  
          /* Use a regular set when there are too many entries. */
          if (len > server.set_max_intset_entries) {
          /* Load every single element of the list/set */
          for (i = 0; i < len; i++) {
              long long llval;
-             if ((ele = rdbLoadEncodedStringObject(fp)) == NULL) return NULL;
+             if ((ele = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL;
              ele = tryObjectEncoding(ele);
  
              if (o->encoding == REDIS_ENCODING_INTSET) {
                  decrRefCount(ele);
              }
          }
-     } else if (type == REDIS_ZSET) {
+     } else if (rdbtype == REDIS_RDB_TYPE_ZSET) {
          /* Read list/set value */
          size_t zsetlen;
          size_t maxelelen = 0;
          zset *zs;
  
-         if ((zsetlen = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
+         if ((zsetlen = rdbLoadLen(rdb,NULL)) == REDIS_RDB_LENERR) return NULL;
          o = createZsetObject();
          zs = o->ptr;
  
              double score;
              zskiplistNode *znode;
  
-             if ((ele = rdbLoadEncodedStringObject(fp)) == NULL) return NULL;
+             if ((ele = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL;
              ele = tryObjectEncoding(ele);
-             if (rdbLoadDoubleValue(fp,&score) == -1) return NULL;
+             if (rdbLoadDoubleValue(rdb,&score) == -1) return NULL;
  
              /* Don't care about integer-encoded strings. */
              if (ele->encoding == REDIS_ENCODING_RAW &&
          if (zsetLength(o) <= server.zset_max_ziplist_entries &&
              maxelelen <= server.zset_max_ziplist_value)
                  zsetConvert(o,REDIS_ENCODING_ZIPLIST);
-     } else if (type == REDIS_HASH) {
+     } else if (rdbtype == REDIS_RDB_TYPE_HASH) {
          size_t hashlen;
  
-         if ((hashlen = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
+         if ((hashlen = rdbLoadLen(rdb,NULL)) == REDIS_RDB_LENERR) return NULL;
          o = createHashObject();
          /* Too many entries? Use an hash table. */
          if (hashlen > server.hash_max_zipmap_entries)
          while(hashlen--) {
              robj *key, *val;
  
-             if ((key = rdbLoadEncodedStringObject(fp)) == NULL) return NULL;
-             if ((val = rdbLoadEncodedStringObject(fp)) == NULL) return NULL;
+             if ((key = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL;
+             if ((val = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL;
              /* If we are using a zipmap and there are too big values
               * the object is converted to real hash table encoding. */
              if (o->encoding != REDIS_ENCODING_HT &&
                  dictAdd((dict*)o->ptr,key,val);
              }
          }
-     } else if (type == REDIS_HASH_ZIPMAP ||
-                type == REDIS_LIST_ZIPLIST ||
-                type == REDIS_SET_INTSET ||
-                type == REDIS_ZSET_ZIPLIST)
+     } else if (rdbtype == REDIS_RDB_TYPE_HASH_ZIPMAP  ||
+                rdbtype == REDIS_RDB_TYPE_LIST_ZIPLIST ||
+                rdbtype == REDIS_RDB_TYPE_SET_INTSET   ||
+                rdbtype == REDIS_RDB_TYPE_ZSET_ZIPLIST)
      {
-         robj *aux = rdbLoadStringObject(fp);
+         robj *aux = rdbLoadStringObject(rdb);
  
          if (aux == NULL) return NULL;
          o = createObject(REDIS_STRING,NULL); /* string is just placeholder */
           * type. Note that we only check the length and not max element
           * size as this is an O(N) scan. Eventually everything will get
           * converted. */
-         switch(type) {
-             case REDIS_HASH_ZIPMAP:
+         switch(rdbtype) {
+             case REDIS_RDB_TYPE_HASH_ZIPMAP:
                  o->type = REDIS_HASH;
                  o->encoding = REDIS_ENCODING_ZIPMAP;
                  if (zipmapLen(o->ptr) > server.hash_max_zipmap_entries)
                      convertToRealHash(o);
                  break;
-             case REDIS_LIST_ZIPLIST:
+             case REDIS_RDB_TYPE_LIST_ZIPLIST:
                  o->type = REDIS_LIST;
                  o->encoding = REDIS_ENCODING_ZIPLIST;
                  if (ziplistLen(o->ptr) > server.list_max_ziplist_entries)
                      listTypeConvert(o,REDIS_ENCODING_LINKEDLIST);
                  break;
-             case REDIS_SET_INTSET:
+             case REDIS_RDB_TYPE_SET_INTSET:
                  o->type = REDIS_SET;
                  o->encoding = REDIS_ENCODING_INTSET;
                  if (intsetLen(o->ptr) > server.set_max_intset_entries)
                      setTypeConvert(o,REDIS_ENCODING_HT);
                  break;
-             case REDIS_ZSET_ZIPLIST:
+             case REDIS_RDB_TYPE_ZSET_ZIPLIST:
                  o->type = REDIS_ZSET;
                  o->encoding = REDIS_ENCODING_ZIPLIST;
                  if (zsetLength(o) > server.zset_max_ziplist_entries)
@@@ -905,17 -945,19 +937,19 @@@ void stopLoading(void) 
  }
  
  int rdbLoad(char *filename) {
-     FILE *fp;
      uint32_t dbid;
 -    int type, retval, rdbver;
 +    int type, rdbver;
      redisDb *db = server.db+0;
      char buf[1024];
      time_t expiretime, now = time(NULL);
      long loops = 0;
+     FILE *fp;
+     rio rdb;
  
      fp = fopen(filename,"r");
      if (!fp) return REDIS_ERR;
-     if (fread(buf,9,1,fp) == 0) goto eoferr;
+     rdb = rioInitWithFile(fp);
+     if (rioRead(&rdb,buf,9) == 0) goto eoferr;
      buf[9] = '\0';
      if (memcmp(buf,"REDIS",5) != 0) {
          fclose(fp);
  
          /* Serve the clients from time to time */
          if (!(loops++ % 1000)) {
-             loadingProgress(ftello(fp));
+             loadingProgress(rdb.tell(&rdb));
              aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT);
          }
  
          /* Read type. */
-         if ((type = rdbLoadType(fp)) == -1) goto eoferr;
-         if (type == REDIS_EXPIRETIME) {
-             if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
-             /* We read the time so we need to read the object type again */
-             if ((type = rdbLoadType(fp)) == -1) goto eoferr;
+         if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
+         if (type == REDIS_RDB_OPCODE_EXPIRETIME) {
+             if ((expiretime = rdbLoadTime(&rdb)) == -1) goto eoferr;
+             /* We read the time so we need to read the object type again. */
+             if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
          }
-         if (type == REDIS_EOF) break;
+         if (type == REDIS_RDB_OPCODE_EOF)
+             break;
          /* Handle SELECT DB opcode as a special case */
-         if (type == REDIS_SELECTDB) {
-             if ((dbid = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR)
+         if (type == REDIS_RDB_OPCODE_SELECTDB) {
+             if ((dbid = rdbLoadLen(&rdb,NULL)) == REDIS_RDB_LENERR)
                  goto eoferr;
              if (dbid >= (unsigned)server.dbnum) {
                  redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
              continue;
          }
          /* Read key */
-         if ((key = rdbLoadStringObject(fp)) == NULL) goto eoferr;
+         if ((key = rdbLoadStringObject(&rdb)) == NULL) goto eoferr;
          /* Read value */
-         if ((val = rdbLoadObject(type,fp)) == NULL) goto eoferr;
+         if ((val = rdbLoadObject(type,&rdb)) == NULL) goto eoferr;
          /* Check if the key already expired */
          if (expiretime != -1 && expiretime < now) {
              decrRefCount(key);
              continue;
          }
          /* Add the new object in the hash table */
 -        retval = dbAdd(db,key,val);
 -        if (retval == REDIS_ERR) {
 -            redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", key->ptr);
 -            exit(1);
 -        }
 +        dbAdd(db,key,val);
 +
          /* Set the expire time if needed */
          if (expiretime != -1) setExpire(db,key,expiretime);
  
@@@ -1002,13 -1050,15 +1039,13 @@@ void backgroundSaveDoneHandler(int exit
          rdbRemoveTempFile(server.bgsavechildpid);
      }
      server.bgsavechildpid = -1;
 -    server.bgsavethread = (pthread_t) -1;
 -    server.bgsavethread_state = REDIS_BGSAVE_THREAD_UNACTIVE;
      /* Possibly there are slaves waiting for a BGSAVE in order to be served
       * (the first stage of SYNC is a bulk transfer of dump.rdb) */
      updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
  }
  
  void saveCommand(redisClient *c) {
 -    if (server.bgsavechildpid != -1 || server.bgsavethread != (pthread_t)-1) {
 +    if (server.bgsavechildpid != -1) {
          addReplyError(c,"Background save already in progress");
          return;
      }
  }
  
  void bgsaveCommand(redisClient *c) {
 -    if (server.bgsavechildpid != -1 || server.bgsavethread != (pthread_t)-1) {
 +    if (server.bgsavechildpid != -1) {
          addReplyError(c,"Background save already in progress");
 -        return;
 -    }
 -    if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
 +    } else if (server.bgrewritechildpid != -1) {
 +        addReplyError(c,"Can't BGSAVE while AOF log rewriting is in progress");
 +    } else if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
          addReplyStatus(c,"Background saving started");
      } else {
          addReply(c,shared.err);
diff --combined src/redis.h
index e754918deabe296cb89eeef3989db94e50afee52,5af2da8e96fe8ec9748fc7a2fa1b6cbc3edf29b2..e9a31b8c3889f3a3c74fdc1bfd5aed4799c40807
  #include <pthread.h>
  #include <syslog.h>
  #include <netinet/in.h>
 +#include <lua.h>
  
 -#include "ae.h"     /* Event driven programming library */
 -#include "sds.h"    /* Dynamic safe strings */
 -#include "dict.h"   /* Hash tables */
 -#include "adlist.h" /* Linked lists */
 +#include "ae.h"      /* Event driven programming library */
 +#include "sds.h"     /* Dynamic safe strings */
 +#include "dict.h"    /* Hash tables */
 +#include "adlist.h"  /* Linked lists */
  #include "zmalloc.h" /* total memory usage aware version of malloc/free */
 -#include "anet.h"   /* Networking the easy way */
 -#include "zipmap.h" /* Compact string -> string data structure */
 +#include "anet.h"    /* Networking the easy way */
 +#include "zipmap.h"  /* Compact string -> string data structure */
  #include "ziplist.h" /* Compact list data structure */
 -#include "intset.h" /* Compact integer set structure */
 -#include "version.h"
 -#include "util.h"
 +#include "intset.h"  /* Compact integer set structure */
 +#include "version.h" /* Version macro */
 +#include "util.h"    /* Misc functions useful in many places */
  
  /* Error codes */
  #define REDIS_OK                0
@@@ -42,6 -41,7 +42,6 @@@
  #define REDIS_MAXIDLETIME       (60*5)  /* default client timeout */
  #define REDIS_IOBUF_LEN         1024
  #define REDIS_LOADBUF_LEN       1024
 -#define REDIS_STATIC_ARGS       8
  #define REDIS_DEFAULT_DBNUM     16
  #define REDIS_CONFIGLINE_MAX    1024
  #define REDIS_MAX_SYNC_TIME     60      /* Slave can't take more to sync */
  #define REDIS_SHARED_INTEGERS 10000
  #define REDIS_REPLY_CHUNK_BYTES (5*1500) /* 5 TCP packets with default MTU */
  #define REDIS_MAX_LOGMSG_LEN    1024 /* Default maximum length of syslog messages */
 +#define REDIS_AUTO_AOFREWRITE_PERC  100
 +#define REDIS_AUTO_AOFREWRITE_MIN_SIZE (1024*1024)
 +#define REDIS_SLOWLOG_LOG_SLOWER_THAN 10000
 +#define REDIS_SLOWLOG_MAX_LEN 64
  
  /* Hash table parameters */
  #define REDIS_HT_MINFILL        10      /* Minimal hash table fill 10% */
  #define REDIS_HASH 4
  #define REDIS_VMPOINTER 8
  
- /* Object types only used for persistence in .rdb files */
- #define REDIS_HASH_ZIPMAP 9
- #define REDIS_LIST_ZIPLIST 10
- #define REDIS_SET_INTSET 11
- #define REDIS_ZSET_ZIPLIST 12
  /* Objects encoding. Some kind of objects like Strings and Hashes can be
   * internally represented in multiple ways. The 'encoding' field of the object
   * is set to one of this fields for this object. */
  #define REDIS_ENCODING_INTSET 6  /* Encoded as intset */
  #define REDIS_ENCODING_SKIPLIST 7  /* Encoded as skiplist */
  
 -/* Scheduled IO opeations flags. */
 -#define REDIS_IO_LOAD 1
 -#define REDIS_IO_SAVE 2
 -#define REDIS_IO_LOADINPROG 4
 -#define REDIS_IO_SAVEINPROG 8
 +/* Object types only used for dumping to disk */
 +#define REDIS_EXPIRETIME 253
 +#define REDIS_SELECTDB 254
 +#define REDIS_EOF 255
  
 -/* Generic IO flags */
 -#define REDIS_IO_ONLYLOADS 1
 -#define REDIS_IO_ASAP 2
 -
 -#define REDIS_MAX_COMPLETED_JOBS_PROCESSED 1
 -#define REDIS_THREAD_STACK_SIZE (1024*1024*4)
 +/* Defines related to the dump file format. To store 32 bits lengths for short
 + * keys requires a lot of space, so we check the most significant 2 bits of
 + * the first byte to interpreter the length:
 + *
 + * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
 + * 01|000000 00000000 =>  01, the len is 14 byes, 6 bits + 8 bits of next byte
 + * 10|000000 [32 bit integer] => if it's 01, a full 32 bit len will follow
 + * 11|000000 this means: specially encoded object will follow. The six bits
 + *           number specify the kind of object that follows.
 + *           See the REDIS_RDB_ENC_* defines.
 + *
 + * Lenghts up to 63 are stored using a single byte, most DB keys, and may
 + * values, will fit inside. */
 +#define REDIS_RDB_6BITLEN 0
 +#define REDIS_RDB_14BITLEN 1
 +#define REDIS_RDB_32BITLEN 2
 +#define REDIS_RDB_ENCVAL 3
 +#define REDIS_RDB_LENERR UINT_MAX
 +
 +/* When a length of a string object stored on disk has the first two bits
 + * set, the remaining two bits specify a special encoding for the object
 + * accordingly to the following defines: */
 +#define REDIS_RDB_ENC_INT8 0        /* 8 bit signed integer */
 +#define REDIS_RDB_ENC_INT16 1       /* 16 bit signed integer */
 +#define REDIS_RDB_ENC_INT32 2       /* 32 bit signed integer */
 +#define REDIS_RDB_ENC_LZF 3         /* string compressed with FASTLZ */
  
  /* Client flags */
  #define REDIS_SLAVE 1       /* This client is a slave server */
  #define REDIS_MONITOR 4     /* This client is a slave monitor, see MONITOR */
  #define REDIS_MULTI 8       /* This client is in a MULTI context */
  #define REDIS_BLOCKED 16    /* The client is waiting in a blocking operation */
 -#define REDIS_IO_WAIT 32    /* The client is waiting for Virtual Memory I/O */
  #define REDIS_DIRTY_CAS 64  /* Watched keys modified. EXEC will fail. */
  #define REDIS_CLOSE_AFTER_REPLY 128 /* Close after writing entire reply. */
  #define REDIS_UNBLOCKED 256 /* This client was unblocked and is stored in
                                 server.unblocked_clients */
 +#define REDIS_LUA_CLIENT 512 /* This is a non connected client used by Lua */
  
  /* Client request types */
  #define REDIS_REQ_INLINE 1
  #define REDIS_REQ_MULTIBULK 2
  
  /* Slave replication state - slave side */
 -#define REDIS_REPL_NONE 0   /* No active replication */
 -#define REDIS_REPL_CONNECT 1    /* Must connect to master */
 -#define REDIS_REPL_TRANSFER 2    /* Receiving .rdb from master */
 -#define REDIS_REPL_CONNECTED 3  /* Connected to master */
 +#define REDIS_REPL_NONE 0 /* No active replication */
 +#define REDIS_REPL_CONNECT 1 /* Must connect to master */
 +#define REDIS_REPL_CONNECTING 2 /* Connecting to master */
 +#define REDIS_REPL_TRANSFER 3 /* Receiving .rdb from master */
 +#define REDIS_REPL_CONNECTED 4 /* Connected to master */
 +
 +/* Synchronous read timeout - slave side */
 +#define REDIS_REPL_SYNCIO_TIMEOUT 5
  
  /* Slave replication state - from the point of view of master
   * Note that in SEND_BULK and ONLINE state the slave receives new updates
  #define REDIS_MAXMEMORY_ALLKEYS_RANDOM 4
  #define REDIS_MAXMEMORY_NO_EVICTION 5
  
 -/* Diskstore background saving thread states */
 -#define REDIS_BGSAVE_THREAD_UNACTIVE 0
 -#define REDIS_BGSAVE_THREAD_ACTIVE 1
 -#define REDIS_BGSAVE_THREAD_DONE_OK 2
 -#define REDIS_BGSAVE_THREAD_DONE_ERR 3
 +/* Scripting */
 +#define REDIS_LUA_TIME_LIMIT 60000 /* milliseconds */
  
  /* We can print the stacktrace, so our assert is defined this way: */
  #define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e,__FILE__,__LINE__),_exit(1)))
@@@ -279,6 -249,9 +273,6 @@@ typedef struct redisDb 
      dict *dict;                 /* The keyspace for this DB */
      dict *expires;              /* Timeout of keys with a timeout set */
      dict *blocking_keys;        /* Keys with clients waiting for data (BLPOP) */
 -    dict *io_keys;              /* Keys with clients waiting for DS I/O */
 -    dict *io_negcache;          /* Negative caching for disk store */
 -    dict *io_queued;            /* Queued IO operations hash table */
      dict *watched_keys;         /* WATCHED keys for MULTI/EXEC CAS */
      int id;
  } redisDb;
@@@ -314,7 -287,6 +308,7 @@@ typedef struct redisClient 
      sds querybuf;
      int argc;
      robj **argv;
 +    struct redisCommand *cmd;
      int reqtype;
      int multibulklen;       /* number of multi bulk arguments left to read */
      long bulklen;           /* length of bulk argument in multi bulk request */
@@@ -350,7 -322,7 +344,7 @@@ struct sharedObjectsStruct 
      robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *cnegone, *pong, *space,
      *colon, *nullbulk, *nullmultibulk, *queued,
      *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
 -    *outofrangeerr, *loadingerr, *plus,
 +    *outofrangeerr, *noscripterr, *loadingerr, *plus,
      *select0, *select1, *select2, *select3, *select4,
      *select5, *select6, *select7, *select8, *select9,
      *messagebulk, *pmessagebulk, *subscribebulk, *unsubscribebulk, *mbulk3,
@@@ -501,6 -473,7 +495,6 @@@ typedef struct 
  
  struct redisServer {
      /* General */
 -    pthread_t mainthread;
      redisDb *db;
      dict *commands;             /* Command table hahs table */
      aeEventLoop *el;
      long long stat_keyspace_hits;   /* number of successful lookups of keys */
      long long stat_keyspace_misses; /* number of failed lookups of keys */
      size_t stat_peak_memory;        /* max used memory record */
 +    long long stat_fork_time;       /* time needed to perform latets fork() */
 +    list *slowlog;
 +    long long slowlog_entry_id;
 +    long long slowlog_log_slower_than;
 +    unsigned long slowlog_max_len;
      /* Configuration */
      int verbosity;
      int maxidletime;
      int appendonly;
      int appendfsync;
      int no_appendfsync_on_rewrite;
 +    int auto_aofrewrite_perc;       /* Rewrite AOF if % growth is > M and... */
 +    off_t auto_aofrewrite_min_size; /* the AOF file is at least N bytes. */
 +    off_t auto_aofrewrite_base_size;/* AOF size on latest startup or rewrite. */
 +    off_t appendonly_current_size;  /* AOF current size. */
 +    int aofrewrite_scheduled;       /* Rewrite once BGSAVE terminates. */
      int shutdown_asap;
      int activerehashing;
      char *requirepass;
      time_t lastfsync;
      int appendfd;
      int appendseldb;
 +    time_t aof_flush_postponed_start;
      char *pidfile;
      pid_t bgsavechildpid;
      pid_t bgrewritechildpid;
 -    int bgsavethread_state;
 -    pthread_mutex_t bgsavethread_mutex;
 -    pthread_t bgsavethread;
      sds bgrewritebuf; /* buffer taken by parent during oppend only rewrite */
      sds aofbuf;       /* AOF buffer, written before entering the event loop */
      struct saveparam *saveparams;
      char *masterhost;
      int masterport;
      redisClient *master;    /* client that is master for this slave */
 +    int repl_syncio_timeout; /* timeout for synchronous I/O calls */
      int replstate;          /* replication status if the instance is a slave */
      off_t repl_transfer_left;  /* bytes left reading .rdb  */
      int repl_transfer_s;    /* slave -> master SYNC socket */
      char *repl_transfer_tmpfile; /* slave-> master SYNC temp file name */
      time_t repl_transfer_lastio; /* unix time of the latest read, for timeout */
      int repl_serve_stale_data; /* Serve stale data when link is down? */
 +    time_t repl_down_since; /* unix time at which link with master went down */
      /* Limits */
      unsigned int maxclients;
      unsigned long long maxmemory;
      int maxmemory_samples;
      /* Blocked clients */
      unsigned int bpop_blocked_clients;
 -    unsigned int cache_blocked_clients;
      list *unblocked_clients; /* list of clients to unblock before next loop */
 -    list *cache_io_queue;    /* IO operations queue */
 -    int cache_flush_delay;   /* seconds to wait before flushing keys */
      /* Sort parameters - qsort_r() is only available under BSD so we
       * have to take this state global, in order to pass it to sortCompare() */
      int sort_desc;
      int sort_alpha;
      int sort_bypattern;
 -    /* Virtual memory configuration */
 -    int ds_enabled; /* backend disk in redis.conf */
 -    char *ds_path;  /* location of the disk store on disk */
 -    unsigned long long cache_max_memory;
      /* Zip structure config */
      size_t hash_max_zipmap_entries;
      size_t hash_max_zipmap_value;
      size_t zset_max_ziplist_entries;
      size_t zset_max_ziplist_value;
      time_t unixtime;    /* Unix time sampled every second. */
 -    /* Virtual memory I/O threads stuff */
 -    /* An I/O thread process an element taken from the io_jobs queue and
 -     * put the result of the operation in the io_done list. While the
 -     * job is being processed, it's put on io_processing queue. */
 -    list *io_newjobs; /* List of VM I/O jobs yet to be processed */
 -    list *io_processing; /* List of VM I/O jobs being processed */
 -    list *io_processed; /* List of VM I/O jobs already processed */
 -    list *io_ready_clients; /* Clients ready to be unblocked. All keys loaded */
 -    pthread_mutex_t io_mutex; /* lock to access io_jobs/io_done/io_thread_job */
 -    pthread_cond_t io_condvar; /* I/O threads conditional variable */
 -    pthread_attr_t io_threads_attr; /* attributes for threads creation */
 -    int io_active_threads; /* Number of running I/O threads */
 -    int vm_max_threads; /* Max number of I/O threads running at the same time */
 -    /* Our main thread is blocked on the event loop, locking for sockets ready
 -     * to be read or written, so when a threaded I/O operation is ready to be
 -     * processed by the main thread, the I/O thread will use a unix pipe to
 -     * awake the main thread. The followings are the two pipe FDs. */
 -    int io_ready_pipe_read;
 -    int io_ready_pipe_write;
 -    /* Virtual memory stats */
 -    unsigned long long vm_stats_used_pages;
 -    unsigned long long vm_stats_swapped_objects;
 -    unsigned long long vm_stats_swapouts;
 -    unsigned long long vm_stats_swapins;
      /* Pubsub */
      dict *pubsub_channels; /* Map channels to list of subscribed clients */
      list *pubsub_patterns; /* A list of pubsub_patterns */
      /* Cluster */
      int cluster_enabled;
      clusterState cluster;
 +    /* Scripting */
 +    lua_State *lua; /* The Lua interpreter. We use just one for all clients */
 +    redisClient *lua_client; /* The "fake client" to query Redis from Lua */
 +    dict *lua_scripts; /* A dictionary of SHA1 -> Lua scripts */
 +    long long lua_time_limit;
 +    long long lua_time_start;
  };
  
  typedef struct pubsubPattern {
@@@ -643,7 -631,7 +637,7 @@@ struct redisCommand 
      int arity;
      int flags;
      /* Use a function to determine keys arguments in a command line.
 -     * Used both for diskstore preloading and Redis Cluster. */
 +     * Used for Redis Cluster redirect. */
      redisGetKeysProc *getkeys_proc;
      /* What keys should be loaded in background when calling this command? */
      int firstkey; /* The first argument that's a key (0 = no keys) */
@@@ -670,6 -658,27 +664,6 @@@ typedef struct _redisSortOperation 
      robj *pattern;
  } redisSortOperation;
  
 -/* DIsk store threaded I/O request message */
 -#define REDIS_IOJOB_LOAD 0
 -#define REDIS_IOJOB_SAVE 1
 -
 -typedef struct iojob {
 -    int type;   /* Request type, REDIS_IOJOB_* */
 -    redisDb *db;/* Redis database */
 -    robj *key;  /* This I/O request is about this key */
 -    robj *val;  /* the value to swap for REDIS_IOJOB_SAVE, otherwise this
 -                 * field is populated by the I/O thread for REDIS_IOJOB_LOAD. */
 -    time_t expire; /* Expire time for this key on REDIS_IOJOB_LOAD */
 -} iojob;
 -
 -/* IO operations scheduled -- check dscache.c for more info */
 -typedef struct ioop {
 -    int type;
 -    redisDb *db;
 -    robj *key;
 -    time_t ctime; /* This is the creation time of the entry. */
 -} ioop;
 -
  /* Structure to hold list iteration abstraction. */
  typedef struct {
      robj *subject;
@@@ -720,7 -729,6 +714,7 @@@ extern struct sharedObjectsStruct share
  extern dictType setDictType;
  extern dictType zsetDictType;
  extern dictType clusterNodesDictType;
 +extern dictType dbDictType;
  extern double R_Zero, R_PosInf, R_NegInf, R_Nan;
  dictType hashDictType;
  
@@@ -760,8 -768,6 +754,8 @@@ void addReplyMultiBulkLen(redisClient *
  void *dupClientReplyValue(void *o);
  void getClientsMaxBuffers(unsigned long *longest_output_list,
                            unsigned long *biggest_input_buffer);
 +void rewriteClientCommandVector(redisClient *c, int argc, ...);
 +void rewriteClientCommandArgument(redisClient *c, int i, robj *newval);
  
  #ifdef __GNUC__
  void addReplyErrorFormat(redisClient *c, const char *fmt, ...)
@@@ -794,14 -800,13 +788,14 @@@ void popGenericCommand(redisClient *c, 
  void unwatchAllKeys(redisClient *c);
  void initClientMultiState(redisClient *c);
  void freeClientMultiState(redisClient *c);
 -void queueMultiCommand(redisClient *c, struct redisCommand *cmd);
 +void queueMultiCommand(redisClient *c);
  void touchWatchedKey(redisDb *db, robj *key);
  void touchWatchedKeysOnFlush(int dbid);
  
  /* Redis object implementation */
  void decrRefCount(void *o);
  void incrRefCount(robj *o);
 +robj *resetRefCount(robj *obj);
  void freeStringObject(robj *o);
  void freeListObject(robj *o);
  void freeSetObject(robj *o);
@@@ -836,15 -841,11 +830,10 @@@ unsigned long estimateObjectIdleTime(ro
  int syncWrite(int fd, char *ptr, ssize_t size, int timeout);
  int syncRead(int fd, char *ptr, ssize_t size, int timeout);
  int syncReadLine(int fd, char *ptr, ssize_t size, int timeout);
- int fwriteBulkString(FILE *fp, char *s, unsigned long len);
- int fwriteBulkDouble(FILE *fp, double d);
- int fwriteBulkLongLong(FILE *fp, long long l);
- int fwriteBulkObject(FILE *fp, robj *obj);
- int fwriteBulkCount(FILE *fp, char prefix, int count);
  
  /* Replication */
  void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc);
  void replicationFeedMonitors(list *monitors, int dictid, robj **argv, int argc);
 -int syncWithMaster(void);
  void updateSlavesWaitingBgsave(int bgsaveerr);
  void replicationCron(void);
  
@@@ -854,24 -855,10 +843,10 @@@ void loadingProgress(off_t pos)
  void stopLoading(void);
  
  /* RDB persistence */
- int rdbLoad(char *filename);
- int rdbSaveBackground(char *filename);
- void rdbRemoveTempFile(pid_t childpid);
- int rdbSave(char *filename);
- int rdbSaveObject(FILE *fp, robj *o);
- off_t rdbSavedObjectLen(robj *o);
- off_t rdbSavedObjectPages(robj *o);
- robj *rdbLoadObject(int type, FILE *fp);
- void backgroundSaveDoneHandler(int exitcode, int bysignal);
- int rdbSaveKeyValuePair(FILE *fp, robj *key, robj *val, time_t expireitme, time_t now);
- int rdbLoadType(FILE *fp);
- time_t rdbLoadTime(FILE *fp);
- robj *rdbLoadStringObject(FILE *fp);
- int rdbSaveType(FILE *fp, unsigned char type);
- int rdbSaveLen(FILE *fp, uint32_t len);
+ #include "rdb.h"
  
  /* AOF persistence */
 -void flushAppendOnlyFile(void);
 +void flushAppendOnlyFile(int force);
  void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
  void aofRemoveTempFile(pid_t childpid);
  int rewriteAppendOnlyFileBackground(void);
@@@ -906,10 -893,9 +881,10 @@@ int processCommand(redisClient *c)
  void setupSignalHandlers(void);
  struct redisCommand *lookupCommand(sds name);
  struct redisCommand *lookupCommandByCString(char *s);
 -void call(redisClient *c, struct redisCommand *cmd);
 +void call(redisClient *c);
  int prepareForShutdown();
  void redisLog(int level, const char *fmt, ...);
 +void redisLogRaw(int level, const char *msg);
  void usage();
  void updateDictResizePolicy(void);
  int htNeedsResize(dict *dict);
@@@ -917,6 -903,40 +892,6 @@@ void oom(const char *msg)
  void populateCommandTable(void);
  void resetCommandTableStats(void);
  
 -/* Disk store */
 -int dsOpen(void);
 -int dsClose(void);
 -int dsSet(redisDb *db, robj *key, robj *val, time_t expire);
 -robj *dsGet(redisDb *db, robj *key, time_t *expire);
 -int dsDel(redisDb *db, robj *key);
 -int dsExists(redisDb *db, robj *key);
 -void dsFlushDb(int dbid);
 -int dsRdbSaveBackground(char *filename);
 -int dsRdbSave(char *filename);
 -
 -/* Disk Store Cache */
 -void dsInit(void);
 -void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, int mask);
 -void lockThreadedIO(void);
 -void unlockThreadedIO(void);
 -void freeIOJob(iojob *j);
 -void queueIOJob(iojob *j);
 -void waitEmptyIOJobsQueue(void);
 -void processAllPendingIOJobs(void);
 -int blockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd);
 -int dontWaitForSwappedKey(redisClient *c, robj *key);
 -void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key);
 -int cacheFreeOneEntry(void);
 -void cacheScheduleIOAddFlag(redisDb *db, robj *key, long flag);
 -void cacheScheduleIODelFlag(redisDb *db, robj *key, long flag);
 -int cacheScheduleIOGetFlags(redisDb *db, robj *key);
 -void cacheScheduleIO(redisDb *db, robj *key, int type);
 -void cacheCron(void);
 -int cacheKeyMayExist(redisDb *db, robj *key);
 -void cacheSetKeyMayExist(redisDb *db, robj *key);
 -void cacheSetKeyDoesNotExist(redisDb *db, robj *key);
 -void cacheForcePointInTime(void);
 -
  /* Set data type */
  robj *setTypeCreate(robj *value);
  int setTypeAdd(robj *subject, robj *value);
@@@ -969,9 -989,8 +944,9 @@@ robj *lookupKeyRead(redisDb *db, robj *
  robj *lookupKeyWrite(redisDb *db, robj *key);
  robj *lookupKeyReadOrReply(redisClient *c, robj *key, robj *reply);
  robj *lookupKeyWriteOrReply(redisClient *c, robj *key, robj *reply);
 -int dbAdd(redisDb *db, robj *key, robj *val);
 -int dbReplace(redisDb *db, robj *key, robj *val);
 +void dbAdd(redisDb *db, robj *key, robj *val);
 +void dbOverwrite(redisDb *db, robj *key, robj *val);
 +void setKey(redisDb *db, robj *key, robj *val);
  int dbExists(redisDb *db, robj *key);
  robj *dbRandomKey(redisDb *db);
  int dbDelete(redisDb *db, robj *key);
@@@ -999,9 -1018,6 +974,9 @@@ int clusterAddNode(clusterNode *node)
  void clusterCron(void);
  clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *ask);
  
 +/* Scripting */
 +void scriptingInit(void);
 +
  /* Git SHA1 */
  char *redisGitSHA1(void);
  char *redisGitDirty(void);
@@@ -1130,8 -1146,6 +1105,8 @@@ void migrateCommand(redisClient *c)
  void dumpCommand(redisClient *c);
  void objectCommand(redisClient *c);
  void clientCommand(redisClient *c);
 +void evalCommand(redisClient *c);
 +void evalShaCommand(redisClient *c);
  
  #if defined(__GNUC__)
  void *calloc(size_t count, size_t size) __attribute__ ((deprecated));