]> git.saurik.com Git - redis.git/blame_incremental - src/sentinel.c
Sentinel: don't start a failover as leader if there is no good slave.
[redis.git] / src / sentinel.c
... / ...
CommitLineData
1/* Redis Sentinel implementation
2 * -----------------------------
3 *
4 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * * Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * * Neither the name of Redis nor the names of its contributors may be used
16 * to endorse or promote products derived from this software without
17 * specific prior written permission.
18 *
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 * POSSIBILITY OF SUCH DAMAGE.
30 */
31
32#include "redis.h"
33#include "hiredis.h"
34#include "async.h"
35
36#include <ctype.h>
37#include <arpa/inet.h>
38#include <sys/socket.h>
39
40extern char **environ;
41
42#define REDIS_SENTINEL_PORT 26379
43
44/* ======================== Sentinel global state =========================== */
45
46typedef long long mstime_t; /* millisecond time type. */
47
/* An ip:port pair identifying a network endpoint. */
typedef struct sentinelAddr {
    char *ip;   /* Textual IP address (an sds string, see createSentinelAddr). */
    int port;   /* TCP port. */
} sentinelAddr;
53
/* Bit flags stored in sentinelRedisInstance.flags. */

/* Role of the instance. */
#define SRI_MASTER   (1<<0)
#define SRI_SLAVE    (1<<1)
#define SRI_SENTINEL (1<<2)

/* Link and availability state. */
#define SRI_DISCONNECTED (1<<3)
/* Subjectively down (no quorum). */
#define SRI_S_DOWN       (1<<4)
/* Objectively down (quorum reached). */
#define SRI_O_DOWN       (1<<5)
/* A Sentinel with this flag set thinks that its master is down. */
#define SRI_MASTER_DOWN  (1<<6)

/* On an SRI_MASTER instance: we are allowed to perform the failover for
 * this master. On an SRI_SENTINEL instance: that sentinel is allowed to
 * perform the failover on its master. */
#define SRI_CAN_FAILOVER (1<<7)

/* Failover bookkeeping. */
/* Failover is in progress for this master. */
#define SRI_FAILOVER_IN_PROGRESS (1<<8)
/* We are the leader for this master. */
#define SRI_I_AM_THE_LEADER      (1<<9)
/* Slave selected for promotion. */
#define SRI_PROMOTED             (1<<10)
/* SLAVEOF <newmaster> sent. */
#define SRI_RECONF_SENT          (1<<11)
/* Slave synchronization in progress. */
#define SRI_RECONF_INPROG        (1<<12)
/* Slave synchronized with new master. */
#define SRI_RECONF_DONE          (1<<13)
75
/* Timing and tuning constants.
 * NOTE(review): the time-like values are presumably milliseconds, since
 * SENTINEL_DOWN_AFTER_PERIOD is stored into an mstime_t field -- confirm
 * the unit at each use site. */
#define SENTINEL_INFO_PERIOD                10000
#define SENTINEL_PING_PERIOD                1000
#define SENTINEL_ASK_PERIOD                 1000
#define SENTINEL_PUBLISH_PERIOD             5000
#define SENTINEL_DOWN_AFTER_PERIOD          30000
/* Name of the Pub/Sub channel used for hello messages. */
#define SENTINEL_HELLO_CHANNEL              "__sentinel__:hello"
#define SENTINEL_TILT_TRIGGER               2000
#define SENTINEL_TILT_PERIOD                (SENTINEL_PING_PERIOD*30)
#define SENTINEL_DEFAULT_SLAVE_PRIORITY     100
#define SENTINEL_PROMOTION_RETRY_PERIOD     30000
#define SENTINEL_SLAVE_RECONF_RETRY_PERIOD  10000
#define SENTINEL_DEFAULT_PARALLEL_SYNCS     1
#define SENTINEL_MIN_LINK_RECONNECT_PERIOD  15000
#define SENTINEL_DEFAULT_FAILOVER_TIMEOUT   (60*15*1000)
#define SENTINEL_MAX_PENDING_COMMANDS       100
#define SENTINEL_EXTENDED_SDOWN_MULTIPLIER  10

/* For how many milliseconds a received piece of information stays valid.
 * This applies for instance to SENTINEL IS-MASTER-DOWN-BY-ADDR replies. */
#define SENTINEL_INFO_VALIDITY_TIME         5000
#define SENTINEL_FAILOVER_FIXED_DELAY       5000
#define SENTINEL_FAILOVER_MAX_RANDOM_DELAY  10000
98
/* States of the failover state machine (sentinelRedisInstance.failover_state). */
/* No failover in progress. */
#define SENTINEL_FAILOVER_STATE_NONE                0
/* Wait for failover_start_time. */
#define SENTINEL_FAILOVER_STATE_WAIT_START          1
/* Select slave to promote. */
#define SENTINEL_FAILOVER_STATE_SELECT_SLAVE        2
/* Slave -> Master. */
#define SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE  3
/* Wait slave to change role. */
#define SENTINEL_FAILOVER_STATE_WAIT_PROMOTION      4
/* SLAVEOF newmaster. */
#define SENTINEL_FAILOVER_STATE_RECONF_SLAVES       5
/* Wait replication. */
#define SENTINEL_FAILOVER_STATE_WAIT_NEXT_SLAVE     6
/* Run user script. */
#define SENTINEL_FAILOVER_STATE_ALERT_CLIENTS       7
/* Wait script exec. */
#define SENTINEL_FAILOVER_STATE_WAIT_ALERT_SCRIPT   8
/* Check for failover end. */
#define SENTINEL_FAILOVER_STATE_DETECT_END          9
/* Monitor promoted slave. */
#define SENTINEL_FAILOVER_STATE_UPDATE_CONFIG       10

/* Values for sentinelRedisInstance.slave_master_link_status. */
#define SENTINEL_MASTER_LINK_STATUS_UP   0
#define SENTINEL_MASTER_LINK_STATUS_DOWN 1

/* Generic flags that can be used with different functions. */
#define SENTINEL_NO_FLAGS       0
#define SENTINEL_GENERATE_EVENT 1
118
119typedef struct sentinelRedisInstance {
120 int flags; /* See SRI_... defines */
121 char *name; /* Master name from the point of view of this sentinel. */
122 char *runid; /* run ID of this instance. */
123 sentinelAddr *addr; /* Master host. */
124 redisAsyncContext *cc; /* Hiredis context for commands. */
125 redisAsyncContext *pc; /* Hiredis context for Pub / Sub. */
126 int pending_commands; /* Number of commands sent waiting for a reply. */
127 mstime_t cc_conn_time; /* cc connection time. */
128 mstime_t pc_conn_time; /* pc connection time. */
129 mstime_t pc_last_activity; /* Last time we received any message. */
130 mstime_t last_avail_time; /* Last time the instance replied to ping with
131 a reply we consider valid. */
132 mstime_t last_pong_time; /* Last time the instance replied to ping,
133 whatever the reply was. That's used to check
134 if the link is idle and must be reconnected. */
135 mstime_t last_pub_time; /* Last time we sent hello via Pub/Sub. */
136 mstime_t last_hello_time; /* Only used if SRI_SENTINEL is set. Last time
137 we received an hello from this Sentinel
138 via Pub/Sub. */
139 mstime_t last_master_down_reply_time; /* Time of last reply to
140 SENTINEL is-master-down command. */
141 mstime_t s_down_since_time; /* Subjectively down since time. */
142 mstime_t o_down_since_time; /* Objectively down since time. */
143 mstime_t down_after_period; /* Consider it down after that period. */
144 mstime_t info_refresh; /* Time at which we received INFO output from it. */
145
146 /* Master specific. */
147 dict *sentinels; /* Other sentinels monitoring the same master. */
148 dict *slaves; /* Slaves for this master instance. */
149 int quorum; /* Number of sentinels that need to agree on failure. */
150 int parallel_syncs; /* How many slaves to reconfigure at same time. */
151
152 /* Slave specific. */
153 mstime_t master_link_down_time; /* Slave replication link down time. */
154 int slave_priority; /* Slave priority according to its INFO output. */
155 mstime_t slave_reconf_sent_time; /* Time at which we sent SLAVE OF <new> */
156 struct sentinelRedisInstance *master; /* Master instance if SRI_SLAVE is set. */
157 char *slave_master_host; /* Master host as reported by INFO */
158 int slave_master_port; /* Master port as reported by INFO */
159 int slave_master_link_status; /* Master link status as reported by INFO */
160 /* Failover */
161 char *leader; /* If this is a master instance, this is the runid of
162 the Sentinel that should perform the failover. If
163 this is a Sentinel, this is the runid of the Sentinel
164 that this other Sentinel is voting as leader.
165 This field is valid only if SRI_MASTER_DOWN is
166 set on the Sentinel instance. */
167 int failover_state; /* See SENTINEL_FAILOVER_STATE_* defines. */
168 mstime_t failover_state_change_time;
169 mstime_t failover_start_time; /* When to start to failover if leader. */
170 mstime_t failover_timeout; /* Max time to refresh failover state. */
171 struct sentinelRedisInstance *promoted_slave; /* Promoted slave instance. */
172 /* Scripts executed to notify admin or reconfigure clients: when they
173 * are set to NULL no script is executed. */
174 char *notification_script;
175 char *client_reconfig_script;
176} sentinelRedisInstance;
177
178/* Main state. */
179struct sentinelState {
180 dict *masters; /* Dictionary of master sentinelRedisInstances.
181 Key is the instance name, value is the
182 sentinelRedisInstance structure pointer. */
183 int tilt; /* Are we in TILT mode? */
184 mstime_t tilt_start_time; /* When TITL started. */
185 mstime_t previous_time; /* Time last time we ran the time handler. */
186} sentinel;
187
188/* ======================= hiredis ae.c adapters =============================
189 * Note: this implementation is taken from hiredis/adapters/ae.h, however
190 * we have our modified copy for Sentinel in order to use our allocator
191 * and to have full control over how the adapter works. */
192
193typedef struct redisAeEvents {
194 redisAsyncContext *context;
195 aeEventLoop *loop;
196 int fd;
197 int reading, writing;
198} redisAeEvents;
199
200static void redisAeReadEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
201 ((void)el); ((void)fd); ((void)mask);
202
203 redisAeEvents *e = (redisAeEvents*)privdata;
204 redisAsyncHandleRead(e->context);
205}
206
207static void redisAeWriteEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
208 ((void)el); ((void)fd); ((void)mask);
209
210 redisAeEvents *e = (redisAeEvents*)privdata;
211 redisAsyncHandleWrite(e->context);
212}
213
214static void redisAeAddRead(void *privdata) {
215 redisAeEvents *e = (redisAeEvents*)privdata;
216 aeEventLoop *loop = e->loop;
217 if (!e->reading) {
218 e->reading = 1;
219 aeCreateFileEvent(loop,e->fd,AE_READABLE,redisAeReadEvent,e);
220 }
221}
222
223static void redisAeDelRead(void *privdata) {
224 redisAeEvents *e = (redisAeEvents*)privdata;
225 aeEventLoop *loop = e->loop;
226 if (e->reading) {
227 e->reading = 0;
228 aeDeleteFileEvent(loop,e->fd,AE_READABLE);
229 }
230}
231
232static void redisAeAddWrite(void *privdata) {
233 redisAeEvents *e = (redisAeEvents*)privdata;
234 aeEventLoop *loop = e->loop;
235 if (!e->writing) {
236 e->writing = 1;
237 aeCreateFileEvent(loop,e->fd,AE_WRITABLE,redisAeWriteEvent,e);
238 }
239}
240
241static void redisAeDelWrite(void *privdata) {
242 redisAeEvents *e = (redisAeEvents*)privdata;
243 aeEventLoop *loop = e->loop;
244 if (e->writing) {
245 e->writing = 0;
246 aeDeleteFileEvent(loop,e->fd,AE_WRITABLE);
247 }
248}
249
/* Tear down the adapter: unregister both file events (when installed) and
 * free the adapter memory. */
static void redisAeCleanup(void *privdata) {
    redisAeDelRead(privdata);
    redisAeDelWrite(privdata);
    zfree(privdata);
}
256
257static int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac) {
258 redisContext *c = &(ac->c);
259 redisAeEvents *e;
260
261 /* Nothing should be attached when something is already attached */
262 if (ac->ev.data != NULL)
263 return REDIS_ERR;
264
265 /* Create container for context and r/w events */
266 e = (redisAeEvents*)zmalloc(sizeof(*e));
267 e->context = ac;
268 e->loop = loop;
269 e->fd = c->fd;
270 e->reading = e->writing = 0;
271
272 /* Register functions to start/stop listening for events */
273 ac->ev.addRead = redisAeAddRead;
274 ac->ev.delRead = redisAeDelRead;
275 ac->ev.addWrite = redisAeAddWrite;
276 ac->ev.delWrite = redisAeDelWrite;
277 ac->ev.cleanup = redisAeCleanup;
278 ac->ev.data = e;
279
280 return REDIS_OK;
281}
282
283/* ============================= Prototypes ================================= */
284
285void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status);
286void sentinelDisconnectCallback(const redisAsyncContext *c, int status);
287void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata);
288sentinelRedisInstance *sentinelGetMasterByName(char *name);
289char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master);
290char *sentinelGetObjectiveLeader(sentinelRedisInstance *master);
291int yesnotoi(char *s);
292void sentinelDisconnectInstanceFromContext(const redisAsyncContext *c);
293void sentinelKillLink(sentinelRedisInstance *ri, redisAsyncContext *c);
294const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri);
295void sentinelAbortFailover(sentinelRedisInstance *ri);
296void sentinelEvent(int level, char *type, sentinelRedisInstance *ri, const char *fmt, ...);
297sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master);
298
299/* ========================= Dictionary types =============================== */
300
301unsigned int dictSdsHash(const void *key);
302int dictSdsKeyCompare(void *privdata, const void *key1, const void *key2);
303void releaseSentinelRedisInstance(sentinelRedisInstance *ri);
304
/* Value destructor for dictionaries of instances: releases the stored
 * sentinelRedisInstance. 'privdata' is not used. */
void dictInstancesValDestructor (void *privdata, void *obj) {
    (void)privdata;
    releaseSentinelRedisInstance(obj);
}
308
309/* Instance name (sds) -> instance (sentinelRedisInstance pointer)
310 *
311 * also used for: sentinelRedisInstance->sentinels dictionary that maps
312 * sentinels ip:port to last seen time in Pub/Sub hello message. */
313dictType instancesDictType = {
314 dictSdsHash, /* hash function */
315 NULL, /* key dup */
316 NULL, /* val dup */
317 dictSdsKeyCompare, /* key compare */
318 NULL, /* key destructor */
319 dictInstancesValDestructor /* val destructor */
320};
321
322/* Instance runid (sds) -> votes (long casted to void*)
323 *
324 * This is useful into sentinelGetObjectiveLeader() function in order to
325 * count the votes and understand who is the leader. */
326dictType leaderVotesDictType = {
327 dictSdsHash, /* hash function */
328 NULL, /* key dup */
329 NULL, /* val dup */
330 dictSdsKeyCompare, /* key compare */
331 NULL, /* key destructor */
332 NULL /* val destructor */
333};
334
335/* =========================== Initialization =============================== */
336
337void sentinelCommand(redisClient *c);
338
339struct redisCommand sentinelcmds[] = {
340 {"ping",pingCommand,1,"",0,NULL,0,0,0,0,0},
341 {"sentinel",sentinelCommand,-2,"",0,NULL,0,0,0,0,0},
342 {"subscribe",subscribeCommand,-2,"",0,NULL,0,0,0,0,0},
343 {"unsubscribe",unsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
344 {"psubscribe",psubscribeCommand,-2,"",0,NULL,0,0,0,0,0},
345 {"punsubscribe",punsubscribeCommand,-1,"",0,NULL,0,0,0,0,0}
346};
347
348/* This function overwrites a few normal Redis config default with Sentinel
349 * specific defaults. */
350void initSentinelConfig(void) {
351 server.port = REDIS_SENTINEL_PORT;
352}
353
354/* Perform the Sentinel mode initialization. */
355void initSentinel(void) {
356 int j;
357
358 /* Remove usual Redis commands from the command table, then just add
359 * the SENTINEL command. */
360 dictEmpty(server.commands);
361 for (j = 0; j < sizeof(sentinelcmds)/sizeof(sentinelcmds[0]); j++) {
362 int retval;
363 struct redisCommand *cmd = sentinelcmds+j;
364
365 retval = dictAdd(server.commands, sdsnew(cmd->name), cmd);
366 redisAssert(retval == DICT_OK);
367 }
368
369 /* Initialize various data structures. */
370 sentinel.masters = dictCreate(&instancesDictType,NULL);
371 sentinel.tilt = 0;
372 sentinel.tilt_start_time = mstime();
373 sentinel.previous_time = mstime();
374}
375
376/* ============================== sentinelAddr ============================== */
377
378/* Create a sentinelAddr object and return it on success.
379 * On error NULL is returned and errno is set to:
380 * ENOENT: Can't resolve the hostname.
381 * EINVAL: Invalid port number.
382 */
383sentinelAddr *createSentinelAddr(char *hostname, int port) {
384 char buf[32];
385 sentinelAddr *sa;
386
387 if (port <= 0 || port > 65535) {
388 errno = EINVAL;
389 return NULL;
390 }
391 if (anetResolve(NULL,hostname,buf) == ANET_ERR) {
392 errno = ENOENT;
393 return NULL;
394 }
395 sa = zmalloc(sizeof(*sa));
396 sa->ip = sdsnew(buf);
397 sa->port = port;
398 return sa;
399}
400
401/* Free a Sentinel address. Can't fail. */
402void releaseSentinelAddr(sentinelAddr *sa) {
403 sdsfree(sa->ip);
404 zfree(sa);
405}
406
407/* =========================== Events notification ========================== */
408
409void sentinelCallNotificationScript(char *scriptpath, char *type, char *msg) {
410 pid_t pid = fork();
411
412 if (pid == -1) {
413 /* Parent on error. */
414 sentinelEvent(REDIS_WARNING,"-notification-script-error",NULL,
415 "#can't fork: %s",strerror(errno));
416 return;
417 } else if (pid == 0) {
418 /* Child */
419 char *argv[4];
420
421 argv[0] = scriptpath;
422 argv[1] = type;
423 argv[2] = msg;
424 argv[3] = NULL;
425 execve(scriptpath,argv,environ);
426 /* If we are here an error occurred. */
427 sentinelEvent(REDIS_WARNING,"-notification-script-error",NULL,
428 "#execve(2): %s",strerror(errno));
429 _exit(1);
430 } else {
431 sentinelEvent(REDIS_DEBUG,"+child",NULL,"%ld",(long)pid);
432 }
433}
434
435/* Send an event to log, pub/sub, user notification script.
436 *
437 * 'level' is the log level for logging. Only REDIS_WARNING events will trigger
438 * the execution of the user notification script.
439 *
440 * 'type' is the message type, also used as a pub/sub channel name.
441 *
442 * 'ri', is the redis instance target of this event if applicable, and is
443 * used to obtain the path of the notification script to execute.
444 *
445 * The remaining arguments are printf-alike.
446 * If the format specifier starts with the two characters "%@" then ri is
447 * not NULL, and the message is prefixed with an instance identifier in the
448 * following format:
449 *
450 * <instance type> <instance name> <ip> <port>
451 *
452 * If the instance type is not master, than the additional string is
453 * added to specify the originating master:
454 *
455 * @ <master name> <master ip> <master port>
456 *
457 * Any other specifier after "%@" is processed by printf itself.
458 */
459void sentinelEvent(int level, char *type, sentinelRedisInstance *ri,
460 const char *fmt, ...) {
461 va_list ap;
462 char msg[REDIS_MAX_LOGMSG_LEN];
463 robj *channel, *payload;
464
465 /* Handle %@ */
466 if (fmt[0] == '%' && fmt[1] == '@') {
467 sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
468 NULL : ri->master;
469
470 if (master) {
471 snprintf(msg, sizeof(msg), "%s %s %s %d @ %s %s %d",
472 sentinelRedisInstanceTypeStr(ri),
473 ri->name, ri->addr->ip, ri->addr->port,
474 master->name, master->addr->ip, master->addr->port);
475 } else {
476 snprintf(msg, sizeof(msg), "%s %s %s %d",
477 sentinelRedisInstanceTypeStr(ri),
478 ri->name, ri->addr->ip, ri->addr->port);
479 }
480 fmt += 2;
481 } else {
482 msg[0] = '\0';
483 }
484
485 /* Use vsprintf for the rest of the formatting if any. */
486 if (fmt[0] != '\0') {
487 va_start(ap, fmt);
488 vsnprintf(msg+strlen(msg), sizeof(msg)-strlen(msg), fmt, ap);
489 va_end(ap);
490 }
491
492 /* Log the message if the log level allows it to be logged. */
493 if (level >= server.verbosity)
494 redisLog(level,"%s %s",type,msg);
495
496 /* Publish the message via Pub/Sub if it's not a debugging one. */
497 if (level != REDIS_DEBUG) {
498 channel = createStringObject(type,strlen(type));
499 payload = createStringObject(msg,strlen(msg));
500 pubsubPublishMessage(channel,payload);
501 decrRefCount(channel);
502 decrRefCount(payload);
503 }
504
505 /* Call the notification script if applicable. */
506 if (level == REDIS_WARNING && ri != NULL) {
507 sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
508 ri : ri->master;
509 if (master->notification_script) {
510 sentinelCallNotificationScript(master->notification_script,
511 type,msg);
512 }
513 }
514}
515
516/* ========================== sentinelRedisInstance ========================= */
517
518/* Create a redis instance, the following fields must be populated by the
519 * caller if needed:
520 * runid: set to NULL but will be populated once INFO output is received.
521 * info_refresh: is set to 0 to mean that we never received INFO so far.
522 *
523 * If SRI_MASTER is set into initial flags the instance is added to
524 * sentinel.masters table.
525 *
526 * if SRI_SLAVE or SRI_SENTINEL is set then 'master' must be not NULL and the
527 * instance is added into master->slaves or master->sentinels table.
528 *
529 * If the instance is a slave or sentinel, the name parameter is ignored and
530 * is created automatically as hostname:port.
531 *
532 * The function fails if hostname can't be resolved or port is out of range.
533 * When this happens NULL is returned and errno is set accordingly to the
534 * createSentinelAddr() function.
535 *
536 * The function may also fail and return NULL with errno set to EBUSY if
537 * a master or slave with the same name already exists. */
538sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char *hostname, int port, int quorum, sentinelRedisInstance *master) {
539 sentinelRedisInstance *ri;
540 sentinelAddr *addr;
541 dict *table;
542 char slavename[128], *sdsname;
543
544 redisAssert(flags & (SRI_MASTER|SRI_SLAVE|SRI_SENTINEL));
545 redisAssert((flags & SRI_MASTER) || master != NULL);
546
547 /* Check address validity. */
548 addr = createSentinelAddr(hostname,port);
549 if (addr == NULL) return NULL;
550
551 /* For slaves and sentinel we use ip:port as name. */
552 if (flags & (SRI_SLAVE|SRI_SENTINEL)) {
553 snprintf(slavename,sizeof(slavename),"%s:%d",hostname,port);
554 name = slavename;
555 }
556
557 /* Make sure the entry is not duplicated. This may happen when the same
558 * name for a master is used multiple times inside the configuration or
559 * if we try to add multiple times a slave or sentinel with same ip/port
560 * to a master. */
561 if (flags & SRI_MASTER) table = sentinel.masters;
562 else if (flags & SRI_SLAVE) table = master->slaves;
563 else if (flags & SRI_SENTINEL) table = master->sentinels;
564 sdsname = sdsnew(name);
565 if (dictFind(table,sdsname)) {
566 sdsfree(sdsname);
567 errno = EBUSY;
568 return NULL;
569 }
570
571 /* Create the instance object. */
572 ri = zmalloc(sizeof(*ri));
573 /* Note that all the instances are started in the disconnected state,
574 * the event loop will take care of connecting them. */
575 ri->flags = flags | SRI_DISCONNECTED;
576 ri->name = sdsname;
577 ri->runid = NULL;
578 ri->addr = addr;
579 ri->cc = NULL;
580 ri->pc = NULL;
581 ri->pending_commands = 0;
582 ri->cc_conn_time = 0;
583 ri->pc_conn_time = 0;
584 ri->pc_last_activity = 0;
585 ri->last_avail_time = mstime();
586 ri->last_pong_time = mstime();
587 ri->last_pub_time = mstime();
588 ri->last_hello_time = mstime();
589 ri->last_master_down_reply_time = mstime();
590 ri->s_down_since_time = 0;
591 ri->o_down_since_time = 0;
592 ri->down_after_period = master ? master->down_after_period :
593 SENTINEL_DOWN_AFTER_PERIOD;
594 ri->master_link_down_time = 0;
595 ri->slave_priority = SENTINEL_DEFAULT_SLAVE_PRIORITY;
596 ri->slave_reconf_sent_time = 0;
597 ri->slave_master_host = NULL;
598 ri->slave_master_port = 0;
599 ri->slave_master_link_status = SENTINEL_MASTER_LINK_STATUS_DOWN;
600 ri->sentinels = dictCreate(&instancesDictType,NULL);
601 ri->quorum = quorum;
602 ri->parallel_syncs = SENTINEL_DEFAULT_PARALLEL_SYNCS;
603 ri->master = master;
604 ri->slaves = dictCreate(&instancesDictType,NULL);
605 ri->info_refresh = 0;
606
607 /* Failover state. */
608 ri->leader = NULL;
609 ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
610 ri->failover_state_change_time = 0;
611 ri->failover_start_time = 0;
612 ri->failover_timeout = SENTINEL_DEFAULT_FAILOVER_TIMEOUT;
613 ri->promoted_slave = NULL;
614 ri->notification_script = NULL;
615 ri->client_reconfig_script = NULL;
616
617 /* Add into the right table. */
618 dictAdd(table, ri->name, ri);
619 return ri;
620}
621
622/* Release this instance and all its slaves, sentinels, hiredis connections.
623 * This function also takes care of unlinking the instance from the main
624 * masters table (if it is a master) or from its master sentinels/slaves table
625 * if it is a slave or sentinel. */
626void releaseSentinelRedisInstance(sentinelRedisInstance *ri) {
627 /* Release all its slaves or sentinels if any. */
628 dictRelease(ri->sentinels);
629 dictRelease(ri->slaves);
630
631 /* Release hiredis connections. */
632 if (ri->cc) sentinelKillLink(ri,ri->cc);
633 if (ri->pc) sentinelKillLink(ri,ri->pc);
634
635 /* Free other resources. */
636 sdsfree(ri->name);
637 sdsfree(ri->runid);
638 sdsfree(ri->notification_script);
639 sdsfree(ri->client_reconfig_script);
640 sdsfree(ri->slave_master_host);
641 sdsfree(ri->leader);
642 releaseSentinelAddr(ri->addr);
643
644 /* Clear state into the master if needed. */
645 if ((ri->flags & SRI_SLAVE) && (ri->flags & SRI_PROMOTED) && ri->master)
646 ri->master->promoted_slave = NULL;
647
648 zfree(ri);
649}
650
651/* Lookup a slave in a master Redis instance, by ip and port. */
652sentinelRedisInstance *sentinelRedisInstanceLookupSlave(
653 sentinelRedisInstance *ri, char *ip, int port)
654{
655 sds key;
656 sentinelRedisInstance *slave;
657
658 redisAssert(ri->flags & SRI_MASTER);
659 key = sdscatprintf(sdsempty(),"%s:%d",ip,port);
660 slave = dictFetchValue(ri->slaves,key);
661 sdsfree(key);
662 return slave;
663}
664
665/* Return the name of the type of the instance as a string. */
666const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri) {
667 if (ri->flags & SRI_MASTER) return "master";
668 else if (ri->flags & SRI_SLAVE) return "slave";
669 else if (ri->flags & SRI_SENTINEL) return "sentinel";
670 else return "unknown";
671}
672
673/* This function removes all the instances found in the dictionary of instances
674 * 'd', having either:
675 *
676 * 1) The same ip/port as specified.
677 * 2) The same runid.
678 *
679 * "1" and "2" don't need to verify at the same time, just one is enough.
680 * If "runid" is NULL it is not checked.
681 * Similarly if "ip" is NULL it is not checked.
682 *
683 * This function is useful because every time we add a new Sentinel into
684 * a master's Sentinels dictionary, we want to be very sure about not
685 * having duplicated instances for any reason. This is so important because
686 * we use those other sentinels in order to run our quorum protocol to
687 * understand if it's time to proceeed with the fail over.
688 *
689 * Making sure no duplication is possible we greately improve the robustness
690 * of the quorum (otherwise we may end counting the same instance multiple
691 * times for some reason).
692 *
693 * The function returns the number of Sentinels removed. */
694int removeMatchingSentinelsFromMaster(sentinelRedisInstance *master, char *ip, int port, char *runid) {
695 dictIterator *di;
696 dictEntry *de;
697 int removed = 0;
698
699 di = dictGetSafeIterator(master->sentinels);
700 while((de = dictNext(di)) != NULL) {
701 sentinelRedisInstance *ri = dictGetVal(de);
702
703 if ((ri->runid && runid && strcmp(ri->runid,runid) == 0) ||
704 (ip && strcmp(ri->addr->ip,ip) == 0 && port == ri->addr->port))
705 {
706 dictDelete(master->sentinels,ri->name);
707 removed++;
708 }
709 }
710 dictReleaseIterator(di);
711 return removed;
712}
713
714/* Search an instance with the same runid, ip and port into a dictionary
715 * of instances. Return NULL if not found, otherwise return the instance
716 * pointer.
717 *
718 * runid or ip can be NULL. In such a case the search is performed only
719 * by the non-NULL field. */
720sentinelRedisInstance *getSentinelRedisInstanceByAddrAndRunID(dict *instances, char *ip, int port, char *runid) {
721 dictIterator *di;
722 dictEntry *de;
723 sentinelRedisInstance *instance = NULL;
724
725 redisAssert(ip || runid); /* User must pass at least one search param. */
726 di = dictGetIterator(instances);
727 while((de = dictNext(di)) != NULL) {
728 sentinelRedisInstance *ri = dictGetVal(de);
729
730 if (runid && !ri->runid) continue;
731 if ((runid == NULL || strcmp(ri->runid, runid) == 0) &&
732 (ip == NULL || (strcmp(ri->addr->ip, ip) == 0 &&
733 ri->addr->port == port)))
734 {
735 instance = ri;
736 break;
737 }
738 }
739 dictReleaseIterator(di);
740 return instance;
741}
742
743/* Simple master lookup by name */
744sentinelRedisInstance *sentinelGetMasterByName(char *name) {
745 sentinelRedisInstance *ri;
746 sds sdsname = sdsnew(name);
747
748 ri = dictFetchValue(sentinel.masters,sdsname);
749 sdsfree(sdsname);
750 return ri;
751}
752
753/* Add the specified flags to all the instances in the specified dictionary. */
754void sentinelAddFlagsToDictOfRedisInstances(dict *instances, int flags) {
755 dictIterator *di;
756 dictEntry *de;
757
758 di = dictGetIterator(instances);
759 while((de = dictNext(di)) != NULL) {
760 sentinelRedisInstance *ri = dictGetVal(de);
761 ri->flags |= flags;
762 }
763 dictReleaseIterator(di);
764}
765
766/* Remove the specified flags to all the instances in the specified
767 * dictionary. */
768void sentinelDelFlagsToDictOfRedisInstances(dict *instances, int flags) {
769 dictIterator *di;
770 dictEntry *de;
771
772 di = dictGetIterator(instances);
773 while((de = dictNext(di)) != NULL) {
774 sentinelRedisInstance *ri = dictGetVal(de);
775 ri->flags &= ~flags;
776 }
777 dictReleaseIterator(di);
778}
779
780/* Reset the state of a monitored master:
781 * 1) Remove all slaves.
782 * 2) Remove all sentinels.
783 * 3) Remove most of the flags resulting from runtime operations.
784 * 4) Reset timers to their default value.
785 * 5) In the process of doing this undo the failover if in progress.
786 * 6) Disconnect the connections with the master (will reconnect automatically).
787 */
788void sentinelResetMaster(sentinelRedisInstance *ri, int flags) {
789 redisAssert(ri->flags & SRI_MASTER);
790 dictRelease(ri->slaves);
791 dictRelease(ri->sentinels);
792 ri->slaves = dictCreate(&instancesDictType,NULL);
793 ri->sentinels = dictCreate(&instancesDictType,NULL);
794 if (ri->cc) sentinelKillLink(ri,ri->cc);
795 if (ri->pc) sentinelKillLink(ri,ri->pc);
796 ri->flags &= SRI_MASTER|SRI_CAN_FAILOVER|SRI_DISCONNECTED;
797 if (ri->leader) {
798 sdsfree(ri->leader);
799 ri->leader = NULL;
800 }
801 ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
802 ri->failover_state_change_time = 0;
803 ri->failover_start_time = 0;
804 ri->promoted_slave = NULL;
805 sdsfree(ri->runid);
806 sdsfree(ri->slave_master_host);
807 ri->runid = NULL;
808 ri->slave_master_host = NULL;
809 ri->last_avail_time = mstime();
810 ri->last_pong_time = mstime();
811 if (flags & SENTINEL_GENERATE_EVENT)
812 sentinelEvent(REDIS_WARNING,"+reset-master",ri,"%@");
813}
814
815/* Call sentinelResetMaster() on every master with a name matching the specified
816 * pattern. */
817int sentinelResetMastersByPattern(char *pattern, int flags) {
818 dictIterator *di;
819 dictEntry *de;
820 int reset = 0;
821
822 di = dictGetIterator(sentinel.masters);
823 while((de = dictNext(di)) != NULL) {
824 sentinelRedisInstance *ri = dictGetVal(de);
825
826 if (ri->name) {
827 if (stringmatch(pattern,ri->name,0)) {
828 sentinelResetMaster(ri,flags);
829 reset++;
830 }
831 }
832 }
833 dictReleaseIterator(di);
834 return reset;
835}
836
837/* Reset the specified master with sentinelResetMaster(), and also change
838 * the ip:port address, but take the name of the instance unmodified.
839 *
840 * This is used to handle the +switch-master and +redirect-to-master events.
841 *
842 * The function returns REDIS_ERR if the address can't be resolved for some
843 * reason. Otherwise REDIS_OK is returned.
844 *
845 * TODO: make this reset so that original sentinels are re-added with
846 * same ip / port / runid.
847 */
848
849int sentinelResetMasterAndChangeAddress(sentinelRedisInstance *master, char *ip, int port) {
850 sentinelAddr *oldaddr, *newaddr;
851
852 newaddr = createSentinelAddr(ip,port);
853 if (newaddr == NULL) return REDIS_ERR;
854 sentinelResetMaster(master,SENTINEL_NO_FLAGS);
855 oldaddr = master->addr;
856 master->addr = newaddr;
857 /* Release the old address at the end so we are safe even if the function
858 * gets the master->addr->ip and master->addr->port as arguments. */
859 releaseSentinelAddr(oldaddr);
860 return REDIS_OK;
861}
862
863/* ============================ Config handling ============================= */
864char *sentinelHandleConfiguration(char **argv, int argc) {
865 sentinelRedisInstance *ri;
866
867 if (!strcasecmp(argv[0],"monitor") && argc == 5) {
868 /* monitor <name> <host> <port> <quorum> */
869 int quorum = atoi(argv[4]);
870
871 if (quorum <= 0) return "Quorum must be 1 or greater.";
872 if (createSentinelRedisInstance(argv[1],SRI_MASTER,argv[2],
873 atoi(argv[3]),quorum,NULL) == NULL)
874 {
875 switch(errno) {
876 case EBUSY: return "Duplicated master name.";
877 case ENOENT: return "Can't resolve master instance hostname.";
878 case EINVAL: return "Invalid port number";
879 }
880 }
881 } else if (!strcasecmp(argv[0],"down-after-milliseconds") && argc == 3) {
882 /* down-after-milliseconds <name> <milliseconds> */
883 ri = sentinelGetMasterByName(argv[1]);
884 if (!ri) return "No such master with specified name.";
885 ri->down_after_period = atoi(argv[2]);
886 if (ri->down_after_period <= 0)
887 return "negative or zero time parameter.";
888 } else if (!strcasecmp(argv[0],"failover-timeout") && argc == 3) {
889 /* failover-timeout <name> <milliseconds> */
890 ri = sentinelGetMasterByName(argv[1]);
891 if (!ri) return "No such master with specified name.";
892 ri->failover_timeout = atoi(argv[2]);
893 if (ri->failover_timeout <= 0)
894 return "negative or zero time parameter.";
895 } else if (!strcasecmp(argv[0],"can-failover") && argc == 3) {
896 /* can-failover <name> <yes/no> */
897 int yesno = yesnotoi(argv[2]);
898
899 ri = sentinelGetMasterByName(argv[1]);
900 if (!ri) return "No such master with specified name.";
901 if (yesno == -1) return "Argument must be either yes or no.";
902 if (yesno)
903 ri->flags |= SRI_CAN_FAILOVER;
904 else
905 ri->flags &= ~SRI_CAN_FAILOVER;
906 } else if (!strcasecmp(argv[0],"parallel-syncs") && argc == 3) {
907 /* parallel-syncs <name> <milliseconds> */
908 ri = sentinelGetMasterByName(argv[1]);
909 if (!ri) return "No such master with specified name.";
910 ri->parallel_syncs = atoi(argv[2]);
911 } else if (!strcasecmp(argv[0],"notification-script") && argc == 3) {
912 /* notification-script <name> <path> */
913 ri = sentinelGetMasterByName(argv[1]);
914 if (!ri) return "No such master with specified name.";
915 if (access(argv[2],X_OK) == -1)
916 return "Notification script seems non existing or non executable.";
917 ri->notification_script = sdsnew(argv[2]);
918 } else if (!strcasecmp(argv[0],"client-reconfig-script") && argc == 3) {
919 /* client-reconfig-script <name> <path> */
920 ri = sentinelGetMasterByName(argv[1]);
921 if (!ri) return "No such master with specified name.";
922 if (access(argv[2],X_OK) == -1)
923 return "Client reconfiguration script seems non existing or "
924 "non executable.";
925 ri->client_reconfig_script = sdsnew(argv[2]);
926 } else {
927 return "Unrecognized sentinel configuration statement.";
928 }
929 return NULL;
930}
931
932/* ====================== hiredis connection handling ======================= */
933
934/* Completely disconnect an hiredis link from an instance. */
935void sentinelKillLink(sentinelRedisInstance *ri, redisAsyncContext *c) {
936 if (ri->cc == c) {
937 ri->cc = NULL;
938 ri->pending_commands = 0;
939 }
940 if (ri->pc == c) ri->pc = NULL;
941 c->data = NULL;
942 ri->flags |= SRI_DISCONNECTED;
943 redisAsyncFree(c);
944}
945
946/* This function takes an hiredis context that is in an error condition
947 * and make sure to mark the instance as disconnected performing the
948 * cleanup needed.
949 *
950 * Note: we don't free the hiredis context as hiredis will do it for us
951 * for async conenctions. */
952void sentinelDisconnectInstanceFromContext(const redisAsyncContext *c) {
953 sentinelRedisInstance *ri = c->data;
954 int pubsub;
955
956 if (ri == NULL) return; /* The instance no longer exists. */
957
958 pubsub = (ri->pc == c);
959 sentinelEvent(REDIS_DEBUG, pubsub ? "-pubsub-link" : "-cmd-link", ri,
960 "%@ #%s", c->errstr);
961 if (pubsub)
962 ri->pc = NULL;
963 else
964 ri->cc = NULL;
965 ri->flags |= SRI_DISCONNECTED;
966}
967
968void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status) {
969 if (status != REDIS_OK) {
970 sentinelDisconnectInstanceFromContext(c);
971 } else {
972 sentinelRedisInstance *ri = c->data;
973 int pubsub = (ri->pc == c);
974
975 sentinelEvent(REDIS_DEBUG, pubsub ? "+pubsub-link" : "+cmd-link", ri,
976 "%@");
977 }
978}
979
980void sentinelDisconnectCallback(const redisAsyncContext *c, int status) {
981 sentinelDisconnectInstanceFromContext(c);
982}
983
984/* Create the async connections for the specified instance if the instance
985 * is disconnected. Note that the SRI_DISCONNECTED flag is set even if just
986 * one of the two links (commands and pub/sub) is missing. */
987void sentinelReconnectInstance(sentinelRedisInstance *ri) {
988 if (!(ri->flags & SRI_DISCONNECTED)) return;
989
990 /* Commands connection. */
991 if (ri->cc == NULL) {
992 ri->cc = redisAsyncConnect(ri->addr->ip,ri->addr->port);
993 if (ri->cc->err) {
994 sentinelEvent(REDIS_DEBUG,"-cmd-link-reconnection",ri,"%@ #%s",
995 ri->cc->errstr);
996 sentinelKillLink(ri,ri->cc);
997 } else {
998 ri->cc_conn_time = mstime();
999 ri->cc->data = ri;
1000 redisAeAttach(server.el,ri->cc);
1001 redisAsyncSetConnectCallback(ri->cc,
1002 sentinelLinkEstablishedCallback);
1003 redisAsyncSetDisconnectCallback(ri->cc,
1004 sentinelDisconnectCallback);
1005 }
1006 }
1007 /* Pub / Sub */
1008 if ((ri->flags & SRI_MASTER) && ri->pc == NULL) {
1009 ri->pc = redisAsyncConnect(ri->addr->ip,ri->addr->port);
1010 if (ri->pc->err) {
1011 sentinelEvent(REDIS_DEBUG,"-pubsub-link-reconnection",ri,"%@ #%s",
1012 ri->pc->errstr);
1013 sentinelKillLink(ri,ri->pc);
1014 } else {
1015 int retval;
1016
1017 ri->pc_conn_time = mstime();
1018 ri->pc->data = ri;
1019 redisAeAttach(server.el,ri->pc);
1020 redisAsyncSetConnectCallback(ri->pc,
1021 sentinelLinkEstablishedCallback);
1022 redisAsyncSetDisconnectCallback(ri->pc,
1023 sentinelDisconnectCallback);
1024 /* Now we subscribe to the Sentinels "Hello" channel. */
1025 retval = redisAsyncCommand(ri->pc,
1026 sentinelReceiveHelloMessages, NULL, "SUBSCRIBE %s",
1027 SENTINEL_HELLO_CHANNEL);
1028 if (retval != REDIS_OK) {
1029 /* If we can't subscribe, the Pub/Sub connection is useless
1030 * and we can simply disconnect it and try again. */
1031 sentinelKillLink(ri,ri->pc);
1032 return;
1033 }
1034 }
1035 }
1036 /* Clear the DISCONNECTED flags only if we have both the connections
1037 * (or just the commands connection if this is a slave or a
1038 * sentinel instance). */
1039 if (ri->cc && (ri->flags & (SRI_SLAVE|SRI_SENTINEL) || ri->pc))
1040 ri->flags &= ~SRI_DISCONNECTED;
1041}
1042
1043/* ======================== Redis instances pinging ======================== */
1044
1045/* Process the INFO output from masters. */
1046void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
1047 sds *lines;
1048 int numlines, j;
1049 int role = 0;
1050 int runid_changed = 0; /* true if runid changed. */
1051 int first_runid = 0; /* true if this is the first runid we receive. */
1052
1053 /* The following fields must be reset to a given value in the case they
1054 * are not found at all in the INFO output. */
1055 ri->master_link_down_time = 0;
1056
1057 /* Process line by line. */
1058 lines = sdssplitlen(info,strlen(info),"\r\n",2,&numlines);
1059 for (j = 0; j < numlines; j++) {
1060 sentinelRedisInstance *slave;
1061 sds l = lines[j];
1062
1063 /* run_id:<40 hex chars>*/
1064 if (sdslen(l) >= 47 && !memcmp(l,"run_id:",7)) {
1065 if (ri->runid == NULL) {
1066 ri->runid = sdsnewlen(l+7,40);
1067 first_runid = 1;
1068 } else {
1069 if (strncmp(ri->runid,l+7,40) != 0) {
1070 runid_changed = 1;
1071 sentinelEvent(REDIS_NOTICE,"+reboot",ri,"%@");
1072 sdsfree(ri->runid);
1073 ri->runid = sdsnewlen(l+7,40);
1074 }
1075 }
1076 }
1077
1078 /* slave0:<ip>,<port>,<state> */
1079 if ((ri->flags & SRI_MASTER) &&
1080 sdslen(l) >= 7 &&
1081 !memcmp(l,"slave",5) && isdigit(l[5]))
1082 {
1083 char *ip, *port, *end;
1084
1085 ip = strchr(l,':'); if (!ip) continue;
1086 ip++; /* Now ip points to start of ip address. */
1087 port = strchr(ip,','); if (!port) continue;
1088 *port = '\0'; /* nul term for easy access. */
1089 port++; /* Now port points to start of port number. */
1090 end = strchr(port,','); if (!end) continue;
1091 *end = '\0'; /* nul term for easy access. */
1092
1093 /* Check if we already have this slave into our table,
1094 * otherwise add it. */
1095 if (sentinelRedisInstanceLookupSlave(ri,ip,atoi(port)) == NULL) {
1096 if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,ip,
1097 atoi(port), ri->quorum,ri)) != NULL)
1098 {
1099 sentinelEvent(REDIS_NOTICE,"+slave",slave,"%@");
1100 }
1101 }
1102 }
1103
1104 /* master_link_down_since_seconds:<seconds> */
1105 if (sdslen(l) >= 32 &&
1106 !memcmp(l,"master_link_down_since_seconds",30))
1107 {
1108 ri->master_link_down_time = strtoll(l+31,NULL,10)*1000;
1109 }
1110
1111 /* role:<role> */
1112 if (!memcmp(l,"role:master",11)) role = SRI_MASTER;
1113 else if (!memcmp(l,"role:slave",10)) role = SRI_SLAVE;
1114
1115 if (role == SRI_SLAVE) {
1116 /* master_host:<host> */
1117 if (sdslen(l) >= 12 && !memcmp(l,"master_host:",12)) {
1118 sdsfree(ri->slave_master_host);
1119 ri->slave_master_host = sdsnew(l+12);
1120 }
1121
1122 /* master_port:<port> */
1123 if (sdslen(l) >= 12 && !memcmp(l,"master_port:",12))
1124 ri->slave_master_port = atoi(l+12);
1125
1126 /* master_link_status:<status> */
1127 if (sdslen(l) >= 19 && !memcmp(l,"master_link_status:",19)) {
1128 ri->slave_master_link_status =
1129 (strcasecmp(l+19,"up") == 0) ?
1130 SENTINEL_MASTER_LINK_STATUS_UP :
1131 SENTINEL_MASTER_LINK_STATUS_DOWN;
1132 }
1133 }
1134 }
1135 ri->info_refresh = mstime();
1136 sdsfreesplitres(lines,numlines);
1137
1138 if (sentinel.tilt) return;
1139
1140 /* Act if a master turned into a slave. */
1141 if ((ri->flags & SRI_MASTER) && role == SRI_SLAVE) {
1142 if (first_runid && ri->slave_master_host) {
1143 /* If it is the first time we receive INFO from it, but it's
1144 * a slave while it was configured as a master, we want to monitor
1145 * its master instead. */
1146 sentinelEvent(REDIS_WARNING,"+redirect-to-master",ri,
1147 "%s %s %d %s %d",
1148 ri->name, ri->addr->ip, ri->addr->port,
1149 ri->slave_master_host, ri->slave_master_port);
1150 sentinelResetMasterAndChangeAddress(ri,ri->slave_master_host,
1151 ri->slave_master_port);
1152 return;
1153 }
1154 }
1155
1156 /* Act if a slave turned into a master. */
1157 if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) {
1158 if (!(ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
1159 (runid_changed || first_runid))
1160 {
1161 /* If a slave turned into a master, but at the same time the
1162 * runid has changed, or it is simply the first time we see and
1163 * INFO output from this instance, this is a reboot with a wrong
1164 * configuration.
1165 *
1166 * Log the event and remove the slave. */
1167 int retval;
1168
1169 sentinelEvent(REDIS_WARNING,"-slave-restart-as-master",ri,"%@ #removing it from the attached slaves");
1170 retval = dictDelete(ri->master->slaves,ri->name);
1171 redisAssert(retval == REDIS_OK);
1172 return;
1173 } else if (ri->flags & SRI_PROMOTED) {
1174 /* If this is a promoted slave we can change state to the
1175 * failover state machine. */
1176 if (ri->master &&
1177 (ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
1178 (ri->master->flags & SRI_I_AM_THE_LEADER) &&
1179 (ri->master->failover_state ==
1180 SENTINEL_FAILOVER_STATE_WAIT_PROMOTION))
1181 {
1182 ri->master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
1183 ri->master->failover_state_change_time = mstime();
1184 sentinelEvent(REDIS_WARNING,"+promoted-slave",ri,"%@");
1185 sentinelEvent(REDIS_WARNING,"+failover-state-reconf-slaves",
1186 ri->master,"%@");
1187 }
1188 } else {
1189 /* Otherwise we interpret this as the start of the failover. */
1190 if (ri->master &&
1191 (ri->master->flags & SRI_FAILOVER_IN_PROGRESS) == 0)
1192 {
1193 ri->master->flags |= SRI_FAILOVER_IN_PROGRESS;
1194 sentinelEvent(REDIS_WARNING,"failover-detected",ri->master,"%@");
1195 ri->master->failover_state = SENTINEL_FAILOVER_STATE_DETECT_END;
1196 ri->master->failover_state_change_time = mstime();
1197 ri->master->promoted_slave = ri;
1198 ri->flags |= SRI_PROMOTED;
1199 /* We are an observer, so we can only assume that the leader
1200 * is reconfiguring the slave instances. For this reason we
1201 * set all the instances as RECONF_SENT waiting for progresses
1202 * on this side. */
1203 sentinelAddFlagsToDictOfRedisInstances(ri->master->slaves,
1204 SRI_RECONF_SENT);
1205 }
1206 }
1207 }
1208
1209 /* Detect if the slave that is in the process of being reconfigured
1210 * changed state. */
1211 if ((ri->flags & SRI_SLAVE) && role == SRI_SLAVE &&
1212 (ri->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)))
1213 {
1214 /* SRI_RECONF_SENT -> SRI_RECONF_INPROG. */
1215 if ((ri->flags & SRI_RECONF_SENT) &&
1216 ri->slave_master_host &&
1217 strcmp(ri->slave_master_host,
1218 ri->master->promoted_slave->addr->ip) == 0 &&
1219 ri->slave_master_port == ri->master->promoted_slave->addr->port)
1220 {
1221 ri->flags &= ~SRI_RECONF_SENT;
1222 ri->flags |= SRI_RECONF_INPROG;
1223 sentinelEvent(REDIS_NOTICE,"+slave-reconf-inprog",ri,"%@");
1224 }
1225
1226 /* SRI_RECONF_INPROG -> SRI_RECONF_DONE */
1227 if ((ri->flags & SRI_RECONF_INPROG) &&
1228 ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP)
1229 {
1230 ri->flags &= ~SRI_RECONF_INPROG;
1231 ri->flags |= SRI_RECONF_DONE;
1232 sentinelEvent(REDIS_NOTICE,"+slave-reconf-done",ri,"%@");
1233 /* If we are moving forward (a new slave is now configured)
1234 * we update the change_time as we are conceptually passing
1235 * to the next slave. */
1236 ri->failover_state_change_time = mstime();
1237 }
1238 }
1239}
1240
1241void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
1242 sentinelRedisInstance *ri = c->data;
1243 redisReply *r;
1244
1245 if (ri) ri->pending_commands--;
1246 if (!reply || !ri) return;
1247 r = reply;
1248
1249 if (r->type == REDIS_REPLY_STRING) {
1250 sentinelRefreshInstanceInfo(ri,r->str);
1251 }
1252}
1253
1254/* Just discard the reply. We use this when we are not monitoring the return
1255 * value of the command but its effects directly. */
1256void sentinelDiscardReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
1257 sentinelRedisInstance *ri = c->data;
1258
1259 if (ri) ri->pending_commands--;
1260}
1261
1262void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
1263 sentinelRedisInstance *ri = c->data;
1264 redisReply *r;
1265
1266 if (ri) ri->pending_commands--;
1267 if (!reply || !ri) return;
1268 r = reply;
1269
1270 if (r->type == REDIS_REPLY_STATUS ||
1271 r->type == REDIS_REPLY_ERROR) {
1272 /* Update the "instance available" field only if this is an
1273 * acceptable reply. */
1274 if (strncmp(r->str,"PONG",4) == 0 ||
1275 strncmp(r->str,"LOADING",7) == 0 ||
1276 strncmp(r->str,"MASTERDOWN",10) == 0)
1277 {
1278 ri->last_avail_time = mstime();
1279 }
1280 }
1281 ri->last_pong_time = mstime();
1282}
1283
1284/* This is called when we get the reply about the PUBLISH command we send
1285 * to the master to advertise this sentinel. */
1286void sentinelPublishReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
1287 sentinelRedisInstance *ri = c->data;
1288 redisReply *r;
1289
1290 if (ri) ri->pending_commands--;
1291 if (!reply || !ri) return;
1292 r = reply;
1293
1294 /* Only update pub_time if we actually published our message. Otherwise
1295 * we'll retry against in 100 milliseconds. */
1296 if (r->type != REDIS_REPLY_ERROR)
1297 ri->last_pub_time = mstime();
1298}
1299
1300/* This is our Pub/Sub callback for the Hello channel. It's useful in order
1301 * to discover other sentinels attached at the same master. */
1302void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata) {
1303 sentinelRedisInstance *ri = c->data;
1304 redisReply *r;
1305
1306 if (!reply || !ri) return;
1307 r = reply;
1308
1309 /* Update the last activity in the pubsub channel. Note that since we
1310 * receive our messages as well this timestamp can be used to detect
1311 * if the link is probably diconnected even if it seems otherwise. */
1312 ri->pc_last_activity = mstime();
1313
1314 /* Sanity check in the reply we expect, so that the code that follows
1315 * can avoid to check for details. */
1316 if (r->type != REDIS_REPLY_ARRAY ||
1317 r->elements != 3 ||
1318 r->element[0]->type != REDIS_REPLY_STRING ||
1319 r->element[1]->type != REDIS_REPLY_STRING ||
1320 r->element[2]->type != REDIS_REPLY_STRING ||
1321 strcmp(r->element[0]->str,"message") != 0) return;
1322
1323 /* We are not interested in meeting ourselves */
1324 if (strstr(r->element[2]->str,server.runid) != NULL) return;
1325
1326 {
1327 int numtokens, port, removed, canfailover;
1328 char **token = sdssplitlen(r->element[2]->str,
1329 r->element[2]->len,
1330 ":",1,&numtokens);
1331 sentinelRedisInstance *sentinel;
1332
1333 if (numtokens == 4) {
1334 /* First, try to see if we already have this sentinel. */
1335 port = atoi(token[1]);
1336 canfailover = atoi(token[3]);
1337 sentinel = getSentinelRedisInstanceByAddrAndRunID(
1338 ri->sentinels,token[0],port,token[2]);
1339
1340 if (!sentinel) {
1341 /* If not, remove all the sentinels that have the same runid
1342 * OR the same ip/port, because it's either a restart or a
1343 * network topology change. */
1344 removed = removeMatchingSentinelsFromMaster(ri,token[0],port,
1345 token[2]);
1346 if (removed) {
1347 sentinelEvent(REDIS_NOTICE,"-dup-sentinel",ri,
1348 "%@ #duplicate of %s:%d or %s",
1349 token[0],port,token[2]);
1350 }
1351
1352 /* Add the new sentinel. */
1353 sentinel = createSentinelRedisInstance(NULL,SRI_SENTINEL,
1354 token[0],port,ri->quorum,ri);
1355 if (sentinel) {
1356 sentinelEvent(REDIS_NOTICE,"+sentinel",sentinel,"%@");
1357 /* The runid is NULL after a new instance creation and
1358 * for Sentinels we don't have a later chance to fill it,
1359 * so do it now. */
1360 sentinel->runid = sdsnew(token[2]);
1361 }
1362 }
1363
1364 /* Update the state of the Sentinel. */
1365 if (sentinel) {
1366 sentinel->last_hello_time = mstime();
1367 if (canfailover)
1368 sentinel->flags |= SRI_CAN_FAILOVER;
1369 else
1370 sentinel->flags &= ~SRI_CAN_FAILOVER;
1371 }
1372 }
1373 sdsfreesplitres(token,numtokens);
1374 }
1375}
1376
1377void sentinelPingInstance(sentinelRedisInstance *ri) {
1378 mstime_t now = mstime();
1379 mstime_t info_period;
1380 int retval;
1381
1382 /* Return ASAP if we have already a PING or INFO already pending, or
1383 * in the case the instance is not properly connected. */
1384 if (ri->flags & SRI_DISCONNECTED) return;
1385
1386 /* For INFO, PING, PUBLISH that are not critical commands to send we
1387 * also have a limit of SENTINEL_MAX_PENDING_COMMANDS. We don't
1388 * want to use a lot of memory just because a link is not working
1389 * properly (note that anyway there is a redundant protection about this,
1390 * that is, the link will be disconnected and reconnected if a long
1391 * timeout condition is detected. */
1392 if (ri->pending_commands >= SENTINEL_MAX_PENDING_COMMANDS) return;
1393
1394 /* If this is a slave of a master in O_DOWN condition we start sending
1395 * it INFO every second, instead of the usual SENTINEL_INFO_PERIOD
1396 * period. In this state we want to closely monitor slaves in case they
1397 * are turned into masters by another Sentinel, or by the sysadmin. */
1398 if ((ri->flags & SRI_SLAVE) &&
1399 (ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) {
1400 info_period = 1000;
1401 } else {
1402 info_period = SENTINEL_INFO_PERIOD;
1403 }
1404
1405 if ((ri->flags & SRI_SENTINEL) == 0 &&
1406 (ri->info_refresh == 0 ||
1407 (now - ri->info_refresh) > info_period))
1408 {
1409 /* Send INFO to masters and slaves, not sentinels. */
1410 retval = redisAsyncCommand(ri->cc,
1411 sentinelInfoReplyCallback, NULL, "INFO");
1412 if (retval != REDIS_OK) return;
1413 ri->pending_commands++;
1414 } else if ((now - ri->last_pong_time) > SENTINEL_PING_PERIOD) {
1415 /* Send PING to all the three kinds of instances. */
1416 retval = redisAsyncCommand(ri->cc,
1417 sentinelPingReplyCallback, NULL, "PING");
1418 if (retval != REDIS_OK) return;
1419 ri->pending_commands++;
1420 } else if ((ri->flags & SRI_MASTER) &&
1421 (now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD)
1422 {
1423 /* PUBLISH hello messages only to masters. */
1424 struct sockaddr_in sa;
1425 socklen_t salen = sizeof(sa);
1426
1427 if (getsockname(ri->cc->c.fd,(struct sockaddr*)&sa,&salen) != -1) {
1428 char myaddr[128];
1429
1430 snprintf(myaddr,sizeof(myaddr),"%s:%d:%s:%d",
1431 inet_ntoa(sa.sin_addr), server.port, server.runid,
1432 (ri->flags & SRI_CAN_FAILOVER) != 0);
1433 retval = redisAsyncCommand(ri->cc,
1434 sentinelPublishReplyCallback, NULL, "PUBLISH %s %s",
1435 SENTINEL_HELLO_CHANNEL,myaddr);
1436 if (retval != REDIS_OK) return;
1437 ri->pending_commands++;
1438 }
1439 }
1440}
1441
1442/* =========================== SENTINEL command ============================= */
1443
1444const char *sentinelFailoverStateStr(int state) {
1445 switch(state) {
1446 case SENTINEL_FAILOVER_STATE_NONE: return "none";
1447 case SENTINEL_FAILOVER_STATE_WAIT_START: return "wait_start";
1448 case SENTINEL_FAILOVER_STATE_SELECT_SLAVE: return "select_slave";
1449 case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE: return "send_slaveof_noone";
1450 case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION: return "wait_promotion";
1451 case SENTINEL_FAILOVER_STATE_RECONF_SLAVES: return "reconf_slaves";
1452 case SENTINEL_FAILOVER_STATE_ALERT_CLIENTS: return "alert_clients";
1453 case SENTINEL_FAILOVER_STATE_DETECT_END: return "detect_end";
1454 case SENTINEL_FAILOVER_STATE_UPDATE_CONFIG: return "update_config";
1455 default: return "unknown";
1456 }
1457}
1458
1459/* Redis instance to Redis protocol representation. */
1460void addReplySentinelRedisInstance(redisClient *c, sentinelRedisInstance *ri) {
1461 char *flags = sdsempty();
1462 void *mbl;
1463 int fields = 0;
1464
1465 mbl = addDeferredMultiBulkLength(c);
1466
1467 addReplyBulkCString(c,"name");
1468 addReplyBulkCString(c,ri->name);
1469 fields++;
1470
1471 addReplyBulkCString(c,"ip");
1472 addReplyBulkCString(c,ri->addr->ip);
1473 fields++;
1474
1475 addReplyBulkCString(c,"port");
1476 addReplyBulkLongLong(c,ri->addr->port);
1477 fields++;
1478
1479 addReplyBulkCString(c,"runid");
1480 addReplyBulkCString(c,ri->runid ? ri->runid : "");
1481 fields++;
1482
1483 addReplyBulkCString(c,"flags");
1484 if (ri->flags & SRI_S_DOWN) flags = sdscat(flags,"s_down,");
1485 if (ri->flags & SRI_O_DOWN) flags = sdscat(flags,"o_down,");
1486 if (ri->flags & SRI_MASTER) flags = sdscat(flags,"master,");
1487 if (ri->flags & SRI_SLAVE) flags = sdscat(flags,"slave,");
1488 if (ri->flags & SRI_SENTINEL) flags = sdscat(flags,"sentinel,");
1489 if (ri->flags & SRI_DISCONNECTED) flags = sdscat(flags,"disconnected,");
1490 if (ri->flags & SRI_MASTER_DOWN) flags = sdscat(flags,"master_down,");
1491 if (ri->flags & SRI_FAILOVER_IN_PROGRESS)
1492 flags = sdscat(flags,"failover_in_progress,");
1493 if (ri->flags & SRI_I_AM_THE_LEADER)
1494 flags = sdscat(flags,"i_am_the_leader,");
1495 if (ri->flags & SRI_PROMOTED) flags = sdscat(flags,"promoted,");
1496 if (ri->flags & SRI_RECONF_SENT) flags = sdscat(flags,"reconf_sent,");
1497 if (ri->flags & SRI_RECONF_INPROG) flags = sdscat(flags,"reconf_inprog,");
1498 if (ri->flags & SRI_RECONF_DONE) flags = sdscat(flags,"reconf_done,");
1499
1500 if (sdslen(flags) != 0) flags = sdsrange(flags,0,-2); /* remove last "," */
1501 addReplyBulkCString(c,flags);
1502 sdsfree(flags);
1503 fields++;
1504
1505 addReplyBulkCString(c,"pending-commands");
1506 addReplyBulkLongLong(c,ri->pending_commands);
1507 fields++;
1508
1509 if (ri->flags & SRI_FAILOVER_IN_PROGRESS) {
1510 addReplyBulkCString(c,"failover-state");
1511 addReplyBulkCString(c,(char*)sentinelFailoverStateStr(ri->failover_state));
1512 fields++;
1513 }
1514
1515 addReplyBulkCString(c,"last-ok-ping-reply");
1516 addReplyBulkLongLong(c,mstime() - ri->last_avail_time);
1517 fields++;
1518
1519 addReplyBulkCString(c,"last-ping-reply");
1520 addReplyBulkLongLong(c,mstime() - ri->last_pong_time);
1521 fields++;
1522
1523 if (ri->flags & SRI_S_DOWN) {
1524 addReplyBulkCString(c,"s-down-time");
1525 addReplyBulkLongLong(c,mstime()-ri->s_down_since_time);
1526 fields++;
1527 }
1528
1529 if (ri->flags & SRI_O_DOWN) {
1530 addReplyBulkCString(c,"o-down-time");
1531 addReplyBulkLongLong(c,mstime()-ri->o_down_since_time);
1532 fields++;
1533 }
1534
1535 /* Masters and Slaves */
1536 if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
1537 addReplyBulkCString(c,"info-refresh");
1538 addReplyBulkLongLong(c,mstime() - ri->info_refresh);
1539 fields++;
1540 }
1541
1542 /* Only masters */
1543 if (ri->flags & SRI_MASTER) {
1544 addReplyBulkCString(c,"num-slaves");
1545 addReplyBulkLongLong(c,dictSize(ri->slaves));
1546 fields++;
1547
1548 addReplyBulkCString(c,"num-other-sentinels");
1549 addReplyBulkLongLong(c,dictSize(ri->sentinels));
1550 fields++;
1551
1552 addReplyBulkCString(c,"quorum");
1553 addReplyBulkLongLong(c,ri->quorum);
1554 fields++;
1555 }
1556
1557 /* Only slaves */
1558 if (ri->flags & SRI_SLAVE) {
1559 addReplyBulkCString(c,"master-link-down-time");
1560 addReplyBulkLongLong(c,ri->master_link_down_time);
1561 fields++;
1562
1563 addReplyBulkCString(c,"master-link-status");
1564 addReplyBulkCString(c,
1565 (ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP) ?
1566 "ok" : "err");
1567 fields++;
1568
1569 addReplyBulkCString(c,"master-host");
1570 addReplyBulkCString(c,
1571 ri->slave_master_host ? ri->slave_master_host : "?");
1572 fields++;
1573
1574 addReplyBulkCString(c,"master-port");
1575 addReplyBulkLongLong(c,ri->slave_master_port);
1576 fields++;
1577 }
1578
1579 /* Only sentinels */
1580 if (ri->flags & SRI_SENTINEL) {
1581 addReplyBulkCString(c,"last-hello-message");
1582 addReplyBulkLongLong(c,mstime() - ri->last_hello_time);
1583 fields++;
1584
1585 addReplyBulkCString(c,"can-failover-its-master");
1586 addReplyBulkLongLong(c,(ri->flags & SRI_CAN_FAILOVER) != 0);
1587 fields++;
1588
1589 if (ri->flags & SRI_MASTER_DOWN) {
1590 addReplyBulkCString(c,"subjective-leader");
1591 addReplyBulkCString(c,ri->leader ? ri->leader : "?");
1592 fields++;
1593 }
1594 }
1595
1596 setDeferredMultiBulkLength(c,mbl,fields*2);
1597}
1598
1599/* Output a number of instances contanined inside a dictionary as
1600 * Redis protocol. */
1601void addReplyDictOfRedisInstances(redisClient *c, dict *instances) {
1602 dictIterator *di;
1603 dictEntry *de;
1604
1605 di = dictGetIterator(instances);
1606 addReplyMultiBulkLen(c,dictSize(instances));
1607 while((de = dictNext(di)) != NULL) {
1608 sentinelRedisInstance *ri = dictGetVal(de);
1609
1610 addReplySentinelRedisInstance(c,ri);
1611 }
1612 dictReleaseIterator(di);
1613}
1614
1615/* Lookup the named master into sentinel.masters.
1616 * If the master is not found reply to the client with an error and returns
1617 * NULL. */
1618sentinelRedisInstance *sentinelGetMasterByNameOrReplyError(redisClient *c,
1619 robj *name)
1620{
1621 sentinelRedisInstance *ri;
1622
1623 ri = dictFetchValue(sentinel.masters,c->argv[2]->ptr);
1624 if (!ri) {
1625 addReplyError(c,"No such master with that name");
1626 return NULL;
1627 }
1628 return ri;
1629}
1630
1631void sentinelCommand(redisClient *c) {
1632 if (!strcasecmp(c->argv[1]->ptr,"masters")) {
1633 /* SENTINEL MASTERS */
1634 if (c->argc != 2) goto numargserr;
1635
1636 addReplyDictOfRedisInstances(c,sentinel.masters);
1637 } else if (!strcasecmp(c->argv[1]->ptr,"slaves")) {
1638 /* SENTINEL SLAVES <master-name> */
1639 sentinelRedisInstance *ri;
1640
1641 if (c->argc != 3) goto numargserr;
1642 if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
1643 return;
1644 addReplyDictOfRedisInstances(c,ri->slaves);
1645 } else if (!strcasecmp(c->argv[1]->ptr,"sentinels")) {
1646 /* SENTINEL SENTINELS <master-name> */
1647 sentinelRedisInstance *ri;
1648
1649 if (c->argc != 3) goto numargserr;
1650 if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
1651 return;
1652 addReplyDictOfRedisInstances(c,ri->sentinels);
1653 } else if (!strcasecmp(c->argv[1]->ptr,"is-master-down-by-addr")) {
1654 /* SENTINEL IS-MASTER-DOWN-BY-ADDR <ip> <port> */
1655 sentinelRedisInstance *ri;
1656 char *leader = NULL;
1657 long port;
1658 int isdown = 0;
1659
1660 if (c->argc != 4) goto numargserr;
1661 if (getLongFromObjectOrReply(c,c->argv[3],&port,NULL) != REDIS_OK)
1662 return;
1663 ri = getSentinelRedisInstanceByAddrAndRunID(sentinel.masters,
1664 c->argv[2]->ptr,port,NULL);
1665
1666 /* It exists? Is actually a master? Is subjectively down? It's down.
1667 * Note: if we are in tilt mode we always reply with "0". */
1668 if (!sentinel.tilt && ri && (ri->flags & SRI_S_DOWN) &&
1669 (ri->flags & SRI_MASTER))
1670 isdown = 1;
1671 if (ri) leader = sentinelGetSubjectiveLeader(ri);
1672
1673 /* Reply with a two-elements multi-bulk reply: down state, leader. */
1674 addReplyMultiBulkLen(c,2);
1675 addReply(c, isdown ? shared.cone : shared.czero);
1676 addReplyBulkCString(c, leader ? leader : "?");
1677 if (leader) sdsfree(leader);
1678 } else if (!strcasecmp(c->argv[1]->ptr,"reset")) {
1679 /* SENTINEL RESET <pattern> */
1680 if (c->argc != 3) goto numargserr;
1681 addReplyLongLong(c,sentinelResetMastersByPattern(c->argv[2]->ptr,SENTINEL_GENERATE_EVENT));
1682 } else if (!strcasecmp(c->argv[1]->ptr,"get-master-addr-by-name")) {
1683 /* SENTINEL GET-MASTER-ADDR-BY-NAME <master-name> */
1684 sentinelRedisInstance *ri;
1685
1686 if (c->argc != 3) goto numargserr;
1687 ri = sentinelGetMasterByName(c->argv[2]->ptr);
1688 if (ri == NULL) {
1689 addReply(c,shared.nullmultibulk);
1690 } else {
1691 sentinelAddr *addr = ri->addr;
1692
1693 if ((ri->flags & SRI_FAILOVER_IN_PROGRESS) && ri->promoted_slave)
1694 addr = ri->promoted_slave->addr;
1695 addReplyMultiBulkLen(c,2);
1696 addReplyBulkCString(c,addr->ip);
1697 addReplyBulkLongLong(c,addr->port);
1698 }
1699 } else {
1700 addReplyErrorFormat(c,"Unknown sentinel subcommand '%s'",
1701 (char*)c->argv[1]->ptr);
1702 }
1703 return;
1704
1705numargserr:
1706 addReplyErrorFormat(c,"Wrong number of commands for 'sentinel %s'",
1707 (char*)c->argv[1]->ptr);
1708}
1709
1710/* ===================== SENTINEL availability checks ======================= */
1711
1712/* Is this instance down from our point of view? */
1713void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
1714 mstime_t elapsed = mstime() - ri->last_avail_time;
1715
1716 /* Check if we are in need for a reconnection of one of the
1717 * links, because we are detecting low activity.
1718 *
1719 * 1) Check if the command link seems connected, was connected not less
1720 * than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have an
1721 * idle time that is greater than down_after_period / 2 seconds. */
1722 if (ri->cc &&
1723 (mstime() - ri->cc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
1724 (mstime() - ri->last_pong_time) > (ri->down_after_period/2))
1725 {
1726 sentinelKillLink(ri,ri->cc);
1727 }
1728
1729 /* 2) Check if the pubsub link seems connected, was connected not less
1730 * than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have no
1731 * activity in the Pub/Sub channel for more than
1732 * SENTINEL_PUBLISH_PERIOD * 3.
1733 */
1734 if (ri->pc &&
1735 (mstime() - ri->pc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
1736 (mstime() - ri->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3))
1737 {
1738 sentinelKillLink(ri,ri->pc);
1739 }
1740
1741 /* Update the subjectively down flag. */
1742 if (elapsed > ri->down_after_period) {
1743 /* Is subjectively down */
1744 if ((ri->flags & SRI_S_DOWN) == 0) {
1745 sentinelEvent(REDIS_WARNING,"+sdown",ri,"%@");
1746 ri->s_down_since_time = mstime();
1747 ri->flags |= SRI_S_DOWN;
1748 }
1749 } else {
1750 /* Is subjectively up */
1751 if (ri->flags & SRI_S_DOWN) {
1752 sentinelEvent(REDIS_WARNING,"-sdown",ri,"%@");
1753 ri->flags &= ~SRI_S_DOWN;
1754 }
1755 }
1756}
1757
1758/* Is this instance down accordingly to the configured quorum? */
1759void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
1760 dictIterator *di;
1761 dictEntry *de;
1762 int quorum = 0, odown = 0;
1763
1764 if (master->flags & SRI_S_DOWN) {
1765 /* Is down for enough sentinels? */
1766 quorum = 1; /* the current sentinel. */
1767 /* Count all the other sentinels. */
1768 di = dictGetIterator(master->sentinels);
1769 while((de = dictNext(di)) != NULL) {
1770 sentinelRedisInstance *ri = dictGetVal(de);
1771
1772 if (ri->flags & SRI_MASTER_DOWN) quorum++;
1773 }
1774 dictReleaseIterator(di);
1775 if (quorum >= master->quorum) odown = 1;
1776 }
1777
1778 /* Set the flag accordingly to the outcome. */
1779 if (odown) {
1780 if ((master->flags & SRI_O_DOWN) == 0) {
1781 sentinelEvent(REDIS_WARNING,"+odown",master,"%@ #quorum %d/%d",
1782 quorum, master->quorum);
1783 master->flags |= SRI_O_DOWN;
1784 master->o_down_since_time = mstime();
1785 }
1786 } else {
1787 if (master->flags & SRI_O_DOWN) {
1788 sentinelEvent(REDIS_WARNING,"-odown",master,"%@");
1789 master->flags &= ~SRI_O_DOWN;
1790 }
1791 }
1792}
1793
1794/* Receive the SENTINEL is-master-down-by-addr reply, see the
1795 * sentinelAskMasterStateToOtherSentinels() function for more information. */
1796void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) {
1797 sentinelRedisInstance *ri = c->data;
1798 redisReply *r;
1799
1800 if (ri) ri->pending_commands--;
1801 if (!reply || !ri) return;
1802 r = reply;
1803
1804 /* Ignore every error or unexpected reply.
1805 * Note that if the command returns an error for any reason we'll
1806 * end clearing the SRI_MASTER_DOWN flag for timeout anyway. */
1807 if (r->type == REDIS_REPLY_ARRAY && r->elements == 2 &&
1808 r->element[0]->type == REDIS_REPLY_INTEGER &&
1809 r->element[1]->type == REDIS_REPLY_STRING)
1810 {
1811 ri->last_master_down_reply_time = mstime();
1812 if (r->element[0]->integer == 1) {
1813 ri->flags |= SRI_MASTER_DOWN;
1814 } else {
1815 ri->flags &= ~SRI_MASTER_DOWN;
1816 }
1817 sdsfree(ri->leader);
1818 ri->leader = sdsnew(r->element[1]->str);
1819 }
1820}
1821
1822/* If we think (subjectively) the master is down, we start sending
1823 * SENTINEL IS-MASTER-DOWN-BY-ADDR requests to other sentinels
1824 * in order to get the replies that allow to reach the quorum and
1825 * possibly also mark the master as objectively down. */
1826void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master) {
1827 dictIterator *di;
1828 dictEntry *de;
1829
1830 di = dictGetIterator(master->sentinels);
1831 while((de = dictNext(di)) != NULL) {
1832 sentinelRedisInstance *ri = dictGetVal(de);
1833 mstime_t elapsed = mstime() - ri->last_master_down_reply_time;
1834 char port[32];
1835 int retval;
1836
1837 /* If the master state from other sentinel is too old, we clear it. */
1838 if (elapsed > SENTINEL_INFO_VALIDITY_TIME) {
1839 ri->flags &= ~SRI_MASTER_DOWN;
1840 sdsfree(ri->leader);
1841 ri->leader = NULL;
1842 }
1843
1844 /* Only ask if master is down to other sentinels if:
1845 *
1846 * 1) We believe it is down, or there is a failover in progress.
1847 * 2) Sentinel is connected.
1848 * 3) We did not received the info within SENTINEL_ASK_PERIOD ms. */
1849 if ((master->flags & (SRI_S_DOWN|SRI_FAILOVER_IN_PROGRESS)) == 0)
1850 continue;
1851 if (ri->flags & SRI_DISCONNECTED) continue;
1852 if (mstime() - ri->last_master_down_reply_time < SENTINEL_ASK_PERIOD)
1853 continue;
1854
1855 /* Ask */
1856 ll2string(port,sizeof(port),master->addr->port);
1857 retval = redisAsyncCommand(ri->cc,
1858 sentinelReceiveIsMasterDownReply, NULL,
1859 "SENTINEL is-master-down-by-addr %s %s",
1860 master->addr->ip, port);
1861 if (retval == REDIS_OK) ri->pending_commands++;
1862 }
1863 dictReleaseIterator(di);
1864}
1865
1866/* =============================== FAILOVER ================================= */
1867
1868/* Given a master get the "subjective leader", that is, among all the sentinels
1869 * with given characteristics, the one with the lexicographically smaller
1870 * runid. The characteristics required are:
1871 *
1872 * 1) Has SRI_CAN_FAILOVER flag.
1873 * 2) Is not disconnected.
1874 * 3) Recently answered to our ping (no longer than
1875 * SENTINEL_INFO_VALIDITY_TIME milliseconds ago).
1876 *
1877 * The function returns a pointer to an sds string representing the runid of the
1878 * leader sentinel instance (from our point of view). Otherwise NULL is
1879 * returned if there are no suitable sentinels.
1880 */
1881
/* qsort() comparator for an array of C strings (run IDs): orders the
 * elements by case insensitive string comparison. */
int compareRunID(const void *a, const void *b) {
    const char *left = *(char**)a;
    const char *right = *(char**)b;

    return strcasecmp(left, right);
}
1886
1887char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master) {
1888 dictIterator *di;
1889 dictEntry *de;
1890 char **instance =
1891 zmalloc(sizeof(char*)*(dictSize(master->sentinels)+1));
1892 int instances = 0;
1893 char *leader = NULL;
1894
1895 if (master->flags & SRI_CAN_FAILOVER) {
1896 /* Add myself if I'm a Sentinel that can failover this master. */
1897 instance[instances++] = server.runid;
1898 }
1899
1900 di = dictGetIterator(master->sentinels);
1901 while((de = dictNext(di)) != NULL) {
1902 sentinelRedisInstance *ri = dictGetVal(de);
1903 mstime_t lag = mstime() - ri->last_avail_time;
1904
1905 if (lag > SENTINEL_INFO_VALIDITY_TIME ||
1906 !(ri->flags & SRI_CAN_FAILOVER) ||
1907 (ri->flags & SRI_DISCONNECTED) ||
1908 ri->runid == NULL)
1909 continue;
1910 instance[instances++] = ri->runid;
1911 }
1912 dictReleaseIterator(di);
1913
1914 /* If we have at least one instance passing our checks, order the array
1915 * by runid. */
1916 if (instances) {
1917 qsort(instance,instances,sizeof(char*),compareRunID);
1918 leader = sdsnew(instance[0]);
1919 }
1920 zfree(instance);
1921 return leader;
1922}
1923
/* A candidate leader together with the number of votes it received. */
struct sentinelLeader {
    char *runid;            /* Run ID of the candidate. */
    unsigned long votes;    /* Votes counted for this run ID. */
};
1928
1929/* Helper function for sentinelGetObjectiveLeader, increment the counter
1930 * relative to the specified runid. */
1931void sentinelObjectiveLeaderIncr(dict *counters, char *runid) {
1932 dictEntry *de = dictFind(counters,runid);
1933 uint64_t oldval;
1934
1935 if (de) {
1936 oldval = dictGetUnsignedIntegerVal(de);
1937 dictSetUnsignedIntegerVal(de,oldval+1);
1938 } else {
1939 de = dictAddRaw(counters,runid);
1940 redisAssert(de != NULL);
1941 dictSetUnsignedIntegerVal(de,1);
1942 }
1943}
1944
1945/* Scan all the Sentinels attached to this master to check what is the
1946 * most voted leader among Sentinels. */
1947char *sentinelGetObjectiveLeader(sentinelRedisInstance *master) {
1948 dict *counters;
1949 dictIterator *di;
1950 dictEntry *de;
1951 unsigned int voters = 0, voters_quorum;
1952 char *myvote;
1953 char *winner = NULL;
1954
1955 redisAssert(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS));
1956 counters = dictCreate(&leaderVotesDictType,NULL);
1957
1958 /* Count my vote. */
1959 myvote = sentinelGetSubjectiveLeader(master);
1960 if (myvote) {
1961 sentinelObjectiveLeaderIncr(counters,myvote);
1962 voters++;
1963 }
1964
1965 /* Count other sentinels votes */
1966 di = dictGetIterator(master->sentinels);
1967 while((de = dictNext(di)) != NULL) {
1968 sentinelRedisInstance *ri = dictGetVal(de);
1969 if (ri->leader == NULL) continue;
1970 /* If the failover is not already in progress we are only interested
1971 * in Sentinels that believe the master is down. Otherwise the leader
1972 * selection is useful for the "failover-takedown" when the original
1973 * leader fails. In that case we consider all the voters. */
1974 if (!(master->flags & SRI_FAILOVER_IN_PROGRESS) &&
1975 !(ri->flags & SRI_MASTER_DOWN)) continue;
1976 sentinelObjectiveLeaderIncr(counters,ri->leader);
1977 voters++;
1978 }
1979 dictReleaseIterator(di);
1980 voters_quorum = voters/2+1;
1981
1982 /* Check what's the winner. For the winner to win, it needs two conditions:
1983 * 1) Absolute majority between voters (50% + 1).
1984 * 2) And anyway at least master->quorum votes. */
1985 {
1986 uint64_t max_votes = 0; /* Max votes so far. */
1987
1988 di = dictGetIterator(counters);
1989 while((de = dictNext(di)) != NULL) {
1990 uint64_t votes = dictGetUnsignedIntegerVal(de);
1991
1992 if (max_votes < votes) {
1993 max_votes = votes;
1994 winner = dictGetKey(de);
1995 }
1996 }
1997 dictReleaseIterator(di);
1998 if (winner && (max_votes < voters_quorum || max_votes < master->quorum))
1999 winner = NULL;
2000 }
2001 winner = winner ? sdsnew(winner) : NULL;
2002 sdsfree(myvote);
2003 dictRelease(counters);
2004 return winner;
2005}
2006
2007/* This function checks if there are the conditions to start the failover,
2008 * that is:
2009 *
2010 * 1) Enough time has passed since O_DOWN.
2011 * 2) The master is marked as SRI_CAN_FAILOVER, so we can failover it.
2012 * 3) We are the objectively leader for this master.
2013 *
2014 * If the conditions are met we flag the master as SRI_FAILOVER_IN_PROGRESS
2015 * and SRI_I_AM_THE_LEADER.
2016 */
2017void sentinelStartFailover(sentinelRedisInstance *master) {
2018 char *leader;
2019 int isleader;
2020
2021 /* We can't failover if the master is not in O_DOWN state or if
2022 * there is not already a failover in progress (to perform the
2023 * takedown if the leader died) or if this Sentinel is not allowed
2024 * to start a failover. */
2025 if (!(master->flags & SRI_CAN_FAILOVER) ||
2026 !(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) return;
2027
2028 leader = sentinelGetObjectiveLeader(master);
2029 isleader = leader && strcasecmp(leader,server.runid) == 0;
2030 sdsfree(leader);
2031
2032 /* If I'm not the leader, I can't failover for sure. */
2033 if (!isleader) return;
2034
2035 /* If the failover is already in progress there are two options... */
2036 if (master->flags & SRI_FAILOVER_IN_PROGRESS) {
2037 if (master->flags & SRI_I_AM_THE_LEADER) {
2038 /* 1) I'm flagged as leader so I already started the failover.
2039 * Just return. */
2040 return;
2041 } else {
2042 mstime_t elapsed = mstime() - master->failover_state_change_time;
2043
2044 /* 2) I'm the new leader, but I'm not flagged as leader in the
2045 * master: I did not started the failover, but the original
2046 * leader has no longer the leadership.
2047 *
2048 * In this case if the failover appears to be lagging
2049 * for at least 25% of the configured failover timeout,
2050 * I can assume I can take control. Otherwise
2051 * it's better to return and wait more. */
2052 if (elapsed < (master->failover_timeout/4)) return;
2053 sentinelEvent(REDIS_WARNING,"+failover-takedown",master,"%@");
2054 /* We have already an elected slave if we are in
2055 * FAILOVER_IN_PROGRESS state, that is, the slave that we
2056 * observed turning into a master. */
2057 master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
2058 /* As an observer we flagged all the slaves as RECONF_SENT but
2059 * now we are in charge of actually sending the reconfiguration
2060 * command so let's clear this flag for all the instances. */
2061 sentinelDelFlagsToDictOfRedisInstances(master->slaves,
2062 SRI_RECONF_SENT);
2063 }
2064 } else {
2065 /* Brand new failover as SRI_FAILOVER_IN_PROGRESS was not set.
2066 *
2067 * Do we have a slave to promote? Otherwise don't start a failover
2068 * at all. */
2069 if (sentinelSelectSlave(master) == NULL) return;
2070 master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;
2071 }
2072
2073 master->flags |= SRI_FAILOVER_IN_PROGRESS|SRI_I_AM_THE_LEADER;
2074 sentinelEvent(REDIS_WARNING,"+failover-triggered",master,"%@");
2075
2076 /* Pick a random delay if it's a fresh failover (WAIT_START), and not
2077 * a recovery of a failover started by another sentinel. */
2078 if (master->failover_state == SENTINEL_FAILOVER_STATE_WAIT_START) {
2079 master->failover_start_time = mstime() +
2080 SENTINEL_FAILOVER_FIXED_DELAY +
2081 (rand() % SENTINEL_FAILOVER_MAX_RANDOM_DELAY);
2082 sentinelEvent(REDIS_WARNING,"+failover-state-wait-start",master,
2083 "%@ #starting in %lld milliseconds",
2084 master->failover_start_time-mstime());
2085 }
2086 master->failover_state_change_time = mstime();
2087}
2088
2089/* Select a suitable slave to promote. The current algorithm only uses
2090 * the following parameters:
2091 *
2092 * 1) None of the following conditions: S_DOWN, O_DOWN, DISCONNECTED.
2093 * 2) last_avail_time more recent than SENTINEL_INFO_VALIDITY_TIME.
2094 * 3) info_refresh more recent than SENTINEL_INFO_VALIDITY_TIME.
2095 * 4) master_link_down_time no more than:
2096 * (now - master->s_down_since_time) + (master->down_after_period * 10).
2097 *
2098 * Among all the slaves matching the above conditions we select the slave
2099 * with lower slave_priority. If priority is the same we select the slave
2100 * with lexicographically smaller runid.
2101 *
2102 * The function returns the pointer to the selected slave, otherwise
2103 * NULL if no suitable slave was found.
2104 */
2105
2106int compareSlavesForPromotion(const void *a, const void *b) {
2107 sentinelRedisInstance **sa = (sentinelRedisInstance **)a,
2108 **sb = (sentinelRedisInstance **)b;
2109 if ((*sa)->slave_priority != (*sb)->slave_priority)
2110 return (*sa)->slave_priority - (*sb)->slave_priority;
2111 return strcasecmp((*sa)->runid,(*sb)->runid);
2112}
2113
2114sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) {
2115 sentinelRedisInstance **instance =
2116 zmalloc(sizeof(instance[0])*dictSize(master->slaves));
2117 sentinelRedisInstance *selected = NULL;
2118 int instances = 0;
2119 dictIterator *di;
2120 dictEntry *de;
2121 mstime_t max_master_down_time;
2122
2123 max_master_down_time = (mstime() - master->s_down_since_time) +
2124 (master->down_after_period * 10);
2125
2126 di = dictGetIterator(master->slaves);
2127 while((de = dictNext(di)) != NULL) {
2128 sentinelRedisInstance *slave = dictGetVal(de);
2129 mstime_t info_validity_time = mstime()-SENTINEL_INFO_VALIDITY_TIME;
2130
2131 if (slave->flags & (SRI_S_DOWN|SRI_O_DOWN|SRI_DISCONNECTED)) continue;
2132 if (slave->last_avail_time < info_validity_time) continue;
2133 if (slave->info_refresh < info_validity_time) continue;
2134 if (slave->master_link_down_time > max_master_down_time) continue;
2135 instance[instances++] = slave;
2136 }
2137 dictReleaseIterator(di);
2138 if (instances) {
2139 qsort(instance,instances,sizeof(sentinelRedisInstance*),
2140 compareSlavesForPromotion);
2141 selected = instance[0];
2142 }
2143 zfree(instance);
2144 return selected;
2145}
2146
2147/* ---------------- Failover state machine implementation ------------------- */
2148void sentinelFailoverWaitStart(sentinelRedisInstance *ri) {
2149 if (mstime() >= ri->failover_start_time) {
2150 ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
2151 ri->failover_state_change_time = mstime();
2152 sentinelEvent(REDIS_WARNING,"+failover-state-select-slave",ri,"%@");
2153 }
2154}
2155
2156void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) {
2157 sentinelRedisInstance *slave = sentinelSelectSlave(ri);
2158
2159 if (slave == NULL) {
2160 sentinelEvent(REDIS_WARNING,"-failover-abort-no-good-slave",ri,"%@");
2161 sentinelAbortFailover(ri);
2162 } else {
2163 sentinelEvent(REDIS_WARNING,"+selected-slave",slave,"%@");
2164 slave->flags |= SRI_PROMOTED;
2165 ri->promoted_slave = slave;
2166 ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE;
2167 ri->failover_state_change_time = mstime();
2168 sentinelEvent(REDIS_NOTICE,"+failover-state-send-slaveof-noone",
2169 slave, "%@");
2170 }
2171}
2172
2173void sentinelFailoverSendSlaveOfNoOne(sentinelRedisInstance *ri) {
2174 int retval;
2175
2176 if (ri->promoted_slave->flags & SRI_DISCONNECTED) return;
2177
2178 /* Send SLAVEOF NO ONE command to turn the slave into a master.
2179 * We actually register a generic callback for this command as we don't
2180 * really care about the reply. We check if it worked indirectly observing
2181 * if INFO returns a different role (master instead of slave). */
2182 retval = redisAsyncCommand(ri->promoted_slave->cc,
2183 sentinelDiscardReplyCallback, NULL, "SLAVEOF NO ONE");
2184 if (retval != REDIS_OK) return;
2185 ri->promoted_slave->pending_commands++;
2186 sentinelEvent(REDIS_NOTICE, "+failover-state-wait-promotion",
2187 ri->promoted_slave,"%@");
2188 ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION;
2189 ri->failover_state_change_time = mstime();
2190}
2191
2192/* We actually wait for promotion indirectly checking with INFO when the
2193 * slave turns into a master. */
2194void sentinelFailoverWaitPromotion(sentinelRedisInstance *ri) {
2195 mstime_t elapsed = mstime() - ri->failover_state_change_time;
2196
2197 if (elapsed >= SENTINEL_PROMOTION_RETRY_PERIOD) {
2198 sentinelEvent(REDIS_WARNING,"-promotion-timeout",ri->promoted_slave,
2199 "%@");
2200 sentinelEvent(REDIS_WARNING,"+failover-state-select-slave",ri,"%@");
2201 ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
2202 ri->failover_state_change_time = mstime();
2203 ri->promoted_slave->flags &= ~SRI_PROMOTED;
2204 ri->promoted_slave = NULL;
2205 }
2206}
2207
2208void sentinelFailoverDetectEnd(sentinelRedisInstance *master) {
2209 int not_reconfigured = 0, timeout = 0;
2210 dictIterator *di;
2211 dictEntry *de;
2212 mstime_t elapsed = mstime() - master->failover_state_change_time;
2213
2214 /* We can't consider failover finished if the promoted slave is
2215 * not reachable. */
2216 if (master->promoted_slave == NULL ||
2217 master->promoted_slave->flags & SRI_S_DOWN) return;
2218
2219 /* The failover terminates once all the reachable slaves are properly
2220 * configured. */
2221 di = dictGetIterator(master->slaves);
2222 while((de = dictNext(di)) != NULL) {
2223 sentinelRedisInstance *slave = dictGetVal(de);
2224
2225 if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
2226 if (slave->flags & SRI_S_DOWN) continue;
2227 not_reconfigured++;
2228 }
2229 dictReleaseIterator(di);
2230
2231 /* Force end of failover on timeout. */
2232 if (elapsed > master->failover_timeout) {
2233 not_reconfigured = 0;
2234 timeout = 1;
2235 sentinelEvent(REDIS_WARNING,"+failover-end-for-timeout",master,"%@");
2236 }
2237
2238 if (not_reconfigured == 0) {
2239 sentinelEvent(REDIS_WARNING,"+failover-end",master,"%@");
2240 master->failover_state = SENTINEL_FAILOVER_STATE_UPDATE_CONFIG;
2241 master->failover_state_change_time = mstime();
2242 }
2243
2244 /* If I'm the leader it is a good idea to send a best effort SLAVEOF
2245 * command to all the slaves still not reconfigured to replicate with
2246 * the new master. */
2247 if (timeout && (master->flags & SRI_I_AM_THE_LEADER)) {
2248 dictIterator *di;
2249 dictEntry *de;
2250 char master_port[32];
2251
2252 ll2string(master_port,sizeof(master_port),
2253 master->promoted_slave->addr->port);
2254
2255 di = dictGetIterator(master->slaves);
2256 while((de = dictNext(di)) != NULL) {
2257 sentinelRedisInstance *slave = dictGetVal(de);
2258 int retval;
2259
2260 if (slave->flags &
2261 (SRI_RECONF_DONE|SRI_RECONF_SENT|SRI_DISCONNECTED)) continue;
2262
2263 retval = redisAsyncCommand(slave->cc,
2264 sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
2265 master->promoted_slave->addr->ip,
2266 master_port);
2267 if (retval == REDIS_OK) {
2268 sentinelEvent(REDIS_NOTICE,"+slave-reconf-sent-be",slave,"%@");
2269 slave->flags |= SRI_RECONF_SENT;
2270 }
2271 }
2272 dictReleaseIterator(di);
2273 }
2274}
2275
2276/* Send SLAVE OF <new master address> to all the remaining slaves that
2277 * still don't appear to have the configuration updated. */
2278void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) {
2279 dictIterator *di;
2280 dictEntry *de;
2281 int in_progress = 0;
2282
2283 di = dictGetIterator(master->slaves);
2284 while((de = dictNext(di)) != NULL) {
2285 sentinelRedisInstance *slave = dictGetVal(de);
2286
2287 if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG))
2288 in_progress++;
2289 }
2290 dictReleaseIterator(di);
2291
2292 di = dictGetIterator(master->slaves);
2293 while(in_progress < master->parallel_syncs &&
2294 (de = dictNext(di)) != NULL)
2295 {
2296 sentinelRedisInstance *slave = dictGetVal(de);
2297 int retval;
2298 char master_port[32];
2299
2300 /* Skip the promoted slave, and already configured slaves. */
2301 if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
2302
2303 /* Clear the SRI_RECONF_SENT flag if too much time elapsed without
2304 * the slave moving forward to the next state. */
2305 if ((slave->flags & SRI_RECONF_SENT) &&
2306 (mstime() - slave->slave_reconf_sent_time) >
2307 SENTINEL_SLAVE_RECONF_RETRY_PERIOD)
2308 {
2309 sentinelEvent(REDIS_NOTICE,"-slave-reconf-sent-timeout",slave,"%@");
2310 slave->flags &= ~SRI_RECONF_SENT;
2311 }
2312
2313 /* Nothing to do for instances that are disconnected or already
2314 * in RECONF_SENT state. */
2315 if (slave->flags & (SRI_DISCONNECTED|SRI_RECONF_SENT|SRI_RECONF_INPROG))
2316 continue;
2317
2318 /* Send SLAVEOF <new master>. */
2319 ll2string(master_port,sizeof(master_port),
2320 master->promoted_slave->addr->port);
2321 retval = redisAsyncCommand(slave->cc,
2322 sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
2323 master->promoted_slave->addr->ip,
2324 master_port);
2325 if (retval == REDIS_OK) {
2326 slave->flags |= SRI_RECONF_SENT;
2327 slave->pending_commands++;
2328 slave->slave_reconf_sent_time = mstime();
2329 sentinelEvent(REDIS_NOTICE,"+slave-reconf-sent",slave,"%@");
2330 in_progress++;
2331 }
2332 }
2333 dictReleaseIterator(di);
2334 sentinelFailoverDetectEnd(master);
2335}
2336
2337/* This function is called when the slave is in
2338 * SENTINEL_FAILOVER_STATE_UPDATE_CONFIG state. In this state we need
2339 * to remove it from the master table and add the promoted slave instead.
2340 *
2341 * If there are no promoted slaves as this instance is unique, we remove
2342 * and re-add it with the same address to trigger a complete state
2343 * refresh. */
2344void sentinelFailoverSwitchToPromotedSlave(sentinelRedisInstance *master) {
2345 sentinelRedisInstance *ref = master->promoted_slave ?
2346 master->promoted_slave : master;
2347
2348 sentinelEvent(REDIS_WARNING,"+switch-master",master,"%s %s %d %s %d",
2349 master->name, master->addr->ip, master->addr->port,
2350 ref->addr->ip, ref->addr->port);
2351
2352 sentinelResetMasterAndChangeAddress(master,ref->addr->ip,ref->addr->port);
2353}
2354
2355void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
2356 redisAssert(ri->flags & SRI_MASTER);
2357
2358 if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;
2359
2360 switch(ri->failover_state) {
2361 case SENTINEL_FAILOVER_STATE_WAIT_START:
2362 sentinelFailoverWaitStart(ri);
2363 break;
2364 case SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
2365 sentinelFailoverSelectSlave(ri);
2366 break;
2367 case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:
2368 sentinelFailoverSendSlaveOfNoOne(ri);
2369 break;
2370 case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
2371 sentinelFailoverWaitPromotion(ri);
2372 break;
2373 case SENTINEL_FAILOVER_STATE_RECONF_SLAVES:
2374 sentinelFailoverReconfNextSlave(ri);
2375 break;
2376 case SENTINEL_FAILOVER_STATE_DETECT_END:
2377 sentinelFailoverDetectEnd(ri);
2378 break;
2379 }
2380}
2381
2382/* Abort a failover in progress with the following steps:
2383 * 1) If this instance is the leaer send a SLAVEOF command to all the already
2384 * reconfigured slaves if any to configure them to replicate with the
2385 * original master.
2386 * 2) For both leaders and observers: clear the failover flags and state in
2387 * the master instance.
2388 * 3) If there is already a promoted slave and we are the leader, and this
2389 * slave is not DISCONNECTED, try to reconfigure it to replicate
2390 * back to the master as well, sending a best effort SLAVEOF command.
2391 */
2392void sentinelAbortFailover(sentinelRedisInstance *ri) {
2393 char master_port[32];
2394 dictIterator *di;
2395 dictEntry *de;
2396
2397 redisAssert(ri->flags & SRI_FAILOVER_IN_PROGRESS);
2398 ll2string(master_port,sizeof(master_port),ri->addr->port);
2399
2400 /* Clear failover related flags from slaves.
2401 * Also if we are the leader make sure to send SLAVEOF commands to all the
2402 * already reconfigured slaves in order to turn them back into slaves of
2403 * the original master. */
2404 di = dictGetIterator(ri->slaves);
2405 while((de = dictNext(di)) != NULL) {
2406 sentinelRedisInstance *slave = dictGetVal(de);
2407 if ((ri->flags & SRI_I_AM_THE_LEADER) &&
2408 !(slave->flags & SRI_DISCONNECTED) &&
2409 (slave->flags & (SRI_PROMOTED|SRI_RECONF_SENT|SRI_RECONF_INPROG|
2410 SRI_RECONF_DONE)))
2411 {
2412 int retval;
2413
2414 retval = redisAsyncCommand(slave->cc,
2415 sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
2416 ri->addr->ip,
2417 master_port);
2418 if (retval == REDIS_OK)
2419 sentinelEvent(REDIS_NOTICE,"-slave-reconf-undo",slave,"%@");
2420 }
2421 slave->flags &= ~(SRI_RECONF_SENT|SRI_RECONF_INPROG|SRI_RECONF_DONE);
2422 }
2423 dictReleaseIterator(di);
2424
2425 ri->flags &= ~(SRI_FAILOVER_IN_PROGRESS|SRI_I_AM_THE_LEADER);
2426 ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
2427 ri->failover_state_change_time = mstime();
2428 if (ri->promoted_slave) {
2429 ri->promoted_slave->flags &= ~SRI_PROMOTED;
2430 ri->promoted_slave = NULL;
2431 }
2432}
2433
2434/* The following is called only for master instances and will abort the
2435 * failover process if:
2436 *
2437 * 1) The failover is in progress.
2438 * 2) We already promoted a slave.
2439 * 3) The promoted slave is in extended SDOWN condition.
2440 */
2441void sentinelAbortFailoverIfNeeded(sentinelRedisInstance *ri) {
2442 /* Failover is in progress? Do we have a promoted slave? */
2443 if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS) || !ri->promoted_slave) return;
2444
2445 /* Is the promoted slave into an extended SDOWN state? */
2446 if (!(ri->promoted_slave->flags & SRI_S_DOWN) ||
2447 (mstime() - ri->promoted_slave->s_down_since_time) <
2448 (ri->down_after_period * SENTINEL_EXTENDED_SDOWN_MULTIPLIER)) return;
2449
2450 sentinelEvent(REDIS_WARNING,"-failover-abort-x-sdown",ri->promoted_slave,"%@");
2451 sentinelAbortFailover(ri);
2452}
2453
2454/* ======================== SENTINEL timer handler ==========================
2455 * This is the "main" of our Sentinel, Sentinel being completely non blocking
2456 * in design. The function is called every second.
2457 * -------------------------------------------------------------------------- */
2458
2459/* Perform scheduled operations for the specified Redis instance. */
2460void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
2461 /* ========== MONITORING HALF ============ */
2462 /* Every kind of instance */
2463 sentinelReconnectInstance(ri);
2464 sentinelPingInstance(ri);
2465
2466 /* Masters and slaves */
2467 if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
2468 /* Nothing so far. */
2469 }
2470
2471 /* Only masters */
2472 if (ri->flags & SRI_MASTER) {
2473 sentinelAskMasterStateToOtherSentinels(ri);
2474 }
2475
2476 /* ============== ACTING HALF ============= */
2477 /* We don't proceed with the acting half if we are in TILT mode.
2478 * TILT happens when we find something odd with the time, like a
2479 * sudden change in the clock. */
2480 if (sentinel.tilt) {
2481 if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return;
2482 sentinel.tilt = 0;
2483 sentinelEvent(REDIS_WARNING,"-tilt",NULL,"#tilt mode exited");
2484 }
2485
2486 /* Every kind of instance */
2487 sentinelCheckSubjectivelyDown(ri);
2488
2489 /* Masters and slaves */
2490 if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
2491 /* Nothing so far. */
2492 }
2493
2494 /* Only masters */
2495 if (ri->flags & SRI_MASTER) {
2496 sentinelCheckObjectivelyDown(ri);
2497 sentinelStartFailover(ri);
2498 sentinelFailoverStateMachine(ri);
2499 sentinelAbortFailoverIfNeeded(ri);
2500 }
2501}
2502
2503/* Perform scheduled operations for all the instances in the dictionary.
2504 * Recursively call the function against dictionaries of slaves. */
2505void sentinelHandleDictOfRedisInstances(dict *instances) {
2506 dictIterator *di;
2507 dictEntry *de;
2508 sentinelRedisInstance *switch_to_promoted = NULL;
2509
2510 /* There are a number of things we need to perform against every master. */
2511 di = dictGetIterator(instances);
2512 while((de = dictNext(di)) != NULL) {
2513 sentinelRedisInstance *ri = dictGetVal(de);
2514
2515 sentinelHandleRedisInstance(ri);
2516 if (ri->flags & SRI_MASTER) {
2517 sentinelHandleDictOfRedisInstances(ri->slaves);
2518 sentinelHandleDictOfRedisInstances(ri->sentinels);
2519 if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) {
2520 switch_to_promoted = ri;
2521 }
2522 }
2523 }
2524 if (switch_to_promoted)
2525 sentinelFailoverSwitchToPromotedSlave(switch_to_promoted);
2526 dictReleaseIterator(di);
2527}
2528
2529/* This function checks if we need to enter the TITL mode.
2530 *
2531 * The TILT mode is entered if we detect that between two invocations of the
2532 * timer interrupt, a negative amount of time, or too much time has passed.
2533 * Note that we expect that more or less just 100 milliseconds will pass
2534 * if everything is fine. However we'll see a negative number or a
2535 * difference bigger than SENTINEL_TILT_TRIGGER milliseconds if one of the
2536 * following conditions happen:
2537 *
2538 * 1) The Sentiel process for some time is blocked, for every kind of
2539 * random reason: the load is huge, the computer was freezed for some time
2540 * in I/O or alike, the process was stopped by a signal. Everything.
2541 * 2) The system clock was altered significantly.
2542 *
2543 * Under both this conditions we'll see everything as timed out and failing
2544 * without good reasons. Instead we enter the TILT mode and wait
2545 * for SENTIENL_TILT_PERIOD to elapse before starting to act again.
2546 *
2547 * During TILT time we still collect information, we just do not act. */
2548void sentinelCheckTiltCondition(void) {
2549 mstime_t now = mstime();
2550 mstime_t delta = now - sentinel.previous_time;
2551
2552 if (delta < 0 || delta > SENTINEL_TILT_TRIGGER) {
2553 sentinel.tilt = 1;
2554 sentinel.tilt_start_time = mstime();
2555 sentinelEvent(REDIS_WARNING,"+tilt",NULL,"#tilt mode entered");
2556 }
2557 sentinel.previous_time = mstime();
2558}
2559
2560/* Handle terminated childs resulting from calls to notifications and client
2561 * reconfigurations scripts. */
2562void sentinelHandleChildren(void) {
2563 int statloc;
2564 pid_t pid;
2565
2566 if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
2567 int exitcode = WEXITSTATUS(statloc);
2568 int bysignal = 0;
2569
2570 if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
2571 sentinelEvent(REDIS_DEBUG,"-child",NULL,"%ld %d %d",
2572 (long)pid, exitcode, bysignal);
2573
2574 /* TODO: remove client reconfiguration scripts from the queue. */
2575 }
2576}
2577
2578void sentinelTimer(void) {
2579 sentinelCheckTiltCondition();
2580 sentinelHandleDictOfRedisInstances(sentinel.masters);
2581 sentinelHandleChildren();
2582}
2583