X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/36c17a53b6aece050b79b667fd32064f6eb116c2..69298a05eb23cbbf60f9008faa2e11866ab4352a:/src/dscache.c diff --git a/src/dscache.c b/src/dscache.c index 1c419c6a..a4d045e1 100644 --- a/src/dscache.c +++ b/src/dscache.c @@ -185,8 +185,7 @@ int cacheFreeOneEntry(void) { * are swappable objects */ int maxtries = 100; - if (dictSize(db->dict) == 0) continue; - for (i = 0; i < 5; i++) { + for (i = 0; i < 5 && dictSize(db->dict); i++) { dictEntry *de; double swappability; robj keyobj; @@ -324,7 +323,14 @@ void freeIOJob(iojob *j) { /* Every time a thread finished a Job, it writes a byte into the write side * of an unix pipe in order to "awake" the main thread, and this function - * is called. */ + * is called. + * + * If privdata == NULL the function will try to put more jobs in the queue + * of IO jobs to process as more room is made. privdata is equal to NULL + * when the function is called from the event loop, so we want to push + * more IO jobs in the queue. Instead when the function is called by + * other functions that want to create a write-barrier to avoid race + * conditions we don't push new jobs in the queue. */ void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, int mask) { @@ -332,7 +338,6 @@ void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, int retval, processed = 0, toprocess = -1; REDIS_NOTUSED(el); REDIS_NOTUSED(mask); - REDIS_NOTUSED(privdata); /* For every byte we read in the read side of the pipe, there is one * I/O job completed to process. */ @@ -391,6 +396,7 @@ void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, freeIOJob(j); } processed++; + if (privdata == NULL) cacheScheduleIOPushJobs(0); if (processed == toprocess) return; } if (retval < 0 && errno != EAGAIN) { @@ -412,6 +418,7 @@ void *IOThreadEntryPoint(void *arg) { iojob *j; listNode *ln; REDIS_NOTUSED(arg); + long long start; pthread_detach(pthread_self()); lockThreadedIO(); @@ -419,10 +426,13 @@ void *IOThreadEntryPoint(void *arg) { /* Get a new job to process */ if (listLength(server.io_newjobs) == 0) { /* Wait for more work to do */ + redisLog(REDIS_DEBUG,"[T] wait for signal"); pthread_cond_wait(&server.io_condvar,&server.io_mutex); + redisLog(REDIS_DEBUG,"[T] signal received"); continue; } - redisLog(REDIS_DEBUG,"%ld IO jobs to process", + start = ustime(); + redisLog(REDIS_DEBUG,"[T] %ld IO jobs to process", listLength(server.io_newjobs)); ln = listFirst(server.io_newjobs); j = ln->value; @@ -432,7 +442,7 @@ void *IOThreadEntryPoint(void *arg) { ln = listLast(server.io_processing); /* We use ln later to remove it */ unlockThreadedIO(); - redisLog(REDIS_DEBUG,"Thread %ld: new job type %s: %p about key '%s'", + redisLog(REDIS_DEBUG,"[T] %ld: new job type %s: %p about key '%s'", (long) pthread_self(), (j->type == REDIS_IOJOB_LOAD) ? "load" : "save", (void*)j, (char*)j->key->ptr); @@ -445,22 +455,25 @@ void *IOThreadEntryPoint(void *arg) { if (j->val) j->expire = expire; } else if (j->type == REDIS_IOJOB_SAVE) { if (j->val) { - dsSet(j->db,j->key,j->val); + dsSet(j->db,j->key,j->val,j->expire); } else { dsDel(j->db,j->key); } } /* Done: insert the job into the processed queue */ - redisLog(REDIS_DEBUG,"Thread %ld completed the job: %p (key %s)", + redisLog(REDIS_DEBUG,"[T] %ld completed the job: %p (key %s)", (long) pthread_self(), (void*)j, (char*)j->key->ptr); + redisLog(REDIS_DEBUG,"[T] lock IO"); lockThreadedIO(); + redisLog(REDIS_DEBUG,"[T] IO locked"); listDelNode(server.io_processing,ln); listAddNodeTail(server.io_processed,j); /* Signal the main thread there is new stuff to process */ redisAssert(write(server.io_ready_pipe_write,"x",1) == 1); + redisLog(REDIS_DEBUG,"TIME (%c): %lld\n", j->type == REDIS_IOJOB_LOAD ? 'L' : 'S', ustime()-start); } /* never reached, but that's the full pattern... */ unlockThreadedIO(); @@ -502,30 +515,39 @@ int processActiveIOJobs(int max) { while(max == -1 || max > 0) { int io_processed_len; + redisLog(REDIS_DEBUG,"[P] lock IO"); lockThreadedIO(); + redisLog(REDIS_DEBUG,"Waiting IO jobs processing: new:%d proessing:%d processed:%d",listLength(server.io_newjobs),listLength(server.io_processing),listLength(server.io_processed)); + if (listLength(server.io_newjobs) == 0 && listLength(server.io_processing) == 0) { /* There is nothing more to process */ + redisLog(REDIS_DEBUG,"[P] Nothing to process, unlock IO, return"); unlockThreadedIO(); break; } -#if 0 +#if 1 /* If there are new jobs we need to signal the thread to * process the next one. FIXME: drop this if useless. */ - redisLog(REDIS_DEBUG,"waitEmptyIOJobsQueue: new %d, processing %d", + redisLog(REDIS_DEBUG,"[P] waitEmptyIOJobsQueue: new %d, processing %d, processed %d", listLength(server.io_newjobs), - listLength(server.io_processing)); + listLength(server.io_processing), + listLength(server.io_processed)); if (listLength(server.io_newjobs)) { + redisLog(REDIS_DEBUG,"[P] There are new jobs, signal"); pthread_cond_signal(&server.io_condvar); } #endif /* Check if we can process some finished job */ io_processed_len = listLength(server.io_processed); + redisLog(REDIS_DEBUG,"[P] Unblock IO"); unlockThreadedIO(); + redisLog(REDIS_DEBUG,"[P] Wait"); + usleep(10000); if (io_processed_len) { vmThreadedIOCompletedJob(NULL,server.io_ready_pipe_read, (void*)0xdeadbeef,0); @@ -591,7 +613,7 @@ void cacheForcePointInTime(void) { processAllPendingIOJobs(); } -void cacheCreateIOJob(int type, redisDb *db, robj *key, robj *val) { +void cacheCreateIOJob(int type, redisDb *db, robj *key, robj *val, time_t expire) { iojob *j; j = zmalloc(sizeof(*j)); @@ -601,6 +623,7 @@ void cacheCreateIOJob(int type, redisDb *db, robj *key, robj *val) { incrRefCount(key); j->val = val; if (val) incrRefCount(val); + j->expire = expire; lockThreadedIO(); queueIOJob(j); @@ -726,12 +749,15 @@ void cacheScheduleIO(redisDb *db, robj *key, int type) { * scheduled completion time, but just do the operation ASAP. This is useful * when we need to reclaim memory from the IO queue. */ -#define MAX_IO_JOBS_QUEUE 100 +#define MAX_IO_JOBS_QUEUE 10 int cacheScheduleIOPushJobs(int flags) { time_t now = time(NULL); listNode *ln; int jobs, topush = 0, pushed = 0; + /* Don't push new jobs if there is a threaded BGSAVE in progress. */ + if (server.bgsavethread != (pthread_t) -1) return 0; + /* Sync stuff on disk, but only if we have less * than MAX_IO_JOBS_QUEUE IO jobs. */ lockThreadedIO(); @@ -753,7 +779,8 @@ int cacheScheduleIOPushJobs(int flags) { if (op->type != REDIS_IO_LOAD && flags & REDIS_IO_ONLYLOADS) break; - if (!(flags & REDIS_IO_ASAP) && + /* Don't execute SAVE before the scheduled time for completion */ + if (op->type == REDIS_IO_SAVE && !(flags & REDIS_IO_ASAP) && (now - op->ctime) < server.cache_flush_delay) break; /* Don't add a SAVE job in the IO thread queue if there is already @@ -777,20 +804,23 @@ int cacheScheduleIOPushJobs(int flags) { op->type == REDIS_IO_LOAD ? "load" : "save", op->key->ptr); if (op->type == REDIS_IO_LOAD) { - cacheCreateIOJob(REDIS_IOJOB_LOAD,op->db,op->key,NULL); + cacheCreateIOJob(REDIS_IOJOB_LOAD,op->db,op->key,NULL,0); } else { + time_t expire = -1; + /* Lookup the key, in order to put the current value in the IO * Job. Otherwise if the key does not exists we schedule a disk * store delete operation, setting the value to NULL. */ de = dictFind(op->db->dict,op->key->ptr); if (de) { val = dictGetEntryVal(de); + expire = getExpire(op->db,op->key); } else { /* Setting the value to NULL tells the IO thread to delete * the key on disk. */ val = NULL; } - cacheCreateIOJob(REDIS_IOJOB_SAVE,op->db,op->key,val); + cacheCreateIOJob(REDIS_IOJOB_SAVE,op->db,op->key,val,expire); } /* Mark the operation as in progress. */ cacheScheduleIODelFlag(op->db,op->key,op->type);