]> git.saurik.com Git - redis.git/blobdiff - redis.c
incremented version number to 1.001, AKA Redis edge is no longer stable...
[redis.git] / redis.c
diff --git a/redis.c b/redis.c
index 2caf06c34ee3e1fede62b919a272ec6721b0ccdf..5034a802f7ad7049477230341d9b0bae632e6132 100644 (file)
--- a/redis.c
+++ b/redis.c
  * POSSIBILITY OF SUCH DAMAGE.
  */
 
-#define REDIS_VERSION "0.101"
+#define REDIS_VERSION "1.001"
 
 #include "fmacros.h"
+#include "config.h"
 
 #include <stdio.h>
 #include <stdlib.h>
 #include <unistd.h>
 #define __USE_POSIX199309
 #include <signal.h>
+
+#ifdef HAVE_BACKTRACE
 #include <execinfo.h>
 #include <ucontext.h>
+#endif /* HAVE_BACKTRACE */
+
 #include <sys/wait.h>
 #include <errno.h>
 #include <assert.h>
@@ -52,7 +57,6 @@
 #include <sys/time.h>
 #include <sys/resource.h>
 #include <limits.h>
-#include <execinfo.h>
 
 #include "redis.h"
 #include "ae.h"     /* Event driven programming library */
@@ -64,8 +68,6 @@
 #include "lzf.h"    /* LZF compression library */
 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
 
-#include "config.h"
-
 /* Error codes */
 #define REDIS_OK                0
 #define REDIS_ERR               -1
 #define REDIS_MAX_SYNC_TIME     60      /* Slave can't take more to sync */
 #define REDIS_EXPIRELOOKUPS_PER_CRON    100 /* try to expire 100 keys/second */
 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
+#define REDIS_REQUEST_MAX_SIZE  (1024*1024*256) /* max bytes in inline command */
 
 /* Hash table parameters */
 #define REDIS_HT_MINFILL        10      /* Minimal hash table fill 10% */
-#define REDIS_HT_MINSLOTS       16384   /* Never resize the HT under this */
 
 /* Command flags */
 #define REDIS_CMD_BULK          1       /* Bulk write command */
 #define REDIS_SET 2
 #define REDIS_HASH 3
 
+/* Objects encoding */
+#define REDIS_ENCODING_RAW 0    /* Raw representation */
+#define REDIS_ENCODING_INT 1    /* Encoded as integer */
+
 /* Object types only used for dumping to disk */
 #define REDIS_EXPIRETIME 253
 #define REDIS_SELECTDB 254
 /* A redis object, that is a type able to hold a string / list / set */
 typedef struct redisObject {
     void *ptr;
-    int type;
+    unsigned char type;
+    unsigned char encoding;
+    unsigned char notused[2];
     int refcount;
 } robj;
 
@@ -321,16 +329,19 @@ static robj *createStringObject(char *ptr, size_t len);
 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
 static int syncWithMaster(void);
 static robj *tryObjectSharing(robj *o);
+static int tryObjectEncoding(robj *o);
+static robj *getDecodedObject(const robj *o);
 static int removeExpire(redisDb *db, robj *key);
 static int expireIfNeeded(redisDb *db, robj *key);
 static int deleteIfVolatile(redisDb *db, robj *key);
 static int deleteKey(redisDb *db, robj *key);
 static time_t getExpire(redisDb *db, robj *key);
 static int setExpire(redisDb *db, robj *key, time_t when);
-static void updateSalvesWaitingBgsave(int bgsaveerr);
+static void updateSlavesWaitingBgsave(int bgsaveerr);
 static void freeMemoryIfNeeded(void);
 static int processCommand(redisClient *c);
 static void setupSigSegvAction(void);
+static void rdbRemoveTempFile(pid_t childpid);
 
 static void authCommand(redisClient *c);
 static void pingCommand(redisClient *c);
@@ -370,6 +381,7 @@ static void sremCommand(redisClient *c);
 static void smoveCommand(redisClient *c);
 static void sismemberCommand(redisClient *c);
 static void scardCommand(redisClient *c);
+static void spopCommand(redisClient *c);
 static void sinterCommand(redisClient *c);
 static void sinterstoreCommand(redisClient *c);
 static void sunionCommand(redisClient *c);
@@ -417,6 +429,7 @@ static struct redisCommand cmdTable[] = {
     {"smove",smoveCommand,4,REDIS_CMD_BULK},
     {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
     {"scard",scardCommand,2,REDIS_CMD_INLINE},
+    {"spop",spopCommand,2,REDIS_CMD_INLINE},
     {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
     {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
     {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
@@ -629,32 +642,68 @@ static void dictRedisObjectDestructor(void *privdata, void *val)
     decrRefCount(val);
 }
 
-static int dictSdsKeyCompare(void *privdata, const void *key1,
+static int dictObjKeyCompare(void *privdata, const void *key1,
         const void *key2)
 {
     const robj *o1 = key1, *o2 = key2;
     return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
 }
 
-static unsigned int dictSdsHash(const void *key) {
+static unsigned int dictObjHash(const void *key) {
     const robj *o = key;
     return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
 }
 
+static int dictEncObjKeyCompare(void *privdata, const void *key1,
+        const void *key2)
+{
+    const robj *o1 = key1, *o2 = key2;
+
+    if (o1->encoding == REDIS_ENCODING_RAW &&
+        o2->encoding == REDIS_ENCODING_RAW)
+        return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
+    else {
+        robj *dec1, *dec2;
+        int cmp;
+
+        dec1 = o1->encoding != REDIS_ENCODING_RAW ?
+            getDecodedObject(o1) : (robj*)o1;
+        dec2 = o2->encoding != REDIS_ENCODING_RAW ?
+            getDecodedObject(o2) : (robj*)o2;
+        cmp = sdsDictKeyCompare(privdata,dec1->ptr,dec2->ptr);
+        if (dec1 != o1) decrRefCount(dec1);
+        if (dec2 != o2) decrRefCount(dec2);
+        return cmp;
+    }
+}
+
+static unsigned int dictEncObjHash(const void *key) {
+    const robj *o = key;
+
+    if (o->encoding == REDIS_ENCODING_RAW)
+        return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
+    else {
+        robj *dec = getDecodedObject(o);
+        unsigned int hash = dictGenHashFunction(dec->ptr, sdslen((sds)dec->ptr));
+        decrRefCount(dec);
+        return hash;
+    }
+}
+
 static dictType setDictType = {
-    dictSdsHash,               /* hash function */
+    dictEncObjHash,            /* hash function */
     NULL,                      /* key dup */
     NULL,                      /* val dup */
-    dictSdsKeyCompare,         /* key compare */
+    dictEncObjKeyCompare,      /* key compare */
     dictRedisObjectDestructor, /* key destructor */
     NULL                       /* val destructor */
 };
 
 static dictType hashDictType = {
-    dictSdsHash,                /* hash function */
+    dictObjHash,                /* hash function */
     NULL,                       /* key dup */
     NULL,                       /* val dup */
-    dictSdsKeyCompare,          /* key compare */
+    dictObjKeyCompare,          /* key compare */
     dictRedisObjectDestructor,  /* key destructor */
     dictRedisObjectDestructor   /* val destructor */
 };
@@ -691,22 +740,28 @@ static void closeTimedoutClients(void) {
     }
 }
 
+static int htNeedsResize(dict *dict) {
+    long long size, used;
+
+    size = dictSlots(dict);
+    used = dictSize(dict);
+    return (size && used && size > DICT_HT_INITIAL_SIZE &&
+            (used*100/size < REDIS_HT_MINFILL));
+}
+
 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
  * we resize the hash table to save memory */
 static void tryResizeHashTables(void) {
     int j;
 
     for (j = 0; j < server.dbnum; j++) {
-        long long size, used;
-
-        size = dictSlots(server.db[j].dict);
-        used = dictSize(server.db[j].dict);
-        if (size && used && size > REDIS_HT_MINSLOTS &&
-            (used*100/size < REDIS_HT_MINFILL)) {
-            redisLog(REDIS_NOTICE,"The hash table %d is too sparse, resize it...",j);
+        if (htNeedsResize(server.db[j].dict)) {
+            redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
             dictResize(server.db[j].dict);
-            redisLog(REDIS_NOTICE,"Hash table %d resized.",j);
+            redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
         }
+        if (htNeedsResize(server.db[j].expires))
+            dictResize(server.db[j].expires);
     }
 }
 
@@ -726,8 +781,8 @@ static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientD
         size = dictSlots(server.db[j].dict);
         used = dictSize(server.db[j].dict);
         vkeys = dictSize(server.db[j].expires);
-        if (!(loops % 5) && used > 0) {
-            redisLog(REDIS_DEBUG,"DB %d: %d keys (%d volatile) in %d slots HT.",j,used,vkeys,size);
+        if (!(loops % 5) && (used || vkeys)) {
+            redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
             /* dictPrintStats(server.dict); */
         }
     }
@@ -742,7 +797,7 @@ static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientD
 
     /* Show information about connected clients */
     if (!(loops % 5)) {
-        redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use",
+        redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
             listLength(server.clients)-listLength(server.slaves),
             listLength(server.slaves),
             server.usedmemory,
@@ -756,7 +811,6 @@ static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientD
     /* Check if a background saving in progress terminated */
     if (server.bgsaveinprogress) {
         int statloc;
-        /* XXX: TODO handle the case of the saving child killed */
         if (wait4(-1,&statloc,WNOHANG,NULL)) {
             int exitcode = WEXITSTATUS(statloc);
             int bysignal = WIFSIGNALED(statloc);
@@ -771,10 +825,11 @@ static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientD
             } else {
                 redisLog(REDIS_WARNING,
                     "Background saving terminated by signal");
+                rdbRemoveTempFile(server.bgsavechildpid);
             }
             server.bgsaveinprogress = 0;
             server.bgsavechildpid = -1;
-            updateSalvesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
+            updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
         }
     } else {
         /* If there is not a background saving in progress check if
@@ -891,6 +946,7 @@ static void initServerConfig() {
     server.dbfilename = "dump.rdb";
     server.requirepass = NULL;
     server.shareobjects = 0;
+    server.sharingpoolsize = 1024;
     server.maxclients = 0;
     server.maxmemory = 0;
     ResetServerSaveParams();
@@ -921,7 +977,6 @@ static void initServer() {
     server.el = aeCreateEventLoop();
     server.db = zmalloc(sizeof(redisDb)*server.dbnum);
     server.sharingpool = dictCreate(&setDictType,NULL);
-    server.sharingpoolsize = 1024;
     if (!server.db || !server.clients || !server.slaves || !server.monitors || !server.el || !server.objfreelist)
         oom("server initialization"); /* Fatal OOM */
     server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
@@ -968,15 +1023,20 @@ static int yesnotoi(char *s) {
 /* I agree, this is a very rudimental way to load a configuration...
    will improve later if the config gets more complex */
 static void loadServerConfig(char *filename) {
-    FILE *fp = fopen(filename,"r");
+    FILE *fp;
     char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
     int linenum = 0;
     sds line = NULL;
-    
-    if (!fp) {
-        redisLog(REDIS_WARNING,"Fatal error, can't open config file");
-        exit(1);
+
+    if (filename[0] == '-' && filename[1] == '\0')
+        fp = stdin;
+    else {
+        if ((fp = fopen(filename,"r")) == NULL) {
+            redisLog(REDIS_WARNING,"Fatal error, can't open config file");
+            exit(1);
+        }
     }
+
     while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
         sds *argv;
         int argc, j;
@@ -1030,7 +1090,7 @@ static void loadServerConfig(char *filename) {
                 goto loaderr;
             }
         } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
-            FILE *fp;
+            FILE *logfp;
 
             server.logfile = zstrdup(argv[1]);
             if (!strcasecmp(server.logfile,"stdout")) {
@@ -1040,13 +1100,13 @@ static void loadServerConfig(char *filename) {
             if (server.logfile) {
                 /* Test if we are able to open the file. The server will not
                  * be able to abort just for this problem later... */
-                fp = fopen(server.logfile,"a");
-                if (fp == NULL) {
+                logfp = fopen(server.logfile,"a");
+                if (logfp == NULL) {
                     err = sdscatprintf(sdsempty(),
                         "Can't open the log file: %s", strerror(errno));
                     goto loaderr;
                 }
-                fclose(fp);
+                fclose(logfp);
             }
         } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
             server.dbnum = atoi(argv[1]);
@@ -1092,7 +1152,7 @@ static void loadServerConfig(char *filename) {
         zfree(argv);
         sdsfree(line);
     }
-    fclose(fp);
+    if (fp != stdin) fclose(fp);
     return;
 
 loaderr:
@@ -1303,6 +1363,10 @@ static int processCommand(redisClient *c) {
         for(j = 1; j < c->argc; j++)
             c->argv[j] = tryObjectSharing(c->argv[j]);
     }
+    /* Let's try to encode the bulk object to save space. */
+    if (cmd->flags & REDIS_CMD_BULK)
+        tryObjectEncoding(c->argv[c->argc-1]);
+
     /* Check if the user is authenticated */
     if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
         addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
@@ -1430,6 +1494,7 @@ again:
         /* Read the first line of the query */
         char *p = strchr(c->querybuf,'\n');
         size_t querylen;
+
         if (p) {
             sds query, *argv;
             int argc, j;
@@ -1471,9 +1536,9 @@ again:
             /* Execute the command. If the client is still valid
              * after processCommand() return and there is something
              * on the query buffer try to process the next command. */
-            if (processCommand(c) && sdslen(c->querybuf)) goto again;
+            if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again;
             return;
-        } else if (sdslen(c->querybuf) >= 1024*32) {
+        } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
             redisLog(REDIS_DEBUG, "Client protocol error");
             freeClient(c);
             return;
@@ -1543,8 +1608,12 @@ static void addReply(redisClient *c, robj *obj) {
          c->replstate == REDIS_REPL_ONLINE) &&
         aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
         sendReplyToClient, c, NULL) == AE_ERR) return;
+    if (obj->encoding != REDIS_ENCODING_RAW) {
+        obj = getDecodedObject(obj);
+    } else {
+        incrRefCount(obj);
+    }
     if (!listAddNodeTail(c->reply,obj)) oom("listAddNodeTail");
-    incrRefCount(obj);
 }
 
 static void addReplySds(redisClient *c, sds s) {
@@ -1553,6 +1622,26 @@ static void addReplySds(redisClient *c, sds s) {
     decrRefCount(o);
 }
 
+static void addReplyBulkLen(redisClient *c, robj *obj) {
+    size_t len;
+
+    if (obj->encoding == REDIS_ENCODING_RAW) {
+        len = sdslen(obj->ptr);
+    } else {
+        long n = (long)obj->ptr;
+
+        len = 1;
+        if (n < 0) {
+            len++;
+            n = -n;
+        }
+        while((n = n/10) != 0) {
+            len++;
+        }
+    }
+    addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",len));
+}
+
 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
     int cport, cfd;
     char cip[128];
@@ -1601,6 +1690,7 @@ static robj *createObject(int type, void *ptr) {
     }
     if (!o) oom("createObject");
     o->type = type;
+    o->encoding = REDIS_ENCODING_RAW;
     o->ptr = ptr;
     o->refcount = 1;
     return o;
@@ -1625,7 +1715,9 @@ static robj *createSetObject(void) {
 }
 
 static void freeStringObject(robj *o) {
-    sdsfree(o->ptr);
+    if (o->encoding == REDIS_ENCODING_RAW) {
+        sdsfree(o->ptr);
+    }
 }
 
 static void freeListObject(robj *o) {
@@ -1669,6 +1761,36 @@ static void decrRefCount(void *obj) {
     }
 }
 
+static robj *lookupKey(redisDb *db, robj *key) {
+    dictEntry *de = dictFind(db->dict,key);
+    return de ? dictGetEntryVal(de) : NULL;
+}
+
+static robj *lookupKeyRead(redisDb *db, robj *key) {
+    expireIfNeeded(db,key);
+    return lookupKey(db,key);
+}
+
+static robj *lookupKeyWrite(redisDb *db, robj *key) {
+    deleteIfVolatile(db,key);
+    return lookupKey(db,key);
+}
+
+static int deleteKey(redisDb *db, robj *key) {
+    int retval;
+
+    /* We need to protect key from destruction: after the first dictDelete()
+     * it may happen that 'key' is no longer valid if we don't increment
+     * it's count. This may happen when we get the object reference directly
+     * from the hash table with dictRandomKey() or dict iterators */
+    incrRefCount(key);
+    if (dictSize(db->expires)) dictDelete(db->expires,key);
+    retval = dictDelete(db->dict,key);
+    decrRefCount(key);
+
+    return retval == DICT_OK;
+}
+
 /* Try to share an object against the shared objects pool */
 static robj *tryObjectSharing(robj *o) {
     struct dictEntry *de;
@@ -1714,34 +1836,54 @@ static robj *tryObjectSharing(robj *o) {
     }
 }
 
-static robj *lookupKey(redisDb *db, robj *key) {
-    dictEntry *de = dictFind(db->dict,key);
-    return de ? dictGetEntryVal(de) : NULL;
-}
+/* Try to encode a string object in order to save space */
+static int tryObjectEncoding(robj *o) {
+    long value;
+    char *endptr, buf[32];
+    sds s = o->ptr;
 
-static robj *lookupKeyRead(redisDb *db, robj *key) {
-    expireIfNeeded(db,key);
-    return lookupKey(db,key);
-}
+    if (o->encoding != REDIS_ENCODING_RAW)
+        return REDIS_ERR; /* Already encoded */
 
-static robj *lookupKeyWrite(redisDb *db, robj *key) {
-    deleteIfVolatile(db,key);
-    return lookupKey(db,key);
-}
+    /* It's not save to encode shared objects: shared objects can be shared
+     * everywhere in the "object space" of Redis. Encoded objects can only
+     * appear as "values" (and not, for instance, as keys) */
+     if (o->refcount > 1) return REDIS_ERR;
 
-static int deleteKey(redisDb *db, robj *key) {
-    int retval;
+    /* Currently we try to encode only strings */
+    assert(o->type == REDIS_STRING);
 
-    /* We need to protect key from destruction: after the first dictDelete()
-     * it may happen that 'key' is no longer valid if we don't increment
-     * it's count. This may happen when we get the object reference directly
-     * from the hash table with dictRandomKey() or dict iterators */
-    incrRefCount(key);
-    if (dictSize(db->expires)) dictDelete(db->expires,key);
-    retval = dictDelete(db->dict,key);
-    decrRefCount(key);
+    /* Check if it's possible to encode this value as a long. We are assuming
+     * that sizeof(long) = sizeof(void) in all the supported archs. */
+    value = strtol(s, &endptr, 10);
+    if (endptr[0] != '\0') return REDIS_ERR;
+    snprintf(buf,32,"%ld",value);
 
-    return retval == DICT_OK;
+    /* If the number converted back into a string is not identical
+     * then it's not possible to encode the string as integer */
+    if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return REDIS_ERR;
+
+    /* Ok, this object can be encoded */
+    o->encoding = REDIS_ENCODING_INT;
+    sdsfree(o->ptr);
+    o->ptr = (void*) value;
+    return REDIS_OK;
+}
+
+/* Get a decoded version of an encoded object (returned as a new object) */
+static robj *getDecodedObject(const robj *o) {
+    robj *dec;
+    
+    assert(o->encoding != REDIS_ENCODING_RAW);
+    if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
+        char buf[32];
+
+        snprintf(buf,32,"%ld",(long)o->ptr);
+        dec = createStringObject(buf,strlen(buf));
+        return dec;
+    } else {
+        assert(1 != 1);
+    }
 }
 
 /*============================ DB saving/loading ============================ */
@@ -1848,10 +1990,12 @@ writeerr:
 
 /* Save a string objet as [len][data] on disk. If the object is a string
  * representation of an integer value we try to safe it in a special form */
-static int rdbSaveStringObject(FILE *fp, robj *obj) {
-    size_t len = sdslen(obj->ptr);
+static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
+    size_t len;
     int enclen;
 
+    len = sdslen(obj->ptr);
+
     /* Try integer encoding */
     if (len <= 11) {
         unsigned char buf[5];
@@ -1863,7 +2007,7 @@ static int rdbSaveStringObject(FILE *fp, robj *obj) {
 
     /* Try LZF compression - under 20 bytes it's unable to compress even
      * aaaaaaaaaaaaaaaaaa so skip it */
-    if (1 && len > 20) {
+    if (len > 20) {
         int retval;
 
         retval = rdbSaveLzfStringObject(fp,obj);
@@ -1878,6 +2022,21 @@ static int rdbSaveStringObject(FILE *fp, robj *obj) {
     return 0;
 }
 
+/* Like rdbSaveStringObjectRaw() but handle encoded objects */
+static int rdbSaveStringObject(FILE *fp, robj *obj) {
+    int retval;
+    robj *dec;
+
+    if (obj->encoding != REDIS_ENCODING_RAW) {
+        dec = getDecodedObject(obj);
+        retval = rdbSaveStringObjectRaw(fp,dec);
+        decrRefCount(dec);
+        return retval;
+    } else {
+        return rdbSaveStringObjectRaw(fp,obj);
+    }
+}
+
 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
 static int rdbSave(char *filename) {
     dictIterator *di = NULL;
@@ -1887,7 +2046,7 @@ static int rdbSave(char *filename) {
     int j;
     time_t now = time(NULL);
 
-    snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
+    snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
     fp = fopen(tmpfile,"w");
     if (!fp) {
         redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
@@ -2014,6 +2173,13 @@ static int rdbSaveBackground(char *filename) {
     return REDIS_OK; /* unreached */
 }
 
+static void rdbRemoveTempFile(pid_t childpid) {
+    char tmpfile[256];
+
+    snprintf(tmpfile,256,"temp-%d.rdb", (int) childpid);
+    unlink(tmpfile);
+}
+
 static int rdbLoadType(FILE *fp) {
     unsigned char type;
     if (fread(&type,1,1,fp) == 0) return -1;
@@ -2188,6 +2354,7 @@ static int rdbLoad(char *filename) {
         if (type == REDIS_STRING) {
             /* Read string value */
             if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
+            tryObjectEncoding(o);
         } else if (type == REDIS_LIST || type == REDIS_SET) {
             /* Read list/set value */
             uint32_t listlen;
@@ -2200,6 +2367,7 @@ static int rdbLoad(char *filename) {
                 robj *ele;
 
                 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
+                tryObjectEncoding(ele);
                 if (type == REDIS_LIST) {
                     if (!listAddNodeTail((list*)o->ptr,ele))
                         oom("listAddNodeTail");
@@ -2253,8 +2421,7 @@ static void pingCommand(redisClient *c) {
 }
 
 static void echoCommand(redisClient *c) {
-    addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
-        (int)sdslen(c->argv[1]->ptr)));
+    addReplyBulkLen(c,c->argv[1]);
     addReply(c,c->argv[1]);
     addReply(c,shared.crlf);
 }
@@ -2299,7 +2466,7 @@ static void getCommand(redisClient *c) {
         if (o->type != REDIS_STRING) {
             addReply(c,shared.wrongtypeerr);
         } else {
-            addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o->ptr)));
+            addReplyBulkLen(c,o);
             addReply(c,o);
             addReply(c,shared.crlf);
         }
@@ -2330,7 +2497,7 @@ static void mgetCommand(redisClient *c) {
             if (o->type != REDIS_STRING) {
                 addReply(c,shared.nullbulk);
             } else {
-                addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o->ptr)));
+                addReplyBulkLen(c,o);
                 addReply(c,o);
                 addReply(c,shared.crlf);
             }
@@ -2352,12 +2519,18 @@ static void incrDecrCommand(redisClient *c, long long incr) {
         } else {
             char *eptr;
 
-            value = strtoll(o->ptr, &eptr, 10);
+            if (o->encoding == REDIS_ENCODING_RAW)
+                value = strtoll(o->ptr, &eptr, 10);
+            else if (o->encoding == REDIS_ENCODING_INT)
+                value = (long)o->ptr;
+            else
+                assert(1 != 1);
         }
     }
 
     value += incr;
     o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
+    tryObjectEncoding(o);
     retval = dictAdd(c->db->dict,c->argv[1],o);
     if (retval == DICT_ERR) {
         dictReplace(c->db->dict,c->argv[1],o);
@@ -2531,11 +2704,15 @@ static void bgsaveCommand(redisClient *c) {
 
 static void shutdownCommand(redisClient *c) {
     redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
+    /* Kill the saving child if there is a background saving in progress.
+       We want to avoid race conditions, for instance our saving child may
+       overwrite the synchronous saving did by SHUTDOWN. */
     if (server.bgsaveinprogress) {
         redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
-        signal(SIGCHLD, SIG_IGN);
         kill(server.bgsavechildpid,SIGKILL);
+        rdbRemoveTempFile(server.bgsavechildpid);
     }
+    /* SYNC SAVE */
     if (rdbSave(server.dbfilename) == REDIS_OK) {
         if (server.daemonize)
             unlink(server.pidfile);
@@ -2543,7 +2720,10 @@ static void shutdownCommand(redisClient *c) {
         redisLog(REDIS_WARNING,"Server exit now, bye bye...");
         exit(1);
     } else {
-        signal(SIGCHLD, SIG_DFL);
+        /* Ooops.. error saving! The best we can do is to continue operating.
+         * Note that if there was a background saving process, in the next
+         * cron() Redis will be notified that the background saving aborted,
+         * handling special stuff like slaves pending for synchronization... */
         redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit"); 
         addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
     }
@@ -2711,7 +2891,7 @@ static void lindexCommand(redisClient *c) {
                 addReply(c,shared.nullbulk);
             } else {
                 robj *ele = listNodeValue(ln);
-                addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
+                addReplyBulkLen(c,ele);
                 addReply(c,ele);
                 addReply(c,shared.crlf);
             }
@@ -2771,7 +2951,7 @@ static void popGenericCommand(redisClient *c, int where) {
                 addReply(c,shared.nullbulk);
             } else {
                 robj *ele = listNodeValue(ln);
-                addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
+                addReplyBulkLen(c,ele);
                 addReply(c,ele);
                 addReply(c,shared.crlf);
                 listDelNode(list,ln);
@@ -2827,7 +3007,7 @@ static void lrangeCommand(redisClient *c) {
             addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
             for (j = 0; j < rangelen; j++) {
                 ele = listNodeValue(ln);
-                addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
+                addReplyBulkLen(c,ele);
                 addReply(c,ele);
                 addReply(c,shared.crlf);
                 ln = ln->next;
@@ -2879,8 +3059,8 @@ static void ltrimCommand(redisClient *c) {
                 ln = listLast(list);
                 listDelNode(list,ln);
             }
-            addReply(c,shared.ok);
             server.dirty++;
+            addReply(c,shared.ok);
         }
     }
 }
@@ -2961,6 +3141,7 @@ static void sremCommand(redisClient *c) {
         }
         if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
             server.dirty++;
+            if (htNeedsResize(set->ptr)) dictResize(set->ptr);
             addReply(c,shared.cone);
         } else {
             addReply(c,shared.czero);
@@ -3040,6 +3221,34 @@ static void scardCommand(redisClient *c) {
     }
 }
 
+static void spopCommand(redisClient *c) {
+    robj *set;
+    dictEntry *de;
+
+    set = lookupKeyWrite(c->db,c->argv[1]);
+    if (set == NULL) {
+        addReply(c,shared.nullbulk);
+    } else {
+        if (set->type != REDIS_SET) {
+            addReply(c,shared.wrongtypeerr);
+            return;
+        }
+        de = dictGetRandomKey(set->ptr);
+        if (de == NULL) {
+            addReply(c,shared.nullbulk);
+        } else {
+            robj *ele = dictGetEntryKey(de);
+
+            addReplyBulkLen(c,ele);
+            addReply(c,ele);
+            addReply(c,shared.crlf);
+            dictDelete(set->ptr,ele);
+            if (htNeedsResize(set->ptr)) dictResize(set->ptr);
+            server.dirty++;
+        }
+    }
+}
+
 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
     dict **d1 = (void*) s1, **d2 = (void*) s2;
 
@@ -3111,7 +3320,7 @@ static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, r
             continue; /* at least one set does not contain the member */
         ele = dictGetEntryKey(de);
         if (!dstkey) {
-            addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(ele->ptr)));
+            addReplyBulkLen(c,ele);
             addReply(c,ele);
             addReply(c,shared.crlf);
             cardinality++;
@@ -3220,8 +3429,7 @@ static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnu
             robj *ele;
 
             ele = dictGetEntryKey(de);
-            addReplySds(c,sdscatprintf(sdsempty(),
-                    "$%d\r\n",sdslen(ele->ptr)));
+            addReplyBulkLen(c,ele);
             addReply(c,ele);
             addReply(c,shared.crlf);
         }
@@ -3297,6 +3505,12 @@ static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
         char buf[REDIS_SORTKEY_MAX+1];
     } keyname;
 
+    if (subst->encoding == REDIS_ENCODING_RAW)
+        incrRefCount(subst);
+    else {
+        subst = getDecodedObject(subst);
+    }
+
     spat = pattern->ptr;
     ssub = subst->ptr;
     if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
@@ -3316,6 +3530,8 @@ static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
     keyobj.type = REDIS_STRING;
     keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
 
+    decrRefCount(subst);
+
     /* printf("lookup '%s' => %p\n", keyname.buf,de); */
     return lookupKeyRead(db,&keyobj);
 }
@@ -3353,7 +3569,20 @@ static int sortCompare(const void *s1, const void *s2) {
             }
         } else {
             /* Compare elements directly */
-            cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
+            if (so1->obj->encoding == REDIS_ENCODING_RAW &&
+                so2->obj->encoding == REDIS_ENCODING_RAW) {
+                cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
+            } else {
+                robj *dec1, *dec2;
+
+                dec1 = so1->obj->encoding == REDIS_ENCODING_RAW ?
+                    so1->obj : getDecodedObject(so1->obj);
+                dec2 = so2->obj->encoding == REDIS_ENCODING_RAW ?
+                    so2->obj : getDecodedObject(so2->obj);
+                cmp = strcoll(dec1->ptr,dec2->ptr);
+                if (dec1 != so1->obj) decrRefCount(dec1);
+                if (dec2 != so2->obj) decrRefCount(dec2);
+            }
         }
     }
     return server.sort_desc ? -cmp : cmp;
@@ -3483,13 +3712,33 @@ static void sortCommand(redisClient *c) {
                 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
                 if (!byval || byval->type != REDIS_STRING) continue;
                 if (alpha) {
-                    vector[j].u.cmpobj = byval;
-                    incrRefCount(byval);
+                    if (byval->encoding == REDIS_ENCODING_RAW) {
+                        vector[j].u.cmpobj = byval;
+                        incrRefCount(byval);
+                    } else {
+                        vector[j].u.cmpobj = getDecodedObject(byval);
+                    }
                 } else {
-                    vector[j].u.score = strtod(byval->ptr,NULL);
+                    if (byval->encoding == REDIS_ENCODING_RAW) {
+                        vector[j].u.score = strtod(byval->ptr,NULL);
+                    } else {
+                        if (byval->encoding == REDIS_ENCODING_INT)
+                            vector[j].u.score = (long)byval->ptr;
+                        else
+                            assert(1 != 1);
+                    }
                 }
             } else {
-                if (!alpha) vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
+                if (!alpha) {
+                    if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
+                        vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
+                    else {
+                        if (vector[j].obj->encoding == REDIS_ENCODING_INT)
+                            vector[j].u.score = (long) vector[j].obj->ptr;
+                        else
+                            assert(1 != 1);
+                    }
+                }
             }
         }
     }
@@ -3521,8 +3770,7 @@ static void sortCommand(redisClient *c) {
     for (j = start; j <= end; j++) {
         listNode *ln;
         if (!getop) {
-            addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
-                sdslen(vector[j].obj->ptr)));
+            addReplyBulkLen(c,vector[j].obj);
             addReply(c,vector[j].obj);
             addReply(c,shared.crlf);
         }
@@ -3536,8 +3784,7 @@ static void sortCommand(redisClient *c) {
                 if (!val || val->type != REDIS_STRING) {
                     addReply(c,shared.nullbulk);
                 } else {
-                    addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
-                        sdslen(val->ptr)));
+                    addReplyBulkLen(c,val);
                     addReply(c,val);
                     addReply(c,shared.crlf);
                 }
@@ -3560,6 +3807,7 @@ static void sortCommand(redisClient *c) {
 static void infoCommand(redisClient *c) {
     sds info;
     time_t uptime = time(NULL)-server.stat_starttime;
+    int j;
     
     info = sdscatprintf(sdsempty(),
         "redis_version:%s\r\n"
@@ -3600,6 +3848,16 @@ static void infoCommand(redisClient *c) {
             (int)(time(NULL)-server.master->lastinteraction)
         );
     }
+    for (j = 0; j < server.dbnum; j++) {
+        long long keys, vkeys;
+
+        keys = dictSize(server.db[j].dict);
+        vkeys = dictSize(server.db[j].expires);
+        if (keys || vkeys) {
+            info = sdscatprintf(info, "db%d: keys=%lld,expires=%lld\r\n",
+                j, keys, vkeys);
+        }
+    }
     addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
     addReplySds(c,info);
     addReply(c,shared.crlf);
@@ -3689,10 +3947,12 @@ static void expireCommand(redisClient *c) {
         return;
     } else {
         time_t when = time(NULL)+seconds;
-        if (setExpire(c->db,c->argv[1],when))
+        if (setExpire(c->db,c->argv[1],when)) {
             addReply(c,shared.cone);
-        else
+            server.dirty++;
+        } else {
             addReply(c,shared.czero);
+        }
         return;
     }
 }
@@ -3886,7 +4146,13 @@ static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
     }
 }
 
-static void updateSalvesWaitingBgsave(int bgsaveerr) {
+/* This function is called at the end of every backgrond saving.
+ * The argument bgsaveerr is REDIS_OK if the background saving succeeded
+ * otherwise REDIS_ERR is passed to the function.
+ *
+ * The goal of this function is to handle slaves waiting for a successful
+ * background saving in order to perform non-blocking synchronization. */
+static void updateSlavesWaitingBgsave(int bgsaveerr) {
     listNode *ln;
     int startbgsave = 0;
 
@@ -4099,8 +4365,8 @@ static void debugCommand(redisClient *c) {
         key = dictGetEntryKey(de);
         val = dictGetEntryVal(de);
         addReplySds(c,sdscatprintf(sdsempty(),
-            "+Key at:%p refcount:%d, value at:%p refcount:%d\r\n",
-                key, key->refcount, val, val->refcount));
+            "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
+                key, key->refcount, val, val->refcount, val->encoding));
     } else {
         addReplySds(c,sdsnew(
             "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
@@ -4109,6 +4375,9 @@ static void debugCommand(redisClient *c) {
 
 #ifdef HAVE_BACKTRACE
 static struct redisFunctionSym symsTable[] = {
+{"dictEncObjKeyCompare", (unsigned long)dictEncObjKeyCompare},
+{"dictEncObjHash", (unsigned long)dictEncObjHash},
+{"incrDecrCommand", (unsigned long)incrDecrCommand},
 {"freeStringObject", (unsigned long)freeStringObject},
 {"freeListObject", (unsigned long)freeListObject},
 {"freeSetObject", (unsigned long)freeSetObject},
@@ -4116,6 +4385,8 @@ static struct redisFunctionSym symsTable[] = {
 {"createObject", (unsigned long)createObject},
 {"freeClient", (unsigned long)freeClient},
 {"rdbLoad", (unsigned long)rdbLoad},
+{"rdbSaveStringObject", (unsigned long)rdbSaveStringObject},
+{"rdbSaveStringObjectRaw", (unsigned long)rdbSaveStringObjectRaw},
 {"addReply", (unsigned long)addReply},
 {"addReplySds", (unsigned long)addReplySds},
 {"incrRefCount", (unsigned long)incrRefCount},
@@ -4124,13 +4395,15 @@ static struct redisFunctionSym symsTable[] = {
 {"replicationFeedSlaves", (unsigned long)replicationFeedSlaves},
 {"syncWithMaster", (unsigned long)syncWithMaster},
 {"tryObjectSharing", (unsigned long)tryObjectSharing},
+{"tryObjectEncoding", (unsigned long)tryObjectEncoding},
+{"getDecodedObject", (unsigned long)getDecodedObject},
 {"removeExpire", (unsigned long)removeExpire},
 {"expireIfNeeded", (unsigned long)expireIfNeeded},
 {"deleteIfVolatile", (unsigned long)deleteIfVolatile},
 {"deleteKey", (unsigned long)deleteKey},
 {"getExpire", (unsigned long)getExpire},
 {"setExpire", (unsigned long)setExpire},
-{"updateSalvesWaitingBgsave", (unsigned long)updateSalvesWaitingBgsave},
+{"updateSlavesWaitingBgsave", (unsigned long)updateSlavesWaitingBgsave},
 {"freeMemoryIfNeeded", (unsigned long)freeMemoryIfNeeded},
 {"authCommand", (unsigned long)authCommand},
 {"pingCommand", (unsigned long)pingCommand},
@@ -4170,6 +4443,7 @@ static struct redisFunctionSym symsTable[] = {
 {"smoveCommand", (unsigned long)smoveCommand},
 {"sismemberCommand", (unsigned long)sismemberCommand},
 {"scardCommand", (unsigned long)scardCommand},
+{"spopCommand", (unsigned long)spopCommand},
 {"sinterCommand", (unsigned long)sinterCommand},
 {"sinterstoreCommand", (unsigned long)sinterstoreCommand},
 {"sunionCommand", (unsigned long)sunionCommand},
@@ -4192,6 +4466,7 @@ static struct redisFunctionSym symsTable[] = {
 {"processCommand", (unsigned long)processCommand},
 {"setupSigSegvAction", (unsigned long)setupSigSegvAction},
 {"readQueryFromClient", (unsigned long)readQueryFromClient},
+{"rdbRemoveTempFile", (unsigned long)rdbRemoveTempFile},
 {NULL,0}
 };
 
@@ -4224,10 +4499,20 @@ static void *getMcontextEip(ucontext_t *uc) {
     return (void*) uc->uc_mcontext.mc_eip;
 #elif defined(__dietlibc__)
     return (void*) uc->uc_mcontext.eip;
-#elif defined(__APPLE__)
+#elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
+    return (void*) uc->uc_mcontext->__ss.__eip;
+#elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
+  #ifdef _STRUCT_X86_THREAD_STATE64
+    return (void*) uc->uc_mcontext->__ss.__rip;
+  #else
     return (void*) uc->uc_mcontext->__ss.__eip;
-#else /* Linux */
+  #endif 
+#elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
     return (void*) uc->uc_mcontext.gregs[REG_EIP];
+#elif defined(__ia64__) /* Linux IA64 */
+    return (void*) uc->uc_mcontext.sc_ip;
+#else
+    return NULL;
 #endif
 }
 
@@ -4269,7 +4554,9 @@ static void segvHandler(int sig, siginfo_t *info, void *secret) {
     
     trace_size = backtrace(trace, 100);
     /* overwrite sigaction with caller's address */
-    trace[1] = getMcontextEip(uc);
+    if (getMcontextEip(uc) != NULL) {
+        trace[1] = getMcontextEip(uc);
+    }
     messages = backtrace_symbols(trace, trace_size);
 
     for (i=1; i<trace_size; ++i) {
@@ -4296,6 +4583,9 @@ static void setupSigSegvAction(void) {
     act.sa_sigaction = segvHandler;
     sigaction (SIGSEGV, &act, NULL);
     sigaction (SIGBUS, &act, NULL);
+    sigaction (SIGFPE, &act, NULL);
+    sigaction (SIGILL, &act, NULL);
+    sigaction (SIGBUS, &act, NULL);
     return;
 }
 #else /* HAVE_BACKTRACE */
@@ -4322,7 +4612,7 @@ int linuxOvercommitMemoryValue(void) {
 
 void linuxOvercommitMemoryWarning(void) {
     if (linuxOvercommitMemoryValue() == 0) {
-        redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'echo 1 > /proc/sys/vm/overcommit_memory' in your init scripts.");
+        redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
     }
 }
 #endif /* __linux__ */
@@ -4352,10 +4642,6 @@ static void daemonize(void) {
 }
 
 int main(int argc, char **argv) {
-#ifdef __linux__
-    linuxOvercommitMemoryWarning();
-#endif
-    
     initServerConfig();
     if (argc == 2) {
         ResetServerSaveParams();
@@ -4369,6 +4655,9 @@ int main(int argc, char **argv) {
     initServer();
     if (server.daemonize) daemonize();
     redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
+#ifdef __linux__
+    linuxOvercommitMemoryWarning();
+#endif
     if (rdbLoad(server.dbfilename) == REDIS_OK)
         redisLog(REDIS_NOTICE,"DB loaded from disk");
     if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,