]> git.saurik.com Git - redis.git/commitdiff
a lot of code reworked/removed to implement object caching
authorantirez <antirez@gmail.com>
Tue, 28 Dec 2010 17:06:40 +0000 (18:06 +0100)
committerantirez <antirez@gmail.com>
Tue, 28 Dec 2010 17:06:40 +0000 (18:06 +0100)
src/aof.c
src/config.c
src/db.c
src/dscache.c
src/networking.c
src/object.c
src/rdb.c
src/redis.h

index 959a5f52af6a6a4baa1f3def8165d08758b72c2c..f5d04a62b4d493d1cbb82b5b7f2b98a55adf0a10 100644 (file)
--- a/src/aof.c
+++ b/src/aof.c
@@ -242,7 +242,6 @@ int loadAppendOnlyFile(char *filename) {
         char buf[128];
         sds argsds;
         struct redisCommand *cmd;
-        int force_swapout;
 
         /* Serve the clients from time to time */
         if (!(loops++ % 1000)) {
@@ -286,17 +285,6 @@ int loadAppendOnlyFile(char *filename) {
         /* Clean up, ready for the next command */
         for (j = 0; j < argc; j++) decrRefCount(argv[j]);
         zfree(argv);
-
-        /* Handle swapping while loading big datasets when VM is on */
-        force_swapout = 0;
-        if ((zmalloc_used_memory() - server.vm_max_memory) > 1024*1024*32)
-            force_swapout = 1;
-
-        if (server.vm_enabled && force_swapout) {
-            while (zmalloc_used_memory() > server.vm_max_memory) {
-                if (vmSwapOneObjectBlocking() == REDIS_ERR) break;
-            }
-        }
     }
 
     /* This point can only be reached when EOF is reached without errors.
@@ -359,22 +347,11 @@ int rewriteAppendOnlyFile(char *filename) {
             sds keystr = dictGetEntryKey(de);
             robj key, *o;
             time_t expiretime;
-            int swapped;
 
             keystr = dictGetEntryKey(de);
             o = dictGetEntryVal(de);
             initStaticStringObject(key,keystr);
-            /* If the value for this key is swapped, load a preview in memory.
-             * We use a "swapped" flag to remember if we need to free the
-             * value object instead to just increment the ref count anyway
-             * in order to avoid copy-on-write of pages if we are forked() */
-            if (!server.vm_enabled || o->storage == REDIS_VM_MEMORY ||
-                o->storage == REDIS_VM_SWAPPING) {
-                swapped = 0;
-            } else {
-                o = vmPreviewObject(o);
-                swapped = 1;
-            }
+
             expiretime = getExpire(db,&key);
 
             /* Save the key and associated value */
@@ -509,7 +486,6 @@ int rewriteAppendOnlyFile(char *filename) {
                 if (fwriteBulkObject(fp,&key) == 0) goto werr;
                 if (fwriteBulkLongLong(fp,expiretime) == 0) goto werr;
             }
-            if (swapped) decrRefCount(o);
         }
         dictReleaseIterator(di);
     }
@@ -553,12 +529,11 @@ int rewriteAppendOnlyFileBackground(void) {
     pid_t childpid;
 
     if (server.bgrewritechildpid != -1) return REDIS_ERR;
-    if (server.vm_enabled) waitEmptyIOJobsQueue();
+    redisAssert(server.ds_enabled == 0);
     if ((childpid = fork()) == 0) {
         /* Child */
         char tmpfile[256];
 
-        if (server.vm_enabled) vmReopenSwapFile();
         if (server.ipfd > 0) close(server.ipfd);
         if (server.sofd > 0) close(server.sofd);
         snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
index 79c367bf7e68c027bac4b986655c6a577a78ba67..75b1365b5125e92a83720b065a2e2ff8fec81596 100644 (file)
@@ -241,21 +241,15 @@ void loadServerConfig(char *filename) {
         } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
             zfree(server.dbfilename);
             server.dbfilename = zstrdup(argv[1]);
-        } else if (!strcasecmp(argv[0],"vm-enabled") && argc == 2) {
-            if ((server.vm_enabled = yesnotoi(argv[1])) == -1) {
+        } else if (!strcasecmp(argv[0],"diskstore-enabled") && argc == 2) {
+            if ((server.ds_enabled = yesnotoi(argv[1])) == -1) {
                 err = "argument must be 'yes' or 'no'"; goto loaderr;
             }
-        } else if (!strcasecmp(argv[0],"vm-swap-file") && argc == 2) {
-            zfree(server.vm_swap_file);
-            server.vm_swap_file = zstrdup(argv[1]);
-        } else if (!strcasecmp(argv[0],"vm-max-memory") && argc == 2) {
-            server.vm_max_memory = memtoll(argv[1],NULL);
-        } else if (!strcasecmp(argv[0],"vm-page-size") && argc == 2) {
-            server.vm_page_size = memtoll(argv[1], NULL);
-        } else if (!strcasecmp(argv[0],"vm-pages") && argc == 2) {
-            server.vm_pages = memtoll(argv[1], NULL);
-        } else if (!strcasecmp(argv[0],"vm-max-threads") && argc == 2) {
-            server.vm_max_threads = strtoll(argv[1], NULL, 10);
+        } else if (!strcasecmp(argv[0],"diskstore-path") && argc == 2) {
+            zfree(server.ds_path);
+            server.ds_path = zstrdup(argv[1]);
+        } else if (!strcasecmp(argv[0],"cache-max-memory") && argc == 2) {
+            server.cache_max_memory = memtoll(argv[1],NULL);
         } else if (!strcasecmp(argv[0],"hash-max-zipmap-entries") && argc == 2) {
             server.hash_max_zipmap_entries = memtoll(argv[1], NULL);
         } else if (!strcasecmp(argv[0],"hash-max-zipmap-value") && argc == 2) {
index aa1c14ad451af1c95f63063e87dc56ba54ddaecd..8ce5d67334ff423c4003c9b585cf964878ce94de 100644 (file)
--- a/src/db.c
+++ b/src/db.c
@@ -17,29 +17,17 @@ robj *lookupKey(redisDb *db, robj *key) {
         if (server.bgsavechildpid == -1 && server.bgrewritechildpid == -1)
             val->lru = server.lruclock;
 
-        if (server.vm_enabled) {
-            if (val->storage == REDIS_VM_MEMORY ||
-                val->storage == REDIS_VM_SWAPPING)
-            {
-                /* If we were swapping the object out, cancel the operation */
-                if (val->storage == REDIS_VM_SWAPPING)
-                    vmCancelThreadedIOJob(val);
-            } else {
-                int notify = (val->storage == REDIS_VM_LOADING);
-
-                /* Our value was swapped on disk. Bring it at home. */
-                redisAssert(val->type == REDIS_VMPOINTER);
-                val = vmLoadObject(val);
-                dictGetEntryVal(de) = val;
-
-                /* Clients blocked by the VM subsystem may be waiting for
-                 * this key... */
-                if (notify) handleClientsBlockedOnSwappedKey(db,key);
-            }
+        if (server.ds_enabled && val->storage == REDIS_DS_SAVING) {
+            /* FIXME: change this code to just wait for our object to
+             * get out of the IO Job. */
+            waitEmptyIOJobsQueue();
+            redisAssert(val->storage != REDIS_DS_SAVING);
         }
         server.stat_keyspace_hits++;
         return val;
     } else {
+        /* FIXME: Check if the object is on disk, if it is, load it
+         * in a blocking way now. */
         server.stat_keyspace_misses++;
         return NULL;
     }
@@ -133,7 +121,11 @@ int dbDelete(redisDb *db, robj *key) {
      * deleting the key will kill the I/O thread bringing the key from swap
      * to memory, so the client will never be notified and unblocked if we
      * don't do it now. */
-    if (server.vm_enabled) handleClientsBlockedOnSwappedKey(db,key);
+    if (server.ds_enabled) handleClientsBlockedOnSwappedKey(db,key);
+
+    /* FIXME: we need to delete the IO Job loading the key, or simply we can
+     * wait for it to finish. */
+
     /* Deleting an entry from the expires dict will not free the sds of
      * the key, because it is shared with the main dictionary. */
     if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
index b21868c3efed52e929ab3048c3c1c676c08872cd..b3dc2b40eccd5478d6ff8bfe17ad0ab11803b2a0 100644 (file)
  *
  * - cron() checks if there are elements on this list. When there are things
  *   to flush, we create an IO Job for the I/O thread.
- *   FIXME: how to mark this key as "busy"? With VM we used to change the
- *   object->storage field, but this time we need this to work with every
- *   kind of object, including shared ones. One possibility is just killing
- *   object sharing at all. So let's assume this will be our solution.
- *
- *   So we set keys that are in the process of being saved as
- *   object->storage = REDIS_STORAGE_SAVING;
+ *   NOTE: We disalbe object sharing when server.ds_enabled == 1 so objects
+ *   that are referenced an IO job for flushing on disk are marked as
+ *   o->storage == REDIS_DS_SAVING.
  *
  * - This is what we do on key lookup:
- *   1) The key already exists in memory. object->storage == REDIS_DS_MEMORY.
+ *   1) The key already exists in memory. object->storage == REDIS_DS_MEMORY
+ *      or it is object->storage == REDIS_DS_DIRTY:
  *      We don't do nothing special, lookup, return value object pointer.
  *   2) The key is in memory but object->storage == REDIS_DS_SAVING.
- *      This is an explicit lookup so we have to abort the saving operation.
- *      We kill the IO Job, set the storage to == REDIS_DB_MEMORY but
- *      re-queue the object in the server.ds_cache_dirty list.
- *
- *      Btw here we need some protection against the problem of continuously
- *      writing against a value having the effect of this value to be never
- *      saved on disk. That is, at some point we need to block and write it
- *      if there is too much delay.
+ *      When this happens we block waiting for the I/O thread to process
+ *      this object. Then continue.
  *   3) The key is not in memory. We block to load the key from disk.
  *      Of course the key may not be present at all on the disk store as well,
  *      in such case we just detect this condition and continue, returning
  *      keys a client is going to use. We block the client, load keys
  *      using the I/O thread, unblock the client. Same code as VM more or less.
  *
- * - Transfering keys from memory to disk.
- *   Again while in cron() we detect our memory limit was reached. What we
- *   do is transfering random keys that are not set as dirty on disk, using
- *   LRU to select the key.
+ * - Reclaiming memory.
+ *   In cron() we detect our memory limit was reached. What we
+ *   do is deleting keys that are REDIS_DS_MEMORY, using LRU.
+ *
  *   If this is not enough to return again under the memory limits we also
  *   start to flush keys that need to be synched on disk synchronously,
- *   removing it from the memory.
+ *   removing it from the memory. We do this blocking as memory limit is a
+ *   much "harder" barrirer in the new design.
  *
  * - IO thread operations are no longer stopped for sync loading/saving of
- *   things. When a key is found to be in the process of being saved or
- *   loaded we simply wait for the IO thread to end its work.
+ *   things. When a key is found to be in the process of being saved
+ *   we simply wait for the IO thread to end its work.
  *
  *   Otherwise if there is to load a key without any IO thread operation
  *   just started it is blocking-loaded in the lookup function.
+ *
+ * - What happens when an object is destroyed?
+ *
+ *   If o->storage == REDIS_DS_MEMORY then we simply destory the object.
+ *   If o->storage == REDIS_DS_DIRTY we can still remove the object. It had
+ *                    changes not flushed on disk, but is being removed so
+ *                    who cares.
+ *   if o->storage == REDIS_DS_SAVING then the object is being saved so
+ *                    it is impossible that its refcount == 1, must be at
+ *                    least two. When the object is saved the storage will
+ *                    be set back to DS_MEMORY.
+ *
+ * - What happens when keys are deleted?
+ *
+ *   We simply schedule a key flush operation as usually, but when the
+ *   IO thread will be created the object pointer will be set to NULL
+ *   so the IO thread will know that the work to do is to delete the key
+ *   from the disk store.
+ *
+ * - What happens with MULTI/EXEC?
+ *
+ *   Good question.
  */
 
 /* Virtual Memory is composed mainly of two subsystems:
index 73a514e6fc8ac6977e3caa5d7b78507232e2240a..524c396c38b97529c45c014223cd57fe5d6a740f 100644 (file)
@@ -167,7 +167,7 @@ void _addReplyStringToList(redisClient *c, char *s, size_t len) {
 
 void addReply(redisClient *c, robj *obj) {
     if (_installWriteEvent(c) != REDIS_OK) return;
-    redisAssert(!server.ds_enabled || obj->storage == REDIS_VM_MEMORY);
+    redisAssert(!server.ds_enabled || obj->storage == REDIS_DS_MEMORY);
 
     /* This is an important place where we can avoid copy-on-write
      * when there is a saving child running, avoiding touching the
index de62f504cc69ff9696dcf4b7b2b70a438d8a7a35..4374a07c1562b5f4a186b8fd2d7cc94870896f73 100644 (file)
@@ -21,7 +21,7 @@ robj *createObject(int type, void *ptr) {
     /* The following is only needed if VM is active, but since the conditional
      * is probably more costly than initializing the field it's better to
      * have every field properly initialized anyway. */
-    o->storage = REDIS_VM_MEMORY;
+    o->storage = REDIS_DS_MEMORY;
     return o;
 }
 
@@ -160,31 +160,11 @@ void incrRefCount(robj *o) {
 void decrRefCount(void *obj) {
     robj *o = obj;
 
-    /* Object is a swapped out value, or in the process of being loaded. */
-    if (server.vm_enabled &&
-        (o->storage == REDIS_VM_SWAPPED || o->storage == REDIS_VM_LOADING))
-    {
-        vmpointer *vp = obj;
-        if (o->storage == REDIS_VM_LOADING) vmCancelThreadedIOJob(o);
-        vmMarkPagesFree(vp->page,vp->usedpages);
-        server.vm_stats_swapped_objects--;
-        zfree(vp);
-        return;
-    }
-
     if (o->refcount <= 0) redisPanic("decrRefCount against refcount <= 0");
-    /* Object is in memory, or in the process of being swapped out.
-     *
-     * If the object is being swapped out, abort the operation on
-     * decrRefCount even if the refcount does not drop to 0: the object
-     * is referenced at least two times, as value of the key AND as
-     * job->val in the iojob. So if we don't invalidate the iojob, when it is
-     * done but the relevant key was removed in the meantime, the
-     * complete jobs handler will not find the key about the job and the
-     * assert will fail. */
-    if (server.vm_enabled && o->storage == REDIS_VM_SWAPPING)
-        vmCancelThreadedIOJob(o);
     if (--(o->refcount) == 0) {
+        /* DS_SAVING objects should always have a reference in the
+         * IO Job structure. So we should never reach this state. */
+        redisAssert(o->storage != REDIS_DS_SAVING);
         switch(o->type) {
         case REDIS_STRING: freeStringObject(o); break;
         case REDIS_LIST: freeListObject(o); break;
index 5e69a32443a31f673b9a825dc80492a739fbc15b..e9ca111e5cd3a92f75c73c32166b64dbb8949d4d 100644 (file)
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -395,12 +395,6 @@ off_t rdbSavedObjectLen(robj *o) {
     return len;
 }
 
-/* Return the number of pages required to save this object in the swap file */
-off_t rdbSavedObjectPages(robj *o) {
-    off_t bytes = rdbSavedObjectLen(o);
-    return (bytes+(server.vm_page_size-1))/server.vm_page_size;
-}
-
 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
 int rdbSave(char *filename) {
     dictIterator *di = NULL;
@@ -410,11 +404,8 @@ int rdbSave(char *filename) {
     int j;
     time_t now = time(NULL);
 
-    /* Wait for I/O therads to terminate, just in case this is a
-     * foreground-saving, to avoid seeking the swap file descriptor at the
-     * same time. */
-    if (server.vm_enabled)
-        waitEmptyIOJobsQueue();
+    /* FIXME: implement .rdb save for disk store properly */
+    redisAssert(server.ds_enabled == 0);
 
     snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
     fp = fopen(tmpfile,"w");
@@ -453,26 +444,10 @@ int rdbSave(char *filename) {
                 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
                 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
             }
-            /* Save the key and associated value. This requires special
-             * handling if the value is swapped out. */
-            if (!server.vm_enabled || o->storage == REDIS_VM_MEMORY ||
-                                      o->storage == REDIS_VM_SWAPPING) {
-                /* Save type, key, value */
-                if (rdbSaveType(fp,o->type) == -1) goto werr;
-                if (rdbSaveStringObject(fp,&key) == -1) goto werr;
-                if (rdbSaveObject(fp,o) == -1) goto werr;
-            } else {
-                /* REDIS_VM_SWAPPED or REDIS_VM_LOADING */
-                robj *po;
-                /* Get a preview of the object in memory */
-                po = vmPreviewObject(o);
-                /* Save type, key, value */
-                if (rdbSaveType(fp,po->type) == -1) goto werr;
-                if (rdbSaveStringObject(fp,&key) == -1) goto werr;
-                if (rdbSaveObject(fp,po) == -1) goto werr;
-                /* Remove the loaded object from memory */
-                decrRefCount(po);
-            }
+            /* Save type, key, value */
+            if (rdbSaveType(fp,o->type) == -1) goto werr;
+            if (rdbSaveStringObject(fp,&key) == -1) goto werr;
+            if (rdbSaveObject(fp,o) == -1) goto werr;
         }
         dictReleaseIterator(di);
     }
@@ -508,11 +483,10 @@ int rdbSaveBackground(char *filename) {
     pid_t childpid;
 
     if (server.bgsavechildpid != -1) return REDIS_ERR;
-    if (server.vm_enabled) waitEmptyIOJobsQueue();
+    redisAssert(server.ds_enabled == 0);
     server.dirty_before_bgsave = server.dirty;
     if ((childpid = fork()) == 0) {
         /* Child */
-        if (server.vm_enabled) vmReopenSwapFile();
         if (server.ipfd > 0) close(server.ipfd);
         if (server.sofd > 0) close(server.sofd);
         if (rdbSave(filename) == REDIS_OK) {
@@ -899,8 +873,6 @@ int rdbLoad(char *filename) {
     startLoading(fp);
     while(1) {
         robj *key, *val;
-        int force_swapout;
-
         expiretime = -1;
 
         /* Serve the clients from time to time */
@@ -970,21 +942,6 @@ int rdbLoad(char *filename) {
             continue;
         }
         decrRefCount(key);
-
-        /* Flush data on disk once 32 MB of additional RAM are used... */
-        force_swapout = 0;
-        if ((zmalloc_used_memory() - server.vm_max_memory) > 1024*1024*32)
-            force_swapout = 1;
-
-        /* If we have still some hope of having some value fitting memory
-         * then we try random sampling. */
-        if (!swap_all_values && server.vm_enabled && force_swapout) {
-            while (zmalloc_used_memory() > server.vm_max_memory) {
-                if (vmSwapOneObjectBlocking() == REDIS_ERR) break;
-            }
-            if (zmalloc_used_memory() > server.vm_max_memory)
-                swap_all_values = 1; /* We are already using too much mem */
-        }
     }
     fclose(fp);
     stopLoading();
index b756faef1c4f264d2bc3a48f8395111f2ce85bd0..2b5dcdd5969329dbf1988cc2d1569c0d04f084d4 100644 (file)
 #define REDIS_RDB_ENC_INT32 2       /* 32 bit signed integer */
 #define REDIS_RDB_ENC_LZF 3         /* string compressed with FASTLZ */
 
-/* Virtual memory object->where field. */
-#define REDIS_VM_MEMORY 0       /* The object is on memory */
-#define REDIS_VM_SWAPPED 1      /* The object is on disk */
-#define REDIS_VM_SWAPPING 2     /* Redis is swapping this object on disk */
-#define REDIS_VM_LOADING 3      /* Redis is loading this object from disk */
-
-/* Virtual memory static configuration stuff.
- * Check vmFindContiguousPages() to know more about this magic numbers. */
-#define REDIS_VM_MAX_NEAR_PAGES 65536
-#define REDIS_VM_MAX_RANDOM_JUMP 4096
-#define REDIS_VM_MAX_THREADS 32
-#define REDIS_THREAD_STACK_SIZE (1024*1024*4)
-/* The following is the *percentage* of completed I/O jobs to process when the
- * handelr is called. While Virtual Memory I/O operations are performed by
- * threads, this operations must be processed by the main thread when completed
- * in order to take effect. */
+/* Disk store cache object->storage values */
+#define REDIS_DS_MEMORY 0       /* The object is on memory */
+#define REDIS_DS_DIRTY 1        /* The object was modified */
+#define REDIS_DS_SAVING 2       /* There is an IO Job created for this obj. */
+
 #define REDIS_MAX_COMPLETED_JOBS_PROCESSED 1
 
 /* Client flags */
@@ -271,7 +260,7 @@ typedef struct vmPointer {
     _var.type = REDIS_STRING; \
     _var.encoding = REDIS_ENCODING_RAW; \
     _var.ptr = _ptr; \
-    _var.storage = REDIS_VM_MEMORY; \
+    _var.storage = REDIS_DS_MEMORY; \
 } while(0);
 
 typedef struct redisDb {