]> git.saurik.com Git - redis.git/blobdiff - redis.c
reduce code complexity because zipmapLen now is O(1)
[redis.git] / redis.c
diff --git a/redis.c b/redis.c
index 72cdd256b0590a713c6cd531fa7a97c0cd97ac0d..e69d3a3abf89b99aaff9cf2fdca493c13c5077eb 100644 (file)
--- a/redis.c
+++ b/redis.c
@@ -27,7 +27,7 @@
  * POSSIBILITY OF SUCH DAMAGE.
  */
 
-#define REDIS_VERSION "1.3.6"
+#define REDIS_VERSION "1.3.7"
 
 #include "fmacros.h"
 #include "config.h"
 #define REDIS_MAXIDLETIME       (60*5)  /* default client timeout */
 #define REDIS_IOBUF_LEN         1024
 #define REDIS_LOADBUF_LEN       1024
-#define REDIS_STATIC_ARGS       4
+#define REDIS_STATIC_ARGS       8
 #define REDIS_DEFAULT_DBNUM     16
 #define REDIS_CONFIGLINE_MAX    1024
 #define REDIS_OBJFREELIST_MAX   1000000 /* Max number of objects to cache */
 #define REDIS_MAX_SYNC_TIME     60      /* Slave can't take more to sync */
-#define REDIS_EXPIRELOOKUPS_PER_CRON    100 /* try to expire 100 keys/second */
+#define REDIS_EXPIRELOOKUPS_PER_CRON    10 /* try to expire 10 keys/loop */
 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
 
@@ -353,6 +353,7 @@ struct redisServer {
     time_t stat_starttime;         /* server start time */
     long long stat_numcommands;    /* number of processed commands */
     long long stat_numconnections; /* number of connections received */
+    long long stat_expiredkeys;   /* number of expired keys */
     /* Configuration */
     int verbosity;
     int glueoutputbuf;
@@ -520,7 +521,7 @@ typedef struct iojob {
     robj *val;  /* the value to swap for REDIS_IOREQ_*_SWAP, otherwise this
                  * field is populated by the I/O thread for REDIS_IOREQ_LOAD. */
     off_t page; /* Swap page where to read/write the object */
-    off_t pages; /* Swap pages needed to safe object. PREPARE_SWAP return val */
+    off_t pages; /* Swap pages needed to save object. PREPARE_SWAP return val */
     int canceled; /* True if this command was canceled by blocking side of VM */
     pthread_t thread; /* ID of the thread processing this entry */
 } iojob;
@@ -540,7 +541,7 @@ static void incrRefCount(robj *o);
 static int rdbSaveBackground(char *filename);
 static robj *createStringObject(char *ptr, size_t len);
 static robj *dupStringObject(robj *o);
-static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
+static void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc);
 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
 static int syncWithMaster(void);
 static robj *tryObjectSharing(robj *o);
@@ -695,6 +696,8 @@ static void hkeysCommand(redisClient *c);
 static void hvalsCommand(redisClient *c);
 static void hgetallCommand(redisClient *c);
 static void hexistsCommand(redisClient *c);
+static void configCommand(redisClient *c);
+static void hincrbyCommand(redisClient *c);
 
 /*================================= Globals ================================= */
 
@@ -754,6 +757,7 @@ static struct redisCommand cmdTable[] = {
     {"zrank",zrankCommand,3,REDIS_CMD_BULK,NULL,1,1,1},
     {"zrevrank",zrevrankCommand,3,REDIS_CMD_BULK,NULL,1,1,1},
     {"hset",hsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,1,1,1},
+    {"hincrby",hincrbyCommand,4,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,NULL,1,1,1},
     {"hget",hgetCommand,3,REDIS_CMD_BULK,NULL,1,1,1},
     {"hdel",hdelCommand,3,REDIS_CMD_BULK,NULL,1,1,1},
     {"hlen",hlenCommand,2,REDIS_CMD_INLINE,NULL,1,1,1},
@@ -796,13 +800,16 @@ static struct redisCommand cmdTable[] = {
     {"ttl",ttlCommand,2,REDIS_CMD_INLINE,NULL,1,1,1},
     {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE,NULL,0,0,0},
     {"debug",debugCommand,-2,REDIS_CMD_INLINE,NULL,0,0,0},
+    {"config",configCommand,-2,REDIS_CMD_BULK,NULL,0,0,0},
     {NULL,NULL,0,0,NULL,0,0,0}
 };
 
+static void usage();
+
 /*============================ Utility functions ============================ */
 
 /* Glob-style pattern matching. */
-int stringmatchlen(const char *pattern, int patternLen,
+static int stringmatchlen(const char *pattern, int patternLen,
         const char *string, int stringLen, int nocase)
 {
     while(patternLen) {
@@ -924,6 +931,10 @@ int stringmatchlen(const char *pattern, int patternLen,
     return 0;
 }
 
+static int stringmatch(const char *pattern, const char *string, int nocase) {
+    return stringmatchlen(pattern,strlen(pattern),string,strlen(string),nocase);
+}
+
 static void redisLog(int level, const char *fmt, ...) {
     va_list ap;
     FILE *fp;
@@ -1102,6 +1113,8 @@ static dictType keylistDictType = {
     dictListDestructor          /* val destructor */
 };
 
+static void version();
+
 /* ========================= Random utility functions ======================= */
 
 /* Redis generally does not try to recover from out of memory conditions
@@ -1269,7 +1282,7 @@ static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientD
         size = dictSlots(server.db[j].dict);
         used = dictSize(server.db[j].dict);
         vkeys = dictSize(server.db[j].expires);
-        if (!(loops % 5) && (used || vkeys)) {
+        if (!(loops % 50) && (used || vkeys)) {
             redisLog(REDIS_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
             /* dictPrintStats(server.dict); */
         }
@@ -1281,10 +1294,10 @@ static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientD
      * if we resize the HT while there is the saving child at work actually
      * a lot of memory movements in the parent will cause a lot of pages
      * copied. */
-    if (server.bgsavechildpid == -1) tryResizeHashTables();
+    if (server.bgsavechildpid == -1 && !(loops % 10)) tryResizeHashTables();
 
     /* Show information about connected clients */
-    if (!(loops % 5)) {
+    if (!(loops % 50)) {
         redisLog(REDIS_VERBOSE,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
             listLength(server.clients)-listLength(server.slaves),
             listLength(server.slaves),
@@ -1293,7 +1306,7 @@ static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientD
     }
 
     /* Close connections of timedout clients */
-    if ((server.maxidletime && !(loops % 10)) || server.blpop_blocked_clients)
+    if ((server.maxidletime && !(loops % 100)) || server.blpop_blocked_clients)
         closeTimedoutClients();
 
     /* Check if a background saving or AOF rewrite in progress terminated */
@@ -1351,6 +1364,7 @@ static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientD
                 if (now > t) {
                     deleteKey(db,dictGetEntryKey(de));
                     expired++;
+                    server.stat_expiredkeys++;
                 }
             }
         } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
@@ -1368,7 +1382,7 @@ static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientD
             retval = (server.vm_max_threads == 0) ?
                         vmSwapOneObjectBlocking() :
                         vmSwapOneObjectThreaded();
-            if (retval == REDIS_ERR && (loops % 30) == 0 &&
+            if (retval == REDIS_ERR && !(loops % 300) &&
                 zmalloc_used_memory() >
                 (server.vm_max_memory+server.vm_max_memory/10))
             {
@@ -1383,13 +1397,13 @@ static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientD
     }
 
     /* Check if we should connect to a MASTER */
-    if (server.replstate == REDIS_REPL_CONNECT) {
+    if (server.replstate == REDIS_REPL_CONNECT && !(loops % 10)) {
         redisLog(REDIS_NOTICE,"Connecting to MASTER...");
         if (syncWithMaster() == REDIS_OK) {
             redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
         }
     }
-    return 1000;
+    return 100;
 }
 
 /* This function gets called every time Redis is entering the
@@ -1489,9 +1503,9 @@ static void initServerConfig() {
     server.lastfsync = time(NULL);
     server.appendfd = -1;
     server.appendseldb = -1; /* Make sure the first time will not match */
-    server.pidfile = "/var/run/redis.pid";
-    server.dbfilename = "dump.rdb";
-    server.appendfilename = "appendonly.aof";
+    server.pidfile = zstrdup("/var/run/redis.pid");
+    server.dbfilename = zstrdup("dump.rdb");
+    server.appendfilename = zstrdup("appendonly.aof");
     server.requirepass = NULL;
     server.shareobjects = 0;
     server.rdbcompression = 1;
@@ -1570,6 +1584,7 @@ static void initServer() {
     server.dirty = 0;
     server.stat_numcommands = 0;
     server.stat_numconnections = 0;
+    server.stat_expiredkeys = 0;
     server.stat_starttime = time(NULL);
     server.unixtime = time(NULL);
     aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
@@ -1704,6 +1719,8 @@ static void loadServerConfig(char *filename) {
             if (server.dbnum < 1) {
                 err = "Invalid number of databases"; goto loaderr;
             }
+        } else if (!strcasecmp(argv[0],"include") && argc == 2) {
+            loadServerConfig(argv[1]);
         } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
             server.maxclients = atoi(argv[1]);
         } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
@@ -1753,8 +1770,10 @@ static void loadServerConfig(char *filename) {
         } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
             server.requirepass = zstrdup(argv[1]);
         } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
+            zfree(server.pidfile);
             server.pidfile = zstrdup(argv[1]);
         } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
+            zfree(server.dbfilename);
             server.dbfilename = zstrdup(argv[1]);
         } else if (!strcasecmp(argv[0],"vm-enabled") && argc == 2) {
             if ((server.vm_enabled = yesnotoi(argv[1])) == -1) {
@@ -2058,9 +2077,9 @@ static void call(redisClient *c, struct redisCommand *cmd) {
     if (server.appendonly && server.dirty-dirty)
         feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
     if (server.dirty-dirty && listLength(server.slaves))
-        replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
+        replicationFeedSlaves(server.slaves,c->db->id,c->argv,c->argc);
     if (listLength(server.monitors))
-        replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
+        replicationFeedSlaves(server.monitors,c->db->id,c->argv,c->argc);
     server.stat_numcommands++;
 }
 
@@ -2170,10 +2189,6 @@ static int processCommand(redisClient *c) {
                 cmd->name));
         resetClient(c);
         return 1;
-    } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
-        addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
-        resetClient(c);
-        return 1;
     } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
         /* This is a bulk command, we have to read the last argument yet. */
         int bulklen = atoi(c->argv[c->argc-1]->ptr);
@@ -2219,6 +2234,15 @@ static int processCommand(redisClient *c) {
         return 1;
     }
 
+    /* Handle the maxmemory directive */
+    if (server.maxmemory && (cmd->flags & REDIS_CMD_DENYOOM) &&
+        zmalloc_used_memory() > server.maxmemory)
+    {
+        addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
+        resetClient(c);
+        return 1;
+    }
+
     /* Exec the command */
     if (c->flags & REDIS_MULTI && cmd->proc != execCommand && cmd->proc != discardCommand) {
         queueMultiCommand(c,cmd);
@@ -2234,34 +2258,36 @@ static int processCommand(redisClient *c) {
     return 1;
 }
 
-static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
+static void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
     listNode *ln;
     listIter li;
     int outc = 0, j;
     robj **outv;
-    /* (args*2)+1 is enough room for args, spaces, newlines */
-    robj *static_outv[REDIS_STATIC_ARGS*2+1];
+    /* We need 1+(ARGS*3) objects since commands are using the new protocol
+     * and we one 1 object for the first "*<count>\r\n" multibulk count, then
+     * for every additional object we have "$<count>\r\n" + object + "\r\n". */
+    robj *static_outv[REDIS_STATIC_ARGS*3+1];
+    robj *lenobj;
 
     if (argc <= REDIS_STATIC_ARGS) {
         outv = static_outv;
     } else {
-        outv = zmalloc(sizeof(robj*)*(argc*2+1));
+        outv = zmalloc(sizeof(robj*)*(argc*3+1));
     }
-    
-    for (j = 0; j < argc; j++) {
-        if (j != 0) outv[outc++] = shared.space;
-        if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
-            robj *lenobj;
 
-            lenobj = createObject(REDIS_STRING,
-                sdscatprintf(sdsempty(),"%lu\r\n",
-                    (unsigned long) stringObjectLen(argv[j])));
-            lenobj->refcount = 0;
-            outv[outc++] = lenobj;
-        }
+    lenobj = createObject(REDIS_STRING,
+            sdscatprintf(sdsempty(), "*%d\r\n", argc));
+    lenobj->refcount = 0;
+    outv[outc++] = lenobj;
+    for (j = 0; j < argc; j++) {
+        lenobj = createObject(REDIS_STRING,
+            sdscatprintf(sdsempty(),"$%lu\r\n",
+                (unsigned long) stringObjectLen(argv[j])));
+        lenobj->refcount = 0;
+        outv[outc++] = lenobj;
         outv[outc++] = argv[j];
+        outv[outc++] = shared.crlf;
     }
-    outv[outc++] = shared.crlf;
 
     /* Increment all the refcounts at start and decrement at end in order to
      * be sure to free objects if there is no slave in a replication state
@@ -2413,8 +2439,7 @@ static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mas
     } else {
         return;
     }
-    if (!(c->flags & REDIS_BLOCKED))
-        processInputBuffer(c);
+    processInputBuffer(c);
 }
 
 static int selectDb(redisClient *c, int id) {
@@ -2551,6 +2576,17 @@ static void addReplyBulk(redisClient *c, robj *obj) {
     addReply(c,shared.crlf);
 }
 
+/* In the CONFIG command we need to add vanilla C string as bulk replies */
+static void addReplyBulkCString(redisClient *c, char *s) {
+    if (s == NULL) {
+        addReply(c,shared.nullbulk);
+    } else {
+        robj *o = createStringObject(s,strlen(s));
+        addReplyBulk(c,o);
+        decrRefCount(o);
+    }
+}
+
 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
     int cport, cfd;
     char cip[128];
@@ -4367,6 +4403,7 @@ static void popGenericCommand(redisClient *c, int where) {
         robj *ele = listNodeValue(ln);
         addReplyBulk(c,ele);
         listDelNode(list,ln);
+        if (listLength(list) == 0) deleteKey(c->db,c->argv[1]);
         server.dirty++;
     }
 }
@@ -4459,6 +4496,7 @@ static void ltrimCommand(redisClient *c) {
         ln = listLast(list);
         listDelNode(list,ln);
     }
+    if (listLength(list) == 0) deleteKey(c->db,c->argv[1]);
     server.dirty++;
     addReply(c,shared.ok);
 }
@@ -4492,6 +4530,7 @@ static void lremCommand(redisClient *c) {
         }
         ln = next;
     }
+    if (listLength(list) == 0) deleteKey(c->db,c->argv[1]);
     addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
 }
 
@@ -4551,6 +4590,7 @@ static void rpoplpushcommand(redisClient *c) {
 
         /* Finally remove the element from the source list */
         listDelNode(srclist,ln);
+        if (listLength(srclist) == 0) deleteKey(c->db,c->argv[1]);
         server.dirty++;
     }
 }
@@ -4589,6 +4629,7 @@ static void sremCommand(redisClient *c) {
     if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
         server.dirty++;
         if (htNeedsResize(set->ptr)) dictResize(set->ptr);
+        if (dictSize((dict*)set->ptr) == 0) deleteKey(c->db,c->argv[1]);
         addReply(c,shared.cone);
     } else {
         addReply(c,shared.czero);
@@ -4618,6 +4659,8 @@ static void smoveCommand(redisClient *c) {
         addReply(c,shared.czero);
         return;
     }
+    if (dictSize((dict*)srcset->ptr) == 0 && srcset != dstset)
+        deleteKey(c->db,c->argv[1]);
     server.dirty++;
     /* Add the element to the destination set */
     if (!dstset) {
@@ -4669,6 +4712,7 @@ static void spopCommand(redisClient *c) {
         addReplyBulk(c,ele);
         dictDelete(set->ptr,ele);
         if (htNeedsResize(set->ptr)) dictResize(set->ptr);
+        if (dictSize((dict*)set->ptr) == 0) deleteKey(c->db,c->argv[1]);
         server.dirty++;
     }
 }
@@ -4770,18 +4814,20 @@ static void sinterGenericCommand(redisClient *c, robj **setskeys, unsigned long
     dictReleaseIterator(di);
 
     if (dstkey) {
-        /* Store the resulting set into the target */
+        /* Store the resulting set into the target, if the intersection
+         * is not an empty set. */
         deleteKey(c->db,dstkey);
-        dictAdd(c->db->dict,dstkey,dstset);
-        incrRefCount(dstkey);
-    }
-
-    if (!dstkey) {
-        lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",cardinality);
-    } else {
-        addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
-            dictSize((dict*)dstset->ptr)));
+        if (dictSize((dict*)dstset->ptr) > 0) {
+            dictAdd(c->db->dict,dstkey,dstset);
+            incrRefCount(dstkey);
+            addReplyLong(c,dictSize((dict*)dstset->ptr));
+        } else {
+            decrRefCount(dstset);
+            addReply(c,shared.czero);
+        }
         server.dirty++;
+    } else {
+        lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",cardinality);
     }
     zfree(dv);
 }
@@ -4854,7 +4900,8 @@ static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnu
         }
         dictReleaseIterator(di);
 
-        if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
+        /* result set is empty? Exit asap. */
+        if (op == REDIS_OP_DIFF && cardinality == 0) break;
     }
 
     /* Output the content of the resulting set, if not in STORE mode */
@@ -4868,20 +4915,19 @@ static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnu
             addReplyBulk(c,ele);
         }
         dictReleaseIterator(di);
+        decrRefCount(dstset);
     } else {
         /* If we have a target key where to store the resulting set
          * create this key with the result set inside */
         deleteKey(c->db,dstkey);
-        dictAdd(c->db->dict,dstkey,dstset);
-        incrRefCount(dstkey);
-    }
-
-    /* Cleanup */
-    if (!dstkey) {
-        decrRefCount(dstset);
-    } else {
-        addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
-            dictSize((dict*)dstset->ptr)));
+        if (dictSize((dict*)dstset->ptr) > 0) {
+            dictAdd(c->db->dict,dstkey,dstset);
+            incrRefCount(dstkey);
+            addReplyLong(c,dictSize((dict*)dstset->ptr));
+        } else {
+            decrRefCount(dstset);
+            addReply(c,shared.czero);
+        }
         server.dirty++;
     }
     zfree(dv);
@@ -5333,6 +5379,7 @@ static void zremCommand(redisClient *c) {
     /* Delete from the hash table */
     dictDelete(zs->dict,c->argv[2]);
     if (htNeedsResize(zs->dict)) dictResize(zs->dict);
+    if (dictSize(zs->dict) == 0) deleteKey(c->db,c->argv[1]);
     server.dirty++;
     addReply(c,shared.cone);
 }
@@ -5350,6 +5397,7 @@ static void zremrangebyscoreCommand(redisClient *c) {
     zs = zsetobj->ptr;
     deleted = zslDeleteRangeByScore(zs->zsl,min,max,zs->dict);
     if (htNeedsResize(zs->dict)) dictResize(zs->dict);
+    if (dictSize(zs->dict) == 0) deleteKey(c->db,c->argv[1]);
     server.dirty += deleted;
     addReplyLong(c,deleted);
 }
@@ -5384,6 +5432,7 @@ static void zremrangebyrankCommand(redisClient *c) {
      * use 1-based rank */
     deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);
     if (htNeedsResize(zs->dict)) dictResize(zs->dict);
+    if (dictSize(zs->dict) == 0) deleteKey(c->db,c->argv[1]);
     server.dirty += deleted;
     addReplyLong(c, deleted);
 }
@@ -5567,11 +5616,15 @@ static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
     }
 
     deleteKey(c->db,dstkey);
-    dictAdd(c->db->dict,dstkey,dstobj);
-    incrRefCount(dstkey);
-
-    addReplyLong(c, dstzset->zsl->length);
-    server.dirty++;
+    if (dstzset->zsl->length) {
+        dictAdd(c->db->dict,dstkey,dstobj);
+        incrRefCount(dstkey);
+        addReplyLong(c, dstzset->zsl->length);
+        server.dirty++;
+    } else {
+        decrRefCount(dstzset);
+        addReply(c, shared.czero);
+    }
     zfree(src);
 }
 
@@ -5884,10 +5937,8 @@ static void hsetCommand(redisClient *c) {
         decrRefCount(valobj);
         o->ptr = zm;
 
-        /* And here there is the second check for hash conversion...
-         * we want to do it only if the operation was not just an update as
-         * zipmapLen() is O(N). */
-        if (!update && zipmapLen(zm) > server.hash_max_zipmap_entries)
+        /* And here there is the second check for hash conversion. */
+        if (zipmapLen(zm) > server.hash_max_zipmap_entries)
             convertToRealHash(o);
     } else {
         tryObjectEncoding(c->argv[2]);
@@ -5904,6 +5955,78 @@ static void hsetCommand(redisClient *c) {
     addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",update == 0));
 }
 
+static void hincrbyCommand(redisClient *c) {
+    long long value = 0, incr = 0;
+    robj *o = lookupKeyWrite(c->db,c->argv[1]);
+
+    if (o == NULL) {
+        o = createHashObject();
+        dictAdd(c->db->dict,c->argv[1],o);
+        incrRefCount(c->argv[1]);
+    } else {
+        if (o->type != REDIS_HASH) {
+            addReply(c,shared.wrongtypeerr);
+            return;
+        }
+    }
+
+    robj *o_incr = getDecodedObject(c->argv[3]);
+    incr = strtoll(o_incr->ptr, NULL, 10);
+    decrRefCount(o_incr);
+
+    if (o->encoding == REDIS_ENCODING_ZIPMAP) {
+        unsigned char *zm = o->ptr;
+        unsigned char *zval;
+        unsigned int zvlen;
+
+        /* Find value if already present in hash */
+        if (zipmapGet(zm,c->argv[2]->ptr,sdslen(c->argv[2]->ptr),
+            &zval,&zvlen)) {
+            /* strtoll needs the char* to have a trailing \0, but
+             * the zipmap doesn't include them. */
+            sds szval = sdsnewlen(zval, zvlen);
+            value = strtoll(szval,NULL,10);
+            sdsfree(szval);
+        }
+
+        value += incr;
+        sds svalue = sdscatprintf(sdsempty(),"%lld",value);
+        zm = zipmapSet(zm,c->argv[2]->ptr,sdslen(c->argv[2]->ptr),
+            (unsigned char*)svalue,sdslen(svalue),NULL);
+        sdsfree(svalue);
+        o->ptr = zm;
+
+        /* Check if the zipmap needs to be converted. */
+        if (zipmapLen(zm) > server.hash_max_zipmap_entries)
+            convertToRealHash(o);
+    } else {
+        robj *hval;
+        dictEntry *de;
+
+        /* Find value if already present in hash */
+        de = dictFind(o->ptr,c->argv[2]);
+        if (de != NULL) {
+            hval = dictGetEntryVal(de);
+            if (hval->encoding == REDIS_ENCODING_RAW)
+                value = strtoll(hval->ptr,NULL,10);
+            else if (hval->encoding == REDIS_ENCODING_INT)
+                value = (long)hval->ptr;
+            else
+                redisAssert(1 != 1);
+        }
+
+        value += incr;
+        hval = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
+        tryObjectEncoding(hval);
+        if (dictReplace(o->ptr,c->argv[2],hval)) {
+            incrRefCount(c->argv[2]);
+        }
+    }
+
+    server.dirty++;
+    addReplyLong(c, value);
+}
+
 static void hgetCommand(redisClient *c) {
     robj *o;
 
@@ -5956,8 +6079,12 @@ static void hdelCommand(redisClient *c) {
             (unsigned char*) field->ptr,
             sdslen(field->ptr), &deleted);
         decrRefCount(field);
+        if (zipmapLen((unsigned char*) o->ptr) == 0)
+            deleteKey(c->db,c->argv[1]);
     } else {
         deleted = dictDelete((dict*)o->ptr,c->argv[2]) == DICT_OK;
+        if (htNeedsResize(o->ptr)) dictResize(o->ptr);
+        if (dictSize((dict*)o->ptr) == 0) deleteKey(c->db,c->argv[1]);
     }
     if (deleted) server.dirty++;
     addReply(c,deleted ? shared.cone : shared.czero);
@@ -6096,6 +6223,10 @@ static void flushdbCommand(redisClient *c) {
 static void flushallCommand(redisClient *c) {
     server.dirty += emptyDb();
     addReply(c,shared.ok);
+    if (server.bgsavechildpid != -1) {
+        kill(server.bgsavechildpid,SIGKILL);
+        rdbRemoveTempFile(server.bgsavechildpid);
+    }
     rdbSave(server.dbfilename);
     server.dirty++;
 }
@@ -6487,9 +6618,6 @@ static sds genRedisInfoString(void) {
     int j;
     char hmem[64];
 
-    server.hash_max_zipmap_entries = REDIS_HASH_MAX_ZIPMAP_ENTRIES;
-    server.hash_max_zipmap_value = REDIS_HASH_MAX_ZIPMAP_VALUE;
-  
     bytesToHuman(hmem,zmalloc_used_memory());
     info = sdscatprintf(sdsempty(),
         "redis_version:%s\r\n"
@@ -6509,6 +6637,7 @@ static sds genRedisInfoString(void) {
         "bgrewriteaof_in_progress:%d\r\n"
         "total_connections_received:%lld\r\n"
         "total_commands_processed:%lld\r\n"
+        "expired_keys:%lld\r\n"
         "hash_max_zipmap_entries:%ld\r\n"
         "hash_max_zipmap_value:%ld\r\n"
         "vm_enabled:%d\r\n"
@@ -6530,6 +6659,7 @@ static sds genRedisInfoString(void) {
         server.bgrewritechildpid != -1,
         server.stat_numconnections,
         server.stat_numcommands,
+        server.stat_expiredkeys,
         server.hash_max_zipmap_entries,
         server.hash_max_zipmap_value,
         server.vm_enabled != 0,
@@ -6653,6 +6783,7 @@ static int expireIfNeeded(redisDb *db, robj *key) {
 
     /* Delete the key */
     dictDelete(db->expires,key);
+    server.stat_expiredkeys++;
     return dictDelete(db->dict,key) == DICT_OK;
 }
 
@@ -6665,6 +6796,7 @@ static int deleteIfVolatile(redisDb *db, robj *key) {
 
     /* Delete the key */
     server.dirty++;
+    server.stat_expiredkeys++;
     dictDelete(db->expires,key);
     return dictDelete(db->dict,key) == DICT_OK;
 }
@@ -8321,6 +8453,38 @@ static double computeObjectSwappability(robj *o) {
             if (z) asize += sizeof(zskiplistNode)*dictSize(d);
         }
         break;
+    case REDIS_HASH:
+        if (o->encoding == REDIS_ENCODING_ZIPMAP) {
+            unsigned char *p = zipmapRewind((unsigned char*)o->ptr);
+            unsigned int len = zipmapLen((unsigned char*)o->ptr);
+            unsigned int klen, vlen;
+            unsigned char *key, *val;
+
+            if ((p = zipmapNext(p,&key,&klen,&val,&vlen)) == NULL) {
+                klen = 0;
+                vlen = 0;
+            }
+            asize = len*(klen+vlen+3);
+        } else if (o->encoding == REDIS_ENCODING_HT) {
+            d = o->ptr;
+            asize = sizeof(dict)+(sizeof(struct dictEntry*)*dictSlots(d));
+            if (dictSize(d)) {
+                long elesize;
+                robj *ele;
+
+                de = dictGetRandomKey(d);
+                ele = dictGetEntryKey(de);
+                elesize = (ele->encoding == REDIS_ENCODING_RAW) ?
+                                (sizeof(*o)+sdslen(ele->ptr)) :
+                                sizeof(*o);
+                ele = dictGetEntryVal(de);
+                elesize = (ele->encoding == REDIS_ENCODING_RAW) ?
+                                (sizeof(*o)+sdslen(ele->ptr)) :
+                                sizeof(*o);
+                asize += (sizeof(struct dictEntry)+elesize)*dictSize(d);
+            }
+        }
+        break;
     }
     return (double)age*log(1+asize);
 }
@@ -8723,13 +8887,18 @@ static void *IOThreadEntryPoint(void *arg) {
 static void spawnIOThread(void) {
     pthread_t thread;
     sigset_t mask, omask;
+    int err;
 
     sigemptyset(&mask);
     sigaddset(&mask,SIGCHLD);
     sigaddset(&mask,SIGHUP);
     sigaddset(&mask,SIGPIPE);
     pthread_sigmask(SIG_SETMASK, &mask, &omask);
-    pthread_create(&thread,&server.io_threads_attr,IOThreadEntryPoint,NULL);
+    while ((err = pthread_create(&thread,&server.io_threads_attr,IOThreadEntryPoint,NULL)) != 0) {
+        redisLog(REDIS_WARNING,"Unable to spawn an I/O thread: %s",
+            strerror(err));
+        usleep(1000000);
+    }
     pthread_sigmask(SIG_SETMASK, &omask, NULL);
     server.io_active_threads++;
 }
@@ -8975,6 +9144,94 @@ static void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key) {
     }
 }
 
+/* =========================== Remote Configuration ========================= */
+
+static void configSetCommand(redisClient *c) {
+    robj *o = getDecodedObject(c->argv[3]);
+    if (!strcasecmp(c->argv[2]->ptr,"dbfilename")) {
+        zfree(server.dbfilename);
+        server.dbfilename = zstrdup(o->ptr);
+    } else if (!strcasecmp(c->argv[2]->ptr,"requirepass")) {
+        zfree(server.requirepass);
+        server.requirepass = zstrdup(o->ptr);
+    } else if (!strcasecmp(c->argv[2]->ptr,"masterauth")) {
+        zfree(server.masterauth);
+        server.masterauth = zstrdup(o->ptr);
+    } else if (!strcasecmp(c->argv[2]->ptr,"maxmemory")) {
+        server.maxmemory = strtoll(o->ptr, NULL, 10);
+    } else {
+        addReplySds(c,sdscatprintf(sdsempty(),
+            "-ERR not supported CONFIG parameter %s\r\n",
+            (char*)c->argv[2]->ptr));
+        decrRefCount(o);
+        return;
+    }
+    decrRefCount(o);
+    addReply(c,shared.ok);
+}
+
+static void configGetCommand(redisClient *c) {
+    robj *o = getDecodedObject(c->argv[2]);
+    robj *lenobj = createObject(REDIS_STRING,NULL);
+    char *pattern = o->ptr;
+    int matches = 0;
+
+    addReply(c,lenobj);
+    decrRefCount(lenobj);
+
+    if (stringmatch(pattern,"dbfilename",0)) {
+        addReplyBulkCString(c,"dbfilename");
+        addReplyBulkCString(c,server.dbfilename);
+        matches++;
+    }
+    if (stringmatch(pattern,"requirepass",0)) {
+        addReplyBulkCString(c,"requirepass");
+        addReplyBulkCString(c,server.requirepass);
+        matches++;
+    }
+    if (stringmatch(pattern,"masterauth",0)) {
+        addReplyBulkCString(c,"masterauth");
+        addReplyBulkCString(c,server.masterauth);
+        matches++;
+    }
+    if (stringmatch(pattern,"maxmemory",0)) {
+        char buf[128];
+
+        snprintf(buf,128,"%llu\n",server.maxmemory);
+        addReplyBulkCString(c,"maxmemory");
+        addReplyBulkCString(c,buf);
+        matches++;
+    }
+    decrRefCount(o);
+    lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",matches*2);
+}
+
+static void configCommand(redisClient *c) {
+    if (!strcasecmp(c->argv[1]->ptr,"set")) {
+        if (c->argc != 4) goto badarity;
+        configSetCommand(c);
+    } else if (!strcasecmp(c->argv[1]->ptr,"get")) {
+        if (c->argc != 3) goto badarity;
+        configGetCommand(c);
+    } else if (!strcasecmp(c->argv[1]->ptr,"resetstat")) {
+        if (c->argc != 2) goto badarity;
+        server.stat_numcommands = 0;
+        server.stat_numconnections = 0;
+        server.stat_expiredkeys = 0;
+        server.stat_starttime = time(NULL);
+        addReply(c,shared.ok);
+    } else {
+        addReplySds(c,sdscatprintf(sdsempty(),
+            "-ERR CONFIG subcommand must be one of GET, SET, RESETSTAT\r\n"));
+    }
+    return;
+
+badarity:
+    addReplySds(c,sdscatprintf(sdsempty(),
+        "-ERR Wrong number of arguments for CONFIG %s\r\n",
+        (char*) c->argv[1]->ptr));
+}
+
 /* ================================= Debugging ============================== */
 
 static void debugCommand(redisClient *c) {
@@ -9125,16 +9382,29 @@ static void daemonize(void) {
     }
 }
 
+static void version() {
+    printf("Redis server version %s\n", REDIS_VERSION);
+    exit(0);
+}
+
+static void usage() {
+    fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
+    fprintf(stderr,"       ./redis-server - (read config from stdin)\n");
+    exit(1);
+}
+
 int main(int argc, char **argv) {
     time_t start;
 
     initServerConfig();
     if (argc == 2) {
+        if (strcmp(argv[1], "-v") == 0 ||
+            strcmp(argv[1], "--version") == 0) version();
+        if (strcmp(argv[1], "--help") == 0) usage();
         resetServerSaveParams();
         loadServerConfig(argv[1]);
-    } else if (argc > 2) {
-        fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
-        exit(1);
+    } else if ((argc > 2)) {
+        usage();
     } else {
         redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
     }