]> git.saurik.com Git - redis.git/blobdiff - redis.c
another minor fix for Solaris boxes
[redis.git] / redis.c
diff --git a/redis.c b/redis.c
index 1237a048d25f92221ed7528475756ebe2c8cb39d..2761541c95236c388cff19e8a0181a4769e078e3 100644 (file)
--- a/redis.c
+++ b/redis.c
@@ -27,7 +27,7 @@
  * POSSIBILITY OF SUCH DAMAGE.
  */
 
-#define REDIS_VERSION "0.091"
+#define REDIS_VERSION "0.101"
 
 #include "fmacros.h"
 
@@ -56,7 +56,8 @@
 #include "dict.h"   /* Hash tables */
 #include "adlist.h" /* Linked lists */
 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
-#include "lzf.h"
+#include "lzf.h"    /* LZF compression library */
+#include "pqsort.h" /* Partial qsort for SORT+LIMIT */
 
 /* Error codes */
 #define REDIS_OK                0
@@ -67,7 +68,7 @@
 #define REDIS_MAXIDLETIME       (60*5)  /* default client timeout */
 #define REDIS_IOBUF_LEN         1024
 #define REDIS_LOADBUF_LEN       1024
-#define REDIS_MAX_ARGS          16
+#define REDIS_STATIC_ARGS       4
 #define REDIS_DEFAULT_DBNUM     16
 #define REDIS_CONFIGLINE_MAX    1024
 #define REDIS_OBJFREELIST_MAX   1000000 /* Max number of objects to cache */
 #define REDIS_HT_MINSLOTS       16384   /* Never resize the HT under this */
 
 /* Command flags */
-#define REDIS_CMD_BULK          1
-#define REDIS_CMD_INLINE        2
+#define REDIS_CMD_BULK          1       /* Bulk write command */
+#define REDIS_CMD_INLINE        2       /* Inline command */
+/* REDIS_CMD_DENYOOM reserves a longer comment: all the commands marked with
+   this flags will return an error when the 'maxmemory' option is set in the
+   config file and the server is using more than maxmemory bytes of memory.
+   In short this commands are denied on low memory conditions. */
+#define REDIS_CMD_DENYOOM       4
 
 /* Object types */
 #define REDIS_STRING 0
@@ -183,7 +189,7 @@ typedef struct redisClient {
     redisDb *db;
     int dictid;
     sds querybuf;
-    robj *argv[REDIS_MAX_ARGS];
+    robj **argv;
     int argc;
     int bulklen;            /* bulk read len. -1 if not in bulk read mode */
     list *reply;
@@ -244,6 +250,8 @@ struct redisServer {
     int masterport;
     redisClient *master;    /* client that is master for this slave */
     int replstate;
+    unsigned int maxclients;
+    unsigned int maxmemory;
     /* Sort parameters - qsort_r() is only available under BSD so we
      * have to take this state global, in order to pass it to sortCompare() */
     int sort_desc;
@@ -305,6 +313,7 @@ static int deleteKey(redisDb *db, robj *key);
 static time_t getExpire(redisDb *db, robj *key);
 static int setExpire(redisDb *db, robj *key, time_t when);
 static void updateSalvesWaitingBgsave(int bgsaveerr);
+static void freeMemoryIfNeeded(void);
 
 static void authCommand(redisClient *c);
 static void pingCommand(redisClient *c);
@@ -348,6 +357,8 @@ static void sinterCommand(redisClient *c);
 static void sinterstoreCommand(redisClient *c);
 static void sunionCommand(redisClient *c);
 static void sunionstoreCommand(redisClient *c);
+static void sdiffCommand(redisClient *c);
+static void sdiffstoreCommand(redisClient *c);
 static void syncCommand(redisClient *c);
 static void flushdbCommand(redisClient *c);
 static void flushallCommand(redisClient *c);
@@ -358,6 +369,8 @@ static void mgetCommand(redisClient *c);
 static void monitorCommand(redisClient *c);
 static void expireCommand(redisClient *c);
 static void getSetCommand(redisClient *c);
+static void ttlCommand(redisClient *c);
+static void slaveofCommand(redisClient *c);
 
 /*================================= Globals ================================= */
 
@@ -365,41 +378,44 @@ static void getSetCommand(redisClient *c);
 static struct redisServer server; /* server global state */
 static struct redisCommand cmdTable[] = {
     {"get",getCommand,2,REDIS_CMD_INLINE},
-    {"set",setCommand,3,REDIS_CMD_BULK},
-    {"setnx",setnxCommand,3,REDIS_CMD_BULK},
-    {"del",delCommand,2,REDIS_CMD_INLINE},
+    {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
+    {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
+    {"del",delCommand,-2,REDIS_CMD_INLINE},
     {"exists",existsCommand,2,REDIS_CMD_INLINE},
-    {"incr",incrCommand,2,REDIS_CMD_INLINE},
-    {"decr",decrCommand,2,REDIS_CMD_INLINE},
+    {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
+    {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
     {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
-    {"rpush",rpushCommand,3,REDIS_CMD_BULK},
-    {"lpush",lpushCommand,3,REDIS_CMD_BULK},
+    {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
+    {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
     {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
     {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
     {"llen",llenCommand,2,REDIS_CMD_INLINE},
     {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
-    {"lset",lsetCommand,4,REDIS_CMD_BULK},
+    {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
     {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
     {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
     {"lrem",lremCommand,4,REDIS_CMD_BULK},
-    {"sadd",saddCommand,3,REDIS_CMD_BULK},
+    {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
     {"srem",sremCommand,3,REDIS_CMD_BULK},
     {"smove",smoveCommand,4,REDIS_CMD_BULK},
     {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
     {"scard",scardCommand,2,REDIS_CMD_INLINE},
-    {"sinter",sinterCommand,-2,REDIS_CMD_INLINE},
-    {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE},
-    {"sunion",sunionCommand,-2,REDIS_CMD_INLINE},
-    {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE},
+    {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
+    {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
+    {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
+    {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
+    {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
+    {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
     {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
-    {"incrby",incrbyCommand,3,REDIS_CMD_INLINE},
-    {"decrby",decrbyCommand,3,REDIS_CMD_INLINE},
-    {"getset",getSetCommand,3,REDIS_CMD_BULK},
+    {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
+    {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
+    {"getset",getSetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
     {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
     {"select",selectCommand,2,REDIS_CMD_INLINE},
     {"move",moveCommand,3,REDIS_CMD_INLINE},
     {"rename",renameCommand,3,REDIS_CMD_INLINE},
     {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
+    {"expire",expireCommand,3,REDIS_CMD_INLINE},
     {"keys",keysCommand,2,REDIS_CMD_INLINE},
     {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
     {"auth",authCommand,2,REDIS_CMD_INLINE},
@@ -413,10 +429,11 @@ static struct redisCommand cmdTable[] = {
     {"sync",syncCommand,1,REDIS_CMD_INLINE},
     {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
     {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
-    {"sort",sortCommand,-2,REDIS_CMD_INLINE},
+    {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
     {"info",infoCommand,1,REDIS_CMD_INLINE},
     {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
-    {"expire",expireCommand,3,REDIS_CMD_INLINE},
+    {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
+    {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
     {NULL,NULL,0,0}
 };
 
@@ -556,7 +573,12 @@ void redisLog(int level, const char *fmt, ...)
     va_start(ap, fmt);
     if (level >= server.verbosity) {
         char *c = ".-*";
-        fprintf(fp,"%c ",c[level]);
+        char buf[64];
+        time_t now;
+
+        now = time(NULL);
+        strftime(buf,64,"%d %b %H:%M:%S",gmtime(&now));
+        fprintf(fp,"%s %c ",buf,c[level]);
         vfprintf(fp, fmt, ap);
         fprintf(fp,"\n");
         fflush(fp);
@@ -645,6 +667,7 @@ void closeTimedoutClients(void) {
     while ((ln = listYield(server.clients)) != NULL) {
         c = listNodeValue(ln);
         if (!(c->flags & REDIS_SLAVE) &&    /* no timeout for slaves */
+            !(c->flags & REDIS_MASTER) &&   /* no timeout for masters */
              (now - c->lastinteraction > server.maxidletime)) {
             redisLog(REDIS_DEBUG,"Closing idle client");
             freeClient(c);
@@ -652,6 +675,25 @@ void closeTimedoutClients(void) {
     }
 }
 
+/* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
+ * we resize the hash table to save memory */
+void tryResizeHashTables(void) {
+    int j;
+
+    for (j = 0; j < server.dbnum; j++) {
+        long long size, used;
+
+        size = dictSlots(server.db[j].dict);
+        used = dictSize(server.db[j].dict);
+        if (size && used && size > REDIS_HT_MINSLOTS &&
+            (used*100/size < REDIS_HT_MINFILL)) {
+            redisLog(REDIS_NOTICE,"The hash table %d is too sparse, resize it...",j);
+            dictResize(server.db[j].dict);
+            redisLog(REDIS_NOTICE,"Hash table %d resized.",j);
+        }
+    }
+}
+
 int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
     int j, loops = server.cronloops++;
     REDIS_NOTUSED(eventLoop);
@@ -661,10 +703,9 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
     /* Update the global state with the amount of used memory */
     server.usedmemory = zmalloc_used_memory();
 
-    /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
-     * we resize the hash table to save memory */
+    /* Show some info about non-empty databases */
     for (j = 0; j < server.dbnum; j++) {
-        int size, used, vkeys;
+        long long size, used, vkeys;
 
         size = dictSlots(server.db[j].dict);
         used = dictSize(server.db[j].dict);
@@ -673,14 +714,16 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
             redisLog(REDIS_DEBUG,"DB %d: %d keys (%d volatile) in %d slots HT.",j,used,vkeys,size);
             /* dictPrintStats(server.dict); */
         }
-        if (size && used && size > REDIS_HT_MINSLOTS &&
-            (used*100/size < REDIS_HT_MINFILL)) {
-            redisLog(REDIS_NOTICE,"The hash table %d is too sparse, resize it...",j);
-            dictResize(server.db[j].dict);
-            redisLog(REDIS_NOTICE,"Hash table %d resized.",j);
-        }
     }
 
+    /* We don't want to resize the hash tables while a bacground saving
+     * is in progress: the saving child is created using fork() that is
+     * implemented with a copy-on-write semantic in most modern systems, so
+     * if we resize the HT while there is the saving child at work actually
+     * a lot of memory movements in the parent will cause a lot of pages
+     * copied. */
+    if (!server.bgsaveinprogress) tryResizeHashTables();
+
     /* Show information about connected clients */
     if (!(loops % 5)) {
         redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use",
@@ -691,7 +734,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
     }
 
     /* Close connections of timedout clients */
-    if (!(loops % 10))
+    if (server.maxidletime && !(loops % 10))
         closeTimedoutClients();
 
     /* Check if a background saving in progress terminated */
@@ -827,6 +870,8 @@ static void initServerConfig() {
     server.dbfilename = "dump.rdb";
     server.requirepass = NULL;
     server.shareobjects = 0;
+    server.maxclients = 0;
+    server.maxmemory = 0;
     ResetServerSaveParams();
 
     appendServerSaveParams(60*60,1);  /* save after 1 hour and 1 change */
@@ -930,7 +975,7 @@ static void loadServerConfig(char *filename) {
         /* Execute config directives */
         if (!strcasecmp(argv[0],"timeout") && argc == 2) {
             server.maxidletime = atoi(argv[1]);
-            if (server.maxidletime < 1) {
+            if (server.maxidletime < 0) {
                 err = "Invalid timeout value"; goto loaderr;
             }
         } else if (!strcasecmp(argv[0],"port") && argc == 2) {
@@ -985,6 +1030,10 @@ static void loadServerConfig(char *filename) {
             if (server.dbnum < 1) {
                 err = "Invalid number of databases"; goto loaderr;
             }
+        } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
+            server.maxclients = atoi(argv[1]);
+        } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
+            server.maxmemory = atoi(argv[1]);
         } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
             server.masterhost = sdsnew(argv[1]);
             server.masterport = atoi(argv[2]);
@@ -1058,6 +1107,7 @@ static void freeClient(redisClient *c) {
         server.master = NULL;
         server.replstate = REDIS_REPL_CONNECT;
     }
+    zfree(c->argv);
     zfree(c);
 }
 
@@ -1086,7 +1136,8 @@ static void glueReplyBuffersIfNeeded(redisClient *c) {
             listDelNode(c->reply,ln);
         }
         /* Now the output buffer is empty, add the new single element */
-        addReplySds(c,sdsnewlen(buf,totlen));
+        o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
+        if (!listAddNodeTail(c->reply,o)) oom("listAddNodeTail");
     }
 }
 
@@ -1166,6 +1217,9 @@ static int processCommand(redisClient *c) {
     struct redisCommand *cmd;
     long long dirty;
 
+    /* Free some memory if needed (maxmemory setting) */
+    if (server.maxmemory) freeMemoryIfNeeded();
+
     /* The QUIT command is handled as a special case. Normal command
      * procs are unable to close the client connection safely */
     if (!strcasecmp(c->argv[0]->ptr,"quit")) {
@@ -1182,6 +1236,10 @@ static int processCommand(redisClient *c) {
         addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
         resetClient(c);
         return 1;
+    } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
+        addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
+        resetClient(c);
+        return 1;
     } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
         int bulklen = atoi(c->argv[c->argc-1]->ptr);
 
@@ -1237,8 +1295,17 @@ static int processCommand(redisClient *c) {
 
 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
     listNode *ln;
-    robj *outv[REDIS_MAX_ARGS*4]; /* enough room for args, spaces, newlines */
     int outc = 0, j;
+    robj **outv;
+    /* (args*2)+1 is enough room for args, spaces, newlines */
+    robj *static_outv[REDIS_STATIC_ARGS*2+1];
+
+    if (argc <= REDIS_STATIC_ARGS) {
+        outv = static_outv;
+    } else {
+        outv = zmalloc(sizeof(robj*)*(argc*2+1));
+        if (!outv) oom("replicationFeedSlaves");
+    }
     
     for (j = 0; j < argc; j++) {
         if (j != 0) outv[outc++] = shared.space;
@@ -1292,6 +1359,7 @@ static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int di
         for (j = 0; j < outc; j++) addReply(slave,outv[j]);
     }
     for (j = 0; j < outc; j++) decrRefCount(outv[j]);
+    if (outv != static_outv) zfree(outv);
 }
 
 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
@@ -1349,9 +1417,14 @@ again:
                 return;
             }
             argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
-            sdsfree(query);
             if (argv == NULL) oom("sdssplitlen");
-            for (j = 0; j < argc && j < REDIS_MAX_ARGS; j++) {
+            sdsfree(query);
+
+            if (c->argv) zfree(c->argv);
+            c->argv = zmalloc(sizeof(robj*)*argc);
+            if (c->argv == NULL) oom("allocating arguments list for client");
+
+            for (j = 0; j < argc; j++) {
                 if (sdslen(argv[j])) {
                     c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
                     c->argc++;
@@ -1365,7 +1438,7 @@ again:
              * on the query buffer try to process the next command. */
             if (processCommand(c) && sdslen(c->querybuf)) goto again;
             return;
-        } else if (sdslen(c->querybuf) >= 1024) {
+        } else if (sdslen(c->querybuf) >= 1024*32) {
             redisLog(REDIS_DEBUG, "Client protocol error");
             freeClient(c);
             return;
@@ -1410,6 +1483,7 @@ static redisClient *createClient(int fd) {
     c->fd = fd;
     c->querybuf = sdsempty();
     c->argc = 0;
+    c->argv = NULL;
     c->bulklen = -1;
     c->sentlen = 0;
     c->flags = 0;
@@ -1447,6 +1521,7 @@ static void addReplySds(redisClient *c, sds s) {
 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
     int cport, cfd;
     char cip[128];
+    redisClient *c;
     REDIS_NOTUSED(el);
     REDIS_NOTUSED(mask);
     REDIS_NOTUSED(privdata);
@@ -1457,11 +1532,23 @@ static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
         return;
     }
     redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
-    if (createClient(cfd) == NULL) {
+    if ((c = createClient(cfd)) == NULL) {
         redisLog(REDIS_WARNING,"Error allocating resoures for the client");
         close(cfd); /* May be already closed, just ingore errors */
         return;
     }
+    /* If maxclient directive is set and this is one client more... close the
+     * connection. Note that we create the client instead to check before
+     * for this condition, since now the socket is already set in nonblocking
+     * mode and we can send an error for free using the Kernel I/O */
+    if (server.maxclients && listLength(server.clients) > server.maxclients) {
+        char *err = "-ERR max number of clients reached\r\n";
+
+        /* That's a best effort error message, don't check write errors */
+        (void) write(c->fd,err,strlen(err));
+        freeClient(c);
+        return;
+    }
     server.stat_numconnections++;
 }
 
@@ -1879,6 +1966,11 @@ static int rdbSaveBackground(char *filename) {
         }
     } else {
         /* Parent */
+        if (childpid == -1) {
+            redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
+                strerror(errno));
+            return REDIS_ERR;
+        }
         redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
         server.bgsaveinprogress = 1;
         return REDIS_OK;
@@ -1970,6 +2062,7 @@ static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
     if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
     if (fread(c,clen,1,fp) == 0) goto err;
     if (lzf_decompress(c,clen,val,len) == 0) goto err;
+    zfree(c);
     return createObject(REDIS_STRING,val);
 err:
     zfree(c);
@@ -2263,11 +2356,24 @@ static void decrbyCommand(redisClient *c) {
 /* ========================= Type agnostic commands ========================= */
 
 static void delCommand(redisClient *c) {
-    if (deleteKey(c->db,c->argv[1])) {
-        server.dirty++;
-        addReply(c,shared.cone);
-    } else {
+    int deleted = 0, j;
+
+    for (j = 1; j < c->argc; j++) {
+        if (deleteKey(c->db,c->argv[j])) {
+            server.dirty++;
+            deleted++;
+        }
+    }
+    switch(deleted) {
+    case 0:
         addReply(c,shared.czero);
+        break;
+    case 1:
+        addReply(c,shared.cone);
+        break;
+    default:
+        addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
+        break;
     }
 }
 
@@ -2907,7 +3013,7 @@ static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, r
     robj *lenobj = NULL, *dstset = NULL;
     int j, cardinality = 0;
 
-    if (!dv) oom("sinterCommand");
+    if (!dv) oom("sinterGenericCommand");
     for (j = 0; j < setsnum; j++) {
         robj *setobj;
 
@@ -2948,9 +3054,6 @@ static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, r
         /* If we have a target key where to store the resulting set
          * create this key with an empty set inside */
         dstset = createSetObject();
-        deleteKey(c->db,dstkey);
-        dictAdd(c->db->dict,dstkey,dstset);
-        incrRefCount(dstkey);
     }
 
     /* Iterate all the elements of the first (smallest) set, and test
@@ -2979,10 +3082,18 @@ static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, r
     }
     dictReleaseIterator(di);
 
+    if (dstkey) {
+        /* Store the resulting set into the target */
+        deleteKey(c->db,dstkey);
+        dictAdd(c->db->dict,dstkey,dstset);
+        incrRefCount(dstkey);
+    }
+
     if (!dstkey) {
         lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
     } else {
-        addReply(c,shared.ok);
+        addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
+            dictSize((dict*)dstset->ptr)));
         server.dirty++;
     }
     zfree(dv);
@@ -2996,14 +3107,17 @@ static void sinterstoreCommand(redisClient *c) {
     sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
 }
 
-static void sunionGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
+#define REDIS_OP_UNION 0
+#define REDIS_OP_DIFF 1
+
+static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
     dict **dv = zmalloc(sizeof(dict*)*setsnum);
     dictIterator *di;
     dictEntry *de;
-    robj *lenobj = NULL, *dstset = NULL;
+    robj *dstset = NULL;
     int j, cardinality = 0;
 
-    if (!dv) oom("sunionCommand");
+    if (!dv) oom("sunionDiffGenericCommand");
     for (j = 0; j < setsnum; j++) {
         robj *setobj;
 
@@ -3027,27 +3141,10 @@ static void sunionGenericCommand(redisClient *c, robj **setskeys, int setsnum, r
      * this set object will be the resulting object to set into the target key*/
     dstset = createSetObject();
 
-    /* The first thing we should output is the total number of elements...
-     * since this is a multi-bulk write, but at this stage we don't know
-     * the intersection set size, so we use a trick, append an empty object
-     * to the output list and save the pointer to later modify it with the
-     * right length */
-    if (!dstkey) {
-        lenobj = createObject(REDIS_STRING,NULL);
-        addReply(c,lenobj);
-        decrRefCount(lenobj);
-    } else {
-        /* If we have a target key where to store the resulting set
-         * create this key with an empty set inside */
-        deleteKey(c->db,dstkey);
-        dictAdd(c->db->dict,dstkey,dstset);
-        incrRefCount(dstkey);
-        server.dirty++;
-    }
-
     /* Iterate all the elements of all the sets, add every element a single
      * time to the result set */
     for (j = 0; j < setsnum; j++) {
+        if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
         if (!dv[j]) continue; /* non existing keys are like empty sets */
 
         di = dictGetIterator(dv[j]);
@@ -3058,36 +3155,70 @@ static void sunionGenericCommand(redisClient *c, robj **setskeys, int setsnum, r
 
             /* dictAdd will not add the same element multiple times */
             ele = dictGetEntryKey(de);
-            if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
-                incrRefCount(ele);
-                if (!dstkey) {
-                    addReplySds(c,sdscatprintf(sdsempty(),
-                            "$%d\r\n",sdslen(ele->ptr)));
-                    addReply(c,ele);
-                    addReply(c,shared.crlf);
+            if (op == REDIS_OP_UNION || j == 0) {
+                if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
+                    incrRefCount(ele);
                     cardinality++;
                 }
+            } else if (op == REDIS_OP_DIFF) {
+                if (dictDelete(dstset->ptr,ele) == DICT_OK) {
+                    cardinality--;
+                }
             }
         }
         dictReleaseIterator(di);
+
+        if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
     }
 
+    /* Output the content of the resulting set, if not in STORE mode */
+    if (!dstkey) {
+        addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
+        di = dictGetIterator(dstset->ptr);
+        if (!di) oom("dictGetIterator");
+        while((de = dictNext(di)) != NULL) {
+            robj *ele;
+
+            ele = dictGetEntryKey(de);
+            addReplySds(c,sdscatprintf(sdsempty(),
+                    "$%d\r\n",sdslen(ele->ptr)));
+            addReply(c,ele);
+            addReply(c,shared.crlf);
+        }
+        dictReleaseIterator(di);
+    } else {
+        /* If we have a target key where to store the resulting set
+         * create this key with the result set inside */
+        deleteKey(c->db,dstkey);
+        dictAdd(c->db->dict,dstkey,dstset);
+        incrRefCount(dstkey);
+    }
+
+    /* Cleanup */
     if (!dstkey) {
-        lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
         decrRefCount(dstset);
     } else {
-        addReply(c,shared.ok);
+        addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
+            dictSize((dict*)dstset->ptr)));
         server.dirty++;
     }
     zfree(dv);
 }
 
 static void sunionCommand(redisClient *c) {
-    sunionGenericCommand(c,c->argv+1,c->argc-1,NULL);
+    sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
 }
 
 static void sunionstoreCommand(redisClient *c) {
-    sunionGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
+    sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
+}
+
+static void sdiffCommand(redisClient *c) {
+    sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
+}
+
+static void sdiffstoreCommand(redisClient *c) {
+    sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
 }
 
 static void flushdbCommand(redisClient *c) {
@@ -3337,7 +3468,10 @@ static void sortCommand(redisClient *c) {
         server.sort_desc = desc;
         server.sort_alpha = alpha;
         server.sort_bypattern = sortby ? 1 : 0;
-        qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
+        if (sortby && (start != 0 || end != vectorlen-1))
+            pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
+        else
+            qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
     }
 
     /* Send command output to the output buffer, performing the specified
@@ -3389,6 +3523,8 @@ static void infoCommand(redisClient *c) {
     
     info = sdscatprintf(sdsempty(),
         "redis_version:%s\r\n"
+        "uptime_in_seconds:%d\r\n"
+        "uptime_in_days:%d\r\n"
         "connected_clients:%d\r\n"
         "connected_slaves:%d\r\n"
         "used_memory:%zu\r\n"
@@ -3397,9 +3533,10 @@ static void infoCommand(redisClient *c) {
         "last_save_time:%d\r\n"
         "total_connections_received:%lld\r\n"
         "total_commands_processed:%lld\r\n"
-        "uptime_in_seconds:%d\r\n"
-        "uptime_in_days:%d\r\n"
+        "role:%s\r\n"
         ,REDIS_VERSION,
+        uptime,
+        uptime/(3600*24),
         listLength(server.clients)-listLength(server.slaves),
         listLength(server.slaves),
         server.usedmemory,
@@ -3408,9 +3545,21 @@ static void infoCommand(redisClient *c) {
         server.lastsave,
         server.stat_numconnections,
         server.stat_numcommands,
-        uptime,
-        uptime/(3600*24)
+        server.masterhost == NULL ? "master" : "slave"
     );
+    if (server.masterhost) {
+        info = sdscatprintf(info,
+            "master_host:%s\r\n"
+            "master_port:%d\r\n"
+            "master_link_status:%s\r\n"
+            "master_last_io_seconds_ago:%d\r\n"
+            ,server.masterhost,
+            server.masterport,
+            (server.replstate == REDIS_REPL_CONNECTED) ?
+                "up" : "down",
+            (int)(time(NULL)-server.master->lastinteraction)
+        );
+    }
     addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
     addReplySds(c,info);
     addReply(c,shared.crlf);
@@ -3508,6 +3657,18 @@ static void expireCommand(redisClient *c) {
     }
 }
 
+static void ttlCommand(redisClient *c) {
+    time_t expire;
+    int ttl = -1;
+
+    expire = getExpire(c->db,c->argv[1]);
+    if (expire != -1) {
+        ttl = (int) (expire-time(NULL));
+        if (ttl < 0) ttl = -1;
+    }
+    addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
+}
+
 /* =============================== Replication  ============================= */
 
 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
@@ -3753,7 +3914,7 @@ static int syncWithMaster(void) {
         return REDIS_ERR;
     }
     /* Read the bulk write count */
-    if (syncReadLine(fd,buf,1024,5) == -1) {
+    if (syncReadLine(fd,buf,1024,3600) == -1) {
         close(fd);
         redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
             strerror(errno));
@@ -3808,8 +3969,104 @@ static int syncWithMaster(void) {
     return REDIS_OK;
 }
 
+static void slaveofCommand(redisClient *c) {
+    if (!strcasecmp(c->argv[1]->ptr,"no") &&
+        !strcasecmp(c->argv[2]->ptr,"one")) {
+        if (server.masterhost) {
+            sdsfree(server.masterhost);
+            server.masterhost = NULL;
+            if (server.master) freeClient(server.master);
+            server.replstate = REDIS_REPL_NONE;
+            redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
+        }
+    } else {
+        sdsfree(server.masterhost);
+        server.masterhost = sdsdup(c->argv[1]->ptr);
+        server.masterport = atoi(c->argv[2]->ptr);
+        if (server.master) freeClient(server.master);
+        server.replstate = REDIS_REPL_CONNECT;
+        redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
+            server.masterhost, server.masterport);
+    }
+    addReply(c,shared.ok);
+}
+
+/* ============================ Maxmemory directive  ======================== */
+
+/* This function gets called when 'maxmemory' is set on the config file to limit
+ * the max memory used by the server, and we are out of memory.
+ * This function will try to, in order:
+ *
+ * - Free objects from the free list
+ * - Try to remove keys with an EXPIRE set
+ *
+ * It is not possible to free enough memory to reach used-memory < maxmemory
+ * the server will start refusing commands that will enlarge even more the
+ * memory usage.
+ */
+static void freeMemoryIfNeeded(void) {
+    while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
+        if (listLength(server.objfreelist)) {
+            robj *o;
+
+            listNode *head = listFirst(server.objfreelist);
+            o = listNodeValue(head);
+            listDelNode(server.objfreelist,head);
+            zfree(o);
+        } else {
+            int j, k, freed = 0;
+
+            for (j = 0; j < server.dbnum; j++) {
+                int minttl = -1;
+                robj *minkey = NULL;
+                struct dictEntry *de;
+
+                if (dictSize(server.db[j].expires)) {
+                    freed = 1;
+                    /* From a sample of three keys drop the one nearest to
+                     * the natural expire */
+                    for (k = 0; k < 3; k++) {
+                        time_t t;
+
+                        de = dictGetRandomKey(server.db[j].expires);
+                        t = (time_t) dictGetEntryVal(de);
+                        if (minttl == -1 || t < minttl) {
+                            minkey = dictGetEntryKey(de);
+                            minttl = t;
+                        }
+                    }
+                    deleteKey(server.db+j,minkey);
+                }
+            }
+            if (!freed) return; /* nothing to free... */
+        }
+    }
+}
+
 /* =================================== Main! ================================ */
 
+#ifdef __linux__
+int linuxOvercommitMemoryValue(void) {
+    FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");
+    char buf[64];
+
+    if (!fp) return -1;
+    if (fgets(buf,64,fp) == NULL) {
+        fclose(fp);
+        return -1;
+    }
+    fclose(fp);
+
+    return atoi(buf);
+}
+
+void linuxOvercommitMemoryWarning(void) {
+    if (linuxOvercommitMemoryValue() == 0) {
+        redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'echo 1 > /proc/sys/vm/overcommit_memory' in your init scripts.");
+    }
+}
+#endif /* __linux__ */
+
 static void daemonize(void) {
     int fd;
     FILE *fp;
@@ -3835,6 +4092,10 @@ static void daemonize(void) {
 }
 
 int main(int argc, char **argv) {
+#ifdef __linux__
+    linuxOvercommitMemoryWarning();
+#endif
+
     initServerConfig();
     if (argc == 2) {
         ResetServerSaveParams();
@@ -3842,6 +4103,8 @@ int main(int argc, char **argv) {
     } else if (argc > 2) {
         fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
         exit(1);
+    } else {
+        redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
     }
     initServer();
     if (server.daemonize) daemonize();