From: antirez Date: Thu, 22 Sep 2011 13:15:26 +0000 (+0200) Subject: merge conflicts resolved X-Git-Url: https://git.saurik.com/redis.git/commitdiff_plain/f9c6f39b2b0039cf29df6597d41c124048c825cd?hp=-c merge conflicts resolved --- f9c6f39b2b0039cf29df6597d41c124048c825cd diff --combined src/Makefile index 36bba34c,bce8b6e5..dac6deaf --- a/src/Makefile +++ b/src/Makefile @@@ -5,50 -5,41 +5,50 @@@ release_hdr := $(shell sh -c './mkreleasehdr.sh') uname_S := $(shell sh -c 'uname -s 2>/dev/null || echo not') OPTIMIZATION?=-O2 + +ifeq ($(uname_S),Linux) + ifneq ($(FORCE_LIBC_MALLOC),yes) + USE_JEMALLOC=yes + endif +endif + ifeq ($(uname_S),SunOS) - CFLAGS?= -std=c99 -pedantic $(OPTIMIZATION) -Wall -W -D__EXTENSIONS__ -D_XPG6 - CCLINK?= -ldl -lnsl -lsocket -lm -lpthread - DEBUG?= -g -ggdb + CFLAGS?=-std=c99 -pedantic $(OPTIMIZATION) -Wall -W -D__EXTENSIONS__ -D_XPG6 + CCLINK?=-ldl -lnsl -lsocket -lm -lpthread + DEBUG?=-g -ggdb else - CFLAGS?= -std=c99 -pedantic $(OPTIMIZATION) -Wall -W $(ARCH) $(PROF) - CCLINK?= -lm -pthread - DEBUG?= -g -rdynamic -ggdb + CFLAGS?=-std=c99 -pedantic $(OPTIMIZATION) -Wall -W $(ARCH) $(PROF) + CCLINK?=-lm -pthread + DEBUG?=-g -rdynamic -ggdb endif ifeq ($(USE_TCMALLOC),yes) + ALLOD_DEPS= ALLOC_LINK=-ltcmalloc ALLOC_FLAGS=-DUSE_TCMALLOC endif ifeq ($(USE_TCMALLOC_MINIMAL),yes) + ALLOD_DEPS= ALLOC_LINK=-ltcmalloc_minimal ALLOC_FLAGS=-DUSE_TCMALLOC endif ifeq ($(USE_JEMALLOC),yes) - ALLOC_LINK=-ljemalloc - ALLOC_FLAGS=-DUSE_JEMALLOC + ALLOC_DEP=../deps/jemalloc/lib/libjemalloc.a + ALLOC_LINK=$(ALLOC_DEP) -ldl + ALLOC_FLAGS=-DUSE_JEMALLOC -I../deps/jemalloc/include endif CCLINK+= $(ALLOC_LINK) CFLAGS+= $(ALLOC_FLAGS) -CCOPT= $(CFLAGS) $(CCLINK) $(ARCH) $(PROF) +CCOPT= $(CFLAGS) $(ARCH) $(PROF) PREFIX= /usr/local INSTALL_BIN= $(PREFIX)/bin INSTALL= cp -p - CCCOLOR="\033[34m" LINKCOLOR="\033[34;1m" SRCCOLOR="\033[33m" @@@ -56,12 -47,7 +56,12 @@@ BINCOLOR="\033[37;1m MAKECOLOR="\033[32;1m" ENDCOLOR="\033[0m" -OBJ = adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o dscache.o pubsub.o multi.o debug.o sort.o intset.o syncio.o diskstore.o cluster.o crc16.o endian.o rio.o +ifndef V +QUIET_CC = @printf ' %b %b\n' $(CCCOLOR)CC$(ENDCOLOR) $(SRCCOLOR)$@$(ENDCOLOR); +QUIET_LINK = @printf ' %b %b\n' $(LINKCOLOR)LINK$(ENDCOLOR) $(BINCOLOR)$@$(ENDCOLOR); +endif + - OBJ = adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endian.o slowlog.o scripting.o bio.o ++OBJ = adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endian.o slowlog.o scripting.o bio.o rio.o BENCHOBJ = ae.o anet.o redis-benchmark.o sds.o adlist.o zmalloc.o CLIOBJ = anet.o sds.o adlist.o redis-cli.o zmalloc.o release.o CHECKDUMPOBJ = redis-check-dump.o lzf_c.o lzf_d.o @@@ -86,35 -72,32 +86,35 @@@ ae_kqueue.o: ae_kqueue. ae_select.o: ae_select.c anet.o: anet.c fmacros.h anet.h aof.o: aof.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \ - zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h + zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h +bio.o: bio.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \ + zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h bio.h +cluster.o: cluster.c redis.h fmacros.h config.h ae.h sds.h dict.h \ + adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h config.o: config.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \ - zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h + zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h +crc16.o: crc16.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \ + zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h db.o: db.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \ - zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h + zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h debug.o: debug.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \ - zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h sha1.h + zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h sha1.h dict.o: dict.c fmacros.h dict.h zmalloc.h -diskstore.o: diskstore.c redis.h fmacros.h config.h ae.h sds.h dict.h \ - adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h -dscache.o: dscache.c redis.h fmacros.h config.h ae.h sds.h dict.h \ - adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h -intset.o: intset.c intset.h zmalloc.h +endian.o: endian.c +intset.o: intset.c intset.h zmalloc.h endian.h lzf_c.o: lzf_c.c lzfP.h lzf_d.o: lzf_d.c lzfP.h multi.o: multi.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \ - zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h + zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h networking.o: networking.c redis.h fmacros.h config.h ae.h sds.h dict.h \ - adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h + adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h object.o: object.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \ - zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h + zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h pqsort.o: pqsort.c pubsub.o: pubsub.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \ - zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h + zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h rdb.o: rdb.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \ - zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h lzf.h + zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h lzf.h redis-benchmark.o: redis-benchmark.c fmacros.h ae.h \ ../deps/hiredis/hiredis.h sds.h adlist.h zmalloc.h redis-check-aof.o: redis-check-aof.c fmacros.h config.h @@@ -122,90 -105,81 +122,91 @@@ redis-check-dump.o: redis-check-dump.c redis-cli.o: redis-cli.c fmacros.h version.h ../deps/hiredis/hiredis.h \ sds.h zmalloc.h ../deps/linenoise/linenoise.h help.h redis.o: redis.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \ - zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h asciilogo.h + zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h slowlog.h \ + bio.h asciilogo.h release.o: release.c release.h replication.o: replication.c redis.h fmacros.h config.h ae.h sds.h dict.h \ - adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h + adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h +scripting.o: scripting.c redis.h fmacros.h config.h ae.h sds.h dict.h \ + adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h \ + sha1.h + rio.o: rio.c sds.h sds.o: sds.c sds.h zmalloc.h -sha1.o: sha1.c sha1.h +sha1.o: sha1.c sha1.h config.h +slowlog.o: slowlog.c redis.h fmacros.h config.h ae.h sds.h dict.h \ + adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h \ + slowlog.h sort.o: sort.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \ - zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h pqsort.h + zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h pqsort.h syncio.o: syncio.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \ - zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h + zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h t_hash.o: t_hash.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \ - zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h + zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h t_list.o: t_list.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \ - zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h + zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h t_set.o: t_set.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \ - zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h + zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h t_string.o: t_string.c redis.h fmacros.h config.h ae.h sds.h dict.h \ - adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h + adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h t_zset.o: t_zset.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \ - zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h -util.o: util.c util.h -cluster.o: redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \ - zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h -ziplist.o: ziplist.c zmalloc.h ziplist.h -zipmap.o: zipmap.c zmalloc.h -zmalloc.o: zmalloc.c config.h + zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h +util.o: util.c fmacros.h util.h +ziplist.o: ziplist.c zmalloc.h util.h ziplist.h endian.h +zipmap.o: zipmap.c zmalloc.h endian.h +zmalloc.o: zmalloc.c config.h zmalloc.h + +.PHONY: dependencies dependencies: - @echo $(MAKECOLOR)MAKE$(ENDCOLOR) $(BINCOLOR)hiredis$(ENDCOLOR) + @printf '%b %b\n' $(MAKECOLOR)MAKE$(ENDCOLOR) $(BINCOLOR)hiredis$(ENDCOLOR) @cd ../deps/hiredis && $(MAKE) static ARCH="$(ARCH)" - @echo $(MAKECOLOR)MAKE$(ENDCOLOR) $(BINCOLOR)linenoise$(ENDCOLOR) + @printf '%b %b\n' $(MAKECOLOR)MAKE$(ENDCOLOR) $(BINCOLOR)linenoise$(ENDCOLOR) @cd ../deps/linenoise && $(MAKE) ARCH="$(ARCH)" + @echo $(MAKECOLOR)MAKE$(ENDCOLOR) $(BINCOLOR)Lua ansi$(ENDCOLOR) + @cd ../deps/lua && $(MAKE) ARCH="$(ARCH)" ansi -redis-server: $(OBJ) - @$(CC) -o $(PRGNAME) $(CCOPT) $(DEBUG) $(OBJ) - @echo $(LINKCOLOR)LINK$(ENDCOLOR) $(BINCOLOR)$(@)$(ENDCOLOR) +../deps/jemalloc/lib/libjemalloc.a: + cd ../deps/jemalloc && ./configure $(JEMALLOC_CFLAGS) --with-jemalloc-prefix=je_ --enable-cc-silence && $(MAKE) lib/libjemalloc.a + +redis-server: dependencies $(OBJ) + $(QUIET_LINK)$(CC) -o $(PRGNAME) $(CCOPT) $(DEBUG) $(OBJ) $(CCLINK) $(ALLOC_LINK) ../deps/lua/src/liblua.a redis-benchmark: dependencies $(BENCHOBJ) @cd ../deps/hiredis && $(MAKE) static - @$(CC) -o $(BENCHPRGNAME) $(CCOPT) $(DEBUG) $(BENCHOBJ) ../deps/hiredis/libhiredis.a - @echo $(LINKCOLOR)LINK$(ENDCOLOR) $(BINCOLOR)$(@)$(ENDCOLOR) + $(QUIET_LINK)$(CC) -o $(BENCHPRGNAME) $(CCOPT) $(DEBUG) $(BENCHOBJ) ../deps/hiredis/libhiredis.a $(CCLINK) $(ALLOC_LINK) redis-benchmark.o: - @$(CC) -c $(CFLAGS) -I../deps/hiredis $(DEBUG) $(COMPILE_TIME) $< - @echo $(CCCOLOR)CC$(ENDCOLOR) $(SRCCOLOR)$(<)$(ENDCOLOR) + $(QUIET_CC)$(CC) -c $(CFLAGS) -I../deps/hiredis $(DEBUG) $(COMPILE_TIME) $< redis-cli: dependencies $(CLIOBJ) - @$(CC) -o $(CLIPRGNAME) $(CCOPT) $(DEBUG) $(CLIOBJ) ../deps/hiredis/libhiredis.a ../deps/linenoise/linenoise.o - @echo $(LINKCOLOR)LINK$(ENDCOLOR) $(BINCOLOR)$(@)$(ENDCOLOR) + $(QUIET_LINK)$(CC) -o $(CLIPRGNAME) $(CCOPT) $(DEBUG) $(CLIOBJ) ../deps/hiredis/libhiredis.a ../deps/linenoise/linenoise.o $(CCLINK) $(ALLOC_LINK) redis-cli.o: - @$(CC) -c $(CFLAGS) -I../deps/hiredis -I../deps/linenoise $(DEBUG) $(COMPILE_TIME) $< - @echo $(CCCOLOR)CC$(ENDCOLOR) $(SRCCOLOR)$(<)$(ENDCOLOR) + $(QUIET_CC)$(CC) -c $(CFLAGS) -I../deps/hiredis -I../deps/linenoise $(DEBUG) $(COMPILE_TIME) $< redis-check-dump: $(CHECKDUMPOBJ) - @$(CC) -o $(CHECKDUMPPRGNAME) $(CCOPT) $(DEBUG) $(CHECKDUMPOBJ) - @echo $(LINKCOLOR)LINK$(ENDCOLOR) $(BINCOLOR)$(@)$(ENDCOLOR) + $(QUIET_LINK)$(CC) -o $(CHECKDUMPPRGNAME) $(CCOPT) $(DEBUG) $(CHECKDUMPOBJ) $(CCLINK) $(ALLOC_LINK) redis-check-aof: $(CHECKAOFOBJ) - @$(CC) -o $(CHECKAOFPRGNAME) $(CCOPT) $(DEBUG) $(CHECKAOFOBJ) - @echo $(LINKCOLOR)LINK$(ENDCOLOR) $(BINCOLOR)$(@)$(ENDCOLOR) + $(QUIET_LINK)$(CC) -o $(CHECKAOFPRGNAME) $(CCOPT) $(DEBUG) $(CHECKAOFOBJ) $(CCLINK) $(ALLOC_LINK) -.c.o: - @$(CC) -c $(CFLAGS) $(DEBUG) $(COMPILE_TIME) $< - @echo $(CCCOLOR)CC$(ENDCOLOR) $(SRCCOLOR)$(<)$(ENDCOLOR) +# Because the jemalloc.h header is generated as a part of the jemalloc build +# process, building it should complete before building any other object. +%.o: %.c $(ALLOC_DEP) + $(QUIET_CC)$(CC) -c $(CFLAGS) $(DEBUG) $(COMPILE_TIME) -I../deps/lua/src $< clean: rm -rf $(PRGNAME) $(BENCHPRGNAME) $(CLIPRGNAME) $(CHECKDUMPPRGNAME) $(CHECKAOFPRGNAME) *.o *.gcda *.gcno *.gcov + cd ../deps/hiredis && $(MAKE) $@ + cd ../deps/linenoise && $(MAKE) $@ + cd ../deps/lua && $(MAKE) $@ + -(cd ../deps/jemalloc && $(MAKE) distclean) dep: $(CC) -MM *.c -I ../deps/hiredis -I ../deps/linenoise -test: redis-server - (cd ..; tclsh8.5 tests/test_helper.tcl --tags "${TAGS}" --file "${FILE}") +test: redis-server redis-check-aof + @(cd ..; ./runtest) bench: ./redis-benchmark @@@ -217,7 -191,7 +218,7 @@@ log @echo "" @echo "WARNING: if it fails under Linux you probably need to install libc6-dev-i386" @echo "" - $(MAKE) ARCH="-m32" + $(MAKE) ARCH="-m32" JEMALLOC_CFLAGS='CFLAGS="-std=gnu99 -Wall -pipe -g3 -fvisibility=hidden -O3 -funroll-loops -m32"' gprof: $(MAKE) PROF="-pg" @@@ -231,9 -205,6 +232,9 @@@ noopt 32bitgprof: $(MAKE) PROF="-pg" ARCH="-arch i386" +src/help.h: + @../utils/generate-command-help.rb > help.h + install: all mkdir -p $(INSTALL_BIN) $(INSTALL) $(PRGNAME) $(INSTALL_BIN) diff --combined src/aof.c index 8d654281,aadf6448..b86357de --- a/src/aof.c +++ b/src/aof.c @@@ -1,6 -1,4 +1,7 @@@ #include "redis.h" +#include "bio.h" ++#include "rio.h" + #include #include #include @@@ -8,17 -6,12 +9,17 @@@ #include #include #include -#include "rio.h" + +void aofUpdateCurrentSize(void); + +void aof_background_fsync(int fd) { + bioCreateBackgroundJob(REDIS_BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL); +} /* Called when the user switches from "appendonly yes" to "appendonly no" * at runtime using the CONFIG command. */ void stopAppendOnly(void) { - flushAppendOnlyFile(); + flushAppendOnlyFile(1); aof_fsync(server.appendfd); close(server.appendfd); @@@ -26,15 -19,15 +27,15 @@@ server.appendseldb = -1; server.appendonly = 0; /* rewrite operation in progress? kill it, wait child exit */ - if (server.bgsavechildpid != -1) { + if (server.bgrewritechildpid != -1) { int statloc; - if (kill(server.bgsavechildpid,SIGKILL) != -1) + if (kill(server.bgrewritechildpid,SIGKILL) != -1) wait3(&statloc,0,NULL); /* reset the buffer accumulating changes while the child saves */ sdsfree(server.bgrewritebuf); server.bgrewritebuf = sdsempty(); - server.bgsavechildpid = -1; + server.bgrewritechildpid = -1; } } @@@ -63,121 -56,62 +64,121 @@@ int startAppendOnly(void) * and the only way the client socket can get a write is entering when the * the event loop, we accumulate all the AOF writes in a memory * buffer and write it on disk using this function just before entering - * the event loop again. */ -void flushAppendOnlyFile(void) { - time_t now; + * the event loop again. + * + * About the 'force' argument: + * + * When the fsync policy is set to 'everysec' we may delay the flush if there + * is still an fsync() going on in the background thread, since for instance + * on Linux write(2) will be blocked by the background fsync anyway. + * When this happens we remember that there is some aof buffer to be + * flushed ASAP, and will try to do that in the serverCron() function. + * + * However if force is set to 1 we'll write regardless of the background + * fsync. */ +void flushAppendOnlyFile(int force) { ssize_t nwritten; + int sync_in_progress = 0; if (sdslen(server.aofbuf) == 0) return; + if (server.appendfsync == APPENDFSYNC_EVERYSEC) + sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0; + + if (server.appendfsync == APPENDFSYNC_EVERYSEC && !force) { + /* With this append fsync policy we do background fsyncing. + * If the fsync is still in progress we can try to delay + * the write for a couple of seconds. */ + if (sync_in_progress) { + if (server.aof_flush_postponed_start == 0) { + /* No previous write postponinig, remember that we are + * postponing the flush and return. */ + server.aof_flush_postponed_start = server.unixtime; + return; + } else if (server.unixtime - server.aof_flush_postponed_start < 2) { + /* We were already waiting for fsync to finish, but for less + * than two seconds this is still ok. Postpone again. */ + return; + } + /* Otherwise fall trough, and go write since we can't wait + * over two seconds. */ + redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis."); + } + } + /* If you are following this code path, then we are going to write so + * set reset the postponed flush sentinel to zero. */ + server.aof_flush_postponed_start = 0; + /* We want to perform a single write. This should be guaranteed atomic * at least if the filesystem we are writing is a real physical one. * While this will save us against the server being killed I don't think * there is much to do about the whole server stopping for power problems * or alike */ - nwritten = write(server.appendfd,server.aofbuf,sdslen(server.aofbuf)); - if (nwritten != (signed)sdslen(server.aofbuf)) { + nwritten = write(server.appendfd,server.aofbuf,sdslen(server.aofbuf)); + if (nwritten != (signed)sdslen(server.aofbuf)) { /* Ooops, we are in troubles. The best thing to do for now is * aborting instead of giving the illusion that everything is * working as expected. */ - if (nwritten == -1) { + if (nwritten == -1) { redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno)); - } else { + } else { redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno)); - } - exit(1); + } + exit(1); } - sdsfree(server.aofbuf); - server.aofbuf = sdsempty(); + server.appendonly_current_size += nwritten; - /* Don't Fsync if no-appendfsync-on-rewrite is set to yes and we have - * childs performing heavy I/O on disk. */ + /* Re-use AOF buffer when it is small enough. The maximum comes from the + * arena size of 4k minus some overhead (but is otherwise arbitrary). */ + if ((sdslen(server.aofbuf)+sdsavail(server.aofbuf)) < 4000) { + sdsclear(server.aofbuf); + } else { + sdsfree(server.aofbuf); + server.aofbuf = sdsempty(); + } + + /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are + * children doing I/O in the background. */ if (server.no_appendfsync_on_rewrite && (server.bgrewritechildpid != -1 || server.bgsavechildpid != -1)) return; - /* Fsync if needed */ - now = time(NULL); - if (server.appendfsync == APPENDFSYNC_ALWAYS || - (server.appendfsync == APPENDFSYNC_EVERYSEC && - now-server.lastfsync > 1)) - { + + /* Perform the fsync if needed. */ + if (server.appendfsync == APPENDFSYNC_ALWAYS) { /* aof_fsync is defined as fdatasync() for Linux in order to avoid * flushing metadata. */ aof_fsync(server.appendfd); /* Let's try to get this data on the disk */ - server.lastfsync = now; + server.lastfsync = server.unixtime; + } else if ((server.appendfsync == APPENDFSYNC_EVERYSEC && + server.unixtime > server.lastfsync)) { + if (!sync_in_progress) aof_background_fsync(server.appendfd); + server.lastfsync = server.unixtime; } } -sds catAppendOnlyGenericCommand(sds buf, int argc, robj **argv) { - int j; - buf = sdscatprintf(buf,"*%d\r\n",argc); +sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) { + char buf[32]; + int len, j; + robj *o; + + buf[0] = '*'; + len = 1+ll2string(buf+1,sizeof(buf)-1,argc); + buf[len++] = '\r'; + buf[len++] = '\n'; + dst = sdscatlen(dst,buf,len); + for (j = 0; j < argc; j++) { - robj *o = getDecodedObject(argv[j]); - buf = sdscatprintf(buf,"$%lu\r\n",(unsigned long)sdslen(o->ptr)); - buf = sdscatlen(buf,o->ptr,sdslen(o->ptr)); - buf = sdscatlen(buf,"\r\n",2); + o = getDecodedObject(argv[j]); + buf[0] = '$'; + len = 1+ll2string(buf+1,sizeof(buf)-1,sdslen(o->ptr)); + buf[len++] = '\r'; + buf[len++] = '\n'; + dst = sdscatlen(dst,buf,len); + dst = sdscatlen(dst,o->ptr,sdslen(o->ptr)); + dst = sdscatlen(dst,"\r\n",2); decrRefCount(o); } - return buf; + return dst; } sds catAppendOnlyExpireAtCommand(sds buf, robj *key, robj *seconds) { @@@ -287,7 -221,6 +288,7 @@@ int loadAppendOnlyFile(char *filename) long loops = 0; if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) { + server.appendonly_current_size = 0; fclose(fp); return REDIS_ERR; } @@@ -326,8 -259,6 +327,8 @@@ } if (buf[0] != '*') goto fmterr; argc = atoi(buf+1); + if (argc < 1) goto fmterr; + argv = zmalloc(sizeof(robj*)*argc); for (j = 0; j < argc; j++) { if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr; @@@ -352,8 -283,6 +353,8 @@@ /* The fake client should not have a reply */ redisAssert(fakeClient->bufpos == 0 && listLength(fakeClient->reply) == 0); + /* The fake client should never get blocked */ + redisAssert((fakeClient->flags & REDIS_BLOCKED) == 0); /* Clean up. Command code may have changed argv/argc so we use the * argv/argc of the client instead of the local variables. */ @@@ -370,8 -299,6 +371,8 @@@ freeFakeClient(fakeClient); server.appendonly = appendonly; stopLoading(); + aofUpdateCurrentSize(); + server.auto_aofrewrite_base_size = server.appendonly_current_size; return REDIS_OK; readerr: @@@ -386,11 -313,26 +387,26 @@@ fmterr exit(1); } + /* Delegate writing an object to writing a bulk string or bulk long long. + * This is not placed in rio.c since that adds the redis.h dependency. */ + int rioWriteBulkObject(rio *r, robj *obj) { + /* Avoid using getDecodedObject to help copy-on-write (we are often + * in a child process when this function is called). */ + if (obj->encoding == REDIS_ENCODING_INT) { + return rioWriteBulkLongLong(r,(long)obj->ptr); + } else if (obj->encoding == REDIS_ENCODING_RAW) { + return rioWriteBulkString(r,obj->ptr,sdslen(obj->ptr)); + } else { + redisPanic("Unknown string encoding"); + } + } + /* Write a sequence of commands able to fully rebuild the dataset into * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */ int rewriteAppendOnlyFile(char *filename) { dictIterator *di = NULL; dictEntry *de; + rio aof; FILE *fp; char tmpfile[256]; int j; @@@ -404,20 -346,22 +420,22 @@@ redisLog(REDIS_WARNING, "Failed rewriting the append only file: %s", strerror(errno)); return REDIS_ERR; } + + aof = rioInitWithFile(fp); for (j = 0; j < server.dbnum; j++) { char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n"; redisDb *db = server.db+j; dict *d = db->dict; if (dictSize(d) == 0) continue; - di = dictGetIterator(d); + di = dictGetSafeIterator(d); if (!di) { fclose(fp); return REDIS_ERR; } /* SELECT the new DB */ - if (fwrite(selectcmd,sizeof(selectcmd)-1,1,fp) == 0) goto werr; - if (fwriteBulkLongLong(fp,j) == 0) goto werr; + if (rioWrite(&aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr; + if (rioWriteBulkLongLong(&aof,j) == 0) goto werr; /* Iterate this DB writing every entry */ while((de = dictNext(di)) != NULL) { @@@ -435,10 -379,10 +453,10 @@@ if (o->type == REDIS_STRING) { /* Emit a SET command */ char cmd[]="*3\r\n$3\r\nSET\r\n"; - if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; + if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr; /* Key and value */ - if (fwriteBulkObject(fp,&key) == 0) goto werr; - if (fwriteBulkObject(fp,o) == 0) goto werr; + if (rioWriteBulkObject(&aof,&key) == 0) goto werr; + if (rioWriteBulkObject(&aof,o) == 0) goto werr; } else if (o->type == REDIS_LIST) { /* Emit the RPUSHes needed to rebuild the list */ char cmd[]="*3\r\n$5\r\nRPUSH\r\n"; @@@ -450,13 -394,13 +468,13 @@@ long long vlong; while(ziplistGet(p,&vstr,&vlen,&vlong)) { - if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; - if (fwriteBulkObject(fp,&key) == 0) goto werr; + if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr; + if (rioWriteBulkObject(&aof,&key) == 0) goto werr; if (vstr) { - if (fwriteBulkString(fp,(char*)vstr,vlen) == 0) + if (rioWriteBulkString(&aof,(char*)vstr,vlen) == 0) goto werr; } else { - if (fwriteBulkLongLong(fp,vlong) == 0) + if (rioWriteBulkLongLong(&aof,vlong) == 0) goto werr; } p = ziplistNext(zl,p); @@@ -470,9 -414,9 +488,9 @@@ while((ln = listNext(&li))) { robj *eleobj = listNodeValue(ln); - if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; - if (fwriteBulkObject(fp,&key) == 0) goto werr; - if (fwriteBulkObject(fp,eleobj) == 0) goto werr; + if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr; + if (rioWriteBulkObject(&aof,&key) == 0) goto werr; + if (rioWriteBulkObject(&aof,eleobj) == 0) goto werr; } } else { redisPanic("Unknown list encoding"); @@@ -485,18 -429,18 +503,18 @@@ int ii = 0; int64_t llval; while(intsetGet(o->ptr,ii++,&llval)) { - if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; - if (fwriteBulkObject(fp,&key) == 0) goto werr; - if (fwriteBulkLongLong(fp,llval) == 0) goto werr; + if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr; + if (rioWriteBulkObject(&aof,&key) == 0) goto werr; + if (rioWriteBulkLongLong(&aof,llval) == 0) goto werr; } } else if (o->encoding == REDIS_ENCODING_HT) { dictIterator *di = dictGetIterator(o->ptr); dictEntry *de; while((de = dictNext(di)) != NULL) { robj *eleobj = dictGetEntryKey(de); - if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; - if (fwriteBulkObject(fp,&key) == 0) goto werr; - if (fwriteBulkObject(fp,eleobj) == 0) goto werr; + if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr; + if (rioWriteBulkObject(&aof,&key) == 0) goto werr; + if (rioWriteBulkObject(&aof,eleobj) == 0) goto werr; } dictReleaseIterator(di); } else { @@@ -523,14 -467,14 +541,14 @@@ redisAssert(ziplistGet(eptr,&vstr,&vlen,&vll)); score = zzlGetScore(sptr); - if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; - if (fwriteBulkObject(fp,&key) == 0) goto werr; - if (fwriteBulkDouble(fp,score) == 0) goto werr; + if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr; + if (rioWriteBulkObject(&aof,&key) == 0) goto werr; + if (rioWriteBulkDouble(&aof,score) == 0) goto werr; if (vstr != NULL) { - if (fwriteBulkString(fp,(char*)vstr,vlen) == 0) + if (rioWriteBulkString(&aof,(char*)vstr,vlen) == 0) goto werr; } else { - if (fwriteBulkLongLong(fp,vll) == 0) + if (rioWriteBulkLongLong(&aof,vll) == 0) goto werr; } zzlNext(zl,&eptr,&sptr); @@@ -544,10 -488,10 +562,10 @@@ robj *eleobj = dictGetEntryKey(de); double *score = dictGetEntryVal(de); - if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; - if (fwriteBulkObject(fp,&key) == 0) goto werr; - if (fwriteBulkDouble(fp,*score) == 0) goto werr; - if (fwriteBulkObject(fp,eleobj) == 0) goto werr; + if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr; + if (rioWriteBulkObject(&aof,&key) == 0) goto werr; + if (rioWriteBulkDouble(&aof,*score) == 0) goto werr; + if (rioWriteBulkObject(&aof,eleobj) == 0) goto werr; } dictReleaseIterator(di); } else { @@@ -563,11 -507,11 +581,11 @@@ unsigned int flen, vlen; while((p = zipmapNext(p,&field,&flen,&val,&vlen)) != NULL) { - if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; - if (fwriteBulkObject(fp,&key) == 0) goto werr; - if (fwriteBulkString(fp,(char*)field,flen) == 0) + if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr; + if (rioWriteBulkObject(&aof,&key) == 0) goto werr; + if (rioWriteBulkString(&aof,(char*)field,flen) == 0) goto werr; - if (fwriteBulkString(fp,(char*)val,vlen) == 0) + if (rioWriteBulkString(&aof,(char*)val,vlen) == 0) goto werr; } } else { @@@ -578,10 -522,10 +596,10 @@@ robj *field = dictGetEntryKey(de); robj *val = dictGetEntryVal(de); - if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; - if (fwriteBulkObject(fp,&key) == 0) goto werr; - if (fwriteBulkObject(fp,field) == 0) goto werr; - if (fwriteBulkObject(fp,val) == 0) goto werr; + if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr; + if (rioWriteBulkObject(&aof,&key) == 0) goto werr; + if (rioWriteBulkObject(&aof,field) == 0) goto werr; + if (rioWriteBulkObject(&aof,val) == 0) goto werr; } dictReleaseIterator(di); } @@@ -593,9 -537,9 +611,9 @@@ char cmd[]="*3\r\n$8\r\nEXPIREAT\r\n"; /* If this key is already expired skip it */ if (expiretime < now) continue; - if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr; - if (fwriteBulkObject(fp,&key) == 0) goto werr; - if (fwriteBulkLongLong(fp,expiretime) == 0) goto werr; + if (rioWrite(&aof,cmd,sizeof(cmd)-1) == 0) goto werr; + if (rioWriteBulkObject(&aof,&key) == 0) goto werr; + if (rioWriteBulkLongLong(&aof,expiretime) == 0) goto werr; } } dictReleaseIterator(di); @@@ -638,14 -582,16 +656,14 @@@ werr */ int rewriteAppendOnlyFileBackground(void) { pid_t childpid; + long long start; if (server.bgrewritechildpid != -1) return REDIS_ERR; - if (server.ds_enabled != 0) { - redisLog(REDIS_WARNING,"BGREWRITEAOF called with diskstore enabled: AOF is not supported when diskstore is enabled. Operation not performed."); - return REDIS_ERR; - } + start = ustime(); if ((childpid = fork()) == 0) { - /* Child */ char tmpfile[256]; + /* Child */ if (server.ipfd > 0) close(server.ipfd); if (server.sofd > 0) close(server.sofd); snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid()); @@@ -656,7 -602,6 +674,7 @@@ } } else { /* Parent */ + server.stat_fork_time = ustime()-start; if (childpid == -1) { redisLog(REDIS_WARNING, "Can't rewrite append only file in background: fork: %s", @@@ -680,10 -625,9 +698,10 @@@ void bgrewriteaofCommand(redisClient *c) { if (server.bgrewritechildpid != -1) { addReplyError(c,"Background append only file rewriting already in progress"); - return; - } - if (rewriteAppendOnlyFileBackground() == REDIS_OK) { + } else if (server.bgsavechildpid != -1) { + server.aofrewrite_scheduled = 1; + addReplyStatus(c,"Background append only file rewriting scheduled"); + } else if (rewriteAppendOnlyFileBackground() == REDIS_OK) { addReplyStatus(c,"Background append only file rewriting started"); } else { addReply(c,shared.err); @@@ -697,146 -641,58 +715,146 @@@ void aofRemoveTempFile(pid_t childpid) unlink(tmpfile); } +/* Update the server.appendonly_current_size filed explicitly using stat(2) + * to check the size of the file. This is useful after a rewrite or after + * a restart, normally the size is updated just adding the write length + * to the current lenght, that is much faster. */ +void aofUpdateCurrentSize(void) { + struct redis_stat sb; + + if (redis_fstat(server.appendfd,&sb) == -1) { + redisLog(REDIS_WARNING,"Unable to check the AOF length: %s", + strerror(errno)); + } else { + server.appendonly_current_size = sb.st_size; + } +} + /* A background append only file rewriting (BGREWRITEAOF) terminated its work. * Handle this. */ void backgroundRewriteDoneHandler(int exitcode, int bysignal) { if (!bysignal && exitcode == 0) { - int fd; + int newfd, oldfd; + int nwritten; char tmpfile[256]; + long long now = ustime(); redisLog(REDIS_NOTICE, - "Background append only file rewriting terminated with success"); - /* Now it's time to flush the differences accumulated by the parent */ - snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) server.bgrewritechildpid); - fd = open(tmpfile,O_WRONLY|O_APPEND); - if (fd == -1) { - redisLog(REDIS_WARNING, "Not able to open the temp append only file produced by the child: %s", strerror(errno)); + "Background AOF rewrite terminated with success"); + + /* Flush the differences accumulated by the parent to the + * rewritten AOF. */ + snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", + (int)server.bgrewritechildpid); + newfd = open(tmpfile,O_WRONLY|O_APPEND); + if (newfd == -1) { + redisLog(REDIS_WARNING, + "Unable to open the temporary AOF produced by the child: %s", strerror(errno)); goto cleanup; } - /* Flush our data... */ - if (write(fd,server.bgrewritebuf,sdslen(server.bgrewritebuf)) != - (signed) sdslen(server.bgrewritebuf)) { - redisLog(REDIS_WARNING, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno)); - close(fd); + + nwritten = write(newfd,server.bgrewritebuf,sdslen(server.bgrewritebuf)); + if (nwritten != (signed)sdslen(server.bgrewritebuf)) { + if (nwritten == -1) { + redisLog(REDIS_WARNING, + "Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno)); + } else { + redisLog(REDIS_WARNING, + "Short write trying to flush the parent diff to the rewritten AOF: %s", strerror(errno)); + } + close(newfd); goto cleanup; } - redisLog(REDIS_NOTICE,"Parent diff flushed into the new append log file with success (%lu bytes)",sdslen(server.bgrewritebuf)); - /* Now our work is to rename the temp file into the stable file. And - * switch the file descriptor used by the server for append only. */ + + redisLog(REDIS_NOTICE, + "Parent diff successfully flushed to the rewritten AOF (%lu bytes)", nwritten); + + /* The only remaining thing to do is to rename the temporary file to + * the configured file and switch the file descriptor used to do AOF + * writes. We don't want close(2) or rename(2) calls to block the + * server on old file deletion. + * + * There are two possible scenarios: + * + * 1) AOF is DISABLED and this was a one time rewrite. The temporary + * file will be renamed to the configured file. When this file already + * exists, it will be unlinked, which may block the server. + * + * 2) AOF is ENABLED and the rewritten AOF will immediately start + * receiving writes. After the temporary file is renamed to the + * configured file, the original AOF file descriptor will be closed. + * Since this will be the last reference to that file, closing it + * causes the underlying file to be unlinked, which may block the + * server. + * + * To mitigate the blocking effect of the unlink operation (either + * caused by rename(2) in scenario 1, or by close(2) in scenario 2), we + * use a background thread to take care of this. First, we + * make scenario 1 identical to scenario 2 by opening the target file + * when it exists. The unlink operation after the rename(2) will then + * be executed upon calling close(2) for its descriptor. Everything to + * guarantee atomicity for this switch has already happened by then, so + * we don't care what the outcome or duration of that close operation + * is, as long as the file descriptor is released again. */ + if (server.appendfd == -1) { + /* AOF disabled */ + + /* Don't care if this fails: oldfd will be -1 and we handle that. + * One notable case of -1 return is if the old file does + * not exist. */ + oldfd = open(server.appendfilename,O_RDONLY|O_NONBLOCK); + } else { + /* AOF enabled */ + oldfd = -1; /* We'll set this to the current AOF filedes later. */ + } + + /* Rename the temporary file. This will not unlink the target file if + * it exists, because we reference it with "oldfd". */ if (rename(tmpfile,server.appendfilename) == -1) { - redisLog(REDIS_WARNING,"Can't rename the temp append only file into the stable one: %s", strerror(errno)); - close(fd); + redisLog(REDIS_WARNING, + "Error trying to rename the temporary AOF: %s", strerror(errno)); + close(newfd); + if (oldfd != -1) close(oldfd); goto cleanup; } - /* Mission completed... almost */ - redisLog(REDIS_NOTICE,"Append only file successfully rewritten."); - if (server.appendfd != -1) { - /* If append only is actually enabled... */ - close(server.appendfd); - server.appendfd = fd; - if (server.appendfsync != APPENDFSYNC_NO) aof_fsync(fd); - server.appendseldb = -1; /* Make sure it will issue SELECT */ - redisLog(REDIS_NOTICE,"The new append only file was selected for future appends."); + + if (server.appendfd == -1) { + /* AOF disabled, we don't need to set the AOF file descriptor + * to this new file, so we can close it. */ + close(newfd); } else { - /* If append only is disabled we just generate a dump in this - * format. Why not? */ - close(fd); + /* AOF enabled, replace the old fd with the new one. */ + oldfd = server.appendfd; + server.appendfd = newfd; + if (server.appendfsync == APPENDFSYNC_ALWAYS) + aof_fsync(newfd); + else if (server.appendfsync == APPENDFSYNC_EVERYSEC) + aof_background_fsync(newfd); + server.appendseldb = -1; /* Make sure SELECT is re-issued */ + aofUpdateCurrentSize(); + server.auto_aofrewrite_base_size = server.appendonly_current_size; + + /* Clear regular AOF buffer since its contents was just written to + * the new AOF from the background rewrite buffer. */ + sdsfree(server.aofbuf); + server.aofbuf = sdsempty(); } + + redisLog(REDIS_NOTICE, "Background AOF rewrite successful"); + + /* Asynchronously close the overwritten AOF. */ + if (oldfd != -1) bioCreateBackgroundJob(REDIS_BIO_CLOSE_FILE,(void*)(long)oldfd,NULL,NULL); + + redisLog(REDIS_VERBOSE, + "Background AOF rewrite signal handler took %lldus", ustime()-now); } else if (!bysignal && exitcode != 0) { - redisLog(REDIS_WARNING, "Background append only file rewriting error"); + redisLog(REDIS_WARNING, + "Background AOF rewrite terminated with error"); } else { redisLog(REDIS_WARNING, - "Background append only file rewriting terminated by signal %d", - bysignal); + "Background AOF rewrite terminated by signal %d", bysignal); } + cleanup: sdsfree(server.bgrewritebuf); server.bgrewritebuf = sdsempty(); diff --combined src/cluster.c index e608c420,1c0b1c23..c6fc44b1 --- a/src/cluster.c +++ b/src/cluster.c @@@ -1383,14 -1383,13 +1383,13 @@@ void clusterCommand(redisClient *c) /* RESTORE key ttl serialized-value */ void restoreCommand(redisClient *c) { - FILE *fp; - char buf[64]; - robj *o; - unsigned char *data; long ttl; + rio payload; + int type; + robj *obj; /* Make sure this key does not already exist here... */ - if (dbExists(c->db,c->argv[1])) { + if (lookupKeyWrite(c->db,c->argv[1]) != NULL) { addReplyError(c,"Target key name is busy."); return; } @@@ -1403,44 -1402,16 +1402,16 @@@ return; } - /* rdbLoadObject() only works against file descriptors so we need to - * dump the serialized object into a file and reload. */ - snprintf(buf,sizeof(buf),"redis-restore-%d.tmp",getpid()); - fp = fopen(buf,"w+"); - if (!fp) { - redisLog(REDIS_WARNING,"Can't open tmp file for RESTORE: %s", - strerror(errno)); - addReplyErrorFormat(c,"RESTORE failed, tmp file creation error: %s", - strerror(errno)); - return; - } - unlink(buf); - - /* Write the actual data and rewind the file */ - data = (unsigned char*) c->argv[3]->ptr; - if (fwrite(data+1,sdslen((sds)data)-1,1,fp) != 1) { - redisLog(REDIS_WARNING,"Can't write against tmp file for RESTORE: %s", - strerror(errno)); - addReplyError(c,"RESTORE failed, tmp file I/O error."); - fclose(fp); - return; - } - rewind(fp); - - /* Finally create the object from the serialized dump and - * store it at the specified key. */ - if ((data[0] > 4 && data[0] < 9) || - data[0] > 11 || - (o = rdbLoadObject(data[0],fp)) == NULL) + payload = rioInitWithBuffer(c->argv[3]->ptr); + if (((type = rdbLoadObjectType(&payload)) == -1) || + ((obj = rdbLoadObject(type,&payload)) == NULL)) { - addReplyError(c,"Bad data format."); - fclose(fp); + addReplyError(c,"Bad data format"); return; } - fclose(fp); /* Create the key and set the TTL if any */ - dbAdd(c->db,c->argv[1],o); + dbAdd(c->db,c->argv[1],obj); if (ttl) setExpire(c->db,c->argv[1],time(NULL)+ttl); addReply(c,shared.ok); } @@@ -1450,12 -1421,9 +1421,9 @@@ void migrateCommand(redisClient *c) int fd; long timeout; long dbid; - char buf[64]; - FILE *fp; time_t ttl; robj *o; - unsigned char type; - off_t payload_len; + rio cmd, payload; /* Sanity check */ if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != REDIS_OK) @@@ -1485,54 -1453,41 +1453,41 @@@ return; } - /* Create temp file */ - snprintf(buf,sizeof(buf),"redis-migrate-%d.tmp",getpid()); - fp = fopen(buf,"w+"); - if (!fp) { - redisLog(REDIS_WARNING,"Can't open tmp file for MIGRATE: %s", - strerror(errno)); - addReplyErrorFormat(c,"MIGRATE failed, tmp file creation error: %s.", - strerror(errno)); - return; - } - unlink(buf); - - /* Build the SELECT + RESTORE query writing it in our temp file. */ - if (fwriteBulkCount(fp,'*',2) == 0) goto file_wr_err; - if (fwriteBulkString(fp,"SELECT",6) == 0) goto file_wr_err; - if (fwriteBulkLongLong(fp,dbid) == 0) goto file_wr_err; + cmd = rioInitWithBuffer(sdsempty()); + redisAssert(rioWriteBulkCount(&cmd,'*',2)); + redisAssert(rioWriteBulkString(&cmd,"SELECT",6)); + redisAssert(rioWriteBulkLongLong(&cmd,dbid)); ttl = getExpire(c->db,c->argv[3]); - type = o->type; - if (fwriteBulkCount(fp,'*',4) == 0) goto file_wr_err; - if (fwriteBulkString(fp,"RESTORE",7) == 0) goto file_wr_err; - if (fwriteBulkObject(fp,c->argv[3]) == 0) goto file_wr_err; - if (fwriteBulkLongLong(fp, (ttl == -1) ? 0 : ttl) == 0) goto file_wr_err; + redisAssert(rioWriteBulkCount(&cmd,'*',4)); + redisAssert(rioWriteBulkString(&cmd,"RESTORE",7)); + redisAssert(c->argv[3]->encoding == REDIS_ENCODING_RAW); + redisAssert(rioWriteBulkString(&cmd,c->argv[3]->ptr,sdslen(c->argv[3]->ptr))); + redisAssert(rioWriteBulkLongLong(&cmd,(ttl == -1) ? 0 : ttl)); /* Finally the last argument that is the serailized object payload - * in the form: . */ - payload_len = rdbSavedObjectLen(o); - if (fwriteBulkCount(fp,'$',payload_len+1) == 0) goto file_wr_err; - if (fwrite(&type,1,1,fp) == 0) goto file_wr_err; - if (rdbSaveObject(fp,o) == -1) goto file_wr_err; - if (fwrite("\r\n",2,1,fp) == 0) goto file_wr_err; - - /* Tranfer the query to the other node */ - rewind(fp); + * in the form: . */ + payload = rioInitWithBuffer(sdsempty()); + redisAssert(rdbSaveObjectType(&payload,o)); + redisAssert(rdbSaveObject(&payload,o) != -1); + redisAssert(rioWriteBulkString(&cmd,payload.io.buffer.ptr,sdslen(payload.io.buffer.ptr))); + sdsfree(payload.io.buffer.ptr); + + /* Tranfer the query to the other node in 64K chunks. */ { - char buf[4096]; - size_t nread; - - while ((nread = fread(buf,1,sizeof(buf),fp)) != 0) { - int nwritten; - - nwritten = syncWrite(fd,buf,nread,timeout); - if (nwritten != (signed)nread) goto socket_wr_err; + sds buf = cmd.io.buffer.ptr; + size_t pos = 0, towrite; + int nwritten = 0; + + while ((towrite = sdslen(buf)-pos) > 0) { + towrite = (towrite > (64*1024) ? (64*1024) : towrite); + nwritten = syncWrite(fd,buf+nwritten,towrite,timeout); + if (nwritten != (signed)towrite) goto socket_wr_err; + pos += nwritten; } - if (ferror(fp)) goto file_rd_err; } - /* Read back the reply */ + /* Read back the reply. */ { char buf1[1024]; char buf2[1024]; @@@ -1541,7 -1496,7 +1496,7 @@@ if (syncReadLine(fd, buf1, sizeof(buf1), timeout) <= 0) goto socket_rd_err; if (syncReadLine(fd, buf2, sizeof(buf2), timeout) <= 0) - goto socket_rd_err; + goto socket_rd_err; if (buf1[0] == '-' || buf2[0] == '-') { addReplyErrorFormat(c,"Target instance replied with error: %s", (buf1[0] == '-') ? buf1+1 : buf2+1); @@@ -1550,25 -1505,8 +1505,8 @@@ addReply(c,shared.ok); } } - fclose(fp); - close(fd); - return; - file_wr_err: - redisLog(REDIS_WARNING,"Can't write on tmp file for MIGRATE: %s", - strerror(errno)); - addReplyErrorFormat(c,"MIGRATE failed, tmp file write error: %s.", - strerror(errno)); - fclose(fp); - close(fd); - return; - - file_rd_err: - redisLog(REDIS_WARNING,"Can't read from tmp file for MIGRATE: %s", - strerror(errno)); - addReplyErrorFormat(c,"MIGRATE failed, tmp file read error: %s.", - strerror(errno)); - fclose(fp); + sdsfree(cmd.io.buffer.ptr); close(fd); return; @@@ -1577,7 -1515,7 +1515,7 @@@ socket_wr_err strerror(errno)); addReplyErrorFormat(c,"MIGRATE failed, writing to target node: %s.", strerror(errno)); - fclose(fp); + sdsfree(cmd.io.buffer.ptr); close(fd); return; @@@ -1586,7 -1524,7 +1524,7 @@@ socket_rd_err strerror(errno)); addReplyErrorFormat(c,"MIGRATE failed, reading from target node: %s.", strerror(errno)); - fclose(fp); + sdsfree(cmd.io.buffer.ptr); close(fd); return; } @@@ -1595,74 -1533,26 +1533,26 @@@ * DUMP is actually not used by Redis Cluster but it is the obvious * complement of RESTORE and can be useful for different applications. */ void dumpCommand(redisClient *c) { - char buf[64]; - FILE *fp; robj *o, *dumpobj; - sds dump = NULL; - off_t payload_len; - unsigned int type; + rio payload; /* Check if the key is here. */ if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) { addReply(c,shared.nullbulk); return; } - - /* Create temp file */ - snprintf(buf,sizeof(buf),"redis-dump-%d.tmp",getpid()); - fp = fopen(buf,"w+"); - if (!fp) { - redisLog(REDIS_WARNING,"Can't open tmp file for MIGRATE: %s", - strerror(errno)); - addReplyErrorFormat(c,"DUMP failed, tmp file creation error: %s.", - strerror(errno)); - return; - } - unlink(buf); - - /* Dump the serailized object and read it back in memory. - * We prefix it with a one byte containing the type ID. - * This is the serialization format understood by RESTORE. */ - if (rdbSaveObject(fp,o) == -1) goto file_wr_err; - payload_len = ftello(fp); - if (fseeko(fp,0,SEEK_SET) == -1) goto file_rd_err; - dump = sdsnewlen(NULL,payload_len+1); - if (payload_len && fread(dump+1,payload_len,1,fp) != 1) goto file_rd_err; - fclose(fp); - type = o->type; - if (type == REDIS_LIST && o->encoding == REDIS_ENCODING_ZIPLIST) - type = REDIS_LIST_ZIPLIST; - else if (type == REDIS_HASH && o->encoding == REDIS_ENCODING_ZIPMAP) - type = REDIS_HASH_ZIPMAP; - else if (type == REDIS_SET && o->encoding == REDIS_ENCODING_INTSET) - type = REDIS_SET_INTSET; - else - type = o->type; - dump[0] = type; + + /* Serialize the object in a RDB-like format. It consist of an object type + * byte followed by the serialized object. This is understood by RESTORE. */ + payload = rioInitWithBuffer(sdsempty()); + redisAssert(rdbSaveObjectType(&payload,o)); + redisAssert(rdbSaveObject(&payload,o)); /* Transfer to the client */ - dumpobj = createObject(REDIS_STRING,dump); + dumpobj = createObject(REDIS_STRING,payload.io.buffer.ptr); addReplyBulk(c,dumpobj); decrRefCount(dumpobj); return; - - file_wr_err: - redisLog(REDIS_WARNING,"Can't write on tmp file for DUMP: %s", - strerror(errno)); - addReplyErrorFormat(c,"DUMP failed, tmp file write error: %s.", - strerror(errno)); - sdsfree(dump); - fclose(fp); - return; - - file_rd_err: - redisLog(REDIS_WARNING,"Can't read from tmp file for DUMP: %s", - strerror(errno)); - addReplyErrorFormat(c,"DUMP failed, tmp file read error: %s.", - strerror(errno)); - sdsfree(dump); - fclose(fp); - return; } /* ----------------------------------------------------------------------------- diff --combined src/rdb.c index cfbec3e8,6d99375b..9bf470aa --- a/src/rdb.c +++ b/src/rdb.c @@@ -1,6 -1,3 +1,3 @@@ - #include "redis.h" - #include "lzf.h" /* LZF compression library */ - #include #include #include @@@ -8,59 -5,99 +5,99 @@@ #include #include #include + #include "rdb.h" + #include "lzf.h" /* LZF compression library */ - /* Convenience wrapper around fwrite, that returns the number of bytes written - * to the file instead of the number of objects (see fwrite(3)) and -1 in the - * case of an error. It also supports a NULL *fp to skip writing altogether - * instead of writing to /dev/null. */ - static int rdbWriteRaw(FILE *fp, void *p, size_t len) { - if (fp != NULL && fwrite(p,len,1,fp) == 0) return -1; + static int rdbWriteRaw(rio *rdb, void *p, size_t len) { + if (rdb && rioWrite(rdb,p,len) == 0) + return -1; return len; } - int rdbSaveType(FILE *fp, unsigned char type) { - return rdbWriteRaw(fp,&type,1); + int rdbSaveType(rio *rdb, unsigned char type) { + return rdbWriteRaw(rdb,&type,1); + } + + int rdbLoadType(rio *rdb) { + unsigned char type; + if (rioRead(rdb,&type,1) == 0) return -1; + return type; } - int rdbSaveTime(FILE *fp, time_t t) { + int rdbSaveTime(rio *rdb, time_t t) { int32_t t32 = (int32_t) t; - return rdbWriteRaw(fp,&t32,4); + return rdbWriteRaw(rdb,&t32,4); + } + + time_t rdbLoadTime(rio *rdb) { + int32_t t32; + if (rioRead(rdb,&t32,4) == 0) return -1; + return (time_t)t32; } - /* check rdbLoadLen() comments for more info */ - int rdbSaveLen(FILE *fp, uint32_t len) { + /* Saves an encoded length. The first two bits in the first byte are used to + * hold the encoding type. See the REDIS_RDB_* definitions for more information + * on the types of encoding. */ + int rdbSaveLen(rio *rdb, uint32_t len) { unsigned char buf[2]; - int nwritten; + size_t nwritten; if (len < (1<<6)) { /* Save a 6 bit len */ buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6); - if (rdbWriteRaw(fp,buf,1) == -1) return -1; + if (rdbWriteRaw(rdb,buf,1) == -1) return -1; nwritten = 1; } else if (len < (1<<14)) { /* Save a 14 bit len */ buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6); buf[1] = len&0xFF; - if (rdbWriteRaw(fp,buf,2) == -1) return -1; + if (rdbWriteRaw(rdb,buf,2) == -1) return -1; nwritten = 2; } else { /* Save a 32 bit len */ buf[0] = (REDIS_RDB_32BITLEN<<6); - if (rdbWriteRaw(fp,buf,1) == -1) return -1; + if (rdbWriteRaw(rdb,buf,1) == -1) return -1; len = htonl(len); - if (rdbWriteRaw(fp,&len,4) == -1) return -1; + if (rdbWriteRaw(rdb,&len,4) == -4) return -1; nwritten = 1+4; } return nwritten; } - /* Encode 'value' as an integer if possible (if integer will fit the - * supported range). If the function sucessful encoded the integer - * then the (up to 5 bytes) encoded representation is written in the - * string pointed by 'enc' and the length is returned. Otherwise - * 0 is returned. */ + /* Load an encoded length. The "isencoded" argument is set to 1 if the length + * is not actually a length but an "encoding type". See the REDIS_RDB_ENC_* + * definitions in rdb.h for more information. */ + uint32_t rdbLoadLen(rio *rdb, int *isencoded) { + unsigned char buf[2]; + uint32_t len; + int type; + + if (isencoded) *isencoded = 0; + if (rioRead(rdb,buf,1) == 0) return REDIS_RDB_LENERR; + type = (buf[0]&0xC0)>>6; + if (type == REDIS_RDB_ENCVAL) { + /* Read a 6 bit encoding type. */ + if (isencoded) *isencoded = 1; + return buf[0]&0x3F; + } else if (type == REDIS_RDB_6BITLEN) { + /* Read a 6 bit len. */ + return buf[0]&0x3F; + } else if (type == REDIS_RDB_14BITLEN) { + /* Read a 14 bit len. */ + if (rioRead(rdb,buf+1,1) == 0) return REDIS_RDB_LENERR; + return ((buf[0]&0x3F)<<8)|buf[1]; + } else { + /* Read a 32 bit len. */ + if (rioRead(rdb,&len,4) == 0) return REDIS_RDB_LENERR; + return ntohl(len); + } + } + + /* Encodes the "value" argument as integer when it fits in the supported ranges + * for encoded types. If the function successfully encodes the integer, the + * representation is stored in the buffer pointer to by "enc" and the string + * length is returned. Otherwise 0 is returned. */ int rdbEncodeInteger(long long value, unsigned char *enc) { - /* Finally check if it fits in our ranges */ if (value >= -(1<<7) && value <= (1<<7)-1) { enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8; enc[1] = value&0xFF; @@@ -82,6 -119,36 +119,36 @@@ } } + /* Loads an integer-encoded object with the specified encoding type "enctype". + * If the "encode" argument is set the function may return an integer-encoded + * string object, otherwise it always returns a raw string object. */ + robj *rdbLoadIntegerObject(rio *rdb, int enctype, int encode) { + unsigned char enc[4]; + long long val; + + if (enctype == REDIS_RDB_ENC_INT8) { + if (rioRead(rdb,enc,1) == 0) return NULL; + val = (signed char)enc[0]; + } else if (enctype == REDIS_RDB_ENC_INT16) { + uint16_t v; + if (rioRead(rdb,enc,2) == 0) return NULL; + v = enc[0]|(enc[1]<<8); + val = (int16_t)v; + } else if (enctype == REDIS_RDB_ENC_INT32) { + uint32_t v; + if (rioRead(rdb,enc,4) == 0) return NULL; + v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24); + val = (int32_t)v; + } else { + val = 0; /* anti-warning */ + redisPanic("Unknown RDB integer encoding type"); + } + if (encode) + return createStringObjectFromLongLong(val); + else + return createObject(REDIS_STRING,sdsfromlonglong(val)); + } + /* String objects in the form "2391" "-100" without any space and with a * range of values that can fit in an 8, 16 or 32 bit signed value can be * encoded as integers to save space */ @@@ -101,7 -168,7 +168,7 @@@ int rdbTryIntegerEncoding(char *s, size return rdbEncodeInteger(value,enc); } - int rdbSaveLzfStringObject(FILE *fp, unsigned char *s, size_t len) { + int rdbSaveLzfStringObject(rio *rdb, unsigned char *s, size_t len) { size_t comprlen, outlen; unsigned char byte; int n, nwritten = 0; @@@ -118,16 -185,16 +185,16 @@@ } /* Data compressed! Let's save it on disk */ byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF; - if ((n = rdbWriteRaw(fp,&byte,1)) == -1) goto writeerr; + if ((n = rdbWriteRaw(rdb,&byte,1)) == -1) goto writeerr; nwritten += n; - if ((n = rdbSaveLen(fp,comprlen)) == -1) goto writeerr; + if ((n = rdbSaveLen(rdb,comprlen)) == -1) goto writeerr; nwritten += n; - if ((n = rdbSaveLen(fp,len)) == -1) goto writeerr; + if ((n = rdbSaveLen(rdb,len)) == -1) goto writeerr; nwritten += n; - if ((n = rdbWriteRaw(fp,out,comprlen)) == -1) goto writeerr; + if ((n = rdbWriteRaw(rdb,out,comprlen)) == -1) goto writeerr; nwritten += n; zfree(out); @@@ -138,9 -205,28 +205,28 @@@ writeerr return -1; } + robj *rdbLoadLzfStringObject(rio *rdb) { + unsigned int len, clen; + unsigned char *c = NULL; + sds val = NULL; + + if ((clen = rdbLoadLen(rdb,NULL)) == REDIS_RDB_LENERR) return NULL; + if ((len = rdbLoadLen(rdb,NULL)) == REDIS_RDB_LENERR) return NULL; + if ((c = zmalloc(clen)) == NULL) goto err; + if ((val = sdsnewlen(NULL,len)) == NULL) goto err; + if (rioRead(rdb,c,clen) == 0) goto err; + if (lzf_decompress(c,clen,val,len) == 0) goto err; + zfree(c); + return createObject(REDIS_STRING,val); + err: + zfree(c); + sdsfree(val); + return NULL; + } + /* Save a string objet as [len][data] on disk. If the object is a string * representation of an integer value we try to save it in a special form */ - int rdbSaveRawString(FILE *fp, unsigned char *s, size_t len) { + int rdbSaveRawString(rio *rdb, unsigned char *s, size_t len) { int enclen; int n, nwritten = 0; @@@ -148,7 -234,7 +234,7 @@@ if (len <= 11) { unsigned char buf[5]; if ((enclen = rdbTryIntegerEncoding((char*)s,len,buf)) > 0) { - if (rdbWriteRaw(fp,buf,enclen) == -1) return -1; + if (rdbWriteRaw(rdb,buf,enclen) == -1) return -1; return enclen; } } @@@ -156,53 -242,89 +242,89 @@@ /* Try LZF compression - under 20 bytes it's unable to compress even * aaaaaaaaaaaaaaaaaa so skip it */ if (server.rdbcompression && len > 20) { - n = rdbSaveLzfStringObject(fp,s,len); + n = rdbSaveLzfStringObject(rdb,s,len); if (n == -1) return -1; if (n > 0) return n; /* Return value of 0 means data can't be compressed, save the old way */ } /* Store verbatim */ - if ((n = rdbSaveLen(fp,len)) == -1) return -1; + if ((n = rdbSaveLen(rdb,len)) == -1) return -1; nwritten += n; if (len > 0) { - if (rdbWriteRaw(fp,s,len) == -1) return -1; + if (rdbWriteRaw(rdb,s,len) == -1) return -1; nwritten += len; } return nwritten; } /* Save a long long value as either an encoded string or a string. */ - int rdbSaveLongLongAsStringObject(FILE *fp, long long value) { + int rdbSaveLongLongAsStringObject(rio *rdb, long long value) { unsigned char buf[32]; int n, nwritten = 0; int enclen = rdbEncodeInteger(value,buf); if (enclen > 0) { - return rdbWriteRaw(fp,buf,enclen); + return rdbWriteRaw(rdb,buf,enclen); } else { /* Encode as string */ enclen = ll2string((char*)buf,32,value); redisAssert(enclen < 32); - if ((n = rdbSaveLen(fp,enclen)) == -1) return -1; + if ((n = rdbSaveLen(rdb,enclen)) == -1) return -1; nwritten += n; - if ((n = rdbWriteRaw(fp,buf,enclen)) == -1) return -1; + if ((n = rdbWriteRaw(rdb,buf,enclen)) == -1) return -1; nwritten += n; } return nwritten; } /* Like rdbSaveStringObjectRaw() but handle encoded objects */ - int rdbSaveStringObject(FILE *fp, robj *obj) { + int rdbSaveStringObject(rio *rdb, robj *obj) { /* Avoid to decode the object, then encode it again, if the * object is alrady integer encoded. */ if (obj->encoding == REDIS_ENCODING_INT) { - return rdbSaveLongLongAsStringObject(fp,(long)obj->ptr); + return rdbSaveLongLongAsStringObject(rdb,(long)obj->ptr); } else { redisAssert(obj->encoding == REDIS_ENCODING_RAW); - return rdbSaveRawString(fp,obj->ptr,sdslen(obj->ptr)); + return rdbSaveRawString(rdb,obj->ptr,sdslen(obj->ptr)); } } + robj *rdbGenericLoadStringObject(rio *rdb, int encode) { + int isencoded; + uint32_t len; + sds val; + + len = rdbLoadLen(rdb,&isencoded); + if (isencoded) { + switch(len) { + case REDIS_RDB_ENC_INT8: + case REDIS_RDB_ENC_INT16: + case REDIS_RDB_ENC_INT32: + return rdbLoadIntegerObject(rdb,len,encode); + case REDIS_RDB_ENC_LZF: + return rdbLoadLzfStringObject(rdb); + default: + redisPanic("Unknown RDB encoding type"); + } + } + + if (len == REDIS_RDB_LENERR) return NULL; + val = sdsnewlen(NULL,len); + if (len && rioRead(rdb,val,len) == 0) { + sdsfree(val); + return NULL; + } + return createObject(REDIS_STRING,val); + } + + robj *rdbLoadStringObject(rio *rdb) { + return rdbGenericLoadStringObject(rdb,0); + } + + robj *rdbLoadEncodedStringObject(rio *rdb) { + return rdbGenericLoadStringObject(rdb,1); + } + /* Save a double value. Doubles are saved as strings prefixed by an unsigned * 8 bit integer specifing the length of the representation. * This 8 bit integer has special values in order to specify the following @@@ -211,7 -333,7 +333,7 @@@ * 254: + inf * 255: - inf */ - int rdbSaveDoubleValue(FILE *fp, double val) { + int rdbSaveDoubleValue(rio *rdb, double val) { unsigned char buf[128]; int len; @@@ -242,36 -364,101 +364,101 @@@ buf[0] = strlen((char*)buf+1); len = buf[0]+1; } - return rdbWriteRaw(fp,buf,len); + return rdbWriteRaw(rdb,buf,len); + } + + /* For information about double serialization check rdbSaveDoubleValue() */ + int rdbLoadDoubleValue(rio *rdb, double *val) { + char buf[128]; + unsigned char len; + + if (rioRead(rdb,&len,1) == 0) return -1; + switch(len) { + case 255: *val = R_NegInf; return 0; + case 254: *val = R_PosInf; return 0; + case 253: *val = R_Nan; return 0; + default: + if (rioRead(rdb,buf,len) == 0) return -1; + buf[len] = '\0'; + sscanf(buf, "%lg", val); + return 0; + } + } + + /* Save the object type of object "o". */ + int rdbSaveObjectType(rio *rdb, robj *o) { + switch (o->type) { + case REDIS_STRING: + return rdbSaveType(rdb,REDIS_RDB_TYPE_STRING); + case REDIS_LIST: + if (o->encoding == REDIS_ENCODING_ZIPLIST) + return rdbSaveType(rdb,REDIS_RDB_TYPE_LIST_ZIPLIST); + else if (o->encoding == REDIS_ENCODING_LINKEDLIST) + return rdbSaveType(rdb,REDIS_RDB_TYPE_LIST); + else + redisPanic("Unknown list encoding"); + case REDIS_SET: + if (o->encoding == REDIS_ENCODING_INTSET) + return rdbSaveType(rdb,REDIS_RDB_TYPE_SET_INTSET); + else if (o->encoding == REDIS_ENCODING_HT) + return rdbSaveType(rdb,REDIS_RDB_TYPE_SET); + else + redisPanic("Unknown set encoding"); + case REDIS_ZSET: + if (o->encoding == REDIS_ENCODING_ZIPLIST) + return rdbSaveType(rdb,REDIS_RDB_TYPE_ZSET_ZIPLIST); + else if (o->encoding == REDIS_ENCODING_SKIPLIST) + return rdbSaveType(rdb,REDIS_RDB_TYPE_ZSET); + else + redisPanic("Unknown sorted set encoding"); + case REDIS_HASH: + if (o->encoding == REDIS_ENCODING_ZIPMAP) + return rdbSaveType(rdb,REDIS_RDB_TYPE_HASH_ZIPMAP); + else if (o->encoding == REDIS_ENCODING_HT) + return rdbSaveType(rdb,REDIS_RDB_TYPE_HASH); + else + redisPanic("Unknown hash encoding"); + default: + redisPanic("Unknown object type"); + } + return -1; /* avoid warning */ + } + + /* Load object type. Return -1 when the byte doesn't contain an object type. */ + int rdbLoadObjectType(rio *rdb) { + int type; + if ((type = rdbLoadType(rdb)) == -1) return -1; + if (!rdbIsObjectType(type)) return -1; + return type; } /* Save a Redis object. Returns -1 on error, 0 on success. */ - int rdbSaveObject(FILE *fp, robj *o) { + int rdbSaveObject(rio *rdb, robj *o) { int n, nwritten = 0; if (o->type == REDIS_STRING) { /* Save a string value */ - if ((n = rdbSaveStringObject(fp,o)) == -1) return -1; + if ((n = rdbSaveStringObject(rdb,o)) == -1) return -1; nwritten += n; } else if (o->type == REDIS_LIST) { /* Save a list value */ if (o->encoding == REDIS_ENCODING_ZIPLIST) { size_t l = ziplistBlobLen((unsigned char*)o->ptr); - if ((n = rdbSaveRawString(fp,o->ptr,l)) == -1) return -1; + if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1; nwritten += n; } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) { list *list = o->ptr; listIter li; listNode *ln; - if ((n = rdbSaveLen(fp,listLength(list))) == -1) return -1; + if ((n = rdbSaveLen(rdb,listLength(list))) == -1) return -1; nwritten += n; listRewind(list,&li); while((ln = listNext(&li))) { robj *eleobj = listNodeValue(ln); - if ((n = rdbSaveStringObject(fp,eleobj)) == -1) return -1; + if ((n = rdbSaveStringObject(rdb,eleobj)) == -1) return -1; nwritten += n; } } else { @@@ -284,19 -471,19 +471,19 @@@ dictIterator *di = dictGetIterator(set); dictEntry *de; - if ((n = rdbSaveLen(fp,dictSize(set))) == -1) return -1; + if ((n = rdbSaveLen(rdb,dictSize(set))) == -1) return -1; nwritten += n; while((de = dictNext(di)) != NULL) { robj *eleobj = dictGetEntryKey(de); - if ((n = rdbSaveStringObject(fp,eleobj)) == -1) return -1; + if ((n = rdbSaveStringObject(rdb,eleobj)) == -1) return -1; nwritten += n; } dictReleaseIterator(di); } else if (o->encoding == REDIS_ENCODING_INTSET) { size_t l = intsetBlobLen((intset*)o->ptr); - if ((n = rdbSaveRawString(fp,o->ptr,l)) == -1) return -1; + if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1; nwritten += n; } else { redisPanic("Unknown set encoding"); @@@ -306,23 -493,23 +493,23 @@@ if (o->encoding == REDIS_ENCODING_ZIPLIST) { size_t l = ziplistBlobLen((unsigned char*)o->ptr); - if ((n = rdbSaveRawString(fp,o->ptr,l)) == -1) return -1; + if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1; nwritten += n; } else if (o->encoding == REDIS_ENCODING_SKIPLIST) { zset *zs = o->ptr; dictIterator *di = dictGetIterator(zs->dict); dictEntry *de; - if ((n = rdbSaveLen(fp,dictSize(zs->dict))) == -1) return -1; + if ((n = rdbSaveLen(rdb,dictSize(zs->dict))) == -1) return -1; nwritten += n; while((de = dictNext(di)) != NULL) { robj *eleobj = dictGetEntryKey(de); double *score = dictGetEntryVal(de); - if ((n = rdbSaveStringObject(fp,eleobj)) == -1) return -1; + if ((n = rdbSaveStringObject(rdb,eleobj)) == -1) return -1; nwritten += n; - if ((n = rdbSaveDoubleValue(fp,*score)) == -1) return -1; + if ((n = rdbSaveDoubleValue(rdb,*score)) == -1) return -1; nwritten += n; } dictReleaseIterator(di); @@@ -334,22 -521,22 +521,22 @@@ if (o->encoding == REDIS_ENCODING_ZIPMAP) { size_t l = zipmapBlobLen((unsigned char*)o->ptr); - if ((n = rdbSaveRawString(fp,o->ptr,l)) == -1) return -1; + if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1; nwritten += n; } else { dictIterator *di = dictGetIterator(o->ptr); dictEntry *de; - if ((n = rdbSaveLen(fp,dictSize((dict*)o->ptr))) == -1) return -1; + if ((n = rdbSaveLen(rdb,dictSize((dict*)o->ptr))) == -1) return -1; nwritten += n; while((de = dictNext(di)) != NULL) { robj *key = dictGetEntryKey(de); robj *val = dictGetEntryVal(de); - if ((n = rdbSaveStringObject(fp,key)) == -1) return -1; + if ((n = rdbSaveStringObject(rdb,key)) == -1) return -1; nwritten += n; - if ((n = rdbSaveStringObject(fp,val)) == -1) return -1; + if ((n = rdbSaveStringObject(rdb,val)) == -1) return -1; nwritten += n; } dictReleaseIterator(di); @@@ -374,33 -561,21 +561,21 @@@ off_t rdbSavedObjectLen(robj *o) * On error -1 is returned. * On success if the key was actaully saved 1 is returned, otherwise 0 * is returned (the key was already expired). */ - int rdbSaveKeyValuePair(FILE *fp, robj *key, robj *val, + int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, time_t expiretime, time_t now) { - int vtype; - /* Save the expire time */ if (expiretime != -1) { /* If this key is already expired skip it */ if (expiretime < now) return 0; - if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) return -1; - if (rdbSaveTime(fp,expiretime) == -1) return -1; + if (rdbSaveType(rdb,REDIS_RDB_OPCODE_EXPIRETIME) == -1) return -1; + if (rdbSaveTime(rdb,expiretime) == -1) return -1; } - /* Fix the object type if needed, to support saving zipmaps, ziplists, - * and intsets, directly as blobs of bytes: they are already serialized. */ - vtype = val->type; - if (vtype == REDIS_HASH && val->encoding == REDIS_ENCODING_ZIPMAP) - vtype = REDIS_HASH_ZIPMAP; - else if (vtype == REDIS_LIST && val->encoding == REDIS_ENCODING_ZIPLIST) - vtype = REDIS_LIST_ZIPLIST; - else if (vtype == REDIS_SET && val->encoding == REDIS_ENCODING_INTSET) - vtype = REDIS_SET_INTSET; - else if (vtype == REDIS_ZSET && val->encoding == REDIS_ENCODING_ZIPLIST) - vtype = REDIS_ZSET_ZIPLIST; + /* Save type, key, value */ - if (rdbSaveType(fp,vtype) == -1) return -1; - if (rdbSaveStringObject(fp,key) == -1) return -1; - if (rdbSaveObject(fp,val) == -1) return -1; + if (rdbSaveObjectType(rdb,val) == -1) return -1; + if (rdbSaveStringObject(rdb,key) == -1) return -1; + if (rdbSaveObject(rdb,val) == -1) return -1; return 1; } @@@ -408,11 -583,17 +583,12 @@@ int rdbSave(char *filename) { dictIterator *di = NULL; dictEntry *de; - FILE *fp; char tmpfile[256]; int j; time_t now = time(NULL); + FILE *fp; + rio rdb; - if (server.ds_enabled) { - cacheForcePointInTime(); - return dsRdbSave(filename); - } - snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid()); fp = fopen(tmpfile,"w"); if (!fp) { @@@ -420,20 -601,23 +596,23 @@@ strerror(errno)); return REDIS_ERR; } - if (fwrite("REDIS0002",9,1,fp) == 0) goto werr; + + rdb = rioInitWithFile(fp); + if (rdbWriteRaw(&rdb,"REDIS0002",9) == -1) goto werr; + for (j = 0; j < server.dbnum; j++) { redisDb *db = server.db+j; dict *d = db->dict; if (dictSize(d) == 0) continue; - di = dictGetIterator(d); + di = dictGetSafeIterator(d); if (!di) { fclose(fp); return REDIS_ERR; } /* Write the SELECT DB opcode */ - if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr; - if (rdbSaveLen(fp,j) == -1) goto werr; + if (rdbSaveType(&rdb,REDIS_RDB_OPCODE_SELECTDB) == -1) goto werr; + if (rdbSaveLen(&rdb,j) == -1) goto werr; /* Iterate this DB writing every entry */ while((de = dictNext(di)) != NULL) { @@@ -443,12 -627,12 +622,12 @@@ initStaticStringObject(key,keystr); expire = getExpire(db,&key); - if (rdbSaveKeyValuePair(fp,&key,o,expire,now) == -1) goto werr; + if (rdbSaveKeyValuePair(&rdb,&key,o,expire,now) == -1) goto werr; } dictReleaseIterator(di); } /* EOF opcode */ - if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr; + if (rdbSaveType(&rdb,REDIS_RDB_OPCODE_EOF) == -1) goto werr; /* Make sure data will not remain on the OS's output buffers */ fflush(fp); @@@ -477,13 -661,17 +656,13 @@@ werr int rdbSaveBackground(char *filename) { pid_t childpid; + long long start; - if (server.bgsavechildpid != -1 || - server.bgsavethread != (pthread_t) -1) return REDIS_ERR; + if (server.bgsavechildpid != -1) return REDIS_ERR; server.dirty_before_bgsave = server.dirty; - if (server.ds_enabled) { - cacheForcePointInTime(); - return dsRdbSaveBackground(filename); - } - + start = ustime(); if ((childpid = fork()) == 0) { int retval; @@@ -494,7 -682,6 +673,7 @@@ _exit((retval == REDIS_OK) ? 0 : 1); } else { /* Parent */ + server.stat_fork_time = ustime()-start; if (childpid == -1) { redisLog(REDIS_WARNING,"Can't save in background: fork: %s", strerror(errno)); @@@ -515,168 -702,21 +694,21 @@@ void rdbRemoveTempFile(pid_t childpid) unlink(tmpfile); } - int rdbLoadType(FILE *fp) { - unsigned char type; - if (fread(&type,1,1,fp) == 0) return -1; - return type; - } - - time_t rdbLoadTime(FILE *fp) { - int32_t t32; - if (fread(&t32,4,1,fp) == 0) return -1; - return (time_t) t32; - } - - /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top - * of this file for a description of how this are stored on disk. - * - * isencoded is set to 1 if the readed length is not actually a length but - * an "encoding type", check the above comments for more info */ - uint32_t rdbLoadLen(FILE *fp, int *isencoded) { - unsigned char buf[2]; - uint32_t len; - int type; - - if (isencoded) *isencoded = 0; - if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR; - type = (buf[0]&0xC0)>>6; - if (type == REDIS_RDB_6BITLEN) { - /* Read a 6 bit len */ - return buf[0]&0x3F; - } else if (type == REDIS_RDB_ENCVAL) { - /* Read a 6 bit len encoding type */ - if (isencoded) *isencoded = 1; - return buf[0]&0x3F; - } else if (type == REDIS_RDB_14BITLEN) { - /* Read a 14 bit len */ - if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR; - return ((buf[0]&0x3F)<<8)|buf[1]; - } else { - /* Read a 32 bit len */ - if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR; - return ntohl(len); - } - } - - /* Load an integer-encoded object from file 'fp', with the specified - * encoding type 'enctype'. If encode is true the function may return - * an integer-encoded object as reply, otherwise the returned object - * will always be encoded as a raw string. */ - robj *rdbLoadIntegerObject(FILE *fp, int enctype, int encode) { - unsigned char enc[4]; - long long val; - - if (enctype == REDIS_RDB_ENC_INT8) { - if (fread(enc,1,1,fp) == 0) return NULL; - val = (signed char)enc[0]; - } else if (enctype == REDIS_RDB_ENC_INT16) { - uint16_t v; - if (fread(enc,2,1,fp) == 0) return NULL; - v = enc[0]|(enc[1]<<8); - val = (int16_t)v; - } else if (enctype == REDIS_RDB_ENC_INT32) { - uint32_t v; - if (fread(enc,4,1,fp) == 0) return NULL; - v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24); - val = (int32_t)v; - } else { - val = 0; /* anti-warning */ - redisPanic("Unknown RDB integer encoding type"); - } - if (encode) - return createStringObjectFromLongLong(val); - else - return createObject(REDIS_STRING,sdsfromlonglong(val)); - } - - robj *rdbLoadLzfStringObject(FILE*fp) { - unsigned int len, clen; - unsigned char *c = NULL; - sds val = NULL; - - if ((clen = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL; - if ((len = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL; - if ((c = zmalloc(clen)) == NULL) goto err; - if ((val = sdsnewlen(NULL,len)) == NULL) goto err; - if (fread(c,clen,1,fp) == 0) goto err; - if (lzf_decompress(c,clen,val,len) == 0) goto err; - zfree(c); - return createObject(REDIS_STRING,val); - err: - zfree(c); - sdsfree(val); - return NULL; - } - - robj *rdbGenericLoadStringObject(FILE*fp, int encode) { - int isencoded; - uint32_t len; - sds val; - - len = rdbLoadLen(fp,&isencoded); - if (isencoded) { - switch(len) { - case REDIS_RDB_ENC_INT8: - case REDIS_RDB_ENC_INT16: - case REDIS_RDB_ENC_INT32: - return rdbLoadIntegerObject(fp,len,encode); - case REDIS_RDB_ENC_LZF: - return rdbLoadLzfStringObject(fp); - default: - redisPanic("Unknown RDB encoding type"); - } - } - - if (len == REDIS_RDB_LENERR) return NULL; - val = sdsnewlen(NULL,len); - if (len && fread(val,len,1,fp) == 0) { - sdsfree(val); - return NULL; - } - return createObject(REDIS_STRING,val); - } - - robj *rdbLoadStringObject(FILE *fp) { - return rdbGenericLoadStringObject(fp,0); - } - - robj *rdbLoadEncodedStringObject(FILE *fp) { - return rdbGenericLoadStringObject(fp,1); - } - - /* For information about double serialization check rdbSaveDoubleValue() */ - int rdbLoadDoubleValue(FILE *fp, double *val) { - char buf[128]; - unsigned char len; - - if (fread(&len,1,1,fp) == 0) return -1; - switch(len) { - case 255: *val = R_NegInf; return 0; - case 254: *val = R_PosInf; return 0; - case 253: *val = R_Nan; return 0; - default: - if (fread(buf,len,1,fp) == 0) return -1; - buf[len] = '\0'; - sscanf(buf, "%lg", val); - return 0; - } - } - /* Load a Redis object of the specified type from the specified file. * On success a newly allocated object is returned, otherwise NULL. */ - robj *rdbLoadObject(int type, FILE *fp) { + robj *rdbLoadObject(int rdbtype, rio *rdb) { robj *o, *ele, *dec; size_t len; unsigned int i; - redisLog(REDIS_DEBUG,"LOADING OBJECT %d (at %d)\n",type,ftell(fp)); - if (type == REDIS_STRING) { + redisLog(REDIS_DEBUG,"LOADING OBJECT %d (at %d)\n",rdbtype,rdb->tell(rdb)); + if (rdbtype == REDIS_RDB_TYPE_STRING) { /* Read string value */ - if ((o = rdbLoadEncodedStringObject(fp)) == NULL) return NULL; + if ((o = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL; o = tryObjectEncoding(o); - } else if (type == REDIS_LIST) { + } else if (rdbtype == REDIS_RDB_TYPE_LIST) { /* Read list value */ - if ((len = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL; + if ((len = rdbLoadLen(rdb,NULL)) == REDIS_RDB_LENERR) return NULL; /* Use a real list when there are too many entries */ if (len > server.list_max_ziplist_entries) { @@@ -687,7 -727,7 +719,7 @@@ /* Load every single element of the list */ while(len--) { - if ((ele = rdbLoadEncodedStringObject(fp)) == NULL) return NULL; + if ((ele = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL; /* If we are using a ziplist and the value is too big, convert * the object to a real list. */ @@@ -706,9 -746,9 +738,9 @@@ listAddNodeTail(o->ptr,ele); } } - } else if (type == REDIS_SET) { + } else if (rdbtype == REDIS_RDB_TYPE_SET) { /* Read list/set value */ - if ((len = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL; + if ((len = rdbLoadLen(rdb,NULL)) == REDIS_RDB_LENERR) return NULL; /* Use a regular set when there are too many entries. */ if (len > server.set_max_intset_entries) { @@@ -724,7 -764,7 +756,7 @@@ /* Load every single element of the list/set */ for (i = 0; i < len; i++) { long long llval; - if ((ele = rdbLoadEncodedStringObject(fp)) == NULL) return NULL; + if ((ele = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL; ele = tryObjectEncoding(ele); if (o->encoding == REDIS_ENCODING_INTSET) { @@@ -745,13 -785,13 +777,13 @@@ decrRefCount(ele); } } - } else if (type == REDIS_ZSET) { + } else if (rdbtype == REDIS_RDB_TYPE_ZSET) { /* Read list/set value */ size_t zsetlen; size_t maxelelen = 0; zset *zs; - if ((zsetlen = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL; + if ((zsetlen = rdbLoadLen(rdb,NULL)) == REDIS_RDB_LENERR) return NULL; o = createZsetObject(); zs = o->ptr; @@@ -761,9 -801,9 +793,9 @@@ double score; zskiplistNode *znode; - if ((ele = rdbLoadEncodedStringObject(fp)) == NULL) return NULL; + if ((ele = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL; ele = tryObjectEncoding(ele); - if (rdbLoadDoubleValue(fp,&score) == -1) return NULL; + if (rdbLoadDoubleValue(rdb,&score) == -1) return NULL; /* Don't care about integer-encoded strings. */ if (ele->encoding == REDIS_ENCODING_RAW && @@@ -779,10 -819,10 +811,10 @@@ if (zsetLength(o) <= server.zset_max_ziplist_entries && maxelelen <= server.zset_max_ziplist_value) zsetConvert(o,REDIS_ENCODING_ZIPLIST); - } else if (type == REDIS_HASH) { + } else if (rdbtype == REDIS_RDB_TYPE_HASH) { size_t hashlen; - if ((hashlen = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL; + if ((hashlen = rdbLoadLen(rdb,NULL)) == REDIS_RDB_LENERR) return NULL; o = createHashObject(); /* Too many entries? Use an hash table. */ if (hashlen > server.hash_max_zipmap_entries) @@@ -792,8 -832,8 +824,8 @@@ while(hashlen--) { robj *key, *val; - if ((key = rdbLoadEncodedStringObject(fp)) == NULL) return NULL; - if ((val = rdbLoadEncodedStringObject(fp)) == NULL) return NULL; + if ((key = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL; + if ((val = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL; /* If we are using a zipmap and there are too big values * the object is converted to real hash table encoding. */ if (o->encoding != REDIS_ENCODING_HT && @@@ -825,12 -865,12 +857,12 @@@ dictAdd((dict*)o->ptr,key,val); } } - } else if (type == REDIS_HASH_ZIPMAP || - type == REDIS_LIST_ZIPLIST || - type == REDIS_SET_INTSET || - type == REDIS_ZSET_ZIPLIST) + } else if (rdbtype == REDIS_RDB_TYPE_HASH_ZIPMAP || + rdbtype == REDIS_RDB_TYPE_LIST_ZIPLIST || + rdbtype == REDIS_RDB_TYPE_SET_INTSET || + rdbtype == REDIS_RDB_TYPE_ZSET_ZIPLIST) { - robj *aux = rdbLoadStringObject(fp); + robj *aux = rdbLoadStringObject(rdb); if (aux == NULL) return NULL; o = createObject(REDIS_STRING,NULL); /* string is just placeholder */ @@@ -844,26 -884,26 +876,26 @@@ * type. Note that we only check the length and not max element * size as this is an O(N) scan. Eventually everything will get * converted. */ - switch(type) { - case REDIS_HASH_ZIPMAP: + switch(rdbtype) { + case REDIS_RDB_TYPE_HASH_ZIPMAP: o->type = REDIS_HASH; o->encoding = REDIS_ENCODING_ZIPMAP; if (zipmapLen(o->ptr) > server.hash_max_zipmap_entries) convertToRealHash(o); break; - case REDIS_LIST_ZIPLIST: + case REDIS_RDB_TYPE_LIST_ZIPLIST: o->type = REDIS_LIST; o->encoding = REDIS_ENCODING_ZIPLIST; if (ziplistLen(o->ptr) > server.list_max_ziplist_entries) listTypeConvert(o,REDIS_ENCODING_LINKEDLIST); break; - case REDIS_SET_INTSET: + case REDIS_RDB_TYPE_SET_INTSET: o->type = REDIS_SET; o->encoding = REDIS_ENCODING_INTSET; if (intsetLen(o->ptr) > server.set_max_intset_entries) setTypeConvert(o,REDIS_ENCODING_HT); break; - case REDIS_ZSET_ZIPLIST: + case REDIS_RDB_TYPE_ZSET_ZIPLIST: o->type = REDIS_ZSET; o->encoding = REDIS_ENCODING_ZIPLIST; if (zsetLength(o) > server.zset_max_ziplist_entries) @@@ -905,17 -945,19 +937,19 @@@ void stopLoading(void) } int rdbLoad(char *filename) { - FILE *fp; uint32_t dbid; - int type, retval, rdbver; + int type, rdbver; redisDb *db = server.db+0; char buf[1024]; time_t expiretime, now = time(NULL); long loops = 0; + FILE *fp; + rio rdb; fp = fopen(filename,"r"); if (!fp) return REDIS_ERR; - if (fread(buf,9,1,fp) == 0) goto eoferr; + rdb = rioInitWithFile(fp); + if (rioRead(&rdb,buf,9) == 0) goto eoferr; buf[9] = '\0'; if (memcmp(buf,"REDIS",5) != 0) { fclose(fp); @@@ -936,21 -978,24 +970,24 @@@ /* Serve the clients from time to time */ if (!(loops++ % 1000)) { - loadingProgress(ftello(fp)); + loadingProgress(rdb.tell(&rdb)); aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT); } /* Read type. */ - if ((type = rdbLoadType(fp)) == -1) goto eoferr; - if (type == REDIS_EXPIRETIME) { - if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr; - /* We read the time so we need to read the object type again */ - if ((type = rdbLoadType(fp)) == -1) goto eoferr; + if ((type = rdbLoadType(&rdb)) == -1) goto eoferr; + if (type == REDIS_RDB_OPCODE_EXPIRETIME) { + if ((expiretime = rdbLoadTime(&rdb)) == -1) goto eoferr; + /* We read the time so we need to read the object type again. */ + if ((type = rdbLoadType(&rdb)) == -1) goto eoferr; } - if (type == REDIS_EOF) break; + + if (type == REDIS_RDB_OPCODE_EOF) + break; + /* Handle SELECT DB opcode as a special case */ - if (type == REDIS_SELECTDB) { - if ((dbid = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) + if (type == REDIS_RDB_OPCODE_SELECTDB) { + if ((dbid = rdbLoadLen(&rdb,NULL)) == REDIS_RDB_LENERR) goto eoferr; if (dbid >= (unsigned)server.dbnum) { redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum); @@@ -960,9 -1005,9 +997,9 @@@ continue; } /* Read key */ - if ((key = rdbLoadStringObject(fp)) == NULL) goto eoferr; + if ((key = rdbLoadStringObject(&rdb)) == NULL) goto eoferr; /* Read value */ - if ((val = rdbLoadObject(type,fp)) == NULL) goto eoferr; + if ((val = rdbLoadObject(type,&rdb)) == NULL) goto eoferr; /* Check if the key already expired */ if (expiretime != -1 && expiretime < now) { decrRefCount(key); @@@ -970,8 -1015,11 +1007,8 @@@ continue; } /* Add the new object in the hash table */ - retval = dbAdd(db,key,val); - if (retval == REDIS_ERR) { - redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", key->ptr); - exit(1); - } + dbAdd(db,key,val); + /* Set the expire time if needed */ if (expiretime != -1) setExpire(db,key,expiretime); @@@ -1002,13 -1050,15 +1039,13 @@@ void backgroundSaveDoneHandler(int exit rdbRemoveTempFile(server.bgsavechildpid); } server.bgsavechildpid = -1; - server.bgsavethread = (pthread_t) -1; - server.bgsavethread_state = REDIS_BGSAVE_THREAD_UNACTIVE; /* Possibly there are slaves waiting for a BGSAVE in order to be served * (the first stage of SYNC is a bulk transfer of dump.rdb) */ updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR); } void saveCommand(redisClient *c) { - if (server.bgsavechildpid != -1 || server.bgsavethread != (pthread_t)-1) { + if (server.bgsavechildpid != -1) { addReplyError(c,"Background save already in progress"); return; } @@@ -1020,11 -1070,11 +1057,11 @@@ } void bgsaveCommand(redisClient *c) { - if (server.bgsavechildpid != -1 || server.bgsavethread != (pthread_t)-1) { + if (server.bgsavechildpid != -1) { addReplyError(c,"Background save already in progress"); - return; - } - if (rdbSaveBackground(server.dbfilename) == REDIS_OK) { + } else if (server.bgrewritechildpid != -1) { + addReplyError(c,"Can't BGSAVE while AOF log rewriting is in progress"); + } else if (rdbSaveBackground(server.dbfilename) == REDIS_OK) { addReplyStatus(c,"Background saving started"); } else { addReply(c,shared.err); diff --combined src/redis.h index e754918d,5af2da8e..e9a31b8c --- a/src/redis.h +++ b/src/redis.h @@@ -19,19 -19,18 +19,19 @@@ #include #include #include +#include -#include "ae.h" /* Event driven programming library */ -#include "sds.h" /* Dynamic safe strings */ -#include "dict.h" /* Hash tables */ -#include "adlist.h" /* Linked lists */ +#include "ae.h" /* Event driven programming library */ +#include "sds.h" /* Dynamic safe strings */ +#include "dict.h" /* Hash tables */ +#include "adlist.h" /* Linked lists */ #include "zmalloc.h" /* total memory usage aware version of malloc/free */ -#include "anet.h" /* Networking the easy way */ -#include "zipmap.h" /* Compact string -> string data structure */ +#include "anet.h" /* Networking the easy way */ +#include "zipmap.h" /* Compact string -> string data structure */ #include "ziplist.h" /* Compact list data structure */ -#include "intset.h" /* Compact integer set structure */ -#include "version.h" -#include "util.h" +#include "intset.h" /* Compact integer set structure */ +#include "version.h" /* Version macro */ +#include "util.h" /* Misc functions useful in many places */ /* Error codes */ #define REDIS_OK 0 @@@ -42,6 -41,7 +42,6 @@@ #define REDIS_MAXIDLETIME (60*5) /* default client timeout */ #define REDIS_IOBUF_LEN 1024 #define REDIS_LOADBUF_LEN 1024 -#define REDIS_STATIC_ARGS 8 #define REDIS_DEFAULT_DBNUM 16 #define REDIS_CONFIGLINE_MAX 1024 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */ @@@ -51,10 -51,6 +51,10 @@@ #define REDIS_SHARED_INTEGERS 10000 #define REDIS_REPLY_CHUNK_BYTES (5*1500) /* 5 TCP packets with default MTU */ #define REDIS_MAX_LOGMSG_LEN 1024 /* Default maximum length of syslog messages */ +#define REDIS_AUTO_AOFREWRITE_PERC 100 +#define REDIS_AUTO_AOFREWRITE_MIN_SIZE (1024*1024) +#define REDIS_SLOWLOG_LOG_SLOWER_THAN 10000 +#define REDIS_SLOWLOG_MAX_LEN 64 /* Hash table parameters */ #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */ @@@ -77,12 -73,6 +77,6 @@@ #define REDIS_HASH 4 #define REDIS_VMPOINTER 8 - /* Object types only used for persistence in .rdb files */ - #define REDIS_HASH_ZIPMAP 9 - #define REDIS_LIST_ZIPLIST 10 - #define REDIS_SET_INTSET 11 - #define REDIS_ZSET_ZIPLIST 12 - /* Objects encoding. Some kind of objects like Strings and Hashes can be * internally represented in multiple ways. The 'encoding' field of the object * is set to one of this fields for this object. */ @@@ -95,37 -85,18 +89,37 @@@ #define REDIS_ENCODING_INTSET 6 /* Encoded as intset */ #define REDIS_ENCODING_SKIPLIST 7 /* Encoded as skiplist */ -/* Scheduled IO opeations flags. */ -#define REDIS_IO_LOAD 1 -#define REDIS_IO_SAVE 2 -#define REDIS_IO_LOADINPROG 4 -#define REDIS_IO_SAVEINPROG 8 +/* Object types only used for dumping to disk */ +#define REDIS_EXPIRETIME 253 +#define REDIS_SELECTDB 254 +#define REDIS_EOF 255 -/* Generic IO flags */ -#define REDIS_IO_ONLYLOADS 1 -#define REDIS_IO_ASAP 2 - -#define REDIS_MAX_COMPLETED_JOBS_PROCESSED 1 -#define REDIS_THREAD_STACK_SIZE (1024*1024*4) +/* Defines related to the dump file format. To store 32 bits lengths for short + * keys requires a lot of space, so we check the most significant 2 bits of + * the first byte to interpreter the length: + * + * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte + * 01|000000 00000000 => 01, the len is 14 byes, 6 bits + 8 bits of next byte + * 10|000000 [32 bit integer] => if it's 01, a full 32 bit len will follow + * 11|000000 this means: specially encoded object will follow. The six bits + * number specify the kind of object that follows. + * See the REDIS_RDB_ENC_* defines. + * + * Lenghts up to 63 are stored using a single byte, most DB keys, and may + * values, will fit inside. */ +#define REDIS_RDB_6BITLEN 0 +#define REDIS_RDB_14BITLEN 1 +#define REDIS_RDB_32BITLEN 2 +#define REDIS_RDB_ENCVAL 3 +#define REDIS_RDB_LENERR UINT_MAX + +/* When a length of a string object stored on disk has the first two bits + * set, the remaining two bits specify a special encoding for the object + * accordingly to the following defines: */ +#define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */ +#define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */ +#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */ +#define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */ /* Client flags */ #define REDIS_SLAVE 1 /* This client is a slave server */ @@@ -133,25 -104,21 +127,25 @@@ #define REDIS_MONITOR 4 /* This client is a slave monitor, see MONITOR */ #define REDIS_MULTI 8 /* This client is in a MULTI context */ #define REDIS_BLOCKED 16 /* The client is waiting in a blocking operation */ -#define REDIS_IO_WAIT 32 /* The client is waiting for Virtual Memory I/O */ #define REDIS_DIRTY_CAS 64 /* Watched keys modified. EXEC will fail. */ #define REDIS_CLOSE_AFTER_REPLY 128 /* Close after writing entire reply. */ #define REDIS_UNBLOCKED 256 /* This client was unblocked and is stored in server.unblocked_clients */ +#define REDIS_LUA_CLIENT 512 /* This is a non connected client used by Lua */ /* Client request types */ #define REDIS_REQ_INLINE 1 #define REDIS_REQ_MULTIBULK 2 /* Slave replication state - slave side */ -#define REDIS_REPL_NONE 0 /* No active replication */ -#define REDIS_REPL_CONNECT 1 /* Must connect to master */ -#define REDIS_REPL_TRANSFER 2 /* Receiving .rdb from master */ -#define REDIS_REPL_CONNECTED 3 /* Connected to master */ +#define REDIS_REPL_NONE 0 /* No active replication */ +#define REDIS_REPL_CONNECT 1 /* Must connect to master */ +#define REDIS_REPL_CONNECTING 2 /* Connecting to master */ +#define REDIS_REPL_TRANSFER 3 /* Receiving .rdb from master */ +#define REDIS_REPL_CONNECTED 4 /* Connected to master */ + +/* Synchronous read timeout - slave side */ +#define REDIS_REPL_SYNCIO_TIMEOUT 5 /* Slave replication state - from the point of view of master * Note that in SEND_BULK and ONLINE state the slave receives new updates @@@ -212,8 -179,11 +206,8 @@@ #define REDIS_MAXMEMORY_ALLKEYS_RANDOM 4 #define REDIS_MAXMEMORY_NO_EVICTION 5 -/* Diskstore background saving thread states */ -#define REDIS_BGSAVE_THREAD_UNACTIVE 0 -#define REDIS_BGSAVE_THREAD_ACTIVE 1 -#define REDIS_BGSAVE_THREAD_DONE_OK 2 -#define REDIS_BGSAVE_THREAD_DONE_ERR 3 +/* Scripting */ +#define REDIS_LUA_TIME_LIMIT 60000 /* milliseconds */ /* We can print the stacktrace, so our assert is defined this way: */ #define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e,__FILE__,__LINE__),_exit(1))) @@@ -279,6 -249,9 +273,6 @@@ typedef struct redisDb dict *dict; /* The keyspace for this DB */ dict *expires; /* Timeout of keys with a timeout set */ dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */ - dict *io_keys; /* Keys with clients waiting for DS I/O */ - dict *io_negcache; /* Negative caching for disk store */ - dict *io_queued; /* Queued IO operations hash table */ dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */ int id; } redisDb; @@@ -314,7 -287,6 +308,7 @@@ typedef struct redisClient sds querybuf; int argc; robj **argv; + struct redisCommand *cmd; int reqtype; int multibulklen; /* number of multi bulk arguments left to read */ long bulklen; /* length of bulk argument in multi bulk request */ @@@ -350,7 -322,7 +344,7 @@@ struct sharedObjectsStruct robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *cnegone, *pong, *space, *colon, *nullbulk, *nullmultibulk, *queued, *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr, - *outofrangeerr, *loadingerr, *plus, + *outofrangeerr, *noscripterr, *loadingerr, *plus, *select0, *select1, *select2, *select3, *select4, *select5, *select6, *select7, *select8, *select9, *messagebulk, *pmessagebulk, *subscribebulk, *unsubscribebulk, *mbulk3, @@@ -501,6 -473,7 +495,6 @@@ typedef struct struct redisServer { /* General */ - pthread_t mainthread; redisDb *db; dict *commands; /* Command table hahs table */ aeEventLoop *el; @@@ -532,11 -505,6 +526,11 @@@ long long stat_keyspace_hits; /* number of successful lookups of keys */ long long stat_keyspace_misses; /* number of failed lookups of keys */ size_t stat_peak_memory; /* max used memory record */ + long long stat_fork_time; /* time needed to perform latets fork() */ + list *slowlog; + long long slowlog_entry_id; + long long slowlog_log_slower_than; + unsigned long slowlog_max_len; /* Configuration */ int verbosity; int maxidletime; @@@ -545,11 -513,6 +539,11 @@@ int appendonly; int appendfsync; int no_appendfsync_on_rewrite; + int auto_aofrewrite_perc; /* Rewrite AOF if % growth is > M and... */ + off_t auto_aofrewrite_min_size; /* the AOF file is at least N bytes. */ + off_t auto_aofrewrite_base_size;/* AOF size on latest startup or rewrite. */ + off_t appendonly_current_size; /* AOF current size. */ + int aofrewrite_scheduled; /* Rewrite once BGSAVE terminates. */ int shutdown_asap; int activerehashing; char *requirepass; @@@ -559,10 -522,12 +553,10 @@@ time_t lastfsync; int appendfd; int appendseldb; + time_t aof_flush_postponed_start; char *pidfile; pid_t bgsavechildpid; pid_t bgrewritechildpid; - int bgsavethread_state; - pthread_mutex_t bgsavethread_mutex; - pthread_t bgsavethread; sds bgrewritebuf; /* buffer taken by parent during oppend only rewrite */ sds aofbuf; /* AOF buffer, written before entering the event loop */ struct saveparam *saveparams; @@@ -582,7 -547,6 +576,7 @@@ char *masterhost; int masterport; redisClient *master; /* client that is master for this slave */ + int repl_syncio_timeout; /* timeout for synchronous I/O calls */ int replstate; /* replication status if the instance is a slave */ off_t repl_transfer_left; /* bytes left reading .rdb */ int repl_transfer_s; /* slave -> master SYNC socket */ @@@ -590,7 -554,6 +584,7 @@@ char *repl_transfer_tmpfile; /* slave-> master SYNC temp file name */ time_t repl_transfer_lastio; /* unix time of the latest read, for timeout */ int repl_serve_stale_data; /* Serve stale data when link is down? */ + time_t repl_down_since; /* unix time at which link with master went down */ /* Limits */ unsigned int maxclients; unsigned long long maxmemory; @@@ -598,12 -561,19 +592,12 @@@ int maxmemory_samples; /* Blocked clients */ unsigned int bpop_blocked_clients; - unsigned int cache_blocked_clients; list *unblocked_clients; /* list of clients to unblock before next loop */ - list *cache_io_queue; /* IO operations queue */ - int cache_flush_delay; /* seconds to wait before flushing keys */ /* Sort parameters - qsort_r() is only available under BSD so we * have to take this state global, in order to pass it to sortCompare() */ int sort_desc; int sort_alpha; int sort_bypattern; - /* Virtual memory configuration */ - int ds_enabled; /* backend disk in redis.conf */ - char *ds_path; /* location of the disk store on disk */ - unsigned long long cache_max_memory; /* Zip structure config */ size_t hash_max_zipmap_entries; size_t hash_max_zipmap_value; @@@ -613,6 -583,30 +607,6 @@@ size_t zset_max_ziplist_entries; size_t zset_max_ziplist_value; time_t unixtime; /* Unix time sampled every second. */ - /* Virtual memory I/O threads stuff */ - /* An I/O thread process an element taken from the io_jobs queue and - * put the result of the operation in the io_done list. While the - * job is being processed, it's put on io_processing queue. */ - list *io_newjobs; /* List of VM I/O jobs yet to be processed */ - list *io_processing; /* List of VM I/O jobs being processed */ - list *io_processed; /* List of VM I/O jobs already processed */ - list *io_ready_clients; /* Clients ready to be unblocked. All keys loaded */ - pthread_mutex_t io_mutex; /* lock to access io_jobs/io_done/io_thread_job */ - pthread_cond_t io_condvar; /* I/O threads conditional variable */ - pthread_attr_t io_threads_attr; /* attributes for threads creation */ - int io_active_threads; /* Number of running I/O threads */ - int vm_max_threads; /* Max number of I/O threads running at the same time */ - /* Our main thread is blocked on the event loop, locking for sockets ready - * to be read or written, so when a threaded I/O operation is ready to be - * processed by the main thread, the I/O thread will use a unix pipe to - * awake the main thread. The followings are the two pipe FDs. */ - int io_ready_pipe_read; - int io_ready_pipe_write; - /* Virtual memory stats */ - unsigned long long vm_stats_used_pages; - unsigned long long vm_stats_swapped_objects; - unsigned long long vm_stats_swapouts; - unsigned long long vm_stats_swapins; /* Pubsub */ dict *pubsub_channels; /* Map channels to list of subscribed clients */ list *pubsub_patterns; /* A list of pubsub_patterns */ @@@ -622,12 -616,6 +616,12 @@@ /* Cluster */ int cluster_enabled; clusterState cluster; + /* Scripting */ + lua_State *lua; /* The Lua interpreter. We use just one for all clients */ + redisClient *lua_client; /* The "fake client" to query Redis from Lua */ + dict *lua_scripts; /* A dictionary of SHA1 -> Lua scripts */ + long long lua_time_limit; + long long lua_time_start; }; typedef struct pubsubPattern { @@@ -643,7 -631,7 +637,7 @@@ struct redisCommand int arity; int flags; /* Use a function to determine keys arguments in a command line. - * Used both for diskstore preloading and Redis Cluster. */ + * Used for Redis Cluster redirect. */ redisGetKeysProc *getkeys_proc; /* What keys should be loaded in background when calling this command? */ int firstkey; /* The first argument that's a key (0 = no keys) */ @@@ -670,6 -658,27 +664,6 @@@ typedef struct _redisSortOperation robj *pattern; } redisSortOperation; -/* DIsk store threaded I/O request message */ -#define REDIS_IOJOB_LOAD 0 -#define REDIS_IOJOB_SAVE 1 - -typedef struct iojob { - int type; /* Request type, REDIS_IOJOB_* */ - redisDb *db;/* Redis database */ - robj *key; /* This I/O request is about this key */ - robj *val; /* the value to swap for REDIS_IOJOB_SAVE, otherwise this - * field is populated by the I/O thread for REDIS_IOJOB_LOAD. */ - time_t expire; /* Expire time for this key on REDIS_IOJOB_LOAD */ -} iojob; - -/* IO operations scheduled -- check dscache.c for more info */ -typedef struct ioop { - int type; - redisDb *db; - robj *key; - time_t ctime; /* This is the creation time of the entry. */ -} ioop; - /* Structure to hold list iteration abstraction. */ typedef struct { robj *subject; @@@ -720,7 -729,6 +714,7 @@@ extern struct sharedObjectsStruct share extern dictType setDictType; extern dictType zsetDictType; extern dictType clusterNodesDictType; +extern dictType dbDictType; extern double R_Zero, R_PosInf, R_NegInf, R_Nan; dictType hashDictType; @@@ -760,8 -768,6 +754,8 @@@ void addReplyMultiBulkLen(redisClient * void *dupClientReplyValue(void *o); void getClientsMaxBuffers(unsigned long *longest_output_list, unsigned long *biggest_input_buffer); +void rewriteClientCommandVector(redisClient *c, int argc, ...); +void rewriteClientCommandArgument(redisClient *c, int i, robj *newval); #ifdef __GNUC__ void addReplyErrorFormat(redisClient *c, const char *fmt, ...) @@@ -794,14 -800,13 +788,14 @@@ void popGenericCommand(redisClient *c, void unwatchAllKeys(redisClient *c); void initClientMultiState(redisClient *c); void freeClientMultiState(redisClient *c); -void queueMultiCommand(redisClient *c, struct redisCommand *cmd); +void queueMultiCommand(redisClient *c); void touchWatchedKey(redisDb *db, robj *key); void touchWatchedKeysOnFlush(int dbid); /* Redis object implementation */ void decrRefCount(void *o); void incrRefCount(robj *o); +robj *resetRefCount(robj *obj); void freeStringObject(robj *o); void freeListObject(robj *o); void freeSetObject(robj *o); @@@ -836,15 -841,11 +830,10 @@@ unsigned long estimateObjectIdleTime(ro int syncWrite(int fd, char *ptr, ssize_t size, int timeout); int syncRead(int fd, char *ptr, ssize_t size, int timeout); int syncReadLine(int fd, char *ptr, ssize_t size, int timeout); - int fwriteBulkString(FILE *fp, char *s, unsigned long len); - int fwriteBulkDouble(FILE *fp, double d); - int fwriteBulkLongLong(FILE *fp, long long l); - int fwriteBulkObject(FILE *fp, robj *obj); - int fwriteBulkCount(FILE *fp, char prefix, int count); /* Replication */ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc); void replicationFeedMonitors(list *monitors, int dictid, robj **argv, int argc); -int syncWithMaster(void); void updateSlavesWaitingBgsave(int bgsaveerr); void replicationCron(void); @@@ -854,24 -855,10 +843,10 @@@ void loadingProgress(off_t pos) void stopLoading(void); /* RDB persistence */ - int rdbLoad(char *filename); - int rdbSaveBackground(char *filename); - void rdbRemoveTempFile(pid_t childpid); - int rdbSave(char *filename); - int rdbSaveObject(FILE *fp, robj *o); - off_t rdbSavedObjectLen(robj *o); - off_t rdbSavedObjectPages(robj *o); - robj *rdbLoadObject(int type, FILE *fp); - void backgroundSaveDoneHandler(int exitcode, int bysignal); - int rdbSaveKeyValuePair(FILE *fp, robj *key, robj *val, time_t expireitme, time_t now); - int rdbLoadType(FILE *fp); - time_t rdbLoadTime(FILE *fp); - robj *rdbLoadStringObject(FILE *fp); - int rdbSaveType(FILE *fp, unsigned char type); - int rdbSaveLen(FILE *fp, uint32_t len); + #include "rdb.h" /* AOF persistence */ -void flushAppendOnlyFile(void); +void flushAppendOnlyFile(int force); void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc); void aofRemoveTempFile(pid_t childpid); int rewriteAppendOnlyFileBackground(void); @@@ -906,10 -893,9 +881,10 @@@ int processCommand(redisClient *c) void setupSignalHandlers(void); struct redisCommand *lookupCommand(sds name); struct redisCommand *lookupCommandByCString(char *s); -void call(redisClient *c, struct redisCommand *cmd); +void call(redisClient *c); int prepareForShutdown(); void redisLog(int level, const char *fmt, ...); +void redisLogRaw(int level, const char *msg); void usage(); void updateDictResizePolicy(void); int htNeedsResize(dict *dict); @@@ -917,6 -903,40 +892,6 @@@ void oom(const char *msg) void populateCommandTable(void); void resetCommandTableStats(void); -/* Disk store */ -int dsOpen(void); -int dsClose(void); -int dsSet(redisDb *db, robj *key, robj *val, time_t expire); -robj *dsGet(redisDb *db, robj *key, time_t *expire); -int dsDel(redisDb *db, robj *key); -int dsExists(redisDb *db, robj *key); -void dsFlushDb(int dbid); -int dsRdbSaveBackground(char *filename); -int dsRdbSave(char *filename); - -/* Disk Store Cache */ -void dsInit(void); -void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, int mask); -void lockThreadedIO(void); -void unlockThreadedIO(void); -void freeIOJob(iojob *j); -void queueIOJob(iojob *j); -void waitEmptyIOJobsQueue(void); -void processAllPendingIOJobs(void); -int blockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd); -int dontWaitForSwappedKey(redisClient *c, robj *key); -void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key); -int cacheFreeOneEntry(void); -void cacheScheduleIOAddFlag(redisDb *db, robj *key, long flag); -void cacheScheduleIODelFlag(redisDb *db, robj *key, long flag); -int cacheScheduleIOGetFlags(redisDb *db, robj *key); -void cacheScheduleIO(redisDb *db, robj *key, int type); -void cacheCron(void); -int cacheKeyMayExist(redisDb *db, robj *key); -void cacheSetKeyMayExist(redisDb *db, robj *key); -void cacheSetKeyDoesNotExist(redisDb *db, robj *key); -void cacheForcePointInTime(void); - /* Set data type */ robj *setTypeCreate(robj *value); int setTypeAdd(robj *subject, robj *value); @@@ -969,9 -989,8 +944,9 @@@ robj *lookupKeyRead(redisDb *db, robj * robj *lookupKeyWrite(redisDb *db, robj *key); robj *lookupKeyReadOrReply(redisClient *c, robj *key, robj *reply); robj *lookupKeyWriteOrReply(redisClient *c, robj *key, robj *reply); -int dbAdd(redisDb *db, robj *key, robj *val); -int dbReplace(redisDb *db, robj *key, robj *val); +void dbAdd(redisDb *db, robj *key, robj *val); +void dbOverwrite(redisDb *db, robj *key, robj *val); +void setKey(redisDb *db, robj *key, robj *val); int dbExists(redisDb *db, robj *key); robj *dbRandomKey(redisDb *db); int dbDelete(redisDb *db, robj *key); @@@ -999,9 -1018,6 +974,9 @@@ int clusterAddNode(clusterNode *node) void clusterCron(void); clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *ask); +/* Scripting */ +void scriptingInit(void); + /* Git SHA1 */ char *redisGitSHA1(void); char *redisGitDirty(void); @@@ -1130,8 -1146,6 +1105,8 @@@ void migrateCommand(redisClient *c) void dumpCommand(redisClient *c); void objectCommand(redisClient *c); void clientCommand(redisClient *c); +void evalCommand(redisClient *c); +void evalShaCommand(redisClient *c); #if defined(__GNUC__) void *calloc(size_t count, size_t size) __attribute__ ((deprecated));