X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/f34a6cd85e3327bf4fbe94ba46d6e7fefd7d61c7..4c5f0966b2e582981d9fdaf3b511c6cf4ac4d4d5:/src/redis.h diff --git a/src/redis.h b/src/redis.h index e12b1c18..78294d0d 100644 --- a/src/redis.h +++ b/src/redis.h @@ -50,11 +50,6 @@ #define REDIS_REPLY_CHUNK_BYTES (5*1500) /* 5 TCP packets with default MTU */ #define REDIS_MAX_LOGMSG_LEN 1024 /* Default maximum length of syslog messages */ -/* If more then REDIS_WRITEV_THRESHOLD write packets are pending use writev */ -#define REDIS_WRITEV_THRESHOLD 3 -/* Max number of iovecs used for each writev call */ -#define REDIS_WRITEV_IOVEC_COUNT 256 - /* Hash table parameters */ #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */ @@ -75,6 +70,10 @@ #define REDIS_ZSET 3 #define REDIS_HASH 4 #define REDIS_VMPOINTER 8 +/* Object types only used for persistence in .rdb files */ +#define REDIS_HASH_ZIPMAP 9 +#define REDIS_LIST_ZIPLIST 10 +#define REDIS_SET_INTSET 11 /* Objects encoding. Some kind of objects like Strings and Hashes can be * internally represented in multiple ways. The 'encoding' field of the object @@ -119,10 +118,15 @@ #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */ #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */ -/* Disk store cache object->storage values */ -#define REDIS_DS_MEMORY 0 /* The object is on memory */ -#define REDIS_DS_DIRTY 1 /* The object was modified */ -#define REDIS_DS_SAVING 2 /* There is an IO Job created for this obj. */ +/* Scheduled IO opeations flags. */ +#define REDIS_IO_LOAD 1 +#define REDIS_IO_SAVE 2 +#define REDIS_IO_LOADINPROG 4 +#define REDIS_IO_SAVEINPROG 8 + +/* Generic IO flags */ +#define REDIS_IO_ONLYLOADS 1 +#define REDIS_IO_ASAP 2 #define REDIS_MAX_COMPLETED_JOBS_PROCESSED 1 #define REDIS_THREAD_STACK_SIZE (1024*1024*4) @@ -136,6 +140,8 @@ #define REDIS_IO_WAIT 32 /* The client is waiting for Virtual Memory I/O */ #define REDIS_DIRTY_CAS 64 /* Watched keys modified. EXEC will fail. */ #define REDIS_CLOSE_AFTER_REPLY 128 /* Close after writing entire reply. */ +#define REDIS_UNBLOCKED 256 /* This client was unblocked and is stored in + server.unblocked_clients */ /* Client request types */ #define REDIS_REQ_INLINE 1 @@ -184,8 +190,8 @@ #define APPENDFSYNC_EVERYSEC 2 /* Zip structure related defaults */ -#define REDIS_HASH_MAX_ZIPMAP_ENTRIES 64 -#define REDIS_HASH_MAX_ZIPMAP_VALUE 512 +#define REDIS_HASH_MAX_ZIPMAP_ENTRIES 512 +#define REDIS_HASH_MAX_ZIPMAP_VALUE 64 #define REDIS_LIST_MAX_ZIPLIST_ENTRIES 512 #define REDIS_LIST_MAX_ZIPLIST_VALUE 64 #define REDIS_SET_MAX_INTSET_ENTRIES 512 @@ -203,6 +209,12 @@ #define REDIS_MAXMEMORY_ALLKEYS_RANDOM 4 #define REDIS_MAXMEMORY_NO_EVICTION 5 +/* Diskstore background saving thread states */ +#define REDIS_BGSAVE_THREAD_UNACTIVE 0 +#define REDIS_BGSAVE_THREAD_ACTIVE 1 +#define REDIS_BGSAVE_THREAD_DONE_OK 2 +#define REDIS_BGSAVE_THREAD_DONE_ERR 3 + /* We can print the stacktrace, so our assert is defined this way: */ #define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e,__FILE__,__LINE__),_exit(1))) #define redisPanic(_e) _redisPanic(#_e,__FILE__,__LINE__),_exit(1) @@ -220,7 +232,7 @@ void _redisPanic(char *msg, char *file, int line); #define REDIS_LRU_CLOCK_RESOLUTION 10 /* LRU clock resolution in seconds */ typedef struct redisObject { unsigned type:4; - unsigned storage:2; /* REDIS_VM_MEMORY or REDIS_VM_SWAPPING */ + unsigned notused:2; /* Not used */ unsigned encoding:4; unsigned lru:22; /* lru time (relative to server.lruclock) */ int refcount; @@ -261,14 +273,15 @@ typedef struct vmPointer { _var.type = REDIS_STRING; \ _var.encoding = REDIS_ENCODING_RAW; \ _var.ptr = _ptr; \ - _var.storage = REDIS_DS_MEMORY; \ } while(0); typedef struct redisDb { dict *dict; /* The keyspace for this DB */ dict *expires; /* Timeout of keys with a timeout set */ dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */ - dict *io_keys; /* Keys with clients waiting for VM I/O */ + dict *io_keys; /* Keys with clients waiting for DS I/O */ + dict *io_negcache; /* Negative caching for disk store */ + dict *io_queued; /* Queued IO operations hash table */ dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */ int id; } redisDb; @@ -349,17 +362,20 @@ struct sharedObjectsStruct { /* Global server state structure */ struct redisServer { + /* General */ pthread_t mainthread; + redisDb *db; + dict *commands; /* Command table hahs table */ + aeEventLoop *el; + /* Networking */ int port; char *bindaddr; char *unixsocket; int ipfd; int sofd; - redisDb *db; - long long dirty; /* changes to DB from the last save */ - long long dirty_before_bgsave; /* used to restore dirty on failed BGSAVE */ list *clients; - dict *commands; /* Command table hahs table */ + list *slaves, *monitors; + char neterr[ANET_ERR_LEN]; /* RDB / AOF loading information */ int loading; off_t loading_total_bytes; @@ -367,9 +383,6 @@ struct redisServer { time_t loading_start_time; /* Fast pointers to often looked up command */ struct redisCommand *delCommand, *multiCommand; - list *slaves, *monitors; - char neterr[ANET_ERR_LEN]; - aeEventLoop *el; int cronloops; /* number of times the cron function run */ time_t lastsave; /* Unix time of last save succeeede */ /* Fields used only for stats */ @@ -382,7 +395,6 @@ struct redisServer { long long stat_keyspace_misses; /* number of failed lookups of keys */ /* Configuration */ int verbosity; - int glueoutputbuf; int maxidletime; int dbnum; int daemonize; @@ -390,25 +402,32 @@ struct redisServer { int appendfsync; int no_appendfsync_on_rewrite; int shutdown_asap; + int activerehashing; + char *requirepass; + /* Persistence */ + long long dirty; /* changes to DB from the last save */ + long long dirty_before_bgsave; /* used to restore dirty on failed BGSAVE */ time_t lastfsync; int appendfd; int appendseldb; char *pidfile; pid_t bgsavechildpid; pid_t bgrewritechildpid; + int bgsavethread_state; + pthread_mutex_t bgsavethread_mutex; + pthread_t bgsavethread; sds bgrewritebuf; /* buffer taken by parent during oppend only rewrite */ sds aofbuf; /* AOF buffer, written before entering the event loop */ struct saveparam *saveparams; int saveparamslen; + char *dbfilename; + int rdbcompression; + char *appendfilename; + /* Logging */ char *logfile; int syslog_enabled; char *syslog_ident; int syslog_facility; - char *dbfilename; - char *appendfilename; - char *requirepass; - int rdbcompression; - int activerehashing; /* Replication related */ int isslave; /* Slave specific fields */ @@ -431,7 +450,9 @@ struct redisServer { /* Blocked clients */ unsigned int bpop_blocked_clients; unsigned int cache_blocked_clients; - list *unblocked_clients; + list *unblocked_clients; /* list of clients to unblock before next loop */ + list *cache_io_queue; /* IO operations queue */ + int cache_flush_delay; /* seconds to wait before flushing keys */ /* Sort parameters - qsort_r() is only available under BSD so we * have to take this state global, in order to pass it to sortCompare() */ int sort_desc; @@ -457,7 +478,7 @@ struct redisServer { list *io_processed; /* List of VM I/O jobs already processed */ list *io_ready_clients; /* Clients ready to be unblocked. All keys loaded */ pthread_mutex_t io_mutex; /* lock to access io_jobs/io_done/io_thread_job */ - pthread_mutex_t io_swapfile_mutex; /* So we can lseek + write */ + pthread_cond_t io_condvar; /* I/O threads conditional variable */ pthread_attr_t io_threads_attr; /* attributes for threads creation */ int io_active_threads; /* Number of running I/O threads */ int vm_max_threads; /* Max number of I/O threads running at the same time */ @@ -500,6 +521,7 @@ struct redisCommand { int vm_firstkey; /* The first argument that's a key (0 = no keys) */ int vm_lastkey; /* THe last argument that's a key */ int vm_keystep; /* The step between first and last key */ + long long microseconds, calls; }; struct redisFunctionSym { @@ -542,7 +564,7 @@ typedef struct zset { zskiplist *zsl; } zset; -/* VM threaded I/O request message */ +/* DIsk store threaded I/O request message */ #define REDIS_IOJOB_LOAD 0 #define REDIS_IOJOB_SAVE 1 @@ -552,8 +574,17 @@ typedef struct iojob { robj *key; /* This I/O request is about this key */ robj *val; /* the value to swap for REDIS_IOJOB_SAVE, otherwise this * field is populated by the I/O thread for REDIS_IOJOB_LOAD. */ + time_t expire; /* Expire time for this key on REDIS_IOJOB_LOAD */ } iojob; +/* IO operations scheduled -- check dscache.c for more info */ +typedef struct ioop { + int type; + redisDb *db; + robj *key; + time_t ctime; /* This is the creation time of the entry. */ +} ioop; + /* Structure to hold list iteration abstraction. */ typedef struct { robj *subject; @@ -610,13 +641,15 @@ dictType hashDictType; * Functions prototypes *----------------------------------------------------------------------------*/ +/* Utils */ +long long ustime(void); + /* networking.c -- Networking and Client related operations */ redisClient *createClient(int fd); void closeTimedoutClients(void); void freeClient(redisClient *c); void resetClient(redisClient *c); void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask); -void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask); void addReply(redisClient *c, robj *obj); void *addDeferredMultiBulkLength(redisClient *c); void setDeferredMultiBulkLength(redisClient *c, void *node, long length); @@ -638,6 +671,8 @@ void addReplyDouble(redisClient *c, double d); void addReplyLongLong(redisClient *c, long long ll); void addReplyMultiBulkLen(redisClient *c, long length); void *dupClientReplyValue(void *o); +void getClientsMaxBuffers(unsigned long *longest_output_list, + unsigned long *biggest_input_buffer); #ifdef __GNUC__ void addReplyErrorFormat(redisClient *c, const char *fmt, ...) @@ -695,6 +730,7 @@ robj *createSetObject(void); robj *createIntsetObject(void); robj *createHashObject(void); robj *createZsetObject(void); +robj *createZsetZiplistObject(void); int getLongFromObjectOrReply(redisClient *c, robj *o, long *target, const char *msg); int checkType(redisClient *c, robj *o, int type); int getLongLongFromObjectOrReply(redisClient *c, robj *o, long long *target, const char *msg); @@ -735,7 +771,13 @@ int rdbSaveObject(FILE *fp, robj *o); off_t rdbSavedObjectLen(robj *o); off_t rdbSavedObjectPages(robj *o); robj *rdbLoadObject(int type, FILE *fp); -void backgroundSaveDoneHandler(int statloc); +void backgroundSaveDoneHandler(int exitcode, int bysignal); +int rdbSaveKeyValuePair(FILE *fp, robj *key, robj *val, time_t expireitme, time_t now); +int rdbLoadType(FILE *fp); +time_t rdbLoadTime(FILE *fp); +robj *rdbLoadStringObject(FILE *fp); +int rdbSaveType(FILE *fp, unsigned char type); +int rdbSaveLen(FILE *fp, uint32_t len); /* AOF persistence */ void flushAppendOnlyFile(void); @@ -745,7 +787,7 @@ int rewriteAppendOnlyFileBackground(void); int loadAppendOnlyFile(char *filename); void stopAppendOnly(void); int startAppendOnly(void); -void backgroundRewriteDoneHandler(int statloc); +void backgroundRewriteDoneHandler(int exitcode, int bysignal); /* Sorted sets data type */ zskiplist *zslCreate(void); @@ -755,7 +797,7 @@ zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj); /* Core functions */ void freeMemoryIfNeeded(void); int processCommand(redisClient *c); -void setupSigSegvAction(void); +void setupSignalHandlers(void); struct redisCommand *lookupCommand(sds name); struct redisCommand *lookupCommandByCString(char *s); void call(redisClient *c, struct redisCommand *cmd); @@ -766,40 +808,43 @@ void updateDictResizePolicy(void); int htNeedsResize(dict *dict); void oom(const char *msg); void populateCommandTable(void); +void resetCommandTableStats(void); /* Disk store */ int dsOpen(void); int dsClose(void); -int dsSet(redisDb *db, robj *key, robj *val); -robj *dsGet(redisDb *db, robj *key); +int dsSet(redisDb *db, robj *key, robj *val, time_t expire); +robj *dsGet(redisDb *db, robj *key, time_t *expire); +int dsDel(redisDb *db, robj *key); int dsExists(redisDb *db, robj *key); +void dsFlushDb(int dbid); +int dsRdbSaveBackground(char *filename); +int dsRdbSave(char *filename); /* Disk Store Cache */ -void vmInit(void); -void vmMarkPagesFree(off_t page, off_t count); -robj *vmLoadObject(robj *o); -robj *vmPreviewObject(robj *o); -int vmSwapOneObjectBlocking(void); -int vmSwapOneObjectThreaded(void); -int vmCanSwapOut(void); +void dsInit(void); void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, int mask); -void vmCancelThreadedIOJob(robj *o); void lockThreadedIO(void); void unlockThreadedIO(void); -int vmSwapObjectThreaded(robj *key, robj *val, redisDb *db); void freeIOJob(iojob *j); void queueIOJob(iojob *j); -int vmWriteObjectOnSwap(robj *o, off_t page); -robj *vmReadObjectFromSwap(off_t page, int type); void waitEmptyIOJobsQueue(void); -void vmReopenSwapFile(void); -int vmFreePage(off_t page); +void processAllPendingIOJobs(void); void zunionInterBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv); void execBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv); int blockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd); int dontWaitForSwappedKey(redisClient *c, robj *key); void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key); -vmpointer *vmSwapObjectBlocking(robj *val); +int cacheFreeOneEntry(void); +void cacheScheduleIOAddFlag(redisDb *db, robj *key, long flag); +void cacheScheduleIODelFlag(redisDb *db, robj *key, long flag); +int cacheScheduleIOGetFlags(redisDb *db, robj *key); +void cacheScheduleIO(redisDb *db, robj *key, int type); +void cacheCron(void); +int cacheKeyMayExist(redisDb *db, robj *key); +void cacheSetKeyMayExist(redisDb *db, robj *key); +void cacheSetKeyDoesNotExist(redisDb *db, robj *key); +void cacheForcePointInTime(void); /* Set data type */ robj *setTypeCreate(robj *value); @@ -843,6 +888,7 @@ int stringmatchlen(const char *pattern, int patternLen, int stringmatch(const char *pattern, const char *string, int nocase); long long memtoll(const char *p, int *err); int ll2string(char *s, size_t len, long long value); +int d2string(char *s, size_t len, double value); int isStringRepresentableAsLong(sds s, long *longval); int isStringRepresentableAsLongLong(sds s, long long *longval); int isObjectRepresentableAsLongLong(robj *o, long long *llongval); @@ -870,6 +916,8 @@ robj *dbRandomKey(redisDb *db); int dbDelete(redisDb *db, robj *key); long long emptyDb(); int selectDb(redisClient *c, int id); +void signalModifiedKey(redisDb *db, robj *key); +void signalFlushedDb(int dbid); /* Git SHA1 */ char *redisGitSHA1(void);