02925dd9 |
1 | /* Background I/O service for Redis. |
2 | * |
3 | * This file implements operations that we need to perform in the background. |
4 | * Currently two operations are implemented: a background close(2) system call |
5 | * and a background AOF fsync. The close is needed because when the process is |
6 | * the last owner of a reference to a file, closing it means unlinking it, and |
7 | * the deletion of the file is slow, blocking the server. |
8 | * |
9 | * In the future we'll either continue implementing new things we need or |
10 | * we'll switch to libeio. However there are probably long term uses for this |
11 | * file as we may want to put here Redis specific background tasks (for instance |
12 | * it is not impossible that we'll need a non blocking FLUSHDB/FLUSHALL |
13 | * implementation). |
14 | * |
15 | * DESIGN |
16 | * ------ |
17 | * |
18 | * The design is trivial, we have a structure representing a job to perform |
50be9b97 |
19 | * and a different thread and job queue for every job type. |
20 | * Every thread waits for new jobs in its queue, and processes every job |
21 | * sequentially. |
22 | * |
fbb23ce4 |
23 | * Jobs of the same type are guaranteed to be processed from the least |
24 | * recently inserted to the most recently inserted (older jobs processed |
25 | * first). |
26 | * |
02925dd9 |
27 | * Currently there is no way for the creator of the job to be notified about |
28 | * the completion of the operation, this will only be added when/if needed. |
29 | */ |
30 | |
31 | #include "redis.h" |
32 | #include "bio.h" |
33 | |
50be9b97 |
34 | static pthread_mutex_t bio_mutex[REDIS_BIO_NUM_OPS]; |
35 | static pthread_cond_t bio_condvar[REDIS_BIO_NUM_OPS]; |
36 | static list *bio_jobs[REDIS_BIO_NUM_OPS]; |
fde4e4c4 |
37 | /* The following array is used to hold the number of pending jobs for every |
38 | * OP type. This allows us to export the bioPendingJobsOfType() API that is |
39 | * useful when the main thread wants to perform some operation that may involve |
40 | * objects shared with the background thread. The main thread will just wait |
41 | * that there are no longer jobs of this type to be executed before performing |
42 | * the sensible operation. This data is also useful for reporting. */ |
50be9b97 |
43 | static unsigned long long bio_pending[REDIS_BIO_NUM_OPS]; |
02925dd9 |
44 | |
/* A single background job. The structure is private to this file: the API
 * does not expose the internals at all. */
struct bio_job {
    /* Time at which the job was created. */
    time_t time;
    /* Job specific argument pointers. If more than three arguments are
     * needed, pass a pointer to a structure or alike in one of them. */
    void *arg1;
    void *arg2;
    void *arg3;
};
02925dd9 |
53 | |
/* Thread entry point, defined later in this file. */
void *bioProcessBackgroundJobs(void *arg);

/* Minimum stack size, in bytes (4 MB), requested for the background threads,
 * so that they have enough stack to perform all the things we do in the
 * main thread. */
#define REDIS_THREAD_STACK_SIZE (4*1024*1024)
59 | |
02925dd9 |
60 | /* Initialize the background system, spawning the thread. */ |
61 | void bioInit(void) { |
62 | pthread_attr_t attr; |
63 | pthread_t thread; |
64 | size_t stacksize; |
fde4e4c4 |
65 | int j; |
02925dd9 |
66 | |
fde4e4c4 |
67 | /* Initialization of state vars and objects */ |
50be9b97 |
68 | for (j = 0; j < REDIS_BIO_NUM_OPS; j++) { |
69 | pthread_mutex_init(&bio_mutex[j],NULL); |
70 | pthread_cond_init(&bio_condvar[j],NULL); |
71 | bio_jobs[j] = listCreate(); |
72 | bio_pending[j] = 0; |
73 | } |
02925dd9 |
74 | |
75 | /* Set the stack size as by default it may be small in some system */ |
76 | pthread_attr_init(&attr); |
f81a5f54 |
77 | pthread_attr_getstacksize(&attr,&stacksize); |
02925dd9 |
78 | if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */ |
79 | while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2; |
80 | pthread_attr_setstacksize(&attr, stacksize); |
81 | |
50be9b97 |
82 | /* Ready to spawn our threads. We use the single argument the thread |
83 | * function accepts in order to pass the job ID the thread is |
84 | * responsible of. */ |
85 | for (j = 0; j < REDIS_BIO_NUM_OPS; j++) { |
86 | void *arg = (void*)(unsigned long) j; |
87 | if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) { |
88 | redisLog(REDIS_WARNING,"Fatal: Can't initialize Background Jobs."); |
89 | exit(1); |
90 | } |
02925dd9 |
91 | } |
92 | } |
93 | |
50be9b97 |
94 | void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) { |
02925dd9 |
95 | struct bio_job *job = zmalloc(sizeof(*job)); |
96 | |
50be9b97 |
97 | job->time = time(NULL); |
98 | job->arg1 = arg1; |
99 | job->arg2 = arg2; |
100 | job->arg3 = arg3; |
101 | pthread_mutex_lock(&bio_mutex[type]); |
102 | listAddNodeTail(bio_jobs[type],job); |
fde4e4c4 |
103 | bio_pending[type]++; |
50be9b97 |
104 | pthread_cond_signal(&bio_condvar[type]); |
105 | pthread_mutex_unlock(&bio_mutex[type]); |
02925dd9 |
106 | } |
107 | |
108 | void *bioProcessBackgroundJobs(void *arg) { |
109 | struct bio_job *job; |
50be9b97 |
110 | unsigned long type = (unsigned long) arg; |
02925dd9 |
111 | |
112 | pthread_detach(pthread_self()); |
50be9b97 |
113 | pthread_mutex_lock(&bio_mutex[type]); |
02925dd9 |
114 | while(1) { |
115 | listNode *ln; |
116 | |
117 | /* The loop always starts with the lock hold. */ |
50be9b97 |
118 | if (listLength(bio_jobs[type]) == 0) { |
119 | pthread_cond_wait(&bio_condvar[type],&bio_mutex[type]); |
02925dd9 |
120 | continue; |
121 | } |
122 | /* Pop the job from the queue. */ |
50be9b97 |
123 | ln = listFirst(bio_jobs[type]); |
02925dd9 |
124 | job = ln->value; |
02925dd9 |
125 | /* It is now possible to unlock the background system as we know have |
126 | * a stand alone job structure to process.*/ |
50be9b97 |
127 | pthread_mutex_unlock(&bio_mutex[type]); |
02925dd9 |
128 | |
129 | /* Process the job accordingly to its type. */ |
fde4e4c4 |
130 | if (type == REDIS_BIO_CLOSE_FILE) { |
50be9b97 |
131 | close((long)job->arg1); |
9fc1e1b1 |
132 | } else if (type == REDIS_BIO_AOF_FSYNC) { |
a60b397b |
133 | aof_fsync((long)job->arg1); |
02925dd9 |
134 | } else { |
135 | redisPanic("Wrong job type in bioProcessBackgroundJobs()."); |
136 | } |
137 | zfree(job); |
138 | |
139 | /* Lock again before reiterating the loop, if there are no longer |
140 | * jobs to process we'll block again in pthread_cond_wait(). */ |
50be9b97 |
141 | pthread_mutex_lock(&bio_mutex[type]); |
1317b7c2 |
142 | listDelNode(bio_jobs[type],ln); |
fde4e4c4 |
143 | bio_pending[type]--; |
144 | } |
145 | } |
146 | |
147 | /* Return the number of pending jobs of the specified type. */ |
148 | unsigned long long bioPendingJobsOfType(int type) { |
149 | unsigned long long val; |
50be9b97 |
150 | pthread_mutex_lock(&bio_mutex[type]); |
fde4e4c4 |
151 | val = bio_pending[type]; |
50be9b97 |
152 | pthread_mutex_unlock(&bio_mutex[type]); |
fde4e4c4 |
153 | return val; |
154 | } |
155 | |
47e7f9ac |
156 | #if 0 /* We don't use the following code for now, and bioWaitPendingJobsLE |
157 | probably needs a rewrite using condition variables instead of the |
158 | current implementation. */ |
159 | |
160 | |
fde4e4c4 |
/* Block until the number of pending jobs of the specified type is less than
 * or equal to 'num'.
 *
 * This function may block for a long time, so it should only be used for
 * the following tasks:
 *
 * 1) To avoid that the main thread pushes jobs of a given type so fast
 *    that the background thread can't process them at the same speed.
 *    So before creating a new job of a given type the main thread should
 *    call something like: bioWaitPendingJobsLE(job_type,10000);
 * 2) In order to perform special operations that make it necessary to be
 *    sure no one is touching shared resources in the background.
 */
void bioWaitPendingJobsLE(int type, unsigned long long num) {
    unsigned long long polls;

    /* The queue is polled aggressively at first, then the polling is
     * gradually relaxed if the wait is taking too long: no sleep for the
     * first 1000 polls, 100 microseconds before each poll up to the
     * 10000th, 1000 microseconds before each poll after that. */
    for (polls = 1; ; polls++) {
        if (polls > 10000) {
            usleep(1000);
        } else if (polls > 1000) {
            usleep(100);
        }
        if (bioPendingJobsOfType(type) <= num) return;
    }
}
50be9b97 |
189 | |
190 | /* Return the older job of the specified type. */ |
191 | time_t bioOlderJobOfType(int type) { |
192 | time_t time; |
193 | listNode *ln; |
194 | struct bio_job *job; |
195 | |
196 | pthread_mutex_lock(&bio_mutex[type]); |
197 | ln = listFirst(bio_jobs[type]); |
b39a4d0b |
198 | if (ln == NULL) { |
199 | pthread_mutex_unlock(&bio_mutex[type]); |
200 | return 0; |
201 | } |
50be9b97 |
202 | job = ln->value; |
203 | time = job->time; |
204 | pthread_mutex_unlock(&bio_mutex[type]); |
205 | return time; |
206 | } |
207 | |
47e7f9ac |
208 | #endif |