]> git.saurik.com Git - redis.git/blame - src/bio.c
Use a different thread for every different type of background job
[redis.git] / src / bio.c
CommitLineData
02925dd9 1/* Background I/O service for Redis.
2 *
3 * This file implements operations that we need to perform in the background.
4 * Currently there is only a single operation, that is a background close(2)
5 * system call. This is needed as when the process is the last owner of a
6 * reference to a file closing it means unlinking it, and the deletion of the
7 * file is slow, blocking the server.
8 *
9 * In the future we'll either continue implementing new things we need or
10 * we'll switch to libeio. However there are probably long term uses for this
11 * file as we may want to put here Redis specific background tasks (for instance
12 * it is not impossible that we'll need a non blocking FLUSHDB/FLUSHALL
13 * implementation).
14 *
15 * DESIGN
16 * ------
17 *
18 * The design is trivial, we have a structure representing a job to perform
50be9b97 19 * and a different thread and job queue for every job type.
20 * Every thread wait for new jobs in its queue, and process every job
21 * sequentially.
22 *
02925dd9 23 * Currently there is no way for the creator of the job to be notified about
24 * the completion of the operation, this will only be added when/if needed.
25 */
26
27#include "redis.h"
28#include "bio.h"
29
50be9b97 30static pthread_mutex_t bio_mutex[REDIS_BIO_NUM_OPS];
31static pthread_cond_t bio_condvar[REDIS_BIO_NUM_OPS];
32static list *bio_jobs[REDIS_BIO_NUM_OPS];
fde4e4c4 33/* The following array is used to hold the number of pending jobs for every
34 * OP type. This allows us to export the bioPendingJobsOfType() API that is
35 * useful when the main thread wants to perform some operation that may involve
36 * objects shared with the background thread. The main thread will just wait
37 * that there are no longer jobs of this type to be executed before performing
38 * the sensible operation. This data is also useful for reporting. */
50be9b97 39static unsigned long long bio_pending[REDIS_BIO_NUM_OPS];
02925dd9 40
41/* This structure represents a background Job. It is only used locally to this
42 * file as the API deos not expose the internals at all. */
43struct bio_job {
50be9b97 44 time_t time; /* Time at which the job was created. */
45 /* Job specific arguments pointers. If we need to pass more than three
46 * arguments we can just pass a pointer to a structure or alike. */
47 void *arg1, *arg2, *arg3;
f81a5f54 48};
02925dd9 49
50void *bioProcessBackgroundJobs(void *arg);
51
f81a5f54 52/* Make sure we have enough stack to perform all the things we do in the
53 * main thread. */
54#define REDIS_THREAD_STACK_SIZE (1024*1024*4)
55
02925dd9 56/* Initialize the background system, spawning the thread. */
57void bioInit(void) {
58 pthread_attr_t attr;
59 pthread_t thread;
60 size_t stacksize;
fde4e4c4 61 int j;
02925dd9 62
fde4e4c4 63 /* Initialization of state vars and objects */
50be9b97 64 for (j = 0; j < REDIS_BIO_NUM_OPS; j++) {
65 pthread_mutex_init(&bio_mutex[j],NULL);
66 pthread_cond_init(&bio_condvar[j],NULL);
67 bio_jobs[j] = listCreate();
68 bio_pending[j] = 0;
69 }
02925dd9 70
71 /* Set the stack size as by default it may be small in some system */
72 pthread_attr_init(&attr);
f81a5f54 73 pthread_attr_getstacksize(&attr,&stacksize);
02925dd9 74 if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */
75 while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
76 pthread_attr_setstacksize(&attr, stacksize);
77
50be9b97 78 /* Ready to spawn our threads. We use the single argument the thread
79 * function accepts in order to pass the job ID the thread is
80 * responsible of. */
81 for (j = 0; j < REDIS_BIO_NUM_OPS; j++) {
82 void *arg = (void*)(unsigned long) j;
83 if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
84 redisLog(REDIS_WARNING,"Fatal: Can't initialize Background Jobs.");
85 exit(1);
86 }
02925dd9 87 }
88}
89
50be9b97 90void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
02925dd9 91 struct bio_job *job = zmalloc(sizeof(*job));
92
50be9b97 93 job->time = time(NULL);
94 job->arg1 = arg1;
95 job->arg2 = arg2;
96 job->arg3 = arg3;
97 pthread_mutex_lock(&bio_mutex[type]);
98 listAddNodeTail(bio_jobs[type],job);
fde4e4c4 99 bio_pending[type]++;
50be9b97 100 pthread_cond_signal(&bio_condvar[type]);
101 pthread_mutex_unlock(&bio_mutex[type]);
02925dd9 102}
103
104void *bioProcessBackgroundJobs(void *arg) {
105 struct bio_job *job;
50be9b97 106 unsigned long type = (unsigned long) arg;
02925dd9 107
108 pthread_detach(pthread_self());
50be9b97 109 pthread_mutex_lock(&bio_mutex[type]);
02925dd9 110 while(1) {
111 listNode *ln;
112
113 /* The loop always starts with the lock hold. */
50be9b97 114 if (listLength(bio_jobs[type]) == 0) {
115 pthread_cond_wait(&bio_condvar[type],&bio_mutex[type]);
02925dd9 116 continue;
117 }
118 /* Pop the job from the queue. */
50be9b97 119 ln = listFirst(bio_jobs[type]);
02925dd9 120 job = ln->value;
50be9b97 121 listDelNode(bio_jobs[type],ln);
02925dd9 122 /* It is now possible to unlock the background system as we know have
123 * a stand alone job structure to process.*/
50be9b97 124 pthread_mutex_unlock(&bio_mutex[type]);
02925dd9 125
126 /* Process the job accordingly to its type. */
fde4e4c4 127 if (type == REDIS_BIO_CLOSE_FILE) {
50be9b97 128 close((long)job->arg1);
02925dd9 129 } else {
130 redisPanic("Wrong job type in bioProcessBackgroundJobs().");
131 }
132 zfree(job);
133
134 /* Lock again before reiterating the loop, if there are no longer
135 * jobs to process we'll block again in pthread_cond_wait(). */
50be9b97 136 pthread_mutex_lock(&bio_mutex[type]);
fde4e4c4 137 bio_pending[type]--;
138 }
139}
140
141/* Return the number of pending jobs of the specified type. */
142unsigned long long bioPendingJobsOfType(int type) {
143 unsigned long long val;
50be9b97 144 pthread_mutex_lock(&bio_mutex[type]);
fde4e4c4 145 val = bio_pending[type];
50be9b97 146 pthread_mutex_unlock(&bio_mutex[type]);
fde4e4c4 147 return val;
148}
149
150/* Wait until the number of pending jobs of the specified type are
151 * less or equal to the specified number.
152 *
153 * This function may block for long time, it should only be used to perform
91de5421 154 * the following tasks:
155 *
156 * 1) To avoid that the main thread is pushing jobs of a given time so fast
157 * that the background thread can't process them at the same speed.
158 * So before creating a new job of a given type the main thread should
159 * call something like: bioWaitPendingJobsLE(job_type,10000);
160 * 2) In order to perform special operations that make it necessary to be sure
161 * no one is touching shared resourced in the background.
162 */
fde4e4c4 163void bioWaitPendingJobsLE(int type, unsigned long long num) {
164 unsigned long long iteration = 0;
165
166 /* We poll the jobs queue aggressively to start, and gradually relax
167 * the polling speed if it is going to take too much time. */
168 while(1) {
169 iteration++;
170 if (iteration > 1000 && iteration <= 10000) {
171 usleep(100);
172 } else if (iteration > 10000) {
173 usleep(1000);
174 }
175 if (bioPendingJobsOfType(type) <= num) break;
02925dd9 176 }
177}
50be9b97 178
179/* Return the older job of the specified type. */
180time_t bioOlderJobOfType(int type) {
181 time_t time;
182 listNode *ln;
183 struct bio_job *job;
184
185 pthread_mutex_lock(&bio_mutex[type]);
186 ln = listFirst(bio_jobs[type]);
187 job = ln->value;
188 time = job->time;
189 pthread_mutex_unlock(&bio_mutex[type]);
190 return time;
191}
192