From 8d51fb6a80fb40abd0bb487d71435d3a30c2942e Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 30 Dec 2010 18:37:46 +0100 Subject: [PATCH] diskstore cache bug fixing --- src/db.c | 1 + src/dscache.c | 21 +++++++++++++++++---- src/redis.h | 1 + 3 files changed, 19 insertions(+), 4 deletions(-) diff --git a/src/db.c b/src/db.c index cf99bbb2..f3606077 100644 --- a/src/db.c +++ b/src/db.c @@ -21,6 +21,7 @@ robj *lookupKey(redisDb *db, robj *key) { /* FIXME: change this code to just wait for our object to * get out of the IO Job. */ waitEmptyIOJobsQueue(); + processAllPendingIOJobs(); redisAssert(val->storage != REDIS_DS_SAVING); } server.stat_keyspace_hits++; diff --git a/src/dscache.c b/src/dscache.c index fe3f1c1d..05112cbb 100644 --- a/src/dscache.c +++ b/src/dscache.c @@ -404,16 +404,14 @@ void spawnIOThread(void) { server.io_active_threads++; } -/* We need to wait for the last thread to exit before we are able to - * fork() in order to BGSAVE or BGREWRITEAOF. */ +/* Wait that all the pending IO Jobs are processed */ void waitEmptyIOJobsQueue(void) { while(1) { int io_processed_len; lockThreadedIO(); if (listLength(server.io_newjobs) == 0 && - listLength(server.io_processing) == 0 && - server.io_active_threads == 0) + listLength(server.io_processing) == 0) { unlockThreadedIO(); return; @@ -434,6 +432,21 @@ void waitEmptyIOJobsQueue(void) { } } +/* Process all the IO Jobs already completed by threads but still waiting + * processing from the main thread. */ +void processAllPendingIOJobs(void) { + while(1) { + int io_processed_len; + + lockThreadedIO(); + io_processed_len = listLength(server.io_processed); + unlockThreadedIO(); + if (io_processed_len == 0) return; + vmThreadedIOCompletedJob(NULL,server.io_ready_pipe_read, + (void*)0xdeadbeef,0); + } +} + /* This function must be called while with threaded IO locked */ void queueIOJob(iojob *j) { redisLog(REDIS_DEBUG,"Queued IO Job %p type %d about key '%s'\n", diff --git a/src/redis.h b/src/redis.h index 3a5a274b..bb9aee7a 100644 --- a/src/redis.h +++ b/src/redis.h @@ -799,6 +799,7 @@ void unlockThreadedIO(void); void freeIOJob(iojob *j); void queueIOJob(iojob *j); void waitEmptyIOJobsQueue(void); +void processAllPendingIOJobs(void); void zunionInterBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv); void execBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv); int blockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd); -- 2.49.0