/* FIXME: change this code to just wait for our object to
* get out of the IO Job. */
waitEmptyIOJobsQueue();
+ processAllPendingIOJobs();
redisAssert(val->storage != REDIS_DS_SAVING);
}
server.stat_keyspace_hits++;
server.io_active_threads++;
}
-/* We need to wait for the last thread to exit before we are able to
- * fork() in order to BGSAVE or BGREWRITEAOF. */
+/* Wait that all the pending IO Jobs are processed */
void waitEmptyIOJobsQueue(void) {
while(1) {
int io_processed_len;
lockThreadedIO();
if (listLength(server.io_newjobs) == 0 &&
- listLength(server.io_processing) == 0 &&
- server.io_active_threads == 0)
+ listLength(server.io_processing) == 0)
{
unlockThreadedIO();
return;
}
}
+/* Process all the IO Jobs already completed by threads but still waiting
+ * processing from the main thread. */
+void processAllPendingIOJobs(void) {
+ while(1) {
+ int io_processed_len;
+
+ lockThreadedIO();
+ io_processed_len = listLength(server.io_processed);
+ unlockThreadedIO();
+ if (io_processed_len == 0) return;
+ vmThreadedIOCompletedJob(NULL,server.io_ready_pipe_read,
+ (void*)0xdeadbeef,0);
+ }
+}
+
/* This function must be called while with threaded IO locked */
void queueIOJob(iojob *j) {
redisLog(REDIS_DEBUG,"Queued IO Job %p type %d about key '%s'\n",
void freeIOJob(iojob *j);
void queueIOJob(iojob *j);
void waitEmptyIOJobsQueue(void);
+void processAllPendingIOJobs(void);
void zunionInterBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv);
void execBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv);
int blockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd);