]>
Commit | Line | Data |
---|---|---|
02925dd9 | 1 | /* Background I/O service for Redis. |
2 | * | |
3 | * This file implements operations that we need to perform in the background. | |
4 | * Currently there is only a single operation, that is a background close(2) | |
5 | * system call. This is needed because when the process is the last owner of a | |
6 | * reference to a file, closing it means unlinking it, and the deletion of the | |
7 | * file is slow, blocking the server. | |
8 | * | |
9 | * In the future we'll either continue implementing new things we need or | |
10 | * we'll switch to libeio. However there are probably long term uses for this | |
11 | * file as we may want to put here Redis specific background tasks (for instance | |
12 | * it is not impossible that we'll need a non blocking FLUSHDB/FLUSHALL | |
13 | * implementation). | |
14 | * | |
15 | * DESIGN | |
16 | * ------ | |
17 | * | |
18 | * The design is trivial, we have a structure representing a job to perform | |
50be9b97 | 19 | * and a different thread and job queue for every job type. |
20 | * Every thread waits for new jobs in its queue, and processes every job | |
21 | * sequentially. | |
22 | * | |
02925dd9 | 23 | * Currently there is no way for the creator of the job to be notified about |
24 | * the completion of the operation, this will only be added when/if needed. | |
25 | */ | |
26 | ||
27 | #include "redis.h" | |
28 | #include "bio.h" | |
29 | ||
50be9b97 | 30 | static pthread_mutex_t bio_mutex[REDIS_BIO_NUM_OPS]; |
31 | static pthread_cond_t bio_condvar[REDIS_BIO_NUM_OPS]; | |
32 | static list *bio_jobs[REDIS_BIO_NUM_OPS]; | |
fde4e4c4 | 33 | /* The following array is used to hold the number of pending jobs for every |
34 | * OP type. This allows us to export the bioPendingJobsOfType() API that is | |
35 | * useful when the main thread wants to perform some operation that may involve | |
36 | * objects shared with the background thread. The main thread will just wait | |
37 | * that there are no longer jobs of this type to be executed before performing | |
38 | * the sensible operation. This data is also useful for reporting. */ | |
50be9b97 | 39 | static unsigned long long bio_pending[REDIS_BIO_NUM_OPS]; |
02925dd9 | 40 | |
/* A single background job. The type is private to this file: the public API
 * never exposes its fields. */
struct bio_job {
    time_t time;    /* Creation time of the job. */
    /* Opaque, job-specific arguments. A job that needs more than three
     * values can pass a pointer to a structure through one of them. */
    void *arg1;
    void *arg2;
    void *arg3;
};
02925dd9 | 49 | |
void *bioProcessBackgroundJobs(void *arg);

/* Minimum stack size (4 MiB) requested by bioInit() for the background job
 * threads, since the platform default thread stack may be too small for the
 * work they perform. */
#define REDIS_THREAD_STACK_SIZE (1024*1024*4)
55 | ||
02925dd9 | 56 | /* Initialize the background system, spawning the thread. */ |
57 | void bioInit(void) { | |
58 | pthread_attr_t attr; | |
59 | pthread_t thread; | |
60 | size_t stacksize; | |
fde4e4c4 | 61 | int j; |
02925dd9 | 62 | |
fde4e4c4 | 63 | /* Initialization of state vars and objects */ |
50be9b97 | 64 | for (j = 0; j < REDIS_BIO_NUM_OPS; j++) { |
65 | pthread_mutex_init(&bio_mutex[j],NULL); | |
66 | pthread_cond_init(&bio_condvar[j],NULL); | |
67 | bio_jobs[j] = listCreate(); | |
68 | bio_pending[j] = 0; | |
69 | } | |
02925dd9 | 70 | |
71 | /* Set the stack size as by default it may be small in some system */ | |
72 | pthread_attr_init(&attr); | |
f81a5f54 | 73 | pthread_attr_getstacksize(&attr,&stacksize); |
02925dd9 | 74 | if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */ |
75 | while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2; | |
76 | pthread_attr_setstacksize(&attr, stacksize); | |
77 | ||
50be9b97 | 78 | /* Ready to spawn our threads. We use the single argument the thread |
79 | * function accepts in order to pass the job ID the thread is | |
80 | * responsible of. */ | |
81 | for (j = 0; j < REDIS_BIO_NUM_OPS; j++) { | |
82 | void *arg = (void*)(unsigned long) j; | |
83 | if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) { | |
84 | redisLog(REDIS_WARNING,"Fatal: Can't initialize Background Jobs."); | |
85 | exit(1); | |
86 | } | |
02925dd9 | 87 | } |
88 | } | |
89 | ||
50be9b97 | 90 | void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) { |
02925dd9 | 91 | struct bio_job *job = zmalloc(sizeof(*job)); |
92 | ||
50be9b97 | 93 | job->time = time(NULL); |
94 | job->arg1 = arg1; | |
95 | job->arg2 = arg2; | |
96 | job->arg3 = arg3; | |
97 | pthread_mutex_lock(&bio_mutex[type]); | |
98 | listAddNodeTail(bio_jobs[type],job); | |
fde4e4c4 | 99 | bio_pending[type]++; |
50be9b97 | 100 | pthread_cond_signal(&bio_condvar[type]); |
101 | pthread_mutex_unlock(&bio_mutex[type]); | |
02925dd9 | 102 | } |
103 | ||
104 | void *bioProcessBackgroundJobs(void *arg) { | |
105 | struct bio_job *job; | |
50be9b97 | 106 | unsigned long type = (unsigned long) arg; |
02925dd9 | 107 | |
108 | pthread_detach(pthread_self()); | |
50be9b97 | 109 | pthread_mutex_lock(&bio_mutex[type]); |
02925dd9 | 110 | while(1) { |
111 | listNode *ln; | |
112 | ||
113 | /* The loop always starts with the lock hold. */ | |
50be9b97 | 114 | if (listLength(bio_jobs[type]) == 0) { |
115 | pthread_cond_wait(&bio_condvar[type],&bio_mutex[type]); | |
02925dd9 | 116 | continue; |
117 | } | |
118 | /* Pop the job from the queue. */ | |
50be9b97 | 119 | ln = listFirst(bio_jobs[type]); |
02925dd9 | 120 | job = ln->value; |
50be9b97 | 121 | listDelNode(bio_jobs[type],ln); |
02925dd9 | 122 | /* It is now possible to unlock the background system as we know have |
123 | * a stand alone job structure to process.*/ | |
50be9b97 | 124 | pthread_mutex_unlock(&bio_mutex[type]); |
02925dd9 | 125 | |
126 | /* Process the job accordingly to its type. */ | |
fde4e4c4 | 127 | if (type == REDIS_BIO_CLOSE_FILE) { |
50be9b97 | 128 | close((long)job->arg1); |
02925dd9 | 129 | } else { |
130 | redisPanic("Wrong job type in bioProcessBackgroundJobs()."); | |
131 | } | |
132 | zfree(job); | |
133 | ||
134 | /* Lock again before reiterating the loop, if there are no longer | |
135 | * jobs to process we'll block again in pthread_cond_wait(). */ | |
50be9b97 | 136 | pthread_mutex_lock(&bio_mutex[type]); |
fde4e4c4 | 137 | bio_pending[type]--; |
138 | } | |
139 | } | |
140 | ||
141 | /* Return the number of pending jobs of the specified type. */ | |
142 | unsigned long long bioPendingJobsOfType(int type) { | |
143 | unsigned long long val; | |
50be9b97 | 144 | pthread_mutex_lock(&bio_mutex[type]); |
fde4e4c4 | 145 | val = bio_pending[type]; |
50be9b97 | 146 | pthread_mutex_unlock(&bio_mutex[type]); |
fde4e4c4 | 147 | return val; |
148 | } | |
149 | ||
/* Block until at most 'num' jobs of the given type are pending.
 *
 * The call may block for a long time. It is meant for two situations:
 *
 * 1) Throttling: keeping the main thread from queueing jobs of a given type
 *    faster than the background thread can process them. Before creating a
 *    new job of that type the main thread would call something like:
 *    bioWaitPendingJobsLE(job_type,10000);
 * 2) Exclusive access: performing special operations that require no
 *    background thread to be touching shared resources.
 *
 * The pending count is polled with a growing back-off:
 * - the first 1000 checks happen back to back;
 * - checks 1001..10000 are each preceded by a 100 microsecond sleep;
 * - later checks are each preceded by a 1 millisecond sleep.
 */
void bioWaitPendingJobsLE(int type, unsigned long long num) {
    unsigned long long polls;

    for (polls = 1; ; polls++) {
        if (polls > 10000)
            usleep(1000);
        else if (polls > 1000)
            usleep(100);

        if (bioPendingJobsOfType(type) <= num)
            return;
    }
}
50be9b97 | 178 | |
179 | /* Return the older job of the specified type. */ | |
180 | time_t bioOlderJobOfType(int type) { | |
181 | time_t time; | |
182 | listNode *ln; | |
183 | struct bio_job *job; | |
184 | ||
185 | pthread_mutex_lock(&bio_mutex[type]); | |
186 | ln = listFirst(bio_jobs[type]); | |
187 | job = ln->value; | |
188 | time = job->time; | |
189 | pthread_mutex_unlock(&bio_mutex[type]); | |
190 | return time; | |
191 | } | |
192 |