while((retval = read(fd,buf,1)) == 1) {
iojob *j;
listNode *ln;
- struct dictEntry *de;
redisLog(REDIS_DEBUG,"Processing I/O completed job");
redisLog(REDIS_DEBUG,"COMPLETED Job type %s, key: %s",
(j->type == REDIS_IOJOB_LOAD) ? "load" : "save",
(unsigned char*)j->key->ptr);
- de = dictFind(j->db->dict,j->key->ptr);
- redisAssert(de != NULL);
if (j->type == REDIS_IOJOB_LOAD) {
/* Create the key-value pair in the in-memory database */
dbAdd(j->db,j->key,j->val);
+ incrRefCount(j->val);
/* Handle clients waiting for this key to be loaded. */
handleClientsBlockedOnSwappedKey(j->db,j->key);
freeIOJob(j);
lockThreadedIO();
if (listLength(server.io_newjobs) == 0) {
/* No new jobs in queue, exit. */
- redisLog(REDIS_DEBUG,"Thread %ld exiting, nothing to do",
- (long) pthread_self());
- server.io_active_threads--;
unlockThreadedIO();
- return NULL;
+ sleep(1);
+ continue;
}
ln = listFirst(server.io_newjobs);
j = ln->value;
j->key = key;
incrRefCount(key);
j->val = val;
- incrRefCount(val);
+ if (val) incrRefCount(val);
lockThreadedIO();
queueIOJob(j);
kill(server.bgsavechildpid,SIGKILL);
rdbRemoveTempFile(server.bgsavechildpid);
}
- if (server.appendonly) {
+ if (server.ds_enabled) {
+ /* FIXME: flush all objects on disk */
+ } else if (server.appendonly) {
/* Append only file: fsync() the AOF and exit */
aof_fsync(server.appendfd);
} else if (server.saveparamslen > 0) {