X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/1fce3201145f256401bef7fde96a8c1509bcb4a7..f797c7dc176c833e6aa412c557d7fedd59dc1124:/src/redis.h diff --git a/src/redis.h b/src/redis.h index 054ee930..91a64ecf 100644 --- a/src/redis.h +++ b/src/redis.h @@ -18,6 +18,7 @@ #include #include #include +#include #include "ae.h" /* Event driven programming library */ #include "sds.h" /* Dynamic safe strings */ @@ -50,11 +51,6 @@ #define REDIS_REPLY_CHUNK_BYTES (5*1500) /* 5 TCP packets with default MTU */ #define REDIS_MAX_LOGMSG_LEN 1024 /* Default maximum length of syslog messages */ -/* If more then REDIS_WRITEV_THRESHOLD write packets are pending use writev */ -#define REDIS_WRITEV_THRESHOLD 3 -/* Max number of iovecs used for each writev call */ -#define REDIS_WRITEV_IOVEC_COUNT 256 - /* Hash table parameters */ #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */ @@ -75,6 +71,10 @@ #define REDIS_ZSET 3 #define REDIS_HASH 4 #define REDIS_VMPOINTER 8 +/* Object types only used for persistence in .rdb files */ +#define REDIS_HASH_ZIPMAP 9 +#define REDIS_LIST_ZIPLIST 10 +#define REDIS_SET_INTSET 11 /* Objects encoding. Some kind of objects like Strings and Hashes can be * internally represented in multiple ways. The 'encoding' field of the object @@ -119,10 +119,15 @@ #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */ #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */ -/* Disk store cache object->storage values */ -#define REDIS_DS_MEMORY 0 /* The object is on memory */ -#define REDIS_DS_DIRTY 1 /* The object was modified */ -#define REDIS_DS_SAVING 2 /* There is an IO Job created for this obj. */ +/* Scheduled IO opeations flags. */ +#define REDIS_IO_LOAD 1 +#define REDIS_IO_SAVE 2 +#define REDIS_IO_LOADINPROG 4 +#define REDIS_IO_SAVEINPROG 8 + +/* Generic IO flags */ +#define REDIS_IO_ONLYLOADS 1 +#define REDIS_IO_ASAP 2 #define REDIS_MAX_COMPLETED_JOBS_PROCESSED 1 #define REDIS_THREAD_STACK_SIZE (1024*1024*4) @@ -136,6 +141,8 @@ #define REDIS_IO_WAIT 32 /* The client is waiting for Virtual Memory I/O */ #define REDIS_DIRTY_CAS 64 /* Watched keys modified. EXEC will fail. */ #define REDIS_CLOSE_AFTER_REPLY 128 /* Close after writing entire reply. */ +#define REDIS_UNBLOCKED 256 /* This client was unblocked and is stored in + server.unblocked_clients */ /* Client request types */ #define REDIS_REQ_INLINE 1 @@ -184,8 +191,8 @@ #define APPENDFSYNC_EVERYSEC 2 /* Zip structure related defaults */ -#define REDIS_HASH_MAX_ZIPMAP_ENTRIES 64 -#define REDIS_HASH_MAX_ZIPMAP_VALUE 512 +#define REDIS_HASH_MAX_ZIPMAP_ENTRIES 512 +#define REDIS_HASH_MAX_ZIPMAP_VALUE 64 #define REDIS_LIST_MAX_ZIPLIST_ENTRIES 512 #define REDIS_LIST_MAX_ZIPLIST_VALUE 64 #define REDIS_SET_MAX_INTSET_ENTRIES 512 @@ -203,6 +210,12 @@ #define REDIS_MAXMEMORY_ALLKEYS_RANDOM 4 #define REDIS_MAXMEMORY_NO_EVICTION 5 +/* Diskstore background saving thread states */ +#define REDIS_BGSAVE_THREAD_UNACTIVE 0 +#define REDIS_BGSAVE_THREAD_ACTIVE 1 +#define REDIS_BGSAVE_THREAD_DONE_OK 2 +#define REDIS_BGSAVE_THREAD_DONE_ERR 3 + /* We can print the stacktrace, so our assert is defined this way: */ #define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e,__FILE__,__LINE__),_exit(1))) #define redisPanic(_e) _redisPanic(#_e,__FILE__,__LINE__),_exit(1) @@ -220,7 +233,7 @@ void _redisPanic(char *msg, char *file, int line); #define REDIS_LRU_CLOCK_RESOLUTION 10 /* LRU clock resolution in seconds */ typedef struct redisObject { unsigned type:4; - unsigned storage:2; /* REDIS_VM_MEMORY or REDIS_VM_SWAPPING */ + unsigned notused:2; /* Not used */ unsigned encoding:4; unsigned lru:22; /* lru time (relative to server.lruclock) */ int refcount; @@ -261,14 +274,15 @@ typedef struct vmPointer { _var.type = REDIS_STRING; \ _var.encoding = REDIS_ENCODING_RAW; \ _var.ptr = _ptr; \ - _var.storage = REDIS_DS_MEMORY; \ } while(0); typedef struct redisDb { dict *dict; /* The keyspace for this DB */ dict *expires; /* Timeout of keys with a timeout set */ dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */ - dict *io_keys; /* Keys with clients waiting for VM I/O */ + dict *io_keys; /* Keys with clients waiting for DS I/O */ + dict *io_negcache; /* Negative caching for disk store */ + dict *io_queued; /* Queued IO operations hash table */ dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */ int id; } redisDb; @@ -347,19 +361,140 @@ struct sharedObjectsStruct { *integers[REDIS_SHARED_INTEGERS]; }; -/* Global server state structure */ +/*----------------------------------------------------------------------------- + * Redis cluster data structures + *----------------------------------------------------------------------------*/ + +#define REDIS_CLUSTER_SLOTS 4096 +#define REDIS_CLUSTER_OK 0 /* Everything looks ok */ +#define REDIS_CLUSTER_FAIL 1 /* The cluster can't work */ +#define REDIS_CLUSTER_NEEDHELP 2 /* The cluster works, but needs some help */ +#define REDIS_CLUSTER_NAMELEN 40 /* sha1 hex length */ +#define REDIS_CLUSTER_PORT_INCR 10000 /* Cluster port = baseport + PORT_INCR */ + +struct clusterNode; + +/* clusterLink encapsulates everything needed to talk with a remote node. */ +typedef struct clusterLink { + int fd; /* TCP socket file descriptor */ + sds sndbuf; /* Packet send buffer */ + sds rcvbuf; /* Packet reception buffer */ + struct clusterNode *node; /* Node related to this link if any, or NULL */ +} clusterLink; + +/* Node flags */ +#define REDIS_NODE_MASTER 1 /* The node is a master */ +#define REDIS_NODE_SLAVE 2 /* The node is a slave */ +#define REDIS_NODE_PFAIL 4 /* Failure? Need acknowledge */ +#define REDIS_NODE_FAIL 8 /* The node is believed to be malfunctioning */ +#define REDIS_NODE_MYSELF 16 /* This node is myself */ +#define REDIS_NODE_HANDSHAKE 32 /* We have still to exchange the first ping */ +#define REDIS_NODE_NOADDR 64 /* We don't know the address of this node */ +#define REDIS_NODE_MEET 128 /* Send a MEET message to this node */ +#define REDIS_NODE_NULL_NAME "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000" + +struct clusterNode { + char name[REDIS_CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */ + int flags; /* REDIS_NODE_... */ + unsigned char slots[REDIS_CLUSTER_SLOTS/8]; /* slots handled by this node */ + int numslaves; /* Number of slave nodes, if this is a master */ + struct clusterNode **slaves; /* pointers to slave nodes */ + struct clusterNode *slaveof; /* pointer to the master node */ + time_t ping_sent; /* Unix time we sent latest ping */ + time_t pong_received; /* Unix time we received the pong */ + char *configdigest; /* Configuration digest of this node */ + time_t configdigest_ts; /* Configuration digest timestamp */ + char ip[16]; /* Latest known IP address of this node */ + int port; /* Latest known port of this node */ + clusterLink *link; /* TCP/IP link with this node */ +}; +typedef struct clusterNode clusterNode; + +typedef struct { + char *configfile; + clusterNode *myself; /* This node */ + int state; /* REDIS_CLUSTER_OK, REDIS_CLUSTER_FAIL, ... */ + int node_timeout; + dict *nodes; /* Hash table of name -> clusterNode structures */ + clusterNode *migrating_slots_to[REDIS_CLUSTER_SLOTS]; + clusterNode *importing_slots_from[REDIS_CLUSTER_SLOTS]; + clusterNode *slots[REDIS_CLUSTER_SLOTS]; +} clusterState; + +/* Redis cluster messages header */ + +/* Note that the PING, PONG and MEET messages are actually the same exact + * kind of packet. PONG is the reply to ping, in the extact format as a PING, + * while MEET is a special PING that forces the receiver to add the sender + * as a node (if it is not already in the list). */ +#define CLUSTERMSG_TYPE_PING 0 /* Ping */ +#define CLUSTERMSG_TYPE_PONG 1 /* Pong (reply to Ping) */ +#define CLUSTERMSG_TYPE_MEET 2 /* Meet "let's join" message */ +#define CLUSTERMSG_TYPE_FAIL 3 /* Mark node xxx as failing */ + +/* Initially we don't know our "name", but we'll find it once we connect + * to the first node, using the getsockname() function. Then we'll use this + * address for all the next messages. */ +typedef struct { + char nodename[REDIS_CLUSTER_NAMELEN]; + uint32_t ping_sent; + uint32_t pong_received; + char ip[16]; /* IP address last time it was seen */ + uint16_t port; /* port last time it was seen */ + uint16_t flags; + uint32_t notused; /* for 64 bit alignment */ +} clusterMsgDataGossip; + +typedef struct { + char nodename[REDIS_CLUSTER_NAMELEN]; +} clusterMsgDataFail; + +union clusterMsgData { + /* PING, MEET and PONG */ + struct { + /* Array of N clusterMsgDataGossip structures */ + clusterMsgDataGossip gossip[1]; + } ping; + /* FAIL */ + struct { + clusterMsgDataFail about; + } fail; +}; + +typedef struct { + uint32_t totlen; /* Total length of this message */ + uint16_t type; /* Message type */ + uint16_t count; /* Only used for some kind of messages. */ + char sender[REDIS_CLUSTER_NAMELEN]; /* Name of the sender node */ + unsigned char myslots[REDIS_CLUSTER_SLOTS/8]; + char slaveof[REDIS_CLUSTER_NAMELEN]; + char configdigest[32]; + uint16_t port; /* Sender TCP base port */ + unsigned char state; /* Cluster state from the POV of the sender */ + unsigned char notused[5]; /* Reserved for future use. For alignment. */ + union clusterMsgData data; +} clusterMsg; + +/*----------------------------------------------------------------------------- + * Global server state + *----------------------------------------------------------------------------*/ + struct redisServer { + /* General */ pthread_t mainthread; + redisDb *db; + dict *commands; /* Command table hahs table */ + aeEventLoop *el; + /* Networking */ int port; char *bindaddr; char *unixsocket; int ipfd; int sofd; - redisDb *db; - long long dirty; /* changes to DB from the last save */ - long long dirty_before_bgsave; /* used to restore dirty on failed BGSAVE */ + int cfd; list *clients; - dict *commands; /* Command table hahs table */ + list *slaves, *monitors; + char neterr[ANET_ERR_LEN]; /* RDB / AOF loading information */ int loading; off_t loading_total_bytes; @@ -367,9 +502,6 @@ struct redisServer { time_t loading_start_time; /* Fast pointers to often looked up command */ struct redisCommand *delCommand, *multiCommand; - list *slaves, *monitors; - char neterr[ANET_ERR_LEN]; - aeEventLoop *el; int cronloops; /* number of times the cron function run */ time_t lastsave; /* Unix time of last save succeeede */ /* Fields used only for stats */ @@ -382,7 +514,6 @@ struct redisServer { long long stat_keyspace_misses; /* number of failed lookups of keys */ /* Configuration */ int verbosity; - int glueoutputbuf; int maxidletime; int dbnum; int daemonize; @@ -390,25 +521,32 @@ struct redisServer { int appendfsync; int no_appendfsync_on_rewrite; int shutdown_asap; + int activerehashing; + char *requirepass; + /* Persistence */ + long long dirty; /* changes to DB from the last save */ + long long dirty_before_bgsave; /* used to restore dirty on failed BGSAVE */ time_t lastfsync; int appendfd; int appendseldb; char *pidfile; pid_t bgsavechildpid; pid_t bgrewritechildpid; + int bgsavethread_state; + pthread_mutex_t bgsavethread_mutex; + pthread_t bgsavethread; sds bgrewritebuf; /* buffer taken by parent during oppend only rewrite */ sds aofbuf; /* AOF buffer, written before entering the event loop */ struct saveparam *saveparams; int saveparamslen; + char *dbfilename; + int rdbcompression; + char *appendfilename; + /* Logging */ char *logfile; int syslog_enabled; char *syslog_ident; int syslog_facility; - char *dbfilename; - char *appendfilename; - char *requirepass; - int rdbcompression; - int activerehashing; /* Replication related */ int isslave; /* Slave specific fields */ @@ -432,7 +570,7 @@ struct redisServer { unsigned int bpop_blocked_clients; unsigned int cache_blocked_clients; list *unblocked_clients; /* list of clients to unblock before next loop */ - list *cache_flush_queue; /* keys to flush on disk */ + list *cache_io_queue; /* IO operations queue */ int cache_flush_delay; /* seconds to wait before flushing keys */ /* Sort parameters - qsort_r() is only available under BSD so we * have to take this state global, in order to pass it to sortCompare() */ @@ -459,7 +597,7 @@ struct redisServer { list *io_processed; /* List of VM I/O jobs already processed */ list *io_ready_clients; /* Clients ready to be unblocked. All keys loaded */ pthread_mutex_t io_mutex; /* lock to access io_jobs/io_done/io_thread_job */ - pthread_mutex_t io_swapfile_mutex; /* So we can lseek + write */ + pthread_cond_t io_condvar; /* I/O threads conditional variable */ pthread_attr_t io_threads_attr; /* attributes for threads creation */ int io_active_threads; /* Number of running I/O threads */ int vm_max_threads; /* Max number of I/O threads running at the same time */ @@ -480,6 +618,8 @@ struct redisServer { /* Misc */ unsigned lruclock:22; /* clock incrementing every minute, for LRU */ unsigned lruclock_padding:10; + int cluster_enabled; + clusterState cluster; }; typedef struct pubsubPattern { @@ -488,20 +628,20 @@ typedef struct pubsubPattern { } pubsubPattern; typedef void redisCommandProc(redisClient *c); -typedef void redisVmPreloadProc(redisClient *c, struct redisCommand *cmd, int argc, robj **argv); +typedef int *redisGetKeysProc(struct redisCommand *cmd, robj **argv, int argc, int *numkeys, int flags); struct redisCommand { char *name; redisCommandProc *proc; int arity; int flags; - /* Use a function to determine which keys need to be loaded - * in the background prior to executing this command. Takes precedence - * over vm_firstkey and others, ignored when NULL */ - redisVmPreloadProc *vm_preload_proc; + /* Use a function to determine keys arguments in a command line. + * Used both for diskstore preloading and Redis Cluster. */ + redisGetKeysProc *getkeys_proc; /* What keys should be loaded in background when calling this command? */ - int vm_firstkey; /* The first argument that's a key (0 = no keys) */ - int vm_lastkey; /* THe last argument that's a key */ - int vm_keystep; /* The step between first and last key */ + int firstkey; /* The first argument that's a key (0 = no keys) */ + int lastkey; /* THe last argument that's a key */ + int keystep; /* The step between first and last key */ + long long microseconds, calls; }; struct redisFunctionSym { @@ -544,7 +684,7 @@ typedef struct zset { zskiplist *zsl; } zset; -/* VM threaded I/O request message */ +/* DIsk store threaded I/O request message */ #define REDIS_IOJOB_LOAD 0 #define REDIS_IOJOB_SAVE 1 @@ -557,13 +697,13 @@ typedef struct iojob { time_t expire; /* Expire time for this key on REDIS_IOJOB_LOAD */ } iojob; -/* When diskstore is enabled and a flush operation is requested we push - * one of this structures into server.cache_flush_queue. */ -typedef struct dirtykey { +/* IO operations scheduled -- check dscache.c for more info */ +typedef struct ioop { + int type; redisDb *db; robj *key; time_t ctime; /* This is the creation time of the entry. */ -} dirtykey; +} ioop; /* Structure to hold list iteration abstraction. */ typedef struct { @@ -614,6 +754,7 @@ extern struct redisServer server; extern struct sharedObjectsStruct shared; extern dictType setDictType; extern dictType zsetDictType; +extern dictType clusterNodesDictType; extern double R_Zero, R_PosInf, R_NegInf, R_Nan; dictType hashDictType; @@ -621,13 +762,15 @@ dictType hashDictType; * Functions prototypes *----------------------------------------------------------------------------*/ +/* Utils */ +long long ustime(void); + /* networking.c -- Networking and Client related operations */ redisClient *createClient(int fd); void closeTimedoutClients(void); void freeClient(redisClient *c); void resetClient(redisClient *c); void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask); -void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask); void addReply(redisClient *c, robj *obj); void *addDeferredMultiBulkLength(redisClient *c); void setDeferredMultiBulkLength(redisClient *c, void *node, long length); @@ -649,6 +792,8 @@ void addReplyDouble(redisClient *c, double d); void addReplyLongLong(redisClient *c, long long ll); void addReplyMultiBulkLen(redisClient *c, long length); void *dupClientReplyValue(void *o); +void getClientsMaxBuffers(unsigned long *longest_output_list, + unsigned long *biggest_input_buffer); #ifdef __GNUC__ void addReplyErrorFormat(redisClient *c, const char *fmt, ...) @@ -724,6 +869,7 @@ int fwriteBulkString(FILE *fp, char *s, unsigned long len); int fwriteBulkDouble(FILE *fp, double d); int fwriteBulkLongLong(FILE *fp, long long l); int fwriteBulkObject(FILE *fp, robj *obj); +int fwriteBulkCount(FILE *fp, char prefix, int count); /* Replication */ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc); @@ -746,11 +892,13 @@ int rdbSaveObject(FILE *fp, robj *o); off_t rdbSavedObjectLen(robj *o); off_t rdbSavedObjectPages(robj *o); robj *rdbLoadObject(int type, FILE *fp); -void backgroundSaveDoneHandler(int statloc); -int rdbSaveKeyValuePair(FILE *fp, redisDb *db, robj *key, robj *val, time_t now); +void backgroundSaveDoneHandler(int exitcode, int bysignal); +int rdbSaveKeyValuePair(FILE *fp, robj *key, robj *val, time_t expireitme, time_t now); int rdbLoadType(FILE *fp); time_t rdbLoadTime(FILE *fp); robj *rdbLoadStringObject(FILE *fp); +int rdbSaveType(FILE *fp, unsigned char type); +int rdbSaveLen(FILE *fp, uint32_t len); /* AOF persistence */ void flushAppendOnlyFile(void); @@ -760,7 +908,7 @@ int rewriteAppendOnlyFileBackground(void); int loadAppendOnlyFile(char *filename); void stopAppendOnly(void); int startAppendOnly(void); -void backgroundRewriteDoneHandler(int statloc); +void backgroundRewriteDoneHandler(int exitcode, int bysignal); /* Sorted sets data type */ zskiplist *zslCreate(void); @@ -770,7 +918,7 @@ zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj); /* Core functions */ void freeMemoryIfNeeded(void); int processCommand(redisClient *c); -void setupSigSegvAction(void); +void setupSignalHandlers(void); struct redisCommand *lookupCommand(sds name); struct redisCommand *lookupCommandByCString(char *s); void call(redisClient *c, struct redisCommand *cmd); @@ -781,15 +929,18 @@ void updateDictResizePolicy(void); int htNeedsResize(dict *dict); void oom(const char *msg); void populateCommandTable(void); +void resetCommandTableStats(void); /* Disk store */ int dsOpen(void); int dsClose(void); -int dsSet(redisDb *db, robj *key, robj *val); +int dsSet(redisDb *db, robj *key, robj *val, time_t expire); robj *dsGet(redisDb *db, robj *key, time_t *expire); int dsDel(redisDb *db, robj *key); int dsExists(redisDb *db, robj *key); -int dsFlushDb(int dbid); +void dsFlushDb(int dbid); +int dsRdbSaveBackground(char *filename); +int dsRdbSave(char *filename); /* Disk Store Cache */ void dsInit(void); @@ -799,14 +950,20 @@ void unlockThreadedIO(void); void freeIOJob(iojob *j); void queueIOJob(iojob *j); void waitEmptyIOJobsQueue(void); -void zunionInterBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv); -void execBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv); +void processAllPendingIOJobs(void); int blockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd); int dontWaitForSwappedKey(redisClient *c, robj *key); void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key); int cacheFreeOneEntry(void); -void cacheScheduleForFlush(redisDb *db, robj *key); +void cacheScheduleIOAddFlag(redisDb *db, robj *key, long flag); +void cacheScheduleIODelFlag(redisDb *db, robj *key, long flag); +int cacheScheduleIOGetFlags(redisDb *db, robj *key); +void cacheScheduleIO(redisDb *db, robj *key, int type); void cacheCron(void); +int cacheKeyMayExist(redisDb *db, robj *key); +void cacheSetKeyMayExist(redisDb *db, robj *key); +void cacheSetKeyDoesNotExist(redisDb *db, robj *key); +void cacheForcePointInTime(void); /* Set data type */ robj *setTypeCreate(robj *value); @@ -880,6 +1037,24 @@ int selectDb(redisClient *c, int id); void signalModifiedKey(redisDb *db, robj *key); void signalFlushedDb(int dbid); +/* API to get key arguments from commands */ +#define REDIS_GETKEYS_ALL 0 +#define REDIS_GETKEYS_PRELOAD 1 +int *getKeysFromCommand(struct redisCommand *cmd, robj **argv, int argc, int *numkeys, int flags); +void getKeysFreeResult(int *result); +int *noPreloadGetKeys(struct redisCommand *cmd,robj **argv, int argc, int *numkeys, int flags); +int *renameGetKeys(struct redisCommand *cmd,robj **argv, int argc, int *numkeys, int flags); +int *zunionInterGetKeys(struct redisCommand *cmd,robj **argv, int argc, int *numkeys, int flags); + +/* Cluster */ +void clusterInit(void); +unsigned short crc16(const char *buf, int len); +unsigned int keyHashSlot(char *key, int keylen); +clusterNode *createClusterNode(char *nodename, int flags); +int clusterAddNode(clusterNode *node); +void clusterCron(void); +clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot); + /* Git SHA1 */ char *redisGitSHA1(void); char *redisGitDirty(void); @@ -1002,6 +1177,10 @@ void punsubscribeCommand(redisClient *c); void publishCommand(redisClient *c); void watchCommand(redisClient *c); void unwatchCommand(redisClient *c); +void clusterCommand(redisClient *c); +void restoreCommand(redisClient *c); +void migrateCommand(redisClient *c); +void dumpCommand(redisClient *c); #if defined(__GNUC__) void *calloc(size_t count, size_t size) __attribute__ ((deprecated));