]> git.saurik.com Git - redis.git/blob - src/sentinel.c
Revert "Scripting: redis.NIL to return nil bulk replies."
[redis.git] / src / sentinel.c
1 /* Redis Sentinel implementation
2 * -----------------------------
3 *
4 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are met:
9 *
10 * * Redistributions of source code must retain the above copyright notice,
11 * this list of conditions and the following disclaimer.
12 * * Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 * * Neither the name of Redis nor the names of its contributors may be used
16 * to endorse or promote products derived from this software without
17 * specific prior written permission.
18 *
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 * POSSIBILITY OF SUCH DAMAGE.
30 */
31
32 #include "redis.h"
33 #include "hiredis.h"
34 #include "async.h"
35
36 #include <ctype.h>
37 #include <arpa/inet.h>
38 #include <sys/socket.h>
39 #include <sys/wait.h>
40
41 extern char **environ;
42
43 #define REDIS_SENTINEL_PORT 26379
44
45 /* ======================== Sentinel global state =========================== */
46
47 typedef long long mstime_t; /* millisecond time type. */
48
49 /* Address object, used to describe an ip:port pair. */
50 typedef struct sentinelAddr {
51 char *ip;
52 int port;
53 } sentinelAddr;
54
55 /* A Sentinel Redis Instance object is monitoring. */
56 #define SRI_MASTER (1<<0)
57 #define SRI_SLAVE (1<<1)
58 #define SRI_SENTINEL (1<<2)
59 #define SRI_DISCONNECTED (1<<3)
60 #define SRI_S_DOWN (1<<4) /* Subjectively down (no quorum). */
61 #define SRI_O_DOWN (1<<5) /* Objectively down (quorum reached). */
62 #define SRI_MASTER_DOWN (1<<6) /* A Sentinel with this flag set thinks that
63 its master is down. */
64 /* SRI_CAN_FAILOVER when set in an SRI_MASTER instance means that we are
65 * allowed to perform the failover for this master.
66 * When set in a SRI_SENTINEL instance means that sentinel is allowed to
67 * perform the failover on its master. */
68 #define SRI_CAN_FAILOVER (1<<7)
69 #define SRI_FAILOVER_IN_PROGRESS (1<<8) /* Failover is in progress for
70 this master. */
71 #define SRI_I_AM_THE_LEADER (1<<9) /* We are the leader for this master. */
72 #define SRI_PROMOTED (1<<10) /* Slave selected for promotion. */
73 #define SRI_RECONF_SENT (1<<11) /* SLAVEOF <newmaster> sent. */
74 #define SRI_RECONF_INPROG (1<<12) /* Slave synchronization in progress. */
75 #define SRI_RECONF_DONE (1<<13) /* Slave synchronized with new master. */
76 #define SRI_FORCE_FAILOVER (1<<14) /* Force failover with master up. */
77 #define SRI_SCRIPT_KILL_SENT (1<<15) /* SCRIPT KILL already sent on -BUSY */
78
79 #define SENTINEL_INFO_PERIOD 10000
80 #define SENTINEL_PING_PERIOD 1000
81 #define SENTINEL_ASK_PERIOD 1000
82 #define SENTINEL_PUBLISH_PERIOD 5000
83 #define SENTINEL_DOWN_AFTER_PERIOD 30000
84 #define SENTINEL_HELLO_CHANNEL "__sentinel__:hello"
85 #define SENTINEL_TILT_TRIGGER 2000
86 #define SENTINEL_TILT_PERIOD (SENTINEL_PING_PERIOD*30)
87 #define SENTINEL_DEFAULT_SLAVE_PRIORITY 100
88 #define SENTINEL_PROMOTION_RETRY_PERIOD 30000
89 #define SENTINEL_SLAVE_RECONF_RETRY_PERIOD 10000
90 #define SENTINEL_DEFAULT_PARALLEL_SYNCS 1
91 #define SENTINEL_MIN_LINK_RECONNECT_PERIOD 15000
92 #define SENTINEL_DEFAULT_FAILOVER_TIMEOUT (60*15*1000)
93 #define SENTINEL_MAX_PENDING_COMMANDS 100
94 #define SENTINEL_EXTENDED_SDOWN_MULTIPLIER 10
95
96 /* How many milliseconds is an information valid? This applies for instance
97 * to the reply to SENTINEL IS-MASTER-DOWN-BY-ADDR replies. */
98 #define SENTINEL_INFO_VALIDITY_TIME 5000
99 #define SENTINEL_FAILOVER_FIXED_DELAY 5000
100 #define SENTINEL_FAILOVER_MAX_RANDOM_DELAY 10000
101
102 /* Failover machine different states. */
103 #define SENTINEL_FAILOVER_STATE_NONE 0 /* No failover in progress. */
104 #define SENTINEL_FAILOVER_STATE_WAIT_START 1 /* Wait for failover_start_time*/
105 #define SENTINEL_FAILOVER_STATE_SELECT_SLAVE 2 /* Select slave to promote */
106 #define SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE 3 /* Slave -> Master */
107 #define SENTINEL_FAILOVER_STATE_WAIT_PROMOTION 4 /* Wait slave to change role */
108 #define SENTINEL_FAILOVER_STATE_RECONF_SLAVES 5 /* SLAVEOF newmaster */
109 #define SENTINEL_FAILOVER_STATE_WAIT_NEXT_SLAVE 6 /* wait replication */
110 #define SENTINEL_FAILOVER_STATE_ALERT_CLIENTS 7 /* Run user script. */
111 #define SENTINEL_FAILOVER_STATE_WAIT_ALERT_SCRIPT 8 /* Wait script exec. */
112 #define SENTINEL_FAILOVER_STATE_DETECT_END 9 /* Check for failover end. */
113 #define SENTINEL_FAILOVER_STATE_UPDATE_CONFIG 10 /* Monitor promoted slave. */
114
115 #define SENTINEL_MASTER_LINK_STATUS_UP 0
116 #define SENTINEL_MASTER_LINK_STATUS_DOWN 1
117
118 /* Generic flags that can be used with different functions. */
119 #define SENTINEL_NO_FLAGS 0
120 #define SENTINEL_GENERATE_EVENT 1
121 #define SENTINEL_LEADER 2
122 #define SENTINEL_OBSERVER 4
123
124 /* Script execution flags and limits. */
125 #define SENTINEL_SCRIPT_NONE 0
126 #define SENTINEL_SCRIPT_RUNNING 1
127 #define SENTINEL_SCRIPT_MAX_QUEUE 256
128 #define SENTINEL_SCRIPT_MAX_RUNNING 16
129 #define SENTINEL_SCRIPT_MAX_RUNTIME 60000 /* 60 seconds max exec time. */
130 #define SENTINEL_SCRIPT_MAX_RETRY 10
131 #define SENTINEL_SCRIPT_RETRY_DELAY 30000 /* 30 seconds between retries. */
132
133 typedef struct sentinelRedisInstance {
134 int flags; /* See SRI_... defines */
135 char *name; /* Master name from the point of view of this sentinel. */
136 char *runid; /* run ID of this instance. */
137 sentinelAddr *addr; /* Master host. */
138 redisAsyncContext *cc; /* Hiredis context for commands. */
139 redisAsyncContext *pc; /* Hiredis context for Pub / Sub. */
140 int pending_commands; /* Number of commands sent waiting for a reply. */
141 mstime_t cc_conn_time; /* cc connection time. */
142 mstime_t pc_conn_time; /* pc connection time. */
143 mstime_t pc_last_activity; /* Last time we received any message. */
144 mstime_t last_avail_time; /* Last time the instance replied to ping with
145 a reply we consider valid. */
146 mstime_t last_pong_time; /* Last time the instance replied to ping,
147 whatever the reply was. That's used to check
148 if the link is idle and must be reconnected. */
149 mstime_t last_pub_time; /* Last time we sent hello via Pub/Sub. */
150 mstime_t last_hello_time; /* Only used if SRI_SENTINEL is set. Last time
151 we received an hello from this Sentinel
152 via Pub/Sub. */
153 mstime_t last_master_down_reply_time; /* Time of last reply to
154 SENTINEL is-master-down command. */
155 mstime_t s_down_since_time; /* Subjectively down since time. */
156 mstime_t o_down_since_time; /* Objectively down since time. */
157 mstime_t down_after_period; /* Consider it down after that period. */
158 mstime_t info_refresh; /* Time at which we received INFO output from it. */
159
160 /* Master specific. */
161 dict *sentinels; /* Other sentinels monitoring the same master. */
162 dict *slaves; /* Slaves for this master instance. */
163 int quorum; /* Number of sentinels that need to agree on failure. */
164 int parallel_syncs; /* How many slaves to reconfigure at same time. */
165 char *auth_pass; /* Password to use for AUTH against master & slaves. */
166
167 /* Slave specific. */
168 mstime_t master_link_down_time; /* Slave replication link down time. */
169 int slave_priority; /* Slave priority according to its INFO output. */
170 mstime_t slave_reconf_sent_time; /* Time at which we sent SLAVE OF <new> */
171 struct sentinelRedisInstance *master; /* Master instance if SRI_SLAVE is set. */
172 char *slave_master_host; /* Master host as reported by INFO */
173 int slave_master_port; /* Master port as reported by INFO */
174 int slave_master_link_status; /* Master link status as reported by INFO */
175 /* Failover */
176 char *leader; /* If this is a master instance, this is the runid of
177 the Sentinel that should perform the failover. If
178 this is a Sentinel, this is the runid of the Sentinel
179 that this other Sentinel is voting as leader.
180 This field is valid only if SRI_MASTER_DOWN is
181 set on the Sentinel instance. */
182 int failover_state; /* See SENTINEL_FAILOVER_STATE_* defines. */
183 mstime_t failover_state_change_time;
184 mstime_t failover_start_time; /* When to start to failover if leader. */
185 mstime_t failover_timeout; /* Max time to refresh failover state. */
186 struct sentinelRedisInstance *promoted_slave; /* Promoted slave instance. */
187 /* Scripts executed to notify admin or reconfigure clients: when they
188 * are set to NULL no script is executed. */
189 char *notification_script;
190 char *client_reconfig_script;
191 } sentinelRedisInstance;
192
193 /* Main state. */
194 struct sentinelState {
195 dict *masters; /* Dictionary of master sentinelRedisInstances.
196 Key is the instance name, value is the
197 sentinelRedisInstance structure pointer. */
198 int tilt; /* Are we in TILT mode? */
199 int running_scripts; /* Number of scripts in execution right now. */
200 mstime_t tilt_start_time; /* When TITL started. */
201 mstime_t previous_time; /* Time last time we ran the time handler. */
202 list *scripts_queue; /* Queue of user scripts to execute. */
203 } sentinel;
204
205 /* A script execution job. */
206 typedef struct sentinelScriptJob {
207 int flags; /* Script job flags: SENTINEL_SCRIPT_* */
208 int retry_num; /* Number of times we tried to execute it. */
209 char **argv; /* Arguments to call the script. */
210 mstime_t start_time; /* Script execution time if the script is running,
211 otherwise 0 if we are allowed to retry the
212 execution at any time. If the script is not
213 running and it's not 0, it means: do not run
214 before the specified time. */
215 pid_t pid; /* Script execution pid. */
216 } sentinelScriptJob;
217
218 /* ======================= hiredis ae.c adapters =============================
219 * Note: this implementation is taken from hiredis/adapters/ae.h, however
220 * we have our modified copy for Sentinel in order to use our allocator
221 * and to have full control over how the adapter works. */
222
223 typedef struct redisAeEvents {
224 redisAsyncContext *context;
225 aeEventLoop *loop;
226 int fd;
227 int reading, writing;
228 } redisAeEvents;
229
230 static void redisAeReadEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
231 ((void)el); ((void)fd); ((void)mask);
232
233 redisAeEvents *e = (redisAeEvents*)privdata;
234 redisAsyncHandleRead(e->context);
235 }
236
237 static void redisAeWriteEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
238 ((void)el); ((void)fd); ((void)mask);
239
240 redisAeEvents *e = (redisAeEvents*)privdata;
241 redisAsyncHandleWrite(e->context);
242 }
243
244 static void redisAeAddRead(void *privdata) {
245 redisAeEvents *e = (redisAeEvents*)privdata;
246 aeEventLoop *loop = e->loop;
247 if (!e->reading) {
248 e->reading = 1;
249 aeCreateFileEvent(loop,e->fd,AE_READABLE,redisAeReadEvent,e);
250 }
251 }
252
253 static void redisAeDelRead(void *privdata) {
254 redisAeEvents *e = (redisAeEvents*)privdata;
255 aeEventLoop *loop = e->loop;
256 if (e->reading) {
257 e->reading = 0;
258 aeDeleteFileEvent(loop,e->fd,AE_READABLE);
259 }
260 }
261
262 static void redisAeAddWrite(void *privdata) {
263 redisAeEvents *e = (redisAeEvents*)privdata;
264 aeEventLoop *loop = e->loop;
265 if (!e->writing) {
266 e->writing = 1;
267 aeCreateFileEvent(loop,e->fd,AE_WRITABLE,redisAeWriteEvent,e);
268 }
269 }
270
271 static void redisAeDelWrite(void *privdata) {
272 redisAeEvents *e = (redisAeEvents*)privdata;
273 aeEventLoop *loop = e->loop;
274 if (e->writing) {
275 e->writing = 0;
276 aeDeleteFileEvent(loop,e->fd,AE_WRITABLE);
277 }
278 }
279
280 static void redisAeCleanup(void *privdata) {
281 redisAeEvents *e = (redisAeEvents*)privdata;
282 redisAeDelRead(privdata);
283 redisAeDelWrite(privdata);
284 zfree(e);
285 }
286
287 static int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac) {
288 redisContext *c = &(ac->c);
289 redisAeEvents *e;
290
291 /* Nothing should be attached when something is already attached */
292 if (ac->ev.data != NULL)
293 return REDIS_ERR;
294
295 /* Create container for context and r/w events */
296 e = (redisAeEvents*)zmalloc(sizeof(*e));
297 e->context = ac;
298 e->loop = loop;
299 e->fd = c->fd;
300 e->reading = e->writing = 0;
301
302 /* Register functions to start/stop listening for events */
303 ac->ev.addRead = redisAeAddRead;
304 ac->ev.delRead = redisAeDelRead;
305 ac->ev.addWrite = redisAeAddWrite;
306 ac->ev.delWrite = redisAeDelWrite;
307 ac->ev.cleanup = redisAeCleanup;
308 ac->ev.data = e;
309
310 return REDIS_OK;
311 }
312
313 /* ============================= Prototypes ================================= */
314
315 void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status);
316 void sentinelDisconnectCallback(const redisAsyncContext *c, int status);
317 void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata);
318 sentinelRedisInstance *sentinelGetMasterByName(char *name);
319 char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master);
320 char *sentinelGetObjectiveLeader(sentinelRedisInstance *master);
321 int yesnotoi(char *s);
322 void sentinelDisconnectInstanceFromContext(const redisAsyncContext *c);
323 void sentinelKillLink(sentinelRedisInstance *ri, redisAsyncContext *c);
324 const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri);
325 void sentinelAbortFailover(sentinelRedisInstance *ri);
326 void sentinelEvent(int level, char *type, sentinelRedisInstance *ri, const char *fmt, ...);
327 sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master);
328 void sentinelScheduleScriptExecution(char *path, ...);
329 void sentinelStartFailover(sentinelRedisInstance *master, int state);
330 void sentinelDiscardReplyCallback(redisAsyncContext *c, void *reply, void *privdata);
331
332 /* ========================= Dictionary types =============================== */
333
334 unsigned int dictSdsHash(const void *key);
335 int dictSdsKeyCompare(void *privdata, const void *key1, const void *key2);
336 void releaseSentinelRedisInstance(sentinelRedisInstance *ri);
337
338 void dictInstancesValDestructor (void *privdata, void *obj) {
339 releaseSentinelRedisInstance(obj);
340 }
341
342 /* Instance name (sds) -> instance (sentinelRedisInstance pointer)
343 *
344 * also used for: sentinelRedisInstance->sentinels dictionary that maps
345 * sentinels ip:port to last seen time in Pub/Sub hello message. */
346 dictType instancesDictType = {
347 dictSdsHash, /* hash function */
348 NULL, /* key dup */
349 NULL, /* val dup */
350 dictSdsKeyCompare, /* key compare */
351 NULL, /* key destructor */
352 dictInstancesValDestructor /* val destructor */
353 };
354
355 /* Instance runid (sds) -> votes (long casted to void*)
356 *
357 * This is useful into sentinelGetObjectiveLeader() function in order to
358 * count the votes and understand who is the leader. */
359 dictType leaderVotesDictType = {
360 dictSdsHash, /* hash function */
361 NULL, /* key dup */
362 NULL, /* val dup */
363 dictSdsKeyCompare, /* key compare */
364 NULL, /* key destructor */
365 NULL /* val destructor */
366 };
367
368 /* =========================== Initialization =============================== */
369
370 void sentinelCommand(redisClient *c);
371 void sentinelInfoCommand(redisClient *c);
372
373 struct redisCommand sentinelcmds[] = {
374 {"ping",pingCommand,1,"",0,NULL,0,0,0,0,0},
375 {"sentinel",sentinelCommand,-2,"",0,NULL,0,0,0,0,0},
376 {"subscribe",subscribeCommand,-2,"",0,NULL,0,0,0,0,0},
377 {"unsubscribe",unsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
378 {"psubscribe",psubscribeCommand,-2,"",0,NULL,0,0,0,0,0},
379 {"punsubscribe",punsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
380 {"info",sentinelInfoCommand,-1,"",0,NULL,0,0,0,0,0}
381 };
382
383 /* This function overwrites a few normal Redis config default with Sentinel
384 * specific defaults. */
385 void initSentinelConfig(void) {
386 server.port = REDIS_SENTINEL_PORT;
387 }
388
389 /* Perform the Sentinel mode initialization. */
390 void initSentinel(void) {
391 int j;
392
393 /* Remove usual Redis commands from the command table, then just add
394 * the SENTINEL command. */
395 dictEmpty(server.commands);
396 for (j = 0; j < sizeof(sentinelcmds)/sizeof(sentinelcmds[0]); j++) {
397 int retval;
398 struct redisCommand *cmd = sentinelcmds+j;
399
400 retval = dictAdd(server.commands, sdsnew(cmd->name), cmd);
401 redisAssert(retval == DICT_OK);
402 }
403
404 /* Initialize various data structures. */
405 sentinel.masters = dictCreate(&instancesDictType,NULL);
406 sentinel.tilt = 0;
407 sentinel.tilt_start_time = mstime();
408 sentinel.previous_time = mstime();
409 sentinel.running_scripts = 0;
410 sentinel.scripts_queue = listCreate();
411 }
412
413 /* ============================== sentinelAddr ============================== */
414
415 /* Create a sentinelAddr object and return it on success.
416 * On error NULL is returned and errno is set to:
417 * ENOENT: Can't resolve the hostname.
418 * EINVAL: Invalid port number.
419 */
420 sentinelAddr *createSentinelAddr(char *hostname, int port) {
421 char buf[32];
422 sentinelAddr *sa;
423
424 if (port <= 0 || port > 65535) {
425 errno = EINVAL;
426 return NULL;
427 }
428 if (anetResolve(NULL,hostname,buf) == ANET_ERR) {
429 errno = ENOENT;
430 return NULL;
431 }
432 sa = zmalloc(sizeof(*sa));
433 sa->ip = sdsnew(buf);
434 sa->port = port;
435 return sa;
436 }
437
438 /* Free a Sentinel address. Can't fail. */
439 void releaseSentinelAddr(sentinelAddr *sa) {
440 sdsfree(sa->ip);
441 zfree(sa);
442 }
443
444 /* =========================== Events notification ========================== */
445
446 /* Send an event to log, pub/sub, user notification script.
447 *
448 * 'level' is the log level for logging. Only REDIS_WARNING events will trigger
449 * the execution of the user notification script.
450 *
451 * 'type' is the message type, also used as a pub/sub channel name.
452 *
453 * 'ri', is the redis instance target of this event if applicable, and is
454 * used to obtain the path of the notification script to execute.
455 *
456 * The remaining arguments are printf-alike.
457 * If the format specifier starts with the two characters "%@" then ri is
458 * not NULL, and the message is prefixed with an instance identifier in the
459 * following format:
460 *
461 * <instance type> <instance name> <ip> <port>
462 *
463 * If the instance type is not master, than the additional string is
464 * added to specify the originating master:
465 *
466 * @ <master name> <master ip> <master port>
467 *
468 * Any other specifier after "%@" is processed by printf itself.
469 */
470 void sentinelEvent(int level, char *type, sentinelRedisInstance *ri,
471 const char *fmt, ...) {
472 va_list ap;
473 char msg[REDIS_MAX_LOGMSG_LEN];
474 robj *channel, *payload;
475
476 /* Handle %@ */
477 if (fmt[0] == '%' && fmt[1] == '@') {
478 sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
479 NULL : ri->master;
480
481 if (master) {
482 snprintf(msg, sizeof(msg), "%s %s %s %d @ %s %s %d",
483 sentinelRedisInstanceTypeStr(ri),
484 ri->name, ri->addr->ip, ri->addr->port,
485 master->name, master->addr->ip, master->addr->port);
486 } else {
487 snprintf(msg, sizeof(msg), "%s %s %s %d",
488 sentinelRedisInstanceTypeStr(ri),
489 ri->name, ri->addr->ip, ri->addr->port);
490 }
491 fmt += 2;
492 } else {
493 msg[0] = '\0';
494 }
495
496 /* Use vsprintf for the rest of the formatting if any. */
497 if (fmt[0] != '\0') {
498 va_start(ap, fmt);
499 vsnprintf(msg+strlen(msg), sizeof(msg)-strlen(msg), fmt, ap);
500 va_end(ap);
501 }
502
503 /* Log the message if the log level allows it to be logged. */
504 if (level >= server.verbosity)
505 redisLog(level,"%s %s",type,msg);
506
507 /* Publish the message via Pub/Sub if it's not a debugging one. */
508 if (level != REDIS_DEBUG) {
509 channel = createStringObject(type,strlen(type));
510 payload = createStringObject(msg,strlen(msg));
511 pubsubPublishMessage(channel,payload);
512 decrRefCount(channel);
513 decrRefCount(payload);
514 }
515
516 /* Call the notification script if applicable. */
517 if (level == REDIS_WARNING && ri != NULL) {
518 sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
519 ri : ri->master;
520 if (master->notification_script) {
521 sentinelScheduleScriptExecution(master->notification_script,
522 type,msg,NULL);
523 }
524 }
525 }
526
527 /* ============================ script execution ============================ */
528
529 /* Release a script job structure and all the associated data. */
530 void sentinelReleaseScriptJob(sentinelScriptJob *sj) {
531 int j = 0;
532
533 while(sj->argv[j]) sdsfree(sj->argv[j++]);
534 zfree(sj->argv);
535 zfree(sj);
536 }
537
538 #define SENTINEL_SCRIPT_MAX_ARGS 16
539 void sentinelScheduleScriptExecution(char *path, ...) {
540 va_list ap;
541 char *argv[SENTINEL_SCRIPT_MAX_ARGS+1];
542 int argc = 1;
543 sentinelScriptJob *sj;
544
545 va_start(ap, path);
546 while(argc < SENTINEL_SCRIPT_MAX_ARGS) {
547 argv[argc] = va_arg(ap,char*);
548 if (!argv[argc]) break;
549 argv[argc] = sdsnew(argv[argc]); /* Copy the string. */
550 argc++;
551 }
552 va_end(ap);
553 argv[0] = sdsnew(path);
554
555 sj = zmalloc(sizeof(*sj));
556 sj->flags = SENTINEL_SCRIPT_NONE;
557 sj->retry_num = 0;
558 sj->argv = zmalloc(sizeof(char*)*(argc+1));
559 sj->start_time = 0;
560 sj->pid = 0;
561 memcpy(sj->argv,argv,sizeof(char*)*(argc+1));
562
563 listAddNodeTail(sentinel.scripts_queue,sj);
564
565 /* Remove the oldest non running script if we already hit the limit. */
566 if (listLength(sentinel.scripts_queue) > SENTINEL_SCRIPT_MAX_QUEUE) {
567 listNode *ln;
568 listIter li;
569
570 listRewind(sentinel.scripts_queue,&li);
571 while ((ln = listNext(&li)) != NULL) {
572 sj = ln->value;
573
574 if (sj->flags & SENTINEL_SCRIPT_RUNNING) continue;
575 /* The first node is the oldest as we add on tail. */
576 listDelNode(sentinel.scripts_queue,ln);
577 sentinelReleaseScriptJob(sj);
578 break;
579 }
580 redisAssert(listLength(sentinel.scripts_queue) <=
581 SENTINEL_SCRIPT_MAX_QUEUE);
582 }
583 }
584
585 /* Lookup a script in the scripts queue via pid, and returns the list node
586 * (so that we can easily remove it from the queue if needed). */
587 listNode *sentinelGetScriptListNodeByPid(pid_t pid) {
588 listNode *ln;
589 listIter li;
590
591 listRewind(sentinel.scripts_queue,&li);
592 while ((ln = listNext(&li)) != NULL) {
593 sentinelScriptJob *sj = ln->value;
594
595 if ((sj->flags & SENTINEL_SCRIPT_RUNNING) && sj->pid == pid)
596 return ln;
597 }
598 return NULL;
599 }
600
601 /* Run pending scripts if we are not already at max number of running
602 * scripts. */
603 void sentinelRunPendingScripts(void) {
604 listNode *ln;
605 listIter li;
606 mstime_t now = mstime();
607
608 /* Find jobs that are not running and run them, from the top to the
609 * tail of the queue, so we run older jobs first. */
610 listRewind(sentinel.scripts_queue,&li);
611 while (sentinel.running_scripts < SENTINEL_SCRIPT_MAX_RUNNING &&
612 (ln = listNext(&li)) != NULL)
613 {
614 sentinelScriptJob *sj = ln->value;
615 pid_t pid;
616
617 /* Skip if already running. */
618 if (sj->flags & SENTINEL_SCRIPT_RUNNING) continue;
619
620 /* Skip if it's a retry, but not enough time has elapsed. */
621 if (sj->start_time && sj->start_time > now) continue;
622
623 sj->flags |= SENTINEL_SCRIPT_RUNNING;
624 sj->start_time = mstime();
625 sj->retry_num++;
626 pid = fork();
627
628 if (pid == -1) {
629 /* Parent (fork error).
630 * We report fork errors as signal 99, in order to unify the
631 * reporting with other kind of errors. */
632 sentinelEvent(REDIS_WARNING,"-script-error",NULL,
633 "%s %d %d", sj->argv[0], 99, 0);
634 sj->flags &= ~SENTINEL_SCRIPT_RUNNING;
635 sj->pid = 0;
636 } else if (pid == 0) {
637 /* Child */
638 execve(sj->argv[0],sj->argv,environ);
639 /* If we are here an error occurred. */
640 _exit(2); /* Don't retry execution. */
641 } else {
642 sentinel.running_scripts++;
643 sj->pid = pid;
644 sentinelEvent(REDIS_DEBUG,"+script-child",NULL,"%ld",(long)pid);
645 }
646 }
647 }
648
649 /* How much to delay the execution of a script that we need to retry after
650 * an error?
651 *
652 * We double the retry delay for every further retry we do. So for instance
653 * if RETRY_DELAY is set to 30 seconds and the max number of retries is 10
654 * starting from the second attempt to execute the script the delays are:
655 * 30 sec, 60 sec, 2 min, 4 min, 8 min, 16 min, 32 min, 64 min, 128 min. */
656 mstime_t sentinelScriptRetryDelay(int retry_num) {
657 mstime_t delay = SENTINEL_SCRIPT_RETRY_DELAY;
658
659 while (retry_num-- > 1) delay *= 2;
660 return delay;
661 }
662
663 /* Check for scripts that terminated, and remove them from the queue if the
664 * script terminated successfully. If instead the script was terminated by
665 * a signal, or returned exit code "1", it is scheduled to run again if
666 * the max number of retries did not already elapsed. */
667 void sentinelCollectTerminatedScripts(void) {
668 int statloc;
669 pid_t pid;
670
671 while ((pid = wait3(&statloc,WNOHANG,NULL)) > 0) {
672 int exitcode = WEXITSTATUS(statloc);
673 int bysignal = 0;
674 listNode *ln;
675 sentinelScriptJob *sj;
676
677 if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
678 sentinelEvent(REDIS_DEBUG,"-script-child",NULL,"%ld %d %d",
679 (long)pid, exitcode, bysignal);
680
681 ln = sentinelGetScriptListNodeByPid(pid);
682 if (ln == NULL) {
683 redisLog(REDIS_WARNING,"wait3() returned a pid (%ld) we can't find in our scripts execution queue!", (long)pid);
684 continue;
685 }
686 sj = ln->value;
687
688 /* If the script was terminated by a signal or returns an
689 * exit code of "1" (that means: please retry), we reschedule it
690 * if the max number of retries is not already reached. */
691 if ((bysignal || exitcode == 1) &&
692 sj->retry_num != SENTINEL_SCRIPT_MAX_RETRY)
693 {
694 sj->flags &= ~SENTINEL_SCRIPT_RUNNING;
695 sj->pid = 0;
696 sj->start_time = mstime() +
697 sentinelScriptRetryDelay(sj->retry_num);
698 } else {
699 /* Otherwise let's remove the script, but log the event if the
700 * execution did not terminated in the best of the ways. */
701 if (bysignal || exitcode != 0) {
702 sentinelEvent(REDIS_WARNING,"-script-error",NULL,
703 "%s %d %d", sj->argv[0], bysignal, exitcode);
704 }
705 listDelNode(sentinel.scripts_queue,ln);
706 sentinelReleaseScriptJob(sj);
707 sentinel.running_scripts--;
708 }
709 }
710 }
711
712 /* Kill scripts in timeout, they'll be collected by the
713 * sentinelCollectTerminatedScripts() function. */
714 void sentinelKillTimedoutScripts(void) {
715 listNode *ln;
716 listIter li;
717 mstime_t now = mstime();
718
719 listRewind(sentinel.scripts_queue,&li);
720 while ((ln = listNext(&li)) != NULL) {
721 sentinelScriptJob *sj = ln->value;
722
723 if (sj->flags & SENTINEL_SCRIPT_RUNNING &&
724 (now - sj->start_time) > SENTINEL_SCRIPT_MAX_RUNTIME)
725 {
726 sentinelEvent(REDIS_WARNING,"-script-timeout",NULL,"%s %ld",
727 sj->argv[0], (long)sj->pid);
728 kill(sj->pid,SIGKILL);
729 }
730 }
731 }
732
733 /* Implements SENTINEL PENDING-SCRIPTS command. */
734 void sentinelPendingScriptsCommand(redisClient *c) {
735 listNode *ln;
736 listIter li;
737
738 addReplyMultiBulkLen(c,listLength(sentinel.scripts_queue));
739 listRewind(sentinel.scripts_queue,&li);
740 while ((ln = listNext(&li)) != NULL) {
741 sentinelScriptJob *sj = ln->value;
742 int j = 0;
743
744 addReplyMultiBulkLen(c,10);
745
746 addReplyBulkCString(c,"argv");
747 while (sj->argv[j]) j++;
748 addReplyMultiBulkLen(c,j);
749 j = 0;
750 while (sj->argv[j]) addReplyBulkCString(c,sj->argv[j++]);
751
752 addReplyBulkCString(c,"flags");
753 addReplyBulkCString(c,
754 (sj->flags & SENTINEL_SCRIPT_RUNNING) ? "running" : "scheduled");
755
756 addReplyBulkCString(c,"pid");
757 addReplyBulkLongLong(c,sj->pid);
758
759 if (sj->flags & SENTINEL_SCRIPT_RUNNING) {
760 addReplyBulkCString(c,"run-time");
761 addReplyBulkLongLong(c,mstime() - sj->start_time);
762 } else {
763 mstime_t delay = sj->start_time ? (sj->start_time-mstime()) : 0;
764 if (delay < 0) delay = 0;
765 addReplyBulkCString(c,"run-delay");
766 addReplyBulkLongLong(c,delay);
767 }
768
769 addReplyBulkCString(c,"retry-num");
770 addReplyBulkLongLong(c,sj->retry_num);
771 }
772 }
773
774 /* This function calls, if any, the client reconfiguration script with the
775 * following parameters:
776 *
777 * <master-name> <role> <state> <from-ip> <from-port> <to-ip> <to-port>
778 *
779 * It is called every time a failover starts, ends, or is aborted.
780 *
781 * <state> is "start", "end" or "abort".
782 * <role> is either "leader" or "observer".
783 *
784 * from/to fields are respectively master -> promoted slave addresses for
785 * "start" and "end", or the reverse (promoted slave -> master) in case of
786 * "abort".
787 */
788 void sentinelCallClientReconfScript(sentinelRedisInstance *master, int role, char *state, sentinelAddr *from, sentinelAddr *to) {
789 char fromport[32], toport[32];
790
791 if (master->client_reconfig_script == NULL) return;
792 ll2string(fromport,sizeof(fromport),from->port);
793 ll2string(toport,sizeof(toport),to->port);
794 sentinelScheduleScriptExecution(master->client_reconfig_script,
795 master->name,
796 (role == SENTINEL_LEADER) ? "leader" : "observer",
797 state, from->ip, fromport, to->ip, toport, NULL);
798 }
799
800 /* ========================== sentinelRedisInstance ========================= */
801
802 /* Create a redis instance, the following fields must be populated by the
803 * caller if needed:
804 * runid: set to NULL but will be populated once INFO output is received.
805 * info_refresh: is set to 0 to mean that we never received INFO so far.
806 *
807 * If SRI_MASTER is set into initial flags the instance is added to
808 * sentinel.masters table.
809 *
810 * if SRI_SLAVE or SRI_SENTINEL is set then 'master' must be not NULL and the
811 * instance is added into master->slaves or master->sentinels table.
812 *
813 * If the instance is a slave or sentinel, the name parameter is ignored and
814 * is created automatically as hostname:port.
815 *
816 * The function fails if hostname can't be resolved or port is out of range.
817 * When this happens NULL is returned and errno is set accordingly to the
818 * createSentinelAddr() function.
819 *
820 * The function may also fail and return NULL with errno set to EBUSY if
821 * a master or slave with the same name already exists. */
822 sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char *hostname, int port, int quorum, sentinelRedisInstance *master) {
823 sentinelRedisInstance *ri;
824 sentinelAddr *addr;
825 dict *table = NULL;
826 char slavename[128], *sdsname;
827
828 redisAssert(flags & (SRI_MASTER|SRI_SLAVE|SRI_SENTINEL));
829 redisAssert((flags & SRI_MASTER) || master != NULL);
830
831 /* Check address validity. */
832 addr = createSentinelAddr(hostname,port);
833 if (addr == NULL) return NULL;
834
835 /* For slaves and sentinel we use ip:port as name. */
836 if (flags & (SRI_SLAVE|SRI_SENTINEL)) {
837 snprintf(slavename,sizeof(slavename),"%s:%d",hostname,port);
838 name = slavename;
839 }
840
841 /* Make sure the entry is not duplicated. This may happen when the same
842 * name for a master is used multiple times inside the configuration or
843 * if we try to add multiple times a slave or sentinel with same ip/port
844 * to a master. */
845 if (flags & SRI_MASTER) table = sentinel.masters;
846 else if (flags & SRI_SLAVE) table = master->slaves;
847 else if (flags & SRI_SENTINEL) table = master->sentinels;
848 sdsname = sdsnew(name);
849 if (dictFind(table,sdsname)) {
850 sdsfree(sdsname);
851 errno = EBUSY;
852 return NULL;
853 }
854
855 /* Create the instance object. */
856 ri = zmalloc(sizeof(*ri));
857 /* Note that all the instances are started in the disconnected state,
858 * the event loop will take care of connecting them. */
859 ri->flags = flags | SRI_DISCONNECTED;
860 ri->name = sdsname;
861 ri->runid = NULL;
862 ri->addr = addr;
863 ri->cc = NULL;
864 ri->pc = NULL;
865 ri->pending_commands = 0;
866 ri->cc_conn_time = 0;
867 ri->pc_conn_time = 0;
868 ri->pc_last_activity = 0;
869 ri->last_avail_time = mstime();
870 ri->last_pong_time = mstime();
871 ri->last_pub_time = mstime();
872 ri->last_hello_time = mstime();
873 ri->last_master_down_reply_time = mstime();
874 ri->s_down_since_time = 0;
875 ri->o_down_since_time = 0;
876 ri->down_after_period = master ? master->down_after_period :
877 SENTINEL_DOWN_AFTER_PERIOD;
878 ri->master_link_down_time = 0;
879 ri->auth_pass = NULL;
880 ri->slave_priority = SENTINEL_DEFAULT_SLAVE_PRIORITY;
881 ri->slave_reconf_sent_time = 0;
882 ri->slave_master_host = NULL;
883 ri->slave_master_port = 0;
884 ri->slave_master_link_status = SENTINEL_MASTER_LINK_STATUS_DOWN;
885 ri->sentinels = dictCreate(&instancesDictType,NULL);
886 ri->quorum = quorum;
887 ri->parallel_syncs = SENTINEL_DEFAULT_PARALLEL_SYNCS;
888 ri->master = master;
889 ri->slaves = dictCreate(&instancesDictType,NULL);
890 ri->info_refresh = 0;
891
892 /* Failover state. */
893 ri->leader = NULL;
894 ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
895 ri->failover_state_change_time = 0;
896 ri->failover_start_time = 0;
897 ri->failover_timeout = SENTINEL_DEFAULT_FAILOVER_TIMEOUT;
898 ri->promoted_slave = NULL;
899 ri->notification_script = NULL;
900 ri->client_reconfig_script = NULL;
901
902 /* Add into the right table. */
903 dictAdd(table, ri->name, ri);
904 return ri;
905 }
906
907 /* Release this instance and all its slaves, sentinels, hiredis connections.
908 * This function also takes care of unlinking the instance from the main
909 * masters table (if it is a master) or from its master sentinels/slaves table
910 * if it is a slave or sentinel. */
911 void releaseSentinelRedisInstance(sentinelRedisInstance *ri) {
912 /* Release all its slaves or sentinels if any. */
913 dictRelease(ri->sentinels);
914 dictRelease(ri->slaves);
915
916 /* Release hiredis connections. */
917 if (ri->cc) sentinelKillLink(ri,ri->cc);
918 if (ri->pc) sentinelKillLink(ri,ri->pc);
919
920 /* Free other resources. */
921 sdsfree(ri->name);
922 sdsfree(ri->runid);
923 sdsfree(ri->notification_script);
924 sdsfree(ri->client_reconfig_script);
925 sdsfree(ri->slave_master_host);
926 sdsfree(ri->leader);
927 sdsfree(ri->auth_pass);
928 releaseSentinelAddr(ri->addr);
929
930 /* Clear state into the master if needed. */
931 if ((ri->flags & SRI_SLAVE) && (ri->flags & SRI_PROMOTED) && ri->master)
932 ri->master->promoted_slave = NULL;
933
934 zfree(ri);
935 }
936
937 /* Lookup a slave in a master Redis instance, by ip and port. */
938 sentinelRedisInstance *sentinelRedisInstanceLookupSlave(
939 sentinelRedisInstance *ri, char *ip, int port)
940 {
941 sds key;
942 sentinelRedisInstance *slave;
943
944 redisAssert(ri->flags & SRI_MASTER);
945 key = sdscatprintf(sdsempty(),"%s:%d",ip,port);
946 slave = dictFetchValue(ri->slaves,key);
947 sdsfree(key);
948 return slave;
949 }
950
951 /* Return the name of the type of the instance as a string. */
952 const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri) {
953 if (ri->flags & SRI_MASTER) return "master";
954 else if (ri->flags & SRI_SLAVE) return "slave";
955 else if (ri->flags & SRI_SENTINEL) return "sentinel";
956 else return "unknown";
957 }
958
959 /* This function removes all the instances found in the dictionary of instances
960 * 'd', having either:
961 *
962 * 1) The same ip/port as specified.
963 * 2) The same runid.
964 *
965 * "1" and "2" don't need to verify at the same time, just one is enough.
966 * If "runid" is NULL it is not checked.
967 * Similarly if "ip" is NULL it is not checked.
968 *
969 * This function is useful because every time we add a new Sentinel into
970 * a master's Sentinels dictionary, we want to be very sure about not
971 * having duplicated instances for any reason. This is so important because
972 * we use those other sentinels in order to run our quorum protocol to
973 * understand if it's time to proceeed with the fail over.
974 *
975 * Making sure no duplication is possible we greately improve the robustness
976 * of the quorum (otherwise we may end counting the same instance multiple
977 * times for some reason).
978 *
979 * The function returns the number of Sentinels removed. */
980 int removeMatchingSentinelsFromMaster(sentinelRedisInstance *master, char *ip, int port, char *runid) {
981 dictIterator *di;
982 dictEntry *de;
983 int removed = 0;
984
985 di = dictGetSafeIterator(master->sentinels);
986 while((de = dictNext(di)) != NULL) {
987 sentinelRedisInstance *ri = dictGetVal(de);
988
989 if ((ri->runid && runid && strcmp(ri->runid,runid) == 0) ||
990 (ip && strcmp(ri->addr->ip,ip) == 0 && port == ri->addr->port))
991 {
992 dictDelete(master->sentinels,ri->name);
993 removed++;
994 }
995 }
996 dictReleaseIterator(di);
997 return removed;
998 }
999
1000 /* Search an instance with the same runid, ip and port into a dictionary
1001 * of instances. Return NULL if not found, otherwise return the instance
1002 * pointer.
1003 *
1004 * runid or ip can be NULL. In such a case the search is performed only
1005 * by the non-NULL field. */
1006 sentinelRedisInstance *getSentinelRedisInstanceByAddrAndRunID(dict *instances, char *ip, int port, char *runid) {
1007 dictIterator *di;
1008 dictEntry *de;
1009 sentinelRedisInstance *instance = NULL;
1010
1011 redisAssert(ip || runid); /* User must pass at least one search param. */
1012 di = dictGetIterator(instances);
1013 while((de = dictNext(di)) != NULL) {
1014 sentinelRedisInstance *ri = dictGetVal(de);
1015
1016 if (runid && !ri->runid) continue;
1017 if ((runid == NULL || strcmp(ri->runid, runid) == 0) &&
1018 (ip == NULL || (strcmp(ri->addr->ip, ip) == 0 &&
1019 ri->addr->port == port)))
1020 {
1021 instance = ri;
1022 break;
1023 }
1024 }
1025 dictReleaseIterator(di);
1026 return instance;
1027 }
1028
1029 /* Simple master lookup by name */
1030 sentinelRedisInstance *sentinelGetMasterByName(char *name) {
1031 sentinelRedisInstance *ri;
1032 sds sdsname = sdsnew(name);
1033
1034 ri = dictFetchValue(sentinel.masters,sdsname);
1035 sdsfree(sdsname);
1036 return ri;
1037 }
1038
1039 /* Add the specified flags to all the instances in the specified dictionary. */
1040 void sentinelAddFlagsToDictOfRedisInstances(dict *instances, int flags) {
1041 dictIterator *di;
1042 dictEntry *de;
1043
1044 di = dictGetIterator(instances);
1045 while((de = dictNext(di)) != NULL) {
1046 sentinelRedisInstance *ri = dictGetVal(de);
1047 ri->flags |= flags;
1048 }
1049 dictReleaseIterator(di);
1050 }
1051
1052 /* Remove the specified flags to all the instances in the specified
1053 * dictionary. */
1054 void sentinelDelFlagsToDictOfRedisInstances(dict *instances, int flags) {
1055 dictIterator *di;
1056 dictEntry *de;
1057
1058 di = dictGetIterator(instances);
1059 while((de = dictNext(di)) != NULL) {
1060 sentinelRedisInstance *ri = dictGetVal(de);
1061 ri->flags &= ~flags;
1062 }
1063 dictReleaseIterator(di);
1064 }
1065
1066 /* Reset the state of a monitored master:
1067 * 1) Remove all slaves.
1068 * 2) Remove all sentinels.
1069 * 3) Remove most of the flags resulting from runtime operations.
1070 * 4) Reset timers to their default value.
1071 * 5) In the process of doing this undo the failover if in progress.
1072 * 6) Disconnect the connections with the master (will reconnect automatically).
1073 */
1074 void sentinelResetMaster(sentinelRedisInstance *ri, int flags) {
1075 redisAssert(ri->flags & SRI_MASTER);
1076 dictRelease(ri->slaves);
1077 dictRelease(ri->sentinels);
1078 ri->slaves = dictCreate(&instancesDictType,NULL);
1079 ri->sentinels = dictCreate(&instancesDictType,NULL);
1080 if (ri->cc) sentinelKillLink(ri,ri->cc);
1081 if (ri->pc) sentinelKillLink(ri,ri->pc);
1082 ri->flags &= SRI_MASTER|SRI_CAN_FAILOVER|SRI_DISCONNECTED;
1083 if (ri->leader) {
1084 sdsfree(ri->leader);
1085 ri->leader = NULL;
1086 }
1087 ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
1088 ri->failover_state_change_time = 0;
1089 ri->failover_start_time = 0;
1090 ri->promoted_slave = NULL;
1091 sdsfree(ri->runid);
1092 sdsfree(ri->slave_master_host);
1093 ri->runid = NULL;
1094 ri->slave_master_host = NULL;
1095 ri->last_avail_time = mstime();
1096 ri->last_pong_time = mstime();
1097 if (flags & SENTINEL_GENERATE_EVENT)
1098 sentinelEvent(REDIS_WARNING,"+reset-master",ri,"%@");
1099 }
1100
1101 /* Call sentinelResetMaster() on every master with a name matching the specified
1102 * pattern. */
1103 int sentinelResetMastersByPattern(char *pattern, int flags) {
1104 dictIterator *di;
1105 dictEntry *de;
1106 int reset = 0;
1107
1108 di = dictGetIterator(sentinel.masters);
1109 while((de = dictNext(di)) != NULL) {
1110 sentinelRedisInstance *ri = dictGetVal(de);
1111
1112 if (ri->name) {
1113 if (stringmatch(pattern,ri->name,0)) {
1114 sentinelResetMaster(ri,flags);
1115 reset++;
1116 }
1117 }
1118 }
1119 dictReleaseIterator(di);
1120 return reset;
1121 }
1122
1123 /* Reset the specified master with sentinelResetMaster(), and also change
1124 * the ip:port address, but take the name of the instance unmodified.
1125 *
1126 * This is used to handle the +switch-master and +redirect-to-master events.
1127 *
1128 * The function returns REDIS_ERR if the address can't be resolved for some
1129 * reason. Otherwise REDIS_OK is returned.
1130 *
1131 * TODO: make this reset so that original sentinels are re-added with
1132 * same ip / port / runid.
1133 */
1134
1135 int sentinelResetMasterAndChangeAddress(sentinelRedisInstance *master, char *ip, int port) {
1136 sentinelAddr *oldaddr, *newaddr;
1137
1138 newaddr = createSentinelAddr(ip,port);
1139 if (newaddr == NULL) return REDIS_ERR;
1140 sentinelResetMaster(master,SENTINEL_NO_FLAGS);
1141 oldaddr = master->addr;
1142 master->addr = newaddr;
1143 /* Release the old address at the end so we are safe even if the function
1144 * gets the master->addr->ip and master->addr->port as arguments. */
1145 releaseSentinelAddr(oldaddr);
1146 return REDIS_OK;
1147 }
1148
1149 /* ============================ Config handling ============================= */
1150 char *sentinelHandleConfiguration(char **argv, int argc) {
1151 sentinelRedisInstance *ri;
1152
1153 if (!strcasecmp(argv[0],"monitor") && argc == 5) {
1154 /* monitor <name> <host> <port> <quorum> */
1155 int quorum = atoi(argv[4]);
1156
1157 if (quorum <= 0) return "Quorum must be 1 or greater.";
1158 if (createSentinelRedisInstance(argv[1],SRI_MASTER,argv[2],
1159 atoi(argv[3]),quorum,NULL) == NULL)
1160 {
1161 switch(errno) {
1162 case EBUSY: return "Duplicated master name.";
1163 case ENOENT: return "Can't resolve master instance hostname.";
1164 case EINVAL: return "Invalid port number";
1165 }
1166 }
1167 } else if (!strcasecmp(argv[0],"down-after-milliseconds") && argc == 3) {
1168 /* down-after-milliseconds <name> <milliseconds> */
1169 ri = sentinelGetMasterByName(argv[1]);
1170 if (!ri) return "No such master with specified name.";
1171 ri->down_after_period = atoi(argv[2]);
1172 if (ri->down_after_period <= 0)
1173 return "negative or zero time parameter.";
1174 } else if (!strcasecmp(argv[0],"failover-timeout") && argc == 3) {
1175 /* failover-timeout <name> <milliseconds> */
1176 ri = sentinelGetMasterByName(argv[1]);
1177 if (!ri) return "No such master with specified name.";
1178 ri->failover_timeout = atoi(argv[2]);
1179 if (ri->failover_timeout <= 0)
1180 return "negative or zero time parameter.";
1181 } else if (!strcasecmp(argv[0],"can-failover") && argc == 3) {
1182 /* can-failover <name> <yes/no> */
1183 int yesno = yesnotoi(argv[2]);
1184
1185 ri = sentinelGetMasterByName(argv[1]);
1186 if (!ri) return "No such master with specified name.";
1187 if (yesno == -1) return "Argument must be either yes or no.";
1188 if (yesno)
1189 ri->flags |= SRI_CAN_FAILOVER;
1190 else
1191 ri->flags &= ~SRI_CAN_FAILOVER;
1192 } else if (!strcasecmp(argv[0],"parallel-syncs") && argc == 3) {
1193 /* parallel-syncs <name> <milliseconds> */
1194 ri = sentinelGetMasterByName(argv[1]);
1195 if (!ri) return "No such master with specified name.";
1196 ri->parallel_syncs = atoi(argv[2]);
1197 } else if (!strcasecmp(argv[0],"notification-script") && argc == 3) {
1198 /* notification-script <name> <path> */
1199 ri = sentinelGetMasterByName(argv[1]);
1200 if (!ri) return "No such master with specified name.";
1201 if (access(argv[2],X_OK) == -1)
1202 return "Notification script seems non existing or non executable.";
1203 ri->notification_script = sdsnew(argv[2]);
1204 } else if (!strcasecmp(argv[0],"client-reconfig-script") && argc == 3) {
1205 /* client-reconfig-script <name> <path> */
1206 ri = sentinelGetMasterByName(argv[1]);
1207 if (!ri) return "No such master with specified name.";
1208 if (access(argv[2],X_OK) == -1)
1209 return "Client reconfiguration script seems non existing or "
1210 "non executable.";
1211 ri->client_reconfig_script = sdsnew(argv[2]);
1212 } else if (!strcasecmp(argv[0],"auth-pass") && argc == 3) {
1213 /* auth-pass <name> <password> */
1214 ri = sentinelGetMasterByName(argv[1]);
1215 if (!ri) return "No such master with specified name.";
1216 ri->auth_pass = sdsnew(argv[2]);
1217 } else {
1218 return "Unrecognized sentinel configuration statement.";
1219 }
1220 return NULL;
1221 }
1222
1223 /* ====================== hiredis connection handling ======================= */
1224
1225 /* Completely disconnect an hiredis link from an instance. */
1226 void sentinelKillLink(sentinelRedisInstance *ri, redisAsyncContext *c) {
1227 if (ri->cc == c) {
1228 ri->cc = NULL;
1229 ri->pending_commands = 0;
1230 }
1231 if (ri->pc == c) ri->pc = NULL;
1232 c->data = NULL;
1233 ri->flags |= SRI_DISCONNECTED;
1234 redisAsyncFree(c);
1235 }
1236
1237 /* This function takes an hiredis context that is in an error condition
1238 * and make sure to mark the instance as disconnected performing the
1239 * cleanup needed.
1240 *
1241 * Note: we don't free the hiredis context as hiredis will do it for us
1242 * for async conenctions. */
1243 void sentinelDisconnectInstanceFromContext(const redisAsyncContext *c) {
1244 sentinelRedisInstance *ri = c->data;
1245 int pubsub;
1246
1247 if (ri == NULL) return; /* The instance no longer exists. */
1248
1249 pubsub = (ri->pc == c);
1250 sentinelEvent(REDIS_DEBUG, pubsub ? "-pubsub-link" : "-cmd-link", ri,
1251 "%@ #%s", c->errstr);
1252 if (pubsub)
1253 ri->pc = NULL;
1254 else
1255 ri->cc = NULL;
1256 ri->flags |= SRI_DISCONNECTED;
1257 }
1258
1259 void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status) {
1260 if (status != REDIS_OK) {
1261 sentinelDisconnectInstanceFromContext(c);
1262 } else {
1263 sentinelRedisInstance *ri = c->data;
1264 int pubsub = (ri->pc == c);
1265
1266 sentinelEvent(REDIS_DEBUG, pubsub ? "+pubsub-link" : "+cmd-link", ri,
1267 "%@");
1268 }
1269 }
1270
1271 void sentinelDisconnectCallback(const redisAsyncContext *c, int status) {
1272 sentinelDisconnectInstanceFromContext(c);
1273 }
1274
1275 /* Send the AUTH command with the specified master password if needed.
1276 * Note that for slaves the password set for the master is used.
1277 *
1278 * We don't check at all if the command was successfully transmitted
1279 * to the instance as if it fails Sentinel will detect the instance down,
1280 * will disconnect and reconnect the link and so forth. */
1281 void sentinelSendAuthIfNeeded(sentinelRedisInstance *ri, redisAsyncContext *c) {
1282 char *auth_pass = (ri->flags & SRI_MASTER) ? ri->auth_pass :
1283 ri->master->auth_pass;
1284
1285 if (auth_pass)
1286 redisAsyncCommand(c, sentinelDiscardReplyCallback, NULL, "AUTH %s",
1287 auth_pass);
1288 }
1289
1290 /* Create the async connections for the specified instance if the instance
1291 * is disconnected. Note that the SRI_DISCONNECTED flag is set even if just
1292 * one of the two links (commands and pub/sub) is missing. */
1293 void sentinelReconnectInstance(sentinelRedisInstance *ri) {
1294 if (!(ri->flags & SRI_DISCONNECTED)) return;
1295
1296 /* Commands connection. */
1297 if (ri->cc == NULL) {
1298 ri->cc = redisAsyncConnect(ri->addr->ip,ri->addr->port);
1299 if (ri->cc->err) {
1300 sentinelEvent(REDIS_DEBUG,"-cmd-link-reconnection",ri,"%@ #%s",
1301 ri->cc->errstr);
1302 sentinelKillLink(ri,ri->cc);
1303 } else {
1304 ri->cc_conn_time = mstime();
1305 ri->cc->data = ri;
1306 redisAeAttach(server.el,ri->cc);
1307 redisAsyncSetConnectCallback(ri->cc,
1308 sentinelLinkEstablishedCallback);
1309 redisAsyncSetDisconnectCallback(ri->cc,
1310 sentinelDisconnectCallback);
1311 sentinelSendAuthIfNeeded(ri,ri->cc);
1312 }
1313 }
1314 /* Pub / Sub */
1315 if ((ri->flags & SRI_MASTER) && ri->pc == NULL) {
1316 ri->pc = redisAsyncConnect(ri->addr->ip,ri->addr->port);
1317 if (ri->pc->err) {
1318 sentinelEvent(REDIS_DEBUG,"-pubsub-link-reconnection",ri,"%@ #%s",
1319 ri->pc->errstr);
1320 sentinelKillLink(ri,ri->pc);
1321 } else {
1322 int retval;
1323
1324 ri->pc_conn_time = mstime();
1325 ri->pc->data = ri;
1326 redisAeAttach(server.el,ri->pc);
1327 redisAsyncSetConnectCallback(ri->pc,
1328 sentinelLinkEstablishedCallback);
1329 redisAsyncSetDisconnectCallback(ri->pc,
1330 sentinelDisconnectCallback);
1331 sentinelSendAuthIfNeeded(ri,ri->pc);
1332 /* Now we subscribe to the Sentinels "Hello" channel. */
1333 retval = redisAsyncCommand(ri->pc,
1334 sentinelReceiveHelloMessages, NULL, "SUBSCRIBE %s",
1335 SENTINEL_HELLO_CHANNEL);
1336 if (retval != REDIS_OK) {
1337 /* If we can't subscribe, the Pub/Sub connection is useless
1338 * and we can simply disconnect it and try again. */
1339 sentinelKillLink(ri,ri->pc);
1340 return;
1341 }
1342 }
1343 }
1344 /* Clear the DISCONNECTED flags only if we have both the connections
1345 * (or just the commands connection if this is a slave or a
1346 * sentinel instance). */
1347 if (ri->cc && (ri->flags & (SRI_SLAVE|SRI_SENTINEL) || ri->pc))
1348 ri->flags &= ~SRI_DISCONNECTED;
1349 }
1350
1351 /* ======================== Redis instances pinging ======================== */
1352
1353 /* Process the INFO output from masters. */
1354 void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
1355 sds *lines;
1356 int numlines, j;
1357 int role = 0;
1358 int runid_changed = 0; /* true if runid changed. */
1359 int first_runid = 0; /* true if this is the first runid we receive. */
1360
1361 /* The following fields must be reset to a given value in the case they
1362 * are not found at all in the INFO output. */
1363 ri->master_link_down_time = 0;
1364
1365 /* Process line by line. */
1366 lines = sdssplitlen(info,strlen(info),"\r\n",2,&numlines);
1367 for (j = 0; j < numlines; j++) {
1368 sentinelRedisInstance *slave;
1369 sds l = lines[j];
1370
1371 /* run_id:<40 hex chars>*/
1372 if (sdslen(l) >= 47 && !memcmp(l,"run_id:",7)) {
1373 if (ri->runid == NULL) {
1374 ri->runid = sdsnewlen(l+7,40);
1375 first_runid = 1;
1376 } else {
1377 if (strncmp(ri->runid,l+7,40) != 0) {
1378 runid_changed = 1;
1379 sentinelEvent(REDIS_NOTICE,"+reboot",ri,"%@");
1380 sdsfree(ri->runid);
1381 ri->runid = sdsnewlen(l+7,40);
1382 }
1383 }
1384 }
1385
1386 /* slave0:<ip>,<port>,<state> */
1387 if ((ri->flags & SRI_MASTER) &&
1388 sdslen(l) >= 7 &&
1389 !memcmp(l,"slave",5) && isdigit(l[5]))
1390 {
1391 char *ip, *port, *end;
1392
1393 ip = strchr(l,':'); if (!ip) continue;
1394 ip++; /* Now ip points to start of ip address. */
1395 port = strchr(ip,','); if (!port) continue;
1396 *port = '\0'; /* nul term for easy access. */
1397 port++; /* Now port points to start of port number. */
1398 end = strchr(port,','); if (!end) continue;
1399 *end = '\0'; /* nul term for easy access. */
1400
1401 /* Check if we already have this slave into our table,
1402 * otherwise add it. */
1403 if (sentinelRedisInstanceLookupSlave(ri,ip,atoi(port)) == NULL) {
1404 if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,ip,
1405 atoi(port), ri->quorum,ri)) != NULL)
1406 {
1407 sentinelEvent(REDIS_NOTICE,"+slave",slave,"%@");
1408 }
1409 }
1410 }
1411
1412 /* master_link_down_since_seconds:<seconds> */
1413 if (sdslen(l) >= 32 &&
1414 !memcmp(l,"master_link_down_since_seconds",30))
1415 {
1416 ri->master_link_down_time = strtoll(l+31,NULL,10)*1000;
1417 }
1418
1419 /* role:<role> */
1420 if (!memcmp(l,"role:master",11)) role = SRI_MASTER;
1421 else if (!memcmp(l,"role:slave",10)) role = SRI_SLAVE;
1422
1423 if (role == SRI_SLAVE) {
1424 /* master_host:<host> */
1425 if (sdslen(l) >= 12 && !memcmp(l,"master_host:",12)) {
1426 sdsfree(ri->slave_master_host);
1427 ri->slave_master_host = sdsnew(l+12);
1428 }
1429
1430 /* master_port:<port> */
1431 if (sdslen(l) >= 12 && !memcmp(l,"master_port:",12))
1432 ri->slave_master_port = atoi(l+12);
1433
1434 /* master_link_status:<status> */
1435 if (sdslen(l) >= 19 && !memcmp(l,"master_link_status:",19)) {
1436 ri->slave_master_link_status =
1437 (strcasecmp(l+19,"up") == 0) ?
1438 SENTINEL_MASTER_LINK_STATUS_UP :
1439 SENTINEL_MASTER_LINK_STATUS_DOWN;
1440 }
1441
1442 /* slave_priority:<priority> */
1443 if (sdslen(l) >= 15 && !memcmp(l,"slave_priority:",15))
1444 ri->slave_priority = atoi(l+15);
1445 }
1446 }
1447 ri->info_refresh = mstime();
1448 sdsfreesplitres(lines,numlines);
1449
1450 /* ---------------------------- Acting half ----------------------------- */
1451 if (sentinel.tilt) return;
1452
1453 /* Act if a master turned into a slave. */
1454 if ((ri->flags & SRI_MASTER) && role == SRI_SLAVE) {
1455 if ((first_runid || runid_changed) && ri->slave_master_host) {
1456 /* If it is the first time we receive INFO from it, but it's
1457 * a slave while it was configured as a master, we want to monitor
1458 * its master instead. */
1459 sentinelEvent(REDIS_WARNING,"+redirect-to-master",ri,
1460 "%s %s %d %s %d",
1461 ri->name, ri->addr->ip, ri->addr->port,
1462 ri->slave_master_host, ri->slave_master_port);
1463 sentinelResetMasterAndChangeAddress(ri,ri->slave_master_host,
1464 ri->slave_master_port);
1465 return;
1466 }
1467 }
1468
1469 /* Act if a slave turned into a master. */
1470 if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) {
1471 if (!(ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
1472 (runid_changed || first_runid))
1473 {
1474 /* If a slave turned into master but:
1475 *
1476 * 1) Failover not in progress.
1477 * 2) RunID hs changed, or its the first time we see an INFO output.
1478 *
1479 * We assume this is a reboot with a wrong configuration.
1480 * Log the event and remove the slave. */
1481 int retval;
1482
1483 sentinelEvent(REDIS_WARNING,"-slave-restart-as-master",ri,"%@ #removing it from the attached slaves");
1484 retval = dictDelete(ri->master->slaves,ri->name);
1485 redisAssert(retval == REDIS_OK);
1486 return;
1487 } else if (ri->flags & SRI_PROMOTED) {
1488 /* If this is a promoted slave we can change state to the
1489 * failover state machine. */
1490 if ((ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
1491 (ri->master->flags & SRI_I_AM_THE_LEADER) &&
1492 (ri->master->failover_state ==
1493 SENTINEL_FAILOVER_STATE_WAIT_PROMOTION))
1494 {
1495 ri->master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
1496 ri->master->failover_state_change_time = mstime();
1497 sentinelEvent(REDIS_WARNING,"+promoted-slave",ri,"%@");
1498 sentinelEvent(REDIS_WARNING,"+failover-state-reconf-slaves",
1499 ri->master,"%@");
1500 sentinelCallClientReconfScript(ri->master,SENTINEL_LEADER,
1501 "start",ri->master->addr,ri->addr);
1502 }
1503 } else if (!(ri->master->flags & SRI_FAILOVER_IN_PROGRESS) ||
1504 ((ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
1505 (ri->master->flags & SRI_I_AM_THE_LEADER) &&
1506 ri->master->failover_state ==
1507 SENTINEL_FAILOVER_STATE_WAIT_START))
1508 {
1509 /* No failover in progress? Then it is the start of a failover
1510 * and we are an observer.
1511 *
1512 * We also do that if we are a leader doing a failover, in wait
1513 * start, but well, somebody else started before us. */
1514
1515 if (ri->master->flags & SRI_FAILOVER_IN_PROGRESS) {
1516 sentinelEvent(REDIS_WARNING,"-failover-abort-race",
1517 ri->master, "%@");
1518 sentinelAbortFailover(ri->master);
1519 }
1520
1521 ri->master->flags |= SRI_FAILOVER_IN_PROGRESS;
1522 sentinelEvent(REDIS_WARNING,"+failover-detected",ri->master,"%@");
1523 ri->master->failover_state = SENTINEL_FAILOVER_STATE_DETECT_END;
1524 ri->master->failover_state_change_time = mstime();
1525 ri->master->promoted_slave = ri;
1526 ri->flags |= SRI_PROMOTED;
1527 sentinelCallClientReconfScript(ri->master,SENTINEL_OBSERVER,
1528 "start", ri->master->addr,ri->addr);
1529 /* We are an observer, so we can only assume that the leader
1530 * is reconfiguring the slave instances. For this reason we
1531 * set all the instances as RECONF_SENT waiting for progresses
1532 * on this side. */
1533 sentinelAddFlagsToDictOfRedisInstances(ri->master->slaves,
1534 SRI_RECONF_SENT);
1535 }
1536 }
1537
1538 /* Detect if the slave that is in the process of being reconfigured
1539 * changed state. */
1540 if ((ri->flags & SRI_SLAVE) && role == SRI_SLAVE &&
1541 (ri->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)))
1542 {
1543 /* SRI_RECONF_SENT -> SRI_RECONF_INPROG. */
1544 if ((ri->flags & SRI_RECONF_SENT) &&
1545 ri->slave_master_host &&
1546 strcmp(ri->slave_master_host,
1547 ri->master->promoted_slave->addr->ip) == 0 &&
1548 ri->slave_master_port == ri->master->promoted_slave->addr->port)
1549 {
1550 ri->flags &= ~SRI_RECONF_SENT;
1551 ri->flags |= SRI_RECONF_INPROG;
1552 sentinelEvent(REDIS_NOTICE,"+slave-reconf-inprog",ri,"%@");
1553 }
1554
1555 /* SRI_RECONF_INPROG -> SRI_RECONF_DONE */
1556 if ((ri->flags & SRI_RECONF_INPROG) &&
1557 ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP)
1558 {
1559 ri->flags &= ~SRI_RECONF_INPROG;
1560 ri->flags |= SRI_RECONF_DONE;
1561 sentinelEvent(REDIS_NOTICE,"+slave-reconf-done",ri,"%@");
1562 /* If we are moving forward (a new slave is now configured)
1563 * we update the change_time as we are conceptually passing
1564 * to the next slave. */
1565 ri->failover_state_change_time = mstime();
1566 }
1567 }
1568 }
1569
1570 void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
1571 sentinelRedisInstance *ri = c->data;
1572 redisReply *r;
1573
1574 if (ri) ri->pending_commands--;
1575 if (!reply || !ri) return;
1576 r = reply;
1577
1578 if (r->type == REDIS_REPLY_STRING) {
1579 sentinelRefreshInstanceInfo(ri,r->str);
1580 }
1581 }
1582
1583 /* Just discard the reply. We use this when we are not monitoring the return
1584 * value of the command but its effects directly. */
1585 void sentinelDiscardReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
1586 sentinelRedisInstance *ri = c->data;
1587
1588 if (ri) ri->pending_commands--;
1589 }
1590
1591 void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
1592 sentinelRedisInstance *ri = c->data;
1593 redisReply *r;
1594
1595 if (ri) ri->pending_commands--;
1596 if (!reply || !ri) return;
1597 r = reply;
1598
1599 if (r->type == REDIS_REPLY_STATUS ||
1600 r->type == REDIS_REPLY_ERROR) {
1601 /* Update the "instance available" field only if this is an
1602 * acceptable reply. */
1603 if (strncmp(r->str,"PONG",4) == 0 ||
1604 strncmp(r->str,"LOADING",7) == 0 ||
1605 strncmp(r->str,"MASTERDOWN",10) == 0)
1606 {
1607 ri->last_avail_time = mstime();
1608 } else {
1609 /* Send a SCRIPT KILL command if the instance appears to be
1610 * down because of a busy script. */
1611 if (strncmp(r->str,"BUSY",4) == 0 &&
1612 (ri->flags & SRI_S_DOWN) &&
1613 !(ri->flags & SRI_SCRIPT_KILL_SENT))
1614 {
1615 redisAsyncCommand(ri->cc,
1616 sentinelDiscardReplyCallback, NULL, "SCRIPT KILL");
1617 ri->flags |= SRI_SCRIPT_KILL_SENT;
1618 }
1619 }
1620 }
1621 ri->last_pong_time = mstime();
1622 }
1623
1624 /* This is called when we get the reply about the PUBLISH command we send
1625 * to the master to advertise this sentinel. */
1626 void sentinelPublishReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
1627 sentinelRedisInstance *ri = c->data;
1628 redisReply *r;
1629
1630 if (ri) ri->pending_commands--;
1631 if (!reply || !ri) return;
1632 r = reply;
1633
1634 /* Only update pub_time if we actually published our message. Otherwise
1635 * we'll retry against in 100 milliseconds. */
1636 if (r->type != REDIS_REPLY_ERROR)
1637 ri->last_pub_time = mstime();
1638 }
1639
1640 /* This is our Pub/Sub callback for the Hello channel. It's useful in order
1641 * to discover other sentinels attached at the same master. */
1642 void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata) {
1643 sentinelRedisInstance *ri = c->data;
1644 redisReply *r;
1645
1646 if (!reply || !ri) return;
1647 r = reply;
1648
1649 /* Update the last activity in the pubsub channel. Note that since we
1650 * receive our messages as well this timestamp can be used to detect
1651 * if the link is probably diconnected even if it seems otherwise. */
1652 ri->pc_last_activity = mstime();
1653
1654 /* Sanity check in the reply we expect, so that the code that follows
1655 * can avoid to check for details. */
1656 if (r->type != REDIS_REPLY_ARRAY ||
1657 r->elements != 3 ||
1658 r->element[0]->type != REDIS_REPLY_STRING ||
1659 r->element[1]->type != REDIS_REPLY_STRING ||
1660 r->element[2]->type != REDIS_REPLY_STRING ||
1661 strcmp(r->element[0]->str,"message") != 0) return;
1662
1663 /* We are not interested in meeting ourselves */
1664 if (strstr(r->element[2]->str,server.runid) != NULL) return;
1665
1666 {
1667 int numtokens, port, removed, canfailover;
1668 char **token = sdssplitlen(r->element[2]->str,
1669 r->element[2]->len,
1670 ":",1,&numtokens);
1671 sentinelRedisInstance *sentinel;
1672
1673 if (numtokens == 4) {
1674 /* First, try to see if we already have this sentinel. */
1675 port = atoi(token[1]);
1676 canfailover = atoi(token[3]);
1677 sentinel = getSentinelRedisInstanceByAddrAndRunID(
1678 ri->sentinels,token[0],port,token[2]);
1679
1680 if (!sentinel) {
1681 /* If not, remove all the sentinels that have the same runid
1682 * OR the same ip/port, because it's either a restart or a
1683 * network topology change. */
1684 removed = removeMatchingSentinelsFromMaster(ri,token[0],port,
1685 token[2]);
1686 if (removed) {
1687 sentinelEvent(REDIS_NOTICE,"-dup-sentinel",ri,
1688 "%@ #duplicate of %s:%d or %s",
1689 token[0],port,token[2]);
1690 }
1691
1692 /* Add the new sentinel. */
1693 sentinel = createSentinelRedisInstance(NULL,SRI_SENTINEL,
1694 token[0],port,ri->quorum,ri);
1695 if (sentinel) {
1696 sentinelEvent(REDIS_NOTICE,"+sentinel",sentinel,"%@");
1697 /* The runid is NULL after a new instance creation and
1698 * for Sentinels we don't have a later chance to fill it,
1699 * so do it now. */
1700 sentinel->runid = sdsnew(token[2]);
1701 }
1702 }
1703
1704 /* Update the state of the Sentinel. */
1705 if (sentinel) {
1706 sentinel->last_hello_time = mstime();
1707 if (canfailover)
1708 sentinel->flags |= SRI_CAN_FAILOVER;
1709 else
1710 sentinel->flags &= ~SRI_CAN_FAILOVER;
1711 }
1712 }
1713 sdsfreesplitres(token,numtokens);
1714 }
1715 }
1716
1717 void sentinelPingInstance(sentinelRedisInstance *ri) {
1718 mstime_t now = mstime();
1719 mstime_t info_period;
1720 int retval;
1721
1722 /* Return ASAP if we have already a PING or INFO already pending, or
1723 * in the case the instance is not properly connected. */
1724 if (ri->flags & SRI_DISCONNECTED) return;
1725
1726 /* For INFO, PING, PUBLISH that are not critical commands to send we
1727 * also have a limit of SENTINEL_MAX_PENDING_COMMANDS. We don't
1728 * want to use a lot of memory just because a link is not working
1729 * properly (note that anyway there is a redundant protection about this,
1730 * that is, the link will be disconnected and reconnected if a long
1731 * timeout condition is detected. */
1732 if (ri->pending_commands >= SENTINEL_MAX_PENDING_COMMANDS) return;
1733
1734 /* If this is a slave of a master in O_DOWN condition we start sending
1735 * it INFO every second, instead of the usual SENTINEL_INFO_PERIOD
1736 * period. In this state we want to closely monitor slaves in case they
1737 * are turned into masters by another Sentinel, or by the sysadmin. */
1738 if ((ri->flags & SRI_SLAVE) &&
1739 (ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) {
1740 info_period = 1000;
1741 } else {
1742 info_period = SENTINEL_INFO_PERIOD;
1743 }
1744
1745 if ((ri->flags & SRI_SENTINEL) == 0 &&
1746 (ri->info_refresh == 0 ||
1747 (now - ri->info_refresh) > info_period))
1748 {
1749 /* Send INFO to masters and slaves, not sentinels. */
1750 retval = redisAsyncCommand(ri->cc,
1751 sentinelInfoReplyCallback, NULL, "INFO");
1752 if (retval != REDIS_OK) return;
1753 ri->pending_commands++;
1754 } else if ((now - ri->last_pong_time) > SENTINEL_PING_PERIOD) {
1755 /* Send PING to all the three kinds of instances. */
1756 retval = redisAsyncCommand(ri->cc,
1757 sentinelPingReplyCallback, NULL, "PING");
1758 if (retval != REDIS_OK) return;
1759 ri->pending_commands++;
1760 } else if ((ri->flags & SRI_MASTER) &&
1761 (now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD)
1762 {
1763 /* PUBLISH hello messages only to masters. */
1764 struct sockaddr_in sa;
1765 socklen_t salen = sizeof(sa);
1766
1767 if (getsockname(ri->cc->c.fd,(struct sockaddr*)&sa,&salen) != -1) {
1768 char myaddr[128];
1769
1770 snprintf(myaddr,sizeof(myaddr),"%s:%d:%s:%d",
1771 inet_ntoa(sa.sin_addr), server.port, server.runid,
1772 (ri->flags & SRI_CAN_FAILOVER) != 0);
1773 retval = redisAsyncCommand(ri->cc,
1774 sentinelPublishReplyCallback, NULL, "PUBLISH %s %s",
1775 SENTINEL_HELLO_CHANNEL,myaddr);
1776 if (retval != REDIS_OK) return;
1777 ri->pending_commands++;
1778 }
1779 }
1780 }
1781
1782 /* =========================== SENTINEL command ============================= */
1783
1784 const char *sentinelFailoverStateStr(int state) {
1785 switch(state) {
1786 case SENTINEL_FAILOVER_STATE_NONE: return "none";
1787 case SENTINEL_FAILOVER_STATE_WAIT_START: return "wait_start";
1788 case SENTINEL_FAILOVER_STATE_SELECT_SLAVE: return "select_slave";
1789 case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE: return "send_slaveof_noone";
1790 case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION: return "wait_promotion";
1791 case SENTINEL_FAILOVER_STATE_RECONF_SLAVES: return "reconf_slaves";
1792 case SENTINEL_FAILOVER_STATE_ALERT_CLIENTS: return "alert_clients";
1793 case SENTINEL_FAILOVER_STATE_DETECT_END: return "detect_end";
1794 case SENTINEL_FAILOVER_STATE_UPDATE_CONFIG: return "update_config";
1795 default: return "unknown";
1796 }
1797 }
1798
1799 /* Redis instance to Redis protocol representation. */
1800 void addReplySentinelRedisInstance(redisClient *c, sentinelRedisInstance *ri) {
1801 char *flags = sdsempty();
1802 void *mbl;
1803 int fields = 0;
1804
1805 mbl = addDeferredMultiBulkLength(c);
1806
1807 addReplyBulkCString(c,"name");
1808 addReplyBulkCString(c,ri->name);
1809 fields++;
1810
1811 addReplyBulkCString(c,"ip");
1812 addReplyBulkCString(c,ri->addr->ip);
1813 fields++;
1814
1815 addReplyBulkCString(c,"port");
1816 addReplyBulkLongLong(c,ri->addr->port);
1817 fields++;
1818
1819 addReplyBulkCString(c,"runid");
1820 addReplyBulkCString(c,ri->runid ? ri->runid : "");
1821 fields++;
1822
1823 addReplyBulkCString(c,"flags");
1824 if (ri->flags & SRI_S_DOWN) flags = sdscat(flags,"s_down,");
1825 if (ri->flags & SRI_O_DOWN) flags = sdscat(flags,"o_down,");
1826 if (ri->flags & SRI_MASTER) flags = sdscat(flags,"master,");
1827 if (ri->flags & SRI_SLAVE) flags = sdscat(flags,"slave,");
1828 if (ri->flags & SRI_SENTINEL) flags = sdscat(flags,"sentinel,");
1829 if (ri->flags & SRI_DISCONNECTED) flags = sdscat(flags,"disconnected,");
1830 if (ri->flags & SRI_MASTER_DOWN) flags = sdscat(flags,"master_down,");
1831 if (ri->flags & SRI_FAILOVER_IN_PROGRESS)
1832 flags = sdscat(flags,"failover_in_progress,");
1833 if (ri->flags & SRI_I_AM_THE_LEADER)
1834 flags = sdscat(flags,"i_am_the_leader,");
1835 if (ri->flags & SRI_PROMOTED) flags = sdscat(flags,"promoted,");
1836 if (ri->flags & SRI_RECONF_SENT) flags = sdscat(flags,"reconf_sent,");
1837 if (ri->flags & SRI_RECONF_INPROG) flags = sdscat(flags,"reconf_inprog,");
1838 if (ri->flags & SRI_RECONF_DONE) flags = sdscat(flags,"reconf_done,");
1839
1840 if (sdslen(flags) != 0) flags = sdsrange(flags,0,-2); /* remove last "," */
1841 addReplyBulkCString(c,flags);
1842 sdsfree(flags);
1843 fields++;
1844
1845 addReplyBulkCString(c,"pending-commands");
1846 addReplyBulkLongLong(c,ri->pending_commands);
1847 fields++;
1848
1849 if (ri->flags & SRI_FAILOVER_IN_PROGRESS) {
1850 addReplyBulkCString(c,"failover-state");
1851 addReplyBulkCString(c,(char*)sentinelFailoverStateStr(ri->failover_state));
1852 fields++;
1853 }
1854
1855 addReplyBulkCString(c,"last-ok-ping-reply");
1856 addReplyBulkLongLong(c,mstime() - ri->last_avail_time);
1857 fields++;
1858
1859 addReplyBulkCString(c,"last-ping-reply");
1860 addReplyBulkLongLong(c,mstime() - ri->last_pong_time);
1861 fields++;
1862
1863 if (ri->flags & SRI_S_DOWN) {
1864 addReplyBulkCString(c,"s-down-time");
1865 addReplyBulkLongLong(c,mstime()-ri->s_down_since_time);
1866 fields++;
1867 }
1868
1869 if (ri->flags & SRI_O_DOWN) {
1870 addReplyBulkCString(c,"o-down-time");
1871 addReplyBulkLongLong(c,mstime()-ri->o_down_since_time);
1872 fields++;
1873 }
1874
1875 /* Masters and Slaves */
1876 if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
1877 addReplyBulkCString(c,"info-refresh");
1878 addReplyBulkLongLong(c,mstime() - ri->info_refresh);
1879 fields++;
1880 }
1881
1882 /* Only masters */
1883 if (ri->flags & SRI_MASTER) {
1884 addReplyBulkCString(c,"num-slaves");
1885 addReplyBulkLongLong(c,dictSize(ri->slaves));
1886 fields++;
1887
1888 addReplyBulkCString(c,"num-other-sentinels");
1889 addReplyBulkLongLong(c,dictSize(ri->sentinels));
1890 fields++;
1891
1892 addReplyBulkCString(c,"quorum");
1893 addReplyBulkLongLong(c,ri->quorum);
1894 fields++;
1895 }
1896
1897 /* Only slaves */
1898 if (ri->flags & SRI_SLAVE) {
1899 addReplyBulkCString(c,"master-link-down-time");
1900 addReplyBulkLongLong(c,ri->master_link_down_time);
1901 fields++;
1902
1903 addReplyBulkCString(c,"master-link-status");
1904 addReplyBulkCString(c,
1905 (ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP) ?
1906 "ok" : "err");
1907 fields++;
1908
1909 addReplyBulkCString(c,"master-host");
1910 addReplyBulkCString(c,
1911 ri->slave_master_host ? ri->slave_master_host : "?");
1912 fields++;
1913
1914 addReplyBulkCString(c,"master-port");
1915 addReplyBulkLongLong(c,ri->slave_master_port);
1916 fields++;
1917
1918 addReplyBulkCString(c,"slave-priority");
1919 addReplyBulkLongLong(c,ri->slave_priority);
1920 fields++;
1921 }
1922
1923 /* Only sentinels */
1924 if (ri->flags & SRI_SENTINEL) {
1925 addReplyBulkCString(c,"last-hello-message");
1926 addReplyBulkLongLong(c,mstime() - ri->last_hello_time);
1927 fields++;
1928
1929 addReplyBulkCString(c,"can-failover-its-master");
1930 addReplyBulkLongLong(c,(ri->flags & SRI_CAN_FAILOVER) != 0);
1931 fields++;
1932
1933 if (ri->flags & SRI_MASTER_DOWN) {
1934 addReplyBulkCString(c,"subjective-leader");
1935 addReplyBulkCString(c,ri->leader ? ri->leader : "?");
1936 fields++;
1937 }
1938 }
1939
1940 setDeferredMultiBulkLength(c,mbl,fields*2);
1941 }
1942
1943 /* Output a number of instances contanined inside a dictionary as
1944 * Redis protocol. */
1945 void addReplyDictOfRedisInstances(redisClient *c, dict *instances) {
1946 dictIterator *di;
1947 dictEntry *de;
1948
1949 di = dictGetIterator(instances);
1950 addReplyMultiBulkLen(c,dictSize(instances));
1951 while((de = dictNext(di)) != NULL) {
1952 sentinelRedisInstance *ri = dictGetVal(de);
1953
1954 addReplySentinelRedisInstance(c,ri);
1955 }
1956 dictReleaseIterator(di);
1957 }
1958
1959 /* Lookup the named master into sentinel.masters.
1960 * If the master is not found reply to the client with an error and returns
1961 * NULL. */
1962 sentinelRedisInstance *sentinelGetMasterByNameOrReplyError(redisClient *c,
1963 robj *name)
1964 {
1965 sentinelRedisInstance *ri;
1966
1967 ri = dictFetchValue(sentinel.masters,c->argv[2]->ptr);
1968 if (!ri) {
1969 addReplyError(c,"No such master with that name");
1970 return NULL;
1971 }
1972 return ri;
1973 }
1974
1975 void sentinelCommand(redisClient *c) {
1976 if (!strcasecmp(c->argv[1]->ptr,"masters")) {
1977 /* SENTINEL MASTERS */
1978 if (c->argc != 2) goto numargserr;
1979
1980 addReplyDictOfRedisInstances(c,sentinel.masters);
1981 } else if (!strcasecmp(c->argv[1]->ptr,"slaves")) {
1982 /* SENTINEL SLAVES <master-name> */
1983 sentinelRedisInstance *ri;
1984
1985 if (c->argc != 3) goto numargserr;
1986 if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
1987 return;
1988 addReplyDictOfRedisInstances(c,ri->slaves);
1989 } else if (!strcasecmp(c->argv[1]->ptr,"sentinels")) {
1990 /* SENTINEL SENTINELS <master-name> */
1991 sentinelRedisInstance *ri;
1992
1993 if (c->argc != 3) goto numargserr;
1994 if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
1995 return;
1996 addReplyDictOfRedisInstances(c,ri->sentinels);
1997 } else if (!strcasecmp(c->argv[1]->ptr,"is-master-down-by-addr")) {
1998 /* SENTINEL IS-MASTER-DOWN-BY-ADDR <ip> <port> */
1999 sentinelRedisInstance *ri;
2000 char *leader = NULL;
2001 long port;
2002 int isdown = 0;
2003
2004 if (c->argc != 4) goto numargserr;
2005 if (getLongFromObjectOrReply(c,c->argv[3],&port,NULL) != REDIS_OK)
2006 return;
2007 ri = getSentinelRedisInstanceByAddrAndRunID(sentinel.masters,
2008 c->argv[2]->ptr,port,NULL);
2009
2010 /* It exists? Is actually a master? Is subjectively down? It's down.
2011 * Note: if we are in tilt mode we always reply with "0". */
2012 if (!sentinel.tilt && ri && (ri->flags & SRI_S_DOWN) &&
2013 (ri->flags & SRI_MASTER))
2014 isdown = 1;
2015 if (ri) leader = sentinelGetSubjectiveLeader(ri);
2016
2017 /* Reply with a two-elements multi-bulk reply: down state, leader. */
2018 addReplyMultiBulkLen(c,2);
2019 addReply(c, isdown ? shared.cone : shared.czero);
2020 addReplyBulkCString(c, leader ? leader : "?");
2021 if (leader) sdsfree(leader);
2022 } else if (!strcasecmp(c->argv[1]->ptr,"reset")) {
2023 /* SENTINEL RESET <pattern> */
2024 if (c->argc != 3) goto numargserr;
2025 addReplyLongLong(c,sentinelResetMastersByPattern(c->argv[2]->ptr,SENTINEL_GENERATE_EVENT));
2026 } else if (!strcasecmp(c->argv[1]->ptr,"get-master-addr-by-name")) {
2027 /* SENTINEL GET-MASTER-ADDR-BY-NAME <master-name> */
2028 sentinelRedisInstance *ri;
2029
2030 if (c->argc != 3) goto numargserr;
2031 ri = sentinelGetMasterByName(c->argv[2]->ptr);
2032 if (ri == NULL) {
2033 addReply(c,shared.nullmultibulk);
2034 } else if (ri->info_refresh == 0) {
2035 addReplySds(c,sdsnew("-IDONTKNOW I have not enough information to reply. Please ask another Sentinel.\r\n"));
2036 } else {
2037 sentinelAddr *addr = ri->addr;
2038
2039 if ((ri->flags & SRI_FAILOVER_IN_PROGRESS) && ri->promoted_slave)
2040 addr = ri->promoted_slave->addr;
2041 addReplyMultiBulkLen(c,2);
2042 addReplyBulkCString(c,addr->ip);
2043 addReplyBulkLongLong(c,addr->port);
2044 }
2045 } else if (!strcasecmp(c->argv[1]->ptr,"failover")) {
2046 /* SENTINEL FAILOVER <master-name> */
2047 sentinelRedisInstance *ri;
2048
2049 if (c->argc != 3) goto numargserr;
2050 if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
2051 return;
2052 if (ri->flags & SRI_FAILOVER_IN_PROGRESS) {
2053 addReplySds(c,sdsnew("-INPROG Failover already in progress\r\n"));
2054 return;
2055 }
2056 if (sentinelSelectSlave(ri) == NULL) {
2057 addReplySds(c,sdsnew("-NOGOODSLAVE No suitable slave to promote\r\n"));
2058 return;
2059 }
2060 sentinelStartFailover(ri,SENTINEL_FAILOVER_STATE_WAIT_START);
2061 ri->flags |= SRI_FORCE_FAILOVER;
2062 addReply(c,shared.ok);
2063 } else if (!strcasecmp(c->argv[1]->ptr,"pending-scripts")) {
2064 /* SENTINEL PENDING-SCRIPTS */
2065
2066 if (c->argc != 2) goto numargserr;
2067 sentinelPendingScriptsCommand(c);
2068 } else {
2069 addReplyErrorFormat(c,"Unknown sentinel subcommand '%s'",
2070 (char*)c->argv[1]->ptr);
2071 }
2072 return;
2073
2074 numargserr:
2075 addReplyErrorFormat(c,"Wrong number of commands for 'sentinel %s'",
2076 (char*)c->argv[1]->ptr);
2077 }
2078
2079 void sentinelInfoCommand(redisClient *c) {
2080 char *section = c->argc == 2 ? c->argv[1]->ptr : "default";
2081 sds info = sdsempty();
2082 int defsections = !strcasecmp(section,"default");
2083 int sections = 0;
2084
2085 if (c->argc > 2) {
2086 addReply(c,shared.syntaxerr);
2087 return;
2088 }
2089
2090 if (!strcasecmp(section,"server") || defsections) {
2091 if (sections++) info = sdscat(info,"\r\n");
2092 sds serversection = genRedisInfoString("server");
2093 info = sdscatlen(info,serversection,sdslen(serversection));
2094 sdsfree(serversection);
2095 }
2096
2097 if (!strcasecmp(section,"sentinel") || defsections) {
2098 dictIterator *di;
2099 dictEntry *de;
2100 int master_id = 0;
2101
2102 if (sections++) info = sdscat(info,"\r\n");
2103 info = sdscatprintf(info,
2104 "# Sentinel\r\n"
2105 "sentinel_masters:%lu\r\n"
2106 "sentinel_tilt:%d\r\n"
2107 "sentinel_running_scripts:%d\r\n"
2108 "sentinel_scripts_queue_length:%ld\r\n",
2109 dictSize(sentinel.masters),
2110 sentinel.tilt,
2111 sentinel.running_scripts,
2112 listLength(sentinel.scripts_queue));
2113
2114 di = dictGetIterator(sentinel.masters);
2115 while((de = dictNext(di)) != NULL) {
2116 sentinelRedisInstance *ri = dictGetVal(de);
2117 char *status = "ok";
2118
2119 if (ri->flags & SRI_O_DOWN) status = "odown";
2120 else if (ri->flags & SRI_S_DOWN) status = "sdown";
2121 info = sdscatprintf(info,
2122 "master%d:name=%s,status=%s,address=%s:%d,"
2123 "slaves=%lu,sentinels=%lu\r\n",
2124 master_id++, ri->name, status,
2125 ri->addr->ip, ri->addr->port,
2126 dictSize(ri->slaves),
2127 dictSize(ri->sentinels)+1);
2128 }
2129 dictReleaseIterator(di);
2130 }
2131
2132 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
2133 (unsigned long)sdslen(info)));
2134 addReplySds(c,info);
2135 addReply(c,shared.crlf);
2136 }
2137
2138 /* ===================== SENTINEL availability checks ======================= */
2139
2140 /* Is this instance down from our point of view? */
2141 void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
2142 mstime_t elapsed = mstime() - ri->last_avail_time;
2143
2144 /* Check if we are in need for a reconnection of one of the
2145 * links, because we are detecting low activity.
2146 *
2147 * 1) Check if the command link seems connected, was connected not less
2148 * than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have an
2149 * idle time that is greater than down_after_period / 2 seconds. */
2150 if (ri->cc &&
2151 (mstime() - ri->cc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
2152 (mstime() - ri->last_pong_time) > (ri->down_after_period/2))
2153 {
2154 sentinelKillLink(ri,ri->cc);
2155 }
2156
2157 /* 2) Check if the pubsub link seems connected, was connected not less
2158 * than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have no
2159 * activity in the Pub/Sub channel for more than
2160 * SENTINEL_PUBLISH_PERIOD * 3.
2161 */
2162 if (ri->pc &&
2163 (mstime() - ri->pc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
2164 (mstime() - ri->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3))
2165 {
2166 sentinelKillLink(ri,ri->pc);
2167 }
2168
2169 /* Update the subjectively down flag. */
2170 if (elapsed > ri->down_after_period) {
2171 /* Is subjectively down */
2172 if ((ri->flags & SRI_S_DOWN) == 0) {
2173 sentinelEvent(REDIS_WARNING,"+sdown",ri,"%@");
2174 ri->s_down_since_time = mstime();
2175 ri->flags |= SRI_S_DOWN;
2176 }
2177 } else {
2178 /* Is subjectively up */
2179 if (ri->flags & SRI_S_DOWN) {
2180 sentinelEvent(REDIS_WARNING,"-sdown",ri,"%@");
2181 ri->flags &= ~(SRI_S_DOWN|SRI_SCRIPT_KILL_SENT);
2182 }
2183 }
2184 }
2185
2186 /* Is this instance down accordingly to the configured quorum? */
2187 void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
2188 dictIterator *di;
2189 dictEntry *de;
2190 int quorum = 0, odown = 0;
2191
2192 if (master->flags & SRI_S_DOWN) {
2193 /* Is down for enough sentinels? */
2194 quorum = 1; /* the current sentinel. */
2195 /* Count all the other sentinels. */
2196 di = dictGetIterator(master->sentinels);
2197 while((de = dictNext(di)) != NULL) {
2198 sentinelRedisInstance *ri = dictGetVal(de);
2199
2200 if (ri->flags & SRI_MASTER_DOWN) quorum++;
2201 }
2202 dictReleaseIterator(di);
2203 if (quorum >= master->quorum) odown = 1;
2204 }
2205
2206 /* Set the flag accordingly to the outcome. */
2207 if (odown) {
2208 if ((master->flags & SRI_O_DOWN) == 0) {
2209 sentinelEvent(REDIS_WARNING,"+odown",master,"%@ #quorum %d/%d",
2210 quorum, master->quorum);
2211 master->flags |= SRI_O_DOWN;
2212 master->o_down_since_time = mstime();
2213 }
2214 } else {
2215 if (master->flags & SRI_O_DOWN) {
2216 sentinelEvent(REDIS_WARNING,"-odown",master,"%@");
2217 master->flags &= ~SRI_O_DOWN;
2218 }
2219 }
2220 }
2221
2222 /* Receive the SENTINEL is-master-down-by-addr reply, see the
2223 * sentinelAskMasterStateToOtherSentinels() function for more information. */
2224 void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) {
2225 sentinelRedisInstance *ri = c->data;
2226 redisReply *r;
2227
2228 if (ri) ri->pending_commands--;
2229 if (!reply || !ri) return;
2230 r = reply;
2231
2232 /* Ignore every error or unexpected reply.
2233 * Note that if the command returns an error for any reason we'll
2234 * end clearing the SRI_MASTER_DOWN flag for timeout anyway. */
2235 if (r->type == REDIS_REPLY_ARRAY && r->elements == 2 &&
2236 r->element[0]->type == REDIS_REPLY_INTEGER &&
2237 r->element[1]->type == REDIS_REPLY_STRING)
2238 {
2239 ri->last_master_down_reply_time = mstime();
2240 if (r->element[0]->integer == 1) {
2241 ri->flags |= SRI_MASTER_DOWN;
2242 } else {
2243 ri->flags &= ~SRI_MASTER_DOWN;
2244 }
2245 sdsfree(ri->leader);
2246 ri->leader = sdsnew(r->element[1]->str);
2247 }
2248 }
2249
2250 /* If we think (subjectively) the master is down, we start sending
2251 * SENTINEL IS-MASTER-DOWN-BY-ADDR requests to other sentinels
2252 * in order to get the replies that allow to reach the quorum and
2253 * possibly also mark the master as objectively down. */
2254 void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master) {
2255 dictIterator *di;
2256 dictEntry *de;
2257
2258 di = dictGetIterator(master->sentinels);
2259 while((de = dictNext(di)) != NULL) {
2260 sentinelRedisInstance *ri = dictGetVal(de);
2261 mstime_t elapsed = mstime() - ri->last_master_down_reply_time;
2262 char port[32];
2263 int retval;
2264
2265 /* If the master state from other sentinel is too old, we clear it. */
2266 if (elapsed > SENTINEL_INFO_VALIDITY_TIME) {
2267 ri->flags &= ~SRI_MASTER_DOWN;
2268 sdsfree(ri->leader);
2269 ri->leader = NULL;
2270 }
2271
2272 /* Only ask if master is down to other sentinels if:
2273 *
2274 * 1) We believe it is down, or there is a failover in progress.
2275 * 2) Sentinel is connected.
2276 * 3) We did not received the info within SENTINEL_ASK_PERIOD ms. */
2277 if ((master->flags & (SRI_S_DOWN|SRI_FAILOVER_IN_PROGRESS)) == 0)
2278 continue;
2279 if (ri->flags & SRI_DISCONNECTED) continue;
2280 if (mstime() - ri->last_master_down_reply_time < SENTINEL_ASK_PERIOD)
2281 continue;
2282
2283 /* Ask */
2284 ll2string(port,sizeof(port),master->addr->port);
2285 retval = redisAsyncCommand(ri->cc,
2286 sentinelReceiveIsMasterDownReply, NULL,
2287 "SENTINEL is-master-down-by-addr %s %s",
2288 master->addr->ip, port);
2289 if (retval == REDIS_OK) ri->pending_commands++;
2290 }
2291 dictReleaseIterator(di);
2292 }
2293
2294 /* =============================== FAILOVER ================================= */
2295
2296 /* Given a master get the "subjective leader", that is, among all the sentinels
2297 * with given characteristics, the one with the lexicographically smaller
2298 * runid. The characteristics required are:
2299 *
2300 * 1) Has SRI_CAN_FAILOVER flag.
2301 * 2) Is not disconnected.
2302 * 3) Recently answered to our ping (no longer than
2303 * SENTINEL_INFO_VALIDITY_TIME milliseconds ago).
2304 *
2305 * The function returns a pointer to an sds string representing the runid of the
2306 * leader sentinel instance (from our point of view). Otherwise NULL is
2307 * returned if there are no suitable sentinels.
2308 */
2309
2310 int compareRunID(const void *a, const void *b) {
2311 char **aptrptr = (char**)a, **bptrptr = (char**)b;
2312 return strcasecmp(*aptrptr, *bptrptr);
2313 }
2314
2315 char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master) {
2316 dictIterator *di;
2317 dictEntry *de;
2318 char **instance =
2319 zmalloc(sizeof(char*)*(dictSize(master->sentinels)+1));
2320 int instances = 0;
2321 char *leader = NULL;
2322
2323 if (master->flags & SRI_CAN_FAILOVER) {
2324 /* Add myself if I'm a Sentinel that can failover this master. */
2325 instance[instances++] = server.runid;
2326 }
2327
2328 di = dictGetIterator(master->sentinels);
2329 while((de = dictNext(di)) != NULL) {
2330 sentinelRedisInstance *ri = dictGetVal(de);
2331 mstime_t lag = mstime() - ri->last_avail_time;
2332
2333 if (lag > SENTINEL_INFO_VALIDITY_TIME ||
2334 !(ri->flags & SRI_CAN_FAILOVER) ||
2335 (ri->flags & SRI_DISCONNECTED) ||
2336 ri->runid == NULL)
2337 continue;
2338 instance[instances++] = ri->runid;
2339 }
2340 dictReleaseIterator(di);
2341
2342 /* If we have at least one instance passing our checks, order the array
2343 * by runid. */
2344 if (instances) {
2345 qsort(instance,instances,sizeof(char*),compareRunID);
2346 leader = sdsnew(instance[0]);
2347 }
2348 zfree(instance);
2349 return leader;
2350 }
2351
2352 struct sentinelLeader {
2353 char *runid;
2354 unsigned long votes;
2355 };
2356
2357 /* Helper function for sentinelGetObjectiveLeader, increment the counter
2358 * relative to the specified runid. */
2359 void sentinelObjectiveLeaderIncr(dict *counters, char *runid) {
2360 dictEntry *de = dictFind(counters,runid);
2361 uint64_t oldval;
2362
2363 if (de) {
2364 oldval = dictGetUnsignedIntegerVal(de);
2365 dictSetUnsignedIntegerVal(de,oldval+1);
2366 } else {
2367 de = dictAddRaw(counters,runid);
2368 redisAssert(de != NULL);
2369 dictSetUnsignedIntegerVal(de,1);
2370 }
2371 }
2372
2373 /* Scan all the Sentinels attached to this master to check what is the
2374 * most voted leader among Sentinels. */
2375 char *sentinelGetObjectiveLeader(sentinelRedisInstance *master) {
2376 dict *counters;
2377 dictIterator *di;
2378 dictEntry *de;
2379 unsigned int voters = 0, voters_quorum;
2380 char *myvote;
2381 char *winner = NULL;
2382
2383 redisAssert(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS));
2384 counters = dictCreate(&leaderVotesDictType,NULL);
2385
2386 /* Count my vote. */
2387 myvote = sentinelGetSubjectiveLeader(master);
2388 if (myvote) {
2389 sentinelObjectiveLeaderIncr(counters,myvote);
2390 voters++;
2391 }
2392
2393 /* Count other sentinels votes */
2394 di = dictGetIterator(master->sentinels);
2395 while((de = dictNext(di)) != NULL) {
2396 sentinelRedisInstance *ri = dictGetVal(de);
2397 if (ri->leader == NULL) continue;
2398 /* If the failover is not already in progress we are only interested
2399 * in Sentinels that believe the master is down. Otherwise the leader
2400 * selection is useful for the "failover-takedown" when the original
2401 * leader fails. In that case we consider all the voters. */
2402 if (!(master->flags & SRI_FAILOVER_IN_PROGRESS) &&
2403 !(ri->flags & SRI_MASTER_DOWN)) continue;
2404 sentinelObjectiveLeaderIncr(counters,ri->leader);
2405 voters++;
2406 }
2407 dictReleaseIterator(di);
2408 voters_quorum = voters/2+1;
2409
2410 /* Check what's the winner. For the winner to win, it needs two conditions:
2411 * 1) Absolute majority between voters (50% + 1).
2412 * 2) And anyway at least master->quorum votes. */
2413 {
2414 uint64_t max_votes = 0; /* Max votes so far. */
2415
2416 di = dictGetIterator(counters);
2417 while((de = dictNext(di)) != NULL) {
2418 uint64_t votes = dictGetUnsignedIntegerVal(de);
2419
2420 if (max_votes < votes) {
2421 max_votes = votes;
2422 winner = dictGetKey(de);
2423 }
2424 }
2425 dictReleaseIterator(di);
2426 if (winner && (max_votes < voters_quorum || max_votes < master->quorum))
2427 winner = NULL;
2428 }
2429 winner = winner ? sdsnew(winner) : NULL;
2430 sdsfree(myvote);
2431 dictRelease(counters);
2432 return winner;
2433 }
2434
2435 /* Setup the master state to start a failover as a leader.
2436 *
2437 * State can be either:
2438 *
2439 * SENTINEL_FAILOVER_STATE_WAIT_START: starts a failover from scratch.
2440 * SENTINEL_FAILOVER_STATE_RECONF_SLAVES: takedown a failed failover.
2441 */
2442 void sentinelStartFailover(sentinelRedisInstance *master, int state) {
2443 redisAssert(master->flags & SRI_MASTER);
2444 redisAssert(state == SENTINEL_FAILOVER_STATE_WAIT_START ||
2445 state == SENTINEL_FAILOVER_STATE_RECONF_SLAVES);
2446
2447 master->failover_state = state;
2448 master->flags |= SRI_FAILOVER_IN_PROGRESS|SRI_I_AM_THE_LEADER;
2449 sentinelEvent(REDIS_WARNING,"+failover-triggered",master,"%@");
2450
2451 /* Pick a random delay if it's a fresh failover (WAIT_START), and not
2452 * a recovery of a failover started by another sentinel. */
2453 if (master->failover_state == SENTINEL_FAILOVER_STATE_WAIT_START) {
2454 master->failover_start_time = mstime() +
2455 SENTINEL_FAILOVER_FIXED_DELAY +
2456 (rand() % SENTINEL_FAILOVER_MAX_RANDOM_DELAY);
2457 sentinelEvent(REDIS_WARNING,"+failover-state-wait-start",master,
2458 "%@ #starting in %lld milliseconds",
2459 master->failover_start_time-mstime());
2460 }
2461 master->failover_state_change_time = mstime();
2462 }
2463
2464 /* This function checks if there are the conditions to start the failover,
2465 * that is:
2466 *
2467 * 1) Enough time has passed since O_DOWN.
2468 * 2) The master is marked as SRI_CAN_FAILOVER, so we can failover it.
2469 * 3) We are the objectively leader for this master.
2470 *
2471 * If the conditions are met we flag the master as SRI_FAILOVER_IN_PROGRESS
2472 * and SRI_I_AM_THE_LEADER.
2473 */
2474 void sentinelStartFailoverIfNeeded(sentinelRedisInstance *master) {
2475 char *leader;
2476 int isleader;
2477
2478 /* We can't failover if the master is not in O_DOWN state or if
2479 * there is not already a failover in progress (to perform the
2480 * takedown if the leader died) or if this Sentinel is not allowed
2481 * to start a failover. */
2482 if (!(master->flags & SRI_CAN_FAILOVER) ||
2483 !(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) return;
2484
2485 leader = sentinelGetObjectiveLeader(master);
2486 isleader = leader && strcasecmp(leader,server.runid) == 0;
2487 sdsfree(leader);
2488
2489 /* If I'm not the leader, I can't failover for sure. */
2490 if (!isleader) return;
2491
2492 /* If the failover is already in progress there are two options... */
2493 if (master->flags & SRI_FAILOVER_IN_PROGRESS) {
2494 if (master->flags & SRI_I_AM_THE_LEADER) {
2495 /* 1) I'm flagged as leader so I already started the failover.
2496 * Just return. */
2497 return;
2498 } else {
2499 mstime_t elapsed = mstime() - master->failover_state_change_time;
2500
2501 /* 2) I'm the new leader, but I'm not flagged as leader in the
2502 * master: I did not started the failover, but the original
2503 * leader has no longer the leadership.
2504 *
2505 * In this case if the failover appears to be lagging
2506 * for at least 25% of the configured failover timeout,
2507 * I can assume I can take control. Otherwise
2508 * it's better to return and wait more. */
2509 if (elapsed < (master->failover_timeout/4)) return;
2510 sentinelEvent(REDIS_WARNING,"+failover-takedown",master,"%@");
2511 /* We have already an elected slave if we are in
2512 * FAILOVER_IN_PROGRESS state, that is, the slave that we
2513 * observed turning into a master. */
2514 sentinelStartFailover(master,SENTINEL_FAILOVER_STATE_RECONF_SLAVES);
2515 /* As an observer we flagged all the slaves as RECONF_SENT but
2516 * now we are in charge of actually sending the reconfiguration
2517 * command so let's clear this flag for all the instances. */
2518 sentinelDelFlagsToDictOfRedisInstances(master->slaves,
2519 SRI_RECONF_SENT);
2520 }
2521 } else {
2522 /* Brand new failover as SRI_FAILOVER_IN_PROGRESS was not set.
2523 *
2524 * Do we have a slave to promote? Otherwise don't start a failover
2525 * at all. */
2526 if (sentinelSelectSlave(master) == NULL) return;
2527 sentinelStartFailover(master,SENTINEL_FAILOVER_STATE_WAIT_START);
2528 }
2529 }
2530
2531 /* Select a suitable slave to promote. The current algorithm only uses
2532 * the following parameters:
2533 *
2534 * 1) None of the following conditions: S_DOWN, O_DOWN, DISCONNECTED.
2535 * 2) last_avail_time more recent than SENTINEL_INFO_VALIDITY_TIME.
2536 * 3) info_refresh more recent than SENTINEL_INFO_VALIDITY_TIME.
2537 * 4) master_link_down_time no more than:
2538 * (now - master->s_down_since_time) + (master->down_after_period * 10).
2539 * 5) Slave priority can't be zero, otherwise the slave is discareded.
2540 *
2541 * Among all the slaves matching the above conditions we select the slave
2542 * with lower slave_priority. If priority is the same we select the slave
2543 * with lexicographically smaller runid.
2544 *
2545 * The function returns the pointer to the selected slave, otherwise
2546 * NULL if no suitable slave was found.
2547 */
2548
2549 int compareSlavesForPromotion(const void *a, const void *b) {
2550 sentinelRedisInstance **sa = (sentinelRedisInstance **)a,
2551 **sb = (sentinelRedisInstance **)b;
2552 char *sa_runid, *sb_runid;
2553
2554 if ((*sa)->slave_priority != (*sb)->slave_priority)
2555 return (*sa)->slave_priority - (*sb)->slave_priority;
2556
2557 /* If priority is the same, select the slave with that has the
2558 * lexicographically smaller runid. Note that we try to handle runid
2559 * == NULL as there are old Redis versions that don't publish runid in
2560 * INFO. A NULL runid is considered bigger than any other runid. */
2561 sa_runid = (*sa)->runid;
2562 sb_runid = (*sb)->runid;
2563 if (sa_runid == NULL && sb_runid == NULL) return 0;
2564 else if (sa_runid == NULL) return 1; /* a > b */
2565 else if (sb_runid == NULL) return -1; /* a < b */
2566 return strcasecmp(sa_runid, sb_runid);
2567 }
2568
2569 sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) {
2570 sentinelRedisInstance **instance =
2571 zmalloc(sizeof(instance[0])*dictSize(master->slaves));
2572 sentinelRedisInstance *selected = NULL;
2573 int instances = 0;
2574 dictIterator *di;
2575 dictEntry *de;
2576 mstime_t max_master_down_time = 0;
2577
2578 if (master->flags & SRI_S_DOWN)
2579 max_master_down_time += mstime() - master->s_down_since_time;
2580 max_master_down_time += master->down_after_period * 10;
2581
2582 di = dictGetIterator(master->slaves);
2583 while((de = dictNext(di)) != NULL) {
2584 sentinelRedisInstance *slave = dictGetVal(de);
2585 mstime_t info_validity_time = mstime()-SENTINEL_INFO_VALIDITY_TIME;
2586
2587 if (slave->flags & (SRI_S_DOWN|SRI_O_DOWN|SRI_DISCONNECTED)) continue;
2588 if (slave->last_avail_time < info_validity_time) continue;
2589 if (slave->slave_priority == 0) continue;
2590
2591 /* If the master is in SDOWN state we get INFO for slaves every second.
2592 * Otherwise we get it with the usual period so we need to account for
2593 * a larger delay. */
2594 if ((master->flags & SRI_S_DOWN) == 0)
2595 info_validity_time -= SENTINEL_INFO_PERIOD;
2596 if (slave->info_refresh < info_validity_time) continue;
2597 if (slave->master_link_down_time > max_master_down_time) continue;
2598 instance[instances++] = slave;
2599 }
2600 dictReleaseIterator(di);
2601 if (instances) {
2602 qsort(instance,instances,sizeof(sentinelRedisInstance*),
2603 compareSlavesForPromotion);
2604 selected = instance[0];
2605 }
2606 zfree(instance);
2607 return selected;
2608 }
2609
2610 /* ---------------- Failover state machine implementation ------------------- */
2611 void sentinelFailoverWaitStart(sentinelRedisInstance *ri) {
2612 /* If we in "wait start" but the master is no longer in ODOWN nor in
2613 * SDOWN condition we abort the failover. This is important as it
2614 * prevents a useless failover in a a notable case of netsplit, where
2615 * the senitnels are split from the redis instances. In this case
2616 * the failover will not start while there is the split because no
2617 * good slave can be reached. However when the split is resolved, we
2618 * can go to waitstart if the slave is back rechable a few milliseconds
2619 * before the master is. In that case when the master is back online
2620 * we cancel the failover. */
2621 if ((ri->flags & (SRI_S_DOWN|SRI_O_DOWN|SRI_FORCE_FAILOVER)) == 0) {
2622 sentinelEvent(REDIS_WARNING,"-failover-abort-master-is-back",
2623 ri,"%@");
2624 sentinelAbortFailover(ri);
2625 return;
2626 }
2627
2628 /* Start the failover going to the next state if enough time has
2629 * elapsed. */
2630 if (mstime() >= ri->failover_start_time) {
2631 ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
2632 ri->failover_state_change_time = mstime();
2633 sentinelEvent(REDIS_WARNING,"+failover-state-select-slave",ri,"%@");
2634 }
2635 }
2636
2637 void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) {
2638 sentinelRedisInstance *slave = sentinelSelectSlave(ri);
2639
2640 if (slave == NULL) {
2641 sentinelEvent(REDIS_WARNING,"-failover-abort-no-good-slave",ri,"%@");
2642 sentinelAbortFailover(ri);
2643 } else {
2644 sentinelEvent(REDIS_WARNING,"+selected-slave",slave,"%@");
2645 slave->flags |= SRI_PROMOTED;
2646 ri->promoted_slave = slave;
2647 ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE;
2648 ri->failover_state_change_time = mstime();
2649 sentinelEvent(REDIS_NOTICE,"+failover-state-send-slaveof-noone",
2650 slave, "%@");
2651 }
2652 }
2653
2654 void sentinelFailoverSendSlaveOfNoOne(sentinelRedisInstance *ri) {
2655 int retval;
2656
2657 if (ri->promoted_slave->flags & SRI_DISCONNECTED) return;
2658
2659 /* Send SLAVEOF NO ONE command to turn the slave into a master.
2660 * We actually register a generic callback for this command as we don't
2661 * really care about the reply. We check if it worked indirectly observing
2662 * if INFO returns a different role (master instead of slave). */
2663 retval = redisAsyncCommand(ri->promoted_slave->cc,
2664 sentinelDiscardReplyCallback, NULL, "SLAVEOF NO ONE");
2665 if (retval != REDIS_OK) return;
2666 ri->promoted_slave->pending_commands++;
2667 sentinelEvent(REDIS_NOTICE, "+failover-state-wait-promotion",
2668 ri->promoted_slave,"%@");
2669 ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION;
2670 ri->failover_state_change_time = mstime();
2671 }
2672
2673 /* We actually wait for promotion indirectly checking with INFO when the
2674 * slave turns into a master. */
2675 void sentinelFailoverWaitPromotion(sentinelRedisInstance *ri) {
2676 mstime_t elapsed = mstime() - ri->failover_state_change_time;
2677
2678 if (elapsed >= SENTINEL_PROMOTION_RETRY_PERIOD) {
2679 sentinelEvent(REDIS_WARNING,"-promotion-timeout",ri->promoted_slave,
2680 "%@");
2681 sentinelEvent(REDIS_WARNING,"+failover-state-select-slave",ri,"%@");
2682 ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
2683 ri->failover_state_change_time = mstime();
2684 ri->promoted_slave->flags &= ~SRI_PROMOTED;
2685 ri->promoted_slave = NULL;
2686 }
2687 }
2688
2689 void sentinelFailoverDetectEnd(sentinelRedisInstance *master) {
2690 int not_reconfigured = 0, timeout = 0;
2691 dictIterator *di;
2692 dictEntry *de;
2693 mstime_t elapsed = mstime() - master->failover_state_change_time;
2694
2695 /* We can't consider failover finished if the promoted slave is
2696 * not reachable. */
2697 if (master->promoted_slave == NULL ||
2698 master->promoted_slave->flags & SRI_S_DOWN) return;
2699
2700 /* The failover terminates once all the reachable slaves are properly
2701 * configured. */
2702 di = dictGetIterator(master->slaves);
2703 while((de = dictNext(di)) != NULL) {
2704 sentinelRedisInstance *slave = dictGetVal(de);
2705
2706 if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
2707 if (slave->flags & SRI_S_DOWN) continue;
2708 not_reconfigured++;
2709 }
2710 dictReleaseIterator(di);
2711
2712 /* Force end of failover on timeout. */
2713 if (elapsed > master->failover_timeout) {
2714 not_reconfigured = 0;
2715 timeout = 1;
2716 sentinelEvent(REDIS_WARNING,"+failover-end-for-timeout",master,"%@");
2717 }
2718
2719 if (not_reconfigured == 0) {
2720 int role = (master->flags & SRI_I_AM_THE_LEADER) ? SENTINEL_LEADER :
2721 SENTINEL_OBSERVER;
2722
2723 sentinelEvent(REDIS_WARNING,"+failover-end",master,"%@");
2724 master->failover_state = SENTINEL_FAILOVER_STATE_UPDATE_CONFIG;
2725 master->failover_state_change_time = mstime();
2726 sentinelCallClientReconfScript(master,role,"end",master->addr,
2727 master->promoted_slave->addr);
2728 }
2729
2730 /* If I'm the leader it is a good idea to send a best effort SLAVEOF
2731 * command to all the slaves still not reconfigured to replicate with
2732 * the new master. */
2733 if (timeout && (master->flags & SRI_I_AM_THE_LEADER)) {
2734 dictIterator *di;
2735 dictEntry *de;
2736 char master_port[32];
2737
2738 ll2string(master_port,sizeof(master_port),
2739 master->promoted_slave->addr->port);
2740
2741 di = dictGetIterator(master->slaves);
2742 while((de = dictNext(di)) != NULL) {
2743 sentinelRedisInstance *slave = dictGetVal(de);
2744 int retval;
2745
2746 if (slave->flags &
2747 (SRI_RECONF_DONE|SRI_RECONF_SENT|SRI_DISCONNECTED)) continue;
2748
2749 retval = redisAsyncCommand(slave->cc,
2750 sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
2751 master->promoted_slave->addr->ip,
2752 master_port);
2753 if (retval == REDIS_OK) {
2754 sentinelEvent(REDIS_NOTICE,"+slave-reconf-sent-be",slave,"%@");
2755 slave->flags |= SRI_RECONF_SENT;
2756 }
2757 }
2758 dictReleaseIterator(di);
2759 }
2760 }
2761
2762 /* Send SLAVE OF <new master address> to all the remaining slaves that
2763 * still don't appear to have the configuration updated. */
2764 void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) {
2765 dictIterator *di;
2766 dictEntry *de;
2767 int in_progress = 0;
2768
2769 di = dictGetIterator(master->slaves);
2770 while((de = dictNext(di)) != NULL) {
2771 sentinelRedisInstance *slave = dictGetVal(de);
2772
2773 if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG))
2774 in_progress++;
2775 }
2776 dictReleaseIterator(di);
2777
2778 di = dictGetIterator(master->slaves);
2779 while(in_progress < master->parallel_syncs &&
2780 (de = dictNext(di)) != NULL)
2781 {
2782 sentinelRedisInstance *slave = dictGetVal(de);
2783 int retval;
2784 char master_port[32];
2785
2786 /* Skip the promoted slave, and already configured slaves. */
2787 if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
2788
2789 /* Clear the SRI_RECONF_SENT flag if too much time elapsed without
2790 * the slave moving forward to the next state. */
2791 if ((slave->flags & SRI_RECONF_SENT) &&
2792 (mstime() - slave->slave_reconf_sent_time) >
2793 SENTINEL_SLAVE_RECONF_RETRY_PERIOD)
2794 {
2795 sentinelEvent(REDIS_NOTICE,"-slave-reconf-sent-timeout",slave,"%@");
2796 slave->flags &= ~SRI_RECONF_SENT;
2797 }
2798
2799 /* Nothing to do for instances that are disconnected or already
2800 * in RECONF_SENT state. */
2801 if (slave->flags & (SRI_DISCONNECTED|SRI_RECONF_SENT|SRI_RECONF_INPROG))
2802 continue;
2803
2804 /* Send SLAVEOF <new master>. */
2805 ll2string(master_port,sizeof(master_port),
2806 master->promoted_slave->addr->port);
2807 retval = redisAsyncCommand(slave->cc,
2808 sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
2809 master->promoted_slave->addr->ip,
2810 master_port);
2811 if (retval == REDIS_OK) {
2812 slave->flags |= SRI_RECONF_SENT;
2813 slave->pending_commands++;
2814 slave->slave_reconf_sent_time = mstime();
2815 sentinelEvent(REDIS_NOTICE,"+slave-reconf-sent",slave,"%@");
2816 in_progress++;
2817 }
2818 }
2819 dictReleaseIterator(di);
2820 sentinelFailoverDetectEnd(master);
2821 }
2822
2823 /* This function is called when the slave is in
2824 * SENTINEL_FAILOVER_STATE_UPDATE_CONFIG state. In this state we need
2825 * to remove it from the master table and add the promoted slave instead.
2826 *
2827 * If there are no promoted slaves as this instance is unique, we remove
2828 * and re-add it with the same address to trigger a complete state
2829 * refresh. */
2830 void sentinelFailoverSwitchToPromotedSlave(sentinelRedisInstance *master) {
2831 sentinelRedisInstance *ref = master->promoted_slave ?
2832 master->promoted_slave : master;
2833
2834 sentinelEvent(REDIS_WARNING,"+switch-master",master,"%s %s %d %s %d",
2835 master->name, master->addr->ip, master->addr->port,
2836 ref->addr->ip, ref->addr->port);
2837
2838 sentinelResetMasterAndChangeAddress(master,ref->addr->ip,ref->addr->port);
2839 }
2840
2841 void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
2842 redisAssert(ri->flags & SRI_MASTER);
2843
2844 if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;
2845
2846 switch(ri->failover_state) {
2847 case SENTINEL_FAILOVER_STATE_WAIT_START:
2848 sentinelFailoverWaitStart(ri);
2849 break;
2850 case SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
2851 sentinelFailoverSelectSlave(ri);
2852 break;
2853 case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:
2854 sentinelFailoverSendSlaveOfNoOne(ri);
2855 break;
2856 case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
2857 sentinelFailoverWaitPromotion(ri);
2858 break;
2859 case SENTINEL_FAILOVER_STATE_RECONF_SLAVES:
2860 sentinelFailoverReconfNextSlave(ri);
2861 break;
2862 case SENTINEL_FAILOVER_STATE_DETECT_END:
2863 sentinelFailoverDetectEnd(ri);
2864 break;
2865 }
2866 }
2867
2868 /* Abort a failover in progress with the following steps:
2869 * 1) If this instance is the leaer send a SLAVEOF command to all the already
2870 * reconfigured slaves if any to configure them to replicate with the
2871 * original master.
2872 * 2) For both leaders and observers: clear the failover flags and state in
2873 * the master instance.
2874 * 3) If there is already a promoted slave and we are the leader, and this
2875 * slave is not DISCONNECTED, try to reconfigure it to replicate
2876 * back to the master as well, sending a best effort SLAVEOF command.
2877 */
2878 void sentinelAbortFailover(sentinelRedisInstance *ri) {
2879 char master_port[32];
2880 dictIterator *di;
2881 dictEntry *de;
2882 int sentinel_role;
2883
2884 redisAssert(ri->flags & SRI_FAILOVER_IN_PROGRESS);
2885 ll2string(master_port,sizeof(master_port),ri->addr->port);
2886
2887 /* Clear failover related flags from slaves.
2888 * Also if we are the leader make sure to send SLAVEOF commands to all the
2889 * already reconfigured slaves in order to turn them back into slaves of
2890 * the original master. */
2891 di = dictGetIterator(ri->slaves);
2892 while((de = dictNext(di)) != NULL) {
2893 sentinelRedisInstance *slave = dictGetVal(de);
2894 if ((ri->flags & SRI_I_AM_THE_LEADER) &&
2895 !(slave->flags & SRI_DISCONNECTED) &&
2896 (slave->flags & (SRI_PROMOTED|SRI_RECONF_SENT|SRI_RECONF_INPROG|
2897 SRI_RECONF_DONE)))
2898 {
2899 int retval;
2900
2901 retval = redisAsyncCommand(slave->cc,
2902 sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
2903 ri->addr->ip,
2904 master_port);
2905 if (retval == REDIS_OK)
2906 sentinelEvent(REDIS_NOTICE,"-slave-reconf-undo",slave,"%@");
2907 }
2908 slave->flags &= ~(SRI_RECONF_SENT|SRI_RECONF_INPROG|SRI_RECONF_DONE);
2909 }
2910 dictReleaseIterator(di);
2911
2912 sentinel_role = (ri->flags & SRI_I_AM_THE_LEADER) ? SENTINEL_LEADER :
2913 SENTINEL_OBSERVER;
2914 ri->flags &= ~(SRI_FAILOVER_IN_PROGRESS|SRI_I_AM_THE_LEADER|SRI_FORCE_FAILOVER);
2915 ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
2916 ri->failover_state_change_time = mstime();
2917 if (ri->promoted_slave) {
2918 sentinelCallClientReconfScript(ri,sentinel_role,"abort",
2919 ri->promoted_slave->addr,ri->addr);
2920 ri->promoted_slave->flags &= ~SRI_PROMOTED;
2921 ri->promoted_slave = NULL;
2922 }
2923 }
2924
2925 /* The following is called only for master instances and will abort the
2926 * failover process if:
2927 *
2928 * 1) The failover is in progress.
2929 * 2) We already promoted a slave.
2930 * 3) The promoted slave is in extended SDOWN condition.
2931 */
2932 void sentinelAbortFailoverIfNeeded(sentinelRedisInstance *ri) {
2933 /* Failover is in progress? Do we have a promoted slave? */
2934 if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS) || !ri->promoted_slave) return;
2935
2936 /* Is the promoted slave into an extended SDOWN state? */
2937 if (!(ri->promoted_slave->flags & SRI_S_DOWN) ||
2938 (mstime() - ri->promoted_slave->s_down_since_time) <
2939 (ri->down_after_period * SENTINEL_EXTENDED_SDOWN_MULTIPLIER)) return;
2940
2941 sentinelEvent(REDIS_WARNING,"-failover-abort-x-sdown",ri->promoted_slave,"%@");
2942 sentinelAbortFailover(ri);
2943 }
2944
2945 /* ======================== SENTINEL timer handler ==========================
2946 * This is the "main" our Sentinel, being sentinel completely non blocking
2947 * in design. The function is called every second.
2948 * -------------------------------------------------------------------------- */
2949
2950 /* Perform scheduled operations for the specified Redis instance. */
2951 void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
2952 /* ========== MONITORING HALF ============ */
2953 /* Every kind of instance */
2954 sentinelReconnectInstance(ri);
2955 sentinelPingInstance(ri);
2956
2957 /* Masters and slaves */
2958 if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
2959 /* Nothing so far. */
2960 }
2961
2962 /* Only masters */
2963 if (ri->flags & SRI_MASTER) {
2964 sentinelAskMasterStateToOtherSentinels(ri);
2965 }
2966
2967 /* ============== ACTING HALF ============= */
2968 /* We don't proceed with the acting half if we are in TILT mode.
2969 * TILT happens when we find something odd with the time, like a
2970 * sudden change in the clock. */
2971 if (sentinel.tilt) {
2972 if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return;
2973 sentinel.tilt = 0;
2974 sentinelEvent(REDIS_WARNING,"-tilt",NULL,"#tilt mode exited");
2975 }
2976
2977 /* Every kind of instance */
2978 sentinelCheckSubjectivelyDown(ri);
2979
2980 /* Masters and slaves */
2981 if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
2982 /* Nothing so far. */
2983 }
2984
2985 /* Only masters */
2986 if (ri->flags & SRI_MASTER) {
2987 sentinelCheckObjectivelyDown(ri);
2988 sentinelStartFailoverIfNeeded(ri);
2989 sentinelFailoverStateMachine(ri);
2990 sentinelAbortFailoverIfNeeded(ri);
2991 }
2992 }
2993
2994 /* Perform scheduled operations for all the instances in the dictionary.
2995 * Recursively call the function against dictionaries of slaves. */
2996 void sentinelHandleDictOfRedisInstances(dict *instances) {
2997 dictIterator *di;
2998 dictEntry *de;
2999 sentinelRedisInstance *switch_to_promoted = NULL;
3000
3001 /* There are a number of things we need to perform against every master. */
3002 di = dictGetIterator(instances);
3003 while((de = dictNext(di)) != NULL) {
3004 sentinelRedisInstance *ri = dictGetVal(de);
3005
3006 sentinelHandleRedisInstance(ri);
3007 if (ri->flags & SRI_MASTER) {
3008 sentinelHandleDictOfRedisInstances(ri->slaves);
3009 sentinelHandleDictOfRedisInstances(ri->sentinels);
3010 if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) {
3011 switch_to_promoted = ri;
3012 }
3013 }
3014 }
3015 if (switch_to_promoted)
3016 sentinelFailoverSwitchToPromotedSlave(switch_to_promoted);
3017 dictReleaseIterator(di);
3018 }
3019
3020 /* This function checks if we need to enter the TITL mode.
3021 *
3022 * The TILT mode is entered if we detect that between two invocations of the
3023 * timer interrupt, a negative amount of time, or too much time has passed.
3024 * Note that we expect that more or less just 100 milliseconds will pass
3025 * if everything is fine. However we'll see a negative number or a
3026 * difference bigger than SENTINEL_TILT_TRIGGER milliseconds if one of the
3027 * following conditions happen:
3028 *
3029 * 1) The Sentiel process for some time is blocked, for every kind of
3030 * random reason: the load is huge, the computer was freezed for some time
3031 * in I/O or alike, the process was stopped by a signal. Everything.
3032 * 2) The system clock was altered significantly.
3033 *
3034 * Under both this conditions we'll see everything as timed out and failing
3035 * without good reasons. Instead we enter the TILT mode and wait
3036 * for SENTIENL_TILT_PERIOD to elapse before starting to act again.
3037 *
3038 * During TILT time we still collect information, we just do not act. */
3039 void sentinelCheckTiltCondition(void) {
3040 mstime_t now = mstime();
3041 mstime_t delta = now - sentinel.previous_time;
3042
3043 if (delta < 0 || delta > SENTINEL_TILT_TRIGGER) {
3044 sentinel.tilt = 1;
3045 sentinel.tilt_start_time = mstime();
3046 sentinelEvent(REDIS_WARNING,"+tilt",NULL,"#tilt mode entered");
3047 }
3048 sentinel.previous_time = mstime();
3049 }
3050
3051 void sentinelTimer(void) {
3052 sentinelCheckTiltCondition();
3053 sentinelHandleDictOfRedisInstances(sentinel.masters);
3054 sentinelRunPendingScripts();
3055 sentinelCollectTerminatedScripts();
3056 sentinelKillTimedoutScripts();
3057 }
3058