X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/51335102acb364be4c0652ff4e91c9a080b1e7e0..9ad853ccde374ed371efee3b1654c6ebb39e06fa:/src/dscache.c diff --git a/src/dscache.c b/src/dscache.c index 4ebca708..15134cc7 100644 --- a/src/dscache.c +++ b/src/dscache.c @@ -132,6 +132,7 @@ void dsInit(void) { server.io_ready_clients = listCreate(); pthread_mutex_init(&server.io_mutex,NULL); pthread_cond_init(&server.io_condvar,NULL); + pthread_mutex_init(&server.bgsavethread_mutex,NULL); server.io_active_threads = 0; if (pipe(pipefds) == -1) { redisLog(REDIS_WARNING,"Unable to intialized DS: pipe(2): %s. Exiting." @@ -184,8 +185,7 @@ int cacheFreeOneEntry(void) { * are swappable objects */ int maxtries = 100; - if (dictSize(db->dict) == 0) continue; - for (i = 0; i < 5; i++) { + for (i = 0; i < 5 && dictSize(db->dict); i++) { dictEntry *de; double swappability; robj keyobj; @@ -323,7 +323,10 @@ void freeIOJob(iojob *j) { /* Every time a thread finished a Job, it writes a byte into the write side * of an unix pipe in order to "awake" the main thread, and this function - * is called. */ + * is called. + * + * If privdata != NULL the function will try to put more jobs in the queue + * of IO jobs to process as more room is made. */ void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, int mask) { @@ -331,7 +334,6 @@ void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, int retval, processed = 0, toprocess = -1; REDIS_NOTUSED(el); REDIS_NOTUSED(mask); - REDIS_NOTUSED(privdata); /* For every byte we read in the read side of the pipe, there is one * I/O job completed to process. */ @@ -390,6 +392,7 @@ void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, freeIOJob(j); } processed++; + if (privdata != NULL) cacheScheduleIOPushJobs(0); if (processed == toprocess) return; } if (retval < 0 && errno != EAGAIN) { @@ -411,6 +414,7 @@ void *IOThreadEntryPoint(void *arg) { iojob *j; listNode *ln; REDIS_NOTUSED(arg); + long long start; pthread_detach(pthread_self()); lockThreadedIO(); @@ -418,10 +422,13 @@ void *IOThreadEntryPoint(void *arg) { /* Get a new job to process */ if (listLength(server.io_newjobs) == 0) { /* Wait for more work to do */ + redisLog(REDIS_DEBUG,"[T] wait for signal"); pthread_cond_wait(&server.io_condvar,&server.io_mutex); + redisLog(REDIS_DEBUG,"[T] signal received"); continue; } - redisLog(REDIS_DEBUG,"%ld IO jobs to process", + start = ustime(); + redisLog(REDIS_DEBUG,"[T] %ld IO jobs to process", listLength(server.io_newjobs)); ln = listFirst(server.io_newjobs); j = ln->value; @@ -431,7 +438,7 @@ void *IOThreadEntryPoint(void *arg) { ln = listLast(server.io_processing); /* We use ln later to remove it */ unlockThreadedIO(); - redisLog(REDIS_DEBUG,"Thread %ld: new job type %s: %p about key '%s'", + redisLog(REDIS_DEBUG,"[T] %ld: new job type %s: %p about key '%s'", (long) pthread_self(), (j->type == REDIS_IOJOB_LOAD) ? "load" : "save", (void*)j, (char*)j->key->ptr); @@ -444,22 +451,25 @@ void *IOThreadEntryPoint(void *arg) { if (j->val) j->expire = expire; } else if (j->type == REDIS_IOJOB_SAVE) { if (j->val) { - dsSet(j->db,j->key,j->val); + dsSet(j->db,j->key,j->val,j->expire); } else { dsDel(j->db,j->key); } } /* Done: insert the job into the processed queue */ - redisLog(REDIS_DEBUG,"Thread %ld completed the job: %p (key %s)", + redisLog(REDIS_DEBUG,"[T] %ld completed the job: %p (key %s)", (long) pthread_self(), (void*)j, (char*)j->key->ptr); + redisLog(REDIS_DEBUG,"[T] lock IO"); lockThreadedIO(); + redisLog(REDIS_DEBUG,"[T] IO locked"); listDelNode(server.io_processing,ln); listAddNodeTail(server.io_processed,j); /* Signal the main thread there is new stuff to process */ redisAssert(write(server.io_ready_pipe_write,"x",1) == 1); + redisLog(REDIS_WARNING,"TIME (%c): %lld\n", j->type == REDIS_IOJOB_LOAD ? 'L' : 'S', ustime()-start); } /* never reached, but that's the full pattern... */ unlockThreadedIO(); @@ -501,30 +511,39 @@ int processActiveIOJobs(int max) { while(max == -1 || max > 0) { int io_processed_len; + redisLog(REDIS_DEBUG,"[P] lock IO"); lockThreadedIO(); + redisLog(REDIS_DEBUG,"Waiting IO jobs processing: new:%d proessing:%d processed:%d",listLength(server.io_newjobs),listLength(server.io_processing),listLength(server.io_processed)); + if (listLength(server.io_newjobs) == 0 && listLength(server.io_processing) == 0) { /* There is nothing more to process */ + redisLog(REDIS_DEBUG,"[P] Nothing to process, unlock IO, return"); unlockThreadedIO(); break; } -#if 0 +#if 1 /* If there are new jobs we need to signal the thread to - * process the next one. */ - redisLog(REDIS_DEBUG,"waitEmptyIOJobsQueue: new %d, processing %d", + * process the next one. FIXME: drop this if useless. */ + redisLog(REDIS_DEBUG,"[P] waitEmptyIOJobsQueue: new %d, processing %d, processed %d", listLength(server.io_newjobs), - listLength(server.io_processing)); + listLength(server.io_processing), + listLength(server.io_processed)); if (listLength(server.io_newjobs)) { + redisLog(REDIS_DEBUG,"[P] There are new jobs, signal"); pthread_cond_signal(&server.io_condvar); } #endif /* Check if we can process some finished job */ io_processed_len = listLength(server.io_processed); + redisLog(REDIS_DEBUG,"[P] Unblock IO"); unlockThreadedIO(); + redisLog(REDIS_DEBUG,"[P] Wait"); + usleep(10000); if (io_processed_len) { vmThreadedIOCompletedJob(NULL,server.io_ready_pipe_read, (void*)0xdeadbeef,0); @@ -576,7 +595,21 @@ void queueIOJob(iojob *j) { spawnIOThread(); } -void dsCreateIOJob(int type, redisDb *db, robj *key, robj *val) { +/* Consume all the IO scheduled operations, and all the thread IO jobs + * so that eventually the state of diskstore is a point-in-time snapshot. + * + * This is useful when we need to BGSAVE with diskstore enabled. */ +void cacheForcePointInTime(void) { + redisLog(REDIS_NOTICE,"Diskstore: synching on disk to reach point-in-time state."); + while (listLength(server.cache_io_queue) != 0) { + cacheScheduleIOPushJobs(REDIS_IO_ASAP); + processActiveIOJobs(1); + } + waitEmptyIOJobsQueue(); + processAllPendingIOJobs(); +} + +void cacheCreateIOJob(int type, redisDb *db, robj *key, robj *val, time_t expire) { iojob *j; j = zmalloc(sizeof(*j)); @@ -586,6 +619,7 @@ void dsCreateIOJob(int type, redisDb *db, robj *key, robj *val) { incrRefCount(key); j->val = val; if (val) incrRefCount(val); + j->expire = expire; lockThreadedIO(); queueIOJob(j); @@ -711,12 +745,15 @@ void cacheScheduleIO(redisDb *db, robj *key, int type) { * scheduled completion time, but just do the operation ASAP. This is useful * when we need to reclaim memory from the IO queue. */ -#define MAX_IO_JOBS_QUEUE 100 +#define MAX_IO_JOBS_QUEUE 10 int cacheScheduleIOPushJobs(int flags) { time_t now = time(NULL); listNode *ln; int jobs, topush = 0, pushed = 0; + /* Don't push new jobs if there is a threaded BGSAVE in progress. */ + if (server.bgsavethread != (pthread_t) -1) return 0; + /* Sync stuff on disk, but only if we have less * than MAX_IO_JOBS_QUEUE IO jobs. */ lockThreadedIO(); @@ -738,7 +775,8 @@ int cacheScheduleIOPushJobs(int flags) { if (op->type != REDIS_IO_LOAD && flags & REDIS_IO_ONLYLOADS) break; - if (!(flags & REDIS_IO_ASAP) && + /* Don't execute SAVE before the scheduled time for completion */ + if (op->type == REDIS_IO_SAVE && !(flags & REDIS_IO_ASAP) && (now - op->ctime) < server.cache_flush_delay) break; /* Don't add a SAVE job in the IO thread queue if there is already @@ -762,20 +800,23 @@ int cacheScheduleIOPushJobs(int flags) { op->type == REDIS_IO_LOAD ? "load" : "save", op->key->ptr); if (op->type == REDIS_IO_LOAD) { - dsCreateIOJob(REDIS_IOJOB_LOAD,op->db,op->key,NULL); + cacheCreateIOJob(REDIS_IOJOB_LOAD,op->db,op->key,NULL,0); } else { + time_t expire = -1; + /* Lookup the key, in order to put the current value in the IO * Job. Otherwise if the key does not exists we schedule a disk * store delete operation, setting the value to NULL. */ de = dictFind(op->db->dict,op->key->ptr); if (de) { val = dictGetEntryVal(de); + expire = getExpire(op->db,op->key); } else { /* Setting the value to NULL tells the IO thread to delete * the key on disk. */ val = NULL; } - dsCreateIOJob(REDIS_IOJOB_SAVE,op->db,op->key,val); + cacheCreateIOJob(REDIS_IOJOB_SAVE,op->db,op->key,val,expire); } /* Mark the operation as in progress. */ cacheScheduleIODelFlag(op->db,op->key,op->type);