]> git.saurik.com Git - redis.git/blame_incremental - src/sentinel.c
Merge pull request #628 from pietern/unstable-zip
[redis.git] / src / sentinel.c
... / ...
CommitLineData
1/* Redis Sentinel implementation
2 * -----------------------------
3 *
4 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * * Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * * Neither the name of Redis nor the names of its contributors may be used
16 * to endorse or promote products derived from this software without
17 * specific prior written permission.
18 *
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 * POSSIBILITY OF SUCH DAMAGE.
30 */
31
32#include "redis.h"
33#include "hiredis.h"
34#include "async.h"
35
36#include <ctype.h>
37#include <arpa/inet.h>
38#include <sys/socket.h>
39#include <sys/wait.h>
40
41extern char **environ;
42
/* Port assigned to server.port by initSentinelConfig() when running in
 * Sentinel mode. */
#define REDIS_SENTINEL_PORT 26379
44
45/* ======================== Sentinel global state =========================== */
46
47typedef long long mstime_t; /* millisecond time type. */
48
/* Address object, used to describe an ip:port pair. */
typedef struct sentinelAddr {
    char *ip;   /* IP address as a string (an sds string when the object is
                   built by createSentinelAddr()). */
    int port;   /* Port number; createSentinelAddr() accepts 1..65535 only. */
} sentinelAddr;
54
/* Flags stored in sentinelRedisInstance->flags.
 *
 * createSentinelRedisInstance() requires at least one of SRI_MASTER,
 * SRI_SLAVE or SRI_SENTINEL to be set, and always adds SRI_DISCONNECTED
 * to a newly created instance. */
#define SRI_MASTER (1<<0)       /* The instance is a monitored master. */
#define SRI_SLAVE (1<<1)        /* The instance is a slave of a master. */
#define SRI_SENTINEL (1<<2)     /* The instance is another Sentinel. */
#define SRI_DISCONNECTED (1<<3) /* Set on creation: instances start in the
                                   disconnected state. */
#define SRI_S_DOWN (1<<4) /* Subjectively down (no quorum). */
#define SRI_O_DOWN (1<<5) /* Objectively down (quorum reached). */
#define SRI_MASTER_DOWN (1<<6) /* A Sentinel with this flag set thinks that
                                  its master is down. */
/* SRI_CAN_FAILOVER when set in an SRI_MASTER instance means that we are
 * allowed to perform the failover for this master.
 * When set in a SRI_SENTINEL instance means that sentinel is allowed to
 * perform the failover on its master. */
#define SRI_CAN_FAILOVER (1<<7)
#define SRI_FAILOVER_IN_PROGRESS (1<<8) /* Failover is in progress for
                                           this master. */
#define SRI_I_AM_THE_LEADER (1<<9) /* We are the leader for this master. */
#define SRI_PROMOTED (1<<10) /* Slave selected for promotion. */
#define SRI_RECONF_SENT (1<<11) /* SLAVEOF <newmaster> sent. */
#define SRI_RECONF_INPROG (1<<12) /* Slave synchronization in progress. */
#define SRI_RECONF_DONE (1<<13) /* Slave synchronized with new master. */
#define SRI_FORCE_FAILOVER (1<<14) /* Force failover with master up. */
77
/* Periods, timeouts and defaults.
 * NOTE(review): the time values below are presumably milliseconds, like
 * every mstime_t quantity in this file -- confirm against their users. */
#define SENTINEL_INFO_PERIOD 10000
#define SENTINEL_PING_PERIOD 1000
#define SENTINEL_ASK_PERIOD 1000
#define SENTINEL_PUBLISH_PERIOD 5000
/* Default for sentinelRedisInstance->down_after_period when the instance
 * has no master to inherit the value from. */
#define SENTINEL_DOWN_AFTER_PERIOD 30000
#define SENTINEL_HELLO_CHANNEL "__sentinel__:hello"
#define SENTINEL_TILT_TRIGGER 2000
#define SENTINEL_TILT_PERIOD (SENTINEL_PING_PERIOD*30)
/* Initial value of sentinelRedisInstance->slave_priority. */
#define SENTINEL_DEFAULT_SLAVE_PRIORITY 100
#define SENTINEL_PROMOTION_RETRY_PERIOD 30000
#define SENTINEL_SLAVE_RECONF_RETRY_PERIOD 10000
/* Initial value of sentinelRedisInstance->parallel_syncs. */
#define SENTINEL_DEFAULT_PARALLEL_SYNCS 1
#define SENTINEL_MIN_LINK_RECONNECT_PERIOD 15000
/* Initial value of sentinelRedisInstance->failover_timeout. */
#define SENTINEL_DEFAULT_FAILOVER_TIMEOUT (60*15*1000)
#define SENTINEL_MAX_PENDING_COMMANDS 100
#define SENTINEL_EXTENDED_SDOWN_MULTIPLIER 10

/* For how many milliseconds is a piece of information considered valid?
 * This applies for instance to the replies to the
 * SENTINEL IS-MASTER-DOWN-BY-ADDR command. */
#define SENTINEL_INFO_VALIDITY_TIME 5000
#define SENTINEL_FAILOVER_FIXED_DELAY 5000
#define SENTINEL_FAILOVER_MAX_RANDOM_DELAY 10000
100
/* States of the failover state machine, stored in
 * sentinelRedisInstance->failover_state. New instances start in
 * SENTINEL_FAILOVER_STATE_NONE. */
#define SENTINEL_FAILOVER_STATE_NONE 0 /* No failover in progress. */
#define SENTINEL_FAILOVER_STATE_WAIT_START 1 /* Wait for failover_start_time*/
#define SENTINEL_FAILOVER_STATE_SELECT_SLAVE 2 /* Select slave to promote */
#define SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE 3 /* Slave -> Master */
#define SENTINEL_FAILOVER_STATE_WAIT_PROMOTION 4 /* Wait slave to change role */
#define SENTINEL_FAILOVER_STATE_RECONF_SLAVES 5 /* SLAVEOF newmaster */
#define SENTINEL_FAILOVER_STATE_WAIT_NEXT_SLAVE 6 /* wait replication */
#define SENTINEL_FAILOVER_STATE_ALERT_CLIENTS 7 /* Run user script. */
#define SENTINEL_FAILOVER_STATE_WAIT_ALERT_SCRIPT 8 /* Wait script exec. */
#define SENTINEL_FAILOVER_STATE_DETECT_END 9 /* Check for failover end. */
#define SENTINEL_FAILOVER_STATE_UPDATE_CONFIG 10 /* Monitor promoted slave. */
113
/* Values for sentinelRedisInstance->slave_master_link_status. New instances
 * start with SENTINEL_MASTER_LINK_STATUS_DOWN. */
#define SENTINEL_MASTER_LINK_STATUS_UP 0
#define SENTINEL_MASTER_LINK_STATUS_DOWN 1
116
/* Generic flags that can be used with different functions.
 * SENTINEL_LEADER is the value sentinelCallClientReconfScript() checks to
 * report the "leader" role; any other role value is reported as
 * "observer". */
#define SENTINEL_NO_FLAGS 0
#define SENTINEL_GENERATE_EVENT 1
#define SENTINEL_LEADER 2
#define SENTINEL_OBSERVER 4
122
/* Script execution flags and limits. */
#define SENTINEL_SCRIPT_NONE 0 /* Job flags: not running (scheduled). */
#define SENTINEL_SCRIPT_RUNNING 1 /* Job flags: a child process is running. */
#define SENTINEL_SCRIPT_MAX_QUEUE 256 /* Max jobs kept in the scripts queue. */
#define SENTINEL_SCRIPT_MAX_RUNNING 16 /* Max scripts running concurrently. */
#define SENTINEL_SCRIPT_MAX_RUNTIME 60000 /* 60 seconds max exec time. */
#define SENTINEL_SCRIPT_MAX_RETRY 10 /* Max number of execution attempts. */
#define SENTINEL_SCRIPT_RETRY_DELAY 30000 /* 30 seconds between retries. */
131
/* State kept for every instance (master, slave or other Sentinel) handled
 * by this Sentinel. Objects are built by createSentinelRedisInstance(). */
typedef struct sentinelRedisInstance {
    int flags; /* See SRI_... defines */
    char *name; /* Master name from the point of view of this sentinel.
                   For slaves and sentinels it is generated as
                   "hostname:port". */
    char *runid; /* run ID of this instance. NULL until known. */
    sentinelAddr *addr; /* Address (ip:port) of this instance. */
    redisAsyncContext *cc; /* Hiredis context for commands. */
    redisAsyncContext *pc; /* Hiredis context for Pub / Sub. */
    int pending_commands; /* Number of commands sent waiting for a reply. */
    mstime_t cc_conn_time; /* cc connection time. */
    mstime_t pc_conn_time; /* pc connection time. */
    mstime_t pc_last_activity; /* Last time we received any message. */
    mstime_t last_avail_time; /* Last time the instance replied to ping with
                                 a reply we consider valid. */
    mstime_t last_pong_time; /* Last time the instance replied to ping,
                                whatever the reply was. That's used to check
                                if the link is idle and must be reconnected. */
    mstime_t last_pub_time; /* Last time we sent hello via Pub/Sub. */
    mstime_t last_hello_time; /* Only used if SRI_SENTINEL is set. Last time
                                 we received a hello from this Sentinel
                                 via Pub/Sub. */
    mstime_t last_master_down_reply_time; /* Time of last reply to
                                             SENTINEL is-master-down command. */
    mstime_t s_down_since_time; /* Subjectively down since time. */
    mstime_t o_down_since_time; /* Objectively down since time. */
    mstime_t down_after_period; /* Consider it down after that period.
                                   Inherited from the master when the
                                   instance is created with one. */
    mstime_t info_refresh; /* Time at which we received INFO output from it.
                              0 means INFO was never received. */

    /* Master specific. */
    dict *sentinels; /* Other sentinels monitoring the same master. */
    dict *slaves; /* Slaves for this master instance. */
    int quorum; /* Number of sentinels that need to agree on failure. */
    int parallel_syncs; /* How many slaves to reconfigure at same time. */

    /* Slave specific. */
    mstime_t master_link_down_time; /* Slave replication link down time. */
    int slave_priority; /* Slave priority according to its INFO output. */
    mstime_t slave_reconf_sent_time; /* Time at which we sent SLAVEOF <new> */
    struct sentinelRedisInstance *master; /* Master instance if SRI_SLAVE is set. */
    char *slave_master_host; /* Master host as reported by INFO */
    int slave_master_port; /* Master port as reported by INFO */
    int slave_master_link_status; /* Master link status as reported by INFO.
                                     See SENTINEL_MASTER_LINK_STATUS_*. */
    /* Failover */
    char *leader; /* If this is a master instance, this is the runid of
                     the Sentinel that should perform the failover. If
                     this is a Sentinel, this is the runid of the Sentinel
                     that this other Sentinel is voting as leader.
                     This field is valid only if SRI_MASTER_DOWN is
                     set on the Sentinel instance. */
    int failover_state; /* See SENTINEL_FAILOVER_STATE_* defines. */
    mstime_t failover_state_change_time;
    mstime_t failover_start_time; /* When to start to failover if leader. */
    mstime_t failover_timeout; /* Max time to refresh failover state. */
    struct sentinelRedisInstance *promoted_slave; /* Promoted slave instance. */
    /* Scripts executed to notify admin or reconfigure clients: when they
     * are set to NULL no script is executed. */
    char *notification_script;
    char *client_reconfig_script;
} sentinelRedisInstance;
190
/* Main state: the single global 'sentinel' object, set up by
 * initSentinel(). */
struct sentinelState {
    dict *masters; /* Dictionary of master sentinelRedisInstances.
                      Key is the instance name, value is the
                      sentinelRedisInstance structure pointer. */
    int tilt; /* Are we in TILT mode? */
    int running_scripts; /* Number of scripts in execution right now. */
    mstime_t tilt_start_time; /* When TILT started. */
    mstime_t previous_time; /* Time last time we ran the time handler. */
    list *scripts_queue; /* Queue of user scripts to execute
                            (sentinelScriptJob pointers, oldest first). */
} sentinel;
202
/* A script execution job, stored as a node value in
 * sentinel.scripts_queue. */
typedef struct sentinelScriptJob {
    int flags; /* Script job flags: SENTINEL_SCRIPT_* */
    int retry_num; /* Number of times we tried to execute it. */
    char **argv; /* Arguments to call the script: a NULL terminated vector
                    of sds strings, argv[0] being the script path. */
    mstime_t start_time; /* Script execution time if the script is running,
                            otherwise 0 if we are allowed to retry the
                            execution at any time. If the script is not
                            running and it's not 0, it means: do not run
                            before the specified time. */
    pid_t pid; /* Script execution pid. 0 when no child is running. */
} sentinelScriptJob;
215
216/* ======================= hiredis ae.c adapters =============================
217 * Note: this implementation is taken from hiredis/adapters/ae.h, however
218 * we have our modified copy for Sentinel in order to use our allocator
219 * and to have full control over how the adapter works. */
220
/* Glue object tying one hiredis async context to the ae event loop.
 * Allocated by redisAeAttach() and freed by redisAeCleanup(). */
typedef struct redisAeEvents {
    redisAsyncContext *context; /* hiredis async context being served. */
    aeEventLoop *loop;          /* ae event loop the fd is registered in. */
    int fd;                     /* Socket, copied from the context on attach. */
    int reading, writing;       /* 1 while the AE_READABLE / AE_WRITABLE
                                   file event is installed, 0 otherwise. */
} redisAeEvents;
227
228static void redisAeReadEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
229 ((void)el); ((void)fd); ((void)mask);
230
231 redisAeEvents *e = (redisAeEvents*)privdata;
232 redisAsyncHandleRead(e->context);
233}
234
235static void redisAeWriteEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
236 ((void)el); ((void)fd); ((void)mask);
237
238 redisAeEvents *e = (redisAeEvents*)privdata;
239 redisAsyncHandleWrite(e->context);
240}
241
242static void redisAeAddRead(void *privdata) {
243 redisAeEvents *e = (redisAeEvents*)privdata;
244 aeEventLoop *loop = e->loop;
245 if (!e->reading) {
246 e->reading = 1;
247 aeCreateFileEvent(loop,e->fd,AE_READABLE,redisAeReadEvent,e);
248 }
249}
250
251static void redisAeDelRead(void *privdata) {
252 redisAeEvents *e = (redisAeEvents*)privdata;
253 aeEventLoop *loop = e->loop;
254 if (e->reading) {
255 e->reading = 0;
256 aeDeleteFileEvent(loop,e->fd,AE_READABLE);
257 }
258}
259
260static void redisAeAddWrite(void *privdata) {
261 redisAeEvents *e = (redisAeEvents*)privdata;
262 aeEventLoop *loop = e->loop;
263 if (!e->writing) {
264 e->writing = 1;
265 aeCreateFileEvent(loop,e->fd,AE_WRITABLE,redisAeWriteEvent,e);
266 }
267}
268
269static void redisAeDelWrite(void *privdata) {
270 redisAeEvents *e = (redisAeEvents*)privdata;
271 aeEventLoop *loop = e->loop;
272 if (e->writing) {
273 e->writing = 0;
274 aeDeleteFileEvent(loop,e->fd,AE_WRITABLE);
275 }
276}
277
/* hiredis "cleanup" hook: unregister both file events, then release the
 * redisAeEvents object allocated by redisAeAttach(). */
static void redisAeCleanup(void *privdata) {
    redisAeDelRead(privdata);
    redisAeDelWrite(privdata);
    zfree(privdata);
}
284
285static int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac) {
286 redisContext *c = &(ac->c);
287 redisAeEvents *e;
288
289 /* Nothing should be attached when something is already attached */
290 if (ac->ev.data != NULL)
291 return REDIS_ERR;
292
293 /* Create container for context and r/w events */
294 e = (redisAeEvents*)zmalloc(sizeof(*e));
295 e->context = ac;
296 e->loop = loop;
297 e->fd = c->fd;
298 e->reading = e->writing = 0;
299
300 /* Register functions to start/stop listening for events */
301 ac->ev.addRead = redisAeAddRead;
302 ac->ev.delRead = redisAeDelRead;
303 ac->ev.addWrite = redisAeAddWrite;
304 ac->ev.delWrite = redisAeDelWrite;
305 ac->ev.cleanup = redisAeCleanup;
306 ac->ev.data = e;
307
308 return REDIS_OK;
309}
310
311/* ============================= Prototypes ================================= */
312
313void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status);
314void sentinelDisconnectCallback(const redisAsyncContext *c, int status);
315void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata);
316sentinelRedisInstance *sentinelGetMasterByName(char *name);
317char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master);
318char *sentinelGetObjectiveLeader(sentinelRedisInstance *master);
319int yesnotoi(char *s);
320void sentinelDisconnectInstanceFromContext(const redisAsyncContext *c);
321void sentinelKillLink(sentinelRedisInstance *ri, redisAsyncContext *c);
322const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri);
323void sentinelAbortFailover(sentinelRedisInstance *ri);
324void sentinelEvent(int level, char *type, sentinelRedisInstance *ri, const char *fmt, ...);
325sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master);
326void sentinelScheduleScriptExecution(char *path, ...);
327void sentinelStartFailover(sentinelRedisInstance *master, int state);
328
329/* ========================= Dictionary types =============================== */
330
331unsigned int dictSdsHash(const void *key);
332int dictSdsKeyCompare(void *privdata, const void *key1, const void *key2);
333void releaseSentinelRedisInstance(sentinelRedisInstance *ri);
334
335void dictInstancesValDestructor (void *privdata, void *obj) {
336 releaseSentinelRedisInstance(obj);
337}
338
/* Instance name (sds) -> instance (sentinelRedisInstance pointer)
 *
 * Used for sentinel.masters and for the per-instance 'slaves' and
 * 'sentinels' dictionaries. Values are released with
 * releaseSentinelRedisInstance() when an entry is removed; no key
 * destructor is installed, so the dictionary never frees the keys. */
dictType instancesDictType = {
    dictSdsHash,               /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictSdsKeyCompare,         /* key compare */
    NULL,                      /* key destructor */
    dictInstancesValDestructor /* val destructor */
};
351
/* Instance runid (sds) -> votes (long casted to void*)
 *
 * This is used by the sentinelGetObjectiveLeader() function in order to
 * count the votes and understand who is the leader. Neither keys nor
 * values have a destructor: the dictionary owns nothing. */
dictType leaderVotesDictType = {
    dictSdsHash,               /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictSdsKeyCompare,         /* key compare */
    NULL,                      /* key destructor */
    NULL                       /* val destructor */
};
364
365/* =========================== Initialization =============================== */
366
367void sentinelCommand(redisClient *c);
368
/* Commands available in Sentinel mode. initSentinel() empties the regular
 * command table and registers only these entries, keyed by command name.
 * NOTE(review): the third field is presumably the command arity as in the
 * regular Redis command table -- confirm against struct redisCommand. */
struct redisCommand sentinelcmds[] = {
    {"ping",pingCommand,1,"",0,NULL,0,0,0,0,0},
    {"sentinel",sentinelCommand,-2,"",0,NULL,0,0,0,0,0},
    {"subscribe",subscribeCommand,-2,"",0,NULL,0,0,0,0,0},
    {"unsubscribe",unsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
    {"psubscribe",psubscribeCommand,-2,"",0,NULL,0,0,0,0,0},
    {"punsubscribe",punsubscribeCommand,-1,"",0,NULL,0,0,0,0,0}
};
377
378/* This function overwrites a few normal Redis config default with Sentinel
379 * specific defaults. */
380void initSentinelConfig(void) {
381 server.port = REDIS_SENTINEL_PORT;
382}
383
384/* Perform the Sentinel mode initialization. */
385void initSentinel(void) {
386 int j;
387
388 /* Remove usual Redis commands from the command table, then just add
389 * the SENTINEL command. */
390 dictEmpty(server.commands);
391 for (j = 0; j < sizeof(sentinelcmds)/sizeof(sentinelcmds[0]); j++) {
392 int retval;
393 struct redisCommand *cmd = sentinelcmds+j;
394
395 retval = dictAdd(server.commands, sdsnew(cmd->name), cmd);
396 redisAssert(retval == DICT_OK);
397 }
398
399 /* Initialize various data structures. */
400 sentinel.masters = dictCreate(&instancesDictType,NULL);
401 sentinel.tilt = 0;
402 sentinel.tilt_start_time = mstime();
403 sentinel.previous_time = mstime();
404 sentinel.running_scripts = 0;
405 sentinel.scripts_queue = listCreate();
406}
407
408/* ============================== sentinelAddr ============================== */
409
410/* Create a sentinelAddr object and return it on success.
411 * On error NULL is returned and errno is set to:
412 * ENOENT: Can't resolve the hostname.
413 * EINVAL: Invalid port number.
414 */
415sentinelAddr *createSentinelAddr(char *hostname, int port) {
416 char buf[32];
417 sentinelAddr *sa;
418
419 if (port <= 0 || port > 65535) {
420 errno = EINVAL;
421 return NULL;
422 }
423 if (anetResolve(NULL,hostname,buf) == ANET_ERR) {
424 errno = ENOENT;
425 return NULL;
426 }
427 sa = zmalloc(sizeof(*sa));
428 sa->ip = sdsnew(buf);
429 sa->port = port;
430 return sa;
431}
432
433/* Free a Sentinel address. Can't fail. */
434void releaseSentinelAddr(sentinelAddr *sa) {
435 sdsfree(sa->ip);
436 zfree(sa);
437}
438
439/* =========================== Events notification ========================== */
440
441/* Send an event to log, pub/sub, user notification script.
442 *
443 * 'level' is the log level for logging. Only REDIS_WARNING events will trigger
444 * the execution of the user notification script.
445 *
446 * 'type' is the message type, also used as a pub/sub channel name.
447 *
448 * 'ri', is the redis instance target of this event if applicable, and is
449 * used to obtain the path of the notification script to execute.
450 *
451 * The remaining arguments are printf-alike.
452 * If the format specifier starts with the two characters "%@" then ri is
453 * not NULL, and the message is prefixed with an instance identifier in the
454 * following format:
455 *
456 * <instance type> <instance name> <ip> <port>
457 *
458 * If the instance type is not master, than the additional string is
459 * added to specify the originating master:
460 *
461 * @ <master name> <master ip> <master port>
462 *
463 * Any other specifier after "%@" is processed by printf itself.
464 */
465void sentinelEvent(int level, char *type, sentinelRedisInstance *ri,
466 const char *fmt, ...) {
467 va_list ap;
468 char msg[REDIS_MAX_LOGMSG_LEN];
469 robj *channel, *payload;
470
471 /* Handle %@ */
472 if (fmt[0] == '%' && fmt[1] == '@') {
473 sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
474 NULL : ri->master;
475
476 if (master) {
477 snprintf(msg, sizeof(msg), "%s %s %s %d @ %s %s %d",
478 sentinelRedisInstanceTypeStr(ri),
479 ri->name, ri->addr->ip, ri->addr->port,
480 master->name, master->addr->ip, master->addr->port);
481 } else {
482 snprintf(msg, sizeof(msg), "%s %s %s %d",
483 sentinelRedisInstanceTypeStr(ri),
484 ri->name, ri->addr->ip, ri->addr->port);
485 }
486 fmt += 2;
487 } else {
488 msg[0] = '\0';
489 }
490
491 /* Use vsprintf for the rest of the formatting if any. */
492 if (fmt[0] != '\0') {
493 va_start(ap, fmt);
494 vsnprintf(msg+strlen(msg), sizeof(msg)-strlen(msg), fmt, ap);
495 va_end(ap);
496 }
497
498 /* Log the message if the log level allows it to be logged. */
499 if (level >= server.verbosity)
500 redisLog(level,"%s %s",type,msg);
501
502 /* Publish the message via Pub/Sub if it's not a debugging one. */
503 if (level != REDIS_DEBUG) {
504 channel = createStringObject(type,strlen(type));
505 payload = createStringObject(msg,strlen(msg));
506 pubsubPublishMessage(channel,payload);
507 decrRefCount(channel);
508 decrRefCount(payload);
509 }
510
511 /* Call the notification script if applicable. */
512 if (level == REDIS_WARNING && ri != NULL) {
513 sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
514 ri : ri->master;
515 if (master->notification_script) {
516 sentinelScheduleScriptExecution(master->notification_script,
517 type,msg,NULL);
518 }
519 }
520}
521
522/* ============================ script execution ============================ */
523
524/* Release a script job structure and all the associated data. */
525void sentinelReleaseScriptJob(sentinelScriptJob *sj) {
526 int j = 0;
527
528 while(sj->argv[j]) sdsfree(sj->argv[j++]);
529 zfree(sj->argv);
530 zfree(sj);
531}
532
533#define SENTINEL_SCRIPT_MAX_ARGS 16
534void sentinelScheduleScriptExecution(char *path, ...) {
535 va_list ap;
536 char *argv[SENTINEL_SCRIPT_MAX_ARGS+1];
537 int argc = 1;
538 sentinelScriptJob *sj;
539
540 va_start(ap, path);
541 while(argc < SENTINEL_SCRIPT_MAX_ARGS) {
542 argv[argc] = va_arg(ap,char*);
543 if (!argv[argc]) break;
544 argv[argc] = sdsnew(argv[argc]); /* Copy the string. */
545 argc++;
546 }
547 va_end(ap);
548 argv[0] = sdsnew(path);
549
550 sj = zmalloc(sizeof(*sj));
551 sj->flags = SENTINEL_SCRIPT_NONE;
552 sj->retry_num = 0;
553 sj->argv = zmalloc(sizeof(char*)*(argc+1));
554 sj->start_time = 0;
555 sj->pid = 0;
556 memcpy(sj->argv,argv,sizeof(char*)*(argc+1));
557
558 listAddNodeTail(sentinel.scripts_queue,sj);
559
560 /* Remove the oldest non running script if we already hit the limit. */
561 if (listLength(sentinel.scripts_queue) > SENTINEL_SCRIPT_MAX_QUEUE) {
562 listNode *ln;
563 listIter li;
564
565 listRewind(sentinel.scripts_queue,&li);
566 while ((ln = listNext(&li)) != NULL) {
567 sj = ln->value;
568
569 if (sj->flags & SENTINEL_SCRIPT_RUNNING) continue;
570 /* The first node is the oldest as we add on tail. */
571 listDelNode(sentinel.scripts_queue,ln);
572 sentinelReleaseScriptJob(sj);
573 break;
574 }
575 redisAssert(listLength(sentinel.scripts_queue) <=
576 SENTINEL_SCRIPT_MAX_QUEUE);
577 }
578}
579
580/* Lookup a script in the scripts queue via pid, and returns the list node
581 * (so that we can easily remove it from the queue if needed). */
582listNode *sentinelGetScriptListNodeByPid(pid_t pid) {
583 listNode *ln;
584 listIter li;
585
586 listRewind(sentinel.scripts_queue,&li);
587 while ((ln = listNext(&li)) != NULL) {
588 sentinelScriptJob *sj = ln->value;
589
590 if ((sj->flags & SENTINEL_SCRIPT_RUNNING) && sj->pid == pid)
591 return ln;
592 }
593 return NULL;
594}
595
596/* Run pending scripts if we are not already at max number of running
597 * scripts. */
598void sentinelRunPendingScripts(void) {
599 listNode *ln;
600 listIter li;
601 mstime_t now = mstime();
602
603 /* Find jobs that are not running and run them, from the top to the
604 * tail of the queue, so we run older jobs first. */
605 listRewind(sentinel.scripts_queue,&li);
606 while (sentinel.running_scripts < SENTINEL_SCRIPT_MAX_RUNNING &&
607 (ln = listNext(&li)) != NULL)
608 {
609 sentinelScriptJob *sj = ln->value;
610 pid_t pid;
611
612 /* Skip if already running. */
613 if (sj->flags & SENTINEL_SCRIPT_RUNNING) continue;
614
615 /* Skip if it's a retry, but not enough time has elapsed. */
616 if (sj->start_time && sj->start_time > now) continue;
617
618 sj->flags |= SENTINEL_SCRIPT_RUNNING;
619 sj->start_time = mstime();
620 sj->retry_num++;
621 pid = fork();
622
623 if (pid == -1) {
624 /* Parent (fork error).
625 * We report fork errors as signal 99, in order to unify the
626 * reporting with other kind of errors. */
627 sentinelEvent(REDIS_WARNING,"-script-error",NULL,
628 "%s %d %d", sj->argv[0], 99, 0);
629 sj->flags &= ~SENTINEL_SCRIPT_RUNNING;
630 sj->pid = 0;
631 } else if (pid == 0) {
632 /* Child */
633 execve(sj->argv[0],sj->argv,environ);
634 /* If we are here an error occurred. */
635 _exit(2); /* Don't retry execution. */
636 } else {
637 sentinel.running_scripts++;
638 sj->pid = pid;
639 sentinelEvent(REDIS_DEBUG,"+script-child",NULL,"%ld",(long)pid);
640 }
641 }
642}
643
644/* How much to delay the execution of a script that we need to retry after
645 * an error?
646 *
647 * We double the retry delay for every further retry we do. So for instance
648 * if RETRY_DELAY is set to 30 seconds and the max number of retries is 10
649 * starting from the second attempt to execute the script the delays are:
650 * 30 sec, 60 sec, 2 min, 4 min, 8 min, 16 min, 32 min, 64 min, 128 min. */
651mstime_t sentinelScriptRetryDelay(int retry_num) {
652 mstime_t delay = SENTINEL_SCRIPT_RETRY_DELAY;
653
654 while (retry_num-- > 1) delay *= 2;
655 return delay;
656}
657
658/* Check for scripts that terminated, and remove them from the queue if the
659 * script terminated successfully. If instead the script was terminated by
660 * a signal, or returned exit code "1", it is scheduled to run again if
661 * the max number of retries did not already elapsed. */
662void sentinelCollectTerminatedScripts(void) {
663 int statloc;
664 pid_t pid;
665
666 while ((pid = wait3(&statloc,WNOHANG,NULL)) > 0) {
667 int exitcode = WEXITSTATUS(statloc);
668 int bysignal = 0;
669 listNode *ln;
670 sentinelScriptJob *sj;
671
672 if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
673 sentinelEvent(REDIS_DEBUG,"-script-child",NULL,"%ld %d %d",
674 (long)pid, exitcode, bysignal);
675
676 ln = sentinelGetScriptListNodeByPid(pid);
677 if (ln == NULL) {
678 redisLog(REDIS_WARNING,"wait3() returned a pid (%ld) we can't find in our scripts execution queue!", (long)pid);
679 continue;
680 }
681 sj = ln->value;
682
683 /* If the script was terminated by a signal or returns an
684 * exit code of "1" (that means: please retry), we reschedule it
685 * if the max number of retries is not already reached. */
686 if ((bysignal || exitcode == 1) &&
687 sj->retry_num != SENTINEL_SCRIPT_MAX_RETRY)
688 {
689 sj->flags &= ~SENTINEL_SCRIPT_RUNNING;
690 sj->pid = 0;
691 sj->start_time = mstime() +
692 sentinelScriptRetryDelay(sj->retry_num);
693 } else {
694 /* Otherwise let's remove the script, but log the event if the
695 * execution did not terminated in the best of the ways. */
696 if (bysignal || exitcode != 0) {
697 sentinelEvent(REDIS_WARNING,"-script-error",NULL,
698 "%s %d %d", sj->argv[0], bysignal, exitcode);
699 }
700 listDelNode(sentinel.scripts_queue,ln);
701 sentinelReleaseScriptJob(sj);
702 sentinel.running_scripts--;
703 }
704 }
705}
706
707/* Kill scripts in timeout, they'll be collected by the
708 * sentinelCollectTerminatedScripts() function. */
709void sentinelKillTimedoutScripts(void) {
710 listNode *ln;
711 listIter li;
712 mstime_t now = mstime();
713
714 listRewind(sentinel.scripts_queue,&li);
715 while ((ln = listNext(&li)) != NULL) {
716 sentinelScriptJob *sj = ln->value;
717
718 if (sj->flags & SENTINEL_SCRIPT_RUNNING &&
719 (now - sj->start_time) > SENTINEL_SCRIPT_MAX_RUNTIME)
720 {
721 sentinelEvent(REDIS_WARNING,"-script-timeout",NULL,"%s %ld",
722 sj->argv[0], (long)sj->pid);
723 kill(sj->pid,SIGKILL);
724 }
725 }
726}
727
728/* Implements SENTINEL PENDING-SCRIPTS command. */
729void sentinelPendingScriptsCommand(redisClient *c) {
730 listNode *ln;
731 listIter li;
732
733 addReplyMultiBulkLen(c,listLength(sentinel.scripts_queue));
734 listRewind(sentinel.scripts_queue,&li);
735 while ((ln = listNext(&li)) != NULL) {
736 sentinelScriptJob *sj = ln->value;
737 int j = 0;
738
739 addReplyMultiBulkLen(c,10);
740
741 addReplyBulkCString(c,"argv");
742 while (sj->argv[j]) j++;
743 addReplyMultiBulkLen(c,j);
744 j = 0;
745 while (sj->argv[j]) addReplyBulkCString(c,sj->argv[j++]);
746
747 addReplyBulkCString(c,"flags");
748 addReplyBulkCString(c,
749 (sj->flags & SENTINEL_SCRIPT_RUNNING) ? "running" : "scheduled");
750
751 addReplyBulkCString(c,"pid");
752 addReplyBulkLongLong(c,sj->pid);
753
754 if (sj->flags & SENTINEL_SCRIPT_RUNNING) {
755 addReplyBulkCString(c,"run-time");
756 addReplyBulkLongLong(c,mstime() - sj->start_time);
757 } else {
758 mstime_t delay = sj->start_time ? (sj->start_time-mstime()) : 0;
759 if (delay < 0) delay = 0;
760 addReplyBulkCString(c,"run-delay");
761 addReplyBulkLongLong(c,delay);
762 }
763
764 addReplyBulkCString(c,"retry-num");
765 addReplyBulkLongLong(c,sj->retry_num);
766 }
767}
768
769/* This function calls, if any, the client reconfiguration script with the
770 * following parameters:
771 *
772 * <master-name> <role> <state> <from-ip> <from-port> <to-ip> <to-port>
773 *
774 * It is called every time a failover starts, ends, or is aborted.
775 *
776 * <state> is "start", "end" or "abort".
777 * <role> is either "leader" or "observer".
778 *
779 * from/to fields are respectively master -> promoted slave addresses for
780 * "start" and "end", or the reverse (promoted slave -> master) in case of
781 * "abort".
782 */
783void sentinelCallClientReconfScript(sentinelRedisInstance *master, int role, char *state, sentinelAddr *from, sentinelAddr *to) {
784 char fromport[32], toport[32];
785
786 if (master->client_reconfig_script == NULL) return;
787 ll2string(fromport,sizeof(fromport),from->port);
788 ll2string(toport,sizeof(toport),to->port);
789 sentinelScheduleScriptExecution(master->client_reconfig_script,
790 master->name,
791 (role == SENTINEL_LEADER) ? "leader" : "observer",
792 state, from->ip, fromport, to->ip, toport);
793}
794
795/* ========================== sentinelRedisInstance ========================= */
796
797/* Create a redis instance, the following fields must be populated by the
798 * caller if needed:
799 * runid: set to NULL but will be populated once INFO output is received.
800 * info_refresh: is set to 0 to mean that we never received INFO so far.
801 *
802 * If SRI_MASTER is set into initial flags the instance is added to
803 * sentinel.masters table.
804 *
805 * if SRI_SLAVE or SRI_SENTINEL is set then 'master' must be not NULL and the
806 * instance is added into master->slaves or master->sentinels table.
807 *
808 * If the instance is a slave or sentinel, the name parameter is ignored and
809 * is created automatically as hostname:port.
810 *
811 * The function fails if hostname can't be resolved or port is out of range.
812 * When this happens NULL is returned and errno is set accordingly to the
813 * createSentinelAddr() function.
814 *
815 * The function may also fail and return NULL with errno set to EBUSY if
816 * a master or slave with the same name already exists. */
817sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char *hostname, int port, int quorum, sentinelRedisInstance *master) {
818 sentinelRedisInstance *ri;
819 sentinelAddr *addr;
820 dict *table;
821 char slavename[128], *sdsname;
822
823 redisAssert(flags & (SRI_MASTER|SRI_SLAVE|SRI_SENTINEL));
824 redisAssert((flags & SRI_MASTER) || master != NULL);
825
826 /* Check address validity. */
827 addr = createSentinelAddr(hostname,port);
828 if (addr == NULL) return NULL;
829
830 /* For slaves and sentinel we use ip:port as name. */
831 if (flags & (SRI_SLAVE|SRI_SENTINEL)) {
832 snprintf(slavename,sizeof(slavename),"%s:%d",hostname,port);
833 name = slavename;
834 }
835
836 /* Make sure the entry is not duplicated. This may happen when the same
837 * name for a master is used multiple times inside the configuration or
838 * if we try to add multiple times a slave or sentinel with same ip/port
839 * to a master. */
840 if (flags & SRI_MASTER) table = sentinel.masters;
841 else if (flags & SRI_SLAVE) table = master->slaves;
842 else if (flags & SRI_SENTINEL) table = master->sentinels;
843 sdsname = sdsnew(name);
844 if (dictFind(table,sdsname)) {
845 sdsfree(sdsname);
846 errno = EBUSY;
847 return NULL;
848 }
849
850 /* Create the instance object. */
851 ri = zmalloc(sizeof(*ri));
852 /* Note that all the instances are started in the disconnected state,
853 * the event loop will take care of connecting them. */
854 ri->flags = flags | SRI_DISCONNECTED;
855 ri->name = sdsname;
856 ri->runid = NULL;
857 ri->addr = addr;
858 ri->cc = NULL;
859 ri->pc = NULL;
860 ri->pending_commands = 0;
861 ri->cc_conn_time = 0;
862 ri->pc_conn_time = 0;
863 ri->pc_last_activity = 0;
864 ri->last_avail_time = mstime();
865 ri->last_pong_time = mstime();
866 ri->last_pub_time = mstime();
867 ri->last_hello_time = mstime();
868 ri->last_master_down_reply_time = mstime();
869 ri->s_down_since_time = 0;
870 ri->o_down_since_time = 0;
871 ri->down_after_period = master ? master->down_after_period :
872 SENTINEL_DOWN_AFTER_PERIOD;
873 ri->master_link_down_time = 0;
874 ri->slave_priority = SENTINEL_DEFAULT_SLAVE_PRIORITY;
875 ri->slave_reconf_sent_time = 0;
876 ri->slave_master_host = NULL;
877 ri->slave_master_port = 0;
878 ri->slave_master_link_status = SENTINEL_MASTER_LINK_STATUS_DOWN;
879 ri->sentinels = dictCreate(&instancesDictType,NULL);
880 ri->quorum = quorum;
881 ri->parallel_syncs = SENTINEL_DEFAULT_PARALLEL_SYNCS;
882 ri->master = master;
883 ri->slaves = dictCreate(&instancesDictType,NULL);
884 ri->info_refresh = 0;
885
886 /* Failover state. */
887 ri->leader = NULL;
888 ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
889 ri->failover_state_change_time = 0;
890 ri->failover_start_time = 0;
891 ri->failover_timeout = SENTINEL_DEFAULT_FAILOVER_TIMEOUT;
892 ri->promoted_slave = NULL;
893 ri->notification_script = NULL;
894 ri->client_reconfig_script = NULL;
895
896 /* Add into the right table. */
897 dictAdd(table, ri->name, ri);
898 return ri;
899}
900
901/* Release this instance and all its slaves, sentinels, hiredis connections.
902 * This function also takes care of unlinking the instance from the main
903 * masters table (if it is a master) or from its master sentinels/slaves table
904 * if it is a slave or sentinel. */
905void releaseSentinelRedisInstance(sentinelRedisInstance *ri) {
906 /* Release all its slaves or sentinels if any. */
907 dictRelease(ri->sentinels);
908 dictRelease(ri->slaves);
909
910 /* Release hiredis connections. */
911 if (ri->cc) sentinelKillLink(ri,ri->cc);
912 if (ri->pc) sentinelKillLink(ri,ri->pc);
913
914 /* Free other resources. */
915 sdsfree(ri->name);
916 sdsfree(ri->runid);
917 sdsfree(ri->notification_script);
918 sdsfree(ri->client_reconfig_script);
919 sdsfree(ri->slave_master_host);
920 sdsfree(ri->leader);
921 releaseSentinelAddr(ri->addr);
922
923 /* Clear state into the master if needed. */
924 if ((ri->flags & SRI_SLAVE) && (ri->flags & SRI_PROMOTED) && ri->master)
925 ri->master->promoted_slave = NULL;
926
927 zfree(ri);
928}
929
930/* Lookup a slave in a master Redis instance, by ip and port. */
931sentinelRedisInstance *sentinelRedisInstanceLookupSlave(
932 sentinelRedisInstance *ri, char *ip, int port)
933{
934 sds key;
935 sentinelRedisInstance *slave;
936
937 redisAssert(ri->flags & SRI_MASTER);
938 key = sdscatprintf(sdsempty(),"%s:%d",ip,port);
939 slave = dictFetchValue(ri->slaves,key);
940 sdsfree(key);
941 return slave;
942}
943
944/* Return the name of the type of the instance as a string. */
945const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri) {
946 if (ri->flags & SRI_MASTER) return "master";
947 else if (ri->flags & SRI_SLAVE) return "slave";
948 else if (ri->flags & SRI_SENTINEL) return "sentinel";
949 else return "unknown";
950}
951
952/* This function removes all the instances found in the dictionary of instances
953 * 'd', having either:
954 *
955 * 1) The same ip/port as specified.
956 * 2) The same runid.
957 *
958 * "1" and "2" don't need to verify at the same time, just one is enough.
959 * If "runid" is NULL it is not checked.
960 * Similarly if "ip" is NULL it is not checked.
961 *
962 * This function is useful because every time we add a new Sentinel into
963 * a master's Sentinels dictionary, we want to be very sure about not
964 * having duplicated instances for any reason. This is so important because
965 * we use those other sentinels in order to run our quorum protocol to
966 * understand if it's time to proceeed with the fail over.
967 *
968 * Making sure no duplication is possible we greately improve the robustness
969 * of the quorum (otherwise we may end counting the same instance multiple
970 * times for some reason).
971 *
972 * The function returns the number of Sentinels removed. */
973int removeMatchingSentinelsFromMaster(sentinelRedisInstance *master, char *ip, int port, char *runid) {
974 dictIterator *di;
975 dictEntry *de;
976 int removed = 0;
977
978 di = dictGetSafeIterator(master->sentinels);
979 while((de = dictNext(di)) != NULL) {
980 sentinelRedisInstance *ri = dictGetVal(de);
981
982 if ((ri->runid && runid && strcmp(ri->runid,runid) == 0) ||
983 (ip && strcmp(ri->addr->ip,ip) == 0 && port == ri->addr->port))
984 {
985 dictDelete(master->sentinels,ri->name);
986 removed++;
987 }
988 }
989 dictReleaseIterator(di);
990 return removed;
991}
992
993/* Search an instance with the same runid, ip and port into a dictionary
994 * of instances. Return NULL if not found, otherwise return the instance
995 * pointer.
996 *
997 * runid or ip can be NULL. In such a case the search is performed only
998 * by the non-NULL field. */
999sentinelRedisInstance *getSentinelRedisInstanceByAddrAndRunID(dict *instances, char *ip, int port, char *runid) {
1000 dictIterator *di;
1001 dictEntry *de;
1002 sentinelRedisInstance *instance = NULL;
1003
1004 redisAssert(ip || runid); /* User must pass at least one search param. */
1005 di = dictGetIterator(instances);
1006 while((de = dictNext(di)) != NULL) {
1007 sentinelRedisInstance *ri = dictGetVal(de);
1008
1009 if (runid && !ri->runid) continue;
1010 if ((runid == NULL || strcmp(ri->runid, runid) == 0) &&
1011 (ip == NULL || (strcmp(ri->addr->ip, ip) == 0 &&
1012 ri->addr->port == port)))
1013 {
1014 instance = ri;
1015 break;
1016 }
1017 }
1018 dictReleaseIterator(di);
1019 return instance;
1020}
1021
1022/* Simple master lookup by name */
1023sentinelRedisInstance *sentinelGetMasterByName(char *name) {
1024 sentinelRedisInstance *ri;
1025 sds sdsname = sdsnew(name);
1026
1027 ri = dictFetchValue(sentinel.masters,sdsname);
1028 sdsfree(sdsname);
1029 return ri;
1030}
1031
1032/* Add the specified flags to all the instances in the specified dictionary. */
1033void sentinelAddFlagsToDictOfRedisInstances(dict *instances, int flags) {
1034 dictIterator *di;
1035 dictEntry *de;
1036
1037 di = dictGetIterator(instances);
1038 while((de = dictNext(di)) != NULL) {
1039 sentinelRedisInstance *ri = dictGetVal(de);
1040 ri->flags |= flags;
1041 }
1042 dictReleaseIterator(di);
1043}
1044
1045/* Remove the specified flags to all the instances in the specified
1046 * dictionary. */
1047void sentinelDelFlagsToDictOfRedisInstances(dict *instances, int flags) {
1048 dictIterator *di;
1049 dictEntry *de;
1050
1051 di = dictGetIterator(instances);
1052 while((de = dictNext(di)) != NULL) {
1053 sentinelRedisInstance *ri = dictGetVal(de);
1054 ri->flags &= ~flags;
1055 }
1056 dictReleaseIterator(di);
1057}
1058
1059/* Reset the state of a monitored master:
1060 * 1) Remove all slaves.
1061 * 2) Remove all sentinels.
1062 * 3) Remove most of the flags resulting from runtime operations.
1063 * 4) Reset timers to their default value.
1064 * 5) In the process of doing this undo the failover if in progress.
1065 * 6) Disconnect the connections with the master (will reconnect automatically).
1066 */
1067void sentinelResetMaster(sentinelRedisInstance *ri, int flags) {
1068 redisAssert(ri->flags & SRI_MASTER);
1069 dictRelease(ri->slaves);
1070 dictRelease(ri->sentinels);
1071 ri->slaves = dictCreate(&instancesDictType,NULL);
1072 ri->sentinels = dictCreate(&instancesDictType,NULL);
1073 if (ri->cc) sentinelKillLink(ri,ri->cc);
1074 if (ri->pc) sentinelKillLink(ri,ri->pc);
1075 ri->flags &= SRI_MASTER|SRI_CAN_FAILOVER|SRI_DISCONNECTED;
1076 if (ri->leader) {
1077 sdsfree(ri->leader);
1078 ri->leader = NULL;
1079 }
1080 ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
1081 ri->failover_state_change_time = 0;
1082 ri->failover_start_time = 0;
1083 ri->promoted_slave = NULL;
1084 sdsfree(ri->runid);
1085 sdsfree(ri->slave_master_host);
1086 ri->runid = NULL;
1087 ri->slave_master_host = NULL;
1088 ri->last_avail_time = mstime();
1089 ri->last_pong_time = mstime();
1090 if (flags & SENTINEL_GENERATE_EVENT)
1091 sentinelEvent(REDIS_WARNING,"+reset-master",ri,"%@");
1092}
1093
1094/* Call sentinelResetMaster() on every master with a name matching the specified
1095 * pattern. */
1096int sentinelResetMastersByPattern(char *pattern, int flags) {
1097 dictIterator *di;
1098 dictEntry *de;
1099 int reset = 0;
1100
1101 di = dictGetIterator(sentinel.masters);
1102 while((de = dictNext(di)) != NULL) {
1103 sentinelRedisInstance *ri = dictGetVal(de);
1104
1105 if (ri->name) {
1106 if (stringmatch(pattern,ri->name,0)) {
1107 sentinelResetMaster(ri,flags);
1108 reset++;
1109 }
1110 }
1111 }
1112 dictReleaseIterator(di);
1113 return reset;
1114}
1115
1116/* Reset the specified master with sentinelResetMaster(), and also change
1117 * the ip:port address, but take the name of the instance unmodified.
1118 *
1119 * This is used to handle the +switch-master and +redirect-to-master events.
1120 *
1121 * The function returns REDIS_ERR if the address can't be resolved for some
1122 * reason. Otherwise REDIS_OK is returned.
1123 *
1124 * TODO: make this reset so that original sentinels are re-added with
1125 * same ip / port / runid.
1126 */
1127
1128int sentinelResetMasterAndChangeAddress(sentinelRedisInstance *master, char *ip, int port) {
1129 sentinelAddr *oldaddr, *newaddr;
1130
1131 newaddr = createSentinelAddr(ip,port);
1132 if (newaddr == NULL) return REDIS_ERR;
1133 sentinelResetMaster(master,SENTINEL_NO_FLAGS);
1134 oldaddr = master->addr;
1135 master->addr = newaddr;
1136 /* Release the old address at the end so we are safe even if the function
1137 * gets the master->addr->ip and master->addr->port as arguments. */
1138 releaseSentinelAddr(oldaddr);
1139 return REDIS_OK;
1140}
1141
1142/* ============================ Config handling ============================= */
1143char *sentinelHandleConfiguration(char **argv, int argc) {
1144 sentinelRedisInstance *ri;
1145
1146 if (!strcasecmp(argv[0],"monitor") && argc == 5) {
1147 /* monitor <name> <host> <port> <quorum> */
1148 int quorum = atoi(argv[4]);
1149
1150 if (quorum <= 0) return "Quorum must be 1 or greater.";
1151 if (createSentinelRedisInstance(argv[1],SRI_MASTER,argv[2],
1152 atoi(argv[3]),quorum,NULL) == NULL)
1153 {
1154 switch(errno) {
1155 case EBUSY: return "Duplicated master name.";
1156 case ENOENT: return "Can't resolve master instance hostname.";
1157 case EINVAL: return "Invalid port number";
1158 }
1159 }
1160 } else if (!strcasecmp(argv[0],"down-after-milliseconds") && argc == 3) {
1161 /* down-after-milliseconds <name> <milliseconds> */
1162 ri = sentinelGetMasterByName(argv[1]);
1163 if (!ri) return "No such master with specified name.";
1164 ri->down_after_period = atoi(argv[2]);
1165 if (ri->down_after_period <= 0)
1166 return "negative or zero time parameter.";
1167 } else if (!strcasecmp(argv[0],"failover-timeout") && argc == 3) {
1168 /* failover-timeout <name> <milliseconds> */
1169 ri = sentinelGetMasterByName(argv[1]);
1170 if (!ri) return "No such master with specified name.";
1171 ri->failover_timeout = atoi(argv[2]);
1172 if (ri->failover_timeout <= 0)
1173 return "negative or zero time parameter.";
1174 } else if (!strcasecmp(argv[0],"can-failover") && argc == 3) {
1175 /* can-failover <name> <yes/no> */
1176 int yesno = yesnotoi(argv[2]);
1177
1178 ri = sentinelGetMasterByName(argv[1]);
1179 if (!ri) return "No such master with specified name.";
1180 if (yesno == -1) return "Argument must be either yes or no.";
1181 if (yesno)
1182 ri->flags |= SRI_CAN_FAILOVER;
1183 else
1184 ri->flags &= ~SRI_CAN_FAILOVER;
1185 } else if (!strcasecmp(argv[0],"parallel-syncs") && argc == 3) {
1186 /* parallel-syncs <name> <milliseconds> */
1187 ri = sentinelGetMasterByName(argv[1]);
1188 if (!ri) return "No such master with specified name.";
1189 ri->parallel_syncs = atoi(argv[2]);
1190 } else if (!strcasecmp(argv[0],"notification-script") && argc == 3) {
1191 /* notification-script <name> <path> */
1192 ri = sentinelGetMasterByName(argv[1]);
1193 if (!ri) return "No such master with specified name.";
1194 if (access(argv[2],X_OK) == -1)
1195 return "Notification script seems non existing or non executable.";
1196 ri->notification_script = sdsnew(argv[2]);
1197 } else if (!strcasecmp(argv[0],"client-reconfig-script") && argc == 3) {
1198 /* client-reconfig-script <name> <path> */
1199 ri = sentinelGetMasterByName(argv[1]);
1200 if (!ri) return "No such master with specified name.";
1201 if (access(argv[2],X_OK) == -1)
1202 return "Client reconfiguration script seems non existing or "
1203 "non executable.";
1204 ri->client_reconfig_script = sdsnew(argv[2]);
1205 } else {
1206 return "Unrecognized sentinel configuration statement.";
1207 }
1208 return NULL;
1209}
1210
1211/* ====================== hiredis connection handling ======================= */
1212
1213/* Completely disconnect an hiredis link from an instance. */
1214void sentinelKillLink(sentinelRedisInstance *ri, redisAsyncContext *c) {
1215 if (ri->cc == c) {
1216 ri->cc = NULL;
1217 ri->pending_commands = 0;
1218 }
1219 if (ri->pc == c) ri->pc = NULL;
1220 c->data = NULL;
1221 ri->flags |= SRI_DISCONNECTED;
1222 redisAsyncFree(c);
1223}
1224
1225/* This function takes an hiredis context that is in an error condition
1226 * and make sure to mark the instance as disconnected performing the
1227 * cleanup needed.
1228 *
1229 * Note: we don't free the hiredis context as hiredis will do it for us
1230 * for async conenctions. */
1231void sentinelDisconnectInstanceFromContext(const redisAsyncContext *c) {
1232 sentinelRedisInstance *ri = c->data;
1233 int pubsub;
1234
1235 if (ri == NULL) return; /* The instance no longer exists. */
1236
1237 pubsub = (ri->pc == c);
1238 sentinelEvent(REDIS_DEBUG, pubsub ? "-pubsub-link" : "-cmd-link", ri,
1239 "%@ #%s", c->errstr);
1240 if (pubsub)
1241 ri->pc = NULL;
1242 else
1243 ri->cc = NULL;
1244 ri->flags |= SRI_DISCONNECTED;
1245}
1246
1247void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status) {
1248 if (status != REDIS_OK) {
1249 sentinelDisconnectInstanceFromContext(c);
1250 } else {
1251 sentinelRedisInstance *ri = c->data;
1252 int pubsub = (ri->pc == c);
1253
1254 sentinelEvent(REDIS_DEBUG, pubsub ? "+pubsub-link" : "+cmd-link", ri,
1255 "%@");
1256 }
1257}
1258
1259void sentinelDisconnectCallback(const redisAsyncContext *c, int status) {
1260 sentinelDisconnectInstanceFromContext(c);
1261}
1262
1263/* Create the async connections for the specified instance if the instance
1264 * is disconnected. Note that the SRI_DISCONNECTED flag is set even if just
1265 * one of the two links (commands and pub/sub) is missing. */
1266void sentinelReconnectInstance(sentinelRedisInstance *ri) {
1267 if (!(ri->flags & SRI_DISCONNECTED)) return;
1268
1269 /* Commands connection. */
1270 if (ri->cc == NULL) {
1271 ri->cc = redisAsyncConnect(ri->addr->ip,ri->addr->port);
1272 if (ri->cc->err) {
1273 sentinelEvent(REDIS_DEBUG,"-cmd-link-reconnection",ri,"%@ #%s",
1274 ri->cc->errstr);
1275 sentinelKillLink(ri,ri->cc);
1276 } else {
1277 ri->cc_conn_time = mstime();
1278 ri->cc->data = ri;
1279 redisAeAttach(server.el,ri->cc);
1280 redisAsyncSetConnectCallback(ri->cc,
1281 sentinelLinkEstablishedCallback);
1282 redisAsyncSetDisconnectCallback(ri->cc,
1283 sentinelDisconnectCallback);
1284 }
1285 }
1286 /* Pub / Sub */
1287 if ((ri->flags & SRI_MASTER) && ri->pc == NULL) {
1288 ri->pc = redisAsyncConnect(ri->addr->ip,ri->addr->port);
1289 if (ri->pc->err) {
1290 sentinelEvent(REDIS_DEBUG,"-pubsub-link-reconnection",ri,"%@ #%s",
1291 ri->pc->errstr);
1292 sentinelKillLink(ri,ri->pc);
1293 } else {
1294 int retval;
1295
1296 ri->pc_conn_time = mstime();
1297 ri->pc->data = ri;
1298 redisAeAttach(server.el,ri->pc);
1299 redisAsyncSetConnectCallback(ri->pc,
1300 sentinelLinkEstablishedCallback);
1301 redisAsyncSetDisconnectCallback(ri->pc,
1302 sentinelDisconnectCallback);
1303 /* Now we subscribe to the Sentinels "Hello" channel. */
1304 retval = redisAsyncCommand(ri->pc,
1305 sentinelReceiveHelloMessages, NULL, "SUBSCRIBE %s",
1306 SENTINEL_HELLO_CHANNEL);
1307 if (retval != REDIS_OK) {
1308 /* If we can't subscribe, the Pub/Sub connection is useless
1309 * and we can simply disconnect it and try again. */
1310 sentinelKillLink(ri,ri->pc);
1311 return;
1312 }
1313 }
1314 }
1315 /* Clear the DISCONNECTED flags only if we have both the connections
1316 * (or just the commands connection if this is a slave or a
1317 * sentinel instance). */
1318 if (ri->cc && (ri->flags & (SRI_SLAVE|SRI_SENTINEL) || ri->pc))
1319 ri->flags &= ~SRI_DISCONNECTED;
1320}
1321
1322/* ======================== Redis instances pinging ======================== */
1323
1324/* Process the INFO output from masters. */
1325void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
1326 sds *lines;
1327 int numlines, j;
1328 int role = 0;
1329 int runid_changed = 0; /* true if runid changed. */
1330 int first_runid = 0; /* true if this is the first runid we receive. */
1331
1332 /* The following fields must be reset to a given value in the case they
1333 * are not found at all in the INFO output. */
1334 ri->master_link_down_time = 0;
1335
1336 /* Process line by line. */
1337 lines = sdssplitlen(info,strlen(info),"\r\n",2,&numlines);
1338 for (j = 0; j < numlines; j++) {
1339 sentinelRedisInstance *slave;
1340 sds l = lines[j];
1341
1342 /* run_id:<40 hex chars>*/
1343 if (sdslen(l) >= 47 && !memcmp(l,"run_id:",7)) {
1344 if (ri->runid == NULL) {
1345 ri->runid = sdsnewlen(l+7,40);
1346 first_runid = 1;
1347 } else {
1348 if (strncmp(ri->runid,l+7,40) != 0) {
1349 runid_changed = 1;
1350 sentinelEvent(REDIS_NOTICE,"+reboot",ri,"%@");
1351 sdsfree(ri->runid);
1352 ri->runid = sdsnewlen(l+7,40);
1353 }
1354 }
1355 }
1356
1357 /* slave0:<ip>,<port>,<state> */
1358 if ((ri->flags & SRI_MASTER) &&
1359 sdslen(l) >= 7 &&
1360 !memcmp(l,"slave",5) && isdigit(l[5]))
1361 {
1362 char *ip, *port, *end;
1363
1364 ip = strchr(l,':'); if (!ip) continue;
1365 ip++; /* Now ip points to start of ip address. */
1366 port = strchr(ip,','); if (!port) continue;
1367 *port = '\0'; /* nul term for easy access. */
1368 port++; /* Now port points to start of port number. */
1369 end = strchr(port,','); if (!end) continue;
1370 *end = '\0'; /* nul term for easy access. */
1371
1372 /* Check if we already have this slave into our table,
1373 * otherwise add it. */
1374 if (sentinelRedisInstanceLookupSlave(ri,ip,atoi(port)) == NULL) {
1375 if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,ip,
1376 atoi(port), ri->quorum,ri)) != NULL)
1377 {
1378 sentinelEvent(REDIS_NOTICE,"+slave",slave,"%@");
1379 }
1380 }
1381 }
1382
1383 /* master_link_down_since_seconds:<seconds> */
1384 if (sdslen(l) >= 32 &&
1385 !memcmp(l,"master_link_down_since_seconds",30))
1386 {
1387 ri->master_link_down_time = strtoll(l+31,NULL,10)*1000;
1388 }
1389
1390 /* role:<role> */
1391 if (!memcmp(l,"role:master",11)) role = SRI_MASTER;
1392 else if (!memcmp(l,"role:slave",10)) role = SRI_SLAVE;
1393
1394 if (role == SRI_SLAVE) {
1395 /* master_host:<host> */
1396 if (sdslen(l) >= 12 && !memcmp(l,"master_host:",12)) {
1397 sdsfree(ri->slave_master_host);
1398 ri->slave_master_host = sdsnew(l+12);
1399 }
1400
1401 /* master_port:<port> */
1402 if (sdslen(l) >= 12 && !memcmp(l,"master_port:",12))
1403 ri->slave_master_port = atoi(l+12);
1404
1405 /* master_link_status:<status> */
1406 if (sdslen(l) >= 19 && !memcmp(l,"master_link_status:",19)) {
1407 ri->slave_master_link_status =
1408 (strcasecmp(l+19,"up") == 0) ?
1409 SENTINEL_MASTER_LINK_STATUS_UP :
1410 SENTINEL_MASTER_LINK_STATUS_DOWN;
1411 }
1412 }
1413 }
1414 ri->info_refresh = mstime();
1415 sdsfreesplitres(lines,numlines);
1416
1417 /* ---------------------------- Acting half ----------------------------- */
1418 if (sentinel.tilt) return;
1419
1420 /* Act if a master turned into a slave. */
1421 if ((ri->flags & SRI_MASTER) && role == SRI_SLAVE) {
1422 if (first_runid && ri->slave_master_host) {
1423 /* If it is the first time we receive INFO from it, but it's
1424 * a slave while it was configured as a master, we want to monitor
1425 * its master instead. */
1426 sentinelEvent(REDIS_WARNING,"+redirect-to-master",ri,
1427 "%s %s %d %s %d",
1428 ri->name, ri->addr->ip, ri->addr->port,
1429 ri->slave_master_host, ri->slave_master_port);
1430 sentinelResetMasterAndChangeAddress(ri,ri->slave_master_host,
1431 ri->slave_master_port);
1432 return;
1433 }
1434 }
1435
1436 /* Act if a slave turned into a master. */
1437 if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) {
1438 if (!(ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
1439 (runid_changed || first_runid))
1440 {
1441 /* If a slave turned into maser but:
1442 *
1443 * 1) Failover not in progress.
1444 * 2) RunID hs changed, or its the first time we see an INFO output.
1445 *
1446 * We assume this is a reboot with a wrong configuration.
1447 * Log the event and remove the slave. */
1448 int retval;
1449
1450 sentinelEvent(REDIS_WARNING,"-slave-restart-as-master",ri,"%@ #removing it from the attached slaves");
1451 retval = dictDelete(ri->master->slaves,ri->name);
1452 redisAssert(retval == REDIS_OK);
1453 return;
1454 } else if (ri->flags & SRI_PROMOTED) {
1455 /* If this is a promoted slave we can change state to the
1456 * failover state machine. */
1457 if ((ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
1458 (ri->master->flags & SRI_I_AM_THE_LEADER) &&
1459 (ri->master->failover_state ==
1460 SENTINEL_FAILOVER_STATE_WAIT_PROMOTION))
1461 {
1462 ri->master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
1463 ri->master->failover_state_change_time = mstime();
1464 sentinelEvent(REDIS_WARNING,"+promoted-slave",ri,"%@");
1465 sentinelEvent(REDIS_WARNING,"+failover-state-reconf-slaves",
1466 ri->master,"%@");
1467 sentinelCallClientReconfScript(ri->master,SENTINEL_LEADER,
1468 "start",ri->master->addr,ri->addr);
1469 }
1470 } else if (!(ri->master->flags & SRI_FAILOVER_IN_PROGRESS) ||
1471 ((ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
1472 (ri->master->flags & SRI_I_AM_THE_LEADER) &&
1473 ri->master->failover_state ==
1474 SENTINEL_FAILOVER_STATE_WAIT_START))
1475 {
1476 /* No failover in progress? Then it is the start of a failover
1477 * and we are an observer.
1478 *
1479 * We also do that if we are a leader doing a failover, in wait
1480 * start, but well, somebody else started before us. */
1481
1482 if (ri->master->flags & SRI_FAILOVER_IN_PROGRESS) {
1483 sentinelEvent(REDIS_WARNING,"-failover-abort-race",
1484 ri->master, "%@");
1485 sentinelAbortFailover(ri->master);
1486 }
1487
1488 ri->master->flags |= SRI_FAILOVER_IN_PROGRESS;
1489 sentinelEvent(REDIS_WARNING,"+failover-detected",ri->master,"%@");
1490 ri->master->failover_state = SENTINEL_FAILOVER_STATE_DETECT_END;
1491 ri->master->failover_state_change_time = mstime();
1492 ri->master->promoted_slave = ri;
1493 ri->flags |= SRI_PROMOTED;
1494 sentinelCallClientReconfScript(ri->master,SENTINEL_OBSERVER,
1495 "start", ri->master->addr,ri->addr);
1496 /* We are an observer, so we can only assume that the leader
1497 * is reconfiguring the slave instances. For this reason we
1498 * set all the instances as RECONF_SENT waiting for progresses
1499 * on this side. */
1500 sentinelAddFlagsToDictOfRedisInstances(ri->master->slaves,
1501 SRI_RECONF_SENT);
1502 }
1503 }
1504
1505 /* Detect if the slave that is in the process of being reconfigured
1506 * changed state. */
1507 if ((ri->flags & SRI_SLAVE) && role == SRI_SLAVE &&
1508 (ri->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)))
1509 {
1510 /* SRI_RECONF_SENT -> SRI_RECONF_INPROG. */
1511 if ((ri->flags & SRI_RECONF_SENT) &&
1512 ri->slave_master_host &&
1513 strcmp(ri->slave_master_host,
1514 ri->master->promoted_slave->addr->ip) == 0 &&
1515 ri->slave_master_port == ri->master->promoted_slave->addr->port)
1516 {
1517 ri->flags &= ~SRI_RECONF_SENT;
1518 ri->flags |= SRI_RECONF_INPROG;
1519 sentinelEvent(REDIS_NOTICE,"+slave-reconf-inprog",ri,"%@");
1520 }
1521
1522 /* SRI_RECONF_INPROG -> SRI_RECONF_DONE */
1523 if ((ri->flags & SRI_RECONF_INPROG) &&
1524 ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP)
1525 {
1526 ri->flags &= ~SRI_RECONF_INPROG;
1527 ri->flags |= SRI_RECONF_DONE;
1528 sentinelEvent(REDIS_NOTICE,"+slave-reconf-done",ri,"%@");
1529 /* If we are moving forward (a new slave is now configured)
1530 * we update the change_time as we are conceptually passing
1531 * to the next slave. */
1532 ri->failover_state_change_time = mstime();
1533 }
1534 }
1535}
1536
1537void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
1538 sentinelRedisInstance *ri = c->data;
1539 redisReply *r;
1540
1541 if (ri) ri->pending_commands--;
1542 if (!reply || !ri) return;
1543 r = reply;
1544
1545 if (r->type == REDIS_REPLY_STRING) {
1546 sentinelRefreshInstanceInfo(ri,r->str);
1547 }
1548}
1549
1550/* Just discard the reply. We use this when we are not monitoring the return
1551 * value of the command but its effects directly. */
1552void sentinelDiscardReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
1553 sentinelRedisInstance *ri = c->data;
1554
1555 if (ri) ri->pending_commands--;
1556}
1557
1558void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
1559 sentinelRedisInstance *ri = c->data;
1560 redisReply *r;
1561
1562 if (ri) ri->pending_commands--;
1563 if (!reply || !ri) return;
1564 r = reply;
1565
1566 if (r->type == REDIS_REPLY_STATUS ||
1567 r->type == REDIS_REPLY_ERROR) {
1568 /* Update the "instance available" field only if this is an
1569 * acceptable reply. */
1570 if (strncmp(r->str,"PONG",4) == 0 ||
1571 strncmp(r->str,"LOADING",7) == 0 ||
1572 strncmp(r->str,"MASTERDOWN",10) == 0)
1573 {
1574 ri->last_avail_time = mstime();
1575 }
1576 }
1577 ri->last_pong_time = mstime();
1578}
1579
1580/* This is called when we get the reply about the PUBLISH command we send
1581 * to the master to advertise this sentinel. */
1582void sentinelPublishReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
1583 sentinelRedisInstance *ri = c->data;
1584 redisReply *r;
1585
1586 if (ri) ri->pending_commands--;
1587 if (!reply || !ri) return;
1588 r = reply;
1589
1590 /* Only update pub_time if we actually published our message. Otherwise
1591 * we'll retry against in 100 milliseconds. */
1592 if (r->type != REDIS_REPLY_ERROR)
1593 ri->last_pub_time = mstime();
1594}
1595
1596/* This is our Pub/Sub callback for the Hello channel. It's useful in order
1597 * to discover other sentinels attached at the same master. */
1598void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata) {
1599 sentinelRedisInstance *ri = c->data;
1600 redisReply *r;
1601
1602 if (!reply || !ri) return;
1603 r = reply;
1604
1605 /* Update the last activity in the pubsub channel. Note that since we
1606 * receive our messages as well this timestamp can be used to detect
1607 * if the link is probably diconnected even if it seems otherwise. */
1608 ri->pc_last_activity = mstime();
1609
1610 /* Sanity check in the reply we expect, so that the code that follows
1611 * can avoid to check for details. */
1612 if (r->type != REDIS_REPLY_ARRAY ||
1613 r->elements != 3 ||
1614 r->element[0]->type != REDIS_REPLY_STRING ||
1615 r->element[1]->type != REDIS_REPLY_STRING ||
1616 r->element[2]->type != REDIS_REPLY_STRING ||
1617 strcmp(r->element[0]->str,"message") != 0) return;
1618
1619 /* We are not interested in meeting ourselves */
1620 if (strstr(r->element[2]->str,server.runid) != NULL) return;
1621
1622 {
1623 int numtokens, port, removed, canfailover;
1624 char **token = sdssplitlen(r->element[2]->str,
1625 r->element[2]->len,
1626 ":",1,&numtokens);
1627 sentinelRedisInstance *sentinel;
1628
1629 if (numtokens == 4) {
1630 /* First, try to see if we already have this sentinel. */
1631 port = atoi(token[1]);
1632 canfailover = atoi(token[3]);
1633 sentinel = getSentinelRedisInstanceByAddrAndRunID(
1634 ri->sentinels,token[0],port,token[2]);
1635
1636 if (!sentinel) {
1637 /* If not, remove all the sentinels that have the same runid
1638 * OR the same ip/port, because it's either a restart or a
1639 * network topology change. */
1640 removed = removeMatchingSentinelsFromMaster(ri,token[0],port,
1641 token[2]);
1642 if (removed) {
1643 sentinelEvent(REDIS_NOTICE,"-dup-sentinel",ri,
1644 "%@ #duplicate of %s:%d or %s",
1645 token[0],port,token[2]);
1646 }
1647
1648 /* Add the new sentinel. */
1649 sentinel = createSentinelRedisInstance(NULL,SRI_SENTINEL,
1650 token[0],port,ri->quorum,ri);
1651 if (sentinel) {
1652 sentinelEvent(REDIS_NOTICE,"+sentinel",sentinel,"%@");
1653 /* The runid is NULL after a new instance creation and
1654 * for Sentinels we don't have a later chance to fill it,
1655 * so do it now. */
1656 sentinel->runid = sdsnew(token[2]);
1657 }
1658 }
1659
1660 /* Update the state of the Sentinel. */
1661 if (sentinel) {
1662 sentinel->last_hello_time = mstime();
1663 if (canfailover)
1664 sentinel->flags |= SRI_CAN_FAILOVER;
1665 else
1666 sentinel->flags &= ~SRI_CAN_FAILOVER;
1667 }
1668 }
1669 sdsfreesplitres(token,numtokens);
1670 }
1671}
1672
1673void sentinelPingInstance(sentinelRedisInstance *ri) {
1674 mstime_t now = mstime();
1675 mstime_t info_period;
1676 int retval;
1677
1678 /* Return ASAP if we have already a PING or INFO already pending, or
1679 * in the case the instance is not properly connected. */
1680 if (ri->flags & SRI_DISCONNECTED) return;
1681
1682 /* For INFO, PING, PUBLISH that are not critical commands to send we
1683 * also have a limit of SENTINEL_MAX_PENDING_COMMANDS. We don't
1684 * want to use a lot of memory just because a link is not working
1685 * properly (note that anyway there is a redundant protection about this,
1686 * that is, the link will be disconnected and reconnected if a long
1687 * timeout condition is detected. */
1688 if (ri->pending_commands >= SENTINEL_MAX_PENDING_COMMANDS) return;
1689
1690 /* If this is a slave of a master in O_DOWN condition we start sending
1691 * it INFO every second, instead of the usual SENTINEL_INFO_PERIOD
1692 * period. In this state we want to closely monitor slaves in case they
1693 * are turned into masters by another Sentinel, or by the sysadmin. */
1694 if ((ri->flags & SRI_SLAVE) &&
1695 (ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) {
1696 info_period = 1000;
1697 } else {
1698 info_period = SENTINEL_INFO_PERIOD;
1699 }
1700
1701 if ((ri->flags & SRI_SENTINEL) == 0 &&
1702 (ri->info_refresh == 0 ||
1703 (now - ri->info_refresh) > info_period))
1704 {
1705 /* Send INFO to masters and slaves, not sentinels. */
1706 retval = redisAsyncCommand(ri->cc,
1707 sentinelInfoReplyCallback, NULL, "INFO");
1708 if (retval != REDIS_OK) return;
1709 ri->pending_commands++;
1710 } else if ((now - ri->last_pong_time) > SENTINEL_PING_PERIOD) {
1711 /* Send PING to all the three kinds of instances. */
1712 retval = redisAsyncCommand(ri->cc,
1713 sentinelPingReplyCallback, NULL, "PING");
1714 if (retval != REDIS_OK) return;
1715 ri->pending_commands++;
1716 } else if ((ri->flags & SRI_MASTER) &&
1717 (now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD)
1718 {
1719 /* PUBLISH hello messages only to masters. */
1720 struct sockaddr_in sa;
1721 socklen_t salen = sizeof(sa);
1722
1723 if (getsockname(ri->cc->c.fd,(struct sockaddr*)&sa,&salen) != -1) {
1724 char myaddr[128];
1725
1726 snprintf(myaddr,sizeof(myaddr),"%s:%d:%s:%d",
1727 inet_ntoa(sa.sin_addr), server.port, server.runid,
1728 (ri->flags & SRI_CAN_FAILOVER) != 0);
1729 retval = redisAsyncCommand(ri->cc,
1730 sentinelPublishReplyCallback, NULL, "PUBLISH %s %s",
1731 SENTINEL_HELLO_CHANNEL,myaddr);
1732 if (retval != REDIS_OK) return;
1733 ri->pending_commands++;
1734 }
1735 }
1736}
1737
1738/* =========================== SENTINEL command ============================= */
1739
1740const char *sentinelFailoverStateStr(int state) {
1741 switch(state) {
1742 case SENTINEL_FAILOVER_STATE_NONE: return "none";
1743 case SENTINEL_FAILOVER_STATE_WAIT_START: return "wait_start";
1744 case SENTINEL_FAILOVER_STATE_SELECT_SLAVE: return "select_slave";
1745 case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE: return "send_slaveof_noone";
1746 case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION: return "wait_promotion";
1747 case SENTINEL_FAILOVER_STATE_RECONF_SLAVES: return "reconf_slaves";
1748 case SENTINEL_FAILOVER_STATE_ALERT_CLIENTS: return "alert_clients";
1749 case SENTINEL_FAILOVER_STATE_DETECT_END: return "detect_end";
1750 case SENTINEL_FAILOVER_STATE_UPDATE_CONFIG: return "update_config";
1751 default: return "unknown";
1752 }
1753}
1754
1755/* Redis instance to Redis protocol representation. */
1756void addReplySentinelRedisInstance(redisClient *c, sentinelRedisInstance *ri) {
1757 char *flags = sdsempty();
1758 void *mbl;
1759 int fields = 0;
1760
1761 mbl = addDeferredMultiBulkLength(c);
1762
1763 addReplyBulkCString(c,"name");
1764 addReplyBulkCString(c,ri->name);
1765 fields++;
1766
1767 addReplyBulkCString(c,"ip");
1768 addReplyBulkCString(c,ri->addr->ip);
1769 fields++;
1770
1771 addReplyBulkCString(c,"port");
1772 addReplyBulkLongLong(c,ri->addr->port);
1773 fields++;
1774
1775 addReplyBulkCString(c,"runid");
1776 addReplyBulkCString(c,ri->runid ? ri->runid : "");
1777 fields++;
1778
1779 addReplyBulkCString(c,"flags");
1780 if (ri->flags & SRI_S_DOWN) flags = sdscat(flags,"s_down,");
1781 if (ri->flags & SRI_O_DOWN) flags = sdscat(flags,"o_down,");
1782 if (ri->flags & SRI_MASTER) flags = sdscat(flags,"master,");
1783 if (ri->flags & SRI_SLAVE) flags = sdscat(flags,"slave,");
1784 if (ri->flags & SRI_SENTINEL) flags = sdscat(flags,"sentinel,");
1785 if (ri->flags & SRI_DISCONNECTED) flags = sdscat(flags,"disconnected,");
1786 if (ri->flags & SRI_MASTER_DOWN) flags = sdscat(flags,"master_down,");
1787 if (ri->flags & SRI_FAILOVER_IN_PROGRESS)
1788 flags = sdscat(flags,"failover_in_progress,");
1789 if (ri->flags & SRI_I_AM_THE_LEADER)
1790 flags = sdscat(flags,"i_am_the_leader,");
1791 if (ri->flags & SRI_PROMOTED) flags = sdscat(flags,"promoted,");
1792 if (ri->flags & SRI_RECONF_SENT) flags = sdscat(flags,"reconf_sent,");
1793 if (ri->flags & SRI_RECONF_INPROG) flags = sdscat(flags,"reconf_inprog,");
1794 if (ri->flags & SRI_RECONF_DONE) flags = sdscat(flags,"reconf_done,");
1795
1796 if (sdslen(flags) != 0) flags = sdsrange(flags,0,-2); /* remove last "," */
1797 addReplyBulkCString(c,flags);
1798 sdsfree(flags);
1799 fields++;
1800
1801 addReplyBulkCString(c,"pending-commands");
1802 addReplyBulkLongLong(c,ri->pending_commands);
1803 fields++;
1804
1805 if (ri->flags & SRI_FAILOVER_IN_PROGRESS) {
1806 addReplyBulkCString(c,"failover-state");
1807 addReplyBulkCString(c,(char*)sentinelFailoverStateStr(ri->failover_state));
1808 fields++;
1809 }
1810
1811 addReplyBulkCString(c,"last-ok-ping-reply");
1812 addReplyBulkLongLong(c,mstime() - ri->last_avail_time);
1813 fields++;
1814
1815 addReplyBulkCString(c,"last-ping-reply");
1816 addReplyBulkLongLong(c,mstime() - ri->last_pong_time);
1817 fields++;
1818
1819 if (ri->flags & SRI_S_DOWN) {
1820 addReplyBulkCString(c,"s-down-time");
1821 addReplyBulkLongLong(c,mstime()-ri->s_down_since_time);
1822 fields++;
1823 }
1824
1825 if (ri->flags & SRI_O_DOWN) {
1826 addReplyBulkCString(c,"o-down-time");
1827 addReplyBulkLongLong(c,mstime()-ri->o_down_since_time);
1828 fields++;
1829 }
1830
1831 /* Masters and Slaves */
1832 if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
1833 addReplyBulkCString(c,"info-refresh");
1834 addReplyBulkLongLong(c,mstime() - ri->info_refresh);
1835 fields++;
1836 }
1837
1838 /* Only masters */
1839 if (ri->flags & SRI_MASTER) {
1840 addReplyBulkCString(c,"num-slaves");
1841 addReplyBulkLongLong(c,dictSize(ri->slaves));
1842 fields++;
1843
1844 addReplyBulkCString(c,"num-other-sentinels");
1845 addReplyBulkLongLong(c,dictSize(ri->sentinels));
1846 fields++;
1847
1848 addReplyBulkCString(c,"quorum");
1849 addReplyBulkLongLong(c,ri->quorum);
1850 fields++;
1851 }
1852
1853 /* Only slaves */
1854 if (ri->flags & SRI_SLAVE) {
1855 addReplyBulkCString(c,"master-link-down-time");
1856 addReplyBulkLongLong(c,ri->master_link_down_time);
1857 fields++;
1858
1859 addReplyBulkCString(c,"master-link-status");
1860 addReplyBulkCString(c,
1861 (ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP) ?
1862 "ok" : "err");
1863 fields++;
1864
1865 addReplyBulkCString(c,"master-host");
1866 addReplyBulkCString(c,
1867 ri->slave_master_host ? ri->slave_master_host : "?");
1868 fields++;
1869
1870 addReplyBulkCString(c,"master-port");
1871 addReplyBulkLongLong(c,ri->slave_master_port);
1872 fields++;
1873 }
1874
1875 /* Only sentinels */
1876 if (ri->flags & SRI_SENTINEL) {
1877 addReplyBulkCString(c,"last-hello-message");
1878 addReplyBulkLongLong(c,mstime() - ri->last_hello_time);
1879 fields++;
1880
1881 addReplyBulkCString(c,"can-failover-its-master");
1882 addReplyBulkLongLong(c,(ri->flags & SRI_CAN_FAILOVER) != 0);
1883 fields++;
1884
1885 if (ri->flags & SRI_MASTER_DOWN) {
1886 addReplyBulkCString(c,"subjective-leader");
1887 addReplyBulkCString(c,ri->leader ? ri->leader : "?");
1888 fields++;
1889 }
1890 }
1891
1892 setDeferredMultiBulkLength(c,mbl,fields*2);
1893}
1894
1895/* Output a number of instances contanined inside a dictionary as
1896 * Redis protocol. */
1897void addReplyDictOfRedisInstances(redisClient *c, dict *instances) {
1898 dictIterator *di;
1899 dictEntry *de;
1900
1901 di = dictGetIterator(instances);
1902 addReplyMultiBulkLen(c,dictSize(instances));
1903 while((de = dictNext(di)) != NULL) {
1904 sentinelRedisInstance *ri = dictGetVal(de);
1905
1906 addReplySentinelRedisInstance(c,ri);
1907 }
1908 dictReleaseIterator(di);
1909}
1910
1911/* Lookup the named master into sentinel.masters.
1912 * If the master is not found reply to the client with an error and returns
1913 * NULL. */
1914sentinelRedisInstance *sentinelGetMasterByNameOrReplyError(redisClient *c,
1915 robj *name)
1916{
1917 sentinelRedisInstance *ri;
1918
1919 ri = dictFetchValue(sentinel.masters,c->argv[2]->ptr);
1920 if (!ri) {
1921 addReplyError(c,"No such master with that name");
1922 return NULL;
1923 }
1924 return ri;
1925}
1926
1927void sentinelCommand(redisClient *c) {
1928 if (!strcasecmp(c->argv[1]->ptr,"masters")) {
1929 /* SENTINEL MASTERS */
1930 if (c->argc != 2) goto numargserr;
1931
1932 addReplyDictOfRedisInstances(c,sentinel.masters);
1933 } else if (!strcasecmp(c->argv[1]->ptr,"slaves")) {
1934 /* SENTINEL SLAVES <master-name> */
1935 sentinelRedisInstance *ri;
1936
1937 if (c->argc != 3) goto numargserr;
1938 if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
1939 return;
1940 addReplyDictOfRedisInstances(c,ri->slaves);
1941 } else if (!strcasecmp(c->argv[1]->ptr,"sentinels")) {
1942 /* SENTINEL SENTINELS <master-name> */
1943 sentinelRedisInstance *ri;
1944
1945 if (c->argc != 3) goto numargserr;
1946 if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
1947 return;
1948 addReplyDictOfRedisInstances(c,ri->sentinels);
1949 } else if (!strcasecmp(c->argv[1]->ptr,"is-master-down-by-addr")) {
1950 /* SENTINEL IS-MASTER-DOWN-BY-ADDR <ip> <port> */
1951 sentinelRedisInstance *ri;
1952 char *leader = NULL;
1953 long port;
1954 int isdown = 0;
1955
1956 if (c->argc != 4) goto numargserr;
1957 if (getLongFromObjectOrReply(c,c->argv[3],&port,NULL) != REDIS_OK)
1958 return;
1959 ri = getSentinelRedisInstanceByAddrAndRunID(sentinel.masters,
1960 c->argv[2]->ptr,port,NULL);
1961
1962 /* It exists? Is actually a master? Is subjectively down? It's down.
1963 * Note: if we are in tilt mode we always reply with "0". */
1964 if (!sentinel.tilt && ri && (ri->flags & SRI_S_DOWN) &&
1965 (ri->flags & SRI_MASTER))
1966 isdown = 1;
1967 if (ri) leader = sentinelGetSubjectiveLeader(ri);
1968
1969 /* Reply with a two-elements multi-bulk reply: down state, leader. */
1970 addReplyMultiBulkLen(c,2);
1971 addReply(c, isdown ? shared.cone : shared.czero);
1972 addReplyBulkCString(c, leader ? leader : "?");
1973 if (leader) sdsfree(leader);
1974 } else if (!strcasecmp(c->argv[1]->ptr,"reset")) {
1975 /* SENTINEL RESET <pattern> */
1976 if (c->argc != 3) goto numargserr;
1977 addReplyLongLong(c,sentinelResetMastersByPattern(c->argv[2]->ptr,SENTINEL_GENERATE_EVENT));
1978 } else if (!strcasecmp(c->argv[1]->ptr,"get-master-addr-by-name")) {
1979 /* SENTINEL GET-MASTER-ADDR-BY-NAME <master-name> */
1980 sentinelRedisInstance *ri;
1981
1982 if (c->argc != 3) goto numargserr;
1983 ri = sentinelGetMasterByName(c->argv[2]->ptr);
1984 if (ri == NULL) {
1985 addReply(c,shared.nullmultibulk);
1986 } else {
1987 sentinelAddr *addr = ri->addr;
1988
1989 if ((ri->flags & SRI_FAILOVER_IN_PROGRESS) && ri->promoted_slave)
1990 addr = ri->promoted_slave->addr;
1991 addReplyMultiBulkLen(c,2);
1992 addReplyBulkCString(c,addr->ip);
1993 addReplyBulkLongLong(c,addr->port);
1994 }
1995 } else if (!strcasecmp(c->argv[1]->ptr,"failover")) {
1996 /* SENTINEL FAILOVER <master-name> */
1997 sentinelRedisInstance *ri;
1998
1999 if (c->argc != 3) goto numargserr;
2000 if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
2001 return;
2002 if (ri->flags & SRI_FAILOVER_IN_PROGRESS) {
2003 addReplySds(c,sdsnew("-INPROG Failover already in progress\r\n"));
2004 return;
2005 }
2006 if (sentinelSelectSlave(ri) == NULL) {
2007 addReplySds(c,sdsnew("-NOGOODSLAVE No suitable slave to promote\r\n"));
2008 return;
2009 }
2010 sentinelStartFailover(ri,SENTINEL_FAILOVER_STATE_WAIT_START);
2011 ri->flags |= SRI_FORCE_FAILOVER;
2012 addReply(c,shared.ok);
2013 } else if (!strcasecmp(c->argv[1]->ptr,"pending-scripts")) {
2014 /* SENTINEL PENDING-SCRIPTS */
2015
2016 if (c->argc != 2) goto numargserr;
2017 sentinelPendingScriptsCommand(c);
2018 } else {
2019 addReplyErrorFormat(c,"Unknown sentinel subcommand '%s'",
2020 (char*)c->argv[1]->ptr);
2021 }
2022 return;
2023
2024numargserr:
2025 addReplyErrorFormat(c,"Wrong number of commands for 'sentinel %s'",
2026 (char*)c->argv[1]->ptr);
2027}
2028
2029/* ===================== SENTINEL availability checks ======================= */
2030
2031/* Is this instance down from our point of view? */
2032void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
2033 mstime_t elapsed = mstime() - ri->last_avail_time;
2034
2035 /* Check if we are in need for a reconnection of one of the
2036 * links, because we are detecting low activity.
2037 *
2038 * 1) Check if the command link seems connected, was connected not less
2039 * than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have an
2040 * idle time that is greater than down_after_period / 2 seconds. */
2041 if (ri->cc &&
2042 (mstime() - ri->cc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
2043 (mstime() - ri->last_pong_time) > (ri->down_after_period/2))
2044 {
2045 sentinelKillLink(ri,ri->cc);
2046 }
2047
2048 /* 2) Check if the pubsub link seems connected, was connected not less
2049 * than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have no
2050 * activity in the Pub/Sub channel for more than
2051 * SENTINEL_PUBLISH_PERIOD * 3.
2052 */
2053 if (ri->pc &&
2054 (mstime() - ri->pc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
2055 (mstime() - ri->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3))
2056 {
2057 sentinelKillLink(ri,ri->pc);
2058 }
2059
2060 /* Update the subjectively down flag. */
2061 if (elapsed > ri->down_after_period) {
2062 /* Is subjectively down */
2063 if ((ri->flags & SRI_S_DOWN) == 0) {
2064 sentinelEvent(REDIS_WARNING,"+sdown",ri,"%@");
2065 ri->s_down_since_time = mstime();
2066 ri->flags |= SRI_S_DOWN;
2067 }
2068 } else {
2069 /* Is subjectively up */
2070 if (ri->flags & SRI_S_DOWN) {
2071 sentinelEvent(REDIS_WARNING,"-sdown",ri,"%@");
2072 ri->flags &= ~SRI_S_DOWN;
2073 }
2074 }
2075}
2076
2077/* Is this instance down accordingly to the configured quorum? */
2078void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
2079 dictIterator *di;
2080 dictEntry *de;
2081 int quorum = 0, odown = 0;
2082
2083 if (master->flags & SRI_S_DOWN) {
2084 /* Is down for enough sentinels? */
2085 quorum = 1; /* the current sentinel. */
2086 /* Count all the other sentinels. */
2087 di = dictGetIterator(master->sentinels);
2088 while((de = dictNext(di)) != NULL) {
2089 sentinelRedisInstance *ri = dictGetVal(de);
2090
2091 if (ri->flags & SRI_MASTER_DOWN) quorum++;
2092 }
2093 dictReleaseIterator(di);
2094 if (quorum >= master->quorum) odown = 1;
2095 }
2096
2097 /* Set the flag accordingly to the outcome. */
2098 if (odown) {
2099 if ((master->flags & SRI_O_DOWN) == 0) {
2100 sentinelEvent(REDIS_WARNING,"+odown",master,"%@ #quorum %d/%d",
2101 quorum, master->quorum);
2102 master->flags |= SRI_O_DOWN;
2103 master->o_down_since_time = mstime();
2104 }
2105 } else {
2106 if (master->flags & SRI_O_DOWN) {
2107 sentinelEvent(REDIS_WARNING,"-odown",master,"%@");
2108 master->flags &= ~SRI_O_DOWN;
2109 }
2110 }
2111}
2112
2113/* Receive the SENTINEL is-master-down-by-addr reply, see the
2114 * sentinelAskMasterStateToOtherSentinels() function for more information. */
2115void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) {
2116 sentinelRedisInstance *ri = c->data;
2117 redisReply *r;
2118
2119 if (ri) ri->pending_commands--;
2120 if (!reply || !ri) return;
2121 r = reply;
2122
2123 /* Ignore every error or unexpected reply.
2124 * Note that if the command returns an error for any reason we'll
2125 * end clearing the SRI_MASTER_DOWN flag for timeout anyway. */
2126 if (r->type == REDIS_REPLY_ARRAY && r->elements == 2 &&
2127 r->element[0]->type == REDIS_REPLY_INTEGER &&
2128 r->element[1]->type == REDIS_REPLY_STRING)
2129 {
2130 ri->last_master_down_reply_time = mstime();
2131 if (r->element[0]->integer == 1) {
2132 ri->flags |= SRI_MASTER_DOWN;
2133 } else {
2134 ri->flags &= ~SRI_MASTER_DOWN;
2135 }
2136 sdsfree(ri->leader);
2137 ri->leader = sdsnew(r->element[1]->str);
2138 }
2139}
2140
2141/* If we think (subjectively) the master is down, we start sending
2142 * SENTINEL IS-MASTER-DOWN-BY-ADDR requests to other sentinels
2143 * in order to get the replies that allow to reach the quorum and
2144 * possibly also mark the master as objectively down. */
2145void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master) {
2146 dictIterator *di;
2147 dictEntry *de;
2148
2149 di = dictGetIterator(master->sentinels);
2150 while((de = dictNext(di)) != NULL) {
2151 sentinelRedisInstance *ri = dictGetVal(de);
2152 mstime_t elapsed = mstime() - ri->last_master_down_reply_time;
2153 char port[32];
2154 int retval;
2155
2156 /* If the master state from other sentinel is too old, we clear it. */
2157 if (elapsed > SENTINEL_INFO_VALIDITY_TIME) {
2158 ri->flags &= ~SRI_MASTER_DOWN;
2159 sdsfree(ri->leader);
2160 ri->leader = NULL;
2161 }
2162
2163 /* Only ask if master is down to other sentinels if:
2164 *
2165 * 1) We believe it is down, or there is a failover in progress.
2166 * 2) Sentinel is connected.
2167 * 3) We did not received the info within SENTINEL_ASK_PERIOD ms. */
2168 if ((master->flags & (SRI_S_DOWN|SRI_FAILOVER_IN_PROGRESS)) == 0)
2169 continue;
2170 if (ri->flags & SRI_DISCONNECTED) continue;
2171 if (mstime() - ri->last_master_down_reply_time < SENTINEL_ASK_PERIOD)
2172 continue;
2173
2174 /* Ask */
2175 ll2string(port,sizeof(port),master->addr->port);
2176 retval = redisAsyncCommand(ri->cc,
2177 sentinelReceiveIsMasterDownReply, NULL,
2178 "SENTINEL is-master-down-by-addr %s %s",
2179 master->addr->ip, port);
2180 if (retval == REDIS_OK) ri->pending_commands++;
2181 }
2182 dictReleaseIterator(di);
2183}
2184
2185/* =============================== FAILOVER ================================= */
2186
2187/* Given a master get the "subjective leader", that is, among all the sentinels
2188 * with given characteristics, the one with the lexicographically smaller
2189 * runid. The characteristics required are:
2190 *
2191 * 1) Has SRI_CAN_FAILOVER flag.
2192 * 2) Is not disconnected.
2193 * 3) Recently answered to our ping (no longer than
2194 * SENTINEL_INFO_VALIDITY_TIME milliseconds ago).
2195 *
2196 * The function returns a pointer to an sds string representing the runid of the
2197 * leader sentinel instance (from our point of view). Otherwise NULL is
2198 * returned if there are no suitable sentinels.
2199 */
2200
/* qsort() comparison function for arrays of C strings holding run IDs.
 *
 * 'a' and 'b' point to the array elements, that is, to pointers to the
 * strings. The order is the case insensitive one defined by strcasecmp():
 * the return value is negative, zero or positive when the first run ID
 * sorts before, equal to, or after the second one.
 *
 * The arguments are only read, so constness is preserved instead of being
 * cast away. */
int compareRunID(const void *a, const void *b) {
    const char * const *lhs = a;
    const char * const *rhs = b;

    return strcasecmp(*lhs, *rhs);
}
2205
2206char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master) {
2207 dictIterator *di;
2208 dictEntry *de;
2209 char **instance =
2210 zmalloc(sizeof(char*)*(dictSize(master->sentinels)+1));
2211 int instances = 0;
2212 char *leader = NULL;
2213
2214 if (master->flags & SRI_CAN_FAILOVER) {
2215 /* Add myself if I'm a Sentinel that can failover this master. */
2216 instance[instances++] = server.runid;
2217 }
2218
2219 di = dictGetIterator(master->sentinels);
2220 while((de = dictNext(di)) != NULL) {
2221 sentinelRedisInstance *ri = dictGetVal(de);
2222 mstime_t lag = mstime() - ri->last_avail_time;
2223
2224 if (lag > SENTINEL_INFO_VALIDITY_TIME ||
2225 !(ri->flags & SRI_CAN_FAILOVER) ||
2226 (ri->flags & SRI_DISCONNECTED) ||
2227 ri->runid == NULL)
2228 continue;
2229 instance[instances++] = ri->runid;
2230 }
2231 dictReleaseIterator(di);
2232
2233 /* If we have at least one instance passing our checks, order the array
2234 * by runid. */
2235 if (instances) {
2236 qsort(instance,instances,sizeof(char*),compareRunID);
2237 leader = sdsnew(instance[0]);
2238 }
2239 zfree(instance);
2240 return leader;
2241}
2242
/* A candidate leader together with the number of votes it received. */
struct sentinelLeader {
    char *runid;            /* Run ID of the candidate. */
    unsigned long votes;    /* Votes counted for this candidate. */
};
2247
2248/* Helper function for sentinelGetObjectiveLeader, increment the counter
2249 * relative to the specified runid. */
2250void sentinelObjectiveLeaderIncr(dict *counters, char *runid) {
2251 dictEntry *de = dictFind(counters,runid);
2252 uint64_t oldval;
2253
2254 if (de) {
2255 oldval = dictGetUnsignedIntegerVal(de);
2256 dictSetUnsignedIntegerVal(de,oldval+1);
2257 } else {
2258 de = dictAddRaw(counters,runid);
2259 redisAssert(de != NULL);
2260 dictSetUnsignedIntegerVal(de,1);
2261 }
2262}
2263
2264/* Scan all the Sentinels attached to this master to check what is the
2265 * most voted leader among Sentinels. */
2266char *sentinelGetObjectiveLeader(sentinelRedisInstance *master) {
2267 dict *counters;
2268 dictIterator *di;
2269 dictEntry *de;
2270 unsigned int voters = 0, voters_quorum;
2271 char *myvote;
2272 char *winner = NULL;
2273
2274 redisAssert(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS));
2275 counters = dictCreate(&leaderVotesDictType,NULL);
2276
2277 /* Count my vote. */
2278 myvote = sentinelGetSubjectiveLeader(master);
2279 if (myvote) {
2280 sentinelObjectiveLeaderIncr(counters,myvote);
2281 voters++;
2282 }
2283
2284 /* Count other sentinels votes */
2285 di = dictGetIterator(master->sentinels);
2286 while((de = dictNext(di)) != NULL) {
2287 sentinelRedisInstance *ri = dictGetVal(de);
2288 if (ri->leader == NULL) continue;
2289 /* If the failover is not already in progress we are only interested
2290 * in Sentinels that believe the master is down. Otherwise the leader
2291 * selection is useful for the "failover-takedown" when the original
2292 * leader fails. In that case we consider all the voters. */
2293 if (!(master->flags & SRI_FAILOVER_IN_PROGRESS) &&
2294 !(ri->flags & SRI_MASTER_DOWN)) continue;
2295 sentinelObjectiveLeaderIncr(counters,ri->leader);
2296 voters++;
2297 }
2298 dictReleaseIterator(di);
2299 voters_quorum = voters/2+1;
2300
2301 /* Check what's the winner. For the winner to win, it needs two conditions:
2302 * 1) Absolute majority between voters (50% + 1).
2303 * 2) And anyway at least master->quorum votes. */
2304 {
2305 uint64_t max_votes = 0; /* Max votes so far. */
2306
2307 di = dictGetIterator(counters);
2308 while((de = dictNext(di)) != NULL) {
2309 uint64_t votes = dictGetUnsignedIntegerVal(de);
2310
2311 if (max_votes < votes) {
2312 max_votes = votes;
2313 winner = dictGetKey(de);
2314 }
2315 }
2316 dictReleaseIterator(di);
2317 if (winner && (max_votes < voters_quorum || max_votes < master->quorum))
2318 winner = NULL;
2319 }
2320 winner = winner ? sdsnew(winner) : NULL;
2321 sdsfree(myvote);
2322 dictRelease(counters);
2323 return winner;
2324}
2325
2326/* Setup the master state to start a failover as a leader.
2327 *
2328 * State can be either:
2329 *
2330 * SENTINEL_FAILOVER_STATE_WAIT_START: starts a failover from scratch.
2331 * SENTINEL_FAILOVER_STATE_RECONF_SLAVES: takedown a failed failover.
2332 */
2333void sentinelStartFailover(sentinelRedisInstance *master, int state) {
2334 redisAssert(master->flags & SRI_MASTER);
2335 redisAssert(state == SENTINEL_FAILOVER_STATE_WAIT_START ||
2336 state == SENTINEL_FAILOVER_STATE_RECONF_SLAVES);
2337
2338 master->failover_state = state;
2339 master->flags |= SRI_FAILOVER_IN_PROGRESS|SRI_I_AM_THE_LEADER;
2340 sentinelEvent(REDIS_WARNING,"+failover-triggered",master,"%@");
2341
2342 /* Pick a random delay if it's a fresh failover (WAIT_START), and not
2343 * a recovery of a failover started by another sentinel. */
2344 if (master->failover_state == SENTINEL_FAILOVER_STATE_WAIT_START) {
2345 master->failover_start_time = mstime() +
2346 SENTINEL_FAILOVER_FIXED_DELAY +
2347 (rand() % SENTINEL_FAILOVER_MAX_RANDOM_DELAY);
2348 sentinelEvent(REDIS_WARNING,"+failover-state-wait-start",master,
2349 "%@ #starting in %lld milliseconds",
2350 master->failover_start_time-mstime());
2351 }
2352 master->failover_state_change_time = mstime();
2353}
2354
2355/* This function checks if there are the conditions to start the failover,
2356 * that is:
2357 *
2358 * 1) Enough time has passed since O_DOWN.
2359 * 2) The master is marked as SRI_CAN_FAILOVER, so we can failover it.
2360 * 3) We are the objectively leader for this master.
2361 *
2362 * If the conditions are met we flag the master as SRI_FAILOVER_IN_PROGRESS
2363 * and SRI_I_AM_THE_LEADER.
2364 */
2365void sentinelStartFailoverIfNeeded(sentinelRedisInstance *master) {
2366 char *leader;
2367 int isleader;
2368
2369 /* We can't failover if the master is not in O_DOWN state or if
2370 * there is not already a failover in progress (to perform the
2371 * takedown if the leader died) or if this Sentinel is not allowed
2372 * to start a failover. */
2373 if (!(master->flags & SRI_CAN_FAILOVER) ||
2374 !(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) return;
2375
2376 leader = sentinelGetObjectiveLeader(master);
2377 isleader = leader && strcasecmp(leader,server.runid) == 0;
2378 sdsfree(leader);
2379
2380 /* If I'm not the leader, I can't failover for sure. */
2381 if (!isleader) return;
2382
2383 /* If the failover is already in progress there are two options... */
2384 if (master->flags & SRI_FAILOVER_IN_PROGRESS) {
2385 if (master->flags & SRI_I_AM_THE_LEADER) {
2386 /* 1) I'm flagged as leader so I already started the failover.
2387 * Just return. */
2388 return;
2389 } else {
2390 mstime_t elapsed = mstime() - master->failover_state_change_time;
2391
2392 /* 2) I'm the new leader, but I'm not flagged as leader in the
2393 * master: I did not started the failover, but the original
2394 * leader has no longer the leadership.
2395 *
2396 * In this case if the failover appears to be lagging
2397 * for at least 25% of the configured failover timeout,
2398 * I can assume I can take control. Otherwise
2399 * it's better to return and wait more. */
2400 if (elapsed < (master->failover_timeout/4)) return;
2401 sentinelEvent(REDIS_WARNING,"+failover-takedown",master,"%@");
2402 /* We have already an elected slave if we are in
2403 * FAILOVER_IN_PROGRESS state, that is, the slave that we
2404 * observed turning into a master. */
2405 sentinelStartFailover(master,SENTINEL_FAILOVER_STATE_RECONF_SLAVES);
2406 /* As an observer we flagged all the slaves as RECONF_SENT but
2407 * now we are in charge of actually sending the reconfiguration
2408 * command so let's clear this flag for all the instances. */
2409 sentinelDelFlagsToDictOfRedisInstances(master->slaves,
2410 SRI_RECONF_SENT);
2411 }
2412 } else {
2413 /* Brand new failover as SRI_FAILOVER_IN_PROGRESS was not set.
2414 *
2415 * Do we have a slave to promote? Otherwise don't start a failover
2416 * at all. */
2417 if (sentinelSelectSlave(master) == NULL) return;
2418 sentinelStartFailover(master,SENTINEL_FAILOVER_STATE_WAIT_START);
2419 }
2420}
2421
2422/* Select a suitable slave to promote. The current algorithm only uses
2423 * the following parameters:
2424 *
2425 * 1) None of the following conditions: S_DOWN, O_DOWN, DISCONNECTED.
2426 * 2) last_avail_time more recent than SENTINEL_INFO_VALIDITY_TIME.
2427 * 3) info_refresh more recent than SENTINEL_INFO_VALIDITY_TIME.
2428 * 4) master_link_down_time no more than:
2429 * (now - master->s_down_since_time) + (master->down_after_period * 10).
2430 *
2431 * Among all the slaves matching the above conditions we select the slave
2432 * with lower slave_priority. If priority is the same we select the slave
2433 * with lexicographically smaller runid.
2434 *
2435 * The function returns the pointer to the selected slave, otherwise
2436 * NULL if no suitable slave was found.
2437 */
2438
2439int compareSlavesForPromotion(const void *a, const void *b) {
2440 sentinelRedisInstance **sa = (sentinelRedisInstance **)a,
2441 **sb = (sentinelRedisInstance **)b;
2442 if ((*sa)->slave_priority != (*sb)->slave_priority)
2443 return (*sa)->slave_priority - (*sb)->slave_priority;
2444 return strcasecmp((*sa)->runid,(*sb)->runid);
2445}
2446
2447sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) {
2448 sentinelRedisInstance **instance =
2449 zmalloc(sizeof(instance[0])*dictSize(master->slaves));
2450 sentinelRedisInstance *selected = NULL;
2451 int instances = 0;
2452 dictIterator *di;
2453 dictEntry *de;
2454 mstime_t max_master_down_time = 0;
2455
2456 if (master->flags & SRI_S_DOWN)
2457 max_master_down_time += mstime() - master->s_down_since_time;
2458 max_master_down_time += master->down_after_period * 10;
2459
2460 di = dictGetIterator(master->slaves);
2461 while((de = dictNext(di)) != NULL) {
2462 sentinelRedisInstance *slave = dictGetVal(de);
2463 mstime_t info_validity_time = mstime()-SENTINEL_INFO_VALIDITY_TIME;
2464
2465 if (slave->flags & (SRI_S_DOWN|SRI_O_DOWN|SRI_DISCONNECTED)) continue;
2466 if (slave->last_avail_time < info_validity_time) continue;
2467
2468 /* If the master is in SDOWN state we get INFO for slaves every second.
2469 * Otherwise we get it with the usual period so we need to account for
2470 * a larger delay. */
2471 if ((master->flags & SRI_S_DOWN) == 0)
2472 info_validity_time -= SENTINEL_INFO_PERIOD;
2473 if (slave->info_refresh < info_validity_time) continue;
2474 if (slave->master_link_down_time > max_master_down_time) continue;
2475 instance[instances++] = slave;
2476 }
2477 dictReleaseIterator(di);
2478 if (instances) {
2479 qsort(instance,instances,sizeof(sentinelRedisInstance*),
2480 compareSlavesForPromotion);
2481 selected = instance[0];
2482 }
2483 zfree(instance);
2484 return selected;
2485}
2486
2487/* ---------------- Failover state machine implementation ------------------- */
2488void sentinelFailoverWaitStart(sentinelRedisInstance *ri) {
2489 /* If we in "wait start" but the master is no longer in ODOWN nor in
2490 * SDOWN condition we abort the failover. This is important as it
2491 * prevents a useless failover in a a notable case of netsplit, where
2492 * the senitnels are split from the redis instances. In this case
2493 * the failover will not start while there is the split because no
2494 * good slave can be reached. However when the split is resolved, we
2495 * can go to waitstart if the slave is back rechable a few milliseconds
2496 * before the master is. In that case when the master is back online
2497 * we cancel the failover. */
2498 if ((ri->flags & (SRI_S_DOWN|SRI_O_DOWN|SRI_FORCE_FAILOVER)) == 0) {
2499 sentinelEvent(REDIS_WARNING,"-failover-abort-master-is-back",
2500 ri,"%@");
2501 sentinelAbortFailover(ri);
2502 return;
2503 }
2504
2505 /* Start the failover going to the next state if enough time has
2506 * elapsed. */
2507 if (mstime() >= ri->failover_start_time) {
2508 ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
2509 ri->failover_state_change_time = mstime();
2510 sentinelEvent(REDIS_WARNING,"+failover-state-select-slave",ri,"%@");
2511 }
2512}
2513
2514void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) {
2515 sentinelRedisInstance *slave = sentinelSelectSlave(ri);
2516
2517 if (slave == NULL) {
2518 sentinelEvent(REDIS_WARNING,"-failover-abort-no-good-slave",ri,"%@");
2519 sentinelAbortFailover(ri);
2520 } else {
2521 sentinelEvent(REDIS_WARNING,"+selected-slave",slave,"%@");
2522 slave->flags |= SRI_PROMOTED;
2523 ri->promoted_slave = slave;
2524 ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE;
2525 ri->failover_state_change_time = mstime();
2526 sentinelEvent(REDIS_NOTICE,"+failover-state-send-slaveof-noone",
2527 slave, "%@");
2528 }
2529}
2530
2531void sentinelFailoverSendSlaveOfNoOne(sentinelRedisInstance *ri) {
2532 int retval;
2533
2534 if (ri->promoted_slave->flags & SRI_DISCONNECTED) return;
2535
2536 /* Send SLAVEOF NO ONE command to turn the slave into a master.
2537 * We actually register a generic callback for this command as we don't
2538 * really care about the reply. We check if it worked indirectly observing
2539 * if INFO returns a different role (master instead of slave). */
2540 retval = redisAsyncCommand(ri->promoted_slave->cc,
2541 sentinelDiscardReplyCallback, NULL, "SLAVEOF NO ONE");
2542 if (retval != REDIS_OK) return;
2543 ri->promoted_slave->pending_commands++;
2544 sentinelEvent(REDIS_NOTICE, "+failover-state-wait-promotion",
2545 ri->promoted_slave,"%@");
2546 ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION;
2547 ri->failover_state_change_time = mstime();
2548}
2549
2550/* We actually wait for promotion indirectly checking with INFO when the
2551 * slave turns into a master. */
2552void sentinelFailoverWaitPromotion(sentinelRedisInstance *ri) {
2553 mstime_t elapsed = mstime() - ri->failover_state_change_time;
2554
2555 if (elapsed >= SENTINEL_PROMOTION_RETRY_PERIOD) {
2556 sentinelEvent(REDIS_WARNING,"-promotion-timeout",ri->promoted_slave,
2557 "%@");
2558 sentinelEvent(REDIS_WARNING,"+failover-state-select-slave",ri,"%@");
2559 ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
2560 ri->failover_state_change_time = mstime();
2561 ri->promoted_slave->flags &= ~SRI_PROMOTED;
2562 ri->promoted_slave = NULL;
2563 }
2564}
2565
2566void sentinelFailoverDetectEnd(sentinelRedisInstance *master) {
2567 int not_reconfigured = 0, timeout = 0;
2568 dictIterator *di;
2569 dictEntry *de;
2570 mstime_t elapsed = mstime() - master->failover_state_change_time;
2571
2572 /* We can't consider failover finished if the promoted slave is
2573 * not reachable. */
2574 if (master->promoted_slave == NULL ||
2575 master->promoted_slave->flags & SRI_S_DOWN) return;
2576
2577 /* The failover terminates once all the reachable slaves are properly
2578 * configured. */
2579 di = dictGetIterator(master->slaves);
2580 while((de = dictNext(di)) != NULL) {
2581 sentinelRedisInstance *slave = dictGetVal(de);
2582
2583 if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
2584 if (slave->flags & SRI_S_DOWN) continue;
2585 not_reconfigured++;
2586 }
2587 dictReleaseIterator(di);
2588
2589 /* Force end of failover on timeout. */
2590 if (elapsed > master->failover_timeout) {
2591 not_reconfigured = 0;
2592 timeout = 1;
2593 sentinelEvent(REDIS_WARNING,"+failover-end-for-timeout",master,"%@");
2594 }
2595
2596 if (not_reconfigured == 0) {
2597 int role = (master->flags & SRI_I_AM_THE_LEADER) ? SENTINEL_LEADER :
2598 SENTINEL_OBSERVER;
2599
2600 sentinelEvent(REDIS_WARNING,"+failover-end",master,"%@");
2601 master->failover_state = SENTINEL_FAILOVER_STATE_UPDATE_CONFIG;
2602 master->failover_state_change_time = mstime();
2603 sentinelCallClientReconfScript(master,role,"end",master->addr,
2604 master->promoted_slave->addr);
2605 }
2606
2607 /* If I'm the leader it is a good idea to send a best effort SLAVEOF
2608 * command to all the slaves still not reconfigured to replicate with
2609 * the new master. */
2610 if (timeout && (master->flags & SRI_I_AM_THE_LEADER)) {
2611 dictIterator *di;
2612 dictEntry *de;
2613 char master_port[32];
2614
2615 ll2string(master_port,sizeof(master_port),
2616 master->promoted_slave->addr->port);
2617
2618 di = dictGetIterator(master->slaves);
2619 while((de = dictNext(di)) != NULL) {
2620 sentinelRedisInstance *slave = dictGetVal(de);
2621 int retval;
2622
2623 if (slave->flags &
2624 (SRI_RECONF_DONE|SRI_RECONF_SENT|SRI_DISCONNECTED)) continue;
2625
2626 retval = redisAsyncCommand(slave->cc,
2627 sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
2628 master->promoted_slave->addr->ip,
2629 master_port);
2630 if (retval == REDIS_OK) {
2631 sentinelEvent(REDIS_NOTICE,"+slave-reconf-sent-be",slave,"%@");
2632 slave->flags |= SRI_RECONF_SENT;
2633 }
2634 }
2635 dictReleaseIterator(di);
2636 }
2637}
2638
2639/* Send SLAVE OF <new master address> to all the remaining slaves that
2640 * still don't appear to have the configuration updated. */
2641void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) {
2642 dictIterator *di;
2643 dictEntry *de;
2644 int in_progress = 0;
2645
2646 di = dictGetIterator(master->slaves);
2647 while((de = dictNext(di)) != NULL) {
2648 sentinelRedisInstance *slave = dictGetVal(de);
2649
2650 if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG))
2651 in_progress++;
2652 }
2653 dictReleaseIterator(di);
2654
2655 di = dictGetIterator(master->slaves);
2656 while(in_progress < master->parallel_syncs &&
2657 (de = dictNext(di)) != NULL)
2658 {
2659 sentinelRedisInstance *slave = dictGetVal(de);
2660 int retval;
2661 char master_port[32];
2662
2663 /* Skip the promoted slave, and already configured slaves. */
2664 if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
2665
2666 /* Clear the SRI_RECONF_SENT flag if too much time elapsed without
2667 * the slave moving forward to the next state. */
2668 if ((slave->flags & SRI_RECONF_SENT) &&
2669 (mstime() - slave->slave_reconf_sent_time) >
2670 SENTINEL_SLAVE_RECONF_RETRY_PERIOD)
2671 {
2672 sentinelEvent(REDIS_NOTICE,"-slave-reconf-sent-timeout",slave,"%@");
2673 slave->flags &= ~SRI_RECONF_SENT;
2674 }
2675
2676 /* Nothing to do for instances that are disconnected or already
2677 * in RECONF_SENT state. */
2678 if (slave->flags & (SRI_DISCONNECTED|SRI_RECONF_SENT|SRI_RECONF_INPROG))
2679 continue;
2680
2681 /* Send SLAVEOF <new master>. */
2682 ll2string(master_port,sizeof(master_port),
2683 master->promoted_slave->addr->port);
2684 retval = redisAsyncCommand(slave->cc,
2685 sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
2686 master->promoted_slave->addr->ip,
2687 master_port);
2688 if (retval == REDIS_OK) {
2689 slave->flags |= SRI_RECONF_SENT;
2690 slave->pending_commands++;
2691 slave->slave_reconf_sent_time = mstime();
2692 sentinelEvent(REDIS_NOTICE,"+slave-reconf-sent",slave,"%@");
2693 in_progress++;
2694 }
2695 }
2696 dictReleaseIterator(di);
2697 sentinelFailoverDetectEnd(master);
2698}
2699
2700/* This function is called when the slave is in
2701 * SENTINEL_FAILOVER_STATE_UPDATE_CONFIG state. In this state we need
2702 * to remove it from the master table and add the promoted slave instead.
2703 *
2704 * If there are no promoted slaves as this instance is unique, we remove
2705 * and re-add it with the same address to trigger a complete state
2706 * refresh. */
2707void sentinelFailoverSwitchToPromotedSlave(sentinelRedisInstance *master) {
2708 sentinelRedisInstance *ref = master->promoted_slave ?
2709 master->promoted_slave : master;
2710
2711 sentinelEvent(REDIS_WARNING,"+switch-master",master,"%s %s %d %s %d",
2712 master->name, master->addr->ip, master->addr->port,
2713 ref->addr->ip, ref->addr->port);
2714
2715 sentinelResetMasterAndChangeAddress(master,ref->addr->ip,ref->addr->port);
2716}
2717
2718void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
2719 redisAssert(ri->flags & SRI_MASTER);
2720
2721 if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;
2722
2723 switch(ri->failover_state) {
2724 case SENTINEL_FAILOVER_STATE_WAIT_START:
2725 sentinelFailoverWaitStart(ri);
2726 break;
2727 case SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
2728 sentinelFailoverSelectSlave(ri);
2729 break;
2730 case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:
2731 sentinelFailoverSendSlaveOfNoOne(ri);
2732 break;
2733 case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
2734 sentinelFailoverWaitPromotion(ri);
2735 break;
2736 case SENTINEL_FAILOVER_STATE_RECONF_SLAVES:
2737 sentinelFailoverReconfNextSlave(ri);
2738 break;
2739 case SENTINEL_FAILOVER_STATE_DETECT_END:
2740 sentinelFailoverDetectEnd(ri);
2741 break;
2742 }
2743}
2744
2745/* Abort a failover in progress with the following steps:
2746 * 1) If this instance is the leaer send a SLAVEOF command to all the already
2747 * reconfigured slaves if any to configure them to replicate with the
2748 * original master.
2749 * 2) For both leaders and observers: clear the failover flags and state in
2750 * the master instance.
2751 * 3) If there is already a promoted slave and we are the leader, and this
2752 * slave is not DISCONNECTED, try to reconfigure it to replicate
2753 * back to the master as well, sending a best effort SLAVEOF command.
2754 */
2755void sentinelAbortFailover(sentinelRedisInstance *ri) {
2756 char master_port[32];
2757 dictIterator *di;
2758 dictEntry *de;
2759 int sentinel_role;
2760
2761 redisAssert(ri->flags & SRI_FAILOVER_IN_PROGRESS);
2762 ll2string(master_port,sizeof(master_port),ri->addr->port);
2763
2764 /* Clear failover related flags from slaves.
2765 * Also if we are the leader make sure to send SLAVEOF commands to all the
2766 * already reconfigured slaves in order to turn them back into slaves of
2767 * the original master. */
2768 di = dictGetIterator(ri->slaves);
2769 while((de = dictNext(di)) != NULL) {
2770 sentinelRedisInstance *slave = dictGetVal(de);
2771 if ((ri->flags & SRI_I_AM_THE_LEADER) &&
2772 !(slave->flags & SRI_DISCONNECTED) &&
2773 (slave->flags & (SRI_PROMOTED|SRI_RECONF_SENT|SRI_RECONF_INPROG|
2774 SRI_RECONF_DONE)))
2775 {
2776 int retval;
2777
2778 retval = redisAsyncCommand(slave->cc,
2779 sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
2780 ri->addr->ip,
2781 master_port);
2782 if (retval == REDIS_OK)
2783 sentinelEvent(REDIS_NOTICE,"-slave-reconf-undo",slave,"%@");
2784 }
2785 slave->flags &= ~(SRI_RECONF_SENT|SRI_RECONF_INPROG|SRI_RECONF_DONE);
2786 }
2787 dictReleaseIterator(di);
2788
2789 sentinel_role = (ri->flags & SRI_I_AM_THE_LEADER) ? SENTINEL_LEADER :
2790 SENTINEL_OBSERVER;
2791 ri->flags &= ~(SRI_FAILOVER_IN_PROGRESS|SRI_I_AM_THE_LEADER|SRI_FORCE_FAILOVER);
2792 ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
2793 ri->failover_state_change_time = mstime();
2794 if (ri->promoted_slave) {
2795 sentinelCallClientReconfScript(ri,sentinel_role,"abort",
2796 ri->promoted_slave->addr,ri->addr);
2797 ri->promoted_slave->flags &= ~SRI_PROMOTED;
2798 ri->promoted_slave = NULL;
2799 }
2800}
2801
2802/* The following is called only for master instances and will abort the
2803 * failover process if:
2804 *
2805 * 1) The failover is in progress.
2806 * 2) We already promoted a slave.
2807 * 3) The promoted slave is in extended SDOWN condition.
2808 */
2809void sentinelAbortFailoverIfNeeded(sentinelRedisInstance *ri) {
2810 /* Failover is in progress? Do we have a promoted slave? */
2811 if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS) || !ri->promoted_slave) return;
2812
2813 /* Is the promoted slave into an extended SDOWN state? */
2814 if (!(ri->promoted_slave->flags & SRI_S_DOWN) ||
2815 (mstime() - ri->promoted_slave->s_down_since_time) <
2816 (ri->down_after_period * SENTINEL_EXTENDED_SDOWN_MULTIPLIER)) return;
2817
2818 sentinelEvent(REDIS_WARNING,"-failover-abort-x-sdown",ri->promoted_slave,"%@");
2819 sentinelAbortFailover(ri);
2820}
2821
/* ======================== SENTINEL timer handler ==========================
 * This is the "main" of our Sentinel, since Sentinel is completely non
 * blocking by design. The function is called every second.
 * -------------------------------------------------------------------------- */
2826
2827/* Perform scheduled operations for the specified Redis instance. */
2828void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
2829 /* ========== MONITORING HALF ============ */
2830 /* Every kind of instance */
2831 sentinelReconnectInstance(ri);
2832 sentinelPingInstance(ri);
2833
2834 /* Masters and slaves */
2835 if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
2836 /* Nothing so far. */
2837 }
2838
2839 /* Only masters */
2840 if (ri->flags & SRI_MASTER) {
2841 sentinelAskMasterStateToOtherSentinels(ri);
2842 }
2843
2844 /* ============== ACTING HALF ============= */
2845 /* We don't proceed with the acting half if we are in TILT mode.
2846 * TILT happens when we find something odd with the time, like a
2847 * sudden change in the clock. */
2848 if (sentinel.tilt) {
2849 if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return;
2850 sentinel.tilt = 0;
2851 sentinelEvent(REDIS_WARNING,"-tilt",NULL,"#tilt mode exited");
2852 }
2853
2854 /* Every kind of instance */
2855 sentinelCheckSubjectivelyDown(ri);
2856
2857 /* Masters and slaves */
2858 if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
2859 /* Nothing so far. */
2860 }
2861
2862 /* Only masters */
2863 if (ri->flags & SRI_MASTER) {
2864 sentinelCheckObjectivelyDown(ri);
2865 sentinelStartFailoverIfNeeded(ri);
2866 sentinelFailoverStateMachine(ri);
2867 sentinelAbortFailoverIfNeeded(ri);
2868 }
2869}
2870
2871/* Perform scheduled operations for all the instances in the dictionary.
2872 * Recursively call the function against dictionaries of slaves. */
2873void sentinelHandleDictOfRedisInstances(dict *instances) {
2874 dictIterator *di;
2875 dictEntry *de;
2876 sentinelRedisInstance *switch_to_promoted = NULL;
2877
2878 /* There are a number of things we need to perform against every master. */
2879 di = dictGetIterator(instances);
2880 while((de = dictNext(di)) != NULL) {
2881 sentinelRedisInstance *ri = dictGetVal(de);
2882
2883 sentinelHandleRedisInstance(ri);
2884 if (ri->flags & SRI_MASTER) {
2885 sentinelHandleDictOfRedisInstances(ri->slaves);
2886 sentinelHandleDictOfRedisInstances(ri->sentinels);
2887 if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) {
2888 switch_to_promoted = ri;
2889 }
2890 }
2891 }
2892 if (switch_to_promoted)
2893 sentinelFailoverSwitchToPromotedSlave(switch_to_promoted);
2894 dictReleaseIterator(di);
2895}
2896
2897/* This function checks if we need to enter the TITL mode.
2898 *
2899 * The TILT mode is entered if we detect that between two invocations of the
2900 * timer interrupt, a negative amount of time, or too much time has passed.
2901 * Note that we expect that more or less just 100 milliseconds will pass
2902 * if everything is fine. However we'll see a negative number or a
2903 * difference bigger than SENTINEL_TILT_TRIGGER milliseconds if one of the
2904 * following conditions happen:
2905 *
2906 * 1) The Sentiel process for some time is blocked, for every kind of
2907 * random reason: the load is huge, the computer was freezed for some time
2908 * in I/O or alike, the process was stopped by a signal. Everything.
2909 * 2) The system clock was altered significantly.
2910 *
2911 * Under both this conditions we'll see everything as timed out and failing
2912 * without good reasons. Instead we enter the TILT mode and wait
2913 * for SENTIENL_TILT_PERIOD to elapse before starting to act again.
2914 *
2915 * During TILT time we still collect information, we just do not act. */
2916void sentinelCheckTiltCondition(void) {
2917 mstime_t now = mstime();
2918 mstime_t delta = now - sentinel.previous_time;
2919
2920 if (delta < 0 || delta > SENTINEL_TILT_TRIGGER) {
2921 sentinel.tilt = 1;
2922 sentinel.tilt_start_time = mstime();
2923 sentinelEvent(REDIS_WARNING,"+tilt",NULL,"#tilt mode entered");
2924 }
2925 sentinel.previous_time = mstime();
2926}
2927
2928void sentinelTimer(void) {
2929 sentinelCheckTiltCondition();
2930 sentinelHandleDictOfRedisInstances(sentinel.masters);
2931 sentinelRunPendingScripts();
2932 sentinelCollectTerminatedScripts();
2933 sentinelKillTimedoutScripts();
2934}
2935