X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/5f6e1183e7b309b3fd51698523ae424d36fe29d5..8d51fb6a80fb40abd0bb487d71435d3a30c2942e:/src/dscache.c

diff --git a/src/dscache.c b/src/dscache.c
index 2bda0b50..05112cbb 100644
--- a/src/dscache.c
+++ b/src/dscache.c
@@ -84,6 +84,11 @@
  * - What happens with MULTI/EXEC?
  *
  * Good question.
+ *
+ * - If dsSet() fails on the write thread log the error and reschedule the
+ *   key for flush.
+ *
+ * - Check why INCR will not update the LRU info for the object.
  */
 
 /* Virtual Memory is composed mainly of two subsystems:
@@ -117,7 +122,7 @@ void dsInit(void) {
 
     zmalloc_enable_thread_safeness(); /* we need thread safe zmalloc() */
 
-    redisLog(REDIS_NOTICE,"Initializing Disk Store at %s", server.ds_path);
+    redisLog(REDIS_NOTICE,"Opening Disk Store: %s", server.ds_path);
     /* Open Disk Store */
     if (dsOpen() != REDIS_OK) {
         redisLog(REDIS_WARNING,"Fatal error opening disk store. Exiting.");
@@ -130,6 +135,7 @@ void dsInit(void) {
     server.io_processed = listCreate();
     server.io_ready_clients = listCreate();
     pthread_mutex_init(&server.io_mutex,NULL);
+    pthread_cond_init(&server.io_condvar,NULL);
     server.io_active_threads = 0;
     if (pipe(pipefds) == -1) {
         redisLog(REDIS_WARNING,"Unable to intialized DS: pipe(2): %s. Exiting."
@@ -227,6 +233,7 @@ int cacheFreeOneEntry(void) {
         dbDelete(best_db,kobj);
         decrRefCount(kobj);
     }
+    return REDIS_OK;
 }
 
 /* Return true if it's safe to swap out objects in a given moment.
@@ -240,7 +247,8 @@ int dsCanTouchDiskStore(void) {
 
 void freeIOJob(iojob *j) {
     decrRefCount(j->key);
-    decrRefCount(j->val);
+    /* j->val can be NULL if the job is about deleting the key from disk. */
+    if (j->val) decrRefCount(j->val);
     zfree(j);
 }
 
@@ -261,7 +269,6 @@ void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata,
     while((retval = read(fd,buf,1)) == 1) {
         iojob *j;
         listNode *ln;
-        struct dictEntry *de;
 
         redisLog(REDIS_DEBUG,"Processing I/O completed job");
 
@@ -279,13 +286,23 @@ void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata,
 
         /* Post process it in the main thread, as there are things we
          * can do just here to avoid race conditions and/or invasive locks */
-        redisLog(REDIS_DEBUG,"COMPLETED Job type: %d, ID %p, key: %s", j->type, (void*)j->id, (unsigned char*)j->key->ptr);
-        de = dictFind(j->db->dict,j->key->ptr);
-        redisAssert(de != NULL);
+        redisLog(REDIS_DEBUG,"COMPLETED Job type %s, key: %s",
+            (j->type == REDIS_IOJOB_LOAD) ? "load" : "save",
+            (unsigned char*)j->key->ptr);
         if (j->type == REDIS_IOJOB_LOAD) {
-            dbAdd(j->db,j->key,j->val);
+            /* Create the key-value pair in the in-memory database */
+            if (j->val != NULL) {
+                dbAdd(j->db,j->key,j->val);
+                incrRefCount(j->val);
+                if (j->expire != -1) setExpire(j->db,j->key,j->expire);
+            } else {
+                /* The key does not exist. Create a negative cache entry
+                 * for this key. */
+                /* FIXME: add this entry into the negative cache */
+            }
+            /* Handle clients waiting for this key to be loaded. */
+            handleClientsBlockedOnSwappedKey(j->db,j->key);
             freeIOJob(j);
-            /* FIXME: notify clients waiting for this key */
         } else if (j->type == REDIS_IOJOB_SAVE) {
             redisAssert(j->val->storage == REDIS_DS_SAVING);
             j->val->storage = REDIS_DS_MEMORY;
@@ -315,51 +332,57 @@ void *IOThreadEntryPoint(void *arg) {
     REDIS_NOTUSED(arg);
 
     pthread_detach(pthread_self());
+    lockThreadedIO();
     while(1) {
+        /* Wait for more work to do */
+        pthread_cond_wait(&server.io_condvar,&server.io_mutex);
         /* Get a new job to process */
-        lockThreadedIO();
         if (listLength(server.io_newjobs) == 0) {
-            /* No new jobs in queue, exit. */
-            redisLog(REDIS_DEBUG,"Thread %ld exiting, nothing to do",
-                (long) pthread_self());
-            server.io_active_threads--;
+            /* No new jobs in queue, reiterate. */
             unlockThreadedIO();
-            return NULL;
+            continue;
         }
         ln = listFirst(server.io_newjobs);
         j = ln->value;
         listDelNode(server.io_newjobs,ln);
         /* Add the job in the processing queue */
-        j->thread = pthread_self();
         listAddNodeTail(server.io_processing,j);
         ln = listLast(server.io_processing); /* We use ln later to remove it */
         unlockThreadedIO();
-        redisLog(REDIS_DEBUG,"Thread %ld got a new job (type %d): %p about key '%s'",
-            (long) pthread_self(), j->type, (void*)j, (char*)j->key->ptr);
+
+        redisLog(REDIS_DEBUG,"Thread %ld: new job type %s: %p about key '%s'",
+            (long) pthread_self(),
+            (j->type == REDIS_IOJOB_LOAD) ? "load" : "save",
+            (void*)j, (char*)j->key->ptr);
 
         /* Process the Job */
         if (j->type == REDIS_IOJOB_LOAD) {
-            vmpointer *vp = (vmpointer*)j->id;
-
-            j->val = vmReadObjectFromSwap(j->page,vp->vtype);
-        } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) {
-            j->pages = rdbSavedObjectPages(j->val);
-        } else if (j->type == REDIS_IOJOB_DO_SWAP) {
-            if (vmWriteObjectOnSwap(j->val,j->page) == REDIS_ERR)
-                j->canceled = 1;
+            time_t expire;
+
+            j->val = dsGet(j->db,j->key,&expire);
+            if (j->val) j->expire = expire;
+        } else if (j->type == REDIS_IOJOB_SAVE) {
+            redisAssert(j->val->storage == REDIS_DS_SAVING);
+            if (j->val)
+                dsSet(j->db,j->key,j->val);
+            else
+                dsDel(j->db,j->key);
         }
 
         /* Done: insert the job into the processed queue */
        redisLog(REDIS_DEBUG,"Thread %ld completed the job: %p (key %s)",
            (long) pthread_self(), (void*)j, (char*)j->key->ptr);
+
+        lockThreadedIO();
        listDelNode(server.io_processing,ln);
        listAddNodeTail(server.io_processed,j);
-        unlockThreadedIO();
 
         /* Signal the main thread there is new stuff to process */
         redisAssert(write(server.io_ready_pipe_write,"x",1) == 1);
     }
-    return NULL; /* never reached */
+    /* never reached, but that's the full pattern... */
+    unlockThreadedIO();
+    return NULL;
 }
 
 void spawnIOThread(void) {
@@ -381,16 +404,14 @@ void spawnIOThread(void) {
     server.io_active_threads++;
 }
 
-/* We need to wait for the last thread to exit before we are able to
- * fork() in order to BGSAVE or BGREWRITEAOF. */
+/* Wait that all the pending IO Jobs are processed */
 void waitEmptyIOJobsQueue(void) {
     while(1) {
         int io_processed_len;
 
         lockThreadedIO();
         if (listLength(server.io_newjobs) == 0 &&
-            listLength(server.io_processing) == 0 &&
-            server.io_active_threads == 0)
+            listLength(server.io_processing) == 0)
         {
             unlockThreadedIO();
             return;
@@ -411,6 +432,21 @@ void waitEmptyIOJobsQueue(void) {
     }
 }
 
+/* Process all the IO Jobs already completed by threads but still waiting
+ * processing from the main thread. */
+void processAllPendingIOJobs(void) {
+    while(1) {
+        int io_processed_len;
+
+        lockThreadedIO();
+        io_processed_len = listLength(server.io_processed);
+        unlockThreadedIO();
+        if (io_processed_len == 0) return;
+        vmThreadedIOCompletedJob(NULL,server.io_ready_pipe_read,
+                                 (void*)0xdeadbeef,0);
+    }
+}
+
 /* This function must be called while with threaded IO locked */
 void queueIOJob(iojob *j) {
     redisLog(REDIS_DEBUG,"Queued IO Job %p type %d about key '%s'\n",
@@ -420,52 +456,119 @@ void queueIOJob(iojob *j) {
     spawnIOThread();
 }
 
-int vmSwapObjectThreaded(robj *key, robj *val, redisDb *db) {
+void dsCreateIOJob(int type, redisDb *db, robj *key, robj *val) {
     iojob *j;
 
     j = zmalloc(sizeof(*j));
-    j->type = REDIS_IOJOB_PREPARE_SWAP;
+    j->type = type;
     j->db = db;
     j->key = key;
     incrRefCount(key);
-    j->id = j->val = val;
-    incrRefCount(val);
-    j->canceled = 0;
-    j->thread = (pthread_t) -1;
-    val->storage = REDIS_VM_SWAPPING;
+    j->val = val;
+    if (val) incrRefCount(val);
 
     lockThreadedIO();
     queueIOJob(j);
+    pthread_cond_signal(&server.io_condvar);
     unlockThreadedIO();
-    return REDIS_OK;
+}
+
+void cacheScheduleForFlush(redisDb *db, robj *key) {
+    dirtykey *dk;
+    dictEntry *de;
+
+    de = dictFind(db->dict,key->ptr);
+    if (de) {
+        robj *val = dictGetEntryVal(de);
+        if (val->storage == REDIS_DS_DIRTY)
+            return;
+        else
+            val->storage = REDIS_DS_DIRTY;
+    }
+
+    redisLog(REDIS_DEBUG,"Scheduling key %s for saving",key->ptr);
+    dk = zmalloc(sizeof(*dk));
+    dk->db = db;
+    dk->key = key;
+    incrRefCount(key);
+    dk->ctime = time(NULL);
+    listAddNodeTail(server.cache_flush_queue, dk);
+}
+
+void cacheCron(void) {
+    time_t now = time(NULL);
+    listNode *ln;
+
+    /* Sync stuff on disk */
+    while((ln = listFirst(server.cache_flush_queue)) != NULL) {
+        dirtykey *dk = ln->value;
+
+        if ((now - dk->ctime) >= server.cache_flush_delay) {
+            struct dictEntry *de;
+            robj *val;
+
+            redisLog(REDIS_DEBUG,"Creating IO Job to save key %s",dk->key->ptr);
+
+            /* Lookup the key, in order to put the current value in the IO
+             * Job and mark ti as DS_SAVING.
+             * Otherwise if the key does not exists we schedule a disk store
+             * delete operation, setting the value to NULL. */
+            de = dictFind(dk->db->dict,dk->key->ptr);
+            if (de) {
+                val = dictGetEntryVal(de);
+                redisAssert(val->storage == REDIS_DS_DIRTY);
+                val->storage = REDIS_DS_SAVING;
+            } else {
+                /* Setting the value to NULL tells the IO thread to delete
+                 * the key on disk. */
+                val = NULL;
+            }
+            dsCreateIOJob(REDIS_IOJOB_SAVE,dk->db,dk->key,val);
+            listDelNode(server.cache_flush_queue,ln);
+            decrRefCount(dk->key);
+            zfree(dk);
+        } else {
+            break; /* too early */
+        }
+    }
+
+    /* Reclaim memory from the object cache */
+    while (server.ds_enabled && zmalloc_used_memory() >
+            server.cache_max_memory)
+    {
+        if (cacheFreeOneEntry() == REDIS_ERR) break;
+    }
 }
 
 /* ============ Virtual Memory - Blocking clients on missing keys =========== */
 
 /* This function makes the clinet 'c' waiting for the key 'key' to be loaded.
- * If there is not already a job loading the key, it is craeted.
- * The key is added to the io_keys list in the client structure, and also
+ * If the key is already in memory we don't need to block, regardless
+ * of the storage of the value object for this key:
+ *
+ * - If it's REDIS_DS_MEMORY we have the key in memory.
+ * - If it's REDIS_DS_DIRTY they key was modified, but still in memory.
+ * - if it's REDIS_DS_SAVING the key is being saved by an IO Job. When
+ *   the client will lookup the key it will block if the key is still
+ *   in this stage but it's more or less the best we can do.
+ *
+ *   FIXME: we should try if it's actually better to suspend the client
+ *   accessing an object that is being saved, and awake it only when
+ *   the saving was completed.
+ *
+ * Otherwise if the key is not in memory, we block the client and start
+ * an IO Job to load it:
+ *
+ * the key is added to the io_keys list in the client structure, and also
  * in the hash table mapping swapped keys to waiting clients, that is,
  * server.io_waited_keys. */
 int waitForSwappedKey(redisClient *c, robj *key) {
     struct dictEntry *de;
-    robj *o;
     list *l;
 
-    /* If the key does not exist or is already in RAM we don't need to
-     * block the client at all. */
+    /* Return ASAP if the key is in memory */
     de = dictFind(c->db->dict,key->ptr);
-    if (de == NULL) return 0;
-    o = dictGetEntryVal(de);
-    if (o->storage == REDIS_VM_MEMORY) {
-        return 0;
-    } else if (o->storage == REDIS_VM_SWAPPING) {
-        /* We were swapping the key, undo it! */
-        vmCancelThreadedIOJob(o);
-        return 0;
-    }
-
-    /* OK: the key is either swapped, or being loaded just now. */
+    if (de != NULL) return 0;
 
     /* Add the key to the list of keys this client is waiting for.
      * This maps clients to keys they are waiting for. */
@@ -488,25 +591,8 @@ int waitForSwappedKey(redisClient *c, robj *key) {
     listAddNodeTail(l,c);
 
     /* Are we already loading the key from disk? If not create a job */
-    if (o->storage == REDIS_VM_SWAPPED) {
-        iojob *j;
-        vmpointer *vp = (vmpointer*)o;
-
-        o->storage = REDIS_VM_LOADING;
-        j = zmalloc(sizeof(*j));
-        j->type = REDIS_IOJOB_LOAD;
-        j->db = c->db;
-        j->id = (robj*)vp;
-        j->key = key;
-        incrRefCount(key);
-        j->page = vp->page;
-        j->val = NULL;
-        j->canceled = 0;
-        j->thread = (pthread_t) -1;
-        lockThreadedIO();
-        queueIOJob(j);
-        unlockThreadedIO();
-    }
+    if (de == NULL)
+        dsCreateIOJob(REDIS_IOJOB_LOAD,c->db,key,NULL);
     return 1;
 }
 
@@ -584,7 +670,7 @@ int blockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd) {
     if (listLength(c->io_keys)) {
         c->flags |= REDIS_IO_WAIT;
         aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
-        server.vm_blocked_clients++;
+        server.cache_blocked_clients++;
         return 1;
     } else {
         return 0;