]> git.saurik.com Git - redis.git/blob - src/sentinel.c
Safer handling of MULTI/EXEC on errors.
[redis.git] / src / sentinel.c
1 /* Redis Sentinel implementation
2 *
3 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
8 *
9 * * Redistributions of source code must retain the above copyright notice,
10 * this list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 * * Neither the name of Redis nor the names of its contributors may be used
15 * to endorse or promote products derived from this software without
16 * specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28 * POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #include "redis.h"
32 #include "hiredis.h"
33 #include "async.h"
34
35 #include <ctype.h>
36 #include <arpa/inet.h>
37 #include <sys/socket.h>
38 #include <sys/wait.h>
39
40 extern char **environ;
41
42 #define REDIS_SENTINEL_PORT 26379
43
44 /* ======================== Sentinel global state =========================== */
45
46 typedef long long mstime_t; /* millisecond time type. */
47
48 /* Address object, used to describe an ip:port pair. */
49 typedef struct sentinelAddr {
50 char *ip;
51 int port;
52 } sentinelAddr;
53
54 /* A Sentinel Redis Instance object is monitoring. */
55 #define SRI_MASTER (1<<0)
56 #define SRI_SLAVE (1<<1)
57 #define SRI_SENTINEL (1<<2)
58 #define SRI_DISCONNECTED (1<<3)
59 #define SRI_S_DOWN (1<<4) /* Subjectively down (no quorum). */
60 #define SRI_O_DOWN (1<<5) /* Objectively down (quorum reached). */
61 #define SRI_MASTER_DOWN (1<<6) /* A Sentinel with this flag set thinks that
62 its master is down. */
63 /* SRI_CAN_FAILOVER when set in an SRI_MASTER instance means that we are
64 * allowed to perform the failover for this master.
65 * When set in a SRI_SENTINEL instance means that sentinel is allowed to
66 * perform the failover on its master. */
67 #define SRI_CAN_FAILOVER (1<<7)
68 #define SRI_FAILOVER_IN_PROGRESS (1<<8) /* Failover is in progress for
69 this master. */
70 #define SRI_I_AM_THE_LEADER (1<<9) /* We are the leader for this master. */
71 #define SRI_PROMOTED (1<<10) /* Slave selected for promotion. */
72 #define SRI_RECONF_SENT (1<<11) /* SLAVEOF <newmaster> sent. */
73 #define SRI_RECONF_INPROG (1<<12) /* Slave synchronization in progress. */
74 #define SRI_RECONF_DONE (1<<13) /* Slave synchronized with new master. */
75 #define SRI_FORCE_FAILOVER (1<<14) /* Force failover with master up. */
76 #define SRI_SCRIPT_KILL_SENT (1<<15) /* SCRIPT KILL already sent on -BUSY */
77
78 #define SENTINEL_INFO_PERIOD 10000
79 #define SENTINEL_PING_PERIOD 1000
80 #define SENTINEL_ASK_PERIOD 1000
81 #define SENTINEL_PUBLISH_PERIOD 5000
82 #define SENTINEL_DOWN_AFTER_PERIOD 30000
83 #define SENTINEL_HELLO_CHANNEL "__sentinel__:hello"
84 #define SENTINEL_TILT_TRIGGER 2000
85 #define SENTINEL_TILT_PERIOD (SENTINEL_PING_PERIOD*30)
86 #define SENTINEL_DEFAULT_SLAVE_PRIORITY 100
87 #define SENTINEL_PROMOTION_RETRY_PERIOD 30000
88 #define SENTINEL_SLAVE_RECONF_RETRY_PERIOD 10000
89 #define SENTINEL_DEFAULT_PARALLEL_SYNCS 1
90 #define SENTINEL_MIN_LINK_RECONNECT_PERIOD 15000
91 #define SENTINEL_DEFAULT_FAILOVER_TIMEOUT (60*15*1000)
92 #define SENTINEL_MAX_PENDING_COMMANDS 100
93 #define SENTINEL_EXTENDED_SDOWN_MULTIPLIER 10
94
95 /* How many milliseconds is an information valid? This applies for instance
96 * to the reply to SENTINEL IS-MASTER-DOWN-BY-ADDR replies. */
97 #define SENTINEL_INFO_VALIDITY_TIME 5000
98 #define SENTINEL_FAILOVER_FIXED_DELAY 5000
99 #define SENTINEL_FAILOVER_MAX_RANDOM_DELAY 10000
100
101 /* Failover machine different states. */
102 #define SENTINEL_FAILOVER_STATE_NONE 0 /* No failover in progress. */
103 #define SENTINEL_FAILOVER_STATE_WAIT_START 1 /* Wait for failover_start_time*/
104 #define SENTINEL_FAILOVER_STATE_SELECT_SLAVE 2 /* Select slave to promote */
105 #define SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE 3 /* Slave -> Master */
106 #define SENTINEL_FAILOVER_STATE_WAIT_PROMOTION 4 /* Wait slave to change role */
107 #define SENTINEL_FAILOVER_STATE_RECONF_SLAVES 5 /* SLAVEOF newmaster */
108 #define SENTINEL_FAILOVER_STATE_WAIT_NEXT_SLAVE 6 /* wait replication */
109 #define SENTINEL_FAILOVER_STATE_ALERT_CLIENTS 7 /* Run user script. */
110 #define SENTINEL_FAILOVER_STATE_WAIT_ALERT_SCRIPT 8 /* Wait script exec. */
111 #define SENTINEL_FAILOVER_STATE_DETECT_END 9 /* Check for failover end. */
112 #define SENTINEL_FAILOVER_STATE_UPDATE_CONFIG 10 /* Monitor promoted slave. */
113
114 #define SENTINEL_MASTER_LINK_STATUS_UP 0
115 #define SENTINEL_MASTER_LINK_STATUS_DOWN 1
116
117 /* Generic flags that can be used with different functions. */
118 #define SENTINEL_NO_FLAGS 0
119 #define SENTINEL_GENERATE_EVENT 1
120 #define SENTINEL_LEADER 2
121 #define SENTINEL_OBSERVER 4
122
123 /* Script execution flags and limits. */
124 #define SENTINEL_SCRIPT_NONE 0
125 #define SENTINEL_SCRIPT_RUNNING 1
126 #define SENTINEL_SCRIPT_MAX_QUEUE 256
127 #define SENTINEL_SCRIPT_MAX_RUNNING 16
128 #define SENTINEL_SCRIPT_MAX_RUNTIME 60000 /* 60 seconds max exec time. */
129 #define SENTINEL_SCRIPT_MAX_RETRY 10
130 #define SENTINEL_SCRIPT_RETRY_DELAY 30000 /* 30 seconds between retries. */
131
132 typedef struct sentinelRedisInstance {
133 int flags; /* See SRI_... defines */
134 char *name; /* Master name from the point of view of this sentinel. */
135 char *runid; /* run ID of this instance. */
136 sentinelAddr *addr; /* Master host. */
137 redisAsyncContext *cc; /* Hiredis context for commands. */
138 redisAsyncContext *pc; /* Hiredis context for Pub / Sub. */
139 int pending_commands; /* Number of commands sent waiting for a reply. */
140 mstime_t cc_conn_time; /* cc connection time. */
141 mstime_t pc_conn_time; /* pc connection time. */
142 mstime_t pc_last_activity; /* Last time we received any message. */
143 mstime_t last_avail_time; /* Last time the instance replied to ping with
144 a reply we consider valid. */
145 mstime_t last_pong_time; /* Last time the instance replied to ping,
146 whatever the reply was. That's used to check
147 if the link is idle and must be reconnected. */
148 mstime_t last_pub_time; /* Last time we sent hello via Pub/Sub. */
149 mstime_t last_hello_time; /* Only used if SRI_SENTINEL is set. Last time
150 we received an hello from this Sentinel
151 via Pub/Sub. */
152 mstime_t last_master_down_reply_time; /* Time of last reply to
153 SENTINEL is-master-down command. */
154 mstime_t s_down_since_time; /* Subjectively down since time. */
155 mstime_t o_down_since_time; /* Objectively down since time. */
156 mstime_t down_after_period; /* Consider it down after that period. */
157 mstime_t info_refresh; /* Time at which we received INFO output from it. */
158
159 /* Master specific. */
160 dict *sentinels; /* Other sentinels monitoring the same master. */
161 dict *slaves; /* Slaves for this master instance. */
162 int quorum; /* Number of sentinels that need to agree on failure. */
163 int parallel_syncs; /* How many slaves to reconfigure at same time. */
164 char *auth_pass; /* Password to use for AUTH against master & slaves. */
165
166 /* Slave specific. */
167 mstime_t master_link_down_time; /* Slave replication link down time. */
168 int slave_priority; /* Slave priority according to its INFO output. */
169 mstime_t slave_reconf_sent_time; /* Time at which we sent SLAVE OF <new> */
170 struct sentinelRedisInstance *master; /* Master instance if SRI_SLAVE is set. */
171 char *slave_master_host; /* Master host as reported by INFO */
172 int slave_master_port; /* Master port as reported by INFO */
173 int slave_master_link_status; /* Master link status as reported by INFO */
174 /* Failover */
175 char *leader; /* If this is a master instance, this is the runid of
176 the Sentinel that should perform the failover. If
177 this is a Sentinel, this is the runid of the Sentinel
178 that this other Sentinel is voting as leader.
179 This field is valid only if SRI_MASTER_DOWN is
180 set on the Sentinel instance. */
181 int failover_state; /* See SENTINEL_FAILOVER_STATE_* defines. */
182 mstime_t failover_state_change_time;
183 mstime_t failover_start_time; /* When to start to failover if leader. */
184 mstime_t failover_timeout; /* Max time to refresh failover state. */
185 struct sentinelRedisInstance *promoted_slave; /* Promoted slave instance. */
186 /* Scripts executed to notify admin or reconfigure clients: when they
187 * are set to NULL no script is executed. */
188 char *notification_script;
189 char *client_reconfig_script;
190 } sentinelRedisInstance;
191
192 /* Main state. */
193 struct sentinelState {
194 dict *masters; /* Dictionary of master sentinelRedisInstances.
195 Key is the instance name, value is the
196 sentinelRedisInstance structure pointer. */
197 int tilt; /* Are we in TILT mode? */
198 int running_scripts; /* Number of scripts in execution right now. */
199 mstime_t tilt_start_time; /* When TITL started. */
200 mstime_t previous_time; /* Time last time we ran the time handler. */
201 list *scripts_queue; /* Queue of user scripts to execute. */
202 } sentinel;
203
204 /* A script execution job. */
205 typedef struct sentinelScriptJob {
206 int flags; /* Script job flags: SENTINEL_SCRIPT_* */
207 int retry_num; /* Number of times we tried to execute it. */
208 char **argv; /* Arguments to call the script. */
209 mstime_t start_time; /* Script execution time if the script is running,
210 otherwise 0 if we are allowed to retry the
211 execution at any time. If the script is not
212 running and it's not 0, it means: do not run
213 before the specified time. */
214 pid_t pid; /* Script execution pid. */
215 } sentinelScriptJob;
216
217 /* ======================= hiredis ae.c adapters =============================
218 * Note: this implementation is taken from hiredis/adapters/ae.h, however
219 * we have our modified copy for Sentinel in order to use our allocator
220 * and to have full control over how the adapter works. */
221
222 typedef struct redisAeEvents {
223 redisAsyncContext *context;
224 aeEventLoop *loop;
225 int fd;
226 int reading, writing;
227 } redisAeEvents;
228
229 static void redisAeReadEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
230 ((void)el); ((void)fd); ((void)mask);
231
232 redisAeEvents *e = (redisAeEvents*)privdata;
233 redisAsyncHandleRead(e->context);
234 }
235
236 static void redisAeWriteEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
237 ((void)el); ((void)fd); ((void)mask);
238
239 redisAeEvents *e = (redisAeEvents*)privdata;
240 redisAsyncHandleWrite(e->context);
241 }
242
243 static void redisAeAddRead(void *privdata) {
244 redisAeEvents *e = (redisAeEvents*)privdata;
245 aeEventLoop *loop = e->loop;
246 if (!e->reading) {
247 e->reading = 1;
248 aeCreateFileEvent(loop,e->fd,AE_READABLE,redisAeReadEvent,e);
249 }
250 }
251
252 static void redisAeDelRead(void *privdata) {
253 redisAeEvents *e = (redisAeEvents*)privdata;
254 aeEventLoop *loop = e->loop;
255 if (e->reading) {
256 e->reading = 0;
257 aeDeleteFileEvent(loop,e->fd,AE_READABLE);
258 }
259 }
260
261 static void redisAeAddWrite(void *privdata) {
262 redisAeEvents *e = (redisAeEvents*)privdata;
263 aeEventLoop *loop = e->loop;
264 if (!e->writing) {
265 e->writing = 1;
266 aeCreateFileEvent(loop,e->fd,AE_WRITABLE,redisAeWriteEvent,e);
267 }
268 }
269
270 static void redisAeDelWrite(void *privdata) {
271 redisAeEvents *e = (redisAeEvents*)privdata;
272 aeEventLoop *loop = e->loop;
273 if (e->writing) {
274 e->writing = 0;
275 aeDeleteFileEvent(loop,e->fd,AE_WRITABLE);
276 }
277 }
278
279 static void redisAeCleanup(void *privdata) {
280 redisAeEvents *e = (redisAeEvents*)privdata;
281 redisAeDelRead(privdata);
282 redisAeDelWrite(privdata);
283 zfree(e);
284 }
285
286 static int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac) {
287 redisContext *c = &(ac->c);
288 redisAeEvents *e;
289
290 /* Nothing should be attached when something is already attached */
291 if (ac->ev.data != NULL)
292 return REDIS_ERR;
293
294 /* Create container for context and r/w events */
295 e = (redisAeEvents*)zmalloc(sizeof(*e));
296 e->context = ac;
297 e->loop = loop;
298 e->fd = c->fd;
299 e->reading = e->writing = 0;
300
301 /* Register functions to start/stop listening for events */
302 ac->ev.addRead = redisAeAddRead;
303 ac->ev.delRead = redisAeDelRead;
304 ac->ev.addWrite = redisAeAddWrite;
305 ac->ev.delWrite = redisAeDelWrite;
306 ac->ev.cleanup = redisAeCleanup;
307 ac->ev.data = e;
308
309 return REDIS_OK;
310 }
311
312 /* ============================= Prototypes ================================= */
313
314 void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status);
315 void sentinelDisconnectCallback(const redisAsyncContext *c, int status);
316 void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata);
317 sentinelRedisInstance *sentinelGetMasterByName(char *name);
318 char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master);
319 char *sentinelGetObjectiveLeader(sentinelRedisInstance *master);
320 int yesnotoi(char *s);
321 void sentinelDisconnectInstanceFromContext(const redisAsyncContext *c);
322 void sentinelKillLink(sentinelRedisInstance *ri, redisAsyncContext *c);
323 const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri);
324 void sentinelAbortFailover(sentinelRedisInstance *ri);
325 void sentinelEvent(int level, char *type, sentinelRedisInstance *ri, const char *fmt, ...);
326 sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master);
327 void sentinelScheduleScriptExecution(char *path, ...);
328 void sentinelStartFailover(sentinelRedisInstance *master, int state);
329 void sentinelDiscardReplyCallback(redisAsyncContext *c, void *reply, void *privdata);
330
331 /* ========================= Dictionary types =============================== */
332
333 unsigned int dictSdsHash(const void *key);
334 int dictSdsKeyCompare(void *privdata, const void *key1, const void *key2);
335 void releaseSentinelRedisInstance(sentinelRedisInstance *ri);
336
337 void dictInstancesValDestructor (void *privdata, void *obj) {
338 releaseSentinelRedisInstance(obj);
339 }
340
341 /* Instance name (sds) -> instance (sentinelRedisInstance pointer)
342 *
343 * also used for: sentinelRedisInstance->sentinels dictionary that maps
344 * sentinels ip:port to last seen time in Pub/Sub hello message. */
345 dictType instancesDictType = {
346 dictSdsHash, /* hash function */
347 NULL, /* key dup */
348 NULL, /* val dup */
349 dictSdsKeyCompare, /* key compare */
350 NULL, /* key destructor */
351 dictInstancesValDestructor /* val destructor */
352 };
353
354 /* Instance runid (sds) -> votes (long casted to void*)
355 *
356 * This is useful into sentinelGetObjectiveLeader() function in order to
357 * count the votes and understand who is the leader. */
358 dictType leaderVotesDictType = {
359 dictSdsHash, /* hash function */
360 NULL, /* key dup */
361 NULL, /* val dup */
362 dictSdsKeyCompare, /* key compare */
363 NULL, /* key destructor */
364 NULL /* val destructor */
365 };
366
367 /* =========================== Initialization =============================== */
368
369 void sentinelCommand(redisClient *c);
370 void sentinelInfoCommand(redisClient *c);
371
372 struct redisCommand sentinelcmds[] = {
373 {"ping",pingCommand,1,"",0,NULL,0,0,0,0,0},
374 {"sentinel",sentinelCommand,-2,"",0,NULL,0,0,0,0,0},
375 {"subscribe",subscribeCommand,-2,"",0,NULL,0,0,0,0,0},
376 {"unsubscribe",unsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
377 {"psubscribe",psubscribeCommand,-2,"",0,NULL,0,0,0,0,0},
378 {"punsubscribe",punsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
379 {"info",sentinelInfoCommand,-1,"",0,NULL,0,0,0,0,0}
380 };
381
382 /* This function overwrites a few normal Redis config default with Sentinel
383 * specific defaults. */
384 void initSentinelConfig(void) {
385 server.port = REDIS_SENTINEL_PORT;
386 }
387
388 /* Perform the Sentinel mode initialization. */
389 void initSentinel(void) {
390 int j;
391
392 /* Remove usual Redis commands from the command table, then just add
393 * the SENTINEL command. */
394 dictEmpty(server.commands);
395 for (j = 0; j < sizeof(sentinelcmds)/sizeof(sentinelcmds[0]); j++) {
396 int retval;
397 struct redisCommand *cmd = sentinelcmds+j;
398
399 retval = dictAdd(server.commands, sdsnew(cmd->name), cmd);
400 redisAssert(retval == DICT_OK);
401 }
402
403 /* Initialize various data structures. */
404 sentinel.masters = dictCreate(&instancesDictType,NULL);
405 sentinel.tilt = 0;
406 sentinel.tilt_start_time = mstime();
407 sentinel.previous_time = mstime();
408 sentinel.running_scripts = 0;
409 sentinel.scripts_queue = listCreate();
410 }
411
412 /* ============================== sentinelAddr ============================== */
413
414 /* Create a sentinelAddr object and return it on success.
415 * On error NULL is returned and errno is set to:
416 * ENOENT: Can't resolve the hostname.
417 * EINVAL: Invalid port number.
418 */
419 sentinelAddr *createSentinelAddr(char *hostname, int port) {
420 char buf[32];
421 sentinelAddr *sa;
422
423 if (port <= 0 || port > 65535) {
424 errno = EINVAL;
425 return NULL;
426 }
427 if (anetResolve(NULL,hostname,buf) == ANET_ERR) {
428 errno = ENOENT;
429 return NULL;
430 }
431 sa = zmalloc(sizeof(*sa));
432 sa->ip = sdsnew(buf);
433 sa->port = port;
434 return sa;
435 }
436
437 /* Free a Sentinel address. Can't fail. */
438 void releaseSentinelAddr(sentinelAddr *sa) {
439 sdsfree(sa->ip);
440 zfree(sa);
441 }
442
443 /* =========================== Events notification ========================== */
444
445 /* Send an event to log, pub/sub, user notification script.
446 *
447 * 'level' is the log level for logging. Only REDIS_WARNING events will trigger
448 * the execution of the user notification script.
449 *
450 * 'type' is the message type, also used as a pub/sub channel name.
451 *
452 * 'ri', is the redis instance target of this event if applicable, and is
453 * used to obtain the path of the notification script to execute.
454 *
455 * The remaining arguments are printf-alike.
456 * If the format specifier starts with the two characters "%@" then ri is
457 * not NULL, and the message is prefixed with an instance identifier in the
458 * following format:
459 *
460 * <instance type> <instance name> <ip> <port>
461 *
462 * If the instance type is not master, than the additional string is
463 * added to specify the originating master:
464 *
465 * @ <master name> <master ip> <master port>
466 *
467 * Any other specifier after "%@" is processed by printf itself.
468 */
469 void sentinelEvent(int level, char *type, sentinelRedisInstance *ri,
470 const char *fmt, ...) {
471 va_list ap;
472 char msg[REDIS_MAX_LOGMSG_LEN];
473 robj *channel, *payload;
474
475 /* Handle %@ */
476 if (fmt[0] == '%' && fmt[1] == '@') {
477 sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
478 NULL : ri->master;
479
480 if (master) {
481 snprintf(msg, sizeof(msg), "%s %s %s %d @ %s %s %d",
482 sentinelRedisInstanceTypeStr(ri),
483 ri->name, ri->addr->ip, ri->addr->port,
484 master->name, master->addr->ip, master->addr->port);
485 } else {
486 snprintf(msg, sizeof(msg), "%s %s %s %d",
487 sentinelRedisInstanceTypeStr(ri),
488 ri->name, ri->addr->ip, ri->addr->port);
489 }
490 fmt += 2;
491 } else {
492 msg[0] = '\0';
493 }
494
495 /* Use vsprintf for the rest of the formatting if any. */
496 if (fmt[0] != '\0') {
497 va_start(ap, fmt);
498 vsnprintf(msg+strlen(msg), sizeof(msg)-strlen(msg), fmt, ap);
499 va_end(ap);
500 }
501
502 /* Log the message if the log level allows it to be logged. */
503 if (level >= server.verbosity)
504 redisLog(level,"%s %s",type,msg);
505
506 /* Publish the message via Pub/Sub if it's not a debugging one. */
507 if (level != REDIS_DEBUG) {
508 channel = createStringObject(type,strlen(type));
509 payload = createStringObject(msg,strlen(msg));
510 pubsubPublishMessage(channel,payload);
511 decrRefCount(channel);
512 decrRefCount(payload);
513 }
514
515 /* Call the notification script if applicable. */
516 if (level == REDIS_WARNING && ri != NULL) {
517 sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
518 ri : ri->master;
519 if (master->notification_script) {
520 sentinelScheduleScriptExecution(master->notification_script,
521 type,msg,NULL);
522 }
523 }
524 }
525
526 /* ============================ script execution ============================ */
527
528 /* Release a script job structure and all the associated data. */
529 void sentinelReleaseScriptJob(sentinelScriptJob *sj) {
530 int j = 0;
531
532 while(sj->argv[j]) sdsfree(sj->argv[j++]);
533 zfree(sj->argv);
534 zfree(sj);
535 }
536
537 #define SENTINEL_SCRIPT_MAX_ARGS 16
538 void sentinelScheduleScriptExecution(char *path, ...) {
539 va_list ap;
540 char *argv[SENTINEL_SCRIPT_MAX_ARGS+1];
541 int argc = 1;
542 sentinelScriptJob *sj;
543
544 va_start(ap, path);
545 while(argc < SENTINEL_SCRIPT_MAX_ARGS) {
546 argv[argc] = va_arg(ap,char*);
547 if (!argv[argc]) break;
548 argv[argc] = sdsnew(argv[argc]); /* Copy the string. */
549 argc++;
550 }
551 va_end(ap);
552 argv[0] = sdsnew(path);
553
554 sj = zmalloc(sizeof(*sj));
555 sj->flags = SENTINEL_SCRIPT_NONE;
556 sj->retry_num = 0;
557 sj->argv = zmalloc(sizeof(char*)*(argc+1));
558 sj->start_time = 0;
559 sj->pid = 0;
560 memcpy(sj->argv,argv,sizeof(char*)*(argc+1));
561
562 listAddNodeTail(sentinel.scripts_queue,sj);
563
564 /* Remove the oldest non running script if we already hit the limit. */
565 if (listLength(sentinel.scripts_queue) > SENTINEL_SCRIPT_MAX_QUEUE) {
566 listNode *ln;
567 listIter li;
568
569 listRewind(sentinel.scripts_queue,&li);
570 while ((ln = listNext(&li)) != NULL) {
571 sj = ln->value;
572
573 if (sj->flags & SENTINEL_SCRIPT_RUNNING) continue;
574 /* The first node is the oldest as we add on tail. */
575 listDelNode(sentinel.scripts_queue,ln);
576 sentinelReleaseScriptJob(sj);
577 break;
578 }
579 redisAssert(listLength(sentinel.scripts_queue) <=
580 SENTINEL_SCRIPT_MAX_QUEUE);
581 }
582 }
583
584 /* Lookup a script in the scripts queue via pid, and returns the list node
585 * (so that we can easily remove it from the queue if needed). */
586 listNode *sentinelGetScriptListNodeByPid(pid_t pid) {
587 listNode *ln;
588 listIter li;
589
590 listRewind(sentinel.scripts_queue,&li);
591 while ((ln = listNext(&li)) != NULL) {
592 sentinelScriptJob *sj = ln->value;
593
594 if ((sj->flags & SENTINEL_SCRIPT_RUNNING) && sj->pid == pid)
595 return ln;
596 }
597 return NULL;
598 }
599
600 /* Run pending scripts if we are not already at max number of running
601 * scripts. */
602 void sentinelRunPendingScripts(void) {
603 listNode *ln;
604 listIter li;
605 mstime_t now = mstime();
606
607 /* Find jobs that are not running and run them, from the top to the
608 * tail of the queue, so we run older jobs first. */
609 listRewind(sentinel.scripts_queue,&li);
610 while (sentinel.running_scripts < SENTINEL_SCRIPT_MAX_RUNNING &&
611 (ln = listNext(&li)) != NULL)
612 {
613 sentinelScriptJob *sj = ln->value;
614 pid_t pid;
615
616 /* Skip if already running. */
617 if (sj->flags & SENTINEL_SCRIPT_RUNNING) continue;
618
619 /* Skip if it's a retry, but not enough time has elapsed. */
620 if (sj->start_time && sj->start_time > now) continue;
621
622 sj->flags |= SENTINEL_SCRIPT_RUNNING;
623 sj->start_time = mstime();
624 sj->retry_num++;
625 pid = fork();
626
627 if (pid == -1) {
628 /* Parent (fork error).
629 * We report fork errors as signal 99, in order to unify the
630 * reporting with other kind of errors. */
631 sentinelEvent(REDIS_WARNING,"-script-error",NULL,
632 "%s %d %d", sj->argv[0], 99, 0);
633 sj->flags &= ~SENTINEL_SCRIPT_RUNNING;
634 sj->pid = 0;
635 } else if (pid == 0) {
636 /* Child */
637 execve(sj->argv[0],sj->argv,environ);
638 /* If we are here an error occurred. */
639 _exit(2); /* Don't retry execution. */
640 } else {
641 sentinel.running_scripts++;
642 sj->pid = pid;
643 sentinelEvent(REDIS_DEBUG,"+script-child",NULL,"%ld",(long)pid);
644 }
645 }
646 }
647
648 /* How much to delay the execution of a script that we need to retry after
649 * an error?
650 *
651 * We double the retry delay for every further retry we do. So for instance
652 * if RETRY_DELAY is set to 30 seconds and the max number of retries is 10
653 * starting from the second attempt to execute the script the delays are:
654 * 30 sec, 60 sec, 2 min, 4 min, 8 min, 16 min, 32 min, 64 min, 128 min. */
655 mstime_t sentinelScriptRetryDelay(int retry_num) {
656 mstime_t delay = SENTINEL_SCRIPT_RETRY_DELAY;
657
658 while (retry_num-- > 1) delay *= 2;
659 return delay;
660 }
661
662 /* Check for scripts that terminated, and remove them from the queue if the
663 * script terminated successfully. If instead the script was terminated by
664 * a signal, or returned exit code "1", it is scheduled to run again if
665 * the max number of retries did not already elapsed. */
666 void sentinelCollectTerminatedScripts(void) {
667 int statloc;
668 pid_t pid;
669
670 while ((pid = wait3(&statloc,WNOHANG,NULL)) > 0) {
671 int exitcode = WEXITSTATUS(statloc);
672 int bysignal = 0;
673 listNode *ln;
674 sentinelScriptJob *sj;
675
676 if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
677 sentinelEvent(REDIS_DEBUG,"-script-child",NULL,"%ld %d %d",
678 (long)pid, exitcode, bysignal);
679
680 ln = sentinelGetScriptListNodeByPid(pid);
681 if (ln == NULL) {
682 redisLog(REDIS_WARNING,"wait3() returned a pid (%ld) we can't find in our scripts execution queue!", (long)pid);
683 continue;
684 }
685 sj = ln->value;
686
687 /* If the script was terminated by a signal or returns an
688 * exit code of "1" (that means: please retry), we reschedule it
689 * if the max number of retries is not already reached. */
690 if ((bysignal || exitcode == 1) &&
691 sj->retry_num != SENTINEL_SCRIPT_MAX_RETRY)
692 {
693 sj->flags &= ~SENTINEL_SCRIPT_RUNNING;
694 sj->pid = 0;
695 sj->start_time = mstime() +
696 sentinelScriptRetryDelay(sj->retry_num);
697 } else {
698 /* Otherwise let's remove the script, but log the event if the
699 * execution did not terminated in the best of the ways. */
700 if (bysignal || exitcode != 0) {
701 sentinelEvent(REDIS_WARNING,"-script-error",NULL,
702 "%s %d %d", sj->argv[0], bysignal, exitcode);
703 }
704 listDelNode(sentinel.scripts_queue,ln);
705 sentinelReleaseScriptJob(sj);
706 sentinel.running_scripts--;
707 }
708 }
709 }
710
711 /* Kill scripts in timeout, they'll be collected by the
712 * sentinelCollectTerminatedScripts() function. */
713 void sentinelKillTimedoutScripts(void) {
714 listNode *ln;
715 listIter li;
716 mstime_t now = mstime();
717
718 listRewind(sentinel.scripts_queue,&li);
719 while ((ln = listNext(&li)) != NULL) {
720 sentinelScriptJob *sj = ln->value;
721
722 if (sj->flags & SENTINEL_SCRIPT_RUNNING &&
723 (now - sj->start_time) > SENTINEL_SCRIPT_MAX_RUNTIME)
724 {
725 sentinelEvent(REDIS_WARNING,"-script-timeout",NULL,"%s %ld",
726 sj->argv[0], (long)sj->pid);
727 kill(sj->pid,SIGKILL);
728 }
729 }
730 }
731
732 /* Implements SENTINEL PENDING-SCRIPTS command. */
733 void sentinelPendingScriptsCommand(redisClient *c) {
734 listNode *ln;
735 listIter li;
736
737 addReplyMultiBulkLen(c,listLength(sentinel.scripts_queue));
738 listRewind(sentinel.scripts_queue,&li);
739 while ((ln = listNext(&li)) != NULL) {
740 sentinelScriptJob *sj = ln->value;
741 int j = 0;
742
743 addReplyMultiBulkLen(c,10);
744
745 addReplyBulkCString(c,"argv");
746 while (sj->argv[j]) j++;
747 addReplyMultiBulkLen(c,j);
748 j = 0;
749 while (sj->argv[j]) addReplyBulkCString(c,sj->argv[j++]);
750
751 addReplyBulkCString(c,"flags");
752 addReplyBulkCString(c,
753 (sj->flags & SENTINEL_SCRIPT_RUNNING) ? "running" : "scheduled");
754
755 addReplyBulkCString(c,"pid");
756 addReplyBulkLongLong(c,sj->pid);
757
758 if (sj->flags & SENTINEL_SCRIPT_RUNNING) {
759 addReplyBulkCString(c,"run-time");
760 addReplyBulkLongLong(c,mstime() - sj->start_time);
761 } else {
762 mstime_t delay = sj->start_time ? (sj->start_time-mstime()) : 0;
763 if (delay < 0) delay = 0;
764 addReplyBulkCString(c,"run-delay");
765 addReplyBulkLongLong(c,delay);
766 }
767
768 addReplyBulkCString(c,"retry-num");
769 addReplyBulkLongLong(c,sj->retry_num);
770 }
771 }
772
773 /* This function calls, if any, the client reconfiguration script with the
774 * following parameters:
775 *
776 * <master-name> <role> <state> <from-ip> <from-port> <to-ip> <to-port>
777 *
778 * It is called every time a failover starts, ends, or is aborted.
779 *
780 * <state> is "start", "end" or "abort".
781 * <role> is either "leader" or "observer".
782 *
783 * from/to fields are respectively master -> promoted slave addresses for
784 * "start" and "end", or the reverse (promoted slave -> master) in case of
785 * "abort".
786 */
787 void sentinelCallClientReconfScript(sentinelRedisInstance *master, int role, char *state, sentinelAddr *from, sentinelAddr *to) {
788 char fromport[32], toport[32];
789
790 if (master->client_reconfig_script == NULL) return;
791 ll2string(fromport,sizeof(fromport),from->port);
792 ll2string(toport,sizeof(toport),to->port);
793 sentinelScheduleScriptExecution(master->client_reconfig_script,
794 master->name,
795 (role == SENTINEL_LEADER) ? "leader" : "observer",
796 state, from->ip, fromport, to->ip, toport, NULL);
797 }
798
799 /* ========================== sentinelRedisInstance ========================= */
800
801 /* Create a redis instance, the following fields must be populated by the
802 * caller if needed:
803 * runid: set to NULL but will be populated once INFO output is received.
804 * info_refresh: is set to 0 to mean that we never received INFO so far.
805 *
806 * If SRI_MASTER is set into initial flags the instance is added to
807 * sentinel.masters table.
808 *
809 * if SRI_SLAVE or SRI_SENTINEL is set then 'master' must be not NULL and the
810 * instance is added into master->slaves or master->sentinels table.
811 *
812 * If the instance is a slave or sentinel, the name parameter is ignored and
813 * is created automatically as hostname:port.
814 *
815 * The function fails if hostname can't be resolved or port is out of range.
816 * When this happens NULL is returned and errno is set accordingly to the
817 * createSentinelAddr() function.
818 *
819 * The function may also fail and return NULL with errno set to EBUSY if
820 * a master or slave with the same name already exists. */
821 sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char *hostname, int port, int quorum, sentinelRedisInstance *master) {
822 sentinelRedisInstance *ri;
823 sentinelAddr *addr;
824 dict *table = NULL;
825 char slavename[128], *sdsname;
826
827 redisAssert(flags & (SRI_MASTER|SRI_SLAVE|SRI_SENTINEL));
828 redisAssert((flags & SRI_MASTER) || master != NULL);
829
830 /* Check address validity. */
831 addr = createSentinelAddr(hostname,port);
832 if (addr == NULL) return NULL;
833
834 /* For slaves and sentinel we use ip:port as name. */
835 if (flags & (SRI_SLAVE|SRI_SENTINEL)) {
836 snprintf(slavename,sizeof(slavename),"%s:%d",hostname,port);
837 name = slavename;
838 }
839
840 /* Make sure the entry is not duplicated. This may happen when the same
841 * name for a master is used multiple times inside the configuration or
842 * if we try to add multiple times a slave or sentinel with same ip/port
843 * to a master. */
844 if (flags & SRI_MASTER) table = sentinel.masters;
845 else if (flags & SRI_SLAVE) table = master->slaves;
846 else if (flags & SRI_SENTINEL) table = master->sentinels;
847 sdsname = sdsnew(name);
848 if (dictFind(table,sdsname)) {
849 sdsfree(sdsname);
850 errno = EBUSY;
851 return NULL;
852 }
853
854 /* Create the instance object. */
855 ri = zmalloc(sizeof(*ri));
856 /* Note that all the instances are started in the disconnected state,
857 * the event loop will take care of connecting them. */
858 ri->flags = flags | SRI_DISCONNECTED;
859 ri->name = sdsname;
860 ri->runid = NULL;
861 ri->addr = addr;
862 ri->cc = NULL;
863 ri->pc = NULL;
864 ri->pending_commands = 0;
865 ri->cc_conn_time = 0;
866 ri->pc_conn_time = 0;
867 ri->pc_last_activity = 0;
868 ri->last_avail_time = mstime();
869 ri->last_pong_time = mstime();
870 ri->last_pub_time = mstime();
871 ri->last_hello_time = mstime();
872 ri->last_master_down_reply_time = mstime();
873 ri->s_down_since_time = 0;
874 ri->o_down_since_time = 0;
875 ri->down_after_period = master ? master->down_after_period :
876 SENTINEL_DOWN_AFTER_PERIOD;
877 ri->master_link_down_time = 0;
878 ri->auth_pass = NULL;
879 ri->slave_priority = SENTINEL_DEFAULT_SLAVE_PRIORITY;
880 ri->slave_reconf_sent_time = 0;
881 ri->slave_master_host = NULL;
882 ri->slave_master_port = 0;
883 ri->slave_master_link_status = SENTINEL_MASTER_LINK_STATUS_DOWN;
884 ri->sentinels = dictCreate(&instancesDictType,NULL);
885 ri->quorum = quorum;
886 ri->parallel_syncs = SENTINEL_DEFAULT_PARALLEL_SYNCS;
887 ri->master = master;
888 ri->slaves = dictCreate(&instancesDictType,NULL);
889 ri->info_refresh = 0;
890
891 /* Failover state. */
892 ri->leader = NULL;
893 ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
894 ri->failover_state_change_time = 0;
895 ri->failover_start_time = 0;
896 ri->failover_timeout = SENTINEL_DEFAULT_FAILOVER_TIMEOUT;
897 ri->promoted_slave = NULL;
898 ri->notification_script = NULL;
899 ri->client_reconfig_script = NULL;
900
901 /* Add into the right table. */
902 dictAdd(table, ri->name, ri);
903 return ri;
904 }
905
906 /* Release this instance and all its slaves, sentinels, hiredis connections.
907 * This function also takes care of unlinking the instance from the main
908 * masters table (if it is a master) or from its master sentinels/slaves table
909 * if it is a slave or sentinel. */
910 void releaseSentinelRedisInstance(sentinelRedisInstance *ri) {
911 /* Release all its slaves or sentinels if any. */
912 dictRelease(ri->sentinels);
913 dictRelease(ri->slaves);
914
915 /* Release hiredis connections. */
916 if (ri->cc) sentinelKillLink(ri,ri->cc);
917 if (ri->pc) sentinelKillLink(ri,ri->pc);
918
919 /* Free other resources. */
920 sdsfree(ri->name);
921 sdsfree(ri->runid);
922 sdsfree(ri->notification_script);
923 sdsfree(ri->client_reconfig_script);
924 sdsfree(ri->slave_master_host);
925 sdsfree(ri->leader);
926 sdsfree(ri->auth_pass);
927 releaseSentinelAddr(ri->addr);
928
929 /* Clear state into the master if needed. */
930 if ((ri->flags & SRI_SLAVE) && (ri->flags & SRI_PROMOTED) && ri->master)
931 ri->master->promoted_slave = NULL;
932
933 zfree(ri);
934 }
935
936 /* Lookup a slave in a master Redis instance, by ip and port. */
937 sentinelRedisInstance *sentinelRedisInstanceLookupSlave(
938 sentinelRedisInstance *ri, char *ip, int port)
939 {
940 sds key;
941 sentinelRedisInstance *slave;
942
943 redisAssert(ri->flags & SRI_MASTER);
944 key = sdscatprintf(sdsempty(),"%s:%d",ip,port);
945 slave = dictFetchValue(ri->slaves,key);
946 sdsfree(key);
947 return slave;
948 }
949
950 /* Return the name of the type of the instance as a string. */
951 const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri) {
952 if (ri->flags & SRI_MASTER) return "master";
953 else if (ri->flags & SRI_SLAVE) return "slave";
954 else if (ri->flags & SRI_SENTINEL) return "sentinel";
955 else return "unknown";
956 }
957
958 /* This function removes all the instances found in the dictionary of instances
959 * 'd', having either:
960 *
961 * 1) The same ip/port as specified.
962 * 2) The same runid.
963 *
964 * "1" and "2" don't need to verify at the same time, just one is enough.
965 * If "runid" is NULL it is not checked.
966 * Similarly if "ip" is NULL it is not checked.
967 *
968 * This function is useful because every time we add a new Sentinel into
969 * a master's Sentinels dictionary, we want to be very sure about not
970 * having duplicated instances for any reason. This is so important because
971 * we use those other sentinels in order to run our quorum protocol to
972 * understand if it's time to proceeed with the fail over.
973 *
974 * Making sure no duplication is possible we greately improve the robustness
975 * of the quorum (otherwise we may end counting the same instance multiple
976 * times for some reason).
977 *
978 * The function returns the number of Sentinels removed. */
979 int removeMatchingSentinelsFromMaster(sentinelRedisInstance *master, char *ip, int port, char *runid) {
980 dictIterator *di;
981 dictEntry *de;
982 int removed = 0;
983
984 di = dictGetSafeIterator(master->sentinels);
985 while((de = dictNext(di)) != NULL) {
986 sentinelRedisInstance *ri = dictGetVal(de);
987
988 if ((ri->runid && runid && strcmp(ri->runid,runid) == 0) ||
989 (ip && strcmp(ri->addr->ip,ip) == 0 && port == ri->addr->port))
990 {
991 dictDelete(master->sentinels,ri->name);
992 removed++;
993 }
994 }
995 dictReleaseIterator(di);
996 return removed;
997 }
998
999 /* Search an instance with the same runid, ip and port into a dictionary
1000 * of instances. Return NULL if not found, otherwise return the instance
1001 * pointer.
1002 *
1003 * runid or ip can be NULL. In such a case the search is performed only
1004 * by the non-NULL field. */
1005 sentinelRedisInstance *getSentinelRedisInstanceByAddrAndRunID(dict *instances, char *ip, int port, char *runid) {
1006 dictIterator *di;
1007 dictEntry *de;
1008 sentinelRedisInstance *instance = NULL;
1009
1010 redisAssert(ip || runid); /* User must pass at least one search param. */
1011 di = dictGetIterator(instances);
1012 while((de = dictNext(di)) != NULL) {
1013 sentinelRedisInstance *ri = dictGetVal(de);
1014
1015 if (runid && !ri->runid) continue;
1016 if ((runid == NULL || strcmp(ri->runid, runid) == 0) &&
1017 (ip == NULL || (strcmp(ri->addr->ip, ip) == 0 &&
1018 ri->addr->port == port)))
1019 {
1020 instance = ri;
1021 break;
1022 }
1023 }
1024 dictReleaseIterator(di);
1025 return instance;
1026 }
1027
1028 /* Simple master lookup by name */
1029 sentinelRedisInstance *sentinelGetMasterByName(char *name) {
1030 sentinelRedisInstance *ri;
1031 sds sdsname = sdsnew(name);
1032
1033 ri = dictFetchValue(sentinel.masters,sdsname);
1034 sdsfree(sdsname);
1035 return ri;
1036 }
1037
1038 /* Add the specified flags to all the instances in the specified dictionary. */
1039 void sentinelAddFlagsToDictOfRedisInstances(dict *instances, int flags) {
1040 dictIterator *di;
1041 dictEntry *de;
1042
1043 di = dictGetIterator(instances);
1044 while((de = dictNext(di)) != NULL) {
1045 sentinelRedisInstance *ri = dictGetVal(de);
1046 ri->flags |= flags;
1047 }
1048 dictReleaseIterator(di);
1049 }
1050
1051 /* Remove the specified flags to all the instances in the specified
1052 * dictionary. */
1053 void sentinelDelFlagsToDictOfRedisInstances(dict *instances, int flags) {
1054 dictIterator *di;
1055 dictEntry *de;
1056
1057 di = dictGetIterator(instances);
1058 while((de = dictNext(di)) != NULL) {
1059 sentinelRedisInstance *ri = dictGetVal(de);
1060 ri->flags &= ~flags;
1061 }
1062 dictReleaseIterator(di);
1063 }
1064
1065 /* Reset the state of a monitored master:
1066 * 1) Remove all slaves.
1067 * 2) Remove all sentinels.
1068 * 3) Remove most of the flags resulting from runtime operations.
1069 * 4) Reset timers to their default value.
1070 * 5) In the process of doing this undo the failover if in progress.
1071 * 6) Disconnect the connections with the master (will reconnect automatically).
1072 */
1073 void sentinelResetMaster(sentinelRedisInstance *ri, int flags) {
1074 redisAssert(ri->flags & SRI_MASTER);
1075 dictRelease(ri->slaves);
1076 dictRelease(ri->sentinels);
1077 ri->slaves = dictCreate(&instancesDictType,NULL);
1078 ri->sentinels = dictCreate(&instancesDictType,NULL);
1079 if (ri->cc) sentinelKillLink(ri,ri->cc);
1080 if (ri->pc) sentinelKillLink(ri,ri->pc);
1081 ri->flags &= SRI_MASTER|SRI_CAN_FAILOVER|SRI_DISCONNECTED;
1082 if (ri->leader) {
1083 sdsfree(ri->leader);
1084 ri->leader = NULL;
1085 }
1086 ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
1087 ri->failover_state_change_time = 0;
1088 ri->failover_start_time = 0;
1089 ri->promoted_slave = NULL;
1090 sdsfree(ri->runid);
1091 sdsfree(ri->slave_master_host);
1092 ri->runid = NULL;
1093 ri->slave_master_host = NULL;
1094 ri->last_avail_time = mstime();
1095 ri->last_pong_time = mstime();
1096 if (flags & SENTINEL_GENERATE_EVENT)
1097 sentinelEvent(REDIS_WARNING,"+reset-master",ri,"%@");
1098 }
1099
1100 /* Call sentinelResetMaster() on every master with a name matching the specified
1101 * pattern. */
1102 int sentinelResetMastersByPattern(char *pattern, int flags) {
1103 dictIterator *di;
1104 dictEntry *de;
1105 int reset = 0;
1106
1107 di = dictGetIterator(sentinel.masters);
1108 while((de = dictNext(di)) != NULL) {
1109 sentinelRedisInstance *ri = dictGetVal(de);
1110
1111 if (ri->name) {
1112 if (stringmatch(pattern,ri->name,0)) {
1113 sentinelResetMaster(ri,flags);
1114 reset++;
1115 }
1116 }
1117 }
1118 dictReleaseIterator(di);
1119 return reset;
1120 }
1121
1122 /* Reset the specified master with sentinelResetMaster(), and also change
1123 * the ip:port address, but take the name of the instance unmodified.
1124 *
1125 * This is used to handle the +switch-master and +redirect-to-master events.
1126 *
1127 * The function returns REDIS_ERR if the address can't be resolved for some
1128 * reason. Otherwise REDIS_OK is returned.
1129 *
1130 * TODO: make this reset so that original sentinels are re-added with
1131 * same ip / port / runid.
1132 */
1133
1134 int sentinelResetMasterAndChangeAddress(sentinelRedisInstance *master, char *ip, int port) {
1135 sentinelAddr *oldaddr, *newaddr;
1136
1137 newaddr = createSentinelAddr(ip,port);
1138 if (newaddr == NULL) return REDIS_ERR;
1139 sentinelResetMaster(master,SENTINEL_NO_FLAGS);
1140 oldaddr = master->addr;
1141 master->addr = newaddr;
1142 /* Release the old address at the end so we are safe even if the function
1143 * gets the master->addr->ip and master->addr->port as arguments. */
1144 releaseSentinelAddr(oldaddr);
1145 return REDIS_OK;
1146 }
1147
1148 /* ============================ Config handling ============================= */
1149 char *sentinelHandleConfiguration(char **argv, int argc) {
1150 sentinelRedisInstance *ri;
1151
1152 if (!strcasecmp(argv[0],"monitor") && argc == 5) {
1153 /* monitor <name> <host> <port> <quorum> */
1154 int quorum = atoi(argv[4]);
1155
1156 if (quorum <= 0) return "Quorum must be 1 or greater.";
1157 if (createSentinelRedisInstance(argv[1],SRI_MASTER,argv[2],
1158 atoi(argv[3]),quorum,NULL) == NULL)
1159 {
1160 switch(errno) {
1161 case EBUSY: return "Duplicated master name.";
1162 case ENOENT: return "Can't resolve master instance hostname.";
1163 case EINVAL: return "Invalid port number";
1164 }
1165 }
1166 } else if (!strcasecmp(argv[0],"down-after-milliseconds") && argc == 3) {
1167 /* down-after-milliseconds <name> <milliseconds> */
1168 ri = sentinelGetMasterByName(argv[1]);
1169 if (!ri) return "No such master with specified name.";
1170 ri->down_after_period = atoi(argv[2]);
1171 if (ri->down_after_period <= 0)
1172 return "negative or zero time parameter.";
1173 } else if (!strcasecmp(argv[0],"failover-timeout") && argc == 3) {
1174 /* failover-timeout <name> <milliseconds> */
1175 ri = sentinelGetMasterByName(argv[1]);
1176 if (!ri) return "No such master with specified name.";
1177 ri->failover_timeout = atoi(argv[2]);
1178 if (ri->failover_timeout <= 0)
1179 return "negative or zero time parameter.";
1180 } else if (!strcasecmp(argv[0],"can-failover") && argc == 3) {
1181 /* can-failover <name> <yes/no> */
1182 int yesno = yesnotoi(argv[2]);
1183
1184 ri = sentinelGetMasterByName(argv[1]);
1185 if (!ri) return "No such master with specified name.";
1186 if (yesno == -1) return "Argument must be either yes or no.";
1187 if (yesno)
1188 ri->flags |= SRI_CAN_FAILOVER;
1189 else
1190 ri->flags &= ~SRI_CAN_FAILOVER;
1191 } else if (!strcasecmp(argv[0],"parallel-syncs") && argc == 3) {
1192 /* parallel-syncs <name> <milliseconds> */
1193 ri = sentinelGetMasterByName(argv[1]);
1194 if (!ri) return "No such master with specified name.";
1195 ri->parallel_syncs = atoi(argv[2]);
1196 } else if (!strcasecmp(argv[0],"notification-script") && argc == 3) {
1197 /* notification-script <name> <path> */
1198 ri = sentinelGetMasterByName(argv[1]);
1199 if (!ri) return "No such master with specified name.";
1200 if (access(argv[2],X_OK) == -1)
1201 return "Notification script seems non existing or non executable.";
1202 ri->notification_script = sdsnew(argv[2]);
1203 } else if (!strcasecmp(argv[0],"client-reconfig-script") && argc == 3) {
1204 /* client-reconfig-script <name> <path> */
1205 ri = sentinelGetMasterByName(argv[1]);
1206 if (!ri) return "No such master with specified name.";
1207 if (access(argv[2],X_OK) == -1)
1208 return "Client reconfiguration script seems non existing or "
1209 "non executable.";
1210 ri->client_reconfig_script = sdsnew(argv[2]);
1211 } else if (!strcasecmp(argv[0],"auth-pass") && argc == 3) {
1212 /* auth-pass <name> <password> */
1213 ri = sentinelGetMasterByName(argv[1]);
1214 if (!ri) return "No such master with specified name.";
1215 ri->auth_pass = sdsnew(argv[2]);
1216 } else {
1217 return "Unrecognized sentinel configuration statement.";
1218 }
1219 return NULL;
1220 }
1221
1222 /* ====================== hiredis connection handling ======================= */
1223
1224 /* Completely disconnect an hiredis link from an instance. */
1225 void sentinelKillLink(sentinelRedisInstance *ri, redisAsyncContext *c) {
1226 if (ri->cc == c) {
1227 ri->cc = NULL;
1228 ri->pending_commands = 0;
1229 }
1230 if (ri->pc == c) ri->pc = NULL;
1231 c->data = NULL;
1232 ri->flags |= SRI_DISCONNECTED;
1233 redisAsyncFree(c);
1234 }
1235
1236 /* This function takes an hiredis context that is in an error condition
1237 * and make sure to mark the instance as disconnected performing the
1238 * cleanup needed.
1239 *
1240 * Note: we don't free the hiredis context as hiredis will do it for us
1241 * for async conenctions. */
1242 void sentinelDisconnectInstanceFromContext(const redisAsyncContext *c) {
1243 sentinelRedisInstance *ri = c->data;
1244 int pubsub;
1245
1246 if (ri == NULL) return; /* The instance no longer exists. */
1247
1248 pubsub = (ri->pc == c);
1249 sentinelEvent(REDIS_DEBUG, pubsub ? "-pubsub-link" : "-cmd-link", ri,
1250 "%@ #%s", c->errstr);
1251 if (pubsub)
1252 ri->pc = NULL;
1253 else
1254 ri->cc = NULL;
1255 ri->flags |= SRI_DISCONNECTED;
1256 }
1257
1258 void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status) {
1259 if (status != REDIS_OK) {
1260 sentinelDisconnectInstanceFromContext(c);
1261 } else {
1262 sentinelRedisInstance *ri = c->data;
1263 int pubsub = (ri->pc == c);
1264
1265 sentinelEvent(REDIS_DEBUG, pubsub ? "+pubsub-link" : "+cmd-link", ri,
1266 "%@");
1267 }
1268 }
1269
1270 void sentinelDisconnectCallback(const redisAsyncContext *c, int status) {
1271 sentinelDisconnectInstanceFromContext(c);
1272 }
1273
1274 /* Send the AUTH command with the specified master password if needed.
1275 * Note that for slaves the password set for the master is used.
1276 *
1277 * We don't check at all if the command was successfully transmitted
1278 * to the instance as if it fails Sentinel will detect the instance down,
1279 * will disconnect and reconnect the link and so forth. */
1280 void sentinelSendAuthIfNeeded(sentinelRedisInstance *ri, redisAsyncContext *c) {
1281 char *auth_pass = (ri->flags & SRI_MASTER) ? ri->auth_pass :
1282 ri->master->auth_pass;
1283
1284 if (auth_pass)
1285 redisAsyncCommand(c, sentinelDiscardReplyCallback, NULL, "AUTH %s",
1286 auth_pass);
1287 }
1288
1289 /* Create the async connections for the specified instance if the instance
1290 * is disconnected. Note that the SRI_DISCONNECTED flag is set even if just
1291 * one of the two links (commands and pub/sub) is missing. */
1292 void sentinelReconnectInstance(sentinelRedisInstance *ri) {
1293 if (!(ri->flags & SRI_DISCONNECTED)) return;
1294
1295 /* Commands connection. */
1296 if (ri->cc == NULL) {
1297 ri->cc = redisAsyncConnect(ri->addr->ip,ri->addr->port);
1298 if (ri->cc->err) {
1299 sentinelEvent(REDIS_DEBUG,"-cmd-link-reconnection",ri,"%@ #%s",
1300 ri->cc->errstr);
1301 sentinelKillLink(ri,ri->cc);
1302 } else {
1303 ri->cc_conn_time = mstime();
1304 ri->cc->data = ri;
1305 redisAeAttach(server.el,ri->cc);
1306 redisAsyncSetConnectCallback(ri->cc,
1307 sentinelLinkEstablishedCallback);
1308 redisAsyncSetDisconnectCallback(ri->cc,
1309 sentinelDisconnectCallback);
1310 sentinelSendAuthIfNeeded(ri,ri->cc);
1311 }
1312 }
1313 /* Pub / Sub */
1314 if ((ri->flags & SRI_MASTER) && ri->pc == NULL) {
1315 ri->pc = redisAsyncConnect(ri->addr->ip,ri->addr->port);
1316 if (ri->pc->err) {
1317 sentinelEvent(REDIS_DEBUG,"-pubsub-link-reconnection",ri,"%@ #%s",
1318 ri->pc->errstr);
1319 sentinelKillLink(ri,ri->pc);
1320 } else {
1321 int retval;
1322
1323 ri->pc_conn_time = mstime();
1324 ri->pc->data = ri;
1325 redisAeAttach(server.el,ri->pc);
1326 redisAsyncSetConnectCallback(ri->pc,
1327 sentinelLinkEstablishedCallback);
1328 redisAsyncSetDisconnectCallback(ri->pc,
1329 sentinelDisconnectCallback);
1330 sentinelSendAuthIfNeeded(ri,ri->pc);
1331 /* Now we subscribe to the Sentinels "Hello" channel. */
1332 retval = redisAsyncCommand(ri->pc,
1333 sentinelReceiveHelloMessages, NULL, "SUBSCRIBE %s",
1334 SENTINEL_HELLO_CHANNEL);
1335 if (retval != REDIS_OK) {
1336 /* If we can't subscribe, the Pub/Sub connection is useless
1337 * and we can simply disconnect it and try again. */
1338 sentinelKillLink(ri,ri->pc);
1339 return;
1340 }
1341 }
1342 }
1343 /* Clear the DISCONNECTED flags only if we have both the connections
1344 * (or just the commands connection if this is a slave or a
1345 * sentinel instance). */
1346 if (ri->cc && (ri->flags & (SRI_SLAVE|SRI_SENTINEL) || ri->pc))
1347 ri->flags &= ~SRI_DISCONNECTED;
1348 }
1349
1350 /* ======================== Redis instances pinging ======================== */
1351
1352 /* Process the INFO output from masters. */
1353 void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
1354 sds *lines;
1355 int numlines, j;
1356 int role = 0;
1357 int runid_changed = 0; /* true if runid changed. */
1358 int first_runid = 0; /* true if this is the first runid we receive. */
1359
1360 /* The following fields must be reset to a given value in the case they
1361 * are not found at all in the INFO output. */
1362 ri->master_link_down_time = 0;
1363
1364 /* Process line by line. */
1365 lines = sdssplitlen(info,strlen(info),"\r\n",2,&numlines);
1366 for (j = 0; j < numlines; j++) {
1367 sentinelRedisInstance *slave;
1368 sds l = lines[j];
1369
1370 /* run_id:<40 hex chars>*/
1371 if (sdslen(l) >= 47 && !memcmp(l,"run_id:",7)) {
1372 if (ri->runid == NULL) {
1373 ri->runid = sdsnewlen(l+7,40);
1374 first_runid = 1;
1375 } else {
1376 if (strncmp(ri->runid,l+7,40) != 0) {
1377 runid_changed = 1;
1378 sentinelEvent(REDIS_NOTICE,"+reboot",ri,"%@");
1379 sdsfree(ri->runid);
1380 ri->runid = sdsnewlen(l+7,40);
1381 }
1382 }
1383 }
1384
1385 /* slave0:<ip>,<port>,<state> */
1386 if ((ri->flags & SRI_MASTER) &&
1387 sdslen(l) >= 7 &&
1388 !memcmp(l,"slave",5) && isdigit(l[5]))
1389 {
1390 char *ip, *port, *end;
1391
1392 ip = strchr(l,':'); if (!ip) continue;
1393 ip++; /* Now ip points to start of ip address. */
1394 port = strchr(ip,','); if (!port) continue;
1395 *port = '\0'; /* nul term for easy access. */
1396 port++; /* Now port points to start of port number. */
1397 end = strchr(port,','); if (!end) continue;
1398 *end = '\0'; /* nul term for easy access. */
1399
1400 /* Check if we already have this slave into our table,
1401 * otherwise add it. */
1402 if (sentinelRedisInstanceLookupSlave(ri,ip,atoi(port)) == NULL) {
1403 if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,ip,
1404 atoi(port), ri->quorum,ri)) != NULL)
1405 {
1406 sentinelEvent(REDIS_NOTICE,"+slave",slave,"%@");
1407 }
1408 }
1409 }
1410
1411 /* master_link_down_since_seconds:<seconds> */
1412 if (sdslen(l) >= 32 &&
1413 !memcmp(l,"master_link_down_since_seconds",30))
1414 {
1415 ri->master_link_down_time = strtoll(l+31,NULL,10)*1000;
1416 }
1417
1418 /* role:<role> */
1419 if (!memcmp(l,"role:master",11)) role = SRI_MASTER;
1420 else if (!memcmp(l,"role:slave",10)) role = SRI_SLAVE;
1421
1422 if (role == SRI_SLAVE) {
1423 /* master_host:<host> */
1424 if (sdslen(l) >= 12 && !memcmp(l,"master_host:",12)) {
1425 sdsfree(ri->slave_master_host);
1426 ri->slave_master_host = sdsnew(l+12);
1427 }
1428
1429 /* master_port:<port> */
1430 if (sdslen(l) >= 12 && !memcmp(l,"master_port:",12))
1431 ri->slave_master_port = atoi(l+12);
1432
1433 /* master_link_status:<status> */
1434 if (sdslen(l) >= 19 && !memcmp(l,"master_link_status:",19)) {
1435 ri->slave_master_link_status =
1436 (strcasecmp(l+19,"up") == 0) ?
1437 SENTINEL_MASTER_LINK_STATUS_UP :
1438 SENTINEL_MASTER_LINK_STATUS_DOWN;
1439 }
1440
1441 /* slave_priority:<priority> */
1442 if (sdslen(l) >= 15 && !memcmp(l,"slave_priority:",15))
1443 ri->slave_priority = atoi(l+15);
1444 }
1445 }
1446 ri->info_refresh = mstime();
1447 sdsfreesplitres(lines,numlines);
1448
1449 /* ---------------------------- Acting half ----------------------------- */
1450 if (sentinel.tilt) return;
1451
1452 /* Act if a master turned into a slave. */
1453 if ((ri->flags & SRI_MASTER) && role == SRI_SLAVE) {
1454 if ((first_runid || runid_changed) && ri->slave_master_host) {
1455 /* If it is the first time we receive INFO from it, but it's
1456 * a slave while it was configured as a master, we want to monitor
1457 * its master instead. */
1458 sentinelEvent(REDIS_WARNING,"+redirect-to-master",ri,
1459 "%s %s %d %s %d",
1460 ri->name, ri->addr->ip, ri->addr->port,
1461 ri->slave_master_host, ri->slave_master_port);
1462 sentinelResetMasterAndChangeAddress(ri,ri->slave_master_host,
1463 ri->slave_master_port);
1464 return;
1465 }
1466 }
1467
1468 /* Act if a slave turned into a master. */
1469 if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) {
1470 if (!(ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
1471 (runid_changed || first_runid))
1472 {
1473 /* If a slave turned into master but:
1474 *
1475 * 1) Failover not in progress.
1476 * 2) RunID hs changed, or its the first time we see an INFO output.
1477 *
1478 * We assume this is a reboot with a wrong configuration.
1479 * Log the event and remove the slave. */
1480 int retval;
1481
1482 sentinelEvent(REDIS_WARNING,"-slave-restart-as-master",ri,"%@ #removing it from the attached slaves");
1483 retval = dictDelete(ri->master->slaves,ri->name);
1484 redisAssert(retval == REDIS_OK);
1485 return;
1486 } else if (ri->flags & SRI_PROMOTED) {
1487 /* If this is a promoted slave we can change state to the
1488 * failover state machine. */
1489 if ((ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
1490 (ri->master->flags & SRI_I_AM_THE_LEADER) &&
1491 (ri->master->failover_state ==
1492 SENTINEL_FAILOVER_STATE_WAIT_PROMOTION))
1493 {
1494 ri->master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
1495 ri->master->failover_state_change_time = mstime();
1496 sentinelEvent(REDIS_WARNING,"+promoted-slave",ri,"%@");
1497 sentinelEvent(REDIS_WARNING,"+failover-state-reconf-slaves",
1498 ri->master,"%@");
1499 sentinelCallClientReconfScript(ri->master,SENTINEL_LEADER,
1500 "start",ri->master->addr,ri->addr);
1501 }
1502 } else if (!(ri->master->flags & SRI_FAILOVER_IN_PROGRESS) ||
1503 ((ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
1504 (ri->master->flags & SRI_I_AM_THE_LEADER) &&
1505 ri->master->failover_state ==
1506 SENTINEL_FAILOVER_STATE_WAIT_START))
1507 {
1508 /* No failover in progress? Then it is the start of a failover
1509 * and we are an observer.
1510 *
1511 * We also do that if we are a leader doing a failover, in wait
1512 * start, but well, somebody else started before us. */
1513
1514 if (ri->master->flags & SRI_FAILOVER_IN_PROGRESS) {
1515 sentinelEvent(REDIS_WARNING,"-failover-abort-race",
1516 ri->master, "%@");
1517 sentinelAbortFailover(ri->master);
1518 }
1519
1520 ri->master->flags |= SRI_FAILOVER_IN_PROGRESS;
1521 sentinelEvent(REDIS_WARNING,"+failover-detected",ri->master,"%@");
1522 ri->master->failover_state = SENTINEL_FAILOVER_STATE_DETECT_END;
1523 ri->master->failover_state_change_time = mstime();
1524 ri->master->promoted_slave = ri;
1525 ri->flags |= SRI_PROMOTED;
1526 sentinelCallClientReconfScript(ri->master,SENTINEL_OBSERVER,
1527 "start", ri->master->addr,ri->addr);
1528 /* We are an observer, so we can only assume that the leader
1529 * is reconfiguring the slave instances. For this reason we
1530 * set all the instances as RECONF_SENT waiting for progresses
1531 * on this side. */
1532 sentinelAddFlagsToDictOfRedisInstances(ri->master->slaves,
1533 SRI_RECONF_SENT);
1534 }
1535 }
1536
1537 /* Detect if the slave that is in the process of being reconfigured
1538 * changed state. */
1539 if ((ri->flags & SRI_SLAVE) && role == SRI_SLAVE &&
1540 (ri->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)))
1541 {
1542 /* SRI_RECONF_SENT -> SRI_RECONF_INPROG. */
1543 if ((ri->flags & SRI_RECONF_SENT) &&
1544 ri->slave_master_host &&
1545 strcmp(ri->slave_master_host,
1546 ri->master->promoted_slave->addr->ip) == 0 &&
1547 ri->slave_master_port == ri->master->promoted_slave->addr->port)
1548 {
1549 ri->flags &= ~SRI_RECONF_SENT;
1550 ri->flags |= SRI_RECONF_INPROG;
1551 sentinelEvent(REDIS_NOTICE,"+slave-reconf-inprog",ri,"%@");
1552 }
1553
1554 /* SRI_RECONF_INPROG -> SRI_RECONF_DONE */
1555 if ((ri->flags & SRI_RECONF_INPROG) &&
1556 ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP)
1557 {
1558 ri->flags &= ~SRI_RECONF_INPROG;
1559 ri->flags |= SRI_RECONF_DONE;
1560 sentinelEvent(REDIS_NOTICE,"+slave-reconf-done",ri,"%@");
1561 /* If we are moving forward (a new slave is now configured)
1562 * we update the change_time as we are conceptually passing
1563 * to the next slave. */
1564 ri->failover_state_change_time = mstime();
1565 }
1566 }
1567 }
1568
1569 void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
1570 sentinelRedisInstance *ri = c->data;
1571 redisReply *r;
1572
1573 if (ri) ri->pending_commands--;
1574 if (!reply || !ri) return;
1575 r = reply;
1576
1577 if (r->type == REDIS_REPLY_STRING) {
1578 sentinelRefreshInstanceInfo(ri,r->str);
1579 }
1580 }
1581
1582 /* Just discard the reply. We use this when we are not monitoring the return
1583 * value of the command but its effects directly. */
1584 void sentinelDiscardReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
1585 sentinelRedisInstance *ri = c->data;
1586
1587 if (ri) ri->pending_commands--;
1588 }
1589
1590 void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
1591 sentinelRedisInstance *ri = c->data;
1592 redisReply *r;
1593
1594 if (ri) ri->pending_commands--;
1595 if (!reply || !ri) return;
1596 r = reply;
1597
1598 if (r->type == REDIS_REPLY_STATUS ||
1599 r->type == REDIS_REPLY_ERROR) {
1600 /* Update the "instance available" field only if this is an
1601 * acceptable reply. */
1602 if (strncmp(r->str,"PONG",4) == 0 ||
1603 strncmp(r->str,"LOADING",7) == 0 ||
1604 strncmp(r->str,"MASTERDOWN",10) == 0)
1605 {
1606 ri->last_avail_time = mstime();
1607 } else {
1608 /* Send a SCRIPT KILL command if the instance appears to be
1609 * down because of a busy script. */
1610 if (strncmp(r->str,"BUSY",4) == 0 &&
1611 (ri->flags & SRI_S_DOWN) &&
1612 !(ri->flags & SRI_SCRIPT_KILL_SENT))
1613 {
1614 redisAsyncCommand(ri->cc,
1615 sentinelDiscardReplyCallback, NULL, "SCRIPT KILL");
1616 ri->flags |= SRI_SCRIPT_KILL_SENT;
1617 }
1618 }
1619 }
1620 ri->last_pong_time = mstime();
1621 }
1622
1623 /* This is called when we get the reply about the PUBLISH command we send
1624 * to the master to advertise this sentinel. */
1625 void sentinelPublishReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
1626 sentinelRedisInstance *ri = c->data;
1627 redisReply *r;
1628
1629 if (ri) ri->pending_commands--;
1630 if (!reply || !ri) return;
1631 r = reply;
1632
1633 /* Only update pub_time if we actually published our message. Otherwise
1634 * we'll retry against in 100 milliseconds. */
1635 if (r->type != REDIS_REPLY_ERROR)
1636 ri->last_pub_time = mstime();
1637 }
1638
1639 /* This is our Pub/Sub callback for the Hello channel. It's useful in order
1640 * to discover other sentinels attached at the same master. */
1641 void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata) {
1642 sentinelRedisInstance *ri = c->data;
1643 redisReply *r;
1644
1645 if (!reply || !ri) return;
1646 r = reply;
1647
1648 /* Update the last activity in the pubsub channel. Note that since we
1649 * receive our messages as well this timestamp can be used to detect
1650 * if the link is probably diconnected even if it seems otherwise. */
1651 ri->pc_last_activity = mstime();
1652
1653 /* Sanity check in the reply we expect, so that the code that follows
1654 * can avoid to check for details. */
1655 if (r->type != REDIS_REPLY_ARRAY ||
1656 r->elements != 3 ||
1657 r->element[0]->type != REDIS_REPLY_STRING ||
1658 r->element[1]->type != REDIS_REPLY_STRING ||
1659 r->element[2]->type != REDIS_REPLY_STRING ||
1660 strcmp(r->element[0]->str,"message") != 0) return;
1661
1662 /* We are not interested in meeting ourselves */
1663 if (strstr(r->element[2]->str,server.runid) != NULL) return;
1664
1665 {
1666 int numtokens, port, removed, canfailover;
1667 char **token = sdssplitlen(r->element[2]->str,
1668 r->element[2]->len,
1669 ":",1,&numtokens);
1670 sentinelRedisInstance *sentinel;
1671
1672 if (numtokens == 4) {
1673 /* First, try to see if we already have this sentinel. */
1674 port = atoi(token[1]);
1675 canfailover = atoi(token[3]);
1676 sentinel = getSentinelRedisInstanceByAddrAndRunID(
1677 ri->sentinels,token[0],port,token[2]);
1678
1679 if (!sentinel) {
1680 /* If not, remove all the sentinels that have the same runid
1681 * OR the same ip/port, because it's either a restart or a
1682 * network topology change. */
1683 removed = removeMatchingSentinelsFromMaster(ri,token[0],port,
1684 token[2]);
1685 if (removed) {
1686 sentinelEvent(REDIS_NOTICE,"-dup-sentinel",ri,
1687 "%@ #duplicate of %s:%d or %s",
1688 token[0],port,token[2]);
1689 }
1690
1691 /* Add the new sentinel. */
1692 sentinel = createSentinelRedisInstance(NULL,SRI_SENTINEL,
1693 token[0],port,ri->quorum,ri);
1694 if (sentinel) {
1695 sentinelEvent(REDIS_NOTICE,"+sentinel",sentinel,"%@");
1696 /* The runid is NULL after a new instance creation and
1697 * for Sentinels we don't have a later chance to fill it,
1698 * so do it now. */
1699 sentinel->runid = sdsnew(token[2]);
1700 }
1701 }
1702
1703 /* Update the state of the Sentinel. */
1704 if (sentinel) {
1705 sentinel->last_hello_time = mstime();
1706 if (canfailover)
1707 sentinel->flags |= SRI_CAN_FAILOVER;
1708 else
1709 sentinel->flags &= ~SRI_CAN_FAILOVER;
1710 }
1711 }
1712 sdsfreesplitres(token,numtokens);
1713 }
1714 }
1715
1716 void sentinelPingInstance(sentinelRedisInstance *ri) {
1717 mstime_t now = mstime();
1718 mstime_t info_period;
1719 int retval;
1720
1721 /* Return ASAP if we have already a PING or INFO already pending, or
1722 * in the case the instance is not properly connected. */
1723 if (ri->flags & SRI_DISCONNECTED) return;
1724
1725 /* For INFO, PING, PUBLISH that are not critical commands to send we
1726 * also have a limit of SENTINEL_MAX_PENDING_COMMANDS. We don't
1727 * want to use a lot of memory just because a link is not working
1728 * properly (note that anyway there is a redundant protection about this,
1729 * that is, the link will be disconnected and reconnected if a long
1730 * timeout condition is detected. */
1731 if (ri->pending_commands >= SENTINEL_MAX_PENDING_COMMANDS) return;
1732
1733 /* If this is a slave of a master in O_DOWN condition we start sending
1734 * it INFO every second, instead of the usual SENTINEL_INFO_PERIOD
1735 * period. In this state we want to closely monitor slaves in case they
1736 * are turned into masters by another Sentinel, or by the sysadmin. */
1737 if ((ri->flags & SRI_SLAVE) &&
1738 (ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) {
1739 info_period = 1000;
1740 } else {
1741 info_period = SENTINEL_INFO_PERIOD;
1742 }
1743
1744 if ((ri->flags & SRI_SENTINEL) == 0 &&
1745 (ri->info_refresh == 0 ||
1746 (now - ri->info_refresh) > info_period))
1747 {
1748 /* Send INFO to masters and slaves, not sentinels. */
1749 retval = redisAsyncCommand(ri->cc,
1750 sentinelInfoReplyCallback, NULL, "INFO");
1751 if (retval != REDIS_OK) return;
1752 ri->pending_commands++;
1753 } else if ((now - ri->last_pong_time) > SENTINEL_PING_PERIOD) {
1754 /* Send PING to all the three kinds of instances. */
1755 retval = redisAsyncCommand(ri->cc,
1756 sentinelPingReplyCallback, NULL, "PING");
1757 if (retval != REDIS_OK) return;
1758 ri->pending_commands++;
1759 } else if ((ri->flags & SRI_MASTER) &&
1760 (now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD)
1761 {
1762 /* PUBLISH hello messages only to masters. */
1763 struct sockaddr_in sa;
1764 socklen_t salen = sizeof(sa);
1765
1766 if (getsockname(ri->cc->c.fd,(struct sockaddr*)&sa,&salen) != -1) {
1767 char myaddr[128];
1768
1769 snprintf(myaddr,sizeof(myaddr),"%s:%d:%s:%d",
1770 inet_ntoa(sa.sin_addr), server.port, server.runid,
1771 (ri->flags & SRI_CAN_FAILOVER) != 0);
1772 retval = redisAsyncCommand(ri->cc,
1773 sentinelPublishReplyCallback, NULL, "PUBLISH %s %s",
1774 SENTINEL_HELLO_CHANNEL,myaddr);
1775 if (retval != REDIS_OK) return;
1776 ri->pending_commands++;
1777 }
1778 }
1779 }
1780
1781 /* =========================== SENTINEL command ============================= */
1782
1783 const char *sentinelFailoverStateStr(int state) {
1784 switch(state) {
1785 case SENTINEL_FAILOVER_STATE_NONE: return "none";
1786 case SENTINEL_FAILOVER_STATE_WAIT_START: return "wait_start";
1787 case SENTINEL_FAILOVER_STATE_SELECT_SLAVE: return "select_slave";
1788 case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE: return "send_slaveof_noone";
1789 case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION: return "wait_promotion";
1790 case SENTINEL_FAILOVER_STATE_RECONF_SLAVES: return "reconf_slaves";
1791 case SENTINEL_FAILOVER_STATE_ALERT_CLIENTS: return "alert_clients";
1792 case SENTINEL_FAILOVER_STATE_DETECT_END: return "detect_end";
1793 case SENTINEL_FAILOVER_STATE_UPDATE_CONFIG: return "update_config";
1794 default: return "unknown";
1795 }
1796 }
1797
1798 /* Redis instance to Redis protocol representation. */
1799 void addReplySentinelRedisInstance(redisClient *c, sentinelRedisInstance *ri) {
1800 char *flags = sdsempty();
1801 void *mbl;
1802 int fields = 0;
1803
1804 mbl = addDeferredMultiBulkLength(c);
1805
1806 addReplyBulkCString(c,"name");
1807 addReplyBulkCString(c,ri->name);
1808 fields++;
1809
1810 addReplyBulkCString(c,"ip");
1811 addReplyBulkCString(c,ri->addr->ip);
1812 fields++;
1813
1814 addReplyBulkCString(c,"port");
1815 addReplyBulkLongLong(c,ri->addr->port);
1816 fields++;
1817
1818 addReplyBulkCString(c,"runid");
1819 addReplyBulkCString(c,ri->runid ? ri->runid : "");
1820 fields++;
1821
1822 addReplyBulkCString(c,"flags");
1823 if (ri->flags & SRI_S_DOWN) flags = sdscat(flags,"s_down,");
1824 if (ri->flags & SRI_O_DOWN) flags = sdscat(flags,"o_down,");
1825 if (ri->flags & SRI_MASTER) flags = sdscat(flags,"master,");
1826 if (ri->flags & SRI_SLAVE) flags = sdscat(flags,"slave,");
1827 if (ri->flags & SRI_SENTINEL) flags = sdscat(flags,"sentinel,");
1828 if (ri->flags & SRI_DISCONNECTED) flags = sdscat(flags,"disconnected,");
1829 if (ri->flags & SRI_MASTER_DOWN) flags = sdscat(flags,"master_down,");
1830 if (ri->flags & SRI_FAILOVER_IN_PROGRESS)
1831 flags = sdscat(flags,"failover_in_progress,");
1832 if (ri->flags & SRI_I_AM_THE_LEADER)
1833 flags = sdscat(flags,"i_am_the_leader,");
1834 if (ri->flags & SRI_PROMOTED) flags = sdscat(flags,"promoted,");
1835 if (ri->flags & SRI_RECONF_SENT) flags = sdscat(flags,"reconf_sent,");
1836 if (ri->flags & SRI_RECONF_INPROG) flags = sdscat(flags,"reconf_inprog,");
1837 if (ri->flags & SRI_RECONF_DONE) flags = sdscat(flags,"reconf_done,");
1838
1839 if (sdslen(flags) != 0) flags = sdsrange(flags,0,-2); /* remove last "," */
1840 addReplyBulkCString(c,flags);
1841 sdsfree(flags);
1842 fields++;
1843
1844 addReplyBulkCString(c,"pending-commands");
1845 addReplyBulkLongLong(c,ri->pending_commands);
1846 fields++;
1847
1848 if (ri->flags & SRI_FAILOVER_IN_PROGRESS) {
1849 addReplyBulkCString(c,"failover-state");
1850 addReplyBulkCString(c,(char*)sentinelFailoverStateStr(ri->failover_state));
1851 fields++;
1852 }
1853
1854 addReplyBulkCString(c,"last-ok-ping-reply");
1855 addReplyBulkLongLong(c,mstime() - ri->last_avail_time);
1856 fields++;
1857
1858 addReplyBulkCString(c,"last-ping-reply");
1859 addReplyBulkLongLong(c,mstime() - ri->last_pong_time);
1860 fields++;
1861
1862 if (ri->flags & SRI_S_DOWN) {
1863 addReplyBulkCString(c,"s-down-time");
1864 addReplyBulkLongLong(c,mstime()-ri->s_down_since_time);
1865 fields++;
1866 }
1867
1868 if (ri->flags & SRI_O_DOWN) {
1869 addReplyBulkCString(c,"o-down-time");
1870 addReplyBulkLongLong(c,mstime()-ri->o_down_since_time);
1871 fields++;
1872 }
1873
1874 /* Masters and Slaves */
1875 if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
1876 addReplyBulkCString(c,"info-refresh");
1877 addReplyBulkLongLong(c,mstime() - ri->info_refresh);
1878 fields++;
1879 }
1880
1881 /* Only masters */
1882 if (ri->flags & SRI_MASTER) {
1883 addReplyBulkCString(c,"num-slaves");
1884 addReplyBulkLongLong(c,dictSize(ri->slaves));
1885 fields++;
1886
1887 addReplyBulkCString(c,"num-other-sentinels");
1888 addReplyBulkLongLong(c,dictSize(ri->sentinels));
1889 fields++;
1890
1891 addReplyBulkCString(c,"quorum");
1892 addReplyBulkLongLong(c,ri->quorum);
1893 fields++;
1894 }
1895
1896 /* Only slaves */
1897 if (ri->flags & SRI_SLAVE) {
1898 addReplyBulkCString(c,"master-link-down-time");
1899 addReplyBulkLongLong(c,ri->master_link_down_time);
1900 fields++;
1901
1902 addReplyBulkCString(c,"master-link-status");
1903 addReplyBulkCString(c,
1904 (ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP) ?
1905 "ok" : "err");
1906 fields++;
1907
1908 addReplyBulkCString(c,"master-host");
1909 addReplyBulkCString(c,
1910 ri->slave_master_host ? ri->slave_master_host : "?");
1911 fields++;
1912
1913 addReplyBulkCString(c,"master-port");
1914 addReplyBulkLongLong(c,ri->slave_master_port);
1915 fields++;
1916
1917 addReplyBulkCString(c,"slave-priority");
1918 addReplyBulkLongLong(c,ri->slave_priority);
1919 fields++;
1920 }
1921
1922 /* Only sentinels */
1923 if (ri->flags & SRI_SENTINEL) {
1924 addReplyBulkCString(c,"last-hello-message");
1925 addReplyBulkLongLong(c,mstime() - ri->last_hello_time);
1926 fields++;
1927
1928 addReplyBulkCString(c,"can-failover-its-master");
1929 addReplyBulkLongLong(c,(ri->flags & SRI_CAN_FAILOVER) != 0);
1930 fields++;
1931
1932 if (ri->flags & SRI_MASTER_DOWN) {
1933 addReplyBulkCString(c,"subjective-leader");
1934 addReplyBulkCString(c,ri->leader ? ri->leader : "?");
1935 fields++;
1936 }
1937 }
1938
1939 setDeferredMultiBulkLength(c,mbl,fields*2);
1940 }
1941
1942 /* Output a number of instances contanined inside a dictionary as
1943 * Redis protocol. */
1944 void addReplyDictOfRedisInstances(redisClient *c, dict *instances) {
1945 dictIterator *di;
1946 dictEntry *de;
1947
1948 di = dictGetIterator(instances);
1949 addReplyMultiBulkLen(c,dictSize(instances));
1950 while((de = dictNext(di)) != NULL) {
1951 sentinelRedisInstance *ri = dictGetVal(de);
1952
1953 addReplySentinelRedisInstance(c,ri);
1954 }
1955 dictReleaseIterator(di);
1956 }
1957
1958 /* Lookup the named master into sentinel.masters.
1959 * If the master is not found reply to the client with an error and returns
1960 * NULL. */
1961 sentinelRedisInstance *sentinelGetMasterByNameOrReplyError(redisClient *c,
1962 robj *name)
1963 {
1964 sentinelRedisInstance *ri;
1965
1966 ri = dictFetchValue(sentinel.masters,c->argv[2]->ptr);
1967 if (!ri) {
1968 addReplyError(c,"No such master with that name");
1969 return NULL;
1970 }
1971 return ri;
1972 }
1973
1974 void sentinelCommand(redisClient *c) {
1975 if (!strcasecmp(c->argv[1]->ptr,"masters")) {
1976 /* SENTINEL MASTERS */
1977 if (c->argc != 2) goto numargserr;
1978
1979 addReplyDictOfRedisInstances(c,sentinel.masters);
1980 } else if (!strcasecmp(c->argv[1]->ptr,"slaves")) {
1981 /* SENTINEL SLAVES <master-name> */
1982 sentinelRedisInstance *ri;
1983
1984 if (c->argc != 3) goto numargserr;
1985 if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
1986 return;
1987 addReplyDictOfRedisInstances(c,ri->slaves);
1988 } else if (!strcasecmp(c->argv[1]->ptr,"sentinels")) {
1989 /* SENTINEL SENTINELS <master-name> */
1990 sentinelRedisInstance *ri;
1991
1992 if (c->argc != 3) goto numargserr;
1993 if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
1994 return;
1995 addReplyDictOfRedisInstances(c,ri->sentinels);
1996 } else if (!strcasecmp(c->argv[1]->ptr,"is-master-down-by-addr")) {
1997 /* SENTINEL IS-MASTER-DOWN-BY-ADDR <ip> <port> */
1998 sentinelRedisInstance *ri;
1999 char *leader = NULL;
2000 long port;
2001 int isdown = 0;
2002
2003 if (c->argc != 4) goto numargserr;
2004 if (getLongFromObjectOrReply(c,c->argv[3],&port,NULL) != REDIS_OK)
2005 return;
2006 ri = getSentinelRedisInstanceByAddrAndRunID(sentinel.masters,
2007 c->argv[2]->ptr,port,NULL);
2008
2009 /* It exists? Is actually a master? Is subjectively down? It's down.
2010 * Note: if we are in tilt mode we always reply with "0". */
2011 if (!sentinel.tilt && ri && (ri->flags & SRI_S_DOWN) &&
2012 (ri->flags & SRI_MASTER))
2013 isdown = 1;
2014 if (ri) leader = sentinelGetSubjectiveLeader(ri);
2015
2016 /* Reply with a two-elements multi-bulk reply: down state, leader. */
2017 addReplyMultiBulkLen(c,2);
2018 addReply(c, isdown ? shared.cone : shared.czero);
2019 addReplyBulkCString(c, leader ? leader : "?");
2020 if (leader) sdsfree(leader);
2021 } else if (!strcasecmp(c->argv[1]->ptr,"reset")) {
2022 /* SENTINEL RESET <pattern> */
2023 if (c->argc != 3) goto numargserr;
2024 addReplyLongLong(c,sentinelResetMastersByPattern(c->argv[2]->ptr,SENTINEL_GENERATE_EVENT));
2025 } else if (!strcasecmp(c->argv[1]->ptr,"get-master-addr-by-name")) {
2026 /* SENTINEL GET-MASTER-ADDR-BY-NAME <master-name> */
2027 sentinelRedisInstance *ri;
2028
2029 if (c->argc != 3) goto numargserr;
2030 ri = sentinelGetMasterByName(c->argv[2]->ptr);
2031 if (ri == NULL) {
2032 addReply(c,shared.nullmultibulk);
2033 } else if (ri->info_refresh == 0) {
2034 addReplySds(c,sdsnew("-IDONTKNOW I have not enough information to reply. Please ask another Sentinel.\r\n"));
2035 } else {
2036 sentinelAddr *addr = ri->addr;
2037
2038 if ((ri->flags & SRI_FAILOVER_IN_PROGRESS) && ri->promoted_slave)
2039 addr = ri->promoted_slave->addr;
2040 addReplyMultiBulkLen(c,2);
2041 addReplyBulkCString(c,addr->ip);
2042 addReplyBulkLongLong(c,addr->port);
2043 }
2044 } else if (!strcasecmp(c->argv[1]->ptr,"failover")) {
2045 /* SENTINEL FAILOVER <master-name> */
2046 sentinelRedisInstance *ri;
2047
2048 if (c->argc != 3) goto numargserr;
2049 if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
2050 return;
2051 if (ri->flags & SRI_FAILOVER_IN_PROGRESS) {
2052 addReplySds(c,sdsnew("-INPROG Failover already in progress\r\n"));
2053 return;
2054 }
2055 if (sentinelSelectSlave(ri) == NULL) {
2056 addReplySds(c,sdsnew("-NOGOODSLAVE No suitable slave to promote\r\n"));
2057 return;
2058 }
2059 sentinelStartFailover(ri,SENTINEL_FAILOVER_STATE_WAIT_START);
2060 ri->flags |= SRI_FORCE_FAILOVER;
2061 addReply(c,shared.ok);
2062 } else if (!strcasecmp(c->argv[1]->ptr,"pending-scripts")) {
2063 /* SENTINEL PENDING-SCRIPTS */
2064
2065 if (c->argc != 2) goto numargserr;
2066 sentinelPendingScriptsCommand(c);
2067 } else {
2068 addReplyErrorFormat(c,"Unknown sentinel subcommand '%s'",
2069 (char*)c->argv[1]->ptr);
2070 }
2071 return;
2072
2073 numargserr:
2074 addReplyErrorFormat(c,"Wrong number of commands for 'sentinel %s'",
2075 (char*)c->argv[1]->ptr);
2076 }
2077
2078 void sentinelInfoCommand(redisClient *c) {
2079 char *section = c->argc == 2 ? c->argv[1]->ptr : "default";
2080 sds info = sdsempty();
2081 int defsections = !strcasecmp(section,"default");
2082 int sections = 0;
2083
2084 if (c->argc > 2) {
2085 addReply(c,shared.syntaxerr);
2086 return;
2087 }
2088
2089 if (!strcasecmp(section,"server") || defsections) {
2090 if (sections++) info = sdscat(info,"\r\n");
2091 sds serversection = genRedisInfoString("server");
2092 info = sdscatlen(info,serversection,sdslen(serversection));
2093 sdsfree(serversection);
2094 }
2095
2096 if (!strcasecmp(section,"sentinel") || defsections) {
2097 dictIterator *di;
2098 dictEntry *de;
2099 int master_id = 0;
2100
2101 if (sections++) info = sdscat(info,"\r\n");
2102 info = sdscatprintf(info,
2103 "# Sentinel\r\n"
2104 "sentinel_masters:%lu\r\n"
2105 "sentinel_tilt:%d\r\n"
2106 "sentinel_running_scripts:%d\r\n"
2107 "sentinel_scripts_queue_length:%ld\r\n",
2108 dictSize(sentinel.masters),
2109 sentinel.tilt,
2110 sentinel.running_scripts,
2111 listLength(sentinel.scripts_queue));
2112
2113 di = dictGetIterator(sentinel.masters);
2114 while((de = dictNext(di)) != NULL) {
2115 sentinelRedisInstance *ri = dictGetVal(de);
2116 char *status = "ok";
2117
2118 if (ri->flags & SRI_O_DOWN) status = "odown";
2119 else if (ri->flags & SRI_S_DOWN) status = "sdown";
2120 info = sdscatprintf(info,
2121 "master%d:name=%s,status=%s,address=%s:%d,"
2122 "slaves=%lu,sentinels=%lu\r\n",
2123 master_id++, ri->name, status,
2124 ri->addr->ip, ri->addr->port,
2125 dictSize(ri->slaves),
2126 dictSize(ri->sentinels)+1);
2127 }
2128 dictReleaseIterator(di);
2129 }
2130
2131 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
2132 (unsigned long)sdslen(info)));
2133 addReplySds(c,info);
2134 addReply(c,shared.crlf);
2135 }
2136
2137 /* ===================== SENTINEL availability checks ======================= */
2138
2139 /* Is this instance down from our point of view? */
2140 void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
2141 mstime_t elapsed = mstime() - ri->last_avail_time;
2142
2143 /* Check if we are in need for a reconnection of one of the
2144 * links, because we are detecting low activity.
2145 *
2146 * 1) Check if the command link seems connected, was connected not less
2147 * than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have an
2148 * idle time that is greater than down_after_period / 2 seconds. */
2149 if (ri->cc &&
2150 (mstime() - ri->cc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
2151 (mstime() - ri->last_pong_time) > (ri->down_after_period/2))
2152 {
2153 sentinelKillLink(ri,ri->cc);
2154 }
2155
2156 /* 2) Check if the pubsub link seems connected, was connected not less
2157 * than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have no
2158 * activity in the Pub/Sub channel for more than
2159 * SENTINEL_PUBLISH_PERIOD * 3.
2160 */
2161 if (ri->pc &&
2162 (mstime() - ri->pc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
2163 (mstime() - ri->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3))
2164 {
2165 sentinelKillLink(ri,ri->pc);
2166 }
2167
2168 /* Update the subjectively down flag. */
2169 if (elapsed > ri->down_after_period) {
2170 /* Is subjectively down */
2171 if ((ri->flags & SRI_S_DOWN) == 0) {
2172 sentinelEvent(REDIS_WARNING,"+sdown",ri,"%@");
2173 ri->s_down_since_time = mstime();
2174 ri->flags |= SRI_S_DOWN;
2175 }
2176 } else {
2177 /* Is subjectively up */
2178 if (ri->flags & SRI_S_DOWN) {
2179 sentinelEvent(REDIS_WARNING,"-sdown",ri,"%@");
2180 ri->flags &= ~(SRI_S_DOWN|SRI_SCRIPT_KILL_SENT);
2181 }
2182 }
2183 }
2184
2185 /* Is this instance down accordingly to the configured quorum? */
2186 void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
2187 dictIterator *di;
2188 dictEntry *de;
2189 int quorum = 0, odown = 0;
2190
2191 if (master->flags & SRI_S_DOWN) {
2192 /* Is down for enough sentinels? */
2193 quorum = 1; /* the current sentinel. */
2194 /* Count all the other sentinels. */
2195 di = dictGetIterator(master->sentinels);
2196 while((de = dictNext(di)) != NULL) {
2197 sentinelRedisInstance *ri = dictGetVal(de);
2198
2199 if (ri->flags & SRI_MASTER_DOWN) quorum++;
2200 }
2201 dictReleaseIterator(di);
2202 if (quorum >= master->quorum) odown = 1;
2203 }
2204
2205 /* Set the flag accordingly to the outcome. */
2206 if (odown) {
2207 if ((master->flags & SRI_O_DOWN) == 0) {
2208 sentinelEvent(REDIS_WARNING,"+odown",master,"%@ #quorum %d/%d",
2209 quorum, master->quorum);
2210 master->flags |= SRI_O_DOWN;
2211 master->o_down_since_time = mstime();
2212 }
2213 } else {
2214 if (master->flags & SRI_O_DOWN) {
2215 sentinelEvent(REDIS_WARNING,"-odown",master,"%@");
2216 master->flags &= ~SRI_O_DOWN;
2217 }
2218 }
2219 }
2220
2221 /* Receive the SENTINEL is-master-down-by-addr reply, see the
2222 * sentinelAskMasterStateToOtherSentinels() function for more information. */
2223 void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) {
2224 sentinelRedisInstance *ri = c->data;
2225 redisReply *r;
2226
2227 if (ri) ri->pending_commands--;
2228 if (!reply || !ri) return;
2229 r = reply;
2230
2231 /* Ignore every error or unexpected reply.
2232 * Note that if the command returns an error for any reason we'll
2233 * end clearing the SRI_MASTER_DOWN flag for timeout anyway. */
2234 if (r->type == REDIS_REPLY_ARRAY && r->elements == 2 &&
2235 r->element[0]->type == REDIS_REPLY_INTEGER &&
2236 r->element[1]->type == REDIS_REPLY_STRING)
2237 {
2238 ri->last_master_down_reply_time = mstime();
2239 if (r->element[0]->integer == 1) {
2240 ri->flags |= SRI_MASTER_DOWN;
2241 } else {
2242 ri->flags &= ~SRI_MASTER_DOWN;
2243 }
2244 sdsfree(ri->leader);
2245 ri->leader = sdsnew(r->element[1]->str);
2246 }
2247 }
2248
2249 /* If we think (subjectively) the master is down, we start sending
2250 * SENTINEL IS-MASTER-DOWN-BY-ADDR requests to other sentinels
2251 * in order to get the replies that allow to reach the quorum and
2252 * possibly also mark the master as objectively down. */
2253 void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master) {
2254 dictIterator *di;
2255 dictEntry *de;
2256
2257 di = dictGetIterator(master->sentinels);
2258 while((de = dictNext(di)) != NULL) {
2259 sentinelRedisInstance *ri = dictGetVal(de);
2260 mstime_t elapsed = mstime() - ri->last_master_down_reply_time;
2261 char port[32];
2262 int retval;
2263
2264 /* If the master state from other sentinel is too old, we clear it. */
2265 if (elapsed > SENTINEL_INFO_VALIDITY_TIME) {
2266 ri->flags &= ~SRI_MASTER_DOWN;
2267 sdsfree(ri->leader);
2268 ri->leader = NULL;
2269 }
2270
2271 /* Only ask if master is down to other sentinels if:
2272 *
2273 * 1) We believe it is down, or there is a failover in progress.
2274 * 2) Sentinel is connected.
2275 * 3) We did not received the info within SENTINEL_ASK_PERIOD ms. */
2276 if ((master->flags & (SRI_S_DOWN|SRI_FAILOVER_IN_PROGRESS)) == 0)
2277 continue;
2278 if (ri->flags & SRI_DISCONNECTED) continue;
2279 if (mstime() - ri->last_master_down_reply_time < SENTINEL_ASK_PERIOD)
2280 continue;
2281
2282 /* Ask */
2283 ll2string(port,sizeof(port),master->addr->port);
2284 retval = redisAsyncCommand(ri->cc,
2285 sentinelReceiveIsMasterDownReply, NULL,
2286 "SENTINEL is-master-down-by-addr %s %s",
2287 master->addr->ip, port);
2288 if (retval == REDIS_OK) ri->pending_commands++;
2289 }
2290 dictReleaseIterator(di);
2291 }
2292
2293 /* =============================== FAILOVER ================================= */
2294
2295 /* Given a master get the "subjective leader", that is, among all the sentinels
2296 * with given characteristics, the one with the lexicographically smaller
2297 * runid. The characteristics required are:
2298 *
2299 * 1) Has SRI_CAN_FAILOVER flag.
2300 * 2) Is not disconnected.
2301 * 3) Recently answered to our ping (no longer than
2302 * SENTINEL_INFO_VALIDITY_TIME milliseconds ago).
2303 *
2304 * The function returns a pointer to an sds string representing the runid of the
2305 * leader sentinel instance (from our point of view). Otherwise NULL is
2306 * returned if there are no suitable sentinels.
2307 */
2308
2309 int compareRunID(const void *a, const void *b) {
2310 char **aptrptr = (char**)a, **bptrptr = (char**)b;
2311 return strcasecmp(*aptrptr, *bptrptr);
2312 }
2313
2314 char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master) {
2315 dictIterator *di;
2316 dictEntry *de;
2317 char **instance =
2318 zmalloc(sizeof(char*)*(dictSize(master->sentinels)+1));
2319 int instances = 0;
2320 char *leader = NULL;
2321
2322 if (master->flags & SRI_CAN_FAILOVER) {
2323 /* Add myself if I'm a Sentinel that can failover this master. */
2324 instance[instances++] = server.runid;
2325 }
2326
2327 di = dictGetIterator(master->sentinels);
2328 while((de = dictNext(di)) != NULL) {
2329 sentinelRedisInstance *ri = dictGetVal(de);
2330 mstime_t lag = mstime() - ri->last_avail_time;
2331
2332 if (lag > SENTINEL_INFO_VALIDITY_TIME ||
2333 !(ri->flags & SRI_CAN_FAILOVER) ||
2334 (ri->flags & SRI_DISCONNECTED) ||
2335 ri->runid == NULL)
2336 continue;
2337 instance[instances++] = ri->runid;
2338 }
2339 dictReleaseIterator(di);
2340
2341 /* If we have at least one instance passing our checks, order the array
2342 * by runid. */
2343 if (instances) {
2344 qsort(instance,instances,sizeof(char*),compareRunID);
2345 leader = sdsnew(instance[0]);
2346 }
2347 zfree(instance);
2348 return leader;
2349 }
2350
2351 struct sentinelLeader {
2352 char *runid;
2353 unsigned long votes;
2354 };
2355
2356 /* Helper function for sentinelGetObjectiveLeader, increment the counter
2357 * relative to the specified runid. */
2358 void sentinelObjectiveLeaderIncr(dict *counters, char *runid) {
2359 dictEntry *de = dictFind(counters,runid);
2360 uint64_t oldval;
2361
2362 if (de) {
2363 oldval = dictGetUnsignedIntegerVal(de);
2364 dictSetUnsignedIntegerVal(de,oldval+1);
2365 } else {
2366 de = dictAddRaw(counters,runid);
2367 redisAssert(de != NULL);
2368 dictSetUnsignedIntegerVal(de,1);
2369 }
2370 }
2371
2372 /* Scan all the Sentinels attached to this master to check what is the
2373 * most voted leader among Sentinels. */
2374 char *sentinelGetObjectiveLeader(sentinelRedisInstance *master) {
2375 dict *counters;
2376 dictIterator *di;
2377 dictEntry *de;
2378 unsigned int voters = 0, voters_quorum;
2379 char *myvote;
2380 char *winner = NULL;
2381
2382 redisAssert(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS));
2383 counters = dictCreate(&leaderVotesDictType,NULL);
2384
2385 /* Count my vote. */
2386 myvote = sentinelGetSubjectiveLeader(master);
2387 if (myvote) {
2388 sentinelObjectiveLeaderIncr(counters,myvote);
2389 voters++;
2390 }
2391
2392 /* Count other sentinels votes */
2393 di = dictGetIterator(master->sentinels);
2394 while((de = dictNext(di)) != NULL) {
2395 sentinelRedisInstance *ri = dictGetVal(de);
2396 if (ri->leader == NULL) continue;
2397 /* If the failover is not already in progress we are only interested
2398 * in Sentinels that believe the master is down. Otherwise the leader
2399 * selection is useful for the "failover-takedown" when the original
2400 * leader fails. In that case we consider all the voters. */
2401 if (!(master->flags & SRI_FAILOVER_IN_PROGRESS) &&
2402 !(ri->flags & SRI_MASTER_DOWN)) continue;
2403 sentinelObjectiveLeaderIncr(counters,ri->leader);
2404 voters++;
2405 }
2406 dictReleaseIterator(di);
2407 voters_quorum = voters/2+1;
2408
2409 /* Check what's the winner. For the winner to win, it needs two conditions:
2410 * 1) Absolute majority between voters (50% + 1).
2411 * 2) And anyway at least master->quorum votes. */
2412 {
2413 uint64_t max_votes = 0; /* Max votes so far. */
2414
2415 di = dictGetIterator(counters);
2416 while((de = dictNext(di)) != NULL) {
2417 uint64_t votes = dictGetUnsignedIntegerVal(de);
2418
2419 if (max_votes < votes) {
2420 max_votes = votes;
2421 winner = dictGetKey(de);
2422 }
2423 }
2424 dictReleaseIterator(di);
2425 if (winner && (max_votes < voters_quorum || max_votes < master->quorum))
2426 winner = NULL;
2427 }
2428 winner = winner ? sdsnew(winner) : NULL;
2429 sdsfree(myvote);
2430 dictRelease(counters);
2431 return winner;
2432 }
2433
2434 /* Setup the master state to start a failover as a leader.
2435 *
2436 * State can be either:
2437 *
2438 * SENTINEL_FAILOVER_STATE_WAIT_START: starts a failover from scratch.
2439 * SENTINEL_FAILOVER_STATE_RECONF_SLAVES: takedown a failed failover.
2440 */
2441 void sentinelStartFailover(sentinelRedisInstance *master, int state) {
2442 redisAssert(master->flags & SRI_MASTER);
2443 redisAssert(state == SENTINEL_FAILOVER_STATE_WAIT_START ||
2444 state == SENTINEL_FAILOVER_STATE_RECONF_SLAVES);
2445
2446 master->failover_state = state;
2447 master->flags |= SRI_FAILOVER_IN_PROGRESS|SRI_I_AM_THE_LEADER;
2448 sentinelEvent(REDIS_WARNING,"+failover-triggered",master,"%@");
2449
2450 /* Pick a random delay if it's a fresh failover (WAIT_START), and not
2451 * a recovery of a failover started by another sentinel. */
2452 if (master->failover_state == SENTINEL_FAILOVER_STATE_WAIT_START) {
2453 master->failover_start_time = mstime() +
2454 SENTINEL_FAILOVER_FIXED_DELAY +
2455 (rand() % SENTINEL_FAILOVER_MAX_RANDOM_DELAY);
2456 sentinelEvent(REDIS_WARNING,"+failover-state-wait-start",master,
2457 "%@ #starting in %lld milliseconds",
2458 master->failover_start_time-mstime());
2459 }
2460 master->failover_state_change_time = mstime();
2461 }
2462
2463 /* This function checks if there are the conditions to start the failover,
2464 * that is:
2465 *
2466 * 1) Enough time has passed since O_DOWN.
2467 * 2) The master is marked as SRI_CAN_FAILOVER, so we can failover it.
2468 * 3) We are the objectively leader for this master.
2469 *
2470 * If the conditions are met we flag the master as SRI_FAILOVER_IN_PROGRESS
2471 * and SRI_I_AM_THE_LEADER.
2472 */
2473 void sentinelStartFailoverIfNeeded(sentinelRedisInstance *master) {
2474 char *leader;
2475 int isleader;
2476
2477 /* We can't failover if the master is not in O_DOWN state or if
2478 * there is not already a failover in progress (to perform the
2479 * takedown if the leader died) or if this Sentinel is not allowed
2480 * to start a failover. */
2481 if (!(master->flags & SRI_CAN_FAILOVER) ||
2482 !(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) return;
2483
2484 leader = sentinelGetObjectiveLeader(master);
2485 isleader = leader && strcasecmp(leader,server.runid) == 0;
2486 sdsfree(leader);
2487
2488 /* If I'm not the leader, I can't failover for sure. */
2489 if (!isleader) return;
2490
2491 /* If the failover is already in progress there are two options... */
2492 if (master->flags & SRI_FAILOVER_IN_PROGRESS) {
2493 if (master->flags & SRI_I_AM_THE_LEADER) {
2494 /* 1) I'm flagged as leader so I already started the failover.
2495 * Just return. */
2496 return;
2497 } else {
2498 mstime_t elapsed = mstime() - master->failover_state_change_time;
2499
2500 /* 2) I'm the new leader, but I'm not flagged as leader in the
2501 * master: I did not started the failover, but the original
2502 * leader has no longer the leadership.
2503 *
2504 * In this case if the failover appears to be lagging
2505 * for at least 25% of the configured failover timeout,
2506 * I can assume I can take control. Otherwise
2507 * it's better to return and wait more. */
2508 if (elapsed < (master->failover_timeout/4)) return;
2509 sentinelEvent(REDIS_WARNING,"+failover-takedown",master,"%@");
2510 /* We have already an elected slave if we are in
2511 * FAILOVER_IN_PROGRESS state, that is, the slave that we
2512 * observed turning into a master. */
2513 sentinelStartFailover(master,SENTINEL_FAILOVER_STATE_RECONF_SLAVES);
2514 /* As an observer we flagged all the slaves as RECONF_SENT but
2515 * now we are in charge of actually sending the reconfiguration
2516 * command so let's clear this flag for all the instances. */
2517 sentinelDelFlagsToDictOfRedisInstances(master->slaves,
2518 SRI_RECONF_SENT);
2519 }
2520 } else {
2521 /* Brand new failover as SRI_FAILOVER_IN_PROGRESS was not set.
2522 *
2523 * Do we have a slave to promote? Otherwise don't start a failover
2524 * at all. */
2525 if (sentinelSelectSlave(master) == NULL) return;
2526 sentinelStartFailover(master,SENTINEL_FAILOVER_STATE_WAIT_START);
2527 }
2528 }
2529
2530 /* Select a suitable slave to promote. The current algorithm only uses
2531 * the following parameters:
2532 *
2533 * 1) None of the following conditions: S_DOWN, O_DOWN, DISCONNECTED.
2534 * 2) last_avail_time more recent than SENTINEL_INFO_VALIDITY_TIME.
2535 * 3) info_refresh more recent than SENTINEL_INFO_VALIDITY_TIME.
2536 * 4) master_link_down_time no more than:
2537 * (now - master->s_down_since_time) + (master->down_after_period * 10).
2538 * 5) Slave priority can't be zero, otherwise the slave is discareded.
2539 *
2540 * Among all the slaves matching the above conditions we select the slave
2541 * with lower slave_priority. If priority is the same we select the slave
2542 * with lexicographically smaller runid.
2543 *
2544 * The function returns the pointer to the selected slave, otherwise
2545 * NULL if no suitable slave was found.
2546 */
2547
2548 int compareSlavesForPromotion(const void *a, const void *b) {
2549 sentinelRedisInstance **sa = (sentinelRedisInstance **)a,
2550 **sb = (sentinelRedisInstance **)b;
2551 char *sa_runid, *sb_runid;
2552
2553 if ((*sa)->slave_priority != (*sb)->slave_priority)
2554 return (*sa)->slave_priority - (*sb)->slave_priority;
2555
2556 /* If priority is the same, select the slave with that has the
2557 * lexicographically smaller runid. Note that we try to handle runid
2558 * == NULL as there are old Redis versions that don't publish runid in
2559 * INFO. A NULL runid is considered bigger than any other runid. */
2560 sa_runid = (*sa)->runid;
2561 sb_runid = (*sb)->runid;
2562 if (sa_runid == NULL && sb_runid == NULL) return 0;
2563 else if (sa_runid == NULL) return 1; /* a > b */
2564 else if (sb_runid == NULL) return -1; /* a < b */
2565 return strcasecmp(sa_runid, sb_runid);
2566 }
2567
2568 sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) {
2569 sentinelRedisInstance **instance =
2570 zmalloc(sizeof(instance[0])*dictSize(master->slaves));
2571 sentinelRedisInstance *selected = NULL;
2572 int instances = 0;
2573 dictIterator *di;
2574 dictEntry *de;
2575 mstime_t max_master_down_time = 0;
2576
2577 if (master->flags & SRI_S_DOWN)
2578 max_master_down_time += mstime() - master->s_down_since_time;
2579 max_master_down_time += master->down_after_period * 10;
2580
2581 di = dictGetIterator(master->slaves);
2582 while((de = dictNext(di)) != NULL) {
2583 sentinelRedisInstance *slave = dictGetVal(de);
2584 mstime_t info_validity_time = mstime()-SENTINEL_INFO_VALIDITY_TIME;
2585
2586 if (slave->flags & (SRI_S_DOWN|SRI_O_DOWN|SRI_DISCONNECTED)) continue;
2587 if (slave->last_avail_time < info_validity_time) continue;
2588 if (slave->slave_priority == 0) continue;
2589
2590 /* If the master is in SDOWN state we get INFO for slaves every second.
2591 * Otherwise we get it with the usual period so we need to account for
2592 * a larger delay. */
2593 if ((master->flags & SRI_S_DOWN) == 0)
2594 info_validity_time -= SENTINEL_INFO_PERIOD;
2595 if (slave->info_refresh < info_validity_time) continue;
2596 if (slave->master_link_down_time > max_master_down_time) continue;
2597 instance[instances++] = slave;
2598 }
2599 dictReleaseIterator(di);
2600 if (instances) {
2601 qsort(instance,instances,sizeof(sentinelRedisInstance*),
2602 compareSlavesForPromotion);
2603 selected = instance[0];
2604 }
2605 zfree(instance);
2606 return selected;
2607 }
2608
2609 /* ---------------- Failover state machine implementation ------------------- */
2610 void sentinelFailoverWaitStart(sentinelRedisInstance *ri) {
2611 /* If we in "wait start" but the master is no longer in ODOWN nor in
2612 * SDOWN condition we abort the failover. This is important as it
2613 * prevents a useless failover in a a notable case of netsplit, where
2614 * the senitnels are split from the redis instances. In this case
2615 * the failover will not start while there is the split because no
2616 * good slave can be reached. However when the split is resolved, we
2617 * can go to waitstart if the slave is back rechable a few milliseconds
2618 * before the master is. In that case when the master is back online
2619 * we cancel the failover. */
2620 if ((ri->flags & (SRI_S_DOWN|SRI_O_DOWN|SRI_FORCE_FAILOVER)) == 0) {
2621 sentinelEvent(REDIS_WARNING,"-failover-abort-master-is-back",
2622 ri,"%@");
2623 sentinelAbortFailover(ri);
2624 return;
2625 }
2626
2627 /* Start the failover going to the next state if enough time has
2628 * elapsed. */
2629 if (mstime() >= ri->failover_start_time) {
2630 ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
2631 ri->failover_state_change_time = mstime();
2632 sentinelEvent(REDIS_WARNING,"+failover-state-select-slave",ri,"%@");
2633 }
2634 }
2635
2636 void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) {
2637 sentinelRedisInstance *slave = sentinelSelectSlave(ri);
2638
2639 if (slave == NULL) {
2640 sentinelEvent(REDIS_WARNING,"-failover-abort-no-good-slave",ri,"%@");
2641 sentinelAbortFailover(ri);
2642 } else {
2643 sentinelEvent(REDIS_WARNING,"+selected-slave",slave,"%@");
2644 slave->flags |= SRI_PROMOTED;
2645 ri->promoted_slave = slave;
2646 ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE;
2647 ri->failover_state_change_time = mstime();
2648 sentinelEvent(REDIS_NOTICE,"+failover-state-send-slaveof-noone",
2649 slave, "%@");
2650 }
2651 }
2652
2653 void sentinelFailoverSendSlaveOfNoOne(sentinelRedisInstance *ri) {
2654 int retval;
2655
2656 if (ri->promoted_slave->flags & SRI_DISCONNECTED) return;
2657
2658 /* Send SLAVEOF NO ONE command to turn the slave into a master.
2659 * We actually register a generic callback for this command as we don't
2660 * really care about the reply. We check if it worked indirectly observing
2661 * if INFO returns a different role (master instead of slave). */
2662 retval = redisAsyncCommand(ri->promoted_slave->cc,
2663 sentinelDiscardReplyCallback, NULL, "SLAVEOF NO ONE");
2664 if (retval != REDIS_OK) return;
2665 ri->promoted_slave->pending_commands++;
2666 sentinelEvent(REDIS_NOTICE, "+failover-state-wait-promotion",
2667 ri->promoted_slave,"%@");
2668 ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION;
2669 ri->failover_state_change_time = mstime();
2670 }
2671
2672 /* We actually wait for promotion indirectly checking with INFO when the
2673 * slave turns into a master. */
2674 void sentinelFailoverWaitPromotion(sentinelRedisInstance *ri) {
2675 mstime_t elapsed = mstime() - ri->failover_state_change_time;
2676
2677 if (elapsed >= SENTINEL_PROMOTION_RETRY_PERIOD) {
2678 sentinelEvent(REDIS_WARNING,"-promotion-timeout",ri->promoted_slave,
2679 "%@");
2680 sentinelEvent(REDIS_WARNING,"+failover-state-select-slave",ri,"%@");
2681 ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
2682 ri->failover_state_change_time = mstime();
2683 ri->promoted_slave->flags &= ~SRI_PROMOTED;
2684 ri->promoted_slave = NULL;
2685 }
2686 }
2687
2688 void sentinelFailoverDetectEnd(sentinelRedisInstance *master) {
2689 int not_reconfigured = 0, timeout = 0;
2690 dictIterator *di;
2691 dictEntry *de;
2692 mstime_t elapsed = mstime() - master->failover_state_change_time;
2693
2694 /* We can't consider failover finished if the promoted slave is
2695 * not reachable. */
2696 if (master->promoted_slave == NULL ||
2697 master->promoted_slave->flags & SRI_S_DOWN) return;
2698
2699 /* The failover terminates once all the reachable slaves are properly
2700 * configured. */
2701 di = dictGetIterator(master->slaves);
2702 while((de = dictNext(di)) != NULL) {
2703 sentinelRedisInstance *slave = dictGetVal(de);
2704
2705 if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
2706 if (slave->flags & SRI_S_DOWN) continue;
2707 not_reconfigured++;
2708 }
2709 dictReleaseIterator(di);
2710
2711 /* Force end of failover on timeout. */
2712 if (elapsed > master->failover_timeout) {
2713 not_reconfigured = 0;
2714 timeout = 1;
2715 sentinelEvent(REDIS_WARNING,"+failover-end-for-timeout",master,"%@");
2716 }
2717
2718 if (not_reconfigured == 0) {
2719 int role = (master->flags & SRI_I_AM_THE_LEADER) ? SENTINEL_LEADER :
2720 SENTINEL_OBSERVER;
2721
2722 sentinelEvent(REDIS_WARNING,"+failover-end",master,"%@");
2723 master->failover_state = SENTINEL_FAILOVER_STATE_UPDATE_CONFIG;
2724 master->failover_state_change_time = mstime();
2725 sentinelCallClientReconfScript(master,role,"end",master->addr,
2726 master->promoted_slave->addr);
2727 }
2728
2729 /* If I'm the leader it is a good idea to send a best effort SLAVEOF
2730 * command to all the slaves still not reconfigured to replicate with
2731 * the new master. */
2732 if (timeout && (master->flags & SRI_I_AM_THE_LEADER)) {
2733 dictIterator *di;
2734 dictEntry *de;
2735 char master_port[32];
2736
2737 ll2string(master_port,sizeof(master_port),
2738 master->promoted_slave->addr->port);
2739
2740 di = dictGetIterator(master->slaves);
2741 while((de = dictNext(di)) != NULL) {
2742 sentinelRedisInstance *slave = dictGetVal(de);
2743 int retval;
2744
2745 if (slave->flags &
2746 (SRI_RECONF_DONE|SRI_RECONF_SENT|SRI_DISCONNECTED)) continue;
2747
2748 retval = redisAsyncCommand(slave->cc,
2749 sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
2750 master->promoted_slave->addr->ip,
2751 master_port);
2752 if (retval == REDIS_OK) {
2753 sentinelEvent(REDIS_NOTICE,"+slave-reconf-sent-be",slave,"%@");
2754 slave->flags |= SRI_RECONF_SENT;
2755 }
2756 }
2757 dictReleaseIterator(di);
2758 }
2759 }
2760
2761 /* Send SLAVE OF <new master address> to all the remaining slaves that
2762 * still don't appear to have the configuration updated. */
2763 void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) {
2764 dictIterator *di;
2765 dictEntry *de;
2766 int in_progress = 0;
2767
2768 di = dictGetIterator(master->slaves);
2769 while((de = dictNext(di)) != NULL) {
2770 sentinelRedisInstance *slave = dictGetVal(de);
2771
2772 if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG))
2773 in_progress++;
2774 }
2775 dictReleaseIterator(di);
2776
2777 di = dictGetIterator(master->slaves);
2778 while(in_progress < master->parallel_syncs &&
2779 (de = dictNext(di)) != NULL)
2780 {
2781 sentinelRedisInstance *slave = dictGetVal(de);
2782 int retval;
2783 char master_port[32];
2784
2785 /* Skip the promoted slave, and already configured slaves. */
2786 if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
2787
2788 /* Clear the SRI_RECONF_SENT flag if too much time elapsed without
2789 * the slave moving forward to the next state. */
2790 if ((slave->flags & SRI_RECONF_SENT) &&
2791 (mstime() - slave->slave_reconf_sent_time) >
2792 SENTINEL_SLAVE_RECONF_RETRY_PERIOD)
2793 {
2794 sentinelEvent(REDIS_NOTICE,"-slave-reconf-sent-timeout",slave,"%@");
2795 slave->flags &= ~SRI_RECONF_SENT;
2796 }
2797
2798 /* Nothing to do for instances that are disconnected or already
2799 * in RECONF_SENT state. */
2800 if (slave->flags & (SRI_DISCONNECTED|SRI_RECONF_SENT|SRI_RECONF_INPROG))
2801 continue;
2802
2803 /* Send SLAVEOF <new master>. */
2804 ll2string(master_port,sizeof(master_port),
2805 master->promoted_slave->addr->port);
2806 retval = redisAsyncCommand(slave->cc,
2807 sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
2808 master->promoted_slave->addr->ip,
2809 master_port);
2810 if (retval == REDIS_OK) {
2811 slave->flags |= SRI_RECONF_SENT;
2812 slave->pending_commands++;
2813 slave->slave_reconf_sent_time = mstime();
2814 sentinelEvent(REDIS_NOTICE,"+slave-reconf-sent",slave,"%@");
2815 in_progress++;
2816 }
2817 }
2818 dictReleaseIterator(di);
2819 sentinelFailoverDetectEnd(master);
2820 }
2821
2822 /* This function is called when the slave is in
2823 * SENTINEL_FAILOVER_STATE_UPDATE_CONFIG state. In this state we need
2824 * to remove it from the master table and add the promoted slave instead.
2825 *
2826 * If there are no promoted slaves as this instance is unique, we remove
2827 * and re-add it with the same address to trigger a complete state
2828 * refresh. */
2829 void sentinelFailoverSwitchToPromotedSlave(sentinelRedisInstance *master) {
2830 sentinelRedisInstance *ref = master->promoted_slave ?
2831 master->promoted_slave : master;
2832
2833 sentinelEvent(REDIS_WARNING,"+switch-master",master,"%s %s %d %s %d",
2834 master->name, master->addr->ip, master->addr->port,
2835 ref->addr->ip, ref->addr->port);
2836
2837 sentinelResetMasterAndChangeAddress(master,ref->addr->ip,ref->addr->port);
2838 }
2839
2840 void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
2841 redisAssert(ri->flags & SRI_MASTER);
2842
2843 if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;
2844
2845 switch(ri->failover_state) {
2846 case SENTINEL_FAILOVER_STATE_WAIT_START:
2847 sentinelFailoverWaitStart(ri);
2848 break;
2849 case SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
2850 sentinelFailoverSelectSlave(ri);
2851 break;
2852 case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:
2853 sentinelFailoverSendSlaveOfNoOne(ri);
2854 break;
2855 case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
2856 sentinelFailoverWaitPromotion(ri);
2857 break;
2858 case SENTINEL_FAILOVER_STATE_RECONF_SLAVES:
2859 sentinelFailoverReconfNextSlave(ri);
2860 break;
2861 case SENTINEL_FAILOVER_STATE_DETECT_END:
2862 sentinelFailoverDetectEnd(ri);
2863 break;
2864 }
2865 }
2866
2867 /* Abort a failover in progress with the following steps:
2868 * 1) If this instance is the leaer send a SLAVEOF command to all the already
2869 * reconfigured slaves if any to configure them to replicate with the
2870 * original master.
2871 * 2) For both leaders and observers: clear the failover flags and state in
2872 * the master instance.
2873 * 3) If there is already a promoted slave and we are the leader, and this
2874 * slave is not DISCONNECTED, try to reconfigure it to replicate
2875 * back to the master as well, sending a best effort SLAVEOF command.
2876 */
2877 void sentinelAbortFailover(sentinelRedisInstance *ri) {
2878 char master_port[32];
2879 dictIterator *di;
2880 dictEntry *de;
2881 int sentinel_role;
2882
2883 redisAssert(ri->flags & SRI_FAILOVER_IN_PROGRESS);
2884 ll2string(master_port,sizeof(master_port),ri->addr->port);
2885
2886 /* Clear failover related flags from slaves.
2887 * Also if we are the leader make sure to send SLAVEOF commands to all the
2888 * already reconfigured slaves in order to turn them back into slaves of
2889 * the original master. */
2890 di = dictGetIterator(ri->slaves);
2891 while((de = dictNext(di)) != NULL) {
2892 sentinelRedisInstance *slave = dictGetVal(de);
2893 if ((ri->flags & SRI_I_AM_THE_LEADER) &&
2894 !(slave->flags & SRI_DISCONNECTED) &&
2895 (slave->flags & (SRI_PROMOTED|SRI_RECONF_SENT|SRI_RECONF_INPROG|
2896 SRI_RECONF_DONE)))
2897 {
2898 int retval;
2899
2900 retval = redisAsyncCommand(slave->cc,
2901 sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
2902 ri->addr->ip,
2903 master_port);
2904 if (retval == REDIS_OK)
2905 sentinelEvent(REDIS_NOTICE,"-slave-reconf-undo",slave,"%@");
2906 }
2907 slave->flags &= ~(SRI_RECONF_SENT|SRI_RECONF_INPROG|SRI_RECONF_DONE);
2908 }
2909 dictReleaseIterator(di);
2910
2911 sentinel_role = (ri->flags & SRI_I_AM_THE_LEADER) ? SENTINEL_LEADER :
2912 SENTINEL_OBSERVER;
2913 ri->flags &= ~(SRI_FAILOVER_IN_PROGRESS|SRI_I_AM_THE_LEADER|SRI_FORCE_FAILOVER);
2914 ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
2915 ri->failover_state_change_time = mstime();
2916 if (ri->promoted_slave) {
2917 sentinelCallClientReconfScript(ri,sentinel_role,"abort",
2918 ri->promoted_slave->addr,ri->addr);
2919 ri->promoted_slave->flags &= ~SRI_PROMOTED;
2920 ri->promoted_slave = NULL;
2921 }
2922 }
2923
2924 /* The following is called only for master instances and will abort the
2925 * failover process if:
2926 *
2927 * 1) The failover is in progress.
2928 * 2) We already promoted a slave.
2929 * 3) The promoted slave is in extended SDOWN condition.
2930 */
2931 void sentinelAbortFailoverIfNeeded(sentinelRedisInstance *ri) {
2932 /* Failover is in progress? Do we have a promoted slave? */
2933 if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS) || !ri->promoted_slave) return;
2934
2935 /* Is the promoted slave into an extended SDOWN state? */
2936 if (!(ri->promoted_slave->flags & SRI_S_DOWN) ||
2937 (mstime() - ri->promoted_slave->s_down_since_time) <
2938 (ri->down_after_period * SENTINEL_EXTENDED_SDOWN_MULTIPLIER)) return;
2939
2940 sentinelEvent(REDIS_WARNING,"-failover-abort-x-sdown",ri->promoted_slave,"%@");
2941 sentinelAbortFailover(ri);
2942 }
2943
2944 /* ======================== SENTINEL timer handler ==========================
2945 * This is the "main" our Sentinel, being sentinel completely non blocking
2946 * in design. The function is called every second.
2947 * -------------------------------------------------------------------------- */
2948
2949 /* Perform scheduled operations for the specified Redis instance. */
2950 void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
2951 /* ========== MONITORING HALF ============ */
2952 /* Every kind of instance */
2953 sentinelReconnectInstance(ri);
2954 sentinelPingInstance(ri);
2955
2956 /* Masters and slaves */
2957 if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
2958 /* Nothing so far. */
2959 }
2960
2961 /* Only masters */
2962 if (ri->flags & SRI_MASTER) {
2963 sentinelAskMasterStateToOtherSentinels(ri);
2964 }
2965
2966 /* ============== ACTING HALF ============= */
2967 /* We don't proceed with the acting half if we are in TILT mode.
2968 * TILT happens when we find something odd with the time, like a
2969 * sudden change in the clock. */
2970 if (sentinel.tilt) {
2971 if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return;
2972 sentinel.tilt = 0;
2973 sentinelEvent(REDIS_WARNING,"-tilt",NULL,"#tilt mode exited");
2974 }
2975
2976 /* Every kind of instance */
2977 sentinelCheckSubjectivelyDown(ri);
2978
2979 /* Masters and slaves */
2980 if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
2981 /* Nothing so far. */
2982 }
2983
2984 /* Only masters */
2985 if (ri->flags & SRI_MASTER) {
2986 sentinelCheckObjectivelyDown(ri);
2987 sentinelStartFailoverIfNeeded(ri);
2988 sentinelFailoverStateMachine(ri);
2989 sentinelAbortFailoverIfNeeded(ri);
2990 }
2991 }
2992
2993 /* Perform scheduled operations for all the instances in the dictionary.
2994 * Recursively call the function against dictionaries of slaves. */
2995 void sentinelHandleDictOfRedisInstances(dict *instances) {
2996 dictIterator *di;
2997 dictEntry *de;
2998 sentinelRedisInstance *switch_to_promoted = NULL;
2999
3000 /* There are a number of things we need to perform against every master. */
3001 di = dictGetIterator(instances);
3002 while((de = dictNext(di)) != NULL) {
3003 sentinelRedisInstance *ri = dictGetVal(de);
3004
3005 sentinelHandleRedisInstance(ri);
3006 if (ri->flags & SRI_MASTER) {
3007 sentinelHandleDictOfRedisInstances(ri->slaves);
3008 sentinelHandleDictOfRedisInstances(ri->sentinels);
3009 if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) {
3010 switch_to_promoted = ri;
3011 }
3012 }
3013 }
3014 if (switch_to_promoted)
3015 sentinelFailoverSwitchToPromotedSlave(switch_to_promoted);
3016 dictReleaseIterator(di);
3017 }
3018
3019 /* This function checks if we need to enter the TITL mode.
3020 *
3021 * The TILT mode is entered if we detect that between two invocations of the
3022 * timer interrupt, a negative amount of time, or too much time has passed.
3023 * Note that we expect that more or less just 100 milliseconds will pass
3024 * if everything is fine. However we'll see a negative number or a
3025 * difference bigger than SENTINEL_TILT_TRIGGER milliseconds if one of the
3026 * following conditions happen:
3027 *
3028 * 1) The Sentiel process for some time is blocked, for every kind of
3029 * random reason: the load is huge, the computer was freezed for some time
3030 * in I/O or alike, the process was stopped by a signal. Everything.
3031 * 2) The system clock was altered significantly.
3032 *
3033 * Under both this conditions we'll see everything as timed out and failing
3034 * without good reasons. Instead we enter the TILT mode and wait
3035 * for SENTIENL_TILT_PERIOD to elapse before starting to act again.
3036 *
3037 * During TILT time we still collect information, we just do not act. */
3038 void sentinelCheckTiltCondition(void) {
3039 mstime_t now = mstime();
3040 mstime_t delta = now - sentinel.previous_time;
3041
3042 if (delta < 0 || delta > SENTINEL_TILT_TRIGGER) {
3043 sentinel.tilt = 1;
3044 sentinel.tilt_start_time = mstime();
3045 sentinelEvent(REDIS_WARNING,"+tilt",NULL,"#tilt mode entered");
3046 }
3047 sentinel.previous_time = mstime();
3048 }
3049
3050 void sentinelTimer(void) {
3051 sentinelCheckTiltCondition();
3052 sentinelHandleDictOfRedisInstances(sentinel.masters);
3053 sentinelRunPendingScripts();
3054 sentinelCollectTerminatedScripts();
3055 sentinelKillTimedoutScripts();
3056 }
3057