]> git.saurik.com Git - redis.git/blobdiff - redis.c
Better to increment the version minor number when a VM bug is fixed... it will be...
[redis.git] / redis.c
diff --git a/redis.c b/redis.c
index bc1593198af3bc719e7137e7e71b115beb1ae7f0..dce4eb2d869f3c8373a41c20dd8268145b73c28a 100644 (file)
--- a/redis.c
+++ b/redis.c
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
+ * Copyright (c) 2009-2010, Salvatore Sanfilippo <antirez at gmail dot com>
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -27,7 +27,7 @@
  * POSSIBILITY OF SUCH DAMAGE.
  */
 
-#define REDIS_VERSION "1.3.3"
+#define REDIS_VERSION "1.3.4"
 
 #include "fmacros.h"
 #include "config.h"
@@ -426,6 +426,10 @@ struct redisCommand {
     redisCommandProc *proc;
     int arity;
     int flags;
+    /* What keys should be loaded in background when calling this command? */
+    int vm_firstkey; /* The first argument that's a key (0 = no keys) */
+    int vm_lastkey;  /* The last argument that's a key */
+    int vm_keystep;  /* The step between first and last key */
 };
 
 struct redisFunctionSym {
@@ -641,6 +645,7 @@ static void zaddCommand(redisClient *c);
 static void zincrbyCommand(redisClient *c);
 static void zrangeCommand(redisClient *c);
 static void zrangebyscoreCommand(redisClient *c);
+static void zcountCommand(redisClient *c);
 static void zrevrangeCommand(redisClient *c);
 static void zcardCommand(redisClient *c);
 static void zremCommand(redisClient *c);
@@ -648,93 +653,98 @@ static void zscoreCommand(redisClient *c);
 static void zremrangebyscoreCommand(redisClient *c);
 static void multiCommand(redisClient *c);
 static void execCommand(redisClient *c);
+static void discardCommand(redisClient *c);
 static void blpopCommand(redisClient *c);
 static void brpopCommand(redisClient *c);
+static void appendCommand(redisClient *c);
 
 /*================================= Globals ================================= */
 
 /* Global vars */
 static struct redisServer server; /* server global state */
 static struct redisCommand cmdTable[] = {
-    {"get",getCommand,2,REDIS_CMD_INLINE},
-    {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
-    {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
-    {"del",delCommand,-2,REDIS_CMD_INLINE},
-    {"exists",existsCommand,2,REDIS_CMD_INLINE},
-    {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
-    {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
-    {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
-    {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
-    {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
-    {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
-    {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
-    {"brpop",brpopCommand,-3,REDIS_CMD_INLINE},
-    {"blpop",blpopCommand,-3,REDIS_CMD_INLINE},
-    {"llen",llenCommand,2,REDIS_CMD_INLINE},
-    {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
-    {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
-    {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
-    {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
-    {"lrem",lremCommand,4,REDIS_CMD_BULK},
-    {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
-    {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
-    {"srem",sremCommand,3,REDIS_CMD_BULK},
-    {"smove",smoveCommand,4,REDIS_CMD_BULK},
-    {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
-    {"scard",scardCommand,2,REDIS_CMD_INLINE},
-    {"spop",spopCommand,2,REDIS_CMD_INLINE},
-    {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
-    {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
-    {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
-    {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
-    {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
-    {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
-    {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
-    {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
-    {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
-    {"zincrby",zincrbyCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
-    {"zrem",zremCommand,3,REDIS_CMD_BULK},
-    {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE},
-    {"zrange",zrangeCommand,-4,REDIS_CMD_INLINE},
-    {"zrangebyscore",zrangebyscoreCommand,-4,REDIS_CMD_INLINE},
-    {"zrevrange",zrevrangeCommand,-4,REDIS_CMD_INLINE},
-    {"zcard",zcardCommand,2,REDIS_CMD_INLINE},
-    {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
-    {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
-    {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
-    {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
-    {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
-    {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
-    {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
-    {"select",selectCommand,2,REDIS_CMD_INLINE},
-    {"move",moveCommand,3,REDIS_CMD_INLINE},
-    {"rename",renameCommand,3,REDIS_CMD_INLINE},
-    {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
-    {"expire",expireCommand,3,REDIS_CMD_INLINE},
-    {"expireat",expireatCommand,3,REDIS_CMD_INLINE},
-    {"keys",keysCommand,2,REDIS_CMD_INLINE},
-    {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
-    {"auth",authCommand,2,REDIS_CMD_INLINE},
-    {"ping",pingCommand,1,REDIS_CMD_INLINE},
-    {"echo",echoCommand,2,REDIS_CMD_BULK},
-    {"save",saveCommand,1,REDIS_CMD_INLINE},
-    {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
-    {"bgrewriteaof",bgrewriteaofCommand,1,REDIS_CMD_INLINE},
-    {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
-    {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
-    {"type",typeCommand,2,REDIS_CMD_INLINE},
-    {"multi",multiCommand,1,REDIS_CMD_INLINE},
-    {"exec",execCommand,1,REDIS_CMD_INLINE},
-    {"sync",syncCommand,1,REDIS_CMD_INLINE},
-    {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
-    {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
-    {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
-    {"info",infoCommand,1,REDIS_CMD_INLINE},
-    {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
-    {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
-    {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
-    {"debug",debugCommand,-2,REDIS_CMD_INLINE},
-    {NULL,NULL,0,0}
+    {"get",getCommand,2,REDIS_CMD_INLINE,1,1,1},
+    {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,0,0,0},
+    {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,0,0,0},
+    {"append",appendCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,1,1,1},
+    {"del",delCommand,-2,REDIS_CMD_INLINE,0,0,0},
+    {"exists",existsCommand,2,REDIS_CMD_INLINE,1,1,1},
+    {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,1,1,1},
+    {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,1,1,1},
+    {"mget",mgetCommand,-2,REDIS_CMD_INLINE,1,-1,1},
+    {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,1,1,1},
+    {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,1,1,1},
+    {"rpop",rpopCommand,2,REDIS_CMD_INLINE,1,1,1},
+    {"lpop",lpopCommand,2,REDIS_CMD_INLINE,1,1,1},
+    {"brpop",brpopCommand,-3,REDIS_CMD_INLINE,1,1,1},
+    {"blpop",blpopCommand,-3,REDIS_CMD_INLINE,1,1,1},
+    {"llen",llenCommand,2,REDIS_CMD_INLINE,1,1,1},
+    {"lindex",lindexCommand,3,REDIS_CMD_INLINE,1,1,1},
+    {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,1,1,1},
+    {"lrange",lrangeCommand,4,REDIS_CMD_INLINE,1,1,1},
+    {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE,1,1,1},
+    {"lrem",lremCommand,4,REDIS_CMD_BULK,1,1,1},
+    {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,1,2,1},
+    {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,1,1,1},
+    {"srem",sremCommand,3,REDIS_CMD_BULK,1,1,1},
+    {"smove",smoveCommand,4,REDIS_CMD_BULK,1,2,1},
+    {"sismember",sismemberCommand,3,REDIS_CMD_BULK,1,1,1},
+    {"scard",scardCommand,2,REDIS_CMD_INLINE,1,1,1},
+    {"spop",spopCommand,2,REDIS_CMD_INLINE,1,1,1},
+    {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE,1,1,1},
+    {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,1,-1,1},
+    {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,2,-1,1},
+    {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,1,-1,1},
+    {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,2,-1,1},
+    {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,1,-1,1},
+    {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,2,-1,1},
+    {"smembers",sinterCommand,2,REDIS_CMD_INLINE,1,1,1},
+    {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,1,1,1},
+    {"zincrby",zincrbyCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,1,1,1},
+    {"zrem",zremCommand,3,REDIS_CMD_BULK,1,1,1},
+    {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE,1,1,1},
+    {"zrange",zrangeCommand,-4,REDIS_CMD_INLINE,1,1,1},
+    {"zrangebyscore",zrangebyscoreCommand,-4,REDIS_CMD_INLINE,1,1,1},
+    {"zcount",zcountCommand,4,REDIS_CMD_INLINE,1,1,1},
+    {"zrevrange",zrevrangeCommand,-4,REDIS_CMD_INLINE,1,1,1},
+    {"zcard",zcardCommand,2,REDIS_CMD_INLINE,1,1,1},
+    {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,1,1,1},
+    {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,1,1,1},
+    {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,1,1,1},
+    {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,1,1,1},
+    {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,1,-1,2},
+    {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,1,-1,2},
+    {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE,0,0,0},
+    {"select",selectCommand,2,REDIS_CMD_INLINE,0,0,0},
+    {"move",moveCommand,3,REDIS_CMD_INLINE,1,1,1},
+    {"rename",renameCommand,3,REDIS_CMD_INLINE,1,1,1},
+    {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE,1,1,1},
+    {"expire",expireCommand,3,REDIS_CMD_INLINE,0,0,0},
+    {"expireat",expireatCommand,3,REDIS_CMD_INLINE,0,0,0},
+    {"keys",keysCommand,2,REDIS_CMD_INLINE,0,0,0},
+    {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE,0,0,0},
+    {"auth",authCommand,2,REDIS_CMD_INLINE,0,0,0},
+    {"ping",pingCommand,1,REDIS_CMD_INLINE,0,0,0},
+    {"echo",echoCommand,2,REDIS_CMD_BULK,0,0,0},
+    {"save",saveCommand,1,REDIS_CMD_INLINE,0,0,0},
+    {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE,0,0,0},
+    {"bgrewriteaof",bgrewriteaofCommand,1,REDIS_CMD_INLINE,0,0,0},
+    {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE,0,0,0},
+    {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE,0,0,0},
+    {"type",typeCommand,2,REDIS_CMD_INLINE,1,1,1},
+    {"multi",multiCommand,1,REDIS_CMD_INLINE,0,0,0},
+    {"exec",execCommand,1,REDIS_CMD_INLINE,0,0,0},
+    {"discard",discardCommand,1,REDIS_CMD_INLINE,0,0,0},
+    {"sync",syncCommand,1,REDIS_CMD_INLINE,0,0,0},
+    {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE,0,0,0},
+    {"flushall",flushallCommand,1,REDIS_CMD_INLINE,0,0,0},
+    {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,1,1,1},
+    {"info",infoCommand,1,REDIS_CMD_INLINE,0,0,0},
+    {"monitor",monitorCommand,1,REDIS_CMD_INLINE,0,0,0},
+    {"ttl",ttlCommand,2,REDIS_CMD_INLINE,1,1,1},
+    {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE,0,0,0},
+    {"debug",debugCommand,-2,REDIS_CMD_INLINE,0,0,0},
+    {NULL,NULL,0,0,0,0,0}
 };
 
 /*============================ Utility functions ============================ */
@@ -871,7 +881,7 @@ static void redisLog(int level, const char *fmt, ...) {
 
     va_start(ap, fmt);
     if (level >= server.verbosity) {
-        char *c = ".-*";
+        char *c = ".-*#";
         char buf[64];
         time_t now;
 
@@ -954,10 +964,24 @@ static int dictEncObjKeyCompare(void *privdata, const void *key1,
 static unsigned int dictEncObjHash(const void *key) {
     robj *o = (robj*) key;
 
-    o = getDecodedObject(o);
-    unsigned int hash = dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
-    decrRefCount(o);
-    return hash;
+    if (o->encoding == REDIS_ENCODING_RAW) { /* raw sds: hash it in place, no temporary object */
+        return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
+    } else {
+        if (o->encoding == REDIS_ENCODING_INT) { /* ->ptr is read as a long here */
+            char buf[32];
+            int len;
+
+            len = snprintf(buf,32,"%ld",(long)o->ptr); /* hash the decimal text of the integer */
+            return dictGenHashFunction((unsigned char*)buf, len);
+        } else { /* any other encoding: decode, hash, then drop the decoded reference */
+            unsigned int hash;
+
+            o = getDecodedObject(o);
+            hash = dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
+            decrRefCount(o);
+            return hash;
+        }
+    }
 }
 
 /* Sets type and expires */
@@ -2119,7 +2143,7 @@ static int processCommand(redisClient *c) {
     }
 
     /* Exec the command */
-    if (c->flags & REDIS_MULTI && cmd->proc != execCommand) {
+    if (c->flags & REDIS_MULTI && cmd->proc != execCommand && cmd->proc != discardCommand) {
         queueMultiCommand(c,cmd);
         addReply(c,shared.queued);
     } else {
@@ -2312,7 +2336,8 @@ static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mas
     } else {
         return;
     }
-    processInputBuffer(c);
+    if (!(c->flags & REDIS_BLOCKED))
+        processInputBuffer(c);
 }
 
 static int selectDb(redisClient *c, int id) {
@@ -2324,7 +2349,7 @@ static int selectDb(redisClient *c, int id) {
 
 static void *dupClientReplyValue(void *o) {
     incrRefCount((robj*)o);
-    return 0;
+    return o; /* the "duplicate" is the same object, now holding one more reference */
 }
 
 static redisClient *createClient(int fd) {
@@ -2392,6 +2417,14 @@ static void addReplyDouble(redisClient *c, double d) {
         (unsigned long) strlen(buf),buf));
 }
 
+/* Queue an integer reply for the client, formatted as ":<value>\r\n". */
+static void addReplyLong(redisClient *c, long l) {
+    char reply[128];
+    size_t replylen = snprintf(reply,sizeof(reply),":%ld\r\n",l);
+
+    addReplySds(c,sdsnewlen(reply,replylen));
+}
+
 static void addReplyBulkLen(redisClient *c, robj *obj) {
     size_t len;
 
@@ -3335,6 +3368,10 @@ static robj *rdbLoadObject(int type, FILE *fp) {
 
         if ((listlen = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
         o = (type == REDIS_LIST) ? createListObject() : createSetObject();
+        /* It's faster to expand the dict to the right size asap in order
+         * to avoid rehashing */
+        if (type == REDIS_SET && listlen > DICT_HT_INITIAL_SIZE)
+            dictExpand(o->ptr,listlen);
         /* Load every single element of the list/set */
         while(listlen--) {
             robj *ele;
@@ -3677,6 +3714,52 @@ static void decrbyCommand(redisClient *c) {
     incrDecrCommand(c,-incr);
 }
 
+/* APPEND key value: append 'value' to the string stored at 'key', creating
+ * the key when lookupKeyWrite() finds nothing. Replies with the new total
+ * length as an integer, or with shared.wrongtypeerr for non-string keys. */
+static void appendCommand(redisClient *c) {
+    size_t totlen;
+    robj *o;
+
+    o = lookupKeyWrite(c->db,c->argv[1]);
+    if (o == NULL) {
+        /* Create the key: both argv objects are now also referenced from
+         * the dict, so their reference counts are incremented. */
+        dictAdd(c->db->dict,c->argv[1],c->argv[2]);
+        incrRefCount(c->argv[1]);
+        incrRefCount(c->argv[2]);
+        totlen = stringObjectLen(c->argv[2]);
+    } else {
+        dictEntry *de = dictFind(c->db->dict,c->argv[1]);
+        assert(de != NULL);
+        o = dictGetEntryVal(de);
+        if (o->type != REDIS_STRING) {
+            addReply(c,shared.wrongtypeerr);
+            return;
+        }
+        /* If the object is specially encoded or shared we have to make
+         * a private raw copy before modifying it in place. */
+        if (o->refcount != 1 || o->encoding != REDIS_ENCODING_RAW) {
+            robj *decoded = getDecodedObject(o);
+
+            o = createStringObject(decoded->ptr, sdslen(decoded->ptr));
+            decrRefCount(decoded);
+            dictReplace(c->db->dict,c->argv[1],o);
+        }
+        /* APPEND! A non-raw argument is treated as integer encoded (value
+         * kept in ->ptr): print it as a signed long to match "%ld". */
+        if (c->argv[2]->encoding == REDIS_ENCODING_RAW) {
+            o->ptr = sdscatlen(o->ptr,
+                c->argv[2]->ptr, sdslen(c->argv[2]->ptr));
+        } else {
+            o->ptr = sdscatprintf(o->ptr, "%ld", (long) c->argv[2]->ptr);
+        }
+        totlen = sdslen(o->ptr);
+    }
+    server.dirty++;
+    addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",(unsigned long)totlen));
+}
+
 /* ========================= Type agnostic commands ========================= */
 
 static void delCommand(redisClient *c) {
@@ -3737,7 +3820,7 @@ static void keysCommand(redisClient *c) {
     dictEntry *de;
     sds pattern = c->argv[1]->ptr;
     int plen = sdslen(pattern);
-    unsigned long numkeys = 0, keyslen = 0;
+    unsigned long numkeys = 0;
     robj *lenobj = createObject(REDIS_STRING,NULL);
 
     di = dictGetIterator(c->db->dict);
@@ -3750,17 +3833,15 @@ static void keysCommand(redisClient *c) {
         if ((pattern[0] == '*' && pattern[1] == '\0') ||
             stringmatchlen(pattern,plen,key,sdslen(key),0)) {
             if (expireIfNeeded(c->db,keyobj) == 0) {
-                if (numkeys != 0)
-                    addReply(c,shared.space);
+                addReplyBulkLen(c,keyobj);
                 addReply(c,keyobj);
+                addReply(c,shared.crlf);
                 numkeys++;
-                keyslen += sdslen(key);
             }
         }
     }
     dictReleaseIterator(di);
-    lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
-    addReply(c,shared.crlf);
+    lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",numkeys);
 }
 
 static void dbsizeCommand(redisClient *c) {
@@ -5126,28 +5207,64 @@ static void zrevrangeCommand(redisClient *c) {
     zrangeGenericCommand(c,1);
 }
 
-static void zrangebyscoreCommand(redisClient *c) {
+/* This command implements both ZRANGEBYSCORE and ZCOUNT.
+ * If justcount is non-zero, just the count is returned. */
+static void genericZrangebyscoreCommand(redisClient *c, int justcount) {
     robj *o;
-    double min = strtod(c->argv[2]->ptr,NULL);
-    double max = strtod(c->argv[3]->ptr,NULL);
+    double min, max;
+    int minex = 0, maxex = 0; /* are min or max exclusive? */
     int offset = 0, limit = -1;
+    int withscores = 0;
+    int badsyntax = 0;
+
+    /* Parse the min-max interval. If one of the values is prefixed
+     * by the "(" character, it's considered "open". For instance
+     * ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max
+     * ZRANGEBYSCORE zset 1.5 2.5 will instead match min <= x <= max */
+    if (((char*)c->argv[2]->ptr)[0] == '(') {
+        min = strtod((char*)c->argv[2]->ptr+1,NULL);
+        minex = 1;
+    } else {
+        min = strtod(c->argv[2]->ptr,NULL);
+    }
+    if (((char*)c->argv[3]->ptr)[0] == '(') {
+        max = strtod((char*)c->argv[3]->ptr+1,NULL);
+        maxex = 1;
+    } else {
+        max = strtod(c->argv[3]->ptr,NULL);
+    }
 
-    if (c->argc != 4 && c->argc != 7) {
+    /* Parse "WITHSCORES": note that if the command was called with
+     * the name ZCOUNT then we are sure that c->argc == 4, so we'll never
+     * enter the following paths to parse WITHSCORES and LIMIT. */
+    if (c->argc == 5 || c->argc == 8) {
+        if (strcasecmp(c->argv[c->argc-1]->ptr,"withscores") == 0)
+            withscores = 1;
+        else
+            badsyntax = 1;
+    }
+    if (c->argc != (4 + withscores) && c->argc != (7 + withscores))
+        badsyntax = 1;
+    if (badsyntax) {
         addReplySds(c,
             sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n"));
         return;
-    } else if (c->argc == 7 && strcasecmp(c->argv[4]->ptr,"limit")) {
+    }
+
+    /* Parse "LIMIT" */
+    if (c->argc == (7 + withscores) && strcasecmp(c->argv[4]->ptr,"limit")) {
         addReply(c,shared.syntaxerr);
         return;
-    } else if (c->argc == 7) {
+    } else if (c->argc == (7 + withscores)) {
         offset = atoi(c->argv[5]->ptr);
         limit = atoi(c->argv[6]->ptr);
         if (offset < 0) offset = 0;
     }
 
+    /* Ok, lookup the key and get the range */
     o = lookupKeyRead(c->db,c->argv[1]);
     if (o == NULL) {
-        addReply(c,shared.nullmultibulk);
+        addReply(c,justcount ? shared.czero : shared.nullmultibulk);
     } else {
         if (o->type != REDIS_ZSET) {
             addReply(c,shared.wrongtypeerr);
@@ -5155,14 +5272,17 @@ static void zrangebyscoreCommand(redisClient *c) {
             zset *zsetobj = o->ptr;
             zskiplist *zsl = zsetobj->zsl;
             zskiplistNode *ln;
-            robj *ele, *lenobj;
-            unsigned int rangelen = 0;
+            robj *ele, *lenobj = NULL;
+            unsigned long rangelen = 0;
 
-            /* Get the first node with the score >= min */
+            /* Get the first node with the score >= min, or with
+             * score > min if 'minex' is true. */
             ln = zslFirstWithScore(zsl,min);
+            while (minex && ln && ln->score == min) ln = ln->forward[0];
+
             if (ln == NULL) {
                 /* No element matching the speciifed interval */
-                addReply(c,shared.emptymultibulk);
+                addReply(c,justcount ? shared.czero : shared.emptymultibulk);
                 return;
             }
 
@@ -5170,30 +5290,49 @@ static void zrangebyscoreCommand(redisClient *c) {
              * are in the list, so we push this object that will represent
              * the multi-bulk length in the output buffer, and will "fix"
              * it later */
-            lenobj = createObject(REDIS_STRING,NULL);
-            addReply(c,lenobj);
-            decrRefCount(lenobj);
+            if (!justcount) {
+                lenobj = createObject(REDIS_STRING,NULL);
+                addReply(c,lenobj);
+                decrRefCount(lenobj);
+            }
 
-            while(ln && ln->score <= max) {
+            while(ln && (maxex ? (ln->score < max) : (ln->score <= max))) {
                 if (offset) {
                     offset--;
                     ln = ln->forward[0];
                     continue;
                 }
                 if (limit == 0) break;
-                ele = ln->obj;
-                addReplyBulkLen(c,ele);
-                addReply(c,ele);
-                addReply(c,shared.crlf);
+                if (!justcount) {
+                    ele = ln->obj;
+                    addReplyBulkLen(c,ele);
+                    addReply(c,ele);
+                    addReply(c,shared.crlf);
+                    if (withscores)
+                        addReplyDouble(c,ln->score);
+                }
                 ln = ln->forward[0];
                 rangelen++;
                 if (limit > 0) limit--;
             }
-            lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",rangelen);
+            if (justcount) {
+                addReplyLong(c,(long)rangelen);
+            } else {
+                lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",
+                     withscores ? (rangelen*2) : rangelen);
+            }
         }
     }
 }
 
+static void zrangebyscoreCommand(redisClient *c) {
+    genericZrangebyscoreCommand(c,0); /* justcount = 0: reply with the matching elements */
+}
+
+static void zcountCommand(redisClient *c) {
+    genericZrangebyscoreCommand(c,1); /* justcount = 1: reply only with the number of matches */
+}
+
 static void zcardCommand(redisClient *c) {
     robj *o;
     zset *zs;
@@ -5912,6 +6051,18 @@ static void multiCommand(redisClient *c) {
     addReply(c,shared.ok);
 }
 
+/* DISCARD: free and re-init the client MULTI state, then clear REDIS_MULTI. */
+static void discardCommand(redisClient *c) {
+    if (c->flags & REDIS_MULTI) {
+        freeClientMultiState(c);
+        initClientMultiState(c);
+        c->flags &= (~REDIS_MULTI);
+        addReply(c,shared.ok);
+    } else {
+        addReplySds(c,sdsnew("-ERR DISCARD without MULTI\r\n"));
+    }
+}
+
 static void execCommand(redisClient *c) {
     int j;
     robj **orig_argv;
@@ -6000,7 +6151,6 @@ static void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeou
     }
     /* Mark the client as a blocked client */
     c->flags |= REDIS_BLOCKED;
-    aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
     server.blpop_blocked_clients++;
 }
 
@@ -6028,14 +6178,7 @@ static void unblockClientWaitingData(redisClient *c) {
     c->blockingkeys = NULL;
     c->flags &= (~REDIS_BLOCKED);
     server.blpop_blocked_clients--;
-    /* Ok now we are ready to get read events from socket, note that we
-     * can't trap errors here as it's possible that unblockClientWaitingDatas() is
-     * called from freeClient() itself, and the only thing we can do
-     * if we failed to register the READABLE event is to kill the client.
-     * Still the following function should never fail in the real world as
-     * we are sure the file descriptor is sane, and we exit on out of mem. */
-    aeCreateFileEvent(server.el, c->fd, AE_READABLE, readQueryFromClient, c);
-    /* As a final step we want to process data if there is some command waiting
+    /* We want to process data if there is some command waiting
      * in the input buffer. Note that this is safe even if
      * unblockClientWaitingData() gets called from freeClient() because
      * freeClient() will be smart enough to call this function
@@ -7150,8 +7293,6 @@ static void vmMarkPageUsed(off_t page) {
     int bit = page&7;
     redisAssert(vmFreePage(page) == 1);
     server.vm_bitmap[byte] |= 1<<bit;
-    redisLog(REDIS_DEBUG,"Mark used: %lld (byte:%lld bit:%d)\n",
-        (long long)page, (long long)byte, bit);
 }
 
 /* Mark N contiguous pages as used, with 'page' being the first. */
@@ -7161,6 +7302,8 @@ static void vmMarkPagesUsed(off_t page, off_t count) {
     for (j = 0; j < count; j++)
         vmMarkPageUsed(page+j);
     server.vm_stats_used_pages += count;
+    redisLog(REDIS_DEBUG,"Mark USED pages: %lld pages at %lld\n",
+        (long long)count, (long long)page);
 }
 
 /* Mark the page as free */
@@ -7169,8 +7312,6 @@ static void vmMarkPageFree(off_t page) {
     int bit = page&7;
     redisAssert(vmFreePage(page) == 0);
     server.vm_bitmap[byte] &= ~(1<<bit);
-    redisLog(REDIS_DEBUG,"Mark free: %lld (byte:%lld bit:%d)\n",
-        (long long)page, (long long)byte, bit);
 }
 
 /* Mark N contiguous pages as free, with 'page' being the first. */
@@ -7180,9 +7321,8 @@ static void vmMarkPagesFree(off_t page, off_t count) {
     for (j = 0; j < count; j++)
         vmMarkPageFree(page+j);
     server.vm_stats_used_pages -= count;
-    if (server.vm_stats_used_pages > 100000000) {
-        *((char*)-1) = 'x';
-    }
+    redisLog(REDIS_DEBUG,"Mark FREE pages: %lld pages at %lld\n",
+        (long long)count, (long long)page);
 }
 
 /* Test if the page is free */
@@ -7233,7 +7373,6 @@ static int vmFindContiguousPages(off_t *first, off_t n) {
                 numfree = 0;
             }
         }
-        redisLog(REDIS_DEBUG, "THIS: %lld (%c)\n", (long long) this, vmFreePage(this) ? 'F' : 'X');
         if (vmFreePage(this)) {
             /* This is a free page */
             numfree++;
@@ -7241,6 +7380,7 @@ static int vmFindContiguousPages(off_t *first, off_t n) {
             if (numfree == n) {
                 *first = this-(n-1);
                 server.vm_next_page = this+1;
+                redisLog(REDIS_DEBUG, "FOUND CONTIGUOUS PAGES: %lld pages at %lld\n", (long long) n, (long long) *first);
                 return REDIS_OK;
             }
         } else {
@@ -7271,11 +7411,12 @@ static int vmWriteObjectOnSwap(robj *o, off_t page) {
     if (fseeko(server.vm_fp,page*server.vm_page_size,SEEK_SET) == -1) {
         if (server.vm_enabled) pthread_mutex_unlock(&server.io_swapfile_mutex);
         redisLog(REDIS_WARNING,
-            "Critical VM problem in vmSwapObjectBlocking(): can't seek: %s",
+            "Critical VM problem in vmWriteObjectOnSwap(): can't seek: %s",
             strerror(errno));
         return REDIS_ERR;
     }
     rdbSaveObject(server.vm_fp,o);
+    fflush(server.vm_fp);
     if (server.vm_enabled) pthread_mutex_unlock(&server.io_swapfile_mutex);
     return REDIS_OK;
 }
@@ -7303,7 +7444,6 @@ static int vmSwapObjectBlocking(robj *key, robj *val) {
         (unsigned long long) page, (unsigned long long) pages);
     server.vm_stats_swapped_objects++;
     server.vm_stats_swapouts++;
-    fflush(server.vm_fp);
     return REDIS_OK;
 }
 
@@ -7432,7 +7572,7 @@ static double computeObjectSwappability(robj *o) {
         }
         break;
     }
-    return (double)asize*log(1+asize);
+    return (double)age*log(1+asize);
 }
 
 /* Try to swap an object that's a good candidate for swapping.
@@ -7483,10 +7623,7 @@ static int vmSwapOneObject(int usethreads) {
             }
         }
     }
-    if (best == NULL) {
-        redisLog(REDIS_DEBUG,"No swappable key found!");
-        return REDIS_ERR;
-    }
+    if (best == NULL) return REDIS_ERR;
     key = dictGetEntryKey(best);
     val = dictGetEntryVal(best);
 
@@ -7790,8 +7927,8 @@ static void *IOThreadEntryPoint(void *arg) {
         lockThreadedIO();
         if (listLength(server.io_newjobs) == 0) {
             /* No new jobs in queue, exit. */
-            redisLog(REDIS_DEBUG,"Thread %lld exiting, nothing to do",
-                (long long) pthread_self());
+            redisLog(REDIS_DEBUG,"Thread %ld exiting, nothing to do",
+                (long) pthread_self());
             server.io_active_threads--;
             unlockThreadedIO();
             return NULL;
@@ -7804,8 +7941,8 @@ static void *IOThreadEntryPoint(void *arg) {
         listAddNodeTail(server.io_processing,j);
         ln = listLast(server.io_processing); /* We use ln later to remove it */
         unlockThreadedIO();
-        redisLog(REDIS_DEBUG,"Thread %lld got a new job (type %d): %p about key '%s'",
-            (long long) pthread_self(), j->type, (void*)j, (char*)j->key->ptr);
+        redisLog(REDIS_DEBUG,"Thread %ld got a new job (type %d): %p about key '%s'",
+            (long) pthread_self(), j->type, (void*)j, (char*)j->key->ptr);
 
         /* Process the Job */
         if (j->type == REDIS_IOJOB_LOAD) {
@@ -7820,8 +7957,8 @@ static void *IOThreadEntryPoint(void *arg) {
         }
 
         /* Done: insert the job into the processed queue */
-        redisLog(REDIS_DEBUG,"Thread %lld completed the job: %p (key %s)",
-            (long long) pthread_self(), (void*)j, (char*)j->key->ptr);
+        redisLog(REDIS_DEBUG,"Thread %ld completed the job: %p (key %s)",
+            (long) pthread_self(), (void*)j, (char*)j->key->ptr);
         lockThreadedIO();
         listDelNode(server.io_processing,ln);
         listAddNodeTail(server.io_processed,j);
@@ -7998,9 +8135,13 @@ static int waitForSwappedKey(redisClient *c, robj *key) {
  * Return 1 if the client is marked as blocked, 0 if the client can
  * continue as the keys it is going to access appear to be in memory. */
 static int blockClientOnSwappedKeys(struct redisCommand *cmd, redisClient *c) {
-    if (cmd->proc == getCommand) {
-        waitForSwappedKey(c,c->argv[1]);
-    }
+    int j, last;
+
+    if (cmd->vm_firstkey == 0) return 0;
+    last = cmd->vm_lastkey;
+    if (last < 0) last = c->argc+last;
+    for (j = cmd->vm_firstkey; j <= last; j += cmd->vm_keystep)
+        waitForSwappedKey(c,c->argv[j]);
     /* If the client was blocked for at least one key, mark it as blocked. */
     if (listLength(c->io_keys)) {
         c->flags |= REDIS_IO_WAIT;
@@ -8105,8 +8246,8 @@ static void debugCommand(redisClient *c) {
         }
         key = dictGetEntryKey(de);
         val = dictGetEntryVal(de);
-        if (server.vm_enabled && (key->storage == REDIS_VM_MEMORY ||
-                                  key->storage == REDIS_VM_SWAPPING)) {
+        if (!server.vm_enabled || (key->storage == REDIS_VM_MEMORY ||
+                                   key->storage == REDIS_VM_SWAPPING)) {
             addReplySds(c,sdscatprintf(sdsempty(),
                 "+Key at:%p refcount:%d, value at:%p refcount:%d "
                 "encoding:%d serializedlength:%lld\r\n",
@@ -8212,6 +8353,8 @@ static void daemonize(void) {
 }
 
 int main(int argc, char **argv) {
+    time_t start;
+
     initServerConfig();
     if (argc == 2) {
         resetServerSaveParams();
@@ -8228,12 +8371,13 @@ int main(int argc, char **argv) {
 #ifdef __linux__
     linuxOvercommitMemoryWarning();
 #endif
+    start = time(NULL);
     if (server.appendonly) {
         if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK)
-            redisLog(REDIS_NOTICE,"DB loaded from append only file");
+            redisLog(REDIS_NOTICE,"DB loaded from append only file: %ld seconds",time(NULL)-start);
     } else {
         if (rdbLoad(server.dbfilename) == REDIS_OK)
-            redisLog(REDIS_NOTICE,"DB loaded from disk");
+            redisLog(REDIS_NOTICE,"DB loaded from disk: %ld seconds",time(NULL)-start);
     }
     redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
     aeSetBeforeSleepProc(server.el,beforeSleep);