]> git.saurik.com Git - redis.git/blobdiff - src/redis.c
Further integrate MDB key archival into config.
[redis.git] / src / redis.c
index e12d27cc339c35bd96c8c4ce0ac1b7b6341035b0..eed212635dc3882626fe76043f64f54c92fdd02e 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2009-2010, Salvatore Sanfilippo <antirez at gmail dot com>
+ * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -49,6 +49,7 @@
 #include <math.h>
 #include <sys/resource.h>
 #include <sys/utsname.h>
+#include <lmdb.h>
 
 /* Our shared "common" objects */
 
@@ -106,6 +107,11 @@ struct redisCommand *commandTable;
  *    results. For instance SPOP and RANDOMKEY are two random commands.
  * S: Sort command output array if called from script, so that the output
  *    is deterministic.
+ * l: Allow command while loading the database.
+ * t: Allow command while a slave has stale data but is not allowed to
+ *    server this data. Normally no command is accepted in this condition
+ *    but just a few.
+ * M: Do not automatically propagate the command on MONITOR.
  */
 struct redisCommand redisCommandTable[] = {
     {"get",getCommand,2,"r",0,NULL,1,1,1,0,0},
@@ -148,7 +154,7 @@ struct redisCommand redisCommandTable[] = {
     {"sismember",sismemberCommand,3,"r",0,NULL,1,1,1,0,0},
     {"scard",scardCommand,2,"r",0,NULL,1,1,1,0,0},
     {"spop",spopCommand,2,"wRs",0,NULL,1,1,1,0,0},
-    {"srandmember",srandmemberCommand,2,"rR",0,NULL,1,1,1,0,0},
+    {"srandmember",srandmemberCommand,-2,"rR",0,NULL,1,1,1,0,0},
     {"sinter",sinterCommand,-2,"rS",0,NULL,1,-1,1,0,0},
     {"sinterstore",sinterstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
     {"sunion",sunionCommand,-2,"rS",0,NULL,1,-1,1,0,0},
@@ -212,25 +218,26 @@ struct redisCommand redisCommandTable[] = {
     {"lastsave",lastsaveCommand,1,"r",0,NULL,0,0,0,0,0},
     {"type",typeCommand,2,"r",0,NULL,1,1,1,0,0},
     {"multi",multiCommand,1,"rs",0,NULL,0,0,0,0,0},
-    {"exec",execCommand,1,"s",0,NULL,0,0,0,0,0},
+    {"exec",execCommand,1,"sM",0,NULL,0,0,0,0,0},
     {"discard",discardCommand,1,"rs",0,NULL,0,0,0,0,0},
     {"sync",syncCommand,1,"ars",0,NULL,0,0,0,0,0},
+    {"replconf",replconfCommand,-1,"ars",0,NULL,0,0,0,0,0},
     {"flushdb",flushdbCommand,1,"w",0,NULL,0,0,0,0,0},
     {"flushall",flushallCommand,1,"w",0,NULL,0,0,0,0,0},
-    {"sort",sortCommand,-2,"wmS",0,NULL,1,1,1,0,0},
-    {"info",infoCommand,-1,"r",0,NULL,0,0,0,0,0},
+    {"sort",sortCommand,-2,"wm",0,NULL,1,1,1,0,0},
+    {"info",infoCommand,-1,"rlt",0,NULL,0,0,0,0,0},
     {"monitor",monitorCommand,1,"ars",0,NULL,0,0,0,0,0},
     {"ttl",ttlCommand,2,"r",0,NULL,1,1,1,0,0},
     {"pttl",pttlCommand,2,"r",0,NULL,1,1,1,0,0},
     {"persist",persistCommand,2,"w",0,NULL,1,1,1,0,0},
-    {"slaveof",slaveofCommand,3,"as",0,NULL,0,0,0,0,0},
+    {"slaveof",slaveofCommand,3,"ast",0,NULL,0,0,0,0,0},
     {"debug",debugCommand,-2,"as",0,NULL,0,0,0,0,0},
     {"config",configCommand,-2,"ar",0,NULL,0,0,0,0,0},
-    {"subscribe",subscribeCommand,-2,"rps",0,NULL,0,0,0,0,0},
-    {"unsubscribe",unsubscribeCommand,-1,"rps",0,NULL,0,0,0,0,0},
-    {"psubscribe",psubscribeCommand,-2,"rps",0,NULL,0,0,0,0,0},
-    {"punsubscribe",punsubscribeCommand,-1,"rps",0,NULL,0,0,0,0,0},
-    {"publish",publishCommand,3,"pf",0,NULL,0,0,0,0,0},
+    {"subscribe",subscribeCommand,-2,"rpslt",0,NULL,0,0,0,0,0},
+    {"unsubscribe",unsubscribeCommand,-1,"rpslt",0,NULL,0,0,0,0,0},
+    {"psubscribe",psubscribeCommand,-2,"rpslt",0,NULL,0,0,0,0,0},
+    {"punsubscribe",punsubscribeCommand,-1,"rpslt",0,NULL,0,0,0,0,0},
+    {"publish",publishCommand,3,"pflt",0,NULL,0,0,0,0,0},
     {"watch",watchCommand,-2,"rs",0,noPreloadGetKeys,1,-1,1,0,0},
     {"unwatch",unwatchCommand,1,"rs",0,NULL,0,0,0,0,0},
     {"restore",restoreCommand,4,"awm",0,NULL,1,1,1,0,0},
@@ -242,7 +249,9 @@ struct redisCommand redisCommandTable[] = {
     {"evalsha",evalShaCommand,-3,"s",0,zunionInterGetKeys,0,0,0,0,0},
     {"slowlog",slowlogCommand,-2,"r",0,NULL,0,0,0,0,0},
     {"script",scriptCommand,-2,"ras",0,NULL,0,0,0,0,0},
-    {"time",timeCommand,1,"rR",0,NULL,0,0,0,0,0}
+    {"time",timeCommand,1,"rR",0,NULL,0,0,0,0,0},
+    {"bitop",bitopCommand,-4,"wm",0,NULL,2,-1,1,0,0},
+    {"bitcount",bitcountCommand,-2,"r",0,NULL,1,1,1,0,0}
 };
 
 /*============================ Utility functions ============================ */
@@ -325,17 +334,6 @@ err:
     if (server.logfile) close(fd);
 }
 
-/* Redis generally does not try to recover from out of memory conditions
- * when allocating objects or strings, it is not clear if it will be possible
- * to report this condition to the client since the networking layer itself
- * is based on heap allocation for send buffers, so we simply abort.
- * At least the code will be simpler to read... */
-void oom(const char *msg) {
-    redisLog(REDIS_WARNING, "%s: Out of memory\n",msg);
-    sleep(1);
-    abort();
-}
-
 /* Return the UNIX time in microseconds */
 long long ustime(void) {
     struct timeval tv;
@@ -394,7 +392,8 @@ int dictSdsKeyCompare(void *privdata, const void *key1,
     return memcmp(key1, key2, l1) == 0;
 }
 
-/* A case insensitive version used for the command lookup table. */
+/* A case insensitive version used for the command lookup table and other
+ * places where case insensitive non binary-safe comparison is needed. */
 int dictSdsKeyCaseCompare(void *privdata, const void *key1,
         const void *key2)
 {
@@ -509,6 +508,16 @@ dictType dbDictType = {
     dictRedisObjectDestructor   /* val destructor */
 };
 
+/* server.lua_scripts sha (as sds string) -> scripts (as robj) cache. */
+dictType shaScriptObjectDictType = {
+    dictSdsCaseHash,            /* hash function */
+    NULL,                       /* key dup */
+    NULL,                       /* val dup */
+    dictSdsKeyCaseCompare,      /* key compare */
+    dictSdsDestructor,          /* key destructor */
+    dictRedisObjectDestructor   /* val destructor */
+};
+
 /* Db->expires */
 dictType keyptrDictType = {
     dictSdsHash,               /* hash function */
@@ -801,13 +810,8 @@ void clientsCron(void) {
  * a macro is used: run_with_period(milliseconds) { .... }
  */
 
-/* Using the following macro you can run code inside serverCron() with the
- * specified period, specified in milliseconds.
- * The actual resolution depends on REDIS_HZ. */
-#define run_with_period(_ms_) if (!(loops % ((_ms_)/(1000/REDIS_HZ))))
-
 int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
-    int j, loops = server.cronloops;
+    int j;
     REDIS_NOTUSED(eventLoop);
     REDIS_NOTUSED(id);
     REDIS_NOTUSED(clientData);
@@ -876,11 +880,14 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
     }
 
     /* Show information about connected clients */
-    run_with_period(5000) {
-        redisLog(REDIS_VERBOSE,"%d clients connected (%d slaves), %zu bytes in use",
-            listLength(server.clients)-listLength(server.slaves),
-            listLength(server.slaves),
-            zmalloc_used_memory());
+    if (!server.sentinel_mode) {
+        run_with_period(5000) {
+            redisLog(REDIS_VERBOSE,
+                "%d clients connected (%d slaves), %zu bytes in use",
+                listLength(server.clients)-listLength(server.slaves),
+                listLength(server.slaves),
+                zmalloc_used_memory());
+        }
     }
 
     /* We need to do a few operations on clients asynchronously. */
@@ -907,8 +914,12 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
 
             if (pid == server.rdb_child_pid) {
                 backgroundSaveDoneHandler(exitcode,bysignal);
-            } else {
+            } else if (pid == server.aof_child_pid) {
                 backgroundRewriteDoneHandler(exitcode,bysignal);
+            } else {
+                redisLog(REDIS_WARNING,
+                    "Warning, detected child with unmatched pid: %ld",
+                    (long)pid);
             }
             updateDictResizePolicy();
         }
@@ -960,6 +971,11 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
      * to detect transfer failures. */
     run_with_period(1000) replicationCron();
 
+    /* Run the sentinel timer if we are in sentinel mode. */
+    run_with_period(100) {
+        if (server.sentinel_mode) sentinelTimer();
+    }
+
     server.cronloops++;
     return 1000/REDIS_HZ;
 }
@@ -1033,6 +1049,8 @@ void createSharedObjects(void) {
         "-READONLY You can't write against a read only slave.\r\n"));
     shared.oomerr = createObject(REDIS_STRING,sdsnew(
         "-OOM command not allowed when used memory > 'maxmemory'.\r\n"));
+    shared.execaborterr = createObject(REDIS_STRING,sdsnew(
+        "-EXECABORT Transaction discarded because of previous errors.\r\n"));
     shared.space = createObject(REDIS_STRING,sdsnew(" "));
     shared.colon = createObject(REDIS_STRING,sdsnew(":"));
     shared.plus = createObject(REDIS_STRING,sdsnew("+"));
@@ -1050,6 +1068,7 @@ void createSharedObjects(void) {
     shared.del = createStringObject("DEL",3);
     shared.rpop = createStringObject("RPOP",4);
     shared.lpop = createStringObject("LPOP",4);
+    shared.lpush = createStringObject("LPUSH",5);
     for (j = 0; j < REDIS_SHARED_INTEGERS; j++) {
         shared.integers[j] = createObject(REDIS_STRING,(void*)(long)j);
         shared.integers[j]->encoding = REDIS_ENCODING_INT;
@@ -1091,6 +1110,9 @@ void initServerConfig() {
     server.aof_rewrite_base_size = 0;
     server.aof_rewrite_scheduled = 0;
     server.aof_last_fsync = time(NULL);
+    server.aof_rewrite_time_last = -1;
+    server.aof_rewrite_time_start = -1;
+    server.aof_lastbgrewrite_status = REDIS_OK;
     server.aof_delayed_fsync = 0;
     server.aof_fd = -1;
     server.aof_selected_db = -1; /* Make sure the first time will not match */
@@ -1098,6 +1120,9 @@ void initServerConfig() {
     server.pidfile = zstrdup("/var/run/redis.pid");
     server.rdb_filename = zstrdup("dump.rdb");
     server.aof_filename = zstrdup("appendonly.aof");
+    server.mdb_state = REDIS_MDB_OFF;
+    server.mdb_environment = zstrdup("archive");
+    server.mdb_mapsize = 10485760;
     server.requirepass = NULL;
     server.rdb_compression = 1;
     server.rdb_checksum = 1;
@@ -1138,6 +1163,7 @@ void initServerConfig() {
     server.repl_serve_stale_data = 1;
     server.repl_slave_ro = 1;
     server.repl_down_since = time(NULL);
+    server.slave_priority = REDIS_DEFAULT_SLAVE_PRIORITY;
 
     /* Client output buffer limits */
     server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_NORMAL].hard_limit_bytes = 0;
@@ -1164,6 +1190,8 @@ void initServerConfig() {
     server.delCommand = lookupCommandByCString("del");
     server.multiCommand = lookupCommandByCString("multi");
     server.lpushCommand = lookupCommandByCString("lpush");
+    server.lpopCommand = lookupCommandByCString("lpop");
+    server.rpopCommand = lookupCommandByCString("rpop");
     
     /* Slow log */
     server.slowlog_log_slower_than = REDIS_SLOWLOG_LOG_SLOWER_THAN;
@@ -1239,6 +1267,7 @@ void initServer() {
     server.slaves = listCreate();
     server.monitors = listCreate();
     server.unblocked_clients = listCreate();
+    server.ready_keys = listCreate();
 
     createSharedObjects();
     adjustOpenFilesLimit();
@@ -1269,6 +1298,7 @@ void initServer() {
         server.db[j].dict = dictCreate(&dbDictType,NULL);
         server.db[j].expires = dictCreate(&keyptrDictType,NULL);
         server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
+        server.db[j].ready_keys = dictCreate(&setDictType,NULL);
         server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
         server.db[j].id = j;
     }
@@ -1279,9 +1309,11 @@ void initServer() {
     server.cronloops = 0;
     server.rdb_child_pid = -1;
     server.aof_child_pid = -1;
-    server.aof_rewrite_buf = sdsempty();
+    aofRewriteBufferReset();
     server.aof_buf = sdsempty();
     server.lastsave = time(NULL);
+    server.rdb_save_time_last = -1;
+    server.rdb_save_time_start = -1;
     server.dirty = 0;
     server.stat_numcommands = 0;
     server.stat_numconnections = 0;
@@ -1302,9 +1334,9 @@ void initServer() {
     server.stop_writes_on_bgsave_err = 1;
     aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
     if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE,
-        acceptTcpHandler,NULL) == AE_ERR) oom("creating file event");
+        acceptTcpHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.ipfd file event.");
     if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
-        acceptUnixHandler,NULL) == AE_ERR) oom("creating file event");
+        acceptUnixHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.sofd file event.");
 
     if (server.aof_state == REDIS_AOF_ON) {
         server.aof_fd = open(server.aof_filename,
@@ -1316,13 +1348,22 @@ void initServer() {
         }
     }
 
+    if (server.aof_state == REDIS_MDB_ON) {
+        int retval = startKeyArchive();
+        if (retval != 0) {
+            redisLog(REDIS_WARNING, "Can't open the key-archive environment: %s",
+                mdb_strerror(retval));
+            exit(1);
+        }
+    }
+
     /* 32 bit instances are limited to 4GB of address space, so if there is
      * no explicit limit in the user provided configuration we set a limit
-     * at 3.5GB using maxmemory with 'noeviction' policy'. This saves
-     * useless crashes of the Redis instance. */
+     * at 3 GB using maxmemory with 'noeviction' policy'. This avoids
+     * useless crashes of the Redis instance for out of memory. */
     if (server.arch_bits == 32 && server.maxmemory == 0) {
-        redisLog(REDIS_WARNING,"Warning: 32 bit instance detected but no memory limit set. Setting 3.5 GB maxmemory limit with 'noeviction' policy now.");
-        server.maxmemory = 3584LL*(1024*1024); /* 3584 MB = 3.5 GB */
+        redisLog(REDIS_WARNING,"Warning: 32 bit instance detected but no memory limit set. Setting 3 GB maxmemory limit with 'noeviction' policy now.");
+        server.maxmemory = 3072LL*(1024*1024); /* 3 GB */
         server.maxmemory_policy = REDIS_MAXMEMORY_NO_EVICTION;
     }
 
@@ -1353,6 +1394,9 @@ void populateCommandTable(void) {
             case 's': c->flags |= REDIS_CMD_NOSCRIPT; break;
             case 'R': c->flags |= REDIS_CMD_RANDOM; break;
             case 'S': c->flags |= REDIS_CMD_SORT_FOR_SCRIPT; break;
+            case 'l': c->flags |= REDIS_CMD_LOADING; break;
+            case 't': c->flags |= REDIS_CMD_STALE; break;
+            case 'M': c->flags |= REDIS_CMD_SKIP_MONITOR; break;
             default: redisPanic("Unsupported command flag"); break;
             }
             f++;
@@ -1428,7 +1472,7 @@ struct redisCommand *lookupCommandByCString(char *s) {
 }
 
 /* Propagate the specified command (in the context of the specified database id)
- * to AOF, Slaves and Monitors.
+ * to AOF and Slaves.
  *
  * flags are an xor between:
  * + REDIS_PROPAGATE_NONE (no propagation of command at all)
@@ -1458,8 +1502,12 @@ void call(redisClient *c, int flags) {
 
     /* Sent the command to clients in MONITOR mode, only if the commands are
      * not geneated from reading an AOF. */
-    if (listLength(server.monitors) && !server.loading)
+    if (listLength(server.monitors) &&
+        !server.loading &&
+        !(c->cmd->flags & REDIS_CMD_SKIP_MONITOR))
+    {
         replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
+    }
 
     /* Call the command. */
     redisOpArrayInit(&server.also_propagate);
@@ -1509,7 +1557,7 @@ void call(redisClient *c, int flags) {
 }
 
 /* If this function gets called we already read a whole
- * command, argments are in the client argv/argc fields.
+ * command, arguments are in the client argv/argc fields.
  * processCommand() execute the command or prepare the
  * server for a bulk read from the client.
  *
@@ -1531,11 +1579,13 @@ int processCommand(redisClient *c) {
      * such as wrong arity, bad command name and so forth. */
     c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
     if (!c->cmd) {
+        flagTransaction(c);
         addReplyErrorFormat(c,"unknown command '%s'",
             (char*)c->argv[0]->ptr);
         return REDIS_OK;
     } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
                (c->argc < -c->cmd->arity)) {
+        flagTransaction(c);
         addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
             c->cmd->name);
         return REDIS_OK;
@@ -1544,6 +1594,7 @@ int processCommand(redisClient *c) {
     /* Check if the user is authenticated */
     if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
     {
+        flagTransaction(c);
         addReplyError(c,"operation not permitted");
         return REDIS_OK;
     }
@@ -1556,6 +1607,7 @@ int processCommand(redisClient *c) {
     if (server.maxmemory) {
         int retval = freeMemoryIfNeeded();
         if ((c->cmd->flags & REDIS_CMD_DENYOOM) && retval == REDIS_ERR) {
+            flagTransaction(c);
             addReply(c, shared.oomerr);
             return REDIS_OK;
         }
@@ -1567,11 +1619,12 @@ int processCommand(redisClient *c) {
         && server.lastbgsave_status == REDIS_ERR &&
         c->cmd->flags & REDIS_CMD_WRITE)
     {
+        flagTransaction(c);
         addReply(c, shared.bgsaveerr);
         return REDIS_OK;
     }
 
-    /* Don't accept wirte commands if this is a read only slave. But
+    /* Don't accept write commands if this is a read only slave. But
      * accept write commands if this is our master. */
     if (server.masterhost && server.repl_slave_ro &&
         !(c->flags & REDIS_MASTER) &&
@@ -1596,20 +1649,23 @@ int processCommand(redisClient *c) {
      * we are a slave with a broken link with master. */
     if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED &&
         server.repl_serve_stale_data == 0 &&
-        c->cmd->proc != infoCommand && c->cmd->proc != slaveofCommand)
+        !(c->cmd->flags & REDIS_CMD_STALE))
     {
+        flagTransaction(c);
         addReply(c, shared.masterdownerr);
         return REDIS_OK;
     }
 
-    /* Loading DB? Return an error if the command is not INFO */
-    if (server.loading && c->cmd->proc != infoCommand) {
+    /* Loading DB? Return an error if the command has not the
+     * REDIS_CMD_LOADING flag. */
+    if (server.loading && !(c->cmd->flags & REDIS_CMD_LOADING)) {
         addReply(c, shared.loadingerr);
         return REDIS_OK;
     }
 
-    /* Lua script too slow? Only allow SHUTDOWN NOSAVE and SCRIPT KILL. */
+    /* Lua script too slow? Only allow commands with REDIS_CMD_STALE flag. */
     if (server.lua_timedout &&
+          c->cmd->proc != authCommand &&
         !(c->cmd->proc == shutdownCommand &&
           c->argc == 2 &&
           tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
@@ -1617,6 +1673,7 @@ int processCommand(redisClient *c) {
           c->argc == 2 &&
           tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
     {
+        flagTransaction(c);
         addReply(c, shared.slowscripterr);
         return REDIS_OK;
     }
@@ -1630,6 +1687,8 @@ int processCommand(redisClient *c) {
         addReply(c,shared.queued);
     } else {
         call(c,REDIS_CALL_FULL);
+        if (listLength(server.ready_keys))
+            handleClientsBlockedOnLists();
     }
     return REDIS_OK;
 }
@@ -1692,10 +1751,52 @@ int prepareForShutdown(int flags) {
 
 /*================================== Commands =============================== */
 
+/* Return zero if strings are the same, non-zero if they are not.
+ * The comparison is performed in a way that prevents an attacker to obtain
+ * information about the nature of the strings just monitoring the execution
+ * time of the function.
+ *
+ * Note that limiting the comparison length to strings up to 512 bytes we
+ * can avoid leaking any information about the password length and any
+ * possible branch misprediction related leak.
+ */
+int time_independent_strcmp(char *a, char *b) {
+    char bufa[REDIS_AUTHPASS_MAX_LEN], bufb[REDIS_AUTHPASS_MAX_LEN];
+    /* The above two strlen perform len(a) + len(b) operations where either
+     * a or b are fixed (our password) length, and the difference is only
+     * relative to the length of the user provided string, so no information
+     * leak is possible in the following two lines of code. */
+    int alen = strlen(a);
+    int blen = strlen(b);
+    int j;
+    int diff = 0;
+
+    /* We can't compare strings longer than our static buffers.
+     * Note that this will never pass the first test in practical circumstances
+     * so there is no info leak. */
+    if (alen > sizeof(bufa) || blen > sizeof(bufb)) return 1;
+
+    memset(bufa,0,sizeof(bufa));        /* Constant time. */
+    memset(bufb,0,sizeof(bufb));        /* Constant time. */
+    /* Again the time of the following two copies is proportional to
+     * len(a) + len(b) so no info is leaked. */
+    memcpy(bufa,a,alen);
+    memcpy(bufb,b,blen);
+
+    /* Always compare all the chars in the two buffers without
+     * conditional expressions. */
+    for (j = 0; j < sizeof(bufa); j++) {
+        diff |= (bufa[j] ^ bufb[j]);
+    }
+    /* Length must be equal as well. */
+    diff |= alen ^ blen;
+    return diff; /* If zero strings are the same. */
+}
+
 void authCommand(redisClient *c) {
     if (!server.requirepass) {
         addReplyError(c,"Client sent AUTH, but no password is set");
-    } else if (!strcmp(c->argv[1]->ptr, server.requirepass)) {
+    } else if (!time_independent_strcmp(c->argv[1]->ptr, server.requirepass)) {
       c->authenticated = 1;
       addReply(c,shared.ok);
     } else {
@@ -1755,7 +1856,7 @@ sds genRedisInfoString(char *section) {
     unsigned long lol, bib;
     int allsections = 0, defsections = 0;
     int sections = 0;
-    
+
     if (section) {
         allsections = strcasecmp(section,"all") == 0;
         defsections = strcasecmp(section,"default") == 0;
@@ -1768,7 +1869,11 @@ sds genRedisInfoString(char *section) {
     /* Server */
     if (allsections || defsections || !strcasecmp(section,"server")) {
         struct utsname name;
+        char *mode;
 
+        if (server.sentinel_mode) mode = "sentinel";
+        else mode = "standalone";
+    
         if (sections++) info = sdscat(info,"\r\n");
         uname(&name);
         info = sdscatprintf(info,
@@ -1776,6 +1881,7 @@ sds genRedisInfoString(char *section) {
             "redis_version:%s\r\n"
             "redis_git_sha1:%s\r\n"
             "redis_git_dirty:%d\r\n"
+            "redis_mode:%s\r\n"
             "os:%s %s %s\r\n"
             "arch_bits:%d\r\n"
             "multiplexing_api:%s\r\n"
@@ -1789,6 +1895,7 @@ sds genRedisInfoString(char *section) {
             REDIS_VERSION,
             redisGitSHA1(),
             strtol(redisGitDirty(),NULL,10) > 0,
+            mode,
             name.sysname, name.release, name.machine,
             server.arch_bits,
             aeGetApiName(),
@@ -1854,21 +1961,33 @@ sds genRedisInfoString(char *section) {
         info = sdscatprintf(info,
             "# Persistence\r\n"
             "loading:%d\r\n"
+            "rdb_changes_since_last_save:%lld\r\n"
+            "rdb_bgsave_in_progress:%d\r\n"
+            "rdb_last_save_time:%ld\r\n"
+            "rdb_last_bgsave_status:%s\r\n"
+            "rdb_last_bgsave_time_sec:%ld\r\n"
+            "rdb_current_bgsave_time_sec:%ld\r\n"
             "aof_enabled:%d\r\n"
-            "changes_since_last_save:%lld\r\n"
-            "bgsave_in_progress:%d\r\n"
-            "last_save_time:%ld\r\n"
-            "last_bgsave_status:%s\r\n"
-            "bgrewriteaof_in_progress:%d\r\n"
-            "bgrewriteaof_scheduled:%d\r\n",
+            "aof_rewrite_in_progress:%d\r\n"
+            "aof_rewrite_scheduled:%d\r\n"
+            "aof_last_rewrite_time_sec:%ld\r\n"
+            "aof_current_rewrite_time_sec:%ld\r\n"
+            "aof_last_bgrewrite_status:%s\r\n",
             server.loading,
-            server.aof_state != REDIS_AOF_OFF,
             server.dirty,
             server.rdb_child_pid != -1,
             server.lastsave,
-            server.lastbgsave_status == REDIS_OK ? "ok" : "err",
+            (server.lastbgsave_status == REDIS_OK) ? "ok" : "err",
+            server.rdb_save_time_last,
+            (server.rdb_child_pid == -1) ?
+                -1 : time(NULL)-server.rdb_save_time_start,
+            server.aof_state != REDIS_AOF_OFF,
             server.aof_child_pid != -1,
-            server.aof_rewrite_scheduled);
+            server.aof_rewrite_scheduled,
+            server.aof_rewrite_time_last,
+            (server.aof_child_pid == -1) ?
+                -1 : time(NULL)-server.aof_rewrite_time_start,
+            (server.aof_lastbgrewrite_status == REDIS_OK) ? "ok" : "err");
 
         if (server.aof_state != REDIS_AOF_OFF) {
             info = sdscatprintf(info,
@@ -1876,12 +1995,14 @@ sds genRedisInfoString(char *section) {
                 "aof_base_size:%lld\r\n"
                 "aof_pending_rewrite:%d\r\n"
                 "aof_buffer_length:%zu\r\n"
+                "aof_rewrite_buffer_length:%lu\r\n"
                 "aof_pending_bio_fsync:%llu\r\n"
                 "aof_delayed_fsync:%lu\r\n",
                 (long long) server.aof_current_size,
                 (long long) server.aof_rewrite_base_size,
                 server.aof_rewrite_scheduled,
                 sdslen(server.aof_buf),
+                aofRewriteBufferSize(),
                 bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC),
                 server.aof_delayed_fsync);
         }
@@ -1972,9 +2093,10 @@ sds genRedisInfoString(char *section) {
 
             if (server.repl_state == REDIS_REPL_TRANSFER) {
                 info = sdscatprintf(info,
-                    "master_sync_left_bytes:%ld\r\n"
+                    "master_sync_left_bytes:%lld\r\n"
                     "master_sync_last_io_seconds_ago:%d\r\n"
-                    ,(long)server.repl_transfer_left,
+                    , (long long)
+                        (server.repl_transfer_size - server.repl_transfer_read),
                     (int)(server.unixtime-server.repl_transfer_lastio)
                 );
             }
@@ -1984,6 +2106,11 @@ sds genRedisInfoString(char *section) {
                     "master_link_down_since_seconds:%ld\r\n",
                     (long)server.unixtime-server.repl_down_since);
             }
+            info = sdscatprintf(info,
+                "slave_priority:%d\r\n"
+                "slave_read_only:%d\r\n",
+                server.slave_priority,
+                server.repl_slave_ro);
         }
         info = sdscatprintf(info,
             "connected_slaves:%lu\r\n",
@@ -2015,7 +2142,7 @@ sds genRedisInfoString(char *section) {
                 }
                 if (state == NULL) continue;
                 info = sdscatprintf(info,"slave%d:%s,%d,%s\r\n",
-                    slaveid,ip,port,state);
+                    slaveid,ip,slave->slave_listening_port,state);
                 slaveid++;
             }
         }
@@ -2085,7 +2212,7 @@ void infoCommand(redisClient *c) {
 }
 
 void monitorCommand(redisClient *c) {
-    /* ignore MONITOR if aleady slave or in monitor mode */
+    /* ignore MONITOR if already slave or in monitor mode */
     if (c->flags & REDIS_SLAVE) return;
 
     c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
@@ -2134,7 +2261,7 @@ int freeMemoryIfNeeded(void) {
     }
     if (server.aof_state != REDIS_AOF_OFF) {
         mem_used -= sdslen(server.aof_buf);
-        mem_used -= sdslen(server.aof_rewrite_buf);
+        mem_used -= aofRewriteBufferSize();
     }
 
     /* Check if we are over the memory limit. */
@@ -2184,7 +2311,7 @@ int freeMemoryIfNeeded(void) {
 
                     de = dictGetRandomKey(dict);
                     thiskey = dictGetKey(de);
-                    /* When policy is volatile-lru we need an additonal lookup
+                    /* When policy is volatile-lru we need an additional lookup
                      * to locate the real key, as dict is set to db->expires. */
                     if (server.maxmemory_policy == REDIS_MAXMEMORY_VOLATILE_LRU)
                         de = dictFind(db->dict, thiskey);
@@ -2223,6 +2350,8 @@ int freeMemoryIfNeeded(void) {
                 long long delta;
 
                 robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
+                int archived = archive(db, keyobj);
+                redisAssert(archived != 0);
                 propagateExpire(db,keyobj);
                 /* We compute the amount of memory freed by dbDelete() alone.
                  * It is possible that actually the memory needed to propagate
@@ -2323,21 +2452,25 @@ void usage() {
     fprintf(stderr,"       ./redis-server /etc/redis/6379.conf\n");
     fprintf(stderr,"       ./redis-server --port 7777\n");
     fprintf(stderr,"       ./redis-server --port 7777 --slaveof 127.0.0.1 8888\n");
-    fprintf(stderr,"       ./redis-server /etc/myredis.conf --loglevel verbose\n");
+    fprintf(stderr,"       ./redis-server /etc/myredis.conf --loglevel verbose\n\n");
+    fprintf(stderr,"Sentinel mode:\n");
+    fprintf(stderr,"       ./redis-server /etc/sentinel.conf --sentinel\n");
     exit(1);
 }
 
 void redisAsciiArt(void) {
 #include "asciilogo.h"
     char *buf = zmalloc(1024*16);
+    char *mode = "stand alone";
+
+    if (server.sentinel_mode) mode = "sentinel";
 
     snprintf(buf,1024*16,ascii_logo,
         REDIS_VERSION,
         redisGitSHA1(),
         strtol(redisGitDirty(),NULL,10) > 0,
         (sizeof(long) == 8) ? "64" : "32",
-        "stand alone",
-        server.port,
+        mode, server.port,
         (long) getpid()
     );
     redisLogRaw(REDIS_NOTICE|REDIS_LOG_RAW,buf);
@@ -2375,17 +2508,60 @@ void setupSignalHandlers(void) {
 
 void memtest(size_t megabytes, int passes);
 
+/* Returns 1 if there is --sentinel among the arguments or if
+ * argv[0] is exactly "redis-sentinel". */
+int checkForSentinelMode(int argc, char **argv) {
+    int j;
+
+    if (strstr(argv[0],"redis-sentinel") != NULL) return 1;
+    for (j = 1; j < argc; j++)
+        if (!strcmp(argv[j],"--sentinel")) return 1;
+    return 0;
+}
+
+/* Function called at startup to load RDB or AOF file in memory. */
+void loadDataFromDisk(void) {
+    long long start = ustime();
+    if (server.aof_state == REDIS_AOF_ON) {
+        if (loadAppendOnlyFile(server.aof_filename) == REDIS_OK)
+            redisLog(REDIS_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
+    } else {
+        if (rdbLoad(server.rdb_filename) == REDIS_OK) {
+            redisLog(REDIS_NOTICE,"DB loaded from disk: %.3f seconds",
+                (float)(ustime()-start)/1000000);
+        } else if (errno != ENOENT) {
+            redisLog(REDIS_WARNING,"Fatal error loading the DB. Exiting.");
+            exit(1);
+        }
+    }
+}
+
+void redisOutOfMemoryHandler(size_t allocation_size) {
+    redisLog(REDIS_WARNING,"Out Of Memory allocating %zu bytes!",
+        allocation_size);
+    redisPanic("OOM");
+}
+
 int main(int argc, char **argv) {
-    long long start;
     struct timeval tv;
 
     /* We need to initialize our libraries, and the server configuration. */
     zmalloc_enable_thread_safeness();
+    zmalloc_set_oom_handler(redisOutOfMemoryHandler);
     srand(time(NULL)^getpid());
     gettimeofday(&tv,NULL);
     dictSetHashFunctionSeed(tv.tv_sec^tv.tv_usec^getpid());
+    server.sentinel_mode = checkForSentinelMode(argc,argv);
     initServerConfig();
 
+    /* We need to init sentinel right now as parsing the configuration file
+     * in sentinel mode will have the effect of populating the sentinel
+     * data structures with master nodes to monitor. */
+    if (server.sentinel_mode) {
+        initSentinelConfig();
+        initSentinel();
+    }
+
     if (argc >= 2) {
         int j = 1; /* First option to parse in argv[] */
         sds options = sdsempty();
@@ -2431,33 +2607,31 @@ int main(int argc, char **argv) {
         loadServerConfig(configfile,options);
         sdsfree(options);
     } else {
-        redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
+        redisLog(REDIS_WARNING, "Warning: no config file specified, using the default config. In order to specify a config file use %s /path/to/%s.conf", argv[0], server.sentinel_mode ? "sentinel" : "redis");
     }
     if (server.daemonize) daemonize();
     initServer();
     if (server.daemonize) createPidFile();
     redisAsciiArt();
-    redisLog(REDIS_WARNING,"Server started, Redis version " REDIS_VERSION);
-#ifdef __linux__
-    linuxOvercommitMemoryWarning();
-#endif
-    start = ustime();
-    if (server.aof_state == REDIS_AOF_ON) {
-        if (loadAppendOnlyFile(server.aof_filename) == REDIS_OK)
-            redisLog(REDIS_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
-    } else {
-        if (rdbLoad(server.rdb_filename) == REDIS_OK) {
-            redisLog(REDIS_NOTICE,"DB loaded from disk: %.3f seconds",
-                (float)(ustime()-start)/1000000);
-        } else if (errno != ENOENT) {
-            redisLog(REDIS_WARNING,"Fatal error loading the DB. Exiting.");
-            exit(1);
-        }
+
+    if (!server.sentinel_mode) {
+        /* Things only needed when not running in Sentinel mode. */
+        redisLog(REDIS_WARNING,"Server started, Redis version " REDIS_VERSION);
+    #ifdef __linux__
+        linuxOvercommitMemoryWarning();
+    #endif
+        loadDataFromDisk();
+        if (server.ipfd > 0)
+            redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
+        if (server.sofd > 0)
+            redisLog(REDIS_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket);
+    }
+
+    /* Warning the user about suspicious maxmemory setting. */
+    if (server.maxmemory > 0 && server.maxmemory < 1024*1024) {
+        redisLog(REDIS_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory);
     }
-    if (server.ipfd > 0)
-        redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
-    if (server.sofd > 0)
-        redisLog(REDIS_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket);
+
     aeSetBeforeSleepProc(server.el,beforeSleep);
     aeMain(server.el);
     aeDeleteEventLoop(server.el);