/* Background I/O service for Redis.
 *
 * This file implements operations that we need to perform in the background.
 * Currently there is only a single operation, that is a background close(2)
 * system call. This is needed as when the process is the last owner of a
 * reference to a file closing it means unlinking it, and the deletion of the
 * file is slow, blocking the server.
 *
 * In the future we'll either continue implementing new things we need or
 * we'll switch to libeio. However there are probably long term uses for this
 * file as we may want to put here Redis specific background tasks (for instance
 * it is not impossible that we'll need a non blocking FLUSHDB/FLUSHALL
 * implementation).
 *
 * The design is trivial, we have a structure representing a job to perform
 * and a single thread performing all the I/O operations in the queue.
 * Currently there is no way for the creator of the job to be notified about
 * the completion of the operation, this will only be added when/if needed.
 */
/* Mutex held whenever the job queue or the pending counters are accessed. */
27 static pthread_mutex_t bio_mutex
;
/* Signalled after a job is queued; the worker waits on it when the queue
 * is empty. */
28 static pthread_cond_t bio_condvar
;
/* Queue of jobs: producers append at the tail, the worker takes the first
 * node. */
29 static list
*bio_jobs
;
30 /* The following array is used to hold the number of pending jobs for every
 * OP type. This allows us to export the bioPendingJobsOfType() API that is
 * useful when the main thread wants to perform some operation that may involve
 * objects shared with the background thread. The main thread will just wait
 * until there are no longer jobs of this type to be executed before performing
 * the sensitive operation. This data is also useful for reporting.
 *
 * The array is allocated with REDIS_BIO_MAX_OP_ID entries, all set to zero,
 * during initialization. */
36 static unsigned long long *bio_pending
;
/* This structure represents a background Job. It is only used locally to this
 * file as the API does not expose the internals at all. */
41 int type
; /* Job type, for instance BIO_JOB_CLOSE */
42 void *data
; /* Job specific arguments pointer. */
/* Forward declaration of the worker thread entry point: it is passed to
 * pthread_create() before its definition appears later in the file. */
45 void *bioProcessBackgroundJobs(void *arg
);
/* Make sure we have enough stack to perform all the things we do in the
 * thread spawned below. */
49 #define REDIS_THREAD_STACK_SIZE (1024*1024*4)
51 /* Initialize the background system, spawning the thread. */
58 /* Initialization of state vars and objects */
59 pthread_mutex_init(&bio_mutex
,NULL
);
60 pthread_cond_init(&bio_condvar
,NULL
);
61 bio_jobs
= listCreate();
62 bio_pending
= zmalloc(sizeof(*bio_pending
)*REDIS_BIO_MAX_OP_ID
);
63 for (j
= 0; j
< REDIS_BIO_MAX_OP_ID
; j
++) bio_pending
[j
] = 0;
/* Set the stack size as by default it may be small on some systems */
66 pthread_attr_init(&attr
);
67 pthread_attr_getstacksize(&attr
,&stacksize
);
68 if (!stacksize
) stacksize
= 1; /* The world is full of Solaris Fixes */
69 while (stacksize
< REDIS_THREAD_STACK_SIZE
) stacksize
*= 2;
70 pthread_attr_setstacksize(&attr
, stacksize
);
72 /* Ready to spawn our thread */
73 if (pthread_create(&thread
,&attr
,bioProcessBackgroundJobs
,NULL
) != 0) {
74 redisLog(REDIS_WARNING
,"Fatal: Can't initialize Background Jobs.");
/* Queue a new background job.
 *
 * 'type' is the job type and 'data' is a job specific argument pointer.
 * A struct bio_job is allocated with zmalloc() and appended to the tail of
 * bio_jobs while bio_mutex is held; bio_condvar is then signalled so that a
 * thread blocked in pthread_cond_wait() can pick the job up.
 *
 * NOTE(review): the statements between the allocation and the lock, the one
 * between listAddNodeTail() and the signal, and the closing brace are not
 * visible here -- presumably they fill in the job fields and update
 * bio_pending[type]; confirm against the complete file. */
79 void bioCreateBackgroundJob(int type
, void *data
) {
80 struct bio_job
*job
= zmalloc(sizeof(*job
));
/* The queue is shared with the worker thread: only touch it with the
 * mutex held. */
84 pthread_mutex_lock(&bio_mutex
);
85 listAddNodeTail(bio_jobs
,job
);
/* Wake up the worker in case it is waiting for an empty queue to fill. */
87 pthread_cond_signal(&bio_condvar
);
88 pthread_mutex_unlock(&bio_mutex
);
/* Entry point of the background thread (it is the start routine handed to
 * pthread_create()).
 *
 * With bio_mutex held it checks the queue, waits on bio_condvar when the
 * queue is empty, removes the first node from bio_jobs, releases the mutex
 * while the job is executed, and takes the mutex again afterwards.
 * The only job type handled in the visible code is REDIS_BIO_CLOSE_FILE;
 * the redisPanic() call reports any other type.
 *
 * NOTE(review): the loop header, the local declarations, the lines that
 * extract 'job' and 'type' from the list node, and whatever releases the job
 * and updates bio_pending are not visible here -- confirm against the
 * complete file. */
91 void *bioProcessBackgroundJobs(void *arg
) {
/* Detach so that the thread resources are released without a
 * pthread_join(). */
95 pthread_detach(pthread_self());
96 pthread_mutex_lock(&bio_mutex
);
101 /* The loop always starts with the lock held. */
102 if (listLength(bio_jobs
) == 0) {
/* pthread_cond_wait() releases the mutex while blocked and re-acquires it
 * before returning.
 * NOTE(review): it may also return spuriously, so the emptiness check has
 * to be evaluated again after the wait; the statement that follows the
 * wait is not visible here -- confirm it restarts the loop. */
103 pthread_cond_wait(&bio_condvar
,&bio_mutex
);
106 /* Pop the job from the queue. */
107 ln
= listFirst(bio_jobs
);
110 listDelNode(bio_jobs
,ln
);
111 /* It is now possible to unlock the background system as we now have
 * a stand alone job structure to process. */
113 pthread_mutex_unlock(&bio_mutex
);
115 /* Process the job according to its type. */
116 if (type
== REDIS_BIO_CLOSE_FILE
) {
/* The file descriptor travels inside the data pointer itself, hence the
 * cast instead of a dereference. */
117 close((long)job
->data
);
119 redisPanic("Wrong job type in bioProcessBackgroundJobs().");
123 /* Lock again before reiterating the loop, if there are no longer
 * jobs to process we'll block again in pthread_cond_wait(). */
125 pthread_mutex_lock(&bio_mutex
);
130 /* Return the number of pending jobs of the specified type.
 *
 * The counter is read from bio_pending[type] while bio_mutex is held and
 * copied into the local 'val', so the mutex is already released when the
 * value is used. 'type' indexes the array directly: no range check is
 * performed here.
 *
 * NOTE(review): the return statement and the closing brace are not visible
 * here. */
131 unsigned long long bioPendingJobsOfType(int type
) {
132 unsigned long long val
;
133 pthread_mutex_lock(&bio_mutex
);
134 val
= bio_pending
[type
];
135 pthread_mutex_unlock(&bio_mutex
);
139 /* Wait until the number of pending jobs of the specified type is
 * less than or equal to the specified number.
 *
 * This function may block for a long time, it should only be used to perform
 * special tasks like AOF rewriting or alike.
 *
 * 'type' is the job type to watch, 'num' the highest number of pending jobs
 * the caller accepts. The wait ends as soon as bioPendingJobsOfType(type)
 * reports a value <= num. */
144 void bioWaitPendingJobsLE(int type
, unsigned long long num
) {
/* Counts the polling rounds and selects the back-off tier below. */
145 unsigned long long iteration
= 0;
147 /* We poll the jobs queue aggressively to start, and gradually relax
 * the polling speed if it is going to take too much time. */
/* Three tiers: up to 1000 iterations, 1001..10000, and above 10000.
 * NOTE(review): the enclosing loop, the update of 'iteration' and the
 * bodies of the two slower tiers are not visible here. */
151 if (iteration
> 1000 && iteration
<= 10000) {
153 } else if (iteration
> 10000) {
/* Exit condition: few enough jobs of this type are still pending. */
156 if (bioPendingJobsOfType(type
) <= num
) break;