#define REDIS_IO_LOADINPROG 4
#define REDIS_IO_SAVEINPROG 8
+void cacheScheduleIOPushJobs(int onlyloads);
+
void cacheScheduleIOAddFlag(redisDb *db, robj *key, long flag) {
struct dictEntry *de = dictFind(db->io_queued,key);
* in queue for the same key. */
if (type == REDIS_IO_LOAD && !(flags & REDIS_IO_SAVE)) {
listAddNodeHead(server.cache_io_queue, op);
+ cacheScheduleIOPushJobs(1);
} else {
/* FIXME: probably when this happens we want to at least move
* the write job about this queue on top, and set the creation time
}
}
-void cacheCron(void) {
+/* Push scheduled IO operations into IO Jobs that the IO thread can process.
+ * If 'onlyloads' is true only IO_LOAD jobs are processed: this is useful
+ * since it's safe to push LOAD IO jobs from any place of the code, while
+ * SAVE io jobs should never be pushed while we are processing a command
+ * (not protected by lookupKey() that will block on keys in IO_SAVEINPROG
+ * state). */
+#define MAX_IO_JOBS_QUEUE 100
+void cacheScheduleIOPushJobs(int onlyloads) {
time_t now = time(NULL);
listNode *ln;
int jobs, topush = 0;
- /* Sync stuff on disk, but only if we have less than 100 IO jobs */
+ /* Sync stuff on disk, but only if we have less
+ * than MAX_IO_JOBS_QUEUE IO jobs. */
lockThreadedIO();
jobs = listLength(server.io_newjobs);
unlockThreadedIO();
- topush = 100-jobs;
+ topush = MAX_IO_JOBS_QUEUE-jobs;
if (topush < 0) topush = 0;
if (topush > (signed)listLength(server.cache_io_queue))
topush = listLength(server.cache_io_queue);
topush--;
if (op->type == REDIS_IO_LOAD ||
- (now - op->ctime) >= server.cache_flush_delay)
+ (!onlyloads && (now - op->ctime) >= server.cache_flush_delay))
{
struct dictEntry *de;
robj *val;
break; /* too early */
}
}
+}
+
+void cacheCron(void) {
+ /* Push jobs */
+ cacheScheduleIOPushJobs(0);
/* Reclaim memory from the object cache */
while (server.ds_enabled && zmalloc_used_memory() >