]>
Commit | Line | Data |
---|---|---|
1 | /* Background I/O service for Redis. | |
2 | * | |
3 | * This file implements operations that we need to perform in the background. | |
4 | * Currently there is only a single operation, that is a background close(2) | |
5 | * system call. This is needed as when the process is the last owner of a | |
6 | * reference to a file closing it means unlinking it, and the deletion of the | |
7 | * file is slow, blocking the server. | |
8 | * | |
9 | * In the future we'll either continue implementing new things we need or | |
10 | * we'll switch to libeio. However there are probably long term uses for this | |
11 | * file as we may want to put here Redis specific background tasks (for instance | |
12 | * it is not impossible that we'll need a non blocking FLUSHDB/FLUSHALL | |
13 | * implementation). | |
14 | * | |
15 | * DESIGN | |
16 | * ------ | |
17 | * | |
18 | * The design is trivial, we have a structure representing a job to perform | |
19 | * and a different thread and job queue for every job type. | |
20 | * Every thread wait for new jobs in its queue, and process every job | |
21 | * sequentially. | |
22 | * | |
23 | * Jobs of the same type are guaranteed to be processed from the least | |
24 | * recently inserted to the most recently inserted (older jobs processed | |
25 | * first). | |
26 | * | |
27 | * Currently there is no way for the creator of the job to be notified about | |
28 | * the completion of the operation, this will only be added when/if needed. | |
29 | */ | |
30 | ||
31 | #include "redis.h" | |
32 | #include "bio.h" | |
33 | ||
34 | static pthread_mutex_t bio_mutex[REDIS_BIO_NUM_OPS]; | |
35 | static pthread_cond_t bio_condvar[REDIS_BIO_NUM_OPS]; | |
36 | static list *bio_jobs[REDIS_BIO_NUM_OPS]; | |
37 | /* The following array is used to hold the number of pending jobs for every | |
38 | * OP type. This allows us to export the bioPendingJobsOfType() API that is | |
39 | * useful when the main thread wants to perform some operation that may involve | |
40 | * objects shared with the background thread. The main thread will just wait | |
41 | * that there are no longer jobs of this type to be executed before performing | |
42 | * the sensible operation. This data is also useful for reporting. */ | |
43 | static unsigned long long bio_pending[REDIS_BIO_NUM_OPS]; | |
44 | ||
45 | /* This structure represents a background Job. It is only used locally to this | |
46 | * file as the API deos not expose the internals at all. */ | |
47 | struct bio_job { | |
48 | time_t time; /* Time at which the job was created. */ | |
49 | /* Job specific arguments pointers. If we need to pass more than three | |
50 | * arguments we can just pass a pointer to a structure or alike. */ | |
51 | void *arg1, *arg2, *arg3; | |
52 | }; | |
53 | ||
54 | void *bioProcessBackgroundJobs(void *arg); | |
55 | ||
56 | /* Make sure we have enough stack to perform all the things we do in the | |
57 | * main thread. */ | |
58 | #define REDIS_THREAD_STACK_SIZE (1024*1024*4) | |
59 | ||
60 | /* Initialize the background system, spawning the thread. */ | |
61 | void bioInit(void) { | |
62 | pthread_attr_t attr; | |
63 | pthread_t thread; | |
64 | size_t stacksize; | |
65 | int j; | |
66 | ||
67 | /* Initialization of state vars and objects */ | |
68 | for (j = 0; j < REDIS_BIO_NUM_OPS; j++) { | |
69 | pthread_mutex_init(&bio_mutex[j],NULL); | |
70 | pthread_cond_init(&bio_condvar[j],NULL); | |
71 | bio_jobs[j] = listCreate(); | |
72 | bio_pending[j] = 0; | |
73 | } | |
74 | ||
75 | /* Set the stack size as by default it may be small in some system */ | |
76 | pthread_attr_init(&attr); | |
77 | pthread_attr_getstacksize(&attr,&stacksize); | |
78 | if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */ | |
79 | while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2; | |
80 | pthread_attr_setstacksize(&attr, stacksize); | |
81 | ||
82 | /* Ready to spawn our threads. We use the single argument the thread | |
83 | * function accepts in order to pass the job ID the thread is | |
84 | * responsible of. */ | |
85 | for (j = 0; j < REDIS_BIO_NUM_OPS; j++) { | |
86 | void *arg = (void*)(unsigned long) j; | |
87 | if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) { | |
88 | redisLog(REDIS_WARNING,"Fatal: Can't initialize Background Jobs."); | |
89 | exit(1); | |
90 | } | |
91 | } | |
92 | } | |
93 | ||
94 | void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) { | |
95 | struct bio_job *job = zmalloc(sizeof(*job)); | |
96 | ||
97 | job->time = time(NULL); | |
98 | job->arg1 = arg1; | |
99 | job->arg2 = arg2; | |
100 | job->arg3 = arg3; | |
101 | pthread_mutex_lock(&bio_mutex[type]); | |
102 | listAddNodeTail(bio_jobs[type],job); | |
103 | bio_pending[type]++; | |
104 | pthread_cond_signal(&bio_condvar[type]); | |
105 | pthread_mutex_unlock(&bio_mutex[type]); | |
106 | } | |
107 | ||
108 | void *bioProcessBackgroundJobs(void *arg) { | |
109 | struct bio_job *job; | |
110 | unsigned long type = (unsigned long) arg; | |
111 | ||
112 | pthread_detach(pthread_self()); | |
113 | pthread_mutex_lock(&bio_mutex[type]); | |
114 | while(1) { | |
115 | listNode *ln; | |
116 | ||
117 | /* The loop always starts with the lock hold. */ | |
118 | if (listLength(bio_jobs[type]) == 0) { | |
119 | pthread_cond_wait(&bio_condvar[type],&bio_mutex[type]); | |
120 | continue; | |
121 | } | |
122 | /* Pop the job from the queue. */ | |
123 | ln = listFirst(bio_jobs[type]); | |
124 | job = ln->value; | |
125 | /* It is now possible to unlock the background system as we know have | |
126 | * a stand alone job structure to process.*/ | |
127 | pthread_mutex_unlock(&bio_mutex[type]); | |
128 | ||
129 | /* Process the job accordingly to its type. */ | |
130 | if (type == REDIS_BIO_CLOSE_FILE) { | |
131 | close((long)job->arg1); | |
132 | } else { | |
133 | redisPanic("Wrong job type in bioProcessBackgroundJobs()."); | |
134 | } | |
135 | zfree(job); | |
136 | ||
137 | /* Lock again before reiterating the loop, if there are no longer | |
138 | * jobs to process we'll block again in pthread_cond_wait(). */ | |
139 | pthread_mutex_lock(&bio_mutex[type]); | |
140 | listDelNode(bio_jobs[type],ln); | |
141 | bio_pending[type]--; | |
142 | } | |
143 | } | |
144 | ||
145 | /* Return the number of pending jobs of the specified type. */ | |
146 | unsigned long long bioPendingJobsOfType(int type) { | |
147 | unsigned long long val; | |
148 | pthread_mutex_lock(&bio_mutex[type]); | |
149 | val = bio_pending[type]; | |
150 | pthread_mutex_unlock(&bio_mutex[type]); | |
151 | return val; | |
152 | } | |
153 | ||
154 | /* Wait until the number of pending jobs of the specified type are | |
155 | * less or equal to the specified number. | |
156 | * | |
157 | * This function may block for long time, it should only be used to perform | |
158 | * the following tasks: | |
159 | * | |
160 | * 1) To avoid that the main thread is pushing jobs of a given time so fast | |
161 | * that the background thread can't process them at the same speed. | |
162 | * So before creating a new job of a given type the main thread should | |
163 | * call something like: bioWaitPendingJobsLE(job_type,10000); | |
164 | * 2) In order to perform special operations that make it necessary to be sure | |
165 | * no one is touching shared resourced in the background. | |
166 | */ | |
167 | void bioWaitPendingJobsLE(int type, unsigned long long num) { | |
168 | unsigned long long iteration = 0; | |
169 | ||
170 | /* We poll the jobs queue aggressively to start, and gradually relax | |
171 | * the polling speed if it is going to take too much time. */ | |
172 | while(1) { | |
173 | iteration++; | |
174 | if (iteration > 1000 && iteration <= 10000) { | |
175 | usleep(100); | |
176 | } else if (iteration > 10000) { | |
177 | usleep(1000); | |
178 | } | |
179 | if (bioPendingJobsOfType(type) <= num) break; | |
180 | } | |
181 | } | |
182 | ||
183 | /* Return the older job of the specified type. */ | |
184 | time_t bioOlderJobOfType(int type) { | |
185 | time_t time; | |
186 | listNode *ln; | |
187 | struct bio_job *job; | |
188 | ||
189 | pthread_mutex_lock(&bio_mutex[type]); | |
190 | ln = listFirst(bio_jobs[type]); | |
191 | if (ln == NULL) { | |
192 | pthread_mutex_unlock(&bio_mutex[type]); | |
193 | return 0; | |
194 | } | |
195 | job = ln->value; | |
196 | time = job->time; | |
197 | pthread_mutex_unlock(&bio_mutex[type]); | |
198 | return time; | |
199 | } | |
200 |