From 4ab988238f7418d018bf4412c6c956845ffbeab9 Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 30 Dec 2010 16:41:36 +0100 Subject: [PATCH] more work done for diskstore without trying to compile, more work needed to build again. --- src/config.c | 4 ++-- src/db.c | 3 +++ src/diskstore.c | 55 ++++++++++++++++++++++++++++++++++++++++++++++++- src/dscache.c | 20 ++++++++++++++---- src/rdb.c | 40 ++++++++++++++++++++++------------- src/redis.c | 2 +- src/redis.h | 2 ++ 7 files changed, 104 insertions(+), 22 deletions(-) diff --git a/src/config.c b/src/config.c index 75b1365b..a4060631 100644 --- a/src/config.c +++ b/src/config.c @@ -246,8 +246,8 @@ void loadServerConfig(char *filename) { err = "argument must be 'yes' or 'no'"; goto loaderr; } } else if (!strcasecmp(argv[0],"diskstore-path") && argc == 2) { - zfree(server.ds_path); - server.ds_path = zstrdup(argv[1]); + sdsfree(server.ds_path); + server.ds_path = sdsnew(argv[1]); } else if (!strcasecmp(argv[0],"cache-max-memory") && argc == 2) { server.cache_max_memory = memtoll(argv[1],NULL); } else if (!strcasecmp(argv[0],"hash-max-zipmap-entries") && argc == 2) { diff --git a/src/db.c b/src/db.c index f0701c2a..cf99bbb2 100644 --- a/src/db.c +++ b/src/db.c @@ -66,6 +66,9 @@ int dbAdd(redisDb *db, robj *key, robj *val) { } else { sds copy = sdsdup(key->ptr); dictAdd(db->dict, copy, val); + if (server.ds_enabled) { + /* FIXME: remove entry from negative cache */ + } return REDIS_OK; } } diff --git a/src/diskstore.c b/src/diskstore.c index ae23b8ed..08c747a7 100644 --- a/src/diskstore.c +++ b/src/diskstore.c @@ -70,6 +70,7 @@ */ #include "redis.h" +#include "sha1.h" #include #include @@ -138,10 +139,62 @@ int dsClose(void) { return REDIS_OK; } +/* Convert key into full path for this object. Dirty but hopefully + * is fast enough. */ +void dsKeyToPath(redisDb *db, unsigned char *buf, robj *key) { + SHA1_CTX ctx; + unsigned char hash[20]; + char *hex, digits[] = "0123456789abcdef"; + int j, l; + + SHA1Init(&ctx); + SHA1Update(&ctx,key->ptr,sdslen(key->ptr)); + SHA1Final(hash,&ctx); + + /* Convert the hash into hex format */ + for (j = 0; j < 20; j++) { + hex[j*2] = digits[(hash[j]&0xF0)>>4]; + hex[(j*2)+1] = digits[hash[j]&0x0F]; + } + + /* Create the object path. Start with server.ds_path that's the root dir */ + l = sdslen(server.ds_path); + memcpy(buf,server.ds_path,l); + buf += l; + *buf++ = '/'; + + /* Then add xx/yy/ that is the two level directories */ + buf[0] = hex[0]; + buf[1] = hex[1]; + buf[2] = '/'; + buf[3] = hex[2]; + buf[4] = hex[3]; + buf[5] = '/'; + buf += 6; + + /* Add the database number followed by _ and finall the SHA1 hex */ + l = ll2string(buf,64,db->id); + buf += l; + buf[0] = '_'; + memcpy(buf+1,hex,40); + buf[41] = '\0'; +} + int dsSet(redisDb *db, robj *key, robj *val) { + char buf[1024]; + FILE *fp; + int retval; + + dsKeyToPath(buf,key); + fp = fopen(buf,"w"); + if ((retval = rdbSaveKeyValuePair(fp,db,key,val,time(NULL))) == -1) + return REDIS_ERR; + fclose(fp); + if (retval == 0) unlink(buf); /* Expired key */ + return REDIS_OK; } -robj *dsGet(redisDb *db, robj *key) { +robj *dsGet(redisDb *db, robj *key, time_t *expire) { return createStringObject("foo",3); } diff --git a/src/dscache.c b/src/dscache.c index d24ec77c..f1ffe491 100644 --- a/src/dscache.c +++ b/src/dscache.c @@ -84,6 +84,9 @@ * - What happens with MULTI/EXEC? * * Good question. + * + * - If dsSet() fails on the write thread log the error and reschedule the + * key for flush. */ /* Virtual Memory is composed mainly of two subsystems: @@ -285,8 +288,15 @@ void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, (unsigned char*)j->key->ptr); if (j->type == REDIS_IOJOB_LOAD) { /* Create the key-value pair in the in-memory database */ - dbAdd(j->db,j->key,j->val); - incrRefCount(j->val); + if (j->val != NULL) { + dbAdd(j->db,j->key,j->val); + incrRefCount(j->val); + setExpire(j->db,j->key,j->expire); + } else { + /* The key does not exist. Create a negative cache entry + * for this key. */ + /* FIXME: add this entry into the negative cache */ + } /* Handle clients waiting for this key to be loaded. */ handleClientsBlockedOnSwappedKey(j->db,j->key); freeIOJob(j); @@ -342,8 +352,10 @@ void *IOThreadEntryPoint(void *arg) { /* Process the Job */ if (j->type == REDIS_IOJOB_LOAD) { - j->val = dsGet(j->db,j->key); - redisAssert(j->val != NULL); + time_t expire; + + j->val = dsGet(j->db,j->key,&expire); + if (j->val) j->expire = expire; } else if (j->type == REDIS_IOJOB_SAVE) { redisAssert(j->val->storage == REDIS_DS_SAVING); if (j->val) diff --git a/src/rdb.c b/src/rdb.c index acec829a..9129056d 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -395,6 +395,31 @@ off_t rdbSavedObjectLen(robj *o) { return len; } +/* Save a key-value pair, with expire time, type, key, value. + * On error -1 is returned. + * On success if the key was actaully saved 1 is returned, otherwise 0 + * is returned (the key was already expired). */ +int rdbSaveKeyValuePair(FILE *fp, redisDb *db, robj *key, robj *val, + time_t now) +{ + time_t expiretime; + + expiretime = getExpire(db,&key); + + /* Save the expire time */ + if (expiretime != -1) { + /* If this key is already expired skip it */ + if (expiretime < now) return 0; + if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) return -1; + if (rdbSaveTime(fp,expiretime) == -1) return -1; + } + /* Save type, key, value */ + if (rdbSaveType(fp,val->type) == -1) return -1; + if (rdbSaveStringObject(fp,&key) == -1) return -1; + if (rdbSaveObject(fp,val) == -1) return -1; + return 1; +} + /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */ int rdbSave(char *filename) { dictIterator *di = NULL; @@ -432,22 +457,9 @@ int rdbSave(char *filename) { while((de = dictNext(di)) != NULL) { sds keystr = dictGetEntryKey(de); robj key, *o = dictGetEntryVal(de); - time_t expiretime; initStaticStringObject(key,keystr); - expiretime = getExpire(db,&key); - - /* Save the expire time */ - if (expiretime != -1) { - /* If this key is already expired skip it */ - if (expiretime < now) continue; - if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr; - if (rdbSaveTime(fp,expiretime) == -1) goto werr; - } - /* Save type, key, value */ - if (rdbSaveType(fp,o->type) == -1) goto werr; - if (rdbSaveStringObject(fp,&key) == -1) goto werr; - if (rdbSaveObject(fp,o) == -1) goto werr; + if (rdbSaveKeyValuePair(fp,db,key,o,now) == -1) goto werr; } dictReleaseIterator(di); } diff --git a/src/redis.c b/src/redis.c index 36b7c853..13fe0f3c 100644 --- a/src/redis.c +++ b/src/redis.c @@ -769,7 +769,7 @@ void initServerConfig() { server.maxmemory_policy = REDIS_MAXMEMORY_VOLATILE_LRU; server.maxmemory_samples = 3; server.ds_enabled = 0; - server.ds_path = zstrdup("/tmp/redis.ds"); + server.ds_path = sdsnew("/tmp/redis.ds"); server.cache_max_memory = 64LL*1024*1024; /* 64 MB of RAM */ server.cache_blocked_clients = 0; server.hash_max_zipmap_entries = REDIS_HASH_MAX_ZIPMAP_ENTRIES; diff --git a/src/redis.h b/src/redis.h index d9a4b912..183b06b0 100644 --- a/src/redis.h +++ b/src/redis.h @@ -554,6 +554,7 @@ typedef struct iojob { robj *key; /* This I/O request is about this key */ robj *val; /* the value to swap for REDIS_IOJOB_SAVE, otherwise this * field is populated by the I/O thread for REDIS_IOJOB_LOAD. */ + time_t expire; /* Expire time for this key on REDIS_IOJOB_LOAD */ } iojob; /* When diskstore is enabled and a flush operation is requested we push @@ -746,6 +747,7 @@ off_t rdbSavedObjectLen(robj *o); off_t rdbSavedObjectPages(robj *o); robj *rdbLoadObject(int type, FILE *fp); void backgroundSaveDoneHandler(int statloc); +int rdbSaveKeyValuePair(FILE *fp, redisDb *db, robj *key, robj *val, time_t now); /* AOF persistence */ void flushAppendOnlyFile(void); -- 2.47.2