]> git.saurik.com Git - redis.git/commitdiff
First implementation of Redis Sentinel.
authorantirez <antirez@gmail.com>
Mon, 23 Jul 2012 10:54:52 +0000 (12:54 +0200)
committerantirez <antirez@gmail.com>
Mon, 23 Jul 2012 11:14:44 +0000 (13:14 +0200)
This commit implements the first, beta quality implementation of Redis
Sentinel, a distributed monitoring system for Redis with notification
and automatic failover capabilities.

More info at http://redis.io/topics/sentinel

.gitignore
sentinel.conf [new file with mode: 0644]
src/Makefile
src/anet.c
src/config.c
src/redis.c
src/redis.h
src/sentinel.c [new file with mode: 0644]

index 5f262c4617928cbc4adf7e4d01cc06ad7fa3ca8d..c94c5ac9380957eb9d875cd35e927bd3012a9b69 100644 (file)
@@ -4,6 +4,7 @@
 *.log
 redis-cli
 redis-server
+redis-sentinel
 redis-benchmark
 redis-check-dump
 redis-check-aof
@@ -24,3 +25,4 @@ deps/lua/src/luac
 deps/lua/src/liblua.a
 .make-*
 .prerequisites
+*.dSYM
diff --git a/sentinel.conf b/sentinel.conf
new file mode 100644 (file)
index 0000000..0e7a954
--- /dev/null
@@ -0,0 +1,41 @@
+# Example sentienl.conf
+
+# sentinel monitor <name> <ip> <port> quorum. Tells Sentinel to monitor this
+# slave, and to consider it in O_DOWN (Objectively Down) state only if at
+# least two sentinels agree.
+#
+# Note: master name should not include special characters or spaces.
+# The valid charset is A-z 0-9 and the three characters ".-_".
+sentinel monitor mymaster 127.0.0.1 6379 2
+
+# Number of milliseconds the master (or any attached slave or sentinel) should
+# be unreachable (as in, not acceptable reply to PING, continuously, for the
+# specified period) in order to consider it in S_DOWN state (Subjectively
+# Down).
+#
+# Default is 30 seconds.
+sentinel down-after-milliseconds mymaster 30000
+
+# Specify if this Sentinel can start the failover for this master.
+sentinel can-failover mymaster yes
+
+# How many slaves we can reconfigure to point to the new slave simultaneously
+# during the failover. Use a low number if you use the slaves to serve query
+# to avoid that all the slaves will be unreachable at about the same
+# time while performing the synchronization with the master.
+sentinel parallel-syncs mymaster 1
+
+# Specifies the failover timeout in milliseconds. When this time has elapsed
+# without any progress in the failover process, it is considered concluded by
+# the sentinel even if not all the attached slaves were correctly configured
+# to replicate with the new master (however a "best effort" SLAVEOF command
+# is sent to all the slaves before).
+#
+# Also when 25% of this time has elapsed without any advancement, and there
+# is a leader switch (the sentinel did not started the failover but is now
+# elected as leader), the sentinel will continue the failover doing a
+# "takeover".
+#
+# Default is 15 minutes.
+sentinel failover-timeout mymaster 900000
+
index bffdb30e91fdf08125c60cd4fd8d0460d22325bd..358b4cbac6cba59ad9baed4d78135b81f1794f63 100644 (file)
@@ -78,6 +78,7 @@ endif
 
 REDIS_CC=$(QUIET_CC)$(CC) $(FINAL_CFLAGS)
 REDIS_LD=$(QUIET_LINK)$(CC) $(FINAL_LDFLAGS)
+REDIS_INSTALL=$(QUIET_INSTALL)$(INSTALL)
 
 PREFIX?=/usr/local
 INSTALL_BIN= $(PREFIX)/bin
@@ -93,10 +94,12 @@ ENDCOLOR="\033[0m"
 ifndef V
 QUIET_CC = @printf '    %b %b\n' $(CCCOLOR)CC$(ENDCOLOR) $(SRCCOLOR)$@$(ENDCOLOR) 1>&2;
 QUIET_LINK = @printf '    %b %b\n' $(LINKCOLOR)LINK$(ENDCOLOR) $(BINCOLOR)$@$(ENDCOLOR) 1>&2;
+QUIET_INSTALL = @printf '    %b %b\n' $(LINKCOLOR)INSTALL$(ENDCOLOR) $(BINCOLOR)$@$(ENDCOLOR) 1>&2;
 endif
 
 REDIS_SERVER_NAME= redis-server
-REDIS_SERVER_OBJ= adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o
+REDIS_SENTINEL_NAME= redis-sentinel
+REDIS_SERVER_OBJ= adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o
 REDIS_CLI_NAME= redis-cli
 REDIS_CLI_OBJ= anet.o sds.o adlist.o redis-cli.o zmalloc.o release.o anet.o ae.o
 REDIS_BENCHMARK_NAME= redis-benchmark
@@ -106,7 +109,7 @@ REDIS_CHECK_DUMP_OBJ= redis-check-dump.o lzf_c.o lzf_d.o crc64.o
 REDIS_CHECK_AOF_NAME= redis-check-aof
 REDIS_CHECK_AOF_OBJ= redis-check-aof.o
 
-all: $(REDIS_SERVER_NAME) $(REDIS_CLI_NAME) $(REDIS_BENCHMARK_NAME) $(REDIS_CHECK_DUMP_NAME) $(REDIS_CHECK_AOF_NAME)
+all: $(REDIS_SERVER_NAME) $(REDIS_SENTINEL_NAME) $(REDIS_CLI_NAME) $(REDIS_BENCHMARK_NAME) $(REDIS_CHECK_DUMP_NAME) $(REDIS_CHECK_AOF_NAME)
        @echo ""
        @echo "Hint: To run 'make test' is a good idea ;)"
        @echo ""
@@ -151,7 +154,11 @@ endif
 
 # redis-server
 $(REDIS_SERVER_NAME): $(REDIS_SERVER_OBJ)
-       $(REDIS_LD) -o $@ $^ ../deps/lua/src/liblua.a $(FINAL_LIBS)
+       $(REDIS_LD) -o $@ $^ ../deps/hiredis/libhiredis.a ../deps/lua/src/liblua.a $(FINAL_LIBS)
+
+# redis-sentinel
+$(REDIS_SENTINEL_NAME): $(REDIS_SERVER_NAME)
+       $(REDIS_INSTALL) $(REDIS_SERVER_NAME) $(REDIS_SENTINEL_NAME)
 
 # redis-cli
 $(REDIS_CLI_NAME): $(REDIS_CLI_OBJ)
@@ -176,7 +183,7 @@ $(REDIS_CHECK_AOF_NAME): $(REDIS_CHECK_AOF_OBJ)
        $(REDIS_CC) -c $<
 
 clean:
-       rm -rf $(REDIS_SERVER_NAME) $(REDIS_CLI_NAME) $(REDIS_BENCHMARK_NAME) $(REDIS_CHECK_DUMP_NAME) $(REDIS_CHECK_AOF_NAME) *.o *.gcda *.gcno *.gcov redis.info lcov-html
+       rm -rf $(REDIS_SERVER_NAME) $(REDIS_SENTINEL_NAME) $(REDIS_CLI_NAME) $(REDIS_BENCHMARK_NAME) $(REDIS_CHECK_DUMP_NAME) $(REDIS_CHECK_AOF_NAME) *.o *.gcda *.gcno *.gcov redis.info lcov-html
 
 .PHONY: clean
 
@@ -217,8 +224,8 @@ src/help.h:
 
 install: all
        mkdir -p $(INSTALL_BIN)
-       $(INSTALL) $(REDIS_SERVER_NAME) $(INSTALL_BIN)
-       $(INSTALL) $(REDIS_BENCHMARK_NAME) $(INSTALL_BIN)
-       $(INSTALL) $(REDIS_CLI_NAME) $(INSTALL_BIN)
-       $(INSTALL) $(REDIS_CHECK_DUMP_NAME) $(INSTALL_BIN)
-       $(INSTALL) $(REDIS_CHECK_AOF_NAME) $(INSTALL_BIN)
+       $(REDIS_INSTALL) $(REDIS_SERVER_NAME) $(INSTALL_BIN)
+       $(REDIS_INSTALL) $(REDIS_BENCHMARK_NAME) $(INSTALL_BIN)
+       $(REDIS_INSTALL) $(REDIS_CLI_NAME) $(INSTALL_BIN)
+       $(REDIS_INSTALL) $(REDIS_CHECK_DUMP_NAME) $(INSTALL_BIN)
+       $(REDIS_INSTALL) $(REDIS_CHECK_AOF_NAME) $(INSTALL_BIN)
index 434d945c7d850edd8b8a8934d254bbc88e5d62d0..4b52425cf8166754cecabd5877b5622b32619e2b 100644 (file)
@@ -367,3 +367,18 @@ int anetPeerToString(int fd, char *ip, int *port) {
     if (port) *port = ntohs(sa.sin_port);
     return 0;
 }
+
+int anetSockName(int fd, char *ip, int *port) {
+    struct sockaddr_in sa;
+    socklen_t salen = sizeof(sa);
+
+    if (getsockname(fd,(struct sockaddr*)&sa,&salen) == -1) {
+        *port = 0;
+        ip[0] = '?';
+        ip[1] = '\0';
+        return -1;
+    }
+    if (ip) strcpy(ip,inet_ntoa(sa.sin_addr));
+    if (port) *port = ntohs(sa.sin_port);
+    return 0;
+}
index cf614008460f9771eb50d69d3361d2dc32500885..9e247d66b842d64c7c43d17c46e833d298db9ec5 100644 (file)
@@ -354,6 +354,17 @@ void loadServerConfigFromString(char *config) {
             if ((server.stop_writes_on_bgsave_err = yesnotoi(argv[1])) == -1) {
                 err = "argument must be 'yes' or 'no'"; goto loaderr;
             }
+        } else if (!strcasecmp(argv[0],"sentinel")) {
+            /* argc == 1 is handled by main() as we need to enter the sentinel
+             * mode ASAP. */
+            if (argc != 1) {
+                if (!server.sentinel_mode) {
+                    err = "sentinel directive while not in sentinel mode";
+                    goto loaderr;
+                }
+                err = sentinelHandleConfiguration(argv+1,argc-1);
+                if (err) goto loaderr;
+            }
         } else {
             err = "Bad directive or wrong number of arguments"; goto loaderr;
         }
index b6490d57a82ffee45bb44f27c7b4dc1249f36bec..248693b6fd1592a8e7166bd5023db82837355a6c 100644 (file)
@@ -821,13 +821,8 @@ void clientsCron(void) {
  * a macro is used: run_with_period(milliseconds) { .... }
  */
 
-/* Using the following macro you can run code inside serverCron() with the
- * specified period, specified in milliseconds.
- * The actual resolution depends on REDIS_HZ. */
-#define run_with_period(_ms_) if (!(loops % ((_ms_)/(1000/REDIS_HZ))))
-
 int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
-    int j, loops = server.cronloops;
+    int j;
     REDIS_NOTUSED(eventLoop);
     REDIS_NOTUSED(id);
     REDIS_NOTUSED(clientData);
@@ -896,11 +891,14 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
     }
 
     /* Show information about connected clients */
-    run_with_period(5000) {
-        redisLog(REDIS_VERBOSE,"%d clients connected (%d slaves), %zu bytes in use",
-            listLength(server.clients)-listLength(server.slaves),
-            listLength(server.slaves),
-            zmalloc_used_memory());
+    if (!server.sentinel_mode) {
+        run_with_period(5000) {
+            redisLog(REDIS_VERBOSE,
+                "%d clients connected (%d slaves), %zu bytes in use",
+                listLength(server.clients)-listLength(server.slaves),
+                listLength(server.slaves),
+                zmalloc_used_memory());
+        }
     }
 
     /* We need to do a few operations on clients asynchronously. */
@@ -985,6 +983,11 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
         if (server.cluster_enabled) clusterCron();
     }
 
+    /* Run the sentinel timer if we are in sentinel mode. */
+    run_with_period(100) {
+        if (server.sentinel_mode) sentinelTimer();
+    }
+
     server.cronloops++;
     return 1000/REDIS_HZ;
 }
@@ -2444,21 +2447,26 @@ void usage() {
     fprintf(stderr,"       ./redis-server /etc/redis/6379.conf\n");
     fprintf(stderr,"       ./redis-server --port 7777\n");
     fprintf(stderr,"       ./redis-server --port 7777 --slaveof 127.0.0.1 8888\n");
-    fprintf(stderr,"       ./redis-server /etc/myredis.conf --loglevel verbose\n");
+    fprintf(stderr,"       ./redis-server /etc/myredis.conf --loglevel verbose\n\n");
+    fprintf(stderr,"Sentinel mode:\n");
+    fprintf(stderr,"       ./redis-server /etc/sentinel.conf --sentinel\n");
     exit(1);
 }
 
 void redisAsciiArt(void) {
 #include "asciilogo.h"
     char *buf = zmalloc(1024*16);
+    char *mode = "stand alone";
+
+    if (server.cluster_enabled) mode = "cluster";
+    else if (server.sentinel_mode) mode = "sentinel";
 
     snprintf(buf,1024*16,ascii_logo,
         REDIS_VERSION,
         redisGitSHA1(),
         strtol(redisGitDirty(),NULL,10) > 0,
         (sizeof(long) == 8) ? "64" : "32",
-        server.cluster_enabled ? "cluster" : "stand alone",
-        server.port,
+        mode, server.port,
         (long) getpid()
     );
     redisLogRaw(REDIS_NOTICE|REDIS_LOG_RAW,buf);
@@ -2496,8 +2504,35 @@ void setupSignalHandlers(void) {
 
 void memtest(size_t megabytes, int passes);
 
+/* Returns 1 if there is --sentinel among the arguments or if
+ * argv[0] is exactly "redis-sentinel". */
+int checkForSentinelMode(int argc, char **argv) {
+    int j;
+
+    if (strstr(argv[0],"redis-sentinel") != NULL) return 1;
+    for (j = 1; j < argc; j++)
+        if (!strcmp(argv[j],"--sentinel")) return 1;
+    return 0;
+}
+
+/* Function called at startup to load RDB or AOF file in memory. */
+void loadDataFromDisk(void) {
+    long long start = ustime();
+    if (server.aof_state == REDIS_AOF_ON) {
+        if (loadAppendOnlyFile(server.aof_filename) == REDIS_OK)
+            redisLog(REDIS_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
+    } else {
+        if (rdbLoad(server.rdb_filename) == REDIS_OK) {
+            redisLog(REDIS_NOTICE,"DB loaded from disk: %.3f seconds",
+                (float)(ustime()-start)/1000000);
+        } else if (errno != ENOENT) {
+            redisLog(REDIS_WARNING,"Fatal error loading the DB. Exiting.");
+            exit(1);
+        }
+    }
+}
+
 int main(int argc, char **argv) {
-    long long start;
     struct timeval tv;
 
     /* We need to initialize our libraries, and the server configuration. */
@@ -2505,8 +2540,17 @@ int main(int argc, char **argv) {
     srand(time(NULL)^getpid());
     gettimeofday(&tv,NULL);
     dictSetHashFunctionSeed(tv.tv_sec^tv.tv_usec^getpid());
+    server.sentinel_mode = checkForSentinelMode(argc,argv);
     initServerConfig();
 
+    /* We need to init sentinel right now as parsing the configuration file
+     * in sentinel mode will have the effect of populating the sentinel
+     * data structures with master nodes to monitor. */
+    if (server.sentinel_mode) {
+        initSentinelConfig();
+        initSentinel();
+    }
+
     if (argc >= 2) {
         int j = 1; /* First option to parse in argv[] */
         sds options = sdsempty();
@@ -2558,27 +2602,20 @@ int main(int argc, char **argv) {
     initServer();
     if (server.daemonize) createPidFile();
     redisAsciiArt();
-    redisLog(REDIS_WARNING,"Server started, Redis version " REDIS_VERSION);
-#ifdef __linux__
-    linuxOvercommitMemoryWarning();
-#endif
-    start = ustime();
-    if (server.aof_state == REDIS_AOF_ON) {
-        if (loadAppendOnlyFile(server.aof_filename) == REDIS_OK)
-            redisLog(REDIS_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
-    } else {
-        if (rdbLoad(server.rdb_filename) == REDIS_OK) {
-            redisLog(REDIS_NOTICE,"DB loaded from disk: %.3f seconds",
-                (float)(ustime()-start)/1000000);
-        } else if (errno != ENOENT) {
-            redisLog(REDIS_WARNING,"Fatal error loading the DB. Exiting.");
-            exit(1);
-        }
+
+    if (!server.sentinel_mode) {
+        /* Things only needed when not runnign in Sentinel mode. */
+        redisLog(REDIS_WARNING,"Server started, Redis version " REDIS_VERSION);
+    #ifdef __linux__
+        linuxOvercommitMemoryWarning();
+    #endif
+        loadDataFromDisk();
+        if (server.ipfd > 0)
+            redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
+        if (server.sofd > 0)
+            redisLog(REDIS_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket);
     }
-    if (server.ipfd > 0)
-        redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
-    if (server.sofd > 0)
-        redisLog(REDIS_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket);
+
     aeSetBeforeSleepProc(server.el,beforeSleep);
     aeMain(server.el);
     aeDeleteEventLoop(server.el);
index 33a676dc381e99e11cc4602f972ae758829045c7..bee8cf4f15098335a9ad42911d215afc369f5a71 100644 (file)
 #define REDIS_PROPAGATE_AOF 1
 #define REDIS_PROPAGATE_REPL 2
 
+/* Using the following macro you can run code inside serverCron() with the
+ * specified period, specified in milliseconds.
+ * The actual resolution depends on REDIS_HZ. */
+#define run_with_period(_ms_) if (!(server.cronloops%((_ms_)/(1000/REDIS_HZ))))
+
 /* We can print the stacktrace, so our assert is defined this way: */
 #define redisAssertWithInfo(_c,_o,_e) ((_e)?(void)0 : (_redisAssertWithInfo(_c,_o,#_e,__FILE__,__LINE__),_exit(1)))
 #define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e,__FILE__,__LINE__),_exit(1)))
@@ -579,6 +584,7 @@ struct redisServer {
     int arch_bits;              /* 32 or 64 depending on sizeof(long) */
     int cronloops;              /* Number of times the cron function run */
     char runid[REDIS_RUN_ID_SIZE+1];  /* ID always different at every exec. */
+    int sentinel_mode;          /* True if this instance is a Sentinel. */
     /* Networking */
     int port;                   /* TCP listening port */
     char *bindaddr;             /* Bind address or NULL */
@@ -1115,6 +1121,12 @@ void clusterCron(void);
 clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *ask);
 void clusterPropagatePublish(robj *channel, robj *message);
 
+/* Sentinel */
+void initSentinelConfig(void);
+void initSentinel(void);
+void sentinelTimer(void);
+char *sentinelHandleConfiguration(char **argv, int argc);
+
 /* Scripting */
 void scriptingInit(void);
 
@@ -1280,4 +1292,10 @@ void enableWatchdog(int period);
 void disableWatchdog(void);
 void watchdogScheduleSignal(int period);
 void redisLogHexDump(int level, char *descr, void *value, size_t len);
+
+#define redisDebug(fmt, ...) \
+    printf("DEBUG %s:%d > " fmt "\n", __FILE__, __LINE__, __VA_ARGS__)
+#define redisDebugMark() \
+    printf("-- MARK %s:%d --\n", __FILE__, __LINE__)
+
 #endif
diff --git a/src/sentinel.c b/src/sentinel.c
new file mode 100644 (file)
index 0000000..050fa40
--- /dev/null
@@ -0,0 +1,2439 @@
+/* Redis Sentinel implementation
+ * -----------------------------
+ *
+ * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ *   * Redistributions of source code must retain the above copyright notice,
+ *     this list of conditions and the following disclaimer.
+ *   * Redistributions in binary form must reproduce the above copyright
+ *     notice, this list of conditions and the following disclaimer in the
+ *     documentation and/or other materials provided with the distribution.
+ *   * Neither the name of Redis nor the names of its contributors may be used
+ *     to endorse or promote products derived from this software without
+ *     specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "redis.h"
+#include "hiredis.h"
+#include "async.h"
+
+#include <ctype.h>
+#include <arpa/inet.h>
+#include <sys/socket.h>
+
+#define REDIS_SENTINEL_PORT 26379
+
+/* ======================== Sentinel global state =========================== */
+
+typedef long long mstime_t; /* millisecond time type. */
+
+/* Address object, used to describe an ip:port pair. */
+typedef struct sentinelAddr {
+    char *ip;
+    int port;
+} sentinelAddr;
+
+/* A Sentinel Redis Instance object is monitoring. */
+#define SRI_MASTER  (1<<0)
+#define SRI_SLAVE   (1<<1)
+#define SRI_SENTINEL (1<<2)
+#define SRI_DISCONNECTED (1<<3)
+#define SRI_S_DOWN (1<<4)   /* Subjectively down (no quorum). */
+#define SRI_O_DOWN (1<<5)   /* Objectively down (quorum reached). */
+#define SRI_MASTER_DOWN (1<<6) /* A Sentinel with this flag set thinks that
+                                   its master is down. */
+/* SRI_CAN_FAILOVER when set in an SRI_MASTER instance means that we are
+ * allowed to perform the failover for this master.
+ * When set in a SRI_SENTINEL instance means that sentinel is allowed to
+ * perform the failover on its master. */
+#define SRI_CAN_FAILOVER (1<<7)
+#define SRI_FAILOVER_IN_PROGRESS (1<<8) /* Failover is in progress for
+                                           this master. */
+#define SRI_I_AM_THE_LEADER (1<<9)     /* We are the leader for this master. */
+#define SRI_PROMOTED (1<<10)            /* Slave selected for promotion. */
+#define SRI_RECONF_SENT (1<<11)     /* SLAVEOF <newmaster> sent. */
+#define SRI_RECONF_INPROG (1<<12)   /* Slave synchronization in progress. */
+#define SRI_RECONF_DONE (1<<13)     /* Slave synchronized with new master. */
+
+#define SENTINEL_INFO_PERIOD 10000
+#define SENTINEL_PING_PERIOD 1000
+#define SENTINEL_ASK_PERIOD 1000
+#define SENTINEL_PUBLISH_PERIOD 5000
+#define SENTINEL_DOWN_AFTER_PERIOD 30000
+#define SENTINEL_HELLO_CHANNEL "__sentinel__:hello"
+#define SENTINEL_TILT_TRIGGER 2000
+#define SENTINEL_TILT_PERIOD (SENTINEL_PING_PERIOD*30)
+#define SENTINEL_DEFAULT_SLAVE_PRIORITY 100
+#define SENTINEL_PROMOTION_RETRY_PERIOD 30000
+#define SENTINEL_SLAVE_RECONF_RETRY_PERIOD 10000
+#define SENTINEL_DEFAULT_PARALLEL_SYNCS 1
+#define SENTINEL_MIN_LINK_RECONNECT_PERIOD 15000
+#define SENTINEL_DEFAULT_FAILOVER_TIMEOUT (60*15*1000)
+#define SENTINEL_MAX_PENDING_COMMANDS 100
+#define SENTINEL_EXTENDED_SDOWN_MULTIPLIER 10
+
+/* How many milliseconds is an information valid? This applies for instance
+ * to the reply to SENTINEL IS-MASTER-DOWN-BY-ADDR replies. */
+#define SENTINEL_INFO_VALIDITY_TIME 5000
+#define SENTINEL_FAILOVER_FIXED_DELAY 5000
+#define SENTINEL_FAILOVER_MAX_RANDOM_DELAY 10000
+
+/* Failover machine different states. */
+#define SENTINEL_FAILOVER_STATE_NONE 0  /* No failover in progress. */
+#define SENTINEL_FAILOVER_STATE_WAIT_START 1  /* Wait for failover_start_time*/ 
+#define SENTINEL_FAILOVER_STATE_SELECT_SLAVE 2 /* Select slave to promote */
+#define SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE 3 /* Slave -> Master */
+#define SENTINEL_FAILOVER_STATE_WAIT_PROMOTION 4 /* Wait slave to change role */
+#define SENTINEL_FAILOVER_STATE_RECONF_SLAVES 5 /* SLAVEOF newmaster */
+#define SENTINEL_FAILOVER_STATE_WAIT_NEXT_SLAVE 6 /* wait replication */
+#define SENTINEL_FAILOVER_STATE_ALERT_CLIENTS 7 /* Run user script. */
+#define SENTINEL_FAILOVER_STATE_WAIT_ALERT_SCRIPT 8 /* Wait script exec. */
+#define SENTINEL_FAILOVER_STATE_DETECT_END 9 /* Check for failover end. */
+#define SENTINEL_FAILOVER_STATE_UPDATE_CONFIG 10 /* Monitor promoted slave. */
+
+#define SENTINEL_MASTER_LINK_STATUS_UP 0
+#define SENTINEL_MASTER_LINK_STATUS_DOWN 1
+
+typedef struct sentinelRedisInstance {
+    int flags;      /* See SRI_... defines */
+    char *name;     /* Master name from the point of view of this sentinel. */
+    char *runid;    /* run ID of this instance. */
+    sentinelAddr *addr; /* Master host. */
+    redisAsyncContext *cc; /* Hiredis context for commands. */
+    redisAsyncContext *pc; /* Hiredis context for Pub / Sub. */
+    int pending_commands;   /* Number of commands sent waiting for a reply. */
+    mstime_t cc_conn_time; /* cc connection time. */
+    mstime_t pc_conn_time; /* pc connection time. */
+    mstime_t pc_last_activity; /* Last time we received any message. */
+    mstime_t last_avail_time; /* Last time the instance replied to ping with
+                                 a reply we consider valid. */
+    mstime_t last_pong_time;  /* Last time the instance replied to ping,
+                                 whatever the reply was. That's used to check
+                                 if the link is idle and must be reconnected. */
+    mstime_t last_pub_time;   /* Last time we sent hello via Pub/Sub. */
+    mstime_t last_hello_time; /* Only used if SRI_SENTINEL is set. Last time
+                                 we received an hello from this Sentinel
+                                 via Pub/Sub. */
+    mstime_t last_master_down_reply_time; /* Time of last reply to
+                                             SENTINEL is-master-down command. */
+    mstime_t s_down_since_time; /* Subjectively down since time. */
+    mstime_t o_down_since_time; /* Objectively down since time. */
+    mstime_t down_after_period; /* Consider it down after that period. */
+    mstime_t info_refresh;  /* Time at which we received INFO output from it. */
+
+    /* Master specific. */
+    dict *sentinels;    /* Other sentinels monitoring the same master. */
+    dict *slaves;       /* Slaves for this master instance. */
+    int quorum;         /* Number of sentinels that need to agree on failure. */
+    int parallel_syncs; /* How many slaves to reconfigure at same time. */
+
+    /* Slave specific. */
+    mstime_t master_link_down_time; /* Slave replication link down time. */
+    int slave_priority; /* Slave priority according to its INFO output. */
+    mstime_t slave_reconf_sent_time; /* Time at which we sent SLAVE OF <new> */
+    struct sentinelRedisInstance *master; /* Master instance if SRI_SLAVE is set. */
+    char *slave_master_host;    /* Master host as reported by INFO */
+    int slave_master_port;      /* Master port as reported by INFO */
+    int slave_master_link_status; /* Master link status as reported by INFO */
+    /* Failover */
+    char *leader;       /* If this is a master instance, this is the runid of
+                           the Sentinel that should perform the failover. If
+                           this is a Sentinel, this is the runid of the Sentinel
+                           that this other Sentinel is voting as leader.
+                           This field is valid only if SRI_MASTER_DOWN is
+                           set on the Sentinel instance. */
+    int failover_state; /* See SENTINEL_FAILOVER_STATE_* defines. */
+    mstime_t failover_state_change_time;
+    mstime_t failover_start_time;   /* When to start to failover if leader. */
+    mstime_t failover_timeout;      /* Max time to refresh failover state. */
+    struct sentinelRedisInstance *promoted_slave; /* Promoted slave instance. */
+    /* Scripts executed to notify admin or reconfigure clients: when they
+     * are set to NULL no script is executed. */
+    char *notify_script;
+    char *client_reconfig_script;
+} sentinelRedisInstance;
+
+/* Main state. */
+struct sentinelState {
+    dict *masters;      /* Dictionary of master sentinelRedisInstances.
+                           Key is the instance name, value is the
+                           sentinelRedisInstance structure pointer. */
+    int tilt;           /* Are we in TILT mode? */
+    mstime_t tilt_start_time;   /* When TITL started. */
+    mstime_t previous_time;     /* Time last time we ran the time handler. */
+} sentinel;
+
+/* ======================= hiredis ae.c adapters =============================
+ * Note: this implementation is taken from hiredis/adapters/ae.h, however
+ * we have our modified copy for Sentinel in order to use our allocator
+ * and to have full control over how the adapter works. */
+
+typedef struct redisAeEvents {
+    redisAsyncContext *context;
+    aeEventLoop *loop;
+    int fd;
+    int reading, writing;
+} redisAeEvents;
+
+static void redisAeReadEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
+    ((void)el); ((void)fd); ((void)mask);
+
+    redisAeEvents *e = (redisAeEvents*)privdata;
+    redisAsyncHandleRead(e->context);
+}
+
+static void redisAeWriteEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
+    ((void)el); ((void)fd); ((void)mask);
+
+    redisAeEvents *e = (redisAeEvents*)privdata;
+    redisAsyncHandleWrite(e->context);
+}
+
+static void redisAeAddRead(void *privdata) {
+    redisAeEvents *e = (redisAeEvents*)privdata;
+    aeEventLoop *loop = e->loop;
+    if (!e->reading) {
+        e->reading = 1;
+        aeCreateFileEvent(loop,e->fd,AE_READABLE,redisAeReadEvent,e);
+    }
+}
+
+static void redisAeDelRead(void *privdata) {
+    redisAeEvents *e = (redisAeEvents*)privdata;
+    aeEventLoop *loop = e->loop;
+    if (e->reading) {
+        e->reading = 0;
+        aeDeleteFileEvent(loop,e->fd,AE_READABLE);
+    }
+}
+
+static void redisAeAddWrite(void *privdata) {
+    redisAeEvents *e = (redisAeEvents*)privdata;
+    aeEventLoop *loop = e->loop;
+    if (!e->writing) {
+        e->writing = 1;
+        aeCreateFileEvent(loop,e->fd,AE_WRITABLE,redisAeWriteEvent,e);
+    }
+}
+
+static void redisAeDelWrite(void *privdata) {
+    redisAeEvents *e = (redisAeEvents*)privdata;
+    aeEventLoop *loop = e->loop;
+    if (e->writing) {
+        e->writing = 0;
+        aeDeleteFileEvent(loop,e->fd,AE_WRITABLE);
+    }
+}
+
+static void redisAeCleanup(void *privdata) {
+    redisAeEvents *e = (redisAeEvents*)privdata;
+    redisAeDelRead(privdata);
+    redisAeDelWrite(privdata);
+    zfree(e);
+}
+
+static int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac) {
+    redisContext *c = &(ac->c);
+    redisAeEvents *e;
+
+    /* Nothing should be attached when something is already attached */
+    if (ac->ev.data != NULL)
+        return REDIS_ERR;
+
+    /* Create container for context and r/w events */
+    e = (redisAeEvents*)zmalloc(sizeof(*e));
+    e->context = ac;
+    e->loop = loop;
+    e->fd = c->fd;
+    e->reading = e->writing = 0;
+
+    /* Register functions to start/stop listening for events */
+    ac->ev.addRead = redisAeAddRead;
+    ac->ev.delRead = redisAeDelRead;
+    ac->ev.addWrite = redisAeAddWrite;
+    ac->ev.delWrite = redisAeDelWrite;
+    ac->ev.cleanup = redisAeCleanup;
+    ac->ev.data = e;
+
+    return REDIS_OK;
+}
+
+/* ============================= Prototypes ================================= */
+
+void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status);
+void sentinelDisconnectCallback(const redisAsyncContext *c, int status);
+void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata);
+sentinelRedisInstance *sentinelGetMasterByName(char *name);
+char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master);
+char *sentinelGetObjectiveLeader(sentinelRedisInstance *master);
+int yesnotoi(char *s);
+void sentinelDisconnectInstanceFromContext(const redisAsyncContext *c);
+const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri);
+
+/* ========================= Dictionary types =============================== */
+
+unsigned int dictSdsHash(const void *key);
+int dictSdsKeyCompare(void *privdata, const void *key1, const void *key2);
+void releaseSentinelRedisInstance(sentinelRedisInstance *ri);
+
+void dictInstancesValDestructor (void *privdata, void *obj) {
+    releaseSentinelRedisInstance(obj);
+}
+
+/* Instance name (sds) -> instance (sentinelRedisInstance pointer)
+ *
+ * also used for: sentinelRedisInstance->sentinels dictionary that maps
+ * sentinels ip:port to last seen time in Pub/Sub hello message. */
+dictType instancesDictType = {
+    dictSdsHash,               /* hash function */
+    NULL,                      /* key dup */
+    NULL,                      /* val dup */
+    dictSdsKeyCompare,         /* key compare */
+    NULL,                      /* key destructor */
+    dictInstancesValDestructor /* val destructor */
+};
+
+/* Instance runid (sds) -> votes (long casted to void*)
+ *
+ * This is useful into sentinelGetObjectiveLeader() function in order to
+ * count the votes and understand who is the leader. */
+dictType leaderVotesDictType = {
+    dictSdsHash,               /* hash function */
+    NULL,                      /* key dup */
+    NULL,                      /* val dup */
+    dictSdsKeyCompare,         /* key compare */
+    NULL,                      /* key destructor */
+    NULL                       /* val destructor */
+};
+
+/* =========================== Initialization =============================== */
+
+void sentinelCommand(redisClient *c);
+
+struct redisCommand sentinelcmds[] = {
+    {"ping",pingCommand,1,"",0,NULL,0,0,0,0,0},
+    {"sentinel",sentinelCommand,-2,"",0,NULL,0,0,0,0,0},
+    {"subscribe",subscribeCommand,-2,"",0,NULL,0,0,0,0,0},
+    {"unsubscribe",unsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
+    {"psubscribe",psubscribeCommand,-2,"",0,NULL,0,0,0,0,0},
+    {"punsubscribe",punsubscribeCommand,-1,"",0,NULL,0,0,0,0,0}
+};
+
+/* This function overwrites a few normal Redis config default with Sentinel
+ * specific defaults. */
+void initSentinelConfig(void) {
+    server.port = REDIS_SENTINEL_PORT;
+}
+
+/* Perform the Sentinel mode initialization. */
+void initSentinel(void) {
+    int j;
+
+    /* Remove usual Redis commands from the command table, then just add
+     * the SENTINEL command. */
+    dictEmpty(server.commands);
+    for (j = 0; j < sizeof(sentinelcmds)/sizeof(sentinelcmds[0]); j++) {
+        int retval;
+        struct redisCommand *cmd = sentinelcmds+j;
+
+        retval = dictAdd(server.commands, sdsnew(cmd->name), cmd);
+        redisAssert(retval == DICT_OK);
+    }
+
+    /* Initialize various data structures. */
+    sentinel.masters = dictCreate(&instancesDictType,NULL);
+    sentinel.tilt = 0;
+    sentinel.tilt_start_time = mstime();
+    sentinel.previous_time = mstime();
+}
+
+/* ============================== sentinelAddr ============================== */
+
+/* Create a sentinelAddr object and return it on success.
+ * On error NULL is returned and errno is set to:
+ *  ENOENT: Can't resolve the hostname.
+ *  EINVAL: Invalid port number.
+ */
+sentinelAddr *createSentinelAddr(char *hostname, int port) {
+    char buf[32];
+    sentinelAddr *sa;
+
+    if (port <= 0 || port > 65535) {
+        errno = EINVAL;
+        return NULL;
+    }
+    if (anetResolve(NULL,hostname,buf) == ANET_ERR) {
+        errno = ENOENT;
+        return NULL;
+    }
+    sa = zmalloc(sizeof(*sa));
+    sa->ip = sdsnew(buf);
+    sa->port = port;
+    return sa;
+}
+
+/* Free a Sentinel address. Can't fail. */
+void releaseSentinelAddr(sentinelAddr *sa) {
+    sdsfree(sa->ip);
+    zfree(sa);
+}
+
+/* =========================== Events notification ========================== */
+
+void sentinelCallNotificationScript(char *scriptpath, char *type, char *msg) {
+    /* TODO: implement it. */
+}
+
+/* Send an event to log, pub/sub, user notification script.
+ * 
+ * 'level' is the log level for logging. Only REDIS_WARNING events will trigger
+ * the execution of the user notification script.
+ *
+ * 'type' is the message type, also used as a pub/sub channel name.
+ *
+ * 'ri', is the redis instance target of this event if applicable, and is
+ * used to obtain the path of the notification script to execute.
+ *
+ * The remaining arguments are printf-alike.
+ * If the format specifier starts with the two characters "%@" then ri is
+ * not NULL, and the message is prefixed with an instance identifier in the
+ * following format:
+ *
+ *  <instance type> <instance name> <ip> <port>
+ *
+ *  If the instance type is not master, than the additional string is
+ *  added to specify the originating master:
+ *
+ *  @ <master name> <master ip> <master port>
+ *
+ *  Any other specifier after "%@" is processed by printf itself.
+ */
+void sentinelEvent(int level, char *type, sentinelRedisInstance *ri,
+                   const char *fmt, ...) {
+    va_list ap;
+    char msg[REDIS_MAX_LOGMSG_LEN];
+    robj *channel, *payload;
+
+    /* Handle %@ */
+    if (fmt[0] == '%' && fmt[1] == '@') {
+        sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
+                                         NULL : ri->master;
+
+        if (master) {
+            snprintf(msg, sizeof(msg), "%s %s %s %d @ %s %s %d",
+                sentinelRedisInstanceTypeStr(ri),
+                ri->name, ri->addr->ip, ri->addr->port,
+                master->name, master->addr->ip, master->addr->port);
+        } else {
+            snprintf(msg, sizeof(msg), "%s %s %s %d",
+                sentinelRedisInstanceTypeStr(ri),
+                ri->name, ri->addr->ip, ri->addr->port);
+        }
+        fmt += 2;
+    } else {
+        msg[0] = '\0';
+    }
+
+    /* Use vsprintf for the rest of the formatting if any. */
+    if (fmt[0] != '\0') {
+        va_start(ap, fmt);
+        vsnprintf(msg+strlen(msg), sizeof(msg)-strlen(msg), fmt, ap);
+        va_end(ap);
+    }
+
+    /* Log the message if the log level allows it to be logged. */
+    if (level >= server.verbosity)
+        redisLog(level,"%s %s",type,msg);
+
+    /* Publish the message via Pub/Sub if it's not a debugging one. */
+    if (level != REDIS_DEBUG) {
+        channel = createStringObject(type,strlen(type));
+        payload = createStringObject(msg,strlen(msg));
+        pubsubPublishMessage(channel,payload);
+        decrRefCount(channel);
+        decrRefCount(payload);
+    }
+
+    /* Call the notification script if applicable. */
+    if (level == REDIS_WARNING && ri != NULL) {
+        sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
+                                         ri : ri->master;
+        if (master->notify_script) {
+            sentinelCallNotificationScript(master->notify_script,type,msg);
+        }
+    }
+}
+
+/* ========================== sentinelRedisInstance ========================= */
+
+/* Create a redis instance, the following fields must be populated by the
+ * caller if needed:
+ * runid: set to NULL but will be populated once INFO output is received.
+ * info_refresh: is set to 0 to mean that we never received INFO so far.
+ *
+ * If SRI_MASTER is set into initial flags the instance is added to
+ * sentinel.masters table.
+ *
+ * if SRI_SLAVE or SRI_SENTINEL is set then 'master' must be not NULL and the
+ * instance is added into master->slaves or master->sentinels table.
+ *
+ * If the instance is a slave or sentinel, the name parameter is ignored and
+ * is created automatically as hostname:port.
+ *
+ * The function fails if hostname can't be resolved or port is out of range.
+ * When this happens NULL is returned and errno is set accordingly to the
+ * createSentinelAddr() function.
+ *
+ * The function may also fail and return NULL with errno set to EBUSY if
+ * a master or slave with the same name already exists. */
+sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char *hostname, int port, int quorum, sentinelRedisInstance *master) {
+    sentinelRedisInstance *ri;
+    sentinelAddr *addr;
+    dict *table;
+    char slavename[128], *sdsname;
+
+    redisAssert(flags & (SRI_MASTER|SRI_SLAVE|SRI_SENTINEL));
+    redisAssert((flags & SRI_MASTER) || master != NULL);
+
+    /* Check address validity. */
+    addr = createSentinelAddr(hostname,port);
+    if (addr == NULL) return NULL;
+
+    /* For slaves and sentinel we use ip:port as name. */
+    if (flags & (SRI_SLAVE|SRI_SENTINEL)) {
+        snprintf(slavename,sizeof(slavename),"%s:%d",hostname,port);
+        name = slavename;
+    }
+
+    /* Make sure the entry is not duplicated. This may happen when the same
+     * name for a master is used multiple times inside the configuration or
+     * if we try to add multiple times a slave or sentinel with same ip/port
+     * to a master. */
+    if (flags & SRI_MASTER) table = sentinel.masters;
+    else if (flags & SRI_SLAVE) table = master->slaves;
+    else if (flags & SRI_SENTINEL) table = master->sentinels;
+    sdsname = sdsnew(name);
+    if (dictFind(table,sdsname)) {
+        sdsfree(sdsname);
+        errno = EBUSY;
+        return NULL;
+    }
+
+    /* Create the instance object. */
+    ri = zmalloc(sizeof(*ri));
+    /* Note that all the instances are started in the disconnected state,
+     * the event loop will take care of connecting them. */
+    ri->flags = flags | SRI_DISCONNECTED;
+    ri->name = sdsname;
+    ri->runid = NULL;
+    ri->addr = addr;
+    ri->cc = NULL;
+    ri->pc = NULL;
+    ri->pending_commands = 0;
+    ri->cc_conn_time = 0;
+    ri->pc_conn_time = 0;
+    ri->pc_last_activity = 0;
+    ri->last_avail_time = mstime();
+    ri->last_pong_time = mstime();
+    ri->last_pub_time = mstime();
+    ri->last_hello_time = mstime();
+    ri->last_master_down_reply_time = mstime();
+    ri->s_down_since_time = 0;
+    ri->o_down_since_time = 0;
+    ri->down_after_period = master ? master->down_after_period :
+                            SENTINEL_DOWN_AFTER_PERIOD;
+    ri->master_link_down_time = 0;
+    ri->slave_priority = SENTINEL_DEFAULT_SLAVE_PRIORITY;
+    ri->slave_reconf_sent_time = 0;
+    ri->slave_master_host = NULL;
+    ri->slave_master_port = 0;
+    ri->slave_master_link_status = SENTINEL_MASTER_LINK_STATUS_DOWN;
+    ri->sentinels = dictCreate(&instancesDictType,NULL);
+    ri->quorum = quorum;
+    ri->parallel_syncs = SENTINEL_DEFAULT_PARALLEL_SYNCS;
+    ri->master = master;
+    ri->slaves = dictCreate(&instancesDictType,NULL);
+    ri->info_refresh = 0;
+
+    /* Failover state. */
+    ri->leader = NULL;
+    ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
+    ri->failover_state_change_time = 0;
+    ri->failover_start_time = 0;
+    ri->failover_timeout = SENTINEL_DEFAULT_FAILOVER_TIMEOUT;
+    ri->promoted_slave = NULL;
+    ri->notify_script = NULL;
+    ri->client_reconfig_script = NULL;
+
+    /* Add into the right table. */
+    dictAdd(table, ri->name, ri);
+    return ri;
+}
+
+/* Release this instance and all its slaves, sentinels, hiredis connections.
+ * This function also takes care of unlinking the instance from the main
+ * masters table (if it is a master) or from its master sentinels/slaves table
+ * if it is a slave or sentinel. */
+void releaseSentinelRedisInstance(sentinelRedisInstance *ri) {
+    /* Release all its slaves or sentinels if any. */
+    dictRelease(ri->sentinels);
+    dictRelease(ri->slaves);
+
+    /* Release hiredis connections. Note that redisAsyncFree() will call
+     * the disconnection callback. */
+    if (ri->cc) {
+        redisAsyncFree(ri->cc);
+        ri->cc = NULL;
+    }
+    if (ri->pc) {
+        redisAsyncFree(ri->pc);
+        ri->pc = NULL;
+    }
+
+    /* Free other resources. */
+    sdsfree(ri->name);
+    sdsfree(ri->runid);
+    sdsfree(ri->notify_script);
+    sdsfree(ri->client_reconfig_script);
+    sdsfree(ri->slave_master_host);
+    sdsfree(ri->leader);
+    releaseSentinelAddr(ri->addr);
+
+    /* Clear state into the master if needed. */
+    if ((ri->flags & SRI_SLAVE) && (ri->flags & SRI_PROMOTED) && ri->master)
+        ri->master->promoted_slave = NULL;
+
+    zfree(ri);
+}
+
+/* Lookup a slave in a master Redis instance, by ip and port. */
+sentinelRedisInstance *sentinelRedisInstanceLookupSlave(
+                sentinelRedisInstance *ri, char *ip, int port)
+{
+    sds key;
+    sentinelRedisInstance *slave;
+  
+    redisAssert(ri->flags & SRI_MASTER);
+    key = sdscatprintf(sdsempty(),"%s:%d",ip,port);
+    slave = dictFetchValue(ri->slaves,key);
+    sdsfree(key);
+    return slave;
+}
+
+/* Return the name of the type of the instance as a string. */
+const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri) {
+    if (ri->flags & SRI_MASTER) return "master";
+    else if (ri->flags & SRI_SLAVE) return "slave";
+    else if (ri->flags & SRI_SENTINEL) return "sentinel";
+    else return "unknown";
+}
+
+/* This function removes all the instances found in the dictionary of instances
+ * 'd', having either:
+ * 
+ * 1) The same ip/port as specified.
+ * 2) The same runid.
+ *
+ * "1" and "2" don't need to verify at the same time, just one is enough.
+ * If "runid" is NULL it is not checked.
+ * Similarly if "ip" is NULL it is not checked.
+ *
+ * This function is useful because every time we add a new Sentinel into
+ * a master's Sentinels dictionary, we want to be very sure about not
+ * having duplicated instances for any reason. This is so important because
+ * we use those other sentinels in order to run our quorum protocol to
+ * understand if it's time to proceeed with the fail over.
+ *
+ * Making sure no duplication is possible we greately improve the robustness
+ * of the quorum (otherwise we may end counting the same instance multiple
+ * times for some reason).
+ *
+ * The function returns the number of Sentinels removed. */
+int removeMatchingSentinelsFromMaster(sentinelRedisInstance *master, char *ip, int port, char *runid) {
+    dictIterator *di;
+    dictEntry *de;
+    int removed = 0;
+
+    di = dictGetSafeIterator(master->sentinels);
+    while((de = dictNext(di)) != NULL) {
+        sentinelRedisInstance *ri = dictGetVal(de);
+
+        if ((ri->runid && runid && strcmp(ri->runid,runid) == 0) ||
+            (ip && strcmp(ri->addr->ip,ip) == 0 && port == ri->addr->port))
+        {
+            dictDelete(master->sentinels,ri->name);
+            removed++;
+        }
+    }
+    dictReleaseIterator(di);
+    return removed;
+}
+
+/* Search an instance with the same runid, ip and port into a dictionary
+ * of instances. Return NULL if not found, otherwise return the instance
+ * pointer.
+ *
+ * runid or ip can be NULL. In such a case the search is performed only
+ * by the non-NULL field. */
+sentinelRedisInstance *getSentinelRedisInstanceByAddrAndRunID(dict *instances, char *ip, int port, char *runid) {
+    dictIterator *di;
+    dictEntry *de;
+    sentinelRedisInstance *instance = NULL;
+
+    redisAssert(ip || runid);   /* User must pass at least one search param. */
+    di = dictGetIterator(instances);
+    while((de = dictNext(di)) != NULL) {
+        sentinelRedisInstance *ri = dictGetVal(de);
+
+        if (runid && !ri->runid) continue;
+        if ((runid == NULL || strcmp(ri->runid, runid) == 0) &&
+            (ip == NULL || (strcmp(ri->addr->ip, ip) == 0 &&
+                            ri->addr->port == port)))
+        {
+            instance = ri;
+            break;
+        }
+    }
+    dictReleaseIterator(di);
+    return instance;
+}
+
+/* Simple master lookup by name */
+sentinelRedisInstance *sentinelGetMasterByName(char *name) {
+    sentinelRedisInstance *ri;
+    sds sdsname = sdsnew(name);
+
+    ri = dictFetchValue(sentinel.masters,sdsname);
+    sdsfree(sdsname);
+    return ri;
+}
+
+/* Add the specified flags to all the instances in the specified dictionary. */
+void sentinelAddFlagsToDictOfRedisInstances(dict *instances, int flags) {
+    dictIterator *di;
+    dictEntry *de;
+
+    di = dictGetIterator(instances);
+    while((de = dictNext(di)) != NULL) {
+        sentinelRedisInstance *ri = dictGetVal(de);
+        ri->flags |= flags;
+    }
+    dictReleaseIterator(di);
+}
+
+/* Remove the specified flags to all the instances in the specified
+ * dictionary. */
+void sentinelDelFlagsToDictOfRedisInstances(dict *instances, int flags) {
+    dictIterator *di;
+    dictEntry *de;
+
+    di = dictGetIterator(instances);
+    while((de = dictNext(di)) != NULL) {
+        sentinelRedisInstance *ri = dictGetVal(de);
+        ri->flags &= ~flags;
+    }
+    dictReleaseIterator(di);
+}
+
+/* Reset the state of a monitored master:
+ * 1) Remove all slaves.
+ * 2) Remove all sentinels.
+ * 3) Remove most of the flags resulting from runtime operations.
+ * 4) Reset timers to their default value.
+ * 5) In the process of doing this undo the failover if in progress.
+ * 6) Disconnect the connections with the master (will reconnect automatically).
+ */
+void sentinelResetMaster(sentinelRedisInstance *ri) {
+    redisAssert(ri->flags & SRI_MASTER);
+    dictRelease(ri->slaves);
+    dictRelease(ri->sentinels);
+    ri->slaves = dictCreate(&instancesDictType,NULL);
+    ri->sentinels = dictCreate(&instancesDictType,NULL);
+    if (ri->cc) redisAsyncFree(ri->cc);
+    if (ri->pc) redisAsyncFree(ri->pc);
+    ri->flags &= SRI_MASTER|SRI_CAN_FAILOVER|SRI_DISCONNECTED;
+    if (ri->leader) {
+        sdsfree(ri->leader);
+        ri->leader = NULL;
+    }
+    ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
+    ri->failover_state_change_time = 0;
+    ri->failover_start_time = 0;
+    ri->promoted_slave = NULL;
+    sentinelEvent(REDIS_WARNING,"+reset-master",ri,"%@");
+}
+
+/* Call sentinelResetMaster() on every master with a name matching the specified
+ * pattern. */
+int sentinelResetMastersByPattern(char *pattern) {
+    dictIterator *di;
+    dictEntry *de;
+    int reset = 0;
+
+    di = dictGetIterator(sentinel.masters);
+    while((de = dictNext(di)) != NULL) {
+        sentinelRedisInstance *ri = dictGetVal(de);
+
+        if (ri->name) {
+            if (stringmatch(pattern,ri->name,0)) {
+                sentinelResetMaster(ri);
+                reset++;
+            }
+        }
+    }
+    dictReleaseIterator(di);
+    return reset;
+}
+
+/* ============================ Config handling ============================= */
+char *sentinelHandleConfiguration(char **argv, int argc) {
+    sentinelRedisInstance *ri;
+
+    if (!strcasecmp(argv[0],"monitor") && argc == 5) {
+        /* monitor <name> <host> <port> <quorum> */
+        int quorum = atoi(argv[4]);
+
+        if (quorum <= 0) return "Quorum must be 1 or greater.";
+        if (createSentinelRedisInstance(argv[1],SRI_MASTER,argv[2],
+                                        atoi(argv[3]),quorum,NULL) == NULL)
+        {
+            switch(errno) {
+            case EBUSY: return "Duplicated master name.";
+            case ENOENT: return "Can't resolve master instance hostname.";
+            case EINVAL: return "Invalid port number";
+            }
+        }
+    } else if (!strcasecmp(argv[0],"down-after-milliseconds") && argc == 3) {
+        /* down-after-milliseconds <name> <milliseconds> */
+        ri = sentinelGetMasterByName(argv[1]);
+        if (!ri) return "No such master with specified name.";
+        ri->down_after_period = atoi(argv[2]);
+        if (ri->down_after_period <= 0)
+            return "negative or zero time parameter.";
+    } else if (!strcasecmp(argv[0],"failover-timeout") && argc == 3) {
+        /* failover-timeout <name> <milliseconds> */
+        ri = sentinelGetMasterByName(argv[1]);
+        if (!ri) return "No such master with specified name.";
+        ri->failover_timeout = atoi(argv[2]);
+        if (ri->failover_timeout <= 0)
+            return "negative or zero time parameter.";
+    } else if (!strcasecmp(argv[0],"can-failover") && argc == 3) {
+        /* can-failover <name> <yes/no> */
+        int yesno = yesnotoi(argv[2]);
+
+        ri = sentinelGetMasterByName(argv[1]);
+        if (!ri) return "No such master with specified name.";
+        if (yesno == -1) return "Argument must be either yes or no.";
+        if (yesno)
+            ri->flags |= SRI_CAN_FAILOVER;
+        else
+            ri->flags &= ~SRI_CAN_FAILOVER;
+   } else if (!strcasecmp(argv[0],"parallel-syncs") && argc == 3) {
+        /* parallel-syncs <name> <milliseconds> */
+        ri = sentinelGetMasterByName(argv[1]);
+        if (!ri) return "No such master with specified name.";
+        ri->parallel_syncs = atoi(argv[2]);
+    } else {
+        return "Unrecognized sentinel configuration statement.";
+    }
+    return NULL;
+}
+
+/* ====================== hiredis connection handling ======================= */
+
+/* This function takes an hiredis context that is in an error condition
+ * and make sure to mark the instance as disconnected performing the
+ * cleanup needed.
+ *
+ * Note: we don't free the hiredis context as hiredis will do it for us
+ * for async conenctions. */
+void sentinelDisconnectInstanceFromContext(const redisAsyncContext *c) {
+    sentinelRedisInstance *ri = c->data;
+    int pubsub = (ri->pc == c);
+
+    sentinelEvent(REDIS_DEBUG, pubsub ? "-pubsub-link" : "-cmd-link", ri,
+        "%@ #%s", c->errstr);
+    if (pubsub)
+        ri->pc = NULL;
+    else
+        ri->cc = NULL;
+    ri->flags |= SRI_DISCONNECTED;
+}
+
+void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status) {
+    if (status != REDIS_OK) {
+        sentinelDisconnectInstanceFromContext(c);
+    } else {
+        sentinelRedisInstance *ri = c->data;
+        int pubsub = (ri->pc == c);
+
+        sentinelEvent(REDIS_DEBUG, pubsub ? "+pubsub-link" : "+cmd-link", ri,
+            "%@");
+    }
+}
+
+void sentinelDisconnectCallback(const redisAsyncContext *c, int status) {
+    sentinelDisconnectInstanceFromContext(c);
+}
+
+/* Create the async connections for the specified instance if the instance
+ * is disconnected. Note that the SRI_DISCONNECTED flag is set even if just
+ * one of the two links (commands and pub/sub) is missing. */
+void sentinelReconnectInstance(sentinelRedisInstance *ri) {
+    if (!(ri->flags & SRI_DISCONNECTED)) return;
+
+    /* Commands connection. */
+    if (ri->cc == NULL) {
+        ri->cc = redisAsyncConnect(ri->addr->ip,ri->addr->port);
+        if (ri->cc->err) {
+            sentinelEvent(REDIS_DEBUG,"-cmd-link-reconnection",ri,"%@ #%s",
+                ri->cc->errstr);
+            redisAsyncFree(ri->cc);
+            ri->cc = NULL;
+        } else {
+            ri->cc_conn_time = mstime();
+            ri->cc->data = ri;
+            redisAeAttach(server.el,ri->cc);
+            redisAsyncSetConnectCallback(ri->cc,
+                                            sentinelLinkEstablishedCallback);
+            redisAsyncSetDisconnectCallback(ri->cc,
+                                            sentinelDisconnectCallback);
+        }
+    }
+    /* Pub / Sub */
+    if ((ri->flags & SRI_MASTER) && ri->pc == NULL) {
+        ri->pc = redisAsyncConnect(ri->addr->ip,ri->addr->port);
+        if (ri->pc->err) {
+            sentinelEvent(REDIS_DEBUG,"-pubsub-link-reconnection",ri,"%@ #%s",
+                ri->pc->errstr);
+            redisAsyncFree(ri->pc);
+            ri->pc = NULL;
+        } else {
+            int retval;
+
+            ri->pc_conn_time = mstime();
+            ri->pc->data = ri;
+            redisAeAttach(server.el,ri->pc);
+            redisAsyncSetConnectCallback(ri->pc,
+                                            sentinelLinkEstablishedCallback);
+            redisAsyncSetDisconnectCallback(ri->pc,
+                                            sentinelDisconnectCallback);
+            /* Now we subscribe to the Sentinels "Hello" channel. */
+            retval = redisAsyncCommand(ri->pc,
+                sentinelReceiveHelloMessages, NULL, "SUBSCRIBE %s",
+                    SENTINEL_HELLO_CHANNEL);
+            if (retval != REDIS_OK) {
+                /* If we can't subscribe, the Pub/Sub connection is useless
+                 * and we can simply disconnect it and try again. */
+                redisAsyncFree(ri->pc);
+                ri->pc = NULL;
+                return;
+            }
+        }
+    }
+    /* Clear the DISCONNECTED flags only if we have both the connections
+     * (or just the commands connection if this is a slave or a
+     * sentinel instance). */
+    if (ri->cc && (ri->flags & (SRI_SLAVE|SRI_SENTINEL) || ri->pc))
+        ri->flags &= ~SRI_DISCONNECTED;
+}
+
+/* ======================== Redis instances pinging  ======================== */
+
+/* Process the INFO output from masters. */
+void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
+    sds *lines;
+    int numlines, j;
+    int role = 0;
+
+
+    /* The following fields must be reset to a given value in the case they
+     * are not found at all in the INFO output. */
+    ri->master_link_down_time = 0;
+
+    /* Process line by line. */
+    lines = sdssplitlen(info,strlen(info),"\r\n",2,&numlines);
+    for (j = 0; j < numlines; j++) {
+        sentinelRedisInstance *slave;
+        sds l = lines[j];
+
+        /* run_id:<40 hex chars>*/
+        if (sdslen(l) >= 47 && !memcmp(l,"run_id:",7)) {
+            if (ri->runid == NULL) {
+                ri->runid = sdsnewlen(l+7,40);
+            } else {
+                /* TODO: check if run_id has changed. This means the
+                 * instance has been restarted, we want to set a flag
+                 * and notify this event. */
+            }
+        }
+
+        /* slave0:<ip>,<port>,<state> */
+        if ((ri->flags & SRI_MASTER) &&
+            sdslen(l) >= 7 &&
+            !memcmp(l,"slave",5) && isdigit(l[5]))
+        {
+            char *ip, *port, *end;
+
+            ip = strchr(l,':'); if (!ip) continue;
+            ip++; /* Now ip points to start of ip address. */
+            port = strchr(ip,','); if (!port) continue;
+            *port = '\0'; /* nul term for easy access. */
+            port++; /* Now port points to start of port number. */
+            end = strchr(port,','); if (!end) continue;
+            *end = '\0'; /* nul term for easy access. */
+
+            /* Check if we already have this slave into our table,
+             * otherwise add it. */
+            if (sentinelRedisInstanceLookupSlave(ri,ip,atoi(port)) == NULL) {
+                if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,ip,
+                            atoi(port), ri->quorum,ri)) != NULL)
+                {
+                    sentinelEvent(REDIS_NOTICE,"+slave",slave,"%@");
+                }
+            }
+        }
+
+        /* master_link_down_since_seconds:<seconds> */
+        if (sdslen(l) >= 32 &&
+            !memcmp(l,"master_link_down_since_seconds",30))
+        {
+            ri->master_link_down_time = strtoll(l+31,NULL,10)*1000;
+        }
+
+        /* role:<role> */
+        if (!memcmp(l,"role:master",11)) role = SRI_MASTER;
+        else if (!memcmp(l,"role:slave",10)) role = SRI_SLAVE;
+
+        if (role == SRI_SLAVE) {
+            /* master_host:<host> */
+            if (sdslen(l) >= 12 && !memcmp(l,"master_host:",12)) {
+                sdsfree(ri->slave_master_host);
+                ri->slave_master_host = sdsnew(l+12);
+            }
+
+            /* master_port:<port> */
+            if (sdslen(l) >= 12 && !memcmp(l,"master_port:",12))
+                ri->slave_master_port = atoi(l+12);
+            
+            /* master_link_status:<status> */
+            if (sdslen(l) >= 19 && !memcmp(l,"master_link_status:",19)) {
+                ri->slave_master_link_status =
+                    (strcasecmp(l+19,"up") == 0) ?
+                    SENTINEL_MASTER_LINK_STATUS_UP :
+                    SENTINEL_MASTER_LINK_STATUS_DOWN;
+            }
+        }
+    }
+    ri->info_refresh = mstime();
+    sdsfreesplitres(lines,numlines);
+
+    if (sentinel.tilt) return;
+
+    /* Act if a slave turned into a master. */
+    if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) {
+        if (ri->flags & SRI_PROMOTED) {
+            /* If this is a promoted slave we can change state to the
+             * failover state machine. */
+            if (ri->master &&
+                (ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
+                (ri->master->flags & SRI_I_AM_THE_LEADER) &&
+                (ri->master->failover_state ==
+                    SENTINEL_FAILOVER_STATE_WAIT_PROMOTION))
+            {
+                ri->master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
+                ri->master->failover_state_change_time = mstime();
+                sentinelEvent(REDIS_WARNING,"+promoted-slave",ri,"%@");
+                sentinelEvent(REDIS_WARNING,"+failover-state-reconf-slaves",
+                    ri->master,"%@");
+            }
+        } else {
+            /* Otherwise we interpret this as the start of the failover. */
+            if (ri->master &&
+                (ri->master->flags & SRI_FAILOVER_IN_PROGRESS) == 0)
+            {
+                ri->master->flags |= SRI_FAILOVER_IN_PROGRESS;
+                sentinelEvent(REDIS_WARNING,"failover-detected",ri->master,"%@");
+                ri->master->failover_state = SENTINEL_FAILOVER_STATE_DETECT_END;
+                ri->master->failover_state_change_time = mstime();
+                ri->master->promoted_slave = ri;
+                ri->flags |= SRI_PROMOTED;
+                /* We are an observer, so we can only assume that the leader
+                 * is reconfiguring the slave instances. For this reason we
+                 * set all the instances as RECONF_SENT waiting for progresses
+                 * on this side. */
+                sentinelAddFlagsToDictOfRedisInstances(ri->master->slaves,
+                    SRI_RECONF_SENT);
+            }
+        }
+    }
+
+    /* Detect if the slave that is in the process of being reconfigured
+     * changed state. */
+    if ((ri->flags & SRI_SLAVE) && role == SRI_SLAVE &&
+        (ri->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)))
+    {
+        /* SRI_RECONF_SENT -> SRI_RECONF_INPROG. */
+        if ((ri->flags & SRI_RECONF_SENT) &&
+            ri->slave_master_host &&
+            strcmp(ri->slave_master_host,
+                    ri->master->promoted_slave->addr->ip) == 0 &&
+            ri->slave_master_port == ri->master->promoted_slave->addr->port)
+        {
+            ri->flags &= ~SRI_RECONF_SENT;
+            ri->flags |= SRI_RECONF_INPROG;
+            sentinelEvent(REDIS_NOTICE,"+slave-reconf-inprog",ri,"%@");
+        }
+
+        /* SRI_RECONF_INPROG -> SRI_RECONF_DONE */
+        if ((ri->flags & SRI_RECONF_INPROG) &&
+            ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP)
+        {
+            ri->flags &= ~SRI_RECONF_INPROG;
+            ri->flags |= SRI_RECONF_DONE;
+            sentinelEvent(REDIS_NOTICE,"+slave-reconf-done",ri,"%@");
+            /* If we are moving forward (a new slave is now configured)
+             * we update the change_time as we are conceptually passing
+             * to the next slave. */
+            ri->failover_state_change_time = mstime();
+        }
+    }
+}
+
+void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
+    sentinelRedisInstance *ri = c->data;
+    redisReply *r;
+
+    ri->pending_commands--;
+    if (!reply) return;
+    r = reply;
+
+    if (r->type == REDIS_REPLY_STRING) {
+        sentinelRefreshInstanceInfo(ri,r->str);
+    }
+}
+
+/* Just discard the reply. We use this when we are not monitoring the return
+ * value of the command but its effects directly. */
+void sentinelDiscardReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
+    sentinelRedisInstance *ri = c->data;
+
+    ri->pending_commands--;
+}
+
+void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
+    sentinelRedisInstance *ri = c->data;
+    redisReply *r;
+
+    ri->pending_commands--;
+    if (!reply) return;
+    r = reply;
+
+    if (r->type == REDIS_REPLY_STATUS ||
+        r->type == REDIS_REPLY_ERROR) {
+        /* Update the "instance available" field only if this is an
+         * acceptable reply. */
+        if (strncmp(r->str,"PONG",4) == 0 ||
+            strncmp(r->str,"LOADING",7) == 0 ||
+            strncmp(r->str,"MASTERDOWN",10) == 0)
+        {
+            ri->last_avail_time = mstime();
+        }
+    }
+    ri->last_pong_time = mstime();
+}
+
+/* This is called when we get the reply about the PUBLISH command we send
+ * to the master to advertise this sentinel. */
+void sentinelPublishReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
+    sentinelRedisInstance *ri = c->data;
+    redisReply *r;
+
+    ri->pending_commands--;
+    if (!reply) return;
+    r = reply;
+
+    /* Only update pub_time if we actually published our message. Otherwise
+     * we'll retry against in 100 milliseconds. */
+    if (r->type != REDIS_REPLY_ERROR)
+        ri->last_pub_time = mstime();
+}
+
+/* This is our Pub/Sub callback for the Hello channel. It's useful in order
+ * to discover other sentinels attached at the same master. */
+void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata) {
+    sentinelRedisInstance *ri = c->data;
+    redisReply *r;
+
+    if (!reply) return;
+    r = reply;
+
+    /* Update the last activity in the pubsub channel. Note that since we
+     * receive our messages as well this timestamp can be used to detect
+     * if the link is probably diconnected even if it seems otherwise. */
+    ri->pc_last_activity = mstime();
+   
+    /* Sanity check in the reply we expect, so that the code that follows
+     * can avoid to check for details. */
+    if (r->type != REDIS_REPLY_ARRAY ||
+        r->elements != 3 ||
+        r->element[0]->type != REDIS_REPLY_STRING ||
+        r->element[1]->type != REDIS_REPLY_STRING ||
+        r->element[2]->type != REDIS_REPLY_STRING ||
+        strcmp(r->element[0]->str,"message") != 0) return;
+
+    /* We are not interested in meeting ourselves */
+    if (strstr(r->element[2]->str,server.runid) != NULL) return;
+
+    {
+        int numtokens, port, removed, canfailover;
+        char **token = sdssplitlen(r->element[2]->str,
+                                   r->element[2]->len,
+                                   ":",1,&numtokens);
+        sentinelRedisInstance *sentinel;
+
+        if (numtokens == 4) {
+            /* First, try to see if we already have this sentinel. */
+            port = atoi(token[1]);
+            canfailover = atoi(token[3]);
+            sentinel = getSentinelRedisInstanceByAddrAndRunID(
+                            ri->sentinels,token[0],port,token[2]);
+
+            if (!sentinel) {
+                /* If not, remove all the sentinels that have the same runid
+                 * OR the same ip/port, because it's either a restart or a
+                 * network topology change. */
+                removed = removeMatchingSentinelsFromMaster(ri,token[0],port,
+                                token[2]);
+                if (removed) {
+                    sentinelEvent(REDIS_NOTICE,"-dup-sentinel",ri,
+                        "%@ #duplicate of %s:%d or %s",
+                        token[0],port,token[2]);
+                }
+
+                /* Add the new sentinel. */
+                sentinel = createSentinelRedisInstance(NULL,SRI_SENTINEL,
+                                token[0],port,ri->quorum,ri);
+                if (sentinel) {
+                    sentinelEvent(REDIS_NOTICE,"+sentinel",sentinel,"%@");
+                    /* The runid is NULL after a new instance creation and
+                     * for Sentinels we don't have a later chance to fill it,
+                     * so do it now. */
+                    sentinel->runid = sdsnew(token[2]);
+                }
+            }
+
+            /* Update the state of the Sentinel. */
+            if (sentinel) {
+                sentinel->last_hello_time = mstime();
+                if (canfailover)
+                    sentinel->flags |= SRI_CAN_FAILOVER;
+                else
+                    sentinel->flags &= ~SRI_CAN_FAILOVER;
+            }
+        }
+        sdsfreesplitres(token,numtokens);
+    }
+}
+
+void sentinelPingInstance(sentinelRedisInstance *ri) {
+    mstime_t now = mstime();
+    mstime_t info_period;
+    int retval;
+
+    /* Return ASAP if we have already a PING or INFO already pending, or
+     * in the case the instance is not properly connected. */
+    if (ri->flags & SRI_DISCONNECTED) return;
+
+    /* For INFO, PING, PUBLISH that are not critical commands to send we
+     * also have a limit of SENTINEL_MAX_PENDING_COMMANDS. We don't
+     * want to use a lot of memory just because a link is not working
+     * properly (note that anyway there is a redundant protection about this,
+     * that is, the link will be disconnected and reconnected if a long
+     * timeout condition is detected. */
+    if (ri->pending_commands >= SENTINEL_MAX_PENDING_COMMANDS) return;
+
+    /* If this is a slave of a master in O_DOWN condition we start sending
+     * it INFO every second, instead of the usual SENTINEL_INFO_PERIOD
+     * period. In this state we want to closely monitor slaves in case they
+     * are turned into masters by another Sentinel, or by the sysadmin. */
+    if ((ri->flags & SRI_SLAVE) &&
+        (ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) {
+        info_period = 1000;
+    } else {
+        info_period = SENTINEL_INFO_PERIOD;
+    }
+
+    if ((ri->flags & SRI_SENTINEL) == 0 &&
+        (ri->info_refresh == 0 ||
+        (now - ri->info_refresh) > info_period))
+    {
+        /* Send INFO to masters and slaves, not sentinels. */
+        retval = redisAsyncCommand(ri->cc,
+            sentinelInfoReplyCallback, NULL, "INFO");
+        if (retval != REDIS_OK) return;
+        ri->pending_commands++;
+    } else if ((now - ri->last_pong_time) > SENTINEL_PING_PERIOD) {
+        /* Send PING to all the three kinds of instances. */
+        retval = redisAsyncCommand(ri->cc,
+            sentinelPingReplyCallback, NULL, "PING");
+        if (retval != REDIS_OK) return;
+        ri->pending_commands++;
+    } else if ((ri->flags & SRI_MASTER) &&
+               (now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD)
+    {
+        /* PUBLISH hello messages only to masters. */
+        struct sockaddr_in sa;
+        socklen_t salen = sizeof(sa);
+
+        if (getsockname(ri->cc->c.fd,(struct sockaddr*)&sa,&salen) != -1) {
+            char myaddr[128];
+
+            snprintf(myaddr,sizeof(myaddr),"%s:%d:%s:%d",
+                inet_ntoa(sa.sin_addr), server.port, server.runid,
+                (ri->flags & SRI_CAN_FAILOVER) != 0);
+            retval = redisAsyncCommand(ri->cc,
+                sentinelPublishReplyCallback, NULL, "PUBLISH %s %s",
+                    SENTINEL_HELLO_CHANNEL,myaddr);
+            if (retval != REDIS_OK) return;
+            ri->pending_commands++;
+        }
+    }
+}
+
+/* =========================== SENTINEL command ============================= */
+
+const char *sentinelFailoverStateStr(int state) {
+    switch(state) {
+    case SENTINEL_FAILOVER_STATE_NONE: return "none";
+    case SENTINEL_FAILOVER_STATE_WAIT_START: return "wait_start";
+    case SENTINEL_FAILOVER_STATE_SELECT_SLAVE: return "select_slave";
+    case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE: return "send_slaveof_noone";
+    case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION: return "wait_promotion";
+    case SENTINEL_FAILOVER_STATE_RECONF_SLAVES: return "reconf_slaves";
+    case SENTINEL_FAILOVER_STATE_ALERT_CLIENTS: return "alert_clients";
+    case SENTINEL_FAILOVER_STATE_DETECT_END: return "detect_end";
+    case SENTINEL_FAILOVER_STATE_UPDATE_CONFIG: return "update_config";
+    default: return "unknown";
+    }
+}
+
+/* Redis instance to Redis protocol representation. */
+void addReplySentinelRedisInstance(redisClient *c, sentinelRedisInstance *ri) {
+    char *flags = sdsempty();
+    void *mbl;
+    int fields = 0;
+
+    mbl = addDeferredMultiBulkLength(c);
+
+    addReplyBulkCString(c,"name");
+    addReplyBulkCString(c,ri->name);
+    fields++;
+
+    addReplyBulkCString(c,"ip");
+    addReplyBulkCString(c,ri->addr->ip);
+    fields++;
+
+    addReplyBulkCString(c,"port");
+    addReplyBulkLongLong(c,ri->addr->port);
+    fields++;
+
+    addReplyBulkCString(c,"runid");
+    addReplyBulkCString(c,ri->runid ? ri->runid : "");
+    fields++;
+
+    addReplyBulkCString(c,"flags");
+    if (ri->flags & SRI_S_DOWN) flags = sdscat(flags,"s_down,");
+    if (ri->flags & SRI_O_DOWN) flags = sdscat(flags,"o_down,");
+    if (ri->flags & SRI_MASTER) flags = sdscat(flags,"master,");
+    if (ri->flags & SRI_SLAVE) flags = sdscat(flags,"slave,");
+    if (ri->flags & SRI_SENTINEL) flags = sdscat(flags,"sentinel,");
+    if (ri->flags & SRI_DISCONNECTED) flags = sdscat(flags,"disconnected,");
+    if (ri->flags & SRI_MASTER_DOWN) flags = sdscat(flags,"master_down,");
+    if (ri->flags & SRI_FAILOVER_IN_PROGRESS)
+        flags = sdscat(flags,"failover_in_progress,");
+    if (ri->flags & SRI_I_AM_THE_LEADER)
+        flags = sdscat(flags,"i_am_the_leader,");
+    if (ri->flags & SRI_PROMOTED) flags = sdscat(flags,"promoted,");
+    if (ri->flags & SRI_RECONF_SENT) flags = sdscat(flags,"reconf_sent,");
+    if (ri->flags & SRI_RECONF_INPROG) flags = sdscat(flags,"reconf_inprog,");
+    if (ri->flags & SRI_RECONF_DONE) flags = sdscat(flags,"reconf_done,");
+
+    if (sdslen(flags) != 0) flags = sdsrange(flags,0,-2); /* remove last "," */
+    addReplyBulkCString(c,flags);
+    sdsfree(flags);
+    fields++;
+
+    addReplyBulkCString(c,"pending-commands");
+    addReplyBulkLongLong(c,ri->pending_commands);
+    fields++;
+
+    if (ri->flags & SRI_FAILOVER_IN_PROGRESS) {
+        addReplyBulkCString(c,"failover-state");
+        addReplyBulkCString(c,(char*)sentinelFailoverStateStr(ri->failover_state));
+        fields++;
+    }
+
+    addReplyBulkCString(c,"last-ok-ping-reply");
+    addReplyBulkLongLong(c,mstime() - ri->last_avail_time);
+    fields++;
+
+    addReplyBulkCString(c,"last-ping-reply");
+    addReplyBulkLongLong(c,mstime() - ri->last_pong_time);
+    fields++;
+
+    if (ri->flags & SRI_S_DOWN) {
+        addReplyBulkCString(c,"s-down-time");
+        addReplyBulkLongLong(c,mstime()-ri->s_down_since_time);
+        fields++;
+    }
+
+    if (ri->flags & SRI_O_DOWN) {
+        addReplyBulkCString(c,"o-down-time");
+        addReplyBulkLongLong(c,mstime()-ri->o_down_since_time);
+        fields++;
+    }
+
+    /* Masters and Slaves */
+    if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
+        addReplyBulkCString(c,"info-refresh");
+        addReplyBulkLongLong(c,mstime() - ri->info_refresh);
+        fields++;
+    }
+
+    /* Only masters */
+    if (ri->flags & SRI_MASTER) {
+        addReplyBulkCString(c,"num-slaves");
+        addReplyBulkLongLong(c,dictSize(ri->slaves));
+        fields++;
+
+        addReplyBulkCString(c,"num-other-sentinels");
+        addReplyBulkLongLong(c,dictSize(ri->sentinels));
+        fields++;
+
+        addReplyBulkCString(c,"quorum");
+        addReplyBulkLongLong(c,ri->quorum);
+        fields++;
+    }
+
+    /* Only slaves */
+    if (ri->flags & SRI_SLAVE) {
+        addReplyBulkCString(c,"master-link-down-time");
+        addReplyBulkLongLong(c,ri->master_link_down_time);
+        fields++;
+
+        addReplyBulkCString(c,"master-link-status");
+        addReplyBulkCString(c,
+            (ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP) ?
+            "ok" : "err");
+        fields++;
+
+        addReplyBulkCString(c,"master-host");
+        addReplyBulkCString(c,
+            ri->slave_master_host ? ri->slave_master_host : "?");
+        fields++;
+
+        addReplyBulkCString(c,"master-port");
+        addReplyBulkLongLong(c,ri->slave_master_port);
+        fields++;
+    }
+
+    /* Only sentinels */
+    if (ri->flags & SRI_SENTINEL) {
+        addReplyBulkCString(c,"last-hello-message");
+        addReplyBulkLongLong(c,mstime() - ri->last_hello_time);
+        fields++;
+
+        addReplyBulkCString(c,"can-failover-its-master");
+        addReplyBulkLongLong(c,(ri->flags & SRI_CAN_FAILOVER) != 0);
+        fields++;
+
+        if (ri->flags & SRI_MASTER_DOWN) {
+            addReplyBulkCString(c,"subjective-leader");
+            addReplyBulkCString(c,ri->leader ? ri->leader : "?");
+            fields++;
+        }
+    }
+
+    setDeferredMultiBulkLength(c,mbl,fields*2);
+}
+
+/* Output a number of instances contanined inside a dictionary as
+ * Redis protocol. */
+void addReplyDictOfRedisInstances(redisClient *c, dict *instances) {
+    dictIterator *di;
+    dictEntry *de;
+
+    di = dictGetIterator(instances);
+    addReplyMultiBulkLen(c,dictSize(instances));
+    while((de = dictNext(di)) != NULL) {
+        sentinelRedisInstance *ri = dictGetVal(de);
+
+        addReplySentinelRedisInstance(c,ri);
+    }
+    dictReleaseIterator(di);
+}
+
+/* Lookup the named master into sentinel.masters.
+ * If the master is not found reply to the client with an error and returns
+ * NULL. */
+sentinelRedisInstance *sentinelGetMasterByNameOrReplyError(redisClient *c,
+                        robj *name)
+{
+    sentinelRedisInstance *ri;
+
+    ri = dictFetchValue(sentinel.masters,c->argv[2]->ptr);
+    if (!ri) {
+        addReplyError(c,"No such master with that name");
+        return NULL;
+    }
+    return ri;
+}
+
+void sentinelCommand(redisClient *c) {
+    if (!strcasecmp(c->argv[1]->ptr,"masters")) {
+        /* SENTINEL MASTERS */
+        if (c->argc != 2) goto numargserr;
+
+        addReplyDictOfRedisInstances(c,sentinel.masters);
+    } else if (!strcasecmp(c->argv[1]->ptr,"slaves")) {
+        /* SENTINEL SLAVES <master-name> */
+        sentinelRedisInstance *ri;
+
+        if (c->argc != 3) goto numargserr;
+        if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
+            return;
+        addReplyDictOfRedisInstances(c,ri->slaves);
+    } else if (!strcasecmp(c->argv[1]->ptr,"sentinels")) {
+        /* SENTINEL SENTINELS <master-name> */
+        sentinelRedisInstance *ri;
+
+        if (c->argc != 3) goto numargserr;
+        if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
+            return;
+        addReplyDictOfRedisInstances(c,ri->sentinels);
+    } else if (!strcasecmp(c->argv[1]->ptr,"is-master-down-by-addr")) {
+        /* SENTINEL IS-MASTER-DOWN-BY-ADDR <ip> <port> */
+        sentinelRedisInstance *ri;
+        char *leader = NULL;
+        long port;
+        int isdown = 0;
+
+        if (c->argc != 4) goto numargserr;
+        if (getLongFromObjectOrReply(c,c->argv[3],&port,NULL) != REDIS_OK)
+            return;
+        ri = getSentinelRedisInstanceByAddrAndRunID(sentinel.masters,
+            c->argv[2]->ptr,port,NULL);
+
+        /* It exists? Is actually a master? Is subjectively down? It's down.
+         * Note: if we are in tilt mode we always reply with "0". */
+        if (!sentinel.tilt && ri && (ri->flags & SRI_S_DOWN) &&
+                                    (ri->flags & SRI_MASTER))
+            isdown = 1;
+        if (ri) leader = sentinelGetSubjectiveLeader(ri);
+
+        /* Reply with a two-elements multi-bulk reply: down state, leader. */
+        addReplyMultiBulkLen(c,2);
+        addReply(c, isdown ? shared.cone : shared.czero);
+        addReplyBulkCString(c, leader ? leader : "?");
+        if (leader) sdsfree(leader);
+    } else if (!strcasecmp(c->argv[1]->ptr,"reset")) {
+        /* SENTINEL RESET <pattern> */
+        if (c->argc != 3) goto numargserr;
+        addReplyLongLong(c,sentinelResetMastersByPattern(c->argv[2]->ptr));
+    } else if (!strcasecmp(c->argv[1]->ptr,"get-master-addr-by-name")) {
+        /* SENTINEL GET-MASTER-ADDR-BY-NAME <master-name> */
+        sentinelRedisInstance *ri;
+
+        if (c->argc != 3) goto numargserr;
+        ri = sentinelGetMasterByName(c->argv[2]->ptr);
+        if (ri == NULL) {
+            addReply(c,shared.nullmultibulk);
+        } else {
+            sentinelAddr *addr = ri->addr;
+
+            if ((ri->flags & SRI_FAILOVER_IN_PROGRESS) && ri->promoted_slave)
+                addr = ri->promoted_slave->addr;
+            addReplyMultiBulkLen(c,2);
+            addReplyBulkCString(c,addr->ip);
+            addReplyBulkLongLong(c,addr->port);
+        }
+    } else {
+        addReplyErrorFormat(c,"Unknown sentinel subcommand '%s'",
+                               (char*)c->argv[1]->ptr);
+    }
+    return;
+
+numargserr:
+    addReplyErrorFormat(c,"Wrong number of commands for 'sentinel %s'",
+                          (char*)c->argv[1]->ptr);
+}
+
+/* ===================== SENTINEL availability checks ======================= */
+
+/* Is this instance down from our point of view? */
+void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
+    mstime_t elapsed = mstime() - ri->last_avail_time;
+
+    /* Check if we are in need for a reconnection of one of the 
+     * links, because we are detecting low activity.
+     *
+     * 1) Check if the command link seems connected, was connected not less
+     *    than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have an
+     *    idle time that is greater than down_after_period / 2 seconds. */
+    if (ri->cc &&
+        (mstime() - ri->cc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
+        (mstime() - ri->last_pong_time) > (ri->down_after_period/2))
+    {
+        redisAsyncFree(ri->cc); /* will call the disconnection callback */
+    }
+
+    /* 2) Check if the pubsub link seems connected, was connected not less
+     *    than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have no
+     *    activity in the Pub/Sub channel for more than
+     *    SENTINEL_PUBLISH_PERIOD * 3.
+     */
+    if (ri->pc &&
+        (mstime() - ri->pc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
+        (mstime() - ri->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3))
+    {
+        redisAsyncFree(ri->pc); /* will call the disconnection callback */
+    }
+
+    /* Update the subjectively down flag. */
+    if (elapsed > ri->down_after_period) {
+        /* Is subjectively down */
+        if ((ri->flags & SRI_S_DOWN) == 0) {
+            sentinelEvent(REDIS_WARNING,"+sdown",ri,"%@");
+            ri->s_down_since_time = mstime();
+            ri->flags |= SRI_S_DOWN;
+        }
+    } else {
+        /* Is subjectively up */
+        if (ri->flags & SRI_S_DOWN) {
+            sentinelEvent(REDIS_WARNING,"-sdown",ri,"%@");
+            ri->flags &= ~SRI_S_DOWN;
+        }
+    }
+}
+
+/* Is this instance down accordingly to the configured quorum? */
+void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
+    dictIterator *di;
+    dictEntry *de;
+    int quorum = 0, odown = 0;
+
+    if (master->flags & SRI_S_DOWN) {
+        /* Is down for enough sentinels? */
+        quorum = 1; /* the current sentinel. */
+        /* Count all the other sentinels. */
+        di = dictGetIterator(master->sentinels);
+        while((de = dictNext(di)) != NULL) {
+            sentinelRedisInstance *ri = dictGetVal(de);
+
+            if (ri->flags & SRI_MASTER_DOWN) quorum++;
+        }
+        dictReleaseIterator(di);
+        if (quorum >= master->quorum) odown = 1;
+    }
+
+    /* Set the flag accordingly to the outcome. */
+    if (odown) {
+        if ((master->flags & SRI_O_DOWN) == 0) {
+            sentinelEvent(REDIS_WARNING,"+odown",master,"%@ #quorum %d/%d",
+                quorum, master->quorum);
+            master->flags |= SRI_O_DOWN;
+            master->o_down_since_time = mstime();
+        }
+    } else {
+        if (master->flags & SRI_O_DOWN) {
+            sentinelEvent(REDIS_WARNING,"-odown",master,"%@");
+            master->flags &= ~SRI_O_DOWN;
+        }
+    }
+}
+
+/* Receive the SENTINEL is-master-down-by-addr reply, see the
+ * sentinelAskMasterStateToOtherSentinels() function for more information. */
+void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) {
+    sentinelRedisInstance *ri = c->data;
+    redisReply *r;
+
+    ri->pending_commands--;
+    if (!reply) return;
+    r = reply;
+
+    /* Ignore every error or unexpected reply.
+     * Note that if the command returns an error for any reason we'll
+     * end clearing the SRI_MASTER_DOWN flag for timeout anyway. */
+    if (r->type == REDIS_REPLY_ARRAY && r->elements == 2 &&
+        r->element[0]->type == REDIS_REPLY_INTEGER &&
+        r->element[1]->type == REDIS_REPLY_STRING)
+    {
+        ri->last_master_down_reply_time = mstime();
+        if (r->element[0]->integer == 1) {
+            ri->flags |= SRI_MASTER_DOWN;
+        } else {
+            ri->flags &= ~SRI_MASTER_DOWN;
+        }
+        sdsfree(ri->leader);
+        ri->leader = sdsnew(r->element[1]->str);
+    }
+}
+
+/* If we think (subjectively) the master is down, we start sending
+ * SENTINEL IS-MASTER-DOWN-BY-ADDR requests to other sentinels
+ * in order to get the replies that allow to reach the quorum and
+ * possibly also mark the master as objectively down. */
+void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master) {
+    dictIterator *di;
+    dictEntry *de;
+
+    di = dictGetIterator(master->sentinels);
+    while((de = dictNext(di)) != NULL) {
+        sentinelRedisInstance *ri = dictGetVal(de);
+        mstime_t elapsed = mstime() - ri->last_master_down_reply_time;
+        char port[32];
+        int retval;
+
+        /* If the master state from other sentinel is too old, we clear it. */
+        if (elapsed > SENTINEL_INFO_VALIDITY_TIME) {
+            ri->flags &= ~SRI_MASTER_DOWN;
+            sdsfree(ri->leader);
+            ri->leader = NULL;
+        }
+
+        /* Only ask if master is down to other sentinels if:
+         *
+         * 1) We believe it is down, or there is a failover in progress.
+         * 2) Sentinel is connected.
+         * 3) We did not received the info within SENTINEL_ASK_PERIOD ms. */
+        if ((master->flags & (SRI_S_DOWN|SRI_FAILOVER_IN_PROGRESS)) == 0)
+            continue;
+        if (ri->flags & SRI_DISCONNECTED) continue;
+        if (mstime() - ri->last_master_down_reply_time < SENTINEL_ASK_PERIOD)
+            continue;
+
+        /* Ask */
+        ll2string(port,sizeof(port),master->addr->port);
+        retval = redisAsyncCommand(ri->cc,
+                    sentinelReceiveIsMasterDownReply, NULL,
+                    "SENTINEL is-master-down-by-addr %s %s",
+                    master->addr->ip, port);
+        if (retval == REDIS_OK) ri->pending_commands++;
+    }
+    dictReleaseIterator(di);
+}
+
+/* =============================== FAILOVER ================================= */
+
+/* Given a master get the "subjective leader", that is, among all the sentinels
+ * with given characteristics, the one with the lexicographically smaller
+ * runid. The characteristics required are:
+ *
+ * 1) Has SRI_CAN_FAILOVER flag.
+ * 2) Is not disconnected.
+ * 3) Recently answered to our ping (no longer than
+ *    SENTINEL_INFO_VALIDITY_TIME milliseconds ago).
+ *
+ * The function returns a pointer to an sds string representing the runid of the
+ * leader sentinel instance (from our point of view). Otherwise NULL is
+ * returned if there are no suitable sentinels.
+ */
+
+int compareRunID(const void *a, const void *b) {
+    char **aptrptr = (char**)a, **bptrptr = (char**)b;
+    return strcasecmp(*aptrptr, *bptrptr);
+}
+
+char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master) {
+    dictIterator *di;
+    dictEntry *de;
+    char **instance =
+        zmalloc(sizeof(char*)*(dictSize(master->sentinels)+1));
+    int instances = 0;
+    char *leader = NULL;
+
+    if (master->flags & SRI_CAN_FAILOVER) {
+        /* Add myself if I'm a Sentinel that can failover this master. */
+        instance[instances++] = server.runid;
+    }
+
+    di = dictGetIterator(master->sentinels);
+    while((de = dictNext(di)) != NULL) {
+        sentinelRedisInstance *ri = dictGetVal(de);
+        mstime_t lag = mstime() - ri->last_avail_time;
+
+        if (lag > SENTINEL_INFO_VALIDITY_TIME ||
+            !(ri->flags & SRI_CAN_FAILOVER) ||
+            (ri->flags & SRI_DISCONNECTED) ||
+            ri->runid == NULL)
+            continue;
+        instance[instances++] = ri->runid;
+    }
+    dictReleaseIterator(di);
+
+    /* If we have at least one instance passing our checks, order the array
+     * by runid. */
+    if (instances) {
+        qsort(instance,instances,sizeof(char*),compareRunID);
+        leader = sdsnew(instance[0]);
+    }
+    zfree(instance);
+    return leader;
+}
+
+struct sentinelLeader {
+    char *runid;
+    unsigned long votes;
+};
+
+/* Helper function for sentinelGetObjectiveLeader, increment the counter
+ * relative to the specified runid. */
+void sentinelObjectiveLeaderIncr(dict *counters, char *runid) {
+    dictEntry *de = dictFind(counters,runid);
+    uint64_t oldval;
+
+    if (de) {
+        oldval = dictGetUnsignedIntegerVal(de);
+        dictSetUnsignedIntegerVal(de,oldval+1);
+    } else {
+        de = dictAddRaw(counters,runid);
+        redisAssert(de != NULL);
+        dictSetUnsignedIntegerVal(de,1);
+    }
+}
+
+/* Scan all the Sentinels attached to this master to check what is the
+ * most voted leader among Sentinels. */
+char *sentinelGetObjectiveLeader(sentinelRedisInstance *master) {
+    dict *counters;
+    dictIterator *di;
+    dictEntry *de;
+    unsigned int voters = 0, voters_quorum;
+    char *myvote;
+    char *winner = NULL;
+
+    redisAssert(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS));
+    counters = dictCreate(&leaderVotesDictType,NULL);
+
+    /* Count my vote. */
+    myvote = sentinelGetSubjectiveLeader(master);
+    if (myvote) {
+        sentinelObjectiveLeaderIncr(counters,myvote);
+        voters++;
+    }
+
+    /* Count other sentinels votes */
+    di = dictGetIterator(master->sentinels);
+    while((de = dictNext(di)) != NULL) {
+        sentinelRedisInstance *ri = dictGetVal(de);
+        if (ri->leader == NULL) continue;
+        /* If the failover is not already in progress we are only interested
+         * in Sentinels that believe the master is down. Otherwise the leader
+         * selection is useful for the "failover-takedown" when the original
+         * leader fails. In that case we consider all the voters. */
+        if (!(master->flags & SRI_FAILOVER_IN_PROGRESS) &&
+            !(ri->flags & SRI_MASTER_DOWN)) continue;
+        sentinelObjectiveLeaderIncr(counters,ri->leader);
+        voters++;
+    }
+    dictReleaseIterator(di);
+    voters_quorum = voters/2+1;
+
+    /* Check what's the winner. For the winner to win, it needs two conditions:
+     * 1) Absolute majority between voters (50% + 1).
+     * 2) And anyway at least master->quorum votes. */
+    {
+        uint64_t max_votes = 0; /* Max votes so far. */
+
+        di = dictGetIterator(counters);
+        while((de = dictNext(di)) != NULL) {
+            uint64_t votes = dictGetUnsignedIntegerVal(de);
+
+            if (max_votes < votes) {
+                max_votes = votes;
+                winner = dictGetKey(de);
+            }
+        }
+        dictReleaseIterator(di);
+        if (winner && (max_votes < voters_quorum || max_votes < master->quorum))
+            winner = NULL;
+    }
+    winner = winner ? sdsnew(winner) : NULL;
+    sdsfree(myvote);
+    dictRelease(counters);
+    return winner;
+}
+
+/* This function checks if there are the conditions to start the failover,
+ * that is:
+ *
+ * 1) Enough time has passed since O_DOWN.
+ * 2) The master is marked as SRI_CAN_FAILOVER, so we can failover it.
+ * 3) We are the objectively leader for this master.
+ *
+ * If the conditions are met we flag the master as SRI_FAILOVER_IN_PROGRESS
+ * and SRI_I_AM_THE_LEADER.
+ */
+void sentinelStartFailover(sentinelRedisInstance *master) {
+    char *leader;
+    int isleader;
+
+    /* We can't failover if the master is not in O_DOWN state or if
+     * there is not already a failover in progress (to perform the
+     * takedown if the leader died) or if this Sentinel is not allowed
+     * to start a failover. */
+    if (!(master->flags & SRI_CAN_FAILOVER) ||
+        !(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) return;
+
+    leader = sentinelGetObjectiveLeader(master);
+    isleader = leader && strcasecmp(leader,server.runid) == 0;
+    sdsfree(leader);
+
+    /* If I'm not the leader, I can't failover for sure. */
+    if (!isleader) return;
+
+    /* If the failover is already in progress there are two options... */
+    if (master->flags & SRI_FAILOVER_IN_PROGRESS) {
+        if (master->flags & SRI_I_AM_THE_LEADER) {
+            /* 1) I'm flagged as leader so I already started the failover.
+             *    Just return. */
+            return;
+        } else {
+            mstime_t elapsed = mstime() - master->failover_state_change_time;
+
+            /* 2) I'm the new leader, but I'm not flagged as leader in the
+             *    master: I did not started the failover, but the original
+             *    leader has no longer the leadership.
+             *
+             *    In this case if the failover appears to be lagging
+             *    for at least 25% of the configured failover timeout,
+             *    I can assume I can take control. Otherwise
+             *    it's better to return and wait more. */
+            if (elapsed < (master->failover_timeout/4)) return;
+            sentinelEvent(REDIS_WARNING,"+failover-takedown",master,"%@");
+            /* We have already an elected slave if we are in
+             * FAILOVER_IN_PROGRESS state, that is, the slave that we
+             * observed turning into a master. */
+            master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
+            /* As an observer we flagged all the slaves as RECONF_SENT but
+             * now we are in charge of actually sending the reconfiguration
+             * command so let's clear this flag for all the instances. */
+            sentinelDelFlagsToDictOfRedisInstances(master->slaves,
+                SRI_RECONF_SENT);
+        }
+    } else {
+        /* Brand new failover as SRI_FAILOVER_IN_PROGRESS was not set. */
+        master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;
+    }
+
+    master->flags |= SRI_FAILOVER_IN_PROGRESS|SRI_I_AM_THE_LEADER;
+    sentinelEvent(REDIS_WARNING,"+failover-triggered",master,"%@");
+
+    /* Pick a random delay if it's a fresh failover (WAIT_START), and not
+     * a recovery of a failover started by another sentinel. */
+    if (master->failover_state == SENTINEL_FAILOVER_STATE_WAIT_START) {
+        master->failover_start_time = mstime() +
+            SENTINEL_FAILOVER_FIXED_DELAY +
+            (rand() % SENTINEL_FAILOVER_MAX_RANDOM_DELAY);
+        sentinelEvent(REDIS_WARNING,"+failover-state-wait-start",master,
+            "%@ #starting in %lld milliseconds",
+            master->failover_start_time-mstime());
+    }
+    master->failover_state_change_time = mstime();
+}
+
+/* Select a suitable slave to promote. The current algorithm only uses
+ * the following parameters:
+ *
+ * 1) None of the following conditions: S_DOWN, O_DOWN, DISCONNECTED.
+ * 2) last_avail_time more recent than SENTINEL_INFO_VALIDITY_TIME.
+ * 3) info_refresh more recent than SENTINEL_INFO_VALIDITY_TIME.
+ * 4) master_link_down_time no more than:
+ *     (now - master->s_down_since_time) + (master->down_after_period * 10).
+ *
+ * Among all the slaves matching the above conditions we select the slave
+ * with lower slave_priority. If priority is the same we select the slave
+ * with lexicographically smaller runid.
+ *
+ * The function returns the pointer to the selected slave, otherwise
+ * NULL if no suitable slave was found.
+ */
+
+int compareSlavesForPromotion(const void *a, const void *b) {
+    sentinelRedisInstance **sa = (sentinelRedisInstance **)a,
+                          **sb = (sentinelRedisInstance **)b;
+    if ((*sa)->slave_priority != (*sb)->slave_priority)
+        return (*sa)->slave_priority - (*sb)->slave_priority;
+    return strcasecmp((*sa)->runid,(*sb)->runid);
+}
+
+sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) {
+    sentinelRedisInstance **instance =
+        zmalloc(sizeof(instance[0])*dictSize(master->slaves));
+    sentinelRedisInstance *selected = NULL;
+    int instances = 0;
+    dictIterator *di;
+    dictEntry *de;
+    mstime_t max_master_down_time;
+
+    max_master_down_time = (mstime() - master->s_down_since_time) +
+                           (master->down_after_period * 10);
+
+    di = dictGetIterator(master->slaves);
+    while((de = dictNext(di)) != NULL) {
+        sentinelRedisInstance *slave = dictGetVal(de);
+        mstime_t info_validity_time = mstime()-SENTINEL_INFO_VALIDITY_TIME;
+
+        if (slave->flags & (SRI_S_DOWN|SRI_O_DOWN|SRI_DISCONNECTED)) continue;
+        if (slave->last_avail_time < info_validity_time) continue;
+        if (slave->info_refresh < info_validity_time) continue;
+        if (slave->master_link_down_time > max_master_down_time) continue;
+        instance[instances++] = slave;
+    }
+    dictReleaseIterator(di);
+    if (instances) {
+        qsort(instance,instances,sizeof(sentinelRedisInstance*),
+            compareSlavesForPromotion);
+        selected = instance[0];
+    }
+    zfree(instance);
+    return selected;
+}
+
+/* ---------------- Failover state machine implementation ------------------- */
+void sentinelFailoverWaitStart(sentinelRedisInstance *ri) {
+    if (mstime() >= ri->failover_start_time) {
+        ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
+        ri->failover_state_change_time = mstime();
+        sentinelEvent(REDIS_WARNING,"+failover-state-select-slave",ri,"%@");
+    }
+}
+
+void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) {
+    sentinelRedisInstance *slave = sentinelSelectSlave(ri);
+
+    if (slave == NULL) {
+        sentinelEvent(REDIS_WARNING,"-no-good-slave",ri,
+            "%@ #retrying in %d seconds",
+            (SENTINEL_FAILOVER_FIXED_DELAY+
+             SENTINEL_FAILOVER_MAX_RANDOM_DELAY)/1000);
+        ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;
+        ri->failover_start_time = mstime() + SENTINEL_FAILOVER_FIXED_DELAY +
+                                  SENTINEL_FAILOVER_MAX_RANDOM_DELAY;
+    } else {
+        sentinelEvent(REDIS_WARNING,"+selected-slave",slave,"%@");
+        slave->flags |= SRI_PROMOTED;
+        ri->promoted_slave = slave;
+        ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE;
+        ri->failover_state_change_time = mstime();
+        sentinelEvent(REDIS_NOTICE,"+failover-state-send-slaveof-noone",
+            slave, "%@");
+    }
+}
+
+void sentinelFailoverSendSlaveOfNoOne(sentinelRedisInstance *ri) {
+    int retval;
+
+    if (ri->promoted_slave->flags & SRI_DISCONNECTED) return;
+
+    /* Send SLAVEOF NO ONE command to turn the slave into a master.
+     * We actually register a generic callback for this command as we don't
+     * really care about the reply. We check if it worked indirectly observing
+     * if INFO returns a different role (master instead of slave). */
+    retval = redisAsyncCommand(ri->promoted_slave->cc,
+        sentinelDiscardReplyCallback, NULL, "SLAVEOF NO ONE");
+    if (retval != REDIS_OK) return;
+    ri->promoted_slave->pending_commands++;
+    sentinelEvent(REDIS_NOTICE, "+failover-state-wait-promotion",
+        ri->promoted_slave,"%@");
+    ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION;
+    ri->failover_state_change_time = mstime();
+}
+
+/* We actually wait for promotion indirectly checking with INFO when the
+ * slave turns into a master. */
+void sentinelFailoverWaitPromotion(sentinelRedisInstance *ri) {
+    mstime_t elapsed = mstime() - ri->failover_state_change_time;
+
+    if (elapsed >= SENTINEL_PROMOTION_RETRY_PERIOD) {
+        sentinelEvent(REDIS_WARNING,"-promotion-timeout",ri->promoted_slave,
+            "%@");
+        sentinelEvent(REDIS_WARNING,"+failover-state-select-slave",ri,"%@");
+        ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
+        ri->failover_state_change_time = mstime();
+        ri->promoted_slave->flags &= ~SRI_PROMOTED;
+        ri->promoted_slave = NULL;
+    }
+}
+
+void sentinelFailoverDetectEnd(sentinelRedisInstance *master) {
+    int not_reconfigured = 0, timeout = 0;
+    dictIterator *di;
+    dictEntry *de;
+    mstime_t elapsed = mstime() - master->failover_state_change_time;
+
+    /* We can't consider failover finished if the promoted slave is
+     * not reachable. */
+    if (master->promoted_slave == NULL ||
+        master->promoted_slave->flags & SRI_S_DOWN) return;
+
+    /* The failover terminates once all the reachable slaves are properly
+     * configured. */
+    di = dictGetIterator(master->slaves);
+    while((de = dictNext(di)) != NULL) {
+        sentinelRedisInstance *slave = dictGetVal(de);
+
+        if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
+        if (slave->flags & SRI_S_DOWN) continue;
+        not_reconfigured++;
+    }
+    dictReleaseIterator(di);
+
+    /* Force end of failover on timeout. */
+    if (elapsed > master->failover_timeout) {
+        not_reconfigured = 0;
+        timeout = 1;
+        sentinelEvent(REDIS_WARNING,"+failover-end-for-timeout",master,"%@");
+    }
+
+    if (not_reconfigured == 0) {
+        sentinelEvent(REDIS_WARNING,"+failover-end",master,"%@");
+        master->failover_state = SENTINEL_FAILOVER_STATE_UPDATE_CONFIG;
+        master->failover_state_change_time = mstime();
+    }
+
+    /* If I'm the leader it is a good idea to send a best effort SLAVEOF
+     * command to all the slaves still not reconfigured to replicate with
+     * the new master. */
+    if (timeout && (master->flags & SRI_I_AM_THE_LEADER)) {
+        dictIterator *di;
+        dictEntry *de;
+        char master_port[32];
+
+        ll2string(master_port,sizeof(master_port),
+            master->promoted_slave->addr->port);
+
+        di = dictGetIterator(master->slaves);
+        while((de = dictNext(di)) != NULL) {
+            sentinelRedisInstance *slave = dictGetVal(de);
+            int retval;
+
+            if (slave->flags &
+                (SRI_RECONF_DONE|SRI_RECONF_SENT|SRI_DISCONNECTED)) continue;
+
+            retval = redisAsyncCommand(slave->cc,
+                sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
+                    master->promoted_slave->addr->ip,
+                    master_port);
+            if (retval == REDIS_OK) {
+                sentinelEvent(REDIS_NOTICE,"+slave-reconf-sent-be",slave,"%@");
+                slave->flags |= SRI_RECONF_SENT;
+            }
+        }
+        dictReleaseIterator(di);
+    }
+}
+
+/* Send SLAVE OF <new master address> to all the remaining slaves that
+ * still don't appear to have the configuration updated. */
+void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) {
+    dictIterator *di;
+    dictEntry *de;
+    int in_progress = 0;
+
+    di = dictGetIterator(master->slaves);
+    while((de = dictNext(di)) != NULL) {
+        sentinelRedisInstance *slave = dictGetVal(de);
+
+        if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG))
+            in_progress++;
+    }
+    dictReleaseIterator(di);
+
+    di = dictGetIterator(master->slaves);
+    while(in_progress < master->parallel_syncs &&
+          (de = dictNext(di)) != NULL)
+    {
+        sentinelRedisInstance *slave = dictGetVal(de);
+        int retval;
+        char master_port[32];
+
+        /* Skip the promoted slave, and already configured slaves. */
+        if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
+
+        /* Clear the SRI_RECONF_SENT flag if too much time elapsed without
+         * the slave moving forward to the next state. */
+        if ((slave->flags & SRI_RECONF_SENT) &&
+            (mstime() - slave->slave_reconf_sent_time) >
+            SENTINEL_SLAVE_RECONF_RETRY_PERIOD)
+        {
+            sentinelEvent(REDIS_NOTICE,"-slave-reconf-sent-timeout",slave,"%@");
+            slave->flags &= ~SRI_RECONF_SENT;
+        }
+
+        /* Nothing to do for instances that are disconnected or already
+         * in RECONF_SENT state. */
+        if (slave->flags & (SRI_DISCONNECTED|SRI_RECONF_SENT|SRI_RECONF_INPROG))
+            continue;
+
+        /* Send SLAVEOF <new master>. */
+        ll2string(master_port,sizeof(master_port),
+            master->promoted_slave->addr->port);
+        retval = redisAsyncCommand(slave->cc,
+            sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
+                master->promoted_slave->addr->ip,
+                master_port);
+        if (retval == REDIS_OK) {
+            slave->flags |= SRI_RECONF_SENT;
+            slave->pending_commands++;
+            slave->slave_reconf_sent_time = mstime();
+            sentinelEvent(REDIS_NOTICE,"+slave-reconf-sent",slave,"%@");
+            in_progress++;
+        }
+    }
+    dictReleaseIterator(di);
+    sentinelFailoverDetectEnd(master);
+}
+
+/* This function is called when the slave is in
+ * SENTINEL_FAILOVER_STATE_UPDATE_CONFIG state. In this state we need
+ * to remove it from the master table and add the promoted slave instead.
+ *
+ * If there are no promoted slaves as this instance is unique, we remove
+ * and re-add it with the same address to trigger a complete state
+ * refresh. */
+void sentinelFailoverSwitchToPromotedSlave(sentinelRedisInstance *master) {
+    sentinelRedisInstance *new, *ref = master->promoted_slave ?
+                                       master->promoted_slave : master;
+    int quorum = ref->quorum, parallel_syncs = ref->parallel_syncs;
+    char *name = sdsnew(master->name);
+    char *ip = sdsnew(ref->addr->ip), *oldip = sdsnew(master->addr->ip);
+    int port = ref->addr->port, oldport = master->addr->port;
+    int retval, oldflags = master->flags;
+    mstime_t old_down_after_period = master->down_after_period;
+    mstime_t old_failover_timeout = master->failover_timeout;
+
+    retval = dictDelete(sentinel.masters,master->name);
+    redisAssert(retval == DICT_OK);
+    new = createSentinelRedisInstance(name,SRI_MASTER,ip,port,quorum,NULL);
+    redisAssert(new != NULL);
+    new->parallel_syncs = parallel_syncs;
+    new->flags |= (oldflags & SRI_CAN_FAILOVER);
+    new->down_after_period = old_down_after_period;
+    new->failover_timeout = old_failover_timeout;
+    /* TODO: ... set the scripts as well. */
+    sentinelEvent(REDIS_WARNING,"+switch-master",new,"%s %s %d %s %d",
+        name, oldip, oldport, ip, port);
+    sdsfree(name);
+    sdsfree(ip);
+    sdsfree(oldip);
+}
+
+void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
+    redisAssert(ri->flags & SRI_MASTER);
+
+    if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;
+
+    switch(ri->failover_state) {
+        case SENTINEL_FAILOVER_STATE_WAIT_START:
+            sentinelFailoverWaitStart(ri);
+            break;
+        case SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
+            sentinelFailoverSelectSlave(ri);
+            break;
+        case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:
+            sentinelFailoverSendSlaveOfNoOne(ri);
+            break;
+        case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
+            sentinelFailoverWaitPromotion(ri);
+            break;
+        case SENTINEL_FAILOVER_STATE_RECONF_SLAVES:
+            sentinelFailoverReconfNextSlave(ri);
+            break;
+        case SENTINEL_FAILOVER_STATE_DETECT_END:
+            sentinelFailoverDetectEnd(ri);
+            break;
+    }
+}
+
+/* The following is called only for master instances and will abort the
+ * failover process if:
+ *
+ * 1) The failover is in progress.
+ * 2) We already promoted a slave.
+ * 3) The promoted slave is in extended SDOWN condition.
+ */
+void sentinelAbortFailoverIfNeeded(sentinelRedisInstance *ri) {
+    dictIterator *di;
+    dictEntry *de;
+
+    /* Failover is in progress? Do we have a promoted slave? */
+    if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS) || !ri->promoted_slave) return;
+
+    /* Is the promoted slave into an extended SDOWN state? */
+    if (!(ri->promoted_slave->flags & SRI_S_DOWN) ||
+        (mstime() - ri->promoted_slave->s_down_since_time) <
+        (ri->down_after_period * SENTINEL_EXTENDED_SDOWN_MULTIPLIER)) return;
+
+    sentinelEvent(REDIS_WARNING,"-failover-abort-x-sdown",ri->promoted_slave,"%@");
+
+    /* Clear failover related flags from slaves.
+     * Also if we are the leader make sure to send SLAVEOF commands to all the
+     * already reconfigured slaves in order to turn them back into slaves of
+     * the original master. */
+
+    di = dictGetIterator(ri->slaves);
+    while((de = dictNext(di)) != NULL) {
+        sentinelRedisInstance *slave = dictGetVal(de);
+        if (ri->flags & SRI_I_AM_THE_LEADER) {
+            char master_port[32];
+            int retval;
+
+            ll2string(master_port,sizeof(master_port),ri->addr->port);
+            retval = redisAsyncCommand(slave->cc,
+                sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
+                    ri->addr->ip,
+                    master_port);
+            if (retval == REDIS_OK)
+                sentinelEvent(REDIS_NOTICE,"-slave-reconf-undo",slave,"%@");
+        }
+        slave->flags &= ~(SRI_RECONF_SENT|SRI_RECONF_INPROG|SRI_RECONF_DONE);
+    }
+    dictReleaseIterator(di);
+
+    ri->flags &= ~(SRI_FAILOVER_IN_PROGRESS|SRI_I_AM_THE_LEADER);
+    ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
+    ri->failover_state_change_time = mstime();
+    ri->promoted_slave->flags &= ~SRI_PROMOTED;
+    ri->promoted_slave = NULL;
+}
+
+/* ======================== SENTINEL timer handler ==========================
+ * This is the "main" our Sentinel, being sentinel completely non blocking
+ * in design. The function is called every second.
+ * -------------------------------------------------------------------------- */
+
+/* Perform scheduled operations for the specified Redis instance. */
+void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
+    /* ========== MONITORING HALF ============ */
+    /* Every kind of instance */
+    sentinelReconnectInstance(ri);
+    sentinelPingInstance(ri);
+
+    /* Masters and slaves */
+    if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
+        /* Nothing so far. */
+    }
+
+    /* Only masters */
+    if (ri->flags & SRI_MASTER) {
+        sentinelAskMasterStateToOtherSentinels(ri);
+    }
+
+    /* ============== ACTING HALF ============= */
+    /* We don't proceed with the acting half if we are in TILT mode.
+     * TILT happens when we find something odd with the time, like a
+     * sudden change in the clock. */
+    if (sentinel.tilt) {
+        if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return;
+        sentinel.tilt = 0;
+        sentinelEvent(REDIS_WARNING,"-tilt",NULL,"#tilt mode exited");
+    }
+
+    /* Every kind of instance */
+    sentinelCheckSubjectivelyDown(ri);
+
+    /* Masters and slaves */
+    if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
+        /* Nothing so far. */
+    }
+
+    /* Only masters */
+    if (ri->flags & SRI_MASTER) {
+        sentinelCheckObjectivelyDown(ri);
+        sentinelStartFailover(ri);
+        sentinelFailoverStateMachine(ri);
+        sentinelAbortFailoverIfNeeded(ri);
+    }
+}
+
+/* Perform scheduled operations for all the instances in the dictionary.
+ * Recursively call the function against dictionaries of slaves. */
+void sentinelHandleDictOfRedisInstances(dict *instances) {
+    dictIterator *di;
+    dictEntry *de;
+    sentinelRedisInstance *switch_to_promoted = NULL;
+
+    /* There are a number of things we need to perform against every master. */
+    di = dictGetIterator(instances);
+    while((de = dictNext(di)) != NULL) {
+        sentinelRedisInstance *ri = dictGetVal(de);
+
+        sentinelHandleRedisInstance(ri);
+        if (ri->flags & SRI_MASTER) {
+            sentinelHandleDictOfRedisInstances(ri->slaves);
+            sentinelHandleDictOfRedisInstances(ri->sentinels);
+            if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) {
+                switch_to_promoted = ri;
+            }
+        }
+    }
+    if (switch_to_promoted)
+        sentinelFailoverSwitchToPromotedSlave(switch_to_promoted);
+    dictReleaseIterator(di);
+}
+
+/* This function checks if we need to enter the TITL mode.
+ *
+ * The TILT mode is entered if we detect that between two invocations of the
+ * timer interrupt, a negative amount of time, or too much time has passed.
+ * Note that we expect that more or less just 100 milliseconds will pass
+ * if everything is fine. However we'll see a negative number or a
+ * difference bigger than SENTINEL_TILT_TRIGGER milliseconds if one of the
+ * following conditions happen:
+ *
+ * 1) The Sentiel process for some time is blocked, for every kind of
+ * random reason: the load is huge, the computer was freezed for some time
+ * in I/O or alike, the process was stopped by a signal. Everything.
+ * 2) The system clock was altered significantly.
+ *
+ * Under both this conditions we'll see everything as timed out and failing
+ * without good reasons. Instead we enter the TILT mode and wait
+ * for SENTIENL_TILT_PERIOD to elapse before starting to act again.
+ *
+ * During TILT time we still collect information, we just do not act. */
+void sentinelCheckTiltCondition(void) {
+    mstime_t now = mstime();
+    mstime_t delta = now - sentinel.previous_time;
+
+    if (delta < 0 || delta > SENTINEL_TILT_TRIGGER) {
+        sentinel.tilt = 1;
+        sentinel.tilt_start_time = mstime();
+        sentinelEvent(REDIS_WARNING,"+tilt",NULL,"#tilt mode entered");
+    }
+    sentinel.previous_time = mstime();
+}
+
+void sentinelTimer(void) {
+    sentinelCheckTiltCondition();
+    sentinelHandleDictOfRedisInstances(sentinel.masters);
+}
+