X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/1fce3201145f256401bef7fde96a8c1509bcb4a7..8218db3dae220409f239a5a0adb47d106b178bf9:/src/redis.h diff --git a/src/redis.h b/src/redis.h index 054ee930..78294d0d 100644 --- a/src/redis.h +++ b/src/redis.h @@ -50,11 +50,6 @@ #define REDIS_REPLY_CHUNK_BYTES (5*1500) /* 5 TCP packets with default MTU */ #define REDIS_MAX_LOGMSG_LEN 1024 /* Default maximum length of syslog messages */ -/* If more then REDIS_WRITEV_THRESHOLD write packets are pending use writev */ -#define REDIS_WRITEV_THRESHOLD 3 -/* Max number of iovecs used for each writev call */ -#define REDIS_WRITEV_IOVEC_COUNT 256 - /* Hash table parameters */ #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */ @@ -75,6 +70,10 @@ #define REDIS_ZSET 3 #define REDIS_HASH 4 #define REDIS_VMPOINTER 8 +/* Object types only used for persistence in .rdb files */ +#define REDIS_HASH_ZIPMAP 9 +#define REDIS_LIST_ZIPLIST 10 +#define REDIS_SET_INTSET 11 /* Objects encoding. Some kind of objects like Strings and Hashes can be * internally represented in multiple ways. The 'encoding' field of the object @@ -119,10 +118,15 @@ #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */ #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */ -/* Disk store cache object->storage values */ -#define REDIS_DS_MEMORY 0 /* The object is on memory */ -#define REDIS_DS_DIRTY 1 /* The object was modified */ -#define REDIS_DS_SAVING 2 /* There is an IO Job created for this obj. */ +/* Scheduled IO operations flags. */ +#define REDIS_IO_LOAD 1 +#define REDIS_IO_SAVE 2 +#define REDIS_IO_LOADINPROG 4 +#define REDIS_IO_SAVEINPROG 8 + +/* Generic IO flags */ +#define REDIS_IO_ONLYLOADS 1 +#define REDIS_IO_ASAP 2 #define REDIS_MAX_COMPLETED_JOBS_PROCESSED 1 #define REDIS_THREAD_STACK_SIZE (1024*1024*4) @@ -136,6 +140,8 @@ #define REDIS_IO_WAIT 32 /* The client is waiting for Virtual Memory I/O */ #define REDIS_DIRTY_CAS 64 /* Watched keys modified. EXEC will fail.
*/ #define REDIS_CLOSE_AFTER_REPLY 128 /* Close after writing entire reply. */ +#define REDIS_UNBLOCKED 256 /* This client was unblocked and is stored in + server.unblocked_clients */ /* Client request types */ #define REDIS_REQ_INLINE 1 @@ -184,8 +190,8 @@ #define APPENDFSYNC_EVERYSEC 2 /* Zip structure related defaults */ -#define REDIS_HASH_MAX_ZIPMAP_ENTRIES 64 -#define REDIS_HASH_MAX_ZIPMAP_VALUE 512 +#define REDIS_HASH_MAX_ZIPMAP_ENTRIES 512 +#define REDIS_HASH_MAX_ZIPMAP_VALUE 64 #define REDIS_LIST_MAX_ZIPLIST_ENTRIES 512 #define REDIS_LIST_MAX_ZIPLIST_VALUE 64 #define REDIS_SET_MAX_INTSET_ENTRIES 512 @@ -203,6 +209,12 @@ #define REDIS_MAXMEMORY_ALLKEYS_RANDOM 4 #define REDIS_MAXMEMORY_NO_EVICTION 5 +/* Diskstore background saving thread states */ +#define REDIS_BGSAVE_THREAD_UNACTIVE 0 +#define REDIS_BGSAVE_THREAD_ACTIVE 1 +#define REDIS_BGSAVE_THREAD_DONE_OK 2 +#define REDIS_BGSAVE_THREAD_DONE_ERR 3 + /* We can print the stacktrace, so our assert is defined this way: */ #define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e,__FILE__,__LINE__),_exit(1))) #define redisPanic(_e) _redisPanic(#_e,__FILE__,__LINE__),_exit(1) @@ -220,7 +232,7 @@ void _redisPanic(char *msg, char *file, int line); #define REDIS_LRU_CLOCK_RESOLUTION 10 /* LRU clock resolution in seconds */ typedef struct redisObject { unsigned type:4; - unsigned storage:2; /* REDIS_VM_MEMORY or REDIS_VM_SWAPPING */ + unsigned notused:2; /* Not used */ unsigned encoding:4; unsigned lru:22; /* lru time (relative to server.lruclock) */ int refcount; @@ -261,14 +273,15 @@ typedef struct vmPointer { _var.type = REDIS_STRING; \ _var.encoding = REDIS_ENCODING_RAW; \ _var.ptr = _ptr; \ - _var.storage = REDIS_DS_MEMORY; \ } while(0); typedef struct redisDb { dict *dict; /* The keyspace for this DB */ dict *expires; /* Timeout of keys with a timeout set */ dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */ - dict *io_keys; /* Keys with clients waiting for VM I/O */ + dict *io_keys; /* 
Keys with clients waiting for DS I/O */ + dict *io_negcache; /* Negative caching for disk store */ + dict *io_queued; /* Queued IO operations hash table */ dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */ int id; } redisDb; @@ -349,17 +362,20 @@ struct sharedObjectsStruct { /* Global server state structure */ struct redisServer { + /* General */ pthread_t mainthread; + redisDb *db; + dict *commands; /* Command table hash table */ + aeEventLoop *el; + /* Networking */ int port; char *bindaddr; char *unixsocket; int ipfd; int sofd; - redisDb *db; - long long dirty; /* changes to DB from the last save */ - long long dirty_before_bgsave; /* used to restore dirty on failed BGSAVE */ list *clients; - dict *commands; /* Command table hahs table */ + list *slaves, *monitors; + char neterr[ANET_ERR_LEN]; /* RDB / AOF loading information */ int loading; off_t loading_total_bytes; @@ -367,9 +383,6 @@ struct redisServer { time_t loading_start_time; /* Fast pointers to often looked up command */ struct redisCommand *delCommand, *multiCommand; - list *slaves, *monitors; - char neterr[ANET_ERR_LEN]; - aeEventLoop *el; int cronloops; /* number of times the cron function run */ time_t lastsave; /* Unix time of last save succeeede */ /* Fields used only for stats */ @@ -382,7 +395,6 @@ struct redisServer { long long stat_keyspace_misses; /* number of failed lookups of keys */ /* Configuration */ int verbosity; - int glueoutputbuf; int maxidletime; int dbnum; int daemonize; @@ -390,25 +402,32 @@ struct redisServer { int appendfsync; int no_appendfsync_on_rewrite; int shutdown_asap; + int activerehashing; + char *requirepass; + /* Persistence */ + long long dirty; /* changes to DB from the last save */ + long long dirty_before_bgsave; /* used to restore dirty on failed BGSAVE */ time_t lastfsync; int appendfd; int appendseldb; char *pidfile; pid_t bgsavechildpid; pid_t bgrewritechildpid; + int bgsavethread_state; + pthread_mutex_t bgsavethread_mutex; + pthread_t bgsavethread;
sds bgrewritebuf; /* buffer taken by parent during oppend only rewrite */ sds aofbuf; /* AOF buffer, written before entering the event loop */ struct saveparam *saveparams; int saveparamslen; + char *dbfilename; + int rdbcompression; + char *appendfilename; + /* Logging */ char *logfile; int syslog_enabled; char *syslog_ident; int syslog_facility; - char *dbfilename; - char *appendfilename; - char *requirepass; - int rdbcompression; - int activerehashing; /* Replication related */ int isslave; /* Slave specific fields */ @@ -432,7 +451,7 @@ struct redisServer { unsigned int bpop_blocked_clients; unsigned int cache_blocked_clients; list *unblocked_clients; /* list of clients to unblock before next loop */ - list *cache_flush_queue; /* keys to flush on disk */ + list *cache_io_queue; /* IO operations queue */ int cache_flush_delay; /* seconds to wait before flushing keys */ /* Sort parameters - qsort_r() is only available under BSD so we * have to take this state global, in order to pass it to sortCompare() */ @@ -459,7 +478,7 @@ struct redisServer { list *io_processed; /* List of VM I/O jobs already processed */ list *io_ready_clients; /* Clients ready to be unblocked. 
All keys loaded */ pthread_mutex_t io_mutex; /* lock to access io_jobs/io_done/io_thread_job */ - pthread_mutex_t io_swapfile_mutex; /* So we can lseek + write */ + pthread_cond_t io_condvar; /* I/O threads condition variable */ pthread_attr_t io_threads_attr; /* attributes for threads creation */ int io_active_threads; /* Number of running I/O threads */ int vm_max_threads; /* Max number of I/O threads running at the same time */ @@ -502,6 +521,7 @@ struct redisCommand { int vm_firstkey; /* The first argument that's a key (0 = no keys) */ int vm_lastkey; /* THe last argument that's a key */ int vm_keystep; /* The step between first and last key */ + long long microseconds, calls; }; struct redisFunctionSym { @@ -544,7 +564,7 @@ typedef struct zset { zskiplist *zsl; } zset; -/* VM threaded I/O request message */ +/* Disk store threaded I/O request message */ #define REDIS_IOJOB_LOAD 0 #define REDIS_IOJOB_SAVE 1 @@ -557,13 +577,13 @@ typedef struct iojob { time_t expire; /* Expire time for this key on REDIS_IOJOB_LOAD */ } iojob; -/* When diskstore is enabled and a flush operation is requested we push - * one of this structures into server.cache_flush_queue. */ -typedef struct dirtykey { +/* IO operations scheduled -- check dscache.c for more info */ +typedef struct ioop { + int type; redisDb *db; robj *key; time_t ctime; /* This is the creation time of the entry. */ -} dirtykey; +} ioop; /* Structure to hold list iteration abstraction.
*/ typedef struct { @@ -621,13 +641,15 @@ dictType hashDictType; * Functions prototypes *----------------------------------------------------------------------------*/ +/* Utils */ +long long ustime(void); + /* networking.c -- Networking and Client related operations */ redisClient *createClient(int fd); void closeTimedoutClients(void); void freeClient(redisClient *c); void resetClient(redisClient *c); void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask); -void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask); void addReply(redisClient *c, robj *obj); void *addDeferredMultiBulkLength(redisClient *c); void setDeferredMultiBulkLength(redisClient *c, void *node, long length); @@ -649,6 +671,8 @@ void addReplyDouble(redisClient *c, double d); void addReplyLongLong(redisClient *c, long long ll); void addReplyMultiBulkLen(redisClient *c, long length); void *dupClientReplyValue(void *o); +void getClientsMaxBuffers(unsigned long *longest_output_list, + unsigned long *biggest_input_buffer); #ifdef __GNUC__ void addReplyErrorFormat(redisClient *c, const char *fmt, ...) 
@@ -706,6 +730,7 @@ robj *createSetObject(void); robj *createIntsetObject(void); robj *createHashObject(void); robj *createZsetObject(void); +robj *createZsetZiplistObject(void); int getLongFromObjectOrReply(redisClient *c, robj *o, long *target, const char *msg); int checkType(redisClient *c, robj *o, int type); int getLongLongFromObjectOrReply(redisClient *c, robj *o, long long *target, const char *msg); @@ -746,11 +771,13 @@ int rdbSaveObject(FILE *fp, robj *o); off_t rdbSavedObjectLen(robj *o); off_t rdbSavedObjectPages(robj *o); robj *rdbLoadObject(int type, FILE *fp); -void backgroundSaveDoneHandler(int statloc); -int rdbSaveKeyValuePair(FILE *fp, redisDb *db, robj *key, robj *val, time_t now); +void backgroundSaveDoneHandler(int exitcode, int bysignal); +int rdbSaveKeyValuePair(FILE *fp, robj *key, robj *val, time_t expireitme, time_t now); int rdbLoadType(FILE *fp); time_t rdbLoadTime(FILE *fp); robj *rdbLoadStringObject(FILE *fp); +int rdbSaveType(FILE *fp, unsigned char type); +int rdbSaveLen(FILE *fp, uint32_t len); /* AOF persistence */ void flushAppendOnlyFile(void); @@ -760,7 +787,7 @@ int rewriteAppendOnlyFileBackground(void); int loadAppendOnlyFile(char *filename); void stopAppendOnly(void); int startAppendOnly(void); -void backgroundRewriteDoneHandler(int statloc); +void backgroundRewriteDoneHandler(int exitcode, int bysignal); /* Sorted sets data type */ zskiplist *zslCreate(void); @@ -770,7 +797,7 @@ zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj); /* Core functions */ void freeMemoryIfNeeded(void); int processCommand(redisClient *c); -void setupSigSegvAction(void); +void setupSignalHandlers(void); struct redisCommand *lookupCommand(sds name); struct redisCommand *lookupCommandByCString(char *s); void call(redisClient *c, struct redisCommand *cmd); @@ -781,15 +808,18 @@ void updateDictResizePolicy(void); int htNeedsResize(dict *dict); void oom(const char *msg); void populateCommandTable(void); +void resetCommandTableStats(void); 
/* Disk store */ int dsOpen(void); int dsClose(void); -int dsSet(redisDb *db, robj *key, robj *val); +int dsSet(redisDb *db, robj *key, robj *val, time_t expire); robj *dsGet(redisDb *db, robj *key, time_t *expire); int dsDel(redisDb *db, robj *key); int dsExists(redisDb *db, robj *key); -int dsFlushDb(int dbid); +void dsFlushDb(int dbid); +int dsRdbSaveBackground(char *filename); +int dsRdbSave(char *filename); /* Disk Store Cache */ void dsInit(void); @@ -799,14 +829,22 @@ void unlockThreadedIO(void); void freeIOJob(iojob *j); void queueIOJob(iojob *j); void waitEmptyIOJobsQueue(void); +void processAllPendingIOJobs(void); void zunionInterBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv); void execBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv); int blockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd); int dontWaitForSwappedKey(redisClient *c, robj *key); void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key); int cacheFreeOneEntry(void); -void cacheScheduleForFlush(redisDb *db, robj *key); +void cacheScheduleIOAddFlag(redisDb *db, robj *key, long flag); +void cacheScheduleIODelFlag(redisDb *db, robj *key, long flag); +int cacheScheduleIOGetFlags(redisDb *db, robj *key); +void cacheScheduleIO(redisDb *db, robj *key, int type); void cacheCron(void); +int cacheKeyMayExist(redisDb *db, robj *key); +void cacheSetKeyMayExist(redisDb *db, robj *key); +void cacheSetKeyDoesNotExist(redisDb *db, robj *key); +void cacheForcePointInTime(void); /* Set data type */ robj *setTypeCreate(robj *value); @@ -850,6 +888,7 @@ int stringmatchlen(const char *pattern, int patternLen, int stringmatch(const char *pattern, const char *string, int nocase); long long memtoll(const char *p, int *err); int ll2string(char *s, size_t len, long long value); +int d2string(char *s, size_t len, double value); int isStringRepresentableAsLong(sds s, long *longval); int 
isStringRepresentableAsLongLong(sds s, long long *longval); int isObjectRepresentableAsLongLong(robj *o, long long *llongval);