+/* =================== Virtual Memory - Threaded I/O ======================= */
+
+/* Release an I/O job and the object references it holds.
+ *
+ * The reference on j->val is dropped only for the two swap job types
+ * (REDIS_IOJOB_PREPARE_SWAP and REDIS_IOJOB_DO_SWAP); the reference on
+ * j->key is always dropped. Finally the job structure itself is freed, so
+ * 'j' must not be used after this call. */
+static void freeIOJob(iojob *j) {
+    int is_swap_job = (j->type == REDIS_IOJOB_PREPARE_SWAP ||
+                       j->type == REDIS_IOJOB_DO_SWAP);
+
+    if (is_swap_job) {
+        decrRefCount(j->val);
+    }
+    decrRefCount(j->key);
+    zfree(j);
+}
+
+/* Event handler for the read side of the I/O "ready" pipe.
+ *
+ * Every time an I/O thread finishes a job, it writes one byte into the write
+ * side of a unix pipe in order to "awake" the main thread, and this function
+ * is called. For every byte read from 'fd', one completed job is removed
+ * from server.io_processed and post-processed here, in the main thread.
+ *
+ * 'el', 'privdata' and 'mask' are unused. */
+static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata,
+            int mask)
+{
+    char buf[1];
+    int retval;
+    REDIS_NOTUSED(el);
+    REDIS_NOTUSED(mask);
+    REDIS_NOTUSED(privdata);
+
+    /* For every byte we read in the read side of the pipe, there is one
+     * I/O job completed to process. */
+    while((retval = read(fd,buf,1)) == 1) {
+        iojob *j;
+        listNode *ln;
+        robj *key;
+        struct dictEntry *de;
+
+        redisLog(REDIS_DEBUG,"Processing I/O completed job");
+
+        /* Get the processed element (the oldest one). The queue is shared
+         * with the I/O threads, so even the sanity check on its length is
+         * performed while holding the lock. */
+        lockThreadedIO();
+        assert(listLength(server.io_processed) != 0);
+        ln = listFirst(server.io_processed);
+        j = ln->value;
+        listDelNode(server.io_processed,ln);
+        unlockThreadedIO();
+        /* If this job is marked as canceled, just ignore it */
+        if (j->canceled) {
+            freeIOJob(j);
+            continue;
+        }
+        /* Post process it in the main thread, as there are things we
+         * can do just here to avoid race conditions and/or invasive locks */
+        redisLog(REDIS_DEBUG,"Job type: %d, key at %p (%s) refcount: %d\n", j->type, (void*)j->key, (char*)j->key->ptr, j->key->refcount);
+        de = dictFind(j->db->dict,j->key);
+        assert(de != NULL);
+        key = dictGetEntryKey(de);
+        if (j->type == REDIS_IOJOB_LOAD) {
+            /* Key loaded, bring it at home */
+            key->storage = REDIS_VM_MEMORY;
+            key->vm.atime = server.unixtime;
+            vmMarkPagesFree(key->vm.page,key->vm.usedpages);
+            redisLog(REDIS_DEBUG, "VM: object %s loaded from disk (threaded)",
+                (unsigned char*) key->ptr);
+            server.vm_stats_swapped_objects--;
+            server.vm_stats_swapins++;
+            freeIOJob(j);
+        } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) {
+            /* Now we know the amount of pages required to swap this object.
+             * Let's find some space for it, and queue this task again
+             * rebranded as REDIS_IOJOB_DO_SWAP. */
+            if (vmFindContiguousPages(&j->page,j->pages) == REDIS_ERR) {
+                /* Ooops... no space! Undo the operation: the key was flagged
+                 * as REDIS_VM_SWAPPING when the job was created, and since
+                 * no further job will complete the swap it must go back to
+                 * REDIS_VM_MEMORY, otherwise it stays "swapping" forever. */
+                freeIOJob(j);
+                key->storage = REDIS_VM_MEMORY;
+            } else {
+                /* NOTE(review): the pages found here are marked as used only
+                 * when the DO_SWAP job completes. Presumably another
+                 * PREPARE_SWAP job completing in the meantime could be
+                 * handed the same pages -- confirm against
+                 * vmFindContiguousPages(). */
+                j->type = REDIS_IOJOB_DO_SWAP;
+                lockThreadedIO();
+                queueIOJob(j);
+                unlockThreadedIO();
+            }
+        } else if (j->type == REDIS_IOJOB_DO_SWAP) {
+            robj *val;
+
+            /* Key swapped. We can finally free some memory. */
+            val = dictGetEntryVal(de);
+            key->vm.page = j->page;
+            key->vm.usedpages = j->pages;
+            key->storage = REDIS_VM_SWAPPED;
+            key->vtype = j->val->type;
+            decrRefCount(val); /* Deallocate the object from memory. */
+            dictGetEntryVal(de) = NULL;
+            vmMarkPagesUsed(j->page,j->pages);
+            redisLog(REDIS_DEBUG,
+                "VM: object %s swapped out at %lld (%lld pages) (threaded)",
+                (unsigned char*) key->ptr,
+                (unsigned long long) j->page, (unsigned long long) j->pages);
+            server.vm_stats_swapped_objects++;
+            server.vm_stats_swapouts++;
+            freeIOJob(j);
+            /* Put a few more swap requests in queue if we are still
+             * out of memory */
+            if (zmalloc_used_memory() > server.vm_max_memory) {
+                int more = 1;
+                while(more) {
+                    lockThreadedIO();
+                    more = listLength(server.io_newjobs) <
+                            (unsigned) server.vm_max_threads;
+                    unlockThreadedIO();
+                    /* Don't waste CPU time if swappable objects are rare. */
+                    if (vmSwapOneObjectThreaded() == REDIS_ERR) break;
+                }
+            }
+        }
+    }
+    /* The loop ends when read(2) does not return exactly one byte: anything
+     * other than EAGAIN on error is worth a warning. */
+    if (retval < 0 && errno != EAGAIN) {
+        redisLog(REDIS_WARNING,
+            "WARNING: read(2) error in vmThreadedIOCompletedJob() %s",
+            strerror(errno));
+    }
+}
+
+/* Acquire server.io_mutex, the mutex taken around accesses to the threaded
+ * I/O job queues. */
+static void lockThreadedIO(void)
+{
+    pthread_mutex_t *io_lock = &server.io_mutex;
+
+    pthread_mutex_lock(io_lock);
+}
+
+/* Release server.io_mutex, previously acquired with lockThreadedIO(). */
+static void unlockThreadedIO(void)
+{
+    pthread_mutex_t *io_lock = &server.io_mutex;
+
+    pthread_mutex_unlock(io_lock);
+}
+
+/* Cancel the threaded I/O job about the key 'o'.
+ *
+ * The three job queues (new, processing, processed) are searched, in this
+ * order, for a live job whose key compares equal to 'o':
+ *
+ * - a job still in server.io_newjobs is removed from the queue and freed;
+ * - a job in server.io_processing or server.io_processed is only flagged as
+ *   canceled, and is freed later by the completion handler.
+ *
+ * In both cases the storage state of 'o' is rolled back
+ * (REDIS_VM_LOADING -> REDIS_VM_SWAPPED, REDIS_VM_SWAPPING ->
+ * REDIS_VM_MEMORY).
+ *
+ * 'o' must be in the REDIS_VM_LOADING or REDIS_VM_SWAPPING state, and a
+ * matching job is expected to exist: not finding one is an assertion
+ * failure.
+ *
+ * NOTE(review): jobs are matched by key content only, job->db is not
+ * compared -- confirm that the same key name can't have jobs pending in
+ * two different databases. */
+static void vmCancelThreadedIOJob(robj *o) {
+    list *lists[3] = {
+        server.io_newjobs, server.io_processing, server.io_processed
+    };
+    int i;
+
+    assert(o->storage == REDIS_VM_LOADING || o->storage == REDIS_VM_SWAPPING);
+    lockThreadedIO();
+    /* Search for a matching key in one of the queues */
+    for (i = 0; i < 3; i++) {
+        listNode *ln;
+
+        listRewind(lists[i]);
+        while ((ln = listYield(lists[i])) != NULL) {
+            iojob *job = ln->value;
+
+            /* Skip jobs already canceled: they may still sit in a queue
+             * waiting to be freed, and matching one of them would leave
+             * the live job for this key untouched. */
+            if (job->canceled) continue;
+            if (compareStringObjects(job->key,o) != 0) continue;
+
+            if (i == 0) {
+                /* io_newjobs: if the job was not yet processed the best
+                 * thing to do is to remove it from the queue at all. */
+                listDelNode(lists[i],ln);
+                freeIOJob(job);
+            } else {
+                /* io_processing or io_processed: just flag the job, the
+                 * completion handler will discard it. */
+                job->canceled = 1;
+            }
+            /* Roll back the storage state of the key. */
+            if (o->storage == REDIS_VM_LOADING)
+                o->storage = REDIS_VM_SWAPPED;
+            else if (o->storage == REDIS_VM_SWAPPING)
+                o->storage = REDIS_VM_MEMORY;
+            unlockThreadedIO();
+            return;
+        }
+    }
+    unlockThreadedIO();
+    assert(1 != 1); /* We should never reach this */
+}
+
+/* Entry point of every I/O thread.
+ *
+ * The thread detaches itself, then loops taking the oldest job from
+ * server.io_newjobs, moving it to server.io_processing while it works on
+ * it, and finally to server.io_processed, writing one byte to
+ * server.io_ready_pipe_write for every completed job. When no new job is
+ * found the thread decrements server.io_active_threads and exits.
+ *
+ * Only REDIS_IOJOB_PREPARE_SWAP jobs perform actual work in this function:
+ * the branches for the other job types are empty.
+ *
+ * 'arg' is unused. Always returns NULL. */
+static void *IOThreadEntryPoint(void *arg) {
+    iojob *j;
+    listNode *ln;
+    int retval;
+    REDIS_NOTUSED(arg);
+
+    pthread_detach(pthread_self());
+    while(1) {
+        /* Get a new job to process */
+        lockThreadedIO();
+        if (listLength(server.io_newjobs) == 0) {
+            /* No new jobs in queue, exit. */
+            redisLog(REDIS_DEBUG,"Thread %lld exiting, nothing to do\n",
+                (long long) pthread_self());
+            server.io_active_threads--;
+            unlockThreadedIO();
+            return NULL;
+        }
+        ln = listFirst(server.io_newjobs);
+        j = ln->value;
+        listDelNode(server.io_newjobs,ln);
+        /* Add the job in the processing queue */
+        j->thread = pthread_self();
+        listAddNodeTail(server.io_processing,j);
+        ln = listLast(server.io_processing); /* We use ln later to remove it */
+        unlockThreadedIO();
+        redisLog(REDIS_DEBUG,"Thread %lld got a new job: %p about key '%s'\n",
+            (long long) pthread_self(), (void*)j, (char*)j->key->ptr);
+
+        /* Process the Job */
+        if (j->type == REDIS_IOJOB_LOAD) {
+        } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) {
+            FILE *fp = fopen("/dev/null","w+");
+
+            /* Fail in a recognizable way instead of handing a NULL stream
+             * to rdbSavedObjectPages() and fclose(). */
+            assert(fp != NULL);
+            j->pages = rdbSavedObjectPages(j->val,fp);
+            fclose(fp);
+        } else if (j->type == REDIS_IOJOB_DO_SWAP) {
+        }
+
+        /* Done: insert the job into the processed queue */
+        redisLog(REDIS_DEBUG,"Thread %lld completed the job: %p\n",
+            (long long) pthread_self(), (void*)j);
+        lockThreadedIO();
+        listDelNode(server.io_processing,ln);
+        listAddNodeTail(server.io_processed,j);
+        unlockThreadedIO();
+
+        /* Signal the main thread there is new stuff to process. The write
+         * is performed outside of assert(), otherwise it would be compiled
+         * out together with the assertion when NDEBUG is defined and the
+         * main thread would never be woken up. A write interrupted by a
+         * signal is retried. */
+        do {
+            retval = write(server.io_ready_pipe_write,"x",1);
+        } while (retval == -1 && errno == EINTR);
+        assert(retval == 1);
+    }
+    return NULL; /* never reached */
+}
+
+/* Start a new I/O thread running IOThreadEntryPoint() and account for it in
+ * server.io_active_threads.
+ *
+ * The counter is incremented only if the thread was actually created: on
+ * failure a warning is logged and the counter is left untouched, so that
+ * a later call can try again instead of counting a thread that does not
+ * exist (and that would never decrement the counter on exit). */
+static void spawnIOThread(void) {
+    pthread_t thread;
+    int err;
+
+    /* pthread_create() returns the error number directly, not via errno. */
+    err = pthread_create(&thread,NULL,IOThreadEntryPoint,NULL);
+    if (err != 0) {
+        redisLog(REDIS_WARNING,"Unable to create a new I/O thread: %s",
+            strerror(err));
+        return;
+    }
+    server.io_active_threads++;
+}
+
+/* Append the job 'j' to the queue of new jobs and, if fewer than
+ * server.vm_max_threads I/O threads are currently active, spawn one more.
+ *
+ * This function must be called with the threaded I/O lock held. */
+static void queueIOJob(iojob *j) {
+    int below_thread_limit;
+
+    listAddNodeTail(server.io_newjobs,j);
+    below_thread_limit = server.io_active_threads < server.vm_max_threads;
+    if (below_thread_limit) {
+        spawnIOThread();
+    }
+}
+
+/* Start swapping out the value 'val' of the key 'key', living in 'db', using
+ * the I/O threads.
+ *
+ * A new REDIS_IOJOB_PREPARE_SWAP job is created holding a copy of the key
+ * and an additional reference to the value, the key is flagged as
+ * REDIS_VM_SWAPPING, and the job is queued.
+ *
+ * The key must be in the REDIS_VM_MEMORY state and have a reference count
+ * of one. Always returns REDIS_OK. */
+static int vmSwapObjectThreaded(robj *key, robj *val, redisDb *db) {
+    iojob *job;
+
+    assert(key->storage == REDIS_VM_MEMORY);
+    assert(key->refcount == 1);
+
+    job = zmalloc(sizeof(iojob));
+    job->type = REDIS_IOJOB_PREPARE_SWAP;
+    job->canceled = 0;
+    job->thread = (pthread_t) -1;
+    job->db = db;
+    job->key = dupStringObject(key);
+    /* The job holds its own reference to the value. */
+    incrRefCount(val);
+    job->val = val;
+
+    key->storage = REDIS_VM_SWAPPING;
+
+    lockThreadedIO();
+    queueIOJob(job);
+    unlockThreadedIO();
+
+    return REDIS_OK;
+}
+