*
* - If dsSet() fails on the write thread, log the error and reschedule the
* key for flush.
+ *
+ * - Investigate why INCR does not update the LRU info for the object.
*/
/* Virtual Memory is composed mainly of two subsystems:
server.io_processed = listCreate();
server.io_ready_clients = listCreate();
pthread_mutex_init(&server.io_mutex,NULL);
+ pthread_cond_init(&server.io_condvar,NULL);
server.io_active_threads = 0;
if (pipe(pipefds) == -1) {
redisLog(REDIS_WARNING,"Unable to intialized DS: pipe(2): %s. Exiting."
REDIS_NOTUSED(arg);
pthread_detach(pthread_self());
+ lockThreadedIO();
while(1) {
+ /* Wait for more work to do */
+ pthread_cond_wait(&server.io_condvar,&server.io_mutex);
/* Get a new job to process */
- lockThreadedIO();
if (listLength(server.io_newjobs) == 0) {
- /* No new jobs in queue, exit. */
+ /* No new jobs in queue, reiterate. */
unlockThreadedIO();
- sleep(1);
continue;
}
ln = listFirst(server.io_newjobs);
listAddNodeTail(server.io_processing,j);
ln = listLast(server.io_processing); /* We use ln later to remove it */
unlockThreadedIO();
+
redisLog(REDIS_DEBUG,"Thread %ld: new job type %s: %p about key '%s'",
(long) pthread_self(),
(j->type == REDIS_IOJOB_LOAD) ? "load" : "save",
/* Done: insert the job into the processed queue */
redisLog(REDIS_DEBUG,"Thread %ld completed the job: %p (key %s)",
(long) pthread_self(), (void*)j, (char*)j->key->ptr);
+
lockThreadedIO();
listDelNode(server.io_processing,ln);
listAddNodeTail(server.io_processed,j);
- unlockThreadedIO();
/* Signal the main thread there is new stuff to process */
redisAssert(write(server.io_ready_pipe_write,"x",1) == 1);
}
- return NULL; /* never reached */
+ /* never reached, but that's the full pattern... */
+ unlockThreadedIO();
+ return NULL;
}
void spawnIOThread(void) {
lockThreadedIO();
queueIOJob(j);
+ pthread_cond_signal(&server.io_condvar);
unlockThreadedIO();
}
robj *createStringObjectFromLongLong(long long value) {
robj *o;
if (value >= 0 && value < REDIS_SHARED_INTEGERS &&
+ !server.ds_enabled &&
pthread_equal(pthread_self(),server.mainthread)) {
incrRefCount(shared.integers[value]);
o = shared.integers[value];
* Note that we also avoid using shared integers when maxmemory is used
* because every object needs to have a private LRU field for the LRU
* algorithm to work well. */
- if (server.ds_enabled == 0 &&
+ if (!server.ds_enabled &&
server.maxmemory == 0 && value >= 0 && value < REDIS_SHARED_INTEGERS &&
pthread_equal(pthread_self(),server.mainthread))
{
list *io_processed; /* List of VM I/O jobs already processed */
list *io_ready_clients; /* Clients ready to be unblocked. All keys loaded */
pthread_mutex_t io_mutex; /* lock to access io_jobs/io_done/io_thread_job */
- pthread_mutex_t io_swapfile_mutex; /* So we can lseek + write */
+ pthread_cond_t io_condvar; /* I/O threads conditional variable */
pthread_attr_t io_threads_attr; /* attributes for threads creation */
int io_active_threads; /* Number of running I/O threads */
int vm_max_threads; /* Max number of I/O threads running at the same time */