From 6b5daa2df2a0711a25746cb025927dc3deb7717e Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 23 Jul 2012 12:54:52 +0200 Subject: [PATCH] First implementation of Redis Sentinel. This commit implements the first, beta quality implementation of Redis Sentinel, a distributed monitoring system for Redis with notification and automatic failover capabilities. More info at http://redis.io/topics/sentinel --- .gitignore | 2 + sentinel.conf | 41 + src/Makefile | 25 +- src/anet.c | 15 + src/config.c | 11 + src/redis.c | 107 ++- src/redis.h | 18 + src/sentinel.c | 2439 ++++++++++++++++++++++++++++++++++++++++++++++++ 8 files changed, 2614 insertions(+), 44 deletions(-) create mode 100644 sentinel.conf create mode 100644 src/sentinel.c diff --git a/.gitignore b/.gitignore index 5f262c46..c94c5ac9 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ *.log redis-cli redis-server +redis-sentinel redis-benchmark redis-check-dump redis-check-aof @@ -24,3 +25,4 @@ deps/lua/src/luac deps/lua/src/liblua.a .make-* .prerequisites +*.dSYM diff --git a/sentinel.conf b/sentinel.conf new file mode 100644 index 00000000..0e7a9542 --- /dev/null +++ b/sentinel.conf @@ -0,0 +1,41 @@ +# Example sentienl.conf + +# sentinel monitor quorum. Tells Sentinel to monitor this +# slave, and to consider it in O_DOWN (Objectively Down) state only if at +# least two sentinels agree. +# +# Note: master name should not include special characters or spaces. +# The valid charset is A-z 0-9 and the three characters ".-_". +sentinel monitor mymaster 127.0.0.1 6379 2 + +# Number of milliseconds the master (or any attached slave or sentinel) should +# be unreachable (as in, not acceptable reply to PING, continuously, for the +# specified period) in order to consider it in S_DOWN state (Subjectively +# Down). +# +# Default is 30 seconds. +sentinel down-after-milliseconds mymaster 30000 + +# Specify if this Sentinel can start the failover for this master. +sentinel can-failover mymaster yes + +# How many slaves we can reconfigure to point to the new slave simultaneously +# during the failover. Use a low number if you use the slaves to serve query +# to avoid that all the slaves will be unreachable at about the same +# time while performing the synchronization with the master. +sentinel parallel-syncs mymaster 1 + +# Specifies the failover timeout in milliseconds. When this time has elapsed +# without any progress in the failover process, it is considered concluded by +# the sentinel even if not all the attached slaves were correctly configured +# to replicate with the new master (however a "best effort" SLAVEOF command +# is sent to all the slaves before). +# +# Also when 25% of this time has elapsed without any advancement, and there +# is a leader switch (the sentinel did not started the failover but is now +# elected as leader), the sentinel will continue the failover doing a +# "takeover". +# +# Default is 15 minutes. +sentinel failover-timeout mymaster 900000 + diff --git a/src/Makefile b/src/Makefile index bffdb30e..358b4cba 100644 --- a/src/Makefile +++ b/src/Makefile @@ -78,6 +78,7 @@ endif REDIS_CC=$(QUIET_CC)$(CC) $(FINAL_CFLAGS) REDIS_LD=$(QUIET_LINK)$(CC) $(FINAL_LDFLAGS) +REDIS_INSTALL=$(QUIET_INSTALL)$(INSTALL) PREFIX?=/usr/local INSTALL_BIN= $(PREFIX)/bin @@ -93,10 +94,12 @@ ENDCOLOR="\033[0m" ifndef V QUIET_CC = @printf ' %b %b\n' $(CCCOLOR)CC$(ENDCOLOR) $(SRCCOLOR)$@$(ENDCOLOR) 1>&2; QUIET_LINK = @printf ' %b %b\n' $(LINKCOLOR)LINK$(ENDCOLOR) $(BINCOLOR)$@$(ENDCOLOR) 1>&2; +QUIET_INSTALL = @printf ' %b %b\n' $(LINKCOLOR)INSTALL$(ENDCOLOR) $(BINCOLOR)$@$(ENDCOLOR) 1>&2; endif REDIS_SERVER_NAME= redis-server -REDIS_SERVER_OBJ= adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o +REDIS_SENTINEL_NAME= redis-sentinel +REDIS_SERVER_OBJ= adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o REDIS_CLI_NAME= redis-cli REDIS_CLI_OBJ= anet.o sds.o adlist.o redis-cli.o zmalloc.o release.o anet.o ae.o REDIS_BENCHMARK_NAME= redis-benchmark @@ -106,7 +109,7 @@ REDIS_CHECK_DUMP_OBJ= redis-check-dump.o lzf_c.o lzf_d.o crc64.o REDIS_CHECK_AOF_NAME= redis-check-aof REDIS_CHECK_AOF_OBJ= redis-check-aof.o -all: $(REDIS_SERVER_NAME) $(REDIS_CLI_NAME) $(REDIS_BENCHMARK_NAME) $(REDIS_CHECK_DUMP_NAME) $(REDIS_CHECK_AOF_NAME) +all: $(REDIS_SERVER_NAME) $(REDIS_SENTINEL_NAME) $(REDIS_CLI_NAME) $(REDIS_BENCHMARK_NAME) $(REDIS_CHECK_DUMP_NAME) $(REDIS_CHECK_AOF_NAME) @echo "" @echo "Hint: To run 'make test' is a good idea ;)" @echo "" @@ -151,7 +154,11 @@ endif # redis-server $(REDIS_SERVER_NAME): $(REDIS_SERVER_OBJ) - $(REDIS_LD) -o $@ $^ ../deps/lua/src/liblua.a $(FINAL_LIBS) + $(REDIS_LD) -o $@ $^ ../deps/hiredis/libhiredis.a ../deps/lua/src/liblua.a $(FINAL_LIBS) + +# redis-sentinel +$(REDIS_SENTINEL_NAME): $(REDIS_SERVER_NAME) + $(REDIS_INSTALL) $(REDIS_SERVER_NAME) $(REDIS_SENTINEL_NAME) # redis-cli $(REDIS_CLI_NAME): $(REDIS_CLI_OBJ) @@ -176,7 +183,7 @@ $(REDIS_CHECK_AOF_NAME): $(REDIS_CHECK_AOF_OBJ) $(REDIS_CC) -c $< clean: - rm -rf $(REDIS_SERVER_NAME) $(REDIS_CLI_NAME) $(REDIS_BENCHMARK_NAME) $(REDIS_CHECK_DUMP_NAME) $(REDIS_CHECK_AOF_NAME) *.o *.gcda *.gcno *.gcov redis.info lcov-html + rm -rf $(REDIS_SERVER_NAME) $(REDIS_SENTINEL_NAME) $(REDIS_CLI_NAME) $(REDIS_BENCHMARK_NAME) $(REDIS_CHECK_DUMP_NAME) $(REDIS_CHECK_AOF_NAME) *.o *.gcda *.gcno *.gcov redis.info lcov-html .PHONY: clean @@ -217,8 +224,8 @@ src/help.h: install: all mkdir -p $(INSTALL_BIN) - $(INSTALL) $(REDIS_SERVER_NAME) $(INSTALL_BIN) - $(INSTALL) $(REDIS_BENCHMARK_NAME) $(INSTALL_BIN) - $(INSTALL) $(REDIS_CLI_NAME) $(INSTALL_BIN) - $(INSTALL) $(REDIS_CHECK_DUMP_NAME) $(INSTALL_BIN) - $(INSTALL) $(REDIS_CHECK_AOF_NAME) $(INSTALL_BIN) + $(REDIS_INSTALL) $(REDIS_SERVER_NAME) $(INSTALL_BIN) + $(REDIS_INSTALL) $(REDIS_BENCHMARK_NAME) $(INSTALL_BIN) + $(REDIS_INSTALL) $(REDIS_CLI_NAME) $(INSTALL_BIN) + $(REDIS_INSTALL) $(REDIS_CHECK_DUMP_NAME) $(INSTALL_BIN) + $(REDIS_INSTALL) $(REDIS_CHECK_AOF_NAME) $(INSTALL_BIN) diff --git a/src/anet.c b/src/anet.c index 434d945c..4b52425c 100644 --- a/src/anet.c +++ b/src/anet.c @@ -367,3 +367,18 @@ int anetPeerToString(int fd, char *ip, int *port) { if (port) *port = ntohs(sa.sin_port); return 0; } + +int anetSockName(int fd, char *ip, int *port) { + struct sockaddr_in sa; + socklen_t salen = sizeof(sa); + + if (getsockname(fd,(struct sockaddr*)&sa,&salen) == -1) { + *port = 0; + ip[0] = '?'; + ip[1] = '\0'; + return -1; + } + if (ip) strcpy(ip,inet_ntoa(sa.sin_addr)); + if (port) *port = ntohs(sa.sin_port); + return 0; +} diff --git a/src/config.c b/src/config.c index cf614008..9e247d66 100644 --- a/src/config.c +++ b/src/config.c @@ -354,6 +354,17 @@ void loadServerConfigFromString(char *config) { if ((server.stop_writes_on_bgsave_err = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; } + } else if (!strcasecmp(argv[0],"sentinel")) { + /* argc == 1 is handled by main() as we need to enter the sentinel + * mode ASAP. */ + if (argc != 1) { + if (!server.sentinel_mode) { + err = "sentinel directive while not in sentinel mode"; + goto loaderr; + } + err = sentinelHandleConfiguration(argv+1,argc-1); + if (err) goto loaderr; + } } else { err = "Bad directive or wrong number of arguments"; goto loaderr; } diff --git a/src/redis.c b/src/redis.c index b6490d57..248693b6 100644 --- a/src/redis.c +++ b/src/redis.c @@ -821,13 +821,8 @@ void clientsCron(void) { * a macro is used: run_with_period(milliseconds) { .... } */ -/* Using the following macro you can run code inside serverCron() with the - * specified period, specified in milliseconds. - * The actual resolution depends on REDIS_HZ. */ -#define run_with_period(_ms_) if (!(loops % ((_ms_)/(1000/REDIS_HZ)))) - int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { - int j, loops = server.cronloops; + int j; REDIS_NOTUSED(eventLoop); REDIS_NOTUSED(id); REDIS_NOTUSED(clientData); @@ -896,11 +891,14 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { } /* Show information about connected clients */ - run_with_period(5000) { - redisLog(REDIS_VERBOSE,"%d clients connected (%d slaves), %zu bytes in use", - listLength(server.clients)-listLength(server.slaves), - listLength(server.slaves), - zmalloc_used_memory()); + if (!server.sentinel_mode) { + run_with_period(5000) { + redisLog(REDIS_VERBOSE, + "%d clients connected (%d slaves), %zu bytes in use", + listLength(server.clients)-listLength(server.slaves), + listLength(server.slaves), + zmalloc_used_memory()); + } } /* We need to do a few operations on clients asynchronously. */ @@ -985,6 +983,11 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { if (server.cluster_enabled) clusterCron(); } + /* Run the sentinel timer if we are in sentinel mode. */ + run_with_period(100) { + if (server.sentinel_mode) sentinelTimer(); + } + server.cronloops++; return 1000/REDIS_HZ; } @@ -2444,21 +2447,26 @@ void usage() { fprintf(stderr," ./redis-server /etc/redis/6379.conf\n"); fprintf(stderr," ./redis-server --port 7777\n"); fprintf(stderr," ./redis-server --port 7777 --slaveof 127.0.0.1 8888\n"); - fprintf(stderr," ./redis-server /etc/myredis.conf --loglevel verbose\n"); + fprintf(stderr," ./redis-server /etc/myredis.conf --loglevel verbose\n\n"); + fprintf(stderr,"Sentinel mode:\n"); + fprintf(stderr," ./redis-server /etc/sentinel.conf --sentinel\n"); exit(1); } void redisAsciiArt(void) { #include "asciilogo.h" char *buf = zmalloc(1024*16); + char *mode = "stand alone"; + + if (server.cluster_enabled) mode = "cluster"; + else if (server.sentinel_mode) mode = "sentinel"; snprintf(buf,1024*16,ascii_logo, REDIS_VERSION, redisGitSHA1(), strtol(redisGitDirty(),NULL,10) > 0, (sizeof(long) == 8) ? "64" : "32", - server.cluster_enabled ? "cluster" : "stand alone", - server.port, + mode, server.port, (long) getpid() ); redisLogRaw(REDIS_NOTICE|REDIS_LOG_RAW,buf); @@ -2496,8 +2504,35 @@ void setupSignalHandlers(void) { void memtest(size_t megabytes, int passes); +/* Returns 1 if there is --sentinel among the arguments or if + * argv[0] is exactly "redis-sentinel". */ +int checkForSentinelMode(int argc, char **argv) { + int j; + + if (strstr(argv[0],"redis-sentinel") != NULL) return 1; + for (j = 1; j < argc; j++) + if (!strcmp(argv[j],"--sentinel")) return 1; + return 0; +} + +/* Function called at startup to load RDB or AOF file in memory. */ +void loadDataFromDisk(void) { + long long start = ustime(); + if (server.aof_state == REDIS_AOF_ON) { + if (loadAppendOnlyFile(server.aof_filename) == REDIS_OK) + redisLog(REDIS_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000); + } else { + if (rdbLoad(server.rdb_filename) == REDIS_OK) { + redisLog(REDIS_NOTICE,"DB loaded from disk: %.3f seconds", + (float)(ustime()-start)/1000000); + } else if (errno != ENOENT) { + redisLog(REDIS_WARNING,"Fatal error loading the DB. Exiting."); + exit(1); + } + } +} + int main(int argc, char **argv) { - long long start; struct timeval tv; /* We need to initialize our libraries, and the server configuration. */ @@ -2505,8 +2540,17 @@ int main(int argc, char **argv) { srand(time(NULL)^getpid()); gettimeofday(&tv,NULL); dictSetHashFunctionSeed(tv.tv_sec^tv.tv_usec^getpid()); + server.sentinel_mode = checkForSentinelMode(argc,argv); initServerConfig(); + /* We need to init sentinel right now as parsing the configuration file + * in sentinel mode will have the effect of populating the sentinel + * data structures with master nodes to monitor. */ + if (server.sentinel_mode) { + initSentinelConfig(); + initSentinel(); + } + if (argc >= 2) { int j = 1; /* First option to parse in argv[] */ sds options = sdsempty(); @@ -2558,27 +2602,20 @@ int main(int argc, char **argv) { initServer(); if (server.daemonize) createPidFile(); redisAsciiArt(); - redisLog(REDIS_WARNING,"Server started, Redis version " REDIS_VERSION); -#ifdef __linux__ - linuxOvercommitMemoryWarning(); -#endif - start = ustime(); - if (server.aof_state == REDIS_AOF_ON) { - if (loadAppendOnlyFile(server.aof_filename) == REDIS_OK) - redisLog(REDIS_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000); - } else { - if (rdbLoad(server.rdb_filename) == REDIS_OK) { - redisLog(REDIS_NOTICE,"DB loaded from disk: %.3f seconds", - (float)(ustime()-start)/1000000); - } else if (errno != ENOENT) { - redisLog(REDIS_WARNING,"Fatal error loading the DB. Exiting."); - exit(1); - } + + if (!server.sentinel_mode) { + /* Things only needed when not runnign in Sentinel mode. */ + redisLog(REDIS_WARNING,"Server started, Redis version " REDIS_VERSION); + #ifdef __linux__ + linuxOvercommitMemoryWarning(); + #endif + loadDataFromDisk(); + if (server.ipfd > 0) + redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port); + if (server.sofd > 0) + redisLog(REDIS_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket); } - if (server.ipfd > 0) - redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port); - if (server.sofd > 0) - redisLog(REDIS_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket); + aeSetBeforeSleepProc(server.el,beforeSleep); aeMain(server.el); aeDeleteEventLoop(server.el); diff --git a/src/redis.h b/src/redis.h index 33a676dc..bee8cf4f 100644 --- a/src/redis.h +++ b/src/redis.h @@ -257,6 +257,11 @@ #define REDIS_PROPAGATE_AOF 1 #define REDIS_PROPAGATE_REPL 2 +/* Using the following macro you can run code inside serverCron() with the + * specified period, specified in milliseconds. + * The actual resolution depends on REDIS_HZ. */ +#define run_with_period(_ms_) if (!(server.cronloops%((_ms_)/(1000/REDIS_HZ)))) + /* We can print the stacktrace, so our assert is defined this way: */ #define redisAssertWithInfo(_c,_o,_e) ((_e)?(void)0 : (_redisAssertWithInfo(_c,_o,#_e,__FILE__,__LINE__),_exit(1))) #define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e,__FILE__,__LINE__),_exit(1))) @@ -579,6 +584,7 @@ struct redisServer { int arch_bits; /* 32 or 64 depending on sizeof(long) */ int cronloops; /* Number of times the cron function run */ char runid[REDIS_RUN_ID_SIZE+1]; /* ID always different at every exec. */ + int sentinel_mode; /* True if this instance is a Sentinel. */ /* Networking */ int port; /* TCP listening port */ char *bindaddr; /* Bind address or NULL */ @@ -1115,6 +1121,12 @@ void clusterCron(void); clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *ask); void clusterPropagatePublish(robj *channel, robj *message); +/* Sentinel */ +void initSentinelConfig(void); +void initSentinel(void); +void sentinelTimer(void); +char *sentinelHandleConfiguration(char **argv, int argc); + /* Scripting */ void scriptingInit(void); @@ -1280,4 +1292,10 @@ void enableWatchdog(int period); void disableWatchdog(void); void watchdogScheduleSignal(int period); void redisLogHexDump(int level, char *descr, void *value, size_t len); + +#define redisDebug(fmt, ...) \ + printf("DEBUG %s:%d > " fmt "\n", __FILE__, __LINE__, __VA_ARGS__) +#define redisDebugMark() \ + printf("-- MARK %s:%d --\n", __FILE__, __LINE__) + #endif diff --git a/src/sentinel.c b/src/sentinel.c new file mode 100644 index 00000000..050fa404 --- /dev/null +++ b/src/sentinel.c @@ -0,0 +1,2439 @@ +/* Redis Sentinel implementation + * ----------------------------- + * + * Copyright (c) 2009-2012, Salvatore Sanfilippo + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "redis.h" +#include "hiredis.h" +#include "async.h" + +#include +#include +#include + +#define REDIS_SENTINEL_PORT 26379 + +/* ======================== Sentinel global state =========================== */ + +typedef long long mstime_t; /* millisecond time type. */ + +/* Address object, used to describe an ip:port pair. */ +typedef struct sentinelAddr { + char *ip; + int port; +} sentinelAddr; + +/* A Sentinel Redis Instance object is monitoring. */ +#define SRI_MASTER (1<<0) +#define SRI_SLAVE (1<<1) +#define SRI_SENTINEL (1<<2) +#define SRI_DISCONNECTED (1<<3) +#define SRI_S_DOWN (1<<4) /* Subjectively down (no quorum). */ +#define SRI_O_DOWN (1<<5) /* Objectively down (quorum reached). */ +#define SRI_MASTER_DOWN (1<<6) /* A Sentinel with this flag set thinks that + its master is down. */ +/* SRI_CAN_FAILOVER when set in an SRI_MASTER instance means that we are + * allowed to perform the failover for this master. + * When set in a SRI_SENTINEL instance means that sentinel is allowed to + * perform the failover on its master. */ +#define SRI_CAN_FAILOVER (1<<7) +#define SRI_FAILOVER_IN_PROGRESS (1<<8) /* Failover is in progress for + this master. */ +#define SRI_I_AM_THE_LEADER (1<<9) /* We are the leader for this master. */ +#define SRI_PROMOTED (1<<10) /* Slave selected for promotion. */ +#define SRI_RECONF_SENT (1<<11) /* SLAVEOF sent. */ +#define SRI_RECONF_INPROG (1<<12) /* Slave synchronization in progress. */ +#define SRI_RECONF_DONE (1<<13) /* Slave synchronized with new master. */ + +#define SENTINEL_INFO_PERIOD 10000 +#define SENTINEL_PING_PERIOD 1000 +#define SENTINEL_ASK_PERIOD 1000 +#define SENTINEL_PUBLISH_PERIOD 5000 +#define SENTINEL_DOWN_AFTER_PERIOD 30000 +#define SENTINEL_HELLO_CHANNEL "__sentinel__:hello" +#define SENTINEL_TILT_TRIGGER 2000 +#define SENTINEL_TILT_PERIOD (SENTINEL_PING_PERIOD*30) +#define SENTINEL_DEFAULT_SLAVE_PRIORITY 100 +#define SENTINEL_PROMOTION_RETRY_PERIOD 30000 +#define SENTINEL_SLAVE_RECONF_RETRY_PERIOD 10000 +#define SENTINEL_DEFAULT_PARALLEL_SYNCS 1 +#define SENTINEL_MIN_LINK_RECONNECT_PERIOD 15000 +#define SENTINEL_DEFAULT_FAILOVER_TIMEOUT (60*15*1000) +#define SENTINEL_MAX_PENDING_COMMANDS 100 +#define SENTINEL_EXTENDED_SDOWN_MULTIPLIER 10 + +/* How many milliseconds is an information valid? This applies for instance + * to the reply to SENTINEL IS-MASTER-DOWN-BY-ADDR replies. */ +#define SENTINEL_INFO_VALIDITY_TIME 5000 +#define SENTINEL_FAILOVER_FIXED_DELAY 5000 +#define SENTINEL_FAILOVER_MAX_RANDOM_DELAY 10000 + +/* Failover machine different states. */ +#define SENTINEL_FAILOVER_STATE_NONE 0 /* No failover in progress. */ +#define SENTINEL_FAILOVER_STATE_WAIT_START 1 /* Wait for failover_start_time*/ +#define SENTINEL_FAILOVER_STATE_SELECT_SLAVE 2 /* Select slave to promote */ +#define SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE 3 /* Slave -> Master */ +#define SENTINEL_FAILOVER_STATE_WAIT_PROMOTION 4 /* Wait slave to change role */ +#define SENTINEL_FAILOVER_STATE_RECONF_SLAVES 5 /* SLAVEOF newmaster */ +#define SENTINEL_FAILOVER_STATE_WAIT_NEXT_SLAVE 6 /* wait replication */ +#define SENTINEL_FAILOVER_STATE_ALERT_CLIENTS 7 /* Run user script. */ +#define SENTINEL_FAILOVER_STATE_WAIT_ALERT_SCRIPT 8 /* Wait script exec. */ +#define SENTINEL_FAILOVER_STATE_DETECT_END 9 /* Check for failover end. */ +#define SENTINEL_FAILOVER_STATE_UPDATE_CONFIG 10 /* Monitor promoted slave. */ + +#define SENTINEL_MASTER_LINK_STATUS_UP 0 +#define SENTINEL_MASTER_LINK_STATUS_DOWN 1 + +typedef struct sentinelRedisInstance { + int flags; /* See SRI_... defines */ + char *name; /* Master name from the point of view of this sentinel. */ + char *runid; /* run ID of this instance. */ + sentinelAddr *addr; /* Master host. */ + redisAsyncContext *cc; /* Hiredis context for commands. */ + redisAsyncContext *pc; /* Hiredis context for Pub / Sub. */ + int pending_commands; /* Number of commands sent waiting for a reply. */ + mstime_t cc_conn_time; /* cc connection time. */ + mstime_t pc_conn_time; /* pc connection time. */ + mstime_t pc_last_activity; /* Last time we received any message. */ + mstime_t last_avail_time; /* Last time the instance replied to ping with + a reply we consider valid. */ + mstime_t last_pong_time; /* Last time the instance replied to ping, + whatever the reply was. That's used to check + if the link is idle and must be reconnected. */ + mstime_t last_pub_time; /* Last time we sent hello via Pub/Sub. */ + mstime_t last_hello_time; /* Only used if SRI_SENTINEL is set. Last time + we received an hello from this Sentinel + via Pub/Sub. */ + mstime_t last_master_down_reply_time; /* Time of last reply to + SENTINEL is-master-down command. */ + mstime_t s_down_since_time; /* Subjectively down since time. */ + mstime_t o_down_since_time; /* Objectively down since time. */ + mstime_t down_after_period; /* Consider it down after that period. */ + mstime_t info_refresh; /* Time at which we received INFO output from it. */ + + /* Master specific. */ + dict *sentinels; /* Other sentinels monitoring the same master. */ + dict *slaves; /* Slaves for this master instance. */ + int quorum; /* Number of sentinels that need to agree on failure. */ + int parallel_syncs; /* How many slaves to reconfigure at same time. */ + + /* Slave specific. */ + mstime_t master_link_down_time; /* Slave replication link down time. */ + int slave_priority; /* Slave priority according to its INFO output. */ + mstime_t slave_reconf_sent_time; /* Time at which we sent SLAVE OF */ + struct sentinelRedisInstance *master; /* Master instance if SRI_SLAVE is set. */ + char *slave_master_host; /* Master host as reported by INFO */ + int slave_master_port; /* Master port as reported by INFO */ + int slave_master_link_status; /* Master link status as reported by INFO */ + /* Failover */ + char *leader; /* If this is a master instance, this is the runid of + the Sentinel that should perform the failover. If + this is a Sentinel, this is the runid of the Sentinel + that this other Sentinel is voting as leader. + This field is valid only if SRI_MASTER_DOWN is + set on the Sentinel instance. */ + int failover_state; /* See SENTINEL_FAILOVER_STATE_* defines. */ + mstime_t failover_state_change_time; + mstime_t failover_start_time; /* When to start to failover if leader. */ + mstime_t failover_timeout; /* Max time to refresh failover state. */ + struct sentinelRedisInstance *promoted_slave; /* Promoted slave instance. */ + /* Scripts executed to notify admin or reconfigure clients: when they + * are set to NULL no script is executed. */ + char *notify_script; + char *client_reconfig_script; +} sentinelRedisInstance; + +/* Main state. */ +struct sentinelState { + dict *masters; /* Dictionary of master sentinelRedisInstances. + Key is the instance name, value is the + sentinelRedisInstance structure pointer. */ + int tilt; /* Are we in TILT mode? */ + mstime_t tilt_start_time; /* When TITL started. */ + mstime_t previous_time; /* Time last time we ran the time handler. */ +} sentinel; + +/* ======================= hiredis ae.c adapters ============================= + * Note: this implementation is taken from hiredis/adapters/ae.h, however + * we have our modified copy for Sentinel in order to use our allocator + * and to have full control over how the adapter works. */ + +typedef struct redisAeEvents { + redisAsyncContext *context; + aeEventLoop *loop; + int fd; + int reading, writing; +} redisAeEvents; + +static void redisAeReadEvent(aeEventLoop *el, int fd, void *privdata, int mask) { + ((void)el); ((void)fd); ((void)mask); + + redisAeEvents *e = (redisAeEvents*)privdata; + redisAsyncHandleRead(e->context); +} + +static void redisAeWriteEvent(aeEventLoop *el, int fd, void *privdata, int mask) { + ((void)el); ((void)fd); ((void)mask); + + redisAeEvents *e = (redisAeEvents*)privdata; + redisAsyncHandleWrite(e->context); +} + +static void redisAeAddRead(void *privdata) { + redisAeEvents *e = (redisAeEvents*)privdata; + aeEventLoop *loop = e->loop; + if (!e->reading) { + e->reading = 1; + aeCreateFileEvent(loop,e->fd,AE_READABLE,redisAeReadEvent,e); + } +} + +static void redisAeDelRead(void *privdata) { + redisAeEvents *e = (redisAeEvents*)privdata; + aeEventLoop *loop = e->loop; + if (e->reading) { + e->reading = 0; + aeDeleteFileEvent(loop,e->fd,AE_READABLE); + } +} + +static void redisAeAddWrite(void *privdata) { + redisAeEvents *e = (redisAeEvents*)privdata; + aeEventLoop *loop = e->loop; + if (!e->writing) { + e->writing = 1; + aeCreateFileEvent(loop,e->fd,AE_WRITABLE,redisAeWriteEvent,e); + } +} + +static void redisAeDelWrite(void *privdata) { + redisAeEvents *e = (redisAeEvents*)privdata; + aeEventLoop *loop = e->loop; + if (e->writing) { + e->writing = 0; + aeDeleteFileEvent(loop,e->fd,AE_WRITABLE); + } +} + +static void redisAeCleanup(void *privdata) { + redisAeEvents *e = (redisAeEvents*)privdata; + redisAeDelRead(privdata); + redisAeDelWrite(privdata); + zfree(e); +} + +static int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac) { + redisContext *c = &(ac->c); + redisAeEvents *e; + + /* Nothing should be attached when something is already attached */ + if (ac->ev.data != NULL) + return REDIS_ERR; + + /* Create container for context and r/w events */ + e = (redisAeEvents*)zmalloc(sizeof(*e)); + e->context = ac; + e->loop = loop; + e->fd = c->fd; + e->reading = e->writing = 0; + + /* Register functions to start/stop listening for events */ + ac->ev.addRead = redisAeAddRead; + ac->ev.delRead = redisAeDelRead; + ac->ev.addWrite = redisAeAddWrite; + ac->ev.delWrite = redisAeDelWrite; + ac->ev.cleanup = redisAeCleanup; + ac->ev.data = e; + + return REDIS_OK; +} + +/* ============================= Prototypes ================================= */ + +void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status); +void sentinelDisconnectCallback(const redisAsyncContext *c, int status); +void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata); +sentinelRedisInstance *sentinelGetMasterByName(char *name); +char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master); +char *sentinelGetObjectiveLeader(sentinelRedisInstance *master); +int yesnotoi(char *s); +void sentinelDisconnectInstanceFromContext(const redisAsyncContext *c); +const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri); + +/* ========================= Dictionary types =============================== */ + +unsigned int dictSdsHash(const void *key); +int dictSdsKeyCompare(void *privdata, const void *key1, const void *key2); +void releaseSentinelRedisInstance(sentinelRedisInstance *ri); + +void dictInstancesValDestructor (void *privdata, void *obj) { + releaseSentinelRedisInstance(obj); +} + +/* Instance name (sds) -> instance (sentinelRedisInstance pointer) + * + * also used for: sentinelRedisInstance->sentinels dictionary that maps + * sentinels ip:port to last seen time in Pub/Sub hello message. */ +dictType instancesDictType = { + dictSdsHash, /* hash function */ + NULL, /* key dup */ + NULL, /* val dup */ + dictSdsKeyCompare, /* key compare */ + NULL, /* key destructor */ + dictInstancesValDestructor /* val destructor */ +}; + +/* Instance runid (sds) -> votes (long casted to void*) + * + * This is useful into sentinelGetObjectiveLeader() function in order to + * count the votes and understand who is the leader. */ +dictType leaderVotesDictType = { + dictSdsHash, /* hash function */ + NULL, /* key dup */ + NULL, /* val dup */ + dictSdsKeyCompare, /* key compare */ + NULL, /* key destructor */ + NULL /* val destructor */ +}; + +/* =========================== Initialization =============================== */ + +void sentinelCommand(redisClient *c); + +struct redisCommand sentinelcmds[] = { + {"ping",pingCommand,1,"",0,NULL,0,0,0,0,0}, + {"sentinel",sentinelCommand,-2,"",0,NULL,0,0,0,0,0}, + {"subscribe",subscribeCommand,-2,"",0,NULL,0,0,0,0,0}, + {"unsubscribe",unsubscribeCommand,-1,"",0,NULL,0,0,0,0,0}, + {"psubscribe",psubscribeCommand,-2,"",0,NULL,0,0,0,0,0}, + {"punsubscribe",punsubscribeCommand,-1,"",0,NULL,0,0,0,0,0} +}; + +/* This function overwrites a few normal Redis config default with Sentinel + * specific defaults. */ +void initSentinelConfig(void) { + server.port = REDIS_SENTINEL_PORT; +} + +/* Perform the Sentinel mode initialization. */ +void initSentinel(void) { + int j; + + /* Remove usual Redis commands from the command table, then just add + * the SENTINEL command. */ + dictEmpty(server.commands); + for (j = 0; j < sizeof(sentinelcmds)/sizeof(sentinelcmds[0]); j++) { + int retval; + struct redisCommand *cmd = sentinelcmds+j; + + retval = dictAdd(server.commands, sdsnew(cmd->name), cmd); + redisAssert(retval == DICT_OK); + } + + /* Initialize various data structures. */ + sentinel.masters = dictCreate(&instancesDictType,NULL); + sentinel.tilt = 0; + sentinel.tilt_start_time = mstime(); + sentinel.previous_time = mstime(); +} + +/* ============================== sentinelAddr ============================== */ + +/* Create a sentinelAddr object and return it on success. + * On error NULL is returned and errno is set to: + * ENOENT: Can't resolve the hostname. + * EINVAL: Invalid port number. + */ +sentinelAddr *createSentinelAddr(char *hostname, int port) { + char buf[32]; + sentinelAddr *sa; + + if (port <= 0 || port > 65535) { + errno = EINVAL; + return NULL; + } + if (anetResolve(NULL,hostname,buf) == ANET_ERR) { + errno = ENOENT; + return NULL; + } + sa = zmalloc(sizeof(*sa)); + sa->ip = sdsnew(buf); + sa->port = port; + return sa; +} + +/* Free a Sentinel address. Can't fail. */ +void releaseSentinelAddr(sentinelAddr *sa) { + sdsfree(sa->ip); + zfree(sa); +} + +/* =========================== Events notification ========================== */ + +void sentinelCallNotificationScript(char *scriptpath, char *type, char *msg) { + /* TODO: implement it. */ +} + +/* Send an event to log, pub/sub, user notification script. + * + * 'level' is the log level for logging. Only REDIS_WARNING events will trigger + * the execution of the user notification script. + * + * 'type' is the message type, also used as a pub/sub channel name. + * + * 'ri', is the redis instance target of this event if applicable, and is + * used to obtain the path of the notification script to execute. + * + * The remaining arguments are printf-alike. + * If the format specifier starts with the two characters "%@" then ri is + * not NULL, and the message is prefixed with an instance identifier in the + * following format: + * + * + * + * If the instance type is not master, than the additional string is + * added to specify the originating master: + * + * @ + * + * Any other specifier after "%@" is processed by printf itself. + */ +void sentinelEvent(int level, char *type, sentinelRedisInstance *ri, + const char *fmt, ...) { + va_list ap; + char msg[REDIS_MAX_LOGMSG_LEN]; + robj *channel, *payload; + + /* Handle %@ */ + if (fmt[0] == '%' && fmt[1] == '@') { + sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ? + NULL : ri->master; + + if (master) { + snprintf(msg, sizeof(msg), "%s %s %s %d @ %s %s %d", + sentinelRedisInstanceTypeStr(ri), + ri->name, ri->addr->ip, ri->addr->port, + master->name, master->addr->ip, master->addr->port); + } else { + snprintf(msg, sizeof(msg), "%s %s %s %d", + sentinelRedisInstanceTypeStr(ri), + ri->name, ri->addr->ip, ri->addr->port); + } + fmt += 2; + } else { + msg[0] = '\0'; + } + + /* Use vsprintf for the rest of the formatting if any. */ + if (fmt[0] != '\0') { + va_start(ap, fmt); + vsnprintf(msg+strlen(msg), sizeof(msg)-strlen(msg), fmt, ap); + va_end(ap); + } + + /* Log the message if the log level allows it to be logged. */ + if (level >= server.verbosity) + redisLog(level,"%s %s",type,msg); + + /* Publish the message via Pub/Sub if it's not a debugging one. */ + if (level != REDIS_DEBUG) { + channel = createStringObject(type,strlen(type)); + payload = createStringObject(msg,strlen(msg)); + pubsubPublishMessage(channel,payload); + decrRefCount(channel); + decrRefCount(payload); + } + + /* Call the notification script if applicable. */ + if (level == REDIS_WARNING && ri != NULL) { + sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ? + ri : ri->master; + if (master->notify_script) { + sentinelCallNotificationScript(master->notify_script,type,msg); + } + } +} + +/* ========================== sentinelRedisInstance ========================= */ + +/* Create a redis instance, the following fields must be populated by the + * caller if needed: + * runid: set to NULL but will be populated once INFO output is received. + * info_refresh: is set to 0 to mean that we never received INFO so far. + * + * If SRI_MASTER is set into initial flags the instance is added to + * sentinel.masters table. + * + * if SRI_SLAVE or SRI_SENTINEL is set then 'master' must be not NULL and the + * instance is added into master->slaves or master->sentinels table. + * + * If the instance is a slave or sentinel, the name parameter is ignored and + * is created automatically as hostname:port. + * + * The function fails if hostname can't be resolved or port is out of range. + * When this happens NULL is returned and errno is set accordingly to the + * createSentinelAddr() function. + * + * The function may also fail and return NULL with errno set to EBUSY if + * a master or slave with the same name already exists. */ +sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char *hostname, int port, int quorum, sentinelRedisInstance *master) { + sentinelRedisInstance *ri; + sentinelAddr *addr; + dict *table; + char slavename[128], *sdsname; + + redisAssert(flags & (SRI_MASTER|SRI_SLAVE|SRI_SENTINEL)); + redisAssert((flags & SRI_MASTER) || master != NULL); + + /* Check address validity. */ + addr = createSentinelAddr(hostname,port); + if (addr == NULL) return NULL; + + /* For slaves and sentinel we use ip:port as name. */ + if (flags & (SRI_SLAVE|SRI_SENTINEL)) { + snprintf(slavename,sizeof(slavename),"%s:%d",hostname,port); + name = slavename; + } + + /* Make sure the entry is not duplicated. This may happen when the same + * name for a master is used multiple times inside the configuration or + * if we try to add multiple times a slave or sentinel with same ip/port + * to a master. */ + if (flags & SRI_MASTER) table = sentinel.masters; + else if (flags & SRI_SLAVE) table = master->slaves; + else if (flags & SRI_SENTINEL) table = master->sentinels; + sdsname = sdsnew(name); + if (dictFind(table,sdsname)) { + sdsfree(sdsname); + errno = EBUSY; + return NULL; + } + + /* Create the instance object. */ + ri = zmalloc(sizeof(*ri)); + /* Note that all the instances are started in the disconnected state, + * the event loop will take care of connecting them. */ + ri->flags = flags | SRI_DISCONNECTED; + ri->name = sdsname; + ri->runid = NULL; + ri->addr = addr; + ri->cc = NULL; + ri->pc = NULL; + ri->pending_commands = 0; + ri->cc_conn_time = 0; + ri->pc_conn_time = 0; + ri->pc_last_activity = 0; + ri->last_avail_time = mstime(); + ri->last_pong_time = mstime(); + ri->last_pub_time = mstime(); + ri->last_hello_time = mstime(); + ri->last_master_down_reply_time = mstime(); + ri->s_down_since_time = 0; + ri->o_down_since_time = 0; + ri->down_after_period = master ? master->down_after_period : + SENTINEL_DOWN_AFTER_PERIOD; + ri->master_link_down_time = 0; + ri->slave_priority = SENTINEL_DEFAULT_SLAVE_PRIORITY; + ri->slave_reconf_sent_time = 0; + ri->slave_master_host = NULL; + ri->slave_master_port = 0; + ri->slave_master_link_status = SENTINEL_MASTER_LINK_STATUS_DOWN; + ri->sentinels = dictCreate(&instancesDictType,NULL); + ri->quorum = quorum; + ri->parallel_syncs = SENTINEL_DEFAULT_PARALLEL_SYNCS; + ri->master = master; + ri->slaves = dictCreate(&instancesDictType,NULL); + ri->info_refresh = 0; + + /* Failover state. */ + ri->leader = NULL; + ri->failover_state = SENTINEL_FAILOVER_STATE_NONE; + ri->failover_state_change_time = 0; + ri->failover_start_time = 0; + ri->failover_timeout = SENTINEL_DEFAULT_FAILOVER_TIMEOUT; + ri->promoted_slave = NULL; + ri->notify_script = NULL; + ri->client_reconfig_script = NULL; + + /* Add into the right table. */ + dictAdd(table, ri->name, ri); + return ri; +} + +/* Release this instance and all its slaves, sentinels, hiredis connections. + * This function also takes care of unlinking the instance from the main + * masters table (if it is a master) or from its master sentinels/slaves table + * if it is a slave or sentinel. */ +void releaseSentinelRedisInstance(sentinelRedisInstance *ri) { + /* Release all its slaves or sentinels if any. */ + dictRelease(ri->sentinels); + dictRelease(ri->slaves); + + /* Release hiredis connections. Note that redisAsyncFree() will call + * the disconnection callback. */ + if (ri->cc) { + redisAsyncFree(ri->cc); + ri->cc = NULL; + } + if (ri->pc) { + redisAsyncFree(ri->pc); + ri->pc = NULL; + } + + /* Free other resources. */ + sdsfree(ri->name); + sdsfree(ri->runid); + sdsfree(ri->notify_script); + sdsfree(ri->client_reconfig_script); + sdsfree(ri->slave_master_host); + sdsfree(ri->leader); + releaseSentinelAddr(ri->addr); + + /* Clear state into the master if needed. */ + if ((ri->flags & SRI_SLAVE) && (ri->flags & SRI_PROMOTED) && ri->master) + ri->master->promoted_slave = NULL; + + zfree(ri); +} + +/* Lookup a slave in a master Redis instance, by ip and port. */ +sentinelRedisInstance *sentinelRedisInstanceLookupSlave( + sentinelRedisInstance *ri, char *ip, int port) +{ + sds key; + sentinelRedisInstance *slave; + + redisAssert(ri->flags & SRI_MASTER); + key = sdscatprintf(sdsempty(),"%s:%d",ip,port); + slave = dictFetchValue(ri->slaves,key); + sdsfree(key); + return slave; +} + +/* Return the name of the type of the instance as a string. */ +const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri) { + if (ri->flags & SRI_MASTER) return "master"; + else if (ri->flags & SRI_SLAVE) return "slave"; + else if (ri->flags & SRI_SENTINEL) return "sentinel"; + else return "unknown"; +} + +/* This function removes all the instances found in the dictionary of instances + * 'd', having either: + * + * 1) The same ip/port as specified. + * 2) The same runid. + * + * "1" and "2" don't need to verify at the same time, just one is enough. + * If "runid" is NULL it is not checked. + * Similarly if "ip" is NULL it is not checked. + * + * This function is useful because every time we add a new Sentinel into + * a master's Sentinels dictionary, we want to be very sure about not + * having duplicated instances for any reason. This is so important because + * we use those other sentinels in order to run our quorum protocol to + * understand if it's time to proceeed with the fail over. + * + * Making sure no duplication is possible we greately improve the robustness + * of the quorum (otherwise we may end counting the same instance multiple + * times for some reason). + * + * The function returns the number of Sentinels removed. */ +int removeMatchingSentinelsFromMaster(sentinelRedisInstance *master, char *ip, int port, char *runid) { + dictIterator *di; + dictEntry *de; + int removed = 0; + + di = dictGetSafeIterator(master->sentinels); + while((de = dictNext(di)) != NULL) { + sentinelRedisInstance *ri = dictGetVal(de); + + if ((ri->runid && runid && strcmp(ri->runid,runid) == 0) || + (ip && strcmp(ri->addr->ip,ip) == 0 && port == ri->addr->port)) + { + dictDelete(master->sentinels,ri->name); + removed++; + } + } + dictReleaseIterator(di); + return removed; +} + +/* Search an instance with the same runid, ip and port into a dictionary + * of instances. Return NULL if not found, otherwise return the instance + * pointer. + * + * runid or ip can be NULL. In such a case the search is performed only + * by the non-NULL field. */ +sentinelRedisInstance *getSentinelRedisInstanceByAddrAndRunID(dict *instances, char *ip, int port, char *runid) { + dictIterator *di; + dictEntry *de; + sentinelRedisInstance *instance = NULL; + + redisAssert(ip || runid); /* User must pass at least one search param. */ + di = dictGetIterator(instances); + while((de = dictNext(di)) != NULL) { + sentinelRedisInstance *ri = dictGetVal(de); + + if (runid && !ri->runid) continue; + if ((runid == NULL || strcmp(ri->runid, runid) == 0) && + (ip == NULL || (strcmp(ri->addr->ip, ip) == 0 && + ri->addr->port == port))) + { + instance = ri; + break; + } + } + dictReleaseIterator(di); + return instance; +} + +/* Simple master lookup by name */ +sentinelRedisInstance *sentinelGetMasterByName(char *name) { + sentinelRedisInstance *ri; + sds sdsname = sdsnew(name); + + ri = dictFetchValue(sentinel.masters,sdsname); + sdsfree(sdsname); + return ri; +} + +/* Add the specified flags to all the instances in the specified dictionary. */ +void sentinelAddFlagsToDictOfRedisInstances(dict *instances, int flags) { + dictIterator *di; + dictEntry *de; + + di = dictGetIterator(instances); + while((de = dictNext(di)) != NULL) { + sentinelRedisInstance *ri = dictGetVal(de); + ri->flags |= flags; + } + dictReleaseIterator(di); +} + +/* Remove the specified flags to all the instances in the specified + * dictionary. */ +void sentinelDelFlagsToDictOfRedisInstances(dict *instances, int flags) { + dictIterator *di; + dictEntry *de; + + di = dictGetIterator(instances); + while((de = dictNext(di)) != NULL) { + sentinelRedisInstance *ri = dictGetVal(de); + ri->flags &= ~flags; + } + dictReleaseIterator(di); +} + +/* Reset the state of a monitored master: + * 1) Remove all slaves. + * 2) Remove all sentinels. + * 3) Remove most of the flags resulting from runtime operations. + * 4) Reset timers to their default value. + * 5) In the process of doing this undo the failover if in progress. + * 6) Disconnect the connections with the master (will reconnect automatically). + */ +void sentinelResetMaster(sentinelRedisInstance *ri) { + redisAssert(ri->flags & SRI_MASTER); + dictRelease(ri->slaves); + dictRelease(ri->sentinels); + ri->slaves = dictCreate(&instancesDictType,NULL); + ri->sentinels = dictCreate(&instancesDictType,NULL); + if (ri->cc) redisAsyncFree(ri->cc); + if (ri->pc) redisAsyncFree(ri->pc); + ri->flags &= SRI_MASTER|SRI_CAN_FAILOVER|SRI_DISCONNECTED; + if (ri->leader) { + sdsfree(ri->leader); + ri->leader = NULL; + } + ri->failover_state = SENTINEL_FAILOVER_STATE_NONE; + ri->failover_state_change_time = 0; + ri->failover_start_time = 0; + ri->promoted_slave = NULL; + sentinelEvent(REDIS_WARNING,"+reset-master",ri,"%@"); +} + +/* Call sentinelResetMaster() on every master with a name matching the specified + * pattern. */ +int sentinelResetMastersByPattern(char *pattern) { + dictIterator *di; + dictEntry *de; + int reset = 0; + + di = dictGetIterator(sentinel.masters); + while((de = dictNext(di)) != NULL) { + sentinelRedisInstance *ri = dictGetVal(de); + + if (ri->name) { + if (stringmatch(pattern,ri->name,0)) { + sentinelResetMaster(ri); + reset++; + } + } + } + dictReleaseIterator(di); + return reset; +} + +/* ============================ Config handling ============================= */ +char *sentinelHandleConfiguration(char **argv, int argc) { + sentinelRedisInstance *ri; + + if (!strcasecmp(argv[0],"monitor") && argc == 5) { + /* monitor */ + int quorum = atoi(argv[4]); + + if (quorum <= 0) return "Quorum must be 1 or greater."; + if (createSentinelRedisInstance(argv[1],SRI_MASTER,argv[2], + atoi(argv[3]),quorum,NULL) == NULL) + { + switch(errno) { + case EBUSY: return "Duplicated master name."; + case ENOENT: return "Can't resolve master instance hostname."; + case EINVAL: return "Invalid port number"; + } + } + } else if (!strcasecmp(argv[0],"down-after-milliseconds") && argc == 3) { + /* down-after-milliseconds */ + ri = sentinelGetMasterByName(argv[1]); + if (!ri) return "No such master with specified name."; + ri->down_after_period = atoi(argv[2]); + if (ri->down_after_period <= 0) + return "negative or zero time parameter."; + } else if (!strcasecmp(argv[0],"failover-timeout") && argc == 3) { + /* failover-timeout */ + ri = sentinelGetMasterByName(argv[1]); + if (!ri) return "No such master with specified name."; + ri->failover_timeout = atoi(argv[2]); + if (ri->failover_timeout <= 0) + return "negative or zero time parameter."; + } else if (!strcasecmp(argv[0],"can-failover") && argc == 3) { + /* can-failover */ + int yesno = yesnotoi(argv[2]); + + ri = sentinelGetMasterByName(argv[1]); + if (!ri) return "No such master with specified name."; + if (yesno == -1) return "Argument must be either yes or no."; + if (yesno) + ri->flags |= SRI_CAN_FAILOVER; + else + ri->flags &= ~SRI_CAN_FAILOVER; + } else if (!strcasecmp(argv[0],"parallel-syncs") && argc == 3) { + /* parallel-syncs */ + ri = sentinelGetMasterByName(argv[1]); + if (!ri) return "No such master with specified name."; + ri->parallel_syncs = atoi(argv[2]); + } else { + return "Unrecognized sentinel configuration statement."; + } + return NULL; +} + +/* ====================== hiredis connection handling ======================= */ + +/* This function takes an hiredis context that is in an error condition + * and make sure to mark the instance as disconnected performing the + * cleanup needed. + * + * Note: we don't free the hiredis context as hiredis will do it for us + * for async conenctions. */ +void sentinelDisconnectInstanceFromContext(const redisAsyncContext *c) { + sentinelRedisInstance *ri = c->data; + int pubsub = (ri->pc == c); + + sentinelEvent(REDIS_DEBUG, pubsub ? "-pubsub-link" : "-cmd-link", ri, + "%@ #%s", c->errstr); + if (pubsub) + ri->pc = NULL; + else + ri->cc = NULL; + ri->flags |= SRI_DISCONNECTED; +} + +void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status) { + if (status != REDIS_OK) { + sentinelDisconnectInstanceFromContext(c); + } else { + sentinelRedisInstance *ri = c->data; + int pubsub = (ri->pc == c); + + sentinelEvent(REDIS_DEBUG, pubsub ? "+pubsub-link" : "+cmd-link", ri, + "%@"); + } +} + +void sentinelDisconnectCallback(const redisAsyncContext *c, int status) { + sentinelDisconnectInstanceFromContext(c); +} + +/* Create the async connections for the specified instance if the instance + * is disconnected. Note that the SRI_DISCONNECTED flag is set even if just + * one of the two links (commands and pub/sub) is missing. */ +void sentinelReconnectInstance(sentinelRedisInstance *ri) { + if (!(ri->flags & SRI_DISCONNECTED)) return; + + /* Commands connection. */ + if (ri->cc == NULL) { + ri->cc = redisAsyncConnect(ri->addr->ip,ri->addr->port); + if (ri->cc->err) { + sentinelEvent(REDIS_DEBUG,"-cmd-link-reconnection",ri,"%@ #%s", + ri->cc->errstr); + redisAsyncFree(ri->cc); + ri->cc = NULL; + } else { + ri->cc_conn_time = mstime(); + ri->cc->data = ri; + redisAeAttach(server.el,ri->cc); + redisAsyncSetConnectCallback(ri->cc, + sentinelLinkEstablishedCallback); + redisAsyncSetDisconnectCallback(ri->cc, + sentinelDisconnectCallback); + } + } + /* Pub / Sub */ + if ((ri->flags & SRI_MASTER) && ri->pc == NULL) { + ri->pc = redisAsyncConnect(ri->addr->ip,ri->addr->port); + if (ri->pc->err) { + sentinelEvent(REDIS_DEBUG,"-pubsub-link-reconnection",ri,"%@ #%s", + ri->pc->errstr); + redisAsyncFree(ri->pc); + ri->pc = NULL; + } else { + int retval; + + ri->pc_conn_time = mstime(); + ri->pc->data = ri; + redisAeAttach(server.el,ri->pc); + redisAsyncSetConnectCallback(ri->pc, + sentinelLinkEstablishedCallback); + redisAsyncSetDisconnectCallback(ri->pc, + sentinelDisconnectCallback); + /* Now we subscribe to the Sentinels "Hello" channel. */ + retval = redisAsyncCommand(ri->pc, + sentinelReceiveHelloMessages, NULL, "SUBSCRIBE %s", + SENTINEL_HELLO_CHANNEL); + if (retval != REDIS_OK) { + /* If we can't subscribe, the Pub/Sub connection is useless + * and we can simply disconnect it and try again. */ + redisAsyncFree(ri->pc); + ri->pc = NULL; + return; + } + } + } + /* Clear the DISCONNECTED flags only if we have both the connections + * (or just the commands connection if this is a slave or a + * sentinel instance). */ + if (ri->cc && (ri->flags & (SRI_SLAVE|SRI_SENTINEL) || ri->pc)) + ri->flags &= ~SRI_DISCONNECTED; +} + +/* ======================== Redis instances pinging ======================== */ + +/* Process the INFO output from masters. */ +void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) { + sds *lines; + int numlines, j; + int role = 0; + + + /* The following fields must be reset to a given value in the case they + * are not found at all in the INFO output. */ + ri->master_link_down_time = 0; + + /* Process line by line. */ + lines = sdssplitlen(info,strlen(info),"\r\n",2,&numlines); + for (j = 0; j < numlines; j++) { + sentinelRedisInstance *slave; + sds l = lines[j]; + + /* run_id:<40 hex chars>*/ + if (sdslen(l) >= 47 && !memcmp(l,"run_id:",7)) { + if (ri->runid == NULL) { + ri->runid = sdsnewlen(l+7,40); + } else { + /* TODO: check if run_id has changed. This means the + * instance has been restarted, we want to set a flag + * and notify this event. */ + } + } + + /* slave0:,, */ + if ((ri->flags & SRI_MASTER) && + sdslen(l) >= 7 && + !memcmp(l,"slave",5) && isdigit(l[5])) + { + char *ip, *port, *end; + + ip = strchr(l,':'); if (!ip) continue; + ip++; /* Now ip points to start of ip address. */ + port = strchr(ip,','); if (!port) continue; + *port = '\0'; /* nul term for easy access. */ + port++; /* Now port points to start of port number. */ + end = strchr(port,','); if (!end) continue; + *end = '\0'; /* nul term for easy access. */ + + /* Check if we already have this slave into our table, + * otherwise add it. */ + if (sentinelRedisInstanceLookupSlave(ri,ip,atoi(port)) == NULL) { + if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,ip, + atoi(port), ri->quorum,ri)) != NULL) + { + sentinelEvent(REDIS_NOTICE,"+slave",slave,"%@"); + } + } + } + + /* master_link_down_since_seconds: */ + if (sdslen(l) >= 32 && + !memcmp(l,"master_link_down_since_seconds",30)) + { + ri->master_link_down_time = strtoll(l+31,NULL,10)*1000; + } + + /* role: */ + if (!memcmp(l,"role:master",11)) role = SRI_MASTER; + else if (!memcmp(l,"role:slave",10)) role = SRI_SLAVE; + + if (role == SRI_SLAVE) { + /* master_host: */ + if (sdslen(l) >= 12 && !memcmp(l,"master_host:",12)) { + sdsfree(ri->slave_master_host); + ri->slave_master_host = sdsnew(l+12); + } + + /* master_port: */ + if (sdslen(l) >= 12 && !memcmp(l,"master_port:",12)) + ri->slave_master_port = atoi(l+12); + + /* master_link_status: */ + if (sdslen(l) >= 19 && !memcmp(l,"master_link_status:",19)) { + ri->slave_master_link_status = + (strcasecmp(l+19,"up") == 0) ? + SENTINEL_MASTER_LINK_STATUS_UP : + SENTINEL_MASTER_LINK_STATUS_DOWN; + } + } + } + ri->info_refresh = mstime(); + sdsfreesplitres(lines,numlines); + + if (sentinel.tilt) return; + + /* Act if a slave turned into a master. */ + if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) { + if (ri->flags & SRI_PROMOTED) { + /* If this is a promoted slave we can change state to the + * failover state machine. */ + if (ri->master && + (ri->master->flags & SRI_FAILOVER_IN_PROGRESS) && + (ri->master->flags & SRI_I_AM_THE_LEADER) && + (ri->master->failover_state == + SENTINEL_FAILOVER_STATE_WAIT_PROMOTION)) + { + ri->master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES; + ri->master->failover_state_change_time = mstime(); + sentinelEvent(REDIS_WARNING,"+promoted-slave",ri,"%@"); + sentinelEvent(REDIS_WARNING,"+failover-state-reconf-slaves", + ri->master,"%@"); + } + } else { + /* Otherwise we interpret this as the start of the failover. */ + if (ri->master && + (ri->master->flags & SRI_FAILOVER_IN_PROGRESS) == 0) + { + ri->master->flags |= SRI_FAILOVER_IN_PROGRESS; + sentinelEvent(REDIS_WARNING,"failover-detected",ri->master,"%@"); + ri->master->failover_state = SENTINEL_FAILOVER_STATE_DETECT_END; + ri->master->failover_state_change_time = mstime(); + ri->master->promoted_slave = ri; + ri->flags |= SRI_PROMOTED; + /* We are an observer, so we can only assume that the leader + * is reconfiguring the slave instances. For this reason we + * set all the instances as RECONF_SENT waiting for progresses + * on this side. */ + sentinelAddFlagsToDictOfRedisInstances(ri->master->slaves, + SRI_RECONF_SENT); + } + } + } + + /* Detect if the slave that is in the process of being reconfigured + * changed state. */ + if ((ri->flags & SRI_SLAVE) && role == SRI_SLAVE && + (ri->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG))) + { + /* SRI_RECONF_SENT -> SRI_RECONF_INPROG. */ + if ((ri->flags & SRI_RECONF_SENT) && + ri->slave_master_host && + strcmp(ri->slave_master_host, + ri->master->promoted_slave->addr->ip) == 0 && + ri->slave_master_port == ri->master->promoted_slave->addr->port) + { + ri->flags &= ~SRI_RECONF_SENT; + ri->flags |= SRI_RECONF_INPROG; + sentinelEvent(REDIS_NOTICE,"+slave-reconf-inprog",ri,"%@"); + } + + /* SRI_RECONF_INPROG -> SRI_RECONF_DONE */ + if ((ri->flags & SRI_RECONF_INPROG) && + ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP) + { + ri->flags &= ~SRI_RECONF_INPROG; + ri->flags |= SRI_RECONF_DONE; + sentinelEvent(REDIS_NOTICE,"+slave-reconf-done",ri,"%@"); + /* If we are moving forward (a new slave is now configured) + * we update the change_time as we are conceptually passing + * to the next slave. */ + ri->failover_state_change_time = mstime(); + } + } +} + +void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) { + sentinelRedisInstance *ri = c->data; + redisReply *r; + + ri->pending_commands--; + if (!reply) return; + r = reply; + + if (r->type == REDIS_REPLY_STRING) { + sentinelRefreshInstanceInfo(ri,r->str); + } +} + +/* Just discard the reply. We use this when we are not monitoring the return + * value of the command but its effects directly. */ +void sentinelDiscardReplyCallback(redisAsyncContext *c, void *reply, void *privdata) { + sentinelRedisInstance *ri = c->data; + + ri->pending_commands--; +} + +void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata) { + sentinelRedisInstance *ri = c->data; + redisReply *r; + + ri->pending_commands--; + if (!reply) return; + r = reply; + + if (r->type == REDIS_REPLY_STATUS || + r->type == REDIS_REPLY_ERROR) { + /* Update the "instance available" field only if this is an + * acceptable reply. */ + if (strncmp(r->str,"PONG",4) == 0 || + strncmp(r->str,"LOADING",7) == 0 || + strncmp(r->str,"MASTERDOWN",10) == 0) + { + ri->last_avail_time = mstime(); + } + } + ri->last_pong_time = mstime(); +} + +/* This is called when we get the reply about the PUBLISH command we send + * to the master to advertise this sentinel. */ +void sentinelPublishReplyCallback(redisAsyncContext *c, void *reply, void *privdata) { + sentinelRedisInstance *ri = c->data; + redisReply *r; + + ri->pending_commands--; + if (!reply) return; + r = reply; + + /* Only update pub_time if we actually published our message. Otherwise + * we'll retry against in 100 milliseconds. */ + if (r->type != REDIS_REPLY_ERROR) + ri->last_pub_time = mstime(); +} + +/* This is our Pub/Sub callback for the Hello channel. It's useful in order + * to discover other sentinels attached at the same master. */ +void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata) { + sentinelRedisInstance *ri = c->data; + redisReply *r; + + if (!reply) return; + r = reply; + + /* Update the last activity in the pubsub channel. Note that since we + * receive our messages as well this timestamp can be used to detect + * if the link is probably diconnected even if it seems otherwise. */ + ri->pc_last_activity = mstime(); + + /* Sanity check in the reply we expect, so that the code that follows + * can avoid to check for details. */ + if (r->type != REDIS_REPLY_ARRAY || + r->elements != 3 || + r->element[0]->type != REDIS_REPLY_STRING || + r->element[1]->type != REDIS_REPLY_STRING || + r->element[2]->type != REDIS_REPLY_STRING || + strcmp(r->element[0]->str,"message") != 0) return; + + /* We are not interested in meeting ourselves */ + if (strstr(r->element[2]->str,server.runid) != NULL) return; + + { + int numtokens, port, removed, canfailover; + char **token = sdssplitlen(r->element[2]->str, + r->element[2]->len, + ":",1,&numtokens); + sentinelRedisInstance *sentinel; + + if (numtokens == 4) { + /* First, try to see if we already have this sentinel. */ + port = atoi(token[1]); + canfailover = atoi(token[3]); + sentinel = getSentinelRedisInstanceByAddrAndRunID( + ri->sentinels,token[0],port,token[2]); + + if (!sentinel) { + /* If not, remove all the sentinels that have the same runid + * OR the same ip/port, because it's either a restart or a + * network topology change. */ + removed = removeMatchingSentinelsFromMaster(ri,token[0],port, + token[2]); + if (removed) { + sentinelEvent(REDIS_NOTICE,"-dup-sentinel",ri, + "%@ #duplicate of %s:%d or %s", + token[0],port,token[2]); + } + + /* Add the new sentinel. */ + sentinel = createSentinelRedisInstance(NULL,SRI_SENTINEL, + token[0],port,ri->quorum,ri); + if (sentinel) { + sentinelEvent(REDIS_NOTICE,"+sentinel",sentinel,"%@"); + /* The runid is NULL after a new instance creation and + * for Sentinels we don't have a later chance to fill it, + * so do it now. */ + sentinel->runid = sdsnew(token[2]); + } + } + + /* Update the state of the Sentinel. */ + if (sentinel) { + sentinel->last_hello_time = mstime(); + if (canfailover) + sentinel->flags |= SRI_CAN_FAILOVER; + else + sentinel->flags &= ~SRI_CAN_FAILOVER; + } + } + sdsfreesplitres(token,numtokens); + } +} + +void sentinelPingInstance(sentinelRedisInstance *ri) { + mstime_t now = mstime(); + mstime_t info_period; + int retval; + + /* Return ASAP if we have already a PING or INFO already pending, or + * in the case the instance is not properly connected. */ + if (ri->flags & SRI_DISCONNECTED) return; + + /* For INFO, PING, PUBLISH that are not critical commands to send we + * also have a limit of SENTINEL_MAX_PENDING_COMMANDS. We don't + * want to use a lot of memory just because a link is not working + * properly (note that anyway there is a redundant protection about this, + * that is, the link will be disconnected and reconnected if a long + * timeout condition is detected. */ + if (ri->pending_commands >= SENTINEL_MAX_PENDING_COMMANDS) return; + + /* If this is a slave of a master in O_DOWN condition we start sending + * it INFO every second, instead of the usual SENTINEL_INFO_PERIOD + * period. In this state we want to closely monitor slaves in case they + * are turned into masters by another Sentinel, or by the sysadmin. */ + if ((ri->flags & SRI_SLAVE) && + (ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) { + info_period = 1000; + } else { + info_period = SENTINEL_INFO_PERIOD; + } + + if ((ri->flags & SRI_SENTINEL) == 0 && + (ri->info_refresh == 0 || + (now - ri->info_refresh) > info_period)) + { + /* Send INFO to masters and slaves, not sentinels. */ + retval = redisAsyncCommand(ri->cc, + sentinelInfoReplyCallback, NULL, "INFO"); + if (retval != REDIS_OK) return; + ri->pending_commands++; + } else if ((now - ri->last_pong_time) > SENTINEL_PING_PERIOD) { + /* Send PING to all the three kinds of instances. */ + retval = redisAsyncCommand(ri->cc, + sentinelPingReplyCallback, NULL, "PING"); + if (retval != REDIS_OK) return; + ri->pending_commands++; + } else if ((ri->flags & SRI_MASTER) && + (now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) + { + /* PUBLISH hello messages only to masters. */ + struct sockaddr_in sa; + socklen_t salen = sizeof(sa); + + if (getsockname(ri->cc->c.fd,(struct sockaddr*)&sa,&salen) != -1) { + char myaddr[128]; + + snprintf(myaddr,sizeof(myaddr),"%s:%d:%s:%d", + inet_ntoa(sa.sin_addr), server.port, server.runid, + (ri->flags & SRI_CAN_FAILOVER) != 0); + retval = redisAsyncCommand(ri->cc, + sentinelPublishReplyCallback, NULL, "PUBLISH %s %s", + SENTINEL_HELLO_CHANNEL,myaddr); + if (retval != REDIS_OK) return; + ri->pending_commands++; + } + } +} + +/* =========================== SENTINEL command ============================= */ + +const char *sentinelFailoverStateStr(int state) { + switch(state) { + case SENTINEL_FAILOVER_STATE_NONE: return "none"; + case SENTINEL_FAILOVER_STATE_WAIT_START: return "wait_start"; + case SENTINEL_FAILOVER_STATE_SELECT_SLAVE: return "select_slave"; + case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE: return "send_slaveof_noone"; + case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION: return "wait_promotion"; + case SENTINEL_FAILOVER_STATE_RECONF_SLAVES: return "reconf_slaves"; + case SENTINEL_FAILOVER_STATE_ALERT_CLIENTS: return "alert_clients"; + case SENTINEL_FAILOVER_STATE_DETECT_END: return "detect_end"; + case SENTINEL_FAILOVER_STATE_UPDATE_CONFIG: return "update_config"; + default: return "unknown"; + } +} + +/* Redis instance to Redis protocol representation. */ +void addReplySentinelRedisInstance(redisClient *c, sentinelRedisInstance *ri) { + char *flags = sdsempty(); + void *mbl; + int fields = 0; + + mbl = addDeferredMultiBulkLength(c); + + addReplyBulkCString(c,"name"); + addReplyBulkCString(c,ri->name); + fields++; + + addReplyBulkCString(c,"ip"); + addReplyBulkCString(c,ri->addr->ip); + fields++; + + addReplyBulkCString(c,"port"); + addReplyBulkLongLong(c,ri->addr->port); + fields++; + + addReplyBulkCString(c,"runid"); + addReplyBulkCString(c,ri->runid ? ri->runid : ""); + fields++; + + addReplyBulkCString(c,"flags"); + if (ri->flags & SRI_S_DOWN) flags = sdscat(flags,"s_down,"); + if (ri->flags & SRI_O_DOWN) flags = sdscat(flags,"o_down,"); + if (ri->flags & SRI_MASTER) flags = sdscat(flags,"master,"); + if (ri->flags & SRI_SLAVE) flags = sdscat(flags,"slave,"); + if (ri->flags & SRI_SENTINEL) flags = sdscat(flags,"sentinel,"); + if (ri->flags & SRI_DISCONNECTED) flags = sdscat(flags,"disconnected,"); + if (ri->flags & SRI_MASTER_DOWN) flags = sdscat(flags,"master_down,"); + if (ri->flags & SRI_FAILOVER_IN_PROGRESS) + flags = sdscat(flags,"failover_in_progress,"); + if (ri->flags & SRI_I_AM_THE_LEADER) + flags = sdscat(flags,"i_am_the_leader,"); + if (ri->flags & SRI_PROMOTED) flags = sdscat(flags,"promoted,"); + if (ri->flags & SRI_RECONF_SENT) flags = sdscat(flags,"reconf_sent,"); + if (ri->flags & SRI_RECONF_INPROG) flags = sdscat(flags,"reconf_inprog,"); + if (ri->flags & SRI_RECONF_DONE) flags = sdscat(flags,"reconf_done,"); + + if (sdslen(flags) != 0) flags = sdsrange(flags,0,-2); /* remove last "," */ + addReplyBulkCString(c,flags); + sdsfree(flags); + fields++; + + addReplyBulkCString(c,"pending-commands"); + addReplyBulkLongLong(c,ri->pending_commands); + fields++; + + if (ri->flags & SRI_FAILOVER_IN_PROGRESS) { + addReplyBulkCString(c,"failover-state"); + addReplyBulkCString(c,(char*)sentinelFailoverStateStr(ri->failover_state)); + fields++; + } + + addReplyBulkCString(c,"last-ok-ping-reply"); + addReplyBulkLongLong(c,mstime() - ri->last_avail_time); + fields++; + + addReplyBulkCString(c,"last-ping-reply"); + addReplyBulkLongLong(c,mstime() - ri->last_pong_time); + fields++; + + if (ri->flags & SRI_S_DOWN) { + addReplyBulkCString(c,"s-down-time"); + addReplyBulkLongLong(c,mstime()-ri->s_down_since_time); + fields++; + } + + if (ri->flags & SRI_O_DOWN) { + addReplyBulkCString(c,"o-down-time"); + addReplyBulkLongLong(c,mstime()-ri->o_down_since_time); + fields++; + } + + /* Masters and Slaves */ + if (ri->flags & (SRI_MASTER|SRI_SLAVE)) { + addReplyBulkCString(c,"info-refresh"); + addReplyBulkLongLong(c,mstime() - ri->info_refresh); + fields++; + } + + /* Only masters */ + if (ri->flags & SRI_MASTER) { + addReplyBulkCString(c,"num-slaves"); + addReplyBulkLongLong(c,dictSize(ri->slaves)); + fields++; + + addReplyBulkCString(c,"num-other-sentinels"); + addReplyBulkLongLong(c,dictSize(ri->sentinels)); + fields++; + + addReplyBulkCString(c,"quorum"); + addReplyBulkLongLong(c,ri->quorum); + fields++; + } + + /* Only slaves */ + if (ri->flags & SRI_SLAVE) { + addReplyBulkCString(c,"master-link-down-time"); + addReplyBulkLongLong(c,ri->master_link_down_time); + fields++; + + addReplyBulkCString(c,"master-link-status"); + addReplyBulkCString(c, + (ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP) ? + "ok" : "err"); + fields++; + + addReplyBulkCString(c,"master-host"); + addReplyBulkCString(c, + ri->slave_master_host ? ri->slave_master_host : "?"); + fields++; + + addReplyBulkCString(c,"master-port"); + addReplyBulkLongLong(c,ri->slave_master_port); + fields++; + } + + /* Only sentinels */ + if (ri->flags & SRI_SENTINEL) { + addReplyBulkCString(c,"last-hello-message"); + addReplyBulkLongLong(c,mstime() - ri->last_hello_time); + fields++; + + addReplyBulkCString(c,"can-failover-its-master"); + addReplyBulkLongLong(c,(ri->flags & SRI_CAN_FAILOVER) != 0); + fields++; + + if (ri->flags & SRI_MASTER_DOWN) { + addReplyBulkCString(c,"subjective-leader"); + addReplyBulkCString(c,ri->leader ? ri->leader : "?"); + fields++; + } + } + + setDeferredMultiBulkLength(c,mbl,fields*2); +} + +/* Output a number of instances contanined inside a dictionary as + * Redis protocol. */ +void addReplyDictOfRedisInstances(redisClient *c, dict *instances) { + dictIterator *di; + dictEntry *de; + + di = dictGetIterator(instances); + addReplyMultiBulkLen(c,dictSize(instances)); + while((de = dictNext(di)) != NULL) { + sentinelRedisInstance *ri = dictGetVal(de); + + addReplySentinelRedisInstance(c,ri); + } + dictReleaseIterator(di); +} + +/* Lookup the named master into sentinel.masters. + * If the master is not found reply to the client with an error and returns + * NULL. */ +sentinelRedisInstance *sentinelGetMasterByNameOrReplyError(redisClient *c, + robj *name) +{ + sentinelRedisInstance *ri; + + ri = dictFetchValue(sentinel.masters,c->argv[2]->ptr); + if (!ri) { + addReplyError(c,"No such master with that name"); + return NULL; + } + return ri; +} + +void sentinelCommand(redisClient *c) { + if (!strcasecmp(c->argv[1]->ptr,"masters")) { + /* SENTINEL MASTERS */ + if (c->argc != 2) goto numargserr; + + addReplyDictOfRedisInstances(c,sentinel.masters); + } else if (!strcasecmp(c->argv[1]->ptr,"slaves")) { + /* SENTINEL SLAVES */ + sentinelRedisInstance *ri; + + if (c->argc != 3) goto numargserr; + if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL) + return; + addReplyDictOfRedisInstances(c,ri->slaves); + } else if (!strcasecmp(c->argv[1]->ptr,"sentinels")) { + /* SENTINEL SENTINELS */ + sentinelRedisInstance *ri; + + if (c->argc != 3) goto numargserr; + if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL) + return; + addReplyDictOfRedisInstances(c,ri->sentinels); + } else if (!strcasecmp(c->argv[1]->ptr,"is-master-down-by-addr")) { + /* SENTINEL IS-MASTER-DOWN-BY-ADDR */ + sentinelRedisInstance *ri; + char *leader = NULL; + long port; + int isdown = 0; + + if (c->argc != 4) goto numargserr; + if (getLongFromObjectOrReply(c,c->argv[3],&port,NULL) != REDIS_OK) + return; + ri = getSentinelRedisInstanceByAddrAndRunID(sentinel.masters, + c->argv[2]->ptr,port,NULL); + + /* It exists? Is actually a master? Is subjectively down? It's down. + * Note: if we are in tilt mode we always reply with "0". */ + if (!sentinel.tilt && ri && (ri->flags & SRI_S_DOWN) && + (ri->flags & SRI_MASTER)) + isdown = 1; + if (ri) leader = sentinelGetSubjectiveLeader(ri); + + /* Reply with a two-elements multi-bulk reply: down state, leader. */ + addReplyMultiBulkLen(c,2); + addReply(c, isdown ? shared.cone : shared.czero); + addReplyBulkCString(c, leader ? leader : "?"); + if (leader) sdsfree(leader); + } else if (!strcasecmp(c->argv[1]->ptr,"reset")) { + /* SENTINEL RESET */ + if (c->argc != 3) goto numargserr; + addReplyLongLong(c,sentinelResetMastersByPattern(c->argv[2]->ptr)); + } else if (!strcasecmp(c->argv[1]->ptr,"get-master-addr-by-name")) { + /* SENTINEL GET-MASTER-ADDR-BY-NAME */ + sentinelRedisInstance *ri; + + if (c->argc != 3) goto numargserr; + ri = sentinelGetMasterByName(c->argv[2]->ptr); + if (ri == NULL) { + addReply(c,shared.nullmultibulk); + } else { + sentinelAddr *addr = ri->addr; + + if ((ri->flags & SRI_FAILOVER_IN_PROGRESS) && ri->promoted_slave) + addr = ri->promoted_slave->addr; + addReplyMultiBulkLen(c,2); + addReplyBulkCString(c,addr->ip); + addReplyBulkLongLong(c,addr->port); + } + } else { + addReplyErrorFormat(c,"Unknown sentinel subcommand '%s'", + (char*)c->argv[1]->ptr); + } + return; + +numargserr: + addReplyErrorFormat(c,"Wrong number of commands for 'sentinel %s'", + (char*)c->argv[1]->ptr); +} + +/* ===================== SENTINEL availability checks ======================= */ + +/* Is this instance down from our point of view? */ +void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) { + mstime_t elapsed = mstime() - ri->last_avail_time; + + /* Check if we are in need for a reconnection of one of the + * links, because we are detecting low activity. + * + * 1) Check if the command link seems connected, was connected not less + * than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have an + * idle time that is greater than down_after_period / 2 seconds. */ + if (ri->cc && + (mstime() - ri->cc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD && + (mstime() - ri->last_pong_time) > (ri->down_after_period/2)) + { + redisAsyncFree(ri->cc); /* will call the disconnection callback */ + } + + /* 2) Check if the pubsub link seems connected, was connected not less + * than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have no + * activity in the Pub/Sub channel for more than + * SENTINEL_PUBLISH_PERIOD * 3. + */ + if (ri->pc && + (mstime() - ri->pc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD && + (mstime() - ri->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3)) + { + redisAsyncFree(ri->pc); /* will call the disconnection callback */ + } + + /* Update the subjectively down flag. */ + if (elapsed > ri->down_after_period) { + /* Is subjectively down */ + if ((ri->flags & SRI_S_DOWN) == 0) { + sentinelEvent(REDIS_WARNING,"+sdown",ri,"%@"); + ri->s_down_since_time = mstime(); + ri->flags |= SRI_S_DOWN; + } + } else { + /* Is subjectively up */ + if (ri->flags & SRI_S_DOWN) { + sentinelEvent(REDIS_WARNING,"-sdown",ri,"%@"); + ri->flags &= ~SRI_S_DOWN; + } + } +} + +/* Is this instance down accordingly to the configured quorum? */ +void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) { + dictIterator *di; + dictEntry *de; + int quorum = 0, odown = 0; + + if (master->flags & SRI_S_DOWN) { + /* Is down for enough sentinels? */ + quorum = 1; /* the current sentinel. */ + /* Count all the other sentinels. */ + di = dictGetIterator(master->sentinels); + while((de = dictNext(di)) != NULL) { + sentinelRedisInstance *ri = dictGetVal(de); + + if (ri->flags & SRI_MASTER_DOWN) quorum++; + } + dictReleaseIterator(di); + if (quorum >= master->quorum) odown = 1; + } + + /* Set the flag accordingly to the outcome. */ + if (odown) { + if ((master->flags & SRI_O_DOWN) == 0) { + sentinelEvent(REDIS_WARNING,"+odown",master,"%@ #quorum %d/%d", + quorum, master->quorum); + master->flags |= SRI_O_DOWN; + master->o_down_since_time = mstime(); + } + } else { + if (master->flags & SRI_O_DOWN) { + sentinelEvent(REDIS_WARNING,"-odown",master,"%@"); + master->flags &= ~SRI_O_DOWN; + } + } +} + +/* Receive the SENTINEL is-master-down-by-addr reply, see the + * sentinelAskMasterStateToOtherSentinels() function for more information. */ +void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) { + sentinelRedisInstance *ri = c->data; + redisReply *r; + + ri->pending_commands--; + if (!reply) return; + r = reply; + + /* Ignore every error or unexpected reply. + * Note that if the command returns an error for any reason we'll + * end clearing the SRI_MASTER_DOWN flag for timeout anyway. */ + if (r->type == REDIS_REPLY_ARRAY && r->elements == 2 && + r->element[0]->type == REDIS_REPLY_INTEGER && + r->element[1]->type == REDIS_REPLY_STRING) + { + ri->last_master_down_reply_time = mstime(); + if (r->element[0]->integer == 1) { + ri->flags |= SRI_MASTER_DOWN; + } else { + ri->flags &= ~SRI_MASTER_DOWN; + } + sdsfree(ri->leader); + ri->leader = sdsnew(r->element[1]->str); + } +} + +/* If we think (subjectively) the master is down, we start sending + * SENTINEL IS-MASTER-DOWN-BY-ADDR requests to other sentinels + * in order to get the replies that allow to reach the quorum and + * possibly also mark the master as objectively down. */ +void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master) { + dictIterator *di; + dictEntry *de; + + di = dictGetIterator(master->sentinels); + while((de = dictNext(di)) != NULL) { + sentinelRedisInstance *ri = dictGetVal(de); + mstime_t elapsed = mstime() - ri->last_master_down_reply_time; + char port[32]; + int retval; + + /* If the master state from other sentinel is too old, we clear it. */ + if (elapsed > SENTINEL_INFO_VALIDITY_TIME) { + ri->flags &= ~SRI_MASTER_DOWN; + sdsfree(ri->leader); + ri->leader = NULL; + } + + /* Only ask if master is down to other sentinels if: + * + * 1) We believe it is down, or there is a failover in progress. + * 2) Sentinel is connected. + * 3) We did not received the info within SENTINEL_ASK_PERIOD ms. */ + if ((master->flags & (SRI_S_DOWN|SRI_FAILOVER_IN_PROGRESS)) == 0) + continue; + if (ri->flags & SRI_DISCONNECTED) continue; + if (mstime() - ri->last_master_down_reply_time < SENTINEL_ASK_PERIOD) + continue; + + /* Ask */ + ll2string(port,sizeof(port),master->addr->port); + retval = redisAsyncCommand(ri->cc, + sentinelReceiveIsMasterDownReply, NULL, + "SENTINEL is-master-down-by-addr %s %s", + master->addr->ip, port); + if (retval == REDIS_OK) ri->pending_commands++; + } + dictReleaseIterator(di); +} + +/* =============================== FAILOVER ================================= */ + +/* Given a master get the "subjective leader", that is, among all the sentinels + * with given characteristics, the one with the lexicographically smaller + * runid. The characteristics required are: + * + * 1) Has SRI_CAN_FAILOVER flag. + * 2) Is not disconnected. + * 3) Recently answered to our ping (no longer than + * SENTINEL_INFO_VALIDITY_TIME milliseconds ago). + * + * The function returns a pointer to an sds string representing the runid of the + * leader sentinel instance (from our point of view). Otherwise NULL is + * returned if there are no suitable sentinels. + */ + +int compareRunID(const void *a, const void *b) { + char **aptrptr = (char**)a, **bptrptr = (char**)b; + return strcasecmp(*aptrptr, *bptrptr); +} + +char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master) { + dictIterator *di; + dictEntry *de; + char **instance = + zmalloc(sizeof(char*)*(dictSize(master->sentinels)+1)); + int instances = 0; + char *leader = NULL; + + if (master->flags & SRI_CAN_FAILOVER) { + /* Add myself if I'm a Sentinel that can failover this master. */ + instance[instances++] = server.runid; + } + + di = dictGetIterator(master->sentinels); + while((de = dictNext(di)) != NULL) { + sentinelRedisInstance *ri = dictGetVal(de); + mstime_t lag = mstime() - ri->last_avail_time; + + if (lag > SENTINEL_INFO_VALIDITY_TIME || + !(ri->flags & SRI_CAN_FAILOVER) || + (ri->flags & SRI_DISCONNECTED) || + ri->runid == NULL) + continue; + instance[instances++] = ri->runid; + } + dictReleaseIterator(di); + + /* If we have at least one instance passing our checks, order the array + * by runid. */ + if (instances) { + qsort(instance,instances,sizeof(char*),compareRunID); + leader = sdsnew(instance[0]); + } + zfree(instance); + return leader; +} + +struct sentinelLeader { + char *runid; + unsigned long votes; +}; + +/* Helper function for sentinelGetObjectiveLeader, increment the counter + * relative to the specified runid. */ +void sentinelObjectiveLeaderIncr(dict *counters, char *runid) { + dictEntry *de = dictFind(counters,runid); + uint64_t oldval; + + if (de) { + oldval = dictGetUnsignedIntegerVal(de); + dictSetUnsignedIntegerVal(de,oldval+1); + } else { + de = dictAddRaw(counters,runid); + redisAssert(de != NULL); + dictSetUnsignedIntegerVal(de,1); + } +} + +/* Scan all the Sentinels attached to this master to check what is the + * most voted leader among Sentinels. */ +char *sentinelGetObjectiveLeader(sentinelRedisInstance *master) { + dict *counters; + dictIterator *di; + dictEntry *de; + unsigned int voters = 0, voters_quorum; + char *myvote; + char *winner = NULL; + + redisAssert(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS)); + counters = dictCreate(&leaderVotesDictType,NULL); + + /* Count my vote. */ + myvote = sentinelGetSubjectiveLeader(master); + if (myvote) { + sentinelObjectiveLeaderIncr(counters,myvote); + voters++; + } + + /* Count other sentinels votes */ + di = dictGetIterator(master->sentinels); + while((de = dictNext(di)) != NULL) { + sentinelRedisInstance *ri = dictGetVal(de); + if (ri->leader == NULL) continue; + /* If the failover is not already in progress we are only interested + * in Sentinels that believe the master is down. Otherwise the leader + * selection is useful for the "failover-takedown" when the original + * leader fails. In that case we consider all the voters. */ + if (!(master->flags & SRI_FAILOVER_IN_PROGRESS) && + !(ri->flags & SRI_MASTER_DOWN)) continue; + sentinelObjectiveLeaderIncr(counters,ri->leader); + voters++; + } + dictReleaseIterator(di); + voters_quorum = voters/2+1; + + /* Check what's the winner. For the winner to win, it needs two conditions: + * 1) Absolute majority between voters (50% + 1). + * 2) And anyway at least master->quorum votes. */ + { + uint64_t max_votes = 0; /* Max votes so far. */ + + di = dictGetIterator(counters); + while((de = dictNext(di)) != NULL) { + uint64_t votes = dictGetUnsignedIntegerVal(de); + + if (max_votes < votes) { + max_votes = votes; + winner = dictGetKey(de); + } + } + dictReleaseIterator(di); + if (winner && (max_votes < voters_quorum || max_votes < master->quorum)) + winner = NULL; + } + winner = winner ? sdsnew(winner) : NULL; + sdsfree(myvote); + dictRelease(counters); + return winner; +} + +/* This function checks if there are the conditions to start the failover, + * that is: + * + * 1) Enough time has passed since O_DOWN. + * 2) The master is marked as SRI_CAN_FAILOVER, so we can failover it. + * 3) We are the objectively leader for this master. + * + * If the conditions are met we flag the master as SRI_FAILOVER_IN_PROGRESS + * and SRI_I_AM_THE_LEADER. + */ +void sentinelStartFailover(sentinelRedisInstance *master) { + char *leader; + int isleader; + + /* We can't failover if the master is not in O_DOWN state or if + * there is not already a failover in progress (to perform the + * takedown if the leader died) or if this Sentinel is not allowed + * to start a failover. */ + if (!(master->flags & SRI_CAN_FAILOVER) || + !(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) return; + + leader = sentinelGetObjectiveLeader(master); + isleader = leader && strcasecmp(leader,server.runid) == 0; + sdsfree(leader); + + /* If I'm not the leader, I can't failover for sure. */ + if (!isleader) return; + + /* If the failover is already in progress there are two options... */ + if (master->flags & SRI_FAILOVER_IN_PROGRESS) { + if (master->flags & SRI_I_AM_THE_LEADER) { + /* 1) I'm flagged as leader so I already started the failover. + * Just return. */ + return; + } else { + mstime_t elapsed = mstime() - master->failover_state_change_time; + + /* 2) I'm the new leader, but I'm not flagged as leader in the + * master: I did not started the failover, but the original + * leader has no longer the leadership. + * + * In this case if the failover appears to be lagging + * for at least 25% of the configured failover timeout, + * I can assume I can take control. Otherwise + * it's better to return and wait more. */ + if (elapsed < (master->failover_timeout/4)) return; + sentinelEvent(REDIS_WARNING,"+failover-takedown",master,"%@"); + /* We have already an elected slave if we are in + * FAILOVER_IN_PROGRESS state, that is, the slave that we + * observed turning into a master. */ + master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES; + /* As an observer we flagged all the slaves as RECONF_SENT but + * now we are in charge of actually sending the reconfiguration + * command so let's clear this flag for all the instances. */ + sentinelDelFlagsToDictOfRedisInstances(master->slaves, + SRI_RECONF_SENT); + } + } else { + /* Brand new failover as SRI_FAILOVER_IN_PROGRESS was not set. */ + master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START; + } + + master->flags |= SRI_FAILOVER_IN_PROGRESS|SRI_I_AM_THE_LEADER; + sentinelEvent(REDIS_WARNING,"+failover-triggered",master,"%@"); + + /* Pick a random delay if it's a fresh failover (WAIT_START), and not + * a recovery of a failover started by another sentinel. */ + if (master->failover_state == SENTINEL_FAILOVER_STATE_WAIT_START) { + master->failover_start_time = mstime() + + SENTINEL_FAILOVER_FIXED_DELAY + + (rand() % SENTINEL_FAILOVER_MAX_RANDOM_DELAY); + sentinelEvent(REDIS_WARNING,"+failover-state-wait-start",master, + "%@ #starting in %lld milliseconds", + master->failover_start_time-mstime()); + } + master->failover_state_change_time = mstime(); +} + +/* Select a suitable slave to promote. The current algorithm only uses + * the following parameters: + * + * 1) None of the following conditions: S_DOWN, O_DOWN, DISCONNECTED. + * 2) last_avail_time more recent than SENTINEL_INFO_VALIDITY_TIME. + * 3) info_refresh more recent than SENTINEL_INFO_VALIDITY_TIME. + * 4) master_link_down_time no more than: + * (now - master->s_down_since_time) + (master->down_after_period * 10). + * + * Among all the slaves matching the above conditions we select the slave + * with lower slave_priority. If priority is the same we select the slave + * with lexicographically smaller runid. + * + * The function returns the pointer to the selected slave, otherwise + * NULL if no suitable slave was found. + */ + +int compareSlavesForPromotion(const void *a, const void *b) { + sentinelRedisInstance **sa = (sentinelRedisInstance **)a, + **sb = (sentinelRedisInstance **)b; + if ((*sa)->slave_priority != (*sb)->slave_priority) + return (*sa)->slave_priority - (*sb)->slave_priority; + return strcasecmp((*sa)->runid,(*sb)->runid); +} + +sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) { + sentinelRedisInstance **instance = + zmalloc(sizeof(instance[0])*dictSize(master->slaves)); + sentinelRedisInstance *selected = NULL; + int instances = 0; + dictIterator *di; + dictEntry *de; + mstime_t max_master_down_time; + + max_master_down_time = (mstime() - master->s_down_since_time) + + (master->down_after_period * 10); + + di = dictGetIterator(master->slaves); + while((de = dictNext(di)) != NULL) { + sentinelRedisInstance *slave = dictGetVal(de); + mstime_t info_validity_time = mstime()-SENTINEL_INFO_VALIDITY_TIME; + + if (slave->flags & (SRI_S_DOWN|SRI_O_DOWN|SRI_DISCONNECTED)) continue; + if (slave->last_avail_time < info_validity_time) continue; + if (slave->info_refresh < info_validity_time) continue; + if (slave->master_link_down_time > max_master_down_time) continue; + instance[instances++] = slave; + } + dictReleaseIterator(di); + if (instances) { + qsort(instance,instances,sizeof(sentinelRedisInstance*), + compareSlavesForPromotion); + selected = instance[0]; + } + zfree(instance); + return selected; +} + +/* ---------------- Failover state machine implementation ------------------- */ +void sentinelFailoverWaitStart(sentinelRedisInstance *ri) { + if (mstime() >= ri->failover_start_time) { + ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE; + ri->failover_state_change_time = mstime(); + sentinelEvent(REDIS_WARNING,"+failover-state-select-slave",ri,"%@"); + } +} + +void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) { + sentinelRedisInstance *slave = sentinelSelectSlave(ri); + + if (slave == NULL) { + sentinelEvent(REDIS_WARNING,"-no-good-slave",ri, + "%@ #retrying in %d seconds", + (SENTINEL_FAILOVER_FIXED_DELAY+ + SENTINEL_FAILOVER_MAX_RANDOM_DELAY)/1000); + ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START; + ri->failover_start_time = mstime() + SENTINEL_FAILOVER_FIXED_DELAY + + SENTINEL_FAILOVER_MAX_RANDOM_DELAY; + } else { + sentinelEvent(REDIS_WARNING,"+selected-slave",slave,"%@"); + slave->flags |= SRI_PROMOTED; + ri->promoted_slave = slave; + ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE; + ri->failover_state_change_time = mstime(); + sentinelEvent(REDIS_NOTICE,"+failover-state-send-slaveof-noone", + slave, "%@"); + } +} + +void sentinelFailoverSendSlaveOfNoOne(sentinelRedisInstance *ri) { + int retval; + + if (ri->promoted_slave->flags & SRI_DISCONNECTED) return; + + /* Send SLAVEOF NO ONE command to turn the slave into a master. + * We actually register a generic callback for this command as we don't + * really care about the reply. We check if it worked indirectly observing + * if INFO returns a different role (master instead of slave). */ + retval = redisAsyncCommand(ri->promoted_slave->cc, + sentinelDiscardReplyCallback, NULL, "SLAVEOF NO ONE"); + if (retval != REDIS_OK) return; + ri->promoted_slave->pending_commands++; + sentinelEvent(REDIS_NOTICE, "+failover-state-wait-promotion", + ri->promoted_slave,"%@"); + ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION; + ri->failover_state_change_time = mstime(); +} + +/* We actually wait for promotion indirectly checking with INFO when the + * slave turns into a master. */ +void sentinelFailoverWaitPromotion(sentinelRedisInstance *ri) { + mstime_t elapsed = mstime() - ri->failover_state_change_time; + + if (elapsed >= SENTINEL_PROMOTION_RETRY_PERIOD) { + sentinelEvent(REDIS_WARNING,"-promotion-timeout",ri->promoted_slave, + "%@"); + sentinelEvent(REDIS_WARNING,"+failover-state-select-slave",ri,"%@"); + ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE; + ri->failover_state_change_time = mstime(); + ri->promoted_slave->flags &= ~SRI_PROMOTED; + ri->promoted_slave = NULL; + } +} + +void sentinelFailoverDetectEnd(sentinelRedisInstance *master) { + int not_reconfigured = 0, timeout = 0; + dictIterator *di; + dictEntry *de; + mstime_t elapsed = mstime() - master->failover_state_change_time; + + /* We can't consider failover finished if the promoted slave is + * not reachable. */ + if (master->promoted_slave == NULL || + master->promoted_slave->flags & SRI_S_DOWN) return; + + /* The failover terminates once all the reachable slaves are properly + * configured. */ + di = dictGetIterator(master->slaves); + while((de = dictNext(di)) != NULL) { + sentinelRedisInstance *slave = dictGetVal(de); + + if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue; + if (slave->flags & SRI_S_DOWN) continue; + not_reconfigured++; + } + dictReleaseIterator(di); + + /* Force end of failover on timeout. */ + if (elapsed > master->failover_timeout) { + not_reconfigured = 0; + timeout = 1; + sentinelEvent(REDIS_WARNING,"+failover-end-for-timeout",master,"%@"); + } + + if (not_reconfigured == 0) { + sentinelEvent(REDIS_WARNING,"+failover-end",master,"%@"); + master->failover_state = SENTINEL_FAILOVER_STATE_UPDATE_CONFIG; + master->failover_state_change_time = mstime(); + } + + /* If I'm the leader it is a good idea to send a best effort SLAVEOF + * command to all the slaves still not reconfigured to replicate with + * the new master. */ + if (timeout && (master->flags & SRI_I_AM_THE_LEADER)) { + dictIterator *di; + dictEntry *de; + char master_port[32]; + + ll2string(master_port,sizeof(master_port), + master->promoted_slave->addr->port); + + di = dictGetIterator(master->slaves); + while((de = dictNext(di)) != NULL) { + sentinelRedisInstance *slave = dictGetVal(de); + int retval; + + if (slave->flags & + (SRI_RECONF_DONE|SRI_RECONF_SENT|SRI_DISCONNECTED)) continue; + + retval = redisAsyncCommand(slave->cc, + sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s", + master->promoted_slave->addr->ip, + master_port); + if (retval == REDIS_OK) { + sentinelEvent(REDIS_NOTICE,"+slave-reconf-sent-be",slave,"%@"); + slave->flags |= SRI_RECONF_SENT; + } + } + dictReleaseIterator(di); + } +} + +/* Send SLAVE OF to all the remaining slaves that + * still don't appear to have the configuration updated. */ +void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) { + dictIterator *di; + dictEntry *de; + int in_progress = 0; + + di = dictGetIterator(master->slaves); + while((de = dictNext(di)) != NULL) { + sentinelRedisInstance *slave = dictGetVal(de); + + if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)) + in_progress++; + } + dictReleaseIterator(di); + + di = dictGetIterator(master->slaves); + while(in_progress < master->parallel_syncs && + (de = dictNext(di)) != NULL) + { + sentinelRedisInstance *slave = dictGetVal(de); + int retval; + char master_port[32]; + + /* Skip the promoted slave, and already configured slaves. */ + if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue; + + /* Clear the SRI_RECONF_SENT flag if too much time elapsed without + * the slave moving forward to the next state. */ + if ((slave->flags & SRI_RECONF_SENT) && + (mstime() - slave->slave_reconf_sent_time) > + SENTINEL_SLAVE_RECONF_RETRY_PERIOD) + { + sentinelEvent(REDIS_NOTICE,"-slave-reconf-sent-timeout",slave,"%@"); + slave->flags &= ~SRI_RECONF_SENT; + } + + /* Nothing to do for instances that are disconnected or already + * in RECONF_SENT state. */ + if (slave->flags & (SRI_DISCONNECTED|SRI_RECONF_SENT|SRI_RECONF_INPROG)) + continue; + + /* Send SLAVEOF . */ + ll2string(master_port,sizeof(master_port), + master->promoted_slave->addr->port); + retval = redisAsyncCommand(slave->cc, + sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s", + master->promoted_slave->addr->ip, + master_port); + if (retval == REDIS_OK) { + slave->flags |= SRI_RECONF_SENT; + slave->pending_commands++; + slave->slave_reconf_sent_time = mstime(); + sentinelEvent(REDIS_NOTICE,"+slave-reconf-sent",slave,"%@"); + in_progress++; + } + } + dictReleaseIterator(di); + sentinelFailoverDetectEnd(master); +} + +/* This function is called when the slave is in + * SENTINEL_FAILOVER_STATE_UPDATE_CONFIG state. In this state we need + * to remove it from the master table and add the promoted slave instead. + * + * If there are no promoted slaves as this instance is unique, we remove + * and re-add it with the same address to trigger a complete state + * refresh. */ +void sentinelFailoverSwitchToPromotedSlave(sentinelRedisInstance *master) { + sentinelRedisInstance *new, *ref = master->promoted_slave ? + master->promoted_slave : master; + int quorum = ref->quorum, parallel_syncs = ref->parallel_syncs; + char *name = sdsnew(master->name); + char *ip = sdsnew(ref->addr->ip), *oldip = sdsnew(master->addr->ip); + int port = ref->addr->port, oldport = master->addr->port; + int retval, oldflags = master->flags; + mstime_t old_down_after_period = master->down_after_period; + mstime_t old_failover_timeout = master->failover_timeout; + + retval = dictDelete(sentinel.masters,master->name); + redisAssert(retval == DICT_OK); + new = createSentinelRedisInstance(name,SRI_MASTER,ip,port,quorum,NULL); + redisAssert(new != NULL); + new->parallel_syncs = parallel_syncs; + new->flags |= (oldflags & SRI_CAN_FAILOVER); + new->down_after_period = old_down_after_period; + new->failover_timeout = old_failover_timeout; + /* TODO: ... set the scripts as well. */ + sentinelEvent(REDIS_WARNING,"+switch-master",new,"%s %s %d %s %d", + name, oldip, oldport, ip, port); + sdsfree(name); + sdsfree(ip); + sdsfree(oldip); +} + +void sentinelFailoverStateMachine(sentinelRedisInstance *ri) { + redisAssert(ri->flags & SRI_MASTER); + + if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return; + + switch(ri->failover_state) { + case SENTINEL_FAILOVER_STATE_WAIT_START: + sentinelFailoverWaitStart(ri); + break; + case SENTINEL_FAILOVER_STATE_SELECT_SLAVE: + sentinelFailoverSelectSlave(ri); + break; + case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE: + sentinelFailoverSendSlaveOfNoOne(ri); + break; + case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION: + sentinelFailoverWaitPromotion(ri); + break; + case SENTINEL_FAILOVER_STATE_RECONF_SLAVES: + sentinelFailoverReconfNextSlave(ri); + break; + case SENTINEL_FAILOVER_STATE_DETECT_END: + sentinelFailoverDetectEnd(ri); + break; + } +} + +/* The following is called only for master instances and will abort the + * failover process if: + * + * 1) The failover is in progress. + * 2) We already promoted a slave. + * 3) The promoted slave is in extended SDOWN condition. + */ +void sentinelAbortFailoverIfNeeded(sentinelRedisInstance *ri) { + dictIterator *di; + dictEntry *de; + + /* Failover is in progress? Do we have a promoted slave? */ + if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS) || !ri->promoted_slave) return; + + /* Is the promoted slave into an extended SDOWN state? */ + if (!(ri->promoted_slave->flags & SRI_S_DOWN) || + (mstime() - ri->promoted_slave->s_down_since_time) < + (ri->down_after_period * SENTINEL_EXTENDED_SDOWN_MULTIPLIER)) return; + + sentinelEvent(REDIS_WARNING,"-failover-abort-x-sdown",ri->promoted_slave,"%@"); + + /* Clear failover related flags from slaves. + * Also if we are the leader make sure to send SLAVEOF commands to all the + * already reconfigured slaves in order to turn them back into slaves of + * the original master. */ + + di = dictGetIterator(ri->slaves); + while((de = dictNext(di)) != NULL) { + sentinelRedisInstance *slave = dictGetVal(de); + if (ri->flags & SRI_I_AM_THE_LEADER) { + char master_port[32]; + int retval; + + ll2string(master_port,sizeof(master_port),ri->addr->port); + retval = redisAsyncCommand(slave->cc, + sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s", + ri->addr->ip, + master_port); + if (retval == REDIS_OK) + sentinelEvent(REDIS_NOTICE,"-slave-reconf-undo",slave,"%@"); + } + slave->flags &= ~(SRI_RECONF_SENT|SRI_RECONF_INPROG|SRI_RECONF_DONE); + } + dictReleaseIterator(di); + + ri->flags &= ~(SRI_FAILOVER_IN_PROGRESS|SRI_I_AM_THE_LEADER); + ri->failover_state = SENTINEL_FAILOVER_STATE_NONE; + ri->failover_state_change_time = mstime(); + ri->promoted_slave->flags &= ~SRI_PROMOTED; + ri->promoted_slave = NULL; +} + +/* ======================== SENTINEL timer handler ========================== + * This is the "main" our Sentinel, being sentinel completely non blocking + * in design. The function is called every second. + * -------------------------------------------------------------------------- */ + +/* Perform scheduled operations for the specified Redis instance. */ +void sentinelHandleRedisInstance(sentinelRedisInstance *ri) { + /* ========== MONITORING HALF ============ */ + /* Every kind of instance */ + sentinelReconnectInstance(ri); + sentinelPingInstance(ri); + + /* Masters and slaves */ + if (ri->flags & (SRI_MASTER|SRI_SLAVE)) { + /* Nothing so far. */ + } + + /* Only masters */ + if (ri->flags & SRI_MASTER) { + sentinelAskMasterStateToOtherSentinels(ri); + } + + /* ============== ACTING HALF ============= */ + /* We don't proceed with the acting half if we are in TILT mode. + * TILT happens when we find something odd with the time, like a + * sudden change in the clock. */ + if (sentinel.tilt) { + if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return; + sentinel.tilt = 0; + sentinelEvent(REDIS_WARNING,"-tilt",NULL,"#tilt mode exited"); + } + + /* Every kind of instance */ + sentinelCheckSubjectivelyDown(ri); + + /* Masters and slaves */ + if (ri->flags & (SRI_MASTER|SRI_SLAVE)) { + /* Nothing so far. */ + } + + /* Only masters */ + if (ri->flags & SRI_MASTER) { + sentinelCheckObjectivelyDown(ri); + sentinelStartFailover(ri); + sentinelFailoverStateMachine(ri); + sentinelAbortFailoverIfNeeded(ri); + } +} + +/* Perform scheduled operations for all the instances in the dictionary. + * Recursively call the function against dictionaries of slaves. */ +void sentinelHandleDictOfRedisInstances(dict *instances) { + dictIterator *di; + dictEntry *de; + sentinelRedisInstance *switch_to_promoted = NULL; + + /* There are a number of things we need to perform against every master. */ + di = dictGetIterator(instances); + while((de = dictNext(di)) != NULL) { + sentinelRedisInstance *ri = dictGetVal(de); + + sentinelHandleRedisInstance(ri); + if (ri->flags & SRI_MASTER) { + sentinelHandleDictOfRedisInstances(ri->slaves); + sentinelHandleDictOfRedisInstances(ri->sentinels); + if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) { + switch_to_promoted = ri; + } + } + } + if (switch_to_promoted) + sentinelFailoverSwitchToPromotedSlave(switch_to_promoted); + dictReleaseIterator(di); +} + +/* This function checks if we need to enter the TITL mode. + * + * The TILT mode is entered if we detect that between two invocations of the + * timer interrupt, a negative amount of time, or too much time has passed. + * Note that we expect that more or less just 100 milliseconds will pass + * if everything is fine. However we'll see a negative number or a + * difference bigger than SENTINEL_TILT_TRIGGER milliseconds if one of the + * following conditions happen: + * + * 1) The Sentiel process for some time is blocked, for every kind of + * random reason: the load is huge, the computer was freezed for some time + * in I/O or alike, the process was stopped by a signal. Everything. + * 2) The system clock was altered significantly. + * + * Under both this conditions we'll see everything as timed out and failing + * without good reasons. Instead we enter the TILT mode and wait + * for SENTIENL_TILT_PERIOD to elapse before starting to act again. + * + * During TILT time we still collect information, we just do not act. */ +void sentinelCheckTiltCondition(void) { + mstime_t now = mstime(); + mstime_t delta = now - sentinel.previous_time; + + if (delta < 0 || delta > SENTINEL_TILT_TRIGGER) { + sentinel.tilt = 1; + sentinel.tilt_start_time = mstime(); + sentinelEvent(REDIS_WARNING,"+tilt",NULL,"#tilt mode entered"); + } + sentinel.previous_time = mstime(); +} + +void sentinelTimer(void) { + sentinelCheckTiltCondition(); + sentinelHandleDictOfRedisInstances(sentinel.masters); +} + -- 2.45.2