* - What happens with MULTI/EXEC?
*
* Good question, still an open issue.
+ *
+ * - If dsSet() fails on the write thread, log the error and reschedule the
+ * key for flush (a possible shape for this is sketched below).
+ *
+ * - Check why INCR will not update the LRU info for the object.
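+ *
+ * A possible shape for the dsSet() error handling above (purely a
+ * hypothetical sketch: it assumes a dsSet(db,key,val) call that reports
+ * failures with REDIS_ERR, and it ignores the locking needed to touch
+ * the flush queue from the write thread):
+ *
+ * if (dsSet(j->db,j->key,j->val) == REDIS_ERR) {
+ * redisLog(REDIS_WARNING,"dsSet() failed for key '%s'",
+ * (char*)j->key->ptr);
+ * cacheScheduleForFlush(j->db,j->key);
+ * }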
*/
/* Virtual Memory is composed mainly of two subsystems:
zmalloc_enable_thread_safeness(); /* we need thread safe zmalloc() */
- redisLog(REDIS_NOTICE,"Initializing Disk Store at %s", server.ds_path);
+ redisLog(REDIS_NOTICE,"Opening Disk Store: %s", server.ds_path);
/* Open Disk Store */
if (dsOpen() != REDIS_OK) {
redisLog(REDIS_WARNING,"Fatal error opening disk store. Exiting.");
server.io_processed = listCreate();
server.io_ready_clients = listCreate();
pthread_mutex_init(&server.io_mutex,NULL);
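+ /* The condition variable is signaled every time a new IO Job is
+ * queued, waking up IO threads waiting for more work to do. */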
+ pthread_cond_init(&server.io_condvar,NULL);
server.io_active_threads = 0;
if (pipe(pipefds) == -1) {
redisLog(REDIS_WARNING,"Unable to intialized DS: pipe(2): %s. Exiting."
while((retval = read(fd,buf,1)) == 1) {
iojob *j;
listNode *ln;
- struct dictEntry *de;
redisLog(REDIS_DEBUG,"Processing I/O completed job");
redisLog(REDIS_DEBUG,"COMPLETED Job type %s, key: %s",
(j->type == REDIS_IOJOB_LOAD) ? "load" : "save",
(unsigned char*)j->key->ptr);
- de = dictFind(j->db->dict,j->key->ptr);
- redisAssert(de != NULL);
if (j->type == REDIS_IOJOB_LOAD) {
/* Create the key-value pair in the in-memory database */
- dbAdd(j->db,j->key,j->val);
+ if (j->val != NULL) {
+ dbAdd(j->db,j->key,j->val);
+ incrRefCount(j->val);
+ if (j->expire != -1) setExpire(j->db,j->key,j->expire);
+ } else {
+ /* The key does not exist. Create a negative cache entry
+ * for this key. */
+ /* FIXME: add this entry into the negative cache */
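+ /* A minimal sketch of what the FIXME above could look like
+ * (hypothetical: it assumes a server.cache_negative dict keyed by
+ * the key name, which does not exist in this patch):
+ *
+ * dictAdd(server.cache_negative,sdsdup(j->key->ptr),NULL);
+ *
+ * so that later lookups of the same missing key can avoid hitting
+ * the disk store again. */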
+ }
/* Handle clients waiting for this key to be loaded. */
handleClientsBlockedOnSwappedKey(j->db,j->key);
freeIOJob(j);
REDIS_NOTUSED(arg);
pthread_detach(pthread_self());
+ lockThreadedIO();
while(1) {
/* Get a new job to process */
- lockThreadedIO();
if (listLength(server.io_newjobs) == 0) {
- /* No new jobs in queue, exit. */
- redisLog(REDIS_DEBUG,"Thread %ld exiting, nothing to do",
- (long) pthread_self());
- server.io_active_threads--;
- unlockThreadedIO();
- return NULL;
+ /* No new jobs in queue: wait for more work to do.
+ * pthread_cond_wait() atomically releases the mutex while it
+ * blocks and re-acquires it before returning, so the lock is
+ * still held when we loop and check the queue again. */
+ pthread_cond_wait(&server.io_condvar,&server.io_mutex);
+ continue;
}
ln = listFirst(server.io_newjobs);
j = ln->value;
listAddNodeTail(server.io_processing,j);
ln = listLast(server.io_processing); /* We use ln later to remove it */
unlockThreadedIO();
+
redisLog(REDIS_DEBUG,"Thread %ld: new job type %s: %p about key '%s'",
(long) pthread_self(),
(j->type == REDIS_IOJOB_LOAD) ? "load" : "save",
/* Process the Job */
if (j->type == REDIS_IOJOB_LOAD) {
- j->val = dsGet(j->db,j->key);
- redisAssert(j->val != NULL);
+ time_t expire;
+
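+ /* Read the value from the disk store, together with its expire
+ * time (if any), so that the main thread can install both when
+ * the completed job is handled. */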
+ j->val = dsGet(j->db,j->key,&expire);
+ if (j->val) j->expire = expire;
} else if (j->type == REDIS_IOJOB_SAVE) {
redisAssert(j->val->storage == REDIS_DS_SAVING);
if (j->val)
/* Done: insert the job into the processed queue */
redisLog(REDIS_DEBUG,"Thread %ld completed the job: %p (key %s)",
(long) pthread_self(), (void*)j, (char*)j->key->ptr);
+
lockThreadedIO();
listDelNode(server.io_processing,ln);
listAddNodeTail(server.io_processed,j);
- unlockThreadedIO();
/* Signal the main thread there is new stuff to process */
redisAssert(write(server.io_ready_pipe_write,"x",1) == 1);
}
- return NULL; /* never reached */
+ /* Never reached: the worker loop above never exits, but the
+ * unlock/return pair is kept for clarity. */
+ unlockThreadedIO();
+ return NULL;
}
void spawnIOThread(void) {
server.io_active_threads++;
}
-/* We need to wait for the last thread to exit before we are able to
- * fork() in order to BGSAVE or BGREWRITEAOF. */
+/* Wait until all the pending IO Jobs are processed by the IO threads. */
void waitEmptyIOJobsQueue(void) {
while(1) {
int io_processed_len;
lockThreadedIO();
if (listLength(server.io_newjobs) == 0 &&
- listLength(server.io_processing) == 0 &&
- server.io_active_threads == 0)
+ listLength(server.io_processing) == 0)
{
unlockThreadedIO();
return;
}
}
+/* Process all the IO Jobs already completed by the threads but still
+ * waiting to be processed by the main thread. */
+void processAllPendingIOJobs(void) {
+ while(1) {
+ int io_processed_len;
+
+ lockThreadedIO();
+ io_processed_len = listLength(server.io_processed);
+ unlockThreadedIO();
+ if (io_processed_len == 0) return;
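+ /* Call the completed-jobs handler directly: only the pipe fd is
+ * actually used, so the event loop pointer, privdata and mask are
+ * passed as dummy values. */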
+ vmThreadedIOCompletedJob(NULL,server.io_ready_pipe_read,
+ (void*)0xdeadbeef,0);
+ }
+}
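+
+/* Note: a typical (hypothetical) use of the two functions above is to
+ * drain the IO system before a fork() for BGSAVE or BGREWRITEAOF:
+ *
+ * waitEmptyIOJobsQueue();
+ * processAllPendingIOJobs();
+ * ... now it is safe to fork() ...
+ */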
+
/* This function must be called with threaded IO locked */
void queueIOJob(iojob *j) {
redisLog(REDIS_DEBUG,"Queued IO Job %p type %d about key '%s'\n",
j->key = key;
incrRefCount(key);
j->val = val;
- incrRefCount(val);
+ if (val) incrRefCount(val);
lockThreadedIO();
queueIOJob(j);
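+ /* Wake up a thread that may be waiting on the condition variable
+ * for new jobs to process. */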
+ pthread_cond_signal(&server.io_condvar);
unlockThreadedIO();
}
dk->key = key;
incrRefCount(key);
dk->ctime = time(NULL);
- listAddNodeTail(server.cache_flush_queue, key);
+ listAddNodeTail(server.cache_flush_queue, dk);
}
void cacheCron(void) {
redisLog(REDIS_DEBUG,"Creating IO Job to save key %s",dk->key->ptr);
- /* Lookup the key. We need to check if it's still here and
- * possibly access to the value. */
+ /* Lookup the key, in order to put the current value in the IO
+ * Job and mark it as REDIS_DS_SAVING.
+ * Otherwise, if the key does not exist, we schedule a disk store
+ * delete operation, setting the value to NULL. */
de = dictFind(dk->db->dict,dk->key->ptr);
if (de) {
val = dictGetEntryVal(de);
}
dsCreateIOJob(REDIS_IOJOB_SAVE,dk->db,dk->key,val);
listDelNode(server.cache_flush_queue,ln);
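+ /* The IO Job took its own reference to the key, so we can now
+ * release the deferred-flush entry. */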
+ decrRefCount(dk->key);
+ zfree(dk);
} else {
break; /* too early */
}