From: antirez <antirez@gmail.com> Date: Thu, 30 Dec 2010 17:37:46 +0000 (+0100) Subject: diskstore cache bug fixing X-Git-Url: https://git.saurik.com/redis.git/commitdiff_plain/8d51fb6a80fb40abd0bb487d71435d3a30c2942e diskstore cache bug fixing --- diff --git a/src/db.c b/src/db.c index cf99bbb2..f3606077 100644 --- a/src/db.c +++ b/src/db.c @@ -21,6 +21,7 @@ robj *lookupKey(redisDb *db, robj *key) { /* FIXME: change this code to just wait for our object to * get out of the IO Job. */ waitEmptyIOJobsQueue(); + processAllPendingIOJobs(); redisAssert(val->storage != REDIS_DS_SAVING); } server.stat_keyspace_hits++; diff --git a/src/dscache.c b/src/dscache.c index fe3f1c1d..05112cbb 100644 --- a/src/dscache.c +++ b/src/dscache.c @@ -404,16 +404,14 @@ void spawnIOThread(void) { server.io_active_threads++; } -/* We need to wait for the last thread to exit before we are able to - * fork() in order to BGSAVE or BGREWRITEAOF. */ +/* Wait that all the pending IO Jobs are processed */ void waitEmptyIOJobsQueue(void) { while(1) { int io_processed_len; lockThreadedIO(); if (listLength(server.io_newjobs) == 0 && - listLength(server.io_processing) == 0 && - server.io_active_threads == 0) + listLength(server.io_processing) == 0) { unlockThreadedIO(); return; @@ -434,6 +432,21 @@ void waitEmptyIOJobsQueue(void) { } } +/* Process all the IO Jobs already completed by threads but still waiting + * processing from the main thread. 
*/ +void processAllPendingIOJobs(void) { + while(1) { /* repeat until io_processed is observed empty */ + int io_processed_len; + + lockThreadedIO(); + io_processed_len = listLength(server.io_processed); /* length is sampled while the threaded-IO lock is held */ + unlockThreadedIO(); /* lock is released before the completion handler below is invoked */ + if (io_processed_len == 0) return; /* nothing left for the main thread to handle */ + vmThreadedIOCompletedJob(NULL,server.io_ready_pipe_read, + (void*)0xdeadbeef,0); /* NOTE(review): handler is called directly here; NULL, 0xdeadbeef and 0 look like placeholder arguments - confirm the handler ignores them */ + } +} + /* This function must be called while with threaded IO locked */ void queueIOJob(iojob *j) { redisLog(REDIS_DEBUG,"Queued IO Job %p type %d about key '%s'\n", diff --git a/src/redis.h b/src/redis.h index 3a5a274b..bb9aee7a 100644 --- a/src/redis.h +++ b/src/redis.h @@ -799,6 +799,7 @@ void unlockThreadedIO(void); void freeIOJob(iojob *j); void queueIOJob(iojob *j); void waitEmptyIOJobsQueue(void); +void processAllPendingIOJobs(void); void zunionInterBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv); void execBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv); int blockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd);