]> git.saurik.com Git - redis.git/commitdiff
cron part of disk store object cache implemented. Objects are pushed as IO jobs if...
authorantirez <antirez@gmail.com>
Wed, 29 Dec 2010 21:18:20 +0000 (22:18 +0100)
committerantirez <antirez@gmail.com>
Wed, 29 Dec 2010 21:18:20 +0000 (22:18 +0100)
src/dscache.c
src/redis.c
src/redis.h

index 5570e9c5dba59f4019d7b579cfa6f6c93a3bf212..fca2cca9b5f6688200df3b273ab07adc801efb4b 100644 (file)
@@ -349,6 +349,7 @@ void *IOThreadEntryPoint(void *arg) {
             j->val = dsGet(j->db,j->key);
             redisAssert(j->val != NULL);
         } else if (j->type == REDIS_IOJOB_SAVE) {
+            redisAssert(j->val->storage == REDIS_DS_SAVING);
             if (j->val)
                 dsSet(j->db,j->key,j->val);
             else
@@ -443,6 +444,66 @@ void dsCreateIOJob(int type, redisDb *db, robj *key, robj *val) {
     unlockThreadedIO();
 }
 
+void cacheScheduleForFlush(redisDb *db, robj *key) {
+    dirtykey *dk;
+    dictEntry *de;
+    
+    de = dictFind(db->dict,key->ptr);
+    if (de) {
+        robj *val = dictGetEntryVal(de);
+        if (val->storage == REDIS_DS_DIRTY)
+            return;
+        else
+            val->storage = REDIS_DS_DIRTY;
+    }
+
+    dk = zmalloc(sizeof(*dk));
+    dk->db = db;
+    dk->key = key;
+    incrRefCount(key);
+    dk->ctime = time(NULL);
+    listAddNodeTail(server.cache_flush_queue, key);
+}
+
+void cacheCron(void) {
+    time_t now = time(NULL);
+    listNode *ln;
+
+    /* Sync stuff on disk */
+    while((ln = listFirst(server.cache_flush_queue)) != NULL) {
+        dirtykey *dk = ln->value;
+
+        if ((now - dk->ctime) >= server.cache_flush_delay) {
+            struct dictEntry *de;
+            robj *val;
+
+            /* Lookup the key. We need to check if it's still here and
+             * possibly access to the value. */
+            de = dictFind(dk->db->dict,dk->key->ptr);
+            if (de) {
+                val = dictGetEntryVal(de);
+                redisAssert(val->storage == REDIS_DS_DIRTY);
+                val->storage = REDIS_DS_SAVING;
+            } else {
+                /* Setting the value to NULL tells the IO thread to delete
+                 * the key on disk. */
+                val = NULL;
+            }
+            dsCreateIOJob(REDIS_IOJOB_SAVE,dk->db,dk->key,val);
+            listDelNode(server.cache_flush_queue,ln);
+        } else {
+            break; /* too early */
+        }
+    }
+
+    /* Reclaim memory from the object cache */
+    while (server.ds_enabled && zmalloc_used_memory() >
+            server.cache_max_memory)
+    {
+        if (cacheFreeOneEntry() == REDIS_ERR) break;
+    }
+}
+
 /* ============ Virtual Memory - Blocking clients on missing keys =========== */
 
 /* This function makes the clinet 'c' waiting for the key 'key' to be loaded.
@@ -454,6 +515,7 @@ void dsCreateIOJob(int type, redisDb *db, robj *key, robj *val) {
  * - if it's REDIS_DS_SAVING the key is being saved by an IO Job. When
  *   the client will lookup the key it will block if the key is still
  *   in this stage but it's more or less the best we can do.
+ *
  *   FIXME: we should try if it's actually better to suspend the client
  *   accessing an object that is being saved, and awake it only when
  *   the saving was completed.
index 12ff99064f299eba758cdb263efa7b184f1d5c9b..8680797ab19be84d8c95cc25e5955cb7e9ab4f4b 100644 (file)
@@ -620,11 +620,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
 
     /* Remove a few cached objects from memory if we are over the
      * configured memory limit */
-    while (server.ds_enabled && zmalloc_used_memory() >
-            server.cache_max_memory)
-    {
-        if (cacheFreeOneEntry() == REDIS_ERR) break;
-    }
+    if (server.ds_enabled) cacheCron();
 
     /* Replication cron function -- used to reconnect to master and
      * to detect transfer failures. */
index 25d76ab09dcb15b8b2b27c5bd9de3707f3186e7c..d9a4b912398156f395586f0900808edc658aa2b1 100644 (file)
@@ -800,6 +800,8 @@ int blockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd);
 int dontWaitForSwappedKey(redisClient *c, robj *key);
 void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key);
 int cacheFreeOneEntry(void);
+void cacheScheduleForFlush(redisDb *db, robj *key);
+void cacheCron(void);
 
 /* Set data type */
 robj *setTypeCreate(robj *value);