--- /dev/null
+/* Redis Sentinel implementation
+ * -----------------------------
+ *
+ * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of Redis nor the names of its contributors may be used
+ * to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "redis.h"
+#include "hiredis.h"
+#include "async.h"
+
+#include <ctype.h>
+#include <arpa/inet.h>
+#include <sys/socket.h>
+
+#define REDIS_SENTINEL_PORT 26379
+
+/* ======================== Sentinel global state =========================== */
+
+typedef long long mstime_t; /* millisecond time type. */
+
+/* Address object, used to describe an ip:port pair.
+ * Objects of this type are built by createSentinelAddr() and destroyed by
+ * releaseSentinelAddr(). */
+typedef struct sentinelAddr {
+ char *ip; /* Resolved address, stored as an sds by createSentinelAddr(). */
+ int port; /* TCP port, createSentinelAddr() only accepts 1..65535. */
+} sentinelAddr;
+
+/* Flags stored in the 'flags' field of a sentinelRedisInstance.
+ * SRI_MASTER, SRI_SLAVE and SRI_SENTINEL describe the role of the instance,
+ * the other flags describe its runtime state. */
+#define SRI_MASTER (1<<0)
+#define SRI_SLAVE (1<<1)
+#define SRI_SENTINEL (1<<2)
+#define SRI_DISCONNECTED (1<<3)
+#define SRI_S_DOWN (1<<4) /* Subjectively down (no quorum). */
+#define SRI_O_DOWN (1<<5) /* Objectively down (quorum reached). */
+#define SRI_MASTER_DOWN (1<<6) /* A Sentinel with this flag set thinks that
+ its master is down. */
+/* SRI_CAN_FAILOVER when set in an SRI_MASTER instance means that we are
+ * allowed to perform the failover for this master.
+ * When set in a SRI_SENTINEL instance means that sentinel is allowed to
+ * perform the failover on its master. */
+#define SRI_CAN_FAILOVER (1<<7)
+#define SRI_FAILOVER_IN_PROGRESS (1<<8) /* Failover is in progress for
+ this master. */
+#define SRI_I_AM_THE_LEADER (1<<9) /* We are the leader for this master. */
+#define SRI_PROMOTED (1<<10) /* Slave selected for promotion. */
+#define SRI_RECONF_SENT (1<<11) /* SLAVEOF <newmaster> sent. */
+#define SRI_RECONF_INPROG (1<<12) /* Slave synchronization in progress. */
+#define SRI_RECONF_DONE (1<<13) /* Slave synchronized with new master. */
+
+/* Default periods, timeouts and limits.
+ * SENTINEL_DOWN_AFTER_PERIOD and SENTINEL_DEFAULT_FAILOVER_TIMEOUT are the
+ * defaults of the 'down_after_period' and 'failover_timeout' instance fields,
+ * that the configuration sets in milliseconds.
+ * NOTE(review): the other *_PERIOD / *_DELAY values are presumably expressed
+ * in milliseconds as well, confirm against the code using them. */
+#define SENTINEL_INFO_PERIOD 10000
+#define SENTINEL_PING_PERIOD 1000
+#define SENTINEL_ASK_PERIOD 1000
+#define SENTINEL_PUBLISH_PERIOD 5000
+#define SENTINEL_DOWN_AFTER_PERIOD 30000
+#define SENTINEL_HELLO_CHANNEL "__sentinel__:hello"
+#define SENTINEL_TILT_TRIGGER 2000
+#define SENTINEL_TILT_PERIOD (SENTINEL_PING_PERIOD*30)
+#define SENTINEL_DEFAULT_SLAVE_PRIORITY 100
+#define SENTINEL_PROMOTION_RETRY_PERIOD 30000
+#define SENTINEL_SLAVE_RECONF_RETRY_PERIOD 10000
+#define SENTINEL_DEFAULT_PARALLEL_SYNCS 1
+#define SENTINEL_MIN_LINK_RECONNECT_PERIOD 15000
+#define SENTINEL_DEFAULT_FAILOVER_TIMEOUT (60*15*1000)
+#define SENTINEL_MAX_PENDING_COMMANDS 100
+#define SENTINEL_EXTENDED_SDOWN_MULTIPLIER 10
+
+/* For how many milliseconds is a piece of information valid? This applies
+ * for instance to the replies to SENTINEL IS-MASTER-DOWN-BY-ADDR. */
+#define SENTINEL_INFO_VALIDITY_TIME 5000
+#define SENTINEL_FAILOVER_FIXED_DELAY 5000
+#define SENTINEL_FAILOVER_MAX_RANDOM_DELAY 10000
+
+/* Failover state machine: possible values of the 'failover_state' field. */
+#define SENTINEL_FAILOVER_STATE_NONE 0 /* No failover in progress. */
+#define SENTINEL_FAILOVER_STATE_WAIT_START 1 /* Wait for failover_start_time*/
+#define SENTINEL_FAILOVER_STATE_SELECT_SLAVE 2 /* Select slave to promote */
+#define SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE 3 /* Slave -> Master */
+#define SENTINEL_FAILOVER_STATE_WAIT_PROMOTION 4 /* Wait slave to change role */
+#define SENTINEL_FAILOVER_STATE_RECONF_SLAVES 5 /* SLAVEOF newmaster */
+#define SENTINEL_FAILOVER_STATE_WAIT_NEXT_SLAVE 6 /* wait replication */
+#define SENTINEL_FAILOVER_STATE_ALERT_CLIENTS 7 /* Run user script. */
+#define SENTINEL_FAILOVER_STATE_WAIT_ALERT_SCRIPT 8 /* Wait script exec. */
+#define SENTINEL_FAILOVER_STATE_DETECT_END 9 /* Check for failover end. */
+#define SENTINEL_FAILOVER_STATE_UPDATE_CONFIG 10 /* Monitor promoted slave. */
+
+/* Possible values of the 'slave_master_link_status' field. */
+#define SENTINEL_MASTER_LINK_STATUS_UP 0
+#define SENTINEL_MASTER_LINK_STATUS_DOWN 1
+
+/* A monitored instance: a master, one of its slaves, or another Sentinel
+ * (see the SRI_MASTER, SRI_SLAVE and SRI_SENTINEL flags). The same structure
+ * is used for the three roles, role specific fields are grouped below. */
+typedef struct sentinelRedisInstance {
+ int flags; /* See SRI_... defines */
+ char *name; /* Master name from the point of view of this sentinel.
+ For slaves and sentinels it is "hostname:port". */
+ char *runid; /* run ID of this instance. */
+ sentinelAddr *addr; /* Address (ip:port) of this instance. */
+ redisAsyncContext *cc; /* Hiredis context for commands. */
+ redisAsyncContext *pc; /* Hiredis context for Pub / Sub. */
+ int pending_commands; /* Number of commands sent waiting for a reply. */
+ mstime_t cc_conn_time; /* cc connection time. */
+ mstime_t pc_conn_time; /* pc connection time. */
+ mstime_t pc_last_activity; /* Last time we received any message. */
+ mstime_t last_avail_time; /* Last time the instance replied to ping with
+ a reply we consider valid. */
+ mstime_t last_pong_time; /* Last time the instance replied to ping,
+ whatever the reply was. That's used to check
+ if the link is idle and must be reconnected. */
+ mstime_t last_pub_time; /* Last time we sent hello via Pub/Sub. */
+ mstime_t last_hello_time; /* Only used if SRI_SENTINEL is set. Last time
+ we received an hello from this Sentinel
+ via Pub/Sub. */
+ mstime_t last_master_down_reply_time; /* Time of last reply to
+ SENTINEL is-master-down command. */
+ mstime_t s_down_since_time; /* Subjectively down since time. */
+ mstime_t o_down_since_time; /* Objectively down since time. */
+ mstime_t down_after_period; /* Consider it down after that period. */
+ mstime_t info_refresh; /* Time at which we received INFO output from it. */
+
+ /* Master specific. */
+ dict *sentinels; /* Other sentinels monitoring the same master. */
+ dict *slaves; /* Slaves for this master instance. */
+ int quorum; /* Number of sentinels that need to agree on failure. */
+ int parallel_syncs; /* How many slaves to reconfigure at same time. */
+
+ /* Slave specific. */
+ mstime_t master_link_down_time; /* Slave replication link down time. */
+ int slave_priority; /* Slave priority according to its INFO output. */
+ mstime_t slave_reconf_sent_time; /* Time at which we sent SLAVEOF <new> */
+ struct sentinelRedisInstance *master; /* Master instance if SRI_SLAVE is set. */
+ char *slave_master_host; /* Master host as reported by INFO */
+ int slave_master_port; /* Master port as reported by INFO */
+ int slave_master_link_status; /* Master link status as reported by INFO */
+ /* Failover */
+ char *leader; /* If this is a master instance, this is the runid of
+ the Sentinel that should perform the failover. If
+ this is a Sentinel, this is the runid of the Sentinel
+ that this other Sentinel is voting as leader.
+ This field is valid only if SRI_MASTER_DOWN is
+ set on the Sentinel instance. */
+ int failover_state; /* See SENTINEL_FAILOVER_STATE_* defines. */
+ mstime_t failover_state_change_time;
+ mstime_t failover_start_time; /* When to start to failover if leader. */
+ mstime_t failover_timeout; /* Max time to refresh failover state. */
+ struct sentinelRedisInstance *promoted_slave; /* Promoted slave instance. */
+ /* Scripts executed to notify admin or reconfigure clients: when they
+ * are set to NULL no script is executed. */
+ char *notify_script;
+ char *client_reconfig_script;
+} sentinelRedisInstance;
+
+/* Main state: a single global instance named 'sentinel', initialized by
+ * initSentinel(). */
+struct sentinelState {
+ dict *masters; /* Dictionary of master sentinelRedisInstances.
+ Key is the instance name, value is the
+ sentinelRedisInstance structure pointer. */
+ int tilt; /* Are we in TILT mode? */
+ mstime_t tilt_start_time; /* When TILT started. */
+ mstime_t previous_time; /* Time last time we ran the time handler. */
+} sentinel;
+
+/* ======================= hiredis ae.c adapters =============================
+ * Note: this implementation is taken from hiredis/adapters/ae.h, however
+ * we have our modified copy for Sentinel in order to use our allocator
+ * and to have full control over how the adapter works. */
+
+/* Glue object linking one hiredis async context to the ae event loop.
+ * It is allocated by redisAeAttach() and freed by redisAeCleanup(). */
+typedef struct redisAeEvents {
+ redisAsyncContext *context; /* Context the events are dispatched to. */
+ aeEventLoop *loop; /* Event loop where the file events are registered. */
+ int fd; /* Socket of the context, copied at attach time. */
+ int reading, writing; /* 1 while the readable / writable event is set. */
+} redisAeEvents;
+
+/* ae handler for readable events: hands the event over to hiredis calling
+ * redisAsyncHandleRead() on the context stored in the private data. */
+static void redisAeReadEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
+    redisAeEvents *ev = privdata;
+
+    /* Only the private data is needed here. */
+    (void)el; (void)fd; (void)mask;
+    redisAsyncHandleRead(ev->context);
+}
+
+/* ae handler for writable events: hands the event over to hiredis calling
+ * redisAsyncHandleWrite() on the context stored in the private data. */
+static void redisAeWriteEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
+    redisAeEvents *ev = privdata;
+
+    /* Only the private data is needed here. */
+    (void)el; (void)fd; (void)mask;
+    redisAsyncHandleWrite(ev->context);
+}
+
+/* hiredis 'addRead' hook: register the readable event for the context
+ * socket, unless it is already registered. */
+static void redisAeAddRead(void *privdata) {
+    redisAeEvents *ev = privdata;
+
+    if (ev->reading) return; /* Already listening. */
+    ev->reading = 1;
+    aeCreateFileEvent(ev->loop,ev->fd,AE_READABLE,redisAeReadEvent,ev);
+}
+
+/* hiredis 'delRead' hook: remove the readable event of the context socket
+ * if it is currently registered. */
+static void redisAeDelRead(void *privdata) {
+    redisAeEvents *ev = privdata;
+
+    if (!ev->reading) return; /* Nothing to remove. */
+    ev->reading = 0;
+    aeDeleteFileEvent(ev->loop,ev->fd,AE_READABLE);
+}
+
+/* hiredis 'addWrite' hook: register the writable event for the context
+ * socket, unless it is already registered. */
+static void redisAeAddWrite(void *privdata) {
+    redisAeEvents *ev = privdata;
+
+    if (ev->writing) return; /* Already listening. */
+    ev->writing = 1;
+    aeCreateFileEvent(ev->loop,ev->fd,AE_WRITABLE,redisAeWriteEvent,ev);
+}
+
+/* hiredis 'delWrite' hook: remove the writable event of the context socket
+ * if it is currently registered. */
+static void redisAeDelWrite(void *privdata) {
+    redisAeEvents *ev = privdata;
+
+    if (!ev->writing) return; /* Nothing to remove. */
+    ev->writing = 0;
+    aeDeleteFileEvent(ev->loop,ev->fd,AE_WRITABLE);
+}
+
+/* hiredis 'cleanup' hook: unregister both file events, then release the
+ * redisAeEvents container allocated by redisAeAttach(). */
+static void redisAeCleanup(void *privdata) {
+    redisAeDelRead(privdata);
+    redisAeDelWrite(privdata);
+    zfree(privdata);
+}
+
+/* Attach the hiredis async context 'ac' to the ae event loop 'loop'.
+ *
+ * A redisAeEvents container is allocated and installed as ac->ev.data,
+ * together with the hooks hiredis uses to start / stop listening for
+ * events and to clean up.
+ *
+ * Returns REDIS_OK on success, or REDIS_ERR if the context already has
+ * event data attached (in that case nothing is modified). */
+static int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac) {
+    redisAeEvents *ev;
+
+    /* Refuse to attach twice to the same context. */
+    if (ac->ev.data != NULL) return REDIS_ERR;
+
+    /* Container for the context and the state of the r/w events. */
+    ev = zmalloc(sizeof(*ev));
+    ev->context = ac;
+    ev->loop = loop;
+    ev->fd = ac->c.fd;
+    ev->reading = 0;
+    ev->writing = 0;
+
+    /* Install the container and the hooks into the context. */
+    ac->ev.data = ev;
+    ac->ev.addRead = redisAeAddRead;
+    ac->ev.delRead = redisAeDelRead;
+    ac->ev.addWrite = redisAeAddWrite;
+    ac->ev.delWrite = redisAeDelWrite;
+    ac->ev.cleanup = redisAeCleanup;
+    return REDIS_OK;
+}
+
+/* ============================= Prototypes ================================= */
+
+/* Forward declarations of functions used before their definition. */
+
+/* hiredis connect / disconnect callbacks and Pub/Sub message handler. */
+void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status);
+void sentinelDisconnectCallback(const redisAsyncContext *c, int status);
+void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata);
+/* Instances lookup and leader election helpers. */
+sentinelRedisInstance *sentinelGetMasterByName(char *name);
+char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master);
+char *sentinelGetObjectiveLeader(sentinelRedisInstance *master);
+/* Used by the configuration parser: -1 is handled there as "neither yes
+ * nor no". */
+int yesnotoi(char *s);
+void sentinelDisconnectInstanceFromContext(const redisAsyncContext *c);
+const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri);
+
+/* ========================= Dictionary types =============================== */
+
+/* Hash and key comparison functions used by the dictionary types below:
+ * they are only declared here. */
+unsigned int dictSdsHash(const void *key);
+int dictSdsKeyCompare(void *privdata, const void *key1, const void *key2);
+void releaseSentinelRedisInstance(sentinelRedisInstance *ri);
+
+/* Value destructor of instancesDictType: releases the instance stored as
+ * value of the entry. 'privdata' is not used. */
+void dictInstancesValDestructor (void *privdata, void *obj) {
+ releaseSentinelRedisInstance(obj);
+}
+
+/* Instance name (sds) -> instance (sentinelRedisInstance pointer)
+ *
+ * Used for the sentinel.masters table and for the 'slaves' and 'sentinels'
+ * tables of every instance.
+ *
+ * There is no key destructor: createSentinelRedisInstance() uses the name
+ * of the instance itself as key, and that string is freed together with
+ * the instance by the value destructor. */
+dictType instancesDictType = {
+ dictSdsHash, /* hash function */
+ NULL, /* key dup */
+ NULL, /* val dup */
+ dictSdsKeyCompare, /* key compare */
+ NULL, /* key destructor */
+ dictInstancesValDestructor /* val destructor */
+};
+
+/* Instance runid (sds) -> votes (long casted to void*)
+ *
+ * This is used by the sentinelGetObjectiveLeader() function in order to
+ * count the votes and understand who is the leader.
+ *
+ * No destructor is set, so the dictionary never frees keys or values.
+ * NOTE(review): presumably the runid strings used as keys are owned by
+ * the instances, confirm against sentinelGetObjectiveLeader(). */
+dictType leaderVotesDictType = {
+ dictSdsHash, /* hash function */
+ NULL, /* key dup */
+ NULL, /* val dup */
+ dictSdsKeyCompare, /* key compare */
+ NULL, /* key destructor */
+ NULL /* val destructor */
+};
+
+/* =========================== Initialization =============================== */
+
+void sentinelCommand(redisClient *c);
+
+/* Commands available when running in Sentinel mode: initSentinel() empties
+ * the normal command table and registers only these entries.
+ * The first three fields are the command name, its implementation and the
+ * arity. NOTE(review): the meaning of the remaining fields depends on the
+ * layout of struct redisCommand, that is declared elsewhere. */
+struct redisCommand sentinelcmds[] = {
+ {"ping",pingCommand,1,"",0,NULL,0,0,0,0,0},
+ {"sentinel",sentinelCommand,-2,"",0,NULL,0,0,0,0,0},
+ {"subscribe",subscribeCommand,-2,"",0,NULL,0,0,0,0,0},
+ {"unsubscribe",unsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
+ {"psubscribe",psubscribeCommand,-2,"",0,NULL,0,0,0,0,0},
+ {"punsubscribe",punsubscribeCommand,-1,"",0,NULL,0,0,0,0,0}
+};
+
+/* This function overwrites a few normal Redis configuration defaults with
+ * Sentinel specific defaults. Currently only the listening port is changed
+ * (to REDIS_SENTINEL_PORT). */
+void initSentinelConfig(void) {
+ server.port = REDIS_SENTINEL_PORT;
+}
+
+/* Sentinel mode initialization: replace the command table with the
+ * Sentinel one and set up the global 'sentinel' state. */
+void initSentinel(void) {
+    size_t numcmds = sizeof(sentinelcmds)/sizeof(sentinelcmds[0]);
+    size_t idx;
+
+    /* Drop the usual Redis commands: only the entries listed in the
+     * sentinelcmds table are available in Sentinel mode. */
+    dictEmpty(server.commands);
+    for (idx = 0; idx < numcmds; idx++) {
+        struct redisCommand *cmd = &sentinelcmds[idx];
+        int retval = dictAdd(server.commands, sdsnew(cmd->name), cmd);
+
+        redisAssert(retval == DICT_OK);
+    }
+
+    /* Global state: no masters, not in TILT mode, timers set to now. */
+    sentinel.masters = dictCreate(&instancesDictType,NULL);
+    sentinel.tilt = 0;
+    sentinel.tilt_start_time = mstime();
+    sentinel.previous_time = mstime();
+}
+
+/* ============================== sentinelAddr ============================== */
+
+/* Build a new sentinelAddr for the specified hostname and port.
+ *
+ * The port is validated first, then the hostname is resolved: the object
+ * stores the resolved address, not the original hostname.
+ *
+ * On success the new object is returned. On error NULL is returned and
+ * errno is set to:
+ *  EINVAL: Invalid port number (not in the 1-65535 range).
+ *  ENOENT: Can't resolve the hostname.
+ */
+sentinelAddr *createSentinelAddr(char *hostname, int port) {
+    char ipbuf[32];
+    sentinelAddr *addr;
+
+    if (port < 1 || port > 65535) {
+        errno = EINVAL;
+        return NULL;
+    }
+    if (anetResolve(NULL,hostname,ipbuf) == ANET_ERR) {
+        errno = ENOENT;
+        return NULL;
+    }
+
+    addr = zmalloc(sizeof(*addr));
+    addr->port = port;
+    addr->ip = sdsnew(ipbuf);
+    return addr;
+}
+
+/* Free a Sentinel address: both the ip string and the object itself are
+ * released. Can't fail. 'sa' must not be NULL since it is dereferenced. */
+void releaseSentinelAddr(sentinelAddr *sa) {
+ sdsfree(sa->ip);
+ zfree(sa);
+}
+
+/* =========================== Events notification ========================== */
+
+/* Placeholder for the execution of the user notification script: currently
+ * it does nothing. sentinelEvent() calls it with the script path of the
+ * master, the event type and the formatted event message. */
+void sentinelCallNotificationScript(char *scriptpath, char *type, char *msg) {
+ /* TODO: implement it. */
+}
+
+/* Emit an event: the message is logged, published via Pub/Sub and passed
+ * to the user notification script.
+ *
+ * 'level' is the log level. The message is published via Pub/Sub unless
+ * the level is REDIS_DEBUG, and only REDIS_WARNING events trigger the
+ * notification script.
+ *
+ * 'type' is the message type, also used as the Pub/Sub channel name.
+ *
+ * 'ri' is the redis instance target of this event if applicable, and is
+ * used to obtain the path of the notification script to execute.
+ *
+ * The remaining arguments are printf-alike.
+ * When the format starts with the two characters "%@" the caller must pass
+ * a non NULL 'ri', and the message is prefixed with an instance identifier
+ * in the following format:
+ *
+ *  <instance type> <instance name> <ip> <port>
+ *
+ * If the instance is not a master and is linked to one, the following is
+ * appended in order to specify the originating master:
+ *
+ *  @ <master name> <master ip> <master port>
+ *
+ * Everything following "%@" is processed by printf itself.
+ */
+void sentinelEvent(int level, char *type, sentinelRedisInstance *ri,
+                   const char *fmt, ...) {
+    char msg[REDIS_MAX_LOGMSG_LEN];
+
+    /* Start from an empty message, then handle %@ writing the instance
+     * identifier and skipping the two characters in the format. */
+    msg[0] = '\0';
+    if (fmt[0] == '%' && fmt[1] == '@') {
+        sentinelRedisInstance *master = NULL;
+
+        if (!(ri->flags & SRI_MASTER)) master = ri->master;
+        if (master == NULL) {
+            snprintf(msg, sizeof(msg), "%s %s %s %d",
+                sentinelRedisInstanceTypeStr(ri),
+                ri->name, ri->addr->ip, ri->addr->port);
+        } else {
+            snprintf(msg, sizeof(msg), "%s %s %s %d @ %s %s %d",
+                sentinelRedisInstanceTypeStr(ri),
+                ri->name, ri->addr->ip, ri->addr->port,
+                master->name, master->addr->ip, master->addr->port);
+        }
+        fmt += 2;
+    }
+
+    /* Append the printf-alike part, if any, right after the prefix. */
+    if (fmt[0] != '\0') {
+        va_list ap;
+        size_t used = strlen(msg);
+
+        va_start(ap, fmt);
+        vsnprintf(msg+used, sizeof(msg)-used, fmt, ap);
+        va_end(ap);
+    }
+
+    /* Log the message if the log level allows it to be logged. */
+    if (level >= server.verbosity) redisLog(level,"%s %s",type,msg);
+
+    /* Debugging messages are not published via Pub/Sub. */
+    if (level != REDIS_DEBUG) {
+        robj *channel = createStringObject(type,strlen(type));
+        robj *payload = createStringObject(msg,strlen(msg));
+
+        pubsubPublishMessage(channel,payload);
+        decrRefCount(channel);
+        decrRefCount(payload);
+    }
+
+    /* The notification script is configured in the master instance. */
+    if (level == REDIS_WARNING && ri != NULL) {
+        sentinelRedisInstance *owner =
+            (ri->flags & SRI_MASTER) ? ri : ri->master;
+
+        if (owner->notify_script)
+            sentinelCallNotificationScript(owner->notify_script,type,msg);
+    }
+}
+
+/* ========================== sentinelRedisInstance ========================= */
+
+/* Create a redis instance, the following fields must be populated by the
+ * caller if needed:
+ * runid: set to NULL but will be populated once INFO output is received.
+ * info_refresh: is set to 0 to mean that we never received INFO so far.
+ *
+ * If SRI_MASTER is set into initial flags the instance is added to
+ * sentinel.masters table.
+ *
+ * if SRI_SLAVE or SRI_SENTINEL is set then 'master' must be not NULL and the
+ * instance is added into master->slaves or master->sentinels table.
+ *
+ * If the instance is a slave or sentinel, the name parameter is ignored and
+ * is created automatically as hostname:port (the 'hostname' argument is used
+ * as it is, not the resolved address).
+ *
+ * The new instance starts with the SRI_DISCONNECTED flag set, and inherits
+ * the down_after_period of 'master' when a master is specified.
+ *
+ * The function fails if hostname can't be resolved or port is out of range.
+ * When this happens NULL is returned and errno is set accordingly to the
+ * createSentinelAddr() function.
+ *
+ * The function may also fail and return NULL with errno set to EBUSY if
+ * an instance with the same name already exists in the target table.
+ * Nothing is leaked or modified when the function fails. */
+sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char *hostname, int port, int quorum, sentinelRedisInstance *master) {
+    sentinelRedisInstance *ri;
+    sentinelAddr *addr;
+    dict *table;
+    char slavename[128], *sdsname;
+
+    redisAssert(flags & (SRI_MASTER|SRI_SLAVE|SRI_SENTINEL));
+    redisAssert((flags & SRI_MASTER) || master != NULL);
+
+    /* Check address validity. */
+    addr = createSentinelAddr(hostname,port);
+    if (addr == NULL) return NULL;
+
+    /* For slaves and sentinel we use ip:port as name. */
+    if (flags & (SRI_SLAVE|SRI_SENTINEL)) {
+        snprintf(slavename,sizeof(slavename),"%s:%d",hostname,port);
+        name = slavename;
+    }
+
+    /* Select the table the instance belongs to. The first assertion
+     * guarantees that at least one of the three role flags is set, so the
+     * last branch is the SRI_SENTINEL case and 'table' is always assigned. */
+    if (flags & SRI_MASTER) table = sentinel.masters;
+    else if (flags & SRI_SLAVE) table = master->slaves;
+    else table = master->sentinels;
+
+    /* Make sure the entry is not duplicated. This may happen when the same
+     * name for a master is used multiple times inside the configuration or
+     * if we try to add multiple times a slave or sentinel with same ip/port
+     * to a master. */
+    sdsname = sdsnew(name);
+    if (dictFind(table,sdsname)) {
+        /* The address was already allocated: release it together with the
+         * name, otherwise it would be leaked at every duplicated attempt. */
+        releaseSentinelAddr(addr);
+        sdsfree(sdsname);
+        errno = EBUSY;
+        return NULL;
+    }
+
+    /* Create the instance object. */
+    ri = zmalloc(sizeof(*ri));
+    /* Note that all the instances are started in the disconnected state,
+     * the event loop will take care of connecting them. */
+    ri->flags = flags | SRI_DISCONNECTED;
+    ri->name = sdsname;
+    ri->runid = NULL;
+    ri->addr = addr;
+    ri->cc = NULL;
+    ri->pc = NULL;
+    ri->pending_commands = 0;
+    ri->cc_conn_time = 0;
+    ri->pc_conn_time = 0;
+    ri->pc_last_activity = 0;
+    ri->last_avail_time = mstime();
+    ri->last_pong_time = mstime();
+    ri->last_pub_time = mstime();
+    ri->last_hello_time = mstime();
+    ri->last_master_down_reply_time = mstime();
+    ri->s_down_since_time = 0;
+    ri->o_down_since_time = 0;
+    ri->down_after_period = master ? master->down_after_period :
+                            SENTINEL_DOWN_AFTER_PERIOD;
+    ri->master_link_down_time = 0;
+    ri->slave_priority = SENTINEL_DEFAULT_SLAVE_PRIORITY;
+    ri->slave_reconf_sent_time = 0;
+    ri->slave_master_host = NULL;
+    ri->slave_master_port = 0;
+    ri->slave_master_link_status = SENTINEL_MASTER_LINK_STATUS_DOWN;
+    ri->sentinels = dictCreate(&instancesDictType,NULL);
+    ri->quorum = quorum;
+    ri->parallel_syncs = SENTINEL_DEFAULT_PARALLEL_SYNCS;
+    ri->master = master;
+    ri->slaves = dictCreate(&instancesDictType,NULL);
+    ri->info_refresh = 0;
+
+    /* Failover state. */
+    ri->leader = NULL;
+    ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
+    ri->failover_state_change_time = 0;
+    ri->failover_start_time = 0;
+    ri->failover_timeout = SENTINEL_DEFAULT_FAILOVER_TIMEOUT;
+    ri->promoted_slave = NULL;
+    ri->notify_script = NULL;
+    ri->client_reconfig_script = NULL;
+
+    /* Add into the right table. The name of the instance is used as key,
+     * it is not duplicated. */
+    dictAdd(table, ri->name, ri);
+    return ri;
+}
+
+/* Release this instance and all its slaves, sentinels, hiredis connections.
+ *
+ * Note that this function does NOT remove the instance from the table
+ * holding it (sentinel.masters, or the slaves / sentinels table of its
+ * master): it is installed as value destructor of instancesDictType, so it
+ * is the dictionary that calls it when the entry is deleted or the table
+ * is released. The only reference cleared here is the promoted_slave
+ * pointer of the master, when the released instance is a promoted slave. */
+void releaseSentinelRedisInstance(sentinelRedisInstance *ri) {
+ /* Release all its slaves or sentinels if any. */
+ dictRelease(ri->sentinels);
+ dictRelease(ri->slaves);
+
+ /* Release hiredis connections. Note that redisAsyncFree() will call
+ * the disconnection callback. The callback accesses the instance (name,
+ * address, links), so this must happen before the fields are freed. */
+ if (ri->cc) {
+ redisAsyncFree(ri->cc);
+ ri->cc = NULL;
+ }
+ if (ri->pc) {
+ redisAsyncFree(ri->pc);
+ ri->pc = NULL;
+ }
+
+ /* Free other resources. Fields that were never populated are NULL,
+ * and are passed to sdsfree() as they are. */
+ sdsfree(ri->name);
+ sdsfree(ri->runid);
+ sdsfree(ri->notify_script);
+ sdsfree(ri->client_reconfig_script);
+ sdsfree(ri->slave_master_host);
+ sdsfree(ri->leader);
+ releaseSentinelAddr(ri->addr);
+
+ /* Clear state into the master if needed. */
+ if ((ri->flags & SRI_SLAVE) && (ri->flags & SRI_PROMOTED) && ri->master)
+ ri->master->promoted_slave = NULL;
+
+ zfree(ri);
+}
+
+/* Search a slave of the master 'ri' by ip and port: slaves are registered
+ * with a name in the "<host>:<port>" form, so the lookup is performed
+ * building such a key. Returns the slave instance, or NULL if not found.
+ * 'ri' must be a master instance. */
+sentinelRedisInstance *sentinelRedisInstanceLookupSlave(
+        sentinelRedisInstance *ri, char *ip, int port)
+{
+    sentinelRedisInstance *found;
+    sds lookupkey;
+
+    redisAssert(ri->flags & SRI_MASTER);
+    lookupkey = sdscatprintf(sdsempty(),"%s:%d",ip,port);
+    found = dictFetchValue(ri->slaves,lookupkey);
+    sdsfree(lookupkey);
+    return found;
+}
+
+/* Return the type of the instance as a static string: "master", "slave",
+ * "sentinel", or "unknown" if no role flag is set. If more than one role
+ * flag is set, master wins over slave, and slave over sentinel. */
+const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri) {
+    const char *type = "unknown";
+
+    /* Checked from the lowest to the highest priority, so that the last
+     * matching assignment is the one that is returned. */
+    if (ri->flags & SRI_SENTINEL) type = "sentinel";
+    if (ri->flags & SRI_SLAVE) type = "slave";
+    if (ri->flags & SRI_MASTER) type = "master";
+    return type;
+}
+
+/* Remove from the Sentinels table of 'master' every instance having
+ * either:
+ *
+ * 1) The same ip/port as specified.
+ * 2) The same runid.
+ *
+ * One of the two conditions is enough for the instance to be removed.
+ * If "runid" is NULL the runid is not checked, and instances without a
+ * known runid never match by runid.
+ * Similarly if "ip" is NULL the address is not checked.
+ *
+ * This function is useful because every time we add a new Sentinel into
+ * a master's Sentinels dictionary, we want to be very sure about not
+ * having duplicated instances for any reason. This is so important because
+ * we use those other sentinels in order to run our quorum protocol to
+ * understand if it's time to proceed with the fail over.
+ *
+ * Making sure no duplication is possible we greatly improve the robustness
+ * of the quorum (otherwise we may end counting the same instance multiple
+ * times for some reason).
+ *
+ * The function returns the number of Sentinels removed. */
+int removeMatchingSentinelsFromMaster(sentinelRedisInstance *master, char *ip, int port, char *runid) {
+    /* Entries are deleted while iterating: a safe iterator is needed. */
+    dictIterator *iter = dictGetSafeIterator(master->sentinels);
+    dictEntry *entry;
+    int count = 0;
+
+    while ((entry = dictNext(iter)) != NULL) {
+        sentinelRedisInstance *other = dictGetVal(entry);
+        int same_runid = other->runid && runid &&
+                         strcmp(other->runid,runid) == 0;
+        int same_addr = ip && strcmp(other->addr->ip,ip) == 0 &&
+                        port == other->addr->port;
+
+        if (!same_runid && !same_addr) continue;
+        dictDelete(master->sentinels,other->name);
+        count++;
+    }
+    dictReleaseIterator(iter);
+    return count;
+}
+
+/* Scan a dictionary of instances searching for the first one matching all
+ * the specified search parameters: runid, ip and port.
+ * Returns the instance pointer, or NULL if there is no match.
+ *
+ * runid or ip can be NULL (but not both): in such a case the search is
+ * performed only by the non-NULL field. 'port' is checked only when 'ip'
+ * is not NULL. Instances without a known runid never match a search by
+ * runid. */
+sentinelRedisInstance *getSentinelRedisInstanceByAddrAndRunID(dict *instances, char *ip, int port, char *runid) {
+    dictIterator *iter;
+    dictEntry *entry;
+    sentinelRedisInstance *match = NULL;
+
+    redisAssert(ip || runid); /* User must pass at least one search param. */
+    iter = dictGetIterator(instances);
+    while (match == NULL && (entry = dictNext(iter)) != NULL) {
+        sentinelRedisInstance *cand = dictGetVal(entry);
+
+        if (runid != NULL) {
+            if (cand->runid == NULL) continue;
+            if (strcmp(cand->runid, runid) != 0) continue;
+        }
+        if (ip != NULL) {
+            if (strcmp(cand->addr->ip, ip) != 0) continue;
+            if (cand->addr->port != port) continue;
+        }
+        match = cand;
+    }
+    dictReleaseIterator(iter);
+    return match;
+}
+
+/* Return the master registered with the specified name in the
+ * sentinel.masters table, or NULL if there is no such master. */
+sentinelRedisInstance *sentinelGetMasterByName(char *name) {
+    /* The table is keyed by sds strings: use a temporary sds copy of the
+     * name to perform the lookup. */
+    sds key = sdsnew(name);
+    sentinelRedisInstance *master = dictFetchValue(sentinel.masters,key);
+
+    sdsfree(key);
+    return master;
+}
+
+/* Set the bits specified by 'flags' in every instance stored in the
+ * 'instances' dictionary. */
+void sentinelAddFlagsToDictOfRedisInstances(dict *instances, int flags) {
+    dictIterator *iter = dictGetIterator(instances);
+    dictEntry *entry;
+
+    for (entry = dictNext(iter); entry != NULL; entry = dictNext(iter)) {
+        sentinelRedisInstance *inst = dictGetVal(entry);
+
+        inst->flags |= flags;
+    }
+    dictReleaseIterator(iter);
+}
+
+/* Clear the bits specified by 'flags' in every instance stored in the
+ * 'instances' dictionary. */
+void sentinelDelFlagsToDictOfRedisInstances(dict *instances, int flags) {
+    dictIterator *iter = dictGetIterator(instances);
+    dictEntry *entry;
+
+    for (entry = dictNext(iter); entry != NULL; entry = dictNext(iter)) {
+        sentinelRedisInstance *inst = dictGetVal(entry);
+
+        inst->flags &= ~flags;
+    }
+    dictReleaseIterator(iter);
+}
+
+/* Reset the state of a monitored master:
+ * 1) Remove all slaves.
+ * 2) Remove all sentinels.
+ * 3) Remove most of the flags resulting from runtime operations: only
+ *    SRI_MASTER and SRI_CAN_FAILOVER are retained.
+ * 4) Reset the failover state and the failover timers, undoing the failover
+ *    if in progress.
+ * 5) Disconnect the connections with the master: the instance is left
+ *    with no links and flagged as SRI_DISCONNECTED, so that the links can
+ *    be created again by sentinelReconnectInstance().
+ *
+ * Other timers (ping, pong, info and so forth) are not modified.
+ * A "+reset-master" event is generated at the end. */
+void sentinelResetMaster(sentinelRedisInstance *ri) {
+    redisAssert(ri->flags & SRI_MASTER);
+    dictRelease(ri->slaves);
+    dictRelease(ri->sentinels);
+    ri->slaves = dictCreate(&instancesDictType,NULL);
+    ri->sentinels = dictCreate(&instancesDictType,NULL);
+
+    /* Release the links and forget about them explicitly, as
+     * releaseSentinelRedisInstance() does.
+     * NOTE(review): hiredis is expected to invoke the disconnection callback
+     * (that clears the pointer and sets SRI_DISCONNECTED) only for links
+     * that were actually established: without clearing the state here a
+     * link freed while still connecting would leave a dangling pointer. */
+    if (ri->cc) {
+        redisAsyncFree(ri->cc);
+        ri->cc = NULL;
+    }
+    if (ri->pc) {
+        redisAsyncFree(ri->pc);
+        ri->pc = NULL;
+    }
+    ri->flags &= SRI_MASTER|SRI_CAN_FAILOVER|SRI_DISCONNECTED;
+    /* No links at this point: make sure a reconnection is performed. */
+    ri->flags |= SRI_DISCONNECTED;
+
+    /* sdsfree() is called with NULL pointers in other places of this file,
+     * so no guard is needed here. */
+    sdsfree(ri->leader);
+    ri->leader = NULL;
+    ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
+    ri->failover_state_change_time = 0;
+    ri->failover_start_time = 0;
+    ri->promoted_slave = NULL;
+    sentinelEvent(REDIS_WARNING,"+reset-master",ri,"%@");
+}
+
+/* Reset, using sentinelResetMaster(), every monitored master whose name
+ * matches the specified glob-style pattern (the match is case sensitive).
+ * Returns the number of masters that were reset. */
+int sentinelResetMastersByPattern(char *pattern) {
+    dictIterator *iter = dictGetIterator(sentinel.masters);
+    dictEntry *entry;
+    int count = 0;
+
+    while ((entry = dictNext(iter)) != NULL) {
+        sentinelRedisInstance *master = dictGetVal(entry);
+
+        if (master->name == NULL) continue;
+        if (!stringmatch(pattern,master->name,0)) continue;
+        sentinelResetMaster(master);
+        count++;
+    }
+    dictReleaseIterator(iter);
+    return count;
+}
+
+/* ============================ Config handling ============================= */
+/* Handle a Sentinel configuration statement. 'argv' holds the 'argc'
+ * arguments of the statement, argv[0] being the name of the directive
+ * (compared case insensitively), so at least one argument is required.
+ *
+ * Supported directives:
+ *
+ *  monitor <name> <host> <port> <quorum>
+ *  down-after-milliseconds <name> <milliseconds>
+ *  failover-timeout <name> <milliseconds>
+ *  can-failover <name> <yes/no>
+ *  parallel-syncs <name> <numslaves>
+ *
+ * All the directives but "monitor" refer to a master that must be already
+ * defined.
+ *
+ * Returns NULL on success, otherwise a static string describing the error
+ * is returned. When an error is returned for a time parameter the previous
+ * value of the master is left untouched. */
+char *sentinelHandleConfiguration(char **argv, int argc) {
+    sentinelRedisInstance *ri;
+
+    if (!strcasecmp(argv[0],"monitor") && argc == 5) {
+        /* monitor <name> <host> <port> <quorum> */
+        int quorum = atoi(argv[4]);
+
+        if (quorum <= 0) return "Quorum must be 1 or greater.";
+        if (createSentinelRedisInstance(argv[1],SRI_MASTER,argv[2],
+                                        atoi(argv[3]),quorum,NULL) == NULL)
+        {
+            switch(errno) {
+            case EBUSY: return "Duplicated master name.";
+            case ENOENT: return "Can't resolve master instance hostname.";
+            case EINVAL: return "Invalid port number";
+            /* Never report success if the master was not created. */
+            default: return "Unable to create the master instance.";
+            }
+        }
+    } else if (!strcasecmp(argv[0],"down-after-milliseconds") && argc == 3) {
+        /* down-after-milliseconds <name> <milliseconds> */
+        mstime_t period = atoi(argv[2]);
+
+        ri = sentinelGetMasterByName(argv[1]);
+        if (!ri) return "No such master with specified name.";
+        /* Validate before storing, so that an invalid value never ends
+         * into the instance. */
+        if (period <= 0) return "negative or zero time parameter.";
+        ri->down_after_period = period;
+    } else if (!strcasecmp(argv[0],"failover-timeout") && argc == 3) {
+        /* failover-timeout <name> <milliseconds> */
+        mstime_t timeout = atoi(argv[2]);
+
+        ri = sentinelGetMasterByName(argv[1]);
+        if (!ri) return "No such master with specified name.";
+        if (timeout <= 0) return "negative or zero time parameter.";
+        ri->failover_timeout = timeout;
+    } else if (!strcasecmp(argv[0],"can-failover") && argc == 3) {
+        /* can-failover <name> <yes/no> */
+        int yesno = yesnotoi(argv[2]);
+
+        ri = sentinelGetMasterByName(argv[1]);
+        if (!ri) return "No such master with specified name.";
+        if (yesno == -1) return "Argument must be either yes or no.";
+        if (yesno)
+            ri->flags |= SRI_CAN_FAILOVER;
+        else
+            ri->flags &= ~SRI_CAN_FAILOVER;
+    } else if (!strcasecmp(argv[0],"parallel-syncs") && argc == 3) {
+        /* parallel-syncs <name> <numslaves>
+         * The value is stored as it is, no range check is performed. */
+        ri = sentinelGetMasterByName(argv[1]);
+        if (!ri) return "No such master with specified name.";
+        ri->parallel_syncs = atoi(argv[2]);
+    } else {
+        return "Unrecognized sentinel configuration statement.";
+    }
+    return NULL;
+}
+
+/* ====================== hiredis connection handling ======================= */
+
+/* Mark as disconnected the instance owning the hiredis context 'c', that
+ * is in an error condition or was disconnected.
+ *
+ * The instance is obtained from c->data. The link matching the context
+ * (Pub/Sub if 'c' is the pc link of the instance, commands otherwise) is
+ * set to NULL, a debugging event reporting the context error is generated,
+ * and the SRI_DISCONNECTED flag is set.
+ *
+ * Note: we don't free the hiredis context as hiredis will do it for us
+ * for async connections. */
+void sentinelDisconnectInstanceFromContext(const redisAsyncContext *c) {
+    sentinelRedisInstance *ri = c->data;
+
+    if (ri->pc == c) {
+        sentinelEvent(REDIS_DEBUG,"-pubsub-link",ri,"%@ #%s",c->errstr);
+        ri->pc = NULL;
+    } else {
+        sentinelEvent(REDIS_DEBUG,"-cmd-link",ri,"%@ #%s",c->errstr);
+        ri->cc = NULL;
+    }
+    ri->flags |= SRI_DISCONNECTED;
+}
+
+/* hiredis connect callback, installed by sentinelReconnectInstance() on
+ * both the links. If the connection failed the instance is marked as
+ * disconnected, otherwise a "+pubsub-link" or "+cmd-link" debugging event
+ * is generated, according to the link that was established. */
+void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status) {
+    sentinelRedisInstance *ri;
+
+    if (status != REDIS_OK) {
+        sentinelDisconnectInstanceFromContext(c);
+        return;
+    }
+
+    ri = c->data;
+    if (ri->pc == c)
+        sentinelEvent(REDIS_DEBUG,"+pubsub-link",ri,"%@");
+    else
+        sentinelEvent(REDIS_DEBUG,"+cmd-link",ri,"%@");
+}
+
+/* hiredis disconnect callback, installed by sentinelReconnectInstance() on
+ * both the links: the instance owning the context is marked as disconnected
+ * whatever the 'status' is (the argument is not used). */
+void sentinelDisconnectCallback(const redisAsyncContext *c, int status) {
+ sentinelDisconnectInstanceFromContext(c);
+}
+
+/* Create the async connections for the specified instance if the instance
+ * is disconnected. Note that the SRI_DISCONNECTED flag is set even if just
+ * one of the two links (commands and pub/sub) is missing.
+ *
+ * The commands link is created for every kind of instance, the Pub/Sub
+ * link only for masters. Only the missing links are created. The function
+ * does not wait for the connections to be established: the outcome is
+ * notified via sentinelLinkEstablishedCallback().
+ *
+ * NOTE(review): the return value of redisAsyncConnect() is dereferenced
+ * without a NULL check, and the return value of redisAeAttach() is ignored:
+ * confirm that the hiredis version in use can't fail in those ways. */
+void sentinelReconnectInstance(sentinelRedisInstance *ri) {
+ if (!(ri->flags & SRI_DISCONNECTED)) return;
+
+ /* Commands connection. */
+ if (ri->cc == NULL) {
+ ri->cc = redisAsyncConnect(ri->addr->ip,ri->addr->port);
+ if (ri->cc->err) {
+ sentinelEvent(REDIS_DEBUG,"-cmd-link-reconnection",ri,"%@ #%s",
+ ri->cc->errstr);
+ redisAsyncFree(ri->cc);
+ ri->cc = NULL;
+ } else {
+ ri->cc_conn_time = mstime();
+ /* The callbacks obtain the instance from the context data, so
+ * it is set before they are registered. */
+ ri->cc->data = ri;
+ redisAeAttach(server.el,ri->cc);
+ redisAsyncSetConnectCallback(ri->cc,
+ sentinelLinkEstablishedCallback);
+ redisAsyncSetDisconnectCallback(ri->cc,
+ sentinelDisconnectCallback);
+ }
+ }
+ /* Pub / Sub */
+ if ((ri->flags & SRI_MASTER) && ri->pc == NULL) {
+ ri->pc = redisAsyncConnect(ri->addr->ip,ri->addr->port);
+ if (ri->pc->err) {
+ sentinelEvent(REDIS_DEBUG,"-pubsub-link-reconnection",ri,"%@ #%s",
+ ri->pc->errstr);
+ redisAsyncFree(ri->pc);
+ ri->pc = NULL;
+ } else {
+ int retval;
+
+ ri->pc_conn_time = mstime();
+ ri->pc->data = ri;
+ redisAeAttach(server.el,ri->pc);
+ redisAsyncSetConnectCallback(ri->pc,
+ sentinelLinkEstablishedCallback);
+ redisAsyncSetDisconnectCallback(ri->pc,
+ sentinelDisconnectCallback);
+ /* Now we subscribe to the Sentinels "Hello" channel. */
+ retval = redisAsyncCommand(ri->pc,
+ sentinelReceiveHelloMessages, NULL, "SUBSCRIBE %s",
+ SENTINEL_HELLO_CHANNEL);
+ if (retval != REDIS_OK) {
+ /* If we can't subscribe, the Pub/Sub connection is useless
+ * and we can simply disconnect it and try again.
+ * SRI_DISCONNECTED is still set when returning from here, so
+ * the next call will create the link again. */
+ redisAsyncFree(ri->pc);
+ ri->pc = NULL;
+ return;
+ }
+ }
+ }
+ /* Clear the DISCONNECTED flags only if we have both the connections
+ * (or just the commands connection if this is a slave or a
+ * sentinel instance). */
+ if (ri->cc && (ri->flags & (SRI_SLAVE|SRI_SENTINEL) || ri->pc))
+ ri->flags &= ~SRI_DISCONNECTED;
+}
+
+/* ======================== Redis instances pinging ======================== */
+
+/* Process the INFO output from masters. */
+void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
+ sds *lines;
+ int numlines, j;
+ int role = 0;
+
+
+ /* The following fields must be reset to a given value in the case they
+ * are not found at all in the INFO output. */
+ ri->master_link_down_time = 0;
+
+ /* Process line by line. */
+ lines = sdssplitlen(info,strlen(info),"\r\n",2,&numlines);
+ for (j = 0; j < numlines; j++) {
+ sentinelRedisInstance *slave;
+ sds l = lines[j];
+
+ /* run_id:<40 hex chars>*/
+ if (sdslen(l) >= 47 && !memcmp(l,"run_id:",7)) {
+ if (ri->runid == NULL) {
+ ri->runid = sdsnewlen(l+7,40);
+ } else {
+ /* TODO: check if run_id has changed. This means the
+ * instance has been restarted, we want to set a flag
+ * and notify this event. */
+ }
+ }
+
+ /* slave0:<ip>,<port>,<state> */
+ if ((ri->flags & SRI_MASTER) &&
+ sdslen(l) >= 7 &&
+ !memcmp(l,"slave",5) && isdigit(l[5]))
+ {
+ char *ip, *port, *end;
+
+ ip = strchr(l,':'); if (!ip) continue;
+ ip++; /* Now ip points to start of ip address. */
+ port = strchr(ip,','); if (!port) continue;
+ *port = '\0'; /* nul term for easy access. */
+ port++; /* Now port points to start of port number. */
+ end = strchr(port,','); if (!end) continue;
+ *end = '\0'; /* nul term for easy access. */
+
+ /* Check if we already have this slave into our table,
+ * otherwise add it. */
+ if (sentinelRedisInstanceLookupSlave(ri,ip,atoi(port)) == NULL) {
+ if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,ip,
+ atoi(port), ri->quorum,ri)) != NULL)
+ {
+ sentinelEvent(REDIS_NOTICE,"+slave",slave,"%@");
+ }
+ }
+ }
+
+ /* master_link_down_since_seconds:<seconds> */
+ if (sdslen(l) >= 32 &&
+ !memcmp(l,"master_link_down_since_seconds",30))
+ {
+ ri->master_link_down_time = strtoll(l+31,NULL,10)*1000;
+ }
+
+ /* role:<role> */
+ if (!memcmp(l,"role:master",11)) role = SRI_MASTER;
+ else if (!memcmp(l,"role:slave",10)) role = SRI_SLAVE;
+
+ if (role == SRI_SLAVE) {
+ /* master_host:<host> */
+ if (sdslen(l) >= 12 && !memcmp(l,"master_host:",12)) {
+ sdsfree(ri->slave_master_host);
+ ri->slave_master_host = sdsnew(l+12);
+ }
+
+ /* master_port:<port> */
+ if (sdslen(l) >= 12 && !memcmp(l,"master_port:",12))
+ ri->slave_master_port = atoi(l+12);
+
+ /* master_link_status:<status> */
+ if (sdslen(l) >= 19 && !memcmp(l,"master_link_status:",19)) {
+ ri->slave_master_link_status =
+ (strcasecmp(l+19,"up") == 0) ?
+ SENTINEL_MASTER_LINK_STATUS_UP :
+ SENTINEL_MASTER_LINK_STATUS_DOWN;
+ }
+ }
+ }
+ ri->info_refresh = mstime();
+ sdsfreesplitres(lines,numlines);
+
+ if (sentinel.tilt) return;
+
+ /* Act if a slave turned into a master. */
+ if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) {
+ if (ri->flags & SRI_PROMOTED) {
+ /* If this is a promoted slave we can change state to the
+ * failover state machine. */
+ if (ri->master &&
+ (ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
+ (ri->master->flags & SRI_I_AM_THE_LEADER) &&
+ (ri->master->failover_state ==
+ SENTINEL_FAILOVER_STATE_WAIT_PROMOTION))
+ {
+ ri->master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
+ ri->master->failover_state_change_time = mstime();
+ sentinelEvent(REDIS_WARNING,"+promoted-slave",ri,"%@");
+ sentinelEvent(REDIS_WARNING,"+failover-state-reconf-slaves",
+ ri->master,"%@");
+ }
+ } else {
+ /* Otherwise we interpret this as the start of the failover. */
+ if (ri->master &&
+ (ri->master->flags & SRI_FAILOVER_IN_PROGRESS) == 0)
+ {
+ ri->master->flags |= SRI_FAILOVER_IN_PROGRESS;
+ sentinelEvent(REDIS_WARNING,"failover-detected",ri->master,"%@");
+ ri->master->failover_state = SENTINEL_FAILOVER_STATE_DETECT_END;
+ ri->master->failover_state_change_time = mstime();
+ ri->master->promoted_slave = ri;
+ ri->flags |= SRI_PROMOTED;
+ /* We are an observer, so we can only assume that the leader
+ * is reconfiguring the slave instances. For this reason we
+ * set all the instances as RECONF_SENT waiting for progresses
+ * on this side. */
+ sentinelAddFlagsToDictOfRedisInstances(ri->master->slaves,
+ SRI_RECONF_SENT);
+ }
+ }
+ }
+
+ /* Detect if the slave that is in the process of being reconfigured
+ * changed state. */
+ if ((ri->flags & SRI_SLAVE) && role == SRI_SLAVE &&
+ (ri->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)))
+ {
+ /* SRI_RECONF_SENT -> SRI_RECONF_INPROG. */
+ if ((ri->flags & SRI_RECONF_SENT) &&
+ ri->slave_master_host &&
+ strcmp(ri->slave_master_host,
+ ri->master->promoted_slave->addr->ip) == 0 &&
+ ri->slave_master_port == ri->master->promoted_slave->addr->port)
+ {
+ ri->flags &= ~SRI_RECONF_SENT;
+ ri->flags |= SRI_RECONF_INPROG;
+ sentinelEvent(REDIS_NOTICE,"+slave-reconf-inprog",ri,"%@");
+ }
+
+ /* SRI_RECONF_INPROG -> SRI_RECONF_DONE */
+ if ((ri->flags & SRI_RECONF_INPROG) &&
+ ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP)
+ {
+ ri->flags &= ~SRI_RECONF_INPROG;
+ ri->flags |= SRI_RECONF_DONE;
+ sentinelEvent(REDIS_NOTICE,"+slave-reconf-done",ri,"%@");
+ /* If we are moving forward (a new slave is now configured)
+ * we update the change_time as we are conceptually passing
+ * to the next slave. */
+ ri->failover_state_change_time = mstime();
+ }
+ }
+}
+
+void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
+ sentinelRedisInstance *ri = c->data;
+ redisReply *r;
+
+ ri->pending_commands--;
+ if (!reply) return;
+ r = reply;
+
+ if (r->type == REDIS_REPLY_STRING) {
+ sentinelRefreshInstanceInfo(ri,r->str);
+ }
+}
+
+/* Just discard the reply. We use this when we are not monitoring the return
+ * value of the command but its effects directly. */
+void sentinelDiscardReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
+ sentinelRedisInstance *ri = c->data;
+
+ ri->pending_commands--;
+}
+
+void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
+ sentinelRedisInstance *ri = c->data;
+ redisReply *r;
+
+ ri->pending_commands--;
+ if (!reply) return;
+ r = reply;
+
+ if (r->type == REDIS_REPLY_STATUS ||
+ r->type == REDIS_REPLY_ERROR) {
+ /* Update the "instance available" field only if this is an
+ * acceptable reply. */
+ if (strncmp(r->str,"PONG",4) == 0 ||
+ strncmp(r->str,"LOADING",7) == 0 ||
+ strncmp(r->str,"MASTERDOWN",10) == 0)
+ {
+ ri->last_avail_time = mstime();
+ }
+ }
+ ri->last_pong_time = mstime();
+}
+
+/* This is called when we get the reply about the PUBLISH command we send
+ * to the master to advertise this sentinel. */
+void sentinelPublishReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
+ sentinelRedisInstance *ri = c->data;
+ redisReply *r;
+
+ ri->pending_commands--;
+ if (!reply) return;
+ r = reply;
+
+ /* Only update pub_time if we actually published our message. Otherwise
+ * we'll retry against in 100 milliseconds. */
+ if (r->type != REDIS_REPLY_ERROR)
+ ri->last_pub_time = mstime();
+}
+
+/* This is our Pub/Sub callback for the Hello channel. It's useful in order
+ * to discover other sentinels attached at the same master. */
+void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata) {
+ sentinelRedisInstance *ri = c->data;
+ redisReply *r;
+
+ if (!reply) return;
+ r = reply;
+
+ /* Update the last activity in the pubsub channel. Note that since we
+ * receive our messages as well this timestamp can be used to detect
+ * if the link is probably diconnected even if it seems otherwise. */
+ ri->pc_last_activity = mstime();
+
+ /* Sanity check in the reply we expect, so that the code that follows
+ * can avoid to check for details. */
+ if (r->type != REDIS_REPLY_ARRAY ||
+ r->elements != 3 ||
+ r->element[0]->type != REDIS_REPLY_STRING ||
+ r->element[1]->type != REDIS_REPLY_STRING ||
+ r->element[2]->type != REDIS_REPLY_STRING ||
+ strcmp(r->element[0]->str,"message") != 0) return;
+
+ /* We are not interested in meeting ourselves */
+ if (strstr(r->element[2]->str,server.runid) != NULL) return;
+
+ {
+ int numtokens, port, removed, canfailover;
+ char **token = sdssplitlen(r->element[2]->str,
+ r->element[2]->len,
+ ":",1,&numtokens);
+ sentinelRedisInstance *sentinel;
+
+ if (numtokens == 4) {
+ /* First, try to see if we already have this sentinel. */
+ port = atoi(token[1]);
+ canfailover = atoi(token[3]);
+ sentinel = getSentinelRedisInstanceByAddrAndRunID(
+ ri->sentinels,token[0],port,token[2]);
+
+ if (!sentinel) {
+ /* If not, remove all the sentinels that have the same runid
+ * OR the same ip/port, because it's either a restart or a
+ * network topology change. */
+ removed = removeMatchingSentinelsFromMaster(ri,token[0],port,
+ token[2]);
+ if (removed) {
+ sentinelEvent(REDIS_NOTICE,"-dup-sentinel",ri,
+ "%@ #duplicate of %s:%d or %s",
+ token[0],port,token[2]);
+ }
+
+ /* Add the new sentinel. */
+ sentinel = createSentinelRedisInstance(NULL,SRI_SENTINEL,
+ token[0],port,ri->quorum,ri);
+ if (sentinel) {
+ sentinelEvent(REDIS_NOTICE,"+sentinel",sentinel,"%@");
+ /* The runid is NULL after a new instance creation and
+ * for Sentinels we don't have a later chance to fill it,
+ * so do it now. */
+ sentinel->runid = sdsnew(token[2]);
+ }
+ }
+
+ /* Update the state of the Sentinel. */
+ if (sentinel) {
+ sentinel->last_hello_time = mstime();
+ if (canfailover)
+ sentinel->flags |= SRI_CAN_FAILOVER;
+ else
+ sentinel->flags &= ~SRI_CAN_FAILOVER;
+ }
+ }
+ sdsfreesplitres(token,numtokens);
+ }
+}
+
+void sentinelPingInstance(sentinelRedisInstance *ri) {
+ mstime_t now = mstime();
+ mstime_t info_period;
+ int retval;
+
+ /* Return ASAP if we have already a PING or INFO already pending, or
+ * in the case the instance is not properly connected. */
+ if (ri->flags & SRI_DISCONNECTED) return;
+
+ /* For INFO, PING, PUBLISH that are not critical commands to send we
+ * also have a limit of SENTINEL_MAX_PENDING_COMMANDS. We don't
+ * want to use a lot of memory just because a link is not working
+ * properly (note that anyway there is a redundant protection about this,
+ * that is, the link will be disconnected and reconnected if a long
+ * timeout condition is detected. */
+ if (ri->pending_commands >= SENTINEL_MAX_PENDING_COMMANDS) return;
+
+ /* If this is a slave of a master in O_DOWN condition we start sending
+ * it INFO every second, instead of the usual SENTINEL_INFO_PERIOD
+ * period. In this state we want to closely monitor slaves in case they
+ * are turned into masters by another Sentinel, or by the sysadmin. */
+ if ((ri->flags & SRI_SLAVE) &&
+ (ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) {
+ info_period = 1000;
+ } else {
+ info_period = SENTINEL_INFO_PERIOD;
+ }
+
+ if ((ri->flags & SRI_SENTINEL) == 0 &&
+ (ri->info_refresh == 0 ||
+ (now - ri->info_refresh) > info_period))
+ {
+ /* Send INFO to masters and slaves, not sentinels. */
+ retval = redisAsyncCommand(ri->cc,
+ sentinelInfoReplyCallback, NULL, "INFO");
+ if (retval != REDIS_OK) return;
+ ri->pending_commands++;
+ } else if ((now - ri->last_pong_time) > SENTINEL_PING_PERIOD) {
+ /* Send PING to all the three kinds of instances. */
+ retval = redisAsyncCommand(ri->cc,
+ sentinelPingReplyCallback, NULL, "PING");
+ if (retval != REDIS_OK) return;
+ ri->pending_commands++;
+ } else if ((ri->flags & SRI_MASTER) &&
+ (now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD)
+ {
+ /* PUBLISH hello messages only to masters. */
+ struct sockaddr_in sa;
+ socklen_t salen = sizeof(sa);
+
+ if (getsockname(ri->cc->c.fd,(struct sockaddr*)&sa,&salen) != -1) {
+ char myaddr[128];
+
+ snprintf(myaddr,sizeof(myaddr),"%s:%d:%s:%d",
+ inet_ntoa(sa.sin_addr), server.port, server.runid,
+ (ri->flags & SRI_CAN_FAILOVER) != 0);
+ retval = redisAsyncCommand(ri->cc,
+ sentinelPublishReplyCallback, NULL, "PUBLISH %s %s",
+ SENTINEL_HELLO_CHANNEL,myaddr);
+ if (retval != REDIS_OK) return;
+ ri->pending_commands++;
+ }
+ }
+}
+
+/* =========================== SENTINEL command ============================= */
+
+const char *sentinelFailoverStateStr(int state) {
+ switch(state) {
+ case SENTINEL_FAILOVER_STATE_NONE: return "none";
+ case SENTINEL_FAILOVER_STATE_WAIT_START: return "wait_start";
+ case SENTINEL_FAILOVER_STATE_SELECT_SLAVE: return "select_slave";
+ case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE: return "send_slaveof_noone";
+ case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION: return "wait_promotion";
+ case SENTINEL_FAILOVER_STATE_RECONF_SLAVES: return "reconf_slaves";
+ case SENTINEL_FAILOVER_STATE_ALERT_CLIENTS: return "alert_clients";
+ case SENTINEL_FAILOVER_STATE_DETECT_END: return "detect_end";
+ case SENTINEL_FAILOVER_STATE_UPDATE_CONFIG: return "update_config";
+ default: return "unknown";
+ }
+}
+
+/* Redis instance to Redis protocol representation. */
+void addReplySentinelRedisInstance(redisClient *c, sentinelRedisInstance *ri) {
+ char *flags = sdsempty();
+ void *mbl;
+ int fields = 0;
+
+ mbl = addDeferredMultiBulkLength(c);
+
+ addReplyBulkCString(c,"name");
+ addReplyBulkCString(c,ri->name);
+ fields++;
+
+ addReplyBulkCString(c,"ip");
+ addReplyBulkCString(c,ri->addr->ip);
+ fields++;
+
+ addReplyBulkCString(c,"port");
+ addReplyBulkLongLong(c,ri->addr->port);
+ fields++;
+
+ addReplyBulkCString(c,"runid");
+ addReplyBulkCString(c,ri->runid ? ri->runid : "");
+ fields++;
+
+ addReplyBulkCString(c,"flags");
+ if (ri->flags & SRI_S_DOWN) flags = sdscat(flags,"s_down,");
+ if (ri->flags & SRI_O_DOWN) flags = sdscat(flags,"o_down,");
+ if (ri->flags & SRI_MASTER) flags = sdscat(flags,"master,");
+ if (ri->flags & SRI_SLAVE) flags = sdscat(flags,"slave,");
+ if (ri->flags & SRI_SENTINEL) flags = sdscat(flags,"sentinel,");
+ if (ri->flags & SRI_DISCONNECTED) flags = sdscat(flags,"disconnected,");
+ if (ri->flags & SRI_MASTER_DOWN) flags = sdscat(flags,"master_down,");
+ if (ri->flags & SRI_FAILOVER_IN_PROGRESS)
+ flags = sdscat(flags,"failover_in_progress,");
+ if (ri->flags & SRI_I_AM_THE_LEADER)
+ flags = sdscat(flags,"i_am_the_leader,");
+ if (ri->flags & SRI_PROMOTED) flags = sdscat(flags,"promoted,");
+ if (ri->flags & SRI_RECONF_SENT) flags = sdscat(flags,"reconf_sent,");
+ if (ri->flags & SRI_RECONF_INPROG) flags = sdscat(flags,"reconf_inprog,");
+ if (ri->flags & SRI_RECONF_DONE) flags = sdscat(flags,"reconf_done,");
+
+ if (sdslen(flags) != 0) flags = sdsrange(flags,0,-2); /* remove last "," */
+ addReplyBulkCString(c,flags);
+ sdsfree(flags);
+ fields++;
+
+ addReplyBulkCString(c,"pending-commands");
+ addReplyBulkLongLong(c,ri->pending_commands);
+ fields++;
+
+ if (ri->flags & SRI_FAILOVER_IN_PROGRESS) {
+ addReplyBulkCString(c,"failover-state");
+ addReplyBulkCString(c,(char*)sentinelFailoverStateStr(ri->failover_state));
+ fields++;
+ }
+
+ addReplyBulkCString(c,"last-ok-ping-reply");
+ addReplyBulkLongLong(c,mstime() - ri->last_avail_time);
+ fields++;
+
+ addReplyBulkCString(c,"last-ping-reply");
+ addReplyBulkLongLong(c,mstime() - ri->last_pong_time);
+ fields++;
+
+ if (ri->flags & SRI_S_DOWN) {
+ addReplyBulkCString(c,"s-down-time");
+ addReplyBulkLongLong(c,mstime()-ri->s_down_since_time);
+ fields++;
+ }
+
+ if (ri->flags & SRI_O_DOWN) {
+ addReplyBulkCString(c,"o-down-time");
+ addReplyBulkLongLong(c,mstime()-ri->o_down_since_time);
+ fields++;
+ }
+
+ /* Masters and Slaves */
+ if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
+ addReplyBulkCString(c,"info-refresh");
+ addReplyBulkLongLong(c,mstime() - ri->info_refresh);
+ fields++;
+ }
+
+ /* Only masters */
+ if (ri->flags & SRI_MASTER) {
+ addReplyBulkCString(c,"num-slaves");
+ addReplyBulkLongLong(c,dictSize(ri->slaves));
+ fields++;
+
+ addReplyBulkCString(c,"num-other-sentinels");
+ addReplyBulkLongLong(c,dictSize(ri->sentinels));
+ fields++;
+
+ addReplyBulkCString(c,"quorum");
+ addReplyBulkLongLong(c,ri->quorum);
+ fields++;
+ }
+
+ /* Only slaves */
+ if (ri->flags & SRI_SLAVE) {
+ addReplyBulkCString(c,"master-link-down-time");
+ addReplyBulkLongLong(c,ri->master_link_down_time);
+ fields++;
+
+ addReplyBulkCString(c,"master-link-status");
+ addReplyBulkCString(c,
+ (ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP) ?
+ "ok" : "err");
+ fields++;
+
+ addReplyBulkCString(c,"master-host");
+ addReplyBulkCString(c,
+ ri->slave_master_host ? ri->slave_master_host : "?");
+ fields++;
+
+ addReplyBulkCString(c,"master-port");
+ addReplyBulkLongLong(c,ri->slave_master_port);
+ fields++;
+ }
+
+ /* Only sentinels */
+ if (ri->flags & SRI_SENTINEL) {
+ addReplyBulkCString(c,"last-hello-message");
+ addReplyBulkLongLong(c,mstime() - ri->last_hello_time);
+ fields++;
+
+ addReplyBulkCString(c,"can-failover-its-master");
+ addReplyBulkLongLong(c,(ri->flags & SRI_CAN_FAILOVER) != 0);
+ fields++;
+
+ if (ri->flags & SRI_MASTER_DOWN) {
+ addReplyBulkCString(c,"subjective-leader");
+ addReplyBulkCString(c,ri->leader ? ri->leader : "?");
+ fields++;
+ }
+ }
+
+ setDeferredMultiBulkLength(c,mbl,fields*2);
+}
+
+/* Output a number of instances contanined inside a dictionary as
+ * Redis protocol. */
+void addReplyDictOfRedisInstances(redisClient *c, dict *instances) {
+ dictIterator *di;
+ dictEntry *de;
+
+ di = dictGetIterator(instances);
+ addReplyMultiBulkLen(c,dictSize(instances));
+ while((de = dictNext(di)) != NULL) {
+ sentinelRedisInstance *ri = dictGetVal(de);
+
+ addReplySentinelRedisInstance(c,ri);
+ }
+ dictReleaseIterator(di);
+}
+
+/* Lookup the named master into sentinel.masters.
+ * If the master is not found reply to the client with an error and returns
+ * NULL. */
+sentinelRedisInstance *sentinelGetMasterByNameOrReplyError(redisClient *c,
+ robj *name)
+{
+ sentinelRedisInstance *ri;
+
+ ri = dictFetchValue(sentinel.masters,c->argv[2]->ptr);
+ if (!ri) {
+ addReplyError(c,"No such master with that name");
+ return NULL;
+ }
+ return ri;
+}
+
+void sentinelCommand(redisClient *c) {
+ if (!strcasecmp(c->argv[1]->ptr,"masters")) {
+ /* SENTINEL MASTERS */
+ if (c->argc != 2) goto numargserr;
+
+ addReplyDictOfRedisInstances(c,sentinel.masters);
+ } else if (!strcasecmp(c->argv[1]->ptr,"slaves")) {
+ /* SENTINEL SLAVES <master-name> */
+ sentinelRedisInstance *ri;
+
+ if (c->argc != 3) goto numargserr;
+ if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
+ return;
+ addReplyDictOfRedisInstances(c,ri->slaves);
+ } else if (!strcasecmp(c->argv[1]->ptr,"sentinels")) {
+ /* SENTINEL SENTINELS <master-name> */
+ sentinelRedisInstance *ri;
+
+ if (c->argc != 3) goto numargserr;
+ if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
+ return;
+ addReplyDictOfRedisInstances(c,ri->sentinels);
+ } else if (!strcasecmp(c->argv[1]->ptr,"is-master-down-by-addr")) {
+ /* SENTINEL IS-MASTER-DOWN-BY-ADDR <ip> <port> */
+ sentinelRedisInstance *ri;
+ char *leader = NULL;
+ long port;
+ int isdown = 0;
+
+ if (c->argc != 4) goto numargserr;
+ if (getLongFromObjectOrReply(c,c->argv[3],&port,NULL) != REDIS_OK)
+ return;
+ ri = getSentinelRedisInstanceByAddrAndRunID(sentinel.masters,
+ c->argv[2]->ptr,port,NULL);
+
+ /* It exists? Is actually a master? Is subjectively down? It's down.
+ * Note: if we are in tilt mode we always reply with "0". */
+ if (!sentinel.tilt && ri && (ri->flags & SRI_S_DOWN) &&
+ (ri->flags & SRI_MASTER))
+ isdown = 1;
+ if (ri) leader = sentinelGetSubjectiveLeader(ri);
+
+ /* Reply with a two-elements multi-bulk reply: down state, leader. */
+ addReplyMultiBulkLen(c,2);
+ addReply(c, isdown ? shared.cone : shared.czero);
+ addReplyBulkCString(c, leader ? leader : "?");
+ if (leader) sdsfree(leader);
+ } else if (!strcasecmp(c->argv[1]->ptr,"reset")) {
+ /* SENTINEL RESET <pattern> */
+ if (c->argc != 3) goto numargserr;
+ addReplyLongLong(c,sentinelResetMastersByPattern(c->argv[2]->ptr));
+ } else if (!strcasecmp(c->argv[1]->ptr,"get-master-addr-by-name")) {
+ /* SENTINEL GET-MASTER-ADDR-BY-NAME <master-name> */
+ sentinelRedisInstance *ri;
+
+ if (c->argc != 3) goto numargserr;
+ ri = sentinelGetMasterByName(c->argv[2]->ptr);
+ if (ri == NULL) {
+ addReply(c,shared.nullmultibulk);
+ } else {
+ sentinelAddr *addr = ri->addr;
+
+ if ((ri->flags & SRI_FAILOVER_IN_PROGRESS) && ri->promoted_slave)
+ addr = ri->promoted_slave->addr;
+ addReplyMultiBulkLen(c,2);
+ addReplyBulkCString(c,addr->ip);
+ addReplyBulkLongLong(c,addr->port);
+ }
+ } else {
+ addReplyErrorFormat(c,"Unknown sentinel subcommand '%s'",
+ (char*)c->argv[1]->ptr);
+ }
+ return;
+
+numargserr:
+ addReplyErrorFormat(c,"Wrong number of commands for 'sentinel %s'",
+ (char*)c->argv[1]->ptr);
+}
+
+/* ===================== SENTINEL availability checks ======================= */
+
+/* Is this instance down from our point of view? */
+void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
+ mstime_t elapsed = mstime() - ri->last_avail_time;
+
+ /* Check if we are in need for a reconnection of one of the
+ * links, because we are detecting low activity.
+ *
+ * 1) Check if the command link seems connected, was connected not less
+ * than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have an
+ * idle time that is greater than down_after_period / 2 seconds. */
+ if (ri->cc &&
+ (mstime() - ri->cc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
+ (mstime() - ri->last_pong_time) > (ri->down_after_period/2))
+ {
+ redisAsyncFree(ri->cc); /* will call the disconnection callback */
+ }
+
+ /* 2) Check if the pubsub link seems connected, was connected not less
+ * than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have no
+ * activity in the Pub/Sub channel for more than
+ * SENTINEL_PUBLISH_PERIOD * 3.
+ */
+ if (ri->pc &&
+ (mstime() - ri->pc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
+ (mstime() - ri->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3))
+ {
+ redisAsyncFree(ri->pc); /* will call the disconnection callback */
+ }
+
+ /* Update the subjectively down flag. */
+ if (elapsed > ri->down_after_period) {
+ /* Is subjectively down */
+ if ((ri->flags & SRI_S_DOWN) == 0) {
+ sentinelEvent(REDIS_WARNING,"+sdown",ri,"%@");
+ ri->s_down_since_time = mstime();
+ ri->flags |= SRI_S_DOWN;
+ }
+ } else {
+ /* Is subjectively up */
+ if (ri->flags & SRI_S_DOWN) {
+ sentinelEvent(REDIS_WARNING,"-sdown",ri,"%@");
+ ri->flags &= ~SRI_S_DOWN;
+ }
+ }
+}
+
+/* Is this instance down accordingly to the configured quorum? */
+void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
+ dictIterator *di;
+ dictEntry *de;
+ int quorum = 0, odown = 0;
+
+ if (master->flags & SRI_S_DOWN) {
+ /* Is down for enough sentinels? */
+ quorum = 1; /* the current sentinel. */
+ /* Count all the other sentinels. */
+ di = dictGetIterator(master->sentinels);
+ while((de = dictNext(di)) != NULL) {
+ sentinelRedisInstance *ri = dictGetVal(de);
+
+ if (ri->flags & SRI_MASTER_DOWN) quorum++;
+ }
+ dictReleaseIterator(di);
+ if (quorum >= master->quorum) odown = 1;
+ }
+
+ /* Set the flag accordingly to the outcome. */
+ if (odown) {
+ if ((master->flags & SRI_O_DOWN) == 0) {
+ sentinelEvent(REDIS_WARNING,"+odown",master,"%@ #quorum %d/%d",
+ quorum, master->quorum);
+ master->flags |= SRI_O_DOWN;
+ master->o_down_since_time = mstime();
+ }
+ } else {
+ if (master->flags & SRI_O_DOWN) {
+ sentinelEvent(REDIS_WARNING,"-odown",master,"%@");
+ master->flags &= ~SRI_O_DOWN;
+ }
+ }
+}
+
+/* Receive the SENTINEL is-master-down-by-addr reply, see the
+ * sentinelAskMasterStateToOtherSentinels() function for more information. */
+void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) {
+ sentinelRedisInstance *ri = c->data;
+ redisReply *r;
+
+ ri->pending_commands--;
+ if (!reply) return;
+ r = reply;
+
+ /* Ignore every error or unexpected reply.
+ * Note that if the command returns an error for any reason we'll
+ * end clearing the SRI_MASTER_DOWN flag for timeout anyway. */
+ if (r->type == REDIS_REPLY_ARRAY && r->elements == 2 &&
+ r->element[0]->type == REDIS_REPLY_INTEGER &&
+ r->element[1]->type == REDIS_REPLY_STRING)
+ {
+ ri->last_master_down_reply_time = mstime();
+ if (r->element[0]->integer == 1) {
+ ri->flags |= SRI_MASTER_DOWN;
+ } else {
+ ri->flags &= ~SRI_MASTER_DOWN;
+ }
+ sdsfree(ri->leader);
+ ri->leader = sdsnew(r->element[1]->str);
+ }
+}
+
+/* If we think (subjectively) the master is down, we start sending
+ * SENTINEL IS-MASTER-DOWN-BY-ADDR requests to other sentinels
+ * in order to get the replies that allow to reach the quorum and
+ * possibly also mark the master as objectively down. */
+void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master) {
+ dictIterator *di;
+ dictEntry *de;
+
+ di = dictGetIterator(master->sentinels);
+ while((de = dictNext(di)) != NULL) {
+ sentinelRedisInstance *ri = dictGetVal(de);
+ mstime_t elapsed = mstime() - ri->last_master_down_reply_time;
+ char port[32];
+ int retval;
+
+ /* If the master state from other sentinel is too old, we clear it. */
+ if (elapsed > SENTINEL_INFO_VALIDITY_TIME) {
+ ri->flags &= ~SRI_MASTER_DOWN;
+ sdsfree(ri->leader);
+ ri->leader = NULL;
+ }
+
+ /* Only ask if master is down to other sentinels if:
+ *
+ * 1) We believe it is down, or there is a failover in progress.
+ * 2) Sentinel is connected.
+ * 3) We did not received the info within SENTINEL_ASK_PERIOD ms. */
+ if ((master->flags & (SRI_S_DOWN|SRI_FAILOVER_IN_PROGRESS)) == 0)
+ continue;
+ if (ri->flags & SRI_DISCONNECTED) continue;
+ if (mstime() - ri->last_master_down_reply_time < SENTINEL_ASK_PERIOD)
+ continue;
+
+ /* Ask */
+ ll2string(port,sizeof(port),master->addr->port);
+ retval = redisAsyncCommand(ri->cc,
+ sentinelReceiveIsMasterDownReply, NULL,
+ "SENTINEL is-master-down-by-addr %s %s",
+ master->addr->ip, port);
+ if (retval == REDIS_OK) ri->pending_commands++;
+ }
+ dictReleaseIterator(di);
+}
+
+/* =============================== FAILOVER ================================= */
+
+/* Given a master get the "subjective leader", that is, among all the sentinels
+ * with given characteristics, the one with the lexicographically smaller
+ * runid. The characteristics required are:
+ *
+ * 1) Has SRI_CAN_FAILOVER flag.
+ * 2) Is not disconnected.
+ * 3) Recently answered to our ping (no longer than
+ * SENTINEL_INFO_VALIDITY_TIME milliseconds ago).
+ *
+ * The function returns a pointer to an sds string representing the runid of the
+ * leader sentinel instance (from our point of view). Otherwise NULL is
+ * returned if there are no suitable sentinels.
+ */
+
/* qsort() comparison function for arrays of pointers to C strings: the
 * pointed strings are compared ignoring the case, so the return value is
 * negative, zero or positive like the one of strcasecmp(). */
int compareRunID(const void *a, const void *b) {
    const char *const *lhs = a;
    const char *const *rhs = b;

    return strcasecmp(*lhs, *rhs);
}
+
+char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master) {
+ dictIterator *di;
+ dictEntry *de;
+ char **instance =
+ zmalloc(sizeof(char*)*(dictSize(master->sentinels)+1));
+ int instances = 0;
+ char *leader = NULL;
+
+ if (master->flags & SRI_CAN_FAILOVER) {
+ /* Add myself if I'm a Sentinel that can failover this master. */
+ instance[instances++] = server.runid;
+ }
+
+ di = dictGetIterator(master->sentinels);
+ while((de = dictNext(di)) != NULL) {
+ sentinelRedisInstance *ri = dictGetVal(de);
+ mstime_t lag = mstime() - ri->last_avail_time;
+
+ if (lag > SENTINEL_INFO_VALIDITY_TIME ||
+ !(ri->flags & SRI_CAN_FAILOVER) ||
+ (ri->flags & SRI_DISCONNECTED) ||
+ ri->runid == NULL)
+ continue;
+ instance[instances++] = ri->runid;
+ }
+ dictReleaseIterator(di);
+
+ /* If we have at least one instance passing our checks, order the array
+ * by runid. */
+ if (instances) {
+ qsort(instance,instances,sizeof(char*),compareRunID);
+ leader = sdsnew(instance[0]);
+ }
+ zfree(instance);
+ return leader;
+}
+
/* A run id together with a votes counter.
 * NOTE(review): presumably meant to hold the votes received by a candidate
 * leader; no use of this structure is visible near its definition, confirm
 * against the rest of the file. */
struct sentinelLeader {
    char *runid;            /* Run id string. */
    unsigned long votes;    /* Votes counter. */
};
+
+/* Helper function for sentinelGetObjectiveLeader, increment the counter
+ * relative to the specified runid. */
+void sentinelObjectiveLeaderIncr(dict *counters, char *runid) {
+ dictEntry *de = dictFind(counters,runid);
+ uint64_t oldval;
+
+ if (de) {
+ oldval = dictGetUnsignedIntegerVal(de);
+ dictSetUnsignedIntegerVal(de,oldval+1);
+ } else {
+ de = dictAddRaw(counters,runid);
+ redisAssert(de != NULL);
+ dictSetUnsignedIntegerVal(de,1);
+ }
+}
+
+/* Scan all the Sentinels attached to this master to check what is the
+ * most voted leader among Sentinels. */
+char *sentinelGetObjectiveLeader(sentinelRedisInstance *master) {
+ dict *counters;
+ dictIterator *di;
+ dictEntry *de;
+ unsigned int voters = 0, voters_quorum;
+ char *myvote;
+ char *winner = NULL;
+
+ redisAssert(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS));
+ counters = dictCreate(&leaderVotesDictType,NULL);
+
+ /* Count my vote. */
+ myvote = sentinelGetSubjectiveLeader(master);
+ if (myvote) {
+ sentinelObjectiveLeaderIncr(counters,myvote);
+ voters++;
+ }
+
+ /* Count other sentinels votes */
+ di = dictGetIterator(master->sentinels);
+ while((de = dictNext(di)) != NULL) {
+ sentinelRedisInstance *ri = dictGetVal(de);
+ if (ri->leader == NULL) continue;
+ /* If the failover is not already in progress we are only interested
+ * in Sentinels that believe the master is down. Otherwise the leader
+ * selection is useful for the "failover-takedown" when the original
+ * leader fails. In that case we consider all the voters. */
+ if (!(master->flags & SRI_FAILOVER_IN_PROGRESS) &&
+ !(ri->flags & SRI_MASTER_DOWN)) continue;
+ sentinelObjectiveLeaderIncr(counters,ri->leader);
+ voters++;
+ }
+ dictReleaseIterator(di);
+ voters_quorum = voters/2+1;
+
+ /* Check what's the winner. For the winner to win, it needs two conditions:
+ * 1) Absolute majority between voters (50% + 1).
+ * 2) And anyway at least master->quorum votes. */
+ {
+ uint64_t max_votes = 0; /* Max votes so far. */
+
+ di = dictGetIterator(counters);
+ while((de = dictNext(di)) != NULL) {
+ uint64_t votes = dictGetUnsignedIntegerVal(de);
+
+ if (max_votes < votes) {
+ max_votes = votes;
+ winner = dictGetKey(de);
+ }
+ }
+ dictReleaseIterator(di);
+ if (winner && (max_votes < voters_quorum || max_votes < master->quorum))
+ winner = NULL;
+ }
+ winner = winner ? sdsnew(winner) : NULL;
+ sdsfree(myvote);
+ dictRelease(counters);
+ return winner;
+}
+
+/* Check if this Sentinel has to take charge of the failover of 'master'
+ * and, if so, set up the failover state.
+ *
+ * The function does nothing unless all the following is true:
+ *
+ * 1) The master is flagged SRI_CAN_FAILOVER.
+ * 2) The master is in O_DOWN state, or a failover is already in progress.
+ * 3) sentinelGetObjectiveLeader() elects this Sentinel (server.runid).
+ *
+ * When no failover was in progress a brand new one is scheduled: the
+ * state is set to WAIT_START and a start time with a random component is
+ * picked. When a failover started by another Sentinel is in progress, we
+ * take it over (resuming from RECONF_SLAVES) only if its state did not
+ * change for at least 25% of master->failover_timeout.
+ *
+ * In both cases the master ends up flagged as SRI_FAILOVER_IN_PROGRESS
+ * and SRI_I_AM_THE_LEADER. */
+void sentinelStartFailover(sentinelRedisInstance *master) {
+    char *leader;
+    int isleader, fresh;
+
+    /* Not allowed to failover this master, or nothing to failover. */
+    if (!(master->flags & SRI_CAN_FAILOVER)) return;
+    if (!(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) return;
+
+    /* Only the objective leader can go forward. */
+    leader = sentinelGetObjectiveLeader(master);
+    isleader = (leader != NULL && strcasecmp(leader,server.runid) == 0);
+    sdsfree(leader);
+    if (!isleader) return;
+
+    fresh = !(master->flags & SRI_FAILOVER_IN_PROGRESS);
+    if (fresh) {
+        /* Brand new failover. */
+        master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;
+    } else {
+        mstime_t elapsed;
+
+        /* We are flagged as leader: we already started this failover. */
+        if (master->flags & SRI_I_AM_THE_LEADER) return;
+
+        /* We are the new leader of a failover started by somebody else.
+         * Take control only if the failover appears to be lagging for at
+         * least 25% of the configured timeout, otherwise wait more. */
+        elapsed = mstime() - master->failover_state_change_time;
+        if (elapsed < (master->failover_timeout/4)) return;
+        sentinelEvent(REDIS_WARNING,"+failover-takedown",master,"%@");
+
+        /* While in FAILOVER_IN_PROGRESS state as an observer we already
+         * have an elected slave (the one we observed turning into a
+         * master), so we can resume from the slaves reconfiguration. */
+        master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
+
+        /* As observers we flagged the slaves as RECONF_SENT, but now it
+         * is up to us to actually send the command: clear the flag. */
+        sentinelDelFlagsToDictOfRedisInstances(master->slaves,
+                                               SRI_RECONF_SENT);
+    }
+
+    master->flags |= SRI_FAILOVER_IN_PROGRESS|SRI_I_AM_THE_LEADER;
+    sentinelEvent(REDIS_WARNING,"+failover-triggered",master,"%@");
+
+    /* Only a fresh failover waits for a fixed plus random delay before
+     * starting; a takedown continues immediately. */
+    if (fresh) {
+        master->failover_start_time = mstime() +
+            SENTINEL_FAILOVER_FIXED_DELAY +
+            (rand() % SENTINEL_FAILOVER_MAX_RANDOM_DELAY);
+        sentinelEvent(REDIS_WARNING,"+failover-state-wait-start",master,
+            "%@ #starting in %lld milliseconds",
+            master->failover_start_time-mstime());
+    }
+    master->failover_state_change_time = mstime();
+}
+
+/* Select a suitable slave to promote. The current algorithm only uses
+ * the following parameters:
+ *
+ * 1) None of the following conditions: S_DOWN, O_DOWN, DISCONNECTED.
+ * 2) last_avail_time more recent than SENTINEL_INFO_VALIDITY_TIME.
+ * 3) info_refresh more recent than SENTINEL_INFO_VALIDITY_TIME.
+ * 4) master_link_down_time no more than:
+ * (now - master->s_down_since_time) + (master->down_after_period * 10).
+ *
+ * Among all the slaves matching the above conditions we select the slave
+ * with lower slave_priority. If priority is the same we select the slave
+ * with lexicographically smaller runid.
+ *
+ * The function returns the pointer to the selected slave, otherwise
+ * NULL if no suitable slave was found.
+ */
+
+/* qsort() comparison function for an array of sentinelRedisInstance
+ * pointers: instances are ordered by ascending slave_priority and, when
+ * the priority is the same, by case insensitive comparison of the runid. */
+int compareSlavesForPromotion(const void *a, const void *b) {
+    sentinelRedisInstance *first = *(sentinelRedisInstance **)a;
+    sentinelRedisInstance *second = *(sentinelRedisInstance **)b;
+
+    if (first->slave_priority == second->slave_priority)
+        return strcasecmp(first->runid,second->runid);
+    return first->slave_priority - second->slave_priority;
+}
+
+sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) {
+    /* Room for every slave of this master: at most all of them qualify. */
+    sentinelRedisInstance **candidates =
+        zmalloc(sizeof(candidates[0])*dictSize(master->slaves));
+    sentinelRedisInstance *best = NULL;
+    int count = 0;
+    dictIterator *it;
+    dictEntry *entry;
+    mstime_t max_master_down_time;
+
+    /* Longest acceptable time a slave link with the master was down. */
+    max_master_down_time = (mstime() - master->s_down_since_time) +
+                           (master->down_after_period * 10);
+
+    /* Collect the slaves that are reachable, that have fresh information,
+     * and that were not disconnected from the master for too long. */
+    it = dictGetIterator(master->slaves);
+    while ((entry = dictNext(it)) != NULL) {
+        sentinelRedisInstance *slave = dictGetVal(entry);
+        mstime_t oldest_valid = mstime()-SENTINEL_INFO_VALIDITY_TIME;
+
+        if ((slave->flags & (SRI_S_DOWN|SRI_O_DOWN|SRI_DISCONNECTED)) ||
+            slave->last_avail_time < oldest_valid ||
+            slave->info_refresh < oldest_valid ||
+            slave->master_link_down_time > max_master_down_time)
+        {
+            continue;
+        }
+        candidates[count++] = slave;
+    }
+    dictReleaseIterator(it);
+
+    /* The best candidate is the first one after sorting. */
+    if (count > 0) {
+        qsort(candidates,count,sizeof(candidates[0]),
+              compareSlavesForPromotion);
+        best = candidates[0];
+    }
+    zfree(candidates);
+    return best;
+}
+
+/* ---------------- Failover state machine implementation ------------------- */
+
+/* WAIT_START state: once ri->failover_start_time is reached, move to the
+ * SELECT_SLAVE state. Before that time the function does nothing. */
+void sentinelFailoverWaitStart(sentinelRedisInstance *ri) {
+    if (mstime() < ri->failover_start_time) return;
+
+    ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
+    ri->failover_state_change_time = mstime();
+    sentinelEvent(REDIS_WARNING,"+failover-state-select-slave",ri,"%@");
+}
+
+/* SELECT_SLAVE state: pick the slave to promote using
+ * sentinelSelectSlave().
+ *
+ * If a slave is found it is flagged SRI_PROMOTED, referenced by
+ * ri->promoted_slave, and the state machine moves to SEND_SLAVEOF_NOONE.
+ * Otherwise we go back to WAIT_START, to retry after
+ * SENTINEL_FAILOVER_FIXED_DELAY + SENTINEL_FAILOVER_MAX_RANDOM_DELAY
+ * milliseconds. */
+void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) {
+    sentinelRedisInstance *chosen = sentinelSelectSlave(ri);
+
+    if (chosen != NULL) {
+        sentinelEvent(REDIS_WARNING,"+selected-slave",chosen,"%@");
+        chosen->flags |= SRI_PROMOTED;
+        ri->promoted_slave = chosen;
+        ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE;
+        ri->failover_state_change_time = mstime();
+        sentinelEvent(REDIS_NOTICE,"+failover-state-send-slaveof-noone",
+            chosen, "%@");
+        return;
+    }
+
+    /* No suitable slave right now: schedule a new attempt. */
+    sentinelEvent(REDIS_WARNING,"-no-good-slave",ri,
+        "%@ #retrying in %d seconds",
+        (SENTINEL_FAILOVER_FIXED_DELAY+
+         SENTINEL_FAILOVER_MAX_RANDOM_DELAY)/1000);
+    ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;
+    ri->failover_start_time = mstime() + SENTINEL_FAILOVER_FIXED_DELAY +
+                              SENTINEL_FAILOVER_MAX_RANDOM_DELAY;
+}
+
+/* SEND_SLAVEOF_NOONE state: ask the promoted slave to turn into a master.
+ *
+ * Nothing is done while the promoted slave is disconnected or if the
+ * command can't be queued: the state does not change, so the next call
+ * will try again. On success we move to WAIT_PROMOTION. */
+void sentinelFailoverSendSlaveOfNoOne(sentinelRedisInstance *ri) {
+    sentinelRedisInstance *promoted = ri->promoted_slave;
+
+    if (promoted->flags & SRI_DISCONNECTED) return;
+
+    /* The reply is handled by a generic callback since we don't really
+     * care about it: whether the command worked is checked indirectly,
+     * observing if INFO reports a different role (master instead of
+     * slave). */
+    if (redisAsyncCommand(promoted->cc,
+            sentinelDiscardReplyCallback, NULL, "SLAVEOF NO ONE") != REDIS_OK)
+    {
+        return;
+    }
+    promoted->pending_commands++;
+    sentinelEvent(REDIS_NOTICE, "+failover-state-wait-promotion",
+        promoted,"%@");
+    ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION;
+    ri->failover_state_change_time = mstime();
+}
+
+/* WAIT_PROMOTION state. The promotion itself is detected indirectly,
+ * checking with INFO when the slave turns into a master, so here we only
+ * handle the timeout: if SENTINEL_PROMOTION_RETRY_PERIOD or more elapsed
+ * since the last state change, the promoted slave is discarded and we go
+ * back to the SELECT_SLAVE state. */
+void sentinelFailoverWaitPromotion(sentinelRedisInstance *ri) {
+    mstime_t elapsed = mstime() - ri->failover_state_change_time;
+
+    if (elapsed < SENTINEL_PROMOTION_RETRY_PERIOD) return;
+
+    sentinelEvent(REDIS_WARNING,"-promotion-timeout",ri->promoted_slave,
+        "%@");
+    sentinelEvent(REDIS_WARNING,"+failover-state-select-slave",ri,"%@");
+    ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
+    ri->failover_state_change_time = mstime();
+
+    /* Forget the slave that failed to get promoted in time. */
+    ri->promoted_slave->flags &= ~SRI_PROMOTED;
+    ri->promoted_slave = NULL;
+}
+
+/* Detect the end of the slaves reconfiguration and, when it is over, move
+ * 'master' to the SENTINEL_FAILOVER_STATE_UPDATE_CONFIG state.
+ *
+ * Nothing is done while there is no promoted slave, or while the promoted
+ * slave is flagged SRI_S_DOWN. Otherwise the failover is considered
+ * finished when either:
+ *
+ * 1) Every slave not in S_DOWN state is flagged SRI_PROMOTED or
+ *    SRI_RECONF_DONE.
+ * 2) More than master->failover_timeout milliseconds elapsed since the
+ *    last failover state change.
+ *
+ * In the timeout case, if this Sentinel is the leader, a best effort
+ * SLAVEOF is also sent to every connected slave that was not yet asked to
+ * replicate from the promoted slave. */
+void sentinelFailoverDetectEnd(sentinelRedisInstance *master) {
+    int not_reconfigured = 0, timeout = 0;
+    dictIterator *di;
+    dictEntry *de;
+    char master_port[32];
+    mstime_t elapsed = mstime() - master->failover_state_change_time;
+
+    /* We can't consider failover finished if the promoted slave is
+     * not reachable. */
+    if (master->promoted_slave == NULL ||
+        master->promoted_slave->flags & SRI_S_DOWN) return;
+
+    /* The failover terminates once all the reachable slaves are properly
+     * configured. */
+    di = dictGetIterator(master->slaves);
+    while((de = dictNext(di)) != NULL) {
+        sentinelRedisInstance *slave = dictGetVal(de);
+
+        if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
+        if (slave->flags & SRI_S_DOWN) continue;
+        not_reconfigured++;
+    }
+    dictReleaseIterator(di);
+
+    /* Force end of failover on timeout. */
+    if (elapsed > master->failover_timeout) {
+        not_reconfigured = 0;
+        timeout = 1;
+        sentinelEvent(REDIS_WARNING,"+failover-end-for-timeout",master,"%@");
+    }
+
+    if (not_reconfigured == 0) {
+        sentinelEvent(REDIS_WARNING,"+failover-end",master,"%@");
+        master->failover_state = SENTINEL_FAILOVER_STATE_UPDATE_CONFIG;
+        master->failover_state_change_time = mstime();
+    }
+
+    /* If I'm the leader it is a good idea to send a best effort SLAVEOF
+     * command to all the slaves still not reconfigured to replicate with
+     * the new master. */
+    if (!timeout || !(master->flags & SRI_I_AM_THE_LEADER)) return;
+
+    ll2string(master_port,sizeof(master_port),
+        master->promoted_slave->addr->port);
+
+    di = dictGetIterator(master->slaves);
+    while((de = dictNext(di)) != NULL) {
+        sentinelRedisInstance *slave = dictGetVal(de);
+        int retval;
+
+        /* The promoted slave is the new master: it must never be asked to
+         * replicate from itself. */
+        if (slave == master->promoted_slave ||
+            (slave->flags & SRI_PROMOTED)) continue;
+        if (slave->flags &
+            (SRI_RECONF_DONE|SRI_RECONF_SENT|SRI_DISCONNECTED)) continue;
+
+        retval = redisAsyncCommand(slave->cc,
+            sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
+                master->promoted_slave->addr->ip,
+                master_port);
+        if (retval == REDIS_OK) {
+            /* Account for the queued command as the other call sites
+             * using sentinelDiscardReplyCallback do.
+             * NOTE(review): presumably the callback decrements this
+             * counter when the reply arrives -- confirm. */
+            slave->pending_commands++;
+            sentinelEvent(REDIS_NOTICE,"+slave-reconf-sent-be",slave,"%@");
+            slave->flags |= SRI_RECONF_SENT;
+        }
+    }
+    dictReleaseIterator(di);
+}
+
+/* RECONF_SLAVES state: send SLAVEOF <new master address> to the slaves
+ * that still don't appear to have the configuration updated.
+ *
+ * New commands are sent only while the number of slaves in RECONF_SENT or
+ * RECONF_INPROG state is below master->parallel_syncs. A slave that stays
+ * in RECONF_SENT state for more than SENTINEL_SLAVE_RECONF_RETRY_PERIOD
+ * gets the flag cleared, so that the command is sent again.
+ *
+ * Before returning, sentinelFailoverDetectEnd() is called to check if the
+ * reconfiguration is over. */
+void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) {
+    dictIterator *it;
+    dictEntry *entry;
+    int busy = 0;
+
+    /* First pass: how many slaves are being reconfigured right now? */
+    it = dictGetIterator(master->slaves);
+    while ((entry = dictNext(it)) != NULL) {
+        sentinelRedisInstance *slave = dictGetVal(entry);
+
+        if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)) busy++;
+    }
+    dictReleaseIterator(it);
+
+    /* Second pass: use the free slots, if any. */
+    it = dictGetIterator(master->slaves);
+    while (busy < master->parallel_syncs) {
+        sentinelRedisInstance *slave;
+        char master_port[32];
+
+        if ((entry = dictNext(it)) == NULL) break;
+        slave = dictGetVal(entry);
+
+        /* The promoted slave and the slaves already configured need
+         * nothing from us. */
+        if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
+
+        /* The command was sent too long ago and the slave did not move
+         * to the next state: forget we sent it. */
+        if ((slave->flags & SRI_RECONF_SENT) &&
+            (mstime() - slave->slave_reconf_sent_time) >
+            SENTINEL_SLAVE_RECONF_RETRY_PERIOD)
+        {
+            sentinelEvent(REDIS_NOTICE,"-slave-reconf-sent-timeout",slave,"%@");
+            slave->flags &= ~SRI_RECONF_SENT;
+        }
+
+        /* Disconnected slaves can't receive the command; slaves already
+         * being reconfigured don't need it. */
+        if (slave->flags & (SRI_DISCONNECTED|SRI_RECONF_SENT|SRI_RECONF_INPROG))
+            continue;
+
+        /* Send SLAVEOF <new master>. */
+        ll2string(master_port,sizeof(master_port),
+            master->promoted_slave->addr->port);
+        if (redisAsyncCommand(slave->cc,
+                sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
+                master->promoted_slave->addr->ip,
+                master_port) != REDIS_OK)
+        {
+            continue;
+        }
+        slave->flags |= SRI_RECONF_SENT;
+        slave->pending_commands++;
+        slave->slave_reconf_sent_time = mstime();
+        sentinelEvent(REDIS_NOTICE,"+slave-reconf-sent",slave,"%@");
+        busy++;
+    }
+    dictReleaseIterator(it);
+    sentinelFailoverDetectEnd(master);
+}
+
+/* This function is called when the master is in
+ * SENTINEL_FAILOVER_STATE_UPDATE_CONFIG state. In this state we need
+ * to remove it from the masters table and add the promoted slave instead,
+ * under the same name.
+ *
+ * If there is no promoted slave, the master is removed and re-added with
+ * the same address to trigger a complete state refresh.
+ *
+ * The new instance inherits name, quorum, parallel_syncs, the
+ * SRI_CAN_FAILOVER flag, down_after_period and failover_timeout from the
+ * old master. */
+void sentinelFailoverSwitchToPromotedSlave(sentinelRedisInstance *master) {
+    sentinelRedisInstance *new, *ref = master->promoted_slave ?
+                                       master->promoted_slave : master;
+    /* Quorum and parallel syncs are settings of the monitored master, like
+     * down_after_period and failover_timeout below, so they are read from
+     * 'master' and not from the promoted slave instance. */
+    int quorum = master->quorum, parallel_syncs = master->parallel_syncs;
+    /* Everything we need from 'master' and 'ref' is copied before the
+     * dictDelete() call below.
+     * NOTE(review): presumably deleting the entry releases the old master
+     * together with its slaves ('ref' included) -- confirm against the
+     * dictionary type used for sentinel.masters. */
+    char *name = sdsnew(master->name);
+    char *ip = sdsnew(ref->addr->ip), *oldip = sdsnew(master->addr->ip);
+    int port = ref->addr->port, oldport = master->addr->port;
+    int retval, oldflags = master->flags;
+    mstime_t old_down_after_period = master->down_after_period;
+    mstime_t old_failover_timeout = master->failover_timeout;
+
+    retval = dictDelete(sentinel.masters,master->name);
+    redisAssert(retval == DICT_OK);
+    new = createSentinelRedisInstance(name,SRI_MASTER,ip,port,quorum,NULL);
+    redisAssert(new != NULL);
+    new->parallel_syncs = parallel_syncs;
+    new->flags |= (oldflags & SRI_CAN_FAILOVER);
+    new->down_after_period = old_down_after_period;
+    new->failover_timeout = old_failover_timeout;
+    /* TODO: ... set the scripts as well. */
+    sentinelEvent(REDIS_WARNING,"+switch-master",new,"%s %s %d %s %d",
+        name, oldip, oldport, ip, port);
+    sdsfree(name);
+    sdsfree(ip);
+    sdsfree(oldip);
+}
+
+/* Run one step of the failover state machine for the master 'ri': the
+ * handler matching ri->failover_state is called once. Nothing is done
+ * when no failover is in progress, or when the current state has no
+ * handler in this function. */
+void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
+    void (*step)(sentinelRedisInstance *) = NULL;
+
+    redisAssert(ri->flags & SRI_MASTER);
+
+    if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;
+
+    switch(ri->failover_state) {
+    case SENTINEL_FAILOVER_STATE_WAIT_START:
+        step = sentinelFailoverWaitStart;
+        break;
+    case SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
+        step = sentinelFailoverSelectSlave;
+        break;
+    case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:
+        step = sentinelFailoverSendSlaveOfNoOne;
+        break;
+    case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
+        step = sentinelFailoverWaitPromotion;
+        break;
+    case SENTINEL_FAILOVER_STATE_RECONF_SLAVES:
+        step = sentinelFailoverReconfNextSlave;
+        break;
+    case SENTINEL_FAILOVER_STATE_DETECT_END:
+        step = sentinelFailoverDetectEnd;
+        break;
+    default:
+        break;
+    }
+    if (step != NULL) step(ri);
+}
+
+/* The following is called only for master instances and will abort the
+ * failover process if:
+ *
+ * 1) The failover is in progress.
+ * 2) We already promoted a slave.
+ * 3) The promoted slave is in extended SDOWN condition, that is, it has
+ *    been in S_DOWN state for at least down_after_period multiplied by
+ *    SENTINEL_EXTENDED_SDOWN_MULTIPLIER.
+ *
+ * On abort the reconfiguration flags of all the slaves are cleared, the
+ * failover flags and state of the master are reset, and the promoted
+ * slave is forgotten. If we are the leader we also try to turn the
+ * connected slaves back into slaves of the original master.
+ */
+void sentinelAbortFailoverIfNeeded(sentinelRedisInstance *ri) {
+    dictIterator *di;
+    dictEntry *de;
+    char master_port[32];
+
+    /* Failover is in progress? Do we have a promoted slave? */
+    if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS) || !ri->promoted_slave) return;
+
+    /* Is the promoted slave into an extended SDOWN state? */
+    if (!(ri->promoted_slave->flags & SRI_S_DOWN) ||
+        (mstime() - ri->promoted_slave->s_down_since_time) <
+        (ri->down_after_period * SENTINEL_EXTENDED_SDOWN_MULTIPLIER)) return;
+
+    sentinelEvent(REDIS_WARNING,"-failover-abort-x-sdown",ri->promoted_slave,"%@");
+
+    /* Clear failover related flags from slaves.
+     * Also if we are the leader make sure to send SLAVEOF commands to all the
+     * already reconfigured slaves in order to turn them back into slaves of
+     * the original master. */
+    ll2string(master_port,sizeof(master_port),ri->addr->port);
+
+    di = dictGetIterator(ri->slaves);
+    while((de = dictNext(di)) != NULL) {
+        sentinelRedisInstance *slave = dictGetVal(de);
+
+        /* Like every other place sending commands to an instance, skip the
+         * ones flagged as disconnected: there is no usable link to queue
+         * the command on. The flags are cleared anyway. */
+        if ((ri->flags & SRI_I_AM_THE_LEADER) &&
+            !(slave->flags & SRI_DISCONNECTED))
+        {
+            int retval;
+
+            retval = redisAsyncCommand(slave->cc,
+                sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
+                    ri->addr->ip,
+                    master_port);
+            if (retval == REDIS_OK) {
+                /* Account for the queued command as the other call sites
+                 * using sentinelDiscardReplyCallback do.
+                 * NOTE(review): presumably the callback decrements this
+                 * counter when the reply arrives -- confirm. */
+                slave->pending_commands++;
+                sentinelEvent(REDIS_NOTICE,"-slave-reconf-undo",slave,"%@");
+            }
+        }
+        slave->flags &= ~(SRI_RECONF_SENT|SRI_RECONF_INPROG|SRI_RECONF_DONE);
+    }
+    dictReleaseIterator(di);
+
+    ri->flags &= ~(SRI_FAILOVER_IN_PROGRESS|SRI_I_AM_THE_LEADER);
+    ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
+    ri->failover_state_change_time = mstime();
+    ri->promoted_slave->flags &= ~SRI_PROMOTED;
+    ri->promoted_slave = NULL;
+}
+
+/* ======================== SENTINEL timer handler ==========================
+ * This is the "main" of our Sentinel, Sentinel being completely non-blocking
+ * by design. The function is called every second.
+ * -------------------------------------------------------------------------- */
+
+/* Perform scheduled operations for the specified Redis instance.
+ *
+ * The work is split in two halves. The monitoring half (reconnection,
+ * PING, and for masters asking the other Sentinels about the master
+ * state) is always performed. The acting half (down state checks and,
+ * for masters, everything related to the failover) is skipped while the
+ * Sentinel is in TILT mode. */
+void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
+    /* ========== MONITORING HALF ============ */
+    sentinelReconnectInstance(ri);
+    sentinelPingInstance(ri);
+    if (ri->flags & SRI_MASTER) sentinelAskMasterStateToOtherSentinels(ri);
+
+    /* ============== ACTING HALF ============= */
+    /* TILT happens when we find something odd with the time, like a
+     * sudden change in the clock. We start acting again only after
+     * SENTINEL_TILT_PERIOD elapsed since TILT was entered. */
+    if (sentinel.tilt) {
+        if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return;
+        sentinel.tilt = 0;
+        sentinelEvent(REDIS_WARNING,"-tilt",NULL,"#tilt mode exited");
+    }
+
+    /* Every kind of instance. */
+    sentinelCheckSubjectivelyDown(ri);
+
+    /* What follows is only for masters. */
+    if (!(ri->flags & SRI_MASTER)) return;
+    sentinelCheckObjectivelyDown(ri);
+    sentinelStartFailover(ri);
+    sentinelFailoverStateMachine(ri);
+    sentinelAbortFailoverIfNeeded(ri);
+}
+
+/* Perform scheduled operations for all the instances in the dictionary.
+ * For every master the function recursively handles the dictionaries of
+ * its slaves and of its sentinels as well.
+ *
+ * A master found in SENTINEL_FAILOVER_STATE_UPDATE_CONFIG state is switched
+ * to its promoted slave only after the iteration is over and the iterator
+ * is released: the switch deletes the master from sentinel.masters, which
+ * can be the very dictionary we are iterating. If more than one master is
+ * in that state, only the last one seen is switched by this call. */
+void sentinelHandleDictOfRedisInstances(dict *instances) {
+    dictIterator *di;
+    dictEntry *de;
+    sentinelRedisInstance *switch_to_promoted = NULL;
+
+    /* There are a number of things we need to perform against every master. */
+    di = dictGetIterator(instances);
+    while((de = dictNext(di)) != NULL) {
+        sentinelRedisInstance *ri = dictGetVal(de);
+
+        sentinelHandleRedisInstance(ri);
+        if (ri->flags & SRI_MASTER) {
+            sentinelHandleDictOfRedisInstances(ri->slaves);
+            sentinelHandleDictOfRedisInstances(ri->sentinels);
+            if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) {
+                switch_to_promoted = ri;
+            }
+        }
+    }
+    /* Release the iterator before the dictionary gets modified. */
+    dictReleaseIterator(di);
+    if (switch_to_promoted)
+        sentinelFailoverSwitchToPromotedSlave(switch_to_promoted);
+}
+
+/* This function checks if we need to enter the TILT mode.
+ *
+ * The TILT mode is entered if we detect that, between two invocations of
+ * the timer interrupt, a negative amount of time or too much time has
+ * passed. Note that we expect more or less just 100 milliseconds to pass
+ * if everything is fine. However we'll see a negative number, or a
+ * difference bigger than SENTINEL_TILT_TRIGGER milliseconds, if one of
+ * the following conditions happens:
+ *
+ * 1) The Sentinel process is blocked for some time, for every kind of
+ *    random reason: the load is huge, the computer was frozen for some
+ *    time in I/O or alike, the process was stopped by a signal.
+ *    Everything.
+ * 2) The system clock was altered significantly.
+ *
+ * Under both these conditions we would see everything as timed out and
+ * failing without good reasons. Instead we enter the TILT mode and wait
+ * for SENTINEL_TILT_PERIOD to elapse before starting to act again.
+ *
+ * During TILT time we still collect information, we just do not act. */
+void sentinelCheckTiltCondition(void) {
+    mstime_t delta = mstime() - sentinel.previous_time;
+    int time_is_odd = (delta < 0 || delta > SENTINEL_TILT_TRIGGER);
+
+    if (time_is_odd) {
+        sentinel.tilt = 1;
+        sentinel.tilt_start_time = mstime();
+        sentinelEvent(REDIS_WARNING,"+tilt",NULL,"#tilt mode entered");
+    }
+
+    /* Remember when we ran, for the check done by the next call. */
+    sentinel.previous_time = mstime();
+}
+
+/* Sentinel timer entry point: first check if the TILT mode must be
+ * entered, then perform the scheduled operations for all the monitored
+ * masters (their slaves and sentinels are handled recursively). */
+void sentinelTimer(void) {
+ sentinelCheckTiltCondition();
+ sentinelHandleDictOfRedisInstances(sentinel.masters);
+}
+