+static int vmSwapOneObjectBlocking() {
+ return vmSwapOneObject(0);
+}
+
+static int vmSwapOneObjectThreaded() {
+ return vmSwapOneObject(1);
+}
+
+/* Return true if it's safe to swap out objects in a given moment.
+ * Basically we don't want to swap objects out while there is a BGSAVE
+ * or a BGAEOREWRITE running in backgroud. */
+static int vmCanSwapOut(void) {
+ return (server.bgsavechildpid == -1 && server.bgrewritechildpid == -1);
+}
+
+/* Delete a key if swapped. Returns 1 if the key was found, was swapped
+ * and was deleted. Otherwise 0 is returned. */
+static int deleteIfSwapped(redisDb *db, robj *key) {
+ dictEntry *de;
+ robj *foundkey;
+
+ if ((de = dictFind(db->dict,key)) == NULL) return 0;
+ foundkey = dictGetEntryKey(de);
+ if (foundkey->storage == REDIS_VM_MEMORY) return 0;
+ deleteKey(db,key);
+ return 1;
+}
+
+/* =================== Virtual Memory - Threaded I/O ======================= */
+
+static void freeIOJob(iojob *j) {
+ if (j->type == REDIS_IOJOB_PREPARE_SWAP ||
+ j->type == REDIS_IOJOB_DO_SWAP)
+ decrRefCount(j->val);
+ decrRefCount(j->key);
+ zfree(j);
+}
+
+/* Every time a thread finished a Job, it writes a byte into the write side
+ * of an unix pipe in order to "awake" the main thread, and this function
+ * is called. */
+static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata,
+ int mask)
+{
+ char buf[1];
+ int retval;
+ int processed = 0;
+ REDIS_NOTUSED(el);
+ REDIS_NOTUSED(mask);
+ REDIS_NOTUSED(privdata);
+
+ /* For every byte we read in the read side of the pipe, there is one
+ * I/O job completed to process. */
+ while((retval = read(fd,buf,1)) == 1) {
+ iojob *j;
+ listNode *ln;
+ robj *key;
+ struct dictEntry *de;
+
+ redisLog(REDIS_DEBUG,"Processing I/O completed job");
+
+ /* Get the processed element (the oldest one) */
+ lockThreadedIO();
+ assert(listLength(server.io_processed) != 0);
+ ln = listFirst(server.io_processed);
+ j = ln->value;
+ listDelNode(server.io_processed,ln);
+ unlockThreadedIO();
+ /* If this job is marked as canceled, just ignore it */
+ if (j->canceled) {
+ freeIOJob(j);
+ continue;
+ }
+ /* Post process it in the main thread, as there are things we
+ * can do just here to avoid race conditions and/or invasive locks */
+ redisLog(REDIS_DEBUG,"Job %p type: %d, key at %p (%s) refcount: %d\n", (void*) j, j->type, (void*)j->key, (char*)j->key->ptr, j->key->refcount);
+ de = dictFind(j->db->dict,j->key);
+ assert(de != NULL);
+ key = dictGetEntryKey(de);
+ if (j->type == REDIS_IOJOB_LOAD) {
+ /* Key loaded, bring it at home */
+ key->storage = REDIS_VM_MEMORY;
+ key->vm.atime = server.unixtime;
+ vmMarkPagesFree(key->vm.page,key->vm.usedpages);
+ redisLog(REDIS_DEBUG, "VM: object %s loaded from disk (threaded)",
+ (unsigned char*) key->ptr);
+ server.vm_stats_swapped_objects--;
+ server.vm_stats_swapins++;
+ freeIOJob(j);
+ } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) {
+ /* Now we know the amount of pages required to swap this object.
+ * Let's find some space for it, and queue this task again
+ * rebranded as REDIS_IOJOB_DO_SWAP. */
+ if (vmFindContiguousPages(&j->page,j->pages) == REDIS_ERR) {
+ /* Ooops... no space! */
+ freeIOJob(j);
+ } else {
+ /* Note that we need to mark this pages as used now,
+ * if the job will be canceled, we'll mark them as freed
+ * again. */
+ vmMarkPagesUsed(j->page,j->pages);
+ j->type = REDIS_IOJOB_DO_SWAP;
+ lockThreadedIO();
+ queueIOJob(j);
+ unlockThreadedIO();
+ }
+ } else if (j->type == REDIS_IOJOB_DO_SWAP) {
+ robj *val;
+
+ /* Key swapped. We can finally free some memory. */
+ if (key->storage != REDIS_VM_SWAPPING) {
+ printf("key->storage: %d\n",key->storage);
+ printf("key->name: %s\n",(char*)key->ptr);
+ printf("key->refcount: %d\n",key->refcount);
+ printf("val: %p\n",(void*)j->val);
+ printf("val->type: %d\n",j->val->type);
+ printf("val->ptr: %s\n",(char*)j->val->ptr);
+ }
+ redisAssert(key->storage == REDIS_VM_SWAPPING);
+ val = dictGetEntryVal(de);
+ key->vm.page = j->page;
+ key->vm.usedpages = j->pages;
+ key->storage = REDIS_VM_SWAPPED;
+ key->vtype = j->val->type;
+ decrRefCount(val); /* Deallocate the object from memory. */
+ dictGetEntryVal(de) = NULL;
+ redisLog(REDIS_DEBUG,
+ "VM: object %s swapped out at %lld (%lld pages) (threaded)",
+ (unsigned char*) key->ptr,
+ (unsigned long long) j->page, (unsigned long long) j->pages);
+ server.vm_stats_swapped_objects++;
+ server.vm_stats_swapouts++;
+ freeIOJob(j);
+ /* Put a few more swap requests in queue if we are still
+ * out of memory */
+ if (zmalloc_used_memory() > server.vm_max_memory) {
+ int more = 1;
+ while(more) {
+ lockThreadedIO();
+ more = listLength(server.io_newjobs) <
+ (unsigned) server.vm_max_threads;
+ unlockThreadedIO();
+ /* Don't waste CPU time if swappable objects are rare. */
+ if (vmSwapOneObjectThreaded() == REDIS_ERR) break;
+ }
+ }
+ }
+ processed++;
+ if (processed == REDIS_MAX_COMPLETED_JOBS_PROCESSED) return;
+ }
+ if (retval < 0 && errno != EAGAIN) {
+ redisLog(REDIS_WARNING,
+ "WARNING: read(2) error in vmThreadedIOCompletedJob() %s",
+ strerror(errno));
+ }
+}
+
+static void lockThreadedIO(void) {
+ pthread_mutex_lock(&server.io_mutex);
+}
+
+static void unlockThreadedIO(void) {
+ pthread_mutex_unlock(&server.io_mutex);
+}
+
+/* Remove the specified object from the threaded I/O queue if still not
+ * processed, otherwise make sure to flag it as canceled. */
+static void vmCancelThreadedIOJob(robj *o) {
+ list *lists[3] = {
+ server.io_newjobs, /* 0 */
+ server.io_processing, /* 1 */
+ server.io_processed /* 2 */
+ };
+ int i;
+
+ assert(o->storage == REDIS_VM_LOADING || o->storage == REDIS_VM_SWAPPING);
+again:
+ lockThreadedIO();
+ /* Search for a matching key in one of the queues */
+ for (i = 0; i < 3; i++) {
+ listNode *ln;
+ listIter li;
+
+ listRewind(lists[i],&li);
+ while ((ln = listNext(&li)) != NULL) {
+ iojob *job = ln->value;
+
+ if (job->canceled) continue; /* Skip this, already canceled. */
+ if (compareStringObjects(job->key,o) == 0) {
+ redisLog(REDIS_DEBUG,"*** CANCELED %p (%s) (LIST ID %d)\n",
+ (void*)job, (char*)o->ptr, i);
+ /* Mark the pages as free since the swap didn't happened
+ * or happened but is now discarded. */
+ if (job->type == REDIS_IOJOB_DO_SWAP)
+ vmMarkPagesFree(job->page,job->pages);
+ /* Cancel the job. It depends on the list the job is
+ * living in. */
+ switch(i) {
+ case 0: /* io_newjobs */
+ /* If the job was yet not processed the best thing to do
+ * is to remove it from the queue at all */
+ freeIOJob(job);
+ listDelNode(lists[i],ln);
+ break;
+ case 1: /* io_processing */
+ /* Oh Shi- the thread is messing with the Job, and
+ * probably with the object if this is a
+ * PREPARE_SWAP or DO_SWAP job. Better to wait for the
+ * job to move into the next queue... */
+ if (job->type != REDIS_IOJOB_LOAD) {
+ /* Yes, we try again and again until the job
+ * is completed. */
+ unlockThreadedIO();
+ /* But let's wait some time for the I/O thread
+ * to finish with this job. After all this condition
+ * should be very rare. */
+ usleep(1);
+ goto again;
+ } else {
+ job->canceled = 1;
+ break;
+ }
+ case 2: /* io_processed */
+ /* The job was already processed, that's easy...
+ * just mark it as canceled so that we'll ignore it
+ * when processing completed jobs. */
+ job->canceled = 1;
+ break;
+ }
+ /* Finally we have to adjust the storage type of the object
+ * in order to "UNDO" the operaiton. */
+ if (o->storage == REDIS_VM_LOADING)
+ o->storage = REDIS_VM_SWAPPED;
+ else if (o->storage == REDIS_VM_SWAPPING)
+ o->storage = REDIS_VM_MEMORY;
+ unlockThreadedIO();
+ return;
+ }
+ }
+ }
+ unlockThreadedIO();
+ assert(1 != 1); /* We should never reach this */
+}
+
+static void *IOThreadEntryPoint(void *arg) {
+ iojob *j;
+ listNode *ln;
+ REDIS_NOTUSED(arg);
+
+ pthread_detach(pthread_self());
+ while(1) {
+ /* Get a new job to process */
+ lockThreadedIO();
+ if (listLength(server.io_newjobs) == 0) {
+#ifdef REDIS_HELGRIND_FRIENDLY
+ /* No new jobs? Wait and retry, because to be Helgrind
+ * (valgrind --tool=helgrind) what's needed is to take
+ * the same threads running instead to create/destroy threads
+ * as needed (otherwise valgrind will fail) */
+ unlockThreadedIO();
+ usleep(1); /* Give some time for the I/O thread to work. */
+ continue;
+#endif
+ /* No new jobs in queue, exit. */
+ redisLog(REDIS_DEBUG,"Thread %lld exiting, nothing to do",
+ (long long) pthread_self());
+ server.io_active_threads--;
+ unlockThreadedIO();
+ return NULL;
+ }
+ ln = listFirst(server.io_newjobs);
+ j = ln->value;
+ listDelNode(server.io_newjobs,ln);
+ /* Add the job in the processing queue */
+ j->thread = pthread_self();
+ listAddNodeTail(server.io_processing,j);
+ ln = listLast(server.io_processing); /* We use ln later to remove it */
+ unlockThreadedIO();
+ redisLog(REDIS_DEBUG,"Thread %lld got a new job (type %d): %p about key '%s'",
+ (long long) pthread_self(), j->type, (void*)j, (char*)j->key->ptr);
+
+ /* Process the Job */
+ if (j->type == REDIS_IOJOB_LOAD) {
+ } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) {
+ FILE *fp = fopen("/dev/null","w+");
+ j->pages = rdbSavedObjectPages(j->val,fp);
+ fclose(fp);
+ } else if (j->type == REDIS_IOJOB_DO_SWAP) {
+ if (vmWriteObjectOnSwap(j->val,j->page) == REDIS_ERR)
+ j->canceled = 1;
+ }
+
+ /* Done: insert the job into the processed queue */
+ redisLog(REDIS_DEBUG,"Thread %lld completed the job: %p (key %s)",
+ (long long) pthread_self(), (void*)j, (char*)j->key->ptr);
+ lockThreadedIO();
+ listDelNode(server.io_processing,ln);
+ listAddNodeTail(server.io_processed,j);
+ unlockThreadedIO();
+
+ /* Signal the main thread there is new stuff to process */
+ assert(write(server.io_ready_pipe_write,"x",1) == 1);
+ }
+ return NULL; /* never reached */
+}
+
+static void spawnIOThread(void) {
+ pthread_t thread;
+
+ pthread_create(&thread,&server.io_threads_attr,IOThreadEntryPoint,NULL);
+ server.io_active_threads++;
+}
+
+/* We need to wait for the last thread to exit before we are able to
+ * fork() in order to BGSAVE or BGREWRITEAOF. */
+static void waitZeroActiveThreads(void) {
+ while(1) {
+ lockThreadedIO();
+ if (server.io_active_threads == 0) {
+ unlockThreadedIO();
+ return;
+ }
+ unlockThreadedIO();
+ usleep(10000); /* 10 milliseconds */
+ }
+}
+
+/* This function must be called while with threaded IO locked */
+static void queueIOJob(iojob *j) {
+ redisLog(REDIS_DEBUG,"Queued IO Job %p type %d about key '%s'\n",
+ (void*)j, j->type, (char*)j->key->ptr);
+ listAddNodeTail(server.io_newjobs,j);
+ if (server.io_active_threads < server.vm_max_threads)
+ spawnIOThread();
+}
+
+static int vmSwapObjectThreaded(robj *key, robj *val, redisDb *db) {
+ iojob *j;
+
+ assert(key->storage == REDIS_VM_MEMORY);
+ assert(key->refcount == 1);
+
+ j = zmalloc(sizeof(*j));
+ j->type = REDIS_IOJOB_PREPARE_SWAP;
+ j->db = db;
+ j->key = dupStringObject(key);
+ j->val = val;
+ incrRefCount(val);
+ j->canceled = 0;
+ j->thread = (pthread_t) -1;
+ key->storage = REDIS_VM_SWAPPING;
+
+ lockThreadedIO();
+ queueIOJob(j);
+ unlockThreadedIO();
+ return REDIS_OK;
+}
+