]> git.saurik.com Git - redis.git/blob - src/sentinel.c
Fixed an error in the example sentinel.conf.
[redis.git] / src / sentinel.c
1 /* Redis Sentinel implementation
2 * -----------------------------
3 *
4 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * * Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * * Neither the name of Redis nor the names of its contributors may be used
16 * to endorse or promote products derived from this software without
17 * specific prior written permission.
18 *
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 * POSSIBILITY OF SUCH DAMAGE.
30 */
31
32 #include "redis.h"
33 #include "hiredis.h"
34 #include "async.h"
35
36 #include <ctype.h>
37 #include <arpa/inet.h>
38 #include <sys/socket.h>
39
/* Value assigned to server.port by initSentinelConfig(). */
#define REDIS_SENTINEL_PORT 26379

/* ======================== Sentinel global state =========================== */

/* Time value expressed in milliseconds. */
typedef long long mstime_t;

/* An ip:port pair describing a network endpoint. */
typedef struct sentinelAddr {
    char *ip;   /* Address string; createSentinelAddr() stores an sds here. */
    int port;   /* createSentinelAddr() only accepts 1..65535. */
} sentinelAddr;
51
/* Bit flags stored in sentinelRedisInstance.flags. */
#define SRI_MASTER  (1<<0)          /* The instance is a master. */
#define SRI_SLAVE   (1<<1)          /* The instance is a slave. */
#define SRI_SENTINEL (1<<2)         /* The instance is another Sentinel. */
#define SRI_DISCONNECTED (1<<3)     /* Set at creation and whenever a link
                                       is dropped. */
#define SRI_S_DOWN (1<<4)           /* Subjectively down (no quorum). */
#define SRI_O_DOWN (1<<5)           /* Objectively down (quorum reached). */
#define SRI_MASTER_DOWN (1<<6)      /* A Sentinel with this flag set thinks
                                       that its master is down. */
/* SRI_CAN_FAILOVER has two meanings depending on the instance type:
 * - on an SRI_MASTER instance: we are allowed to perform the failover for
 *   this master;
 * - on an SRI_SENTINEL instance: that Sentinel is allowed to perform the
 *   failover on its master. */
#define SRI_CAN_FAILOVER (1<<7)
#define SRI_FAILOVER_IN_PROGRESS (1<<8) /* Failover is in progress for this
                                           master. */
#define SRI_I_AM_THE_LEADER (1<<9)  /* We are the leader for this master. */
#define SRI_PROMOTED (1<<10)        /* Slave selected for promotion. */
#define SRI_RECONF_SENT (1<<11)     /* SLAVEOF <newmaster> sent. */
#define SRI_RECONF_INPROG (1<<12)   /* Slave synchronization in progress. */
#define SRI_RECONF_DONE (1<<13)     /* Slave synchronized with new master. */

/* Tunables. NOTE(review): the time-like values are presumably milliseconds,
 * matching mstime_t -- confirm against the code that consumes them. */
#define SENTINEL_INFO_PERIOD 10000
#define SENTINEL_PING_PERIOD 1000
#define SENTINEL_ASK_PERIOD 1000
#define SENTINEL_PUBLISH_PERIOD 5000
#define SENTINEL_DOWN_AFTER_PERIOD 30000
#define SENTINEL_HELLO_CHANNEL "__sentinel__:hello"
#define SENTINEL_TILT_TRIGGER 2000
#define SENTINEL_TILT_PERIOD (SENTINEL_PING_PERIOD*30)
#define SENTINEL_DEFAULT_SLAVE_PRIORITY 100
#define SENTINEL_PROMOTION_RETRY_PERIOD 30000
#define SENTINEL_SLAVE_RECONF_RETRY_PERIOD 10000
#define SENTINEL_DEFAULT_PARALLEL_SYNCS 1
#define SENTINEL_MIN_LINK_RECONNECT_PERIOD 15000
#define SENTINEL_DEFAULT_FAILOVER_TIMEOUT (60*15*1000)
#define SENTINEL_MAX_PENDING_COMMANDS 100
#define SENTINEL_EXTENDED_SDOWN_MULTIPLIER 10

/* For how many milliseconds a piece of information stays valid. This applies,
 * for instance, to the replies to SENTINEL IS-MASTER-DOWN-BY-ADDR. */
#define SENTINEL_INFO_VALIDITY_TIME 5000
#define SENTINEL_FAILOVER_FIXED_DELAY 5000
#define SENTINEL_FAILOVER_MAX_RANDOM_DELAY 10000

/* States of the failover state machine (sentinelRedisInstance.failover_state). */
#define SENTINEL_FAILOVER_STATE_NONE 0               /* No failover in progress. */
#define SENTINEL_FAILOVER_STATE_WAIT_START 1         /* Wait for failover_start_time. */
#define SENTINEL_FAILOVER_STATE_SELECT_SLAVE 2       /* Select slave to promote. */
#define SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE 3 /* Slave -> Master. */
#define SENTINEL_FAILOVER_STATE_WAIT_PROMOTION 4     /* Wait slave to change role. */
#define SENTINEL_FAILOVER_STATE_RECONF_SLAVES 5      /* SLAVEOF newmaster. */
#define SENTINEL_FAILOVER_STATE_WAIT_NEXT_SLAVE 6    /* Wait replication. */
#define SENTINEL_FAILOVER_STATE_ALERT_CLIENTS 7      /* Run user script. */
#define SENTINEL_FAILOVER_STATE_WAIT_ALERT_SCRIPT 8  /* Wait script exec. */
#define SENTINEL_FAILOVER_STATE_DETECT_END 9         /* Check for failover end. */
#define SENTINEL_FAILOVER_STATE_UPDATE_CONFIG 10     /* Monitor promoted slave. */

/* Values for sentinelRedisInstance.slave_master_link_status. */
#define SENTINEL_MASTER_LINK_STATUS_UP 0
#define SENTINEL_MASTER_LINK_STATUS_DOWN 1
112
113 typedef struct sentinelRedisInstance {
114 int flags; /* See SRI_... defines */
115 char *name; /* Master name from the point of view of this sentinel. */
116 char *runid; /* run ID of this instance. */
117 sentinelAddr *addr; /* Master host. */
118 redisAsyncContext *cc; /* Hiredis context for commands. */
119 redisAsyncContext *pc; /* Hiredis context for Pub / Sub. */
120 int pending_commands; /* Number of commands sent waiting for a reply. */
121 mstime_t cc_conn_time; /* cc connection time. */
122 mstime_t pc_conn_time; /* pc connection time. */
123 mstime_t pc_last_activity; /* Last time we received any message. */
124 mstime_t last_avail_time; /* Last time the instance replied to ping with
125 a reply we consider valid. */
126 mstime_t last_pong_time; /* Last time the instance replied to ping,
127 whatever the reply was. That's used to check
128 if the link is idle and must be reconnected. */
129 mstime_t last_pub_time; /* Last time we sent hello via Pub/Sub. */
130 mstime_t last_hello_time; /* Only used if SRI_SENTINEL is set. Last time
131 we received an hello from this Sentinel
132 via Pub/Sub. */
133 mstime_t last_master_down_reply_time; /* Time of last reply to
134 SENTINEL is-master-down command. */
135 mstime_t s_down_since_time; /* Subjectively down since time. */
136 mstime_t o_down_since_time; /* Objectively down since time. */
137 mstime_t down_after_period; /* Consider it down after that period. */
138 mstime_t info_refresh; /* Time at which we received INFO output from it. */
139
140 /* Master specific. */
141 dict *sentinels; /* Other sentinels monitoring the same master. */
142 dict *slaves; /* Slaves for this master instance. */
143 int quorum; /* Number of sentinels that need to agree on failure. */
144 int parallel_syncs; /* How many slaves to reconfigure at same time. */
145
146 /* Slave specific. */
147 mstime_t master_link_down_time; /* Slave replication link down time. */
148 int slave_priority; /* Slave priority according to its INFO output. */
149 mstime_t slave_reconf_sent_time; /* Time at which we sent SLAVE OF <new> */
150 struct sentinelRedisInstance *master; /* Master instance if SRI_SLAVE is set. */
151 char *slave_master_host; /* Master host as reported by INFO */
152 int slave_master_port; /* Master port as reported by INFO */
153 int slave_master_link_status; /* Master link status as reported by INFO */
154 /* Failover */
155 char *leader; /* If this is a master instance, this is the runid of
156 the Sentinel that should perform the failover. If
157 this is a Sentinel, this is the runid of the Sentinel
158 that this other Sentinel is voting as leader.
159 This field is valid only if SRI_MASTER_DOWN is
160 set on the Sentinel instance. */
161 int failover_state; /* See SENTINEL_FAILOVER_STATE_* defines. */
162 mstime_t failover_state_change_time;
163 mstime_t failover_start_time; /* When to start to failover if leader. */
164 mstime_t failover_timeout; /* Max time to refresh failover state. */
165 struct sentinelRedisInstance *promoted_slave; /* Promoted slave instance. */
166 /* Scripts executed to notify admin or reconfigure clients: when they
167 * are set to NULL no script is executed. */
168 char *notify_script;
169 char *client_reconfig_script;
170 } sentinelRedisInstance;
171
172 /* Main state. */
173 struct sentinelState {
174 dict *masters; /* Dictionary of master sentinelRedisInstances.
175 Key is the instance name, value is the
176 sentinelRedisInstance structure pointer. */
177 int tilt; /* Are we in TILT mode? */
178 mstime_t tilt_start_time; /* When TITL started. */
179 mstime_t previous_time; /* Time last time we ran the time handler. */
180 } sentinel;
181
182 /* ======================= hiredis ae.c adapters =============================
183 * Note: this implementation is taken from hiredis/adapters/ae.h, however
184 * we have our modified copy for Sentinel in order to use our allocator
185 * and to have full control over how the adapter works. */
186
187 typedef struct redisAeEvents {
188 redisAsyncContext *context;
189 aeEventLoop *loop;
190 int fd;
191 int reading, writing;
192 } redisAeEvents;
193
194 static void redisAeReadEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
195 ((void)el); ((void)fd); ((void)mask);
196
197 redisAeEvents *e = (redisAeEvents*)privdata;
198 redisAsyncHandleRead(e->context);
199 }
200
201 static void redisAeWriteEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
202 ((void)el); ((void)fd); ((void)mask);
203
204 redisAeEvents *e = (redisAeEvents*)privdata;
205 redisAsyncHandleWrite(e->context);
206 }
207
208 static void redisAeAddRead(void *privdata) {
209 redisAeEvents *e = (redisAeEvents*)privdata;
210 aeEventLoop *loop = e->loop;
211 if (!e->reading) {
212 e->reading = 1;
213 aeCreateFileEvent(loop,e->fd,AE_READABLE,redisAeReadEvent,e);
214 }
215 }
216
217 static void redisAeDelRead(void *privdata) {
218 redisAeEvents *e = (redisAeEvents*)privdata;
219 aeEventLoop *loop = e->loop;
220 if (e->reading) {
221 e->reading = 0;
222 aeDeleteFileEvent(loop,e->fd,AE_READABLE);
223 }
224 }
225
226 static void redisAeAddWrite(void *privdata) {
227 redisAeEvents *e = (redisAeEvents*)privdata;
228 aeEventLoop *loop = e->loop;
229 if (!e->writing) {
230 e->writing = 1;
231 aeCreateFileEvent(loop,e->fd,AE_WRITABLE,redisAeWriteEvent,e);
232 }
233 }
234
235 static void redisAeDelWrite(void *privdata) {
236 redisAeEvents *e = (redisAeEvents*)privdata;
237 aeEventLoop *loop = e->loop;
238 if (e->writing) {
239 e->writing = 0;
240 aeDeleteFileEvent(loop,e->fd,AE_WRITABLE);
241 }
242 }
243
/* hiredis "cleanup" hook: unregister both file events, then free the
 * redisAeEvents container passed as 'privdata'. */
static void redisAeCleanup(void *privdata) {
    redisAeDelRead(privdata);
    redisAeDelWrite(privdata);
    zfree(privdata);
}
250
251 static int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac) {
252 redisContext *c = &(ac->c);
253 redisAeEvents *e;
254
255 /* Nothing should be attached when something is already attached */
256 if (ac->ev.data != NULL)
257 return REDIS_ERR;
258
259 /* Create container for context and r/w events */
260 e = (redisAeEvents*)zmalloc(sizeof(*e));
261 e->context = ac;
262 e->loop = loop;
263 e->fd = c->fd;
264 e->reading = e->writing = 0;
265
266 /* Register functions to start/stop listening for events */
267 ac->ev.addRead = redisAeAddRead;
268 ac->ev.delRead = redisAeDelRead;
269 ac->ev.addWrite = redisAeAddWrite;
270 ac->ev.delWrite = redisAeDelWrite;
271 ac->ev.cleanup = redisAeCleanup;
272 ac->ev.data = e;
273
274 return REDIS_OK;
275 }
276
277 /* ============================= Prototypes ================================= */
278
279 void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status);
280 void sentinelDisconnectCallback(const redisAsyncContext *c, int status);
281 void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata);
282 sentinelRedisInstance *sentinelGetMasterByName(char *name);
283 char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master);
284 char *sentinelGetObjectiveLeader(sentinelRedisInstance *master);
285 int yesnotoi(char *s);
286 void sentinelDisconnectInstanceFromContext(const redisAsyncContext *c);
287 const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri);
288
289 /* ========================= Dictionary types =============================== */
290
291 unsigned int dictSdsHash(const void *key);
292 int dictSdsKeyCompare(void *privdata, const void *key1, const void *key2);
293 void releaseSentinelRedisInstance(sentinelRedisInstance *ri);
294
295 void dictInstancesValDestructor (void *privdata, void *obj) {
296 releaseSentinelRedisInstance(obj);
297 }
298
299 /* Instance name (sds) -> instance (sentinelRedisInstance pointer)
300 *
301 * also used for: sentinelRedisInstance->sentinels dictionary that maps
302 * sentinels ip:port to last seen time in Pub/Sub hello message. */
303 dictType instancesDictType = {
304 dictSdsHash, /* hash function */
305 NULL, /* key dup */
306 NULL, /* val dup */
307 dictSdsKeyCompare, /* key compare */
308 NULL, /* key destructor */
309 dictInstancesValDestructor /* val destructor */
310 };
311
312 /* Instance runid (sds) -> votes (long casted to void*)
313 *
314 * This is useful into sentinelGetObjectiveLeader() function in order to
315 * count the votes and understand who is the leader. */
316 dictType leaderVotesDictType = {
317 dictSdsHash, /* hash function */
318 NULL, /* key dup */
319 NULL, /* val dup */
320 dictSdsKeyCompare, /* key compare */
321 NULL, /* key destructor */
322 NULL /* val destructor */
323 };
324
325 /* =========================== Initialization =============================== */
326
327 void sentinelCommand(redisClient *c);
328
329 struct redisCommand sentinelcmds[] = {
330 {"ping",pingCommand,1,"",0,NULL,0,0,0,0,0},
331 {"sentinel",sentinelCommand,-2,"",0,NULL,0,0,0,0,0},
332 {"subscribe",subscribeCommand,-2,"",0,NULL,0,0,0,0,0},
333 {"unsubscribe",unsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
334 {"psubscribe",psubscribeCommand,-2,"",0,NULL,0,0,0,0,0},
335 {"punsubscribe",punsubscribeCommand,-1,"",0,NULL,0,0,0,0,0}
336 };
337
338 /* This function overwrites a few normal Redis config default with Sentinel
339 * specific defaults. */
340 void initSentinelConfig(void) {
341 server.port = REDIS_SENTINEL_PORT;
342 }
343
344 /* Perform the Sentinel mode initialization. */
345 void initSentinel(void) {
346 int j;
347
348 /* Remove usual Redis commands from the command table, then just add
349 * the SENTINEL command. */
350 dictEmpty(server.commands);
351 for (j = 0; j < sizeof(sentinelcmds)/sizeof(sentinelcmds[0]); j++) {
352 int retval;
353 struct redisCommand *cmd = sentinelcmds+j;
354
355 retval = dictAdd(server.commands, sdsnew(cmd->name), cmd);
356 redisAssert(retval == DICT_OK);
357 }
358
359 /* Initialize various data structures. */
360 sentinel.masters = dictCreate(&instancesDictType,NULL);
361 sentinel.tilt = 0;
362 sentinel.tilt_start_time = mstime();
363 sentinel.previous_time = mstime();
364 }
365
366 /* ============================== sentinelAddr ============================== */
367
368 /* Create a sentinelAddr object and return it on success.
369 * On error NULL is returned and errno is set to:
370 * ENOENT: Can't resolve the hostname.
371 * EINVAL: Invalid port number.
372 */
373 sentinelAddr *createSentinelAddr(char *hostname, int port) {
374 char buf[32];
375 sentinelAddr *sa;
376
377 if (port <= 0 || port > 65535) {
378 errno = EINVAL;
379 return NULL;
380 }
381 if (anetResolve(NULL,hostname,buf) == ANET_ERR) {
382 errno = ENOENT;
383 return NULL;
384 }
385 sa = zmalloc(sizeof(*sa));
386 sa->ip = sdsnew(buf);
387 sa->port = port;
388 return sa;
389 }
390
391 /* Free a Sentinel address. Can't fail. */
392 void releaseSentinelAddr(sentinelAddr *sa) {
393 sdsfree(sa->ip);
394 zfree(sa);
395 }
396
397 /* =========================== Events notification ========================== */
398
/* Placeholder for the execution of the user notification script: sentinelEvent()
 * calls it with the script path, the event type and the event message, but
 * nothing is done with them yet. */
void sentinelCallNotificationScript(char *scriptpath, char *type, char *msg) {
    /* TODO: implement it. */
    (void)scriptpath;
    (void)type;
    (void)msg;
}
402
403 /* Send an event to log, pub/sub, user notification script.
404 *
405 * 'level' is the log level for logging. Only REDIS_WARNING events will trigger
406 * the execution of the user notification script.
407 *
408 * 'type' is the message type, also used as a pub/sub channel name.
409 *
410 * 'ri', is the redis instance target of this event if applicable, and is
411 * used to obtain the path of the notification script to execute.
412 *
413 * The remaining arguments are printf-alike.
414 * If the format specifier starts with the two characters "%@" then ri is
415 * not NULL, and the message is prefixed with an instance identifier in the
416 * following format:
417 *
418 * <instance type> <instance name> <ip> <port>
419 *
420 * If the instance type is not master, than the additional string is
421 * added to specify the originating master:
422 *
423 * @ <master name> <master ip> <master port>
424 *
425 * Any other specifier after "%@" is processed by printf itself.
426 */
427 void sentinelEvent(int level, char *type, sentinelRedisInstance *ri,
428 const char *fmt, ...) {
429 va_list ap;
430 char msg[REDIS_MAX_LOGMSG_LEN];
431 robj *channel, *payload;
432
433 /* Handle %@ */
434 if (fmt[0] == '%' && fmt[1] == '@') {
435 sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
436 NULL : ri->master;
437
438 if (master) {
439 snprintf(msg, sizeof(msg), "%s %s %s %d @ %s %s %d",
440 sentinelRedisInstanceTypeStr(ri),
441 ri->name, ri->addr->ip, ri->addr->port,
442 master->name, master->addr->ip, master->addr->port);
443 } else {
444 snprintf(msg, sizeof(msg), "%s %s %s %d",
445 sentinelRedisInstanceTypeStr(ri),
446 ri->name, ri->addr->ip, ri->addr->port);
447 }
448 fmt += 2;
449 } else {
450 msg[0] = '\0';
451 }
452
453 /* Use vsprintf for the rest of the formatting if any. */
454 if (fmt[0] != '\0') {
455 va_start(ap, fmt);
456 vsnprintf(msg+strlen(msg), sizeof(msg)-strlen(msg), fmt, ap);
457 va_end(ap);
458 }
459
460 /* Log the message if the log level allows it to be logged. */
461 if (level >= server.verbosity)
462 redisLog(level,"%s %s",type,msg);
463
464 /* Publish the message via Pub/Sub if it's not a debugging one. */
465 if (level != REDIS_DEBUG) {
466 channel = createStringObject(type,strlen(type));
467 payload = createStringObject(msg,strlen(msg));
468 pubsubPublishMessage(channel,payload);
469 decrRefCount(channel);
470 decrRefCount(payload);
471 }
472
473 /* Call the notification script if applicable. */
474 if (level == REDIS_WARNING && ri != NULL) {
475 sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
476 ri : ri->master;
477 if (master->notify_script) {
478 sentinelCallNotificationScript(master->notify_script,type,msg);
479 }
480 }
481 }
482
483 /* ========================== sentinelRedisInstance ========================= */
484
485 /* Create a redis instance, the following fields must be populated by the
486 * caller if needed:
487 * runid: set to NULL but will be populated once INFO output is received.
488 * info_refresh: is set to 0 to mean that we never received INFO so far.
489 *
490 * If SRI_MASTER is set into initial flags the instance is added to
491 * sentinel.masters table.
492 *
493 * if SRI_SLAVE or SRI_SENTINEL is set then 'master' must be not NULL and the
494 * instance is added into master->slaves or master->sentinels table.
495 *
496 * If the instance is a slave or sentinel, the name parameter is ignored and
497 * is created automatically as hostname:port.
498 *
499 * The function fails if hostname can't be resolved or port is out of range.
500 * When this happens NULL is returned and errno is set accordingly to the
501 * createSentinelAddr() function.
502 *
503 * The function may also fail and return NULL with errno set to EBUSY if
504 * a master or slave with the same name already exists. */
505 sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char *hostname, int port, int quorum, sentinelRedisInstance *master) {
506 sentinelRedisInstance *ri;
507 sentinelAddr *addr;
508 dict *table;
509 char slavename[128], *sdsname;
510
511 redisAssert(flags & (SRI_MASTER|SRI_SLAVE|SRI_SENTINEL));
512 redisAssert((flags & SRI_MASTER) || master != NULL);
513
514 /* Check address validity. */
515 addr = createSentinelAddr(hostname,port);
516 if (addr == NULL) return NULL;
517
518 /* For slaves and sentinel we use ip:port as name. */
519 if (flags & (SRI_SLAVE|SRI_SENTINEL)) {
520 snprintf(slavename,sizeof(slavename),"%s:%d",hostname,port);
521 name = slavename;
522 }
523
524 /* Make sure the entry is not duplicated. This may happen when the same
525 * name for a master is used multiple times inside the configuration or
526 * if we try to add multiple times a slave or sentinel with same ip/port
527 * to a master. */
528 if (flags & SRI_MASTER) table = sentinel.masters;
529 else if (flags & SRI_SLAVE) table = master->slaves;
530 else if (flags & SRI_SENTINEL) table = master->sentinels;
531 sdsname = sdsnew(name);
532 if (dictFind(table,sdsname)) {
533 sdsfree(sdsname);
534 errno = EBUSY;
535 return NULL;
536 }
537
538 /* Create the instance object. */
539 ri = zmalloc(sizeof(*ri));
540 /* Note that all the instances are started in the disconnected state,
541 * the event loop will take care of connecting them. */
542 ri->flags = flags | SRI_DISCONNECTED;
543 ri->name = sdsname;
544 ri->runid = NULL;
545 ri->addr = addr;
546 ri->cc = NULL;
547 ri->pc = NULL;
548 ri->pending_commands = 0;
549 ri->cc_conn_time = 0;
550 ri->pc_conn_time = 0;
551 ri->pc_last_activity = 0;
552 ri->last_avail_time = mstime();
553 ri->last_pong_time = mstime();
554 ri->last_pub_time = mstime();
555 ri->last_hello_time = mstime();
556 ri->last_master_down_reply_time = mstime();
557 ri->s_down_since_time = 0;
558 ri->o_down_since_time = 0;
559 ri->down_after_period = master ? master->down_after_period :
560 SENTINEL_DOWN_AFTER_PERIOD;
561 ri->master_link_down_time = 0;
562 ri->slave_priority = SENTINEL_DEFAULT_SLAVE_PRIORITY;
563 ri->slave_reconf_sent_time = 0;
564 ri->slave_master_host = NULL;
565 ri->slave_master_port = 0;
566 ri->slave_master_link_status = SENTINEL_MASTER_LINK_STATUS_DOWN;
567 ri->sentinels = dictCreate(&instancesDictType,NULL);
568 ri->quorum = quorum;
569 ri->parallel_syncs = SENTINEL_DEFAULT_PARALLEL_SYNCS;
570 ri->master = master;
571 ri->slaves = dictCreate(&instancesDictType,NULL);
572 ri->info_refresh = 0;
573
574 /* Failover state. */
575 ri->leader = NULL;
576 ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
577 ri->failover_state_change_time = 0;
578 ri->failover_start_time = 0;
579 ri->failover_timeout = SENTINEL_DEFAULT_FAILOVER_TIMEOUT;
580 ri->promoted_slave = NULL;
581 ri->notify_script = NULL;
582 ri->client_reconfig_script = NULL;
583
584 /* Add into the right table. */
585 dictAdd(table, ri->name, ri);
586 return ri;
587 }
588
589 /* Release this instance and all its slaves, sentinels, hiredis connections.
590 * This function also takes care of unlinking the instance from the main
591 * masters table (if it is a master) or from its master sentinels/slaves table
592 * if it is a slave or sentinel. */
593 void releaseSentinelRedisInstance(sentinelRedisInstance *ri) {
594 /* Release all its slaves or sentinels if any. */
595 dictRelease(ri->sentinels);
596 dictRelease(ri->slaves);
597
598 /* Release hiredis connections. Note that redisAsyncFree() will call
599 * the disconnection callback. */
600 if (ri->cc) {
601 redisAsyncFree(ri->cc);
602 ri->cc = NULL;
603 }
604 if (ri->pc) {
605 redisAsyncFree(ri->pc);
606 ri->pc = NULL;
607 }
608
609 /* Free other resources. */
610 sdsfree(ri->name);
611 sdsfree(ri->runid);
612 sdsfree(ri->notify_script);
613 sdsfree(ri->client_reconfig_script);
614 sdsfree(ri->slave_master_host);
615 sdsfree(ri->leader);
616 releaseSentinelAddr(ri->addr);
617
618 /* Clear state into the master if needed. */
619 if ((ri->flags & SRI_SLAVE) && (ri->flags & SRI_PROMOTED) && ri->master)
620 ri->master->promoted_slave = NULL;
621
622 zfree(ri);
623 }
624
625 /* Lookup a slave in a master Redis instance, by ip and port. */
626 sentinelRedisInstance *sentinelRedisInstanceLookupSlave(
627 sentinelRedisInstance *ri, char *ip, int port)
628 {
629 sds key;
630 sentinelRedisInstance *slave;
631
632 redisAssert(ri->flags & SRI_MASTER);
633 key = sdscatprintf(sdsempty(),"%s:%d",ip,port);
634 slave = dictFetchValue(ri->slaves,key);
635 sdsfree(key);
636 return slave;
637 }
638
639 /* Return the name of the type of the instance as a string. */
640 const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri) {
641 if (ri->flags & SRI_MASTER) return "master";
642 else if (ri->flags & SRI_SLAVE) return "slave";
643 else if (ri->flags & SRI_SENTINEL) return "sentinel";
644 else return "unknown";
645 }
646
647 /* This function removes all the instances found in the dictionary of instances
648 * 'd', having either:
649 *
650 * 1) The same ip/port as specified.
651 * 2) The same runid.
652 *
653 * "1" and "2" don't need to verify at the same time, just one is enough.
654 * If "runid" is NULL it is not checked.
655 * Similarly if "ip" is NULL it is not checked.
656 *
657 * This function is useful because every time we add a new Sentinel into
658 * a master's Sentinels dictionary, we want to be very sure about not
659 * having duplicated instances for any reason. This is so important because
660 * we use those other sentinels in order to run our quorum protocol to
661 * understand if it's time to proceeed with the fail over.
662 *
663 * Making sure no duplication is possible we greately improve the robustness
664 * of the quorum (otherwise we may end counting the same instance multiple
665 * times for some reason).
666 *
667 * The function returns the number of Sentinels removed. */
668 int removeMatchingSentinelsFromMaster(sentinelRedisInstance *master, char *ip, int port, char *runid) {
669 dictIterator *di;
670 dictEntry *de;
671 int removed = 0;
672
673 di = dictGetSafeIterator(master->sentinels);
674 while((de = dictNext(di)) != NULL) {
675 sentinelRedisInstance *ri = dictGetVal(de);
676
677 if ((ri->runid && runid && strcmp(ri->runid,runid) == 0) ||
678 (ip && strcmp(ri->addr->ip,ip) == 0 && port == ri->addr->port))
679 {
680 dictDelete(master->sentinels,ri->name);
681 removed++;
682 }
683 }
684 dictReleaseIterator(di);
685 return removed;
686 }
687
688 /* Search an instance with the same runid, ip and port into a dictionary
689 * of instances. Return NULL if not found, otherwise return the instance
690 * pointer.
691 *
692 * runid or ip can be NULL. In such a case the search is performed only
693 * by the non-NULL field. */
694 sentinelRedisInstance *getSentinelRedisInstanceByAddrAndRunID(dict *instances, char *ip, int port, char *runid) {
695 dictIterator *di;
696 dictEntry *de;
697 sentinelRedisInstance *instance = NULL;
698
699 redisAssert(ip || runid); /* User must pass at least one search param. */
700 di = dictGetIterator(instances);
701 while((de = dictNext(di)) != NULL) {
702 sentinelRedisInstance *ri = dictGetVal(de);
703
704 if (runid && !ri->runid) continue;
705 if ((runid == NULL || strcmp(ri->runid, runid) == 0) &&
706 (ip == NULL || (strcmp(ri->addr->ip, ip) == 0 &&
707 ri->addr->port == port)))
708 {
709 instance = ri;
710 break;
711 }
712 }
713 dictReleaseIterator(di);
714 return instance;
715 }
716
717 /* Simple master lookup by name */
718 sentinelRedisInstance *sentinelGetMasterByName(char *name) {
719 sentinelRedisInstance *ri;
720 sds sdsname = sdsnew(name);
721
722 ri = dictFetchValue(sentinel.masters,sdsname);
723 sdsfree(sdsname);
724 return ri;
725 }
726
727 /* Add the specified flags to all the instances in the specified dictionary. */
728 void sentinelAddFlagsToDictOfRedisInstances(dict *instances, int flags) {
729 dictIterator *di;
730 dictEntry *de;
731
732 di = dictGetIterator(instances);
733 while((de = dictNext(di)) != NULL) {
734 sentinelRedisInstance *ri = dictGetVal(de);
735 ri->flags |= flags;
736 }
737 dictReleaseIterator(di);
738 }
739
740 /* Remove the specified flags to all the instances in the specified
741 * dictionary. */
742 void sentinelDelFlagsToDictOfRedisInstances(dict *instances, int flags) {
743 dictIterator *di;
744 dictEntry *de;
745
746 di = dictGetIterator(instances);
747 while((de = dictNext(di)) != NULL) {
748 sentinelRedisInstance *ri = dictGetVal(de);
749 ri->flags &= ~flags;
750 }
751 dictReleaseIterator(di);
752 }
753
754 /* Reset the state of a monitored master:
755 * 1) Remove all slaves.
756 * 2) Remove all sentinels.
757 * 3) Remove most of the flags resulting from runtime operations.
758 * 4) Reset timers to their default value.
759 * 5) In the process of doing this undo the failover if in progress.
760 * 6) Disconnect the connections with the master (will reconnect automatically).
761 */
762 void sentinelResetMaster(sentinelRedisInstance *ri) {
763 redisAssert(ri->flags & SRI_MASTER);
764 dictRelease(ri->slaves);
765 dictRelease(ri->sentinels);
766 ri->slaves = dictCreate(&instancesDictType,NULL);
767 ri->sentinels = dictCreate(&instancesDictType,NULL);
768 if (ri->cc) redisAsyncFree(ri->cc);
769 if (ri->pc) redisAsyncFree(ri->pc);
770 ri->flags &= SRI_MASTER|SRI_CAN_FAILOVER|SRI_DISCONNECTED;
771 if (ri->leader) {
772 sdsfree(ri->leader);
773 ri->leader = NULL;
774 }
775 ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
776 ri->failover_state_change_time = 0;
777 ri->failover_start_time = 0;
778 ri->promoted_slave = NULL;
779 sentinelEvent(REDIS_WARNING,"+reset-master",ri,"%@");
780 }
781
782 /* Call sentinelResetMaster() on every master with a name matching the specified
783 * pattern. */
784 int sentinelResetMastersByPattern(char *pattern) {
785 dictIterator *di;
786 dictEntry *de;
787 int reset = 0;
788
789 di = dictGetIterator(sentinel.masters);
790 while((de = dictNext(di)) != NULL) {
791 sentinelRedisInstance *ri = dictGetVal(de);
792
793 if (ri->name) {
794 if (stringmatch(pattern,ri->name,0)) {
795 sentinelResetMaster(ri);
796 reset++;
797 }
798 }
799 }
800 dictReleaseIterator(di);
801 return reset;
802 }
803
804 /* ============================ Config handling ============================= */
805 char *sentinelHandleConfiguration(char **argv, int argc) {
806 sentinelRedisInstance *ri;
807
808 if (!strcasecmp(argv[0],"monitor") && argc == 5) {
809 /* monitor <name> <host> <port> <quorum> */
810 int quorum = atoi(argv[4]);
811
812 if (quorum <= 0) return "Quorum must be 1 or greater.";
813 if (createSentinelRedisInstance(argv[1],SRI_MASTER,argv[2],
814 atoi(argv[3]),quorum,NULL) == NULL)
815 {
816 switch(errno) {
817 case EBUSY: return "Duplicated master name.";
818 case ENOENT: return "Can't resolve master instance hostname.";
819 case EINVAL: return "Invalid port number";
820 }
821 }
822 } else if (!strcasecmp(argv[0],"down-after-milliseconds") && argc == 3) {
823 /* down-after-milliseconds <name> <milliseconds> */
824 ri = sentinelGetMasterByName(argv[1]);
825 if (!ri) return "No such master with specified name.";
826 ri->down_after_period = atoi(argv[2]);
827 if (ri->down_after_period <= 0)
828 return "negative or zero time parameter.";
829 } else if (!strcasecmp(argv[0],"failover-timeout") && argc == 3) {
830 /* failover-timeout <name> <milliseconds> */
831 ri = sentinelGetMasterByName(argv[1]);
832 if (!ri) return "No such master with specified name.";
833 ri->failover_timeout = atoi(argv[2]);
834 if (ri->failover_timeout <= 0)
835 return "negative or zero time parameter.";
836 } else if (!strcasecmp(argv[0],"can-failover") && argc == 3) {
837 /* can-failover <name> <yes/no> */
838 int yesno = yesnotoi(argv[2]);
839
840 ri = sentinelGetMasterByName(argv[1]);
841 if (!ri) return "No such master with specified name.";
842 if (yesno == -1) return "Argument must be either yes or no.";
843 if (yesno)
844 ri->flags |= SRI_CAN_FAILOVER;
845 else
846 ri->flags &= ~SRI_CAN_FAILOVER;
847 } else if (!strcasecmp(argv[0],"parallel-syncs") && argc == 3) {
848 /* parallel-syncs <name> <milliseconds> */
849 ri = sentinelGetMasterByName(argv[1]);
850 if (!ri) return "No such master with specified name.";
851 ri->parallel_syncs = atoi(argv[2]);
852 } else {
853 return "Unrecognized sentinel configuration statement.";
854 }
855 return NULL;
856 }
857
858 /* ====================== hiredis connection handling ======================= */
859
860 /* This function takes an hiredis context that is in an error condition
861 * and make sure to mark the instance as disconnected performing the
862 * cleanup needed.
863 *
864 * Note: we don't free the hiredis context as hiredis will do it for us
865 * for async conenctions. */
866 void sentinelDisconnectInstanceFromContext(const redisAsyncContext *c) {
867 sentinelRedisInstance *ri = c->data;
868 int pubsub = (ri->pc == c);
869
870 sentinelEvent(REDIS_DEBUG, pubsub ? "-pubsub-link" : "-cmd-link", ri,
871 "%@ #%s", c->errstr);
872 if (pubsub)
873 ri->pc = NULL;
874 else
875 ri->cc = NULL;
876 ri->flags |= SRI_DISCONNECTED;
877 }
878
879 void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status) {
880 if (status != REDIS_OK) {
881 sentinelDisconnectInstanceFromContext(c);
882 } else {
883 sentinelRedisInstance *ri = c->data;
884 int pubsub = (ri->pc == c);
885
886 sentinelEvent(REDIS_DEBUG, pubsub ? "+pubsub-link" : "+cmd-link", ri,
887 "%@");
888 }
889 }
890
891 void sentinelDisconnectCallback(const redisAsyncContext *c, int status) {
892 sentinelDisconnectInstanceFromContext(c);
893 }
894
895 /* Create the async connections for the specified instance if the instance
896 * is disconnected. Note that the SRI_DISCONNECTED flag is set even if just
897 * one of the two links (commands and pub/sub) is missing. */
898 void sentinelReconnectInstance(sentinelRedisInstance *ri) {
899 if (!(ri->flags & SRI_DISCONNECTED)) return;
900
901 /* Commands connection. */
902 if (ri->cc == NULL) {
903 ri->cc = redisAsyncConnect(ri->addr->ip,ri->addr->port);
904 if (ri->cc->err) {
905 sentinelEvent(REDIS_DEBUG,"-cmd-link-reconnection",ri,"%@ #%s",
906 ri->cc->errstr);
907 redisAsyncFree(ri->cc);
908 ri->cc = NULL;
909 } else {
910 ri->cc_conn_time = mstime();
911 ri->cc->data = ri;
912 redisAeAttach(server.el,ri->cc);
913 redisAsyncSetConnectCallback(ri->cc,
914 sentinelLinkEstablishedCallback);
915 redisAsyncSetDisconnectCallback(ri->cc,
916 sentinelDisconnectCallback);
917 }
918 }
919 /* Pub / Sub */
920 if ((ri->flags & SRI_MASTER) && ri->pc == NULL) {
921 ri->pc = redisAsyncConnect(ri->addr->ip,ri->addr->port);
922 if (ri->pc->err) {
923 sentinelEvent(REDIS_DEBUG,"-pubsub-link-reconnection",ri,"%@ #%s",
924 ri->pc->errstr);
925 redisAsyncFree(ri->pc);
926 ri->pc = NULL;
927 } else {
928 int retval;
929
930 ri->pc_conn_time = mstime();
931 ri->pc->data = ri;
932 redisAeAttach(server.el,ri->pc);
933 redisAsyncSetConnectCallback(ri->pc,
934 sentinelLinkEstablishedCallback);
935 redisAsyncSetDisconnectCallback(ri->pc,
936 sentinelDisconnectCallback);
937 /* Now we subscribe to the Sentinels "Hello" channel. */
938 retval = redisAsyncCommand(ri->pc,
939 sentinelReceiveHelloMessages, NULL, "SUBSCRIBE %s",
940 SENTINEL_HELLO_CHANNEL);
941 if (retval != REDIS_OK) {
942 /* If we can't subscribe, the Pub/Sub connection is useless
943 * and we can simply disconnect it and try again. */
944 redisAsyncFree(ri->pc);
945 ri->pc = NULL;
946 return;
947 }
948 }
949 }
950 /* Clear the DISCONNECTED flags only if we have both the connections
951 * (or just the commands connection if this is a slave or a
952 * sentinel instance). */
953 if (ri->cc && (ri->flags & (SRI_SLAVE|SRI_SENTINEL) || ri->pc))
954 ri->flags &= ~SRI_DISCONNECTED;
955 }
956
957 /* ======================== Redis instances pinging ======================== */
958
959 /* Process the INFO output from masters. */
960 void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
961 sds *lines;
962 int numlines, j;
963 int role = 0;
964
965
966 /* The following fields must be reset to a given value in the case they
967 * are not found at all in the INFO output. */
968 ri->master_link_down_time = 0;
969
970 /* Process line by line. */
971 lines = sdssplitlen(info,strlen(info),"\r\n",2,&numlines);
972 for (j = 0; j < numlines; j++) {
973 sentinelRedisInstance *slave;
974 sds l = lines[j];
975
976 /* run_id:<40 hex chars>*/
977 if (sdslen(l) >= 47 && !memcmp(l,"run_id:",7)) {
978 if (ri->runid == NULL) {
979 ri->runid = sdsnewlen(l+7,40);
980 } else {
981 /* TODO: check if run_id has changed. This means the
982 * instance has been restarted, we want to set a flag
983 * and notify this event. */
984 }
985 }
986
987 /* slave0:<ip>,<port>,<state> */
988 if ((ri->flags & SRI_MASTER) &&
989 sdslen(l) >= 7 &&
990 !memcmp(l,"slave",5) && isdigit(l[5]))
991 {
992 char *ip, *port, *end;
993
994 ip = strchr(l,':'); if (!ip) continue;
995 ip++; /* Now ip points to start of ip address. */
996 port = strchr(ip,','); if (!port) continue;
997 *port = '\0'; /* nul term for easy access. */
998 port++; /* Now port points to start of port number. */
999 end = strchr(port,','); if (!end) continue;
1000 *end = '\0'; /* nul term for easy access. */
1001
1002 /* Check if we already have this slave into our table,
1003 * otherwise add it. */
1004 if (sentinelRedisInstanceLookupSlave(ri,ip,atoi(port)) == NULL) {
1005 if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,ip,
1006 atoi(port), ri->quorum,ri)) != NULL)
1007 {
1008 sentinelEvent(REDIS_NOTICE,"+slave",slave,"%@");
1009 }
1010 }
1011 }
1012
1013 /* master_link_down_since_seconds:<seconds> */
1014 if (sdslen(l) >= 32 &&
1015 !memcmp(l,"master_link_down_since_seconds",30))
1016 {
1017 ri->master_link_down_time = strtoll(l+31,NULL,10)*1000;
1018 }
1019
1020 /* role:<role> */
1021 if (!memcmp(l,"role:master",11)) role = SRI_MASTER;
1022 else if (!memcmp(l,"role:slave",10)) role = SRI_SLAVE;
1023
1024 if (role == SRI_SLAVE) {
1025 /* master_host:<host> */
1026 if (sdslen(l) >= 12 && !memcmp(l,"master_host:",12)) {
1027 sdsfree(ri->slave_master_host);
1028 ri->slave_master_host = sdsnew(l+12);
1029 }
1030
1031 /* master_port:<port> */
1032 if (sdslen(l) >= 12 && !memcmp(l,"master_port:",12))
1033 ri->slave_master_port = atoi(l+12);
1034
1035 /* master_link_status:<status> */
1036 if (sdslen(l) >= 19 && !memcmp(l,"master_link_status:",19)) {
1037 ri->slave_master_link_status =
1038 (strcasecmp(l+19,"up") == 0) ?
1039 SENTINEL_MASTER_LINK_STATUS_UP :
1040 SENTINEL_MASTER_LINK_STATUS_DOWN;
1041 }
1042 }
1043 }
1044 ri->info_refresh = mstime();
1045 sdsfreesplitres(lines,numlines);
1046
1047 if (sentinel.tilt) return;
1048
1049 /* Act if a slave turned into a master. */
1050 if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) {
1051 if (ri->flags & SRI_PROMOTED) {
1052 /* If this is a promoted slave we can change state to the
1053 * failover state machine. */
1054 if (ri->master &&
1055 (ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
1056 (ri->master->flags & SRI_I_AM_THE_LEADER) &&
1057 (ri->master->failover_state ==
1058 SENTINEL_FAILOVER_STATE_WAIT_PROMOTION))
1059 {
1060 ri->master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
1061 ri->master->failover_state_change_time = mstime();
1062 sentinelEvent(REDIS_WARNING,"+promoted-slave",ri,"%@");
1063 sentinelEvent(REDIS_WARNING,"+failover-state-reconf-slaves",
1064 ri->master,"%@");
1065 }
1066 } else {
1067 /* Otherwise we interpret this as the start of the failover. */
1068 if (ri->master &&
1069 (ri->master->flags & SRI_FAILOVER_IN_PROGRESS) == 0)
1070 {
1071 ri->master->flags |= SRI_FAILOVER_IN_PROGRESS;
1072 sentinelEvent(REDIS_WARNING,"failover-detected",ri->master,"%@");
1073 ri->master->failover_state = SENTINEL_FAILOVER_STATE_DETECT_END;
1074 ri->master->failover_state_change_time = mstime();
1075 ri->master->promoted_slave = ri;
1076 ri->flags |= SRI_PROMOTED;
1077 /* We are an observer, so we can only assume that the leader
1078 * is reconfiguring the slave instances. For this reason we
1079 * set all the instances as RECONF_SENT waiting for progresses
1080 * on this side. */
1081 sentinelAddFlagsToDictOfRedisInstances(ri->master->slaves,
1082 SRI_RECONF_SENT);
1083 }
1084 }
1085 }
1086
1087 /* Detect if the slave that is in the process of being reconfigured
1088 * changed state. */
1089 if ((ri->flags & SRI_SLAVE) && role == SRI_SLAVE &&
1090 (ri->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)))
1091 {
1092 /* SRI_RECONF_SENT -> SRI_RECONF_INPROG. */
1093 if ((ri->flags & SRI_RECONF_SENT) &&
1094 ri->slave_master_host &&
1095 strcmp(ri->slave_master_host,
1096 ri->master->promoted_slave->addr->ip) == 0 &&
1097 ri->slave_master_port == ri->master->promoted_slave->addr->port)
1098 {
1099 ri->flags &= ~SRI_RECONF_SENT;
1100 ri->flags |= SRI_RECONF_INPROG;
1101 sentinelEvent(REDIS_NOTICE,"+slave-reconf-inprog",ri,"%@");
1102 }
1103
1104 /* SRI_RECONF_INPROG -> SRI_RECONF_DONE */
1105 if ((ri->flags & SRI_RECONF_INPROG) &&
1106 ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP)
1107 {
1108 ri->flags &= ~SRI_RECONF_INPROG;
1109 ri->flags |= SRI_RECONF_DONE;
1110 sentinelEvent(REDIS_NOTICE,"+slave-reconf-done",ri,"%@");
1111 /* If we are moving forward (a new slave is now configured)
1112 * we update the change_time as we are conceptually passing
1113 * to the next slave. */
1114 ri->failover_state_change_time = mstime();
1115 }
1116 }
1117 }
1118
1119 void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
1120 sentinelRedisInstance *ri = c->data;
1121 redisReply *r;
1122
1123 ri->pending_commands--;
1124 if (!reply) return;
1125 r = reply;
1126
1127 if (r->type == REDIS_REPLY_STRING) {
1128 sentinelRefreshInstanceInfo(ri,r->str);
1129 }
1130 }
1131
1132 /* Just discard the reply. We use this when we are not monitoring the return
1133 * value of the command but its effects directly. */
1134 void sentinelDiscardReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
1135 sentinelRedisInstance *ri = c->data;
1136
1137 ri->pending_commands--;
1138 }
1139
1140 void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
1141 sentinelRedisInstance *ri = c->data;
1142 redisReply *r;
1143
1144 ri->pending_commands--;
1145 if (!reply) return;
1146 r = reply;
1147
1148 if (r->type == REDIS_REPLY_STATUS ||
1149 r->type == REDIS_REPLY_ERROR) {
1150 /* Update the "instance available" field only if this is an
1151 * acceptable reply. */
1152 if (strncmp(r->str,"PONG",4) == 0 ||
1153 strncmp(r->str,"LOADING",7) == 0 ||
1154 strncmp(r->str,"MASTERDOWN",10) == 0)
1155 {
1156 ri->last_avail_time = mstime();
1157 }
1158 }
1159 ri->last_pong_time = mstime();
1160 }
1161
1162 /* This is called when we get the reply about the PUBLISH command we send
1163 * to the master to advertise this sentinel. */
1164 void sentinelPublishReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
1165 sentinelRedisInstance *ri = c->data;
1166 redisReply *r;
1167
1168 ri->pending_commands--;
1169 if (!reply) return;
1170 r = reply;
1171
1172 /* Only update pub_time if we actually published our message. Otherwise
1173 * we'll retry against in 100 milliseconds. */
1174 if (r->type != REDIS_REPLY_ERROR)
1175 ri->last_pub_time = mstime();
1176 }
1177
1178 /* This is our Pub/Sub callback for the Hello channel. It's useful in order
1179 * to discover other sentinels attached at the same master. */
1180 void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata) {
1181 sentinelRedisInstance *ri = c->data;
1182 redisReply *r;
1183
1184 if (!reply) return;
1185 r = reply;
1186
1187 /* Update the last activity in the pubsub channel. Note that since we
1188 * receive our messages as well this timestamp can be used to detect
1189 * if the link is probably diconnected even if it seems otherwise. */
1190 ri->pc_last_activity = mstime();
1191
1192 /* Sanity check in the reply we expect, so that the code that follows
1193 * can avoid to check for details. */
1194 if (r->type != REDIS_REPLY_ARRAY ||
1195 r->elements != 3 ||
1196 r->element[0]->type != REDIS_REPLY_STRING ||
1197 r->element[1]->type != REDIS_REPLY_STRING ||
1198 r->element[2]->type != REDIS_REPLY_STRING ||
1199 strcmp(r->element[0]->str,"message") != 0) return;
1200
1201 /* We are not interested in meeting ourselves */
1202 if (strstr(r->element[2]->str,server.runid) != NULL) return;
1203
1204 {
1205 int numtokens, port, removed, canfailover;
1206 char **token = sdssplitlen(r->element[2]->str,
1207 r->element[2]->len,
1208 ":",1,&numtokens);
1209 sentinelRedisInstance *sentinel;
1210
1211 if (numtokens == 4) {
1212 /* First, try to see if we already have this sentinel. */
1213 port = atoi(token[1]);
1214 canfailover = atoi(token[3]);
1215 sentinel = getSentinelRedisInstanceByAddrAndRunID(
1216 ri->sentinels,token[0],port,token[2]);
1217
1218 if (!sentinel) {
1219 /* If not, remove all the sentinels that have the same runid
1220 * OR the same ip/port, because it's either a restart or a
1221 * network topology change. */
1222 removed = removeMatchingSentinelsFromMaster(ri,token[0],port,
1223 token[2]);
1224 if (removed) {
1225 sentinelEvent(REDIS_NOTICE,"-dup-sentinel",ri,
1226 "%@ #duplicate of %s:%d or %s",
1227 token[0],port,token[2]);
1228 }
1229
1230 /* Add the new sentinel. */
1231 sentinel = createSentinelRedisInstance(NULL,SRI_SENTINEL,
1232 token[0],port,ri->quorum,ri);
1233 if (sentinel) {
1234 sentinelEvent(REDIS_NOTICE,"+sentinel",sentinel,"%@");
1235 /* The runid is NULL after a new instance creation and
1236 * for Sentinels we don't have a later chance to fill it,
1237 * so do it now. */
1238 sentinel->runid = sdsnew(token[2]);
1239 }
1240 }
1241
1242 /* Update the state of the Sentinel. */
1243 if (sentinel) {
1244 sentinel->last_hello_time = mstime();
1245 if (canfailover)
1246 sentinel->flags |= SRI_CAN_FAILOVER;
1247 else
1248 sentinel->flags &= ~SRI_CAN_FAILOVER;
1249 }
1250 }
1251 sdsfreesplitres(token,numtokens);
1252 }
1253 }
1254
1255 void sentinelPingInstance(sentinelRedisInstance *ri) {
1256 mstime_t now = mstime();
1257 mstime_t info_period;
1258 int retval;
1259
1260 /* Return ASAP if we have already a PING or INFO already pending, or
1261 * in the case the instance is not properly connected. */
1262 if (ri->flags & SRI_DISCONNECTED) return;
1263
1264 /* For INFO, PING, PUBLISH that are not critical commands to send we
1265 * also have a limit of SENTINEL_MAX_PENDING_COMMANDS. We don't
1266 * want to use a lot of memory just because a link is not working
1267 * properly (note that anyway there is a redundant protection about this,
1268 * that is, the link will be disconnected and reconnected if a long
1269 * timeout condition is detected. */
1270 if (ri->pending_commands >= SENTINEL_MAX_PENDING_COMMANDS) return;
1271
1272 /* If this is a slave of a master in O_DOWN condition we start sending
1273 * it INFO every second, instead of the usual SENTINEL_INFO_PERIOD
1274 * period. In this state we want to closely monitor slaves in case they
1275 * are turned into masters by another Sentinel, or by the sysadmin. */
1276 if ((ri->flags & SRI_SLAVE) &&
1277 (ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) {
1278 info_period = 1000;
1279 } else {
1280 info_period = SENTINEL_INFO_PERIOD;
1281 }
1282
1283 if ((ri->flags & SRI_SENTINEL) == 0 &&
1284 (ri->info_refresh == 0 ||
1285 (now - ri->info_refresh) > info_period))
1286 {
1287 /* Send INFO to masters and slaves, not sentinels. */
1288 retval = redisAsyncCommand(ri->cc,
1289 sentinelInfoReplyCallback, NULL, "INFO");
1290 if (retval != REDIS_OK) return;
1291 ri->pending_commands++;
1292 } else if ((now - ri->last_pong_time) > SENTINEL_PING_PERIOD) {
1293 /* Send PING to all the three kinds of instances. */
1294 retval = redisAsyncCommand(ri->cc,
1295 sentinelPingReplyCallback, NULL, "PING");
1296 if (retval != REDIS_OK) return;
1297 ri->pending_commands++;
1298 } else if ((ri->flags & SRI_MASTER) &&
1299 (now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD)
1300 {
1301 /* PUBLISH hello messages only to masters. */
1302 struct sockaddr_in sa;
1303 socklen_t salen = sizeof(sa);
1304
1305 if (getsockname(ri->cc->c.fd,(struct sockaddr*)&sa,&salen) != -1) {
1306 char myaddr[128];
1307
1308 snprintf(myaddr,sizeof(myaddr),"%s:%d:%s:%d",
1309 inet_ntoa(sa.sin_addr), server.port, server.runid,
1310 (ri->flags & SRI_CAN_FAILOVER) != 0);
1311 retval = redisAsyncCommand(ri->cc,
1312 sentinelPublishReplyCallback, NULL, "PUBLISH %s %s",
1313 SENTINEL_HELLO_CHANNEL,myaddr);
1314 if (retval != REDIS_OK) return;
1315 ri->pending_commands++;
1316 }
1317 }
1318 }
1319
1320 /* =========================== SENTINEL command ============================= */
1321
1322 const char *sentinelFailoverStateStr(int state) {
1323 switch(state) {
1324 case SENTINEL_FAILOVER_STATE_NONE: return "none";
1325 case SENTINEL_FAILOVER_STATE_WAIT_START: return "wait_start";
1326 case SENTINEL_FAILOVER_STATE_SELECT_SLAVE: return "select_slave";
1327 case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE: return "send_slaveof_noone";
1328 case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION: return "wait_promotion";
1329 case SENTINEL_FAILOVER_STATE_RECONF_SLAVES: return "reconf_slaves";
1330 case SENTINEL_FAILOVER_STATE_ALERT_CLIENTS: return "alert_clients";
1331 case SENTINEL_FAILOVER_STATE_DETECT_END: return "detect_end";
1332 case SENTINEL_FAILOVER_STATE_UPDATE_CONFIG: return "update_config";
1333 default: return "unknown";
1334 }
1335 }
1336
1337 /* Redis instance to Redis protocol representation. */
1338 void addReplySentinelRedisInstance(redisClient *c, sentinelRedisInstance *ri) {
1339 char *flags = sdsempty();
1340 void *mbl;
1341 int fields = 0;
1342
1343 mbl = addDeferredMultiBulkLength(c);
1344
1345 addReplyBulkCString(c,"name");
1346 addReplyBulkCString(c,ri->name);
1347 fields++;
1348
1349 addReplyBulkCString(c,"ip");
1350 addReplyBulkCString(c,ri->addr->ip);
1351 fields++;
1352
1353 addReplyBulkCString(c,"port");
1354 addReplyBulkLongLong(c,ri->addr->port);
1355 fields++;
1356
1357 addReplyBulkCString(c,"runid");
1358 addReplyBulkCString(c,ri->runid ? ri->runid : "");
1359 fields++;
1360
1361 addReplyBulkCString(c,"flags");
1362 if (ri->flags & SRI_S_DOWN) flags = sdscat(flags,"s_down,");
1363 if (ri->flags & SRI_O_DOWN) flags = sdscat(flags,"o_down,");
1364 if (ri->flags & SRI_MASTER) flags = sdscat(flags,"master,");
1365 if (ri->flags & SRI_SLAVE) flags = sdscat(flags,"slave,");
1366 if (ri->flags & SRI_SENTINEL) flags = sdscat(flags,"sentinel,");
1367 if (ri->flags & SRI_DISCONNECTED) flags = sdscat(flags,"disconnected,");
1368 if (ri->flags & SRI_MASTER_DOWN) flags = sdscat(flags,"master_down,");
1369 if (ri->flags & SRI_FAILOVER_IN_PROGRESS)
1370 flags = sdscat(flags,"failover_in_progress,");
1371 if (ri->flags & SRI_I_AM_THE_LEADER)
1372 flags = sdscat(flags,"i_am_the_leader,");
1373 if (ri->flags & SRI_PROMOTED) flags = sdscat(flags,"promoted,");
1374 if (ri->flags & SRI_RECONF_SENT) flags = sdscat(flags,"reconf_sent,");
1375 if (ri->flags & SRI_RECONF_INPROG) flags = sdscat(flags,"reconf_inprog,");
1376 if (ri->flags & SRI_RECONF_DONE) flags = sdscat(flags,"reconf_done,");
1377
1378 if (sdslen(flags) != 0) flags = sdsrange(flags,0,-2); /* remove last "," */
1379 addReplyBulkCString(c,flags);
1380 sdsfree(flags);
1381 fields++;
1382
1383 addReplyBulkCString(c,"pending-commands");
1384 addReplyBulkLongLong(c,ri->pending_commands);
1385 fields++;
1386
1387 if (ri->flags & SRI_FAILOVER_IN_PROGRESS) {
1388 addReplyBulkCString(c,"failover-state");
1389 addReplyBulkCString(c,(char*)sentinelFailoverStateStr(ri->failover_state));
1390 fields++;
1391 }
1392
1393 addReplyBulkCString(c,"last-ok-ping-reply");
1394 addReplyBulkLongLong(c,mstime() - ri->last_avail_time);
1395 fields++;
1396
1397 addReplyBulkCString(c,"last-ping-reply");
1398 addReplyBulkLongLong(c,mstime() - ri->last_pong_time);
1399 fields++;
1400
1401 if (ri->flags & SRI_S_DOWN) {
1402 addReplyBulkCString(c,"s-down-time");
1403 addReplyBulkLongLong(c,mstime()-ri->s_down_since_time);
1404 fields++;
1405 }
1406
1407 if (ri->flags & SRI_O_DOWN) {
1408 addReplyBulkCString(c,"o-down-time");
1409 addReplyBulkLongLong(c,mstime()-ri->o_down_since_time);
1410 fields++;
1411 }
1412
1413 /* Masters and Slaves */
1414 if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
1415 addReplyBulkCString(c,"info-refresh");
1416 addReplyBulkLongLong(c,mstime() - ri->info_refresh);
1417 fields++;
1418 }
1419
1420 /* Only masters */
1421 if (ri->flags & SRI_MASTER) {
1422 addReplyBulkCString(c,"num-slaves");
1423 addReplyBulkLongLong(c,dictSize(ri->slaves));
1424 fields++;
1425
1426 addReplyBulkCString(c,"num-other-sentinels");
1427 addReplyBulkLongLong(c,dictSize(ri->sentinels));
1428 fields++;
1429
1430 addReplyBulkCString(c,"quorum");
1431 addReplyBulkLongLong(c,ri->quorum);
1432 fields++;
1433 }
1434
1435 /* Only slaves */
1436 if (ri->flags & SRI_SLAVE) {
1437 addReplyBulkCString(c,"master-link-down-time");
1438 addReplyBulkLongLong(c,ri->master_link_down_time);
1439 fields++;
1440
1441 addReplyBulkCString(c,"master-link-status");
1442 addReplyBulkCString(c,
1443 (ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP) ?
1444 "ok" : "err");
1445 fields++;
1446
1447 addReplyBulkCString(c,"master-host");
1448 addReplyBulkCString(c,
1449 ri->slave_master_host ? ri->slave_master_host : "?");
1450 fields++;
1451
1452 addReplyBulkCString(c,"master-port");
1453 addReplyBulkLongLong(c,ri->slave_master_port);
1454 fields++;
1455 }
1456
1457 /* Only sentinels */
1458 if (ri->flags & SRI_SENTINEL) {
1459 addReplyBulkCString(c,"last-hello-message");
1460 addReplyBulkLongLong(c,mstime() - ri->last_hello_time);
1461 fields++;
1462
1463 addReplyBulkCString(c,"can-failover-its-master");
1464 addReplyBulkLongLong(c,(ri->flags & SRI_CAN_FAILOVER) != 0);
1465 fields++;
1466
1467 if (ri->flags & SRI_MASTER_DOWN) {
1468 addReplyBulkCString(c,"subjective-leader");
1469 addReplyBulkCString(c,ri->leader ? ri->leader : "?");
1470 fields++;
1471 }
1472 }
1473
1474 setDeferredMultiBulkLength(c,mbl,fields*2);
1475 }
1476
1477 /* Output a number of instances contanined inside a dictionary as
1478 * Redis protocol. */
1479 void addReplyDictOfRedisInstances(redisClient *c, dict *instances) {
1480 dictIterator *di;
1481 dictEntry *de;
1482
1483 di = dictGetIterator(instances);
1484 addReplyMultiBulkLen(c,dictSize(instances));
1485 while((de = dictNext(di)) != NULL) {
1486 sentinelRedisInstance *ri = dictGetVal(de);
1487
1488 addReplySentinelRedisInstance(c,ri);
1489 }
1490 dictReleaseIterator(di);
1491 }
1492
1493 /* Lookup the named master into sentinel.masters.
1494 * If the master is not found reply to the client with an error and returns
1495 * NULL. */
1496 sentinelRedisInstance *sentinelGetMasterByNameOrReplyError(redisClient *c,
1497 robj *name)
1498 {
1499 sentinelRedisInstance *ri;
1500
1501 ri = dictFetchValue(sentinel.masters,c->argv[2]->ptr);
1502 if (!ri) {
1503 addReplyError(c,"No such master with that name");
1504 return NULL;
1505 }
1506 return ri;
1507 }
1508
1509 void sentinelCommand(redisClient *c) {
1510 if (!strcasecmp(c->argv[1]->ptr,"masters")) {
1511 /* SENTINEL MASTERS */
1512 if (c->argc != 2) goto numargserr;
1513
1514 addReplyDictOfRedisInstances(c,sentinel.masters);
1515 } else if (!strcasecmp(c->argv[1]->ptr,"slaves")) {
1516 /* SENTINEL SLAVES <master-name> */
1517 sentinelRedisInstance *ri;
1518
1519 if (c->argc != 3) goto numargserr;
1520 if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
1521 return;
1522 addReplyDictOfRedisInstances(c,ri->slaves);
1523 } else if (!strcasecmp(c->argv[1]->ptr,"sentinels")) {
1524 /* SENTINEL SENTINELS <master-name> */
1525 sentinelRedisInstance *ri;
1526
1527 if (c->argc != 3) goto numargserr;
1528 if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
1529 return;
1530 addReplyDictOfRedisInstances(c,ri->sentinels);
1531 } else if (!strcasecmp(c->argv[1]->ptr,"is-master-down-by-addr")) {
1532 /* SENTINEL IS-MASTER-DOWN-BY-ADDR <ip> <port> */
1533 sentinelRedisInstance *ri;
1534 char *leader = NULL;
1535 long port;
1536 int isdown = 0;
1537
1538 if (c->argc != 4) goto numargserr;
1539 if (getLongFromObjectOrReply(c,c->argv[3],&port,NULL) != REDIS_OK)
1540 return;
1541 ri = getSentinelRedisInstanceByAddrAndRunID(sentinel.masters,
1542 c->argv[2]->ptr,port,NULL);
1543
1544 /* It exists? Is actually a master? Is subjectively down? It's down.
1545 * Note: if we are in tilt mode we always reply with "0". */
1546 if (!sentinel.tilt && ri && (ri->flags & SRI_S_DOWN) &&
1547 (ri->flags & SRI_MASTER))
1548 isdown = 1;
1549 if (ri) leader = sentinelGetSubjectiveLeader(ri);
1550
1551 /* Reply with a two-elements multi-bulk reply: down state, leader. */
1552 addReplyMultiBulkLen(c,2);
1553 addReply(c, isdown ? shared.cone : shared.czero);
1554 addReplyBulkCString(c, leader ? leader : "?");
1555 if (leader) sdsfree(leader);
1556 } else if (!strcasecmp(c->argv[1]->ptr,"reset")) {
1557 /* SENTINEL RESET <pattern> */
1558 if (c->argc != 3) goto numargserr;
1559 addReplyLongLong(c,sentinelResetMastersByPattern(c->argv[2]->ptr));
1560 } else if (!strcasecmp(c->argv[1]->ptr,"get-master-addr-by-name")) {
1561 /* SENTINEL GET-MASTER-ADDR-BY-NAME <master-name> */
1562 sentinelRedisInstance *ri;
1563
1564 if (c->argc != 3) goto numargserr;
1565 ri = sentinelGetMasterByName(c->argv[2]->ptr);
1566 if (ri == NULL) {
1567 addReply(c,shared.nullmultibulk);
1568 } else {
1569 sentinelAddr *addr = ri->addr;
1570
1571 if ((ri->flags & SRI_FAILOVER_IN_PROGRESS) && ri->promoted_slave)
1572 addr = ri->promoted_slave->addr;
1573 addReplyMultiBulkLen(c,2);
1574 addReplyBulkCString(c,addr->ip);
1575 addReplyBulkLongLong(c,addr->port);
1576 }
1577 } else {
1578 addReplyErrorFormat(c,"Unknown sentinel subcommand '%s'",
1579 (char*)c->argv[1]->ptr);
1580 }
1581 return;
1582
1583 numargserr:
1584 addReplyErrorFormat(c,"Wrong number of commands for 'sentinel %s'",
1585 (char*)c->argv[1]->ptr);
1586 }
1587
1588 /* ===================== SENTINEL availability checks ======================= */
1589
1590 /* Is this instance down from our point of view? */
1591 void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
1592 mstime_t elapsed = mstime() - ri->last_avail_time;
1593
1594 /* Check if we are in need for a reconnection of one of the
1595 * links, because we are detecting low activity.
1596 *
1597 * 1) Check if the command link seems connected, was connected not less
1598 * than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have an
1599 * idle time that is greater than down_after_period / 2 seconds. */
1600 if (ri->cc &&
1601 (mstime() - ri->cc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
1602 (mstime() - ri->last_pong_time) > (ri->down_after_period/2))
1603 {
1604 redisAsyncFree(ri->cc); /* will call the disconnection callback */
1605 }
1606
1607 /* 2) Check if the pubsub link seems connected, was connected not less
1608 * than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have no
1609 * activity in the Pub/Sub channel for more than
1610 * SENTINEL_PUBLISH_PERIOD * 3.
1611 */
1612 if (ri->pc &&
1613 (mstime() - ri->pc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
1614 (mstime() - ri->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3))
1615 {
1616 redisAsyncFree(ri->pc); /* will call the disconnection callback */
1617 }
1618
1619 /* Update the subjectively down flag. */
1620 if (elapsed > ri->down_after_period) {
1621 /* Is subjectively down */
1622 if ((ri->flags & SRI_S_DOWN) == 0) {
1623 sentinelEvent(REDIS_WARNING,"+sdown",ri,"%@");
1624 ri->s_down_since_time = mstime();
1625 ri->flags |= SRI_S_DOWN;
1626 }
1627 } else {
1628 /* Is subjectively up */
1629 if (ri->flags & SRI_S_DOWN) {
1630 sentinelEvent(REDIS_WARNING,"-sdown",ri,"%@");
1631 ri->flags &= ~SRI_S_DOWN;
1632 }
1633 }
1634 }
1635
1636 /* Is this instance down accordingly to the configured quorum? */
1637 void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
1638 dictIterator *di;
1639 dictEntry *de;
1640 int quorum = 0, odown = 0;
1641
1642 if (master->flags & SRI_S_DOWN) {
1643 /* Is down for enough sentinels? */
1644 quorum = 1; /* the current sentinel. */
1645 /* Count all the other sentinels. */
1646 di = dictGetIterator(master->sentinels);
1647 while((de = dictNext(di)) != NULL) {
1648 sentinelRedisInstance *ri = dictGetVal(de);
1649
1650 if (ri->flags & SRI_MASTER_DOWN) quorum++;
1651 }
1652 dictReleaseIterator(di);
1653 if (quorum >= master->quorum) odown = 1;
1654 }
1655
1656 /* Set the flag accordingly to the outcome. */
1657 if (odown) {
1658 if ((master->flags & SRI_O_DOWN) == 0) {
1659 sentinelEvent(REDIS_WARNING,"+odown",master,"%@ #quorum %d/%d",
1660 quorum, master->quorum);
1661 master->flags |= SRI_O_DOWN;
1662 master->o_down_since_time = mstime();
1663 }
1664 } else {
1665 if (master->flags & SRI_O_DOWN) {
1666 sentinelEvent(REDIS_WARNING,"-odown",master,"%@");
1667 master->flags &= ~SRI_O_DOWN;
1668 }
1669 }
1670 }
1671
1672 /* Receive the SENTINEL is-master-down-by-addr reply, see the
1673 * sentinelAskMasterStateToOtherSentinels() function for more information. */
1674 void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) {
1675 sentinelRedisInstance *ri = c->data;
1676 redisReply *r;
1677
1678 ri->pending_commands--;
1679 if (!reply) return;
1680 r = reply;
1681
1682 /* Ignore every error or unexpected reply.
1683 * Note that if the command returns an error for any reason we'll
1684 * end clearing the SRI_MASTER_DOWN flag for timeout anyway. */
1685 if (r->type == REDIS_REPLY_ARRAY && r->elements == 2 &&
1686 r->element[0]->type == REDIS_REPLY_INTEGER &&
1687 r->element[1]->type == REDIS_REPLY_STRING)
1688 {
1689 ri->last_master_down_reply_time = mstime();
1690 if (r->element[0]->integer == 1) {
1691 ri->flags |= SRI_MASTER_DOWN;
1692 } else {
1693 ri->flags &= ~SRI_MASTER_DOWN;
1694 }
1695 sdsfree(ri->leader);
1696 ri->leader = sdsnew(r->element[1]->str);
1697 }
1698 }
1699
1700 /* If we think (subjectively) the master is down, we start sending
1701 * SENTINEL IS-MASTER-DOWN-BY-ADDR requests to other sentinels
1702 * in order to get the replies that allow to reach the quorum and
1703 * possibly also mark the master as objectively down. */
1704 void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master) {
1705 dictIterator *di;
1706 dictEntry *de;
1707
1708 di = dictGetIterator(master->sentinels);
1709 while((de = dictNext(di)) != NULL) {
1710 sentinelRedisInstance *ri = dictGetVal(de);
1711 mstime_t elapsed = mstime() - ri->last_master_down_reply_time;
1712 char port[32];
1713 int retval;
1714
1715 /* If the master state from other sentinel is too old, we clear it. */
1716 if (elapsed > SENTINEL_INFO_VALIDITY_TIME) {
1717 ri->flags &= ~SRI_MASTER_DOWN;
1718 sdsfree(ri->leader);
1719 ri->leader = NULL;
1720 }
1721
1722 /* Only ask if master is down to other sentinels if:
1723 *
1724 * 1) We believe it is down, or there is a failover in progress.
1725 * 2) Sentinel is connected.
1726 * 3) We did not received the info within SENTINEL_ASK_PERIOD ms. */
1727 if ((master->flags & (SRI_S_DOWN|SRI_FAILOVER_IN_PROGRESS)) == 0)
1728 continue;
1729 if (ri->flags & SRI_DISCONNECTED) continue;
1730 if (mstime() - ri->last_master_down_reply_time < SENTINEL_ASK_PERIOD)
1731 continue;
1732
1733 /* Ask */
1734 ll2string(port,sizeof(port),master->addr->port);
1735 retval = redisAsyncCommand(ri->cc,
1736 sentinelReceiveIsMasterDownReply, NULL,
1737 "SENTINEL is-master-down-by-addr %s %s",
1738 master->addr->ip, port);
1739 if (retval == REDIS_OK) ri->pending_commands++;
1740 }
1741 dictReleaseIterator(di);
1742 }
1743
1744 /* =============================== FAILOVER ================================= */
1745
1746 /* Given a master get the "subjective leader", that is, among all the sentinels
1747 * with given characteristics, the one with the lexicographically smaller
1748 * runid. The characteristics required are:
1749 *
1750 * 1) Has SRI_CAN_FAILOVER flag.
1751 * 2) Is not disconnected.
1752 * 3) Recently answered to our ping (no longer than
1753 * SENTINEL_INFO_VALIDITY_TIME milliseconds ago).
1754 *
1755 * The function returns a pointer to an sds string representing the runid of the
1756 * leader sentinel instance (from our point of view). Otherwise NULL is
1757 * returned if there are no suitable sentinels.
1758 */
1759
/* qsort() comparator for an array of C strings holding run ids: each
 * argument points to a 'char *' element, and the two strings are ordered
 * case-insensitively. */
int compareRunID(const void *a, const void *b) {
    const char *lhs = *(char * const *)a;
    const char *rhs = *(char * const *)b;

    return strcasecmp(lhs, rhs);
}
1764
1765 char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master) {
1766 dictIterator *di;
1767 dictEntry *de;
1768 char **instance =
1769 zmalloc(sizeof(char*)*(dictSize(master->sentinels)+1));
1770 int instances = 0;
1771 char *leader = NULL;
1772
1773 if (master->flags & SRI_CAN_FAILOVER) {
1774 /* Add myself if I'm a Sentinel that can failover this master. */
1775 instance[instances++] = server.runid;
1776 }
1777
1778 di = dictGetIterator(master->sentinels);
1779 while((de = dictNext(di)) != NULL) {
1780 sentinelRedisInstance *ri = dictGetVal(de);
1781 mstime_t lag = mstime() - ri->last_avail_time;
1782
1783 if (lag > SENTINEL_INFO_VALIDITY_TIME ||
1784 !(ri->flags & SRI_CAN_FAILOVER) ||
1785 (ri->flags & SRI_DISCONNECTED) ||
1786 ri->runid == NULL)
1787 continue;
1788 instance[instances++] = ri->runid;
1789 }
1790 dictReleaseIterator(di);
1791
1792 /* If we have at least one instance passing our checks, order the array
1793 * by runid. */
1794 if (instances) {
1795 qsort(instance,instances,sizeof(char*),compareRunID);
1796 leader = sdsnew(instance[0]);
1797 }
1798 zfree(instance);
1799 return leader;
1800 }
1801
/* A run id paired with a vote counter.
 * NOTE(review): not referenced by the functions in this part of the file;
 * presumably a leftover from (or helper for) leader vote counting. */
struct sentinelLeader {
    char *runid;            /* Run id string. */
    unsigned long votes;    /* Number of votes associated with runid. */
};
1806
1807 /* Helper function for sentinelGetObjectiveLeader, increment the counter
1808 * relative to the specified runid. */
1809 void sentinelObjectiveLeaderIncr(dict *counters, char *runid) {
1810 dictEntry *de = dictFind(counters,runid);
1811 uint64_t oldval;
1812
1813 if (de) {
1814 oldval = dictGetUnsignedIntegerVal(de);
1815 dictSetUnsignedIntegerVal(de,oldval+1);
1816 } else {
1817 de = dictAddRaw(counters,runid);
1818 redisAssert(de != NULL);
1819 dictSetUnsignedIntegerVal(de,1);
1820 }
1821 }
1822
1823 /* Scan all the Sentinels attached to this master to check what is the
1824 * most voted leader among Sentinels. */
1825 char *sentinelGetObjectiveLeader(sentinelRedisInstance *master) {
1826 dict *counters;
1827 dictIterator *di;
1828 dictEntry *de;
1829 unsigned int voters = 0, voters_quorum;
1830 char *myvote;
1831 char *winner = NULL;
1832
1833 redisAssert(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS));
1834 counters = dictCreate(&leaderVotesDictType,NULL);
1835
1836 /* Count my vote. */
1837 myvote = sentinelGetSubjectiveLeader(master);
1838 if (myvote) {
1839 sentinelObjectiveLeaderIncr(counters,myvote);
1840 voters++;
1841 }
1842
1843 /* Count other sentinels votes */
1844 di = dictGetIterator(master->sentinels);
1845 while((de = dictNext(di)) != NULL) {
1846 sentinelRedisInstance *ri = dictGetVal(de);
1847 if (ri->leader == NULL) continue;
1848 /* If the failover is not already in progress we are only interested
1849 * in Sentinels that believe the master is down. Otherwise the leader
1850 * selection is useful for the "failover-takedown" when the original
1851 * leader fails. In that case we consider all the voters. */
1852 if (!(master->flags & SRI_FAILOVER_IN_PROGRESS) &&
1853 !(ri->flags & SRI_MASTER_DOWN)) continue;
1854 sentinelObjectiveLeaderIncr(counters,ri->leader);
1855 voters++;
1856 }
1857 dictReleaseIterator(di);
1858 voters_quorum = voters/2+1;
1859
1860 /* Check what's the winner. For the winner to win, it needs two conditions:
1861 * 1) Absolute majority between voters (50% + 1).
1862 * 2) And anyway at least master->quorum votes. */
1863 {
1864 uint64_t max_votes = 0; /* Max votes so far. */
1865
1866 di = dictGetIterator(counters);
1867 while((de = dictNext(di)) != NULL) {
1868 uint64_t votes = dictGetUnsignedIntegerVal(de);
1869
1870 if (max_votes < votes) {
1871 max_votes = votes;
1872 winner = dictGetKey(de);
1873 }
1874 }
1875 dictReleaseIterator(di);
1876 if (winner && (max_votes < voters_quorum || max_votes < master->quorum))
1877 winner = NULL;
1878 }
1879 winner = winner ? sdsnew(winner) : NULL;
1880 sdsfree(myvote);
1881 dictRelease(counters);
1882 return winner;
1883 }
1884
1885 /* This function checks if there are the conditions to start the failover,
1886 * that is:
1887 *
1888 * 1) Enough time has passed since O_DOWN.
1889 * 2) The master is marked as SRI_CAN_FAILOVER, so we can failover it.
1890 * 3) We are the objectively leader for this master.
1891 *
1892 * If the conditions are met we flag the master as SRI_FAILOVER_IN_PROGRESS
1893 * and SRI_I_AM_THE_LEADER.
1894 */
1895 void sentinelStartFailover(sentinelRedisInstance *master) {
1896 char *leader;
1897 int isleader;
1898
1899 /* We can't failover if the master is not in O_DOWN state or if
1900 * there is not already a failover in progress (to perform the
1901 * takedown if the leader died) or if this Sentinel is not allowed
1902 * to start a failover. */
1903 if (!(master->flags & SRI_CAN_FAILOVER) ||
1904 !(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) return;
1905
1906 leader = sentinelGetObjectiveLeader(master);
1907 isleader = leader && strcasecmp(leader,server.runid) == 0;
1908 sdsfree(leader);
1909
1910 /* If I'm not the leader, I can't failover for sure. */
1911 if (!isleader) return;
1912
1913 /* If the failover is already in progress there are two options... */
1914 if (master->flags & SRI_FAILOVER_IN_PROGRESS) {
1915 if (master->flags & SRI_I_AM_THE_LEADER) {
1916 /* 1) I'm flagged as leader so I already started the failover.
1917 * Just return. */
1918 return;
1919 } else {
1920 mstime_t elapsed = mstime() - master->failover_state_change_time;
1921
1922 /* 2) I'm the new leader, but I'm not flagged as leader in the
1923 * master: I did not started the failover, but the original
1924 * leader has no longer the leadership.
1925 *
1926 * In this case if the failover appears to be lagging
1927 * for at least 25% of the configured failover timeout,
1928 * I can assume I can take control. Otherwise
1929 * it's better to return and wait more. */
1930 if (elapsed < (master->failover_timeout/4)) return;
1931 sentinelEvent(REDIS_WARNING,"+failover-takedown",master,"%@");
1932 /* We have already an elected slave if we are in
1933 * FAILOVER_IN_PROGRESS state, that is, the slave that we
1934 * observed turning into a master. */
1935 master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
1936 /* As an observer we flagged all the slaves as RECONF_SENT but
1937 * now we are in charge of actually sending the reconfiguration
1938 * command so let's clear this flag for all the instances. */
1939 sentinelDelFlagsToDictOfRedisInstances(master->slaves,
1940 SRI_RECONF_SENT);
1941 }
1942 } else {
1943 /* Brand new failover as SRI_FAILOVER_IN_PROGRESS was not set. */
1944 master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;
1945 }
1946
1947 master->flags |= SRI_FAILOVER_IN_PROGRESS|SRI_I_AM_THE_LEADER;
1948 sentinelEvent(REDIS_WARNING,"+failover-triggered",master,"%@");
1949
1950 /* Pick a random delay if it's a fresh failover (WAIT_START), and not
1951 * a recovery of a failover started by another sentinel. */
1952 if (master->failover_state == SENTINEL_FAILOVER_STATE_WAIT_START) {
1953 master->failover_start_time = mstime() +
1954 SENTINEL_FAILOVER_FIXED_DELAY +
1955 (rand() % SENTINEL_FAILOVER_MAX_RANDOM_DELAY);
1956 sentinelEvent(REDIS_WARNING,"+failover-state-wait-start",master,
1957 "%@ #starting in %lld milliseconds",
1958 master->failover_start_time-mstime());
1959 }
1960 master->failover_state_change_time = mstime();
1961 }
1962
1963 /* Select a suitable slave to promote. The current algorithm only uses
1964 * the following parameters:
1965 *
1966 * 1) None of the following conditions: S_DOWN, O_DOWN, DISCONNECTED.
1967 * 2) last_avail_time more recent than SENTINEL_INFO_VALIDITY_TIME.
1968 * 3) info_refresh more recent than SENTINEL_INFO_VALIDITY_TIME.
1969 * 4) master_link_down_time no more than:
1970 * (now - master->s_down_since_time) + (master->down_after_period * 10).
1971 *
1972 * Among all the slaves matching the above conditions we select the slave
1973 * with lower slave_priority. If priority is the same we select the slave
1974 * with lexicographically smaller runid.
1975 *
1976 * The function returns the pointer to the selected slave, otherwise
1977 * NULL if no suitable slave was found.
1978 */
1979
1980 int compareSlavesForPromotion(const void *a, const void *b) {
1981 sentinelRedisInstance **sa = (sentinelRedisInstance **)a,
1982 **sb = (sentinelRedisInstance **)b;
1983 if ((*sa)->slave_priority != (*sb)->slave_priority)
1984 return (*sa)->slave_priority - (*sb)->slave_priority;
1985 return strcasecmp((*sa)->runid,(*sb)->runid);
1986 }
1987
1988 sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) {
1989 sentinelRedisInstance **instance =
1990 zmalloc(sizeof(instance[0])*dictSize(master->slaves));
1991 sentinelRedisInstance *selected = NULL;
1992 int instances = 0;
1993 dictIterator *di;
1994 dictEntry *de;
1995 mstime_t max_master_down_time;
1996
1997 max_master_down_time = (mstime() - master->s_down_since_time) +
1998 (master->down_after_period * 10);
1999
2000 di = dictGetIterator(master->slaves);
2001 while((de = dictNext(di)) != NULL) {
2002 sentinelRedisInstance *slave = dictGetVal(de);
2003 mstime_t info_validity_time = mstime()-SENTINEL_INFO_VALIDITY_TIME;
2004
2005 if (slave->flags & (SRI_S_DOWN|SRI_O_DOWN|SRI_DISCONNECTED)) continue;
2006 if (slave->last_avail_time < info_validity_time) continue;
2007 if (slave->info_refresh < info_validity_time) continue;
2008 if (slave->master_link_down_time > max_master_down_time) continue;
2009 instance[instances++] = slave;
2010 }
2011 dictReleaseIterator(di);
2012 if (instances) {
2013 qsort(instance,instances,sizeof(sentinelRedisInstance*),
2014 compareSlavesForPromotion);
2015 selected = instance[0];
2016 }
2017 zfree(instance);
2018 return selected;
2019 }
2020
2021 /* ---------------- Failover state machine implementation ------------------- */
2022 void sentinelFailoverWaitStart(sentinelRedisInstance *ri) {
2023 if (mstime() >= ri->failover_start_time) {
2024 ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
2025 ri->failover_state_change_time = mstime();
2026 sentinelEvent(REDIS_WARNING,"+failover-state-select-slave",ri,"%@");
2027 }
2028 }
2029
2030 void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) {
2031 sentinelRedisInstance *slave = sentinelSelectSlave(ri);
2032
2033 if (slave == NULL) {
2034 sentinelEvent(REDIS_WARNING,"-no-good-slave",ri,
2035 "%@ #retrying in %d seconds",
2036 (SENTINEL_FAILOVER_FIXED_DELAY+
2037 SENTINEL_FAILOVER_MAX_RANDOM_DELAY)/1000);
2038 ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;
2039 ri->failover_start_time = mstime() + SENTINEL_FAILOVER_FIXED_DELAY +
2040 SENTINEL_FAILOVER_MAX_RANDOM_DELAY;
2041 } else {
2042 sentinelEvent(REDIS_WARNING,"+selected-slave",slave,"%@");
2043 slave->flags |= SRI_PROMOTED;
2044 ri->promoted_slave = slave;
2045 ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE;
2046 ri->failover_state_change_time = mstime();
2047 sentinelEvent(REDIS_NOTICE,"+failover-state-send-slaveof-noone",
2048 slave, "%@");
2049 }
2050 }
2051
2052 void sentinelFailoverSendSlaveOfNoOne(sentinelRedisInstance *ri) {
2053 int retval;
2054
2055 if (ri->promoted_slave->flags & SRI_DISCONNECTED) return;
2056
2057 /* Send SLAVEOF NO ONE command to turn the slave into a master.
2058 * We actually register a generic callback for this command as we don't
2059 * really care about the reply. We check if it worked indirectly observing
2060 * if INFO returns a different role (master instead of slave). */
2061 retval = redisAsyncCommand(ri->promoted_slave->cc,
2062 sentinelDiscardReplyCallback, NULL, "SLAVEOF NO ONE");
2063 if (retval != REDIS_OK) return;
2064 ri->promoted_slave->pending_commands++;
2065 sentinelEvent(REDIS_NOTICE, "+failover-state-wait-promotion",
2066 ri->promoted_slave,"%@");
2067 ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION;
2068 ri->failover_state_change_time = mstime();
2069 }
2070
2071 /* We actually wait for promotion indirectly checking with INFO when the
2072 * slave turns into a master. */
2073 void sentinelFailoverWaitPromotion(sentinelRedisInstance *ri) {
2074 mstime_t elapsed = mstime() - ri->failover_state_change_time;
2075
2076 if (elapsed >= SENTINEL_PROMOTION_RETRY_PERIOD) {
2077 sentinelEvent(REDIS_WARNING,"-promotion-timeout",ri->promoted_slave,
2078 "%@");
2079 sentinelEvent(REDIS_WARNING,"+failover-state-select-slave",ri,"%@");
2080 ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
2081 ri->failover_state_change_time = mstime();
2082 ri->promoted_slave->flags &= ~SRI_PROMOTED;
2083 ri->promoted_slave = NULL;
2084 }
2085 }
2086
2087 void sentinelFailoverDetectEnd(sentinelRedisInstance *master) {
2088 int not_reconfigured = 0, timeout = 0;
2089 dictIterator *di;
2090 dictEntry *de;
2091 mstime_t elapsed = mstime() - master->failover_state_change_time;
2092
2093 /* We can't consider failover finished if the promoted slave is
2094 * not reachable. */
2095 if (master->promoted_slave == NULL ||
2096 master->promoted_slave->flags & SRI_S_DOWN) return;
2097
2098 /* The failover terminates once all the reachable slaves are properly
2099 * configured. */
2100 di = dictGetIterator(master->slaves);
2101 while((de = dictNext(di)) != NULL) {
2102 sentinelRedisInstance *slave = dictGetVal(de);
2103
2104 if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
2105 if (slave->flags & SRI_S_DOWN) continue;
2106 not_reconfigured++;
2107 }
2108 dictReleaseIterator(di);
2109
2110 /* Force end of failover on timeout. */
2111 if (elapsed > master->failover_timeout) {
2112 not_reconfigured = 0;
2113 timeout = 1;
2114 sentinelEvent(REDIS_WARNING,"+failover-end-for-timeout",master,"%@");
2115 }
2116
2117 if (not_reconfigured == 0) {
2118 sentinelEvent(REDIS_WARNING,"+failover-end",master,"%@");
2119 master->failover_state = SENTINEL_FAILOVER_STATE_UPDATE_CONFIG;
2120 master->failover_state_change_time = mstime();
2121 }
2122
2123 /* If I'm the leader it is a good idea to send a best effort SLAVEOF
2124 * command to all the slaves still not reconfigured to replicate with
2125 * the new master. */
2126 if (timeout && (master->flags & SRI_I_AM_THE_LEADER)) {
2127 dictIterator *di;
2128 dictEntry *de;
2129 char master_port[32];
2130
2131 ll2string(master_port,sizeof(master_port),
2132 master->promoted_slave->addr->port);
2133
2134 di = dictGetIterator(master->slaves);
2135 while((de = dictNext(di)) != NULL) {
2136 sentinelRedisInstance *slave = dictGetVal(de);
2137 int retval;
2138
2139 if (slave->flags &
2140 (SRI_RECONF_DONE|SRI_RECONF_SENT|SRI_DISCONNECTED)) continue;
2141
2142 retval = redisAsyncCommand(slave->cc,
2143 sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
2144 master->promoted_slave->addr->ip,
2145 master_port);
2146 if (retval == REDIS_OK) {
2147 sentinelEvent(REDIS_NOTICE,"+slave-reconf-sent-be",slave,"%@");
2148 slave->flags |= SRI_RECONF_SENT;
2149 }
2150 }
2151 dictReleaseIterator(di);
2152 }
2153 }
2154
2155 /* Send SLAVE OF <new master address> to all the remaining slaves that
2156 * still don't appear to have the configuration updated. */
2157 void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) {
2158 dictIterator *di;
2159 dictEntry *de;
2160 int in_progress = 0;
2161
2162 di = dictGetIterator(master->slaves);
2163 while((de = dictNext(di)) != NULL) {
2164 sentinelRedisInstance *slave = dictGetVal(de);
2165
2166 if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG))
2167 in_progress++;
2168 }
2169 dictReleaseIterator(di);
2170
2171 di = dictGetIterator(master->slaves);
2172 while(in_progress < master->parallel_syncs &&
2173 (de = dictNext(di)) != NULL)
2174 {
2175 sentinelRedisInstance *slave = dictGetVal(de);
2176 int retval;
2177 char master_port[32];
2178
2179 /* Skip the promoted slave, and already configured slaves. */
2180 if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
2181
2182 /* Clear the SRI_RECONF_SENT flag if too much time elapsed without
2183 * the slave moving forward to the next state. */
2184 if ((slave->flags & SRI_RECONF_SENT) &&
2185 (mstime() - slave->slave_reconf_sent_time) >
2186 SENTINEL_SLAVE_RECONF_RETRY_PERIOD)
2187 {
2188 sentinelEvent(REDIS_NOTICE,"-slave-reconf-sent-timeout",slave,"%@");
2189 slave->flags &= ~SRI_RECONF_SENT;
2190 }
2191
2192 /* Nothing to do for instances that are disconnected or already
2193 * in RECONF_SENT state. */
2194 if (slave->flags & (SRI_DISCONNECTED|SRI_RECONF_SENT|SRI_RECONF_INPROG))
2195 continue;
2196
2197 /* Send SLAVEOF <new master>. */
2198 ll2string(master_port,sizeof(master_port),
2199 master->promoted_slave->addr->port);
2200 retval = redisAsyncCommand(slave->cc,
2201 sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
2202 master->promoted_slave->addr->ip,
2203 master_port);
2204 if (retval == REDIS_OK) {
2205 slave->flags |= SRI_RECONF_SENT;
2206 slave->pending_commands++;
2207 slave->slave_reconf_sent_time = mstime();
2208 sentinelEvent(REDIS_NOTICE,"+slave-reconf-sent",slave,"%@");
2209 in_progress++;
2210 }
2211 }
2212 dictReleaseIterator(di);
2213 sentinelFailoverDetectEnd(master);
2214 }
2215
2216 /* This function is called when the slave is in
2217 * SENTINEL_FAILOVER_STATE_UPDATE_CONFIG state. In this state we need
2218 * to remove it from the master table and add the promoted slave instead.
2219 *
2220 * If there are no promoted slaves as this instance is unique, we remove
2221 * and re-add it with the same address to trigger a complete state
2222 * refresh. */
2223 void sentinelFailoverSwitchToPromotedSlave(sentinelRedisInstance *master) {
2224 sentinelRedisInstance *new, *ref = master->promoted_slave ?
2225 master->promoted_slave : master;
2226 int quorum = ref->quorum, parallel_syncs = ref->parallel_syncs;
2227 char *name = sdsnew(master->name);
2228 char *ip = sdsnew(ref->addr->ip), *oldip = sdsnew(master->addr->ip);
2229 int port = ref->addr->port, oldport = master->addr->port;
2230 int retval, oldflags = master->flags;
2231 mstime_t old_down_after_period = master->down_after_period;
2232 mstime_t old_failover_timeout = master->failover_timeout;
2233
2234 retval = dictDelete(sentinel.masters,master->name);
2235 redisAssert(retval == DICT_OK);
2236 new = createSentinelRedisInstance(name,SRI_MASTER,ip,port,quorum,NULL);
2237 redisAssert(new != NULL);
2238 new->parallel_syncs = parallel_syncs;
2239 new->flags |= (oldflags & SRI_CAN_FAILOVER);
2240 new->down_after_period = old_down_after_period;
2241 new->failover_timeout = old_failover_timeout;
2242 /* TODO: ... set the scripts as well. */
2243 sentinelEvent(REDIS_WARNING,"+switch-master",new,"%s %s %d %s %d",
2244 name, oldip, oldport, ip, port);
2245 sdsfree(name);
2246 sdsfree(ip);
2247 sdsfree(oldip);
2248 }
2249
2250 void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
2251 redisAssert(ri->flags & SRI_MASTER);
2252
2253 if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;
2254
2255 switch(ri->failover_state) {
2256 case SENTINEL_FAILOVER_STATE_WAIT_START:
2257 sentinelFailoverWaitStart(ri);
2258 break;
2259 case SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
2260 sentinelFailoverSelectSlave(ri);
2261 break;
2262 case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:
2263 sentinelFailoverSendSlaveOfNoOne(ri);
2264 break;
2265 case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
2266 sentinelFailoverWaitPromotion(ri);
2267 break;
2268 case SENTINEL_FAILOVER_STATE_RECONF_SLAVES:
2269 sentinelFailoverReconfNextSlave(ri);
2270 break;
2271 case SENTINEL_FAILOVER_STATE_DETECT_END:
2272 sentinelFailoverDetectEnd(ri);
2273 break;
2274 }
2275 }
2276
2277 /* The following is called only for master instances and will abort the
2278 * failover process if:
2279 *
2280 * 1) The failover is in progress.
2281 * 2) We already promoted a slave.
2282 * 3) The promoted slave is in extended SDOWN condition.
2283 */
2284 void sentinelAbortFailoverIfNeeded(sentinelRedisInstance *ri) {
2285 dictIterator *di;
2286 dictEntry *de;
2287
2288 /* Failover is in progress? Do we have a promoted slave? */
2289 if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS) || !ri->promoted_slave) return;
2290
2291 /* Is the promoted slave into an extended SDOWN state? */
2292 if (!(ri->promoted_slave->flags & SRI_S_DOWN) ||
2293 (mstime() - ri->promoted_slave->s_down_since_time) <
2294 (ri->down_after_period * SENTINEL_EXTENDED_SDOWN_MULTIPLIER)) return;
2295
2296 sentinelEvent(REDIS_WARNING,"-failover-abort-x-sdown",ri->promoted_slave,"%@");
2297
2298 /* Clear failover related flags from slaves.
2299 * Also if we are the leader make sure to send SLAVEOF commands to all the
2300 * already reconfigured slaves in order to turn them back into slaves of
2301 * the original master. */
2302
2303 di = dictGetIterator(ri->slaves);
2304 while((de = dictNext(di)) != NULL) {
2305 sentinelRedisInstance *slave = dictGetVal(de);
2306 if (ri->flags & SRI_I_AM_THE_LEADER) {
2307 char master_port[32];
2308 int retval;
2309
2310 ll2string(master_port,sizeof(master_port),ri->addr->port);
2311 retval = redisAsyncCommand(slave->cc,
2312 sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
2313 ri->addr->ip,
2314 master_port);
2315 if (retval == REDIS_OK)
2316 sentinelEvent(REDIS_NOTICE,"-slave-reconf-undo",slave,"%@");
2317 }
2318 slave->flags &= ~(SRI_RECONF_SENT|SRI_RECONF_INPROG|SRI_RECONF_DONE);
2319 }
2320 dictReleaseIterator(di);
2321
2322 ri->flags &= ~(SRI_FAILOVER_IN_PROGRESS|SRI_I_AM_THE_LEADER);
2323 ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
2324 ri->failover_state_change_time = mstime();
2325 ri->promoted_slave->flags &= ~SRI_PROMOTED;
2326 ri->promoted_slave = NULL;
2327 }
2328
/* ======================== SENTINEL timer handler ==========================
 * This is the "main" of our Sentinel, since Sentinel is completely non
 * blocking in design. The function is called every second.
 * -------------------------------------------------------------------------- */
2333
2334 /* Perform scheduled operations for the specified Redis instance. */
2335 void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
2336 /* ========== MONITORING HALF ============ */
2337 /* Every kind of instance */
2338 sentinelReconnectInstance(ri);
2339 sentinelPingInstance(ri);
2340
2341 /* Masters and slaves */
2342 if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
2343 /* Nothing so far. */
2344 }
2345
2346 /* Only masters */
2347 if (ri->flags & SRI_MASTER) {
2348 sentinelAskMasterStateToOtherSentinels(ri);
2349 }
2350
2351 /* ============== ACTING HALF ============= */
2352 /* We don't proceed with the acting half if we are in TILT mode.
2353 * TILT happens when we find something odd with the time, like a
2354 * sudden change in the clock. */
2355 if (sentinel.tilt) {
2356 if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return;
2357 sentinel.tilt = 0;
2358 sentinelEvent(REDIS_WARNING,"-tilt",NULL,"#tilt mode exited");
2359 }
2360
2361 /* Every kind of instance */
2362 sentinelCheckSubjectivelyDown(ri);
2363
2364 /* Masters and slaves */
2365 if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
2366 /* Nothing so far. */
2367 }
2368
2369 /* Only masters */
2370 if (ri->flags & SRI_MASTER) {
2371 sentinelCheckObjectivelyDown(ri);
2372 sentinelStartFailover(ri);
2373 sentinelFailoverStateMachine(ri);
2374 sentinelAbortFailoverIfNeeded(ri);
2375 }
2376 }
2377
2378 /* Perform scheduled operations for all the instances in the dictionary.
2379 * Recursively call the function against dictionaries of slaves. */
2380 void sentinelHandleDictOfRedisInstances(dict *instances) {
2381 dictIterator *di;
2382 dictEntry *de;
2383 sentinelRedisInstance *switch_to_promoted = NULL;
2384
2385 /* There are a number of things we need to perform against every master. */
2386 di = dictGetIterator(instances);
2387 while((de = dictNext(di)) != NULL) {
2388 sentinelRedisInstance *ri = dictGetVal(de);
2389
2390 sentinelHandleRedisInstance(ri);
2391 if (ri->flags & SRI_MASTER) {
2392 sentinelHandleDictOfRedisInstances(ri->slaves);
2393 sentinelHandleDictOfRedisInstances(ri->sentinels);
2394 if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) {
2395 switch_to_promoted = ri;
2396 }
2397 }
2398 }
2399 if (switch_to_promoted)
2400 sentinelFailoverSwitchToPromotedSlave(switch_to_promoted);
2401 dictReleaseIterator(di);
2402 }
2403
2404 /* This function checks if we need to enter the TITL mode.
2405 *
2406 * The TILT mode is entered if we detect that between two invocations of the
2407 * timer interrupt, a negative amount of time, or too much time has passed.
2408 * Note that we expect that more or less just 100 milliseconds will pass
2409 * if everything is fine. However we'll see a negative number or a
2410 * difference bigger than SENTINEL_TILT_TRIGGER milliseconds if one of the
2411 * following conditions happen:
2412 *
2413 * 1) The Sentiel process for some time is blocked, for every kind of
2414 * random reason: the load is huge, the computer was freezed for some time
2415 * in I/O or alike, the process was stopped by a signal. Everything.
2416 * 2) The system clock was altered significantly.
2417 *
2418 * Under both this conditions we'll see everything as timed out and failing
2419 * without good reasons. Instead we enter the TILT mode and wait
2420 * for SENTIENL_TILT_PERIOD to elapse before starting to act again.
2421 *
2422 * During TILT time we still collect information, we just do not act. */
2423 void sentinelCheckTiltCondition(void) {
2424 mstime_t now = mstime();
2425 mstime_t delta = now - sentinel.previous_time;
2426
2427 if (delta < 0 || delta > SENTINEL_TILT_TRIGGER) {
2428 sentinel.tilt = 1;
2429 sentinel.tilt_start_time = mstime();
2430 sentinelEvent(REDIS_WARNING,"+tilt",NULL,"#tilt mode entered");
2431 }
2432 sentinel.previous_time = mstime();
2433 }
2434
2435 void sentinelTimer(void) {
2436 sentinelCheckTiltCondition();
2437 sentinelHandleDictOfRedisInstances(sentinel.masters);
2438 }
2439