From bcf2995c987acea7f5485ec0e3717a29a7e98457 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 2 Aug 2010 18:13:39 +0200 Subject: [PATCH] support for write operations against expiring keys, by master-controlled expiring in replication and AOF synthesizing DEL operations --- src/db.c | 51 +++++++++++++++++++++++-------- src/redis.c | 81 ++++++++++++++++++++++++++++---------------------- src/redis.h | 2 +- src/t_string.c | 1 - 4 files changed, 86 insertions(+), 49 deletions(-) diff --git a/src/db.c b/src/db.c index 958a9f6b..d8a5d0b2 100644 --- a/src/db.c +++ b/src/db.c @@ -45,7 +45,7 @@ robj *lookupKeyRead(redisDb *db, robj *key) { } robj *lookupKeyWrite(redisDb *db, robj *key) { - deleteIfVolatile(db,key); + expireIfNeeded(db,key); return lookupKey(db,key); } @@ -321,7 +321,6 @@ void renameGenericCommand(redisClient *c, int nx) { return; incrRefCount(o); - deleteIfVolatile(c->db,c->argv[2]); if (dbAdd(c->db,c->argv[2],o) == REDIS_ERR) { if (nx) { decrRefCount(o); @@ -375,7 +374,6 @@ void moveCommand(redisClient *c) { } /* Try to add the element to the target DB */ - deleteIfVolatile(dst,c->argv[1]); if (dbAdd(dst,c->argv[1],o) == REDIS_ERR) { addReply(c,shared.czero); return; @@ -430,8 +428,45 @@ time_t getExpire(redisDb *db, robj *key) { return (time_t) dictGetEntryVal(de); } +/* Propagate expires into slaves and the AOF file. + * When a key expires in the master, a DEL operation for this key is sent + * to all the slaves and the AOF file if enabled. + * + * This way the key expiry is centralized in one place, and since both + * AOF and the master->slave link guarantee operation ordering, everything + * will be consistent even if we allow write operations against expiring + * keys. */ +void propagateExpire(redisDb *db, robj *key) { + struct redisCommand *cmd; + robj *argv[2]; + + cmd = lookupCommand("del"); + argv[0] = createStringObject("DEL",3); + argv[1] = key; + incrRefCount(key); + + if (server.appendonly) + feedAppendOnlyFile(cmd,db->id,argv,2); + if (listLength(server.slaves)) + replicationFeedSlaves(server.slaves,db->id,argv,2); + + decrRefCount(key); +} + int expireIfNeeded(redisDb *db, robj *key) { time_t when = getExpire(db,key); + + /* If we are running in the context of a slave, return ASAP: + * the slave key expiration is controlled by the master that will + * send us synthesized DEL operations for expired keys. + * + * Still we try to return the right information to the caller, + * that is, 0 if we think the key should be still valid, 1 if + * we think the key is expired at this time. */ + if (server.masterhost != NULL) { + return time(NULL) > when; + } + if (when < 0) return 0; /* Return when this key has not expired */ @@ -440,15 +475,7 @@ int expireIfNeeded(redisDb *db, robj *key) { /* Delete the key */ server.stat_expiredkeys++; server.dirty++; - return dbDelete(db,key); -} - -int deleteIfVolatile(redisDb *db, robj *key) { - if (getExpire(db,key) < 0) return 0; - - /* Delete the key */ - server.stat_expiredkeys++; - server.dirty++; + propagateExpire(db,key); return dbDelete(db,key); } diff --git a/src/redis.c b/src/redis.c index c8b1c781..27ade8b1 100644 --- a/src/redis.c +++ b/src/redis.c @@ -435,6 +435,48 @@ void updateDictResizePolicy(void) { /* ======================= Cron: called every 100 ms ======================== */ +/* Try to expire a few timed out keys. The algorithm used is adaptive and + * will use few CPU cycles if there are few expiring keys, otherwise + * it will get more aggressive to avoid that too much memory is used by + * keys that can be removed from the keyspace. */ +void activeExpireCycle(void) { + int j; + + for (j = 0; j < server.dbnum; j++) { + int expired; + redisDb *db = server.db+j; + + /* Continue to expire if at the end of the cycle more than 25% + * of the keys were expired. */ + do { + long num = dictSize(db->expires); + time_t now = time(NULL); + + expired = 0; + if (num > REDIS_EXPIRELOOKUPS_PER_CRON) + num = REDIS_EXPIRELOOKUPS_PER_CRON; + while (num--) { + dictEntry *de; + time_t t; + + if ((de = dictGetRandomKey(db->expires)) == NULL) break; + t = (time_t) dictGetEntryVal(de); + if (now > t) { + sds key = dictGetEntryKey(de); + robj *keyobj = createStringObject(key,sdslen(key)); + + propagateExpire(db,keyobj); + dbDelete(db,keyobj); + decrRefCount(keyobj); + expired++; + server.stat_expiredkeys++; + } + } + } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4); + } +} + + int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { int j, loops = server.cronloops++; REDIS_NOTUSED(eventLoop); @@ -533,41 +575,10 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { } } - /* Try to expire a few timed out keys. The algorithm used is adaptive and - * will use few CPU cycles if there are few expiring keys, otherwise - * it will get more aggressive to avoid that too much memory is used by - * keys that can be removed from the keyspace. */ - for (j = 0; j < server.dbnum; j++) { - int expired; - redisDb *db = server.db+j; - - /* Continue to expire if at the end of the cycle more than 25% - * of the keys were expired. */ - do { - long num = dictSize(db->expires); - time_t now = time(NULL); - - expired = 0; - if (num > REDIS_EXPIRELOOKUPS_PER_CRON) - num = REDIS_EXPIRELOOKUPS_PER_CRON; - while (num--) { - dictEntry *de; - time_t t; - - if ((de = dictGetRandomKey(db->expires)) == NULL) break; - t = (time_t) dictGetEntryVal(de); - if (now > t) { - sds key = dictGetEntryKey(de); - robj *keyobj = createStringObject(key,sdslen(key)); - - dbDelete(db,keyobj); - decrRefCount(keyobj); - expired++; - server.stat_expiredkeys++; - } - } - } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4); - } + /* Expire a few keys per cycle, only if this is a master. + * On slaves we wait for DEL operations synthesized by the master + * in order to guarantee a strict consistency. */ + if (server.masterhost == NULL) activeExpireCycle(); /* Swap a few keys on disk if we are over the memory limit and VM * is enbled. Try to free objects from the free list first. */ diff --git a/src/redis.h b/src/redis.h index fb051f8e..27520c19 100644 --- a/src/redis.h +++ b/src/redis.h @@ -752,8 +752,8 @@ void resetServerSaveParams(); /* db.c -- Keyspace access API */ int removeExpire(redisDb *db, robj *key); +void propagateExpire(redisDb *db, robj *key); int expireIfNeeded(redisDb *db, robj *key); -int deleteIfVolatile(redisDb *db, robj *key); time_t getExpire(redisDb *db, robj *key); int setExpire(redisDb *db, robj *key, time_t when); robj *lookupKey(redisDb *db, robj *key); diff --git a/src/t_string.c b/src/t_string.c index f55595c2..3b8a39bb 100644 --- a/src/t_string.c +++ b/src/t_string.c @@ -17,7 +17,6 @@ void setGenericCommand(redisClient *c, int nx, robj *key, robj *val, robj *expir } } - if (nx) deleteIfVolatile(c->db,key); retval = dbAdd(c->db,key,val); if (retval == REDIS_ERR) { if (!nx) { -- 2.45.2