% make 32bit
-After you build Redis is a good idea to test it, using:
+After you build Redis it is a good idea to test it (which requires Tcl), using:
% make test
the variable number of arguments represent values, and there is no conflict
with the return value of the command.
+2.6
+===
+
+* Everything under the "SCRIPTING" section.
+* Float increments (INCRBYFLOAT).
+* Fix BRPOPLPUSH + vararg LPUSH semantics.
+* AOF everysec fsync in background (either the aof-bg branch or something else).
+
CLUSTER
=======
* MULTI/EXEC/...: should we do more than simply ignoring it?
* Prevent Lua from calling itself with redis("eval",...)
* SCRIPT FLUSH or alike to start a fresh interpreter?
-* http://redis.io/topics/sponsors
+* Check better the replication handling.
+* Prevent execution of writes if random commands are used.
APPEND ONLY FILE
================
* Redis big lists as linked lists of small ziplists?
Possibly a simple heuristic that join near nodes when some node gets smaller than the low_level, and split it into two if gets bigger than high_level.
-REPORTING
-=========
-
-* Better INFO output with sections.
-
RANDOM
======
+* Server should abort when getcwd() fails if there is some kind of persistence configured. Check this in the cron loop.
* Clients should be closed as far as the output buffer list is bigger than a given number of elements (configurable in redis.conf)
* Should the redis default configuration, and the default redis.conf, just bind 127.0.0.1?
KNOWN BUGS
==========
-* What happens in the following scenario:
- 1) We are reading an AOF file.
- 2) SETEX FOO 5 BAR
- 3) APPEND FOO ZAP
- What happens if between 1 and 2 for some reason (system under huge load
- or alike) too many time passes? We should prevent expires while the
- AOF is loading.
* #519: Slave may have expired keys that were never read in the master (so a DEL
is not sent in the replication channel) but are already expired since
a lot of time. Maybe after a given delay that is undoubltly greater than
the replication link latency we should expire this key on the slave on
access?
-
-DISKSTORE TODO
-==============
-
-* Fix FLUSHALL/FLUSHDB: the queue of pending reads/writes should be handled.
-* Check that 00/00 and ff/ff exist at startup, otherwise exit with error.
-* Implement sync flush option, where data is written synchronously on disk when a command is executed.
-* Implement MULTI/EXEC as transaction abstract API to diskstore.c, with transaction_start, transaction_end, and a journal to recover.
-* Stop BGSAVE thread on shutdown and any other condition where the child is killed during normal bgsave.
-* Fix RANDOMKEY to really do something interesting
-* Fix DBSIZE to really do something interesting
-* Add a DEBUG command to check if an entry is or not in memory currently
-* dscache.c near 236, kobj = createStringObject... we could use static obj.
QUIET_LINK = @printf ' %b %b\n' $(LINKCOLOR)LINK$(ENDCOLOR) $(BINCOLOR)$@$(ENDCOLOR);
endif
-OBJ = adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endian.o slowlog.o scripting.o
+OBJ = adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endian.o slowlog.o scripting.o bio.o
BENCHOBJ = ae.o anet.o redis-benchmark.o sds.o adlist.o zmalloc.o
CLIOBJ = anet.o sds.o adlist.o redis-cli.o zmalloc.o release.o
CHECKDUMPOBJ = redis-check-dump.o lzf_c.o lzf_d.o
ae_select.o: ae_select.c
anet.o: anet.c fmacros.h anet.h
aof.o: aof.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
- zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h slowlog.h
+ zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
+bio.o: bio.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
+ zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h bio.h
cluster.o: cluster.c redis.h fmacros.h config.h ae.h sds.h dict.h \
- adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h \
- slowlog.h
+ adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
config.o: config.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
- zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h slowlog.h
+ zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
crc16.o: crc16.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
- zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h slowlog.h
+ zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
db.o: db.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
- zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h slowlog.h
+ zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
debug.o: debug.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
- zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h slowlog.h \
- sha1.h
+ zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h sha1.h
dict.o: dict.c fmacros.h dict.h zmalloc.h
endian.o: endian.c
intset.o: intset.c intset.h zmalloc.h endian.h
lzf_c.o: lzf_c.c lzfP.h
lzf_d.o: lzf_d.c lzfP.h
multi.o: multi.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
- zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h slowlog.h
+ zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
networking.o: networking.c redis.h fmacros.h config.h ae.h sds.h dict.h \
- adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h \
- slowlog.h
+ adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
object.o: object.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
- zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h slowlog.h
+ zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
pqsort.o: pqsort.c
pubsub.o: pubsub.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
- zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h slowlog.h
+ zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
rdb.o: rdb.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
- zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h slowlog.h \
- lzf.h
+ zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h lzf.h
redis-benchmark.o: redis-benchmark.c fmacros.h ae.h \
../deps/hiredis/hiredis.h sds.h adlist.h zmalloc.h
redis-check-aof.o: redis-check-aof.c fmacros.h config.h
sds.h zmalloc.h ../deps/linenoise/linenoise.h help.h
redis.o: redis.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h slowlog.h \
- asciilogo.h
+ bio.h asciilogo.h
release.o: release.c release.h
replication.o: replication.c redis.h fmacros.h config.h ae.h sds.h dict.h \
+ adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
+scripting.o: scripting.c redis.h fmacros.h config.h ae.h sds.h dict.h \
adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h \
- slowlog.h
+ sha1.h
sds.o: sds.c sds.h zmalloc.h
sha1.o: sha1.c sha1.h config.h
slowlog.o: slowlog.c redis.h fmacros.h config.h ae.h sds.h dict.h \
adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h \
slowlog.h
sort.o: sort.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
- zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h slowlog.h \
- pqsort.h
+ zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h pqsort.h
syncio.o: syncio.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
- zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h slowlog.h
+ zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
t_hash.o: t_hash.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
- zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h slowlog.h
+ zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
t_list.o: t_list.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
- zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h slowlog.h
+ zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
t_set.o: t_set.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
- zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h slowlog.h
+ zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
t_string.o: t_string.c redis.h fmacros.h config.h ae.h sds.h dict.h \
- adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h \
- slowlog.h
+ adlist.h zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
t_zset.o: t_zset.c redis.h fmacros.h config.h ae.h sds.h dict.h adlist.h \
- zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h slowlog.h
+ zmalloc.h anet.h zipmap.h ziplist.h intset.h version.h util.h
util.o: util.c fmacros.h util.h
ziplist.o: ziplist.c zmalloc.h util.h ziplist.h endian.h
zipmap.o: zipmap.c zmalloc.h endian.h
../deps/jemalloc/lib/libjemalloc.a:
cd ../deps/jemalloc && ./configure $(JEMALLOC_CFLAGS) --with-jemalloc-prefix=je_ --enable-cc-silence && $(MAKE) lib/libjemalloc.a
-redis-server: $(OBJ)
+redis-server: dependencies $(OBJ)
$(QUIET_LINK)$(CC) -o $(PRGNAME) $(CCOPT) $(DEBUG) $(OBJ) $(CCLINK) $(ALLOC_LINK) ../deps/lua/src/liblua.a
redis-benchmark: dependencies $(BENCHOBJ)
#include "redis.h"
+#include "bio.h"
#include <signal.h>
#include <fcntl.h>
void aofUpdateCurrentSize(void);
+/* Queue an fsync of the file descriptor 'fd' as a REDIS_BIO_AOF_FSYNC
+ * background job. The descriptor is passed as the job's first argument,
+ * cast to a pointer-sized integer; the other two arguments are unused. */
+void aof_background_fsync(int fd) {
+ bioCreateBackgroundJob(REDIS_BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL);
+}
+
/* Called when the user switches from "appendonly yes" to "appendonly no"
* at runtime using the CONFIG command. */
void stopAppendOnly(void) {
- flushAppendOnlyFile();
+ flushAppendOnlyFile(1);
aof_fsync(server.appendfd);
close(server.appendfd);
* and the only way the client socket can get a write is entering when the
* the event loop, we accumulate all the AOF writes in a memory
* buffer and write it on disk using this function just before entering
- * the event loop again. */
-void flushAppendOnlyFile(void) {
- time_t now;
+ * the event loop again.
+ *
+ * About the 'force' argument:
+ *
+ * When the fsync policy is set to 'everysec' we may delay the flush if there
+ * is still an fsync() going on in the background thread, since for instance
+ * on Linux write(2) will be blocked by the background fsync anyway.
+ * When this happens we remember that there is some aof buffer to be
+ * flushed ASAP, and will try to do that in the serverCron() function.
+ *
+ * However if force is set to 1 we'll write regardless of the background
+ * fsync. */
+void flushAppendOnlyFile(int force) {
ssize_t nwritten;
+ int sync_in_progress = 0;
if (sdslen(server.aofbuf) == 0) return;
+ if (server.appendfsync == APPENDFSYNC_EVERYSEC)
+ sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0;
+
+ if (server.appendfsync == APPENDFSYNC_EVERYSEC && !force) {
+ /* With this append fsync policy we do background fsyncing.
+ * If the fsync is still in progress we can try to delay
+ * the write for a couple of seconds. */
+ if (sync_in_progress) {
+ if (server.aof_flush_postponed_start == 0) {
+ /* No previous write postponing, remember that we are
+ * postponing the flush and return. */
+ server.aof_flush_postponed_start = server.unixtime;
+ return;
+ } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
+ /* We were already waiting for fsync to finish, but for less
+ * than two seconds this is still ok. Postpone again. */
+ return;
+ }
+ /* Otherwise fall through, and go write since we can't wait
+ * over two seconds. */
+ redisLog(REDIS_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
+ }
+ }
+ /* If you are following this code path, then we are going to write so
+ * reset the postponed flush sentinel to zero. */
+ server.aof_flush_postponed_start = 0;
+
/* We want to perform a single write. This should be guaranteed atomic
* at least if the filesystem we are writing is a real physical one.
* While this will save us against the server being killed I don't think
* there is much to do about the whole server stopping for power problems
* or alike */
- nwritten = write(server.appendfd,server.aofbuf,sdslen(server.aofbuf));
- if (nwritten != (signed)sdslen(server.aofbuf)) {
+ nwritten = write(server.appendfd,server.aofbuf,sdslen(server.aofbuf));
+ if (nwritten != (signed)sdslen(server.aofbuf)) {
/* Ooops, we are in troubles. The best thing to do for now is
* aborting instead of giving the illusion that everything is
* working as expected. */
- if (nwritten == -1) {
+ if (nwritten == -1) {
redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
- } else {
+ } else {
redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
- }
- exit(1);
+ }
+ exit(1);
}
- sdsfree(server.aofbuf);
- server.aofbuf = sdsempty();
server.appendonly_current_size += nwritten;
- /* Don't Fsync if no-appendfsync-on-rewrite is set to yes and we have
- * childs performing heavy I/O on disk. */
+ /* Re-use AOF buffer when it is small enough. The maximum comes from the
+ * arena size of 4k minus some overhead (but is otherwise arbitrary). */
+ if ((sdslen(server.aofbuf)+sdsavail(server.aofbuf)) < 4000) {
+ sdsclear(server.aofbuf);
+ } else {
+ sdsfree(server.aofbuf);
+ server.aofbuf = sdsempty();
+ }
+
+ /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
+ * children doing I/O in the background. */
if (server.no_appendfsync_on_rewrite &&
(server.bgrewritechildpid != -1 || server.bgsavechildpid != -1))
return;
- /* Fsync if needed */
- now = time(NULL);
- if (server.appendfsync == APPENDFSYNC_ALWAYS ||
- (server.appendfsync == APPENDFSYNC_EVERYSEC &&
- now-server.lastfsync > 1))
- {
+
+ /* Perform the fsync if needed. */
+ if (server.appendfsync == APPENDFSYNC_ALWAYS) {
/* aof_fsync is defined as fdatasync() for Linux in order to avoid
* flushing metadata. */
aof_fsync(server.appendfd); /* Let's try to get this data on the disk */
- server.lastfsync = now;
+ server.lastfsync = server.unixtime;
+ } else if ((server.appendfsync == APPENDFSYNC_EVERYSEC &&
+ server.unixtime > server.lastfsync)) {
+ if (!sync_in_progress) aof_background_fsync(server.appendfd);
+ server.lastfsync = server.unixtime;
}
}
-sds catAppendOnlyGenericCommand(sds buf, int argc, robj **argv) {
- int j;
- buf = sdscatprintf(buf,"*%d\r\n",argc);
+sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) {
+ char buf[32];
+ int len, j;
+ robj *o;
+
+ buf[0] = '*';
+ len = 1+ll2string(buf+1,sizeof(buf)-1,argc);
+ buf[len++] = '\r';
+ buf[len++] = '\n';
+ dst = sdscatlen(dst,buf,len);
+
for (j = 0; j < argc; j++) {
- robj *o = getDecodedObject(argv[j]);
- buf = sdscatprintf(buf,"$%lu\r\n",(unsigned long)sdslen(o->ptr));
- buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
- buf = sdscatlen(buf,"\r\n",2);
+ o = getDecodedObject(argv[j]);
+ buf[0] = '$';
+ len = 1+ll2string(buf+1,sizeof(buf)-1,sdslen(o->ptr));
+ buf[len++] = '\r';
+ buf[len++] = '\n';
+ dst = sdscatlen(dst,buf,len);
+ dst = sdscatlen(dst,o->ptr,sdslen(o->ptr));
+ dst = sdscatlen(dst,"\r\n",2);
decrRefCount(o);
}
- return buf;
+ return dst;
}
sds catAppendOnlyExpireAtCommand(sds buf, robj *key, robj *seconds) {
* Handle this. */
void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
if (!bysignal && exitcode == 0) {
- int fd;
+ int newfd, oldfd;
+ int nwritten;
char tmpfile[256];
+ long long now = ustime();
redisLog(REDIS_NOTICE,
- "Background append only file rewriting terminated with success");
- /* Now it's time to flush the differences accumulated by the parent */
- snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) server.bgrewritechildpid);
- fd = open(tmpfile,O_WRONLY|O_APPEND);
- if (fd == -1) {
- redisLog(REDIS_WARNING, "Not able to open the temp append only file produced by the child: %s", strerror(errno));
+ "Background AOF rewrite terminated with success");
+
+ /* Flush the differences accumulated by the parent to the
+ * rewritten AOF. */
+ snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof",
+ (int)server.bgrewritechildpid);
+ newfd = open(tmpfile,O_WRONLY|O_APPEND);
+ if (newfd == -1) {
+ redisLog(REDIS_WARNING,
+ "Unable to open the temporary AOF produced by the child: %s", strerror(errno));
goto cleanup;
}
- /* Flush our data... */
- if (write(fd,server.bgrewritebuf,sdslen(server.bgrewritebuf)) !=
- (signed) sdslen(server.bgrewritebuf)) {
- redisLog(REDIS_WARNING, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno));
- close(fd);
+
+ nwritten = write(newfd,server.bgrewritebuf,sdslen(server.bgrewritebuf));
+ if (nwritten != (signed)sdslen(server.bgrewritebuf)) {
+ if (nwritten == -1) {
+ redisLog(REDIS_WARNING,
+ "Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
+ } else {
+ redisLog(REDIS_WARNING,
+ "Short write trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
+ }
+ close(newfd);
goto cleanup;
}
- redisLog(REDIS_NOTICE,"Parent diff flushed into the new append log file with success (%lu bytes)",sdslen(server.bgrewritebuf));
- /* Now our work is to rename the temp file into the stable file. And
- * switch the file descriptor used by the server for append only. */
+
+ redisLog(REDIS_NOTICE,
+ "Parent diff successfully flushed to the rewritten AOF (%lu bytes)", nwritten);
+
+ /* The only remaining thing to do is to rename the temporary file to
+ * the configured file and switch the file descriptor used to do AOF
+ * writes. We don't want close(2) or rename(2) calls to block the
+ * server on old file deletion.
+ *
+ * There are two possible scenarios:
+ *
+ * 1) AOF is DISABLED and this was a one time rewrite. The temporary
+ * file will be renamed to the configured file. When this file already
+ * exists, it will be unlinked, which may block the server.
+ *
+ * 2) AOF is ENABLED and the rewritten AOF will immediately start
+ * receiving writes. After the temporary file is renamed to the
+ * configured file, the original AOF file descriptor will be closed.
+ * Since this will be the last reference to that file, closing it
+ * causes the underlying file to be unlinked, which may block the
+ * server.
+ *
+ * To mitigate the blocking effect of the unlink operation (either
+ * caused by rename(2) in scenario 1, or by close(2) in scenario 2), we
+ * use a background thread to take care of this. First, we
+ * make scenario 1 identical to scenario 2 by opening the target file
+ * when it exists. The unlink operation after the rename(2) will then
+ * be executed upon calling close(2) for its descriptor. Everything to
+ * guarantee atomicity for this switch has already happened by then, so
+ * we don't care what the outcome or duration of that close operation
+ * is, as long as the file descriptor is released again. */
+ if (server.appendfd == -1) {
+ /* AOF disabled */
+
+ /* Don't care if this fails: oldfd will be -1 and we handle that.
+ * One notable case of -1 return is if the old file does
+ * not exist. */
+ oldfd = open(server.appendfilename,O_RDONLY|O_NONBLOCK);
+ } else {
+ /* AOF enabled */
+ oldfd = -1; /* We'll set this to the current AOF filedes later. */
+ }
+
+ /* Rename the temporary file. This will not unlink the target file if
+ * it exists, because we reference it with "oldfd". */
if (rename(tmpfile,server.appendfilename) == -1) {
- redisLog(REDIS_WARNING,"Can't rename the temp append only file into the stable one: %s", strerror(errno));
- close(fd);
+ redisLog(REDIS_WARNING,
+ "Error trying to rename the temporary AOF: %s", strerror(errno));
+ close(newfd);
+ if (oldfd != -1) close(oldfd);
goto cleanup;
}
- /* Mission completed... almost */
- redisLog(REDIS_NOTICE,"Append only file successfully rewritten.");
- if (server.appendfd != -1) {
- /* If append only is actually enabled... */
- close(server.appendfd);
- server.appendfd = fd;
- if (server.appendfsync != APPENDFSYNC_NO) aof_fsync(fd);
- server.appendseldb = -1; /* Make sure it will issue SELECT */
- redisLog(REDIS_NOTICE,"The new append only file was selected for future appends.");
+
+ if (server.appendfd == -1) {
+ /* AOF disabled, we don't need to set the AOF file descriptor
+ * to this new file, so we can close it. */
+ close(newfd);
+ } else {
+ /* AOF enabled, replace the old fd with the new one. */
+ oldfd = server.appendfd;
+ server.appendfd = newfd;
+ if (server.appendfsync == APPENDFSYNC_ALWAYS)
+ aof_fsync(newfd);
+ else if (server.appendfsync == APPENDFSYNC_EVERYSEC)
+ aof_background_fsync(newfd);
+ server.appendseldb = -1; /* Make sure SELECT is re-issued */
aofUpdateCurrentSize();
server.auto_aofrewrite_base_size = server.appendonly_current_size;
- } else {
- /* If append only is disabled we just generate a dump in this
- * format. Why not? */
- close(fd);
+
+ /* Clear regular AOF buffer since its contents were just written to
+ * the new AOF from the background rewrite buffer. */
+ sdsfree(server.aofbuf);
+ server.aofbuf = sdsempty();
}
+
+ redisLog(REDIS_NOTICE, "Background AOF rewrite successful");
+
+ /* Asynchronously close the overwritten AOF. */
+ if (oldfd != -1) bioCreateBackgroundJob(REDIS_BIO_CLOSE_FILE,(void*)(long)oldfd,NULL,NULL);
+
+ redisLog(REDIS_VERBOSE,
+ "Background AOF rewrite signal handler took %lldus", ustime()-now);
} else if (!bysignal && exitcode != 0) {
- redisLog(REDIS_WARNING, "Background append only file rewriting error");
+ redisLog(REDIS_WARNING,
+ "Background AOF rewrite terminated with error");
} else {
redisLog(REDIS_WARNING,
- "Background append only file rewriting terminated by signal %d",
- bysignal);
+ "Background AOF rewrite terminated by signal %d", bysignal);
}
+
cleanup:
sdsfree(server.bgrewritebuf);
server.bgrewritebuf = sdsempty();
--- /dev/null
+/* Background I/O service for Redis.
+ *
+ * This file implements operations that we need to perform in the background.
+ * Currently there are two operations: a background close(2) system call and
+ * a background AOF fsync. The background close is needed because when the
+ * process is the last owner of a reference to a file, closing it means
+ * unlinking it, and the deletion of the file is slow, blocking the server.
+ *
+ * In the future we'll either continue implementing new things we need or
+ * we'll switch to libeio. However there are probably long term uses for this
+ * file as we may want to put here Redis specific background tasks (for instance
+ * it is not impossible that we'll need a non blocking FLUSHDB/FLUSHALL
+ * implementation).
+ *
+ * DESIGN
+ * ------
+ *
+ * The design is trivial, we have a structure representing a job to perform
+ * and a different thread and job queue for every job type.
+ * Every thread waits for new jobs in its queue, and processes every job
+ * sequentially.
+ *
+ * Jobs of the same type are guaranteed to be processed from the least
+ * recently inserted to the most recently inserted (older jobs processed
+ * first).
+ *
+ * Currently there is no way for the creator of the job to be notified about
+ * the completion of the operation, this will only be added when/if needed.
+ */
+
+#include "redis.h"
+#include "bio.h"
+
+static pthread_mutex_t bio_mutex[REDIS_BIO_NUM_OPS];
+static pthread_cond_t bio_condvar[REDIS_BIO_NUM_OPS];
+static list *bio_jobs[REDIS_BIO_NUM_OPS];
+/* The following array is used to hold the number of pending jobs for every
+ * OP type. This allows us to export the bioPendingJobsOfType() API that is
+ * useful when the main thread wants to perform some operation that may involve
+ * objects shared with the background thread. The main thread will just wait
+ * until there are no more jobs of this type to be executed before performing
+ * the sensitive operation. This data is also useful for reporting. */
+static unsigned long long bio_pending[REDIS_BIO_NUM_OPS];
+
+/* This structure represents a background Job. It is only used locally to this
+ * file as the API does not expose the internals at all. */
+struct bio_job {
+ time_t time; /* Time at which the job was created. */
+ /* Job specific arguments pointers. If we need to pass more than three
+ * arguments we can just pass a pointer to a structure or alike. */
+ void *arg1, *arg2, *arg3;
+};
+
+void *bioProcessBackgroundJobs(void *arg);
+
+/* Make sure we have enough stack to perform all the things we do in the
+ * main thread. */
+#define REDIS_THREAD_STACK_SIZE (1024*1024*4)
+
+/* Initialize the background I/O system: set up the mutex, condition variable,
+ * job list and pending counter of every job type, then spawn one thread per
+ * job type. If a thread can't be created a warning is logged and the process
+ * exits. */
+void bioInit(void) {
+ pthread_attr_t attr;
+ pthread_t thread;
+ size_t stacksize;
+ int j;
+
+ /* Initialization of state vars and objects, one set per job type. */
+ for (j = 0; j < REDIS_BIO_NUM_OPS; j++) {
+ pthread_mutex_init(&bio_mutex[j],NULL);
+ pthread_cond_init(&bio_condvar[j],NULL);
+ bio_jobs[j] = listCreate();
+ bio_pending[j] = 0;
+ }
+
+ /* Set the stack size, as by default it may be small on some systems.
+ * The default size is doubled until it is at least
+ * REDIS_THREAD_STACK_SIZE. */
+ pthread_attr_init(&attr);
+ pthread_attr_getstacksize(&attr,&stacksize);
+ if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes: a zero size would never grow by doubling. */
+ while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
+ pthread_attr_setstacksize(&attr, stacksize);
+
+ /* Ready to spawn our threads. We use the single argument the thread
+ * function accepts in order to pass the job type the thread is
+ * responsible for. */
+ for (j = 0; j < REDIS_BIO_NUM_OPS; j++) {
+ void *arg = (void*)(unsigned long) j;
+ if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
+ redisLog(REDIS_WARNING,"Fatal: Can't initialize Background Jobs.");
+ exit(1);
+ }
+ }
+}
+
+void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
+ struct bio_job *job = zmalloc(sizeof(*job));
+ /* Queue a new background job of the given type. The job records its
+ * creation time and the three opaque arguments; it is then appended to
+ * the tail of the queue for 'type' while holding that type's mutex, the
+ * pending counter is incremented and the condition variable for 'type'
+ * is signaled. */
+ job->time = time(NULL);
+ job->arg1 = arg1;
+ job->arg2 = arg2;
+ job->arg3 = arg3;
+ pthread_mutex_lock(&bio_mutex[type]);
+ listAddNodeTail(bio_jobs[type],job);
+ bio_pending[type]++;
+ pthread_cond_signal(&bio_condvar[type]);
+ pthread_mutex_unlock(&bio_mutex[type]);
+}
+
+void *bioProcessBackgroundJobs(void *arg) {
+ struct bio_job *job;
+ unsigned long type = (unsigned long) arg;
+ /* Thread body: 'arg' carries the job type this thread serves (see the
+ * cast above). The thread detaches itself and then loops forever,
+ * executing the jobs queued for its type one at a time, oldest first. */
+ pthread_detach(pthread_self());
+ pthread_mutex_lock(&bio_mutex[type]);
+ while(1) {
+ listNode *ln;
+
+ /* The loop always starts with the lock held. If the queue is empty
+ * wait on the condition variable; the queue is checked again after
+ * every wakeup. */
+ if (listLength(bio_jobs[type]) == 0) {
+ pthread_cond_wait(&bio_condvar[type],&bio_mutex[type]);
+ continue;
+ }
+ /* Get the oldest job from the queue. The node is left in the list and
+ * is only removed once the job has been processed (end of the loop). */
+ ln = listFirst(bio_jobs[type]);
+ job = ln->value;
+ /* It is now possible to unlock the background system as we now have
+ * a stand alone job structure to process.*/
+ pthread_mutex_unlock(&bio_mutex[type]);
+
+ /* Process the job accordingly to its type. For both known types arg1
+ * holds the file descriptor, stored as a pointer-sized integer. */
+ if (type == REDIS_BIO_CLOSE_FILE) {
+ close((long)job->arg1);
+ } else if (type == REDIS_BIO_AOF_FSYNC) {
+ aof_fsync((long)job->arg1);
+ } else {
+ redisPanic("Wrong job type in bioProcessBackgroundJobs().");
+ }
+ zfree(job);
+
+ /* Lock again before reiterating the loop, if there are no longer
+ * jobs to process we'll block again in pthread_cond_wait().
+ * The processed node is unlinked and the pending counter decremented
+ * only now, so the job is counted as pending until it has completed. */
+ pthread_mutex_lock(&bio_mutex[type]);
+ listDelNode(bio_jobs[type],ln);
+ bio_pending[type]--;
+ }
+}
+
+/* Return the number of pending jobs of the specified type. The counter is
+ * read while holding the mutex of that job type. A job is counted from the
+ * moment it is queued until the background thread has finished processing
+ * it. */
+unsigned long long bioPendingJobsOfType(int type) {
+ unsigned long long val;
+ pthread_mutex_lock(&bio_mutex[type]);
+ val = bio_pending[type];
+ pthread_mutex_unlock(&bio_mutex[type]);
+ return val;
+}
+
+#if 0 /* We don't use the following code for now, and bioWaitPendingJobsLE
+ probably needs a rewrite using condition variables instead of the
+ current implementation. */
+
+
+/* Wait until the number of pending jobs of the specified type is
+ * less than or equal to the specified number.
+ *
+ * This function may block for a long time, it should only be used to perform
+ * the following tasks:
+ *
+ * 1) To avoid that the main thread is pushing jobs of a given type so fast
+ * that the background thread can't process them at the same speed.
+ * So before creating a new job of a given type the main thread should
+ * call something like: bioWaitPendingJobsLE(job_type,10000);
+ * 2) In order to perform special operations that make it necessary to be sure
+ * no one is touching shared resources in the background.
+ */
+void bioWaitPendingJobsLE(int type, unsigned long long num) {
+ unsigned long long iteration = 0;
+
+ /* We poll the jobs queue aggressively to start, and gradually relax
+ * the polling speed if it is going to take too much time: no sleep for
+ * the first 1000 iterations, then usleep(100) up to iteration 10000,
+ * then usleep(1000). */
+ while(1) {
+ iteration++;
+ if (iteration > 1000 && iteration <= 10000) {
+ usleep(100);
+ } else if (iteration > 10000) {
+ usleep(1000);
+ }
+ if (bioPendingJobsOfType(type) <= num) break;
+ }
+}
+
+/* Return the creation time of the oldest job still in the queue of the
+ * specified type, or 0 if that queue is empty. The queue is inspected while
+ * holding the mutex of that job type. */
+time_t bioOlderJobOfType(int type) {
+ time_t time;
+ listNode *ln;
+ struct bio_job *job;
+
+ pthread_mutex_lock(&bio_mutex[type]);
+ ln = listFirst(bio_jobs[type]);
+ if (ln == NULL) {
+ pthread_mutex_unlock(&bio_mutex[type]);
+ return 0;
+ }
+ job = ln->value;
+ time = job->time;
+ pthread_mutex_unlock(&bio_mutex[type]);
+ return time;
+}
+
+#endif
--- /dev/null
+/* Exported API.
+ *
+ * NOTE(review): bioWaitPendingJobsLE() and bioOlderJobOfType() are declared
+ * here, but their definitions are currently compiled out with #if 0, so
+ * calling them would presumably fail at link time -- confirm before use. */
+void bioInit(void);
+void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3);
+unsigned long long bioPendingJobsOfType(int type);
+void bioWaitPendingJobsLE(int type, unsigned long long num);
+time_t bioOlderJobOfType(int type);
+
+/* Background job opcodes. Each opcode is also used as the index of the
+ * per-type mutex, condition variable, queue and pending counter, so
+ * REDIS_BIO_NUM_OPS must stay equal to the number of opcodes. */
+#define REDIS_BIO_CLOSE_FILE 0 /* Deferred close(2) syscall. */
+#define REDIS_BIO_AOF_FSYNC 1 /* Deferred AOF fsync. */
+#define REDIS_BIO_NUM_OPS 2
if (stringmatch(pattern,"dir",0)) {
char buf[1024];
- addReplyBulkCString(c,"dir");
- if (getcwd(buf,sizeof(buf)) == NULL) {
+ if (getcwd(buf,sizeof(buf)) == NULL)
buf[0] = '\0';
- } else {
- addReplyBulkCString(c,buf);
- }
+
+ addReplyBulkCString(c,"dir");
+ addReplyBulkCString(c,buf);
matches++;
}
if (stringmatch(pattern,"dbfilename",0)) {
}
}
if (totwritten > 0) c->lastinteraction = time(NULL);
- if (listLength(c->reply) == 0) {
+ if (c->bufpos == 0 && listLength(c->reply) == 0) {
c->sentlen = 0;
aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
void processInputBuffer(redisClient *c) {
/* Keep processing while there is something in the input buffer */
while(sdslen(c->querybuf)) {
+ /* Immediately abort if the client is in the middle of something. */
+ if (c->flags & REDIS_BLOCKED) return;
+
/* REDIS_CLOSE_AFTER_REPLY closes the connection once the reply is
* written to the client. Make sure to not let the reply grow after
* this flag has been set (i.e. don't process more commands). */
int hostport;
const char *hostsocket;
int numclients;
- int requests;
int liveclients;
- int donerequests;
+ int requests;
+ int requests_issued;
+ int requests_finished;
int keysize;
int datasize;
int randomkeys;
}
static void clientDone(client c) {
- if (config.donerequests == config.requests) {
+ if (config.requests_finished == config.requests) {
freeClient(c);
aeStop(config.el);
return;
exit(1);
}
- if (config.donerequests < config.requests)
- config.latency[config.donerequests++] = c->latency;
+ if (config.requests_finished < config.requests)
+ config.latency[config.requests_finished++] = c->latency;
clientDone(c);
}
}
REDIS_NOTUSED(fd);
REDIS_NOTUSED(mask);
- /* When nothing was written yet, randomize keys and set start time. */
+ /* Initialize request when nothing was written. */
if (c->written == 0) {
+ /* Enforce upper bound to number of requests. */
+ if (config.requests_issued++ >= config.requests) {
+ freeClient(c);
+ return;
+ }
+
+ /* Really initialize: randomize keys and set start time. */
if (config.randomkeys) randomizeClientKey(c);
c->start = ustime();
c->latency = -1;
int i, curlat = 0;
float perc, reqpersec;
- reqpersec = (float)config.donerequests/((float)config.totlatency/1000);
+ reqpersec = (float)config.requests_finished/((float)config.totlatency/1000);
if (!config.quiet) {
printf("====== %s ======\n", config.title);
- printf(" %d requests completed in %.2f seconds\n", config.donerequests,
+ printf(" %d requests completed in %.2f seconds\n", config.requests_finished,
(float)config.totlatency/1000);
printf(" %d parallel clients\n", config.numclients);
printf(" %d bytes payload\n", config.datasize);
client c;
config.title = title;
- config.donerequests = 0;
+ config.requests_issued = 0;
+ config.requests_finished = 0;
c = createClient(cmd,len);
createMissingClients(c);
REDIS_NOTUSED(clientData);
float dt = (float)(mstime()-config.start)/1000.0;
- float rps = (float)config.donerequests/dt;
+ float rps = (float)config.requests_finished/dt;
printf("%s: %.2f\r", config.title, rps);
fflush(stdout);
return 250; /* every 250ms */
config.el = aeCreateEventLoop();
aeCreateTimeEvent(config.el,1,showThroughput,NULL,NULL);
config.keepalive = 1;
- config.donerequests = 0;
config.datasize = 3;
config.randomkeys = 0;
config.randomkeys_keyspacelen = 0;
int shutdown;
int monitor_mode;
int pubsub_mode;
+ int latency_mode;
int stdinarg; /* get last arg from stdin. (-x option) */
char *auth;
int raw_output; /* output mode per command */
i++;
} else if (!strcmp(argv[i],"--raw")) {
config.raw_output = 1;
+ } else if (!strcmp(argv[i],"--latency")) {
+ config.latency_mode = 1;
} else if (!strcmp(argv[i],"-d") && !lastarg) {
sdsfree(config.mb_delim);
config.mb_delim = sdsnew(argv[i+1]);
" -x Read last argument from STDIN\n"
" -d <delimiter> Multi-bulk delimiter in for raw formatting (default: \\n)\n"
" --raw Use raw formatting for replies (default when STDOUT is not a tty)\n"
+" --latency Enter a special mode continuously sampling latency.\n"
" --help Output this help and exit\n"
" --version Output version and exit\n"
"\n"
return retval;
}
+/* Latency mode: PING the server in an endless loop, keeping the minimum,
+ * maximum and average round trip time (as measured with mstime()) and
+ * redrawing a single status line after every sample.
+ *
+ * This function never returns. The process exits with status 1 when
+ * 'context' is NULL or when a command gets no reply. */
+static void latencyMode(void) {
+    redisReply *pong;
+    long long t0, elapsed, lowest, highest, sum, samples = 0;
+    double mean;
+
+    if (context == NULL) exit(1);
+    for (;;) {
+        t0 = mstime();
+        pong = redisCommand(context,"PING");
+        if (pong == NULL) {
+            fprintf(stderr,"\nI/O error\n");
+            exit(1);
+        }
+        elapsed = mstime()-t0;
+        freeReplyObject(pong);
+
+        if (++samples == 1) {
+            /* The first sample seeds every statistic. */
+            lowest = highest = sum = elapsed;
+        } else {
+            if (elapsed < lowest) lowest = elapsed;
+            if (elapsed > highest) highest = elapsed;
+            sum += elapsed;
+        }
+        mean = (double) sum/samples;
+
+        /* Go back to the start of the line and erase it, so that the
+         * statistics are refreshed in place. */
+        printf("\x1b[0G\x1b[2Kmin: %lld, max: %lld, avg: %.2f (%lld samples)",
+            lowest, highest, mean, samples);
+        fflush(stdout);
+
+        /* Pause 10000 microseconds between samples. */
+        usleep(10000);
+    }
+}
+
int main(int argc, char **argv) {
int firstarg;
config.shutdown = 0;
config.monitor_mode = 0;
config.pubsub_mode = 0;
+ config.latency_mode = 0;
config.stdinarg = 0;
config.auth = NULL;
config.raw_output = !isatty(fileno(stdout)) && (getenv("FAKETTY") == NULL);
argc -= firstarg;
argv += firstarg;
+ /* Start in latency mode if appropriate */
+ if (config.latency_mode) {
+ cliConnect(0);
+ latencyMode();
+ }
+
/* Start interactive mode when no command is provided */
if (argc == 0) {
/* Note that in repl mode we don't abort on connection error.
#include "redis.h"
#include "slowlog.h"
+#include "bio.h"
#ifdef HAVE_BACKTRACE
#include <execinfo.h>
* in objects at every object access, and accuracy is not needed.
* To access a global var is faster than calling time(NULL) */
server.unixtime = time(NULL);
+
/* We have just 22 bits per object for LRU information.
* So we use an (eventually wrapping) LRU clock with 10 seconds resolution.
* 2^22 bits with 10 seconds resoluton is more or less 1.5 years.
server.auto_aofrewrite_perc &&
server.appendonly_current_size > server.auto_aofrewrite_min_size)
{
- int base = server.auto_aofrewrite_base_size ?
+ long long base = server.auto_aofrewrite_base_size ?
server.auto_aofrewrite_base_size : 1;
long long growth = (server.appendonly_current_size*100/base) - 100;
if (growth >= server.auto_aofrewrite_perc) {
}
}
+
+ /* If we postponed an AOF buffer flush, let's try to do it every time the
+ * cron function is called. */
+ if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);
+
/* Expire a few keys per cycle, only if this is a master.
* On slaves we wait for DEL operations synthesized by the master
* in order to guarantee a strict consistency. */
}
/* Write the AOF buffer on disk */
- flushAppendOnlyFile();
+ flushAppendOnlyFile(0);
}
/* =========================== Server initialization ======================== */
server.lastfsync = time(NULL);
server.appendfd = -1;
server.appendseldb = -1; /* Make sure the first time will not match */
+ server.aof_flush_postponed_start = 0;
server.pidfile = zstrdup("/var/run/redis.pid");
server.dbfilename = zstrdup("dump.rdb");
server.appendfilename = zstrdup("appendonly.aof");
if (server.port != 0) {
server.ipfd = anetTcpServer(server.neterr,server.port,server.bindaddr);
if (server.ipfd == ANET_ERR) {
- redisLog(REDIS_WARNING, "Opening port: %s", server.neterr);
+ redisLog(REDIS_WARNING, "Opening port %d: %s",
+ server.port, server.neterr);
exit(1);
}
}
if (server.cluster_enabled) clusterInit();
scriptingInit();
slowlogInit();
+ bioInit();
srand(time(NULL)^getpid());
}
slowlogPushEntryIfNeeded(c->argv,c->argc,duration);
c->cmd->calls++;
- if (server.appendonly && dirty)
+ if (server.appendonly && dirty > 0)
feedAppendOnlyFile(c->cmd,c->db->id,c->argv,c->argc);
- if ((dirty || c->cmd->flags & REDIS_CMD_FORCE_REPLICATION) &&
+ if ((dirty > 0 || c->cmd->flags & REDIS_CMD_FORCE_REPLICATION) &&
listLength(server.slaves))
replicationFeedSlaves(server.slaves,c->db->id,c->argv,c->argc);
if (listLength(server.monitors))
int main(int argc, char **argv) {
long long start;
+ zmalloc_enable_thread_safeness();
initServerConfig();
if (argc == 2) {
if (strcmp(argv[1], "-v") == 0 ||
time_t lastfsync;
int appendfd;
int appendseldb;
+ time_t aof_flush_postponed_start;
char *pidfile;
pid_t bgsavechildpid;
pid_t bgrewritechildpid;
size_t zset_max_ziplist_entries;
size_t zset_max_ziplist_value;
time_t unixtime; /* Unix time sampled every second. */
- /* Virtual memory I/O threads stuff */
- /* An I/O thread process an element taken from the io_jobs queue and
- * put the result of the operation in the io_done list. While the
- * job is being processed, it's put on io_processing queue. */
- list *io_newjobs; /* List of VM I/O jobs yet to be processed */
- list *io_processing; /* List of VM I/O jobs being processed */
- list *io_processed; /* List of VM I/O jobs already processed */
- list *io_ready_clients; /* Clients ready to be unblocked. All keys loaded */
- pthread_mutex_t io_mutex; /* lock to access io_jobs/io_done/io_thread_job */
- pthread_cond_t io_condvar; /* I/O threads conditional variable */
- pthread_attr_t io_threads_attr; /* attributes for threads creation */
- int io_active_threads; /* Number of running I/O threads */
- int vm_max_threads; /* Max number of I/O threads running at the same time */
- /* Our main thread is blocked on the event loop, locking for sockets ready
- * to be read or written, so when a threaded I/O operation is ready to be
- * processed by the main thread, the I/O thread will use a unix pipe to
- * awake the main thread. The followings are the two pipe FDs. */
- int io_ready_pipe_read;
- int io_ready_pipe_write;
- /* Virtual memory stats */
- unsigned long long vm_stats_used_pages;
- unsigned long long vm_stats_swapped_objects;
- unsigned long long vm_stats_swapouts;
- unsigned long long vm_stats_swapins;
/* Pubsub */
dict *pubsub_channels; /* Map channels to list of subscribed clients */
list *pubsub_patterns; /* A list of pubsub_patterns */
int rdbSaveLen(FILE *fp, uint32_t len);
/* AOF persistence */
-void flushAppendOnlyFile(void);
+void flushAppendOnlyFile(int force);
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
void aofRemoveTempFile(pid_t childpid);
int rewriteAppendOnlyFileBackground(void);
sh->len = reallen;
}
+/* Turn 's' into an empty string in place. The bytes that held the old
+ * content are accounted as free space in the header; nothing is
+ * reallocated or released. */
+void sdsclear(sds s) {
+    struct sdshdr *hdr = (struct sdshdr *) (s-(sizeof(struct sdshdr)));
+
+    hdr->buf[0] = '\0';
+    hdr->free += hdr->len;
+    hdr->len = 0;
+}
+
static sds sdsMakeRoomFor(sds s, size_t addlen) {
struct sdshdr *sh, *newsh;
size_t free = sdsavail(s);
sds sdstrim(sds s, const char *cset);
sds sdsrange(sds s, int start, int end);
void sdsupdatelen(sds s);
+void sdsclear(sds s);
int sdscmp(sds s1, sds s2);
sds *sdssplitlen(char *s, int len, char *sep, int seplen, int *count);
void sdsfreesplitres(sds *tokens, int count);
p = ziplistNext(o->ptr,p);
}
} else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
- listNode *ln = listIndex(o->ptr,start);
+ listNode *ln;
+
+ /* If we are nearest to the end of the list, reach the element
+ * starting from tail and going backward, as it is faster. */
+ if (start > llen/2) start -= llen;
+ ln = listIndex(o->ptr,start);
while(rangelen--) {
addReplyBulk(c,ln->value);
void rpoplpushHandlePush(redisClient *origclient, redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
robj *aux;
- if (!handleClientsWaitingListPush(c,dstkey,value)) {
+ if (!handleClientsWaitingListPush(origclient,dstkey,value)) {
/* Create the list if the key does not exist */
if (!dstobj) {
dstobj = createZiplistObject();
}
listTypePush(dstobj,value,REDIS_HEAD);
/* If we are pushing as a result of LPUSH against a key
- * watched by BLPOPLPUSH, we need to rewrite the command vector.
- * But if this is called directly by RPOPLPUSH (either directly
+ * watched by BRPOPLPUSH, we need to rewrite the command vector
+ * as an LPUSH.
+ *
+ * If this is called directly by RPOPLPUSH (either directly
* or via a BRPOPLPUSH where the popped list exists)
- * we should replicate the BRPOPLPUSH command itself. */
+ * we should replicate the RPOPLPUSH command itself. */
if (c != origclient) {
aux = createStringObject("LPUSH",5);
rewriteClientCommandVector(origclient,3,aux,dstkey,value);
--- /dev/null
+# Base configuration shared by the servers started in this file: AOF turned
+# on and written to appendonly.aof.
+set defaults { appendonly {yes} appendfilename {appendonly.aof} }
+# Server directory, as returned by the tmpdir helper (not defined in this
+# file).
+set server_path [tmpdir server.aof]
+# Location of the append only file inside that directory.
+set aof_path "$server_path/appendonly.aof"
+
+# Run `code` through start_server, using as configuration the AOF defaults
+# of this file followed by the caller supplied `overrides` (presumably
+# listed last so that they take precedence).
+proc start_server_aof {overrides code} {
+    # Link the caller's variables into this procedure.
+    upvar defaults defaults srv srv server_path server_path
+    start_server [list overrides [concat $defaults $overrides]] $code
+}
+
+tags {"aof"} {
+    # Specific test for a regression where internal buffers were not properly
+    # cleaned after a child responsible for an AOF rewrite exited. This buffer
+    # was subsequently appended to the new AOF, resulting in duplicate commands.
+    start_server_aof [list dir $server_path] {
+        set client [redis [srv host] [srv port]]
+        # 20 benchmark clients issue 20000 "incr foo" requests in total.
+        set bench [open "|src/redis-benchmark -q -p [srv port] -c 20 -n 20000 incr foo" "r+"]
+        after 100
+
+        # Benchmark should be running by now: start background rewrite
+        $client bgrewriteaof
+
+        # Read until benchmark pipe reaches EOF
+        while {[string length [read $bench]] > 0} {}
+
+        # Release the pipe and reap the benchmark process. The error is
+        # ignored on purpose: closing a command pipeline raises one when the
+        # child wrote to stderr or exited with a non-zero status, and the
+        # assertion below is what decides whether the test passes.
+        catch {close $bench}
+
+        # Check contents of foo
+        assert_equal 20000 [$client get foo]
+    }
+
+    # Restart server to replay AOF
+    start_server_aof [list dir $server_path] {
+        set client [redis [srv host] [srv port]]
+        assert_equal 20000 [$client get foo]
+    }
+}
+# Evaluate $condition with expr in the caller's scope and raise an error
+# prefixed with "assertion:" when the result is false.
proc assert {condition} {
if {![uplevel 1 expr $condition]} {
- error "assertion:Expected '$value' to be true"
+ error "assertion:Expected condition '$condition' to be true"
}
}
unit/pubsub
unit/slowlog
unit/scripting
+ unit/maxmemory
}
# Index to the next test to run in the ::all_tests list.
set ::next_test 0
--- /dev/null
+start_server {tags {"maxmemory"}} {
+    foreach policy {allkeys-random allkeys-lru volatile-lru volatile-random volatile-ttl} {
+        test "maxmemory - is the memory limit honoured? (policy $policy)" {
+            # Begin with an empty dataset.
+            r flushall
+            # The limit is the memory used right now plus 100k: a small
+            # margin that is quickly filled.
+            set limit [expr {[s used_memory]+100*1024}]
+            r config set maxmemory $limit
+            r config set maxmemory-policy $policy
+            # Insert keys with a TTL until memory usage gets within 4096
+            # bytes of the limit. More than 10 keys are expected to fit.
+            set numkeys 0
+            while {1} {
+                r setex [randomKey] 10000 x
+                incr numkeys
+                if {[s used_memory]+4096 <= $limit} continue
+                assert {$numkeys > 10}
+                break
+            }
+            # Insert the same number of keys once more: memory usage must
+            # stay below the limit (plus a 4096 bytes tolerance).
+            for {set todo $numkeys} {$todo > 0} {incr todo -1} {
+                r setex [randomKey] 10000 x
+            }
+            assert {[s used_memory] < ($limit+4096)}
+        }
+    }
+
+    foreach policy {allkeys-random allkeys-lru volatile-lru volatile-random volatile-ttl} {
+        test "maxmemory - only allkeys-* should remove non-volatile keys ($policy)" {
+            # Begin with an empty dataset.
+            r flushall
+            # The limit is the memory used right now plus 100k: a small
+            # margin that is quickly filled.
+            set limit [expr {[s used_memory]+100*1024}]
+            r config set maxmemory $limit
+            r config set maxmemory-policy $policy
+            # Insert keys without a TTL until memory usage gets within 4096
+            # bytes of the limit. More than 10 keys are expected to fit.
+            set numkeys 0
+            while {1} {
+                r set [randomKey] x
+                incr numkeys
+                if {[s used_memory]+4096 <= $limit} continue
+                assert {$numkeys > 10}
+                break
+            }
+            # Insert the same number of keys once more, remembering whether
+            # any write failed with an error mentioning "used memory".
+            set err 0
+            for {set todo $numkeys} {$todo > 0} {incr todo -1} {
+                if {[catch {r set [randomKey] x} e] &&
+                    [string match {*used memory*} $e]} {
+                    set err 1
+                }
+            }
+            # With a policy other than allkeys-* the error must have been
+            # seen; with allkeys-* memory usage must stay below the limit.
+            if {![string match allkeys-* $policy]} {
+                assert {$err == 1}
+            } else {
+                assert {[s used_memory] < ($limit+4096)}
+            }
+        }
+    }
+
+    foreach policy {
+        volatile-lru volatile-random volatile-ttl
+    } {
+        test "maxmemory - policy $policy should only remove volatile keys." {
+            # make sure to start with a blank instance
+            r flushall
+            # Get the current memory limit and calculate a new limit.
+            # We just add 100k to the current memory size so that it is
+            # fast for us to reach that limit.
+            set used [s used_memory]
+            set limit [expr {$used+100*1024}]
+            r config set maxmemory $limit
+            r config set maxmemory-policy $policy
+            # Now add keys until the limit is almost reached.
+            set numkeys 0
+            while 1 {
+                # Odd keys are volatile
+                # Even keys are non volatile
+                if {$numkeys % 2} {
+                    r setex "key:$numkeys" 10000 x
+                } else {
+                    r set "key:$numkeys" x
+                }
+                if {[s used_memory]+4096 > $limit} {
+                    assert {$numkeys > 10}
+                    break
+                }
+                incr numkeys
+            }
+            # Now we add the same number of volatile keys already added.
+            # We expect Redis to evict only volatile keys in order to make
+            # space. Write errors are ignored on purpose: only the memory
+            # usage and the surviving keys are checked below.
+            for {set j 0} {$j < $numkeys} {incr j} {
+                catch {r setex "foo:$j" 10000 x}
+            }
+            # We should still be under the limit.
+            assert {[s used_memory] < ($limit+4096)}
+            # However all our non volatile keys should be here.
+            for {set j 0} {$j < $numkeys} {incr j 2} {
+                assert {[r exists "key:$j"]}
+            }
+        }
+    }
+}
assert_error "*wrong*arguments*ping*" {r ping x y z}
}
}
+
+start_server {tags {"regression"}} {
+    test "Regression for a crash with blocking ops and pipelining" {
+        set rd [redis_deferring_client]
+        set fd [r channel]
+        # The same BLPOP on the key "nolist" is written twice, back to back,
+        # on the channel of the main client (presumably so that the second
+        # command is already buffered while the first one blocks).
+        set proto "*3\r\n\$5\r\nBLPOP\r\n\$6\r\nnolist\r\n\$1\r\n0\r\n"
+        puts -nonewline $fd $proto$proto
+        flush $fd
+
+        # Push twice from a second connection, one element for each BLPOP.
+        $rd rpush nolist a
+        $rd read
+        $rd rpush nolist a
+        $rd read
+
+        # Do not leave the extra connection open after the test.
+        $rd close
+    }
+}
assert_equal 3 [r llen myotherlist]
}
}
+
+ test "Regression for bug 593 - chaining BRPOPLPUSH with other blocking cmds" {
+ set rd1 [redis_deferring_client]
+ set rd2 [redis_deferring_client]
+
+ # rd1 sends two BRPOPLPUSH a -> b on the same connection, while rd2
+ # sends BRPOPLPUSH b -> c, chaining the destination of the former
+ # to the source of the latter.
+ $rd1 brpoplpush a b 0
+ $rd1 brpoplpush a b 0
+ $rd2 brpoplpush b c 0
+ # Wait one second before pushing (presumably to make sure the commands
+ # above reached the server).
+ after 1000
+ r lpush a data
+ $rd1 close
+ $rd2 close
+ # The server must still be able to reply after the steps above: the
+ # PING reply is the value of the test body (presumably checked by the
+ # test helper against the trailing {PONG}).
+ r ping
+ } {PONG}
}