]> git.saurik.com Git - redis.git/blob - src/sentinel.c
193320a5c9f76a83efcdde2a5a755b83be30dee3
[redis.git] / src / sentinel.c
1 /* Redis Sentinel implementation
2 * -----------------------------
3 *
4 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * * Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * * Neither the name of Redis nor the names of its contributors may be used
16 * to endorse or promote products derived from this software without
17 * specific prior written permission.
18 *
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 * POSSIBILITY OF SUCH DAMAGE.
30 */
31
32 #include "redis.h"
33 #include "hiredis.h"
34 #include "async.h"
35
36 #include <ctype.h>
37 #include <arpa/inet.h>
38 #include <sys/socket.h>
39 #include <sys/wait.h>
40
/* Process environment; passed unchanged to execve(2) when a notification
 * script is spawned. */
extern char **environ;

/* Port assigned to server.port by initSentinelConfig(). */
#define REDIS_SENTINEL_PORT 26379

/* ======================== Sentinel global state =========================== */

/* Millisecond-resolution time type used by all the Sentinel timestamps. */
typedef long long mstime_t;
48
/* An ip:port pair. Objects of this type are built by createSentinelAddr()
 * and destroyed by releaseSentinelAddr(). */
typedef struct sentinelAddr {
    char *ip;   /* IP address in textual form. */
    int port;   /* TCP port. */
} sentinelAddr;
54
/* Flags stored in sentinelRedisInstance->flags. The first three describe
 * the kind of instance, the others describe its runtime state. */
#define SRI_MASTER (1<<0)
#define SRI_SLAVE (1<<1)
#define SRI_SENTINEL (1<<2)
#define SRI_DISCONNECTED (1<<3)
#define SRI_S_DOWN (1<<4)       /* Subjectively down (no quorum). */
#define SRI_O_DOWN (1<<5)       /* Objectively down (quorum reached). */
#define SRI_MASTER_DOWN (1<<6)  /* Set on a Sentinel that thinks its master
                                   is down. */
/* SRI_CAN_FAILOVER has two meanings depending on the instance type:
 * - on an SRI_MASTER instance: we are allowed to fail over this master.
 * - on an SRI_SENTINEL instance: that Sentinel is allowed to fail over
 *   its master. */
#define SRI_CAN_FAILOVER (1<<7)
#define SRI_FAILOVER_IN_PROGRESS (1<<8) /* A failover of this master is in
                                           progress. */
#define SRI_I_AM_THE_LEADER (1<<9)  /* We are the leader for this master. */
#define SRI_PROMOTED (1<<10)        /* Slave selected for promotion. */
#define SRI_RECONF_SENT (1<<11)     /* SLAVEOF <newmaster> sent. */
#define SRI_RECONF_INPROG (1<<12)   /* Slave synchronization in progress. */
#define SRI_RECONF_DONE (1<<13)     /* Slave synchronized with new master. */
76
/* Periods, defaults and limits. Most of these are consumed by code outside
 * this part of the file; the ones stored into mstime_t fields by
 * createSentinelRedisInstance() are millisecond values. */
#define SENTINEL_INFO_PERIOD 10000
#define SENTINEL_PING_PERIOD 1000
#define SENTINEL_ASK_PERIOD 1000
#define SENTINEL_PUBLISH_PERIOD 5000
/* Default down_after_period of an instance created without a master. */
#define SENTINEL_DOWN_AFTER_PERIOD 30000
#define SENTINEL_HELLO_CHANNEL "__sentinel__:hello"
#define SENTINEL_TILT_TRIGGER 2000
/* Thirty ping periods. */
#define SENTINEL_TILT_PERIOD (SENTINEL_PING_PERIOD*30)
/* Initial slave_priority of a newly created instance. */
#define SENTINEL_DEFAULT_SLAVE_PRIORITY 100
#define SENTINEL_PROMOTION_RETRY_PERIOD 30000
#define SENTINEL_SLAVE_RECONF_RETRY_PERIOD 10000
/* Initial parallel_syncs of a newly created instance. */
#define SENTINEL_DEFAULT_PARALLEL_SYNCS 1
#define SENTINEL_MIN_LINK_RECONNECT_PERIOD 15000
/* Initial failover_timeout of a newly created instance (15 minutes). */
#define SENTINEL_DEFAULT_FAILOVER_TIMEOUT (60*15*1000)
#define SENTINEL_MAX_PENDING_COMMANDS 100
#define SENTINEL_EXTENDED_SDOWN_MULTIPLIER 10

/* For how many milliseconds a received information is considered valid.
 * This applies, for instance, to SENTINEL IS-MASTER-DOWN-BY-ADDR replies. */
#define SENTINEL_INFO_VALIDITY_TIME 5000
#define SENTINEL_FAILOVER_FIXED_DELAY 5000
#define SENTINEL_FAILOVER_MAX_RANDOM_DELAY 10000
99
/* States of the failover state machine, stored in
 * sentinelRedisInstance->failover_state. */
#define SENTINEL_FAILOVER_STATE_NONE 0               /* No failover in progress. */
#define SENTINEL_FAILOVER_STATE_WAIT_START 1         /* Wait for failover_start_time. */
#define SENTINEL_FAILOVER_STATE_SELECT_SLAVE 2       /* Select slave to promote. */
#define SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE 3 /* Slave -> Master. */
#define SENTINEL_FAILOVER_STATE_WAIT_PROMOTION 4     /* Wait slave to change role. */
#define SENTINEL_FAILOVER_STATE_RECONF_SLAVES 5      /* SLAVEOF newmaster. */
#define SENTINEL_FAILOVER_STATE_WAIT_NEXT_SLAVE 6    /* Wait replication. */
#define SENTINEL_FAILOVER_STATE_ALERT_CLIENTS 7      /* Run user script. */
#define SENTINEL_FAILOVER_STATE_WAIT_ALERT_SCRIPT 8  /* Wait script exec. */
#define SENTINEL_FAILOVER_STATE_DETECT_END 9         /* Check for failover end. */
#define SENTINEL_FAILOVER_STATE_UPDATE_CONFIG 10     /* Monitor promoted slave. */

/* Values of sentinelRedisInstance->slave_master_link_status. */
#define SENTINEL_MASTER_LINK_STATUS_UP 0
#define SENTINEL_MASTER_LINK_STATUS_DOWN 1

/* Generic flags accepted by different functions (for instance the 'flags'
 * argument of sentinelResetMaster()). */
#define SENTINEL_NO_FLAGS 0
#define SENTINEL_GENERATE_EVENT 1
119
120 typedef struct sentinelRedisInstance {
121 int flags; /* See SRI_... defines */
122 char *name; /* Master name from the point of view of this sentinel. */
123 char *runid; /* run ID of this instance. */
124 sentinelAddr *addr; /* Master host. */
125 redisAsyncContext *cc; /* Hiredis context for commands. */
126 redisAsyncContext *pc; /* Hiredis context for Pub / Sub. */
127 int pending_commands; /* Number of commands sent waiting for a reply. */
128 mstime_t cc_conn_time; /* cc connection time. */
129 mstime_t pc_conn_time; /* pc connection time. */
130 mstime_t pc_last_activity; /* Last time we received any message. */
131 mstime_t last_avail_time; /* Last time the instance replied to ping with
132 a reply we consider valid. */
133 mstime_t last_pong_time; /* Last time the instance replied to ping,
134 whatever the reply was. That's used to check
135 if the link is idle and must be reconnected. */
136 mstime_t last_pub_time; /* Last time we sent hello via Pub/Sub. */
137 mstime_t last_hello_time; /* Only used if SRI_SENTINEL is set. Last time
138 we received an hello from this Sentinel
139 via Pub/Sub. */
140 mstime_t last_master_down_reply_time; /* Time of last reply to
141 SENTINEL is-master-down command. */
142 mstime_t s_down_since_time; /* Subjectively down since time. */
143 mstime_t o_down_since_time; /* Objectively down since time. */
144 mstime_t down_after_period; /* Consider it down after that period. */
145 mstime_t info_refresh; /* Time at which we received INFO output from it. */
146
147 /* Master specific. */
148 dict *sentinels; /* Other sentinels monitoring the same master. */
149 dict *slaves; /* Slaves for this master instance. */
150 int quorum; /* Number of sentinels that need to agree on failure. */
151 int parallel_syncs; /* How many slaves to reconfigure at same time. */
152
153 /* Slave specific. */
154 mstime_t master_link_down_time; /* Slave replication link down time. */
155 int slave_priority; /* Slave priority according to its INFO output. */
156 mstime_t slave_reconf_sent_time; /* Time at which we sent SLAVE OF <new> */
157 struct sentinelRedisInstance *master; /* Master instance if SRI_SLAVE is set. */
158 char *slave_master_host; /* Master host as reported by INFO */
159 int slave_master_port; /* Master port as reported by INFO */
160 int slave_master_link_status; /* Master link status as reported by INFO */
161 /* Failover */
162 char *leader; /* If this is a master instance, this is the runid of
163 the Sentinel that should perform the failover. If
164 this is a Sentinel, this is the runid of the Sentinel
165 that this other Sentinel is voting as leader.
166 This field is valid only if SRI_MASTER_DOWN is
167 set on the Sentinel instance. */
168 int failover_state; /* See SENTINEL_FAILOVER_STATE_* defines. */
169 mstime_t failover_state_change_time;
170 mstime_t failover_start_time; /* When to start to failover if leader. */
171 mstime_t failover_timeout; /* Max time to refresh failover state. */
172 struct sentinelRedisInstance *promoted_slave; /* Promoted slave instance. */
173 /* Scripts executed to notify admin or reconfigure clients: when they
174 * are set to NULL no script is executed. */
175 char *notification_script;
176 char *client_reconfig_script;
177 } sentinelRedisInstance;
178
179 /* Main state. */
180 struct sentinelState {
181 dict *masters; /* Dictionary of master sentinelRedisInstances.
182 Key is the instance name, value is the
183 sentinelRedisInstance structure pointer. */
184 int tilt; /* Are we in TILT mode? */
185 mstime_t tilt_start_time; /* When TITL started. */
186 mstime_t previous_time; /* Time last time we ran the time handler. */
187 } sentinel;
188
189 /* ======================= hiredis ae.c adapters =============================
190 * Note: this implementation is taken from hiredis/adapters/ae.h, however
191 * we have our modified copy for Sentinel in order to use our allocator
192 * and to have full control over how the adapter works. */
193
194 typedef struct redisAeEvents {
195 redisAsyncContext *context;
196 aeEventLoop *loop;
197 int fd;
198 int reading, writing;
199 } redisAeEvents;
200
201 static void redisAeReadEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
202 ((void)el); ((void)fd); ((void)mask);
203
204 redisAeEvents *e = (redisAeEvents*)privdata;
205 redisAsyncHandleRead(e->context);
206 }
207
208 static void redisAeWriteEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
209 ((void)el); ((void)fd); ((void)mask);
210
211 redisAeEvents *e = (redisAeEvents*)privdata;
212 redisAsyncHandleWrite(e->context);
213 }
214
215 static void redisAeAddRead(void *privdata) {
216 redisAeEvents *e = (redisAeEvents*)privdata;
217 aeEventLoop *loop = e->loop;
218 if (!e->reading) {
219 e->reading = 1;
220 aeCreateFileEvent(loop,e->fd,AE_READABLE,redisAeReadEvent,e);
221 }
222 }
223
224 static void redisAeDelRead(void *privdata) {
225 redisAeEvents *e = (redisAeEvents*)privdata;
226 aeEventLoop *loop = e->loop;
227 if (e->reading) {
228 e->reading = 0;
229 aeDeleteFileEvent(loop,e->fd,AE_READABLE);
230 }
231 }
232
233 static void redisAeAddWrite(void *privdata) {
234 redisAeEvents *e = (redisAeEvents*)privdata;
235 aeEventLoop *loop = e->loop;
236 if (!e->writing) {
237 e->writing = 1;
238 aeCreateFileEvent(loop,e->fd,AE_WRITABLE,redisAeWriteEvent,e);
239 }
240 }
241
242 static void redisAeDelWrite(void *privdata) {
243 redisAeEvents *e = (redisAeEvents*)privdata;
244 aeEventLoop *loop = e->loop;
245 if (e->writing) {
246 e->writing = 0;
247 aeDeleteFileEvent(loop,e->fd,AE_WRITABLE);
248 }
249 }
250
/* hiredis 'cleanup' hook: uninstall both file events, then free the adapter
 * allocated by redisAeAttach(). */
static void redisAeCleanup(void *privdata) {
    redisAeDelRead(privdata);
    redisAeDelWrite(privdata);
    zfree(privdata);
}
257
258 static int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac) {
259 redisContext *c = &(ac->c);
260 redisAeEvents *e;
261
262 /* Nothing should be attached when something is already attached */
263 if (ac->ev.data != NULL)
264 return REDIS_ERR;
265
266 /* Create container for context and r/w events */
267 e = (redisAeEvents*)zmalloc(sizeof(*e));
268 e->context = ac;
269 e->loop = loop;
270 e->fd = c->fd;
271 e->reading = e->writing = 0;
272
273 /* Register functions to start/stop listening for events */
274 ac->ev.addRead = redisAeAddRead;
275 ac->ev.delRead = redisAeDelRead;
276 ac->ev.addWrite = redisAeAddWrite;
277 ac->ev.delWrite = redisAeDelWrite;
278 ac->ev.cleanup = redisAeCleanup;
279 ac->ev.data = e;
280
281 return REDIS_OK;
282 }
283
284 /* ============================= Prototypes ================================= */
285
286 void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status);
287 void sentinelDisconnectCallback(const redisAsyncContext *c, int status);
288 void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata);
289 sentinelRedisInstance *sentinelGetMasterByName(char *name);
290 char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master);
291 char *sentinelGetObjectiveLeader(sentinelRedisInstance *master);
292 int yesnotoi(char *s);
293 void sentinelDisconnectInstanceFromContext(const redisAsyncContext *c);
294 void sentinelKillLink(sentinelRedisInstance *ri, redisAsyncContext *c);
295 const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri);
296 void sentinelAbortFailover(sentinelRedisInstance *ri);
297 void sentinelEvent(int level, char *type, sentinelRedisInstance *ri, const char *fmt, ...);
298 sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master);
299
300 /* ========================= Dictionary types =============================== */
301
302 unsigned int dictSdsHash(const void *key);
303 int dictSdsKeyCompare(void *privdata, const void *key1, const void *key2);
304 void releaseSentinelRedisInstance(sentinelRedisInstance *ri);
305
/* Value destructor of instancesDictType: releases the sentinelRedisInstance
 * stored as value. 'privdata' is not used. */
void dictInstancesValDestructor (void *privdata, void *obj) {
    (void) privdata;
    releaseSentinelRedisInstance(obj);
}
309
310 /* Instance name (sds) -> instance (sentinelRedisInstance pointer)
311 *
312 * also used for: sentinelRedisInstance->sentinels dictionary that maps
313 * sentinels ip:port to last seen time in Pub/Sub hello message. */
314 dictType instancesDictType = {
315 dictSdsHash, /* hash function */
316 NULL, /* key dup */
317 NULL, /* val dup */
318 dictSdsKeyCompare, /* key compare */
319 NULL, /* key destructor */
320 dictInstancesValDestructor /* val destructor */
321 };
322
323 /* Instance runid (sds) -> votes (long casted to void*)
324 *
325 * This is useful into sentinelGetObjectiveLeader() function in order to
326 * count the votes and understand who is the leader. */
327 dictType leaderVotesDictType = {
328 dictSdsHash, /* hash function */
329 NULL, /* key dup */
330 NULL, /* val dup */
331 dictSdsKeyCompare, /* key compare */
332 NULL, /* key destructor */
333 NULL /* val destructor */
334 };
335
336 /* =========================== Initialization =============================== */
337
338 void sentinelCommand(redisClient *c);
339
340 struct redisCommand sentinelcmds[] = {
341 {"ping",pingCommand,1,"",0,NULL,0,0,0,0,0},
342 {"sentinel",sentinelCommand,-2,"",0,NULL,0,0,0,0,0},
343 {"subscribe",subscribeCommand,-2,"",0,NULL,0,0,0,0,0},
344 {"unsubscribe",unsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
345 {"psubscribe",psubscribeCommand,-2,"",0,NULL,0,0,0,0,0},
346 {"punsubscribe",punsubscribeCommand,-1,"",0,NULL,0,0,0,0,0}
347 };
348
349 /* This function overwrites a few normal Redis config default with Sentinel
350 * specific defaults. */
351 void initSentinelConfig(void) {
352 server.port = REDIS_SENTINEL_PORT;
353 }
354
355 /* Perform the Sentinel mode initialization. */
356 void initSentinel(void) {
357 int j;
358
359 /* Remove usual Redis commands from the command table, then just add
360 * the SENTINEL command. */
361 dictEmpty(server.commands);
362 for (j = 0; j < sizeof(sentinelcmds)/sizeof(sentinelcmds[0]); j++) {
363 int retval;
364 struct redisCommand *cmd = sentinelcmds+j;
365
366 retval = dictAdd(server.commands, sdsnew(cmd->name), cmd);
367 redisAssert(retval == DICT_OK);
368 }
369
370 /* Initialize various data structures. */
371 sentinel.masters = dictCreate(&instancesDictType,NULL);
372 sentinel.tilt = 0;
373 sentinel.tilt_start_time = mstime();
374 sentinel.previous_time = mstime();
375 }
376
377 /* ============================== sentinelAddr ============================== */
378
379 /* Create a sentinelAddr object and return it on success.
380 * On error NULL is returned and errno is set to:
381 * ENOENT: Can't resolve the hostname.
382 * EINVAL: Invalid port number.
383 */
384 sentinelAddr *createSentinelAddr(char *hostname, int port) {
385 char buf[32];
386 sentinelAddr *sa;
387
388 if (port <= 0 || port > 65535) {
389 errno = EINVAL;
390 return NULL;
391 }
392 if (anetResolve(NULL,hostname,buf) == ANET_ERR) {
393 errno = ENOENT;
394 return NULL;
395 }
396 sa = zmalloc(sizeof(*sa));
397 sa->ip = sdsnew(buf);
398 sa->port = port;
399 return sa;
400 }
401
402 /* Free a Sentinel address. Can't fail. */
403 void releaseSentinelAddr(sentinelAddr *sa) {
404 sdsfree(sa->ip);
405 zfree(sa);
406 }
407
408 /* =========================== Events notification ========================== */
409
410 void sentinelCallNotificationScript(char *scriptpath, char *type, char *msg) {
411 pid_t pid = fork();
412
413 if (pid == -1) {
414 /* Parent on error. */
415 sentinelEvent(REDIS_WARNING,"-notification-script-error",NULL,
416 "#can't fork: %s",strerror(errno));
417 return;
418 } else if (pid == 0) {
419 /* Child */
420 char *argv[4];
421
422 argv[0] = scriptpath;
423 argv[1] = type;
424 argv[2] = msg;
425 argv[3] = NULL;
426 execve(scriptpath,argv,environ);
427 /* If we are here an error occurred. */
428 sentinelEvent(REDIS_WARNING,"-notification-script-error",NULL,
429 "#execve(2): %s",strerror(errno));
430 _exit(1);
431 } else {
432 sentinelEvent(REDIS_DEBUG,"+child",NULL,"%ld",(long)pid);
433 }
434 }
435
436 /* Send an event to log, pub/sub, user notification script.
437 *
438 * 'level' is the log level for logging. Only REDIS_WARNING events will trigger
439 * the execution of the user notification script.
440 *
441 * 'type' is the message type, also used as a pub/sub channel name.
442 *
443 * 'ri', is the redis instance target of this event if applicable, and is
444 * used to obtain the path of the notification script to execute.
445 *
446 * The remaining arguments are printf-alike.
447 * If the format specifier starts with the two characters "%@" then ri is
448 * not NULL, and the message is prefixed with an instance identifier in the
449 * following format:
450 *
451 * <instance type> <instance name> <ip> <port>
452 *
453 * If the instance type is not master, than the additional string is
454 * added to specify the originating master:
455 *
456 * @ <master name> <master ip> <master port>
457 *
458 * Any other specifier after "%@" is processed by printf itself.
459 */
460 void sentinelEvent(int level, char *type, sentinelRedisInstance *ri,
461 const char *fmt, ...) {
462 va_list ap;
463 char msg[REDIS_MAX_LOGMSG_LEN];
464 robj *channel, *payload;
465
466 /* Handle %@ */
467 if (fmt[0] == '%' && fmt[1] == '@') {
468 sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
469 NULL : ri->master;
470
471 if (master) {
472 snprintf(msg, sizeof(msg), "%s %s %s %d @ %s %s %d",
473 sentinelRedisInstanceTypeStr(ri),
474 ri->name, ri->addr->ip, ri->addr->port,
475 master->name, master->addr->ip, master->addr->port);
476 } else {
477 snprintf(msg, sizeof(msg), "%s %s %s %d",
478 sentinelRedisInstanceTypeStr(ri),
479 ri->name, ri->addr->ip, ri->addr->port);
480 }
481 fmt += 2;
482 } else {
483 msg[0] = '\0';
484 }
485
486 /* Use vsprintf for the rest of the formatting if any. */
487 if (fmt[0] != '\0') {
488 va_start(ap, fmt);
489 vsnprintf(msg+strlen(msg), sizeof(msg)-strlen(msg), fmt, ap);
490 va_end(ap);
491 }
492
493 /* Log the message if the log level allows it to be logged. */
494 if (level >= server.verbosity)
495 redisLog(level,"%s %s",type,msg);
496
497 /* Publish the message via Pub/Sub if it's not a debugging one. */
498 if (level != REDIS_DEBUG) {
499 channel = createStringObject(type,strlen(type));
500 payload = createStringObject(msg,strlen(msg));
501 pubsubPublishMessage(channel,payload);
502 decrRefCount(channel);
503 decrRefCount(payload);
504 }
505
506 /* Call the notification script if applicable. */
507 if (level == REDIS_WARNING && ri != NULL) {
508 sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
509 ri : ri->master;
510 if (master->notification_script) {
511 sentinelCallNotificationScript(master->notification_script,
512 type,msg);
513 }
514 }
515 }
516
517 /* ========================== sentinelRedisInstance ========================= */
518
519 /* Create a redis instance, the following fields must be populated by the
520 * caller if needed:
521 * runid: set to NULL but will be populated once INFO output is received.
522 * info_refresh: is set to 0 to mean that we never received INFO so far.
523 *
524 * If SRI_MASTER is set into initial flags the instance is added to
525 * sentinel.masters table.
526 *
527 * if SRI_SLAVE or SRI_SENTINEL is set then 'master' must be not NULL and the
528 * instance is added into master->slaves or master->sentinels table.
529 *
530 * If the instance is a slave or sentinel, the name parameter is ignored and
531 * is created automatically as hostname:port.
532 *
533 * The function fails if hostname can't be resolved or port is out of range.
534 * When this happens NULL is returned and errno is set accordingly to the
535 * createSentinelAddr() function.
536 *
537 * The function may also fail and return NULL with errno set to EBUSY if
538 * a master or slave with the same name already exists. */
539 sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char *hostname, int port, int quorum, sentinelRedisInstance *master) {
540 sentinelRedisInstance *ri;
541 sentinelAddr *addr;
542 dict *table;
543 char slavename[128], *sdsname;
544
545 redisAssert(flags & (SRI_MASTER|SRI_SLAVE|SRI_SENTINEL));
546 redisAssert((flags & SRI_MASTER) || master != NULL);
547
548 /* Check address validity. */
549 addr = createSentinelAddr(hostname,port);
550 if (addr == NULL) return NULL;
551
552 /* For slaves and sentinel we use ip:port as name. */
553 if (flags & (SRI_SLAVE|SRI_SENTINEL)) {
554 snprintf(slavename,sizeof(slavename),"%s:%d",hostname,port);
555 name = slavename;
556 }
557
558 /* Make sure the entry is not duplicated. This may happen when the same
559 * name for a master is used multiple times inside the configuration or
560 * if we try to add multiple times a slave or sentinel with same ip/port
561 * to a master. */
562 if (flags & SRI_MASTER) table = sentinel.masters;
563 else if (flags & SRI_SLAVE) table = master->slaves;
564 else if (flags & SRI_SENTINEL) table = master->sentinels;
565 sdsname = sdsnew(name);
566 if (dictFind(table,sdsname)) {
567 sdsfree(sdsname);
568 errno = EBUSY;
569 return NULL;
570 }
571
572 /* Create the instance object. */
573 ri = zmalloc(sizeof(*ri));
574 /* Note that all the instances are started in the disconnected state,
575 * the event loop will take care of connecting them. */
576 ri->flags = flags | SRI_DISCONNECTED;
577 ri->name = sdsname;
578 ri->runid = NULL;
579 ri->addr = addr;
580 ri->cc = NULL;
581 ri->pc = NULL;
582 ri->pending_commands = 0;
583 ri->cc_conn_time = 0;
584 ri->pc_conn_time = 0;
585 ri->pc_last_activity = 0;
586 ri->last_avail_time = mstime();
587 ri->last_pong_time = mstime();
588 ri->last_pub_time = mstime();
589 ri->last_hello_time = mstime();
590 ri->last_master_down_reply_time = mstime();
591 ri->s_down_since_time = 0;
592 ri->o_down_since_time = 0;
593 ri->down_after_period = master ? master->down_after_period :
594 SENTINEL_DOWN_AFTER_PERIOD;
595 ri->master_link_down_time = 0;
596 ri->slave_priority = SENTINEL_DEFAULT_SLAVE_PRIORITY;
597 ri->slave_reconf_sent_time = 0;
598 ri->slave_master_host = NULL;
599 ri->slave_master_port = 0;
600 ri->slave_master_link_status = SENTINEL_MASTER_LINK_STATUS_DOWN;
601 ri->sentinels = dictCreate(&instancesDictType,NULL);
602 ri->quorum = quorum;
603 ri->parallel_syncs = SENTINEL_DEFAULT_PARALLEL_SYNCS;
604 ri->master = master;
605 ri->slaves = dictCreate(&instancesDictType,NULL);
606 ri->info_refresh = 0;
607
608 /* Failover state. */
609 ri->leader = NULL;
610 ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
611 ri->failover_state_change_time = 0;
612 ri->failover_start_time = 0;
613 ri->failover_timeout = SENTINEL_DEFAULT_FAILOVER_TIMEOUT;
614 ri->promoted_slave = NULL;
615 ri->notification_script = NULL;
616 ri->client_reconfig_script = NULL;
617
618 /* Add into the right table. */
619 dictAdd(table, ri->name, ri);
620 return ri;
621 }
622
623 /* Release this instance and all its slaves, sentinels, hiredis connections.
624 * This function also takes care of unlinking the instance from the main
625 * masters table (if it is a master) or from its master sentinels/slaves table
626 * if it is a slave or sentinel. */
627 void releaseSentinelRedisInstance(sentinelRedisInstance *ri) {
628 /* Release all its slaves or sentinels if any. */
629 dictRelease(ri->sentinels);
630 dictRelease(ri->slaves);
631
632 /* Release hiredis connections. */
633 if (ri->cc) sentinelKillLink(ri,ri->cc);
634 if (ri->pc) sentinelKillLink(ri,ri->pc);
635
636 /* Free other resources. */
637 sdsfree(ri->name);
638 sdsfree(ri->runid);
639 sdsfree(ri->notification_script);
640 sdsfree(ri->client_reconfig_script);
641 sdsfree(ri->slave_master_host);
642 sdsfree(ri->leader);
643 releaseSentinelAddr(ri->addr);
644
645 /* Clear state into the master if needed. */
646 if ((ri->flags & SRI_SLAVE) && (ri->flags & SRI_PROMOTED) && ri->master)
647 ri->master->promoted_slave = NULL;
648
649 zfree(ri);
650 }
651
652 /* Lookup a slave in a master Redis instance, by ip and port. */
653 sentinelRedisInstance *sentinelRedisInstanceLookupSlave(
654 sentinelRedisInstance *ri, char *ip, int port)
655 {
656 sds key;
657 sentinelRedisInstance *slave;
658
659 redisAssert(ri->flags & SRI_MASTER);
660 key = sdscatprintf(sdsempty(),"%s:%d",ip,port);
661 slave = dictFetchValue(ri->slaves,key);
662 sdsfree(key);
663 return slave;
664 }
665
666 /* Return the name of the type of the instance as a string. */
667 const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri) {
668 if (ri->flags & SRI_MASTER) return "master";
669 else if (ri->flags & SRI_SLAVE) return "slave";
670 else if (ri->flags & SRI_SENTINEL) return "sentinel";
671 else return "unknown";
672 }
673
674 /* This function removes all the instances found in the dictionary of instances
675 * 'd', having either:
676 *
677 * 1) The same ip/port as specified.
678 * 2) The same runid.
679 *
680 * "1" and "2" don't need to verify at the same time, just one is enough.
681 * If "runid" is NULL it is not checked.
682 * Similarly if "ip" is NULL it is not checked.
683 *
684 * This function is useful because every time we add a new Sentinel into
685 * a master's Sentinels dictionary, we want to be very sure about not
686 * having duplicated instances for any reason. This is so important because
687 * we use those other sentinels in order to run our quorum protocol to
688 * understand if it's time to proceeed with the fail over.
689 *
690 * Making sure no duplication is possible we greately improve the robustness
691 * of the quorum (otherwise we may end counting the same instance multiple
692 * times for some reason).
693 *
694 * The function returns the number of Sentinels removed. */
695 int removeMatchingSentinelsFromMaster(sentinelRedisInstance *master, char *ip, int port, char *runid) {
696 dictIterator *di;
697 dictEntry *de;
698 int removed = 0;
699
700 di = dictGetSafeIterator(master->sentinels);
701 while((de = dictNext(di)) != NULL) {
702 sentinelRedisInstance *ri = dictGetVal(de);
703
704 if ((ri->runid && runid && strcmp(ri->runid,runid) == 0) ||
705 (ip && strcmp(ri->addr->ip,ip) == 0 && port == ri->addr->port))
706 {
707 dictDelete(master->sentinels,ri->name);
708 removed++;
709 }
710 }
711 dictReleaseIterator(di);
712 return removed;
713 }
714
715 /* Search an instance with the same runid, ip and port into a dictionary
716 * of instances. Return NULL if not found, otherwise return the instance
717 * pointer.
718 *
719 * runid or ip can be NULL. In such a case the search is performed only
720 * by the non-NULL field. */
721 sentinelRedisInstance *getSentinelRedisInstanceByAddrAndRunID(dict *instances, char *ip, int port, char *runid) {
722 dictIterator *di;
723 dictEntry *de;
724 sentinelRedisInstance *instance = NULL;
725
726 redisAssert(ip || runid); /* User must pass at least one search param. */
727 di = dictGetIterator(instances);
728 while((de = dictNext(di)) != NULL) {
729 sentinelRedisInstance *ri = dictGetVal(de);
730
731 if (runid && !ri->runid) continue;
732 if ((runid == NULL || strcmp(ri->runid, runid) == 0) &&
733 (ip == NULL || (strcmp(ri->addr->ip, ip) == 0 &&
734 ri->addr->port == port)))
735 {
736 instance = ri;
737 break;
738 }
739 }
740 dictReleaseIterator(di);
741 return instance;
742 }
743
744 /* Simple master lookup by name */
745 sentinelRedisInstance *sentinelGetMasterByName(char *name) {
746 sentinelRedisInstance *ri;
747 sds sdsname = sdsnew(name);
748
749 ri = dictFetchValue(sentinel.masters,sdsname);
750 sdsfree(sdsname);
751 return ri;
752 }
753
754 /* Add the specified flags to all the instances in the specified dictionary. */
755 void sentinelAddFlagsToDictOfRedisInstances(dict *instances, int flags) {
756 dictIterator *di;
757 dictEntry *de;
758
759 di = dictGetIterator(instances);
760 while((de = dictNext(di)) != NULL) {
761 sentinelRedisInstance *ri = dictGetVal(de);
762 ri->flags |= flags;
763 }
764 dictReleaseIterator(di);
765 }
766
767 /* Remove the specified flags to all the instances in the specified
768 * dictionary. */
769 void sentinelDelFlagsToDictOfRedisInstances(dict *instances, int flags) {
770 dictIterator *di;
771 dictEntry *de;
772
773 di = dictGetIterator(instances);
774 while((de = dictNext(di)) != NULL) {
775 sentinelRedisInstance *ri = dictGetVal(de);
776 ri->flags &= ~flags;
777 }
778 dictReleaseIterator(di);
779 }
780
781 /* Reset the state of a monitored master:
782 * 1) Remove all slaves.
783 * 2) Remove all sentinels.
784 * 3) Remove most of the flags resulting from runtime operations.
785 * 4) Reset timers to their default value.
786 * 5) In the process of doing this undo the failover if in progress.
787 * 6) Disconnect the connections with the master (will reconnect automatically).
788 */
789 void sentinelResetMaster(sentinelRedisInstance *ri, int flags) {
790 redisAssert(ri->flags & SRI_MASTER);
791 dictRelease(ri->slaves);
792 dictRelease(ri->sentinels);
793 ri->slaves = dictCreate(&instancesDictType,NULL);
794 ri->sentinels = dictCreate(&instancesDictType,NULL);
795 if (ri->cc) sentinelKillLink(ri,ri->cc);
796 if (ri->pc) sentinelKillLink(ri,ri->pc);
797 ri->flags &= SRI_MASTER|SRI_CAN_FAILOVER|SRI_DISCONNECTED;
798 if (ri->leader) {
799 sdsfree(ri->leader);
800 ri->leader = NULL;
801 }
802 ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
803 ri->failover_state_change_time = 0;
804 ri->failover_start_time = 0;
805 ri->promoted_slave = NULL;
806 sdsfree(ri->runid);
807 sdsfree(ri->slave_master_host);
808 ri->runid = NULL;
809 ri->slave_master_host = NULL;
810 ri->last_avail_time = mstime();
811 ri->last_pong_time = mstime();
812 if (flags & SENTINEL_GENERATE_EVENT)
813 sentinelEvent(REDIS_WARNING,"+reset-master",ri,"%@");
814 }
815
816 /* Call sentinelResetMaster() on every master with a name matching the specified
817 * pattern. */
818 int sentinelResetMastersByPattern(char *pattern, int flags) {
819 dictIterator *di;
820 dictEntry *de;
821 int reset = 0;
822
823 di = dictGetIterator(sentinel.masters);
824 while((de = dictNext(di)) != NULL) {
825 sentinelRedisInstance *ri = dictGetVal(de);
826
827 if (ri->name) {
828 if (stringmatch(pattern,ri->name,0)) {
829 sentinelResetMaster(ri,flags);
830 reset++;
831 }
832 }
833 }
834 dictReleaseIterator(di);
835 return reset;
836 }
837
838 /* Reset the specified master with sentinelResetMaster(), and also change
839 * the ip:port address, but take the name of the instance unmodified.
840 *
841 * This is used to handle the +switch-master and +redirect-to-master events.
842 *
843 * The function returns REDIS_ERR if the address can't be resolved for some
844 * reason. Otherwise REDIS_OK is returned.
845 *
846 * TODO: make this reset so that original sentinels are re-added with
847 * same ip / port / runid.
848 */
849
850 int sentinelResetMasterAndChangeAddress(sentinelRedisInstance *master, char *ip, int port) {
851 sentinelAddr *oldaddr, *newaddr;
852
853 newaddr = createSentinelAddr(ip,port);
854 if (newaddr == NULL) return REDIS_ERR;
855 sentinelResetMaster(master,SENTINEL_NO_FLAGS);
856 oldaddr = master->addr;
857 master->addr = newaddr;
858 /* Release the old address at the end so we are safe even if the function
859 * gets the master->addr->ip and master->addr->port as arguments. */
860 releaseSentinelAddr(oldaddr);
861 return REDIS_OK;
862 }
863
864 /* ============================ Config handling ============================= */
865 char *sentinelHandleConfiguration(char **argv, int argc) {
866 sentinelRedisInstance *ri;
867
868 if (!strcasecmp(argv[0],"monitor") && argc == 5) {
869 /* monitor <name> <host> <port> <quorum> */
870 int quorum = atoi(argv[4]);
871
872 if (quorum <= 0) return "Quorum must be 1 or greater.";
873 if (createSentinelRedisInstance(argv[1],SRI_MASTER,argv[2],
874 atoi(argv[3]),quorum,NULL) == NULL)
875 {
876 switch(errno) {
877 case EBUSY: return "Duplicated master name.";
878 case ENOENT: return "Can't resolve master instance hostname.";
879 case EINVAL: return "Invalid port number";
880 }
881 }
882 } else if (!strcasecmp(argv[0],"down-after-milliseconds") && argc == 3) {
883 /* down-after-milliseconds <name> <milliseconds> */
884 ri = sentinelGetMasterByName(argv[1]);
885 if (!ri) return "No such master with specified name.";
886 ri->down_after_period = atoi(argv[2]);
887 if (ri->down_after_period <= 0)
888 return "negative or zero time parameter.";
889 } else if (!strcasecmp(argv[0],"failover-timeout") && argc == 3) {
890 /* failover-timeout <name> <milliseconds> */
891 ri = sentinelGetMasterByName(argv[1]);
892 if (!ri) return "No such master with specified name.";
893 ri->failover_timeout = atoi(argv[2]);
894 if (ri->failover_timeout <= 0)
895 return "negative or zero time parameter.";
896 } else if (!strcasecmp(argv[0],"can-failover") && argc == 3) {
897 /* can-failover <name> <yes/no> */
898 int yesno = yesnotoi(argv[2]);
899
900 ri = sentinelGetMasterByName(argv[1]);
901 if (!ri) return "No such master with specified name.";
902 if (yesno == -1) return "Argument must be either yes or no.";
903 if (yesno)
904 ri->flags |= SRI_CAN_FAILOVER;
905 else
906 ri->flags &= ~SRI_CAN_FAILOVER;
907 } else if (!strcasecmp(argv[0],"parallel-syncs") && argc == 3) {
908 /* parallel-syncs <name> <milliseconds> */
909 ri = sentinelGetMasterByName(argv[1]);
910 if (!ri) return "No such master with specified name.";
911 ri->parallel_syncs = atoi(argv[2]);
912 } else if (!strcasecmp(argv[0],"notification-script") && argc == 3) {
913 /* notification-script <name> <path> */
914 ri = sentinelGetMasterByName(argv[1]);
915 if (!ri) return "No such master with specified name.";
916 if (access(argv[2],X_OK) == -1)
917 return "Notification script seems non existing or non executable.";
918 ri->notification_script = sdsnew(argv[2]);
919 } else if (!strcasecmp(argv[0],"client-reconfig-script") && argc == 3) {
920 /* client-reconfig-script <name> <path> */
921 ri = sentinelGetMasterByName(argv[1]);
922 if (!ri) return "No such master with specified name.";
923 if (access(argv[2],X_OK) == -1)
924 return "Client reconfiguration script seems non existing or "
925 "non executable.";
926 ri->client_reconfig_script = sdsnew(argv[2]);
927 } else {
928 return "Unrecognized sentinel configuration statement.";
929 }
930 return NULL;
931 }
932
933 /* ====================== hiredis connection handling ======================= */
934
935 /* Completely disconnect an hiredis link from an instance. */
936 void sentinelKillLink(sentinelRedisInstance *ri, redisAsyncContext *c) {
937 if (ri->cc == c) {
938 ri->cc = NULL;
939 ri->pending_commands = 0;
940 }
941 if (ri->pc == c) ri->pc = NULL;
942 c->data = NULL;
943 ri->flags |= SRI_DISCONNECTED;
944 redisAsyncFree(c);
945 }
946
947 /* This function takes an hiredis context that is in an error condition
948 * and make sure to mark the instance as disconnected performing the
949 * cleanup needed.
950 *
951 * Note: we don't free the hiredis context as hiredis will do it for us
952 * for async conenctions. */
953 void sentinelDisconnectInstanceFromContext(const redisAsyncContext *c) {
954 sentinelRedisInstance *ri = c->data;
955 int pubsub;
956
957 if (ri == NULL) return; /* The instance no longer exists. */
958
959 pubsub = (ri->pc == c);
960 sentinelEvent(REDIS_DEBUG, pubsub ? "-pubsub-link" : "-cmd-link", ri,
961 "%@ #%s", c->errstr);
962 if (pubsub)
963 ri->pc = NULL;
964 else
965 ri->cc = NULL;
966 ri->flags |= SRI_DISCONNECTED;
967 }
968
969 void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status) {
970 if (status != REDIS_OK) {
971 sentinelDisconnectInstanceFromContext(c);
972 } else {
973 sentinelRedisInstance *ri = c->data;
974 int pubsub = (ri->pc == c);
975
976 sentinelEvent(REDIS_DEBUG, pubsub ? "+pubsub-link" : "+cmd-link", ri,
977 "%@");
978 }
979 }
980
981 void sentinelDisconnectCallback(const redisAsyncContext *c, int status) {
982 sentinelDisconnectInstanceFromContext(c);
983 }
984
985 /* Create the async connections for the specified instance if the instance
986 * is disconnected. Note that the SRI_DISCONNECTED flag is set even if just
987 * one of the two links (commands and pub/sub) is missing. */
988 void sentinelReconnectInstance(sentinelRedisInstance *ri) {
989 if (!(ri->flags & SRI_DISCONNECTED)) return;
990
991 /* Commands connection. */
992 if (ri->cc == NULL) {
993 ri->cc = redisAsyncConnect(ri->addr->ip,ri->addr->port);
994 if (ri->cc->err) {
995 sentinelEvent(REDIS_DEBUG,"-cmd-link-reconnection",ri,"%@ #%s",
996 ri->cc->errstr);
997 sentinelKillLink(ri,ri->cc);
998 } else {
999 ri->cc_conn_time = mstime();
1000 ri->cc->data = ri;
1001 redisAeAttach(server.el,ri->cc);
1002 redisAsyncSetConnectCallback(ri->cc,
1003 sentinelLinkEstablishedCallback);
1004 redisAsyncSetDisconnectCallback(ri->cc,
1005 sentinelDisconnectCallback);
1006 }
1007 }
1008 /* Pub / Sub */
1009 if ((ri->flags & SRI_MASTER) && ri->pc == NULL) {
1010 ri->pc = redisAsyncConnect(ri->addr->ip,ri->addr->port);
1011 if (ri->pc->err) {
1012 sentinelEvent(REDIS_DEBUG,"-pubsub-link-reconnection",ri,"%@ #%s",
1013 ri->pc->errstr);
1014 sentinelKillLink(ri,ri->pc);
1015 } else {
1016 int retval;
1017
1018 ri->pc_conn_time = mstime();
1019 ri->pc->data = ri;
1020 redisAeAttach(server.el,ri->pc);
1021 redisAsyncSetConnectCallback(ri->pc,
1022 sentinelLinkEstablishedCallback);
1023 redisAsyncSetDisconnectCallback(ri->pc,
1024 sentinelDisconnectCallback);
1025 /* Now we subscribe to the Sentinels "Hello" channel. */
1026 retval = redisAsyncCommand(ri->pc,
1027 sentinelReceiveHelloMessages, NULL, "SUBSCRIBE %s",
1028 SENTINEL_HELLO_CHANNEL);
1029 if (retval != REDIS_OK) {
1030 /* If we can't subscribe, the Pub/Sub connection is useless
1031 * and we can simply disconnect it and try again. */
1032 sentinelKillLink(ri,ri->pc);
1033 return;
1034 }
1035 }
1036 }
1037 /* Clear the DISCONNECTED flags only if we have both the connections
1038 * (or just the commands connection if this is a slave or a
1039 * sentinel instance). */
1040 if (ri->cc && (ri->flags & (SRI_SLAVE|SRI_SENTINEL) || ri->pc))
1041 ri->flags &= ~SRI_DISCONNECTED;
1042 }
1043
1044 /* ======================== Redis instances pinging ======================== */
1045
1046 /* Process the INFO output from masters. */
1047 void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
1048 sds *lines;
1049 int numlines, j;
1050 int role = 0;
1051 int runid_changed = 0; /* true if runid changed. */
1052 int first_runid = 0; /* true if this is the first runid we receive. */
1053
1054 /* The following fields must be reset to a given value in the case they
1055 * are not found at all in the INFO output. */
1056 ri->master_link_down_time = 0;
1057
1058 /* Process line by line. */
1059 lines = sdssplitlen(info,strlen(info),"\r\n",2,&numlines);
1060 for (j = 0; j < numlines; j++) {
1061 sentinelRedisInstance *slave;
1062 sds l = lines[j];
1063
1064 /* run_id:<40 hex chars>*/
1065 if (sdslen(l) >= 47 && !memcmp(l,"run_id:",7)) {
1066 if (ri->runid == NULL) {
1067 ri->runid = sdsnewlen(l+7,40);
1068 first_runid = 1;
1069 } else {
1070 if (strncmp(ri->runid,l+7,40) != 0) {
1071 runid_changed = 1;
1072 sentinelEvent(REDIS_NOTICE,"+reboot",ri,"%@");
1073 sdsfree(ri->runid);
1074 ri->runid = sdsnewlen(l+7,40);
1075 }
1076 }
1077 }
1078
1079 /* slave0:<ip>,<port>,<state> */
1080 if ((ri->flags & SRI_MASTER) &&
1081 sdslen(l) >= 7 &&
1082 !memcmp(l,"slave",5) && isdigit(l[5]))
1083 {
1084 char *ip, *port, *end;
1085
1086 ip = strchr(l,':'); if (!ip) continue;
1087 ip++; /* Now ip points to start of ip address. */
1088 port = strchr(ip,','); if (!port) continue;
1089 *port = '\0'; /* nul term for easy access. */
1090 port++; /* Now port points to start of port number. */
1091 end = strchr(port,','); if (!end) continue;
1092 *end = '\0'; /* nul term for easy access. */
1093
1094 /* Check if we already have this slave into our table,
1095 * otherwise add it. */
1096 if (sentinelRedisInstanceLookupSlave(ri,ip,atoi(port)) == NULL) {
1097 if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,ip,
1098 atoi(port), ri->quorum,ri)) != NULL)
1099 {
1100 sentinelEvent(REDIS_NOTICE,"+slave",slave,"%@");
1101 }
1102 }
1103 }
1104
1105 /* master_link_down_since_seconds:<seconds> */
1106 if (sdslen(l) >= 32 &&
1107 !memcmp(l,"master_link_down_since_seconds",30))
1108 {
1109 ri->master_link_down_time = strtoll(l+31,NULL,10)*1000;
1110 }
1111
1112 /* role:<role> */
1113 if (!memcmp(l,"role:master",11)) role = SRI_MASTER;
1114 else if (!memcmp(l,"role:slave",10)) role = SRI_SLAVE;
1115
1116 if (role == SRI_SLAVE) {
1117 /* master_host:<host> */
1118 if (sdslen(l) >= 12 && !memcmp(l,"master_host:",12)) {
1119 sdsfree(ri->slave_master_host);
1120 ri->slave_master_host = sdsnew(l+12);
1121 }
1122
1123 /* master_port:<port> */
1124 if (sdslen(l) >= 12 && !memcmp(l,"master_port:",12))
1125 ri->slave_master_port = atoi(l+12);
1126
1127 /* master_link_status:<status> */
1128 if (sdslen(l) >= 19 && !memcmp(l,"master_link_status:",19)) {
1129 ri->slave_master_link_status =
1130 (strcasecmp(l+19,"up") == 0) ?
1131 SENTINEL_MASTER_LINK_STATUS_UP :
1132 SENTINEL_MASTER_LINK_STATUS_DOWN;
1133 }
1134 }
1135 }
1136 ri->info_refresh = mstime();
1137 sdsfreesplitres(lines,numlines);
1138
1139 if (sentinel.tilt) return;
1140
1141 /* Act if a master turned into a slave. */
1142 if ((ri->flags & SRI_MASTER) && role == SRI_SLAVE) {
1143 if (first_runid && ri->slave_master_host) {
1144 /* If it is the first time we receive INFO from it, but it's
1145 * a slave while it was configured as a master, we want to monitor
1146 * its master instead. */
1147 sentinelEvent(REDIS_WARNING,"+redirect-to-master",ri,
1148 "%s %s %d %s %d",
1149 ri->name, ri->addr->ip, ri->addr->port,
1150 ri->slave_master_host, ri->slave_master_port);
1151 sentinelResetMasterAndChangeAddress(ri,ri->slave_master_host,
1152 ri->slave_master_port);
1153 return;
1154 }
1155 }
1156
1157 /* Act if a slave turned into a master. */
1158 if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) {
1159 if (!(ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
1160 (runid_changed || first_runid))
1161 {
1162 /* If a slave turned into a master, but at the same time the
1163 * runid has changed, or it is simply the first time we see and
1164 * INFO output from this instance, this is a reboot with a wrong
1165 * configuration.
1166 *
1167 * Log the event and remove the slave. */
1168 int retval;
1169
1170 sentinelEvent(REDIS_WARNING,"-slave-restart-as-master",ri,"%@ #removing it from the attached slaves");
1171 retval = dictDelete(ri->master->slaves,ri->name);
1172 redisAssert(retval == REDIS_OK);
1173 return;
1174 } else if (ri->flags & SRI_PROMOTED) {
1175 /* If this is a promoted slave we can change state to the
1176 * failover state machine. */
1177 if (ri->master &&
1178 (ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
1179 (ri->master->flags & SRI_I_AM_THE_LEADER) &&
1180 (ri->master->failover_state ==
1181 SENTINEL_FAILOVER_STATE_WAIT_PROMOTION))
1182 {
1183 ri->master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
1184 ri->master->failover_state_change_time = mstime();
1185 sentinelEvent(REDIS_WARNING,"+promoted-slave",ri,"%@");
1186 sentinelEvent(REDIS_WARNING,"+failover-state-reconf-slaves",
1187 ri->master,"%@");
1188 }
1189 } else {
1190 /* Otherwise we interpret this as the start of the failover. */
1191 if (ri->master &&
1192 (ri->master->flags & SRI_FAILOVER_IN_PROGRESS) == 0)
1193 {
1194 ri->master->flags |= SRI_FAILOVER_IN_PROGRESS;
1195 sentinelEvent(REDIS_WARNING,"failover-detected",ri->master,"%@");
1196 ri->master->failover_state = SENTINEL_FAILOVER_STATE_DETECT_END;
1197 ri->master->failover_state_change_time = mstime();
1198 ri->master->promoted_slave = ri;
1199 ri->flags |= SRI_PROMOTED;
1200 /* We are an observer, so we can only assume that the leader
1201 * is reconfiguring the slave instances. For this reason we
1202 * set all the instances as RECONF_SENT waiting for progresses
1203 * on this side. */
1204 sentinelAddFlagsToDictOfRedisInstances(ri->master->slaves,
1205 SRI_RECONF_SENT);
1206 }
1207 }
1208 }
1209
1210 /* Detect if the slave that is in the process of being reconfigured
1211 * changed state. */
1212 if ((ri->flags & SRI_SLAVE) && role == SRI_SLAVE &&
1213 (ri->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)))
1214 {
1215 /* SRI_RECONF_SENT -> SRI_RECONF_INPROG. */
1216 if ((ri->flags & SRI_RECONF_SENT) &&
1217 ri->slave_master_host &&
1218 strcmp(ri->slave_master_host,
1219 ri->master->promoted_slave->addr->ip) == 0 &&
1220 ri->slave_master_port == ri->master->promoted_slave->addr->port)
1221 {
1222 ri->flags &= ~SRI_RECONF_SENT;
1223 ri->flags |= SRI_RECONF_INPROG;
1224 sentinelEvent(REDIS_NOTICE,"+slave-reconf-inprog",ri,"%@");
1225 }
1226
1227 /* SRI_RECONF_INPROG -> SRI_RECONF_DONE */
1228 if ((ri->flags & SRI_RECONF_INPROG) &&
1229 ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP)
1230 {
1231 ri->flags &= ~SRI_RECONF_INPROG;
1232 ri->flags |= SRI_RECONF_DONE;
1233 sentinelEvent(REDIS_NOTICE,"+slave-reconf-done",ri,"%@");
1234 /* If we are moving forward (a new slave is now configured)
1235 * we update the change_time as we are conceptually passing
1236 * to the next slave. */
1237 ri->failover_state_change_time = mstime();
1238 }
1239 }
1240 }
1241
1242 void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
1243 sentinelRedisInstance *ri = c->data;
1244 redisReply *r;
1245
1246 if (ri) ri->pending_commands--;
1247 if (!reply || !ri) return;
1248 r = reply;
1249
1250 if (r->type == REDIS_REPLY_STRING) {
1251 sentinelRefreshInstanceInfo(ri,r->str);
1252 }
1253 }
1254
1255 /* Just discard the reply. We use this when we are not monitoring the return
1256 * value of the command but its effects directly. */
1257 void sentinelDiscardReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
1258 sentinelRedisInstance *ri = c->data;
1259
1260 if (ri) ri->pending_commands--;
1261 }
1262
1263 void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
1264 sentinelRedisInstance *ri = c->data;
1265 redisReply *r;
1266
1267 if (ri) ri->pending_commands--;
1268 if (!reply || !ri) return;
1269 r = reply;
1270
1271 if (r->type == REDIS_REPLY_STATUS ||
1272 r->type == REDIS_REPLY_ERROR) {
1273 /* Update the "instance available" field only if this is an
1274 * acceptable reply. */
1275 if (strncmp(r->str,"PONG",4) == 0 ||
1276 strncmp(r->str,"LOADING",7) == 0 ||
1277 strncmp(r->str,"MASTERDOWN",10) == 0)
1278 {
1279 ri->last_avail_time = mstime();
1280 }
1281 }
1282 ri->last_pong_time = mstime();
1283 }
1284
1285 /* This is called when we get the reply about the PUBLISH command we send
1286 * to the master to advertise this sentinel. */
1287 void sentinelPublishReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
1288 sentinelRedisInstance *ri = c->data;
1289 redisReply *r;
1290
1291 if (ri) ri->pending_commands--;
1292 if (!reply || !ri) return;
1293 r = reply;
1294
1295 /* Only update pub_time if we actually published our message. Otherwise
1296 * we'll retry against in 100 milliseconds. */
1297 if (r->type != REDIS_REPLY_ERROR)
1298 ri->last_pub_time = mstime();
1299 }
1300
1301 /* This is our Pub/Sub callback for the Hello channel. It's useful in order
1302 * to discover other sentinels attached at the same master. */
1303 void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata) {
1304 sentinelRedisInstance *ri = c->data;
1305 redisReply *r;
1306
1307 if (!reply || !ri) return;
1308 r = reply;
1309
1310 /* Update the last activity in the pubsub channel. Note that since we
1311 * receive our messages as well this timestamp can be used to detect
1312 * if the link is probably diconnected even if it seems otherwise. */
1313 ri->pc_last_activity = mstime();
1314
1315 /* Sanity check in the reply we expect, so that the code that follows
1316 * can avoid to check for details. */
1317 if (r->type != REDIS_REPLY_ARRAY ||
1318 r->elements != 3 ||
1319 r->element[0]->type != REDIS_REPLY_STRING ||
1320 r->element[1]->type != REDIS_REPLY_STRING ||
1321 r->element[2]->type != REDIS_REPLY_STRING ||
1322 strcmp(r->element[0]->str,"message") != 0) return;
1323
1324 /* We are not interested in meeting ourselves */
1325 if (strstr(r->element[2]->str,server.runid) != NULL) return;
1326
1327 {
1328 int numtokens, port, removed, canfailover;
1329 char **token = sdssplitlen(r->element[2]->str,
1330 r->element[2]->len,
1331 ":",1,&numtokens);
1332 sentinelRedisInstance *sentinel;
1333
1334 if (numtokens == 4) {
1335 /* First, try to see if we already have this sentinel. */
1336 port = atoi(token[1]);
1337 canfailover = atoi(token[3]);
1338 sentinel = getSentinelRedisInstanceByAddrAndRunID(
1339 ri->sentinels,token[0],port,token[2]);
1340
1341 if (!sentinel) {
1342 /* If not, remove all the sentinels that have the same runid
1343 * OR the same ip/port, because it's either a restart or a
1344 * network topology change. */
1345 removed = removeMatchingSentinelsFromMaster(ri,token[0],port,
1346 token[2]);
1347 if (removed) {
1348 sentinelEvent(REDIS_NOTICE,"-dup-sentinel",ri,
1349 "%@ #duplicate of %s:%d or %s",
1350 token[0],port,token[2]);
1351 }
1352
1353 /* Add the new sentinel. */
1354 sentinel = createSentinelRedisInstance(NULL,SRI_SENTINEL,
1355 token[0],port,ri->quorum,ri);
1356 if (sentinel) {
1357 sentinelEvent(REDIS_NOTICE,"+sentinel",sentinel,"%@");
1358 /* The runid is NULL after a new instance creation and
1359 * for Sentinels we don't have a later chance to fill it,
1360 * so do it now. */
1361 sentinel->runid = sdsnew(token[2]);
1362 }
1363 }
1364
1365 /* Update the state of the Sentinel. */
1366 if (sentinel) {
1367 sentinel->last_hello_time = mstime();
1368 if (canfailover)
1369 sentinel->flags |= SRI_CAN_FAILOVER;
1370 else
1371 sentinel->flags &= ~SRI_CAN_FAILOVER;
1372 }
1373 }
1374 sdsfreesplitres(token,numtokens);
1375 }
1376 }
1377
1378 void sentinelPingInstance(sentinelRedisInstance *ri) {
1379 mstime_t now = mstime();
1380 mstime_t info_period;
1381 int retval;
1382
1383 /* Return ASAP if we have already a PING or INFO already pending, or
1384 * in the case the instance is not properly connected. */
1385 if (ri->flags & SRI_DISCONNECTED) return;
1386
1387 /* For INFO, PING, PUBLISH that are not critical commands to send we
1388 * also have a limit of SENTINEL_MAX_PENDING_COMMANDS. We don't
1389 * want to use a lot of memory just because a link is not working
1390 * properly (note that anyway there is a redundant protection about this,
1391 * that is, the link will be disconnected and reconnected if a long
1392 * timeout condition is detected. */
1393 if (ri->pending_commands >= SENTINEL_MAX_PENDING_COMMANDS) return;
1394
1395 /* If this is a slave of a master in O_DOWN condition we start sending
1396 * it INFO every second, instead of the usual SENTINEL_INFO_PERIOD
1397 * period. In this state we want to closely monitor slaves in case they
1398 * are turned into masters by another Sentinel, or by the sysadmin. */
1399 if ((ri->flags & SRI_SLAVE) &&
1400 (ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) {
1401 info_period = 1000;
1402 } else {
1403 info_period = SENTINEL_INFO_PERIOD;
1404 }
1405
1406 if ((ri->flags & SRI_SENTINEL) == 0 &&
1407 (ri->info_refresh == 0 ||
1408 (now - ri->info_refresh) > info_period))
1409 {
1410 /* Send INFO to masters and slaves, not sentinels. */
1411 retval = redisAsyncCommand(ri->cc,
1412 sentinelInfoReplyCallback, NULL, "INFO");
1413 if (retval != REDIS_OK) return;
1414 ri->pending_commands++;
1415 } else if ((now - ri->last_pong_time) > SENTINEL_PING_PERIOD) {
1416 /* Send PING to all the three kinds of instances. */
1417 retval = redisAsyncCommand(ri->cc,
1418 sentinelPingReplyCallback, NULL, "PING");
1419 if (retval != REDIS_OK) return;
1420 ri->pending_commands++;
1421 } else if ((ri->flags & SRI_MASTER) &&
1422 (now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD)
1423 {
1424 /* PUBLISH hello messages only to masters. */
1425 struct sockaddr_in sa;
1426 socklen_t salen = sizeof(sa);
1427
1428 if (getsockname(ri->cc->c.fd,(struct sockaddr*)&sa,&salen) != -1) {
1429 char myaddr[128];
1430
1431 snprintf(myaddr,sizeof(myaddr),"%s:%d:%s:%d",
1432 inet_ntoa(sa.sin_addr), server.port, server.runid,
1433 (ri->flags & SRI_CAN_FAILOVER) != 0);
1434 retval = redisAsyncCommand(ri->cc,
1435 sentinelPublishReplyCallback, NULL, "PUBLISH %s %s",
1436 SENTINEL_HELLO_CHANNEL,myaddr);
1437 if (retval != REDIS_OK) return;
1438 ri->pending_commands++;
1439 }
1440 }
1441 }
1442
1443 /* =========================== SENTINEL command ============================= */
1444
1445 const char *sentinelFailoverStateStr(int state) {
1446 switch(state) {
1447 case SENTINEL_FAILOVER_STATE_NONE: return "none";
1448 case SENTINEL_FAILOVER_STATE_WAIT_START: return "wait_start";
1449 case SENTINEL_FAILOVER_STATE_SELECT_SLAVE: return "select_slave";
1450 case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE: return "send_slaveof_noone";
1451 case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION: return "wait_promotion";
1452 case SENTINEL_FAILOVER_STATE_RECONF_SLAVES: return "reconf_slaves";
1453 case SENTINEL_FAILOVER_STATE_ALERT_CLIENTS: return "alert_clients";
1454 case SENTINEL_FAILOVER_STATE_DETECT_END: return "detect_end";
1455 case SENTINEL_FAILOVER_STATE_UPDATE_CONFIG: return "update_config";
1456 default: return "unknown";
1457 }
1458 }
1459
1460 /* Redis instance to Redis protocol representation. */
1461 void addReplySentinelRedisInstance(redisClient *c, sentinelRedisInstance *ri) {
1462 char *flags = sdsempty();
1463 void *mbl;
1464 int fields = 0;
1465
1466 mbl = addDeferredMultiBulkLength(c);
1467
1468 addReplyBulkCString(c,"name");
1469 addReplyBulkCString(c,ri->name);
1470 fields++;
1471
1472 addReplyBulkCString(c,"ip");
1473 addReplyBulkCString(c,ri->addr->ip);
1474 fields++;
1475
1476 addReplyBulkCString(c,"port");
1477 addReplyBulkLongLong(c,ri->addr->port);
1478 fields++;
1479
1480 addReplyBulkCString(c,"runid");
1481 addReplyBulkCString(c,ri->runid ? ri->runid : "");
1482 fields++;
1483
1484 addReplyBulkCString(c,"flags");
1485 if (ri->flags & SRI_S_DOWN) flags = sdscat(flags,"s_down,");
1486 if (ri->flags & SRI_O_DOWN) flags = sdscat(flags,"o_down,");
1487 if (ri->flags & SRI_MASTER) flags = sdscat(flags,"master,");
1488 if (ri->flags & SRI_SLAVE) flags = sdscat(flags,"slave,");
1489 if (ri->flags & SRI_SENTINEL) flags = sdscat(flags,"sentinel,");
1490 if (ri->flags & SRI_DISCONNECTED) flags = sdscat(flags,"disconnected,");
1491 if (ri->flags & SRI_MASTER_DOWN) flags = sdscat(flags,"master_down,");
1492 if (ri->flags & SRI_FAILOVER_IN_PROGRESS)
1493 flags = sdscat(flags,"failover_in_progress,");
1494 if (ri->flags & SRI_I_AM_THE_LEADER)
1495 flags = sdscat(flags,"i_am_the_leader,");
1496 if (ri->flags & SRI_PROMOTED) flags = sdscat(flags,"promoted,");
1497 if (ri->flags & SRI_RECONF_SENT) flags = sdscat(flags,"reconf_sent,");
1498 if (ri->flags & SRI_RECONF_INPROG) flags = sdscat(flags,"reconf_inprog,");
1499 if (ri->flags & SRI_RECONF_DONE) flags = sdscat(flags,"reconf_done,");
1500
1501 if (sdslen(flags) != 0) flags = sdsrange(flags,0,-2); /* remove last "," */
1502 addReplyBulkCString(c,flags);
1503 sdsfree(flags);
1504 fields++;
1505
1506 addReplyBulkCString(c,"pending-commands");
1507 addReplyBulkLongLong(c,ri->pending_commands);
1508 fields++;
1509
1510 if (ri->flags & SRI_FAILOVER_IN_PROGRESS) {
1511 addReplyBulkCString(c,"failover-state");
1512 addReplyBulkCString(c,(char*)sentinelFailoverStateStr(ri->failover_state));
1513 fields++;
1514 }
1515
1516 addReplyBulkCString(c,"last-ok-ping-reply");
1517 addReplyBulkLongLong(c,mstime() - ri->last_avail_time);
1518 fields++;
1519
1520 addReplyBulkCString(c,"last-ping-reply");
1521 addReplyBulkLongLong(c,mstime() - ri->last_pong_time);
1522 fields++;
1523
1524 if (ri->flags & SRI_S_DOWN) {
1525 addReplyBulkCString(c,"s-down-time");
1526 addReplyBulkLongLong(c,mstime()-ri->s_down_since_time);
1527 fields++;
1528 }
1529
1530 if (ri->flags & SRI_O_DOWN) {
1531 addReplyBulkCString(c,"o-down-time");
1532 addReplyBulkLongLong(c,mstime()-ri->o_down_since_time);
1533 fields++;
1534 }
1535
1536 /* Masters and Slaves */
1537 if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
1538 addReplyBulkCString(c,"info-refresh");
1539 addReplyBulkLongLong(c,mstime() - ri->info_refresh);
1540 fields++;
1541 }
1542
1543 /* Only masters */
1544 if (ri->flags & SRI_MASTER) {
1545 addReplyBulkCString(c,"num-slaves");
1546 addReplyBulkLongLong(c,dictSize(ri->slaves));
1547 fields++;
1548
1549 addReplyBulkCString(c,"num-other-sentinels");
1550 addReplyBulkLongLong(c,dictSize(ri->sentinels));
1551 fields++;
1552
1553 addReplyBulkCString(c,"quorum");
1554 addReplyBulkLongLong(c,ri->quorum);
1555 fields++;
1556 }
1557
1558 /* Only slaves */
1559 if (ri->flags & SRI_SLAVE) {
1560 addReplyBulkCString(c,"master-link-down-time");
1561 addReplyBulkLongLong(c,ri->master_link_down_time);
1562 fields++;
1563
1564 addReplyBulkCString(c,"master-link-status");
1565 addReplyBulkCString(c,
1566 (ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP) ?
1567 "ok" : "err");
1568 fields++;
1569
1570 addReplyBulkCString(c,"master-host");
1571 addReplyBulkCString(c,
1572 ri->slave_master_host ? ri->slave_master_host : "?");
1573 fields++;
1574
1575 addReplyBulkCString(c,"master-port");
1576 addReplyBulkLongLong(c,ri->slave_master_port);
1577 fields++;
1578 }
1579
1580 /* Only sentinels */
1581 if (ri->flags & SRI_SENTINEL) {
1582 addReplyBulkCString(c,"last-hello-message");
1583 addReplyBulkLongLong(c,mstime() - ri->last_hello_time);
1584 fields++;
1585
1586 addReplyBulkCString(c,"can-failover-its-master");
1587 addReplyBulkLongLong(c,(ri->flags & SRI_CAN_FAILOVER) != 0);
1588 fields++;
1589
1590 if (ri->flags & SRI_MASTER_DOWN) {
1591 addReplyBulkCString(c,"subjective-leader");
1592 addReplyBulkCString(c,ri->leader ? ri->leader : "?");
1593 fields++;
1594 }
1595 }
1596
1597 setDeferredMultiBulkLength(c,mbl,fields*2);
1598 }
1599
1600 /* Output a number of instances contanined inside a dictionary as
1601 * Redis protocol. */
1602 void addReplyDictOfRedisInstances(redisClient *c, dict *instances) {
1603 dictIterator *di;
1604 dictEntry *de;
1605
1606 di = dictGetIterator(instances);
1607 addReplyMultiBulkLen(c,dictSize(instances));
1608 while((de = dictNext(di)) != NULL) {
1609 sentinelRedisInstance *ri = dictGetVal(de);
1610
1611 addReplySentinelRedisInstance(c,ri);
1612 }
1613 dictReleaseIterator(di);
1614 }
1615
1616 /* Lookup the named master into sentinel.masters.
1617 * If the master is not found reply to the client with an error and returns
1618 * NULL. */
1619 sentinelRedisInstance *sentinelGetMasterByNameOrReplyError(redisClient *c,
1620 robj *name)
1621 {
1622 sentinelRedisInstance *ri;
1623
1624 ri = dictFetchValue(sentinel.masters,c->argv[2]->ptr);
1625 if (!ri) {
1626 addReplyError(c,"No such master with that name");
1627 return NULL;
1628 }
1629 return ri;
1630 }
1631
1632 void sentinelCommand(redisClient *c) {
1633 if (!strcasecmp(c->argv[1]->ptr,"masters")) {
1634 /* SENTINEL MASTERS */
1635 if (c->argc != 2) goto numargserr;
1636
1637 addReplyDictOfRedisInstances(c,sentinel.masters);
1638 } else if (!strcasecmp(c->argv[1]->ptr,"slaves")) {
1639 /* SENTINEL SLAVES <master-name> */
1640 sentinelRedisInstance *ri;
1641
1642 if (c->argc != 3) goto numargserr;
1643 if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
1644 return;
1645 addReplyDictOfRedisInstances(c,ri->slaves);
1646 } else if (!strcasecmp(c->argv[1]->ptr,"sentinels")) {
1647 /* SENTINEL SENTINELS <master-name> */
1648 sentinelRedisInstance *ri;
1649
1650 if (c->argc != 3) goto numargserr;
1651 if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
1652 return;
1653 addReplyDictOfRedisInstances(c,ri->sentinels);
1654 } else if (!strcasecmp(c->argv[1]->ptr,"is-master-down-by-addr")) {
1655 /* SENTINEL IS-MASTER-DOWN-BY-ADDR <ip> <port> */
1656 sentinelRedisInstance *ri;
1657 char *leader = NULL;
1658 long port;
1659 int isdown = 0;
1660
1661 if (c->argc != 4) goto numargserr;
1662 if (getLongFromObjectOrReply(c,c->argv[3],&port,NULL) != REDIS_OK)
1663 return;
1664 ri = getSentinelRedisInstanceByAddrAndRunID(sentinel.masters,
1665 c->argv[2]->ptr,port,NULL);
1666
1667 /* It exists? Is actually a master? Is subjectively down? It's down.
1668 * Note: if we are in tilt mode we always reply with "0". */
1669 if (!sentinel.tilt && ri && (ri->flags & SRI_S_DOWN) &&
1670 (ri->flags & SRI_MASTER))
1671 isdown = 1;
1672 if (ri) leader = sentinelGetSubjectiveLeader(ri);
1673
1674 /* Reply with a two-elements multi-bulk reply: down state, leader. */
1675 addReplyMultiBulkLen(c,2);
1676 addReply(c, isdown ? shared.cone : shared.czero);
1677 addReplyBulkCString(c, leader ? leader : "?");
1678 if (leader) sdsfree(leader);
1679 } else if (!strcasecmp(c->argv[1]->ptr,"reset")) {
1680 /* SENTINEL RESET <pattern> */
1681 if (c->argc != 3) goto numargserr;
1682 addReplyLongLong(c,sentinelResetMastersByPattern(c->argv[2]->ptr,SENTINEL_GENERATE_EVENT));
1683 } else if (!strcasecmp(c->argv[1]->ptr,"get-master-addr-by-name")) {
1684 /* SENTINEL GET-MASTER-ADDR-BY-NAME <master-name> */
1685 sentinelRedisInstance *ri;
1686
1687 if (c->argc != 3) goto numargserr;
1688 ri = sentinelGetMasterByName(c->argv[2]->ptr);
1689 if (ri == NULL) {
1690 addReply(c,shared.nullmultibulk);
1691 } else {
1692 sentinelAddr *addr = ri->addr;
1693
1694 if ((ri->flags & SRI_FAILOVER_IN_PROGRESS) && ri->promoted_slave)
1695 addr = ri->promoted_slave->addr;
1696 addReplyMultiBulkLen(c,2);
1697 addReplyBulkCString(c,addr->ip);
1698 addReplyBulkLongLong(c,addr->port);
1699 }
1700 } else {
1701 addReplyErrorFormat(c,"Unknown sentinel subcommand '%s'",
1702 (char*)c->argv[1]->ptr);
1703 }
1704 return;
1705
1706 numargserr:
1707 addReplyErrorFormat(c,"Wrong number of commands for 'sentinel %s'",
1708 (char*)c->argv[1]->ptr);
1709 }
1710
1711 /* ===================== SENTINEL availability checks ======================= */
1712
1713 /* Is this instance down from our point of view? */
1714 void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
1715 mstime_t elapsed = mstime() - ri->last_avail_time;
1716
1717 /* Check if we are in need for a reconnection of one of the
1718 * links, because we are detecting low activity.
1719 *
1720 * 1) Check if the command link seems connected, was connected not less
1721 * than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have an
1722 * idle time that is greater than down_after_period / 2 seconds. */
1723 if (ri->cc &&
1724 (mstime() - ri->cc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
1725 (mstime() - ri->last_pong_time) > (ri->down_after_period/2))
1726 {
1727 sentinelKillLink(ri,ri->cc);
1728 }
1729
1730 /* 2) Check if the pubsub link seems connected, was connected not less
1731 * than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have no
1732 * activity in the Pub/Sub channel for more than
1733 * SENTINEL_PUBLISH_PERIOD * 3.
1734 */
1735 if (ri->pc &&
1736 (mstime() - ri->pc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
1737 (mstime() - ri->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3))
1738 {
1739 sentinelKillLink(ri,ri->pc);
1740 }
1741
1742 /* Update the subjectively down flag. */
1743 if (elapsed > ri->down_after_period) {
1744 /* Is subjectively down */
1745 if ((ri->flags & SRI_S_DOWN) == 0) {
1746 sentinelEvent(REDIS_WARNING,"+sdown",ri,"%@");
1747 ri->s_down_since_time = mstime();
1748 ri->flags |= SRI_S_DOWN;
1749 }
1750 } else {
1751 /* Is subjectively up */
1752 if (ri->flags & SRI_S_DOWN) {
1753 sentinelEvent(REDIS_WARNING,"-sdown",ri,"%@");
1754 ri->flags &= ~SRI_S_DOWN;
1755 }
1756 }
1757 }
1758
1759 /* Is this instance down accordingly to the configured quorum? */
1760 void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
1761 dictIterator *di;
1762 dictEntry *de;
1763 int quorum = 0, odown = 0;
1764
1765 if (master->flags & SRI_S_DOWN) {
1766 /* Is down for enough sentinels? */
1767 quorum = 1; /* the current sentinel. */
1768 /* Count all the other sentinels. */
1769 di = dictGetIterator(master->sentinels);
1770 while((de = dictNext(di)) != NULL) {
1771 sentinelRedisInstance *ri = dictGetVal(de);
1772
1773 if (ri->flags & SRI_MASTER_DOWN) quorum++;
1774 }
1775 dictReleaseIterator(di);
1776 if (quorum >= master->quorum) odown = 1;
1777 }
1778
1779 /* Set the flag accordingly to the outcome. */
1780 if (odown) {
1781 if ((master->flags & SRI_O_DOWN) == 0) {
1782 sentinelEvent(REDIS_WARNING,"+odown",master,"%@ #quorum %d/%d",
1783 quorum, master->quorum);
1784 master->flags |= SRI_O_DOWN;
1785 master->o_down_since_time = mstime();
1786 }
1787 } else {
1788 if (master->flags & SRI_O_DOWN) {
1789 sentinelEvent(REDIS_WARNING,"-odown",master,"%@");
1790 master->flags &= ~SRI_O_DOWN;
1791 }
1792 }
1793 }
1794
1795 /* Receive the SENTINEL is-master-down-by-addr reply, see the
1796 * sentinelAskMasterStateToOtherSentinels() function for more information. */
1797 void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) {
1798 sentinelRedisInstance *ri = c->data;
1799 redisReply *r;
1800
1801 if (ri) ri->pending_commands--;
1802 if (!reply || !ri) return;
1803 r = reply;
1804
1805 /* Ignore every error or unexpected reply.
1806 * Note that if the command returns an error for any reason we'll
1807 * end clearing the SRI_MASTER_DOWN flag for timeout anyway. */
1808 if (r->type == REDIS_REPLY_ARRAY && r->elements == 2 &&
1809 r->element[0]->type == REDIS_REPLY_INTEGER &&
1810 r->element[1]->type == REDIS_REPLY_STRING)
1811 {
1812 ri->last_master_down_reply_time = mstime();
1813 if (r->element[0]->integer == 1) {
1814 ri->flags |= SRI_MASTER_DOWN;
1815 } else {
1816 ri->flags &= ~SRI_MASTER_DOWN;
1817 }
1818 sdsfree(ri->leader);
1819 ri->leader = sdsnew(r->element[1]->str);
1820 }
1821 }
1822
1823 /* If we think (subjectively) the master is down, we start sending
1824 * SENTINEL IS-MASTER-DOWN-BY-ADDR requests to other sentinels
1825 * in order to get the replies that allow to reach the quorum and
1826 * possibly also mark the master as objectively down. */
1827 void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master) {
1828 dictIterator *di;
1829 dictEntry *de;
1830
1831 di = dictGetIterator(master->sentinels);
1832 while((de = dictNext(di)) != NULL) {
1833 sentinelRedisInstance *ri = dictGetVal(de);
1834 mstime_t elapsed = mstime() - ri->last_master_down_reply_time;
1835 char port[32];
1836 int retval;
1837
1838 /* If the master state from other sentinel is too old, we clear it. */
1839 if (elapsed > SENTINEL_INFO_VALIDITY_TIME) {
1840 ri->flags &= ~SRI_MASTER_DOWN;
1841 sdsfree(ri->leader);
1842 ri->leader = NULL;
1843 }
1844
1845 /* Only ask if master is down to other sentinels if:
1846 *
1847 * 1) We believe it is down, or there is a failover in progress.
1848 * 2) Sentinel is connected.
1849 * 3) We did not received the info within SENTINEL_ASK_PERIOD ms. */
1850 if ((master->flags & (SRI_S_DOWN|SRI_FAILOVER_IN_PROGRESS)) == 0)
1851 continue;
1852 if (ri->flags & SRI_DISCONNECTED) continue;
1853 if (mstime() - ri->last_master_down_reply_time < SENTINEL_ASK_PERIOD)
1854 continue;
1855
1856 /* Ask */
1857 ll2string(port,sizeof(port),master->addr->port);
1858 retval = redisAsyncCommand(ri->cc,
1859 sentinelReceiveIsMasterDownReply, NULL,
1860 "SENTINEL is-master-down-by-addr %s %s",
1861 master->addr->ip, port);
1862 if (retval == REDIS_OK) ri->pending_commands++;
1863 }
1864 dictReleaseIterator(di);
1865 }
1866
1867 /* =============================== FAILOVER ================================= */
1868
1869 /* Given a master get the "subjective leader", that is, among all the sentinels
1870 * with given characteristics, the one with the lexicographically smaller
1871 * runid. The characteristics required are:
1872 *
1873 * 1) Has SRI_CAN_FAILOVER flag.
1874 * 2) Is not disconnected.
1875 * 3) Recently answered to our ping (no longer than
1876 * SENTINEL_INFO_VALIDITY_TIME milliseconds ago).
1877 *
1878 * The function returns a pointer to an sds string representing the runid of the
1879 * leader sentinel instance (from our point of view). Otherwise NULL is
1880 * returned if there are no suitable sentinels.
1881 */
1882
/* qsort() comparison callback for an array of C strings (char*): 'a' and
 * 'b' point to the array elements. Strings are ordered case-insensitively
 * using strcasecmp(). */
int compareRunID(const void *a, const void *b) {
    const char *left = *(char * const *)a;
    const char *right = *(char * const *)b;

    return strcasecmp(left, right);
}
1887
1888 char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master) {
1889 dictIterator *di;
1890 dictEntry *de;
1891 char **instance =
1892 zmalloc(sizeof(char*)*(dictSize(master->sentinels)+1));
1893 int instances = 0;
1894 char *leader = NULL;
1895
1896 if (master->flags & SRI_CAN_FAILOVER) {
1897 /* Add myself if I'm a Sentinel that can failover this master. */
1898 instance[instances++] = server.runid;
1899 }
1900
1901 di = dictGetIterator(master->sentinels);
1902 while((de = dictNext(di)) != NULL) {
1903 sentinelRedisInstance *ri = dictGetVal(de);
1904 mstime_t lag = mstime() - ri->last_avail_time;
1905
1906 if (lag > SENTINEL_INFO_VALIDITY_TIME ||
1907 !(ri->flags & SRI_CAN_FAILOVER) ||
1908 (ri->flags & SRI_DISCONNECTED) ||
1909 ri->runid == NULL)
1910 continue;
1911 instance[instances++] = ri->runid;
1912 }
1913 dictReleaseIterator(di);
1914
1915 /* If we have at least one instance passing our checks, order the array
1916 * by runid. */
1917 if (instances) {
1918 qsort(instance,instances,sizeof(char*),compareRunID);
1919 leader = sdsnew(instance[0]);
1920 }
1921 zfree(instance);
1922 return leader;
1923 }
1924
/* A leader candidate paired with the number of votes it received. */
struct sentinelLeader {
    char *runid;            /* Run id of the candidate. */
    unsigned long votes;    /* Votes counted for this run id. */
};
1929
1930 /* Helper function for sentinelGetObjectiveLeader, increment the counter
1931 * relative to the specified runid. */
1932 void sentinelObjectiveLeaderIncr(dict *counters, char *runid) {
1933 dictEntry *de = dictFind(counters,runid);
1934 uint64_t oldval;
1935
1936 if (de) {
1937 oldval = dictGetUnsignedIntegerVal(de);
1938 dictSetUnsignedIntegerVal(de,oldval+1);
1939 } else {
1940 de = dictAddRaw(counters,runid);
1941 redisAssert(de != NULL);
1942 dictSetUnsignedIntegerVal(de,1);
1943 }
1944 }
1945
1946 /* Scan all the Sentinels attached to this master to check what is the
1947 * most voted leader among Sentinels. */
1948 char *sentinelGetObjectiveLeader(sentinelRedisInstance *master) {
1949 dict *counters;
1950 dictIterator *di;
1951 dictEntry *de;
1952 unsigned int voters = 0, voters_quorum;
1953 char *myvote;
1954 char *winner = NULL;
1955
1956 redisAssert(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS));
1957 counters = dictCreate(&leaderVotesDictType,NULL);
1958
1959 /* Count my vote. */
1960 myvote = sentinelGetSubjectiveLeader(master);
1961 if (myvote) {
1962 sentinelObjectiveLeaderIncr(counters,myvote);
1963 voters++;
1964 }
1965
1966 /* Count other sentinels votes */
1967 di = dictGetIterator(master->sentinels);
1968 while((de = dictNext(di)) != NULL) {
1969 sentinelRedisInstance *ri = dictGetVal(de);
1970 if (ri->leader == NULL) continue;
1971 /* If the failover is not already in progress we are only interested
1972 * in Sentinels that believe the master is down. Otherwise the leader
1973 * selection is useful for the "failover-takedown" when the original
1974 * leader fails. In that case we consider all the voters. */
1975 if (!(master->flags & SRI_FAILOVER_IN_PROGRESS) &&
1976 !(ri->flags & SRI_MASTER_DOWN)) continue;
1977 sentinelObjectiveLeaderIncr(counters,ri->leader);
1978 voters++;
1979 }
1980 dictReleaseIterator(di);
1981 voters_quorum = voters/2+1;
1982
1983 /* Check what's the winner. For the winner to win, it needs two conditions:
1984 * 1) Absolute majority between voters (50% + 1).
1985 * 2) And anyway at least master->quorum votes. */
1986 {
1987 uint64_t max_votes = 0; /* Max votes so far. */
1988
1989 di = dictGetIterator(counters);
1990 while((de = dictNext(di)) != NULL) {
1991 uint64_t votes = dictGetUnsignedIntegerVal(de);
1992
1993 if (max_votes < votes) {
1994 max_votes = votes;
1995 winner = dictGetKey(de);
1996 }
1997 }
1998 dictReleaseIterator(di);
1999 if (winner && (max_votes < voters_quorum || max_votes < master->quorum))
2000 winner = NULL;
2001 }
2002 winner = winner ? sdsnew(winner) : NULL;
2003 sdsfree(myvote);
2004 dictRelease(counters);
2005 return winner;
2006 }
2007
2008 /* This function checks if there are the conditions to start the failover,
2009 * that is:
2010 *
2011 * 1) Enough time has passed since O_DOWN.
2012 * 2) The master is marked as SRI_CAN_FAILOVER, so we can failover it.
2013 * 3) We are the objectively leader for this master.
2014 *
2015 * If the conditions are met we flag the master as SRI_FAILOVER_IN_PROGRESS
2016 * and SRI_I_AM_THE_LEADER.
2017 */
2018 void sentinelStartFailover(sentinelRedisInstance *master) {
2019 char *leader;
2020 int isleader;
2021
2022 /* We can't failover if the master is not in O_DOWN state or if
2023 * there is not already a failover in progress (to perform the
2024 * takedown if the leader died) or if this Sentinel is not allowed
2025 * to start a failover. */
2026 if (!(master->flags & SRI_CAN_FAILOVER) ||
2027 !(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) return;
2028
2029 leader = sentinelGetObjectiveLeader(master);
2030 isleader = leader && strcasecmp(leader,server.runid) == 0;
2031 sdsfree(leader);
2032
2033 /* If I'm not the leader, I can't failover for sure. */
2034 if (!isleader) return;
2035
2036 /* If the failover is already in progress there are two options... */
2037 if (master->flags & SRI_FAILOVER_IN_PROGRESS) {
2038 if (master->flags & SRI_I_AM_THE_LEADER) {
2039 /* 1) I'm flagged as leader so I already started the failover.
2040 * Just return. */
2041 return;
2042 } else {
2043 mstime_t elapsed = mstime() - master->failover_state_change_time;
2044
2045 /* 2) I'm the new leader, but I'm not flagged as leader in the
2046 * master: I did not started the failover, but the original
2047 * leader has no longer the leadership.
2048 *
2049 * In this case if the failover appears to be lagging
2050 * for at least 25% of the configured failover timeout,
2051 * I can assume I can take control. Otherwise
2052 * it's better to return and wait more. */
2053 if (elapsed < (master->failover_timeout/4)) return;
2054 sentinelEvent(REDIS_WARNING,"+failover-takedown",master,"%@");
2055 /* We have already an elected slave if we are in
2056 * FAILOVER_IN_PROGRESS state, that is, the slave that we
2057 * observed turning into a master. */
2058 master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
2059 /* As an observer we flagged all the slaves as RECONF_SENT but
2060 * now we are in charge of actually sending the reconfiguration
2061 * command so let's clear this flag for all the instances. */
2062 sentinelDelFlagsToDictOfRedisInstances(master->slaves,
2063 SRI_RECONF_SENT);
2064 }
2065 } else {
2066 /* Brand new failover as SRI_FAILOVER_IN_PROGRESS was not set.
2067 *
2068 * Do we have a slave to promote? Otherwise don't start a failover
2069 * at all. */
2070 if (sentinelSelectSlave(master) == NULL) return;
2071 master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;
2072 }
2073
2074 master->flags |= SRI_FAILOVER_IN_PROGRESS|SRI_I_AM_THE_LEADER;
2075 sentinelEvent(REDIS_WARNING,"+failover-triggered",master,"%@");
2076
2077 /* Pick a random delay if it's a fresh failover (WAIT_START), and not
2078 * a recovery of a failover started by another sentinel. */
2079 if (master->failover_state == SENTINEL_FAILOVER_STATE_WAIT_START) {
2080 master->failover_start_time = mstime() +
2081 SENTINEL_FAILOVER_FIXED_DELAY +
2082 (rand() % SENTINEL_FAILOVER_MAX_RANDOM_DELAY);
2083 sentinelEvent(REDIS_WARNING,"+failover-state-wait-start",master,
2084 "%@ #starting in %lld milliseconds",
2085 master->failover_start_time-mstime());
2086 }
2087 master->failover_state_change_time = mstime();
2088 }
2089
2090 /* Select a suitable slave to promote. The current algorithm only uses
2091 * the following parameters:
2092 *
2093 * 1) None of the following conditions: S_DOWN, O_DOWN, DISCONNECTED.
2094 * 2) last_avail_time more recent than SENTINEL_INFO_VALIDITY_TIME.
2095 * 3) info_refresh more recent than SENTINEL_INFO_VALIDITY_TIME.
2096 * 4) master_link_down_time no more than:
2097 * (now - master->s_down_since_time) + (master->down_after_period * 10).
2098 *
2099 * Among all the slaves matching the above conditions we select the slave
2100 * with lower slave_priority. If priority is the same we select the slave
2101 * with lexicographically smaller runid.
2102 *
2103 * The function returns the pointer to the selected slave, otherwise
2104 * NULL if no suitable slave was found.
2105 */
2106
2107 int compareSlavesForPromotion(const void *a, const void *b) {
2108 sentinelRedisInstance **sa = (sentinelRedisInstance **)a,
2109 **sb = (sentinelRedisInstance **)b;
2110 if ((*sa)->slave_priority != (*sb)->slave_priority)
2111 return (*sa)->slave_priority - (*sb)->slave_priority;
2112 return strcasecmp((*sa)->runid,(*sb)->runid);
2113 }
2114
2115 sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) {
2116 sentinelRedisInstance **instance =
2117 zmalloc(sizeof(instance[0])*dictSize(master->slaves));
2118 sentinelRedisInstance *selected = NULL;
2119 int instances = 0;
2120 dictIterator *di;
2121 dictEntry *de;
2122 mstime_t max_master_down_time;
2123
2124 max_master_down_time = (mstime() - master->s_down_since_time) +
2125 (master->down_after_period * 10);
2126
2127 di = dictGetIterator(master->slaves);
2128 while((de = dictNext(di)) != NULL) {
2129 sentinelRedisInstance *slave = dictGetVal(de);
2130 mstime_t info_validity_time = mstime()-SENTINEL_INFO_VALIDITY_TIME;
2131
2132 if (slave->flags & (SRI_S_DOWN|SRI_O_DOWN|SRI_DISCONNECTED)) continue;
2133 if (slave->last_avail_time < info_validity_time) continue;
2134 if (slave->info_refresh < info_validity_time) continue;
2135 if (slave->master_link_down_time > max_master_down_time) continue;
2136 instance[instances++] = slave;
2137 }
2138 dictReleaseIterator(di);
2139 if (instances) {
2140 qsort(instance,instances,sizeof(sentinelRedisInstance*),
2141 compareSlavesForPromotion);
2142 selected = instance[0];
2143 }
2144 zfree(instance);
2145 return selected;
2146 }
2147
2148 /* ---------------- Failover state machine implementation ------------------- */
2149 void sentinelFailoverWaitStart(sentinelRedisInstance *ri) {
2150 if (mstime() >= ri->failover_start_time) {
2151 ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
2152 ri->failover_state_change_time = mstime();
2153 sentinelEvent(REDIS_WARNING,"+failover-state-select-slave",ri,"%@");
2154 }
2155 }
2156
2157 void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) {
2158 sentinelRedisInstance *slave = sentinelSelectSlave(ri);
2159
2160 if (slave == NULL) {
2161 sentinelEvent(REDIS_WARNING,"-failover-abort-no-good-slave",ri,"%@");
2162 sentinelAbortFailover(ri);
2163 } else {
2164 sentinelEvent(REDIS_WARNING,"+selected-slave",slave,"%@");
2165 slave->flags |= SRI_PROMOTED;
2166 ri->promoted_slave = slave;
2167 ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE;
2168 ri->failover_state_change_time = mstime();
2169 sentinelEvent(REDIS_NOTICE,"+failover-state-send-slaveof-noone",
2170 slave, "%@");
2171 }
2172 }
2173
2174 void sentinelFailoverSendSlaveOfNoOne(sentinelRedisInstance *ri) {
2175 int retval;
2176
2177 if (ri->promoted_slave->flags & SRI_DISCONNECTED) return;
2178
2179 /* Send SLAVEOF NO ONE command to turn the slave into a master.
2180 * We actually register a generic callback for this command as we don't
2181 * really care about the reply. We check if it worked indirectly observing
2182 * if INFO returns a different role (master instead of slave). */
2183 retval = redisAsyncCommand(ri->promoted_slave->cc,
2184 sentinelDiscardReplyCallback, NULL, "SLAVEOF NO ONE");
2185 if (retval != REDIS_OK) return;
2186 ri->promoted_slave->pending_commands++;
2187 sentinelEvent(REDIS_NOTICE, "+failover-state-wait-promotion",
2188 ri->promoted_slave,"%@");
2189 ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION;
2190 ri->failover_state_change_time = mstime();
2191 }
2192
2193 /* We actually wait for promotion indirectly checking with INFO when the
2194 * slave turns into a master. */
2195 void sentinelFailoverWaitPromotion(sentinelRedisInstance *ri) {
2196 mstime_t elapsed = mstime() - ri->failover_state_change_time;
2197
2198 if (elapsed >= SENTINEL_PROMOTION_RETRY_PERIOD) {
2199 sentinelEvent(REDIS_WARNING,"-promotion-timeout",ri->promoted_slave,
2200 "%@");
2201 sentinelEvent(REDIS_WARNING,"+failover-state-select-slave",ri,"%@");
2202 ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
2203 ri->failover_state_change_time = mstime();
2204 ri->promoted_slave->flags &= ~SRI_PROMOTED;
2205 ri->promoted_slave = NULL;
2206 }
2207 }
2208
2209 void sentinelFailoverDetectEnd(sentinelRedisInstance *master) {
2210 int not_reconfigured = 0, timeout = 0;
2211 dictIterator *di;
2212 dictEntry *de;
2213 mstime_t elapsed = mstime() - master->failover_state_change_time;
2214
2215 /* We can't consider failover finished if the promoted slave is
2216 * not reachable. */
2217 if (master->promoted_slave == NULL ||
2218 master->promoted_slave->flags & SRI_S_DOWN) return;
2219
2220 /* The failover terminates once all the reachable slaves are properly
2221 * configured. */
2222 di = dictGetIterator(master->slaves);
2223 while((de = dictNext(di)) != NULL) {
2224 sentinelRedisInstance *slave = dictGetVal(de);
2225
2226 if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
2227 if (slave->flags & SRI_S_DOWN) continue;
2228 not_reconfigured++;
2229 }
2230 dictReleaseIterator(di);
2231
2232 /* Force end of failover on timeout. */
2233 if (elapsed > master->failover_timeout) {
2234 not_reconfigured = 0;
2235 timeout = 1;
2236 sentinelEvent(REDIS_WARNING,"+failover-end-for-timeout",master,"%@");
2237 }
2238
2239 if (not_reconfigured == 0) {
2240 sentinelEvent(REDIS_WARNING,"+failover-end",master,"%@");
2241 master->failover_state = SENTINEL_FAILOVER_STATE_UPDATE_CONFIG;
2242 master->failover_state_change_time = mstime();
2243 }
2244
2245 /* If I'm the leader it is a good idea to send a best effort SLAVEOF
2246 * command to all the slaves still not reconfigured to replicate with
2247 * the new master. */
2248 if (timeout && (master->flags & SRI_I_AM_THE_LEADER)) {
2249 dictIterator *di;
2250 dictEntry *de;
2251 char master_port[32];
2252
2253 ll2string(master_port,sizeof(master_port),
2254 master->promoted_slave->addr->port);
2255
2256 di = dictGetIterator(master->slaves);
2257 while((de = dictNext(di)) != NULL) {
2258 sentinelRedisInstance *slave = dictGetVal(de);
2259 int retval;
2260
2261 if (slave->flags &
2262 (SRI_RECONF_DONE|SRI_RECONF_SENT|SRI_DISCONNECTED)) continue;
2263
2264 retval = redisAsyncCommand(slave->cc,
2265 sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
2266 master->promoted_slave->addr->ip,
2267 master_port);
2268 if (retval == REDIS_OK) {
2269 sentinelEvent(REDIS_NOTICE,"+slave-reconf-sent-be",slave,"%@");
2270 slave->flags |= SRI_RECONF_SENT;
2271 }
2272 }
2273 dictReleaseIterator(di);
2274 }
2275 }
2276
2277 /* Send SLAVE OF <new master address> to all the remaining slaves that
2278 * still don't appear to have the configuration updated. */
2279 void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) {
2280 dictIterator *di;
2281 dictEntry *de;
2282 int in_progress = 0;
2283
2284 di = dictGetIterator(master->slaves);
2285 while((de = dictNext(di)) != NULL) {
2286 sentinelRedisInstance *slave = dictGetVal(de);
2287
2288 if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG))
2289 in_progress++;
2290 }
2291 dictReleaseIterator(di);
2292
2293 di = dictGetIterator(master->slaves);
2294 while(in_progress < master->parallel_syncs &&
2295 (de = dictNext(di)) != NULL)
2296 {
2297 sentinelRedisInstance *slave = dictGetVal(de);
2298 int retval;
2299 char master_port[32];
2300
2301 /* Skip the promoted slave, and already configured slaves. */
2302 if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
2303
2304 /* Clear the SRI_RECONF_SENT flag if too much time elapsed without
2305 * the slave moving forward to the next state. */
2306 if ((slave->flags & SRI_RECONF_SENT) &&
2307 (mstime() - slave->slave_reconf_sent_time) >
2308 SENTINEL_SLAVE_RECONF_RETRY_PERIOD)
2309 {
2310 sentinelEvent(REDIS_NOTICE,"-slave-reconf-sent-timeout",slave,"%@");
2311 slave->flags &= ~SRI_RECONF_SENT;
2312 }
2313
2314 /* Nothing to do for instances that are disconnected or already
2315 * in RECONF_SENT state. */
2316 if (slave->flags & (SRI_DISCONNECTED|SRI_RECONF_SENT|SRI_RECONF_INPROG))
2317 continue;
2318
2319 /* Send SLAVEOF <new master>. */
2320 ll2string(master_port,sizeof(master_port),
2321 master->promoted_slave->addr->port);
2322 retval = redisAsyncCommand(slave->cc,
2323 sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
2324 master->promoted_slave->addr->ip,
2325 master_port);
2326 if (retval == REDIS_OK) {
2327 slave->flags |= SRI_RECONF_SENT;
2328 slave->pending_commands++;
2329 slave->slave_reconf_sent_time = mstime();
2330 sentinelEvent(REDIS_NOTICE,"+slave-reconf-sent",slave,"%@");
2331 in_progress++;
2332 }
2333 }
2334 dictReleaseIterator(di);
2335 sentinelFailoverDetectEnd(master);
2336 }
2337
2338 /* This function is called when the slave is in
2339 * SENTINEL_FAILOVER_STATE_UPDATE_CONFIG state. In this state we need
2340 * to remove it from the master table and add the promoted slave instead.
2341 *
2342 * If there are no promoted slaves as this instance is unique, we remove
2343 * and re-add it with the same address to trigger a complete state
2344 * refresh. */
2345 void sentinelFailoverSwitchToPromotedSlave(sentinelRedisInstance *master) {
2346 sentinelRedisInstance *ref = master->promoted_slave ?
2347 master->promoted_slave : master;
2348
2349 sentinelEvent(REDIS_WARNING,"+switch-master",master,"%s %s %d %s %d",
2350 master->name, master->addr->ip, master->addr->port,
2351 ref->addr->ip, ref->addr->port);
2352
2353 sentinelResetMasterAndChangeAddress(master,ref->addr->ip,ref->addr->port);
2354 }
2355
2356 void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
2357 redisAssert(ri->flags & SRI_MASTER);
2358
2359 if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;
2360
2361 switch(ri->failover_state) {
2362 case SENTINEL_FAILOVER_STATE_WAIT_START:
2363 sentinelFailoverWaitStart(ri);
2364 break;
2365 case SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
2366 sentinelFailoverSelectSlave(ri);
2367 break;
2368 case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:
2369 sentinelFailoverSendSlaveOfNoOne(ri);
2370 break;
2371 case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
2372 sentinelFailoverWaitPromotion(ri);
2373 break;
2374 case SENTINEL_FAILOVER_STATE_RECONF_SLAVES:
2375 sentinelFailoverReconfNextSlave(ri);
2376 break;
2377 case SENTINEL_FAILOVER_STATE_DETECT_END:
2378 sentinelFailoverDetectEnd(ri);
2379 break;
2380 }
2381 }
2382
2383 /* Abort a failover in progress with the following steps:
2384 * 1) If this instance is the leaer send a SLAVEOF command to all the already
2385 * reconfigured slaves if any to configure them to replicate with the
2386 * original master.
2387 * 2) For both leaders and observers: clear the failover flags and state in
2388 * the master instance.
2389 * 3) If there is already a promoted slave and we are the leader, and this
2390 * slave is not DISCONNECTED, try to reconfigure it to replicate
2391 * back to the master as well, sending a best effort SLAVEOF command.
2392 */
2393 void sentinelAbortFailover(sentinelRedisInstance *ri) {
2394 char master_port[32];
2395 dictIterator *di;
2396 dictEntry *de;
2397
2398 redisAssert(ri->flags & SRI_FAILOVER_IN_PROGRESS);
2399 ll2string(master_port,sizeof(master_port),ri->addr->port);
2400
2401 /* Clear failover related flags from slaves.
2402 * Also if we are the leader make sure to send SLAVEOF commands to all the
2403 * already reconfigured slaves in order to turn them back into slaves of
2404 * the original master. */
2405 di = dictGetIterator(ri->slaves);
2406 while((de = dictNext(di)) != NULL) {
2407 sentinelRedisInstance *slave = dictGetVal(de);
2408 if ((ri->flags & SRI_I_AM_THE_LEADER) &&
2409 !(slave->flags & SRI_DISCONNECTED) &&
2410 (slave->flags & (SRI_PROMOTED|SRI_RECONF_SENT|SRI_RECONF_INPROG|
2411 SRI_RECONF_DONE)))
2412 {
2413 int retval;
2414
2415 retval = redisAsyncCommand(slave->cc,
2416 sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
2417 ri->addr->ip,
2418 master_port);
2419 if (retval == REDIS_OK)
2420 sentinelEvent(REDIS_NOTICE,"-slave-reconf-undo",slave,"%@");
2421 }
2422 slave->flags &= ~(SRI_RECONF_SENT|SRI_RECONF_INPROG|SRI_RECONF_DONE);
2423 }
2424 dictReleaseIterator(di);
2425
2426 ri->flags &= ~(SRI_FAILOVER_IN_PROGRESS|SRI_I_AM_THE_LEADER);
2427 ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
2428 ri->failover_state_change_time = mstime();
2429 if (ri->promoted_slave) {
2430 ri->promoted_slave->flags &= ~SRI_PROMOTED;
2431 ri->promoted_slave = NULL;
2432 }
2433 }
2434
2435 /* The following is called only for master instances and will abort the
2436 * failover process if:
2437 *
2438 * 1) The failover is in progress.
2439 * 2) We already promoted a slave.
2440 * 3) The promoted slave is in extended SDOWN condition.
2441 */
2442 void sentinelAbortFailoverIfNeeded(sentinelRedisInstance *ri) {
2443 /* Failover is in progress? Do we have a promoted slave? */
2444 if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS) || !ri->promoted_slave) return;
2445
2446 /* Is the promoted slave into an extended SDOWN state? */
2447 if (!(ri->promoted_slave->flags & SRI_S_DOWN) ||
2448 (mstime() - ri->promoted_slave->s_down_since_time) <
2449 (ri->down_after_period * SENTINEL_EXTENDED_SDOWN_MULTIPLIER)) return;
2450
2451 sentinelEvent(REDIS_WARNING,"-failover-abort-x-sdown",ri->promoted_slave,"%@");
2452 sentinelAbortFailover(ri);
2453 }
2454
/* ======================== SENTINEL timer handler ==========================
 * This is the "main" of our Sentinel, since Sentinel is completely non
 * blocking in design. The function is called every second.
 * -------------------------------------------------------------------------- */
2459
2460 /* Perform scheduled operations for the specified Redis instance. */
2461 void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
2462 /* ========== MONITORING HALF ============ */
2463 /* Every kind of instance */
2464 sentinelReconnectInstance(ri);
2465 sentinelPingInstance(ri);
2466
2467 /* Masters and slaves */
2468 if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
2469 /* Nothing so far. */
2470 }
2471
2472 /* Only masters */
2473 if (ri->flags & SRI_MASTER) {
2474 sentinelAskMasterStateToOtherSentinels(ri);
2475 }
2476
2477 /* ============== ACTING HALF ============= */
2478 /* We don't proceed with the acting half if we are in TILT mode.
2479 * TILT happens when we find something odd with the time, like a
2480 * sudden change in the clock. */
2481 if (sentinel.tilt) {
2482 if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return;
2483 sentinel.tilt = 0;
2484 sentinelEvent(REDIS_WARNING,"-tilt",NULL,"#tilt mode exited");
2485 }
2486
2487 /* Every kind of instance */
2488 sentinelCheckSubjectivelyDown(ri);
2489
2490 /* Masters and slaves */
2491 if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
2492 /* Nothing so far. */
2493 }
2494
2495 /* Only masters */
2496 if (ri->flags & SRI_MASTER) {
2497 sentinelCheckObjectivelyDown(ri);
2498 sentinelStartFailover(ri);
2499 sentinelFailoverStateMachine(ri);
2500 sentinelAbortFailoverIfNeeded(ri);
2501 }
2502 }
2503
2504 /* Perform scheduled operations for all the instances in the dictionary.
2505 * Recursively call the function against dictionaries of slaves. */
2506 void sentinelHandleDictOfRedisInstances(dict *instances) {
2507 dictIterator *di;
2508 dictEntry *de;
2509 sentinelRedisInstance *switch_to_promoted = NULL;
2510
2511 /* There are a number of things we need to perform against every master. */
2512 di = dictGetIterator(instances);
2513 while((de = dictNext(di)) != NULL) {
2514 sentinelRedisInstance *ri = dictGetVal(de);
2515
2516 sentinelHandleRedisInstance(ri);
2517 if (ri->flags & SRI_MASTER) {
2518 sentinelHandleDictOfRedisInstances(ri->slaves);
2519 sentinelHandleDictOfRedisInstances(ri->sentinels);
2520 if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) {
2521 switch_to_promoted = ri;
2522 }
2523 }
2524 }
2525 if (switch_to_promoted)
2526 sentinelFailoverSwitchToPromotedSlave(switch_to_promoted);
2527 dictReleaseIterator(di);
2528 }
2529
2530 /* This function checks if we need to enter the TITL mode.
2531 *
2532 * The TILT mode is entered if we detect that between two invocations of the
2533 * timer interrupt, a negative amount of time, or too much time has passed.
2534 * Note that we expect that more or less just 100 milliseconds will pass
2535 * if everything is fine. However we'll see a negative number or a
2536 * difference bigger than SENTINEL_TILT_TRIGGER milliseconds if one of the
2537 * following conditions happen:
2538 *
2539 * 1) The Sentiel process for some time is blocked, for every kind of
2540 * random reason: the load is huge, the computer was freezed for some time
2541 * in I/O or alike, the process was stopped by a signal. Everything.
2542 * 2) The system clock was altered significantly.
2543 *
2544 * Under both this conditions we'll see everything as timed out and failing
2545 * without good reasons. Instead we enter the TILT mode and wait
2546 * for SENTIENL_TILT_PERIOD to elapse before starting to act again.
2547 *
2548 * During TILT time we still collect information, we just do not act. */
2549 void sentinelCheckTiltCondition(void) {
2550 mstime_t now = mstime();
2551 mstime_t delta = now - sentinel.previous_time;
2552
2553 if (delta < 0 || delta > SENTINEL_TILT_TRIGGER) {
2554 sentinel.tilt = 1;
2555 sentinel.tilt_start_time = mstime();
2556 sentinelEvent(REDIS_WARNING,"+tilt",NULL,"#tilt mode entered");
2557 }
2558 sentinel.previous_time = mstime();
2559 }
2560
2561 /* Handle terminated childs resulting from calls to notifications and client
2562 * reconfigurations scripts. */
2563 void sentinelHandleChildren(void) {
2564 int statloc;
2565 pid_t pid;
2566
2567 if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
2568 int exitcode = WEXITSTATUS(statloc);
2569 int bysignal = 0;
2570
2571 if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
2572 sentinelEvent(REDIS_DEBUG,"-child",NULL,"%ld %d %d",
2573 (long)pid, exitcode, bysignal);
2574
2575 /* TODO: remove client reconfiguration scripts from the queue. */
2576 }
2577 }
2578
2579 void sentinelTimer(void) {
2580 sentinelCheckTiltCondition();
2581 sentinelHandleDictOfRedisInstances(sentinel.masters);
2582 sentinelHandleChildren();
2583 }
2584