#include <ctype.h>
#include <arpa/inet.h>
#include <sys/socket.h>
+#include <sys/wait.h>
+
+extern char **environ;
#define REDIS_SENTINEL_PORT 26379
#define SENTINEL_MASTER_LINK_STATUS_UP 0
#define SENTINEL_MASTER_LINK_STATUS_DOWN 1
+/* Generic flags that can be used with different functions. */
+#define SENTINEL_NO_FLAGS 0
+#define SENTINEL_GENERATE_EVENT 1
+
+/* Script execution flags and limits. */
+#define SENTINEL_SCRIPT_NONE 0
+#define SENTINEL_SCRIPT_RUNNING 1
+#define SENTINEL_SCRIPT_MAX_QUEUE 256
+#define SENTINEL_SCRIPT_MAX_RUNNING 16
+#define SENTINEL_SCRIPT_MAX_RUNTIME 60000 /* 60 seconds max exec time. */
+#define SENTINEL_SCRIPT_MAX_RETRY 10
+#define SENTINEL_SCRIPT_RETRY_DELAY 30000 /* 30 seconds between retries. */
+
typedef struct sentinelRedisInstance {
int flags; /* See SRI_... defines */
char *name; /* Master name from the point of view of this sentinel. */
struct sentinelRedisInstance *promoted_slave; /* Promoted slave instance. */
/* Scripts executed to notify admin or reconfigure clients: when they
* are set to NULL no script is executed. */
- char *notify_script;
+ char *notification_script;
char *client_reconfig_script;
} sentinelRedisInstance;
Key is the instance name, value is the
sentinelRedisInstance structure pointer. */
int tilt; /* Are we in TILT mode? */
+ int running_scripts; /* Number of scripts in execution right now. */
mstime_t tilt_start_time; /* When TITL started. */
mstime_t previous_time; /* Time last time we ran the time handler. */
+ list *scripts_queue; /* Queue of user scripts to execute. */
} sentinel;
+/* A script execution job: one scheduled or running invocation of a user
+ * script. Jobs are the values stored in the nodes of sentinel.scripts_queue. */
+typedef struct sentinelScriptJob {
+ int flags; /* Script job flags: SENTINEL_SCRIPT_* (SENTINEL_SCRIPT_RUNNING
+ is set while a child process is executing the job). */
+ int retry_num; /* Number of times we tried to execute it (incremented
+ right before every fork attempt). */
+ char **argv; /* Arguments to call the script: a NULL terminated vector
+ of sds strings, argv[0] being the script path. */
+ mstime_t start_time; /* Script execution time if the script is running,
+ otherwise 0 if we are allowed to retry the
+ execution at any time. If the script is not
+ running and it's not 0, it means: do not run
+ before the specified time. */
+ pid_t pid; /* Script execution pid, 0 when no child is running. */
+} sentinelScriptJob;
+
/* ======================= hiredis ae.c adapters =============================
* Note: this implementation is taken from hiredis/adapters/ae.h, however
* we have our modified copy for Sentinel in order to use our allocator
char *sentinelGetObjectiveLeader(sentinelRedisInstance *master);
int yesnotoi(char *s);
void sentinelDisconnectInstanceFromContext(const redisAsyncContext *c);
+void sentinelKillLink(sentinelRedisInstance *ri, redisAsyncContext *c);
const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri);
+void sentinelAbortFailover(sentinelRedisInstance *ri);
+void sentinelEvent(int level, char *type, sentinelRedisInstance *ri, const char *fmt, ...);
+sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master);
+void sentinelScheduleScriptExecution(char *path, ...);
/* ========================= Dictionary types =============================== */
sentinel.tilt = 0;
sentinel.tilt_start_time = mstime();
sentinel.previous_time = mstime();
+ sentinel.running_scripts = 0;
+ sentinel.scripts_queue = listCreate();
}
/* ============================== sentinelAddr ============================== */
/* =========================== Events notification ========================== */
-void sentinelCallNotificationScript(char *scriptpath, char *type, char *msg) {
- /* TODO: implement it. */
-}
-
/* Send an event to log, pub/sub, user notification script.
*
* 'level' is the log level for logging. Only REDIS_WARNING events will trigger
if (level == REDIS_WARNING && ri != NULL) {
sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
ri : ri->master;
- if (master->notify_script) {
- sentinelCallNotificationScript(master->notify_script,type,msg);
+ if (master->notification_script) {
+ sentinelScheduleScriptExecution(master->notification_script,
+ type,msg,NULL);
+ }
+ }
+}
+
+/* ============================ script execution ============================ */
+
+/* Free a script job: every sds string referenced by the NULL terminated
+ * argv vector, then the vector itself, and finally the job structure. */
+void sentinelReleaseScriptJob(sentinelScriptJob *sj) {
+    char **arg;
+
+    for (arg = sj->argv; *arg != NULL; arg++) sdsfree(*arg);
+    zfree(sj->argv);
+    zfree(sj);
+}
+
+#define SENTINEL_SCRIPT_MAX_ARGS 16
+/* Queue the execution of the script at 'path'.
+ *
+ * The variable arguments are C strings that become argv[1], argv[2], ... of
+ * the script, and the list must be terminated by a NULL pointer. Every string
+ * (and 'path' itself, used as argv[0]) is copied, so the caller retains the
+ * ownership of what it passes. At most SENTINEL_SCRIPT_MAX_ARGS vector
+ * entries (path included) are used: further arguments are ignored.
+ *
+ * The job is appended to sentinel.scripts_queue. If the queue grows past
+ * SENTINEL_SCRIPT_MAX_QUEUE entries, the oldest job that is not currently
+ * running is dropped and released. */
+void sentinelScheduleScriptExecution(char *path, ...) {
+    va_list ap;
+    char *argv[SENTINEL_SCRIPT_MAX_ARGS+1];
+    int argc = 1;
+    sentinelScriptJob *sj;
+
+    va_start(ap, path);
+    while(argc < SENTINEL_SCRIPT_MAX_ARGS) {
+        argv[argc] = va_arg(ap,char*);
+        if (!argv[argc]) break;
+        argv[argc] = sdsnew(argv[argc]); /* Copy the string. */
+        argc++;
+    }
+    va_end(ap);
+    argv[0] = sdsnew(path);
+    /* Always terminate the vector. When the loop above stops because the
+     * arguments limit was reached (and not because the NULL sentinel was
+     * found) argv[argc] was never written, and copying it as the terminator
+     * would hand an uninitialized pointer to execve() and to the release
+     * function. */
+    argv[argc] = NULL;
+
+    sj = zmalloc(sizeof(*sj));
+    sj->flags = SENTINEL_SCRIPT_NONE;
+    sj->retry_num = 0;
+    sj->argv = zmalloc(sizeof(char*)*(argc+1));
+    sj->start_time = 0;
+    sj->pid = 0;
+    memcpy(sj->argv,argv,sizeof(char*)*(argc+1));
+
+    listAddNodeTail(sentinel.scripts_queue,sj);
+
+    /* Remove the oldest non running script if we already hit the limit. */
+    if (listLength(sentinel.scripts_queue) > SENTINEL_SCRIPT_MAX_QUEUE) {
+        listNode *ln;
+        listIter li;
+
+        listRewind(sentinel.scripts_queue,&li);
+        while ((ln = listNext(&li)) != NULL) {
+            sj = ln->value;
+
+            if (sj->flags & SENTINEL_SCRIPT_RUNNING) continue;
+            /* The first node is the oldest as we add on tail. */
+            listDelNode(sentinel.scripts_queue,ln);
+            sentinelReleaseScriptJob(sj);
+            break;
+        }
+        redisAssert(listLength(sentinel.scripts_queue) <=
+                    SENTINEL_SCRIPT_MAX_QUEUE);
+    }
+}
+
+/* Search the scripts queue for the running job executed by the child with
+ * the given pid. The list node is returned (rather than the job) so that
+ * the caller can unlink it from the queue; NULL is returned when no running
+ * job has that pid. */
+listNode *sentinelGetScriptListNodeByPid(pid_t pid) {
+    listIter iter;
+    listNode *node;
+
+    listRewind(sentinel.scripts_queue,&iter);
+    for (node = listNext(&iter); node != NULL; node = listNext(&iter)) {
+        sentinelScriptJob *job = node->value;
+
+        /* Jobs that are only scheduled have no meaningful pid. */
+        if (!(job->flags & SENTINEL_SCRIPT_RUNNING)) continue;
+        if (job->pid == pid) return node;
+    }
+    return NULL;
+}
+
+/* Run pending scripts if we are not already at max number of running
+ * scripts.
+ *
+ * The queue is scanned from head to tail and every job that is not running
+ * and whose start_time is not in the future is started in a forked child,
+ * until SENTINEL_SCRIPT_MAX_RUNNING children are alive or the queue ends. */
+void sentinelRunPendingScripts(void) {
+ listNode *ln;
+ listIter li;
+ mstime_t now = mstime();
+
+ /* Find jobs that are not running and run them, from the top to the
+ * tail of the queue, so we run older jobs first. */
+ listRewind(sentinel.scripts_queue,&li);
+ while (sentinel.running_scripts < SENTINEL_SCRIPT_MAX_RUNNING &&
+ (ln = listNext(&li)) != NULL)
+ {
+ sentinelScriptJob *sj = ln->value;
+ pid_t pid;
+
+ /* Skip if already running. */
+ if (sj->flags & SENTINEL_SCRIPT_RUNNING) continue;
+
+ /* Skip if it's a retry, but not enough time has elapsed. */
+ if (sj->start_time && sj->start_time > now) continue;
+
+ /* Mark the job as running and account for the attempt before forking:
+ * start_time now holds the execution start time. */
+ sj->flags |= SENTINEL_SCRIPT_RUNNING;
+ sj->start_time = mstime();
+ sj->retry_num++;
+ pid = fork();
+
+ if (pid == -1) {
+ /* Parent (fork error).
+ * We report fork errors as signal 99, in order to unify the
+ * reporting with other kind of errors. */
+ sentinelEvent(REDIS_WARNING,"-script-error",NULL,
+ "%s %d %d", sj->argv[0], 99, 0);
+ /* The job goes back to the scheduled state. NOTE(review): start_time
+ * and retry_num keep the values set above, so the failed fork counts
+ * as an attempt and the job is eligible again at the next call. */
+ sj->flags &= ~SENTINEL_SCRIPT_RUNNING;
+ sj->pid = 0;
+ } else if (pid == 0) {
+ /* Child */
+ execve(sj->argv[0],sj->argv,environ);
+ /* If we are here an error occurred. */
+ _exit(2); /* Don't retry execution. */
+ } else {
+ /* Parent: remember the child so that it can be collected later. */
+ sentinel.running_scripts++;
+ sj->pid = pid;
+ sentinelEvent(REDIS_DEBUG,"+script-child",NULL,"%ld",(long)pid);
+ }
+ }
+}
+
+/* Return the delay, in milliseconds, to wait before executing again a
+ * script that failed, given the number of executions attempted so far.
+ *
+ * The base delay is SENTINEL_SCRIPT_RETRY_DELAY and it is doubled once for
+ * every attempt after the first one. With a base of 30 seconds the sequence
+ * for retry_num = 1, 2, 3, ... is: 30 sec, 60 sec, 2 min, 4 min, 8 min,
+ * 16 min, 32 min, 64 min, 128 min. Values of retry_num lower than 2 yield
+ * the base delay. */
+mstime_t sentinelScriptRetryDelay(int retry_num) {
+    mstime_t delay = SENTINEL_SCRIPT_RETRY_DELAY;
+    int doublings;
+
+    for (doublings = retry_num - 1; doublings > 0; doublings--) delay *= 2;
+    return delay;
+}
+
+/* Reap the script children that terminated (without blocking) and update
+ * the jobs they were executing.
+ *
+ * A job is removed from the queue if its script terminated successfully.
+ * If instead the script was terminated by a signal, or returned exit code
+ * "1", the job is scheduled to run again after a delay, unless the max
+ * number of retries was already reached. Every other failure is logged
+ * as a "-script-error" event and the job is removed.
+ *
+ * In all the cases the reaped child no longer counts as a running script. */
+void sentinelCollectTerminatedScripts(void) {
+    int statloc;
+    pid_t pid;
+
+    while ((pid = wait3(&statloc,WNOHANG,NULL)) > 0) {
+        int exitcode = WEXITSTATUS(statloc);
+        int bysignal = 0;
+        listNode *ln;
+        sentinelScriptJob *sj;
+
+        if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
+        sentinelEvent(REDIS_DEBUG,"-script-child",NULL,"%ld %d %d",
+            (long)pid, exitcode, bysignal);
+
+        ln = sentinelGetScriptListNodeByPid(pid);
+        if (ln == NULL) {
+            redisLog(REDIS_WARNING,"wait3() returned a pid (%ld) we can't find in our scripts execution queue!", (long)pid);
+            continue;
+        }
+        sj = ln->value;
+
+        /* The child is gone, so its slot must be given back whether the
+         * job is rescheduled or dropped. Doing it only on removal would
+         * leak one running slot for every retried execution, up to the
+         * point where no script could be started anymore. */
+        sentinel.running_scripts--;
+
+        /* If the script was terminated by a signal or returns an
+         * exit code of "1" (that means: please retry), we reschedule it
+         * if the max number of retries is not already reached. */
+        if ((bysignal || exitcode == 1) &&
+            sj->retry_num != SENTINEL_SCRIPT_MAX_RETRY)
+        {
+            sj->flags &= ~SENTINEL_SCRIPT_RUNNING;
+            sj->pid = 0;
+            sj->start_time = mstime() +
+                             sentinelScriptRetryDelay(sj->retry_num);
+        } else {
+            /* Otherwise let's remove the script, but log the event if the
+             * execution did not terminate in the best of the ways. */
+            if (bysignal || exitcode != 0) {
+                sentinelEvent(REDIS_WARNING,"-script-error",NULL,
+                              "%s %d %d", sj->argv[0], bysignal, exitcode);
+            }
+            listDelNode(sentinel.scripts_queue,ln);
+            sentinelReleaseScriptJob(sj);
}
}
}
+/* Send SIGKILL to every running script whose execution time exceeded
+ * SENTINEL_SCRIPT_MAX_RUNTIME, logging a "-script-timeout" event for each
+ * of them. The jobs are not touched here: the killed children are handled
+ * by sentinelCollectTerminatedScripts() once they are reaped. */
+void sentinelKillTimedoutScripts(void) {
+    listIter iter;
+    listNode *node;
+    mstime_t now = mstime();
+
+    listRewind(sentinel.scripts_queue,&iter);
+    for (node = listNext(&iter); node != NULL; node = listNext(&iter)) {
+        sentinelScriptJob *job = node->value;
+
+        if (!(job->flags & SENTINEL_SCRIPT_RUNNING)) continue;
+        if ((now - job->start_time) <= SENTINEL_SCRIPT_MAX_RUNTIME) continue;
+
+        sentinelEvent(REDIS_WARNING,"-script-timeout",NULL,"%s %ld",
+            job->argv[0], (long)job->pid);
+        kill(job->pid,SIGKILL);
+    }
+}
+
+/* Implements SENTINEL PENDING-SCRIPTS command.
+ *
+ * The reply is an array with one entry per job in the scripts queue. Every
+ * entry is a flat array of five field name / value pairs: "argv" (array of
+ * the arguments), "flags" ("running" or "scheduled"), "pid", either
+ * "run-time" (for running jobs) or "run-delay" (milliseconds left before a
+ * scheduled job can start, never negative), and "retry-num". */
+void sentinelPendingScriptsCommand(redisClient *c) {
+    listIter iter;
+    listNode *node;
+
+    addReplyMultiBulkLen(c,listLength(sentinel.scripts_queue));
+    listRewind(sentinel.scripts_queue,&iter);
+    while ((node = listNext(&iter)) != NULL) {
+        sentinelScriptJob *job = node->value;
+        int running = (job->flags & SENTINEL_SCRIPT_RUNNING) != 0;
+        int argc = 0, i;
+
+        addReplyMultiBulkLen(c,10);
+
+        /* argv: count the arguments first as the length goes first. */
+        addReplyBulkCString(c,"argv");
+        while (job->argv[argc] != NULL) argc++;
+        addReplyMultiBulkLen(c,argc);
+        for (i = 0; i < argc; i++) addReplyBulkCString(c,job->argv[i]);
+
+        addReplyBulkCString(c,"flags");
+        addReplyBulkCString(c, running ? "running" : "scheduled");
+
+        addReplyBulkCString(c,"pid");
+        addReplyBulkLongLong(c,job->pid);
+
+        if (running) {
+            addReplyBulkCString(c,"run-time");
+            addReplyBulkLongLong(c,mstime() - job->start_time);
+        } else {
+            mstime_t delay = 0;
+
+            if (job->start_time) {
+                delay = job->start_time - mstime();
+                if (delay < 0) delay = 0;
+            }
+            addReplyBulkCString(c,"run-delay");
+            addReplyBulkLongLong(c,delay);
+        }
+
+        addReplyBulkCString(c,"retry-num");
+        addReplyBulkLongLong(c,job->retry_num);
+    }
+}
+
/* ========================== sentinelRedisInstance ========================= */
/* Create a redis instance, the following fields must be populated by the
ri->failover_start_time = 0;
ri->failover_timeout = SENTINEL_DEFAULT_FAILOVER_TIMEOUT;
ri->promoted_slave = NULL;
- ri->notify_script = NULL;
+ ri->notification_script = NULL;
ri->client_reconfig_script = NULL;
/* Add into the right table. */
dictRelease(ri->sentinels);
dictRelease(ri->slaves);
- /* Release hiredis connections. Note that redisAsyncFree() will call
- * the disconnection callback. */
- if (ri->cc) {
- redisAsyncFree(ri->cc);
- ri->cc = NULL;
- }
- if (ri->pc) {
- redisAsyncFree(ri->pc);
- ri->pc = NULL;
- }
+ /* Release hiredis connections. */
+ if (ri->cc) sentinelKillLink(ri,ri->cc);
+ if (ri->pc) sentinelKillLink(ri,ri->pc);
/* Free other resources. */
sdsfree(ri->name);
sdsfree(ri->runid);
- sdsfree(ri->notify_script);
+ sdsfree(ri->notification_script);
sdsfree(ri->client_reconfig_script);
sdsfree(ri->slave_master_host);
sdsfree(ri->leader);
* 5) In the process of doing this undo the failover if in progress.
* 6) Disconnect the connections with the master (will reconnect automatically).
*/
-void sentinelResetMaster(sentinelRedisInstance *ri) {
+void sentinelResetMaster(sentinelRedisInstance *ri, int flags) {
redisAssert(ri->flags & SRI_MASTER);
dictRelease(ri->slaves);
dictRelease(ri->sentinels);
ri->slaves = dictCreate(&instancesDictType,NULL);
ri->sentinels = dictCreate(&instancesDictType,NULL);
- if (ri->cc) redisAsyncFree(ri->cc);
- if (ri->pc) redisAsyncFree(ri->pc);
+ if (ri->cc) sentinelKillLink(ri,ri->cc);
+ if (ri->pc) sentinelKillLink(ri,ri->pc);
ri->flags &= SRI_MASTER|SRI_CAN_FAILOVER|SRI_DISCONNECTED;
if (ri->leader) {
sdsfree(ri->leader);
ri->failover_state_change_time = 0;
ri->failover_start_time = 0;
ri->promoted_slave = NULL;
- sentinelEvent(REDIS_WARNING,"+reset-master",ri,"%@");
+ sdsfree(ri->runid);
+ sdsfree(ri->slave_master_host);
+ ri->runid = NULL;
+ ri->slave_master_host = NULL;
+ ri->last_avail_time = mstime();
+ ri->last_pong_time = mstime();
+ if (flags & SENTINEL_GENERATE_EVENT)
+ sentinelEvent(REDIS_WARNING,"+reset-master",ri,"%@");
}
/* Call sentinelResetMaster() on every master with a name matching the specified
* pattern. */
-int sentinelResetMastersByPattern(char *pattern) {
+int sentinelResetMastersByPattern(char *pattern, int flags) {
dictIterator *di;
dictEntry *de;
int reset = 0;
if (ri->name) {
if (stringmatch(pattern,ri->name,0)) {
- sentinelResetMaster(ri);
+ sentinelResetMaster(ri,flags);
reset++;
}
}
return reset;
}
+/* Reset 'master' via sentinelResetMaster() (without generating the reset
+ * event) and make it point to the new ip:port address, leaving the name of
+ * the instance untouched.
+ *
+ * This is used to handle the +switch-master and +redirect-to-master events.
+ *
+ * REDIS_ERR is returned, and nothing is modified, if the new address can't
+ * be created (for instance because it can't be resolved). Otherwise
+ * REDIS_OK is returned.
+ *
+ * TODO: make this reset so that original sentinels are re-added with
+ * same ip / port / runid.
+ */
+
+int sentinelResetMasterAndChangeAddress(sentinelRedisInstance *master, char *ip, int port) {
+    sentinelAddr *fresh = createSentinelAddr(ip,port);
+    sentinelAddr *stale;
+
+    if (fresh == NULL) return REDIS_ERR;
+
+    sentinelResetMaster(master,SENTINEL_NO_FLAGS);
+    stale = master->addr;
+    master->addr = fresh;
+    /* The previous address is released only as the last step: 'ip' may well
+     * be master->addr->ip itself, and must stay valid up to this point. */
+    releaseSentinelAddr(stale);
+    return REDIS_OK;
+}
+
+
/* ============================ Config handling ============================= */
char *sentinelHandleConfiguration(char **argv, int argc) {
sentinelRedisInstance *ri;
ri = sentinelGetMasterByName(argv[1]);
if (!ri) return "No such master with specified name.";
ri->parallel_syncs = atoi(argv[2]);
+ } else if (!strcasecmp(argv[0],"notification-script") && argc == 3) {
+ /* notification-script <name> <path> */
+ ri = sentinelGetMasterByName(argv[1]);
+ if (!ri) return "No such master with specified name.";
+ if (access(argv[2],X_OK) == -1)
+ return "Notification script seems non existing or non executable.";
+ ri->notification_script = sdsnew(argv[2]);
+ } else if (!strcasecmp(argv[0],"client-reconfig-script") && argc == 3) {
+ /* client-reconfig-script <name> <path> */
+ ri = sentinelGetMasterByName(argv[1]);
+ if (!ri) return "No such master with specified name.";
+ if (access(argv[2],X_OK) == -1)
+ return "Client reconfiguration script seems non existing or "
+ "non executable.";
+ ri->client_reconfig_script = sdsnew(argv[2]);
} else {
return "Unrecognized sentinel configuration statement.";
}
/* ====================== hiredis connection handling ======================= */
+/* Tear down the hiredis link 'c' of instance 'ri'.
+ *
+ * The instance forgets the context (the command link also resets the count
+ * of pending commands), the context forgets the instance (c->data is set
+ * to NULL so that code reading it finds no instance), the instance is
+ * flagged as SRI_DISCONNECTED, and finally the context is freed. */
+void sentinelKillLink(sentinelRedisInstance *ri, redisAsyncContext *c) {
+    int is_cmd_link = (ri->cc == c);
+    int is_pubsub_link = (ri->pc == c);
+
+    if (is_cmd_link) {
+        ri->pending_commands = 0;
+        ri->cc = NULL;
+    }
+    if (is_pubsub_link) ri->pc = NULL;
+    ri->flags |= SRI_DISCONNECTED;
+    c->data = NULL;
+    redisAsyncFree(c);
+}
+
+
/* This function takes an hiredis context that is in an error condition
* and make sure to mark the instance as disconnected performing the
* cleanup needed.
* for async conenctions. */
void sentinelDisconnectInstanceFromContext(const redisAsyncContext *c) {
sentinelRedisInstance *ri = c->data;
- int pubsub = (ri->pc == c);
+ int pubsub;
+ if (ri == NULL) return; /* The instance no longer exists. */
+
+ pubsub = (ri->pc == c);
sentinelEvent(REDIS_DEBUG, pubsub ? "-pubsub-link" : "-cmd-link", ri,
"%@ #%s", c->errstr);
if (pubsub)
if (ri->cc->err) {
sentinelEvent(REDIS_DEBUG,"-cmd-link-reconnection",ri,"%@ #%s",
ri->cc->errstr);
- redisAsyncFree(ri->cc);
- ri->cc = NULL;
+ sentinelKillLink(ri,ri->cc);
} else {
ri->cc_conn_time = mstime();
ri->cc->data = ri;
if (ri->pc->err) {
sentinelEvent(REDIS_DEBUG,"-pubsub-link-reconnection",ri,"%@ #%s",
ri->pc->errstr);
- redisAsyncFree(ri->pc);
- ri->pc = NULL;
+ sentinelKillLink(ri,ri->pc);
} else {
int retval;
if (retval != REDIS_OK) {
/* If we can't subscribe, the Pub/Sub connection is useless
* and we can simply disconnect it and try again. */
- redisAsyncFree(ri->pc);
- ri->pc = NULL;
+ sentinelKillLink(ri,ri->pc);
return;
}
}
sds *lines;
int numlines, j;
int role = 0;
-
+ int runid_changed = 0; /* true if runid changed. */
+ int first_runid = 0; /* true if this is the first runid we receive. */
/* The following fields must be reset to a given value in the case they
* are not found at all in the INFO output. */
if (sdslen(l) >= 47 && !memcmp(l,"run_id:",7)) {
if (ri->runid == NULL) {
ri->runid = sdsnewlen(l+7,40);
+ first_runid = 1;
} else {
- /* TODO: check if run_id has changed. This means the
- * instance has been restarted, we want to set a flag
- * and notify this event. */
+ if (strncmp(ri->runid,l+7,40) != 0) {
+ runid_changed = 1;
+ sentinelEvent(REDIS_NOTICE,"+reboot",ri,"%@");
+ sdsfree(ri->runid);
+ ri->runid = sdsnewlen(l+7,40);
+ }
}
}
ri->info_refresh = mstime();
sdsfreesplitres(lines,numlines);
+ /* ---------------------------- Acting half ----------------------------- */
if (sentinel.tilt) return;
+ /* Act if a master turned into a slave. */
+ if ((ri->flags & SRI_MASTER) && role == SRI_SLAVE) {
+ if (first_runid && ri->slave_master_host) {
+ /* If it is the first time we receive INFO from it, but it's
+ * a slave while it was configured as a master, we want to monitor
+ * its master instead. */
+ sentinelEvent(REDIS_WARNING,"+redirect-to-master",ri,
+ "%s %s %d %s %d",
+ ri->name, ri->addr->ip, ri->addr->port,
+ ri->slave_master_host, ri->slave_master_port);
+ sentinelResetMasterAndChangeAddress(ri,ri->slave_master_host,
+ ri->slave_master_port);
+ return;
+ }
+ }
+
/* Act if a slave turned into a master. */
if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) {
- if (ri->flags & SRI_PROMOTED) {
+ if (!(ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
+ (runid_changed || first_runid))
+ {
+ /* If a slave turned into master but:
+ *
+ * 1) Failover not in progress.
+ * 2) RunID has changed, or it's the first time we see an INFO output.
+ *
+ * We assume this is a reboot with a wrong configuration.
+ * Log the event and remove the slave. */
+ int retval;
+
+ sentinelEvent(REDIS_WARNING,"-slave-restart-as-master",ri,"%@ #removing it from the attached slaves");
+ retval = dictDelete(ri->master->slaves,ri->name);
+ redisAssert(retval == REDIS_OK);
+ return;
+ } else if (ri->flags & SRI_PROMOTED) {
/* If this is a promoted slave we can change state to the
* failover state machine. */
if (ri->master &&
sentinelRedisInstance *ri = c->data;
redisReply *r;
- ri->pending_commands--;
- if (!reply) return;
+ if (ri) ri->pending_commands--;
+ if (!reply || !ri) return;
r = reply;
if (r->type == REDIS_REPLY_STRING) {
void sentinelDiscardReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
sentinelRedisInstance *ri = c->data;
- ri->pending_commands--;
+ if (ri) ri->pending_commands--;
}
void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
sentinelRedisInstance *ri = c->data;
redisReply *r;
- ri->pending_commands--;
- if (!reply) return;
+ if (ri) ri->pending_commands--;
+ if (!reply || !ri) return;
r = reply;
if (r->type == REDIS_REPLY_STATUS ||
sentinelRedisInstance *ri = c->data;
redisReply *r;
- ri->pending_commands--;
- if (!reply) return;
+ if (ri) ri->pending_commands--;
+ if (!reply || !ri) return;
r = reply;
/* Only update pub_time if we actually published our message. Otherwise
sentinelRedisInstance *ri = c->data;
redisReply *r;
- if (!reply) return;
+ if (!reply || !ri) return;
r = reply;
/* Update the last activity in the pubsub channel. Note that since we
} else if (!strcasecmp(c->argv[1]->ptr,"reset")) {
/* SENTINEL RESET <pattern> */
if (c->argc != 3) goto numargserr;
- addReplyLongLong(c,sentinelResetMastersByPattern(c->argv[2]->ptr));
+ addReplyLongLong(c,sentinelResetMastersByPattern(c->argv[2]->ptr,SENTINEL_GENERATE_EVENT));
} else if (!strcasecmp(c->argv[1]->ptr,"get-master-addr-by-name")) {
/* SENTINEL GET-MASTER-ADDR-BY-NAME <master-name> */
sentinelRedisInstance *ri;
addReplyBulkCString(c,addr->ip);
addReplyBulkLongLong(c,addr->port);
}
+ } else if (!strcasecmp(c->argv[1]->ptr,"pending-scripts")) {
+ /* SENTINEL PENDING-SCRIPTS */
+
+ if (c->argc != 2) goto numargserr;
+ sentinelPendingScriptsCommand(c);
} else {
addReplyErrorFormat(c,"Unknown sentinel subcommand '%s'",
(char*)c->argv[1]->ptr);
(mstime() - ri->cc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
(mstime() - ri->last_pong_time) > (ri->down_after_period/2))
{
- redisAsyncFree(ri->cc); /* will call the disconnection callback */
+ sentinelKillLink(ri,ri->cc);
}
/* 2) Check if the pubsub link seems connected, was connected not less
(mstime() - ri->pc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
(mstime() - ri->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3))
{
- redisAsyncFree(ri->pc); /* will call the disconnection callback */
+ sentinelKillLink(ri,ri->pc);
}
/* Update the subjectively down flag. */
sentinelRedisInstance *ri = c->data;
redisReply *r;
- ri->pending_commands--;
- if (!reply) return;
+ if (ri) ri->pending_commands--;
+ if (!reply || !ri) return;
r = reply;
/* Ignore every error or unexpected reply.
SRI_RECONF_SENT);
}
} else {
- /* Brand new failover as SRI_FAILOVER_IN_PROGRESS was not set. */
+ /* Brand new failover as SRI_FAILOVER_IN_PROGRESS was not set.
+ *
+ * Do we have a slave to promote? Otherwise don't start a failover
+ * at all. */
+ if (sentinelSelectSlave(master) == NULL) return;
master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;
}
/* ---------------- Failover state machine implementation ------------------- */
void sentinelFailoverWaitStart(sentinelRedisInstance *ri) {
+ /* If we are in "wait start" but the master is no longer in ODOWN nor in
+ * SDOWN condition we abort the failover. This is important as it
+ * prevents a useless failover in a notable case of netsplit, where
+ * the sentinels are split from the redis instances. In this case
+ * the failover will not start while there is the split because no
+ * good slave can be reached. However when the split is resolved, we
+ * can go to waitstart if the slave is back reachable a few milliseconds
+ * before the master is. In that case when the master is back online
+ * we cancel the failover. */
+ if ((ri->flags & (SRI_S_DOWN|SRI_O_DOWN)) == 0) {
+ sentinelEvent(REDIS_WARNING,"-failover-abort-master-is-back",
+ ri,"%@");
+ sentinelAbortFailover(ri);
+ return;
+ }
+
+ /* Start the failover going to the next state if enough time has
+ * elapsed. */
if (mstime() >= ri->failover_start_time) {
ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
ri->failover_state_change_time = mstime();
sentinelRedisInstance *slave = sentinelSelectSlave(ri);
if (slave == NULL) {
- sentinelEvent(REDIS_WARNING,"-no-good-slave",ri,
- "%@ #retrying in %d seconds",
- (SENTINEL_FAILOVER_FIXED_DELAY+
- SENTINEL_FAILOVER_MAX_RANDOM_DELAY)/1000);
- ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;
- ri->failover_start_time = mstime() + SENTINEL_FAILOVER_FIXED_DELAY +
- SENTINEL_FAILOVER_MAX_RANDOM_DELAY;
+ sentinelEvent(REDIS_WARNING,"-failover-abort-no-good-slave",ri,"%@");
+ sentinelAbortFailover(ri);
} else {
sentinelEvent(REDIS_WARNING,"+selected-slave",slave,"%@");
slave->flags |= SRI_PROMOTED;
* and re-add it with the same address to trigger a complete state
* refresh. */
void sentinelFailoverSwitchToPromotedSlave(sentinelRedisInstance *master) {
- sentinelRedisInstance *new, *ref = master->promoted_slave ?
- master->promoted_slave : master;
- int quorum = ref->quorum, parallel_syncs = ref->parallel_syncs;
- char *name = sdsnew(master->name);
- char *ip = sdsnew(ref->addr->ip), *oldip = sdsnew(master->addr->ip);
- int port = ref->addr->port, oldport = master->addr->port;
- int retval, oldflags = master->flags;
- mstime_t old_down_after_period = master->down_after_period;
- mstime_t old_failover_timeout = master->failover_timeout;
-
- retval = dictDelete(sentinel.masters,master->name);
- redisAssert(retval == DICT_OK);
- new = createSentinelRedisInstance(name,SRI_MASTER,ip,port,quorum,NULL);
- redisAssert(new != NULL);
- new->parallel_syncs = parallel_syncs;
- new->flags |= (oldflags & SRI_CAN_FAILOVER);
- new->down_after_period = old_down_after_period;
- new->failover_timeout = old_failover_timeout;
- /* TODO: ... set the scripts as well. */
- sentinelEvent(REDIS_WARNING,"+switch-master",new,"%s %s %d %s %d",
- name, oldip, oldport, ip, port);
- sdsfree(name);
- sdsfree(ip);
- sdsfree(oldip);
+ sentinelRedisInstance *ref = master->promoted_slave ?
+ master->promoted_slave : master;
+
+ sentinelEvent(REDIS_WARNING,"+switch-master",master,"%s %s %d %s %d",
+ master->name, master->addr->ip, master->addr->port,
+ ref->addr->ip, ref->addr->port);
+
+ sentinelResetMasterAndChangeAddress(master,ref->addr->ip,ref->addr->port);
}
void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
}
}
-/* The following is called only for master instances and will abort the
- * failover process if:
- *
- * 1) The failover is in progress.
- * 2) We already promoted a slave.
- * 3) The promoted slave is in extended SDOWN condition.
+/* Abort a failover in progress with the following steps:
+ * 1) If this instance is the leader send a SLAVEOF command to all the already
+ * reconfigured slaves if any to configure them to replicate with the
+ * original master.
+ * 2) For both leaders and observers: clear the failover flags and state in
+ * the master instance.
+ * 3) If there is already a promoted slave and we are the leader, and this
+ * slave is not DISCONNECTED, try to reconfigure it to replicate
+ * back to the master as well, sending a best effort SLAVEOF command.
*/
-void sentinelAbortFailoverIfNeeded(sentinelRedisInstance *ri) {
+void sentinelAbortFailover(sentinelRedisInstance *ri) {
+ char master_port[32];
dictIterator *di;
dictEntry *de;
- /* Failover is in progress? Do we have a promoted slave? */
- if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS) || !ri->promoted_slave) return;
-
- /* Is the promoted slave into an extended SDOWN state? */
- if (!(ri->promoted_slave->flags & SRI_S_DOWN) ||
- (mstime() - ri->promoted_slave->s_down_since_time) <
- (ri->down_after_period * SENTINEL_EXTENDED_SDOWN_MULTIPLIER)) return;
-
- sentinelEvent(REDIS_WARNING,"-failover-abort-x-sdown",ri->promoted_slave,"%@");
+ redisAssert(ri->flags & SRI_FAILOVER_IN_PROGRESS);
+ ll2string(master_port,sizeof(master_port),ri->addr->port);
/* Clear failover related flags from slaves.
* Also if we are the leader make sure to send SLAVEOF commands to all the
* already reconfigured slaves in order to turn them back into slaves of
* the original master. */
-
di = dictGetIterator(ri->slaves);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *slave = dictGetVal(de);
- if (ri->flags & SRI_I_AM_THE_LEADER) {
- char master_port[32];
+ if ((ri->flags & SRI_I_AM_THE_LEADER) &&
+ !(slave->flags & SRI_DISCONNECTED) &&
+ (slave->flags & (SRI_PROMOTED|SRI_RECONF_SENT|SRI_RECONF_INPROG|
+ SRI_RECONF_DONE)))
+ {
int retval;
- ll2string(master_port,sizeof(master_port),ri->addr->port);
retval = redisAsyncCommand(slave->cc,
sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
ri->addr->ip,
ri->flags &= ~(SRI_FAILOVER_IN_PROGRESS|SRI_I_AM_THE_LEADER);
ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
ri->failover_state_change_time = mstime();
- ri->promoted_slave->flags &= ~SRI_PROMOTED;
- ri->promoted_slave = NULL;
+ if (ri->promoted_slave) {
+ ri->promoted_slave->flags &= ~SRI_PROMOTED;
+ ri->promoted_slave = NULL;
+ }
+}
+
+/* Called only for master instances: abort the failover, logging a
+ * "-failover-abort-x-sdown" event, when all the following are true:
+ *
+ * 1) The failover is in progress.
+ * 2) We already promoted a slave.
+ * 3) The promoted slave is in extended SDOWN condition, that is, it is
+ *    flagged SRI_S_DOWN since at least down_after_period multiplied by
+ *    SENTINEL_EXTENDED_SDOWN_MULTIPLIER milliseconds.
+ *
+ * Otherwise the function does nothing. */
+void sentinelAbortFailoverIfNeeded(sentinelRedisInstance *ri) {
+    sentinelRedisInstance *promoted = ri->promoted_slave;
+
+    /* Nothing to abort without a failover and a promoted slave. */
+    if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;
+    if (promoted == NULL) return;
+
+    /* The promoted slave must be down, and for long enough. */
+    if (!(promoted->flags & SRI_S_DOWN)) return;
+    if ((mstime() - promoted->s_down_since_time) <
+        (ri->down_after_period * SENTINEL_EXTENDED_SDOWN_MULTIPLIER)) return;
+
+    sentinelEvent(REDIS_WARNING,"-failover-abort-x-sdown",promoted,"%@");
+    sentinelAbortFailover(ri);
}
/* ======================== SENTINEL timer handler ==========================
void sentinelTimer(void) {
sentinelCheckTiltCondition();
sentinelHandleDictOfRedisInstances(sentinel.masters);
+ /* User scripts housekeeping, in this order: start the queued jobs that
+ * can run, collect the children that terminated, kill the scripts that
+ * are running for too long. */
+ sentinelRunPendingScripts();
+ sentinelCollectTerminatedScripts();
+ sentinelKillTimedoutScripts();
}