+/* ============= Disk store cache - Scheduling of IO operations =============
+ *
+ * We use a queue and a hash table to hold the state of IO operations
+ * so that it's fast to look up whether there is already an IO operation
+ * in queue for a given key.
+ *
+ * There are two types of IO operations for a given key:
+ * REDIS_IO_LOAD and REDIS_IO_SAVE.
+ *
+ * The cacheScheduleIO() function pushes the specified IO operation
+ * into the queue, but avoids adding the same key for the same operation
+ * multiple times, thanks to the associated hash table.
+ *
+ * We keep a set of flags for every key, so when the scheduled IO operation
+ * gets moved from the scheduled queue to the actual IO Jobs queue that
+ * is processed by the IO thread, we flag it as IO_LOADINPROG or
+ * IO_SAVEINPROG.
+ *
+ * So for every given key we always know if there is some IO operation
+ * scheduled, or in progress, for this key.
+ *
+ * NOTE: all this is very important in order to guarantee correctness of
+ * the Disk Store Cache. Jobs are always queued here. Load jobs are
+ * queued at the head for faster execution only in the case there is not
+ * already a write operation of some kind for this key.
+ *
+ * So we have ordering, but can make exceptions when there are no
+ * operations already queued for a given key. Also, when we need to
+ * block-load a given key for an immediate lookup operation, we can check
+ * whether the key can be accessed synchronously without race conditions
+ * (no IN PROGRESS operations for this key); otherwise we block waiting
+ * for completion. */
+
+/* Per-key IO state bits, OR-ed together in the value stored for each key
+ * in db->io_queued. LOAD/SAVE: the operation is waiting in
+ * server.cache_io_queue. LOADINPROG/SAVEINPROG: the operation has been
+ * turned into an IO job for the IO thread. */
+#define REDIS_IO_LOAD       (1<<0)
+#define REDIS_IO_SAVE       (1<<1)
+#define REDIS_IO_LOADINPROG (1<<2)
+#define REDIS_IO_SAVEINPROG (1<<3)
+
+/* Set 'flag' in the IO state bitmask kept for 'key' in db->io_queued.
+ *
+ * If the key has no entry yet, one is created holding just 'flag' and the
+ * key's refcount is incremented (presumably for the reference the
+ * dictionary now holds). Setting a flag that is already set is a logic
+ * error: the old and new flags are logged, then redisAssert() is tripped. */
+void cacheScheduleIOAddFlag(redisDb *db, robj *key, long flag) {
+    struct dictEntry *de = dictFind(db->io_queued,key);
+    long flags;
+
+    if (de == NULL) {
+        dictAdd(db->io_queued,key,(void*)flag);
+        incrRefCount(key);
+        return;
+    }
+
+    flags = (long) dictGetEntryVal(de);
+    if (flags & flag) {
+        redisLog(REDIS_WARNING,"Adding the same flag again: was: %ld, added: %ld",flags,flag);
+        redisAssert(!(flags & flag));
+    }
+    dictGetEntryVal(de) = (void*) (flags | flag);
+}
+
+/* Clear 'flag' from the IO state bitmask of 'key' in db->io_queued.
+ * The key must have an entry with 'flag' set (asserted). When no bits
+ * remain, the entry is removed from db->io_queued altogether. */
+void cacheScheduleIODelFlag(redisDb *db, robj *key, long flag) {
+    struct dictEntry *entry = dictFind(db->io_queued,key);
+    long remaining;
+
+    redisAssert(entry != NULL);
+    remaining = (long) dictGetEntryVal(entry);
+    redisAssert(remaining & flag);
+
+    remaining &= ~flag;
+    if (remaining != 0) {
+        dictGetEntryVal(entry) = (void*) remaining;
+    } else {
+        /* Nothing scheduled or in progress any more for this key. */
+        dictDelete(db->io_queued,key);
+    }
+}
+
+/* Return the IO state bitmask for 'key', or 0 if db->io_queued has no
+ * entry for it (no IO scheduled or in progress). */
+int cacheScheduleIOGetFlags(redisDb *db, robj *key) {
+    struct dictEntry *entry = dictFind(db->io_queued,key);
+
+    if (entry == NULL) return 0;
+    return (long) dictGetEntryVal(entry);
+}
+
+/* Schedule an IO operation of 'type' (REDIS_IO_LOAD or REDIS_IO_SAVE) for
+ * 'key'. This is a no-op if an operation of the same type is already
+ * scheduled for the key. Loads go to the head of the queue, and an
+ * immediate push of load-only jobs is attempted, unless a save for the
+ * same key is already queued. Everything else is appended to the tail. */
+void cacheScheduleIO(redisDb *db, robj *key, int type) {
+    long prev = cacheScheduleIOGetFlags(db,key);
+    int load_first;
+    ioop *op;
+
+    if (prev & type) return;
+
+    redisLog(REDIS_DEBUG,"Scheduling key %s for %s",
+        key->ptr, (type == REDIS_IO_LOAD) ? "loading" : "saving");
+    cacheScheduleIOAddFlag(db,key,type);
+
+    op = zmalloc(sizeof(*op));
+    op->type = type;
+    op->db = db;
+    op->key = key;
+    incrRefCount(key); /* released when the op leaves the queue */
+    op->ctime = time(NULL);
+
+    /* A load may jump the queue only if no save for the same key is
+     * already waiting in it; otherwise it must stay behind that save. */
+    load_first = (type == REDIS_IO_LOAD) && !(prev & REDIS_IO_SAVE);
+    if (!load_first) {
+        /* FIXME: when a load lands here we probably want to move the
+         * pending write job for this key to the top of the queue and set
+         * its creation time so that it is processed ASAP. */
+        listAddNodeTail(server.cache_io_queue, op);
+        return;
+    }
+    listAddNodeHead(server.cache_io_queue, op);
+    cacheScheduleIOPushJobs(REDIS_IO_ONLYLOADS);
+}
+
+/* Push scheduled IO operations into IO Jobs that the IO thread can process.
+ *
+ * If flags include REDIS_IO_ONLYLOADS, only load jobs are processed: this is
+ * useful since it's safe to push LOAD IO jobs from any place in the code, while
+ * SAVE IO jobs should never be pushed while we are processing a command
+ * (such a push would not be protected by lookupKey(), which blocks on keys
+ * in the IO_SAVEINPROG state).
+ *
+ * The REDIS_IO_ASAP flag tells the function not to wait for a SAVE
+ * operation's scheduled delay (server.cache_flush_delay), but to perform
+ * the operation ASAP. This is useful when we need to reclaim memory from
+ * the IO queue.
+ *
+ * Returns the number of IO jobs pushed.
+ */
+#define MAX_IO_JOBS_QUEUE 10
+static void cacheIOJobFromScheduledOp(ioop *op);
+
+int cacheScheduleIOPushJobs(int flags) {
+    time_t now = time(NULL);
+    listNode *node;
+    int queued, budget, pushed = 0;
+
+    /* A threaded BGSAVE is running: leave the queue untouched. */
+    if (server.bgsavethread != (pthread_t) -1) return 0;
+
+    /* io_newjobs is shared with the IO thread, so read it under the lock. */
+    lockThreadedIO();
+    queued = listLength(server.io_newjobs);
+    unlockThreadedIO();
+
+    /* Top the IO jobs queue up to MAX_IO_JOBS_QUEUE entries, but never
+     * examine more operations than are currently scheduled. */
+    budget = MAX_IO_JOBS_QUEUE - queued;
+    if (budget < 0) budget = 0;
+    if (budget > (signed)listLength(server.cache_io_queue))
+        budget = listLength(server.cache_io_queue);
+
+    while (budget > 0 && (node = listFirst(server.cache_io_queue)) != NULL) {
+        ioop *op = node->value;
+
+        budget--;
+
+        if ((flags & REDIS_IO_ONLYLOADS) && op->type != REDIS_IO_LOAD) break;
+
+        if (op->type == REDIS_IO_SAVE) {
+            /* Saves wait for cache_flush_delay seconds unless ASAP. */
+            if (!(flags & REDIS_IO_ASAP) &&
+                (now - op->ctime) < server.cache_flush_delay) break;
+
+            /* At most one save per key may be in the IO thread's hands.
+             * Rotate a blocked save to the tail so the next operation
+             * gets a chance; if it is the only one, stop here. */
+            if (cacheScheduleIOGetFlags(op->db,op->key) & REDIS_IO_SAVEINPROG) {
+                if (listLength(server.cache_io_queue) <= 1) break;
+                listDelNode(server.cache_io_queue,node);
+                listAddNodeTail(server.cache_io_queue,op);
+                continue;
+            }
+        }
+
+        redisLog(REDIS_DEBUG,"Creating IO %s Job for key %s",
+            op->type == REDIS_IO_LOAD ? "load" : "save", op->key->ptr);
+        cacheIOJobFromScheduledOp(op);
+
+        /* Scheduled -> in progress. */
+        cacheScheduleIODelFlag(op->db,op->key,op->type);
+        cacheScheduleIOAddFlag(op->db,op->key,
+            (op->type == REDIS_IO_LOAD) ? REDIS_IO_LOADINPROG :
+                                          REDIS_IO_SAVEINPROG);
+
+        /* Drop the op from the queue; db->io_queued still tracks the key. */
+        listDelNode(server.cache_io_queue,node);
+        decrRefCount(op->key);
+        zfree(op);
+        pushed++;
+    }
+    return pushed;
+}
+
+/* Create the IO job matching a scheduled operation. A load job carries no
+ * value. A save job carries the key's current value and expire; when the
+ * key is not in op->db->dict, the value is NULL, which tells the IO
+ * thread to delete the key on disk. */
+static void cacheIOJobFromScheduledOp(ioop *op) {
+    struct dictEntry *de;
+    robj *val = NULL;
+    time_t expire = -1;
+
+    if (op->type == REDIS_IO_LOAD) {
+        cacheCreateIOJob(REDIS_IOJOB_LOAD,op->db,op->key,NULL,0);
+        return;
+    }
+
+    de = dictFind(op->db->dict,op->key->ptr);
+    if (de != NULL) {
+        val = dictGetEntryVal(de);
+        expire = getExpire(op->db,op->key);
+    }
+    cacheCreateIOJob(REDIS_IOJOB_SAVE,op->db,op->key,val,expire);
+}
+
+/* Periodic diskstore cache maintenance: first push due IO jobs, then,
+ * while diskstore is enabled and memory use exceeds cache_max_memory,
+ * evict from the object cache and the negative cache until usage drops
+ * or neither of them can free anything more. */
+void cacheCron(void) {
+    cacheScheduleIOPushJobs(0);
+
+    for (;;) {
+        int freed_cache, freed_negative;
+
+        if (!server.ds_enabled ||
+            zmalloc_used_memory() <= server.cache_max_memory) break;
+
+        /* Both evictors get a turn on every iteration. */
+        freed_cache = (cacheFreeOneEntry() == REDIS_OK);
+        freed_negative = (negativeCacheEvictOneEntry() == REDIS_OK);
+        if (!freed_cache && !freed_negative) break;
+    }
+}
+
+/* ========== Disk store cache - Blocking clients on missing keys =========== */