]> git.saurik.com Git - redis.git/blob - src/bio.c
Background I/O library enhanced so that the main thread can query for the number...
[redis.git] / src / bio.c
1 /* Background I/O service for Redis.
2 *
3 * This file implements operations that we need to perform in the background.
4 * Currently there is only a single operation, that is a background close(2)
5 * system call. This is needed as when the process is the last owner of a
6 * reference to a file closing it means unlinking it, and the deletion of the
7 * file is slow, blocking the server.
8 *
9 * In the future we'll either continue implementing new things we need or
10 * we'll switch to libeio. However there are probably long term uses for this
11 * file as we may want to put here Redis specific background tasks (for instance
12 * it is not impossible that we'll need a non blocking FLUSHDB/FLUSHALL
13 * implementation).
14 *
15 * DESIGN
16 * ------
17 *
18 * The design is trivial, we have a structure representing a job to perform
19 * and a single thread performing all the I/O operations in the queue.
20 * Currently there is no way for the creator of the job to be notified about
21 * the completion of the operation, this will only be added when/if needed.
22 */
23
24 #include "redis.h"
25 #include "bio.h"
26
27 static pthread_mutex_t bio_mutex;
28 static pthread_cond_t bio_condvar;
29 static list *bio_jobs;
30 /* The following array is used to hold the number of pending jobs for every
31 * OP type. This allows us to export the bioPendingJobsOfType() API that is
32 * useful when the main thread wants to perform some operation that may involve
33 * objects shared with the background thread. The main thread will just wait
34 * that there are no longer jobs of this type to be executed before performing
35 * the sensible operation. This data is also useful for reporting. */
36 static unsigned long long *bio_pending;
37
38 /* This structure represents a background Job. It is only used locally to this
39 * file as the API deos not expose the internals at all. */
40 struct bio_job {
41 int type; /* Job type, for instance BIO_JOB_CLOSE */
42 void *data; /* Job specific arguments pointer. */
43 };
44
45 void *bioProcessBackgroundJobs(void *arg);
46
47 /* Make sure we have enough stack to perform all the things we do in the
48 * main thread. */
49 #define REDIS_THREAD_STACK_SIZE (1024*1024*4)
50
51 /* Initialize the background system, spawning the thread. */
52 void bioInit(void) {
53 pthread_attr_t attr;
54 pthread_t thread;
55 size_t stacksize;
56 int j;
57
58 /* Initialization of state vars and objects */
59 pthread_mutex_init(&bio_mutex,NULL);
60 pthread_cond_init(&bio_condvar,NULL);
61 bio_jobs = listCreate();
62 bio_pending = zmalloc(sizeof(*bio_pending)*REDIS_BIO_MAX_OP_ID);
63 for (j = 0; j < REDIS_BIO_MAX_OP_ID; j++) bio_pending[j] = 0;
64
65 /* Set the stack size as by default it may be small in some system */
66 pthread_attr_init(&attr);
67 pthread_attr_getstacksize(&attr,&stacksize);
68 if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */
69 while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
70 pthread_attr_setstacksize(&attr, stacksize);
71
72 /* Ready to spawn our thread */
73 if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,NULL) != 0) {
74 redisLog(REDIS_WARNING,"Fatal: Can't initialize Background Jobs.");
75 exit(1);
76 }
77 }
78
79 void bioCreateBackgroundJob(int type, void *data) {
80 struct bio_job *job = zmalloc(sizeof(*job));
81
82 job->type = type;
83 job->data = data;
84 pthread_mutex_lock(&bio_mutex);
85 listAddNodeTail(bio_jobs,job);
86 bio_pending[type]++;
87 pthread_cond_signal(&bio_condvar);
88 pthread_mutex_unlock(&bio_mutex);
89 }
90
91 void *bioProcessBackgroundJobs(void *arg) {
92 struct bio_job *job;
93 REDIS_NOTUSED(arg);
94
95 pthread_detach(pthread_self());
96 pthread_mutex_lock(&bio_mutex);
97 while(1) {
98 listNode *ln;
99 int type;
100
101 /* The loop always starts with the lock hold. */
102 if (listLength(bio_jobs) == 0) {
103 pthread_cond_wait(&bio_condvar,&bio_mutex);
104 continue;
105 }
106 /* Pop the job from the queue. */
107 ln = listFirst(bio_jobs);
108 job = ln->value;
109 type = job->type;
110 listDelNode(bio_jobs,ln);
111 /* It is now possible to unlock the background system as we know have
112 * a stand alone job structure to process.*/
113 pthread_mutex_unlock(&bio_mutex);
114
115 /* Process the job accordingly to its type. */
116 if (type == REDIS_BIO_CLOSE_FILE) {
117 close((long)job->data);
118 } else {
119 redisPanic("Wrong job type in bioProcessBackgroundJobs().");
120 }
121 zfree(job);
122
123 /* Lock again before reiterating the loop, if there are no longer
124 * jobs to process we'll block again in pthread_cond_wait(). */
125 pthread_mutex_lock(&bio_mutex);
126 bio_pending[type]--;
127 }
128 }
129
130 /* Return the number of pending jobs of the specified type. */
131 unsigned long long bioPendingJobsOfType(int type) {
132 unsigned long long val;
133 pthread_mutex_lock(&bio_mutex);
134 val = bio_pending[type];
135 pthread_mutex_unlock(&bio_mutex);
136 return val;
137 }
138
139 /* Wait until the number of pending jobs of the specified type are
140 * less or equal to the specified number.
141 *
142 * This function may block for long time, it should only be used to perform
143 * special tasks like AOF rewriting or alike. */
144 void bioWaitPendingJobsLE(int type, unsigned long long num) {
145 unsigned long long iteration = 0;
146
147 /* We poll the jobs queue aggressively to start, and gradually relax
148 * the polling speed if it is going to take too much time. */
149 while(1) {
150 iteration++;
151 if (iteration > 1000 && iteration <= 10000) {
152 usleep(100);
153 } else if (iteration > 10000) {
154 usleep(1000);
155 }
156 if (bioPendingJobsOfType(type) <= num) break;
157 }
158 }