From 16d778780eb865deefb2bfa024aef50926917eac Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 28 Dec 2010 18:06:40 +0100 Subject: [PATCH] a lot of code reworked/removed to implement object caching --- src/aof.c | 29 ++--------------------- src/config.c | 20 ++++++---------- src/db.c | 32 ++++++++++---------------- src/dscache.c | 60 +++++++++++++++++++++++++++++------------------- src/networking.c | 2 +- src/object.c | 28 ++++------------------ src/rdb.c | 57 ++++++--------------------------------------- src/redis.h | 23 +++++-------------- 8 files changed, 76 insertions(+), 175 deletions(-) diff --git a/src/aof.c b/src/aof.c index 959a5f52..f5d04a62 100644 --- a/src/aof.c +++ b/src/aof.c @@ -242,7 +242,6 @@ int loadAppendOnlyFile(char *filename) { char buf[128]; sds argsds; struct redisCommand *cmd; - int force_swapout; /* Serve the clients from time to time */ if (!(loops++ % 1000)) { @@ -286,17 +285,6 @@ int loadAppendOnlyFile(char *filename) { /* Clean up, ready for the next command */ for (j = 0; j < argc; j++) decrRefCount(argv[j]); zfree(argv); - - /* Handle swapping while loading big datasets when VM is on */ - force_swapout = 0; - if ((zmalloc_used_memory() - server.vm_max_memory) > 1024*1024*32) - force_swapout = 1; - - if (server.vm_enabled && force_swapout) { - while (zmalloc_used_memory() > server.vm_max_memory) { - if (vmSwapOneObjectBlocking() == REDIS_ERR) break; - } - } } /* This point can only be reached when EOF is reached without errors. @@ -359,22 +347,11 @@ int rewriteAppendOnlyFile(char *filename) { sds keystr = dictGetEntryKey(de); robj key, *o; time_t expiretime; - int swapped; keystr = dictGetEntryKey(de); o = dictGetEntryVal(de); initStaticStringObject(key,keystr); - /* If the value for this key is swapped, load a preview in memory. 
- * We use a "swapped" flag to remember if we need to free the - * value object instead to just increment the ref count anyway - * in order to avoid copy-on-write of pages if we are forked() */ - if (!server.vm_enabled || o->storage == REDIS_VM_MEMORY || - o->storage == REDIS_VM_SWAPPING) { - swapped = 0; - } else { - o = vmPreviewObject(o); - swapped = 1; - } + expiretime = getExpire(db,&key); /* Save the key and associated value */ @@ -509,7 +486,6 @@ int rewriteAppendOnlyFile(char *filename) { if (fwriteBulkObject(fp,&key) == 0) goto werr; if (fwriteBulkLongLong(fp,expiretime) == 0) goto werr; } - if (swapped) decrRefCount(o); } dictReleaseIterator(di); } @@ -553,12 +529,11 @@ int rewriteAppendOnlyFileBackground(void) { pid_t childpid; if (server.bgrewritechildpid != -1) return REDIS_ERR; - if (server.vm_enabled) waitEmptyIOJobsQueue(); + redisAssert(server.ds_enabled == 0); if ((childpid = fork()) == 0) { /* Child */ char tmpfile[256]; - if (server.vm_enabled) vmReopenSwapFile(); if (server.ipfd > 0) close(server.ipfd); if (server.sofd > 0) close(server.sofd); snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid()); diff --git a/src/config.c b/src/config.c index 79c367bf..75b1365b 100644 --- a/src/config.c +++ b/src/config.c @@ -241,21 +241,15 @@ void loadServerConfig(char *filename) { } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) { zfree(server.dbfilename); server.dbfilename = zstrdup(argv[1]); - } else if (!strcasecmp(argv[0],"vm-enabled") && argc == 2) { - if ((server.vm_enabled = yesnotoi(argv[1])) == -1) { + } else if (!strcasecmp(argv[0],"diskstore-enabled") && argc == 2) { + if ((server.ds_enabled = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; } - } else if (!strcasecmp(argv[0],"vm-swap-file") && argc == 2) { - zfree(server.vm_swap_file); - server.vm_swap_file = zstrdup(argv[1]); - } else if (!strcasecmp(argv[0],"vm-max-memory") && argc == 2) { - server.vm_max_memory = memtoll(argv[1],NULL); - } 
else if (!strcasecmp(argv[0],"vm-page-size") && argc == 2) { - server.vm_page_size = memtoll(argv[1], NULL); - } else if (!strcasecmp(argv[0],"vm-pages") && argc == 2) { - server.vm_pages = memtoll(argv[1], NULL); - } else if (!strcasecmp(argv[0],"vm-max-threads") && argc == 2) { - server.vm_max_threads = strtoll(argv[1], NULL, 10); + } else if (!strcasecmp(argv[0],"diskstore-path") && argc == 2) { + zfree(server.ds_path); + server.ds_path = zstrdup(argv[1]); + } else if (!strcasecmp(argv[0],"cache-max-memory") && argc == 2) { + server.cache_max_memory = memtoll(argv[1],NULL); } else if (!strcasecmp(argv[0],"hash-max-zipmap-entries") && argc == 2) { server.hash_max_zipmap_entries = memtoll(argv[1], NULL); } else if (!strcasecmp(argv[0],"hash-max-zipmap-value") && argc == 2) { diff --git a/src/db.c b/src/db.c index aa1c14ad..8ce5d673 100644 --- a/src/db.c +++ b/src/db.c @@ -17,29 +17,17 @@ robj *lookupKey(redisDb *db, robj *key) { if (server.bgsavechildpid == -1 && server.bgrewritechildpid == -1) val->lru = server.lruclock; - if (server.vm_enabled) { - if (val->storage == REDIS_VM_MEMORY || - val->storage == REDIS_VM_SWAPPING) - { - /* If we were swapping the object out, cancel the operation */ - if (val->storage == REDIS_VM_SWAPPING) - vmCancelThreadedIOJob(val); - } else { - int notify = (val->storage == REDIS_VM_LOADING); - - /* Our value was swapped on disk. Bring it at home. */ - redisAssert(val->type == REDIS_VMPOINTER); - val = vmLoadObject(val); - dictGetEntryVal(de) = val; - - /* Clients blocked by the VM subsystem may be waiting for - * this key... */ - if (notify) handleClientsBlockedOnSwappedKey(db,key); - } + if (server.ds_enabled && val->storage == REDIS_DS_SAVING) { + /* FIXME: change this code to just wait for our object to + * get out of the IO Job. 
 */ + waitEmptyIOJobsQueue(); + redisAssert(val->storage != REDIS_DS_SAVING); } server.stat_keyspace_hits++; return val; } else { + /* FIXME: Check if the object is on disk, if it is, load it + * in a blocking way now. */ server.stat_keyspace_misses++; return NULL; } @@ -133,7 +121,11 @@ int dbDelete(redisDb *db, robj *key) { * deleting the key will kill the I/O thread bringing the key from swap * to memory, so the client will never be notified and unblocked if we * don't do it now. */ - if (server.vm_enabled) handleClientsBlockedOnSwappedKey(db,key); + if (server.ds_enabled) handleClientsBlockedOnSwappedKey(db,key); + + /* FIXME: we need to delete the IO Job loading the key, or simply we can + * wait for it to finish. */ + /* Deleting an entry from the expires dict will not free the sds of * the key, because it is shared with the main dictionary. */ if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr); diff --git a/src/dscache.c b/src/dscache.c index b21868c3..b3dc2b40 100644 --- a/src/dscache.c +++ b/src/dscache.c @@ -26,26 +26,17 @@ * * - cron() checks if there are elements on this list. When there are things * to flush, we create an IO Job for the I/O thread. - * FIXME: how to mark this key as "busy"? With VM we used to change the - * object->storage field, but this time we need this to work with every - * kind of object, including shared ones. One possibility is just killing - * object sharing at all. So let's assume this will be our solution. - * - * So we set keys that are in the process of being saved as - * object->storage = REDIS_STORAGE_SAVING; + * NOTE: We disable object sharing when server.ds_enabled == 1 so objects + * that are referenced by an IO job for flushing on disk are marked as + * o->storage == REDIS_DS_SAVING. * * - This is what we do on key lookup: - * 1) The key already exists in memory. object->storage == REDIS_DS_MEMORY. + * 1) The key already exists in memory. 
object->storage == REDIS_DS_MEMORY + * or it is object->storage == REDIS_DS_DIRTY: * We don't do nothing special, lookup, return value object pointer. * 2) The key is in memory but object->storage == REDIS_DS_SAVING. - * This is an explicit lookup so we have to abort the saving operation. - * We kill the IO Job, set the storage to == REDIS_DB_MEMORY but - * re-queue the object in the server.ds_cache_dirty list. - * - * Btw here we need some protection against the problem of continuously - * writing against a value having the effect of this value to be never - * saved on disk. That is, at some point we need to block and write it - * if there is too much delay. + * When this happens we block waiting for the I/O thread to process + * this object. Then continue. * 3) The key is not in memory. We block to load the key from disk. * Of course the key may not be present at all on the disk store as well, * in such case we just detect this condition and continue, returning @@ -56,20 +47,43 @@ * keys a client is going to use. We block the client, load keys * using the I/O thread, unblock the client. Same code as VM more or less. * - * - Transfering keys from memory to disk. - * Again while in cron() we detect our memory limit was reached. What we - * do is transfering random keys that are not set as dirty on disk, using - * LRU to select the key. + * - Reclaiming memory. + * In cron() we detect our memory limit was reached. What we + * do is deleting keys that are REDIS_DS_MEMORY, using LRU. + * * If this is not enough to return again under the memory limits we also * start to flush keys that need to be synched on disk synchronously, - * removing it from the memory. + * removing it from the memory. We do this blocking as memory limit is a + * much "harder" barrier in the new design. * * - IO thread operations are no longer stopped for sync loading/saving of - * things. 
When a key is found to be in the process of being saved or - * loaded we simply wait for the IO thread to end its work. + * things. When a key is found to be in the process of being saved + * we simply wait for the IO thread to end its work. * * Otherwise if there is to load a key without any IO thread operation * just started it is blocking-loaded in the lookup function. + * + * - What happens when an object is destroyed? + * + * If o->storage == REDIS_DS_MEMORY then we simply destroy the object. + * If o->storage == REDIS_DS_DIRTY we can still remove the object. It had + * changes not flushed on disk, but is being removed so + * who cares. + * If o->storage == REDIS_DS_SAVING then the object is being saved so + * it is impossible that its refcount == 1, must be at + * least two. When the object is saved the storage will + * be set back to DS_MEMORY. + * + * - What happens when keys are deleted? + * + * We simply schedule a key flush operation as usual, but when the + * IO thread will be created the object pointer will be set to NULL + * so the IO thread will know that the work to do is to delete the key + * from the disk store. + * + * - What happens with MULTI/EXEC? + * + * Good question. 
*/ /* Virtual Memory is composed mainly of two subsystems: diff --git a/src/networking.c b/src/networking.c index 73a514e6..524c396c 100644 --- a/src/networking.c +++ b/src/networking.c @@ -167,7 +167,7 @@ void _addReplyStringToList(redisClient *c, char *s, size_t len) { void addReply(redisClient *c, robj *obj) { if (_installWriteEvent(c) != REDIS_OK) return; - redisAssert(!server.ds_enabled || obj->storage == REDIS_VM_MEMORY); + redisAssert(!server.ds_enabled || obj->storage == REDIS_DS_MEMORY); /* This is an important place where we can avoid copy-on-write * when there is a saving child running, avoiding touching the diff --git a/src/object.c b/src/object.c index de62f504..4374a07c 100644 --- a/src/object.c +++ b/src/object.c @@ -21,7 +21,7 @@ robj *createObject(int type, void *ptr) { /* The following is only needed if VM is active, but since the conditional * is probably more costly than initializing the field it's better to * have every field properly initialized anyway. */ - o->storage = REDIS_VM_MEMORY; + o->storage = REDIS_DS_MEMORY; return o; } @@ -160,31 +160,11 @@ void incrRefCount(robj *o) { void decrRefCount(void *obj) { robj *o = obj; - /* Object is a swapped out value, or in the process of being loaded. */ - if (server.vm_enabled && - (o->storage == REDIS_VM_SWAPPED || o->storage == REDIS_VM_LOADING)) - { - vmpointer *vp = obj; - if (o->storage == REDIS_VM_LOADING) vmCancelThreadedIOJob(o); - vmMarkPagesFree(vp->page,vp->usedpages); - server.vm_stats_swapped_objects--; - zfree(vp); - return; - } - if (o->refcount <= 0) redisPanic("decrRefCount against refcount <= 0"); - /* Object is in memory, or in the process of being swapped out. - * - * If the object is being swapped out, abort the operation on - * decrRefCount even if the refcount does not drop to 0: the object - * is referenced at least two times, as value of the key AND as - * job->val in the iojob. 
So if we don't invalidate the iojob, when it is - * done but the relevant key was removed in the meantime, the - * complete jobs handler will not find the key about the job and the - * assert will fail. */ - if (server.vm_enabled && o->storage == REDIS_VM_SWAPPING) - vmCancelThreadedIOJob(o); if (--(o->refcount) == 0) { + /* DS_SAVING objects should always have a reference in the + * IO Job structure. So we should never reach this state. */ + redisAssert(o->storage != REDIS_DS_SAVING); switch(o->type) { case REDIS_STRING: freeStringObject(o); break; case REDIS_LIST: freeListObject(o); break; diff --git a/src/rdb.c b/src/rdb.c index 5e69a324..e9ca111e 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -395,12 +395,6 @@ off_t rdbSavedObjectLen(robj *o) { return len; } -/* Return the number of pages required to save this object in the swap file */ -off_t rdbSavedObjectPages(robj *o) { - off_t bytes = rdbSavedObjectLen(o); - return (bytes+(server.vm_page_size-1))/server.vm_page_size; -} - /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */ int rdbSave(char *filename) { dictIterator *di = NULL; @@ -410,11 +404,8 @@ int rdbSave(char *filename) { int j; time_t now = time(NULL); - /* Wait for I/O therads to terminate, just in case this is a - * foreground-saving, to avoid seeking the swap file descriptor at the - * same time. */ - if (server.vm_enabled) - waitEmptyIOJobsQueue(); + /* FIXME: implement .rdb save for disk store properly */ + redisAssert(server.ds_enabled == 0); snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid()); fp = fopen(tmpfile,"w"); @@ -453,26 +444,10 @@ int rdbSave(char *filename) { if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr; if (rdbSaveTime(fp,expiretime) == -1) goto werr; } - /* Save the key and associated value. This requires special - * handling if the value is swapped out. 
*/ - if (!server.vm_enabled || o->storage == REDIS_VM_MEMORY || - o->storage == REDIS_VM_SWAPPING) { - /* Save type, key, value */ - if (rdbSaveType(fp,o->type) == -1) goto werr; - if (rdbSaveStringObject(fp,&key) == -1) goto werr; - if (rdbSaveObject(fp,o) == -1) goto werr; - } else { - /* REDIS_VM_SWAPPED or REDIS_VM_LOADING */ - robj *po; - /* Get a preview of the object in memory */ - po = vmPreviewObject(o); - /* Save type, key, value */ - if (rdbSaveType(fp,po->type) == -1) goto werr; - if (rdbSaveStringObject(fp,&key) == -1) goto werr; - if (rdbSaveObject(fp,po) == -1) goto werr; - /* Remove the loaded object from memory */ - decrRefCount(po); - } + /* Save type, key, value */ + if (rdbSaveType(fp,o->type) == -1) goto werr; + if (rdbSaveStringObject(fp,&key) == -1) goto werr; + if (rdbSaveObject(fp,o) == -1) goto werr; } dictReleaseIterator(di); } @@ -508,11 +483,10 @@ int rdbSaveBackground(char *filename) { pid_t childpid; if (server.bgsavechildpid != -1) return REDIS_ERR; - if (server.vm_enabled) waitEmptyIOJobsQueue(); + redisAssert(server.ds_enabled == 0); server.dirty_before_bgsave = server.dirty; if ((childpid = fork()) == 0) { /* Child */ - if (server.vm_enabled) vmReopenSwapFile(); if (server.ipfd > 0) close(server.ipfd); if (server.sofd > 0) close(server.sofd); if (rdbSave(filename) == REDIS_OK) { @@ -899,8 +873,6 @@ int rdbLoad(char *filename) { startLoading(fp); while(1) { robj *key, *val; - int force_swapout; - expiretime = -1; /* Serve the clients from time to time */ @@ -970,21 +942,6 @@ int rdbLoad(char *filename) { continue; } decrRefCount(key); - - /* Flush data on disk once 32 MB of additional RAM are used... */ - force_swapout = 0; - if ((zmalloc_used_memory() - server.vm_max_memory) > 1024*1024*32) - force_swapout = 1; - - /* If we have still some hope of having some value fitting memory - * then we try random sampling. 
*/ - if (!swap_all_values && server.vm_enabled && force_swapout) { - while (zmalloc_used_memory() > server.vm_max_memory) { - if (vmSwapOneObjectBlocking() == REDIS_ERR) break; - } - if (zmalloc_used_memory() > server.vm_max_memory) - swap_all_values = 1; /* We are already using too much mem */ - } } fclose(fp); stopLoading(); diff --git a/src/redis.h b/src/redis.h index b756faef..2b5dcdd5 100644 --- a/src/redis.h +++ b/src/redis.h @@ -119,22 +119,11 @@ #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */ #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */ -/* Virtual memory object->where field. */ -#define REDIS_VM_MEMORY 0 /* The object is on memory */ -#define REDIS_VM_SWAPPED 1 /* The object is on disk */ -#define REDIS_VM_SWAPPING 2 /* Redis is swapping this object on disk */ -#define REDIS_VM_LOADING 3 /* Redis is loading this object from disk */ - -/* Virtual memory static configuration stuff. - * Check vmFindContiguousPages() to know more about this magic numbers. */ -#define REDIS_VM_MAX_NEAR_PAGES 65536 -#define REDIS_VM_MAX_RANDOM_JUMP 4096 -#define REDIS_VM_MAX_THREADS 32 -#define REDIS_THREAD_STACK_SIZE (1024*1024*4) -/* The following is the *percentage* of completed I/O jobs to process when the - * handelr is called. While Virtual Memory I/O operations are performed by - * threads, this operations must be processed by the main thread when completed - * in order to take effect. */ +/* Disk store cache object->storage values */ +#define REDIS_DS_MEMORY 0 /* The object is on memory */ +#define REDIS_DS_DIRTY 1 /* The object was modified */ +#define REDIS_DS_SAVING 2 /* There is an IO Job created for this obj. */ + #define REDIS_MAX_COMPLETED_JOBS_PROCESSED 1 /* Client flags */ @@ -271,7 +260,7 @@ typedef struct vmPointer { _var.type = REDIS_STRING; \ _var.encoding = REDIS_ENCODING_RAW; \ _var.ptr = _ptr; \ - _var.storage = REDIS_VM_MEMORY; \ + _var.storage = REDIS_DS_MEMORY; \ } while(0); typedef struct redisDb { -- 2.45.2