]> git.saurik.com Git - redis.git/commitdiff
Merge pull request #544 from dvirsky/2.6
authorSalvatore Sanfilippo <antirez@gmail.com>
Fri, 5 Oct 2012 10:30:58 +0000 (03:30 -0700)
committerSalvatore Sanfilippo <antirez@gmail.com>
Fri, 5 Oct 2012 10:30:58 +0000 (03:30 -0700)
fixed install script to rewrite the default config

52 files changed:
.gitignore
00-RELEASENOTES
deps/hiredis/README.md
deps/hiredis/async.c
deps/hiredis/hiredis.c
deps/hiredis/hiredis.h
deps/hiredis/net.c
redis.conf
sentinel.conf [new file with mode: 0644]
src/Makefile
src/ae.c
src/ae.h
src/ae_epoll.c
src/anet.c
src/aof.c
src/bitops.c
src/config.c
src/config.h
src/db.c
src/debug.c
src/dict.c
src/dict.h
src/fmacros.h
src/networking.c
src/rdb.c
src/redis-benchmark.c
src/redis-cli.c
src/redis.c
src/redis.h
src/replication.c
src/scripting.c
src/sds.c
src/sds.h
src/sentinel.c [new file with mode: 0644]
src/sort.c
src/t_hash.c
src/t_list.c
src/t_set.c
src/util.c
src/version.h
src/ziplist.c
src/zipmap.c
src/zmalloc.c
src/zmalloc.h
tests/integration/replication.tcl
tests/support/util.tcl
tests/unit/bitops.tcl
tests/unit/scripting.tcl
tests/unit/sort.tcl
tests/unit/type/hash.tcl
tests/unit/type/list.tcl
tests/unit/type/set.tcl

index 5f262c4617928cbc4adf7e4d01cc06ad7fa3ca8d..89afeb0f79b6fcd72978fdbeaf6081175baa7ad3 100644 (file)
@@ -2,11 +2,7 @@
 *.o
 *.rdb
 *.log
-redis-cli
-redis-server
-redis-benchmark
-redis-check-dump
-redis-check-aof
+redis-*
 doc-tools
 release
 misc/*
@@ -16,7 +12,6 @@ SHORT_TERM_TODO
 release.h
 src/transfer.sh
 src/configs
-src/redis-server.dSYM
 redis.ds
 src/redis.conf
 deps/lua/src/lua
@@ -24,3 +19,4 @@ deps/lua/src/luac
 deps/lua/src/liblua.a
 .make-*
 .prerequisites
+*.dSYM
index 031a433ea7944d94e98f9c28dffb4470ad3c5ec9..12f1e3d1862da41183cb109e97be2f3d42bfe86c 100644 (file)
@@ -10,10 +10,69 @@ Upgrade urgency levels:
 
 LOW:      No need to upgrade unless there are new features you want to use.
 MODERATE: Program an upgrade of the server, but it's not urgent.
-HIGH:     There is a critical bug that may affect some part of users. Upgrade!
+HIGH:     There is a critical bug that may affect a subset of users. Upgrade!
 CRITICAL: There is a critical bug affecting MOST USERS. Upgrade ASAP.
 --------------------------------------------------------------------------------
 
+---[ Redis 2.5.13 (2.6 Release Candidate 7) ]
+
+UPGRADE URGENCY: HIGH
+
+* [BUGFIX]   Theoretical bug in ziplist fixed.
+* [BUGFIX]   Better out of memory handling (Log produced in log file).
+* [BUGFIX]   Incrementally flush RDB file on slave side while performing the
+             first synchronization with the master. This makes Redis less
+             blocking in environments where disk I/O is slow.
+* [BUGFIX]   Don't crash with Lua's redis.call() without arguments.
+* [BUGFIX]   Don't crash after a big number of Lua calls on 32 bit systems
+             because of a failed assertion.
+* [BUGFIX]   Fix SORT behaviour when called from scripting.
+* [BUGFIX]   Adjust slave PING period accordingly to REDIS_HZ define.
+* [BUGFIX]   BITCOUNT: fix crash on overflowing arguments.
+* [BUGFIX]   Return an error when SELECT argument is not an integer.
+* [BUGFIX]   Blocking operations on lists were completely reimplemented for
+             correctness. Now blocking list ops and pushes originated from
+             Lua scripts will play well together and will be replicated
+             and transmitted to the AOF correctly.
+* [IMPROVED] Send async PING before starting replication to avoid blocking if
+             master allows us to connect but it is actually not able to reply.
+* [IMPROVED] Support slave-priority for Redis Sentinel.
+* [IMPROVED] Hiredis library updated.
+
+---[ Redis 2.5.12 (2.6 Release Candidate 6) ]
+
+UPGRADE URGENCY: MODERATE.
+
+* [BUGFIX]   Fixed a timing attack on AUTH (Issue #560).
+* [BUGFIX]   Don't assume that "char" is signed.
+* [BUGFIX]   Check that we have connection before enabling pipe mode.
+* [BUGFIX]   Use the optimized version of the function to convert a double to
+             its string representation. Compilation was disabled because of
+             a typo in the #if statement.
+* [IMPROVED} REPLCONF internal command introduced, now INFO shows slaves with
+             correct port numbers. This makes 2.5.12 Redis Sentinel compatible.
+* [IMPROVED] Truncate short write from the AOF for a cleaner restart. On short
+             writes (for instance out of space) Redis will now try to remove
+             the half-written data so that the next restart will work without
+             the need for the "redis-check-aof" utility.
+* [IMPROVED] New in INFO: aof_last_bgrewrite_status
+* [IMPROVED] Allow Pub/Sub in contexts where other commands are blocked.
+* [BUGFIX]   mark fd as writable when EPOLLERR or EPOLLHUP is returned by
+             epoll_wait.
+
+---[ Redis 2.5.11 (2.6 Release Candidate 5) ]
+
+UPGRADE URGENCY: HIGH.
+
+* [BUGFIX]   Fixed Hash corruption when loading an RDB file generated by
+             previous versions of Redis that encoded hashes using
+             a different ziplist encoding format for small integers.
+             All the fileds that are integers in the range 0-255 may not
+             be recognized, or duplicated un updates, causing a crash
+             when the ziplist is converted to a real hash. (Issue #547).
+* [BUGFIX]   Fixed the count of memory used by output buffers in the
+             setDeferredMultiBulkLength() function.
+
 ---[ Redis 2.5.10 (2.6 Release Candidate 4) ]
 
 UPGRADE URGENCY: HIGH.
@@ -123,7 +182,7 @@ Redis 2.4 is mostly a strict subset of 2.6. However there are a few things
 that you should be aware of:
 
 * You can't use .rdb and AOF files generated with 2.6 into a 2.4 instance.
-* 2.4 slaves can be attached to 2.6 masters, but not the contrary, and only
+* 2.6 slaves can be attached to 2.4 masters, but not the contrary, and only
   for the time needed to perform the version upgrade.
 
 There are also a few API differences, that are unlikely to cause problems,
index a58101cc6e503bc4e14d44fc6b7e40408c641113..62fe1067b7cf2f20fba08ae5d98edb6e75bc6cdf 100644 (file)
@@ -73,7 +73,7 @@ convert it to the protocol used to communicate with Redis.
 One or more spaces separates arguments, so you can use the specifiers
 anywhere in an argument:
 
-    reply = redisCommand("SET key:%s %s", myid, value);
+    reply = redisCommand(context, "SET key:%s %s", myid, value);
 
 ### Using replies
 
@@ -320,6 +320,10 @@ The reply parsing API consists of the following functions:
     int redisReaderFeed(redisReader *reader, const char *buf, size_t len);
     int redisReaderGetReply(redisReader *reader, void **reply);
 
+The same set of functions are used internally by hiredis when creating a
+normal Redis context, the above API just exposes it to the user for a direct
+usage.
+
 ### Usage
 
 The function `redisReaderCreate` creates a `redisReader` structure that holds a
@@ -346,6 +350,29 @@ immediately after creating the `redisReader`.
 For example, [hiredis-rb](https://github.com/pietern/hiredis-rb/blob/master/ext/hiredis_ext/reader.c)
 uses customized reply object functions to create Ruby objects.
 
+### Reader max buffer
+
+Both when using the Reader API directly or when using it indirectly via a
+normal Redis context, the redisReader structure uses a buffer in order to
+accumulate data from the server.
+Usually this buffer is destroyed when it is empty and is larger than 16
+kb in order to avoid wasting memory in unused buffers
+
+However when working with very big payloads destroying the buffer may slow
+down performances considerably, so it is possible to modify the max size of
+an idle buffer changing the value of the `maxbuf` field of the reader structure
+to the desired value. The special value of 0 means that there is no maximum
+value for an idle buffer, so the buffer will never get freed.
+
+For instance if you have a normal Redis context you can set the maximum idle
+buffer to zero (unlimited) just with:
+
+    context->reader->maxbuf = 0;
+
+This should be done only in order to maximize performances when working with
+large payloads. The context should be set back to `REDIS_READER_MAX_BUF` again
+as soon as possible in order to prevent allocation of useless memory.
+
 ## AUTHORS
 
 Hiredis was written by Salvatore Sanfilippo (antirez at gmail) and
index f83e2f51af03c77e5864e88a304339d244d02650..f65f8694ccfabd440e15393946deef1bbac27144 100644 (file)
@@ -372,6 +372,11 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
                 __redisAsyncDisconnect(ac);
                 return;
             }
+            
+            /* If monitor mode, repush callback */
+            if(c->flags & REDIS_MONITORING) {
+                __redisPushCallback(&ac->replies,&cb);
+            }
 
             /* When the connection is not being disconnected, simply stop
              * trying to get replies and wait for the next loop tick. */
@@ -381,22 +386,31 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
         /* Even if the context is subscribed, pending regular callbacks will
          * get a reply before pub/sub messages arrive. */
         if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) {
-            /* A spontaneous reply in a not-subscribed context can only be the
-             * error reply that is sent when a new connection exceeds the
-             * maximum number of allowed connections on the server side. This
-             * is seen as an error instead of a regular reply because the
-             * server closes the connection after sending it. To prevent the
-             * error from being overwritten by an EOF error the connection is
-             * closed here. See issue #43. */
-            if ( !(c->flags & REDIS_SUBSCRIBED) && ((redisReply*)reply)->type == REDIS_REPLY_ERROR ) {
+            /*
+             * A spontaneous reply in a not-subscribed context can be the error
+             * reply that is sent when a new connection exceeds the maximum
+             * number of allowed connections on the server side.
+             *
+             * This is seen as an error instead of a regular reply because the
+             * server closes the connection after sending it.
+             *
+             * To prevent the error from being overwritten by an EOF error the
+             * connection is closed here. See issue #43.
+             *
+             * Another possibility is that the server is loading its dataset.
+             * In this case we also want to close the connection, and have the
+             * user wait until the server is ready to take our request.
+             */
+            if (((redisReply*)reply)->type == REDIS_REPLY_ERROR) {
                 c->err = REDIS_ERR_OTHER;
                 snprintf(c->errstr,sizeof(c->errstr),"%s",((redisReply*)reply)->str);
                 __redisAsyncDisconnect(ac);
                 return;
             }
-            /* No more regular callbacks and no errors, the context *must* be subscribed. */
-            assert(c->flags & REDIS_SUBSCRIBED);
-            __redisGetSubscribeCallback(ac,reply,&cb);
+            /* No more regular callbacks and no errors, the context *must* be subscribed or monitoring. */
+            assert((c->flags & REDIS_SUBSCRIBED || c->flags & REDIS_MONITORING));
+            if(c->flags & REDIS_SUBSCRIBED)
+                __redisGetSubscribeCallback(ac,reply,&cb);
         }
 
         if (cb.fn != NULL) {
@@ -557,6 +571,10 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
         /* (P)UNSUBSCRIBE does not have its own response: every channel or
          * pattern that is unsubscribed will receive a message. This means we
          * should not append a callback function for this command. */
+     } else if(strncasecmp(cstr,"monitor\r\n",9) == 0) {
+         /* Set monitor flag and push callback */
+         c->flags |= REDIS_MONITORING;
+         __redisPushCallback(&ac->replies,&cb);
     } else {
         if (c->flags & REDIS_SUBSCRIBED)
             /* This will likely result in an error reply, but it needs to be
index e6109db847c494086691614cc555038888ab6bf5..4709ee325309d39ee68a41159548ba7441359171 100644 (file)
@@ -446,7 +446,7 @@ static int processMultiBulkItem(redisReader *r) {
     long elements;
     int root = 0;
 
-    /* Set error for nested multi bulks with depth > 2 */
+    /* Set error for nested multi bulks with depth > 7 */
     if (r->ridx == 8) {
         __redisReaderSetError(r,REDIS_ERR_PROTOCOL,
             "No support for nested multi bulk replies with depth > 7");
@@ -564,6 +564,7 @@ redisReader *redisReaderCreate(void) {
     r->errstr[0] = '\0';
     r->fn = &defaultFunctions;
     r->buf = sdsempty();
+    r->maxbuf = REDIS_READER_MAX_BUF;
     if (r->buf == NULL) {
         free(r);
         return NULL;
@@ -590,9 +591,8 @@ int redisReaderFeed(redisReader *r, const char *buf, size_t len) {
 
     /* Copy the provided buffer. */
     if (buf != NULL && len >= 1) {
-#if 0
         /* Destroy internal buffer when it is empty and is quite large. */
-        if (r->len == 0 && sdsavail(r->buf) > 16*1024) {
+        if (r->len == 0 && r->maxbuf != 0 && sdsavail(r->buf) > r->maxbuf) {
             sdsfree(r->buf);
             r->buf = sdsempty();
             r->pos = 0;
@@ -600,7 +600,6 @@ int redisReaderFeed(redisReader *r, const char *buf, size_t len) {
             /* r->buf should not be NULL since we just free'd a larger one. */
             assert(r->buf != NULL);
         }
-#endif
 
         newbuf = sdscatlen(r->buf,buf,len);
         if (newbuf == NULL) {
index a73f50e957b916883e087b6067ee8b79cc1eec3c..b922831e3e23232d28c79419c28fbdaeb4a2eff5 100644 (file)
@@ -76,6 +76,9 @@
 /* Flag that is set when the async context has one or more subscriptions. */
 #define REDIS_SUBSCRIBED 0x20
 
+/* Flag that is set when monitor mode is active */
+#define REDIS_MONITORING 0x40
+
 #define REDIS_REPLY_STRING 1
 #define REDIS_REPLY_ARRAY 2
 #define REDIS_REPLY_INTEGER 3
@@ -83,6 +86,8 @@
 #define REDIS_REPLY_STATUS 5
 #define REDIS_REPLY_ERROR 6
 
+#define REDIS_READER_MAX_BUF (1024*16)  /* Default max unused reader buffer. */
+
 #ifdef __cplusplus
 extern "C" {
 #endif
@@ -122,6 +127,7 @@ typedef struct redisReader {
     char *buf; /* Read buffer */
     size_t pos; /* Buffer cursor */
     size_t len; /* Buffer length */
+    size_t maxbuf; /* Max length of unused buffer */
 
     redisReadTask rstack[9];
     int ridx; /* Index of current read task */
index 158e1dd8af8ce37885dc12699e1bd12c0fe1f588..82ab2b468c8ebfdd8e6ca2fb62bd65a9d56892c7 100644 (file)
@@ -45,6 +45,8 @@
 #include <errno.h>
 #include <stdarg.h>
 #include <stdio.h>
+#include <poll.h>
+#include <limits.h>
 
 #include "net.h"
 #include "sds.h"
@@ -121,28 +123,38 @@ static int redisSetTcpNoDelay(redisContext *c, int fd) {
     return REDIS_OK;
 }
 
+#define __MAX_MSEC (((LONG_MAX) - 999) / 1000)
+
 static int redisContextWaitReady(redisContext *c, int fd, const struct timeval *timeout) {
-    struct timeval to;
-    struct timeval *toptr = NULL;
-    fd_set wfd;
+    struct pollfd   wfd[1];
+    long msec;
+
+    msec          = -1;
+    wfd[0].fd     = fd;
+    wfd[0].events = POLLOUT;
 
     /* Only use timeout when not NULL. */
     if (timeout != NULL) {
-        to = *timeout;
-        toptr = &to;
+        if (timeout->tv_usec > 1000000 || timeout->tv_sec > __MAX_MSEC) {
+            close(fd);
+            return REDIS_ERR;
+        }
+
+        msec = (timeout->tv_sec * 1000) + ((timeout->tv_usec + 999) / 1000);
+
+        if (msec < 0 || msec > INT_MAX) {
+            msec = INT_MAX;
+        }
     }
 
     if (errno == EINPROGRESS) {
-        FD_ZERO(&wfd);
-        FD_SET(fd, &wfd);
+        int res;
 
-        if (select(FD_SETSIZE, NULL, &wfd, NULL, toptr) == -1) {
-            __redisSetErrorFromErrno(c,REDIS_ERR_IO,"select(2)");
+        if ((res = poll(wfd, 1, msec)) == -1) {
+            __redisSetErrorFromErrno(c, REDIS_ERR_IO, "poll(2)");
             close(fd);
             return REDIS_ERR;
-        }
-
-        if (!FD_ISSET(fd, &wfd)) {
+        } else if (res == 0) {
             errno = ETIMEDOUT;
             __redisSetErrorFromErrno(c,REDIS_ERR_IO,NULL);
             close(fd);
index f5e15f69d22678f28289f249ce6618e18be6097d..97aea334b4eb759b9aee49826a200ff5530de971 100644 (file)
@@ -196,6 +196,21 @@ slave-read-only yes
 #
 # repl-timeout 60
 
+# The slave priority is an integer number published by Redis in the INFO output.
+# It is used by Redis Sentinel in order to select a slave to promote into a
+# master if the master is no longer working correctly.
+#
+# A slave with a low priority number is considered better for promotion, so
+# for instance if there are three slaves with priority 10, 100, 25 Sentinel will
+# pick the one wtih priority 10, that is the lowest.
+#
+# However a special priority of 0 marks the slave as not able to perform the
+# role of master, so a slave with priority of 0 will never be selected by
+# Redis Sentinel for promotion.
+#
+# By default the priority is 100.
+slave-priority 100
+
 ################################## SECURITY ###################################
 
 # Require clients to issue AUTH <PASSWORD> before processing any other
diff --git a/sentinel.conf b/sentinel.conf
new file mode 100644 (file)
index 0000000..94169ee
--- /dev/null
@@ -0,0 +1,150 @@
+# Example sentinel.conf
+
+# port <sentinel-port>
+# The port that this sentinel instance will run on
+port 26379
+
+# sentinel monitor <master-name> <ip> <redis-port> <quorum>
+#
+# Tells Sentinel to monitor this slave, and to consider it in O_DOWN
+# (Objectively Down) state only if at least <quorum> sentinels agree.
+#
+# Note: master name should not include special characters or spaces.
+# The valid charset is A-z 0-9 and the three characters ".-_".
+sentinel monitor mymaster 127.0.0.1 6379 2
+
+# sentinel auth-pass <master-name> <password>
+#
+# Set the password to use to authenticate with the master and slaves.
+# Useful if there is a password set in the Redis instances to monitor.
+#
+# Note that the master password is also used for slaves, so it is not
+# possible to set a different password in masters and slaves instances
+# if you want to be able to monitor these instances with Sentinel.
+#
+# However you can have Redis instances without the authentication enabled
+# mixed with Redis instances requiring the authentication (as long as the
+# password set is the same for all the instances requiring the password) as
+# the AUTH command will have no effect in Redis instances with authentication
+# switched off.
+#
+# Example:
+#
+# sentinel auth-pass mymaster MySUPER--secret-0123passw0rd
+
+# sentinel down-after-milliseconds <master-name> <milliseconds>
+#
+# Number of milliseconds the master (or any attached slave or sentinel) should
+# be unreachable (as in, not acceptable reply to PING, continuously, for the
+# specified period) in order to consider it in S_DOWN state (Subjectively
+# Down).
+#
+# Default is 30 seconds.
+sentinel down-after-milliseconds mymaster 30000
+
+# sentinel can-failover <master-name> <yes|no>
+#
+# Specify if this Sentinel can start the failover for this master.
+sentinel can-failover mymaster yes
+
+# sentinel parallel-syncs <master-name> <numslaves>
+#
+# How many slaves we can reconfigure to point to the new slave simultaneously
+# during the failover. Use a low number if you use the slaves to serve query
+# to avoid that all the slaves will be unreachable at about the same
+# time while performing the synchronization with the master.
+sentinel parallel-syncs mymaster 1
+
+# sentinel failover-timeout <master-name> <milliseconds>
+#
+# Specifies the failover timeout in milliseconds. When this time has elapsed
+# without any progress in the failover process, it is considered concluded by
+# the sentinel even if not all the attached slaves were correctly configured
+# to replicate with the new master (however a "best effort" SLAVEOF command
+# is sent to all the slaves before).
+#
+# Also when 25% of this time has elapsed without any advancement, and there
+# is a leader switch (the sentinel did not started the failover but is now
+# elected as leader), the sentinel will continue the failover doing a
+# "takeover".
+#
+# Default is 15 minutes.
+sentinel failover-timeout mymaster 900000
+
+# SCRIPTS EXECTION
+#
+# sentinel notification-script and sentinel reconfig-script are used in order
+# to configure scripts that are called to notify the system administrator
+# or to reconfigure clients after a failover. The scripts are executed
+# with the following rules for error handling:
+#
+# If script exists with "1" the execution is retried later (up to a maximum
+# number of times currently set to 10).
+#
+# If script exists with "2" (or an higher value) the script execution is
+# not retried.
+#
+# If script terminates because it receives a signal the behavior is the same
+# as exit code 1.
+#
+# A script has a maximum running time of 60 seconds. After this limit is
+# reached the script is terminated with a SIGKILL and the execution retried.
+
+# NOTIFICATION SCRIPT
+#
+# sentinel notification-script <master-name> <script-path>
+# 
+# Call the specified notification script for any sentienl event that is
+# generated in the WARNING level (for instance -sdown, -odown, and so forth).
+# This script should notify the system administrator via email, SMS, or any
+# other messaging system, that there is something wrong with the monitored
+# Redis systems.
+#
+# The script is called with just two arguments: the first is the event type
+# and the second the event description.
+#
+# The script must exist and be executable in order for sentinel to start if
+# this option is provided.
+#
+# Example:
+#
+# sentinel notification-script mymaster /var/redis/notify.sh
+
+# CLIENTS RECONFIGURATION SCRIPT
+#
+# sentinel client-reconfig-script <master-name> <script-path>
+#
+# When the failover starts, ends, or is aborted, a script can be called in
+# order to perform application-specific tasks to notify the clients that the
+# configuration has changed and the master is at a different address.
+# 
+# The script is called in the following cases:
+#
+# Failover started (a slave is already promoted)
+# Failover finished (all the additional slaves already reconfigured)
+# Failover aborted (in that case the script was previously called when the
+#                   failover started, and now gets called again with swapped
+#                   addresses).
+#
+# The following arguments are passed to the script:
+#
+# <master-name> <role> <state> <from-ip> <from-port> <to-ip> <to-port>
+#
+# <state> is "start", "end" or "abort"
+# <role> is either "leader" or "observer"
+# 
+# The arguments from-ip, from-port, to-ip, to-port are used to communicate
+# the old address of the master and the new address of the elected slave
+# (now a master) in the case state is "start" or "end".
+#
+# For abort instead the "from" is the address of the promoted slave and
+# "to" is the address of the original master address, since the failover
+# was aborted.
+#
+# This script should be resistant to multiple invocations.
+#
+# Example:
+#
+# sentinel client-reconfig-script mymaster /var/redis/reconfig.sh
+
+
index 7c21632e90147fa06909c9203778d3469a065ab7..204a27148437fd936444c3a09a72d1bce6e4af01 100644 (file)
@@ -78,6 +78,7 @@ endif
 
 REDIS_CC=$(QUIET_CC)$(CC) $(FINAL_CFLAGS)
 REDIS_LD=$(QUIET_LINK)$(CC) $(FINAL_LDFLAGS)
+REDIS_INSTALL=$(QUIET_INSTALL)$(INSTALL)
 
 PREFIX?=/usr/local
 INSTALL_BIN= $(PREFIX)/bin
@@ -93,10 +94,12 @@ ENDCOLOR="\033[0m"
 ifndef V
 QUIET_CC = @printf '    %b %b\n' $(CCCOLOR)CC$(ENDCOLOR) $(SRCCOLOR)$@$(ENDCOLOR) 1>&2;
 QUIET_LINK = @printf '    %b %b\n' $(LINKCOLOR)LINK$(ENDCOLOR) $(BINCOLOR)$@$(ENDCOLOR) 1>&2;
+QUIET_INSTALL = @printf '    %b %b\n' $(LINKCOLOR)INSTALL$(ENDCOLOR) $(BINCOLOR)$@$(ENDCOLOR) 1>&2;
 endif
 
 REDIS_SERVER_NAME= redis-server
-REDIS_SERVER_OBJ= adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o migrate.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o
+REDIS_SENTINEL_NAME= redis-sentinel
+REDIS_SERVER_OBJ= adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o migrate.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o
 REDIS_CLI_NAME= redis-cli
 REDIS_CLI_OBJ= anet.o sds.o adlist.o redis-cli.o zmalloc.o release.o anet.o ae.o
 REDIS_BENCHMARK_NAME= redis-benchmark
@@ -106,7 +109,7 @@ REDIS_CHECK_DUMP_OBJ= redis-check-dump.o lzf_c.o lzf_d.o crc64.o
 REDIS_CHECK_AOF_NAME= redis-check-aof
 REDIS_CHECK_AOF_OBJ= redis-check-aof.o
 
-all: $(REDIS_SERVER_NAME) $(REDIS_CLI_NAME) $(REDIS_BENCHMARK_NAME) $(REDIS_CHECK_DUMP_NAME) $(REDIS_CHECK_AOF_NAME)
+all: $(REDIS_SERVER_NAME) $(REDIS_SENTINEL_NAME) $(REDIS_CLI_NAME) $(REDIS_BENCHMARK_NAME) $(REDIS_CHECK_DUMP_NAME) $(REDIS_CHECK_AOF_NAME)
        @echo ""
        @echo "Hint: To run 'make test' is a good idea ;)"
        @echo ""
@@ -151,7 +154,11 @@ endif
 
 # redis-server
 $(REDIS_SERVER_NAME): $(REDIS_SERVER_OBJ)
-       $(REDIS_LD) -o $@ $^ ../deps/lua/src/liblua.a $(FINAL_LIBS)
+       $(REDIS_LD) -o $@ $^ ../deps/hiredis/libhiredis.a ../deps/lua/src/liblua.a $(FINAL_LIBS)
+
+# redis-sentinel
+$(REDIS_SENTINEL_NAME): $(REDIS_SERVER_NAME)
+       $(REDIS_INSTALL) $(REDIS_SERVER_NAME) $(REDIS_SENTINEL_NAME)
 
 # redis-cli
 $(REDIS_CLI_NAME): $(REDIS_CLI_OBJ)
@@ -176,7 +183,7 @@ $(REDIS_CHECK_AOF_NAME): $(REDIS_CHECK_AOF_OBJ)
        $(REDIS_CC) -c $<
 
 clean:
-       rm -rf $(REDIS_SERVER_NAME) $(REDIS_CLI_NAME) $(REDIS_BENCHMARK_NAME) $(REDIS_CHECK_DUMP_NAME) $(REDIS_CHECK_AOF_NAME) *.o *.gcda *.gcno *.gcov redis.info lcov-html
+       rm -rf $(REDIS_SERVER_NAME) $(REDIS_SENTINEL_NAME) $(REDIS_CLI_NAME) $(REDIS_BENCHMARK_NAME) $(REDIS_CHECK_DUMP_NAME) $(REDIS_CHECK_AOF_NAME) *.o *.gcda *.gcno *.gcov redis.info lcov-html
 
 .PHONY: clean
 
@@ -217,8 +224,8 @@ src/help.h:
 
 install: all
        mkdir -p $(INSTALL_BIN)
-       $(INSTALL) $(REDIS_SERVER_NAME) $(INSTALL_BIN)
-       $(INSTALL) $(REDIS_BENCHMARK_NAME) $(INSTALL_BIN)
-       $(INSTALL) $(REDIS_CLI_NAME) $(INSTALL_BIN)
-       $(INSTALL) $(REDIS_CHECK_DUMP_NAME) $(INSTALL_BIN)
-       $(INSTALL) $(REDIS_CHECK_AOF_NAME) $(INSTALL_BIN)
+       $(REDIS_INSTALL) $(REDIS_SERVER_NAME) $(INSTALL_BIN)
+       $(REDIS_INSTALL) $(REDIS_BENCHMARK_NAME) $(INSTALL_BIN)
+       $(REDIS_INSTALL) $(REDIS_CLI_NAME) $(INSTALL_BIN)
+       $(REDIS_INSTALL) $(REDIS_CHECK_DUMP_NAME) $(INSTALL_BIN)
+       $(REDIS_INSTALL) $(REDIS_CHECK_AOF_NAME) $(INSTALL_BIN)
index ba53b456805ad3b33910f96e57a2a2e8490cddd2..d2faed326307f8ea12fdb5c718a676b7c6c257a9 100644 (file)
--- a/src/ae.c
+++ b/src/ae.c
@@ -37,6 +37,7 @@
 #include <stdlib.h>
 #include <poll.h>
 #include <string.h>
+#include <time.h>
 
 #include "ae.h"
 #include "zmalloc.h"
@@ -67,6 +68,7 @@ aeEventLoop *aeCreateEventLoop(int setsize) {
     eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
     if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
     eventLoop->setsize = setsize;
+    eventLoop->lastTime = time(NULL);
     eventLoop->timeEventHead = NULL;
     eventLoop->timeEventNextId = 0;
     eventLoop->stop = 0;
@@ -236,6 +238,24 @@ static int processTimeEvents(aeEventLoop *eventLoop) {
     int processed = 0;
     aeTimeEvent *te;
     long long maxId;
+    time_t now = time(NULL);
+
+    /* If the system clock is moved to the future, and then set back to the
+     * right value, time events may be delayed in a random way. Often this
+     * means that scheduled operations will not be performed soon enough.
+     *
+     * Here we try to detect system clock skews, and force all the time
+     * events to be processed ASAP when this happens: the idea is that
+     * processing events earlier is less dangerous than delaying them
+     * indefinitely, and practice suggests it is. */
+    if (now < eventLoop->lastTime) {
+        te = eventLoop->timeEventHead;
+        while(te) {
+            te->when_sec = 0;
+            te = te->next;
+        }
+    }
+    eventLoop->lastTime = now;
 
     te = eventLoop->timeEventHead;
     maxId = eventLoop->timeEventNextId-1;
index e1dccfc76a684a50b7aafb06809348bb0a9b98e7..f52a075e5fa616c3363536f7054e909e5c6b1b9b 100644 (file)
--- a/src/ae.h
+++ b/src/ae.h
@@ -88,6 +88,7 @@ typedef struct aeEventLoop {
     int maxfd;   /* highest file descriptor currently registered */
     int setsize; /* max number of file descriptors tracked */
     long long timeEventNextId;
+    time_t lastTime;     /* Used to detect system clock skew */
     aeFileEvent *events; /* Registered events */
     aeFiredEvent *fired; /* Fired events */
     aeTimeEvent *timeEventHead;
index cac10d67f7c01c6936b897e7a3bdc2985463edcf..0231f24358f13b16f2cb675937228cb0e7006794 100644 (file)
@@ -89,6 +89,8 @@ static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
 
             if (e->events & EPOLLIN) mask |= AE_READABLE;
             if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
+            if (e->events & EPOLLERR) mask |= AE_WRITABLE;
+            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
             eventLoop->fired[j].fd = e->data.fd;
             eventLoop->fired[j].mask = mask;
         }
index 434d945c7d850edd8b8a8934d254bbc88e5d62d0..4b52425cf8166754cecabd5877b5622b32619e2b 100644 (file)
@@ -367,3 +367,18 @@ int anetPeerToString(int fd, char *ip, int *port) {
     if (port) *port = ntohs(sa.sin_port);
     return 0;
 }
+
+int anetSockName(int fd, char *ip, int *port) {
+    struct sockaddr_in sa;
+    socklen_t salen = sizeof(sa);
+
+    if (getsockname(fd,(struct sockaddr*)&sa,&salen) == -1) {
+        *port = 0;
+        ip[0] = '?';
+        ip[1] = '\0';
+        return -1;
+    }
+    if (ip) strcpy(ip,inet_ntoa(sa.sin_addr));
+    if (port) *port = ntohs(sa.sin_port);
+    return 0;
+}
index 09bfb04924520db75201a66382cf409f24a5b088..441ccaf18802ef9139efd2c38169a3c39a57799d 100644 (file)
--- a/src/aof.c
+++ b/src/aof.c
@@ -250,6 +250,13 @@ void flushAppendOnlyFile(int force) {
                                    strerror(errno),
                                    (long)nwritten,
                                    (long)sdslen(server.aof_buf));
+
+            if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
+                redisLog(REDIS_WARNING, "Could not remove short write "
+                         "from the append-only file.  Redis may refuse "
+                         "to load the AOF the next time it starts.  "
+                         "ftruncate: %s", strerror(errno));
+            }
         }
         exit(1);
     }
@@ -1093,6 +1100,8 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
             server.aof_buf = sdsempty();
         }
 
+        server.aof_lastbgrewrite_status = REDIS_OK;
+
         redisLog(REDIS_NOTICE, "Background AOF rewrite finished successfully");
         /* Change state from WAIT_REWRITE to ON if needed */
         if (server.aof_state == REDIS_AOF_WAIT_REWRITE)
@@ -1104,9 +1113,13 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
         redisLog(REDIS_VERBOSE,
             "Background AOF rewrite signal handler took %lldus", ustime()-now);
     } else if (!bysignal && exitcode != 0) {
+        server.aof_lastbgrewrite_status = REDIS_ERR;
+
         redisLog(REDIS_WARNING,
             "Background AOF rewrite terminated with error");
     } else {
+        server.aof_lastbgrewrite_status = REDIS_ERR;
+
         redisLog(REDIS_WARNING,
             "Background AOF rewrite terminated by signal %d", bysignal);
     }
index 00192b92b14fc13194ded59143b1f2a9582e8871..39d24ab7d43182c8d0bfe58f72d8edb32969ca44 100644 (file)
@@ -114,14 +114,14 @@ void setbitCommand(redisClient *c) {
     o->ptr = sdsgrowzero(o->ptr,byte+1);
 
     /* Get current values */
-    byteval = ((char*)o->ptr)[byte];
+    byteval = ((uint8_t*)o->ptr)[byte];
     bit = 7 - (bitoffset & 0x7);
     bitval = byteval & (1 << bit);
 
     /* Update byte with new bit value and return original value */
     byteval &= ~(1 << bit);
     byteval |= ((on & 0x1) << bit);
-    ((char*)o->ptr)[byte] = byteval;
+    ((uint8_t*)o->ptr)[byte] = byteval;
     signalModifiedKey(c->db,c->argv[1]);
     server.dirty++;
     addReply(c, bitval ? shared.cone : shared.czero);
@@ -148,7 +148,7 @@ void getbitCommand(redisClient *c) {
             bitval = llbuf[byte] & (1 << bit);
     } else {
         if (byte < sdslen(o->ptr))
-            bitval = ((char*)o->ptr)[byte] & (1 << bit);
+            bitval = ((uint8_t*)o->ptr)[byte] & (1 << bit);
     }
 
     addReply(c, bitval ? shared.cone : shared.czero);
@@ -327,10 +327,9 @@ void bitopCommand(redisClient *c) {
 /* BITCOUNT key [start end] */
 void bitcountCommand(redisClient *c) {
     robj *o;
-    long start, end;
+    long start, end, strlen;
     unsigned char *p;
     char llbuf[32];
-    size_t strlen;
 
     /* Lookup, check for type, and return 0 for non existing keys. */
     if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
@@ -357,7 +356,7 @@ void bitcountCommand(redisClient *c) {
         if (end < 0) end = strlen+end;
         if (start < 0) start = 0;
         if (end < 0) end = 0;
-        if ((unsigned)end >= strlen) end = strlen-1;
+        if (end >= strlen) end = strlen-1;
     } else if (c->argc == 2) {
         /* The whole string. */
         start = 0;
index c2ea5b76816c0fcb278e5bb2572e2f6a0c4a9acb..a36eb9a392b0ffcbd1cdfac502409d9e7e024e4f 100644 (file)
@@ -264,6 +264,10 @@ void loadServerConfigFromString(char *config) {
         {
             server.aof_rewrite_min_size = memtoll(argv[1],NULL);
         } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
+            if (strlen(argv[1]) > REDIS_AUTHPASS_MAX_LEN) {
+                err = "Password is longer than REDIS_AUTHPASS_MAX_LEN";
+                goto loaderr;
+            }
             server.requirepass = zstrdup(argv[1]);
         } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
             zfree(server.pidfile);
@@ -343,6 +347,19 @@ void loadServerConfigFromString(char *config) {
             if ((server.stop_writes_on_bgsave_err = yesnotoi(argv[1])) == -1) {
                 err = "argument must be 'yes' or 'no'"; goto loaderr;
             }
+        } else if (!strcasecmp(argv[0],"slave-priority") && argc == 2) {
+            server.slave_priority = atoi(argv[1]);
+        } else if (!strcasecmp(argv[0],"sentinel")) {
+            /* argc == 1 is handled by main() as we need to enter the sentinel
+             * mode ASAP. */
+            if (argc != 1) {
+                if (!server.sentinel_mode) {
+                    err = "sentinel directive while not in sentinel mode";
+                    goto loaderr;
+                }
+                err = sentinelHandleConfiguration(argv+1,argc-1);
+                if (err) goto loaderr;
+            }
         } else {
             err = "Bad directive or wrong number of arguments"; goto loaderr;
         }
@@ -411,6 +428,7 @@ void configSetCommand(redisClient *c) {
         zfree(server.rdb_filename);
         server.rdb_filename = zstrdup(o->ptr);
     } else if (!strcasecmp(c->argv[2]->ptr,"requirepass")) {
+        if (sdslen(o->ptr) > REDIS_AUTHPASS_MAX_LEN) goto badfmt;
         zfree(server.requirepass);
         server.requirepass = ((char*)o->ptr)[0] ? zstrdup(o->ptr) : NULL;
     } else if (!strcasecmp(c->argv[2]->ptr,"masterauth")) {
@@ -420,7 +438,12 @@ void configSetCommand(redisClient *c) {
         if (getLongLongFromObject(o,&ll) == REDIS_ERR ||
             ll < 0) goto badfmt;
         server.maxmemory = ll;
-        if (server.maxmemory) freeMemoryIfNeeded();
+        if (server.maxmemory) {
+            if (server.maxmemory < zmalloc_used_memory()) {
+                redisLog(REDIS_WARNING,"WARNING: the new maxmemory value set via CONFIG SET is smaller than the current memory usage. This will result in keys eviction and/or inability to accept new write commands depending on the maxmemory-policy.");
+            }
+            freeMemoryIfNeeded();
+        }
     } else if (!strcasecmp(c->argv[2]->ptr,"maxmemory-policy")) {
         if (!strcasecmp(o->ptr,"volatile-lru")) {
             server.maxmemory_policy = REDIS_MAXMEMORY_VOLATILE_LRU;
@@ -643,6 +666,10 @@ void configSetCommand(redisClient *c) {
 
         if (yn == -1) goto badfmt;
         server.rdb_checksum = yn;
+    } else if (!strcasecmp(c->argv[2]->ptr,"slave-priority")) {
+        if (getLongLongFromObject(o,&ll) == REDIS_ERR ||
+            ll <= 0) goto badfmt;
+        server.slave_priority = ll;
     } else {
         addReplyErrorFormat(c,"Unsupported CONFIG parameter: %s",
             (char*)c->argv[2]->ptr);
@@ -732,6 +759,7 @@ void configGetCommand(redisClient *c) {
     config_get_numerical_field("repl-timeout",server.repl_timeout);
     config_get_numerical_field("maxclients",server.maxclients);
     config_get_numerical_field("watchdog-period",server.watchdog_period);
+    config_get_numerical_field("slave-priority",server.slave_priority);
 
     /* Bool (yes/no) values */
     config_get_bool_field("no-appendfsync-on-rewrite",
index 28ef37d6ed01043a6000856de653608604218b51..617682fcf45247d07ae734f3724cc0a4fde600a5 100644 (file)
 #define aof_fsync fsync
 #endif
 
+/* Define rdb_fsync_range to sync_file_range() on Linux, otherwise we use
+ * the plain fsync() call. */
+#ifdef __linux__
+#define rdb_fsync_range(fd,off,size) sync_file_range(fd,off,size,SYNC_FILE_RANGE_WAIT_BEFORE|SYNC_FILE_RANGE_WRITE)
+#else
+#define rdb_fsync_range(fd,off,size) fsync(fd)
+#endif
+
 /* Byte ordering detection */
 #include <sys/types.h> /* This will likely define BYTE_ORDER */
 
index e65106a583c1117c197b6a064537f1cb25d9936b..e78b0d535bd15d13866802ae59a24567476e9a77 100644 (file)
--- a/src/db.c
+++ b/src/db.c
@@ -226,7 +226,11 @@ void existsCommand(redisClient *c) {
 }
 
 void selectCommand(redisClient *c) {
-    int id = atoi(c->argv[1]->ptr);
+    long id;
+
+    if (getLongFromObjectOrReply(c, c->argv[1], &id,
+        "invalid DB index") != REDIS_OK)
+        return;
 
     if (selectDb(c,id) == REDIS_ERR) {
         addReplyError(c,"invalid DB index");
index 4687fb6c072ff17e81070da0cd8f9f699ee7138a..566b2b959cd2d6c7a0fdae950c4b3221ea4d8b1a 100644 (file)
@@ -218,6 +218,10 @@ void computeDatasetDigest(unsigned char *final) {
 void debugCommand(redisClient *c) {
     if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
         *((char*)-1) = 'x';
+    } else if (!strcasecmp(c->argv[1]->ptr,"oom")) {
+        void *ptr = zmalloc(ULONG_MAX); /* Should trigger an out of memory. */
+        zfree(ptr);
+        addReply(c,shared.ok);
     } else if (!strcasecmp(c->argv[1]->ptr,"assert")) {
         if (c->argc >= 3) c->argv[2] = tryObjectEncoding(c->argv[2]);
         redisAssertWithInfo(c,c->argv[0],1 == 2);
@@ -686,6 +690,30 @@ void sigsegvHandler(int sig, siginfo_t *info, void *secret) {
 }
 #endif /* HAVE_BACKTRACE */
 
+/* ==================== Logging functions for debugging ===================== */
+
+void redisLogHexDump(int level, char *descr, void *value, size_t len) {
+    char buf[65], *b;
+    unsigned char *v = value;
+    char charset[] = "0123456789abcdef";
+
+    redisLog(level,"%s (hexdump):", descr);
+    b = buf;
+    while(len) {
+        b[0] = charset[(*v)>>4];
+        b[1] = charset[(*v)&0xf];
+        b[2] = '\0';
+        b += 2;
+        len--;
+        v++;
+        if (b-buf == 64 || len == 0) {
+            redisLogRaw(level|REDIS_LOG_RAW,buf);
+            b = buf;
+        }
+    }
+    redisLogRaw(level|REDIS_LOG_RAW,"\n");
+}
+
 /* =========================== Software Watchdog ============================ */
 #include <sys/time.h>
 
index 69656734c6085a9e28b51eb75df218e3dd71da7a..ec58e820073065dbee96d69a1d1a8135f2ef3f7a 100644 (file)
@@ -85,29 +85,73 @@ unsigned int dictIdentityHashFunction(unsigned int key)
     return key;
 }
 
-static int dict_hash_function_seed = 5381;
+static uint32_t dict_hash_function_seed = 5381;
 
-void dictSetHashFunctionSeed(unsigned int seed) {
+void dictSetHashFunctionSeed(uint32_t seed) {
     dict_hash_function_seed = seed;
 }
 
-unsigned int dictGetHashFunctionSeed(void) {
+uint32_t dictGetHashFunctionSeed(void) {
     return dict_hash_function_seed;
 }
 
-/* Generic hash function (a popular one from Bernstein).
- * I tested a few and this was the best. */
-unsigned int dictGenHashFunction(const unsigned char *buf, int len) {
-    unsigned int hash = dict_hash_function_seed;
+/* MurmurHash2, by Austin Appleby
+ * Note - This code makes a few assumptions about how your machine behaves -
+ * 1. We can read a 4-byte value from any address without crashing
+ * 2. sizeof(int) == 4
+ *
+ * And it has a few limitations -
+ *
+ * 1. It will not work incrementally.
+ * 2. It will not produce the same results on little-endian and big-endian
+ *    machines.
+ */
+unsigned int dictGenHashFunction(const void *key, int len) {
+    /* 'm' and 'r' are mixing constants generated offline.
+     They're not really 'magic', they just happen to work well.  */
+    uint32_t seed = dict_hash_function_seed;
+    const uint32_t m = 0x5bd1e995;
+    const int r = 24;
 
-    while (len--)
-        hash = ((hash << 5) + hash) + (*buf++); /* hash * 33 + c */
-    return hash;
+    /* Initialize the hash to a 'random' value */
+    uint32_t h = seed ^ len;
+
+    /* Mix 4 bytes at a time into the hash */
+    const unsigned char *data = (const unsigned char *)key;
+
+    while(len >= 4) {
+        uint32_t k = *(uint32_t*)data;
+
+        k *= m;
+        k ^= k >> r;
+        k *= m;
+
+        h *= m;
+        h ^= k;
+
+        data += 4;
+        len -= 4;
+    }
+
+    /* Handle the last few bytes of the input array  */
+    switch(len) {
+    case 3: h ^= data[2] << 16;
+    case 2: h ^= data[1] << 8;
+    case 1: h ^= data[0]; h *= m;
+    };
+
+    /* Do a few final mixes of the hash to ensure the last few
+     * bytes are well-incorporated. */
+    h ^= h >> 13;
+    h *= m;
+    h ^= h >> 15;
+
+    return (unsigned int)h;
 }
 
-/* And a case insensitive version */
+/* And a case insensitive hash function (based on djb hash) */
 unsigned int dictGenCaseHashFunction(const unsigned char *buf, int len) {
-    unsigned int hash = dict_hash_function_seed;
+    unsigned int hash = (unsigned int)dict_hash_function_seed;
 
     while (len--)
         hash = ((hash << 5) + hash) + (tolower(*buf++)); /* hash * 33 + c */
@@ -116,8 +160,8 @@ unsigned int dictGenCaseHashFunction(const unsigned char *buf, int len) {
 
 /* ----------------------------- API implementation ------------------------- */
 
-/* Reset an hashtable already initialized with ht_init().
- * NOTE: This function should only called by ht_destroy(). */
+/* Reset a hash table already initialized with ht_init().
+ * NOTE: This function should only be called by ht_destroy(). */
 static void _dictReset(dictht *ht)
 {
     ht->table = NULL;
@@ -162,18 +206,18 @@ int dictResize(dict *d)
     return dictExpand(d, minimal);
 }
 
-/* Expand or create the hashtable */
+/* Expand or create the hash table */
 int dictExpand(dict *d, unsigned long size)
 {
-    dictht n; /* the new hashtable */
+    dictht n; /* the new hash table */
     unsigned long realsize = _dictNextPower(size);
 
     /* the size is invalid if it is smaller than the number of
-     * elements already inside the hashtable */
+     * elements already inside the hash table */
     if (dictIsRehashing(d) || d->ht[0].used > size)
         return DICT_ERR;
 
-    /* Allocate the new hashtable and initialize all pointers to NULL */
+    /* Allocate the new hash table and initialize all pointers to NULL */
     n.size = realsize;
     n.sizemask = realsize-1;
     n.table = zcalloc(realsize*sizeof(dictEntry*));
@@ -280,7 +324,7 @@ int dictAdd(dict *d, void *key, void *val)
  * a value returns the dictEntry structure to the user, that will make
  * sure to fill the value field as he wishes.
  *
- * This function is also directly expoed to user API to be called
+ * This function is also directly exposed to user API to be called
  * mainly in order to store non-pointers inside the hash value, example:
  *
  * entry = dictAddRaw(dict,mykey);
@@ -607,7 +651,7 @@ static int _dictKeyIndex(dict *d, const void *key)
     unsigned int h, idx, table;
     dictEntry *he;
 
-    /* Expand the hashtable if needed */
+    /* Expand the hash table if needed */
     if (_dictExpandIfNeeded(d) == DICT_ERR)
         return -1;
     /* Compute the key hash value */
index 5f85695354471b29ab074f974368a673a5bfc45b..f480ae539232fbe374feeb96661a3088d8b12e20 100644 (file)
@@ -155,7 +155,7 @@ dictEntry *dictNext(dictIterator *iter);
 void dictReleaseIterator(dictIterator *iter);
 dictEntry *dictGetRandomKey(dict *d);
 void dictPrintStats(dict *d);
-unsigned int dictGenHashFunction(const unsigned char *buf, int len);
+unsigned int dictGenHashFunction(const void *key, int len);
 unsigned int dictGenCaseHashFunction(const unsigned char *buf, int len);
 void dictEmpty(dict *d);
 void dictEnableResize(void);
index 866a9afa816163dfc54415619367a5ea8759a59e..3e54876575d44b53ec7dcf8c077e39c9f39afcdf 100644 (file)
@@ -3,6 +3,10 @@
 
 #define _BSD_SOURCE
 
+#if defined(__linux__)
+#define _GNU_SOURCE
+#endif
+
 #if defined(__linux__) || defined(__OpenBSD__)
 #define _XOPEN_SOURCE 700
 #else
index f922e2975133d0deaf438c633a06a03dddc41855..3bc084f7d5fc4d5f4c7c0cbf61928ff011fae8fe 100644 (file)
@@ -55,6 +55,7 @@ redisClient *createClient(int fd) {
     c->ctime = c->lastinteraction = server.unixtime;
     c->authenticated = 0;
     c->replstate = REDIS_REPL_NONE;
+    c->slave_listening_port = 0;
     c->reply = listCreate();
     c->reply_bytes = 0;
     c->obuf_soft_limit_reached_time = 0;
@@ -364,7 +365,10 @@ void setDeferredMultiBulkLength(redisClient *c, void *node, long length) {
 
         /* Only glue when the next node is non-NULL (an sds in this case) */
         if (next->ptr != NULL) {
+            c->reply_bytes -= zmalloc_size_sds(len->ptr);
+            c->reply_bytes -= zmalloc_size_sds(next->ptr);
             len->ptr = sdscatlen(len->ptr,next->ptr,sdslen(next->ptr));
+            c->reply_bytes += zmalloc_size_sds(len->ptr);
             listDelNode(c->reply,ln->next);
         }
     }
@@ -1302,6 +1306,7 @@ int checkClientOutputBufferLimits(redisClient *c) {
  * called from contexts where the client can't be freed safely, i.e. from the
  * lower level functions pushing data inside the client output buffers. */
 void asyncCloseClientOnOutputBufferLimitReached(redisClient *c) {
+    redisAssert(c->reply_bytes < ULONG_MAX-(1024*64));
     if (c->reply_bytes == 0 || c->flags & REDIS_CLOSE_ASAP) return;
     if (checkClientOutputBufferLimits(c)) {
         sds client = getClientInfoString(c);
index 90c2ea0843fef07fdc733ec3be7cd2fb67d554da..fd9fcacf2de3e855d88e5d8ec123d1a554caf4c3 100644 (file)
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -803,7 +803,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) {
             }
 
             /* This will also be called when the set was just converted
-             * to regular hashtable encoded set */
+             * to regular hash table encoded set */
             if (o->encoding == REDIS_ENCODING_HT) {
                 dictAdd((dict*)o->ptr,ele,NULL);
             } else {
index 19eb49152c749a00774dda1a3a4d9d5681024865..1be4c07d913dcee9937b90c2f1457e6d32d8f966 100644 (file)
@@ -263,6 +263,8 @@ static client createClient(char *cmd, size_t len) {
             fprintf(stderr,"%s: %s\n",config.hostsocket,c->context->errstr);
         exit(1);
     }
+    /* Suppress hiredis cleanup of unused buffers for max speed. */
+    c->context->reader->maxbuf = 0;
     /* Queue N requests accordingly to the pipeline size. */
     c->obuf = sdsempty();
     for (j = 0; j < config.pipeline; j++)
index f485587957dea102bda72243f4273dbae10702e4..8d20d1cd502ebfd7c53793914a3a9a5477c9db8a 100644 (file)
@@ -712,17 +712,17 @@ static void usage() {
 "  -a <password>    Password to use when connecting to the server\n"
 "  -r <repeat>      Execute specified command N times\n"
 "  -i <interval>    When -r is used, waits <interval> seconds per command.\n"
-"                   It is possible to specify sub-second times like -i 0.1.\n"
+"                   It is possible to specify sub-second times like -i 0.1\n"
 "  -n <db>          Database number\n"
 "  -x               Read last argument from STDIN\n"
 "  -d <delimiter>   Multi-bulk delimiter in for raw formatting (default: \\n)\n"
 "  -c               Enable cluster mode (follow -ASK and -MOVED redirections)\n"
 "  --raw            Use raw formatting for replies (default when STDOUT is not a tty)\n"
-"  --latency        Enter a special mode continuously sampling latency.\n"
-"  --slave          Simulate a slave showing commands received from the master.\n"
-"  --pipe           Transfer raw Redis protocol from stdin to server.\n"
-"  --bigkeys        Sample Redis keys looking for big keys.\n"
-"  --eval <file>    Send an EVAL command using the Lua script at <file>.\n"
+"  --latency        Enter a special mode continuously sampling latency\n"
+"  --slave          Simulate a slave showing commands received from the master\n"
+"  --pipe           Transfer raw Redis protocol from stdin to server\n"
+"  --bigkeys        Sample Redis keys looking for big keys\n"
+"  --eval <file>    Send an EVAL command using the Lua script at <file>\n"
 "  --help           Output this help and exit\n"
 "  --version        Output version and exit\n"
 "\n"
@@ -1233,7 +1233,7 @@ int main(int argc, char **argv) {
 
     /* Pipe mode */
     if (config.pipe_mode) {
-        cliConnect(0);
+        if (cliConnect(0) == REDIS_ERR) exit(1);
         pipeMode();
     }
 
index 46915c1302fc71a4b4de9b9bd884fdcefdcaaf03..aa5a73f2abae195632463461416acf60c5c3f887 100644 (file)
@@ -106,6 +106,10 @@ struct redisCommand *commandTable;
  *    results. For instance SPOP and RANDOMKEY are two random commands.
  * S: Sort command output array if called from script, so that the output
  *    is deterministic.
+ * l: Allow command while loading the database.
+ * t: Allow command while a slave has stale data but is not allowed to
+ *    server this data. Normally no command is accepted in this condition
+ *    but just a few.
  */
 struct redisCommand redisCommandTable[] = {
     {"get",getCommand,2,"r",0,NULL,1,1,1,0,0},
@@ -148,7 +152,7 @@ struct redisCommand redisCommandTable[] = {
     {"sismember",sismemberCommand,3,"r",0,NULL,1,1,1,0,0},
     {"scard",scardCommand,2,"r",0,NULL,1,1,1,0,0},
     {"spop",spopCommand,2,"wRs",0,NULL,1,1,1,0,0},
-    {"srandmember",srandmemberCommand,2,"rR",0,NULL,1,1,1,0,0},
+    {"srandmember",srandmemberCommand,-2,"rR",0,NULL,1,1,1,0,0},
     {"sinter",sinterCommand,-2,"rS",0,NULL,1,-1,1,0,0},
     {"sinterstore",sinterstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
     {"sunion",sunionCommand,-2,"rS",0,NULL,1,-1,1,0,0},
@@ -215,22 +219,23 @@ struct redisCommand redisCommandTable[] = {
     {"exec",execCommand,1,"s",0,NULL,0,0,0,0,0},
     {"discard",discardCommand,1,"rs",0,NULL,0,0,0,0,0},
     {"sync",syncCommand,1,"ars",0,NULL,0,0,0,0,0},
+    {"replconf",replconfCommand,-1,"ars",0,NULL,0,0,0,0,0},
     {"flushdb",flushdbCommand,1,"w",0,NULL,0,0,0,0,0},
     {"flushall",flushallCommand,1,"w",0,NULL,0,0,0,0,0},
-    {"sort",sortCommand,-2,"wmS",0,NULL,1,1,1,0,0},
-    {"info",infoCommand,-1,"r",0,NULL,0,0,0,0,0},
+    {"sort",sortCommand,-2,"wm",0,NULL,1,1,1,0,0},
+    {"info",infoCommand,-1,"rlt",0,NULL,0,0,0,0,0},
     {"monitor",monitorCommand,1,"ars",0,NULL,0,0,0,0,0},
     {"ttl",ttlCommand,2,"r",0,NULL,1,1,1,0,0},
     {"pttl",pttlCommand,2,"r",0,NULL,1,1,1,0,0},
     {"persist",persistCommand,2,"w",0,NULL,1,1,1,0,0},
-    {"slaveof",slaveofCommand,3,"as",0,NULL,0,0,0,0,0},
+    {"slaveof",slaveofCommand,3,"ast",0,NULL,0,0,0,0,0},
     {"debug",debugCommand,-2,"as",0,NULL,0,0,0,0,0},
     {"config",configCommand,-2,"ar",0,NULL,0,0,0,0,0},
-    {"subscribe",subscribeCommand,-2,"rps",0,NULL,0,0,0,0,0},
-    {"unsubscribe",unsubscribeCommand,-1,"rps",0,NULL,0,0,0,0,0},
-    {"psubscribe",psubscribeCommand,-2,"rps",0,NULL,0,0,0,0,0},
-    {"punsubscribe",punsubscribeCommand,-1,"rps",0,NULL,0,0,0,0,0},
-    {"publish",publishCommand,3,"pf",0,NULL,0,0,0,0,0},
+    {"subscribe",subscribeCommand,-2,"rpslt",0,NULL,0,0,0,0,0},
+    {"unsubscribe",unsubscribeCommand,-1,"rpslt",0,NULL,0,0,0,0,0},
+    {"psubscribe",psubscribeCommand,-2,"rpslt",0,NULL,0,0,0,0,0},
+    {"punsubscribe",punsubscribeCommand,-1,"rpslt",0,NULL,0,0,0,0,0},
+    {"publish",publishCommand,3,"pflt",0,NULL,0,0,0,0,0},
     {"watch",watchCommand,-2,"rs",0,noPreloadGetKeys,1,-1,1,0,0},
     {"unwatch",unwatchCommand,1,"rs",0,NULL,0,0,0,0,0},
     {"restore",restoreCommand,4,"awm",0,NULL,1,1,1,0,0},
@@ -327,17 +332,6 @@ err:
     if (server.logfile) close(fd);
 }
 
-/* Redis generally does not try to recover from out of memory conditions
- * when allocating objects or strings, it is not clear if it will be possible
- * to report this condition to the client since the networking layer itself
- * is based on heap allocation for send buffers, so we simply abort.
- * At least the code will be simpler to read... */
-void oom(const char *msg) {
-    redisLog(REDIS_WARNING, "%s: Out of memory\n",msg);
-    sleep(1);
-    abort();
-}
-
 /* Return the UNIX time in microseconds */
 long long ustime(void) {
     struct timeval tv;
@@ -803,13 +797,8 @@ void clientsCron(void) {
  * a macro is used: run_with_period(milliseconds) { .... }
  */
 
-/* Using the following macro you can run code inside serverCron() with the
- * specified period, specified in milliseconds.
- * The actual resolution depends on REDIS_HZ. */
-#define run_with_period(_ms_) if (!(loops % ((_ms_)/(1000/REDIS_HZ))))
-
 int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
-    int j, loops = server.cronloops;
+    int j;
     REDIS_NOTUSED(eventLoop);
     REDIS_NOTUSED(id);
     REDIS_NOTUSED(clientData);
@@ -878,11 +867,14 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
     }
 
     /* Show information about connected clients */
-    run_with_period(5000) {
-        redisLog(REDIS_VERBOSE,"%d clients connected (%d slaves), %zu bytes in use",
-            listLength(server.clients)-listLength(server.slaves),
-            listLength(server.slaves),
-            zmalloc_used_memory());
+    if (!server.sentinel_mode) {
+        run_with_period(5000) {
+            redisLog(REDIS_VERBOSE,
+                "%d clients connected (%d slaves), %zu bytes in use",
+                listLength(server.clients)-listLength(server.slaves),
+                listLength(server.slaves),
+                zmalloc_used_memory());
+        }
     }
 
     /* We need to do a few operations on clients asynchronously. */
@@ -962,6 +954,11 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
      * to detect transfer failures. */
     run_with_period(1000) replicationCron();
 
+    /* Run the sentinel timer if we are in sentinel mode. */
+    run_with_period(100) {
+        if (server.sentinel_mode) sentinelTimer();
+    }
+
     server.cronloops++;
     return 1000/REDIS_HZ;
 }
@@ -1052,6 +1049,7 @@ void createSharedObjects(void) {
     shared.del = createStringObject("DEL",3);
     shared.rpop = createStringObject("RPOP",4);
     shared.lpop = createStringObject("LPOP",4);
+    shared.lpush = createStringObject("LPUSH",5);
     for (j = 0; j < REDIS_SHARED_INTEGERS; j++) {
         shared.integers[j] = createObject(REDIS_STRING,(void*)(long)j);
         shared.integers[j]->encoding = REDIS_ENCODING_INT;
@@ -1095,6 +1093,7 @@ void initServerConfig() {
     server.aof_last_fsync = time(NULL);
     server.aof_rewrite_time_last = -1;
     server.aof_rewrite_time_start = -1;
+    server.aof_lastbgrewrite_status = REDIS_OK;
     server.aof_delayed_fsync = 0;
     server.aof_fd = -1;
     server.aof_selected_db = -1; /* Make sure the first time will not match */
@@ -1142,6 +1141,7 @@ void initServerConfig() {
     server.repl_serve_stale_data = 1;
     server.repl_slave_ro = 1;
     server.repl_down_since = time(NULL);
+    server.slave_priority = REDIS_DEFAULT_SLAVE_PRIORITY;
 
     /* Client output buffer limits */
     server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_NORMAL].hard_limit_bytes = 0;
@@ -1168,6 +1168,8 @@ void initServerConfig() {
     server.delCommand = lookupCommandByCString("del");
     server.multiCommand = lookupCommandByCString("multi");
     server.lpushCommand = lookupCommandByCString("lpush");
+    server.lpopCommand = lookupCommandByCString("lpop");
+    server.rpopCommand = lookupCommandByCString("rpop");
     
     /* Slow log */
     server.slowlog_log_slower_than = REDIS_SLOWLOG_LOG_SLOWER_THAN;
@@ -1243,6 +1245,7 @@ void initServer() {
     server.slaves = listCreate();
     server.monitors = listCreate();
     server.unblocked_clients = listCreate();
+    server.ready_keys = listCreate();
 
     createSharedObjects();
     adjustOpenFilesLimit();
@@ -1273,6 +1276,7 @@ void initServer() {
         server.db[j].dict = dictCreate(&dbDictType,NULL);
         server.db[j].expires = dictCreate(&keyptrDictType,NULL);
         server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
+        server.db[j].ready_keys = dictCreate(&setDictType,NULL);
         server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
         server.db[j].id = j;
     }
@@ -1308,9 +1312,9 @@ void initServer() {
     server.stop_writes_on_bgsave_err = 1;
     aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
     if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE,
-        acceptTcpHandler,NULL) == AE_ERR) oom("creating file event");
+        acceptTcpHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.ipfd file event.");
     if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
-        acceptUnixHandler,NULL) == AE_ERR) oom("creating file event");
+        acceptUnixHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.sofd file event.");
 
     if (server.aof_state == REDIS_AOF_ON) {
         server.aof_fd = open(server.aof_filename,
@@ -1359,6 +1363,8 @@ void populateCommandTable(void) {
             case 's': c->flags |= REDIS_CMD_NOSCRIPT; break;
             case 'R': c->flags |= REDIS_CMD_RANDOM; break;
             case 'S': c->flags |= REDIS_CMD_SORT_FOR_SCRIPT; break;
+            case 'l': c->flags |= REDIS_CMD_LOADING; break;
+            case 't': c->flags |= REDIS_CMD_STALE; break;
             default: redisPanic("Unsupported command flag"); break;
             }
             f++;
@@ -1577,7 +1583,7 @@ int processCommand(redisClient *c) {
         return REDIS_OK;
     }
 
-    /* Don't accept wirte commands if this is a read only slave. But
+    /* Don't accept write commands if this is a read only slave. But
      * accept write commands if this is our master. */
     if (server.masterhost && server.repl_slave_ro &&
         !(c->flags & REDIS_MASTER) &&
@@ -1602,19 +1608,20 @@ int processCommand(redisClient *c) {
      * we are a slave with a broken link with master. */
     if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED &&
         server.repl_serve_stale_data == 0 &&
-        c->cmd->proc != infoCommand && c->cmd->proc != slaveofCommand)
+        !(c->cmd->flags & REDIS_CMD_STALE))
     {
         addReply(c, shared.masterdownerr);
         return REDIS_OK;
     }
 
-    /* Loading DB? Return an error if the command is not INFO */
-    if (server.loading && c->cmd->proc != infoCommand) {
+    /* Loading DB? Return an error if the command has not the
+     * REDIS_CMD_LOADING flag. */
+    if (server.loading && !(c->cmd->flags & REDIS_CMD_LOADING)) {
         addReply(c, shared.loadingerr);
         return REDIS_OK;
     }
 
-    /* Lua script too slow? Only allow SHUTDOWN NOSAVE and SCRIPT KILL. */
+    /* Lua script too slow? Only allow commands with REDIS_CMD_STALE flag. */
     if (server.lua_timedout &&
         !(c->cmd->proc == shutdownCommand &&
           c->argc == 2 &&
@@ -1636,6 +1643,8 @@ int processCommand(redisClient *c) {
         addReply(c,shared.queued);
     } else {
         call(c,REDIS_CALL_FULL);
+        if (listLength(server.ready_keys))
+            handleClientsBlockedOnLists();
     }
     return REDIS_OK;
 }
@@ -1698,10 +1707,52 @@ int prepareForShutdown(int flags) {
 
 /*================================== Commands =============================== */
 
+/* Return zero if strings are the same, non-zero if they are not.
+ * The comparison is performed in a way that prevents an attacker to obtain
+ * information about the nature of the strings just monitoring the execution
+ * time of the function.
+ *
+ * Note that limiting the comparison length to strings up to 512 bytes we
+ * can avoid leaking any information about the password length and any
+ * possible branch misprediction related leak.
+ */
+int time_independent_strcmp(char *a, char *b) {
+    char bufa[REDIS_AUTHPASS_MAX_LEN], bufb[REDIS_AUTHPASS_MAX_LEN];
+    /* The above two strlen perform len(a) + len(b) operations where either
+     * a or b are fixed (our password) length, and the difference is only
+     * relative to the length of the user provided string, so no information
+     * leak is possible in the following two lines of code. */
+    int alen = strlen(a);
+    int blen = strlen(b);
+    int j;
+    int diff = 0;
+
+    /* We can't compare strings longer than our static buffers.
+     * Note that this will never pass the first test in practical circumstances
+     * so there is no info leak. */
+    if (alen > sizeof(bufa) || blen > sizeof(bufb)) return 1;
+
+    memset(bufa,0,sizeof(bufa));        /* Constant time. */
+    memset(bufb,0,sizeof(bufb));        /* Constant time. */
+    /* Again the time of the following two copies is proportional to
+     * len(a) + len(b) so no info is leaked. */
+    memcpy(bufa,a,alen);
+    memcpy(bufb,b,blen);
+
+    /* Always compare all the chars in the two buffers without
+     * conditional expressions. */
+    for (j = 0; j < sizeof(bufa); j++) {
+        diff |= (bufa[j] ^ bufb[j]);
+    }
+    /* Length must be equal as well. */
+    diff |= alen ^ blen;
+    return diff; /* If zero strings are the same. */
+}
+
 void authCommand(redisClient *c) {
     if (!server.requirepass) {
         addReplyError(c,"Client sent AUTH, but no password is set");
-    } else if (!strcmp(c->argv[1]->ptr, server.requirepass)) {
+    } else if (!time_independent_strcmp(c->argv[1]->ptr, server.requirepass)) {
       c->authenticated = 1;
       addReply(c,shared.ok);
     } else {
@@ -1761,7 +1812,7 @@ sds genRedisInfoString(char *section) {
     unsigned long lol, bib;
     int allsections = 0, defsections = 0;
     int sections = 0;
-    
+
     if (section) {
         allsections = strcasecmp(section,"all") == 0;
         defsections = strcasecmp(section,"default") == 0;
@@ -1774,7 +1825,11 @@ sds genRedisInfoString(char *section) {
     /* Server */
     if (allsections || defsections || !strcasecmp(section,"server")) {
         struct utsname name;
+        char *mode;
 
+        if (server.sentinel_mode) mode = "sentinel";
+        else mode = "standalone";
+    
         if (sections++) info = sdscat(info,"\r\n");
         uname(&name);
         info = sdscatprintf(info,
@@ -1782,6 +1837,7 @@ sds genRedisInfoString(char *section) {
             "redis_version:%s\r\n"
             "redis_git_sha1:%s\r\n"
             "redis_git_dirty:%d\r\n"
+            "redis_mode:%s\r\n"
             "os:%s %s %s\r\n"
             "arch_bits:%d\r\n"
             "multiplexing_api:%s\r\n"
@@ -1795,6 +1851,7 @@ sds genRedisInfoString(char *section) {
             REDIS_VERSION,
             redisGitSHA1(),
             strtol(redisGitDirty(),NULL,10) > 0,
+            mode,
             name.sysname, name.release, name.machine,
             server.arch_bits,
             aeGetApiName(),
@@ -1870,12 +1927,13 @@ sds genRedisInfoString(char *section) {
             "aof_rewrite_in_progress:%d\r\n"
             "aof_rewrite_scheduled:%d\r\n"
             "aof_last_rewrite_time_sec:%ld\r\n"
-            "aof_current_rewrite_time_sec:%ld\r\n",
+            "aof_current_rewrite_time_sec:%ld\r\n"
+            "aof_last_bgrewrite_status:%s\r\n",
             server.loading,
             server.dirty,
             server.rdb_child_pid != -1,
             server.lastsave,
-            server.lastbgsave_status == REDIS_OK ? "ok" : "err",
+            (server.lastbgsave_status == REDIS_OK) ? "ok" : "err",
             server.rdb_save_time_last,
             (server.rdb_child_pid == -1) ?
                 -1 : time(NULL)-server.rdb_save_time_start,
@@ -1884,7 +1942,8 @@ sds genRedisInfoString(char *section) {
             server.aof_rewrite_scheduled,
             server.aof_rewrite_time_last,
             (server.aof_child_pid == -1) ?
-                -1 : time(NULL)-server.aof_rewrite_time_start);
+                -1 : time(NULL)-server.aof_rewrite_time_start,
+            (server.aof_lastbgrewrite_status == REDIS_OK) ? "ok" : "err");
 
         if (server.aof_state != REDIS_AOF_OFF) {
             info = sdscatprintf(info,
@@ -1892,7 +1951,7 @@ sds genRedisInfoString(char *section) {
                 "aof_base_size:%lld\r\n"
                 "aof_pending_rewrite:%d\r\n"
                 "aof_buffer_length:%zu\r\n"
-                "aof_rewrite_buffer_length:%zu\r\n"
+                "aof_rewrite_buffer_length:%lu\r\n"
                 "aof_pending_bio_fsync:%llu\r\n"
                 "aof_delayed_fsync:%lu\r\n",
                 (long long) server.aof_current_size,
@@ -1990,9 +2049,10 @@ sds genRedisInfoString(char *section) {
 
             if (server.repl_state == REDIS_REPL_TRANSFER) {
                 info = sdscatprintf(info,
-                    "master_sync_left_bytes:%ld\r\n"
+                    "master_sync_left_bytes:%lld\r\n"
                     "master_sync_last_io_seconds_ago:%d\r\n"
-                    ,(long)server.repl_transfer_left,
+                    , (long long)
+                        (server.repl_transfer_size - server.repl_transfer_read),
                     (int)(server.unixtime-server.repl_transfer_lastio)
                 );
             }
@@ -2002,6 +2062,8 @@ sds genRedisInfoString(char *section) {
                     "master_link_down_since_seconds:%ld\r\n",
                     (long)server.unixtime-server.repl_down_since);
             }
+            info = sdscatprintf(info,
+                "slave_priority:%d\r\n", server.slave_priority);
         }
         info = sdscatprintf(info,
             "connected_slaves:%lu\r\n",
@@ -2033,7 +2095,7 @@ sds genRedisInfoString(char *section) {
                 }
                 if (state == NULL) continue;
                 info = sdscatprintf(info,"slave%d:%s,%d,%s\r\n",
-                    slaveid,ip,port,state);
+                    slaveid,ip,slave->slave_listening_port,state);
                 slaveid++;
             }
         }
@@ -2341,21 +2403,25 @@ void usage() {
     fprintf(stderr,"       ./redis-server /etc/redis/6379.conf\n");
     fprintf(stderr,"       ./redis-server --port 7777\n");
     fprintf(stderr,"       ./redis-server --port 7777 --slaveof 127.0.0.1 8888\n");
-    fprintf(stderr,"       ./redis-server /etc/myredis.conf --loglevel verbose\n");
+    fprintf(stderr,"       ./redis-server /etc/myredis.conf --loglevel verbose\n\n");
+    fprintf(stderr,"Sentinel mode:\n");
+    fprintf(stderr,"       ./redis-server /etc/sentinel.conf --sentinel\n");
     exit(1);
 }
 
 void redisAsciiArt(void) {
 #include "asciilogo.h"
     char *buf = zmalloc(1024*16);
+    char *mode = "stand alone";
+
+    if (server.sentinel_mode) mode = "sentinel";
 
     snprintf(buf,1024*16,ascii_logo,
         REDIS_VERSION,
         redisGitSHA1(),
         strtol(redisGitDirty(),NULL,10) > 0,
         (sizeof(long) == 8) ? "64" : "32",
-        "stand alone",
-        server.port,
+        mode, server.port,
         (long) getpid()
     );
     redisLogRaw(REDIS_NOTICE|REDIS_LOG_RAW,buf);
@@ -2393,17 +2459,60 @@ void setupSignalHandlers(void) {
 
 void memtest(size_t megabytes, int passes);
 
+/* Returns 1 if there is --sentinel among the arguments or if
+ * argv[0] is exactly "redis-sentinel". */
+int checkForSentinelMode(int argc, char **argv) {
+    int j;
+
+    if (strstr(argv[0],"redis-sentinel") != NULL) return 1;
+    for (j = 1; j < argc; j++)
+        if (!strcmp(argv[j],"--sentinel")) return 1;
+    return 0;
+}
+
+/* Function called at startup to load RDB or AOF file in memory. */
+void loadDataFromDisk(void) {
+    long long start = ustime();
+    if (server.aof_state == REDIS_AOF_ON) {
+        if (loadAppendOnlyFile(server.aof_filename) == REDIS_OK)
+            redisLog(REDIS_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
+    } else {
+        if (rdbLoad(server.rdb_filename) == REDIS_OK) {
+            redisLog(REDIS_NOTICE,"DB loaded from disk: %.3f seconds",
+                (float)(ustime()-start)/1000000);
+        } else if (errno != ENOENT) {
+            redisLog(REDIS_WARNING,"Fatal error loading the DB. Exiting.");
+            exit(1);
+        }
+    }
+}
+
+void redisOutOfMemoryHandler(size_t allocation_size) {
+    redisLog(REDIS_WARNING,"Out Of Memory allocating %zu bytes!",
+        allocation_size);
+    redisPanic("OOM");
+}
+
 int main(int argc, char **argv) {
-    long long start;
     struct timeval tv;
 
     /* We need to initialize our libraries, and the server configuration. */
     zmalloc_enable_thread_safeness();
+    zmalloc_set_oom_handler(redisOutOfMemoryHandler);
     srand(time(NULL)^getpid());
     gettimeofday(&tv,NULL);
     dictSetHashFunctionSeed(tv.tv_sec^tv.tv_usec^getpid());
+    server.sentinel_mode = checkForSentinelMode(argc,argv);
     initServerConfig();
 
+    /* We need to init sentinel right now as parsing the configuration file
+     * in sentinel mode will have the effect of populating the sentinel
+     * data structures with master nodes to monitor. */
+    if (server.sentinel_mode) {
+        initSentinelConfig();
+        initSentinel();
+    }
+
     if (argc >= 2) {
         int j = 1; /* First option to parse in argv[] */
         sds options = sdsempty();
@@ -2449,33 +2558,31 @@ int main(int argc, char **argv) {
         loadServerConfig(configfile,options);
         sdsfree(options);
     } else {
-        redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
+        redisLog(REDIS_WARNING, "Warning: no config file specified, using the default config. In order to specify a config file use %s /path/to/%s.conf", argv[0], server.sentinel_mode ? "sentinel" : "redis");
     }
     if (server.daemonize) daemonize();
     initServer();
     if (server.daemonize) createPidFile();
     redisAsciiArt();
-    redisLog(REDIS_WARNING,"Server started, Redis version " REDIS_VERSION);
-#ifdef __linux__
-    linuxOvercommitMemoryWarning();
-#endif
-    start = ustime();
-    if (server.aof_state == REDIS_AOF_ON) {
-        if (loadAppendOnlyFile(server.aof_filename) == REDIS_OK)
-            redisLog(REDIS_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
-    } else {
-        if (rdbLoad(server.rdb_filename) == REDIS_OK) {
-            redisLog(REDIS_NOTICE,"DB loaded from disk: %.3f seconds",
-                (float)(ustime()-start)/1000000);
-        } else if (errno != ENOENT) {
-            redisLog(REDIS_WARNING,"Fatal error loading the DB. Exiting.");
-            exit(1);
-        }
+
+    if (!server.sentinel_mode) {
+        /* Things only needed when not runnign in Sentinel mode. */
+        redisLog(REDIS_WARNING,"Server started, Redis version " REDIS_VERSION);
+    #ifdef __linux__
+        linuxOvercommitMemoryWarning();
+    #endif
+        loadDataFromDisk();
+        if (server.ipfd > 0)
+            redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
+        if (server.sofd > 0)
+            redisLog(REDIS_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket);
     }
-    if (server.ipfd > 0)
-        redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
-    if (server.sofd > 0)
-        redisLog(REDIS_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket);
+
+    /* Warning the user about suspicious maxmemory setting. */
+    if (server.maxmemory > 0 && server.maxmemory < 1024*1024) {
+        redisLog(REDIS_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory);
+    }
+
     aeSetBeforeSleepProc(server.el,beforeSleep);
     aeMain(server.el);
     aeDeleteEventLoop(server.el);
index d877165a4156d082b3582f6b0e9c3491b9d80a07..6e917e406b9c2af355521d03e6bf02217eade3e3 100644 (file)
 #define REDIS_SLOWLOG_LOG_SLOWER_THAN 10000
 #define REDIS_SLOWLOG_MAX_LEN 128
 #define REDIS_MAX_CLIENTS 10000
-
+#define REDIS_AUTHPASS_MAX_LEN 512
+#define REDIS_DEFAULT_SLAVE_PRIORITY 100
 #define REDIS_REPL_TIMEOUT 60
 #define REDIS_REPL_PING_SLAVE_PERIOD 10
-
 #define REDIS_RUN_ID_SIZE 40
 #define REDIS_OPS_SEC_SAMPLES 16
 
@@ -84,6 +84,8 @@
 #define REDIS_CMD_NOSCRIPT  64              /* "s" flag */
 #define REDIS_CMD_RANDOM 128                /* "R" flag */
 #define REDIS_CMD_SORT_FOR_SCRIPT 256       /* "S" flag */
+#define REDIS_CMD_LOADING 512               /* "l" flag */
+#define REDIS_CMD_STALE 1024                /* "t" flag */
 
 /* Object types */
 #define REDIS_STRING 0
 #define REDIS_REPL_NONE 0 /* No active replication */
 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
 #define REDIS_REPL_CONNECTING 2 /* Connecting to master */
-#define REDIS_REPL_TRANSFER 3 /* Receiving .rdb from master */
-#define REDIS_REPL_CONNECTED 4 /* Connected to master */
+#define REDIS_REPL_RECEIVE_PONG 3 /* Wait for PING reply */
+#define REDIS_REPL_TRANSFER 4 /* Receiving .rdb from master */
+#define REDIS_REPL_CONNECTED 5 /* Connected to master */
 
 /* Synchronous read timeout - slave side */
 #define REDIS_REPL_SYNCIO_TIMEOUT 5
 #define REDIS_PROPAGATE_AOF 1
 #define REDIS_PROPAGATE_REPL 2
 
+/* Using the following macro you can run code inside serverCron() with the
+ * specified period, specified in milliseconds.
+ * The actual resolution depends on REDIS_HZ. */
+#define run_with_period(_ms_) if (!(server.cronloops%((_ms_)/(1000/REDIS_HZ))))
+
 /* We can print the stacktrace, so our assert is defined this way: */
 #define redisAssertWithInfo(_c,_o,_e) ((_e)?(void)0 : (_redisAssertWithInfo(_c,_o,#_e,__FILE__,__LINE__),_exit(1)))
 #define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e,__FILE__,__LINE__),_exit(1)))
@@ -292,6 +300,7 @@ typedef struct redisDb {
     dict *dict;                 /* The keyspace for this DB */
     dict *expires;              /* Timeout of keys with a timeout set */
     dict *blocking_keys;        /* Keys with clients waiting for data (BLPOP) */
+    dict *ready_keys;           /* Blocked keys that received a PUSH */
     dict *watched_keys;         /* WATCHED keys for MULTI/EXEC CAS */
     int id;
 } redisDb;
@@ -318,6 +327,22 @@ typedef struct blockingState {
                              * for BRPOPLPUSH. */
 } blockingState;
 
+/* The following structure represents a node in the server.ready_keys list,
+ * where we accumulate all the keys that had clients blocked with a blocking
+ * operation such as B[LR]POP, but received new data in the context of the
+ * last executed command.
+ *
+ * After the execution of every command or script, we run this list to check
+ * if as a result we should serve data to clients blocked, unblocking them.
+ * Note that server.ready_keys will not have duplicates as there dictionary
+ * also called ready_keys in every structure representing a Redis database,
+ * where we make sure to remember if a given key was already added in the
+ * server.ready_keys list. */
+typedef struct readyList {
+    redisDb *db;
+    robj *key;
+} readyList;
+
 /* With multiplexing we need to take per-clinet state.
  * Clients are taken in a liked list. */
 typedef struct redisClient {
@@ -345,6 +370,7 @@ typedef struct redisClient {
     int repldbfd;           /* replication DB file descriptor */
     long repldboff;         /* replication DB file offset */
     off_t repldbsize;       /* replication DB file size */
+    int slave_listening_port; /* As configured with: SLAVECONF listening-port */
     multiState mstate;      /* MULTI/EXEC state */
     blockingState bpop;   /* blocking state */
     list *io_keys;          /* Keys this client is waiting to be loaded from the
@@ -371,6 +397,7 @@ struct sharedObjectsStruct {
     *masterdownerr, *roslaveerr,
     *oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk,
     *unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *rpop, *lpop,
+    *lpush,
     *select[REDIS_SHARED_SELECT_CMDS],
     *integers[REDIS_SHARED_INTEGERS],
     *mbulkhdr[REDIS_SHARED_BULKHDR_LEN], /* "*<value>\r\n" */
@@ -447,6 +474,7 @@ struct redisServer {
     int arch_bits;              /* 32 or 64 depending on sizeof(long) */
     int cronloops;              /* Number of times the cron function run */
     char runid[REDIS_RUN_ID_SIZE+1];  /* ID always different at every exec. */
+    int sentinel_mode;          /* True if this instance is a Sentinel. */
     /* Networking */
     int port;                   /* TCP listening port */
     char *bindaddr;             /* Bind address or NULL */
@@ -465,7 +493,8 @@ struct redisServer {
     off_t loading_loaded_bytes;
     time_t loading_start_time;
     /* Fast pointers to often looked up command */
-    struct redisCommand *delCommand, *multiCommand, *lpushCommand;
+    struct redisCommand *delCommand, *multiCommand, *lpushCommand, *lpopCommand,
+                        *rpopCommand;
     /* Fields used only for stats */
     time_t stat_starttime;          /* Server start time */
     long long stat_numcommands;     /* Number of processed commands */
@@ -513,6 +542,7 @@ struct redisServer {
     time_t aof_last_fsync;            /* UNIX time of last fsync() */
     time_t aof_rewrite_time_last;   /* Time used by last AOF rewrite run. */
     time_t aof_rewrite_time_start;  /* Current AOF rewrite start time. */
+    int aof_lastbgrewrite_status;   /* REDIS_OK or REDIS_ERR */
     unsigned long aof_delayed_fsync;  /* delayed AOF fsync() counter */
     /* RDB persistence */
     long long dirty;                /* Changes to DB from the last save */
@@ -539,12 +569,14 @@ struct redisServer {
     char *masterauth;               /* AUTH with this password with master */
     char *masterhost;               /* Hostname of master */
     int masterport;                 /* Port of master */
-    int repl_ping_slave_period;     /* Master pings the salve every N seconds */
+    int repl_ping_slave_period;     /* Master pings the slave every N seconds */
     int repl_timeout;               /* Timeout after N seconds of master idle */
     redisClient *master;     /* Client that is master for this slave */
     int repl_syncio_timeout; /* Timeout for synchronous I/O calls */
     int repl_state;          /* Replication status if the instance is a slave */
-    off_t repl_transfer_left;  /* Bytes left reading .rdb  */
+    off_t repl_transfer_size; /* Size of RDB to read from master during sync. */
+    off_t repl_transfer_read; /* Amount of RDB read from master during sync. */
+    off_t repl_transfer_last_fsync_off; /* Offset when we fsync-ed last time. */
     int repl_transfer_s;     /* Slave -> Master SYNC socket */
     int repl_transfer_fd;    /* Slave -> Master SYNC temp file descriptor */
     char *repl_transfer_tmpfile; /* Slave-> master SYNC temp file name */
@@ -552,6 +584,7 @@ struct redisServer {
     int repl_serve_stale_data; /* Serve stale data when link is down? */
     int repl_slave_ro;          /* Slave is read only? */
     time_t repl_down_since; /* Unix time at which link with master went down */
+    int slave_priority;             /* Reported in INFO and used by Sentinel. */
     /* Limits */
     unsigned int maxclients;        /* Max number of simultaneous clients */
     unsigned long long maxmemory;   /* Max number of memory bytes to use */
@@ -560,9 +593,9 @@ struct redisServer {
     /* Blocked clients */
     unsigned int bpop_blocked_clients; /* Number of clients blocked by lists */
     list *unblocked_clients; /* list of clients to unblock before next loop */
+    list *ready_keys;        /* List of readyList structures for BLPOP & co */
     /* Sort parameters - qsort_r() is only available under BSD so we
      * have to take this state global, in order to pass it to sortCompare() */
-    int sort_dontsort;
     int sort_desc;
     int sort_alpha;
     int sort_bypattern;
@@ -770,7 +803,7 @@ int listTypeEqual(listTypeEntry *entry, robj *o);
 void listTypeDelete(listTypeEntry *entry);
 void listTypeConvert(robj *subject, int enc);
 void unblockClientWaitingData(redisClient *c);
-int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele);
+void handleClientsBlockedOnLists(void);
 void popGenericCommand(redisClient *c, int where);
 
 /* MULTI/EXEC/WATCH... */
@@ -967,6 +1000,12 @@ int *noPreloadGetKeys(struct redisCommand *cmd,robj **argv, int argc, int *numke
 int *renameGetKeys(struct redisCommand *cmd,robj **argv, int argc, int *numkeys, int flags);
 int *zunionInterGetKeys(struct redisCommand *cmd,robj **argv, int argc, int *numkeys, int flags);
 
+/* Sentinel */
+void initSentinelConfig(void);
+void initSentinel(void);
+void sentinelTimer(void);
+char *sentinelHandleConfiguration(char **argv, int argc);
+
 /* Scripting */
 void scriptingInit(void);
 
@@ -1109,6 +1148,7 @@ void scriptCommand(redisClient *c);
 void timeCommand(redisClient *c);
 void bitopCommand(redisClient *c);
 void bitcountCommand(redisClient *c);
+void replconfCommand(redisClient *c);
 
 #if defined(__GNUC__)
 void *calloc(size_t count, size_t size) __attribute__ ((deprecated));
@@ -1128,4 +1168,11 @@ sds genRedisInfoString(char *section);
 void enableWatchdog(int period);
 void disableWatchdog(void);
 void watchdogScheduleSignal(int period);
+void redisLogHexDump(int level, char *descr, void *value, size_t len);
+
+#define redisDebug(fmt, ...) \
+    printf("DEBUG %s:%d > " fmt "\n", __FILE__, __LINE__, __VA_ARGS__)
+#define redisDebugMark() \
+    printf("-- MARK %s:%d --\n", __FILE__, __LINE__)
+
 #endif
index 8eb36f837e192b06be1f0dcdbf128f88de0932b8..c1e4619160bc78c38808df3ed8df222f84d6717c 100644 (file)
@@ -3,6 +3,7 @@
 #include <sys/time.h>
 #include <unistd.h>
 #include <fcntl.h>
+#include <sys/socket.h>
 #include <sys/stat.h>
 
 /* ---------------------------------- MASTER -------------------------------- */
@@ -145,6 +146,46 @@ void syncCommand(redisClient *c) {
     return;
 }
 
+/* REPLCONF <option> <value> <option> <value> ...
+ * This command is used by a slave in order to configure the replication
+ * process before starting it with the SYNC command.
+ *
+ * Currently the only use of this command is to communicate to the master
+ * what is the listening port of the Slave redis instance, so that the
+ * master can accurately list slaves and their listening ports in
+ * the INFO output.
+ *
+ * In the future the same command can be used in order to configure
+ * the replication to initiate an incremental replication instead of a
+ * full resync. */
+void replconfCommand(redisClient *c) {
+    int j;
+
+    if ((c->argc % 2) == 0) {
+        /* Number of arguments must be odd to make sure that every
+         * option has a corresponding value. */
+        addReply(c,shared.syntaxerr);
+        return;
+    }
+
+    /* Process every option-value pair. */
+    for (j = 1; j < c->argc; j+=2) {
+        if (!strcasecmp(c->argv[j]->ptr,"listening-port")) {
+            long port;
+
+            if ((getLongFromObjectOrReply(c,c->argv[j+1],
+                    &port,NULL) != REDIS_OK))
+                return;
+            c->slave_listening_port = port;
+        } else {
+            addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s",
+                (char*)c->argv[j]->ptr);
+            return;
+        }
+    }
+    addReply(c,shared.ok);
+}
+
 void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
     redisClient *slave = privdata;
     REDIS_NOTUSED(el);
@@ -271,16 +312,18 @@ void replicationAbortSyncTransfer(void) {
 }
 
 /* Asynchronously read the SYNC payload we receive from a master */
+#define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */
 void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
     char buf[4096];
     ssize_t nread, readlen;
+    off_t left;
     REDIS_NOTUSED(el);
     REDIS_NOTUSED(privdata);
     REDIS_NOTUSED(mask);
 
-    /* If repl_transfer_left == -1 we still have to read the bulk length
+    /* If repl_transfer_size == -1 we still have to read the bulk length
      * from the master reply. */
-    if (server.repl_transfer_left == -1) {
+    if (server.repl_transfer_size == -1) {
         if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) {
             redisLog(REDIS_WARNING,
                 "I/O error reading bulk count from MASTER: %s",
@@ -303,16 +346,16 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
             redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
             goto error;
         }
-        server.repl_transfer_left = strtol(buf+1,NULL,10);
+        server.repl_transfer_size = strtol(buf+1,NULL,10);
         redisLog(REDIS_NOTICE,
             "MASTER <-> SLAVE sync: receiving %ld bytes from master",
-            server.repl_transfer_left);
+            server.repl_transfer_size);
         return;
     }
 
     /* Read bulk data */
-    readlen = (server.repl_transfer_left < (signed)sizeof(buf)) ?
-        server.repl_transfer_left : (signed)sizeof(buf);
+    left = server.repl_transfer_size - server.repl_transfer_read;
+    readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
     nread = read(fd,buf,readlen);
     if (nread <= 0) {
         redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
@@ -325,9 +368,23 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
         redisLog(REDIS_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> SLAVE synchronization: %s", strerror(errno));
         goto error;
     }
-    server.repl_transfer_left -= nread;
+    server.repl_transfer_read += nread;
+
+    /* Sync data on disk from time to time, otherwise at the end of the transfer
+     * we may suffer a big delay as the memory buffers are copied into the
+     * actual disk. */
+    if (server.repl_transfer_read >=
+        server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC)
+    {
+        off_t sync_size = server.repl_transfer_read -
+                          server.repl_transfer_last_fsync_off;
+        rdb_fsync_range(server.repl_transfer_fd,
+            server.repl_transfer_last_fsync_off, sync_size);
+        server.repl_transfer_last_fsync_off += sync_size;
+    }
+
     /* Check if the transfer is now complete */
-    if (server.repl_transfer_left == 0) {
+    if (server.repl_transfer_read == server.repl_transfer_size) {
         if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
             redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
             replicationAbortSyncTransfer();
@@ -378,9 +435,57 @@ error:
     return;
 }
 
+/* Send a synchronous command to the master. Used to send AUTH and
+ * REPLCONF commands before starting the replication with SYNC.
+ *
+ * On success NULL is returned.
+ * On error an sds string describing the error is returned.
+ */
+char *sendSynchronousCommand(int fd, ...) {
+    va_list ap;
+    sds cmd = sdsempty();
+    char *arg, buf[256];
+
+    /* Create the command to send to the master, we use simple inline
+     * protocol for simplicity as currently we only send simple strings. */
+    va_start(ap,fd);
+    while(1) {
+        arg = va_arg(ap, char*);
+        if (arg == NULL) break;
+
+        if (sdslen(cmd) != 0) cmd = sdscatlen(cmd," ",1);
+        cmd = sdscat(cmd,arg);
+    }
+    cmd = sdscatlen(cmd,"\r\n",2);
+
+    /* Transfer command to the server. */
+    if (syncWrite(fd,cmd,sdslen(cmd),server.repl_syncio_timeout*1000) == -1) {
+        sdsfree(cmd);
+        return sdscatprintf(sdsempty(),"Writing to master: %s",
+                strerror(errno));
+    }
+    sdsfree(cmd);
+
+    /* Read the reply from the server. */
+    if (syncReadLine(fd,buf,sizeof(buf),server.repl_syncio_timeout*1000) == -1)
+    {
+        return sdscatprintf(sdsempty(),"Reading from master: %s",
+                strerror(errno));
+    }
+
+    /* Check for errors from the server. */
+    if (buf[0] != '+') {
+        return sdscatprintf(sdsempty(),"Error from master: %s", buf);
+    }
+
+    return NULL; /* No errors. */
+}
+
 void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
-    char buf[1024], tmpfile[256];
+    char tmpfile[256], *err;
     int dfd, maxtries = 5;
+    int sockerr = 0;
+    socklen_t errlen = sizeof(sockerr);
     REDIS_NOTUSED(el);
     REDIS_NOTUSED(privdata);
     REDIS_NOTUSED(mask);
@@ -392,35 +497,88 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
         return;
     }
 
-    redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event.");
-    /* This event should only be triggered once since it is used to have a
-     * non-blocking connect(2) to the master. It has been triggered when this
-     * function is called, so we can delete it. */
-    aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
+    /* Check for errors in the socket. */
+    if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1)
+        sockerr = errno;
+    if (sockerr) {
+        aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
+        redisLog(REDIS_WARNING,"Error condition on socket for SYNC: %s",
+            strerror(sockerr));
+        goto error;
+    }
 
-    /* AUTH with the master if required. */
-    if(server.masterauth) {
-        char authcmd[1024];
-        size_t authlen;
+    /* If we were connecting, it's time to send a non blocking PING, we want to
+     * make sure the master is able to reply before going into the actual
+     * replication process where we have long timeouts in the order of
+     * seconds (in the meantime the slave would block). */
+    if (server.repl_state == REDIS_REPL_CONNECTING) {
+        redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event.");
+        /* Delete the writable event so that the readable event remains
+         * registered and we can wait for the PONG reply. */
+        aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
+        server.repl_state = REDIS_REPL_RECEIVE_PONG;
+        /* Send the PING, don't check for errors at all, we have the timeout
+         * that will take care about this. */
+        syncWrite(fd,"PING\r\n",6,100);
+        return;
+    }
+
+    /* Receive the PONG command. */
+    if (server.repl_state == REDIS_REPL_RECEIVE_PONG) {
+        char buf[1024];
+
+        /* Delete the readable event, we no longer need it now that there is
+         * the PING reply to read. */
+        aeDeleteFileEvent(server.el,fd,AE_READABLE);
 
-        authlen = snprintf(authcmd,sizeof(authcmd),"AUTH %s\r\n",server.masterauth);
-        if (syncWrite(fd,authcmd,authlen,server.repl_syncio_timeout*1000) == -1) {
-            redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
+        /* Read the reply with explicit timeout. */
+        buf[0] = '\0';
+        if (syncReadLine(fd,buf,sizeof(buf),
+            server.repl_syncio_timeout*1000) == -1)
+        {
+            redisLog(REDIS_WARNING,
+                "I/O error reading PING reply from master: %s",
                 strerror(errno));
             goto error;
         }
-        /* Read the AUTH result.  */
-        if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) {
-            redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
-                strerror(errno));
+
+        /* We don't care about the reply, it can be +PONG or an error since
+         * the server requires AUTH. As long as it replies correctly, it's
+         * fine from our point of view. */
+        if (buf[0] != '-' && buf[0] != '+') {
+            redisLog(REDIS_WARNING,"Unexpected reply to PING from master.");
             goto error;
+        } else {
+            redisLog(REDIS_NOTICE,
+                "Master replied to PING, replication can continue...");
         }
-        if (buf[0] != '+') {
-            redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?");
+    }
+
+    /* AUTH with the master if required. */
+    if(server.masterauth) {
+        err = sendSynchronousCommand(fd,"AUTH",server.masterauth,NULL);
+        if (err) {
+            redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",err);
+            sdsfree(err);
             goto error;
         }
     }
 
+    /* Set the slave port, so that Master's INFO command can list the
+     * slave listening port correctly. */
+    {
+        sds port = sdsfromlonglong(server.port);
+        err = sendSynchronousCommand(fd,"REPLCONF","listening-port",port,
+                                         NULL);
+        sdsfree(port);
+        /* Ignore the error if any, not all the Redis versions support
+         * REPLCONF listening-port. */
+        if (err) {
+            redisLog(REDIS_NOTICE,"(non critical): Master does not understand REPLCONF listening-port: %s", err);
+            sdsfree(err);
+        }
+    }
+
     /* Issue the SYNC command */
     if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
         redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
@@ -450,15 +608,18 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
     }
 
     server.repl_state = REDIS_REPL_TRANSFER;
-    server.repl_transfer_left = -1;
+    server.repl_transfer_size = -1;
+    server.repl_transfer_read = 0;
+    server.repl_transfer_last_fsync_off = 0;
     server.repl_transfer_fd = dfd;
     server.repl_transfer_lastio = server.unixtime;
     server.repl_transfer_tmpfile = zstrdup(tmpfile);
     return;
 
 error:
-    server.repl_state = REDIS_REPL_CONNECT;
     close(fd);
+    server.repl_transfer_s = -1;
+    server.repl_state = REDIS_REPL_CONNECT;
     return;
 }
 
@@ -491,7 +652,8 @@ int connectWithMaster(void) {
 void undoConnectWithMaster(void) {
     int fd = server.repl_transfer_s;
 
-    redisAssert(server.repl_state == REDIS_REPL_CONNECTING);
+    redisAssert(server.repl_state == REDIS_REPL_CONNECTING ||
+                server.repl_state == REDIS_REPL_RECEIVE_PONG);
     aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
     close(fd);
     server.repl_transfer_s = -1;
@@ -507,7 +669,8 @@ void slaveofCommand(redisClient *c) {
             if (server.master) freeClient(server.master);
             if (server.repl_state == REDIS_REPL_TRANSFER)
                 replicationAbortSyncTransfer();
-            else if (server.repl_state == REDIS_REPL_CONNECTING)
+            else if (server.repl_state == REDIS_REPL_CONNECTING ||
+                     server.repl_state == REDIS_REPL_RECEIVE_PONG)
                 undoConnectWithMaster();
             server.repl_state = REDIS_REPL_NONE;
             redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
@@ -545,7 +708,9 @@ void slaveofCommand(redisClient *c) {
 
 void replicationCron(void) {
     /* Non blocking connection timeout? */
-    if (server.masterhost && server.repl_state == REDIS_REPL_CONNECTING &&
+    if (server.masterhost &&
+        (server.repl_state == REDIS_REPL_CONNECTING ||
+         server.repl_state == REDIS_REPL_RECEIVE_PONG) &&
         (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
     {
         redisLog(REDIS_WARNING,"Timeout connecting to the MASTER...");
@@ -556,7 +721,7 @@ void replicationCron(void) {
     if (server.masterhost && server.repl_state == REDIS_REPL_TRANSFER &&
         (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
     {
-        redisLog(REDIS_WARNING,"Timeout receiving bulk data from MASTER...");
+        redisLog(REDIS_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value.");
         replicationAbortSyncTransfer();
     }
 
@@ -580,7 +745,7 @@ void replicationCron(void) {
      * So slaves can implement an explicit timeout to masters, and will
      * be able to detect a link disconnection even if the TCP connection
      * will not actually go down. */
-    if (!(server.cronloops % (server.repl_ping_slave_period*10))) {
+    if (!(server.cronloops % (server.repl_ping_slave_period * REDIS_HZ))) {
         listIter li;
         listNode *ln;
 
index 8c89c923c0ce4331b7136dc453141b578b870758..13f608e67cfe23c6819f9e76f1adc78d7033a026 100644 (file)
@@ -167,6 +167,13 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) {
     redisClient *c = server.lua_client;
     sds reply;
 
+    /* Require at least one argument */
+    if (argc == 0) {
+        luaPushError(lua,
+            "Please specify at least one argument for redis.call()");
+        return 1;
+    }
+
     /* Build the arguments vector */
     argv = zmalloc(sizeof(robj*)*argc);
     for (j = 0; j < argc; j++) {
@@ -275,11 +282,10 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) {
      * reply as expected. */
     if ((cmd->flags & REDIS_CMD_SORT_FOR_SCRIPT) &&
         (reply[0] == '*' && reply[1] != '-')) {
-        /* Skip this step if command is SORT but output was already sorted */
-        if (cmd->proc != sortCommand || server.sort_dontsort)
             luaSortArray(lua);
     }
     sdsfree(reply);
+    c->reply_bytes = 0;
 
 cleanup:
     /* Clean up. Command code may have changed argv/argc so we use the
@@ -326,6 +332,34 @@ int luaRedisSha1hexCommand(lua_State *lua) {
     return 1;
 }
 
+/* Returns a table with a single field 'field' set to the string value
+ * passed as argument. This helper function is handy when returning
+ * a Redis Protocol error or status reply from Lua:
+ *
+ * return redis.error_reply("ERR Some Error")
+ * return redis.status_reply("ERR Some Error")
+ */
+int luaRedisReturnSingleFieldTable(lua_State *lua, char *field) {
+    if (lua_gettop(lua) != 1 || lua_type(lua,-1) != LUA_TSTRING) {
+        luaPushError(lua, "wrong number or type of arguments");
+        return 1;
+    }
+
+    lua_newtable(lua);
+    lua_pushstring(lua, field);
+    lua_pushvalue(lua, -3);
+    lua_settable(lua, -3);
+    return 1;
+}
+
+int luaRedisErrorReplyCommand(lua_State *lua) {
+    return luaRedisReturnSingleFieldTable(lua,"err");
+}
+
+int luaRedisStatusReplyCommand(lua_State *lua) {
+    return luaRedisReturnSingleFieldTable(lua,"ok");
+}
+
 int luaLogCommand(lua_State *lua) {
     int j, argc = lua_gettop(lua);
     int level;
@@ -510,6 +544,14 @@ void scriptingInit(void) {
     lua_pushcfunction(lua, luaRedisSha1hexCommand);
     lua_settable(lua, -3);
 
+    /* redis.error_reply and redis.status_reply */
+    lua_pushstring(lua, "error_reply");
+    lua_pushcfunction(lua, luaRedisErrorReplyCommand);
+    lua_settable(lua, -3);
+    lua_pushstring(lua, "status_reply");
+    lua_pushcfunction(lua, luaRedisStatusReplyCommand);
+    lua_settable(lua, -3);
+
     /* Finally set the table as 'redis' global var. */
     lua_setglobal(lua,"redis");
 
index bc6aa6b2f25cc8988b405782a3e25d50e08ea704..ca915daf88e69c31fc54bffb24415674f7e62e00 100644 (file)
--- a/src/sds.c
+++ b/src/sds.c
@@ -179,7 +179,7 @@ sds sdsgrowzero(sds s, size_t len) {
     return s;
 }
 
-sds sdscatlen(sds s, void *t, size_t len) {
+sds sdscatlen(sds s, const void *t, size_t len) {
     struct sdshdr *sh;
     size_t curlen = sdslen(s);
 
@@ -193,15 +193,15 @@ sds sdscatlen(sds s, void *t, size_t len) {
     return s;
 }
 
-sds sdscat(sds s, char *t) {
+sds sdscat(sds s, const char *t) {
     return sdscatlen(s, t, strlen(t));
 }
 
-sds sdscatsds(sds s, sds t) {
+sds sdscatsds(sds s, const sds t) {
     return sdscatlen(s, t, sdslen(t));
 }
 
-sds sdscpylen(sds s, char *t, size_t len) {
+sds sdscpylen(sds s, const char *t, size_t len) {
     struct sdshdr *sh = (void*) (s-(sizeof(struct sdshdr)));
     size_t totlen = sh->free+sh->len;
 
@@ -218,7 +218,7 @@ sds sdscpylen(sds s, char *t, size_t len) {
     return s;
 }
 
-sds sdscpy(sds s, char *t) {
+sds sdscpy(sds s, const char *t) {
     return sdscpylen(s, t, strlen(t));
 }
 
@@ -314,7 +314,7 @@ void sdstoupper(sds s) {
     for (j = 0; j < len; j++) s[j] = toupper(s[j]);
 }
 
-int sdscmp(sds s1, sds s2) {
+int sdscmp(const sds s1, const sds s2) {
     size_t l1, l2, minlen;
     int cmp;
 
@@ -342,7 +342,7 @@ int sdscmp(sds s1, sds s2) {
  * requires length arguments. sdssplit() is just the
  * same function but for zero-terminated strings.
  */
-sds *sdssplitlen(char *s, int len, char *sep, int seplen, int *count) {
+sds *sdssplitlen(const char *s, int len, const char *sep, int seplen, int *count) {
     int elements = 0, slots = 5, start = 0, j;
     sds *tokens;
 
@@ -413,7 +413,7 @@ sds sdsfromlonglong(long long value) {
     return sdsnewlen(p,32-(p-buf));
 }
 
-sds sdscatrepr(sds s, char *p, size_t len) {
+sds sdscatrepr(sds s, const char *p, size_t len) {
     s = sdscatlen(s,"\"",1);
     while(len--) {
         switch(*p) {
@@ -481,8 +481,8 @@ int hex_digit_to_int(char c) {
  * Note that sdscatrepr() is able to convert back a string into
  * a quoted string in the same format sdssplitargs() is able to parse.
  */
-sds *sdssplitargs(char *line, int *argc) {
-    char *p = line;
+sds *sdssplitargs(const char *line, int *argc) {
+    const char *p = line;
     char *current = NULL;
     char **vector = NULL;
 
@@ -604,7 +604,7 @@ void sdssplitargs_free(sds *argv, int argc) {
  *
  * The function returns the sds string pointer, that is always the same
  * as the input pointer since no resize is needed. */
-sds sdsmapchars(sds s, char *from, char *to, size_t setlen) {
+sds sdsmapchars(sds s, const char *from, const char *to, size_t setlen) {
     size_t j, i, l = sdslen(s);
 
     for (j = 0; j < l; j++) {
index 0648381bb9a74a72382a5dc5dd67b9c15badf7d3..e8d3065036fd989e81c5cc474297a3f100d9f731 100644 (file)
--- a/src/sds.h
+++ b/src/sds.h
@@ -60,13 +60,13 @@ sds sdsempty();
 size_t sdslen(const sds s);
 sds sdsdup(const sds s);
 void sdsfree(sds s);
-size_t sdsavail(sds s);
+size_t sdsavail(const sds s);
 sds sdsgrowzero(sds s, size_t len);
-sds sdscatlen(sds s, void *t, size_t len);
-sds sdscat(sds s, char *t);
-sds sdscatsds(sds s, sds t);
-sds sdscpylen(sds s, char *t, size_t len);
-sds sdscpy(sds s, char *t);
+sds sdscatlen(sds s, const void *t, size_t len);
+sds sdscat(sds s, const char *t);
+sds sdscatsds(sds s, const sds t);
+sds sdscpylen(sds s, const char *t, size_t len);
+sds sdscpy(sds s, const char *t);
 
 sds sdscatvprintf(sds s, const char *fmt, va_list ap);
 #ifdef __GNUC__
@@ -80,16 +80,16 @@ sds sdstrim(sds s, const char *cset);
 sds sdsrange(sds s, int start, int end);
 void sdsupdatelen(sds s);
 void sdsclear(sds s);
-int sdscmp(sds s1, sds s2);
-sds *sdssplitlen(char *s, int len, char *sep, int seplen, int *count);
+int sdscmp(const sds s1, const sds s2);
+sds *sdssplitlen(const char *s, int len, const char *sep, int seplen, int *count);
 void sdsfreesplitres(sds *tokens, int count);
 void sdstolower(sds s);
 void sdstoupper(sds s);
 sds sdsfromlonglong(long long value);
-sds sdscatrepr(sds s, char *p, size_t len);
-sds *sdssplitargs(char *line, int *argc);
+sds sdscatrepr(sds s, const char *p, size_t len);
+sds *sdssplitargs(const char *line, int *argc);
 void sdssplitargs_free(sds *argv, int argc);
-sds sdsmapchars(sds s, char *from, char *to, size_t setlen);
+sds sdsmapchars(sds s, const char *from, const char *to, size_t setlen);
 
 /* Low level functions exposed to the user API */
 sds sdsMakeRoomFor(sds s, size_t addlen);
diff --git a/src/sentinel.c b/src/sentinel.c
new file mode 100644 (file)
index 0000000..9c8dee7
--- /dev/null
@@ -0,0 +1,3058 @@
+/* Redis Sentinel implementation
+ * -----------------------------
+ *
+ * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ *   * Redistributions of source code must retain the above copyright notice,
+ *     this list of conditions and the following disclaimer.
+ *   * Redistributions in binary form must reproduce the above copyright
+ *     notice, this list of conditions and the following disclaimer in the
+ *     documentation and/or other materials provided with the distribution.
+ *   * Neither the name of Redis nor the names of its contributors may be used
+ *     to endorse or promote products derived from this software without
+ *     specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "redis.h"
+#include "hiredis.h"
+#include "async.h"
+
+#include <ctype.h>
+#include <arpa/inet.h>
+#include <sys/socket.h>
+#include <sys/wait.h>
+
+extern char **environ;
+
+#define REDIS_SENTINEL_PORT 26379
+
+/* ======================== Sentinel global state =========================== */
+
+typedef long long mstime_t; /* millisecond time type. */
+
+/* Address object, used to describe an ip:port pair. */
+typedef struct sentinelAddr {
+    char *ip;
+    int port;
+} sentinelAddr;
+
+/* A Sentinel Redis Instance object is monitoring. */
+#define SRI_MASTER  (1<<0)
+#define SRI_SLAVE   (1<<1)
+#define SRI_SENTINEL (1<<2)
+#define SRI_DISCONNECTED (1<<3)
+#define SRI_S_DOWN (1<<4)   /* Subjectively down (no quorum). */
+#define SRI_O_DOWN (1<<5)   /* Objectively down (quorum reached). */
+#define SRI_MASTER_DOWN (1<<6) /* A Sentinel with this flag set thinks that
+                                   its master is down. */
+/* SRI_CAN_FAILOVER when set in an SRI_MASTER instance means that we are
+ * allowed to perform the failover for this master.
+ * When set in a SRI_SENTINEL instance means that sentinel is allowed to
+ * perform the failover on its master. */
+#define SRI_CAN_FAILOVER (1<<7)
+#define SRI_FAILOVER_IN_PROGRESS (1<<8) /* Failover is in progress for
+                                           this master. */
+#define SRI_I_AM_THE_LEADER (1<<9)     /* We are the leader for this master. */
+#define SRI_PROMOTED (1<<10)            /* Slave selected for promotion. */
+#define SRI_RECONF_SENT (1<<11)     /* SLAVEOF <newmaster> sent. */
+#define SRI_RECONF_INPROG (1<<12)   /* Slave synchronization in progress. */
+#define SRI_RECONF_DONE (1<<13)     /* Slave synchronized with new master. */
+#define SRI_FORCE_FAILOVER (1<<14)  /* Force failover with master up. */
+#define SRI_SCRIPT_KILL_SENT (1<<15) /* SCRIPT KILL already sent on -BUSY */
+
+#define SENTINEL_INFO_PERIOD 10000
+#define SENTINEL_PING_PERIOD 1000
+#define SENTINEL_ASK_PERIOD 1000
+#define SENTINEL_PUBLISH_PERIOD 5000
+#define SENTINEL_DOWN_AFTER_PERIOD 30000
+#define SENTINEL_HELLO_CHANNEL "__sentinel__:hello"
+#define SENTINEL_TILT_TRIGGER 2000
+#define SENTINEL_TILT_PERIOD (SENTINEL_PING_PERIOD*30)
+#define SENTINEL_DEFAULT_SLAVE_PRIORITY 100
+#define SENTINEL_PROMOTION_RETRY_PERIOD 30000
+#define SENTINEL_SLAVE_RECONF_RETRY_PERIOD 10000
+#define SENTINEL_DEFAULT_PARALLEL_SYNCS 1
+#define SENTINEL_MIN_LINK_RECONNECT_PERIOD 15000
+#define SENTINEL_DEFAULT_FAILOVER_TIMEOUT (60*15*1000)
+#define SENTINEL_MAX_PENDING_COMMANDS 100
+#define SENTINEL_EXTENDED_SDOWN_MULTIPLIER 10
+
+/* How many milliseconds is an information valid? This applies for instance
+ * to the reply to SENTINEL IS-MASTER-DOWN-BY-ADDR replies. */
+#define SENTINEL_INFO_VALIDITY_TIME 5000
+#define SENTINEL_FAILOVER_FIXED_DELAY 5000
+#define SENTINEL_FAILOVER_MAX_RANDOM_DELAY 10000
+
+/* Failover machine different states. */
+#define SENTINEL_FAILOVER_STATE_NONE 0  /* No failover in progress. */
+#define SENTINEL_FAILOVER_STATE_WAIT_START 1  /* Wait for failover_start_time*/ 
+#define SENTINEL_FAILOVER_STATE_SELECT_SLAVE 2 /* Select slave to promote */
+#define SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE 3 /* Slave -> Master */
+#define SENTINEL_FAILOVER_STATE_WAIT_PROMOTION 4 /* Wait slave to change role */
+#define SENTINEL_FAILOVER_STATE_RECONF_SLAVES 5 /* SLAVEOF newmaster */
+#define SENTINEL_FAILOVER_STATE_WAIT_NEXT_SLAVE 6 /* wait replication */
+#define SENTINEL_FAILOVER_STATE_ALERT_CLIENTS 7 /* Run user script. */
+#define SENTINEL_FAILOVER_STATE_WAIT_ALERT_SCRIPT 8 /* Wait script exec. */
+#define SENTINEL_FAILOVER_STATE_DETECT_END 9 /* Check for failover end. */
+#define SENTINEL_FAILOVER_STATE_UPDATE_CONFIG 10 /* Monitor promoted slave. */
+
+#define SENTINEL_MASTER_LINK_STATUS_UP 0
+#define SENTINEL_MASTER_LINK_STATUS_DOWN 1
+
+/* Generic flags that can be used with different functions. */
+#define SENTINEL_NO_FLAGS 0
+#define SENTINEL_GENERATE_EVENT 1
+#define SENTINEL_LEADER 2
+#define SENTINEL_OBSERVER 4
+
+/* Script execution flags and limits. */
+#define SENTINEL_SCRIPT_NONE 0
+#define SENTINEL_SCRIPT_RUNNING 1
+#define SENTINEL_SCRIPT_MAX_QUEUE 256
+#define SENTINEL_SCRIPT_MAX_RUNNING 16
+#define SENTINEL_SCRIPT_MAX_RUNTIME 60000 /* 60 seconds max exec time. */
+#define SENTINEL_SCRIPT_MAX_RETRY 10
+#define SENTINEL_SCRIPT_RETRY_DELAY 30000 /* 30 seconds between retries. */
+
+typedef struct sentinelRedisInstance {
+    int flags;      /* See SRI_... defines */
+    char *name;     /* Master name from the point of view of this sentinel. */
+    char *runid;    /* run ID of this instance. */
+    sentinelAddr *addr; /* Master host. */
+    redisAsyncContext *cc; /* Hiredis context for commands. */
+    redisAsyncContext *pc; /* Hiredis context for Pub / Sub. */
+    int pending_commands;   /* Number of commands sent waiting for a reply. */
+    mstime_t cc_conn_time; /* cc connection time. */
+    mstime_t pc_conn_time; /* pc connection time. */
+    mstime_t pc_last_activity; /* Last time we received any message. */
+    mstime_t last_avail_time; /* Last time the instance replied to ping with
+                                 a reply we consider valid. */
+    mstime_t last_pong_time;  /* Last time the instance replied to ping,
+                                 whatever the reply was. That's used to check
+                                 if the link is idle and must be reconnected. */
+    mstime_t last_pub_time;   /* Last time we sent hello via Pub/Sub. */
+    mstime_t last_hello_time; /* Only used if SRI_SENTINEL is set. Last time
+                                 we received an hello from this Sentinel
+                                 via Pub/Sub. */
+    mstime_t last_master_down_reply_time; /* Time of last reply to
+                                             SENTINEL is-master-down command. */
+    mstime_t s_down_since_time; /* Subjectively down since time. */
+    mstime_t o_down_since_time; /* Objectively down since time. */
+    mstime_t down_after_period; /* Consider it down after that period. */
+    mstime_t info_refresh;  /* Time at which we received INFO output from it. */
+
+    /* Master specific. */
+    dict *sentinels;    /* Other sentinels monitoring the same master. */
+    dict *slaves;       /* Slaves for this master instance. */
+    int quorum;         /* Number of sentinels that need to agree on failure. */
+    int parallel_syncs; /* How many slaves to reconfigure at same time. */
+    char *auth_pass;    /* Password to use for AUTH against master & slaves. */
+
+    /* Slave specific. */
+    mstime_t master_link_down_time; /* Slave replication link down time. */
+    int slave_priority; /* Slave priority according to its INFO output. */
+    mstime_t slave_reconf_sent_time; /* Time at which we sent SLAVE OF <new> */
+    struct sentinelRedisInstance *master; /* Master instance if SRI_SLAVE is set. */
+    char *slave_master_host;    /* Master host as reported by INFO */
+    int slave_master_port;      /* Master port as reported by INFO */
+    int slave_master_link_status; /* Master link status as reported by INFO */
+    /* Failover */
+    char *leader;       /* If this is a master instance, this is the runid of
+                           the Sentinel that should perform the failover. If
+                           this is a Sentinel, this is the runid of the Sentinel
+                           that this other Sentinel is voting as leader.
+                           This field is valid only if SRI_MASTER_DOWN is
+                           set on the Sentinel instance. */
+    int failover_state; /* See SENTINEL_FAILOVER_STATE_* defines. */
+    mstime_t failover_state_change_time;
+    mstime_t failover_start_time;   /* When to start to failover if leader. */
+    mstime_t failover_timeout;      /* Max time to refresh failover state. */
+    struct sentinelRedisInstance *promoted_slave; /* Promoted slave instance. */
+    /* Scripts executed to notify admin or reconfigure clients: when they
+     * are set to NULL no script is executed. */
+    char *notification_script;
+    char *client_reconfig_script;
+} sentinelRedisInstance;
+
+/* Main state. */
+struct sentinelState {
+    dict *masters;      /* Dictionary of master sentinelRedisInstances.
+                           Key is the instance name, value is the
+                           sentinelRedisInstance structure pointer. */
+    int tilt;           /* Are we in TILT mode? */
+    int running_scripts;    /* Number of scripts in execution right now. */
+    mstime_t tilt_start_time;   /* When TITL started. */
+    mstime_t previous_time;     /* Time last time we ran the time handler. */
+    list *scripts_queue;    /* Queue of user scripts to execute. */
+} sentinel;
+
+/* A script execution job. */
+typedef struct sentinelScriptJob {
+    int flags;              /* Script job flags: SENTINEL_SCRIPT_* */
+    int retry_num;          /* Number of times we tried to execute it. */
+    char **argv;            /* Arguments to call the script. */
+    mstime_t start_time;    /* Script execution time if the script is running,
+                               otherwise 0 if we are allowed to retry the
+                               execution at any time. If the script is not
+                               running and it's not 0, it means: do not run
+                               before the specified time. */
+    pid_t pid;              /* Script execution pid. */
+} sentinelScriptJob;
+
+/* ======================= hiredis ae.c adapters =============================
+ * Note: this implementation is taken from hiredis/adapters/ae.h, however
+ * we have our modified copy for Sentinel in order to use our allocator
+ * and to have full control over how the adapter works. */
+
+typedef struct redisAeEvents {
+    redisAsyncContext *context;
+    aeEventLoop *loop;
+    int fd;
+    int reading, writing;
+} redisAeEvents;
+
+static void redisAeReadEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
+    ((void)el); ((void)fd); ((void)mask);
+
+    redisAeEvents *e = (redisAeEvents*)privdata;
+    redisAsyncHandleRead(e->context);
+}
+
+static void redisAeWriteEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
+    ((void)el); ((void)fd); ((void)mask);
+
+    redisAeEvents *e = (redisAeEvents*)privdata;
+    redisAsyncHandleWrite(e->context);
+}
+
+static void redisAeAddRead(void *privdata) {
+    redisAeEvents *e = (redisAeEvents*)privdata;
+    aeEventLoop *loop = e->loop;
+    if (!e->reading) {
+        e->reading = 1;
+        aeCreateFileEvent(loop,e->fd,AE_READABLE,redisAeReadEvent,e);
+    }
+}
+
+static void redisAeDelRead(void *privdata) {
+    redisAeEvents *e = (redisAeEvents*)privdata;
+    aeEventLoop *loop = e->loop;
+    if (e->reading) {
+        e->reading = 0;
+        aeDeleteFileEvent(loop,e->fd,AE_READABLE);
+    }
+}
+
+static void redisAeAddWrite(void *privdata) {
+    redisAeEvents *e = (redisAeEvents*)privdata;
+    aeEventLoop *loop = e->loop;
+    if (!e->writing) {
+        e->writing = 1;
+        aeCreateFileEvent(loop,e->fd,AE_WRITABLE,redisAeWriteEvent,e);
+    }
+}
+
+static void redisAeDelWrite(void *privdata) {
+    redisAeEvents *e = (redisAeEvents*)privdata;
+    aeEventLoop *loop = e->loop;
+    if (e->writing) {
+        e->writing = 0;
+        aeDeleteFileEvent(loop,e->fd,AE_WRITABLE);
+    }
+}
+
+static void redisAeCleanup(void *privdata) {
+    redisAeEvents *e = (redisAeEvents*)privdata;
+    redisAeDelRead(privdata);
+    redisAeDelWrite(privdata);
+    zfree(e);
+}
+
+static int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac) {
+    redisContext *c = &(ac->c);
+    redisAeEvents *e;
+
+    /* Nothing should be attached when something is already attached */
+    if (ac->ev.data != NULL)
+        return REDIS_ERR;
+
+    /* Create container for context and r/w events */
+    e = (redisAeEvents*)zmalloc(sizeof(*e));
+    e->context = ac;
+    e->loop = loop;
+    e->fd = c->fd;
+    e->reading = e->writing = 0;
+
+    /* Register functions to start/stop listening for events */
+    ac->ev.addRead = redisAeAddRead;
+    ac->ev.delRead = redisAeDelRead;
+    ac->ev.addWrite = redisAeAddWrite;
+    ac->ev.delWrite = redisAeDelWrite;
+    ac->ev.cleanup = redisAeCleanup;
+    ac->ev.data = e;
+
+    return REDIS_OK;
+}
+
+/* ============================= Prototypes ================================= */
+
+void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status);
+void sentinelDisconnectCallback(const redisAsyncContext *c, int status);
+void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata);
+sentinelRedisInstance *sentinelGetMasterByName(char *name);
+char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master);
+char *sentinelGetObjectiveLeader(sentinelRedisInstance *master);
+int yesnotoi(char *s);
+void sentinelDisconnectInstanceFromContext(const redisAsyncContext *c);
+void sentinelKillLink(sentinelRedisInstance *ri, redisAsyncContext *c);
+const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri);
+void sentinelAbortFailover(sentinelRedisInstance *ri);
+void sentinelEvent(int level, char *type, sentinelRedisInstance *ri, const char *fmt, ...);
+sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master);
+void sentinelScheduleScriptExecution(char *path, ...);
+void sentinelStartFailover(sentinelRedisInstance *master, int state);
+void sentinelDiscardReplyCallback(redisAsyncContext *c, void *reply, void *privdata);
+
+/* ========================= Dictionary types =============================== */
+
+unsigned int dictSdsHash(const void *key);
+int dictSdsKeyCompare(void *privdata, const void *key1, const void *key2);
+void releaseSentinelRedisInstance(sentinelRedisInstance *ri);
+
+void dictInstancesValDestructor (void *privdata, void *obj) {
+    releaseSentinelRedisInstance(obj);
+}
+
+/* Instance name (sds) -> instance (sentinelRedisInstance pointer)
+ *
+ * also used for: sentinelRedisInstance->sentinels dictionary that maps
+ * sentinels ip:port to last seen time in Pub/Sub hello message. */
+dictType instancesDictType = {
+    dictSdsHash,               /* hash function */
+    NULL,                      /* key dup */
+    NULL,                      /* val dup */
+    dictSdsKeyCompare,         /* key compare */
+    NULL,                      /* key destructor */
+    dictInstancesValDestructor /* val destructor */
+};
+
+/* Instance runid (sds) -> votes (long casted to void*)
+ *
+ * This is useful into sentinelGetObjectiveLeader() function in order to
+ * count the votes and understand who is the leader. */
+dictType leaderVotesDictType = {
+    dictSdsHash,               /* hash function */
+    NULL,                      /* key dup */
+    NULL,                      /* val dup */
+    dictSdsKeyCompare,         /* key compare */
+    NULL,                      /* key destructor */
+    NULL                       /* val destructor */
+};
+
+/* =========================== Initialization =============================== */
+
+void sentinelCommand(redisClient *c);
+void sentinelInfoCommand(redisClient *c);
+
+struct redisCommand sentinelcmds[] = {
+    {"ping",pingCommand,1,"",0,NULL,0,0,0,0,0},
+    {"sentinel",sentinelCommand,-2,"",0,NULL,0,0,0,0,0},
+    {"subscribe",subscribeCommand,-2,"",0,NULL,0,0,0,0,0},
+    {"unsubscribe",unsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
+    {"psubscribe",psubscribeCommand,-2,"",0,NULL,0,0,0,0,0},
+    {"punsubscribe",punsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
+    {"info",sentinelInfoCommand,-1,"",0,NULL,0,0,0,0,0}
+};
+
+/* This function overwrites a few normal Redis config default with Sentinel
+ * specific defaults. */
+void initSentinelConfig(void) {
+    server.port = REDIS_SENTINEL_PORT;
+}
+
+/* Perform the Sentinel mode initialization. */
+void initSentinel(void) {
+    int j;
+
+    /* Remove usual Redis commands from the command table, then just add
+     * the SENTINEL command. */
+    dictEmpty(server.commands);
+    for (j = 0; j < sizeof(sentinelcmds)/sizeof(sentinelcmds[0]); j++) {
+        int retval;
+        struct redisCommand *cmd = sentinelcmds+j;
+
+        retval = dictAdd(server.commands, sdsnew(cmd->name), cmd);
+        redisAssert(retval == DICT_OK);
+    }
+
+    /* Initialize various data structures. */
+    sentinel.masters = dictCreate(&instancesDictType,NULL);
+    sentinel.tilt = 0;
+    sentinel.tilt_start_time = mstime();
+    sentinel.previous_time = mstime();
+    sentinel.running_scripts = 0;
+    sentinel.scripts_queue = listCreate();
+}
+
+/* ============================== sentinelAddr ============================== */
+
+/* Create a sentinelAddr object and return it on success.
+ * On error NULL is returned and errno is set to:
+ *  ENOENT: Can't resolve the hostname.
+ *  EINVAL: Invalid port number.
+ */
+sentinelAddr *createSentinelAddr(char *hostname, int port) {
+    char buf[32];
+    sentinelAddr *sa;
+
+    if (port <= 0 || port > 65535) {
+        errno = EINVAL;
+        return NULL;
+    }
+    if (anetResolve(NULL,hostname,buf) == ANET_ERR) {
+        errno = ENOENT;
+        return NULL;
+    }
+    sa = zmalloc(sizeof(*sa));
+    sa->ip = sdsnew(buf);
+    sa->port = port;
+    return sa;
+}
+
+/* Free a Sentinel address. Can't fail. */
+void releaseSentinelAddr(sentinelAddr *sa) {
+    sdsfree(sa->ip);
+    zfree(sa);
+}
+
+/* =========================== Events notification ========================== */
+
+/* Send an event to log, pub/sub, user notification script.
+ * 
+ * 'level' is the log level for logging. Only REDIS_WARNING events will trigger
+ * the execution of the user notification script.
+ *
+ * 'type' is the message type, also used as a pub/sub channel name.
+ *
+ * 'ri', is the redis instance target of this event if applicable, and is
+ * used to obtain the path of the notification script to execute.
+ *
+ * The remaining arguments are printf-alike.
+ * If the format specifier starts with the two characters "%@" then ri is
+ * not NULL, and the message is prefixed with an instance identifier in the
+ * following format:
+ *
+ *  <instance type> <instance name> <ip> <port>
+ *
+ *  If the instance type is not master, than the additional string is
+ *  added to specify the originating master:
+ *
+ *  @ <master name> <master ip> <master port>
+ *
+ *  Any other specifier after "%@" is processed by printf itself.
+ */
+void sentinelEvent(int level, char *type, sentinelRedisInstance *ri,
+                   const char *fmt, ...) {
+    va_list ap;
+    char msg[REDIS_MAX_LOGMSG_LEN];
+    robj *channel, *payload;
+
+    /* Handle %@ */
+    if (fmt[0] == '%' && fmt[1] == '@') {
+        sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
+                                         NULL : ri->master;
+
+        if (master) {
+            snprintf(msg, sizeof(msg), "%s %s %s %d @ %s %s %d",
+                sentinelRedisInstanceTypeStr(ri),
+                ri->name, ri->addr->ip, ri->addr->port,
+                master->name, master->addr->ip, master->addr->port);
+        } else {
+            snprintf(msg, sizeof(msg), "%s %s %s %d",
+                sentinelRedisInstanceTypeStr(ri),
+                ri->name, ri->addr->ip, ri->addr->port);
+        }
+        fmt += 2;
+    } else {
+        msg[0] = '\0';
+    }
+
+    /* Use vsprintf for the rest of the formatting if any. */
+    if (fmt[0] != '\0') {
+        va_start(ap, fmt);
+        vsnprintf(msg+strlen(msg), sizeof(msg)-strlen(msg), fmt, ap);
+        va_end(ap);
+    }
+
+    /* Log the message if the log level allows it to be logged. */
+    if (level >= server.verbosity)
+        redisLog(level,"%s %s",type,msg);
+
+    /* Publish the message via Pub/Sub if it's not a debugging one. */
+    if (level != REDIS_DEBUG) {
+        channel = createStringObject(type,strlen(type));
+        payload = createStringObject(msg,strlen(msg));
+        pubsubPublishMessage(channel,payload);
+        decrRefCount(channel);
+        decrRefCount(payload);
+    }
+
+    /* Call the notification script if applicable. */
+    if (level == REDIS_WARNING && ri != NULL) {
+        sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
+                                         ri : ri->master;
+        if (master->notification_script) {
+            sentinelScheduleScriptExecution(master->notification_script,
+                type,msg,NULL);
+        }
+    }
+}
+
+/* ============================ script execution ============================ */
+
+/* Release a script job structure and all the associated data. */
+void sentinelReleaseScriptJob(sentinelScriptJob *sj) {
+    int j = 0;
+
+    while(sj->argv[j]) sdsfree(sj->argv[j++]);
+    zfree(sj->argv);
+    zfree(sj);
+}
+
+#define SENTINEL_SCRIPT_MAX_ARGS 16
+void sentinelScheduleScriptExecution(char *path, ...) {
+    va_list ap;
+    char *argv[SENTINEL_SCRIPT_MAX_ARGS+1];
+    int argc = 1;
+    sentinelScriptJob *sj;
+
+    va_start(ap, path);
+    while(argc < SENTINEL_SCRIPT_MAX_ARGS) {
+        argv[argc] = va_arg(ap,char*);
+        if (!argv[argc]) break;
+        argv[argc] = sdsnew(argv[argc]); /* Copy the string. */
+        argc++;
+    }
+    va_end(ap);
+    argv[0] = sdsnew(path);
+    
+    sj = zmalloc(sizeof(*sj));
+    sj->flags = SENTINEL_SCRIPT_NONE;
+    sj->retry_num = 0;
+    sj->argv = zmalloc(sizeof(char*)*(argc+1));
+    sj->start_time = 0;
+    sj->pid = 0;
+    memcpy(sj->argv,argv,sizeof(char*)*(argc+1));
+
+    listAddNodeTail(sentinel.scripts_queue,sj);
+
+    /* Remove the oldest non running script if we already hit the limit. */
+    if (listLength(sentinel.scripts_queue) > SENTINEL_SCRIPT_MAX_QUEUE) {
+        listNode *ln;
+        listIter li;
+
+        listRewind(sentinel.scripts_queue,&li);
+        while ((ln = listNext(&li)) != NULL) {
+            sj = ln->value;
+
+            if (sj->flags & SENTINEL_SCRIPT_RUNNING) continue;
+            /* The first node is the oldest as we add on tail. */
+            listDelNode(sentinel.scripts_queue,ln);
+            sentinelReleaseScriptJob(sj);
+            break;
+        }
+        redisAssert(listLength(sentinel.scripts_queue) <=
+                    SENTINEL_SCRIPT_MAX_QUEUE);
+    }
+}
+
+/* Lookup a script in the scripts queue via pid, and returns the list node
+ * (so that we can easily remove it from the queue if needed). */
+listNode *sentinelGetScriptListNodeByPid(pid_t pid) {
+    listNode *ln;
+    listIter li;
+
+    listRewind(sentinel.scripts_queue,&li);
+    while ((ln = listNext(&li)) != NULL) {
+        sentinelScriptJob *sj = ln->value;
+
+        if ((sj->flags & SENTINEL_SCRIPT_RUNNING) && sj->pid == pid)
+            return ln;
+    }
+    return NULL;
+}
+
+/* Run pending scripts if we are not already at max number of running
+ * scripts. */
+void sentinelRunPendingScripts(void) {
+    listNode *ln;
+    listIter li;
+    mstime_t now = mstime();
+
+    /* Find jobs that are not running and run them, from the top to the
+     * tail of the queue, so we run older jobs first. */
+    listRewind(sentinel.scripts_queue,&li);
+    while (sentinel.running_scripts < SENTINEL_SCRIPT_MAX_RUNNING &&
+           (ln = listNext(&li)) != NULL)
+    {
+        sentinelScriptJob *sj = ln->value;
+        pid_t pid;
+
+        /* Skip if already running. */
+        if (sj->flags & SENTINEL_SCRIPT_RUNNING) continue;
+
+        /* Skip if it's a retry, but not enough time has elapsed. */
+        if (sj->start_time && sj->start_time > now) continue;
+
+        sj->flags |= SENTINEL_SCRIPT_RUNNING;
+        sj->start_time = mstime();
+        sj->retry_num++;
+        pid = fork();
+
+        if (pid == -1) {
+            /* Parent (fork error).
+             * We report fork errors as signal 99, in order to unify the
+             * reporting with other kind of errors. */
+            sentinelEvent(REDIS_WARNING,"-script-error",NULL,
+                          "%s %d %d", sj->argv[0], 99, 0);
+            sj->flags &= ~SENTINEL_SCRIPT_RUNNING;
+            sj->pid = 0;
+        } else if (pid == 0) {
+            /* Child */
+            execve(sj->argv[0],sj->argv,environ);
+            /* If we are here an error occurred. */
+            _exit(2); /* Don't retry execution. */
+        } else {
+            sentinel.running_scripts++;
+            sj->pid = pid;
+            sentinelEvent(REDIS_DEBUG,"+script-child",NULL,"%ld",(long)pid);
+        }
+    }
+}
+
+/* How much to delay the execution of a script that we need to retry after
+ * an error?
+ *
+ * We double the retry delay for every further retry we do. So for instance
+ * if RETRY_DELAY is set to 30 seconds and the max number of retries is 10
+ * starting from the second attempt to execute the script the delays are:
+ * 30 sec, 60 sec, 2 min, 4 min, 8 min, 16 min, 32 min, 64 min, 128 min. */
+mstime_t sentinelScriptRetryDelay(int retry_num) {
+    mstime_t delay = SENTINEL_SCRIPT_RETRY_DELAY;
+
+    while (retry_num-- > 1) delay *= 2;
+    return delay;
+}
+
+/* Check for scripts that terminated, and remove them from the queue if the
+ * script terminated successfully. If instead the script was terminated by
+ * a signal, or returned exit code "1", it is scheduled to run again if
+ * the max number of retries did not already elapsed. */
+void sentinelCollectTerminatedScripts(void) {
+    int statloc;
+    pid_t pid;
+
+    while ((pid = wait3(&statloc,WNOHANG,NULL)) > 0) {
+        int exitcode = WEXITSTATUS(statloc);
+        int bysignal = 0;
+        listNode *ln;
+        sentinelScriptJob *sj;
+
+        if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);
+        sentinelEvent(REDIS_DEBUG,"-script-child",NULL,"%ld %d %d",
+            (long)pid, exitcode, bysignal);
+        
+        ln = sentinelGetScriptListNodeByPid(pid);
+        if (ln == NULL) {
+            redisLog(REDIS_WARNING,"wait3() returned a pid (%ld) we can't find in our scripts execution queue!", (long)pid);
+            continue;
+        }
+        sj = ln->value;
+
+        /* If the script was terminated by a signal or returns an
+         * exit code of "1" (that means: please retry), we reschedule it
+         * if the max number of retries is not already reached. */
+        if ((bysignal || exitcode == 1) &&
+            sj->retry_num != SENTINEL_SCRIPT_MAX_RETRY)
+        {
+            sj->flags &= ~SENTINEL_SCRIPT_RUNNING;
+            sj->pid = 0;
+            sj->start_time = mstime() +
+                             sentinelScriptRetryDelay(sj->retry_num);
+        } else {
+            /* Otherwise let's remove the script, but log the event if the
+             * execution did not terminated in the best of the ways. */
+            if (bysignal || exitcode != 0) {
+                sentinelEvent(REDIS_WARNING,"-script-error",NULL,
+                              "%s %d %d", sj->argv[0], bysignal, exitcode);
+            }
+            listDelNode(sentinel.scripts_queue,ln);
+            sentinelReleaseScriptJob(sj);
+            sentinel.running_scripts--;
+        }
+    }
+}
+
+/* Kill scripts in timeout, they'll be collected by the
+ * sentinelCollectTerminatedScripts() function. */
+void sentinelKillTimedoutScripts(void) {
+    listNode *ln;
+    listIter li;
+    mstime_t now = mstime();
+
+    listRewind(sentinel.scripts_queue,&li);
+    while ((ln = listNext(&li)) != NULL) {
+        sentinelScriptJob *sj = ln->value;
+
+        if (sj->flags & SENTINEL_SCRIPT_RUNNING &&
+            (now - sj->start_time) > SENTINEL_SCRIPT_MAX_RUNTIME)
+        {
+            sentinelEvent(REDIS_WARNING,"-script-timeout",NULL,"%s %ld",
+                sj->argv[0], (long)sj->pid);
+            kill(sj->pid,SIGKILL);
+        }
+    }
+}
+
+/* Implements SENTINEL PENDING-SCRIPTS command. */
+void sentinelPendingScriptsCommand(redisClient *c) {
+    listNode *ln;
+    listIter li;
+
+    addReplyMultiBulkLen(c,listLength(sentinel.scripts_queue));
+    listRewind(sentinel.scripts_queue,&li);
+    while ((ln = listNext(&li)) != NULL) {
+        sentinelScriptJob *sj = ln->value;
+        int j = 0;
+
+        addReplyMultiBulkLen(c,10);
+
+        addReplyBulkCString(c,"argv");
+        while (sj->argv[j]) j++;
+        addReplyMultiBulkLen(c,j);
+        j = 0;
+        while (sj->argv[j]) addReplyBulkCString(c,sj->argv[j++]);
+
+        addReplyBulkCString(c,"flags");
+        addReplyBulkCString(c,
+            (sj->flags & SENTINEL_SCRIPT_RUNNING) ? "running" : "scheduled");
+
+        addReplyBulkCString(c,"pid");
+        addReplyBulkLongLong(c,sj->pid);
+
+        if (sj->flags & SENTINEL_SCRIPT_RUNNING) {
+            addReplyBulkCString(c,"run-time");
+            addReplyBulkLongLong(c,mstime() - sj->start_time);
+        } else {
+            mstime_t delay = sj->start_time ? (sj->start_time-mstime()) : 0;
+            if (delay < 0) delay = 0;
+            addReplyBulkCString(c,"run-delay");
+            addReplyBulkLongLong(c,delay);
+        }
+
+        addReplyBulkCString(c,"retry-num");
+        addReplyBulkLongLong(c,sj->retry_num);
+    }
+}
+
+/* This function calls, if any, the client reconfiguration script with the
+ * following parameters:
+ *
+ * <master-name> <role> <state> <from-ip> <from-port> <to-ip> <to-port>
+ *
+ * It is called every time a failover starts, ends, or is aborted.
+ *
+ * <state> is "start", "end" or "abort".
+ * <role> is either "leader" or "observer".
+ *
+ * from/to fields are respectively master -> promoted slave addresses for
+ * "start" and "end", or the reverse (promoted slave -> master) in case of
+ * "abort".
+ */
+void sentinelCallClientReconfScript(sentinelRedisInstance *master, int role, char *state, sentinelAddr *from, sentinelAddr *to) {
+    char fromport[32], toport[32];
+
+    if (master->client_reconfig_script == NULL) return;
+    ll2string(fromport,sizeof(fromport),from->port);
+    ll2string(toport,sizeof(toport),to->port);
+    sentinelScheduleScriptExecution(master->client_reconfig_script,
+        master->name,
+        (role == SENTINEL_LEADER) ? "leader" : "observer",
+        state, from->ip, fromport, to->ip, toport, NULL);
+}
+
+/* ========================== sentinelRedisInstance ========================= */
+
+/* Create a redis instance, the following fields must be populated by the
+ * caller if needed:
+ * runid: set to NULL but will be populated once INFO output is received.
+ * info_refresh: is set to 0 to mean that we never received INFO so far.
+ *
+ * If SRI_MASTER is set into initial flags the instance is added to
+ * sentinel.masters table.
+ *
+ * if SRI_SLAVE or SRI_SENTINEL is set then 'master' must be not NULL and the
+ * instance is added into master->slaves or master->sentinels table.
+ *
+ * If the instance is a slave or sentinel, the name parameter is ignored and
+ * is created automatically as hostname:port.
+ *
+ * The function fails if hostname can't be resolved or port is out of range.
+ * When this happens NULL is returned and errno is set accordingly to the
+ * createSentinelAddr() function.
+ *
+ * The function may also fail and return NULL with errno set to EBUSY if
+ * a master or slave with the same name already exists. */
+sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char *hostname, int port, int quorum, sentinelRedisInstance *master) {
+    sentinelRedisInstance *ri;
+    sentinelAddr *addr;
+    dict *table = NULL;
+    char slavename[128], *sdsname;
+
+    redisAssert(flags & (SRI_MASTER|SRI_SLAVE|SRI_SENTINEL));
+    redisAssert((flags & SRI_MASTER) || master != NULL);
+
+    /* Check address validity. */
+    addr = createSentinelAddr(hostname,port);
+    if (addr == NULL) return NULL;
+
+    /* For slaves and sentinel we use ip:port as name. */
+    if (flags & (SRI_SLAVE|SRI_SENTINEL)) {
+        snprintf(slavename,sizeof(slavename),"%s:%d",hostname,port);
+        name = slavename;
+    }
+
+    /* Make sure the entry is not duplicated. This may happen when the same
+     * name for a master is used multiple times inside the configuration or
+     * if we try to add multiple times a slave or sentinel with same ip/port
+     * to a master. */
+    if (flags & SRI_MASTER) table = sentinel.masters;
+    else if (flags & SRI_SLAVE) table = master->slaves;
+    else if (flags & SRI_SENTINEL) table = master->sentinels;
+    sdsname = sdsnew(name);
+    if (dictFind(table,sdsname)) {
+        sdsfree(sdsname);
+        errno = EBUSY;
+        return NULL;
+    }
+
+    /* Create the instance object. */
+    ri = zmalloc(sizeof(*ri));
+    /* Note that all the instances are started in the disconnected state,
+     * the event loop will take care of connecting them. */
+    ri->flags = flags | SRI_DISCONNECTED;
+    ri->name = sdsname;
+    ri->runid = NULL;
+    ri->addr = addr;
+    ri->cc = NULL;
+    ri->pc = NULL;
+    ri->pending_commands = 0;
+    ri->cc_conn_time = 0;
+    ri->pc_conn_time = 0;
+    ri->pc_last_activity = 0;
+    ri->last_avail_time = mstime();
+    ri->last_pong_time = mstime();
+    ri->last_pub_time = mstime();
+    ri->last_hello_time = mstime();
+    ri->last_master_down_reply_time = mstime();
+    ri->s_down_since_time = 0;
+    ri->o_down_since_time = 0;
+    ri->down_after_period = master ? master->down_after_period :
+                            SENTINEL_DOWN_AFTER_PERIOD;
+    ri->master_link_down_time = 0;
+    ri->auth_pass = NULL;
+    ri->slave_priority = SENTINEL_DEFAULT_SLAVE_PRIORITY;
+    ri->slave_reconf_sent_time = 0;
+    ri->slave_master_host = NULL;
+    ri->slave_master_port = 0;
+    ri->slave_master_link_status = SENTINEL_MASTER_LINK_STATUS_DOWN;
+    ri->sentinels = dictCreate(&instancesDictType,NULL);
+    ri->quorum = quorum;
+    ri->parallel_syncs = SENTINEL_DEFAULT_PARALLEL_SYNCS;
+    ri->master = master;
+    ri->slaves = dictCreate(&instancesDictType,NULL);
+    ri->info_refresh = 0;
+
+    /* Failover state. */
+    ri->leader = NULL;
+    ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
+    ri->failover_state_change_time = 0;
+    ri->failover_start_time = 0;
+    ri->failover_timeout = SENTINEL_DEFAULT_FAILOVER_TIMEOUT;
+    ri->promoted_slave = NULL;
+    ri->notification_script = NULL;
+    ri->client_reconfig_script = NULL;
+
+    /* Add into the right table. */
+    dictAdd(table, ri->name, ri);
+    return ri;
+}
+
+/* Release this instance and all its slaves, sentinels, hiredis connections.
+ * This function also takes care of unlinking the instance from the main
+ * masters table (if it is a master) or from its master sentinels/slaves table
+ * if it is a slave or sentinel. */
+void releaseSentinelRedisInstance(sentinelRedisInstance *ri) {
+    /* Release all its slaves or sentinels if any. */
+    dictRelease(ri->sentinels);
+    dictRelease(ri->slaves);
+
+    /* Release hiredis connections. */
+    if (ri->cc) sentinelKillLink(ri,ri->cc);
+    if (ri->pc) sentinelKillLink(ri,ri->pc);
+
+    /* Free other resources. */
+    sdsfree(ri->name);
+    sdsfree(ri->runid);
+    sdsfree(ri->notification_script);
+    sdsfree(ri->client_reconfig_script);
+    sdsfree(ri->slave_master_host);
+    sdsfree(ri->leader);
+    sdsfree(ri->auth_pass);
+    releaseSentinelAddr(ri->addr);
+
+    /* Clear state into the master if needed. */
+    if ((ri->flags & SRI_SLAVE) && (ri->flags & SRI_PROMOTED) && ri->master)
+        ri->master->promoted_slave = NULL;
+
+    zfree(ri);
+}
+
+/* Lookup a slave in a master Redis instance, by ip and port. */
+sentinelRedisInstance *sentinelRedisInstanceLookupSlave(
+                sentinelRedisInstance *ri, char *ip, int port)
+{
+    sds key;
+    sentinelRedisInstance *slave;
+  
+    redisAssert(ri->flags & SRI_MASTER);
+    key = sdscatprintf(sdsempty(),"%s:%d",ip,port);
+    slave = dictFetchValue(ri->slaves,key);
+    sdsfree(key);
+    return slave;
+}
+
+/* Return the name of the type of the instance as a string. */
+const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri) {
+    if (ri->flags & SRI_MASTER) return "master";
+    else if (ri->flags & SRI_SLAVE) return "slave";
+    else if (ri->flags & SRI_SENTINEL) return "sentinel";
+    else return "unknown";
+}
+
+/* This function removes all the instances found in the dictionary of instances
+ * 'd', having either:
+ * 
+ * 1) The same ip/port as specified.
+ * 2) The same runid.
+ *
+ * "1" and "2" don't need to verify at the same time, just one is enough.
+ * If "runid" is NULL it is not checked.
+ * Similarly if "ip" is NULL it is not checked.
+ *
+ * This function is useful because every time we add a new Sentinel into
+ * a master's Sentinels dictionary, we want to be very sure about not
+ * having duplicated instances for any reason. This is so important because
+ * we use those other sentinels in order to run our quorum protocol to
+ * understand if it's time to proceeed with the fail over.
+ *
+ * Making sure no duplication is possible we greately improve the robustness
+ * of the quorum (otherwise we may end counting the same instance multiple
+ * times for some reason).
+ *
+ * The function returns the number of Sentinels removed. */
+int removeMatchingSentinelsFromMaster(sentinelRedisInstance *master, char *ip, int port, char *runid) {
+    dictIterator *di;
+    dictEntry *de;
+    int removed = 0;
+
+    di = dictGetSafeIterator(master->sentinels);
+    while((de = dictNext(di)) != NULL) {
+        sentinelRedisInstance *ri = dictGetVal(de);
+
+        if ((ri->runid && runid && strcmp(ri->runid,runid) == 0) ||
+            (ip && strcmp(ri->addr->ip,ip) == 0 && port == ri->addr->port))
+        {
+            dictDelete(master->sentinels,ri->name);
+            removed++;
+        }
+    }
+    dictReleaseIterator(di);
+    return removed;
+}
+
+/* Search an instance with the same runid, ip and port into a dictionary
+ * of instances. Return NULL if not found, otherwise return the instance
+ * pointer.
+ *
+ * runid or ip can be NULL. In such a case the search is performed only
+ * by the non-NULL field. */
+sentinelRedisInstance *getSentinelRedisInstanceByAddrAndRunID(dict *instances, char *ip, int port, char *runid) {
+    dictIterator *di;
+    dictEntry *de;
+    sentinelRedisInstance *instance = NULL;
+
+    redisAssert(ip || runid);   /* User must pass at least one search param. */
+    di = dictGetIterator(instances);
+    while((de = dictNext(di)) != NULL) {
+        sentinelRedisInstance *ri = dictGetVal(de);
+
+        if (runid && !ri->runid) continue;
+        if ((runid == NULL || strcmp(ri->runid, runid) == 0) &&
+            (ip == NULL || (strcmp(ri->addr->ip, ip) == 0 &&
+                            ri->addr->port == port)))
+        {
+            instance = ri;
+            break;
+        }
+    }
+    dictReleaseIterator(di);
+    return instance;
+}
+
+/* Simple master lookup by name */
+sentinelRedisInstance *sentinelGetMasterByName(char *name) {
+    sentinelRedisInstance *ri;
+    sds sdsname = sdsnew(name);
+
+    ri = dictFetchValue(sentinel.masters,sdsname);
+    sdsfree(sdsname);
+    return ri;
+}
+
+/* Add the specified flags to all the instances in the specified dictionary. */
+void sentinelAddFlagsToDictOfRedisInstances(dict *instances, int flags) {
+    dictIterator *di;
+    dictEntry *de;
+
+    di = dictGetIterator(instances);
+    while((de = dictNext(di)) != NULL) {
+        sentinelRedisInstance *ri = dictGetVal(de);
+        ri->flags |= flags;
+    }
+    dictReleaseIterator(di);
+}
+
+/* Remove the specified flags to all the instances in the specified
+ * dictionary. */
+void sentinelDelFlagsToDictOfRedisInstances(dict *instances, int flags) {
+    dictIterator *di;
+    dictEntry *de;
+
+    di = dictGetIterator(instances);
+    while((de = dictNext(di)) != NULL) {
+        sentinelRedisInstance *ri = dictGetVal(de);
+        ri->flags &= ~flags;
+    }
+    dictReleaseIterator(di);
+}
+
+/* Reset the state of a monitored master:
+ * 1) Remove all slaves.
+ * 2) Remove all sentinels.
+ * 3) Remove most of the flags resulting from runtime operations.
+ * 4) Reset timers to their default value.
+ * 5) In the process of doing this undo the failover if in progress.
+ * 6) Disconnect the connections with the master (will reconnect automatically).
+ */
+void sentinelResetMaster(sentinelRedisInstance *ri, int flags) {
+    redisAssert(ri->flags & SRI_MASTER);
+    dictRelease(ri->slaves);
+    dictRelease(ri->sentinels);
+    ri->slaves = dictCreate(&instancesDictType,NULL);
+    ri->sentinels = dictCreate(&instancesDictType,NULL);
+    if (ri->cc) sentinelKillLink(ri,ri->cc);
+    if (ri->pc) sentinelKillLink(ri,ri->pc);
+    ri->flags &= SRI_MASTER|SRI_CAN_FAILOVER|SRI_DISCONNECTED;
+    if (ri->leader) {
+        sdsfree(ri->leader);
+        ri->leader = NULL;
+    }
+    ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
+    ri->failover_state_change_time = 0;
+    ri->failover_start_time = 0;
+    ri->promoted_slave = NULL;
+    sdsfree(ri->runid);
+    sdsfree(ri->slave_master_host);
+    ri->runid = NULL;
+    ri->slave_master_host = NULL;
+    ri->last_avail_time = mstime();
+    ri->last_pong_time = mstime();
+    if (flags & SENTINEL_GENERATE_EVENT)
+        sentinelEvent(REDIS_WARNING,"+reset-master",ri,"%@");
+}
+
+/* Call sentinelResetMaster() on every master with a name matching the specified
+ * pattern. */
+int sentinelResetMastersByPattern(char *pattern, int flags) {
+    dictIterator *di;
+    dictEntry *de;
+    int reset = 0;
+
+    di = dictGetIterator(sentinel.masters);
+    while((de = dictNext(di)) != NULL) {
+        sentinelRedisInstance *ri = dictGetVal(de);
+
+        if (ri->name) {
+            if (stringmatch(pattern,ri->name,0)) {
+                sentinelResetMaster(ri,flags);
+                reset++;
+            }
+        }
+    }
+    dictReleaseIterator(di);
+    return reset;
+}
+
+/* Reset the specified master with sentinelResetMaster(), and also change
+ * the ip:port address, but take the name of the instance unmodified.
+ *
+ * This is used to handle the +switch-master and +redirect-to-master events.
+ *
+ * The function returns REDIS_ERR if the address can't be resolved for some
+ * reason. Otherwise REDIS_OK is returned.
+ *
+ * TODO: make this reset so that original sentinels are re-added with
+ * same ip / port / runid.
+ */
+
+int sentinelResetMasterAndChangeAddress(sentinelRedisInstance *master, char *ip, int port) {
+    sentinelAddr *oldaddr, *newaddr;
+
+    newaddr = createSentinelAddr(ip,port);
+    if (newaddr == NULL) return REDIS_ERR;
+    sentinelResetMaster(master,SENTINEL_NO_FLAGS);
+    oldaddr = master->addr;
+    master->addr = newaddr;
+    /* Release the old address at the end so we are safe even if the function
+     * gets the master->addr->ip and master->addr->port as arguments. */
+    releaseSentinelAddr(oldaddr);
+    return REDIS_OK;
+}
+
+/* ============================ Config handling ============================= */
+char *sentinelHandleConfiguration(char **argv, int argc) {
+    sentinelRedisInstance *ri;
+
+    if (!strcasecmp(argv[0],"monitor") && argc == 5) {
+        /* monitor <name> <host> <port> <quorum> */
+        int quorum = atoi(argv[4]);
+
+        if (quorum <= 0) return "Quorum must be 1 or greater.";
+        if (createSentinelRedisInstance(argv[1],SRI_MASTER,argv[2],
+                                        atoi(argv[3]),quorum,NULL) == NULL)
+        {
+            switch(errno) {
+            case EBUSY: return "Duplicated master name.";
+            case ENOENT: return "Can't resolve master instance hostname.";
+            case EINVAL: return "Invalid port number";
+            }
+        }
+    } else if (!strcasecmp(argv[0],"down-after-milliseconds") && argc == 3) {
+        /* down-after-milliseconds <name> <milliseconds> */
+        ri = sentinelGetMasterByName(argv[1]);
+        if (!ri) return "No such master with specified name.";
+        ri->down_after_period = atoi(argv[2]);
+        if (ri->down_after_period <= 0)
+            return "negative or zero time parameter.";
+    } else if (!strcasecmp(argv[0],"failover-timeout") && argc == 3) {
+        /* failover-timeout <name> <milliseconds> */
+        ri = sentinelGetMasterByName(argv[1]);
+        if (!ri) return "No such master with specified name.";
+        ri->failover_timeout = atoi(argv[2]);
+        if (ri->failover_timeout <= 0)
+            return "negative or zero time parameter.";
+    } else if (!strcasecmp(argv[0],"can-failover") && argc == 3) {
+        /* can-failover <name> <yes/no> */
+        int yesno = yesnotoi(argv[2]);
+
+        ri = sentinelGetMasterByName(argv[1]);
+        if (!ri) return "No such master with specified name.";
+        if (yesno == -1) return "Argument must be either yes or no.";
+        if (yesno)
+            ri->flags |= SRI_CAN_FAILOVER;
+        else
+            ri->flags &= ~SRI_CAN_FAILOVER;
+   } else if (!strcasecmp(argv[0],"parallel-syncs") && argc == 3) {
+        /* parallel-syncs <name> <milliseconds> */
+        ri = sentinelGetMasterByName(argv[1]);
+        if (!ri) return "No such master with specified name.";
+        ri->parallel_syncs = atoi(argv[2]);
+   } else if (!strcasecmp(argv[0],"notification-script") && argc == 3) {
+        /* notification-script <name> <path> */
+        ri = sentinelGetMasterByName(argv[1]);
+        if (!ri) return "No such master with specified name.";
+        if (access(argv[2],X_OK) == -1)
+            return "Notification script seems non existing or non executable.";
+        ri->notification_script = sdsnew(argv[2]);
+   } else if (!strcasecmp(argv[0],"client-reconfig-script") && argc == 3) {
+        /* client-reconfig-script <name> <path> */
+        ri = sentinelGetMasterByName(argv[1]);
+        if (!ri) return "No such master with specified name.";
+        if (access(argv[2],X_OK) == -1)
+            return "Client reconfiguration script seems non existing or "
+                   "non executable.";
+        ri->client_reconfig_script = sdsnew(argv[2]);
+   } else if (!strcasecmp(argv[0],"auth-pass") && argc == 3) {
+        /* auth-pass <name> <password> */
+        ri = sentinelGetMasterByName(argv[1]);
+        if (!ri) return "No such master with specified name.";
+        ri->auth_pass = sdsnew(argv[2]);
+    } else {
+        return "Unrecognized sentinel configuration statement.";
+    }
+    return NULL;
+}
+
+/* ====================== hiredis connection handling ======================= */
+
+/* Completely disconnect an hiredis link from an instance. */
+void sentinelKillLink(sentinelRedisInstance *ri, redisAsyncContext *c) {
+    if (ri->cc == c) {
+        ri->cc = NULL;
+        ri->pending_commands = 0;
+    }
+    if (ri->pc == c) ri->pc = NULL;
+    c->data = NULL;
+    ri->flags |= SRI_DISCONNECTED;
+    redisAsyncFree(c);
+}
+
+/* This function takes an hiredis context that is in an error condition
+ * and make sure to mark the instance as disconnected performing the
+ * cleanup needed.
+ *
+ * Note: we don't free the hiredis context as hiredis will do it for us
+ * for async conenctions. */
+void sentinelDisconnectInstanceFromContext(const redisAsyncContext *c) {
+    sentinelRedisInstance *ri = c->data;
+    int pubsub;
+
+    if (ri == NULL) return; /* The instance no longer exists. */
+
+    pubsub = (ri->pc == c);
+    sentinelEvent(REDIS_DEBUG, pubsub ? "-pubsub-link" : "-cmd-link", ri,
+        "%@ #%s", c->errstr);
+    if (pubsub)
+        ri->pc = NULL;
+    else
+        ri->cc = NULL;
+    ri->flags |= SRI_DISCONNECTED;
+}
+
+void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status) {
+    if (status != REDIS_OK) {
+        sentinelDisconnectInstanceFromContext(c);
+    } else {
+        sentinelRedisInstance *ri = c->data;
+        int pubsub = (ri->pc == c);
+
+        sentinelEvent(REDIS_DEBUG, pubsub ? "+pubsub-link" : "+cmd-link", ri,
+            "%@");
+    }
+}
+
+void sentinelDisconnectCallback(const redisAsyncContext *c, int status) {
+    sentinelDisconnectInstanceFromContext(c);
+}
+
+/* Send the AUTH command with the specified master password if needed.
+ * Note that for slaves the password set for the master is used.
+ *
+ * We don't check at all if the command was successfully transmitted
+ * to the instance as if it fails Sentinel will detect the instance down,
+ * will disconnect and reconnect the link and so forth. */
+void sentinelSendAuthIfNeeded(sentinelRedisInstance *ri, redisAsyncContext *c) {
+    char *auth_pass = (ri->flags & SRI_MASTER) ? ri->auth_pass :
+                                                 ri->master->auth_pass;
+
+    if (auth_pass)
+        redisAsyncCommand(c, sentinelDiscardReplyCallback, NULL, "AUTH %s",
+            auth_pass);
+}
+
+/* Create the async connections for the specified instance if the instance
+ * is disconnected. Note that the SRI_DISCONNECTED flag is set even if just
+ * one of the two links (commands and pub/sub) is missing. */
+void sentinelReconnectInstance(sentinelRedisInstance *ri) {
+    if (!(ri->flags & SRI_DISCONNECTED)) return;
+
+    /* Commands connection. */
+    if (ri->cc == NULL) {
+        ri->cc = redisAsyncConnect(ri->addr->ip,ri->addr->port);
+        if (ri->cc->err) {
+            sentinelEvent(REDIS_DEBUG,"-cmd-link-reconnection",ri,"%@ #%s",
+                ri->cc->errstr);
+            sentinelKillLink(ri,ri->cc);
+        } else {
+            ri->cc_conn_time = mstime();
+            ri->cc->data = ri;
+            redisAeAttach(server.el,ri->cc);
+            redisAsyncSetConnectCallback(ri->cc,
+                                            sentinelLinkEstablishedCallback);
+            redisAsyncSetDisconnectCallback(ri->cc,
+                                            sentinelDisconnectCallback);
+            sentinelSendAuthIfNeeded(ri,ri->cc);
+        }
+    }
+    /* Pub / Sub */
+    if ((ri->flags & SRI_MASTER) && ri->pc == NULL) {
+        ri->pc = redisAsyncConnect(ri->addr->ip,ri->addr->port);
+        if (ri->pc->err) {
+            sentinelEvent(REDIS_DEBUG,"-pubsub-link-reconnection",ri,"%@ #%s",
+                ri->pc->errstr);
+            sentinelKillLink(ri,ri->pc);
+        } else {
+            int retval;
+
+            ri->pc_conn_time = mstime();
+            ri->pc->data = ri;
+            redisAeAttach(server.el,ri->pc);
+            redisAsyncSetConnectCallback(ri->pc,
+                                            sentinelLinkEstablishedCallback);
+            redisAsyncSetDisconnectCallback(ri->pc,
+                                            sentinelDisconnectCallback);
+            sentinelSendAuthIfNeeded(ri,ri->pc);
+            /* Now we subscribe to the Sentinels "Hello" channel. */
+            retval = redisAsyncCommand(ri->pc,
+                sentinelReceiveHelloMessages, NULL, "SUBSCRIBE %s",
+                    SENTINEL_HELLO_CHANNEL);
+            if (retval != REDIS_OK) {
+                /* If we can't subscribe, the Pub/Sub connection is useless
+                 * and we can simply disconnect it and try again. */
+                sentinelKillLink(ri,ri->pc);
+                return;
+            }
+        }
+    }
+    /* Clear the DISCONNECTED flags only if we have both the connections
+     * (or just the commands connection if this is a slave or a
+     * sentinel instance). */
+    if (ri->cc && (ri->flags & (SRI_SLAVE|SRI_SENTINEL) || ri->pc))
+        ri->flags &= ~SRI_DISCONNECTED;
+}
+
+/* ======================== Redis instances pinging  ======================== */
+
+/* Process the INFO output from masters. */
+void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
+    sds *lines;
+    int numlines, j;
+    int role = 0;
+    int runid_changed = 0;  /* true if runid changed. */
+    int first_runid = 0;    /* true if this is the first runid we receive. */
+
+    /* The following fields must be reset to a given value in the case they
+     * are not found at all in the INFO output. */
+    ri->master_link_down_time = 0;
+
+    /* Process line by line. */
+    lines = sdssplitlen(info,strlen(info),"\r\n",2,&numlines);
+    for (j = 0; j < numlines; j++) {
+        sentinelRedisInstance *slave;
+        sds l = lines[j];
+
+        /* run_id:<40 hex chars>*/
+        if (sdslen(l) >= 47 && !memcmp(l,"run_id:",7)) {
+            if (ri->runid == NULL) {
+                ri->runid = sdsnewlen(l+7,40);
+                first_runid = 1;
+            } else {
+                if (strncmp(ri->runid,l+7,40) != 0) {
+                    runid_changed = 1;
+                    sentinelEvent(REDIS_NOTICE,"+reboot",ri,"%@");
+                    sdsfree(ri->runid);
+                    ri->runid = sdsnewlen(l+7,40);
+                }
+            }
+        }
+
+        /* slave0:<ip>,<port>,<state> */
+        if ((ri->flags & SRI_MASTER) &&
+            sdslen(l) >= 7 &&
+            !memcmp(l,"slave",5) && isdigit(l[5]))
+        {
+            char *ip, *port, *end;
+
+            ip = strchr(l,':'); if (!ip) continue;
+            ip++; /* Now ip points to start of ip address. */
+            port = strchr(ip,','); if (!port) continue;
+            *port = '\0'; /* nul term for easy access. */
+            port++; /* Now port points to start of port number. */
+            end = strchr(port,','); if (!end) continue;
+            *end = '\0'; /* nul term for easy access. */
+
+            /* Check if we already have this slave into our table,
+             * otherwise add it. */
+            if (sentinelRedisInstanceLookupSlave(ri,ip,atoi(port)) == NULL) {
+                if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,ip,
+                            atoi(port), ri->quorum,ri)) != NULL)
+                {
+                    sentinelEvent(REDIS_NOTICE,"+slave",slave,"%@");
+                }
+            }
+        }
+
+        /* master_link_down_since_seconds:<seconds> */
+        if (sdslen(l) >= 32 &&
+            !memcmp(l,"master_link_down_since_seconds",30))
+        {
+            ri->master_link_down_time = strtoll(l+31,NULL,10)*1000;
+        }
+
+        /* role:<role> */
+        if (!memcmp(l,"role:master",11)) role = SRI_MASTER;
+        else if (!memcmp(l,"role:slave",10)) role = SRI_SLAVE;
+
+        if (role == SRI_SLAVE) {
+            /* master_host:<host> */
+            if (sdslen(l) >= 12 && !memcmp(l,"master_host:",12)) {
+                sdsfree(ri->slave_master_host);
+                ri->slave_master_host = sdsnew(l+12);
+            }
+
+            /* master_port:<port> */
+            if (sdslen(l) >= 12 && !memcmp(l,"master_port:",12))
+                ri->slave_master_port = atoi(l+12);
+            
+            /* master_link_status:<status> */
+            if (sdslen(l) >= 19 && !memcmp(l,"master_link_status:",19)) {
+                ri->slave_master_link_status =
+                    (strcasecmp(l+19,"up") == 0) ?
+                    SENTINEL_MASTER_LINK_STATUS_UP :
+                    SENTINEL_MASTER_LINK_STATUS_DOWN;
+            }
+
+            /* slave_priority:<priority> */
+            if (sdslen(l) >= 15 && !memcmp(l,"slave_priority:",15))
+                ri->slave_priority = atoi(l+15);
+        }
+    }
+    ri->info_refresh = mstime();
+    sdsfreesplitres(lines,numlines);
+
+    /* ---------------------------- Acting half ----------------------------- */
+    if (sentinel.tilt) return;
+
+    /* Act if a master turned into a slave. */
+    if ((ri->flags & SRI_MASTER) && role == SRI_SLAVE) {
+        if ((first_runid || runid_changed) && ri->slave_master_host) {
+            /* If it is the first time we receive INFO from it, but it's
+             * a slave while it was configured as a master, we want to monitor
+             * its master instead. */
+            sentinelEvent(REDIS_WARNING,"+redirect-to-master",ri,
+                "%s %s %d %s %d",
+                ri->name, ri->addr->ip, ri->addr->port,
+                ri->slave_master_host, ri->slave_master_port);
+            sentinelResetMasterAndChangeAddress(ri,ri->slave_master_host,
+                                                   ri->slave_master_port);
+            return;
+        }
+    }
+
+    /* Act if a slave turned into a master. */
+    if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) {
+        if (!(ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
+            (runid_changed || first_runid))
+        {
+            /* If a slave turned into master but:
+             *
+             * 1) Failover not in progress.
+             * 2) RunID hs changed, or its the first time we see an INFO output.
+             * 
+             * We assume this is a reboot with a wrong configuration.
+             * Log the event and remove the slave. */
+            int retval;
+
+            sentinelEvent(REDIS_WARNING,"-slave-restart-as-master",ri,"%@ #removing it from the attached slaves");
+            retval = dictDelete(ri->master->slaves,ri->name);
+            redisAssert(retval == REDIS_OK);
+            return;
+        } else if (ri->flags & SRI_PROMOTED) {
+            /* If this is a promoted slave we can change state to the
+             * failover state machine. */
+            if ((ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
+                (ri->master->flags & SRI_I_AM_THE_LEADER) &&
+                (ri->master->failover_state ==
+                    SENTINEL_FAILOVER_STATE_WAIT_PROMOTION))
+            {
+                ri->master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
+                ri->master->failover_state_change_time = mstime();
+                sentinelEvent(REDIS_WARNING,"+promoted-slave",ri,"%@");
+                sentinelEvent(REDIS_WARNING,"+failover-state-reconf-slaves",
+                    ri->master,"%@");
+                sentinelCallClientReconfScript(ri->master,SENTINEL_LEADER,
+                    "start",ri->master->addr,ri->addr);
+            }
+        } else if (!(ri->master->flags & SRI_FAILOVER_IN_PROGRESS) ||
+                    ((ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
+                     (ri->master->flags & SRI_I_AM_THE_LEADER) &&
+                     ri->master->failover_state ==
+                     SENTINEL_FAILOVER_STATE_WAIT_START))
+        {
+            /* No failover in progress? Then it is the start of a failover
+             * and we are an observer.
+             *
+             * We also do that if we are a leader doing a failover, in wait
+             * start, but well, somebody else started before us. */
+
+            if (ri->master->flags & SRI_FAILOVER_IN_PROGRESS) {
+                sentinelEvent(REDIS_WARNING,"-failover-abort-race",
+                                ri->master, "%@");
+                sentinelAbortFailover(ri->master);
+            }
+
+            ri->master->flags |= SRI_FAILOVER_IN_PROGRESS;
+            sentinelEvent(REDIS_WARNING,"+failover-detected",ri->master,"%@");
+            ri->master->failover_state = SENTINEL_FAILOVER_STATE_DETECT_END;
+            ri->master->failover_state_change_time = mstime();
+            ri->master->promoted_slave = ri;
+            ri->flags |= SRI_PROMOTED;
+            sentinelCallClientReconfScript(ri->master,SENTINEL_OBSERVER,
+                "start", ri->master->addr,ri->addr);
+            /* We are an observer, so we can only assume that the leader
+             * is reconfiguring the slave instances. For this reason we
+             * set all the instances as RECONF_SENT waiting for progresses
+             * on this side. */
+            sentinelAddFlagsToDictOfRedisInstances(ri->master->slaves,
+                SRI_RECONF_SENT);
+        }
+    }
+
+    /* Detect if the slave that is in the process of being reconfigured
+     * changed state. */
+    if ((ri->flags & SRI_SLAVE) && role == SRI_SLAVE &&
+        (ri->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)))
+    {
+        /* SRI_RECONF_SENT -> SRI_RECONF_INPROG. */
+        if ((ri->flags & SRI_RECONF_SENT) &&
+            ri->slave_master_host &&
+            strcmp(ri->slave_master_host,
+                    ri->master->promoted_slave->addr->ip) == 0 &&
+            ri->slave_master_port == ri->master->promoted_slave->addr->port)
+        {
+            ri->flags &= ~SRI_RECONF_SENT;
+            ri->flags |= SRI_RECONF_INPROG;
+            sentinelEvent(REDIS_NOTICE,"+slave-reconf-inprog",ri,"%@");
+        }
+
+        /* SRI_RECONF_INPROG -> SRI_RECONF_DONE */
+        if ((ri->flags & SRI_RECONF_INPROG) &&
+            ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP)
+        {
+            ri->flags &= ~SRI_RECONF_INPROG;
+            ri->flags |= SRI_RECONF_DONE;
+            sentinelEvent(REDIS_NOTICE,"+slave-reconf-done",ri,"%@");
+            /* If we are moving forward (a new slave is now configured)
+             * we update the change_time as we are conceptually passing
+             * to the next slave. */
+            ri->failover_state_change_time = mstime();
+        }
+    }
+}
+
+void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
+    sentinelRedisInstance *ri = c->data;
+    redisReply *r;
+
+    if (ri) ri->pending_commands--;
+    if (!reply || !ri) return;
+    r = reply;
+
+    if (r->type == REDIS_REPLY_STRING) {
+        sentinelRefreshInstanceInfo(ri,r->str);
+    }
+}
+
+/* Just discard the reply. We use this when we are not monitoring the return
+ * value of the command but its effects directly. */
+void sentinelDiscardReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
+    sentinelRedisInstance *ri = c->data;
+
+    if (ri) ri->pending_commands--;
+}
+
+void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
+    sentinelRedisInstance *ri = c->data;
+    redisReply *r;
+
+    if (ri) ri->pending_commands--;
+    if (!reply || !ri) return;
+    r = reply;
+
+    if (r->type == REDIS_REPLY_STATUS ||
+        r->type == REDIS_REPLY_ERROR) {
+        /* Update the "instance available" field only if this is an
+         * acceptable reply. */
+        if (strncmp(r->str,"PONG",4) == 0 ||
+            strncmp(r->str,"LOADING",7) == 0 ||
+            strncmp(r->str,"MASTERDOWN",10) == 0)
+        {
+            ri->last_avail_time = mstime();
+        } else {
+            /* Send a SCRIPT KILL command if the instance appears to be
+             * down because of a busy script. */
+            if (strncmp(r->str,"BUSY",4) == 0 &&
+                (ri->flags & SRI_S_DOWN) &&
+                !(ri->flags & SRI_SCRIPT_KILL_SENT))
+            {
+                redisAsyncCommand(ri->cc,
+                    sentinelDiscardReplyCallback, NULL, "SCRIPT KILL");
+                ri->flags |= SRI_SCRIPT_KILL_SENT;
+            }
+        }
+    }
+    ri->last_pong_time = mstime();
+}
+
+/* This is called when we get the reply about the PUBLISH command we send
+ * to the master to advertise this sentinel. */
+void sentinelPublishReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
+    sentinelRedisInstance *ri = c->data;
+    redisReply *r;
+
+    if (ri) ri->pending_commands--;
+    if (!reply || !ri) return;
+    r = reply;
+
+    /* Only update pub_time if we actually published our message. Otherwise
+     * we'll retry against in 100 milliseconds. */
+    if (r->type != REDIS_REPLY_ERROR)
+        ri->last_pub_time = mstime();
+}
+
+/* This is our Pub/Sub callback for the Hello channel. It's useful in order
+ * to discover other sentinels attached at the same master. */
+void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata) {
+    sentinelRedisInstance *ri = c->data;
+    redisReply *r;
+
+    if (!reply || !ri) return;
+    r = reply;
+
+    /* Update the last activity in the pubsub channel. Note that since we
+     * receive our messages as well this timestamp can be used to detect
+     * if the link is probably diconnected even if it seems otherwise. */
+    ri->pc_last_activity = mstime();
+   
+    /* Sanity check in the reply we expect, so that the code that follows
+     * can avoid to check for details. */
+    if (r->type != REDIS_REPLY_ARRAY ||
+        r->elements != 3 ||
+        r->element[0]->type != REDIS_REPLY_STRING ||
+        r->element[1]->type != REDIS_REPLY_STRING ||
+        r->element[2]->type != REDIS_REPLY_STRING ||
+        strcmp(r->element[0]->str,"message") != 0) return;
+
+    /* We are not interested in meeting ourselves */
+    if (strstr(r->element[2]->str,server.runid) != NULL) return;
+
+    {
+        int numtokens, port, removed, canfailover;
+        char **token = sdssplitlen(r->element[2]->str,
+                                   r->element[2]->len,
+                                   ":",1,&numtokens);
+        sentinelRedisInstance *sentinel;
+
+        if (numtokens == 4) {
+            /* First, try to see if we already have this sentinel. */
+            port = atoi(token[1]);
+            canfailover = atoi(token[3]);
+            sentinel = getSentinelRedisInstanceByAddrAndRunID(
+                            ri->sentinels,token[0],port,token[2]);
+
+            if (!sentinel) {
+                /* If not, remove all the sentinels that have the same runid
+                 * OR the same ip/port, because it's either a restart or a
+                 * network topology change. */
+                removed = removeMatchingSentinelsFromMaster(ri,token[0],port,
+                                token[2]);
+                if (removed) {
+                    sentinelEvent(REDIS_NOTICE,"-dup-sentinel",ri,
+                        "%@ #duplicate of %s:%d or %s",
+                        token[0],port,token[2]);
+                }
+
+                /* Add the new sentinel. */
+                sentinel = createSentinelRedisInstance(NULL,SRI_SENTINEL,
+                                token[0],port,ri->quorum,ri);
+                if (sentinel) {
+                    sentinelEvent(REDIS_NOTICE,"+sentinel",sentinel,"%@");
+                    /* The runid is NULL after a new instance creation and
+                     * for Sentinels we don't have a later chance to fill it,
+                     * so do it now. */
+                    sentinel->runid = sdsnew(token[2]);
+                }
+            }
+
+            /* Update the state of the Sentinel. */
+            if (sentinel) {
+                sentinel->last_hello_time = mstime();
+                if (canfailover)
+                    sentinel->flags |= SRI_CAN_FAILOVER;
+                else
+                    sentinel->flags &= ~SRI_CAN_FAILOVER;
+            }
+        }
+        sdsfreesplitres(token,numtokens);
+    }
+}
+
+void sentinelPingInstance(sentinelRedisInstance *ri) {
+    mstime_t now = mstime();
+    mstime_t info_period;
+    int retval;
+
+    /* Return ASAP if we have already a PING or INFO already pending, or
+     * in the case the instance is not properly connected. */
+    if (ri->flags & SRI_DISCONNECTED) return;
+
+    /* For INFO, PING, PUBLISH that are not critical commands to send we
+     * also have a limit of SENTINEL_MAX_PENDING_COMMANDS. We don't
+     * want to use a lot of memory just because a link is not working
+     * properly (note that anyway there is a redundant protection about this,
+     * that is, the link will be disconnected and reconnected if a long
+     * timeout condition is detected. */
+    if (ri->pending_commands >= SENTINEL_MAX_PENDING_COMMANDS) return;
+
+    /* If this is a slave of a master in O_DOWN condition we start sending
+     * it INFO every second, instead of the usual SENTINEL_INFO_PERIOD
+     * period. In this state we want to closely monitor slaves in case they
+     * are turned into masters by another Sentinel, or by the sysadmin. */
+    if ((ri->flags & SRI_SLAVE) &&
+        (ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) {
+        info_period = 1000;
+    } else {
+        info_period = SENTINEL_INFO_PERIOD;
+    }
+
+    if ((ri->flags & SRI_SENTINEL) == 0 &&
+        (ri->info_refresh == 0 ||
+        (now - ri->info_refresh) > info_period))
+    {
+        /* Send INFO to masters and slaves, not sentinels. */
+        retval = redisAsyncCommand(ri->cc,
+            sentinelInfoReplyCallback, NULL, "INFO");
+        if (retval != REDIS_OK) return;
+        ri->pending_commands++;
+    } else if ((now - ri->last_pong_time) > SENTINEL_PING_PERIOD) {
+        /* Send PING to all the three kinds of instances. */
+        retval = redisAsyncCommand(ri->cc,
+            sentinelPingReplyCallback, NULL, "PING");
+        if (retval != REDIS_OK) return;
+        ri->pending_commands++;
+    } else if ((ri->flags & SRI_MASTER) &&
+               (now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD)
+    {
+        /* PUBLISH hello messages only to masters. */
+        struct sockaddr_in sa;
+        socklen_t salen = sizeof(sa);
+
+        if (getsockname(ri->cc->c.fd,(struct sockaddr*)&sa,&salen) != -1) {
+            char myaddr[128];
+
+            snprintf(myaddr,sizeof(myaddr),"%s:%d:%s:%d",
+                inet_ntoa(sa.sin_addr), server.port, server.runid,
+                (ri->flags & SRI_CAN_FAILOVER) != 0);
+            retval = redisAsyncCommand(ri->cc,
+                sentinelPublishReplyCallback, NULL, "PUBLISH %s %s",
+                    SENTINEL_HELLO_CHANNEL,myaddr);
+            if (retval != REDIS_OK) return;
+            ri->pending_commands++;
+        }
+    }
+}
+
+/* =========================== SENTINEL command ============================= */
+
+const char *sentinelFailoverStateStr(int state) {
+    switch(state) {
+    case SENTINEL_FAILOVER_STATE_NONE: return "none";
+    case SENTINEL_FAILOVER_STATE_WAIT_START: return "wait_start";
+    case SENTINEL_FAILOVER_STATE_SELECT_SLAVE: return "select_slave";
+    case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE: return "send_slaveof_noone";
+    case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION: return "wait_promotion";
+    case SENTINEL_FAILOVER_STATE_RECONF_SLAVES: return "reconf_slaves";
+    case SENTINEL_FAILOVER_STATE_ALERT_CLIENTS: return "alert_clients";
+    case SENTINEL_FAILOVER_STATE_DETECT_END: return "detect_end";
+    case SENTINEL_FAILOVER_STATE_UPDATE_CONFIG: return "update_config";
+    default: return "unknown";
+    }
+}
+
+/* Redis instance to Redis protocol representation. */
+void addReplySentinelRedisInstance(redisClient *c, sentinelRedisInstance *ri) {
+    char *flags = sdsempty();
+    void *mbl;
+    int fields = 0;
+
+    mbl = addDeferredMultiBulkLength(c);
+
+    addReplyBulkCString(c,"name");
+    addReplyBulkCString(c,ri->name);
+    fields++;
+
+    addReplyBulkCString(c,"ip");
+    addReplyBulkCString(c,ri->addr->ip);
+    fields++;
+
+    addReplyBulkCString(c,"port");
+    addReplyBulkLongLong(c,ri->addr->port);
+    fields++;
+
+    addReplyBulkCString(c,"runid");
+    addReplyBulkCString(c,ri->runid ? ri->runid : "");
+    fields++;
+
+    addReplyBulkCString(c,"flags");
+    if (ri->flags & SRI_S_DOWN) flags = sdscat(flags,"s_down,");
+    if (ri->flags & SRI_O_DOWN) flags = sdscat(flags,"o_down,");
+    if (ri->flags & SRI_MASTER) flags = sdscat(flags,"master,");
+    if (ri->flags & SRI_SLAVE) flags = sdscat(flags,"slave,");
+    if (ri->flags & SRI_SENTINEL) flags = sdscat(flags,"sentinel,");
+    if (ri->flags & SRI_DISCONNECTED) flags = sdscat(flags,"disconnected,");
+    if (ri->flags & SRI_MASTER_DOWN) flags = sdscat(flags,"master_down,");
+    if (ri->flags & SRI_FAILOVER_IN_PROGRESS)
+        flags = sdscat(flags,"failover_in_progress,");
+    if (ri->flags & SRI_I_AM_THE_LEADER)
+        flags = sdscat(flags,"i_am_the_leader,");
+    if (ri->flags & SRI_PROMOTED) flags = sdscat(flags,"promoted,");
+    if (ri->flags & SRI_RECONF_SENT) flags = sdscat(flags,"reconf_sent,");
+    if (ri->flags & SRI_RECONF_INPROG) flags = sdscat(flags,"reconf_inprog,");
+    if (ri->flags & SRI_RECONF_DONE) flags = sdscat(flags,"reconf_done,");
+
+    if (sdslen(flags) != 0) flags = sdsrange(flags,0,-2); /* remove last "," */
+    addReplyBulkCString(c,flags);
+    sdsfree(flags);
+    fields++;
+
+    addReplyBulkCString(c,"pending-commands");
+    addReplyBulkLongLong(c,ri->pending_commands);
+    fields++;
+
+    if (ri->flags & SRI_FAILOVER_IN_PROGRESS) {
+        addReplyBulkCString(c,"failover-state");
+        addReplyBulkCString(c,(char*)sentinelFailoverStateStr(ri->failover_state));
+        fields++;
+    }
+
+    addReplyBulkCString(c,"last-ok-ping-reply");
+    addReplyBulkLongLong(c,mstime() - ri->last_avail_time);
+    fields++;
+
+    addReplyBulkCString(c,"last-ping-reply");
+    addReplyBulkLongLong(c,mstime() - ri->last_pong_time);
+    fields++;
+
+    if (ri->flags & SRI_S_DOWN) {
+        addReplyBulkCString(c,"s-down-time");
+        addReplyBulkLongLong(c,mstime()-ri->s_down_since_time);
+        fields++;
+    }
+
+    if (ri->flags & SRI_O_DOWN) {
+        addReplyBulkCString(c,"o-down-time");
+        addReplyBulkLongLong(c,mstime()-ri->o_down_since_time);
+        fields++;
+    }
+
+    /* Masters and Slaves */
+    if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
+        addReplyBulkCString(c,"info-refresh");
+        addReplyBulkLongLong(c,mstime() - ri->info_refresh);
+        fields++;
+    }
+
+    /* Only masters */
+    if (ri->flags & SRI_MASTER) {
+        addReplyBulkCString(c,"num-slaves");
+        addReplyBulkLongLong(c,dictSize(ri->slaves));
+        fields++;
+
+        addReplyBulkCString(c,"num-other-sentinels");
+        addReplyBulkLongLong(c,dictSize(ri->sentinels));
+        fields++;
+
+        addReplyBulkCString(c,"quorum");
+        addReplyBulkLongLong(c,ri->quorum);
+        fields++;
+    }
+
+    /* Only slaves */
+    if (ri->flags & SRI_SLAVE) {
+        addReplyBulkCString(c,"master-link-down-time");
+        addReplyBulkLongLong(c,ri->master_link_down_time);
+        fields++;
+
+        addReplyBulkCString(c,"master-link-status");
+        addReplyBulkCString(c,
+            (ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP) ?
+            "ok" : "err");
+        fields++;
+
+        addReplyBulkCString(c,"master-host");
+        addReplyBulkCString(c,
+            ri->slave_master_host ? ri->slave_master_host : "?");
+        fields++;
+
+        addReplyBulkCString(c,"master-port");
+        addReplyBulkLongLong(c,ri->slave_master_port);
+        fields++;
+
+        addReplyBulkCString(c,"slave-priority");
+        addReplyBulkLongLong(c,ri->slave_priority);
+        fields++;
+    }
+
+    /* Only sentinels */
+    if (ri->flags & SRI_SENTINEL) {
+        addReplyBulkCString(c,"last-hello-message");
+        addReplyBulkLongLong(c,mstime() - ri->last_hello_time);
+        fields++;
+
+        addReplyBulkCString(c,"can-failover-its-master");
+        addReplyBulkLongLong(c,(ri->flags & SRI_CAN_FAILOVER) != 0);
+        fields++;
+
+        if (ri->flags & SRI_MASTER_DOWN) {
+            addReplyBulkCString(c,"subjective-leader");
+            addReplyBulkCString(c,ri->leader ? ri->leader : "?");
+            fields++;
+        }
+    }
+
+    setDeferredMultiBulkLength(c,mbl,fields*2);
+}
+
+/* Output a number of instances contanined inside a dictionary as
+ * Redis protocol. */
+void addReplyDictOfRedisInstances(redisClient *c, dict *instances) {
+    dictIterator *di;
+    dictEntry *de;
+
+    di = dictGetIterator(instances);
+    addReplyMultiBulkLen(c,dictSize(instances));
+    while((de = dictNext(di)) != NULL) {
+        sentinelRedisInstance *ri = dictGetVal(de);
+
+        addReplySentinelRedisInstance(c,ri);
+    }
+    dictReleaseIterator(di);
+}
+
+/* Lookup the named master into sentinel.masters.
+ * If the master is not found reply to the client with an error and returns
+ * NULL. */
+sentinelRedisInstance *sentinelGetMasterByNameOrReplyError(redisClient *c,
+                        robj *name)
+{
+    sentinelRedisInstance *ri;
+
+    ri = dictFetchValue(sentinel.masters,c->argv[2]->ptr);
+    if (!ri) {
+        addReplyError(c,"No such master with that name");
+        return NULL;
+    }
+    return ri;
+}
+
+void sentinelCommand(redisClient *c) {
+    if (!strcasecmp(c->argv[1]->ptr,"masters")) {
+        /* SENTINEL MASTERS */
+        if (c->argc != 2) goto numargserr;
+
+        addReplyDictOfRedisInstances(c,sentinel.masters);
+    } else if (!strcasecmp(c->argv[1]->ptr,"slaves")) {
+        /* SENTINEL SLAVES <master-name> */
+        sentinelRedisInstance *ri;
+
+        if (c->argc != 3) goto numargserr;
+        if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
+            return;
+        addReplyDictOfRedisInstances(c,ri->slaves);
+    } else if (!strcasecmp(c->argv[1]->ptr,"sentinels")) {
+        /* SENTINEL SENTINELS <master-name> */
+        sentinelRedisInstance *ri;
+
+        if (c->argc != 3) goto numargserr;
+        if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
+            return;
+        addReplyDictOfRedisInstances(c,ri->sentinels);
+    } else if (!strcasecmp(c->argv[1]->ptr,"is-master-down-by-addr")) {
+        /* SENTINEL IS-MASTER-DOWN-BY-ADDR <ip> <port> */
+        sentinelRedisInstance *ri;
+        char *leader = NULL;
+        long port;
+        int isdown = 0;
+
+        if (c->argc != 4) goto numargserr;
+        if (getLongFromObjectOrReply(c,c->argv[3],&port,NULL) != REDIS_OK)
+            return;
+        ri = getSentinelRedisInstanceByAddrAndRunID(sentinel.masters,
+            c->argv[2]->ptr,port,NULL);
+
+        /* It exists? Is actually a master? Is subjectively down? It's down.
+         * Note: if we are in tilt mode we always reply with "0". */
+        if (!sentinel.tilt && ri && (ri->flags & SRI_S_DOWN) &&
+                                    (ri->flags & SRI_MASTER))
+            isdown = 1;
+        if (ri) leader = sentinelGetSubjectiveLeader(ri);
+
+        /* Reply with a two-elements multi-bulk reply: down state, leader. */
+        addReplyMultiBulkLen(c,2);
+        addReply(c, isdown ? shared.cone : shared.czero);
+        addReplyBulkCString(c, leader ? leader : "?");
+        if (leader) sdsfree(leader);
+    } else if (!strcasecmp(c->argv[1]->ptr,"reset")) {
+        /* SENTINEL RESET <pattern> */
+        if (c->argc != 3) goto numargserr;
+        addReplyLongLong(c,sentinelResetMastersByPattern(c->argv[2]->ptr,SENTINEL_GENERATE_EVENT));
+    } else if (!strcasecmp(c->argv[1]->ptr,"get-master-addr-by-name")) {
+        /* SENTINEL GET-MASTER-ADDR-BY-NAME <master-name> */
+        sentinelRedisInstance *ri;
+
+        if (c->argc != 3) goto numargserr;
+        ri = sentinelGetMasterByName(c->argv[2]->ptr);
+        if (ri == NULL) {
+            addReply(c,shared.nullmultibulk);
+        } else if (ri->info_refresh == 0) {
+            addReplySds(c,sdsnew("-IDONTKNOW I have not enough information to reply. Please ask another Sentinel.\r\n"));
+        } else {
+            sentinelAddr *addr = ri->addr;
+
+            if ((ri->flags & SRI_FAILOVER_IN_PROGRESS) && ri->promoted_slave)
+                addr = ri->promoted_slave->addr;
+            addReplyMultiBulkLen(c,2);
+            addReplyBulkCString(c,addr->ip);
+            addReplyBulkLongLong(c,addr->port);
+        }
+    } else if (!strcasecmp(c->argv[1]->ptr,"failover")) {
+        /* SENTINEL FAILOVER <master-name> */
+        sentinelRedisInstance *ri;
+
+        if (c->argc != 3) goto numargserr;
+        if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
+            return;
+        if (ri->flags & SRI_FAILOVER_IN_PROGRESS) {
+            addReplySds(c,sdsnew("-INPROG Failover already in progress\r\n"));
+            return;
+        }
+        if (sentinelSelectSlave(ri) == NULL) {
+            addReplySds(c,sdsnew("-NOGOODSLAVE No suitable slave to promote\r\n"));
+            return;
+        }
+        sentinelStartFailover(ri,SENTINEL_FAILOVER_STATE_WAIT_START);
+        ri->flags |= SRI_FORCE_FAILOVER;
+        addReply(c,shared.ok);
+    } else if (!strcasecmp(c->argv[1]->ptr,"pending-scripts")) {
+        /* SENTINEL PENDING-SCRIPTS */
+
+        if (c->argc != 2) goto numargserr;
+        sentinelPendingScriptsCommand(c);
+    } else {
+        addReplyErrorFormat(c,"Unknown sentinel subcommand '%s'",
+                               (char*)c->argv[1]->ptr);
+    }
+    return;
+
+numargserr:
+    addReplyErrorFormat(c,"Wrong number of commands for 'sentinel %s'",
+                          (char*)c->argv[1]->ptr);
+}
+
+void sentinelInfoCommand(redisClient *c) {
+    char *section = c->argc == 2 ? c->argv[1]->ptr : "default";
+    sds info = sdsempty();
+    int defsections = !strcasecmp(section,"default");
+    int sections = 0;
+
+    if (c->argc > 2) {
+        addReply(c,shared.syntaxerr);
+        return;
+    }
+
+    if (!strcasecmp(section,"server") || defsections) {
+        if (sections++) info = sdscat(info,"\r\n");
+        sds serversection = genRedisInfoString("server");
+        info = sdscatlen(info,serversection,sdslen(serversection));
+        sdsfree(serversection);
+    }
+
+    if (!strcasecmp(section,"sentinel") || defsections) {
+        dictIterator *di;
+        dictEntry *de;
+        int master_id = 0;
+
+        if (sections++) info = sdscat(info,"\r\n");
+        info = sdscatprintf(info,
+            "# Sentinel\r\n"
+            "sentinel_masters:%lu\r\n"
+            "sentinel_tilt:%d\r\n"
+            "sentinel_running_scripts:%d\r\n"
+            "sentinel_scripts_queue_length:%ld\r\n",
+            dictSize(sentinel.masters),
+            sentinel.tilt,
+            sentinel.running_scripts,
+            listLength(sentinel.scripts_queue));
+
+        di = dictGetIterator(sentinel.masters);
+        while((de = dictNext(di)) != NULL) {
+            sentinelRedisInstance *ri = dictGetVal(de);
+            char *status = "ok";
+
+            if (ri->flags & SRI_O_DOWN) status = "odown";
+            else if (ri->flags & SRI_S_DOWN) status = "sdown";
+            info = sdscatprintf(info,
+                "master%d:name=%s,status=%s,address=%s:%d,"
+                "slaves=%lu,sentinels=%lu\r\n",
+                master_id++, ri->name, status,
+                ri->addr->ip, ri->addr->port,
+                dictSize(ri->slaves),
+                dictSize(ri->sentinels)+1);
+        }
+        dictReleaseIterator(di);
+    }
+
+    addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
+        (unsigned long)sdslen(info)));
+    addReplySds(c,info);
+    addReply(c,shared.crlf);
+}
+
+/* ===================== SENTINEL availability checks ======================= */
+
+/* Is this instance down from our point of view? */
+void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
+    mstime_t elapsed = mstime() - ri->last_avail_time;
+
+    /* Check if we are in need for a reconnection of one of the 
+     * links, because we are detecting low activity.
+     *
+     * 1) Check if the command link seems connected, was connected not less
+     *    than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have an
+     *    idle time that is greater than down_after_period / 2 seconds. */
+    if (ri->cc &&
+        (mstime() - ri->cc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
+        (mstime() - ri->last_pong_time) > (ri->down_after_period/2))
+    {
+        sentinelKillLink(ri,ri->cc);
+    }
+
+    /* 2) Check if the pubsub link seems connected, was connected not less
+     *    than SENTINEL_MIN_LINK_RECONNECT_PERIOD, but still we have no
+     *    activity in the Pub/Sub channel for more than
+     *    SENTINEL_PUBLISH_PERIOD * 3.
+     */
+    if (ri->pc &&
+        (mstime() - ri->pc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
+        (mstime() - ri->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3))
+    {
+        sentinelKillLink(ri,ri->pc);
+    }
+
+    /* Update the subjectively down flag. */
+    if (elapsed > ri->down_after_period) {
+        /* Is subjectively down */
+        if ((ri->flags & SRI_S_DOWN) == 0) {
+            sentinelEvent(REDIS_WARNING,"+sdown",ri,"%@");
+            ri->s_down_since_time = mstime();
+            ri->flags |= SRI_S_DOWN;
+        }
+    } else {
+        /* Is subjectively up */
+        if (ri->flags & SRI_S_DOWN) {
+            sentinelEvent(REDIS_WARNING,"-sdown",ri,"%@");
+            ri->flags &= ~(SRI_S_DOWN|SRI_SCRIPT_KILL_SENT);
+        }
+    }
+}
+
+/* Is this instance down accordingly to the configured quorum? */
+void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
+    dictIterator *di;
+    dictEntry *de;
+    int quorum = 0, odown = 0;
+
+    if (master->flags & SRI_S_DOWN) {
+        /* Is down for enough sentinels? */
+        quorum = 1; /* the current sentinel. */
+        /* Count all the other sentinels. */
+        di = dictGetIterator(master->sentinels);
+        while((de = dictNext(di)) != NULL) {
+            sentinelRedisInstance *ri = dictGetVal(de);
+
+            if (ri->flags & SRI_MASTER_DOWN) quorum++;
+        }
+        dictReleaseIterator(di);
+        if (quorum >= master->quorum) odown = 1;
+    }
+
+    /* Set the flag accordingly to the outcome. */
+    if (odown) {
+        if ((master->flags & SRI_O_DOWN) == 0) {
+            sentinelEvent(REDIS_WARNING,"+odown",master,"%@ #quorum %d/%d",
+                quorum, master->quorum);
+            master->flags |= SRI_O_DOWN;
+            master->o_down_since_time = mstime();
+        }
+    } else {
+        if (master->flags & SRI_O_DOWN) {
+            sentinelEvent(REDIS_WARNING,"-odown",master,"%@");
+            master->flags &= ~SRI_O_DOWN;
+        }
+    }
+}
+
+/* Receive the SENTINEL is-master-down-by-addr reply, see the
+ * sentinelAskMasterStateToOtherSentinels() function for more information. */
+void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) {
+    sentinelRedisInstance *ri = c->data;
+    redisReply *r;
+
+    if (ri) ri->pending_commands--;
+    if (!reply || !ri) return;
+    r = reply;
+
+    /* Ignore every error or unexpected reply.
+     * Note that if the command returns an error for any reason we'll
+     * end clearing the SRI_MASTER_DOWN flag for timeout anyway. */
+    if (r->type == REDIS_REPLY_ARRAY && r->elements == 2 &&
+        r->element[0]->type == REDIS_REPLY_INTEGER &&
+        r->element[1]->type == REDIS_REPLY_STRING)
+    {
+        ri->last_master_down_reply_time = mstime();
+        if (r->element[0]->integer == 1) {
+            ri->flags |= SRI_MASTER_DOWN;
+        } else {
+            ri->flags &= ~SRI_MASTER_DOWN;
+        }
+        sdsfree(ri->leader);
+        ri->leader = sdsnew(r->element[1]->str);
+    }
+}
+
+/* If we think (subjectively) the master is down, we start sending
+ * SENTINEL IS-MASTER-DOWN-BY-ADDR requests to other sentinels
+ * in order to get the replies that allow to reach the quorum and
+ * possibly also mark the master as objectively down. */
+void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master) {
+    dictIterator *di;
+    dictEntry *de;
+
+    di = dictGetIterator(master->sentinels);
+    while((de = dictNext(di)) != NULL) {
+        sentinelRedisInstance *ri = dictGetVal(de);
+        mstime_t elapsed = mstime() - ri->last_master_down_reply_time;
+        char port[32];
+        int retval;
+
+        /* If the master state from other sentinel is too old, we clear it. */
+        if (elapsed > SENTINEL_INFO_VALIDITY_TIME) {
+            ri->flags &= ~SRI_MASTER_DOWN;
+            sdsfree(ri->leader);
+            ri->leader = NULL;
+        }
+
+        /* Only ask if master is down to other sentinels if:
+         *
+         * 1) We believe it is down, or there is a failover in progress.
+         * 2) Sentinel is connected.
+         * 3) We did not received the info within SENTINEL_ASK_PERIOD ms. */
+        if ((master->flags & (SRI_S_DOWN|SRI_FAILOVER_IN_PROGRESS)) == 0)
+            continue;
+        if (ri->flags & SRI_DISCONNECTED) continue;
+        if (mstime() - ri->last_master_down_reply_time < SENTINEL_ASK_PERIOD)
+            continue;
+
+        /* Ask */
+        ll2string(port,sizeof(port),master->addr->port);
+        retval = redisAsyncCommand(ri->cc,
+                    sentinelReceiveIsMasterDownReply, NULL,
+                    "SENTINEL is-master-down-by-addr %s %s",
+                    master->addr->ip, port);
+        if (retval == REDIS_OK) ri->pending_commands++;
+    }
+    dictReleaseIterator(di);
+}
+
+/* =============================== FAILOVER ================================= */
+
+/* Given a master get the "subjective leader", that is, among all the sentinels
+ * with given characteristics, the one with the lexicographically smaller
+ * runid. The characteristics required are:
+ *
+ * 1) Has SRI_CAN_FAILOVER flag.
+ * 2) Is not disconnected.
+ * 3) Recently answered to our ping (no longer than
+ *    SENTINEL_INFO_VALIDITY_TIME milliseconds ago).
+ *
+ * The function returns a pointer to an sds string representing the runid of the
+ * leader sentinel instance (from our point of view). Otherwise NULL is
+ * returned if there are no suitable sentinels.
+ */
+
+int compareRunID(const void *a, const void *b) {
+    char **aptrptr = (char**)a, **bptrptr = (char**)b;
+    return strcasecmp(*aptrptr, *bptrptr);
+}
+
+char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master) {
+    dictIterator *di;
+    dictEntry *de;
+    char **instance =
+        zmalloc(sizeof(char*)*(dictSize(master->sentinels)+1));
+    int instances = 0;
+    char *leader = NULL;
+
+    if (master->flags & SRI_CAN_FAILOVER) {
+        /* Add myself if I'm a Sentinel that can failover this master. */
+        instance[instances++] = server.runid;
+    }
+
+    di = dictGetIterator(master->sentinels);
+    while((de = dictNext(di)) != NULL) {
+        sentinelRedisInstance *ri = dictGetVal(de);
+        mstime_t lag = mstime() - ri->last_avail_time;
+
+        if (lag > SENTINEL_INFO_VALIDITY_TIME ||
+            !(ri->flags & SRI_CAN_FAILOVER) ||
+            (ri->flags & SRI_DISCONNECTED) ||
+            ri->runid == NULL)
+            continue;
+        instance[instances++] = ri->runid;
+    }
+    dictReleaseIterator(di);
+
+    /* If we have at least one instance passing our checks, order the array
+     * by runid. */
+    if (instances) {
+        qsort(instance,instances,sizeof(char*),compareRunID);
+        leader = sdsnew(instance[0]);
+    }
+    zfree(instance);
+    return leader;
+}
+
+struct sentinelLeader {
+    char *runid;
+    unsigned long votes;
+};
+
+/* Helper function for sentinelGetObjectiveLeader, increment the counter
+ * relative to the specified runid. */
+void sentinelObjectiveLeaderIncr(dict *counters, char *runid) {
+    dictEntry *de = dictFind(counters,runid);
+    uint64_t oldval;
+
+    if (de) {
+        oldval = dictGetUnsignedIntegerVal(de);
+        dictSetUnsignedIntegerVal(de,oldval+1);
+    } else {
+        de = dictAddRaw(counters,runid);
+        redisAssert(de != NULL);
+        dictSetUnsignedIntegerVal(de,1);
+    }
+}
+
+/* Scan all the Sentinels attached to this master to check what is the
+ * most voted leader among Sentinels. */
+char *sentinelGetObjectiveLeader(sentinelRedisInstance *master) {
+    dict *counters;
+    dictIterator *di;
+    dictEntry *de;
+    unsigned int voters = 0, voters_quorum;
+    char *myvote;
+    char *winner = NULL;
+
+    redisAssert(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS));
+    counters = dictCreate(&leaderVotesDictType,NULL);
+
+    /* Count my vote. */
+    myvote = sentinelGetSubjectiveLeader(master);
+    if (myvote) {
+        sentinelObjectiveLeaderIncr(counters,myvote);
+        voters++;
+    }
+
+    /* Count other sentinels votes */
+    di = dictGetIterator(master->sentinels);
+    while((de = dictNext(di)) != NULL) {
+        sentinelRedisInstance *ri = dictGetVal(de);
+        if (ri->leader == NULL) continue;
+        /* If the failover is not already in progress we are only interested
+         * in Sentinels that believe the master is down. Otherwise the leader
+         * selection is useful for the "failover-takedown" when the original
+         * leader fails. In that case we consider all the voters. */
+        if (!(master->flags & SRI_FAILOVER_IN_PROGRESS) &&
+            !(ri->flags & SRI_MASTER_DOWN)) continue;
+        sentinelObjectiveLeaderIncr(counters,ri->leader);
+        voters++;
+    }
+    dictReleaseIterator(di);
+    voters_quorum = voters/2+1;
+
+    /* Check what's the winner. For the winner to win, it needs two conditions:
+     * 1) Absolute majority between voters (50% + 1).
+     * 2) And anyway at least master->quorum votes. */
+    {
+        uint64_t max_votes = 0; /* Max votes so far. */
+
+        di = dictGetIterator(counters);
+        while((de = dictNext(di)) != NULL) {
+            uint64_t votes = dictGetUnsignedIntegerVal(de);
+
+            if (max_votes < votes) {
+                max_votes = votes;
+                winner = dictGetKey(de);
+            }
+        }
+        dictReleaseIterator(di);
+        if (winner && (max_votes < voters_quorum || max_votes < master->quorum))
+            winner = NULL;
+    }
+    winner = winner ? sdsnew(winner) : NULL;
+    sdsfree(myvote);
+    dictRelease(counters);
+    return winner;
+}
+
+/* Setup the master state to start a failover as a leader.
+ *
+ * State can be either:
+ *
+ * SENTINEL_FAILOVER_STATE_WAIT_START: starts a failover from scratch.
+ * SENTINEL_FAILOVER_STATE_RECONF_SLAVES: takedown a failed failover.
+ */
+void sentinelStartFailover(sentinelRedisInstance *master, int state) {
+    redisAssert(master->flags & SRI_MASTER);
+    redisAssert(state == SENTINEL_FAILOVER_STATE_WAIT_START ||
+                state == SENTINEL_FAILOVER_STATE_RECONF_SLAVES);
+
+    master->failover_state = state;
+    master->flags |= SRI_FAILOVER_IN_PROGRESS|SRI_I_AM_THE_LEADER;
+    sentinelEvent(REDIS_WARNING,"+failover-triggered",master,"%@");
+
+    /* Pick a random delay if it's a fresh failover (WAIT_START), and not
+     * a recovery of a failover started by another sentinel. */
+    if (master->failover_state == SENTINEL_FAILOVER_STATE_WAIT_START) {
+        master->failover_start_time = mstime() +
+            SENTINEL_FAILOVER_FIXED_DELAY +
+            (rand() % SENTINEL_FAILOVER_MAX_RANDOM_DELAY);
+        sentinelEvent(REDIS_WARNING,"+failover-state-wait-start",master,
+            "%@ #starting in %lld milliseconds",
+            master->failover_start_time-mstime());
+    }
+    master->failover_state_change_time = mstime();
+}
+
+/* This function checks if there are the conditions to start the failover,
+ * that is:
+ *
+ * 1) Enough time has passed since O_DOWN.
+ * 2) The master is marked as SRI_CAN_FAILOVER, so we can failover it.
+ * 3) We are the objectively leader for this master.
+ *
+ * If the conditions are met we flag the master as SRI_FAILOVER_IN_PROGRESS
+ * and SRI_I_AM_THE_LEADER.
+ */
+void sentinelStartFailoverIfNeeded(sentinelRedisInstance *master) {
+    char *leader;
+    int isleader;
+
+    /* We can't failover if the master is not in O_DOWN state or if
+     * there is not already a failover in progress (to perform the
+     * takedown if the leader died) or if this Sentinel is not allowed
+     * to start a failover. */
+    if (!(master->flags & SRI_CAN_FAILOVER) ||
+        !(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS))) return;
+
+    leader = sentinelGetObjectiveLeader(master);
+    isleader = leader && strcasecmp(leader,server.runid) == 0;
+    sdsfree(leader);
+
+    /* If I'm not the leader, I can't failover for sure. */
+    if (!isleader) return;
+
+    /* If the failover is already in progress there are two options... */
+    if (master->flags & SRI_FAILOVER_IN_PROGRESS) {
+        if (master->flags & SRI_I_AM_THE_LEADER) {
+            /* 1) I'm flagged as leader so I already started the failover.
+             *    Just return. */
+            return;
+        } else {
+            mstime_t elapsed = mstime() - master->failover_state_change_time;
+
+            /* 2) I'm the new leader, but I'm not flagged as leader in the
+             *    master: I did not started the failover, but the original
+             *    leader has no longer the leadership.
+             *
+             *    In this case if the failover appears to be lagging
+             *    for at least 25% of the configured failover timeout,
+             *    I can assume I can take control. Otherwise
+             *    it's better to return and wait more. */
+            if (elapsed < (master->failover_timeout/4)) return;
+            sentinelEvent(REDIS_WARNING,"+failover-takedown",master,"%@");
+            /* We have already an elected slave if we are in
+             * FAILOVER_IN_PROGRESS state, that is, the slave that we
+             * observed turning into a master. */
+            sentinelStartFailover(master,SENTINEL_FAILOVER_STATE_RECONF_SLAVES);
+            /* As an observer we flagged all the slaves as RECONF_SENT but
+             * now we are in charge of actually sending the reconfiguration
+             * command so let's clear this flag for all the instances. */
+            sentinelDelFlagsToDictOfRedisInstances(master->slaves,
+                SRI_RECONF_SENT);
+        }
+    } else {
+        /* Brand new failover as SRI_FAILOVER_IN_PROGRESS was not set.
+         *
+         * Do we have a slave to promote? Otherwise don't start a failover
+         * at all. */
+        if (sentinelSelectSlave(master) == NULL) return;
+        sentinelStartFailover(master,SENTINEL_FAILOVER_STATE_WAIT_START);
+    }
+}
+
+/* Select a suitable slave to promote. The current algorithm only uses
+ * the following parameters:
+ *
+ * 1) None of the following conditions: S_DOWN, O_DOWN, DISCONNECTED.
+ * 2) last_avail_time more recent than SENTINEL_INFO_VALIDITY_TIME.
+ * 3) info_refresh more recent than SENTINEL_INFO_VALIDITY_TIME.
+ * 4) master_link_down_time no more than:
+ *     (now - master->s_down_since_time) + (master->down_after_period * 10).
+ * 5) Slave priority can't be zero, otherwise the slave is discareded.
+ *
+ * Among all the slaves matching the above conditions we select the slave
+ * with lower slave_priority. If priority is the same we select the slave
+ * with lexicographically smaller runid.
+ *
+ * The function returns the pointer to the selected slave, otherwise
+ * NULL if no suitable slave was found.
+ */
+
+int compareSlavesForPromotion(const void *a, const void *b) {
+    sentinelRedisInstance **sa = (sentinelRedisInstance **)a,
+                          **sb = (sentinelRedisInstance **)b;
+    char *sa_runid, *sb_runid;
+
+    if ((*sa)->slave_priority != (*sb)->slave_priority)
+        return (*sa)->slave_priority - (*sb)->slave_priority;
+
+    /* If priority is the same, select the slave with that has the
+     * lexicographically smaller runid. Note that we try to handle runid
+     * == NULL as there are old Redis versions that don't publish runid in
+     * INFO. A NULL runid is considered bigger than any other runid. */
+    sa_runid = (*sa)->runid;
+    sb_runid = (*sb)->runid;
+    if (sa_runid == NULL && sb_runid == NULL) return 0;
+    else if (sa_runid == NULL) return 1;  /* a > b */
+    else if (sb_runid == NULL) return -1; /* a < b */
+    return strcasecmp(sa_runid, sb_runid);
+}
+
+sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) {
+    sentinelRedisInstance **instance =
+        zmalloc(sizeof(instance[0])*dictSize(master->slaves));
+    sentinelRedisInstance *selected = NULL;
+    int instances = 0;
+    dictIterator *di;
+    dictEntry *de;
+    mstime_t max_master_down_time = 0;
+
+    if (master->flags & SRI_S_DOWN)
+        max_master_down_time += mstime() - master->s_down_since_time;
+    max_master_down_time += master->down_after_period * 10;
+
+    di = dictGetIterator(master->slaves);
+    while((de = dictNext(di)) != NULL) {
+        sentinelRedisInstance *slave = dictGetVal(de);
+        mstime_t info_validity_time = mstime()-SENTINEL_INFO_VALIDITY_TIME;
+
+        if (slave->flags & (SRI_S_DOWN|SRI_O_DOWN|SRI_DISCONNECTED)) continue;
+        if (slave->last_avail_time < info_validity_time) continue;
+        if (slave->slave_priority == 0) continue;
+
+        /* If the master is in SDOWN state we get INFO for slaves every second.
+         * Otherwise we get it with the usual period so we need to account for
+         * a larger delay. */
+        if ((master->flags & SRI_S_DOWN) == 0)
+            info_validity_time -= SENTINEL_INFO_PERIOD;
+        if (slave->info_refresh < info_validity_time) continue;
+        if (slave->master_link_down_time > max_master_down_time) continue;
+        instance[instances++] = slave;
+    }
+    dictReleaseIterator(di);
+    if (instances) {
+        qsort(instance,instances,sizeof(sentinelRedisInstance*),
+            compareSlavesForPromotion);
+        selected = instance[0];
+    }
+    zfree(instance);
+    return selected;
+}
+
+/* ---------------- Failover state machine implementation ------------------- */
+void sentinelFailoverWaitStart(sentinelRedisInstance *ri) {
+    /* If we in "wait start" but the master is no longer in ODOWN nor in
+     * SDOWN condition we abort the failover. This is important as it
+     * prevents a useless failover in a a notable case of netsplit, where
+     * the senitnels are split from the redis instances. In this case
+     * the failover will not start while there is the split because no
+     * good slave can be reached. However when the split is resolved, we
+     * can go to waitstart if the slave is back rechable a few milliseconds
+     * before the master is. In that case when the master is back online
+     * we cancel the failover. */
+    if ((ri->flags & (SRI_S_DOWN|SRI_O_DOWN|SRI_FORCE_FAILOVER)) == 0) {
+        sentinelEvent(REDIS_WARNING,"-failover-abort-master-is-back",
+            ri,"%@");
+        sentinelAbortFailover(ri);
+        return;
+    }
+
+    /* Start the failover going to the next state if enough time has
+     * elapsed. */
+    if (mstime() >= ri->failover_start_time) {
+        ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
+        ri->failover_state_change_time = mstime();
+        sentinelEvent(REDIS_WARNING,"+failover-state-select-slave",ri,"%@");
+    }
+}
+
+void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) {
+    sentinelRedisInstance *slave = sentinelSelectSlave(ri);
+
+    if (slave == NULL) {
+        sentinelEvent(REDIS_WARNING,"-failover-abort-no-good-slave",ri,"%@");
+        sentinelAbortFailover(ri);
+    } else {
+        sentinelEvent(REDIS_WARNING,"+selected-slave",slave,"%@");
+        slave->flags |= SRI_PROMOTED;
+        ri->promoted_slave = slave;
+        ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE;
+        ri->failover_state_change_time = mstime();
+        sentinelEvent(REDIS_NOTICE,"+failover-state-send-slaveof-noone",
+            slave, "%@");
+    }
+}
+
+void sentinelFailoverSendSlaveOfNoOne(sentinelRedisInstance *ri) {
+    int retval;
+
+    if (ri->promoted_slave->flags & SRI_DISCONNECTED) return;
+
+    /* Send SLAVEOF NO ONE command to turn the slave into a master.
+     * We actually register a generic callback for this command as we don't
+     * really care about the reply. We check if it worked indirectly observing
+     * if INFO returns a different role (master instead of slave). */
+    retval = redisAsyncCommand(ri->promoted_slave->cc,
+        sentinelDiscardReplyCallback, NULL, "SLAVEOF NO ONE");
+    if (retval != REDIS_OK) return;
+    ri->promoted_slave->pending_commands++;
+    sentinelEvent(REDIS_NOTICE, "+failover-state-wait-promotion",
+        ri->promoted_slave,"%@");
+    ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION;
+    ri->failover_state_change_time = mstime();
+}
+
+/* We actually wait for promotion indirectly checking with INFO when the
+ * slave turns into a master. */
+void sentinelFailoverWaitPromotion(sentinelRedisInstance *ri) {
+    mstime_t elapsed = mstime() - ri->failover_state_change_time;
+
+    if (elapsed >= SENTINEL_PROMOTION_RETRY_PERIOD) {
+        sentinelEvent(REDIS_WARNING,"-promotion-timeout",ri->promoted_slave,
+            "%@");
+        sentinelEvent(REDIS_WARNING,"+failover-state-select-slave",ri,"%@");
+        ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
+        ri->failover_state_change_time = mstime();
+        ri->promoted_slave->flags &= ~SRI_PROMOTED;
+        ri->promoted_slave = NULL;
+    }
+}
+
+void sentinelFailoverDetectEnd(sentinelRedisInstance *master) {
+    int not_reconfigured = 0, timeout = 0;
+    dictIterator *di;
+    dictEntry *de;
+    mstime_t elapsed = mstime() - master->failover_state_change_time;
+
+    /* We can't consider failover finished if the promoted slave is
+     * not reachable. */
+    if (master->promoted_slave == NULL ||
+        master->promoted_slave->flags & SRI_S_DOWN) return;
+
+    /* The failover terminates once all the reachable slaves are properly
+     * configured. */
+    di = dictGetIterator(master->slaves);
+    while((de = dictNext(di)) != NULL) {
+        sentinelRedisInstance *slave = dictGetVal(de);
+
+        if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
+        if (slave->flags & SRI_S_DOWN) continue;
+        not_reconfigured++;
+    }
+    dictReleaseIterator(di);
+
+    /* Force end of failover on timeout. */
+    if (elapsed > master->failover_timeout) {
+        not_reconfigured = 0;
+        timeout = 1;
+        sentinelEvent(REDIS_WARNING,"+failover-end-for-timeout",master,"%@");
+    }
+
+    if (not_reconfigured == 0) {
+        int role = (master->flags & SRI_I_AM_THE_LEADER) ? SENTINEL_LEADER :
+                                                           SENTINEL_OBSERVER;
+
+        sentinelEvent(REDIS_WARNING,"+failover-end",master,"%@");
+        master->failover_state = SENTINEL_FAILOVER_STATE_UPDATE_CONFIG;
+        master->failover_state_change_time = mstime();
+        sentinelCallClientReconfScript(master,role,"end",master->addr,
+            master->promoted_slave->addr);
+    }
+
+    /* If I'm the leader it is a good idea to send a best effort SLAVEOF
+     * command to all the slaves still not reconfigured to replicate with
+     * the new master. */
+    if (timeout && (master->flags & SRI_I_AM_THE_LEADER)) {
+        dictIterator *di;
+        dictEntry *de;
+        char master_port[32];
+
+        ll2string(master_port,sizeof(master_port),
+            master->promoted_slave->addr->port);
+
+        di = dictGetIterator(master->slaves);
+        while((de = dictNext(di)) != NULL) {
+            sentinelRedisInstance *slave = dictGetVal(de);
+            int retval;
+
+            if (slave->flags &
+                (SRI_RECONF_DONE|SRI_RECONF_SENT|SRI_DISCONNECTED)) continue;
+
+            retval = redisAsyncCommand(slave->cc,
+                sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
+                    master->promoted_slave->addr->ip,
+                    master_port);
+            if (retval == REDIS_OK) {
+                sentinelEvent(REDIS_NOTICE,"+slave-reconf-sent-be",slave,"%@");
+                slave->flags |= SRI_RECONF_SENT;
+            }
+        }
+        dictReleaseIterator(di);
+    }
+}
+
+/* Send SLAVE OF <new master address> to all the remaining slaves that
+ * still don't appear to have the configuration updated. */
+void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) {
+    dictIterator *di;
+    dictEntry *de;
+    int in_progress = 0;
+
+    di = dictGetIterator(master->slaves);
+    while((de = dictNext(di)) != NULL) {
+        sentinelRedisInstance *slave = dictGetVal(de);
+
+        if (slave->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG))
+            in_progress++;
+    }
+    dictReleaseIterator(di);
+
+    di = dictGetIterator(master->slaves);
+    while(in_progress < master->parallel_syncs &&
+          (de = dictNext(di)) != NULL)
+    {
+        sentinelRedisInstance *slave = dictGetVal(de);
+        int retval;
+        char master_port[32];
+
+        /* Skip the promoted slave, and already configured slaves. */
+        if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE)) continue;
+
+        /* Clear the SRI_RECONF_SENT flag if too much time elapsed without
+         * the slave moving forward to the next state. */
+        if ((slave->flags & SRI_RECONF_SENT) &&
+            (mstime() - slave->slave_reconf_sent_time) >
+            SENTINEL_SLAVE_RECONF_RETRY_PERIOD)
+        {
+            sentinelEvent(REDIS_NOTICE,"-slave-reconf-sent-timeout",slave,"%@");
+            slave->flags &= ~SRI_RECONF_SENT;
+        }
+
+        /* Nothing to do for instances that are disconnected or already
+         * in RECONF_SENT state. */
+        if (slave->flags & (SRI_DISCONNECTED|SRI_RECONF_SENT|SRI_RECONF_INPROG))
+            continue;
+
+        /* Send SLAVEOF <new master>. */
+        ll2string(master_port,sizeof(master_port),
+            master->promoted_slave->addr->port);
+        retval = redisAsyncCommand(slave->cc,
+            sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
+                master->promoted_slave->addr->ip,
+                master_port);
+        if (retval == REDIS_OK) {
+            slave->flags |= SRI_RECONF_SENT;
+            slave->pending_commands++;
+            slave->slave_reconf_sent_time = mstime();
+            sentinelEvent(REDIS_NOTICE,"+slave-reconf-sent",slave,"%@");
+            in_progress++;
+        }
+    }
+    dictReleaseIterator(di);
+    sentinelFailoverDetectEnd(master);
+}
+
+/* This function is called when the slave is in
+ * SENTINEL_FAILOVER_STATE_UPDATE_CONFIG state. In this state we need
+ * to remove it from the master table and add the promoted slave instead.
+ *
+ * If there are no promoted slaves as this instance is unique, we remove
+ * and re-add it with the same address to trigger a complete state
+ * refresh. */
+void sentinelFailoverSwitchToPromotedSlave(sentinelRedisInstance *master) {
+    sentinelRedisInstance *ref = master->promoted_slave ?
+                                 master->promoted_slave : master;
+
+    sentinelEvent(REDIS_WARNING,"+switch-master",master,"%s %s %d %s %d",
+        master->name, master->addr->ip, master->addr->port,
+        ref->addr->ip, ref->addr->port);
+
+    sentinelResetMasterAndChangeAddress(master,ref->addr->ip,ref->addr->port);
+}
+
+void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
+    redisAssert(ri->flags & SRI_MASTER);
+
+    if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;
+
+    switch(ri->failover_state) {
+        case SENTINEL_FAILOVER_STATE_WAIT_START:
+            sentinelFailoverWaitStart(ri);
+            break;
+        case SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
+            sentinelFailoverSelectSlave(ri);
+            break;
+        case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:
+            sentinelFailoverSendSlaveOfNoOne(ri);
+            break;
+        case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
+            sentinelFailoverWaitPromotion(ri);
+            break;
+        case SENTINEL_FAILOVER_STATE_RECONF_SLAVES:
+            sentinelFailoverReconfNextSlave(ri);
+            break;
+        case SENTINEL_FAILOVER_STATE_DETECT_END:
+            sentinelFailoverDetectEnd(ri);
+            break;
+    }
+}
+
+/* Abort a failover in progress with the following steps:
+ * 1) If this instance is the leaer send a SLAVEOF command to all the already
+ *    reconfigured slaves if any to configure them to replicate with the
+ *    original master.
+ * 2) For both leaders and observers: clear the failover flags and state in
+ *    the master instance.
+ * 3) If there is already a promoted slave and we are the leader, and this
+ *    slave is not DISCONNECTED, try to reconfigure it to replicate
+ *    back to the master as well, sending a best effort SLAVEOF command.
+ */
+void sentinelAbortFailover(sentinelRedisInstance *ri) {
+    char master_port[32];
+    dictIterator *di;
+    dictEntry *de;
+    int sentinel_role;
+
+    redisAssert(ri->flags & SRI_FAILOVER_IN_PROGRESS);
+    ll2string(master_port,sizeof(master_port),ri->addr->port);
+
+    /* Clear failover related flags from slaves.
+     * Also if we are the leader make sure to send SLAVEOF commands to all the
+     * already reconfigured slaves in order to turn them back into slaves of
+     * the original master. */
+    di = dictGetIterator(ri->slaves);
+    while((de = dictNext(di)) != NULL) {
+        sentinelRedisInstance *slave = dictGetVal(de);
+        if ((ri->flags & SRI_I_AM_THE_LEADER) &&
+            !(slave->flags & SRI_DISCONNECTED) &&
+             (slave->flags & (SRI_PROMOTED|SRI_RECONF_SENT|SRI_RECONF_INPROG|
+                              SRI_RECONF_DONE)))
+        {
+            int retval;
+
+            retval = redisAsyncCommand(slave->cc,
+                sentinelDiscardReplyCallback, NULL, "SLAVEOF %s %s",
+                    ri->addr->ip,
+                    master_port);
+            if (retval == REDIS_OK)
+                sentinelEvent(REDIS_NOTICE,"-slave-reconf-undo",slave,"%@");
+        }
+        slave->flags &= ~(SRI_RECONF_SENT|SRI_RECONF_INPROG|SRI_RECONF_DONE);
+    }
+    dictReleaseIterator(di);
+
+    sentinel_role = (ri->flags & SRI_I_AM_THE_LEADER) ? SENTINEL_LEADER :
+                                                        SENTINEL_OBSERVER;
+    ri->flags &= ~(SRI_FAILOVER_IN_PROGRESS|SRI_I_AM_THE_LEADER|SRI_FORCE_FAILOVER);
+    ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
+    ri->failover_state_change_time = mstime();
+    if (ri->promoted_slave) {
+        sentinelCallClientReconfScript(ri,sentinel_role,"abort",
+            ri->promoted_slave->addr,ri->addr);
+        ri->promoted_slave->flags &= ~SRI_PROMOTED;
+        ri->promoted_slave = NULL;
+    }
+}
+
+/* The following is called only for master instances and will abort the
+ * failover process if:
+ *
+ * 1) The failover is in progress.
+ * 2) We already promoted a slave.
+ * 3) The promoted slave is in extended SDOWN condition.
+ */
+void sentinelAbortFailoverIfNeeded(sentinelRedisInstance *ri) {
+    /* Failover is in progress? Do we have a promoted slave? */
+    if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS) || !ri->promoted_slave) return;
+
+    /* Is the promoted slave into an extended SDOWN state? */
+    if (!(ri->promoted_slave->flags & SRI_S_DOWN) ||
+        (mstime() - ri->promoted_slave->s_down_since_time) <
+        (ri->down_after_period * SENTINEL_EXTENDED_SDOWN_MULTIPLIER)) return;
+
+    sentinelEvent(REDIS_WARNING,"-failover-abort-x-sdown",ri->promoted_slave,"%@");
+    sentinelAbortFailover(ri);
+}
+
+/* ======================== SENTINEL timer handler ==========================
+ * This is the "main" our Sentinel, being sentinel completely non blocking
+ * in design. The function is called every second.
+ * -------------------------------------------------------------------------- */
+
+/* Perform scheduled operations for the specified Redis instance. */
+void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
+    /* ========== MONITORING HALF ============ */
+    /* Every kind of instance */
+    sentinelReconnectInstance(ri);
+    sentinelPingInstance(ri);
+
+    /* Masters and slaves */
+    if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
+        /* Nothing so far. */
+    }
+
+    /* Only masters */
+    if (ri->flags & SRI_MASTER) {
+        sentinelAskMasterStateToOtherSentinels(ri);
+    }
+
+    /* ============== ACTING HALF ============= */
+    /* We don't proceed with the acting half if we are in TILT mode.
+     * TILT happens when we find something odd with the time, like a
+     * sudden change in the clock. */
+    if (sentinel.tilt) {
+        if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return;
+        sentinel.tilt = 0;
+        sentinelEvent(REDIS_WARNING,"-tilt",NULL,"#tilt mode exited");
+    }
+
+    /* Every kind of instance */
+    sentinelCheckSubjectivelyDown(ri);
+
+    /* Masters and slaves */
+    if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
+        /* Nothing so far. */
+    }
+
+    /* Only masters */
+    if (ri->flags & SRI_MASTER) {
+        sentinelCheckObjectivelyDown(ri);
+        sentinelStartFailoverIfNeeded(ri);
+        sentinelFailoverStateMachine(ri);
+        sentinelAbortFailoverIfNeeded(ri);
+    }
+}
+
+/* Perform scheduled operations for all the instances in the dictionary.
+ * Recursively call the function against dictionaries of slaves. */
+void sentinelHandleDictOfRedisInstances(dict *instances) {
+    dictIterator *di;
+    dictEntry *de;
+    sentinelRedisInstance *switch_to_promoted = NULL;
+
+    /* There are a number of things we need to perform against every master. */
+    di = dictGetIterator(instances);
+    while((de = dictNext(di)) != NULL) {
+        sentinelRedisInstance *ri = dictGetVal(de);
+
+        sentinelHandleRedisInstance(ri);
+        if (ri->flags & SRI_MASTER) {
+            sentinelHandleDictOfRedisInstances(ri->slaves);
+            sentinelHandleDictOfRedisInstances(ri->sentinels);
+            if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) {
+                switch_to_promoted = ri;
+            }
+        }
+    }
+    if (switch_to_promoted)
+        sentinelFailoverSwitchToPromotedSlave(switch_to_promoted);
+    dictReleaseIterator(di);
+}
+
+/* This function checks if we need to enter the TITL mode.
+ *
+ * The TILT mode is entered if we detect that between two invocations of the
+ * timer interrupt, a negative amount of time, or too much time has passed.
+ * Note that we expect that more or less just 100 milliseconds will pass
+ * if everything is fine. However we'll see a negative number or a
+ * difference bigger than SENTINEL_TILT_TRIGGER milliseconds if one of the
+ * following conditions happen:
+ *
+ * 1) The Sentiel process for some time is blocked, for every kind of
+ * random reason: the load is huge, the computer was freezed for some time
+ * in I/O or alike, the process was stopped by a signal. Everything.
+ * 2) The system clock was altered significantly.
+ *
+ * Under both this conditions we'll see everything as timed out and failing
+ * without good reasons. Instead we enter the TILT mode and wait
+ * for SENTIENL_TILT_PERIOD to elapse before starting to act again.
+ *
+ * During TILT time we still collect information, we just do not act. */
+void sentinelCheckTiltCondition(void) {
+    mstime_t now = mstime();
+    mstime_t delta = now - sentinel.previous_time;
+
+    if (delta < 0 || delta > SENTINEL_TILT_TRIGGER) {
+        sentinel.tilt = 1;
+        sentinel.tilt_start_time = mstime();
+        sentinelEvent(REDIS_WARNING,"+tilt",NULL,"#tilt mode entered");
+    }
+    sentinel.previous_time = mstime();
+}
+
+void sentinelTimer(void) {
+    sentinelCheckTiltCondition();
+    sentinelHandleDictOfRedisInstances(sentinel.masters);
+    sentinelRunPendingScripts();
+    sentinelCollectTerminatedScripts();
+    sentinelKillTimedoutScripts();
+}
+
index c1ed5517f2fc00d4edbf453445cfcc4d5d9f7b76..d18a52959ecfeec28554524abbe0672e624ee2aa 100644 (file)
@@ -2,6 +2,8 @@
 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
 #include <math.h> /* isnan() */
 
+zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank);
+
 redisSortOperation *createSortOperation(int type, robj *pattern) {
     redisSortOperation *so = zmalloc(sizeof(*so));
     so->type = type;
@@ -156,8 +158,9 @@ void sortCommand(redisClient *c) {
 
     /* Lookup the key to sort. It must be of the right types */
     sortval = lookupKeyRead(c->db,c->argv[1]);
-    if (sortval && sortval->type != REDIS_SET && sortval->type != REDIS_LIST &&
-        sortval->type != REDIS_ZSET)
+    if (sortval && sortval->type != REDIS_SET &&
+                   sortval->type != REDIS_LIST &&
+                   sortval->type != REDIS_ZSET)
     {
         addReply(c,shared.wrongtypeerr);
         return;
@@ -167,7 +170,7 @@ void sortCommand(redisClient *c) {
      * Operations can be GET/DEL/INCR/DECR */
     operations = listCreate();
     listSetFreeMethod(operations,zfree);
-    j = 2;
+    j = 2; /* options start at argv[2] */
 
     /* Now we need to protect sortval incrementing its count, in the future
      * SORT may have options able to overwrite/delete keys during the sorting
@@ -213,10 +216,18 @@ void sortCommand(redisClient *c) {
         j++;
     }
 
-    /* If we have STORE we need to force sorting for deterministic output
-     * and replication. We use alpha sorting since this is guaranteed to
-     * work with any input. */
-    if (storekey && dontsort) {
+    /* For the STORE option, or when SORT is called from a Lua script,
+     * we want to force a specific ordering even when no explicit ordering
+     * was asked (SORT BY nosort). This guarantees that replication / AOF
+     * is deterministic.
+     *
+     * However in the case 'dontsort' is true, but the type to sort is a
+     * sorted set, we don't need to do anything as ordering is guaranteed
+     * in this special case. */
+    if ((storekey || c->flags & REDIS_LUA_CLIENT) &&
+        (dontsort && sortval->type != REDIS_ZSET))
+    {
+        /* Force ALPHA sorting */
         dontsort = 0;
         alpha = 1;
         sortby = NULL;
@@ -226,13 +237,41 @@ void sortCommand(redisClient *c) {
     if (sortval->type == REDIS_ZSET)
         zsetConvert(sortval, REDIS_ENCODING_SKIPLIST);
 
-    /* Load the sorting vector with all the objects to sort */
+    /* Objtain the length of the object to sort. */
     switch(sortval->type) {
     case REDIS_LIST: vectorlen = listTypeLength(sortval); break;
     case REDIS_SET: vectorlen =  setTypeSize(sortval); break;
     case REDIS_ZSET: vectorlen = dictSize(((zset*)sortval->ptr)->dict); break;
     default: vectorlen = 0; redisPanic("Bad SORT type"); /* Avoid GCC warning */
     }
+
+    /* Perform LIMIT start,count sanity checking. */
+    start = (limit_start < 0) ? 0 : limit_start;
+    end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
+    if (start >= vectorlen) {
+        start = vectorlen-1;
+        end = vectorlen-2;
+    }
+    if (end >= vectorlen) end = vectorlen-1;
+
+    /* Optimization:
+     *
+     * 1) if the object to sort is a sorted set.
+     * 2) There is nothing to sort as dontsort is true (BY <constant string>).
+     * 3) We have a LIMIT option that actually reduces the number of elements
+     *    to fetch.
+     *
+     * In this case to load all the objects in the vector is a huge waste of
+     * resources. We just allocate a vector that is big enough for the selected
+     * range length, and make sure to load just this part in the vector. */
+    if (sortval->type == REDIS_ZSET &&
+        dontsort &&
+        (start != 0 || end != vectorlen-1))
+    {
+        vectorlen = end-start+1;
+    }
+
+    /* Load the sorting vector with all the objects to sort */
     vector = zmalloc(sizeof(redisSortObject)*vectorlen);
     j = 0;
 
@@ -256,6 +295,48 @@ void sortCommand(redisClient *c) {
             j++;
         }
         setTypeReleaseIterator(si);
+    } else if (sortval->type == REDIS_ZSET && dontsort) {
+        /* Special handling for a sorted set, if 'dontsort' is true.
+         * This makes sure we return elements in the sorted set original
+         * ordering, accordingly to DESC / ASC options.
+         *
+         * Note that in this case we also handle LIMIT here in a direct
+         * way, just getting the required range, as an optimization. */
+
+        zset *zs = sortval->ptr;
+        zskiplist *zsl = zs->zsl;
+        zskiplistNode *ln;
+        robj *ele;
+        int rangelen = vectorlen;
+
+        /* Check if starting point is trivial, before doing log(N) lookup. */
+        if (desc) {
+            long zsetlen = dictSize(((zset*)sortval->ptr)->dict);
+
+            ln = zsl->tail;
+            if (start > 0)
+                ln = zslGetElementByRank(zsl,zsetlen-start);
+        } else {
+            ln = zsl->header->level[0].forward;
+            if (start > 0)
+                ln = zslGetElementByRank(zsl,start+1);
+        }
+
+        while(rangelen--) {
+            redisAssertWithInfo(c,sortval,ln != NULL);
+            ele = ln->obj;
+            vector[j].obj = ele;
+            vector[j].u.score = 0;
+            vector[j].u.cmpobj = NULL;
+            j++;
+            ln = desc ? ln->backward : ln->level[0].forward;
+        }
+        /* The code producing the output does not know that in the case of
+         * sorted set, 'dontsort', and LIMIT, we are able to get just the
+         * range, already sorted, so we need to adjust "start" and "end"
+         * to make sure start is set to 0. */
+        end -= start;
+        start = 0;
     } else if (sortval->type == REDIS_ZSET) {
         dict *set = ((zset*)sortval->ptr)->dict;
         dictIterator *di;
@@ -316,17 +397,6 @@ void sortCommand(redisClient *c) {
         }
     }
 
-    /* We are ready to sort the vector... perform a bit of sanity check
-     * on the LIMIT option too. We'll use a partial version of quicksort. */
-    start = (limit_start < 0) ? 0 : limit_start;
-    end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
-    if (start >= vectorlen) {
-        start = vectorlen-1;
-        end = vectorlen-2;
-    }
-    if (end >= vectorlen) end = vectorlen-1;
-
-    server.sort_dontsort = dontsort;
     if (dontsort == 0) {
         server.sort_desc = desc;
         server.sort_alpha = alpha;
index 5b7a347abbcd75379ac1e314862f8c83ff3c4c97..aa021b03814b60bd35a036c3a72877898b6e92f2 100644 (file)
@@ -403,7 +403,11 @@ void hashTypeConvertZiplist(robj *o, int enc) {
             value = hashTypeCurrentObject(hi, REDIS_HASH_VALUE);
             value = tryObjectEncoding(value);
             ret = dictAdd(dict, field, value);
-            redisAssert(ret == DICT_OK);
+            if (ret != DICT_OK) {
+                redisLogHexDump(REDIS_WARNING,"ziplist with dup elements dump",
+                    o->ptr,ziplistBlobLen(o->ptr));
+                redisAssert(ret == DICT_OK);
+            }
         }
 
         hashTypeReleaseIterator(hi);
index ca03916b953fd1eb91d9e80b506b6646d97e45f4..77e40eb6b215dbe69036bc7ac00595e688072b4d 100644 (file)
@@ -1,5 +1,7 @@
 #include "redis.h"
 
+void signalListAsReady(redisClient *c, robj *key);
+
 /*-----------------------------------------------------------------------------
  * List API
  *----------------------------------------------------------------------------*/
@@ -14,6 +16,11 @@ void listTypeTryConversion(robj *subject, robj *value) {
             listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
 }
 
+/* The function pushes an elmenet to the specified list object 'subject',
+ * at head or tail position as specified by 'where'.
+ *
+ * There is no need for the caller to incremnet the refcount of 'value' as
+ * the function takes care of it if needed. */
 void listTypePush(robj *subject, robj *value, int where) {
     /* Check if we need to convert the ziplist */
     listTypeTryConversion(subject,value);
@@ -268,16 +275,10 @@ void pushGenericCommand(redisClient *c, int where) {
         return;
     }
 
+    if (may_have_waiting_clients) signalListAsReady(c,c->argv[1]);
+
     for (j = 2; j < c->argc; j++) {
         c->argv[j] = tryObjectEncoding(c->argv[j]);
-        if (may_have_waiting_clients) {
-            if (handleClientsWaitingListPush(c,c->argv[1],c->argv[j])) {
-                waiting++;
-                continue;
-            } else {
-                may_have_waiting_clients = 0;
-            }
-        }
         if (!lobj) {
             lobj = createZiplistObject();
             dbAdd(c->db,c->argv[1],lobj);
@@ -288,18 +289,6 @@ void pushGenericCommand(redisClient *c, int where) {
     addReplyLongLong(c, waiting + (lobj ? listTypeLength(lobj) : 0));
     if (pushed) signalModifiedKey(c->db,c->argv[1]);
     server.dirty += pushed;
-
-    /* Alter the replication of the command accordingly to the number of
-     * list elements delivered to clients waiting into a blocking operation.
-     * We do that only if there were waiting clients, and only if still some
-     * element was pushed into the list (othewise dirty is 0 and nothign will
-     * be propagated). */
-    if (waiting && pushed) {
-        /* CMD KEY a b C D E */
-        for (j = 0; j < waiting; j++) decrRefCount(c->argv[j+2]);
-        memmove(c->argv+2,c->argv+2+waiting,sizeof(robj*)*pushed);
-        c->argc -= waiting;
-    }
 }
 
 void lpushCommand(redisClient *c) {
@@ -666,29 +655,15 @@ void lremCommand(redisClient *c) {
  * as well. This command was originally proposed by Ezra Zygmuntowicz.
  */
 
-void rpoplpushHandlePush(redisClient *origclient, redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
-    if (!handleClientsWaitingListPush(origclient,dstkey,value)) {
-        /* Create the list if the key does not exist */
-        if (!dstobj) {
-            dstobj = createZiplistObject();
-            dbAdd(c->db,dstkey,dstobj);
-        } else {
-            signalModifiedKey(c->db,dstkey);
-        }
-        listTypePush(dstobj,value,REDIS_HEAD);
-        /* Additionally propagate this PUSH operation together with
-         * the operation performed by the command. */
-        {
-            robj **argv = zmalloc(sizeof(robj*)*3);
-            argv[0] = createStringObject("LPUSH",5);
-            argv[1] = dstkey;
-            argv[2] = value;
-            incrRefCount(argv[1]);
-            incrRefCount(argv[2]);
-            alsoPropagate(server.lpushCommand,c->db->id,argv,3,
-                          REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL);
-        }
+void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
+    /* Create the list if the key does not exist */
+    if (!dstobj) {
+        dstobj = createZiplistObject();
+        dbAdd(c->db,dstkey,dstobj);
+        signalListAsReady(c,dstkey);
     }
+    signalModifiedKey(c->db,dstkey);
+    listTypePush(dstobj,value,REDIS_HEAD);
     /* Always send the pushed value to the client. */
     addReplyBulk(c,value);
 }
@@ -709,9 +684,10 @@ void rpoplpushCommand(redisClient *c) {
         if (dobj && checkType(c,dobj,REDIS_LIST)) return;
         value = listTypePop(sobj,REDIS_TAIL);
         /* We saved touched key, and protect it, since rpoplpushHandlePush
-         * may change the client command argument vector. */
+         * may change the client command argument vector (it does not
+         * currently). */
         incrRefCount(touchedkey);
-        rpoplpushHandlePush(c,c,c->argv[2],dobj,value);
+        rpoplpushHandlePush(c,c->argv[2],dobj,value);
 
         /* listTypePop returns an object with its refcount incremented */
         decrRefCount(value);
@@ -721,13 +697,6 @@ void rpoplpushCommand(redisClient *c) {
         signalModifiedKey(c->db,touchedkey);
         decrRefCount(touchedkey);
         server.dirty++;
-
-        /* Replicate this as a simple RPOP since the LPUSH side is replicated
-         * by rpoplpushHandlePush() call if needed (it may not be needed
-         * if a client is blocking wait a push against the list). */
-        rewriteClientCommandVector(c,2,
-            resetRefCount(createStringObject("RPOP",4)),
-            c->argv[1]);
     }
 }
 
@@ -735,20 +704,10 @@ void rpoplpushCommand(redisClient *c) {
  * Blocking POP operations
  *----------------------------------------------------------------------------*/
 
-/* Currently Redis blocking operations support is limited to list POP ops,
- * so the current implementation is not fully generic, but it is also not
- * completely specific so it will not require a rewrite to support new
- * kind of blocking operations in the future.
- *
- * Still it's important to note that list blocking operations can be already
- * used as a notification mechanism in order to implement other blocking
- * operations at application level, so there must be a very strong evidence
- * of usefulness and generality before new blocking operations are implemented.
- *
- * This is how the current blocking POP works, we use BLPOP as example:
+/* This is how the current blocking POP works, we use BLPOP as example:
  * - If the user calls BLPOP and the key exists and contains a non empty list
  *   then LPOP is called instead. So BLPOP is semantically the same as LPOP
- *   if there is not to block.
+ *   if blocking is not required.
  * - If instead BLPOP is called and the key does not exists or the list is
  *   empty we need to block. In order to do so we remove the notification for
  *   new data to read in the client socket (so that we'll not serve new
@@ -756,12 +715,10 @@ void rpoplpushCommand(redisClient *c) {
  *   in a dictionary (db->blocking_keys) mapping keys to a list of clients
  *   blocking for this keys.
  * - If a PUSH operation against a key with blocked clients waiting is
- *   performed, we serve the first in the list: basically instead to push
- *   the new element inside the list we return it to the (first / oldest)
- *   blocking client, unblock the client, and remove it form the list.
- *
- * The above comment and the source code should be enough in order to understand
- * the implementation and modify / fix it later.
+ *   performed, we mark this key as "ready", and after the current command,
+ *   MULTI/EXEC block, or script, is executed, we serve all the clients waiting
+ *   for this list, from the one that blocked first, to the last, accordingly
+ *   to the number of elements we have in the ready list.
  */
 
 /* Set a client in blocking mode for the specified key, with the specified
@@ -836,68 +793,192 @@ void unblockClientWaitingData(redisClient *c) {
     listAddNodeTail(server.unblocked_clients,c);
 }
 
-/* This should be called from any function PUSHing into lists.
- * 'c' is the "pushing client", 'key' is the key it is pushing data against,
- * 'ele' is the element pushed.
+/* If the specified key has clients blocked waiting for list pushes, this
+ * function will put the key reference into the server.ready_keys list.
+ * Note that db->ready_keys is an hash table that allows us to avoid putting
+ * the same key agains and again in the list in case of multiple pushes
+ * made by a script or in the context of MULTI/EXEC.
  *
- * If the function returns 0 there was no client waiting for a list push
- * against this key.
+ * The list will be finally processed by handleClientsBlockedOnLists() */
+void signalListAsReady(redisClient *c, robj *key) {
+    readyList *rl;
+
+    /* No clients blocking for this key? No need to queue it. */
+    if (dictFind(c->db->blocking_keys,key) == NULL) return;
+
+    /* Key was already signaled? No need to queue it again. */
+    if (dictFind(c->db->ready_keys,key) != NULL) return;
+
+    /* Ok, we need to queue this key into server.ready_keys. */
+    rl = zmalloc(sizeof(*rl));
+    rl->key = key;
+    rl->db = c->db;
+    incrRefCount(key);
+    listAddNodeTail(server.ready_keys,rl);
+
+    /* We also add the key in the db->ready_keys dictionary in order
+     * to avoid adding it multiple times into a list with a simple O(1)
+     * check. */
+    incrRefCount(key);
+    redisAssert(dictAdd(c->db->ready_keys,key,NULL) == DICT_OK);
+}
+
+/* This is an helper function for handleClientsBlockedOnLists(). It's work
+ * is to serve a specific client (receiver) that is blocked on 'key'
+ * in the context of the specified 'db', doing the following:
  *
- * If the function returns 1 there was a client waiting for a list push
- * against this key, the element was passed to this client thus it's not
- * needed to actually add it to the list and the caller should return asap. */
-int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
-    struct dictEntry *de;
-    redisClient *receiver;
-    int numclients;
-    list *clients;
-    listNode *ln;
-    robj *dstkey, *dstobj;
-
-    de = dictFind(c->db->blocking_keys,key);
-    if (de == NULL) return 0;
-    clients = dictGetVal(de);
-    numclients = listLength(clients);
-
-    /* Try to handle the push as long as there are clients waiting for a push.
-     * Note that "numclients" is used because the list of clients waiting for a
-     * push on "key" is deleted by unblockClient() when empty.
-     *
-     * This loop will have more than 1 iteration when there is a BRPOPLPUSH
-     * that cannot push the target list because it does not contain a list. If
-     * this happens, it simply tries the next client waiting for a push. */
-    while (numclients--) {
-        ln = listFirst(clients);
-        redisAssertWithInfo(c,key,ln != NULL);
-        receiver = ln->value;
-        dstkey = receiver->bpop.target;
-
-        /* Protect receiver->bpop.target, that will be freed by
-         * the next unblockClientWaitingData() call. */
-        if (dstkey) incrRefCount(dstkey);
-
-        /* This should remove the first element of the "clients" list. */
-        unblockClientWaitingData(receiver);
-
-        if (dstkey == NULL) {
-            /* BRPOP/BLPOP */
-            addReplyMultiBulkLen(receiver,2);
-            addReplyBulk(receiver,key);
-            addReplyBulk(receiver,ele);
-            return 1; /* Serve just the first client as in B[RL]POP semantics */
+ * 1) Provide the client with the 'value' element.
+ * 2) If the dstkey is not NULL (we are serving a BRPOPLPUSH) also push the
+ *    'value' element on the destionation list (the LPUSH side of the command).
+ * 3) Propagate the resulting BRPOP, BLPOP and additional LPUSH if any into
+ *    the AOF and replication channel.
+ *
+ * The argument 'where' is REDIS_TAIL or REDIS_HEAD, and indicates if the
+ * 'value' element was popped fron the head (BLPOP) or tail (BRPOP) so that
+ * we can propagate the command properly.
+ *
+ * The function returns REDIS_OK if we are able to serve the client, otherwise
+ * REDIS_ERR is returned to signal the caller that the list POP operation
+ * should be undoed as the client was not served: This only happens for
+ * BRPOPLPUSH that fails to push the value to the destination key as it is
+ * of the wrong type. */
+int serveClientBlockedOnList(redisClient *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where)
+{
+    robj *argv[3];
+
+    if (dstkey == NULL) {
+        /* Propagate the [LR]POP operation. */
+        argv[0] = (where == REDIS_HEAD) ? shared.lpop :
+                                          shared.rpop;
+        argv[1] = key;
+        propagate((where == REDIS_HEAD) ?
+            server.lpopCommand : server.rpopCommand,
+            db->id,argv,2,REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL);
+
+        /* BRPOP/BLPOP */
+        addReplyMultiBulkLen(receiver,2);
+        addReplyBulk(receiver,key);
+        addReplyBulk(receiver,value);
+    } else {
+        /* BRPOPLPUSH */
+        robj *dstobj =
+            lookupKeyWrite(receiver->db,dstkey);
+        if (!(dstobj &&
+             checkType(receiver,dstobj,REDIS_LIST)))
+        {
+            /* Propagate the RPOP operation. */
+            argv[0] = shared.rpop;
+            argv[1] = key;
+            propagate(server.rpopCommand,
+                db->id,argv,2,
+                REDIS_PROPAGATE_AOF|
+                REDIS_PROPAGATE_REPL);
+            rpoplpushHandlePush(receiver,dstkey,dstobj,
+                value);
+            /* Propagate the LPUSH operation. */
+            argv[0] = shared.lpush;
+            argv[1] = dstkey;
+            argv[2] = value;
+            propagate(server.lpushCommand,
+                db->id,argv,3,
+                REDIS_PROPAGATE_AOF|
+                REDIS_PROPAGATE_REPL);
         } else {
-            /* BRPOPLPUSH, note that receiver->db is always equal to c->db. */
-            dstobj = lookupKeyWrite(receiver->db,dstkey);
-            if (!(dstobj && checkType(receiver,dstobj,REDIS_LIST))) {
-                rpoplpushHandlePush(c,receiver,dstkey,dstobj,ele);
-                decrRefCount(dstkey);
-                return 1;
-            }
-            decrRefCount(dstkey);
+            /* BRPOPLPUSH failed because of wrong
+             * destination type. */
+            return REDIS_ERR;
         }
     }
+    return REDIS_OK;
+}
 
-    return 0;
+/* This function should be called by Redis every time a single command,
+ * a MULTI/EXEC block, or a Lua script, terminated its execution after
+ * being called by a client.
+ *
+ * All the keys with at least one client blocked that received at least
+ * one new element via some PUSH operation are accumulated into
+ * the server.ready_keys list. This function will run the list and will
+ * serve clients accordingly. Note that the function will iterate again and
+ * again as a result of serving BRPOPLPUSH we can have new blocking clients
+ * to serve because of the PUSH side of BRPOPLPUSH. */
+void handleClientsBlockedOnLists(void) {
+    while(listLength(server.ready_keys) != 0) {
+        list *l;
+
+        /* Point server.ready_keys to a fresh list and save the current one
+         * locally. This way as we run the old list we are free to call
+         * signalListAsReady() that may push new elements in server.ready_keys
+         * when handling clients blocked into BRPOPLPUSH. */
+        l = server.ready_keys;
+        server.ready_keys = listCreate();
+
+        while(listLength(l) != 0) {
+            listNode *ln = listFirst(l);
+            readyList *rl = ln->value;
+
+            /* First of all remove this key from db->ready_keys so that
+             * we can safely call signalListAsReady() against this key. */
+            dictDelete(rl->db->ready_keys,rl->key);
+
+            /* If the key exists and it's a list, serve blocked clients
+             * with data. */
+            robj *o = lookupKeyWrite(rl->db,rl->key);
+            if (o != NULL && o->type == REDIS_LIST) {
+                dictEntry *de;
+
+                /* We serve clients in the same order they blocked for
+                 * this key, from the first blocked to the last. */
+                de = dictFind(rl->db->blocking_keys,rl->key);
+                if (de) {
+                    list *clients = dictGetVal(de);
+                    int numclients = listLength(clients);
+
+                    while(numclients--) {
+                        listNode *clientnode = listFirst(clients);
+                        redisClient *receiver = clientnode->value;
+                        robj *dstkey = receiver->bpop.target;
+                        int where = (receiver->lastcmd &&
+                                     receiver->lastcmd->proc == blpopCommand) ?
+                                    REDIS_HEAD : REDIS_TAIL;
+                        robj *value = listTypePop(o,where);
+
+                        if (value) {
+                            /* Protect receiver->bpop.target, that will be
+                             * freed by the next unblockClientWaitingData()
+                             * call. */
+                            if (dstkey) incrRefCount(dstkey);
+                            unblockClientWaitingData(receiver);
+
+                            if (serveClientBlockedOnList(receiver,
+                                rl->key,dstkey,rl->db,value,
+                                where) == REDIS_ERR)
+                            {
+                                /* If we failed serving the client we need
+                                 * to also undo the POP operation. */
+                                    listTypePush(o,value,where);
+                            }
+
+                            if (dstkey) decrRefCount(dstkey);
+                            decrRefCount(value);
+                        } else {
+                            break;
+                        }
+                    }
+                }
+                
+                if (listTypeLength(o) == 0) dbDelete(rl->db,rl->key);
+                /* We don't call signalModifiedKey() as it was already called
+                 * when an element was pushed on the list. */
+            }
+
+            /* Free this item. */
+            decrRefCount(rl->key);
+            zfree(rl);
+            listDelNode(l,ln);
+        }
+        listRelease(l); /* We have the new list on place at this point. */
+    }
 }
 
 int getTimeoutFromObjectOrReply(redisClient *c, robj *object, time_t *timeout) {
@@ -986,7 +1067,6 @@ void brpoplpushCommand(redisClient *c) {
 
     if (key == NULL) {
         if (c->flags & REDIS_MULTI) {
-
             /* Blocking against an empty list in a multi state
              * returns immediately. */
             addReply(c, shared.nullbulk);
@@ -998,7 +1078,6 @@ void brpoplpushCommand(redisClient *c) {
         if (key->type != REDIS_LIST) {
             addReply(c, shared.wrongtypeerr);
         } else {
-
             /* The list exists and has elements, so
              * the regular rpoplpushCommand is executed. */
             redisAssertWithInfo(c,key,listTypeLength(key) > 0);
index 3cf1cf005ae7f79d2b919e036d3ceeaf17ce1fc4..fada1095aef1f6317a976e6f665de022de0962b2 100644 (file)
@@ -4,6 +4,8 @@
  * Set Commands
  *----------------------------------------------------------------------------*/
 
+void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum, robj *dstkey, int op);
+
 /* Factory method to return a set that *can* hold "value". When the object has
  * an integer-encodable value, an intset will be returned. Otherwise a regular
  * hash table. */
@@ -185,7 +187,7 @@ unsigned long setTypeSize(robj *subject) {
 }
 
 /* Convert the set to specified encoding. The resulting dict (when converting
- * to a hashtable) is presized to hold the number of elements in the original
+ * to a hash table) is presized to hold the number of elements in the original
  * set. */
 void setTypeConvert(robj *setobj, int enc) {
     setTypeIterator *si;
@@ -360,11 +362,165 @@ void spopCommand(redisClient *c) {
     server.dirty++;
 }
 
+/* handle the "SRANDMEMBER key <count>" variant. The normal version of the
+ * command is handled by the srandmemberCommand() function itself. */
+
+/* How many times bigger should be the set compared to the requested size
+ * for us to don't use the "remove elements" strategy? Read later in the
+ * implementation for more info. */
+#define SRANDMEMBER_SUB_STRATEGY_MUL 3
+
+void srandmemberWithCountCommand(redisClient *c) {
+    long l;
+    unsigned long count, size;
+    int uniq = 1;
+    robj *set, *ele;
+    int64_t llele;
+    int encoding;
+
+    dict *d;
+
+    if (getLongFromObjectOrReply(c,c->argv[2],&l,NULL) != REDIS_OK) return;
+    if (l >= 0) {
+        count = (unsigned) l;
+    } else {
+        /* A negative count means: return the same elements multiple times
+         * (i.e. don't remove the extracted element after every extraction). */
+        count = -l;
+        uniq = 0;
+    }
+
+    if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk))
+        == NULL || checkType(c,set,REDIS_SET)) return;
+    size = setTypeSize(set);
+
+    /* If count is zero, serve it ASAP to avoid special cases later. */
+    if (count == 0) {
+        addReply(c,shared.emptymultibulk);
+        return;
+    }
+
+    /* CASE 1: The count was negative, so the extraction method is just:
+     * "return N random elements" sampling the whole set every time.
+     * This case is trivial and can be served without auxiliary data
+     * structures. */
+    if (!uniq) {
+        addReplyMultiBulkLen(c,count);
+        while(count--) {
+            encoding = setTypeRandomElement(set,&ele,&llele);
+            if (encoding == REDIS_ENCODING_INTSET) {
+                addReplyBulkLongLong(c,llele);
+            } else {
+                addReplyBulk(c,ele);
+            }
+        }
+        return;
+    }
+
+    /* CASE 2:
+     * The number of requested elements is greater than the number of
+     * elements inside the set: simply return the whole set. */
+    if (count >= size) {
+        sunionDiffGenericCommand(c,c->argv,c->argc-1,NULL,REDIS_OP_UNION);
+        return;
+    }
+
+    /* For CASE 3 and CASE 4 we need an auxiliary dictionary. */
+    d = dictCreate(&setDictType,NULL);
+
+    /* CASE 3:
+     * The number of elements inside the set is not greater than
+     * SRANDMEMBER_SUB_STRATEGY_MUL times the number of requested elements.
+     * In this case we create a set from scratch with all the elements, and
+     * subtract random elements to reach the requested number of elements.
+     *
+     * This is done because if the number of requsted elements is just
+     * a bit less than the number of elements in the set, the natural approach
+     * used into CASE 3 is highly inefficient. */
+    if (count*SRANDMEMBER_SUB_STRATEGY_MUL > size) {
+        setTypeIterator *si;
+
+        /* Add all the elements into the temporary dictionary. */
+        si = setTypeInitIterator(set);
+        while((encoding = setTypeNext(si,&ele,&llele)) != -1) {
+            int retval;
+
+            if (encoding == REDIS_ENCODING_INTSET) {
+                retval = dictAdd(d,createStringObjectFromLongLong(llele),NULL);
+            } else if (ele->encoding == REDIS_ENCODING_RAW) {
+                retval = dictAdd(d,dupStringObject(ele),NULL);
+            } else if (ele->encoding == REDIS_ENCODING_INT) {
+                retval = dictAdd(d,
+                    createStringObjectFromLongLong((long)ele->ptr),NULL);
+            }
+            redisAssert(retval == DICT_OK);
+        }
+        setTypeReleaseIterator(si);
+        redisAssert(dictSize(d) == size);
+
+        /* Remove random elements to reach the right count. */
+        while(size > count) {
+            dictEntry *de;
+
+            de = dictGetRandomKey(d);
+            dictDelete(d,dictGetKey(de));
+            size--;
+        }
+    }
+    
+    /* CASE 4: We have a big set compared to the requested number of elements.
+     * In this case we can simply get random elements from the set and add
+     * to the temporary set, trying to eventually get enough unique elements
+     * to reach the specified count. */
+    else {
+        unsigned long added = 0;
+
+        while(added < count) {
+            encoding = setTypeRandomElement(set,&ele,&llele);
+            if (encoding == REDIS_ENCODING_INTSET) {
+                ele = createStringObjectFromLongLong(llele);
+            } else if (ele->encoding == REDIS_ENCODING_RAW) {
+                ele = dupStringObject(ele);
+            } else if (ele->encoding == REDIS_ENCODING_INT) {
+                ele = createStringObjectFromLongLong((long)ele->ptr);
+            }
+            /* Try to add the object to the dictionary. If it already exists
+             * free it, otherwise increment the number of objects we have
+             * in the result dictionary. */
+            if (dictAdd(d,ele,NULL) == DICT_OK)
+                added++;
+            else
+                decrRefCount(ele);
+        }
+    }
+
+    /* CASE 3 & 4: send the result to the user. */
+    {
+        dictIterator *di;
+        dictEntry *de;
+
+        addReplyMultiBulkLen(c,count);
+        di = dictGetIterator(d);
+        while((de = dictNext(di)) != NULL)
+            addReplyBulk(c,dictGetKey(de));
+        dictReleaseIterator(di);
+        dictRelease(d);
+    }
+}
+
 void srandmemberCommand(redisClient *c) {
     robj *set, *ele;
     int64_t llele;
     int encoding;
 
+    if (c->argc == 3) {
+        srandmemberWithCountCommand(c);
+        return;
+    } else if (c->argc > 3) {
+        addReply(c,shared.syntaxerr);
+        return;
+    }
+
     if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
         checkType(c,set,REDIS_SET)) return;
 
index bcdafc6394e47e87cf31e760f5825a5d0d961fc9..e3224f5012f90fc48fc25ee4965733ee15d03ec8 100644 (file)
@@ -7,6 +7,7 @@
 #include <math.h>
 #include <unistd.h>
 #include <sys/time.h>
+#include <float.h>
 
 #include "util.h"
 
@@ -319,7 +320,7 @@ int d2string(char *buf, size_t len, double value) {
          * integer printing function that is much faster. */
         double min = -4503599627370495; /* (2^52)-1 */
         double max = 4503599627370496; /* -(2^52) */
-        if (val > min && val < max && value == ((double)((long long)value)))
+        if (value > min && value < max && value == ((double)((long long)value)))
             len = ll2string(buf,len,(long long)value);
         else
 #endif
index e2827f572f8b98d96f4d435b50ac0ea935579e3a..6329f488ac5bc199e0f14cdb64cb95c47edb0039 100644 (file)
@@ -1 +1 @@
-#define REDIS_VERSION "2.5.10"
+#define REDIS_VERSION "2.5.13"
index 31e61633e5b1ad494468b021658ed75f455cf2cc..23bad45c656ac6edf46d532866cf46e01b8c3110 100644 (file)
@@ -317,7 +317,7 @@ static void zipSaveInteger(unsigned char *p, int64_t value, unsigned char encodi
     int32_t i32;
     int64_t i64;
     if (encoding == ZIP_INT_8B) {
-        ((char*)p)[0] = (char)value;
+        ((int8_t*)p)[0] = (int8_t)value;
     } else if (encoding == ZIP_INT_16B) {
         i16 = value;
         memcpy(p,&i16,sizeof(i16));
@@ -325,7 +325,7 @@ static void zipSaveInteger(unsigned char *p, int64_t value, unsigned char encodi
     } else if (encoding == ZIP_INT_24B) {
         i32 = value<<8;
         memrev32ifbe(&i32);
-        memcpy(p,((unsigned char*)&i32)+1,sizeof(i32)-sizeof(int8_t));
+        memcpy(p,((uint8_t*)&i32)+1,sizeof(i32)-sizeof(uint8_t));
     } else if (encoding == ZIP_INT_32B) {
         i32 = value;
         memcpy(p,&i32,sizeof(i32));
@@ -347,7 +347,7 @@ static int64_t zipLoadInteger(unsigned char *p, unsigned char encoding) {
     int32_t i32;
     int64_t i64, ret = 0;
     if (encoding == ZIP_INT_8B) {
-        ret = ((char*)p)[0];
+        ret = ((int8_t*)p)[0];
     } else if (encoding == ZIP_INT_16B) {
         memcpy(&i16,p,sizeof(i16));
         memrev16ifbe(&i16);
@@ -358,7 +358,7 @@ static int64_t zipLoadInteger(unsigned char *p, unsigned char encoding) {
         ret = i32;
     } else if (encoding == ZIP_INT_24B) {
         i32 = 0;
-        memcpy(((unsigned char*)&i32)+1,p,sizeof(i32)-sizeof(int8_t));
+        memcpy(((uint8_t*)&i32)+1,p,sizeof(i32)-sizeof(uint8_t));
         memrev32ifbe(&i32);
         ret = i32>>8;
     } else if (encoding == ZIP_INT_64B) {
@@ -500,12 +500,13 @@ static unsigned char *__ziplistDelete(unsigned char *zl, unsigned char *p, unsig
     totlen = p-first.p;
     if (totlen > 0) {
         if (p[0] != ZIP_END) {
-            /* Tricky: storing the prevlen in this entry might reduce or
-             * increase the number of bytes needed, compared to the current
-             * prevlen. Note that we can always store this length because
-             * it was previously stored by an entry that is being deleted. */
+            /* Storing `prevrawlen` in this entry may increase or decrease the
+             * number of bytes required compare to the current `prevrawlen`.
+             * There always is room to store this, because it was previously
+             * stored by an entry that is now being deleted. */
             nextdiff = zipPrevLenByteDiff(p,first.prevrawlen);
-            zipPrevEncodeLength(p-nextdiff,first.prevrawlen);
+            p -= nextdiff;
+            zipPrevEncodeLength(p,first.prevrawlen);
 
             /* Update offset for tail */
             ZIPLIST_TAIL_OFFSET(zl) =
@@ -521,8 +522,8 @@ static unsigned char *__ziplistDelete(unsigned char *zl, unsigned char *p, unsig
             }
 
             /* Move tail to the front of the ziplist */
-            memmove(first.p,p-nextdiff,
-                intrev32ifbe(ZIPLIST_BYTES(zl))-(p-zl)-1+nextdiff);
+            memmove(first.p,p,
+                intrev32ifbe(ZIPLIST_BYTES(zl))-(p-zl)-1);
         } else {
             /* The entire tail was deleted. No need to move memory. */
             ZIPLIST_TAIL_OFFSET(zl) =
@@ -805,19 +806,24 @@ unsigned char *ziplistFind(unsigned char *p, unsigned char *vstr, unsigned int v
                     return p;
                 }
             } else {
-                /* Find out if the specified entry can be encoded */
+                /* Find out if the searched field can be encoded. Note that
+                 * we do it only the first time, once done vencoding is set
+                 * to non-zero and vll is set to the integer value. */
                 if (vencoding == 0) {
-                    /* UINT_MAX when the entry CANNOT be encoded */
                     if (!zipTryEncoding(vstr, vlen, &vll, &vencoding)) {
+                        /* If the entry can't be encoded we set it to
+                         * UCHAR_MAX so that we don't retry again the next
+                         * time. */
                         vencoding = UCHAR_MAX;
                     }
-
                     /* Must be non-zero by now */
                     assert(vencoding);
                 }
 
-                /* Compare current entry with specified entry */
-                if (encoding == vencoding) {
+                /* Compare current entry with specified entry, do it only
+                 * if vencoding != UCHAR_MAX because if there is no encoding
+                 * possible for the field it can't be a valid integer. */
+                if (vencoding != UCHAR_MAX) {
                     long long ll = zipLoadInteger(q, encoding);
                     if (ll == vll) {
                         return p;
@@ -1029,6 +1035,22 @@ int randstring(char *target, unsigned int min, unsigned int max) {
     return len;
 }
 
+void verify(unsigned char *zl, zlentry *e) {
+    int i;
+    int len = ziplistLen(zl);
+    zlentry _e;
+
+    for (i = 0; i < len; i++) {
+        memset(&e[i], 0, sizeof(zlentry));
+        e[i] = zipEntry(ziplistIndex(zl, i));
+
+        memset(&_e, 0, sizeof(zlentry));
+        _e = zipEntry(ziplistIndex(zl, -len+i));
+
+        assert(memcmp(&e[i], &_e, sizeof(zlentry)) == 0);
+    }
+}
+
 int main(int argc, char **argv) {
     unsigned char *zl, *p;
     unsigned char *entry;
@@ -1310,6 +1332,43 @@ int main(int argc, char **argv) {
         printf("SUCCESS\n\n");
     }
 
+    printf("Regression test deleting next to last entries:\n");
+    {
+        char v[3][257];
+        zlentry e[3];
+        int i;
+
+        for (i = 0; i < (sizeof(v)/sizeof(v[0])); i++) {
+            memset(v[i], 'a' + i, sizeof(v[0]));
+        }
+
+        v[0][256] = '\0';
+        v[1][  1] = '\0';
+        v[2][256] = '\0';
+
+        zl = ziplistNew();
+        for (i = 0; i < (sizeof(v)/sizeof(v[0])); i++) {
+            zl = ziplistPush(zl, (unsigned char *) v[i], strlen(v[i]), ZIPLIST_TAIL);
+        }
+
+        verify(zl, e);
+
+        assert(e[0].prevrawlensize == 1);
+        assert(e[1].prevrawlensize == 5);
+        assert(e[2].prevrawlensize == 1);
+
+        /* Deleting entry 1 will increase `prevrawlensize` for entry 2 */
+        unsigned char *p = e[1].p;
+        zl = ziplistDelete(zl, &p);
+
+        verify(zl, e);
+
+        assert(e[0].prevrawlensize == 1);
+        assert(e[1].prevrawlensize == 5);
+
+        printf("SUCCESS\n\n");
+    }
+
     printf("Create long list and check indices:\n");
     {
         zl = ziplistNew();
index 1f11fd429ea016a303a04cf6d78645bb1d8fa60f..d9b7c8b31e075393eff0e5410ecc8f9444bb9ebd 100644 (file)
  * <len> lengths are encoded in a single value or in a 5 bytes value.
  * If the first byte value (as an unsigned 8 bit value) is between 0 and
  * 252, it's a single-byte length. If it is 253 then a four bytes unsigned
- * integer follows (in the host byte ordering). A value fo 255 is used to
+ * integer follows (in the host byte ordering). A value of 255 is used to
  * signal the end of the hash. The special value 254 is used to mark
  * empty space that can be used to add new key/value pairs.
  *
- * <free> is the number of free unused bytes
- * after the string, resulting from modification of values associated to a
- * key (for instance if "foo" is set to "bar', and later "foo" will be se to
- * "hi", I'll have a free byte to use if the value will enlarge again later,
- * or even in order to add a key/value pair if it fits.
+ * <free> is the number of free unused bytes after the string, resulting 
+ * from modification of values associated to a key. For instance if "foo"
+ * is set to "bar", and later "foo" will be set to "hi", it will have a
+ * free byte to use if the value will enlarge again later, or even in
+ * order to add a key/value pair if it fits.
  *
  * <free> is always an unsigned 8 bit number, because if after an
  * update operation there are more than a few free bytes, the zipmap will be
index 79b56158613b5faf893af0670f0cea98bc2a8810..43d60a9c6691942043f396dafadce83f5b985497 100644 (file)
@@ -109,17 +109,19 @@ static size_t used_memory = 0;
 static int zmalloc_thread_safe = 0;
 pthread_mutex_t used_memory_mutex = PTHREAD_MUTEX_INITIALIZER;
 
-static void zmalloc_oom(size_t size) {
+static void zmalloc_default_oom(size_t size) {
     fprintf(stderr, "zmalloc: Out of memory trying to allocate %zu bytes\n",
         size);
     fflush(stderr);
     abort();
 }
 
+static void (*zmalloc_oom_handler)(size_t) = zmalloc_default_oom;
+
 void *zmalloc(size_t size) {
     void *ptr = malloc(size+PREFIX_SIZE);
 
-    if (!ptr) zmalloc_oom(size);
+    if (!ptr) zmalloc_oom_handler(size);
 #ifdef HAVE_MALLOC_SIZE
     update_zmalloc_stat_alloc(zmalloc_size(ptr),size);
     return ptr;
@@ -133,7 +135,7 @@ void *zmalloc(size_t size) {
 void *zcalloc(size_t size) {
     void *ptr = calloc(1, size+PREFIX_SIZE);
 
-    if (!ptr) zmalloc_oom(size);
+    if (!ptr) zmalloc_oom_handler(size);
 #ifdef HAVE_MALLOC_SIZE
     update_zmalloc_stat_alloc(zmalloc_size(ptr),size);
     return ptr;
@@ -155,7 +157,7 @@ void *zrealloc(void *ptr, size_t size) {
 #ifdef HAVE_MALLOC_SIZE
     oldsize = zmalloc_size(ptr);
     newptr = realloc(ptr,size);
-    if (!newptr) zmalloc_oom(size);
+    if (!newptr) zmalloc_oom_handler(size);
 
     update_zmalloc_stat_free(oldsize);
     update_zmalloc_stat_alloc(zmalloc_size(newptr),size);
@@ -164,7 +166,7 @@ void *zrealloc(void *ptr, size_t size) {
     realptr = (char*)ptr-PREFIX_SIZE;
     oldsize = *((size_t*)realptr);
     newptr = realloc(realptr,size+PREFIX_SIZE);
-    if (!newptr) zmalloc_oom(size);
+    if (!newptr) zmalloc_oom_handler(size);
 
     *((size_t*)newptr) = size;
     update_zmalloc_stat_free(oldsize);
@@ -236,6 +238,10 @@ void zmalloc_enable_thread_safeness(void) {
     zmalloc_thread_safe = 1;
 }
 
+void zmalloc_set_oom_handler(void (*oom_handler)(size_t)) {
+    zmalloc_oom_handler = oom_handler;
+}
+
 /* Get the RSS information in an OS-specific way.
  *
  * WARNING: the function zmalloc_get_rss() is not designed to be fast
index 89f5b6ee046ed9b9b4fcf6b236981396a076a364..14e79534e17dccd455975e1e328d59741aecb5ca 100644 (file)
@@ -72,6 +72,7 @@ void zfree(void *ptr);
 char *zstrdup(const char *s);
 size_t zmalloc_used_memory(void);
 void zmalloc_enable_thread_safeness(void);
+void zmalloc_set_oom_handler(void (*oom_handler)(size_t));
 float zmalloc_get_fragmentation_ratio(void);
 size_t zmalloc_get_rss(void);
 void zlibc_free(void *ptr);
index 18e639d41f6bab6b40c696143211645f0896db07..da94b088065034797e010f87dfe7b5f3e737cd8f 100644 (file)
@@ -61,9 +61,13 @@ start_server {tags {"repl"}} {
         
         test {SET on the master should immediately propagate} {
             r -1 set mykey bar
-            if {$::valgrind} {after 2000}
-            r  0 get mykey
-        } {bar}
+
+            wait_for_condition 500 100 {
+                [r  0 get mykey] eq {bar}
+            } else {
+                fail "SET on master did not propagated on slave"
+            }
+        }
 
         test {FLUSHALL should replicate} {
             r -1 flushall
index 96af279d84118cbcbe9ea0855ceb9d14c682b914..48d06b741575f31839371b8a7ae962d47f8d1c7f 100644 (file)
@@ -95,6 +95,14 @@ proc randomInt {max} {
     expr {int(rand()*$max)}
 }
 
+proc randomSignedInt {max} {
+    set i [randomInt $max]
+    if {rand() > 0.5} {
+        set i -$i
+    }
+    return $i
+}
+
 proc randpath args {
     set path [expr {int(rand()*[llength $args])}]
     uplevel 1 [lindex $args $path]
@@ -103,13 +111,13 @@ proc randpath args {
 proc randomValue {} {
     randpath {
         # Small enough to likely collide
-        randomInt 1000
+        randomSignedInt 1000
     } {
         # 32 bit compressible signed/unsigned
-        randpath {randomInt 2000000000} {randomInt 4000000000}
+        randpath {randomSignedInt 2000000000} {randomSignedInt 4000000000}
     } {
         # 64 bit
-        randpath {randomInt 1000000000000}
+        randpath {randomSignedInt 1000000000000}
     } {
         # Random string
         randpath {randstring 0 256 alpha} \
index 0e3403bfe1818ee043788127f6cf430902418497..127a0e680346ad4d0e783f1751b5e6151f1a1bb7 100644 (file)
@@ -73,6 +73,17 @@ start_server {tags {"bitops"}} {
         set e
     } {ERR*syntax*}
 
+    test {BITCOUNT regression test for github issue #582} {
+        r del str
+        r setbit foo 0 1
+        if {[catch {r bitcount foo 0 4294967296} e]} {
+            assert_match {*ERR*out of range*} $e
+            set _ 1
+        } else {
+            set e
+        }
+    } {1}
+
     test {BITOP NOT (empty string)} {
         r set s ""
         r bitop not dest s
index ec6e5c2a6be5fface52a5e69d75164e3d1d2adb0..6dbdb6b63d7df755ec74961a46657dd8909da5bf 100644 (file)
@@ -145,6 +145,12 @@ start_server {tags {"scripting"}} {
         set e
     } {*not allowed after*}
 
+    test {EVAL - No arguments to redis.call/pcall is considered an error} {
+        set e {}
+        catch {r eval {return redis.call()} 0} e
+        set e
+    } {*one argument*}
+
     test {EVAL - redis.call variant raises a Lua error on Redis cmd error (1)} {
         set e {}
         catch {
@@ -197,23 +203,23 @@ start_server {tags {"scripting"}} {
         r eval {return redis.call('smembers','myset')} 0
     } {a aa aaa azz b c d e f g h i l m n o p q r s t u v z}
 
-    test "SORT is normally not re-ordered by the scripting engine" {
+    test "SORT is normally not alpha re-ordered for the scripting engine" {
         r del myset
         r sadd myset 1 2 3 4 10
         r eval {return redis.call('sort','myset','desc')} 0
     } {10 4 3 2 1}
 
-    test "SORT BY <constant> output gets ordered by scripting" {
+    test "SORT BY <constant> output gets ordered for scripting" {
         r del myset
         r sadd myset a b c d e f g h i l m n o p q r s t u v z aa aaa azz
         r eval {return redis.call('sort','myset','by','_')} 0
     } {a aa aaa azz b c d e f g h i l m n o p q r s t u v z}
 
-    test "SORT output containing NULLs is well handled by scripting" {
+    test "SORT BY <constant> with GET gets ordered for scripting" {
         r del myset
         r sadd myset a b c
         r eval {return redis.call('sort','myset','by','_','get','#','get','_:*')} 0
-    } {{} {} {} a b c}
+    } {a {} b {} c {}}
 
     test "redis.sha1hex() implementation" {
         list [r eval {return redis.sha1hex('')} 0] \
@@ -338,5 +344,22 @@ start_server {tags {"scripting repl"}} {
                 fail "Expected 2 in x, but value is '[r -1 get x]'"
             }
         }
+
+        test {Replication of script multiple pushes to list with BLPOP} {
+            set rd [redis_deferring_client]
+            $rd brpop a 0
+            r eval {
+                redis.call("lpush","a","1");
+                redis.call("lpush","a","2");
+            } 0
+            set res [$rd read]
+            $rd close
+            wait_for_condition 50 100 {
+                [r -1 lrange a 0 -1] eq [r lrange a 0 -1]
+            } else {
+                fail "Expected list 'a' in slave and master to be the same, but they are respectively '[r -1 lrange a 0 -1]' and '[r lrange a 0 -1]'"
+            }
+            set res
+        } {a 1}
     }
 }
index 5a181641cff8a935a947e24613d726276faef666..6c5644a798da85580f543e470a286bfbba36fd26 100644 (file)
@@ -118,6 +118,47 @@ start_server {
         r sort zset alpha desc
     } {e d c b a}
 
+    test "SORT sorted set BY nosort should retain ordering" {
+        r del zset
+        r zadd zset 1 a
+        r zadd zset 5 b
+        r zadd zset 2 c
+        r zadd zset 10 d
+        r zadd zset 3 e
+        r multi
+        r sort zset by nosort asc
+        r sort zset by nosort desc
+        r exec
+    } {{a c e b d} {d b e c a}}
+
+    test "SORT sorted set BY nosort + LIMIT" {
+        r del zset
+        r zadd zset 1 a
+        r zadd zset 5 b
+        r zadd zset 2 c
+        r zadd zset 10 d
+        r zadd zset 3 e
+        assert_equal [r sort zset by nosort asc limit 0 1] {a}
+        assert_equal [r sort zset by nosort desc limit 0 1] {d}
+        assert_equal [r sort zset by nosort asc limit 0 2] {a c}
+        assert_equal [r sort zset by nosort desc limit 0 2] {d b}
+        assert_equal [r sort zset by nosort limit 5 10] {}
+        assert_equal [r sort zset by nosort limit -10 100] {a c e b d}
+    }
+
+    test "SORT sorted set BY nosort works as expected from scripts" {
+        r del zset
+        r zadd zset 1 a
+        r zadd zset 5 b
+        r zadd zset 2 c
+        r zadd zset 10 d
+        r zadd zset 3 e
+        r eval {
+            return {redis.call('sort','zset','by','nosort','asc'),
+                    redis.call('sort','zset','by','nosort','desc')}
+        } 0
+    } {{a c e b d} {d b e c a}}
+
     test "SORT sorted set: +inf and -inf handling" {
         r del zset
         r zadd zset -100 a
index 950805d1bdd3b7d23cb0a685bd21bfec3809bca5..fa52afd167ad5e2228525f701f75917a41768ecb 100644 (file)
@@ -397,7 +397,7 @@ start_server {tags {"hash"}} {
     } {b}
 
     foreach size {10 512} {
-        test "Hash fuzzing - $size fields" {
+        test "Hash fuzzing #1 - $size fields" {
             for {set times 0} {$times < 10} {incr times} {
                 catch {unset hash}
                 array set hash {}
@@ -418,5 +418,53 @@ start_server {tags {"hash"}} {
                 assert_equal [array size hash] [r hlen hash]
             }
         }
+
+        test "Hash fuzzing #2 - $size fields" {
+            for {set times 0} {$times < 10} {incr times} {
+                catch {unset hash}
+                array set hash {}
+                r del hash
+
+                # Create
+                for {set j 0} {$j < $size} {incr j} {
+                    randpath {
+                        set field [randomValue]
+                        set value [randomValue]
+                        r hset hash $field $value
+                        set hash($field) $value
+                    } {
+                        set field [randomSignedInt 512]
+                        set value [randomSignedInt 512]
+                        r hset hash $field $value
+                        set hash($field) $value
+                    } {
+                        randpath {
+                            set field [randomValue]
+                        } {
+                            set field [randomSignedInt 512]
+                        }
+                        r hdel hash $field
+                        unset -nocomplain hash($field)
+                    }
+                }
+
+                # Verify
+                foreach {k v} [array get hash] {
+                    assert_equal $v [r hget hash $k]
+                }
+                assert_equal [array size hash] [r hlen hash]
+            }
+        }
+    }
+
+    test {Stress test the hash ziplist -> hashtable encoding conversion} {
+        r config set hash-max-ziplist-entries 32
+        for {set j 0} {$j < 100} {incr j} {
+            r del myhash
+            for {set i 0} {$i < 64} {incr i} {
+                r hset myhash [randomValue] [randomValue]
+            }
+            assert {[r object encoding myhash] eq {hashtable}}
+        }
     }
 }
index 85dde5690a490093ccd3369418f6d6637e596bbc..8f598a4ab3f9402877b813368904c2a2dee9b7d2 100644 (file)
@@ -161,6 +161,47 @@ start_server {
         }
     }
 
+    test "BLPOP, LPUSH + DEL should not awake blocked client" {
+        set rd [redis_deferring_client]
+        r del list
+
+        $rd blpop list 0
+        r multi
+        r lpush list a
+        r del list
+        r exec
+        r del list
+        r lpush list b
+        $rd read
+    } {list b}
+
+    test "BLPOP, LPUSH + DEL + SET should not awake blocked client" {
+        set rd [redis_deferring_client]
+        r del list
+
+        $rd blpop list 0
+        r multi
+        r lpush list a
+        r del list
+        r set list foo
+        r exec
+        r del list
+        r lpush list b
+        $rd read
+    } {list b}
+
+    test "MULTI/EXEC is isolated from the point of view of BLPOP" {
+        set rd [redis_deferring_client]
+        r del list
+        $rd blpop list 0
+        r multi
+        r lpush list a
+        r lpush list b
+        r lpush list c
+        r exec
+        $rd read
+    } {list c}
+
     test "BLPOP with variadic LPUSH" {
         set rd [redis_deferring_client]
         r del blist target
@@ -169,8 +210,8 @@ start_server {
         if {$::valgrind} {after 100}
         assert_equal 2 [r lpush blist foo bar]
         if {$::valgrind} {after 100}
-        assert_equal {blist foo} [$rd read]
-        assert_equal bar [lindex [r lrange blist 0 -1] 0]
+        assert_equal {blist bar} [$rd read]
+        assert_equal foo [lindex [r lrange blist 0 -1] 0]
     }
 
     test "BRPOPLPUSH with zero timeout should block indefinitely" {
@@ -222,6 +263,16 @@ start_server {
         assert_equal {foo} [r lrange blist 0 -1]
     }
 
+    test "BRPOPLPUSH maintains order of elements after failure" {
+        set rd [redis_deferring_client]
+        r del blist target
+        r set target nolist
+        $rd brpoplpush blist target 0
+        r rpush blist a b c
+        assert_error "ERR*wrong kind*" {$rd read}
+        r lrange blist 0 -1
+    } {a b c}
+
     test "BRPOPLPUSH with multiple blocked clients" {
         set rd1 [redis_deferring_client]
         set rd2 [redis_deferring_client]
@@ -293,6 +344,41 @@ start_server {
         r exec
     } {foo bar {} {} {bar foo}}
 
+    test "PUSH resulting from BRPOPLPUSH affect WATCH" {
+        set blocked_client [redis_deferring_client]
+        set watching_client [redis_deferring_client]
+        r del srclist dstlist somekey
+        r set somekey somevalue
+        $blocked_client brpoplpush srclist dstlist 0
+        $watching_client watch dstlist
+        $watching_client read
+        $watching_client multi
+        $watching_client read
+        $watching_client get somekey
+        $watching_client read
+        r lpush srclist element
+        $watching_client exec
+        $watching_client read
+    } {}
+
+    test "BRPOPLPUSH does not affect WATCH while still blocked" {
+        set blocked_client [redis_deferring_client]
+        set watching_client [redis_deferring_client]
+        r del srclist dstlist somekey
+        r set somekey somevalue
+        $blocked_client brpoplpush srclist dstlist 0
+        $watching_client watch dstlist
+        $watching_client read
+        $watching_client multi
+        $watching_client read
+        $watching_client get somekey
+        $watching_client read
+        $watching_client exec
+        # Blocked BLPOPLPUSH may create problems, unblock it.
+        r lpush srclist element
+        $watching_client read
+    } {somevalue}
+
     test {BRPOPLPUSH timeout} {
       set rd [redis_deferring_client]
 
index f4f2837351fc9da650363d83136e15be89fb1668..33416d944b0a8abbb48f89d3948bd43f7cd6f949 100644 (file)
@@ -271,6 +271,118 @@ start_server {
         }
     }
 
+    test "SRANDMEMBER with <count> against non existing key" {
+        r srandmember nonexisting_key 100
+    } {}
+
+    foreach {type contents} {
+        hashtable {
+            1 5 10 50 125 50000 33959417 4775547 65434162
+            12098459 427716 483706 2726473884 72615637475
+            MARY PATRICIA LINDA BARBARA ELIZABETH JENNIFER MARIA
+            SUSAN MARGARET DOROTHY LISA NANCY KAREN BETTY HELEN
+            SANDRA DONNA CAROL RUTH SHARON MICHELLE LAURA SARAH
+            KIMBERLY DEBORAH JESSICA SHIRLEY CYNTHIA ANGELA MELISSA
+            BRENDA AMY ANNA REBECCA VIRGINIA KATHLEEN
+        }
+        intset {
+            0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
+            20 21 22 23 24 25 26 27 28 29
+            30 31 32 33 34 35 36 37 38 39
+            40 41 42 43 44 45 46 47 48 49
+        }
+    } {
+        test "SRANDMEMBER with <count> - $type" {
+            create_set myset $contents
+            unset -nocomplain myset
+            array set myset {}
+            foreach ele [r smembers myset] {
+                set myset($ele) 1
+            }
+            assert_equal [lsort $contents] [lsort [array names myset]]
+
+            # Make sure that a count of 0 is handled correctly.
+            assert_equal [r srandmember myset 0] {}
+
+            # We'll stress different parts of the code, see the implementation
+            # of SRANDMEMBER for more information, but basically there are
+            # four different code paths.
+            #
+            # PATH 1: Use negative count.
+            #
+            # 1) Check that it returns repeated elements.
+            set res [r srandmember myset -100]
+            assert_equal [llength $res] 100
+
+            # 2) Check that all the elements actually belong to the
+            # original set.
+            foreach ele $res {
+                assert {[info exists myset($ele)]}
+            }
+
+            # 3) Check that eventually all the elements are returned.
+            unset -nocomplain auxset
+            set iterations 1000
+            while {$iterations != 0} {
+                incr iterations -1
+                set res [r srandmember myset -10]
+                foreach ele $res {
+                    set auxset($ele) 1
+                }
+                if {[lsort [array names myset]] eq
+                    [lsort [array names auxset]]} {
+                    break;
+                }
+            }
+            assert {$iterations != 0}
+
+            # PATH 2: positive count (unique behavior) with requested size
+            # equal or greater than set size.
+            foreach size {50 100} {
+                set res [r srandmember myset $size]
+                assert_equal [llength $res] 50
+                assert_equal [lsort $res] [lsort [array names myset]]
+            }
+
+            # PATH 3: Ask almost as elements as there are in the set.
+            # In this case the implementation will duplicate the original
+            # set and will remove random elements up to the requested size.
+            #
+            # PATH 4: Ask a number of elements definitely smaller than
+            # the set size.
+            #
+            # We can test both the code paths just changing the size but
+            # using the same code.
+
+            foreach size {45 5} {
+                set res [r srandmember myset $size]
+                assert_equal [llength $res] $size
+
+                # 1) Check that all the elements actually belong to the
+                # original set.
+                foreach ele $res {
+                    assert {[info exists myset($ele)]}
+                }
+
+                # 2) Check that eventually all the elements are returned.
+                unset -nocomplain auxset
+                set iterations 1000
+                while {$iterations != 0} {
+                    incr iterations -1
+                    set res [r srandmember myset -10]
+                    foreach ele $res {
+                        set auxset($ele) 1
+                    }
+                    if {[lsort [array names myset]] eq
+                        [lsort [array names auxset]]} {
+                        break;
+                    }
+                }
+                assert {$iterations != 0}
+            }
+        }
+    }
+
     proc setup_move {} {
         r del myset3 myset4
         create_set myset1 {1 a b}