]>
Commit | Line | Data |
---|---|---|
02925dd9 | 1 | /* Background I/O service for Redis. |
2 | * | |
3 | * This file implements operations that we need to perform in the background. | |
4 | * Currently two operations are implemented: a background close(2) system | |
5 | * call and a background AOF fsync. The background close is needed because | |
6 | * when the process is the last owner of a reference to a file, closing it | |
7 | * means unlinking it, and the deletion of the file is slow, blocking the server. | |
8 | * | |
9 | * In the future we'll either continue implementing new things we need or | |
10 | * we'll switch to libeio. However there are probably long term uses for this | |
11 | * file as we may want to put here Redis specific background tasks (for instance | |
12 | * it is not impossible that we'll need a non blocking FLUSHDB/FLUSHALL | |
13 | * implementation). | |
14 | * | |
15 | * DESIGN | |
16 | * ------ | |
17 | * | |
18 | * The design is trivial, we have a structure representing a job to perform | |
50be9b97 | 19 | * and a different thread and job queue for every job type. |
20 | * Every thread waits for new jobs in its queue, and processes every job | |
21 | * sequentially. | |
22 | * | |
fbb23ce4 | 23 | * Jobs of the same type are guaranteed to be processed from the least |
24 | * recently inserted to the most recently inserted (older jobs processed | |
25 | * first). | |
26 | * | |
02925dd9 | 27 | * Currently there is no way for the creator of the job to be notified about |
28 | * the completion of the operation, this will only be added when/if needed. | |
29 | */ | |
30 | ||
31 | #include "redis.h" | |
32 | #include "bio.h" | |
33 | ||
50be9b97 | 34 | static pthread_mutex_t bio_mutex[REDIS_BIO_NUM_OPS]; |
35 | static pthread_cond_t bio_condvar[REDIS_BIO_NUM_OPS]; | |
36 | static list *bio_jobs[REDIS_BIO_NUM_OPS]; | |
fde4e4c4 | 37 | /* The following array is used to hold the number of pending jobs for every |
38 | * OP type. This allows us to export the bioPendingJobsOfType() API that is | |
39 | * useful when the main thread wants to perform some operation that may involve | |
40 | * objects shared with the background thread. The main thread will just wait | |
41 | * that there are no longer jobs of this type to be executed before performing | |
42 | * the sensible operation. This data is also useful for reporting. */ | |
50be9b97 | 43 | static unsigned long long bio_pending[REDIS_BIO_NUM_OPS]; |
02925dd9 | 44 | |
/* A single background job. The type is private to this file, as the API
 * does not expose the internals at all. */
struct bio_job {
    time_t time; /* Time at which the job was created. */
    /* Job specific arguments. If more than three arguments are needed,
     * a pointer to a structure (or alike) can be passed instead. */
    void *arg1;
    void *arg2;
    void *arg3;
};
02925dd9 | 53 | |
/* Thread entry point of the background workers, defined below. The argument
 * carries the job type the thread is responsible for. */
void *bioProcessBackgroundJobs(void *arg);
55 | ||
/* Minimum stack size, in bytes, requested for the background threads (4 MB),
 * so that they have enough stack to perform all the things we do in the
 * main thread. */
#define REDIS_THREAD_STACK_SIZE (4*1024*1024)
59 | ||
02925dd9 | 60 | /* Initialize the background system, spawning the thread. */ |
61 | void bioInit(void) { | |
62 | pthread_attr_t attr; | |
63 | pthread_t thread; | |
64 | size_t stacksize; | |
fde4e4c4 | 65 | int j; |
02925dd9 | 66 | |
fde4e4c4 | 67 | /* Initialization of state vars and objects */ |
50be9b97 | 68 | for (j = 0; j < REDIS_BIO_NUM_OPS; j++) { |
69 | pthread_mutex_init(&bio_mutex[j],NULL); | |
70 | pthread_cond_init(&bio_condvar[j],NULL); | |
71 | bio_jobs[j] = listCreate(); | |
72 | bio_pending[j] = 0; | |
73 | } | |
02925dd9 | 74 | |
75 | /* Set the stack size as by default it may be small in some system */ | |
76 | pthread_attr_init(&attr); | |
f81a5f54 | 77 | pthread_attr_getstacksize(&attr,&stacksize); |
02925dd9 | 78 | if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */ |
79 | while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2; | |
80 | pthread_attr_setstacksize(&attr, stacksize); | |
81 | ||
50be9b97 | 82 | /* Ready to spawn our threads. We use the single argument the thread |
83 | * function accepts in order to pass the job ID the thread is | |
84 | * responsible of. */ | |
85 | for (j = 0; j < REDIS_BIO_NUM_OPS; j++) { | |
86 | void *arg = (void*)(unsigned long) j; | |
87 | if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) { | |
88 | redisLog(REDIS_WARNING,"Fatal: Can't initialize Background Jobs."); | |
89 | exit(1); | |
90 | } | |
02925dd9 | 91 | } |
92 | } | |
93 | ||
50be9b97 | 94 | void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) { |
02925dd9 | 95 | struct bio_job *job = zmalloc(sizeof(*job)); |
96 | ||
50be9b97 | 97 | job->time = time(NULL); |
98 | job->arg1 = arg1; | |
99 | job->arg2 = arg2; | |
100 | job->arg3 = arg3; | |
101 | pthread_mutex_lock(&bio_mutex[type]); | |
102 | listAddNodeTail(bio_jobs[type],job); | |
fde4e4c4 | 103 | bio_pending[type]++; |
50be9b97 | 104 | pthread_cond_signal(&bio_condvar[type]); |
105 | pthread_mutex_unlock(&bio_mutex[type]); | |
02925dd9 | 106 | } |
107 | ||
108 | void *bioProcessBackgroundJobs(void *arg) { | |
109 | struct bio_job *job; | |
50be9b97 | 110 | unsigned long type = (unsigned long) arg; |
02925dd9 | 111 | |
112 | pthread_detach(pthread_self()); | |
50be9b97 | 113 | pthread_mutex_lock(&bio_mutex[type]); |
02925dd9 | 114 | while(1) { |
115 | listNode *ln; | |
116 | ||
117 | /* The loop always starts with the lock hold. */ | |
50be9b97 | 118 | if (listLength(bio_jobs[type]) == 0) { |
119 | pthread_cond_wait(&bio_condvar[type],&bio_mutex[type]); | |
02925dd9 | 120 | continue; |
121 | } | |
122 | /* Pop the job from the queue. */ | |
50be9b97 | 123 | ln = listFirst(bio_jobs[type]); |
02925dd9 | 124 | job = ln->value; |
02925dd9 | 125 | /* It is now possible to unlock the background system as we know have |
126 | * a stand alone job structure to process.*/ | |
50be9b97 | 127 | pthread_mutex_unlock(&bio_mutex[type]); |
02925dd9 | 128 | |
129 | /* Process the job accordingly to its type. */ | |
fde4e4c4 | 130 | if (type == REDIS_BIO_CLOSE_FILE) { |
50be9b97 | 131 | close((long)job->arg1); |
9fc1e1b1 | 132 | } else if (type == REDIS_BIO_AOF_FSYNC) { |
a60b397b | 133 | aof_fsync((long)job->arg1); |
02925dd9 | 134 | } else { |
135 | redisPanic("Wrong job type in bioProcessBackgroundJobs()."); | |
136 | } | |
137 | zfree(job); | |
138 | ||
139 | /* Lock again before reiterating the loop, if there are no longer | |
140 | * jobs to process we'll block again in pthread_cond_wait(). */ | |
50be9b97 | 141 | pthread_mutex_lock(&bio_mutex[type]); |
1317b7c2 | 142 | listDelNode(bio_jobs[type],ln); |
fde4e4c4 | 143 | bio_pending[type]--; |
144 | } | |
145 | } | |
146 | ||
147 | /* Return the number of pending jobs of the specified type. */ | |
148 | unsigned long long bioPendingJobsOfType(int type) { | |
149 | unsigned long long val; | |
50be9b97 | 150 | pthread_mutex_lock(&bio_mutex[type]); |
fde4e4c4 | 151 | val = bio_pending[type]; |
50be9b97 | 152 | pthread_mutex_unlock(&bio_mutex[type]); |
fde4e4c4 | 153 | return val; |
154 | } | |
155 | ||
/* Block until the number of pending jobs of the specified type is less than
 * or equal to 'num'.
 *
 * This function may block for a long time, so it should only be used for
 * the following tasks:
 *
 * 1) To prevent the main thread from pushing jobs of a given type so fast
 *    that the background thread can't process them at the same speed.
 *    So before creating a new job of a given type the main thread should
 *    call something like: bioWaitPendingJobsLE(job_type,10000);
 * 2) To perform special operations that require being sure no one is
 *    touching shared resources in the background.
 */
void bioWaitPendingJobsLE(int type, unsigned long long num) {
    unsigned long long attempts = 0;

    /* The first 1000 checks are performed back to back, the following 9000
     * with a 100 microseconds pause before each check, and all the later
     * ones with a 1 millisecond pause, so that the polling gets gradually
     * more relaxed if the wait is taking too much time. */
    do {
        attempts++;
        if (attempts > 10000) {
            usleep(1000);
        } else if (attempts > 1000) {
            usleep(100);
        }
    } while (bioPendingJobsOfType(type) > num);
}
50be9b97 | 184 | |
185 | /* Return the older job of the specified type. */ | |
186 | time_t bioOlderJobOfType(int type) { | |
187 | time_t time; | |
188 | listNode *ln; | |
189 | struct bio_job *job; | |
190 | ||
191 | pthread_mutex_lock(&bio_mutex[type]); | |
192 | ln = listFirst(bio_jobs[type]); | |
b39a4d0b | 193 | if (ln == NULL) { |
194 | pthread_mutex_unlock(&bio_mutex[type]); | |
195 | return 0; | |
196 | } | |
50be9b97 | 197 | job = ln->value; |
198 | time = job->time; | |
199 | pthread_mutex_unlock(&bio_mutex[type]); | |
200 | return time; | |
201 | } | |
202 |