X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/02925dd96e3ad5e31a3cdd9abbc2415949de8700..95f68f7b0fc4ffc700361484b6c792a8e03f3a13:/src/bio.c diff --git a/src/bio.c b/src/bio.c index fc85afd8..0fec24d0 100644 --- a/src/bio.c +++ b/src/bio.c @@ -16,85 +16,166 @@ * ------ * * The design is trivial, we have a structure representing a job to perform - * and a single thread performing all the I/O operations in the queue. + * and a different thread and job queue for every job type. + * Every thread wait for new jobs in its queue, and process every job + * sequentially. + * + * Jobs of the same type are guaranteed to be processed from the least + * recently inserted to the most recently inserted (older jobs processed + * first). + * * Currently there is no way for the creator of the job to be notified about * the completion of the operation, this will only be added when/if needed. + * + * ---------------------------------------------------------------------------- + * + * Copyright (c) 2009-2012, Salvatore Sanfilippo + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. */ + #include "redis.h" #include "bio.h" -static pthread_mutex_t bio_mutex; -static pthread_cond_t bio_condvar; -list *bio_jobs; +static pthread_t bio_threads[REDIS_BIO_NUM_OPS]; +static pthread_mutex_t bio_mutex[REDIS_BIO_NUM_OPS]; +static pthread_cond_t bio_condvar[REDIS_BIO_NUM_OPS]; +static list *bio_jobs[REDIS_BIO_NUM_OPS]; +/* The following array is used to hold the number of pending jobs for every + * OP type. This allows us to export the bioPendingJobsOfType() API that is + * useful when the main thread wants to perform some operation that may involve + * objects shared with the background thread. The main thread will just wait + * that there are no longer jobs of this type to be executed before performing + * the sensible operation. This data is also useful for reporting. */ +static unsigned long long bio_pending[REDIS_BIO_NUM_OPS]; /* This structure represents a background Job. It is only used locally to this * file as the API deos not expose the internals at all. */ struct bio_job { - int type; /* Job type, for instance BIO_JOB_CLOSE */ - void *data; /* Job specific arguments pointer. */ -} + time_t time; /* Time at which the job was created. */ + /* Job specific arguments pointers. If we need to pass more than three + * arguments we can just pass a pointer to a structure or alike. */ + void *arg1, *arg2, *arg3; +}; void *bioProcessBackgroundJobs(void *arg); +/* Make sure we have enough stack to perform all the things we do in the + * main thread. */ +#define REDIS_THREAD_STACK_SIZE (1024*1024*4) + /* Initialize the background system, spawning the thread. */ void bioInit(void) { pthread_attr_t attr; pthread_t thread; size_t stacksize; + int j; - pthread_mutex_init(bio_mutex,NULL); - pthread_cond_init(bio_condvar,NULL); - bio_jobs = listCreate(); + /* Initialization of state vars and objects */ + for (j = 0; j < REDIS_BIO_NUM_OPS; j++) { + pthread_mutex_init(&bio_mutex[j],NULL); + pthread_cond_init(&bio_condvar[j],NULL); + bio_jobs[j] = listCreate(); + bio_pending[j] = 0; + } /* Set the stack size as by default it may be small in some system */ pthread_attr_init(&attr); - pthread_attr_getstacksize(&attr); + pthread_attr_getstacksize(&attr,&stacksize); if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */ while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2; pthread_attr_setstacksize(&attr, stacksize); - /* Ready to spawn our thread */ - if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,NULL) != 0) { - redisLog(REDIS_WARNING,"Fatal: Can't initialize Background Jobs."); - exit(1); + /* Ready to spawn our threads. We use the single argument the thread + * function accepts in order to pass the job ID the thread is + * responsible of. */ + for (j = 0; j < REDIS_BIO_NUM_OPS; j++) { + void *arg = (void*)(unsigned long) j; + if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) { + redisLog(REDIS_WARNING,"Fatal: Can't initialize Background Jobs."); + exit(1); + } + bio_threads[j] = thread; } } -void bioCreateBackgroundJob(int type, void *data) { +void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) { struct bio_job *job = zmalloc(sizeof(*job)); - job->type = type; - job->data = data; - pthread_mutex_lock(&bio_mutex); - listAddNodeTail(bio_jobs,job); - pthread_mutex_unlock(&bio_mutex); + job->time = time(NULL); + job->arg1 = arg1; + job->arg2 = arg2; + job->arg3 = arg3; + pthread_mutex_lock(&bio_mutex[type]); + listAddNodeTail(bio_jobs[type],job); + bio_pending[type]++; + pthread_cond_signal(&bio_condvar[type]); + pthread_mutex_unlock(&bio_mutex[type]); } void *bioProcessBackgroundJobs(void *arg) { struct bio_job *job; + unsigned long type = (unsigned long) arg; + sigset_t sigset; + + /* Make the thread killable at any time, so that bioKillThreads() + * can work reliably. */ + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); + pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL); + + pthread_mutex_lock(&bio_mutex[type]); + /* Block SIGALRM so we are sure that only the main thread will + * receive the watchdog signal. */ + sigemptyset(&sigset); + sigaddset(&sigset, SIGALRM); + if (pthread_sigmask(SIG_BLOCK, &sigset, NULL)) + redisLog(REDIS_WARNING, + "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno)); - pthread_detach(pthread_self()); - pthread_mutex_lock(&bio_mutex); while(1) { listNode *ln; /* The loop always starts with the lock hold. */ - if (listLength(server.io_newjobs) == 0) { - pthread_cond_wait(&bio_condvar,&bio_mutex); + if (listLength(bio_jobs[type]) == 0) { + pthread_cond_wait(&bio_condvar[type],&bio_mutex[type]); continue; } /* Pop the job from the queue. */ - ln = listFirst(bio_jobs); + ln = listFirst(bio_jobs[type]); job = ln->value; - listDelNode(bio_jobs,ln); /* It is now possible to unlock the background system as we know have * a stand alone job structure to process.*/ - pthread_mutex_unlock(&bio_mutex); + pthread_mutex_unlock(&bio_mutex[type]); /* Process the job accordingly to its type. */ - if (job->type == REDIS_BIO_CLOSE_FILE) { - close((long)job->data); + if (type == REDIS_BIO_CLOSE_FILE) { + close((long)job->arg1); + } else if (type == REDIS_BIO_AOF_FSYNC) { + aof_fsync((long)job->arg1); } else { redisPanic("Wrong job type in bioProcessBackgroundJobs()."); } @@ -102,6 +183,38 @@ void *bioProcessBackgroundJobs(void *arg) { /* Lock again before reiterating the loop, if there are no longer * jobs to process we'll block again in pthread_cond_wait(). */ - pthread_mutex_lock(&bio_mutex); + pthread_mutex_lock(&bio_mutex[type]); + listDelNode(bio_jobs[type],ln); + bio_pending[type]--; + } +} + +/* Return the number of pending jobs of the specified type. */ +unsigned long long bioPendingJobsOfType(int type) { + unsigned long long val; + pthread_mutex_lock(&bio_mutex[type]); + val = bio_pending[type]; + pthread_mutex_unlock(&bio_mutex[type]); + return val; +} + +/* Kill the running bio threads in an unclean way. This function should be + * used only when it's critical to stop the threads for some reason. + * Currently Redis does this only on crash (for instance on SIGSEGV) in order + * to perform a fast memory check without other threads messing with memory. */ +void bioKillThreads(void) { + int err, j; + + for (j = 0; j < REDIS_BIO_NUM_OPS; j++) { + if (pthread_cancel(bio_threads[j]) == 0) { + if ((err = pthread_join(bio_threads[j],NULL)) != 0) { + redisLog(REDIS_WARNING, + "Bio thread for job type #%d can be joined: %s", + j, strerror(err)); + } else { + redisLog(REDIS_WARNING, + "Bio thread for job type #%d terminated",j); + } + } } }