]>
Commit | Line | Data |
---|---|---|
1 | /* Background I/O service for Redis. | |
2 | * | |
3 | * This file implements operations that we need to perform in the background. | |
4 | * Currently there are two operations: a background close(2) system call and | |
5 | * a background AOF fsync. The close(2) is needed because when the process is | |
6 | * the last owner of a reference to a file, closing it means unlinking it, | |
7 | * and the deletion of the file is slow, blocking the server. | |
8 | * | |
9 | * In the future we'll either continue implementing new things we need or | |
10 | * we'll switch to libeio. However there are probably long term uses for this | |
11 | * file as we may want to put here Redis specific background tasks (for instance | |
12 | * it is not impossible that we'll need a non blocking FLUSHDB/FLUSHALL | |
13 | * implementation). | |
14 | * | |
15 | * DESIGN | |
16 | * ------ | |
17 | * | |
18 | * The design is trivial, we have a structure representing a job to perform | |
19 | * and a different thread and job queue for every job type. | |
20 | * Every thread waits for new jobs in its queue, and processes every job | |
21 | * sequentially. | |
22 | * | |
23 | * Jobs of the same type are guaranteed to be processed from the least | |
24 | * recently inserted to the most recently inserted (older jobs processed | |
25 | * first). | |
26 | * | |
27 | * Currently there is no way for the creator of the job to be notified about | |
28 | * the completion of the operation, this will only be added when/if needed. | |
29 | * | |
30 | * ---------------------------------------------------------------------------- | |
31 | * | |
32 | * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com> | |
33 | * All rights reserved. | |
34 | * | |
35 | * Redistribution and use in source and binary forms, with or without | |
36 | * modification, are permitted provided that the following conditions are met: | |
37 | * | |
38 | * * Redistributions of source code must retain the above copyright notice, | |
39 | * this list of conditions and the following disclaimer. | |
40 | * * Redistributions in binary form must reproduce the above copyright | |
41 | * notice, this list of conditions and the following disclaimer in the | |
42 | * documentation and/or other materials provided with the distribution. | |
43 | * * Neither the name of Redis nor the names of its contributors may be used | |
44 | * to endorse or promote products derived from this software without | |
45 | * specific prior written permission. | |
46 | * | |
47 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" | |
48 | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE | |
49 | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE | |
50 | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE | |
51 | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR | |
52 | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF | |
53 | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS | |
54 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN | |
55 | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) | |
56 | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE | |
57 | * POSSIBILITY OF SUCH DAMAGE. | |
58 | */ | |
59 | ||
60 | ||
61 | #include "redis.h" | |
62 | #include "bio.h" | |
63 | ||
64 | static pthread_mutex_t bio_mutex[REDIS_BIO_NUM_OPS]; | |
65 | static pthread_cond_t bio_condvar[REDIS_BIO_NUM_OPS]; | |
66 | static list *bio_jobs[REDIS_BIO_NUM_OPS]; | |
67 | /* The following array is used to hold the number of pending jobs for every | |
68 | * OP type. This allows us to export the bioPendingJobsOfType() API that is | |
69 | * useful when the main thread wants to perform some operation that may involve | |
70 | * objects shared with the background thread. The main thread will just wait | |
71 | * that there are no longer jobs of this type to be executed before performing | |
72 | * the sensible operation. This data is also useful for reporting. */ | |
73 | static unsigned long long bio_pending[REDIS_BIO_NUM_OPS]; | |
74 | ||
/* A background job. The structure is only used locally to this file, as the
 * API does not expose the internals at all. */
struct bio_job {
    /* Time at which the job was created. */
    time_t time;
    /* Job specific argument pointers. A job that needs more than three
     * arguments can pass a pointer to a structure or alike. */
    void *arg1;
    void *arg2;
    void *arg3;
};
83 | ||
/* Thread entry point, defined later in this file; declared here so that
 * bioInit() can hand it to pthread_create(). */
void *bioProcessBackgroundJobs(void *arg);

/* Minimum stack size, in bytes (4 MB), requested for the background threads,
 * so that they have enough stack to perform all the things we do in the
 * main thread. */
#define REDIS_THREAD_STACK_SIZE (4*1024*1024)
89 | ||
90 | /* Initialize the background system, spawning the thread. */ | |
91 | void bioInit(void) { | |
92 | pthread_attr_t attr; | |
93 | pthread_t thread; | |
94 | size_t stacksize; | |
95 | int j; | |
96 | ||
97 | /* Initialization of state vars and objects */ | |
98 | for (j = 0; j < REDIS_BIO_NUM_OPS; j++) { | |
99 | pthread_mutex_init(&bio_mutex[j],NULL); | |
100 | pthread_cond_init(&bio_condvar[j],NULL); | |
101 | bio_jobs[j] = listCreate(); | |
102 | bio_pending[j] = 0; | |
103 | } | |
104 | ||
105 | /* Set the stack size as by default it may be small in some system */ | |
106 | pthread_attr_init(&attr); | |
107 | pthread_attr_getstacksize(&attr,&stacksize); | |
108 | if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */ | |
109 | while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2; | |
110 | pthread_attr_setstacksize(&attr, stacksize); | |
111 | ||
112 | /* Ready to spawn our threads. We use the single argument the thread | |
113 | * function accepts in order to pass the job ID the thread is | |
114 | * responsible of. */ | |
115 | for (j = 0; j < REDIS_BIO_NUM_OPS; j++) { | |
116 | void *arg = (void*)(unsigned long) j; | |
117 | if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) { | |
118 | redisLog(REDIS_WARNING,"Fatal: Can't initialize Background Jobs."); | |
119 | exit(1); | |
120 | } | |
121 | } | |
122 | } | |
123 | ||
124 | void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) { | |
125 | struct bio_job *job = zmalloc(sizeof(*job)); | |
126 | ||
127 | job->time = time(NULL); | |
128 | job->arg1 = arg1; | |
129 | job->arg2 = arg2; | |
130 | job->arg3 = arg3; | |
131 | pthread_mutex_lock(&bio_mutex[type]); | |
132 | listAddNodeTail(bio_jobs[type],job); | |
133 | bio_pending[type]++; | |
134 | pthread_cond_signal(&bio_condvar[type]); | |
135 | pthread_mutex_unlock(&bio_mutex[type]); | |
136 | } | |
137 | ||
138 | void *bioProcessBackgroundJobs(void *arg) { | |
139 | struct bio_job *job; | |
140 | unsigned long type = (unsigned long) arg; | |
141 | sigset_t sigset; | |
142 | ||
143 | pthread_detach(pthread_self()); | |
144 | pthread_mutex_lock(&bio_mutex[type]); | |
145 | /* Block SIGALRM so we are sure that only the main thread will | |
146 | * receive the watchdog signal. */ | |
147 | sigemptyset(&sigset); | |
148 | sigaddset(&sigset, SIGALRM); | |
149 | if (pthread_sigmask(SIG_BLOCK, &sigset, NULL)) | |
150 | redisLog(REDIS_WARNING, | |
151 | "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno)); | |
152 | ||
153 | while(1) { | |
154 | listNode *ln; | |
155 | ||
156 | /* The loop always starts with the lock hold. */ | |
157 | if (listLength(bio_jobs[type]) == 0) { | |
158 | pthread_cond_wait(&bio_condvar[type],&bio_mutex[type]); | |
159 | continue; | |
160 | } | |
161 | /* Pop the job from the queue. */ | |
162 | ln = listFirst(bio_jobs[type]); | |
163 | job = ln->value; | |
164 | /* It is now possible to unlock the background system as we know have | |
165 | * a stand alone job structure to process.*/ | |
166 | pthread_mutex_unlock(&bio_mutex[type]); | |
167 | ||
168 | /* Process the job accordingly to its type. */ | |
169 | if (type == REDIS_BIO_CLOSE_FILE) { | |
170 | close((long)job->arg1); | |
171 | } else if (type == REDIS_BIO_AOF_FSYNC) { | |
172 | aof_fsync((long)job->arg1); | |
173 | } else { | |
174 | redisPanic("Wrong job type in bioProcessBackgroundJobs()."); | |
175 | } | |
176 | zfree(job); | |
177 | ||
178 | /* Lock again before reiterating the loop, if there are no longer | |
179 | * jobs to process we'll block again in pthread_cond_wait(). */ | |
180 | pthread_mutex_lock(&bio_mutex[type]); | |
181 | listDelNode(bio_jobs[type],ln); | |
182 | bio_pending[type]--; | |
183 | } | |
184 | } | |
185 | ||
186 | /* Return the number of pending jobs of the specified type. */ | |
187 | unsigned long long bioPendingJobsOfType(int type) { | |
188 | unsigned long long val; | |
189 | pthread_mutex_lock(&bio_mutex[type]); | |
190 | val = bio_pending[type]; | |
191 | pthread_mutex_unlock(&bio_mutex[type]); | |
192 | return val; | |
193 | } | |
194 | ||
195 | #if 0 /* We don't use the following code for now, and bioWaitPendingJobsLE | |
196 | probably needs a rewrite using condition variables instead of the | |
197 | current implementation. */ | |
198 | ||
199 | ||
/* Block until the number of pending jobs of the specified type is less than
 * or equal to 'num'.
 *
 * This function may block for a long time, so it should only be used to
 * perform the following tasks:
 *
 * 1) To avoid that the main thread pushes jobs of a given type so fast
 *    that the background thread can't process them at the same speed.
 *    So before creating a new job of a given type the main thread should
 *    call something like: bioWaitPendingJobsLE(job_type,10000);
 * 2) In order to perform special operations that make it necessary to be
 *    sure no one is touching shared resources in the background.
 *
 * The pending count is polled: the first 1000 polls happen back to back,
 * polls from the 1001st to the 10000th are each preceded by a 100
 * microseconds sleep, and later ones by a 1000 microseconds sleep. */
void bioWaitPendingJobsLE(int type, unsigned long long num) {
    unsigned long long polls = 0;

    do {
        polls++;
        if (polls > 10000) {
            usleep(1000);
        } else if (polls > 1000) {
            usleep(100);
        }
    } while (bioPendingJobsOfType(type) > num);
}
228 | ||
229 | /* Return the older job of the specified type. */ | |
230 | time_t bioOlderJobOfType(int type) { | |
231 | time_t time; | |
232 | listNode *ln; | |
233 | struct bio_job *job; | |
234 | ||
235 | pthread_mutex_lock(&bio_mutex[type]); | |
236 | ln = listFirst(bio_jobs[type]); | |
237 | if (ln == NULL) { | |
238 | pthread_mutex_unlock(&bio_mutex[type]); | |
239 | return 0; | |
240 | } | |
241 | job = ln->value; | |
242 | time = job->time; | |
243 | pthread_mutex_unlock(&bio_mutex[type]); | |
244 | return time; | |
245 | } | |
246 | ||
247 | #endif |