]> git.saurik.com Git - redis.git/commitdiff
diskstore more fixes
authorantirez <antirez@gmail.com>
Mon, 3 Jan 2011 16:18:37 +0000 (17:18 +0100)
committerantirez <antirez@gmail.com>
Mon, 3 Jan 2011 16:18:37 +0000 (17:18 +0100)
src/db.c
src/dscache.c
tests/assets/default.conf
tests/support/server.tcl
tests/test_helper.tcl

index ae40d2044fcd444ab077707c0c38fc7a39ebd9d9..7d2bf4af49be33d66e6ba1faac182bb6bc531b46 100644 (file)
--- a/src/db.c
+++ b/src/db.c
@@ -6,6 +6,35 @@
  * C-level DB API
  *----------------------------------------------------------------------------*/
 
+/* Important notes on lookup and disk store.
+ *
+ * When disk store is enabled on lookup we can have different cases.
+ *
+ * a) The key is in memory:
+ *    - If the key is not in IO_SAVEINPROG state we can access it.
+ *      As if it's just IO_SAVE this means we have the key in the IO queue
+ *      but can't be accessed by the IO thread (it requires to be
+ *      translated into an IO Job by the cache cron function.)
+ *    - If the key is in IO_SAVEINPROG we can't touch the key and have
+ *      to blocking wait completion of operations.
+ * b) The key is not in memory:
+ *    - If it's marked as non existing on disk as well (negative cache)
+ *      we don't need to perform the disk access.
+ *    - if the key MAY EXIST, but is not in memory, and it is marked as IO_SAVE
+ *      then the key can only be a deleted one. As IO_SAVE keys are never
+ *      evicted (dirty state), so the only possibility is that key was deleted.
+ *    - if the key MAY EXIST we need to blocking load it.
+ *      We check that the key is not in IO_SAVEINPROG state before accessing
+ *      the disk object. If it is in this state, we wait.
+ */
+
+void lookupWaitBusyKey(redisDb *db, robj *key) {
+    /* FIXME: wait just for this key, not everything */
+    waitEmptyIOJobsQueue();
+    processAllPendingIOJobs();
+    redisAssert((cacheScheduleIOGetFlags(db,key) & REDIS_IO_SAVEINPROG) == 0);
+}
+
 robj *lookupKey(redisDb *db, robj *key) {
     dictEntry *de = dictFind(db->dict,key->ptr);
     if (de) {
@@ -20,11 +49,9 @@ robj *lookupKey(redisDb *db, robj *key) {
         if (server.ds_enabled &&
             cacheScheduleIOGetFlags(db,key) & REDIS_IO_SAVEINPROG)
         {
-            /* There is a save in progress for this object!
-             * Wait for it to get out. */
-            waitEmptyIOJobsQueue();
-            processAllPendingIOJobs();
-            redisAssert(!(cacheScheduleIOGetFlags(db,key) & REDIS_IO_SAVEINPROG));
+            /* Need to wait for the key to get unbusy */
+            redisLog(REDIS_WARNING,"Lookup found a key in SAVEINPROG state. Waiting. (Key was in the cache)");
+            lookupWaitBusyKey(db,key);
         }
         server.stat_keyspace_hits++;
         return val;
@@ -36,16 +63,24 @@ robj *lookupKey(redisDb *db, robj *key) {
          * enabled we may have this key on disk. If so load it in memory
          * in a blocking way. */
         if (server.ds_enabled && cacheKeyMayExist(db,key)) {
-            if (cacheScheduleIOGetFlags(db,key) & REDIS_IO_SAVEINPROG) {
-                /* There is a save in progress for this object!
-                 * Wait for it to get out. */
-                waitEmptyIOJobsQueue();
-                processAllPendingIOJobs();
-                redisAssert((cacheScheduleIOGetFlags(db,key) & REDIS_IO_SAVEINPROG) == 0);
+            long flags = cacheScheduleIOGetFlags(db,key);
+
+            /* They key is not in cache, but it has a SAVE op in queue?
+             * The only possibility is that the key was deleted, since
+             * dirty keys are not evicted. */
+            if (flags & REDIS_IO_SAVE) {
+                server.stat_keyspace_misses++;
+                return NULL;
+            }
+
+            /* At this point we need to blocking load the key in memory.
+             * The first thing we do is waiting here if the key is busy. */
+            if (flags & REDIS_IO_SAVEINPROG) {
+                redisLog(REDIS_WARNING,"Lookup found a key in SAVEINPROG state. Waiting (while force loading).");
+                lookupWaitBusyKey(db,key);
             }
 
-            redisLog(REDIS_DEBUG,"Force loading key %s via lookup",
-                key->ptr);
+            redisLog(REDIS_DEBUG,"Force loading key %s via lookup", key->ptr);
             val = dsGet(db,key,&expire);
             if (val) {
                 int retval = dbAdd(db,key,val);
@@ -53,6 +88,8 @@ robj *lookupKey(redisDb *db, robj *key) {
                 if (expire != -1) setExpire(db,key,expire);
                 server.stat_keyspace_hits++;
                 return val;
+            } else {
+                cacheSetKeyDoesNotExist(db,key);
             }
         }
         server.stat_keyspace_misses++;
index 6bfcac1d1f647766f9392197bf22166ae14fc136..3a06b03ef6dd36325f6512ea2f94a0a2f4fe5bee 100644 (file)
@@ -348,6 +348,20 @@ void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata,
                     incrRefCount(j->val);
                     if (j->expire != -1) setExpire(j->db,j->key,j->expire);
                 }
+            } else {
+                /* Key not found on disk. If it is also not in memory
+                 * as a cached object, nor there is a job writing it
+                 * in background, we are sure the key does not exist
+                 * currently.
+                 *
+                 * So we set a negative cache entry avoiding that the
+                 * resumed client will block load what does not exist... */
+                if (dictFind(j->db->dict,j->key) == NULL &&
+                    (cacheScheduleIOGetFlags(j->db,j->key) &
+                      (REDIS_IO_SAVE|REDIS_IO_SAVEINPROG)) == 0)
+                {
+                    cacheSetKeyDoesNotExist(j->db,j->key);
+                }
             }
             cacheScheduleIODelFlag(j->db,j->key,REDIS_IO_LOADINPROG);
             handleClientsBlockedOnSwappedKey(j->db,j->key);
@@ -469,7 +483,8 @@ void waitEmptyIOJobsQueue(void) {
         redisLog(REDIS_DEBUG,"waitEmptyIOJobsQueue: new %d, processing %d",
             listLength(server.io_newjobs),
             listLength(server.io_processing));
-            /*
+
+        /* FIXME: signal or not?
         if (listLength(server.io_newjobs)) {
             pthread_cond_signal(&server.io_condvar);
         }
@@ -483,8 +498,12 @@ void waitEmptyIOJobsQueue(void) {
         if (io_processed_len) {
             vmThreadedIOCompletedJob(NULL,server.io_ready_pipe_read,
                                                         (void*)0xdeadbeef,0);
+            /* FIXME: probably wiser to drop this sleeps. Anyway 
+             * the contention on the IO thread will avoid we to loop
+             * too fast here. */
             usleep(1000); /* 1 millisecond */
         } else {
+            /* FIXME: same as fixme above. */
             usleep(10000); /* 10 milliseconds */
         }
     }
index 8113e157c58065bc94955be2cc95ce72f0b3a759..042332848152940452217bdbe02980d2d3450dbe 100644 (file)
@@ -86,7 +86,7 @@ dbfilename dump.rdb
 # Also the Append Only File will be created inside this directory.
 # 
 # Note that you must specify a directory here, not a file name.
-dir ./test/tmp
+dir /tmp
 
 ################################# REPLICATION #################################
 
index 4f48d22dcafbd023acde952340890bc262a7ee10..ad711e6ee224f382bcf80a3ceba4a2b279cd4c5f 100644 (file)
@@ -177,6 +177,9 @@ proc start_server {options {code undefined}} {
     if {$::valgrind} {
         exec valgrind --suppressions=src/valgrind.sup src/redis-server $config_file > $stdout 2> $stderr &
     } else {
+        if {$::verbose} {
+            puts "Logging on $stdout / $stderr"
+        }
         exec src/redis-server $config_file > $stdout 2> $stderr &
     }
     
index 2b7a8957772dbd1e57b4f73bfa362fe8392f1205..8ab2f6e40b9f0aa9443d148bd63b10ce57889033 100644 (file)
@@ -13,7 +13,7 @@ set ::host 127.0.0.1
 set ::port 16379
 set ::traceleaks 0
 set ::valgrind 0
-set ::verbose 0
+set ::verbose 1
 set ::denytags {}
 set ::allowtags {}
 set ::external 0; # If "1" this means, we are running against external instance