1 /* Background I/O service for Redis. 
   3  * This file implements operations that we need to perform in the background. 
   4  * Currently there is only a single operation, that is a background close(2) 
   5  * system call. This is needed as when the process is the last owner of a 
   6  * reference to a file closing it means unlinking it, and the deletion of the 
   7  * file is slow, blocking the server. 
   9  * In the future we'll either continue implementing new things we need or 
  10  * we'll switch to libeio. However there are probably long term uses for this 
  11  * file as we may want to put here Redis specific background tasks (for instance 
  12  * it is not impossible that we'll need a non blocking FLUSHDB/FLUSHALL 
  18  * The design is trivial, we have a structure representing a job to perform 
  19  * and a different thread and job queue for every job type. 
  20  * Every thread waits for new jobs in its queue, and processes every job 
  23  * Jobs of the same type are guaranteed to be processed from the least 
  24  * recently inserted to the most recently inserted (older jobs processed 
  27  * Currently there is no way for the creator of the job to be notified about 
  28  * the completion of the operation, this will only be added when/if needed. 
  30  * ---------------------------------------------------------------------------- 
  32  * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com> 
  33  * All rights reserved. 
  35  * Redistribution and use in source and binary forms, with or without 
  36  * modification, are permitted provided that the following conditions are met: 
  38  *   * Redistributions of source code must retain the above copyright notice, 
  39  *     this list of conditions and the following disclaimer. 
  40  *   * Redistributions in binary form must reproduce the above copyright 
  41  *     notice, this list of conditions and the following disclaimer in the 
  42  *     documentation and/or other materials provided with the distribution. 
  43  *   * Neither the name of Redis nor the names of its contributors may be used 
  44  *     to endorse or promote products derived from this software without 
  45  *     specific prior written permission. 
  47  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 
  48  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
  49  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
  50  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
  51  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
  52  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
  53  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
  54  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
  55  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 
  56  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 
  57  * POSSIBILITY OF SUCH DAMAGE. 
  64 static pthread_mutex_t bio_mutex
[REDIS_BIO_NUM_OPS
]; 
  65 static pthread_cond_t bio_condvar
[REDIS_BIO_NUM_OPS
]; 
  66 static list 
*bio_jobs
[REDIS_BIO_NUM_OPS
]; 
  67 /* The following array is used to hold the number of pending jobs for every 
  68  * OP type. This allows us to export the bioPendingJobsOfType() API that is 
  69  * useful when the main thread wants to perform some operation that may involve 
  70  * objects shared with the background thread. The main thread will just wait 
  71  * until there are no more jobs of this type to be executed before performing 
  72  * the sensitive operation. This data is also useful for reporting. */ 
  73 static unsigned long long bio_pending
[REDIS_BIO_NUM_OPS
]; 
  75 /* This structure represents a background Job. It is only used locally to this 
  76  * file as the API does not expose the internals at all. */ 
  78     time_t time
; /* Time at which the job was created. */ 
  79     /* Job specific arguments pointers. If we need to pass more than three 
  80      * arguments we can just pass a pointer to a structure or alike. */ 
  81     void *arg1
, *arg2
, *arg3
; 
  84 void *bioProcessBackgroundJobs(void *arg
); 
  86 /* Make sure we have enough stack to perform all the things we do in the 
  88 #define REDIS_THREAD_STACK_SIZE (1024*1024*4) 
  90 /* Initialize the background system, spawning the thread. */ 
  97     /* Initialization of state vars and objects */ 
  98     for (j 
= 0; j 
< REDIS_BIO_NUM_OPS
; j
++) { 
  99         pthread_mutex_init(&bio_mutex
[j
],NULL
); 
 100         pthread_cond_init(&bio_condvar
[j
],NULL
); 
 101         bio_jobs
[j
] = listCreate(); 
 105     /* Set the stack size, as by default it may be small on some systems */ 
 106     pthread_attr_init(&attr
); 
 107     pthread_attr_getstacksize(&attr
,&stacksize
); 
 108     if (!stacksize
) stacksize 
= 1; /* The world is full of Solaris Fixes */ 
 109     while (stacksize 
< REDIS_THREAD_STACK_SIZE
) stacksize 
*= 2; 
 110     pthread_attr_setstacksize(&attr
, stacksize
); 
 112     /* Ready to spawn our threads. We use the single argument the thread 
 113      * function accepts in order to pass the job ID the thread is 
 115     for (j 
= 0; j 
< REDIS_BIO_NUM_OPS
; j
++) { 
 116         void *arg 
= (void*)(unsigned long) j
; 
 117         if (pthread_create(&thread
,&attr
,bioProcessBackgroundJobs
,arg
) != 0) { 
 118             redisLog(REDIS_WARNING
,"Fatal: Can't initialize Background Jobs."); 
 124 void bioCreateBackgroundJob(int type
, void *arg1
, void *arg2
, void *arg3
) { 
 125     struct bio_job 
*job 
= zmalloc(sizeof(*job
)); 
 127     job
->time 
= time(NULL
); 
 131     pthread_mutex_lock(&bio_mutex
[type
]); 
 132     listAddNodeTail(bio_jobs
[type
],job
); 
 134     pthread_cond_signal(&bio_condvar
[type
]); 
 135     pthread_mutex_unlock(&bio_mutex
[type
]); 
 138 void *bioProcessBackgroundJobs(void *arg
) { 
 140     unsigned long type 
= (unsigned long) arg
; 
 143     pthread_detach(pthread_self()); 
 144     pthread_mutex_lock(&bio_mutex
[type
]); 
 145     /* Block SIGALRM so we are sure that only the main thread will 
 146      * receive the watchdog signal. */ 
 147     sigemptyset(&sigset
); 
 148     sigaddset(&sigset
, SIGALRM
); 
 149     if (pthread_sigmask(SIG_BLOCK
, &sigset
, NULL
)) 
 150         redisLog(REDIS_WARNING
, 
 151             "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno
)); 
 156         /* The loop always starts with the lock held. */ 
 157         if (listLength(bio_jobs
[type
]) == 0) { 
 158             pthread_cond_wait(&bio_condvar
[type
],&bio_mutex
[type
]); 
 161         /* Pop the job from the queue. */ 
 162         ln 
= listFirst(bio_jobs
[type
]); 
 164         /* It is now possible to unlock the background system as we now have 
 165          * a standalone job structure to process. */ 
 166         pthread_mutex_unlock(&bio_mutex
[type
]); 
 168         /* Process the job according to its type. */ 
 169         if (type 
== REDIS_BIO_CLOSE_FILE
) { 
 170             close((long)job
->arg1
); 
 171         } else if (type 
== REDIS_BIO_AOF_FSYNC
) { 
 172             aof_fsync((long)job
->arg1
); 
 174             redisPanic("Wrong job type in bioProcessBackgroundJobs()."); 
 178         /* Lock again before reiterating the loop; if there are no more 
 179          * jobs to process we'll block again in pthread_cond_wait(). */ 
 180         pthread_mutex_lock(&bio_mutex
[type
]); 
 181         listDelNode(bio_jobs
[type
],ln
); 
 186 /* Return the number of pending jobs of the specified type. */ 
 187 unsigned long long bioPendingJobsOfType(int type
) { 
 188     unsigned long long val
; 
 189     pthread_mutex_lock(&bio_mutex
[type
]); 
 190     val 
= bio_pending
[type
]; 
 191     pthread_mutex_unlock(&bio_mutex
[type
]); 
 195 #if 0 /* We don't use the following code for now, and bioWaitPendingJobsLE 
 196          probably needs a rewrite using condition variables instead of the 
 197          current implementation. */ 
 200 /* Wait until the number of pending jobs of the specified type is 
 201  * less than or equal to the specified number. 
 203  * This function may block for a long time; it should only be used to perform 
 204  * the following tasks: 
 206  * 1) To prevent the main thread from pushing jobs of a given type so fast 
 207  *    that the background thread can't process them at the same speed. 
 208  *    So before creating a new job of a given type the main thread should 
 209  *    call something like: bioWaitPendingJobsLE(job_type,10000); 
 210  * 2) In order to perform special operations that make it necessary to be sure 
 211  *    no one is touching shared resources in the background. 
 213 void bioWaitPendingJobsLE(int type
, unsigned long long num
) { 
 214     unsigned long long iteration 
= 0; 
 216     /* We poll the jobs queue aggressively to start, and gradually relax 
 217      * the polling speed if it is going to take too much time. */ 
 220         if (iteration 
> 1000 && iteration 
<= 10000) { 
 222         } else if (iteration 
> 10000) { 
 225         if (bioPendingJobsOfType(type
) <= num
) break; 
 229 /* Return the oldest job of the specified type. */ 
 230 time_t bioOlderJobOfType(int type
) { 
 235     pthread_mutex_lock(&bio_mutex
[type
]); 
 236     ln 
= listFirst(bio_jobs
[type
]); 
 238         pthread_mutex_unlock(&bio_mutex
[type
]); 
 243     pthread_mutex_unlock(&bio_mutex
[type
]);