* as a fully non-blocking VM.
*/
+void spawnIOThread(void);
+
/* =================== Virtual Memory - Blocking Side ====================== */
void dsInit(void) {
- off_t totsize;
int pipefds[2];
size_t stacksize;
- struct flock fl;
zmalloc_enable_thread_safeness(); /* we need thread safe zmalloc() */
/* =================== Virtual Memory - Threaded I/O ======================= */
void freeIOJob(iojob *j) {
- if ((j->type == REDIS_IOJOB_PREPARE_SWAP ||
- j->type == REDIS_IOJOB_DO_SWAP ||
- j->type == REDIS_IOJOB_LOAD) && j->val != NULL)
- {
- /* we fix the storage type, otherwise decrRefCount() will try to
- * kill the I/O thread Job (that does no longer exists). */
- if (j->val->storage == REDIS_VM_SWAPPING)
- j->val->storage = REDIS_VM_MEMORY;
- decrRefCount(j->val);
- }
decrRefCount(j->key);
zfree(j);
}
/* Every time a thread finished a Job, it writes a byte into the write side
* of an unix pipe in order to "awake" the main thread, and this function
- * is called.
- *
- * Note that this is called both by the event loop, when a I/O thread
- * sends a byte in the notification pipe, and is also directly called from
- * waitEmptyIOJobsQueue().
- *
- * In the latter case we don't want to swap more, so we use the
- * "privdata" argument setting it to a not NULL value to signal this
- * condition. */
+ * is called. */
void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata,
int mask)
{
char buf[1];
- int retval, processed = 0, toprocess = -1, trytoswap = 1;
+ int retval, processed = 0, toprocess = -1;
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
REDIS_NOTUSED(privdata);
- if (privdata != NULL) trytoswap = 0; /* check the comments above... */
-
/* For every byte we read in the read side of the pipe, there is one
* I/O job completed to process. */
while((retval = read(fd,buf,1)) == 1) {
j = ln->value;
listDelNode(server.io_processed,ln);
unlockThreadedIO();
- /* If this job is marked as canceled, just ignore it */
- if (j->canceled) {
- freeIOJob(j);
- continue;
- }
+
/* Post process it in the main thread, as there are things we
* can do just here to avoid race conditions and/or invasive locks */
redisLog(REDIS_DEBUG,"COMPLETED Job type: %d, ID %p, key: %s", j->type, (void*)j->id, (unsigned char*)j->key->ptr);
handleClientsBlockedOnSwappedKey(db,j->key);
freeIOJob(j);
zfree(vp);
- } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) {
- /* Now we know the amount of pages required to swap this object.
- * Let's find some space for it, and queue this task again
- * rebranded as REDIS_IOJOB_DO_SWAP. */
- if (!vmCanSwapOut() ||
- vmFindContiguousPages(&j->page,j->pages) == REDIS_ERR)
- {
- /* Ooops... no space or we can't swap as there is
- * a fork()ed Redis trying to save stuff on disk. */
- j->val->storage = REDIS_VM_MEMORY; /* undo operation */
- freeIOJob(j);
- } else {
- /* Note that we need to mark this pages as used now,
- * if the job will be canceled, we'll mark them as freed
- * again. */
- vmMarkPagesUsed(j->page,j->pages);
- j->type = REDIS_IOJOB_DO_SWAP;
- lockThreadedIO();
- queueIOJob(j);
- unlockThreadedIO();
- }
} else if (j->type == REDIS_IOJOB_DO_SWAP) {
vmpointer *vp;