#define REDIS_REPLY_CHUNK_BYTES (5*1500) /* 5 TCP packets with default MTU */
#define REDIS_MAX_LOGMSG_LEN 1024 /* Default maximum length of syslog messages */
-/* If more then REDIS_WRITEV_THRESHOLD write packets are pending use writev */
-#define REDIS_WRITEV_THRESHOLD 3
-/* Max number of iovecs used for each writev call */
-#define REDIS_WRITEV_IOVEC_COUNT 256
-
/* Hash table parameters */
#define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
-/* Disk store cache object->storage values */
-#define REDIS_DS_MEMORY 0 /* The object is on memory */
-#define REDIS_DS_DIRTY 1 /* The object was modified */
-#define REDIS_DS_SAVING 2 /* There is an IO Job created for this obj. */
+/* Scheduled IO operations flags (distinct bits, so they can be combined). */
+#define REDIS_IO_LOAD 1
+#define REDIS_IO_SAVE 2
+#define REDIS_IO_LOADINPROG 4
+#define REDIS_IO_SAVEINPROG 8
+
+/* Generic IO flags */
+#define REDIS_IO_ONLYLOADS 1
+#define REDIS_IO_ASAP 2
#define REDIS_MAX_COMPLETED_JOBS_PROCESSED 1
#define REDIS_THREAD_STACK_SIZE (1024*1024*4)
#define REDIS_IO_WAIT 32 /* The client is waiting for Virtual Memory I/O */
#define REDIS_DIRTY_CAS 64 /* Watched keys modified. EXEC will fail. */
#define REDIS_CLOSE_AFTER_REPLY 128 /* Close after writing entire reply. */
+#define REDIS_UNBLOCKED 256 /* This client was unblocked and is stored in
+ server.unblocked_clients */
/* Client request types */
#define REDIS_REQ_INLINE 1
#define APPENDFSYNC_EVERYSEC 2
/* Zip structure related defaults */
-#define REDIS_HASH_MAX_ZIPMAP_ENTRIES 64
-#define REDIS_HASH_MAX_ZIPMAP_VALUE 512
+#define REDIS_HASH_MAX_ZIPMAP_ENTRIES 512
+#define REDIS_HASH_MAX_ZIPMAP_VALUE 64
#define REDIS_LIST_MAX_ZIPLIST_ENTRIES 512
#define REDIS_LIST_MAX_ZIPLIST_VALUE 64
#define REDIS_SET_MAX_INTSET_ENTRIES 512
#define REDIS_MAXMEMORY_ALLKEYS_RANDOM 4
#define REDIS_MAXMEMORY_NO_EVICTION 5
+/* Diskstore background saving thread states */
+#define REDIS_BGSAVE_THREAD_UNACTIVE 0
+#define REDIS_BGSAVE_THREAD_ACTIVE 1
+#define REDIS_BGSAVE_THREAD_DONE_OK 2
+#define REDIS_BGSAVE_THREAD_DONE_ERR 3
+
/* We can print the stacktrace, so our assert is defined this way: */
#define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e,__FILE__,__LINE__),_exit(1)))
#define redisPanic(_e) _redisPanic(#_e,__FILE__,__LINE__),_exit(1)
#define REDIS_LRU_CLOCK_RESOLUTION 10 /* LRU clock resolution in seconds */
typedef struct redisObject {
unsigned type:4;
- unsigned storage:2; /* REDIS_VM_MEMORY or REDIS_VM_SWAPPING */
+ unsigned notused:2; /* Not used */
unsigned encoding:4;
unsigned lru:22; /* lru time (relative to server.lruclock) */
int refcount;
_var.type = REDIS_STRING; \
_var.encoding = REDIS_ENCODING_RAW; \
_var.ptr = _ptr; \
- _var.storage = REDIS_DS_MEMORY; \
} while(0);
typedef struct redisDb {
dict *dict; /* The keyspace for this DB */
dict *expires; /* Timeout of keys with a timeout set */
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */
- dict *io_keys; /* Keys with clients waiting for VM I/O */
+ dict *io_keys; /* Keys with clients waiting for DS (disk store) I/O */
+ dict *io_negcache; /* Negative caching for disk store (NOTE(review): presumably keys known to be missing on disk -- confirm in dscache.c) */
+ dict *io_queued; /* Queued IO operations hash table (NOTE(review): presumably keyed by key name -- confirm in dscache.c) */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
int id;
} redisDb;
/* Global server state structure */
struct redisServer {
+ /* General */
pthread_t mainthread;
+ redisDb *db;
+ dict *commands; /* Command table hash table */
+ aeEventLoop *el;
+ /* Networking */
int port;
char *bindaddr;
char *unixsocket;
int ipfd;
int sofd;
- redisDb *db;
- long long dirty; /* changes to DB from the last save */
- long long dirty_before_bgsave; /* used to restore dirty on failed BGSAVE */
list *clients;
- dict *commands; /* Command table hahs table */
+ list *slaves, *monitors;
+ char neterr[ANET_ERR_LEN];
/* RDB / AOF loading information */
int loading;
off_t loading_total_bytes;
time_t loading_start_time;
/* Fast pointers to often looked up command */
struct redisCommand *delCommand, *multiCommand;
- list *slaves, *monitors;
- char neterr[ANET_ERR_LEN];
- aeEventLoop *el;
int cronloops; /* number of times the cron function run */
time_t lastsave; /* Unix time of last save succeeede */
/* Fields used only for stats */
long long stat_keyspace_misses; /* number of failed lookups of keys */
/* Configuration */
int verbosity;
- int glueoutputbuf;
int maxidletime;
int dbnum;
int daemonize;
int appendfsync;
int no_appendfsync_on_rewrite;
int shutdown_asap;
+ int activerehashing;
+ char *requirepass;
+ /* Persistence */
+ long long dirty; /* changes to DB from the last save */
+ long long dirty_before_bgsave; /* used to restore dirty on failed BGSAVE */
time_t lastfsync;
int appendfd;
int appendseldb;
char *pidfile;
pid_t bgsavechildpid;
pid_t bgrewritechildpid;
+ int bgsavethread_state; /* NOTE(review): presumably one of REDIS_BGSAVE_THREAD_* -- confirm */
+ pthread_mutex_t bgsavethread_mutex;
+ pthread_t bgsavethread;
sds bgrewritebuf; /* buffer taken by parent during oppend only rewrite */
sds aofbuf; /* AOF buffer, written before entering the event loop */
struct saveparam *saveparams;
int saveparamslen;
+ char *dbfilename;
+ int rdbcompression;
+ char *appendfilename;
+ /* Logging */
char *logfile;
int syslog_enabled;
char *syslog_ident;
int syslog_facility;
- char *dbfilename;
- char *appendfilename;
- char *requirepass;
- int rdbcompression;
- int activerehashing;
/* Replication related */
int isslave;
/* Slave specific fields */
unsigned int bpop_blocked_clients;
unsigned int cache_blocked_clients;
list *unblocked_clients; /* list of clients to unblock before next loop */
- list *cache_flush_queue; /* keys to flush on disk */
+ list *cache_io_queue; /* IO operations queue */
int cache_flush_delay; /* seconds to wait before flushing keys */
/* Sort parameters - qsort_r() is only available under BSD so we
* have to take this state global, in order to pass it to sortCompare() */
list *io_processed; /* List of VM I/O jobs already processed */
list *io_ready_clients; /* Clients ready to be unblocked. All keys loaded */
pthread_mutex_t io_mutex; /* lock to access io_jobs/io_done/io_thread_job */
- pthread_mutex_t io_swapfile_mutex; /* So we can lseek + write */
+ pthread_cond_t io_condvar; /* I/O threads condition variable */
pthread_attr_t io_threads_attr; /* attributes for threads creation */
int io_active_threads; /* Number of running I/O threads */
int vm_max_threads; /* Max number of I/O threads running at the same time */
int vm_firstkey; /* The first argument that's a key (0 = no keys) */
int vm_lastkey; /* THe last argument that's a key */
int vm_keystep; /* The step between first and last key */
+ long long microseconds, calls; /* NOTE(review): presumably per-command time and call counters -- confirm */
};
struct redisFunctionSym {
zskiplist *zsl;
} zset;
-/* VM threaded I/O request message */
+/* Disk store threaded I/O request message */
#define REDIS_IOJOB_LOAD 0
#define REDIS_IOJOB_SAVE 1
robj *key; /* This I/O request is about this key */
robj *val; /* the value to swap for REDIS_IOJOB_SAVE, otherwise this
* field is populated by the I/O thread for REDIS_IOJOB_LOAD. */
+ time_t expire; /* Expire time for this key on REDIS_IOJOB_LOAD */
} iojob;
-/* When diskstore is enabled and a flush operation is requested we push
- * one of this structures into server.cache_flush_queue. */
-typedef struct dirtykey {
+/* A scheduled IO operation -- check dscache.c for more info */
+typedef struct ioop {
+ int type; /* NOTE(review): presumably REDIS_IO_LOAD or REDIS_IO_SAVE -- confirm in dscache.c */
redisDb *db;
robj *key;
time_t ctime; /* This is the creation time of the entry. */
-} dirtykey;
+} ioop;
/* Structure to hold list iteration abstraction. */
typedef struct {
void freeClient(redisClient *c);
void resetClient(redisClient *c);
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask);
-void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask);
void addReply(redisClient *c, robj *obj);
void *addDeferredMultiBulkLength(redisClient *c);
void setDeferredMultiBulkLength(redisClient *c, void *node, long length);
void addReplyLongLong(redisClient *c, long long ll);
void addReplyMultiBulkLen(redisClient *c, long length);
void *dupClientReplyValue(void *o);
+void getClientsMaxBuffers(unsigned long *longest_output_list,
+ unsigned long *biggest_input_buffer);
#ifdef __GNUC__
void addReplyErrorFormat(redisClient *c, const char *fmt, ...)
off_t rdbSavedObjectLen(robj *o);
off_t rdbSavedObjectPages(robj *o);
robj *rdbLoadObject(int type, FILE *fp);
-void backgroundSaveDoneHandler(int statloc);
+void backgroundSaveDoneHandler(int exitcode, int bysignal);
+int rdbSaveKeyValuePair(FILE *fp, robj *key, robj *val, time_t expiretime, time_t now);
+int rdbLoadType(FILE *fp);
+time_t rdbLoadTime(FILE *fp);
+robj *rdbLoadStringObject(FILE *fp);
+int rdbSaveType(FILE *fp, unsigned char type);
+int rdbSaveLen(FILE *fp, uint32_t len);
/* AOF persistence */
void flushAppendOnlyFile(void);
int loadAppendOnlyFile(char *filename);
void stopAppendOnly(void);
int startAppendOnly(void);
-void backgroundRewriteDoneHandler(int statloc);
+void backgroundRewriteDoneHandler(int exitcode, int bysignal);
/* Sorted sets data type */
zskiplist *zslCreate(void);
int htNeedsResize(dict *dict);
void oom(const char *msg);
void populateCommandTable(void);
+void resetCommandTableStats(void);
/* Disk store */
int dsOpen(void);
int dsClose(void);
-int dsSet(redisDb *db, robj *key, robj *val);
-robj *dsGet(redisDb *db, robj *key);
+int dsSet(redisDb *db, robj *key, robj *val, time_t expire);
+robj *dsGet(redisDb *db, robj *key, time_t *expire);
int dsDel(redisDb *db, robj *key);
int dsExists(redisDb *db, robj *key);
-int dsFlushDb(int dbid);
+void dsFlushDb(int dbid);
+int dsRdbSaveBackground(char *filename);
+int dsRdbSave(char *filename);
/* Disk Store Cache */
void dsInit(void);
void freeIOJob(iojob *j);
void queueIOJob(iojob *j);
void waitEmptyIOJobsQueue(void);
+void processAllPendingIOJobs(void);
void zunionInterBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv);
void execBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv);
int blockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd);
int dontWaitForSwappedKey(redisClient *c, robj *key);
void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key);
int cacheFreeOneEntry(void);
+void cacheScheduleIOAddFlag(redisDb *db, robj *key, long flag);
+void cacheScheduleIODelFlag(redisDb *db, robj *key, long flag);
+int cacheScheduleIOGetFlags(redisDb *db, robj *key);
+void cacheScheduleIO(redisDb *db, robj *key, int type);
+void cacheCron(void);
+int cacheKeyMayExist(redisDb *db, robj *key);
+void cacheSetKeyMayExist(redisDb *db, robj *key);
+void cacheSetKeyDoesNotExist(redisDb *db, robj *key);
+void cacheForcePointInTime(void);
/* Set data type */
robj *setTypeCreate(robj *value);