X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/f34a6cd85e3327bf4fbe94ba46d6e7fefd7d61c7..d158dc28f6617fd517172e04d3f46ce53d88013e:/src/redis.h diff --git a/src/redis.h b/src/redis.h index e12b1c18..b15bb370 100644 --- a/src/redis.h +++ b/src/redis.h @@ -119,10 +119,11 @@ #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */ #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */ -/* Disk store cache object->storage values */ -#define REDIS_DS_MEMORY 0 /* The object is on memory */ -#define REDIS_DS_DIRTY 1 /* The object was modified */ -#define REDIS_DS_SAVING 2 /* There is an IO Job created for this obj. */ +/* Scheduled IO opeations flags. */ +#define REDIS_IO_LOAD 1 +#define REDIS_IO_SAVE 2 +#define REDIS_IO_LOADINPROG 4 +#define REDIS_IO_SAVEINPROG 8 #define REDIS_MAX_COMPLETED_JOBS_PROCESSED 1 #define REDIS_THREAD_STACK_SIZE (1024*1024*4) @@ -220,7 +221,7 @@ void _redisPanic(char *msg, char *file, int line); #define REDIS_LRU_CLOCK_RESOLUTION 10 /* LRU clock resolution in seconds */ typedef struct redisObject { unsigned type:4; - unsigned storage:2; /* REDIS_VM_MEMORY or REDIS_VM_SWAPPING */ + unsigned notused:2; /* Not used */ unsigned encoding:4; unsigned lru:22; /* lru time (relative to server.lruclock) */ int refcount; @@ -261,14 +262,15 @@ typedef struct vmPointer { _var.type = REDIS_STRING; \ _var.encoding = REDIS_ENCODING_RAW; \ _var.ptr = _ptr; \ - _var.storage = REDIS_DS_MEMORY; \ } while(0); typedef struct redisDb { dict *dict; /* The keyspace for this DB */ dict *expires; /* Timeout of keys with a timeout set */ dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */ - dict *io_keys; /* Keys with clients waiting for VM I/O */ + dict *io_keys; /* Keys with clients waiting for DS I/O */ + dict *io_negcache; /* Negative caching for disk store */ + dict *io_queued; /* Queued IO operations hash table */ dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */ int id; } redisDb; @@ -431,7 +433,9 @@ struct redisServer { /* Blocked clients */ unsigned int bpop_blocked_clients; unsigned int cache_blocked_clients; - list *unblocked_clients; + list *unblocked_clients; /* list of clients to unblock before next loop */ + list *cache_io_queue; /* IO operations queue */ + int cache_flush_delay; /* seconds to wait before flushing keys */ /* Sort parameters - qsort_r() is only available under BSD so we * have to take this state global, in order to pass it to sortCompare() */ int sort_desc; @@ -457,7 +461,7 @@ struct redisServer { list *io_processed; /* List of VM I/O jobs already processed */ list *io_ready_clients; /* Clients ready to be unblocked. All keys loaded */ pthread_mutex_t io_mutex; /* lock to access io_jobs/io_done/io_thread_job */ - pthread_mutex_t io_swapfile_mutex; /* So we can lseek + write */ + pthread_cond_t io_condvar; /* I/O threads conditional variable */ pthread_attr_t io_threads_attr; /* attributes for threads creation */ int io_active_threads; /* Number of running I/O threads */ int vm_max_threads; /* Max number of I/O threads running at the same time */ @@ -542,7 +546,7 @@ typedef struct zset { zskiplist *zsl; } zset; -/* VM threaded I/O request message */ +/* DIsk store threaded I/O request message */ #define REDIS_IOJOB_LOAD 0 #define REDIS_IOJOB_SAVE 1 @@ -552,8 +556,17 @@ typedef struct iojob { robj *key; /* This I/O request is about this key */ robj *val; /* the value to swap for REDIS_IOJOB_SAVE, otherwise this * field is populated by the I/O thread for REDIS_IOJOB_LOAD. */ + time_t expire; /* Expire time for this key on REDIS_IOJOB_LOAD */ } iojob; +/* IO operations scheduled -- check dscache.c for more info */ +typedef struct ioop { + int type; + redisDb *db; + robj *key; + time_t ctime; /* This is the creation time of the entry. */ +} ioop; + /* Structure to hold list iteration abstraction. */ typedef struct { robj *subject; @@ -736,6 +749,10 @@ off_t rdbSavedObjectLen(robj *o); off_t rdbSavedObjectPages(robj *o); robj *rdbLoadObject(int type, FILE *fp); void backgroundSaveDoneHandler(int statloc); +int rdbSaveKeyValuePair(FILE *fp, redisDb *db, robj *key, robj *val, time_t now); +int rdbLoadType(FILE *fp); +time_t rdbLoadTime(FILE *fp); +robj *rdbLoadStringObject(FILE *fp); /* AOF persistence */ void flushAppendOnlyFile(void); @@ -771,35 +788,34 @@ void populateCommandTable(void); int dsOpen(void); int dsClose(void); int dsSet(redisDb *db, robj *key, robj *val); -robj *dsGet(redisDb *db, robj *key); +robj *dsGet(redisDb *db, robj *key, time_t *expire); +int dsDel(redisDb *db, robj *key); int dsExists(redisDb *db, robj *key); +void dsFlushDb(int dbid); /* Disk Store Cache */ -void vmInit(void); -void vmMarkPagesFree(off_t page, off_t count); -robj *vmLoadObject(robj *o); -robj *vmPreviewObject(robj *o); -int vmSwapOneObjectBlocking(void); -int vmSwapOneObjectThreaded(void); -int vmCanSwapOut(void); +void dsInit(void); void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, int mask); -void vmCancelThreadedIOJob(robj *o); void lockThreadedIO(void); void unlockThreadedIO(void); -int vmSwapObjectThreaded(robj *key, robj *val, redisDb *db); void freeIOJob(iojob *j); void queueIOJob(iojob *j); -int vmWriteObjectOnSwap(robj *o, off_t page); -robj *vmReadObjectFromSwap(off_t page, int type); void waitEmptyIOJobsQueue(void); -void vmReopenSwapFile(void); -int vmFreePage(off_t page); +void processAllPendingIOJobs(void); void zunionInterBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv); void execBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv); int blockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd); int dontWaitForSwappedKey(redisClient *c, robj *key); void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key); -vmpointer *vmSwapObjectBlocking(robj *val); +int cacheFreeOneEntry(void); +void cacheScheduleIOAddFlag(redisDb *db, robj *key, long flag); +void cacheScheduleIODelFlag(redisDb *db, robj *key, long flag); +int cacheScheduleIOGetFlags(redisDb *db, robj *key); +void cacheScheduleIO(redisDb *db, robj *key, int type); +void cacheCron(void); +int cacheKeyMayExist(redisDb *db, robj *key); +void cacheSetKeyMayExist(redisDb *db, robj *key); +void cacheSetKeyDoesNotExist(redisDb *db, robj *key); /* Set data type */ robj *setTypeCreate(robj *value); @@ -870,6 +886,8 @@ robj *dbRandomKey(redisDb *db); int dbDelete(redisDb *db, robj *key); long long emptyDb(); int selectDb(redisClient *c, int id); +void signalModifiedKey(redisDb *db, robj *key); +void signalFlushedDb(int dbid); /* Git SHA1 */ char *redisGitSHA1(void);