]> git.saurik.com Git - redis.git/blobdiff - redis.c
sorted sets saving fixed
[redis.git] / redis.c
diff --git a/redis.c b/redis.c
index ba7fb6acba18ed3443e32fa5ec9eaeda70604edf..9221dc0d2ffb3b2bdf8624bc82b31c79e5bc5290 100644 (file)
--- a/redis.c
+++ b/redis.c
@@ -27,7 +27,7 @@
  * POSSIBILITY OF SUCH DAMAGE.
  */
 
  * POSSIBILITY OF SUCH DAMAGE.
  */
 
-#define REDIS_VERSION "1.050"
+#define REDIS_VERSION "1.06"
 
 #include "fmacros.h"
 #include "config.h"
 
 #include "fmacros.h"
 #include "config.h"
@@ -56,6 +56,7 @@
 #include <fcntl.h>
 #include <sys/time.h>
 #include <sys/resource.h>
 #include <fcntl.h>
 #include <sys/time.h>
 #include <sys/resource.h>
+#include <sys/uio.h>
 #include <limits.h>
 #include <math.h>
 
 #include <limits.h>
 #include <math.h>
 
 #define REDIS_MAX_SYNC_TIME     60      /* Slave can't take more to sync */
 #define REDIS_EXPIRELOOKUPS_PER_CRON    100 /* try to expire 100 keys/second */
 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
 #define REDIS_MAX_SYNC_TIME     60      /* Slave can't take more to sync */
 #define REDIS_EXPIRELOOKUPS_PER_CRON    100 /* try to expire 100 keys/second */
 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
-#define REDIS_REQUEST_MAX_SIZE  (1024*1024*256) /* max bytes in inline command */
+#define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
+
+/* If more then REDIS_WRITEV_THRESHOLD write packets are pending use writev */
+#define REDIS_WRITEV_THRESHOLD      3
+/* Max number of iovecs used for each writev call */
+#define REDIS_WRITEV_IOVEC_COUNT    256
 
 /* Hash table parameters */
 #define REDIS_HT_MINFILL        10      /* Minimal hash table fill 10% */
 
 /* Hash table parameters */
 #define REDIS_HT_MINFILL        10      /* Minimal hash table fill 10% */
 
 /* Sort operations */
 #define REDIS_SORT_GET 0
 
 /* Sort operations */
 #define REDIS_SORT_GET 0
-#define REDIS_SORT_DEL 1
-#define REDIS_SORT_INCR 2
-#define REDIS_SORT_DECR 3
-#define REDIS_SORT_ASC 4
-#define REDIS_SORT_DESC 5
+#define REDIS_SORT_ASC 1
+#define REDIS_SORT_DESC 2
 #define REDIS_SORTKEY_MAX 1024
 
 /* Log levels */
 #define REDIS_SORTKEY_MAX 1024
 
 /* Log levels */
 #define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
 #define ZSKIPLIST_P 0.25      /* Skiplist P = 1/4 */
 
 #define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
 #define ZSKIPLIST_P 0.25      /* Skiplist P = 1/4 */
 
+/* Append only defines */
+#define APPENDFSYNC_NO 0
+#define APPENDFSYNC_ALWAYS 1
+#define APPENDFSYNC_EVERYSEC 2
+
 /*================================= Data types ============================== */
 
 /* A redis object, that is a type able to hold a string / list / set */
 /*================================= Data types ============================== */
 
 /* A redis object, that is a type able to hold a string / list / set */
@@ -262,6 +270,8 @@ struct redisServer {
     int dbnum;
     int daemonize;
     int appendonly;
     int dbnum;
     int daemonize;
     int appendonly;
+    int appendfsync;
+    time_t lastfsync;
     int appendfd;
     int appendseldb;
     char *pidfile;
     int appendfd;
     int appendseldb;
     char *pidfile;
@@ -277,6 +287,7 @@ struct redisServer {
     int shareobjects;
     /* Replication related */
     int isslave;
     int shareobjects;
     /* Replication related */
     int isslave;
+    char *masterauth;
     char *masterhost;
     int masterport;
     redisClient *master;    /* client that is master for this slave */
     char *masterhost;
     int masterport;
     redisClient *master;    /* client that is master for this slave */
@@ -389,6 +400,7 @@ static void processInputBuffer(redisClient *c);
 static zskiplist *zslCreate(void);
 static void zslFree(zskiplist *zsl);
 static void zslInsert(zskiplist *zsl, double score, robj *obj);
 static zskiplist *zslCreate(void);
 static void zslFree(zskiplist *zsl);
 static void zslInsert(zskiplist *zsl, double score, robj *obj);
+static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask);
 
 static void authCommand(redisClient *c);
 static void pingCommand(redisClient *c);
 
 static void authCommand(redisClient *c);
 static void pingCommand(redisClient *c);
@@ -441,6 +453,7 @@ static void flushdbCommand(redisClient *c);
 static void flushallCommand(redisClient *c);
 static void sortCommand(redisClient *c);
 static void lremCommand(redisClient *c);
 static void flushallCommand(redisClient *c);
 static void sortCommand(redisClient *c);
 static void lremCommand(redisClient *c);
+static void rpoplpushcommand(redisClient *c);
 static void infoCommand(redisClient *c);
 static void mgetCommand(redisClient *c);
 static void monitorCommand(redisClient *c);
 static void infoCommand(redisClient *c);
 static void mgetCommand(redisClient *c);
 static void monitorCommand(redisClient *c);
@@ -453,6 +466,7 @@ static void debugCommand(redisClient *c);
 static void msetCommand(redisClient *c);
 static void msetnxCommand(redisClient *c);
 static void zaddCommand(redisClient *c);
 static void msetCommand(redisClient *c);
 static void msetnxCommand(redisClient *c);
 static void zaddCommand(redisClient *c);
+static void zincrbyCommand(redisClient *c);
 static void zrangeCommand(redisClient *c);
 static void zrangebyscoreCommand(redisClient *c);
 static void zrevrangeCommand(redisClient *c);
 static void zrangeCommand(redisClient *c);
 static void zrangebyscoreCommand(redisClient *c);
 static void zrevrangeCommand(redisClient *c);
@@ -484,6 +498,7 @@ static struct redisCommand cmdTable[] = {
     {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
     {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
     {"lrem",lremCommand,4,REDIS_CMD_BULK},
     {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
     {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
     {"lrem",lremCommand,4,REDIS_CMD_BULK},
+    {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_BULK},
     {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
     {"srem",sremCommand,3,REDIS_CMD_BULK},
     {"smove",smoveCommand,4,REDIS_CMD_BULK},
     {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
     {"srem",sremCommand,3,REDIS_CMD_BULK},
     {"smove",smoveCommand,4,REDIS_CMD_BULK},
@@ -499,6 +514,7 @@ static struct redisCommand cmdTable[] = {
     {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
     {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
     {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
     {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
     {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
     {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
+    {"zincrby",zincrbyCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
     {"zrem",zremCommand,3,REDIS_CMD_BULK},
     {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE},
     {"zrange",zrangeCommand,4,REDIS_CMD_INLINE},
     {"zrem",zremCommand,3,REDIS_CMD_BULK},
     {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE},
     {"zrange",zrangeCommand,4,REDIS_CMD_INLINE},
@@ -539,6 +555,7 @@ static struct redisCommand cmdTable[] = {
     {"debug",debugCommand,-2,REDIS_CMD_INLINE},
     {NULL,NULL,0,0}
 };
     {"debug",debugCommand,-2,REDIS_CMD_INLINE},
     {NULL,NULL,0,0}
 };
+
 /*============================ Utility functions ============================ */
 
 /* Glob-style pattern matching. */
 /*============================ Utility functions ============================ */
 
 /* Glob-style pattern matching. */
@@ -898,7 +915,7 @@ static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientD
     /* Check if a background saving in progress terminated */
     if (server.bgsaveinprogress) {
         int statloc;
     /* Check if a background saving in progress terminated */
     if (server.bgsaveinprogress) {
         int statloc;
-        if (wait4(-1,&statloc,WNOHANG,NULL)) {
+        if (wait3(&statloc,WNOHANG,NULL)) {
             int exitcode = WEXITSTATUS(statloc);
             int bysignal = WIFSIGNALED(statloc);
 
             int exitcode = WEXITSTATUS(statloc);
             int bysignal = WIFSIGNALED(statloc);
 
@@ -935,14 +952,21 @@ static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientD
          }
     }
 
          }
     }
 
-    /* Try to expire a few timed out keys */
+    /* Try to expire a few timed out keys. The algorithm used is adaptive and
+     * will use few CPU cycles if there are few expiring keys, otherwise
+     * it will get more aggressive to avoid that too much memory is used by
+     * keys that can be removed from the keyspace. */
     for (j = 0; j < server.dbnum; j++) {
     for (j = 0; j < server.dbnum; j++) {
+        int expired;
         redisDb *db = server.db+j;
         redisDb *db = server.db+j;
-        int num = dictSize(db->expires);
 
 
-        if (num) {
+        /* Continue to expire if at the end of the cycle more than 25%
+         * of the keys were expired. */
+        do {
+            int num = dictSize(db->expires);
             time_t now = time(NULL);
 
             time_t now = time(NULL);
 
+            expired = 0;
             if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
                 num = REDIS_EXPIRELOOKUPS_PER_CRON;
             while (num--) {
             if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
                 num = REDIS_EXPIRELOOKUPS_PER_CRON;
             while (num--) {
@@ -953,9 +977,10 @@ static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientD
                 t = (time_t) dictGetEntryVal(de);
                 if (now > t) {
                     deleteKey(db,dictGetEntryKey(de));
                 t = (time_t) dictGetEntryVal(de);
                 if (now > t) {
                     deleteKey(db,dictGetEntryKey(de));
+                    expired++;
                 }
             }
                 }
             }
-        }
+        } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
     }
 
     /* Check if we should connect to a MASTER */
     }
 
     /* Check if we should connect to a MASTER */
@@ -1012,7 +1037,7 @@ static void appendServerSaveParams(time_t seconds, int changes) {
     server.saveparamslen++;
 }
 
     server.saveparamslen++;
 }
 
-static void ResetServerSaveParams() {
+static void resetServerSaveParams() {
     zfree(server.saveparams);
     server.saveparams = NULL;
     server.saveparamslen = 0;
     zfree(server.saveparams);
     server.saveparams = NULL;
     server.saveparamslen = 0;
@@ -1029,6 +1054,8 @@ static void initServerConfig() {
     server.glueoutputbuf = 1;
     server.daemonize = 0;
     server.appendonly = 0;
     server.glueoutputbuf = 1;
     server.daemonize = 0;
     server.appendonly = 0;
+    server.appendfsync = APPENDFSYNC_ALWAYS;
+    server.lastfsync = time(NULL);
     server.appendfd = -1;
     server.appendseldb = -1; /* Make sure the first time will not match */
     server.pidfile = "/var/run/redis.pid";
     server.appendfd = -1;
     server.appendseldb = -1; /* Make sure the first time will not match */
     server.pidfile = "/var/run/redis.pid";
@@ -1039,13 +1066,14 @@ static void initServerConfig() {
     server.sharingpoolsize = 1024;
     server.maxclients = 0;
     server.maxmemory = 0;
     server.sharingpoolsize = 1024;
     server.maxclients = 0;
     server.maxmemory = 0;
-    ResetServerSaveParams();
+    resetServerSaveParams();
 
     appendServerSaveParams(60*60,1);  /* save after 1 hour and 1 change */
     appendServerSaveParams(300,100);  /* save after 5 minutes and 100 changes */
     appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
     /* Replication related */
     server.isslave = 0;
 
     appendServerSaveParams(60*60,1);  /* save after 1 hour and 1 change */
     appendServerSaveParams(300,100);  /* save after 5 minutes and 100 changes */
     appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
     /* Replication related */
     server.isslave = 0;
+    server.masterauth = NULL;
     server.masterhost = NULL;
     server.masterport = 6379;
     server.master = NULL;
     server.masterhost = NULL;
     server.masterport = 6379;
     server.master = NULL;
@@ -1092,10 +1120,10 @@ static void initServer() {
     server.stat_numcommands = 0;
     server.stat_numconnections = 0;
     server.stat_starttime = time(NULL);
     server.stat_numcommands = 0;
     server.stat_numconnections = 0;
     server.stat_starttime = time(NULL);
-    aeCreateTimeEvent(server.el, 1000, serverCron, NULL, NULL);
+    aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
 
     if (server.appendonly) {
 
     if (server.appendonly) {
-        server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT);
+        server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
         if (server.appendfd == -1) {
             redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
                 strerror(errno));
         if (server.appendfd == -1) {
             redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
                 strerror(errno));
@@ -1224,6 +1252,8 @@ static void loadServerConfig(char *filename) {
             server.masterhost = sdsnew(argv[1]);
             server.masterport = atoi(argv[2]);
             server.replstate = REDIS_REPL_CONNECT;
             server.masterhost = sdsnew(argv[1]);
             server.masterport = atoi(argv[2]);
             server.replstate = REDIS_REPL_CONNECT;
+        } else if (!strcasecmp(argv[0],"masterauth") && argc == 2) {
+               server.masterauth = zstrdup(argv[1]);
         } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
             if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
                 err = "argument must be 'yes' or 'no'"; goto loaderr;
         } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
             if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
                 err = "argument must be 'yes' or 'no'"; goto loaderr;
@@ -1245,6 +1275,17 @@ static void loadServerConfig(char *filename) {
             if ((server.appendonly = yesnotoi(argv[1])) == -1) {
                 err = "argument must be 'yes' or 'no'"; goto loaderr;
             }
             if ((server.appendonly = yesnotoi(argv[1])) == -1) {
                 err = "argument must be 'yes' or 'no'"; goto loaderr;
             }
+        } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) {
+            if (!strcasecmp(argv[1],"no")) {
+                server.appendfsync = APPENDFSYNC_NO;
+            } else if (!strcasecmp(argv[1],"always")) {
+                server.appendfsync = APPENDFSYNC_ALWAYS;
+            } else if (!strcasecmp(argv[1],"everysec")) {
+                server.appendfsync = APPENDFSYNC_EVERYSEC;
+            } else {
+                err = "argument must be 'no', 'always' or 'everysec'";
+                goto loaderr;
+            }
         } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
           server.requirepass = zstrdup(argv[1]);
         } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
         } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
           server.requirepass = zstrdup(argv[1]);
         } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
@@ -1310,34 +1351,31 @@ static void freeClient(redisClient *c) {
     zfree(c);
 }
 
     zfree(c);
 }
 
+#define GLUEREPLY_UP_TO (1024)
 static void glueReplyBuffersIfNeeded(redisClient *c) {
 static void glueReplyBuffersIfNeeded(redisClient *c) {
-    int totlen = 0;
+    int copylen = 0;
+    char buf[GLUEREPLY_UP_TO];
     listNode *ln;
     robj *o;
 
     listRewind(c->reply);
     while((ln = listYield(c->reply))) {
     listNode *ln;
     robj *o;
 
     listRewind(c->reply);
     while((ln = listYield(c->reply))) {
+        int objlen;
+
         o = ln->value;
         o = ln->value;
-        totlen += sdslen(o->ptr);
-        /* This optimization makes more sense if we don't have to copy
-         * too much data */
-        if (totlen > 1024) return;
-    }
-    if (totlen > 0) {
-        char buf[1024];
-        int copylen = 0;
-
-        listRewind(c->reply);
-        while((ln = listYield(c->reply))) {
-            o = ln->value;
-            memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
-            copylen += sdslen(o->ptr);
+        objlen = sdslen(o->ptr);
+        if (copylen + objlen <= GLUEREPLY_UP_TO) {
+            memcpy(buf+copylen,o->ptr,objlen);
+            copylen += objlen;
             listDelNode(c->reply,ln);
             listDelNode(c->reply,ln);
+        } else {
+            if (copylen == 0) return;
+            break;
         }
         }
-        /* Now the output buffer is empty, add the new single element */
-        o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
-        listAddNodeTail(c->reply,o);
     }
     }
+    /* Now the output buffer is empty, add the new single element */
+    o = createObject(REDIS_STRING,sdsnewlen(buf,copylen));
+    listAddNodeHead(c->reply,o);
 }
 
 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
 }
 
 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
@@ -1347,9 +1385,19 @@ static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask)
     REDIS_NOTUSED(el);
     REDIS_NOTUSED(mask);
 
     REDIS_NOTUSED(el);
     REDIS_NOTUSED(mask);
 
-    if (server.glueoutputbuf && listLength(c->reply) > 1)
-        glueReplyBuffersIfNeeded(c);
+    /* Use writev() if we have enough buffers to send */
+    if (!server.glueoutputbuf &&
+        listLength(c->reply) > REDIS_WRITEV_THRESHOLD && 
+        !(c->flags & REDIS_MASTER))
+    {
+        sendReplyToClientWritev(el, fd, privdata, mask);
+        return;
+    }
+
     while(listLength(c->reply)) {
     while(listLength(c->reply)) {
+        if (server.glueoutputbuf && listLength(c->reply) > 1)
+            glueReplyBuffersIfNeeded(c);
+
         o = listNodeValue(listFirst(c->reply));
         objlen = sdslen(o->ptr);
 
         o = listNodeValue(listFirst(c->reply));
         objlen = sdslen(o->ptr);
 
@@ -1373,10 +1421,10 @@ static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask)
             c->sentlen = 0;
         }
         /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
             c->sentlen = 0;
         }
         /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
-         * bytes, in a single threaded server it's a good idea to server
+         * bytes, in a single threaded server it's a good idea to serve
          * other clients as well, even if a very large request comes from
          * super fast link that is always able to accept data (in real world
          * other clients as well, even if a very large request comes from
          * super fast link that is always able to accept data (in real world
-         * terms think to 'KEYS *' against the loopback interfae) */
+         * scenario think about 'KEYS *' against the loopback interfae) */
         if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
     }
     if (nwritten == -1) {
         if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
     }
     if (nwritten == -1) {
@@ -1396,6 +1444,84 @@ static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask)
     }
 }
 
     }
 }
 
+static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
+{
+    redisClient *c = privdata;
+    int nwritten = 0, totwritten = 0, objlen, willwrite;
+    robj *o;
+    struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
+    int offset, ion = 0;
+    REDIS_NOTUSED(el);
+    REDIS_NOTUSED(mask);
+
+    listNode *node;
+    while (listLength(c->reply)) {
+        offset = c->sentlen;
+        ion = 0;
+        willwrite = 0;
+
+        /* fill-in the iov[] array */
+        for(node = listFirst(c->reply); node; node = listNextNode(node)) {
+            o = listNodeValue(node);
+            objlen = sdslen(o->ptr);
+
+            if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT) 
+                break;
+
+            if(ion == REDIS_WRITEV_IOVEC_COUNT)
+                break; /* no more iovecs */
+
+            iov[ion].iov_base = ((char*)o->ptr) + offset;
+            iov[ion].iov_len = objlen - offset;
+            willwrite += objlen - offset;
+            offset = 0; /* just for the first item */
+            ion++;
+        }
+
+        if(willwrite == 0)
+            break;
+
+        /* write all collected blocks at once */
+        if((nwritten = writev(fd, iov, ion)) < 0) {
+            if (errno != EAGAIN) {
+                redisLog(REDIS_DEBUG,
+                         "Error writing to client: %s", strerror(errno));
+                freeClient(c);
+                return;
+            }
+            break;
+        }
+
+        totwritten += nwritten;
+        offset = c->sentlen;
+
+        /* remove written robjs from c->reply */
+        while (nwritten && listLength(c->reply)) {
+            o = listNodeValue(listFirst(c->reply));
+            objlen = sdslen(o->ptr);
+
+            if(nwritten >= objlen - offset) {
+                listDelNode(c->reply, listFirst(c->reply));
+                nwritten -= objlen - offset;
+                c->sentlen = 0;
+            } else {
+                /* partial write */
+                c->sentlen += nwritten;
+                break;
+            }
+            offset = 0;
+        }
+    }
+
+    if (totwritten > 0) 
+        c->lastinteraction = time(NULL);
+
+    if (listLength(c->reply) == 0) {
+        c->sentlen = 0;
+        aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
+    }
+}
+
 static struct redisCommand *lookupCommand(char *name) {
     int j = 0;
     while(cmdTable[j].name != NULL) {
 static struct redisCommand *lookupCommand(char *name) {
     int j = 0;
     while(cmdTable[j].name != NULL) {
@@ -1560,9 +1686,9 @@ static int processCommand(redisClient *c) {
     /* Exec the command */
     dirty = server.dirty;
     cmd->proc(c);
     /* Exec the command */
     dirty = server.dirty;
     cmd->proc(c);
-    if (server.appendonly != 0)
+    if (server.appendonly && server.dirty-dirty)
         feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
         feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
-    if (server.dirty-dirty != 0 && listLength(server.slaves))
+    if (server.dirty-dirty && listLength(server.slaves))
         replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
     if (listLength(server.monitors))
         replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
         replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
     if (listLength(server.monitors))
         replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
@@ -1646,55 +1772,6 @@ static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int di
     if (outv != static_outv) zfree(outv);
 }
 
     if (outv != static_outv) zfree(outv);
 }
 
-/* TODO: translate EXPIREs into EXPIRETOs */
-static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
-    sds buf = sdsempty();
-    int j;
-    ssize_t nwritten;
-
-    /* The DB this command was targetting is not the same as the last command
-     * we appendend. To issue a SELECT command is needed. */
-    if (dictid != server.appendseldb) {
-        char seldb[64];
-
-        snprintf(seldb,sizeof(seldb),"%d",dictid);
-        buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
-            strlen(seldb),seldb);
-        server.appendseldb = dictid;
-    }
-    /* Append the actual command */
-    buf = sdscatprintf(buf,"*%d\r\n",argc);
-    for (j = 0; j < argc; j++) {
-        robj *o = argv[j];
-
-        if (o->encoding != REDIS_ENCODING_RAW)
-            o = getDecodedObject(o);
-        buf = sdscatprintf(buf,"$%d\r\n",sdslen(o->ptr));
-        buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
-        buf = sdscatlen(buf,"\r\n",2);
-        if (o != argv[j])
-            decrRefCount(o);
-    }
-    /* We want to perform a single write. This should be guaranteed atomic
-     * at least if the filesystem we are writing is a real physical one.
-     * While this will save us against the server being killed I don't think
-     * there is much to do about the whole server stopping for power problems
-     * or alike */
-     nwritten = write(server.appendfd,buf,sdslen(buf));
-     if (nwritten != (unsigned)sdslen(buf)) {
-        /* Ooops, we are in troubles. The best thing to do for now is
-         * to simply exit instead to give the illusion that everything is
-         * working as expected. */
-         if (nwritten == -1) {
-            redisLog(REDIS_WARNING,"Aborting on error writing to the append-only file: %s",strerror(errno));
-         } else {
-            redisLog(REDIS_WARNING,"Aborting on short write while writing to the append-only file: %s",strerror(errno));
-         }
-         abort();
-     }
-     fsync(server.appendfd); /* Let's try to get this data on the disk */
-}
-
 static void processInputBuffer(redisClient *c) {
 again:
     if (c->bulklen == -1) {
 static void processInputBuffer(redisClient *c) {
 again:
     if (c->bulklen == -1) {
@@ -1835,7 +1912,7 @@ static redisClient *createClient(int fd) {
     listSetFreeMethod(c->reply,decrRefCount);
     listSetDupMethod(c->reply,dupClientReplyValue);
     if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
     listSetFreeMethod(c->reply,decrRefCount);
     listSetDupMethod(c->reply,dupClientReplyValue);
     if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
-        readQueryFromClient, c, NULL) == AE_ERR) {
+        readQueryFromClient, c) == AE_ERR) {
         freeClient(c);
         return NULL;
     }
         freeClient(c);
         return NULL;
     }
@@ -1848,7 +1925,7 @@ static void addReply(redisClient *c, robj *obj) {
         (c->replstate == REDIS_REPL_NONE ||
          c->replstate == REDIS_REPL_ONLINE) &&
         aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
         (c->replstate == REDIS_REPL_NONE ||
          c->replstate == REDIS_REPL_ONLINE) &&
         aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
-        sendReplyToClient, c, NULL) == AE_ERR) return;
+        sendReplyToClient, c) == AE_ERR) return;
     if (obj->encoding != REDIS_ENCODING_RAW) {
         obj = getDecodedObject(obj);
     } else {
     if (obj->encoding != REDIS_ENCODING_RAW) {
         obj = getDecodedObject(obj);
     } else {
@@ -1863,6 +1940,14 @@ static void addReplySds(redisClient *c, sds s) {
     decrRefCount(o);
 }
 
     decrRefCount(o);
 }
 
+static void addReplyDouble(redisClient *c, double d) {
+    char buf[128];
+
+    snprintf(buf,sizeof(buf),"%.17g",d);
+    addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n%s\r\n",
+        strlen(buf),buf));
+}
+
 static void addReplyBulkLen(redisClient *c, robj *obj) {
     size_t len;
 
 static void addReplyBulkLen(redisClient *c, robj *obj) {
     size_t len;
 
@@ -1910,7 +1995,9 @@ static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
         char *err = "-ERR max number of clients reached\r\n";
 
         /* That's a best effort error message, don't check write errors */
         char *err = "-ERR max number of clients reached\r\n";
 
         /* That's a best effort error message, don't check write errors */
-        (void) write(c->fd,err,strlen(err));
+        if (write(c->fd,err,strlen(err)) == -1) {
+            /* Nothing to do, Just to avoid the warning... */
+        }
         freeClient(c);
         return;
     }
         freeClient(c);
         return;
     }
@@ -2363,8 +2450,8 @@ static int rdbSaveDoubleValue(FILE *fp, double val) {
         len = 1;
         buf[0] = (val < 0) ? 255 : 254;
     } else {
         len = 1;
         buf[0] = (val < 0) ? 255 : 254;
     } else {
-        snprintf((char*)buf+1,sizeof(buf)-1,"%.16g",val);
-        buf[0] = strlen((char*)buf);
+        snprintf((char*)buf+1,sizeof(buf)-1,"%.17g",val);
+        buf[0] = strlen((char*)buf+1);
         len = buf[0]+1;
     }
     if (fwrite(buf,len,1,fp) == 0) return -1;
         len = buf[0]+1;
     }
     if (fwrite(buf,len,1,fp) == 0) return -1;
@@ -2783,7 +2870,7 @@ static int rdbLoad(char *filename) {
 
 eoferr: /* unexpected end of file is handled here with a fatal exit */
     if (keyobj) decrRefCount(keyobj);
 
 eoferr: /* unexpected end of file is handled here with a fatal exit */
     if (keyobj) decrRefCount(keyobj);
-    redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
+    redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
     exit(1);
     return REDIS_ERR; /* Just to avoid warning */
 }
     exit(1);
     return REDIS_ERR; /* Just to avoid warning */
 }
@@ -2889,6 +2976,49 @@ static void mgetCommand(redisClient *c) {
     }
 }
 
     }
 }
 
+static void msetGenericCommand(redisClient *c, int nx) {
+    int j;
+
+    if ((c->argc % 2) == 0) {
+        addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
+        return;
+    }
+    /* Handle the NX flag. The MSETNX semantic is to return zero and don't
+     * set nothing at all if at least one already key exists. */
+    if (nx) {
+        for (j = 1; j < c->argc; j += 2) {
+            if (dictFind(c->db->dict,c->argv[j]) != NULL) {
+                addReply(c, shared.czero);
+                return;
+            }
+        }
+    }
+
+    for (j = 1; j < c->argc; j += 2) {
+        int retval;
+
+        retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
+        if (retval == DICT_ERR) {
+            dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
+            incrRefCount(c->argv[j+1]);
+        } else {
+            incrRefCount(c->argv[j]);
+            incrRefCount(c->argv[j+1]);
+        }
+        removeExpire(c->db,c->argv[j]);
+    }
+    server.dirty += (c->argc-1)/2;
+    addReply(c, nx ? shared.cone : shared.ok);
+}
+
+static void msetCommand(redisClient *c) {
+    msetGenericCommand(c,0);
+}
+
+static void msetnxCommand(redisClient *c) {
+    msetGenericCommand(c,1);
+}
+
 static void incrDecrCommand(redisClient *c, long long incr) {
     long long value;
     int retval;
 static void incrDecrCommand(redisClient *c, long long incr) {
     long long value;
     int retval;
@@ -3054,6 +3184,7 @@ static void typeCommand(redisClient *c) {
         case REDIS_STRING: type = "+string"; break;
         case REDIS_LIST: type = "+list"; break;
         case REDIS_SET: type = "+set"; break;
         case REDIS_STRING: type = "+string"; break;
         case REDIS_LIST: type = "+list"; break;
         case REDIS_SET: type = "+set"; break;
+        case REDIS_ZSET: type = "+zset"; break;
         default: type = "unknown"; break;
         }
     }
         default: type = "unknown"; break;
         }
     }
@@ -3486,6 +3617,70 @@ static void lremCommand(redisClient *c) {
     }
 }
 
     }
 }
 
+/* This is the semantic of this command:
+ *  RPOPLPUSH srclist dstlist:
+ *   IF LLEN(srclist) > 0
+ *     element = RPOP srclist
+ *     LPUSH dstlist element
+ *     RETURN element
+ *   ELSE
+ *     RETURN nil
+ *   END
+ *  END
+ *
+ * The idea is to be able to get an element from a list in a reliable way
+ * since the element is not just returned but pushed against another list
+ * as well. This command was originally proposed by Ezra Zygmuntowicz.
+ */
+static void rpoplpushcommand(redisClient *c) {
+    robj *sobj;
+
+    sobj = lookupKeyWrite(c->db,c->argv[1]);
+    if (sobj == NULL) {
+        addReply(c,shared.nullbulk);
+    } else {
+        if (sobj->type != REDIS_LIST) {
+            addReply(c,shared.wrongtypeerr);
+        } else {
+            list *srclist = sobj->ptr;
+            listNode *ln = listLast(srclist);
+
+            if (ln == NULL) {
+                addReply(c,shared.nullbulk);
+            } else {
+                robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
+                robj *ele = listNodeValue(ln);
+                list *dstlist;
+
+                if (dobj == NULL) {
+
+                    /* Create the list if the key does not exist */
+                    dobj = createListObject();
+                    dictAdd(c->db->dict,c->argv[2],dobj);
+                    incrRefCount(c->argv[2]);
+                } else if (dobj->type != REDIS_LIST) {
+                    addReply(c,shared.wrongtypeerr);
+                    return;
+                }
+                /* Add the element to the target list */
+                dstlist = dobj->ptr;
+                listAddNodeHead(dstlist,ele);
+                incrRefCount(ele);
+
+                /* Send the element to the client as reply as well */
+                addReplyBulkLen(c,ele);
+                addReply(c,ele);
+                addReply(c,shared.crlf);
+
+                /* Finally remove the element from the source list */
+                listDelNode(srclist,ln);
+                server.dirty++;
+            }
+        }
+    }
+}
+
+
 /* ==================================== Sets ================================ */
 
 static void saddCommand(redisClient *c) {
 /* ==================================== Sets ================================ */
 
 static void saddCommand(redisClient *c) {
@@ -4077,56 +4272,101 @@ static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) {
 
 /* The actual Z-commands implementations */
 
 
 /* The actual Z-commands implementations */
 
-static void zaddCommand(redisClient *c) {
+/* This generic command implements both ZADD and ZINCRBY.
+ * scoreval is the score if the operation is a ZADD (doincrement == 0) or
+ * the increment if the operation is a ZINCRBY (doincrement == 1). */
+static void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scoreval, int doincrement) {
     robj *zsetobj;
     zset *zs;
     double *score;
 
     robj *zsetobj;
     zset *zs;
     double *score;
 
-    zsetobj = lookupKeyWrite(c->db,c->argv[1]);
+    zsetobj = lookupKeyWrite(c->db,key);
     if (zsetobj == NULL) {
         zsetobj = createZsetObject();
     if (zsetobj == NULL) {
         zsetobj = createZsetObject();
-        dictAdd(c->db->dict,c->argv[1],zsetobj);
-        incrRefCount(c->argv[1]);
+        dictAdd(c->db->dict,key,zsetobj);
+        incrRefCount(key);
     } else {
         if (zsetobj->type != REDIS_ZSET) {
             addReply(c,shared.wrongtypeerr);
             return;
         }
     }
     } else {
         if (zsetobj->type != REDIS_ZSET) {
             addReply(c,shared.wrongtypeerr);
             return;
         }
     }
-    score = zmalloc(sizeof(double));
-    *score = strtod(c->argv[2]->ptr,NULL);
     zs = zsetobj->ptr;
     zs = zsetobj->ptr;
-    if (dictAdd(zs->dict,c->argv[3],score) == DICT_OK) {
+
+    /* Ok now since we implement both ZADD and ZINCRBY here the code
+     * needs to handle the two different conditions. It's all about setting
+     * '*score', that is, the new score to set, to the right value. */
+    score = zmalloc(sizeof(double));
+    if (doincrement) {
+        dictEntry *de;
+
+        /* Read the old score. If the element was not present starts from 0 */
+        de = dictFind(zs->dict,ele);
+        if (de) {
+            double *oldscore = dictGetEntryVal(de);
+            *score = *oldscore + scoreval;
+        } else {
+            *score = scoreval;
+        }
+    } else {
+        *score = scoreval;
+    }
+
+    /* What follows is a simple remove and re-insert operation that is common
+     * to both ZADD and ZINCRBY... */
+    if (dictAdd(zs->dict,ele,score) == DICT_OK) {
         /* case 1: New element */
         /* case 1: New element */
-        incrRefCount(c->argv[3]); /* added to hash */
-        zslInsert(zs->zsl,*score,c->argv[3]);
-        incrRefCount(c->argv[3]); /* added to skiplist */
+        incrRefCount(ele); /* added to hash */
+        zslInsert(zs->zsl,*score,ele);
+        incrRefCount(ele); /* added to skiplist */
         server.dirty++;
         server.dirty++;
-        addReply(c,shared.cone);
+        if (doincrement)
+            addReplyDouble(c,*score);
+        else
+            addReply(c,shared.cone);
     } else {
         dictEntry *de;
         double *oldscore;
         
         /* case 2: Score update operation */
     } else {
         dictEntry *de;
         double *oldscore;
         
         /* case 2: Score update operation */
-        de = dictFind(zs->dict,c->argv[3]);
+        de = dictFind(zs->dict,ele);
         assert(de != NULL);
         oldscore = dictGetEntryVal(de);
         if (*score != *oldscore) {
             int deleted;
 
         assert(de != NULL);
         oldscore = dictGetEntryVal(de);
         if (*score != *oldscore) {
             int deleted;
 
-            deleted = zslDelete(zs->zsl,*oldscore,c->argv[3]);
+            /* Remove and insert the element in the skip list with new score */
+            deleted = zslDelete(zs->zsl,*oldscore,ele);
             assert(deleted != 0);
             assert(deleted != 0);
-            zslInsert(zs->zsl,*score,c->argv[3]);
-            incrRefCount(c->argv[3]);
-            dictReplace(zs->dict,c->argv[3],score);
+            zslInsert(zs->zsl,*score,ele);
+            incrRefCount(ele);
+            /* Update the score in the hash table */
+            dictReplace(zs->dict,ele,score);
             server.dirty++;
         } else {
             zfree(score);
         }
             server.dirty++;
         } else {
             zfree(score);
         }
-        addReply(c,shared.czero);
+        if (doincrement)
+            addReplyDouble(c,*score);
+        else
+            addReply(c,shared.czero);
     }
 }
 
     }
 }
 
+static void zaddCommand(redisClient *c) {
+    double scoreval;
+
+    scoreval = strtod(c->argv[2]->ptr,NULL);
+    zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,0);
+}
+
+static void zincrbyCommand(redisClient *c) {
+    double scoreval;
+
+    scoreval = strtod(c->argv[2]->ptr,NULL);
+    zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,1);
+}
+
 static void zremCommand(redisClient *c) {
     robj *zsetobj;
     zset *zs;
 static void zremCommand(redisClient *c) {
     robj *zsetobj;
     zset *zs;
@@ -4322,7 +4562,7 @@ static void zscoreCommand(redisClient *c) {
     
     o = lookupKeyRead(c->db,c->argv[1]);
     if (o == NULL) {
     
     o = lookupKeyRead(c->db,c->argv[1]);
     if (o == NULL) {
-        addReply(c,shared.czero);
+        addReply(c,shared.nullbulk);
         return;
     } else {
         if (o->type != REDIS_ZSET) {
         return;
     } else {
         if (o->type != REDIS_ZSET) {
@@ -4335,12 +4575,9 @@ static void zscoreCommand(redisClient *c) {
             if (!de) {
                 addReply(c,shared.nullbulk);
             } else {
             if (!de) {
                 addReply(c,shared.nullbulk);
             } else {
-                char buf[128];
                 double *score = dictGetEntryVal(de);
 
                 double *score = dictGetEntryVal(de);
 
-                snprintf(buf,sizeof(buf),"%.16g",*score);
-                addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n%s\r\n",
-                    strlen(buf),buf));
+                addReplyDouble(c,*score);
             }
         }
     }
             }
         }
     }
@@ -4383,17 +4620,31 @@ static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
         char buf[REDIS_SORTKEY_MAX+1];
     } keyname;
 
         char buf[REDIS_SORTKEY_MAX+1];
     } keyname;
 
+    /* If the pattern is "#" return the substitution object itself in order
+     * to implement the "SORT ... GET #" feature. */
+    spat = pattern->ptr;
+    if (spat[0] == '#' && spat[1] == '\0') {
+        return subst;
+    }
+
+    /* The substitution object may be specially encoded. If so we create
+     * a decoded object on the fly. */
     if (subst->encoding == REDIS_ENCODING_RAW)
     if (subst->encoding == REDIS_ENCODING_RAW)
+        /* If we don't need to get a decoded object increment the refcount
+         * so that the final decrRefCount() call will restore the original
+         * count */
         incrRefCount(subst);
     else {
         subst = getDecodedObject(subst);
     }
 
         incrRefCount(subst);
     else {
         subst = getDecodedObject(subst);
     }
 
-    spat = pattern->ptr;
     ssub = subst->ptr;
     if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
     p = strchr(spat,'*');
     ssub = subst->ptr;
     if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
     p = strchr(spat,'*');
-    if (!p) return NULL;
+    if (!p) {
+        decrRefCount(subst);
+        return NULL;
+    }
 
     prefixlen = p-spat;
     sublen = sdslen(ssub);
 
     prefixlen = p-spat;
     sublen = sdslen(ssub);
@@ -4475,7 +4726,7 @@ static void sortCommand(redisClient *c) {
     int limit_start = 0, limit_count = -1, start, end;
     int j, dontsort = 0, vectorlen;
     int getop = 0; /* GET operation counter */
     int limit_start = 0, limit_count = -1, start, end;
     int j, dontsort = 0, vectorlen;
     int getop = 0; /* GET operation counter */
-    robj *sortval, *sortby = NULL;
+    robj *sortval, *sortby = NULL, *storekey = NULL;
     redisSortObject *vector; /* Resulting vector to sort */
 
     /* Lookup the key to sort. It must be of the right types */
     redisSortObject *vector; /* Resulting vector to sort */
 
     /* Lookup the key to sort. It must be of the right types */
@@ -4513,6 +4764,9 @@ static void sortCommand(redisClient *c) {
             limit_start = atoi(c->argv[j+1]->ptr);
             limit_count = atoi(c->argv[j+2]->ptr);
             j+=2;
             limit_start = atoi(c->argv[j+1]->ptr);
             limit_count = atoi(c->argv[j+2]->ptr);
             j+=2;
+        } else if (!strcasecmp(c->argv[j]->ptr,"store") && leftargs >= 1) {
+            storekey = c->argv[j+1];
+            j++;
         } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
             sortby = c->argv[j+1];
             /* If the BY pattern does not contain '*', i.e. it is constant,
         } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
             sortby = c->argv[j+1];
             /* If the BY pattern does not contain '*', i.e. it is constant,
@@ -4524,18 +4778,6 @@ static void sortCommand(redisClient *c) {
                 REDIS_SORT_GET,c->argv[j+1]));
             getop++;
             j++;
                 REDIS_SORT_GET,c->argv[j+1]));
             getop++;
             j++;
-        } else if (!strcasecmp(c->argv[j]->ptr,"del") && leftargs >= 1) {
-            listAddNodeTail(operations,createSortOperation(
-                REDIS_SORT_DEL,c->argv[j+1]));
-            j++;
-        } else if (!strcasecmp(c->argv[j]->ptr,"incr") && leftargs >= 1) {
-            listAddNodeTail(operations,createSortOperation(
-                REDIS_SORT_INCR,c->argv[j+1]));
-            j++;
-        } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
-            listAddNodeTail(operations,createSortOperation(
-                REDIS_SORT_DECR,c->argv[j+1]));
-            j++;
         } else {
             decrRefCount(sortval);
             listRelease(operations);
         } else {
             decrRefCount(sortval);
             listRelease(operations);
@@ -4642,32 +4884,72 @@ static void sortCommand(redisClient *c) {
     /* Send command output to the output buffer, performing the specified
      * GET/DEL/INCR/DECR operations if any. */
     outputlen = getop ? getop*(end-start+1) : end-start+1;
     /* Send command output to the output buffer, performing the specified
      * GET/DEL/INCR/DECR operations if any. */
     outputlen = getop ? getop*(end-start+1) : end-start+1;
-    addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
-    for (j = start; j <= end; j++) {
-        listNode *ln;
-        if (!getop) {
-            addReplyBulkLen(c,vector[j].obj);
-            addReply(c,vector[j].obj);
-            addReply(c,shared.crlf);
+    if (storekey == NULL) {
+        /* STORE option not specified, sent the sorting result to client */
+        addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
+        for (j = start; j <= end; j++) {
+            listNode *ln;
+            if (!getop) {
+                addReplyBulkLen(c,vector[j].obj);
+                addReply(c,vector[j].obj);
+                addReply(c,shared.crlf);
+            }
+            listRewind(operations);
+            while((ln = listYield(operations))) {
+                redisSortOperation *sop = ln->value;
+                robj *val = lookupKeyByPattern(c->db,sop->pattern,
+                    vector[j].obj);
+
+                if (sop->type == REDIS_SORT_GET) {
+                    if (!val || val->type != REDIS_STRING) {
+                        addReply(c,shared.nullbulk);
+                    } else {
+                        addReplyBulkLen(c,val);
+                        addReply(c,val);
+                        addReply(c,shared.crlf);
+                    }
+                } else {
+                    assert(sop->type == REDIS_SORT_GET); /* always fails */
+                }
+            }
         }
         }
-        listRewind(operations);
-        while((ln = listYield(operations))) {
-            redisSortOperation *sop = ln->value;
-            robj *val = lookupKeyByPattern(c->db,sop->pattern,
-                vector[j].obj);
+    } else {
+        robj *listObject = createListObject();
+        list *listPtr = (list*) listObject->ptr;
 
 
-            if (sop->type == REDIS_SORT_GET) {
-                if (!val || val->type != REDIS_STRING) {
-                    addReply(c,shared.nullbulk);
+        /* STORE option specified, set the sorting result as a List object */
+        for (j = start; j <= end; j++) {
+            listNode *ln;
+            if (!getop) {
+                listAddNodeTail(listPtr,vector[j].obj);
+                incrRefCount(vector[j].obj);
+            }
+            listRewind(operations);
+            while((ln = listYield(operations))) {
+                redisSortOperation *sop = ln->value;
+                robj *val = lookupKeyByPattern(c->db,sop->pattern,
+                    vector[j].obj);
+
+                if (sop->type == REDIS_SORT_GET) {
+                    if (!val || val->type != REDIS_STRING) {
+                        listAddNodeTail(listPtr,createStringObject("",0));
+                    } else {
+                        listAddNodeTail(listPtr,val);
+                        incrRefCount(val);
+                    }
                 } else {
                 } else {
-                    addReplyBulkLen(c,val);
-                    addReply(c,val);
-                    addReply(c,shared.crlf);
+                    assert(sop->type == REDIS_SORT_GET); /* always fails */
                 }
                 }
-            } else if (sop->type == REDIS_SORT_DEL) {
-                /* TODO */
             }
         }
             }
         }
+        if (dictReplace(c->db->dict,storekey,listObject)) {
+            incrRefCount(storekey);
+        }
+        /* Note: we add 1 because the DB is dirty anyway since even if the
+         * SORT result is empty a new key is set and maybe the old content
+         * replaced. */
+        server.dirty += 1+outputlen;
+        addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",outputlen));
     }
 
     /* Cleanup */
     }
 
     /* Cleanup */
@@ -4855,54 +5137,11 @@ static void ttlCommand(redisClient *c) {
     addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
 }
 
     addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
 }
 
-static void msetGenericCommand(redisClient *c, int nx) {
-    int j;
-
-    if ((c->argc % 2) == 0) {
-        addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
-        return;
-    }
-    /* Handle the NX flag. The MSETNX semantic is to return zero and don't
-     * set nothing at all if at least one already key exists. */
-    if (nx) {
-        for (j = 1; j < c->argc; j += 2) {
-            if (dictFind(c->db->dict,c->argv[j]) != NULL) {
-                addReply(c, shared.czero);
-                return;
-            }
-        }
-    }
+/* =============================== Replication  ============================= */
 
 
-    for (j = 1; j < c->argc; j += 2) {
-        int retval;
-
-        retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
-        if (retval == DICT_ERR) {
-            dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
-            incrRefCount(c->argv[j+1]);
-        } else {
-            incrRefCount(c->argv[j]);
-            incrRefCount(c->argv[j+1]);
-        }
-        removeExpire(c->db,c->argv[j]);
-    }
-    server.dirty += (c->argc-1)/2;
-    addReply(c, nx ? shared.cone : shared.ok);
-}
-
-static void msetCommand(redisClient *c) {
-    msetGenericCommand(c,0);
-}
-
-static void msetnxCommand(redisClient *c) {
-    msetGenericCommand(c,1);
-}
-
-/* =============================== Replication  ============================= */
-
-static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
-    ssize_t nwritten, ret = size;
-    time_t start = time(NULL);
+static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
+    ssize_t nwritten, ret = size;
+    time_t start = time(NULL);
 
     timeout++;
     while(size) {
 
     timeout++;
     while(size) {
@@ -5065,7 +5304,7 @@ static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
         aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
         slave->replstate = REDIS_REPL_ONLINE;
         if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
         aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
         slave->replstate = REDIS_REPL_ONLINE;
         if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
-            sendReplyToClient, slave, NULL) == AE_ERR) {
+            sendReplyToClient, slave) == AE_ERR) {
             freeClient(slave);
             return;
         }
             freeClient(slave);
             return;
         }
@@ -5109,7 +5348,7 @@ static void updateSlavesWaitingBgsave(int bgsaveerr) {
             slave->repldbsize = buf.st_size;
             slave->replstate = REDIS_REPL_SEND_BULK;
             aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
             slave->repldbsize = buf.st_size;
             slave->replstate = REDIS_REPL_SEND_BULK;
             aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
-            if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
+            if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
                 freeClient(slave);
                 continue;
             }
                 freeClient(slave);
                 continue;
             }
@@ -5130,7 +5369,7 @@ static void updateSlavesWaitingBgsave(int bgsaveerr) {
 }
 
 static int syncWithMaster(void) {
 }
 
 static int syncWithMaster(void) {
-    char buf[1024], tmpfile[256];
+    char buf[1024], tmpfile[256], authcmd[1024];
     int dumpsize;
     int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
     int dfd;
     int dumpsize;
     int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
     int dfd;
@@ -5140,6 +5379,30 @@ static int syncWithMaster(void) {
             strerror(errno));
         return REDIS_ERR;
     }
             strerror(errno));
         return REDIS_ERR;
     }
+
+    /* AUTH with the master if required. */
+    if(server.masterauth) {
+       snprintf(authcmd, 1024, "AUTH %s\r\n", server.masterauth);
+       if (syncWrite(fd, authcmd, strlen(server.masterauth)+7, 5) == -1) {
+            close(fd);
+            redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
+                strerror(errno));
+            return REDIS_ERR;
+       }
+        /* Read the AUTH result.  */
+        if (syncReadLine(fd,buf,1024,3600) == -1) {
+            close(fd);
+            redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
+                strerror(errno));
+            return REDIS_ERR;
+        }
+        if (buf[0] != '+') {
+            close(fd);
+            redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?");
+            return REDIS_ERR;
+        }
+    }
+
     /* Issue the SYNC command */
     if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
         close(fd);
     /* Issue the SYNC command */
     if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
         close(fd);
@@ -5282,6 +5545,198 @@ static void freeMemoryIfNeeded(void) {
     }
 }
 
     }
 }
 
+/* ============================== Append Only file ========================== */
+
+static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
+    sds buf = sdsempty();
+    int j;
+    ssize_t nwritten;
+    time_t now;
+    robj *tmpargv[3];
+
+    /* The DB this command was targetting is not the same as the last command
+     * we appendend. To issue a SELECT command is needed. */
+    if (dictid != server.appendseldb) {
+        char seldb[64];
+
+        snprintf(seldb,sizeof(seldb),"%d",dictid);
+        buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
+            strlen(seldb),seldb);
+        server.appendseldb = dictid;
+    }
+
+    /* "Fix" the argv vector if the command is EXPIRE. We want to translate
+     * EXPIREs into EXPIREATs calls */
+    if (cmd->proc == expireCommand) {
+        long when;
+
+        tmpargv[0] = createStringObject("EXPIREAT",8);
+        tmpargv[1] = argv[1];
+        incrRefCount(argv[1]);
+        when = time(NULL)+strtol(argv[2]->ptr,NULL,10);
+        tmpargv[2] = createObject(REDIS_STRING,
+            sdscatprintf(sdsempty(),"%ld",when));
+        argv = tmpargv;
+    }
+
+    /* Append the actual command */
+    buf = sdscatprintf(buf,"*%d\r\n",argc);
+    for (j = 0; j < argc; j++) {
+        robj *o = argv[j];
+
+        if (o->encoding != REDIS_ENCODING_RAW)
+            o = getDecodedObject(o);
+        buf = sdscatprintf(buf,"$%d\r\n",sdslen(o->ptr));
+        buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
+        buf = sdscatlen(buf,"\r\n",2);
+        if (o != argv[j])
+            decrRefCount(o);
+    }
+
+    /* Free the objects from the modified argv for EXPIREAT */
+    if (cmd->proc == expireCommand) {
+        for (j = 0; j < 3; j++)
+            decrRefCount(argv[j]);
+    }
+
+    /* We want to perform a single write. This should be guaranteed atomic
+     * at least if the filesystem we are writing is a real physical one.
+     * While this will save us against the server being killed I don't think
+     * there is much to do about the whole server stopping for power problems
+     * or alike */
+     nwritten = write(server.appendfd,buf,sdslen(buf));
+     if (nwritten != (signed)sdslen(buf)) {
+        /* Ooops, we are in troubles. The best thing to do for now is
+         * to simply exit instead to give the illusion that everything is
+         * working as expected. */
+         if (nwritten == -1) {
+            redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
+         } else {
+            redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
+         }
+         exit(1);
+    }
+    now = time(NULL);
+    if (server.appendfsync == APPENDFSYNC_ALWAYS ||
+        (server.appendfsync == APPENDFSYNC_EVERYSEC &&
+         now-server.lastfsync > 1))
+    {
+        fsync(server.appendfd); /* Let's try to get this data on the disk */
+        server.lastfsync = now;
+    }
+}
+
+/* In Redis commands are always executed in the context of a client, so in
+ * order to load the append only file we need to create a fake client. */
+static struct redisClient *createFakeClient(void) {
+    struct redisClient *c = zmalloc(sizeof(*c));
+
+    selectDb(c,0);
+    c->fd = -1;
+    c->querybuf = sdsempty();
+    c->argc = 0;
+    c->argv = NULL;
+    c->flags = 0;
+    /* We set the fake client as a slave waiting for the synchronization
+     * so that Redis will not try to send replies to this client. */
+    c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
+    c->reply = listCreate();
+    listSetFreeMethod(c->reply,decrRefCount);
+    listSetDupMethod(c->reply,dupClientReplyValue);
+    return c;
+}
+
+static void freeFakeClient(struct redisClient *c) {
+    sdsfree(c->querybuf);
+    listRelease(c->reply);
+    zfree(c);
+}
+
+/* Replay the append log file. On error REDIS_OK is returned. On non fatal
+ * error (the append only file is zero-length) REDIS_ERR is returned. On
+ * fatal error an error message is logged and the program exists. */
+int loadAppendOnlyFile(char *filename) {
+    struct redisClient *fakeClient;
+    FILE *fp = fopen(filename,"r");
+    struct redis_stat sb;
+
+    if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0)
+        return REDIS_ERR;
+
+    if (fp == NULL) {
+        redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
+        exit(1);
+    }
+
+    fakeClient = createFakeClient();
+    while(1) {
+        int argc, j;
+        unsigned long len;
+        robj **argv;
+        char buf[128];
+        sds argsds;
+        struct redisCommand *cmd;
+
+        if (fgets(buf,sizeof(buf),fp) == NULL) {
+            if (feof(fp))
+                break;
+            else
+                goto readerr;
+        }
+        if (buf[0] != '*') goto fmterr;
+        argc = atoi(buf+1);
+        argv = zmalloc(sizeof(robj*)*argc);
+        for (j = 0; j < argc; j++) {
+            if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
+            if (buf[0] != '$') goto fmterr;
+            len = strtol(buf+1,NULL,10);
+            argsds = sdsnewlen(NULL,len);
+            if (fread(argsds,len,1,fp) == 0) goto fmterr;
+            argv[j] = createObject(REDIS_STRING,argsds);
+            if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
+        }
+
+        /* Command lookup */
+        cmd = lookupCommand(argv[0]->ptr);
+        if (!cmd) {
+            redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
+            exit(1);
+        }
+        /* Try object sharing and encoding */
+        if (server.shareobjects) {
+            int j;
+            for(j = 1; j < argc; j++)
+                argv[j] = tryObjectSharing(argv[j]);
+        }
+        if (cmd->flags & REDIS_CMD_BULK)
+            tryObjectEncoding(argv[argc-1]);
+        /* Run the command in the context of a fake client */
+        fakeClient->argc = argc;
+        fakeClient->argv = argv;
+        cmd->proc(fakeClient);
+        /* Discard the reply objects list from the fake client */
+        while(listLength(fakeClient->reply))
+            listDelNode(fakeClient->reply,listFirst(fakeClient->reply));
+        /* Clean up, ready for the next command */
+        for (j = 0; j < argc; j++) decrRefCount(argv[j]);
+        zfree(argv);
+    }
+    fclose(fp);
+    freeFakeClient(fakeClient);
+    return REDIS_OK;
+
+readerr:
+    if (feof(fp)) {
+        redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
+    } else {
+        redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
+    }
+    exit(1);
+fmterr:
+    redisLog(REDIS_WARNING,"Bad file format reading the append only file");
+    exit(1);
+}
+
 /* ================================= Debugging ============================== */
 
 static void debugCommand(redisClient *c) {
 /* ================================= Debugging ============================== */
 
 static void debugCommand(redisClient *c) {
@@ -5306,149 +5761,91 @@ static void debugCommand(redisClient *c) {
     }
 }
 
     }
 }
 
-#ifdef HAVE_BACKTRACE
-static struct redisFunctionSym symsTable[] = {
-{"compareStringObjects", (unsigned long)compareStringObjects},
-{"isStringRepresentableAsLong", (unsigned long)isStringRepresentableAsLong},
-{"dictEncObjKeyCompare", (unsigned long)dictEncObjKeyCompare},
-{"dictEncObjHash", (unsigned long)dictEncObjHash},
-{"incrDecrCommand", (unsigned long)incrDecrCommand},
-{"freeStringObject", (unsigned long)freeStringObject},
-{"freeListObject", (unsigned long)freeListObject},
-{"freeSetObject", (unsigned long)freeSetObject},
-{"decrRefCount", (unsigned long)decrRefCount},
-{"createObject", (unsigned long)createObject},
-{"freeClient", (unsigned long)freeClient},
-{"rdbLoad", (unsigned long)rdbLoad},
-{"rdbSaveStringObject", (unsigned long)rdbSaveStringObject},
-{"rdbSaveStringObjectRaw", (unsigned long)rdbSaveStringObjectRaw},
-{"addReply", (unsigned long)addReply},
-{"addReplySds", (unsigned long)addReplySds},
-{"incrRefCount", (unsigned long)incrRefCount},
-{"rdbSaveBackground", (unsigned long)rdbSaveBackground},
-{"createStringObject", (unsigned long)createStringObject},
-{"replicationFeedSlaves", (unsigned long)replicationFeedSlaves},
-{"syncWithMaster", (unsigned long)syncWithMaster},
-{"tryObjectSharing", (unsigned long)tryObjectSharing},
-{"tryObjectEncoding", (unsigned long)tryObjectEncoding},
-{"getDecodedObject", (unsigned long)getDecodedObject},
-{"removeExpire", (unsigned long)removeExpire},
-{"expireIfNeeded", (unsigned long)expireIfNeeded},
-{"deleteIfVolatile", (unsigned long)deleteIfVolatile},
-{"deleteKey", (unsigned long)deleteKey},
-{"getExpire", (unsigned long)getExpire},
-{"setExpire", (unsigned long)setExpire},
-{"updateSlavesWaitingBgsave", (unsigned long)updateSlavesWaitingBgsave},
-{"freeMemoryIfNeeded", (unsigned long)freeMemoryIfNeeded},
-{"authCommand", (unsigned long)authCommand},
-{"pingCommand", (unsigned long)pingCommand},
-{"echoCommand", (unsigned long)echoCommand},
-{"setCommand", (unsigned long)setCommand},
-{"setnxCommand", (unsigned long)setnxCommand},
-{"getCommand", (unsigned long)getCommand},
-{"delCommand", (unsigned long)delCommand},
-{"existsCommand", (unsigned long)existsCommand},
-{"incrCommand", (unsigned long)incrCommand},
-{"decrCommand", (unsigned long)decrCommand},
-{"incrbyCommand", (unsigned long)incrbyCommand},
-{"decrbyCommand", (unsigned long)decrbyCommand},
-{"selectCommand", (unsigned long)selectCommand},
-{"randomkeyCommand", (unsigned long)randomkeyCommand},
-{"keysCommand", (unsigned long)keysCommand},
-{"dbsizeCommand", (unsigned long)dbsizeCommand},
-{"lastsaveCommand", (unsigned long)lastsaveCommand},
-{"saveCommand", (unsigned long)saveCommand},
-{"bgsaveCommand", (unsigned long)bgsaveCommand},
-{"shutdownCommand", (unsigned long)shutdownCommand},
-{"moveCommand", (unsigned long)moveCommand},
-{"renameCommand", (unsigned long)renameCommand},
-{"renamenxCommand", (unsigned long)renamenxCommand},
-{"lpushCommand", (unsigned long)lpushCommand},
-{"rpushCommand", (unsigned long)rpushCommand},
-{"lpopCommand", (unsigned long)lpopCommand},
-{"rpopCommand", (unsigned long)rpopCommand},
-{"llenCommand", (unsigned long)llenCommand},
-{"lindexCommand", (unsigned long)lindexCommand},
-{"lrangeCommand", (unsigned long)lrangeCommand},
-{"ltrimCommand", (unsigned long)ltrimCommand},
-{"typeCommand", (unsigned long)typeCommand},
-{"lsetCommand", (unsigned long)lsetCommand},
-{"saddCommand", (unsigned long)saddCommand},
-{"sremCommand", (unsigned long)sremCommand},
-{"smoveCommand", (unsigned long)smoveCommand},
-{"sismemberCommand", (unsigned long)sismemberCommand},
-{"scardCommand", (unsigned long)scardCommand},
-{"spopCommand", (unsigned long)spopCommand},
-{"srandmemberCommand", (unsigned long)srandmemberCommand},
-{"sinterCommand", (unsigned long)sinterCommand},
-{"sinterstoreCommand", (unsigned long)sinterstoreCommand},
-{"sunionCommand", (unsigned long)sunionCommand},
-{"sunionstoreCommand", (unsigned long)sunionstoreCommand},
-{"sdiffCommand", (unsigned long)sdiffCommand},
-{"sdiffstoreCommand", (unsigned long)sdiffstoreCommand},
-{"syncCommand", (unsigned long)syncCommand},
-{"flushdbCommand", (unsigned long)flushdbCommand},
-{"flushallCommand", (unsigned long)flushallCommand},
-{"sortCommand", (unsigned long)sortCommand},
-{"lremCommand", (unsigned long)lremCommand},
-{"infoCommand", (unsigned long)infoCommand},
-{"mgetCommand", (unsigned long)mgetCommand},
-{"monitorCommand", (unsigned long)monitorCommand},
-{"expireCommand", (unsigned long)expireCommand},
-{"expireatCommand", (unsigned long)expireatCommand},
-{"getsetCommand", (unsigned long)getsetCommand},
-{"ttlCommand", (unsigned long)ttlCommand},
-{"slaveofCommand", (unsigned long)slaveofCommand},
-{"debugCommand", (unsigned long)debugCommand},
-{"processCommand", (unsigned long)processCommand},
-{"setupSigSegvAction", (unsigned long)setupSigSegvAction},
-{"readQueryFromClient", (unsigned long)readQueryFromClient},
-{"rdbRemoveTempFile", (unsigned long)rdbRemoveTempFile},
-{"msetGenericCommand", (unsigned long)msetGenericCommand},
-{"msetCommand", (unsigned long)msetCommand},
-{"msetnxCommand", (unsigned long)msetnxCommand},
-{"zslCreateNode", (unsigned long)zslCreateNode},
-{"zslCreate", (unsigned long)zslCreate},
-{"zslFreeNode",(unsigned long)zslFreeNode},
-{"zslFree",(unsigned long)zslFree},
-{"zslRandomLevel",(unsigned long)zslRandomLevel},
-{"zslInsert",(unsigned long)zslInsert},
-{"zslDelete",(unsigned long)zslDelete},
-{"createZsetObject",(unsigned long)createZsetObject},
-{"zaddCommand",(unsigned long)zaddCommand},
-{"zrangeGenericCommand",(unsigned long)zrangeGenericCommand},
-{"zrangeCommand",(unsigned long)zrangeCommand},
-{"zrevrangeCommand",(unsigned long)zrevrangeCommand},
-{"zremCommand",(unsigned long)zremCommand},
-{"rdbSaveDoubleValue",(unsigned long)rdbSaveDoubleValue},
-{"rdbLoadDoubleValue",(unsigned long)rdbLoadDoubleValue},
-{NULL,0}
-};
+/* =================================== Main! ================================ */
 
 
-/* This function try to convert a pointer into a function name. It's used in
- * oreder to provide a backtrace under segmentation fault that's able to
- * display functions declared as static (otherwise the backtrace is useless). */
-static char *findFuncName(void *pointer, unsigned long *offset){
-    int i, ret = -1;
-    unsigned long off, minoff = 0;
+#ifdef __linux__
+int linuxOvercommitMemoryValue(void) {
+    FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");
+    char buf[64];
 
 
-    /* Try to match against the Symbol with the smallest offset */
-    for (i=0; symsTable[i].pointer; i++) {
-        unsigned long lp = (unsigned long) pointer;
+    if (!fp) return -1;
+    if (fgets(buf,64,fp) == NULL) {
+        fclose(fp);
+        return -1;
+    }
+    fclose(fp);
 
 
-        if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
-            off=lp-symsTable[i].pointer;
-            if (ret < 0 || off < minoff) {
-                minoff=off;
-                ret=i;
-            }
-        }
+    return atoi(buf);
+}
+
+void linuxOvercommitMemoryWarning(void) {
+    if (linuxOvercommitMemoryValue() == 0) {
+        redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
+    }
+}
+#endif /* __linux__ */
+
+static void daemonize(void) {
+    int fd;
+    FILE *fp;
+
+    if (fork() != 0) exit(0); /* parent exits */
+    setsid(); /* create a new session */
+
+    /* Every output goes to /dev/null. If Redis is daemonized but
+     * the 'logfile' is set to 'stdout' in the configuration file
+     * it will not log at all. */
+    if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
+        dup2(fd, STDIN_FILENO);
+        dup2(fd, STDOUT_FILENO);
+        dup2(fd, STDERR_FILENO);
+        if (fd > STDERR_FILENO) close(fd);
+    }
+    /* Try to write the pid file */
+    fp = fopen(server.pidfile,"w");
+    if (fp) {
+        fprintf(fp,"%d\n",getpid());
+        fclose(fp);
     }
     }
-    if (ret == -1) return NULL;
-    *offset = minoff;
-    return symsTable[ret].name;
 }
 
 }
 
+int main(int argc, char **argv) {
+    initServerConfig();
+    if (argc == 2) {
+        resetServerSaveParams();
+        loadServerConfig(argv[1]);
+    } else if (argc > 2) {
+        fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
+        exit(1);
+    } else {
+        redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
+    }
+    initServer();
+    if (server.daemonize) daemonize();
+    redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
+#ifdef __linux__
+    linuxOvercommitMemoryWarning();
+#endif
+    if (server.appendonly) {
+        if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK)
+            redisLog(REDIS_NOTICE,"DB loaded from append only file");
+    } else {
+        if (rdbLoad(server.dbfilename) == REDIS_OK)
+            redisLog(REDIS_NOTICE,"DB loaded from disk");
+    }
+    if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
+        acceptHandler, NULL) == AE_ERR) oom("creating file event");
+    redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
+    aeMain(server.el);
+    aeDeleteEventLoop(server.el);
+    return 0;
+}
+
+/* ============================= Backtrace support ========================= */
+
+#ifdef HAVE_BACKTRACE
+static char *findFuncName(void *pointer, unsigned long *offset);
+
 static void *getMcontextEip(ucontext_t *uc) {
 #if defined(__FreeBSD__)
     return (void*) uc->uc_mcontext.mc_eip;
 static void *getMcontextEip(ucontext_t *uc) {
 #if defined(__FreeBSD__)
     return (void*) uc->uc_mcontext.mc_eip;
@@ -5543,82 +5940,39 @@ static void setupSigSegvAction(void) {
     sigaction (SIGBUS, &act, NULL);
     return;
 }
     sigaction (SIGBUS, &act, NULL);
     return;
 }
-#else /* HAVE_BACKTRACE */
-static void setupSigSegvAction(void) {
-}
-#endif /* HAVE_BACKTRACE */
 
 
-/* =================================== Main! ================================ */
+#include "staticsymbols.h"
+/* This function try to convert a pointer into a function name. It's used in
+ * oreder to provide a backtrace under segmentation fault that's able to
+ * display functions declared as static (otherwise the backtrace is useless). */
+static char *findFuncName(void *pointer, unsigned long *offset){
+    int i, ret = -1;
+    unsigned long off, minoff = 0;
 
 
-#ifdef __linux__
-int linuxOvercommitMemoryValue(void) {
-    FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");
-    char buf[64];
+    /* Try to match against the Symbol with the smallest offset */
+    for (i=0; symsTable[i].pointer; i++) {
+        unsigned long lp = (unsigned long) pointer;
 
 
-    if (!fp) return -1;
-    if (fgets(buf,64,fp) == NULL) {
-        fclose(fp);
-        return -1;
+        if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
+            off=lp-symsTable[i].pointer;
+            if (ret < 0 || off < minoff) {
+                minoff=off;
+                ret=i;
+            }
+        }
     }
     }
-    fclose(fp);
-
-    return atoi(buf);
+    if (ret == -1) return NULL;
+    *offset = minoff;
+    return symsTable[ret].name;
 }
 }
-
-void linuxOvercommitMemoryWarning(void) {
-    if (linuxOvercommitMemoryValue() == 0) {
-        redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
-    }
+#else /* HAVE_BACKTRACE */
+static void setupSigSegvAction(void) {
 }
 }
-#endif /* __linux__ */
+#endif /* HAVE_BACKTRACE */
 
 
-static void daemonize(void) {
-    int fd;
-    FILE *fp;
 
 
-    if (fork() != 0) exit(0); /* parent exits */
-    setsid(); /* create a new session */
 
 
-    /* Every output goes to /dev/null. If Redis is daemonized but
-     * the 'logfile' is set to 'stdout' in the configuration file
-     * it will not log at all. */
-    if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
-        dup2(fd, STDIN_FILENO);
-        dup2(fd, STDOUT_FILENO);
-        dup2(fd, STDERR_FILENO);
-        if (fd > STDERR_FILENO) close(fd);
-    }
-    /* Try to write the pid file */
-    fp = fopen(server.pidfile,"w");
-    if (fp) {
-        fprintf(fp,"%d\n",getpid());
-        fclose(fp);
-    }
-}
+/* The End */
+
+
 
 
-int main(int argc, char **argv) {
-    initServerConfig();
-    if (argc == 2) {
-        ResetServerSaveParams();
-        loadServerConfig(argv[1]);
-    } else if (argc > 2) {
-        fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
-        exit(1);
-    } else {
-        redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
-    }
-    initServer();
-    if (server.daemonize) daemonize();
-    redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
-#ifdef __linux__
-    linuxOvercommitMemoryWarning();
-#endif
-    if (rdbLoad(server.dbfilename) == REDIS_OK)
-        redisLog(REDIS_NOTICE,"DB loaded from disk");
-    if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
-        acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
-    redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
-    aeMain(server.el);
-    aeDeleteEventLoop(server.el);
-    return 0;
-}