* Every thread wait for new jobs in its queue, and process every job
* sequentially.
*
+ * Jobs of the same type are guaranteed to be processed from the least
+ * recently inserted to the most recently inserted (older jobs processed
+ * first).
+ *
* Currently there is no way for the creator of the job to be notified about
* the completion of the operation, this will only be added when/if needed.
*/
void *bioProcessBackgroundJobs(void *arg) {
struct bio_job *job;
unsigned long type = (unsigned long) arg;
+ sigset_t sigset;
pthread_detach(pthread_self());
pthread_mutex_lock(&bio_mutex[type]);
+ /* Block SIGALRM so we are sure that only the main thread will
+ * receive the watchdog signal. */
+ sigemptyset(&sigset);
+ sigaddset(&sigset, SIGALRM);
+ if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
+ redisLog(REDIS_WARNING,
+ "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));
+
while(1) {
listNode *ln;
/* Pop the job from the queue. */
ln = listFirst(bio_jobs[type]);
job = ln->value;
- listDelNode(bio_jobs[type],ln);
/* It is now possible to unlock the background system as we know have
* a stand alone job structure to process.*/
pthread_mutex_unlock(&bio_mutex[type]);
/* Process the job accordingly to its type. */
if (type == REDIS_BIO_CLOSE_FILE) {
close((long)job->arg1);
+ } else if (type == REDIS_BIO_AOF_FSYNC) {
+ aof_fsync((long)job->arg1);
} else {
redisPanic("Wrong job type in bioProcessBackgroundJobs().");
}
/* Lock again before reiterating the loop, if there are no longer
* jobs to process we'll block again in pthread_cond_wait(). */
pthread_mutex_lock(&bio_mutex[type]);
+ listDelNode(bio_jobs[type],ln);
bio_pending[type]--;
}
}
return val;
}
+#if 0 /* We don't use the following code for now, and bioWaitPendingJobsLE
+ probably needs a rewrite using conditional variables instead of the
+ current implementation. */
+
+
/* Wait until the number of pending jobs of the specified type are
* less or equal to the specified number.
*
pthread_mutex_lock(&bio_mutex[type]);
ln = listFirst(bio_jobs[type]);
+ if (ln == NULL) {
+ pthread_mutex_unlock(&bio_mutex[type]);
+ return 0;
+ }
job = ln->value;
time = job->time;
pthread_mutex_unlock(&bio_mutex[type]);
return time;
}
+#endif