]> git.saurik.com Git - redis.git/blobdiff - src/dscache.c
diskstore cache bug fixing
[redis.git] / src / dscache.c
index fc358c5290e57899ae59dfc8941c600de6fc540e..05112cbb7303e2b7510f3bfc8e1661f8638dc2cb 100644 (file)
@@ -87,6 +87,8 @@
  *
  * - If dsSet() fails on the write thread log the error and reschedule the
  *   key for flush.
+ *
+ * - Check why INCR will not update the LRU info for the object.
  */
 
 /* Virtual Memory is composed mainly of two subsystems:
@@ -133,6 +135,7 @@ void dsInit(void) {
     server.io_processed = listCreate();
     server.io_ready_clients = listCreate();
     pthread_mutex_init(&server.io_mutex,NULL);
+    pthread_cond_init(&server.io_condvar,NULL);
     server.io_active_threads = 0;
     if (pipe(pipefds) == -1) {
         redisLog(REDIS_WARNING,"Unable to intialized DS: pipe(2): %s. Exiting."
@@ -329,13 +332,14 @@ void *IOThreadEntryPoint(void *arg) {
     REDIS_NOTUSED(arg);
 
     pthread_detach(pthread_self());
+    lockThreadedIO();
     while(1) {
+        /* Wait for more work to do */
+        pthread_cond_wait(&server.io_condvar,&server.io_mutex);
         /* Get a new job to process */
-        lockThreadedIO();
         if (listLength(server.io_newjobs) == 0) {
-            /* No new jobs in queue, exit. */
+            /* No new jobs in queue, reiterate. */
             unlockThreadedIO();
-            sleep(1);
             continue;
         }
         ln = listFirst(server.io_newjobs);
@@ -345,6 +349,7 @@ void *IOThreadEntryPoint(void *arg) {
         listAddNodeTail(server.io_processing,j);
         ln = listLast(server.io_processing); /* We use ln later to remove it */
         unlockThreadedIO();
+
         redisLog(REDIS_DEBUG,"Thread %ld: new job type %s: %p about key '%s'",
             (long) pthread_self(),
             (j->type == REDIS_IOJOB_LOAD) ? "load" : "save",
@@ -367,15 +372,17 @@ void *IOThreadEntryPoint(void *arg) {
         /* Done: insert the job into the processed queue */
         redisLog(REDIS_DEBUG,"Thread %ld completed the job: %p (key %s)",
             (long) pthread_self(), (void*)j, (char*)j->key->ptr);
+
         lockThreadedIO();
         listDelNode(server.io_processing,ln);
         listAddNodeTail(server.io_processed,j);
-        unlockThreadedIO();
 
         /* Signal the main thread there is new stuff to process */
         redisAssert(write(server.io_ready_pipe_write,"x",1) == 1);
     }
-    return NULL; /* never reached */
+    /* never reached, but that's the full pattern... */
+    unlockThreadedIO();
+    return NULL;
 }
 
 void spawnIOThread(void) {
@@ -397,16 +404,14 @@ void spawnIOThread(void) {
     server.io_active_threads++;
 }
 
-/* We need to wait for the last thread to exit before we are able to
- * fork() in order to BGSAVE or BGREWRITEAOF. */
+/* Wait that all the pending IO Jobs are processed */
 void waitEmptyIOJobsQueue(void) {
     while(1) {
         int io_processed_len;
 
         lockThreadedIO();
         if (listLength(server.io_newjobs) == 0 &&
-            listLength(server.io_processing) == 0 &&
-            server.io_active_threads == 0)
+            listLength(server.io_processing) == 0)
         {
             unlockThreadedIO();
             return;
@@ -427,6 +432,21 @@ void waitEmptyIOJobsQueue(void) {
     }
 }
 
+/* Process all the IO Jobs already completed by threads but still waiting
+ * processing from the main thread. */
+void processAllPendingIOJobs(void) {
+    while(1) {
+        int io_processed_len;
+
+        lockThreadedIO();
+        io_processed_len = listLength(server.io_processed);
+        unlockThreadedIO();
+        if (io_processed_len == 0) return;
+        vmThreadedIOCompletedJob(NULL,server.io_ready_pipe_read,
+                                                    (void*)0xdeadbeef,0);
+    }
+}
+
 /* This function must be called while with threaded IO locked */
 void queueIOJob(iojob *j) {
     redisLog(REDIS_DEBUG,"Queued IO Job %p type %d about key '%s'\n",
@@ -449,6 +469,7 @@ void dsCreateIOJob(int type, redisDb *db, robj *key, robj *val) {
 
     lockThreadedIO();
     queueIOJob(j);
+    pthread_cond_signal(&server.io_condvar);
     unlockThreadedIO();
 }