X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/76a41fb163b6e38845323e2b6a1269a31552c260..f6c0bba8bc1b11fd789b0139dd86c852194ebc4c:/redis.c diff --git a/redis.c b/redis.c index aed49ed8..76784deb 100644 --- a/redis.c +++ b/redis.c @@ -165,11 +165,10 @@ #define REDIS_VM_MAX_RANDOM_JUMP 4096 #define REDIS_VM_MAX_THREADS 32 #define REDIS_THREAD_STACK_SIZE (1024*1024*4) -/* The following is the number of completed I/O jobs to process when the - * handelr is called. 1 is the minimum, and also the default, as it allows - * to block as little as possible other accessing clients. While Virtual - * Memory I/O operations are performed by threads, this operations must - * be processed by the main thread when completed to take effect. */ +/* The following is the *percentage* of completed I/O jobs to process when the + * handelr is called. While Virtual Memory I/O operations are performed by + * threads, this operations must be processed by the main thread when completed + * in order to take effect. */ #define REDIS_MAX_COMPLETED_JOBS_PROCESSED 1 /* Client flags */ @@ -566,6 +565,7 @@ static int vmWriteObjectOnSwap(robj *o, off_t page); static robj *vmReadObjectFromSwap(off_t page, int type); static void waitEmptyIOJobsQueue(void); static void vmReopenSwapFile(void); +static int vmFreePage(off_t page); static void authCommand(redisClient *c); static void pingCommand(redisClient *c); @@ -2498,7 +2498,8 @@ static void incrRefCount(robj *o) { static void decrRefCount(void *obj) { robj *o = obj; - /* Object is swapped out, or in the process of being loaded. */ + /* Object is a key of a swapped out value, or in the process of being + * loaded. */ if (server.vm_enabled && (o->storage == REDIS_VM_SWAPPED || o->storage == REDIS_VM_LOADING)) { @@ -7092,6 +7093,7 @@ static void vmInit(void) { static void vmMarkPageUsed(off_t page) { off_t byte = page/8; int bit = page&7; + redisAssert(vmFreePage(page) == 1); server.vm_bitmap[byte] |= 1< 100000000) { + *((char*)-1) = 'x'; + } } /* Test if the page is free */ @@ -7161,7 +7169,6 @@ static int vmFindContiguousPages(off_t *first, off_t n) { while(offset < server.vm_pages) { off_t this = base+offset; - redisLog(REDIS_DEBUG, "THIS: %lld (%c)\n", (long long) this, vmFreePage(this) ? 'F' : 'X'); /* If we overflow, restart from page zero */ if (this >= server.vm_pages) { this -= server.vm_pages; @@ -7171,6 +7178,7 @@ static int vmFindContiguousPages(off_t *first, off_t n) { numfree = 0; } } + redisLog(REDIS_DEBUG, "THIS: %lld (%c)\n", (long long) this, vmFreePage(this) ? 'F' : 'X'); if (vmFreePage(this)) { /* This is a free page */ numfree++; @@ -7494,6 +7502,7 @@ static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, char buf[1]; int retval; int processed = 0; + int toprocess = -1; REDIS_NOTUSED(el); REDIS_NOTUSED(mask); REDIS_NOTUSED(privdata); @@ -7511,6 +7520,10 @@ static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, /* Get the processed element (the oldest one) */ lockThreadedIO(); assert(listLength(server.io_processed) != 0); + if (toprocess == -1) { + toprocess = (listLength(server.io_processed)*REDIS_MAX_COMPLETED_JOBS_PROCESSED)/100; + if (toprocess <= 0) toprocess = 1; + } ln = listFirst(server.io_processed); j = ln->value; listDelNode(server.io_processed,ln); @@ -7599,7 +7612,7 @@ static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, } } processed++; - if (processed == REDIS_MAX_COMPLETED_JOBS_PROCESSED) return; + if (processed == toprocess) return; } if (retval < 0 && errno != EAGAIN) { redisLog(REDIS_WARNING, @@ -7640,11 +7653,11 @@ again: if (job->canceled) continue; /* Skip this, already canceled. */ if (compareStringObjects(job->key,o) == 0) { - redisLog(REDIS_DEBUG,"*** CANCELED %p (%s) (LIST ID %d)\n", - (void*)job, (char*)o->ptr, i); + redisLog(REDIS_DEBUG,"*** CANCELED %p (%s) (type %d) (LIST ID %d)\n", + (void*)job, (char*)o->ptr, job->type, i); /* Mark the pages as free since the swap didn't happened * or happened but is now discarded. */ - if (job->type == REDIS_IOJOB_DO_SWAP) + if (i != 1 && job->type == REDIS_IOJOB_DO_SWAP) vmMarkPagesFree(job->page,job->pages); /* Cancel the job. It depends on the list the job is * living in. */ @@ -7759,6 +7772,8 @@ static void spawnIOThread(void) { * fork() in order to BGSAVE or BGREWRITEAOF. */ static void waitEmptyIOJobsQueue(void) { while(1) { + int io_processed_len; + lockThreadedIO(); if (listLength(server.io_newjobs) == 0 && listLength(server.io_processing) == 0 && @@ -7767,8 +7782,18 @@ static void waitEmptyIOJobsQueue(void) { unlockThreadedIO(); return; } + /* While waiting for empty jobs queue condition we post-process some + * finshed job, as I/O threads may be hanging trying to write against + * the io_ready_pipe_write FD but there are so much pending jobs that + * it's blocking. */ + io_processed_len = listLength(server.io_processed); unlockThreadedIO(); - usleep(10000); /* 10 milliseconds */ + if (io_processed_len) { + vmThreadedIOCompletedJob(NULL,server.io_ready_pipe_read,NULL,0); + usleep(1000); /* 1 millisecond */ + } else { + usleep(10000); /* 10 milliseconds */ + } } }