X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/633a941028ad60bfad1bdeee95edaeba6109304f..d4d3a70da2c9be4c5aa67a0be735568dbe436568:/src/redis.h?ds=sidebyside diff --git a/src/redis.h b/src/redis.h index cdddb601..32dcc359 100644 --- a/src/redis.h +++ b/src/redis.h @@ -18,6 +18,7 @@ #include #include #include +#include #include "ae.h" /* Event driven programming library */ #include "sds.h" /* Dynamic safe strings */ @@ -70,10 +71,12 @@ #define REDIS_ZSET 3 #define REDIS_HASH 4 #define REDIS_VMPOINTER 8 + /* Object types only used for persistence in .rdb files */ #define REDIS_HASH_ZIPMAP 9 #define REDIS_LIST_ZIPLIST 10 #define REDIS_SET_INTSET 11 +#define REDIS_ZSET_ZIPLIST 12 /* Objects encoding. Some kind of objects like Strings and Hashes can be * internally represented in multiple ways. The 'encoding' field of the object @@ -85,6 +88,7 @@ #define REDIS_ENCODING_LINKEDLIST 4 /* Encoded as regular linked list */ #define REDIS_ENCODING_ZIPLIST 5 /* Encoded as ziplist */ #define REDIS_ENCODING_INTSET 6 /* Encoded as intset */ +#define REDIS_ENCODING_SKIPLIST 7 /* Encoded as skiplist */ /* Object types only used for dumping to disk */ #define REDIS_EXPIRETIME 253 @@ -195,6 +199,8 @@ #define REDIS_LIST_MAX_ZIPLIST_ENTRIES 512 #define REDIS_LIST_MAX_ZIPLIST_VALUE 64 #define REDIS_SET_MAX_INTSET_ENTRIES 512 +#define REDIS_ZSET_MAX_ZIPLIST_ENTRIES 128 +#define REDIS_ZSET_MAX_ZIPLIST_VALUE 64 /* Sets operations codes */ #define REDIS_OP_UNION 0 @@ -360,7 +366,124 @@ struct sharedObjectsStruct { *integers[REDIS_SHARED_INTEGERS]; }; -/* Global server state structure */ +/*----------------------------------------------------------------------------- + * Redis cluster data structures + *----------------------------------------------------------------------------*/ + +#define REDIS_CLUSTER_SLOTS 4096 +#define REDIS_CLUSTER_OK 0 /* Everything looks ok */ +#define REDIS_CLUSTER_FAIL 1 /* The cluster can't work */ +#define REDIS_CLUSTER_NEEDHELP 2 /* The cluster works, 
but needs some help */ +#define REDIS_CLUSTER_NAMELEN 40 /* sha1 hex length */ +#define REDIS_CLUSTER_PORT_INCR 10000 /* Cluster port = baseport + PORT_INCR */ + +struct clusterNode; + +/* clusterLink encapsulates everything needed to talk with a remote node. */ +typedef struct clusterLink { + int fd; /* TCP socket file descriptor */ + sds sndbuf; /* Packet send buffer */ + sds rcvbuf; /* Packet reception buffer */ + struct clusterNode *node; /* Node related to this link if any, or NULL */ +} clusterLink; + +/* Node flags */ +#define REDIS_NODE_MASTER 1 /* The node is a master */ +#define REDIS_NODE_SLAVE 2 /* The node is a slave */ +#define REDIS_NODE_PFAIL 4 /* Failure? Needs acknowledgement */ +#define REDIS_NODE_FAIL 8 /* The node is believed to be malfunctioning */ +#define REDIS_NODE_MYSELF 16 /* This node is myself */ +#define REDIS_NODE_HANDSHAKE 32 /* We have still to exchange the first ping */ +#define REDIS_NODE_NOADDR 64 /* We don't know the address of this node */ +#define REDIS_NODE_MEET 128 /* Send a MEET message to this node */ +#define REDIS_NODE_NULL_NAME "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000" + +struct clusterNode { + char name[REDIS_CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */ + int flags; /* REDIS_NODE_... 
*/ + unsigned char slots[REDIS_CLUSTER_SLOTS/8]; /* slots handled by this node */ + int numslaves; /* Number of slave nodes, if this is a master */ + struct clusterNode **slaves; /* pointers to slave nodes */ + struct clusterNode *slaveof; /* pointer to the master node */ + time_t ping_sent; /* Unix time we sent latest ping */ + time_t pong_received; /* Unix time we received the pong */ + char *configdigest; /* Configuration digest of this node */ + time_t configdigest_ts; /* Configuration digest timestamp */ + char ip[16]; /* Latest known IP address of this node */ + int port; /* Latest known port of this node */ + clusterLink *link; /* TCP/IP link with this node */ +}; +typedef struct clusterNode clusterNode; + +typedef struct { + char *configfile; + clusterNode *myself; /* This node */ + int state; /* REDIS_CLUSTER_OK, REDIS_CLUSTER_FAIL, ... */ + int node_timeout; + dict *nodes; /* Hash table of name -> clusterNode structures */ + clusterNode *migrating_slots_to[REDIS_CLUSTER_SLOTS]; + clusterNode *importing_slots_from[REDIS_CLUSTER_SLOTS]; + clusterNode *slots[REDIS_CLUSTER_SLOTS]; +} clusterState; + +/* Redis cluster messages header */ + +/* Note that the PING, PONG and MEET messages are actually the same exact + * kind of packet. PONG is the reply to ping, in the exact format as a PING, + * while MEET is a special PING that forces the receiver to add the sender + * as a node (if it is not already in the list). */ +#define CLUSTERMSG_TYPE_PING 0 /* Ping */ +#define CLUSTERMSG_TYPE_PONG 1 /* Pong (reply to Ping) */ +#define CLUSTERMSG_TYPE_MEET 2 /* Meet "let's join" message */ +#define CLUSTERMSG_TYPE_FAIL 3 /* Mark node xxx as failing */ + +/* Initially we don't know our "name", but we'll find it once we connect + * to the first node, using the getsockname() function. Then we'll use this + * address for all the next messages. 
*/ +typedef struct { + char nodename[REDIS_CLUSTER_NAMELEN]; + uint32_t ping_sent; + uint32_t pong_received; + char ip[16]; /* IP address last time it was seen */ + uint16_t port; /* port last time it was seen */ + uint16_t flags; + uint32_t notused; /* for 64 bit alignment */ +} clusterMsgDataGossip; + +typedef struct { + char nodename[REDIS_CLUSTER_NAMELEN]; +} clusterMsgDataFail; + +union clusterMsgData { + /* PING, MEET and PONG */ + struct { + /* Array of N clusterMsgDataGossip structures */ + clusterMsgDataGossip gossip[1]; + } ping; + /* FAIL */ + struct { + clusterMsgDataFail about; + } fail; +}; + +typedef struct { + uint32_t totlen; /* Total length of this message */ + uint16_t type; /* Message type */ + uint16_t count; /* Only used for some kinds of messages. */ + char sender[REDIS_CLUSTER_NAMELEN]; /* Name of the sender node */ + unsigned char myslots[REDIS_CLUSTER_SLOTS/8]; + char slaveof[REDIS_CLUSTER_NAMELEN]; + char configdigest[32]; + uint16_t port; /* Sender TCP base port */ + unsigned char state; /* Cluster state from the POV of the sender */ + unsigned char notused[5]; /* Reserved for future use. For alignment. */ + union clusterMsgData data; +} clusterMsg; + +/*----------------------------------------------------------------------------- + * Global server state + *----------------------------------------------------------------------------*/ + struct redisServer { /* General */ pthread_t mainthread; @@ -373,6 +496,7 @@ struct redisServer { char *unixsocket; int ipfd; int sofd; + int cfd; list *clients; list *slaves, *monitors; char neterr[ANET_ERR_LEN]; @@ -468,6 +592,8 @@ struct redisServer { size_t list_max_ziplist_entries; size_t list_max_ziplist_value; size_t set_max_intset_entries; + size_t zset_max_ziplist_entries; + size_t zset_max_ziplist_value; time_t unixtime; /* Unix time sampled every second. 
*/ /* Virtual memory I/O threads stuff */ /* An I/O thread process an element taken from the io_jobs queue and @@ -499,6 +625,8 @@ struct redisServer { /* Misc */ unsigned lruclock:22; /* clock incrementing every minute, for LRU */ unsigned lruclock_padding:10; + int cluster_enabled; + clusterState cluster; }; typedef struct pubsubPattern { @@ -507,20 +635,19 @@ typedef struct pubsubPattern { } pubsubPattern; typedef void redisCommandProc(redisClient *c); -typedef void redisVmPreloadProc(redisClient *c, struct redisCommand *cmd, int argc, robj **argv); +typedef int *redisGetKeysProc(struct redisCommand *cmd, robj **argv, int argc, int *numkeys, int flags); struct redisCommand { char *name; redisCommandProc *proc; int arity; int flags; - /* Use a function to determine which keys need to be loaded - * in the background prior to executing this command. Takes precedence - * over vm_firstkey and others, ignored when NULL */ - redisVmPreloadProc *vm_preload_proc; + /* Use a function to determine the key arguments in a command line. + * Used both for diskstore preloading and Redis Cluster. */ + redisGetKeysProc *getkeys_proc; /* What keys should be loaded in background when calling this command? 
*/ - int vm_firstkey; /* The first argument that's a key (0 = no keys) */ - int vm_lastkey; /* THe last argument that's a key */ - int vm_keystep; /* The step between first and last key */ + int firstkey; /* The first argument that's a key (0 = no keys) */ + int lastkey; /* The last argument that's a key */ + int keystep; /* The step between first and last key */ long long microseconds, calls; }; @@ -634,6 +761,7 @@ extern struct redisServer server; extern struct sharedObjectsStruct shared; extern dictType setDictType; extern dictType zsetDictType; +extern dictType clusterNodesDictType; extern double R_Zero, R_PosInf, R_NegInf, R_Nan; dictType hashDictType; @@ -730,6 +858,7 @@ robj *createSetObject(void); robj *createIntsetObject(void); robj *createHashObject(void); robj *createZsetObject(void); +robj *createZsetZiplistObject(void); int getLongFromObjectOrReply(redisClient *c, robj *o, long *target, const char *msg); int checkType(redisClient *c, robj *o, int type); int getLongLongFromObjectOrReply(redisClient *c, robj *o, long long *target, const char *msg); @@ -748,6 +877,7 @@ int fwriteBulkString(FILE *fp, char *s, unsigned long len); int fwriteBulkDouble(FILE *fp, double d); int fwriteBulkLongLong(FILE *fp, long long l); int fwriteBulkObject(FILE *fp, robj *obj); +int fwriteBulkCount(FILE *fp, char prefix, int count); /* Replication */ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc); @@ -792,6 +922,12 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal); zskiplist *zslCreate(void); void zslFree(zskiplist *zsl); zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj); +unsigned char *zzlInsert(unsigned char *zl, robj *ele, double score); +double zzlGetScore(unsigned char *sptr); +void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr); +void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr); +unsigned int zsetLength(robj *zobj); +void zsetConvert(robj *zobj, int encoding); 
/* Core functions */ void freeMemoryIfNeeded(void); @@ -829,8 +965,6 @@ void freeIOJob(iojob *j); void queueIOJob(iojob *j); void waitEmptyIOJobsQueue(void); void processAllPendingIOJobs(void); -void zunionInterBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv); -void execBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv); int blockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd); int dontWaitForSwappedKey(redisClient *c, robj *key); void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key); @@ -887,6 +1021,8 @@ int stringmatchlen(const char *pattern, int patternLen, int stringmatch(const char *pattern, const char *string, int nocase); long long memtoll(const char *p, int *err); int ll2string(char *s, size_t len, long long value); +int string2ll(char *s, size_t len, long long *value); +int d2string(char *s, size_t len, double value); int isStringRepresentableAsLong(sds s, long *longval); int isStringRepresentableAsLongLong(sds s, long long *longval); int isObjectRepresentableAsLongLong(robj *o, long long *llongval); @@ -917,6 +1053,24 @@ int selectDb(redisClient *c, int id); void signalModifiedKey(redisDb *db, robj *key); void signalFlushedDb(int dbid); +/* API to get key arguments from commands. noPreloadGetKeys, renameGetKeys and zunionInterGetKeys have the same parameter list and return type as the redisGetKeysProc typedef. NOTE(review): REDIS_GETKEYS_ALL / REDIS_GETKEYS_PRELOAD are presumably values for the 'flags' argument -- confirm. */ +#define REDIS_GETKEYS_ALL 0 +#define REDIS_GETKEYS_PRELOAD 1 +int *getKeysFromCommand(struct redisCommand *cmd, robj **argv, int argc, int *numkeys, int flags); +void getKeysFreeResult(int *result); +int *noPreloadGetKeys(struct redisCommand *cmd,robj **argv, int argc, int *numkeys, int flags); +int *renameGetKeys(struct redisCommand *cmd,robj **argv, int argc, int *numkeys, int flags); +int *zunionInterGetKeys(struct redisCommand *cmd,robj **argv, int argc, int *numkeys, int flags); + +/* Cluster */ +void clusterInit(void); +unsigned short crc16(const char *buf, int len); +unsigned int keyHashSlot(char *key, int keylen); +clusterNode *createClusterNode(char *nodename, int 
flags); +int clusterAddNode(clusterNode *node); +void clusterCron(void); +clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot); + /* Git SHA1 */ char *redisGitSHA1(void); char *redisGitDirty(void); @@ -1039,6 +1193,11 @@ void punsubscribeCommand(redisClient *c); void publishCommand(redisClient *c); void watchCommand(redisClient *c); void unwatchCommand(redisClient *c); +void clusterCommand(redisClient *c); +void restoreCommand(redisClient *c); +void migrateCommand(redisClient *c); +void dumpCommand(redisClient *c); +void objectCommand(redisClient *c); #if defined(__GNUC__) void *calloc(size_t count, size_t size) __attribute__ ((deprecated));