]> git.saurik.com Git - redis.git/blobdiff - src/redis.c
sds.c: sdsAllocSize() function added.
[redis.git] / src / redis.c
index ca49271dfba1a5794c32125fdc64a893d620ee60..58b5f1c4bf2d973e8035abb39ede205dea14de95 100644 (file)
@@ -102,7 +102,10 @@ struct redisCommand *commandTable;
  * s: command not allowed in scripts.
  * R: random command. Command is not deterministic, that is, the same command
  *    with the same arguments, with the same key space, may have different
- *    results. For instance SPOP and RANDOMKEY are two random commands. */
+ *    results. For instance SPOP and RANDOMKEY are two random commands.
+ * S: Sort command output array if called from script, so that the output
+ *    is deterministic.
+ */
 struct redisCommand redisCommandTable[] = {
     {"get",getCommand,2,"r",0,NULL,1,1,1,0,0},
     {"set",setCommand,3,"wm",0,noPreloadGetKeys,1,1,1,0,0},
@@ -145,13 +148,13 @@ struct redisCommand redisCommandTable[] = {
     {"scard",scardCommand,2,"r",0,NULL,1,1,1,0,0},
     {"spop",spopCommand,2,"wRs",0,NULL,1,1,1,0,0},
     {"srandmember",srandmemberCommand,2,"rR",0,NULL,1,1,1,0,0},
-    {"sinter",sinterCommand,-2,"r",0,NULL,1,-1,1,0,0},
+    {"sinter",sinterCommand,-2,"rS",0,NULL,1,-1,1,0,0},
     {"sinterstore",sinterstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
-    {"sunion",sunionCommand,-2,"r",0,NULL,1,-1,1,0,0},
+    {"sunion",sunionCommand,-2,"rS",0,NULL,1,-1,1,0,0},
     {"sunionstore",sunionstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
-    {"sdiff",sdiffCommand,-2,"r",0,NULL,1,-1,1,0,0},
+    {"sdiff",sdiffCommand,-2,"rS",0,NULL,1,-1,1,0,0},
     {"sdiffstore",sdiffstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
-    {"smembers",sinterCommand,2,"r",0,NULL,1,1,1,0,0},
+    {"smembers",sinterCommand,2,"rS",0,NULL,1,1,1,0,0},
     {"zadd",zaddCommand,-4,"wm",0,NULL,1,1,1,0,0},
     {"zincrby",zincrbyCommand,4,"wm",0,NULL,1,1,1,0,0},
     {"zrem",zremCommand,-3,"w",0,NULL,1,1,1,0,0},
@@ -177,8 +180,8 @@ struct redisCommand redisCommandTable[] = {
     {"hincrbyfloat",hincrbyfloatCommand,4,"wm",0,NULL,1,1,1,0,0},
     {"hdel",hdelCommand,-3,"w",0,NULL,1,1,1,0,0},
     {"hlen",hlenCommand,2,"r",0,NULL,1,1,1,0,0},
-    {"hkeys",hkeysCommand,2,"r",0,NULL,1,1,1,0,0},
-    {"hvals",hvalsCommand,2,"r",0,NULL,1,1,1,0,0},
+    {"hkeys",hkeysCommand,2,"rS",0,NULL,1,1,1,0,0},
+    {"hvals",hvalsCommand,2,"rS",0,NULL,1,1,1,0,0},
     {"hgetall",hgetallCommand,2,"r",0,NULL,1,1,1,0,0},
     {"hexists",hexistsCommand,3,"r",0,NULL,1,1,1,0,0},
     {"incrby",incrbyCommand,3,"wm",0,NULL,1,1,1,0,0},
@@ -196,7 +199,7 @@ struct redisCommand redisCommandTable[] = {
     {"expireat",expireatCommand,3,"w",0,NULL,1,1,1,0,0},
     {"pexpire",pexpireCommand,3,"w",0,NULL,1,1,1,0,0},
     {"pexpireat",pexpireatCommand,3,"w",0,NULL,1,1,1,0,0},
-    {"keys",keysCommand,2,"r",0,NULL,0,0,0,0,0},
+    {"keys",keysCommand,2,"rS",0,NULL,0,0,0,0,0},
     {"dbsize",dbsizeCommand,1,"r",0,NULL,0,0,0,0,0},
     {"auth",authCommand,2,"rs",0,NULL,0,0,0,0,0},
     {"ping",pingCommand,1,"r",0,NULL,0,0,0,0,0},
@@ -213,7 +216,7 @@ struct redisCommand redisCommandTable[] = {
     {"sync",syncCommand,1,"ars",0,NULL,0,0,0,0,0},
     {"flushdb",flushdbCommand,1,"w",0,NULL,0,0,0,0,0},
     {"flushall",flushallCommand,1,"w",0,NULL,0,0,0,0,0},
-    {"sort",sortCommand,-2,"wm",0,NULL,1,1,1,0,0},
+    {"sort",sortCommand,-2,"wmS",0,NULL,1,1,1,0,0},
     {"info",infoCommand,-1,"r",0,NULL,0,0,0,0,0},
     {"monitor",monitorCommand,1,"ars",0,NULL,0,0,0,0,0},
     {"ttl",ttlCommand,2,"r",0,NULL,1,1,1,0,0},
@@ -229,17 +232,16 @@ struct redisCommand redisCommandTable[] = {
     {"publish",publishCommand,3,"rpf",0,NULL,0,0,0,0,0},
     {"watch",watchCommand,-2,"rs",0,noPreloadGetKeys,1,-1,1,0,0},
     {"unwatch",unwatchCommand,1,"rs",0,NULL,0,0,0,0,0},
-    {"cluster",clusterCommand,-2,"ar",0,NULL,0,0,0,0,0},
     {"restore",restoreCommand,4,"awm",0,NULL,1,1,1,0,0},
     {"migrate",migrateCommand,6,"aw",0,NULL,0,0,0,0,0},
-    {"asking",askingCommand,1,"r",0,NULL,0,0,0,0,0},
     {"dump",dumpCommand,2,"ar",0,NULL,1,1,1,0,0},
     {"object",objectCommand,-2,"r",0,NULL,2,2,2,0,0},
     {"client",clientCommand,-2,"ar",0,NULL,0,0,0,0,0},
     {"eval",evalCommand,-3,"wms",0,zunionInterGetKeys,0,0,0,0,0},
     {"evalsha",evalShaCommand,-3,"wms",0,zunionInterGetKeys,0,0,0,0,0},
     {"slowlog",slowlogCommand,-2,"r",0,NULL,0,0,0,0,0},
-    {"script",scriptCommand,-2,"ras",0,NULL,0,0,0,0,0}
+    {"script",scriptCommand,-2,"ras",0,NULL,0,0,0,0,0},
+    {"time",timeCommand,1,"rR",0,NULL,0,0,0,0,0}
 };
 
 /*============================ Utility functions ============================ */
@@ -503,17 +505,6 @@ dictType keylistDictType = {
     dictListDestructor          /* val destructor */
 };
 
-/* Cluster nodes hash table, mapping nodes addresses 1.2.3.4:6379 to
- * clusterNode structures. */
-dictType clusterNodesDictType = {
-    dictSdsHash,                /* hash function */
-    NULL,                       /* key dup */
-    NULL,                       /* val dup */
-    dictSdsKeyCompare,          /* key compare */
-    dictSdsDestructor,          /* key destructor */
-    NULL                        /* val destructor */
-};
-
 int htNeedsResize(dict *dict) {
     long long size, used;
 
@@ -612,6 +603,75 @@ void updateLRUClock(void) {
                                                 REDIS_LRU_CLOCK_MAX;
 }
 
+
+/* Add a sample to the operations per second array of samples. */
+void trackOperationsPerSecond(void) {
+    long long t = mstime() - server.ops_sec_last_sample_time;
+    long long ops = server.stat_numcommands - server.ops_sec_last_sample_ops;
+    long long ops_sec;
+
+    ops_sec = t > 0 ? (ops*1000/t) : 0;
+
+    server.ops_sec_samples[server.ops_sec_idx] = ops_sec;
+    server.ops_sec_idx = (server.ops_sec_idx+1) % REDIS_OPS_SEC_SAMPLES;
+    server.ops_sec_last_sample_time = mstime();
+    server.ops_sec_last_sample_ops = server.stat_numcommands;
+}
+
+/* Return the mean of all the samples. */
+long long getOperationsPerSecond(void) {
+    int j;
+    long long sum = 0;
+
+    for (j = 0; j < REDIS_OPS_SEC_SAMPLES; j++)
+        sum += server.ops_sec_samples[j];
+    return sum / REDIS_OPS_SEC_SAMPLES;
+}
+
+void clientsCronHandleTimeout(redisClient *c) {
+    time_t now = time(NULL);
+
+    if (server.maxidletime &&
+        !(c->flags & REDIS_SLAVE) &&    /* no timeout for slaves */
+        !(c->flags & REDIS_MASTER) &&   /* no timeout for masters */
+        !(c->flags & REDIS_BLOCKED) &&  /* no timeout for BLPOP */
+        dictSize(c->pubsub_channels) == 0 && /* no timeout for pubsub */
+        listLength(c->pubsub_patterns) == 0 &&
+        (now - c->lastinteraction > server.maxidletime))
+    {
+        redisLog(REDIS_VERBOSE,"Closing idle client");
+        freeClient(c);
+    } else if (c->flags & REDIS_BLOCKED) {
+        if (c->bpop.timeout != 0 && c->bpop.timeout < now) {
+            addReply(c,shared.nullmultibulk);
+            unblockClientWaitingData(c);
+        }
+    }
+}
+
+void clientsCron(void) {
+    /* Make sure to process at least 1/100 of clients per call.
+     * Since this function is called 10 times per second we are sure that
+     * in the worst case we process all the clients in 10 seconds.
+     * In normal conditions (a reasonable number of clients) we process
+     * all the clients in a shorter time. */
+    int iterations = listLength(server.clients)/100;
+    if (iterations < 50) iterations = 50;
+
+    while(listLength(server.clients) && iterations--) {
+        redisClient *c;
+        listNode *head;
+
+        /* Rotate the list, take the current head, process.
+         * This way if the client must be removed from the list it's the
+         * first element and we don't incur into O(N) computation. */
+        listRotate(server.clients);
+        head = listFirst(server.clients);
+        c = listNodeValue(head);
+        clientsCronHandleTimeout(c);
+    }
+}
+
 int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
     int j, loops = server.cronloops;
     REDIS_NOTUSED(eventLoop);
@@ -624,6 +684,8 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
      * To access a global var is faster than calling time(NULL) */
     server.unixtime = time(NULL);
 
+    trackOperationsPerSecond();
+
     /* We have just 22 bits per object for LRU information.
      * So we use an (eventually wrapping) LRU clock with 10 seconds resolution.
      * 2^22 bits with 10 seconds resoluton is more or less 1.5 years.
@@ -681,9 +743,8 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
             zmalloc_used_memory());
     }
 
-    /* Close connections of timedout clients */
-    if ((server.maxidletime && !(loops % 100)) || server.bpop_blocked_clients)
-        closeTimedoutClients();
+    /* We need to do a few operations on clients asynchronously. */
+    clientsCron();
 
     /* Start a scheduled AOF rewrite if this was requested by the user while
      * a BGSAVE was in progress. */
@@ -761,9 +822,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
      * to detect transfer failures. */
     if (!(loops % 10)) replicationCron();
 
-    /* Run other sub-systems specific cron jobs */
-    if (server.cluster_enabled && !(loops % 10)) clusterCron();
-
     server.cronloops++;
     return 100;
 }
@@ -829,6 +887,8 @@ void createSharedObjects(void) {
         "-LOADING Redis is loading the dataset in memory\r\n"));
     shared.slowscripterr = createObject(REDIS_STRING,sdsnew(
         "-BUSY Redis is busy running a script. You can only call SCRIPT KILL or SHUTDOWN NOSAVE.\r\n"));
+    shared.bgsaveerr = createObject(REDIS_STRING,sdsnew(
+        "-MISCONF Redis is configured to save RDB snapshots, but is currently not able to persist on disk. Write commands are disabled. Please check Redis logs for details about the error.\r\n"));
     shared.space = createObject(REDIS_STRING,sdsnew(" "));
     shared.colon = createObject(REDIS_STRING,sdsnew(":"));
     shared.plus = createObject(REDIS_STRING,sdsnew("+"));
@@ -848,15 +908,25 @@ void createSharedObjects(void) {
     shared.unsubscribebulk = createStringObject("$11\r\nunsubscribe\r\n",18);
     shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n",17);
     shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n",19);
-    shared.mbulk3 = createStringObject("*3\r\n",4);
-    shared.mbulk4 = createStringObject("*4\r\n",4);
+    shared.del = createStringObject("DEL",3);
+    shared.rpop = createStringObject("RPOP",4);
+    shared.lpop = createStringObject("LPOP",4);
     for (j = 0; j < REDIS_SHARED_INTEGERS; j++) {
         shared.integers[j] = createObject(REDIS_STRING,(void*)(long)j);
         shared.integers[j]->encoding = REDIS_ENCODING_INT;
     }
+    for (j = 0; j < REDIS_SHARED_BULKHDR_LEN; j++) {
+        shared.mbulkhdr[j] = createObject(REDIS_STRING,
+            sdscatprintf(sdsempty(),"*%d\r\n",j));
+        shared.bulkhdr[j] = createObject(REDIS_STRING,
+            sdscatprintf(sdsempty(),"$%d\r\n",j));
+    }
 }
 
 void initServerConfig() {
+    getRandomHexChars(server.runid,REDIS_RUN_ID_SIZE);
+    server.runid[REDIS_RUN_ID_SIZE] = '\0';
+    server.arch_bits = (sizeof(long) == 8) ? 64 : 32;
     server.port = REDIS_SERVERPORT;
     server.bindaddr = NULL;
     server.unixsocket = NULL;
@@ -896,8 +966,8 @@ void initServerConfig() {
     server.maxmemory = 0;
     server.maxmemory_policy = REDIS_MAXMEMORY_VOLATILE_LRU;
     server.maxmemory_samples = 3;
-    server.hash_max_zipmap_entries = REDIS_HASH_MAX_ZIPMAP_ENTRIES;
-    server.hash_max_zipmap_value = REDIS_HASH_MAX_ZIPMAP_VALUE;
+    server.hash_max_ziplist_entries = REDIS_HASH_MAX_ZIPLIST_ENTRIES;
+    server.hash_max_ziplist_value = REDIS_HASH_MAX_ZIPLIST_VALUE;
     server.list_max_ziplist_entries = REDIS_LIST_MAX_ZIPLIST_ENTRIES;
     server.list_max_ziplist_value = REDIS_LIST_MAX_ZIPLIST_VALUE;
     server.set_max_intset_entries = REDIS_SET_MAX_INTSET_ENTRIES;
@@ -906,8 +976,6 @@ void initServerConfig() {
     server.shutdown_asap = 0;
     server.repl_ping_slave_period = REDIS_REPL_PING_SLAVE_PERIOD;
     server.repl_timeout = REDIS_REPL_TIMEOUT;
-    server.cluster_enabled = 0;
-    server.cluster.configfile = zstrdup("nodes.conf");
     server.lua_caller = NULL;
     server.lua_time_limit = REDIS_LUA_TIME_LIMIT;
     server.lua_client = NULL;
@@ -953,6 +1021,7 @@ void initServerConfig() {
     populateCommandTable();
     server.delCommand = lookupCommandByCString("del");
     server.multiCommand = lookupCommandByCString("multi");
+    server.lpushCommand = lookupCommandByCString("lpush");
     
     /* Slow log */
     server.slowlog_log_slower_than = REDIS_SLOWLOG_LOG_SLOWER_THAN;
@@ -1074,7 +1143,13 @@ void initServer() {
     server.stat_peak_memory = 0;
     server.stat_fork_time = 0;
     server.stat_rejected_conn = 0;
+    memset(server.ops_sec_samples,0,sizeof(server.ops_sec_samples));
+    server.ops_sec_idx = 0;
+    server.ops_sec_last_sample_time = mstime();
+    server.ops_sec_last_sample_ops = 0;
     server.unixtime = time(NULL);
+    server.lastbgsave_status = REDIS_OK;
+    server.stop_writes_on_bgsave_err = 1;
     aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
     if (server.ipfd > 0 && aeCreateFileEvent(server.el,server.ipfd,AE_READABLE,
         acceptTcpHandler,NULL) == AE_ERR) oom("creating file event");
@@ -1091,7 +1166,16 @@ void initServer() {
         }
     }
 
-    if (server.cluster_enabled) clusterInit();
+    /* 32 bit instances are limited to 4GB of address space, so if there is
+     * no explicit limit in the user provided configuration we set a limit
+     * at 3.5GB using maxmemory with 'noeviction' policy'. This saves
+     * useless crashes of the Redis instance. */
+    if (server.arch_bits == 32 && server.maxmemory == 0) {
+        redisLog(REDIS_WARNING,"Warning: 32 bit instance detected but no memory limit set. Setting 3.5 GB maxmemory limit with 'noeviction' policy now.");
+        server.maxmemory = 3584LL*(1024*1024); /* 3584 MB = 3.5 GB */
+        server.maxmemory_policy = REDIS_MAXMEMORY_NO_EVICTION;
+    }
+
     scriptingInit();
     slowlogInit();
     bioInit();
@@ -1118,6 +1202,7 @@ void populateCommandTable(void) {
             case 'f': c->flags |= REDIS_CMD_FORCE_REPLICATION; break;
             case 's': c->flags |= REDIS_CMD_NOSCRIPT; break;
             case 'R': c->flags |= REDIS_CMD_RANDOM; break;
+            case 'S': c->flags |= REDIS_CMD_SORT_FOR_SCRIPT; break;
             default: redisPanic("Unsupported command flag"); break;
             }
             f++;
@@ -1140,6 +1225,43 @@ void resetCommandTableStats(void) {
     }
 }
 
+/* ========================== Redis OP Array API ============================ */
+
+void redisOpArrayInit(redisOpArray *oa) {
+    oa->ops = NULL;
+    oa->numops = 0;
+}
+
+int redisOpArrayAppend(redisOpArray *oa, struct redisCommand *cmd, int dbid,
+                       robj **argv, int argc, int target)
+{
+    redisOp *op;
+
+    oa->ops = zrealloc(oa->ops,sizeof(redisOp)*(oa->numops+1));
+    op = oa->ops+oa->numops;
+    op->cmd = cmd;
+    op->dbid = dbid;
+    op->argv = argv;
+    op->argc = argc;
+    op->target = target;
+    oa->numops++;
+    return oa->numops;
+}
+
+void redisOpArrayFree(redisOpArray *oa) {
+    while(oa->numops) {
+        int j;
+        redisOp *op;
+
+        oa->numops--;
+        op = oa->ops+oa->numops;
+        for (j = 0; j < op->argc; j++)
+            decrRefCount(op->argv[j]);
+        zfree(op->argv);
+    }
+    zfree(oa->ops);
+}
+
 /* ====================== Commands lookup and execution ===================== */
 
 struct redisCommand *lookupCommand(sds name) {
@@ -1155,25 +1277,84 @@ struct redisCommand *lookupCommandByCString(char *s) {
     return cmd;
 }
 
+/* Propagate the specified command (in the context of the specified database id)
+ * to AOF, Slaves and Monitors.
+ *
+ * flags are an xor between:
+ * + REDIS_PROPAGATE_NONE (no propagation of command at all)
+ * + REDIS_PROPAGATE_AOF (propagate into the AOF file if is enabled)
+ * + REDIS_PROPAGATE_REPL (propagate into the replication link)
+ */
+void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
+               int flags)
+{
+    if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF)
+        feedAppendOnlyFile(cmd,dbid,argv,argc);
+    if (flags & REDIS_PROPAGATE_REPL && listLength(server.slaves))
+        replicationFeedSlaves(server.slaves,dbid,argv,argc);
+}
+
+/* Used inside commands to schedule the propagation of additional commands
+ * after the current command is propagated to AOF / Replication. */
+void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
+                   int target)
+{
+    redisOpArrayAppend(&server.also_propagate,cmd,dbid,argv,argc,target);
+}
+
 /* Call() is the core of Redis execution of a command */
-void call(redisClient *c) {
+void call(redisClient *c, int flags) {
     long long dirty, start = ustime(), duration;
 
+    /* Sent the command to clients in MONITOR mode, only if the commands are
+     * not geneated from reading an AOF. */
+    if (listLength(server.monitors) && !server.loading)
+        replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
+
+    /* Call the command. */
+    redisOpArrayInit(&server.also_propagate);
     dirty = server.dirty;
     c->cmd->proc(c);
     dirty = server.dirty-dirty;
     duration = ustime()-start;
-    c->cmd->microseconds += duration;
-    slowlogPushEntryIfNeeded(c->argv,c->argc,duration);
-    c->cmd->calls++;
-
-    if (server.aof_state != REDIS_AOF_OFF && dirty > 0)
-        feedAppendOnlyFile(c->cmd,c->db->id,c->argv,c->argc);
-    if ((dirty > 0 || c->cmd->flags & REDIS_CMD_FORCE_REPLICATION) &&
-        listLength(server.slaves))
-        replicationFeedSlaves(server.slaves,c->db->id,c->argv,c->argc);
-    if (listLength(server.monitors))
-        replicationFeedMonitors(server.monitors,c->db->id,c->argv,c->argc);
+
+    /* When EVAL is called loading the AOF we don't want commands called
+     * from Lua to go into the slowlog or to populate statistics. */
+    if (server.loading && c->flags & REDIS_LUA_CLIENT)
+        flags &= ~(REDIS_CALL_SLOWLOG | REDIS_CALL_STATS);
+
+    /* Log the command into the Slow log if needed, and populate the
+     * per-command statistics that we show in INFO commandstats. */
+    if (flags & REDIS_CALL_SLOWLOG)
+        slowlogPushEntryIfNeeded(c->argv,c->argc,duration);
+    if (flags & REDIS_CALL_STATS) {
+        c->cmd->microseconds += duration;
+        c->cmd->calls++;
+    }
+
+    /* Propagate the command into the AOF and replication link */
+    if (flags & REDIS_CALL_PROPAGATE) {
+        int flags = REDIS_PROPAGATE_NONE;
+
+        if (c->cmd->flags & REDIS_CMD_FORCE_REPLICATION)
+            flags |= REDIS_PROPAGATE_REPL;
+        if (dirty)
+            flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF);
+        if (flags != REDIS_PROPAGATE_NONE)
+            propagate(c->cmd,c->db->id,c->argv,c->argc,flags);
+    }
+    /* Commands such as LPUSH or BRPOPLPUSH may propagate an additional
+     * PUSH command. */
+    if (server.also_propagate.numops) {
+        int j;
+        redisOp *rop;
+
+        for (j = 0; j < server.also_propagate.numops; j++) {
+            rop = &server.also_propagate.ops[j];
+            propagate(rop->cmd, rop->dbid, rop->argv, rop->argc, rop->target);
+        }
+        redisOpArrayFree(&server.also_propagate);
+    }
     server.stat_numcommands++;
 }
 
@@ -1217,39 +1398,27 @@ int processCommand(redisClient *c) {
         return REDIS_OK;
     }
 
-    /* If cluster is enabled, redirect here */
-    if (server.cluster_enabled &&
-                !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0)) {
-        int hashslot;
-
-        if (server.cluster.state != REDIS_CLUSTER_OK) {
-            addReplyError(c,"The cluster is down. Check with CLUSTER INFO for more information");
-            return REDIS_OK;
-        } else {
-            int ask;
-            clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&ask);
-            if (n == NULL) {
-                addReplyError(c,"Multi keys request invalid in cluster");
-                return REDIS_OK;
-            } else if (n != server.cluster.myself) {
-                addReplySds(c,sdscatprintf(sdsempty(),
-                    "-%s %d %s:%d\r\n", ask ? "ASK" : "MOVED",
-                    hashslot,n->ip,n->port));
-                return REDIS_OK;
-            }
-        }
-    }
-
     /* Handle the maxmemory directive.
      *
      * First we try to free some memory if possible (if there are volatile
      * keys in the dataset). If there are not the only thing we can do
      * is returning an error. */
-    if (server.maxmemory) freeMemoryIfNeeded();
-    if (server.maxmemory && (c->cmd->flags & REDIS_CMD_DENYOOM) &&
-        zmalloc_used_memory() > server.maxmemory)
+    if (server.maxmemory) {
+        int retval = freeMemoryIfNeeded();
+        if ((c->cmd->flags & REDIS_CMD_DENYOOM) && retval == REDIS_ERR) {
+            addReplyError(c,
+                "command not allowed when used memory > 'maxmemory'");
+            return REDIS_OK;
+        }
+    }
+
+    /* Don't accept write commands if there are problems persisting on disk. */
+    if (server.stop_writes_on_bgsave_err &&
+        server.saveparamslen > 0
+        && server.lastbgsave_status == REDIS_ERR &&
+        c->cmd->flags & REDIS_CMD_WRITE)
     {
-        addReplyError(c,"command not allowed when used memory > 'maxmemory'");
+        addReply(c, shared.bgsaveerr);
         return REDIS_OK;
     }
 
@@ -1302,7 +1471,7 @@ int processCommand(redisClient *c) {
         queueMultiCommand(c);
         addReply(c,shared.queued);
     } else {
-        call(c);
+        call(c,REDIS_CALL_FULL);
     }
     return REDIS_OK;
 }
@@ -1385,6 +1554,17 @@ void echoCommand(redisClient *c) {
     addReplyBulk(c,c->argv[1]);
 }
 
+void timeCommand(redisClient *c) {
+    struct timeval tv;
+
+    /* gettimeofday() can only fail if &tv is a bad addresss so we
+     * don't check for errors. */
+    gettimeofday(&tv,NULL);
+    addReplyMultiBulkLen(c,2);
+    addReplyBulkLongLong(c,tv.tv_sec);
+    addReplyBulkLongLong(c,tv.tv_usec);
+}
+
 /* Convert an amount of bytes into a human readable string in the form
  * of 100B, 2G, 100M, 4K, and so forth. */
 void bytesToHuman(char *s, unsigned long long n) {
@@ -1435,10 +1615,11 @@ sds genRedisInfoString(char *section) {
             "redis_version:%s\r\n"
             "redis_git_sha1:%s\r\n"
             "redis_git_dirty:%d\r\n"
-            "arch_bits:%s\r\n"
+            "arch_bits:%d\r\n"
             "multiplexing_api:%s\r\n"
             "gcc_version:%d.%d.%d\r\n"
             "process_id:%ld\r\n"
+            "run_id:%s\r\n"
             "tcp_port:%d\r\n"
             "uptime_in_seconds:%ld\r\n"
             "uptime_in_days:%ld\r\n"
@@ -1446,7 +1627,7 @@ sds genRedisInfoString(char *section) {
             REDIS_VERSION,
             redisGitSHA1(),
             strtol(redisGitDirty(),NULL,10) > 0,
-            (sizeof(long) == 8) ? "64" : "32",
+            server.arch_bits,
             aeGetApiName(),
 #ifdef __GNUC__
             __GNUC__,__GNUC_MINOR__,__GNUC_PATCHLEVEL__,
@@ -1454,6 +1635,7 @@ sds genRedisInfoString(char *section) {
             0,0,0,
 #endif
             (long) getpid(),
+            server.runid,
             server.port,
             uptime,
             uptime/(3600*24),
@@ -1465,7 +1647,7 @@ sds genRedisInfoString(char *section) {
         if (sections++) info = sdscat(info,"\r\n");
         info = sdscatprintf(info,
             "# Clients\r\n"
-            "connected_clients:%d\r\n"
+            "connected_clients:%lu\r\n"
             "client_longest_output_list:%lu\r\n"
             "client_biggest_input_buf:%lu\r\n"
             "blocked_clients:%d\r\n",
@@ -1513,12 +1695,14 @@ sds genRedisInfoString(char *section) {
             "changes_since_last_save:%lld\r\n"
             "bgsave_in_progress:%d\r\n"
             "last_save_time:%ld\r\n"
+            "last_bgsave_status:%s\r\n"
             "bgrewriteaof_in_progress:%d\r\n",
             server.loading,
             server.aof_state != REDIS_AOF_OFF,
             server.dirty,
             server.rdb_child_pid != -1,
             server.lastsave,
+            server.lastbgsave_status == REDIS_OK ? "ok" : "err",
             server.aof_child_pid != -1);
 
         if (server.aof_state != REDIS_AOF_OFF) {
@@ -1574,16 +1758,18 @@ sds genRedisInfoString(char *section) {
             "# Stats\r\n"
             "total_connections_received:%lld\r\n"
             "total_commands_processed:%lld\r\n"
+            "instantaneous_ops_per_sec:%lld\r\n"
             "rejected_connections:%lld\r\n"
             "expired_keys:%lld\r\n"
             "evicted_keys:%lld\r\n"
             "keyspace_hits:%lld\r\n"
             "keyspace_misses:%lld\r\n"
             "pubsub_channels:%ld\r\n"
-            "pubsub_patterns:%u\r\n"
+            "pubsub_patterns:%lu\r\n"
             "latest_fork_usec:%lld\r\n",
             server.stat_numconnections,
             server.stat_numcommands,
+            getOperationsPerSecond(),
             server.stat_rejected_conn,
             server.stat_expiredkeys,
             server.stat_evictedkeys,
@@ -1633,7 +1819,7 @@ sds genRedisInfoString(char *section) {
             }
         }
         info = sdscatprintf(info,
-            "connected_slaves:%d\r\n",
+            "connected_slaves:%lu\r\n",
             listLength(server.slaves));
         if (listLength(server.slaves)) {
             int slaveid = 0;
@@ -1699,15 +1885,6 @@ sds genRedisInfoString(char *section) {
         }
     }
 
-    /* Clusetr */
-    if (allsections || defsections || !strcasecmp(section,"cluster")) {
-        if (sections++) info = sdscat(info,"\r\n");
-        info = sdscatprintf(info,
-        "# Cluster\r\n"
-        "cluster_enabled:%d\r\n",
-        server.cluster_enabled);
-    }
-
     /* Key space */
     if (allsections || defsections || !strcasecmp(section,"keyspace")) {
         if (sections++) info = sdscat(info,"\r\n");
@@ -1753,23 +1930,57 @@ void monitorCommand(redisClient *c) {
 /* ============================ Maxmemory directive  ======================== */
 
 /* This function gets called when 'maxmemory' is set on the config file to limit
- * the max memory used by the server, and we are out of memory.
- * This function will try to, in order:
+ * the max memory used by the server, before processing a command.
+ *
+ * The goal of the function is to free enough memory to keep Redis under the
+ * configured memory limit.
  *
- * - Free objects from the free list
- * - Try to remove keys with an EXPIRE set
+ * The function starts calculating how many bytes should be freed to keep
+ * Redis under the limit, and enters a loop selecting the best keys to
+ * evict accordingly to the configured policy.
  *
- * It is not possible to free enough memory to reach used-memory < maxmemory
- * the server will start refusing commands that will enlarge even more the
- * memory usage.
+ * If all the bytes needed to return back under the limit were freed the
+ * function returns REDIS_OK, otherwise REDIS_ERR is returned, and the caller
+ * should block the execution of commands that will result in more memory
+ * used by the server.
  */
-void freeMemoryIfNeeded(void) {
-    /* Remove keys accordingly to the active policy as long as we are
-     * over the memory limit. */
-    if (server.maxmemory_policy == REDIS_MAXMEMORY_NO_EVICTION) return;
+int freeMemoryIfNeeded(void) {
+    size_t mem_used, mem_tofree, mem_freed;
+    int slaves = listLength(server.slaves);
+
+    /* Remove the size of slaves output buffers and AOF buffer from the
+     * count of used memory. */
+    mem_used = zmalloc_used_memory();
+    if (slaves) {
+        listIter li;
+        listNode *ln;
+
+        listRewind(server.slaves,&li);
+        while((ln = listNext(&li))) {
+            redisClient *slave = listNodeValue(ln);
+            unsigned long obuf_bytes = getClientOutputBufferMemoryUsage(slave);
+            if (obuf_bytes > mem_used)
+                mem_used = 0;
+            else
+                mem_used -= obuf_bytes;
+        }
+    }
+    if (server.aof_state != REDIS_AOF_OFF) {
+        mem_used -= sdslen(server.aof_buf);
+        mem_used -= sdslen(server.aof_rewrite_buf);
+    }
 
-    while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
-        int j, k, freed = 0;
+    /* Check if we are over the memory limit. */
+    if (mem_used <= server.maxmemory) return REDIS_OK;
+
+    if (server.maxmemory_policy == REDIS_MAXMEMORY_NO_EVICTION)
+        return REDIS_ERR; /* We need to free memory, but policy forbids. */
+
+    /* Compute how much memory we need to free. */
+    mem_tofree = mem_used - server.maxmemory;
+    mem_freed = 0;
+    while (mem_freed < mem_tofree) {
+        int j, k, keys_freed = 0;
 
         for (j = 0; j < server.dbnum; j++) {
             long bestval = 0; /* just to prevent warning */
@@ -1842,16 +2053,36 @@ void freeMemoryIfNeeded(void) {
 
             /* Finally remove the selected key. */
             if (bestkey) {
+                long long delta;
+
                 robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
                 propagateExpire(db,keyobj);
+                /* We compute the amount of memory freed by dbDelete() alone.
+                 * It is possible that actually the memory needed to propagate
+                 * the DEL in AOF and replication link is greater than the one
+                 * we are freeing removing the key, but we can't account for
+                 * that otherwise we would never exit the loop.
+                 *
+                 * AOF and Output buffer memory will be freed eventually so
+                 * we only care about memory used by the key space. */
+                delta = (long long) zmalloc_used_memory();
                 dbDelete(db,keyobj);
+                delta -= (long long) zmalloc_used_memory();
+                mem_freed += delta;
                 server.stat_evictedkeys++;
                 decrRefCount(keyobj);
-                freed++;
+                keys_freed++;
+
+                /* When the memory to free starts to be big enough, we may
+                 * start spending so much time here that is impossible to
+                 * deliver data to the slaves fast enough, so we force the
+                 * transmission here inside the loop. */
+                if (slaves) flushSlavesOutputBuffers();
             }
         }
-        if (!freed) return; /* nothing to free... */
+        if (!keys_freed) return REDIS_ERR; /* nothing to free... */
     }
+    return REDIS_OK;
 }
 
 /* =================================== Main! ================================ */
@@ -1933,7 +2164,7 @@ void redisAsciiArt(void) {
         redisGitSHA1(),
         strtol(redisGitDirty(),NULL,10) > 0,
         (sizeof(long) == 8) ? "64" : "32",
-        server.cluster_enabled ? "cluster" : "stand alone",
+        "stand alone",
         server.port,
         (long) getpid()
     );