From: antirez Date: Fri, 16 Sep 2011 13:44:14 +0000 (+0200) Subject: Merge remote-tracking branch 'origin/unstable' into bg-aof-2 X-Git-Url: https://git.saurik.com/redis.git/commitdiff_plain/37183f14e4034c77283b5246d274cc54a6a90cda?hp=dcdfd005a0133a347cc0aae54c690cd8c845fed7 Merge remote-tracking branch 'origin/unstable' into bg-aof-2 --- diff --git a/src/aof.c b/src/aof.c index 42f153be..5417e76b 100644 --- a/src/aof.c +++ b/src/aof.c @@ -11,10 +11,14 @@ void aofUpdateCurrentSize(void); +void aof_background_fsync(int fd) { + bioCreateBackgroundJob(REDIS_BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL); +} + /* Called when the user switches from "appendonly yes" to "appendonly no" * at runtime using the CONFIG command. */ void stopAppendOnly(void) { - flushAppendOnlyFile(); + flushAppendOnlyFile(1); aof_fsync(server.appendfd); close(server.appendfd); @@ -59,12 +63,50 @@ int startAppendOnly(void) { * and the only way the client socket can get a write is entering when the * the event loop, we accumulate all the AOF writes in a memory * buffer and write it on disk using this function just before entering - * the event loop again. */ -void flushAppendOnlyFile(void) { + * the event loop again. + * + * About the 'force' argument: + * + * When the fsync policy is set to 'everysec' we may delay the flush if there + * is still an fsync() going on in the background thread, since for instance + * on Linux write(2) will be blocked by the background fsync anyway. + * When this happens we remember that there is some aof buffer to be + * flushed ASAP, and will try to do that in the serverCron() function. + * + * However if force is set to 1 we'll write regardless of the background + * fsync. 
*/ +void flushAppendOnlyFile(int force) { ssize_t nwritten; + int sync_in_progress = 0; if (sdslen(server.aofbuf) == 0) return; + if (server.appendfsync == APPENDFSYNC_EVERYSEC) + sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0; + + if (server.appendfsync == APPENDFSYNC_EVERYSEC && !force) { + /* With this append fsync policy we do background fsyncing. + * If the fsync is still in progress we can try to delay + * the write for a couple of seconds. */ + if (sync_in_progress) { + if (server.aof_flush_postponed_start == 0) { + /* No previous write postponing, remember that we are + * postponing the flush and return. */ + server.aof_flush_postponed_start = server.unixtime; + return; + } else if (server.unixtime - server.aof_flush_postponed_start < 2) { + /* We were already waiting for fsync to finish, but for less + * than two seconds this is still ok. Postpone again. */ + return; + } + /* Otherwise fall through, and go write since we can't wait + * over two seconds. */ + } + } + /* If you are following this code path, then we are going to write so + * reset the postponed flush sentinel to zero. */ + server.aof_flush_postponed_start = 0; + /* We want to perform a single write. This should be guaranteed atomic * at least if the filesystem we are writing is a real physical one. * While this will save us against the server being killed I don't think @@ -100,14 +142,15 @@ void flushAppendOnlyFile(void) { return; /* Perform the fsync if needed. */ - if (server.appendfsync == APPENDFSYNC_ALWAYS || - (server.appendfsync == APPENDFSYNC_EVERYSEC && - server.unixtime > server.lastfsync)) - { + if (server.appendfsync == APPENDFSYNC_ALWAYS) { /* aof_fsync is defined as fdatasync() for Linux in order to avoid * flushing metadata. 
*/ aof_fsync(server.appendfd); /* Let's try to get this data on the disk */ server.lastfsync = server.unixtime; + } else if ((server.appendfsync == APPENDFSYNC_EVERYSEC && + server.unixtime > server.lastfsync)) { + if (!sync_in_progress) aof_background_fsync(server.appendfd); + server.lastfsync = server.unixtime; } } @@ -762,7 +805,10 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) { /* AOF enabled, replace the old fd with the new one. */ oldfd = server.appendfd; server.appendfd = newfd; - if (server.appendfsync != APPENDFSYNC_NO) aof_fsync(newfd); + if (server.appendfsync == APPENDFSYNC_ALWAYS) + aof_fsync(newfd); + else if (server.appendfsync == APPENDFSYNC_EVERYSEC) + aof_background_fsync(newfd); server.appendseldb = -1; /* Make sure SELECT is re-issued */ aofUpdateCurrentSize(); server.auto_aofrewrite_base_size = server.appendonly_current_size; @@ -776,7 +822,7 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) { redisLog(REDIS_NOTICE, "Background AOF rewrite successful"); /* Asynchronously close the overwritten AOF. */ - if (oldfd != -1) bioCreateBackgroundJob(REDIS_BIO_CLOSE_FILE,(void*)(long)oldfd); + if (oldfd != -1) bioCreateBackgroundJob(REDIS_BIO_CLOSE_FILE,(void*)(long)oldfd,NULL,NULL); redisLog(REDIS_VERBOSE, "Background AOF rewrite signal handler took %lldus", ustime()-now); diff --git a/src/bio.c b/src/bio.c index 9c269583..9199bf23 100644 --- a/src/bio.c +++ b/src/bio.c @@ -16,7 +16,14 @@ * ------ * * The design is trivial, we have a structure representing a job to perform - * and a single thread performing all the I/O operations in the queue. + * and a different thread and job queue for every job type. + * Every thread waits for new jobs in its queue, and processes every job + * sequentially. + * + * Jobs of the same type are guaranteed to be processed from the least + * recently inserted to the most recently inserted (older jobs processed + * first). 
+ * + * Currently there is no way for the creator of the job to be notified about * the completion of the operation, this will only be added when/if needed. */ @@ -24,15 +31,24 @@ #include "redis.h" #include "bio.h" -static pthread_mutex_t bio_mutex; -static pthread_cond_t bio_condvar; -static list *bio_jobs; +static pthread_mutex_t bio_mutex[REDIS_BIO_NUM_OPS]; +static pthread_cond_t bio_condvar[REDIS_BIO_NUM_OPS]; +static list *bio_jobs[REDIS_BIO_NUM_OPS]; +/* The following array is used to hold the number of pending jobs for every + * OP type. This allows us to export the bioPendingJobsOfType() API that is + * useful when the main thread wants to perform some operation that may involve + * objects shared with the background thread. The main thread will just wait + * until there are no more jobs of this type to be executed before performing + * the sensitive operation. This data is also useful for reporting. */ +static unsigned long long bio_pending[REDIS_BIO_NUM_OPS]; /* This structure represents a background Job. It is only used locally to this * file as the API deos not expose the internals at all. */ struct bio_job { - int type; /* Job type, for instance BIO_JOB_CLOSE */ - void *data; /* Job specific arguments pointer. */ + time_t time; /* Time at which the job was created. */ + /* Job specific arguments pointers. If we need to pass more than three + * arguments we can just pass a pointer to a structure or alike. 
*/ + void *arg1, *arg2, *arg3; }; void *bioProcessBackgroundJobs(void *arg); @@ -46,10 +62,15 @@ void bioInit(void) { pthread_attr_t attr; pthread_t thread; size_t stacksize; - - pthread_mutex_init(&bio_mutex,NULL); - pthread_cond_init(&bio_condvar,NULL); - bio_jobs = listCreate(); + int j; + + /* Initialization of state vars and objects */ + for (j = 0; j < REDIS_BIO_NUM_OPS; j++) { + pthread_mutex_init(&bio_mutex[j],NULL); + pthread_cond_init(&bio_condvar[j],NULL); + bio_jobs[j] = listCreate(); + bio_pending[j] = 0; + } /* Set the stack size as by default it may be small in some system */ pthread_attr_init(&attr); @@ -58,49 +79,58 @@ void bioInit(void) { while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2; pthread_attr_setstacksize(&attr, stacksize); - /* Ready to spawn our thread */ - if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,NULL) != 0) { - redisLog(REDIS_WARNING,"Fatal: Can't initialize Background Jobs."); - exit(1); + /* Ready to spawn our threads. We use the single argument the thread + * function accepts in order to pass the job type the thread is + * responsible for. 
*/ + for (j = 0; j < REDIS_BIO_NUM_OPS; j++) { + void *arg = (void*)(unsigned long) j; + if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) { + redisLog(REDIS_WARNING,"Fatal: Can't initialize Background Jobs."); + exit(1); + } } } -void bioCreateBackgroundJob(int type, void *data) { +void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) { struct bio_job *job = zmalloc(sizeof(*job)); - job->type = type; - job->data = data; - pthread_mutex_lock(&bio_mutex); - listAddNodeTail(bio_jobs,job); - pthread_cond_signal(&bio_condvar); - pthread_mutex_unlock(&bio_mutex); + job->time = time(NULL); + job->arg1 = arg1; + job->arg2 = arg2; + job->arg3 = arg3; + pthread_mutex_lock(&bio_mutex[type]); + listAddNodeTail(bio_jobs[type],job); + bio_pending[type]++; + pthread_cond_signal(&bio_condvar[type]); + pthread_mutex_unlock(&bio_mutex[type]); } void *bioProcessBackgroundJobs(void *arg) { struct bio_job *job; - REDIS_NOTUSED(arg); + unsigned long type = (unsigned long) arg; pthread_detach(pthread_self()); - pthread_mutex_lock(&bio_mutex); + pthread_mutex_lock(&bio_mutex[type]); while(1) { listNode *ln; /* The loop always starts with the lock hold. */ - if (listLength(bio_jobs) == 0) { - pthread_cond_wait(&bio_condvar,&bio_mutex); + if (listLength(bio_jobs[type]) == 0) { + pthread_cond_wait(&bio_condvar[type],&bio_mutex[type]); continue; } /* Pop the job from the queue. */ - ln = listFirst(bio_jobs); + ln = listFirst(bio_jobs[type]); job = ln->value; - listDelNode(bio_jobs,ln); /* It is now possible to unlock the background system as we know have * a stand alone job structure to process.*/ - pthread_mutex_unlock(&bio_mutex); + pthread_mutex_unlock(&bio_mutex[type]); /* Process the job accordingly to its type. 
*/ - if (job->type == REDIS_BIO_CLOSE_FILE) { - close((long)job->data); + if (type == REDIS_BIO_CLOSE_FILE) { + close((long)job->arg1); + } else if (type == REDIS_BIO_AOF_FSYNC) { + fsync((long)job->arg1); } else { redisPanic("Wrong job type in bioProcessBackgroundJobs()."); } @@ -108,6 +138,65 @@ void *bioProcessBackgroundJobs(void *arg) { /* Lock again before reiterating the loop, if there are no longer * jobs to process we'll block again in pthread_cond_wait(). */ - pthread_mutex_lock(&bio_mutex); + pthread_mutex_lock(&bio_mutex[type]); + listDelNode(bio_jobs[type],ln); + bio_pending[type]--; + } +} + +/* Return the number of pending jobs of the specified type. */ +unsigned long long bioPendingJobsOfType(int type) { + unsigned long long val; + pthread_mutex_lock(&bio_mutex[type]); + val = bio_pending[type]; + pthread_mutex_unlock(&bio_mutex[type]); + return val; +} + +/* Wait until the number of pending jobs of the specified type is + * less than or equal to the specified number. + * + * This function may block for a long time, it should only be used to perform + * the following tasks: + * + * 1) To avoid that the main thread is pushing jobs of a given type so fast + * that the background thread can't process them at the same speed. + * So before creating a new job of a given type the main thread should + * call something like: bioWaitPendingJobsLE(job_type,10000); + * 2) In order to perform special operations that make it necessary to be sure + * no one is touching shared resources in the background. + */ +void bioWaitPendingJobsLE(int type, unsigned long long num) { + unsigned long long iteration = 0; + + /* We poll the jobs queue aggressively to start, and gradually relax + * the polling speed if it is going to take too much time. 
*/ + while(1) { + iteration++; + if (iteration > 1000 && iteration <= 10000) { + usleep(100); + } else if (iteration > 10000) { + usleep(1000); + } + if (bioPendingJobsOfType(type) <= num) break; } } + +/* Return the creation time of the oldest job of the specified type, or 0 if no job is queued. */ +time_t bioOlderJobOfType(int type) { + time_t time; + listNode *ln; + struct bio_job *job; + + pthread_mutex_lock(&bio_mutex[type]); + ln = listFirst(bio_jobs[type]); + if (ln == NULL) { + pthread_mutex_unlock(&bio_mutex[type]); + return 0; + } + job = ln->value; + time = job->time; + pthread_mutex_unlock(&bio_mutex[type]); + return time; +} + diff --git a/src/bio.h b/src/bio.h index 3721fa2b..22a9b33e 100644 --- a/src/bio.h +++ b/src/bio.h @@ -1,6 +1,11 @@ /* Exported API */ void bioInit(void); -void bioCreateBackgroundJob(int type, void *data); +void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3); +unsigned long long bioPendingJobsOfType(int type); +void bioWaitPendingJobsLE(int type, unsigned long long num); +time_t bioOlderJobOfType(int type); /* Background job opcodes */ -#define REDIS_BIO_CLOSE_FILE 1 +#define REDIS_BIO_CLOSE_FILE 0 /* Deferred close(2) syscall. */ +#define REDIS_BIO_AOF_FSYNC 1 /* Deferred AOF fsync. */ +#define REDIS_BIO_NUM_OPS 2 diff --git a/src/redis.c b/src/redis.c index 268398f0..af750582 100644 --- a/src/redis.c +++ b/src/redis.c @@ -697,6 +697,11 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { } } + + /* If we postponed an AOF buffer flush, let's try to do it every time the + * cron function is called. */ + if (server.aof_flush_postponed_start) flushAppendOnlyFile(0); + /* Expire a few keys per cycle, only if this is a master. * On slaves we wait for DEL operations synthesized by the master * in order to guarantee a strict consistency. 
*/ @@ -735,7 +740,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { } /* Write the AOF buffer on disk */ - flushAppendOnlyFile(); + flushAppendOnlyFile(0); } /* =========================== Server initialization ======================== */ @@ -822,6 +827,7 @@ void initServerConfig() { server.lastfsync = time(NULL); server.appendfd = -1; server.appendseldb = -1; /* Make sure the first time will not match */ + server.aof_flush_postponed_start = 0; server.pidfile = zstrdup("/var/run/redis.pid"); server.dbfilename = zstrdup("dump.rdb"); server.appendfilename = zstrdup("appendonly.aof"); diff --git a/src/redis.h b/src/redis.h index 768322c3..e754918d 100644 --- a/src/redis.h +++ b/src/redis.h @@ -559,6 +559,7 @@ struct redisServer { time_t lastfsync; int appendfd; int appendseldb; + time_t aof_flush_postponed_start; char *pidfile; pid_t bgsavechildpid; pid_t bgrewritechildpid; @@ -870,7 +871,7 @@ int rdbSaveType(FILE *fp, unsigned char type); int rdbSaveLen(FILE *fp, uint32_t len); /* AOF persistence */ -void flushAppendOnlyFile(void); +void flushAppendOnlyFile(int force); void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc); void aofRemoveTempFile(pid_t childpid); int rewriteAppendOnlyFileBackground(void);