*.o
*.rdb
*.log
-redis-cli
-redis-server
-redis-benchmark
-redis-check-dump
-redis-check-aof
+redis-*
doc-tools
release
misc/*
release.h
src/transfer.sh
src/configs
-src/redis-server.dSYM
redis.ds
src/redis.conf
deps/lua/src/lua
deps/lua/src/liblua.a
.make-*
.prerequisites
+*.dSYM
LOW: No need to upgrade unless there are new features you want to use.
MODERATE: Schedule an upgrade of the server, but it's not urgent.
-HIGH: There is a critical bug that may affect some part of users. Upgrade!
+HIGH: There is a critical bug that may affect a subset of users. Upgrade!
CRITICAL: There is a critical bug affecting MOST USERS. Upgrade ASAP.
--------------------------------------------------------------------------------
+---[ Redis 2.5.13 (2.6 Release Candidate 7) ]
+
+UPGRADE URGENCY: HIGH.
+
+* [BUGFIX] Theoretical bug in ziplist fixed.
+* [BUGFIX] Better out-of-memory handling (an error is now written to the log file).
+* [BUGFIX] Incrementally flush RDB file on slave side while performing the
+ first synchronization with the master. This makes Redis less
+ blocking in environments where disk I/O is slow.
+* [BUGFIX] Don't crash with Lua's redis.call() without arguments.
+* [BUGFIX] Don't crash because of a failed assertion after a large number
+ of Lua calls on 32-bit systems.
+* [BUGFIX] Fix SORT behaviour when called from scripting.
+* [BUGFIX] Adjust slave PING period according to the REDIS_HZ define.
+* [BUGFIX] BITCOUNT: fix crash on overflowing arguments.
+* [BUGFIX] Return an error when SELECT argument is not an integer.
+* [BUGFIX] Blocking operations on lists were completely reimplemented for
+ correctness. Now blocking list ops and pushes originating from
+ Lua scripts work well together, and are replicated and
+ written to the AOF correctly.
+* [IMPROVED] Send an async PING before starting replication to avoid blocking
+ if the master accepts the connection but is not actually able to reply.
+* [IMPROVED] Support slave-priority for Redis Sentinel.
+* [IMPROVED] Hiredis library updated.
+
+---[ Redis 2.5.12 (2.6 Release Candidate 6) ]
+
+UPGRADE URGENCY: MODERATE.
+
+* [BUGFIX] Fixed a timing attack on AUTH (Issue #560).
+* [BUGFIX] Don't assume that "char" is signed.
+* [BUGFIX] Check that we have a connection before enabling pipe mode.
+* [BUGFIX] Use the optimized version of the function to convert a double to
+ its string representation. It was not being compiled because of
+ a typo in the #if statement.
+* [IMPROVED] Introduced the internal REPLCONF command: INFO now shows slaves
+ with correct port numbers. This makes 2.5.12 compatible with Redis Sentinel.
+* [IMPROVED] Truncate short writes from the AOF for a cleaner restart. On short
+ writes (for instance when the disk is out of space) Redis will now try
+ to remove the half-written data so that the next restart will work
+ without the need for the "redis-check-aof" utility.
+* [IMPROVED] New in INFO: aof_last_bgrewrite_status
+* [IMPROVED] Allow Pub/Sub in contexts where other commands are blocked.
+* [BUGFIX] Mark the fd as writable when EPOLLERR or EPOLLHUP is returned by
+ epoll_wait.
+
+---[ Redis 2.5.11 (2.6 Release Candidate 5) ]
+
+UPGRADE URGENCY: HIGH.
+
+* [BUGFIX] Fixed Hash corruption when loading an RDB file generated by
+ previous versions of Redis that encoded hashes using
+ a different ziplist encoding format for small integers.
+ Fields that are integers in the range 0-255 may not be
+ recognized, or may be duplicated on updates, causing a crash
+ when the ziplist is converted to a real hash. (Issue #547).
+* [BUGFIX] Fixed the count of memory used by output buffers in the
+ setDeferredMultiBulkLength() function.
+
---[ Redis 2.5.10 (2.6 Release Candidate 4) ]
UPGRADE URGENCY: HIGH.
that you should be aware of:
* You can't use .rdb and AOF files generated with 2.6 with a 2.4 instance.
-* 2.4 slaves can be attached to 2.6 masters, but not the contrary, and only
+* 2.6 slaves can be attached to 2.4 masters, but not the other way around, and only
for the time needed to perform the version upgrade.
There are also a few API differences that are unlikely to cause problems,
One or more spaces separate arguments, so you can use the specifiers
anywhere in an argument:
- reply = redisCommand("SET key:%s %s", myid, value);
+ reply = redisCommand(context, "SET key:%s %s", myid, value);
### Using replies
int redisReaderFeed(redisReader *reader, const char *buf, size_t len);
int redisReaderGetReply(redisReader *reader, void **reply);
+The same set of functions is used internally by hiredis when creating a
+normal Redis context; the above API just exposes it to the user for direct
+usage.
+
### Usage
The function `redisReaderCreate` creates a `redisReader` structure that holds a
For example, [hiredis-rb](https://github.com/pietern/hiredis-rb/blob/master/ext/hiredis_ext/reader.c)
uses customized reply object functions to create Ruby objects.
+### Reader max buffer
+
+Both when using the Reader API directly or when using it indirectly via a
+normal Redis context, the redisReader structure uses a buffer in order to
+accumulate data from the server.
+Usually this buffer is destroyed when it is empty and is larger than 16
+KB, in order to avoid wasting memory in unused buffers.
+
+However, when working with very big payloads, destroying the buffer may slow
+down performance considerably, so it is possible to modify the max size of
+an idle buffer by changing the value of the `maxbuf` field of the reader structure
+to the desired value. The special value of 0 means that there is no maximum
+value for an idle buffer, so the buffer will never get freed.
+
+For instance, if you have a normal Redis context you can set the maximum idle
+buffer to zero (unlimited) just with:
+
+ context->reader->maxbuf = 0;
+
+This should be done only in order to maximize performance when working with
+large payloads. The `maxbuf` field should be set back to `REDIS_READER_MAX_BUF`
+as soon as possible in order to prevent allocation of useless memory.
+
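A minimal, self-contained model of the rule above, for illustration only: `reader_model` and `reader_maybe_release_idle_buffer()` are hypothetical names, not hiredis API, and a plain `malloc()` block stands in for the sds buffer. It covers the three cases: an idle oversized buffer is released, `maxbuf == 0` keeps it forever, and a buffer still holding unparsed data is never released.

```c
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical stand-in for REDIS_READER_MAX_BUF (16 KB default). */
#define MODEL_READER_MAX_BUF (1024*16)

/* Hypothetical, simplified reader: a heap buffer plus bookkeeping. */
typedef struct {
    char *buf;      /* accumulated data */
    size_t len;     /* bytes of unparsed data still in buf */
    size_t cap;     /* allocated capacity of buf */
    size_t maxbuf;  /* max idle capacity to keep; 0 = never free */
} reader_model;

/* Models the check done before appending new data: when nothing is left
 * to parse and the idle capacity exceeds maxbuf, drop the big buffer.
 * Returns 1 if the buffer was released, 0 otherwise. */
int reader_maybe_release_idle_buffer(reader_model *r) {
    if (r->len == 0 && r->maxbuf != 0 && r->cap > r->maxbuf) {
        free(r->buf);
        r->buf = NULL;
        r->cap = 0;
        return 1;
    }
    return 0;
}
```

In this model, setting `maxbuf` to 0 trades memory for fewer reallocations, which is why the default should be restored once the large payloads are done.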
## AUTHORS
Hiredis was written by Salvatore Sanfilippo (antirez at gmail) and
__redisAsyncDisconnect(ac);
return;
}
+
+ /* In monitor mode, re-push the callback */
+ if(c->flags & REDIS_MONITORING) {
+ __redisPushCallback(&ac->replies,&cb);
+ }
/* When the connection is not being disconnected, simply stop
* trying to get replies and wait for the next loop tick. */
/* Even if the context is subscribed, pending regular callbacks will
* get a reply before pub/sub messages arrive. */
if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) {
- /* A spontaneous reply in a not-subscribed context can only be the
- * error reply that is sent when a new connection exceeds the
- * maximum number of allowed connections on the server side. This
- * is seen as an error instead of a regular reply because the
- * server closes the connection after sending it. To prevent the
- * error from being overwritten by an EOF error the connection is
- * closed here. See issue #43. */
- if ( !(c->flags & REDIS_SUBSCRIBED) && ((redisReply*)reply)->type == REDIS_REPLY_ERROR ) {
+ /*
+ * A spontaneous reply in a not-subscribed context can be the error
+ * reply that is sent when a new connection exceeds the maximum
+ * number of allowed connections on the server side.
+ *
+ * This is seen as an error instead of a regular reply because the
+ * server closes the connection after sending it.
+ *
+ * To prevent the error from being overwritten by an EOF error the
+ * connection is closed here. See issue #43.
+ *
+ * Another possibility is that the server is loading its dataset.
+ * In this case we also want to close the connection, and have the
+ * user wait until the server is ready to take our request.
+ */
+ if (((redisReply*)reply)->type == REDIS_REPLY_ERROR) {
c->err = REDIS_ERR_OTHER;
snprintf(c->errstr,sizeof(c->errstr),"%s",((redisReply*)reply)->str);
__redisAsyncDisconnect(ac);
return;
}
- /* No more regular callbacks and no errors, the context *must* be subscribed. */
- assert(c->flags & REDIS_SUBSCRIBED);
- __redisGetSubscribeCallback(ac,reply,&cb);
+ /* No more regular callbacks and no errors, the context *must* be subscribed or monitoring. */
+ assert((c->flags & REDIS_SUBSCRIBED || c->flags & REDIS_MONITORING));
+ if(c->flags & REDIS_SUBSCRIBED)
+ __redisGetSubscribeCallback(ac,reply,&cb);
}
if (cb.fn != NULL) {
/* (P)UNSUBSCRIBE does not have its own response: every channel or
* pattern that is unsubscribed will receive a message. This means we
* should not append a callback function for this command. */
+ } else if(strncasecmp(cstr,"monitor\r\n",9) == 0) {
+ /* Set monitor flag and push callback */
+ c->flags |= REDIS_MONITORING;
+ __redisPushCallback(&ac->replies,&cb);
} else {
if (c->flags & REDIS_SUBSCRIBED)
/* This will likely result in an error reply, but it needs to be
long elements;
int root = 0;
- /* Set error for nested multi bulks with depth > 2 */
+ /* Set error for nested multi bulks with depth > 7 */
if (r->ridx == 8) {
__redisReaderSetError(r,REDIS_ERR_PROTOCOL,
"No support for nested multi bulk replies with depth > 7");
r->errstr[0] = '\0';
r->fn = &defaultFunctions;
r->buf = sdsempty();
+ r->maxbuf = REDIS_READER_MAX_BUF;
if (r->buf == NULL) {
free(r);
return NULL;
/* Copy the provided buffer. */
if (buf != NULL && len >= 1) {
-#if 0
/* Destroy internal buffer when it is empty and is quite large. */
- if (r->len == 0 && sdsavail(r->buf) > 16*1024) {
+ if (r->len == 0 && r->maxbuf != 0 && sdsavail(r->buf) > r->maxbuf) {
sdsfree(r->buf);
r->buf = sdsempty();
r->pos = 0;
/* r->buf should not be NULL since we just free'd a larger one. */
assert(r->buf != NULL);
}
-#endif
newbuf = sdscatlen(r->buf,buf,len);
if (newbuf == NULL) {
/* Flag that is set when the async context has one or more subscriptions. */
#define REDIS_SUBSCRIBED 0x20
+/* Flag that is set when monitor mode is active */
+#define REDIS_MONITORING 0x40
+
#define REDIS_REPLY_STRING 1
#define REDIS_REPLY_ARRAY 2
#define REDIS_REPLY_INTEGER 3
#define REDIS_REPLY_STATUS 5
#define REDIS_REPLY_ERROR 6
+#define REDIS_READER_MAX_BUF (1024*16) /* Default max unused reader buffer. */
+
#ifdef __cplusplus
extern "C" {
#endif
char *buf; /* Read buffer */
size_t pos; /* Buffer cursor */
size_t len; /* Buffer length */
+ size_t maxbuf; /* Max length of unused buffer */
redisReadTask rstack[9];
int ridx; /* Index of current read task */
#include <errno.h>
#include <stdarg.h>
#include <stdio.h>
+#include <poll.h>
+#include <limits.h>
#include "net.h"
#include "sds.h"
return REDIS_OK;
}
+#define __MAX_MSEC (((LONG_MAX) - 999) / 1000)
+
static int redisContextWaitReady(redisContext *c, int fd, const struct timeval *timeout) {
- struct timeval to;
- struct timeval *toptr = NULL;
- fd_set wfd;
+ struct pollfd wfd[1];
+ long msec;
+
+ msec = -1;
+ wfd[0].fd = fd;
+ wfd[0].events = POLLOUT;
/* Only use timeout when not NULL. */
if (timeout != NULL) {
- to = *timeout;
- toptr = &to;
+ if (timeout->tv_usec > 1000000 || timeout->tv_sec > __MAX_MSEC) {
+ close(fd);
+ return REDIS_ERR;
+ }
+
+ msec = (timeout->tv_sec * 1000) + ((timeout->tv_usec + 999) / 1000);
+
+ if (msec < 0 || msec > INT_MAX) {
+ msec = INT_MAX;
+ }
}
if (errno == EINPROGRESS) {
- FD_ZERO(&wfd);
- FD_SET(fd, &wfd);
+ int res;
- if (select(FD_SETSIZE, NULL, &wfd, NULL, toptr) == -1) {
- __redisSetErrorFromErrno(c,REDIS_ERR_IO,"select(2)");
+ if ((res = poll(wfd, 1, msec)) == -1) {
+ __redisSetErrorFromErrno(c, REDIS_ERR_IO, "poll(2)");
close(fd);
return REDIS_ERR;
- }
-
- if (!FD_ISSET(fd, &wfd)) {
+ } else if (res == 0) {
errno = ETIMEDOUT;
__redisSetErrorFromErrno(c,REDIS_ERR_IO,NULL);
close(fd);
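The conversion from `struct timeval` to the millisecond argument of `poll(2)` can be tried in isolation. This sketch follows the same steps as the patch (overflow guard, rounding microseconds up, clamping to `INT_MAX`), but `timeval_to_poll_msec()` is a hypothetical helper, its `-2` error value is an assumption of the sketch, and it is slightly stricter than the patch: it also rejects negative fields and `tv_usec == 1000000`.

```c
#include <limits.h>
#include <stddef.h>
#include <sys/time.h>

/* Same overflow guard as __MAX_MSEC in the patch. */
#define SKETCH_MAX_MSEC (((LONG_MAX) - 999) / 1000)

/* Convert a connect timeout into a poll() timeout in milliseconds.
 * NULL means "wait forever", which poll() spells as -1. Microseconds are
 * rounded up so a non-zero sub-millisecond timeout does not become 0
 * (which would make poll() return immediately). Returns -2 on invalid
 * input (hypothetical convention; the patch closes the fd instead). */
long timeval_to_poll_msec(const struct timeval *timeout) {
    long msec;

    if (timeout == NULL) return -1;
    if (timeout->tv_usec < 0 || timeout->tv_usec >= 1000000 ||
        timeout->tv_sec < 0 || timeout->tv_sec > SKETCH_MAX_MSEC)
        return -2;

    msec = (timeout->tv_sec * 1000) + ((timeout->tv_usec + 999) / 1000);

    /* poll() takes an int timeout, so clamp anything larger. */
    if (msec > INT_MAX) msec = INT_MAX;
    return msec;
}
```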
#
# repl-timeout 60
+# The slave priority is an integer number published by Redis in the INFO output.
+# It is used by Redis Sentinel in order to select a slave to promote into a
+# master if the master is no longer working correctly.
+#
+# A slave with a low priority number is considered better for promotion, so
+# for instance, if there are three slaves with priority 10, 100, 25, Sentinel will
+# pick the one with priority 10, that is the lowest.
+#
+# However a special priority of 0 marks the slave as not able to perform the
+# role of master, so a slave with priority of 0 will never be selected by
+# Redis Sentinel for promotion.
+#
+# By default the priority is 100.
+slave-priority 100
+
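The promotion rule described in the comment above (lowest priority wins, 0 means never promote) can be sketched in C as follows. `slave_info` and `pick_slave_for_promotion()` are hypothetical; Sentinel's real selection may weigh other factors as well, which this sketch ignores.

```c
#include <stddef.h>

/* Hypothetical, simplified view of a slave as seen by Sentinel. */
typedef struct {
    const char *name;
    int priority;   /* slave-priority from INFO; 0 = never promote */
} slave_info;

/* Return the index of the slave with the lowest non-zero priority,
 * or -1 if no slave is eligible. Ties keep the first slave found. */
int pick_slave_for_promotion(const slave_info *slaves, size_t n) {
    int best = -1;

    for (size_t i = 0; i < n; i++) {
        if (slaves[i].priority == 0) continue;  /* not promotable */
        if (best == -1 || slaves[i].priority < slaves[best].priority)
            best = (int)i;
    }
    return best;
}
```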
################################## SECURITY ###################################
# Require clients to issue AUTH <PASSWORD> before processing any other
--- /dev/null
+# Example sentinel.conf
+
+# port <sentinel-port>
+# The port that this sentinel instance will run on
+port 26379
+
+# sentinel monitor <master-name> <ip> <redis-port> <quorum>
+#
+# Tells Sentinel to monitor this master, and to consider it in O_DOWN
+# (Objectively Down) state only if at least <quorum> sentinels agree.
+#
+# Note: master name should not include special characters or spaces.
+# The valid charset is A-Z a-z 0-9 and the three characters ".-_".
+sentinel monitor mymaster 127.0.0.1 6379 2
+
+# sentinel auth-pass <master-name> <password>
+#
+# Set the password to use to authenticate with the master and slaves.
+# Useful if there is a password set in the Redis instances to monitor.
+#
+# Note that the master password is also used for slaves, so it is not
+# possible to set a different password in master and slave instances
+# if you want to be able to monitor these instances with Sentinel.
+#
+# However you can have Redis instances without the authentication enabled
+# mixed with Redis instances requiring the authentication (as long as the
+# password set is the same for all the instances requiring the password) as
+# the AUTH command will have no effect in Redis instances with authentication
+# switched off.
+#
+# Example:
+#
+# sentinel auth-pass mymaster MySUPER--secret-0123passw0rd
+
+# sentinel down-after-milliseconds <master-name> <milliseconds>
+#
+# Number of milliseconds the master (or any attached slave or sentinel) should
+# be unreachable (that is, not replying acceptably to PING, continuously, for the
+# specified period) in order to consider it in S_DOWN state (Subjectively
+# Down).
+#
+# Default is 30 seconds.
+sentinel down-after-milliseconds mymaster 30000
+
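As a rough illustration of the two states mentioned above, assuming millisecond timestamps, the S_DOWN and O_DOWN checks reduce to the comparisons below. Both helpers are hypothetical and leave out everything else Sentinel tracks; in particular, whether the local sentinel counts itself toward the quorum is an assumption here (the caller includes it in `agreeing`).

```c
#include <stdint.h>

/* Hypothetical: an instance is S_DOWN when no acceptable PING reply has
 * been received for strictly longer than down_after_ms. */
int sketch_is_sdown(int64_t now_ms, int64_t last_ok_reply_ms,
                    int64_t down_after_ms) {
    return (now_ms - last_ok_reply_ms) > down_after_ms;
}

/* Hypothetical: a master is O_DOWN when at least `quorum` sentinels
 * (as counted by the caller) agree that it is down. */
int sketch_is_odown(int agreeing, int quorum) {
    return agreeing >= quorum;
}
```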
+# sentinel can-failover <master-name> <yes|no>
+#
+# Specify if this Sentinel can start the failover for this master.
+sentinel can-failover mymaster yes
+
+# sentinel parallel-syncs <master-name> <numslaves>
+#
+# How many slaves we can reconfigure to point to the new master simultaneously
+# during the failover. Use a low number if you use the slaves to serve queries,
+# to avoid having all the slaves unreachable at about the same
+# time while performing the synchronization with the master.
+sentinel parallel-syncs mymaster 1
+
+# sentinel failover-timeout <master-name> <milliseconds>
+#
+# Specifies the failover timeout in milliseconds. When this time has elapsed
+# without any progress in the failover process, it is considered concluded by
+# the sentinel even if not all the attached slaves were correctly configured
+# to replicate with the new master (however a "best effort" SLAVEOF command
+# is sent to all the slaves before).
+#
+# Also when 25% of this time has elapsed without any advancement, and there
+# is a leader switch (the sentinel did not start the failover but is now
+# elected as leader), the sentinel will continue the failover doing a
+# "takeover".
+#
+# Default is 15 minutes.
+sentinel failover-timeout mymaster 900000
+
+# SCRIPTS EXECUTION
+#
+# sentinel notification-script and sentinel reconfig-script are used in order
+# to configure scripts that are called to notify the system administrator
+# or to reconfigure clients after a failover. The scripts are executed
+# with the following rules for error handling:
+#
+# If the script exits with "1" the execution is retried later (up to a maximum
+# number of times currently set to 10).
+#
+# If the script exits with "2" (or a higher value) the script execution is
+# not retried.
+#
+# If the script terminates because it receives a signal the behavior is the same
+# as exit code 1.
+#
+# A script has a maximum running time of 60 seconds. After this limit is
+# reached the script is terminated with a SIGKILL and the execution retried.
+
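The error-handling rules above map naturally onto the `waitpid()` status macros. The sketch below is a hypothetical classifier, not Sentinel's code: treating exit code 0 as success is an assumption (the comment only describes exit code 1, exit codes 2 and up, and signals), and `SKETCH_SCRIPT_MAX_RETRY` simply mirrors the limit of 10 stated above.

```c
#include <signal.h>     /* raise(), handy for testing the signal case */
#include <sys/types.h>  /* pid_t */
#include <sys/wait.h>   /* waitpid(), WIFEXITED() and friends */
#include <unistd.h>     /* fork(), _exit() */

/* Mirrors "currently set to 10" in the comment above. */
#define SKETCH_SCRIPT_MAX_RETRY 10

enum script_outcome { SCRIPT_OK, SCRIPT_RETRY, SCRIPT_GIVE_UP };

/* Classify a waitpid() status. Assumption: exit code 0 means success.
 * Exit code 1, or termination by a signal (including the SIGKILL sent
 * after the 60 second limit), means "retry later" while fewer than
 * SKETCH_SCRIPT_MAX_RETRY retries were made. Exit code >= 2 means
 * "do not retry". */
enum script_outcome classify_script_status(int status, int retries_so_far) {
    int retry;

    if (WIFEXITED(status)) {
        int code = WEXITSTATUS(status);
        if (code == 0) return SCRIPT_OK;
        retry = (code == 1);
    } else if (WIFSIGNALED(status)) {
        retry = 1;
    } else {
        return SCRIPT_GIVE_UP; /* e.g. stopped: not covered by the rules */
    }
    if (retry && retries_so_far < SKETCH_SCRIPT_MAX_RETRY) return SCRIPT_RETRY;
    return SCRIPT_GIVE_UP;
}
```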
+# NOTIFICATION SCRIPT
+#
+# sentinel notification-script <master-name> <script-path>
+#
+# Call the specified notification script for any sentinel event that is
+# generated in the WARNING level (for instance -sdown, -odown, and so forth).
+# This script should notify the system administrator via email, SMS, or any
+# other messaging system, that there is something wrong with the monitored
+# Redis systems.
+#
+# The script is called with just two arguments: the first is the event type
+# and the second the event description.
+#
+# The script must exist and be executable in order for sentinel to start if
+# this option is provided.
+#
+# Example:
+#
+# sentinel notification-script mymaster /var/redis/notify.sh
+
+# CLIENTS RECONFIGURATION SCRIPT
+#
+# sentinel client-reconfig-script <master-name> <script-path>
+#
+# When the failover starts, ends, or is aborted, a script can be called in
+# order to perform application-specific tasks to notify the clients that the
+# configuration has changed and the master is at a different address.
+#
+# The script is called in the following cases:
+#
+# Failover started (a slave is already promoted)
+# Failover finished (all the additional slaves already reconfigured)
+# Failover aborted (in that case the script was previously called when the
+# failover started, and now gets called again with swapped
+# addresses).
+#
+# The following arguments are passed to the script:
+#
+# <master-name> <role> <state> <from-ip> <from-port> <to-ip> <to-port>
+#
+# <state> is "start", "end" or "abort"
+# <role> is either "leader" or "observer"
+#
+# The arguments from-ip, from-port, to-ip, to-port are used to communicate
+# the old address of the master and the new address of the elected slave
+# (now a master) when the state is "start" or "end".
+#
+# For abort, instead, the "from" is the address of the promoted slave and
+# "to" is the address of the original master, since the failover
+# was aborted.
+#
+# This script should be resistant to multiple invocations.
+#
+# Example:
+#
+# sentinel client-reconfig-script mymaster /var/redis/reconfig.sh
+
+
REDIS_CC=$(QUIET_CC)$(CC) $(FINAL_CFLAGS)
REDIS_LD=$(QUIET_LINK)$(CC) $(FINAL_LDFLAGS)
+REDIS_INSTALL=$(QUIET_INSTALL)$(INSTALL)
PREFIX?=/usr/local
INSTALL_BIN= $(PREFIX)/bin
ifndef V
QUIET_CC = @printf ' %b %b\n' $(CCCOLOR)CC$(ENDCOLOR) $(SRCCOLOR)$@$(ENDCOLOR) 1>&2;
QUIET_LINK = @printf ' %b %b\n' $(LINKCOLOR)LINK$(ENDCOLOR) $(BINCOLOR)$@$(ENDCOLOR) 1>&2;
+QUIET_INSTALL = @printf ' %b %b\n' $(LINKCOLOR)INSTALL$(ENDCOLOR) $(BINCOLOR)$@$(ENDCOLOR) 1>&2;
endif
REDIS_SERVER_NAME= redis-server
-REDIS_SERVER_OBJ= adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o migrate.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o
+REDIS_SENTINEL_NAME= redis-sentinel
+REDIS_SERVER_OBJ= adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o migrate.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o
REDIS_CLI_NAME= redis-cli
REDIS_CLI_OBJ= anet.o sds.o adlist.o redis-cli.o zmalloc.o release.o anet.o ae.o
REDIS_BENCHMARK_NAME= redis-benchmark
REDIS_CHECK_AOF_NAME= redis-check-aof
REDIS_CHECK_AOF_OBJ= redis-check-aof.o
-all: $(REDIS_SERVER_NAME) $(REDIS_CLI_NAME) $(REDIS_BENCHMARK_NAME) $(REDIS_CHECK_DUMP_NAME) $(REDIS_CHECK_AOF_NAME)
+all: $(REDIS_SERVER_NAME) $(REDIS_SENTINEL_NAME) $(REDIS_CLI_NAME) $(REDIS_BENCHMARK_NAME) $(REDIS_CHECK_DUMP_NAME) $(REDIS_CHECK_AOF_NAME)
@echo ""
@echo "Hint: To run 'make test' is a good idea ;)"
@echo ""
# redis-server
$(REDIS_SERVER_NAME): $(REDIS_SERVER_OBJ)
- $(REDIS_LD) -o $@ $^ ../deps/lua/src/liblua.a $(FINAL_LIBS)
+ $(REDIS_LD) -o $@ $^ ../deps/hiredis/libhiredis.a ../deps/lua/src/liblua.a $(FINAL_LIBS)
+
+# redis-sentinel
+$(REDIS_SENTINEL_NAME): $(REDIS_SERVER_NAME)
+ $(REDIS_INSTALL) $(REDIS_SERVER_NAME) $(REDIS_SENTINEL_NAME)
# redis-cli
$(REDIS_CLI_NAME): $(REDIS_CLI_OBJ)
$(REDIS_CC) -c $<
clean:
- rm -rf $(REDIS_SERVER_NAME) $(REDIS_CLI_NAME) $(REDIS_BENCHMARK_NAME) $(REDIS_CHECK_DUMP_NAME) $(REDIS_CHECK_AOF_NAME) *.o *.gcda *.gcno *.gcov redis.info lcov-html
+ rm -rf $(REDIS_SERVER_NAME) $(REDIS_SENTINEL_NAME) $(REDIS_CLI_NAME) $(REDIS_BENCHMARK_NAME) $(REDIS_CHECK_DUMP_NAME) $(REDIS_CHECK_AOF_NAME) *.o *.gcda *.gcno *.gcov redis.info lcov-html
.PHONY: clean
install: all
mkdir -p $(INSTALL_BIN)
- $(INSTALL) $(REDIS_SERVER_NAME) $(INSTALL_BIN)
- $(INSTALL) $(REDIS_BENCHMARK_NAME) $(INSTALL_BIN)
- $(INSTALL) $(REDIS_CLI_NAME) $(INSTALL_BIN)
- $(INSTALL) $(REDIS_CHECK_DUMP_NAME) $(INSTALL_BIN)
- $(INSTALL) $(REDIS_CHECK_AOF_NAME) $(INSTALL_BIN)
+ $(REDIS_INSTALL) $(REDIS_SERVER_NAME) $(INSTALL_BIN)
+ $(REDIS_INSTALL) $(REDIS_BENCHMARK_NAME) $(INSTALL_BIN)
+ $(REDIS_INSTALL) $(REDIS_CLI_NAME) $(INSTALL_BIN)
+ $(REDIS_INSTALL) $(REDIS_CHECK_DUMP_NAME) $(INSTALL_BIN)
+ $(REDIS_INSTALL) $(REDIS_CHECK_AOF_NAME) $(INSTALL_BIN)
#include <stdlib.h>
#include <poll.h>
#include <string.h>
+#include <time.h>
#include "ae.h"
#include "zmalloc.h"
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
eventLoop->setsize = setsize;
+ eventLoop->lastTime = time(NULL);
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;
eventLoop->stop = 0;
int processed = 0;
aeTimeEvent *te;
long long maxId;
+ time_t now = time(NULL);
+
+ /* If the system clock is moved to the future, and then set back to the
+ * right value, time events may be delayed in a random way. Often this
+ * means that scheduled operations will not be performed soon enough.
+ *
+ * Here we try to detect system clock skews, and force all the time
+ * events to be processed ASAP when this happens: the idea is that
+ * processing events earlier is less dangerous than delaying them
+ * indefinitely, and practice suggests it is. */
+ if (now < eventLoop->lastTime) {
+ te = eventLoop->timeEventHead;
+ while(te) {
+ te->when_sec = 0;
+ te = te->next;
+ }
+ }
+ eventLoop->lastTime = now;
te = eventLoop->timeEventHead;
maxId = eventLoop->timeEventNextId-1;
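The clock-skew check above can be exercised without a real event loop. In this sketch `sketch_timer`, `sketch_loop`, and `sketch_detect_clock_skew()` are hypothetical, trimmed-down stand-ins for `aeTimeEvent`, `aeEventLoop`, and the code shown above; the current time is passed in as a parameter so the behavior can be tested without touching the system clock.

```c
#include <stddef.h>
#include <time.h>

/* Hypothetical, trimmed-down timer list (the real aeTimeEvent also has
 * when_ms, the callback, and more). */
typedef struct sketch_timer {
    long when_sec;               /* absolute fire time, seconds */
    struct sketch_timer *next;
} sketch_timer;

typedef struct {
    time_t last_time;            /* plays the role of eventLoop->lastTime */
    sketch_timer *head;
} sketch_loop;

/* If the clock went backwards since the previous call, mark every timer
 * as already due, so it runs early instead of being delayed for an
 * unbounded time. Returns 1 when skew was detected, 0 otherwise. */
int sketch_detect_clock_skew(sketch_loop *loop, time_t now) {
    int skewed = 0;

    if (now < loop->last_time) {
        for (sketch_timer *te = loop->head; te; te = te->next)
            te->when_sec = 0;
        skewed = 1;
    }
    loop->last_time = now;
    return skewed;
}
```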
int maxfd; /* highest file descriptor currently registered */
int setsize; /* max number of file descriptors tracked */
long long timeEventNextId;
+ time_t lastTime; /* Used to detect system clock skew */
aeFileEvent *events; /* Registered events */
aeFiredEvent *fired; /* Fired events */
aeTimeEvent *timeEventHead;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
+ if (e->events & EPOLLERR) mask |= AE_WRITABLE;
+ if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
if (port) *port = ntohs(sa.sin_port);
return 0;
}
+
+int anetSockName(int fd, char *ip, int *port) {
+ struct sockaddr_in sa;
+ socklen_t salen = sizeof(sa);
+
+ if (getsockname(fd,(struct sockaddr*)&sa,&salen) == -1) {
+ if (port) *port = 0;
+ if (ip) {
+ ip[0] = '?';
+ ip[1] = '\0';
+ }
+ return -1;
+ }
+ if (ip) strcpy(ip,inet_ntoa(sa.sin_addr));
+ if (port) *port = ntohs(sa.sin_port);
+ return 0;
+}
strerror(errno),
(long)nwritten,
(long)sdslen(server.aof_buf));
+
+ if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
+ redisLog(REDIS_WARNING, "Could not remove short write "
+ "from the append-only file. Redis may refuse "
+ "to load the AOF the next time it starts. "
+ "ftruncate: %s", strerror(errno));
+ }
}
exit(1);
}
server.aof_buf = sdsempty();
}
+ server.aof_lastbgrewrite_status = REDIS_OK;
+
redisLog(REDIS_NOTICE, "Background AOF rewrite finished successfully");
/* Change state from WAIT_REWRITE to ON if needed */
if (server.aof_state == REDIS_AOF_WAIT_REWRITE)
redisLog(REDIS_VERBOSE,
"Background AOF rewrite signal handler took %lldus", ustime()-now);
} else if (!bysignal && exitcode != 0) {
+ server.aof_lastbgrewrite_status = REDIS_ERR;
+
redisLog(REDIS_WARNING,
"Background AOF rewrite terminated with error");
} else {
+ server.aof_lastbgrewrite_status = REDIS_ERR;
+
redisLog(REDIS_WARNING,
"Background AOF rewrite terminated by signal %d", bysignal);
}
o->ptr = sdsgrowzero(o->ptr,byte+1);
/* Get current values */
- byteval = ((char*)o->ptr)[byte];
+ byteval = ((uint8_t*)o->ptr)[byte];
bit = 7 - (bitoffset & 0x7);
bitval = byteval & (1 << bit);
/* Update byte with new bit value and return original value */
byteval &= ~(1 << bit);
byteval |= ((on & 0x1) << bit);
- ((char*)o->ptr)[byte] = byteval;
+ ((uint8_t*)o->ptr)[byte] = byteval;
signalModifiedKey(c->db,c->argv[1]);
server.dirty++;
addReply(c, bitval ? shared.cone : shared.czero);
bitval = llbuf[byte] & (1 << bit);
} else {
if (byte < sdslen(o->ptr))
- bitval = ((char*)o->ptr)[byte] & (1 << bit);
+ bitval = ((uint8_t*)o->ptr)[byte] & (1 << bit);
}
addReply(c, bitval ? shared.cone : shared.czero);
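A standalone sketch of the bit addressing used by SETBIT and GETBIT, reading bytes through `uint8_t` as the patch now does, so that each byte value is 0-255 regardless of whether plain `char` is signed on the platform. `sketch_getbit()` and `sketch_setbit()` are hypothetical helpers that work on a raw buffer rather than a Redis string object.

```c
#include <stddef.h>
#include <stdint.h>

/* Bits are numbered from the most significant bit of byte 0:
 * offset 0 is bit 7 of the first byte. Offsets past the end read as 0. */
int sketch_getbit(const uint8_t *s, size_t len, size_t bitoffset) {
    size_t byte = bitoffset >> 3;
    int bit = 7 - (int)(bitoffset & 0x7);

    if (byte >= len) return 0;
    return (s[byte] >> bit) & 1;
}

/* Set the bit to `on` (0 or 1) and return its previous value. The caller
 * must make sure the buffer is large enough (SETBIT grows the string). */
int sketch_setbit(uint8_t *s, size_t bitoffset, int on) {
    size_t byte = bitoffset >> 3;
    int bit = 7 - (int)(bitoffset & 0x7);
    uint8_t byteval = s[byte];
    int old = (byteval >> bit) & 1;

    byteval &= (uint8_t)~(1u << bit);
    byteval |= (uint8_t)((on & 0x1) << bit);
    s[byte] = byteval;
    return old;
}
```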
/* BITCOUNT key [start end] */
void bitcountCommand(redisClient *c) {
robj *o;
- long start, end;
+ long start, end, strlen;
unsigned char *p;
char llbuf[32];
- size_t strlen;
/* Lookup, check for type, and return 0 for non existing keys. */
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
if (end < 0) end = strlen+end;
if (start < 0) start = 0;
if (end < 0) end = 0;
- if ((unsigned)end >= strlen) end = strlen-1;
+ if (end >= strlen) end = strlen-1;
} else if (c->argc == 2) {
/* The whole string. */
start = 0;
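The range normalization in `bitcountCommand()` can be isolated as below. With `end` and the length both held in signed `long`, an oversized `end` is simply clamped to the last byte; in the old code, casting `end` to `unsigned` could truncate a large value so that the clamp was skipped, which is one plausible reading of the "overflowing arguments" crash in the changelog. `sketch_bitcount_range()` is a hypothetical helper that returns the number of bytes to scan and stores the first one in `*first`.

```c
/* Normalize BITCOUNT-style [start, end] byte indexes for a string of
 * `len` bytes; negative indexes count from the end. Returns the number
 * of bytes in the range (0 if it is empty). */
long sketch_bitcount_range(long start, long end, long len, long *first) {
    if (start < 0) start = len + start;
    if (end < 0) end = len + end;
    if (start < 0) start = 0;
    if (end < 0) end = 0;
    if (end >= len) end = len - 1;   /* signed compare: no truncation */

    /* end < len now, so the range is empty only when start > end. */
    if (start > end) return 0;
    *first = start;
    return end - start + 1;
}
```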
{
server.aof_rewrite_min_size = memtoll(argv[1],NULL);
} else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
+ if (strlen(argv[1]) > REDIS_AUTHPASS_MAX_LEN) {
+ err = "Password is longer than REDIS_AUTHPASS_MAX_LEN";
+ goto loaderr;
+ }
server.requirepass = zstrdup(argv[1]);
} else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
zfree(server.pidfile);
if ((server.stop_writes_on_bgsave_err = yesnotoi(argv[1])) == -1) {
err = "argument must be 'yes' or 'no'"; goto loaderr;
}
+ } else if (!strcasecmp(argv[0],"slave-priority") && argc == 2) {
+ server.slave_priority = atoi(argv[1]);
+ } else if (!strcasecmp(argv[0],"sentinel")) {
+ /* argc == 1 is handled by main() as we need to enter the sentinel
+ * mode ASAP. */
+ if (argc != 1) {
+ if (!server.sentinel_mode) {
+ err = "sentinel directive while not in sentinel mode";
+ goto loaderr;
+ }
+ err = sentinelHandleConfiguration(argv+1,argc-1);
+ if (err) goto loaderr;
+ }
} else {
err = "Bad directive or wrong number of arguments"; goto loaderr;
}
zfree(server.rdb_filename);
server.rdb_filename = zstrdup(o->ptr);
} else if (!strcasecmp(c->argv[2]->ptr,"requirepass")) {
+ if (sdslen(o->ptr) > REDIS_AUTHPASS_MAX_LEN) goto badfmt;
zfree(server.requirepass);
server.requirepass = ((char*)o->ptr)[0] ? zstrdup(o->ptr) : NULL;
} else if (!strcasecmp(c->argv[2]->ptr,"masterauth")) {
if (getLongLongFromObject(o,&ll) == REDIS_ERR ||
ll < 0) goto badfmt;
server.maxmemory = ll;
- if (server.maxmemory) freeMemoryIfNeeded();
+ if (server.maxmemory) {
+ if (server.maxmemory < zmalloc_used_memory()) {
+ redisLog(REDIS_WARNING,"WARNING: the new maxmemory value set via CONFIG SET is smaller than the current memory usage. This will result in keys eviction and/or inability to accept new write commands depending on the maxmemory-policy.");
+ }
+ freeMemoryIfNeeded();
+ }
} else if (!strcasecmp(c->argv[2]->ptr,"maxmemory-policy")) {
if (!strcasecmp(o->ptr,"volatile-lru")) {
server.maxmemory_policy = REDIS_MAXMEMORY_VOLATILE_LRU;
if (yn == -1) goto badfmt;
server.rdb_checksum = yn;
+ } else if (!strcasecmp(c->argv[2]->ptr,"slave-priority")) {
+ if (getLongLongFromObject(o,&ll) == REDIS_ERR ||
+ ll < 0) goto badfmt;
+ server.slave_priority = ll;
} else {
addReplyErrorFormat(c,"Unsupported CONFIG parameter: %s",
(char*)c->argv[2]->ptr);
config_get_numerical_field("repl-timeout",server.repl_timeout);
config_get_numerical_field("maxclients",server.maxclients);
config_get_numerical_field("watchdog-period",server.watchdog_period);
+ config_get_numerical_field("slave-priority",server.slave_priority);
/* Bool (yes/no) values */
config_get_bool_field("no-appendfsync-on-rewrite",
#define aof_fsync fsync
#endif
+/* Define rdb_fsync_range to sync_file_range() on Linux, otherwise we use
+ * the plain fsync() call. */
+#ifdef __linux__
+#define rdb_fsync_range(fd,off,size) sync_file_range(fd,off,size,SYNC_FILE_RANGE_WAIT_BEFORE|SYNC_FILE_RANGE_WRITE)
+#else
+#define rdb_fsync_range(fd,off,size) fsync(fd)
+#endif
+
/* Byte ordering detection */
#include <sys/types.h> /* This will likely define BYTE_ORDER */
}
void selectCommand(redisClient *c) {
- int id = atoi(c->argv[1]->ptr);
+ long id;
+
+ if (getLongFromObjectOrReply(c, c->argv[1], &id,
+ "invalid DB index") != REDIS_OK)
+ return;
if (selectDb(c,id) == REDIS_ERR) {
addReplyError(c,"invalid DB index");
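Replacing `atoi()` matters because `atoi("foo")` returns 0, which silently selected DB 0. A strict parse in the same spirit can be sketched with `strtol()`; `sketch_parse_db_index()` is hypothetical, it also rejects leading whitespace and trailing characters, and checking that the index is within the configured number of databases is still left to `selectDb()`.

```c
#include <ctype.h>
#include <errno.h>
#include <stdlib.h>

/* Parse a decimal DB index strictly. Returns 0 and stores the value in
 * *out on success, or -1 if the string is not a complete, in-range long. */
int sketch_parse_db_index(const char *s, long *out) {
    char *end;
    long v;

    /* Reject empty input and leading whitespace (strtol would skip it). */
    if (s == NULL || *s == '\0' || isspace((unsigned char)*s)) return -1;

    errno = 0;
    v = strtol(s, &end, 10);
    /* Reject overflow and any characters after the number. */
    if (errno == ERANGE || *end != '\0') return -1;

    *out = v;
    return 0;
}
```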
void debugCommand(redisClient *c) {
if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
*((char*)-1) = 'x';
+ } else if (!strcasecmp(c->argv[1]->ptr,"oom")) {
+ void *ptr = zmalloc(ULONG_MAX); /* Should trigger an out of memory. */
+ zfree(ptr);
+ addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"assert")) {
if (c->argc >= 3) c->argv[2] = tryObjectEncoding(c->argv[2]);
redisAssertWithInfo(c,c->argv[0],1 == 2);
}
#endif /* HAVE_BACKTRACE */
+/* ==================== Logging functions for debugging ===================== */
+
+void redisLogHexDump(int level, char *descr, void *value, size_t len) {
+ char buf[65], *b;
+ unsigned char *v = value;
+ char charset[] = "0123456789abcdef";
+
+ redisLog(level,"%s (hexdump):", descr);
+ b = buf;
+ while(len) {
+ b[0] = charset[(*v)>>4];
+ b[1] = charset[(*v)&0xf];
+ b[2] = '\0';
+ b += 2;
+ len--;
+ v++;
+ if (b-buf == 64 || len == 0) {
+ redisLogRaw(level|REDIS_LOG_RAW,buf);
+ b = buf;
+ }
+ }
+ redisLogRaw(level|REDIS_LOG_RAW,"\n");
+}
+
/* =========================== Software Watchdog ============================ */
#include <sys/time.h>
return key;
}
-static int dict_hash_function_seed = 5381;
+static uint32_t dict_hash_function_seed = 5381;
-void dictSetHashFunctionSeed(unsigned int seed) {
+void dictSetHashFunctionSeed(uint32_t seed) {
dict_hash_function_seed = seed;
}
-unsigned int dictGetHashFunctionSeed(void) {
+uint32_t dictGetHashFunctionSeed(void) {
return dict_hash_function_seed;
}
-/* Generic hash function (a popular one from Bernstein).
- * I tested a few and this was the best. */
-unsigned int dictGenHashFunction(const unsigned char *buf, int len) {
- unsigned int hash = dict_hash_function_seed;
+/* MurmurHash2, by Austin Appleby
+ * Note - This code makes a few assumptions about how your machine behaves -
+ * 1. We can read a 4-byte value from any address without crashing
+ * 2. sizeof(int) == 4
+ *
+ * And it has a few limitations -
+ *
+ * 1. It will not work incrementally.
+ * 2. It will not produce the same results on little-endian and big-endian
+ * machines.
+ */
+unsigned int dictGenHashFunction(const void *key, int len) {
+ /* 'm' and 'r' are mixing constants generated offline.
+ They're not really 'magic', they just happen to work well. */
+ uint32_t seed = dict_hash_function_seed;
+ const uint32_t m = 0x5bd1e995;
+ const int r = 24;
- while (len--)
- hash = ((hash << 5) + hash) + (*buf++); /* hash * 33 + c */
- return hash;
+ /* Initialize the hash to a 'random' value */
+ uint32_t h = seed ^ len;
+
+ /* Mix 4 bytes at a time into the hash */
+ const unsigned char *data = (const unsigned char *)key;
+
+ while(len >= 4) {
+ uint32_t k = *(uint32_t*)data;
+
+ k *= m;
+ k ^= k >> r;
+ k *= m;
+
+ h *= m;
+ h ^= k;
+
+ data += 4;
+ len -= 4;
+ }
+
+ /* Handle the last few bytes of the input array */
+ switch(len) {
+ case 3: h ^= data[2] << 16;
+ case 2: h ^= data[1] << 8;
+ case 1: h ^= data[0]; h *= m;
+ };
+
+ /* Do a few final mixes of the hash to ensure the last few
+ * bytes are well-incorporated. */
+ h ^= h >> 13;
+ h *= m;
+ h ^= h >> 15;
+
+ return (unsigned int)h;
}
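The comment above lists two assumptions: unaligned 4-byte reads must be safe, and results differ between little- and big-endian machines. The `*(uint32_t*)data` load is also technically undefined behaviour in standard C (alignment and aliasing rules).

The sketch below shows one common way around all three. It assembles each 4-byte block from individual bytes in little-endian order. The name `murmur2_portable` is hypothetical. On little-endian hosts it should return the same values as dictGenHashFunction() for the same seed and input, because it performs the same arithmetic on the same block values.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* MurmurHash2 (32-bit) without unaligned loads: each block is built from
 * bytes in little-endian order, so the result is host-independent. */
uint32_t murmur2_portable(const void *key, size_t len, uint32_t seed) {
    const uint32_t m = 0x5bd1e995;
    const int r = 24;
    const unsigned char *data = key;
    uint32_t h = seed ^ (uint32_t)len;

    /* Mix 4 bytes at a time into the hash. */
    while (len >= 4) {
        uint32_t k = (uint32_t)data[0]
                   | (uint32_t)data[1] << 8
                   | (uint32_t)data[2] << 16
                   | (uint32_t)data[3] << 24;
        k *= m;
        k ^= k >> r;
        k *= m;
        h *= m;
        h ^= k;
        data += 4;
        len -= 4;
    }

    /* Tail bytes: intentional fall-through, as in the original. */
    switch (len) {
    case 3: h ^= (uint32_t)data[2] << 16; /* fall through */
    case 2: h ^= (uint32_t)data[1] << 8;  /* fall through */
    case 1: h ^= data[0]; h *= m;
    }

    /* Final avalanche. */
    h ^= h >> 13;
    h *= m;
    h ^= h >> 15;
    return h;
}
```

For a fixed key, every step (XOR with a constant, multiplication by the odd constant `m`, xorshift) is a bijection on `h`. Two different seeds therefore always produce different hashes for the same input.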
-/* And a case insensitive version */
+/* And a case insensitive hash function (based on djb hash) */
unsigned int dictGenCaseHashFunction(const unsigned char *buf, int len) {
- unsigned int hash = dict_hash_function_seed;
+ unsigned int hash = (unsigned int)dict_hash_function_seed;
while (len--)
hash = ((hash << 5) + hash) + (tolower(*buf++)); /* hash * 33 + c */
/* ----------------------------- API implementation ------------------------- */
-/* Reset an hashtable already initialized with ht_init().
- * NOTE: This function should only called by ht_destroy(). */
+/* Reset a hash table already initialized with ht_init().
+ * NOTE: This function should only be called by ht_destroy(). */
static void _dictReset(dictht *ht)
{
ht->table = NULL;
return dictExpand(d, minimal);
}
-/* Expand or create the hashtable */
+/* Expand or create the hash table */
int dictExpand(dict *d, unsigned long size)
{
- dictht n; /* the new hashtable */
+ dictht n; /* the new hash table */
unsigned long realsize = _dictNextPower(size);
/* the size is invalid if it is smaller than the number of
- * elements already inside the hashtable */
+ * elements already inside the hash table */
if (dictIsRehashing(d) || d->ht[0].used > size)
return DICT_ERR;
- /* Allocate the new hashtable and initialize all pointers to NULL */
+ /* Allocate the new hash table and initialize all pointers to NULL */
n.size = realsize;
n.sizemask = realsize-1;
n.table = zcalloc(realsize*sizeof(dictEntry*));
* a value returns the dictEntry structure to the user, that will make
* sure to fill the value field as he wishes.
*
- * This function is also directly expoed to user API to be called
+ * This function is also directly exposed to user API to be called
* mainly in order to store non-pointers inside the hash value, example:
*
* entry = dictAddRaw(dict,mykey);
unsigned int h, idx, table;
dictEntry *he;
- /* Expand the hashtable if needed */
+ /* Expand the hash table if needed */
if (_dictExpandIfNeeded(d) == DICT_ERR)
return -1;
/* Compute the key hash value */
void dictReleaseIterator(dictIterator *iter);
dictEntry *dictGetRandomKey(dict *d);
void dictPrintStats(dict *d);
-unsigned int dictGenHashFunction(const unsigned char *buf, int len);
+unsigned int dictGenHashFunction(const void *key, int len);
unsigned int dictGenCaseHashFunction(const unsigned char *buf, int len);
void dictEmpty(dict *d);
void dictEnableResize(void);
#define _BSD_SOURCE
+#if defined(__linux__)
+#define _GNU_SOURCE
+#endif
+
#if defined(__linux__) || defined(__OpenBSD__)
#define _XOPEN_SOURCE 700
#else
c->ctime = c->lastinteraction = server.unixtime;
c->authenticated = 0;
c->replstate = REDIS_REPL_NONE;
+ c->slave_listening_port = 0;
c->reply = listCreate();
c->reply_bytes = 0;
c->obuf_soft_limit_reached_time = 0;
/* Only glue when the next node is non-NULL (an sds in this case) */
if (next->ptr != NULL) {
+ c->reply_bytes -= zmalloc_size_sds(len->ptr);
+ c->reply_bytes -= zmalloc_size_sds(next->ptr);
len->ptr = sdscatlen(len->ptr,next->ptr,sdslen(next->ptr));
+ c->reply_bytes += zmalloc_size_sds(len->ptr);
listDelNode(c->reply,ln->next);
}
}
* called from contexts where the client can't be freed safely, i.e. from the
* lower level functions pushing data inside the client output buffers. */
void asyncCloseClientOnOutputBufferLimitReached(redisClient *c) {
+ redisAssert(c->reply_bytes < ULONG_MAX-(1024*64));
if (c->reply_bytes == 0 || c->flags & REDIS_CLOSE_ASAP) return;
if (checkClientOutputBufferLimits(c)) {
sds client = getClientInfoString(c);
}
/* This will also be called when the set was just converted
- * to regular hashtable encoded set */
+ * to regular hash table encoded set */
if (o->encoding == REDIS_ENCODING_HT) {
dictAdd((dict*)o->ptr,ele,NULL);
} else {
fprintf(stderr,"%s: %s\n",config.hostsocket,c->context->errstr);
exit(1);
}
+ /* Suppress hiredis cleanup of unused buffers for max speed. */
+ c->context->reader->maxbuf = 0;
/* Queue N requests accordingly to the pipeline size. */
c->obuf = sdsempty();
for (j = 0; j < config.pipeline; j++)
" -a <password> Password to use when connecting to the server\n"
" -r <repeat> Execute specified command N times\n"
" -i <interval> When -r is used, waits <interval> seconds per command.\n"
-" It is possible to specify sub-second times like -i 0.1.\n"
+" It is possible to specify sub-second times like -i 0.1\n"
" -n <db> Database number\n"
" -x Read last argument from STDIN\n"
" -d <delimiter> Multi-bulk delimiter in for raw formatting (default: \\n)\n"
" -c Enable cluster mode (follow -ASK and -MOVED redirections)\n"
" --raw Use raw formatting for replies (default when STDOUT is not a tty)\n"
-" --latency Enter a special mode continuously sampling latency.\n"
-" --slave Simulate a slave showing commands received from the master.\n"
-" --pipe Transfer raw Redis protocol from stdin to server.\n"
-" --bigkeys Sample Redis keys looking for big keys.\n"
-" --eval <file> Send an EVAL command using the Lua script at <file>.\n"
+" --latency Enter a special mode continuously sampling latency\n"
+" --slave Simulate a slave showing commands received from the master\n"
+" --pipe Transfer raw Redis protocol from stdin to server\n"
+" --bigkeys Sample Redis keys looking for big keys\n"
+" --eval <file> Send an EVAL command using the Lua script at <file>\n"
" --help Output this help and exit\n"
" --version Output version and exit\n"
"\n"
/* Pipe mode */
if (config.pipe_mode) {
- cliConnect(0);
+ if (cliConnect(0) == REDIS_ERR) exit(1);
pipeMode();
}
* results. For instance SPOP and RANDOMKEY are two random commands.
* S: Sort command output array if called from script, so that the output
* is deterministic.
+ * l: Allow command while loading the database.
+ * t: Allow command while a slave has stale data but is not allowed to
+ * serve this data. Normally only a few commands are accepted in
+ * this condition.
*/
struct redisCommand redisCommandTable[] = {
{"get",getCommand,2,"r",0,NULL,1,1,1,0,0},
{"sismember",sismemberCommand,3,"r",0,NULL,1,1,1,0,0},
{"scard",scardCommand,2,"r",0,NULL,1,1,1,0,0},
{"spop",spopCommand,2,"wRs",0,NULL,1,1,1,0,0},
- {"srandmember",srandmemberCommand,2,"rR",0,NULL,1,1,1,0,0},
+ {"srandmember",srandmemberCommand,-2,"rR",0,NULL,1,1,1,0,0},
{"sinter",sinterCommand,-2,"rS",0,NULL,1,-1,1,0,0},
{"sinterstore",sinterstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
{"sunion",sunionCommand,-2,"rS",0,NULL,1,-1,1,0,0},
{"exec",execCommand,1,"s",0,NULL,0,0,0,0,0},
{"discard",discardCommand,1,"rs",0,NULL,0,0,0,0,0},
{"sync",syncCommand,1,"ars",0,NULL,0,0,0,0,0},
+ {"replconf",replconfCommand,-1,"ars",0,NULL,0,0,0,0,0},
{"flushdb",flushdbCommand,1,"w",0,NULL,0,0,0,0,0},
{"flushall",flushallCommand,1,"w",0,NULL,0,0,0,0,0},
- {"sort",sortCommand,-2,"wmS",0,NULL,1,1,1,0,0},
- {"info",infoCommand,-1,"r",0,NULL,0,0,0,0,0},
+ {"sort",sortCommand,-2,"wm",0,NULL,1,1,1,0,0},
+ {"info",infoCommand,-1,"rlt",0,NULL,0,0,0,0,0},
{"monitor",monitorCommand,1,"ars",0,NULL,0,0,0,0,0},
{"ttl",ttlCommand,2,"r",0,NULL,1,1,1,0,0},
{"pttl",pttlCommand,2,"r",0,NULL,1,1,1,0,0},
{"persist",persistCommand,2,"w",0,NULL,1,1,1,0,0},
- {"slaveof",slaveofCommand,3,"as",0,NULL,0,0,0,0,0},
+ {"slaveof",slaveofCommand,3,"ast",0,NULL,0,0,0,0,0},
{"debug",debugCommand,-2,"as",0,NULL,0,0,0,0,0},
{"config",configCommand,-2,"ar",0,NULL,0,0,0,0,0},
- {"subscribe",subscribeCommand,-2,"rps",0,NULL,0,0,0,0,0},
- {"unsubscribe",unsubscribeCommand,-1,"rps",0,NULL,0,0,0,0,0},
- {"psubscribe",psubscribeCommand,-2,"rps",0,NULL,0,0,0,0,0},
- {"punsubscribe",punsubscribeCommand,-1,"rps",0,NULL,0,0,0,0,0},
- {"publish",publishCommand,3,"pf",0,NULL,0,0,0,0,0},
+ {"subscribe",subscribeCommand,-2,"rpslt",0,NULL,0,0,0,0,0},
+ {"unsubscribe",unsubscribeCommand,-1,"rpslt",0,NULL,0,0,0,0,0},
+ {"psubscribe",psubscribeCommand,-2,"rpslt",0,NULL,0,0,0,0,0},
+ {"punsubscribe",punsubscribeCommand,-1,"rpslt",0,NULL,0,0,0,0,0},
+ {"publish",publishCommand,3,"pflt",0,NULL,0,0,0,0,0},
{"watch",watchCommand,-2,"rs",0,noPreloadGetKeys,1,-1,1,0,0},
{"unwatch",unwatchCommand,1,"rs",0,NULL,0,0,0,0,0},
{"restore",restoreCommand,4,"awm",0,NULL,1,1,1,0,0},
if (server.logfile) close(fd);
}
-/* Redis generally does not try to recover from out of memory conditions
- * when allocating objects or strings, it is not clear if it will be possible
- * to report this condition to the client since the networking layer itself
- * is based on heap allocation for send buffers, so we simply abort.
- * At least the code will be simpler to read... */
-void oom(const char *msg) {
- redisLog(REDIS_WARNING, "%s: Out of memory\n",msg);
- sleep(1);
- abort();
-}
-
/* Return the UNIX time in microseconds */
long long ustime(void) {
struct timeval tv;
* a macro is used: run_with_period(milliseconds) { .... }
*/
-/* Using the following macro you can run code inside serverCron() with the
- * specified period, specified in milliseconds.
- * The actual resolution depends on REDIS_HZ. */
-#define run_with_period(_ms_) if (!(loops % ((_ms_)/(1000/REDIS_HZ))))
-
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
- int j, loops = server.cronloops;
+ int j;
REDIS_NOTUSED(eventLoop);
REDIS_NOTUSED(id);
REDIS_NOTUSED(clientData);
}
/* Show information about connected clients */
- run_with_period(5000) {
- redisLog(REDIS_VERBOSE,"%d clients connected (%d slaves), %zu bytes in use",
- listLength(server.clients)-listLength(server.slaves),
- listLength(server.slaves),
- zmalloc_used_memory());
+ if (!server.sentinel_mode) {
+ run_with_period(5000) {
+ redisLog(REDIS_VERBOSE,
+ "%d clients connected (%d slaves), %zu bytes in use",
+ listLength(server.clients)-listLength(server.slaves),
+ listLength(server.slaves),
+ zmalloc_used_memory());
+ }
}
/* We need to do a few operations on clients asynchronously. */
* to detect transfer failures. */
run_with_period(1000) replicationCron();
+ /* Run the sentinel timer if we are in sentinel mode. */
+ run_with_period(100) {
+ if (server.sentinel_mode) sentinelTimer();
+ }
+
server.cronloops++;
return 1000/REDIS_HZ;
}
shared.del = createStringObject("DEL",3);
shared.rpop = createStringObject("RPOP",4);
shared.lpop = createStringObject("LPOP",4);
+ shared.lpush = createStringObject("LPUSH",5);
for (j = 0; j < REDIS_SHARED_INTEGERS; j++) {
shared.integers[j] = createObject(REDIS_STRING,(void*)(long)j);
shared.integers[j]->encoding = REDIS_ENCODING_INT;
server.aof_last_fsync = time(NULL);
server.aof_rewrite_time_last = -1;
server.aof_rewrite_time_start = -1;
+ server.aof_lastbgrewrite_status = REDIS_OK;
server.aof_delayed_fsync = 0;
server.aof_fd = -1;
server.aof_selected_db = -1; /* Make sure the first time will not match */
server.repl_serve_stale_data = 1;
server.repl_slave_ro = 1;
server.repl_down_since = time(NULL);
+ server.slave_priority = REDIS_DEFAULT_SLAVE_PRIORITY;
/* Client output buffer limits */
server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_NORMAL].hard_limit_bytes = 0;
server.delCommand = lookupCommandByCString("del");
server.multiCommand = lookupCommandByCString("multi");
server.lpushCommand = lookupCommandByCString("lpush");
+ server.lpopCommand = lookupCommandByCString("lpop");
+ server.rpopCommand = lookupCommandByCString("rpop");
/* Slow log */
server.slowlog_log_slower_than = REDIS_SLOWLOG_LOG_SLOWER_THAN;
server.slaves = listCreate();
server.monitors = listCreate();
server.unblocked_clients = listCreate();
+ server.ready_keys = listCreate();
createSharedObjects();
adjustOpenFilesLimit();
server.db[j].dict = dictCreate(&dbDictType,NULL);
server.db[j].expires = dictCreate(&keyptrDictType,NULL);
server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
+ server.db[j].ready_keys = dictCreate(&setDictType,NULL);
server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
server.db[j].id = j;
}
server.stop_writes_on_bgsave_err = 1;
aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE,
- acceptTcpHandler,NULL) == AE_ERR) oom("creating file event");
+ acceptTcpHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.ipfd file event.");
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
- acceptUnixHandler,NULL) == AE_ERR) oom("creating file event");
+ acceptUnixHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.sofd file event.");
if (server.aof_state == REDIS_AOF_ON) {
server.aof_fd = open(server.aof_filename,
case 's': c->flags |= REDIS_CMD_NOSCRIPT; break;
case 'R': c->flags |= REDIS_CMD_RANDOM; break;
case 'S': c->flags |= REDIS_CMD_SORT_FOR_SCRIPT; break;
+ case 'l': c->flags |= REDIS_CMD_LOADING; break;
+ case 't': c->flags |= REDIS_CMD_STALE; break;
default: redisPanic("Unsupported command flag"); break;
}
f++;
return REDIS_OK;
}
- /* Don't accept wirte commands if this is a read only slave. But
+ /* Don't accept write commands if this is a read only slave. But
* accept write commands if this is our master. */
if (server.masterhost && server.repl_slave_ro &&
!(c->flags & REDIS_MASTER) &&
* we are a slave with a broken link with master. */
if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED &&
server.repl_serve_stale_data == 0 &&
- c->cmd->proc != infoCommand && c->cmd->proc != slaveofCommand)
+ !(c->cmd->flags & REDIS_CMD_STALE))
{
addReply(c, shared.masterdownerr);
return REDIS_OK;
}
- /* Loading DB? Return an error if the command is not INFO */
- if (server.loading && c->cmd->proc != infoCommand) {
+ /* Loading DB? Return an error if the command does not have the
+ * REDIS_CMD_LOADING flag. */
+ if (server.loading && !(c->cmd->flags & REDIS_CMD_LOADING)) {
addReply(c, shared.loadingerr);
return REDIS_OK;
}
/* Lua script too slow? Only allow SHUTDOWN NOSAVE and SCRIPT KILL. */
if (server.lua_timedout &&
!(c->cmd->proc == shutdownCommand &&
c->argc == 2 &&
addReply(c,shared.queued);
} else {
call(c,REDIS_CALL_FULL);
+ if (listLength(server.ready_keys))
+ handleClientsBlockedOnLists();
}
return REDIS_OK;
}
/*================================== Commands =============================== */
+/* Return zero if strings are the same, non-zero if they are not.
+ * The comparison is performed in a way that prevents an attacker from
+ * obtaining information about the nature of the strings just by monitoring
+ * the execution time of the function.
+ *
+ * Note that by limiting the comparison to strings of up to 512 bytes we
+ * can avoid leaking any information about the password length and any
+ * possible branch misprediction related leak.
+ */
+int time_independent_strcmp(char *a, char *b) {
+ char bufa[REDIS_AUTHPASS_MAX_LEN], bufb[REDIS_AUTHPASS_MAX_LEN];
+ /* The following two strlen calls perform len(a) + len(b) operations where
+ * either a or b is of fixed (our password) length, and the difference is
+ * only relative to the length of the user provided string, so no
+ * information leak is possible in the following two lines of code. */
+ int alen = strlen(a);
+ int blen = strlen(b);
+ int j;
+ int diff = 0;
+
+ /* We can't compare strings longer than our static buffers.
+ * Note that in practical circumstances this test is never true,
+ * so there is no info leak. */
+ if (alen > sizeof(bufa) || blen > sizeof(bufb)) return 1;
+
+ memset(bufa,0,sizeof(bufa)); /* Constant time. */
+ memset(bufb,0,sizeof(bufb)); /* Constant time. */
+ /* Again the time of the following two copies is proportional to
+ * len(a) + len(b) so no info is leaked. */
+ memcpy(bufa,a,alen);
+ memcpy(bufb,b,blen);
+
+ /* Always compare all the chars in the two buffers without
+ * conditional expressions. */
+ for (j = 0; j < sizeof(bufa); j++) {
+ diff |= (bufa[j] ^ bufb[j]);
+ }
+ /* Length must be equal as well. */
+ diff |= alen ^ blen;
+ return diff; /* If zero, the strings are the same. */
+}
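The same technique can be tried in isolation. The sketch below restates time_independent_strcmp() under a hypothetical name, with a local 512-byte limit standing in for REDIS_AUTHPASS_MAX_LEN. It differs from the original in two small ways: it uses unsigned bytes, and it normalizes the result to 0 or 1. Like the original, it assumes the compiler does not turn the XOR loop back into an early-exit comparison.

```c
#include <assert.h>
#include <string.h>

#define SKETCH_AUTHPASS_MAX_LEN 512 /* stands in for REDIS_AUTHPASS_MAX_LEN */

/* Returns 0 if a and b are equal, 1 otherwise. */
int ctStrcmp(const char *a, const char *b) {
    unsigned char bufa[SKETCH_AUTHPASS_MAX_LEN], bufb[SKETCH_AUTHPASS_MAX_LEN];
    size_t alen = strlen(a), blen = strlen(b);
    unsigned int diff = 0;
    size_t j;

    /* Over-long inputs are rejected before any comparison work. */
    if (alen > sizeof(bufa) || blen > sizeof(bufb)) return 1;

    /* Zero-pad both inputs to the full buffer size. */
    memset(bufa, 0, sizeof(bufa));
    memset(bufb, 0, sizeof(bufb));
    memcpy(bufa, a, alen);
    memcpy(bufb, b, blen);

    /* Visit every byte; no branch depends on the contents. */
    for (j = 0; j < sizeof(bufa); j++)
        diff |= (unsigned int)(bufa[j] ^ bufb[j]);

    /* Lengths must be equal as well, as in the original. */
    diff |= (unsigned int)(alen ^ blen);
    return diff != 0;
}
```

Strings longer than the buffer always compare as different, even against themselves. This mirrors the early return in the original.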
+
void authCommand(redisClient *c) {
if (!server.requirepass) {
addReplyError(c,"Client sent AUTH, but no password is set");
- } else if (!strcmp(c->argv[1]->ptr, server.requirepass)) {
+ } else if (!time_independent_strcmp(c->argv[1]->ptr, server.requirepass)) {
c->authenticated = 1;
addReply(c,shared.ok);
} else {
unsigned long lol, bib;
int allsections = 0, defsections = 0;
int sections = 0;
-
+
if (section) {
allsections = strcasecmp(section,"all") == 0;
defsections = strcasecmp(section,"default") == 0;
/* Server */
if (allsections || defsections || !strcasecmp(section,"server")) {
struct utsname name;
+ char *mode;
+ if (server.sentinel_mode) mode = "sentinel";
+ else mode = "standalone";
+
if (sections++) info = sdscat(info,"\r\n");
uname(&name);
info = sdscatprintf(info,
"redis_version:%s\r\n"
"redis_git_sha1:%s\r\n"
"redis_git_dirty:%d\r\n"
+ "redis_mode:%s\r\n"
"os:%s %s %s\r\n"
"arch_bits:%d\r\n"
"multiplexing_api:%s\r\n"
REDIS_VERSION,
redisGitSHA1(),
strtol(redisGitDirty(),NULL,10) > 0,
+ mode,
name.sysname, name.release, name.machine,
server.arch_bits,
aeGetApiName(),
"aof_rewrite_in_progress:%d\r\n"
"aof_rewrite_scheduled:%d\r\n"
"aof_last_rewrite_time_sec:%ld\r\n"
- "aof_current_rewrite_time_sec:%ld\r\n",
+ "aof_current_rewrite_time_sec:%ld\r\n"
+ "aof_last_bgrewrite_status:%s\r\n",
server.loading,
server.dirty,
server.rdb_child_pid != -1,
server.lastsave,
- server.lastbgsave_status == REDIS_OK ? "ok" : "err",
+ (server.lastbgsave_status == REDIS_OK) ? "ok" : "err",
server.rdb_save_time_last,
(server.rdb_child_pid == -1) ?
-1 : time(NULL)-server.rdb_save_time_start,
server.aof_rewrite_scheduled,
server.aof_rewrite_time_last,
(server.aof_child_pid == -1) ?
- -1 : time(NULL)-server.aof_rewrite_time_start);
+ -1 : time(NULL)-server.aof_rewrite_time_start,
+ (server.aof_lastbgrewrite_status == REDIS_OK) ? "ok" : "err");
if (server.aof_state != REDIS_AOF_OFF) {
info = sdscatprintf(info,
"aof_base_size:%lld\r\n"
"aof_pending_rewrite:%d\r\n"
"aof_buffer_length:%zu\r\n"
- "aof_rewrite_buffer_length:%zu\r\n"
+ "aof_rewrite_buffer_length:%lu\r\n"
"aof_pending_bio_fsync:%llu\r\n"
"aof_delayed_fsync:%lu\r\n",
(long long) server.aof_current_size,
if (server.repl_state == REDIS_REPL_TRANSFER) {
info = sdscatprintf(info,
- "master_sync_left_bytes:%ld\r\n"
+ "master_sync_left_bytes:%lld\r\n"
"master_sync_last_io_seconds_ago:%d\r\n"
- ,(long)server.repl_transfer_left,
+ , (long long)
+ (server.repl_transfer_size - server.repl_transfer_read),
(int)(server.unixtime-server.repl_transfer_lastio)
);
}
"master_link_down_since_seconds:%ld\r\n",
(long)server.unixtime-server.repl_down_since);
}
+ info = sdscatprintf(info,
+ "slave_priority:%d\r\n", server.slave_priority);
}
info = sdscatprintf(info,
"connected_slaves:%lu\r\n",
}
if (state == NULL) continue;
info = sdscatprintf(info,"slave%d:%s,%d,%s\r\n",
- slaveid,ip,port,state);
+ slaveid,ip,slave->slave_listening_port,state);
slaveid++;
}
}
fprintf(stderr," ./redis-server /etc/redis/6379.conf\n");
fprintf(stderr," ./redis-server --port 7777\n");
fprintf(stderr," ./redis-server --port 7777 --slaveof 127.0.0.1 8888\n");
- fprintf(stderr," ./redis-server /etc/myredis.conf --loglevel verbose\n");
+ fprintf(stderr," ./redis-server /etc/myredis.conf --loglevel verbose\n\n");
+ fprintf(stderr,"Sentinel mode:\n");
+ fprintf(stderr," ./redis-server /etc/sentinel.conf --sentinel\n");
exit(1);
}
void redisAsciiArt(void) {
#include "asciilogo.h"
char *buf = zmalloc(1024*16);
+ char *mode = "stand alone";
+
+ if (server.sentinel_mode) mode = "sentinel";
snprintf(buf,1024*16,ascii_logo,
REDIS_VERSION,
redisGitSHA1(),
strtol(redisGitDirty(),NULL,10) > 0,
(sizeof(long) == 8) ? "64" : "32",
- "stand alone",
- server.port,
+ mode, server.port,
(long) getpid()
);
redisLogRaw(REDIS_NOTICE|REDIS_LOG_RAW,buf);
void memtest(size_t megabytes, int passes);
+/* Returns 1 if there is --sentinel among the arguments or if
+ * argv[0] contains "redis-sentinel". */
+int checkForSentinelMode(int argc, char **argv) {
+ int j;
+
+ if (strstr(argv[0],"redis-sentinel") != NULL) return 1;
+ for (j = 1; j < argc; j++)
+ if (!strcmp(argv[j],"--sentinel")) return 1;
+ return 0;
+}
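Because argv[0] is matched with strstr(), any path or symlink whose name contains "redis-sentinel" selects Sentinel mode. The --sentinel switch, by contrast, must match exactly. Below is a standalone copy of that logic under a hypothetical name. It adds a guard for argc == 0, which the original does not have.

```c
#include <assert.h>
#include <string.h>

/* Returns 1 if the invocation asks for Sentinel mode, 0 otherwise. */
int isSentinelInvocation(int argc, char **argv) {
    int j;

    /* Substring match: "/usr/local/bin/redis-sentinel" qualifies. */
    if (argc > 0 && strstr(argv[0], "redis-sentinel") != NULL) return 1;

    /* Exact match for the explicit switch, in any position. */
    for (j = 1; j < argc; j++)
        if (strcmp(argv[j], "--sentinel") == 0) return 1;
    return 0;
}
```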
+
+/* Function called at startup to load RDB or AOF file in memory. */
+void loadDataFromDisk(void) {
+ long long start = ustime();
+ if (server.aof_state == REDIS_AOF_ON) {
+ if (loadAppendOnlyFile(server.aof_filename) == REDIS_OK)
+ redisLog(REDIS_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
+ } else {
+ if (rdbLoad(server.rdb_filename) == REDIS_OK) {
+ redisLog(REDIS_NOTICE,"DB loaded from disk: %.3f seconds",
+ (float)(ustime()-start)/1000000);
+ } else if (errno != ENOENT) {
+ redisLog(REDIS_WARNING,"Fatal error loading the DB. Exiting.");
+ exit(1);
+ }
+ }
+}
+
+void redisOutOfMemoryHandler(size_t allocation_size) {
+ redisLog(REDIS_WARNING,"Out Of Memory allocating %zu bytes!",
+ allocation_size);
+ redisPanic("OOM");
+}
+
int main(int argc, char **argv) {
- long long start;
struct timeval tv;
/* We need to initialize our libraries, and the server configuration. */
zmalloc_enable_thread_safeness();
+ zmalloc_set_oom_handler(redisOutOfMemoryHandler);
srand(time(NULL)^getpid());
gettimeofday(&tv,NULL);
dictSetHashFunctionSeed(tv.tv_sec^tv.tv_usec^getpid());
+ server.sentinel_mode = checkForSentinelMode(argc,argv);
initServerConfig();
+ /* We need to init sentinel right now as parsing the configuration file
+ * in sentinel mode will have the effect of populating the sentinel
+ * data structures with master nodes to monitor. */
+ if (server.sentinel_mode) {
+ initSentinelConfig();
+ initSentinel();
+ }
+
if (argc >= 2) {
int j = 1; /* First option to parse in argv[] */
sds options = sdsempty();
loadServerConfig(configfile,options);
sdsfree(options);
} else {
- redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
+ redisLog(REDIS_WARNING, "Warning: no config file specified, using the default config. In order to specify a config file use %s /path/to/%s.conf", argv[0], server.sentinel_mode ? "sentinel" : "redis");
}
if (server.daemonize) daemonize();
initServer();
if (server.daemonize) createPidFile();
redisAsciiArt();
- redisLog(REDIS_WARNING,"Server started, Redis version " REDIS_VERSION);
-#ifdef __linux__
- linuxOvercommitMemoryWarning();
-#endif
- start = ustime();
- if (server.aof_state == REDIS_AOF_ON) {
- if (loadAppendOnlyFile(server.aof_filename) == REDIS_OK)
- redisLog(REDIS_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
- } else {
- if (rdbLoad(server.rdb_filename) == REDIS_OK) {
- redisLog(REDIS_NOTICE,"DB loaded from disk: %.3f seconds",
- (float)(ustime()-start)/1000000);
- } else if (errno != ENOENT) {
- redisLog(REDIS_WARNING,"Fatal error loading the DB. Exiting.");
- exit(1);
- }
+
+ if (!server.sentinel_mode) {
+ /* Things only needed when not running in Sentinel mode. */
+ redisLog(REDIS_WARNING,"Server started, Redis version " REDIS_VERSION);
+ #ifdef __linux__
+ linuxOvercommitMemoryWarning();
+ #endif
+ loadDataFromDisk();
+ if (server.ipfd > 0)
+ redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
+ if (server.sofd > 0)
+ redisLog(REDIS_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket);
}
- if (server.ipfd > 0)
- redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
- if (server.sofd > 0)
- redisLog(REDIS_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket);
+
+ /* Warn the user about a suspicious maxmemory setting. */
+ if (server.maxmemory > 0 && server.maxmemory < 1024*1024) {
+ redisLog(REDIS_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory);
+ }
+
aeSetBeforeSleepProc(server.el,beforeSleep);
aeMain(server.el);
aeDeleteEventLoop(server.el);
#define REDIS_SLOWLOG_LOG_SLOWER_THAN 10000
#define REDIS_SLOWLOG_MAX_LEN 128
#define REDIS_MAX_CLIENTS 10000
-
+#define REDIS_AUTHPASS_MAX_LEN 512
+#define REDIS_DEFAULT_SLAVE_PRIORITY 100
#define REDIS_REPL_TIMEOUT 60
#define REDIS_REPL_PING_SLAVE_PERIOD 10
-
#define REDIS_RUN_ID_SIZE 40
#define REDIS_OPS_SEC_SAMPLES 16
#define REDIS_CMD_NOSCRIPT 64 /* "s" flag */
#define REDIS_CMD_RANDOM 128 /* "R" flag */
#define REDIS_CMD_SORT_FOR_SCRIPT 256 /* "S" flag */
+#define REDIS_CMD_LOADING 512 /* "l" flag */
+#define REDIS_CMD_STALE 1024 /* "t" flag */
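The sflags string in redisCommandTable is decoded one character at a time. processCommand() then tests the resulting bits, instead of comparing c->cmd->proc against infoCommand and slaveofCommand.

The sketch below is a reduced version of that flow. It handles only the five flags whose values appear in this hunk. The real decoder also handles the r, w, m, a, p and f characters used in the table, and it panics on an unknown character; this sketch's hypothetical helpers return -1 instead.

```c
#include <assert.h>

/* Values from the redis.h hunk in this diff. */
#define REDIS_CMD_NOSCRIPT 64         /* "s" flag */
#define REDIS_CMD_RANDOM 128          /* "R" flag */
#define REDIS_CMD_SORT_FOR_SCRIPT 256 /* "S" flag */
#define REDIS_CMD_LOADING 512         /* "l" flag */
#define REDIS_CMD_STALE 1024          /* "t" flag */

/* Hypothetical decoder for a subset of sflags; -1 on unknown characters. */
int parseCommandFlags(const char *f) {
    int flags = 0;

    for (; *f; f++) {
        switch (*f) {
        case 's': flags |= REDIS_CMD_NOSCRIPT; break;
        case 'R': flags |= REDIS_CMD_RANDOM; break;
        case 'S': flags |= REDIS_CMD_SORT_FOR_SCRIPT; break;
        case 'l': flags |= REDIS_CMD_LOADING; break;
        case 't': flags |= REDIS_CMD_STALE; break;
        default: return -1;
        }
    }
    return flags;
}

/* The processCommand() checks, reduced to predicates on the bitmask. */
int allowedWhileLoading(int flags) { return (flags & REDIS_CMD_LOADING) != 0; }
int allowedWhileStale(int flags) { return (flags & REDIS_CMD_STALE) != 0; }
```

With the full decoder, INFO's "rlt" is accepted both while the dataset is loading and while the master link is down with server.repl_serve_stale_data set to 0. SLAVEOF's "ast" is accepted only in the second case.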
/* Object types */
#define REDIS_STRING 0
#define REDIS_REPL_NONE 0 /* No active replication */
#define REDIS_REPL_CONNECT 1 /* Must connect to master */
#define REDIS_REPL_CONNECTING 2 /* Connecting to master */
-#define REDIS_REPL_TRANSFER 3 /* Receiving .rdb from master */
-#define REDIS_REPL_CONNECTED 4 /* Connected to master */
+#define REDIS_REPL_RECEIVE_PONG 3 /* Wait for PING reply */
+#define REDIS_REPL_TRANSFER 4 /* Receiving .rdb from master */
+#define REDIS_REPL_CONNECTED 5 /* Connected to master */
/* Synchronous read timeout - slave side */
#define REDIS_REPL_SYNCIO_TIMEOUT 5
#define REDIS_PROPAGATE_AOF 1
#define REDIS_PROPAGATE_REPL 2
+/* Using the following macro you can run code inside serverCron() with the
+ * specified period, in milliseconds. The actual resolution depends on
+ * REDIS_HZ, and _ms_ must be at least 1000/REDIS_HZ, otherwise the modulo
+ * divisor is zero. */
+#define run_with_period(_ms_) if (!(server.cronloops%((_ms_)/(1000/REDIS_HZ))))
+
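A worked example, assuming REDIS_HZ is 10 (one serverCron() call every 100 ms):

- run_with_period(1000) runs its block when server.cronloops % (1000/100) == 0, that is, on every 10th call.
- run_with_period(100), used for sentinelTimer(), runs on every call.

The function below restates the macro under a hypothetical name. Unlike the macro, it clamps the divisor to 1, so a period shorter than one cron tick cannot divide by zero.

```c
#include <assert.h>

#define REDIS_HZ 10 /* assumed value for this example */

/* Returns non-zero on the cron iterations where a block with the given
 * period (in milliseconds) should run. */
int shouldRunWithPeriod(int cronloops, int ms) {
    /* serverCron() calls per period, e.g. 1000 / (1000/10) = 10. */
    int loops_per_period = ms / (1000 / REDIS_HZ);

    /* The macro would compute cronloops % 0 here; clamp instead. */
    if (loops_per_period < 1) loops_per_period = 1;
    return cronloops % loops_per_period == 0;
}
```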
/* We can print the stacktrace, so our assert is defined this way: */
#define redisAssertWithInfo(_c,_o,_e) ((_e)?(void)0 : (_redisAssertWithInfo(_c,_o,#_e,__FILE__,__LINE__),_exit(1)))
#define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e,__FILE__,__LINE__),_exit(1)))
dict *dict; /* The keyspace for this DB */
dict *expires; /* Timeout of keys with a timeout set */
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */
+ dict *ready_keys; /* Blocked keys that received a PUSH */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
int id;
} redisDb;
* for BRPOPLPUSH. */
} blockingState;
+/* The following structure represents a node in the server.ready_keys list,
+ * where we accumulate all the keys that had clients blocked with a blocking
+ * operation such as B[LR]POP, but received new data in the context of the
+ * last executed command.
+ *
+ * After the execution of every command or script, we scan this list to check
+ * whether, as a result, we should serve data to blocked clients, unblocking
+ * them. Note that server.ready_keys will not have duplicates, as there is a
+ * dictionary, also called ready_keys, in every structure representing a
+ * Redis database, where we remember whether a given key was already added
+ * to the server.ready_keys list. */
+typedef struct readyList {
+ redisDb *db;
+ robj *key;
+} readyList;
+
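Keeping both a list and a per-database dictionary serves two purposes. The list preserves the order in which keys became ready, and the dictionary makes the "is this key already queued?" check cheap.

The sketch below simplifies that signalling step, using hypothetical names throughout:

- A fixed-size array stands in for server.ready_keys.
- A linear search stands in for the db->ready_keys dictionary, so each push costs O(n) instead of a dictionary lookup.
- An integer database id replaces the redisDb pointer.

```c
#include <assert.h>
#include <string.h>

#define SKETCH_MAX_READY 64 /* arbitrary capacity for this sketch */

/* Hypothetical analogue of struct readyList. */
typedef struct {
    int dbid;
    const char *key;
} readyEntry;

/* Ordered queue standing in for server.ready_keys. */
typedef struct {
    readyEntry items[SKETCH_MAX_READY];
    int len;
} readyQueue;

/* Linear membership test standing in for the db->ready_keys dictionary. */
static int readyQueueHas(const readyQueue *q, int dbid, const char *key) {
    int i;
    for (i = 0; i < q->len; i++)
        if (q->items[i].dbid == dbid && strcmp(q->items[i].key, key) == 0)
            return 1;
    return 0;
}

/* Called when a push targets a key with blocked clients. Returns 1 if the
 * key was queued, 0 if it was already queued, -1 if the queue is full. */
int queueReadyKey(readyQueue *q, int dbid, const char *key) {
    if (readyQueueHas(q, dbid, key)) return 0;
    if (q->len == SKETCH_MAX_READY) return -1;
    q->items[q->len].dbid = dbid;
    q->items[q->len].key = key;
    q->len++;
    return 1;
}
```

After the command or script finishes, a consumer would walk `items[0..len)`, serve the blocked clients for each key, and reset `len` to 0. This is roughly the role handleClientsBlockedOnLists() plays for server.ready_keys.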
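-/* With multiplexing we need to take per-clinet state.
- * Clients are taken in a liked list. */
+/* With multiplexing we need to take per-client state.
+ * Clients are taken in a linked list. */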
typedef struct redisClient {
int repldbfd; /* replication DB file descriptor */
long repldboff; /* replication DB file offset */
off_t repldbsize; /* replication DB file size */
+ int slave_listening_port; /* As configured with: REPLCONF listening-port */
multiState mstate; /* MULTI/EXEC state */
blockingState bpop; /* blocking state */
list *io_keys; /* Keys this client is waiting to be loaded from the
*masterdownerr, *roslaveerr,
*oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk,
*unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *rpop, *lpop,
+ *lpush,
*select[REDIS_SHARED_SELECT_CMDS],
*integers[REDIS_SHARED_INTEGERS],
*mbulkhdr[REDIS_SHARED_BULKHDR_LEN], /* "*<value>\r\n" */
int arch_bits; /* 32 or 64 depending on sizeof(long) */
int cronloops; /* Number of times the cron function run */
char runid[REDIS_RUN_ID_SIZE+1]; /* ID always different at every exec. */
+ int sentinel_mode; /* True if this instance is a Sentinel. */
/* Networking */
int port; /* TCP listening port */
char *bindaddr; /* Bind address or NULL */
off_t loading_loaded_bytes;
time_t loading_start_time;
/* Fast pointers to often looked up command */
- struct redisCommand *delCommand, *multiCommand, *lpushCommand;
+ struct redisCommand *delCommand, *multiCommand, *lpushCommand, *lpopCommand,
+ *rpopCommand;
/* Fields used only for stats */
time_t stat_starttime; /* Server start time */
long long stat_numcommands; /* Number of processed commands */
time_t aof_last_fsync; /* UNIX time of last fsync() */
time_t aof_rewrite_time_last; /* Time used by last AOF rewrite run. */
time_t aof_rewrite_time_start; /* Current AOF rewrite start time. */
+ int aof_lastbgrewrite_status; /* REDIS_OK or REDIS_ERR */
unsigned long aof_delayed_fsync; /* delayed AOF fsync() counter */
/* RDB persistence */
long long dirty; /* Changes to DB from the last save */
char *masterauth; /* AUTH with this password with master */
char *masterhost; /* Hostname of master */
int masterport; /* Port of master */
- int repl_ping_slave_period; /* Master pings the salve every N seconds */
+ int repl_ping_slave_period; /* Master pings the slave every N seconds */
int repl_timeout; /* Timeout after N seconds of master idle */
redisClient *master; /* Client that is master for this slave */
int repl_syncio_timeout; /* Timeout for synchronous I/O calls */
int repl_state; /* Replication status if the instance is a slave */
- off_t repl_transfer_left; /* Bytes left reading .rdb */
+ off_t repl_transfer_size; /* Size of RDB to read from master during sync. */
+ off_t repl_transfer_read; /* Amount of RDB read from master during sync. */
+ off_t repl_transfer_last_fsync_off; /* Offset when we fsync-ed last time. */
int repl_transfer_s; /* Slave -> Master SYNC socket */
int repl_transfer_fd; /* Slave -> Master SYNC temp file descriptor */
char *repl_transfer_tmpfile; /* Slave-> master SYNC temp file name */
int repl_serve_stale_data; /* Serve stale data when link is down? */
int repl_slave_ro; /* Slave is read only? */
time_t repl_down_since; /* Unix time at which link with master went down */
+ int slave_priority; /* Reported in INFO and used by Sentinel. */
/* Limits */
unsigned int maxclients; /* Max number of simultaneous clients */
unsigned long long maxmemory; /* Max number of memory bytes to use */
/* Blocked clients */
unsigned int bpop_blocked_clients; /* Number of clients blocked by lists */
list *unblocked_clients; /* list of clients to unblock before next loop */
+ list *ready_keys; /* List of readyList structures for BLPOP & co */
/* Sort parameters - qsort_r() is only available under BSD so we
* have to take this state global, in order to pass it to sortCompare() */
- int sort_dontsort;
int sort_desc;
int sort_alpha;
int sort_bypattern;
void listTypeDelete(listTypeEntry *entry);
void listTypeConvert(robj *subject, int enc);
void unblockClientWaitingData(redisClient *c);
-int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele);
+void handleClientsBlockedOnLists(void);
void popGenericCommand(redisClient *c, int where);
/* MULTI/EXEC/WATCH... */
int *renameGetKeys(struct redisCommand *cmd,robj **argv, int argc, int *numkeys, int flags);
int *zunionInterGetKeys(struct redisCommand *cmd,robj **argv, int argc, int *numkeys, int flags);
+/* Sentinel */
+void initSentinelConfig(void);
+void initSentinel(void);
+void sentinelTimer(void);
+char *sentinelHandleConfiguration(char **argv, int argc);
+
/* Scripting */
void scriptingInit(void);
void timeCommand(redisClient *c);
void bitopCommand(redisClient *c);
void bitcountCommand(redisClient *c);
+void replconfCommand(redisClient *c);
#if defined(__GNUC__)
void *calloc(size_t count, size_t size) __attribute__ ((deprecated));
void enableWatchdog(int period);
void disableWatchdog(void);
void watchdogScheduleSignal(int period);
+void redisLogHexDump(int level, char *descr, void *value, size_t len);
+
+#define redisDebug(fmt, ...) \
+ printf("DEBUG %s:%d > " fmt "\n", __FILE__, __LINE__, __VA_ARGS__)
+#define redisDebugMark() \
+ printf("-- MARK %s:%d --\n", __FILE__, __LINE__)
+
#endif
#include <sys/time.h>
#include <unistd.h>
#include <fcntl.h>
+#include <sys/socket.h>
#include <sys/stat.h>
/* ---------------------------------- MASTER -------------------------------- */
return;
}
+/* REPLCONF <option> <value> <option> <value> ...
+ * This command is used by a slave in order to configure the replication
+ * process before starting it with the SYNC command.
+ *
+ * Currently the only use of this command is to communicate to the master
+ * what is the listening port of the Slave redis instance, so that the
+ * master can accurately list slaves and their listening ports in
+ * the INFO output.
+ *
+ * In the future the same command can be used in order to configure
+ * the replication to initiate an incremental replication instead of a
+ * full resync. */
+void replconfCommand(redisClient *c) {
+ int j;
+
+ if ((c->argc % 2) == 0) {
+ /* Number of arguments must be odd to make sure that every
+ * option has a corresponding value. */
+ addReply(c,shared.syntaxerr);
+ return;
+ }
+
+ /* Process every option-value pair. */
+ for (j = 1; j < c->argc; j+=2) {
+ if (!strcasecmp(c->argv[j]->ptr,"listening-port")) {
+ long port;
+
+ if ((getLongFromObjectOrReply(c,c->argv[j+1],
+ &port,NULL) != REDIS_OK))
+ return;
+ c->slave_listening_port = port;
+ } else {
+ addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s",
+ (char*)c->argv[j]->ptr);
+ return;
+ }
+ }
+ addReply(c,shared.ok);
+}
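The REPLCONF handler relies on a parity check: `argv[0]` is the command name, so a request made only of complete `<option> <value>` pairs always has an odd `argc`. The standalone sketch below reproduces that validation outside Redis. The result codes and the `parse_replconf()` name are hypothetical, and plain `strtol()` stands in for `getLongFromObjectOrReply()`.

```c
#define _POSIX_C_SOURCE 200809L
#include <stdlib.h>
#include <strings.h> /* strcasecmp() */

/* Hypothetical result codes for this standalone sketch. */
enum {
    REPLCONF_OK = 0,
    REPLCONF_SYNTAX_ERR = -1,   /* an option is missing its value */
    REPLCONF_BAD_VALUE = -2,    /* value is not an integer */
    REPLCONF_UNKNOWN_OPT = -3   /* option name not recognized */
};

/* argv[0] is the command name, followed by <option> <value> pairs,
 * so a well-formed call always has an odd argc. */
int parse_replconf(int argc, const char **argv, long *listening_port) {
    if (argc % 2 == 0) return REPLCONF_SYNTAX_ERR;

    for (int j = 1; j < argc; j += 2) {
        if (strcasecmp(argv[j], "listening-port") == 0) {
            char *end;
            long port = strtol(argv[j+1], &end, 10);
            /* Reject empty strings and trailing garbage, roughly what
             * getLongFromObjectOrReply() does in the real command. */
            if (argv[j+1][0] == '\0' || *end != '\0')
                return REPLCONF_BAD_VALUE;
            *listening_port = port;
        } else {
            return REPLCONF_UNKNOWN_OPT;
        }
    }
    return REPLCONF_OK;
}
```

As in the patch, the first unknown option aborts the whole request, so a master running this code rejects any REPLCONF option it does not know instead of silently ignoring it.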
+
void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
redisClient *slave = privdata;
REDIS_NOTUSED(el);
}
/* Asynchronously read the SYNC payload we receive from a master */
+#define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */
void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
char buf[4096];
ssize_t nread, readlen;
+ off_t left;
REDIS_NOTUSED(el);
REDIS_NOTUSED(privdata);
REDIS_NOTUSED(mask);
- /* If repl_transfer_left == -1 we still have to read the bulk length
+ /* If repl_transfer_size == -1 we still have to read the bulk length
* from the master reply. */
- if (server.repl_transfer_left == -1) {
+ if (server.repl_transfer_size == -1) {
if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) {
redisLog(REDIS_WARNING,
"I/O error reading bulk count from MASTER: %s",
redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
goto error;
}
- server.repl_transfer_left = strtol(buf+1,NULL,10);
+ server.repl_transfer_size = strtol(buf+1,NULL,10);
redisLog(REDIS_NOTICE,
"MASTER <-> SLAVE sync: receiving %ld bytes from master",
- server.repl_transfer_left);
+ server.repl_transfer_size);
return;
}
/* Read bulk data */
- readlen = (server.repl_transfer_left < (signed)sizeof(buf)) ?
- server.repl_transfer_left : (signed)sizeof(buf);
+ left = server.repl_transfer_size - server.repl_transfer_read;
+ readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
nread = read(fd,buf,readlen);
if (nread <= 0) {
redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
redisLog(REDIS_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s", strerror(errno));
goto error;
}
- server.repl_transfer_left -= nread;
+ server.repl_transfer_read += nread;
+
+ /* Sync data to disk from time to time; otherwise, at the end of the
+ * transfer, we may suffer a big delay while all the buffered data is
+ * written to the actual disk at once. */
+ if (server.repl_transfer_read >=
+ server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC)
+ {
+ off_t sync_size = server.repl_transfer_read -
+ server.repl_transfer_last_fsync_off;
+ rdb_fsync_range(server.repl_transfer_fd,
+ server.repl_transfer_last_fsync_off, sync_size);
+ server.repl_transfer_last_fsync_off += sync_size;
+ }
+
/* Check if the transfer is now complete */
- if (server.repl_transfer_left == 0) {
+ if (server.repl_transfer_read == server.repl_transfer_size) {
if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
replicationAbortSyncTransfer();
return;
}
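The incremental flush in `readSyncBulkPayload()` keeps two counters: bytes received (`repl_transfer_read`) and the offset up to which data was already flushed (`repl_transfer_last_fsync_off`). Once the gap reaches 8 MB, the whole unflushed range is synced and the offset advances. The sketch below models only that bookkeeping. The `transferState` struct is a hypothetical trimmed copy, and it counts flushes instead of calling `rdb_fsync_range()`, which is defined elsewhere in the tree and not shown here.

```c
#include <sys/types.h> /* off_t */

/* Same threshold as REPL_MAX_WRITTEN_BEFORE_FSYNC in the patch. */
#define MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */

/* Trimmed, hypothetical copy of the slave-side transfer counters. */
typedef struct {
    off_t read;           /* bytes of payload written to the temp file */
    off_t last_fsync_off; /* everything before this offset was flushed */
    int flushes;          /* flush calls issued, for illustration only */
} transferState;

/* Account for 'nread' more bytes and flush the unflushed range once it
 * reaches the threshold. The real code calls
 * rdb_fsync_range(fd, last_fsync_off, sync_size) at that point. */
void account_written(transferState *t, off_t nread) {
    t->read += nread;
    if (t->read >= t->last_fsync_off + MAX_WRITTEN_BEFORE_FSYNC) {
        off_t sync_size = t->read - t->last_fsync_off;
        t->flushes++;                  /* flush [last_fsync_off, read) */
        t->last_fsync_off += sync_size;
    }
}
```

Receiving a 20 MB payload in 4096-byte reads triggers two flushes (at 8 MB and 16 MB), so after every read less than 8 MB of data is left unflushed when the final rename happens.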
+/* Send a synchronous command to the master. Used to send AUTH and
+ * REPLCONF commands before starting the replication with SYNC.
+ *
+ * On success NULL is returned.
+ * On error an sds string describing the error is returned.
+ */
+char *sendSynchronousCommand(int fd, ...) {
+ va_list ap;
+ sds cmd = sdsempty();
+ char *arg, buf[256];
+
+ /* Create the command to send to the master. We use the inline
+ * protocol for simplicity, as currently we only send simple strings. */
+ va_start(ap,fd);
+ while(1) {
+ arg = va_arg(ap, char*);
+ if (arg == NULL) break;
+
+ if (sdslen(cmd) != 0) cmd = sdscatlen(cmd," ",1);
+ cmd = sdscat(cmd,arg);
+ }
+ cmd = sdscatlen(cmd,"\r\n",2);
+
+ /* Transfer command to the server. */
+ if (syncWrite(fd,cmd,sdslen(cmd),server.repl_syncio_timeout*1000) == -1) {
+ sdsfree(cmd);
+ return sdscatprintf(sdsempty(),"Writing to master: %s",
+ strerror(errno));
+ }
+ sdsfree(cmd);
+
+ /* Read the reply from the server. */
+ if (syncReadLine(fd,buf,sizeof(buf),server.repl_syncio_timeout*1000) == -1)
+ {
+ return sdscatprintf(sdsempty(),"Reading from master: %s",
+ strerror(errno));
+ }
+
+ /* Check for errors from the server. */
+ if (buf[0] != '+') {
+ return sdscatprintf(sdsempty(),"Error from master: %s", buf);
+ }
+
+ return NULL; /* No errors. */
+}
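`sendSynchronousCommand()` turns a NULL-terminated argument list into a single inline-protocol line: arguments joined by spaces and terminated by `\r\n`. The sketch below shows the same joining loop with a fixed `char` buffer instead of sds strings. `build_inline_cmd()` is a hypothetical name, and, like the original, it does no quoting, so it is only suitable for arguments without spaces.

```c
#include <stdarg.h>
#include <stddef.h>
#include <string.h>

/* Join a NULL-terminated list of strings as "ARG1 ARG2 ...\r\n" into buf.
 * Returns the command length, or -1 if buf (capacity cap) is too small.
 * Callers must pass (char *)NULL as the last argument. */
int build_inline_cmd(char *buf, size_t cap, ...) {
    va_list ap;
    size_t len = 0;
    char *arg;

    va_start(ap, cap);
    while ((arg = va_arg(ap, char *)) != NULL) {
        size_t alen = strlen(arg);
        size_t sep = (len != 0) ? 1 : 0;  /* same rule as the sds version */
        if (len + sep + alen + 3 > cap) { /* keep room for "\r\n" + NUL */
            va_end(ap);
            return -1;
        }
        if (sep) buf[len++] = ' ';
        memcpy(buf + len, arg, alen);
        len += alen;
    }
    va_end(ap);

    if (len + 3 > cap) return -1;
    memcpy(buf + len, "\r\n", 3); /* also copies the terminating NUL */
    return (int)(len + 2);
}
```

For example, `build_inline_cmd(b, sizeof(b), "AUTH", "secret", (char *)NULL)` fills `b` with `"AUTH secret\r\n"` and returns 13. The explicit `(char *)NULL` matters because a bare `NULL` may be passed as an integer through `...`.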
+
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
- char buf[1024], tmpfile[256];
+ char tmpfile[256], *err;
int dfd, maxtries = 5;
+ int sockerr = 0;
+ socklen_t errlen = sizeof(sockerr);
REDIS_NOTUSED(el);
REDIS_NOTUSED(privdata);
REDIS_NOTUSED(mask);
return;
}
- redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event.");
- /* This event should only be triggered once since it is used to have a
- * non-blocking connect(2) to the master. It has been triggered when this
- * function is called, so we can delete it. */
- aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
+ /* Check for errors in the socket. */
+ if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1)
+ sockerr = errno;
+ if (sockerr) {
+ aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
+ redisLog(REDIS_WARNING,"Error condition on socket for SYNC: %s",
+ strerror(sockerr));
+ goto error;
+ }
- /* AUTH with the master if required. */
- if(server.masterauth) {
- char authcmd[1024];
- size_t authlen;
+ /* If we were connecting, it's time to send a non blocking PING, we want to
+ * make sure the master is able to reply before going into the actual
+ * replication process where we have long timeouts in the order of
+ * seconds (in the meantime the slave would block). */
+ if (server.repl_state == REDIS_REPL_CONNECTING) {
+ redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event.");
+ /* Delete the writable event so that the readable event remains
+ * registered and we can wait for the PONG reply. */
+ aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
+ server.repl_state = REDIS_REPL_RECEIVE_PONG;
+ /* Send the PING without checking for errors: the replication timeout
+ * will take care of that. */
+ syncWrite(fd,"PING\r\n",6,100);
+ return;
+ }
+
+ /* Receive the PONG command. */
+ if (server.repl_state == REDIS_REPL_RECEIVE_PONG) {
+ char buf[1024];
+
+ /* Delete the readable event: we no longer need it now that the
+ * PING reply is ready to be read. */
+ aeDeleteFileEvent(server.el,fd,AE_READABLE);
- authlen = snprintf(authcmd,sizeof(authcmd),"AUTH %s\r\n",server.masterauth);
- if (syncWrite(fd,authcmd,authlen,server.repl_syncio_timeout*1000) == -1) {
- redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
+ /* Read the reply with explicit timeout. */
+ buf[0] = '\0';
+ if (syncReadLine(fd,buf,sizeof(buf),
+ server.repl_syncio_timeout*1000) == -1)
+ {
+ redisLog(REDIS_WARNING,
+ "I/O error reading PING reply from master: %s",
strerror(errno));
goto error;
}
- /* Read the AUTH result. */
- if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) {
- redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
- strerror(errno));
+
+ /* We don't care about the reply content: it can be +PONG, or an
+ * error if the server requires AUTH. As long as it replies with a
+ * valid status or error line, it's fine from our point of view. */
+ if (buf[0] != '-' && buf[0] != '+') {
+ redisLog(REDIS_WARNING,"Unexpected reply to PING from master.");
goto error;
+ } else {
+ redisLog(REDIS_NOTICE,
+ "Master replied to PING, replication can continue...");
}
- if (buf[0] != '+') {
- redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?");
+ }
+
+ /* AUTH with the master if required. */
+ if(server.masterauth) {
+ err = sendSynchronousCommand(fd,"AUTH",server.masterauth,NULL);
+ if (err) {
+ redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",err);
+ sdsfree(err);
goto error;
}
}
+ /* Set the slave port, so that Master's INFO command can list the
+ * slave listening port correctly. */
+ {
+ sds port = sdsfromlonglong(server.port);
+ err = sendSynchronousCommand(fd,"REPLCONF","listening-port",port,
+ NULL);
+ sdsfree(port);
+ /* Ignore the error if any, not all the Redis versions support
+ * REPLCONF listening-port. */
+ if (err) {
+ redisLog(REDIS_NOTICE,"(non critical): Master does not understand REPLCONF listening-port: %s", err);
+ sdsfree(err);
+ }
+ }
+
/* Issue the SYNC command */
if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
}
server.repl_state = REDIS_REPL_TRANSFER;
- server.repl_transfer_left = -1;
+ server.repl_transfer_size = -1;
+ server.repl_transfer_read = 0;
+ server.repl_transfer_last_fsync_off = 0;
server.repl_transfer_fd = dfd;
server.repl_transfer_lastio = server.unixtime;
server.repl_transfer_tmpfile = zstrdup(tmpfile);
return;
error:
- server.repl_state = REDIS_REPL_CONNECT;
close(fd);
+ server.repl_transfer_s = -1;
+ server.repl_state = REDIS_REPL_CONNECT;
return;
}
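With this patch `syncWithMaster()` becomes a small state machine: the connect event moves the slave from `REDIS_REPL_CONNECTING` to `REDIS_REPL_RECEIVE_PONG` after sending PING, and the first reply line decides whether the handshake continues. The sketch below models only those transitions. The enum names are hypothetical mirrors of the `REDIS_REPL_*` states, and it collapses the AUTH, REPLCONF and SYNC steps (any of which can also fail back to the connect state in the real code) into a single move to the transfer state.

```c
#include <stddef.h>

/* Hypothetical mirror of the slave-side handshake states. */
typedef enum {
    REPL_CONNECT,       /* must (re)connect to the master */
    REPL_CONNECTING,    /* non blocking connect in progress */
    REPL_RECEIVE_PONG,  /* PING sent, waiting for the first reply line */
    REPL_TRANSFER       /* AUTH/REPLCONF/SYNC sent, receiving the RDB */
} replState;

/* Next state given the current one and, in REPL_RECEIVE_PONG, the
 * first reply line. Anything that is neither a status (+) nor an
 * error (-) reply takes the 'goto error' path back to REPL_CONNECT. */
replState repl_next_state(replState s, const char *reply) {
    switch (s) {
    case REPL_CONNECTING:
        return REPL_RECEIVE_PONG;
    case REPL_RECEIVE_PONG:
        if (reply != NULL && (reply[0] == '+' || reply[0] == '-'))
            return REPL_TRANSFER;
        return REPL_CONNECT;
    default:
        return s;
    }
}
```

Accepting `-` replies is deliberate: a master that requires AUTH answers PING with an error, which still proves it is alive and able to reply.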
void undoConnectWithMaster(void) {
int fd = server.repl_transfer_s;
- redisAssert(server.repl_state == REDIS_REPL_CONNECTING);
+ redisAssert(server.repl_state == REDIS_REPL_CONNECTING ||
+ server.repl_state == REDIS_REPL_RECEIVE_PONG);
aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
close(fd);
server.repl_transfer_s = -1;
if (server.master) freeClient(server.master);
if (server.repl_state == REDIS_REPL_TRANSFER)
replicationAbortSyncTransfer();
- else if (server.repl_state == REDIS_REPL_CONNECTING)
+ else if (server.repl_state == REDIS_REPL_CONNECTING ||
+ server.repl_state == REDIS_REPL_RECEIVE_PONG)
undoConnectWithMaster();
server.repl_state = REDIS_REPL_NONE;
redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
void replicationCron(void) {
/* Non blocking connection timeout? */
- if (server.masterhost && server.repl_state == REDIS_REPL_CONNECTING &&
+ if (server.masterhost &&
+ (server.repl_state == REDIS_REPL_CONNECTING ||
+ server.repl_state == REDIS_REPL_RECEIVE_PONG) &&
(time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
{
redisLog(REDIS_WARNING,"Timeout connecting to the MASTER...");
if (server.masterhost && server.repl_state == REDIS_REPL_TRANSFER &&
(time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
{
- redisLog(REDIS_WARNING,"Timeout receiving bulk data from MASTER...");
+ redisLog(REDIS_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value.");
replicationAbortSyncTransfer();
}
* So slaves can implement an explicit timeout to masters, and will
* be able to detect a link disconnection even if the TCP connection
* will not actually go down. */
- if (!(server.cronloops % (server.repl_ping_slave_period*10))) {
+ if (!(server.cronloops % (server.repl_ping_slave_period * REDIS_HZ))) {
listIter li;
listNode *ln;
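The master-to-slave PING schedule is driven by the cron loop counter, so the modulus must be `period * loops_per_second`. The old code hardcoded a factor of 10, which is only right when the cron runs 10 times per second. The sketch below counts pings over a time window, assuming the period is in seconds and `REDIS_HZ` is the number of cron iterations per second. `ping_due()` and `pings_in()` are hypothetical helpers.

```c
/* A ping is due when the loop counter is a multiple of period*factor.
 * The patch uses factor = REDIS_HZ; the old code hardcoded factor = 10. */
int ping_due(long long cronloops, int period_sec, int factor) {
    return cronloops % ((long long)period_sec * factor) == 0;
}

/* Count pings sent during 'seconds' of wall-clock time when the cron
 * actually runs 'hz' times per second. */
int pings_in(int seconds, int hz, int period_sec, int factor) {
    int n = 0;
    for (long long loops = 0; loops < (long long)seconds * hz; loops++)
        if (ping_due(loops, period_sec, factor)) n++;
    return n;
}
```

With a 10 second period and a cron running 100 times per second, using the real rate gives 10 pings in 100 seconds, while the hardcoded factor of 10 would send 100 (one per second).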
redisClient *c = server.lua_client;
sds reply;
+ /* Require at least one argument */
+ if (argc == 0) {
+ luaPushError(lua,
+ "Please specify at least one argument for redis.call()");
+ return 1;
+ }
+
/* Build the arguments vector */
argv = zmalloc(sizeof(robj*)*argc);
for (j = 0; j < argc; j++) {
* reply as expected. */
if ((cmd->flags & REDIS_CMD_SORT_FOR_SCRIPT) &&
(reply[0] == '*' && reply[1] != '-')) {
- /* Skip this step if command is SORT but output was already sorted */
- if (cmd->proc != sortCommand || server.sort_dontsort)
luaSortArray(lua);
}
sdsfree(reply);
+ c->reply_bytes = 0;
cleanup:
/* Clean up. Command code may have changed argv/argc so we use the
return 1;
}
+/* Returns a table with a single field 'field' set to the string value
+ * passed as argument. This helper function is handy when returning
+ * a Redis Protocol error or status reply from Lua:
+ *
+ * return redis.error_reply("ERR Some Error")
+ * return redis.status_reply("OK")
+ */
+int luaRedisReturnSingleFieldTable(lua_State *lua, char *field) {
+ if (lua_gettop(lua) != 1 || lua_type(lua,-1) != LUA_TSTRING) {
+ luaPushError(lua, "wrong number or type of arguments");
+ return 1;
+ }
+
+ lua_newtable(lua);
+ lua_pushstring(lua, field);
+ lua_pushvalue(lua, -3);
+ lua_settable(lua, -3);
+ return 1;
+}
+
+int luaRedisErrorReplyCommand(lua_State *lua) {
+ return luaRedisReturnSingleFieldTable(lua,"err");
+}
+
+int luaRedisStatusReplyCommand(lua_State *lua) {
+ return luaRedisReturnSingleFieldTable(lua,"ok");
+}
+
int luaLogCommand(lua_State *lua) {
int j, argc = lua_gettop(lua);
int level;
lua_pushcfunction(lua, luaRedisSha1hexCommand);
lua_settable(lua, -3);
+ /* redis.error_reply and redis.status_reply */
+ lua_pushstring(lua, "error_reply");
+ lua_pushcfunction(lua, luaRedisErrorReplyCommand);
+ lua_settable(lua, -3);
+ lua_pushstring(lua, "status_reply");
+ lua_pushcfunction(lua, luaRedisStatusReplyCommand);
+ lua_settable(lua, -3);
+
/* Finally set the table as 'redis' global var. */
lua_setglobal(lua,"redis");
return s;
}
-sds sdscatlen(sds s, void *t, size_t len) {
+sds sdscatlen(sds s, const void *t, size_t len) {
struct sdshdr *sh;
size_t curlen = sdslen(s);
return s;
}
-sds sdscat(sds s, char *t) {
+sds sdscat(sds s, const char *t) {
return sdscatlen(s, t, strlen(t));
}
-sds sdscatsds(sds s, sds t) {
+sds sdscatsds(sds s, const sds t) {
return sdscatlen(s, t, sdslen(t));
}
-sds sdscpylen(sds s, char *t, size_t len) {
+sds sdscpylen(sds s, const char *t, size_t len) {
struct sdshdr *sh = (void*) (s-(sizeof(struct sdshdr)));
size_t totlen = sh->free+sh->len;
return s;
}
-sds sdscpy(sds s, char *t) {
+sds sdscpy(sds s, const char *t) {
return sdscpylen(s, t, strlen(t));
}
for (j = 0; j < len; j++) s[j] = toupper(s[j]);
}
-int sdscmp(sds s1, sds s2) {
+int sdscmp(const sds s1, const sds s2) {
size_t l1, l2, minlen;
int cmp;
* requires length arguments. sdssplit() is just the
* same function but for zero-terminated strings.
*/
-sds *sdssplitlen(char *s, int len, char *sep, int seplen, int *count) {
+sds *sdssplitlen(const char *s, int len, const char *sep, int seplen, int *count) {
int elements = 0, slots = 5, start = 0, j;
sds *tokens;
return sdsnewlen(p,32-(p-buf));
}
-sds sdscatrepr(sds s, char *p, size_t len) {
+sds sdscatrepr(sds s, const char *p, size_t len) {
s = sdscatlen(s,"\"",1);
while(len--) {
switch(*p) {
* Note that sdscatrepr() is able to convert back a string into
* a quoted string in the same format sdssplitargs() is able to parse.
*/
-sds *sdssplitargs(char *line, int *argc) {
- char *p = line;
+sds *sdssplitargs(const char *line, int *argc) {
+ const char *p = line;
char *current = NULL;
char **vector = NULL;
*
* The function returns the sds string pointer, that is always the same
* as the input pointer since no resize is needed. */
-sds sdsmapchars(sds s, char *from, char *to, size_t setlen) {
+sds sdsmapchars(sds s, const char *from, const char *to, size_t setlen) {
size_t j, i, l = sdslen(s);
for (j = 0; j < l; j++) {
size_t sdslen(const sds s);
sds sdsdup(const sds s);
void sdsfree(sds s);
-size_t sdsavail(sds s);
+size_t sdsavail(const sds s);
sds sdsgrowzero(sds s, size_t len);
-sds sdscatlen(sds s, void *t, size_t len);
-sds sdscat(sds s, char *t);
-sds sdscatsds(sds s, sds t);
-sds sdscpylen(sds s, char *t, size_t len);
-sds sdscpy(sds s, char *t);
+sds sdscatlen(sds s, const void *t, size_t len);
+sds sdscat(sds s, const char *t);
+sds sdscatsds(sds s, const sds t);
+sds sdscpylen(sds s, const char *t, size_t len);
+sds sdscpy(sds s, const char *t);
sds sdscatvprintf(sds s, const char *fmt, va_list ap);
#ifdef __GNUC__
sds sdsrange(sds s, int start, int end);
void sdsupdatelen(sds s);
void sdsclear(sds s);
-int sdscmp(sds s1, sds s2);
-sds *sdssplitlen(char *s, int len, char *sep, int seplen, int *count);
+int sdscmp(const sds s1, const sds s2);
+sds *sdssplitlen(const char *s, int len, const char *sep, int seplen, int *count);
void sdsfreesplitres(sds *tokens, int count);
void sdstolower(sds s);
void sdstoupper(sds s);
sds sdsfromlonglong(long long value);
-sds sdscatrepr(sds s, char *p, size_t len);
-sds *sdssplitargs(char *line, int *argc);
+sds sdscatrepr(sds s, const char *p, size_t len);
+sds *sdssplitargs(const char *line, int *argc);
void sdssplitargs_free(sds *argv, int argc);
-sds sdsmapchars(sds s, char *from, char *to, size_t setlen);
+sds sdsmapchars(sds s, const char *from, const char *to, size_t setlen);
/* Low level functions exposed to the user API */
sds sdsMakeRoomFor(sds s, size_t addlen);
--- /dev/null
+/* Redis Sentinel implementation
+ * -----------------------------
+ *
+ * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of Redis nor the names of its contributors may be used
+ * to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "redis.h"
+#include "hiredis.h"
+#include "async.h"
+
+#include <ctype.h>
+#include <arpa/inet.h>
+#include <sys/socket.h>
+#include <sys/wait.h>
+
+extern char **environ;
+
+#define REDIS_SENTINEL_PORT 26379
+
+/* ======================== Sentinel global state =========================== */
+
+typedef long long mstime_t; /* millisecond time type. */
+
+/* Address object, used to describe an ip:port pair. */
+typedef struct sentinelAddr {
+ char *ip;
+ int port;
+} sentinelAddr;
+
+/* Flags of a Redis instance monitored by Sentinel. */
+#define SRI_MASTER (1<<0)
+#define SRI_SLAVE (1<<1)
+#define SRI_SENTINEL (1<<2)
+#define SRI_DISCONNECTED (1<<3)
+#define SRI_S_DOWN (1<<4) /* Subjectively down (no quorum). */
+#define SRI_O_DOWN (1<<5) /* Objectively down (quorum reached). */
+#define SRI_MASTER_DOWN (1<<6) /* A Sentinel with this flag set thinks that
+ its master is down. */
+/* SRI_CAN_FAILOVER, when set in an SRI_MASTER instance, means that we are
+ * allowed to perform the failover for this master.
+ * When set in an SRI_SENTINEL instance, it means that the sentinel is
+ * allowed to perform the failover on its master. */
+#define SRI_CAN_FAILOVER (1<<7)
+#define SRI_FAILOVER_IN_PROGRESS (1<<8) /* Failover is in progress for
+ this master. */
+#define SRI_I_AM_THE_LEADER (1<<9) /* We are the leader for this master. */
+#define SRI_PROMOTED (1<<10) /* Slave selected for promotion. */
+#define SRI_RECONF_SENT (1<<11) /* SLAVEOF <newmaster> sent. */
+#define SRI_RECONF_INPROG (1<<12) /* Slave synchronization in progress. */
+#define SRI_RECONF_DONE (1<<13) /* Slave synchronized with new master. */
+#define SRI_FORCE_FAILOVER (1<<14) /* Force failover with master up. */
+#define SRI_SCRIPT_KILL_SENT (1<<15) /* SCRIPT KILL already sent on -BUSY */
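The `SRI_*` values are independent bits in one `flags` integer, so an instance can carry several at once, for example a master that is both subjectively down (`SRI_S_DOWN`, this Sentinel's own opinion) and objectively down (`SRI_O_DOWN`, quorum reached). The sketch below decodes a subset of the bits into a readable list. `format_flags()` and the lowercase names are hypothetical, chosen for this example.

```c
#include <stddef.h>
#include <string.h>

/* Subset of the SRI_* bits above, copied for a standalone example. */
#define SRI_MASTER   (1<<0)
#define SRI_SLAVE    (1<<1)
#define SRI_SENTINEL (1<<2)
#define SRI_S_DOWN   (1<<4)
#define SRI_O_DOWN   (1<<5)

/* Write a comma separated list of the set flags into buf (cap >= 1). */
void format_flags(int flags, char *buf, size_t cap) {
    static const struct { int bit; const char *name; } names[] = {
        {SRI_MASTER, "master"}, {SRI_SLAVE, "slave"},
        {SRI_SENTINEL, "sentinel"}, {SRI_S_DOWN, "s_down"},
        {SRI_O_DOWN, "o_down"},
    };

    buf[0] = '\0';
    for (size_t i = 0; i < sizeof(names) / sizeof(names[0]); i++) {
        if (flags & names[i].bit) {
            if (buf[0] != '\0') strncat(buf, ",", cap - strlen(buf) - 1);
            strncat(buf, names[i].name, cap - strlen(buf) - 1);
        }
    }
}
```

Testing a single condition stays a plain mask check such as `(ri->flags & SRI_O_DOWN)`; decoding the whole word is mainly useful for logging.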
+
+#define SENTINEL_INFO_PERIOD 10000
+#define SENTINEL_PING_PERIOD 1000
+#define SENTINEL_ASK_PERIOD 1000
+#define SENTINEL_PUBLISH_PERIOD 5000
+#define SENTINEL_DOWN_AFTER_PERIOD 30000
+#define SENTINEL_HELLO_CHANNEL "__sentinel__:hello"
+#define SENTINEL_TILT_TRIGGER 2000
+#define SENTINEL_TILT_PERIOD (SENTINEL_PING_PERIOD*30)
+#define SENTINEL_DEFAULT_SLAVE_PRIORITY 100
+#define SENTINEL_PROMOTION_RETRY_PERIOD 30000
+#define SENTINEL_SLAVE_RECONF_RETRY_PERIOD 10000
+#define SENTINEL_DEFAULT_PARALLEL_SYNCS 1
+#define SENTINEL_MIN_LINK_RECONNECT_PERIOD 15000
+#define SENTINEL_DEFAULT_FAILOVER_TIMEOUT (60*15*1000)
+#define SENTINEL_MAX_PENDING_COMMANDS 100
+#define SENTINEL_EXTENDED_SDOWN_MULTIPLIER 10
+
+/* For how many milliseconds is a piece of information considered valid?
+ * This applies, for instance, to replies to SENTINEL IS-MASTER-DOWN-BY-ADDR. */
+#define SENTINEL_INFO_VALIDITY_TIME 5000
+#define SENTINEL_FAILOVER_FIXED_DELAY 5000
+#define SENTINEL_FAILOVER_MAX_RANDOM_DELAY 10000
+
+/* Failover machine different states. */
+#define SENTINEL_FAILOVER_STATE_NONE 0 /* No failover in progress. */
+#define SENTINEL_FAILOVER_STATE_WAIT_START 1 /* Wait for failover_start_time*/
+#define SENTINEL_FAILOVER_STATE_SELECT_SLAVE 2 /* Select slave to promote */
+#define SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE 3 /* Slave -> Master */
+#define SENTINEL_FAILOVER_STATE_WAIT_PROMOTION 4 /* Wait slave to change role */
+#define SENTINEL_FAILOVER_STATE_RECONF_SLAVES 5 /* SLAVEOF newmaster */
+#define SENTINEL_FAILOVER_STATE_WAIT_NEXT_SLAVE 6 /* wait replication */
+#define SENTINEL_FAILOVER_STATE_ALERT_CLIENTS 7 /* Run user script. */
+#define SENTINEL_FAILOVER_STATE_WAIT_ALERT_SCRIPT 8 /* Wait script exec. */
+#define SENTINEL_FAILOVER_STATE_DETECT_END 9 /* Check for failover end. */
+#define SENTINEL_FAILOVER_STATE_UPDATE_CONFIG 10 /* Monitor promoted slave. */
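Because the `SENTINEL_FAILOVER_STATE_*` values are consecutive integers from 0 to 10, a lookup table is enough to turn `failover_state` into a readable name (for logs, say) and back. The sketch below assumes exactly that numbering; the strings and both function names are hypothetical.

```c
#include <string.h>

/* Names indexed by SENTINEL_FAILOVER_STATE_* value (0..10). */
static const char *failover_state_names[] = {
    "none", "wait_start", "select_slave", "send_slaveof_noone",
    "wait_promotion", "reconf_slaves", "wait_next_slave",
    "alert_clients", "wait_alert_script", "detect_end", "update_config"
};

#define FAILOVER_STATE_COUNT \
    ((int)(sizeof(failover_state_names) / sizeof(failover_state_names[0])))

/* State number -> name, "unknown" for out of range values. */
const char *failover_state_name(int state) {
    if (state < 0 || state >= FAILOVER_STATE_COUNT) return "unknown";
    return failover_state_names[state];
}

/* Name -> state number, -1 if the name is not recognized. */
int failover_state_from_name(const char *name) {
    for (int i = 0; i < FAILOVER_STATE_COUNT; i++)
        if (strcmp(name, failover_state_names[i]) == 0) return i;
    return -1;
}
```

The table must be kept in the same order as the defines; inserting a new state in the middle would silently shift every later name.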
+
+#define SENTINEL_MASTER_LINK_STATUS_UP 0
+#define SENTINEL_MASTER_LINK_STATUS_DOWN 1
+
+/* Generic flags that can be used with different functions. */
+#define SENTINEL_NO_FLAGS 0
+#define SENTINEL_GENERATE_EVENT 1
+#define SENTINEL_LEADER 2
+#define SENTINEL_OBSERVER 4
+
+/* Script execution flags and limits. */
+#define SENTINEL_SCRIPT_NONE 0
+#define SENTINEL_SCRIPT_RUNNING 1
+#define SENTINEL_SCRIPT_MAX_QUEUE 256
+#define SENTINEL_SCRIPT_MAX_RUNNING 16
+#define SENTINEL_SCRIPT_MAX_RUNTIME 60000 /* 60 seconds max exec time. */
+#define SENTINEL_SCRIPT_MAX_RETRY 10
+#define SENTINEL_SCRIPT_RETRY_DELAY 30000 /* 30 seconds between retries. */
+
+typedef struct sentinelRedisInstance {
+ int flags; /* See SRI_... defines */
+ char *name; /* Master name from the point of view of this sentinel. */
+ char *runid; /* run ID of this instance. */
+ sentinelAddr *addr; /* Instance address (ip:port). */
+ redisAsyncContext *cc; /* Hiredis context for commands. */
+ redisAsyncContext *pc; /* Hiredis context for Pub / Sub. */
+ int pending_commands; /* Number of commands sent waiting for a reply. */
+ mstime_t cc_conn_time; /* cc connection time. */
+ mstime_t pc_conn_time; /* pc connection time. */
+ mstime_t pc_last_activity; /* Last time we received any message. */
+ mstime_t last_avail_time; /* Last time the instance replied to ping with
+ a reply we consider valid. */
+ mstime_t last_pong_time; /* Last time the instance replied to ping,
+ whatever the reply was. That's used to check
+ if the link is idle and must be reconnected. */
+ mstime_t last_pub_time; /* Last time we sent hello via Pub/Sub. */
+ mstime_t last_hello_time; /* Only used if SRI_SENTINEL is set. Last time
+ we received a hello from this Sentinel
+ via Pub/Sub. */
+ mstime_t last_master_down_reply_time; /* Time of last reply to
+ SENTINEL is-master-down command. */
+ mstime_t s_down_since_time; /* Subjectively down since time. */
+ mstime_t o_down_since_time; /* Objectively down since time. */
+ mstime_t down_after_period; /* Consider it down after that period. */
+ mstime_t info_refresh; /* Time at which we received INFO output from it. */
+
+ /* Master specific. */
+ dict *sentinels; /* Other sentinels monitoring the same master. */
+ dict *slaves; /* Slaves for this master instance. */
+ int quorum; /* Number of sentinels that need to agree on failure. */
+ int parallel_syncs; /* How many slaves to reconfigure at same time. */
+ char *auth_pass; /* Password to use for AUTH against master & slaves. */
+
+ /* Slave specific. */
+ mstime_t master_link_down_time; /* Slave replication link down time. */
+ int slave_priority; /* Slave priority according to its INFO output. */
+ mstime_t slave_reconf_sent_time; /* Time at which we sent SLAVEOF <new> */
+ struct sentinelRedisInstance *master; /* Master instance if SRI_SLAVE is set. */
+ char *slave_master_host; /* Master host as reported by INFO */
+ int slave_master_port; /* Master port as reported by INFO */
+ int slave_master_link_status; /* Master link status as reported by INFO */
+ /* Failover */
+ char *leader; /* If this is a master instance, this is the runid of
+ the Sentinel that should perform the failover. If
+ this is a Sentinel, this is the runid of the Sentinel
+ that this other Sentinel is voting as leader.
+ This field is valid only if SRI_MASTER_DOWN is
+ set on the Sentinel instance. */
+ int failover_state; /* See SENTINEL_FAILOVER_STATE_* defines. */
+ mstime_t failover_state_change_time;
+ mstime_t failover_start_time; /* When to start to failover if leader. */
+ mstime_t failover_timeout; /* Max time to refresh failover state. */
+ struct sentinelRedisInstance *promoted_slave; /* Promoted slave instance. */
+ /* Scripts executed to notify admin or reconfigure clients: when they
+ * are set to NULL no script is executed. */
+ char *notification_script;
+ char *client_reconfig_script;
+} sentinelRedisInstance;
+
+/* Main state. */
+struct sentinelState {
+ dict *masters; /* Dictionary of master sentinelRedisInstances.
+ Key is the instance name, value is the
+ sentinelRedisInstance structure pointer. */
+ int tilt; /* Are we in TILT mode? */
+ int running_scripts; /* Number of scripts in execution right now. */
+ mstime_t tilt_start_time; /* When TILT started. */
+ mstime_t previous_time; /* Time last time we ran the time handler. */
+ list *scripts_queue; /* Queue of user scripts to execute. */
+} sentinel;
+
+/* A script execution job. */
+typedef struct sentinelScriptJob {
+ int flags; /* Script job flags: SENTINEL_SCRIPT_* */
+ int retry_num; /* Number of times we tried to execute it. */
+ char **argv; /* Arguments to call the script. */
+ mstime_t start_time; /* Script execution time if the script is running,
+ otherwise 0 if we are allowed to retry the
+ execution at any time. If the script is not
+ running and it's not 0, it means: do not run
+ before the specified time. */
+ pid_t pid; /* Script execution pid. */
+} sentinelScriptJob;
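The comment on `start_time` packs three meanings into one field: execution time while running, "any time" when 0, and "not before this time" otherwise. The sketch below encodes that reading and pairs it with a retry policy built from `SENTINEL_SCRIPT_MAX_RETRY` and `SENTINEL_SCRIPT_RETRY_DELAY`. The trimmed `scriptJob` struct and both helpers are hypothetical; in particular, the exact retry rule (drop once `retry_num` reaches the maximum) is an assumption, not taken from this file.

```c
typedef long long mstime_t;

#define SENTINEL_SCRIPT_NONE 0
#define SENTINEL_SCRIPT_RUNNING 1
#define SENTINEL_SCRIPT_MAX_RETRY 10
#define SENTINEL_SCRIPT_RETRY_DELAY 30000 /* ms */

/* Trimmed, hypothetical copy of sentinelScriptJob. */
typedef struct {
    int flags;           /* SENTINEL_SCRIPT_* */
    int retry_num;       /* attempts so far */
    mstime_t start_time; /* see the comment in sentinelScriptJob */
} scriptJob;

/* A job may start if it is not running and either has no delay
 * (start_time == 0) or its "not before" time has passed. */
int script_job_can_start(const scriptJob *j, mstime_t now) {
    if (j->flags & SENTINEL_SCRIPT_RUNNING) return 0;
    return j->start_time == 0 || now >= j->start_time;
}

/* Assumed policy after a failed run: returns 0 if the job should be
 * dropped, otherwise delays the next attempt by the retry period. */
int script_job_schedule_retry(scriptJob *j, mstime_t now) {
    j->flags &= ~SENTINEL_SCRIPT_RUNNING;
    if (++j->retry_num >= SENTINEL_SCRIPT_MAX_RETRY) return 0;
    j->start_time = now + SENTINEL_SCRIPT_RETRY_DELAY;
    return 1;
}
```

Reusing one field keeps the job struct small, at the cost of having to check `flags` before interpreting `start_time`.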
+
+/* ======================= hiredis ae.c adapters =============================
+ * Note: this implementation is taken from hiredis/adapters/ae.h, however
+ * we have our modified copy for Sentinel in order to use our allocator
+ * and to have full control over how the adapter works. */
+
+typedef struct redisAeEvents {
+ redisAsyncContext *context;
+ aeEventLoop *loop;
+ int fd;
+ int reading, writing;
+} redisAeEvents;
+
+static void redisAeReadEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
+ ((void)el); ((void)fd); ((void)mask);
+
+ redisAeEvents *e = (redisAeEvents*)privdata;
+ redisAsyncHandleRead(e->context);
+}
+
+static void redisAeWriteEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
+ ((void)el); ((void)fd); ((void)mask);
+
+ redisAeEvents *e = (redisAeEvents*)privdata;
+ redisAsyncHandleWrite(e->context);
+}
+
+static void redisAeAddRead(void *privdata) {
+ redisAeEvents *e = (redisAeEvents*)privdata;
+ aeEventLoop *loop = e->loop;
+ if (!e->reading) {
+ e->reading = 1;
+ aeCreateFileEvent(loop,e->fd,AE_READABLE,redisAeReadEvent,e);
+ }
+}
+
+static void redisAeDelRead(void *privdata) {
+ redisAeEvents *e = (redisAeEvents*)privdata;
+ aeEventLoop *loop = e->loop;
+ if (e->reading) {
+ e->reading = 0;
+ aeDeleteFileEvent(loop,e->fd,AE_READABLE);
+ }
+}
+
+static void redisAeAddWrite(void *privdata) {
+ redisAeEvents *e = (redisAeEvents*)privdata;
+ aeEventLoop *loop = e->loop;
+ if (!e->writing) {
+ e->writing = 1;
+ aeCreateFileEvent(loop,e->fd,AE_WRITABLE,redisAeWriteEvent,e);
+ }
+}
+
+static void redisAeDelWrite(void *privdata) {
+ redisAeEvents *e = (redisAeEvents*)privdata;
+ aeEventLoop *loop = e->loop;
+ if (e->writing) {
+ e->writing = 0;
+ aeDeleteFileEvent(loop,e->fd,AE_WRITABLE);
+ }
+}
+
+static void redisAeCleanup(void *privdata) {
+ redisAeEvents *e = (redisAeEvents*)privdata;
+ redisAeDelRead(privdata);
+ redisAeDelWrite(privdata);
+ zfree(e);
+}
+
+static int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac) {
+ redisContext *c = &(ac->c);
+ redisAeEvents *e;
+
+ /* Nothing should be attached when something is already attached */
+ if (ac->ev.data != NULL)
+ return REDIS_ERR;
+
+ /* Create container for context and r/w events */
+ e = (redisAeEvents*)zmalloc(sizeof(*e));
+ e->context = ac;
+ e->loop = loop;
+ e->fd = c->fd;
+ e->reading = e->writing = 0;
+
+ /* Register functions to start/stop listening for events */
+ ac->ev.addRead = redisAeAddRead;
+ ac->ev.delRead = redisAeDelRead;
+ ac->ev.addWrite = redisAeAddWrite;
+ ac->ev.delWrite = redisAeDelWrite;
+ ac->ev.cleanup = redisAeCleanup;
+ ac->ev.data = e;
+
+ return REDIS_OK;
+}
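The `reading` and `writing` fields make the adapter callbacks idempotent: asking twice for the same event registers it once, and deleting an event that is not registered is a no-op, so hiredis can call these hooks freely. The sketch below isolates that guard. The `fakeLoop` and `fakeEvents` types are hypothetical stand-ins that count registrations instead of calling `aeCreateFileEvent()` and `aeDeleteFileEvent()`.

```c
/* Hypothetical stand-in for the ae loop: counts read handlers. */
typedef struct { int read_handlers; } fakeLoop;

typedef struct {
    fakeLoop *loop;
    int reading;   /* same role as redisAeEvents.reading */
} fakeEvents;

/* Like redisAeAddRead(): register only on the first request. */
static void fake_add_read(fakeEvents *e) {
    if (!e->reading) {
        e->reading = 1;
        e->loop->read_handlers++;
    }
}

/* Like redisAeDelRead(): unregister only if currently registered. */
static void fake_del_read(fakeEvents *e) {
    if (e->reading) {
        e->reading = 0;
        e->loop->read_handlers--;
    }
}
```

The write side (`redisAeAddWrite()` / `redisAeDelWrite()`) follows the same pattern with the `writing` field, which is also why `redisAeCleanup()` can call both delete functions unconditionally.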
+
+/* ============================= Prototypes ================================= */
+
+void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status);
+void sentinelDisconnectCallback(const redisAsyncContext *c, int status);
+void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata);
+sentinelRedisInstance *sentinelGetMasterByName(char *name);
+char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master);
+char *sentinelGetObjectiveLeader(sentinelRedisInstance *master);
+int yesnotoi(char *s);
+void sentinelDisconnectInstanceFromContext(const redisAsyncContext *c);
+void sentinelKillLink(sentinelRedisInstance *ri, redisAsyncContext *c);
+const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri);
+void sentinelAbortFailover(sentinelRedisInstance *ri);
+void sentinelEvent(int level, char *type, sentinelRedisInstance *ri, const char *fmt, ...);
+sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master);
+void sentinelScheduleScriptExecution(char *path, ...);
+void sentinelStartFailover(sentinelRedisInstance *master, int state);
+void sentinelDiscardReplyCallback(redisAsyncContext *c, void *reply, void *privdata);
+
+/* ========================= Dictionary types =============================== */
+
+unsigned int dictSdsHash(const void *key);
+int dictSdsKeyCompare(void *privdata, const void *key1, const void *key2);
+void releaseSentinelRedisInstance(sentinelRedisInstance *ri);
+
+void dictInstancesValDestructor (void *privdata, void *obj) {
+ releaseSentinelRedisInstance(obj);
+}
+
+/* Instance name (sds) -> instance (sentinelRedisInstance pointer)
+ *
+ * also used for: sentinelRedisInstance->sentinels dictionary that maps
+ * sentinels ip:port to last seen time in Pub/Sub hello message. */
+dictType instancesDictType = {
+ dictSdsHash, /* hash function */
+ NULL, /* key dup */
+ NULL, /* val dup */
+ dictSdsKeyCompare, /* key compare */
+ NULL, /* key destructor */
+ dictInstancesValDestructor /* val destructor */
+};
+
+/* Instance runid (sds) -> votes (long casted to void*)
+ *
+ * This is used by sentinelGetObjectiveLeader() in order to count the votes
+ * and understand who the leader is. */
+dictType leaderVotesDictType = {
+ dictSdsHash, /* hash function */
+ NULL, /* key dup */
+ NULL, /* val dup */
+ dictSdsKeyCompare, /* key compare */
+ NULL, /* key destructor */
+ NULL /* val destructor */
+};
+
+/* =========================== Initialization =============================== */
+
+void sentinelCommand(redisClient *c);
+void sentinelInfoCommand(redisClient *c);
+
+struct redisCommand sentinelcmds[] = {
+ {"ping",pingCommand,1,"",0,NULL,0,0,0,0,0},
+ {"sentinel",sentinelCommand,-2,"",0,NULL,0,0,0,0,0},
+ {"subscribe",subscribeCommand,-2,"",0,NULL,0,0,0,0,0},
+ {"unsubscribe",unsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
+ {"psubscribe",psubscribeCommand,-2,"",0,NULL,0,0,0,0,0},
+ {"punsubscribe",punsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
+ {"info",sentinelInfoCommand,-1,"",0,NULL,0,0,0,0,0}
+};
+
+/* This function overwrites a few normal Redis config defaults with
+ * Sentinel-specific defaults. */
+void initSentinelConfig(void) {
+ server.port = REDIS_SENTINEL_PORT;
+}
+
+/* Perform the Sentinel mode initialization. */
+void initSentinel(void) {
+ int j;
+
+ /* Remove the usual Redis commands from the command table, then add
+ * only the small set of commands Sentinel supports (sentinelcmds[]). */
+ dictEmpty(server.commands);
+ for (j = 0; j < sizeof(sentinelcmds)/sizeof(sentinelcmds[0]); j++) {
+ int retval;
+ struct redisCommand *cmd = sentinelcmds+j;
+
+ retval = dictAdd(server.commands, sdsnew(cmd->name), cmd);
+ redisAssert(retval == DICT_OK);
+ }
+
+ /* Initialize various data structures. */
+ sentinel.masters = dictCreate(&instancesDictType,NULL);
+ sentinel.tilt = 0;
+ sentinel.tilt_start_time = mstime();
+ sentinel.previous_time = mstime();
+ sentinel.running_scripts = 0;
+ sentinel.scripts_queue = listCreate();
+}
+
+/* ============================== sentinelAddr ============================== */
+
+/* Create a sentinelAddr object and return it on success.
+ * On error NULL is returned and errno is set to:
+ * ENOENT: Can't resolve the hostname.
+ * EINVAL: Invalid port number.
+ */
+sentinelAddr *createSentinelAddr(char *hostname, int port) {
+ char buf[32];
+ sentinelAddr *sa;
+
+ if (port <= 0 || port > 65535) {
+ errno = EINVAL;
+ return NULL;
+ }
+ if (anetResolve(NULL,hostname,buf) == ANET_ERR) {
+ errno = ENOENT;
+ return NULL;
+ }
+ sa = zmalloc(sizeof(*sa));
+ sa->ip = sdsnew(buf);
+ sa->port = port;
+ return sa;
+}
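+
+/* Usage sketch (illustrative values; how "localhost" resolves depends on
+ * the host's resolver configuration):
+ *
+ *     sentinelAddr *sa = createSentinelAddr("localhost",6379);
+ *     if (sa == NULL) {
+ *         // errno is ENOENT (unresolvable host) or EINVAL (bad port).
+ *     } else {
+ *         // sa->ip holds the resolved address, e.g. "127.0.0.1".
+ *         releaseSentinelAddr(sa);
+ *     }
+ *
+ * createSentinelAddr("localhost",0) returns NULL with errno set to EINVAL,
+ * since the port is validated before any resolution is attempted. */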
+
+/* Free a Sentinel address. Can't fail. */
+void releaseSentinelAddr(sentinelAddr *sa) {
+ sdsfree(sa->ip);
+ zfree(sa);
+}
+
+/* =========================== Events notification ========================== */
+
+/* Send an event to log, pub/sub, user notification script.
+ *
+ * 'level' is the log level for logging. Only REDIS_WARNING events will trigger
+ * the execution of the user notification script.
+ *
+ * 'type' is the message type, also used as a pub/sub channel name.
+ *
+ * 'ri' is the Redis instance targeted by this event, if applicable, and is
+ * used to obtain the path of the notification script to execute.
+ *
+ * The remaining arguments are printf-alike.
+ * If the format specifier starts with the two characters "%@" then ri must
+ * not be NULL, and the message is prefixed with an instance identifier in
+ * the following format:
+ *
+ * <instance type> <instance name> <ip> <port>
+ *
+ * If the instance type is not master, then an additional string is
+ * appended to specify the originating master:
+ *
+ * @ <master name> <master ip> <master port>
+ *
+ * Any other specifier after "%@" is processed by printf itself.
+ */
+void sentinelEvent(int level, char *type, sentinelRedisInstance *ri,
+ const char *fmt, ...) {
+ va_list ap;
+ char msg[REDIS_MAX_LOGMSG_LEN];
+ robj *channel, *payload;
+
+ /* Handle %@ */
+ if (fmt[0] == '%' && fmt[1] == '@') {
+ sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
+ NULL : ri->master;
+
+ if (master) {
+ snprintf(msg, sizeof(msg), "%s %s %s %d @ %s %s %d",
+ sentinelRedisInstanceTypeStr(ri),
+ ri->name, ri->addr->ip, ri->addr->port,
+ master->name, master->addr->ip, master->addr->port);
+ } else {
+ snprintf(msg, sizeof(msg), "%s %s %s %d",
+ sentinelRedisInstanceTypeStr(ri),
+ ri->name, ri->addr->ip, ri->addr->port);
+ }
+ fmt += 2;
+ } else {
+ msg[0] = '\0';
+ }
+
+ /* Use vsnprintf for the rest of the formatting, if any. */
+ if (fmt[0] != '\0') {
+ va_start(ap, fmt);
+ vsnprintf(msg+strlen(msg), sizeof(msg)-strlen(msg), fmt, ap);
+ va_end(ap);
+ }
+
+ /* Log the message if the log level allows it to be logged. */
+ if (level >= server.verbosity)
+ redisLog(level,"%s %s",type,msg);
+
+ /* Publish the message via Pub/Sub if it's not a debugging one. */
+ if (level != REDIS_DEBUG) {
+ channel = createStringObject(type,strlen(type));
+ payload = createStringObject(msg,strlen(msg));
+ pubsubPublishMessage(channel,payload);
+ decrRefCount(channel);
+ decrRefCount(payload);
+ }
+
+ /* Call the notification script if applicable. */
+ if (level == REDIS_WARNING && ri != NULL) {
+ sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
+ ri : ri->master;
+ if (master->notification_script) {
+ sentinelScheduleScriptExecution(master->notification_script,
+ type,msg,NULL);
+ }
+ }
+}
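+
+/* Example (illustrative values only): for a master named "mymaster" at
+ * 10.0.0.1:6379, the call used by sentinelResetMaster()
+ *
+ *     sentinelEvent(REDIS_WARNING,"+reset-master",ri,"%@");
+ *
+ * logs "+reset-master master mymaster 10.0.0.1 6379", publishes the payload
+ * "master mymaster 10.0.0.1 6379" on the "+reset-master" channel and, since
+ * the level is REDIS_WARNING, schedules the master's notification script
+ * (if one is configured) with the arguments:
+ *
+ *     <notification-script> +reset-master "master mymaster 10.0.0.1 6379"
+ *
+ * For a slave of the same master at 10.0.0.2:6380 the "%@" prefix becomes
+ * "slave 10.0.0.2:6380 10.0.0.2 6380 @ mymaster 10.0.0.1 6379". */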
+
+/* ============================ script execution ============================ */
+
+/* Release a script job structure and all the associated data. */
+void sentinelReleaseScriptJob(sentinelScriptJob *sj) {
+ int j = 0;
+
+ while(sj->argv[j]) sdsfree(sj->argv[j++]);
+ zfree(sj->argv);
+ zfree(sj);
+}
+
+#define SENTINEL_SCRIPT_MAX_ARGS 16
+void sentinelScheduleScriptExecution(char *path, ...) {
+ va_list ap;
+ char *argv[SENTINEL_SCRIPT_MAX_ARGS+1];
+ int argc = 1;
+ sentinelScriptJob *sj;
+
+ va_start(ap, path);
+ while(argc < SENTINEL_SCRIPT_MAX_ARGS) {
+ argv[argc] = va_arg(ap,char*);
+ if (!argv[argc]) break;
+ argv[argc] = sdsnew(argv[argc]); /* Copy the string. */
+ argc++;
+ }
+ va_end(ap);
+ argv[argc] = NULL; /* Terminate argv even if we hit the max args limit. */
+ argv[0] = sdsnew(path);
+
+ sj = zmalloc(sizeof(*sj));
+ sj->flags = SENTINEL_SCRIPT_NONE;
+ sj->retry_num = 0;
+ sj->argv = zmalloc(sizeof(char*)*(argc+1));
+ sj->start_time = 0;
+ sj->pid = 0;
+ memcpy(sj->argv,argv,sizeof(char*)*(argc+1));
+
+ listAddNodeTail(sentinel.scripts_queue,sj);
+
+ /* Remove the oldest non running script if we already hit the limit. */
+ if (listLength(sentinel.scripts_queue) > SENTINEL_SCRIPT_MAX_QUEUE) {
+ listNode *ln;
+ listIter li;
+
+ listRewind(sentinel.scripts_queue,&li);
+ while ((ln = listNext(&li)) != NULL) {
+ sj = ln->value;
+
+ if (sj->flags & SENTINEL_SCRIPT_RUNNING) continue;
+ /* The first node is the oldest as we add on tail. */
+ listDelNode(sentinel.scripts_queue,ln);
+ sentinelReleaseScriptJob(sj);
+ break;
+ }
+ redisAssert(listLength(sentinel.scripts_queue) <=
+ SENTINEL_SCRIPT_MAX_QUEUE);
+ }
+}
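+
+/* Usage sketch (the script path and arguments below are hypothetical):
+ *
+ *     sentinelScheduleScriptExecution("/opt/redis/notify.sh",
+ *         "+sdown", "master mymaster 10.0.0.1 6379", NULL);
+ *
+ * queues a job whose argv is:
+ *
+ *     { "/opt/redis/notify.sh", "+sdown",
+ *       "master mymaster 10.0.0.1 6379", NULL }
+ *
+ * The argument list must be NULL-terminated, at most
+ * SENTINEL_SCRIPT_MAX_ARGS-1 arguments besides the path are used, and every
+ * string is copied, so the caller keeps ownership of its own buffers. The
+ * job is not run here: sentinelRunPendingScripts() forks it later. If the
+ * queue grows beyond SENTINEL_SCRIPT_MAX_QUEUE, the oldest job that is not
+ * running is discarded. */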
+
+/* Look up a script in the scripts queue by pid, and return the list node
+ * (so that we can easily remove it from the queue if needed). */
+listNode *sentinelGetScriptListNodeByPid(pid_t pid) {
+ listNode *ln;
+ listIter li;
+
+ listRewind(sentinel.scripts_queue,&li);
+ while ((ln = listNext(&li)) != NULL) {
+ sentinelScriptJob *sj = ln->value;
+
+ if ((sj->flags & SENTINEL_SCRIPT_RUNNING) && sj->pid == pid)
+ return ln;
+ }
+ return NULL;
+}
+
+/* Run pending scripts if we are not already at max number of running
+ * scripts. */
+void sentinelRunPendingScripts(void) {
+ listNode *ln;
+ listIter li;
+ mstime_t now = mstime();
+
+ /* Find jobs that are not running and run them, from the head to the
+ * tail of the queue, so we run older jobs first. */
+ listRewind(sentinel.scripts_queue,&li);
+ while (sentinel.running_scripts < SENTINEL_SCRIPT_MAX_RUNNING &&
+ (ln = listNext(&li)) != NULL)
+ {
+ sentinelScriptJob *sj = ln->value;
+ pid_t pid;
+
+ /* Skip if already running. */
+ if (sj->flags & SENTINEL_SCRIPT_RUNNING) continue;
+
+ /* Skip if it's a retry, but not enough time has elapsed. */
+ if (sj->start_time && sj->start_time > now) continue;
+
+ sj->flags |= SENTINEL_SCRIPT_RUNNING;
+ sj->start_time = mstime();
+ sj->retry_num++;
+ pid = fork();
+
+ if (pid == -1) {
+ /* Parent (fork error).
+ * We report fork errors as signal 99, in order to unify the
+ * reporting with other kind of errors. */
+ sentinelEvent(REDIS_WARNING,"-script-error",NULL,
+ "%s %d %d", sj->argv[0], 99, 0);
+ sj->flags &= ~SENTINEL_SCRIPT_RUNNING;
+ sj->pid = 0;
+ } else if (pid == 0) {
+ /* Child */
+ execve(sj->argv[0],sj->argv,environ);
+ /* If we are here an error occurred. */
+ _exit(2); /* Don't retry execution. */
+ } else {
+ sentinel.running_scripts++;
+ sj->pid = pid;
+ sentinelEvent(REDIS_DEBUG,"+script-child",NULL,"%ld",(long)pid);
+ }
+ }
+}
+
+/* How much to delay the execution of a script that we need to retry after
+ * an error?
+ *
+ * We double the retry delay for every further retry we do. So for instance
+ * if RETRY_DELAY is set to 30 seconds and the max number of retries is 10,
+ * then starting from the second attempt to execute the script the delays are:
+ * 30 sec, 60 sec, 2 min, 4 min, 8 min, 16 min, 32 min, 64 min, 128 min. */
+mstime_t sentinelScriptRetryDelay(int retry_num) {
+ mstime_t delay = SENTINEL_SCRIPT_RETRY_DELAY;
+
+ while (retry_num-- > 1) delay *= 2;
+ return delay;
+}
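+
+/* Worked example: the loop above computes
+ *
+ *     delay(retry_num) = SENTINEL_SCRIPT_RETRY_DELAY * 2^(retry_num-1)
+ *
+ * for retry_num >= 1 (a retry_num of 0 also yields the base delay).
+ * Assuming a base delay of 30000 ms, as in the comment above, a job that
+ * just failed its 4th attempt is retried after 30000 * 2^3 = 240000 ms,
+ * that is 4 minutes. */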
+
+/* Check for scripts that terminated, and remove them from the queue if the
+ * script terminated successfully. If instead the script was terminated by
+ * a signal, or returned exit code "1", it is scheduled to run again if
+ * the max number of retries has not been reached yet. */
+void sentinelCollectTerminatedScripts(void) {
+ int statloc;
+ pid_t pid;
+
+ while ((pid = wait3(&statloc,WNOHANG,NULL)) > 0) {
+ int exitcode = WEXITSTATUS(statloc);
+ int bysignal = 0;
+ listNode *ln;
+ sentinelScriptJob *sj;
+
+ if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
+ sentinelEvent(REDIS_DEBUG,"-script-child",NULL,"%ld %d %d",
+ (long)pid, exitcode, bysignal);
+
+ ln = sentinelGetScriptListNodeByPid(pid);
+ if (ln == NULL) {
+ redisLog(REDIS_WARNING,"wait3() returned a pid (%ld) we can't find in our scripts execution queue!", (long)pid);
+ continue;
+ }
+ sj = ln->value;
+
+ /* If the script was terminated by a signal or returned an
+ * exit code of "1" (that means: please retry), we reschedule it
+ * if the max number of retries is not already reached. */
+ if ((bysignal || exitcode == 1) &&
+ sj->retry_num != SENTINEL_SCRIPT_MAX_RETRY)
+ {
+ sj->flags &= ~SENTINEL_SCRIPT_RUNNING;
+ sj->pid = 0;
+ sj->start_time = mstime() +
+ sentinelScriptRetryDelay(sj->retry_num);
+ } else {
+ /* Otherwise let's remove the script, but log the event if the
+ * execution did not terminate cleanly. */
+ if (bysignal || exitcode != 0) {
+ sentinelEvent(REDIS_WARNING,"-script-error",NULL,
+ "%s %d %d", sj->argv[0], bysignal, exitcode);
+ }
+ listDelNode(sentinel.scripts_queue,ln);
+ sentinelReleaseScriptJob(sj);
+ }
+ /* The child exited either way, so it no longer counts as running,
+ * even when the job is rescheduled for a retry. */
+ sentinel.running_scripts--;
+ }
+}
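+
+/* Summary of how a terminated script job is handled by the function above,
+ * given the status collected by wait3():
+ *
+ *     exit code 0             -> job removed, no event.
+ *     exit code 1, or killed  -> rescheduled after sentinelScriptRetryDelay(),
+ *     by a signal                unless retry_num already equals
+ *                                SENTINEL_SCRIPT_MAX_RETRY.
+ *     any other exit code,    -> job removed and a
+ *     or retries exhausted       "-script-error <path> <signal> <exitcode>"
+ *                                event is emitted.
+ *
+ * Note that a child whose execve() fails exits with code 2, so a missing or
+ * non executable script is reported once and never retried. Scripts killed
+ * by sentinelKillTimedoutScripts() receive SIGKILL and therefore fall into
+ * the "killed by a signal" case. */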
+
+/* Kill scripts in timeout, they'll be collected by the
+ * sentinelCollectTerminatedScripts() function. */
+void sentinelKillTimedoutScripts(void) {
+ listNode *ln;
+ listIter li;
+ mstime_t now = mstime();
+
+ listRewind(sentinel.scripts_queue,&li);
+ while ((ln = listNext(&li)) != NULL) {
+ sentinelScriptJob *sj = ln->value;
+
+ if (sj->flags & SENTINEL_SCRIPT_RUNNING &&
+ (now - sj->start_time) > SENTINEL_SCRIPT_MAX_RUNTIME)
+ {
+ sentinelEvent(REDIS_WARNING,"-script-timeout",NULL,"%s %ld",
+ sj->argv[0], (long)sj->pid);
+ kill(sj->pid,SIGKILL);
+ }
+ }
+}
+
+/* Implements SENTINEL PENDING-SCRIPTS command. */
+void sentinelPendingScriptsCommand(redisClient *c) {
+ listNode *ln;
+ listIter li;
+
+ addReplyMultiBulkLen(c,listLength(sentinel.scripts_queue));
+ listRewind(sentinel.scripts_queue,&li);
+ while ((ln = listNext(&li)) != NULL) {
+ sentinelScriptJob *sj = ln->value;
+ int j = 0;
+
+ addReplyMultiBulkLen(c,10);
+
+ addReplyBulkCString(c,"argv");
+ while (sj->argv[j]) j++;
+ addReplyMultiBulkLen(c,j);
+ j = 0;
+ while (sj->argv[j]) addReplyBulkCString(c,sj->argv[j++]);
+
+ addReplyBulkCString(c,"flags");
+ addReplyBulkCString(c,
+ (sj->flags & SENTINEL_SCRIPT_RUNNING) ? "running" : "scheduled");
+
+ addReplyBulkCString(c,"pid");
+ addReplyBulkLongLong(c,sj->pid);
+
+ if (sj->flags & SENTINEL_SCRIPT_RUNNING) {
+ addReplyBulkCString(c,"run-time");
+ addReplyBulkLongLong(c,mstime() - sj->start_time);
+ } else {
+ mstime_t delay = sj->start_time ? (sj->start_time-mstime()) : 0;
+ if (delay < 0) delay = 0;
+ addReplyBulkCString(c,"run-delay");
+ addReplyBulkLongLong(c,delay);
+ }
+
+ addReplyBulkCString(c,"retry-num");
+ addReplyBulkLongLong(c,sj->retry_num);
+ }
+}
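+
+/* Example reply layout for a single queued job (values are illustrative).
+ * Numeric fields are sent as bulk strings, since they are emitted with
+ * addReplyBulkLongLong():
+ *
+ *     1)  1) "argv"
+ *         2) 1) "/opt/redis/notify.sh"
+ *            2) "+sdown"
+ *            3) "master mymaster 10.0.0.1 6379"
+ *         3) "flags"
+ *         4) "scheduled"
+ *         5) "pid"
+ *         6) "0"
+ *         7) "run-delay"
+ *         8) "27512"
+ *         9) "retry-num"
+ *        10) "1"
+ */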
+
+/* This function calls, if any, the client reconfiguration script with the
+ * following parameters:
+ *
+ * <master-name> <role> <state> <from-ip> <from-port> <to-ip> <to-port>
+ *
+ * It is called every time a failover starts, ends, or is aborted.
+ *
+ * <state> is "start", "end" or "abort".
+ * <role> is either "leader" or "observer".
+ *
+ * from/to fields are respectively master -> promoted slave addresses for
+ * "start" and "end", or the reverse (promoted slave -> master) in case of
+ * "abort".
+ */
+void sentinelCallClientReconfScript(sentinelRedisInstance *master, int role, char *state, sentinelAddr *from, sentinelAddr *to) {
+ char fromport[32], toport[32];
+
+ if (master->client_reconfig_script == NULL) return;
+ ll2string(fromport,sizeof(fromport),from->port);
+ ll2string(toport,sizeof(toport),to->port);
+ sentinelScheduleScriptExecution(master->client_reconfig_script,
+ master->name,
+ (role == SENTINEL_LEADER) ? "leader" : "observer",
+ state, from->ip, fromport, to->ip, toport, NULL);
+}
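+
+/* Example (hypothetical master name and addresses): when this Sentinel,
+ * acting as leader, starts failing over master "mymaster" from
+ * 10.0.0.1:6379 to the slave 10.0.0.2:6380, the configured script is
+ * scheduled as:
+ *
+ *     <client-reconfig-script> mymaster leader start 10.0.0.1 6379 10.0.0.2 6380
+ *
+ * If the same failover is later aborted, from/to are swapped:
+ *
+ *     <client-reconfig-script> mymaster leader abort 10.0.0.2 6380 10.0.0.1 6379
+ */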
+
+/* ========================== sentinelRedisInstance ========================= */
+
+/* Create a redis instance, the following fields must be populated by the
+ * caller if needed:
+ * runid: set to NULL but will be populated once INFO output is received.
+ * info_refresh: is set to 0 to mean that we never received INFO so far.
+ *
+ * If SRI_MASTER is set into initial flags the instance is added to
+ * sentinel.masters table.
+ *
+ * If SRI_SLAVE or SRI_SENTINEL is set then 'master' must not be NULL and the
+ * instance is added into master->slaves or master->sentinels table.
+ *
+ * If the instance is a slave or sentinel, the name parameter is ignored and
+ * is created automatically as hostname:port.
+ *
+ * The function fails if hostname can't be resolved or port is out of range.
+ * When this happens NULL is returned and errno is set according to the
+ * createSentinelAddr() function.
+ *
+ * The function may also fail and return NULL with errno set to EBUSY if
+ * a master or slave with the same name already exists. */
+sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char *hostname, int port, int quorum, sentinelRedisInstance *master) {
+ sentinelRedisInstance *ri;
+ sentinelAddr *addr;
+ dict *table = NULL;
+ char slavename[128], *sdsname;
+
+ redisAssert(flags & (SRI_MASTER|SRI_SLAVE|SRI_SENTINEL));
+ redisAssert((flags & SRI_MASTER) || master != NULL);
+
+ /* Check address validity. */
+ addr = createSentinelAddr(hostname,port);
+ if (addr == NULL) return NULL;
+
+ /* For slaves and sentinel we use ip:port as name. */
+ if (flags & (SRI_SLAVE|SRI_SENTINEL)) {
+ snprintf(slavename,sizeof(slavename),"%s:%d",hostname,port);
+ name = slavename;
+ }
+
+ /* Make sure the entry is not duplicated. This may happen when the same
+ * name for a master is used multiple times inside the configuration or
+ * if we try to add a slave or sentinel with the same ip/port to a master
+ * multiple times. */
+ if (flags & SRI_MASTER) table = sentinel.masters;
+ else if (flags & SRI_SLAVE) table = master->slaves;
+ else if (flags & SRI_SENTINEL) table = master->sentinels;
+ sdsname = sdsnew(name);
+ if (dictFind(table,sdsname)) {
+ releaseSentinelAddr(addr); /* Don't leak the address on error. */
+ sdsfree(sdsname);
+ errno = EBUSY;
+ return NULL;
+ }
+
+ /* Create the instance object. */
+ ri = zmalloc(sizeof(*ri));
+ /* Note that all the instances are started in the disconnected state,
+ * the event loop will take care of connecting them. */
+ ri->flags = flags | SRI_DISCONNECTED;
+ ri->name = sdsname;
+ ri->runid = NULL;
+ ri->addr = addr;
+ ri->cc = NULL;
+ ri->pc = NULL;
+ ri->pending_commands = 0;
+ ri->cc_conn_time = 0;
+ ri->pc_conn_time = 0;
+ ri->pc_last_activity = 0;
+ ri->last_avail_time = mstime();
+ ri->last_pong_time = mstime();
+ ri->last_pub_time = mstime();
+ ri->last_hello_time = mstime();
+ ri->last_master_down_reply_time = mstime();
+ ri->s_down_since_time = 0;
+ ri->o_down_since_time = 0;
+ ri->down_after_period = master ? master->down_after_period :
+ SENTINEL_DOWN_AFTER_PERIOD;
+ ri->master_link_down_time = 0;
+ ri->auth_pass = NULL;
+ ri->slave_priority = SENTINEL_DEFAULT_SLAVE_PRIORITY;
+ ri->slave_reconf_sent_time = 0;
+ ri->slave_master_host = NULL;
+ ri->slave_master_port = 0;
+ ri->slave_master_link_status = SENTINEL_MASTER_LINK_STATUS_DOWN;
+ ri->sentinels = dictCreate(&instancesDictType,NULL);
+ ri->quorum = quorum;
+ ri->parallel_syncs = SENTINEL_DEFAULT_PARALLEL_SYNCS;
+ ri->master = master;
+ ri->slaves = dictCreate(&instancesDictType,NULL);
+ ri->info_refresh = 0;
+
+ /* Failover state. */
+ ri->leader = NULL;
+ ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
+ ri->failover_state_change_time = 0;
+ ri->failover_start_time = 0;
+ ri->failover_timeout = SENTINEL_DEFAULT_FAILOVER_TIMEOUT;
+ ri->promoted_slave = NULL;
+ ri->notification_script = NULL;
+ ri->client_reconfig_script = NULL;
+
+ /* Add into the right table. */
+ dictAdd(table, ri->name, ri);
+ return ri;
+}
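+
+/* Usage sketch (hypothetical names and addresses):
+ *
+ *     sentinelRedisInstance *master, *slave;
+ *
+ *     master = createSentinelRedisInstance("mymaster",SRI_MASTER,
+ *                                          "127.0.0.1",6379,2,NULL);
+ *     if (master == NULL) {
+ *         // errno is EBUSY, ENOENT or EINVAL, as described above.
+ *         return;
+ *     }
+ *
+ *     // For slaves the name argument is ignored: the instance is named
+ *     // "127.0.0.1:6380" and added to master->slaves.
+ *     slave = createSentinelRedisInstance(NULL,SRI_SLAVE,"127.0.0.1",6380,
+ *                                         0,master);
+ */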
+
+/* Release this instance and all its slaves, sentinels, hiredis connections.
+ * This function does not unlink the instance from the masters table or from
+ * its master's sentinels/slaves table by itself: it is installed as the
+ * value destructor of those dictionaries, so dictDelete() calls it once the
+ * entry is being removed. */
+void releaseSentinelRedisInstance(sentinelRedisInstance *ri) {
+ /* Release all its slaves or sentinels if any. */
+ dictRelease(ri->sentinels);
+ dictRelease(ri->slaves);
+
+ /* Release hiredis connections. */
+ if (ri->cc) sentinelKillLink(ri,ri->cc);
+ if (ri->pc) sentinelKillLink(ri,ri->pc);
+
+ /* Free other resources. */
+ sdsfree(ri->name);
+ sdsfree(ri->runid);
+ sdsfree(ri->notification_script);
+ sdsfree(ri->client_reconfig_script);
+ sdsfree(ri->slave_master_host);
+ sdsfree(ri->leader);
+ sdsfree(ri->auth_pass);
+ releaseSentinelAddr(ri->addr);
+
+ /* Clear state into the master if needed. */
+ if ((ri->flags & SRI_SLAVE) && (ri->flags & SRI_PROMOTED) && ri->master)
+ ri->master->promoted_slave = NULL;
+
+ zfree(ri);
+}
+
+/* Lookup a slave in a master Redis instance, by ip and port. */
+sentinelRedisInstance *sentinelRedisInstanceLookupSlave(
+ sentinelRedisInstance *ri, char *ip, int port)
+{
+ sds key;
+ sentinelRedisInstance *slave;
+
+ redisAssert(ri->flags & SRI_MASTER);
+ key = sdscatprintf(sdsempty(),"%s:%d",ip,port);
+ slave = dictFetchValue(ri->slaves,key);
+ sdsfree(key);
+ return slave;
+}
+
+/* Return the name of the type of the instance as a string. */
+const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri) {
+ if (ri->flags & SRI_MASTER) return "master";
+ else if (ri->flags & SRI_SLAVE) return "slave";
+ else if (ri->flags & SRI_SENTINEL) return "sentinel";
+ else return "unknown";
+}
+
+/* This function removes all the instances found in the dictionary of instances
+ * 'd', having either:
+ *
+ * 1) The same ip/port as specified.
+ * 2) The same runid.
+ *
+ * Conditions "1" and "2" don't need to hold at the same time: matching
+ * either one is enough.
+ * If "runid" is NULL it is not checked.
+ * Similarly if "ip" is NULL it is not checked.
+ *
+ * This function is useful because every time we add a new Sentinel into
+ * a master's Sentinels dictionary, we want to be very sure about not
+ * having duplicated instances for any reason. This is important because
+ * we use those other sentinels in order to run our quorum protocol to
+ * understand if it's time to proceed with the failover.
+ *
+ * By making sure no duplication is possible we greatly improve the robustness
+ * of the quorum (otherwise we may end up counting the same instance multiple
+ * times for some reason).
+ *
+ * The function returns the number of Sentinels removed. */
+int removeMatchingSentinelsFromMaster(sentinelRedisInstance *master, char *ip, int port, char *runid) {
+ dictIterator *di;
+ dictEntry *de;
+ int removed = 0;
+
+ di = dictGetSafeIterator(master->sentinels);
+ while((de = dictNext(di)) != NULL) {
+ sentinelRedisInstance *ri = dictGetVal(de);
+
+ if ((ri->runid && runid && strcmp(ri->runid,runid) == 0) ||
+ (ip && strcmp(ri->addr->ip,ip) == 0 && port == ri->addr->port))
+ {
+ dictDelete(master->sentinels,ri->name);
+ removed++;
+ }
+ }
+ dictReleaseIterator(di);
+ return removed;
+}
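+
+/* Usage sketch (hypothetical values): before adding a Sentinel discovered
+ * via a hello message, drop any stale entry that shares its address or its
+ * runid:
+ *
+ *     int removed = removeMatchingSentinelsFromMaster(master,
+ *                       "10.0.0.5",26379,runid);
+ *
+ * Passing NULL as 'ip' matches by runid only (the port is then ignored),
+ * while passing NULL as 'runid' matches by address only. */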
+
+/* Search for an instance with the same runid, ip and port in a dictionary
+ * of instances. Return NULL if not found, otherwise return the instance
+ * pointer.
+ *
+ * runid or ip can be NULL. In such a case the search is performed only
+ * by the non-NULL field. */
+sentinelRedisInstance *getSentinelRedisInstanceByAddrAndRunID(dict *instances, char *ip, int port, char *runid) {
+ dictIterator *di;
+ dictEntry *de;
+ sentinelRedisInstance *instance = NULL;
+
+ redisAssert(ip || runid); /* User must pass at least one search param. */
+ di = dictGetIterator(instances);
+ while((de = dictNext(di)) != NULL) {
+ sentinelRedisInstance *ri = dictGetVal(de);
+
+ if (runid && !ri->runid) continue;
+ if ((runid == NULL || strcmp(ri->runid, runid) == 0) &&
+ (ip == NULL || (strcmp(ri->addr->ip, ip) == 0 &&
+ ri->addr->port == port)))
+ {
+ instance = ri;
+ break;
+ }
+ }
+ dictReleaseIterator(di);
+ return instance;
+}
+
+/* Simple master lookup by name */
+sentinelRedisInstance *sentinelGetMasterByName(char *name) {
+ sentinelRedisInstance *ri;
+ sds sdsname = sdsnew(name);
+
+ ri = dictFetchValue(sentinel.masters,sdsname);
+ sdsfree(sdsname);
+ return ri;
+}
+
+/* Add the specified flags to all the instances in the specified dictionary. */
+void sentinelAddFlagsToDictOfRedisInstances(dict *instances, int flags) {
+ dictIterator *di;
+ dictEntry *de;
+
+ di = dictGetIterator(instances);
+ while((de = dictNext(di)) != NULL) {
+ sentinelRedisInstance *ri = dictGetVal(de);
+ ri->flags |= flags;
+ }
+ dictReleaseIterator(di);
+}
+
+/* Remove the specified flags from all the instances in the specified
+ * dictionary. */
+void sentinelDelFlagsToDictOfRedisInstances(dict *instances, int flags) {
+ dictIterator *di;
+ dictEntry *de;
+
+ di = dictGetIterator(instances);
+ while((de = dictNext(di)) != NULL) {
+ sentinelRedisInstance *ri = dictGetVal(de);
+ ri->flags &= ~flags;
+ }
+ dictReleaseIterator(di);
+}
+
+/* Reset the state of a monitored master:
+ * 1) Remove all slaves.
+ * 2) Remove all sentinels.
+ * 3) Remove most of the flags resulting from runtime operations.
+ * 4) Reset timers to their default value.
+ * 5) In the process of doing this undo the failover if in progress.
+ * 6) Disconnect the connections with the master (will reconnect automatically).
+ */
+void sentinelResetMaster(sentinelRedisInstance *ri, int flags) {
+ redisAssert(ri->flags & SRI_MASTER);
+ dictRelease(ri->slaves);
+ dictRelease(ri->sentinels);
+ ri->slaves = dictCreate(&instancesDictType,NULL);
+ ri->sentinels = dictCreate(&instancesDictType,NULL);
+ if (ri->cc) sentinelKillLink(ri,ri->cc);
+ if (ri->pc) sentinelKillLink(ri,ri->pc);
+ ri->flags &= SRI_MASTER|SRI_CAN_FAILOVER|SRI_DISCONNECTED;
+ if (ri->leader) {
+ sdsfree(ri->leader);
+ ri->leader = NULL;
+ }
+ ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
+ ri->failover_state_change_time = 0;
+ ri->failover_start_time = 0;
+ ri->promoted_slave = NULL;
+ sdsfree(ri->runid);
+ sdsfree(ri->slave_master_host);
+ ri->runid = NULL;
+ ri->slave_master_host = NULL;
+ ri->last_avail_time = mstime();
+ ri->last_pong_time = mstime();
+ if (flags & SENTINEL_GENERATE_EVENT)
+ sentinelEvent(REDIS_WARNING,"+reset-master",ri,"%@");
+}
+
+/* Call sentinelResetMaster() on every master with a name matching the specified
+ * pattern. */
+int sentinelResetMastersByPattern(char *pattern, int flags) {
+ dictIterator *di;
+ dictEntry *de;
+ int reset = 0;
+
+ di = dictGetIterator(sentinel.masters);
+ while((de = dictNext(di)) != NULL) {
+ sentinelRedisInstance *ri = dictGetVal(de);
+
+ if (ri->name) {
+ if (stringmatch(pattern,ri->name,0)) {
+ sentinelResetMaster(ri,flags);
+ reset++;
+ }
+ }
+ }
+ dictReleaseIterator(di);
+ return reset;
+}
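+
+/* Example (hypothetical master names): with masters "prod-cache",
+ * "prod-db" and "staging-db" being monitored,
+ *
+ *     sentinelResetMastersByPattern("prod-*",SENTINEL_GENERATE_EVENT);
+ *
+ * resets the first two masters, emits a "+reset-master" event for each of
+ * them, and returns 2. Matching is glob-style and case sensitive, as done
+ * by stringmatch() with nocase set to 0; "*" resets every master. */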
+
+/* Reset the specified master with sentinelResetMaster(), and also change
+ * the ip:port address, but leave the name of the instance unmodified.
+ *
+ * This is used to handle the +switch-master and +redirect-to-master events.
+ *
+ * The function returns REDIS_ERR if the address can't be resolved for some
+ * reason. Otherwise REDIS_OK is returned.
+ *
+ * TODO: make this reset so that original sentinels are re-added with
+ * same ip / port / runid.
+ */
+
+int sentinelResetMasterAndChangeAddress(sentinelRedisInstance *master, char *ip, int port) {
+ sentinelAddr *oldaddr, *newaddr;
+
+ newaddr = createSentinelAddr(ip,port);
+ if (newaddr == NULL) return REDIS_ERR;
+ sentinelResetMaster(master,SENTINEL_NO_FLAGS);
+ oldaddr = master->addr;
+ master->addr = newaddr;
+ /* Release the old address at the end so we are safe even if the function
+ * gets the master->addr->ip and master->addr->port as arguments. */
+ releaseSentinelAddr(oldaddr);
+ return REDIS_OK;
+}
+
+/* ============================ Config handling ============================= */
+char *sentinelHandleConfiguration(char **argv, int argc) {
+ sentinelRedisInstance *ri;
+
+ if (!strcasecmp(argv[0],"monitor") && argc == 5) {
+ /* monitor <name> <host> <port> <quorum> */
+ int quorum = atoi(argv[4]);
+
+ if (quorum <= 0) return "Quorum must be 1 or greater.";
+ if (createSentinelRedisInstance(argv[1],SRI_MASTER,argv[2],
+ atoi(argv[3]),quorum,NULL) == NULL)
+ {
+ switch(errno) {
+ case EBUSY: return "Duplicated master name.";
+ case ENOENT: return "Can't resolve master instance hostname.";
+ case EINVAL: return "Invalid port number.";
+ }
+ }
+ } else if (!strcasecmp(argv[0],"down-after-milliseconds") && argc == 3) {
+ /* down-after-milliseconds <name> <milliseconds> */
+ ri = sentinelGetMasterByName(argv[1]);
+ if (!ri) return "No such master with specified name.";
+ ri->down_after_period = atoi(argv[2]);
+ if (ri->down_after_period <= 0)
+ return "negative or zero time parameter.";
+ } else if (!strcasecmp(argv[0],"failover-timeout") && argc == 3) {
+ /* failover-timeout <name> <milliseconds> */
+ ri = sentinelGetMasterByName(argv[1]);
+ if (!ri) return "No such master with specified name.";
+ ri->failover_timeout = atoi(argv[2]);
+ if (ri->failover_timeout <= 0)
+ return "negative or zero time parameter.";
+ } else if (!strcasecmp(argv[0],"can-failover") && argc == 3) {
+ /* can-failover <name> <yes/no> */
+ int yesno = yesnotoi(argv[2]);
+
+ ri = sentinelGetMasterByName(argv[1]);
+ if (!ri) return "No such master with specified name.";
+ if (yesno == -1) return "Argument must be either yes or no.";
+ if (yesno)
+ ri->flags |= SRI_CAN_FAILOVER;
+ else
+ ri->flags &= ~SRI_CAN_FAILOVER;
+ } else if (!strcasecmp(argv[0],"parallel-syncs") && argc == 3) {
+ /* parallel-syncs <name> <num-slaves> */
+ ri = sentinelGetMasterByName(argv[1]);
+ if (!ri) return "No such master with specified name.";
+ ri->parallel_syncs = atoi(argv[2]);
+ } else if (!strcasecmp(argv[0],"notification-script") && argc == 3) {
+ /* notification-script <name> <path> */
+ ri = sentinelGetMasterByName(argv[1]);
+ if (!ri) return "No such master with specified name.";
+ if (access(argv[2],X_OK) == -1)
+ return "Notification script seems non existing or non executable.";
+ ri->notification_script = sdsnew(argv[2]);
+ } else if (!strcasecmp(argv[0],"client-reconfig-script") && argc == 3) {
+ /* client-reconfig-script <name> <path> */
+ ri = sentinelGetMasterByName(argv[1]);
+ if (!ri) return "No such master with specified name.";
+ if (access(argv[2],X_OK) == -1)
+ return "Client reconfiguration script seems non existing or "
+ "non executable.";
+ ri->client_reconfig_script = sdsnew(argv[2]);
+ } else if (!strcasecmp(argv[0],"auth-pass") && argc == 3) {
+ /* auth-pass <name> <password> */
+ ri = sentinelGetMasterByName(argv[1]);
+ if (!ri) return "No such master with specified name.";
+ ri->auth_pass = sdsnew(argv[2]);
+ } else {
+ return "Unrecognized sentinel configuration statement.";
+ }
+ return NULL;
+}
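+
+/* Example configuration (hypothetical names, addresses and paths). In the
+ * configuration file each statement is prefixed by the "sentinel" keyword,
+ * which is stripped before the remaining arguments reach this function:
+ *
+ *     sentinel monitor mymaster 127.0.0.1 6379 2
+ *     sentinel down-after-milliseconds mymaster 30000
+ *     sentinel failover-timeout mymaster 900000
+ *     sentinel can-failover mymaster yes
+ *     sentinel parallel-syncs mymaster 1
+ *     sentinel auth-pass mymaster s3cr3t
+ *     sentinel notification-script mymaster /opt/redis/notify.sh
+ *     sentinel client-reconfig-script mymaster /opt/redis/reconfig.sh
+ *
+ * The "monitor" statement must appear before the other statements for the
+ * same master, since they all look the master up by name. */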
+
+/* ====================== hiredis connection handling ======================= */
+
+/* Completely disconnect an hiredis link from an instance. */
+void sentinelKillLink(sentinelRedisInstance *ri, redisAsyncContext *c) {
+ if (ri->cc == c) {
+ ri->cc = NULL;
+ ri->pending_commands = 0;
+ }
+ if (ri->pc == c) ri->pc = NULL;
+ c->data = NULL;
+ ri->flags |= SRI_DISCONNECTED;
+ redisAsyncFree(c);
+}
+
+/* This function takes an hiredis context that is in an error condition
+ * and makes sure to mark the instance as disconnected, performing the
+ * cleanup needed.
+ *
+ * Note: we don't free the hiredis context as hiredis will do it for us
+ * for async connections. */
+void sentinelDisconnectInstanceFromContext(const redisAsyncContext *c) {
+ sentinelRedisInstance *ri = c->data;
+ int pubsub;
+
+ if (ri == NULL) return; /* The instance no longer exists. */
+
+ pubsub = (ri->pc == c);
+ sentinelEvent(REDIS_DEBUG, pubsub ? "-pubsub-link" : "-cmd-link", ri,
+ "%@ #%s", c->errstr);
+ if (pubsub)
+ ri->pc = NULL;
+ else
+ ri->cc = NULL;
+ ri->flags |= SRI_DISCONNECTED;
+}
+
+void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status) {
+ if (status != REDIS_OK) {
+ sentinelDisconnectInstanceFromContext(c);
+ } else {
+ sentinelRedisInstance *ri = c->data;
+ int pubsub = (ri->pc == c);
+
+ sentinelEvent(REDIS_DEBUG, pubsub ? "+pubsub-link" : "+cmd-link", ri,
+ "%@");
+ }
+}
+
+void sentinelDisconnectCallback(const redisAsyncContext *c, int status) {
+ sentinelDisconnectInstanceFromContext(c);
+}
+
+/* Send the AUTH command with the specified master password if needed.
+ * Note that for slaves the password set for the master is used.
+ *
+ * We don't check at all if the command was successfully transmitted
+ * to the instance: if it fails, Sentinel will detect the instance as down,
+ * disconnect and reconnect the link, and so forth. */
+void sentinelSendAuthIfNeeded(sentinelRedisInstance *ri, redisAsyncContext *c) {
+ char *auth_pass = (ri->flags & SRI_MASTER) ? ri->auth_pass :
+ ri->master->auth_pass;
+
+ if (auth_pass)
+ redisAsyncCommand(c, sentinelDiscardReplyCallback, NULL, "AUTH %s",
+ auth_pass);
+}
+
+/* Create the async connections for the specified instance if the instance
+ * is disconnected. Note that the SRI_DISCONNECTED flag is set even if just
+ * one of the two links (commands and pub/sub) is missing. */
+void sentinelReconnectInstance(sentinelRedisInstance *ri) {
+ if (!(ri->flags & SRI_DISCONNECTED)) return;
+
+ /* Commands connection. */
+ if (ri->cc == NULL) {
+ ri->cc = redisAsyncConnect(ri->addr->ip,ri->addr->port);
+ if (ri->cc->err) {
+ sentinelEvent(REDIS_DEBUG,"-cmd-link-reconnection",ri,"%@ #%s",
+ ri->cc->errstr);
+ sentinelKillLink(ri,ri->cc);
+ } else {
+ ri->cc_conn_time = mstime();
+ ri->cc->data = ri;
+ redisAeAttach(server.el,ri->cc);
+ redisAsyncSetConnectCallback(ri->cc,
+ sentinelLinkEstablishedCallback);
+ redisAsyncSetDisconnectCallback(ri->cc,
+ sentinelDisconnectCallback);
+ sentinelSendAuthIfNeeded(ri,ri->cc);
+ }
+ }
+ /* Pub / Sub */
+ if ((ri->flags & SRI_MASTER) && ri->pc == NULL) {
+ ri->pc = redisAsyncConnect(ri->addr->ip,ri->addr->port);
+ if (ri->pc->err) {
+ sentinelEvent(REDIS_DEBUG,"-pubsub-link-reconnection",ri,"%@ #%s",
+ ri->pc->errstr);
+ sentinelKillLink(ri,ri->pc);
+ } else {
+ int retval;
+
+ ri->pc_conn_time = mstime();
+ ri->pc->data = ri;
+ redisAeAttach(server.el,ri->pc);
+ redisAsyncSetConnectCallback(ri->pc,
+ sentinelLinkEstablishedCallback);
+ redisAsyncSetDisconnectCallback(ri->pc,
+ sentinelDisconnectCallback);
+ sentinelSendAuthIfNeeded(ri,ri->pc);
+ /* Now we subscribe to the Sentinels "Hello" channel. */
+ retval = redisAsyncCommand(ri->pc,
+ sentinelReceiveHelloMessages, NULL, "SUBSCRIBE %s",
+ SENTINEL_HELLO_CHANNEL);
+ if (retval != REDIS_OK) {
+ /* If we can't subscribe, the Pub/Sub connection is useless
+ * and we can simply disconnect it and try again. */
+ sentinelKillLink(ri,ri->pc);
+ return;
+ }
+ }
+ }
+ /* Clear the DISCONNECTED flag only if we have both connections
+ * (or just the commands connection if this is a slave or a
+ * sentinel instance). */
+ if (ri->cc && ((ri->flags & (SRI_SLAVE|SRI_SENTINEL)) || ri->pc))
+ ri->flags &= ~SRI_DISCONNECTED;
+}
+
+/* ======================== Redis instances pinging ======================== */
+
+/* Process the INFO output from masters and slaves. */
+void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
+ sds *lines;
+ int numlines, j;
+ int role = 0;
+ int runid_changed = 0; /* true if runid changed. */
+ int first_runid = 0; /* true if this is the first runid we receive. */
+
+ /* The following fields must be reset to a given value in the case they
+ * are not found at all in the INFO output. */
+ ri->master_link_down_time = 0;
+
+ /* Process line by line. */
+ lines = sdssplitlen(info,strlen(info),"\r\n",2,&numlines);
+ for (j = 0; j < numlines; j++) {
+ sentinelRedisInstance *slave;
+ sds l = lines[j];
+
+ /* run_id:<40 hex chars> */
+ if (sdslen(l) >= 47 && !memcmp(l,"run_id:",7)) {
+ if (ri->runid == NULL) {
+ ri->runid = sdsnewlen(l+7,40);
+ first_runid = 1;
+ } else {
+ if (strncmp(ri->runid,l+7,40) != 0) {
+ runid_changed = 1;
+ sentinelEvent(REDIS_NOTICE,"+reboot",ri,"%@");
+ sdsfree(ri->runid);
+ ri->runid = sdsnewlen(l+7,40);
+ }
+ }
+ }
+
+ /* slave0:<ip>,<port>,<state> */
+ if ((ri->flags & SRI_MASTER) &&
+ sdslen(l) >= 7 &&
+ !memcmp(l,"slave",5) && isdigit(l[5]))
+ {
+ char *ip, *port, *end;
+
+ ip = strchr(l,':'); if (!ip) continue;
+ ip++; /* Now ip points to start of ip address. */
+ port = strchr(ip,','); if (!port) continue;
+ *port = '\0'; /* nul term for easy access. */
+ port++; /* Now port points to start of port number. */
+ end = strchr(port,','); if (!end) continue;
+ *end = '\0'; /* nul term for easy access. */
+
+ /* Check if we already have this slave in our table,
+ * otherwise add it. */
+ if (sentinelRedisInstanceLookupSlave(ri,ip,atoi(port)) == NULL) {
+ if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,ip,
+ atoi(port), ri->quorum,ri)) != NULL)
+ {
+ sentinelEvent(REDIS_NOTICE,"+slave",slave,"%@");
+ }
+ }
+ }
+
+ /* master_link_down_since_seconds:<seconds> */
+ if (sdslen(l) >= 32 &&
+ !memcmp(l,"master_link_down_since_seconds",30))
+ {
+ ri->master_link_down_time = strtoll(l+31,NULL,10)*1000;
+ }
+
+ /* role:<role> */
+ if (!memcmp(l,"role:master",11)) role = SRI_MASTER;
+ else if (!memcmp(l,"role:slave",10)) role = SRI_SLAVE;
+
+ if (role == SRI_SLAVE) {
+ /* master_host:<host> */
+ if (sdslen(l) >= 12 && !memcmp(l,"master_host:",12)) {
+ sdsfree(ri->slave_master_host);
+ ri->slave_master_host = sdsnew(l+12);
+ }
+
+ /* master_port:<port> */
+ if (sdslen(l) >= 12 && !memcmp(l,"master_port:",12))
+ ri->slave_master_port = atoi(l+12);
+
+ /* master_link_status:<status> */
+ if (sdslen(l) >= 19 && !memcmp(l,"master_link_status:",19)) {
+ ri->slave_master_link_status =
+ (strcasecmp(l+19,"up") == 0) ?
+ SENTINEL_MASTER_LINK_STATUS_UP :
+ SENTINEL_MASTER_LINK_STATUS_DOWN;
+ }
+
+ /* slave_priority:<priority> */
+ if (sdslen(l) >= 15 && !memcmp(l,"slave_priority:",15))
+ ri->slave_priority = atoi(l+15);
+ }
+ }
+ ri->info_refresh = mstime();
+ sdsfreesplitres(lines,numlines);
+
+ /* ---------------------------- Acting half ----------------------------- */
+ if (sentinel.tilt) return;
+
+ /* Act if a master turned into a slave. */
+ if ((ri->flags & SRI_MASTER) && role == SRI_SLAVE) {
+ if ((first_runid || runid_changed) && ri->slave_master_host) {
+ /* If it is the first time we receive INFO from it, but it's
+ * a slave while it was configured as a master, we want to monitor
+ * its master instead. */
+ sentinelEvent(REDIS_WARNING,"+redirect-to-master",ri,
+ "%s %s %d %s %d",
+ ri->name, ri->addr->ip, ri->addr->port,
+ ri->slave_master_host, ri->slave_master_port);
+ sentinelResetMasterAndChangeAddress(ri,ri->slave_master_host,
+ ri->slave_master_port);
+ return;
+ }
+ }
+
+ /* Act if a slave turned into a master. */
+ if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) {
+ if (!(ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
+ (runid_changed || first_runid))
+ {
+ /* If a slave turned into master but:
+ *
+ * 1) Failover not in progress.
+ * 2) RunID has changed, or it's the first time we see an INFO output.
+ *
+ * We assume this is a reboot with a wrong configuration.
+ * Log the event and remove the slave. */
+ int retval;
+
+ sentinelEvent(REDIS_WARNING,"-slave-restart-as-master",ri,"%@ #removing it from the attached slaves");
+ retval = dictDelete(ri->master->slaves,ri->name);
+ redisAssert(retval == REDIS_OK);
+ return;
+ } else if (ri->flags & SRI_PROMOTED) {
+ /* If this is a promoted slave we can advance the failover
+ * state machine. */
+ if ((ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
+ (ri->master->flags & SRI_I_AM_THE_LEADER) &&
+ (ri->master->failover_state ==
+ SENTINEL_FAILOVER_STATE_WAIT_PROMOTION))
+ {
+ ri->master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
+ ri->master->failover_state_change_time = mstime();
+ sentinelEvent(REDIS_WARNING,"+promoted-slave",ri,"%@");
+ sentinelEvent(REDIS_WARNING,"+failover-state-reconf-slaves",
+ ri->master,"%@");
+ sentinelCallClientReconfScript(ri->master,SENTINEL_LEADER,
+ "start",ri->master->addr,ri->addr);
+ }
+ } else if (!(ri->master->flags & SRI_FAILOVER_IN_PROGRESS) ||
+ ((ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
+ (ri->master->flags & SRI_I_AM_THE_LEADER) &&
+ ri->master->failover_state ==
+ SENTINEL_FAILOVER_STATE_WAIT_START))
+ {
+ /* No failover in progress? Then it is the start of a failover
+ * and we are an observer.
+ *
+ * We also do that if we are the leader of a failover that is still
+ * in the WAIT_START state, but somebody else started before us. */
+
+ if (ri->master->flags & SRI_FAILOVER_IN_PROGRESS) {
+ sentinelEvent(REDIS_WARNING,"-failover-abort-race",
+ ri->master, "%@");
+ sentinelAbortFailover(ri->master);
+ }
+
+ ri->master->flags |= SRI_FAILOVER_IN_PROGRESS;
+ sentinelEvent(REDIS_WARNING,"+failover-detected",ri->master,"%@");
+ ri->master->failover_state = SENTINEL_FAILOVER_STATE_DETECT_END;
+ ri->master->failover_state_change_time = mstime();
+ ri->master->promoted_slave = ri;
+ ri->flags |= SRI_PROMOTED;
+ sentinelCallClientReconfScript(ri->master,SENTINEL_OBSERVER,
+ "start", ri->master->addr,ri->addr);
+ /* We are an observer, so we can only assume that the leader
+ * is reconfiguring the slave instances. For this reason we
+ * set all the instances as RECONF_SENT and wait for progress
+ * on their side. */
+ sentinelAddFlagsToDictOfRedisInstances(ri->master->slaves,
+ SRI_RECONF_SENT);
+ }
+ }
+
+ /* Detect if the slave that is in the process of being reconfigured
+ * changed state. */
+ if ((ri->flags & SRI_SLAVE) && role == SRI_SLAVE &&
+ (ri->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)))
+ {
+ /* SRI_RECONF_SENT -> SRI_RECONF_INPROG. */
+ if ((ri->flags & SRI_RECONF_SENT) &&
+ ri->slave_master_host &&
+ strcmp(ri->slave_master_host,
+ ri->master->promoted_slave->addr->ip) == 0 &&
+ ri->slave_master_port == ri->master->promoted_slave->addr->port)
+ {
+ ri->flags &= ~SRI_RECONF_SENT;
+ ri->flags |= SRI_RECONF_INPROG;
+ sentinelEvent(REDIS_NOTICE,"+slave-reconf-inprog",ri,"%@");
+ }
+
+ /* SRI_RECONF_INPROG -> SRI_RECONF_DONE */
+ if ((ri->flags & SRI_RECONF_INPROG) &&
+ ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP)
+ {
+ ri->flags &= ~SRI_RECONF_INPROG;
+ ri->flags |= SRI_RECONF_DONE;
+ sentinelEvent(REDIS_NOTICE,"+slave-reconf-done",ri,"%@");
+ /* If we are moving forward (a new slave is now configured)
+ * we update the change_time as we are conceptually passing
+ * to the next slave. */
+ ri->failover_state_change_time = mstime();
+ }
+ }
+}
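+
+/* Illustrative example (hypothetical values, not captured from a real
+ * instance) of the INFO lines that sentinelRefreshInstanceInfo() acts
+ * upon, as reported by a slave whose replication link is up:
+ *
+ * run_id:<40 hex chars>
+ * role:slave
+ * master_host:127.0.0.1
+ * master_port:6379
+ * master_link_status:up
+ * slave_priority:100
+ *
+ * A master instead reports role:master plus one line per attached slave
+ * in the slave<N>:<ip>,<port>,<state> form, for instance:
+ *
+ * slave0:127.0.0.1,6380,online
+ *
+ * Fields that are missing from the output (such as
+ * master_link_down_since_seconds) leave the corresponding field at the
+ * value reset at the top of the function. */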
+
+void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
+ sentinelRedisInstance *ri = c->data;
+ redisReply *r;
+
+ if (ri) ri->pending_commands--;
+ if (!reply || !ri) return;
+ r = reply;
+
+ if (r->type == REDIS_REPLY_STRING) {
+ sentinelRefreshInstanceInfo(ri,r->str);
+ }
+}
+
+/* Just discard the reply. We use this when we are not monitoring the return
+ * value of the command but its effects directly. */
+void sentinelDiscardReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
+ sentinelRedisInstance *ri = c->data;
+
+ if (ri) ri->pending_commands--;
+}
+
+void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
+ sentinelRedisInstance *ri = c->data;
+ redisReply *r;
+
+ if (ri) ri->pending_commands--;
+ if (!reply || !ri) return;
+ r = reply;
+
+ if (r->type == REDIS_REPLY_STATUS ||
+ r->type == REDIS_REPLY_ERROR) {
+ /* Update the "instance available" field only if this is an
+ * acceptable reply. */
+ if (strncmp(r->str,"PONG",4) == 0 ||
+ strncmp(r->str,"LOADING",7) == 0 ||
+ strncmp(r->str,"MASTERDOWN",10) == 0)
+ {
+ ri->last_avail_time = mstime();
+ } else {
+ /* Send a SCRIPT KILL command if the instance appears to be
+ * down because of a busy script. */
+ if (strncmp(r->str,"BUSY",4) == 0 &&
+ (ri->flags & SRI_S_DOWN) &&
+ !(ri->flags & SRI_SCRIPT_KILL_SENT))
+ {
+ if (redisAsyncCommand(ri->cc,
+ sentinelDiscardReplyCallback, NULL, "SCRIPT KILL") == REDIS_OK)
+ ri->pending_commands++;
+ ri->flags |= SRI_SCRIPT_KILL_SENT;
+ }
+ }
+ }
+ ri->last_pong_time = mstime();
+}
+
+/* This is called when we get the reply about the PUBLISH command we send
+ * to the master to advertise this sentinel. */
+void sentinelPublishReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
+ sentinelRedisInstance *ri = c->data;
+ redisReply *r;
+
+ if (ri) ri->pending_commands--;
+ if (!reply || !ri) return;
+ r = reply;
+
+ /* Only update pub_time if we actually published our message. Otherwise
+ * we'll retry again in 100 milliseconds. */
+ if (r->type != REDIS_REPLY_ERROR)
+ ri->last_pub_time = mstime();
+}
+
+/* This is our Pub/Sub callback for the Hello channel. It's useful in order
+ * to discover other sentinels attached at the same master. */
+void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata) {
+ sentinelRedisInstance *ri = c->data;
+ redisReply *r;
+
+ if (!reply || !ri) return;
+ r = reply;
+
+ /* Update the last activity in the pubsub channel. Note that since we
+ * receive our messages as well this timestamp can be used to detect
+ * if the link is probably disconnected even if it seems otherwise. */
+ ri->pc_last_activity = mstime();
+
+ /* Sanity check the reply we expect, so that the code that follows
+ * can avoid checking the details. */
+ if (r->type != REDIS_REPLY_ARRAY ||
+ r->elements != 3 ||
+ r->element[0]->type != REDIS_REPLY_STRING ||
+ r->element[1]->type != REDIS_REPLY_STRING ||
+ r->element[2]->type != REDIS_REPLY_STRING ||
+ strcmp(r->element[0]->str,"message") != 0) return;
+
+ /* We are not interested in meeting ourselves */
+ if (strstr(r->element[2]->str,server.runid) != NULL) return;
+
+ {
+ int numtokens, port, removed, canfailover;
+ char **token = sdssplitlen(r->element[2]->str,
+ r->element[2]->len,
+ ":",1,&numtokens);
+ sentinelRedisInstance *sentinel;
+
+ if (numtokens == 4) {
+ /* First, try to see if we already have this sentinel. */
+ port = atoi(token[1]);
+ canfailover = atoi(token[3]);
+ sentinel = getSentinelRedisInstanceByAddrAndRunID(
+ ri->sentinels,token[0],port,token[2]);
+
+ if (!sentinel) {
+ /* If not, remove all the sentinels that have the same runid
+ * OR the same ip/port, because it's either a restart or a
+ * network topology change. */
+ removed = removeMatchingSentinelsFromMaster(ri,token[0],port,
+ token[2]);
+ if (removed) {
+ sentinelEvent(REDIS_NOTICE,"-dup-sentinel",ri,
+ "%@ #duplicate of %s:%d or %s",
+ token[0],port,token[2]);
+ }
+
+ /* Add the new sentinel. */
+ sentinel = createSentinelRedisInstance(NULL,SRI_SENTINEL,
+ token[0],port,ri->quorum,ri);
+ if (sentinel) {
+ sentinelEvent(REDIS_NOTICE,"+sentinel",sentinel,"%@");
+ /* The runid is NULL after a new instance creation and
+ * for Sentinels we don't have a later chance to fill it,
+ * so do it now. */
+ sentinel->runid = sdsnew(token[2]);
+ }
+ }
+
+ /* Update the state of the Sentinel. */
+ if (sentinel) {
+ sentinel->last_hello_time = mstime();
+ if (canfailover)
+ sentinel->flags |= SRI_CAN_FAILOVER;
+ else
+ sentinel->flags &= ~SRI_CAN_FAILOVER;
+ }
+ }
+ sdsfreesplitres(token,numtokens);
+ }
+}
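+
+/* Illustrative example (hypothetical address and runid) of a Hello message
+ * as received by sentinelReceiveHelloMessages(). The PUBLISH payload built
+ * in sentinelPingInstance() has the form
+ * "<ip>:<port>:<runid>:<can-failover>", so the SUBSCRIBE push looks like:
+ *
+ * 1) "message"
+ * 2) <SENTINEL_HELLO_CHANNEL>
+ * 3) "10.0.0.5:26379:<40 hex chars runid>:1"
+ *
+ * Splitting element 3 on ':' yields the four tokens checked above
+ * (numtokens == 4): ip, port, runid and the can-failover flag. */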
+
+void sentinelPingInstance(sentinelRedisInstance *ri) {
+ mstime_t now = mstime();
+ mstime_t info_period;
+ int retval;
+
+ /* Return ASAP if the instance is not properly connected. */
+ if (ri->flags & SRI_DISCONNECTED) return;
+
+ /* INFO, PING and PUBLISH are not critical commands, so we also cap
+ * them at SENTINEL_MAX_PENDING_COMMANDS. We don't want to use a lot
+ * of memory just because a link is not working properly (note that
+ * there is also a redundant protection about this: the link will be
+ * disconnected and reconnected if a long timeout condition is
+ * detected). */
+ if (ri->pending_commands >= SENTINEL_MAX_PENDING_COMMANDS) return;
+
+ /* If this is a slave of a master in O_DOWN condition, or with a failover
+ * in progress, we start sending it INFO every second instead of the usual
+ * SENTINEL_INFO_PERIOD period. In this state we want to closely monitor
+ * slaves in case they are turned into masters by another Sentinel, or by
+ * the sysadmin. */
+ if ((ri->flags & SRI_SLAVE) &&
+ (ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) {
+ info_period = 1000;
+ } else {
+ info_period = SENTINEL_INFO_PERIOD;
+ }
+
+ if ((ri->flags & SRI_SENTINEL) == 0 &&
+ (ri->info_refresh == 0 ||
+ (now - ri->info_refresh) > info_period))
+ {
+ /* Send INFO to masters and slaves, not sentinels. */
+ retval = redisAsyncCommand(ri->cc,
+ sentinelInfoReplyCallback, NULL, "INFO");
+ if (retval != REDIS_OK) return;
+ ri->pending_commands++;
+ } else if ((now - ri->last_pong_time) > SENTINEL_PING_PERIOD) {
+ /* Send PING to all the three kinds of instances. */
+ retval = redisAsyncCommand(ri->cc,
+ sentinelPingReplyCallback, NULL, "PING");
+ if (retval != REDIS_OK) return;
+ ri->pending_commands++;
+ } else if ((ri->flags & SRI_MASTER) &&
+ (now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD)
+ {
+ /* PUBLISH hello messages only to masters. */
+ struct sockaddr_in sa;
+ socklen_t salen = sizeof(sa);
+
+ if (getsockname(ri->cc->c.fd,(struct sockaddr*)&sa,&salen) != -1) {
+ char myaddr[128];
+
+ snprintf(myaddr,sizeof(myaddr),"%s:%d:%s:%d",
+ inet_ntoa(sa.sin_addr), server.port, server.runid,
+ (ri->flags & SRI_CAN_FAILOVER) != 0);
+ retval = redisAsyncCommand(ri->cc,
+ sentinelPublishReplyCallback, NULL, "PUBLISH %s %s",
+ SENTINEL_HELLO_CHANNEL,myaddr);
+ if (retval != REDIS_OK) return;
+ ri->pending_commands++;
+ }
+ }
+}
+
+/* =========================== SENTINEL command ============================= */
+
+const char *sentinelFailoverStateStr(int state) {
+ switch(state) {
+ case SENTINEL_FAILOVER_STATE_NONE: return "none";
+ case SENTINEL_FAILOVER_STATE_WAIT_START: return "wait_start";
+ case SENTINEL_FAILOVER_STATE_SELECT_SLAVE: return "select_slave";
+ case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE: return "send_slaveof_noone";
+ case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION: return "wait_promotion";
+ case SENTINEL_FAILOVER_STATE_RECONF_SLAVES: return "reconf_slaves";
+ case SENTINEL_FAILOVER_STATE_ALERT_CLIENTS: return "alert_clients";
+ case SENTINEL_FAILOVER_STATE_DETECT_END: return "detect_end";
+ case SENTINEL_FAILOVER_STATE_UPDATE_CONFIG: return "update_config";
+ default: return "unknown";
+ }
+}
+
+/* Redis instance to Redis protocol representation. */
+void addReplySentinelRedisInstance(redisClient *c, sentinelRedisInstance *ri) {
+ char *flags = sdsempty();
+ void *mbl;
+ int fields = 0;
+
+ mbl = addDeferredMultiBulkLength(c);
+
+ addReplyBulkCString(c,"name");
+ addReplyBulkCString(c,ri->name);
+ fields++;
+
+ addReplyBulkCString(c,"ip");
+ addReplyBulkCString(c,ri->addr->ip);
+ fields++;
+
+ addReplyBulkCString(c,"port");
+ addReplyBulkLongLong(c,ri->addr->port);
+ fields++;
+
+ addReplyBulkCString(c,"runid");
+ addReplyBulkCString(c,ri->runid ? ri->runid : "");
+ fields++;
+
+ addReplyBulkCString(c,"flags");
+ if (ri->flags & SRI_S_DOWN) flags = sdscat(flags,"s_down,");
+ if (ri->flags & SRI_O_DOWN) flags = sdscat(flags,"o_down,");
+ if (ri->flags & SRI_MASTER) flags = sdscat(flags,"master,");
+ if (ri->flags & SRI_SLAVE) flags = sdscat(flags,"slave,");
+ if (ri->flags & SRI_SENTINEL) flags = sdscat(flags,"sentinel,");
+ if (ri->flags & SRI_DISCONNECTED) flags = sdscat(flags,"disconnected,");
+ if (ri->flags & SRI_MASTER_DOWN) flags = sdscat(flags,"master_down,");
+ if (ri->flags & SRI_FAILOVER_IN_PROGRESS)
+ flags = sdscat(flags,"failover_in_progress,");
+ if (ri->flags & SRI_I_AM_THE_LEADER)
+ flags = sdscat(flags,"i_am_the_leader,");
+ if (ri->flags & SRI_PROMOTED) flags = sdscat(flags,"promoted,");
+ if (ri->flags & SRI_RECONF_SENT) flags = sdscat(flags,"reconf_sent,");
+ if (ri->flags & SRI_RECONF_INPROG) flags = sdscat(flags,"reconf_inprog,");
+ if (ri->flags & SRI_RECONF_DONE) flags = sdscat(flags,"reconf_done,");
+
+ if (sdslen(flags) != 0) flags = sdsrange(flags,0,-2); /* remove last "," */
+ addReplyBulkCString(c,flags);
+ sdsfree(flags);
+ fields++;
+
+ addReplyBulkCString(c,"pending-commands");
+ addReplyBulkLongLong(c,ri->pending_commands);
+ fields++;
+
+ if (ri->flags & SRI_FAILOVER_IN_PROGRESS) {
+ addReplyBulkCString(c,"failover-state");
+ addReplyBulkCString(c,(char*)sentinelFailoverStateStr(ri->failover_state));
+ fields++;
+ }
+
+ addReplyBulkCString(c,"last-ok-ping-reply");
+ addReplyBulkLongLong(c,mstime() - ri->last_avail_time);
+ fields++;
+
+ addReplyBulkCString(c,"last-ping-reply");
+ addReplyBulkLongLong(c,mstime() - ri->last_pong_time);
+ fields++;
+
+ if (ri->flags & SRI_S_DOWN) {
+ addReplyBulkCString(c,"s-down-time");
+ addReplyBulkLongLong(c,mstime()-ri->s_down_since_time);
+ fields++;
+ }
+
+ if (ri->flags & SRI_O_DOWN) {
+ addReplyBulkCString(c,"o-down-time");
+ addReplyBulkLongLong(c,mstime()-ri->o_down_since_time);
+ fields++;
+ }
+
+ /* Masters and Slaves */
+ if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
+ addReplyBulkCString(c,"info-refresh");
+ addReplyBulkLongLong(c,mstime() - ri->info_refresh);
+ fields++;
+ }
+
+ /* Only masters */
+ if (ri->flags & SRI_MASTER) {
+ addReplyBulkCString(c,"num-slaves");
+ addReplyBulkLongLong(c,dictSize(ri->slaves));
+ fields++;
+
+ addReplyBulkCString(c,"num-other-sentinels");
+ addReplyBulkLongLong(c,dictSize(ri->sentinels));
+ fields++;
+
+ addReplyBulkCString(c,"quorum");
+ addReplyBulkLongLong(c,ri->quorum);
+ fields++;
+ }
+
+ /* Only slaves */
+ if (ri->flags & SRI_SLAVE) {
+ addReplyBulkCString(c,"master-link-down-time");
+ addReplyBulkLongLong(c,ri->master_link_down_time);
+ fields++;
+
+ addReplyBulkCString(c,"master-link-status");
+ addReplyBulkCString(c,
+ (ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP) ?
+ "ok" : "err");
+ fields++;
+
+ addReplyBulkCString(c,"master-host");
+ addReplyBulkCString(c,
+ ri->slave_master_host ? ri->slave_master_host : "?");
+ fields++;
+
+ addReplyBulkCString(c,"master-port");
+ addReplyBulkLongLong(c,ri->slave_master_port);
+ fields++;
+
+ addReplyBulkCString(c,"slave-priority");
+ addReplyBulkLongLong(c,ri->slave_priority);
+ fields++;
+ }
+
+ /* Only sentinels */
+ if (ri->flags & SRI_SENTINEL) {
+ addReplyBulkCString(c,"last-hello-message");
+ addReplyBulkLongLong(c,mstime() - ri->last_hello_time);
+ fields++;
+
+ addReplyBulkCString(c,"can-failover-its-master");
+ addReplyBulkLongLong(c,(ri->flags & SRI_CAN_FAILOVER) != 0);
+ fields++;
+
+ if (ri->flags & SRI_MASTER_DOWN) {
+ addReplyBulkCString(c,"subjective-leader");
+ addReplyBulkCString(c,ri->leader ? ri->leader : "?");
+ fields++;
+ }
+ }
+
+ setDeferredMultiBulkLength(c,mbl,fields*2);
+}
+
+/* Output a number of instances contained inside a dictionary as
+ * Redis protocol. */
+void addReplyDictOfRedisInstances(redisClient *c, dict *instances) {
+ dictIterator *di;
+ dictEntry *de;
+
+ di = dictGetIterator(instances);
+ addReplyMultiBulkLen(c,dictSize(instances));
+ while((de = dictNext(di)) != NULL) {
+ sentinelRedisInstance *ri = dictGetVal(de);
+
+ addReplySentinelRedisInstance(c,ri);
+ }
+ dictReleaseIterator(di);
+}
+
+/* Look up the named master in sentinel.masters.
+ * If the master is not found, reply to the client with an error and return
+ * NULL. */
+sentinelRedisInstance *sentinelGetMasterByNameOrReplyError(redisClient *c,
+ robj *name)
+{
+ sentinelRedisInstance *ri;
+
+ ri = dictFetchValue(sentinel.masters,name->ptr);
+ if (!ri) {
+ addReplyError(c,"No such master with that name");
+ return NULL;
+ }
+ return ri;
+}
+
+void sentinelCommand(redisClient *c) {
+ if (!strcasecmp(c->argv[1]->ptr,"masters")) {
+ /* SENTINEL MASTERS */
+ if (c->argc != 2) goto numargserr;
+
+ addReplyDictOfRedisInstances(c,sentinel.masters);
+ } else if (!strcasecmp(c->argv[1]->ptr,"slaves")) {
+ /* SENTINEL SLAVES <master-name> */
+ sentinelRedisInstance *ri;
+
+ if (c->argc != 3) goto numargserr;
+ if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
+ return;
+ addReplyDictOfRedisInstances(c,ri->slaves);
+ } else if (!strcasecmp(c->argv[1]->ptr,"sentinels")) {
+ /* SENTINEL SENTINELS <master-name> */
+ sentinelRedisInstance *ri;
+
+ if (c->argc != 3) goto numargserr;
+ if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
+ return;
+ addReplyDictOfRedisInstances(c,ri->sentinels);
+ } else if (!strcasecmp(c->argv[1]->ptr,"is-master-down-by-addr")) {
+ /* SENTINEL IS-MASTER-DOWN-BY-ADDR <ip> <port> */
+ sentinelRedisInstance *ri;
+ char *leader = NULL;
+ long port;
+ int isdown = 0;
+
+ if (c->argc != 4) goto numargserr;
+ if (getLongFromObjectOrReply(c,c->argv[3],&port,NULL) != REDIS_OK)
+ return;
+ ri = getSentinelRedisInstanceByAddrAndRunID(sentinel.masters,
+ c->argv[2]->ptr,port,NULL);
+
+ /* Does it exist? Is it actually a master? Is it subjectively down?
+ * Then it's down. Note: if we are in tilt mode we always reply with "0". */
+ if (!sentinel.tilt && ri && (ri->flags & SRI_S_DOWN) &&
+ (ri->flags & SRI_MASTER))
+ isdown = 1;
+ if (ri) leader = sentinelGetSubjectiveLeader(ri);
+
+ /* Reply with a two-element multi-bulk reply: down state, leader. */
+ addReplyMultiBulkLen(c,2);
+ addReply(c, isdown ? shared.cone : shared.czero);
+ addReplyBulkCString(c, leader ? leader : "?");
+ if (leader) sdsfree(leader);
+ } else if (!strcasecmp(c->argv[1]->ptr,"reset")) {
+ /* SENTINEL RESET <pattern> */
+ if (c->argc != 3) goto numargserr;
+ addReplyLongLong(c,sentinelResetMastersByPattern(c->argv[2]->ptr,SENTINEL_GENERATE_EVENT));
+ } else if (!strcasecmp(c->argv[1]->ptr,"get-master-addr-by-name")) {
+ /* SENTINEL GET-MASTER-ADDR-BY-NAME <master-name> */
+ sentinelRedisInstance *ri;
+
+ if (c->argc != 3) goto numargserr;
+ ri = sentinelGetMasterByName(c->argv[2]->ptr);
+ if (ri == NULL) {
+ addReply(c,shared.nullmultibulk);
+ } else if (ri->info_refresh == 0) {
+ addReplySds(c,sdsnew("-IDONTKNOW I have not enough information to reply. Please ask another Sentinel.\r\n"));
+ } else {
+ sentinelAddr *addr = ri->addr;
+
+ if ((ri->flags & SRI_FAILOVER_IN_PROGRESS) && ri->promoted_slave)
+ addr = ri->promoted_slave->addr;
+ addReplyMultiBulkLen(c,2);
+ addReplyBulkCString(c,addr->ip);
+ addReplyBulkLongLong(c,addr->port);
+ }
+ } else if (!strcasecmp(c->argv[1]->ptr,"failover")) {
+ /* SENTINEL FAILOVER <master-name> */
+ sentinelRedisInstance *ri;
+
+ if (c->argc != 3) goto numargserr;
+ if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
+ return;
+ if (ri->flags & SRI_FAILOVER_IN_PROGRESS) {
+ addReplySds(c,sdsnew("-INPROG Failover already in progress\r\n"));
+ return;
+ }
+ if (sentinelSelectSlave(ri) == NULL) {
+ addReplySds(c,sdsnew("-NOGOODSLAVE No suitable slave to promote\r\n"));
+ return;
+ }
+ sentinelStartFailover(ri,SENTINEL_FAILOVER_STATE_WAIT_START);
+ ri->flags |= SRI_FORCE_FAILOVER;
+ addReply(c,shared.ok);
+ } else if (!strcasecmp(c->argv[1]->ptr,"pending-scripts")) {
+ /* SENTINEL PENDING-SCRIPTS */
+
+ if (c->argc != 2) goto numargserr;
+ sentinelPendingScriptsCommand(c);
+ } else {
+ addReplyErrorFormat(c,"Unknown sentinel subcommand '%s'",
+ (char*)c->argv[1]->ptr);
+ }
+ return;
+
+numargserr:
+ addReplyErrorFormat(c,"Wrong number of arguments for 'sentinel %s'",
+ (char*)c->argv[1]->ptr);
+}
+
+void sentinelInfoCommand(redisClient *c) {
+ char *section = c->argc == 2 ? c->argv[1]->ptr : "default";
+ sds info = sdsempty();
+ int defsections = !strcasecmp(section,"default");
+ int sections = 0;
+
+ if (c->argc > 2) {
+ addReply(c,shared.syntaxerr);
+ return;
+ }
+
+ if (!strcasecmp(section,"server") || defsections) {
+ if (sections++) info = sdscat(info,"\r\n");
+ sds serversection = genRedisInfoString("server");
+ info = sdscatlen(info,serversection,sdslen(serversection));
+ sdsfree(serversection);
+ }
+
+ if (!strcasecmp(section,"sentinel") || defsections) {
+ dictIterator *di;
+ dictEntry *de;
+ int master_id = 0;
+
+ if (sections++) info = sdscat(info,"\r\n");
+ info = sdscatprintf(info,
+ "# Sentinel\r\n"
+ "sentinel_masters:%lu\r\n"
+ "sentinel_tilt:%d\r\n"
+ "sentinel_running_scripts:%d\r\n"
+ "sentinel_scripts_queue_length:%lu\r\n",
+ dictSize(sentinel.masters),
+ sentinel.tilt,
+ sentinel.running_scripts,
+ listLength(sentinel.scripts_queue));
+
+ di = dictGetIterator(sentinel.masters);
+ while((de = dictNext(di)) != NULL) {
+ sentinelRedisInstance *ri = dictGetVal(de);
+ char *status = "ok";
+
+ if (ri->flags & SRI_O_DOWN) status = "odown";
+ else if (ri->flags & SRI_S_DOWN) status = "sdown";
+ info = sdscatprintf(info,
+ "master%d:name=%s,status=%s,address=%s:%d,"
+ "slaves=%lu,sentinels=%lu\r\n",
+ master_id++, ri->name, status,
+ ri->addr->ip, ri->addr->port,
+ dictSize(ri->slaves),
+ dictSize(ri->sentinels)+1);
+ }
+ dictReleaseIterator(di);
+ }
+
+ addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
+ (unsigned long)sdslen(info)));
+ addReplySds(c,info);
+ addReply(c,shared.crlf);
+}
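+
+/* Illustrative example (hypothetical values) of the "sentinel" section
+ * produced by sentinelInfoCommand() for a single monitored master:
+ *
+ * # Sentinel
+ * sentinel_masters:1
+ * sentinel_tilt:0
+ * sentinel_running_scripts:0
+ * sentinel_scripts_queue_length:0
+ * master0:name=mymaster,status=ok,address=127.0.0.1:6379,slaves=2,sentinels=3
+ *
+ * Note that "sentinels" also counts this Sentinel (dictSize() + 1). */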
+
+/* ===================== SENTINEL availability checks ======================= */
+
+/* Is this instance down from our point of view? */
+void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
+ mstime_t elapsed = mstime() - ri->last_avail_time;
+
+ /* Check if one of the links needs to be reconnected because we are
+ * detecting low activity.
+ *
+ * 1) Check if the command link seems connected, has been connected for
+ * more than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have an
+ * idle time that is greater than down_after_period / 2 milliseconds. */
+ if (ri->cc &&
+ (mstime() - ri->cc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
+ (mstime() - ri->last_pong_time) > (ri->down_after_period/2))
+ {
+ sentinelKillLink(ri,ri->cc);
+ }
+
+ /* 2) Check if the pubsub link seems connected, has been connected for
+ * more than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have no
+ * activity in the Pub/Sub channel for more than
+ * SENTINEL_PUBLISH_PERIOD * 3.
+ */
+ if (ri->pc &&
+ (mstime() - ri->pc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
+ (mstime() - ri->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3))
+ {
+ sentinelKillLink(ri,ri->pc);
+ }
+
+ /* Update the subjectively down flag. */
+ if (elapsed > ri->down_after_period) {
+ /* Is subjectively down */
+ if ((ri->flags & SRI_S_DOWN) == 0) {
+ sentinelEvent(REDIS_WARNING,"+sdown",ri,"%@");
+ ri->s_down_since_time = mstime();
+ ri->flags |= SRI_S_DOWN;
+ }
+ } else {
+ /* Is subjectively up */
+ if (ri->flags & SRI_S_DOWN) {
+ sentinelEvent(REDIS_WARNING,"-sdown",ri,"%@");
+ ri->flags &= ~(SRI_S_DOWN|SRI_SCRIPT_KILL_SENT);
+ }
+ }
+}
+
+/* Is this instance down according to the configured quorum? */
+void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
+ dictIterator *di;
+ dictEntry *de;
+ int quorum = 0, odown = 0;
+
+ if (master->flags & SRI_S_DOWN) {
+ /* Is down for enough sentinels? */
+ quorum = 1; /* the current sentinel. */
+ /* Count all the other sentinels. */
+ di = dictGetIterator(master->sentinels);
+ while((de = dictNext(di)) != NULL) {
+ sentinelRedisInstance *ri = dictGetVal(de);
+
+ if (ri->flags & SRI_MASTER_DOWN) quorum++;
+ }
+ dictReleaseIterator(di);
+ if (quorum >= master->quorum) odown = 1;
+ }
+
+ /* Set the flag according to the outcome. */
+ if (odown) {
+ if ((master->flags & SRI_O_DOWN) == 0) {
+ sentinelEvent(REDIS_WARNING,"+odown",master,"%@ #quorum %d/%d",
+ quorum, master->quorum);
+ master->flags |= SRI_O_DOWN;
+ master->o_down_since_time = mstime();
+ }
+ } else {
+ if (master->flags & SRI_O_DOWN) {
+ sentinelEvent(REDIS_WARNING,"-odown",master,"%@");
+ master->flags &= ~SRI_O_DOWN;
+ }
+ }
+}
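+
+/* Worked example (hypothetical configuration): a master with quorum 2,
+ * monitored by this Sentinel and two others. If we flag it SRI_S_DOWN and
+ * one of the other two Sentinels has SRI_MASTER_DOWN set, the count is
+ * 1 (ourselves) + 1 = 2 >= 2, so +odown is emitted. If neither of the
+ * others agrees, the count is 1 < 2 and the master stays only S_DOWN. */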
+
+/* Receive the SENTINEL is-master-down-by-addr reply, see the
+ * sentinelAskMasterStateToOtherSentinels() function for more information. */
+void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) {
+ sentinelRedisInstance *ri = c->data;
+ redisReply *r;
+
+ if (ri) ri->pending_commands--;
+ if (!reply || !ri) return;
+ r = reply;
+
+ /* Ignore every error or unexpected reply.
+ * Note that if the command returns an error for any reason we'll
+ * end up clearing the SRI_MASTER_DOWN flag because of the timeout anyway. */
+ if (r->type == REDIS_REPLY_ARRAY && r->elements == 2 &&
+ r->element[0]->type == REDIS_REPLY_INTEGER &&
+ r->element[1]->type == REDIS_REPLY_STRING)
+ {
+ ri->last_master_down_reply_time = mstime();
+ if (r->element[0]->integer == 1) {
+ ri->flags |= SRI_MASTER_DOWN;
+ } else {
+ ri->flags &= ~SRI_MASTER_DOWN;
+ }
+ sdsfree(ri->leader);
+ ri->leader = sdsnew(r->element[1]->str);
+ }
+}
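+
+/* Illustrative example (hypothetical runid) of the reply parsed above, as
+ * built by the "is-master-down-by-addr" branch of sentinelCommand():
+ *
+ * *2\r\n
+ * :1\r\n (1 = master is subjectively down for the replying Sentinel)
+ * $40\r\n
+ * <runid of its subjective leader>\r\n
+ *
+ * When the replying Sentinel has no suitable leader, the second element
+ * is the bulk string "?" instead. */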
+
+/* If we think (subjectively) the master is down, we start sending
+ * SENTINEL IS-MASTER-DOWN-BY-ADDR requests to other sentinels
+ * in order to get the replies that allow us to reach the quorum and
+ * possibly also mark the master as objectively down. */
+void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master) {
+ dictIterator *di;
+ dictEntry *de;
+
+ di = dictGetIterator(master->sentinels);
+ while((de = dictNext(di)) != NULL) {
+ sentinelRedisInstance *ri = dictGetVal(de);
+ mstime_t elapsed = mstime() - ri->last_master_down_reply_time;
+ char port[32];
+ int retval;
+
+ /* If the master state from another sentinel is too old, we clear it. */
+ if (elapsed > SENTINEL_INFO_VALIDITY_TIME) {
+ ri->flags &= ~SRI_MASTER_DOWN;
+ sdsfree(ri->leader);
+ ri->leader = NULL;
+ }
+
+ /* Only ask other sentinels whether the master is down if:
+ *
+ * 1) We believe it is down, or there is a failover in progress.
+ * 2) Sentinel is connected.
+ * 3) We did not receive the info within SENTINEL_ASK_PERIOD ms. */
+ if ((master->flags & (SRI_S_DOWN|SRI_FAILOVER_IN_PROGRESS)) == 0)
+ continue;
+ if (ri->flags & SRI_DISCONNECTED) continue;
+ if (mstime() - ri->last_master_down_reply_time < SENTINEL_ASK_PERIOD)
+ continue;
+
+ /* Ask */
+ ll2string(port,sizeof(port),master->addr->port);
+ retval = redisAsyncCommand(ri->cc,
+ sentinelReceiveIsMasterDownReply, NULL,
+ "SENTINEL is-master-down-by-addr %s %s",
+ master->addr->ip, port);
+ if (retval == REDIS_OK) ri->pending_commands++;
+ }
+ dictReleaseIterator(di);
+}
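+
+/* Worked example of the exchange above (illustrative address only):
+ * for a master at 127.0.0.1:6379 each eligible Sentinel is sent
+ *
+ *   SENTINEL is-master-down-by-addr 127.0.0.1 6379
+ *
+ * and sentinelReceiveIsMasterDownReply() only accepts a two element
+ * array reply: an integer (1 = master down, 0 = master up) followed by
+ * the runid of the leader voted by that Sentinel. Any other reply shape
+ * is ignored, and the stored SRI_MASTER_DOWN flag and leader are cleared
+ * once no fresh reply arrives within SENTINEL_INFO_VALIDITY_TIME ms. */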
+
+/* =============================== FAILOVER ================================= */
+
+/* Given a master get the "subjective leader", that is, among all the sentinels
+ * with given characteristics, the one with the lexicographically smaller
+ * runid. The characteristics required are:
+ *
+ * 1) Has SRI_CAN_FAILOVER flag.
+ * 2) Is not disconnected.
+ * 3) Recently replied to our PING (no longer than
+ * SENTINEL_INFO_VALIDITY_TIME milliseconds ago).
+ *
+ * This Sentinel is itself a candidate if the master has SRI_CAN_FAILOVER set.
+ *
+ * The function returns an sds string with the runid of the leader sentinel
+ * instance (from our point of view), to be freed by the caller with
+ * sdsfree(), or NULL if there are no suitable sentinels.
+ */
+
+int compareRunID(const void *a, const void *b) {
+ char **aptrptr = (char**)a, **bptrptr = (char**)b;
+ return strcasecmp(*aptrptr, *bptrptr);
+}
+
+char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master) {
+ dictIterator *di;
+ dictEntry *de;
+ char **instance =
+ zmalloc(sizeof(char*)*(dictSize(master->sentinels)+1));
+ int instances = 0;
+ char *leader = NULL;
+
+ if (master->flags & SRI_CAN_FAILOVER) {
+ /* Add myself if I'm a Sentinel that can failover this master. */
+ instance[instances++] = server.runid;
+ }
+
+ di = dictGetIterator(master->sentinels);
+ while((de = dictNext(di)) != NULL) {
+ sentinelRedisInstance *ri = dictGetVal(de);
+ mstime_t lag = mstime() - ri->last_avail_time;
+
+ if (lag > SENTINEL_INFO_VALIDITY_TIME ||
+ !(ri->flags & SRI_CAN_FAILOVER) ||
+ (ri->flags & SRI_DISCONNECTED) ||
+ ri->runid == NULL)
+ continue;
+ instance[instances++] = ri->runid;
+ }
+ dictReleaseIterator(di);
+
+ /* If we have at least one instance passing our checks, order the array
+ * by runid. */
+ if (instances) {
+ qsort(instance,instances,sizeof(char*),compareRunID);
+ leader = sdsnew(instance[0]);
+ }
+ zfree(instance);
+ return leader;
+}
+
+struct sentinelLeader {
+ char *runid;
+ unsigned long votes;
+};
+
+/* Helper function for sentinelGetObjectiveLeader(): increment the vote
+ * counter for the specified runid. */
+void sentinelObjectiveLeaderIncr(dict *counters, char *runid) {
+ dictEntry *de = dictFind(counters,runid);
+ uint64_t oldval;
+
+ if (de) {
+ oldval = dictGetUnsignedIntegerVal(de);
+ dictSetUnsignedIntegerVal(de,oldval+1);
+ } else {
+ de = dictAddRaw(counters,runid);
+ redisAssert(de != NULL);
+ dictSetUnsignedIntegerVal(de,1);
+ }
+}
+
+/* Scan all the Sentinels attached to this master to find the most voted
+ * leader. Returns an sds string with the winner runid, or NULL if no
+ * candidate reached the required number of votes. */
+char *sentinelGetObjectiveLeader(sentinelRedisInstance *master) {
+ dict *counters;
+ dictIterator *di;
+ dictEntry *de;
+ unsigned int voters = 0, voters_quorum;
+ char *myvote;
+ char *winner = NULL;
+
+ redisAssert(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS));
+ counters = dictCreate(&leaderVotesDictType,NULL);
+
+ /* Count my vote. */
+ myvote = sentinelGetSubjectiveLeader(master);
+ if (myvote) {
+ sentinelObjectiveLeaderIncr(counters,myvote);
+ voters++;
+ }
+
+ /* Count other Sentinels' votes. */
+ di = dictGetIterator(master->sentinels);
+ while((de = dictNext(di)) != NULL) {
+ sentinelRedisInstance *ri = dictGetVal(de);
+ if (ri->leader == NULL) continue;
+ /* If the failover is not already in progress we are only interested
+ * in Sentinels that believe the master is down. Otherwise the leader
+ * selection is useful for the "failover-takedown" when the original
+ * leader fails. In that case we consider all the voters. */
+ if (!(master->flags & SRI_FAILOVER_IN_PROGRESS) &&
+ !(ri->flags & SRI_MASTER_DOWN)) continue;
+ sentinelObjectiveLeaderIncr(counters,ri->leader);
+ voters++;
+ }
+ dictReleaseIterator(di);
+ voters_quorum = voters/2+1;
+
+ /* Find the winner. To win, a candidate needs to meet two conditions:
+ * 1) Absolute majority of the voters (50% + 1).
+ * 2) At least master->quorum votes. */
+ {
+ uint64_t max_votes = 0; /* Max votes so far. */
+
+ di = dictGetIterator(counters);
+ while((de = dictNext(di)) != NULL) {
+ uint64_t votes = dictGetUnsignedIntegerVal(de);
+
+ if (max_votes < votes) {
+ max_votes = votes;
+ winner = dictGetKey(de);
+ }
+ }
+ dictReleaseIterator(di);
+ if (winner && (max_votes < voters_quorum || max_votes < master->quorum))
+ winner = NULL;
+ }
+ winner = winner ? sdsnew(winner) : NULL;
+ sdsfree(myvote);
+ dictRelease(counters);
+ return winner;
+}
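+
+/* Worked example of the vote counting above (illustrative numbers):
+ * with voters = 5, voters_quorum = 5/2+1 = 3.
+ *
+ * - Votes A:3, B:2 and master->quorum = 2: A wins (3 >= 3 and 3 >= 2).
+ * - Votes A:2, B:2, C:1: max_votes = 2 < 3, so the winner is NULL.
+ * - Votes A:3, B:2 but master->quorum = 4: 3 < 4, so the winner is NULL.
+ *
+ * A tie can never produce a winner: two candidates with the same top
+ * score each hold at most voters/2 votes, which is below voters_quorum. */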
+
+/* Set up the master state to start a failover as the leader.
+ *
+ * State can be either:
+ *
+ * SENTINEL_FAILOVER_STATE_WAIT_START: start a failover from scratch.
+ * SENTINEL_FAILOVER_STATE_RECONF_SLAVES: take over ("takedown") a failover
+ * whose original leader is no longer available.
+ */
+void sentinelStartFailover(sentinelRedisInstance *master, int state) {
+ redisAssert(master->flags & SRI_MASTER);
+ redisAssert(state == SENTINEL_FAILOVER_STATE_WAIT_START ||
+ state == SENTINEL_FAILOVER_STATE_RECONF_SLAVES);
+
+ master->failover_state = state;
+ master->flags |= SRI_FAILOVER_IN_PROGRESS|SRI_I_AM_THE_LEADER;
+ sentinelEvent(REDIS_WARNING,"+failover-triggered",master,"%@");
+
+ /* Pick a random delay if it's a fresh failover (WAIT_START), and not
+ * a recovery of a failover started by another sentinel. */
+ if (master->failover_state == SENTINEL_FAILOVER_STATE_WAIT_START) {
+ master->failover_start_time = mstime() +
+ SENTINEL_FAILOVER_FIXED_DELAY +
+ (rand() % SENTINEL_FAILOVER_MAX_RANDOM_DELAY);
+ sentinelEvent(REDIS_WARNING,"+failover-state-wait-start",master,
+ "%@ #starting in %lld milliseconds",
+ master->failover_start_time-mstime());
+ }
+ master->failover_state_change_time = mstime();
+}
+
+/* This function checks whether the conditions to start the failover are
+ * met, that is:
+ *
+ * 1) The master is in O_DOWN state, or a failover is already in progress.
+ * 2) The master is marked as SRI_CAN_FAILOVER, so we can failover it.
+ * 3) We are the objective leader for this master.
+ * 4) For a brand new failover, at least one slave is suitable for promotion.
+ *
+ * If the conditions are met we flag the master as SRI_FAILOVER_IN_PROGRESS
+ * and SRI_I_AM_THE_LEADER.
+ */
+void sentinelStartFailoverIfNeeded(sentinelRedisInstance *master) {
+ char *leader;
+ int isleader;
+
+ /* We can't failover if the master is neither in O_DOWN state nor
+ * already being failed over (in which case we may need to perform
+ * the takedown if the leader died), or if this Sentinel is not
+ * allowed to start a failover. */
+ if (!(master->flags & SRI_CAN_FAILOVER) ||
+ !(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) return;
+
+ leader = sentinelGetObjectiveLeader(master);
+ isleader = leader && strcasecmp(leader,server.runid) == 0;
+ sdsfree(leader);
+
+ /* If I'm not the leader, I can't failover for sure. */
+ if (!isleader) return;
+
+ /* If the failover is already in progress there are two options... */
+ if (master->flags & SRI_FAILOVER_IN_PROGRESS) {
+ if (master->flags & SRI_I_AM_THE_LEADER) {
+ /* 1) I'm flagged as leader so I already started the failover.
+ * Just return. */
+ return;
+ } else {
+ mstime_t elapsed = mstime() - master->failover_state_change_time;
+
+ /* 2) I'm the new leader, but I'm not flagged as leader in the
+ * master: I did not start the failover, but the original
+ * leader no longer holds the leadership.
+ *
+ * In this case if the failover appears to be lagging
+ * for at least 25% of the configured failover timeout,
+ * I can assume I can take control. Otherwise
+ * it's better to return and wait more. */
+ if (elapsed < (master->failover_timeout/4)) return;
+ sentinelEvent(REDIS_WARNING,"+failover-takedown",master,"%@");
+ /* We already have an elected slave if we are in
+ * FAILOVER_IN_PROGRESS state, that is, the slave that we
+ * observed turning into a master. */
+ sentinelStartFailover(master,SENTINEL_FAILOVER_STATE_RECONF_SLAVES);
+ /* As an observer we flagged all the slaves as RECONF_SENT but
+ * now we are in charge of actually sending the reconfiguration
+ * command so let's clear this flag for all the instances. */
+ sentinelDelFlagsToDictOfRedisInstances(master->slaves,
+ SRI_RECONF_SENT);
+ }
+ } else {
+ /* Brand new failover as SRI_FAILOVER_IN_PROGRESS was not set.
+ *
+ * Do we have a slave to promote? Otherwise don't start a failover
+ * at all. */
+ if (sentinelSelectSlave(master) == NULL) return;
+ sentinelStartFailover(master,SENTINEL_FAILOVER_STATE_WAIT_START);
+ }
+}
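+
+/* Worked example of the takedown threshold above (illustrative value,
+ * not a default): with failover_timeout = 60000 ms, a Sentinel that becomes
+ * the objective leader of a failover it did not start waits until
+ * failover_state_change_time is at least 60000/4 = 15000 ms old before
+ * emitting +failover-takedown and jumping straight to the
+ * SENTINEL_FAILOVER_STATE_RECONF_SLAVES state. */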
+
+/* Select a suitable slave to promote. The current algorithm only uses
+ * the following parameters:
+ *
+ * 1) None of the following conditions: S_DOWN, O_DOWN, DISCONNECTED.
+ * 2) last_avail_time more recent than SENTINEL_INFO_VALIDITY_TIME.
+ * 3) info_refresh more recent than SENTINEL_INFO_VALIDITY_TIME.
+ * 4) master_link_down_time no more than:
+ * (now - master->s_down_since_time) + (master->down_after_period * 10),
+ * where the first term is counted only if the master is in S_DOWN state.
+ * 5) Slave priority can't be zero, otherwise the slave is discarded.
+ *
+ * Among all the slaves matching the above conditions we select the slave
+ * with the lowest slave_priority. If priority is the same we select the slave
+ * with the lexicographically smaller runid.
+ *
+ * The function returns the pointer to the selected slave, otherwise
+ * NULL if no suitable slave was found.
+ */
+
+int compareSlavesForPromotion(const void *a, const void *b) {
+ sentinelRedisInstance **sa = (sentinelRedisInstance **)a,
+ **sb = (sentinelRedisInstance **)b;
+ char *sa_runid, *sb_runid;
+
+ if ((*sa)->slave_priority != (*sb)->slave_priority)
+ return (*sa)->slave_priority - (*sb)->slave_priority;
+
+ /* If priority is the same, select the slave with the
+ * lexicographically smaller runid. Note that we try to handle runid
+ * == NULL as there are old Redis versions that don't publish runid in
+ * INFO. A NULL runid is considered bigger than any other runid. */
+ sa_runid = (*sa)->runid;
+ sb_runid = (*sb)->runid;
+ if (sa_runid == NULL && sb_runid == NULL) return 0;
+ else if (sa_runid == NULL) return 1; /* a > b */
+ else if (sb_runid == NULL) return -1; /* a < b */
+ return strcasecmp(sa_runid, sb_runid);
+}
+
+sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) {
+ sentinelRedisInstance **instance =
+ zmalloc(sizeof(instance[0])*dictSize(master->slaves));
+ sentinelRedisInstance *selected = NULL;
+ int instances = 0;
+ dictIterator *di;
+ dictEntry *de;
+ mstime_t max_master_down_time = 0;
+
+ if (master->flags & SRI_S_DOWN)
+ max_master_down_time += mstime() - master->s_down_since_time;
+ max_master_down_time += master->down_after_period * 10;
+
+ di = dictGetIterator(master->slaves);
+ while((de = dictNext(di)) != NULL) {
+ sentinelRedisInstance *slave = dictGetVal(de);
+ mstime_t info_validity_time = mstime()-SENTINEL_INFO_VALIDITY_TIME;
+
+ if (slave->flags & (SRI_S_DOWN|SRI_O_DOWN|SRI_DISCONNECTED)) continue;
+ if (slave->last_avail_time < info_validity_time) continue;
+ if (slave->slave_priority == 0) continue;
+
+ /* If the master is in SDOWN state we get INFO for slaves every second.
+ * Otherwise we get it with the usual period so we need to account for
+ * a larger delay. */
+ if ((master->flags & SRI_S_DOWN) == 0)
+ info_validity_time -= SENTINEL_INFO_PERIOD;
+ if (slave->info_refresh < info_validity_time) continue;
+ if (slave->master_link_down_time > max_master_down_time) continue;
+ instance[instances++] = slave;
+ }
+ dictReleaseIterator(di);
+ if (instances) {
+ qsort(instance,instances,sizeof(sentinelRedisInstance*),
+ compareSlavesForPromotion);
+ selected = instance[0];
+ }
+ zfree(instance);
+ return selected;
+}
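+
+/* Worked example of the selection above (illustrative values): with
+ * down_after_period = 30000 ms and the master in S_DOWN for 5000 ms,
+ * max_master_down_time = 5000 + 30000*10 = 305000 ms, so a slave whose
+ * replication link has been down for more than 305 seconds is skipped.
+ *
+ * Among the remaining candidates, compareSlavesForPromotion() sorts
+ * { priority 100, runid "a1" }, { priority 10, runid "c3" } and
+ * { priority 10, runid "b2" } as b2, c3, a1, so "b2" is promoted.
+ * A slave with priority 0 never enters the candidate list. */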
+
+/* ---------------- Failover state machine implementation ------------------- */
+void sentinelFailoverWaitStart(sentinelRedisInstance *ri) {
+ /* If we are in "wait start" but the master is no longer in ODOWN nor in
+ * SDOWN condition we abort the failover. This is important as it
+ * prevents a useless failover in a notable case of netsplit, where
+ * the Sentinels are split from the Redis instances. In this case
+ * the failover will not start while the split lasts because no
+ * good slave can be reached. However when the split is resolved, we
+ * can go to "wait start" if the slave becomes reachable a few
+ * milliseconds before the master does. In that case, when the master
+ * is back online, we cancel the failover. */
+ if ((ri->flags & (SRI_S_DOWN|SRI_O_DOWN|SRI_FORCE_FAILOVER)) == 0) {
+ sentinelEvent(REDIS_WARNING,"-failover-abort-master-is-back",
+ ri,"%@");
+ sentinelAbortFailover(ri);
+ return;
+ }
+
+ /* Start the failover going to the next state if enough time has
+ * elapsed. */
+ if (mstime() >= ri->failover_start_time) {
+ ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
+ ri->failover_state_change_time = mstime();
+ sentinelEvent(REDIS_WARNING,"+failover-state-select-slave",ri,"%@");
+ }
+}
+
+void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) {
+ sentinelRedisInstance *slave = sentinelSelectSlave(ri);
+
+ if (slave == NULL) {
+ sentinelEvent(REDIS_WARNING,"-failover-abort-no-good-slave",ri,"%@");
+ sentinelAbortFailover(ri);
+ } else {
+ sentinelEvent(REDIS_WARNING,"+selected-slave",slave,"%@");
+ slave->flags |= SRI_PROMOTED;
+ ri->promoted_slave = slave;
+ ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE;
+ ri->failover_state_change_time = mstime();
+ sentinelEvent(REDIS_NOTICE,"+failover-state-send-slaveof-noone",
+ slave, "%@");
+ }
+}
+
+void sentinelFailoverSendSlaveOfNoOne(sentinelRedisInstance *ri) {
+ int retval;
+
+ if (ri->promoted_slave->flags & SRI_DISCONNECTED) return;
+
+ /* Send SLAVEOF NO ONE command to turn the slave into a master.
+ * We actually register a generic callback for this command as we don't
+ * really care about the reply. We check whether it worked indirectly, by
+ * observing whether INFO reports a different role (master instead of slave). */
+ retval = redisAsyncCommand(ri->promoted_slave->cc,
+ sentinelDiscardReplyCallback, NULL, "SLAVEOF NO ONE");
+ if (retval != REDIS_OK) return;
+ ri->promoted_slave->pending_commands++;
+ sentinelEvent(REDIS_NOTICE, "+failover-state-wait-promotion",
+ ri->promoted_slave,"%@");
+ ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION;
+ ri->failover_state_change_time = mstime();
+}
+
+/* We wait for the promotion indirectly, checking via INFO when the
+ * slave turns into a master. If this does not happen within
+ * SENTINEL_PROMOTION_RETRY_PERIOD we go back to selecting a slave. */
+void sentinelFailoverWaitPromotion(sentinelRedisInstance *ri) {
+ mstime_t elapsed = mstime() - ri->failover_state_change_time;
+
+ if (elapsed >= SENTINEL_PROMOTION_RETRY_PERIOD) {
+ sentinelEvent(REDIS_WARNING,"-promotion-timeout",ri->promoted_slave,
+ "%@");
+ sentinelEvent(REDIS_WARNING,"+failover-state-select-slave",ri,"%@");
+ ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
+ ri->failover_state_change_time = mstime();
+ ri->promoted_slave->flags &= ~SRI_PROMOTED;
+ ri->promoted_slave = NULL;
+ }
+}
+
+void sentinelFailoverDetectEnd(sentinelRedisInstance *master) {
+ int not_reconfigured = 0, timeout = 0;
+ dictIterator *di;
+ dictEntry *de;
+ mstime_t elapsed = mstime() - master->failover_state_change_time;
+
+ /* We can't consider failover finished if the promoted slave is
+ * not reachable. */
+ if (master->promoted_slave == NULL ||
+ master->promoted_slave->flags & SRI_S_DOWN) return;
+
+ /* The failover terminates once all the reachable slaves are properly
+ * configured. */
+ di = dictGetIterator(master->slaves);
+ while((de = dictNext(di)) != NULL) {
+ sentinelRedisInstance *slave = dictGetVal(de);
+
+ if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
+ if (slave->flags & SRI_S_DOWN) continue;
+ not_reconfigured++;
+ }
+ dictReleaseIterator(di);
+
+ /* Force end of failover on timeout. */
+ if (elapsed > master->failover_timeout) {
+ not_reconfigured = 0;
+ timeout = 1;
+ sentinelEvent(REDIS_WARNING,"+failover-end-for-timeout",master,"%@");
+ }
+
+ if (not_reconfigured == 0) {
+ int role = (master->flags & SRI_I_AM_THE_LEADER) ? SENTINEL_LEADER :
+ SENTINEL_OBSERVER;
+
+ sentinelEvent(REDIS_WARNING,"+failover-end",master,"%@");
+ master->failover_state = SENTINEL_FAILOVER_STATE_UPDATE_CONFIG;
+ master->failover_state_change_time = mstime();
+ sentinelCallClientReconfScript(master,role,"end",master->addr,
+ master->promoted_slave->addr);
+ }
+
+ /* If I'm the leader it is a good idea to send a best effort SLAVEOF
+ * command to all the slaves still not reconfigured to replicate with
+ * the new master. */
+ if (timeout && (master->flags & SRI_I_AM_THE_LEADER)) {
+ dictIterator *di;
+ dictEntry *de;
+ char master_port[32];
+
+ ll2string(master_port,sizeof(master_port),
+ master->promoted_slave->addr->port);
+
+ di = dictGetIterator(master->slaves);
+ while((de = dictNext(di)) != NULL) {
+ sentinelRedisInstance *slave = dictGetVal(de);
+ int retval;
+
+ if (slave->flags &
+ (SRI_RECONF_DONE|SRI_RECONF_SENT|SRI_DISCONNECTED)) continue;
+
+ retval = redisAsyncCommand(slave->cc,
+ sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
+ master->promoted_slave->addr->ip,
+ master_port);
+ if (retval == REDIS_OK) {
+ sentinelEvent(REDIS_NOTICE,"+slave-reconf-sent-be",slave,"%@");
+ slave->flags |= SRI_RECONF_SENT;
+ }
+ }
+ dictReleaseIterator(di);
+ }
+}
+
+/* Send SLAVEOF <new master address> to the remaining slaves that do not
+ * yet appear to have the updated configuration, keeping at most
+ * master->parallel_syncs reconfigurations in progress at the same time. */
+void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) {
+ dictIterator *di;
+ dictEntry *de;
+ int in_progress = 0;
+
+ di = dictGetIterator(master->slaves);
+ while((de = dictNext(di)) != NULL) {
+ sentinelRedisInstance *slave = dictGetVal(de);
+
+ if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG))
+ in_progress++;
+ }
+ dictReleaseIterator(di);
+
+ di = dictGetIterator(master->slaves);
+ while(in_progress < master->parallel_syncs &&
+ (de = dictNext(di)) != NULL)
+ {
+ sentinelRedisInstance *slave = dictGetVal(de);
+ int retval;
+ char master_port[32];
+
+ /* Skip the promoted slave, and already configured slaves. */
+ if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
+
+ /* Clear the SRI_RECONF_SENT flag if too much time elapsed without
+ * the slave moving forward to the next state. */
+ if ((slave->flags & SRI_RECONF_SENT) &&
+ (mstime() - slave->slave_reconf_sent_time) >
+ SENTINEL_SLAVE_RECONF_RETRY_PERIOD)
+ {
+ sentinelEvent(REDIS_NOTICE,"-slave-reconf-sent-timeout",slave,"%@");
+ slave->flags &= ~SRI_RECONF_SENT;
+ }
+
+ /* Nothing to do for instances that are disconnected or already
+ * in RECONF_SENT or RECONF_INPROG state. */
+ if (slave->flags & (SRI_DISCONNECTED|SRI_RECONF_SENT|SRI_RECONF_INPROG))
+ continue;
+
+ /* Send SLAVEOF <new master>. */
+ ll2string(master_port,sizeof(master_port),
+ master->promoted_slave->addr->port);
+ retval = redisAsyncCommand(slave->cc,
+ sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
+ master->promoted_slave->addr->ip,
+ master_port);
+ if (retval == REDIS_OK) {
+ slave->flags |= SRI_RECONF_SENT;
+ slave->pending_commands++;
+ slave->slave_reconf_sent_time = mstime();
+ sentinelEvent(REDIS_NOTICE,"+slave-reconf-sent",slave,"%@");
+ in_progress++;
+ }
+ }
+ dictReleaseIterator(di);
+ sentinelFailoverDetectEnd(master);
+}
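+
+/* Worked example of the parallel_syncs throttling above (illustrative
+ * values): with parallel_syncs = 1 and three slaves besides the promoted
+ * one still to reconfigure, the first call sends SLAVEOF to one slave only
+ * (in_progress becomes 1 and the loop stops). Later calls send nothing
+ * while that slave is in RECONF_SENT or RECONF_INPROG state; once it
+ * reaches RECONF_DONE it no longer counts as in progress and the next
+ * slave is contacted. */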
+
+/* This function is called when the master is in
+ * SENTINEL_FAILOVER_STATE_UPDATE_CONFIG state. In this state we need
+ * to remove it from the master table and add the promoted slave instead.
+ *
+ * If there is no promoted slave, we remove the master and re-add it with
+ * the same address to trigger a complete state refresh. */
+void sentinelFailoverSwitchToPromotedSlave(sentinelRedisInstance *master) {
+ sentinelRedisInstance *ref = master->promoted_slave ?
+ master->promoted_slave : master;
+
+ sentinelEvent(REDIS_WARNING,"+switch-master",master,"%s %s %d %s %d",
+ master->name, master->addr->ip, master->addr->port,
+ ref->addr->ip, ref->addr->port);
+
+ sentinelResetMasterAndChangeAddress(master,ref->addr->ip,ref->addr->port);
+}
+
+void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
+ redisAssert(ri->flags & SRI_MASTER);
+
+ if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;
+
+ switch(ri->failover_state) {
+ case SENTINEL_FAILOVER_STATE_WAIT_START:
+ sentinelFailoverWaitStart(ri);
+ break;
+ case SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
+ sentinelFailoverSelectSlave(ri);
+ break;
+ case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:
+ sentinelFailoverSendSlaveOfNoOne(ri);
+ break;
+ case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
+ sentinelFailoverWaitPromotion(ri);
+ break;
+ case SENTINEL_FAILOVER_STATE_RECONF_SLAVES:
+ sentinelFailoverReconfNextSlave(ri);
+ break;
+ case SENTINEL_FAILOVER_STATE_DETECT_END:
+ sentinelFailoverDetectEnd(ri);
+ break;
+ }
+}
+
+/* Abort a failover in progress with the following steps:
+ * 1) If this instance is the leader, send a SLAVEOF command to all the already
+ * reconfigured slaves if any to configure them to replicate with the
+ * original master.
+ * 2) For both leaders and observers: clear the failover flags and state in
+ * the master instance.
+ * 3) If there is already a promoted slave and we are the leader, and this
+ * slave is not DISCONNECTED, try to reconfigure it to replicate
+ * back to the master as well, sending a best effort SLAVEOF command.
+ */
+void sentinelAbortFailover(sentinelRedisInstance *ri) {
+ char master_port[32];
+ dictIterator *di;
+ dictEntry *de;
+ int sentinel_role;
+
+ redisAssert(ri->flags & SRI_FAILOVER_IN_PROGRESS);
+ ll2string(master_port,sizeof(master_port),ri->addr->port);
+
+ /* Clear failover related flags from slaves.
+ * Also if we are the leader make sure to send SLAVEOF commands to all the
+ * already reconfigured slaves in order to turn them back into slaves of
+ * the original master. */
+ di = dictGetIterator(ri->slaves);
+ while((de = dictNext(di)) != NULL) {
+ sentinelRedisInstance *slave = dictGetVal(de);
+ if ((ri->flags & SRI_I_AM_THE_LEADER) &&
+ !(slave->flags & SRI_DISCONNECTED) &&
+ (slave->flags & (SRI_PROMOTED|SRI_RECONF_SENT|SRI_RECONF_INPROG|
+ SRI_RECONF_DONE)))
+ {
+ int retval;
+
+ retval = redisAsyncCommand(slave->cc,
+ sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
+ ri->addr->ip,
+ master_port);
+ if (retval == REDIS_OK)
+ sentinelEvent(REDIS_NOTICE,"-slave-reconf-undo",slave,"%@");
+ }
+ slave->flags &= ~(SRI_RECONF_SENT|SRI_RECONF_INPROG|SRI_RECONF_DONE);
+ }
+ dictReleaseIterator(di);
+
+ sentinel_role = (ri->flags & SRI_I_AM_THE_LEADER) ? SENTINEL_LEADER :
+ SENTINEL_OBSERVER;
+ ri->flags &= ~(SRI_FAILOVER_IN_PROGRESS|SRI_I_AM_THE_LEADER|SRI_FORCE_FAILOVER);
+ ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
+ ri->failover_state_change_time = mstime();
+ if (ri->promoted_slave) {
+ sentinelCallClientReconfScript(ri,sentinel_role,"abort",
+ ri->promoted_slave->addr,ri->addr);
+ ri->promoted_slave->flags &= ~SRI_PROMOTED;
+ ri->promoted_slave = NULL;
+ }
+}
+
+/* The following is called only for master instances and will abort the
+ * failover process if:
+ *
+ * 1) The failover is in progress.
+ * 2) We already promoted a slave.
+ * 3) The promoted slave is in extended SDOWN condition.
+ */
+void sentinelAbortFailoverIfNeeded(sentinelRedisInstance *ri) {
+ /* Failover is in progress? Do we have a promoted slave? */
+ if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS) || !ri->promoted_slave) return;
+
+ /* Is the promoted slave in an extended SDOWN state? */
+ if (!(ri->promoted_slave->flags & SRI_S_DOWN) ||
+ (mstime() - ri->promoted_slave->s_down_since_time) <
+ (ri->down_after_period * SENTINEL_EXTENDED_SDOWN_MULTIPLIER)) return;
+
+ sentinelEvent(REDIS_WARNING,"-failover-abort-x-sdown",ri->promoted_slave,"%@");
+ sentinelAbortFailover(ri);
+}
+
+/* ======================== SENTINEL timer handler ==========================
+ * This is the "main" of our Sentinel, as Sentinel is completely non-blocking
+ * by design. The function is called every second.
+ * -------------------------------------------------------------------------- */
+
+/* Perform scheduled operations for the specified Redis instance. */
+void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
+ /* ========== MONITORING HALF ============ */
+ /* Every kind of instance */
+ sentinelReconnectInstance(ri);
+ sentinelPingInstance(ri);
+
+ /* Masters and slaves */
+ if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
+ /* Nothing so far. */
+ }
+
+ /* Only masters */
+ if (ri->flags & SRI_MASTER) {
+ sentinelAskMasterStateToOtherSentinels(ri);
+ }
+
+ /* ============== ACTING HALF ============= */
+ /* We don't proceed with the acting half if we are in TILT mode.
+ * TILT happens when we find something odd with the time, like a
+ * sudden change in the clock. */
+ if (sentinel.tilt) {
+ if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return;
+ sentinel.tilt = 0;
+ sentinelEvent(REDIS_WARNING,"-tilt",NULL,"#tilt mode exited");
+ }
+
+ /* Every kind of instance */
+ sentinelCheckSubjectivelyDown(ri);
+
+ /* Masters and slaves */
+ if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
+ /* Nothing so far. */
+ }
+
+ /* Only masters */
+ if (ri->flags & SRI_MASTER) {
+ sentinelCheckObjectivelyDown(ri);
+ sentinelStartFailoverIfNeeded(ri);
+ sentinelFailoverStateMachine(ri);
+ sentinelAbortFailoverIfNeeded(ri);
+ }
+}
+
+/* Perform scheduled operations for all the instances in the dictionary.
+ * For every master, recursively call the function against its
+ * dictionaries of slaves and sentinels. */
+void sentinelHandleDictOfRedisInstances(dict *instances) {
+ dictIterator *di;
+ dictEntry *de;
+ sentinelRedisInstance *switch_to_promoted = NULL;
+
+ /* There are a number of things we need to perform against every master. */
+ di = dictGetIterator(instances);
+ while((de = dictNext(di)) != NULL) {
+ sentinelRedisInstance *ri = dictGetVal(de);
+
+ sentinelHandleRedisInstance(ri);
+ if (ri->flags & SRI_MASTER) {
+ sentinelHandleDictOfRedisInstances(ri->slaves);
+ sentinelHandleDictOfRedisInstances(ri->sentinels);
+ if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) {
+ switch_to_promoted = ri;
+ }
+ }
+ }
+ if (switch_to_promoted)
+ sentinelFailoverSwitchToPromotedSlave(switch_to_promoted);
+ dictReleaseIterator(di);
+}
+
+/* This function checks if we need to enter the TILT mode.
+ *
+ * The TILT mode is entered if we detect that between two invocations of the
+ * timer interrupt, a negative amount of time, or too much time has passed.
+ * Note that we expect that more or less just 100 milliseconds will pass
+ * if everything is fine. However we'll see a negative number or a
+ * difference bigger than SENTINEL_TILT_TRIGGER milliseconds if one of the
+ * following conditions occurs:
+ *
+ * 1) The Sentinel process is blocked for some time, for any reason:
+ * the load is huge, the computer was frozen for some time in I/O or
+ * similar, or the process was stopped by a signal.
+ * 2) The system clock was altered significantly.
+ *
+ * Under both these conditions we'll see everything as timed out and failing
+ * without good reasons. Instead we enter the TILT mode and wait
+ * for SENTINEL_TILT_PERIOD to elapse before starting to act again.
+ *
+ * During TILT time we still collect information, we just do not act. */
+void sentinelCheckTiltCondition(void) {
+ mstime_t now = mstime();
+ mstime_t delta = now - sentinel.previous_time;
+
+ if (delta < 0 || delta > SENTINEL_TILT_TRIGGER) {
+ sentinel.tilt = 1;
+ sentinel.tilt_start_time = mstime();
+ sentinelEvent(REDIS_WARNING,"+tilt",NULL,"#tilt mode entered");
+ }
+ sentinel.previous_time = mstime();
+}
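+
+/* Worked example of the check above (illustrative timings): if the previous
+ * timer call happened at t = 10000 ms and the current one at t = 10100 ms,
+ * delta = 100 ms, the expected value, which is assumed to be below
+ * SENTINEL_TILT_TRIGGER, so nothing happens. If the system clock was moved
+ * back so that now = 9500 ms, delta = -500 < 0 and TILT mode is entered
+ * regardless of the value of SENTINEL_TILT_TRIGGER. A large positive delta,
+ * such as one produced by the process being stopped by a signal, enters
+ * TILT mode only when it exceeds SENTINEL_TILT_TRIGGER. */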
+
+void sentinelTimer(void) {
+ sentinelCheckTiltCondition();
+ sentinelHandleDictOfRedisInstances(sentinel.masters);
+ sentinelRunPendingScripts();
+ sentinelCollectTerminatedScripts();
+ sentinelKillTimedoutScripts();
+}
+
#include "pqsort.h" /* Partial qsort for SORT+LIMIT */
#include <math.h> /* isnan() */
+zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank);
+
redisSortOperation *createSortOperation(int type, robj *pattern) {
redisSortOperation *so = zmalloc(sizeof(*so));
so->type = type;
/* Lookup the key to sort. It must be of the right types */
sortval = lookupKeyRead(c->db,c->argv[1]);
- if (sortval && sortval->type != REDIS_SET && sortval->type != REDIS_LIST &&
- sortval->type != REDIS_ZSET)
+ if (sortval && sortval->type != REDIS_SET &&
+ sortval->type != REDIS_LIST &&
+ sortval->type != REDIS_ZSET)
{
addReply(c,shared.wrongtypeerr);
return;
* Operations can be GET/DEL/INCR/DECR */
operations = listCreate();
listSetFreeMethod(operations,zfree);
- j = 2;
+ j = 2; /* options start at argv[2] */
/* Now we need to protect sortval incrementing its count, in the future
* SORT may have options able to overwrite/delete keys during the sorting
j++;
}
- /* If we have STORE we need to force sorting for deterministic output
- * and replication. We use alpha sorting since this is guaranteed to
- * work with any input. */
- if (storekey && dontsort) {
+ /* For the STORE option, or when SORT is called from a Lua script,
+ * we want to force a specific ordering even when no explicit ordering
+ * was asked (SORT BY nosort). This guarantees that replication / AOF
+ * is deterministic.
+ *
+ * However in the case 'dontsort' is true, but the type to sort is a
+ * sorted set, we don't need to do anything as ordering is guaranteed
+ * in this special case. */
+ if ((storekey || c->flags & REDIS_LUA_CLIENT) &&
+ (dontsort && sortval->type != REDIS_ZSET))
+ {
+ /* Force ALPHA sorting */
dontsort = 0;
alpha = 1;
sortby = NULL;
if (sortval->type == REDIS_ZSET)
zsetConvert(sortval, REDIS_ENCODING_SKIPLIST);
- /* Load the sorting vector with all the objects to sort */
+ /* Obtain the length of the object to sort. */
switch(sortval->type) {
case REDIS_LIST: vectorlen = listTypeLength(sortval); break;
case REDIS_SET: vectorlen = setTypeSize(sortval); break;
case REDIS_ZSET: vectorlen = dictSize(((zset*)sortval->ptr)->dict); break;
default: vectorlen = 0; redisPanic("Bad SORT type"); /* Avoid GCC warning */
}
+
+ /* Perform LIMIT start,count sanity checking. */
+ start = (limit_start < 0) ? 0 : limit_start;
+ end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
+ if (start >= vectorlen) {
+ start = vectorlen-1;
+ end = vectorlen-2;
+ }
+ if (end >= vectorlen) end = vectorlen-1;
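+
+ /* Worked example of the checks above (illustrative values), with
+ * vectorlen = 10:
+ *
+ * - Without LIMIT (assuming the defaults limit_start = 0 and
+ *   limit_count = -1): start = 0, end = 9.
+ * - LIMIT 2 3: start = 2, end = 2+3-1 = 4, i.e. elements 2, 3 and 4.
+ * - LIMIT 8 5: end = 8+5-1 = 12 is clamped to 9.
+ * - LIMIT 20 5: start >= vectorlen, so start = 9 and end = 8, an empty
+ *   range. */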
+
+ /* Optimization: when all of the following conditions are true
+ *
+ * 1) The object to sort is a sorted set.
+ * 2) There is nothing to sort as dontsort is true (BY <constant string>).
+ * 3) We have a LIMIT option that actually reduces the number of elements
+ * to fetch.
+ *
+ * loading all the objects into the vector is a huge waste of resources.
+ * We just allocate a vector that is big enough for the selected range
+ * length, and make sure to load just this part in the vector. */
+ if (sortval->type == REDIS_ZSET &&
+ dontsort &&
+ (start != 0 || end != vectorlen-1))
+ {
+ vectorlen = end-start+1;
+ }
+
+ /* Load the sorting vector with all the objects to sort */
vector = zmalloc(sizeof(redisSortObject)*vectorlen);
j = 0;
j++;
}
setTypeReleaseIterator(si);
+ } else if (sortval->type == REDIS_ZSET && dontsort) {
+ /* Special handling for a sorted set, if 'dontsort' is true.
+ * This makes sure we return elements in the sorted set original
+ * ordering, according to the DESC / ASC options.
+ *
+ * Note that in this case we also handle LIMIT here in a direct
+ * way, just getting the required range, as an optimization. */
+
+ zset *zs = sortval->ptr;
+ zskiplist *zsl = zs->zsl;
+ zskiplistNode *ln;
+ robj *ele;
+ int rangelen = vectorlen;
+
+ /* Check if starting point is trivial, before doing log(N) lookup. */
+ if (desc) {
+ long zsetlen = dictSize(((zset*)sortval->ptr)->dict);
+
+ ln = zsl->tail;
+ if (start > 0)
+ ln = zslGetElementByRank(zsl,zsetlen-start);
+ } else {
+ ln = zsl->header->level[0].forward;
+ if (start > 0)
+ ln = zslGetElementByRank(zsl,start+1);
+ }
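+
+ /* Worked example of the rank lookup above (illustrative values):
+ * zslGetElementByRank() uses 1-based ranks. With zsetlen = 10 and
+ * start = 2, an ascending scan starts at rank 2+1 = 3 (the third
+ * element), while a DESC scan starts at rank 10-2 = 8, the third
+ * element counting from the tail (ranks 10, 9, 8). */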
+
+ while(rangelen--) {
+ redisAssertWithInfo(c,sortval,ln != NULL);
+ ele = ln->obj;
+ vector[j].obj = ele;
+ vector[j].u.score = 0;
+ vector[j].u.cmpobj = NULL;
+ j++;
+ ln = desc ? ln->backward : ln->level[0].forward;
+ }
+ /* The code producing the output does not know that in the case of
+ * sorted set, 'dontsort', and LIMIT, we are able to get just the
+ * range, already sorted, so we need to adjust "start" and "end"
+ * to make sure start is set to 0. */
+ end -= start;
+ start = 0;
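The rank arithmetic used to find the first node deserves a small illustration. In this sketch an array kept in ascending score order stands in for the skiplist (an assumption made for brevity), and `zset_range_nosort()` is a hypothetical helper. The skiplist version starts from 1-based rank `start+1` for ASC and `zsetlen-start` for DESC, which are 0-based indexes `start` and `len-1-start` here.

```c
#include <stddef.h>

/* Copies the elements of rank range [start, end] (0-based, inclusive) out of
 * 'asc', which holds the members in ascending score order. For DESC the
 * walk starts at the highest score and moves backward, like following the
 * skiplist 'backward' pointers. Returns the number of elements copied. */
size_t zset_range_nosort(const char **asc, size_t len, size_t start,
                         size_t end, int desc, const char **out) {
    size_t n = 0;
    if (len == 0 || start > end || start >= len) return 0;
    if (end >= len) end = len - 1;

    /* 0-based index of the first element to emit. */
    size_t idx = desc ? len - 1 - start : start;
    for (size_t i = start; i <= end; i++) {
        out[n++] = asc[idx];
        if (desc) idx--; else idx++; /* unsigned wrap after the last step is harmless */
    }
    return n;
}
```

Because the extracted slice already starts at `start`, the caller then rebases the output window to begin at 0, as the code above does with `end -= start; start = 0;`.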
} else if (sortval->type == REDIS_ZSET) {
dict *set = ((zset*)sortval->ptr)->dict;
dictIterator *di;
}
}
- /* We are ready to sort the vector... perform a bit of sanity check
- * on the LIMIT option too. We'll use a partial version of quicksort. */
- start = (limit_start < 0) ? 0 : limit_start;
- end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
- if (start >= vectorlen) {
- start = vectorlen-1;
- end = vectorlen-2;
- }
- if (end >= vectorlen) end = vectorlen-1;
-
- server.sort_dontsort = dontsort;
if (dontsort == 0) {
server.sort_desc = desc;
server.sort_alpha = alpha;
value = hashTypeCurrentObject(hi, REDIS_HASH_VALUE);
value = tryObjectEncoding(value);
ret = dictAdd(dict, field, value);
- redisAssert(ret == DICT_OK);
+ if (ret != DICT_OK) {
+ redisLogHexDump(REDIS_WARNING,"ziplist with dup elements dump",
+ o->ptr,ziplistBlobLen(o->ptr));
+ redisAssert(ret == DICT_OK);
+ }
}
hashTypeReleaseIterator(hi);
#include "redis.h"
+void signalListAsReady(redisClient *c, robj *key);
+
/*-----------------------------------------------------------------------------
* List API
*----------------------------------------------------------------------------*/
listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
}
+/* The function pushes an element to the specified list object 'subject',
+ * at head or tail position as specified by 'where'.
+ *
+ * There is no need for the caller to increment the refcount of 'value' as
+ * the function takes care of it if needed. */
void listTypePush(robj *subject, robj *value, int where) {
/* Check if we need to convert the ziplist */
listTypeTryConversion(subject,value);
return;
}
+ if (may_have_waiting_clients) signalListAsReady(c,c->argv[1]);
+
for (j = 2; j < c->argc; j++) {
c->argv[j] = tryObjectEncoding(c->argv[j]);
- if (may_have_waiting_clients) {
- if (handleClientsWaitingListPush(c,c->argv[1],c->argv[j])) {
- waiting++;
- continue;
- } else {
- may_have_waiting_clients = 0;
- }
- }
if (!lobj) {
lobj = createZiplistObject();
dbAdd(c->db,c->argv[1],lobj);
addReplyLongLong(c, waiting + (lobj ? listTypeLength(lobj) : 0));
if (pushed) signalModifiedKey(c->db,c->argv[1]);
server.dirty += pushed;
-
- /* Alter the replication of the command accordingly to the number of
- * list elements delivered to clients waiting into a blocking operation.
- * We do that only if there were waiting clients, and only if still some
- * element was pushed into the list (othewise dirty is 0 and nothign will
- * be propagated). */
- if (waiting && pushed) {
- /* CMD KEY a b C D E */
- for (j = 0; j < waiting; j++) decrRefCount(c->argv[j+2]);
- memmove(c->argv+2,c->argv+2+waiting,sizeof(robj*)*pushed);
- c->argc -= waiting;
- }
}
void lpushCommand(redisClient *c) {
* as well. This command was originally proposed by Ezra Zygmuntowicz.
*/
-void rpoplpushHandlePush(redisClient *origclient, redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
- if (!handleClientsWaitingListPush(origclient,dstkey,value)) {
- /* Create the list if the key does not exist */
- if (!dstobj) {
- dstobj = createZiplistObject();
- dbAdd(c->db,dstkey,dstobj);
- } else {
- signalModifiedKey(c->db,dstkey);
- }
- listTypePush(dstobj,value,REDIS_HEAD);
- /* Additionally propagate this PUSH operation together with
- * the operation performed by the command. */
- {
- robj **argv = zmalloc(sizeof(robj*)*3);
- argv[0] = createStringObject("LPUSH",5);
- argv[1] = dstkey;
- argv[2] = value;
- incrRefCount(argv[1]);
- incrRefCount(argv[2]);
- alsoPropagate(server.lpushCommand,c->db->id,argv,3,
- REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL);
- }
+void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
+ /* Create the list if the key does not exist */
+ if (!dstobj) {
+ dstobj = createZiplistObject();
+ dbAdd(c->db,dstkey,dstobj);
+ signalListAsReady(c,dstkey);
}
+ signalModifiedKey(c->db,dstkey);
+ listTypePush(dstobj,value,REDIS_HEAD);
/* Always send the pushed value to the client. */
addReplyBulk(c,value);
}
if (dobj && checkType(c,dobj,REDIS_LIST)) return;
value = listTypePop(sobj,REDIS_TAIL);
/* We saved touched key, and protect it, since rpoplpushHandlePush
- * may change the client command argument vector. */
+ * may change the client command argument vector (it does not
+ * currently). */
incrRefCount(touchedkey);
- rpoplpushHandlePush(c,c,c->argv[2],dobj,value);
+ rpoplpushHandlePush(c,c->argv[2],dobj,value);
/* listTypePop returns an object with its refcount incremented */
decrRefCount(value);
signalModifiedKey(c->db,touchedkey);
decrRefCount(touchedkey);
server.dirty++;
-
- /* Replicate this as a simple RPOP since the LPUSH side is replicated
- * by rpoplpushHandlePush() call if needed (it may not be needed
- * if a client is blocking wait a push against the list). */
- rewriteClientCommandVector(c,2,
- resetRefCount(createStringObject("RPOP",4)),
- c->argv[1]);
}
}
* Blocking POP operations
*----------------------------------------------------------------------------*/
-/* Currently Redis blocking operations support is limited to list POP ops,
- * so the current implementation is not fully generic, but it is also not
- * completely specific so it will not require a rewrite to support new
- * kind of blocking operations in the future.
- *
- * Still it's important to note that list blocking operations can be already
- * used as a notification mechanism in order to implement other blocking
- * operations at application level, so there must be a very strong evidence
- * of usefulness and generality before new blocking operations are implemented.
- *
- * This is how the current blocking POP works, we use BLPOP as example:
+/* This is how the current blocking POP works, using BLPOP as an example:
* - If the user calls BLPOP and the key exists and contains a non empty list
* then LPOP is called instead. So BLPOP is semantically the same as LPOP
- * if there is not to block.
+ * if blocking is not required.
* - If instead BLPOP is called and the key does not exists or the list is
* empty we need to block. In order to do so we remove the notification for
* new data to read in the client socket (so that we'll not serve new
* in a dictionary (db->blocking_keys) mapping keys to a list of clients
* blocking for this keys.
* - If a PUSH operation against a key with blocked clients waiting is
- * performed, we serve the first in the list: basically instead to push
- * the new element inside the list we return it to the (first / oldest)
- * blocking client, unblock the client, and remove it form the list.
- *
- * The above comment and the source code should be enough in order to understand
- * the implementation and modify / fix it later.
+ * performed, we mark this key as "ready", and after the current command,
+ * MULTI/EXEC block, or script, is executed, we serve all the clients waiting
+ * for this list, from the one that blocked first to the last, according
+ * to the number of elements we have in the ready list.
*/
/* Set a client in blocking mode for the specified key, with the specified
listAddNodeTail(server.unblocked_clients,c);
}
-/* This should be called from any function PUSHing into lists.
- * 'c' is the "pushing client", 'key' is the key it is pushing data against,
- * 'ele' is the element pushed.
+/* If the specified key has clients blocked waiting for list pushes, this
+ * function will put the key reference into the server.ready_keys list.
+ * Note that db->ready_keys is a hash table that allows us to avoid putting
+ * the same key again and again in the list in case of multiple pushes
+ * made by a script or in the context of MULTI/EXEC.
*
- * If the function returns 0 there was no client waiting for a list push
- * against this key.
+ * The list will be finally processed by handleClientsBlockedOnLists() */
+void signalListAsReady(redisClient *c, robj *key) {
+ readyList *rl;
+
+ /* No clients blocking for this key? No need to queue it. */
+ if (dictFind(c->db->blocking_keys,key) == NULL) return;
+
+ /* Key was already signaled? No need to queue it again. */
+ if (dictFind(c->db->ready_keys,key) != NULL) return;
+
+ /* Ok, we need to queue this key into server.ready_keys. */
+ rl = zmalloc(sizeof(*rl));
+ rl->key = key;
+ rl->db = c->db;
+ incrRefCount(key);
+ listAddNodeTail(server.ready_keys,rl);
+
+ /* We also add the key in the db->ready_keys dictionary in order
+ * to avoid adding it multiple times into a list with a simple O(1)
+ * check. */
+ incrRefCount(key);
+ redisAssert(dictAdd(c->db->ready_keys,key,NULL) == DICT_OK);
+}
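The queueing rule in `signalListAsReady()` (skip keys nobody is blocked on, never queue a key twice) can be sketched with plain arrays. This is a simplified sketch under stated assumptions: keys are C strings, `ready_state`, `MAX_KEYS` and `signal_list_as_ready()` are hypothetical, and a linear scan replaces the O(1) `db->ready_keys` dictionary lookup that the real code uses.

```c
#include <stdbool.h>
#include <string.h>

#define MAX_KEYS 16 /* hypothetical capacity for the sketch */

typedef struct {
    const char *blocking[MAX_KEYS]; /* keys with blocked clients (db->blocking_keys) */
    size_t nblocking;
    const char *ready[MAX_KEYS];    /* FIFO of keys to serve (server.ready_keys) */
    size_t nready;
} ready_state;

static bool contains(const char *const *set, size_t n, const char *key) {
    for (size_t i = 0; i < n; i++)
        if (strcmp(set[i], key) == 0) return true;
    return false;
}

/* Queue 'key' at most once, and only if some client is blocked on it. */
void signal_list_as_ready(ready_state *s, const char *key) {
    if (!contains(s->blocking, s->nblocking, key)) return; /* nobody waiting */
    if (contains(s->ready, s->nready, key)) return;        /* already queued */
    if (s->nready < MAX_KEYS) s->ready[s->nready++] = key;
}
```

A script doing two LPUSHes against the same key therefore produces a single queue entry, and the blocked clients are served once after the script ends.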
+
+/* This is a helper function for handleClientsBlockedOnLists(). Its job
+ * is to serve a specific client (receiver) that is blocked on 'key'
+ * in the context of the specified 'db', doing the following:
*
- * If the function returns 1 there was a client waiting for a list push
- * against this key, the element was passed to this client thus it's not
- * needed to actually add it to the list and the caller should return asap. */
-int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
- struct dictEntry *de;
- redisClient *receiver;
- int numclients;
- list *clients;
- listNode *ln;
- robj *dstkey, *dstobj;
-
- de = dictFind(c->db->blocking_keys,key);
- if (de == NULL) return 0;
- clients = dictGetVal(de);
- numclients = listLength(clients);
-
- /* Try to handle the push as long as there are clients waiting for a push.
- * Note that "numclients" is used because the list of clients waiting for a
- * push on "key" is deleted by unblockClient() when empty.
- *
- * This loop will have more than 1 iteration when there is a BRPOPLPUSH
- * that cannot push the target list because it does not contain a list. If
- * this happens, it simply tries the next client waiting for a push. */
- while (numclients--) {
- ln = listFirst(clients);
- redisAssertWithInfo(c,key,ln != NULL);
- receiver = ln->value;
- dstkey = receiver->bpop.target;
-
- /* Protect receiver->bpop.target, that will be freed by
- * the next unblockClientWaitingData() call. */
- if (dstkey) incrRefCount(dstkey);
-
- /* This should remove the first element of the "clients" list. */
- unblockClientWaitingData(receiver);
-
- if (dstkey == NULL) {
- /* BRPOP/BLPOP */
- addReplyMultiBulkLen(receiver,2);
- addReplyBulk(receiver,key);
- addReplyBulk(receiver,ele);
- return 1; /* Serve just the first client as in B[RL]POP semantics */
+ * 1) Provide the client with the 'value' element.
+ * 2) If the dstkey is not NULL (we are serving a BRPOPLPUSH) also push the
+ * 'value' element on the destination list (the LPUSH side of the command).
+ * 3) Propagate the resulting BRPOP, BLPOP and additional LPUSH if any into
+ * the AOF and replication channel.
+ *
+ * The argument 'where' is REDIS_TAIL or REDIS_HEAD, and indicates if the
+ * 'value' element was popped from the head (BLPOP) or tail (BRPOP) so that
+ * we can propagate the command properly.
+ *
+ * The function returns REDIS_OK if we are able to serve the client, otherwise
+ * REDIS_ERR is returned to signal the caller that the list POP operation
+ * should be undone as the client was not served: This only happens for
+ * BRPOPLPUSH that fails to push the value to the destination key as it is
+ * of the wrong type. */
+int serveClientBlockedOnList(redisClient *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where)
+{
+ robj *argv[3];
+
+ if (dstkey == NULL) {
+ /* Propagate the [LR]POP operation. */
+ argv[0] = (where == REDIS_HEAD) ? shared.lpop :
+ shared.rpop;
+ argv[1] = key;
+ propagate((where == REDIS_HEAD) ?
+ server.lpopCommand : server.rpopCommand,
+ db->id,argv,2,REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL);
+
+ /* BRPOP/BLPOP */
+ addReplyMultiBulkLen(receiver,2);
+ addReplyBulk(receiver,key);
+ addReplyBulk(receiver,value);
+ } else {
+ /* BRPOPLPUSH */
+ robj *dstobj =
+ lookupKeyWrite(receiver->db,dstkey);
+ if (!(dstobj &&
+ checkType(receiver,dstobj,REDIS_LIST)))
+ {
+ /* Propagate the RPOP operation. */
+ argv[0] = shared.rpop;
+ argv[1] = key;
+ propagate(server.rpopCommand,
+ db->id,argv,2,
+ REDIS_PROPAGATE_AOF|
+ REDIS_PROPAGATE_REPL);
+ rpoplpushHandlePush(receiver,dstkey,dstobj,
+ value);
+ /* Propagate the LPUSH operation. */
+ argv[0] = shared.lpush;
+ argv[1] = dstkey;
+ argv[2] = value;
+ propagate(server.lpushCommand,
+ db->id,argv,3,
+ REDIS_PROPAGATE_AOF|
+ REDIS_PROPAGATE_REPL);
} else {
- /* BRPOPLPUSH, note that receiver->db is always equal to c->db. */
- dstobj = lookupKeyWrite(receiver->db,dstkey);
- if (!(dstobj && checkType(receiver,dstobj,REDIS_LIST))) {
- rpoplpushHandlePush(c,receiver,dstkey,dstobj,ele);
- decrRefCount(dstkey);
- return 1;
- }
- decrRefCount(dstkey);
+ /* BRPOPLPUSH failed because of wrong
+ * destination type. */
+ return REDIS_ERR;
}
}
+ return REDIS_OK;
+}
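What `serveClientBlockedOnList()` writes to the AOF and replication stream can be summarized as a mapping from the blocking command to plain commands. The sketch below renders that mapping as text for illustration only; `pop_side`, `side_for_command()` and `propagated_commands()` are hypothetical, and the lowercase command-name comparison is an assumption standing in for the `lastcmd->proc == blpopCommand` check.

```c
#include <stdio.h>
#include <string.h>

typedef enum { POP_HEAD, POP_TAIL } pop_side;

/* BLPOP pops from the head; BRPOP and BRPOPLPUSH pop from the tail. */
pop_side side_for_command(const char *cmd) {
    return strcmp(cmd, "blpop") == 0 ? POP_HEAD : POP_TAIL;
}

/* Writes the propagated commands, one per line, into buf and returns how
 * many there are. dstkey == NULL means BLPOP/BRPOP; otherwise BRPOPLPUSH,
 * which is propagated as RPOP on the source plus LPUSH on the destination. */
int propagated_commands(char *buf, size_t size, const char *key,
                        pop_side where, const char *dstkey, const char *value) {
    if (dstkey == NULL) {
        snprintf(buf, size, "%s %s\n", where == POP_HEAD ? "LPOP" : "RPOP", key);
        return 1;
    }
    snprintf(buf, size, "RPOP %s\nLPUSH %s %s\n", key, dstkey, value);
    return 2;
}
```

Propagating the non-blocking equivalents means slaves and the AOF never need to replay a blocking command.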
- return 0;
+/* This function should be called by Redis every time a single command,
+ * a MULTI/EXEC block, or a Lua script, terminates its execution after
+ * being called by a client.
+ *
+ * All the keys with at least one client blocked that received at least
+ * one new element via some PUSH operation are accumulated into
+ * the server.ready_keys list. This function will run the list and will
+ * serve clients accordingly. Note that the function keeps iterating until
+ * server.ready_keys is empty, because serving a BRPOPLPUSH performs a PUSH
+ * that may leave new blocked clients ready to be served. */
+void handleClientsBlockedOnLists(void) {
+ while(listLength(server.ready_keys) != 0) {
+ list *l;
+
+ /* Point server.ready_keys to a fresh list and save the current one
+ * locally. This way as we run the old list we are free to call
+ * signalListAsReady() that may push new elements in server.ready_keys
+ * when handling clients blocked into BRPOPLPUSH. */
+ l = server.ready_keys;
+ server.ready_keys = listCreate();
+
+ while(listLength(l) != 0) {
+ listNode *ln = listFirst(l);
+ readyList *rl = ln->value;
+
+ /* First of all remove this key from db->ready_keys so that
+ * we can safely call signalListAsReady() against this key. */
+ dictDelete(rl->db->ready_keys,rl->key);
+
+ /* If the key exists and it's a list, serve blocked clients
+ * with data. */
+ robj *o = lookupKeyWrite(rl->db,rl->key);
+ if (o != NULL && o->type == REDIS_LIST) {
+ dictEntry *de;
+
+ /* We serve clients in the same order they blocked for
+ * this key, from the first blocked to the last. */
+ de = dictFind(rl->db->blocking_keys,rl->key);
+ if (de) {
+ list *clients = dictGetVal(de);
+ int numclients = listLength(clients);
+
+ while(numclients--) {
+ listNode *clientnode = listFirst(clients);
+ redisClient *receiver = clientnode->value;
+ robj *dstkey = receiver->bpop.target;
+ int where = (receiver->lastcmd &&
+ receiver->lastcmd->proc == blpopCommand) ?
+ REDIS_HEAD : REDIS_TAIL;
+ robj *value = listTypePop(o,where);
+
+ if (value) {
+ /* Protect receiver->bpop.target, that will be
+ * freed by the next unblockClientWaitingData()
+ * call. */
+ if (dstkey) incrRefCount(dstkey);
+ unblockClientWaitingData(receiver);
+
+ if (serveClientBlockedOnList(receiver,
+ rl->key,dstkey,rl->db,value,
+ where) == REDIS_ERR)
+ {
+ /* If we failed serving the client we need
+ * to also undo the POP operation. */
+ listTypePush(o,value,where);
+ }
+
+ if (dstkey) decrRefCount(dstkey);
+ decrRefCount(value);
+ } else {
+ break;
+ }
+ }
+ }
+
+ if (listTypeLength(o) == 0) dbDelete(rl->db,rl->key);
+ /* We don't call signalModifiedKey() as it was already called
+ * when an element was pushed on the list. */
+ }
+
+ /* Free this item. */
+ decrRefCount(rl->key);
+ zfree(rl);
+ listDelNode(l,ln);
+ }
+ listRelease(l); /* We have the new list on place at this point. */
+ }
}
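The per-key serving loop in `handleClientsBlockedOnLists()` hands one element to each blocked client, oldest first, and stops as soon as the list is empty. The sketch below models that with a small array-backed list of ints; `int_list`, `list_pop()` and `serve_blocked_clients()` are hypothetical, and the per-client `from_head` flag stands in for the BLPOP (head) versus BRPOP (tail) choice.

```c
#include <stddef.h>

typedef struct {
    int values[16];
    size_t head, len; /* elements are values[head .. head+len-1] */
} int_list;

/* Pops from the head or the tail; returns 0 when the list is empty. */
static int list_pop(int_list *l, int from_head, int *out) {
    if (l->len == 0) return 0;
    if (from_head) *out = l->values[l->head++];
    else *out = l->values[l->head + l->len - 1];
    l->len--;
    return 1;
}

/* Serves clients in the order they blocked. served[i] receives client i's
 * value. Returns the number of clients served; the rest stay blocked. */
size_t serve_blocked_clients(int_list *l, const int *from_head,
                             size_t nclients, int *served) {
    size_t i;
    for (i = 0; i < nclients; i++) {
        if (!list_pop(l, from_head[i], &served[i])) break; /* no more data */
    }
    return i;
}
```

For example, after `LPUSH a 1` and `LPUSH a 2` the list is `2 1`: a BRPOP client blocked first receives `1`, a BLPOP client blocked next receives `2`, and any later client stays blocked.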
int getTimeoutFromObjectOrReply(redisClient *c, robj *object, time_t *timeout) {
if (key == NULL) {
if (c->flags & REDIS_MULTI) {
-
/* Blocking against an empty list in a multi state
* returns immediately. */
addReply(c, shared.nullbulk);
if (key->type != REDIS_LIST) {
addReply(c, shared.wrongtypeerr);
} else {
-
/* The list exists and has elements, so
* the regular rpoplpushCommand is executed. */
redisAssertWithInfo(c,key,listTypeLength(key) > 0);
* Set Commands
*----------------------------------------------------------------------------*/
+void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum, robj *dstkey, int op);
+
/* Factory method to return a set that *can* hold "value". When the object has
* an integer-encodable value, an intset will be returned. Otherwise a regular
* hash table. */
}
/* Convert the set to specified encoding. The resulting dict (when converting
- * to a hashtable) is presized to hold the number of elements in the original
+ * to a hash table) is presized to hold the number of elements in the original
* set. */
void setTypeConvert(robj *setobj, int enc) {
setTypeIterator *si;
server.dirty++;
}
+/* Handle the "SRANDMEMBER key <count>" variant. The normal version of the
+ * command is handled by the srandmemberCommand() function itself. */
+
+/* How many times bigger must the set be, compared to the requested count,
+ * before we stop using the "remove elements" strategy? Read later in the
+ * implementation for more info. */
+#define SRANDMEMBER_SUB_STRATEGY_MUL 3
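The four cases that `srandmemberWithCountCommand()` distinguishes can be reduced to a small decision function. This is a sketch only: `srandmember_case()` and `SUB_STRATEGY_MUL` are hypothetical names (the latter mirroring `SRANDMEMBER_SUB_STRATEGY_MUL`), and a zero count, which the command answers with an empty reply before picking a case, is reported as case 0.

```c
/* Mirrors SRANDMEMBER_SUB_STRATEGY_MUL from the patch. */
#define SUB_STRATEGY_MUL 3

/* Returns which strategy a SRANDMEMBER <count> call would use:
 * 0: empty reply, 1: repeats allowed (negative count), 2: whole set,
 * 3: copy everything and remove random elements, 4: add random elements. */
int srandmember_case(long count, unsigned long size) {
    if (count == 0) return 0;
    if (count < 0) return 1;
    unsigned long ucount = (unsigned long)count;
    if (ucount >= size) return 2;
    if (ucount * SUB_STRATEGY_MUL > size) return 3;
    return 4;
}
```

With a set of 10 elements, a count of 4 falls into CASE 3 (4*3 > 10) while a count of 3 falls into CASE 4.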
+
+void srandmemberWithCountCommand(redisClient *c) {
+ long l;
+ unsigned long count, size;
+ int uniq = 1;
+ robj *set, *ele;
+ int64_t llele;
+ int encoding;
+
+ dict *d;
+
+ if (getLongFromObjectOrReply(c,c->argv[2],&l,NULL) != REDIS_OK) return;
+ if (l >= 0) {
+ count = (unsigned) l;
+ } else {
+ /* A negative count means: return the same elements multiple times
+ * (i.e. don't remove the extracted element after every extraction). */
+ count = -l;
+ uniq = 0;
+ }
+
+ if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk))
+ == NULL || checkType(c,set,REDIS_SET)) return;
+ size = setTypeSize(set);
+
+ /* If count is zero, serve it ASAP to avoid special cases later. */
+ if (count == 0) {
+ addReply(c,shared.emptymultibulk);
+ return;
+ }
+
+ /* CASE 1: The count was negative, so the extraction method is just:
+ * "return N random elements" sampling the whole set every time.
+ * This case is trivial and can be served without auxiliary data
+ * structures. */
+ if (!uniq) {
+ addReplyMultiBulkLen(c,count);
+ while(count--) {
+ encoding = setTypeRandomElement(set,&ele,&llele);
+ if (encoding == REDIS_ENCODING_INTSET) {
+ addReplyBulkLongLong(c,llele);
+ } else {
+ addReplyBulk(c,ele);
+ }
+ }
+ return;
+ }
+
+ /* CASE 2:
+ * The number of requested elements is greater than the number of
+ * elements inside the set: simply return the whole set. */
+ if (count >= size) {
+ sunionDiffGenericCommand(c,c->argv,c->argc-1,NULL,REDIS_OP_UNION);
+ return;
+ }
+
+ /* For CASE 3 and CASE 4 we need an auxiliary dictionary. */
+ d = dictCreate(&setDictType,NULL);
+
+ /* CASE 3:
+ * The number of elements inside the set is not greater than
+ * SRANDMEMBER_SUB_STRATEGY_MUL times the number of requested elements.
+ * In this case we create a set from scratch with all the elements, and
+ * subtract random elements to reach the requested number of elements.
+ *
+ * This is done because if the number of requested elements is just
+ * a bit less than the number of elements in the set, the natural approach
+ * used in CASE 4 is highly inefficient. */
+ if (count*SRANDMEMBER_SUB_STRATEGY_MUL > size) {
+ setTypeIterator *si;
+
+ /* Add all the elements into the temporary dictionary. */
+ si = setTypeInitIterator(set);
+ while((encoding = setTypeNext(si,&ele,&llele)) != -1) {
+ int retval = DICT_ERR;
+
+ if (encoding == REDIS_ENCODING_INTSET) {
+ retval = dictAdd(d,createStringObjectFromLongLong(llele),NULL);
+ } else if (ele->encoding == REDIS_ENCODING_RAW) {
+ retval = dictAdd(d,dupStringObject(ele),NULL);
+ } else if (ele->encoding == REDIS_ENCODING_INT) {
+ retval = dictAdd(d,
+ createStringObjectFromLongLong((long)ele->ptr),NULL);
+ }
+ redisAssert(retval == DICT_OK);
+ }
+ setTypeReleaseIterator(si);
+ redisAssert(dictSize(d) == size);
+
+ /* Remove random elements to reach the right count. */
+ while(size > count) {
+ dictEntry *de;
+
+ de = dictGetRandomKey(d);
+ dictDelete(d,dictGetKey(de));
+ size--;
+ }
+ }
+
+ /* CASE 4: We have a big set compared to the requested number of elements.
+ * In this case we can simply get random elements from the set and add
+ * to the temporary set, trying to eventually get enough unique elements
+ * to reach the specified count. */
+ else {
+ unsigned long added = 0;
+
+ while(added < count) {
+ encoding = setTypeRandomElement(set,&ele,&llele);
+ if (encoding == REDIS_ENCODING_INTSET) {
+ ele = createStringObjectFromLongLong(llele);
+ } else if (ele->encoding == REDIS_ENCODING_RAW) {
+ ele = dupStringObject(ele);
+ } else if (ele->encoding == REDIS_ENCODING_INT) {
+ ele = createStringObjectFromLongLong((long)ele->ptr);
+ }
+ /* Try to add the object to the dictionary. If it already exists
+ * free it, otherwise increment the number of objects we have
+ * in the result dictionary. */
+ if (dictAdd(d,ele,NULL) == DICT_OK)
+ added++;
+ else
+ decrRefCount(ele);
+ }
+ }
+
+ /* CASE 3 & 4: send the result to the user. */
+ {
+ dictIterator *di;
+ dictEntry *de;
+
+ addReplyMultiBulkLen(c,count);
+ di = dictGetIterator(d);
+ while((de = dictNext(di)) != NULL)
+ addReplyBulk(c,dictGetKey(de));
+ dictReleaseIterator(di);
+ dictRelease(d);
+ }
+}
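CASE 3 and CASE 4 are two ways of drawing `k` distinct elements out of `n`. The sketch below picks distinct indexes instead of set members. Several details are assumptions: a `bool` array replaces the temporary dictionary, `rand()` replaces `dictGetRandomKey()` / `setTypeRandomElement()`, modulo bias is ignored, and CASE 3 retries on already-removed slots rather than sampling only survivors. `pick_unique()` is a hypothetical name, and the caller is expected to handle `k >= n` (CASE 2) beforehand.

```c
#include <stdbool.h>
#include <stdlib.h>

#define SUB_STRATEGY_MUL 3 /* mirrors SRANDMEMBER_SUB_STRATEGY_MUL */

/* Writes k distinct indexes from 0..n-1 into out, in ascending order.
 * 'chosen' must have room for n flags. Requires k <= n. */
void pick_unique(size_t n, size_t k, bool *chosen, size_t *out) {
    size_t i, count;

    if (k * SUB_STRATEGY_MUL > n) {
        /* CASE 3: start from every element, drop random ones until k remain. */
        for (i = 0; i < n; i++) chosen[i] = true;
        for (count = n; count > k; ) {
            size_t r = (size_t)rand() % n;
            if (chosen[r]) { chosen[r] = false; count--; }
        }
    } else {
        /* CASE 4: add random elements, ignoring duplicates, until k are unique. */
        for (i = 0; i < n; i++) chosen[i] = false;
        for (count = 0; count < k; ) {
            size_t r = (size_t)rand() % n;
            if (!chosen[r]) { chosen[r] = true; count++; }
        }
    }

    /* Collect the selected indexes. */
    for (i = 0, count = 0; i < n; i++)
        if (chosen[i]) out[count++] = i;
}
```

The threshold keeps both loops cheap: CASE 4 rarely hits duplicates when `k` is small compared to `n`, and CASE 3 needs at most `n - k` successful removals when `k` is close to `n`.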
+
void srandmemberCommand(redisClient *c) {
robj *set, *ele;
int64_t llele;
int encoding;
+ if (c->argc == 3) {
+ srandmemberWithCountCommand(c);
+ return;
+ } else if (c->argc > 3) {
+ addReply(c,shared.syntaxerr);
+ return;
+ }
+
if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
checkType(c,set,REDIS_SET)) return;
#include <math.h>
#include <unistd.h>
#include <sys/time.h>
+#include <float.h>
#include "util.h"
* integer printing function that is much faster. */
double min = -4503599627370495; /* -((2^52)-1) */
double max = 4503599627370496; /* 2^52 */
- if (val > min && val < max && value == ((double)((long long)value)))
+ if (value > min && value < max && value == ((double)((long long)value)))
len = ll2string(buf,len,(long long)value);
else
#endif
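The fast path fixed above (`val` replaced by `value`) prints a double as an integer when it holds an integral value inside the range where the `long long` cast is exact. This sketch reproduces that check with the same bounds. `format_double()` is a hypothetical name, and the `%.17g` fallback is an assumption made for the example.

```c
#include <stdio.h>

/* Prints value into buf, using the integer form when the double holds an
 * integral value strictly between -((2^52)-1) and 2^52. Returns the
 * snprintf() result (the number of characters produced). */
int format_double(char *buf, size_t len, double value) {
    const double min = -4503599627370495.0; /* -((2^52)-1) */
    const double max = 4503599627370496.0;  /* 2^52 */

    /* Short-circuiting keeps the cast from running on out-of-range
     * values or NaN, where it would be undefined. */
    if (value > min && value < max && value == (double)(long long)value)
        return snprintf(buf, len, "%lld", (long long)value);
    return snprintf(buf, len, "%.17g", value);
}
```

So `3.0` is printed as `3`, while `0.5` and `1e20` fall back to the floating point format.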
-#define REDIS_VERSION "2.5.10"
+#define REDIS_VERSION "2.5.13"
int32_t i32;
int64_t i64;
if (encoding == ZIP_INT_8B) {
- ((char*)p)[0] = (char)value;
+ ((int8_t*)p)[0] = (int8_t)value;
} else if (encoding == ZIP_INT_16B) {
i16 = value;
memcpy(p,&i16,sizeof(i16));
} else if (encoding == ZIP_INT_24B) {
i32 = value<<8;
memrev32ifbe(&i32);
- memcpy(p,((unsigned char*)&i32)+1,sizeof(i32)-sizeof(int8_t));
+ memcpy(p,((uint8_t*)&i32)+1,sizeof(i32)-sizeof(uint8_t));
} else if (encoding == ZIP_INT_32B) {
i32 = value;
memcpy(p,&i32,sizeof(i32));
int32_t i32;
int64_t i64, ret = 0;
if (encoding == ZIP_INT_8B) {
- ret = ((char*)p)[0];
+ ret = ((int8_t*)p)[0];
} else if (encoding == ZIP_INT_16B) {
memcpy(&i16,p,sizeof(i16));
memrev16ifbe(&i16);
ret = i32;
} else if (encoding == ZIP_INT_24B) {
i32 = 0;
- memcpy(((unsigned char*)&i32)+1,p,sizeof(i32)-sizeof(int8_t));
+ memcpy(((uint8_t*)&i32)+1,p,sizeof(i32)-sizeof(uint8_t));
memrev32ifbe(&i32);
ret = i32>>8;
} else if (encoding == ZIP_INT_64B) {
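`ZIP_INT_24B` stores a signed value in three little-endian bytes. The patch does this by shifting an `int32_t` left by 8, byte-swapping on big-endian hosts, and copying three bytes. The sketch below is an alternative formulation rather than the Redis code: it works byte by byte and sign-extends explicitly, so it depends neither on host endianness nor on right-shifting negative numbers. `store_int24()` and `load_int24()` are hypothetical names.

```c
#include <stdint.h>

/* Stores value (in [-2^23, 2^23-1]) as 3 little-endian bytes. */
void store_int24(uint8_t *p, int32_t value) {
    uint32_t u = (uint32_t)value; /* two's complement bit pattern */
    p[0] = (uint8_t)(u & 0xff);
    p[1] = (uint8_t)((u >> 8) & 0xff);
    p[2] = (uint8_t)((u >> 16) & 0xff);
}

/* Loads 3 little-endian bytes and sign-extends bit 23. */
int32_t load_int24(const uint8_t *p) {
    uint32_t u = (uint32_t)p[0] | ((uint32_t)p[1] << 8) | ((uint32_t)p[2] << 16);
    int32_t v = (int32_t)u;            /* u < 2^24, so this is exact */
    if (u & 0x800000u) v -= 0x1000000; /* negative: subtract 2^24 */
    return v;
}
```

The same concern motivates the `char` to `int8_t` change for `ZIP_INT_8B`: plain `char` may be unsigned on some platforms, while `int8_t` is always signed.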
totlen = p-first.p;
if (totlen > 0) {
if (p[0] != ZIP_END) {
- /* Tricky: storing the prevlen in this entry might reduce or
- * increase the number of bytes needed, compared to the current
- * prevlen. Note that we can always store this length because
- * it was previously stored by an entry that is being deleted. */
+ /* Storing `prevrawlen` in this entry may increase or decrease the
+ * number of bytes required compared to the current `prevrawlen`.
+ * There is always room to store this, because it was previously
+ * stored by an entry that is now being deleted. */
nextdiff = zipPrevLenByteDiff(p,first.prevrawlen);
- zipPrevEncodeLength(p-nextdiff,first.prevrawlen);
+ p -= nextdiff;
+ zipPrevEncodeLength(p,first.prevrawlen);
/* Update offset for tail */
ZIPLIST_TAIL_OFFSET(zl) =
}
/* Move tail to the front of the ziplist */
- memmove(first.p,p-nextdiff,
- intrev32ifbe(ZIPLIST_BYTES(zl))-(p-zl)-1+nextdiff);
+ memmove(first.p,p,
+ intrev32ifbe(ZIPLIST_BYTES(zl))-(p-zl)-1);
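The `nextdiff` adjustment exists because the previous-entry length is encoded in either 1 or 5 bytes. This sketch computes that size difference. It assumes the threshold of 254 used as `ZIP_BIGLEN` in the ziplist source, and `prevlen_bytes()` and `prevlen_byte_diff()` are hypothetical helpers.

```c
#include <stddef.h>

#define BIGLEN 254 /* lengths >= 254 need the 5 byte form */

/* Bytes needed to encode the length of the previous entry. */
size_t prevlen_bytes(size_t prevlen) {
    return (prevlen < BIGLEN) ? 1 : 5;
}

/* How many bytes the header of the following entry grows (positive) or
 * shrinks (negative) by when its stored previous length changes. */
long prevlen_byte_diff(size_t old_prevlen, size_t new_prevlen) {
    return (long)prevlen_bytes(new_prevlen) - (long)prevlen_bytes(old_prevlen);
}
```

In the regression test added by this patch, the entry after the deleted short one used to point at a small entry (1 byte) and must now point at a 256-character one, so its header grows by 4 bytes. That is why `p` is moved back by `nextdiff` before the prevlen is rewritten and the tail is moved.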
} else {
/* The entire tail was deleted. No need to move memory. */
ZIPLIST_TAIL_OFFSET(zl) =
return p;
}
} else {
- /* Find out if the specified entry can be encoded */
+ /* Find out if the searched field can be encoded. Note that
+ * we do it only the first time, once done vencoding is set
+ * to non-zero and vll is set to the integer value. */
if (vencoding == 0) {
- /* UINT_MAX when the entry CANNOT be encoded */
if (!zipTryEncoding(vstr, vlen, &vll, &vencoding)) {
+ /* If the entry can't be encoded we set it to
+ * UCHAR_MAX so that we don't try to encode it
+ * again next time. */
vencoding = UCHAR_MAX;
}
-
/* Must be non-zero by now */
assert(vencoding);
}
- /* Compare current entry with specified entry */
- if (encoding == vencoding) {
+ /* Compare current entry with specified entry, do it only
+ * if vencoding != UCHAR_MAX because if there is no encoding
+ * possible for the field it can't be a valid integer. */
+ if (vencoding != UCHAR_MAX) {
long long ll = zipLoadInteger(q, encoding);
if (ll == vll) {
return p;
return len;
}
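The `ziplistFind()` change tries to parse the searched field as an integer once, caches the outcome (with `UCHAR_MAX` as the "not an integer" sentinel), and then compares integer entries by value rather than by encoding width. This sketch shows the same try-once-and-remember pattern. `int_cache`, `parse_canonical_ll()` and `int_entry_matches()` are hypothetical, and the strict round-trip parsing rule is an assumption standing in for the encoding check Redis performs.

```c
#include <errno.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

typedef struct {
    int state;       /* 0 = not tried yet, 1 = integer, 2 = not an integer */
    long long value;
} int_cache;

/* Accepts only canonical decimal integers ("123", "-5"), rejecting forms
 * like "007", "+7" or " 7" that would not round-trip. */
static bool parse_canonical_ll(const char *s, size_t len, long long *out) {
    char buf[32], back[32], *end;
    if (len == 0 || len >= sizeof(buf)) return false;
    memcpy(buf, s, len);
    buf[len] = '\0';
    errno = 0;
    long long v = strtoll(buf, &end, 10);
    if (errno != 0 || *end != '\0') return false;
    snprintf(back, sizeof(back), "%lld", v);
    if (strcmp(back, buf) != 0) return false;
    *out = v;
    return true;
}

/* Does an integer-encoded entry holding entry_value match the field?
 * The parse happens at most once per search thanks to the cache. */
bool int_entry_matches(int_cache *c, const char *field, size_t len,
                       long long entry_value) {
    if (c->state == 0)
        c->state = parse_canonical_ll(field, len, &c->value) ? 1 : 2;
    return c->state == 1 && c->value == entry_value;
}
```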
+void verify(unsigned char *zl, zlentry *e) {
+ int i;
+ int len = ziplistLen(zl);
+ zlentry _e;
+
+ for (i = 0; i < len; i++) {
+ memset(&e[i], 0, sizeof(zlentry));
+ e[i] = zipEntry(ziplistIndex(zl, i));
+
+ memset(&_e, 0, sizeof(zlentry));
+ _e = zipEntry(ziplistIndex(zl, -len+i));
+
+ assert(memcmp(&e[i], &_e, sizeof(zlentry)) == 0);
+ }
+}
+
int main(int argc, char **argv) {
unsigned char *zl, *p;
unsigned char *entry;
printf("SUCCESS\n\n");
}
+ printf("Regression test deleting next to last entries:\n");
+ {
+ char v[3][257];
+ zlentry e[3];
+ int i;
+
+ for (i = 0; i < (sizeof(v)/sizeof(v[0])); i++) {
+ memset(v[i], 'a' + i, sizeof(v[0]));
+ }
+
+ v[0][256] = '\0';
+ v[1][ 1] = '\0';
+ v[2][256] = '\0';
+
+ zl = ziplistNew();
+ for (i = 0; i < (sizeof(v)/sizeof(v[0])); i++) {
+ zl = ziplistPush(zl, (unsigned char *) v[i], strlen(v[i]), ZIPLIST_TAIL);
+ }
+
+ verify(zl, e);
+
+ assert(e[0].prevrawlensize == 1);
+ assert(e[1].prevrawlensize == 5);
+ assert(e[2].prevrawlensize == 1);
+
+ /* Deleting entry 1 will increase `prevrawlensize` for entry 2 */
+ unsigned char *p = e[1].p;
+ zl = ziplistDelete(zl, &p);
+
+ verify(zl, e);
+
+ assert(e[0].prevrawlensize == 1);
+ assert(e[1].prevrawlensize == 5);
+
+ printf("SUCCESS\n\n");
+ }
+
printf("Create long list and check indices:\n");
{
zl = ziplistNew();
* <len> lengths are encoded in a single value or in a 5 bytes value.
* If the first byte value (as an unsigned 8 bit value) is between 0 and
* 252, it's a single-byte length. If it is 253 then a four bytes unsigned
- * integer follows (in the host byte ordering). A value fo 255 is used to
+ * integer follows (in the host byte ordering). A value of 255 is used to
* signal the end of the hash. The special value 254 is used to mark
* empty space that can be used to add new key/value pairs.
*
- * <free> is the number of free unused bytes
- * after the string, resulting from modification of values associated to a
- * key (for instance if "foo" is set to "bar', and later "foo" will be se to
- * "hi", I'll have a free byte to use if the value will enlarge again later,
- * or even in order to add a key/value pair if it fits.
+ * <free> is the number of free unused bytes after the string, resulting
+ * from modification of values associated to a key. For instance if "foo"
+ * is set to "bar", and later "foo" will be set to "hi", it will have a
+ * free byte to use if the value will enlarge again later, or even in
+ * order to add a key/value pair if it fits.
*
* <free> is always an unsigned 8 bit number, because if after an
* update operation there are more than a few free bytes, the zipmap will be
static int zmalloc_thread_safe = 0;
pthread_mutex_t used_memory_mutex = PTHREAD_MUTEX_INITIALIZER;
-static void zmalloc_oom(size_t size) {
+static void zmalloc_default_oom(size_t size) {
fprintf(stderr, "zmalloc: Out of memory trying to allocate %zu bytes\n",
size);
fflush(stderr);
abort();
}
+static void (*zmalloc_oom_handler)(size_t) = zmalloc_default_oom;
+
void *zmalloc(size_t size) {
void *ptr = malloc(size+PREFIX_SIZE);
- if (!ptr) zmalloc_oom(size);
+ if (!ptr) zmalloc_oom_handler(size);
#ifdef HAVE_MALLOC_SIZE
update_zmalloc_stat_alloc(zmalloc_size(ptr),size);
return ptr;
void *zcalloc(size_t size) {
void *ptr = calloc(1, size+PREFIX_SIZE);
- if (!ptr) zmalloc_oom(size);
+ if (!ptr) zmalloc_oom_handler(size);
#ifdef HAVE_MALLOC_SIZE
update_zmalloc_stat_alloc(zmalloc_size(ptr),size);
return ptr;
#ifdef HAVE_MALLOC_SIZE
oldsize = zmalloc_size(ptr);
newptr = realloc(ptr,size);
- if (!newptr) zmalloc_oom(size);
+ if (!newptr) zmalloc_oom_handler(size);
update_zmalloc_stat_free(oldsize);
update_zmalloc_stat_alloc(zmalloc_size(newptr),size);
realptr = (char*)ptr-PREFIX_SIZE;
oldsize = *((size_t*)realptr);
newptr = realloc(realptr,size+PREFIX_SIZE);
- if (!newptr) zmalloc_oom(size);
+ if (!newptr) zmalloc_oom_handler(size);
*((size_t*)newptr) = size;
update_zmalloc_stat_free(oldsize);
zmalloc_thread_safe = 1;
}
+void zmalloc_set_oom_handler(void (*oom_handler)(size_t)) {
+ zmalloc_oom_handler = oom_handler;
+}
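`zmalloc_set_oom_handler()` turns the out-of-memory path into a replaceable function pointer, which is what lets the server log the failure before aborting. The sketch below follows that shape. `checked_malloc()`, `set_raw_alloc()`, `record_oom()` and `failing_alloc()` are hypothetical; the allocator hook exists only so the failure path can be exercised without actually exhausting memory.

```c
#include <stdio.h>
#include <stdlib.h>

static void default_oom(size_t size) {
    fprintf(stderr, "Out of memory trying to allocate %zu bytes\n", size);
    fflush(stderr);
    abort();
}

static void (*oom_handler)(size_t) = default_oom;
static void *(*raw_alloc)(size_t) = malloc;

/* Install a custom handler; NULL restores the default one. */
void set_oom_handler(void (*handler)(size_t)) {
    oom_handler = handler ? handler : default_oom;
}

/* Hypothetical test hook: swap the underlying allocator. */
void set_raw_alloc(void *(*alloc)(size_t)) {
    raw_alloc = alloc ? alloc : malloc;
}

void *checked_malloc(size_t size) {
    void *p = raw_alloc(size);
    if (p == NULL) oom_handler(size);
    return p;
}

/* Example handler that records the failing size instead of aborting.
 * A server handler would normally log and then abort, since callers
 * assume allocations succeed. */
size_t last_oom_size = 0;
void record_oom(size_t size) { last_oom_size = size; }

/* Always-failing allocator used to simulate memory exhaustion. */
void *failing_alloc(size_t size) { (void)size; return NULL; }
```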
+
/* Get the RSS information in an OS-specific way.
*
* WARNING: the function zmalloc_get_rss() is not designed to be fast
char *zstrdup(const char *s);
size_t zmalloc_used_memory(void);
void zmalloc_enable_thread_safeness(void);
+void zmalloc_set_oom_handler(void (*oom_handler)(size_t));
float zmalloc_get_fragmentation_ratio(void);
size_t zmalloc_get_rss(void);
void zlibc_free(void *ptr);
test {SET on the master should immediately propagate} {
r -1 set mykey bar
- if {$::valgrind} {after 2000}
- r 0 get mykey
- } {bar}
+
+ wait_for_condition 500 100 {
+ [r 0 get mykey] eq {bar}
+ } else {
+ fail "SET on master did not propagate to slave"
+ }
+ }
test {FLUSHALL should replicate} {
r -1 flushall
expr {int(rand()*$max)}
}
+proc randomSignedInt {max} {
+ set i [randomInt $max]
+ if {rand() > 0.5} {
+ set i -$i
+ }
+ return $i
+}
+
proc randpath args {
set path [expr {int(rand()*[llength $args])}]
uplevel 1 [lindex $args $path]
proc randomValue {} {
randpath {
# Small enough to likely collide
- randomInt 1000
+ randomSignedInt 1000
} {
# 32 bit compressible signed/unsigned
- randpath {randomInt 2000000000} {randomInt 4000000000}
+ randpath {randomSignedInt 2000000000} {randomSignedInt 4000000000}
} {
# 64 bit
- randpath {randomInt 1000000000000}
+ randpath {randomSignedInt 1000000000000}
} {
# Random string
randpath {randstring 0 256 alpha} \
set e
} {ERR*syntax*}
+ test {BITCOUNT regression test for github issue #582} {
+ r del foo
+ r setbit foo 0 1
+ if {[catch {r bitcount foo 0 4294967296} e]} {
+ assert_match {*ERR*out of range*} $e
+ set _ 1
+ } else {
+ set e
+ }
+ } {1}
+
test {BITOP NOT (empty string)} {
r set s ""
r bitop not dest s
set e
} {*not allowed after*}
+ test {EVAL - No arguments to redis.call/pcall is considered an error} {
+ set e {}
+ catch {r eval {return redis.call()} 0} e
+ set e
+ } {*one argument*}
+
test {EVAL - redis.call variant raises a Lua error on Redis cmd error (1)} {
set e {}
catch {
r eval {return redis.call('smembers','myset')} 0
} {a aa aaa azz b c d e f g h i l m n o p q r s t u v z}
- test "SORT is normally not re-ordered by the scripting engine" {
+ test "SORT is normally not alpha re-ordered for the scripting engine" {
r del myset
r sadd myset 1 2 3 4 10
r eval {return redis.call('sort','myset','desc')} 0
} {10 4 3 2 1}
- test "SORT BY <constant> output gets ordered by scripting" {
+ test "SORT BY <constant> output gets ordered for scripting" {
r del myset
r sadd myset a b c d e f g h i l m n o p q r s t u v z aa aaa azz
r eval {return redis.call('sort','myset','by','_')} 0
} {a aa aaa azz b c d e f g h i l m n o p q r s t u v z}
- test "SORT output containing NULLs is well handled by scripting" {
+ test "SORT BY <constant> with GET gets ordered for scripting" {
r del myset
r sadd myset a b c
r eval {return redis.call('sort','myset','by','_','get','#','get','_:*')} 0
- } {{} {} {} a b c}
+ } {a {} b {} c {}}
test "redis.sha1hex() implementation" {
list [r eval {return redis.sha1hex('')} 0] \
fail "Expected 2 in x, but value is '[r -1 get x]'"
}
}
+
+ test {Replication of script multiple pushes to list with BLPOP} {
+ set rd [redis_deferring_client]
+ $rd brpop a 0
+ r eval {
+ redis.call("lpush","a","1");
+ redis.call("lpush","a","2");
+ } 0
+ set res [$rd read]
+ $rd close
+ wait_for_condition 50 100 {
+ [r -1 lrange a 0 -1] eq [r lrange a 0 -1]
+ } else {
+ fail "Expected list 'a' in slave and master to be the same, but they are respectively '[r -1 lrange a 0 -1]' and '[r lrange a 0 -1]'"
+ }
+ set res
+ } {a 1}
}
}
r sort zset alpha desc
} {e d c b a}
+ test "SORT sorted set BY nosort should retain ordering" {
+ r del zset
+ r zadd zset 1 a
+ r zadd zset 5 b
+ r zadd zset 2 c
+ r zadd zset 10 d
+ r zadd zset 3 e
+ r multi
+ r sort zset by nosort asc
+ r sort zset by nosort desc
+ r exec
+ } {{a c e b d} {d b e c a}}
+
+ test "SORT sorted set BY nosort + LIMIT" {
+ r del zset
+ r zadd zset 1 a
+ r zadd zset 5 b
+ r zadd zset 2 c
+ r zadd zset 10 d
+ r zadd zset 3 e
+ assert_equal [r sort zset by nosort asc limit 0 1] {a}
+ assert_equal [r sort zset by nosort desc limit 0 1] {d}
+ assert_equal [r sort zset by nosort asc limit 0 2] {a c}
+ assert_equal [r sort zset by nosort desc limit 0 2] {d b}
+ assert_equal [r sort zset by nosort limit 5 10] {}
+ assert_equal [r sort zset by nosort limit -10 100] {a c e b d}
+ }
+
+ test "SORT sorted set BY nosort works as expected from scripts" {
+ r del zset
+ r zadd zset 1 a
+ r zadd zset 5 b
+ r zadd zset 2 c
+ r zadd zset 10 d
+ r zadd zset 3 e
+ r eval {
+ return {redis.call('sort','zset','by','nosort','asc'),
+ redis.call('sort','zset','by','nosort','desc')}
+ } 0
+ } {{a c e b d} {d b e c a}}
+
test "SORT sorted set: +inf and -inf handling" {
r del zset
r zadd zset -100 a
} {b}
foreach size {10 512} {
- test "Hash fuzzing - $size fields" {
+ test "Hash fuzzing #1 - $size fields" {
for {set times 0} {$times < 10} {incr times} {
catch {unset hash}
array set hash {}
assert_equal [array size hash] [r hlen hash]
}
}
+
+ test "Hash fuzzing #2 - $size fields" {
+ for {set times 0} {$times < 10} {incr times} {
+ catch {unset hash}
+ array set hash {}
+ r del hash
+
+ # Create
+ for {set j 0} {$j < $size} {incr j} {
+ randpath {
+ set field [randomValue]
+ set value [randomValue]
+ r hset hash $field $value
+ set hash($field) $value
+ } {
+ set field [randomSignedInt 512]
+ set value [randomSignedInt 512]
+ r hset hash $field $value
+ set hash($field) $value
+ } {
+ randpath {
+ set field [randomValue]
+ } {
+ set field [randomSignedInt 512]
+ }
+ r hdel hash $field
+ unset -nocomplain hash($field)
+ }
+ }
+
+ # Verify
+ foreach {k v} [array get hash] {
+ assert_equal $v [r hget hash $k]
+ }
+ assert_equal [array size hash] [r hlen hash]
+ }
+ }
+ }
+
+ test {Stress test the hash ziplist -> hashtable encoding conversion} {
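+ # Allow at most 32 entries in a ziplist-encoded hash, then perform 64
+ # HSETs with random fields so the hash must be converted to a hashtable.
+ r config set hash-max-ziplist-entries 32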
+ for {set j 0} {$j < 100} {incr j} {
+ r del myhash
+ for {set i 0} {$i < 64} {incr i} {
+ r hset myhash [randomValue] [randomValue]
+ }
+ assert {[r object encoding myhash] eq {hashtable}}
+ }
}
}
}
}
+ test "BLPOP, LPUSH + DEL should not awake blocked client" {
+ set rd [redis_deferring_client]
+ r del list
+
+ $rd blpop list 0
+ r multi
+ r lpush list a
+ r del list
+ r exec
+ r del list
+ r lpush list b
+ $rd read
+ } {list b}
+
+ test "BLPOP, LPUSH + DEL + SET should not awake blocked client" {
+ set rd [redis_deferring_client]
+ r del list
+
+ $rd blpop list 0
+ r multi
+ r lpush list a
+ r del list
+ r set list foo
+ r exec
+ r del list
+ r lpush list b
+ $rd read
+ } {list b}
+
+ test "MULTI/EXEC is isolated from the point of view of BLPOP" {
+ set rd [redis_deferring_client]
+ r del list
+ $rd blpop list 0
+ r multi
+ r lpush list a
+ r lpush list b
+ r lpush list c
+ r exec
+ $rd read
+ } {list c}
+
test "BLPOP with variadic LPUSH" {
set rd [redis_deferring_client]
r del blist target
if {$::valgrind} {after 100}
assert_equal 2 [r lpush blist foo bar]
if {$::valgrind} {after 100}
- assert_equal {blist foo} [$rd read]
- assert_equal bar [lindex [r lrange blist 0 -1] 0]
+ assert_equal {blist bar} [$rd read]
+ assert_equal foo [lindex [r lrange blist 0 -1] 0]
}
test "BRPOPLPUSH with zero timeout should block indefinitely" {
assert_equal {foo} [r lrange blist 0 -1]
}
+ test "BRPOPLPUSH maintains order of elements after failure" {
+ set rd [redis_deferring_client]
+ r del blist target
+ r set target nolist
+ $rd brpoplpush blist target 0
+ r rpush blist a b c
+ assert_error "ERR*wrong kind*" {$rd read}
+ r lrange blist 0 -1
+ } {a b c}
+
test "BRPOPLPUSH with multiple blocked clients" {
set rd1 [redis_deferring_client]
set rd2 [redis_deferring_client]
r exec
} {foo bar {} {} {bar foo}}
+ test "PUSH resulting from BRPOPLPUSH affects WATCH" {
+ set blocked_client [redis_deferring_client]
+ set watching_client [redis_deferring_client]
+ r del srclist dstlist somekey
+ r set somekey somevalue
+ $blocked_client brpoplpush srclist dstlist 0
+ $watching_client watch dstlist
+ $watching_client read
+ $watching_client multi
+ $watching_client read
+ $watching_client get somekey
+ $watching_client read
+ r lpush srclist element
+ $watching_client exec
+ $watching_client read
+ } {}
+
+ test "BRPOPLPUSH does not affect WATCH while still blocked" {
+ set blocked_client [redis_deferring_client]
+ set watching_client [redis_deferring_client]
+ r del srclist dstlist somekey
+ r set somekey somevalue
+ $blocked_client brpoplpush srclist dstlist 0
+ $watching_client watch dstlist
+ $watching_client read
+ $watching_client multi
+ $watching_client read
+ $watching_client get somekey
+ $watching_client read
+ $watching_client exec
+ # A client still blocked in BRPOPLPUSH may cause problems later, so unblock it.
+ r lpush srclist element
+ $watching_client read
+ } {somevalue}
+
test {BRPOPLPUSH timeout} {
set rd [redis_deferring_client]
}
}
+ test "SRANDMEMBER with <count> against non-existing key" {
+ r srandmember nonexisting_key 100
+ } {}
+
+ foreach {type contents} {
+ hashtable {
+ 1 5 10 50 125 50000 33959417 4775547 65434162
+ 12098459 427716 483706 2726473884 72615637475
+ MARY PATRICIA LINDA BARBARA ELIZABETH JENNIFER MARIA
+ SUSAN MARGARET DOROTHY LISA NANCY KAREN BETTY HELEN
+ SANDRA DONNA CAROL RUTH SHARON MICHELLE LAURA SARAH
+ KIMBERLY DEBORAH JESSICA SHIRLEY CYNTHIA ANGELA MELISSA
+ BRENDA AMY ANNA REBECCA VIRGINIA KATHLEEN
+ }
+ intset {
+ 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
+ 20 21 22 23 24 25 26 27 28 29
+ 30 31 32 33 34 35 36 37 38 39
+ 40 41 42 43 44 45 46 47 48 49
+ }
+ } {
+ test "SRANDMEMBER with <count> - $type" {
+ create_set myset $contents
+ unset -nocomplain myset
+ array set myset {}
+ foreach ele [r smembers myset] {
+ set myset($ele) 1
+ }
+ assert_equal [lsort $contents] [lsort [array names myset]]
+
+ # Make sure that a count of 0 is handled correctly.
+ assert_equal [r srandmember myset 0] {}
+
+ # These checks stress different parts of the code. See the
+ # SRANDMEMBER implementation for details; basically there are
+ # four different code paths.
+ #
+ # PATH 1: Use negative count.
+ #
+ # 1) Check that it returns repeated elements: 100 results are
+ # expected even though the set only has 50 elements.
+ set res [r srandmember myset -100]
+ assert_equal [llength $res] 100
+
+ # 2) Check that all the elements actually belong to the
+ # original set.
+ foreach ele $res {
+ assert {[info exists myset($ele)]}
+ }
+
+ # 3) Check that eventually all the elements are returned.
+ unset -nocomplain auxset
+ set iterations 1000
+ while {$iterations != 0} {
+ incr iterations -1
+ set res [r srandmember myset -10]
+ foreach ele $res {
+ set auxset($ele) 1
+ }
+ if {[lsort [array names myset]] eq
+ [lsort [array names auxset]]} {
+ break
+ }
+ }
+ assert {$iterations != 0}
+
+ # PATH 2: positive count (unique behavior) with requested size
+ # equal to or greater than the set size.
+ foreach size {50 100} {
+ set res [r srandmember myset $size]
+ assert_equal [llength $res] 50
+ assert_equal [lsort $res] [lsort [array names myset]]
+ }
+
+ # PATH 3: Ask for almost as many elements as there are in the set.
+ # In this case the implementation duplicates the original set and
+ # removes random elements until the requested size is reached.
+ #
+ # PATH 4: Ask for a number of elements definitely smaller than
+ # the set size.
+ #
+ # Both code paths can be tested with the same code, changing only
+ # the size.
+
+ foreach size {45 5} {
+ set res [r srandmember myset $size]
+ assert_equal [llength $res] $size
+
+ # 1) Check that all the elements actually belong to the
+ # original set.
+ foreach ele $res {
+ assert {[info exists myset($ele)]}
+ }
+
+ # 2) Check that eventually all the elements are returned.
+ unset -nocomplain auxset
+ set iterations 1000
+ while {$iterations != 0} {
+ incr iterations -1
+ set res [r srandmember myset $size]
+ foreach ele $res {
+ set auxset($ele) 1
+ }
+ if {[lsort [array names myset]] eq
+ [lsort [array names auxset]]} {
+ break
+ }
+ }
+ assert {$iterations != 0}
+ }
+ }
+ }
+
proc setup_move {} {
r del myset3 myset4
create_set myset1 {1 a b}