]> git.saurik.com Git - redis.git/blob - src/sentinel.c
Sentinel: do not crash against slaves not publishing the runid.
[redis.git] / src / sentinel.c
1 /* Redis Sentinel implementation
2 * -----------------------------
3 *
4 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * * Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * * Neither the name of Redis nor the names of its contributors may be used
16 * to endorse or promote products derived from this software without
17 * specific prior written permission.
18 *
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 * POSSIBILITY OF SUCH DAMAGE.
30 */
31
32 #include "redis.h"
33 #include "hiredis.h"
34 #include "async.h"
35
36 #include <ctype.h>
37 #include <arpa/inet.h>
38 #include <sys/socket.h>
39 #include <sys/wait.h>
40
41 extern char **environ;
42
43 #define REDIS_SENTINEL_PORT 26379
44
45 /* ======================== Sentinel global state =========================== */
46
47 typedef long long mstime_t; /* millisecond time type. */
48
49 /* Address object, used to describe an ip:port pair. */
50 typedef struct sentinelAddr {
51 char *ip;
52 int port;
53 } sentinelAddr;
54
55 /* A Sentinel Redis Instance object is monitoring. */
56 #define SRI_MASTER (1<<0)
57 #define SRI_SLAVE (1<<1)
58 #define SRI_SENTINEL (1<<2)
59 #define SRI_DISCONNECTED (1<<3)
60 #define SRI_S_DOWN (1<<4) /* Subjectively down (no quorum). */
61 #define SRI_O_DOWN (1<<5) /* Objectively down (quorum reached). */
62 #define SRI_MASTER_DOWN (1<<6) /* A Sentinel with this flag set thinks that
63 its master is down. */
64 /* SRI_CAN_FAILOVER when set in an SRI_MASTER instance means that we are
65 * allowed to perform the failover for this master.
66 * When set in a SRI_SENTINEL instance means that sentinel is allowed to
67 * perform the failover on its master. */
68 #define SRI_CAN_FAILOVER (1<<7)
69 #define SRI_FAILOVER_IN_PROGRESS (1<<8) /* Failover is in progress for
70 this master. */
71 #define SRI_I_AM_THE_LEADER (1<<9) /* We are the leader for this master. */
72 #define SRI_PROMOTED (1<<10) /* Slave selected for promotion. */
73 #define SRI_RECONF_SENT (1<<11) /* SLAVEOF <newmaster> sent. */
74 #define SRI_RECONF_INPROG (1<<12) /* Slave synchronization in progress. */
75 #define SRI_RECONF_DONE (1<<13) /* Slave synchronized with new master. */
76 #define SRI_FORCE_FAILOVER (1<<14) /* Force failover with master up. */
77 #define SRI_SCRIPT_KILL_SENT (1<<15) /* SCRIPT KILL already sent on -BUSY */
78
79 #define SENTINEL_INFO_PERIOD 10000
80 #define SENTINEL_PING_PERIOD 1000
81 #define SENTINEL_ASK_PERIOD 1000
82 #define SENTINEL_PUBLISH_PERIOD 5000
83 #define SENTINEL_DOWN_AFTER_PERIOD 30000
84 #define SENTINEL_HELLO_CHANNEL "__sentinel__:hello"
85 #define SENTINEL_TILT_TRIGGER 2000
86 #define SENTINEL_TILT_PERIOD (SENTINEL_PING_PERIOD*30)
87 #define SENTINEL_DEFAULT_SLAVE_PRIORITY 100
88 #define SENTINEL_PROMOTION_RETRY_PERIOD 30000
89 #define SENTINEL_SLAVE_RECONF_RETRY_PERIOD 10000
90 #define SENTINEL_DEFAULT_PARALLEL_SYNCS 1
91 #define SENTINEL_MIN_LINK_RECONNECT_PERIOD 15000
92 #define SENTINEL_DEFAULT_FAILOVER_TIMEOUT (60*15*1000)
93 #define SENTINEL_MAX_PENDING_COMMANDS 100
94 #define SENTINEL_EXTENDED_SDOWN_MULTIPLIER 10
95
96 /* How many milliseconds is an information valid? This applies for instance
97 * to the reply to SENTINEL IS-MASTER-DOWN-BY-ADDR replies. */
98 #define SENTINEL_INFO_VALIDITY_TIME 5000
99 #define SENTINEL_FAILOVER_FIXED_DELAY 5000
100 #define SENTINEL_FAILOVER_MAX_RANDOM_DELAY 10000
101
102 /* Failover machine different states. */
103 #define SENTINEL_FAILOVER_STATE_NONE 0 /* No failover in progress. */
104 #define SENTINEL_FAILOVER_STATE_WAIT_START 1 /* Wait for failover_start_time*/
105 #define SENTINEL_FAILOVER_STATE_SELECT_SLAVE 2 /* Select slave to promote */
106 #define SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE 3 /* Slave -> Master */
107 #define SENTINEL_FAILOVER_STATE_WAIT_PROMOTION 4 /* Wait slave to change role */
108 #define SENTINEL_FAILOVER_STATE_RECONF_SLAVES 5 /* SLAVEOF newmaster */
109 #define SENTINEL_FAILOVER_STATE_WAIT_NEXT_SLAVE 6 /* wait replication */
110 #define SENTINEL_FAILOVER_STATE_ALERT_CLIENTS 7 /* Run user script. */
111 #define SENTINEL_FAILOVER_STATE_WAIT_ALERT_SCRIPT 8 /* Wait script exec. */
112 #define SENTINEL_FAILOVER_STATE_DETECT_END 9 /* Check for failover end. */
113 #define SENTINEL_FAILOVER_STATE_UPDATE_CONFIG 10 /* Monitor promoted slave. */
114
115 #define SENTINEL_MASTER_LINK_STATUS_UP 0
116 #define SENTINEL_MASTER_LINK_STATUS_DOWN 1
117
118 /* Generic flags that can be used with different functions. */
119 #define SENTINEL_NO_FLAGS 0
120 #define SENTINEL_GENERATE_EVENT 1
121 #define SENTINEL_LEADER 2
122 #define SENTINEL_OBSERVER 4
123
124 /* Script execution flags and limits. */
125 #define SENTINEL_SCRIPT_NONE 0
126 #define SENTINEL_SCRIPT_RUNNING 1
127 #define SENTINEL_SCRIPT_MAX_QUEUE 256
128 #define SENTINEL_SCRIPT_MAX_RUNNING 16
129 #define SENTINEL_SCRIPT_MAX_RUNTIME 60000 /* 60 seconds max exec time. */
130 #define SENTINEL_SCRIPT_MAX_RETRY 10
131 #define SENTINEL_SCRIPT_RETRY_DELAY 30000 /* 30 seconds between retries. */
132
133 typedef struct sentinelRedisInstance {
134 int flags; /* See SRI_... defines */
135 char *name; /* Master name from the point of view of this sentinel. */
136 char *runid; /* run ID of this instance. */
137 sentinelAddr *addr; /* Master host. */
138 redisAsyncContext *cc; /* Hiredis context for commands. */
139 redisAsyncContext *pc; /* Hiredis context for Pub / Sub. */
140 int pending_commands; /* Number of commands sent waiting for a reply. */
141 mstime_t cc_conn_time; /* cc connection time. */
142 mstime_t pc_conn_time; /* pc connection time. */
143 mstime_t pc_last_activity; /* Last time we received any message. */
144 mstime_t last_avail_time; /* Last time the instance replied to ping with
145 a reply we consider valid. */
146 mstime_t last_pong_time; /* Last time the instance replied to ping,
147 whatever the reply was. That's used to check
148 if the link is idle and must be reconnected. */
149 mstime_t last_pub_time; /* Last time we sent hello via Pub/Sub. */
150 mstime_t last_hello_time; /* Only used if SRI_SENTINEL is set. Last time
151 we received an hello from this Sentinel
152 via Pub/Sub. */
153 mstime_t last_master_down_reply_time; /* Time of last reply to
154 SENTINEL is-master-down command. */
155 mstime_t s_down_since_time; /* Subjectively down since time. */
156 mstime_t o_down_since_time; /* Objectively down since time. */
157 mstime_t down_after_period; /* Consider it down after that period. */
158 mstime_t info_refresh; /* Time at which we received INFO output from it. */
159
160 /* Master specific. */
161 dict *sentinels; /* Other sentinels monitoring the same master. */
162 dict *slaves; /* Slaves for this master instance. */
163 int quorum; /* Number of sentinels that need to agree on failure. */
164 int parallel_syncs; /* How many slaves to reconfigure at same time. */
165
166 /* Slave specific. */
167 mstime_t master_link_down_time; /* Slave replication link down time. */
168 int slave_priority; /* Slave priority according to its INFO output. */
169 mstime_t slave_reconf_sent_time; /* Time at which we sent SLAVE OF <new> */
170 struct sentinelRedisInstance *master; /* Master instance if SRI_SLAVE is set. */
171 char *slave_master_host; /* Master host as reported by INFO */
172 int slave_master_port; /* Master port as reported by INFO */
173 int slave_master_link_status; /* Master link status as reported by INFO */
174 /* Failover */
175 char *leader; /* If this is a master instance, this is the runid of
176 the Sentinel that should perform the failover. If
177 this is a Sentinel, this is the runid of the Sentinel
178 that this other Sentinel is voting as leader.
179 This field is valid only if SRI_MASTER_DOWN is
180 set on the Sentinel instance. */
181 int failover_state; /* See SENTINEL_FAILOVER_STATE_* defines. */
182 mstime_t failover_state_change_time;
183 mstime_t failover_start_time; /* When to start to failover if leader. */
184 mstime_t failover_timeout; /* Max time to refresh failover state. */
185 struct sentinelRedisInstance *promoted_slave; /* Promoted slave instance. */
186 /* Scripts executed to notify admin or reconfigure clients: when they
187 * are set to NULL no script is executed. */
188 char *notification_script;
189 char *client_reconfig_script;
190 } sentinelRedisInstance;
191
192 /* Main state. */
193 struct sentinelState {
194 dict *masters; /* Dictionary of master sentinelRedisInstances.
195 Key is the instance name, value is the
196 sentinelRedisInstance structure pointer. */
197 int tilt; /* Are we in TILT mode? */
198 int running_scripts; /* Number of scripts in execution right now. */
199 mstime_t tilt_start_time; /* When TITL started. */
200 mstime_t previous_time; /* Time last time we ran the time handler. */
201 list *scripts_queue; /* Queue of user scripts to execute. */
202 } sentinel;
203
204 /* A script execution job. */
205 typedef struct sentinelScriptJob {
206 int flags; /* Script job flags: SENTINEL_SCRIPT_* */
207 int retry_num; /* Number of times we tried to execute it. */
208 char **argv; /* Arguments to call the script. */
209 mstime_t start_time; /* Script execution time if the script is running,
210 otherwise 0 if we are allowed to retry the
211 execution at any time. If the script is not
212 running and it's not 0, it means: do not run
213 before the specified time. */
214 pid_t pid; /* Script execution pid. */
215 } sentinelScriptJob;
216
217 /* ======================= hiredis ae.c adapters =============================
218 * Note: this implementation is taken from hiredis/adapters/ae.h, however
219 * we have our modified copy for Sentinel in order to use our allocator
220 * and to have full control over how the adapter works. */
221
222 typedef struct redisAeEvents {
223 redisAsyncContext *context;
224 aeEventLoop *loop;
225 int fd;
226 int reading, writing;
227 } redisAeEvents;
228
229 static void redisAeReadEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
230 ((void)el); ((void)fd); ((void)mask);
231
232 redisAeEvents *e = (redisAeEvents*)privdata;
233 redisAsyncHandleRead(e->context);
234 }
235
236 static void redisAeWriteEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
237 ((void)el); ((void)fd); ((void)mask);
238
239 redisAeEvents *e = (redisAeEvents*)privdata;
240 redisAsyncHandleWrite(e->context);
241 }
242
243 static void redisAeAddRead(void *privdata) {
244 redisAeEvents *e = (redisAeEvents*)privdata;
245 aeEventLoop *loop = e->loop;
246 if (!e->reading) {
247 e->reading = 1;
248 aeCreateFileEvent(loop,e->fd,AE_READABLE,redisAeReadEvent,e);
249 }
250 }
251
252 static void redisAeDelRead(void *privdata) {
253 redisAeEvents *e = (redisAeEvents*)privdata;
254 aeEventLoop *loop = e->loop;
255 if (e->reading) {
256 e->reading = 0;
257 aeDeleteFileEvent(loop,e->fd,AE_READABLE);
258 }
259 }
260
261 static void redisAeAddWrite(void *privdata) {
262 redisAeEvents *e = (redisAeEvents*)privdata;
263 aeEventLoop *loop = e->loop;
264 if (!e->writing) {
265 e->writing = 1;
266 aeCreateFileEvent(loop,e->fd,AE_WRITABLE,redisAeWriteEvent,e);
267 }
268 }
269
270 static void redisAeDelWrite(void *privdata) {
271 redisAeEvents *e = (redisAeEvents*)privdata;
272 aeEventLoop *loop = e->loop;
273 if (e->writing) {
274 e->writing = 0;
275 aeDeleteFileEvent(loop,e->fd,AE_WRITABLE);
276 }
277 }
278
279 static void redisAeCleanup(void *privdata) {
280 redisAeEvents *e = (redisAeEvents*)privdata;
281 redisAeDelRead(privdata);
282 redisAeDelWrite(privdata);
283 zfree(e);
284 }
285
286 static int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac) {
287 redisContext *c = &(ac->c);
288 redisAeEvents *e;
289
290 /* Nothing should be attached when something is already attached */
291 if (ac->ev.data != NULL)
292 return REDIS_ERR;
293
294 /* Create container for context and r/w events */
295 e = (redisAeEvents*)zmalloc(sizeof(*e));
296 e->context = ac;
297 e->loop = loop;
298 e->fd = c->fd;
299 e->reading = e->writing = 0;
300
301 /* Register functions to start/stop listening for events */
302 ac->ev.addRead = redisAeAddRead;
303 ac->ev.delRead = redisAeDelRead;
304 ac->ev.addWrite = redisAeAddWrite;
305 ac->ev.delWrite = redisAeDelWrite;
306 ac->ev.cleanup = redisAeCleanup;
307 ac->ev.data = e;
308
309 return REDIS_OK;
310 }
311
312 /* ============================= Prototypes ================================= */
313
314 void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status);
315 void sentinelDisconnectCallback(const redisAsyncContext *c, int status);
316 void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata);
317 sentinelRedisInstance *sentinelGetMasterByName(char *name);
318 char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master);
319 char *sentinelGetObjectiveLeader(sentinelRedisInstance *master);
320 int yesnotoi(char *s);
321 void sentinelDisconnectInstanceFromContext(const redisAsyncContext *c);
322 void sentinelKillLink(sentinelRedisInstance *ri, redisAsyncContext *c);
323 const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri);
324 void sentinelAbortFailover(sentinelRedisInstance *ri);
325 void sentinelEvent(int level, char *type, sentinelRedisInstance *ri, const char *fmt, ...);
326 sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master);
327 void sentinelScheduleScriptExecution(char *path, ...);
328 void sentinelStartFailover(sentinelRedisInstance *master, int state);
329
330 /* ========================= Dictionary types =============================== */
331
332 unsigned int dictSdsHash(const void *key);
333 int dictSdsKeyCompare(void *privdata, const void *key1, const void *key2);
334 void releaseSentinelRedisInstance(sentinelRedisInstance *ri);
335
336 void dictInstancesValDestructor (void *privdata, void *obj) {
337 releaseSentinelRedisInstance(obj);
338 }
339
340 /* Instance name (sds) -> instance (sentinelRedisInstance pointer)
341 *
342 * also used for: sentinelRedisInstance->sentinels dictionary that maps
343 * sentinels ip:port to last seen time in Pub/Sub hello message. */
344 dictType instancesDictType = {
345 dictSdsHash, /* hash function */
346 NULL, /* key dup */
347 NULL, /* val dup */
348 dictSdsKeyCompare, /* key compare */
349 NULL, /* key destructor */
350 dictInstancesValDestructor /* val destructor */
351 };
352
353 /* Instance runid (sds) -> votes (long casted to void*)
354 *
355 * This is useful into sentinelGetObjectiveLeader() function in order to
356 * count the votes and understand who is the leader. */
357 dictType leaderVotesDictType = {
358 dictSdsHash, /* hash function */
359 NULL, /* key dup */
360 NULL, /* val dup */
361 dictSdsKeyCompare, /* key compare */
362 NULL, /* key destructor */
363 NULL /* val destructor */
364 };
365
366 /* =========================== Initialization =============================== */
367
368 void sentinelCommand(redisClient *c);
369 void sentinelInfoCommand(redisClient *c);
370
371 struct redisCommand sentinelcmds[] = {
372 {"ping",pingCommand,1,"",0,NULL,0,0,0,0,0},
373 {"sentinel",sentinelCommand,-2,"",0,NULL,0,0,0,0,0},
374 {"subscribe",subscribeCommand,-2,"",0,NULL,0,0,0,0,0},
375 {"unsubscribe",unsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
376 {"psubscribe",psubscribeCommand,-2,"",0,NULL,0,0,0,0,0},
377 {"punsubscribe",punsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
378 {"info",sentinelInfoCommand,-1,"",0,NULL,0,0,0,0,0}
379 };
380
381 /* This function overwrites a few normal Redis config default with Sentinel
382 * specific defaults. */
383 void initSentinelConfig(void) {
384 server.port = REDIS_SENTINEL_PORT;
385 }
386
387 /* Perform the Sentinel mode initialization. */
388 void initSentinel(void) {
389 int j;
390
391 /* Remove usual Redis commands from the command table, then just add
392 * the SENTINEL command. */
393 dictEmpty(server.commands);
394 for (j = 0; j < sizeof(sentinelcmds)/sizeof(sentinelcmds[0]); j++) {
395 int retval;
396 struct redisCommand *cmd = sentinelcmds+j;
397
398 retval = dictAdd(server.commands, sdsnew(cmd->name), cmd);
399 redisAssert(retval == DICT_OK);
400 }
401
402 /* Initialize various data structures. */
403 sentinel.masters = dictCreate(&instancesDictType,NULL);
404 sentinel.tilt = 0;
405 sentinel.tilt_start_time = mstime();
406 sentinel.previous_time = mstime();
407 sentinel.running_scripts = 0;
408 sentinel.scripts_queue = listCreate();
409 }
410
411 /* ============================== sentinelAddr ============================== */
412
413 /* Create a sentinelAddr object and return it on success.
414 * On error NULL is returned and errno is set to:
415 * ENOENT: Can't resolve the hostname.
416 * EINVAL: Invalid port number.
417 */
418 sentinelAddr *createSentinelAddr(char *hostname, int port) {
419 char buf[32];
420 sentinelAddr *sa;
421
422 if (port <= 0 || port > 65535) {
423 errno = EINVAL;
424 return NULL;
425 }
426 if (anetResolve(NULL,hostname,buf) == ANET_ERR) {
427 errno = ENOENT;
428 return NULL;
429 }
430 sa = zmalloc(sizeof(*sa));
431 sa->ip = sdsnew(buf);
432 sa->port = port;
433 return sa;
434 }
435
436 /* Free a Sentinel address. Can't fail. */
437 void releaseSentinelAddr(sentinelAddr *sa) {
438 sdsfree(sa->ip);
439 zfree(sa);
440 }
441
442 /* =========================== Events notification ========================== */
443
444 /* Send an event to log, pub/sub, user notification script.
445 *
446 * 'level' is the log level for logging. Only REDIS_WARNING events will trigger
447 * the execution of the user notification script.
448 *
449 * 'type' is the message type, also used as a pub/sub channel name.
450 *
451 * 'ri', is the redis instance target of this event if applicable, and is
452 * used to obtain the path of the notification script to execute.
453 *
454 * The remaining arguments are printf-alike.
455 * If the format specifier starts with the two characters "%@" then ri is
456 * not NULL, and the message is prefixed with an instance identifier in the
457 * following format:
458 *
459 * <instance type> <instance name> <ip> <port>
460 *
461 * If the instance type is not master, than the additional string is
462 * added to specify the originating master:
463 *
464 * @ <master name> <master ip> <master port>
465 *
466 * Any other specifier after "%@" is processed by printf itself.
467 */
468 void sentinelEvent(int level, char *type, sentinelRedisInstance *ri,
469 const char *fmt, ...) {
470 va_list ap;
471 char msg[REDIS_MAX_LOGMSG_LEN];
472 robj *channel, *payload;
473
474 /* Handle %@ */
475 if (fmt[0] == '%' && fmt[1] == '@') {
476 sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
477 NULL : ri->master;
478
479 if (master) {
480 snprintf(msg, sizeof(msg), "%s %s %s %d @ %s %s %d",
481 sentinelRedisInstanceTypeStr(ri),
482 ri->name, ri->addr->ip, ri->addr->port,
483 master->name, master->addr->ip, master->addr->port);
484 } else {
485 snprintf(msg, sizeof(msg), "%s %s %s %d",
486 sentinelRedisInstanceTypeStr(ri),
487 ri->name, ri->addr->ip, ri->addr->port);
488 }
489 fmt += 2;
490 } else {
491 msg[0] = '\0';
492 }
493
494 /* Use vsprintf for the rest of the formatting if any. */
495 if (fmt[0] != '\0') {
496 va_start(ap, fmt);
497 vsnprintf(msg+strlen(msg), sizeof(msg)-strlen(msg), fmt, ap);
498 va_end(ap);
499 }
500
501 /* Log the message if the log level allows it to be logged. */
502 if (level >= server.verbosity)
503 redisLog(level,"%s %s",type,msg);
504
505 /* Publish the message via Pub/Sub if it's not a debugging one. */
506 if (level != REDIS_DEBUG) {
507 channel = createStringObject(type,strlen(type));
508 payload = createStringObject(msg,strlen(msg));
509 pubsubPublishMessage(channel,payload);
510 decrRefCount(channel);
511 decrRefCount(payload);
512 }
513
514 /* Call the notification script if applicable. */
515 if (level == REDIS_WARNING && ri != NULL) {
516 sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
517 ri : ri->master;
518 if (master->notification_script) {
519 sentinelScheduleScriptExecution(master->notification_script,
520 type,msg,NULL);
521 }
522 }
523 }
524
525 /* ============================ script execution ============================ */
526
527 /* Release a script job structure and all the associated data. */
528 void sentinelReleaseScriptJob(sentinelScriptJob *sj) {
529 int j = 0;
530
531 while(sj->argv[j]) sdsfree(sj->argv[j++]);
532 zfree(sj->argv);
533 zfree(sj);
534 }
535
536 #define SENTINEL_SCRIPT_MAX_ARGS 16
537 void sentinelScheduleScriptExecution(char *path, ...) {
538 va_list ap;
539 char *argv[SENTINEL_SCRIPT_MAX_ARGS+1];
540 int argc = 1;
541 sentinelScriptJob *sj;
542
543 va_start(ap, path);
544 while(argc < SENTINEL_SCRIPT_MAX_ARGS) {
545 argv[argc] = va_arg(ap,char*);
546 if (!argv[argc]) break;
547 argv[argc] = sdsnew(argv[argc]); /* Copy the string. */
548 argc++;
549 }
550 va_end(ap);
551 argv[0] = sdsnew(path);
552
553 sj = zmalloc(sizeof(*sj));
554 sj->flags = SENTINEL_SCRIPT_NONE;
555 sj->retry_num = 0;
556 sj->argv = zmalloc(sizeof(char*)*(argc+1));
557 sj->start_time = 0;
558 sj->pid = 0;
559 memcpy(sj->argv,argv,sizeof(char*)*(argc+1));
560
561 listAddNodeTail(sentinel.scripts_queue,sj);
562
563 /* Remove the oldest non running script if we already hit the limit. */
564 if (listLength(sentinel.scripts_queue) > SENTINEL_SCRIPT_MAX_QUEUE) {
565 listNode *ln;
566 listIter li;
567
568 listRewind(sentinel.scripts_queue,&li);
569 while ((ln = listNext(&li)) != NULL) {
570 sj = ln->value;
571
572 if (sj->flags & SENTINEL_SCRIPT_RUNNING) continue;
573 /* The first node is the oldest as we add on tail. */
574 listDelNode(sentinel.scripts_queue,ln);
575 sentinelReleaseScriptJob(sj);
576 break;
577 }
578 redisAssert(listLength(sentinel.scripts_queue) <=
579 SENTINEL_SCRIPT_MAX_QUEUE);
580 }
581 }
582
583 /* Lookup a script in the scripts queue via pid, and returns the list node
584 * (so that we can easily remove it from the queue if needed). */
585 listNode *sentinelGetScriptListNodeByPid(pid_t pid) {
586 listNode *ln;
587 listIter li;
588
589 listRewind(sentinel.scripts_queue,&li);
590 while ((ln = listNext(&li)) != NULL) {
591 sentinelScriptJob *sj = ln->value;
592
593 if ((sj->flags & SENTINEL_SCRIPT_RUNNING) && sj->pid == pid)
594 return ln;
595 }
596 return NULL;
597 }
598
599 /* Run pending scripts if we are not already at max number of running
600 * scripts. */
601 void sentinelRunPendingScripts(void) {
602 listNode *ln;
603 listIter li;
604 mstime_t now = mstime();
605
606 /* Find jobs that are not running and run them, from the top to the
607 * tail of the queue, so we run older jobs first. */
608 listRewind(sentinel.scripts_queue,&li);
609 while (sentinel.running_scripts < SENTINEL_SCRIPT_MAX_RUNNING &&
610 (ln = listNext(&li)) != NULL)
611 {
612 sentinelScriptJob *sj = ln->value;
613 pid_t pid;
614
615 /* Skip if already running. */
616 if (sj->flags & SENTINEL_SCRIPT_RUNNING) continue;
617
618 /* Skip if it's a retry, but not enough time has elapsed. */
619 if (sj->start_time && sj->start_time > now) continue;
620
621 sj->flags |= SENTINEL_SCRIPT_RUNNING;
622 sj->start_time = mstime();
623 sj->retry_num++;
624 pid = fork();
625
626 if (pid == -1) {
627 /* Parent (fork error).
628 * We report fork errors as signal 99, in order to unify the
629 * reporting with other kind of errors. */
630 sentinelEvent(REDIS_WARNING,"-script-error",NULL,
631 "%s %d %d", sj->argv[0], 99, 0);
632 sj->flags &= ~SENTINEL_SCRIPT_RUNNING;
633 sj->pid = 0;
634 } else if (pid == 0) {
635 /* Child */
636 execve(sj->argv[0],sj->argv,environ);
637 /* If we are here an error occurred. */
638 _exit(2); /* Don't retry execution. */
639 } else {
640 sentinel.running_scripts++;
641 sj->pid = pid;
642 sentinelEvent(REDIS_DEBUG,"+script-child",NULL,"%ld",(long)pid);
643 }
644 }
645 }
646
647 /* How much to delay the execution of a script that we need to retry after
648 * an error?
649 *
650 * We double the retry delay for every further retry we do. So for instance
651 * if RETRY_DELAY is set to 30 seconds and the max number of retries is 10
652 * starting from the second attempt to execute the script the delays are:
653 * 30 sec, 60 sec, 2 min, 4 min, 8 min, 16 min, 32 min, 64 min, 128 min. */
654 mstime_t sentinelScriptRetryDelay(int retry_num) {
655 mstime_t delay = SENTINEL_SCRIPT_RETRY_DELAY;
656
657 while (retry_num-- > 1) delay *= 2;
658 return delay;
659 }
660
661 /* Check for scripts that terminated, and remove them from the queue if the
662 * script terminated successfully. If instead the script was terminated by
663 * a signal, or returned exit code "1", it is scheduled to run again if
664 * the max number of retries did not already elapsed. */
665 void sentinelCollectTerminatedScripts(void) {
666 int statloc;
667 pid_t pid;
668
669 while ((pid = wait3(&statloc,WNOHANG,NULL)) > 0) {
670 int exitcode = WEXITSTATUS(statloc);
671 int bysignal = 0;
672 listNode *ln;
673 sentinelScriptJob *sj;
674
675 if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
676 sentinelEvent(REDIS_DEBUG,"-script-child",NULL,"%ld %d %d",
677 (long)pid, exitcode, bysignal);
678
679 ln = sentinelGetScriptListNodeByPid(pid);
680 if (ln == NULL) {
681 redisLog(REDIS_WARNING,"wait3() returned a pid (%ld) we can't find in our scripts execution queue!", (long)pid);
682 continue;
683 }
684 sj = ln->value;
685
686 /* If the script was terminated by a signal or returns an
687 * exit code of "1" (that means: please retry), we reschedule it
688 * if the max number of retries is not already reached. */
689 if ((bysignal || exitcode == 1) &&
690 sj->retry_num != SENTINEL_SCRIPT_MAX_RETRY)
691 {
692 sj->flags &= ~SENTINEL_SCRIPT_RUNNING;
693 sj->pid = 0;
694 sj->start_time = mstime() +
695 sentinelScriptRetryDelay(sj->retry_num);
696 } else {
697 /* Otherwise let's remove the script, but log the event if the
698 * execution did not terminated in the best of the ways. */
699 if (bysignal || exitcode != 0) {
700 sentinelEvent(REDIS_WARNING,"-script-error",NULL,
701 "%s %d %d", sj->argv[0], bysignal, exitcode);
702 }
703 listDelNode(sentinel.scripts_queue,ln);
704 sentinelReleaseScriptJob(sj);
705 sentinel.running_scripts--;
706 }
707 }
708 }
709
710 /* Kill scripts in timeout, they'll be collected by the
711 * sentinelCollectTerminatedScripts() function. */
712 void sentinelKillTimedoutScripts(void) {
713 listNode *ln;
714 listIter li;
715 mstime_t now = mstime();
716
717 listRewind(sentinel.scripts_queue,&li);
718 while ((ln = listNext(&li)) != NULL) {
719 sentinelScriptJob *sj = ln->value;
720
721 if (sj->flags & SENTINEL_SCRIPT_RUNNING &&
722 (now - sj->start_time) > SENTINEL_SCRIPT_MAX_RUNTIME)
723 {
724 sentinelEvent(REDIS_WARNING,"-script-timeout",NULL,"%s %ld",
725 sj->argv[0], (long)sj->pid);
726 kill(sj->pid,SIGKILL);
727 }
728 }
729 }
730
731 /* Implements SENTINEL PENDING-SCRIPTS command. */
732 void sentinelPendingScriptsCommand(redisClient *c) {
733 listNode *ln;
734 listIter li;
735
736 addReplyMultiBulkLen(c,listLength(sentinel.scripts_queue));
737 listRewind(sentinel.scripts_queue,&li);
738 while ((ln = listNext(&li)) != NULL) {
739 sentinelScriptJob *sj = ln->value;
740 int j = 0;
741
742 addReplyMultiBulkLen(c,10);
743
744 addReplyBulkCString(c,"argv");
745 while (sj->argv[j]) j++;
746 addReplyMultiBulkLen(c,j);
747 j = 0;
748 while (sj->argv[j]) addReplyBulkCString(c,sj->argv[j++]);
749
750 addReplyBulkCString(c,"flags");
751 addReplyBulkCString(c,
752 (sj->flags & SENTINEL_SCRIPT_RUNNING) ? "running" : "scheduled");
753
754 addReplyBulkCString(c,"pid");
755 addReplyBulkLongLong(c,sj->pid);
756
757 if (sj->flags & SENTINEL_SCRIPT_RUNNING) {
758 addReplyBulkCString(c,"run-time");
759 addReplyBulkLongLong(c,mstime() - sj->start_time);
760 } else {
761 mstime_t delay = sj->start_time ? (sj->start_time-mstime()) : 0;
762 if (delay < 0) delay = 0;
763 addReplyBulkCString(c,"run-delay");
764 addReplyBulkLongLong(c,delay);
765 }
766
767 addReplyBulkCString(c,"retry-num");
768 addReplyBulkLongLong(c,sj->retry_num);
769 }
770 }
771
772 /* This function calls, if any, the client reconfiguration script with the
773 * following parameters:
774 *
775 * <master-name> <role> <state> <from-ip> <from-port> <to-ip> <to-port>
776 *
777 * It is called every time a failover starts, ends, or is aborted.
778 *
779 * <state> is "start", "end" or "abort".
780 * <role> is either "leader" or "observer".
781 *
782 * from/to fields are respectively master -> promoted slave addresses for
783 * "start" and "end", or the reverse (promoted slave -> master) in case of
784 * "abort".
785 */
786 void sentinelCallClientReconfScript(sentinelRedisInstance *master, int role, char *state, sentinelAddr *from, sentinelAddr *to) {
787 char fromport[32], toport[32];
788
789 if (master->client_reconfig_script == NULL) return;
790 ll2string(fromport,sizeof(fromport),from->port);
791 ll2string(toport,sizeof(toport),to->port);
792 sentinelScheduleScriptExecution(master->client_reconfig_script,
793 master->name,
794 (role == SENTINEL_LEADER) ? "leader" : "observer",
795 state, from->ip, fromport, to->ip, toport, NULL);
796 }
797
798 /* ========================== sentinelRedisInstance ========================= */
799
800 /* Create a redis instance, the following fields must be populated by the
801 * caller if needed:
802 * runid: set to NULL but will be populated once INFO output is received.
803 * info_refresh: is set to 0 to mean that we never received INFO so far.
804 *
805 * If SRI_MASTER is set into initial flags the instance is added to
806 * sentinel.masters table.
807 *
808 * if SRI_SLAVE or SRI_SENTINEL is set then 'master' must be not NULL and the
809 * instance is added into master->slaves or master->sentinels table.
810 *
811 * If the instance is a slave or sentinel, the name parameter is ignored and
812 * is created automatically as hostname:port.
813 *
814 * The function fails if hostname can't be resolved or port is out of range.
815 * When this happens NULL is returned and errno is set accordingly to the
816 * createSentinelAddr() function.
817 *
818 * The function may also fail and return NULL with errno set to EBUSY if
819 * a master or slave with the same name already exists. */
820 sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char *hostname, int port, int quorum, sentinelRedisInstance *master) {
821 sentinelRedisInstance *ri;
822 sentinelAddr *addr;
823 dict *table = NULL;
824 char slavename[128], *sdsname;
825
826 redisAssert(flags & (SRI_MASTER|SRI_SLAVE|SRI_SENTINEL));
827 redisAssert((flags & SRI_MASTER) || master != NULL);
828
829 /* Check address validity. */
830 addr = createSentinelAddr(hostname,port);
831 if (addr == NULL) return NULL;
832
833 /* For slaves and sentinel we use ip:port as name. */
834 if (flags & (SRI_SLAVE|SRI_SENTINEL)) {
835 snprintf(slavename,sizeof(slavename),"%s:%d",hostname,port);
836 name = slavename;
837 }
838
839 /* Make sure the entry is not duplicated. This may happen when the same
840 * name for a master is used multiple times inside the configuration or
841 * if we try to add multiple times a slave or sentinel with same ip/port
842 * to a master. */
843 if (flags & SRI_MASTER) table = sentinel.masters;
844 else if (flags & SRI_SLAVE) table = master->slaves;
845 else if (flags & SRI_SENTINEL) table = master->sentinels;
846 sdsname = sdsnew(name);
847 if (dictFind(table,sdsname)) {
848 sdsfree(sdsname);
849 errno = EBUSY;
850 return NULL;
851 }
852
853 /* Create the instance object. */
854 ri = zmalloc(sizeof(*ri));
855 /* Note that all the instances are started in the disconnected state,
856 * the event loop will take care of connecting them. */
857 ri->flags = flags | SRI_DISCONNECTED;
858 ri->name = sdsname;
859 ri->runid = NULL;
860 ri->addr = addr;
861 ri->cc = NULL;
862 ri->pc = NULL;
863 ri->pending_commands = 0;
864 ri->cc_conn_time = 0;
865 ri->pc_conn_time = 0;
866 ri->pc_last_activity = 0;
867 ri->last_avail_time = mstime();
868 ri->last_pong_time = mstime();
869 ri->last_pub_time = mstime();
870 ri->last_hello_time = mstime();
871 ri->last_master_down_reply_time = mstime();
872 ri->s_down_since_time = 0;
873 ri->o_down_since_time = 0;
874 ri->down_after_period = master ? master->down_after_period :
875 SENTINEL_DOWN_AFTER_PERIOD;
876 ri->master_link_down_time = 0;
877 ri->slave_priority = SENTINEL_DEFAULT_SLAVE_PRIORITY;
878 ri->slave_reconf_sent_time = 0;
879 ri->slave_master_host = NULL;
880 ri->slave_master_port = 0;
881 ri->slave_master_link_status = SENTINEL_MASTER_LINK_STATUS_DOWN;
882 ri->sentinels = dictCreate(&instancesDictType,NULL);
883 ri->quorum = quorum;
884 ri->parallel_syncs = SENTINEL_DEFAULT_PARALLEL_SYNCS;
885 ri->master = master;
886 ri->slaves = dictCreate(&instancesDictType,NULL);
887 ri->info_refresh = 0;
888
889 /* Failover state. */
890 ri->leader = NULL;
891 ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
892 ri->failover_state_change_time = 0;
893 ri->failover_start_time = 0;
894 ri->failover_timeout = SENTINEL_DEFAULT_FAILOVER_TIMEOUT;
895 ri->promoted_slave = NULL;
896 ri->notification_script = NULL;
897 ri->client_reconfig_script = NULL;
898
899 /* Add into the right table. */
900 dictAdd(table, ri->name, ri);
901 return ri;
902 }
903
904 /* Release this instance and all its slaves, sentinels, hiredis connections.
905 * This function also takes care of unlinking the instance from the main
906 * masters table (if it is a master) or from its master sentinels/slaves table
907 * if it is a slave or sentinel. */
908 void releaseSentinelRedisInstance(sentinelRedisInstance *ri) {
909 /* Release all its slaves or sentinels if any. */
910 dictRelease(ri->sentinels);
911 dictRelease(ri->slaves);
912
913 /* Release hiredis connections. */
914 if (ri->cc) sentinelKillLink(ri,ri->cc);
915 if (ri->pc) sentinelKillLink(ri,ri->pc);
916
917 /* Free other resources. */
918 sdsfree(ri->name);
919 sdsfree(ri->runid);
920 sdsfree(ri->notification_script);
921 sdsfree(ri->client_reconfig_script);
922 sdsfree(ri->slave_master_host);
923 sdsfree(ri->leader);
924 releaseSentinelAddr(ri->addr);
925
926 /* Clear state into the master if needed. */
927 if ((ri->flags & SRI_SLAVE) && (ri->flags & SRI_PROMOTED) && ri->master)
928 ri->master->promoted_slave = NULL;
929
930 zfree(ri);
931 }
932
933 /* Lookup a slave in a master Redis instance, by ip and port. */
934 sentinelRedisInstance *sentinelRedisInstanceLookupSlave(
935 sentinelRedisInstance *ri, char *ip, int port)
936 {
937 sds key;
938 sentinelRedisInstance *slave;
939
940 redisAssert(ri->flags & SRI_MASTER);
941 key = sdscatprintf(sdsempty(),"%s:%d",ip,port);
942 slave = dictFetchValue(ri->slaves,key);
943 sdsfree(key);
944 return slave;
945 }
946
947 /* Return the name of the type of the instance as a string. */
948 const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri) {
949 if (ri->flags & SRI_MASTER) return "master";
950 else if (ri->flags & SRI_SLAVE) return "slave";
951 else if (ri->flags & SRI_SENTINEL) return "sentinel";
952 else return "unknown";
953 }
954
955 /* This function removes all the instances found in the dictionary of instances
956 * 'd', having either:
957 *
958 * 1) The same ip/port as specified.
959 * 2) The same runid.
960 *
961 * "1" and "2" don't need to verify at the same time, just one is enough.
962 * If "runid" is NULL it is not checked.
963 * Similarly if "ip" is NULL it is not checked.
964 *
965 * This function is useful because every time we add a new Sentinel into
966 * a master's Sentinels dictionary, we want to be very sure about not
967 * having duplicated instances for any reason. This is so important because
968 * we use those other sentinels in order to run our quorum protocol to
969 * understand if it's time to proceeed with the fail over.
970 *
971 * Making sure no duplication is possible we greately improve the robustness
972 * of the quorum (otherwise we may end counting the same instance multiple
973 * times for some reason).
974 *
975 * The function returns the number of Sentinels removed. */
976 int removeMatchingSentinelsFromMaster(sentinelRedisInstance *master, char *ip, int port, char *runid) {
977 dictIterator *di;
978 dictEntry *de;
979 int removed = 0;
980
981 di = dictGetSafeIterator(master->sentinels);
982 while((de = dictNext(di)) != NULL) {
983 sentinelRedisInstance *ri = dictGetVal(de);
984
985 if ((ri->runid && runid && strcmp(ri->runid,runid) == 0) ||
986 (ip && strcmp(ri->addr->ip,ip) == 0 && port == ri->addr->port))
987 {
988 dictDelete(master->sentinels,ri->name);
989 removed++;
990 }
991 }
992 dictReleaseIterator(di);
993 return removed;
994 }
995
996 /* Search an instance with the same runid, ip and port into a dictionary
997 * of instances. Return NULL if not found, otherwise return the instance
998 * pointer.
999 *
1000 * runid or ip can be NULL. In such a case the search is performed only
1001 * by the non-NULL field. */
1002 sentinelRedisInstance *getSentinelRedisInstanceByAddrAndRunID(dict *instances, char *ip, int port, char *runid) {
1003 dictIterator *di;
1004 dictEntry *de;
1005 sentinelRedisInstance *instance = NULL;
1006
1007 redisAssert(ip || runid); /* User must pass at least one search param. */
1008 di = dictGetIterator(instances);
1009 while((de = dictNext(di)) != NULL) {
1010 sentinelRedisInstance *ri = dictGetVal(de);
1011
1012 if (runid && !ri->runid) continue;
1013 if ((runid == NULL || strcmp(ri->runid, runid) == 0) &&
1014 (ip == NULL || (strcmp(ri->addr->ip, ip) == 0 &&
1015 ri->addr->port == port)))
1016 {
1017 instance = ri;
1018 break;
1019 }
1020 }
1021 dictReleaseIterator(di);
1022 return instance;
1023 }
1024
1025 /* Simple master lookup by name */
1026 sentinelRedisInstance *sentinelGetMasterByName(char *name) {
1027 sentinelRedisInstance *ri;
1028 sds sdsname = sdsnew(name);
1029
1030 ri = dictFetchValue(sentinel.masters,sdsname);
1031 sdsfree(sdsname);
1032 return ri;
1033 }
1034
1035 /* Add the specified flags to all the instances in the specified dictionary. */
1036 void sentinelAddFlagsToDictOfRedisInstances(dict *instances, int flags) {
1037 dictIterator *di;
1038 dictEntry *de;
1039
1040 di = dictGetIterator(instances);
1041 while((de = dictNext(di)) != NULL) {
1042 sentinelRedisInstance *ri = dictGetVal(de);
1043 ri->flags |= flags;
1044 }
1045 dictReleaseIterator(di);
1046 }
1047
1048 /* Remove the specified flags to all the instances in the specified
1049 * dictionary. */
1050 void sentinelDelFlagsToDictOfRedisInstances(dict *instances, int flags) {
1051 dictIterator *di;
1052 dictEntry *de;
1053
1054 di = dictGetIterator(instances);
1055 while((de = dictNext(di)) != NULL) {
1056 sentinelRedisInstance *ri = dictGetVal(de);
1057 ri->flags &= ~flags;
1058 }
1059 dictReleaseIterator(di);
1060 }
1061
1062 /* Reset the state of a monitored master:
1063 * 1) Remove all slaves.
1064 * 2) Remove all sentinels.
1065 * 3) Remove most of the flags resulting from runtime operations.
1066 * 4) Reset timers to their default value.
1067 * 5) In the process of doing this undo the failover if in progress.
1068 * 6) Disconnect the connections with the master (will reconnect automatically).
1069 */
1070 void sentinelResetMaster(sentinelRedisInstance *ri, int flags) {
1071 redisAssert(ri->flags & SRI_MASTER);
1072 dictRelease(ri->slaves);
1073 dictRelease(ri->sentinels);
1074 ri->slaves = dictCreate(&instancesDictType,NULL);
1075 ri->sentinels = dictCreate(&instancesDictType,NULL);
1076 if (ri->cc) sentinelKillLink(ri,ri->cc);
1077 if (ri->pc) sentinelKillLink(ri,ri->pc);
1078 ri->flags &= SRI_MASTER|SRI_CAN_FAILOVER|SRI_DISCONNECTED;
1079 if (ri->leader) {
1080 sdsfree(ri->leader);
1081 ri->leader = NULL;
1082 }
1083 ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
1084 ri->failover_state_change_time = 0;
1085 ri->failover_start_time = 0;
1086 ri->promoted_slave = NULL;
1087 sdsfree(ri->runid);
1088 sdsfree(ri->slave_master_host);
1089 ri->runid = NULL;
1090 ri->slave_master_host = NULL;
1091 ri->last_avail_time = mstime();
1092 ri->last_pong_time = mstime();
1093 if (flags & SENTINEL_GENERATE_EVENT)
1094 sentinelEvent(REDIS_WARNING,"+reset-master",ri,"%@");
1095 }
1096
1097 /* Call sentinelResetMaster() on every master with a name matching the specified
1098 * pattern. */
1099 int sentinelResetMastersByPattern(char *pattern, int flags) {
1100 dictIterator *di;
1101 dictEntry *de;
1102 int reset = 0;
1103
1104 di = dictGetIterator(sentinel.masters);
1105 while((de = dictNext(di)) != NULL) {
1106 sentinelRedisInstance *ri = dictGetVal(de);
1107
1108 if (ri->name) {
1109 if (stringmatch(pattern,ri->name,0)) {
1110 sentinelResetMaster(ri,flags);
1111 reset++;
1112 }
1113 }
1114 }
1115 dictReleaseIterator(di);
1116 return reset;
1117 }
1118
1119 /* Reset the specified master with sentinelResetMaster(), and also change
1120 * the ip:port address, but take the name of the instance unmodified.
1121 *
1122 * This is used to handle the +switch-master and +redirect-to-master events.
1123 *
1124 * The function returns REDIS_ERR if the address can't be resolved for some
1125 * reason. Otherwise REDIS_OK is returned.
1126 *
1127 * TODO: make this reset so that original sentinels are re-added with
1128 * same ip / port / runid.
1129 */
1130
1131 int sentinelResetMasterAndChangeAddress(sentinelRedisInstance *master, char *ip, int port) {
1132 sentinelAddr *oldaddr, *newaddr;
1133
1134 newaddr = createSentinelAddr(ip,port);
1135 if (newaddr == NULL) return REDIS_ERR;
1136 sentinelResetMaster(master,SENTINEL_NO_FLAGS);
1137 oldaddr = master->addr;
1138 master->addr = newaddr;
1139 /* Release the old address at the end so we are safe even if the function
1140 * gets the master->addr->ip and master->addr->port as arguments. */
1141 releaseSentinelAddr(oldaddr);
1142 return REDIS_OK;
1143 }
1144
1145 /* ============================ Config handling ============================= */
1146 char *sentinelHandleConfiguration(char **argv, int argc) {
1147 sentinelRedisInstance *ri;
1148
1149 if (!strcasecmp(argv[0],"monitor") && argc == 5) {
1150 /* monitor <name> <host> <port> <quorum> */
1151 int quorum = atoi(argv[4]);
1152
1153 if (quorum <= 0) return "Quorum must be 1 or greater.";
1154 if (createSentinelRedisInstance(argv[1],SRI_MASTER,argv[2],
1155 atoi(argv[3]),quorum,NULL) == NULL)
1156 {
1157 switch(errno) {
1158 case EBUSY: return "Duplicated master name.";
1159 case ENOENT: return "Can't resolve master instance hostname.";
1160 case EINVAL: return "Invalid port number";
1161 }
1162 }
1163 } else if (!strcasecmp(argv[0],"down-after-milliseconds") && argc == 3) {
1164 /* down-after-milliseconds <name> <milliseconds> */
1165 ri = sentinelGetMasterByName(argv[1]);
1166 if (!ri) return "No such master with specified name.";
1167 ri->down_after_period = atoi(argv[2]);
1168 if (ri->down_after_period <= 0)
1169 return "negative or zero time parameter.";
1170 } else if (!strcasecmp(argv[0],"failover-timeout") && argc == 3) {
1171 /* failover-timeout <name> <milliseconds> */
1172 ri = sentinelGetMasterByName(argv[1]);
1173 if (!ri) return "No such master with specified name.";
1174 ri->failover_timeout = atoi(argv[2]);
1175 if (ri->failover_timeout <= 0)
1176 return "negative or zero time parameter.";
1177 } else if (!strcasecmp(argv[0],"can-failover") && argc == 3) {
1178 /* can-failover <name> <yes/no> */
1179 int yesno = yesnotoi(argv[2]);
1180
1181 ri = sentinelGetMasterByName(argv[1]);
1182 if (!ri) return "No such master with specified name.";
1183 if (yesno == -1) return "Argument must be either yes or no.";
1184 if (yesno)
1185 ri->flags |= SRI_CAN_FAILOVER;
1186 else
1187 ri->flags &= ~SRI_CAN_FAILOVER;
1188 } else if (!strcasecmp(argv[0],"parallel-syncs") && argc == 3) {
1189 /* parallel-syncs <name> <milliseconds> */
1190 ri = sentinelGetMasterByName(argv[1]);
1191 if (!ri) return "No such master with specified name.";
1192 ri->parallel_syncs = atoi(argv[2]);
1193 } else if (!strcasecmp(argv[0],"notification-script") && argc == 3) {
1194 /* notification-script <name> <path> */
1195 ri = sentinelGetMasterByName(argv[1]);
1196 if (!ri) return "No such master with specified name.";
1197 if (access(argv[2],X_OK) == -1)
1198 return "Notification script seems non existing or non executable.";
1199 ri->notification_script = sdsnew(argv[2]);
1200 } else if (!strcasecmp(argv[0],"client-reconfig-script") && argc == 3) {
1201 /* client-reconfig-script <name> <path> */
1202 ri = sentinelGetMasterByName(argv[1]);
1203 if (!ri) return "No such master with specified name.";
1204 if (access(argv[2],X_OK) == -1)
1205 return "Client reconfiguration script seems non existing or "
1206 "non executable.";
1207 ri->client_reconfig_script = sdsnew(argv[2]);
1208 } else {
1209 return "Unrecognized sentinel configuration statement.";
1210 }
1211 return NULL;
1212 }
1213
1214 /* ====================== hiredis connection handling ======================= */
1215
1216 /* Completely disconnect an hiredis link from an instance. */
1217 void sentinelKillLink(sentinelRedisInstance *ri, redisAsyncContext *c) {
1218 if (ri->cc == c) {
1219 ri->cc = NULL;
1220 ri->pending_commands = 0;
1221 }
1222 if (ri->pc == c) ri->pc = NULL;
1223 c->data = NULL;
1224 ri->flags |= SRI_DISCONNECTED;
1225 redisAsyncFree(c);
1226 }
1227
1228 /* This function takes an hiredis context that is in an error condition
1229 * and make sure to mark the instance as disconnected performing the
1230 * cleanup needed.
1231 *
1232 * Note: we don't free the hiredis context as hiredis will do it for us
1233 * for async conenctions. */
1234 void sentinelDisconnectInstanceFromContext(const redisAsyncContext *c) {
1235 sentinelRedisInstance *ri = c->data;
1236 int pubsub;
1237
1238 if (ri == NULL) return; /* The instance no longer exists. */
1239
1240 pubsub = (ri->pc == c);
1241 sentinelEvent(REDIS_DEBUG, pubsub ? "-pubsub-link" : "-cmd-link", ri,
1242 "%@ #%s", c->errstr);
1243 if (pubsub)
1244 ri->pc = NULL;
1245 else
1246 ri->cc = NULL;
1247 ri->flags |= SRI_DISCONNECTED;
1248 }
1249
1250 void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status) {
1251 if (status != REDIS_OK) {
1252 sentinelDisconnectInstanceFromContext(c);
1253 } else {
1254 sentinelRedisInstance *ri = c->data;
1255 int pubsub = (ri->pc == c);
1256
1257 sentinelEvent(REDIS_DEBUG, pubsub ? "+pubsub-link" : "+cmd-link", ri,
1258 "%@");
1259 }
1260 }
1261
1262 void sentinelDisconnectCallback(const redisAsyncContext *c, int status) {
1263 sentinelDisconnectInstanceFromContext(c);
1264 }
1265
1266 /* Create the async connections for the specified instance if the instance
1267 * is disconnected. Note that the SRI_DISCONNECTED flag is set even if just
1268 * one of the two links (commands and pub/sub) is missing. */
1269 void sentinelReconnectInstance(sentinelRedisInstance *ri) {
1270 if (!(ri->flags & SRI_DISCONNECTED)) return;
1271
1272 /* Commands connection. */
1273 if (ri->cc == NULL) {
1274 ri->cc = redisAsyncConnect(ri->addr->ip,ri->addr->port);
1275 if (ri->cc->err) {
1276 sentinelEvent(REDIS_DEBUG,"-cmd-link-reconnection",ri,"%@ #%s",
1277 ri->cc->errstr);
1278 sentinelKillLink(ri,ri->cc);
1279 } else {
1280 ri->cc_conn_time = mstime();
1281 ri->cc->data = ri;
1282 redisAeAttach(server.el,ri->cc);
1283 redisAsyncSetConnectCallback(ri->cc,
1284 sentinelLinkEstablishedCallback);
1285 redisAsyncSetDisconnectCallback(ri->cc,
1286 sentinelDisconnectCallback);
1287 }
1288 }
1289 /* Pub / Sub */
1290 if ((ri->flags & SRI_MASTER) && ri->pc == NULL) {
1291 ri->pc = redisAsyncConnect(ri->addr->ip,ri->addr->port);
1292 if (ri->pc->err) {
1293 sentinelEvent(REDIS_DEBUG,"-pubsub-link-reconnection",ri,"%@ #%s",
1294 ri->pc->errstr);
1295 sentinelKillLink(ri,ri->pc);
1296 } else {
1297 int retval;
1298
1299 ri->pc_conn_time = mstime();
1300 ri->pc->data = ri;
1301 redisAeAttach(server.el,ri->pc);
1302 redisAsyncSetConnectCallback(ri->pc,
1303 sentinelLinkEstablishedCallback);
1304 redisAsyncSetDisconnectCallback(ri->pc,
1305 sentinelDisconnectCallback);
1306 /* Now we subscribe to the Sentinels "Hello" channel. */
1307 retval = redisAsyncCommand(ri->pc,
1308 sentinelReceiveHelloMessages, NULL, "SUBSCRIBE %s",
1309 SENTINEL_HELLO_CHANNEL);
1310 if (retval != REDIS_OK) {
1311 /* If we can't subscribe, the Pub/Sub connection is useless
1312 * and we can simply disconnect it and try again. */
1313 sentinelKillLink(ri,ri->pc);
1314 return;
1315 }
1316 }
1317 }
1318 /* Clear the DISCONNECTED flags only if we have both the connections
1319 * (or just the commands connection if this is a slave or a
1320 * sentinel instance). */
1321 if (ri->cc && (ri->flags & (SRI_SLAVE|SRI_SENTINEL) || ri->pc))
1322 ri->flags &= ~SRI_DISCONNECTED;
1323 }
1324
1325 /* ======================== Redis instances pinging ======================== */
1326
1327 /* Process the INFO output from masters. */
1328 void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
1329 sds *lines;
1330 int numlines, j;
1331 int role = 0;
1332 int runid_changed = 0; /* true if runid changed. */
1333 int first_runid = 0; /* true if this is the first runid we receive. */
1334
1335 /* The following fields must be reset to a given value in the case they
1336 * are not found at all in the INFO output. */
1337 ri->master_link_down_time = 0;
1338
1339 /* Process line by line. */
1340 lines = sdssplitlen(info,strlen(info),"\r\n",2,&numlines);
1341 for (j = 0; j < numlines; j++) {
1342 sentinelRedisInstance *slave;
1343 sds l = lines[j];
1344
1345 /* run_id:<40 hex chars>*/
1346 if (sdslen(l) >= 47 && !memcmp(l,"run_id:",7)) {
1347 if (ri->runid == NULL) {
1348 ri->runid = sdsnewlen(l+7,40);
1349 first_runid = 1;
1350 } else {
1351 if (strncmp(ri->runid,l+7,40) != 0) {
1352 runid_changed = 1;
1353 sentinelEvent(REDIS_NOTICE,"+reboot",ri,"%@");
1354 sdsfree(ri->runid);
1355 ri->runid = sdsnewlen(l+7,40);
1356 }
1357 }
1358 }
1359
1360 /* slave0:<ip>,<port>,<state> */
1361 if ((ri->flags & SRI_MASTER) &&
1362 sdslen(l) >= 7 &&
1363 !memcmp(l,"slave",5) && isdigit(l[5]))
1364 {
1365 char *ip, *port, *end;
1366
1367 ip = strchr(l,':'); if (!ip) continue;
1368 ip++; /* Now ip points to start of ip address. */
1369 port = strchr(ip,','); if (!port) continue;
1370 *port = '\0'; /* nul term for easy access. */
1371 port++; /* Now port points to start of port number. */
1372 end = strchr(port,','); if (!end) continue;
1373 *end = '\0'; /* nul term for easy access. */
1374
1375 /* Check if we already have this slave into our table,
1376 * otherwise add it. */
1377 if (sentinelRedisInstanceLookupSlave(ri,ip,atoi(port)) == NULL) {
1378 if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,ip,
1379 atoi(port), ri->quorum,ri)) != NULL)
1380 {
1381 sentinelEvent(REDIS_NOTICE,"+slave",slave,"%@");
1382 }
1383 }
1384 }
1385
1386 /* master_link_down_since_seconds:<seconds> */
1387 if (sdslen(l) >= 32 &&
1388 !memcmp(l,"master_link_down_since_seconds",30))
1389 {
1390 ri->master_link_down_time = strtoll(l+31,NULL,10)*1000;
1391 }
1392
1393 /* role:<role> */
1394 if (!memcmp(l,"role:master",11)) role = SRI_MASTER;
1395 else if (!memcmp(l,"role:slave",10)) role = SRI_SLAVE;
1396
1397 if (role == SRI_SLAVE) {
1398 /* master_host:<host> */
1399 if (sdslen(l) >= 12 && !memcmp(l,"master_host:",12)) {
1400 sdsfree(ri->slave_master_host);
1401 ri->slave_master_host = sdsnew(l+12);
1402 }
1403
1404 /* master_port:<port> */
1405 if (sdslen(l) >= 12 && !memcmp(l,"master_port:",12))
1406 ri->slave_master_port = atoi(l+12);
1407
1408 /* master_link_status:<status> */
1409 if (sdslen(l) >= 19 && !memcmp(l,"master_link_status:",19)) {
1410 ri->slave_master_link_status =
1411 (strcasecmp(l+19,"up") == 0) ?
1412 SENTINEL_MASTER_LINK_STATUS_UP :
1413 SENTINEL_MASTER_LINK_STATUS_DOWN;
1414 }
1415
1416 /* slave_priority:<priority> */
1417 if (sdslen(l) >= 15 && !memcmp(l,"slave_priority:",15))
1418 ri->slave_priority = atoi(l+15);
1419 }
1420 }
1421 ri->info_refresh = mstime();
1422 sdsfreesplitres(lines,numlines);
1423
1424 /* ---------------------------- Acting half ----------------------------- */
1425 if (sentinel.tilt) return;
1426
1427 /* Act if a master turned into a slave. */
1428 if ((ri->flags & SRI_MASTER) && role == SRI_SLAVE) {
1429 if (first_runid && ri->slave_master_host) {
1430 /* If it is the first time we receive INFO from it, but it's
1431 * a slave while it was configured as a master, we want to monitor
1432 * its master instead. */
1433 sentinelEvent(REDIS_WARNING,"+redirect-to-master",ri,
1434 "%s %s %d %s %d",
1435 ri->name, ri->addr->ip, ri->addr->port,
1436 ri->slave_master_host, ri->slave_master_port);
1437 sentinelResetMasterAndChangeAddress(ri,ri->slave_master_host,
1438 ri->slave_master_port);
1439 return;
1440 }
1441 }
1442
1443 /* Act if a slave turned into a master. */
1444 if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) {
1445 if (!(ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
1446 (runid_changed || first_runid))
1447 {
1448 /* If a slave turned into maser but:
1449 *
1450 * 1) Failover not in progress.
1451 * 2) RunID hs changed, or its the first time we see an INFO output.
1452 *
1453 * We assume this is a reboot with a wrong configuration.
1454 * Log the event and remove the slave. */
1455 int retval;
1456
1457 sentinelEvent(REDIS_WARNING,"-slave-restart-as-master",ri,"%@ #removing it from the attached slaves");
1458 retval = dictDelete(ri->master->slaves,ri->name);
1459 redisAssert(retval == REDIS_OK);
1460 return;
1461 } else if (ri->flags & SRI_PROMOTED) {
1462 /* If this is a promoted slave we can change state to the
1463 * failover state machine. */
1464 if ((ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
1465 (ri->master->flags & SRI_I_AM_THE_LEADER) &&
1466 (ri->master->failover_state ==
1467 SENTINEL_FAILOVER_STATE_WAIT_PROMOTION))
1468 {
1469 ri->master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
1470 ri->master->failover_state_change_time = mstime();
1471 sentinelEvent(REDIS_WARNING,"+promoted-slave",ri,"%@");
1472 sentinelEvent(REDIS_WARNING,"+failover-state-reconf-slaves",
1473 ri->master,"%@");
1474 sentinelCallClientReconfScript(ri->master,SENTINEL_LEADER,
1475 "start",ri->master->addr,ri->addr);
1476 }
1477 } else if (!(ri->master->flags & SRI_FAILOVER_IN_PROGRESS) ||
1478 ((ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
1479 (ri->master->flags & SRI_I_AM_THE_LEADER) &&
1480 ri->master->failover_state ==
1481 SENTINEL_FAILOVER_STATE_WAIT_START))
1482 {
1483 /* No failover in progress? Then it is the start of a failover
1484 * and we are an observer.
1485 *
1486 * We also do that if we are a leader doing a failover, in wait
1487 * start, but well, somebody else started before us. */
1488
1489 if (ri->master->flags & SRI_FAILOVER_IN_PROGRESS) {
1490 sentinelEvent(REDIS_WARNING,"-failover-abort-race",
1491 ri->master, "%@");
1492 sentinelAbortFailover(ri->master);
1493 }
1494
1495 ri->master->flags |= SRI_FAILOVER_IN_PROGRESS;
1496 sentinelEvent(REDIS_WARNING,"+failover-detected",ri->master,"%@");
1497 ri->master->failover_state = SENTINEL_FAILOVER_STATE_DETECT_END;
1498 ri->master->failover_state_change_time = mstime();
1499 ri->master->promoted_slave = ri;
1500 ri->flags |= SRI_PROMOTED;
1501 sentinelCallClientReconfScript(ri->master,SENTINEL_OBSERVER,
1502 "start", ri->master->addr,ri->addr);
1503 /* We are an observer, so we can only assume that the leader
1504 * is reconfiguring the slave instances. For this reason we
1505 * set all the instances as RECONF_SENT waiting for progresses
1506 * on this side. */
1507 sentinelAddFlagsToDictOfRedisInstances(ri->master->slaves,
1508 SRI_RECONF_SENT);
1509 }
1510 }
1511
1512 /* Detect if the slave that is in the process of being reconfigured
1513 * changed state. */
1514 if ((ri->flags & SRI_SLAVE) && role == SRI_SLAVE &&
1515 (ri->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)))
1516 {
1517 /* SRI_RECONF_SENT -> SRI_RECONF_INPROG. */
1518 if ((ri->flags & SRI_RECONF_SENT) &&
1519 ri->slave_master_host &&
1520 strcmp(ri->slave_master_host,
1521 ri->master->promoted_slave->addr->ip) == 0 &&
1522 ri->slave_master_port == ri->master->promoted_slave->addr->port)
1523 {
1524 ri->flags &= ~SRI_RECONF_SENT;
1525 ri->flags |= SRI_RECONF_INPROG;
1526 sentinelEvent(REDIS_NOTICE,"+slave-reconf-inprog",ri,"%@");
1527 }
1528
1529 /* SRI_RECONF_INPROG -> SRI_RECONF_DONE */
1530 if ((ri->flags & SRI_RECONF_INPROG) &&
1531 ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP)
1532 {
1533 ri->flags &= ~SRI_RECONF_INPROG;
1534 ri->flags |= SRI_RECONF_DONE;
1535 sentinelEvent(REDIS_NOTICE,"+slave-reconf-done",ri,"%@");
1536 /* If we are moving forward (a new slave is now configured)
1537 * we update the change_time as we are conceptually passing
1538 * to the next slave. */
1539 ri->failover_state_change_time = mstime();
1540 }
1541 }
1542 }
1543
1544 void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
1545 sentinelRedisInstance *ri = c->data;
1546 redisReply *r;
1547
1548 if (ri) ri->pending_commands--;
1549 if (!reply || !ri) return;
1550 r = reply;
1551
1552 if (r->type == REDIS_REPLY_STRING) {
1553 sentinelRefreshInstanceInfo(ri,r->str);
1554 }
1555 }
1556
1557 /* Just discard the reply. We use this when we are not monitoring the return
1558 * value of the command but its effects directly. */
1559 void sentinelDiscardReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
1560 sentinelRedisInstance *ri = c->data;
1561
1562 if (ri) ri->pending_commands--;
1563 }
1564
1565 void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
1566 sentinelRedisInstance *ri = c->data;
1567 redisReply *r;
1568
1569 if (ri) ri->pending_commands--;
1570 if (!reply || !ri) return;
1571 r = reply;
1572
1573 if (r->type == REDIS_REPLY_STATUS ||
1574 r->type == REDIS_REPLY_ERROR) {
1575 /* Update the "instance available" field only if this is an
1576 * acceptable reply. */
1577 if (strncmp(r->str,"PONG",4) == 0 ||
1578 strncmp(r->str,"LOADING",7) == 0 ||
1579 strncmp(r->str,"MASTERDOWN",10) == 0)
1580 {
1581 ri->last_avail_time = mstime();
1582 } else {
1583 /* Send a SCRIPT KILL command if the instance appears to be
1584 * down because of a busy script. */
1585 if (strncmp(r->str,"BUSY",4) == 0 &&
1586 (ri->flags & SRI_S_DOWN) &&
1587 !(ri->flags & SRI_SCRIPT_KILL_SENT))
1588 {
1589 redisAsyncCommand(ri->cc,
1590 sentinelDiscardReplyCallback, NULL, "SCRIPT KILL");
1591 ri->flags |= SRI_SCRIPT_KILL_SENT;
1592 }
1593 }
1594 }
1595 ri->last_pong_time = mstime();
1596 }
1597
1598 /* This is called when we get the reply about the PUBLISH command we send
1599 * to the master to advertise this sentinel. */
1600 void sentinelPublishReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
1601 sentinelRedisInstance *ri = c->data;
1602 redisReply *r;
1603
1604 if (ri) ri->pending_commands--;
1605 if (!reply || !ri) return;
1606 r = reply;
1607
1608 /* Only update pub_time if we actually published our message. Otherwise
1609 * we'll retry against in 100 milliseconds. */
1610 if (r->type != REDIS_REPLY_ERROR)
1611 ri->last_pub_time = mstime();
1612 }
1613
1614 /* This is our Pub/Sub callback for the Hello channel. It's useful in order
1615 * to discover other sentinels attached at the same master. */
1616 void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata) {
1617 sentinelRedisInstance *ri = c->data;
1618 redisReply *r;
1619
1620 if (!reply || !ri) return;
1621 r = reply;
1622
1623 /* Update the last activity in the pubsub channel. Note that since we
1624 * receive our messages as well this timestamp can be used to detect
1625 * if the link is probably diconnected even if it seems otherwise. */
1626 ri->pc_last_activity = mstime();
1627
1628 /* Sanity check in the reply we expect, so that the code that follows
1629 * can avoid to check for details. */
1630 if (r->type != REDIS_REPLY_ARRAY ||
1631 r->elements != 3 ||
1632 r->element[0]->type != REDIS_REPLY_STRING ||
1633 r->element[1]->type != REDIS_REPLY_STRING ||
1634 r->element[2]->type != REDIS_REPLY_STRING ||
1635 strcmp(r->element[0]->str,"message") != 0) return;
1636
1637 /* We are not interested in meeting ourselves */
1638 if (strstr(r->element[2]->str,server.runid) != NULL) return;
1639
1640 {
1641 int numtokens, port, removed, canfailover;
1642 char **token = sdssplitlen(r->element[2]->str,
1643 r->element[2]->len,
1644 ":",1,&numtokens);
1645 sentinelRedisInstance *sentinel;
1646
1647 if (numtokens == 4) {
1648 /* First, try to see if we already have this sentinel. */
1649 port = atoi(token[1]);
1650 canfailover = atoi(token[3]);
1651 sentinel = getSentinelRedisInstanceByAddrAndRunID(
1652 ri->sentinels,token[0],port,token[2]);
1653
1654 if (!sentinel) {
1655 /* If not, remove all the sentinels that have the same runid
1656 * OR the same ip/port, because it's either a restart or a
1657 * network topology change. */
1658 removed = removeMatchingSentinelsFromMaster(ri,token[0],port,
1659 token[2]);
1660 if (removed) {
1661 sentinelEvent(REDIS_NOTICE,"-dup-sentinel",ri,
1662 "%@ #duplicate of %s:%d or %s",
1663 token[0],port,token[2]);
1664 }
1665
1666 /* Add the new sentinel. */
1667 sentinel = createSentinelRedisInstance(NULL,SRI_SENTINEL,
1668 token[0],port,ri->quorum,ri);
1669 if (sentinel) {
1670 sentinelEvent(REDIS_NOTICE,"+sentinel",sentinel,"%@");
1671 /* The runid is NULL after a new instance creation and
1672 * for Sentinels we don't have a later chance to fill it,
1673 * so do it now. */
1674 sentinel->runid = sdsnew(token[2]);
1675 }
1676 }
1677
1678 /* Update the state of the Sentinel. */
1679 if (sentinel) {
1680 sentinel->last_hello_time = mstime();
1681 if (canfailover)
1682 sentinel->flags |= SRI_CAN_FAILOVER;
1683 else
1684 sentinel->flags &= ~SRI_CAN_FAILOVER;
1685 }
1686 }
1687 sdsfreesplitres(token,numtokens);
1688 }
1689 }
1690
1691 void sentinelPingInstance(sentinelRedisInstance *ri) {
1692 mstime_t now = mstime();
1693 mstime_t info_period;
1694 int retval;
1695
1696 /* Return ASAP if we have already a PING or INFO already pending, or
1697 * in the case the instance is not properly connected. */
1698 if (ri->flags & SRI_DISCONNECTED) return;
1699
1700 /* For INFO, PING, PUBLISH that are not critical commands to send we
1701 * also have a limit of SENTINEL_MAX_PENDING_COMMANDS. We don't
1702 * want to use a lot of memory just because a link is not working
1703 * properly (note that anyway there is a redundant protection about this,
1704 * that is, the link will be disconnected and reconnected if a long
1705 * timeout condition is detected. */
1706 if (ri->pending_commands >= SENTINEL_MAX_PENDING_COMMANDS) return;
1707
1708 /* If this is a slave of a master in O_DOWN condition we start sending
1709 * it INFO every second, instead of the usual SENTINEL_INFO_PERIOD
1710 * period. In this state we want to closely monitor slaves in case they
1711 * are turned into masters by another Sentinel, or by the sysadmin. */
1712 if ((ri->flags & SRI_SLAVE) &&
1713 (ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) {
1714 info_period = 1000;
1715 } else {
1716 info_period = SENTINEL_INFO_PERIOD;
1717 }
1718
1719 if ((ri->flags & SRI_SENTINEL) == 0 &&
1720 (ri->info_refresh == 0 ||
1721 (now - ri->info_refresh) > info_period))
1722 {
1723 /* Send INFO to masters and slaves, not sentinels. */
1724 retval = redisAsyncCommand(ri->cc,
1725 sentinelInfoReplyCallback, NULL, "INFO");
1726 if (retval != REDIS_OK) return;
1727 ri->pending_commands++;
1728 } else if ((now - ri->last_pong_time) > SENTINEL_PING_PERIOD) {
1729 /* Send PING to all the three kinds of instances. */
1730 retval = redisAsyncCommand(ri->cc,
1731 sentinelPingReplyCallback, NULL, "PING");
1732 if (retval != REDIS_OK) return;
1733 ri->pending_commands++;
1734 } else if ((ri->flags & SRI_MASTER) &&
1735 (now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD)
1736 {
1737 /* PUBLISH hello messages only to masters. */
1738 struct sockaddr_in sa;
1739 socklen_t salen = sizeof(sa);
1740
1741 if (getsockname(ri->cc->c.fd,(struct sockaddr*)&sa,&salen) != -1) {
1742 char myaddr[128];
1743
1744 snprintf(myaddr,sizeof(myaddr),"%s:%d:%s:%d",
1745 inet_ntoa(sa.sin_addr), server.port, server.runid,
1746 (ri->flags & SRI_CAN_FAILOVER) != 0);
1747 retval = redisAsyncCommand(ri->cc,
1748 sentinelPublishReplyCallback, NULL, "PUBLISH %s %s",
1749 SENTINEL_HELLO_CHANNEL,myaddr);
1750 if (retval != REDIS_OK) return;
1751 ri->pending_commands++;
1752 }
1753 }
1754 }
1755
1756 /* =========================== SENTINEL command ============================= */
1757
1758 const char *sentinelFailoverStateStr(int state) {
1759 switch(state) {
1760 case SENTINEL_FAILOVER_STATE_NONE: return "none";
1761 case SENTINEL_FAILOVER_STATE_WAIT_START: return "wait_start";
1762 case SENTINEL_FAILOVER_STATE_SELECT_SLAVE: return "select_slave";
1763 case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE: return "send_slaveof_noone";
1764 case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION: return "wait_promotion";
1765 case SENTINEL_FAILOVER_STATE_RECONF_SLAVES: return "reconf_slaves";
1766 case SENTINEL_FAILOVER_STATE_ALERT_CLIENTS: return "alert_clients";
1767 case SENTINEL_FAILOVER_STATE_DETECT_END: return "detect_end";
1768 case SENTINEL_FAILOVER_STATE_UPDATE_CONFIG: return "update_config";
1769 default: return "unknown";
1770 }
1771 }
1772
1773 /* Redis instance to Redis protocol representation. */
1774 void addReplySentinelRedisInstance(redisClient *c, sentinelRedisInstance *ri) {
1775 char *flags = sdsempty();
1776 void *mbl;
1777 int fields = 0;
1778
1779 mbl = addDeferredMultiBulkLength(c);
1780
1781 addReplyBulkCString(c,"name");
1782 addReplyBulkCString(c,ri->name);
1783 fields++;
1784
1785 addReplyBulkCString(c,"ip");
1786 addReplyBulkCString(c,ri->addr->ip);
1787 fields++;
1788
1789 addReplyBulkCString(c,"port");
1790 addReplyBulkLongLong(c,ri->addr->port);
1791 fields++;
1792
1793 addReplyBulkCString(c,"runid");
1794 addReplyBulkCString(c,ri->runid ? ri->runid : "");
1795 fields++;
1796
1797 addReplyBulkCString(c,"flags");
1798 if (ri->flags & SRI_S_DOWN) flags = sdscat(flags,"s_down,");
1799 if (ri->flags & SRI_O_DOWN) flags = sdscat(flags,"o_down,");
1800 if (ri->flags & SRI_MASTER) flags = sdscat(flags,"master,");
1801 if (ri->flags & SRI_SLAVE) flags = sdscat(flags,"slave,");
1802 if (ri->flags & SRI_SENTINEL) flags = sdscat(flags,"sentinel,");
1803 if (ri->flags & SRI_DISCONNECTED) flags = sdscat(flags,"disconnected,");
1804 if (ri->flags & SRI_MASTER_DOWN) flags = sdscat(flags,"master_down,");
1805 if (ri->flags & SRI_FAILOVER_IN_PROGRESS)
1806 flags = sdscat(flags,"failover_in_progress,");
1807 if (ri->flags & SRI_I_AM_THE_LEADER)
1808 flags = sdscat(flags,"i_am_the_leader,");
1809 if (ri->flags & SRI_PROMOTED) flags = sdscat(flags,"promoted,");
1810 if (ri->flags & SRI_RECONF_SENT) flags = sdscat(flags,"reconf_sent,");
1811 if (ri->flags & SRI_RECONF_INPROG) flags = sdscat(flags,"reconf_inprog,");
1812 if (ri->flags & SRI_RECONF_DONE) flags = sdscat(flags,"reconf_done,");
1813
1814 if (sdslen(flags) != 0) flags = sdsrange(flags,0,-2); /* remove last "," */
1815 addReplyBulkCString(c,flags);
1816 sdsfree(flags);
1817 fields++;
1818
1819 addReplyBulkCString(c,"pending-commands");
1820 addReplyBulkLongLong(c,ri->pending_commands);
1821 fields++;
1822
1823 if (ri->flags & SRI_FAILOVER_IN_PROGRESS) {
1824 addReplyBulkCString(c,"failover-state");
1825 addReplyBulkCString(c,(char*)sentinelFailoverStateStr(ri->failover_state));
1826 fields++;
1827 }
1828
1829 addReplyBulkCString(c,"last-ok-ping-reply");
1830 addReplyBulkLongLong(c,mstime() - ri->last_avail_time);
1831 fields++;
1832
1833 addReplyBulkCString(c,"last-ping-reply");
1834 addReplyBulkLongLong(c,mstime() - ri->last_pong_time);
1835 fields++;
1836
1837 if (ri->flags & SRI_S_DOWN) {
1838 addReplyBulkCString(c,"s-down-time");
1839 addReplyBulkLongLong(c,mstime()-ri->s_down_since_time);
1840 fields++;
1841 }
1842
1843 if (ri->flags & SRI_O_DOWN) {
1844 addReplyBulkCString(c,"o-down-time");
1845 addReplyBulkLongLong(c,mstime()-ri->o_down_since_time);
1846 fields++;
1847 }
1848
1849 /* Masters and Slaves */
1850 if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
1851 addReplyBulkCString(c,"info-refresh");
1852 addReplyBulkLongLong(c,mstime() - ri->info_refresh);
1853 fields++;
1854 }
1855
1856 /* Only masters */
1857 if (ri->flags & SRI_MASTER) {
1858 addReplyBulkCString(c,"num-slaves");
1859 addReplyBulkLongLong(c,dictSize(ri->slaves));
1860 fields++;
1861
1862 addReplyBulkCString(c,"num-other-sentinels");
1863 addReplyBulkLongLong(c,dictSize(ri->sentinels));
1864 fields++;
1865
1866 addReplyBulkCString(c,"quorum");
1867 addReplyBulkLongLong(c,ri->quorum);
1868 fields++;
1869 }
1870
1871 /* Only slaves */
1872 if (ri->flags & SRI_SLAVE) {
1873 addReplyBulkCString(c,"master-link-down-time");
1874 addReplyBulkLongLong(c,ri->master_link_down_time);
1875 fields++;
1876
1877 addReplyBulkCString(c,"master-link-status");
1878 addReplyBulkCString(c,
1879 (ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP) ?
1880 "ok" : "err");
1881 fields++;
1882
1883 addReplyBulkCString(c,"master-host");
1884 addReplyBulkCString(c,
1885 ri->slave_master_host ? ri->slave_master_host : "?");
1886 fields++;
1887
1888 addReplyBulkCString(c,"master-port");
1889 addReplyBulkLongLong(c,ri->slave_master_port);
1890 fields++;
1891
1892 addReplyBulkCString(c,"slave-priority");
1893 addReplyBulkLongLong(c,ri->slave_priority);
1894 fields++;
1895 }
1896
1897 /* Only sentinels */
1898 if (ri->flags & SRI_SENTINEL) {
1899 addReplyBulkCString(c,"last-hello-message");
1900 addReplyBulkLongLong(c,mstime() - ri->last_hello_time);
1901 fields++;
1902
1903 addReplyBulkCString(c,"can-failover-its-master");
1904 addReplyBulkLongLong(c,(ri->flags & SRI_CAN_FAILOVER) != 0);
1905 fields++;
1906
1907 if (ri->flags & SRI_MASTER_DOWN) {
1908 addReplyBulkCString(c,"subjective-leader");
1909 addReplyBulkCString(c,ri->leader ? ri->leader : "?");
1910 fields++;
1911 }
1912 }
1913
1914 setDeferredMultiBulkLength(c,mbl,fields*2);
1915 }
1916
1917 /* Output a number of instances contanined inside a dictionary as
1918 * Redis protocol. */
1919 void addReplyDictOfRedisInstances(redisClient *c, dict *instances) {
1920 dictIterator *di;
1921 dictEntry *de;
1922
1923 di = dictGetIterator(instances);
1924 addReplyMultiBulkLen(c,dictSize(instances));
1925 while((de = dictNext(di)) != NULL) {
1926 sentinelRedisInstance *ri = dictGetVal(de);
1927
1928 addReplySentinelRedisInstance(c,ri);
1929 }
1930 dictReleaseIterator(di);
1931 }
1932
1933 /* Lookup the named master into sentinel.masters.
1934 * If the master is not found reply to the client with an error and returns
1935 * NULL. */
1936 sentinelRedisInstance *sentinelGetMasterByNameOrReplyError(redisClient *c,
1937 robj *name)
1938 {
1939 sentinelRedisInstance *ri;
1940
1941 ri = dictFetchValue(sentinel.masters,c->argv[2]->ptr);
1942 if (!ri) {
1943 addReplyError(c,"No such master with that name");
1944 return NULL;
1945 }
1946 return ri;
1947 }
1948
1949 void sentinelCommand(redisClient *c) {
1950 if (!strcasecmp(c->argv[1]->ptr,"masters")) {
1951 /* SENTINEL MASTERS */
1952 if (c->argc != 2) goto numargserr;
1953
1954 addReplyDictOfRedisInstances(c,sentinel.masters);
1955 } else if (!strcasecmp(c->argv[1]->ptr,"slaves")) {
1956 /* SENTINEL SLAVES <master-name> */
1957 sentinelRedisInstance *ri;
1958
1959 if (c->argc != 3) goto numargserr;
1960 if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
1961 return;
1962 addReplyDictOfRedisInstances(c,ri->slaves);
1963 } else if (!strcasecmp(c->argv[1]->ptr,"sentinels")) {
1964 /* SENTINEL SENTINELS <master-name> */
1965 sentinelRedisInstance *ri;
1966
1967 if (c->argc != 3) goto numargserr;
1968 if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
1969 return;
1970 addReplyDictOfRedisInstances(c,ri->sentinels);
1971 } else if (!strcasecmp(c->argv[1]->ptr,"is-master-down-by-addr")) {
1972 /* SENTINEL IS-MASTER-DOWN-BY-ADDR <ip> <port> */
1973 sentinelRedisInstance *ri;
1974 char *leader = NULL;
1975 long port;
1976 int isdown = 0;
1977
1978 if (c->argc != 4) goto numargserr;
1979 if (getLongFromObjectOrReply(c,c->argv[3],&port,NULL) != REDIS_OK)
1980 return;
1981 ri = getSentinelRedisInstanceByAddrAndRunID(sentinel.masters,
1982 c->argv[2]->ptr,port,NULL);
1983
1984 /* It exists? Is actually a master? Is subjectively down? It's down.
1985 * Note: if we are in tilt mode we always reply with "0". */
1986 if (!sentinel.tilt && ri && (ri->flags & SRI_S_DOWN) &&
1987 (ri->flags & SRI_MASTER))
1988 isdown = 1;
1989 if (ri) leader = sentinelGetSubjectiveLeader(ri);
1990
1991 /* Reply with a two-elements multi-bulk reply: down state, leader. */
1992 addReplyMultiBulkLen(c,2);
1993 addReply(c, isdown ? shared.cone : shared.czero);
1994 addReplyBulkCString(c, leader ? leader : "?");
1995 if (leader) sdsfree(leader);
1996 } else if (!strcasecmp(c->argv[1]->ptr,"reset")) {
1997 /* SENTINEL RESET <pattern> */
1998 if (c->argc != 3) goto numargserr;
1999 addReplyLongLong(c,sentinelResetMastersByPattern(c->argv[2]->ptr,SENTINEL_GENERATE_EVENT));
2000 } else if (!strcasecmp(c->argv[1]->ptr,"get-master-addr-by-name")) {
2001 /* SENTINEL GET-MASTER-ADDR-BY-NAME <master-name> */
2002 sentinelRedisInstance *ri;
2003
2004 if (c->argc != 3) goto numargserr;
2005 ri = sentinelGetMasterByName(c->argv[2]->ptr);
2006 if (ri == NULL) {
2007 addReply(c,shared.nullmultibulk);
2008 } else {
2009 sentinelAddr *addr = ri->addr;
2010
2011 if ((ri->flags & SRI_FAILOVER_IN_PROGRESS) && ri->promoted_slave)
2012 addr = ri->promoted_slave->addr;
2013 addReplyMultiBulkLen(c,2);
2014 addReplyBulkCString(c,addr->ip);
2015 addReplyBulkLongLong(c,addr->port);
2016 }
2017 } else if (!strcasecmp(c->argv[1]->ptr,"failover")) {
2018 /* SENTINEL FAILOVER <master-name> */
2019 sentinelRedisInstance *ri;
2020
2021 if (c->argc != 3) goto numargserr;
2022 if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
2023 return;
2024 if (ri->flags & SRI_FAILOVER_IN_PROGRESS) {
2025 addReplySds(c,sdsnew("-INPROG Failover already in progress\r\n"));
2026 return;
2027 }
2028 if (sentinelSelectSlave(ri) == NULL) {
2029 addReplySds(c,sdsnew("-NOGOODSLAVE No suitable slave to promote\r\n"));
2030 return;
2031 }
2032 sentinelStartFailover(ri,SENTINEL_FAILOVER_STATE_WAIT_START);
2033 ri->flags |= SRI_FORCE_FAILOVER;
2034 addReply(c,shared.ok);
2035 } else if (!strcasecmp(c->argv[1]->ptr,"pending-scripts")) {
2036 /* SENTINEL PENDING-SCRIPTS */
2037
2038 if (c->argc != 2) goto numargserr;
2039 sentinelPendingScriptsCommand(c);
2040 } else {
2041 addReplyErrorFormat(c,"Unknown sentinel subcommand '%s'",
2042 (char*)c->argv[1]->ptr);
2043 }
2044 return;
2045
2046 numargserr:
2047 addReplyErrorFormat(c,"Wrong number of commands for 'sentinel %s'",
2048 (char*)c->argv[1]->ptr);
2049 }
2050
2051 void sentinelInfoCommand(redisClient *c) {
2052 char *section = c->argc == 2 ? c->argv[1]->ptr : "default";
2053 sds info = sdsempty();
2054 int defsections = !strcasecmp(section,"default");
2055 int sections = 0;
2056
2057 if (c->argc > 2) {
2058 addReply(c,shared.syntaxerr);
2059 return;
2060 }
2061
2062 if (!strcasecmp(section,"server") || defsections) {
2063 if (sections++) info = sdscat(info,"\r\n");
2064 sds serversection = genRedisInfoString("server");
2065 info = sdscatlen(info,serversection,sdslen(serversection));
2066 sdsfree(serversection);
2067 }
2068
2069 if (!strcasecmp(section,"sentinel") || defsections) {
2070 dictIterator *di;
2071 dictEntry *de;
2072 int master_id = 0;
2073
2074 if (sections++) info = sdscat(info,"\r\n");
2075 info = sdscatprintf(info,
2076 "# Sentinel\r\n"
2077 "sentinel_masters:%lu\r\n"
2078 "sentinel_tilt:%d\r\n"
2079 "sentinel_running_scripts:%d\r\n"
2080 "sentinel_scripts_queue_length:%ld\r\n",
2081 dictSize(sentinel.masters),
2082 sentinel.tilt,
2083 sentinel.running_scripts,
2084 listLength(sentinel.scripts_queue));
2085
2086 di = dictGetIterator(sentinel.masters);
2087 while((de = dictNext(di)) != NULL) {
2088 sentinelRedisInstance *ri = dictGetVal(de);
2089 char *status = "ok";
2090
2091 if (ri->flags & SRI_O_DOWN) status = "odown";
2092 else if (ri->flags & SRI_S_DOWN) status = "sdown";
2093 info = sdscatprintf(info,
2094 "master%d:name=%s,status=%s,address=%s:%d,"
2095 "slaves=%lu,sentinels=%lu\r\n",
2096 master_id++, ri->name, status,
2097 ri->addr->ip, ri->addr->port,
2098 dictSize(ri->slaves),
2099 dictSize(ri->sentinels)+1);
2100 }
2101 dictReleaseIterator(di);
2102 }
2103
2104 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
2105 (unsigned long)sdslen(info)));
2106 addReplySds(c,info);
2107 addReply(c,shared.crlf);
2108 }
2109
2110 /* ===================== SENTINEL availability checks ======================= */
2111
2112 /* Is this instance down from our point of view? */
2113 void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
2114 mstime_t elapsed = mstime() - ri->last_avail_time;
2115
2116 /* Check if we are in need for a reconnection of one of the
2117 * links, because we are detecting low activity.
2118 *
2119 * 1) Check if the command link seems connected, was connected not less
2120 * than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have an
2121 * idle time that is greater than down_after_period / 2 seconds. */
2122 if (ri->cc &&
2123 (mstime() - ri->cc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
2124 (mstime() - ri->last_pong_time) > (ri->down_after_period/2))
2125 {
2126 sentinelKillLink(ri,ri->cc);
2127 }
2128
2129 /* 2) Check if the pubsub link seems connected, was connected not less
2130 * than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have no
2131 * activity in the Pub/Sub channel for more than
2132 * SENTINEL_PUBLISH_PERIOD * 3.
2133 */
2134 if (ri->pc &&
2135 (mstime() - ri->pc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
2136 (mstime() - ri->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3))
2137 {
2138 sentinelKillLink(ri,ri->pc);
2139 }
2140
2141 /* Update the subjectively down flag. */
2142 if (elapsed > ri->down_after_period) {
2143 /* Is subjectively down */
2144 if ((ri->flags & SRI_S_DOWN) == 0) {
2145 sentinelEvent(REDIS_WARNING,"+sdown",ri,"%@");
2146 ri->s_down_since_time = mstime();
2147 ri->flags |= SRI_S_DOWN;
2148 }
2149 } else {
2150 /* Is subjectively up */
2151 if (ri->flags & SRI_S_DOWN) {
2152 sentinelEvent(REDIS_WARNING,"-sdown",ri,"%@");
2153 ri->flags &= ~(SRI_S_DOWN|SRI_SCRIPT_KILL_SENT);
2154 }
2155 }
2156 }
2157
2158 /* Is this instance down accordingly to the configured quorum? */
2159 void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
2160 dictIterator *di;
2161 dictEntry *de;
2162 int quorum = 0, odown = 0;
2163
2164 if (master->flags & SRI_S_DOWN) {
2165 /* Is down for enough sentinels? */
2166 quorum = 1; /* the current sentinel. */
2167 /* Count all the other sentinels. */
2168 di = dictGetIterator(master->sentinels);
2169 while((de = dictNext(di)) != NULL) {
2170 sentinelRedisInstance *ri = dictGetVal(de);
2171
2172 if (ri->flags & SRI_MASTER_DOWN) quorum++;
2173 }
2174 dictReleaseIterator(di);
2175 if (quorum >= master->quorum) odown = 1;
2176 }
2177
2178 /* Set the flag accordingly to the outcome. */
2179 if (odown) {
2180 if ((master->flags & SRI_O_DOWN) == 0) {
2181 sentinelEvent(REDIS_WARNING,"+odown",master,"%@ #quorum %d/%d",
2182 quorum, master->quorum);
2183 master->flags |= SRI_O_DOWN;
2184 master->o_down_since_time = mstime();
2185 }
2186 } else {
2187 if (master->flags & SRI_O_DOWN) {
2188 sentinelEvent(REDIS_WARNING,"-odown",master,"%@");
2189 master->flags &= ~SRI_O_DOWN;
2190 }
2191 }
2192 }
2193
2194 /* Receive the SENTINEL is-master-down-by-addr reply, see the
2195 * sentinelAskMasterStateToOtherSentinels() function for more information. */
2196 void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) {
2197 sentinelRedisInstance *ri = c->data;
2198 redisReply *r;
2199
2200 if (ri) ri->pending_commands--;
2201 if (!reply || !ri) return;
2202 r = reply;
2203
2204 /* Ignore every error or unexpected reply.
2205 * Note that if the command returns an error for any reason we'll
2206 * end clearing the SRI_MASTER_DOWN flag for timeout anyway. */
2207 if (r->type == REDIS_REPLY_ARRAY && r->elements == 2 &&
2208 r->element[0]->type == REDIS_REPLY_INTEGER &&
2209 r->element[1]->type == REDIS_REPLY_STRING)
2210 {
2211 ri->last_master_down_reply_time = mstime();
2212 if (r->element[0]->integer == 1) {
2213 ri->flags |= SRI_MASTER_DOWN;
2214 } else {
2215 ri->flags &= ~SRI_MASTER_DOWN;
2216 }
2217 sdsfree(ri->leader);
2218 ri->leader = sdsnew(r->element[1]->str);
2219 }
2220 }
2221
2222 /* If we think (subjectively) the master is down, we start sending
2223 * SENTINEL IS-MASTER-DOWN-BY-ADDR requests to other sentinels
2224 * in order to get the replies that allow to reach the quorum and
2225 * possibly also mark the master as objectively down. */
2226 void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master) {
2227 dictIterator *di;
2228 dictEntry *de;
2229
2230 di = dictGetIterator(master->sentinels);
2231 while((de = dictNext(di)) != NULL) {
2232 sentinelRedisInstance *ri = dictGetVal(de);
2233 mstime_t elapsed = mstime() - ri->last_master_down_reply_time;
2234 char port[32];
2235 int retval;
2236
2237 /* If the master state from other sentinel is too old, we clear it. */
2238 if (elapsed > SENTINEL_INFO_VALIDITY_TIME) {
2239 ri->flags &= ~SRI_MASTER_DOWN;
2240 sdsfree(ri->leader);
2241 ri->leader = NULL;
2242 }
2243
2244 /* Only ask if master is down to other sentinels if:
2245 *
2246 * 1) We believe it is down, or there is a failover in progress.
2247 * 2) Sentinel is connected.
2248 * 3) We did not received the info within SENTINEL_ASK_PERIOD ms. */
2249 if ((master->flags & (SRI_S_DOWN|SRI_FAILOVER_IN_PROGRESS)) == 0)
2250 continue;
2251 if (ri->flags & SRI_DISCONNECTED) continue;
2252 if (mstime() - ri->last_master_down_reply_time < SENTINEL_ASK_PERIOD)
2253 continue;
2254
2255 /* Ask */
2256 ll2string(port,sizeof(port),master->addr->port);
2257 retval = redisAsyncCommand(ri->cc,
2258 sentinelReceiveIsMasterDownReply, NULL,
2259 "SENTINEL is-master-down-by-addr %s %s",
2260 master->addr->ip, port);
2261 if (retval == REDIS_OK) ri->pending_commands++;
2262 }
2263 dictReleaseIterator(di);
2264 }
2265
2266 /* =============================== FAILOVER ================================= */
2267
2268 /* Given a master get the "subjective leader", that is, among all the sentinels
2269 * with given characteristics, the one with the lexicographically smaller
2270 * runid. The characteristics required are:
2271 *
2272 * 1) Has SRI_CAN_FAILOVER flag.
2273 * 2) Is not disconnected.
2274 * 3) Recently answered to our ping (no longer than
2275 * SENTINEL_INFO_VALIDITY_TIME milliseconds ago).
2276 *
2277 * The function returns a pointer to an sds string representing the runid of the
2278 * leader sentinel instance (from our point of view). Otherwise NULL is
2279 * returned if there are no suitable sentinels.
2280 */
2281
2282 int compareRunID(const void *a, const void *b) {
2283 char **aptrptr = (char**)a, **bptrptr = (char**)b;
2284 return strcasecmp(*aptrptr, *bptrptr);
2285 }
2286
2287 char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master) {
2288 dictIterator *di;
2289 dictEntry *de;
2290 char **instance =
2291 zmalloc(sizeof(char*)*(dictSize(master->sentinels)+1));
2292 int instances = 0;
2293 char *leader = NULL;
2294
2295 if (master->flags & SRI_CAN_FAILOVER) {
2296 /* Add myself if I'm a Sentinel that can failover this master. */
2297 instance[instances++] = server.runid;
2298 }
2299
2300 di = dictGetIterator(master->sentinels);
2301 while((de = dictNext(di)) != NULL) {
2302 sentinelRedisInstance *ri = dictGetVal(de);
2303 mstime_t lag = mstime() - ri->last_avail_time;
2304
2305 if (lag > SENTINEL_INFO_VALIDITY_TIME ||
2306 !(ri->flags & SRI_CAN_FAILOVER) ||
2307 (ri->flags & SRI_DISCONNECTED) ||
2308 ri->runid == NULL)
2309 continue;
2310 instance[instances++] = ri->runid;
2311 }
2312 dictReleaseIterator(di);
2313
2314 /* If we have at least one instance passing our checks, order the array
2315 * by runid. */
2316 if (instances) {
2317 qsort(instance,instances,sizeof(char*),compareRunID);
2318 leader = sdsnew(instance[0]);
2319 }
2320 zfree(instance);
2321 return leader;
2322 }
2323
2324 struct sentinelLeader {
2325 char *runid;
2326 unsigned long votes;
2327 };
2328
2329 /* Helper function for sentinelGetObjectiveLeader, increment the counter
2330 * relative to the specified runid. */
2331 void sentinelObjectiveLeaderIncr(dict *counters, char *runid) {
2332 dictEntry *de = dictFind(counters,runid);
2333 uint64_t oldval;
2334
2335 if (de) {
2336 oldval = dictGetUnsignedIntegerVal(de);
2337 dictSetUnsignedIntegerVal(de,oldval+1);
2338 } else {
2339 de = dictAddRaw(counters,runid);
2340 redisAssert(de != NULL);
2341 dictSetUnsignedIntegerVal(de,1);
2342 }
2343 }
2344
2345 /* Scan all the Sentinels attached to this master to check what is the
2346 * most voted leader among Sentinels. */
2347 char *sentinelGetObjectiveLeader(sentinelRedisInstance *master) {
2348 dict *counters;
2349 dictIterator *di;
2350 dictEntry *de;
2351 unsigned int voters = 0, voters_quorum;
2352 char *myvote;
2353 char *winner = NULL;
2354
2355 redisAssert(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS));
2356 counters = dictCreate(&leaderVotesDictType,NULL);
2357
2358 /* Count my vote. */
2359 myvote = sentinelGetSubjectiveLeader(master);
2360 if (myvote) {
2361 sentinelObjectiveLeaderIncr(counters,myvote);
2362 voters++;
2363 }
2364
2365 /* Count other sentinels votes */
2366 di = dictGetIterator(master->sentinels);
2367 while((de = dictNext(di)) != NULL) {
2368 sentinelRedisInstance *ri = dictGetVal(de);
2369 if (ri->leader == NULL) continue;
2370 /* If the failover is not already in progress we are only interested
2371 * in Sentinels that believe the master is down. Otherwise the leader
2372 * selection is useful for the "failover-takedown" when the original
2373 * leader fails. In that case we consider all the voters. */
2374 if (!(master->flags & SRI_FAILOVER_IN_PROGRESS) &&
2375 !(ri->flags & SRI_MASTER_DOWN)) continue;
2376 sentinelObjectiveLeaderIncr(counters,ri->leader);
2377 voters++;
2378 }
2379 dictReleaseIterator(di);
2380 voters_quorum = voters/2+1;
2381
2382 /* Check what's the winner. For the winner to win, it needs two conditions:
2383 * 1) Absolute majority between voters (50% + 1).
2384 * 2) And anyway at least master->quorum votes. */
2385 {
2386 uint64_t max_votes = 0; /* Max votes so far. */
2387
2388 di = dictGetIterator(counters);
2389 while((de = dictNext(di)) != NULL) {
2390 uint64_t votes = dictGetUnsignedIntegerVal(de);
2391
2392 if (max_votes < votes) {
2393 max_votes = votes;
2394 winner = dictGetKey(de);
2395 }
2396 }
2397 dictReleaseIterator(di);
2398 if (winner && (max_votes < voters_quorum || max_votes < master->quorum))
2399 winner = NULL;
2400 }
2401 winner = winner ? sdsnew(winner) : NULL;
2402 sdsfree(myvote);
2403 dictRelease(counters);
2404 return winner;
2405 }
2406
2407 /* Setup the master state to start a failover as a leader.
2408 *
2409 * State can be either:
2410 *
2411 * SENTINEL_FAILOVER_STATE_WAIT_START: starts a failover from scratch.
2412 * SENTINEL_FAILOVER_STATE_RECONF_SLAVES: takedown a failed failover.
2413 */
2414 void sentinelStartFailover(sentinelRedisInstance *master, int state) {
2415 redisAssert(master->flags & SRI_MASTER);
2416 redisAssert(state == SENTINEL_FAILOVER_STATE_WAIT_START ||
2417 state == SENTINEL_FAILOVER_STATE_RECONF_SLAVES);
2418
2419 master->failover_state = state;
2420 master->flags |= SRI_FAILOVER_IN_PROGRESS|SRI_I_AM_THE_LEADER;
2421 sentinelEvent(REDIS_WARNING,"+failover-triggered",master,"%@");
2422
2423 /* Pick a random delay if it's a fresh failover (WAIT_START), and not
2424 * a recovery of a failover started by another sentinel. */
2425 if (master->failover_state == SENTINEL_FAILOVER_STATE_WAIT_START) {
2426 master->failover_start_time = mstime() +
2427 SENTINEL_FAILOVER_FIXED_DELAY +
2428 (rand() % SENTINEL_FAILOVER_MAX_RANDOM_DELAY);
2429 sentinelEvent(REDIS_WARNING,"+failover-state-wait-start",master,
2430 "%@ #starting in %lld milliseconds",
2431 master->failover_start_time-mstime());
2432 }
2433 master->failover_state_change_time = mstime();
2434 }
2435
2436 /* This function checks if there are the conditions to start the failover,
2437 * that is:
2438 *
2439 * 1) Enough time has passed since O_DOWN.
2440 * 2) The master is marked as SRI_CAN_FAILOVER, so we can failover it.
2441 * 3) We are the objectively leader for this master.
2442 *
2443 * If the conditions are met we flag the master as SRI_FAILOVER_IN_PROGRESS
2444 * and SRI_I_AM_THE_LEADER.
2445 */
2446 void sentinelStartFailoverIfNeeded(sentinelRedisInstance *master) {
2447 char *leader;
2448 int isleader;
2449
2450 /* We can't failover if the master is not in O_DOWN state or if
2451 * there is not already a failover in progress (to perform the
2452 * takedown if the leader died) or if this Sentinel is not allowed
2453 * to start a failover. */
2454 if (!(master->flags & SRI_CAN_FAILOVER) ||
2455 !(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) return;
2456
2457 leader = sentinelGetObjectiveLeader(master);
2458 isleader = leader && strcasecmp(leader,server.runid) == 0;
2459 sdsfree(leader);
2460
2461 /* If I'm not the leader, I can't failover for sure. */
2462 if (!isleader) return;
2463
2464 /* If the failover is already in progress there are two options... */
2465 if (master->flags & SRI_FAILOVER_IN_PROGRESS) {
2466 if (master->flags & SRI_I_AM_THE_LEADER) {
2467 /* 1) I'm flagged as leader so I already started the failover.
2468 * Just return. */
2469 return;
2470 } else {
2471 mstime_t elapsed = mstime() - master->failover_state_change_time;
2472
2473 /* 2) I'm the new leader, but I'm not flagged as leader in the
2474 * master: I did not started the failover, but the original
2475 * leader has no longer the leadership.
2476 *
2477 * In this case if the failover appears to be lagging
2478 * for at least 25% of the configured failover timeout,
2479 * I can assume I can take control. Otherwise
2480 * it's better to return and wait more. */
2481 if (elapsed < (master->failover_timeout/4)) return;
2482 sentinelEvent(REDIS_WARNING,"+failover-takedown",master,"%@");
2483 /* We have already an elected slave if we are in
2484 * FAILOVER_IN_PROGRESS state, that is, the slave that we
2485 * observed turning into a master. */
2486 sentinelStartFailover(master,SENTINEL_FAILOVER_STATE_RECONF_SLAVES);
2487 /* As an observer we flagged all the slaves as RECONF_SENT but
2488 * now we are in charge of actually sending the reconfiguration
2489 * command so let's clear this flag for all the instances. */
2490 sentinelDelFlagsToDictOfRedisInstances(master->slaves,
2491 SRI_RECONF_SENT);
2492 }
2493 } else {
2494 /* Brand new failover as SRI_FAILOVER_IN_PROGRESS was not set.
2495 *
2496 * Do we have a slave to promote? Otherwise don't start a failover
2497 * at all. */
2498 if (sentinelSelectSlave(master) == NULL) return;
2499 sentinelStartFailover(master,SENTINEL_FAILOVER_STATE_WAIT_START);
2500 }
2501 }
2502
2503 /* Select a suitable slave to promote. The current algorithm only uses
2504 * the following parameters:
2505 *
2506 * 1) None of the following conditions: S_DOWN, O_DOWN, DISCONNECTED.
2507 * 2) last_avail_time more recent than SENTINEL_INFO_VALIDITY_TIME.
2508 * 3) info_refresh more recent than SENTINEL_INFO_VALIDITY_TIME.
2509 * 4) master_link_down_time no more than:
2510 * (now - master->s_down_since_time) + (master->down_after_period * 10).
2511 * 5) Slave priority can't be zero, otherwise the slave is discareded.
2512 *
2513 * Among all the slaves matching the above conditions we select the slave
2514 * with lower slave_priority. If priority is the same we select the slave
2515 * with lexicographically smaller runid.
2516 *
2517 * The function returns the pointer to the selected slave, otherwise
2518 * NULL if no suitable slave was found.
2519 */
2520
2521 int compareSlavesForPromotion(const void *a, const void *b) {
2522 sentinelRedisInstance **sa = (sentinelRedisInstance **)a,
2523 **sb = (sentinelRedisInstance **)b;
2524 char *sa_runid, *sb_runid;
2525
2526 if ((*sa)->slave_priority != (*sb)->slave_priority)
2527 return (*sa)->slave_priority - (*sb)->slave_priority;
2528
2529 /* If priority is the same, select the slave with that has the
2530 * lexicographically smaller runid. Note that we try to handle runid
2531 * == NULL as there are old Redis versions that don't publish runid in
2532 * INFO. A NULL runid is considered bigger than any other runid. */
2533 sa_runid = (*sa)->runid;
2534 sb_runid = (*sb)->runid;
2535 if (sa_runid == NULL && sb_runid == NULL) return 0;
2536 else if (sa_runid == NULL) return 1; /* a > b */
2537 else if (sb_runid == NULL) return -1; /* a < b */
2538 return strcasecmp(sa_runid, sb_runid);
2539 }
2540
2541 sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) {
2542 sentinelRedisInstance **instance =
2543 zmalloc(sizeof(instance[0])*dictSize(master->slaves));
2544 sentinelRedisInstance *selected = NULL;
2545 int instances = 0;
2546 dictIterator *di;
2547 dictEntry *de;
2548 mstime_t max_master_down_time = 0;
2549
2550 if (master->flags & SRI_S_DOWN)
2551 max_master_down_time += mstime() - master->s_down_since_time;
2552 max_master_down_time += master->down_after_period * 10;
2553
2554 di = dictGetIterator(master->slaves);
2555 while((de = dictNext(di)) != NULL) {
2556 sentinelRedisInstance *slave = dictGetVal(de);
2557 mstime_t info_validity_time = mstime()-SENTINEL_INFO_VALIDITY_TIME;
2558
2559 if (slave->flags & (SRI_S_DOWN|SRI_O_DOWN|SRI_DISCONNECTED)) continue;
2560 if (slave->last_avail_time < info_validity_time) continue;
2561 if (slave->slave_priority == 0) continue;
2562
2563 /* If the master is in SDOWN state we get INFO for slaves every second.
2564 * Otherwise we get it with the usual period so we need to account for
2565 * a larger delay. */
2566 if ((master->flags & SRI_S_DOWN) == 0)
2567 info_validity_time -= SENTINEL_INFO_PERIOD;
2568 if (slave->info_refresh < info_validity_time) continue;
2569 if (slave->master_link_down_time > max_master_down_time) continue;
2570 instance[instances++] = slave;
2571 }
2572 dictReleaseIterator(di);
2573 if (instances) {
2574 qsort(instance,instances,sizeof(sentinelRedisInstance*),
2575 compareSlavesForPromotion);
2576 selected = instance[0];
2577 }
2578 zfree(instance);
2579 return selected;
2580 }
2581
2582 /* ---------------- Failover state machine implementation ------------------- */
2583 void sentinelFailoverWaitStart(sentinelRedisInstance *ri) {
2584 /* If we in "wait start" but the master is no longer in ODOWN nor in
2585 * SDOWN condition we abort the failover. This is important as it
2586 * prevents a useless failover in a a notable case of netsplit, where
2587 * the senitnels are split from the redis instances. In this case
2588 * the failover will not start while there is the split because no
2589 * good slave can be reached. However when the split is resolved, we
2590 * can go to waitstart if the slave is back rechable a few milliseconds
2591 * before the master is. In that case when the master is back online
2592 * we cancel the failover. */
2593 if ((ri->flags & (SRI_S_DOWN|SRI_O_DOWN|SRI_FORCE_FAILOVER)) == 0) {
2594 sentinelEvent(REDIS_WARNING,"-failover-abort-master-is-back",
2595 ri,"%@");
2596 sentinelAbortFailover(ri);
2597 return;
2598 }
2599
2600 /* Start the failover going to the next state if enough time has
2601 * elapsed. */
2602 if (mstime() >= ri->failover_start_time) {
2603 ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
2604 ri->failover_state_change_time = mstime();
2605 sentinelEvent(REDIS_WARNING,"+failover-state-select-slave",ri,"%@");
2606 }
2607 }
2608
2609 void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) {
2610 sentinelRedisInstance *slave = sentinelSelectSlave(ri);
2611
2612 if (slave == NULL) {
2613 sentinelEvent(REDIS_WARNING,"-failover-abort-no-good-slave",ri,"%@");
2614 sentinelAbortFailover(ri);
2615 } else {
2616 sentinelEvent(REDIS_WARNING,"+selected-slave",slave,"%@");
2617 slave->flags |= SRI_PROMOTED;
2618 ri->promoted_slave = slave;
2619 ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE;
2620 ri->failover_state_change_time = mstime();
2621 sentinelEvent(REDIS_NOTICE,"+failover-state-send-slaveof-noone",
2622 slave, "%@");
2623 }
2624 }
2625
2626 void sentinelFailoverSendSlaveOfNoOne(sentinelRedisInstance *ri) {
2627 int retval;
2628
2629 if (ri->promoted_slave->flags & SRI_DISCONNECTED) return;
2630
2631 /* Send SLAVEOF NO ONE command to turn the slave into a master.
2632 * We actually register a generic callback for this command as we don't
2633 * really care about the reply. We check if it worked indirectly observing
2634 * if INFO returns a different role (master instead of slave). */
2635 retval = redisAsyncCommand(ri->promoted_slave->cc,
2636 sentinelDiscardReplyCallback, NULL, "SLAVEOF NO ONE");
2637 if (retval != REDIS_OK) return;
2638 ri->promoted_slave->pending_commands++;
2639 sentinelEvent(REDIS_NOTICE, "+failover-state-wait-promotion",
2640 ri->promoted_slave,"%@");
2641 ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION;
2642 ri->failover_state_change_time = mstime();
2643 }
2644
2645 /* We actually wait for promotion indirectly checking with INFO when the
2646 * slave turns into a master. */
2647 void sentinelFailoverWaitPromotion(sentinelRedisInstance *ri) {
2648 mstime_t elapsed = mstime() - ri->failover_state_change_time;
2649
2650 if (elapsed >= SENTINEL_PROMOTION_RETRY_PERIOD) {
2651 sentinelEvent(REDIS_WARNING,"-promotion-timeout",ri->promoted_slave,
2652 "%@");
2653 sentinelEvent(REDIS_WARNING,"+failover-state-select-slave",ri,"%@");
2654 ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
2655 ri->failover_state_change_time = mstime();
2656 ri->promoted_slave->flags &= ~SRI_PROMOTED;
2657 ri->promoted_slave = NULL;
2658 }
2659 }
2660
2661 void sentinelFailoverDetectEnd(sentinelRedisInstance *master) {
2662 int not_reconfigured = 0, timeout = 0;
2663 dictIterator *di;
2664 dictEntry *de;
2665 mstime_t elapsed = mstime() - master->failover_state_change_time;
2666
2667 /* We can't consider failover finished if the promoted slave is
2668 * not reachable. */
2669 if (master->promoted_slave == NULL ||
2670 master->promoted_slave->flags & SRI_S_DOWN) return;
2671
2672 /* The failover terminates once all the reachable slaves are properly
2673 * configured. */
2674 di = dictGetIterator(master->slaves);
2675 while((de = dictNext(di)) != NULL) {
2676 sentinelRedisInstance *slave = dictGetVal(de);
2677
2678 if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
2679 if (slave->flags & SRI_S_DOWN) continue;
2680 not_reconfigured++;
2681 }
2682 dictReleaseIterator(di);
2683
2684 /* Force end of failover on timeout. */
2685 if (elapsed > master->failover_timeout) {
2686 not_reconfigured = 0;
2687 timeout = 1;
2688 sentinelEvent(REDIS_WARNING,"+failover-end-for-timeout",master,"%@");
2689 }
2690
2691 if (not_reconfigured == 0) {
2692 int role = (master->flags & SRI_I_AM_THE_LEADER) ? SENTINEL_LEADER :
2693 SENTINEL_OBSERVER;
2694
2695 sentinelEvent(REDIS_WARNING,"+failover-end",master,"%@");
2696 master->failover_state = SENTINEL_FAILOVER_STATE_UPDATE_CONFIG;
2697 master->failover_state_change_time = mstime();
2698 sentinelCallClientReconfScript(master,role,"end",master->addr,
2699 master->promoted_slave->addr);
2700 }
2701
2702 /* If I'm the leader it is a good idea to send a best effort SLAVEOF
2703 * command to all the slaves still not reconfigured to replicate with
2704 * the new master. */
2705 if (timeout && (master->flags & SRI_I_AM_THE_LEADER)) {
2706 dictIterator *di;
2707 dictEntry *de;
2708 char master_port[32];
2709
2710 ll2string(master_port,sizeof(master_port),
2711 master->promoted_slave->addr->port);
2712
2713 di = dictGetIterator(master->slaves);
2714 while((de = dictNext(di)) != NULL) {
2715 sentinelRedisInstance *slave = dictGetVal(de);
2716 int retval;
2717
2718 if (slave->flags &
2719 (SRI_RECONF_DONE|SRI_RECONF_SENT|SRI_DISCONNECTED)) continue;
2720
2721 retval = redisAsyncCommand(slave->cc,
2722 sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
2723 master->promoted_slave->addr->ip,
2724 master_port);
2725 if (retval == REDIS_OK) {
2726 sentinelEvent(REDIS_NOTICE,"+slave-reconf-sent-be",slave,"%@");
2727 slave->flags |= SRI_RECONF_SENT;
2728 }
2729 }
2730 dictReleaseIterator(di);
2731 }
2732 }
2733
2734 /* Send SLAVE OF <new master address> to all the remaining slaves that
2735 * still don't appear to have the configuration updated. */
2736 void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) {
2737 dictIterator *di;
2738 dictEntry *de;
2739 int in_progress = 0;
2740
2741 di = dictGetIterator(master->slaves);
2742 while((de = dictNext(di)) != NULL) {
2743 sentinelRedisInstance *slave = dictGetVal(de);
2744
2745 if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG))
2746 in_progress++;
2747 }
2748 dictReleaseIterator(di);
2749
2750 di = dictGetIterator(master->slaves);
2751 while(in_progress < master->parallel_syncs &&
2752 (de = dictNext(di)) != NULL)
2753 {
2754 sentinelRedisInstance *slave = dictGetVal(de);
2755 int retval;
2756 char master_port[32];
2757
2758 /* Skip the promoted slave, and already configured slaves. */
2759 if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
2760
2761 /* Clear the SRI_RECONF_SENT flag if too much time elapsed without
2762 * the slave moving forward to the next state. */
2763 if ((slave->flags & SRI_RECONF_SENT) &&
2764 (mstime() - slave->slave_reconf_sent_time) >
2765 SENTINEL_SLAVE_RECONF_RETRY_PERIOD)
2766 {
2767 sentinelEvent(REDIS_NOTICE,"-slave-reconf-sent-timeout",slave,"%@");
2768 slave->flags &= ~SRI_RECONF_SENT;
2769 }
2770
2771 /* Nothing to do for instances that are disconnected or already
2772 * in RECONF_SENT state. */
2773 if (slave->flags & (SRI_DISCONNECTED|SRI_RECONF_SENT|SRI_RECONF_INPROG))
2774 continue;
2775
2776 /* Send SLAVEOF <new master>. */
2777 ll2string(master_port,sizeof(master_port),
2778 master->promoted_slave->addr->port);
2779 retval = redisAsyncCommand(slave->cc,
2780 sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
2781 master->promoted_slave->addr->ip,
2782 master_port);
2783 if (retval == REDIS_OK) {
2784 slave->flags |= SRI_RECONF_SENT;
2785 slave->pending_commands++;
2786 slave->slave_reconf_sent_time = mstime();
2787 sentinelEvent(REDIS_NOTICE,"+slave-reconf-sent",slave,"%@");
2788 in_progress++;
2789 }
2790 }
2791 dictReleaseIterator(di);
2792 sentinelFailoverDetectEnd(master);
2793 }
2794
2795 /* This function is called when the slave is in
2796 * SENTINEL_FAILOVER_STATE_UPDATE_CONFIG state. In this state we need
2797 * to remove it from the master table and add the promoted slave instead.
2798 *
2799 * If there are no promoted slaves as this instance is unique, we remove
2800 * and re-add it with the same address to trigger a complete state
2801 * refresh. */
2802 void sentinelFailoverSwitchToPromotedSlave(sentinelRedisInstance *master) {
2803 sentinelRedisInstance *ref = master->promoted_slave ?
2804 master->promoted_slave : master;
2805
2806 sentinelEvent(REDIS_WARNING,"+switch-master",master,"%s %s %d %s %d",
2807 master->name, master->addr->ip, master->addr->port,
2808 ref->addr->ip, ref->addr->port);
2809
2810 sentinelResetMasterAndChangeAddress(master,ref->addr->ip,ref->addr->port);
2811 }
2812
2813 void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
2814 redisAssert(ri->flags & SRI_MASTER);
2815
2816 if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;
2817
2818 switch(ri->failover_state) {
2819 case SENTINEL_FAILOVER_STATE_WAIT_START:
2820 sentinelFailoverWaitStart(ri);
2821 break;
2822 case SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
2823 sentinelFailoverSelectSlave(ri);
2824 break;
2825 case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:
2826 sentinelFailoverSendSlaveOfNoOne(ri);
2827 break;
2828 case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
2829 sentinelFailoverWaitPromotion(ri);
2830 break;
2831 case SENTINEL_FAILOVER_STATE_RECONF_SLAVES:
2832 sentinelFailoverReconfNextSlave(ri);
2833 break;
2834 case SENTINEL_FAILOVER_STATE_DETECT_END:
2835 sentinelFailoverDetectEnd(ri);
2836 break;
2837 }
2838 }
2839
2840 /* Abort a failover in progress with the following steps:
2841 * 1) If this instance is the leaer send a SLAVEOF command to all the already
2842 * reconfigured slaves if any to configure them to replicate with the
2843 * original master.
2844 * 2) For both leaders and observers: clear the failover flags and state in
2845 * the master instance.
2846 * 3) If there is already a promoted slave and we are the leader, and this
2847 * slave is not DISCONNECTED, try to reconfigure it to replicate
2848 * back to the master as well, sending a best effort SLAVEOF command.
2849 */
2850 void sentinelAbortFailover(sentinelRedisInstance *ri) {
2851 char master_port[32];
2852 dictIterator *di;
2853 dictEntry *de;
2854 int sentinel_role;
2855
2856 redisAssert(ri->flags & SRI_FAILOVER_IN_PROGRESS);
2857 ll2string(master_port,sizeof(master_port),ri->addr->port);
2858
2859 /* Clear failover related flags from slaves.
2860 * Also if we are the leader make sure to send SLAVEOF commands to all the
2861 * already reconfigured slaves in order to turn them back into slaves of
2862 * the original master. */
2863 di = dictGetIterator(ri->slaves);
2864 while((de = dictNext(di)) != NULL) {
2865 sentinelRedisInstance *slave = dictGetVal(de);
2866 if ((ri->flags & SRI_I_AM_THE_LEADER) &&
2867 !(slave->flags & SRI_DISCONNECTED) &&
2868 (slave->flags & (SRI_PROMOTED|SRI_RECONF_SENT|SRI_RECONF_INPROG|
2869 SRI_RECONF_DONE)))
2870 {
2871 int retval;
2872
2873 retval = redisAsyncCommand(slave->cc,
2874 sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
2875 ri->addr->ip,
2876 master_port);
2877 if (retval == REDIS_OK)
2878 sentinelEvent(REDIS_NOTICE,"-slave-reconf-undo",slave,"%@");
2879 }
2880 slave->flags &= ~(SRI_RECONF_SENT|SRI_RECONF_INPROG|SRI_RECONF_DONE);
2881 }
2882 dictReleaseIterator(di);
2883
2884 sentinel_role = (ri->flags & SRI_I_AM_THE_LEADER) ? SENTINEL_LEADER :
2885 SENTINEL_OBSERVER;
2886 ri->flags &= ~(SRI_FAILOVER_IN_PROGRESS|SRI_I_AM_THE_LEADER|SRI_FORCE_FAILOVER);
2887 ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
2888 ri->failover_state_change_time = mstime();
2889 if (ri->promoted_slave) {
2890 sentinelCallClientReconfScript(ri,sentinel_role,"abort",
2891 ri->promoted_slave->addr,ri->addr);
2892 ri->promoted_slave->flags &= ~SRI_PROMOTED;
2893 ri->promoted_slave = NULL;
2894 }
2895 }
2896
2897 /* The following is called only for master instances and will abort the
2898 * failover process if:
2899 *
2900 * 1) The failover is in progress.
2901 * 2) We already promoted a slave.
2902 * 3) The promoted slave is in extended SDOWN condition.
2903 */
2904 void sentinelAbortFailoverIfNeeded(sentinelRedisInstance *ri) {
2905 /* Failover is in progress? Do we have a promoted slave? */
2906 if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS) || !ri->promoted_slave) return;
2907
2908 /* Is the promoted slave into an extended SDOWN state? */
2909 if (!(ri->promoted_slave->flags & SRI_S_DOWN) ||
2910 (mstime() - ri->promoted_slave->s_down_since_time) <
2911 (ri->down_after_period * SENTINEL_EXTENDED_SDOWN_MULTIPLIER)) return;
2912
2913 sentinelEvent(REDIS_WARNING,"-failover-abort-x-sdown",ri->promoted_slave,"%@");
2914 sentinelAbortFailover(ri);
2915 }
2916
2917 /* ======================== SENTINEL timer handler ==========================
2918 * This is the "main" our Sentinel, being sentinel completely non blocking
2919 * in design. The function is called every second.
2920 * -------------------------------------------------------------------------- */
2921
2922 /* Perform scheduled operations for the specified Redis instance. */
2923 void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
2924 /* ========== MONITORING HALF ============ */
2925 /* Every kind of instance */
2926 sentinelReconnectInstance(ri);
2927 sentinelPingInstance(ri);
2928
2929 /* Masters and slaves */
2930 if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
2931 /* Nothing so far. */
2932 }
2933
2934 /* Only masters */
2935 if (ri->flags & SRI_MASTER) {
2936 sentinelAskMasterStateToOtherSentinels(ri);
2937 }
2938
2939 /* ============== ACTING HALF ============= */
2940 /* We don't proceed with the acting half if we are in TILT mode.
2941 * TILT happens when we find something odd with the time, like a
2942 * sudden change in the clock. */
2943 if (sentinel.tilt) {
2944 if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return;
2945 sentinel.tilt = 0;
2946 sentinelEvent(REDIS_WARNING,"-tilt",NULL,"#tilt mode exited");
2947 }
2948
2949 /* Every kind of instance */
2950 sentinelCheckSubjectivelyDown(ri);
2951
2952 /* Masters and slaves */
2953 if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
2954 /* Nothing so far. */
2955 }
2956
2957 /* Only masters */
2958 if (ri->flags & SRI_MASTER) {
2959 sentinelCheckObjectivelyDown(ri);
2960 sentinelStartFailoverIfNeeded(ri);
2961 sentinelFailoverStateMachine(ri);
2962 sentinelAbortFailoverIfNeeded(ri);
2963 }
2964 }
2965
2966 /* Perform scheduled operations for all the instances in the dictionary.
2967 * Recursively call the function against dictionaries of slaves. */
2968 void sentinelHandleDictOfRedisInstances(dict *instances) {
2969 dictIterator *di;
2970 dictEntry *de;
2971 sentinelRedisInstance *switch_to_promoted = NULL;
2972
2973 /* There are a number of things we need to perform against every master. */
2974 di = dictGetIterator(instances);
2975 while((de = dictNext(di)) != NULL) {
2976 sentinelRedisInstance *ri = dictGetVal(de);
2977
2978 sentinelHandleRedisInstance(ri);
2979 if (ri->flags & SRI_MASTER) {
2980 sentinelHandleDictOfRedisInstances(ri->slaves);
2981 sentinelHandleDictOfRedisInstances(ri->sentinels);
2982 if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) {
2983 switch_to_promoted = ri;
2984 }
2985 }
2986 }
2987 if (switch_to_promoted)
2988 sentinelFailoverSwitchToPromotedSlave(switch_to_promoted);
2989 dictReleaseIterator(di);
2990 }
2991
2992 /* This function checks if we need to enter the TITL mode.
2993 *
2994 * The TILT mode is entered if we detect that between two invocations of the
2995 * timer interrupt, a negative amount of time, or too much time has passed.
2996 * Note that we expect that more or less just 100 milliseconds will pass
2997 * if everything is fine. However we'll see a negative number or a
2998 * difference bigger than SENTINEL_TILT_TRIGGER milliseconds if one of the
2999 * following conditions happen:
3000 *
3001 * 1) The Sentiel process for some time is blocked, for every kind of
3002 * random reason: the load is huge, the computer was freezed for some time
3003 * in I/O or alike, the process was stopped by a signal. Everything.
3004 * 2) The system clock was altered significantly.
3005 *
3006 * Under both this conditions we'll see everything as timed out and failing
3007 * without good reasons. Instead we enter the TILT mode and wait
3008 * for SENTIENL_TILT_PERIOD to elapse before starting to act again.
3009 *
3010 * During TILT time we still collect information, we just do not act. */
3011 void sentinelCheckTiltCondition(void) {
3012 mstime_t now = mstime();
3013 mstime_t delta = now - sentinel.previous_time;
3014
3015 if (delta < 0 || delta > SENTINEL_TILT_TRIGGER) {
3016 sentinel.tilt = 1;
3017 sentinel.tilt_start_time = mstime();
3018 sentinelEvent(REDIS_WARNING,"+tilt",NULL,"#tilt mode entered");
3019 }
3020 sentinel.previous_time = mstime();
3021 }
3022
3023 void sentinelTimer(void) {
3024 sentinelCheckTiltCondition();
3025 sentinelHandleDictOfRedisInstances(sentinel.masters);
3026 sentinelRunPendingScripts();
3027 sentinelCollectTerminatedScripts();
3028 sentinelKillTimedoutScripts();
3029 }
3030