X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/85a2775298a85b80ccaaf31082c479b7968158b1..b04a5df979ef9237a5114899e9a454c4e52fb5ac:/redis.c diff --git a/redis.c b/redis.c index 59981cff..7c87178d 100644 --- a/redis.c +++ b/redis.c @@ -75,6 +75,11 @@ #include "lzf.h" /* LZF compression library */ #include "pqsort.h" /* Partial qsort for SORT+LIMIT */ +/* #define REDIS_HELGRIND_FRIENDLY */ +#if defined(__GNUC__) && defined(REDIS_HELGRIND_FRIENDLY) +#warning "Remember to undef REDIS_HELGRIND_FRIENDLY before to commit" +#endif + /* Error codes */ #define REDIS_OK 0 #define REDIS_ERR -1 @@ -164,6 +169,7 @@ #define REDIS_VM_MAX_NEAR_PAGES 65536 #define REDIS_VM_MAX_RANDOM_JUMP 4096 #define REDIS_VM_MAX_THREADS 32 +#define REDIS_THREAD_STACK_SIZE (1024*1024*4) /* The following is the number of completed I/O jobs to process when the * handelr is called. 1 is the minimum, and also the default, as it allows * to block as little as possible other accessing clients. While Virtual @@ -403,6 +409,7 @@ struct redisServer { pthread_mutex_t io_mutex; /* lock to access io_jobs/io_done/io_thread_job */ pthread_mutex_t obj_freelist_mutex; /* safe redis objects creation/free */ pthread_mutex_t io_swapfile_mutex; /* So we can lseek + write */ + pthread_attr_t io_threads_attr; /* attributes (stack size) used when creating I/O threads */ int io_active_threads; /* Number of running I/O threads */ int vm_max_threads; /* Max number of I/O threads running at the same time */ /* Our main thread is blocked on the event loop, locking for sockets ready @@ -2417,6 +2424,10 @@ static robj *createObject(int type, void *ptr) { o->ptr = ptr; o->refcount = 1; if (server.vm_enabled) { + /* Note that this code may run in the context of an I/O thread + * and accessing server.unixtime is in theory an error + * (no locks). 
But in practice this is safe, and even if we read + * garbage Redis will not fail, as it's just a statistical info */ o->vm.atime = server.unixtime; o->storage = REDIS_VM_MEMORY; } @@ -5629,6 +5640,7 @@ static sds genRedisInfoString(void) { ); } if (server.vm_enabled) { + lockThreadedIO(); info = sdscatprintf(info, "vm_conf_max_memory:%llu\r\n" "vm_conf_page_size:%llu\r\n" @@ -5655,6 +5667,7 @@ static sds genRedisInfoString(void) { (unsigned long) listLength(server.io_clients), (unsigned long) server.io_active_threads ); + unlockThreadedIO(); } for (j = 0; j < server.dbnum; j++) { long long keys, vkeys; @@ -6984,6 +6997,7 @@ static void aofRemoveTempFile(pid_t childpid) { static void vmInit(void) { off_t totsize; int pipefds[2]; + size_t stacksize; server.vm_fp = fopen("/tmp/redisvm","w+b"); if (server.vm_fp == NULL) { @@ -7031,6 +7045,11 @@ static void vmInit(void) { server.io_ready_pipe_read = pipefds[0]; server.io_ready_pipe_write = pipefds[1]; redisAssert(anetNonBlock(NULL,server.io_ready_pipe_read) != ANET_ERR); + /* LZF requires a lot of stack: double the default stack size until it is at least REDIS_THREAD_STACK_SIZE. A reported default of 0 restarts from 1, otherwise the doubling loop would never terminate. */ + pthread_attr_init(&server.io_threads_attr); + pthread_attr_getstacksize(&server.io_threads_attr, &stacksize); + while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize = stacksize ? stacksize*2 : 1; + pthread_attr_setstacksize(&server.io_threads_attr, stacksize); /* Listen for events in the threaded I/O pipe */ if (aeCreateFileEvent(server.el, server.io_ready_pipe_read, AE_READABLE, vmThreadedIOCompletedJob, NULL) == AE_ERR) @@ -7347,7 +7366,14 @@ static int vmSwapOneObject(int usethreads) { de = dictGetRandomKey(db->dict); key = dictGetEntryKey(de); val = dictGetEntryVal(de); - if (key->storage != REDIS_VM_MEMORY) { + /* Only swap objects that are currently in memory. + * + * Also don't swap shared objects if threaded VM is on, as we + * try to ensure that the main thread does not touch the + * object while the I/O thread is using it, but we can't + * control other keys without adding additional mutex. 
*/ + if (key->storage != REDIS_VM_MEMORY || + (server.vm_max_threads != 0 && val->refcount != 1)) { if (maxtries) i--; /* don't count this try */ continue; } @@ -7449,10 +7475,10 @@ static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, struct dictEntry *de; redisLog(REDIS_DEBUG,"Processing I/O completed job"); - assert(listLength(server.io_processed) != 0); /* Get the processed element (the oldest one) */ lockThreadedIO(); + assert(listLength(server.io_processed) != 0); ln = listFirst(server.io_processed); j = ln->value; listDelNode(server.io_processed,ln); @@ -7565,6 +7591,7 @@ static void vmCancelThreadedIOJob(robj *o) { int i; assert(o->storage == REDIS_VM_LOADING || o->storage == REDIS_VM_SWAPPING); +again: lockThreadedIO(); /* Search for a matching key in one of the queues */ for (i = 0; i < 3; i++) { @@ -7577,8 +7604,14 @@ static void vmCancelThreadedIOJob(robj *o) { if (job->canceled) continue; /* Skip this, already canceled. */ if (compareStringObjects(job->key,o) == 0) { - redisLog(REDIS_DEBUG,"*** CANCELED %p (%s)\n", - (void*)job, (char*)o->ptr); + redisLog(REDIS_DEBUG,"*** CANCELED %p (%s) (LIST ID %d)\n", + (void*)job, (char*)o->ptr, i); + /* Mark the pages as free since the swap didn't happen + * or happened but is now discarded. Not for io_processing (i == 1): the I/O thread may still be using the pages and a swap job found there is retried, so they are released on the retry. */ + if (i != 1 && job->type == REDIS_IOJOB_DO_SWAP) + vmMarkPagesFree(job->page,job->pages); + /* Cancel the job. It depends on the list the job is + * living in. */ switch(i) { case 0: /* io_newjobs */ /* If the job was yet not processed the best thing to do @@ -7587,14 +7620,30 @@ static void vmCancelThreadedIOJob(robj *o) { listDelNode(lists[i],ln); break; case 1: /* io_processing */ + /* Oh Shi- the thread is messing with the Job, and + * probably with the object if this is a + * PREPARE_SWAP or DO_SWAP job. Better to wait for the + * job to move into the next queue... */ + if (job->type != REDIS_IOJOB_LOAD) { + /* Yes, we try again and again until the job + * is completed. 
*/ + unlockThreadedIO(); + /* But let's wait some time for the I/O thread + * to finish with this job. After all this condition + * should be very rare. */ + usleep(1); + goto again; + } else { + job->canceled = 1; + break; + } case 2: /* io_processed */ + /* The job was already processed, that's easy... + * just mark it as canceled so that we'll ignore it + * when processing completed jobs. */ job->canceled = 1; break; } - /* Mark the pages as free since the swap didn't happened - * or happened but is not discarded. */ - if (job->type == REDIS_IOJOB_DO_SWAP) - vmMarkPagesFree(job->page,job->pages); /* Finally we have to adjust the storage type of the object * in order to "UNDO" the operaiton. */ if (o->storage == REDIS_VM_LOADING) @@ -7620,6 +7669,15 @@ static void *IOThreadEntryPoint(void *arg) { /* Get a new job to process */ lockThreadedIO(); if (listLength(server.io_newjobs) == 0) { +#ifdef REDIS_HELGRIND_FRIENDLY + /* No new jobs? Wait and retry: to be Helgrind friendly + * (valgrind --tool=helgrind) we need to keep + * the same threads running instead of creating/destroying threads + * as needed (otherwise valgrind will fail) */ + unlockThreadedIO(); + usleep(1); /* Pause briefly before retrying. */ + continue; +#endif /* No new jobs in queue, exit. */ redisLog(REDIS_DEBUG,"Thread %lld exiting, nothing to do", (long long) pthread_self()); @@ -7666,7 +7724,7 @@ static void *IOThreadEntryPoint(void *arg) { static void spawnIOThread(void) { pthread_t thread; - pthread_create(&thread,NULL,IOThreadEntryPoint,NULL); + pthread_create(&thread,&server.io_threads_attr,IOThreadEntryPoint,NULL); server.io_active_threads++; }