]> git.saurik.com Git - redis.git/blame - src/sentinel.c
Sentinel: check that instance still exists in reply callbacks.
[redis.git] / src / sentinel.c
CommitLineData
120ba392 1/* Redis Sentinel implementation
2 * -----------------------------
3 *
4 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * * Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * * Neither the name of Redis nor the names of its contributors may be used
16 * to endorse or promote products derived from this software without
17 * specific prior written permission.
18 *
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 * POSSIBILITY OF SUCH DAMAGE.
30 */
31
32#include "redis.h"
33#include "hiredis.h"
34#include "async.h"
35
36#include <ctype.h>
37#include <arpa/inet.h>
38#include <sys/socket.h>
39
/* Port assigned to server.port by initSentinelConfig(). */
#define REDIS_SENTINEL_PORT 26379

/* ======================== Sentinel global state =========================== */

/* Time value expressed in milliseconds. */
typedef long long mstime_t;
45
/* An ip:port pair identifying an instance endpoint. */
typedef struct sentinelAddr {
    char *ip;   /* IP address string (createSentinelAddr() stores an sds). */
    int port;   /* Port number (createSentinelAddr() accepts 1..65535). */
} sentinelAddr;
51
/* Bit flags stored in sentinelRedisInstance->flags. */
#define SRI_MASTER       (1<<0)
#define SRI_SLAVE        (1<<1)
#define SRI_SENTINEL     (1<<2)
#define SRI_DISCONNECTED (1<<3)
#define SRI_S_DOWN       (1<<4)  /* Subjectively down (no quorum). */
#define SRI_O_DOWN       (1<<5)  /* Objectively down (quorum reached). */
#define SRI_MASTER_DOWN  (1<<6)  /* A Sentinel with this flag set thinks that
                                    its master is down. */
/* SRI_CAN_FAILOVER, when set on an SRI_MASTER instance, means that we are
 * allowed to perform the failover for this master.
 * When set on an SRI_SENTINEL instance it means that the other sentinel is
 * allowed to perform the failover on its master. */
#define SRI_CAN_FAILOVER         (1<<7)
#define SRI_FAILOVER_IN_PROGRESS (1<<8)  /* Failover is in progress for
                                            this master. */
#define SRI_I_AM_THE_LEADER      (1<<9)  /* We are the leader for this master. */
#define SRI_PROMOTED             (1<<10) /* Slave selected for promotion. */
#define SRI_RECONF_SENT          (1<<11) /* SLAVEOF <newmaster> sent. */
#define SRI_RECONF_INPROG        (1<<12) /* Slave synchronization in progress. */
#define SRI_RECONF_DONE          (1<<13) /* Slave synchronized with new master. */

/* Periods, defaults and limits.
 * NOTE(review): the time values look like milliseconds (they are used with
 * mstime_t fields), but most of their uses are outside this part of the
 * file -- confirm. */
#define SENTINEL_INFO_PERIOD 10000
#define SENTINEL_PING_PERIOD 1000
#define SENTINEL_ASK_PERIOD 1000
#define SENTINEL_PUBLISH_PERIOD 5000
#define SENTINEL_DOWN_AFTER_PERIOD 30000
#define SENTINEL_HELLO_CHANNEL "__sentinel__:hello"
#define SENTINEL_TILT_TRIGGER 2000
#define SENTINEL_TILT_PERIOD (SENTINEL_PING_PERIOD*30)
#define SENTINEL_DEFAULT_SLAVE_PRIORITY 100
#define SENTINEL_PROMOTION_RETRY_PERIOD 30000
#define SENTINEL_SLAVE_RECONF_RETRY_PERIOD 10000
#define SENTINEL_DEFAULT_PARALLEL_SYNCS 1
#define SENTINEL_MIN_LINK_RECONNECT_PERIOD 15000
#define SENTINEL_DEFAULT_FAILOVER_TIMEOUT (60*15*1000)
#define SENTINEL_MAX_PENDING_COMMANDS 100
#define SENTINEL_EXTENDED_SDOWN_MULTIPLIER 10

/* For how many milliseconds is a piece of information valid? This applies,
 * for instance, to the replies to SENTINEL IS-MASTER-DOWN-BY-ADDR. */
#define SENTINEL_INFO_VALIDITY_TIME 5000
#define SENTINEL_FAILOVER_FIXED_DELAY 5000
#define SENTINEL_FAILOVER_MAX_RANDOM_DELAY 10000

/* States of the failover state machine. */
#define SENTINEL_FAILOVER_STATE_NONE 0               /* No failover in progress. */
#define SENTINEL_FAILOVER_STATE_WAIT_START 1         /* Wait for failover_start_time. */
#define SENTINEL_FAILOVER_STATE_SELECT_SLAVE 2       /* Select slave to promote. */
#define SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE 3 /* Slave -> Master. */
#define SENTINEL_FAILOVER_STATE_WAIT_PROMOTION 4     /* Wait slave to change role. */
#define SENTINEL_FAILOVER_STATE_RECONF_SLAVES 5      /* SLAVEOF newmaster. */
#define SENTINEL_FAILOVER_STATE_WAIT_NEXT_SLAVE 6    /* Wait replication. */
#define SENTINEL_FAILOVER_STATE_ALERT_CLIENTS 7      /* Run user script. */
#define SENTINEL_FAILOVER_STATE_WAIT_ALERT_SCRIPT 8  /* Wait script exec. */
#define SENTINEL_FAILOVER_STATE_DETECT_END 9         /* Check for failover end. */
#define SENTINEL_FAILOVER_STATE_UPDATE_CONFIG 10     /* Monitor promoted slave. */

/* Values of sentinelRedisInstance->slave_master_link_status. */
#define SENTINEL_MASTER_LINK_STATUS_UP 0
#define SENTINEL_MASTER_LINK_STATUS_DOWN 1
112
113typedef struct sentinelRedisInstance {
114 int flags; /* See SRI_... defines */
115 char *name; /* Master name from the point of view of this sentinel. */
116 char *runid; /* run ID of this instance. */
117 sentinelAddr *addr; /* Master host. */
118 redisAsyncContext *cc; /* Hiredis context for commands. */
119 redisAsyncContext *pc; /* Hiredis context for Pub / Sub. */
120 int pending_commands; /* Number of commands sent waiting for a reply. */
121 mstime_t cc_conn_time; /* cc connection time. */
122 mstime_t pc_conn_time; /* pc connection time. */
123 mstime_t pc_last_activity; /* Last time we received any message. */
124 mstime_t last_avail_time; /* Last time the instance replied to ping with
125 a reply we consider valid. */
126 mstime_t last_pong_time; /* Last time the instance replied to ping,
127 whatever the reply was. That's used to check
128 if the link is idle and must be reconnected. */
129 mstime_t last_pub_time; /* Last time we sent hello via Pub/Sub. */
130 mstime_t last_hello_time; /* Only used if SRI_SENTINEL is set. Last time
131 we received an hello from this Sentinel
132 via Pub/Sub. */
133 mstime_t last_master_down_reply_time; /* Time of last reply to
134 SENTINEL is-master-down command. */
135 mstime_t s_down_since_time; /* Subjectively down since time. */
136 mstime_t o_down_since_time; /* Objectively down since time. */
137 mstime_t down_after_period; /* Consider it down after that period. */
138 mstime_t info_refresh; /* Time at which we received INFO output from it. */
139
140 /* Master specific. */
141 dict *sentinels; /* Other sentinels monitoring the same master. */
142 dict *slaves; /* Slaves for this master instance. */
143 int quorum; /* Number of sentinels that need to agree on failure. */
144 int parallel_syncs; /* How many slaves to reconfigure at same time. */
145
146 /* Slave specific. */
147 mstime_t master_link_down_time; /* Slave replication link down time. */
148 int slave_priority; /* Slave priority according to its INFO output. */
149 mstime_t slave_reconf_sent_time; /* Time at which we sent SLAVE OF <new> */
150 struct sentinelRedisInstance *master; /* Master instance if SRI_SLAVE is set. */
151 char *slave_master_host; /* Master host as reported by INFO */
152 int slave_master_port; /* Master port as reported by INFO */
153 int slave_master_link_status; /* Master link status as reported by INFO */
154 /* Failover */
155 char *leader; /* If this is a master instance, this is the runid of
156 the Sentinel that should perform the failover. If
157 this is a Sentinel, this is the runid of the Sentinel
158 that this other Sentinel is voting as leader.
159 This field is valid only if SRI_MASTER_DOWN is
160 set on the Sentinel instance. */
161 int failover_state; /* See SENTINEL_FAILOVER_STATE_* defines. */
162 mstime_t failover_state_change_time;
163 mstime_t failover_start_time; /* When to start to failover if leader. */
164 mstime_t failover_timeout; /* Max time to refresh failover state. */
165 struct sentinelRedisInstance *promoted_slave; /* Promoted slave instance. */
166 /* Scripts executed to notify admin or reconfigure clients: when they
167 * are set to NULL no script is executed. */
168 char *notify_script;
169 char *client_reconfig_script;
170} sentinelRedisInstance;
171
172/* Main state. */
173struct sentinelState {
174 dict *masters; /* Dictionary of master sentinelRedisInstances.
175 Key is the instance name, value is the
176 sentinelRedisInstance structure pointer. */
177 int tilt; /* Are we in TILT mode? */
178 mstime_t tilt_start_time; /* When TITL started. */
179 mstime_t previous_time; /* Time last time we ran the time handler. */
180} sentinel;
181
182/* ======================= hiredis ae.c adapters =============================
183 * Note: this implementation is taken from hiredis/adapters/ae.h, however
184 * we have our modified copy for Sentinel in order to use our allocator
185 * and to have full control over how the adapter works. */
186
187typedef struct redisAeEvents {
188 redisAsyncContext *context;
189 aeEventLoop *loop;
190 int fd;
191 int reading, writing;
192} redisAeEvents;
193
194static void redisAeReadEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
195 ((void)el); ((void)fd); ((void)mask);
196
197 redisAeEvents *e = (redisAeEvents*)privdata;
198 redisAsyncHandleRead(e->context);
199}
200
201static void redisAeWriteEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
202 ((void)el); ((void)fd); ((void)mask);
203
204 redisAeEvents *e = (redisAeEvents*)privdata;
205 redisAsyncHandleWrite(e->context);
206}
207
208static void redisAeAddRead(void *privdata) {
209 redisAeEvents *e = (redisAeEvents*)privdata;
210 aeEventLoop *loop = e->loop;
211 if (!e->reading) {
212 e->reading = 1;
213 aeCreateFileEvent(loop,e->fd,AE_READABLE,redisAeReadEvent,e);
214 }
215}
216
217static void redisAeDelRead(void *privdata) {
218 redisAeEvents *e = (redisAeEvents*)privdata;
219 aeEventLoop *loop = e->loop;
220 if (e->reading) {
221 e->reading = 0;
222 aeDeleteFileEvent(loop,e->fd,AE_READABLE);
223 }
224}
225
226static void redisAeAddWrite(void *privdata) {
227 redisAeEvents *e = (redisAeEvents*)privdata;
228 aeEventLoop *loop = e->loop;
229 if (!e->writing) {
230 e->writing = 1;
231 aeCreateFileEvent(loop,e->fd,AE_WRITABLE,redisAeWriteEvent,e);
232 }
233}
234
235static void redisAeDelWrite(void *privdata) {
236 redisAeEvents *e = (redisAeEvents*)privdata;
237 aeEventLoop *loop = e->loop;
238 if (e->writing) {
239 e->writing = 0;
240 aeDeleteFileEvent(loop,e->fd,AE_WRITABLE);
241 }
242}
243
244static void redisAeCleanup(void *privdata) {
245 redisAeEvents *e = (redisAeEvents*)privdata;
246 redisAeDelRead(privdata);
247 redisAeDelWrite(privdata);
248 zfree(e);
249}
250
251static int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac) {
252 redisContext *c = &(ac->c);
253 redisAeEvents *e;
254
255 /* Nothing should be attached when something is already attached */
256 if (ac->ev.data != NULL)
257 return REDIS_ERR;
258
259 /* Create container for context and r/w events */
260 e = (redisAeEvents*)zmalloc(sizeof(*e));
261 e->context = ac;
262 e->loop = loop;
263 e->fd = c->fd;
264 e->reading = e->writing = 0;
265
266 /* Register functions to start/stop listening for events */
267 ac->ev.addRead = redisAeAddRead;
268 ac->ev.delRead = redisAeDelRead;
269 ac->ev.addWrite = redisAeAddWrite;
270 ac->ev.delWrite = redisAeDelWrite;
271 ac->ev.cleanup = redisAeCleanup;
272 ac->ev.data = e;
273
274 return REDIS_OK;
275}
276
277/* ============================= Prototypes ================================= */
278
279void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status);
280void sentinelDisconnectCallback(const redisAsyncContext *c, int status);
281void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata);
282sentinelRedisInstance *sentinelGetMasterByName(char *name);
283char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master);
284char *sentinelGetObjectiveLeader(sentinelRedisInstance *master);
285int yesnotoi(char *s);
286void sentinelDisconnectInstanceFromContext(const redisAsyncContext *c);
287const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri);
288
289/* ========================= Dictionary types =============================== */
290
291unsigned int dictSdsHash(const void *key);
292int dictSdsKeyCompare(void *privdata, const void *key1, const void *key2);
293void releaseSentinelRedisInstance(sentinelRedisInstance *ri);
294
295void dictInstancesValDestructor (void *privdata, void *obj) {
296 releaseSentinelRedisInstance(obj);
297}
298
299/* Instance name (sds) -> instance (sentinelRedisInstance pointer)
300 *
301 * also used for: sentinelRedisInstance->sentinels dictionary that maps
302 * sentinels ip:port to last seen time in Pub/Sub hello message. */
303dictType instancesDictType = {
304 dictSdsHash, /* hash function */
305 NULL, /* key dup */
306 NULL, /* val dup */
307 dictSdsKeyCompare, /* key compare */
308 NULL, /* key destructor */
309 dictInstancesValDestructor /* val destructor */
310};
311
312/* Instance runid (sds) -> votes (long casted to void*)
313 *
314 * This is useful into sentinelGetObjectiveLeader() function in order to
315 * count the votes and understand who is the leader. */
316dictType leaderVotesDictType = {
317 dictSdsHash, /* hash function */
318 NULL, /* key dup */
319 NULL, /* val dup */
320 dictSdsKeyCompare, /* key compare */
321 NULL, /* key destructor */
322 NULL /* val destructor */
323};
324
325/* =========================== Initialization =============================== */
326
327void sentinelCommand(redisClient *c);
328
329struct redisCommand sentinelcmds[] = {
330 {"ping",pingCommand,1,"",0,NULL,0,0,0,0,0},
331 {"sentinel",sentinelCommand,-2,"",0,NULL,0,0,0,0,0},
332 {"subscribe",subscribeCommand,-2,"",0,NULL,0,0,0,0,0},
333 {"unsubscribe",unsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
334 {"psubscribe",psubscribeCommand,-2,"",0,NULL,0,0,0,0,0},
335 {"punsubscribe",punsubscribeCommand,-1,"",0,NULL,0,0,0,0,0}
336};
337
338/* This function overwrites a few normal Redis config default with Sentinel
339 * specific defaults. */
340void initSentinelConfig(void) {
341 server.port = REDIS_SENTINEL_PORT;
342}
343
344/* Perform the Sentinel mode initialization. */
345void initSentinel(void) {
346 int j;
347
348 /* Remove usual Redis commands from the command table, then just add
349 * the SENTINEL command. */
350 dictEmpty(server.commands);
351 for (j = 0; j < sizeof(sentinelcmds)/sizeof(sentinelcmds[0]); j++) {
352 int retval;
353 struct redisCommand *cmd = sentinelcmds+j;
354
355 retval = dictAdd(server.commands, sdsnew(cmd->name), cmd);
356 redisAssert(retval == DICT_OK);
357 }
358
359 /* Initialize various data structures. */
360 sentinel.masters = dictCreate(&instancesDictType,NULL);
361 sentinel.tilt = 0;
362 sentinel.tilt_start_time = mstime();
363 sentinel.previous_time = mstime();
364}
365
366/* ============================== sentinelAddr ============================== */
367
368/* Create a sentinelAddr object and return it on success.
369 * On error NULL is returned and errno is set to:
370 * ENOENT: Can't resolve the hostname.
371 * EINVAL: Invalid port number.
372 */
373sentinelAddr *createSentinelAddr(char *hostname, int port) {
374 char buf[32];
375 sentinelAddr *sa;
376
377 if (port <= 0 || port > 65535) {
378 errno = EINVAL;
379 return NULL;
380 }
381 if (anetResolve(NULL,hostname,buf) == ANET_ERR) {
382 errno = ENOENT;
383 return NULL;
384 }
385 sa = zmalloc(sizeof(*sa));
386 sa->ip = sdsnew(buf);
387 sa->port = port;
388 return sa;
389}
390
391/* Free a Sentinel address. Can't fail. */
392void releaseSentinelAddr(sentinelAddr *sa) {
393 sdsfree(sa->ip);
394 zfree(sa);
395}
396
397/* =========================== Events notification ========================== */
398
/* Placeholder for the execution of the user notification script.
 * Currently it does nothing with its arguments. */
void sentinelCallNotificationScript(char *scriptpath, char *type, char *msg) {
    /* TODO: implement it. */
    (void)scriptpath;
    (void)type;
    (void)msg;
}
402
403/* Send an event to log, pub/sub, user notification script.
404 *
405 * 'level' is the log level for logging. Only REDIS_WARNING events will trigger
406 * the execution of the user notification script.
407 *
408 * 'type' is the message type, also used as a pub/sub channel name.
409 *
410 * 'ri', is the redis instance target of this event if applicable, and is
411 * used to obtain the path of the notification script to execute.
412 *
413 * The remaining arguments are printf-alike.
414 * If the format specifier starts with the two characters "%@" then ri is
415 * not NULL, and the message is prefixed with an instance identifier in the
416 * following format:
417 *
418 * <instance type> <instance name> <ip> <port>
419 *
420 * If the instance type is not master, than the additional string is
421 * added to specify the originating master:
422 *
423 * @ <master name> <master ip> <master port>
424 *
425 * Any other specifier after "%@" is processed by printf itself.
426 */
427void sentinelEvent(int level, char *type, sentinelRedisInstance *ri,
428 const char *fmt, ...) {
429 va_list ap;
430 char msg[REDIS_MAX_LOGMSG_LEN];
431 robj *channel, *payload;
432
433 /* Handle %@ */
434 if (fmt[0] == '%' && fmt[1] == '@') {
435 sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
436 NULL : ri->master;
437
438 if (master) {
439 snprintf(msg, sizeof(msg), "%s %s %s %d @ %s %s %d",
440 sentinelRedisInstanceTypeStr(ri),
441 ri->name, ri->addr->ip, ri->addr->port,
442 master->name, master->addr->ip, master->addr->port);
443 } else {
444 snprintf(msg, sizeof(msg), "%s %s %s %d",
445 sentinelRedisInstanceTypeStr(ri),
446 ri->name, ri->addr->ip, ri->addr->port);
447 }
448 fmt += 2;
449 } else {
450 msg[0] = '\0';
451 }
452
453 /* Use vsprintf for the rest of the formatting if any. */
454 if (fmt[0] != '\0') {
455 va_start(ap, fmt);
456 vsnprintf(msg+strlen(msg), sizeof(msg)-strlen(msg), fmt, ap);
457 va_end(ap);
458 }
459
460 /* Log the message if the log level allows it to be logged. */
461 if (level >= server.verbosity)
462 redisLog(level,"%s %s",type,msg);
463
464 /* Publish the message via Pub/Sub if it's not a debugging one. */
465 if (level != REDIS_DEBUG) {
466 channel = createStringObject(type,strlen(type));
467 payload = createStringObject(msg,strlen(msg));
468 pubsubPublishMessage(channel,payload);
469 decrRefCount(channel);
470 decrRefCount(payload);
471 }
472
473 /* Call the notification script if applicable. */
474 if (level == REDIS_WARNING && ri != NULL) {
475 sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
476 ri : ri->master;
477 if (master->notify_script) {
478 sentinelCallNotificationScript(master->notify_script,type,msg);
479 }
480 }
481}
482
483/* ========================== sentinelRedisInstance ========================= */
484
485/* Create a redis instance, the following fields must be populated by the
486 * caller if needed:
487 * runid: set to NULL but will be populated once INFO output is received.
488 * info_refresh: is set to 0 to mean that we never received INFO so far.
489 *
490 * If SRI_MASTER is set into initial flags the instance is added to
491 * sentinel.masters table.
492 *
493 * if SRI_SLAVE or SRI_SENTINEL is set then 'master' must be not NULL and the
494 * instance is added into master->slaves or master->sentinels table.
495 *
496 * If the instance is a slave or sentinel, the name parameter is ignored and
497 * is created automatically as hostname:port.
498 *
499 * The function fails if hostname can't be resolved or port is out of range.
500 * When this happens NULL is returned and errno is set accordingly to the
501 * createSentinelAddr() function.
502 *
503 * The function may also fail and return NULL with errno set to EBUSY if
504 * a master or slave with the same name already exists. */
505sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char *hostname, int port, int quorum, sentinelRedisInstance *master) {
506 sentinelRedisInstance *ri;
507 sentinelAddr *addr;
508 dict *table;
509 char slavename[128], *sdsname;
510
511 redisAssert(flags & (SRI_MASTER|SRI_SLAVE|SRI_SENTINEL));
512 redisAssert((flags & SRI_MASTER) || master != NULL);
513
514 /* Check address validity. */
515 addr = createSentinelAddr(hostname,port);
516 if (addr == NULL) return NULL;
517
518 /* For slaves and sentinel we use ip:port as name. */
519 if (flags & (SRI_SLAVE|SRI_SENTINEL)) {
520 snprintf(slavename,sizeof(slavename),"%s:%d",hostname,port);
521 name = slavename;
522 }
523
524 /* Make sure the entry is not duplicated. This may happen when the same
525 * name for a master is used multiple times inside the configuration or
526 * if we try to add multiple times a slave or sentinel with same ip/port
527 * to a master. */
528 if (flags & SRI_MASTER) table = sentinel.masters;
529 else if (flags & SRI_SLAVE) table = master->slaves;
530 else if (flags & SRI_SENTINEL) table = master->sentinels;
531 sdsname = sdsnew(name);
532 if (dictFind(table,sdsname)) {
533 sdsfree(sdsname);
534 errno = EBUSY;
535 return NULL;
536 }
537
538 /* Create the instance object. */
539 ri = zmalloc(sizeof(*ri));
540 /* Note that all the instances are started in the disconnected state,
541 * the event loop will take care of connecting them. */
542 ri->flags = flags | SRI_DISCONNECTED;
543 ri->name = sdsname;
544 ri->runid = NULL;
545 ri->addr = addr;
546 ri->cc = NULL;
547 ri->pc = NULL;
548 ri->pending_commands = 0;
549 ri->cc_conn_time = 0;
550 ri->pc_conn_time = 0;
551 ri->pc_last_activity = 0;
552 ri->last_avail_time = mstime();
553 ri->last_pong_time = mstime();
554 ri->last_pub_time = mstime();
555 ri->last_hello_time = mstime();
556 ri->last_master_down_reply_time = mstime();
557 ri->s_down_since_time = 0;
558 ri->o_down_since_time = 0;
559 ri->down_after_period = master ? master->down_after_period :
560 SENTINEL_DOWN_AFTER_PERIOD;
561 ri->master_link_down_time = 0;
562 ri->slave_priority = SENTINEL_DEFAULT_SLAVE_PRIORITY;
563 ri->slave_reconf_sent_time = 0;
564 ri->slave_master_host = NULL;
565 ri->slave_master_port = 0;
566 ri->slave_master_link_status = SENTINEL_MASTER_LINK_STATUS_DOWN;
567 ri->sentinels = dictCreate(&instancesDictType,NULL);
568 ri->quorum = quorum;
569 ri->parallel_syncs = SENTINEL_DEFAULT_PARALLEL_SYNCS;
570 ri->master = master;
571 ri->slaves = dictCreate(&instancesDictType,NULL);
572 ri->info_refresh = 0;
573
574 /* Failover state. */
575 ri->leader = NULL;
576 ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
577 ri->failover_state_change_time = 0;
578 ri->failover_start_time = 0;
579 ri->failover_timeout = SENTINEL_DEFAULT_FAILOVER_TIMEOUT;
580 ri->promoted_slave = NULL;
581 ri->notify_script = NULL;
582 ri->client_reconfig_script = NULL;
583
584 /* Add into the right table. */
585 dictAdd(table, ri->name, ri);
586 return ri;
587}
588
589/* Release this instance and all its slaves, sentinels, hiredis connections.
590 * This function also takes care of unlinking the instance from the main
591 * masters table (if it is a master) or from its master sentinels/slaves table
592 * if it is a slave or sentinel. */
593void releaseSentinelRedisInstance(sentinelRedisInstance *ri) {
594 /* Release all its slaves or sentinels if any. */
595 dictRelease(ri->sentinels);
596 dictRelease(ri->slaves);
597
598 /* Release hiredis connections. Note that redisAsyncFree() will call
599 * the disconnection callback. */
600 if (ri->cc) {
e01a415d 601 ri->cc->data = NULL;
120ba392 602 redisAsyncFree(ri->cc);
603 ri->cc = NULL;
604 }
605 if (ri->pc) {
e01a415d 606 ri->pc->data = NULL;
120ba392 607 redisAsyncFree(ri->pc);
608 ri->pc = NULL;
609 }
610
611 /* Free other resources. */
612 sdsfree(ri->name);
613 sdsfree(ri->runid);
614 sdsfree(ri->notify_script);
615 sdsfree(ri->client_reconfig_script);
616 sdsfree(ri->slave_master_host);
617 sdsfree(ri->leader);
618 releaseSentinelAddr(ri->addr);
619
620 /* Clear state into the master if needed. */
621 if ((ri->flags & SRI_SLAVE) && (ri->flags & SRI_PROMOTED) && ri->master)
622 ri->master->promoted_slave = NULL;
623
624 zfree(ri);
625}
626
627/* Lookup a slave in a master Redis instance, by ip and port. */
628sentinelRedisInstance *sentinelRedisInstanceLookupSlave(
629 sentinelRedisInstance *ri, char *ip, int port)
630{
631 sds key;
632 sentinelRedisInstance *slave;
633
634 redisAssert(ri->flags & SRI_MASTER);
635 key = sdscatprintf(sdsempty(),"%s:%d",ip,port);
636 slave = dictFetchValue(ri->slaves,key);
637 sdsfree(key);
638 return slave;
639}
640
641/* Return the name of the type of the instance as a string. */
642const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri) {
643 if (ri->flags & SRI_MASTER) return "master";
644 else if (ri->flags & SRI_SLAVE) return "slave";
645 else if (ri->flags & SRI_SENTINEL) return "sentinel";
646 else return "unknown";
647}
648
649/* This function removes all the instances found in the dictionary of instances
650 * 'd', having either:
651 *
652 * 1) The same ip/port as specified.
653 * 2) The same runid.
654 *
655 * "1" and "2" don't need to verify at the same time, just one is enough.
656 * If "runid" is NULL it is not checked.
657 * Similarly if "ip" is NULL it is not checked.
658 *
659 * This function is useful because every time we add a new Sentinel into
660 * a master's Sentinels dictionary, we want to be very sure about not
661 * having duplicated instances for any reason. This is so important because
662 * we use those other sentinels in order to run our quorum protocol to
663 * understand if it's time to proceeed with the fail over.
664 *
665 * Making sure no duplication is possible we greately improve the robustness
666 * of the quorum (otherwise we may end counting the same instance multiple
667 * times for some reason).
668 *
669 * The function returns the number of Sentinels removed. */
670int removeMatchingSentinelsFromMaster(sentinelRedisInstance *master, char *ip, int port, char *runid) {
671 dictIterator *di;
672 dictEntry *de;
673 int removed = 0;
674
675 di = dictGetSafeIterator(master->sentinels);
676 while((de = dictNext(di)) != NULL) {
677 sentinelRedisInstance *ri = dictGetVal(de);
678
679 if ((ri->runid && runid && strcmp(ri->runid,runid) == 0) ||
680 (ip && strcmp(ri->addr->ip,ip) == 0 && port == ri->addr->port))
681 {
682 dictDelete(master->sentinels,ri->name);
683 removed++;
684 }
685 }
686 dictReleaseIterator(di);
687 return removed;
688}
689
690/* Search an instance with the same runid, ip and port into a dictionary
691 * of instances. Return NULL if not found, otherwise return the instance
692 * pointer.
693 *
694 * runid or ip can be NULL. In such a case the search is performed only
695 * by the non-NULL field. */
696sentinelRedisInstance *getSentinelRedisInstanceByAddrAndRunID(dict *instances, char *ip, int port, char *runid) {
697 dictIterator *di;
698 dictEntry *de;
699 sentinelRedisInstance *instance = NULL;
700
701 redisAssert(ip || runid); /* User must pass at least one search param. */
702 di = dictGetIterator(instances);
703 while((de = dictNext(di)) != NULL) {
704 sentinelRedisInstance *ri = dictGetVal(de);
705
706 if (runid && !ri->runid) continue;
707 if ((runid == NULL || strcmp(ri->runid, runid) == 0) &&
708 (ip == NULL || (strcmp(ri->addr->ip, ip) == 0 &&
709 ri->addr->port == port)))
710 {
711 instance = ri;
712 break;
713 }
714 }
715 dictReleaseIterator(di);
716 return instance;
717}
718
719/* Simple master lookup by name */
720sentinelRedisInstance *sentinelGetMasterByName(char *name) {
721 sentinelRedisInstance *ri;
722 sds sdsname = sdsnew(name);
723
724 ri = dictFetchValue(sentinel.masters,sdsname);
725 sdsfree(sdsname);
726 return ri;
727}
728
729/* Add the specified flags to all the instances in the specified dictionary. */
730void sentinelAddFlagsToDictOfRedisInstances(dict *instances, int flags) {
731 dictIterator *di;
732 dictEntry *de;
733
734 di = dictGetIterator(instances);
735 while((de = dictNext(di)) != NULL) {
736 sentinelRedisInstance *ri = dictGetVal(de);
737 ri->flags |= flags;
738 }
739 dictReleaseIterator(di);
740}
741
742/* Remove the specified flags to all the instances in the specified
743 * dictionary. */
744void sentinelDelFlagsToDictOfRedisInstances(dict *instances, int flags) {
745 dictIterator *di;
746 dictEntry *de;
747
748 di = dictGetIterator(instances);
749 while((de = dictNext(di)) != NULL) {
750 sentinelRedisInstance *ri = dictGetVal(de);
751 ri->flags &= ~flags;
752 }
753 dictReleaseIterator(di);
754}
755
756/* Reset the state of a monitored master:
757 * 1) Remove all slaves.
758 * 2) Remove all sentinels.
759 * 3) Remove most of the flags resulting from runtime operations.
760 * 4) Reset timers to their default value.
761 * 5) In the process of doing this undo the failover if in progress.
762 * 6) Disconnect the connections with the master (will reconnect automatically).
763 */
764void sentinelResetMaster(sentinelRedisInstance *ri) {
765 redisAssert(ri->flags & SRI_MASTER);
766 dictRelease(ri->slaves);
767 dictRelease(ri->sentinels);
768 ri->slaves = dictCreate(&instancesDictType,NULL);
769 ri->sentinels = dictCreate(&instancesDictType,NULL);
770 if (ri->cc) redisAsyncFree(ri->cc);
771 if (ri->pc) redisAsyncFree(ri->pc);
772 ri->flags &= SRI_MASTER|SRI_CAN_FAILOVER|SRI_DISCONNECTED;
773 if (ri->leader) {
774 sdsfree(ri->leader);
775 ri->leader = NULL;
776 }
777 ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
778 ri->failover_state_change_time = 0;
779 ri->failover_start_time = 0;
780 ri->promoted_slave = NULL;
781 sentinelEvent(REDIS_WARNING,"+reset-master",ri,"%@");
782}
783
784/* Call sentinelResetMaster() on every master with a name matching the specified
785 * pattern. */
786int sentinelResetMastersByPattern(char *pattern) {
787 dictIterator *di;
788 dictEntry *de;
789 int reset = 0;
790
791 di = dictGetIterator(sentinel.masters);
792 while((de = dictNext(di)) != NULL) {
793 sentinelRedisInstance *ri = dictGetVal(de);
794
795 if (ri->name) {
796 if (stringmatch(pattern,ri->name,0)) {
797 sentinelResetMaster(ri);
798 reset++;
799 }
800 }
801 }
802 dictReleaseIterator(di);
803 return reset;
804}
805
806/* ============================ Config handling ============================= */
807char *sentinelHandleConfiguration(char **argv, int argc) {
808 sentinelRedisInstance *ri;
809
810 if (!strcasecmp(argv[0],"monitor") && argc == 5) {
811 /* monitor <name> <host> <port> <quorum> */
812 int quorum = atoi(argv[4]);
813
814 if (quorum <= 0) return "Quorum must be 1 or greater.";
815 if (createSentinelRedisInstance(argv[1],SRI_MASTER,argv[2],
816 atoi(argv[3]),quorum,NULL) == NULL)
817 {
818 switch(errno) {
819 case EBUSY: return "Duplicated master name.";
820 case ENOENT: return "Can't resolve master instance hostname.";
821 case EINVAL: return "Invalid port number";
822 }
823 }
824 } else if (!strcasecmp(argv[0],"down-after-milliseconds") && argc == 3) {
825 /* down-after-milliseconds <name> <milliseconds> */
826 ri = sentinelGetMasterByName(argv[1]);
827 if (!ri) return "No such master with specified name.";
828 ri->down_after_period = atoi(argv[2]);
829 if (ri->down_after_period <= 0)
830 return "negative or zero time parameter.";
831 } else if (!strcasecmp(argv[0],"failover-timeout") && argc == 3) {
832 /* failover-timeout <name> <milliseconds> */
833 ri = sentinelGetMasterByName(argv[1]);
834 if (!ri) return "No such master with specified name.";
835 ri->failover_timeout = atoi(argv[2]);
836 if (ri->failover_timeout <= 0)
837 return "negative or zero time parameter.";
838 } else if (!strcasecmp(argv[0],"can-failover") && argc == 3) {
839 /* can-failover <name> <yes/no> */
840 int yesno = yesnotoi(argv[2]);
841
842 ri = sentinelGetMasterByName(argv[1]);
843 if (!ri) return "No such master with specified name.";
844 if (yesno == -1) return "Argument must be either yes or no.";
845 if (yesno)
846 ri->flags |= SRI_CAN_FAILOVER;
847 else
848 ri->flags &= ~SRI_CAN_FAILOVER;
849 } else if (!strcasecmp(argv[0],"parallel-syncs") && argc == 3) {
850 /* parallel-syncs <name> <milliseconds> */
851 ri = sentinelGetMasterByName(argv[1]);
852 if (!ri) return "No such master with specified name.";
853 ri->parallel_syncs = atoi(argv[2]);
854 } else {
855 return "Unrecognized sentinel configuration statement.";
856 }
857 return NULL;
858}
859
860/* ====================== hiredis connection handling ======================= */
861
862/* This function takes an hiredis context that is in an error condition
863 * and make sure to mark the instance as disconnected performing the
864 * cleanup needed.
865 *
866 * Note: we don't free the hiredis context as hiredis will do it for us
867 * for async conenctions. */
868void sentinelDisconnectInstanceFromContext(const redisAsyncContext *c) {
869 sentinelRedisInstance *ri = c->data;
e01a415d 870 int pubsub;
120ba392 871
e01a415d 872 if (ri == NULL) return; /* The instance no longer exists. */
873
874 pubsub = (ri->pc == c);
120ba392 875 sentinelEvent(REDIS_DEBUG, pubsub ? "-pubsub-link" : "-cmd-link", ri,
876 "%@ #%s", c->errstr);
877 if (pubsub)
878 ri->pc = NULL;
879 else
880 ri->cc = NULL;
881 ri->flags |= SRI_DISCONNECTED;
882}
883
884void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status) {
885 if (status != REDIS_OK) {
886 sentinelDisconnectInstanceFromContext(c);
887 } else {
888 sentinelRedisInstance *ri = c->data;
889 int pubsub = (ri->pc == c);
890
891 sentinelEvent(REDIS_DEBUG, pubsub ? "+pubsub-link" : "+cmd-link", ri,
892 "%@");
893 }
894}
895
896void sentinelDisconnectCallback(const redisAsyncContext *c, int status) {
897 sentinelDisconnectInstanceFromContext(c);
898}
899
900/* Create the async connections for the specified instance if the instance
901 * is disconnected. Note that the SRI_DISCONNECTED flag is set even if just
902 * one of the two links (commands and pub/sub) is missing. */
903void sentinelReconnectInstance(sentinelRedisInstance *ri) {
904 if (!(ri->flags & SRI_DISCONNECTED)) return;
905
906 /* Commands connection. */
907 if (ri->cc == NULL) {
908 ri->cc = redisAsyncConnect(ri->addr->ip,ri->addr->port);
909 if (ri->cc->err) {
910 sentinelEvent(REDIS_DEBUG,"-cmd-link-reconnection",ri,"%@ #%s",
911 ri->cc->errstr);
912 redisAsyncFree(ri->cc);
913 ri->cc = NULL;
914 } else {
915 ri->cc_conn_time = mstime();
916 ri->cc->data = ri;
917 redisAeAttach(server.el,ri->cc);
918 redisAsyncSetConnectCallback(ri->cc,
919 sentinelLinkEstablishedCallback);
920 redisAsyncSetDisconnectCallback(ri->cc,
921 sentinelDisconnectCallback);
922 }
923 }
924 /* Pub / Sub */
925 if ((ri->flags & SRI_MASTER) && ri->pc == NULL) {
926 ri->pc = redisAsyncConnect(ri->addr->ip,ri->addr->port);
927 if (ri->pc->err) {
928 sentinelEvent(REDIS_DEBUG,"-pubsub-link-reconnection",ri,"%@ #%s",
929 ri->pc->errstr);
930 redisAsyncFree(ri->pc);
931 ri->pc = NULL;
932 } else {
933 int retval;
934
935 ri->pc_conn_time = mstime();
936 ri->pc->data = ri;
937 redisAeAttach(server.el,ri->pc);
938 redisAsyncSetConnectCallback(ri->pc,
939 sentinelLinkEstablishedCallback);
940 redisAsyncSetDisconnectCallback(ri->pc,
941 sentinelDisconnectCallback);
942 /* Now we subscribe to the Sentinels "Hello" channel. */
943 retval = redisAsyncCommand(ri->pc,
944 sentinelReceiveHelloMessages, NULL, "SUBSCRIBE %s",
945 SENTINEL_HELLO_CHANNEL);
946 if (retval != REDIS_OK) {
947 /* If we can't subscribe, the Pub/Sub connection is useless
948 * and we can simply disconnect it and try again. */
949 redisAsyncFree(ri->pc);
950 ri->pc = NULL;
951 return;
952 }
953 }
954 }
955 /* Clear the DISCONNECTED flags only if we have both the connections
956 * (or just the commands connection if this is a slave or a
957 * sentinel instance). */
958 if (ri->cc && (ri->flags & (SRI_SLAVE|SRI_SENTINEL) || ri->pc))
959 ri->flags &= ~SRI_DISCONNECTED;
960}
961
962/* ======================== Redis instances pinging ======================== */
963
964/* Process the INFO output from masters. */
965void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
966 sds *lines;
967 int numlines, j;
968 int role = 0;
e01a415d 969 int runid_changed = 0; /* true if runid changed. */
970 int first_runid = 0; /* true if this is the first runid we receive. */
120ba392 971
972 /* The following fields must be reset to a given value in the case they
973 * are not found at all in the INFO output. */
974 ri->master_link_down_time = 0;
975
976 /* Process line by line. */
977 lines = sdssplitlen(info,strlen(info),"\r\n",2,&numlines);
978 for (j = 0; j < numlines; j++) {
979 sentinelRedisInstance *slave;
980 sds l = lines[j];
981
982 /* run_id:<40 hex chars>*/
983 if (sdslen(l) >= 47 && !memcmp(l,"run_id:",7)) {
984 if (ri->runid == NULL) {
985 ri->runid = sdsnewlen(l+7,40);
e01a415d 986 first_runid = 1;
120ba392 987 } else {
e01a415d 988 if (strncmp(ri->runid,l+7,40) != 0) {
989 runid_changed = 1;
990 sentinelEvent(REDIS_NOTICE,"+reboot",ri,"%@");
991 sdsfree(ri->runid);
992 ri->runid = sdsnewlen(l+7,40);
993 }
120ba392 994 }
995 }
996
997 /* slave0:<ip>,<port>,<state> */
998 if ((ri->flags & SRI_MASTER) &&
999 sdslen(l) >= 7 &&
1000 !memcmp(l,"slave",5) && isdigit(l[5]))
1001 {
1002 char *ip, *port, *end;
1003
1004 ip = strchr(l,':'); if (!ip) continue;
1005 ip++; /* Now ip points to start of ip address. */
1006 port = strchr(ip,','); if (!port) continue;
1007 *port = '\0'; /* nul term for easy access. */
1008 port++; /* Now port points to start of port number. */
1009 end = strchr(port,','); if (!end) continue;
1010 *end = '\0'; /* nul term for easy access. */
1011
1012 /* Check if we already have this slave into our table,
1013 * otherwise add it. */
1014 if (sentinelRedisInstanceLookupSlave(ri,ip,atoi(port)) == NULL) {
1015 if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,ip,
1016 atoi(port), ri->quorum,ri)) != NULL)
1017 {
1018 sentinelEvent(REDIS_NOTICE,"+slave",slave,"%@");
1019 }
1020 }
1021 }
1022
1023 /* master_link_down_since_seconds:<seconds> */
1024 if (sdslen(l) >= 32 &&
1025 !memcmp(l,"master_link_down_since_seconds",30))
1026 {
1027 ri->master_link_down_time = strtoll(l+31,NULL,10)*1000;
1028 }
1029
1030 /* role:<role> */
1031 if (!memcmp(l,"role:master",11)) role = SRI_MASTER;
1032 else if (!memcmp(l,"role:slave",10)) role = SRI_SLAVE;
1033
1034 if (role == SRI_SLAVE) {
1035 /* master_host:<host> */
1036 if (sdslen(l) >= 12 && !memcmp(l,"master_host:",12)) {
1037 sdsfree(ri->slave_master_host);
1038 ri->slave_master_host = sdsnew(l+12);
1039 }
1040
1041 /* master_port:<port> */
1042 if (sdslen(l) >= 12 && !memcmp(l,"master_port:",12))
1043 ri->slave_master_port = atoi(l+12);
1044
1045 /* master_link_status:<status> */
1046 if (sdslen(l) >= 19 && !memcmp(l,"master_link_status:",19)) {
1047 ri->slave_master_link_status =
1048 (strcasecmp(l+19,"up") == 0) ?
1049 SENTINEL_MASTER_LINK_STATUS_UP :
1050 SENTINEL_MASTER_LINK_STATUS_DOWN;
1051 }
1052 }
1053 }
1054 ri->info_refresh = mstime();
1055 sdsfreesplitres(lines,numlines);
1056
1057 if (sentinel.tilt) return;
1058
1059 /* Act if a slave turned into a master. */
1060 if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) {
e01a415d 1061 if (!(ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
1062 (runid_changed || first_runid))
1063 {
1064 int retval;
1065
1066 /* If a slave turned into a master, but at the same time the
1067 * runid has changed, or it is simply the first time we see and
1068 * INFO output from this instance, this is a reboot with a wrong
1069 * configuration.
1070 *
1071 * Log the event and remove the slave. */
1072 sentinelEvent(REDIS_WARNING,"-slave-restart-as-master",ri,"%@ #removing it from the attached slaves");
1073 retval = dictDelete(ri->master->slaves,ri->name);
1074 redisAssert(retval == REDIS_OK);
1075 return;
1076 } else if (ri->flags & SRI_PROMOTED) {
120ba392 1077 /* If this is a promoted slave we can change state to the
1078 * failover state machine. */
1079 if (ri->master &&
1080 (ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
1081 (ri->master->flags & SRI_I_AM_THE_LEADER) &&
1082 (ri->master->failover_state ==
1083 SENTINEL_FAILOVER_STATE_WAIT_PROMOTION))
1084 {
1085 ri->master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
1086 ri->master->failover_state_change_time = mstime();
1087 sentinelEvent(REDIS_WARNING,"+promoted-slave",ri,"%@");
1088 sentinelEvent(REDIS_WARNING,"+failover-state-reconf-slaves",
1089 ri->master,"%@");
1090 }
1091 } else {
1092 /* Otherwise we interpret this as the start of the failover. */
1093 if (ri->master &&
1094 (ri->master->flags & SRI_FAILOVER_IN_PROGRESS) == 0)
1095 {
1096 ri->master->flags |= SRI_FAILOVER_IN_PROGRESS;
1097 sentinelEvent(REDIS_WARNING,"failover-detected",ri->master,"%@");
1098 ri->master->failover_state = SENTINEL_FAILOVER_STATE_DETECT_END;
1099 ri->master->failover_state_change_time = mstime();
1100 ri->master->promoted_slave = ri;
1101 ri->flags |= SRI_PROMOTED;
1102 /* We are an observer, so we can only assume that the leader
1103 * is reconfiguring the slave instances. For this reason we
1104 * set all the instances as RECONF_SENT waiting for progresses
1105 * on this side. */
1106 sentinelAddFlagsToDictOfRedisInstances(ri->master->slaves,
1107 SRI_RECONF_SENT);
1108 }
1109 }
1110 }
1111
1112 /* Detect if the slave that is in the process of being reconfigured
1113 * changed state. */
1114 if ((ri->flags & SRI_SLAVE) && role == SRI_SLAVE &&
1115 (ri->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)))
1116 {
1117 /* SRI_RECONF_SENT -> SRI_RECONF_INPROG. */
1118 if ((ri->flags & SRI_RECONF_SENT) &&
1119 ri->slave_master_host &&
1120 strcmp(ri->slave_master_host,
1121 ri->master->promoted_slave->addr->ip) == 0 &&
1122 ri->slave_master_port == ri->master->promoted_slave->addr->port)
1123 {
1124 ri->flags &= ~SRI_RECONF_SENT;
1125 ri->flags |= SRI_RECONF_INPROG;
1126 sentinelEvent(REDIS_NOTICE,"+slave-reconf-inprog",ri,"%@");
1127 }
1128
1129 /* SRI_RECONF_INPROG -> SRI_RECONF_DONE */
1130 if ((ri->flags & SRI_RECONF_INPROG) &&
1131 ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP)
1132 {
1133 ri->flags &= ~SRI_RECONF_INPROG;
1134 ri->flags |= SRI_RECONF_DONE;
1135 sentinelEvent(REDIS_NOTICE,"+slave-reconf-done",ri,"%@");
1136 /* If we are moving forward (a new slave is now configured)
1137 * we update the change_time as we are conceptually passing
1138 * to the next slave. */
1139 ri->failover_state_change_time = mstime();
1140 }
1141 }
1142}
1143
1144void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
1145 sentinelRedisInstance *ri = c->data;
1146 redisReply *r;
1147
8ab7e998 1148 if (ri) ri->pending_commands--;
1149 if (!reply || !ri) return;
120ba392 1150 r = reply;
1151
1152 if (r->type == REDIS_REPLY_STRING) {
1153 sentinelRefreshInstanceInfo(ri,r->str);
1154 }
1155}
1156
1157/* Just discard the reply. We use this when we are not monitoring the return
1158 * value of the command but its effects directly. */
1159void sentinelDiscardReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
1160 sentinelRedisInstance *ri = c->data;
1161
8ab7e998 1162 if (ri) ri->pending_commands--;
120ba392 1163}
1164
1165void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
1166 sentinelRedisInstance *ri = c->data;
1167 redisReply *r;
1168
8ab7e998 1169 if (ri) ri->pending_commands--;
1170 if (!reply || !ri) return;
120ba392 1171 r = reply;
1172
1173 if (r->type == REDIS_REPLY_STATUS ||
1174 r->type == REDIS_REPLY_ERROR) {
1175 /* Update the "instance available" field only if this is an
1176 * acceptable reply. */
1177 if (strncmp(r->str,"PONG",4) == 0 ||
1178 strncmp(r->str,"LOADING",7) == 0 ||
1179 strncmp(r->str,"MASTERDOWN",10) == 0)
1180 {
1181 ri->last_avail_time = mstime();
1182 }
1183 }
1184 ri->last_pong_time = mstime();
1185}
1186
1187/* This is called when we get the reply about the PUBLISH command we send
1188 * to the master to advertise this sentinel. */
1189void sentinelPublishReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
1190 sentinelRedisInstance *ri = c->data;
1191 redisReply *r;
1192
8ab7e998 1193 if (ri) ri->pending_commands--;
1194 if (!reply || !ri) return;
120ba392 1195 r = reply;
1196
1197 /* Only update pub_time if we actually published our message. Otherwise
1198 * we'll retry against in 100 milliseconds. */
1199 if (r->type != REDIS_REPLY_ERROR)
1200 ri->last_pub_time = mstime();
1201}
1202
1203/* This is our Pub/Sub callback for the Hello channel. It's useful in order
1204 * to discover other sentinels attached at the same master. */
1205void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata) {
1206 sentinelRedisInstance *ri = c->data;
1207 redisReply *r;
1208
8ab7e998 1209 if (!reply || !ri) return;
120ba392 1210 r = reply;
1211
1212 /* Update the last activity in the pubsub channel. Note that since we
1213 * receive our messages as well this timestamp can be used to detect
1214 * if the link is probably diconnected even if it seems otherwise. */
1215 ri->pc_last_activity = mstime();
1216
1217 /* Sanity check in the reply we expect, so that the code that follows
1218 * can avoid to check for details. */
1219 if (r->type != REDIS_REPLY_ARRAY ||
1220 r->elements != 3 ||
1221 r->element[0]->type != REDIS_REPLY_STRING ||
1222 r->element[1]->type != REDIS_REPLY_STRING ||
1223 r->element[2]->type != REDIS_REPLY_STRING ||
1224 strcmp(r->element[0]->str,"message") != 0) return;
1225
1226 /* We are not interested in meeting ourselves */
1227 if (strstr(r->element[2]->str,server.runid) != NULL) return;
1228
1229 {
1230 int numtokens, port, removed, canfailover;
1231 char **token = sdssplitlen(r->element[2]->str,
1232 r->element[2]->len,
1233 ":",1,&numtokens);
1234 sentinelRedisInstance *sentinel;
1235
1236 if (numtokens == 4) {
1237 /* First, try to see if we already have this sentinel. */
1238 port = atoi(token[1]);
1239 canfailover = atoi(token[3]);
1240 sentinel = getSentinelRedisInstanceByAddrAndRunID(
1241 ri->sentinels,token[0],port,token[2]);
1242
1243 if (!sentinel) {
1244 /* If not, remove all the sentinels that have the same runid
1245 * OR the same ip/port, because it's either a restart or a
1246 * network topology change. */
1247 removed = removeMatchingSentinelsFromMaster(ri,token[0],port,
1248 token[2]);
1249 if (removed) {
1250 sentinelEvent(REDIS_NOTICE,"-dup-sentinel",ri,
1251 "%@ #duplicate of %s:%d or %s",
1252 token[0],port,token[2]);
1253 }
1254
1255 /* Add the new sentinel. */
1256 sentinel = createSentinelRedisInstance(NULL,SRI_SENTINEL,
1257 token[0],port,ri->quorum,ri);
1258 if (sentinel) {
1259 sentinelEvent(REDIS_NOTICE,"+sentinel",sentinel,"%@");
1260 /* The runid is NULL after a new instance creation and
1261 * for Sentinels we don't have a later chance to fill it,
1262 * so do it now. */
1263 sentinel->runid = sdsnew(token[2]);
1264 }
1265 }
1266
1267 /* Update the state of the Sentinel. */
1268 if (sentinel) {
1269 sentinel->last_hello_time = mstime();
1270 if (canfailover)
1271 sentinel->flags |= SRI_CAN_FAILOVER;
1272 else
1273 sentinel->flags &= ~SRI_CAN_FAILOVER;
1274 }
1275 }
1276 sdsfreesplitres(token,numtokens);
1277 }
1278}
1279
1280void sentinelPingInstance(sentinelRedisInstance *ri) {
1281 mstime_t now = mstime();
1282 mstime_t info_period;
1283 int retval;
1284
1285 /* Return ASAP if we have already a PING or INFO already pending, or
1286 * in the case the instance is not properly connected. */
1287 if (ri->flags & SRI_DISCONNECTED) return;
1288
1289 /* For INFO, PING, PUBLISH that are not critical commands to send we
1290 * also have a limit of SENTINEL_MAX_PENDING_COMMANDS. We don't
1291 * want to use a lot of memory just because a link is not working
1292 * properly (note that anyway there is a redundant protection about this,
1293 * that is, the link will be disconnected and reconnected if a long
1294 * timeout condition is detected. */
1295 if (ri->pending_commands >= SENTINEL_MAX_PENDING_COMMANDS) return;
1296
1297 /* If this is a slave of a master in O_DOWN condition we start sending
1298 * it INFO every second, instead of the usual SENTINEL_INFO_PERIOD
1299 * period. In this state we want to closely monitor slaves in case they
1300 * are turned into masters by another Sentinel, or by the sysadmin. */
1301 if ((ri->flags & SRI_SLAVE) &&
1302 (ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) {
1303 info_period = 1000;
1304 } else {
1305 info_period = SENTINEL_INFO_PERIOD;
1306 }
1307
1308 if ((ri->flags & SRI_SENTINEL) == 0 &&
1309 (ri->info_refresh == 0 ||
1310 (now - ri->info_refresh) > info_period))
1311 {
1312 /* Send INFO to masters and slaves, not sentinels. */
1313 retval = redisAsyncCommand(ri->cc,
1314 sentinelInfoReplyCallback, NULL, "INFO");
1315 if (retval != REDIS_OK) return;
1316 ri->pending_commands++;
1317 } else if ((now - ri->last_pong_time) > SENTINEL_PING_PERIOD) {
1318 /* Send PING to all the three kinds of instances. */
1319 retval = redisAsyncCommand(ri->cc,
1320 sentinelPingReplyCallback, NULL, "PING");
1321 if (retval != REDIS_OK) return;
1322 ri->pending_commands++;
1323 } else if ((ri->flags & SRI_MASTER) &&
1324 (now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD)
1325 {
1326 /* PUBLISH hello messages only to masters. */
1327 struct sockaddr_in sa;
1328 socklen_t salen = sizeof(sa);
1329
1330 if (getsockname(ri->cc->c.fd,(struct sockaddr*)&sa,&salen) != -1) {
1331 char myaddr[128];
1332
1333 snprintf(myaddr,sizeof(myaddr),"%s:%d:%s:%d",
1334 inet_ntoa(sa.sin_addr), server.port, server.runid,
1335 (ri->flags & SRI_CAN_FAILOVER) != 0);
1336 retval = redisAsyncCommand(ri->cc,
1337 sentinelPublishReplyCallback, NULL, "PUBLISH %s %s",
1338 SENTINEL_HELLO_CHANNEL,myaddr);
1339 if (retval != REDIS_OK) return;
1340 ri->pending_commands++;
1341 }
1342 }
1343}
1344
1345/* =========================== SENTINEL command ============================= */
1346
1347const char *sentinelFailoverStateStr(int state) {
1348 switch(state) {
1349 case SENTINEL_FAILOVER_STATE_NONE: return "none";
1350 case SENTINEL_FAILOVER_STATE_WAIT_START: return "wait_start";
1351 case SENTINEL_FAILOVER_STATE_SELECT_SLAVE: return "select_slave";
1352 case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE: return "send_slaveof_noone";
1353 case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION: return "wait_promotion";
1354 case SENTINEL_FAILOVER_STATE_RECONF_SLAVES: return "reconf_slaves";
1355 case SENTINEL_FAILOVER_STATE_ALERT_CLIENTS: return "alert_clients";
1356 case SENTINEL_FAILOVER_STATE_DETECT_END: return "detect_end";
1357 case SENTINEL_FAILOVER_STATE_UPDATE_CONFIG: return "update_config";
1358 default: return "unknown";
1359 }
1360}
1361
1362/* Redis instance to Redis protocol representation. */
1363void addReplySentinelRedisInstance(redisClient *c, sentinelRedisInstance *ri) {
1364 char *flags = sdsempty();
1365 void *mbl;
1366 int fields = 0;
1367
1368 mbl = addDeferredMultiBulkLength(c);
1369
1370 addReplyBulkCString(c,"name");
1371 addReplyBulkCString(c,ri->name);
1372 fields++;
1373
1374 addReplyBulkCString(c,"ip");
1375 addReplyBulkCString(c,ri->addr->ip);
1376 fields++;
1377
1378 addReplyBulkCString(c,"port");
1379 addReplyBulkLongLong(c,ri->addr->port);
1380 fields++;
1381
1382 addReplyBulkCString(c,"runid");
1383 addReplyBulkCString(c,ri->runid ? ri->runid : "");
1384 fields++;
1385
1386 addReplyBulkCString(c,"flags");
1387 if (ri->flags & SRI_S_DOWN) flags = sdscat(flags,"s_down,");
1388 if (ri->flags & SRI_O_DOWN) flags = sdscat(flags,"o_down,");
1389 if (ri->flags & SRI_MASTER) flags = sdscat(flags,"master,");
1390 if (ri->flags & SRI_SLAVE) flags = sdscat(flags,"slave,");
1391 if (ri->flags & SRI_SENTINEL) flags = sdscat(flags,"sentinel,");
1392 if (ri->flags & SRI_DISCONNECTED) flags = sdscat(flags,"disconnected,");
1393 if (ri->flags & SRI_MASTER_DOWN) flags = sdscat(flags,"master_down,");
1394 if (ri->flags & SRI_FAILOVER_IN_PROGRESS)
1395 flags = sdscat(flags,"failover_in_progress,");
1396 if (ri->flags & SRI_I_AM_THE_LEADER)
1397 flags = sdscat(flags,"i_am_the_leader,");
1398 if (ri->flags & SRI_PROMOTED) flags = sdscat(flags,"promoted,");
1399 if (ri->flags & SRI_RECONF_SENT) flags = sdscat(flags,"reconf_sent,");
1400 if (ri->flags & SRI_RECONF_INPROG) flags = sdscat(flags,"reconf_inprog,");
1401 if (ri->flags & SRI_RECONF_DONE) flags = sdscat(flags,"reconf_done,");
1402
1403 if (sdslen(flags) != 0) flags = sdsrange(flags,0,-2); /* remove last "," */
1404 addReplyBulkCString(c,flags);
1405 sdsfree(flags);
1406 fields++;
1407
1408 addReplyBulkCString(c,"pending-commands");
1409 addReplyBulkLongLong(c,ri->pending_commands);
1410 fields++;
1411
1412 if (ri->flags & SRI_FAILOVER_IN_PROGRESS) {
1413 addReplyBulkCString(c,"failover-state");
1414 addReplyBulkCString(c,(char*)sentinelFailoverStateStr(ri->failover_state));
1415 fields++;
1416 }
1417
1418 addReplyBulkCString(c,"last-ok-ping-reply");
1419 addReplyBulkLongLong(c,mstime() - ri->last_avail_time);
1420 fields++;
1421
1422 addReplyBulkCString(c,"last-ping-reply");
1423 addReplyBulkLongLong(c,mstime() - ri->last_pong_time);
1424 fields++;
1425
1426 if (ri->flags & SRI_S_DOWN) {
1427 addReplyBulkCString(c,"s-down-time");
1428 addReplyBulkLongLong(c,mstime()-ri->s_down_since_time);
1429 fields++;
1430 }
1431
1432 if (ri->flags & SRI_O_DOWN) {
1433 addReplyBulkCString(c,"o-down-time");
1434 addReplyBulkLongLong(c,mstime()-ri->o_down_since_time);
1435 fields++;
1436 }
1437
1438 /* Masters and Slaves */
1439 if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
1440 addReplyBulkCString(c,"info-refresh");
1441 addReplyBulkLongLong(c,mstime() - ri->info_refresh);
1442 fields++;
1443 }
1444
1445 /* Only masters */
1446 if (ri->flags & SRI_MASTER) {
1447 addReplyBulkCString(c,"num-slaves");
1448 addReplyBulkLongLong(c,dictSize(ri->slaves));
1449 fields++;
1450
1451 addReplyBulkCString(c,"num-other-sentinels");
1452 addReplyBulkLongLong(c,dictSize(ri->sentinels));
1453 fields++;
1454
1455 addReplyBulkCString(c,"quorum");
1456 addReplyBulkLongLong(c,ri->quorum);
1457 fields++;
1458 }
1459
1460 /* Only slaves */
1461 if (ri->flags & SRI_SLAVE) {
1462 addReplyBulkCString(c,"master-link-down-time");
1463 addReplyBulkLongLong(c,ri->master_link_down_time);
1464 fields++;
1465
1466 addReplyBulkCString(c,"master-link-status");
1467 addReplyBulkCString(c,
1468 (ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP) ?
1469 "ok" : "err");
1470 fields++;
1471
1472 addReplyBulkCString(c,"master-host");
1473 addReplyBulkCString(c,
1474 ri->slave_master_host ? ri->slave_master_host : "?");
1475 fields++;
1476
1477 addReplyBulkCString(c,"master-port");
1478 addReplyBulkLongLong(c,ri->slave_master_port);
1479 fields++;
1480 }
1481
1482 /* Only sentinels */
1483 if (ri->flags & SRI_SENTINEL) {
1484 addReplyBulkCString(c,"last-hello-message");
1485 addReplyBulkLongLong(c,mstime() - ri->last_hello_time);
1486 fields++;
1487
1488 addReplyBulkCString(c,"can-failover-its-master");
1489 addReplyBulkLongLong(c,(ri->flags & SRI_CAN_FAILOVER) != 0);
1490 fields++;
1491
1492 if (ri->flags & SRI_MASTER_DOWN) {
1493 addReplyBulkCString(c,"subjective-leader");
1494 addReplyBulkCString(c,ri->leader ? ri->leader : "?");
1495 fields++;
1496 }
1497 }
1498
1499 setDeferredMultiBulkLength(c,mbl,fields*2);
1500}
1501
1502/* Output a number of instances contanined inside a dictionary as
1503 * Redis protocol. */
1504void addReplyDictOfRedisInstances(redisClient *c, dict *instances) {
1505 dictIterator *di;
1506 dictEntry *de;
1507
1508 di = dictGetIterator(instances);
1509 addReplyMultiBulkLen(c,dictSize(instances));
1510 while((de = dictNext(di)) != NULL) {
1511 sentinelRedisInstance *ri = dictGetVal(de);
1512
1513 addReplySentinelRedisInstance(c,ri);
1514 }
1515 dictReleaseIterator(di);
1516}
1517
1518/* Lookup the named master into sentinel.masters.
1519 * If the master is not found reply to the client with an error and returns
1520 * NULL. */
1521sentinelRedisInstance *sentinelGetMasterByNameOrReplyError(redisClient *c,
1522 robj *name)
1523{
1524 sentinelRedisInstance *ri;
1525
1526 ri = dictFetchValue(sentinel.masters,c->argv[2]->ptr);
1527 if (!ri) {
1528 addReplyError(c,"No such master with that name");
1529 return NULL;
1530 }
1531 return ri;
1532}
1533
1534void sentinelCommand(redisClient *c) {
1535 if (!strcasecmp(c->argv[1]->ptr,"masters")) {
1536 /* SENTINEL MASTERS */
1537 if (c->argc != 2) goto numargserr;
1538
1539 addReplyDictOfRedisInstances(c,sentinel.masters);
1540 } else if (!strcasecmp(c->argv[1]->ptr,"slaves")) {
1541 /* SENTINEL SLAVES <master-name> */
1542 sentinelRedisInstance *ri;
1543
1544 if (c->argc != 3) goto numargserr;
1545 if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
1546 return;
1547 addReplyDictOfRedisInstances(c,ri->slaves);
1548 } else if (!strcasecmp(c->argv[1]->ptr,"sentinels")) {
1549 /* SENTINEL SENTINELS <master-name> */
1550 sentinelRedisInstance *ri;
1551
1552 if (c->argc != 3) goto numargserr;
1553 if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
1554 return;
1555 addReplyDictOfRedisInstances(c,ri->sentinels);
1556 } else if (!strcasecmp(c->argv[1]->ptr,"is-master-down-by-addr")) {
1557 /* SENTINEL IS-MASTER-DOWN-BY-ADDR <ip> <port> */
1558 sentinelRedisInstance *ri;
1559 char *leader = NULL;
1560 long port;
1561 int isdown = 0;
1562
1563 if (c->argc != 4) goto numargserr;
1564 if (getLongFromObjectOrReply(c,c->argv[3],&port,NULL) != REDIS_OK)
1565 return;
1566 ri = getSentinelRedisInstanceByAddrAndRunID(sentinel.masters,
1567 c->argv[2]->ptr,port,NULL);
1568
1569 /* It exists? Is actually a master? Is subjectively down? It's down.
1570 * Note: if we are in tilt mode we always reply with "0". */
1571 if (!sentinel.tilt && ri && (ri->flags & SRI_S_DOWN) &&
1572 (ri->flags & SRI_MASTER))
1573 isdown = 1;
1574 if (ri) leader = sentinelGetSubjectiveLeader(ri);
1575
1576 /* Reply with a two-elements multi-bulk reply: down state, leader. */
1577 addReplyMultiBulkLen(c,2);
1578 addReply(c, isdown ? shared.cone : shared.czero);
1579 addReplyBulkCString(c, leader ? leader : "?");
1580 if (leader) sdsfree(leader);
1581 } else if (!strcasecmp(c->argv[1]->ptr,"reset")) {
1582 /* SENTINEL RESET <pattern> */
1583 if (c->argc != 3) goto numargserr;
1584 addReplyLongLong(c,sentinelResetMastersByPattern(c->argv[2]->ptr));
1585 } else if (!strcasecmp(c->argv[1]->ptr,"get-master-addr-by-name")) {
1586 /* SENTINEL GET-MASTER-ADDR-BY-NAME <master-name> */
1587 sentinelRedisInstance *ri;
1588
1589 if (c->argc != 3) goto numargserr;
1590 ri = sentinelGetMasterByName(c->argv[2]->ptr);
1591 if (ri == NULL) {
1592 addReply(c,shared.nullmultibulk);
1593 } else {
1594 sentinelAddr *addr = ri->addr;
1595
1596 if ((ri->flags & SRI_FAILOVER_IN_PROGRESS) && ri->promoted_slave)
1597 addr = ri->promoted_slave->addr;
1598 addReplyMultiBulkLen(c,2);
1599 addReplyBulkCString(c,addr->ip);
1600 addReplyBulkLongLong(c,addr->port);
1601 }
1602 } else {
1603 addReplyErrorFormat(c,"Unknown sentinel subcommand '%s'",
1604 (char*)c->argv[1]->ptr);
1605 }
1606 return;
1607
1608numargserr:
1609 addReplyErrorFormat(c,"Wrong number of commands for 'sentinel %s'",
1610 (char*)c->argv[1]->ptr);
1611}
1612
1613/* ===================== SENTINEL availability checks ======================= */
1614
1615/* Is this instance down from our point of view? */
1616void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
1617 mstime_t elapsed = mstime() - ri->last_avail_time;
1618
1619 /* Check if we are in need for a reconnection of one of the
1620 * links, because we are detecting low activity.
1621 *
1622 * 1) Check if the command link seems connected, was connected not less
1623 * than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have an
1624 * idle time that is greater than down_after_period / 2 seconds. */
1625 if (ri->cc &&
1626 (mstime() - ri->cc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
1627 (mstime() - ri->last_pong_time) > (ri->down_after_period/2))
1628 {
1629 redisAsyncFree(ri->cc); /* will call the disconnection callback */
1630 }
1631
1632 /* 2) Check if the pubsub link seems connected, was connected not less
1633 * than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have no
1634 * activity in the Pub/Sub channel for more than
1635 * SENTINEL_PUBLISH_PERIOD * 3.
1636 */
1637 if (ri->pc &&
1638 (mstime() - ri->pc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
1639 (mstime() - ri->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3))
1640 {
1641 redisAsyncFree(ri->pc); /* will call the disconnection callback */
1642 }
1643
1644 /* Update the subjectively down flag. */
1645 if (elapsed > ri->down_after_period) {
1646 /* Is subjectively down */
1647 if ((ri->flags & SRI_S_DOWN) == 0) {
1648 sentinelEvent(REDIS_WARNING,"+sdown",ri,"%@");
1649 ri->s_down_since_time = mstime();
1650 ri->flags |= SRI_S_DOWN;
1651 }
1652 } else {
1653 /* Is subjectively up */
1654 if (ri->flags & SRI_S_DOWN) {
1655 sentinelEvent(REDIS_WARNING,"-sdown",ri,"%@");
1656 ri->flags &= ~SRI_S_DOWN;
1657 }
1658 }
1659}
1660
1661/* Is this instance down accordingly to the configured quorum? */
1662void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
1663 dictIterator *di;
1664 dictEntry *de;
1665 int quorum = 0, odown = 0;
1666
1667 if (master->flags & SRI_S_DOWN) {
1668 /* Is down for enough sentinels? */
1669 quorum = 1; /* the current sentinel. */
1670 /* Count all the other sentinels. */
1671 di = dictGetIterator(master->sentinels);
1672 while((de = dictNext(di)) != NULL) {
1673 sentinelRedisInstance *ri = dictGetVal(de);
1674
1675 if (ri->flags & SRI_MASTER_DOWN) quorum++;
1676 }
1677 dictReleaseIterator(di);
1678 if (quorum >= master->quorum) odown = 1;
1679 }
1680
1681 /* Set the flag accordingly to the outcome. */
1682 if (odown) {
1683 if ((master->flags & SRI_O_DOWN) == 0) {
1684 sentinelEvent(REDIS_WARNING,"+odown",master,"%@ #quorum %d/%d",
1685 quorum, master->quorum);
1686 master->flags |= SRI_O_DOWN;
1687 master->o_down_since_time = mstime();
1688 }
1689 } else {
1690 if (master->flags & SRI_O_DOWN) {
1691 sentinelEvent(REDIS_WARNING,"-odown",master,"%@");
1692 master->flags &= ~SRI_O_DOWN;
1693 }
1694 }
1695}
1696
1697/* Receive the SENTINEL is-master-down-by-addr reply, see the
1698 * sentinelAskMasterStateToOtherSentinels() function for more information. */
1699void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) {
1700 sentinelRedisInstance *ri = c->data;
1701 redisReply *r;
1702
8ab7e998 1703 if (ri) ri->pending_commands--;
1704 if (!reply || !ri) return;
120ba392 1705 r = reply;
1706
1707 /* Ignore every error or unexpected reply.
1708 * Note that if the command returns an error for any reason we'll
1709 * end clearing the SRI_MASTER_DOWN flag for timeout anyway. */
1710 if (r->type == REDIS_REPLY_ARRAY && r->elements == 2 &&
1711 r->element[0]->type == REDIS_REPLY_INTEGER &&
1712 r->element[1]->type == REDIS_REPLY_STRING)
1713 {
1714 ri->last_master_down_reply_time = mstime();
1715 if (r->element[0]->integer == 1) {
1716 ri->flags |= SRI_MASTER_DOWN;
1717 } else {
1718 ri->flags &= ~SRI_MASTER_DOWN;
1719 }
1720 sdsfree(ri->leader);
1721 ri->leader = sdsnew(r->element[1]->str);
1722 }
1723}
1724
1725/* If we think (subjectively) the master is down, we start sending
1726 * SENTINEL IS-MASTER-DOWN-BY-ADDR requests to other sentinels
1727 * in order to get the replies that allow to reach the quorum and
1728 * possibly also mark the master as objectively down. */
1729void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master) {
1730 dictIterator *di;
1731 dictEntry *de;
1732
1733 di = dictGetIterator(master->sentinels);
1734 while((de = dictNext(di)) != NULL) {
1735 sentinelRedisInstance *ri = dictGetVal(de);
1736 mstime_t elapsed = mstime() - ri->last_master_down_reply_time;
1737 char port[32];
1738 int retval;
1739
1740 /* If the master state from other sentinel is too old, we clear it. */
1741 if (elapsed > SENTINEL_INFO_VALIDITY_TIME) {
1742 ri->flags &= ~SRI_MASTER_DOWN;
1743 sdsfree(ri->leader);
1744 ri->leader = NULL;
1745 }
1746
1747 /* Only ask if master is down to other sentinels if:
1748 *
1749 * 1) We believe it is down, or there is a failover in progress.
1750 * 2) Sentinel is connected.
1751 * 3) We did not received the info within SENTINEL_ASK_PERIOD ms. */
1752 if ((master->flags & (SRI_S_DOWN|SRI_FAILOVER_IN_PROGRESS)) == 0)
1753 continue;
1754 if (ri->flags & SRI_DISCONNECTED) continue;
1755 if (mstime() - ri->last_master_down_reply_time < SENTINEL_ASK_PERIOD)
1756 continue;
1757
1758 /* Ask */
1759 ll2string(port,sizeof(port),master->addr->port);
1760 retval = redisAsyncCommand(ri->cc,
1761 sentinelReceiveIsMasterDownReply, NULL,
1762 "SENTINEL is-master-down-by-addr %s %s",
1763 master->addr->ip, port);
1764 if (retval == REDIS_OK) ri->pending_commands++;
1765 }
1766 dictReleaseIterator(di);
1767}
1768
1769/* =============================== FAILOVER ================================= */
1770
1771/* Given a master get the "subjective leader", that is, among all the sentinels
1772 * with given characteristics, the one with the lexicographically smaller
1773 * runid. The characteristics required are:
1774 *
1775 * 1) Has SRI_CAN_FAILOVER flag.
1776 * 2) Is not disconnected.
1777 * 3) Recently answered to our ping (no longer than
1778 * SENTINEL_INFO_VALIDITY_TIME milliseconds ago).
1779 *
1780 * The function returns a pointer to an sds string representing the runid of the
1781 * leader sentinel instance (from our point of view). Otherwise NULL is
1782 * returned if there are no suitable sentinels.
1783 */
1784
/* qsort() comparator for an array of C strings: 'a' and 'b' point to the
 * array slots (char**), which are compared ignoring case. */
int compareRunID(const void *a, const void *b) {
    const char *left = *(char *const *)a;
    const char *right = *(char *const *)b;

    return strcasecmp(left, right);
}
1789
1790char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master) {
1791 dictIterator *di;
1792 dictEntry *de;
1793 char **instance =
1794 zmalloc(sizeof(char*)*(dictSize(master->sentinels)+1));
1795 int instances = 0;
1796 char *leader = NULL;
1797
1798 if (master->flags & SRI_CAN_FAILOVER) {
1799 /* Add myself if I'm a Sentinel that can failover this master. */
1800 instance[instances++] = server.runid;
1801 }
1802
1803 di = dictGetIterator(master->sentinels);
1804 while((de = dictNext(di)) != NULL) {
1805 sentinelRedisInstance *ri = dictGetVal(de);
1806 mstime_t lag = mstime() - ri->last_avail_time;
1807
1808 if (lag > SENTINEL_INFO_VALIDITY_TIME ||
1809 !(ri->flags & SRI_CAN_FAILOVER) ||
1810 (ri->flags & SRI_DISCONNECTED) ||
1811 ri->runid == NULL)
1812 continue;
1813 instance[instances++] = ri->runid;
1814 }
1815 dictReleaseIterator(di);
1816
1817 /* If we have at least one instance passing our checks, order the array
1818 * by runid. */
1819 if (instances) {
1820 qsort(instance,instances,sizeof(char*),compareRunID);
1821 leader = sdsnew(instance[0]);
1822 }
1823 zfree(instance);
1824 return leader;
1825}
1826
/* A (run id, votes) pair.
 * NOTE(review): not referenced by the code visible in this section; confirm
 * it is still needed. */
struct sentinelLeader {
    char *runid;            /* Run id string. */
    unsigned long votes;    /* Vote counter. */
};
1831
1832/* Helper function for sentinelGetObjectiveLeader, increment the counter
1833 * relative to the specified runid. */
1834void sentinelObjectiveLeaderIncr(dict *counters, char *runid) {
1835 dictEntry *de = dictFind(counters,runid);
1836 uint64_t oldval;
1837
1838 if (de) {
1839 oldval = dictGetUnsignedIntegerVal(de);
1840 dictSetUnsignedIntegerVal(de,oldval+1);
1841 } else {
1842 de = dictAddRaw(counters,runid);
1843 redisAssert(de != NULL);
1844 dictSetUnsignedIntegerVal(de,1);
1845 }
1846}
1847
1848/* Scan all the Sentinels attached to this master to check what is the
1849 * most voted leader among Sentinels. */
1850char *sentinelGetObjectiveLeader(sentinelRedisInstance *master) {
1851 dict *counters;
1852 dictIterator *di;
1853 dictEntry *de;
1854 unsigned int voters = 0, voters_quorum;
1855 char *myvote;
1856 char *winner = NULL;
1857
1858 redisAssert(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS));
1859 counters = dictCreate(&leaderVotesDictType,NULL);
1860
1861 /* Count my vote. */
1862 myvote = sentinelGetSubjectiveLeader(master);
1863 if (myvote) {
1864 sentinelObjectiveLeaderIncr(counters,myvote);
1865 voters++;
1866 }
1867
1868 /* Count other sentinels votes */
1869 di = dictGetIterator(master->sentinels);
1870 while((de = dictNext(di)) != NULL) {
1871 sentinelRedisInstance *ri = dictGetVal(de);
1872 if (ri->leader == NULL) continue;
1873 /* If the failover is not already in progress we are only interested
1874 * in Sentinels that believe the master is down. Otherwise the leader
1875 * selection is useful for the "failover-takedown" when the original
1876 * leader fails. In that case we consider all the voters. */
1877 if (!(master->flags & SRI_FAILOVER_IN_PROGRESS) &&
1878 !(ri->flags & SRI_MASTER_DOWN)) continue;
1879 sentinelObjectiveLeaderIncr(counters,ri->leader);
1880 voters++;
1881 }
1882 dictReleaseIterator(di);
1883 voters_quorum = voters/2+1;
1884
1885 /* Check what's the winner. For the winner to win, it needs two conditions:
1886 * 1) Absolute majority between voters (50% + 1).
1887 * 2) And anyway at least master->quorum votes. */
1888 {
1889 uint64_t max_votes = 0; /* Max votes so far. */
1890
1891 di = dictGetIterator(counters);
1892 while((de = dictNext(di)) != NULL) {
1893 uint64_t votes = dictGetUnsignedIntegerVal(de);
1894
1895 if (max_votes < votes) {
1896 max_votes = votes;
1897 winner = dictGetKey(de);
1898 }
1899 }
1900 dictReleaseIterator(di);
1901 if (winner && (max_votes < voters_quorum || max_votes < master->quorum))
1902 winner = NULL;
1903 }
1904 winner = winner ? sdsnew(winner) : NULL;
1905 sdsfree(myvote);
1906 dictRelease(counters);
1907 return winner;
1908}
1909
1910/* This function checks if there are the conditions to start the failover,
1911 * that is:
1912 *
1913 * 1) Enough time has passed since O_DOWN.
1914 * 2) The master is marked as SRI_CAN_FAILOVER, so we can failover it.
1915 * 3) We are the objectively leader for this master.
1916 *
1917 * If the conditions are met we flag the master as SRI_FAILOVER_IN_PROGRESS
1918 * and SRI_I_AM_THE_LEADER.
1919 */
1920void sentinelStartFailover(sentinelRedisInstance *master) {
1921 char *leader;
1922 int isleader;
1923
1924 /* We can't failover if the master is not in O_DOWN state or if
1925 * there is not already a failover in progress (to perform the
1926 * takedown if the leader died) or if this Sentinel is not allowed
1927 * to start a failover. */
1928 if (!(master->flags & SRI_CAN_FAILOVER) ||
1929 !(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) return;
1930
1931 leader = sentinelGetObjectiveLeader(master);
1932 isleader = leader && strcasecmp(leader,server.runid) == 0;
1933 sdsfree(leader);
1934
1935 /* If I'm not the leader, I can't failover for sure. */
1936 if (!isleader) return;
1937
1938 /* If the failover is already in progress there are two options... */
1939 if (master->flags & SRI_FAILOVER_IN_PROGRESS) {
1940 if (master->flags & SRI_I_AM_THE_LEADER) {
1941 /* 1) I'm flagged as leader so I already started the failover.
1942 * Just return. */
1943 return;
1944 } else {
1945 mstime_t elapsed = mstime() - master->failover_state_change_time;
1946
1947 /* 2) I'm the new leader, but I'm not flagged as leader in the
1948 * master: I did not started the failover, but the original
1949 * leader has no longer the leadership.
1950 *
1951 * In this case if the failover appears to be lagging
1952 * for at least 25% of the configured failover timeout,
1953 * I can assume I can take control. Otherwise
1954 * it's better to return and wait more. */
1955 if (elapsed < (master->failover_timeout/4)) return;
1956 sentinelEvent(REDIS_WARNING,"+failover-takedown",master,"%@");
1957 /* We have already an elected slave if we are in
1958 * FAILOVER_IN_PROGRESS state, that is, the slave that we
1959 * observed turning into a master. */
1960 master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
1961 /* As an observer we flagged all the slaves as RECONF_SENT but
1962 * now we are in charge of actually sending the reconfiguration
1963 * command so let's clear this flag for all the instances. */
1964 sentinelDelFlagsToDictOfRedisInstances(master->slaves,
1965 SRI_RECONF_SENT);
1966 }
1967 } else {
1968 /* Brand new failover as SRI_FAILOVER_IN_PROGRESS was not set. */
1969 master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;
1970 }
1971
1972 master->flags |= SRI_FAILOVER_IN_PROGRESS|SRI_I_AM_THE_LEADER;
1973 sentinelEvent(REDIS_WARNING,"+failover-triggered",master,"%@");
1974
1975 /* Pick a random delay if it's a fresh failover (WAIT_START), and not
1976 * a recovery of a failover started by another sentinel. */
1977 if (master->failover_state == SENTINEL_FAILOVER_STATE_WAIT_START) {
1978 master->failover_start_time = mstime() +
1979 SENTINEL_FAILOVER_FIXED_DELAY +
1980 (rand() % SENTINEL_FAILOVER_MAX_RANDOM_DELAY);
1981 sentinelEvent(REDIS_WARNING,"+failover-state-wait-start",master,
1982 "%@ #starting in %lld milliseconds",
1983 master->failover_start_time-mstime());
1984 }
1985 master->failover_state_change_time = mstime();
1986}
1987
1988/* Select a suitable slave to promote. The current algorithm only uses
1989 * the following parameters:
1990 *
1991 * 1) None of the following conditions: S_DOWN, O_DOWN, DISCONNECTED.
1992 * 2) last_avail_time more recent than SENTINEL_INFO_VALIDITY_TIME.
1993 * 3) info_refresh more recent than SENTINEL_INFO_VALIDITY_TIME.
1994 * 4) master_link_down_time no more than:
1995 * (now - master->s_down_since_time) + (master->down_after_period * 10).
1996 *
1997 * Among all the slaves matching the above conditions we select the slave
1998 * with lower slave_priority. If priority is the same we select the slave
1999 * with lexicographically smaller runid.
2000 *
2001 * The function returns the pointer to the selected slave, otherwise
2002 * NULL if no suitable slave was found.
2003 */
2004
2005int compareSlavesForPromotion(const void *a, const void *b) {
2006 sentinelRedisInstance **sa = (sentinelRedisInstance **)a,
2007 **sb = (sentinelRedisInstance **)b;
2008 if ((*sa)->slave_priority != (*sb)->slave_priority)
2009 return (*sa)->slave_priority - (*sb)->slave_priority;
2010 return strcasecmp((*sa)->runid,(*sb)->runid);
2011}
2012
2013sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) {
2014 sentinelRedisInstance **instance =
2015 zmalloc(sizeof(instance[0])*dictSize(master->slaves));
2016 sentinelRedisInstance *selected = NULL;
2017 int instances = 0;
2018 dictIterator *di;
2019 dictEntry *de;
2020 mstime_t max_master_down_time;
2021
2022 max_master_down_time = (mstime() - master->s_down_since_time) +
2023 (master->down_after_period * 10);
2024
2025 di = dictGetIterator(master->slaves);
2026 while((de = dictNext(di)) != NULL) {
2027 sentinelRedisInstance *slave = dictGetVal(de);
2028 mstime_t info_validity_time = mstime()-SENTINEL_INFO_VALIDITY_TIME;
2029
2030 if (slave->flags & (SRI_S_DOWN|SRI_O_DOWN|SRI_DISCONNECTED)) continue;
2031 if (slave->last_avail_time < info_validity_time) continue;
2032 if (slave->info_refresh < info_validity_time) continue;
2033 if (slave->master_link_down_time > max_master_down_time) continue;
2034 instance[instances++] = slave;
2035 }
2036 dictReleaseIterator(di);
2037 if (instances) {
2038 qsort(instance,instances,sizeof(sentinelRedisInstance*),
2039 compareSlavesForPromotion);
2040 selected = instance[0];
2041 }
2042 zfree(instance);
2043 return selected;
2044}
2045
2046/* ---------------- Failover state machine implementation ------------------- */
2047void sentinelFailoverWaitStart(sentinelRedisInstance *ri) {
2048 if (mstime() >= ri->failover_start_time) {
2049 ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
2050 ri->failover_state_change_time = mstime();
2051 sentinelEvent(REDIS_WARNING,"+failover-state-select-slave",ri,"%@");
2052 }
2053}
2054
2055void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) {
2056 sentinelRedisInstance *slave = sentinelSelectSlave(ri);
2057
2058 if (slave == NULL) {
2059 sentinelEvent(REDIS_WARNING,"-no-good-slave",ri,
2060 "%@ #retrying in %d seconds",
2061 (SENTINEL_FAILOVER_FIXED_DELAY+
2062 SENTINEL_FAILOVER_MAX_RANDOM_DELAY)/1000);
2063 ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;
2064 ri->failover_start_time = mstime() + SENTINEL_FAILOVER_FIXED_DELAY +
2065 SENTINEL_FAILOVER_MAX_RANDOM_DELAY;
2066 } else {
2067 sentinelEvent(REDIS_WARNING,"+selected-slave",slave,"%@");
2068 slave->flags |= SRI_PROMOTED;
2069 ri->promoted_slave = slave;
2070 ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE;
2071 ri->failover_state_change_time = mstime();
2072 sentinelEvent(REDIS_NOTICE,"+failover-state-send-slaveof-noone",
2073 slave, "%@");
2074 }
2075}
2076
2077void sentinelFailoverSendSlaveOfNoOne(sentinelRedisInstance *ri) {
2078 int retval;
2079
2080 if (ri->promoted_slave->flags & SRI_DISCONNECTED) return;
2081
2082 /* Send SLAVEOF NO ONE command to turn the slave into a master.
2083 * We actually register a generic callback for this command as we don't
2084 * really care about the reply. We check if it worked indirectly observing
2085 * if INFO returns a different role (master instead of slave). */
2086 retval = redisAsyncCommand(ri->promoted_slave->cc,
2087 sentinelDiscardReplyCallback, NULL, "SLAVEOF NO ONE");
2088 if (retval != REDIS_OK) return;
2089 ri->promoted_slave->pending_commands++;
2090 sentinelEvent(REDIS_NOTICE, "+failover-state-wait-promotion",
2091 ri->promoted_slave,"%@");
2092 ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION;
2093 ri->failover_state_change_time = mstime();
2094}
2095
2096/* We actually wait for promotion indirectly checking with INFO when the
2097 * slave turns into a master. */
2098void sentinelFailoverWaitPromotion(sentinelRedisInstance *ri) {
2099 mstime_t elapsed = mstime() - ri->failover_state_change_time;
2100
2101 if (elapsed >= SENTINEL_PROMOTION_RETRY_PERIOD) {
2102 sentinelEvent(REDIS_WARNING,"-promotion-timeout",ri->promoted_slave,
2103 "%@");
2104 sentinelEvent(REDIS_WARNING,"+failover-state-select-slave",ri,"%@");
2105 ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
2106 ri->failover_state_change_time = mstime();
2107 ri->promoted_slave->flags &= ~SRI_PROMOTED;
2108 ri->promoted_slave = NULL;
2109 }
2110}
2111
2112void sentinelFailoverDetectEnd(sentinelRedisInstance *master) {
2113 int not_reconfigured = 0, timeout = 0;
2114 dictIterator *di;
2115 dictEntry *de;
2116 mstime_t elapsed = mstime() - master->failover_state_change_time;
2117
2118 /* We can't consider failover finished if the promoted slave is
2119 * not reachable. */
2120 if (master->promoted_slave == NULL ||
2121 master->promoted_slave->flags & SRI_S_DOWN) return;
2122
2123 /* The failover terminates once all the reachable slaves are properly
2124 * configured. */
2125 di = dictGetIterator(master->slaves);
2126 while((de = dictNext(di)) != NULL) {
2127 sentinelRedisInstance *slave = dictGetVal(de);
2128
2129 if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
2130 if (slave->flags & SRI_S_DOWN) continue;
2131 not_reconfigured++;
2132 }
2133 dictReleaseIterator(di);
2134
2135 /* Force end of failover on timeout. */
2136 if (elapsed > master->failover_timeout) {
2137 not_reconfigured = 0;
2138 timeout = 1;
2139 sentinelEvent(REDIS_WARNING,"+failover-end-for-timeout",master,"%@");
2140 }
2141
2142 if (not_reconfigured == 0) {
2143 sentinelEvent(REDIS_WARNING,"+failover-end",master,"%@");
2144 master->failover_state = SENTINEL_FAILOVER_STATE_UPDATE_CONFIG;
2145 master->failover_state_change_time = mstime();
2146 }
2147
2148 /* If I'm the leader it is a good idea to send a best effort SLAVEOF
2149 * command to all the slaves still not reconfigured to replicate with
2150 * the new master. */
2151 if (timeout && (master->flags & SRI_I_AM_THE_LEADER)) {
2152 dictIterator *di;
2153 dictEntry *de;
2154 char master_port[32];
2155
2156 ll2string(master_port,sizeof(master_port),
2157 master->promoted_slave->addr->port);
2158
2159 di = dictGetIterator(master->slaves);
2160 while((de = dictNext(di)) != NULL) {
2161 sentinelRedisInstance *slave = dictGetVal(de);
2162 int retval;
2163
2164 if (slave->flags &
2165 (SRI_RECONF_DONE|SRI_RECONF_SENT|SRI_DISCONNECTED)) continue;
2166
2167 retval = redisAsyncCommand(slave->cc,
2168 sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
2169 master->promoted_slave->addr->ip,
2170 master_port);
2171 if (retval == REDIS_OK) {
2172 sentinelEvent(REDIS_NOTICE,"+slave-reconf-sent-be",slave,"%@");
2173 slave->flags |= SRI_RECONF_SENT;
2174 }
2175 }
2176 dictReleaseIterator(di);
2177 }
2178}
2179
2180/* Send SLAVE OF <new master address> to all the remaining slaves that
2181 * still don't appear to have the configuration updated. */
2182void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) {
2183 dictIterator *di;
2184 dictEntry *de;
2185 int in_progress = 0;
2186
2187 di = dictGetIterator(master->slaves);
2188 while((de = dictNext(di)) != NULL) {
2189 sentinelRedisInstance *slave = dictGetVal(de);
2190
2191 if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG))
2192 in_progress++;
2193 }
2194 dictReleaseIterator(di);
2195
2196 di = dictGetIterator(master->slaves);
2197 while(in_progress < master->parallel_syncs &&
2198 (de = dictNext(di)) != NULL)
2199 {
2200 sentinelRedisInstance *slave = dictGetVal(de);
2201 int retval;
2202 char master_port[32];
2203
2204 /* Skip the promoted slave, and already configured slaves. */
2205 if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
2206
2207 /* Clear the SRI_RECONF_SENT flag if too much time elapsed without
2208 * the slave moving forward to the next state. */
2209 if ((slave->flags & SRI_RECONF_SENT) &&
2210 (mstime() - slave->slave_reconf_sent_time) >
2211 SENTINEL_SLAVE_RECONF_RETRY_PERIOD)
2212 {
2213 sentinelEvent(REDIS_NOTICE,"-slave-reconf-sent-timeout",slave,"%@");
2214 slave->flags &= ~SRI_RECONF_SENT;
2215 }
2216
2217 /* Nothing to do for instances that are disconnected or already
2218 * in RECONF_SENT state. */
2219 if (slave->flags & (SRI_DISCONNECTED|SRI_RECONF_SENT|SRI_RECONF_INPROG))
2220 continue;
2221
2222 /* Send SLAVEOF <new master>. */
2223 ll2string(master_port,sizeof(master_port),
2224 master->promoted_slave->addr->port);
2225 retval = redisAsyncCommand(slave->cc,
2226 sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
2227 master->promoted_slave->addr->ip,
2228 master_port);
2229 if (retval == REDIS_OK) {
2230 slave->flags |= SRI_RECONF_SENT;
2231 slave->pending_commands++;
2232 slave->slave_reconf_sent_time = mstime();
2233 sentinelEvent(REDIS_NOTICE,"+slave-reconf-sent",slave,"%@");
2234 in_progress++;
2235 }
2236 }
2237 dictReleaseIterator(di);
2238 sentinelFailoverDetectEnd(master);
2239}
2240
2241/* This function is called when the slave is in
2242 * SENTINEL_FAILOVER_STATE_UPDATE_CONFIG state. In this state we need
2243 * to remove it from the master table and add the promoted slave instead.
2244 *
2245 * If there are no promoted slaves as this instance is unique, we remove
2246 * and re-add it with the same address to trigger a complete state
2247 * refresh. */
2248void sentinelFailoverSwitchToPromotedSlave(sentinelRedisInstance *master) {
2249 sentinelRedisInstance *new, *ref = master->promoted_slave ?
2250 master->promoted_slave : master;
2251 int quorum = ref->quorum, parallel_syncs = ref->parallel_syncs;
2252 char *name = sdsnew(master->name);
2253 char *ip = sdsnew(ref->addr->ip), *oldip = sdsnew(master->addr->ip);
2254 int port = ref->addr->port, oldport = master->addr->port;
2255 int retval, oldflags = master->flags;
2256 mstime_t old_down_after_period = master->down_after_period;
2257 mstime_t old_failover_timeout = master->failover_timeout;
2258
2259 retval = dictDelete(sentinel.masters,master->name);
2260 redisAssert(retval == DICT_OK);
2261 new = createSentinelRedisInstance(name,SRI_MASTER,ip,port,quorum,NULL);
2262 redisAssert(new != NULL);
2263 new->parallel_syncs = parallel_syncs;
2264 new->flags |= (oldflags & SRI_CAN_FAILOVER);
2265 new->down_after_period = old_down_after_period;
2266 new->failover_timeout = old_failover_timeout;
2267 /* TODO: ... set the scripts as well. */
2268 sentinelEvent(REDIS_WARNING,"+switch-master",new,"%s %s %d %s %d",
2269 name, oldip, oldport, ip, port);
2270 sdsfree(name);
2271 sdsfree(ip);
2272 sdsfree(oldip);
2273}
2274
2275void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
2276 redisAssert(ri->flags & SRI_MASTER);
2277
2278 if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;
2279
2280 switch(ri->failover_state) {
2281 case SENTINEL_FAILOVER_STATE_WAIT_START:
2282 sentinelFailoverWaitStart(ri);
2283 break;
2284 case SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
2285 sentinelFailoverSelectSlave(ri);
2286 break;
2287 case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:
2288 sentinelFailoverSendSlaveOfNoOne(ri);
2289 break;
2290 case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
2291 sentinelFailoverWaitPromotion(ri);
2292 break;
2293 case SENTINEL_FAILOVER_STATE_RECONF_SLAVES:
2294 sentinelFailoverReconfNextSlave(ri);
2295 break;
2296 case SENTINEL_FAILOVER_STATE_DETECT_END:
2297 sentinelFailoverDetectEnd(ri);
2298 break;
2299 }
2300}
2301
2302/* The following is called only for master instances and will abort the
2303 * failover process if:
2304 *
2305 * 1) The failover is in progress.
2306 * 2) We already promoted a slave.
2307 * 3) The promoted slave is in extended SDOWN condition.
2308 */
2309void sentinelAbortFailoverIfNeeded(sentinelRedisInstance *ri) {
2310 dictIterator *di;
2311 dictEntry *de;
2312
2313 /* Failover is in progress? Do we have a promoted slave? */
2314 if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS) || !ri->promoted_slave) return;
2315
2316 /* Is the promoted slave into an extended SDOWN state? */
2317 if (!(ri->promoted_slave->flags & SRI_S_DOWN) ||
2318 (mstime() - ri->promoted_slave->s_down_since_time) <
2319 (ri->down_after_period * SENTINEL_EXTENDED_SDOWN_MULTIPLIER)) return;
2320
2321 sentinelEvent(REDIS_WARNING,"-failover-abort-x-sdown",ri->promoted_slave,"%@");
2322
2323 /* Clear failover related flags from slaves.
2324 * Also if we are the leader make sure to send SLAVEOF commands to all the
2325 * already reconfigured slaves in order to turn them back into slaves of
2326 * the original master. */
2327
2328 di = dictGetIterator(ri->slaves);
2329 while((de = dictNext(di)) != NULL) {
2330 sentinelRedisInstance *slave = dictGetVal(de);
2331 if (ri->flags & SRI_I_AM_THE_LEADER) {
2332 char master_port[32];
2333 int retval;
2334
2335 ll2string(master_port,sizeof(master_port),ri->addr->port);
2336 retval = redisAsyncCommand(slave->cc,
2337 sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
2338 ri->addr->ip,
2339 master_port);
2340 if (retval == REDIS_OK)
2341 sentinelEvent(REDIS_NOTICE,"-slave-reconf-undo",slave,"%@");
2342 }
2343 slave->flags &= ~(SRI_RECONF_SENT|SRI_RECONF_INPROG|SRI_RECONF_DONE);
2344 }
2345 dictReleaseIterator(di);
2346
2347 ri->flags &= ~(SRI_FAILOVER_IN_PROGRESS|SRI_I_AM_THE_LEADER);
2348 ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
2349 ri->failover_state_change_time = mstime();
2350 ri->promoted_slave->flags &= ~SRI_PROMOTED;
2351 ri->promoted_slave = NULL;
2352}
2353
/* ======================== SENTINEL timer handler ==========================
 * This is the "main" of our Sentinel, since Sentinel is completely non
 * blocking in design. The function is called every second.
 * -------------------------------------------------------------------------- */
2358
2359/* Perform scheduled operations for the specified Redis instance. */
2360void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
2361 /* ========== MONITORING HALF ============ */
2362 /* Every kind of instance */
2363 sentinelReconnectInstance(ri);
2364 sentinelPingInstance(ri);
2365
2366 /* Masters and slaves */
2367 if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
2368 /* Nothing so far. */
2369 }
2370
2371 /* Only masters */
2372 if (ri->flags & SRI_MASTER) {
2373 sentinelAskMasterStateToOtherSentinels(ri);
2374 }
2375
2376 /* ============== ACTING HALF ============= */
2377 /* We don't proceed with the acting half if we are in TILT mode.
2378 * TILT happens when we find something odd with the time, like a
2379 * sudden change in the clock. */
2380 if (sentinel.tilt) {
2381 if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return;
2382 sentinel.tilt = 0;
2383 sentinelEvent(REDIS_WARNING,"-tilt",NULL,"#tilt mode exited");
2384 }
2385
2386 /* Every kind of instance */
2387 sentinelCheckSubjectivelyDown(ri);
2388
2389 /* Masters and slaves */
2390 if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
2391 /* Nothing so far. */
2392 }
2393
2394 /* Only masters */
2395 if (ri->flags & SRI_MASTER) {
2396 sentinelCheckObjectivelyDown(ri);
2397 sentinelStartFailover(ri);
2398 sentinelFailoverStateMachine(ri);
2399 sentinelAbortFailoverIfNeeded(ri);
2400 }
2401}
2402
2403/* Perform scheduled operations for all the instances in the dictionary.
2404 * Recursively call the function against dictionaries of slaves. */
2405void sentinelHandleDictOfRedisInstances(dict *instances) {
2406 dictIterator *di;
2407 dictEntry *de;
2408 sentinelRedisInstance *switch_to_promoted = NULL;
2409
2410 /* There are a number of things we need to perform against every master. */
2411 di = dictGetIterator(instances);
2412 while((de = dictNext(di)) != NULL) {
2413 sentinelRedisInstance *ri = dictGetVal(de);
2414
2415 sentinelHandleRedisInstance(ri);
2416 if (ri->flags & SRI_MASTER) {
2417 sentinelHandleDictOfRedisInstances(ri->slaves);
2418 sentinelHandleDictOfRedisInstances(ri->sentinels);
2419 if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) {
2420 switch_to_promoted = ri;
2421 }
2422 }
2423 }
2424 if (switch_to_promoted)
2425 sentinelFailoverSwitchToPromotedSlave(switch_to_promoted);
2426 dictReleaseIterator(di);
2427}
2428
2429/* This function checks if we need to enter the TITL mode.
2430 *
2431 * The TILT mode is entered if we detect that between two invocations of the
2432 * timer interrupt, a negative amount of time, or too much time has passed.
2433 * Note that we expect that more or less just 100 milliseconds will pass
2434 * if everything is fine. However we'll see a negative number or a
2435 * difference bigger than SENTINEL_TILT_TRIGGER milliseconds if one of the
2436 * following conditions happen:
2437 *
2438 * 1) The Sentiel process for some time is blocked, for every kind of
2439 * random reason: the load is huge, the computer was freezed for some time
2440 * in I/O or alike, the process was stopped by a signal. Everything.
2441 * 2) The system clock was altered significantly.
2442 *
2443 * Under both this conditions we'll see everything as timed out and failing
2444 * without good reasons. Instead we enter the TILT mode and wait
2445 * for SENTIENL_TILT_PERIOD to elapse before starting to act again.
2446 *
2447 * During TILT time we still collect information, we just do not act. */
2448void sentinelCheckTiltCondition(void) {
2449 mstime_t now = mstime();
2450 mstime_t delta = now - sentinel.previous_time;
2451
2452 if (delta < 0 || delta > SENTINEL_TILT_TRIGGER) {
2453 sentinel.tilt = 1;
2454 sentinel.tilt_start_time = mstime();
2455 sentinelEvent(REDIS_WARNING,"+tilt",NULL,"#tilt mode entered");
2456 }
2457 sentinel.previous_time = mstime();
2458}
2459
2460void sentinelTimer(void) {
2461 sentinelCheckTiltCondition();
2462 sentinelHandleDictOfRedisInstances(sentinel.masters);
2463}
2464