#define REDIS_VM_MAX_RANDOM_JUMP 4096
#define REDIS_VM_MAX_THREADS 32
#define REDIS_THREAD_STACK_SIZE (1024*1024*4)
-/* The following is the number of completed I/O jobs to process when the
- * handelr is called. 1 is the minimum, and also the default, as it allows
- * to block as little as possible other accessing clients. While Virtual
- * Memory I/O operations are performed by threads, this operations must
- * be processed by the main thread when completed to take effect. */
+/* The following is the *percentage* of completed I/O jobs to process when the
+ * handler is called. While Virtual Memory I/O operations are performed by
+ * threads, these operations must be processed by the main thread when
+ * completed in order to take effect. */
#define REDIS_MAX_COMPLETED_JOBS_PROCESSED 1
/* Client flags */
static robj *vmReadObjectFromSwap(off_t page, int type);
static void waitEmptyIOJobsQueue(void);
static void vmReopenSwapFile(void);
+static int vmFreePage(off_t page);
static void authCommand(redisClient *c);
static void pingCommand(redisClient *c);
static void decrRefCount(void *obj) {
robj *o = obj;
- /* Object is swapped out, or in the process of being loaded. */
+ /* Object is a key of a swapped out value, or in the process of being
+ * loaded. */
if (server.vm_enabled &&
(o->storage == REDIS_VM_SWAPPED || o->storage == REDIS_VM_LOADING))
{
static void vmMarkPageUsed(off_t page) {
off_t byte = page/8;
int bit = page&7;
+ redisAssert(vmFreePage(page) == 1);
server.vm_bitmap[byte] |= 1<<bit;
redisLog(REDIS_DEBUG,"Mark used: %lld (byte:%lld bit:%d)\n",
(long long)page, (long long)byte, bit);
static void vmMarkPageFree(off_t page) {
off_t byte = page/8;
int bit = page&7;
+ redisAssert(vmFreePage(page) == 0);
server.vm_bitmap[byte] &= ~(1<<bit);
+ redisLog(REDIS_DEBUG,"Mark free: %lld (byte:%lld bit:%d)\n",
+ (long long)page, (long long)byte, bit);
}
/* Mark N contiguous pages as free, with 'page' being the first. */
for (j = 0; j < count; j++)
vmMarkPageFree(page+j);
server.vm_stats_used_pages -= count;
+ if (server.vm_stats_used_pages > 100000000) {
+ *((char*)-1) = 'x';
+ }
}
/* Test if the page is free */
while(offset < server.vm_pages) {
off_t this = base+offset;
- redisLog(REDIS_DEBUG, "THIS: %lld (%c)\n", (long long) this, vmFreePage(this) ? 'F' : 'X');
/* If we overflow, restart from page zero */
if (this >= server.vm_pages) {
this -= server.vm_pages;
numfree = 0;
}
}
+ redisLog(REDIS_DEBUG, "THIS: %lld (%c)\n", (long long) this, vmFreePage(this) ? 'F' : 'X');
if (vmFreePage(this)) {
/* This is a free page */
numfree++;
char buf[1];
int retval;
int processed = 0;
+ int toprocess = -1;
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
REDIS_NOTUSED(privdata);
/* Get the processed element (the oldest one) */
lockThreadedIO();
assert(listLength(server.io_processed) != 0);
+ if (toprocess == -1) {
+ toprocess = (listLength(server.io_processed)*REDIS_MAX_COMPLETED_JOBS_PROCESSED)/100;
+ if (toprocess <= 0) toprocess = 1;
+ }
ln = listFirst(server.io_processed);
j = ln->value;
listDelNode(server.io_processed,ln);
}
}
processed++;
- if (processed == REDIS_MAX_COMPLETED_JOBS_PROCESSED) return;
+ if (processed == toprocess) return;
}
if (retval < 0 && errno != EAGAIN) {
redisLog(REDIS_WARNING,
if (job->canceled) continue; /* Skip this, already canceled. */
if (compareStringObjects(job->key,o) == 0) {
- redisLog(REDIS_DEBUG,"*** CANCELED %p (%s) (LIST ID %d)\n",
- (void*)job, (char*)o->ptr, i);
+ redisLog(REDIS_DEBUG,"*** CANCELED %p (%s) (type %d) (LIST ID %d)\n",
+ (void*)job, (char*)o->ptr, job->type, i);
/* Mark the pages as free since the swap didn't happened
* or happened but is now discarded. */
- if (job->type == REDIS_IOJOB_DO_SWAP)
+ if (i != 1 && job->type == REDIS_IOJOB_DO_SWAP)
vmMarkPagesFree(job->page,job->pages);
/* Cancel the job. It depends on the list the job is
* living in. */
* fork() in order to BGSAVE or BGREWRITEAOF. */
static void waitEmptyIOJobsQueue(void) {
while(1) {
+ int io_processed_len;
+
lockThreadedIO();
if (listLength(server.io_newjobs) == 0 &&
listLength(server.io_processing) == 0 &&
unlockThreadedIO();
return;
}
+ /* While waiting for the empty jobs queue condition we post-process some
+ * finished jobs, as I/O threads may be hanging trying to write against
+ * the io_ready_pipe_write FD but there are so many pending jobs that
+ * it's blocking. */
+ io_processed_len = listLength(server.io_processed);
unlockThreadedIO();
- usleep(10000); /* 10 milliseconds */
+ if (io_processed_len) {
+ vmThreadedIOCompletedJob(NULL,server.io_ready_pipe_read,NULL,0);
+ usleep(1000); /* 1 millisecond */
+ } else {
+ usleep(10000); /* 10 milliseconds */
+ }
}
}