]> git.saurik.com Git - redis.git/blobdiff - redis.c
A define to make Redis more helgrind friendly
[redis.git] / redis.c
diff --git a/redis.c b/redis.c
index ce047a406bf570075a6b97f7f52e40ab8fa2d447..7c87178d69a0d466727cd95f0bddcf713ac1bd79 100644 (file)
--- a/redis.c
+++ b/redis.c
 #include "lzf.h"    /* LZF compression library */
 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
 
+/* #define REDIS_HELGRIND_FRIENDLY */
+#if defined(__GNUC__) && defined(REDIS_HELGRIND_FRIENDLY)
+#warning "Remember to undef REDIS_HELGRIND_FRIENDLY before to commit"
+#endif
+
 /* Error codes */
 #define REDIS_OK                0
 #define REDIS_ERR               -1
@@ -2419,6 +2424,10 @@ static robj *createObject(int type, void *ptr) {
     o->ptr = ptr;
     o->refcount = 1;
     if (server.vm_enabled) {
+        /* Note that this code may run in the context of an I/O thread
+         * and accessing to server.unixtime in theory is an error
+         * (no locks). But in practice this is safe, and even if we read
+         * garbage Redis will not fail, as it's just a statistical info */
         o->vm.atime = server.unixtime;
         o->storage = REDIS_VM_MEMORY;
     }
@@ -5631,6 +5640,7 @@ static sds genRedisInfoString(void) {
         );
     }
     if (server.vm_enabled) {
+        lockThreadedIO();
         info = sdscatprintf(info,
             "vm_conf_max_memory:%llu\r\n"
             "vm_conf_page_size:%llu\r\n"
@@ -5657,6 +5667,7 @@ static sds genRedisInfoString(void) {
             (unsigned long) listLength(server.io_clients),
             (unsigned long) server.io_active_threads
         );
+        unlockThreadedIO();
     }
     for (j = 0; j < server.dbnum; j++) {
         long long keys, vkeys;
@@ -7355,7 +7366,14 @@ static int vmSwapOneObject(int usethreads) {
             de = dictGetRandomKey(db->dict);
             key = dictGetEntryKey(de);
             val = dictGetEntryVal(de);
-            if (key->storage != REDIS_VM_MEMORY) {
+            /* Only swap objects that are currently in memory.
+             *
+             * Also don't swap shared objects if threaded VM is on, as we
+             * try to ensure that the main thread does not touch the
+             * object while the I/O thread is using it, but we can't
+             * control other keys without adding additional mutex. */
+            if (key->storage != REDIS_VM_MEMORY ||
+                (server.vm_max_threads != 0 && val->refcount != 1)) {
                 if (maxtries) i--; /* don't count this try */
                 continue;
             }
@@ -7457,10 +7475,10 @@ static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata,
         struct dictEntry *de;
 
         redisLog(REDIS_DEBUG,"Processing I/O completed job");
-        assert(listLength(server.io_processed) != 0);
 
         /* Get the processed element (the oldest one) */
         lockThreadedIO();
+        assert(listLength(server.io_processed) != 0);
         ln = listFirst(server.io_processed);
         j = ln->value;
         listDelNode(server.io_processed,ln);
@@ -7573,6 +7591,7 @@ static void vmCancelThreadedIOJob(robj *o) {
     int i;
 
     assert(o->storage == REDIS_VM_LOADING || o->storage == REDIS_VM_SWAPPING);
+again:
     lockThreadedIO();
     /* Search for a matching key in one of the queues */
     for (i = 0; i < 3; i++) {
@@ -7585,8 +7604,8 @@ static void vmCancelThreadedIOJob(robj *o) {
 
             if (job->canceled) continue; /* Skip this, already canceled. */
             if (compareStringObjects(job->key,o) == 0) {
-                redisLog(REDIS_DEBUG,"*** CANCELED %p (%s)\n",
-                    (void*)job, (char*)o->ptr);
+                redisLog(REDIS_DEBUG,"*** CANCELED %p (%s) (LIST ID %d)\n",
+                    (void*)job, (char*)o->ptr, i);
                 /* Mark the pages as free since the swap didn't happened
                  * or happened but is now discarded. */
                 if (job->type == REDIS_IOJOB_DO_SWAP)
@@ -7601,7 +7620,27 @@ static void vmCancelThreadedIOJob(robj *o) {
                     listDelNode(lists[i],ln);
                     break;
                 case 1: /* io_processing */
+                    /* Oh Shi- the thread is messing with the Job, and
+                     * probably with the object if this is a
+                     * PREPARE_SWAP or DO_SWAP job. Better to wait for the
+                     * job to move into the next queue... */
+                    if (job->type != REDIS_IOJOB_LOAD) {
+                        /* Yes, we try again and again until the job
+                         * is completed. */
+                        unlockThreadedIO();
+                        /* But let's wait some time for the I/O thread
+                         * to finish with this job. After all this condition
+                         * should be very rare. */
+                        usleep(1);
+                        goto again;
+                    } else {
+                        job->canceled = 1;
+                        break;
+                    }
                 case 2: /* io_processed */
+                    /* The job was already processed, that's easy...
+                     * just mark it as canceled so that we'll ignore it
+                     * when processing completed jobs. */
                     job->canceled = 1;
                     break;
                 }
@@ -7630,6 +7669,15 @@ static void *IOThreadEntryPoint(void *arg) {
         /* Get a new job to process */
         lockThreadedIO();
         if (listLength(server.io_newjobs) == 0) {
+#ifdef REDIS_HELGRIND_FRIENDLY
+            /* No new jobs? Wait and retry, because to be Helgrind
+             * (valgrind --tool=helgrind) what's needed is to take
+             * the same threads running instead to create/destroy threads
+             * as needed (otherwise valgrind will fail) */
+            unlockThreadedIO();
+            usleep(1); /* Give some time for the I/O thread to work. */
+            continue;
+#endif
             /* No new jobs in queue, exit. */
             redisLog(REDIS_DEBUG,"Thread %lld exiting, nothing to do",
                 (long long) pthread_self());