]> git.saurik.com Git - redis.git/blobdiff - src/redis.c
Added the SRANDMEMBER key <count> variant.
[redis.git] / src / redis.c
index 4cc378e24a40d2de7d4955e6bd303147c1a1a548..193d3bf58b07f82572a5a7c14e890231dc11ebf7 100644 (file)
@@ -106,6 +106,10 @@ struct redisCommand *commandTable;
  *    results. For instance SPOP and RANDOMKEY are two random commands.
  * S: Sort command output array if called from script, so that the output
  *    is deterministic.
+ * l: Allow command while loading the database.
+ * t: Allow command while a slave has stale data but is not allowed to
+ *    server this data. Normally no command is accepted in this condition
+ *    but just a few.
  */
 struct redisCommand redisCommandTable[] = {
     {"get",getCommand,2,"r",0,NULL,1,1,1,0,0},
@@ -148,7 +152,7 @@ struct redisCommand redisCommandTable[] = {
     {"sismember",sismemberCommand,3,"r",0,NULL,1,1,1,0,0},
     {"scard",scardCommand,2,"r",0,NULL,1,1,1,0,0},
     {"spop",spopCommand,2,"wRs",0,NULL,1,1,1,0,0},
-    {"srandmember",srandmemberCommand,2,"rR",0,NULL,1,1,1,0,0},
+    {"srandmember",srandmemberCommand,-2,"rR",0,NULL,1,1,1,0,0},
     {"sinter",sinterCommand,-2,"rS",0,NULL,1,-1,1,0,0},
     {"sinterstore",sinterstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
     {"sunion",sunionCommand,-2,"rS",0,NULL,1,-1,1,0,0},
@@ -215,22 +219,23 @@ struct redisCommand redisCommandTable[] = {
     {"exec",execCommand,1,"s",0,NULL,0,0,0,0,0},
     {"discard",discardCommand,1,"rs",0,NULL,0,0,0,0,0},
     {"sync",syncCommand,1,"ars",0,NULL,0,0,0,0,0},
+    {"replconf",replconfCommand,-1,"ars",0,NULL,0,0,0,0,0},
     {"flushdb",flushdbCommand,1,"w",0,NULL,0,0,0,0,0},
     {"flushall",flushallCommand,1,"w",0,NULL,0,0,0,0,0},
-    {"sort",sortCommand,-2,"wmS",0,NULL,1,1,1,0,0},
-    {"info",infoCommand,-1,"r",0,NULL,0,0,0,0,0},
+    {"sort",sortCommand,-2,"wm",0,NULL,1,1,1,0,0},
+    {"info",infoCommand,-1,"rlt",0,NULL,0,0,0,0,0},
     {"monitor",monitorCommand,1,"ars",0,NULL,0,0,0,0,0},
     {"ttl",ttlCommand,2,"r",0,NULL,1,1,1,0,0},
     {"pttl",pttlCommand,2,"r",0,NULL,1,1,1,0,0},
     {"persist",persistCommand,2,"w",0,NULL,1,1,1,0,0},
-    {"slaveof",slaveofCommand,3,"as",0,NULL,0,0,0,0,0},
+    {"slaveof",slaveofCommand,3,"ast",0,NULL,0,0,0,0,0},
     {"debug",debugCommand,-2,"as",0,NULL,0,0,0,0,0},
     {"config",configCommand,-2,"ar",0,NULL,0,0,0,0,0},
-    {"subscribe",subscribeCommand,-2,"rps",0,NULL,0,0,0,0,0},
-    {"unsubscribe",unsubscribeCommand,-1,"rps",0,NULL,0,0,0,0,0},
-    {"psubscribe",psubscribeCommand,-2,"rps",0,NULL,0,0,0,0,0},
-    {"punsubscribe",punsubscribeCommand,-1,"rps",0,NULL,0,0,0,0,0},
-    {"publish",publishCommand,3,"pf",0,NULL,0,0,0,0,0},
+    {"subscribe",subscribeCommand,-2,"rpslt",0,NULL,0,0,0,0,0},
+    {"unsubscribe",unsubscribeCommand,-1,"rpslt",0,NULL,0,0,0,0,0},
+    {"psubscribe",psubscribeCommand,-2,"rpslt",0,NULL,0,0,0,0,0},
+    {"punsubscribe",punsubscribeCommand,-1,"rpslt",0,NULL,0,0,0,0,0},
+    {"publish",publishCommand,3,"pflt",0,NULL,0,0,0,0,0},
     {"watch",watchCommand,-2,"rs",0,noPreloadGetKeys,1,-1,1,0,0},
     {"unwatch",unwatchCommand,1,"rs",0,NULL,0,0,0,0,0},
     {"restore",restoreCommand,4,"awm",0,NULL,1,1,1,0,0},
@@ -242,7 +247,9 @@ struct redisCommand redisCommandTable[] = {
     {"evalsha",evalShaCommand,-3,"s",0,zunionInterGetKeys,0,0,0,0,0},
     {"slowlog",slowlogCommand,-2,"r",0,NULL,0,0,0,0,0},
     {"script",scriptCommand,-2,"ras",0,NULL,0,0,0,0,0},
-    {"time",timeCommand,1,"rR",0,NULL,0,0,0,0,0}
+    {"time",timeCommand,1,"rR",0,NULL,0,0,0,0,0},
+    {"bitop",bitopCommand,-4,"wm",0,NULL,2,-1,1,0,0},
+    {"bitcount",bitcountCommand,-2,"r",0,NULL,1,1,1,0,0}
 };
 
 /*============================ Utility functions ============================ */
@@ -325,17 +332,6 @@ err:
     if (server.logfile) close(fd);
 }
 
-/* Redis generally does not try to recover from out of memory conditions
- * when allocating objects or strings, it is not clear if it will be possible
- * to report this condition to the client since the networking layer itself
- * is based on heap allocation for send buffers, so we simply abort.
- * At least the code will be simpler to read... */
-void oom(const char *msg) {
-    redisLog(REDIS_WARNING, "%s: Out of memory\n",msg);
-    sleep(1);
-    abort();
-}
-
 /* Return the UNIX time in microseconds */
 long long ustime(void) {
     struct timeval tv;
@@ -581,10 +577,16 @@ void incrementallyRehash(void) {
     int j;
 
     for (j = 0; j < server.dbnum; j++) {
+        /* Keys dictionary */
         if (dictIsRehashing(server.db[j].dict)) {
             dictRehashMilliseconds(server.db[j].dict,1);
             break; /* already used our millisecond for this loop... */
         }
+        /* Expires */
+        if (dictIsRehashing(server.db[j].expires)) {
+            dictRehashMilliseconds(server.db[j].expires,1);
+            break; /* already used our millisecond for this loop... */
+        }
     }
 }
 
@@ -608,19 +610,35 @@ void updateDictResizePolicy(void) {
  * it will get more aggressive to avoid that too much memory is used by
  * keys that can be removed from the keyspace. */
 void activeExpireCycle(void) {
-    int j;
-    long long start = mstime();
+    int j, iteration = 0;
+    long long start = ustime(), timelimit;
+
+    /* We can use at max REDIS_EXPIRELOOKUPS_TIME_PERC percentage of CPU time
+     * per iteration. Since this function gets called with a frequency of
+     * REDIS_HZ times per second, the following is the max amount of
+     * microseconds we can spend in this function. */
+    timelimit = 1000000*REDIS_EXPIRELOOKUPS_TIME_PERC/REDIS_HZ/100;
+    if (timelimit <= 0) timelimit = 1;
 
     for (j = 0; j < server.dbnum; j++) {
-        int expired, iteration = 0;
+        int expired;
         redisDb *db = server.db+j;
 
         /* Continue to expire if at the end of the cycle more than 25%
          * of the keys were expired. */
         do {
-            long num = dictSize(db->expires);
+            unsigned long num = dictSize(db->expires);
+            unsigned long slots = dictSlots(db->expires);
             long long now = mstime();
 
+            /* When there are less than 1% filled slots getting random
+             * keys is expensive, so stop here waiting for better times...
+             * The dictionary will be resized asap. */
+            if (num && slots > DICT_HT_INITIAL_SIZE &&
+                (num*100/slots < 1)) break;
+
+            /* The main collection cycle. Sample random keys among keys
+             * with an expire set, checking for expired ones. */
             expired = 0;
             if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
                 num = REDIS_EXPIRELOOKUPS_PER_CRON;
@@ -645,8 +663,8 @@ void activeExpireCycle(void) {
              * expire. So after a given amount of milliseconds return to the
              * caller waiting for the other active expire cycle. */
             iteration++;
-            if ((iteration & 0xff) == 0 &&  /* & 0xff is the same as % 255 */
-                (mstime()-start) > REDIS_EXPIRELOOKUPS_TIME_LIMIT) return;
+            if ((iteration & 0xf) == 0 && /* check once every 16 cycles. */
+                (ustime()-start) > timelimit) return;
         } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
     }
 }
@@ -732,13 +750,13 @@ int clientsCronResizeQueryBuffer(redisClient *c) {
 }
 
 void clientsCron(void) {
-    /* Make sure to process at least 1/100 of clients per call.
-     * Since this function is called 10 times per second we are sure that
+    /* Make sure to process at least 1/(REDIS_HZ*10) of clients per call.
+     * Since this function is called REDIS_HZ times per second we are sure that
      * in the worst case we process all the clients in 10 seconds.
      * In normal conditions (a reasonable number of clients) we process
      * all the clients in a shorter time. */
     int numclients = listLength(server.clients);
-    int iterations = numclients/100;
+    int iterations = numclients/(REDIS_HZ*10);
 
     if (iterations < 50)
         iterations = (numclients < 50) ? numclients : 50;
@@ -760,6 +778,30 @@ void clientsCron(void) {
     }
 }
 
+/* This is our timer interrupt, called REDIS_HZ times per second.
+ * Here is where we do a number of things that need to be done asynchronously.
+ * For instance:
+ *
+ * - Active expired keys collection (it is also performed in a lazy way on
+ *   lookup).
+ * - Software watchdong.
+ * - Update some statistic.
+ * - Incremental rehashing of the DBs hash tables.
+ * - Triggering BGSAVE / AOF rewrite, and handling of terminated children.
+ * - Clients timeout of differnet kinds.
+ * - Replication reconnection.
+ * - Many more...
+ *
+ * Everything directly called here will be called REDIS_HZ times per second,
+ * so in order to throttle execution of things we want to do less frequently
+ * a macro is used: run_with_period(milliseconds) { .... }
+ */
+
+/* Using the following macro you can run code inside serverCron() with the
+ * specified period, specified in milliseconds.
+ * The actual resolution depends on REDIS_HZ. */
+#define run_with_period(_ms_) if (!(loops % ((_ms_)/(1000/REDIS_HZ))))
+
 int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
     int j, loops = server.cronloops;
     REDIS_NOTUSED(eventLoop);
@@ -776,7 +818,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
      * To access a global var is faster than calling time(NULL) */
     server.unixtime = time(NULL);
 
-    trackOperationsPerSecond();
+    run_with_period(100) trackOperationsPerSecond();
 
     /* We have just 22 bits per object for LRU information.
      * So we use an (eventually wrapping) LRU clock with 10 seconds resolution.
@@ -804,15 +846,17 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
     }
 
     /* Show some info about non-empty databases */
-    for (j = 0; j < server.dbnum; j++) {
-        long long size, used, vkeys;
-
-        size = dictSlots(server.db[j].dict);
-        used = dictSize(server.db[j].dict);
-        vkeys = dictSize(server.db[j].expires);
-        if (!(loops % 50) && (used || vkeys)) {
-            redisLog(REDIS_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
-            /* dictPrintStats(server.dict); */
+    run_with_period(5000) {
+        for (j = 0; j < server.dbnum; j++) {
+            long long size, used, vkeys;
+
+            size = dictSlots(server.db[j].dict);
+            used = dictSize(server.db[j].dict);
+            vkeys = dictSize(server.db[j].expires);
+            if (used || vkeys) {
+                redisLog(REDIS_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
+                /* dictPrintStats(server.dict); */
+            }
         }
     }
 
@@ -823,12 +867,12 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
      * a lot of memory movements in the parent will cause a lot of pages
      * copied. */
     if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) {
-        if (!(loops % 10)) tryResizeHashTables();
+        tryResizeHashTables();
         if (server.activerehashing) incrementallyRehash();
     }
 
     /* Show information about connected clients */
-    if (!(loops % 50)) {
+    run_with_period(5000) {
         redisLog(REDIS_VERBOSE,"%d clients connected (%d slaves), %zu bytes in use",
             listLength(server.clients)-listLength(server.slaves),
             listLength(server.slaves),
@@ -910,10 +954,10 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
 
     /* Replication cron function -- used to reconnect to master and
      * to detect transfer failures. */
-    if (!(loops % 10)) replicationCron();
+    run_with_period(1000) replicationCron();
 
     server.cronloops++;
-    return 100;
+    return 1000/REDIS_HZ;
 }
 
 /* This function gets called every time Redis is entering the
@@ -1002,6 +1046,7 @@ void createSharedObjects(void) {
     shared.del = createStringObject("DEL",3);
     shared.rpop = createStringObject("RPOP",4);
     shared.lpop = createStringObject("LPOP",4);
+    shared.lpush = createStringObject("LPUSH",5);
     for (j = 0; j < REDIS_SHARED_INTEGERS; j++) {
         shared.integers[j] = createObject(REDIS_STRING,(void*)(long)j);
         shared.integers[j]->encoding = REDIS_ENCODING_INT;
@@ -1043,6 +1088,9 @@ void initServerConfig() {
     server.aof_rewrite_base_size = 0;
     server.aof_rewrite_scheduled = 0;
     server.aof_last_fsync = time(NULL);
+    server.aof_rewrite_time_last = -1;
+    server.aof_rewrite_time_start = -1;
+    server.aof_lastbgrewrite_status = REDIS_OK;
     server.aof_delayed_fsync = 0;
     server.aof_fd = -1;
     server.aof_selected_db = -1; /* Make sure the first time will not match */
@@ -1090,6 +1138,7 @@ void initServerConfig() {
     server.repl_serve_stale_data = 1;
     server.repl_slave_ro = 1;
     server.repl_down_since = time(NULL);
+    server.slave_priority = REDIS_DEFAULT_SLAVE_PRIORITY;
 
     /* Client output buffer limits */
     server.client_obuf_limits[REDIS_CLIENT_LIMIT_CLASS_NORMAL].hard_limit_bytes = 0;
@@ -1116,6 +1165,8 @@ void initServerConfig() {
     server.delCommand = lookupCommandByCString("del");
     server.multiCommand = lookupCommandByCString("multi");
     server.lpushCommand = lookupCommandByCString("lpush");
+    server.lpopCommand = lookupCommandByCString("lpop");
+    server.rpopCommand = lookupCommandByCString("rpop");
     
     /* Slow log */
     server.slowlog_log_slower_than = REDIS_SLOWLOG_LOG_SLOWER_THAN;
@@ -1191,6 +1242,7 @@ void initServer() {
     server.slaves = listCreate();
     server.monitors = listCreate();
     server.unblocked_clients = listCreate();
+    server.ready_keys = listCreate();
 
     createSharedObjects();
     adjustOpenFilesLimit();
@@ -1221,6 +1273,7 @@ void initServer() {
         server.db[j].dict = dictCreate(&dbDictType,NULL);
         server.db[j].expires = dictCreate(&keyptrDictType,NULL);
         server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
+        server.db[j].ready_keys = dictCreate(&setDictType,NULL);
         server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
         server.db[j].id = j;
     }
@@ -1231,9 +1284,11 @@ void initServer() {
     server.cronloops = 0;
     server.rdb_child_pid = -1;
     server.aof_child_pid = -1;
-    server.aof_rewrite_buf = sdsempty();
+    aofRewriteBufferReset();
     server.aof_buf = sdsempty();
     server.lastsave = time(NULL);
+    server.rdb_save_time_last = -1;
+    server.rdb_save_time_start = -1;
     server.dirty = 0;
     server.stat_numcommands = 0;
     server.stat_numconnections = 0;
@@ -1254,9 +1309,9 @@ void initServer() {
     server.stop_writes_on_bgsave_err = 1;
     aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
     if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE,
-        acceptTcpHandler,NULL) == AE_ERR) oom("creating file event");
+        acceptTcpHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.ipfd file event.");
     if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
-        acceptUnixHandler,NULL) == AE_ERR) oom("creating file event");
+        acceptUnixHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.sofd file event.");
 
     if (server.aof_state == REDIS_AOF_ON) {
         server.aof_fd = open(server.aof_filename,
@@ -1305,6 +1360,8 @@ void populateCommandTable(void) {
             case 's': c->flags |= REDIS_CMD_NOSCRIPT; break;
             case 'R': c->flags |= REDIS_CMD_RANDOM; break;
             case 'S': c->flags |= REDIS_CMD_SORT_FOR_SCRIPT; break;
+            case 'l': c->flags |= REDIS_CMD_LOADING; break;
+            case 't': c->flags |= REDIS_CMD_STALE; break;
             default: redisPanic("Unsupported command flag"); break;
             }
             f++;
@@ -1523,7 +1580,7 @@ int processCommand(redisClient *c) {
         return REDIS_OK;
     }
 
-    /* Don't accept wirte commands if this is a read only slave. But
+    /* Don't accept write commands if this is a read only slave. But
      * accept write commands if this is our master. */
     if (server.masterhost && server.repl_slave_ro &&
         !(c->flags & REDIS_MASTER) &&
@@ -1548,19 +1605,20 @@ int processCommand(redisClient *c) {
      * we are a slave with a broken link with master. */
     if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED &&
         server.repl_serve_stale_data == 0 &&
-        c->cmd->proc != infoCommand && c->cmd->proc != slaveofCommand)
+        !(c->cmd->flags & REDIS_CMD_STALE))
     {
         addReply(c, shared.masterdownerr);
         return REDIS_OK;
     }
 
-    /* Loading DB? Return an error if the command is not INFO */
-    if (server.loading && c->cmd->proc != infoCommand) {
+    /* Loading DB? Return an error if the command has not the
+     * REDIS_CMD_LOADING flag. */
+    if (server.loading && !(c->cmd->flags & REDIS_CMD_LOADING)) {
         addReply(c, shared.loadingerr);
         return REDIS_OK;
     }
 
-    /* Lua script too slow? Only allow SHUTDOWN NOSAVE and SCRIPT KILL. */
+    /* Lua script too slow? Only allow commands with REDIS_CMD_STALE flag. */
     if (server.lua_timedout &&
         !(c->cmd->proc == shutdownCommand &&
           c->argc == 2 &&
@@ -1582,6 +1640,8 @@ int processCommand(redisClient *c) {
         addReply(c,shared.queued);
     } else {
         call(c,REDIS_CALL_FULL);
+        if (listLength(server.ready_keys))
+            handleClientsBlockedOnLists();
     }
     return REDIS_OK;
 }
@@ -1644,10 +1704,52 @@ int prepareForShutdown(int flags) {
 
 /*================================== Commands =============================== */
 
+/* Return zero if strings are the same, non-zero if they are not.
+ * The comparison is performed in a way that prevents an attacker to obtain
+ * information about the nature of the strings just monitoring the execution
+ * time of the function.
+ *
+ * Note that limiting the comparison length to strings up to 512 bytes we
+ * can avoid leaking any information about the password length and any
+ * possible branch misprediction related leak.
+ */
+int time_independent_strcmp(char *a, char *b) {
+    char bufa[REDIS_AUTHPASS_MAX_LEN], bufb[REDIS_AUTHPASS_MAX_LEN];
+    /* The above two strlen perform len(a) + len(b) operations where either
+     * a or b are fixed (our password) length, and the difference is only
+     * relative to the length of the user provided string, so no information
+     * leak is possible in the following two lines of code. */
+    int alen = strlen(a);
+    int blen = strlen(b);
+    int j;
+    int diff = 0;
+
+    /* We can't compare strings longer than our static buffers.
+     * Note that this will never pass the first test in practical circumstances
+     * so there is no info leak. */
+    if (alen > sizeof(bufa) || blen > sizeof(bufb)) return 1;
+
+    memset(bufa,0,sizeof(bufa));        /* Constant time. */
+    memset(bufb,0,sizeof(bufb));        /* Constant time. */
+    /* Again the time of the following two copies is proportional to
+     * len(a) + len(b) so no info is leaked. */
+    memcpy(bufa,a,alen);
+    memcpy(bufb,b,blen);
+
+    /* Always compare all the chars in the two buffers without
+     * conditional expressions. */
+    for (j = 0; j < sizeof(bufa); j++) {
+        diff |= (bufa[j] ^ bufb[j]);
+    }
+    /* Length must be equal as well. */
+    diff |= alen ^ blen;
+    return diff; /* If zero strings are the same. */
+}
+
 void authCommand(redisClient *c) {
     if (!server.requirepass) {
         addReplyError(c,"Client sent AUTH, but no password is set");
-    } else if (!strcmp(c->argv[1]->ptr, server.requirepass)) {
+    } else if (!time_independent_strcmp(c->argv[1]->ptr, server.requirepass)) {
       c->authenticated = 1;
       addReply(c,shared.ok);
     } else {
@@ -1806,21 +1908,33 @@ sds genRedisInfoString(char *section) {
         info = sdscatprintf(info,
             "# Persistence\r\n"
             "loading:%d\r\n"
+            "rdb_changes_since_last_save:%lld\r\n"
+            "rdb_bgsave_in_progress:%d\r\n"
+            "rdb_last_save_time:%ld\r\n"
+            "rdb_last_bgsave_status:%s\r\n"
+            "rdb_last_bgsave_time_sec:%ld\r\n"
+            "rdb_current_bgsave_time_sec:%ld\r\n"
             "aof_enabled:%d\r\n"
-            "changes_since_last_save:%lld\r\n"
-            "bgsave_in_progress:%d\r\n"
-            "last_save_time:%ld\r\n"
-            "last_bgsave_status:%s\r\n"
-            "bgrewriteaof_in_progress:%d\r\n"
-            "bgrewriteaof_scheduled:%d\r\n",
+            "aof_rewrite_in_progress:%d\r\n"
+            "aof_rewrite_scheduled:%d\r\n"
+            "aof_last_rewrite_time_sec:%ld\r\n"
+            "aof_current_rewrite_time_sec:%ld\r\n"
+            "aof_last_bgrewrite_status:%s\r\n",
             server.loading,
-            server.aof_state != REDIS_AOF_OFF,
             server.dirty,
             server.rdb_child_pid != -1,
             server.lastsave,
-            server.lastbgsave_status == REDIS_OK ? "ok" : "err",
+            (server.lastbgsave_status == REDIS_OK) ? "ok" : "err",
+            server.rdb_save_time_last,
+            (server.rdb_child_pid == -1) ?
+                -1 : time(NULL)-server.rdb_save_time_start,
+            server.aof_state != REDIS_AOF_OFF,
             server.aof_child_pid != -1,
-            server.aof_rewrite_scheduled);
+            server.aof_rewrite_scheduled,
+            server.aof_rewrite_time_last,
+            (server.aof_child_pid == -1) ?
+                -1 : time(NULL)-server.aof_rewrite_time_start,
+            (server.aof_lastbgrewrite_status == REDIS_OK) ? "ok" : "err");
 
         if (server.aof_state != REDIS_AOF_OFF) {
             info = sdscatprintf(info,
@@ -1828,12 +1942,14 @@ sds genRedisInfoString(char *section) {
                 "aof_base_size:%lld\r\n"
                 "aof_pending_rewrite:%d\r\n"
                 "aof_buffer_length:%zu\r\n"
+                "aof_rewrite_buffer_length:%lu\r\n"
                 "aof_pending_bio_fsync:%llu\r\n"
                 "aof_delayed_fsync:%lu\r\n",
                 (long long) server.aof_current_size,
                 (long long) server.aof_rewrite_base_size,
                 server.aof_rewrite_scheduled,
                 sdslen(server.aof_buf),
+                aofRewriteBufferSize(),
                 bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC),
                 server.aof_delayed_fsync);
         }
@@ -1924,9 +2040,10 @@ sds genRedisInfoString(char *section) {
 
             if (server.repl_state == REDIS_REPL_TRANSFER) {
                 info = sdscatprintf(info,
-                    "master_sync_left_bytes:%ld\r\n"
+                    "master_sync_left_bytes:%lld\r\n"
                     "master_sync_last_io_seconds_ago:%d\r\n"
-                    ,(long)server.repl_transfer_left,
+                    , (long long)
+                        (server.repl_transfer_size - server.repl_transfer_read),
                     (int)(server.unixtime-server.repl_transfer_lastio)
                 );
             }
@@ -1936,6 +2053,8 @@ sds genRedisInfoString(char *section) {
                     "master_link_down_since_seconds:%ld\r\n",
                     (long)server.unixtime-server.repl_down_since);
             }
+            info = sdscatprintf(info,
+                "slave_priority:%d\r\n", server.slave_priority);
         }
         info = sdscatprintf(info,
             "connected_slaves:%lu\r\n",
@@ -1967,7 +2086,7 @@ sds genRedisInfoString(char *section) {
                 }
                 if (state == NULL) continue;
                 info = sdscatprintf(info,"slave%d:%s,%d,%s\r\n",
-                    slaveid,ip,port,state);
+                    slaveid,ip,slave->slave_listening_port,state);
                 slaveid++;
             }
         }
@@ -2086,7 +2205,7 @@ int freeMemoryIfNeeded(void) {
     }
     if (server.aof_state != REDIS_AOF_OFF) {
         mem_used -= sdslen(server.aof_buf);
-        mem_used -= sdslen(server.aof_rewrite_buf);
+        mem_used -= aofRewriteBufferSize();
     }
 
     /* Check if we are over the memory limit. */
@@ -2327,12 +2446,19 @@ void setupSignalHandlers(void) {
 
 void memtest(size_t megabytes, int passes);
 
+void redisOutOfMemoryHandler(size_t allocation_size) {
+    redisLog(REDIS_WARNING,"Out Of Memory allocating %zu bytes!",
+        allocation_size);
+    redisPanic("OOM");
+}
+
 int main(int argc, char **argv) {
     long long start;
     struct timeval tv;
 
     /* We need to initialize our libraries, and the server configuration. */
     zmalloc_enable_thread_safeness();
+    zmalloc_set_oom_handler(redisOutOfMemoryHandler);
     srand(time(NULL)^getpid());
     gettimeofday(&tv,NULL);
     dictSetHashFunctionSeed(tv.tv_sec^tv.tv_usec^getpid());