]> git.saurik.com Git - redis.git/blobdiff - redis.c
A define to make Redis more helgrind friendly
[redis.git] / redis.c
diff --git a/redis.c b/redis.c
index 59981cffe2fd85a3da0cc8caacba1a7d186cc3c5..7c87178d69a0d466727cd95f0bddcf713ac1bd79 100644 (file)
--- a/redis.c
+++ b/redis.c
 #include "lzf.h"    /* LZF compression library */
 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
 
+/* #define REDIS_HELGRIND_FRIENDLY */
+#if defined(__GNUC__) && defined(REDIS_HELGRIND_FRIENDLY)
+#warning "Remember to undef REDIS_HELGRIND_FRIENDLY before to commit"
+#endif
+
 /* Error codes */
 #define REDIS_OK                0
 #define REDIS_ERR               -1
 #define REDIS_VM_MAX_NEAR_PAGES 65536
 #define REDIS_VM_MAX_RANDOM_JUMP 4096
 #define REDIS_VM_MAX_THREADS 32
+#define REDIS_THREAD_STACK_SIZE (1024*1024*4)
 /* The following is the number of completed I/O jobs to process when the
  * handelr is called. 1 is the minimum, and also the default, as it allows
  * to block as little as possible other accessing clients. While Virtual
@@ -403,6 +409,7 @@ struct redisServer {
     pthread_mutex_t io_mutex; /* lock to access io_jobs/io_done/io_thread_job */
     pthread_mutex_t obj_freelist_mutex; /* safe redis objects creation/free */
     pthread_mutex_t io_swapfile_mutex; /* So we can lseek + write */
+    pthread_attr_t io_threads_attr; /* attributes for threads creation */
     int io_active_threads; /* Number of running I/O threads */
     int vm_max_threads; /* Max number of I/O threads running at the same time */
     /* Our main thread is blocked on the event loop, locking for sockets ready
@@ -2417,6 +2424,10 @@ static robj *createObject(int type, void *ptr) {
     o->ptr = ptr;
     o->refcount = 1;
     if (server.vm_enabled) {
+        /* Note that this code may run in the context of an I/O thread
+         * and accessing to server.unixtime in theory is an error
+         * (no locks). But in practice this is safe, and even if we read
+         * garbage Redis will not fail, as it's just a statistical info */
         o->vm.atime = server.unixtime;
         o->storage = REDIS_VM_MEMORY;
     }
@@ -5629,6 +5640,7 @@ static sds genRedisInfoString(void) {
         );
     }
     if (server.vm_enabled) {
+        lockThreadedIO();
         info = sdscatprintf(info,
             "vm_conf_max_memory:%llu\r\n"
             "vm_conf_page_size:%llu\r\n"
@@ -5655,6 +5667,7 @@ static sds genRedisInfoString(void) {
             (unsigned long) listLength(server.io_clients),
             (unsigned long) server.io_active_threads
         );
+        unlockThreadedIO();
     }
     for (j = 0; j < server.dbnum; j++) {
         long long keys, vkeys;
@@ -6984,6 +6997,7 @@ static void aofRemoveTempFile(pid_t childpid) {
 static void vmInit(void) {
     off_t totsize;
     int pipefds[2];
+    size_t stacksize;
 
     server.vm_fp = fopen("/tmp/redisvm","w+b");
     if (server.vm_fp == NULL) {
@@ -7031,6 +7045,11 @@ static void vmInit(void) {
     server.io_ready_pipe_read = pipefds[0];
     server.io_ready_pipe_write = pipefds[1];
     redisAssert(anetNonBlock(NULL,server.io_ready_pipe_read) != ANET_ERR);
+    /* LZF requires a lot of stack */
+    pthread_attr_init(&server.io_threads_attr);
+    pthread_attr_getstacksize(&server.io_threads_attr, &stacksize);
+    while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
+    pthread_attr_setstacksize(&server.io_threads_attr, stacksize);
     /* Listen for events in the threaded I/O pipe */
     if (aeCreateFileEvent(server.el, server.io_ready_pipe_read, AE_READABLE,
         vmThreadedIOCompletedJob, NULL) == AE_ERR)
@@ -7347,7 +7366,14 @@ static int vmSwapOneObject(int usethreads) {
             de = dictGetRandomKey(db->dict);
             key = dictGetEntryKey(de);
             val = dictGetEntryVal(de);
-            if (key->storage != REDIS_VM_MEMORY) {
+            /* Only swap objects that are currently in memory.
+             *
+             * Also don't swap shared objects if threaded VM is on, as we
+             * try to ensure that the main thread does not touch the
+             * object while the I/O thread is using it, but we can't
+             * control other keys without adding additional mutex. */
+            if (key->storage != REDIS_VM_MEMORY ||
+                (server.vm_max_threads != 0 && val->refcount != 1)) {
                 if (maxtries) i--; /* don't count this try */
                 continue;
             }
@@ -7449,10 +7475,10 @@ static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata,
         struct dictEntry *de;
 
         redisLog(REDIS_DEBUG,"Processing I/O completed job");
-        assert(listLength(server.io_processed) != 0);
 
         /* Get the processed element (the oldest one) */
         lockThreadedIO();
+        assert(listLength(server.io_processed) != 0);
         ln = listFirst(server.io_processed);
         j = ln->value;
         listDelNode(server.io_processed,ln);
@@ -7565,6 +7591,7 @@ static void vmCancelThreadedIOJob(robj *o) {
     int i;
 
     assert(o->storage == REDIS_VM_LOADING || o->storage == REDIS_VM_SWAPPING);
+again:
     lockThreadedIO();
     /* Search for a matching key in one of the queues */
     for (i = 0; i < 3; i++) {
@@ -7577,8 +7604,14 @@ static void vmCancelThreadedIOJob(robj *o) {
 
             if (job->canceled) continue; /* Skip this, already canceled. */
             if (compareStringObjects(job->key,o) == 0) {
-                redisLog(REDIS_DEBUG,"*** CANCELED %p (%s)\n",
-                    (void*)job, (char*)o->ptr);
+                redisLog(REDIS_DEBUG,"*** CANCELED %p (%s) (LIST ID %d)\n",
+                    (void*)job, (char*)o->ptr, i);
+                /* Mark the pages as free since the swap didn't happened
+                 * or happened but is now discarded. */
+                if (job->type == REDIS_IOJOB_DO_SWAP)
+                    vmMarkPagesFree(job->page,job->pages);
+                /* Cancel the job. It depends on the list the job is
+                 * living in. */
                 switch(i) {
                 case 0: /* io_newjobs */
                     /* If the job was yet not processed the best thing to do
@@ -7587,14 +7620,30 @@ static void vmCancelThreadedIOJob(robj *o) {
                     listDelNode(lists[i],ln);
                     break;
                 case 1: /* io_processing */
+                    /* Oh Shi- the thread is messing with the Job, and
+                     * probably with the object if this is a
+                     * PREPARE_SWAP or DO_SWAP job. Better to wait for the
+                     * job to move into the next queue... */
+                    if (job->type != REDIS_IOJOB_LOAD) {
+                        /* Yes, we try again and again until the job
+                         * is completed. */
+                        unlockThreadedIO();
+                        /* But let's wait some time for the I/O thread
+                         * to finish with this job. After all this condition
+                         * should be very rare. */
+                        usleep(1);
+                        goto again;
+                    } else {
+                        job->canceled = 1;
+                        break;
+                    }
                 case 2: /* io_processed */
+                    /* The job was already processed, that's easy...
+                     * just mark it as canceled so that we'll ignore it
+                     * when processing completed jobs. */
                     job->canceled = 1;
                     break;
                 }
-                /* Mark the pages as free since the swap didn't happened
-                 * or happened but is not discarded. */
-                if (job->type == REDIS_IOJOB_DO_SWAP)
-                    vmMarkPagesFree(job->page,job->pages);
                 /* Finally we have to adjust the storage type of the object
                  * in order to "UNDO" the operaiton. */
                 if (o->storage == REDIS_VM_LOADING)
@@ -7620,6 +7669,15 @@ static void *IOThreadEntryPoint(void *arg) {
         /* Get a new job to process */
         lockThreadedIO();
         if (listLength(server.io_newjobs) == 0) {
+#ifdef REDIS_HELGRIND_FRIENDLY
+            /* No new jobs? Wait and retry, because to be Helgrind
+             * (valgrind --tool=helgrind) what's needed is to take
+             * the same threads running instead to create/destroy threads
+             * as needed (otherwise valgrind will fail) */
+            unlockThreadedIO();
+            usleep(1); /* Give some time for the I/O thread to work. */
+            continue;
+#endif
             /* No new jobs in queue, exit. */
             redisLog(REDIS_DEBUG,"Thread %lld exiting, nothing to do",
                 (long long) pthread_self());
@@ -7666,7 +7724,7 @@ static void *IOThreadEntryPoint(void *arg) {
 static void spawnIOThread(void) {
     pthread_t thread;
 
-    pthread_create(&thread,NULL,IOThreadEntryPoint,NULL);
+    pthread_create(&thread,&server.io_threads_attr,IOThreadEntryPoint,NULL);
     server.io_active_threads++;
 }