]> git.saurik.com Git - redis.git/blob - src/sentinel.c
A reimplementation of blocking operation internals.
[redis.git] / src / sentinel.c
1 /* Redis Sentinel implementation
2 * -----------------------------
3 *
4 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * * Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * * Neither the name of Redis nor the names of its contributors may be used
16 * to endorse or promote products derived from this software without
17 * specific prior written permission.
18 *
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 * POSSIBILITY OF SUCH DAMAGE.
30 */
31
32 #include "redis.h"
33 #include "hiredis.h"
34 #include "async.h"
35
36 #include <ctype.h>
37 #include <arpa/inet.h>
38 #include <sys/socket.h>
39 #include <sys/wait.h>
40
/* Process environment; handed unchanged to the user scripts we execve(). */
extern char **environ;

/* Port assigned to server.port when running in Sentinel mode. */
#define REDIS_SENTINEL_PORT 26379

/* ======================== Sentinel global state =========================== */

/* Time value expressed in milliseconds. */
typedef long long mstime_t;
48
/* An ip:port pair identifying a network endpoint. */
typedef struct sentinelAddr {
    /* IP address in textual form (an sds string, see createSentinelAddr). */
    char *ip;
    /* TCP port, validated to be in the 1-65535 range at creation time. */
    int port;
} sentinelAddr;
54
/* Bits stored in sentinelRedisInstance.flags, describing an instance that
 * this Sentinel is monitoring. */

/* Instance type: at least one of these is always set. */
#define SRI_MASTER (1<<0)
#define SRI_SLAVE (1<<1)
#define SRI_SENTINEL (1<<2)

/* Set on every newly created instance, which starts disconnected. */
#define SRI_DISCONNECTED (1<<3)

/* Subjectively down (no quorum). */
#define SRI_S_DOWN (1<<4)

/* Objectively down (quorum reached). */
#define SRI_O_DOWN (1<<5)

/* A Sentinel with this flag set thinks that its master is down. */
#define SRI_MASTER_DOWN (1<<6)

/* On an SRI_MASTER instance: we are allowed to perform the failover for
 * this master.
 * On an SRI_SENTINEL instance: that Sentinel is allowed to perform the
 * failover on its master. */
#define SRI_CAN_FAILOVER (1<<7)

/* Failover is in progress for this master. */
#define SRI_FAILOVER_IN_PROGRESS (1<<8)

/* We are the leader for this master. */
#define SRI_I_AM_THE_LEADER (1<<9)

/* Slave selected for promotion. */
#define SRI_PROMOTED (1<<10)

/* SLAVEOF <newmaster> sent. */
#define SRI_RECONF_SENT (1<<11)

/* Slave synchronization in progress. */
#define SRI_RECONF_INPROG (1<<12)

/* Slave synchronized with new master. */
#define SRI_RECONF_DONE (1<<13)

/* Force failover with master up. */
#define SRI_FORCE_FAILOVER (1<<14)

/* SCRIPT KILL already sent on -BUSY. */
#define SRI_SCRIPT_KILL_SENT (1<<15)
78
/* Periods, delays and limits used by the Sentinel logic.
 * NOTE(review): the time values appear to be milliseconds, consistent with
 * mstime_t and with SENTINEL_DOWN_AFTER_PERIOD being the default of the
 * down_after_period field -- confirm against the code using each one. */
#define SENTINEL_INFO_PERIOD 10000
#define SENTINEL_PING_PERIOD 1000
#define SENTINEL_ASK_PERIOD 1000
#define SENTINEL_PUBLISH_PERIOD 5000
#define SENTINEL_DOWN_AFTER_PERIOD 30000

/* Name of the Pub/Sub channel used for hello messages. */
#define SENTINEL_HELLO_CHANNEL "__sentinel__:hello"

#define SENTINEL_TILT_TRIGGER 2000
#define SENTINEL_TILT_PERIOD (SENTINEL_PING_PERIOD*30)
#define SENTINEL_DEFAULT_SLAVE_PRIORITY 100
#define SENTINEL_PROMOTION_RETRY_PERIOD 30000
#define SENTINEL_SLAVE_RECONF_RETRY_PERIOD 10000
#define SENTINEL_DEFAULT_PARALLEL_SYNCS 1
#define SENTINEL_MIN_LINK_RECONNECT_PERIOD 15000
#define SENTINEL_DEFAULT_FAILOVER_TIMEOUT (60*15*1000)
#define SENTINEL_MAX_PENDING_COMMANDS 100
#define SENTINEL_EXTENDED_SDOWN_MULTIPLIER 10

/* For how many milliseconds a piece of information is considered valid.
 * This applies, for instance, to the replies to
 * SENTINEL IS-MASTER-DOWN-BY-ADDR. */
#define SENTINEL_INFO_VALIDITY_TIME 5000
#define SENTINEL_FAILOVER_FIXED_DELAY 5000
#define SENTINEL_FAILOVER_MAX_RANDOM_DELAY 10000
101
/* States of the failover state machine (sentinelRedisInstance.failover_state). */

/* No failover in progress. */
#define SENTINEL_FAILOVER_STATE_NONE 0
/* Wait for failover_start_time. */
#define SENTINEL_FAILOVER_STATE_WAIT_START 1
/* Select slave to promote. */
#define SENTINEL_FAILOVER_STATE_SELECT_SLAVE 2
/* Slave -> Master. */
#define SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE 3
/* Wait slave to change role. */
#define SENTINEL_FAILOVER_STATE_WAIT_PROMOTION 4
/* SLAVEOF newmaster. */
#define SENTINEL_FAILOVER_STATE_RECONF_SLAVES 5
/* Wait replication. */
#define SENTINEL_FAILOVER_STATE_WAIT_NEXT_SLAVE 6
/* Run user script. */
#define SENTINEL_FAILOVER_STATE_ALERT_CLIENTS 7
/* Wait script exec. */
#define SENTINEL_FAILOVER_STATE_WAIT_ALERT_SCRIPT 8
/* Check for failover end. */
#define SENTINEL_FAILOVER_STATE_DETECT_END 9
/* Monitor promoted slave. */
#define SENTINEL_FAILOVER_STATE_UPDATE_CONFIG 10
114
/* Values of sentinelRedisInstance.slave_master_link_status. */
#define SENTINEL_MASTER_LINK_STATUS_UP 0
#define SENTINEL_MASTER_LINK_STATUS_DOWN 1

/* Generic flags shared by several functions. SENTINEL_LEADER is the role
 * tested by sentinelCallClientReconfScript(); any other role value is
 * reported as "observer". */
#define SENTINEL_NO_FLAGS 0
#define SENTINEL_GENERATE_EVENT 1
#define SENTINEL_LEADER 2
#define SENTINEL_OBSERVER 4
123
/* Script job flags (sentinelScriptJob.flags). */
#define SENTINEL_SCRIPT_NONE 0
#define SENTINEL_SCRIPT_RUNNING 1

/* Script execution limits. */

/* Max number of jobs kept in sentinel.scripts_queue. */
#define SENTINEL_SCRIPT_MAX_QUEUE 256
/* Max number of scripts running at the same time. */
#define SENTINEL_SCRIPT_MAX_RUNNING 16
/* 60 seconds max exec time. */
#define SENTINEL_SCRIPT_MAX_RUNTIME 60000
/* Max number of executions of the same job. */
#define SENTINEL_SCRIPT_MAX_RETRY 10
/* 30 seconds between retries (doubled at every further retry). */
#define SENTINEL_SCRIPT_RETRY_DELAY 30000
132
133 typedef struct sentinelRedisInstance {
134 int flags; /* See SRI_... defines */
135 char *name; /* Master name from the point of view of this sentinel. */
136 char *runid; /* run ID of this instance. */
137 sentinelAddr *addr; /* Master host. */
138 redisAsyncContext *cc; /* Hiredis context for commands. */
139 redisAsyncContext *pc; /* Hiredis context for Pub / Sub. */
140 int pending_commands; /* Number of commands sent waiting for a reply. */
141 mstime_t cc_conn_time; /* cc connection time. */
142 mstime_t pc_conn_time; /* pc connection time. */
143 mstime_t pc_last_activity; /* Last time we received any message. */
144 mstime_t last_avail_time; /* Last time the instance replied to ping with
145 a reply we consider valid. */
146 mstime_t last_pong_time; /* Last time the instance replied to ping,
147 whatever the reply was. That's used to check
148 if the link is idle and must be reconnected. */
149 mstime_t last_pub_time; /* Last time we sent hello via Pub/Sub. */
150 mstime_t last_hello_time; /* Only used if SRI_SENTINEL is set. Last time
151 we received an hello from this Sentinel
152 via Pub/Sub. */
153 mstime_t last_master_down_reply_time; /* Time of last reply to
154 SENTINEL is-master-down command. */
155 mstime_t s_down_since_time; /* Subjectively down since time. */
156 mstime_t o_down_since_time; /* Objectively down since time. */
157 mstime_t down_after_period; /* Consider it down after that period. */
158 mstime_t info_refresh; /* Time at which we received INFO output from it. */
159
160 /* Master specific. */
161 dict *sentinels; /* Other sentinels monitoring the same master. */
162 dict *slaves; /* Slaves for this master instance. */
163 int quorum; /* Number of sentinels that need to agree on failure. */
164 int parallel_syncs; /* How many slaves to reconfigure at same time. */
165
166 /* Slave specific. */
167 mstime_t master_link_down_time; /* Slave replication link down time. */
168 int slave_priority; /* Slave priority according to its INFO output. */
169 mstime_t slave_reconf_sent_time; /* Time at which we sent SLAVE OF <new> */
170 struct sentinelRedisInstance *master; /* Master instance if SRI_SLAVE is set. */
171 char *slave_master_host; /* Master host as reported by INFO */
172 int slave_master_port; /* Master port as reported by INFO */
173 int slave_master_link_status; /* Master link status as reported by INFO */
174 /* Failover */
175 char *leader; /* If this is a master instance, this is the runid of
176 the Sentinel that should perform the failover. If
177 this is a Sentinel, this is the runid of the Sentinel
178 that this other Sentinel is voting as leader.
179 This field is valid only if SRI_MASTER_DOWN is
180 set on the Sentinel instance. */
181 int failover_state; /* See SENTINEL_FAILOVER_STATE_* defines. */
182 mstime_t failover_state_change_time;
183 mstime_t failover_start_time; /* When to start to failover if leader. */
184 mstime_t failover_timeout; /* Max time to refresh failover state. */
185 struct sentinelRedisInstance *promoted_slave; /* Promoted slave instance. */
186 /* Scripts executed to notify admin or reconfigure clients: when they
187 * are set to NULL no script is executed. */
188 char *notification_script;
189 char *client_reconfig_script;
190 } sentinelRedisInstance;
191
192 /* Main state. */
193 struct sentinelState {
194 dict *masters; /* Dictionary of master sentinelRedisInstances.
195 Key is the instance name, value is the
196 sentinelRedisInstance structure pointer. */
197 int tilt; /* Are we in TILT mode? */
198 int running_scripts; /* Number of scripts in execution right now. */
199 mstime_t tilt_start_time; /* When TITL started. */
200 mstime_t previous_time; /* Time last time we ran the time handler. */
201 list *scripts_queue; /* Queue of user scripts to execute. */
202 } sentinel;
203
204 /* A script execution job. */
205 typedef struct sentinelScriptJob {
206 int flags; /* Script job flags: SENTINEL_SCRIPT_* */
207 int retry_num; /* Number of times we tried to execute it. */
208 char **argv; /* Arguments to call the script. */
209 mstime_t start_time; /* Script execution time if the script is running,
210 otherwise 0 if we are allowed to retry the
211 execution at any time. If the script is not
212 running and it's not 0, it means: do not run
213 before the specified time. */
214 pid_t pid; /* Script execution pid. */
215 } sentinelScriptJob;
216
217 /* ======================= hiredis ae.c adapters =============================
218 * Note: this implementation is taken from hiredis/adapters/ae.h, however
219 * we have our modified copy for Sentinel in order to use our allocator
220 * and to have full control over how the adapter works. */
221
222 typedef struct redisAeEvents {
223 redisAsyncContext *context;
224 aeEventLoop *loop;
225 int fd;
226 int reading, writing;
227 } redisAeEvents;
228
229 static void redisAeReadEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
230 ((void)el); ((void)fd); ((void)mask);
231
232 redisAeEvents *e = (redisAeEvents*)privdata;
233 redisAsyncHandleRead(e->context);
234 }
235
236 static void redisAeWriteEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
237 ((void)el); ((void)fd); ((void)mask);
238
239 redisAeEvents *e = (redisAeEvents*)privdata;
240 redisAsyncHandleWrite(e->context);
241 }
242
243 static void redisAeAddRead(void *privdata) {
244 redisAeEvents *e = (redisAeEvents*)privdata;
245 aeEventLoop *loop = e->loop;
246 if (!e->reading) {
247 e->reading = 1;
248 aeCreateFileEvent(loop,e->fd,AE_READABLE,redisAeReadEvent,e);
249 }
250 }
251
252 static void redisAeDelRead(void *privdata) {
253 redisAeEvents *e = (redisAeEvents*)privdata;
254 aeEventLoop *loop = e->loop;
255 if (e->reading) {
256 e->reading = 0;
257 aeDeleteFileEvent(loop,e->fd,AE_READABLE);
258 }
259 }
260
261 static void redisAeAddWrite(void *privdata) {
262 redisAeEvents *e = (redisAeEvents*)privdata;
263 aeEventLoop *loop = e->loop;
264 if (!e->writing) {
265 e->writing = 1;
266 aeCreateFileEvent(loop,e->fd,AE_WRITABLE,redisAeWriteEvent,e);
267 }
268 }
269
270 static void redisAeDelWrite(void *privdata) {
271 redisAeEvents *e = (redisAeEvents*)privdata;
272 aeEventLoop *loop = e->loop;
273 if (e->writing) {
274 e->writing = 0;
275 aeDeleteFileEvent(loop,e->fd,AE_WRITABLE);
276 }
277 }
278
279 static void redisAeCleanup(void *privdata) {
280 redisAeEvents *e = (redisAeEvents*)privdata;
281 redisAeDelRead(privdata);
282 redisAeDelWrite(privdata);
283 zfree(e);
284 }
285
286 static int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac) {
287 redisContext *c = &(ac->c);
288 redisAeEvents *e;
289
290 /* Nothing should be attached when something is already attached */
291 if (ac->ev.data != NULL)
292 return REDIS_ERR;
293
294 /* Create container for context and r/w events */
295 e = (redisAeEvents*)zmalloc(sizeof(*e));
296 e->context = ac;
297 e->loop = loop;
298 e->fd = c->fd;
299 e->reading = e->writing = 0;
300
301 /* Register functions to start/stop listening for events */
302 ac->ev.addRead = redisAeAddRead;
303 ac->ev.delRead = redisAeDelRead;
304 ac->ev.addWrite = redisAeAddWrite;
305 ac->ev.delWrite = redisAeDelWrite;
306 ac->ev.cleanup = redisAeCleanup;
307 ac->ev.data = e;
308
309 return REDIS_OK;
310 }
311
312 /* ============================= Prototypes ================================= */
313
314 void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status);
315 void sentinelDisconnectCallback(const redisAsyncContext *c, int status);
316 void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata);
317 sentinelRedisInstance *sentinelGetMasterByName(char *name);
318 char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master);
319 char *sentinelGetObjectiveLeader(sentinelRedisInstance *master);
320 int yesnotoi(char *s);
321 void sentinelDisconnectInstanceFromContext(const redisAsyncContext *c);
322 void sentinelKillLink(sentinelRedisInstance *ri, redisAsyncContext *c);
323 const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri);
324 void sentinelAbortFailover(sentinelRedisInstance *ri);
325 void sentinelEvent(int level, char *type, sentinelRedisInstance *ri, const char *fmt, ...);
326 sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master);
327 void sentinelScheduleScriptExecution(char *path, ...);
328 void sentinelStartFailover(sentinelRedisInstance *master, int state);
329
330 /* ========================= Dictionary types =============================== */
331
332 unsigned int dictSdsHash(const void *key);
333 int dictSdsKeyCompare(void *privdata, const void *key1, const void *key2);
334 void releaseSentinelRedisInstance(sentinelRedisInstance *ri);
335
336 void dictInstancesValDestructor (void *privdata, void *obj) {
337 releaseSentinelRedisInstance(obj);
338 }
339
340 /* Instance name (sds) -> instance (sentinelRedisInstance pointer)
341 *
342 * also used for: sentinelRedisInstance->sentinels dictionary that maps
343 * sentinels ip:port to last seen time in Pub/Sub hello message. */
344 dictType instancesDictType = {
345 dictSdsHash, /* hash function */
346 NULL, /* key dup */
347 NULL, /* val dup */
348 dictSdsKeyCompare, /* key compare */
349 NULL, /* key destructor */
350 dictInstancesValDestructor /* val destructor */
351 };
352
353 /* Instance runid (sds) -> votes (long casted to void*)
354 *
355 * This is useful into sentinelGetObjectiveLeader() function in order to
356 * count the votes and understand who is the leader. */
357 dictType leaderVotesDictType = {
358 dictSdsHash, /* hash function */
359 NULL, /* key dup */
360 NULL, /* val dup */
361 dictSdsKeyCompare, /* key compare */
362 NULL, /* key destructor */
363 NULL /* val destructor */
364 };
365
366 /* =========================== Initialization =============================== */
367
368 void sentinelCommand(redisClient *c);
369 void sentinelInfoCommand(redisClient *c);
370
371 struct redisCommand sentinelcmds[] = {
372 {"ping",pingCommand,1,"",0,NULL,0,0,0,0,0},
373 {"sentinel",sentinelCommand,-2,"",0,NULL,0,0,0,0,0},
374 {"subscribe",subscribeCommand,-2,"",0,NULL,0,0,0,0,0},
375 {"unsubscribe",unsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
376 {"psubscribe",psubscribeCommand,-2,"",0,NULL,0,0,0,0,0},
377 {"punsubscribe",punsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
378 {"info",sentinelInfoCommand,-1,"",0,NULL,0,0,0,0,0}
379 };
380
381 /* This function overwrites a few normal Redis config default with Sentinel
382 * specific defaults. */
383 void initSentinelConfig(void) {
384 server.port = REDIS_SENTINEL_PORT;
385 }
386
387 /* Perform the Sentinel mode initialization. */
388 void initSentinel(void) {
389 int j;
390
391 /* Remove usual Redis commands from the command table, then just add
392 * the SENTINEL command. */
393 dictEmpty(server.commands);
394 for (j = 0; j < sizeof(sentinelcmds)/sizeof(sentinelcmds[0]); j++) {
395 int retval;
396 struct redisCommand *cmd = sentinelcmds+j;
397
398 retval = dictAdd(server.commands, sdsnew(cmd->name), cmd);
399 redisAssert(retval == DICT_OK);
400 }
401
402 /* Initialize various data structures. */
403 sentinel.masters = dictCreate(&instancesDictType,NULL);
404 sentinel.tilt = 0;
405 sentinel.tilt_start_time = mstime();
406 sentinel.previous_time = mstime();
407 sentinel.running_scripts = 0;
408 sentinel.scripts_queue = listCreate();
409 }
410
411 /* ============================== sentinelAddr ============================== */
412
413 /* Create a sentinelAddr object and return it on success.
414 * On error NULL is returned and errno is set to:
415 * ENOENT: Can't resolve the hostname.
416 * EINVAL: Invalid port number.
417 */
418 sentinelAddr *createSentinelAddr(char *hostname, int port) {
419 char buf[32];
420 sentinelAddr *sa;
421
422 if (port <= 0 || port > 65535) {
423 errno = EINVAL;
424 return NULL;
425 }
426 if (anetResolve(NULL,hostname,buf) == ANET_ERR) {
427 errno = ENOENT;
428 return NULL;
429 }
430 sa = zmalloc(sizeof(*sa));
431 sa->ip = sdsnew(buf);
432 sa->port = port;
433 return sa;
434 }
435
436 /* Free a Sentinel address. Can't fail. */
437 void releaseSentinelAddr(sentinelAddr *sa) {
438 sdsfree(sa->ip);
439 zfree(sa);
440 }
441
442 /* =========================== Events notification ========================== */
443
444 /* Send an event to log, pub/sub, user notification script.
445 *
446 * 'level' is the log level for logging. Only REDIS_WARNING events will trigger
447 * the execution of the user notification script.
448 *
449 * 'type' is the message type, also used as a pub/sub channel name.
450 *
451 * 'ri', is the redis instance target of this event if applicable, and is
452 * used to obtain the path of the notification script to execute.
453 *
454 * The remaining arguments are printf-alike.
455 * If the format specifier starts with the two characters "%@" then ri is
456 * not NULL, and the message is prefixed with an instance identifier in the
457 * following format:
458 *
459 * <instance type> <instance name> <ip> <port>
460 *
461 * If the instance type is not master, than the additional string is
462 * added to specify the originating master:
463 *
464 * @ <master name> <master ip> <master port>
465 *
466 * Any other specifier after "%@" is processed by printf itself.
467 */
468 void sentinelEvent(int level, char *type, sentinelRedisInstance *ri,
469 const char *fmt, ...) {
470 va_list ap;
471 char msg[REDIS_MAX_LOGMSG_LEN];
472 robj *channel, *payload;
473
474 /* Handle %@ */
475 if (fmt[0] == '%' && fmt[1] == '@') {
476 sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
477 NULL : ri->master;
478
479 if (master) {
480 snprintf(msg, sizeof(msg), "%s %s %s %d @ %s %s %d",
481 sentinelRedisInstanceTypeStr(ri),
482 ri->name, ri->addr->ip, ri->addr->port,
483 master->name, master->addr->ip, master->addr->port);
484 } else {
485 snprintf(msg, sizeof(msg), "%s %s %s %d",
486 sentinelRedisInstanceTypeStr(ri),
487 ri->name, ri->addr->ip, ri->addr->port);
488 }
489 fmt += 2;
490 } else {
491 msg[0] = '\0';
492 }
493
494 /* Use vsprintf for the rest of the formatting if any. */
495 if (fmt[0] != '\0') {
496 va_start(ap, fmt);
497 vsnprintf(msg+strlen(msg), sizeof(msg)-strlen(msg), fmt, ap);
498 va_end(ap);
499 }
500
501 /* Log the message if the log level allows it to be logged. */
502 if (level >= server.verbosity)
503 redisLog(level,"%s %s",type,msg);
504
505 /* Publish the message via Pub/Sub if it's not a debugging one. */
506 if (level != REDIS_DEBUG) {
507 channel = createStringObject(type,strlen(type));
508 payload = createStringObject(msg,strlen(msg));
509 pubsubPublishMessage(channel,payload);
510 decrRefCount(channel);
511 decrRefCount(payload);
512 }
513
514 /* Call the notification script if applicable. */
515 if (level == REDIS_WARNING && ri != NULL) {
516 sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
517 ri : ri->master;
518 if (master->notification_script) {
519 sentinelScheduleScriptExecution(master->notification_script,
520 type,msg,NULL);
521 }
522 }
523 }
524
525 /* ============================ script execution ============================ */
526
527 /* Release a script job structure and all the associated data. */
528 void sentinelReleaseScriptJob(sentinelScriptJob *sj) {
529 int j = 0;
530
531 while(sj->argv[j]) sdsfree(sj->argv[j++]);
532 zfree(sj->argv);
533 zfree(sj);
534 }
535
536 #define SENTINEL_SCRIPT_MAX_ARGS 16
537 void sentinelScheduleScriptExecution(char *path, ...) {
538 va_list ap;
539 char *argv[SENTINEL_SCRIPT_MAX_ARGS+1];
540 int argc = 1;
541 sentinelScriptJob *sj;
542
543 va_start(ap, path);
544 while(argc < SENTINEL_SCRIPT_MAX_ARGS) {
545 argv[argc] = va_arg(ap,char*);
546 if (!argv[argc]) break;
547 argv[argc] = sdsnew(argv[argc]); /* Copy the string. */
548 argc++;
549 }
550 va_end(ap);
551 argv[0] = sdsnew(path);
552
553 sj = zmalloc(sizeof(*sj));
554 sj->flags = SENTINEL_SCRIPT_NONE;
555 sj->retry_num = 0;
556 sj->argv = zmalloc(sizeof(char*)*(argc+1));
557 sj->start_time = 0;
558 sj->pid = 0;
559 memcpy(sj->argv,argv,sizeof(char*)*(argc+1));
560
561 listAddNodeTail(sentinel.scripts_queue,sj);
562
563 /* Remove the oldest non running script if we already hit the limit. */
564 if (listLength(sentinel.scripts_queue) > SENTINEL_SCRIPT_MAX_QUEUE) {
565 listNode *ln;
566 listIter li;
567
568 listRewind(sentinel.scripts_queue,&li);
569 while ((ln = listNext(&li)) != NULL) {
570 sj = ln->value;
571
572 if (sj->flags & SENTINEL_SCRIPT_RUNNING) continue;
573 /* The first node is the oldest as we add on tail. */
574 listDelNode(sentinel.scripts_queue,ln);
575 sentinelReleaseScriptJob(sj);
576 break;
577 }
578 redisAssert(listLength(sentinel.scripts_queue) <=
579 SENTINEL_SCRIPT_MAX_QUEUE);
580 }
581 }
582
583 /* Lookup a script in the scripts queue via pid, and returns the list node
584 * (so that we can easily remove it from the queue if needed). */
585 listNode *sentinelGetScriptListNodeByPid(pid_t pid) {
586 listNode *ln;
587 listIter li;
588
589 listRewind(sentinel.scripts_queue,&li);
590 while ((ln = listNext(&li)) != NULL) {
591 sentinelScriptJob *sj = ln->value;
592
593 if ((sj->flags & SENTINEL_SCRIPT_RUNNING) && sj->pid == pid)
594 return ln;
595 }
596 return NULL;
597 }
598
599 /* Run pending scripts if we are not already at max number of running
600 * scripts. */
601 void sentinelRunPendingScripts(void) {
602 listNode *ln;
603 listIter li;
604 mstime_t now = mstime();
605
606 /* Find jobs that are not running and run them, from the top to the
607 * tail of the queue, so we run older jobs first. */
608 listRewind(sentinel.scripts_queue,&li);
609 while (sentinel.running_scripts < SENTINEL_SCRIPT_MAX_RUNNING &&
610 (ln = listNext(&li)) != NULL)
611 {
612 sentinelScriptJob *sj = ln->value;
613 pid_t pid;
614
615 /* Skip if already running. */
616 if (sj->flags & SENTINEL_SCRIPT_RUNNING) continue;
617
618 /* Skip if it's a retry, but not enough time has elapsed. */
619 if (sj->start_time && sj->start_time > now) continue;
620
621 sj->flags |= SENTINEL_SCRIPT_RUNNING;
622 sj->start_time = mstime();
623 sj->retry_num++;
624 pid = fork();
625
626 if (pid == -1) {
627 /* Parent (fork error).
628 * We report fork errors as signal 99, in order to unify the
629 * reporting with other kind of errors. */
630 sentinelEvent(REDIS_WARNING,"-script-error",NULL,
631 "%s %d %d", sj->argv[0], 99, 0);
632 sj->flags &= ~SENTINEL_SCRIPT_RUNNING;
633 sj->pid = 0;
634 } else if (pid == 0) {
635 /* Child */
636 execve(sj->argv[0],sj->argv,environ);
637 /* If we are here an error occurred. */
638 _exit(2); /* Don't retry execution. */
639 } else {
640 sentinel.running_scripts++;
641 sj->pid = pid;
642 sentinelEvent(REDIS_DEBUG,"+script-child",NULL,"%ld",(long)pid);
643 }
644 }
645 }
646
647 /* How much to delay the execution of a script that we need to retry after
648 * an error?
649 *
650 * We double the retry delay for every further retry we do. So for instance
651 * if RETRY_DELAY is set to 30 seconds and the max number of retries is 10
652 * starting from the second attempt to execute the script the delays are:
653 * 30 sec, 60 sec, 2 min, 4 min, 8 min, 16 min, 32 min, 64 min, 128 min. */
654 mstime_t sentinelScriptRetryDelay(int retry_num) {
655 mstime_t delay = SENTINEL_SCRIPT_RETRY_DELAY;
656
657 while (retry_num-- > 1) delay *= 2;
658 return delay;
659 }
660
661 /* Check for scripts that terminated, and remove them from the queue if the
662 * script terminated successfully. If instead the script was terminated by
663 * a signal, or returned exit code "1", it is scheduled to run again if
664 * the max number of retries did not already elapsed. */
665 void sentinelCollectTerminatedScripts(void) {
666 int statloc;
667 pid_t pid;
668
669 while ((pid = wait3(&statloc,WNOHANG,NULL)) > 0) {
670 int exitcode = WEXITSTATUS(statloc);
671 int bysignal = 0;
672 listNode *ln;
673 sentinelScriptJob *sj;
674
675 if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
676 sentinelEvent(REDIS_DEBUG,"-script-child",NULL,"%ld %d %d",
677 (long)pid, exitcode, bysignal);
678
679 ln = sentinelGetScriptListNodeByPid(pid);
680 if (ln == NULL) {
681 redisLog(REDIS_WARNING,"wait3() returned a pid (%ld) we can't find in our scripts execution queue!", (long)pid);
682 continue;
683 }
684 sj = ln->value;
685
686 /* If the script was terminated by a signal or returns an
687 * exit code of "1" (that means: please retry), we reschedule it
688 * if the max number of retries is not already reached. */
689 if ((bysignal || exitcode == 1) &&
690 sj->retry_num != SENTINEL_SCRIPT_MAX_RETRY)
691 {
692 sj->flags &= ~SENTINEL_SCRIPT_RUNNING;
693 sj->pid = 0;
694 sj->start_time = mstime() +
695 sentinelScriptRetryDelay(sj->retry_num);
696 } else {
697 /* Otherwise let's remove the script, but log the event if the
698 * execution did not terminated in the best of the ways. */
699 if (bysignal || exitcode != 0) {
700 sentinelEvent(REDIS_WARNING,"-script-error",NULL,
701 "%s %d %d", sj->argv[0], bysignal, exitcode);
702 }
703 listDelNode(sentinel.scripts_queue,ln);
704 sentinelReleaseScriptJob(sj);
705 sentinel.running_scripts--;
706 }
707 }
708 }
709
710 /* Kill scripts in timeout, they'll be collected by the
711 * sentinelCollectTerminatedScripts() function. */
712 void sentinelKillTimedoutScripts(void) {
713 listNode *ln;
714 listIter li;
715 mstime_t now = mstime();
716
717 listRewind(sentinel.scripts_queue,&li);
718 while ((ln = listNext(&li)) != NULL) {
719 sentinelScriptJob *sj = ln->value;
720
721 if (sj->flags & SENTINEL_SCRIPT_RUNNING &&
722 (now - sj->start_time) > SENTINEL_SCRIPT_MAX_RUNTIME)
723 {
724 sentinelEvent(REDIS_WARNING,"-script-timeout",NULL,"%s %ld",
725 sj->argv[0], (long)sj->pid);
726 kill(sj->pid,SIGKILL);
727 }
728 }
729 }
730
731 /* Implements SENTINEL PENDING-SCRIPTS command. */
732 void sentinelPendingScriptsCommand(redisClient *c) {
733 listNode *ln;
734 listIter li;
735
736 addReplyMultiBulkLen(c,listLength(sentinel.scripts_queue));
737 listRewind(sentinel.scripts_queue,&li);
738 while ((ln = listNext(&li)) != NULL) {
739 sentinelScriptJob *sj = ln->value;
740 int j = 0;
741
742 addReplyMultiBulkLen(c,10);
743
744 addReplyBulkCString(c,"argv");
745 while (sj->argv[j]) j++;
746 addReplyMultiBulkLen(c,j);
747 j = 0;
748 while (sj->argv[j]) addReplyBulkCString(c,sj->argv[j++]);
749
750 addReplyBulkCString(c,"flags");
751 addReplyBulkCString(c,
752 (sj->flags & SENTINEL_SCRIPT_RUNNING) ? "running" : "scheduled");
753
754 addReplyBulkCString(c,"pid");
755 addReplyBulkLongLong(c,sj->pid);
756
757 if (sj->flags & SENTINEL_SCRIPT_RUNNING) {
758 addReplyBulkCString(c,"run-time");
759 addReplyBulkLongLong(c,mstime() - sj->start_time);
760 } else {
761 mstime_t delay = sj->start_time ? (sj->start_time-mstime()) : 0;
762 if (delay < 0) delay = 0;
763 addReplyBulkCString(c,"run-delay");
764 addReplyBulkLongLong(c,delay);
765 }
766
767 addReplyBulkCString(c,"retry-num");
768 addReplyBulkLongLong(c,sj->retry_num);
769 }
770 }
771
772 /* This function calls, if any, the client reconfiguration script with the
773 * following parameters:
774 *
775 * <master-name> <role> <state> <from-ip> <from-port> <to-ip> <to-port>
776 *
777 * It is called every time a failover starts, ends, or is aborted.
778 *
779 * <state> is "start", "end" or "abort".
780 * <role> is either "leader" or "observer".
781 *
782 * from/to fields are respectively master -> promoted slave addresses for
783 * "start" and "end", or the reverse (promoted slave -> master) in case of
784 * "abort".
785 */
786 void sentinelCallClientReconfScript(sentinelRedisInstance *master, int role, char *state, sentinelAddr *from, sentinelAddr *to) {
787 char fromport[32], toport[32];
788
789 if (master->client_reconfig_script == NULL) return;
790 ll2string(fromport,sizeof(fromport),from->port);
791 ll2string(toport,sizeof(toport),to->port);
792 sentinelScheduleScriptExecution(master->client_reconfig_script,
793 master->name,
794 (role == SENTINEL_LEADER) ? "leader" : "observer",
795 state, from->ip, fromport, to->ip, toport, NULL);
796 }
797
798 /* ========================== sentinelRedisInstance ========================= */
799
800 /* Create a redis instance, the following fields must be populated by the
801 * caller if needed:
802 * runid: set to NULL but will be populated once INFO output is received.
803 * info_refresh: is set to 0 to mean that we never received INFO so far.
804 *
805 * If SRI_MASTER is set into initial flags the instance is added to
806 * sentinel.masters table.
807 *
808 * if SRI_SLAVE or SRI_SENTINEL is set then 'master' must be not NULL and the
809 * instance is added into master->slaves or master->sentinels table.
810 *
811 * If the instance is a slave or sentinel, the name parameter is ignored and
812 * is created automatically as hostname:port.
813 *
814 * The function fails if hostname can't be resolved or port is out of range.
815 * When this happens NULL is returned and errno is set accordingly to the
816 * createSentinelAddr() function.
817 *
818 * The function may also fail and return NULL with errno set to EBUSY if
819 * a master or slave with the same name already exists. */
820 sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char *hostname, int port, int quorum, sentinelRedisInstance *master) {
821 sentinelRedisInstance *ri;
822 sentinelAddr *addr;
823 dict *table = NULL;
824 char slavename[128], *sdsname;
825
826 redisAssert(flags & (SRI_MASTER|SRI_SLAVE|SRI_SENTINEL));
827 redisAssert((flags & SRI_MASTER) || master != NULL);
828
829 /* Check address validity. */
830 addr = createSentinelAddr(hostname,port);
831 if (addr == NULL) return NULL;
832
833 /* For slaves and sentinel we use ip:port as name. */
834 if (flags & (SRI_SLAVE|SRI_SENTINEL)) {
835 snprintf(slavename,sizeof(slavename),"%s:%d",hostname,port);
836 name = slavename;
837 }
838
839 /* Make sure the entry is not duplicated. This may happen when the same
840 * name for a master is used multiple times inside the configuration or
841 * if we try to add multiple times a slave or sentinel with same ip/port
842 * to a master. */
843 if (flags & SRI_MASTER) table = sentinel.masters;
844 else if (flags & SRI_SLAVE) table = master->slaves;
845 else if (flags & SRI_SENTINEL) table = master->sentinels;
846 sdsname = sdsnew(name);
847 if (dictFind(table,sdsname)) {
848 sdsfree(sdsname);
849 errno = EBUSY;
850 return NULL;
851 }
852
853 /* Create the instance object. */
854 ri = zmalloc(sizeof(*ri));
855 /* Note that all the instances are started in the disconnected state,
856 * the event loop will take care of connecting them. */
857 ri->flags = flags | SRI_DISCONNECTED;
858 ri->name = sdsname;
859 ri->runid = NULL;
860 ri->addr = addr;
861 ri->cc = NULL;
862 ri->pc = NULL;
863 ri->pending_commands = 0;
864 ri->cc_conn_time = 0;
865 ri->pc_conn_time = 0;
866 ri->pc_last_activity = 0;
867 ri->last_avail_time = mstime();
868 ri->last_pong_time = mstime();
869 ri->last_pub_time = mstime();
870 ri->last_hello_time = mstime();
871 ri->last_master_down_reply_time = mstime();
872 ri->s_down_since_time = 0;
873 ri->o_down_since_time = 0;
874 ri->down_after_period = master ? master->down_after_period :
875 SENTINEL_DOWN_AFTER_PERIOD;
876 ri->master_link_down_time = 0;
877 ri->slave_priority = SENTINEL_DEFAULT_SLAVE_PRIORITY;
878 ri->slave_reconf_sent_time = 0;
879 ri->slave_master_host = NULL;
880 ri->slave_master_port = 0;
881 ri->slave_master_link_status = SENTINEL_MASTER_LINK_STATUS_DOWN;
882 ri->sentinels = dictCreate(&instancesDictType,NULL);
883 ri->quorum = quorum;
884 ri->parallel_syncs = SENTINEL_DEFAULT_PARALLEL_SYNCS;
885 ri->master = master;
886 ri->slaves = dictCreate(&instancesDictType,NULL);
887 ri->info_refresh = 0;
888
889 /* Failover state. */
890 ri->leader = NULL;
891 ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
892 ri->failover_state_change_time = 0;
893 ri->failover_start_time = 0;
894 ri->failover_timeout = SENTINEL_DEFAULT_FAILOVER_TIMEOUT;
895 ri->promoted_slave = NULL;
896 ri->notification_script = NULL;
897 ri->client_reconfig_script = NULL;
898
899 /* Add into the right table. */
900 dictAdd(table, ri->name, ri);
901 return ri;
902 }
903
904 /* Release this instance and all its slaves, sentinels, hiredis connections.
905 * This function also takes care of unlinking the instance from the main
906 * masters table (if it is a master) or from its master sentinels/slaves table
907 * if it is a slave or sentinel. */
908 void releaseSentinelRedisInstance(sentinelRedisInstance *ri) {
909 /* Release all its slaves or sentinels if any. */
910 dictRelease(ri->sentinels);
911 dictRelease(ri->slaves);
912
913 /* Release hiredis connections. */
914 if (ri->cc) sentinelKillLink(ri,ri->cc);
915 if (ri->pc) sentinelKillLink(ri,ri->pc);
916
917 /* Free other resources. */
918 sdsfree(ri->name);
919 sdsfree(ri->runid);
920 sdsfree(ri->notification_script);
921 sdsfree(ri->client_reconfig_script);
922 sdsfree(ri->slave_master_host);
923 sdsfree(ri->leader);
924 releaseSentinelAddr(ri->addr);
925
926 /* Clear state into the master if needed. */
927 if ((ri->flags & SRI_SLAVE) && (ri->flags & SRI_PROMOTED) && ri->master)
928 ri->master->promoted_slave = NULL;
929
930 zfree(ri);
931 }
932
933 /* Lookup a slave in a master Redis instance, by ip and port. */
934 sentinelRedisInstance *sentinelRedisInstanceLookupSlave(
935 sentinelRedisInstance *ri, char *ip, int port)
936 {
937 sds key;
938 sentinelRedisInstance *slave;
939
940 redisAssert(ri->flags & SRI_MASTER);
941 key = sdscatprintf(sdsempty(),"%s:%d",ip,port);
942 slave = dictFetchValue(ri->slaves,key);
943 sdsfree(key);
944 return slave;
945 }
946
947 /* Return the name of the type of the instance as a string. */
948 const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri) {
949 if (ri->flags & SRI_MASTER) return "master";
950 else if (ri->flags & SRI_SLAVE) return "slave";
951 else if (ri->flags & SRI_SENTINEL) return "sentinel";
952 else return "unknown";
953 }
954
955 /* This function removes all the instances found in the dictionary of instances
956 * 'd', having either:
957 *
958 * 1) The same ip/port as specified.
959 * 2) The same runid.
960 *
961 * "1" and "2" don't need to verify at the same time, just one is enough.
962 * If "runid" is NULL it is not checked.
963 * Similarly if "ip" is NULL it is not checked.
964 *
965 * This function is useful because every time we add a new Sentinel into
966 * a master's Sentinels dictionary, we want to be very sure about not
967 * having duplicated instances for any reason. This is so important because
968 * we use those other sentinels in order to run our quorum protocol to
969 * understand if it's time to proceeed with the fail over.
970 *
971 * Making sure no duplication is possible we greately improve the robustness
972 * of the quorum (otherwise we may end counting the same instance multiple
973 * times for some reason).
974 *
975 * The function returns the number of Sentinels removed. */
976 int removeMatchingSentinelsFromMaster(sentinelRedisInstance *master, char *ip, int port, char *runid) {
977 dictIterator *di;
978 dictEntry *de;
979 int removed = 0;
980
981 di = dictGetSafeIterator(master->sentinels);
982 while((de = dictNext(di)) != NULL) {
983 sentinelRedisInstance *ri = dictGetVal(de);
984
985 if ((ri->runid && runid && strcmp(ri->runid,runid) == 0) ||
986 (ip && strcmp(ri->addr->ip,ip) == 0 && port == ri->addr->port))
987 {
988 dictDelete(master->sentinels,ri->name);
989 removed++;
990 }
991 }
992 dictReleaseIterator(di);
993 return removed;
994 }
995
996 /* Search an instance with the same runid, ip and port into a dictionary
997 * of instances. Return NULL if not found, otherwise return the instance
998 * pointer.
999 *
1000 * runid or ip can be NULL. In such a case the search is performed only
1001 * by the non-NULL field. */
1002 sentinelRedisInstance *getSentinelRedisInstanceByAddrAndRunID(dict *instances, char *ip, int port, char *runid) {
1003 dictIterator *di;
1004 dictEntry *de;
1005 sentinelRedisInstance *instance = NULL;
1006
1007 redisAssert(ip || runid); /* User must pass at least one search param. */
1008 di = dictGetIterator(instances);
1009 while((de = dictNext(di)) != NULL) {
1010 sentinelRedisInstance *ri = dictGetVal(de);
1011
1012 if (runid && !ri->runid) continue;
1013 if ((runid == NULL || strcmp(ri->runid, runid) == 0) &&
1014 (ip == NULL || (strcmp(ri->addr->ip, ip) == 0 &&
1015 ri->addr->port == port)))
1016 {
1017 instance = ri;
1018 break;
1019 }
1020 }
1021 dictReleaseIterator(di);
1022 return instance;
1023 }
1024
1025 /* Simple master lookup by name */
1026 sentinelRedisInstance *sentinelGetMasterByName(char *name) {
1027 sentinelRedisInstance *ri;
1028 sds sdsname = sdsnew(name);
1029
1030 ri = dictFetchValue(sentinel.masters,sdsname);
1031 sdsfree(sdsname);
1032 return ri;
1033 }
1034
1035 /* Add the specified flags to all the instances in the specified dictionary. */
1036 void sentinelAddFlagsToDictOfRedisInstances(dict *instances, int flags) {
1037 dictIterator *di;
1038 dictEntry *de;
1039
1040 di = dictGetIterator(instances);
1041 while((de = dictNext(di)) != NULL) {
1042 sentinelRedisInstance *ri = dictGetVal(de);
1043 ri->flags |= flags;
1044 }
1045 dictReleaseIterator(di);
1046 }
1047
1048 /* Remove the specified flags to all the instances in the specified
1049 * dictionary. */
1050 void sentinelDelFlagsToDictOfRedisInstances(dict *instances, int flags) {
1051 dictIterator *di;
1052 dictEntry *de;
1053
1054 di = dictGetIterator(instances);
1055 while((de = dictNext(di)) != NULL) {
1056 sentinelRedisInstance *ri = dictGetVal(de);
1057 ri->flags &= ~flags;
1058 }
1059 dictReleaseIterator(di);
1060 }
1061
1062 /* Reset the state of a monitored master:
1063 * 1) Remove all slaves.
1064 * 2) Remove all sentinels.
1065 * 3) Remove most of the flags resulting from runtime operations.
1066 * 4) Reset timers to their default value.
1067 * 5) In the process of doing this undo the failover if in progress.
1068 * 6) Disconnect the connections with the master (will reconnect automatically).
1069 */
1070 void sentinelResetMaster(sentinelRedisInstance *ri, int flags) {
1071 redisAssert(ri->flags & SRI_MASTER);
1072 dictRelease(ri->slaves);
1073 dictRelease(ri->sentinels);
1074 ri->slaves = dictCreate(&instancesDictType,NULL);
1075 ri->sentinels = dictCreate(&instancesDictType,NULL);
1076 if (ri->cc) sentinelKillLink(ri,ri->cc);
1077 if (ri->pc) sentinelKillLink(ri,ri->pc);
1078 ri->flags &= SRI_MASTER|SRI_CAN_FAILOVER|SRI_DISCONNECTED;
1079 if (ri->leader) {
1080 sdsfree(ri->leader);
1081 ri->leader = NULL;
1082 }
1083 ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
1084 ri->failover_state_change_time = 0;
1085 ri->failover_start_time = 0;
1086 ri->promoted_slave = NULL;
1087 sdsfree(ri->runid);
1088 sdsfree(ri->slave_master_host);
1089 ri->runid = NULL;
1090 ri->slave_master_host = NULL;
1091 ri->last_avail_time = mstime();
1092 ri->last_pong_time = mstime();
1093 if (flags & SENTINEL_GENERATE_EVENT)
1094 sentinelEvent(REDIS_WARNING,"+reset-master",ri,"%@");
1095 }
1096
1097 /* Call sentinelResetMaster() on every master with a name matching the specified
1098 * pattern. */
1099 int sentinelResetMastersByPattern(char *pattern, int flags) {
1100 dictIterator *di;
1101 dictEntry *de;
1102 int reset = 0;
1103
1104 di = dictGetIterator(sentinel.masters);
1105 while((de = dictNext(di)) != NULL) {
1106 sentinelRedisInstance *ri = dictGetVal(de);
1107
1108 if (ri->name) {
1109 if (stringmatch(pattern,ri->name,0)) {
1110 sentinelResetMaster(ri,flags);
1111 reset++;
1112 }
1113 }
1114 }
1115 dictReleaseIterator(di);
1116 return reset;
1117 }
1118
1119 /* Reset the specified master with sentinelResetMaster(), and also change
1120 * the ip:port address, but take the name of the instance unmodified.
1121 *
1122 * This is used to handle the +switch-master and +redirect-to-master events.
1123 *
1124 * The function returns REDIS_ERR if the address can't be resolved for some
1125 * reason. Otherwise REDIS_OK is returned.
1126 *
1127 * TODO: make this reset so that original sentinels are re-added with
1128 * same ip / port / runid.
1129 */
1130
1131 int sentinelResetMasterAndChangeAddress(sentinelRedisInstance *master, char *ip, int port) {
1132 sentinelAddr *oldaddr, *newaddr;
1133
1134 newaddr = createSentinelAddr(ip,port);
1135 if (newaddr == NULL) return REDIS_ERR;
1136 sentinelResetMaster(master,SENTINEL_NO_FLAGS);
1137 oldaddr = master->addr;
1138 master->addr = newaddr;
1139 /* Release the old address at the end so we are safe even if the function
1140 * gets the master->addr->ip and master->addr->port as arguments. */
1141 releaseSentinelAddr(oldaddr);
1142 return REDIS_OK;
1143 }
1144
1145 /* ============================ Config handling ============================= */
1146 char *sentinelHandleConfiguration(char **argv, int argc) {
1147 sentinelRedisInstance *ri;
1148
1149 if (!strcasecmp(argv[0],"monitor") && argc == 5) {
1150 /* monitor <name> <host> <port> <quorum> */
1151 int quorum = atoi(argv[4]);
1152
1153 if (quorum <= 0) return "Quorum must be 1 or greater.";
1154 if (createSentinelRedisInstance(argv[1],SRI_MASTER,argv[2],
1155 atoi(argv[3]),quorum,NULL) == NULL)
1156 {
1157 switch(errno) {
1158 case EBUSY: return "Duplicated master name.";
1159 case ENOENT: return "Can't resolve master instance hostname.";
1160 case EINVAL: return "Invalid port number";
1161 }
1162 }
1163 } else if (!strcasecmp(argv[0],"down-after-milliseconds") && argc == 3) {
1164 /* down-after-milliseconds <name> <milliseconds> */
1165 ri = sentinelGetMasterByName(argv[1]);
1166 if (!ri) return "No such master with specified name.";
1167 ri->down_after_period = atoi(argv[2]);
1168 if (ri->down_after_period <= 0)
1169 return "negative or zero time parameter.";
1170 } else if (!strcasecmp(argv[0],"failover-timeout") && argc == 3) {
1171 /* failover-timeout <name> <milliseconds> */
1172 ri = sentinelGetMasterByName(argv[1]);
1173 if (!ri) return "No such master with specified name.";
1174 ri->failover_timeout = atoi(argv[2]);
1175 if (ri->failover_timeout <= 0)
1176 return "negative or zero time parameter.";
1177 } else if (!strcasecmp(argv[0],"can-failover") && argc == 3) {
1178 /* can-failover <name> <yes/no> */
1179 int yesno = yesnotoi(argv[2]);
1180
1181 ri = sentinelGetMasterByName(argv[1]);
1182 if (!ri) return "No such master with specified name.";
1183 if (yesno == -1) return "Argument must be either yes or no.";
1184 if (yesno)
1185 ri->flags |= SRI_CAN_FAILOVER;
1186 else
1187 ri->flags &= ~SRI_CAN_FAILOVER;
1188 } else if (!strcasecmp(argv[0],"parallel-syncs") && argc == 3) {
1189 /* parallel-syncs <name> <milliseconds> */
1190 ri = sentinelGetMasterByName(argv[1]);
1191 if (!ri) return "No such master with specified name.";
1192 ri->parallel_syncs = atoi(argv[2]);
1193 } else if (!strcasecmp(argv[0],"notification-script") && argc == 3) {
1194 /* notification-script <name> <path> */
1195 ri = sentinelGetMasterByName(argv[1]);
1196 if (!ri) return "No such master with specified name.";
1197 if (access(argv[2],X_OK) == -1)
1198 return "Notification script seems non existing or non executable.";
1199 ri->notification_script = sdsnew(argv[2]);
1200 } else if (!strcasecmp(argv[0],"client-reconfig-script") && argc == 3) {
1201 /* client-reconfig-script <name> <path> */
1202 ri = sentinelGetMasterByName(argv[1]);
1203 if (!ri) return "No such master with specified name.";
1204 if (access(argv[2],X_OK) == -1)
1205 return "Client reconfiguration script seems non existing or "
1206 "non executable.";
1207 ri->client_reconfig_script = sdsnew(argv[2]);
1208 } else {
1209 return "Unrecognized sentinel configuration statement.";
1210 }
1211 return NULL;
1212 }
1213
1214 /* ====================== hiredis connection handling ======================= */
1215
1216 /* Completely disconnect an hiredis link from an instance. */
1217 void sentinelKillLink(sentinelRedisInstance *ri, redisAsyncContext *c) {
1218 if (ri->cc == c) {
1219 ri->cc = NULL;
1220 ri->pending_commands = 0;
1221 }
1222 if (ri->pc == c) ri->pc = NULL;
1223 c->data = NULL;
1224 ri->flags |= SRI_DISCONNECTED;
1225 redisAsyncFree(c);
1226 }
1227
1228 /* This function takes an hiredis context that is in an error condition
1229 * and make sure to mark the instance as disconnected performing the
1230 * cleanup needed.
1231 *
1232 * Note: we don't free the hiredis context as hiredis will do it for us
1233 * for async conenctions. */
1234 void sentinelDisconnectInstanceFromContext(const redisAsyncContext *c) {
1235 sentinelRedisInstance *ri = c->data;
1236 int pubsub;
1237
1238 if (ri == NULL) return; /* The instance no longer exists. */
1239
1240 pubsub = (ri->pc == c);
1241 sentinelEvent(REDIS_DEBUG, pubsub ? "-pubsub-link" : "-cmd-link", ri,
1242 "%@ #%s", c->errstr);
1243 if (pubsub)
1244 ri->pc = NULL;
1245 else
1246 ri->cc = NULL;
1247 ri->flags |= SRI_DISCONNECTED;
1248 }
1249
1250 void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status) {
1251 if (status != REDIS_OK) {
1252 sentinelDisconnectInstanceFromContext(c);
1253 } else {
1254 sentinelRedisInstance *ri = c->data;
1255 int pubsub = (ri->pc == c);
1256
1257 sentinelEvent(REDIS_DEBUG, pubsub ? "+pubsub-link" : "+cmd-link", ri,
1258 "%@");
1259 }
1260 }
1261
1262 void sentinelDisconnectCallback(const redisAsyncContext *c, int status) {
1263 sentinelDisconnectInstanceFromContext(c);
1264 }
1265
1266 /* Create the async connections for the specified instance if the instance
1267 * is disconnected. Note that the SRI_DISCONNECTED flag is set even if just
1268 * one of the two links (commands and pub/sub) is missing. */
1269 void sentinelReconnectInstance(sentinelRedisInstance *ri) {
1270 if (!(ri->flags & SRI_DISCONNECTED)) return;
1271
1272 /* Commands connection. */
1273 if (ri->cc == NULL) {
1274 ri->cc = redisAsyncConnect(ri->addr->ip,ri->addr->port);
1275 if (ri->cc->err) {
1276 sentinelEvent(REDIS_DEBUG,"-cmd-link-reconnection",ri,"%@ #%s",
1277 ri->cc->errstr);
1278 sentinelKillLink(ri,ri->cc);
1279 } else {
1280 ri->cc_conn_time = mstime();
1281 ri->cc->data = ri;
1282 redisAeAttach(server.el,ri->cc);
1283 redisAsyncSetConnectCallback(ri->cc,
1284 sentinelLinkEstablishedCallback);
1285 redisAsyncSetDisconnectCallback(ri->cc,
1286 sentinelDisconnectCallback);
1287 }
1288 }
1289 /* Pub / Sub */
1290 if ((ri->flags & SRI_MASTER) && ri->pc == NULL) {
1291 ri->pc = redisAsyncConnect(ri->addr->ip,ri->addr->port);
1292 if (ri->pc->err) {
1293 sentinelEvent(REDIS_DEBUG,"-pubsub-link-reconnection",ri,"%@ #%s",
1294 ri->pc->errstr);
1295 sentinelKillLink(ri,ri->pc);
1296 } else {
1297 int retval;
1298
1299 ri->pc_conn_time = mstime();
1300 ri->pc->data = ri;
1301 redisAeAttach(server.el,ri->pc);
1302 redisAsyncSetConnectCallback(ri->pc,
1303 sentinelLinkEstablishedCallback);
1304 redisAsyncSetDisconnectCallback(ri->pc,
1305 sentinelDisconnectCallback);
1306 /* Now we subscribe to the Sentinels "Hello" channel. */
1307 retval = redisAsyncCommand(ri->pc,
1308 sentinelReceiveHelloMessages, NULL, "SUBSCRIBE %s",
1309 SENTINEL_HELLO_CHANNEL);
1310 if (retval != REDIS_OK) {
1311 /* If we can't subscribe, the Pub/Sub connection is useless
1312 * and we can simply disconnect it and try again. */
1313 sentinelKillLink(ri,ri->pc);
1314 return;
1315 }
1316 }
1317 }
1318 /* Clear the DISCONNECTED flags only if we have both the connections
1319 * (or just the commands connection if this is a slave or a
1320 * sentinel instance). */
1321 if (ri->cc && (ri->flags & (SRI_SLAVE|SRI_SENTINEL) || ri->pc))
1322 ri->flags &= ~SRI_DISCONNECTED;
1323 }
1324
1325 /* ======================== Redis instances pinging ======================== */
1326
1327 /* Process the INFO output from masters. */
1328 void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
1329 sds *lines;
1330 int numlines, j;
1331 int role = 0;
1332 int runid_changed = 0; /* true if runid changed. */
1333 int first_runid = 0; /* true if this is the first runid we receive. */
1334
1335 /* The following fields must be reset to a given value in the case they
1336 * are not found at all in the INFO output. */
1337 ri->master_link_down_time = 0;
1338
1339 /* Process line by line. */
1340 lines = sdssplitlen(info,strlen(info),"\r\n",2,&numlines);
1341 for (j = 0; j < numlines; j++) {
1342 sentinelRedisInstance *slave;
1343 sds l = lines[j];
1344
1345 /* run_id:<40 hex chars>*/
1346 if (sdslen(l) >= 47 && !memcmp(l,"run_id:",7)) {
1347 if (ri->runid == NULL) {
1348 ri->runid = sdsnewlen(l+7,40);
1349 first_runid = 1;
1350 } else {
1351 if (strncmp(ri->runid,l+7,40) != 0) {
1352 runid_changed = 1;
1353 sentinelEvent(REDIS_NOTICE,"+reboot",ri,"%@");
1354 sdsfree(ri->runid);
1355 ri->runid = sdsnewlen(l+7,40);
1356 }
1357 }
1358 }
1359
1360 /* slave0:<ip>,<port>,<state> */
1361 if ((ri->flags & SRI_MASTER) &&
1362 sdslen(l) >= 7 &&
1363 !memcmp(l,"slave",5) && isdigit(l[5]))
1364 {
1365 char *ip, *port, *end;
1366
1367 ip = strchr(l,':'); if (!ip) continue;
1368 ip++; /* Now ip points to start of ip address. */
1369 port = strchr(ip,','); if (!port) continue;
1370 *port = '\0'; /* nul term for easy access. */
1371 port++; /* Now port points to start of port number. */
1372 end = strchr(port,','); if (!end) continue;
1373 *end = '\0'; /* nul term for easy access. */
1374
1375 /* Check if we already have this slave into our table,
1376 * otherwise add it. */
1377 if (sentinelRedisInstanceLookupSlave(ri,ip,atoi(port)) == NULL) {
1378 if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,ip,
1379 atoi(port), ri->quorum,ri)) != NULL)
1380 {
1381 sentinelEvent(REDIS_NOTICE,"+slave",slave,"%@");
1382 }
1383 }
1384 }
1385
1386 /* master_link_down_since_seconds:<seconds> */
1387 if (sdslen(l) >= 32 &&
1388 !memcmp(l,"master_link_down_since_seconds",30))
1389 {
1390 ri->master_link_down_time = strtoll(l+31,NULL,10)*1000;
1391 }
1392
1393 /* role:<role> */
1394 if (!memcmp(l,"role:master",11)) role = SRI_MASTER;
1395 else if (!memcmp(l,"role:slave",10)) role = SRI_SLAVE;
1396
1397 if (role == SRI_SLAVE) {
1398 /* master_host:<host> */
1399 if (sdslen(l) >= 12 && !memcmp(l,"master_host:",12)) {
1400 sdsfree(ri->slave_master_host);
1401 ri->slave_master_host = sdsnew(l+12);
1402 }
1403
1404 /* master_port:<port> */
1405 if (sdslen(l) >= 12 && !memcmp(l,"master_port:",12))
1406 ri->slave_master_port = atoi(l+12);
1407
1408 /* master_link_status:<status> */
1409 if (sdslen(l) >= 19 && !memcmp(l,"master_link_status:",19)) {
1410 ri->slave_master_link_status =
1411 (strcasecmp(l+19,"up") == 0) ?
1412 SENTINEL_MASTER_LINK_STATUS_UP :
1413 SENTINEL_MASTER_LINK_STATUS_DOWN;
1414 }
1415
1416 /* slave_priority:<priority> */
1417 if (sdslen(l) >= 15 && !memcmp(l,"slave_priority:",15))
1418 ri->slave_priority = atoi(l+15);
1419 }
1420 }
1421 ri->info_refresh = mstime();
1422 sdsfreesplitres(lines,numlines);
1423
1424 /* ---------------------------- Acting half ----------------------------- */
1425 if (sentinel.tilt) return;
1426
1427 /* Act if a master turned into a slave. */
1428 if ((ri->flags & SRI_MASTER) && role == SRI_SLAVE) {
1429 if ((first_runid || runid_changed) && ri->slave_master_host) {
1430 /* If it is the first time we receive INFO from it, but it's
1431 * a slave while it was configured as a master, we want to monitor
1432 * its master instead. */
1433 sentinelEvent(REDIS_WARNING,"+redirect-to-master",ri,
1434 "%s %s %d %s %d",
1435 ri->name, ri->addr->ip, ri->addr->port,
1436 ri->slave_master_host, ri->slave_master_port);
1437 sentinelResetMasterAndChangeAddress(ri,ri->slave_master_host,
1438 ri->slave_master_port);
1439 return;
1440 }
1441 }
1442
1443 /* Act if a slave turned into a master. */
1444 if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) {
1445 if (!(ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
1446 (runid_changed || first_runid))
1447 {
1448 /* If a slave turned into master but:
1449 *
1450 * 1) Failover not in progress.
1451 * 2) RunID hs changed, or its the first time we see an INFO output.
1452 *
1453 * We assume this is a reboot with a wrong configuration.
1454 * Log the event and remove the slave. */
1455 int retval;
1456
1457 sentinelEvent(REDIS_WARNING,"-slave-restart-as-master",ri,"%@ #removing it from the attached slaves");
1458 retval = dictDelete(ri->master->slaves,ri->name);
1459 redisAssert(retval == REDIS_OK);
1460 return;
1461 } else if (ri->flags & SRI_PROMOTED) {
1462 /* If this is a promoted slave we can change state to the
1463 * failover state machine. */
1464 if ((ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
1465 (ri->master->flags & SRI_I_AM_THE_LEADER) &&
1466 (ri->master->failover_state ==
1467 SENTINEL_FAILOVER_STATE_WAIT_PROMOTION))
1468 {
1469 ri->master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
1470 ri->master->failover_state_change_time = mstime();
1471 sentinelEvent(REDIS_WARNING,"+promoted-slave",ri,"%@");
1472 sentinelEvent(REDIS_WARNING,"+failover-state-reconf-slaves",
1473 ri->master,"%@");
1474 sentinelCallClientReconfScript(ri->master,SENTINEL_LEADER,
1475 "start",ri->master->addr,ri->addr);
1476 }
1477 } else if (!(ri->master->flags & SRI_FAILOVER_IN_PROGRESS) ||
1478 ((ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
1479 (ri->master->flags & SRI_I_AM_THE_LEADER) &&
1480 ri->master->failover_state ==
1481 SENTINEL_FAILOVER_STATE_WAIT_START))
1482 {
1483 /* No failover in progress? Then it is the start of a failover
1484 * and we are an observer.
1485 *
1486 * We also do that if we are a leader doing a failover, in wait
1487 * start, but well, somebody else started before us. */
1488
1489 if (ri->master->flags & SRI_FAILOVER_IN_PROGRESS) {
1490 sentinelEvent(REDIS_WARNING,"-failover-abort-race",
1491 ri->master, "%@");
1492 sentinelAbortFailover(ri->master);
1493 }
1494
1495 ri->master->flags |= SRI_FAILOVER_IN_PROGRESS;
1496 sentinelEvent(REDIS_WARNING,"+failover-detected",ri->master,"%@");
1497 ri->master->failover_state = SENTINEL_FAILOVER_STATE_DETECT_END;
1498 ri->master->failover_state_change_time = mstime();
1499 ri->master->promoted_slave = ri;
1500 ri->flags |= SRI_PROMOTED;
1501 sentinelCallClientReconfScript(ri->master,SENTINEL_OBSERVER,
1502 "start", ri->master->addr,ri->addr);
1503 /* We are an observer, so we can only assume that the leader
1504 * is reconfiguring the slave instances. For this reason we
1505 * set all the instances as RECONF_SENT waiting for progresses
1506 * on this side. */
1507 sentinelAddFlagsToDictOfRedisInstances(ri->master->slaves,
1508 SRI_RECONF_SENT);
1509 }
1510 }
1511
1512 /* Detect if the slave that is in the process of being reconfigured
1513 * changed state. */
1514 if ((ri->flags & SRI_SLAVE) && role == SRI_SLAVE &&
1515 (ri->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)))
1516 {
1517 /* SRI_RECONF_SENT -> SRI_RECONF_INPROG. */
1518 if ((ri->flags & SRI_RECONF_SENT) &&
1519 ri->slave_master_host &&
1520 strcmp(ri->slave_master_host,
1521 ri->master->promoted_slave->addr->ip) == 0 &&
1522 ri->slave_master_port == ri->master->promoted_slave->addr->port)
1523 {
1524 ri->flags &= ~SRI_RECONF_SENT;
1525 ri->flags |= SRI_RECONF_INPROG;
1526 sentinelEvent(REDIS_NOTICE,"+slave-reconf-inprog",ri,"%@");
1527 }
1528
1529 /* SRI_RECONF_INPROG -> SRI_RECONF_DONE */
1530 if ((ri->flags & SRI_RECONF_INPROG) &&
1531 ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP)
1532 {
1533 ri->flags &= ~SRI_RECONF_INPROG;
1534 ri->flags |= SRI_RECONF_DONE;
1535 sentinelEvent(REDIS_NOTICE,"+slave-reconf-done",ri,"%@");
1536 /* If we are moving forward (a new slave is now configured)
1537 * we update the change_time as we are conceptually passing
1538 * to the next slave. */
1539 ri->failover_state_change_time = mstime();
1540 }
1541 }
1542 }
1543
1544 void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
1545 sentinelRedisInstance *ri = c->data;
1546 redisReply *r;
1547
1548 if (ri) ri->pending_commands--;
1549 if (!reply || !ri) return;
1550 r = reply;
1551
1552 if (r->type == REDIS_REPLY_STRING) {
1553 sentinelRefreshInstanceInfo(ri,r->str);
1554 }
1555 }
1556
1557 /* Just discard the reply. We use this when we are not monitoring the return
1558 * value of the command but its effects directly. */
1559 void sentinelDiscardReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
1560 sentinelRedisInstance *ri = c->data;
1561
1562 if (ri) ri->pending_commands--;
1563 }
1564
1565 void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
1566 sentinelRedisInstance *ri = c->data;
1567 redisReply *r;
1568
1569 if (ri) ri->pending_commands--;
1570 if (!reply || !ri) return;
1571 r = reply;
1572
1573 if (r->type == REDIS_REPLY_STATUS ||
1574 r->type == REDIS_REPLY_ERROR) {
1575 /* Update the "instance available" field only if this is an
1576 * acceptable reply. */
1577 if (strncmp(r->str,"PONG",4) == 0 ||
1578 strncmp(r->str,"LOADING",7) == 0 ||
1579 strncmp(r->str,"MASTERDOWN",10) == 0)
1580 {
1581 ri->last_avail_time = mstime();
1582 } else {
1583 /* Send a SCRIPT KILL command if the instance appears to be
1584 * down because of a busy script. */
1585 if (strncmp(r->str,"BUSY",4) == 0 &&
1586 (ri->flags & SRI_S_DOWN) &&
1587 !(ri->flags & SRI_SCRIPT_KILL_SENT))
1588 {
1589 redisAsyncCommand(ri->cc,
1590 sentinelDiscardReplyCallback, NULL, "SCRIPT KILL");
1591 ri->flags |= SRI_SCRIPT_KILL_SENT;
1592 }
1593 }
1594 }
1595 ri->last_pong_time = mstime();
1596 }
1597
1598 /* This is called when we get the reply about the PUBLISH command we send
1599 * to the master to advertise this sentinel. */
1600 void sentinelPublishReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
1601 sentinelRedisInstance *ri = c->data;
1602 redisReply *r;
1603
1604 if (ri) ri->pending_commands--;
1605 if (!reply || !ri) return;
1606 r = reply;
1607
1608 /* Only update pub_time if we actually published our message. Otherwise
1609 * we'll retry against in 100 milliseconds. */
1610 if (r->type != REDIS_REPLY_ERROR)
1611 ri->last_pub_time = mstime();
1612 }
1613
1614 /* This is our Pub/Sub callback for the Hello channel. It's useful in order
1615 * to discover other sentinels attached at the same master. */
1616 void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata) {
1617 sentinelRedisInstance *ri = c->data;
1618 redisReply *r;
1619
1620 if (!reply || !ri) return;
1621 r = reply;
1622
1623 /* Update the last activity in the pubsub channel. Note that since we
1624 * receive our messages as well this timestamp can be used to detect
1625 * if the link is probably diconnected even if it seems otherwise. */
1626 ri->pc_last_activity = mstime();
1627
1628 /* Sanity check in the reply we expect, so that the code that follows
1629 * can avoid to check for details. */
1630 if (r->type != REDIS_REPLY_ARRAY ||
1631 r->elements != 3 ||
1632 r->element[0]->type != REDIS_REPLY_STRING ||
1633 r->element[1]->type != REDIS_REPLY_STRING ||
1634 r->element[2]->type != REDIS_REPLY_STRING ||
1635 strcmp(r->element[0]->str,"message") != 0) return;
1636
1637 /* We are not interested in meeting ourselves */
1638 if (strstr(r->element[2]->str,server.runid) != NULL) return;
1639
1640 {
1641 int numtokens, port, removed, canfailover;
1642 char **token = sdssplitlen(r->element[2]->str,
1643 r->element[2]->len,
1644 ":",1,&numtokens);
1645 sentinelRedisInstance *sentinel;
1646
1647 if (numtokens == 4) {
1648 /* First, try to see if we already have this sentinel. */
1649 port = atoi(token[1]);
1650 canfailover = atoi(token[3]);
1651 sentinel = getSentinelRedisInstanceByAddrAndRunID(
1652 ri->sentinels,token[0],port,token[2]);
1653
1654 if (!sentinel) {
1655 /* If not, remove all the sentinels that have the same runid
1656 * OR the same ip/port, because it's either a restart or a
1657 * network topology change. */
1658 removed = removeMatchingSentinelsFromMaster(ri,token[0],port,
1659 token[2]);
1660 if (removed) {
1661 sentinelEvent(REDIS_NOTICE,"-dup-sentinel",ri,
1662 "%@ #duplicate of %s:%d or %s",
1663 token[0],port,token[2]);
1664 }
1665
1666 /* Add the new sentinel. */
1667 sentinel = createSentinelRedisInstance(NULL,SRI_SENTINEL,
1668 token[0],port,ri->quorum,ri);
1669 if (sentinel) {
1670 sentinelEvent(REDIS_NOTICE,"+sentinel",sentinel,"%@");
1671 /* The runid is NULL after a new instance creation and
1672 * for Sentinels we don't have a later chance to fill it,
1673 * so do it now. */
1674 sentinel->runid = sdsnew(token[2]);
1675 }
1676 }
1677
1678 /* Update the state of the Sentinel. */
1679 if (sentinel) {
1680 sentinel->last_hello_time = mstime();
1681 if (canfailover)
1682 sentinel->flags |= SRI_CAN_FAILOVER;
1683 else
1684 sentinel->flags &= ~SRI_CAN_FAILOVER;
1685 }
1686 }
1687 sdsfreesplitres(token,numtokens);
1688 }
1689 }
1690
1691 void sentinelPingInstance(sentinelRedisInstance *ri) {
1692 mstime_t now = mstime();
1693 mstime_t info_period;
1694 int retval;
1695
1696 /* Return ASAP if we have already a PING or INFO already pending, or
1697 * in the case the instance is not properly connected. */
1698 if (ri->flags & SRI_DISCONNECTED) return;
1699
1700 /* For INFO, PING, PUBLISH that are not critical commands to send we
1701 * also have a limit of SENTINEL_MAX_PENDING_COMMANDS. We don't
1702 * want to use a lot of memory just because a link is not working
1703 * properly (note that anyway there is a redundant protection about this,
1704 * that is, the link will be disconnected and reconnected if a long
1705 * timeout condition is detected. */
1706 if (ri->pending_commands >= SENTINEL_MAX_PENDING_COMMANDS) return;
1707
1708 /* If this is a slave of a master in O_DOWN condition we start sending
1709 * it INFO every second, instead of the usual SENTINEL_INFO_PERIOD
1710 * period. In this state we want to closely monitor slaves in case they
1711 * are turned into masters by another Sentinel, or by the sysadmin. */
1712 if ((ri->flags & SRI_SLAVE) &&
1713 (ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) {
1714 info_period = 1000;
1715 } else {
1716 info_period = SENTINEL_INFO_PERIOD;
1717 }
1718
1719 if ((ri->flags & SRI_SENTINEL) == 0 &&
1720 (ri->info_refresh == 0 ||
1721 (now - ri->info_refresh) > info_period))
1722 {
1723 /* Send INFO to masters and slaves, not sentinels. */
1724 retval = redisAsyncCommand(ri->cc,
1725 sentinelInfoReplyCallback, NULL, "INFO");
1726 if (retval != REDIS_OK) return;
1727 ri->pending_commands++;
1728 } else if ((now - ri->last_pong_time) > SENTINEL_PING_PERIOD) {
1729 /* Send PING to all the three kinds of instances. */
1730 retval = redisAsyncCommand(ri->cc,
1731 sentinelPingReplyCallback, NULL, "PING");
1732 if (retval != REDIS_OK) return;
1733 ri->pending_commands++;
1734 } else if ((ri->flags & SRI_MASTER) &&
1735 (now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD)
1736 {
1737 /* PUBLISH hello messages only to masters. */
1738 struct sockaddr_in sa;
1739 socklen_t salen = sizeof(sa);
1740
1741 if (getsockname(ri->cc->c.fd,(struct sockaddr*)&sa,&salen) != -1) {
1742 char myaddr[128];
1743
1744 snprintf(myaddr,sizeof(myaddr),"%s:%d:%s:%d",
1745 inet_ntoa(sa.sin_addr), server.port, server.runid,
1746 (ri->flags & SRI_CAN_FAILOVER) != 0);
1747 retval = redisAsyncCommand(ri->cc,
1748 sentinelPublishReplyCallback, NULL, "PUBLISH %s %s",
1749 SENTINEL_HELLO_CHANNEL,myaddr);
1750 if (retval != REDIS_OK) return;
1751 ri->pending_commands++;
1752 }
1753 }
1754 }
1755
1756 /* =========================== SENTINEL command ============================= */
1757
1758 const char *sentinelFailoverStateStr(int state) {
1759 switch(state) {
1760 case SENTINEL_FAILOVER_STATE_NONE: return "none";
1761 case SENTINEL_FAILOVER_STATE_WAIT_START: return "wait_start";
1762 case SENTINEL_FAILOVER_STATE_SELECT_SLAVE: return "select_slave";
1763 case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE: return "send_slaveof_noone";
1764 case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION: return "wait_promotion";
1765 case SENTINEL_FAILOVER_STATE_RECONF_SLAVES: return "reconf_slaves";
1766 case SENTINEL_FAILOVER_STATE_ALERT_CLIENTS: return "alert_clients";
1767 case SENTINEL_FAILOVER_STATE_DETECT_END: return "detect_end";
1768 case SENTINEL_FAILOVER_STATE_UPDATE_CONFIG: return "update_config";
1769 default: return "unknown";
1770 }
1771 }
1772
1773 /* Redis instance to Redis protocol representation. */
1774 void addReplySentinelRedisInstance(redisClient *c, sentinelRedisInstance *ri) {
1775 char *flags = sdsempty();
1776 void *mbl;
1777 int fields = 0;
1778
1779 mbl = addDeferredMultiBulkLength(c);
1780
1781 addReplyBulkCString(c,"name");
1782 addReplyBulkCString(c,ri->name);
1783 fields++;
1784
1785 addReplyBulkCString(c,"ip");
1786 addReplyBulkCString(c,ri->addr->ip);
1787 fields++;
1788
1789 addReplyBulkCString(c,"port");
1790 addReplyBulkLongLong(c,ri->addr->port);
1791 fields++;
1792
1793 addReplyBulkCString(c,"runid");
1794 addReplyBulkCString(c,ri->runid ? ri->runid : "");
1795 fields++;
1796
1797 addReplyBulkCString(c,"flags");
1798 if (ri->flags & SRI_S_DOWN) flags = sdscat(flags,"s_down,");
1799 if (ri->flags & SRI_O_DOWN) flags = sdscat(flags,"o_down,");
1800 if (ri->flags & SRI_MASTER) flags = sdscat(flags,"master,");
1801 if (ri->flags & SRI_SLAVE) flags = sdscat(flags,"slave,");
1802 if (ri->flags & SRI_SENTINEL) flags = sdscat(flags,"sentinel,");
1803 if (ri->flags & SRI_DISCONNECTED) flags = sdscat(flags,"disconnected,");
1804 if (ri->flags & SRI_MASTER_DOWN) flags = sdscat(flags,"master_down,");
1805 if (ri->flags & SRI_FAILOVER_IN_PROGRESS)
1806 flags = sdscat(flags,"failover_in_progress,");
1807 if (ri->flags & SRI_I_AM_THE_LEADER)
1808 flags = sdscat(flags,"i_am_the_leader,");
1809 if (ri->flags & SRI_PROMOTED) flags = sdscat(flags,"promoted,");
1810 if (ri->flags & SRI_RECONF_SENT) flags = sdscat(flags,"reconf_sent,");
1811 if (ri->flags & SRI_RECONF_INPROG) flags = sdscat(flags,"reconf_inprog,");
1812 if (ri->flags & SRI_RECONF_DONE) flags = sdscat(flags,"reconf_done,");
1813
1814 if (sdslen(flags) != 0) flags = sdsrange(flags,0,-2); /* remove last "," */
1815 addReplyBulkCString(c,flags);
1816 sdsfree(flags);
1817 fields++;
1818
1819 addReplyBulkCString(c,"pending-commands");
1820 addReplyBulkLongLong(c,ri->pending_commands);
1821 fields++;
1822
1823 if (ri->flags & SRI_FAILOVER_IN_PROGRESS) {
1824 addReplyBulkCString(c,"failover-state");
1825 addReplyBulkCString(c,(char*)sentinelFailoverStateStr(ri->failover_state));
1826 fields++;
1827 }
1828
1829 addReplyBulkCString(c,"last-ok-ping-reply");
1830 addReplyBulkLongLong(c,mstime() - ri->last_avail_time);
1831 fields++;
1832
1833 addReplyBulkCString(c,"last-ping-reply");
1834 addReplyBulkLongLong(c,mstime() - ri->last_pong_time);
1835 fields++;
1836
1837 if (ri->flags & SRI_S_DOWN) {
1838 addReplyBulkCString(c,"s-down-time");
1839 addReplyBulkLongLong(c,mstime()-ri->s_down_since_time);
1840 fields++;
1841 }
1842
1843 if (ri->flags & SRI_O_DOWN) {
1844 addReplyBulkCString(c,"o-down-time");
1845 addReplyBulkLongLong(c,mstime()-ri->o_down_since_time);
1846 fields++;
1847 }
1848
1849 /* Masters and Slaves */
1850 if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
1851 addReplyBulkCString(c,"info-refresh");
1852 addReplyBulkLongLong(c,mstime() - ri->info_refresh);
1853 fields++;
1854 }
1855
1856 /* Only masters */
1857 if (ri->flags & SRI_MASTER) {
1858 addReplyBulkCString(c,"num-slaves");
1859 addReplyBulkLongLong(c,dictSize(ri->slaves));
1860 fields++;
1861
1862 addReplyBulkCString(c,"num-other-sentinels");
1863 addReplyBulkLongLong(c,dictSize(ri->sentinels));
1864 fields++;
1865
1866 addReplyBulkCString(c,"quorum");
1867 addReplyBulkLongLong(c,ri->quorum);
1868 fields++;
1869 }
1870
1871 /* Only slaves */
1872 if (ri->flags & SRI_SLAVE) {
1873 addReplyBulkCString(c,"master-link-down-time");
1874 addReplyBulkLongLong(c,ri->master_link_down_time);
1875 fields++;
1876
1877 addReplyBulkCString(c,"master-link-status");
1878 addReplyBulkCString(c,
1879 (ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP) ?
1880 "ok" : "err");
1881 fields++;
1882
1883 addReplyBulkCString(c,"master-host");
1884 addReplyBulkCString(c,
1885 ri->slave_master_host ? ri->slave_master_host : "?");
1886 fields++;
1887
1888 addReplyBulkCString(c,"master-port");
1889 addReplyBulkLongLong(c,ri->slave_master_port);
1890 fields++;
1891
1892 addReplyBulkCString(c,"slave-priority");
1893 addReplyBulkLongLong(c,ri->slave_priority);
1894 fields++;
1895 }
1896
1897 /* Only sentinels */
1898 if (ri->flags & SRI_SENTINEL) {
1899 addReplyBulkCString(c,"last-hello-message");
1900 addReplyBulkLongLong(c,mstime() - ri->last_hello_time);
1901 fields++;
1902
1903 addReplyBulkCString(c,"can-failover-its-master");
1904 addReplyBulkLongLong(c,(ri->flags & SRI_CAN_FAILOVER) != 0);
1905 fields++;
1906
1907 if (ri->flags & SRI_MASTER_DOWN) {
1908 addReplyBulkCString(c,"subjective-leader");
1909 addReplyBulkCString(c,ri->leader ? ri->leader : "?");
1910 fields++;
1911 }
1912 }
1913
1914 setDeferredMultiBulkLength(c,mbl,fields*2);
1915 }
1916
1917 /* Output a number of instances contanined inside a dictionary as
1918 * Redis protocol. */
1919 void addReplyDictOfRedisInstances(redisClient *c, dict *instances) {
1920 dictIterator *di;
1921 dictEntry *de;
1922
1923 di = dictGetIterator(instances);
1924 addReplyMultiBulkLen(c,dictSize(instances));
1925 while((de = dictNext(di)) != NULL) {
1926 sentinelRedisInstance *ri = dictGetVal(de);
1927
1928 addReplySentinelRedisInstance(c,ri);
1929 }
1930 dictReleaseIterator(di);
1931 }
1932
1933 /* Lookup the named master into sentinel.masters.
1934 * If the master is not found reply to the client with an error and returns
1935 * NULL. */
1936 sentinelRedisInstance *sentinelGetMasterByNameOrReplyError(redisClient *c,
1937 robj *name)
1938 {
1939 sentinelRedisInstance *ri;
1940
1941 ri = dictFetchValue(sentinel.masters,c->argv[2]->ptr);
1942 if (!ri) {
1943 addReplyError(c,"No such master with that name");
1944 return NULL;
1945 }
1946 return ri;
1947 }
1948
1949 void sentinelCommand(redisClient *c) {
1950 if (!strcasecmp(c->argv[1]->ptr,"masters")) {
1951 /* SENTINEL MASTERS */
1952 if (c->argc != 2) goto numargserr;
1953
1954 addReplyDictOfRedisInstances(c,sentinel.masters);
1955 } else if (!strcasecmp(c->argv[1]->ptr,"slaves")) {
1956 /* SENTINEL SLAVES <master-name> */
1957 sentinelRedisInstance *ri;
1958
1959 if (c->argc != 3) goto numargserr;
1960 if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
1961 return;
1962 addReplyDictOfRedisInstances(c,ri->slaves);
1963 } else if (!strcasecmp(c->argv[1]->ptr,"sentinels")) {
1964 /* SENTINEL SENTINELS <master-name> */
1965 sentinelRedisInstance *ri;
1966
1967 if (c->argc != 3) goto numargserr;
1968 if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
1969 return;
1970 addReplyDictOfRedisInstances(c,ri->sentinels);
1971 } else if (!strcasecmp(c->argv[1]->ptr,"is-master-down-by-addr")) {
1972 /* SENTINEL IS-MASTER-DOWN-BY-ADDR <ip> <port> */
1973 sentinelRedisInstance *ri;
1974 char *leader = NULL;
1975 long port;
1976 int isdown = 0;
1977
1978 if (c->argc != 4) goto numargserr;
1979 if (getLongFromObjectOrReply(c,c->argv[3],&port,NULL) != REDIS_OK)
1980 return;
1981 ri = getSentinelRedisInstanceByAddrAndRunID(sentinel.masters,
1982 c->argv[2]->ptr,port,NULL);
1983
1984 /* It exists? Is actually a master? Is subjectively down? It's down.
1985 * Note: if we are in tilt mode we always reply with "0". */
1986 if (!sentinel.tilt && ri && (ri->flags & SRI_S_DOWN) &&
1987 (ri->flags & SRI_MASTER))
1988 isdown = 1;
1989 if (ri) leader = sentinelGetSubjectiveLeader(ri);
1990
1991 /* Reply with a two-elements multi-bulk reply: down state, leader. */
1992 addReplyMultiBulkLen(c,2);
1993 addReply(c, isdown ? shared.cone : shared.czero);
1994 addReplyBulkCString(c, leader ? leader : "?");
1995 if (leader) sdsfree(leader);
1996 } else if (!strcasecmp(c->argv[1]->ptr,"reset")) {
1997 /* SENTINEL RESET <pattern> */
1998 if (c->argc != 3) goto numargserr;
1999 addReplyLongLong(c,sentinelResetMastersByPattern(c->argv[2]->ptr,SENTINEL_GENERATE_EVENT));
2000 } else if (!strcasecmp(c->argv[1]->ptr,"get-master-addr-by-name")) {
2001 /* SENTINEL GET-MASTER-ADDR-BY-NAME <master-name> */
2002 sentinelRedisInstance *ri;
2003
2004 if (c->argc != 3) goto numargserr;
2005 ri = sentinelGetMasterByName(c->argv[2]->ptr);
2006 if (ri == NULL) {
2007 addReply(c,shared.nullmultibulk);
2008 } else if (ri->info_refresh == 0) {
2009 addReplySds(c,sdsnew("-IDONTKNOW I have not enough information to reply. Please ask another Sentinel.\r\n"));
2010 } else {
2011 sentinelAddr *addr = ri->addr;
2012
2013 if ((ri->flags & SRI_FAILOVER_IN_PROGRESS) && ri->promoted_slave)
2014 addr = ri->promoted_slave->addr;
2015 addReplyMultiBulkLen(c,2);
2016 addReplyBulkCString(c,addr->ip);
2017 addReplyBulkLongLong(c,addr->port);
2018 }
2019 } else if (!strcasecmp(c->argv[1]->ptr,"failover")) {
2020 /* SENTINEL FAILOVER <master-name> */
2021 sentinelRedisInstance *ri;
2022
2023 if (c->argc != 3) goto numargserr;
2024 if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
2025 return;
2026 if (ri->flags & SRI_FAILOVER_IN_PROGRESS) {
2027 addReplySds(c,sdsnew("-INPROG Failover already in progress\r\n"));
2028 return;
2029 }
2030 if (sentinelSelectSlave(ri) == NULL) {
2031 addReplySds(c,sdsnew("-NOGOODSLAVE No suitable slave to promote\r\n"));
2032 return;
2033 }
2034 sentinelStartFailover(ri,SENTINEL_FAILOVER_STATE_WAIT_START);
2035 ri->flags |= SRI_FORCE_FAILOVER;
2036 addReply(c,shared.ok);
2037 } else if (!strcasecmp(c->argv[1]->ptr,"pending-scripts")) {
2038 /* SENTINEL PENDING-SCRIPTS */
2039
2040 if (c->argc != 2) goto numargserr;
2041 sentinelPendingScriptsCommand(c);
2042 } else {
2043 addReplyErrorFormat(c,"Unknown sentinel subcommand '%s'",
2044 (char*)c->argv[1]->ptr);
2045 }
2046 return;
2047
2048 numargserr:
2049 addReplyErrorFormat(c,"Wrong number of commands for 'sentinel %s'",
2050 (char*)c->argv[1]->ptr);
2051 }
2052
2053 void sentinelInfoCommand(redisClient *c) {
2054 char *section = c->argc == 2 ? c->argv[1]->ptr : "default";
2055 sds info = sdsempty();
2056 int defsections = !strcasecmp(section,"default");
2057 int sections = 0;
2058
2059 if (c->argc > 2) {
2060 addReply(c,shared.syntaxerr);
2061 return;
2062 }
2063
2064 if (!strcasecmp(section,"server") || defsections) {
2065 if (sections++) info = sdscat(info,"\r\n");
2066 sds serversection = genRedisInfoString("server");
2067 info = sdscatlen(info,serversection,sdslen(serversection));
2068 sdsfree(serversection);
2069 }
2070
2071 if (!strcasecmp(section,"sentinel") || defsections) {
2072 dictIterator *di;
2073 dictEntry *de;
2074 int master_id = 0;
2075
2076 if (sections++) info = sdscat(info,"\r\n");
2077 info = sdscatprintf(info,
2078 "# Sentinel\r\n"
2079 "sentinel_masters:%lu\r\n"
2080 "sentinel_tilt:%d\r\n"
2081 "sentinel_running_scripts:%d\r\n"
2082 "sentinel_scripts_queue_length:%ld\r\n",
2083 dictSize(sentinel.masters),
2084 sentinel.tilt,
2085 sentinel.running_scripts,
2086 listLength(sentinel.scripts_queue));
2087
2088 di = dictGetIterator(sentinel.masters);
2089 while((de = dictNext(di)) != NULL) {
2090 sentinelRedisInstance *ri = dictGetVal(de);
2091 char *status = "ok";
2092
2093 if (ri->flags & SRI_O_DOWN) status = "odown";
2094 else if (ri->flags & SRI_S_DOWN) status = "sdown";
2095 info = sdscatprintf(info,
2096 "master%d:name=%s,status=%s,address=%s:%d,"
2097 "slaves=%lu,sentinels=%lu\r\n",
2098 master_id++, ri->name, status,
2099 ri->addr->ip, ri->addr->port,
2100 dictSize(ri->slaves),
2101 dictSize(ri->sentinels)+1);
2102 }
2103 dictReleaseIterator(di);
2104 }
2105
2106 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
2107 (unsigned long)sdslen(info)));
2108 addReplySds(c,info);
2109 addReply(c,shared.crlf);
2110 }
2111
2112 /* ===================== SENTINEL availability checks ======================= */
2113
2114 /* Is this instance down from our point of view? */
2115 void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
2116 mstime_t elapsed = mstime() - ri->last_avail_time;
2117
2118 /* Check if we are in need for a reconnection of one of the
2119 * links, because we are detecting low activity.
2120 *
2121 * 1) Check if the command link seems connected, was connected not less
2122 * than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have an
2123 * idle time that is greater than down_after_period / 2 seconds. */
2124 if (ri->cc &&
2125 (mstime() - ri->cc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
2126 (mstime() - ri->last_pong_time) > (ri->down_after_period/2))
2127 {
2128 sentinelKillLink(ri,ri->cc);
2129 }
2130
2131 /* 2) Check if the pubsub link seems connected, was connected not less
2132 * than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have no
2133 * activity in the Pub/Sub channel for more than
2134 * SENTINEL_PUBLISH_PERIOD * 3.
2135 */
2136 if (ri->pc &&
2137 (mstime() - ri->pc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
2138 (mstime() - ri->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3))
2139 {
2140 sentinelKillLink(ri,ri->pc);
2141 }
2142
2143 /* Update the subjectively down flag. */
2144 if (elapsed > ri->down_after_period) {
2145 /* Is subjectively down */
2146 if ((ri->flags & SRI_S_DOWN) == 0) {
2147 sentinelEvent(REDIS_WARNING,"+sdown",ri,"%@");
2148 ri->s_down_since_time = mstime();
2149 ri->flags |= SRI_S_DOWN;
2150 }
2151 } else {
2152 /* Is subjectively up */
2153 if (ri->flags & SRI_S_DOWN) {
2154 sentinelEvent(REDIS_WARNING,"-sdown",ri,"%@");
2155 ri->flags &= ~(SRI_S_DOWN|SRI_SCRIPT_KILL_SENT);
2156 }
2157 }
2158 }
2159
2160 /* Is this instance down accordingly to the configured quorum? */
2161 void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
2162 dictIterator *di;
2163 dictEntry *de;
2164 int quorum = 0, odown = 0;
2165
2166 if (master->flags & SRI_S_DOWN) {
2167 /* Is down for enough sentinels? */
2168 quorum = 1; /* the current sentinel. */
2169 /* Count all the other sentinels. */
2170 di = dictGetIterator(master->sentinels);
2171 while((de = dictNext(di)) != NULL) {
2172 sentinelRedisInstance *ri = dictGetVal(de);
2173
2174 if (ri->flags & SRI_MASTER_DOWN) quorum++;
2175 }
2176 dictReleaseIterator(di);
2177 if (quorum >= master->quorum) odown = 1;
2178 }
2179
2180 /* Set the flag accordingly to the outcome. */
2181 if (odown) {
2182 if ((master->flags & SRI_O_DOWN) == 0) {
2183 sentinelEvent(REDIS_WARNING,"+odown",master,"%@ #quorum %d/%d",
2184 quorum, master->quorum);
2185 master->flags |= SRI_O_DOWN;
2186 master->o_down_since_time = mstime();
2187 }
2188 } else {
2189 if (master->flags & SRI_O_DOWN) {
2190 sentinelEvent(REDIS_WARNING,"-odown",master,"%@");
2191 master->flags &= ~SRI_O_DOWN;
2192 }
2193 }
2194 }
2195
2196 /* Receive the SENTINEL is-master-down-by-addr reply, see the
2197 * sentinelAskMasterStateToOtherSentinels() function for more information. */
2198 void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) {
2199 sentinelRedisInstance *ri = c->data;
2200 redisReply *r;
2201
2202 if (ri) ri->pending_commands--;
2203 if (!reply || !ri) return;
2204 r = reply;
2205
2206 /* Ignore every error or unexpected reply.
2207 * Note that if the command returns an error for any reason we'll
2208 * end clearing the SRI_MASTER_DOWN flag for timeout anyway. */
2209 if (r->type == REDIS_REPLY_ARRAY && r->elements == 2 &&
2210 r->element[0]->type == REDIS_REPLY_INTEGER &&
2211 r->element[1]->type == REDIS_REPLY_STRING)
2212 {
2213 ri->last_master_down_reply_time = mstime();
2214 if (r->element[0]->integer == 1) {
2215 ri->flags |= SRI_MASTER_DOWN;
2216 } else {
2217 ri->flags &= ~SRI_MASTER_DOWN;
2218 }
2219 sdsfree(ri->leader);
2220 ri->leader = sdsnew(r->element[1]->str);
2221 }
2222 }
2223
2224 /* If we think (subjectively) the master is down, we start sending
2225 * SENTINEL IS-MASTER-DOWN-BY-ADDR requests to other sentinels
2226 * in order to get the replies that allow to reach the quorum and
2227 * possibly also mark the master as objectively down. */
2228 void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master) {
2229 dictIterator *di;
2230 dictEntry *de;
2231
2232 di = dictGetIterator(master->sentinels);
2233 while((de = dictNext(di)) != NULL) {
2234 sentinelRedisInstance *ri = dictGetVal(de);
2235 mstime_t elapsed = mstime() - ri->last_master_down_reply_time;
2236 char port[32];
2237 int retval;
2238
2239 /* If the master state from other sentinel is too old, we clear it. */
2240 if (elapsed > SENTINEL_INFO_VALIDITY_TIME) {
2241 ri->flags &= ~SRI_MASTER_DOWN;
2242 sdsfree(ri->leader);
2243 ri->leader = NULL;
2244 }
2245
2246 /* Only ask if master is down to other sentinels if:
2247 *
2248 * 1) We believe it is down, or there is a failover in progress.
2249 * 2) Sentinel is connected.
2250 * 3) We did not received the info within SENTINEL_ASK_PERIOD ms. */
2251 if ((master->flags & (SRI_S_DOWN|SRI_FAILOVER_IN_PROGRESS)) == 0)
2252 continue;
2253 if (ri->flags & SRI_DISCONNECTED) continue;
2254 if (mstime() - ri->last_master_down_reply_time < SENTINEL_ASK_PERIOD)
2255 continue;
2256
2257 /* Ask */
2258 ll2string(port,sizeof(port),master->addr->port);
2259 retval = redisAsyncCommand(ri->cc,
2260 sentinelReceiveIsMasterDownReply, NULL,
2261 "SENTINEL is-master-down-by-addr %s %s",
2262 master->addr->ip, port);
2263 if (retval == REDIS_OK) ri->pending_commands++;
2264 }
2265 dictReleaseIterator(di);
2266 }
2267
2268 /* =============================== FAILOVER ================================= */
2269
2270 /* Given a master get the "subjective leader", that is, among all the sentinels
2271 * with given characteristics, the one with the lexicographically smaller
2272 * runid. The characteristics required are:
2273 *
2274 * 1) Has SRI_CAN_FAILOVER flag.
2275 * 2) Is not disconnected.
2276 * 3) Recently answered to our ping (no longer than
2277 * SENTINEL_INFO_VALIDITY_TIME milliseconds ago).
2278 *
2279 * The function returns a pointer to an sds string representing the runid of the
2280 * leader sentinel instance (from our point of view). Otherwise NULL is
2281 * returned if there are no suitable sentinels.
2282 */
2283
/* qsort() comparison callback for arrays of C strings: 'a' and 'b' point
 * to the array elements, which are compared ignoring case. */
int compareRunID(const void *a, const void *b) {
    const char *left = *(const char * const *)a;
    const char *right = *(const char * const *)b;

    return strcasecmp(left, right);
}
2288
2289 char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master) {
2290 dictIterator *di;
2291 dictEntry *de;
2292 char **instance =
2293 zmalloc(sizeof(char*)*(dictSize(master->sentinels)+1));
2294 int instances = 0;
2295 char *leader = NULL;
2296
2297 if (master->flags & SRI_CAN_FAILOVER) {
2298 /* Add myself if I'm a Sentinel that can failover this master. */
2299 instance[instances++] = server.runid;
2300 }
2301
2302 di = dictGetIterator(master->sentinels);
2303 while((de = dictNext(di)) != NULL) {
2304 sentinelRedisInstance *ri = dictGetVal(de);
2305 mstime_t lag = mstime() - ri->last_avail_time;
2306
2307 if (lag > SENTINEL_INFO_VALIDITY_TIME ||
2308 !(ri->flags & SRI_CAN_FAILOVER) ||
2309 (ri->flags & SRI_DISCONNECTED) ||
2310 ri->runid == NULL)
2311 continue;
2312 instance[instances++] = ri->runid;
2313 }
2314 dictReleaseIterator(di);
2315
2316 /* If we have at least one instance passing our checks, order the array
2317 * by runid. */
2318 if (instances) {
2319 qsort(instance,instances,sizeof(char*),compareRunID);
2320 leader = sdsnew(instance[0]);
2321 }
2322 zfree(instance);
2323 return leader;
2324 }
2325
/* A runid together with a count of votes. */
struct sentinelLeader {
    char *runid;            /* Run id string. */
    unsigned long votes;    /* Votes counter. */
};
2330
2331 /* Helper function for sentinelGetObjectiveLeader, increment the counter
2332 * relative to the specified runid. */
2333 void sentinelObjectiveLeaderIncr(dict *counters, char *runid) {
2334 dictEntry *de = dictFind(counters,runid);
2335 uint64_t oldval;
2336
2337 if (de) {
2338 oldval = dictGetUnsignedIntegerVal(de);
2339 dictSetUnsignedIntegerVal(de,oldval+1);
2340 } else {
2341 de = dictAddRaw(counters,runid);
2342 redisAssert(de != NULL);
2343 dictSetUnsignedIntegerVal(de,1);
2344 }
2345 }
2346
2347 /* Scan all the Sentinels attached to this master to check what is the
2348 * most voted leader among Sentinels. */
2349 char *sentinelGetObjectiveLeader(sentinelRedisInstance *master) {
2350 dict *counters;
2351 dictIterator *di;
2352 dictEntry *de;
2353 unsigned int voters = 0, voters_quorum;
2354 char *myvote;
2355 char *winner = NULL;
2356
2357 redisAssert(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS));
2358 counters = dictCreate(&leaderVotesDictType,NULL);
2359
2360 /* Count my vote. */
2361 myvote = sentinelGetSubjectiveLeader(master);
2362 if (myvote) {
2363 sentinelObjectiveLeaderIncr(counters,myvote);
2364 voters++;
2365 }
2366
2367 /* Count other sentinels votes */
2368 di = dictGetIterator(master->sentinels);
2369 while((de = dictNext(di)) != NULL) {
2370 sentinelRedisInstance *ri = dictGetVal(de);
2371 if (ri->leader == NULL) continue;
2372 /* If the failover is not already in progress we are only interested
2373 * in Sentinels that believe the master is down. Otherwise the leader
2374 * selection is useful for the "failover-takedown" when the original
2375 * leader fails. In that case we consider all the voters. */
2376 if (!(master->flags & SRI_FAILOVER_IN_PROGRESS) &&
2377 !(ri->flags & SRI_MASTER_DOWN)) continue;
2378 sentinelObjectiveLeaderIncr(counters,ri->leader);
2379 voters++;
2380 }
2381 dictReleaseIterator(di);
2382 voters_quorum = voters/2+1;
2383
2384 /* Check what's the winner. For the winner to win, it needs two conditions:
2385 * 1) Absolute majority between voters (50% + 1).
2386 * 2) And anyway at least master->quorum votes. */
2387 {
2388 uint64_t max_votes = 0; /* Max votes so far. */
2389
2390 di = dictGetIterator(counters);
2391 while((de = dictNext(di)) != NULL) {
2392 uint64_t votes = dictGetUnsignedIntegerVal(de);
2393
2394 if (max_votes < votes) {
2395 max_votes = votes;
2396 winner = dictGetKey(de);
2397 }
2398 }
2399 dictReleaseIterator(di);
2400 if (winner && (max_votes < voters_quorum || max_votes < master->quorum))
2401 winner = NULL;
2402 }
2403 winner = winner ? sdsnew(winner) : NULL;
2404 sdsfree(myvote);
2405 dictRelease(counters);
2406 return winner;
2407 }
2408
2409 /* Setup the master state to start a failover as a leader.
2410 *
2411 * State can be either:
2412 *
2413 * SENTINEL_FAILOVER_STATE_WAIT_START: starts a failover from scratch.
2414 * SENTINEL_FAILOVER_STATE_RECONF_SLAVES: takedown a failed failover.
2415 */
2416 void sentinelStartFailover(sentinelRedisInstance *master, int state) {
2417 redisAssert(master->flags & SRI_MASTER);
2418 redisAssert(state == SENTINEL_FAILOVER_STATE_WAIT_START ||
2419 state == SENTINEL_FAILOVER_STATE_RECONF_SLAVES);
2420
2421 master->failover_state = state;
2422 master->flags |= SRI_FAILOVER_IN_PROGRESS|SRI_I_AM_THE_LEADER;
2423 sentinelEvent(REDIS_WARNING,"+failover-triggered",master,"%@");
2424
2425 /* Pick a random delay if it's a fresh failover (WAIT_START), and not
2426 * a recovery of a failover started by another sentinel. */
2427 if (master->failover_state == SENTINEL_FAILOVER_STATE_WAIT_START) {
2428 master->failover_start_time = mstime() +
2429 SENTINEL_FAILOVER_FIXED_DELAY +
2430 (rand() % SENTINEL_FAILOVER_MAX_RANDOM_DELAY);
2431 sentinelEvent(REDIS_WARNING,"+failover-state-wait-start",master,
2432 "%@ #starting in %lld milliseconds",
2433 master->failover_start_time-mstime());
2434 }
2435 master->failover_state_change_time = mstime();
2436 }
2437
2438 /* This function checks if there are the conditions to start the failover,
2439 * that is:
2440 *
2441 * 1) Enough time has passed since O_DOWN.
2442 * 2) The master is marked as SRI_CAN_FAILOVER, so we can failover it.
2443 * 3) We are the objectively leader for this master.
2444 *
2445 * If the conditions are met we flag the master as SRI_FAILOVER_IN_PROGRESS
2446 * and SRI_I_AM_THE_LEADER.
2447 */
2448 void sentinelStartFailoverIfNeeded(sentinelRedisInstance *master) {
2449 char *leader;
2450 int isleader;
2451
2452 /* We can't failover if the master is not in O_DOWN state or if
2453 * there is not already a failover in progress (to perform the
2454 * takedown if the leader died) or if this Sentinel is not allowed
2455 * to start a failover. */
2456 if (!(master->flags & SRI_CAN_FAILOVER) ||
2457 !(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) return;
2458
2459 leader = sentinelGetObjectiveLeader(master);
2460 isleader = leader && strcasecmp(leader,server.runid) == 0;
2461 sdsfree(leader);
2462
2463 /* If I'm not the leader, I can't failover for sure. */
2464 if (!isleader) return;
2465
2466 /* If the failover is already in progress there are two options... */
2467 if (master->flags & SRI_FAILOVER_IN_PROGRESS) {
2468 if (master->flags & SRI_I_AM_THE_LEADER) {
2469 /* 1) I'm flagged as leader so I already started the failover.
2470 * Just return. */
2471 return;
2472 } else {
2473 mstime_t elapsed = mstime() - master->failover_state_change_time;
2474
2475 /* 2) I'm the new leader, but I'm not flagged as leader in the
2476 * master: I did not started the failover, but the original
2477 * leader has no longer the leadership.
2478 *
2479 * In this case if the failover appears to be lagging
2480 * for at least 25% of the configured failover timeout,
2481 * I can assume I can take control. Otherwise
2482 * it's better to return and wait more. */
2483 if (elapsed < (master->failover_timeout/4)) return;
2484 sentinelEvent(REDIS_WARNING,"+failover-takedown",master,"%@");
2485 /* We have already an elected slave if we are in
2486 * FAILOVER_IN_PROGRESS state, that is, the slave that we
2487 * observed turning into a master. */
2488 sentinelStartFailover(master,SENTINEL_FAILOVER_STATE_RECONF_SLAVES);
2489 /* As an observer we flagged all the slaves as RECONF_SENT but
2490 * now we are in charge of actually sending the reconfiguration
2491 * command so let's clear this flag for all the instances. */
2492 sentinelDelFlagsToDictOfRedisInstances(master->slaves,
2493 SRI_RECONF_SENT);
2494 }
2495 } else {
2496 /* Brand new failover as SRI_FAILOVER_IN_PROGRESS was not set.
2497 *
2498 * Do we have a slave to promote? Otherwise don't start a failover
2499 * at all. */
2500 if (sentinelSelectSlave(master) == NULL) return;
2501 sentinelStartFailover(master,SENTINEL_FAILOVER_STATE_WAIT_START);
2502 }
2503 }
2504
/* Select a suitable slave to promote. The current algorithm only uses
 * the following parameters:
 *
 * 1) None of the following conditions: S_DOWN, O_DOWN, DISCONNECTED.
 * 2) last_avail_time more recent than SENTINEL_INFO_VALIDITY_TIME.
 * 3) info_refresh more recent than SENTINEL_INFO_VALIDITY_TIME.
 * 4) master_link_down_time no more than:
 *    (now - master->s_down_since_time) + (master->down_after_period * 10).
 * 5) Slave priority can't be zero, otherwise the slave is discarded.
 *
 * Among all the slaves matching the above conditions we select the slave
 * with lower slave_priority. If priority is the same we select the slave
 * with lexicographically smaller runid.
 *
 * The function returns the pointer to the selected slave, otherwise
 * NULL if no suitable slave was found.
 */
2522
2523 int compareSlavesForPromotion(const void *a, const void *b) {
2524 sentinelRedisInstance **sa = (sentinelRedisInstance **)a,
2525 **sb = (sentinelRedisInstance **)b;
2526 char *sa_runid, *sb_runid;
2527
2528 if ((*sa)->slave_priority != (*sb)->slave_priority)
2529 return (*sa)->slave_priority - (*sb)->slave_priority;
2530
2531 /* If priority is the same, select the slave with that has the
2532 * lexicographically smaller runid. Note that we try to handle runid
2533 * == NULL as there are old Redis versions that don't publish runid in
2534 * INFO. A NULL runid is considered bigger than any other runid. */
2535 sa_runid = (*sa)->runid;
2536 sb_runid = (*sb)->runid;
2537 if (sa_runid == NULL && sb_runid == NULL) return 0;
2538 else if (sa_runid == NULL) return 1; /* a > b */
2539 else if (sb_runid == NULL) return -1; /* a < b */
2540 return strcasecmp(sa_runid, sb_runid);
2541 }
2542
2543 sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) {
2544 sentinelRedisInstance **instance =
2545 zmalloc(sizeof(instance[0])*dictSize(master->slaves));
2546 sentinelRedisInstance *selected = NULL;
2547 int instances = 0;
2548 dictIterator *di;
2549 dictEntry *de;
2550 mstime_t max_master_down_time = 0;
2551
2552 if (master->flags & SRI_S_DOWN)
2553 max_master_down_time += mstime() - master->s_down_since_time;
2554 max_master_down_time += master->down_after_period * 10;
2555
2556 di = dictGetIterator(master->slaves);
2557 while((de = dictNext(di)) != NULL) {
2558 sentinelRedisInstance *slave = dictGetVal(de);
2559 mstime_t info_validity_time = mstime()-SENTINEL_INFO_VALIDITY_TIME;
2560
2561 if (slave->flags & (SRI_S_DOWN|SRI_O_DOWN|SRI_DISCONNECTED)) continue;
2562 if (slave->last_avail_time < info_validity_time) continue;
2563 if (slave->slave_priority == 0) continue;
2564
2565 /* If the master is in SDOWN state we get INFO for slaves every second.
2566 * Otherwise we get it with the usual period so we need to account for
2567 * a larger delay. */
2568 if ((master->flags & SRI_S_DOWN) == 0)
2569 info_validity_time -= SENTINEL_INFO_PERIOD;
2570 if (slave->info_refresh < info_validity_time) continue;
2571 if (slave->master_link_down_time > max_master_down_time) continue;
2572 instance[instances++] = slave;
2573 }
2574 dictReleaseIterator(di);
2575 if (instances) {
2576 qsort(instance,instances,sizeof(sentinelRedisInstance*),
2577 compareSlavesForPromotion);
2578 selected = instance[0];
2579 }
2580 zfree(instance);
2581 return selected;
2582 }
2583
2584 /* ---------------- Failover state machine implementation ------------------- */
2585 void sentinelFailoverWaitStart(sentinelRedisInstance *ri) {
2586 /* If we in "wait start" but the master is no longer in ODOWN nor in
2587 * SDOWN condition we abort the failover. This is important as it
2588 * prevents a useless failover in a a notable case of netsplit, where
2589 * the senitnels are split from the redis instances. In this case
2590 * the failover will not start while there is the split because no
2591 * good slave can be reached. However when the split is resolved, we
2592 * can go to waitstart if the slave is back rechable a few milliseconds
2593 * before the master is. In that case when the master is back online
2594 * we cancel the failover. */
2595 if ((ri->flags & (SRI_S_DOWN|SRI_O_DOWN|SRI_FORCE_FAILOVER)) == 0) {
2596 sentinelEvent(REDIS_WARNING,"-failover-abort-master-is-back",
2597 ri,"%@");
2598 sentinelAbortFailover(ri);
2599 return;
2600 }
2601
2602 /* Start the failover going to the next state if enough time has
2603 * elapsed. */
2604 if (mstime() >= ri->failover_start_time) {
2605 ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
2606 ri->failover_state_change_time = mstime();
2607 sentinelEvent(REDIS_WARNING,"+failover-state-select-slave",ri,"%@");
2608 }
2609 }
2610
2611 void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) {
2612 sentinelRedisInstance *slave = sentinelSelectSlave(ri);
2613
2614 if (slave == NULL) {
2615 sentinelEvent(REDIS_WARNING,"-failover-abort-no-good-slave",ri,"%@");
2616 sentinelAbortFailover(ri);
2617 } else {
2618 sentinelEvent(REDIS_WARNING,"+selected-slave",slave,"%@");
2619 slave->flags |= SRI_PROMOTED;
2620 ri->promoted_slave = slave;
2621 ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE;
2622 ri->failover_state_change_time = mstime();
2623 sentinelEvent(REDIS_NOTICE,"+failover-state-send-slaveof-noone",
2624 slave, "%@");
2625 }
2626 }
2627
2628 void sentinelFailoverSendSlaveOfNoOne(sentinelRedisInstance *ri) {
2629 int retval;
2630
2631 if (ri->promoted_slave->flags & SRI_DISCONNECTED) return;
2632
2633 /* Send SLAVEOF NO ONE command to turn the slave into a master.
2634 * We actually register a generic callback for this command as we don't
2635 * really care about the reply. We check if it worked indirectly observing
2636 * if INFO returns a different role (master instead of slave). */
2637 retval = redisAsyncCommand(ri->promoted_slave->cc,
2638 sentinelDiscardReplyCallback, NULL, "SLAVEOF NO ONE");
2639 if (retval != REDIS_OK) return;
2640 ri->promoted_slave->pending_commands++;
2641 sentinelEvent(REDIS_NOTICE, "+failover-state-wait-promotion",
2642 ri->promoted_slave,"%@");
2643 ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION;
2644 ri->failover_state_change_time = mstime();
2645 }
2646
2647 /* We actually wait for promotion indirectly checking with INFO when the
2648 * slave turns into a master. */
2649 void sentinelFailoverWaitPromotion(sentinelRedisInstance *ri) {
2650 mstime_t elapsed = mstime() - ri->failover_state_change_time;
2651
2652 if (elapsed >= SENTINEL_PROMOTION_RETRY_PERIOD) {
2653 sentinelEvent(REDIS_WARNING,"-promotion-timeout",ri->promoted_slave,
2654 "%@");
2655 sentinelEvent(REDIS_WARNING,"+failover-state-select-slave",ri,"%@");
2656 ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
2657 ri->failover_state_change_time = mstime();
2658 ri->promoted_slave->flags &= ~SRI_PROMOTED;
2659 ri->promoted_slave = NULL;
2660 }
2661 }
2662
2663 void sentinelFailoverDetectEnd(sentinelRedisInstance *master) {
2664 int not_reconfigured = 0, timeout = 0;
2665 dictIterator *di;
2666 dictEntry *de;
2667 mstime_t elapsed = mstime() - master->failover_state_change_time;
2668
2669 /* We can't consider failover finished if the promoted slave is
2670 * not reachable. */
2671 if (master->promoted_slave == NULL ||
2672 master->promoted_slave->flags & SRI_S_DOWN) return;
2673
2674 /* The failover terminates once all the reachable slaves are properly
2675 * configured. */
2676 di = dictGetIterator(master->slaves);
2677 while((de = dictNext(di)) != NULL) {
2678 sentinelRedisInstance *slave = dictGetVal(de);
2679
2680 if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
2681 if (slave->flags & SRI_S_DOWN) continue;
2682 not_reconfigured++;
2683 }
2684 dictReleaseIterator(di);
2685
2686 /* Force end of failover on timeout. */
2687 if (elapsed > master->failover_timeout) {
2688 not_reconfigured = 0;
2689 timeout = 1;
2690 sentinelEvent(REDIS_WARNING,"+failover-end-for-timeout",master,"%@");
2691 }
2692
2693 if (not_reconfigured == 0) {
2694 int role = (master->flags & SRI_I_AM_THE_LEADER) ? SENTINEL_LEADER :
2695 SENTINEL_OBSERVER;
2696
2697 sentinelEvent(REDIS_WARNING,"+failover-end",master,"%@");
2698 master->failover_state = SENTINEL_FAILOVER_STATE_UPDATE_CONFIG;
2699 master->failover_state_change_time = mstime();
2700 sentinelCallClientReconfScript(master,role,"end",master->addr,
2701 master->promoted_slave->addr);
2702 }
2703
2704 /* If I'm the leader it is a good idea to send a best effort SLAVEOF
2705 * command to all the slaves still not reconfigured to replicate with
2706 * the new master. */
2707 if (timeout && (master->flags & SRI_I_AM_THE_LEADER)) {
2708 dictIterator *di;
2709 dictEntry *de;
2710 char master_port[32];
2711
2712 ll2string(master_port,sizeof(master_port),
2713 master->promoted_slave->addr->port);
2714
2715 di = dictGetIterator(master->slaves);
2716 while((de = dictNext(di)) != NULL) {
2717 sentinelRedisInstance *slave = dictGetVal(de);
2718 int retval;
2719
2720 if (slave->flags &
2721 (SRI_RECONF_DONE|SRI_RECONF_SENT|SRI_DISCONNECTED)) continue;
2722
2723 retval = redisAsyncCommand(slave->cc,
2724 sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
2725 master->promoted_slave->addr->ip,
2726 master_port);
2727 if (retval == REDIS_OK) {
2728 sentinelEvent(REDIS_NOTICE,"+slave-reconf-sent-be",slave,"%@");
2729 slave->flags |= SRI_RECONF_SENT;
2730 }
2731 }
2732 dictReleaseIterator(di);
2733 }
2734 }
2735
2736 /* Send SLAVE OF <new master address> to all the remaining slaves that
2737 * still don't appear to have the configuration updated. */
2738 void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) {
2739 dictIterator *di;
2740 dictEntry *de;
2741 int in_progress = 0;
2742
2743 di = dictGetIterator(master->slaves);
2744 while((de = dictNext(di)) != NULL) {
2745 sentinelRedisInstance *slave = dictGetVal(de);
2746
2747 if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG))
2748 in_progress++;
2749 }
2750 dictReleaseIterator(di);
2751
2752 di = dictGetIterator(master->slaves);
2753 while(in_progress < master->parallel_syncs &&
2754 (de = dictNext(di)) != NULL)
2755 {
2756 sentinelRedisInstance *slave = dictGetVal(de);
2757 int retval;
2758 char master_port[32];
2759
2760 /* Skip the promoted slave, and already configured slaves. */
2761 if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
2762
2763 /* Clear the SRI_RECONF_SENT flag if too much time elapsed without
2764 * the slave moving forward to the next state. */
2765 if ((slave->flags & SRI_RECONF_SENT) &&
2766 (mstime() - slave->slave_reconf_sent_time) >
2767 SENTINEL_SLAVE_RECONF_RETRY_PERIOD)
2768 {
2769 sentinelEvent(REDIS_NOTICE,"-slave-reconf-sent-timeout",slave,"%@");
2770 slave->flags &= ~SRI_RECONF_SENT;
2771 }
2772
2773 /* Nothing to do for instances that are disconnected or already
2774 * in RECONF_SENT state. */
2775 if (slave->flags & (SRI_DISCONNECTED|SRI_RECONF_SENT|SRI_RECONF_INPROG))
2776 continue;
2777
2778 /* Send SLAVEOF <new master>. */
2779 ll2string(master_port,sizeof(master_port),
2780 master->promoted_slave->addr->port);
2781 retval = redisAsyncCommand(slave->cc,
2782 sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
2783 master->promoted_slave->addr->ip,
2784 master_port);
2785 if (retval == REDIS_OK) {
2786 slave->flags |= SRI_RECONF_SENT;
2787 slave->pending_commands++;
2788 slave->slave_reconf_sent_time = mstime();
2789 sentinelEvent(REDIS_NOTICE,"+slave-reconf-sent",slave,"%@");
2790 in_progress++;
2791 }
2792 }
2793 dictReleaseIterator(di);
2794 sentinelFailoverDetectEnd(master);
2795 }
2796
2797 /* This function is called when the slave is in
2798 * SENTINEL_FAILOVER_STATE_UPDATE_CONFIG state. In this state we need
2799 * to remove it from the master table and add the promoted slave instead.
2800 *
2801 * If there are no promoted slaves as this instance is unique, we remove
2802 * and re-add it with the same address to trigger a complete state
2803 * refresh. */
2804 void sentinelFailoverSwitchToPromotedSlave(sentinelRedisInstance *master) {
2805 sentinelRedisInstance *ref = master->promoted_slave ?
2806 master->promoted_slave : master;
2807
2808 sentinelEvent(REDIS_WARNING,"+switch-master",master,"%s %s %d %s %d",
2809 master->name, master->addr->ip, master->addr->port,
2810 ref->addr->ip, ref->addr->port);
2811
2812 sentinelResetMasterAndChangeAddress(master,ref->addr->ip,ref->addr->port);
2813 }
2814
2815 void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
2816 redisAssert(ri->flags & SRI_MASTER);
2817
2818 if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;
2819
2820 switch(ri->failover_state) {
2821 case SENTINEL_FAILOVER_STATE_WAIT_START:
2822 sentinelFailoverWaitStart(ri);
2823 break;
2824 case SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
2825 sentinelFailoverSelectSlave(ri);
2826 break;
2827 case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:
2828 sentinelFailoverSendSlaveOfNoOne(ri);
2829 break;
2830 case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
2831 sentinelFailoverWaitPromotion(ri);
2832 break;
2833 case SENTINEL_FAILOVER_STATE_RECONF_SLAVES:
2834 sentinelFailoverReconfNextSlave(ri);
2835 break;
2836 case SENTINEL_FAILOVER_STATE_DETECT_END:
2837 sentinelFailoverDetectEnd(ri);
2838 break;
2839 }
2840 }
2841
2842 /* Abort a failover in progress with the following steps:
2843 * 1) If this instance is the leaer send a SLAVEOF command to all the already
2844 * reconfigured slaves if any to configure them to replicate with the
2845 * original master.
2846 * 2) For both leaders and observers: clear the failover flags and state in
2847 * the master instance.
2848 * 3) If there is already a promoted slave and we are the leader, and this
2849 * slave is not DISCONNECTED, try to reconfigure it to replicate
2850 * back to the master as well, sending a best effort SLAVEOF command.
2851 */
2852 void sentinelAbortFailover(sentinelRedisInstance *ri) {
2853 char master_port[32];
2854 dictIterator *di;
2855 dictEntry *de;
2856 int sentinel_role;
2857
2858 redisAssert(ri->flags & SRI_FAILOVER_IN_PROGRESS);
2859 ll2string(master_port,sizeof(master_port),ri->addr->port);
2860
2861 /* Clear failover related flags from slaves.
2862 * Also if we are the leader make sure to send SLAVEOF commands to all the
2863 * already reconfigured slaves in order to turn them back into slaves of
2864 * the original master. */
2865 di = dictGetIterator(ri->slaves);
2866 while((de = dictNext(di)) != NULL) {
2867 sentinelRedisInstance *slave = dictGetVal(de);
2868 if ((ri->flags & SRI_I_AM_THE_LEADER) &&
2869 !(slave->flags & SRI_DISCONNECTED) &&
2870 (slave->flags & (SRI_PROMOTED|SRI_RECONF_SENT|SRI_RECONF_INPROG|
2871 SRI_RECONF_DONE)))
2872 {
2873 int retval;
2874
2875 retval = redisAsyncCommand(slave->cc,
2876 sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
2877 ri->addr->ip,
2878 master_port);
2879 if (retval == REDIS_OK)
2880 sentinelEvent(REDIS_NOTICE,"-slave-reconf-undo",slave,"%@");
2881 }
2882 slave->flags &= ~(SRI_RECONF_SENT|SRI_RECONF_INPROG|SRI_RECONF_DONE);
2883 }
2884 dictReleaseIterator(di);
2885
2886 sentinel_role = (ri->flags & SRI_I_AM_THE_LEADER) ? SENTINEL_LEADER :
2887 SENTINEL_OBSERVER;
2888 ri->flags &= ~(SRI_FAILOVER_IN_PROGRESS|SRI_I_AM_THE_LEADER|SRI_FORCE_FAILOVER);
2889 ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
2890 ri->failover_state_change_time = mstime();
2891 if (ri->promoted_slave) {
2892 sentinelCallClientReconfScript(ri,sentinel_role,"abort",
2893 ri->promoted_slave->addr,ri->addr);
2894 ri->promoted_slave->flags &= ~SRI_PROMOTED;
2895 ri->promoted_slave = NULL;
2896 }
2897 }
2898
2899 /* The following is called only for master instances and will abort the
2900 * failover process if:
2901 *
2902 * 1) The failover is in progress.
2903 * 2) We already promoted a slave.
2904 * 3) The promoted slave is in extended SDOWN condition.
2905 */
2906 void sentinelAbortFailoverIfNeeded(sentinelRedisInstance *ri) {
2907 /* Failover is in progress? Do we have a promoted slave? */
2908 if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS) || !ri->promoted_slave) return;
2909
2910 /* Is the promoted slave into an extended SDOWN state? */
2911 if (!(ri->promoted_slave->flags & SRI_S_DOWN) ||
2912 (mstime() - ri->promoted_slave->s_down_since_time) <
2913 (ri->down_after_period * SENTINEL_EXTENDED_SDOWN_MULTIPLIER)) return;
2914
2915 sentinelEvent(REDIS_WARNING,"-failover-abort-x-sdown",ri->promoted_slave,"%@");
2916 sentinelAbortFailover(ri);
2917 }
2918
/* ======================== SENTINEL timer handler ==========================
 * This is the "main" of our Sentinel, Sentinel being completely non blocking
 * in design. The function is called every second.
 * -------------------------------------------------------------------------- */
2923
2924 /* Perform scheduled operations for the specified Redis instance. */
2925 void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
2926 /* ========== MONITORING HALF ============ */
2927 /* Every kind of instance */
2928 sentinelReconnectInstance(ri);
2929 sentinelPingInstance(ri);
2930
2931 /* Masters and slaves */
2932 if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
2933 /* Nothing so far. */
2934 }
2935
2936 /* Only masters */
2937 if (ri->flags & SRI_MASTER) {
2938 sentinelAskMasterStateToOtherSentinels(ri);
2939 }
2940
2941 /* ============== ACTING HALF ============= */
2942 /* We don't proceed with the acting half if we are in TILT mode.
2943 * TILT happens when we find something odd with the time, like a
2944 * sudden change in the clock. */
2945 if (sentinel.tilt) {
2946 if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return;
2947 sentinel.tilt = 0;
2948 sentinelEvent(REDIS_WARNING,"-tilt",NULL,"#tilt mode exited");
2949 }
2950
2951 /* Every kind of instance */
2952 sentinelCheckSubjectivelyDown(ri);
2953
2954 /* Masters and slaves */
2955 if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
2956 /* Nothing so far. */
2957 }
2958
2959 /* Only masters */
2960 if (ri->flags & SRI_MASTER) {
2961 sentinelCheckObjectivelyDown(ri);
2962 sentinelStartFailoverIfNeeded(ri);
2963 sentinelFailoverStateMachine(ri);
2964 sentinelAbortFailoverIfNeeded(ri);
2965 }
2966 }
2967
2968 /* Perform scheduled operations for all the instances in the dictionary.
2969 * Recursively call the function against dictionaries of slaves. */
2970 void sentinelHandleDictOfRedisInstances(dict *instances) {
2971 dictIterator *di;
2972 dictEntry *de;
2973 sentinelRedisInstance *switch_to_promoted = NULL;
2974
2975 /* There are a number of things we need to perform against every master. */
2976 di = dictGetIterator(instances);
2977 while((de = dictNext(di)) != NULL) {
2978 sentinelRedisInstance *ri = dictGetVal(de);
2979
2980 sentinelHandleRedisInstance(ri);
2981 if (ri->flags & SRI_MASTER) {
2982 sentinelHandleDictOfRedisInstances(ri->slaves);
2983 sentinelHandleDictOfRedisInstances(ri->sentinels);
2984 if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) {
2985 switch_to_promoted = ri;
2986 }
2987 }
2988 }
2989 if (switch_to_promoted)
2990 sentinelFailoverSwitchToPromotedSlave(switch_to_promoted);
2991 dictReleaseIterator(di);
2992 }
2993
2994 /* This function checks if we need to enter the TITL mode.
2995 *
2996 * The TILT mode is entered if we detect that between two invocations of the
2997 * timer interrupt, a negative amount of time, or too much time has passed.
2998 * Note that we expect that more or less just 100 milliseconds will pass
2999 * if everything is fine. However we'll see a negative number or a
3000 * difference bigger than SENTINEL_TILT_TRIGGER milliseconds if one of the
3001 * following conditions happen:
3002 *
3003 * 1) The Sentiel process for some time is blocked, for every kind of
3004 * random reason: the load is huge, the computer was freezed for some time
3005 * in I/O or alike, the process was stopped by a signal. Everything.
3006 * 2) The system clock was altered significantly.
3007 *
3008 * Under both this conditions we'll see everything as timed out and failing
3009 * without good reasons. Instead we enter the TILT mode and wait
3010 * for SENTIENL_TILT_PERIOD to elapse before starting to act again.
3011 *
3012 * During TILT time we still collect information, we just do not act. */
3013 void sentinelCheckTiltCondition(void) {
3014 mstime_t now = mstime();
3015 mstime_t delta = now - sentinel.previous_time;
3016
3017 if (delta < 0 || delta > SENTINEL_TILT_TRIGGER) {
3018 sentinel.tilt = 1;
3019 sentinel.tilt_start_time = mstime();
3020 sentinelEvent(REDIS_WARNING,"+tilt",NULL,"#tilt mode entered");
3021 }
3022 sentinel.previous_time = mstime();
3023 }
3024
3025 void sentinelTimer(void) {
3026 sentinelCheckTiltCondition();
3027 sentinelHandleDictOfRedisInstances(sentinel.masters);
3028 sentinelRunPendingScripts();
3029 sentinelCollectTerminatedScripts();
3030 sentinelKillTimedoutScripts();
3031 }
3032