From: antirez Date: Wed, 29 Dec 2010 21:18:20 +0000 (+0100) Subject: cron part of disk store object cache implemented. Objects are pushed as IO jobs if... X-Git-Url: https://git.saurik.com/redis.git/commitdiff_plain/f63f0928c3e08f60f6b8557dc7c5d635834f990b cron part of disk store object cache implemented. Objects are pushed as IO jobs if needed, so that the IO thread will process them. --- diff --git a/src/dscache.c b/src/dscache.c index 5570e9c5..fca2cca9 100644 --- a/src/dscache.c +++ b/src/dscache.c @@ -349,6 +349,7 @@ void *IOThreadEntryPoint(void *arg) { j->val = dsGet(j->db,j->key); redisAssert(j->val != NULL); } else if (j->type == REDIS_IOJOB_SAVE) { + redisAssert(j->val->storage == REDIS_DS_SAVING); if (j->val) dsSet(j->db,j->key,j->val); else @@ -443,6 +444,66 @@ void dsCreateIOJob(int type, redisDb *db, robj *key, robj *val) { unlockThreadedIO(); } +void cacheScheduleForFlush(redisDb *db, robj *key) { + dirtykey *dk; + dictEntry *de; + + de = dictFind(db->dict,key->ptr); + if (de) { + robj *val = dictGetEntryVal(de); + if (val->storage == REDIS_DS_DIRTY) + return; + else + val->storage = REDIS_DS_DIRTY; + } + + dk = zmalloc(sizeof(*dk)); + dk->db = db; + dk->key = key; + incrRefCount(key); + dk->ctime = time(NULL); + listAddNodeTail(server.cache_flush_queue, key); +} + +void cacheCron(void) { + time_t now = time(NULL); + listNode *ln; + + /* Sync stuff on disk */ + while((ln = listFirst(server.cache_flush_queue)) != NULL) { + dirtykey *dk = ln->value; + + if ((now - dk->ctime) >= server.cache_flush_delay) { + struct dictEntry *de; + robj *val; + + /* Lookup the key. We need to check if it's still here and + * possibly access to the value. */ + de = dictFind(dk->db->dict,dk->key->ptr); + if (de) { + val = dictGetEntryVal(de); + redisAssert(val->storage == REDIS_DS_DIRTY); + val->storage = REDIS_DS_SAVING; + } else { + /* Setting the value to NULL tells the IO thread to delete + * the key on disk. */ + val = NULL; + } + dsCreateIOJob(REDIS_IOJOB_SAVE,dk->db,dk->key,val); + listDelNode(server.cache_flush_queue,ln); + } else { + break; /* too early */ + } + } + + /* Reclaim memory from the object cache */ + while (server.ds_enabled && zmalloc_used_memory() > + server.cache_max_memory) + { + if (cacheFreeOneEntry() == REDIS_ERR) break; + } +} + /* ============ Virtual Memory - Blocking clients on missing keys =========== */ /* This function makes the clinet 'c' waiting for the key 'key' to be loaded. @@ -454,6 +515,7 @@ void dsCreateIOJob(int type, redisDb *db, robj *key, robj *val) { * - if it's REDIS_DS_SAVING the key is being saved by an IO Job. When * the client will lookup the key it will block if the key is still * in this stage but it's more or less the best we can do. + * * FIXME: we should try if it's actually better to suspend the client * accessing an object that is being saved, and awake it only when * the saving was completed. diff --git a/src/redis.c b/src/redis.c index 12ff9906..8680797a 100644 --- a/src/redis.c +++ b/src/redis.c @@ -620,11 +620,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { /* Remove a few cached objects from memory if we are over the * configured memory limit */ - while (server.ds_enabled && zmalloc_used_memory() > - server.cache_max_memory) - { - if (cacheFreeOneEntry() == REDIS_ERR) break; - } + if (server.ds_enabled) cacheCron(); /* Replication cron function -- used to reconnect to master and * to detect transfer failures. */ diff --git a/src/redis.h b/src/redis.h index 25d76ab0..d9a4b912 100644 --- a/src/redis.h +++ b/src/redis.h @@ -800,6 +800,8 @@ int blockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd); int dontWaitForSwappedKey(redisClient *c, robj *key); void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key); int cacheFreeOneEntry(void); +void cacheScheduleForFlush(redisDb *db, robj *key); +void cacheCron(void); /* Set data type */ robj *setTypeCreate(robj *value);