]> git.saurik.com Git - redis.git/blob - src/sentinel.c
Fix warning in redis.c for sentinel config load
[redis.git] / src / sentinel.c
1 /* Redis Sentinel implementation
2 * -----------------------------
3 *
4 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * * Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * * Neither the name of Redis nor the names of its contributors may be used
16 * to endorse or promote products derived from this software without
17 * specific prior written permission.
18 *
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 * POSSIBILITY OF SUCH DAMAGE.
30 */
31
32 #include "redis.h"
33 #include "hiredis.h"
34 #include "async.h"
35
36 #include <ctype.h>
37 #include <arpa/inet.h>
38 #include <sys/socket.h>
39
/* Port assigned to server.port by initSentinelConfig() when running in
 * Sentinel mode. */
#define REDIS_SENTINEL_PORT 26379
41
42 /* ======================== Sentinel global state =========================== */
43
typedef long long mstime_t; /* millisecond time type. */

/* Address object, used to describe an ip:port pair.
 * Instances are created by createSentinelAddr() and destroyed by
 * releaseSentinelAddr(), which also frees the 'ip' string. */
typedef struct sentinelAddr {
    char *ip;   /* IP address as an sds string (see createSentinelAddr()). */
    int port;   /* TCP port, validated to be in the 1..65535 range. */
} sentinelAddr;
51
/* Bit flags stored in sentinelRedisInstance.flags.
 * SRI_MASTER, SRI_SLAVE and SRI_SENTINEL select the kind of instance (at
 * least one of them is required by createSentinelRedisInstance()); the
 * remaining bits describe runtime state. */
#define SRI_MASTER (1<<0)
#define SRI_SLAVE (1<<1)
#define SRI_SENTINEL (1<<2)
#define SRI_DISCONNECTED (1<<3) /* Set on every newly created instance. */
#define SRI_S_DOWN (1<<4) /* Subjectively down (no quorum). */
#define SRI_O_DOWN (1<<5) /* Objectively down (quorum reached). */
#define SRI_MASTER_DOWN (1<<6) /* A Sentinel with this flag set thinks that
                                  its master is down. */
/* SRI_CAN_FAILOVER when set in an SRI_MASTER instance means that we are
 * allowed to perform the failover for this master.
 * When set in a SRI_SENTINEL instance means that sentinel is allowed to
 * perform the failover on its master. */
#define SRI_CAN_FAILOVER (1<<7)
#define SRI_FAILOVER_IN_PROGRESS (1<<8) /* Failover is in progress for
                                           this master. */
#define SRI_I_AM_THE_LEADER (1<<9) /* We are the leader for this master. */
#define SRI_PROMOTED (1<<10) /* Slave selected for promotion. */
#define SRI_RECONF_SENT (1<<11) /* SLAVEOF <newmaster> sent. */
#define SRI_RECONF_INPROG (1<<12) /* Slave synchronization in progress. */
#define SRI_RECONF_DONE (1<<13) /* Slave synchronized with new master. */
73
/* Timing and tuning defaults.
 * NOTE(review): the *_PERIOD / *_TIME / *_DELAY values are presumably
 * milliseconds (they sit next to mstime_t timestamps); the code visible here
 * confirms the unit only for the down-after and failover-timeout defaults,
 * which can be overridden by the "down-after-milliseconds" and
 * "failover-timeout" configuration directives. */
#define SENTINEL_INFO_PERIOD 10000
#define SENTINEL_PING_PERIOD 1000
#define SENTINEL_ASK_PERIOD 1000
#define SENTINEL_PUBLISH_PERIOD 5000
#define SENTINEL_DOWN_AFTER_PERIOD 30000 /* Default down_after_period. */
#define SENTINEL_HELLO_CHANNEL "__sentinel__:hello"
#define SENTINEL_TILT_TRIGGER 2000
#define SENTINEL_TILT_PERIOD (SENTINEL_PING_PERIOD*30) /* 30 ping periods. */
#define SENTINEL_DEFAULT_SLAVE_PRIORITY 100 /* Initial slave_priority. */
#define SENTINEL_PROMOTION_RETRY_PERIOD 30000
#define SENTINEL_SLAVE_RECONF_RETRY_PERIOD 10000
#define SENTINEL_DEFAULT_PARALLEL_SYNCS 1 /* Initial parallel_syncs. */
#define SENTINEL_MIN_LINK_RECONNECT_PERIOD 15000
#define SENTINEL_DEFAULT_FAILOVER_TIMEOUT (60*15*1000) /* Initial
                                                          failover_timeout. */
#define SENTINEL_MAX_PENDING_COMMANDS 100
#define SENTINEL_EXTENDED_SDOWN_MULTIPLIER 10

/* How many milliseconds is an information valid? This applies for instance
 * to the reply to SENTINEL IS-MASTER-DOWN-BY-ADDR replies. */
#define SENTINEL_INFO_VALIDITY_TIME 5000
#define SENTINEL_FAILOVER_FIXED_DELAY 5000
#define SENTINEL_FAILOVER_MAX_RANDOM_DELAY 10000
96
/* Failover machine different states, stored in
 * sentinelRedisInstance.failover_state. */
#define SENTINEL_FAILOVER_STATE_NONE 0 /* No failover in progress. */
#define SENTINEL_FAILOVER_STATE_WAIT_START 1 /* Wait for failover_start_time*/
#define SENTINEL_FAILOVER_STATE_SELECT_SLAVE 2 /* Select slave to promote */
#define SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE 3 /* Slave -> Master */
#define SENTINEL_FAILOVER_STATE_WAIT_PROMOTION 4 /* Wait slave to change role */
#define SENTINEL_FAILOVER_STATE_RECONF_SLAVES 5 /* SLAVEOF newmaster */
#define SENTINEL_FAILOVER_STATE_WAIT_NEXT_SLAVE 6 /* wait replication */
#define SENTINEL_FAILOVER_STATE_ALERT_CLIENTS 7 /* Run user script. */
#define SENTINEL_FAILOVER_STATE_WAIT_ALERT_SCRIPT 8 /* Wait script exec. */
#define SENTINEL_FAILOVER_STATE_DETECT_END 9 /* Check for failover end. */
#define SENTINEL_FAILOVER_STATE_UPDATE_CONFIG 10 /* Monitor promoted slave. */

/* Values for sentinelRedisInstance.slave_master_link_status; new instances
 * start with the DOWN value. */
#define SENTINEL_MASTER_LINK_STATUS_UP 0
#define SENTINEL_MASTER_LINK_STATUS_DOWN 1

/* Generic flags that can be used with different functions.
 * SENTINEL_GENERATE_EVENT makes sentinelResetMaster() emit a
 * "+reset-master" event. */
#define SENTINEL_NO_FLAGS 0
#define SENTINEL_GENERATE_EVENT 1
116
/* Descriptor of a monitored instance. The same structure is used for
 * masters, slaves and other Sentinels: the SRI_* bits in 'flags' tell which
 * one it is. Objects are created by createSentinelRedisInstance() and
 * destroyed by releaseSentinelRedisInstance(). */
typedef struct sentinelRedisInstance {
    int flags; /* See SRI_... defines */
    char *name; /* Master name from the point of view of this sentinel.
                   For slaves and sentinels it is built as hostname:port. */
    char *runid; /* run ID of this instance. NULL until known. */
    sentinelAddr *addr; /* Address (ip and port) of this instance. */
    redisAsyncContext *cc; /* Hiredis context for commands. */
    redisAsyncContext *pc; /* Hiredis context for Pub / Sub. */
    int pending_commands; /* Number of commands sent waiting for a reply. */
    mstime_t cc_conn_time; /* cc connection time. */
    mstime_t pc_conn_time; /* pc connection time. */
    mstime_t pc_last_activity; /* Last time we received any message. */
    mstime_t last_avail_time; /* Last time the instance replied to ping with
                                 a reply we consider valid. */
    mstime_t last_pong_time; /* Last time the instance replied to ping,
                                whatever the reply was. That's used to check
                                if the link is idle and must be reconnected. */
    mstime_t last_pub_time; /* Last time we sent hello via Pub/Sub. */
    mstime_t last_hello_time; /* Only used if SRI_SENTINEL is set. Last time
                                 we received an hello from this Sentinel
                                 via Pub/Sub. */
    mstime_t last_master_down_reply_time; /* Time of last reply to
                                             SENTINEL is-master-down command. */
    mstime_t s_down_since_time; /* Subjectively down since time. */
    mstime_t o_down_since_time; /* Objectively down since time. */
    mstime_t down_after_period; /* Consider it down after that period. */
    mstime_t info_refresh; /* Time at which we received INFO output from it.
                              Zero means INFO was never received. */

    /* Master specific. */
    dict *sentinels; /* Other sentinels monitoring the same master. */
    dict *slaves; /* Slaves for this master instance. */
    int quorum; /* Number of sentinels that need to agree on failure. */
    int parallel_syncs; /* How many slaves to reconfigure at same time. */

    /* Slave specific. */
    mstime_t master_link_down_time; /* Slave replication link down time. */
    int slave_priority; /* Slave priority according to its INFO output. */
    mstime_t slave_reconf_sent_time; /* Time at which we sent SLAVEOF <new> */
    struct sentinelRedisInstance *master; /* Master instance if SRI_SLAVE is set. */
    char *slave_master_host; /* Master host as reported by INFO */
    int slave_master_port; /* Master port as reported by INFO */
    int slave_master_link_status; /* Master link status as reported by INFO */
    /* Failover */
    char *leader; /* If this is a master instance, this is the runid of
                     the Sentinel that should perform the failover. If
                     this is a Sentinel, this is the runid of the Sentinel
                     that this other Sentinel is voting as leader.
                     This field is valid only if SRI_MASTER_DOWN is
                     set on the Sentinel instance. */
    int failover_state; /* See SENTINEL_FAILOVER_STATE_* defines. */
    mstime_t failover_state_change_time;
    mstime_t failover_start_time; /* When to start to failover if leader. */
    mstime_t failover_timeout; /* Max time to refresh failover state. */
    struct sentinelRedisInstance *promoted_slave; /* Promoted slave instance. */
    /* Scripts executed to notify admin or reconfigure clients: when they
     * are set to NULL no script is executed. */
    char *notify_script;
    char *client_reconfig_script;
} sentinelRedisInstance;
175
/* Main state: the single global 'sentinel' object, initialized by
 * initSentinel(). */
struct sentinelState {
    dict *masters; /* Dictionary of master sentinelRedisInstances.
                      Key is the instance name, value is the
                      sentinelRedisInstance structure pointer. */
    int tilt; /* Are we in TILT mode? */
    mstime_t tilt_start_time; /* When TILT started. */
    mstime_t previous_time; /* Time last time we ran the time handler. */
} sentinel;
185
186 /* ======================= hiredis ae.c adapters =============================
187 * Note: this implementation is taken from hiredis/adapters/ae.h, however
188 * we have our modified copy for Sentinel in order to use our allocator
189 * and to have full control over how the adapter works. */
190
/* Per-connection adapter state linking a hiredis async context to the ae
 * event loop. Allocated by redisAeAttach() and freed by redisAeCleanup(). */
typedef struct redisAeEvents {
    redisAsyncContext *context; /* Async context the events belong to. */
    aeEventLoop *loop; /* Event loop where file events are registered. */
    int fd; /* Descriptor copied from the context at attach time. */
    int reading, writing; /* Non-zero while the corresponding (readable /
                             writable) file event is registered. */
} redisAeEvents;
197
198 static void redisAeReadEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
199 ((void)el); ((void)fd); ((void)mask);
200
201 redisAeEvents *e = (redisAeEvents*)privdata;
202 redisAsyncHandleRead(e->context);
203 }
204
205 static void redisAeWriteEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
206 ((void)el); ((void)fd); ((void)mask);
207
208 redisAeEvents *e = (redisAeEvents*)privdata;
209 redisAsyncHandleWrite(e->context);
210 }
211
212 static void redisAeAddRead(void *privdata) {
213 redisAeEvents *e = (redisAeEvents*)privdata;
214 aeEventLoop *loop = e->loop;
215 if (!e->reading) {
216 e->reading = 1;
217 aeCreateFileEvent(loop,e->fd,AE_READABLE,redisAeReadEvent,e);
218 }
219 }
220
221 static void redisAeDelRead(void *privdata) {
222 redisAeEvents *e = (redisAeEvents*)privdata;
223 aeEventLoop *loop = e->loop;
224 if (e->reading) {
225 e->reading = 0;
226 aeDeleteFileEvent(loop,e->fd,AE_READABLE);
227 }
228 }
229
230 static void redisAeAddWrite(void *privdata) {
231 redisAeEvents *e = (redisAeEvents*)privdata;
232 aeEventLoop *loop = e->loop;
233 if (!e->writing) {
234 e->writing = 1;
235 aeCreateFileEvent(loop,e->fd,AE_WRITABLE,redisAeWriteEvent,e);
236 }
237 }
238
239 static void redisAeDelWrite(void *privdata) {
240 redisAeEvents *e = (redisAeEvents*)privdata;
241 aeEventLoop *loop = e->loop;
242 if (e->writing) {
243 e->writing = 0;
244 aeDeleteFileEvent(loop,e->fd,AE_WRITABLE);
245 }
246 }
247
/* hiredis "cleanup" hook: unregister both file events (when registered) and
 * free the adapter state passed as 'privdata'. */
static void redisAeCleanup(void *privdata) {
    redisAeDelRead(privdata);
    redisAeDelWrite(privdata);
    zfree(privdata);
}
254
255 static int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac) {
256 redisContext *c = &(ac->c);
257 redisAeEvents *e;
258
259 /* Nothing should be attached when something is already attached */
260 if (ac->ev.data != NULL)
261 return REDIS_ERR;
262
263 /* Create container for context and r/w events */
264 e = (redisAeEvents*)zmalloc(sizeof(*e));
265 e->context = ac;
266 e->loop = loop;
267 e->fd = c->fd;
268 e->reading = e->writing = 0;
269
270 /* Register functions to start/stop listening for events */
271 ac->ev.addRead = redisAeAddRead;
272 ac->ev.delRead = redisAeDelRead;
273 ac->ev.addWrite = redisAeAddWrite;
274 ac->ev.delWrite = redisAeDelWrite;
275 ac->ev.cleanup = redisAeCleanup;
276 ac->ev.data = e;
277
278 return REDIS_OK;
279 }
280
281 /* ============================= Prototypes ================================= */
282
/* Forward declarations for functions referenced in this file ahead of, or
 * apart from, their definitions. */
void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status);
void sentinelDisconnectCallback(const redisAsyncContext *c, int status);
void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata);
sentinelRedisInstance *sentinelGetMasterByName(char *name);
char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master);
char *sentinelGetObjectiveLeader(sentinelRedisInstance *master);
int yesnotoi(char *s);
void sentinelDisconnectInstanceFromContext(const redisAsyncContext *c);
void sentinelKillLink(sentinelRedisInstance *ri, redisAsyncContext *c);
const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri);
void sentinelAbortFailover(sentinelRedisInstance *ri);
294
295 /* ========================= Dictionary types =============================== */
296
/* Declarations needed by the dictionary types below: the sds hash and key
 * comparison callbacks, and the instance release function used by the
 * value destructor. */
unsigned int dictSdsHash(const void *key);
int dictSdsKeyCompare(void *privdata, const void *key1, const void *key2);
void releaseSentinelRedisInstance(sentinelRedisInstance *ri);
300
301 void dictInstancesValDestructor (void *privdata, void *obj) {
302 releaseSentinelRedisInstance(obj);
303 }
304
/* Instance name (sds) -> instance (sentinelRedisInstance pointer)
 *
 * Used for the sentinel.masters table and for the 'slaves' and 'sentinels'
 * tables of every instance.
 *
 * There is no key destructor because the key is the instance's own 'name'
 * string, which is freed together with the instance by the value
 * destructor (see releaseSentinelRedisInstance()). */
dictType instancesDictType = {
    dictSdsHash, /* hash function */
    NULL, /* key dup */
    NULL, /* val dup */
    dictSdsKeyCompare, /* key compare */
    NULL, /* key destructor */
    dictInstancesValDestructor /* val destructor */
};
317
/* Instance runid (sds) -> votes (long casted to void*)
 *
 * This is useful into sentinelGetObjectiveLeader() function in order to
 * count the votes and understand who is the leader.
 *
 * Neither keys nor values have a destructor in this type. */
dictType leaderVotesDictType = {
    dictSdsHash, /* hash function */
    NULL, /* key dup */
    NULL, /* val dup */
    dictSdsKeyCompare, /* key compare */
    NULL, /* key destructor */
    NULL /* val destructor */
};
330
331 /* =========================== Initialization =============================== */
332
void sentinelCommand(redisClient *c);

/* Commands available in Sentinel mode: initSentinel() empties the server
 * command table and registers only these entries, keyed by name.
 * NOTE(review): the third field is presumably the command arity, with
 * negative values meaning "at least N arguments" -- confirm against the
 * struct redisCommand definition. */
struct redisCommand sentinelcmds[] = {
    {"ping",pingCommand,1,"",0,NULL,0,0,0,0,0},
    {"sentinel",sentinelCommand,-2,"",0,NULL,0,0,0,0,0},
    {"subscribe",subscribeCommand,-2,"",0,NULL,0,0,0,0,0},
    {"unsubscribe",unsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
    {"psubscribe",psubscribeCommand,-2,"",0,NULL,0,0,0,0,0},
    {"punsubscribe",punsubscribeCommand,-1,"",0,NULL,0,0,0,0,0}
};
343
344 /* This function overwrites a few normal Redis config default with Sentinel
345 * specific defaults. */
346 void initSentinelConfig(void) {
347 server.port = REDIS_SENTINEL_PORT;
348 }
349
350 /* Perform the Sentinel mode initialization. */
351 void initSentinel(void) {
352 int j;
353
354 /* Remove usual Redis commands from the command table, then just add
355 * the SENTINEL command. */
356 dictEmpty(server.commands);
357 for (j = 0; j < sizeof(sentinelcmds)/sizeof(sentinelcmds[0]); j++) {
358 int retval;
359 struct redisCommand *cmd = sentinelcmds+j;
360
361 retval = dictAdd(server.commands, sdsnew(cmd->name), cmd);
362 redisAssert(retval == DICT_OK);
363 }
364
365 /* Initialize various data structures. */
366 sentinel.masters = dictCreate(&instancesDictType,NULL);
367 sentinel.tilt = 0;
368 sentinel.tilt_start_time = mstime();
369 sentinel.previous_time = mstime();
370 }
371
372 /* ============================== sentinelAddr ============================== */
373
374 /* Create a sentinelAddr object and return it on success.
375 * On error NULL is returned and errno is set to:
376 * ENOENT: Can't resolve the hostname.
377 * EINVAL: Invalid port number.
378 */
379 sentinelAddr *createSentinelAddr(char *hostname, int port) {
380 char buf[32];
381 sentinelAddr *sa;
382
383 if (port <= 0 || port > 65535) {
384 errno = EINVAL;
385 return NULL;
386 }
387 if (anetResolve(NULL,hostname,buf) == ANET_ERR) {
388 errno = ENOENT;
389 return NULL;
390 }
391 sa = zmalloc(sizeof(*sa));
392 sa->ip = sdsnew(buf);
393 sa->port = port;
394 return sa;
395 }
396
397 /* Free a Sentinel address. Can't fail. */
398 void releaseSentinelAddr(sentinelAddr *sa) {
399 sdsfree(sa->ip);
400 zfree(sa);
401 }
402
403 /* =========================== Events notification ========================== */
404
/* Placeholder for the execution of the user notification script: right now
 * it does nothing and ignores all its arguments. */
void sentinelCallNotificationScript(char *scriptpath, char *type, char *msg) {
    /* TODO: implement it. */
    (void)scriptpath;
    (void)type;
    (void)msg;
}
408
409 /* Send an event to log, pub/sub, user notification script.
410 *
411 * 'level' is the log level for logging. Only REDIS_WARNING events will trigger
412 * the execution of the user notification script.
413 *
414 * 'type' is the message type, also used as a pub/sub channel name.
415 *
416 * 'ri', is the redis instance target of this event if applicable, and is
417 * used to obtain the path of the notification script to execute.
418 *
419 * The remaining arguments are printf-alike.
420 * If the format specifier starts with the two characters "%@" then ri is
421 * not NULL, and the message is prefixed with an instance identifier in the
422 * following format:
423 *
424 * <instance type> <instance name> <ip> <port>
425 *
426 * If the instance type is not master, than the additional string is
427 * added to specify the originating master:
428 *
429 * @ <master name> <master ip> <master port>
430 *
431 * Any other specifier after "%@" is processed by printf itself.
432 */
433 void sentinelEvent(int level, char *type, sentinelRedisInstance *ri,
434 const char *fmt, ...) {
435 va_list ap;
436 char msg[REDIS_MAX_LOGMSG_LEN];
437 robj *channel, *payload;
438
439 /* Handle %@ */
440 if (fmt[0] == '%' && fmt[1] == '@') {
441 sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
442 NULL : ri->master;
443
444 if (master) {
445 snprintf(msg, sizeof(msg), "%s %s %s %d @ %s %s %d",
446 sentinelRedisInstanceTypeStr(ri),
447 ri->name, ri->addr->ip, ri->addr->port,
448 master->name, master->addr->ip, master->addr->port);
449 } else {
450 snprintf(msg, sizeof(msg), "%s %s %s %d",
451 sentinelRedisInstanceTypeStr(ri),
452 ri->name, ri->addr->ip, ri->addr->port);
453 }
454 fmt += 2;
455 } else {
456 msg[0] = '\0';
457 }
458
459 /* Use vsprintf for the rest of the formatting if any. */
460 if (fmt[0] != '\0') {
461 va_start(ap, fmt);
462 vsnprintf(msg+strlen(msg), sizeof(msg)-strlen(msg), fmt, ap);
463 va_end(ap);
464 }
465
466 /* Log the message if the log level allows it to be logged. */
467 if (level >= server.verbosity)
468 redisLog(level,"%s %s",type,msg);
469
470 /* Publish the message via Pub/Sub if it's not a debugging one. */
471 if (level != REDIS_DEBUG) {
472 channel = createStringObject(type,strlen(type));
473 payload = createStringObject(msg,strlen(msg));
474 pubsubPublishMessage(channel,payload);
475 decrRefCount(channel);
476 decrRefCount(payload);
477 }
478
479 /* Call the notification script if applicable. */
480 if (level == REDIS_WARNING && ri != NULL) {
481 sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
482 ri : ri->master;
483 if (master->notify_script) {
484 sentinelCallNotificationScript(master->notify_script,type,msg);
485 }
486 }
487 }
488
489 /* ========================== sentinelRedisInstance ========================= */
490
491 /* Create a redis instance, the following fields must be populated by the
492 * caller if needed:
493 * runid: set to NULL but will be populated once INFO output is received.
494 * info_refresh: is set to 0 to mean that we never received INFO so far.
495 *
496 * If SRI_MASTER is set into initial flags the instance is added to
497 * sentinel.masters table.
498 *
499 * if SRI_SLAVE or SRI_SENTINEL is set then 'master' must be not NULL and the
500 * instance is added into master->slaves or master->sentinels table.
501 *
502 * If the instance is a slave or sentinel, the name parameter is ignored and
503 * is created automatically as hostname:port.
504 *
505 * The function fails if hostname can't be resolved or port is out of range.
506 * When this happens NULL is returned and errno is set accordingly to the
507 * createSentinelAddr() function.
508 *
509 * The function may also fail and return NULL with errno set to EBUSY if
510 * a master or slave with the same name already exists. */
511 sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char *hostname, int port, int quorum, sentinelRedisInstance *master) {
512 sentinelRedisInstance *ri;
513 sentinelAddr *addr;
514 dict *table;
515 char slavename[128], *sdsname;
516
517 redisAssert(flags & (SRI_MASTER|SRI_SLAVE|SRI_SENTINEL));
518 redisAssert((flags & SRI_MASTER) || master != NULL);
519
520 /* Check address validity. */
521 addr = createSentinelAddr(hostname,port);
522 if (addr == NULL) return NULL;
523
524 /* For slaves and sentinel we use ip:port as name. */
525 if (flags & (SRI_SLAVE|SRI_SENTINEL)) {
526 snprintf(slavename,sizeof(slavename),"%s:%d",hostname,port);
527 name = slavename;
528 }
529
530 /* Make sure the entry is not duplicated. This may happen when the same
531 * name for a master is used multiple times inside the configuration or
532 * if we try to add multiple times a slave or sentinel with same ip/port
533 * to a master. */
534 if (flags & SRI_MASTER) table = sentinel.masters;
535 else if (flags & SRI_SLAVE) table = master->slaves;
536 else if (flags & SRI_SENTINEL) table = master->sentinels;
537 sdsname = sdsnew(name);
538 if (dictFind(table,sdsname)) {
539 sdsfree(sdsname);
540 errno = EBUSY;
541 return NULL;
542 }
543
544 /* Create the instance object. */
545 ri = zmalloc(sizeof(*ri));
546 /* Note that all the instances are started in the disconnected state,
547 * the event loop will take care of connecting them. */
548 ri->flags = flags | SRI_DISCONNECTED;
549 ri->name = sdsname;
550 ri->runid = NULL;
551 ri->addr = addr;
552 ri->cc = NULL;
553 ri->pc = NULL;
554 ri->pending_commands = 0;
555 ri->cc_conn_time = 0;
556 ri->pc_conn_time = 0;
557 ri->pc_last_activity = 0;
558 ri->last_avail_time = mstime();
559 ri->last_pong_time = mstime();
560 ri->last_pub_time = mstime();
561 ri->last_hello_time = mstime();
562 ri->last_master_down_reply_time = mstime();
563 ri->s_down_since_time = 0;
564 ri->o_down_since_time = 0;
565 ri->down_after_period = master ? master->down_after_period :
566 SENTINEL_DOWN_AFTER_PERIOD;
567 ri->master_link_down_time = 0;
568 ri->slave_priority = SENTINEL_DEFAULT_SLAVE_PRIORITY;
569 ri->slave_reconf_sent_time = 0;
570 ri->slave_master_host = NULL;
571 ri->slave_master_port = 0;
572 ri->slave_master_link_status = SENTINEL_MASTER_LINK_STATUS_DOWN;
573 ri->sentinels = dictCreate(&instancesDictType,NULL);
574 ri->quorum = quorum;
575 ri->parallel_syncs = SENTINEL_DEFAULT_PARALLEL_SYNCS;
576 ri->master = master;
577 ri->slaves = dictCreate(&instancesDictType,NULL);
578 ri->info_refresh = 0;
579
580 /* Failover state. */
581 ri->leader = NULL;
582 ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
583 ri->failover_state_change_time = 0;
584 ri->failover_start_time = 0;
585 ri->failover_timeout = SENTINEL_DEFAULT_FAILOVER_TIMEOUT;
586 ri->promoted_slave = NULL;
587 ri->notify_script = NULL;
588 ri->client_reconfig_script = NULL;
589
590 /* Add into the right table. */
591 dictAdd(table, ri->name, ri);
592 return ri;
593 }
594
595 /* Release this instance and all its slaves, sentinels, hiredis connections.
596 * This function also takes care of unlinking the instance from the main
597 * masters table (if it is a master) or from its master sentinels/slaves table
598 * if it is a slave or sentinel. */
599 void releaseSentinelRedisInstance(sentinelRedisInstance *ri) {
600 /* Release all its slaves or sentinels if any. */
601 dictRelease(ri->sentinels);
602 dictRelease(ri->slaves);
603
604 /* Release hiredis connections. */
605 if (ri->cc) sentinelKillLink(ri,ri->cc);
606 if (ri->pc) sentinelKillLink(ri,ri->pc);
607
608 /* Free other resources. */
609 sdsfree(ri->name);
610 sdsfree(ri->runid);
611 sdsfree(ri->notify_script);
612 sdsfree(ri->client_reconfig_script);
613 sdsfree(ri->slave_master_host);
614 sdsfree(ri->leader);
615 releaseSentinelAddr(ri->addr);
616
617 /* Clear state into the master if needed. */
618 if ((ri->flags & SRI_SLAVE) && (ri->flags & SRI_PROMOTED) && ri->master)
619 ri->master->promoted_slave = NULL;
620
621 zfree(ri);
622 }
623
624 /* Lookup a slave in a master Redis instance, by ip and port. */
625 sentinelRedisInstance *sentinelRedisInstanceLookupSlave(
626 sentinelRedisInstance *ri, char *ip, int port)
627 {
628 sds key;
629 sentinelRedisInstance *slave;
630
631 redisAssert(ri->flags & SRI_MASTER);
632 key = sdscatprintf(sdsempty(),"%s:%d",ip,port);
633 slave = dictFetchValue(ri->slaves,key);
634 sdsfree(key);
635 return slave;
636 }
637
638 /* Return the name of the type of the instance as a string. */
639 const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri) {
640 if (ri->flags & SRI_MASTER) return "master";
641 else if (ri->flags & SRI_SLAVE) return "slave";
642 else if (ri->flags & SRI_SENTINEL) return "sentinel";
643 else return "unknown";
644 }
645
646 /* This function removes all the instances found in the dictionary of instances
647 * 'd', having either:
648 *
649 * 1) The same ip/port as specified.
650 * 2) The same runid.
651 *
652 * "1" and "2" don't need to verify at the same time, just one is enough.
653 * If "runid" is NULL it is not checked.
654 * Similarly if "ip" is NULL it is not checked.
655 *
656 * This function is useful because every time we add a new Sentinel into
657 * a master's Sentinels dictionary, we want to be very sure about not
658 * having duplicated instances for any reason. This is so important because
659 * we use those other sentinels in order to run our quorum protocol to
660 * understand if it's time to proceeed with the fail over.
661 *
662 * Making sure no duplication is possible we greately improve the robustness
663 * of the quorum (otherwise we may end counting the same instance multiple
664 * times for some reason).
665 *
666 * The function returns the number of Sentinels removed. */
667 int removeMatchingSentinelsFromMaster(sentinelRedisInstance *master, char *ip, int port, char *runid) {
668 dictIterator *di;
669 dictEntry *de;
670 int removed = 0;
671
672 di = dictGetSafeIterator(master->sentinels);
673 while((de = dictNext(di)) != NULL) {
674 sentinelRedisInstance *ri = dictGetVal(de);
675
676 if ((ri->runid && runid && strcmp(ri->runid,runid) == 0) ||
677 (ip && strcmp(ri->addr->ip,ip) == 0 && port == ri->addr->port))
678 {
679 dictDelete(master->sentinels,ri->name);
680 removed++;
681 }
682 }
683 dictReleaseIterator(di);
684 return removed;
685 }
686
687 /* Search an instance with the same runid, ip and port into a dictionary
688 * of instances. Return NULL if not found, otherwise return the instance
689 * pointer.
690 *
691 * runid or ip can be NULL. In such a case the search is performed only
692 * by the non-NULL field. */
693 sentinelRedisInstance *getSentinelRedisInstanceByAddrAndRunID(dict *instances, char *ip, int port, char *runid) {
694 dictIterator *di;
695 dictEntry *de;
696 sentinelRedisInstance *instance = NULL;
697
698 redisAssert(ip || runid); /* User must pass at least one search param. */
699 di = dictGetIterator(instances);
700 while((de = dictNext(di)) != NULL) {
701 sentinelRedisInstance *ri = dictGetVal(de);
702
703 if (runid && !ri->runid) continue;
704 if ((runid == NULL || strcmp(ri->runid, runid) == 0) &&
705 (ip == NULL || (strcmp(ri->addr->ip, ip) == 0 &&
706 ri->addr->port == port)))
707 {
708 instance = ri;
709 break;
710 }
711 }
712 dictReleaseIterator(di);
713 return instance;
714 }
715
716 /* Simple master lookup by name */
717 sentinelRedisInstance *sentinelGetMasterByName(char *name) {
718 sentinelRedisInstance *ri;
719 sds sdsname = sdsnew(name);
720
721 ri = dictFetchValue(sentinel.masters,sdsname);
722 sdsfree(sdsname);
723 return ri;
724 }
725
726 /* Add the specified flags to all the instances in the specified dictionary. */
727 void sentinelAddFlagsToDictOfRedisInstances(dict *instances, int flags) {
728 dictIterator *di;
729 dictEntry *de;
730
731 di = dictGetIterator(instances);
732 while((de = dictNext(di)) != NULL) {
733 sentinelRedisInstance *ri = dictGetVal(de);
734 ri->flags |= flags;
735 }
736 dictReleaseIterator(di);
737 }
738
739 /* Remove the specified flags to all the instances in the specified
740 * dictionary. */
741 void sentinelDelFlagsToDictOfRedisInstances(dict *instances, int flags) {
742 dictIterator *di;
743 dictEntry *de;
744
745 di = dictGetIterator(instances);
746 while((de = dictNext(di)) != NULL) {
747 sentinelRedisInstance *ri = dictGetVal(de);
748 ri->flags &= ~flags;
749 }
750 dictReleaseIterator(di);
751 }
752
753 /* Reset the state of a monitored master:
754 * 1) Remove all slaves.
755 * 2) Remove all sentinels.
756 * 3) Remove most of the flags resulting from runtime operations.
757 * 4) Reset timers to their default value.
758 * 5) In the process of doing this undo the failover if in progress.
759 * 6) Disconnect the connections with the master (will reconnect automatically).
760 */
761 void sentinelResetMaster(sentinelRedisInstance *ri, int flags) {
762 redisAssert(ri->flags & SRI_MASTER);
763 dictRelease(ri->slaves);
764 dictRelease(ri->sentinels);
765 ri->slaves = dictCreate(&instancesDictType,NULL);
766 ri->sentinels = dictCreate(&instancesDictType,NULL);
767 if (ri->cc) sentinelKillLink(ri,ri->cc);
768 if (ri->pc) sentinelKillLink(ri,ri->pc);
769 ri->flags &= SRI_MASTER|SRI_CAN_FAILOVER|SRI_DISCONNECTED;
770 if (ri->leader) {
771 sdsfree(ri->leader);
772 ri->leader = NULL;
773 }
774 ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
775 ri->failover_state_change_time = 0;
776 ri->failover_start_time = 0;
777 ri->promoted_slave = NULL;
778 sdsfree(ri->runid);
779 sdsfree(ri->slave_master_host);
780 ri->runid = NULL;
781 ri->slave_master_host = NULL;
782 ri->last_avail_time = mstime();
783 ri->last_pong_time = mstime();
784 if (flags & SENTINEL_GENERATE_EVENT)
785 sentinelEvent(REDIS_WARNING,"+reset-master",ri,"%@");
786 }
787
788 /* Call sentinelResetMaster() on every master with a name matching the specified
789 * pattern. */
790 int sentinelResetMastersByPattern(char *pattern, int flags) {
791 dictIterator *di;
792 dictEntry *de;
793 int reset = 0;
794
795 di = dictGetIterator(sentinel.masters);
796 while((de = dictNext(di)) != NULL) {
797 sentinelRedisInstance *ri = dictGetVal(de);
798
799 if (ri->name) {
800 if (stringmatch(pattern,ri->name,0)) {
801 sentinelResetMaster(ri,flags);
802 reset++;
803 }
804 }
805 }
806 dictReleaseIterator(di);
807 return reset;
808 }
809
810 /* Reset the specified master with sentinelResetMaster(), and also change
811 * the ip:port address, but take the name of the instance unmodified.
812 *
813 * This is used to handle the +switch-master and +redirect-to-master events.
814 *
815 * The function returns REDIS_ERR if the address can't be resolved for some
816 * reason. Otherwise REDIS_OK is returned.
817 *
818 * TODO: make this reset so that original sentinels are re-added with
819 * same ip / port / runid.
820 */
821
822 int sentinelResetMasterAndChangeAddress(sentinelRedisInstance *master, char *ip, int port) {
823 sentinelAddr *oldaddr, *newaddr;
824
825 newaddr = createSentinelAddr(ip,port);
826 if (newaddr == NULL) return REDIS_ERR;
827 sentinelResetMaster(master,SENTINEL_NO_FLAGS);
828 oldaddr = master->addr;
829 master->addr = newaddr;
830 /* Release the old address at the end so we are safe even if the function
831 * gets the master->addr->ip and master->addr->port as arguments. */
832 releaseSentinelAddr(oldaddr);
833 return REDIS_OK;
834 }
835
836 /* ============================ Config handling ============================= */
837 char *sentinelHandleConfiguration(char **argv, int argc) {
838 sentinelRedisInstance *ri;
839
840 if (!strcasecmp(argv[0],"monitor") && argc == 5) {
841 /* monitor <name> <host> <port> <quorum> */
842 int quorum = atoi(argv[4]);
843
844 if (quorum <= 0) return "Quorum must be 1 or greater.";
845 if (createSentinelRedisInstance(argv[1],SRI_MASTER,argv[2],
846 atoi(argv[3]),quorum,NULL) == NULL)
847 {
848 switch(errno) {
849 case EBUSY: return "Duplicated master name.";
850 case ENOENT: return "Can't resolve master instance hostname.";
851 case EINVAL: return "Invalid port number";
852 }
853 }
854 } else if (!strcasecmp(argv[0],"down-after-milliseconds") && argc == 3) {
855 /* down-after-milliseconds <name> <milliseconds> */
856 ri = sentinelGetMasterByName(argv[1]);
857 if (!ri) return "No such master with specified name.";
858 ri->down_after_period = atoi(argv[2]);
859 if (ri->down_after_period <= 0)
860 return "negative or zero time parameter.";
861 } else if (!strcasecmp(argv[0],"failover-timeout") && argc == 3) {
862 /* failover-timeout <name> <milliseconds> */
863 ri = sentinelGetMasterByName(argv[1]);
864 if (!ri) return "No such master with specified name.";
865 ri->failover_timeout = atoi(argv[2]);
866 if (ri->failover_timeout <= 0)
867 return "negative or zero time parameter.";
868 } else if (!strcasecmp(argv[0],"can-failover") && argc == 3) {
869 /* can-failover <name> <yes/no> */
870 int yesno = yesnotoi(argv[2]);
871
872 ri = sentinelGetMasterByName(argv[1]);
873 if (!ri) return "No such master with specified name.";
874 if (yesno == -1) return "Argument must be either yes or no.";
875 if (yesno)
876 ri->flags |= SRI_CAN_FAILOVER;
877 else
878 ri->flags &= ~SRI_CAN_FAILOVER;
879 } else if (!strcasecmp(argv[0],"parallel-syncs") && argc == 3) {
880 /* parallel-syncs <name> <milliseconds> */
881 ri = sentinelGetMasterByName(argv[1]);
882 if (!ri) return "No such master with specified name.";
883 ri->parallel_syncs = atoi(argv[2]);
884 } else {
885 return "Unrecognized sentinel configuration statement.";
886 }
887 return NULL;
888 }
889
890 /* ====================== hiredis connection handling ======================= */
891
892 /* Completely disconnect an hiredis link from an instance. */
893 void sentinelKillLink(sentinelRedisInstance *ri, redisAsyncContext *c) {
894 if (ri->cc == c) {
895 ri->cc = NULL;
896 ri->pending_commands = 0;
897 }
898 if (ri->pc == c) ri->pc = NULL;
899 c->data = NULL;
900 ri->flags |= SRI_DISCONNECTED;
901 redisAsyncFree(c);
902 }
903
904 /* This function takes an hiredis context that is in an error condition
905 * and make sure to mark the instance as disconnected performing the
906 * cleanup needed.
907 *
908 * Note: we don't free the hiredis context as hiredis will do it for us
909 * for async conenctions. */
910 void sentinelDisconnectInstanceFromContext(const redisAsyncContext *c) {
911 sentinelRedisInstance *ri = c->data;
912 int pubsub;
913
914 if (ri == NULL) return; /* The instance no longer exists. */
915
916 pubsub = (ri->pc == c);
917 sentinelEvent(REDIS_DEBUG, pubsub ? "-pubsub-link" : "-cmd-link", ri,
918 "%@ #%s", c->errstr);
919 if (pubsub)
920 ri->pc = NULL;
921 else
922 ri->cc = NULL;
923 ri->flags |= SRI_DISCONNECTED;
924 }
925
926 void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status) {
927 if (status != REDIS_OK) {
928 sentinelDisconnectInstanceFromContext(c);
929 } else {
930 sentinelRedisInstance *ri = c->data;
931 int pubsub = (ri->pc == c);
932
933 sentinelEvent(REDIS_DEBUG, pubsub ? "+pubsub-link" : "+cmd-link", ri,
934 "%@");
935 }
936 }
937
938 void sentinelDisconnectCallback(const redisAsyncContext *c, int status) {
939 sentinelDisconnectInstanceFromContext(c);
940 }
941
942 /* Create the async connections for the specified instance if the instance
943 * is disconnected. Note that the SRI_DISCONNECTED flag is set even if just
944 * one of the two links (commands and pub/sub) is missing. */
945 void sentinelReconnectInstance(sentinelRedisInstance *ri) {
946 if (!(ri->flags & SRI_DISCONNECTED)) return;
947
948 /* Commands connection. */
949 if (ri->cc == NULL) {
950 ri->cc = redisAsyncConnect(ri->addr->ip,ri->addr->port);
951 if (ri->cc->err) {
952 sentinelEvent(REDIS_DEBUG,"-cmd-link-reconnection",ri,"%@ #%s",
953 ri->cc->errstr);
954 sentinelKillLink(ri,ri->cc);
955 } else {
956 ri->cc_conn_time = mstime();
957 ri->cc->data = ri;
958 redisAeAttach(server.el,ri->cc);
959 redisAsyncSetConnectCallback(ri->cc,
960 sentinelLinkEstablishedCallback);
961 redisAsyncSetDisconnectCallback(ri->cc,
962 sentinelDisconnectCallback);
963 }
964 }
965 /* Pub / Sub */
966 if ((ri->flags & SRI_MASTER) && ri->pc == NULL) {
967 ri->pc = redisAsyncConnect(ri->addr->ip,ri->addr->port);
968 if (ri->pc->err) {
969 sentinelEvent(REDIS_DEBUG,"-pubsub-link-reconnection",ri,"%@ #%s",
970 ri->pc->errstr);
971 sentinelKillLink(ri,ri->pc);
972 } else {
973 int retval;
974
975 ri->pc_conn_time = mstime();
976 ri->pc->data = ri;
977 redisAeAttach(server.el,ri->pc);
978 redisAsyncSetConnectCallback(ri->pc,
979 sentinelLinkEstablishedCallback);
980 redisAsyncSetDisconnectCallback(ri->pc,
981 sentinelDisconnectCallback);
982 /* Now we subscribe to the Sentinels "Hello" channel. */
983 retval = redisAsyncCommand(ri->pc,
984 sentinelReceiveHelloMessages, NULL, "SUBSCRIBE %s",
985 SENTINEL_HELLO_CHANNEL);
986 if (retval != REDIS_OK) {
987 /* If we can't subscribe, the Pub/Sub connection is useless
988 * and we can simply disconnect it and try again. */
989 sentinelKillLink(ri,ri->pc);
990 return;
991 }
992 }
993 }
994 /* Clear the DISCONNECTED flags only if we have both the connections
995 * (or just the commands connection if this is a slave or a
996 * sentinel instance). */
997 if (ri->cc && (ri->flags & (SRI_SLAVE|SRI_SENTINEL) || ri->pc))
998 ri->flags &= ~SRI_DISCONNECTED;
999 }
1000
1001 /* ======================== Redis instances pinging ======================== */
1002
1003 /* Process the INFO output from masters. */
1004 void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
1005 sds *lines;
1006 int numlines, j;
1007 int role = 0;
1008 int runid_changed = 0; /* true if runid changed. */
1009 int first_runid = 0; /* true if this is the first runid we receive. */
1010
1011 /* The following fields must be reset to a given value in the case they
1012 * are not found at all in the INFO output. */
1013 ri->master_link_down_time = 0;
1014
1015 /* Process line by line. */
1016 lines = sdssplitlen(info,strlen(info),"\r\n",2,&numlines);
1017 for (j = 0; j < numlines; j++) {
1018 sentinelRedisInstance *slave;
1019 sds l = lines[j];
1020
1021 /* run_id:<40 hex chars>*/
1022 if (sdslen(l) >= 47 && !memcmp(l,"run_id:",7)) {
1023 if (ri->runid == NULL) {
1024 ri->runid = sdsnewlen(l+7,40);
1025 first_runid = 1;
1026 } else {
1027 if (strncmp(ri->runid,l+7,40) != 0) {
1028 runid_changed = 1;
1029 sentinelEvent(REDIS_NOTICE,"+reboot",ri,"%@");
1030 sdsfree(ri->runid);
1031 ri->runid = sdsnewlen(l+7,40);
1032 }
1033 }
1034 }
1035
1036 /* slave0:<ip>,<port>,<state> */
1037 if ((ri->flags & SRI_MASTER) &&
1038 sdslen(l) >= 7 &&
1039 !memcmp(l,"slave",5) && isdigit(l[5]))
1040 {
1041 char *ip, *port, *end;
1042
1043 ip = strchr(l,':'); if (!ip) continue;
1044 ip++; /* Now ip points to start of ip address. */
1045 port = strchr(ip,','); if (!port) continue;
1046 *port = '\0'; /* nul term for easy access. */
1047 port++; /* Now port points to start of port number. */
1048 end = strchr(port,','); if (!end) continue;
1049 *end = '\0'; /* nul term for easy access. */
1050
1051 /* Check if we already have this slave into our table,
1052 * otherwise add it. */
1053 if (sentinelRedisInstanceLookupSlave(ri,ip,atoi(port)) == NULL) {
1054 if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,ip,
1055 atoi(port), ri->quorum,ri)) != NULL)
1056 {
1057 sentinelEvent(REDIS_NOTICE,"+slave",slave,"%@");
1058 }
1059 }
1060 }
1061
1062 /* master_link_down_since_seconds:<seconds> */
1063 if (sdslen(l) >= 32 &&
1064 !memcmp(l,"master_link_down_since_seconds",30))
1065 {
1066 ri->master_link_down_time = strtoll(l+31,NULL,10)*1000;
1067 }
1068
1069 /* role:<role> */
1070 if (!memcmp(l,"role:master",11)) role = SRI_MASTER;
1071 else if (!memcmp(l,"role:slave",10)) role = SRI_SLAVE;
1072
1073 if (role == SRI_SLAVE) {
1074 /* master_host:<host> */
1075 if (sdslen(l) >= 12 && !memcmp(l,"master_host:",12)) {
1076 sdsfree(ri->slave_master_host);
1077 ri->slave_master_host = sdsnew(l+12);
1078 }
1079
1080 /* master_port:<port> */
1081 if (sdslen(l) >= 12 && !memcmp(l,"master_port:",12))
1082 ri->slave_master_port = atoi(l+12);
1083
1084 /* master_link_status:<status> */
1085 if (sdslen(l) >= 19 && !memcmp(l,"master_link_status:",19)) {
1086 ri->slave_master_link_status =
1087 (strcasecmp(l+19,"up") == 0) ?
1088 SENTINEL_MASTER_LINK_STATUS_UP :
1089 SENTINEL_MASTER_LINK_STATUS_DOWN;
1090 }
1091 }
1092 }
1093 ri->info_refresh = mstime();
1094 sdsfreesplitres(lines,numlines);
1095
1096 if (sentinel.tilt) return;
1097
1098 /* Act if a master turned into a slave. */
1099 if ((ri->flags & SRI_MASTER) && role == SRI_SLAVE) {
1100 if (first_runid && ri->slave_master_host) {
1101 /* If it is the first time we receive INFO from it, but it's
1102 * a slave while it was configured as a master, we want to monitor
1103 * its master instead. */
1104 sentinelEvent(REDIS_WARNING,"+redirect-to-master",ri,
1105 "%s %s %d %s %d",
1106 ri->name, ri->addr->ip, ri->addr->port,
1107 ri->slave_master_host, ri->slave_master_port);
1108 sentinelResetMasterAndChangeAddress(ri,ri->slave_master_host,
1109 ri->slave_master_port);
1110 return;
1111 }
1112 }
1113
1114 /* Act if a slave turned into a master. */
1115 if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) {
1116 if (!(ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
1117 (runid_changed || first_runid))
1118 {
1119 /* If a slave turned into a master, but at the same time the
1120 * runid has changed, or it is simply the first time we see and
1121 * INFO output from this instance, this is a reboot with a wrong
1122 * configuration.
1123 *
1124 * Log the event and remove the slave. */
1125 int retval;
1126
1127 sentinelEvent(REDIS_WARNING,"-slave-restart-as-master",ri,"%@ #removing it from the attached slaves");
1128 retval = dictDelete(ri->master->slaves,ri->name);
1129 redisAssert(retval == REDIS_OK);
1130 return;
1131 } else if (ri->flags & SRI_PROMOTED) {
1132 /* If this is a promoted slave we can change state to the
1133 * failover state machine. */
1134 if (ri->master &&
1135 (ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
1136 (ri->master->flags & SRI_I_AM_THE_LEADER) &&
1137 (ri->master->failover_state ==
1138 SENTINEL_FAILOVER_STATE_WAIT_PROMOTION))
1139 {
1140 ri->master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
1141 ri->master->failover_state_change_time = mstime();
1142 sentinelEvent(REDIS_WARNING,"+promoted-slave",ri,"%@");
1143 sentinelEvent(REDIS_WARNING,"+failover-state-reconf-slaves",
1144 ri->master,"%@");
1145 }
1146 } else {
1147 /* Otherwise we interpret this as the start of the failover. */
1148 if (ri->master &&
1149 (ri->master->flags & SRI_FAILOVER_IN_PROGRESS) == 0)
1150 {
1151 ri->master->flags |= SRI_FAILOVER_IN_PROGRESS;
1152 sentinelEvent(REDIS_WARNING,"failover-detected",ri->master,"%@");
1153 ri->master->failover_state = SENTINEL_FAILOVER_STATE_DETECT_END;
1154 ri->master->failover_state_change_time = mstime();
1155 ri->master->promoted_slave = ri;
1156 ri->flags |= SRI_PROMOTED;
1157 /* We are an observer, so we can only assume that the leader
1158 * is reconfiguring the slave instances. For this reason we
1159 * set all the instances as RECONF_SENT waiting for progresses
1160 * on this side. */
1161 sentinelAddFlagsToDictOfRedisInstances(ri->master->slaves,
1162 SRI_RECONF_SENT);
1163 }
1164 }
1165 }
1166
1167 /* Detect if the slave that is in the process of being reconfigured
1168 * changed state. */
1169 if ((ri->flags & SRI_SLAVE) && role == SRI_SLAVE &&
1170 (ri->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)))
1171 {
1172 /* SRI_RECONF_SENT -> SRI_RECONF_INPROG. */
1173 if ((ri->flags & SRI_RECONF_SENT) &&
1174 ri->slave_master_host &&
1175 strcmp(ri->slave_master_host,
1176 ri->master->promoted_slave->addr->ip) == 0 &&
1177 ri->slave_master_port == ri->master->promoted_slave->addr->port)
1178 {
1179 ri->flags &= ~SRI_RECONF_SENT;
1180 ri->flags |= SRI_RECONF_INPROG;
1181 sentinelEvent(REDIS_NOTICE,"+slave-reconf-inprog",ri,"%@");
1182 }
1183
1184 /* SRI_RECONF_INPROG -> SRI_RECONF_DONE */
1185 if ((ri->flags & SRI_RECONF_INPROG) &&
1186 ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP)
1187 {
1188 ri->flags &= ~SRI_RECONF_INPROG;
1189 ri->flags |= SRI_RECONF_DONE;
1190 sentinelEvent(REDIS_NOTICE,"+slave-reconf-done",ri,"%@");
1191 /* If we are moving forward (a new slave is now configured)
1192 * we update the change_time as we are conceptually passing
1193 * to the next slave. */
1194 ri->failover_state_change_time = mstime();
1195 }
1196 }
1197 }
1198
1199 void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
1200 sentinelRedisInstance *ri = c->data;
1201 redisReply *r;
1202
1203 if (ri) ri->pending_commands--;
1204 if (!reply || !ri) return;
1205 r = reply;
1206
1207 if (r->type == REDIS_REPLY_STRING) {
1208 sentinelRefreshInstanceInfo(ri,r->str);
1209 }
1210 }
1211
1212 /* Just discard the reply. We use this when we are not monitoring the return
1213 * value of the command but its effects directly. */
1214 void sentinelDiscardReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
1215 sentinelRedisInstance *ri = c->data;
1216
1217 if (ri) ri->pending_commands--;
1218 }
1219
1220 void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
1221 sentinelRedisInstance *ri = c->data;
1222 redisReply *r;
1223
1224 if (ri) ri->pending_commands--;
1225 if (!reply || !ri) return;
1226 r = reply;
1227
1228 if (r->type == REDIS_REPLY_STATUS ||
1229 r->type == REDIS_REPLY_ERROR) {
1230 /* Update the "instance available" field only if this is an
1231 * acceptable reply. */
1232 if (strncmp(r->str,"PONG",4) == 0 ||
1233 strncmp(r->str,"LOADING",7) == 0 ||
1234 strncmp(r->str,"MASTERDOWN",10) == 0)
1235 {
1236 ri->last_avail_time = mstime();
1237 }
1238 }
1239 ri->last_pong_time = mstime();
1240 }
1241
1242 /* This is called when we get the reply about the PUBLISH command we send
1243 * to the master to advertise this sentinel. */
1244 void sentinelPublishReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
1245 sentinelRedisInstance *ri = c->data;
1246 redisReply *r;
1247
1248 if (ri) ri->pending_commands--;
1249 if (!reply || !ri) return;
1250 r = reply;
1251
1252 /* Only update pub_time if we actually published our message. Otherwise
1253 * we'll retry against in 100 milliseconds. */
1254 if (r->type != REDIS_REPLY_ERROR)
1255 ri->last_pub_time = mstime();
1256 }
1257
1258 /* This is our Pub/Sub callback for the Hello channel. It's useful in order
1259 * to discover other sentinels attached at the same master. */
1260 void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata) {
1261 sentinelRedisInstance *ri = c->data;
1262 redisReply *r;
1263
1264 if (!reply || !ri) return;
1265 r = reply;
1266
1267 /* Update the last activity in the pubsub channel. Note that since we
1268 * receive our messages as well this timestamp can be used to detect
1269 * if the link is probably diconnected even if it seems otherwise. */
1270 ri->pc_last_activity = mstime();
1271
1272 /* Sanity check in the reply we expect, so that the code that follows
1273 * can avoid to check for details. */
1274 if (r->type != REDIS_REPLY_ARRAY ||
1275 r->elements != 3 ||
1276 r->element[0]->type != REDIS_REPLY_STRING ||
1277 r->element[1]->type != REDIS_REPLY_STRING ||
1278 r->element[2]->type != REDIS_REPLY_STRING ||
1279 strcmp(r->element[0]->str,"message") != 0) return;
1280
1281 /* We are not interested in meeting ourselves */
1282 if (strstr(r->element[2]->str,server.runid) != NULL) return;
1283
1284 {
1285 int numtokens, port, removed, canfailover;
1286 char **token = sdssplitlen(r->element[2]->str,
1287 r->element[2]->len,
1288 ":",1,&numtokens);
1289 sentinelRedisInstance *sentinel;
1290
1291 if (numtokens == 4) {
1292 /* First, try to see if we already have this sentinel. */
1293 port = atoi(token[1]);
1294 canfailover = atoi(token[3]);
1295 sentinel = getSentinelRedisInstanceByAddrAndRunID(
1296 ri->sentinels,token[0],port,token[2]);
1297
1298 if (!sentinel) {
1299 /* If not, remove all the sentinels that have the same runid
1300 * OR the same ip/port, because it's either a restart or a
1301 * network topology change. */
1302 removed = removeMatchingSentinelsFromMaster(ri,token[0],port,
1303 token[2]);
1304 if (removed) {
1305 sentinelEvent(REDIS_NOTICE,"-dup-sentinel",ri,
1306 "%@ #duplicate of %s:%d or %s",
1307 token[0],port,token[2]);
1308 }
1309
1310 /* Add the new sentinel. */
1311 sentinel = createSentinelRedisInstance(NULL,SRI_SENTINEL,
1312 token[0],port,ri->quorum,ri);
1313 if (sentinel) {
1314 sentinelEvent(REDIS_NOTICE,"+sentinel",sentinel,"%@");
1315 /* The runid is NULL after a new instance creation and
1316 * for Sentinels we don't have a later chance to fill it,
1317 * so do it now. */
1318 sentinel->runid = sdsnew(token[2]);
1319 }
1320 }
1321
1322 /* Update the state of the Sentinel. */
1323 if (sentinel) {
1324 sentinel->last_hello_time = mstime();
1325 if (canfailover)
1326 sentinel->flags |= SRI_CAN_FAILOVER;
1327 else
1328 sentinel->flags &= ~SRI_CAN_FAILOVER;
1329 }
1330 }
1331 sdsfreesplitres(token,numtokens);
1332 }
1333 }
1334
1335 void sentinelPingInstance(sentinelRedisInstance *ri) {
1336 mstime_t now = mstime();
1337 mstime_t info_period;
1338 int retval;
1339
1340 /* Return ASAP if we have already a PING or INFO already pending, or
1341 * in the case the instance is not properly connected. */
1342 if (ri->flags & SRI_DISCONNECTED) return;
1343
1344 /* For INFO, PING, PUBLISH that are not critical commands to send we
1345 * also have a limit of SENTINEL_MAX_PENDING_COMMANDS. We don't
1346 * want to use a lot of memory just because a link is not working
1347 * properly (note that anyway there is a redundant protection about this,
1348 * that is, the link will be disconnected and reconnected if a long
1349 * timeout condition is detected. */
1350 if (ri->pending_commands >= SENTINEL_MAX_PENDING_COMMANDS) return;
1351
1352 /* If this is a slave of a master in O_DOWN condition we start sending
1353 * it INFO every second, instead of the usual SENTINEL_INFO_PERIOD
1354 * period. In this state we want to closely monitor slaves in case they
1355 * are turned into masters by another Sentinel, or by the sysadmin. */
1356 if ((ri->flags & SRI_SLAVE) &&
1357 (ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) {
1358 info_period = 1000;
1359 } else {
1360 info_period = SENTINEL_INFO_PERIOD;
1361 }
1362
1363 if ((ri->flags & SRI_SENTINEL) == 0 &&
1364 (ri->info_refresh == 0 ||
1365 (now - ri->info_refresh) > info_period))
1366 {
1367 /* Send INFO to masters and slaves, not sentinels. */
1368 retval = redisAsyncCommand(ri->cc,
1369 sentinelInfoReplyCallback, NULL, "INFO");
1370 if (retval != REDIS_OK) return;
1371 ri->pending_commands++;
1372 } else if ((now - ri->last_pong_time) > SENTINEL_PING_PERIOD) {
1373 /* Send PING to all the three kinds of instances. */
1374 retval = redisAsyncCommand(ri->cc,
1375 sentinelPingReplyCallback, NULL, "PING");
1376 if (retval != REDIS_OK) return;
1377 ri->pending_commands++;
1378 } else if ((ri->flags & SRI_MASTER) &&
1379 (now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD)
1380 {
1381 /* PUBLISH hello messages only to masters. */
1382 struct sockaddr_in sa;
1383 socklen_t salen = sizeof(sa);
1384
1385 if (getsockname(ri->cc->c.fd,(struct sockaddr*)&sa,&salen) != -1) {
1386 char myaddr[128];
1387
1388 snprintf(myaddr,sizeof(myaddr),"%s:%d:%s:%d",
1389 inet_ntoa(sa.sin_addr), server.port, server.runid,
1390 (ri->flags & SRI_CAN_FAILOVER) != 0);
1391 retval = redisAsyncCommand(ri->cc,
1392 sentinelPublishReplyCallback, NULL, "PUBLISH %s %s",
1393 SENTINEL_HELLO_CHANNEL,myaddr);
1394 if (retval != REDIS_OK) return;
1395 ri->pending_commands++;
1396 }
1397 }
1398 }
1399
1400 /* =========================== SENTINEL command ============================= */
1401
1402 const char *sentinelFailoverStateStr(int state) {
1403 switch(state) {
1404 case SENTINEL_FAILOVER_STATE_NONE: return "none";
1405 case SENTINEL_FAILOVER_STATE_WAIT_START: return "wait_start";
1406 case SENTINEL_FAILOVER_STATE_SELECT_SLAVE: return "select_slave";
1407 case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE: return "send_slaveof_noone";
1408 case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION: return "wait_promotion";
1409 case SENTINEL_FAILOVER_STATE_RECONF_SLAVES: return "reconf_slaves";
1410 case SENTINEL_FAILOVER_STATE_ALERT_CLIENTS: return "alert_clients";
1411 case SENTINEL_FAILOVER_STATE_DETECT_END: return "detect_end";
1412 case SENTINEL_FAILOVER_STATE_UPDATE_CONFIG: return "update_config";
1413 default: return "unknown";
1414 }
1415 }
1416
1417 /* Redis instance to Redis protocol representation. */
1418 void addReplySentinelRedisInstance(redisClient *c, sentinelRedisInstance *ri) {
1419 char *flags = sdsempty();
1420 void *mbl;
1421 int fields = 0;
1422
1423 mbl = addDeferredMultiBulkLength(c);
1424
1425 addReplyBulkCString(c,"name");
1426 addReplyBulkCString(c,ri->name);
1427 fields++;
1428
1429 addReplyBulkCString(c,"ip");
1430 addReplyBulkCString(c,ri->addr->ip);
1431 fields++;
1432
1433 addReplyBulkCString(c,"port");
1434 addReplyBulkLongLong(c,ri->addr->port);
1435 fields++;
1436
1437 addReplyBulkCString(c,"runid");
1438 addReplyBulkCString(c,ri->runid ? ri->runid : "");
1439 fields++;
1440
1441 addReplyBulkCString(c,"flags");
1442 if (ri->flags & SRI_S_DOWN) flags = sdscat(flags,"s_down,");
1443 if (ri->flags & SRI_O_DOWN) flags = sdscat(flags,"o_down,");
1444 if (ri->flags & SRI_MASTER) flags = sdscat(flags,"master,");
1445 if (ri->flags & SRI_SLAVE) flags = sdscat(flags,"slave,");
1446 if (ri->flags & SRI_SENTINEL) flags = sdscat(flags,"sentinel,");
1447 if (ri->flags & SRI_DISCONNECTED) flags = sdscat(flags,"disconnected,");
1448 if (ri->flags & SRI_MASTER_DOWN) flags = sdscat(flags,"master_down,");
1449 if (ri->flags & SRI_FAILOVER_IN_PROGRESS)
1450 flags = sdscat(flags,"failover_in_progress,");
1451 if (ri->flags & SRI_I_AM_THE_LEADER)
1452 flags = sdscat(flags,"i_am_the_leader,");
1453 if (ri->flags & SRI_PROMOTED) flags = sdscat(flags,"promoted,");
1454 if (ri->flags & SRI_RECONF_SENT) flags = sdscat(flags,"reconf_sent,");
1455 if (ri->flags & SRI_RECONF_INPROG) flags = sdscat(flags,"reconf_inprog,");
1456 if (ri->flags & SRI_RECONF_DONE) flags = sdscat(flags,"reconf_done,");
1457
1458 if (sdslen(flags) != 0) flags = sdsrange(flags,0,-2); /* remove last "," */
1459 addReplyBulkCString(c,flags);
1460 sdsfree(flags);
1461 fields++;
1462
1463 addReplyBulkCString(c,"pending-commands");
1464 addReplyBulkLongLong(c,ri->pending_commands);
1465 fields++;
1466
1467 if (ri->flags & SRI_FAILOVER_IN_PROGRESS) {
1468 addReplyBulkCString(c,"failover-state");
1469 addReplyBulkCString(c,(char*)sentinelFailoverStateStr(ri->failover_state));
1470 fields++;
1471 }
1472
1473 addReplyBulkCString(c,"last-ok-ping-reply");
1474 addReplyBulkLongLong(c,mstime() - ri->last_avail_time);
1475 fields++;
1476
1477 addReplyBulkCString(c,"last-ping-reply");
1478 addReplyBulkLongLong(c,mstime() - ri->last_pong_time);
1479 fields++;
1480
1481 if (ri->flags & SRI_S_DOWN) {
1482 addReplyBulkCString(c,"s-down-time");
1483 addReplyBulkLongLong(c,mstime()-ri->s_down_since_time);
1484 fields++;
1485 }
1486
1487 if (ri->flags & SRI_O_DOWN) {
1488 addReplyBulkCString(c,"o-down-time");
1489 addReplyBulkLongLong(c,mstime()-ri->o_down_since_time);
1490 fields++;
1491 }
1492
1493 /* Masters and Slaves */
1494 if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
1495 addReplyBulkCString(c,"info-refresh");
1496 addReplyBulkLongLong(c,mstime() - ri->info_refresh);
1497 fields++;
1498 }
1499
1500 /* Only masters */
1501 if (ri->flags & SRI_MASTER) {
1502 addReplyBulkCString(c,"num-slaves");
1503 addReplyBulkLongLong(c,dictSize(ri->slaves));
1504 fields++;
1505
1506 addReplyBulkCString(c,"num-other-sentinels");
1507 addReplyBulkLongLong(c,dictSize(ri->sentinels));
1508 fields++;
1509
1510 addReplyBulkCString(c,"quorum");
1511 addReplyBulkLongLong(c,ri->quorum);
1512 fields++;
1513 }
1514
1515 /* Only slaves */
1516 if (ri->flags & SRI_SLAVE) {
1517 addReplyBulkCString(c,"master-link-down-time");
1518 addReplyBulkLongLong(c,ri->master_link_down_time);
1519 fields++;
1520
1521 addReplyBulkCString(c,"master-link-status");
1522 addReplyBulkCString(c,
1523 (ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP) ?
1524 "ok" : "err");
1525 fields++;
1526
1527 addReplyBulkCString(c,"master-host");
1528 addReplyBulkCString(c,
1529 ri->slave_master_host ? ri->slave_master_host : "?");
1530 fields++;
1531
1532 addReplyBulkCString(c,"master-port");
1533 addReplyBulkLongLong(c,ri->slave_master_port);
1534 fields++;
1535 }
1536
1537 /* Only sentinels */
1538 if (ri->flags & SRI_SENTINEL) {
1539 addReplyBulkCString(c,"last-hello-message");
1540 addReplyBulkLongLong(c,mstime() - ri->last_hello_time);
1541 fields++;
1542
1543 addReplyBulkCString(c,"can-failover-its-master");
1544 addReplyBulkLongLong(c,(ri->flags & SRI_CAN_FAILOVER) != 0);
1545 fields++;
1546
1547 if (ri->flags & SRI_MASTER_DOWN) {
1548 addReplyBulkCString(c,"subjective-leader");
1549 addReplyBulkCString(c,ri->leader ? ri->leader : "?");
1550 fields++;
1551 }
1552 }
1553
1554 setDeferredMultiBulkLength(c,mbl,fields*2);
1555 }
1556
1557 /* Output a number of instances contanined inside a dictionary as
1558 * Redis protocol. */
1559 void addReplyDictOfRedisInstances(redisClient *c, dict *instances) {
1560 dictIterator *di;
1561 dictEntry *de;
1562
1563 di = dictGetIterator(instances);
1564 addReplyMultiBulkLen(c,dictSize(instances));
1565 while((de = dictNext(di)) != NULL) {
1566 sentinelRedisInstance *ri = dictGetVal(de);
1567
1568 addReplySentinelRedisInstance(c,ri);
1569 }
1570 dictReleaseIterator(di);
1571 }
1572
1573 /* Lookup the named master into sentinel.masters.
1574 * If the master is not found reply to the client with an error and returns
1575 * NULL. */
1576 sentinelRedisInstance *sentinelGetMasterByNameOrReplyError(redisClient *c,
1577 robj *name)
1578 {
1579 sentinelRedisInstance *ri;
1580
1581 ri = dictFetchValue(sentinel.masters,c->argv[2]->ptr);
1582 if (!ri) {
1583 addReplyError(c,"No such master with that name");
1584 return NULL;
1585 }
1586 return ri;
1587 }
1588
1589 void sentinelCommand(redisClient *c) {
1590 if (!strcasecmp(c->argv[1]->ptr,"masters")) {
1591 /* SENTINEL MASTERS */
1592 if (c->argc != 2) goto numargserr;
1593
1594 addReplyDictOfRedisInstances(c,sentinel.masters);
1595 } else if (!strcasecmp(c->argv[1]->ptr,"slaves")) {
1596 /* SENTINEL SLAVES <master-name> */
1597 sentinelRedisInstance *ri;
1598
1599 if (c->argc != 3) goto numargserr;
1600 if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
1601 return;
1602 addReplyDictOfRedisInstances(c,ri->slaves);
1603 } else if (!strcasecmp(c->argv[1]->ptr,"sentinels")) {
1604 /* SENTINEL SENTINELS <master-name> */
1605 sentinelRedisInstance *ri;
1606
1607 if (c->argc != 3) goto numargserr;
1608 if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
1609 return;
1610 addReplyDictOfRedisInstances(c,ri->sentinels);
1611 } else if (!strcasecmp(c->argv[1]->ptr,"is-master-down-by-addr")) {
1612 /* SENTINEL IS-MASTER-DOWN-BY-ADDR <ip> <port> */
1613 sentinelRedisInstance *ri;
1614 char *leader = NULL;
1615 long port;
1616 int isdown = 0;
1617
1618 if (c->argc != 4) goto numargserr;
1619 if (getLongFromObjectOrReply(c,c->argv[3],&port,NULL) != REDIS_OK)
1620 return;
1621 ri = getSentinelRedisInstanceByAddrAndRunID(sentinel.masters,
1622 c->argv[2]->ptr,port,NULL);
1623
1624 /* It exists? Is actually a master? Is subjectively down? It's down.
1625 * Note: if we are in tilt mode we always reply with "0". */
1626 if (!sentinel.tilt && ri && (ri->flags & SRI_S_DOWN) &&
1627 (ri->flags & SRI_MASTER))
1628 isdown = 1;
1629 if (ri) leader = sentinelGetSubjectiveLeader(ri);
1630
1631 /* Reply with a two-elements multi-bulk reply: down state, leader. */
1632 addReplyMultiBulkLen(c,2);
1633 addReply(c, isdown ? shared.cone : shared.czero);
1634 addReplyBulkCString(c, leader ? leader : "?");
1635 if (leader) sdsfree(leader);
1636 } else if (!strcasecmp(c->argv[1]->ptr,"reset")) {
1637 /* SENTINEL RESET <pattern> */
1638 if (c->argc != 3) goto numargserr;
1639 addReplyLongLong(c,sentinelResetMastersByPattern(c->argv[2]->ptr,SENTINEL_GENERATE_EVENT));
1640 } else if (!strcasecmp(c->argv[1]->ptr,"get-master-addr-by-name")) {
1641 /* SENTINEL GET-MASTER-ADDR-BY-NAME <master-name> */
1642 sentinelRedisInstance *ri;
1643
1644 if (c->argc != 3) goto numargserr;
1645 ri = sentinelGetMasterByName(c->argv[2]->ptr);
1646 if (ri == NULL) {
1647 addReply(c,shared.nullmultibulk);
1648 } else {
1649 sentinelAddr *addr = ri->addr;
1650
1651 if ((ri->flags & SRI_FAILOVER_IN_PROGRESS) && ri->promoted_slave)
1652 addr = ri->promoted_slave->addr;
1653 addReplyMultiBulkLen(c,2);
1654 addReplyBulkCString(c,addr->ip);
1655 addReplyBulkLongLong(c,addr->port);
1656 }
1657 } else {
1658 addReplyErrorFormat(c,"Unknown sentinel subcommand '%s'",
1659 (char*)c->argv[1]->ptr);
1660 }
1661 return;
1662
1663 numargserr:
1664 addReplyErrorFormat(c,"Wrong number of commands for 'sentinel %s'",
1665 (char*)c->argv[1]->ptr);
1666 }
1667
1668 /* ===================== SENTINEL availability checks ======================= */
1669
1670 /* Is this instance down from our point of view? */
1671 void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
1672 mstime_t elapsed = mstime() - ri->last_avail_time;
1673
1674 /* Check if we are in need for a reconnection of one of the
1675 * links, because we are detecting low activity.
1676 *
1677 * 1) Check if the command link seems connected, was connected not less
1678 * than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have an
1679 * idle time that is greater than down_after_period / 2 seconds. */
1680 if (ri->cc &&
1681 (mstime() - ri->cc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
1682 (mstime() - ri->last_pong_time) > (ri->down_after_period/2))
1683 {
1684 sentinelKillLink(ri,ri->cc);
1685 }
1686
1687 /* 2) Check if the pubsub link seems connected, was connected not less
1688 * than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have no
1689 * activity in the Pub/Sub channel for more than
1690 * SENTINEL_PUBLISH_PERIOD * 3.
1691 */
1692 if (ri->pc &&
1693 (mstime() - ri->pc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
1694 (mstime() - ri->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3))
1695 {
1696 sentinelKillLink(ri,ri->pc);
1697 }
1698
1699 /* Update the subjectively down flag. */
1700 if (elapsed > ri->down_after_period) {
1701 /* Is subjectively down */
1702 if ((ri->flags & SRI_S_DOWN) == 0) {
1703 sentinelEvent(REDIS_WARNING,"+sdown",ri,"%@");
1704 ri->s_down_since_time = mstime();
1705 ri->flags |= SRI_S_DOWN;
1706 }
1707 } else {
1708 /* Is subjectively up */
1709 if (ri->flags & SRI_S_DOWN) {
1710 sentinelEvent(REDIS_WARNING,"-sdown",ri,"%@");
1711 ri->flags &= ~SRI_S_DOWN;
1712 }
1713 }
1714 }
1715
1716 /* Is this instance down accordingly to the configured quorum? */
1717 void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
1718 dictIterator *di;
1719 dictEntry *de;
1720 int quorum = 0, odown = 0;
1721
1722 if (master->flags & SRI_S_DOWN) {
1723 /* Is down for enough sentinels? */
1724 quorum = 1; /* the current sentinel. */
1725 /* Count all the other sentinels. */
1726 di = dictGetIterator(master->sentinels);
1727 while((de = dictNext(di)) != NULL) {
1728 sentinelRedisInstance *ri = dictGetVal(de);
1729
1730 if (ri->flags & SRI_MASTER_DOWN) quorum++;
1731 }
1732 dictReleaseIterator(di);
1733 if (quorum >= master->quorum) odown = 1;
1734 }
1735
1736 /* Set the flag accordingly to the outcome. */
1737 if (odown) {
1738 if ((master->flags & SRI_O_DOWN) == 0) {
1739 sentinelEvent(REDIS_WARNING,"+odown",master,"%@ #quorum %d/%d",
1740 quorum, master->quorum);
1741 master->flags |= SRI_O_DOWN;
1742 master->o_down_since_time = mstime();
1743 }
1744 } else {
1745 if (master->flags & SRI_O_DOWN) {
1746 sentinelEvent(REDIS_WARNING,"-odown",master,"%@");
1747 master->flags &= ~SRI_O_DOWN;
1748 }
1749 }
1750 }
1751
1752 /* Receive the SENTINEL is-master-down-by-addr reply, see the
1753 * sentinelAskMasterStateToOtherSentinels() function for more information. */
1754 void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) {
1755 sentinelRedisInstance *ri = c->data;
1756 redisReply *r;
1757
1758 if (ri) ri->pending_commands--;
1759 if (!reply || !ri) return;
1760 r = reply;
1761
1762 /* Ignore every error or unexpected reply.
1763 * Note that if the command returns an error for any reason we'll
1764 * end clearing the SRI_MASTER_DOWN flag for timeout anyway. */
1765 if (r->type == REDIS_REPLY_ARRAY && r->elements == 2 &&
1766 r->element[0]->type == REDIS_REPLY_INTEGER &&
1767 r->element[1]->type == REDIS_REPLY_STRING)
1768 {
1769 ri->last_master_down_reply_time = mstime();
1770 if (r->element[0]->integer == 1) {
1771 ri->flags |= SRI_MASTER_DOWN;
1772 } else {
1773 ri->flags &= ~SRI_MASTER_DOWN;
1774 }
1775 sdsfree(ri->leader);
1776 ri->leader = sdsnew(r->element[1]->str);
1777 }
1778 }
1779
1780 /* If we think (subjectively) the master is down, we start sending
1781 * SENTINEL IS-MASTER-DOWN-BY-ADDR requests to other sentinels
1782 * in order to get the replies that allow to reach the quorum and
1783 * possibly also mark the master as objectively down. */
1784 void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master) {
1785 dictIterator *di;
1786 dictEntry *de;
1787
1788 di = dictGetIterator(master->sentinels);
1789 while((de = dictNext(di)) != NULL) {
1790 sentinelRedisInstance *ri = dictGetVal(de);
1791 mstime_t elapsed = mstime() - ri->last_master_down_reply_time;
1792 char port[32];
1793 int retval;
1794
1795 /* If the master state from other sentinel is too old, we clear it. */
1796 if (elapsed > SENTINEL_INFO_VALIDITY_TIME) {
1797 ri->flags &= ~SRI_MASTER_DOWN;
1798 sdsfree(ri->leader);
1799 ri->leader = NULL;
1800 }
1801
1802 /* Only ask if master is down to other sentinels if:
1803 *
1804 * 1) We believe it is down, or there is a failover in progress.
1805 * 2) Sentinel is connected.
1806 * 3) We did not received the info within SENTINEL_ASK_PERIOD ms. */
1807 if ((master->flags & (SRI_S_DOWN|SRI_FAILOVER_IN_PROGRESS)) == 0)
1808 continue;
1809 if (ri->flags & SRI_DISCONNECTED) continue;
1810 if (mstime() - ri->last_master_down_reply_time < SENTINEL_ASK_PERIOD)
1811 continue;
1812
1813 /* Ask */
1814 ll2string(port,sizeof(port),master->addr->port);
1815 retval = redisAsyncCommand(ri->cc,
1816 sentinelReceiveIsMasterDownReply, NULL,
1817 "SENTINEL is-master-down-by-addr %s %s",
1818 master->addr->ip, port);
1819 if (retval == REDIS_OK) ri->pending_commands++;
1820 }
1821 dictReleaseIterator(di);
1822 }
1823
1824 /* =============================== FAILOVER ================================= */
1825
1826 /* Given a master get the "subjective leader", that is, among all the sentinels
1827 * with given characteristics, the one with the lexicographically smaller
1828 * runid. The characteristics required are:
1829 *
1830 * 1) Has SRI_CAN_FAILOVER flag.
1831 * 2) Is not disconnected.
1832 * 3) Recently answered to our ping (no longer than
1833 * SENTINEL_INFO_VALIDITY_TIME milliseconds ago).
1834 *
1835 * The function returns a pointer to an sds string representing the runid of the
1836 * leader sentinel instance (from our point of view). Otherwise NULL is
1837 * returned if there are no suitable sentinels.
1838 */
1839
/* qsort() comparator for an array of C strings holding run IDs: 'a' and 'b'
 * point to the array slots (char*), which are compared ignoring case. */
int compareRunID(const void *a, const void *b) {
    const char *const *lhs = a;
    const char *const *rhs = b;

    return strcasecmp(*lhs, *rhs);
}
1844
1845 char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master) {
1846 dictIterator *di;
1847 dictEntry *de;
1848 char **instance =
1849 zmalloc(sizeof(char*)*(dictSize(master->sentinels)+1));
1850 int instances = 0;
1851 char *leader = NULL;
1852
1853 if (master->flags & SRI_CAN_FAILOVER) {
1854 /* Add myself if I'm a Sentinel that can failover this master. */
1855 instance[instances++] = server.runid;
1856 }
1857
1858 di = dictGetIterator(master->sentinels);
1859 while((de = dictNext(di)) != NULL) {
1860 sentinelRedisInstance *ri = dictGetVal(de);
1861 mstime_t lag = mstime() - ri->last_avail_time;
1862
1863 if (lag > SENTINEL_INFO_VALIDITY_TIME ||
1864 !(ri->flags & SRI_CAN_FAILOVER) ||
1865 (ri->flags & SRI_DISCONNECTED) ||
1866 ri->runid == NULL)
1867 continue;
1868 instance[instances++] = ri->runid;
1869 }
1870 dictReleaseIterator(di);
1871
1872 /* If we have at least one instance passing our checks, order the array
1873 * by runid. */
1874 if (instances) {
1875 qsort(instance,instances,sizeof(char*),compareRunID);
1876 leader = sdsnew(instance[0]);
1877 }
1878 zfree(instance);
1879 return leader;
1880 }
1881
/* Pairs a run ID with a vote counter. */
struct sentinelLeader {
    char *runid;            /* Run ID string. */
    unsigned long votes;    /* Vote count associated with 'runid'. */
};
1886
1887 /* Helper function for sentinelGetObjectiveLeader, increment the counter
1888 * relative to the specified runid. */
1889 void sentinelObjectiveLeaderIncr(dict *counters, char *runid) {
1890 dictEntry *de = dictFind(counters,runid);
1891 uint64_t oldval;
1892
1893 if (de) {
1894 oldval = dictGetUnsignedIntegerVal(de);
1895 dictSetUnsignedIntegerVal(de,oldval+1);
1896 } else {
1897 de = dictAddRaw(counters,runid);
1898 redisAssert(de != NULL);
1899 dictSetUnsignedIntegerVal(de,1);
1900 }
1901 }
1902
1903 /* Scan all the Sentinels attached to this master to check what is the
1904 * most voted leader among Sentinels. */
1905 char *sentinelGetObjectiveLeader(sentinelRedisInstance *master) {
1906 dict *counters;
1907 dictIterator *di;
1908 dictEntry *de;
1909 unsigned int voters = 0, voters_quorum;
1910 char *myvote;
1911 char *winner = NULL;
1912
1913 redisAssert(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS));
1914 counters = dictCreate(&leaderVotesDictType,NULL);
1915
1916 /* Count my vote. */
1917 myvote = sentinelGetSubjectiveLeader(master);
1918 if (myvote) {
1919 sentinelObjectiveLeaderIncr(counters,myvote);
1920 voters++;
1921 }
1922
1923 /* Count other sentinels votes */
1924 di = dictGetIterator(master->sentinels);
1925 while((de = dictNext(di)) != NULL) {
1926 sentinelRedisInstance *ri = dictGetVal(de);
1927 if (ri->leader == NULL) continue;
1928 /* If the failover is not already in progress we are only interested
1929 * in Sentinels that believe the master is down. Otherwise the leader
1930 * selection is useful for the "failover-takedown" when the original
1931 * leader fails. In that case we consider all the voters. */
1932 if (!(master->flags & SRI_FAILOVER_IN_PROGRESS) &&
1933 !(ri->flags & SRI_MASTER_DOWN)) continue;
1934 sentinelObjectiveLeaderIncr(counters,ri->leader);
1935 voters++;
1936 }
1937 dictReleaseIterator(di);
1938 voters_quorum = voters/2+1;
1939
1940 /* Check what's the winner. For the winner to win, it needs two conditions:
1941 * 1) Absolute majority between voters (50% + 1).
1942 * 2) And anyway at least master->quorum votes. */
1943 {
1944 uint64_t max_votes = 0; /* Max votes so far. */
1945
1946 di = dictGetIterator(counters);
1947 while((de = dictNext(di)) != NULL) {
1948 uint64_t votes = dictGetUnsignedIntegerVal(de);
1949
1950 if (max_votes < votes) {
1951 max_votes = votes;
1952 winner = dictGetKey(de);
1953 }
1954 }
1955 dictReleaseIterator(di);
1956 if (winner && (max_votes < voters_quorum || max_votes < master->quorum))
1957 winner = NULL;
1958 }
1959 winner = winner ? sdsnew(winner) : NULL;
1960 sdsfree(myvote);
1961 dictRelease(counters);
1962 return winner;
1963 }
1964
1965 /* This function checks if there are the conditions to start the failover,
1966 * that is:
1967 *
1968 * 1) Enough time has passed since O_DOWN.
1969 * 2) The master is marked as SRI_CAN_FAILOVER, so we can failover it.
1970 * 3) We are the objectively leader for this master.
1971 *
1972 * If the conditions are met we flag the master as SRI_FAILOVER_IN_PROGRESS
1973 * and SRI_I_AM_THE_LEADER.
1974 */
1975 void sentinelStartFailover(sentinelRedisInstance *master) {
1976 char *leader;
1977 int isleader;
1978
1979 /* We can't failover if the master is not in O_DOWN state or if
1980 * there is not already a failover in progress (to perform the
1981 * takedown if the leader died) or if this Sentinel is not allowed
1982 * to start a failover. */
1983 if (!(master->flags & SRI_CAN_FAILOVER) ||
1984 !(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) return;
1985
1986 leader = sentinelGetObjectiveLeader(master);
1987 isleader = leader && strcasecmp(leader,server.runid) == 0;
1988 sdsfree(leader);
1989
1990 /* If I'm not the leader, I can't failover for sure. */
1991 if (!isleader) return;
1992
1993 /* If the failover is already in progress there are two options... */
1994 if (master->flags & SRI_FAILOVER_IN_PROGRESS) {
1995 if (master->flags & SRI_I_AM_THE_LEADER) {
1996 /* 1) I'm flagged as leader so I already started the failover.
1997 * Just return. */
1998 return;
1999 } else {
2000 mstime_t elapsed = mstime() - master->failover_state_change_time;
2001
2002 /* 2) I'm the new leader, but I'm not flagged as leader in the
2003 * master: I did not started the failover, but the original
2004 * leader has no longer the leadership.
2005 *
2006 * In this case if the failover appears to be lagging
2007 * for at least 25% of the configured failover timeout,
2008 * I can assume I can take control. Otherwise
2009 * it's better to return and wait more. */
2010 if (elapsed < (master->failover_timeout/4)) return;
2011 sentinelEvent(REDIS_WARNING,"+failover-takedown",master,"%@");
2012 /* We have already an elected slave if we are in
2013 * FAILOVER_IN_PROGRESS state, that is, the slave that we
2014 * observed turning into a master. */
2015 master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
2016 /* As an observer we flagged all the slaves as RECONF_SENT but
2017 * now we are in charge of actually sending the reconfiguration
2018 * command so let's clear this flag for all the instances. */
2019 sentinelDelFlagsToDictOfRedisInstances(master->slaves,
2020 SRI_RECONF_SENT);
2021 }
2022 } else {
2023 /* Brand new failover as SRI_FAILOVER_IN_PROGRESS was not set. */
2024 master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;
2025 }
2026
2027 master->flags |= SRI_FAILOVER_IN_PROGRESS|SRI_I_AM_THE_LEADER;
2028 sentinelEvent(REDIS_WARNING,"+failover-triggered",master,"%@");
2029
2030 /* Pick a random delay if it's a fresh failover (WAIT_START), and not
2031 * a recovery of a failover started by another sentinel. */
2032 if (master->failover_state == SENTINEL_FAILOVER_STATE_WAIT_START) {
2033 master->failover_start_time = mstime() +
2034 SENTINEL_FAILOVER_FIXED_DELAY +
2035 (rand() % SENTINEL_FAILOVER_MAX_RANDOM_DELAY);
2036 sentinelEvent(REDIS_WARNING,"+failover-state-wait-start",master,
2037 "%@ #starting in %lld milliseconds",
2038 master->failover_start_time-mstime());
2039 }
2040 master->failover_state_change_time = mstime();
2041 }
2042
2043 /* Select a suitable slave to promote. The current algorithm only uses
2044 * the following parameters:
2045 *
2046 * 1) None of the following conditions: S_DOWN, O_DOWN, DISCONNECTED.
2047 * 2) last_avail_time more recent than SENTINEL_INFO_VALIDITY_TIME.
2048 * 3) info_refresh more recent than SENTINEL_INFO_VALIDITY_TIME.
2049 * 4) master_link_down_time no more than:
2050 * (now - master->s_down_since_time) + (master->down_after_period * 10).
2051 *
2052 * Among all the slaves matching the above conditions we select the slave
2053 * with lower slave_priority. If priority is the same we select the slave
2054 * with lexicographically smaller runid.
2055 *
2056 * The function returns the pointer to the selected slave, otherwise
2057 * NULL if no suitable slave was found.
2058 */
2059
2060 int compareSlavesForPromotion(const void *a, const void *b) {
2061 sentinelRedisInstance **sa = (sentinelRedisInstance **)a,
2062 **sb = (sentinelRedisInstance **)b;
2063 if ((*sa)->slave_priority != (*sb)->slave_priority)
2064 return (*sa)->slave_priority - (*sb)->slave_priority;
2065 return strcasecmp((*sa)->runid,(*sb)->runid);
2066 }
2067
2068 sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) {
2069 sentinelRedisInstance **instance =
2070 zmalloc(sizeof(instance[0])*dictSize(master->slaves));
2071 sentinelRedisInstance *selected = NULL;
2072 int instances = 0;
2073 dictIterator *di;
2074 dictEntry *de;
2075 mstime_t max_master_down_time;
2076
2077 max_master_down_time = (mstime() - master->s_down_since_time) +
2078 (master->down_after_period * 10);
2079
2080 di = dictGetIterator(master->slaves);
2081 while((de = dictNext(di)) != NULL) {
2082 sentinelRedisInstance *slave = dictGetVal(de);
2083 mstime_t info_validity_time = mstime()-SENTINEL_INFO_VALIDITY_TIME;
2084
2085 if (slave->flags & (SRI_S_DOWN|SRI_O_DOWN|SRI_DISCONNECTED)) continue;
2086 if (slave->last_avail_time < info_validity_time) continue;
2087 if (slave->info_refresh < info_validity_time) continue;
2088 if (slave->master_link_down_time > max_master_down_time) continue;
2089 instance[instances++] = slave;
2090 }
2091 dictReleaseIterator(di);
2092 if (instances) {
2093 qsort(instance,instances,sizeof(sentinelRedisInstance*),
2094 compareSlavesForPromotion);
2095 selected = instance[0];
2096 }
2097 zfree(instance);
2098 return selected;
2099 }
2100
2101 /* ---------------- Failover state machine implementation ------------------- */
2102 void sentinelFailoverWaitStart(sentinelRedisInstance *ri) {
2103 if (mstime() >= ri->failover_start_time) {
2104 ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
2105 ri->failover_state_change_time = mstime();
2106 sentinelEvent(REDIS_WARNING,"+failover-state-select-slave",ri,"%@");
2107 }
2108 }
2109
2110 void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) {
2111 sentinelRedisInstance *slave = sentinelSelectSlave(ri);
2112
2113 if (slave == NULL) {
2114 sentinelEvent(REDIS_WARNING,"-failover-abort-no-good-slave",ri,"%@");
2115 sentinelAbortFailover(ri);
2116 } else {
2117 sentinelEvent(REDIS_WARNING,"+selected-slave",slave,"%@");
2118 slave->flags |= SRI_PROMOTED;
2119 ri->promoted_slave = slave;
2120 ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE;
2121 ri->failover_state_change_time = mstime();
2122 sentinelEvent(REDIS_NOTICE,"+failover-state-send-slaveof-noone",
2123 slave, "%@");
2124 }
2125 }
2126
2127 void sentinelFailoverSendSlaveOfNoOne(sentinelRedisInstance *ri) {
2128 int retval;
2129
2130 if (ri->promoted_slave->flags & SRI_DISCONNECTED) return;
2131
2132 /* Send SLAVEOF NO ONE command to turn the slave into a master.
2133 * We actually register a generic callback for this command as we don't
2134 * really care about the reply. We check if it worked indirectly observing
2135 * if INFO returns a different role (master instead of slave). */
2136 retval = redisAsyncCommand(ri->promoted_slave->cc,
2137 sentinelDiscardReplyCallback, NULL, "SLAVEOF NO ONE");
2138 if (retval != REDIS_OK) return;
2139 ri->promoted_slave->pending_commands++;
2140 sentinelEvent(REDIS_NOTICE, "+failover-state-wait-promotion",
2141 ri->promoted_slave,"%@");
2142 ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION;
2143 ri->failover_state_change_time = mstime();
2144 }
2145
2146 /* We actually wait for promotion indirectly checking with INFO when the
2147 * slave turns into a master. */
2148 void sentinelFailoverWaitPromotion(sentinelRedisInstance *ri) {
2149 mstime_t elapsed = mstime() - ri->failover_state_change_time;
2150
2151 if (elapsed >= SENTINEL_PROMOTION_RETRY_PERIOD) {
2152 sentinelEvent(REDIS_WARNING,"-promotion-timeout",ri->promoted_slave,
2153 "%@");
2154 sentinelEvent(REDIS_WARNING,"+failover-state-select-slave",ri,"%@");
2155 ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
2156 ri->failover_state_change_time = mstime();
2157 ri->promoted_slave->flags &= ~SRI_PROMOTED;
2158 ri->promoted_slave = NULL;
2159 }
2160 }
2161
2162 void sentinelFailoverDetectEnd(sentinelRedisInstance *master) {
2163 int not_reconfigured = 0, timeout = 0;
2164 dictIterator *di;
2165 dictEntry *de;
2166 mstime_t elapsed = mstime() - master->failover_state_change_time;
2167
2168 /* We can't consider failover finished if the promoted slave is
2169 * not reachable. */
2170 if (master->promoted_slave == NULL ||
2171 master->promoted_slave->flags & SRI_S_DOWN) return;
2172
2173 /* The failover terminates once all the reachable slaves are properly
2174 * configured. */
2175 di = dictGetIterator(master->slaves);
2176 while((de = dictNext(di)) != NULL) {
2177 sentinelRedisInstance *slave = dictGetVal(de);
2178
2179 if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
2180 if (slave->flags & SRI_S_DOWN) continue;
2181 not_reconfigured++;
2182 }
2183 dictReleaseIterator(di);
2184
2185 /* Force end of failover on timeout. */
2186 if (elapsed > master->failover_timeout) {
2187 not_reconfigured = 0;
2188 timeout = 1;
2189 sentinelEvent(REDIS_WARNING,"+failover-end-for-timeout",master,"%@");
2190 }
2191
2192 if (not_reconfigured == 0) {
2193 sentinelEvent(REDIS_WARNING,"+failover-end",master,"%@");
2194 master->failover_state = SENTINEL_FAILOVER_STATE_UPDATE_CONFIG;
2195 master->failover_state_change_time = mstime();
2196 }
2197
2198 /* If I'm the leader it is a good idea to send a best effort SLAVEOF
2199 * command to all the slaves still not reconfigured to replicate with
2200 * the new master. */
2201 if (timeout && (master->flags & SRI_I_AM_THE_LEADER)) {
2202 dictIterator *di;
2203 dictEntry *de;
2204 char master_port[32];
2205
2206 ll2string(master_port,sizeof(master_port),
2207 master->promoted_slave->addr->port);
2208
2209 di = dictGetIterator(master->slaves);
2210 while((de = dictNext(di)) != NULL) {
2211 sentinelRedisInstance *slave = dictGetVal(de);
2212 int retval;
2213
2214 if (slave->flags &
2215 (SRI_RECONF_DONE|SRI_RECONF_SENT|SRI_DISCONNECTED)) continue;
2216
2217 retval = redisAsyncCommand(slave->cc,
2218 sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
2219 master->promoted_slave->addr->ip,
2220 master_port);
2221 if (retval == REDIS_OK) {
2222 sentinelEvent(REDIS_NOTICE,"+slave-reconf-sent-be",slave,"%@");
2223 slave->flags |= SRI_RECONF_SENT;
2224 }
2225 }
2226 dictReleaseIterator(di);
2227 }
2228 }
2229
2230 /* Send SLAVE OF <new master address> to all the remaining slaves that
2231 * still don't appear to have the configuration updated. */
2232 void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) {
2233 dictIterator *di;
2234 dictEntry *de;
2235 int in_progress = 0;
2236
2237 di = dictGetIterator(master->slaves);
2238 while((de = dictNext(di)) != NULL) {
2239 sentinelRedisInstance *slave = dictGetVal(de);
2240
2241 if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG))
2242 in_progress++;
2243 }
2244 dictReleaseIterator(di);
2245
2246 di = dictGetIterator(master->slaves);
2247 while(in_progress < master->parallel_syncs &&
2248 (de = dictNext(di)) != NULL)
2249 {
2250 sentinelRedisInstance *slave = dictGetVal(de);
2251 int retval;
2252 char master_port[32];
2253
2254 /* Skip the promoted slave, and already configured slaves. */
2255 if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
2256
2257 /* Clear the SRI_RECONF_SENT flag if too much time elapsed without
2258 * the slave moving forward to the next state. */
2259 if ((slave->flags & SRI_RECONF_SENT) &&
2260 (mstime() - slave->slave_reconf_sent_time) >
2261 SENTINEL_SLAVE_RECONF_RETRY_PERIOD)
2262 {
2263 sentinelEvent(REDIS_NOTICE,"-slave-reconf-sent-timeout",slave,"%@");
2264 slave->flags &= ~SRI_RECONF_SENT;
2265 }
2266
2267 /* Nothing to do for instances that are disconnected or already
2268 * in RECONF_SENT state. */
2269 if (slave->flags & (SRI_DISCONNECTED|SRI_RECONF_SENT|SRI_RECONF_INPROG))
2270 continue;
2271
2272 /* Send SLAVEOF <new master>. */
2273 ll2string(master_port,sizeof(master_port),
2274 master->promoted_slave->addr->port);
2275 retval = redisAsyncCommand(slave->cc,
2276 sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
2277 master->promoted_slave->addr->ip,
2278 master_port);
2279 if (retval == REDIS_OK) {
2280 slave->flags |= SRI_RECONF_SENT;
2281 slave->pending_commands++;
2282 slave->slave_reconf_sent_time = mstime();
2283 sentinelEvent(REDIS_NOTICE,"+slave-reconf-sent",slave,"%@");
2284 in_progress++;
2285 }
2286 }
2287 dictReleaseIterator(di);
2288 sentinelFailoverDetectEnd(master);
2289 }
2290
2291 /* This function is called when the slave is in
2292 * SENTINEL_FAILOVER_STATE_UPDATE_CONFIG state. In this state we need
2293 * to remove it from the master table and add the promoted slave instead.
2294 *
2295 * If there are no promoted slaves as this instance is unique, we remove
2296 * and re-add it with the same address to trigger a complete state
2297 * refresh. */
2298 void sentinelFailoverSwitchToPromotedSlave(sentinelRedisInstance *master) {
2299 sentinelRedisInstance *ref = master->promoted_slave ?
2300 master->promoted_slave : master;
2301
2302 sentinelEvent(REDIS_WARNING,"+switch-master",master,"%s %s %d %s %d",
2303 master->name, master->addr->ip, master->addr->port,
2304 ref->addr->ip, ref->addr->port);
2305
2306 sentinelResetMasterAndChangeAddress(master,ref->addr->ip,ref->addr->port);
2307 }
2308
2309 void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
2310 redisAssert(ri->flags & SRI_MASTER);
2311
2312 if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;
2313
2314 switch(ri->failover_state) {
2315 case SENTINEL_FAILOVER_STATE_WAIT_START:
2316 sentinelFailoverWaitStart(ri);
2317 break;
2318 case SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
2319 sentinelFailoverSelectSlave(ri);
2320 break;
2321 case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:
2322 sentinelFailoverSendSlaveOfNoOne(ri);
2323 break;
2324 case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
2325 sentinelFailoverWaitPromotion(ri);
2326 break;
2327 case SENTINEL_FAILOVER_STATE_RECONF_SLAVES:
2328 sentinelFailoverReconfNextSlave(ri);
2329 break;
2330 case SENTINEL_FAILOVER_STATE_DETECT_END:
2331 sentinelFailoverDetectEnd(ri);
2332 break;
2333 }
2334 }
2335
2336 /* Abort a failover in progress with the following steps:
2337 * 1) If this instance is the leaer send a SLAVEOF command to all the already
2338 * reconfigured slaves if any to configure them to replicate with the
2339 * original master.
2340 * 2) For both leaders and observers: clear the failover flags and state in
2341 * the master instance.
2342 * 3) If there is already a promoted slave and we are the leader, and this
2343 * slave is not DISCONNECTED, try to reconfigure it to replicate
2344 * back to the master as well, sending a best effort SLAVEOF command.
2345 */
2346 void sentinelAbortFailover(sentinelRedisInstance *ri) {
2347 char master_port[32];
2348 dictIterator *di;
2349 dictEntry *de;
2350
2351 redisAssert(ri->flags & SRI_FAILOVER_IN_PROGRESS);
2352 ll2string(master_port,sizeof(master_port),ri->addr->port);
2353
2354 /* Clear failover related flags from slaves.
2355 * Also if we are the leader make sure to send SLAVEOF commands to all the
2356 * already reconfigured slaves in order to turn them back into slaves of
2357 * the original master. */
2358 di = dictGetIterator(ri->slaves);
2359 while((de = dictNext(di)) != NULL) {
2360 sentinelRedisInstance *slave = dictGetVal(de);
2361 if ((ri->flags & SRI_I_AM_THE_LEADER) &&
2362 !(slave->flags & SRI_DISCONNECTED) &&
2363 (slave->flags & (SRI_PROMOTED|SRI_RECONF_SENT|SRI_RECONF_INPROG|
2364 SRI_RECONF_DONE)))
2365 {
2366 int retval;
2367
2368 retval = redisAsyncCommand(slave->cc,
2369 sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
2370 ri->addr->ip,
2371 master_port);
2372 if (retval == REDIS_OK)
2373 sentinelEvent(REDIS_NOTICE,"-slave-reconf-undo",slave,"%@");
2374 }
2375 slave->flags &= ~(SRI_RECONF_SENT|SRI_RECONF_INPROG|SRI_RECONF_DONE);
2376 }
2377 dictReleaseIterator(di);
2378
2379 ri->flags &= ~(SRI_FAILOVER_IN_PROGRESS|SRI_I_AM_THE_LEADER);
2380 ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
2381 ri->failover_state_change_time = mstime();
2382 if (ri->promoted_slave) {
2383 ri->promoted_slave->flags &= ~SRI_PROMOTED;
2384 ri->promoted_slave = NULL;
2385 }
2386 }
2387
2388 /* The following is called only for master instances and will abort the
2389 * failover process if:
2390 *
2391 * 1) The failover is in progress.
2392 * 2) We already promoted a slave.
2393 * 3) The promoted slave is in extended SDOWN condition.
2394 */
2395 void sentinelAbortFailoverIfNeeded(sentinelRedisInstance *ri) {
2396 /* Failover is in progress? Do we have a promoted slave? */
2397 if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS) || !ri->promoted_slave) return;
2398
2399 /* Is the promoted slave into an extended SDOWN state? */
2400 if (!(ri->promoted_slave->flags & SRI_S_DOWN) ||
2401 (mstime() - ri->promoted_slave->s_down_since_time) <
2402 (ri->down_after_period * SENTINEL_EXTENDED_SDOWN_MULTIPLIER)) return;
2403
2404 sentinelEvent(REDIS_WARNING,"-failover-abort-x-sdown",ri->promoted_slave,"%@");
2405 sentinelAbortFailover(ri);
2406 }
2407
/* ======================== SENTINEL timer handler ==========================
 * This is the "main" of our Sentinel, since Sentinel is completely non
 * blocking in design. The function is called every second.
 * -------------------------------------------------------------------------- */
2412
2413 /* Perform scheduled operations for the specified Redis instance. */
2414 void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
2415 /* ========== MONITORING HALF ============ */
2416 /* Every kind of instance */
2417 sentinelReconnectInstance(ri);
2418 sentinelPingInstance(ri);
2419
2420 /* Masters and slaves */
2421 if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
2422 /* Nothing so far. */
2423 }
2424
2425 /* Only masters */
2426 if (ri->flags & SRI_MASTER) {
2427 sentinelAskMasterStateToOtherSentinels(ri);
2428 }
2429
2430 /* ============== ACTING HALF ============= */
2431 /* We don't proceed with the acting half if we are in TILT mode.
2432 * TILT happens when we find something odd with the time, like a
2433 * sudden change in the clock. */
2434 if (sentinel.tilt) {
2435 if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return;
2436 sentinel.tilt = 0;
2437 sentinelEvent(REDIS_WARNING,"-tilt",NULL,"#tilt mode exited");
2438 }
2439
2440 /* Every kind of instance */
2441 sentinelCheckSubjectivelyDown(ri);
2442
2443 /* Masters and slaves */
2444 if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
2445 /* Nothing so far. */
2446 }
2447
2448 /* Only masters */
2449 if (ri->flags & SRI_MASTER) {
2450 sentinelCheckObjectivelyDown(ri);
2451 sentinelStartFailover(ri);
2452 sentinelFailoverStateMachine(ri);
2453 sentinelAbortFailoverIfNeeded(ri);
2454 }
2455 }
2456
2457 /* Perform scheduled operations for all the instances in the dictionary.
2458 * Recursively call the function against dictionaries of slaves. */
2459 void sentinelHandleDictOfRedisInstances(dict *instances) {
2460 dictIterator *di;
2461 dictEntry *de;
2462 sentinelRedisInstance *switch_to_promoted = NULL;
2463
2464 /* There are a number of things we need to perform against every master. */
2465 di = dictGetIterator(instances);
2466 while((de = dictNext(di)) != NULL) {
2467 sentinelRedisInstance *ri = dictGetVal(de);
2468
2469 sentinelHandleRedisInstance(ri);
2470 if (ri->flags & SRI_MASTER) {
2471 sentinelHandleDictOfRedisInstances(ri->slaves);
2472 sentinelHandleDictOfRedisInstances(ri->sentinels);
2473 if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) {
2474 switch_to_promoted = ri;
2475 }
2476 }
2477 }
2478 if (switch_to_promoted)
2479 sentinelFailoverSwitchToPromotedSlave(switch_to_promoted);
2480 dictReleaseIterator(di);
2481 }
2482
2483 /* This function checks if we need to enter the TITL mode.
2484 *
2485 * The TILT mode is entered if we detect that between two invocations of the
2486 * timer interrupt, a negative amount of time, or too much time has passed.
2487 * Note that we expect that more or less just 100 milliseconds will pass
2488 * if everything is fine. However we'll see a negative number or a
2489 * difference bigger than SENTINEL_TILT_TRIGGER milliseconds if one of the
2490 * following conditions happen:
2491 *
2492 * 1) The Sentiel process for some time is blocked, for every kind of
2493 * random reason: the load is huge, the computer was freezed for some time
2494 * in I/O or alike, the process was stopped by a signal. Everything.
2495 * 2) The system clock was altered significantly.
2496 *
2497 * Under both this conditions we'll see everything as timed out and failing
2498 * without good reasons. Instead we enter the TILT mode and wait
2499 * for SENTIENL_TILT_PERIOD to elapse before starting to act again.
2500 *
2501 * During TILT time we still collect information, we just do not act. */
2502 void sentinelCheckTiltCondition(void) {
2503 mstime_t now = mstime();
2504 mstime_t delta = now - sentinel.previous_time;
2505
2506 if (delta < 0 || delta > SENTINEL_TILT_TRIGGER) {
2507 sentinel.tilt = 1;
2508 sentinel.tilt_start_time = mstime();
2509 sentinelEvent(REDIS_WARNING,"+tilt",NULL,"#tilt mode entered");
2510 }
2511 sentinel.previous_time = mstime();
2512 }
2513
2514 void sentinelTimer(void) {
2515 sentinelCheckTiltCondition();
2516 sentinelHandleDictOfRedisInstances(sentinel.masters);
2517 }
2518