X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/f2da3a620ce7d9462613a93c07022bb805eb5c90..ddbc81af33d6b3cda29cedf39e5587edf33b7ec1:/src/redis.h diff --git a/src/redis.h b/src/redis.h index 8dd46169..d9a4b912 100644 --- a/src/redis.h +++ b/src/redis.h @@ -125,6 +125,7 @@ #define REDIS_DS_SAVING 2 /* There is an IO Job created for this obj. */ #define REDIS_MAX_COMPLETED_JOBS_PROCESSED 1 +#define REDIS_THREAD_STACK_SIZE (1024*1024*4) /* Client flags */ #define REDIS_SLAVE 1 /* This client is a slave server */ @@ -430,7 +431,9 @@ struct redisServer { /* Blocked clients */ unsigned int bpop_blocked_clients; unsigned int cache_blocked_clients; - list *unblocked_clients; + list *unblocked_clients; /* list of clients to unblock before next loop */ + list *cache_flush_queue; /* keys to flush on disk */ + int cache_flush_delay; /* seconds to wait before flushing keys */ /* Sort parameters - qsort_r() is only available under BSD so we * have to take this state global, in order to pass it to sortCompare() */ int sort_desc; @@ -542,24 +545,25 @@ typedef struct zset { } zset; /* VM threaded I/O request message */ -#define REDIS_IOJOB_LOAD 0 /* Load from disk to memory */ -#define REDIS_IOJOB_PREPARE_SWAP 1 /* Compute needed pages */ -#define REDIS_IOJOB_DO_SWAP 2 /* Swap from memory to disk */ +#define REDIS_IOJOB_LOAD 0 +#define REDIS_IOJOB_SAVE 1 + typedef struct iojob { int type; /* Request type, REDIS_IOJOB_* */ redisDb *db;/* Redis database */ - robj *key; /* This I/O request is about swapping this key */ - robj *id; /* Unique identifier of this job: - this is the object to swap for REDIS_IOREQ_*_SWAP, or the - vmpointer objct for REDIS_IOREQ_LOAD. */ - robj *val; /* the value to swap for REDIS_IOREQ_*_SWAP, otherwise this - * field is populated by the I/O thread for REDIS_IOREQ_LOAD. */ - off_t page; /* Swap page where to read/write the object */ - off_t pages; /* Swap pages needed to save object. 
PREPARE_SWAP return val */ - int canceled; /* True if this command was canceled by blocking side of VM */ - pthread_t thread; /* ID of the thread processing this entry */ + robj *key; /* This I/O request is about this key */ + robj *val; /* the value to swap for REDIS_IOJOB_SAVE, otherwise this + * field is populated by the I/O thread for REDIS_IOJOB_LOAD. */ } iojob; +/* When diskstore is enabled and a flush operation is requested we push + * one of these structures into server.cache_flush_queue. */ +typedef struct dirtykey { + redisDb *db; + robj *key; + time_t ctime; /* This is the creation time of the entry. */ +} dirtykey; + /* Structure to hold list iteration abstraction. */ typedef struct { robj *subject; @@ -778,34 +782,26 @@ int dsOpen(void); int dsClose(void); int dsSet(redisDb *db, robj *key, robj *val); robj *dsGet(redisDb *db, robj *key); +int dsDel(redisDb *db, robj *key); int dsExists(redisDb *db, robj *key); +int dsFlushDb(int dbid); /* Disk Store Cache */ -void vmInit(void); -void vmMarkPagesFree(off_t page, off_t count); -robj *vmLoadObject(robj *o); -robj *vmPreviewObject(robj *o); -int vmSwapOneObjectBlocking(void); -int vmSwapOneObjectThreaded(void); -int vmCanSwapOut(void); +void dsInit(void); void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, int mask); -void vmCancelThreadedIOJob(robj *o); void lockThreadedIO(void); void unlockThreadedIO(void); -int vmSwapObjectThreaded(robj *key, robj *val, redisDb *db); void freeIOJob(iojob *j); void queueIOJob(iojob *j); -int vmWriteObjectOnSwap(robj *o, off_t page); -robj *vmReadObjectFromSwap(off_t page, int type); void waitEmptyIOJobsQueue(void); -void vmReopenSwapFile(void); -int vmFreePage(off_t page); void zunionInterBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv); void execBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv); int blockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd); int 
dontWaitForSwappedKey(redisClient *c, robj *key); void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key); -vmpointer *vmSwapObjectBlocking(robj *val); +int cacheFreeOneEntry(void); +void cacheScheduleForFlush(redisDb *db, robj *key); +void cacheCron(void); /* Set data type */ robj *setTypeCreate(robj *value); @@ -876,6 +872,8 @@ robj *dbRandomKey(redisDb *db); int dbDelete(redisDb *db, robj *key); long long emptyDb(); int selectDb(redisClient *c, int id); +void signalModifiedKey(redisDb *db, robj *key); +void signalFlushedDb(int dbid); /* Git SHA1 */ char *redisGitSHA1(void);