]>
Commit | Line | Data |
---|---|---|
02925dd9 | 1 | /* Background I/O service for Redis. |
2 | * | |
3 | * This file implements operations that we need to perform in the background. | |
4 | * Currently there are two operations: a background close(2) system call and | |
5 | * a background AOF fsync. The close(2) is needed because when the process is | |
6 | * the last owner of a reference to a file, closing it means unlinking it, and | |
7 | * the deletion of the file is slow, blocking the server. | |
8 | * | |
9 | * In the future we'll either continue implementing new things we need or | |
10 | * we'll switch to libeio. However there are probably long term uses for this | |
11 | * file as we may want to put here Redis specific background tasks (for instance | |
12 | * it is not impossible that we'll need a non blocking FLUSHDB/FLUSHALL | |
13 | * implementation). | |
14 | * | |
15 | * DESIGN | |
16 | * ------ | |
17 | * | |
18 | * The design is trivial, we have a structure representing a job to perform | |
50be9b97 | 19 | * and a different thread and job queue for every job type. |
20 | * Every thread waits for new jobs in its queue, and processes every job | |
21 | * sequentially. | |
22 | * | |
fbb23ce4 | 23 | * Jobs of the same type are guaranteed to be processed from the least |
24 | * recently inserted to the most recently inserted (older jobs processed | |
25 | * first). | |
26 | * | |
02925dd9 | 27 | * Currently there is no way for the creator of the job to be notified about |
28 | * the completion of the operation, this will only be added when/if needed. | |
4365e5b2 | 29 | * |
30 | * ---------------------------------------------------------------------------- | |
31 | * | |
32 | * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com> | |
33 | * All rights reserved. | |
34 | * | |
35 | * Redistribution and use in source and binary forms, with or without | |
36 | * modification, are permitted provided that the following conditions are met: | |
37 | * | |
38 | * * Redistributions of source code must retain the above copyright notice, | |
39 | * this list of conditions and the following disclaimer. | |
40 | * * Redistributions in binary form must reproduce the above copyright | |
41 | * notice, this list of conditions and the following disclaimer in the | |
42 | * documentation and/or other materials provided with the distribution. | |
43 | * * Neither the name of Redis nor the names of its contributors may be used | |
44 | * to endorse or promote products derived from this software without | |
45 | * specific prior written permission. | |
46 | * | |
47 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" | |
48 | * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE | |
49 | * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE | |
50 | * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE | |
51 | * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR | |
52 | * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF | |
53 | * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS | |
54 | * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN | |
55 | * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) | |
56 | * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE | |
57 | * POSSIBILITY OF SUCH DAMAGE. | |
02925dd9 | 58 | */ |
59 | ||
4365e5b2 | 60 | |
02925dd9 | 61 | #include "redis.h" |
62 | #include "bio.h" | |
63 | ||
50be9b97 | 64 | static pthread_mutex_t bio_mutex[REDIS_BIO_NUM_OPS]; |
65 | static pthread_cond_t bio_condvar[REDIS_BIO_NUM_OPS]; | |
66 | static list *bio_jobs[REDIS_BIO_NUM_OPS]; | |
fde4e4c4 | 67 | /* The following array is used to hold the number of pending jobs for every |
68 | * OP type. This allows us to export the bioPendingJobsOfType() API that is | |
69 | * useful when the main thread wants to perform some operation that may involve | |
70 | * objects shared with the background thread. The main thread will just wait | |
71 | * that there are no longer jobs of this type to be executed before performing | |
72 | * the sensible operation. This data is also useful for reporting. */ | |
50be9b97 | 73 | static unsigned long long bio_pending[REDIS_BIO_NUM_OPS]; |
02925dd9 | 74 | |
/* Descriptor of a single background job. It is only used locally to this
 * file, as the API does not expose the internals at all. */
struct bio_job {
    /* Time at which the job was created. */
    time_t time;
    /* Job specific argument pointers. A job needing more than three
     * arguments can pass a pointer to a structure or alike. */
    void *arg1;
    void *arg2;
    void *arg3;
};
02925dd9 | 83 | |
void *bioProcessBackgroundJobs(void *arg);

/* Minimum stack size of the background threads: it must be enough to perform
 * all the things we do in the main thread. */
#define REDIS_THREAD_STACK_SIZE (4*1024*1024)
89 | ||
02925dd9 | 90 | /* Initialize the background system, spawning the thread. */ |
91 | void bioInit(void) { | |
92 | pthread_attr_t attr; | |
93 | pthread_t thread; | |
94 | size_t stacksize; | |
fde4e4c4 | 95 | int j; |
02925dd9 | 96 | |
fde4e4c4 | 97 | /* Initialization of state vars and objects */ |
50be9b97 | 98 | for (j = 0; j < REDIS_BIO_NUM_OPS; j++) { |
99 | pthread_mutex_init(&bio_mutex[j],NULL); | |
100 | pthread_cond_init(&bio_condvar[j],NULL); | |
101 | bio_jobs[j] = listCreate(); | |
102 | bio_pending[j] = 0; | |
103 | } | |
02925dd9 | 104 | |
105 | /* Set the stack size as by default it may be small in some system */ | |
106 | pthread_attr_init(&attr); | |
f81a5f54 | 107 | pthread_attr_getstacksize(&attr,&stacksize); |
02925dd9 | 108 | if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */ |
109 | while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2; | |
110 | pthread_attr_setstacksize(&attr, stacksize); | |
111 | ||
50be9b97 | 112 | /* Ready to spawn our threads. We use the single argument the thread |
113 | * function accepts in order to pass the job ID the thread is | |
114 | * responsible of. */ | |
115 | for (j = 0; j < REDIS_BIO_NUM_OPS; j++) { | |
116 | void *arg = (void*)(unsigned long) j; | |
117 | if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) { | |
118 | redisLog(REDIS_WARNING,"Fatal: Can't initialize Background Jobs."); | |
119 | exit(1); | |
120 | } | |
02925dd9 | 121 | } |
122 | } | |
123 | ||
50be9b97 | 124 | void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) { |
02925dd9 | 125 | struct bio_job *job = zmalloc(sizeof(*job)); |
126 | ||
50be9b97 | 127 | job->time = time(NULL); |
128 | job->arg1 = arg1; | |
129 | job->arg2 = arg2; | |
130 | job->arg3 = arg3; | |
131 | pthread_mutex_lock(&bio_mutex[type]); | |
132 | listAddNodeTail(bio_jobs[type],job); | |
fde4e4c4 | 133 | bio_pending[type]++; |
50be9b97 | 134 | pthread_cond_signal(&bio_condvar[type]); |
135 | pthread_mutex_unlock(&bio_mutex[type]); | |
02925dd9 | 136 | } |
137 | ||
138 | void *bioProcessBackgroundJobs(void *arg) { | |
139 | struct bio_job *job; | |
50be9b97 | 140 | unsigned long type = (unsigned long) arg; |
aa96122d | 141 | sigset_t sigset; |
02925dd9 | 142 | |
143 | pthread_detach(pthread_self()); | |
50be9b97 | 144 | pthread_mutex_lock(&bio_mutex[type]); |
aa96122d | 145 | /* Block SIGALRM so we are sure that only the main thread will |
146 | * receive the watchdog signal. */ | |
147 | sigemptyset(&sigset); | |
148 | sigaddset(&sigset, SIGALRM); | |
149 | if (pthread_sigmask(SIG_BLOCK, &sigset, NULL)) | |
150 | redisLog(REDIS_WARNING, | |
151 | "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno)); | |
152 | ||
02925dd9 | 153 | while(1) { |
154 | listNode *ln; | |
155 | ||
156 | /* The loop always starts with the lock hold. */ | |
50be9b97 | 157 | if (listLength(bio_jobs[type]) == 0) { |
158 | pthread_cond_wait(&bio_condvar[type],&bio_mutex[type]); | |
02925dd9 | 159 | continue; |
160 | } | |
161 | /* Pop the job from the queue. */ | |
50be9b97 | 162 | ln = listFirst(bio_jobs[type]); |
02925dd9 | 163 | job = ln->value; |
02925dd9 | 164 | /* It is now possible to unlock the background system as we know have |
165 | * a stand alone job structure to process.*/ | |
50be9b97 | 166 | pthread_mutex_unlock(&bio_mutex[type]); |
02925dd9 | 167 | |
168 | /* Process the job accordingly to its type. */ | |
fde4e4c4 | 169 | if (type == REDIS_BIO_CLOSE_FILE) { |
50be9b97 | 170 | close((long)job->arg1); |
9fc1e1b1 | 171 | } else if (type == REDIS_BIO_AOF_FSYNC) { |
a60b397b | 172 | aof_fsync((long)job->arg1); |
02925dd9 | 173 | } else { |
174 | redisPanic("Wrong job type in bioProcessBackgroundJobs()."); | |
175 | } | |
176 | zfree(job); | |
177 | ||
178 | /* Lock again before reiterating the loop, if there are no longer | |
179 | * jobs to process we'll block again in pthread_cond_wait(). */ | |
50be9b97 | 180 | pthread_mutex_lock(&bio_mutex[type]); |
1317b7c2 | 181 | listDelNode(bio_jobs[type],ln); |
fde4e4c4 | 182 | bio_pending[type]--; |
183 | } | |
184 | } | |
185 | ||
186 | /* Return the number of pending jobs of the specified type. */ | |
187 | unsigned long long bioPendingJobsOfType(int type) { | |
188 | unsigned long long val; | |
50be9b97 | 189 | pthread_mutex_lock(&bio_mutex[type]); |
fde4e4c4 | 190 | val = bio_pending[type]; |
50be9b97 | 191 | pthread_mutex_unlock(&bio_mutex[type]); |
fde4e4c4 | 192 | return val; |
193 | } | |
194 | ||
47e7f9ac | 195 | #if 0 /* We don't use the following code for now, and bioWaitPendingJobsLE |
196 | probably needs a rewrite using condition variables instead of the | |
197 | current implementation. */ | |
198 | ||
199 | ||
/* Wait until the number of pending jobs of the specified type is
 * less than or equal to the specified number.
 *
 * This function may block for a long time, it should only be used to perform
 * the following tasks:
 *
 * 1) To avoid that the main thread is pushing jobs of a given type so fast
 *    that the background thread can't process them at the same speed.
 *    So before creating a new job of a given type the main thread should
 *    call something like: bioWaitPendingJobsLE(job_type,10000);
 * 2) In order to perform special operations that make it necessary to be sure
 *    no one is touching shared resources in the background.
 */
void bioWaitPendingJobsLE(int type, unsigned long long num) {
    unsigned long long attempts = 0;

    /* The first 1000 checks are performed back to back, the following ones
     * up to the 10000th are preceded by a 100 microseconds pause, and from
     * there on every check is preceded by a 1000 microseconds pause. */
    do {
        attempts++;
        if (attempts > 10000) {
            usleep(1000);
        } else if (attempts > 1000) {
            usleep(100);
        }
    } while (bioPendingJobsOfType(type) > num);
}
50be9b97 | 228 | |
229 | /* Return the older job of the specified type. */ | |
230 | time_t bioOlderJobOfType(int type) { | |
231 | time_t time; | |
232 | listNode *ln; | |
233 | struct bio_job *job; | |
234 | ||
235 | pthread_mutex_lock(&bio_mutex[type]); | |
236 | ln = listFirst(bio_jobs[type]); | |
b39a4d0b | 237 | if (ln == NULL) { |
238 | pthread_mutex_unlock(&bio_mutex[type]); | |
239 | return 0; | |
240 | } | |
50be9b97 | 241 | job = ln->value; |
242 | time = job->time; | |
243 | pthread_mutex_unlock(&bio_mutex[type]); | |
244 | return time; | |
245 | } | |
246 | ||
47e7f9ac | 247 | #endif |