X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/f1df1739e3f724e787e9a4da0a0f9abb352f65ce..7493d2a0325fe33dc75317bfedf9b1c1e5b0d0b5:/src/dscache.c diff --git a/src/dscache.c b/src/dscache.c index 20c66d59..cbe9bb01 100644 --- a/src/dscache.c +++ b/src/dscache.c @@ -185,8 +185,7 @@ int cacheFreeOneEntry(void) { * are swappable objects */ int maxtries = 100; - if (dictSize(db->dict) == 0) continue; - for (i = 0; i < 5; i++) { + for (i = 0; i < 5 && dictSize(db->dict); i++) { dictEntry *de; double swappability; robj keyobj; @@ -213,7 +212,7 @@ int cacheFreeOneEntry(void) { } } if (best == NULL) { - /* Was not able to fix a single object... we should check if our + /* Not able to free a single object? we should check if our * IO queues have stuff in queue, and try to consume the queue * otherwise we'll use an infinite amount of memory if changes to * the dataset are faster than I/O */ @@ -241,13 +240,6 @@ int cacheFreeOneEntry(void) { return REDIS_OK; } -/* Return true if it's safe to swap out objects in a given moment. - * Basically we don't want to swap objects out while there is a BGSAVE - * or a BGAEOREWRITE running in backgroud. */ -int dsCanTouchDiskStore(void) { - return (server.bgsavechildpid == -1 && server.bgrewritechildpid == -1); -} - /* ==================== Disk store negative caching ======================== * * When disk store is enabled, we need negative caching, that is, to remember @@ -324,7 +316,14 @@ void freeIOJob(iojob *j) { /* Every time a thread finished a Job, it writes a byte into the write side * of an unix pipe in order to "awake" the main thread, and this function - * is called. */ + * is called. + * + * If privdata == NULL the function will try to put more jobs in the queue + * of IO jobs to process as more room is made. privdata is equal to NULL + * when the function is called from the event loop, so we want to push + * more IO jobs in the queue. Instead when the function is called by + * other functions that want to create a write-barrier to avoid race + * conditions we don't push new jobs in the queue. */ void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, int mask) { @@ -332,7 +331,6 @@ void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, int retval, processed = 0, toprocess = -1; REDIS_NOTUSED(el); REDIS_NOTUSED(mask); - REDIS_NOTUSED(privdata); /* For every byte we read in the read side of the pipe, there is one * I/O job completed to process. */ @@ -385,12 +383,12 @@ void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, } cacheScheduleIODelFlag(j->db,j->key,REDIS_IO_LOADINPROG); handleClientsBlockedOnSwappedKey(j->db,j->key); - freeIOJob(j); } else if (j->type == REDIS_IOJOB_SAVE) { cacheScheduleIODelFlag(j->db,j->key,REDIS_IO_SAVEINPROG); - freeIOJob(j); } + freeIOJob(j); processed++; + if (privdata == NULL) cacheScheduleIOPushJobs(0); if (processed == toprocess) return; } if (retval < 0 && errno != EAGAIN) { @@ -412,6 +410,7 @@ void *IOThreadEntryPoint(void *arg) { iojob *j; listNode *ln; REDIS_NOTUSED(arg); + long long start; pthread_detach(pthread_self()); lockThreadedIO(); @@ -419,10 +418,13 @@ void *IOThreadEntryPoint(void *arg) { /* Get a new job to process */ if (listLength(server.io_newjobs) == 0) { /* Wait for more work to do */ + redisLog(REDIS_DEBUG,"[T] wait for signal"); pthread_cond_wait(&server.io_condvar,&server.io_mutex); + redisLog(REDIS_DEBUG,"[T] signal received"); continue; } - redisLog(REDIS_DEBUG,"%ld IO jobs to process", + start = ustime(); + redisLog(REDIS_DEBUG,"[T] %ld IO jobs to process", listLength(server.io_newjobs)); ln = listFirst(server.io_newjobs); j = ln->value; @@ -432,7 +434,7 @@ void *IOThreadEntryPoint(void *arg) { ln = listLast(server.io_processing); /* We use ln later to remove it */ unlockThreadedIO(); - redisLog(REDIS_DEBUG,"Thread %ld: new job type %s: %p about key '%s'", + redisLog(REDIS_DEBUG,"[T] %ld: new job type %s: %p about key '%s'", (long) pthread_self(), (j->type == REDIS_IOJOB_LOAD) ? "load" : "save", (void*)j, (char*)j->key->ptr); @@ -445,22 +447,25 @@ void *IOThreadEntryPoint(void *arg) { if (j->val) j->expire = expire; } else if (j->type == REDIS_IOJOB_SAVE) { if (j->val) { - dsSet(j->db,j->key,j->val); + dsSet(j->db,j->key,j->val,j->expire); } else { dsDel(j->db,j->key); } } /* Done: insert the job into the processed queue */ - redisLog(REDIS_DEBUG,"Thread %ld completed the job: %p (key %s)", + redisLog(REDIS_DEBUG,"[T] %ld completed the job: %p (key %s)", (long) pthread_self(), (void*)j, (char*)j->key->ptr); + redisLog(REDIS_DEBUG,"[T] lock IO"); lockThreadedIO(); + redisLog(REDIS_DEBUG,"[T] IO locked"); listDelNode(server.io_processing,ln); listAddNodeTail(server.io_processed,j); /* Signal the main thread there is new stuff to process */ redisAssert(write(server.io_ready_pipe_write,"x",1) == 1); + redisLog(REDIS_DEBUG,"TIME (%c): %lld\n", j->type == REDIS_IOJOB_LOAD ? 'L' : 'S', ustime()-start); } /* never reached, but that's the full pattern... */ unlockThreadedIO(); @@ -502,30 +507,39 @@ int processActiveIOJobs(int max) { while(max == -1 || max > 0) { int io_processed_len; + redisLog(REDIS_DEBUG,"[P] lock IO"); lockThreadedIO(); + redisLog(REDIS_DEBUG,"Waiting IO jobs processing: new:%d proessing:%d processed:%d",listLength(server.io_newjobs),listLength(server.io_processing),listLength(server.io_processed)); + if (listLength(server.io_newjobs) == 0 && listLength(server.io_processing) == 0) { /* There is nothing more to process */ + redisLog(REDIS_DEBUG,"[P] Nothing to process, unlock IO, return"); unlockThreadedIO(); break; } -#if 0 +#if 1 /* If there are new jobs we need to signal the thread to * process the next one. FIXME: drop this if useless. */ - redisLog(REDIS_DEBUG,"waitEmptyIOJobsQueue: new %d, processing %d", + redisLog(REDIS_DEBUG,"[P] waitEmptyIOJobsQueue: new %d, processing %d, processed %d", listLength(server.io_newjobs), - listLength(server.io_processing)); + listLength(server.io_processing), + listLength(server.io_processed)); if (listLength(server.io_newjobs)) { + redisLog(REDIS_DEBUG,"[P] There are new jobs, signal"); pthread_cond_signal(&server.io_condvar); } #endif /* Check if we can process some finished job */ io_processed_len = listLength(server.io_processed); + redisLog(REDIS_DEBUG,"[P] Unblock IO"); unlockThreadedIO(); + redisLog(REDIS_DEBUG,"[P] Wait"); + usleep(10000); if (io_processed_len) { vmThreadedIOCompletedJob(NULL,server.io_ready_pipe_read, (void*)0xdeadbeef,0); @@ -573,8 +587,6 @@ void queueIOJob(iojob *j) { redisLog(REDIS_DEBUG,"Queued IO Job %p type %d about key '%s'\n", (void*)j, j->type, (char*)j->key->ptr); listAddNodeTail(server.io_newjobs,j); - if (server.io_active_threads < server.vm_max_threads) - spawnIOThread(); } /* Consume all the IO scheduled operations, and all the thread IO jobs @@ -591,7 +603,7 @@ void cacheForcePointInTime(void) { processAllPendingIOJobs(); } -void cacheCreateIOJob(int type, redisDb *db, robj *key, robj *val) { +void cacheCreateIOJob(int type, redisDb *db, robj *key, robj *val, time_t expire) { iojob *j; j = zmalloc(sizeof(*j)); @@ -601,6 +613,7 @@ void cacheCreateIOJob(int type, redisDb *db, robj *key, robj *val) { incrRefCount(key); j->val = val; if (val) incrRefCount(val); + j->expire = expire; lockThreadedIO(); queueIOJob(j); @@ -726,7 +739,7 @@ void cacheScheduleIO(redisDb *db, robj *key, int type) { * scheduled completion time, but just do the operation ASAP. This is useful * when we need to reclaim memory from the IO queue. */ -#define MAX_IO_JOBS_QUEUE 100 +#define MAX_IO_JOBS_QUEUE 10 int cacheScheduleIOPushJobs(int flags) { time_t now = time(NULL); listNode *ln; @@ -781,20 +794,23 @@ int cacheScheduleIOPushJobs(int flags) { op->type == REDIS_IO_LOAD ? "load" : "save", op->key->ptr); if (op->type == REDIS_IO_LOAD) { - cacheCreateIOJob(REDIS_IOJOB_LOAD,op->db,op->key,NULL); + cacheCreateIOJob(REDIS_IOJOB_LOAD,op->db,op->key,NULL,0); } else { + time_t expire = -1; + /* Lookup the key, in order to put the current value in the IO * Job. Otherwise if the key does not exists we schedule a disk * store delete operation, setting the value to NULL. */ de = dictFind(op->db->dict,op->key->ptr); if (de) { val = dictGetEntryVal(de); + expire = getExpire(op->db,op->key); } else { /* Setting the value to NULL tells the IO thread to delete * the key on disk. */ val = NULL; } - cacheCreateIOJob(REDIS_IOJOB_SAVE,op->db,op->key,val); + cacheCreateIOJob(REDIS_IOJOB_SAVE,op->db,op->key,val,expire); } /* Mark the operation as in progress. */ cacheScheduleIODelFlag(op->db,op->key,op->type); @@ -874,8 +890,16 @@ int waitForSwappedKey(redisClient *c, robj *key) { listAddNodeTail(l,c); /* Are we already loading the key from disk? If not create a job */ - if (de == NULL) - cacheScheduleIO(c->db,key,REDIS_IO_LOAD); + if (de == NULL) { + int flags = cacheScheduleIOGetFlags(c->db,key); + + /* It is possible that even if there are no clients waiting for + * a load operation, still we have a load operation in progress. + * For instance think to a client performing a GET and then + * closing the connection */ + if ((flags & (REDIS_IO_LOAD|REDIS_IO_LOADINPROG)) == 0) + cacheScheduleIO(c->db,key,REDIS_IO_LOAD); + } return 1; }