]> git.saurik.com Git - redis.git/blobdiff - src/dscache.c
diskstore cache bug fixing
[redis.git] / src / dscache.c
index 2bda0b509016ddeff0c0afe077fdda3cdee0d6dd..05112cbb7303e2b7510f3bfc8e1661f8638dc2cb 100644 (file)
  * - What happens with MULTI/EXEC?
  *
  *   Good question.
+ *
+ * - If dsSet() fails on the write thread log the error and reschedule the
+ *   key for flush.
+ *
+ * - Check why INCR will not update the LRU info for the object.
  */
 
 /* Virtual Memory is composed mainly of two subsystems:
@@ -117,7 +122,7 @@ void dsInit(void) {
 
     zmalloc_enable_thread_safeness(); /* we need thread safe zmalloc() */
 
-    redisLog(REDIS_NOTICE,"Initializing Disk Store at %s", server.ds_path);
+    redisLog(REDIS_NOTICE,"Opening Disk Store: %s", server.ds_path);
     /* Open Disk Store */
     if (dsOpen() != REDIS_OK) {
         redisLog(REDIS_WARNING,"Fatal error opening disk store. Exiting.");
@@ -130,6 +135,7 @@ void dsInit(void) {
     server.io_processed = listCreate();
     server.io_ready_clients = listCreate();
     pthread_mutex_init(&server.io_mutex,NULL);
+    pthread_cond_init(&server.io_condvar,NULL);
     server.io_active_threads = 0;
     if (pipe(pipefds) == -1) {
         redisLog(REDIS_WARNING,"Unable to intialized DS: pipe(2): %s. Exiting."
@@ -227,6 +233,7 @@ int cacheFreeOneEntry(void) {
         dbDelete(best_db,kobj);
         decrRefCount(kobj);
     }
+    return REDIS_OK;
 }
 
 /* Return true if it's safe to swap out objects in a given moment.
@@ -240,7 +247,8 @@ int dsCanTouchDiskStore(void) {
 
 void freeIOJob(iojob *j) {
     decrRefCount(j->key);
-    decrRefCount(j->val);
+    /* j->val can be NULL if the job is about deleting the key from disk. */
+    if (j->val) decrRefCount(j->val);
     zfree(j);
 }
 
@@ -261,7 +269,6 @@ void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata,
     while((retval = read(fd,buf,1)) == 1) {
         iojob *j;
         listNode *ln;
-        struct dictEntry *de;
 
         redisLog(REDIS_DEBUG,"Processing I/O completed job");
 
@@ -279,13 +286,23 @@ void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata,
 
         /* Post process it in the main thread, as there are things we
          * can do just here to avoid race conditions and/or invasive locks */
-        redisLog(REDIS_DEBUG,"COMPLETED Job type: %d, ID %p, key: %s", j->type, (void*)j->id, (unsigned char*)j->key->ptr);
-        de = dictFind(j->db->dict,j->key->ptr);
-        redisAssert(de != NULL);
+        redisLog(REDIS_DEBUG,"COMPLETED Job type %s, key: %s",
+            (j->type == REDIS_IOJOB_LOAD) ? "load" : "save",
+            (unsigned char*)j->key->ptr);
         if (j->type == REDIS_IOJOB_LOAD) {
-            dbAdd(j->db,j->key,j->val);
+            /* Create the key-value pair in the in-memory database */
+            if (j->val != NULL) {
+                dbAdd(j->db,j->key,j->val);
+                incrRefCount(j->val);
+                if (j->expire != -1) setExpire(j->db,j->key,j->expire);
+            } else {
+                /* The key does not exist. Create a negative cache entry
+                 * for this key. */
+                /* FIXME: add this entry into the negative cache */
+            }
+            /* Handle clients waiting for this key to be loaded. */
+            handleClientsBlockedOnSwappedKey(j->db,j->key);
             freeIOJob(j);
-            /* FIXME: notify clients waiting for this key */
         } else if (j->type == REDIS_IOJOB_SAVE) {
             redisAssert(j->val->storage == REDIS_DS_SAVING);
             j->val->storage = REDIS_DS_MEMORY;
@@ -315,51 +332,57 @@ void *IOThreadEntryPoint(void *arg) {
     REDIS_NOTUSED(arg);
 
     pthread_detach(pthread_self());
+    lockThreadedIO();
     while(1) {
+        /* Wait for more work to do */
+        pthread_cond_wait(&server.io_condvar,&server.io_mutex);
         /* Get a new job to process */
-        lockThreadedIO();
         if (listLength(server.io_newjobs) == 0) {
-            /* No new jobs in queue, exit. */
-            redisLog(REDIS_DEBUG,"Thread %ld exiting, nothing to do",
-                (long) pthread_self());
-            server.io_active_threads--;
+            /* No new jobs in queue, reiterate. */
             unlockThreadedIO();
-            return NULL;
+            continue;
         }
         ln = listFirst(server.io_newjobs);
         j = ln->value;
         listDelNode(server.io_newjobs,ln);
         /* Add the job in the processing queue */
-        j->thread = pthread_self();
         listAddNodeTail(server.io_processing,j);
         ln = listLast(server.io_processing); /* We use ln later to remove it */
         unlockThreadedIO();
-        redisLog(REDIS_DEBUG,"Thread %ld got a new job (type %d): %p about key '%s'",
-            (long) pthread_self(), j->type, (void*)j, (char*)j->key->ptr);
+
+        redisLog(REDIS_DEBUG,"Thread %ld: new job type %s: %p about key '%s'",
+            (long) pthread_self(),
+            (j->type == REDIS_IOJOB_LOAD) ? "load" : "save",
+            (void*)j, (char*)j->key->ptr);
 
         /* Process the Job */
         if (j->type == REDIS_IOJOB_LOAD) {
-            vmpointer *vp = (vmpointer*)j->id;
-            j->val = vmReadObjectFromSwap(j->page,vp->vtype);
-        } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) {
-            j->pages = rdbSavedObjectPages(j->val);
-        } else if (j->type == REDIS_IOJOB_DO_SWAP) {
-            if (vmWriteObjectOnSwap(j->val,j->page) == REDIS_ERR)
-                j->canceled = 1;
+            time_t expire;
+
+            j->val = dsGet(j->db,j->key,&expire);
+            if (j->val) j->expire = expire;
+        } else if (j->type == REDIS_IOJOB_SAVE) {
+            redisAssert(j->val->storage == REDIS_DS_SAVING);
+            if (j->val)
+                dsSet(j->db,j->key,j->val);
+            else
+                dsDel(j->db,j->key);
         }
 
         /* Done: insert the job into the processed queue */
         redisLog(REDIS_DEBUG,"Thread %ld completed the job: %p (key %s)",
             (long) pthread_self(), (void*)j, (char*)j->key->ptr);
+
         lockThreadedIO();
         listDelNode(server.io_processing,ln);
         listAddNodeTail(server.io_processed,j);
-        unlockThreadedIO();
 
         /* Signal the main thread there is new stuff to process */
         redisAssert(write(server.io_ready_pipe_write,"x",1) == 1);
     }
-    return NULL; /* never reached */
+    /* never reached, but that's the full pattern... */
+    unlockThreadedIO();
+    return NULL;
 }
 
 void spawnIOThread(void) {
@@ -381,16 +404,14 @@ void spawnIOThread(void) {
     server.io_active_threads++;
 }
 
-/* We need to wait for the last thread to exit before we are able to
- * fork() in order to BGSAVE or BGREWRITEAOF. */
+/* Wait that all the pending IO Jobs are processed */
 void waitEmptyIOJobsQueue(void) {
     while(1) {
         int io_processed_len;
 
         lockThreadedIO();
         if (listLength(server.io_newjobs) == 0 &&
-            listLength(server.io_processing) == 0 &&
-            server.io_active_threads == 0)
+            listLength(server.io_processing) == 0)
         {
             unlockThreadedIO();
             return;
@@ -411,6 +432,21 @@ void waitEmptyIOJobsQueue(void) {
     }
 }
 
+/* Process all the IO Jobs already completed by threads but still waiting
+ * processing from the main thread. */
+void processAllPendingIOJobs(void) {
+    while(1) {
+        int io_processed_len;
+
+        lockThreadedIO();
+        io_processed_len = listLength(server.io_processed);
+        unlockThreadedIO();
+        if (io_processed_len == 0) return;
+        vmThreadedIOCompletedJob(NULL,server.io_ready_pipe_read,
+                                                    (void*)0xdeadbeef,0);
+    }
+}
+
 /* This function must be called while with threaded IO locked */
 void queueIOJob(iojob *j) {
     redisLog(REDIS_DEBUG,"Queued IO Job %p type %d about key '%s'\n",
@@ -420,52 +456,119 @@ void queueIOJob(iojob *j) {
         spawnIOThread();
 }
 
-int vmSwapObjectThreaded(robj *key, robj *val, redisDb *db) {
+void dsCreateIOJob(int type, redisDb *db, robj *key, robj *val) {
     iojob *j;
 
     j = zmalloc(sizeof(*j));
-    j->type = REDIS_IOJOB_PREPARE_SWAP;
+    j->type = type;
     j->db = db;
     j->key = key;
     incrRefCount(key);
-    j->id = j->val = val;
-    incrRefCount(val);
-    j->canceled = 0;
-    j->thread = (pthread_t) -1;
-    val->storage = REDIS_VM_SWAPPING;
+    j->val = val;
+    if (val) incrRefCount(val);
 
     lockThreadedIO();
     queueIOJob(j);
+    pthread_cond_signal(&server.io_condvar);
     unlockThreadedIO();
-    return REDIS_OK;
+}
+
+void cacheScheduleForFlush(redisDb *db, robj *key) {
+    dirtykey *dk;
+    dictEntry *de;
+    
+    de = dictFind(db->dict,key->ptr);
+    if (de) {
+        robj *val = dictGetEntryVal(de);
+        if (val->storage == REDIS_DS_DIRTY)
+            return;
+        else
+            val->storage = REDIS_DS_DIRTY;
+    }
+
+    redisLog(REDIS_DEBUG,"Scheduling key %s for saving",key->ptr);
+    dk = zmalloc(sizeof(*dk));
+    dk->db = db;
+    dk->key = key;
+    incrRefCount(key);
+    dk->ctime = time(NULL);
+    listAddNodeTail(server.cache_flush_queue, dk);
+}
+
+void cacheCron(void) {
+    time_t now = time(NULL);
+    listNode *ln;
+
+    /* Sync stuff on disk */
+    while((ln = listFirst(server.cache_flush_queue)) != NULL) {
+        dirtykey *dk = ln->value;
+
+        if ((now - dk->ctime) >= server.cache_flush_delay) {
+            struct dictEntry *de;
+            robj *val;
+
+            redisLog(REDIS_DEBUG,"Creating IO Job to save key %s",dk->key->ptr);
+
+            /* Lookup the key, in order to put the current value in the IO
+             * Job and mark ti as DS_SAVING.
+             * Otherwise if the key does not exists we schedule a disk store
+             * delete operation, setting the value to NULL. */
+            de = dictFind(dk->db->dict,dk->key->ptr);
+            if (de) {
+                val = dictGetEntryVal(de);
+                redisAssert(val->storage == REDIS_DS_DIRTY);
+                val->storage = REDIS_DS_SAVING;
+            } else {
+                /* Setting the value to NULL tells the IO thread to delete
+                 * the key on disk. */
+                val = NULL;
+            }
+            dsCreateIOJob(REDIS_IOJOB_SAVE,dk->db,dk->key,val);
+            listDelNode(server.cache_flush_queue,ln);
+            decrRefCount(dk->key);
+            zfree(dk);
+        } else {
+            break; /* too early */
+        }
+    }
+
+    /* Reclaim memory from the object cache */
+    while (server.ds_enabled && zmalloc_used_memory() >
+            server.cache_max_memory)
+    {
+        if (cacheFreeOneEntry() == REDIS_ERR) break;
+    }
 }
 
 /* ============ Virtual Memory - Blocking clients on missing keys =========== */
 
 /* This function makes the clinet 'c' waiting for the key 'key' to be loaded.
- * If there is not already a job loading the key, it is craeted.
- * The key is added to the io_keys list in the client structure, and also
+ * If the key is already in memory we don't need to block, regardless
+ * of the storage of the value object for this key:
+ *
+ * - If it's REDIS_DS_MEMORY we have the key in memory.
+ * - If it's REDIS_DS_DIRTY they key was modified, but still in memory.
+ * - if it's REDIS_DS_SAVING the key is being saved by an IO Job. When
+ *   the client will lookup the key it will block if the key is still
+ *   in this stage but it's more or less the best we can do.
+ *
+ *   FIXME: we should try if it's actually better to suspend the client
+ *   accessing an object that is being saved, and awake it only when
+ *   the saving was completed.
+ *
+ * Otherwise if the key is not in memory, we block the client and start
+ * an IO Job to load it:
+ *
+ * the key is added to the io_keys list in the client structure, and also
  * in the hash table mapping swapped keys to waiting clients, that is,
  * server.io_waited_keys. */
 int waitForSwappedKey(redisClient *c, robj *key) {
     struct dictEntry *de;
-    robj *o;
     list *l;
 
-    /* If the key does not exist or is already in RAM we don't need to
-     * block the client at all. */
+    /* Return ASAP if the key is in memory */
     de = dictFind(c->db->dict,key->ptr);
-    if (de == NULL) return 0;
-    o = dictGetEntryVal(de);
-    if (o->storage == REDIS_VM_MEMORY) {
-        return 0;
-    } else if (o->storage == REDIS_VM_SWAPPING) {
-        /* We were swapping the key, undo it! */
-        vmCancelThreadedIOJob(o);
-        return 0;
-    }
-
-    /* OK: the key is either swapped, or being loaded just now. */
+    if (de != NULL) return 0;
 
     /* Add the key to the list of keys this client is waiting for.
      * This maps clients to keys they are waiting for. */
@@ -488,25 +591,8 @@ int waitForSwappedKey(redisClient *c, robj *key) {
     listAddNodeTail(l,c);
 
     /* Are we already loading the key from disk? If not create a job */
-    if (o->storage == REDIS_VM_SWAPPED) {
-        iojob *j;
-        vmpointer *vp = (vmpointer*)o;
-
-        o->storage = REDIS_VM_LOADING;
-        j = zmalloc(sizeof(*j));
-        j->type = REDIS_IOJOB_LOAD;
-        j->db = c->db;
-        j->id = (robj*)vp;
-        j->key = key;
-        incrRefCount(key);
-        j->page = vp->page;
-        j->val = NULL;
-        j->canceled = 0;
-        j->thread = (pthread_t) -1;
-        lockThreadedIO();
-        queueIOJob(j);
-        unlockThreadedIO();
-    }
+    if (de == NULL)
+        dsCreateIOJob(REDIS_IOJOB_LOAD,c->db,key,NULL);
     return 1;
 }
 
@@ -584,7 +670,7 @@ int blockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd) {
     if (listLength(c->io_keys)) {
         c->flags |= REDIS_IO_WAIT;
         aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
-        server.vm_blocked_clients++;
+        server.cache_blocked_clients++;
         return 1;
     } else {
         return 0;