#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
-/* Disk store cache object->storage values */
-#define REDIS_DS_MEMORY 0 /* The object is on memory */
-#define REDIS_DS_DIRTY 1 /* The object was modified */
-#define REDIS_DS_SAVING 2 /* There is an IO Job created for this obj. */
+/* Scheduled IO operations flags. */
+#define REDIS_IO_LOAD 1
+#define REDIS_IO_SAVE 2
+#define REDIS_IO_LOADINPROG 4
+#define REDIS_IO_SAVEINPROG 8
#define REDIS_MAX_COMPLETED_JOBS_PROCESSED 1
+#define REDIS_THREAD_STACK_SIZE (1024*1024*4)
/* Client flags */
#define REDIS_SLAVE 1 /* This client is a slave server */
#define REDIS_LRU_CLOCK_RESOLUTION 10 /* LRU clock resolution in seconds */
typedef struct redisObject {
unsigned type:4;
- unsigned storage:2; /* REDIS_VM_MEMORY or REDIS_VM_SWAPPING */
+ unsigned notused:2; /* Not used */
unsigned encoding:4;
unsigned lru:22; /* lru time (relative to server.lruclock) */
int refcount;
_var.type = REDIS_STRING; \
_var.encoding = REDIS_ENCODING_RAW; \
_var.ptr = _ptr; \
- _var.storage = REDIS_DS_MEMORY; \
} while(0);
typedef struct redisDb {
dict *dict; /* The keyspace for this DB */
dict *expires; /* Timeout of keys with a timeout set */
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */
- dict *io_keys; /* Keys with clients waiting for VM I/O */
+ dict *io_keys; /* Keys with clients waiting for DS I/O */
+ dict *io_negcache; /* Negative caching for disk store */
+ dict *io_queued; /* Queued IO operations hash table */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
int id;
} redisDb;
/* Blocked clients */
unsigned int bpop_blocked_clients;
unsigned int cache_blocked_clients;
- list *unblocked_clients;
+ list *unblocked_clients; /* list of clients to unblock before next loop */
+ list *cache_io_queue; /* IO operations queue */
+ int cache_flush_delay; /* seconds to wait before flushing keys */
/* Sort parameters - qsort_r() is only available under BSD so we
* have to take this state global, in order to pass it to sortCompare() */
int sort_desc;
list *io_processed; /* List of VM I/O jobs already processed */
list *io_ready_clients; /* Clients ready to be unblocked. All keys loaded */
pthread_mutex_t io_mutex; /* lock to access io_jobs/io_done/io_thread_job */
- pthread_mutex_t io_swapfile_mutex; /* So we can lseek + write */
+ pthread_cond_t io_condvar; /* I/O threads conditional variable */
pthread_attr_t io_threads_attr; /* attributes for threads creation */
int io_active_threads; /* Number of running I/O threads */
int vm_max_threads; /* Max number of I/O threads running at the same time */
zskiplist *zsl;
} zset;
-/* VM threaded I/O request message */
-#define REDIS_IOJOB_LOAD 0 /* Load from disk to memory */
-#define REDIS_IOJOB_PREPARE_SWAP 1 /* Compute needed pages */
-#define REDIS_IOJOB_DO_SWAP 2 /* Swap from memory to disk */
+/* Disk store threaded I/O request message */
+#define REDIS_IOJOB_LOAD 0
+#define REDIS_IOJOB_SAVE 1
+
typedef struct iojob {
int type; /* Request type, REDIS_IOJOB_* */
redisDb *db;/* Redis database */
- robj *key; /* This I/O request is about swapping this key */
- robj *id; /* Unique identifier of this job:
- this is the object to swap for REDIS_IOREQ_*_SWAP, or the
- vmpointer objct for REDIS_IOREQ_LOAD. */
- robj *val; /* the value to swap for REDIS_IOREQ_*_SWAP, otherwise this
- * field is populated by the I/O thread for REDIS_IOREQ_LOAD. */
- off_t page; /* Swap page where to read/write the object */
- off_t pages; /* Swap pages needed to save object. PREPARE_SWAP return val */
- int canceled; /* True if this command was canceled by blocking side of VM */
- pthread_t thread; /* ID of the thread processing this entry */
+ robj *key; /* This I/O request is about this key */
+ robj *val; /* The value to save for REDIS_IOJOB_SAVE, otherwise this
+ * field is populated by the I/O thread for REDIS_IOJOB_LOAD. */
+ time_t expire; /* Expire time for this key on REDIS_IOJOB_LOAD */
} iojob;
+/* IO operations scheduled -- check dscache.c for more info */
+typedef struct ioop {
+ int type; /* Operation type; presumably REDIS_IO_LOAD or REDIS_IO_SAVE -- TODO confirm in dscache.c */
+ redisDb *db; /* Database the scheduled operation refers to */
+ robj *key; /* Key the scheduled operation refers to */
+ time_t ctime; /* This is the creation time of the entry. */
+} ioop;
+
/* Structure to hold list iteration abstraction. */
typedef struct {
robj *subject;
off_t rdbSavedObjectPages(robj *o);
robj *rdbLoadObject(int type, FILE *fp);
void backgroundSaveDoneHandler(int statloc);
+int rdbSaveKeyValuePair(FILE *fp, redisDb *db, robj *key, robj *val, time_t now);
+int rdbLoadType(FILE *fp);
+time_t rdbLoadTime(FILE *fp);
+robj *rdbLoadStringObject(FILE *fp);
/* AOF persistence */
void flushAppendOnlyFile(void);
int dsOpen(void);
int dsClose(void);
int dsSet(redisDb *db, robj *key, robj *val);
-robj *dsGet(redisDb *db, robj *key);
+robj *dsGet(redisDb *db, robj *key, time_t *expire);
+int dsDel(redisDb *db, robj *key);
int dsExists(redisDb *db, robj *key);
+void dsFlushDb(int dbid);
/* Disk Store Cache */
-void vmInit(void);
-void vmMarkPagesFree(off_t page, off_t count);
-robj *vmLoadObject(robj *o);
-robj *vmPreviewObject(robj *o);
-int vmSwapOneObjectBlocking(void);
-int vmSwapOneObjectThreaded(void);
-int vmCanSwapOut(void);
+void dsInit(void);
void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, int mask);
-void vmCancelThreadedIOJob(robj *o);
void lockThreadedIO(void);
void unlockThreadedIO(void);
-int vmSwapObjectThreaded(robj *key, robj *val, redisDb *db);
void freeIOJob(iojob *j);
void queueIOJob(iojob *j);
-int vmWriteObjectOnSwap(robj *o, off_t page);
-robj *vmReadObjectFromSwap(off_t page, int type);
void waitEmptyIOJobsQueue(void);
-void vmReopenSwapFile(void);
-int vmFreePage(off_t page);
+void processAllPendingIOJobs(void);
void zunionInterBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv);
void execBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv);
int blockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd);
int dontWaitForSwappedKey(redisClient *c, robj *key);
void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key);
-vmpointer *vmSwapObjectBlocking(robj *val);
+int cacheFreeOneEntry(void);
+void cacheScheduleIOAddFlag(redisDb *db, robj *key, long flag);
+void cacheScheduleIODelFlag(redisDb *db, robj *key, long flag);
+int cacheScheduleIOGetFlags(redisDb *db, robj *key);
+void cacheScheduleIO(redisDb *db, robj *key, int type);
+void cacheCron(void);
+int cacheKeyMayExist(redisDb *db, robj *key);
+void cacheSetKeyMayExist(redisDb *db, robj *key);
+void cacheSetKeyDoesNotExist(redisDb *db, robj *key);
/* Set data type */
robj *setTypeCreate(robj *value);
int dbDelete(redisDb *db, robj *key);
long long emptyDb();
int selectDb(redisClient *c, int id);
+void signalModifiedKey(redisDb *db, robj *key);
+void signalFlushedDb(int dbid);
/* Git SHA1 */
char *redisGitSHA1(void);