]> git.saurik.com Git - redis.git/blob - src/sentinel.c
Sentinel: abort failover when in wait-start if master is back.
[redis.git] / src / sentinel.c
1 /* Redis Sentinel implementation
2 * -----------------------------
3 *
4 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * * Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * * Neither the name of Redis nor the names of its contributors may be used
16 * to endorse or promote products derived from this software without
17 * specific prior written permission.
18 *
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 * POSSIBILITY OF SUCH DAMAGE.
30 */
31
32 #include "redis.h"
33 #include "hiredis.h"
34 #include "async.h"
35
36 #include <ctype.h>
37 #include <arpa/inet.h>
38 #include <sys/socket.h>
39 #include <sys/wait.h>
40
41 extern char **environ;
42
/* TCP port stored into server.port by initSentinelConfig(). */
#define REDIS_SENTINEL_PORT 26379

/* ======================== Sentinel global state =========================== */

typedef long long mstime_t; /* millisecond time type. */
48
/* Address object, used to describe an ip:port pair.
 * Instances are created by createSentinelAddr() and destroyed by
 * releaseSentinelAddr(). */
typedef struct sentinelAddr {
    char *ip;   /* sds string holding the resolved address. */
    int port;   /* Port number, validated to be in the 1..65535 range. */
} sentinelAddr;
54
/* Bit flags stored in sentinelRedisInstance->flags (SRI stands for
 * "Sentinel Redis Instance"). Exactly which of MASTER / SLAVE / SENTINEL is
 * set decides in which table createSentinelRedisInstance() stores the
 * instance. */
#define SRI_MASTER (1<<0)
#define SRI_SLAVE (1<<1)
#define SRI_SENTINEL (1<<2)
#define SRI_DISCONNECTED (1<<3) /* Set on every newly created instance. */
#define SRI_S_DOWN (1<<4) /* Subjectively down (no quorum). */
#define SRI_O_DOWN (1<<5) /* Objectively down (quorum reached). */
#define SRI_MASTER_DOWN (1<<6) /* A Sentinel with this flag set thinks that
                                  its master is down. */
/* SRI_CAN_FAILOVER when set in an SRI_MASTER instance means that we are
 * allowed to perform the failover for this master.
 * When set in a SRI_SENTINEL instance means that sentinel is allowed to
 * perform the failover on its master. */
#define SRI_CAN_FAILOVER (1<<7)
#define SRI_FAILOVER_IN_PROGRESS (1<<8) /* Failover is in progress for
                                           this master. */
#define SRI_I_AM_THE_LEADER (1<<9) /* We are the leader for this master. */
#define SRI_PROMOTED (1<<10) /* Slave selected for promotion. */
#define SRI_RECONF_SENT (1<<11) /* SLAVEOF <newmaster> sent. */
#define SRI_RECONF_INPROG (1<<12) /* Slave synchronization in progress. */
#define SRI_RECONF_DONE (1<<13) /* Slave synchronized with new master. */
76
/* Default periods, timeouts and limits.
 * NOTE(review): the time values look like milliseconds (the ones used in this
 * part of the file are stored into mstime_t fields) -- confirm for the
 * constants that are only used further down in the file. */
#define SENTINEL_INFO_PERIOD 10000
#define SENTINEL_PING_PERIOD 1000
#define SENTINEL_ASK_PERIOD 1000
#define SENTINEL_PUBLISH_PERIOD 5000
#define SENTINEL_DOWN_AFTER_PERIOD 30000
#define SENTINEL_HELLO_CHANNEL "__sentinel__:hello"
#define SENTINEL_TILT_TRIGGER 2000
#define SENTINEL_TILT_PERIOD (SENTINEL_PING_PERIOD*30)
#define SENTINEL_DEFAULT_SLAVE_PRIORITY 100
#define SENTINEL_PROMOTION_RETRY_PERIOD 30000
#define SENTINEL_SLAVE_RECONF_RETRY_PERIOD 10000
#define SENTINEL_DEFAULT_PARALLEL_SYNCS 1
#define SENTINEL_MIN_LINK_RECONNECT_PERIOD 15000
#define SENTINEL_DEFAULT_FAILOVER_TIMEOUT (60*15*1000)
#define SENTINEL_MAX_PENDING_COMMANDS 100
#define SENTINEL_EXTENDED_SDOWN_MULTIPLIER 10

/* For how many milliseconds is a piece of information considered valid?
 * This applies for instance to the replies to
 * SENTINEL IS-MASTER-DOWN-BY-ADDR. */
#define SENTINEL_INFO_VALIDITY_TIME 5000
#define SENTINEL_FAILOVER_FIXED_DELAY 5000
#define SENTINEL_FAILOVER_MAX_RANDOM_DELAY 10000
99
/* States of the failover state machine, stored in
 * sentinelRedisInstance->failover_state. */
#define SENTINEL_FAILOVER_STATE_NONE 0 /* No failover in progress. */
#define SENTINEL_FAILOVER_STATE_WAIT_START 1 /* Wait for failover_start_time*/
#define SENTINEL_FAILOVER_STATE_SELECT_SLAVE 2 /* Select slave to promote */
#define SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE 3 /* Slave -> Master */
#define SENTINEL_FAILOVER_STATE_WAIT_PROMOTION 4 /* Wait slave to change role */
#define SENTINEL_FAILOVER_STATE_RECONF_SLAVES 5 /* SLAVEOF newmaster */
#define SENTINEL_FAILOVER_STATE_WAIT_NEXT_SLAVE 6 /* wait replication */
#define SENTINEL_FAILOVER_STATE_ALERT_CLIENTS 7 /* Run user script. */
#define SENTINEL_FAILOVER_STATE_WAIT_ALERT_SCRIPT 8 /* Wait script exec. */
#define SENTINEL_FAILOVER_STATE_DETECT_END 9 /* Check for failover end. */
#define SENTINEL_FAILOVER_STATE_UPDATE_CONFIG 10 /* Monitor promoted slave. */
112
/* Values of sentinelRedisInstance->slave_master_link_status. */
#define SENTINEL_MASTER_LINK_STATUS_UP 0
#define SENTINEL_MASTER_LINK_STATUS_DOWN 1

/* Generic flags that can be used with different functions. */
#define SENTINEL_NO_FLAGS 0
#define SENTINEL_GENERATE_EVENT 1

/* Script execution flags (sentinelScriptJob->flags) and limits. */
#define SENTINEL_SCRIPT_NONE 0
#define SENTINEL_SCRIPT_RUNNING 1
#define SENTINEL_SCRIPT_MAX_QUEUE 256 /* Max jobs kept in scripts_queue. */
#define SENTINEL_SCRIPT_MAX_RUNNING 16 /* Max children running at once. */
#define SENTINEL_SCRIPT_MAX_RUNTIME 60000 /* 60 seconds max exec time. */
#define SENTINEL_SCRIPT_MAX_RETRY 10
#define SENTINEL_SCRIPT_RETRY_DELAY 30000 /* 30 seconds between retries. */
128
/* Everything Sentinel knows about one monitored instance: a master, one of
 * its slaves, or another Sentinel. The role is encoded in 'flags'.
 * Instances are created by createSentinelRedisInstance() and destroyed by
 * releaseSentinelRedisInstance(). */
typedef struct sentinelRedisInstance {
    int flags; /* See SRI_... defines */
    char *name; /* For masters, the master name from the point of view of
                   this sentinel. For slaves and sentinels it is generated
                   as hostname:port. */
    char *runid; /* run ID of this instance. */
    sentinelAddr *addr; /* Address (ip:port) of this instance. */
    redisAsyncContext *cc; /* Hiredis context for commands. */
    redisAsyncContext *pc; /* Hiredis context for Pub / Sub. */
    int pending_commands; /* Number of commands sent waiting for a reply. */
    mstime_t cc_conn_time; /* cc connection time. */
    mstime_t pc_conn_time; /* pc connection time. */
    mstime_t pc_last_activity; /* Last time we received any message. */
    mstime_t last_avail_time; /* Last time the instance replied to ping with
                                 a reply we consider valid. */
    mstime_t last_pong_time; /* Last time the instance replied to ping,
                                whatever the reply was. That's used to check
                                if the link is idle and must be reconnected. */
    mstime_t last_pub_time; /* Last time we sent hello via Pub/Sub. */
    mstime_t last_hello_time; /* Only used if SRI_SENTINEL is set. Last time
                                 we received an hello from this Sentinel
                                 via Pub/Sub. */
    mstime_t last_master_down_reply_time; /* Time of last reply to
                                             SENTINEL is-master-down command. */
    mstime_t s_down_since_time; /* Subjectively down since time. */
    mstime_t o_down_since_time; /* Objectively down since time. */
    mstime_t down_after_period; /* Consider it down after that period. */
    mstime_t info_refresh; /* Time at which we received INFO output from it.
                              0 means INFO was never received so far. */

    /* Master specific. */
    dict *sentinels; /* Other sentinels monitoring the same master. */
    dict *slaves; /* Slaves for this master instance. */
    int quorum; /* Number of sentinels that need to agree on failure. */
    int parallel_syncs; /* How many slaves to reconfigure at same time. */

    /* Slave specific. */
    mstime_t master_link_down_time; /* Slave replication link down time. */
    int slave_priority; /* Slave priority according to its INFO output. */
    mstime_t slave_reconf_sent_time; /* Time at which we sent SLAVE OF <new> */
    struct sentinelRedisInstance *master; /* Master instance if SRI_SLAVE is set. */
    char *slave_master_host; /* Master host as reported by INFO */
    int slave_master_port; /* Master port as reported by INFO */
    int slave_master_link_status; /* Master link status as reported by INFO */
    /* Failover */
    char *leader; /* If this is a master instance, this is the runid of
                     the Sentinel that should perform the failover. If
                     this is a Sentinel, this is the runid of the Sentinel
                     that this other Sentinel is voting as leader.
                     This field is valid only if SRI_MASTER_DOWN is
                     set on the Sentinel instance. */
    int failover_state; /* See SENTINEL_FAILOVER_STATE_* defines. */
    mstime_t failover_state_change_time;
    mstime_t failover_start_time; /* When to start to failover if leader. */
    mstime_t failover_timeout; /* Max time to refresh failover state. */
    struct sentinelRedisInstance *promoted_slave; /* Promoted slave instance. */
    /* Scripts executed to notify admin or reconfigure clients: when they
     * are set to NULL no script is executed. */
    char *notification_script;
    char *client_reconfig_script;
} sentinelRedisInstance;
187
/* Main state: the single global 'sentinel' object, initialized by
 * initSentinel(). */
struct sentinelState {
    dict *masters; /* Dictionary of master sentinelRedisInstances.
                      Key is the instance name, value is the
                      sentinelRedisInstance structure pointer. */
    int tilt; /* Are we in TILT mode? */
    int running_scripts; /* Number of scripts in execution right now. */
    mstime_t tilt_start_time; /* When TILT started. */
    mstime_t previous_time; /* Time last time we ran the time handler. */
    list *scripts_queue; /* Queue of user scripts to execute; values are
                            sentinelScriptJob pointers. */
} sentinel;
199
/* A script execution job, stored as a value in sentinel.scripts_queue. */
typedef struct sentinelScriptJob {
    int flags; /* Script job flags: SENTINEL_SCRIPT_* */
    int retry_num; /* Number of times we tried to execute it. */
    char **argv; /* Arguments to call the script: a NULL terminated array
                    of sds strings, argv[0] being the script path. */
    mstime_t start_time; /* Script execution time if the script is running,
                            otherwise 0 if we are allowed to retry the
                            execution at any time. If the script is not
                            running and it's not 0, it means: do not run
                            before the specified time. */
    pid_t pid; /* Script execution pid. */
} sentinelScriptJob;
212
213 /* ======================= hiredis ae.c adapters =============================
214 * Note: this implementation is taken from hiredis/adapters/ae.h, however
215 * we have our modified copy for Sentinel in order to use our allocator
216 * and to have full control over how the adapter works. */
217
/* Glue object linking one hiredis async context to the ae event loop.
 * It is allocated by redisAeAttach() and freed by redisAeCleanup(). */
typedef struct redisAeEvents {
    redisAsyncContext *context; /* The hiredis context being served. */
    aeEventLoop *loop;          /* Event loop the file events belong to. */
    int fd;                     /* Socket of the context. */
    int reading, writing;       /* 1 if the read / write event is installed. */
} redisAeEvents;
224
225 static void redisAeReadEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
226 ((void)el); ((void)fd); ((void)mask);
227
228 redisAeEvents *e = (redisAeEvents*)privdata;
229 redisAsyncHandleRead(e->context);
230 }
231
232 static void redisAeWriteEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
233 ((void)el); ((void)fd); ((void)mask);
234
235 redisAeEvents *e = (redisAeEvents*)privdata;
236 redisAsyncHandleWrite(e->context);
237 }
238
239 static void redisAeAddRead(void *privdata) {
240 redisAeEvents *e = (redisAeEvents*)privdata;
241 aeEventLoop *loop = e->loop;
242 if (!e->reading) {
243 e->reading = 1;
244 aeCreateFileEvent(loop,e->fd,AE_READABLE,redisAeReadEvent,e);
245 }
246 }
247
248 static void redisAeDelRead(void *privdata) {
249 redisAeEvents *e = (redisAeEvents*)privdata;
250 aeEventLoop *loop = e->loop;
251 if (e->reading) {
252 e->reading = 0;
253 aeDeleteFileEvent(loop,e->fd,AE_READABLE);
254 }
255 }
256
257 static void redisAeAddWrite(void *privdata) {
258 redisAeEvents *e = (redisAeEvents*)privdata;
259 aeEventLoop *loop = e->loop;
260 if (!e->writing) {
261 e->writing = 1;
262 aeCreateFileEvent(loop,e->fd,AE_WRITABLE,redisAeWriteEvent,e);
263 }
264 }
265
266 static void redisAeDelWrite(void *privdata) {
267 redisAeEvents *e = (redisAeEvents*)privdata;
268 aeEventLoop *loop = e->loop;
269 if (e->writing) {
270 e->writing = 0;
271 aeDeleteFileEvent(loop,e->fd,AE_WRITABLE);
272 }
273 }
274
275 static void redisAeCleanup(void *privdata) {
276 redisAeEvents *e = (redisAeEvents*)privdata;
277 redisAeDelRead(privdata);
278 redisAeDelWrite(privdata);
279 zfree(e);
280 }
281
282 static int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac) {
283 redisContext *c = &(ac->c);
284 redisAeEvents *e;
285
286 /* Nothing should be attached when something is already attached */
287 if (ac->ev.data != NULL)
288 return REDIS_ERR;
289
290 /* Create container for context and r/w events */
291 e = (redisAeEvents*)zmalloc(sizeof(*e));
292 e->context = ac;
293 e->loop = loop;
294 e->fd = c->fd;
295 e->reading = e->writing = 0;
296
297 /* Register functions to start/stop listening for events */
298 ac->ev.addRead = redisAeAddRead;
299 ac->ev.delRead = redisAeDelRead;
300 ac->ev.addWrite = redisAeAddWrite;
301 ac->ev.delWrite = redisAeDelWrite;
302 ac->ev.cleanup = redisAeCleanup;
303 ac->ev.data = e;
304
305 return REDIS_OK;
306 }
307
308 /* ============================= Prototypes ================================= */
309
310 void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status);
311 void sentinelDisconnectCallback(const redisAsyncContext *c, int status);
312 void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata);
313 sentinelRedisInstance *sentinelGetMasterByName(char *name);
314 char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master);
315 char *sentinelGetObjectiveLeader(sentinelRedisInstance *master);
316 int yesnotoi(char *s);
317 void sentinelDisconnectInstanceFromContext(const redisAsyncContext *c);
318 void sentinelKillLink(sentinelRedisInstance *ri, redisAsyncContext *c);
319 const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri);
320 void sentinelAbortFailover(sentinelRedisInstance *ri);
321 void sentinelEvent(int level, char *type, sentinelRedisInstance *ri, const char *fmt, ...);
322 sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master);
323 void sentinelScheduleScriptExecution(char *path, ...);
324
325 /* ========================= Dictionary types =============================== */
326
327 unsigned int dictSdsHash(const void *key);
328 int dictSdsKeyCompare(void *privdata, const void *key1, const void *key2);
329 void releaseSentinelRedisInstance(sentinelRedisInstance *ri);
330
331 void dictInstancesValDestructor (void *privdata, void *obj) {
332 releaseSentinelRedisInstance(obj);
333 }
334
/* Instance name (sds) -> instance (sentinelRedisInstance pointer)
 *
 * Used for the sentinel.masters table and for the ->slaves and ->sentinels
 * tables of every instance. Removing an entry releases the instance (value
 * destructor), while the key is not freed by the dictionary: it is the
 * instance ->name string, freed when the instance is released. */
dictType instancesDictType = {
    dictSdsHash, /* hash function */
    NULL, /* key dup */
    NULL, /* val dup */
    dictSdsKeyCompare, /* key compare */
    NULL, /* key destructor */
    dictInstancesValDestructor /* val destructor */
};
347
/* Instance runid (sds) -> votes (long casted to void*)
 *
 * This is used by the sentinelGetObjectiveLeader() function in order to
 * count the votes and understand who is the leader. Neither keys nor values
 * are freed by the dictionary (no destructors are set). */
dictType leaderVotesDictType = {
    dictSdsHash, /* hash function */
    NULL, /* key dup */
    NULL, /* val dup */
    dictSdsKeyCompare, /* key compare */
    NULL, /* key destructor */
    NULL /* val destructor */
};
360
361 /* =========================== Initialization =============================== */
362
363 void sentinelCommand(redisClient *c);
364
/* The only commands available in Sentinel mode: initSentinel() empties the
 * normal command table and registers exactly these entries by name.
 * NOTE(review): the fields after the implementation pointer follow the
 * layout of struct redisCommand in redis.h -- the third one is presumably
 * the arity; confirm there. */
struct redisCommand sentinelcmds[] = {
    {"ping",pingCommand,1,"",0,NULL,0,0,0,0,0},
    {"sentinel",sentinelCommand,-2,"",0,NULL,0,0,0,0,0},
    {"subscribe",subscribeCommand,-2,"",0,NULL,0,0,0,0,0},
    {"unsubscribe",unsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
    {"psubscribe",psubscribeCommand,-2,"",0,NULL,0,0,0,0,0},
    {"punsubscribe",punsubscribeCommand,-1,"",0,NULL,0,0,0,0,0}
};
373
374 /* This function overwrites a few normal Redis config default with Sentinel
375 * specific defaults. */
376 void initSentinelConfig(void) {
377 server.port = REDIS_SENTINEL_PORT;
378 }
379
380 /* Perform the Sentinel mode initialization. */
381 void initSentinel(void) {
382 int j;
383
384 /* Remove usual Redis commands from the command table, then just add
385 * the SENTINEL command. */
386 dictEmpty(server.commands);
387 for (j = 0; j < sizeof(sentinelcmds)/sizeof(sentinelcmds[0]); j++) {
388 int retval;
389 struct redisCommand *cmd = sentinelcmds+j;
390
391 retval = dictAdd(server.commands, sdsnew(cmd->name), cmd);
392 redisAssert(retval == DICT_OK);
393 }
394
395 /* Initialize various data structures. */
396 sentinel.masters = dictCreate(&instancesDictType,NULL);
397 sentinel.tilt = 0;
398 sentinel.tilt_start_time = mstime();
399 sentinel.previous_time = mstime();
400 sentinel.running_scripts = 0;
401 sentinel.scripts_queue = listCreate();
402 }
403
404 /* ============================== sentinelAddr ============================== */
405
406 /* Create a sentinelAddr object and return it on success.
407 * On error NULL is returned and errno is set to:
408 * ENOENT: Can't resolve the hostname.
409 * EINVAL: Invalid port number.
410 */
411 sentinelAddr *createSentinelAddr(char *hostname, int port) {
412 char buf[32];
413 sentinelAddr *sa;
414
415 if (port <= 0 || port > 65535) {
416 errno = EINVAL;
417 return NULL;
418 }
419 if (anetResolve(NULL,hostname,buf) == ANET_ERR) {
420 errno = ENOENT;
421 return NULL;
422 }
423 sa = zmalloc(sizeof(*sa));
424 sa->ip = sdsnew(buf);
425 sa->port = port;
426 return sa;
427 }
428
429 /* Free a Sentinel address. Can't fail. */
430 void releaseSentinelAddr(sentinelAddr *sa) {
431 sdsfree(sa->ip);
432 zfree(sa);
433 }
434
435 /* =========================== Events notification ========================== */
436
437 /* Send an event to log, pub/sub, user notification script.
438 *
439 * 'level' is the log level for logging. Only REDIS_WARNING events will trigger
440 * the execution of the user notification script.
441 *
442 * 'type' is the message type, also used as a pub/sub channel name.
443 *
444 * 'ri', is the redis instance target of this event if applicable, and is
445 * used to obtain the path of the notification script to execute.
446 *
447 * The remaining arguments are printf-alike.
448 * If the format specifier starts with the two characters "%@" then ri is
449 * not NULL, and the message is prefixed with an instance identifier in the
450 * following format:
451 *
452 * <instance type> <instance name> <ip> <port>
453 *
454 * If the instance type is not master, than the additional string is
455 * added to specify the originating master:
456 *
457 * @ <master name> <master ip> <master port>
458 *
459 * Any other specifier after "%@" is processed by printf itself.
460 */
461 void sentinelEvent(int level, char *type, sentinelRedisInstance *ri,
462 const char *fmt, ...) {
463 va_list ap;
464 char msg[REDIS_MAX_LOGMSG_LEN];
465 robj *channel, *payload;
466
467 /* Handle %@ */
468 if (fmt[0] == '%' && fmt[1] == '@') {
469 sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
470 NULL : ri->master;
471
472 if (master) {
473 snprintf(msg, sizeof(msg), "%s %s %s %d @ %s %s %d",
474 sentinelRedisInstanceTypeStr(ri),
475 ri->name, ri->addr->ip, ri->addr->port,
476 master->name, master->addr->ip, master->addr->port);
477 } else {
478 snprintf(msg, sizeof(msg), "%s %s %s %d",
479 sentinelRedisInstanceTypeStr(ri),
480 ri->name, ri->addr->ip, ri->addr->port);
481 }
482 fmt += 2;
483 } else {
484 msg[0] = '\0';
485 }
486
487 /* Use vsprintf for the rest of the formatting if any. */
488 if (fmt[0] != '\0') {
489 va_start(ap, fmt);
490 vsnprintf(msg+strlen(msg), sizeof(msg)-strlen(msg), fmt, ap);
491 va_end(ap);
492 }
493
494 /* Log the message if the log level allows it to be logged. */
495 if (level >= server.verbosity)
496 redisLog(level,"%s %s",type,msg);
497
498 /* Publish the message via Pub/Sub if it's not a debugging one. */
499 if (level != REDIS_DEBUG) {
500 channel = createStringObject(type,strlen(type));
501 payload = createStringObject(msg,strlen(msg));
502 pubsubPublishMessage(channel,payload);
503 decrRefCount(channel);
504 decrRefCount(payload);
505 }
506
507 /* Call the notification script if applicable. */
508 if (level == REDIS_WARNING && ri != NULL) {
509 sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
510 ri : ri->master;
511 if (master->notification_script) {
512 sentinelScheduleScriptExecution(master->notification_script,
513 type,msg,NULL);
514 }
515 }
516 }
517
518 /* ============================ script execution ============================ */
519
520 /* Release a script job structure and all the associated data. */
521 void sentinelReleaseScriptJob(sentinelScriptJob *sj) {
522 int j = 0;
523
524 while(sj->argv[j]) sdsfree(sj->argv[j++]);
525 zfree(sj->argv);
526 zfree(sj);
527 }
528
529 #define SENTINEL_SCRIPT_MAX_ARGS 16
530 void sentinelScheduleScriptExecution(char *path, ...) {
531 va_list ap;
532 char *argv[SENTINEL_SCRIPT_MAX_ARGS+1];
533 int argc = 1;
534 sentinelScriptJob *sj;
535
536 va_start(ap, path);
537 while(argc < SENTINEL_SCRIPT_MAX_ARGS) {
538 argv[argc] = va_arg(ap,char*);
539 if (!argv[argc]) break;
540 argv[argc] = sdsnew(argv[argc]); /* Copy the string. */
541 argc++;
542 }
543 va_end(ap);
544 argv[0] = sdsnew(path);
545
546 sj = zmalloc(sizeof(*sj));
547 sj->flags = SENTINEL_SCRIPT_NONE;
548 sj->retry_num = 0;
549 sj->argv = zmalloc(sizeof(char*)*(argc+1));
550 sj->start_time = 0;
551 sj->pid = 0;
552 memcpy(sj->argv,argv,sizeof(char*)*(argc+1));
553
554 listAddNodeTail(sentinel.scripts_queue,sj);
555
556 /* Remove the oldest non running script if we already hit the limit. */
557 if (listLength(sentinel.scripts_queue) > SENTINEL_SCRIPT_MAX_QUEUE) {
558 listNode *ln;
559 listIter li;
560
561 listRewind(sentinel.scripts_queue,&li);
562 while ((ln = listNext(&li)) != NULL) {
563 sj = ln->value;
564
565 if (sj->flags & SENTINEL_SCRIPT_RUNNING) continue;
566 /* The first node is the oldest as we add on tail. */
567 listDelNode(sentinel.scripts_queue,ln);
568 sentinelReleaseScriptJob(sj);
569 break;
570 }
571 redisAssert(listLength(sentinel.scripts_queue) <=
572 SENTINEL_SCRIPT_MAX_QUEUE);
573 }
574 }
575
576 /* Lookup a script in the scripts queue via pid, and returns the list node
577 * (so that we can easily remove it from the queue if needed). */
578 listNode *sentinelGetScriptListNodeByPid(pid_t pid) {
579 listNode *ln;
580 listIter li;
581
582 listRewind(sentinel.scripts_queue,&li);
583 while ((ln = listNext(&li)) != NULL) {
584 sentinelScriptJob *sj = ln->value;
585
586 if ((sj->flags & SENTINEL_SCRIPT_RUNNING) && sj->pid == pid)
587 return ln;
588 }
589 return NULL;
590 }
591
592 /* Run pending scripts if we are not already at max number of running
593 * scripts. */
594 void sentinelRunPendingScripts(void) {
595 listNode *ln;
596 listIter li;
597 mstime_t now = mstime();
598
599 /* Find jobs that are not running and run them, from the top to the
600 * tail of the queue, so we run older jobs first. */
601 listRewind(sentinel.scripts_queue,&li);
602 while (sentinel.running_scripts < SENTINEL_SCRIPT_MAX_RUNNING &&
603 (ln = listNext(&li)) != NULL)
604 {
605 sentinelScriptJob *sj = ln->value;
606 pid_t pid;
607
608 /* Skip if already running. */
609 if (sj->flags & SENTINEL_SCRIPT_RUNNING) continue;
610
611 /* Skip if it's a retry, but not enough time has elapsed. */
612 if (sj->start_time && sj->start_time > now) continue;
613
614 sj->flags |= SENTINEL_SCRIPT_RUNNING;
615 sj->start_time = mstime();
616 sj->retry_num++;
617 pid = fork();
618
619 if (pid == -1) {
620 /* Parent (fork error).
621 * We report fork errors as signal 99, in order to unify the
622 * reporting with other kind of errors. */
623 sentinelEvent(REDIS_WARNING,"-script-error",NULL,
624 "%s %d %d", sj->argv[0], 99, 0);
625 sj->flags &= ~SENTINEL_SCRIPT_RUNNING;
626 sj->pid = 0;
627 } else if (pid == 0) {
628 /* Child */
629 execve(sj->argv[0],sj->argv,environ);
630 /* If we are here an error occurred. */
631 _exit(2); /* Don't retry execution. */
632 } else {
633 sentinel.running_scripts++;
634 sj->pid = pid;
635 sentinelEvent(REDIS_DEBUG,"+script-child",NULL,"%ld",(long)pid);
636 }
637 }
638 }
639
640 /* How much to delay the execution of a script that we need to retry after
641 * an error?
642 *
643 * We double the retry delay for every further retry we do. So for instance
644 * if RETRY_DELAY is set to 30 seconds and the max number of retries is 10
645 * starting from the second attempt to execute the script the delays are:
646 * 30 sec, 60 sec, 2 min, 4 min, 8 min, 16 min, 32 min, 64 min, 128 min. */
647 mstime_t sentinelScriptRetryDelay(int retry_num) {
648 mstime_t delay = SENTINEL_SCRIPT_RETRY_DELAY;
649
650 while (retry_num-- > 1) delay *= 2;
651 return delay;
652 }
653
654 /* Check for scripts that terminated, and remove them from the queue if the
655 * script terminated successfully. If instead the script was terminated by
656 * a signal, or returned exit code "1", it is scheduled to run again if
657 * the max number of retries did not already elapsed. */
658 void sentinelCollectTerminatedScripts(void) {
659 int statloc;
660 pid_t pid;
661
662 while ((pid = wait3(&statloc,WNOHANG,NULL)) > 0) {
663 int exitcode = WEXITSTATUS(statloc);
664 int bysignal = 0;
665 listNode *ln;
666 sentinelScriptJob *sj;
667
668 if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
669 sentinelEvent(REDIS_DEBUG,"-script-child",NULL,"%ld %d %d",
670 (long)pid, exitcode, bysignal);
671
672 ln = sentinelGetScriptListNodeByPid(pid);
673 if (ln == NULL) {
674 redisLog(REDIS_WARNING,"wait3() returned a pid (%ld) we can't find in our scripts execution queue!", (long)pid);
675 continue;
676 }
677 sj = ln->value;
678
679 /* If the script was terminated by a signal or returns an
680 * exit code of "1" (that means: please retry), we reschedule it
681 * if the max number of retries is not already reached. */
682 if ((bysignal || exitcode == 1) &&
683 sj->retry_num != SENTINEL_SCRIPT_MAX_RETRY)
684 {
685 sj->flags &= ~SENTINEL_SCRIPT_RUNNING;
686 sj->pid = 0;
687 sj->start_time = mstime() +
688 sentinelScriptRetryDelay(sj->retry_num);
689 } else {
690 /* Otherwise let's remove the script, but log the event if the
691 * execution did not terminated in the best of the ways. */
692 if (bysignal || exitcode != 0) {
693 sentinelEvent(REDIS_WARNING,"-script-error",NULL,
694 "%s %d %d", sj->argv[0], bysignal, exitcode);
695 }
696 listDelNode(sentinel.scripts_queue,ln);
697 sentinelReleaseScriptJob(sj);
698 sentinel.running_scripts--;
699 }
700 }
701 }
702
703 /* Kill scripts in timeout, they'll be collected by the
704 * sentinelCollectTerminatedScripts() function. */
705 void sentinelKillTimedoutScripts(void) {
706 listNode *ln;
707 listIter li;
708 mstime_t now = mstime();
709
710 listRewind(sentinel.scripts_queue,&li);
711 while ((ln = listNext(&li)) != NULL) {
712 sentinelScriptJob *sj = ln->value;
713
714 if (sj->flags & SENTINEL_SCRIPT_RUNNING &&
715 (now - sj->start_time) > SENTINEL_SCRIPT_MAX_RUNTIME)
716 {
717 sentinelEvent(REDIS_WARNING,"-script-timeout",NULL,"%s %ld",
718 sj->argv[0], (long)sj->pid);
719 kill(sj->pid,SIGKILL);
720 }
721 }
722 }
723
724 /* Implements SENTINEL PENDING-SCRIPTS command. */
725 void sentinelPendingScriptsCommand(redisClient *c) {
726 listNode *ln;
727 listIter li;
728
729 addReplyMultiBulkLen(c,listLength(sentinel.scripts_queue));
730 listRewind(sentinel.scripts_queue,&li);
731 while ((ln = listNext(&li)) != NULL) {
732 sentinelScriptJob *sj = ln->value;
733 int j = 0;
734
735 addReplyMultiBulkLen(c,10);
736
737 addReplyBulkCString(c,"argv");
738 while (sj->argv[j]) j++;
739 addReplyMultiBulkLen(c,j);
740 j = 0;
741 while (sj->argv[j]) addReplyBulkCString(c,sj->argv[j++]);
742
743 addReplyBulkCString(c,"flags");
744 addReplyBulkCString(c,
745 (sj->flags & SENTINEL_SCRIPT_RUNNING) ? "running" : "scheduled");
746
747 addReplyBulkCString(c,"pid");
748 addReplyBulkLongLong(c,sj->pid);
749
750 if (sj->flags & SENTINEL_SCRIPT_RUNNING) {
751 addReplyBulkCString(c,"run-time");
752 addReplyBulkLongLong(c,mstime() - sj->start_time);
753 } else {
754 mstime_t delay = sj->start_time ? (sj->start_time-mstime()) : 0;
755 if (delay < 0) delay = 0;
756 addReplyBulkCString(c,"run-delay");
757 addReplyBulkLongLong(c,delay);
758 }
759
760 addReplyBulkCString(c,"retry-num");
761 addReplyBulkLongLong(c,sj->retry_num);
762 }
763 }
764
765 /* ========================== sentinelRedisInstance ========================= */
766
767 /* Create a redis instance, the following fields must be populated by the
768 * caller if needed:
769 * runid: set to NULL but will be populated once INFO output is received.
770 * info_refresh: is set to 0 to mean that we never received INFO so far.
771 *
772 * If SRI_MASTER is set into initial flags the instance is added to
773 * sentinel.masters table.
774 *
775 * if SRI_SLAVE or SRI_SENTINEL is set then 'master' must be not NULL and the
776 * instance is added into master->slaves or master->sentinels table.
777 *
778 * If the instance is a slave or sentinel, the name parameter is ignored and
779 * is created automatically as hostname:port.
780 *
781 * The function fails if hostname can't be resolved or port is out of range.
782 * When this happens NULL is returned and errno is set accordingly to the
783 * createSentinelAddr() function.
784 *
785 * The function may also fail and return NULL with errno set to EBUSY if
786 * a master or slave with the same name already exists. */
787 sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char *hostname, int port, int quorum, sentinelRedisInstance *master) {
788 sentinelRedisInstance *ri;
789 sentinelAddr *addr;
790 dict *table;
791 char slavename[128], *sdsname;
792
793 redisAssert(flags & (SRI_MASTER|SRI_SLAVE|SRI_SENTINEL));
794 redisAssert((flags & SRI_MASTER) || master != NULL);
795
796 /* Check address validity. */
797 addr = createSentinelAddr(hostname,port);
798 if (addr == NULL) return NULL;
799
800 /* For slaves and sentinel we use ip:port as name. */
801 if (flags & (SRI_SLAVE|SRI_SENTINEL)) {
802 snprintf(slavename,sizeof(slavename),"%s:%d",hostname,port);
803 name = slavename;
804 }
805
806 /* Make sure the entry is not duplicated. This may happen when the same
807 * name for a master is used multiple times inside the configuration or
808 * if we try to add multiple times a slave or sentinel with same ip/port
809 * to a master. */
810 if (flags & SRI_MASTER) table = sentinel.masters;
811 else if (flags & SRI_SLAVE) table = master->slaves;
812 else if (flags & SRI_SENTINEL) table = master->sentinels;
813 sdsname = sdsnew(name);
814 if (dictFind(table,sdsname)) {
815 sdsfree(sdsname);
816 errno = EBUSY;
817 return NULL;
818 }
819
820 /* Create the instance object. */
821 ri = zmalloc(sizeof(*ri));
822 /* Note that all the instances are started in the disconnected state,
823 * the event loop will take care of connecting them. */
824 ri->flags = flags | SRI_DISCONNECTED;
825 ri->name = sdsname;
826 ri->runid = NULL;
827 ri->addr = addr;
828 ri->cc = NULL;
829 ri->pc = NULL;
830 ri->pending_commands = 0;
831 ri->cc_conn_time = 0;
832 ri->pc_conn_time = 0;
833 ri->pc_last_activity = 0;
834 ri->last_avail_time = mstime();
835 ri->last_pong_time = mstime();
836 ri->last_pub_time = mstime();
837 ri->last_hello_time = mstime();
838 ri->last_master_down_reply_time = mstime();
839 ri->s_down_since_time = 0;
840 ri->o_down_since_time = 0;
841 ri->down_after_period = master ? master->down_after_period :
842 SENTINEL_DOWN_AFTER_PERIOD;
843 ri->master_link_down_time = 0;
844 ri->slave_priority = SENTINEL_DEFAULT_SLAVE_PRIORITY;
845 ri->slave_reconf_sent_time = 0;
846 ri->slave_master_host = NULL;
847 ri->slave_master_port = 0;
848 ri->slave_master_link_status = SENTINEL_MASTER_LINK_STATUS_DOWN;
849 ri->sentinels = dictCreate(&instancesDictType,NULL);
850 ri->quorum = quorum;
851 ri->parallel_syncs = SENTINEL_DEFAULT_PARALLEL_SYNCS;
852 ri->master = master;
853 ri->slaves = dictCreate(&instancesDictType,NULL);
854 ri->info_refresh = 0;
855
856 /* Failover state. */
857 ri->leader = NULL;
858 ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
859 ri->failover_state_change_time = 0;
860 ri->failover_start_time = 0;
861 ri->failover_timeout = SENTINEL_DEFAULT_FAILOVER_TIMEOUT;
862 ri->promoted_slave = NULL;
863 ri->notification_script = NULL;
864 ri->client_reconfig_script = NULL;
865
866 /* Add into the right table. */
867 dictAdd(table, ri->name, ri);
868 return ri;
869 }
870
871 /* Release this instance and all its slaves, sentinels, hiredis connections.
872 * This function also takes care of unlinking the instance from the main
873 * masters table (if it is a master) or from its master sentinels/slaves table
874 * if it is a slave or sentinel. */
875 void releaseSentinelRedisInstance(sentinelRedisInstance *ri) {
876 /* Release all its slaves or sentinels if any. */
877 dictRelease(ri->sentinels);
878 dictRelease(ri->slaves);
879
880 /* Release hiredis connections. */
881 if (ri->cc) sentinelKillLink(ri,ri->cc);
882 if (ri->pc) sentinelKillLink(ri,ri->pc);
883
884 /* Free other resources. */
885 sdsfree(ri->name);
886 sdsfree(ri->runid);
887 sdsfree(ri->notification_script);
888 sdsfree(ri->client_reconfig_script);
889 sdsfree(ri->slave_master_host);
890 sdsfree(ri->leader);
891 releaseSentinelAddr(ri->addr);
892
893 /* Clear state into the master if needed. */
894 if ((ri->flags & SRI_SLAVE) && (ri->flags & SRI_PROMOTED) && ri->master)
895 ri->master->promoted_slave = NULL;
896
897 zfree(ri);
898 }
899
900 /* Lookup a slave in a master Redis instance, by ip and port. */
901 sentinelRedisInstance *sentinelRedisInstanceLookupSlave(
902 sentinelRedisInstance *ri, char *ip, int port)
903 {
904 sds key;
905 sentinelRedisInstance *slave;
906
907 redisAssert(ri->flags & SRI_MASTER);
908 key = sdscatprintf(sdsempty(),"%s:%d",ip,port);
909 slave = dictFetchValue(ri->slaves,key);
910 sdsfree(key);
911 return slave;
912 }
913
914 /* Return the name of the type of the instance as a string. */
915 const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri) {
916 if (ri->flags & SRI_MASTER) return "master";
917 else if (ri->flags & SRI_SLAVE) return "slave";
918 else if (ri->flags & SRI_SENTINEL) return "sentinel";
919 else return "unknown";
920 }
921
922 /* This function removes all the instances found in the dictionary of instances
923 * 'd', having either:
924 *
925 * 1) The same ip/port as specified.
926 * 2) The same runid.
927 *
928 * "1" and "2" don't need to verify at the same time, just one is enough.
929 * If "runid" is NULL it is not checked.
930 * Similarly if "ip" is NULL it is not checked.
931 *
932 * This function is useful because every time we add a new Sentinel into
933 * a master's Sentinels dictionary, we want to be very sure about not
934 * having duplicated instances for any reason. This is so important because
935 * we use those other sentinels in order to run our quorum protocol to
936 * understand if it's time to proceeed with the fail over.
937 *
938 * Making sure no duplication is possible we greately improve the robustness
939 * of the quorum (otherwise we may end counting the same instance multiple
940 * times for some reason).
941 *
942 * The function returns the number of Sentinels removed. */
943 int removeMatchingSentinelsFromMaster(sentinelRedisInstance *master, char *ip, int port, char *runid) {
944 dictIterator *di;
945 dictEntry *de;
946 int removed = 0;
947
948 di = dictGetSafeIterator(master->sentinels);
949 while((de = dictNext(di)) != NULL) {
950 sentinelRedisInstance *ri = dictGetVal(de);
951
952 if ((ri->runid && runid && strcmp(ri->runid,runid) == 0) ||
953 (ip && strcmp(ri->addr->ip,ip) == 0 && port == ri->addr->port))
954 {
955 dictDelete(master->sentinels,ri->name);
956 removed++;
957 }
958 }
959 dictReleaseIterator(di);
960 return removed;
961 }
962
963 /* Search an instance with the same runid, ip and port into a dictionary
964 * of instances. Return NULL if not found, otherwise return the instance
965 * pointer.
966 *
967 * runid or ip can be NULL. In such a case the search is performed only
968 * by the non-NULL field. */
969 sentinelRedisInstance *getSentinelRedisInstanceByAddrAndRunID(dict *instances, char *ip, int port, char *runid) {
970 dictIterator *di;
971 dictEntry *de;
972 sentinelRedisInstance *instance = NULL;
973
974 redisAssert(ip || runid); /* User must pass at least one search param. */
975 di = dictGetIterator(instances);
976 while((de = dictNext(di)) != NULL) {
977 sentinelRedisInstance *ri = dictGetVal(de);
978
979 if (runid && !ri->runid) continue;
980 if ((runid == NULL || strcmp(ri->runid, runid) == 0) &&
981 (ip == NULL || (strcmp(ri->addr->ip, ip) == 0 &&
982 ri->addr->port == port)))
983 {
984 instance = ri;
985 break;
986 }
987 }
988 dictReleaseIterator(di);
989 return instance;
990 }
991
992 /* Simple master lookup by name */
993 sentinelRedisInstance *sentinelGetMasterByName(char *name) {
994 sentinelRedisInstance *ri;
995 sds sdsname = sdsnew(name);
996
997 ri = dictFetchValue(sentinel.masters,sdsname);
998 sdsfree(sdsname);
999 return ri;
1000 }
1001
1002 /* Add the specified flags to all the instances in the specified dictionary. */
1003 void sentinelAddFlagsToDictOfRedisInstances(dict *instances, int flags) {
1004 dictIterator *di;
1005 dictEntry *de;
1006
1007 di = dictGetIterator(instances);
1008 while((de = dictNext(di)) != NULL) {
1009 sentinelRedisInstance *ri = dictGetVal(de);
1010 ri->flags |= flags;
1011 }
1012 dictReleaseIterator(di);
1013 }
1014
1015 /* Remove the specified flags to all the instances in the specified
1016 * dictionary. */
1017 void sentinelDelFlagsToDictOfRedisInstances(dict *instances, int flags) {
1018 dictIterator *di;
1019 dictEntry *de;
1020
1021 di = dictGetIterator(instances);
1022 while((de = dictNext(di)) != NULL) {
1023 sentinelRedisInstance *ri = dictGetVal(de);
1024 ri->flags &= ~flags;
1025 }
1026 dictReleaseIterator(di);
1027 }
1028
1029 /* Reset the state of a monitored master:
1030 * 1) Remove all slaves.
1031 * 2) Remove all sentinels.
1032 * 3) Remove most of the flags resulting from runtime operations.
1033 * 4) Reset timers to their default value.
1034 * 5) In the process of doing this undo the failover if in progress.
1035 * 6) Disconnect the connections with the master (will reconnect automatically).
1036 */
1037 void sentinelResetMaster(sentinelRedisInstance *ri, int flags) {
1038 redisAssert(ri->flags & SRI_MASTER);
1039 dictRelease(ri->slaves);
1040 dictRelease(ri->sentinels);
1041 ri->slaves = dictCreate(&instancesDictType,NULL);
1042 ri->sentinels = dictCreate(&instancesDictType,NULL);
1043 if (ri->cc) sentinelKillLink(ri,ri->cc);
1044 if (ri->pc) sentinelKillLink(ri,ri->pc);
1045 ri->flags &= SRI_MASTER|SRI_CAN_FAILOVER|SRI_DISCONNECTED;
1046 if (ri->leader) {
1047 sdsfree(ri->leader);
1048 ri->leader = NULL;
1049 }
1050 ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
1051 ri->failover_state_change_time = 0;
1052 ri->failover_start_time = 0;
1053 ri->promoted_slave = NULL;
1054 sdsfree(ri->runid);
1055 sdsfree(ri->slave_master_host);
1056 ri->runid = NULL;
1057 ri->slave_master_host = NULL;
1058 ri->last_avail_time = mstime();
1059 ri->last_pong_time = mstime();
1060 if (flags & SENTINEL_GENERATE_EVENT)
1061 sentinelEvent(REDIS_WARNING,"+reset-master",ri,"%@");
1062 }
1063
1064 /* Call sentinelResetMaster() on every master with a name matching the specified
1065 * pattern. */
1066 int sentinelResetMastersByPattern(char *pattern, int flags) {
1067 dictIterator *di;
1068 dictEntry *de;
1069 int reset = 0;
1070
1071 di = dictGetIterator(sentinel.masters);
1072 while((de = dictNext(di)) != NULL) {
1073 sentinelRedisInstance *ri = dictGetVal(de);
1074
1075 if (ri->name) {
1076 if (stringmatch(pattern,ri->name,0)) {
1077 sentinelResetMaster(ri,flags);
1078 reset++;
1079 }
1080 }
1081 }
1082 dictReleaseIterator(di);
1083 return reset;
1084 }
1085
1086 /* Reset the specified master with sentinelResetMaster(), and also change
1087 * the ip:port address, but take the name of the instance unmodified.
1088 *
1089 * This is used to handle the +switch-master and +redirect-to-master events.
1090 *
1091 * The function returns REDIS_ERR if the address can't be resolved for some
1092 * reason. Otherwise REDIS_OK is returned.
1093 *
1094 * TODO: make this reset so that original sentinels are re-added with
1095 * same ip / port / runid.
1096 */
1097
1098 int sentinelResetMasterAndChangeAddress(sentinelRedisInstance *master, char *ip, int port) {
1099 sentinelAddr *oldaddr, *newaddr;
1100
1101 newaddr = createSentinelAddr(ip,port);
1102 if (newaddr == NULL) return REDIS_ERR;
1103 sentinelResetMaster(master,SENTINEL_NO_FLAGS);
1104 oldaddr = master->addr;
1105 master->addr = newaddr;
1106 /* Release the old address at the end so we are safe even if the function
1107 * gets the master->addr->ip and master->addr->port as arguments. */
1108 releaseSentinelAddr(oldaddr);
1109 return REDIS_OK;
1110 }
1111
1112 /* ============================ Config handling ============================= */
1113 char *sentinelHandleConfiguration(char **argv, int argc) {
1114 sentinelRedisInstance *ri;
1115
1116 if (!strcasecmp(argv[0],"monitor") && argc == 5) {
1117 /* monitor <name> <host> <port> <quorum> */
1118 int quorum = atoi(argv[4]);
1119
1120 if (quorum <= 0) return "Quorum must be 1 or greater.";
1121 if (createSentinelRedisInstance(argv[1],SRI_MASTER,argv[2],
1122 atoi(argv[3]),quorum,NULL) == NULL)
1123 {
1124 switch(errno) {
1125 case EBUSY: return "Duplicated master name.";
1126 case ENOENT: return "Can't resolve master instance hostname.";
1127 case EINVAL: return "Invalid port number";
1128 }
1129 }
1130 } else if (!strcasecmp(argv[0],"down-after-milliseconds") && argc == 3) {
1131 /* down-after-milliseconds <name> <milliseconds> */
1132 ri = sentinelGetMasterByName(argv[1]);
1133 if (!ri) return "No such master with specified name.";
1134 ri->down_after_period = atoi(argv[2]);
1135 if (ri->down_after_period <= 0)
1136 return "negative or zero time parameter.";
1137 } else if (!strcasecmp(argv[0],"failover-timeout") && argc == 3) {
1138 /* failover-timeout <name> <milliseconds> */
1139 ri = sentinelGetMasterByName(argv[1]);
1140 if (!ri) return "No such master with specified name.";
1141 ri->failover_timeout = atoi(argv[2]);
1142 if (ri->failover_timeout <= 0)
1143 return "negative or zero time parameter.";
1144 } else if (!strcasecmp(argv[0],"can-failover") && argc == 3) {
1145 /* can-failover <name> <yes/no> */
1146 int yesno = yesnotoi(argv[2]);
1147
1148 ri = sentinelGetMasterByName(argv[1]);
1149 if (!ri) return "No such master with specified name.";
1150 if (yesno == -1) return "Argument must be either yes or no.";
1151 if (yesno)
1152 ri->flags |= SRI_CAN_FAILOVER;
1153 else
1154 ri->flags &= ~SRI_CAN_FAILOVER;
1155 } else if (!strcasecmp(argv[0],"parallel-syncs") && argc == 3) {
1156 /* parallel-syncs <name> <milliseconds> */
1157 ri = sentinelGetMasterByName(argv[1]);
1158 if (!ri) return "No such master with specified name.";
1159 ri->parallel_syncs = atoi(argv[2]);
1160 } else if (!strcasecmp(argv[0],"notification-script") && argc == 3) {
1161 /* notification-script <name> <path> */
1162 ri = sentinelGetMasterByName(argv[1]);
1163 if (!ri) return "No such master with specified name.";
1164 if (access(argv[2],X_OK) == -1)
1165 return "Notification script seems non existing or non executable.";
1166 ri->notification_script = sdsnew(argv[2]);
1167 } else if (!strcasecmp(argv[0],"client-reconfig-script") && argc == 3) {
1168 /* client-reconfig-script <name> <path> */
1169 ri = sentinelGetMasterByName(argv[1]);
1170 if (!ri) return "No such master with specified name.";
1171 if (access(argv[2],X_OK) == -1)
1172 return "Client reconfiguration script seems non existing or "
1173 "non executable.";
1174 ri->client_reconfig_script = sdsnew(argv[2]);
1175 } else {
1176 return "Unrecognized sentinel configuration statement.";
1177 }
1178 return NULL;
1179 }
1180
1181 /* ====================== hiredis connection handling ======================= */
1182
1183 /* Completely disconnect an hiredis link from an instance. */
1184 void sentinelKillLink(sentinelRedisInstance *ri, redisAsyncContext *c) {
1185 if (ri->cc == c) {
1186 ri->cc = NULL;
1187 ri->pending_commands = 0;
1188 }
1189 if (ri->pc == c) ri->pc = NULL;
1190 c->data = NULL;
1191 ri->flags |= SRI_DISCONNECTED;
1192 redisAsyncFree(c);
1193 }
1194
1195 /* This function takes an hiredis context that is in an error condition
1196 * and make sure to mark the instance as disconnected performing the
1197 * cleanup needed.
1198 *
1199 * Note: we don't free the hiredis context as hiredis will do it for us
1200 * for async conenctions. */
1201 void sentinelDisconnectInstanceFromContext(const redisAsyncContext *c) {
1202 sentinelRedisInstance *ri = c->data;
1203 int pubsub;
1204
1205 if (ri == NULL) return; /* The instance no longer exists. */
1206
1207 pubsub = (ri->pc == c);
1208 sentinelEvent(REDIS_DEBUG, pubsub ? "-pubsub-link" : "-cmd-link", ri,
1209 "%@ #%s", c->errstr);
1210 if (pubsub)
1211 ri->pc = NULL;
1212 else
1213 ri->cc = NULL;
1214 ri->flags |= SRI_DISCONNECTED;
1215 }
1216
1217 void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status) {
1218 if (status != REDIS_OK) {
1219 sentinelDisconnectInstanceFromContext(c);
1220 } else {
1221 sentinelRedisInstance *ri = c->data;
1222 int pubsub = (ri->pc == c);
1223
1224 sentinelEvent(REDIS_DEBUG, pubsub ? "+pubsub-link" : "+cmd-link", ri,
1225 "%@");
1226 }
1227 }
1228
1229 void sentinelDisconnectCallback(const redisAsyncContext *c, int status) {
1230 sentinelDisconnectInstanceFromContext(c);
1231 }
1232
1233 /* Create the async connections for the specified instance if the instance
1234 * is disconnected. Note that the SRI_DISCONNECTED flag is set even if just
1235 * one of the two links (commands and pub/sub) is missing. */
1236 void sentinelReconnectInstance(sentinelRedisInstance *ri) {
1237 if (!(ri->flags & SRI_DISCONNECTED)) return;
1238
1239 /* Commands connection. */
1240 if (ri->cc == NULL) {
1241 ri->cc = redisAsyncConnect(ri->addr->ip,ri->addr->port);
1242 if (ri->cc->err) {
1243 sentinelEvent(REDIS_DEBUG,"-cmd-link-reconnection",ri,"%@ #%s",
1244 ri->cc->errstr);
1245 sentinelKillLink(ri,ri->cc);
1246 } else {
1247 ri->cc_conn_time = mstime();
1248 ri->cc->data = ri;
1249 redisAeAttach(server.el,ri->cc);
1250 redisAsyncSetConnectCallback(ri->cc,
1251 sentinelLinkEstablishedCallback);
1252 redisAsyncSetDisconnectCallback(ri->cc,
1253 sentinelDisconnectCallback);
1254 }
1255 }
1256 /* Pub / Sub */
1257 if ((ri->flags & SRI_MASTER) && ri->pc == NULL) {
1258 ri->pc = redisAsyncConnect(ri->addr->ip,ri->addr->port);
1259 if (ri->pc->err) {
1260 sentinelEvent(REDIS_DEBUG,"-pubsub-link-reconnection",ri,"%@ #%s",
1261 ri->pc->errstr);
1262 sentinelKillLink(ri,ri->pc);
1263 } else {
1264 int retval;
1265
1266 ri->pc_conn_time = mstime();
1267 ri->pc->data = ri;
1268 redisAeAttach(server.el,ri->pc);
1269 redisAsyncSetConnectCallback(ri->pc,
1270 sentinelLinkEstablishedCallback);
1271 redisAsyncSetDisconnectCallback(ri->pc,
1272 sentinelDisconnectCallback);
1273 /* Now we subscribe to the Sentinels "Hello" channel. */
1274 retval = redisAsyncCommand(ri->pc,
1275 sentinelReceiveHelloMessages, NULL, "SUBSCRIBE %s",
1276 SENTINEL_HELLO_CHANNEL);
1277 if (retval != REDIS_OK) {
1278 /* If we can't subscribe, the Pub/Sub connection is useless
1279 * and we can simply disconnect it and try again. */
1280 sentinelKillLink(ri,ri->pc);
1281 return;
1282 }
1283 }
1284 }
1285 /* Clear the DISCONNECTED flags only if we have both the connections
1286 * (or just the commands connection if this is a slave or a
1287 * sentinel instance). */
1288 if (ri->cc && (ri->flags & (SRI_SLAVE|SRI_SENTINEL) || ri->pc))
1289 ri->flags &= ~SRI_DISCONNECTED;
1290 }
1291
1292 /* ======================== Redis instances pinging ======================== */
1293
1294 /* Process the INFO output from masters. */
1295 void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
1296 sds *lines;
1297 int numlines, j;
1298 int role = 0;
1299 int runid_changed = 0; /* true if runid changed. */
1300 int first_runid = 0; /* true if this is the first runid we receive. */
1301
1302 /* The following fields must be reset to a given value in the case they
1303 * are not found at all in the INFO output. */
1304 ri->master_link_down_time = 0;
1305
1306 /* Process line by line. */
1307 lines = sdssplitlen(info,strlen(info),"\r\n",2,&numlines);
1308 for (j = 0; j < numlines; j++) {
1309 sentinelRedisInstance *slave;
1310 sds l = lines[j];
1311
1312 /* run_id:<40 hex chars>*/
1313 if (sdslen(l) >= 47 && !memcmp(l,"run_id:",7)) {
1314 if (ri->runid == NULL) {
1315 ri->runid = sdsnewlen(l+7,40);
1316 first_runid = 1;
1317 } else {
1318 if (strncmp(ri->runid,l+7,40) != 0) {
1319 runid_changed = 1;
1320 sentinelEvent(REDIS_NOTICE,"+reboot",ri,"%@");
1321 sdsfree(ri->runid);
1322 ri->runid = sdsnewlen(l+7,40);
1323 }
1324 }
1325 }
1326
1327 /* slave0:<ip>,<port>,<state> */
1328 if ((ri->flags & SRI_MASTER) &&
1329 sdslen(l) >= 7 &&
1330 !memcmp(l,"slave",5) && isdigit(l[5]))
1331 {
1332 char *ip, *port, *end;
1333
1334 ip = strchr(l,':'); if (!ip) continue;
1335 ip++; /* Now ip points to start of ip address. */
1336 port = strchr(ip,','); if (!port) continue;
1337 *port = '\0'; /* nul term for easy access. */
1338 port++; /* Now port points to start of port number. */
1339 end = strchr(port,','); if (!end) continue;
1340 *end = '\0'; /* nul term for easy access. */
1341
1342 /* Check if we already have this slave into our table,
1343 * otherwise add it. */
1344 if (sentinelRedisInstanceLookupSlave(ri,ip,atoi(port)) == NULL) {
1345 if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,ip,
1346 atoi(port), ri->quorum,ri)) != NULL)
1347 {
1348 sentinelEvent(REDIS_NOTICE,"+slave",slave,"%@");
1349 }
1350 }
1351 }
1352
1353 /* master_link_down_since_seconds:<seconds> */
1354 if (sdslen(l) >= 32 &&
1355 !memcmp(l,"master_link_down_since_seconds",30))
1356 {
1357 ri->master_link_down_time = strtoll(l+31,NULL,10)*1000;
1358 }
1359
1360 /* role:<role> */
1361 if (!memcmp(l,"role:master",11)) role = SRI_MASTER;
1362 else if (!memcmp(l,"role:slave",10)) role = SRI_SLAVE;
1363
1364 if (role == SRI_SLAVE) {
1365 /* master_host:<host> */
1366 if (sdslen(l) >= 12 && !memcmp(l,"master_host:",12)) {
1367 sdsfree(ri->slave_master_host);
1368 ri->slave_master_host = sdsnew(l+12);
1369 }
1370
1371 /* master_port:<port> */
1372 if (sdslen(l) >= 12 && !memcmp(l,"master_port:",12))
1373 ri->slave_master_port = atoi(l+12);
1374
1375 /* master_link_status:<status> */
1376 if (sdslen(l) >= 19 && !memcmp(l,"master_link_status:",19)) {
1377 ri->slave_master_link_status =
1378 (strcasecmp(l+19,"up") == 0) ?
1379 SENTINEL_MASTER_LINK_STATUS_UP :
1380 SENTINEL_MASTER_LINK_STATUS_DOWN;
1381 }
1382 }
1383 }
1384 ri->info_refresh = mstime();
1385 sdsfreesplitres(lines,numlines);
1386
1387 if (sentinel.tilt) return;
1388
1389 /* Act if a master turned into a slave. */
1390 if ((ri->flags & SRI_MASTER) && role == SRI_SLAVE) {
1391 if (first_runid && ri->slave_master_host) {
1392 /* If it is the first time we receive INFO from it, but it's
1393 * a slave while it was configured as a master, we want to monitor
1394 * its master instead. */
1395 sentinelEvent(REDIS_WARNING,"+redirect-to-master",ri,
1396 "%s %s %d %s %d",
1397 ri->name, ri->addr->ip, ri->addr->port,
1398 ri->slave_master_host, ri->slave_master_port);
1399 sentinelResetMasterAndChangeAddress(ri,ri->slave_master_host,
1400 ri->slave_master_port);
1401 return;
1402 }
1403 }
1404
1405 /* Act if a slave turned into a master. */
1406 if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) {
1407 if (!(ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
1408 (runid_changed || first_runid))
1409 {
1410 /* If a slave turned into a master, but at the same time the
1411 * runid has changed, or it is simply the first time we see and
1412 * INFO output from this instance, this is a reboot with a wrong
1413 * configuration.
1414 *
1415 * Log the event and remove the slave. */
1416 int retval;
1417
1418 sentinelEvent(REDIS_WARNING,"-slave-restart-as-master",ri,"%@ #removing it from the attached slaves");
1419 retval = dictDelete(ri->master->slaves,ri->name);
1420 redisAssert(retval == REDIS_OK);
1421 return;
1422 } else if (ri->flags & SRI_PROMOTED) {
1423 /* If this is a promoted slave we can change state to the
1424 * failover state machine. */
1425 if (ri->master &&
1426 (ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
1427 (ri->master->flags & SRI_I_AM_THE_LEADER) &&
1428 (ri->master->failover_state ==
1429 SENTINEL_FAILOVER_STATE_WAIT_PROMOTION))
1430 {
1431 ri->master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
1432 ri->master->failover_state_change_time = mstime();
1433 sentinelEvent(REDIS_WARNING,"+promoted-slave",ri,"%@");
1434 sentinelEvent(REDIS_WARNING,"+failover-state-reconf-slaves",
1435 ri->master,"%@");
1436 }
1437 } else {
1438 /* Otherwise we interpret this as the start of the failover. */
1439 if (ri->master &&
1440 (ri->master->flags & SRI_FAILOVER_IN_PROGRESS) == 0)
1441 {
1442 ri->master->flags |= SRI_FAILOVER_IN_PROGRESS;
1443 sentinelEvent(REDIS_WARNING,"failover-detected",ri->master,"%@");
1444 ri->master->failover_state = SENTINEL_FAILOVER_STATE_DETECT_END;
1445 ri->master->failover_state_change_time = mstime();
1446 ri->master->promoted_slave = ri;
1447 ri->flags |= SRI_PROMOTED;
1448 /* We are an observer, so we can only assume that the leader
1449 * is reconfiguring the slave instances. For this reason we
1450 * set all the instances as RECONF_SENT waiting for progresses
1451 * on this side. */
1452 sentinelAddFlagsToDictOfRedisInstances(ri->master->slaves,
1453 SRI_RECONF_SENT);
1454 }
1455 }
1456 }
1457
1458 /* Detect if the slave that is in the process of being reconfigured
1459 * changed state. */
1460 if ((ri->flags & SRI_SLAVE) && role == SRI_SLAVE &&
1461 (ri->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)))
1462 {
1463 /* SRI_RECONF_SENT -> SRI_RECONF_INPROG. */
1464 if ((ri->flags & SRI_RECONF_SENT) &&
1465 ri->slave_master_host &&
1466 strcmp(ri->slave_master_host,
1467 ri->master->promoted_slave->addr->ip) == 0 &&
1468 ri->slave_master_port == ri->master->promoted_slave->addr->port)
1469 {
1470 ri->flags &= ~SRI_RECONF_SENT;
1471 ri->flags |= SRI_RECONF_INPROG;
1472 sentinelEvent(REDIS_NOTICE,"+slave-reconf-inprog",ri,"%@");
1473 }
1474
1475 /* SRI_RECONF_INPROG -> SRI_RECONF_DONE */
1476 if ((ri->flags & SRI_RECONF_INPROG) &&
1477 ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP)
1478 {
1479 ri->flags &= ~SRI_RECONF_INPROG;
1480 ri->flags |= SRI_RECONF_DONE;
1481 sentinelEvent(REDIS_NOTICE,"+slave-reconf-done",ri,"%@");
1482 /* If we are moving forward (a new slave is now configured)
1483 * we update the change_time as we are conceptually passing
1484 * to the next slave. */
1485 ri->failover_state_change_time = mstime();
1486 }
1487 }
1488 }
1489
1490 void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
1491 sentinelRedisInstance *ri = c->data;
1492 redisReply *r;
1493
1494 if (ri) ri->pending_commands--;
1495 if (!reply || !ri) return;
1496 r = reply;
1497
1498 if (r->type == REDIS_REPLY_STRING) {
1499 sentinelRefreshInstanceInfo(ri,r->str);
1500 }
1501 }
1502
1503 /* Just discard the reply. We use this when we are not monitoring the return
1504 * value of the command but its effects directly. */
1505 void sentinelDiscardReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
1506 sentinelRedisInstance *ri = c->data;
1507
1508 if (ri) ri->pending_commands--;
1509 }
1510
1511 void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
1512 sentinelRedisInstance *ri = c->data;
1513 redisReply *r;
1514
1515 if (ri) ri->pending_commands--;
1516 if (!reply || !ri) return;
1517 r = reply;
1518
1519 if (r->type == REDIS_REPLY_STATUS ||
1520 r->type == REDIS_REPLY_ERROR) {
1521 /* Update the "instance available" field only if this is an
1522 * acceptable reply. */
1523 if (strncmp(r->str,"PONG",4) == 0 ||
1524 strncmp(r->str,"LOADING",7) == 0 ||
1525 strncmp(r->str,"MASTERDOWN",10) == 0)
1526 {
1527 ri->last_avail_time = mstime();
1528 }
1529 }
1530 ri->last_pong_time = mstime();
1531 }
1532
1533 /* This is called when we get the reply about the PUBLISH command we send
1534 * to the master to advertise this sentinel. */
1535 void sentinelPublishReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
1536 sentinelRedisInstance *ri = c->data;
1537 redisReply *r;
1538
1539 if (ri) ri->pending_commands--;
1540 if (!reply || !ri) return;
1541 r = reply;
1542
1543 /* Only update pub_time if we actually published our message. Otherwise
1544 * we'll retry against in 100 milliseconds. */
1545 if (r->type != REDIS_REPLY_ERROR)
1546 ri->last_pub_time = mstime();
1547 }
1548
1549 /* This is our Pub/Sub callback for the Hello channel. It's useful in order
1550 * to discover other sentinels attached at the same master. */
1551 void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata) {
1552 sentinelRedisInstance *ri = c->data;
1553 redisReply *r;
1554
1555 if (!reply || !ri) return;
1556 r = reply;
1557
1558 /* Update the last activity in the pubsub channel. Note that since we
1559 * receive our messages as well this timestamp can be used to detect
1560 * if the link is probably diconnected even if it seems otherwise. */
1561 ri->pc_last_activity = mstime();
1562
1563 /* Sanity check in the reply we expect, so that the code that follows
1564 * can avoid to check for details. */
1565 if (r->type != REDIS_REPLY_ARRAY ||
1566 r->elements != 3 ||
1567 r->element[0]->type != REDIS_REPLY_STRING ||
1568 r->element[1]->type != REDIS_REPLY_STRING ||
1569 r->element[2]->type != REDIS_REPLY_STRING ||
1570 strcmp(r->element[0]->str,"message") != 0) return;
1571
1572 /* We are not interested in meeting ourselves */
1573 if (strstr(r->element[2]->str,server.runid) != NULL) return;
1574
1575 {
1576 int numtokens, port, removed, canfailover;
1577 char **token = sdssplitlen(r->element[2]->str,
1578 r->element[2]->len,
1579 ":",1,&numtokens);
1580 sentinelRedisInstance *sentinel;
1581
1582 if (numtokens == 4) {
1583 /* First, try to see if we already have this sentinel. */
1584 port = atoi(token[1]);
1585 canfailover = atoi(token[3]);
1586 sentinel = getSentinelRedisInstanceByAddrAndRunID(
1587 ri->sentinels,token[0],port,token[2]);
1588
1589 if (!sentinel) {
1590 /* If not, remove all the sentinels that have the same runid
1591 * OR the same ip/port, because it's either a restart or a
1592 * network topology change. */
1593 removed = removeMatchingSentinelsFromMaster(ri,token[0],port,
1594 token[2]);
1595 if (removed) {
1596 sentinelEvent(REDIS_NOTICE,"-dup-sentinel",ri,
1597 "%@ #duplicate of %s:%d or %s",
1598 token[0],port,token[2]);
1599 }
1600
1601 /* Add the new sentinel. */
1602 sentinel = createSentinelRedisInstance(NULL,SRI_SENTINEL,
1603 token[0],port,ri->quorum,ri);
1604 if (sentinel) {
1605 sentinelEvent(REDIS_NOTICE,"+sentinel",sentinel,"%@");
1606 /* The runid is NULL after a new instance creation and
1607 * for Sentinels we don't have a later chance to fill it,
1608 * so do it now. */
1609 sentinel->runid = sdsnew(token[2]);
1610 }
1611 }
1612
1613 /* Update the state of the Sentinel. */
1614 if (sentinel) {
1615 sentinel->last_hello_time = mstime();
1616 if (canfailover)
1617 sentinel->flags |= SRI_CAN_FAILOVER;
1618 else
1619 sentinel->flags &= ~SRI_CAN_FAILOVER;
1620 }
1621 }
1622 sdsfreesplitres(token,numtokens);
1623 }
1624 }
1625
1626 void sentinelPingInstance(sentinelRedisInstance *ri) {
1627 mstime_t now = mstime();
1628 mstime_t info_period;
1629 int retval;
1630
1631 /* Return ASAP if we have already a PING or INFO already pending, or
1632 * in the case the instance is not properly connected. */
1633 if (ri->flags & SRI_DISCONNECTED) return;
1634
1635 /* For INFO, PING, PUBLISH that are not critical commands to send we
1636 * also have a limit of SENTINEL_MAX_PENDING_COMMANDS. We don't
1637 * want to use a lot of memory just because a link is not working
1638 * properly (note that anyway there is a redundant protection about this,
1639 * that is, the link will be disconnected and reconnected if a long
1640 * timeout condition is detected. */
1641 if (ri->pending_commands >= SENTINEL_MAX_PENDING_COMMANDS) return;
1642
1643 /* If this is a slave of a master in O_DOWN condition we start sending
1644 * it INFO every second, instead of the usual SENTINEL_INFO_PERIOD
1645 * period. In this state we want to closely monitor slaves in case they
1646 * are turned into masters by another Sentinel, or by the sysadmin. */
1647 if ((ri->flags & SRI_SLAVE) &&
1648 (ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) {
1649 info_period = 1000;
1650 } else {
1651 info_period = SENTINEL_INFO_PERIOD;
1652 }
1653
1654 if ((ri->flags & SRI_SENTINEL) == 0 &&
1655 (ri->info_refresh == 0 ||
1656 (now - ri->info_refresh) > info_period))
1657 {
1658 /* Send INFO to masters and slaves, not sentinels. */
1659 retval = redisAsyncCommand(ri->cc,
1660 sentinelInfoReplyCallback, NULL, "INFO");
1661 if (retval != REDIS_OK) return;
1662 ri->pending_commands++;
1663 } else if ((now - ri->last_pong_time) > SENTINEL_PING_PERIOD) {
1664 /* Send PING to all the three kinds of instances. */
1665 retval = redisAsyncCommand(ri->cc,
1666 sentinelPingReplyCallback, NULL, "PING");
1667 if (retval != REDIS_OK) return;
1668 ri->pending_commands++;
1669 } else if ((ri->flags & SRI_MASTER) &&
1670 (now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD)
1671 {
1672 /* PUBLISH hello messages only to masters. */
1673 struct sockaddr_in sa;
1674 socklen_t salen = sizeof(sa);
1675
1676 if (getsockname(ri->cc->c.fd,(struct sockaddr*)&sa,&salen) != -1) {
1677 char myaddr[128];
1678
1679 snprintf(myaddr,sizeof(myaddr),"%s:%d:%s:%d",
1680 inet_ntoa(sa.sin_addr), server.port, server.runid,
1681 (ri->flags & SRI_CAN_FAILOVER) != 0);
1682 retval = redisAsyncCommand(ri->cc,
1683 sentinelPublishReplyCallback, NULL, "PUBLISH %s %s",
1684 SENTINEL_HELLO_CHANNEL,myaddr);
1685 if (retval != REDIS_OK) return;
1686 ri->pending_commands++;
1687 }
1688 }
1689 }
1690
1691 /* =========================== SENTINEL command ============================= */
1692
1693 const char *sentinelFailoverStateStr(int state) {
1694 switch(state) {
1695 case SENTINEL_FAILOVER_STATE_NONE: return "none";
1696 case SENTINEL_FAILOVER_STATE_WAIT_START: return "wait_start";
1697 case SENTINEL_FAILOVER_STATE_SELECT_SLAVE: return "select_slave";
1698 case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE: return "send_slaveof_noone";
1699 case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION: return "wait_promotion";
1700 case SENTINEL_FAILOVER_STATE_RECONF_SLAVES: return "reconf_slaves";
1701 case SENTINEL_FAILOVER_STATE_ALERT_CLIENTS: return "alert_clients";
1702 case SENTINEL_FAILOVER_STATE_DETECT_END: return "detect_end";
1703 case SENTINEL_FAILOVER_STATE_UPDATE_CONFIG: return "update_config";
1704 default: return "unknown";
1705 }
1706 }
1707
1708 /* Redis instance to Redis protocol representation. */
1709 void addReplySentinelRedisInstance(redisClient *c, sentinelRedisInstance *ri) {
1710 char *flags = sdsempty();
1711 void *mbl;
1712 int fields = 0;
1713
1714 mbl = addDeferredMultiBulkLength(c);
1715
1716 addReplyBulkCString(c,"name");
1717 addReplyBulkCString(c,ri->name);
1718 fields++;
1719
1720 addReplyBulkCString(c,"ip");
1721 addReplyBulkCString(c,ri->addr->ip);
1722 fields++;
1723
1724 addReplyBulkCString(c,"port");
1725 addReplyBulkLongLong(c,ri->addr->port);
1726 fields++;
1727
1728 addReplyBulkCString(c,"runid");
1729 addReplyBulkCString(c,ri->runid ? ri->runid : "");
1730 fields++;
1731
1732 addReplyBulkCString(c,"flags");
1733 if (ri->flags & SRI_S_DOWN) flags = sdscat(flags,"s_down,");
1734 if (ri->flags & SRI_O_DOWN) flags = sdscat(flags,"o_down,");
1735 if (ri->flags & SRI_MASTER) flags = sdscat(flags,"master,");
1736 if (ri->flags & SRI_SLAVE) flags = sdscat(flags,"slave,");
1737 if (ri->flags & SRI_SENTINEL) flags = sdscat(flags,"sentinel,");
1738 if (ri->flags & SRI_DISCONNECTED) flags = sdscat(flags,"disconnected,");
1739 if (ri->flags & SRI_MASTER_DOWN) flags = sdscat(flags,"master_down,");
1740 if (ri->flags & SRI_FAILOVER_IN_PROGRESS)
1741 flags = sdscat(flags,"failover_in_progress,");
1742 if (ri->flags & SRI_I_AM_THE_LEADER)
1743 flags = sdscat(flags,"i_am_the_leader,");
1744 if (ri->flags & SRI_PROMOTED) flags = sdscat(flags,"promoted,");
1745 if (ri->flags & SRI_RECONF_SENT) flags = sdscat(flags,"reconf_sent,");
1746 if (ri->flags & SRI_RECONF_INPROG) flags = sdscat(flags,"reconf_inprog,");
1747 if (ri->flags & SRI_RECONF_DONE) flags = sdscat(flags,"reconf_done,");
1748
1749 if (sdslen(flags) != 0) flags = sdsrange(flags,0,-2); /* remove last "," */
1750 addReplyBulkCString(c,flags);
1751 sdsfree(flags);
1752 fields++;
1753
1754 addReplyBulkCString(c,"pending-commands");
1755 addReplyBulkLongLong(c,ri->pending_commands);
1756 fields++;
1757
1758 if (ri->flags & SRI_FAILOVER_IN_PROGRESS) {
1759 addReplyBulkCString(c,"failover-state");
1760 addReplyBulkCString(c,(char*)sentinelFailoverStateStr(ri->failover_state));
1761 fields++;
1762 }
1763
1764 addReplyBulkCString(c,"last-ok-ping-reply");
1765 addReplyBulkLongLong(c,mstime() - ri->last_avail_time);
1766 fields++;
1767
1768 addReplyBulkCString(c,"last-ping-reply");
1769 addReplyBulkLongLong(c,mstime() - ri->last_pong_time);
1770 fields++;
1771
1772 if (ri->flags & SRI_S_DOWN) {
1773 addReplyBulkCString(c,"s-down-time");
1774 addReplyBulkLongLong(c,mstime()-ri->s_down_since_time);
1775 fields++;
1776 }
1777
1778 if (ri->flags & SRI_O_DOWN) {
1779 addReplyBulkCString(c,"o-down-time");
1780 addReplyBulkLongLong(c,mstime()-ri->o_down_since_time);
1781 fields++;
1782 }
1783
1784 /* Masters and Slaves */
1785 if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
1786 addReplyBulkCString(c,"info-refresh");
1787 addReplyBulkLongLong(c,mstime() - ri->info_refresh);
1788 fields++;
1789 }
1790
1791 /* Only masters */
1792 if (ri->flags & SRI_MASTER) {
1793 addReplyBulkCString(c,"num-slaves");
1794 addReplyBulkLongLong(c,dictSize(ri->slaves));
1795 fields++;
1796
1797 addReplyBulkCString(c,"num-other-sentinels");
1798 addReplyBulkLongLong(c,dictSize(ri->sentinels));
1799 fields++;
1800
1801 addReplyBulkCString(c,"quorum");
1802 addReplyBulkLongLong(c,ri->quorum);
1803 fields++;
1804 }
1805
1806 /* Only slaves */
1807 if (ri->flags & SRI_SLAVE) {
1808 addReplyBulkCString(c,"master-link-down-time");
1809 addReplyBulkLongLong(c,ri->master_link_down_time);
1810 fields++;
1811
1812 addReplyBulkCString(c,"master-link-status");
1813 addReplyBulkCString(c,
1814 (ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP) ?
1815 "ok" : "err");
1816 fields++;
1817
1818 addReplyBulkCString(c,"master-host");
1819 addReplyBulkCString(c,
1820 ri->slave_master_host ? ri->slave_master_host : "?");
1821 fields++;
1822
1823 addReplyBulkCString(c,"master-port");
1824 addReplyBulkLongLong(c,ri->slave_master_port);
1825 fields++;
1826 }
1827
1828 /* Only sentinels */
1829 if (ri->flags & SRI_SENTINEL) {
1830 addReplyBulkCString(c,"last-hello-message");
1831 addReplyBulkLongLong(c,mstime() - ri->last_hello_time);
1832 fields++;
1833
1834 addReplyBulkCString(c,"can-failover-its-master");
1835 addReplyBulkLongLong(c,(ri->flags & SRI_CAN_FAILOVER) != 0);
1836 fields++;
1837
1838 if (ri->flags & SRI_MASTER_DOWN) {
1839 addReplyBulkCString(c,"subjective-leader");
1840 addReplyBulkCString(c,ri->leader ? ri->leader : "?");
1841 fields++;
1842 }
1843 }
1844
1845 setDeferredMultiBulkLength(c,mbl,fields*2);
1846 }
1847
1848 /* Output a number of instances contanined inside a dictionary as
1849 * Redis protocol. */
1850 void addReplyDictOfRedisInstances(redisClient *c, dict *instances) {
1851 dictIterator *di;
1852 dictEntry *de;
1853
1854 di = dictGetIterator(instances);
1855 addReplyMultiBulkLen(c,dictSize(instances));
1856 while((de = dictNext(di)) != NULL) {
1857 sentinelRedisInstance *ri = dictGetVal(de);
1858
1859 addReplySentinelRedisInstance(c,ri);
1860 }
1861 dictReleaseIterator(di);
1862 }
1863
1864 /* Lookup the named master into sentinel.masters.
1865 * If the master is not found reply to the client with an error and returns
1866 * NULL. */
1867 sentinelRedisInstance *sentinelGetMasterByNameOrReplyError(redisClient *c,
1868 robj *name)
1869 {
1870 sentinelRedisInstance *ri;
1871
1872 ri = dictFetchValue(sentinel.masters,c->argv[2]->ptr);
1873 if (!ri) {
1874 addReplyError(c,"No such master with that name");
1875 return NULL;
1876 }
1877 return ri;
1878 }
1879
1880 void sentinelCommand(redisClient *c) {
1881 if (!strcasecmp(c->argv[1]->ptr,"masters")) {
1882 /* SENTINEL MASTERS */
1883 if (c->argc != 2) goto numargserr;
1884
1885 addReplyDictOfRedisInstances(c,sentinel.masters);
1886 } else if (!strcasecmp(c->argv[1]->ptr,"slaves")) {
1887 /* SENTINEL SLAVES <master-name> */
1888 sentinelRedisInstance *ri;
1889
1890 if (c->argc != 3) goto numargserr;
1891 if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
1892 return;
1893 addReplyDictOfRedisInstances(c,ri->slaves);
1894 } else if (!strcasecmp(c->argv[1]->ptr,"sentinels")) {
1895 /* SENTINEL SENTINELS <master-name> */
1896 sentinelRedisInstance *ri;
1897
1898 if (c->argc != 3) goto numargserr;
1899 if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
1900 return;
1901 addReplyDictOfRedisInstances(c,ri->sentinels);
1902 } else if (!strcasecmp(c->argv[1]->ptr,"is-master-down-by-addr")) {
1903 /* SENTINEL IS-MASTER-DOWN-BY-ADDR <ip> <port> */
1904 sentinelRedisInstance *ri;
1905 char *leader = NULL;
1906 long port;
1907 int isdown = 0;
1908
1909 if (c->argc != 4) goto numargserr;
1910 if (getLongFromObjectOrReply(c,c->argv[3],&port,NULL) != REDIS_OK)
1911 return;
1912 ri = getSentinelRedisInstanceByAddrAndRunID(sentinel.masters,
1913 c->argv[2]->ptr,port,NULL);
1914
1915 /* It exists? Is actually a master? Is subjectively down? It's down.
1916 * Note: if we are in tilt mode we always reply with "0". */
1917 if (!sentinel.tilt && ri && (ri->flags & SRI_S_DOWN) &&
1918 (ri->flags & SRI_MASTER))
1919 isdown = 1;
1920 if (ri) leader = sentinelGetSubjectiveLeader(ri);
1921
1922 /* Reply with a two-elements multi-bulk reply: down state, leader. */
1923 addReplyMultiBulkLen(c,2);
1924 addReply(c, isdown ? shared.cone : shared.czero);
1925 addReplyBulkCString(c, leader ? leader : "?");
1926 if (leader) sdsfree(leader);
1927 } else if (!strcasecmp(c->argv[1]->ptr,"reset")) {
1928 /* SENTINEL RESET <pattern> */
1929 if (c->argc != 3) goto numargserr;
1930 addReplyLongLong(c,sentinelResetMastersByPattern(c->argv[2]->ptr,SENTINEL_GENERATE_EVENT));
1931 } else if (!strcasecmp(c->argv[1]->ptr,"get-master-addr-by-name")) {
1932 /* SENTINEL GET-MASTER-ADDR-BY-NAME <master-name> */
1933 sentinelRedisInstance *ri;
1934
1935 if (c->argc != 3) goto numargserr;
1936 ri = sentinelGetMasterByName(c->argv[2]->ptr);
1937 if (ri == NULL) {
1938 addReply(c,shared.nullmultibulk);
1939 } else {
1940 sentinelAddr *addr = ri->addr;
1941
1942 if ((ri->flags & SRI_FAILOVER_IN_PROGRESS) && ri->promoted_slave)
1943 addr = ri->promoted_slave->addr;
1944 addReplyMultiBulkLen(c,2);
1945 addReplyBulkCString(c,addr->ip);
1946 addReplyBulkLongLong(c,addr->port);
1947 }
1948 } else if (!strcasecmp(c->argv[1]->ptr,"pending-scripts")) {
1949 /* SENTINEL PENDING-SCRIPTS */
1950
1951 if (c->argc != 2) goto numargserr;
1952 sentinelPendingScriptsCommand(c);
1953 } else {
1954 addReplyErrorFormat(c,"Unknown sentinel subcommand '%s'",
1955 (char*)c->argv[1]->ptr);
1956 }
1957 return;
1958
1959 numargserr:
1960 addReplyErrorFormat(c,"Wrong number of commands for 'sentinel %s'",
1961 (char*)c->argv[1]->ptr);
1962 }
1963
1964 /* ===================== SENTINEL availability checks ======================= */
1965
1966 /* Is this instance down from our point of view? */
1967 void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
1968 mstime_t elapsed = mstime() - ri->last_avail_time;
1969
1970 /* Check if we are in need for a reconnection of one of the
1971 * links, because we are detecting low activity.
1972 *
1973 * 1) Check if the command link seems connected, was connected not less
1974 * than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have an
1975 * idle time that is greater than down_after_period / 2 seconds. */
1976 if (ri->cc &&
1977 (mstime() - ri->cc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
1978 (mstime() - ri->last_pong_time) > (ri->down_after_period/2))
1979 {
1980 sentinelKillLink(ri,ri->cc);
1981 }
1982
1983 /* 2) Check if the pubsub link seems connected, was connected not less
1984 * than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have no
1985 * activity in the Pub/Sub channel for more than
1986 * SENTINEL_PUBLISH_PERIOD * 3.
1987 */
1988 if (ri->pc &&
1989 (mstime() - ri->pc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
1990 (mstime() - ri->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3))
1991 {
1992 sentinelKillLink(ri,ri->pc);
1993 }
1994
1995 /* Update the subjectively down flag. */
1996 if (elapsed > ri->down_after_period) {
1997 /* Is subjectively down */
1998 if ((ri->flags & SRI_S_DOWN) == 0) {
1999 sentinelEvent(REDIS_WARNING,"+sdown",ri,"%@");
2000 ri->s_down_since_time = mstime();
2001 ri->flags |= SRI_S_DOWN;
2002 }
2003 } else {
2004 /* Is subjectively up */
2005 if (ri->flags & SRI_S_DOWN) {
2006 sentinelEvent(REDIS_WARNING,"-sdown",ri,"%@");
2007 ri->flags &= ~SRI_S_DOWN;
2008 }
2009 }
2010 }
2011
2012 /* Is this instance down accordingly to the configured quorum? */
2013 void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
2014 dictIterator *di;
2015 dictEntry *de;
2016 int quorum = 0, odown = 0;
2017
2018 if (master->flags & SRI_S_DOWN) {
2019 /* Is down for enough sentinels? */
2020 quorum = 1; /* the current sentinel. */
2021 /* Count all the other sentinels. */
2022 di = dictGetIterator(master->sentinels);
2023 while((de = dictNext(di)) != NULL) {
2024 sentinelRedisInstance *ri = dictGetVal(de);
2025
2026 if (ri->flags & SRI_MASTER_DOWN) quorum++;
2027 }
2028 dictReleaseIterator(di);
2029 if (quorum >= master->quorum) odown = 1;
2030 }
2031
2032 /* Set the flag accordingly to the outcome. */
2033 if (odown) {
2034 if ((master->flags & SRI_O_DOWN) == 0) {
2035 sentinelEvent(REDIS_WARNING,"+odown",master,"%@ #quorum %d/%d",
2036 quorum, master->quorum);
2037 master->flags |= SRI_O_DOWN;
2038 master->o_down_since_time = mstime();
2039 }
2040 } else {
2041 if (master->flags & SRI_O_DOWN) {
2042 sentinelEvent(REDIS_WARNING,"-odown",master,"%@");
2043 master->flags &= ~SRI_O_DOWN;
2044 }
2045 }
2046 }
2047
2048 /* Receive the SENTINEL is-master-down-by-addr reply, see the
2049 * sentinelAskMasterStateToOtherSentinels() function for more information. */
2050 void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) {
2051 sentinelRedisInstance *ri = c->data;
2052 redisReply *r;
2053
2054 if (ri) ri->pending_commands--;
2055 if (!reply || !ri) return;
2056 r = reply;
2057
2058 /* Ignore every error or unexpected reply.
2059 * Note that if the command returns an error for any reason we'll
2060 * end clearing the SRI_MASTER_DOWN flag for timeout anyway. */
2061 if (r->type == REDIS_REPLY_ARRAY && r->elements == 2 &&
2062 r->element[0]->type == REDIS_REPLY_INTEGER &&
2063 r->element[1]->type == REDIS_REPLY_STRING)
2064 {
2065 ri->last_master_down_reply_time = mstime();
2066 if (r->element[0]->integer == 1) {
2067 ri->flags |= SRI_MASTER_DOWN;
2068 } else {
2069 ri->flags &= ~SRI_MASTER_DOWN;
2070 }
2071 sdsfree(ri->leader);
2072 ri->leader = sdsnew(r->element[1]->str);
2073 }
2074 }
2075
2076 /* If we think (subjectively) the master is down, we start sending
2077 * SENTINEL IS-MASTER-DOWN-BY-ADDR requests to other sentinels
2078 * in order to get the replies that allow to reach the quorum and
2079 * possibly also mark the master as objectively down. */
2080 void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master) {
2081 dictIterator *di;
2082 dictEntry *de;
2083
2084 di = dictGetIterator(master->sentinels);
2085 while((de = dictNext(di)) != NULL) {
2086 sentinelRedisInstance *ri = dictGetVal(de);
2087 mstime_t elapsed = mstime() - ri->last_master_down_reply_time;
2088 char port[32];
2089 int retval;
2090
2091 /* If the master state from other sentinel is too old, we clear it. */
2092 if (elapsed > SENTINEL_INFO_VALIDITY_TIME) {
2093 ri->flags &= ~SRI_MASTER_DOWN;
2094 sdsfree(ri->leader);
2095 ri->leader = NULL;
2096 }
2097
2098 /* Only ask if master is down to other sentinels if:
2099 *
2100 * 1) We believe it is down, or there is a failover in progress.
2101 * 2) Sentinel is connected.
2102 * 3) We did not received the info within SENTINEL_ASK_PERIOD ms. */
2103 if ((master->flags & (SRI_S_DOWN|SRI_FAILOVER_IN_PROGRESS)) == 0)
2104 continue;
2105 if (ri->flags & SRI_DISCONNECTED) continue;
2106 if (mstime() - ri->last_master_down_reply_time < SENTINEL_ASK_PERIOD)
2107 continue;
2108
2109 /* Ask */
2110 ll2string(port,sizeof(port),master->addr->port);
2111 retval = redisAsyncCommand(ri->cc,
2112 sentinelReceiveIsMasterDownReply, NULL,
2113 "SENTINEL is-master-down-by-addr %s %s",
2114 master->addr->ip, port);
2115 if (retval == REDIS_OK) ri->pending_commands++;
2116 }
2117 dictReleaseIterator(di);
2118 }
2119
2120 /* =============================== FAILOVER ================================= */
2121
2122 /* Given a master get the "subjective leader", that is, among all the sentinels
2123 * with given characteristics, the one with the lexicographically smaller
2124 * runid. The characteristics required are:
2125 *
2126 * 1) Has SRI_CAN_FAILOVER flag.
2127 * 2) Is not disconnected.
2128 * 3) Recently answered to our ping (no longer than
2129 * SENTINEL_INFO_VALIDITY_TIME milliseconds ago).
2130 *
2131 * The function returns a pointer to an sds string representing the runid of the
2132 * leader sentinel instance (from our point of view). Otherwise NULL is
2133 * returned if there are no suitable sentinels.
2134 */
2135
/* qsort() comparison callback for an array of C strings: 'a' and 'b' point
 * to the array elements (that is, to char pointers), which are compared
 * ignoring case. Returns a value less than, equal to, or greater than zero
 * as strcasecmp() does. */
int compareRunID(const void *a, const void *b) {
    const char *left = *(char * const *)a;
    const char *right = *(char * const *)b;

    return strcasecmp(left, right);
}
2140
2141 char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master) {
2142 dictIterator *di;
2143 dictEntry *de;
2144 char **instance =
2145 zmalloc(sizeof(char*)*(dictSize(master->sentinels)+1));
2146 int instances = 0;
2147 char *leader = NULL;
2148
2149 if (master->flags & SRI_CAN_FAILOVER) {
2150 /* Add myself if I'm a Sentinel that can failover this master. */
2151 instance[instances++] = server.runid;
2152 }
2153
2154 di = dictGetIterator(master->sentinels);
2155 while((de = dictNext(di)) != NULL) {
2156 sentinelRedisInstance *ri = dictGetVal(de);
2157 mstime_t lag = mstime() - ri->last_avail_time;
2158
2159 if (lag > SENTINEL_INFO_VALIDITY_TIME ||
2160 !(ri->flags & SRI_CAN_FAILOVER) ||
2161 (ri->flags & SRI_DISCONNECTED) ||
2162 ri->runid == NULL)
2163 continue;
2164 instance[instances++] = ri->runid;
2165 }
2166 dictReleaseIterator(di);
2167
2168 /* If we have at least one instance passing our checks, order the array
2169 * by runid. */
2170 if (instances) {
2171 qsort(instance,instances,sizeof(char*),compareRunID);
2172 leader = sdsnew(instance[0]);
2173 }
2174 zfree(instance);
2175 return leader;
2176 }
2177
/* Pairs a runid with a number of votes.
 * NOTE(review): no code in the visible part of this file references this
 * structure (votes are counted in a dict there) -- confirm whether it is
 * still needed. */
struct sentinelLeader {
    char *runid;            /* Runid string. */
    unsigned long votes;    /* Vote counter. */
};
2182
2183 /* Helper function for sentinelGetObjectiveLeader, increment the counter
2184 * relative to the specified runid. */
2185 void sentinelObjectiveLeaderIncr(dict *counters, char *runid) {
2186 dictEntry *de = dictFind(counters,runid);
2187 uint64_t oldval;
2188
2189 if (de) {
2190 oldval = dictGetUnsignedIntegerVal(de);
2191 dictSetUnsignedIntegerVal(de,oldval+1);
2192 } else {
2193 de = dictAddRaw(counters,runid);
2194 redisAssert(de != NULL);
2195 dictSetUnsignedIntegerVal(de,1);
2196 }
2197 }
2198
2199 /* Scan all the Sentinels attached to this master to check what is the
2200 * most voted leader among Sentinels. */
2201 char *sentinelGetObjectiveLeader(sentinelRedisInstance *master) {
2202 dict *counters;
2203 dictIterator *di;
2204 dictEntry *de;
2205 unsigned int voters = 0, voters_quorum;
2206 char *myvote;
2207 char *winner = NULL;
2208
2209 redisAssert(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS));
2210 counters = dictCreate(&leaderVotesDictType,NULL);
2211
2212 /* Count my vote. */
2213 myvote = sentinelGetSubjectiveLeader(master);
2214 if (myvote) {
2215 sentinelObjectiveLeaderIncr(counters,myvote);
2216 voters++;
2217 }
2218
2219 /* Count other sentinels votes */
2220 di = dictGetIterator(master->sentinels);
2221 while((de = dictNext(di)) != NULL) {
2222 sentinelRedisInstance *ri = dictGetVal(de);
2223 if (ri->leader == NULL) continue;
2224 /* If the failover is not already in progress we are only interested
2225 * in Sentinels that believe the master is down. Otherwise the leader
2226 * selection is useful for the "failover-takedown" when the original
2227 * leader fails. In that case we consider all the voters. */
2228 if (!(master->flags & SRI_FAILOVER_IN_PROGRESS) &&
2229 !(ri->flags & SRI_MASTER_DOWN)) continue;
2230 sentinelObjectiveLeaderIncr(counters,ri->leader);
2231 voters++;
2232 }
2233 dictReleaseIterator(di);
2234 voters_quorum = voters/2+1;
2235
2236 /* Check what's the winner. For the winner to win, it needs two conditions:
2237 * 1) Absolute majority between voters (50% + 1).
2238 * 2) And anyway at least master->quorum votes. */
2239 {
2240 uint64_t max_votes = 0; /* Max votes so far. */
2241
2242 di = dictGetIterator(counters);
2243 while((de = dictNext(di)) != NULL) {
2244 uint64_t votes = dictGetUnsignedIntegerVal(de);
2245
2246 if (max_votes < votes) {
2247 max_votes = votes;
2248 winner = dictGetKey(de);
2249 }
2250 }
2251 dictReleaseIterator(di);
2252 if (winner && (max_votes < voters_quorum || max_votes < master->quorum))
2253 winner = NULL;
2254 }
2255 winner = winner ? sdsnew(winner) : NULL;
2256 sdsfree(myvote);
2257 dictRelease(counters);
2258 return winner;
2259 }
2260
2261 /* This function checks if there are the conditions to start the failover,
2262 * that is:
2263 *
2264 * 1) Enough time has passed since O_DOWN.
2265 * 2) The master is marked as SRI_CAN_FAILOVER, so we can failover it.
2266 * 3) We are the objectively leader for this master.
2267 *
2268 * If the conditions are met we flag the master as SRI_FAILOVER_IN_PROGRESS
2269 * and SRI_I_AM_THE_LEADER.
2270 */
2271 void sentinelStartFailover(sentinelRedisInstance *master) {
2272 char *leader;
2273 int isleader;
2274
2275 /* We can't failover if the master is not in O_DOWN state or if
2276 * there is not already a failover in progress (to perform the
2277 * takedown if the leader died) or if this Sentinel is not allowed
2278 * to start a failover. */
2279 if (!(master->flags & SRI_CAN_FAILOVER) ||
2280 !(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) return;
2281
2282 leader = sentinelGetObjectiveLeader(master);
2283 isleader = leader && strcasecmp(leader,server.runid) == 0;
2284 sdsfree(leader);
2285
2286 /* If I'm not the leader, I can't failover for sure. */
2287 if (!isleader) return;
2288
2289 /* If the failover is already in progress there are two options... */
2290 if (master->flags & SRI_FAILOVER_IN_PROGRESS) {
2291 if (master->flags & SRI_I_AM_THE_LEADER) {
2292 /* 1) I'm flagged as leader so I already started the failover.
2293 * Just return. */
2294 return;
2295 } else {
2296 mstime_t elapsed = mstime() - master->failover_state_change_time;
2297
2298 /* 2) I'm the new leader, but I'm not flagged as leader in the
2299 * master: I did not started the failover, but the original
2300 * leader has no longer the leadership.
2301 *
2302 * In this case if the failover appears to be lagging
2303 * for at least 25% of the configured failover timeout,
2304 * I can assume I can take control. Otherwise
2305 * it's better to return and wait more. */
2306 if (elapsed < (master->failover_timeout/4)) return;
2307 sentinelEvent(REDIS_WARNING,"+failover-takedown",master,"%@");
2308 /* We have already an elected slave if we are in
2309 * FAILOVER_IN_PROGRESS state, that is, the slave that we
2310 * observed turning into a master. */
2311 master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
2312 /* As an observer we flagged all the slaves as RECONF_SENT but
2313 * now we are in charge of actually sending the reconfiguration
2314 * command so let's clear this flag for all the instances. */
2315 sentinelDelFlagsToDictOfRedisInstances(master->slaves,
2316 SRI_RECONF_SENT);
2317 }
2318 } else {
2319 /* Brand new failover as SRI_FAILOVER_IN_PROGRESS was not set.
2320 *
2321 * Do we have a slave to promote? Otherwise don't start a failover
2322 * at all. */
2323 if (sentinelSelectSlave(master) == NULL) return;
2324 master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;
2325 }
2326
2327 master->flags |= SRI_FAILOVER_IN_PROGRESS|SRI_I_AM_THE_LEADER;
2328 sentinelEvent(REDIS_WARNING,"+failover-triggered",master,"%@");
2329
2330 /* Pick a random delay if it's a fresh failover (WAIT_START), and not
2331 * a recovery of a failover started by another sentinel. */
2332 if (master->failover_state == SENTINEL_FAILOVER_STATE_WAIT_START) {
2333 master->failover_start_time = mstime() +
2334 SENTINEL_FAILOVER_FIXED_DELAY +
2335 (rand() % SENTINEL_FAILOVER_MAX_RANDOM_DELAY);
2336 sentinelEvent(REDIS_WARNING,"+failover-state-wait-start",master,
2337 "%@ #starting in %lld milliseconds",
2338 master->failover_start_time-mstime());
2339 }
2340 master->failover_state_change_time = mstime();
2341 }
2342
2343 /* Select a suitable slave to promote. The current algorithm only uses
2344 * the following parameters:
2345 *
2346 * 1) None of the following conditions: S_DOWN, O_DOWN, DISCONNECTED.
2347 * 2) last_avail_time more recent than SENTINEL_INFO_VALIDITY_TIME.
2348 * 3) info_refresh more recent than SENTINEL_INFO_VALIDITY_TIME.
2349 * 4) master_link_down_time no more than:
2350 * (now - master->s_down_since_time) + (master->down_after_period * 10).
2351 *
2352 * Among all the slaves matching the above conditions we select the slave
2353 * with lower slave_priority. If priority is the same we select the slave
2354 * with lexicographically smaller runid.
2355 *
2356 * The function returns the pointer to the selected slave, otherwise
2357 * NULL if no suitable slave was found.
2358 */
2359
2360 int compareSlavesForPromotion(const void *a, const void *b) {
2361 sentinelRedisInstance **sa = (sentinelRedisInstance **)a,
2362 **sb = (sentinelRedisInstance **)b;
2363 if ((*sa)->slave_priority != (*sb)->slave_priority)
2364 return (*sa)->slave_priority - (*sb)->slave_priority;
2365 return strcasecmp((*sa)->runid,(*sb)->runid);
2366 }
2367
2368 sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) {
2369 sentinelRedisInstance **instance =
2370 zmalloc(sizeof(instance[0])*dictSize(master->slaves));
2371 sentinelRedisInstance *selected = NULL;
2372 int instances = 0;
2373 dictIterator *di;
2374 dictEntry *de;
2375 mstime_t max_master_down_time;
2376
2377 max_master_down_time = (mstime() - master->s_down_since_time) +
2378 (master->down_after_period * 10);
2379
2380 di = dictGetIterator(master->slaves);
2381 while((de = dictNext(di)) != NULL) {
2382 sentinelRedisInstance *slave = dictGetVal(de);
2383 mstime_t info_validity_time = mstime()-SENTINEL_INFO_VALIDITY_TIME;
2384
2385 if (slave->flags & (SRI_S_DOWN|SRI_O_DOWN|SRI_DISCONNECTED)) continue;
2386 if (slave->last_avail_time < info_validity_time) continue;
2387 if (slave->info_refresh < info_validity_time) continue;
2388 if (slave->master_link_down_time > max_master_down_time) continue;
2389 instance[instances++] = slave;
2390 }
2391 dictReleaseIterator(di);
2392 if (instances) {
2393 qsort(instance,instances,sizeof(sentinelRedisInstance*),
2394 compareSlavesForPromotion);
2395 selected = instance[0];
2396 }
2397 zfree(instance);
2398 return selected;
2399 }
2400
2401 /* ---------------- Failover state machine implementation ------------------- */
2402 void sentinelFailoverWaitStart(sentinelRedisInstance *ri) {
2403 /* If we in "wait start" but the master is no longer in ODOWN nor in
2404 * SDOWN condition we abort the failover. This is important as it
2405 * prevents a useless failover in a a notable case of netsplit, where
2406 * the senitnels are split from the redis instances. In this case
2407 * the failover will not start while there is the split because no
2408 * good slave can be reached. However when the split is resolved, we
2409 * can go to waitstart if the slave is back rechable a few milliseconds
2410 * before the master is. In that case when the master is back online
2411 * we cancel the failover. */
2412 if ((ri->flags & (SRI_S_DOWN|SRI_O_DOWN)) == 0) {
2413 sentinelEvent(REDIS_WARNING,"-failover-abort-master-is-back",
2414 ri,"%@");
2415 sentinelAbortFailover(ri);
2416 return;
2417 }
2418
2419 /* Start the failover going to the next state if enough time has
2420 * elapsed. */
2421 if (mstime() >= ri->failover_start_time) {
2422 ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
2423 ri->failover_state_change_time = mstime();
2424 sentinelEvent(REDIS_WARNING,"+failover-state-select-slave",ri,"%@");
2425 }
2426 }
2427
2428 void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) {
2429 sentinelRedisInstance *slave = sentinelSelectSlave(ri);
2430
2431 if (slave == NULL) {
2432 sentinelEvent(REDIS_WARNING,"-failover-abort-no-good-slave",ri,"%@");
2433 sentinelAbortFailover(ri);
2434 } else {
2435 sentinelEvent(REDIS_WARNING,"+selected-slave",slave,"%@");
2436 slave->flags |= SRI_PROMOTED;
2437 ri->promoted_slave = slave;
2438 ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE;
2439 ri->failover_state_change_time = mstime();
2440 sentinelEvent(REDIS_NOTICE,"+failover-state-send-slaveof-noone",
2441 slave, "%@");
2442 }
2443 }
2444
2445 void sentinelFailoverSendSlaveOfNoOne(sentinelRedisInstance *ri) {
2446 int retval;
2447
2448 if (ri->promoted_slave->flags & SRI_DISCONNECTED) return;
2449
2450 /* Send SLAVEOF NO ONE command to turn the slave into a master.
2451 * We actually register a generic callback for this command as we don't
2452 * really care about the reply. We check if it worked indirectly observing
2453 * if INFO returns a different role (master instead of slave). */
2454 retval = redisAsyncCommand(ri->promoted_slave->cc,
2455 sentinelDiscardReplyCallback, NULL, "SLAVEOF NO ONE");
2456 if (retval != REDIS_OK) return;
2457 ri->promoted_slave->pending_commands++;
2458 sentinelEvent(REDIS_NOTICE, "+failover-state-wait-promotion",
2459 ri->promoted_slave,"%@");
2460 ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION;
2461 ri->failover_state_change_time = mstime();
2462 }
2463
2464 /* We actually wait for promotion indirectly checking with INFO when the
2465 * slave turns into a master. */
2466 void sentinelFailoverWaitPromotion(sentinelRedisInstance *ri) {
2467 mstime_t elapsed = mstime() - ri->failover_state_change_time;
2468
2469 if (elapsed >= SENTINEL_PROMOTION_RETRY_PERIOD) {
2470 sentinelEvent(REDIS_WARNING,"-promotion-timeout",ri->promoted_slave,
2471 "%@");
2472 sentinelEvent(REDIS_WARNING,"+failover-state-select-slave",ri,"%@");
2473 ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
2474 ri->failover_state_change_time = mstime();
2475 ri->promoted_slave->flags &= ~SRI_PROMOTED;
2476 ri->promoted_slave = NULL;
2477 }
2478 }
2479
2480 void sentinelFailoverDetectEnd(sentinelRedisInstance *master) {
2481 int not_reconfigured = 0, timeout = 0;
2482 dictIterator *di;
2483 dictEntry *de;
2484 mstime_t elapsed = mstime() - master->failover_state_change_time;
2485
2486 /* We can't consider failover finished if the promoted slave is
2487 * not reachable. */
2488 if (master->promoted_slave == NULL ||
2489 master->promoted_slave->flags & SRI_S_DOWN) return;
2490
2491 /* The failover terminates once all the reachable slaves are properly
2492 * configured. */
2493 di = dictGetIterator(master->slaves);
2494 while((de = dictNext(di)) != NULL) {
2495 sentinelRedisInstance *slave = dictGetVal(de);
2496
2497 if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
2498 if (slave->flags & SRI_S_DOWN) continue;
2499 not_reconfigured++;
2500 }
2501 dictReleaseIterator(di);
2502
2503 /* Force end of failover on timeout. */
2504 if (elapsed > master->failover_timeout) {
2505 not_reconfigured = 0;
2506 timeout = 1;
2507 sentinelEvent(REDIS_WARNING,"+failover-end-for-timeout",master,"%@");
2508 }
2509
2510 if (not_reconfigured == 0) {
2511 sentinelEvent(REDIS_WARNING,"+failover-end",master,"%@");
2512 master->failover_state = SENTINEL_FAILOVER_STATE_UPDATE_CONFIG;
2513 master->failover_state_change_time = mstime();
2514 }
2515
2516 /* If I'm the leader it is a good idea to send a best effort SLAVEOF
2517 * command to all the slaves still not reconfigured to replicate with
2518 * the new master. */
2519 if (timeout && (master->flags & SRI_I_AM_THE_LEADER)) {
2520 dictIterator *di;
2521 dictEntry *de;
2522 char master_port[32];
2523
2524 ll2string(master_port,sizeof(master_port),
2525 master->promoted_slave->addr->port);
2526
2527 di = dictGetIterator(master->slaves);
2528 while((de = dictNext(di)) != NULL) {
2529 sentinelRedisInstance *slave = dictGetVal(de);
2530 int retval;
2531
2532 if (slave->flags &
2533 (SRI_RECONF_DONE|SRI_RECONF_SENT|SRI_DISCONNECTED)) continue;
2534
2535 retval = redisAsyncCommand(slave->cc,
2536 sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
2537 master->promoted_slave->addr->ip,
2538 master_port);
2539 if (retval == REDIS_OK) {
2540 sentinelEvent(REDIS_NOTICE,"+slave-reconf-sent-be",slave,"%@");
2541 slave->flags |= SRI_RECONF_SENT;
2542 }
2543 }
2544 dictReleaseIterator(di);
2545 }
2546 }
2547
2548 /* Send SLAVE OF <new master address> to all the remaining slaves that
2549 * still don't appear to have the configuration updated. */
2550 void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) {
2551 dictIterator *di;
2552 dictEntry *de;
2553 int in_progress = 0;
2554
2555 di = dictGetIterator(master->slaves);
2556 while((de = dictNext(di)) != NULL) {
2557 sentinelRedisInstance *slave = dictGetVal(de);
2558
2559 if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG))
2560 in_progress++;
2561 }
2562 dictReleaseIterator(di);
2563
2564 di = dictGetIterator(master->slaves);
2565 while(in_progress < master->parallel_syncs &&
2566 (de = dictNext(di)) != NULL)
2567 {
2568 sentinelRedisInstance *slave = dictGetVal(de);
2569 int retval;
2570 char master_port[32];
2571
2572 /* Skip the promoted slave, and already configured slaves. */
2573 if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
2574
2575 /* Clear the SRI_RECONF_SENT flag if too much time elapsed without
2576 * the slave moving forward to the next state. */
2577 if ((slave->flags & SRI_RECONF_SENT) &&
2578 (mstime() - slave->slave_reconf_sent_time) >
2579 SENTINEL_SLAVE_RECONF_RETRY_PERIOD)
2580 {
2581 sentinelEvent(REDIS_NOTICE,"-slave-reconf-sent-timeout",slave,"%@");
2582 slave->flags &= ~SRI_RECONF_SENT;
2583 }
2584
2585 /* Nothing to do for instances that are disconnected or already
2586 * in RECONF_SENT state. */
2587 if (slave->flags & (SRI_DISCONNECTED|SRI_RECONF_SENT|SRI_RECONF_INPROG))
2588 continue;
2589
2590 /* Send SLAVEOF <new master>. */
2591 ll2string(master_port,sizeof(master_port),
2592 master->promoted_slave->addr->port);
2593 retval = redisAsyncCommand(slave->cc,
2594 sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
2595 master->promoted_slave->addr->ip,
2596 master_port);
2597 if (retval == REDIS_OK) {
2598 slave->flags |= SRI_RECONF_SENT;
2599 slave->pending_commands++;
2600 slave->slave_reconf_sent_time = mstime();
2601 sentinelEvent(REDIS_NOTICE,"+slave-reconf-sent",slave,"%@");
2602 in_progress++;
2603 }
2604 }
2605 dictReleaseIterator(di);
2606 sentinelFailoverDetectEnd(master);
2607 }
2608
2609 /* This function is called when the slave is in
2610 * SENTINEL_FAILOVER_STATE_UPDATE_CONFIG state. In this state we need
2611 * to remove it from the master table and add the promoted slave instead.
2612 *
2613 * If there are no promoted slaves as this instance is unique, we remove
2614 * and re-add it with the same address to trigger a complete state
2615 * refresh. */
2616 void sentinelFailoverSwitchToPromotedSlave(sentinelRedisInstance *master) {
2617 sentinelRedisInstance *ref = master->promoted_slave ?
2618 master->promoted_slave : master;
2619
2620 sentinelEvent(REDIS_WARNING,"+switch-master",master,"%s %s %d %s %d",
2621 master->name, master->addr->ip, master->addr->port,
2622 ref->addr->ip, ref->addr->port);
2623
2624 sentinelResetMasterAndChangeAddress(master,ref->addr->ip,ref->addr->port);
2625 }
2626
2627 void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
2628 redisAssert(ri->flags & SRI_MASTER);
2629
2630 if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;
2631
2632 switch(ri->failover_state) {
2633 case SENTINEL_FAILOVER_STATE_WAIT_START:
2634 sentinelFailoverWaitStart(ri);
2635 break;
2636 case SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
2637 sentinelFailoverSelectSlave(ri);
2638 break;
2639 case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:
2640 sentinelFailoverSendSlaveOfNoOne(ri);
2641 break;
2642 case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
2643 sentinelFailoverWaitPromotion(ri);
2644 break;
2645 case SENTINEL_FAILOVER_STATE_RECONF_SLAVES:
2646 sentinelFailoverReconfNextSlave(ri);
2647 break;
2648 case SENTINEL_FAILOVER_STATE_DETECT_END:
2649 sentinelFailoverDetectEnd(ri);
2650 break;
2651 }
2652 }
2653
2654 /* Abort a failover in progress with the following steps:
2655 * 1) If this instance is the leaer send a SLAVEOF command to all the already
2656 * reconfigured slaves if any to configure them to replicate with the
2657 * original master.
2658 * 2) For both leaders and observers: clear the failover flags and state in
2659 * the master instance.
2660 * 3) If there is already a promoted slave and we are the leader, and this
2661 * slave is not DISCONNECTED, try to reconfigure it to replicate
2662 * back to the master as well, sending a best effort SLAVEOF command.
2663 */
2664 void sentinelAbortFailover(sentinelRedisInstance *ri) {
2665 char master_port[32];
2666 dictIterator *di;
2667 dictEntry *de;
2668
2669 redisAssert(ri->flags & SRI_FAILOVER_IN_PROGRESS);
2670 ll2string(master_port,sizeof(master_port),ri->addr->port);
2671
2672 /* Clear failover related flags from slaves.
2673 * Also if we are the leader make sure to send SLAVEOF commands to all the
2674 * already reconfigured slaves in order to turn them back into slaves of
2675 * the original master. */
2676 di = dictGetIterator(ri->slaves);
2677 while((de = dictNext(di)) != NULL) {
2678 sentinelRedisInstance *slave = dictGetVal(de);
2679 if ((ri->flags & SRI_I_AM_THE_LEADER) &&
2680 !(slave->flags & SRI_DISCONNECTED) &&
2681 (slave->flags & (SRI_PROMOTED|SRI_RECONF_SENT|SRI_RECONF_INPROG|
2682 SRI_RECONF_DONE)))
2683 {
2684 int retval;
2685
2686 retval = redisAsyncCommand(slave->cc,
2687 sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
2688 ri->addr->ip,
2689 master_port);
2690 if (retval == REDIS_OK)
2691 sentinelEvent(REDIS_NOTICE,"-slave-reconf-undo",slave,"%@");
2692 }
2693 slave->flags &= ~(SRI_RECONF_SENT|SRI_RECONF_INPROG|SRI_RECONF_DONE);
2694 }
2695 dictReleaseIterator(di);
2696
2697 ri->flags &= ~(SRI_FAILOVER_IN_PROGRESS|SRI_I_AM_THE_LEADER);
2698 ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
2699 ri->failover_state_change_time = mstime();
2700 if (ri->promoted_slave) {
2701 ri->promoted_slave->flags &= ~SRI_PROMOTED;
2702 ri->promoted_slave = NULL;
2703 }
2704 }
2705
2706 /* The following is called only for master instances and will abort the
2707 * failover process if:
2708 *
2709 * 1) The failover is in progress.
2710 * 2) We already promoted a slave.
2711 * 3) The promoted slave is in extended SDOWN condition.
2712 */
2713 void sentinelAbortFailoverIfNeeded(sentinelRedisInstance *ri) {
2714 /* Failover is in progress? Do we have a promoted slave? */
2715 if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS) || !ri->promoted_slave) return;
2716
2717 /* Is the promoted slave into an extended SDOWN state? */
2718 if (!(ri->promoted_slave->flags & SRI_S_DOWN) ||
2719 (mstime() - ri->promoted_slave->s_down_since_time) <
2720 (ri->down_after_period * SENTINEL_EXTENDED_SDOWN_MULTIPLIER)) return;
2721
2722 sentinelEvent(REDIS_WARNING,"-failover-abort-x-sdown",ri->promoted_slave,"%@");
2723 sentinelAbortFailover(ri);
2724 }
2725
/* ======================== SENTINEL timer handler ==========================
 * This is the "main" of our Sentinel, as Sentinel is completely non blocking
 * in design. The function is called every second.
 * -------------------------------------------------------------------------- */
2730
2731 /* Perform scheduled operations for the specified Redis instance. */
2732 void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
2733 /* ========== MONITORING HALF ============ */
2734 /* Every kind of instance */
2735 sentinelReconnectInstance(ri);
2736 sentinelPingInstance(ri);
2737
2738 /* Masters and slaves */
2739 if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
2740 /* Nothing so far. */
2741 }
2742
2743 /* Only masters */
2744 if (ri->flags & SRI_MASTER) {
2745 sentinelAskMasterStateToOtherSentinels(ri);
2746 }
2747
2748 /* ============== ACTING HALF ============= */
2749 /* We don't proceed with the acting half if we are in TILT mode.
2750 * TILT happens when we find something odd with the time, like a
2751 * sudden change in the clock. */
2752 if (sentinel.tilt) {
2753 if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return;
2754 sentinel.tilt = 0;
2755 sentinelEvent(REDIS_WARNING,"-tilt",NULL,"#tilt mode exited");
2756 }
2757
2758 /* Every kind of instance */
2759 sentinelCheckSubjectivelyDown(ri);
2760
2761 /* Masters and slaves */
2762 if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
2763 /* Nothing so far. */
2764 }
2765
2766 /* Only masters */
2767 if (ri->flags & SRI_MASTER) {
2768 sentinelCheckObjectivelyDown(ri);
2769 sentinelStartFailover(ri);
2770 sentinelFailoverStateMachine(ri);
2771 sentinelAbortFailoverIfNeeded(ri);
2772 }
2773 }
2774
2775 /* Perform scheduled operations for all the instances in the dictionary.
2776 * Recursively call the function against dictionaries of slaves. */
2777 void sentinelHandleDictOfRedisInstances(dict *instances) {
2778 dictIterator *di;
2779 dictEntry *de;
2780 sentinelRedisInstance *switch_to_promoted = NULL;
2781
2782 /* There are a number of things we need to perform against every master. */
2783 di = dictGetIterator(instances);
2784 while((de = dictNext(di)) != NULL) {
2785 sentinelRedisInstance *ri = dictGetVal(de);
2786
2787 sentinelHandleRedisInstance(ri);
2788 if (ri->flags & SRI_MASTER) {
2789 sentinelHandleDictOfRedisInstances(ri->slaves);
2790 sentinelHandleDictOfRedisInstances(ri->sentinels);
2791 if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) {
2792 switch_to_promoted = ri;
2793 }
2794 }
2795 }
2796 if (switch_to_promoted)
2797 sentinelFailoverSwitchToPromotedSlave(switch_to_promoted);
2798 dictReleaseIterator(di);
2799 }
2800
2801 /* This function checks if we need to enter the TITL mode.
2802 *
2803 * The TILT mode is entered if we detect that between two invocations of the
2804 * timer interrupt, a negative amount of time, or too much time has passed.
2805 * Note that we expect that more or less just 100 milliseconds will pass
2806 * if everything is fine. However we'll see a negative number or a
2807 * difference bigger than SENTINEL_TILT_TRIGGER milliseconds if one of the
2808 * following conditions happen:
2809 *
2810 * 1) The Sentiel process for some time is blocked, for every kind of
2811 * random reason: the load is huge, the computer was freezed for some time
2812 * in I/O or alike, the process was stopped by a signal. Everything.
2813 * 2) The system clock was altered significantly.
2814 *
2815 * Under both this conditions we'll see everything as timed out and failing
2816 * without good reasons. Instead we enter the TILT mode and wait
2817 * for SENTIENL_TILT_PERIOD to elapse before starting to act again.
2818 *
2819 * During TILT time we still collect information, we just do not act. */
2820 void sentinelCheckTiltCondition(void) {
2821 mstime_t now = mstime();
2822 mstime_t delta = now - sentinel.previous_time;
2823
2824 if (delta < 0 || delta > SENTINEL_TILT_TRIGGER) {
2825 sentinel.tilt = 1;
2826 sentinel.tilt_start_time = mstime();
2827 sentinelEvent(REDIS_WARNING,"+tilt",NULL,"#tilt mode entered");
2828 }
2829 sentinel.previous_time = mstime();
2830 }
2831
2832 void sentinelTimer(void) {
2833 sentinelCheckTiltCondition();
2834 sentinelHandleDictOfRedisInstances(sentinel.masters);
2835 sentinelRunPendingScripts();
2836 sentinelCollectTerminatedScripts();
2837 sentinelKillTimedoutScripts();
2838 }
2839