]> git.saurik.com Git - redis.git/blob - src/sentinel.c
Sentinel: ability to execute notification scripts.
[redis.git] / src / sentinel.c
1 /* Redis Sentinel implementation
2 * -----------------------------
3 *
4 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * * Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * * Neither the name of Redis nor the names of its contributors may be used
16 * to endorse or promote products derived from this software without
17 * specific prior written permission.
18 *
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 * POSSIBILITY OF SUCH DAMAGE.
30 */
31
32 #include "redis.h"
33 #include "hiredis.h"
34 #include "async.h"
35
36 #include <ctype.h>
37 #include <arpa/inet.h>
38 #include <sys/socket.h>
39
40 extern char **environ;
41
/* TCP port that initSentinelConfig() stores into server.port. */
#define REDIS_SENTINEL_PORT 26379
43
44 /* ======================== Sentinel global state =========================== */
45
/* Type used for every millisecond timestamp / duration in this file. */
typedef long long mstime_t;
47
/* An ip:port pair describing a network endpoint. */
typedef struct sentinelAddr {
    char *ip;   /* Address string; allocated with sdsnew() by
                   createSentinelAddr(), freed with sdsfree(). */
    int port;   /* TCP port number. */
} sentinelAddr;
53
/* Bit flags stored in sentinelRedisInstance->flags. */

/* Kind of instance being monitored. */
#define SRI_MASTER (1<<0)
#define SRI_SLAVE (1<<1)
#define SRI_SENTINEL (1<<2)

/* Link / availability state. */
#define SRI_DISCONNECTED (1<<3)
#define SRI_S_DOWN (1<<4)       /* Subjectively down (no quorum). */
#define SRI_O_DOWN (1<<5)       /* Objectively down (quorum reached). */
#define SRI_MASTER_DOWN (1<<6)  /* Set on a Sentinel that believes its
                                   master is down. */

/* SRI_CAN_FAILOVER has two meanings depending on the instance kind:
 * - on an SRI_MASTER instance: we are allowed to fail over this master;
 * - on an SRI_SENTINEL instance: that Sentinel is allowed to fail over
 *   its master. */
#define SRI_CAN_FAILOVER (1<<7)

/* Failover bookkeeping. */
#define SRI_FAILOVER_IN_PROGRESS (1<<8) /* A failover of this master is
                                           in progress. */
#define SRI_I_AM_THE_LEADER (1<<9)      /* We are the leader for this master. */
#define SRI_PROMOTED (1<<10)            /* Slave selected for promotion. */
#define SRI_RECONF_SENT (1<<11)         /* SLAVEOF <newmaster> sent. */
#define SRI_RECONF_INPROG (1<<12)       /* Slave synchronization in progress. */
#define SRI_RECONF_DONE (1<<13)         /* Slave synchronized with new master. */

/* Default periods, thresholds and limits.
 *
 * SENTINEL_DOWN_AFTER_PERIOD and SENTINEL_DEFAULT_FAILOVER_TIMEOUT are
 * assigned to mstime_t fields and are therefore milliseconds; the other
 * *_PERIOD / *_DELAY values are presumably milliseconds too -- confirm at
 * their use sites. */
#define SENTINEL_INFO_PERIOD 10000
#define SENTINEL_PING_PERIOD 1000
#define SENTINEL_ASK_PERIOD 1000
#define SENTINEL_PUBLISH_PERIOD 5000
#define SENTINEL_DOWN_AFTER_PERIOD 30000
#define SENTINEL_HELLO_CHANNEL "__sentinel__:hello"
#define SENTINEL_TILT_TRIGGER 2000
#define SENTINEL_TILT_PERIOD (SENTINEL_PING_PERIOD*30)
#define SENTINEL_DEFAULT_SLAVE_PRIORITY 100
#define SENTINEL_PROMOTION_RETRY_PERIOD 30000
#define SENTINEL_SLAVE_RECONF_RETRY_PERIOD 10000
#define SENTINEL_DEFAULT_PARALLEL_SYNCS 1
#define SENTINEL_MIN_LINK_RECONNECT_PERIOD 15000
#define SENTINEL_DEFAULT_FAILOVER_TIMEOUT (60*15*1000)
#define SENTINEL_MAX_PENDING_COMMANDS 100
#define SENTINEL_EXTENDED_SDOWN_MULTIPLIER 10

/* For how many milliseconds a received piece of information stays valid.
 * This applies, for instance, to the replies to
 * SENTINEL IS-MASTER-DOWN-BY-ADDR. */
#define SENTINEL_INFO_VALIDITY_TIME 5000
#define SENTINEL_FAILOVER_FIXED_DELAY 5000
#define SENTINEL_FAILOVER_MAX_RANDOM_DELAY 10000
98
/* States of the failover state machine (sentinelRedisInstance->failover_state). */
#define SENTINEL_FAILOVER_STATE_NONE 0               /* No failover in progress. */
#define SENTINEL_FAILOVER_STATE_WAIT_START 1         /* Wait for failover_start_time. */
#define SENTINEL_FAILOVER_STATE_SELECT_SLAVE 2       /* Select slave to promote. */
#define SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE 3 /* Slave -> Master. */
#define SENTINEL_FAILOVER_STATE_WAIT_PROMOTION 4     /* Wait slave to change role. */
#define SENTINEL_FAILOVER_STATE_RECONF_SLAVES 5      /* SLAVEOF newmaster. */
#define SENTINEL_FAILOVER_STATE_WAIT_NEXT_SLAVE 6    /* Wait replication. */
#define SENTINEL_FAILOVER_STATE_ALERT_CLIENTS 7      /* Run user script. */
#define SENTINEL_FAILOVER_STATE_WAIT_ALERT_SCRIPT 8  /* Wait script exec. */
#define SENTINEL_FAILOVER_STATE_DETECT_END 9         /* Check for failover end. */
#define SENTINEL_FAILOVER_STATE_UPDATE_CONFIG 10     /* Monitor promoted slave. */

/* Values for sentinelRedisInstance->slave_master_link_status. */
#define SENTINEL_MASTER_LINK_STATUS_UP 0
#define SENTINEL_MASTER_LINK_STATUS_DOWN 1

/* Generic flags accepted by several functions (e.g. sentinelResetMaster()). */
#define SENTINEL_NO_FLAGS 0
#define SENTINEL_GENERATE_EVENT 1
118
119 typedef struct sentinelRedisInstance {
120 int flags; /* See SRI_... defines */
121 char *name; /* Master name from the point of view of this sentinel. */
122 char *runid; /* run ID of this instance. */
123 sentinelAddr *addr; /* Master host. */
124 redisAsyncContext *cc; /* Hiredis context for commands. */
125 redisAsyncContext *pc; /* Hiredis context for Pub / Sub. */
126 int pending_commands; /* Number of commands sent waiting for a reply. */
127 mstime_t cc_conn_time; /* cc connection time. */
128 mstime_t pc_conn_time; /* pc connection time. */
129 mstime_t pc_last_activity; /* Last time we received any message. */
130 mstime_t last_avail_time; /* Last time the instance replied to ping with
131 a reply we consider valid. */
132 mstime_t last_pong_time; /* Last time the instance replied to ping,
133 whatever the reply was. That's used to check
134 if the link is idle and must be reconnected. */
135 mstime_t last_pub_time; /* Last time we sent hello via Pub/Sub. */
136 mstime_t last_hello_time; /* Only used if SRI_SENTINEL is set. Last time
137 we received an hello from this Sentinel
138 via Pub/Sub. */
139 mstime_t last_master_down_reply_time; /* Time of last reply to
140 SENTINEL is-master-down command. */
141 mstime_t s_down_since_time; /* Subjectively down since time. */
142 mstime_t o_down_since_time; /* Objectively down since time. */
143 mstime_t down_after_period; /* Consider it down after that period. */
144 mstime_t info_refresh; /* Time at which we received INFO output from it. */
145
146 /* Master specific. */
147 dict *sentinels; /* Other sentinels monitoring the same master. */
148 dict *slaves; /* Slaves for this master instance. */
149 int quorum; /* Number of sentinels that need to agree on failure. */
150 int parallel_syncs; /* How many slaves to reconfigure at same time. */
151
152 /* Slave specific. */
153 mstime_t master_link_down_time; /* Slave replication link down time. */
154 int slave_priority; /* Slave priority according to its INFO output. */
155 mstime_t slave_reconf_sent_time; /* Time at which we sent SLAVE OF <new> */
156 struct sentinelRedisInstance *master; /* Master instance if SRI_SLAVE is set. */
157 char *slave_master_host; /* Master host as reported by INFO */
158 int slave_master_port; /* Master port as reported by INFO */
159 int slave_master_link_status; /* Master link status as reported by INFO */
160 /* Failover */
161 char *leader; /* If this is a master instance, this is the runid of
162 the Sentinel that should perform the failover. If
163 this is a Sentinel, this is the runid of the Sentinel
164 that this other Sentinel is voting as leader.
165 This field is valid only if SRI_MASTER_DOWN is
166 set on the Sentinel instance. */
167 int failover_state; /* See SENTINEL_FAILOVER_STATE_* defines. */
168 mstime_t failover_state_change_time;
169 mstime_t failover_start_time; /* When to start to failover if leader. */
170 mstime_t failover_timeout; /* Max time to refresh failover state. */
171 struct sentinelRedisInstance *promoted_slave; /* Promoted slave instance. */
172 /* Scripts executed to notify admin or reconfigure clients: when they
173 * are set to NULL no script is executed. */
174 char *notification_script;
175 char *client_reconfig_script;
176 } sentinelRedisInstance;
177
178 /* Main state. */
179 struct sentinelState {
180 dict *masters; /* Dictionary of master sentinelRedisInstances.
181 Key is the instance name, value is the
182 sentinelRedisInstance structure pointer. */
183 int tilt; /* Are we in TILT mode? */
184 mstime_t tilt_start_time; /* When TITL started. */
185 mstime_t previous_time; /* Time last time we ran the time handler. */
186 } sentinel;
187
188 /* ======================= hiredis ae.c adapters =============================
189 * Note: this implementation is taken from hiredis/adapters/ae.h, however
190 * we have our modified copy for Sentinel in order to use our allocator
191 * and to have full control over how the adapter works. */
192
193 typedef struct redisAeEvents {
194 redisAsyncContext *context;
195 aeEventLoop *loop;
196 int fd;
197 int reading, writing;
198 } redisAeEvents;
199
200 static void redisAeReadEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
201 ((void)el); ((void)fd); ((void)mask);
202
203 redisAeEvents *e = (redisAeEvents*)privdata;
204 redisAsyncHandleRead(e->context);
205 }
206
207 static void redisAeWriteEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
208 ((void)el); ((void)fd); ((void)mask);
209
210 redisAeEvents *e = (redisAeEvents*)privdata;
211 redisAsyncHandleWrite(e->context);
212 }
213
214 static void redisAeAddRead(void *privdata) {
215 redisAeEvents *e = (redisAeEvents*)privdata;
216 aeEventLoop *loop = e->loop;
217 if (!e->reading) {
218 e->reading = 1;
219 aeCreateFileEvent(loop,e->fd,AE_READABLE,redisAeReadEvent,e);
220 }
221 }
222
223 static void redisAeDelRead(void *privdata) {
224 redisAeEvents *e = (redisAeEvents*)privdata;
225 aeEventLoop *loop = e->loop;
226 if (e->reading) {
227 e->reading = 0;
228 aeDeleteFileEvent(loop,e->fd,AE_READABLE);
229 }
230 }
231
232 static void redisAeAddWrite(void *privdata) {
233 redisAeEvents *e = (redisAeEvents*)privdata;
234 aeEventLoop *loop = e->loop;
235 if (!e->writing) {
236 e->writing = 1;
237 aeCreateFileEvent(loop,e->fd,AE_WRITABLE,redisAeWriteEvent,e);
238 }
239 }
240
241 static void redisAeDelWrite(void *privdata) {
242 redisAeEvents *e = (redisAeEvents*)privdata;
243 aeEventLoop *loop = e->loop;
244 if (e->writing) {
245 e->writing = 0;
246 aeDeleteFileEvent(loop,e->fd,AE_WRITABLE);
247 }
248 }
249
250 static void redisAeCleanup(void *privdata) {
251 redisAeEvents *e = (redisAeEvents*)privdata;
252 redisAeDelRead(privdata);
253 redisAeDelWrite(privdata);
254 zfree(e);
255 }
256
257 static int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac) {
258 redisContext *c = &(ac->c);
259 redisAeEvents *e;
260
261 /* Nothing should be attached when something is already attached */
262 if (ac->ev.data != NULL)
263 return REDIS_ERR;
264
265 /* Create container for context and r/w events */
266 e = (redisAeEvents*)zmalloc(sizeof(*e));
267 e->context = ac;
268 e->loop = loop;
269 e->fd = c->fd;
270 e->reading = e->writing = 0;
271
272 /* Register functions to start/stop listening for events */
273 ac->ev.addRead = redisAeAddRead;
274 ac->ev.delRead = redisAeDelRead;
275 ac->ev.addWrite = redisAeAddWrite;
276 ac->ev.delWrite = redisAeDelWrite;
277 ac->ev.cleanup = redisAeCleanup;
278 ac->ev.data = e;
279
280 return REDIS_OK;
281 }
282
283 /* ============================= Prototypes ================================= */
284
285 void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status);
286 void sentinelDisconnectCallback(const redisAsyncContext *c, int status);
287 void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata);
288 sentinelRedisInstance *sentinelGetMasterByName(char *name);
289 char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master);
290 char *sentinelGetObjectiveLeader(sentinelRedisInstance *master);
291 int yesnotoi(char *s);
292 void sentinelDisconnectInstanceFromContext(const redisAsyncContext *c);
293 void sentinelKillLink(sentinelRedisInstance *ri, redisAsyncContext *c);
294 const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri);
295 void sentinelAbortFailover(sentinelRedisInstance *ri);
296 void sentinelEvent(int level, char *type, sentinelRedisInstance *ri, const char *fmt, ...);
297
298 /* ========================= Dictionary types =============================== */
299
300 unsigned int dictSdsHash(const void *key);
301 int dictSdsKeyCompare(void *privdata, const void *key1, const void *key2);
302 void releaseSentinelRedisInstance(sentinelRedisInstance *ri);
303
/* Value destructor for instancesDictType: releases the instance stored as
 * the value of a dictionary entry. 'privdata' is not used. */
void dictInstancesValDestructor(void *privdata, void *obj) {
    (void)privdata;
    releaseSentinelRedisInstance(obj);
}
307
308 /* Instance name (sds) -> instance (sentinelRedisInstance pointer)
309 *
310 * also used for: sentinelRedisInstance->sentinels dictionary that maps
311 * sentinels ip:port to last seen time in Pub/Sub hello message. */
312 dictType instancesDictType = {
313 dictSdsHash, /* hash function */
314 NULL, /* key dup */
315 NULL, /* val dup */
316 dictSdsKeyCompare, /* key compare */
317 NULL, /* key destructor */
318 dictInstancesValDestructor /* val destructor */
319 };
320
321 /* Instance runid (sds) -> votes (long casted to void*)
322 *
323 * This is useful into sentinelGetObjectiveLeader() function in order to
324 * count the votes and understand who is the leader. */
325 dictType leaderVotesDictType = {
326 dictSdsHash, /* hash function */
327 NULL, /* key dup */
328 NULL, /* val dup */
329 dictSdsKeyCompare, /* key compare */
330 NULL, /* key destructor */
331 NULL /* val destructor */
332 };
333
334 /* =========================== Initialization =============================== */
335
336 void sentinelCommand(redisClient *c);
337
338 struct redisCommand sentinelcmds[] = {
339 {"ping",pingCommand,1,"",0,NULL,0,0,0,0,0},
340 {"sentinel",sentinelCommand,-2,"",0,NULL,0,0,0,0,0},
341 {"subscribe",subscribeCommand,-2,"",0,NULL,0,0,0,0,0},
342 {"unsubscribe",unsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
343 {"psubscribe",psubscribeCommand,-2,"",0,NULL,0,0,0,0,0},
344 {"punsubscribe",punsubscribeCommand,-1,"",0,NULL,0,0,0,0,0}
345 };
346
347 /* This function overwrites a few normal Redis config default with Sentinel
348 * specific defaults. */
349 void initSentinelConfig(void) {
350 server.port = REDIS_SENTINEL_PORT;
351 }
352
353 /* Perform the Sentinel mode initialization. */
354 void initSentinel(void) {
355 int j;
356
357 /* Remove usual Redis commands from the command table, then just add
358 * the SENTINEL command. */
359 dictEmpty(server.commands);
360 for (j = 0; j < sizeof(sentinelcmds)/sizeof(sentinelcmds[0]); j++) {
361 int retval;
362 struct redisCommand *cmd = sentinelcmds+j;
363
364 retval = dictAdd(server.commands, sdsnew(cmd->name), cmd);
365 redisAssert(retval == DICT_OK);
366 }
367
368 /* Initialize various data structures. */
369 sentinel.masters = dictCreate(&instancesDictType,NULL);
370 sentinel.tilt = 0;
371 sentinel.tilt_start_time = mstime();
372 sentinel.previous_time = mstime();
373 }
374
375 /* ============================== sentinelAddr ============================== */
376
377 /* Create a sentinelAddr object and return it on success.
378 * On error NULL is returned and errno is set to:
379 * ENOENT: Can't resolve the hostname.
380 * EINVAL: Invalid port number.
381 */
382 sentinelAddr *createSentinelAddr(char *hostname, int port) {
383 char buf[32];
384 sentinelAddr *sa;
385
386 if (port <= 0 || port > 65535) {
387 errno = EINVAL;
388 return NULL;
389 }
390 if (anetResolve(NULL,hostname,buf) == ANET_ERR) {
391 errno = ENOENT;
392 return NULL;
393 }
394 sa = zmalloc(sizeof(*sa));
395 sa->ip = sdsnew(buf);
396 sa->port = port;
397 return sa;
398 }
399
400 /* Free a Sentinel address. Can't fail. */
401 void releaseSentinelAddr(sentinelAddr *sa) {
402 sdsfree(sa->ip);
403 zfree(sa);
404 }
405
406 /* =========================== Events notification ========================== */
407
408 void sentinelCallNotificationScript(char *scriptpath, char *type, char *msg) {
409 pid_t pid = fork();
410
411 if (pid == -1) {
412 /* Parent on error. */
413 sentinelEvent(REDIS_WARNING,"-notification-script-error",NULL,
414 "#can't fork: %s",strerror(errno));
415 return;
416 } else if (pid == 0) {
417 /* Child */
418 char *argv[4];
419
420 argv[0] = scriptpath;
421 argv[1] = type;
422 argv[2] = msg;
423 argv[3] = NULL;
424 execve(scriptpath,argv,environ);
425 /* If we are here an error occurred. */
426 sentinelEvent(REDIS_WARNING,"-notification-script-error",NULL,
427 "#execve(2): %s",strerror(errno));
428 _exit(1);
429 } else {
430 sentinelEvent(REDIS_DEBUG,"+child",NULL,"%ld",(long)pid);
431 }
432 }
433
434 /* Send an event to log, pub/sub, user notification script.
435 *
436 * 'level' is the log level for logging. Only REDIS_WARNING events will trigger
437 * the execution of the user notification script.
438 *
439 * 'type' is the message type, also used as a pub/sub channel name.
440 *
441 * 'ri', is the redis instance target of this event if applicable, and is
442 * used to obtain the path of the notification script to execute.
443 *
444 * The remaining arguments are printf-alike.
445 * If the format specifier starts with the two characters "%@" then ri is
446 * not NULL, and the message is prefixed with an instance identifier in the
447 * following format:
448 *
449 * <instance type> <instance name> <ip> <port>
450 *
451 * If the instance type is not master, than the additional string is
452 * added to specify the originating master:
453 *
454 * @ <master name> <master ip> <master port>
455 *
456 * Any other specifier after "%@" is processed by printf itself.
457 */
458 void sentinelEvent(int level, char *type, sentinelRedisInstance *ri,
459 const char *fmt, ...) {
460 va_list ap;
461 char msg[REDIS_MAX_LOGMSG_LEN];
462 robj *channel, *payload;
463
464 /* Handle %@ */
465 if (fmt[0] == '%' && fmt[1] == '@') {
466 sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
467 NULL : ri->master;
468
469 if (master) {
470 snprintf(msg, sizeof(msg), "%s %s %s %d @ %s %s %d",
471 sentinelRedisInstanceTypeStr(ri),
472 ri->name, ri->addr->ip, ri->addr->port,
473 master->name, master->addr->ip, master->addr->port);
474 } else {
475 snprintf(msg, sizeof(msg), "%s %s %s %d",
476 sentinelRedisInstanceTypeStr(ri),
477 ri->name, ri->addr->ip, ri->addr->port);
478 }
479 fmt += 2;
480 } else {
481 msg[0] = '\0';
482 }
483
484 /* Use vsprintf for the rest of the formatting if any. */
485 if (fmt[0] != '\0') {
486 va_start(ap, fmt);
487 vsnprintf(msg+strlen(msg), sizeof(msg)-strlen(msg), fmt, ap);
488 va_end(ap);
489 }
490
491 /* Log the message if the log level allows it to be logged. */
492 if (level >= server.verbosity)
493 redisLog(level,"%s %s",type,msg);
494
495 /* Publish the message via Pub/Sub if it's not a debugging one. */
496 if (level != REDIS_DEBUG) {
497 channel = createStringObject(type,strlen(type));
498 payload = createStringObject(msg,strlen(msg));
499 pubsubPublishMessage(channel,payload);
500 decrRefCount(channel);
501 decrRefCount(payload);
502 }
503
504 /* Call the notification script if applicable. */
505 if (level == REDIS_WARNING && ri != NULL) {
506 sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
507 ri : ri->master;
508 if (master->notification_script) {
509 sentinelCallNotificationScript(master->notification_script,
510 type,msg);
511 }
512 }
513 }
514
515 /* ========================== sentinelRedisInstance ========================= */
516
517 /* Create a redis instance, the following fields must be populated by the
518 * caller if needed:
519 * runid: set to NULL but will be populated once INFO output is received.
520 * info_refresh: is set to 0 to mean that we never received INFO so far.
521 *
522 * If SRI_MASTER is set into initial flags the instance is added to
523 * sentinel.masters table.
524 *
525 * if SRI_SLAVE or SRI_SENTINEL is set then 'master' must be not NULL and the
526 * instance is added into master->slaves or master->sentinels table.
527 *
528 * If the instance is a slave or sentinel, the name parameter is ignored and
529 * is created automatically as hostname:port.
530 *
531 * The function fails if hostname can't be resolved or port is out of range.
532 * When this happens NULL is returned and errno is set accordingly to the
533 * createSentinelAddr() function.
534 *
535 * The function may also fail and return NULL with errno set to EBUSY if
536 * a master or slave with the same name already exists. */
537 sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char *hostname, int port, int quorum, sentinelRedisInstance *master) {
538 sentinelRedisInstance *ri;
539 sentinelAddr *addr;
540 dict *table;
541 char slavename[128], *sdsname;
542
543 redisAssert(flags & (SRI_MASTER|SRI_SLAVE|SRI_SENTINEL));
544 redisAssert((flags & SRI_MASTER) || master != NULL);
545
546 /* Check address validity. */
547 addr = createSentinelAddr(hostname,port);
548 if (addr == NULL) return NULL;
549
550 /* For slaves and sentinel we use ip:port as name. */
551 if (flags & (SRI_SLAVE|SRI_SENTINEL)) {
552 snprintf(slavename,sizeof(slavename),"%s:%d",hostname,port);
553 name = slavename;
554 }
555
556 /* Make sure the entry is not duplicated. This may happen when the same
557 * name for a master is used multiple times inside the configuration or
558 * if we try to add multiple times a slave or sentinel with same ip/port
559 * to a master. */
560 if (flags & SRI_MASTER) table = sentinel.masters;
561 else if (flags & SRI_SLAVE) table = master->slaves;
562 else if (flags & SRI_SENTINEL) table = master->sentinels;
563 sdsname = sdsnew(name);
564 if (dictFind(table,sdsname)) {
565 sdsfree(sdsname);
566 errno = EBUSY;
567 return NULL;
568 }
569
570 /* Create the instance object. */
571 ri = zmalloc(sizeof(*ri));
572 /* Note that all the instances are started in the disconnected state,
573 * the event loop will take care of connecting them. */
574 ri->flags = flags | SRI_DISCONNECTED;
575 ri->name = sdsname;
576 ri->runid = NULL;
577 ri->addr = addr;
578 ri->cc = NULL;
579 ri->pc = NULL;
580 ri->pending_commands = 0;
581 ri->cc_conn_time = 0;
582 ri->pc_conn_time = 0;
583 ri->pc_last_activity = 0;
584 ri->last_avail_time = mstime();
585 ri->last_pong_time = mstime();
586 ri->last_pub_time = mstime();
587 ri->last_hello_time = mstime();
588 ri->last_master_down_reply_time = mstime();
589 ri->s_down_since_time = 0;
590 ri->o_down_since_time = 0;
591 ri->down_after_period = master ? master->down_after_period :
592 SENTINEL_DOWN_AFTER_PERIOD;
593 ri->master_link_down_time = 0;
594 ri->slave_priority = SENTINEL_DEFAULT_SLAVE_PRIORITY;
595 ri->slave_reconf_sent_time = 0;
596 ri->slave_master_host = NULL;
597 ri->slave_master_port = 0;
598 ri->slave_master_link_status = SENTINEL_MASTER_LINK_STATUS_DOWN;
599 ri->sentinels = dictCreate(&instancesDictType,NULL);
600 ri->quorum = quorum;
601 ri->parallel_syncs = SENTINEL_DEFAULT_PARALLEL_SYNCS;
602 ri->master = master;
603 ri->slaves = dictCreate(&instancesDictType,NULL);
604 ri->info_refresh = 0;
605
606 /* Failover state. */
607 ri->leader = NULL;
608 ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
609 ri->failover_state_change_time = 0;
610 ri->failover_start_time = 0;
611 ri->failover_timeout = SENTINEL_DEFAULT_FAILOVER_TIMEOUT;
612 ri->promoted_slave = NULL;
613 ri->notification_script = NULL;
614 ri->client_reconfig_script = NULL;
615
616 /* Add into the right table. */
617 dictAdd(table, ri->name, ri);
618 return ri;
619 }
620
621 /* Release this instance and all its slaves, sentinels, hiredis connections.
622 * This function also takes care of unlinking the instance from the main
623 * masters table (if it is a master) or from its master sentinels/slaves table
624 * if it is a slave or sentinel. */
625 void releaseSentinelRedisInstance(sentinelRedisInstance *ri) {
626 /* Release all its slaves or sentinels if any. */
627 dictRelease(ri->sentinels);
628 dictRelease(ri->slaves);
629
630 /* Release hiredis connections. */
631 if (ri->cc) sentinelKillLink(ri,ri->cc);
632 if (ri->pc) sentinelKillLink(ri,ri->pc);
633
634 /* Free other resources. */
635 sdsfree(ri->name);
636 sdsfree(ri->runid);
637 sdsfree(ri->notification_script);
638 sdsfree(ri->client_reconfig_script);
639 sdsfree(ri->slave_master_host);
640 sdsfree(ri->leader);
641 releaseSentinelAddr(ri->addr);
642
643 /* Clear state into the master if needed. */
644 if ((ri->flags & SRI_SLAVE) && (ri->flags & SRI_PROMOTED) && ri->master)
645 ri->master->promoted_slave = NULL;
646
647 zfree(ri);
648 }
649
650 /* Lookup a slave in a master Redis instance, by ip and port. */
651 sentinelRedisInstance *sentinelRedisInstanceLookupSlave(
652 sentinelRedisInstance *ri, char *ip, int port)
653 {
654 sds key;
655 sentinelRedisInstance *slave;
656
657 redisAssert(ri->flags & SRI_MASTER);
658 key = sdscatprintf(sdsempty(),"%s:%d",ip,port);
659 slave = dictFetchValue(ri->slaves,key);
660 sdsfree(key);
661 return slave;
662 }
663
664 /* Return the name of the type of the instance as a string. */
665 const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri) {
666 if (ri->flags & SRI_MASTER) return "master";
667 else if (ri->flags & SRI_SLAVE) return "slave";
668 else if (ri->flags & SRI_SENTINEL) return "sentinel";
669 else return "unknown";
670 }
671
672 /* This function removes all the instances found in the dictionary of instances
673 * 'd', having either:
674 *
675 * 1) The same ip/port as specified.
676 * 2) The same runid.
677 *
678 * "1" and "2" don't need to verify at the same time, just one is enough.
679 * If "runid" is NULL it is not checked.
680 * Similarly if "ip" is NULL it is not checked.
681 *
682 * This function is useful because every time we add a new Sentinel into
683 * a master's Sentinels dictionary, we want to be very sure about not
684 * having duplicated instances for any reason. This is so important because
685 * we use those other sentinels in order to run our quorum protocol to
686 * understand if it's time to proceeed with the fail over.
687 *
688 * Making sure no duplication is possible we greately improve the robustness
689 * of the quorum (otherwise we may end counting the same instance multiple
690 * times for some reason).
691 *
692 * The function returns the number of Sentinels removed. */
693 int removeMatchingSentinelsFromMaster(sentinelRedisInstance *master, char *ip, int port, char *runid) {
694 dictIterator *di;
695 dictEntry *de;
696 int removed = 0;
697
698 di = dictGetSafeIterator(master->sentinels);
699 while((de = dictNext(di)) != NULL) {
700 sentinelRedisInstance *ri = dictGetVal(de);
701
702 if ((ri->runid && runid && strcmp(ri->runid,runid) == 0) ||
703 (ip && strcmp(ri->addr->ip,ip) == 0 && port == ri->addr->port))
704 {
705 dictDelete(master->sentinels,ri->name);
706 removed++;
707 }
708 }
709 dictReleaseIterator(di);
710 return removed;
711 }
712
713 /* Search an instance with the same runid, ip and port into a dictionary
714 * of instances. Return NULL if not found, otherwise return the instance
715 * pointer.
716 *
717 * runid or ip can be NULL. In such a case the search is performed only
718 * by the non-NULL field. */
719 sentinelRedisInstance *getSentinelRedisInstanceByAddrAndRunID(dict *instances, char *ip, int port, char *runid) {
720 dictIterator *di;
721 dictEntry *de;
722 sentinelRedisInstance *instance = NULL;
723
724 redisAssert(ip || runid); /* User must pass at least one search param. */
725 di = dictGetIterator(instances);
726 while((de = dictNext(di)) != NULL) {
727 sentinelRedisInstance *ri = dictGetVal(de);
728
729 if (runid && !ri->runid) continue;
730 if ((runid == NULL || strcmp(ri->runid, runid) == 0) &&
731 (ip == NULL || (strcmp(ri->addr->ip, ip) == 0 &&
732 ri->addr->port == port)))
733 {
734 instance = ri;
735 break;
736 }
737 }
738 dictReleaseIterator(di);
739 return instance;
740 }
741
742 /* Simple master lookup by name */
743 sentinelRedisInstance *sentinelGetMasterByName(char *name) {
744 sentinelRedisInstance *ri;
745 sds sdsname = sdsnew(name);
746
747 ri = dictFetchValue(sentinel.masters,sdsname);
748 sdsfree(sdsname);
749 return ri;
750 }
751
752 /* Add the specified flags to all the instances in the specified dictionary. */
753 void sentinelAddFlagsToDictOfRedisInstances(dict *instances, int flags) {
754 dictIterator *di;
755 dictEntry *de;
756
757 di = dictGetIterator(instances);
758 while((de = dictNext(di)) != NULL) {
759 sentinelRedisInstance *ri = dictGetVal(de);
760 ri->flags |= flags;
761 }
762 dictReleaseIterator(di);
763 }
764
765 /* Remove the specified flags to all the instances in the specified
766 * dictionary. */
767 void sentinelDelFlagsToDictOfRedisInstances(dict *instances, int flags) {
768 dictIterator *di;
769 dictEntry *de;
770
771 di = dictGetIterator(instances);
772 while((de = dictNext(di)) != NULL) {
773 sentinelRedisInstance *ri = dictGetVal(de);
774 ri->flags &= ~flags;
775 }
776 dictReleaseIterator(di);
777 }
778
779 /* Reset the state of a monitored master:
780 * 1) Remove all slaves.
781 * 2) Remove all sentinels.
782 * 3) Remove most of the flags resulting from runtime operations.
783 * 4) Reset timers to their default value.
784 * 5) In the process of doing this undo the failover if in progress.
785 * 6) Disconnect the connections with the master (will reconnect automatically).
786 */
787 void sentinelResetMaster(sentinelRedisInstance *ri, int flags) {
788 redisAssert(ri->flags & SRI_MASTER);
789 dictRelease(ri->slaves);
790 dictRelease(ri->sentinels);
791 ri->slaves = dictCreate(&instancesDictType,NULL);
792 ri->sentinels = dictCreate(&instancesDictType,NULL);
793 if (ri->cc) sentinelKillLink(ri,ri->cc);
794 if (ri->pc) sentinelKillLink(ri,ri->pc);
795 ri->flags &= SRI_MASTER|SRI_CAN_FAILOVER|SRI_DISCONNECTED;
796 if (ri->leader) {
797 sdsfree(ri->leader);
798 ri->leader = NULL;
799 }
800 ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
801 ri->failover_state_change_time = 0;
802 ri->failover_start_time = 0;
803 ri->promoted_slave = NULL;
804 sdsfree(ri->runid);
805 sdsfree(ri->slave_master_host);
806 ri->runid = NULL;
807 ri->slave_master_host = NULL;
808 ri->last_avail_time = mstime();
809 ri->last_pong_time = mstime();
810 if (flags & SENTINEL_GENERATE_EVENT)
811 sentinelEvent(REDIS_WARNING,"+reset-master",ri,"%@");
812 }
813
814 /* Call sentinelResetMaster() on every master with a name matching the specified
815 * pattern. */
816 int sentinelResetMastersByPattern(char *pattern, int flags) {
817 dictIterator *di;
818 dictEntry *de;
819 int reset = 0;
820
821 di = dictGetIterator(sentinel.masters);
822 while((de = dictNext(di)) != NULL) {
823 sentinelRedisInstance *ri = dictGetVal(de);
824
825 if (ri->name) {
826 if (stringmatch(pattern,ri->name,0)) {
827 sentinelResetMaster(ri,flags);
828 reset++;
829 }
830 }
831 }
832 dictReleaseIterator(di);
833 return reset;
834 }
835
836 /* Reset the specified master with sentinelResetMaster(), and also change
837 * the ip:port address, but take the name of the instance unmodified.
838 *
839 * This is used to handle the +switch-master and +redirect-to-master events.
840 *
841 * The function returns REDIS_ERR if the address can't be resolved for some
842 * reason. Otherwise REDIS_OK is returned.
843 *
844 * TODO: make this reset so that original sentinels are re-added with
845 * same ip / port / runid.
846 */
847
848 int sentinelResetMasterAndChangeAddress(sentinelRedisInstance *master, char *ip, int port) {
849 sentinelAddr *oldaddr, *newaddr;
850
851 newaddr = createSentinelAddr(ip,port);
852 if (newaddr == NULL) return REDIS_ERR;
853 sentinelResetMaster(master,SENTINEL_NO_FLAGS);
854 oldaddr = master->addr;
855 master->addr = newaddr;
856 /* Release the old address at the end so we are safe even if the function
857 * gets the master->addr->ip and master->addr->port as arguments. */
858 releaseSentinelAddr(oldaddr);
859 return REDIS_OK;
860 }
861
862 /* ============================ Config handling ============================= */
863 char *sentinelHandleConfiguration(char **argv, int argc) {
864 sentinelRedisInstance *ri;
865
866 if (!strcasecmp(argv[0],"monitor") && argc == 5) {
867 /* monitor <name> <host> <port> <quorum> */
868 int quorum = atoi(argv[4]);
869
870 if (quorum <= 0) return "Quorum must be 1 or greater.";
871 if (createSentinelRedisInstance(argv[1],SRI_MASTER,argv[2],
872 atoi(argv[3]),quorum,NULL) == NULL)
873 {
874 switch(errno) {
875 case EBUSY: return "Duplicated master name.";
876 case ENOENT: return "Can't resolve master instance hostname.";
877 case EINVAL: return "Invalid port number";
878 }
879 }
880 } else if (!strcasecmp(argv[0],"down-after-milliseconds") && argc == 3) {
881 /* down-after-milliseconds <name> <milliseconds> */
882 ri = sentinelGetMasterByName(argv[1]);
883 if (!ri) return "No such master with specified name.";
884 ri->down_after_period = atoi(argv[2]);
885 if (ri->down_after_period <= 0)
886 return "negative or zero time parameter.";
887 } else if (!strcasecmp(argv[0],"failover-timeout") && argc == 3) {
888 /* failover-timeout <name> <milliseconds> */
889 ri = sentinelGetMasterByName(argv[1]);
890 if (!ri) return "No such master with specified name.";
891 ri->failover_timeout = atoi(argv[2]);
892 if (ri->failover_timeout <= 0)
893 return "negative or zero time parameter.";
894 } else if (!strcasecmp(argv[0],"can-failover") && argc == 3) {
895 /* can-failover <name> <yes/no> */
896 int yesno = yesnotoi(argv[2]);
897
898 ri = sentinelGetMasterByName(argv[1]);
899 if (!ri) return "No such master with specified name.";
900 if (yesno == -1) return "Argument must be either yes or no.";
901 if (yesno)
902 ri->flags |= SRI_CAN_FAILOVER;
903 else
904 ri->flags &= ~SRI_CAN_FAILOVER;
905 } else if (!strcasecmp(argv[0],"parallel-syncs") && argc == 3) {
906 /* parallel-syncs <name> <milliseconds> */
907 ri = sentinelGetMasterByName(argv[1]);
908 if (!ri) return "No such master with specified name.";
909 ri->parallel_syncs = atoi(argv[2]);
910 } else if (!strcasecmp(argv[0],"notification-script") && argc == 3) {
911 /* notification-script <name> <path> */
912 ri = sentinelGetMasterByName(argv[1]);
913 if (!ri) return "No such master with specified name.";
914 if (access(argv[2],X_OK) == -1)
915 return "Notification script seems non existing or non executable.";
916 ri->notification_script = sdsnew(argv[2]);
917 } else if (!strcasecmp(argv[0],"client-reconfig-script") && argc == 3) {
918 /* client-reconfig-script <name> <path> */
919 ri = sentinelGetMasterByName(argv[1]);
920 if (!ri) return "No such master with specified name.";
921 if (access(argv[2],X_OK) == -1)
922 return "Client reconfiguration script seems non existing or "
923 "non executable.";
924 ri->client_reconfig_script = sdsnew(argv[2]);
925 } else {
926 return "Unrecognized sentinel configuration statement.";
927 }
928 return NULL;
929 }
930
931 /* ====================== hiredis connection handling ======================= */
932
933 /* Completely disconnect an hiredis link from an instance. */
934 void sentinelKillLink(sentinelRedisInstance *ri, redisAsyncContext *c) {
935 if (ri->cc == c) {
936 ri->cc = NULL;
937 ri->pending_commands = 0;
938 }
939 if (ri->pc == c) ri->pc = NULL;
940 c->data = NULL;
941 ri->flags |= SRI_DISCONNECTED;
942 redisAsyncFree(c);
943 }
944
945 /* This function takes an hiredis context that is in an error condition
946 * and make sure to mark the instance as disconnected performing the
947 * cleanup needed.
948 *
949 * Note: we don't free the hiredis context as hiredis will do it for us
950 * for async conenctions. */
951 void sentinelDisconnectInstanceFromContext(const redisAsyncContext *c) {
952 sentinelRedisInstance *ri = c->data;
953 int pubsub;
954
955 if (ri == NULL) return; /* The instance no longer exists. */
956
957 pubsub = (ri->pc == c);
958 sentinelEvent(REDIS_DEBUG, pubsub ? "-pubsub-link" : "-cmd-link", ri,
959 "%@ #%s", c->errstr);
960 if (pubsub)
961 ri->pc = NULL;
962 else
963 ri->cc = NULL;
964 ri->flags |= SRI_DISCONNECTED;
965 }
966
967 void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status) {
968 if (status != REDIS_OK) {
969 sentinelDisconnectInstanceFromContext(c);
970 } else {
971 sentinelRedisInstance *ri = c->data;
972 int pubsub = (ri->pc == c);
973
974 sentinelEvent(REDIS_DEBUG, pubsub ? "+pubsub-link" : "+cmd-link", ri,
975 "%@");
976 }
977 }
978
979 void sentinelDisconnectCallback(const redisAsyncContext *c, int status) {
980 sentinelDisconnectInstanceFromContext(c);
981 }
982
983 /* Create the async connections for the specified instance if the instance
984 * is disconnected. Note that the SRI_DISCONNECTED flag is set even if just
985 * one of the two links (commands and pub/sub) is missing. */
986 void sentinelReconnectInstance(sentinelRedisInstance *ri) {
987 if (!(ri->flags & SRI_DISCONNECTED)) return;
988
989 /* Commands connection. */
990 if (ri->cc == NULL) {
991 ri->cc = redisAsyncConnect(ri->addr->ip,ri->addr->port);
992 if (ri->cc->err) {
993 sentinelEvent(REDIS_DEBUG,"-cmd-link-reconnection",ri,"%@ #%s",
994 ri->cc->errstr);
995 sentinelKillLink(ri,ri->cc);
996 } else {
997 ri->cc_conn_time = mstime();
998 ri->cc->data = ri;
999 redisAeAttach(server.el,ri->cc);
1000 redisAsyncSetConnectCallback(ri->cc,
1001 sentinelLinkEstablishedCallback);
1002 redisAsyncSetDisconnectCallback(ri->cc,
1003 sentinelDisconnectCallback);
1004 }
1005 }
1006 /* Pub / Sub */
1007 if ((ri->flags & SRI_MASTER) && ri->pc == NULL) {
1008 ri->pc = redisAsyncConnect(ri->addr->ip,ri->addr->port);
1009 if (ri->pc->err) {
1010 sentinelEvent(REDIS_DEBUG,"-pubsub-link-reconnection",ri,"%@ #%s",
1011 ri->pc->errstr);
1012 sentinelKillLink(ri,ri->pc);
1013 } else {
1014 int retval;
1015
1016 ri->pc_conn_time = mstime();
1017 ri->pc->data = ri;
1018 redisAeAttach(server.el,ri->pc);
1019 redisAsyncSetConnectCallback(ri->pc,
1020 sentinelLinkEstablishedCallback);
1021 redisAsyncSetDisconnectCallback(ri->pc,
1022 sentinelDisconnectCallback);
1023 /* Now we subscribe to the Sentinels "Hello" channel. */
1024 retval = redisAsyncCommand(ri->pc,
1025 sentinelReceiveHelloMessages, NULL, "SUBSCRIBE %s",
1026 SENTINEL_HELLO_CHANNEL);
1027 if (retval != REDIS_OK) {
1028 /* If we can't subscribe, the Pub/Sub connection is useless
1029 * and we can simply disconnect it and try again. */
1030 sentinelKillLink(ri,ri->pc);
1031 return;
1032 }
1033 }
1034 }
1035 /* Clear the DISCONNECTED flags only if we have both the connections
1036 * (or just the commands connection if this is a slave or a
1037 * sentinel instance). */
1038 if (ri->cc && (ri->flags & (SRI_SLAVE|SRI_SENTINEL) || ri->pc))
1039 ri->flags &= ~SRI_DISCONNECTED;
1040 }
1041
1042 /* ======================== Redis instances pinging ======================== */
1043
1044 /* Process the INFO output from masters. */
1045 void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
1046 sds *lines;
1047 int numlines, j;
1048 int role = 0;
1049 int runid_changed = 0; /* true if runid changed. */
1050 int first_runid = 0; /* true if this is the first runid we receive. */
1051
1052 /* The following fields must be reset to a given value in the case they
1053 * are not found at all in the INFO output. */
1054 ri->master_link_down_time = 0;
1055
1056 /* Process line by line. */
1057 lines = sdssplitlen(info,strlen(info),"\r\n",2,&numlines);
1058 for (j = 0; j < numlines; j++) {
1059 sentinelRedisInstance *slave;
1060 sds l = lines[j];
1061
1062 /* run_id:<40 hex chars>*/
1063 if (sdslen(l) >= 47 && !memcmp(l,"run_id:",7)) {
1064 if (ri->runid == NULL) {
1065 ri->runid = sdsnewlen(l+7,40);
1066 first_runid = 1;
1067 } else {
1068 if (strncmp(ri->runid,l+7,40) != 0) {
1069 runid_changed = 1;
1070 sentinelEvent(REDIS_NOTICE,"+reboot",ri,"%@");
1071 sdsfree(ri->runid);
1072 ri->runid = sdsnewlen(l+7,40);
1073 }
1074 }
1075 }
1076
1077 /* slave0:<ip>,<port>,<state> */
1078 if ((ri->flags & SRI_MASTER) &&
1079 sdslen(l) >= 7 &&
1080 !memcmp(l,"slave",5) && isdigit(l[5]))
1081 {
1082 char *ip, *port, *end;
1083
1084 ip = strchr(l,':'); if (!ip) continue;
1085 ip++; /* Now ip points to start of ip address. */
1086 port = strchr(ip,','); if (!port) continue;
1087 *port = '\0'; /* nul term for easy access. */
1088 port++; /* Now port points to start of port number. */
1089 end = strchr(port,','); if (!end) continue;
1090 *end = '\0'; /* nul term for easy access. */
1091
1092 /* Check if we already have this slave into our table,
1093 * otherwise add it. */
1094 if (sentinelRedisInstanceLookupSlave(ri,ip,atoi(port)) == NULL) {
1095 if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,ip,
1096 atoi(port), ri->quorum,ri)) != NULL)
1097 {
1098 sentinelEvent(REDIS_NOTICE,"+slave",slave,"%@");
1099 }
1100 }
1101 }
1102
1103 /* master_link_down_since_seconds:<seconds> */
1104 if (sdslen(l) >= 32 &&
1105 !memcmp(l,"master_link_down_since_seconds",30))
1106 {
1107 ri->master_link_down_time = strtoll(l+31,NULL,10)*1000;
1108 }
1109
1110 /* role:<role> */
1111 if (!memcmp(l,"role:master",11)) role = SRI_MASTER;
1112 else if (!memcmp(l,"role:slave",10)) role = SRI_SLAVE;
1113
1114 if (role == SRI_SLAVE) {
1115 /* master_host:<host> */
1116 if (sdslen(l) >= 12 && !memcmp(l,"master_host:",12)) {
1117 sdsfree(ri->slave_master_host);
1118 ri->slave_master_host = sdsnew(l+12);
1119 }
1120
1121 /* master_port:<port> */
1122 if (sdslen(l) >= 12 && !memcmp(l,"master_port:",12))
1123 ri->slave_master_port = atoi(l+12);
1124
1125 /* master_link_status:<status> */
1126 if (sdslen(l) >= 19 && !memcmp(l,"master_link_status:",19)) {
1127 ri->slave_master_link_status =
1128 (strcasecmp(l+19,"up") == 0) ?
1129 SENTINEL_MASTER_LINK_STATUS_UP :
1130 SENTINEL_MASTER_LINK_STATUS_DOWN;
1131 }
1132 }
1133 }
1134 ri->info_refresh = mstime();
1135 sdsfreesplitres(lines,numlines);
1136
1137 if (sentinel.tilt) return;
1138
1139 /* Act if a master turned into a slave. */
1140 if ((ri->flags & SRI_MASTER) && role == SRI_SLAVE) {
1141 if (first_runid && ri->slave_master_host) {
1142 /* If it is the first time we receive INFO from it, but it's
1143 * a slave while it was configured as a master, we want to monitor
1144 * its master instead. */
1145 sentinelEvent(REDIS_WARNING,"+redirect-to-master",ri,
1146 "%s %s %d %s %d",
1147 ri->name, ri->addr->ip, ri->addr->port,
1148 ri->slave_master_host, ri->slave_master_port);
1149 sentinelResetMasterAndChangeAddress(ri,ri->slave_master_host,
1150 ri->slave_master_port);
1151 return;
1152 }
1153 }
1154
1155 /* Act if a slave turned into a master. */
1156 if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) {
1157 if (!(ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
1158 (runid_changed || first_runid))
1159 {
1160 /* If a slave turned into a master, but at the same time the
1161 * runid has changed, or it is simply the first time we see and
1162 * INFO output from this instance, this is a reboot with a wrong
1163 * configuration.
1164 *
1165 * Log the event and remove the slave. */
1166 int retval;
1167
1168 sentinelEvent(REDIS_WARNING,"-slave-restart-as-master",ri,"%@ #removing it from the attached slaves");
1169 retval = dictDelete(ri->master->slaves,ri->name);
1170 redisAssert(retval == REDIS_OK);
1171 return;
1172 } else if (ri->flags & SRI_PROMOTED) {
1173 /* If this is a promoted slave we can change state to the
1174 * failover state machine. */
1175 if (ri->master &&
1176 (ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
1177 (ri->master->flags & SRI_I_AM_THE_LEADER) &&
1178 (ri->master->failover_state ==
1179 SENTINEL_FAILOVER_STATE_WAIT_PROMOTION))
1180 {
1181 ri->master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
1182 ri->master->failover_state_change_time = mstime();
1183 sentinelEvent(REDIS_WARNING,"+promoted-slave",ri,"%@");
1184 sentinelEvent(REDIS_WARNING,"+failover-state-reconf-slaves",
1185 ri->master,"%@");
1186 }
1187 } else {
1188 /* Otherwise we interpret this as the start of the failover. */
1189 if (ri->master &&
1190 (ri->master->flags & SRI_FAILOVER_IN_PROGRESS) == 0)
1191 {
1192 ri->master->flags |= SRI_FAILOVER_IN_PROGRESS;
1193 sentinelEvent(REDIS_WARNING,"failover-detected",ri->master,"%@");
1194 ri->master->failover_state = SENTINEL_FAILOVER_STATE_DETECT_END;
1195 ri->master->failover_state_change_time = mstime();
1196 ri->master->promoted_slave = ri;
1197 ri->flags |= SRI_PROMOTED;
1198 /* We are an observer, so we can only assume that the leader
1199 * is reconfiguring the slave instances. For this reason we
1200 * set all the instances as RECONF_SENT waiting for progresses
1201 * on this side. */
1202 sentinelAddFlagsToDictOfRedisInstances(ri->master->slaves,
1203 SRI_RECONF_SENT);
1204 }
1205 }
1206 }
1207
1208 /* Detect if the slave that is in the process of being reconfigured
1209 * changed state. */
1210 if ((ri->flags & SRI_SLAVE) && role == SRI_SLAVE &&
1211 (ri->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)))
1212 {
1213 /* SRI_RECONF_SENT -> SRI_RECONF_INPROG. */
1214 if ((ri->flags & SRI_RECONF_SENT) &&
1215 ri->slave_master_host &&
1216 strcmp(ri->slave_master_host,
1217 ri->master->promoted_slave->addr->ip) == 0 &&
1218 ri->slave_master_port == ri->master->promoted_slave->addr->port)
1219 {
1220 ri->flags &= ~SRI_RECONF_SENT;
1221 ri->flags |= SRI_RECONF_INPROG;
1222 sentinelEvent(REDIS_NOTICE,"+slave-reconf-inprog",ri,"%@");
1223 }
1224
1225 /* SRI_RECONF_INPROG -> SRI_RECONF_DONE */
1226 if ((ri->flags & SRI_RECONF_INPROG) &&
1227 ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP)
1228 {
1229 ri->flags &= ~SRI_RECONF_INPROG;
1230 ri->flags |= SRI_RECONF_DONE;
1231 sentinelEvent(REDIS_NOTICE,"+slave-reconf-done",ri,"%@");
1232 /* If we are moving forward (a new slave is now configured)
1233 * we update the change_time as we are conceptually passing
1234 * to the next slave. */
1235 ri->failover_state_change_time = mstime();
1236 }
1237 }
1238 }
1239
1240 void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
1241 sentinelRedisInstance *ri = c->data;
1242 redisReply *r;
1243
1244 if (ri) ri->pending_commands--;
1245 if (!reply || !ri) return;
1246 r = reply;
1247
1248 if (r->type == REDIS_REPLY_STRING) {
1249 sentinelRefreshInstanceInfo(ri,r->str);
1250 }
1251 }
1252
1253 /* Just discard the reply. We use this when we are not monitoring the return
1254 * value of the command but its effects directly. */
1255 void sentinelDiscardReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
1256 sentinelRedisInstance *ri = c->data;
1257
1258 if (ri) ri->pending_commands--;
1259 }
1260
1261 void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
1262 sentinelRedisInstance *ri = c->data;
1263 redisReply *r;
1264
1265 if (ri) ri->pending_commands--;
1266 if (!reply || !ri) return;
1267 r = reply;
1268
1269 if (r->type == REDIS_REPLY_STATUS ||
1270 r->type == REDIS_REPLY_ERROR) {
1271 /* Update the "instance available" field only if this is an
1272 * acceptable reply. */
1273 if (strncmp(r->str,"PONG",4) == 0 ||
1274 strncmp(r->str,"LOADING",7) == 0 ||
1275 strncmp(r->str,"MASTERDOWN",10) == 0)
1276 {
1277 ri->last_avail_time = mstime();
1278 }
1279 }
1280 ri->last_pong_time = mstime();
1281 }
1282
1283 /* This is called when we get the reply about the PUBLISH command we send
1284 * to the master to advertise this sentinel. */
1285 void sentinelPublishReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
1286 sentinelRedisInstance *ri = c->data;
1287 redisReply *r;
1288
1289 if (ri) ri->pending_commands--;
1290 if (!reply || !ri) return;
1291 r = reply;
1292
1293 /* Only update pub_time if we actually published our message. Otherwise
1294 * we'll retry against in 100 milliseconds. */
1295 if (r->type != REDIS_REPLY_ERROR)
1296 ri->last_pub_time = mstime();
1297 }
1298
1299 /* This is our Pub/Sub callback for the Hello channel. It's useful in order
1300 * to discover other sentinels attached at the same master. */
1301 void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata) {
1302 sentinelRedisInstance *ri = c->data;
1303 redisReply *r;
1304
1305 if (!reply || !ri) return;
1306 r = reply;
1307
1308 /* Update the last activity in the pubsub channel. Note that since we
1309 * receive our messages as well this timestamp can be used to detect
1310 * if the link is probably diconnected even if it seems otherwise. */
1311 ri->pc_last_activity = mstime();
1312
1313 /* Sanity check in the reply we expect, so that the code that follows
1314 * can avoid to check for details. */
1315 if (r->type != REDIS_REPLY_ARRAY ||
1316 r->elements != 3 ||
1317 r->element[0]->type != REDIS_REPLY_STRING ||
1318 r->element[1]->type != REDIS_REPLY_STRING ||
1319 r->element[2]->type != REDIS_REPLY_STRING ||
1320 strcmp(r->element[0]->str,"message") != 0) return;
1321
1322 /* We are not interested in meeting ourselves */
1323 if (strstr(r->element[2]->str,server.runid) != NULL) return;
1324
1325 {
1326 int numtokens, port, removed, canfailover;
1327 char **token = sdssplitlen(r->element[2]->str,
1328 r->element[2]->len,
1329 ":",1,&numtokens);
1330 sentinelRedisInstance *sentinel;
1331
1332 if (numtokens == 4) {
1333 /* First, try to see if we already have this sentinel. */
1334 port = atoi(token[1]);
1335 canfailover = atoi(token[3]);
1336 sentinel = getSentinelRedisInstanceByAddrAndRunID(
1337 ri->sentinels,token[0],port,token[2]);
1338
1339 if (!sentinel) {
1340 /* If not, remove all the sentinels that have the same runid
1341 * OR the same ip/port, because it's either a restart or a
1342 * network topology change. */
1343 removed = removeMatchingSentinelsFromMaster(ri,token[0],port,
1344 token[2]);
1345 if (removed) {
1346 sentinelEvent(REDIS_NOTICE,"-dup-sentinel",ri,
1347 "%@ #duplicate of %s:%d or %s",
1348 token[0],port,token[2]);
1349 }
1350
1351 /* Add the new sentinel. */
1352 sentinel = createSentinelRedisInstance(NULL,SRI_SENTINEL,
1353 token[0],port,ri->quorum,ri);
1354 if (sentinel) {
1355 sentinelEvent(REDIS_NOTICE,"+sentinel",sentinel,"%@");
1356 /* The runid is NULL after a new instance creation and
1357 * for Sentinels we don't have a later chance to fill it,
1358 * so do it now. */
1359 sentinel->runid = sdsnew(token[2]);
1360 }
1361 }
1362
1363 /* Update the state of the Sentinel. */
1364 if (sentinel) {
1365 sentinel->last_hello_time = mstime();
1366 if (canfailover)
1367 sentinel->flags |= SRI_CAN_FAILOVER;
1368 else
1369 sentinel->flags &= ~SRI_CAN_FAILOVER;
1370 }
1371 }
1372 sdsfreesplitres(token,numtokens);
1373 }
1374 }
1375
1376 void sentinelPingInstance(sentinelRedisInstance *ri) {
1377 mstime_t now = mstime();
1378 mstime_t info_period;
1379 int retval;
1380
1381 /* Return ASAP if we have already a PING or INFO already pending, or
1382 * in the case the instance is not properly connected. */
1383 if (ri->flags & SRI_DISCONNECTED) return;
1384
1385 /* For INFO, PING, PUBLISH that are not critical commands to send we
1386 * also have a limit of SENTINEL_MAX_PENDING_COMMANDS. We don't
1387 * want to use a lot of memory just because a link is not working
1388 * properly (note that anyway there is a redundant protection about this,
1389 * that is, the link will be disconnected and reconnected if a long
1390 * timeout condition is detected. */
1391 if (ri->pending_commands >= SENTINEL_MAX_PENDING_COMMANDS) return;
1392
1393 /* If this is a slave of a master in O_DOWN condition we start sending
1394 * it INFO every second, instead of the usual SENTINEL_INFO_PERIOD
1395 * period. In this state we want to closely monitor slaves in case they
1396 * are turned into masters by another Sentinel, or by the sysadmin. */
1397 if ((ri->flags & SRI_SLAVE) &&
1398 (ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) {
1399 info_period = 1000;
1400 } else {
1401 info_period = SENTINEL_INFO_PERIOD;
1402 }
1403
1404 if ((ri->flags & SRI_SENTINEL) == 0 &&
1405 (ri->info_refresh == 0 ||
1406 (now - ri->info_refresh) > info_period))
1407 {
1408 /* Send INFO to masters and slaves, not sentinels. */
1409 retval = redisAsyncCommand(ri->cc,
1410 sentinelInfoReplyCallback, NULL, "INFO");
1411 if (retval != REDIS_OK) return;
1412 ri->pending_commands++;
1413 } else if ((now - ri->last_pong_time) > SENTINEL_PING_PERIOD) {
1414 /* Send PING to all the three kinds of instances. */
1415 retval = redisAsyncCommand(ri->cc,
1416 sentinelPingReplyCallback, NULL, "PING");
1417 if (retval != REDIS_OK) return;
1418 ri->pending_commands++;
1419 } else if ((ri->flags & SRI_MASTER) &&
1420 (now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD)
1421 {
1422 /* PUBLISH hello messages only to masters. */
1423 struct sockaddr_in sa;
1424 socklen_t salen = sizeof(sa);
1425
1426 if (getsockname(ri->cc->c.fd,(struct sockaddr*)&sa,&salen) != -1) {
1427 char myaddr[128];
1428
1429 snprintf(myaddr,sizeof(myaddr),"%s:%d:%s:%d",
1430 inet_ntoa(sa.sin_addr), server.port, server.runid,
1431 (ri->flags & SRI_CAN_FAILOVER) != 0);
1432 retval = redisAsyncCommand(ri->cc,
1433 sentinelPublishReplyCallback, NULL, "PUBLISH %s %s",
1434 SENTINEL_HELLO_CHANNEL,myaddr);
1435 if (retval != REDIS_OK) return;
1436 ri->pending_commands++;
1437 }
1438 }
1439 }
1440
1441 /* =========================== SENTINEL command ============================= */
1442
1443 const char *sentinelFailoverStateStr(int state) {
1444 switch(state) {
1445 case SENTINEL_FAILOVER_STATE_NONE: return "none";
1446 case SENTINEL_FAILOVER_STATE_WAIT_START: return "wait_start";
1447 case SENTINEL_FAILOVER_STATE_SELECT_SLAVE: return "select_slave";
1448 case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE: return "send_slaveof_noone";
1449 case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION: return "wait_promotion";
1450 case SENTINEL_FAILOVER_STATE_RECONF_SLAVES: return "reconf_slaves";
1451 case SENTINEL_FAILOVER_STATE_ALERT_CLIENTS: return "alert_clients";
1452 case SENTINEL_FAILOVER_STATE_DETECT_END: return "detect_end";
1453 case SENTINEL_FAILOVER_STATE_UPDATE_CONFIG: return "update_config";
1454 default: return "unknown";
1455 }
1456 }
1457
1458 /* Redis instance to Redis protocol representation. */
1459 void addReplySentinelRedisInstance(redisClient *c, sentinelRedisInstance *ri) {
1460 char *flags = sdsempty();
1461 void *mbl;
1462 int fields = 0;
1463
1464 mbl = addDeferredMultiBulkLength(c);
1465
1466 addReplyBulkCString(c,"name");
1467 addReplyBulkCString(c,ri->name);
1468 fields++;
1469
1470 addReplyBulkCString(c,"ip");
1471 addReplyBulkCString(c,ri->addr->ip);
1472 fields++;
1473
1474 addReplyBulkCString(c,"port");
1475 addReplyBulkLongLong(c,ri->addr->port);
1476 fields++;
1477
1478 addReplyBulkCString(c,"runid");
1479 addReplyBulkCString(c,ri->runid ? ri->runid : "");
1480 fields++;
1481
1482 addReplyBulkCString(c,"flags");
1483 if (ri->flags & SRI_S_DOWN) flags = sdscat(flags,"s_down,");
1484 if (ri->flags & SRI_O_DOWN) flags = sdscat(flags,"o_down,");
1485 if (ri->flags & SRI_MASTER) flags = sdscat(flags,"master,");
1486 if (ri->flags & SRI_SLAVE) flags = sdscat(flags,"slave,");
1487 if (ri->flags & SRI_SENTINEL) flags = sdscat(flags,"sentinel,");
1488 if (ri->flags & SRI_DISCONNECTED) flags = sdscat(flags,"disconnected,");
1489 if (ri->flags & SRI_MASTER_DOWN) flags = sdscat(flags,"master_down,");
1490 if (ri->flags & SRI_FAILOVER_IN_PROGRESS)
1491 flags = sdscat(flags,"failover_in_progress,");
1492 if (ri->flags & SRI_I_AM_THE_LEADER)
1493 flags = sdscat(flags,"i_am_the_leader,");
1494 if (ri->flags & SRI_PROMOTED) flags = sdscat(flags,"promoted,");
1495 if (ri->flags & SRI_RECONF_SENT) flags = sdscat(flags,"reconf_sent,");
1496 if (ri->flags & SRI_RECONF_INPROG) flags = sdscat(flags,"reconf_inprog,");
1497 if (ri->flags & SRI_RECONF_DONE) flags = sdscat(flags,"reconf_done,");
1498
1499 if (sdslen(flags) != 0) flags = sdsrange(flags,0,-2); /* remove last "," */
1500 addReplyBulkCString(c,flags);
1501 sdsfree(flags);
1502 fields++;
1503
1504 addReplyBulkCString(c,"pending-commands");
1505 addReplyBulkLongLong(c,ri->pending_commands);
1506 fields++;
1507
1508 if (ri->flags & SRI_FAILOVER_IN_PROGRESS) {
1509 addReplyBulkCString(c,"failover-state");
1510 addReplyBulkCString(c,(char*)sentinelFailoverStateStr(ri->failover_state));
1511 fields++;
1512 }
1513
1514 addReplyBulkCString(c,"last-ok-ping-reply");
1515 addReplyBulkLongLong(c,mstime() - ri->last_avail_time);
1516 fields++;
1517
1518 addReplyBulkCString(c,"last-ping-reply");
1519 addReplyBulkLongLong(c,mstime() - ri->last_pong_time);
1520 fields++;
1521
1522 if (ri->flags & SRI_S_DOWN) {
1523 addReplyBulkCString(c,"s-down-time");
1524 addReplyBulkLongLong(c,mstime()-ri->s_down_since_time);
1525 fields++;
1526 }
1527
1528 if (ri->flags & SRI_O_DOWN) {
1529 addReplyBulkCString(c,"o-down-time");
1530 addReplyBulkLongLong(c,mstime()-ri->o_down_since_time);
1531 fields++;
1532 }
1533
1534 /* Masters and Slaves */
1535 if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
1536 addReplyBulkCString(c,"info-refresh");
1537 addReplyBulkLongLong(c,mstime() - ri->info_refresh);
1538 fields++;
1539 }
1540
1541 /* Only masters */
1542 if (ri->flags & SRI_MASTER) {
1543 addReplyBulkCString(c,"num-slaves");
1544 addReplyBulkLongLong(c,dictSize(ri->slaves));
1545 fields++;
1546
1547 addReplyBulkCString(c,"num-other-sentinels");
1548 addReplyBulkLongLong(c,dictSize(ri->sentinels));
1549 fields++;
1550
1551 addReplyBulkCString(c,"quorum");
1552 addReplyBulkLongLong(c,ri->quorum);
1553 fields++;
1554 }
1555
1556 /* Only slaves */
1557 if (ri->flags & SRI_SLAVE) {
1558 addReplyBulkCString(c,"master-link-down-time");
1559 addReplyBulkLongLong(c,ri->master_link_down_time);
1560 fields++;
1561
1562 addReplyBulkCString(c,"master-link-status");
1563 addReplyBulkCString(c,
1564 (ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP) ?
1565 "ok" : "err");
1566 fields++;
1567
1568 addReplyBulkCString(c,"master-host");
1569 addReplyBulkCString(c,
1570 ri->slave_master_host ? ri->slave_master_host : "?");
1571 fields++;
1572
1573 addReplyBulkCString(c,"master-port");
1574 addReplyBulkLongLong(c,ri->slave_master_port);
1575 fields++;
1576 }
1577
1578 /* Only sentinels */
1579 if (ri->flags & SRI_SENTINEL) {
1580 addReplyBulkCString(c,"last-hello-message");
1581 addReplyBulkLongLong(c,mstime() - ri->last_hello_time);
1582 fields++;
1583
1584 addReplyBulkCString(c,"can-failover-its-master");
1585 addReplyBulkLongLong(c,(ri->flags & SRI_CAN_FAILOVER) != 0);
1586 fields++;
1587
1588 if (ri->flags & SRI_MASTER_DOWN) {
1589 addReplyBulkCString(c,"subjective-leader");
1590 addReplyBulkCString(c,ri->leader ? ri->leader : "?");
1591 fields++;
1592 }
1593 }
1594
1595 setDeferredMultiBulkLength(c,mbl,fields*2);
1596 }
1597
1598 /* Output a number of instances contanined inside a dictionary as
1599 * Redis protocol. */
1600 void addReplyDictOfRedisInstances(redisClient *c, dict *instances) {
1601 dictIterator *di;
1602 dictEntry *de;
1603
1604 di = dictGetIterator(instances);
1605 addReplyMultiBulkLen(c,dictSize(instances));
1606 while((de = dictNext(di)) != NULL) {
1607 sentinelRedisInstance *ri = dictGetVal(de);
1608
1609 addReplySentinelRedisInstance(c,ri);
1610 }
1611 dictReleaseIterator(di);
1612 }
1613
1614 /* Lookup the named master into sentinel.masters.
1615 * If the master is not found reply to the client with an error and returns
1616 * NULL. */
1617 sentinelRedisInstance *sentinelGetMasterByNameOrReplyError(redisClient *c,
1618 robj *name)
1619 {
1620 sentinelRedisInstance *ri;
1621
1622 ri = dictFetchValue(sentinel.masters,c->argv[2]->ptr);
1623 if (!ri) {
1624 addReplyError(c,"No such master with that name");
1625 return NULL;
1626 }
1627 return ri;
1628 }
1629
1630 void sentinelCommand(redisClient *c) {
1631 if (!strcasecmp(c->argv[1]->ptr,"masters")) {
1632 /* SENTINEL MASTERS */
1633 if (c->argc != 2) goto numargserr;
1634
1635 addReplyDictOfRedisInstances(c,sentinel.masters);
1636 } else if (!strcasecmp(c->argv[1]->ptr,"slaves")) {
1637 /* SENTINEL SLAVES <master-name> */
1638 sentinelRedisInstance *ri;
1639
1640 if (c->argc != 3) goto numargserr;
1641 if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
1642 return;
1643 addReplyDictOfRedisInstances(c,ri->slaves);
1644 } else if (!strcasecmp(c->argv[1]->ptr,"sentinels")) {
1645 /* SENTINEL SENTINELS <master-name> */
1646 sentinelRedisInstance *ri;
1647
1648 if (c->argc != 3) goto numargserr;
1649 if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
1650 return;
1651 addReplyDictOfRedisInstances(c,ri->sentinels);
1652 } else if (!strcasecmp(c->argv[1]->ptr,"is-master-down-by-addr")) {
1653 /* SENTINEL IS-MASTER-DOWN-BY-ADDR <ip> <port> */
1654 sentinelRedisInstance *ri;
1655 char *leader = NULL;
1656 long port;
1657 int isdown = 0;
1658
1659 if (c->argc != 4) goto numargserr;
1660 if (getLongFromObjectOrReply(c,c->argv[3],&port,NULL) != REDIS_OK)
1661 return;
1662 ri = getSentinelRedisInstanceByAddrAndRunID(sentinel.masters,
1663 c->argv[2]->ptr,port,NULL);
1664
1665 /* It exists? Is actually a master? Is subjectively down? It's down.
1666 * Note: if we are in tilt mode we always reply with "0". */
1667 if (!sentinel.tilt && ri && (ri->flags & SRI_S_DOWN) &&
1668 (ri->flags & SRI_MASTER))
1669 isdown = 1;
1670 if (ri) leader = sentinelGetSubjectiveLeader(ri);
1671
1672 /* Reply with a two-elements multi-bulk reply: down state, leader. */
1673 addReplyMultiBulkLen(c,2);
1674 addReply(c, isdown ? shared.cone : shared.czero);
1675 addReplyBulkCString(c, leader ? leader : "?");
1676 if (leader) sdsfree(leader);
1677 } else if (!strcasecmp(c->argv[1]->ptr,"reset")) {
1678 /* SENTINEL RESET <pattern> */
1679 if (c->argc != 3) goto numargserr;
1680 addReplyLongLong(c,sentinelResetMastersByPattern(c->argv[2]->ptr,SENTINEL_GENERATE_EVENT));
1681 } else if (!strcasecmp(c->argv[1]->ptr,"get-master-addr-by-name")) {
1682 /* SENTINEL GET-MASTER-ADDR-BY-NAME <master-name> */
1683 sentinelRedisInstance *ri;
1684
1685 if (c->argc != 3) goto numargserr;
1686 ri = sentinelGetMasterByName(c->argv[2]->ptr);
1687 if (ri == NULL) {
1688 addReply(c,shared.nullmultibulk);
1689 } else {
1690 sentinelAddr *addr = ri->addr;
1691
1692 if ((ri->flags & SRI_FAILOVER_IN_PROGRESS) && ri->promoted_slave)
1693 addr = ri->promoted_slave->addr;
1694 addReplyMultiBulkLen(c,2);
1695 addReplyBulkCString(c,addr->ip);
1696 addReplyBulkLongLong(c,addr->port);
1697 }
1698 } else {
1699 addReplyErrorFormat(c,"Unknown sentinel subcommand '%s'",
1700 (char*)c->argv[1]->ptr);
1701 }
1702 return;
1703
1704 numargserr:
1705 addReplyErrorFormat(c,"Wrong number of commands for 'sentinel %s'",
1706 (char*)c->argv[1]->ptr);
1707 }
1708
1709 /* ===================== SENTINEL availability checks ======================= */
1710
1711 /* Is this instance down from our point of view? */
1712 void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
1713 mstime_t elapsed = mstime() - ri->last_avail_time;
1714
1715 /* Check if we are in need for a reconnection of one of the
1716 * links, because we are detecting low activity.
1717 *
1718 * 1) Check if the command link seems connected, was connected not less
1719 * than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have an
1720 * idle time that is greater than down_after_period / 2 seconds. */
1721 if (ri->cc &&
1722 (mstime() - ri->cc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
1723 (mstime() - ri->last_pong_time) > (ri->down_after_period/2))
1724 {
1725 sentinelKillLink(ri,ri->cc);
1726 }
1727
1728 /* 2) Check if the pubsub link seems connected, was connected not less
1729 * than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have no
1730 * activity in the Pub/Sub channel for more than
1731 * SENTINEL_PUBLISH_PERIOD * 3.
1732 */
1733 if (ri->pc &&
1734 (mstime() - ri->pc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
1735 (mstime() - ri->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3))
1736 {
1737 sentinelKillLink(ri,ri->pc);
1738 }
1739
1740 /* Update the subjectively down flag. */
1741 if (elapsed > ri->down_after_period) {
1742 /* Is subjectively down */
1743 if ((ri->flags & SRI_S_DOWN) == 0) {
1744 sentinelEvent(REDIS_WARNING,"+sdown",ri,"%@");
1745 ri->s_down_since_time = mstime();
1746 ri->flags |= SRI_S_DOWN;
1747 }
1748 } else {
1749 /* Is subjectively up */
1750 if (ri->flags & SRI_S_DOWN) {
1751 sentinelEvent(REDIS_WARNING,"-sdown",ri,"%@");
1752 ri->flags &= ~SRI_S_DOWN;
1753 }
1754 }
1755 }
1756
1757 /* Is this instance down accordingly to the configured quorum? */
1758 void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
1759 dictIterator *di;
1760 dictEntry *de;
1761 int quorum = 0, odown = 0;
1762
1763 if (master->flags & SRI_S_DOWN) {
1764 /* Is down for enough sentinels? */
1765 quorum = 1; /* the current sentinel. */
1766 /* Count all the other sentinels. */
1767 di = dictGetIterator(master->sentinels);
1768 while((de = dictNext(di)) != NULL) {
1769 sentinelRedisInstance *ri = dictGetVal(de);
1770
1771 if (ri->flags & SRI_MASTER_DOWN) quorum++;
1772 }
1773 dictReleaseIterator(di);
1774 if (quorum >= master->quorum) odown = 1;
1775 }
1776
1777 /* Set the flag accordingly to the outcome. */
1778 if (odown) {
1779 if ((master->flags & SRI_O_DOWN) == 0) {
1780 sentinelEvent(REDIS_WARNING,"+odown",master,"%@ #quorum %d/%d",
1781 quorum, master->quorum);
1782 master->flags |= SRI_O_DOWN;
1783 master->o_down_since_time = mstime();
1784 }
1785 } else {
1786 if (master->flags & SRI_O_DOWN) {
1787 sentinelEvent(REDIS_WARNING,"-odown",master,"%@");
1788 master->flags &= ~SRI_O_DOWN;
1789 }
1790 }
1791 }
1792
1793 /* Receive the SENTINEL is-master-down-by-addr reply, see the
1794 * sentinelAskMasterStateToOtherSentinels() function for more information. */
1795 void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) {
1796 sentinelRedisInstance *ri = c->data;
1797 redisReply *r;
1798
1799 if (ri) ri->pending_commands--;
1800 if (!reply || !ri) return;
1801 r = reply;
1802
1803 /* Ignore every error or unexpected reply.
1804 * Note that if the command returns an error for any reason we'll
1805 * end clearing the SRI_MASTER_DOWN flag for timeout anyway. */
1806 if (r->type == REDIS_REPLY_ARRAY && r->elements == 2 &&
1807 r->element[0]->type == REDIS_REPLY_INTEGER &&
1808 r->element[1]->type == REDIS_REPLY_STRING)
1809 {
1810 ri->last_master_down_reply_time = mstime();
1811 if (r->element[0]->integer == 1) {
1812 ri->flags |= SRI_MASTER_DOWN;
1813 } else {
1814 ri->flags &= ~SRI_MASTER_DOWN;
1815 }
1816 sdsfree(ri->leader);
1817 ri->leader = sdsnew(r->element[1]->str);
1818 }
1819 }
1820
1821 /* If we think (subjectively) the master is down, we start sending
1822 * SENTINEL IS-MASTER-DOWN-BY-ADDR requests to other sentinels
1823 * in order to get the replies that allow to reach the quorum and
1824 * possibly also mark the master as objectively down. */
1825 void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master) {
1826 dictIterator *di;
1827 dictEntry *de;
1828
1829 di = dictGetIterator(master->sentinels);
1830 while((de = dictNext(di)) != NULL) {
1831 sentinelRedisInstance *ri = dictGetVal(de);
1832 mstime_t elapsed = mstime() - ri->last_master_down_reply_time;
1833 char port[32];
1834 int retval;
1835
1836 /* If the master state from other sentinel is too old, we clear it. */
1837 if (elapsed > SENTINEL_INFO_VALIDITY_TIME) {
1838 ri->flags &= ~SRI_MASTER_DOWN;
1839 sdsfree(ri->leader);
1840 ri->leader = NULL;
1841 }
1842
1843 /* Only ask if master is down to other sentinels if:
1844 *
1845 * 1) We believe it is down, or there is a failover in progress.
1846 * 2) Sentinel is connected.
1847 * 3) We did not received the info within SENTINEL_ASK_PERIOD ms. */
1848 if ((master->flags & (SRI_S_DOWN|SRI_FAILOVER_IN_PROGRESS)) == 0)
1849 continue;
1850 if (ri->flags & SRI_DISCONNECTED) continue;
1851 if (mstime() - ri->last_master_down_reply_time < SENTINEL_ASK_PERIOD)
1852 continue;
1853
1854 /* Ask */
1855 ll2string(port,sizeof(port),master->addr->port);
1856 retval = redisAsyncCommand(ri->cc,
1857 sentinelReceiveIsMasterDownReply, NULL,
1858 "SENTINEL is-master-down-by-addr %s %s",
1859 master->addr->ip, port);
1860 if (retval == REDIS_OK) ri->pending_commands++;
1861 }
1862 dictReleaseIterator(di);
1863 }
1864
1865 /* =============================== FAILOVER ================================= */
1866
1867 /* Given a master get the "subjective leader", that is, among all the sentinels
1868 * with given characteristics, the one with the lexicographically smaller
1869 * runid. The characteristics required are:
1870 *
1871 * 1) Has SRI_CAN_FAILOVER flag.
1872 * 2) Is not disconnected.
1873 * 3) Recently answered to our ping (no longer than
1874 * SENTINEL_INFO_VALIDITY_TIME milliseconds ago).
1875 *
1876 * The function returns a pointer to an sds string representing the runid of the
1877 * leader sentinel instance (from our point of view). Otherwise NULL is
1878 * returned if there are no suitable sentinels.
1879 */
1880
/* qsort() comparator for an array of C strings (char*): each argument
 * points to an array element, and the strings are ordered
 * case-insensitively via strcasecmp(). */
int compareRunID(const void *a, const void *b) {
    const char *const *lhs = a;
    const char *const *rhs = b;

    return strcasecmp(*lhs, *rhs);
}
1885
1886 char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master) {
1887 dictIterator *di;
1888 dictEntry *de;
1889 char **instance =
1890 zmalloc(sizeof(char*)*(dictSize(master->sentinels)+1));
1891 int instances = 0;
1892 char *leader = NULL;
1893
1894 if (master->flags & SRI_CAN_FAILOVER) {
1895 /* Add myself if I'm a Sentinel that can failover this master. */
1896 instance[instances++] = server.runid;
1897 }
1898
1899 di = dictGetIterator(master->sentinels);
1900 while((de = dictNext(di)) != NULL) {
1901 sentinelRedisInstance *ri = dictGetVal(de);
1902 mstime_t lag = mstime() - ri->last_avail_time;
1903
1904 if (lag > SENTINEL_INFO_VALIDITY_TIME ||
1905 !(ri->flags & SRI_CAN_FAILOVER) ||
1906 (ri->flags & SRI_DISCONNECTED) ||
1907 ri->runid == NULL)
1908 continue;
1909 instance[instances++] = ri->runid;
1910 }
1911 dictReleaseIterator(di);
1912
1913 /* If we have at least one instance passing our checks, order the array
1914 * by runid. */
1915 if (instances) {
1916 qsort(instance,instances,sizeof(char*),compareRunID);
1917 leader = sdsnew(instance[0]);
1918 }
1919 zfree(instance);
1920 return leader;
1921 }
1922
/* Pairs a runid string pointer with an unsigned vote counter. */
struct sentinelLeader {
    char          *runid;   /* Pointer to the runid string. */
    unsigned long  votes;   /* Vote counter. */
};
1927
1928 /* Helper function for sentinelGetObjectiveLeader, increment the counter
1929 * relative to the specified runid. */
1930 void sentinelObjectiveLeaderIncr(dict *counters, char *runid) {
1931 dictEntry *de = dictFind(counters,runid);
1932 uint64_t oldval;
1933
1934 if (de) {
1935 oldval = dictGetUnsignedIntegerVal(de);
1936 dictSetUnsignedIntegerVal(de,oldval+1);
1937 } else {
1938 de = dictAddRaw(counters,runid);
1939 redisAssert(de != NULL);
1940 dictSetUnsignedIntegerVal(de,1);
1941 }
1942 }
1943
1944 /* Scan all the Sentinels attached to this master to check what is the
1945 * most voted leader among Sentinels. */
1946 char *sentinelGetObjectiveLeader(sentinelRedisInstance *master) {
1947 dict *counters;
1948 dictIterator *di;
1949 dictEntry *de;
1950 unsigned int voters = 0, voters_quorum;
1951 char *myvote;
1952 char *winner = NULL;
1953
1954 redisAssert(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS));
1955 counters = dictCreate(&leaderVotesDictType,NULL);
1956
1957 /* Count my vote. */
1958 myvote = sentinelGetSubjectiveLeader(master);
1959 if (myvote) {
1960 sentinelObjectiveLeaderIncr(counters,myvote);
1961 voters++;
1962 }
1963
1964 /* Count other sentinels votes */
1965 di = dictGetIterator(master->sentinels);
1966 while((de = dictNext(di)) != NULL) {
1967 sentinelRedisInstance *ri = dictGetVal(de);
1968 if (ri->leader == NULL) continue;
1969 /* If the failover is not already in progress we are only interested
1970 * in Sentinels that believe the master is down. Otherwise the leader
1971 * selection is useful for the "failover-takedown" when the original
1972 * leader fails. In that case we consider all the voters. */
1973 if (!(master->flags & SRI_FAILOVER_IN_PROGRESS) &&
1974 !(ri->flags & SRI_MASTER_DOWN)) continue;
1975 sentinelObjectiveLeaderIncr(counters,ri->leader);
1976 voters++;
1977 }
1978 dictReleaseIterator(di);
1979 voters_quorum = voters/2+1;
1980
1981 /* Check what's the winner. For the winner to win, it needs two conditions:
1982 * 1) Absolute majority between voters (50% + 1).
1983 * 2) And anyway at least master->quorum votes. */
1984 {
1985 uint64_t max_votes = 0; /* Max votes so far. */
1986
1987 di = dictGetIterator(counters);
1988 while((de = dictNext(di)) != NULL) {
1989 uint64_t votes = dictGetUnsignedIntegerVal(de);
1990
1991 if (max_votes < votes) {
1992 max_votes = votes;
1993 winner = dictGetKey(de);
1994 }
1995 }
1996 dictReleaseIterator(di);
1997 if (winner && (max_votes < voters_quorum || max_votes < master->quorum))
1998 winner = NULL;
1999 }
2000 winner = winner ? sdsnew(winner) : NULL;
2001 sdsfree(myvote);
2002 dictRelease(counters);
2003 return winner;
2004 }
2005
2006 /* This function checks if there are the conditions to start the failover,
2007 * that is:
2008 *
2009 * 1) Enough time has passed since O_DOWN.
2010 * 2) The master is marked as SRI_CAN_FAILOVER, so we can failover it.
2011 * 3) We are the objectively leader for this master.
2012 *
2013 * If the conditions are met we flag the master as SRI_FAILOVER_IN_PROGRESS
2014 * and SRI_I_AM_THE_LEADER.
2015 */
2016 void sentinelStartFailover(sentinelRedisInstance *master) {
2017 char *leader;
2018 int isleader;
2019
2020 /* We can't failover if the master is not in O_DOWN state or if
2021 * there is not already a failover in progress (to perform the
2022 * takedown if the leader died) or if this Sentinel is not allowed
2023 * to start a failover. */
2024 if (!(master->flags & SRI_CAN_FAILOVER) ||
2025 !(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) return;
2026
2027 leader = sentinelGetObjectiveLeader(master);
2028 isleader = leader && strcasecmp(leader,server.runid) == 0;
2029 sdsfree(leader);
2030
2031 /* If I'm not the leader, I can't failover for sure. */
2032 if (!isleader) return;
2033
2034 /* If the failover is already in progress there are two options... */
2035 if (master->flags & SRI_FAILOVER_IN_PROGRESS) {
2036 if (master->flags & SRI_I_AM_THE_LEADER) {
2037 /* 1) I'm flagged as leader so I already started the failover.
2038 * Just return. */
2039 return;
2040 } else {
2041 mstime_t elapsed = mstime() - master->failover_state_change_time;
2042
2043 /* 2) I'm the new leader, but I'm not flagged as leader in the
2044 * master: I did not started the failover, but the original
2045 * leader has no longer the leadership.
2046 *
2047 * In this case if the failover appears to be lagging
2048 * for at least 25% of the configured failover timeout,
2049 * I can assume I can take control. Otherwise
2050 * it's better to return and wait more. */
2051 if (elapsed < (master->failover_timeout/4)) return;
2052 sentinelEvent(REDIS_WARNING,"+failover-takedown",master,"%@");
2053 /* We have already an elected slave if we are in
2054 * FAILOVER_IN_PROGRESS state, that is, the slave that we
2055 * observed turning into a master. */
2056 master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
2057 /* As an observer we flagged all the slaves as RECONF_SENT but
2058 * now we are in charge of actually sending the reconfiguration
2059 * command so let's clear this flag for all the instances. */
2060 sentinelDelFlagsToDictOfRedisInstances(master->slaves,
2061 SRI_RECONF_SENT);
2062 }
2063 } else {
2064 /* Brand new failover as SRI_FAILOVER_IN_PROGRESS was not set. */
2065 master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;
2066 }
2067
2068 master->flags |= SRI_FAILOVER_IN_PROGRESS|SRI_I_AM_THE_LEADER;
2069 sentinelEvent(REDIS_WARNING,"+failover-triggered",master,"%@");
2070
2071 /* Pick a random delay if it's a fresh failover (WAIT_START), and not
2072 * a recovery of a failover started by another sentinel. */
2073 if (master->failover_state == SENTINEL_FAILOVER_STATE_WAIT_START) {
2074 master->failover_start_time = mstime() +
2075 SENTINEL_FAILOVER_FIXED_DELAY +
2076 (rand() % SENTINEL_FAILOVER_MAX_RANDOM_DELAY);
2077 sentinelEvent(REDIS_WARNING,"+failover-state-wait-start",master,
2078 "%@ #starting in %lld milliseconds",
2079 master->failover_start_time-mstime());
2080 }
2081 master->failover_state_change_time = mstime();
2082 }
2083
2084 /* Select a suitable slave to promote. The current algorithm only uses
2085 * the following parameters:
2086 *
2087 * 1) None of the following conditions: S_DOWN, O_DOWN, DISCONNECTED.
2088 * 2) last_avail_time more recent than SENTINEL_INFO_VALIDITY_TIME.
2089 * 3) info_refresh more recent than SENTINEL_INFO_VALIDITY_TIME.
2090 * 4) master_link_down_time no more than:
2091 * (now - master->s_down_since_time) + (master->down_after_period * 10).
2092 *
2093 * Among all the slaves matching the above conditions we select the slave
2094 * with lower slave_priority. If priority is the same we select the slave
2095 * with lexicographically smaller runid.
2096 *
2097 * The function returns the pointer to the selected slave, otherwise
2098 * NULL if no suitable slave was found.
2099 */
2100
2101 int compareSlavesForPromotion(const void *a, const void *b) {
2102 sentinelRedisInstance **sa = (sentinelRedisInstance **)a,
2103 **sb = (sentinelRedisInstance **)b;
2104 if ((*sa)->slave_priority != (*sb)->slave_priority)
2105 return (*sa)->slave_priority - (*sb)->slave_priority;
2106 return strcasecmp((*sa)->runid,(*sb)->runid);
2107 }
2108
2109 sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) {
2110 sentinelRedisInstance **instance =
2111 zmalloc(sizeof(instance[0])*dictSize(master->slaves));
2112 sentinelRedisInstance *selected = NULL;
2113 int instances = 0;
2114 dictIterator *di;
2115 dictEntry *de;
2116 mstime_t max_master_down_time;
2117
2118 max_master_down_time = (mstime() - master->s_down_since_time) +
2119 (master->down_after_period * 10);
2120
2121 di = dictGetIterator(master->slaves);
2122 while((de = dictNext(di)) != NULL) {
2123 sentinelRedisInstance *slave = dictGetVal(de);
2124 mstime_t info_validity_time = mstime()-SENTINEL_INFO_VALIDITY_TIME;
2125
2126 if (slave->flags & (SRI_S_DOWN|SRI_O_DOWN|SRI_DISCONNECTED)) continue;
2127 if (slave->last_avail_time < info_validity_time) continue;
2128 if (slave->info_refresh < info_validity_time) continue;
2129 if (slave->master_link_down_time > max_master_down_time) continue;
2130 instance[instances++] = slave;
2131 }
2132 dictReleaseIterator(di);
2133 if (instances) {
2134 qsort(instance,instances,sizeof(sentinelRedisInstance*),
2135 compareSlavesForPromotion);
2136 selected = instance[0];
2137 }
2138 zfree(instance);
2139 return selected;
2140 }
2141
2142 /* ---------------- Failover state machine implementation ------------------- */
2143 void sentinelFailoverWaitStart(sentinelRedisInstance *ri) {
2144 if (mstime() >= ri->failover_start_time) {
2145 ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
2146 ri->failover_state_change_time = mstime();
2147 sentinelEvent(REDIS_WARNING,"+failover-state-select-slave",ri,"%@");
2148 }
2149 }
2150
2151 void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) {
2152 sentinelRedisInstance *slave = sentinelSelectSlave(ri);
2153
2154 if (slave == NULL) {
2155 sentinelEvent(REDIS_WARNING,"-failover-abort-no-good-slave",ri,"%@");
2156 sentinelAbortFailover(ri);
2157 } else {
2158 sentinelEvent(REDIS_WARNING,"+selected-slave",slave,"%@");
2159 slave->flags |= SRI_PROMOTED;
2160 ri->promoted_slave = slave;
2161 ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE;
2162 ri->failover_state_change_time = mstime();
2163 sentinelEvent(REDIS_NOTICE,"+failover-state-send-slaveof-noone",
2164 slave, "%@");
2165 }
2166 }
2167
2168 void sentinelFailoverSendSlaveOfNoOne(sentinelRedisInstance *ri) {
2169 int retval;
2170
2171 if (ri->promoted_slave->flags & SRI_DISCONNECTED) return;
2172
2173 /* Send SLAVEOF NO ONE command to turn the slave into a master.
2174 * We actually register a generic callback for this command as we don't
2175 * really care about the reply. We check if it worked indirectly observing
2176 * if INFO returns a different role (master instead of slave). */
2177 retval = redisAsyncCommand(ri->promoted_slave->cc,
2178 sentinelDiscardReplyCallback, NULL, "SLAVEOF NO ONE");
2179 if (retval != REDIS_OK) return;
2180 ri->promoted_slave->pending_commands++;
2181 sentinelEvent(REDIS_NOTICE, "+failover-state-wait-promotion",
2182 ri->promoted_slave,"%@");
2183 ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION;
2184 ri->failover_state_change_time = mstime();
2185 }
2186
2187 /* We actually wait for promotion indirectly checking with INFO when the
2188 * slave turns into a master. */
2189 void sentinelFailoverWaitPromotion(sentinelRedisInstance *ri) {
2190 mstime_t elapsed = mstime() - ri->failover_state_change_time;
2191
2192 if (elapsed >= SENTINEL_PROMOTION_RETRY_PERIOD) {
2193 sentinelEvent(REDIS_WARNING,"-promotion-timeout",ri->promoted_slave,
2194 "%@");
2195 sentinelEvent(REDIS_WARNING,"+failover-state-select-slave",ri,"%@");
2196 ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
2197 ri->failover_state_change_time = mstime();
2198 ri->promoted_slave->flags &= ~SRI_PROMOTED;
2199 ri->promoted_slave = NULL;
2200 }
2201 }
2202
2203 void sentinelFailoverDetectEnd(sentinelRedisInstance *master) {
2204 int not_reconfigured = 0, timeout = 0;
2205 dictIterator *di;
2206 dictEntry *de;
2207 mstime_t elapsed = mstime() - master->failover_state_change_time;
2208
2209 /* We can't consider failover finished if the promoted slave is
2210 * not reachable. */
2211 if (master->promoted_slave == NULL ||
2212 master->promoted_slave->flags & SRI_S_DOWN) return;
2213
2214 /* The failover terminates once all the reachable slaves are properly
2215 * configured. */
2216 di = dictGetIterator(master->slaves);
2217 while((de = dictNext(di)) != NULL) {
2218 sentinelRedisInstance *slave = dictGetVal(de);
2219
2220 if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
2221 if (slave->flags & SRI_S_DOWN) continue;
2222 not_reconfigured++;
2223 }
2224 dictReleaseIterator(di);
2225
2226 /* Force end of failover on timeout. */
2227 if (elapsed > master->failover_timeout) {
2228 not_reconfigured = 0;
2229 timeout = 1;
2230 sentinelEvent(REDIS_WARNING,"+failover-end-for-timeout",master,"%@");
2231 }
2232
2233 if (not_reconfigured == 0) {
2234 sentinelEvent(REDIS_WARNING,"+failover-end",master,"%@");
2235 master->failover_state = SENTINEL_FAILOVER_STATE_UPDATE_CONFIG;
2236 master->failover_state_change_time = mstime();
2237 }
2238
2239 /* If I'm the leader it is a good idea to send a best effort SLAVEOF
2240 * command to all the slaves still not reconfigured to replicate with
2241 * the new master. */
2242 if (timeout && (master->flags & SRI_I_AM_THE_LEADER)) {
2243 dictIterator *di;
2244 dictEntry *de;
2245 char master_port[32];
2246
2247 ll2string(master_port,sizeof(master_port),
2248 master->promoted_slave->addr->port);
2249
2250 di = dictGetIterator(master->slaves);
2251 while((de = dictNext(di)) != NULL) {
2252 sentinelRedisInstance *slave = dictGetVal(de);
2253 int retval;
2254
2255 if (slave->flags &
2256 (SRI_RECONF_DONE|SRI_RECONF_SENT|SRI_DISCONNECTED)) continue;
2257
2258 retval = redisAsyncCommand(slave->cc,
2259 sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
2260 master->promoted_slave->addr->ip,
2261 master_port);
2262 if (retval == REDIS_OK) {
2263 sentinelEvent(REDIS_NOTICE,"+slave-reconf-sent-be",slave,"%@");
2264 slave->flags |= SRI_RECONF_SENT;
2265 }
2266 }
2267 dictReleaseIterator(di);
2268 }
2269 }
2270
2271 /* Send SLAVE OF <new master address> to all the remaining slaves that
2272 * still don't appear to have the configuration updated. */
2273 void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) {
2274 dictIterator *di;
2275 dictEntry *de;
2276 int in_progress = 0;
2277
2278 di = dictGetIterator(master->slaves);
2279 while((de = dictNext(di)) != NULL) {
2280 sentinelRedisInstance *slave = dictGetVal(de);
2281
2282 if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG))
2283 in_progress++;
2284 }
2285 dictReleaseIterator(di);
2286
2287 di = dictGetIterator(master->slaves);
2288 while(in_progress < master->parallel_syncs &&
2289 (de = dictNext(di)) != NULL)
2290 {
2291 sentinelRedisInstance *slave = dictGetVal(de);
2292 int retval;
2293 char master_port[32];
2294
2295 /* Skip the promoted slave, and already configured slaves. */
2296 if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
2297
2298 /* Clear the SRI_RECONF_SENT flag if too much time elapsed without
2299 * the slave moving forward to the next state. */
2300 if ((slave->flags & SRI_RECONF_SENT) &&
2301 (mstime() - slave->slave_reconf_sent_time) >
2302 SENTINEL_SLAVE_RECONF_RETRY_PERIOD)
2303 {
2304 sentinelEvent(REDIS_NOTICE,"-slave-reconf-sent-timeout",slave,"%@");
2305 slave->flags &= ~SRI_RECONF_SENT;
2306 }
2307
2308 /* Nothing to do for instances that are disconnected or already
2309 * in RECONF_SENT state. */
2310 if (slave->flags & (SRI_DISCONNECTED|SRI_RECONF_SENT|SRI_RECONF_INPROG))
2311 continue;
2312
2313 /* Send SLAVEOF <new master>. */
2314 ll2string(master_port,sizeof(master_port),
2315 master->promoted_slave->addr->port);
2316 retval = redisAsyncCommand(slave->cc,
2317 sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
2318 master->promoted_slave->addr->ip,
2319 master_port);
2320 if (retval == REDIS_OK) {
2321 slave->flags |= SRI_RECONF_SENT;
2322 slave->pending_commands++;
2323 slave->slave_reconf_sent_time = mstime();
2324 sentinelEvent(REDIS_NOTICE,"+slave-reconf-sent",slave,"%@");
2325 in_progress++;
2326 }
2327 }
2328 dictReleaseIterator(di);
2329 sentinelFailoverDetectEnd(master);
2330 }
2331
2332 /* This function is called when the slave is in
2333 * SENTINEL_FAILOVER_STATE_UPDATE_CONFIG state. In this state we need
2334 * to remove it from the master table and add the promoted slave instead.
2335 *
2336 * If there are no promoted slaves as this instance is unique, we remove
2337 * and re-add it with the same address to trigger a complete state
2338 * refresh. */
2339 void sentinelFailoverSwitchToPromotedSlave(sentinelRedisInstance *master) {
2340 sentinelRedisInstance *ref = master->promoted_slave ?
2341 master->promoted_slave : master;
2342
2343 sentinelEvent(REDIS_WARNING,"+switch-master",master,"%s %s %d %s %d",
2344 master->name, master->addr->ip, master->addr->port,
2345 ref->addr->ip, ref->addr->port);
2346
2347 sentinelResetMasterAndChangeAddress(master,ref->addr->ip,ref->addr->port);
2348 }
2349
2350 void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
2351 redisAssert(ri->flags & SRI_MASTER);
2352
2353 if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;
2354
2355 switch(ri->failover_state) {
2356 case SENTINEL_FAILOVER_STATE_WAIT_START:
2357 sentinelFailoverWaitStart(ri);
2358 break;
2359 case SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
2360 sentinelFailoverSelectSlave(ri);
2361 break;
2362 case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:
2363 sentinelFailoverSendSlaveOfNoOne(ri);
2364 break;
2365 case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
2366 sentinelFailoverWaitPromotion(ri);
2367 break;
2368 case SENTINEL_FAILOVER_STATE_RECONF_SLAVES:
2369 sentinelFailoverReconfNextSlave(ri);
2370 break;
2371 case SENTINEL_FAILOVER_STATE_DETECT_END:
2372 sentinelFailoverDetectEnd(ri);
2373 break;
2374 }
2375 }
2376
2377 /* Abort a failover in progress with the following steps:
2378 * 1) If this instance is the leaer send a SLAVEOF command to all the already
2379 * reconfigured slaves if any to configure them to replicate with the
2380 * original master.
2381 * 2) For both leaders and observers: clear the failover flags and state in
2382 * the master instance.
2383 * 3) If there is already a promoted slave and we are the leader, and this
2384 * slave is not DISCONNECTED, try to reconfigure it to replicate
2385 * back to the master as well, sending a best effort SLAVEOF command.
2386 */
2387 void sentinelAbortFailover(sentinelRedisInstance *ri) {
2388 char master_port[32];
2389 dictIterator *di;
2390 dictEntry *de;
2391
2392 redisAssert(ri->flags & SRI_FAILOVER_IN_PROGRESS);
2393 ll2string(master_port,sizeof(master_port),ri->addr->port);
2394
2395 /* Clear failover related flags from slaves.
2396 * Also if we are the leader make sure to send SLAVEOF commands to all the
2397 * already reconfigured slaves in order to turn them back into slaves of
2398 * the original master. */
2399 di = dictGetIterator(ri->slaves);
2400 while((de = dictNext(di)) != NULL) {
2401 sentinelRedisInstance *slave = dictGetVal(de);
2402 if ((ri->flags & SRI_I_AM_THE_LEADER) &&
2403 !(slave->flags & SRI_DISCONNECTED) &&
2404 (slave->flags & (SRI_PROMOTED|SRI_RECONF_SENT|SRI_RECONF_INPROG|
2405 SRI_RECONF_DONE)))
2406 {
2407 int retval;
2408
2409 retval = redisAsyncCommand(slave->cc,
2410 sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
2411 ri->addr->ip,
2412 master_port);
2413 if (retval == REDIS_OK)
2414 sentinelEvent(REDIS_NOTICE,"-slave-reconf-undo",slave,"%@");
2415 }
2416 slave->flags &= ~(SRI_RECONF_SENT|SRI_RECONF_INPROG|SRI_RECONF_DONE);
2417 }
2418 dictReleaseIterator(di);
2419
2420 ri->flags &= ~(SRI_FAILOVER_IN_PROGRESS|SRI_I_AM_THE_LEADER);
2421 ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
2422 ri->failover_state_change_time = mstime();
2423 if (ri->promoted_slave) {
2424 ri->promoted_slave->flags &= ~SRI_PROMOTED;
2425 ri->promoted_slave = NULL;
2426 }
2427 }
2428
2429 /* The following is called only for master instances and will abort the
2430 * failover process if:
2431 *
2432 * 1) The failover is in progress.
2433 * 2) We already promoted a slave.
2434 * 3) The promoted slave is in extended SDOWN condition.
2435 */
2436 void sentinelAbortFailoverIfNeeded(sentinelRedisInstance *ri) {
2437 /* Failover is in progress? Do we have a promoted slave? */
2438 if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS) || !ri->promoted_slave) return;
2439
2440 /* Is the promoted slave into an extended SDOWN state? */
2441 if (!(ri->promoted_slave->flags & SRI_S_DOWN) ||
2442 (mstime() - ri->promoted_slave->s_down_since_time) <
2443 (ri->down_after_period * SENTINEL_EXTENDED_SDOWN_MULTIPLIER)) return;
2444
2445 sentinelEvent(REDIS_WARNING,"-failover-abort-x-sdown",ri->promoted_slave,"%@");
2446 sentinelAbortFailover(ri);
2447 }
2448
/* ======================== SENTINEL timer handler ==========================
 * This is the "main" of our Sentinel, Sentinel being completely non-blocking
 * in design. The function is called every second.
 * -------------------------------------------------------------------------- */
2453
2454 /* Perform scheduled operations for the specified Redis instance. */
2455 void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
2456 /* ========== MONITORING HALF ============ */
2457 /* Every kind of instance */
2458 sentinelReconnectInstance(ri);
2459 sentinelPingInstance(ri);
2460
2461 /* Masters and slaves */
2462 if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
2463 /* Nothing so far. */
2464 }
2465
2466 /* Only masters */
2467 if (ri->flags & SRI_MASTER) {
2468 sentinelAskMasterStateToOtherSentinels(ri);
2469 }
2470
2471 /* ============== ACTING HALF ============= */
2472 /* We don't proceed with the acting half if we are in TILT mode.
2473 * TILT happens when we find something odd with the time, like a
2474 * sudden change in the clock. */
2475 if (sentinel.tilt) {
2476 if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return;
2477 sentinel.tilt = 0;
2478 sentinelEvent(REDIS_WARNING,"-tilt",NULL,"#tilt mode exited");
2479 }
2480
2481 /* Every kind of instance */
2482 sentinelCheckSubjectivelyDown(ri);
2483
2484 /* Masters and slaves */
2485 if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
2486 /* Nothing so far. */
2487 }
2488
2489 /* Only masters */
2490 if (ri->flags & SRI_MASTER) {
2491 sentinelCheckObjectivelyDown(ri);
2492 sentinelStartFailover(ri);
2493 sentinelFailoverStateMachine(ri);
2494 sentinelAbortFailoverIfNeeded(ri);
2495 }
2496 }
2497
2498 /* Perform scheduled operations for all the instances in the dictionary.
2499 * Recursively call the function against dictionaries of slaves. */
2500 void sentinelHandleDictOfRedisInstances(dict *instances) {
2501 dictIterator *di;
2502 dictEntry *de;
2503 sentinelRedisInstance *switch_to_promoted = NULL;
2504
2505 /* There are a number of things we need to perform against every master. */
2506 di = dictGetIterator(instances);
2507 while((de = dictNext(di)) != NULL) {
2508 sentinelRedisInstance *ri = dictGetVal(de);
2509
2510 sentinelHandleRedisInstance(ri);
2511 if (ri->flags & SRI_MASTER) {
2512 sentinelHandleDictOfRedisInstances(ri->slaves);
2513 sentinelHandleDictOfRedisInstances(ri->sentinels);
2514 if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) {
2515 switch_to_promoted = ri;
2516 }
2517 }
2518 }
2519 if (switch_to_promoted)
2520 sentinelFailoverSwitchToPromotedSlave(switch_to_promoted);
2521 dictReleaseIterator(di);
2522 }
2523
2524 /* This function checks if we need to enter the TITL mode.
2525 *
2526 * The TILT mode is entered if we detect that between two invocations of the
2527 * timer interrupt, a negative amount of time, or too much time has passed.
2528 * Note that we expect that more or less just 100 milliseconds will pass
2529 * if everything is fine. However we'll see a negative number or a
2530 * difference bigger than SENTINEL_TILT_TRIGGER milliseconds if one of the
2531 * following conditions happen:
2532 *
2533 * 1) The Sentiel process for some time is blocked, for every kind of
2534 * random reason: the load is huge, the computer was freezed for some time
2535 * in I/O or alike, the process was stopped by a signal. Everything.
2536 * 2) The system clock was altered significantly.
2537 *
2538 * Under both this conditions we'll see everything as timed out and failing
2539 * without good reasons. Instead we enter the TILT mode and wait
2540 * for SENTIENL_TILT_PERIOD to elapse before starting to act again.
2541 *
2542 * During TILT time we still collect information, we just do not act. */
2543 void sentinelCheckTiltCondition(void) {
2544 mstime_t now = mstime();
2545 mstime_t delta = now - sentinel.previous_time;
2546
2547 if (delta < 0 || delta > SENTINEL_TILT_TRIGGER) {
2548 sentinel.tilt = 1;
2549 sentinel.tilt_start_time = mstime();
2550 sentinelEvent(REDIS_WARNING,"+tilt",NULL,"#tilt mode entered");
2551 }
2552 sentinel.previous_time = mstime();
2553 }
2554
2555 /* Handle terminated childs resulting from calls to notifications and client
2556 * reconfigurations scripts. */
2557 void sentinelHandleChildren(void) {
2558 int statloc;
2559 pid_t pid;
2560
2561 if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
2562 int exitcode = WEXITSTATUS(statloc);
2563 int bysignal = 0;
2564
2565 if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
2566 sentinelEvent(REDIS_DEBUG,"-child",NULL,"%ld %d %d",
2567 (long)pid, exitcode, bysignal);
2568
2569 /* TODO: remove client reconfiguration scripts from the queue. */
2570 }
2571 }
2572
2573 void sentinelTimer(void) {
2574 sentinelCheckTiltCondition();
2575 sentinelHandleDictOfRedisInstances(sentinel.masters);
2576 sentinelHandleChildren();
2577 }
2578