]>
Commit | Line | Data |
---|---|---|
1 | /* Background I/O service for Redis. | |
2 | * | |
3 | * This file implements operations that we need to perform in the background. | |
4 | * Currently two operations are implemented: a background close(2) and a | |
5 | * background fsync (aof_fsync). The close is needed because when the process | |
6 | * is the last owner of a reference to a file, closing it means unlinking it, | |
7 | * and the deletion of the file is slow, blocking the server. | |
8 | * | |
9 | * In the future we'll either continue implementing new things we need or | |
10 | * we'll switch to libeio. However there are probably long term uses for this | |
11 | * file as we may want to put here Redis specific background tasks (for instance | |
12 | * it is not impossible that we'll need a non blocking FLUSHDB/FLUSHALL | |
13 | * implementation). | |
14 | * | |
15 | * DESIGN | |
16 | * ------ | |
17 | * | |
18 | * The design is trivial, we have a structure representing a job to perform | |
19 | * and a different thread and job queue for every job type. | |
20 | * Every thread waits for new jobs in its queue, and processes every job | |
21 | * sequentially. | |
22 | * | |
23 | * Jobs of the same type are guaranteed to be processed from the least | |
24 | * recently inserted to the most recently inserted (older jobs processed | |
25 | * first). | |
26 | * | |
27 | * Currently there is no way for the creator of the job to be notified about | |
28 | * the completion of the operation, this will only be added when/if needed. | |
29 | */ | |
30 | ||
31 | #include "redis.h" | |
32 | #include "bio.h" | |
33 | ||
34 | static pthread_mutex_t bio_mutex[REDIS_BIO_NUM_OPS]; | |
35 | static pthread_cond_t bio_condvar[REDIS_BIO_NUM_OPS]; | |
36 | static list *bio_jobs[REDIS_BIO_NUM_OPS]; | |
37 | /* The following array is used to hold the number of pending jobs for every | |
38 | * OP type. This allows us to export the bioPendingJobsOfType() API that is | |
39 | * useful when the main thread wants to perform some operation that may involve | |
40 | * objects shared with the background thread. The main thread will just wait | |
41 | * that there are no longer jobs of this type to be executed before performing | |
42 | * the sensible operation. This data is also useful for reporting. */ | |
43 | static unsigned long long bio_pending[REDIS_BIO_NUM_OPS]; | |
44 | ||
/* A single background job. The structure is private to this file: the API
 * does not expose the internals at all. */
struct bio_job {
    /* Time at which the job was created. */
    time_t time;
    /* Job specific argument pointers. When more than three arguments are
     * needed, pass a pointer to a structure or alike in one of them. */
    void *arg1;
    void *arg2;
    void *arg3;
};
53 | ||
/* Thread entry point, defined later in this file. The argument carries the
 * job type the thread is responsible for. */
void *bioProcessBackgroundJobs(void *arg);

/* Minimum stack size (4 MB) requested for the background threads, so that
 * they have enough stack to perform all the things we do in the main
 * thread. */
#define REDIS_THREAD_STACK_SIZE (4*1024*1024)
59 | ||
60 | /* Initialize the background system, spawning the thread. */ | |
61 | void bioInit(void) { | |
62 | pthread_attr_t attr; | |
63 | pthread_t thread; | |
64 | size_t stacksize; | |
65 | int j; | |
66 | ||
67 | /* Initialization of state vars and objects */ | |
68 | for (j = 0; j < REDIS_BIO_NUM_OPS; j++) { | |
69 | pthread_mutex_init(&bio_mutex[j],NULL); | |
70 | pthread_cond_init(&bio_condvar[j],NULL); | |
71 | bio_jobs[j] = listCreate(); | |
72 | bio_pending[j] = 0; | |
73 | } | |
74 | ||
75 | /* Set the stack size as by default it may be small in some system */ | |
76 | pthread_attr_init(&attr); | |
77 | pthread_attr_getstacksize(&attr,&stacksize); | |
78 | if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */ | |
79 | while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2; | |
80 | pthread_attr_setstacksize(&attr, stacksize); | |
81 | ||
82 | /* Ready to spawn our threads. We use the single argument the thread | |
83 | * function accepts in order to pass the job ID the thread is | |
84 | * responsible of. */ | |
85 | for (j = 0; j < REDIS_BIO_NUM_OPS; j++) { | |
86 | void *arg = (void*)(unsigned long) j; | |
87 | if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) { | |
88 | redisLog(REDIS_WARNING,"Fatal: Can't initialize Background Jobs."); | |
89 | exit(1); | |
90 | } | |
91 | } | |
92 | } | |
93 | ||
94 | void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) { | |
95 | struct bio_job *job = zmalloc(sizeof(*job)); | |
96 | ||
97 | job->time = time(NULL); | |
98 | job->arg1 = arg1; | |
99 | job->arg2 = arg2; | |
100 | job->arg3 = arg3; | |
101 | pthread_mutex_lock(&bio_mutex[type]); | |
102 | listAddNodeTail(bio_jobs[type],job); | |
103 | bio_pending[type]++; | |
104 | pthread_cond_signal(&bio_condvar[type]); | |
105 | pthread_mutex_unlock(&bio_mutex[type]); | |
106 | } | |
107 | ||
108 | void *bioProcessBackgroundJobs(void *arg) { | |
109 | struct bio_job *job; | |
110 | unsigned long type = (unsigned long) arg; | |
111 | sigset_t sigset; | |
112 | ||
113 | pthread_detach(pthread_self()); | |
114 | pthread_mutex_lock(&bio_mutex[type]); | |
115 | /* Block SIGALRM so we are sure that only the main thread will | |
116 | * receive the watchdog signal. */ | |
117 | sigemptyset(&sigset); | |
118 | sigaddset(&sigset, SIGALRM); | |
119 | if (pthread_sigmask(SIG_BLOCK, &sigset, NULL)) | |
120 | redisLog(REDIS_WARNING, | |
121 | "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno)); | |
122 | ||
123 | while(1) { | |
124 | listNode *ln; | |
125 | ||
126 | /* The loop always starts with the lock hold. */ | |
127 | if (listLength(bio_jobs[type]) == 0) { | |
128 | pthread_cond_wait(&bio_condvar[type],&bio_mutex[type]); | |
129 | continue; | |
130 | } | |
131 | /* Pop the job from the queue. */ | |
132 | ln = listFirst(bio_jobs[type]); | |
133 | job = ln->value; | |
134 | /* It is now possible to unlock the background system as we know have | |
135 | * a stand alone job structure to process.*/ | |
136 | pthread_mutex_unlock(&bio_mutex[type]); | |
137 | ||
138 | /* Process the job accordingly to its type. */ | |
139 | if (type == REDIS_BIO_CLOSE_FILE) { | |
140 | close((long)job->arg1); | |
141 | } else if (type == REDIS_BIO_AOF_FSYNC) { | |
142 | aof_fsync((long)job->arg1); | |
143 | } else { | |
144 | redisPanic("Wrong job type in bioProcessBackgroundJobs()."); | |
145 | } | |
146 | zfree(job); | |
147 | ||
148 | /* Lock again before reiterating the loop, if there are no longer | |
149 | * jobs to process we'll block again in pthread_cond_wait(). */ | |
150 | pthread_mutex_lock(&bio_mutex[type]); | |
151 | listDelNode(bio_jobs[type],ln); | |
152 | bio_pending[type]--; | |
153 | } | |
154 | } | |
155 | ||
156 | /* Return the number of pending jobs of the specified type. */ | |
157 | unsigned long long bioPendingJobsOfType(int type) { | |
158 | unsigned long long val; | |
159 | pthread_mutex_lock(&bio_mutex[type]); | |
160 | val = bio_pending[type]; | |
161 | pthread_mutex_unlock(&bio_mutex[type]); | |
162 | return val; | |
163 | } | |
164 | ||
165 | #if 0 /* We don't use the following code for now, and bioWaitPendingJobsLE | |
166 | probably needs a rewrite using condition variables instead of the | |
167 | current implementation. */ | |
168 | ||
169 | ||
/* Block until the number of pending jobs of the specified type is less
 * than or equal to 'num'.
 *
 * This function may block for a long time, so it should only be used for
 * the following tasks:
 *
 * 1) To prevent the main thread from pushing jobs of a given type so fast
 *    that the background thread can't process them at the same speed.
 *    So before creating a new job of a given type the main thread should
 *    call something like: bioWaitPendingJobsLE(job_type,10000);
 * 2) To perform special operations that make it necessary to be sure
 *    no one is touching shared resources in the background.
 *
 * The pending counter is polled: the first 1000 checks are performed back
 * to back, checks up to the 10000th are preceded by usleep(100), and every
 * later check by usleep(1000). */
void bioWaitPendingJobsLE(int type, unsigned long long num) {
    unsigned long long attempts;

    for (attempts = 1; ; attempts++) {
        if (attempts > 10000) {
            usleep(1000);
        } else if (attempts > 1000) {
            usleep(100);
        }
        if (bioPendingJobsOfType(type) <= num) return;
    }
}
198 | ||
199 | /* Return the older job of the specified type. */ | |
200 | time_t bioOlderJobOfType(int type) { | |
201 | time_t time; | |
202 | listNode *ln; | |
203 | struct bio_job *job; | |
204 | ||
205 | pthread_mutex_lock(&bio_mutex[type]); | |
206 | ln = listFirst(bio_jobs[type]); | |
207 | if (ln == NULL) { | |
208 | pthread_mutex_unlock(&bio_mutex[type]); | |
209 | return 0; | |
210 | } | |
211 | job = ln->value; | |
212 | time = job->time; | |
213 | pthread_mutex_unlock(&bio_mutex[type]); | |
214 | return time; | |
215 | } | |
216 | ||
217 | #endif |