]> git.saurik.com Git - redis.git/commitdiff
merge antirez/smallkeys
authorPieter Noordhuis <pcnoordhuis@gmail.com>
Fri, 4 Jun 2010 07:54:06 +0000 (09:54 +0200)
committerPieter Noordhuis <pcnoordhuis@gmail.com>
Fri, 4 Jun 2010 08:10:50 +0000 (10:10 +0200)
1  2 
Makefile
redis.c
staticsymbols.h

diff --combined Makefile
index 718f45217114120739db3cce8712de0667eaafa1,31a763ad864ea7408ba04ae4f5509fa4b7e5604d..46df88bb26b180b6e4eedc7d11ceae2315818d6e
+++ b/Makefile
@@@ -15,7 -15,7 +15,7 @@@ endi
  CCOPT= $(CFLAGS) $(CCLINK) $(ARCH) $(PROF)
  DEBUG?= -g -rdynamic -ggdb 
  
 -OBJ = adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o
 +OBJ = adlist.o ae.o anet.o dict.o redis.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o
  BENCHOBJ = ae.o anet.o redis-benchmark.o sds.o adlist.o zmalloc.o
  CLIOBJ = anet.o sds.o adlist.o redis-cli.o zmalloc.o linenoise.o
  CHECKDUMPOBJ = redis-check-dump.o lzf_c.o lzf_d.o
@@@ -53,7 -53,6 +53,7 @@@ redis.o: redis.c fmacros.h config.h red
    adlist.h zmalloc.h lzf.h pqsort.h zipmap.h staticsymbols.h sha1.h
  sds.o: sds.c sds.h zmalloc.h
  zipmap.o: zipmap.c zmalloc.h
 +ziplist.o: ziplist.c zmalloc.h
  zmalloc.o: zmalloc.c config.h
  
  redis-server: $(OBJ)
@@@ -87,7 -86,7 +87,7 @@@ staticsymbols
        tclsh utils/build-static-symbols.tcl > staticsymbols.h
  
  test:
-       tclsh8.5 tests/test_helper.tcl
+       tclsh8.5 tests/test_helper.tcl --tags "${TAGS}"
  
  bench:
        ./redis-benchmark
diff --combined redis.c
index 41bc9c9730eb4fb9738cde70c99bd1ec6dcd18e1,e67ba20d74a603cce9a30b2534df560f1369187e..16384d8723b8040189caa9b1090e1399f423a8ea
+++ b/redis.c
@@@ -75,7 -75,6 +75,7 @@@
  #include "lzf.h"    /* LZF compression library */
  #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
  #include "zipmap.h" /* Compact dictionary-alike data structure */
 +#include "ziplist.h" /* Compact list data structure */
  #include "sha1.h"   /* SHA1 is used for DEBUG DIGEST */
  #include "release.h" /* Release and/or git repository information */
  
  #define REDIS_SET 2
  #define REDIS_ZSET 3
  #define REDIS_HASH 4
+ #define REDIS_VMPOINTER 8
  
  /* Objects encoding. Some kind of objects like Strings and Hashes can be
   * internally represented in multiple ways. The 'encoding' field of the object
   * is set to one of this fields for this object. */
 -#define REDIS_ENCODING_RAW 0    /* Raw representation */
 -#define REDIS_ENCODING_INT 1    /* Encoded as integer */
 -#define REDIS_ENCODING_ZIPMAP 2 /* Encoded as zipmap */
 -#define REDIS_ENCODING_HT 3     /* Encoded as an hash table */
 +#define REDIS_ENCODING_RAW 0     /* Raw representation */
 +#define REDIS_ENCODING_INT 1     /* Encoded as integer */
 +#define REDIS_ENCODING_HT 2      /* Encoded as hash table */
 +#define REDIS_ENCODING_ZIPMAP 3  /* Encoded as zipmap */
 +#define REDIS_ENCODING_LIST 4    /* Encoded as zipmap */
 +#define REDIS_ENCODING_ZIPLIST 5 /* Encoded as ziplist */
  
  static char* strencoding[] = {
--    "raw", "int", "zipmap", "hashtable"
++    "raw", "int", "hashtable", "zipmap", "list", "ziplist"
  };
  
  /* Object types only used for dumping to disk */
@@@ -250,30 -248,41 +251,41 @@@ static void _redisPanic(char *msg, cha
  
  /* A redis object, that is a type able to hold a string / list / set */
  
- /* The VM object structure */
- struct redisObjectVM {
-     off_t page;         /* the page at witch the object is stored on disk */
-     off_t usedpages;    /* number of pages used on disk */
-     time_t atime;       /* Last access time */
- } vm;
  /* The actual Redis Object */
  typedef struct redisObject {
-     void *ptr;
-     unsigned char type;
-     unsigned char encoding;
-     unsigned char storage;  /* If this object is a key, where is the value?
-                              * REDIS_VM_MEMORY, REDIS_VM_SWAPPED, ... */
-     unsigned char vtype; /* If this object is a key, and value is swapped out,
-                           * this is the type of the swapped out object. */
+     unsigned type:4;
+     unsigned storage:2;     /* REDIS_VM_MEMORY or REDIS_VM_SWAPPING */
+     unsigned encoding:4;
+     unsigned lru:22;        /* lru time (relative to server.lruclock) */
      int refcount;
+     void *ptr;
      /* VM fields, this are only allocated if VM is active, otherwise the
       * object allocation function will just allocate
       * sizeof(redisObjct) minus sizeof(redisObjectVM), so using
       * Redis without VM active will not have any overhead. */
-     struct redisObjectVM vm;
  } robj;
  
+ /* The VM pointer structure - identifies an object in the swap file.
+  *
+  * This object is stored in place of the value
+  * object in the main key->value hash table representing a database.
+  * Note that the first fields (type, storage) are the same as the redisObject
+  * structure so that vmPointer strucuters can be accessed even when casted
+  * as redisObject structures.
+  *
+  * This is useful as we don't know if a value object is or not on disk, but we
+  * are always able to read obj->storage to check this. For vmPointer
+  * structures "type" is set to REDIS_VMPOINTER (even if without this field
+  * is still possible to check the kind of object from the value of 'storage').*/
+ typedef struct vmPointer {
+     unsigned type:4;
+     unsigned storage:2; /* REDIS_VM_SWAPPED or REDIS_VM_LOADING */
+     unsigned notused:26;
+     unsigned int vtype; /* type of the object stored in the swap file */
+     off_t page;         /* the page at witch the object is stored on disk */
+     off_t usedpages;    /* number of pages used on disk */
+ } vmpointer;
  /* Macro used to initalize a Redis object allocated on the stack.
   * Note that this macro is taken near the structure definition to make sure
   * we'll update it when the structure is changed, to avoid bugs like
      _var.type = REDIS_STRING; \
      _var.encoding = REDIS_ENCODING_RAW; \
      _var.ptr = _ptr; \
-     if (server.vm_enabled) _var.storage = REDIS_VM_MEMORY; \
+     _var.storage = REDIS_VM_MEMORY; \
  } while(0);
  
  typedef struct redisDb {
@@@ -372,6 -381,7 +384,7 @@@ struct redisServer 
      int daemonize;
      int appendonly;
      int appendfsync;
+     int no_appendfsync_on_rewrite;
      int shutdown_asap;
      time_t lastfsync;
      int appendfd;
      list *pubsub_patterns; /* A list of pubsub_patterns */
      /* Misc */
      FILE *devnull;
+     unsigned lruclock:22;        /* clock incrementing every minute, for LRU */
+     unsigned lruclock_padding:10;
  };
  
  typedef struct pubsubPattern {
@@@ -544,6 -556,9 +559,9 @@@ typedef struct iojob 
      int type;   /* Request type, REDIS_IOJOB_* */
      redisDb *db;/* Redis database */
      robj *key;  /* This I/O request is about swapping this key */
+     robj *id;   /* Unique identifier of this job:
+                    this is the object to swap for REDIS_IOREQ_*_SWAP, or the
+                    vmpointer objct for REDIS_IOREQ_LOAD. */
      robj *val;  /* the value to swap for REDIS_IOREQ_*_SWAP, otherwise this
                   * field is populated by the I/O thread for REDIS_IOREQ_LOAD. */
      off_t page; /* Swap page where to read/write the object */
@@@ -577,8 -592,7 +595,7 @@@ static robj *getDecodedObject(robj *o)
  static int removeExpire(redisDb *db, robj *key);
  static int expireIfNeeded(redisDb *db, robj *key);
  static int deleteIfVolatile(redisDb *db, robj *key);
- static int deleteIfSwapped(redisDb *db, robj *key);
- static int deleteKey(redisDb *db, robj *key);
+ static int dbDelete(redisDb *db, robj *key);
  static time_t getExpire(redisDb *db, robj *key);
  static int setExpire(redisDb *db, robj *key, time_t when);
  static void updateSlavesWaitingBgsave(int bgsaveerr);
@@@ -600,8 -614,8 +617,8 @@@ static void unblockClientWaitingData(re
  static int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele);
  static void vmInit(void);
  static void vmMarkPagesFree(off_t page, off_t count);
- static robj *vmLoadObject(robj *key);
- static robj *vmPreviewObject(robj *key);
+ static robj *vmLoadObject(robj *o);
+ static robj *vmPreviewObject(robj *o);
  static int vmSwapOneObjectBlocking(void);
  static int vmSwapOneObjectThreaded(void);
  static int vmCanSwapOut(void);
@@@ -637,7 -651,7 +654,7 @@@ static int compareStringObjects(robj *a
  static int equalStringObjects(robj *a, robj *b);
  static void usage();
  static int rewriteAppendOnlyFileBackground(void);
- static int vmSwapObjectBlocking(robj *key, robj *val);
+ static vmpointer *vmSwapObjectBlocking(robj *val);
  static int prepareForShutdown();
  static void touchWatchedKey(redisDb *db, robj *key);
  static void touchWatchedKeysOnFlush(int dbid);
@@@ -1110,7 -1124,7 +1127,7 @@@ static void dictListDestructor(void *pr
      listRelease((list*)val);
  }
  
- static int sdsDictKeyCompare(void *privdata, const void *key1,
+ static int dictSdsKeyCompare(void *privdata, const void *key1,
          const void *key2)
  {
      int l1,l2;
@@@ -1130,11 -1144,18 +1147,18 @@@ static void dictRedisObjectDestructor(v
      decrRefCount(val);
  }
  
+ static void dictSdsDestructor(void *privdata, void *val)
+ {
+     DICT_NOTUSED(privdata);
+     sdsfree(val);
+ }
  static int dictObjKeyCompare(void *privdata, const void *key1,
          const void *key2)
  {
      const robj *o1 = key1, *o2 = key2;
-     return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
+     return dictSdsKeyCompare(privdata,o1->ptr,o2->ptr);
  }
  
  static unsigned int dictObjHash(const void *key) {
      return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
  }
  
+ static unsigned int dictSdsHash(const void *key) {
+     return dictGenHashFunction((unsigned char*)key, sdslen((char*)key));
+ }
  static int dictEncObjKeyCompare(void *privdata, const void *key1,
          const void *key2)
  {
  
      o1 = getDecodedObject(o1);
      o2 = getDecodedObject(o2);
-     cmp = sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
+     cmp = dictSdsKeyCompare(privdata,o1->ptr,o2->ptr);
      decrRefCount(o1);
      decrRefCount(o2);
      return cmp;
@@@ -1183,7 -1208,7 +1211,7 @@@ static unsigned int dictEncObjHash(cons
      }
  }
  
- /* Sets type and expires */
+ /* Sets type */
  static dictType setDictType = {
      dictEncObjHash,            /* hash function */
      NULL,                      /* key dup */
@@@ -1203,23 -1228,23 +1231,23 @@@ static dictType zsetDictType = 
      dictVanillaFree            /* val destructor of malloc(sizeof(double)) */
  };
  
- /* Db->dict */
+ /* Db->dict, keys are sds strings, vals are Redis objects. */
  static dictType dbDictType = {
-     dictObjHash,                /* hash function */
+     dictSdsHash,                /* hash function */
      NULL,                       /* key dup */
      NULL,                       /* val dup */
-     dictObjKeyCompare,          /* key compare */
-     dictRedisObjectDestructor,  /* key destructor */
+     dictSdsKeyCompare,          /* key compare */
+     dictSdsDestructor,          /* key destructor */
      dictRedisObjectDestructor   /* val destructor */
  };
  
  /* Db->expires */
  static dictType keyptrDictType = {
-     dictObjHash,               /* hash function */
+     dictSdsHash,               /* hash function */
      NULL,                      /* key dup */
      NULL,                      /* val dup */
-     dictObjKeyCompare,         /* key compare */
-     dictRedisObjectDestructor, /* key destructor */
+     dictSdsKeyCompare,         /* key compare */
+     dictSdsDestructor,         /* key destructor */
      NULL                       /* val destructor */
  };
  
@@@ -1388,7 -1413,7 +1416,7 @@@ void backgroundRewriteDoneHandler(int s
              /* If append only is actually enabled... */
              close(server.appendfd);
              server.appendfd = fd;
-             fsync(fd);
+             if (server.appendfsync != APPENDFSYNC_NO) aof_fsync(fd);
              server.appendseldb = -1; /* Make sure it will issue SELECT */
              redisLog(REDIS_NOTICE,"The new append only file was selected for future appends.");
          } else {
@@@ -1434,6 -1459,19 +1462,19 @@@ static int serverCron(struct aeEventLoo
       * in objects at every object access, and accuracy is not needed.
       * To access a global var is faster than calling time(NULL) */
      server.unixtime = time(NULL);
+     /* We have just 21 bits per object for LRU information.
+      * So we use an (eventually wrapping) LRU clock with minutes resolution.
+      *
+      * When we need to select what object to swap, we compute the minimum
+      * time distance between the current lruclock and the object last access
+      * lruclock info. Even if clocks will wrap on overflow, there is
+      * the interesting property that we are sure that at least
+      * ABS(A-B) minutes passed between current time and timestamp B.
+      *
+      * This is not precise but we don't need at all precision, but just
+      * something statistically reasonable.
+      */
+     server.lruclock = (time(NULL)/60)&((1<<21)-1);
  
      /* We received a SIGTERM, shutting down here in a safe way, as it is
       * not ok doing so inside the signal handler. */
                  if ((de = dictGetRandomKey(db->expires)) == NULL) break;
                  t = (time_t) dictGetEntryVal(de);
                  if (now > t) {
-                     deleteKey(db,dictGetEntryKey(de));
+                     sds key = dictGetEntryKey(de);
+                     robj *keyobj = createStringObject(key,sdslen(key));
+                     dbDelete(db,keyobj);
+                     decrRefCount(keyobj);
                      expired++;
                      server.stat_expiredkeys++;
                  }
@@@ -1688,6 -1730,7 +1733,7 @@@ static void initServerConfig() 
      server.daemonize = 0;
      server.appendonly = 0;
      server.appendfsync = APPENDFSYNC_EVERYSEC;
+     server.no_appendfsync_on_rewrite = 0;
      server.lastfsync = time(NULL);
      server.appendfd = -1;
      server.appendseldb = -1; /* Make sure the first time will not match */
@@@ -1944,6 -1987,11 +1990,11 @@@ static void loadServerConfig(char *file
          } else if (!strcasecmp(argv[0],"appendfilename") && argc == 2) {
              zfree(server.appendfilename);
              server.appendfilename = zstrdup(argv[1]);
+         } else if (!strcasecmp(argv[0],"no-appendfsync-on-rewrite")
+                    && argc == 2) {
+             if ((server.no_appendfsync_on_rewrite= yesnotoi(argv[1])) == -1) {
+                 err = "argument must be 'yes' or 'no'"; goto loaderr;
+             }
          } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) {
              if (!strcasecmp(argv[1],"no")) {
                  server.appendfsync = APPENDFSYNC_NO;
@@@ -2875,6 -2923,12 +2926,12 @@@ static void addReplyBulk(redisClient *c
      addReply(c,shared.crlf);
  }
  
+ static void addReplyBulkSds(redisClient *c, sds s) {
+     robj *o = createStringObject(s, sdslen(s));
+     addReplyBulk(c,o);
+     decrRefCount(o);
+ }
  /* In the CONFIG command we need to add vanilla C string as bulk replies */
  static void addReplyBulkCString(redisClient *c, char *s) {
      if (s == NULL) {
@@@ -2934,12 -2988,9 +2991,9 @@@ static robj *createObject(int type, voi
          listDelNode(server.objfreelist,head);
          if (server.vm_enabled) pthread_mutex_unlock(&server.obj_freelist_mutex);
      } else {
-         if (server.vm_enabled) {
+         if (server.vm_enabled)
              pthread_mutex_unlock(&server.obj_freelist_mutex);
-             o = zmalloc(sizeof(*o));
-         } else {
-             o = zmalloc(sizeof(*o)-sizeof(struct redisObjectVM));
-         }
+         o = zmalloc(sizeof(*o));
      }
      o->type = type;
      o->encoding = REDIS_ENCODING_RAW;
      o->refcount = 1;
      if (server.vm_enabled) {
          /* Note that this code may run in the context of an I/O thread
-          * and accessing to server.unixtime in theory is an error
+          * and accessing server.lruclock in theory is an error
           * (no locks). But in practice this is safe, and even if we read
-          * garbage Redis will not fail, as it's just a statistical info */
-         o->vm.atime = server.unixtime;
+          * garbage Redis will not fail. */
+         o->lru = server.lruclock;
          o->storage = REDIS_VM_MEMORY;
      }
      return o;
@@@ -2984,17 -3035,9 +3038,17 @@@ static robj *dupStringObject(robj *o) 
  
  static robj *createListObject(void) {
      list *l = listCreate();
 -
 +    robj *o = createObject(REDIS_LIST,l);
      listSetFreeMethod(l,decrRefCount);
 -    return createObject(REDIS_LIST,l);
 +    o->encoding = REDIS_ENCODING_LIST;
 +    return o;
 +}
 +
 +static robj *createZiplistObject(void) {
 +    unsigned char *zl = ziplistNew();
 +    robj *o = createObject(REDIS_LIST,zl);
 +    o->encoding = REDIS_ENCODING_ZIPLIST;
 +    return o;
  }
  
  static robj *createSetObject(void) {
@@@ -3027,16 -3070,7 +3081,16 @@@ static void freeStringObject(robj *o) 
  }
  
  static void freeListObject(robj *o) {
 -    listRelease((list*) o->ptr);
 +    switch (o->encoding) {
 +    case REDIS_ENCODING_LIST:
 +        listRelease((list*) o->ptr);
 +        break;
 +    case REDIS_ENCODING_ZIPLIST:
 +        zfree(o->ptr);
 +        break;
 +    default:
 +        redisPanic("Unknown list encoding type");
 +    }
  }
  
  static void freeSetObject(robj *o) {
@@@ -3072,28 -3106,31 +3126,31 @@@ static void incrRefCount(robj *o) 
  static void decrRefCount(void *obj) {
      robj *o = obj;
  
-     if (o->refcount <= 0) redisPanic("decrRefCount against refcount <= 0");
-     /* Object is a key of a swapped out value, or in the process of being
-      * loaded. */
+     /* Object is a swapped out value, or in the process of being loaded. */
      if (server.vm_enabled &&
          (o->storage == REDIS_VM_SWAPPED || o->storage == REDIS_VM_LOADING))
      {
-         if (o->storage == REDIS_VM_LOADING) vmCancelThreadedIOJob(obj);
-         redisAssert(o->type == REDIS_STRING);
-         freeStringObject(o);
-         vmMarkPagesFree(o->vm.page,o->vm.usedpages);
-         pthread_mutex_lock(&server.obj_freelist_mutex);
-         if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
-             !listAddNodeHead(server.objfreelist,o))
-             zfree(o);
-         pthread_mutex_unlock(&server.obj_freelist_mutex);
+         vmpointer *vp = obj;
+         if (o->storage == REDIS_VM_LOADING) vmCancelThreadedIOJob(o);
+         vmMarkPagesFree(vp->page,vp->usedpages);
          server.vm_stats_swapped_objects--;
+         zfree(vp);
          return;
      }
-     /* Object is in memory, or in the process of being swapped out. */
+     if (o->refcount <= 0) redisPanic("decrRefCount against refcount <= 0");
+     /* Object is in memory, or in the process of being swapped out.
+      *
+      * If the object is being swapped out, abort the operation on
+      * decrRefCount even if the refcount does not drop to 0: the object
+      * is referenced at least two times, as value of the key AND as
+      * job->val in the iojob. So if we don't invalidate the iojob, when it is
+      * done but the relevant key was removed in the meantime, the
+      * complete jobs handler will not find the key about the job and the
+      * assert will fail. */
+     if (server.vm_enabled && o->storage == REDIS_VM_SWAPPING)
+         vmCancelThreadedIOJob(o);
      if (--(o->refcount) == 0) {
-         if (server.vm_enabled && o->storage == REDIS_VM_SWAPPING)
-             vmCancelThreadedIOJob(obj);
          switch(o->type) {
          case REDIS_STRING: freeStringObject(o); break;
          case REDIS_LIST: freeListObject(o); break;
      }
  }
  
- static robj *lookupKey(redisDb *db, robj *key) {
-     dictEntry *de = dictFind(db->dict,key);
-     if (de) {
-         robj *key = dictGetEntryKey(de);
-         robj *val = dictGetEntryVal(de);
-         if (server.vm_enabled) {
-             if (key->storage == REDIS_VM_MEMORY ||
-                 key->storage == REDIS_VM_SWAPPING)
-             {
-                 /* If we were swapping the object out, stop it, this key
-                  * was requested. */
-                 if (key->storage == REDIS_VM_SWAPPING)
-                     vmCancelThreadedIOJob(key);
-                 /* Update the access time of the key for the aging algorithm. */
-                 key->vm.atime = server.unixtime;
-             } else {
-                 int notify = (key->storage == REDIS_VM_LOADING);
-                 /* Our value was swapped on disk. Bring it at home. */
-                 redisAssert(val == NULL);
-                 val = vmLoadObject(key);
-                 dictGetEntryVal(de) = val;
-                 /* Clients blocked by the VM subsystem may be waiting for
-                  * this key... */
-                 if (notify) handleClientsBlockedOnSwappedKey(db,key);
-             }
-         }
-         return val;
-     } else {
-         return NULL;
-     }
- }
- static robj *lookupKeyRead(redisDb *db, robj *key) {
-     expireIfNeeded(db,key);
-     return lookupKey(db,key);
- }
- static robj *lookupKeyWrite(redisDb *db, robj *key) {
-     deleteIfVolatile(db,key);
-     touchWatchedKey(db,key);
-     return lookupKey(db,key);
- }
- static robj *lookupKeyReadOrReply(redisClient *c, robj *key, robj *reply) {
-     robj *o = lookupKeyRead(c->db, key);
-     if (!o) addReply(c,reply);
-     return o;
- }
- static robj *lookupKeyWriteOrReply(redisClient *c, robj *key, robj *reply) {
-     robj *o = lookupKeyWrite(c->db, key);
-     if (!o) addReply(c,reply);
-     return o;
- }
  static int checkType(redisClient *c, robj *o, int type) {
      if (o->type != type) {
          addReply(c,shared.wrongtypeerr);
      return 0;
  }
  
- static int deleteKey(redisDb *db, robj *key) {
-     int retval;
-     /* We need to protect key from destruction: after the first dictDelete()
-      * it may happen that 'key' is no longer valid if we don't increment
-      * it's count. This may happen when we get the object reference directly
-      * from the hash table with dictRandomKey() or dict iterators */
-     incrRefCount(key);
-     if (dictSize(db->expires)) dictDelete(db->expires,key);
-     retval = dictDelete(db->dict,key);
-     decrRefCount(key);
-     return retval == DICT_OK;
- }
  /* Check if the nul-terminated string 's' can be represented by a long
   * (that is, is a number that fits into long without any other space or
   * character before or after the digits).
@@@ -3410,6 -3374,134 +3394,134 @@@ static int getLongFromObjectOrReply(red
      return REDIS_OK;
  }
  
+ /* =========================== Keyspace access API ========================== */
+ static robj *lookupKey(redisDb *db, robj *key) {
+     dictEntry *de = dictFind(db->dict,key->ptr);
+     if (de) {
+         robj *val = dictGetEntryVal(de);
+         if (server.vm_enabled) {
+             if (val->storage == REDIS_VM_MEMORY ||
+                 val->storage == REDIS_VM_SWAPPING)
+             {
+                 /* If we were swapping the object out, cancel the operation */
+                 if (val->storage == REDIS_VM_SWAPPING)
+                     vmCancelThreadedIOJob(val);
+                 /* Update the access time for the aging algorithm. */
+                 val->lru = server.lruclock;
+             } else {
+                 int notify = (val->storage == REDIS_VM_LOADING);
+                 /* Our value was swapped on disk. Bring it at home. */
+                 redisAssert(val->type == REDIS_VMPOINTER);
+                 val = vmLoadObject(val);
+                 dictGetEntryVal(de) = val;
+                 /* Clients blocked by the VM subsystem may be waiting for
+                  * this key... */
+                 if (notify) handleClientsBlockedOnSwappedKey(db,key);
+             }
+         }
+         return val;
+     } else {
+         return NULL;
+     }
+ }
+ static robj *lookupKeyRead(redisDb *db, robj *key) {
+     expireIfNeeded(db,key);
+     return lookupKey(db,key);
+ }
+ static robj *lookupKeyWrite(redisDb *db, robj *key) {
+     deleteIfVolatile(db,key);
+     touchWatchedKey(db,key);
+     return lookupKey(db,key);
+ }
+ static robj *lookupKeyReadOrReply(redisClient *c, robj *key, robj *reply) {
+     robj *o = lookupKeyRead(c->db, key);
+     if (!o) addReply(c,reply);
+     return o;
+ }
+ static robj *lookupKeyWriteOrReply(redisClient *c, robj *key, robj *reply) {
+     robj *o = lookupKeyWrite(c->db, key);
+     if (!o) addReply(c,reply);
+     return o;
+ }
+ /* Add the key to the DB. If the key already exists REDIS_ERR is returned,
+  * otherwise REDIS_OK is returned, and the caller should increment the
+  * refcount of 'val'. */
+ static int dbAdd(redisDb *db, robj *key, robj *val) {
+     /* Perform a lookup before adding the key, as we need to copy the
+      * key value. */
+     if (dictFind(db->dict, key->ptr) != NULL) {
+         return REDIS_ERR;
+     } else {
+         sds copy = sdsdup(key->ptr);
+         dictAdd(db->dict, copy, val);
+         return REDIS_OK;
+     }
+ }
+ /* If the key does not exist, this is just like dbAdd(). Otherwise
+  * the value associated to the key is replaced with the new one.
+  *
+  * On update (key already existed) 0 is returned. Otherwise 1. */
+ static int dbReplace(redisDb *db, robj *key, robj *val) {
+     if (dictFind(db->dict,key->ptr) == NULL) {
+         sds copy = sdsdup(key->ptr);
+         dictAdd(db->dict, copy, val);
+         return 1;
+     } else {
+         dictReplace(db->dict, key->ptr, val);
+         return 0;
+     }
+ }
+ static int dbExists(redisDb *db, robj *key) {
+     return dictFind(db->dict,key->ptr) != NULL;
+ }
+ /* Return a random key, in form of a Redis object.
+  * If there are no keys, NULL is returned.
+  *
+  * The function makes sure to return keys not already expired. */
+ static robj *dbRandomKey(redisDb *db) {
+     struct dictEntry *de;
+     while(1) {
+         sds key;
+         robj *keyobj;
+         de = dictGetRandomKey(db->dict);
+         if (de == NULL) return NULL;
+         key = dictGetEntryKey(de);
+         keyobj = createStringObject(key,sdslen(key));
+         if (dictFind(db->expires,key)) {
+             if (expireIfNeeded(db,keyobj)) {
+                 decrRefCount(keyobj);
+                 continue; /* search for another key. This expired. */
+             }
+         }
+         return keyobj;
+     }
+ }
+ /* Delete a key, value, and associated expiration entry if any, from the DB */
+ static int dbDelete(redisDb *db, robj *key) {
+     int retval;
+     if (dictSize(db->expires)) dictDelete(db->expires,key->ptr);
+     retval = dictDelete(db->dict,key->ptr);
+     return retval == DICT_OK;
+ }
  /*============================ RDB saving/loading =========================== */
  
  static int rdbSaveType(FILE *fp, unsigned char type) {
@@@ -3552,32 -3644,38 +3664,32 @@@ static int rdbSaveRawString(FILE *fp, u
      return 0;
  }
  
 +/* Save a long long value as either an encoded string or a string. */
 +static int rdbSaveLongLongAsStringObject(FILE *fp, long long value) {
 +    unsigned char buf[32];
 +    int enclen = rdbEncodeInteger(value,buf);
 +    if (enclen > 0) {
 +        if (fwrite(buf,enclen,1,fp) == 0) return -1;
 +    } else {
 +        /* Encode as string */
 +        enclen = ll2string((char*)buf,32,value);
 +        redisAssert(enclen < 32);
 +        if (rdbSaveLen(fp,enclen) == -1) return -1;
 +        if (fwrite(buf,enclen,1,fp) == 0) return -1;
 +    }
 +    return 0;
 +}
 +
  /* Like rdbSaveStringObjectRaw() but handle encoded objects */
  static int rdbSaveStringObject(FILE *fp, robj *obj) {
 -    int retval;
 -
      /* Avoid to decode the object, then encode it again, if the
       * object is alrady integer encoded. */
      if (obj->encoding == REDIS_ENCODING_INT) {
 -        long val = (long) obj->ptr;
 -        unsigned char buf[5];
 -        int enclen;
 -
 -        if ((enclen = rdbEncodeInteger(val,buf)) > 0) {
 -            if (fwrite(buf,enclen,1,fp) == 0) return -1;
 -            return 0;
 -        }
 -        /* otherwise... fall throught and continue with the usual
 -         * code path. */
 -    }
 -
 -    /* Avoid incr/decr ref count business when possible.
 -     * This plays well with copy-on-write given that we are probably
 -     * in a child process (BGSAVE). Also this makes sure key objects
 -     * of swapped objects are not incRefCount-ed (an assert does not allow
 -     * this in order to avoid bugs) */
 -    if (obj->encoding != REDIS_ENCODING_RAW) {
 -        obj = getDecodedObject(obj);
 -        retval = rdbSaveRawString(fp,obj->ptr,sdslen(obj->ptr));
 -        decrRefCount(obj);
 +        return rdbSaveLongLongAsStringObject(fp,(long)obj->ptr);
      } else {
 -        retval = rdbSaveRawString(fp,obj->ptr,sdslen(obj->ptr));
 +        redisAssert(obj->encoding == REDIS_ENCODING_RAW);
 +        return rdbSaveRawString(fp,obj->ptr,sdslen(obj->ptr));
      }
 -    return retval;
  }
  
  /* Save a double value. Doubles are saved as strings prefixed by an unsigned
@@@ -3630,37 -3728,16 +3742,37 @@@ static int rdbSaveObject(FILE *fp, rob
          if (rdbSaveStringObject(fp,o) == -1) return -1;
      } else if (o->type == REDIS_LIST) {
          /* Save a list value */
 -        list *list = o->ptr;
 -        listIter li;
 -        listNode *ln;
 -
 -        if (rdbSaveLen(fp,listLength(list)) == -1) return -1;
 -        listRewind(list,&li);
 -        while((ln = listNext(&li))) {
 -            robj *eleobj = listNodeValue(ln);
 +        if (o->encoding == REDIS_ENCODING_ZIPLIST) {
 +            unsigned char *p;
 +            unsigned char *vstr;
 +            unsigned int vlen;
 +            long long vlong;
 +
 +            if (rdbSaveLen(fp,ziplistLen(o->ptr)) == -1) return -1;
 +            p = ziplistIndex(o->ptr,0);
 +            while(ziplistGet(p,&vstr,&vlen,&vlong)) {
 +                if (vstr) {
 +                    if (rdbSaveRawString(fp,vstr,vlen) == -1)
 +                        return -1;
 +                } else {
 +                    if (rdbSaveLongLongAsStringObject(fp,vlong) == -1)
 +                        return -1;
 +                }
 +                p = ziplistNext(o->ptr,p);
 +            }
 +        } else if (o->encoding == REDIS_ENCODING_LIST) {
 +            list *list = o->ptr;
 +            listIter li;
 +            listNode *ln;
  
 -            if (rdbSaveStringObject(fp,eleobj) == -1) return -1;
 +            if (rdbSaveLen(fp,listLength(list)) == -1) return -1;
 +            listRewind(list,&li);
 +            while((ln = listNext(&li))) {
 +                robj *eleobj = listNodeValue(ln);
 +                if (rdbSaveStringObject(fp,eleobj) == -1) return -1;
 +            }
 +        } else {
 +            redisPanic("Unknown list encoding");
          }
      } else if (o->type == REDIS_SET) {
          /* Save a set value */
@@@ -3779,9 -3856,12 +3891,12 @@@ static int rdbSave(char *filename) 
  
          /* Iterate this DB writing every entry */
          while((de = dictNext(di)) != NULL) {
-             robj *key = dictGetEntryKey(de);
-             robj *o = dictGetEntryVal(de);
-             time_t expiretime = getExpire(db,key);
+             sds keystr = dictGetEntryKey(de);
+             robj key, *o = dictGetEntryVal(de);
+             time_t expiretime;
+             
+             initStaticStringObject(key,keystr);
+             expiretime = getExpire(db,&key);
  
              /* Save the expire time */
              if (expiretime != -1) {
              }
              /* Save the key and associated value. This requires special
               * handling if the value is swapped out. */
-             if (!server.vm_enabled || key->storage == REDIS_VM_MEMORY ||
-                                       key->storage == REDIS_VM_SWAPPING) {
+             if (!server.vm_enabled || o->storage == REDIS_VM_MEMORY ||
+                                       o->storage == REDIS_VM_SWAPPING) {
                  /* Save type, key, value */
                  if (rdbSaveType(fp,o->type) == -1) goto werr;
-                 if (rdbSaveStringObject(fp,key) == -1) goto werr;
+                 if (rdbSaveStringObject(fp,&key) == -1) goto werr;
                  if (rdbSaveObject(fp,o) == -1) goto werr;
              } else {
                  /* REDIS_VM_SWAPPED or REDIS_VM_LOADING */
                  robj *po;
                  /* Get a preview of the object in memory */
-                 po = vmPreviewObject(key);
+                 po = vmPreviewObject(o);
                  /* Save type, key, value */
-                 if (rdbSaveType(fp,key->vtype) == -1) goto werr;
-                 if (rdbSaveStringObject(fp,key) == -1) goto werr;
+                 if (rdbSaveType(fp,po->type) == -1) goto werr;
+                 if (rdbSaveStringObject(fp,&key) == -1) goto werr;
                  if (rdbSaveObject(fp,po) == -1) goto werr;
                  /* Remove the loaded object from memory */
                  decrRefCount(po);
@@@ -4027,48 -4107,34 +4142,48 @@@ static int rdbLoadDoubleValue(FILE *fp
  /* Load a Redis object of the specified type from the specified file.
   * On success a newly allocated object is returned, otherwise NULL. */
  static robj *rdbLoadObject(int type, FILE *fp) {
 -    robj *o;
 +    robj *o, *ele, *dec;
 +    size_t len;
  
      redisLog(REDIS_DEBUG,"LOADING OBJECT %d (at %d)\n",type,ftell(fp));
      if (type == REDIS_STRING) {
          /* Read string value */
          if ((o = rdbLoadEncodedStringObject(fp)) == NULL) return NULL;
          o = tryObjectEncoding(o);
 -    } else if (type == REDIS_LIST || type == REDIS_SET) {
 -        /* Read list/set value */
 -        uint32_t listlen;
 +    } else if (type == REDIS_LIST) {
 +        /* Read list value */
 +        if ((len = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
  
 -        if ((listlen = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
 -        o = (type == REDIS_LIST) ? createListObject() : createSetObject();
 +        o = createZiplistObject();
 +
 +        /* Load every single element of the list */
 +        while(len--) {
 +            if ((ele = rdbLoadEncodedStringObject(fp)) == NULL) return NULL;
 +
 +            if (o->encoding == REDIS_ENCODING_ZIPLIST) {
 +                dec = getDecodedObject(ele);
 +                o->ptr = ziplistPush(o->ptr,dec->ptr,sdslen(dec->ptr),REDIS_TAIL);
 +                decrRefCount(dec);
 +                decrRefCount(ele);
 +            } else {
 +                ele = tryObjectEncoding(ele);
 +                listAddNodeTail(o->ptr,ele);
 +                incrRefCount(ele);
 +            }
 +        }
 +    } else if (type == REDIS_SET) {
 +        /* Read list/set value */
 +        if ((len = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
 +        o = createSetObject();
          /* It's faster to expand the dict to the right size asap in order
           * to avoid rehashing */
 -        if (type == REDIS_SET && listlen > DICT_HT_INITIAL_SIZE)
 -            dictExpand(o->ptr,listlen);
 +        if (len > DICT_HT_INITIAL_SIZE)
 +            dictExpand(o->ptr,len);
          /* Load every single element of the list/set */
 -        while(listlen--) {
 -            robj *ele;
 -
 +        while(len--) {
              if ((ele = rdbLoadEncodedStringObject(fp)) == NULL) return NULL;
              ele = tryObjectEncoding(ele);
 -            if (type == REDIS_LIST) {
 -                listAddNodeTail((list*)o->ptr,ele);
 -            } else {
 -                dictAdd((dict*)o->ptr,ele,NULL);
 -            }
 +            dictAdd((dict*)o->ptr,ele,NULL);
          }
      } else if (type == REDIS_ZSET) {
          /* Read list/set value */
@@@ -4139,11 -4205,9 +4254,9 @@@ static int rdbLoad(char *filename) 
      uint32_t dbid;
      int type, retval, rdbver;
      int swap_all_values = 0;
-     dict *d = server.db[0].dict;
      redisDb *db = server.db+0;
      char buf[1024];
      time_t expiretime, now = time(NULL);
-     long long loadedkeys = 0;
  
      fp = fopen(filename,"r");
      if (!fp) return REDIS_ERR;
      }
      while(1) {
          robj *key, *val;
+         int force_swapout;
  
          expiretime = -1;
          /* Read type. */
                  exit(1);
              }
              db = server.db+dbid;
-             d = db->dict;
              continue;
          }
          /* Read key */
              continue;
          }
          /* Add the new object in the hash table */
-         retval = dictAdd(d,key,val);
-         if (retval == DICT_ERR) {
+         retval = dbAdd(db,key,val);
+         if (retval == REDIS_ERR) {
              redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", key->ptr);
              exit(1);
          }
-         loadedkeys++;
          /* Set the expire time if needed */
          if (expiretime != -1) setExpire(db,key,expiretime);
  
           * to random sampling, otherwise we may try to swap already
           * swapped keys. */
          if (swap_all_values) {
-             dictEntry *de = dictFind(d,key);
+             dictEntry *de = dictFind(db->dict,key->ptr);
  
              /* de may be NULL since the key already expired */
              if (de) {
-                 key = dictGetEntryKey(de);
+                 vmpointer *vp;
                  val = dictGetEntryVal(de);
  
-                 if (vmSwapObjectBlocking(key,val) == REDIS_OK) {
-                     dictGetEntryVal(de) = NULL;
-                 }
+                 if (val->refcount == 1 &&
+                     (vp = vmSwapObjectBlocking(val)) != NULL)
+                     dictGetEntryVal(de) = vp;
              }
+             decrRefCount(key);
              continue;
          }
+         decrRefCount(key);
+         /* Flush data on disk once 32 MB of additional RAM are used... */
+         force_swapout = 0;
+         if ((zmalloc_used_memory() - server.vm_max_memory) > 1024*1024*32)
+             force_swapout = 1;
  
          /* If we have still some hope of having some value fitting memory
           * then we try random sampling. */
-         if (!swap_all_values && server.vm_enabled && (loadedkeys % 5000) == 0) {
+         if (!swap_all_values && server.vm_enabled && force_swapout) {
              while (zmalloc_used_memory() > server.vm_max_memory) {
                  if (vmSwapOneObjectBlocking() == REDIS_ERR) break;
              }
@@@ -4258,7 -4328,7 +4377,7 @@@ static int prepareForShutdown() 
      }
      if (server.appendonly) {
          /* Append only file: fsync() the AOF and exit */
-         fsync(server.appendfd);
+         aof_fsync(server.appendfd);
          if (server.vm_enabled) unlink(server.vm_swap_file);
      } else {
          /* Snapshotting. Perform a SYNC SAVE and exit */
@@@ -4317,23 -4387,16 +4436,16 @@@ static void setGenericCommand(redisClie
  
      touchWatchedKey(c->db,key);
      if (nx) deleteIfVolatile(c->db,key);
-     retval = dictAdd(c->db->dict,key,val);
-     if (retval == DICT_ERR) {
+     retval = dbAdd(c->db,key,val);
+     if (retval == REDIS_ERR) {
          if (!nx) {
-             /* If the key is about a swapped value, we want a new key object
-              * to overwrite the old. So we delete the old key in the database.
-              * This will also make sure that swap pages about the old object
-              * will be marked as free. */
-             if (server.vm_enabled && deleteIfSwapped(c->db,key))
-                 incrRefCount(key);
-             dictReplace(c->db->dict,key,val);
+             dbReplace(c->db,key,val);
              incrRefCount(val);
          } else {
              addReply(c,shared.czero);
              return;
          }
      } else {
-         incrRefCount(key);
          incrRefCount(val);
      }
      server.dirty++;
@@@ -4375,11 -4438,7 +4487,7 @@@ static void getCommand(redisClient *c) 
  
  static void getsetCommand(redisClient *c) {
      if (getGenericCommand(c) == REDIS_ERR) return;
-     if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
-         dictReplace(c->db->dict,c->argv[1],c->argv[2]);
-     } else {
-         incrRefCount(c->argv[1]);
-     }
+     dbReplace(c->db,c->argv[1],c->argv[2]);
      incrRefCount(c->argv[2]);
      server.dirty++;
      removeExpire(c->db,c->argv[1]);
@@@ -4425,17 -4484,9 +4533,9 @@@ static void msetGenericCommand(redisCli
      }
  
      for (j = 1; j < c->argc; j += 2) {
-         int retval;
          c->argv[j+1] = tryObjectEncoding(c->argv[j+1]);
-         retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
-         if (retval == DICT_ERR) {
-             dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
-             incrRefCount(c->argv[j+1]);
-         } else {
-             incrRefCount(c->argv[j]);
-             incrRefCount(c->argv[j+1]);
-         }
+         dbReplace(c->db,c->argv[j],c->argv[j+1]);
+         incrRefCount(c->argv[j+1]);
          removeExpire(c->db,c->argv[j]);
      }
      server.dirty += (c->argc-1)/2;
@@@ -4452,7 -4503,6 +4552,6 @@@ static void msetnxCommand(redisClient *
  
  static void incrDecrCommand(redisClient *c, long long incr) {
      long long value;
-     int retval;
      robj *o;
  
      o = lookupKeyWrite(c->db,c->argv[1]);
  
      value += incr;
      o = createStringObjectFromLongLong(value);
-     retval = dictAdd(c->db->dict,c->argv[1],o);
-     if (retval == DICT_ERR) {
-         dictReplace(c->db->dict,c->argv[1],o);
-         removeExpire(c->db,c->argv[1]);
-     } else {
-         incrRefCount(c->argv[1]);
-     }
+     dbReplace(c->db,c->argv[1],o);
      server.dirty++;
      addReply(c,shared.colon);
      addReply(c,o);
@@@ -4504,17 -4548,10 +4597,10 @@@ static void appendCommand(redisClient *
      o = lookupKeyWrite(c->db,c->argv[1]);
      if (o == NULL) {
          /* Create the key */
-         retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
-         incrRefCount(c->argv[1]);
+         retval = dbAdd(c->db,c->argv[1],c->argv[2]);
          incrRefCount(c->argv[2]);
          totlen = stringObjectLen(c->argv[2]);
      } else {
-         dictEntry *de;
-         de = dictFind(c->db->dict,c->argv[1]);
-         assert(de != NULL);
-         o = dictGetEntryVal(de);
          if (o->type != REDIS_STRING) {
              addReply(c,shared.wrongtypeerr);
              return;
  
              o = createStringObject(decoded->ptr, sdslen(decoded->ptr));
              decrRefCount(decoded);
-             dictReplace(c->db->dict,c->argv[1],o);
+             dbReplace(c->db,c->argv[1],o);
          }
          /* APPEND! */
          if (c->argv[2]->encoding == REDIS_ENCODING_RAW) {
@@@ -4585,7 -4622,7 +4671,7 @@@ static void delCommand(redisClient *c) 
      int deleted = 0, j;
  
      for (j = 1; j < c->argc; j++) {
-         if (deleteKey(c->db,c->argv[j])) {
+         if (dbDelete(c->db,c->argv[j])) {
              touchWatchedKey(c->db,c->argv[j]);
              server.dirty++;
              deleted++;
  
  static void existsCommand(redisClient *c) {
      expireIfNeeded(c->db,c->argv[1]);
-     if (dictFind(c->db->dict,c->argv[1])) {
+     if (dbExists(c->db,c->argv[1])) {
          addReply(c, shared.cone);
      } else {
          addReply(c, shared.czero);
@@@ -4614,27 -4651,15 +4700,15 @@@ static void selectCommand(redisClient *
  }
  
  static void randomkeyCommand(redisClient *c) {
-     dictEntry *de;
      robj *key;
  
-     while(1) {
-         de = dictGetRandomKey(c->db->dict);
-         if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
-     }
-     if (de == NULL) {
+     if ((key = dbRandomKey(c->db)) == NULL) {
          addReply(c,shared.nullbulk);
          return;
      }
  
-     key = dictGetEntryKey(de);
-     if (server.vm_enabled) {
-         key = dupStringObject(key);
-         addReplyBulk(c,key);
-         decrRefCount(key);
-     } else {
-         addReplyBulk(c,key);
-     }
+     addReplyBulk(c,key);
+     decrRefCount(key);
  }
  
  static void keysCommand(redisClient *c) {
      addReply(c,lenobj);
      decrRefCount(lenobj);
      while((de = dictNext(di)) != NULL) {
-         robj *keyobj = dictGetEntryKey(de);
+         sds key = dictGetEntryKey(de);
+         robj *keyobj;
  
-         sds key = keyobj->ptr;
          if ((pattern[0] == '*' && pattern[1] == '\0') ||
              stringmatchlen(pattern,plen,key,sdslen(key),0)) {
+             keyobj = createStringObject(key,sdslen(key));
              if (expireIfNeeded(c->db,keyobj) == 0) {
                  addReplyBulk(c,keyobj);
                  numkeys++;
              }
+             decrRefCount(keyobj);
          }
      }
      dictReleaseIterator(di);
@@@ -4740,17 -4767,15 +4816,15 @@@ static void renameGenericCommand(redisC
  
      incrRefCount(o);
      deleteIfVolatile(c->db,c->argv[2]);
-     if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
+     if (dbAdd(c->db,c->argv[2],o) == REDIS_ERR) {
          if (nx) {
              decrRefCount(o);
              addReply(c,shared.czero);
              return;
          }
-         dictReplace(c->db->dict,c->argv[2],o);
-     } else {
-         incrRefCount(c->argv[2]);
+         dbReplace(c->db,c->argv[2],o);
      }
-     deleteKey(c->db,c->argv[1]);
+     dbDelete(c->db,c->argv[1]);
      touchWatchedKey(c->db,c->argv[2]);
      server.dirty++;
      addReply(c,nx ? shared.cone : shared.ok);
@@@ -4795,225 -4820,38 +4869,223 @@@ static void moveCommand(redisClient *c
  
      /* Try to add the element to the target DB */
      deleteIfVolatile(dst,c->argv[1]);
-     if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
+     if (dbAdd(dst,c->argv[1],o) == REDIS_ERR) {
          addReply(c,shared.czero);
          return;
      }
-     incrRefCount(c->argv[1]);
      incrRefCount(o);
  
      /* OK! key moved, free the entry in the source DB */
-     deleteKey(src,c->argv[1]);
+     dbDelete(src,c->argv[1]);
      server.dirty++;
      addReply(c,shared.cone);
  }
  
  /* =================================== Lists ================================ */
 -static void pushGenericCommand(redisClient *c, int where) {
 -    robj *lobj;
 -    list *list;
 +static void lPush(robj *subject, robj *value, int where) {
 +    if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
 +        int pos = (where == REDIS_HEAD) ? ZIPLIST_HEAD : ZIPLIST_TAIL;
 +        value = getDecodedObject(value);
 +        subject->ptr = ziplistPush(subject->ptr,value->ptr,sdslen(value->ptr),pos);
 +        decrRefCount(value);
 +    } else if (subject->encoding == REDIS_ENCODING_LIST) {
 +        if (where == REDIS_HEAD) {
 +            listAddNodeHead(subject->ptr,value);
 +        } else {
 +            listAddNodeTail(subject->ptr,value);
 +        }
 +        incrRefCount(value);
 +    } else {
 +        redisPanic("Unknown list encoding");
 +    }
 +}
 +
 +static robj *lPop(robj *subject, int where) {
 +    robj *value = NULL;
 +    if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
 +        unsigned char *p;
 +        unsigned char *vstr;
 +        unsigned int vlen;
 +        long long vlong;
 +        int pos = (where == REDIS_HEAD) ? 0 : -1;
 +        p = ziplistIndex(subject->ptr,pos);
 +        if (ziplistGet(p,&vstr,&vlen,&vlong)) {
 +            if (vstr) {
 +                value = createStringObject((char*)vstr,vlen);
 +            } else {
 +                value = createStringObjectFromLongLong(vlong);
 +            }
 +            /* We only need to delete an element when it exists */
 +            subject->ptr = ziplistDelete(subject->ptr,&p);
 +        }
 +    } else if (subject->encoding == REDIS_ENCODING_LIST) {
 +        list *list = subject->ptr;
 +        listNode *ln;
 +        if (where == REDIS_HEAD) {
 +            ln = listFirst(list);
 +        } else {
 +            ln = listLast(list);
 +        }
 +        if (ln != NULL) {
 +            value = listNodeValue(ln);
 +            incrRefCount(value);
 +            listDelNode(list,ln);
 +        }
 +    } else {
 +        redisPanic("Unknown list encoding");
 +    }
 +    return value;
 +}
 +
 +static unsigned long lLength(robj *subject) {
 +    if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
 +        return ziplistLen(subject->ptr);
 +    } else if (subject->encoding == REDIS_ENCODING_LIST) {
 +        return listLength((list*)subject->ptr);
 +    } else {
 +        redisPanic("Unknown list encoding");
 +    }
 +}
 +
 +/* Structure to hold set iteration abstraction. */
 +typedef struct {
 +    robj *subject;
 +    unsigned char encoding;
 +    unsigned char direction; /* Iteration direction */
 +    unsigned char *zi;
 +    listNode *ln;
 +} lIterator;
 +
 +/* Structure for an entry while iterating over a list. */
 +typedef struct {
 +    lIterator *li;
 +    unsigned char *zi;  /* Entry in ziplist */
 +    listNode *ln;       /* Entry in linked list */
 +} lEntry;
 +
 +/* Initialize an iterator at the specified index. */
 +static lIterator *lInitIterator(robj *subject, int index, unsigned char direction) {
 +    lIterator *li = zmalloc(sizeof(lIterator));
 +    li->subject = subject;
 +    li->encoding = subject->encoding;
 +    li->direction = direction;
 +    if (li->encoding == REDIS_ENCODING_ZIPLIST) {
 +        li->zi = ziplistIndex(subject->ptr,index);
 +    } else if (li->encoding == REDIS_ENCODING_LIST) {
 +        li->ln = listIndex(subject->ptr,index);
 +    } else {
 +        redisPanic("Unknown list encoding");
 +    }
 +    return li;
 +}
 +
 +/* Clean up the iterator. */
 +static void lReleaseIterator(lIterator *li) {
 +    zfree(li);
 +}
 +
 +/* Stores pointer to current the entry in the provided entry structure
 + * and advances the position of the iterator. Returns 1 when the current
 + * entry is in fact an entry, 0 otherwise. */
 +static int lNext(lIterator *li, lEntry *entry) {
 +    entry->li = li;
 +    if (li->encoding == REDIS_ENCODING_ZIPLIST) {
 +        entry->zi = li->zi;
 +        if (entry->zi != NULL) {
 +            if (li->direction == REDIS_TAIL)
 +                li->zi = ziplistNext(li->subject->ptr,li->zi);
 +            else
 +                li->zi = ziplistPrev(li->subject->ptr,li->zi);
 +            return 1;
 +        }
 +    } else if (li->encoding == REDIS_ENCODING_LIST) {
 +        entry->ln = li->ln;
 +        if (entry->ln != NULL) {
 +            if (li->direction == REDIS_TAIL)
 +                li->ln = li->ln->next;
 +            else
 +                li->ln = li->ln->prev;
 +            return 1;
 +        }
 +    } else {
 +        redisPanic("Unknown list encoding");
 +    }
 +    return 0;
 +}
 +
 +/* Return entry or NULL at the current position of the iterator. */
 +static robj *lGet(lEntry *entry) {
 +    lIterator *li = entry->li;
 +    robj *value = NULL;
 +    if (li->encoding == REDIS_ENCODING_ZIPLIST) {
 +        unsigned char *vstr;
 +        unsigned int vlen;
 +        long long vlong;
 +        redisAssert(entry->zi != NULL);
 +        if (ziplistGet(entry->zi,&vstr,&vlen,&vlong)) {
 +            if (vstr) {
 +                value = createStringObject((char*)vstr,vlen);
 +            } else {
 +                value = createStringObjectFromLongLong(vlong);
 +            }
 +        }
 +    } else if (li->encoding == REDIS_ENCODING_LIST) {
 +        redisAssert(entry->ln != NULL);
 +        value = listNodeValue(entry->ln);
 +        incrRefCount(value);
 +    } else {
 +        redisPanic("Unknown list encoding");
 +    }
 +    return value;
 +}
 +
 +/* Compare the given object with the entry at the current position. */
 +static int lEqual(lEntry *entry, robj *o) {
 +    lIterator *li = entry->li;
 +    if (li->encoding == REDIS_ENCODING_ZIPLIST) {
 +        redisAssert(o->encoding == REDIS_ENCODING_RAW);
 +        return ziplistCompare(entry->zi,o->ptr,sdslen(o->ptr));
 +    } else if (li->encoding == REDIS_ENCODING_LIST) {
 +        return equalStringObjects(o,listNodeValue(entry->ln));
 +    } else {
 +        redisPanic("Unknown list encoding");
 +    }
 +}
 +
 +/* Delete the element pointed to. */
 +static void lDelete(lEntry *entry) {
 +    lIterator *li = entry->li;
 +    if (li->encoding == REDIS_ENCODING_ZIPLIST) {
 +        unsigned char *p = entry->zi;
 +        li->subject->ptr = ziplistDelete(li->subject->ptr,&p);
  
 -    lobj = lookupKeyWrite(c->db,c->argv[1]);
 +        /* Update position of the iterator depending on the direction */
 +        if (li->direction == REDIS_TAIL)
 +            li->zi = p;
 +        else
 +            li->zi = ziplistPrev(li->subject->ptr,p);
 +    } else if (entry->li->encoding == REDIS_ENCODING_LIST) {
 +        listNode *next;
 +        if (li->direction == REDIS_TAIL)
 +            next = entry->ln->next;
 +        else
 +            next = entry->ln->prev;
 +        listDelNode(li->subject->ptr,entry->ln);
 +        li->ln = next;
 +    } else {
 +        redisPanic("Unknown list encoding");
 +    }
 +}
 +
 +static void pushGenericCommand(redisClient *c, int where) {
 +    robj *lobj = lookupKeyWrite(c->db,c->argv[1]);
      if (lobj == NULL) {
          if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) {
              addReply(c,shared.cone);
              return;
          }
 -        lobj = createListObject();
 -        list = lobj->ptr;
 -        if (where == REDIS_HEAD) {
 -            listAddNodeHead(list,c->argv[2]);
 -        } else {
 -            listAddNodeTail(list,c->argv[2]);
 -        }
 -        incrRefCount(c->argv[2]);
 +        lobj = createZiplistObject();
-         dictAdd(c->db->dict,c->argv[1],lobj);
-         incrRefCount(c->argv[1]);
+         dbAdd(c->db,c->argv[1],lobj);
      } else {
          if (lobj->type != REDIS_LIST) {
              addReply(c,shared.wrongtypeerr);
              addReply(c,shared.cone);
              return;
          }
 -        list = lobj->ptr;
 -        if (where == REDIS_HEAD) {
 -            listAddNodeHead(list,c->argv[2]);
 -        } else {
 -            listAddNodeTail(list,c->argv[2]);
 -        }
 -        incrRefCount(c->argv[2]);
      }
 +    lPush(lobj,c->argv[2],where);
 +    addReplyLongLong(c,lLength(lobj));
      server.dirty++;
 -    addReplyLongLong(c,listLength(list));
  }
  
  static void lpushCommand(redisClient *c) {
@@@ -5038,93 -4882,80 +5110,93 @@@ static void rpushCommand(redisClient *c
  }
  
  static void llenCommand(redisClient *c) {
 -    robj *o;
 -    list *l;
 -
 -    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
 -        checkType(c,o,REDIS_LIST)) return;
 -
 -    l = o->ptr;
 -    addReplyUlong(c,listLength(l));
 +    robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.czero);
 +    if (o == NULL || checkType(c,o,REDIS_LIST)) return;
 +    addReplyUlong(c,lLength(o));
  }
  
  static void lindexCommand(redisClient *c) {
 -    robj *o;
 +    robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk);
 +    if (o == NULL || checkType(c,o,REDIS_LIST)) return;
      int index = atoi(c->argv[2]->ptr);
 -    list *list;
 -    listNode *ln;
 -
 -    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
 -        checkType(c,o,REDIS_LIST)) return;
 -    list = o->ptr;
 +    robj *value = NULL;
  
 -    ln = listIndex(list, index);
 -    if (ln == NULL) {
 -        addReply(c,shared.nullbulk);
 +    if (o->encoding == REDIS_ENCODING_ZIPLIST) {
 +        unsigned char *p;
 +        unsigned char *vstr;
 +        unsigned int vlen;
 +        long long vlong;
 +        p = ziplistIndex(o->ptr,index);
 +        if (ziplistGet(p,&vstr,&vlen,&vlong)) {
 +            if (vstr) {
 +                value = createStringObject((char*)vstr,vlen);
 +            } else {
 +                value = createStringObjectFromLongLong(vlong);
 +            }
 +            addReplyBulk(c,value);
 +            decrRefCount(value);
 +        } else {
 +            addReply(c,shared.nullbulk);
 +        }
 +    } else if (o->encoding == REDIS_ENCODING_LIST) {
 +        listNode *ln = listIndex(o->ptr,index);
 +        if (ln != NULL) {
 +            value = listNodeValue(ln);
 +            addReplyBulk(c,value);
 +        } else {
 +            addReply(c,shared.nullbulk);
 +        }
      } else {
 -        robj *ele = listNodeValue(ln);
 -        addReplyBulk(c,ele);
 +        redisPanic("Unknown list encoding");
      }
  }
  
  static void lsetCommand(redisClient *c) {
 -    robj *o;
 +    robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
 +    if (o == NULL || checkType(c,o,REDIS_LIST)) return;
      int index = atoi(c->argv[2]->ptr);
 -    list *list;
 -    listNode *ln;
 +    robj *value = c->argv[3];
  
 -    if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr)) == NULL ||
 -        checkType(c,o,REDIS_LIST)) return;
 -    list = o->ptr;
 -
 -    ln = listIndex(list, index);
 -    if (ln == NULL) {
 -        addReply(c,shared.outofrangeerr);
 +    if (o->encoding == REDIS_ENCODING_ZIPLIST) {
 +        unsigned char *p, *zl = o->ptr;
 +        p = ziplistIndex(zl,index);
 +        if (p == NULL) {
 +            addReply(c,shared.outofrangeerr);
 +        } else {
 +            o->ptr = ziplistDelete(o->ptr,&p);
 +            value = getDecodedObject(value);
 +            o->ptr = ziplistInsert(o->ptr,p,value->ptr,sdslen(value->ptr));
 +            decrRefCount(value);
 +            addReply(c,shared.ok);
 +            server.dirty++;
 +        }
 +    } else if (o->encoding == REDIS_ENCODING_LIST) {
 +        listNode *ln = listIndex(o->ptr,index);
 +        if (ln == NULL) {
 +            addReply(c,shared.outofrangeerr);
 +        } else {
 +            decrRefCount((robj*)listNodeValue(ln));
 +            listNodeValue(ln) = value;
 +            incrRefCount(value);
 +            addReply(c,shared.ok);
 +            server.dirty++;
 +        }
      } else {
 -        robj *ele = listNodeValue(ln);
 -
 -        decrRefCount(ele);
 -        listNodeValue(ln) = c->argv[3];
 -        incrRefCount(c->argv[3]);
 -        addReply(c,shared.ok);
 -        server.dirty++;
 +        redisPanic("Unknown list encoding");
      }
  }
  
  static void popGenericCommand(redisClient *c, int where) {
 -    robj *o;
 -    list *list;
 -    listNode *ln;
 +    robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk);
 +    if (o == NULL || checkType(c,o,REDIS_LIST)) return;
  
 -    if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
 -        checkType(c,o,REDIS_LIST)) return;
 -    list = o->ptr;
 -
 -    if (where == REDIS_HEAD)
 -        ln = listFirst(list);
 -    else
 -        ln = listLast(list);
 -
 -    if (ln == NULL) {
 +    robj *value = lPop(o,where);
 +    if (value == NULL) {
          addReply(c,shared.nullbulk);
      } else {
 -        robj *ele = listNodeValue(ln);
 -        addReplyBulk(c,ele);
 -        listDelNode(list,ln);
 -        if (listLength(list) == 0) dbDelete(c->db,c->argv[1]);
 +        addReplyBulk(c,value);
 +        decrRefCount(value);
-         if (lLength(o) == 0) deleteKey(c->db,c->argv[1]);
++        if (lLength(o) == 0) dbDelete(c->db,c->argv[1]);
          server.dirty++;
      }
  }
@@@ -5138,16 -4969,19 +5210,16 @@@ static void rpopCommand(redisClient *c
  }
  
  static void lrangeCommand(redisClient *c) {
 -    robj *o;
 +    robj *o, *value;
      int start = atoi(c->argv[2]->ptr);
      int end = atoi(c->argv[3]->ptr);
      int llen;
      int rangelen, j;
 -    list *list;
 -    listNode *ln;
 -    robj *ele;
 +    lEntry entry;
  
      if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
           || checkType(c,o,REDIS_LIST)) return;
 -    list = o->ptr;
 -    llen = listLength(list);
 +    llen = lLength(o);
  
      /* convert negative indexes */
      if (start < 0) start = llen+start;
      rangelen = (end-start)+1;
  
      /* Return the result in form of a multi-bulk reply */
 -    ln = listIndex(list, start);
      addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
 +    lIterator *li = lInitIterator(o,start,REDIS_TAIL);
      for (j = 0; j < rangelen; j++) {
 -        ele = listNodeValue(ln);
 -        addReplyBulk(c,ele);
 -        ln = ln->next;
 +        redisAssert(lNext(li,&entry));
 +        value = lGet(&entry);
 +        addReplyBulk(c,value);
 +        decrRefCount(value);
      }
 +    lReleaseIterator(li);
  }
  
  static void ltrimCommand(redisClient *c) {
  
      if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.ok)) == NULL ||
          checkType(c,o,REDIS_LIST)) return;
 -    list = o->ptr;
 -    llen = listLength(list);
 +    llen = lLength(o);
  
      /* convert negative indexes */
      if (start < 0) start = llen+start;
      }
  
      /* Remove list elements to perform the trim */
 -    for (j = 0; j < ltrim; j++) {
 -        ln = listFirst(list);
 -        listDelNode(list,ln);
 -    }
 -    for (j = 0; j < rtrim; j++) {
 -        ln = listLast(list);
 -        listDelNode(list,ln);
 +    if (o->encoding == REDIS_ENCODING_ZIPLIST) {
 +        o->ptr = ziplistDeleteRange(o->ptr,0,ltrim);
 +        o->ptr = ziplistDeleteRange(o->ptr,-rtrim,rtrim);
 +    } else if (o->encoding == REDIS_ENCODING_LIST) {
 +        list = o->ptr;
 +        for (j = 0; j < ltrim; j++) {
 +            ln = listFirst(list);
 +            listDelNode(list,ln);
 +        }
 +        for (j = 0; j < rtrim; j++) {
 +            ln = listLast(list);
 +            listDelNode(list,ln);
 +        }
 +    } else {
 +        redisPanic("Unknown list encoding");
      }
-     if (lLength(o) == 0) deleteKey(c->db,c->argv[1]);
 -    if (listLength(list) == 0) dbDelete(c->db,c->argv[1]);
++    if (lLength(o) == 0) dbDelete(c->db,c->argv[1]);
      server.dirty++;
      addReply(c,shared.ok);
  }
  
  static void lremCommand(redisClient *c) {
 -    robj *o;
 -    list *list;
 -    listNode *ln, *next;
 +    robj *subject, *obj = c->argv[3];
      int toremove = atoi(c->argv[2]->ptr);
      int removed = 0;
 -    int fromtail = 0;
 +    lEntry entry;
  
 -    if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL ||
 -        checkType(c,o,REDIS_LIST)) return;
 -    list = o->ptr;
 +    subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero);
 +    if (subject == NULL || checkType(c,subject,REDIS_LIST)) return;
 +
 +    /* Make sure obj is raw when we're dealing with a ziplist */
 +    if (subject->encoding == REDIS_ENCODING_ZIPLIST)
 +        obj = getDecodedObject(obj);
  
 +    lIterator *li;
      if (toremove < 0) {
          toremove = -toremove;
 -        fromtail = 1;
 +        li = lInitIterator(subject,-1,REDIS_HEAD);
 +    } else {
 +        li = lInitIterator(subject,0,REDIS_TAIL);
      }
 -    ln = fromtail ? list->tail : list->head;
 -    while (ln) {
 -        robj *ele = listNodeValue(ln);
  
 -        next = fromtail ? ln->prev : ln->next;
 -        if (equalStringObjects(ele,c->argv[3])) {
 -            listDelNode(list,ln);
 +    while (lNext(li,&entry)) {
 +        if (lEqual(&entry,obj)) {
 +            lDelete(&entry);
              server.dirty++;
              removed++;
              if (toremove && removed == toremove) break;
          }
 -        ln = next;
      }
 -    if (listLength(list) == 0) dbDelete(c->db,c->argv[1]);
 +    lReleaseIterator(li);
 +
 +    /* Clean up raw encoded object */
 +    if (subject->encoding == REDIS_ENCODING_ZIPLIST)
 +        decrRefCount(obj);
 +
-     if (lLength(subject) == 0) deleteKey(c->db,c->argv[1]);
++    if (lLength(subject) == 0) dbDelete(c->db,c->argv[1]);
      addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
  }
  
   * as well. This command was originally proposed by Ezra Zygmuntowicz.
   */
  static void rpoplpushcommand(redisClient *c) {
 -    robj *sobj;
 -    list *srclist;
 -    listNode *ln;
 -
 +    robj *sobj, *value;
      if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
          checkType(c,sobj,REDIS_LIST)) return;
 -    srclist = sobj->ptr;
 -    ln = listLast(srclist);
  
 -    if (ln == NULL) {
 +    if (lLength(sobj) == 0) {
          addReply(c,shared.nullbulk);
      } else {
          robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
 -        robj *ele = listNodeValue(ln);
 -        list *dstlist;
 -
 -        if (dobj && dobj->type != REDIS_LIST) {
 -            addReply(c,shared.wrongtypeerr);
 -            return;
 -        }
 +        if (dobj && checkType(c,dobj,REDIS_LIST)) return;
 +        value = lPop(sobj,REDIS_TAIL);
  
          /* Add the element to the target list (unless it's directly
           * passed to some BLPOP-ing client */
 -        if (!handleClientsWaitingListPush(c,c->argv[2],ele)) {
 -            if (dobj == NULL) {
 -                /* Create the list if the key does not exist */
 -                dobj = createListObject();
 +        if (!handleClientsWaitingListPush(c,c->argv[2],value)) {
 +            /* Create the list if the key does not exist */
 +            if (!dobj) {
 +                dobj = createZiplistObject();
-                 dictAdd(c->db->dict,c->argv[2],dobj);
-                 incrRefCount(c->argv[2]);
+                 dbAdd(c->db,c->argv[2],dobj);
              }
 -            dstlist = dobj->ptr;
 -            listAddNodeHead(dstlist,ele);
 -            incrRefCount(ele);
 +            lPush(dobj,value,REDIS_HEAD);
          }
  
          /* Send the element to the client as reply as well */
 -        addReplyBulk(c,ele);
 +        addReplyBulk(c,value);
  
 -        /* Finally remove the element from the source list */
 -        listDelNode(srclist,ln);
 -        if (listLength(srclist) == 0) dbDelete(c->db,c->argv[1]);
 +        /* lPop returns an object with its refcount incremented */
 +        decrRefCount(value);
 +
 +        /* Delete the source list when it is empty */
-         if (lLength(sobj) == 0) deleteKey(c->db,c->argv[1]);
++        if (lLength(sobj) == 0) dbDelete(c->db,c->argv[1]);
          server.dirty++;
      }
  }
@@@ -5326,8 -5154,7 +5397,7 @@@ static void saddCommand(redisClient *c
      set = lookupKeyWrite(c->db,c->argv[1]);
      if (set == NULL) {
          set = createSetObject();
-         dictAdd(c->db->dict,c->argv[1],set);
-         incrRefCount(c->argv[1]);
+         dbAdd(c->db,c->argv[1],set);
      } else {
          if (set->type != REDIS_SET) {
              addReply(c,shared.wrongtypeerr);
@@@ -5352,7 -5179,7 +5422,7 @@@ static void sremCommand(redisClient *c
      if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
          server.dirty++;
          if (htNeedsResize(set->ptr)) dictResize(set->ptr);
-         if (dictSize((dict*)set->ptr) == 0) deleteKey(c->db,c->argv[1]);
+         if (dictSize((dict*)set->ptr) == 0) dbDelete(c->db,c->argv[1]);
          addReply(c,shared.cone);
      } else {
          addReply(c,shared.czero);
@@@ -5383,13 -5210,12 +5453,12 @@@ static void smoveCommand(redisClient *c
          return;
      }
      if (dictSize((dict*)srcset->ptr) == 0 && srcset != dstset)
-         deleteKey(c->db,c->argv[1]);
+         dbDelete(c->db,c->argv[1]);
      server.dirty++;
      /* Add the element to the destination set */
      if (!dstset) {
          dstset = createSetObject();
-         dictAdd(c->db->dict,c->argv[2],dstset);
-         incrRefCount(c->argv[2]);
+         dbAdd(c->db,c->argv[2],dstset);
      }
      if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
          incrRefCount(c->argv[3]);
@@@ -5435,7 -5261,7 +5504,7 @@@ static void spopCommand(redisClient *c
          addReplyBulk(c,ele);
          dictDelete(set->ptr,ele);
          if (htNeedsResize(set->ptr)) dictResize(set->ptr);
-         if (dictSize((dict*)set->ptr) == 0) deleteKey(c->db,c->argv[1]);
+         if (dictSize((dict*)set->ptr) == 0) dbDelete(c->db,c->argv[1]);
          server.dirty++;
      }
  }
@@@ -5479,7 -5305,7 +5548,7 @@@ static void sinterGenericCommand(redisC
          if (!setobj) {
              zfree(dv);
              if (dstkey) {
-                 if (deleteKey(c->db,dstkey))
+                 if (dbDelete(c->db,dstkey))
                      server.dirty++;
                  addReply(c,shared.czero);
              } else {
      if (dstkey) {
          /* Store the resulting set into the target, if the intersection
           * is not an empty set. */
-         deleteKey(c->db,dstkey);
+         dbDelete(c->db,dstkey);
          if (dictSize((dict*)dstset->ptr) > 0) {
-             dictAdd(c->db->dict,dstkey,dstset);
-             incrRefCount(dstkey);
+             dbAdd(c->db,dstkey,dstset);
              addReplyLongLong(c,dictSize((dict*)dstset->ptr));
          } else {
              decrRefCount(dstset);
@@@ -5642,10 -5467,9 +5710,9 @@@ static void sunionDiffGenericCommand(re
      } else {
          /* If we have a target key where to store the resulting set
           * create this key with the result set inside */
-         deleteKey(c->db,dstkey);
+         dbDelete(c->db,dstkey);
          if (dictSize((dict*)dstset->ptr) > 0) {
-             dictAdd(c->db->dict,dstkey,dstset);
-             incrRefCount(dstkey);
+             dbAdd(c->db,dstkey,dstset);
              addReplyLongLong(c,dictSize((dict*)dstset->ptr));
          } else {
              decrRefCount(dstset);
@@@ -6001,8 -5825,7 +6068,7 @@@ static void zaddGenericCommand(redisCli
      zsetobj = lookupKeyWrite(c->db,key);
      if (zsetobj == NULL) {
          zsetobj = createZsetObject();
-         dictAdd(c->db->dict,key,zsetobj);
-         incrRefCount(key);
+         dbAdd(c->db,key,zsetobj);
      } else {
          if (zsetobj->type != REDIS_ZSET) {
              addReply(c,shared.wrongtypeerr);
@@@ -6118,7 -5941,7 +6184,7 @@@ static void zremCommand(redisClient *c
      /* Delete from the hash table */
      dictDelete(zs->dict,c->argv[2]);
      if (htNeedsResize(zs->dict)) dictResize(zs->dict);
-     if (dictSize(zs->dict) == 0) deleteKey(c->db,c->argv[1]);
+     if (dictSize(zs->dict) == 0) dbDelete(c->db,c->argv[1]);
      server.dirty++;
      addReply(c,shared.cone);
  }
@@@ -6139,7 -5962,7 +6205,7 @@@ static void zremrangebyscoreCommand(red
      zs = zsetobj->ptr;
      deleted = zslDeleteRangeByScore(zs->zsl,min,max,zs->dict);
      if (htNeedsResize(zs->dict)) dictResize(zs->dict);
-     if (dictSize(zs->dict) == 0) deleteKey(c->db,c->argv[1]);
+     if (dictSize(zs->dict) == 0) dbDelete(c->db,c->argv[1]);
      server.dirty += deleted;
      addReplyLongLong(c,deleted);
  }
@@@ -6177,7 -6000,7 +6243,7 @@@ static void zremrangebyrankCommand(redi
       * use 1-based rank */
      deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);
      if (htNeedsResize(zs->dict)) dictResize(zs->dict);
-     if (dictSize(zs->dict) == 0) deleteKey(c->db,c->argv[1]);
+     if (dictSize(zs->dict) == 0) dbDelete(c->db,c->argv[1]);
      server.dirty += deleted;
      addReplyLongLong(c, deleted);
  }
@@@ -6365,10 -6188,9 +6431,9 @@@ static void zunionInterGenericCommand(r
          redisAssert(op == REDIS_OP_INTER || op == REDIS_OP_UNION);
      }
  
-     deleteKey(c->db,dstkey);
+     dbDelete(c->db,dstkey);
      if (dstzset->zsl->length) {
-         dictAdd(c->db->dict,dstkey,dstobj);
-         incrRefCount(dstkey);
+         dbAdd(c->db,dstkey,dstobj);
          addReplyLongLong(c, dstzset->zsl->length);
          server.dirty++;
      } else {
@@@ -6844,8 -6666,7 +6909,7 @@@ static robj *hashLookupWriteOrCreate(re
      robj *o = lookupKeyWrite(c->db,key);
      if (o == NULL) {
          o = createHashObject();
-         dictAdd(c->db->dict,key,o);
-         incrRefCount(key);
+         dbAdd(c->db,key,o);
      } else {
          if (o->type != REDIS_HASH) {
              addReply(c,shared.wrongtypeerr);
@@@ -6969,7 -6790,7 +7033,7 @@@ static void hdelCommand(redisClient *c
          checkType(c,o,REDIS_HASH)) return;
  
      if (hashDelete(o,c->argv[2])) {
-         if (hashLength(o) == 0) deleteKey(c->db,c->argv[1]);
+         if (hashLength(o) == 0) dbDelete(c->db,c->argv[1]);
          addReply(c,shared.cone);
          server.dirty++;
      } else {
@@@ -7210,7 -7031,7 +7274,7 @@@ static int sortCompare(const void *s1, 
   * is optimized for speed and a bit less for readability */
  static void sortCommand(redisClient *c) {
      list *operations;
 -    int outputlen = 0;
 +    unsigned int outputlen = 0;
      int desc = 0, alpha = 0;
      int limit_start = 0, limit_count = -1, start, end;
      int j, dontsort = 0, vectorlen;
  
      /* Load the sorting vector with all the objects to sort */
      switch(sortval->type) {
 -    case REDIS_LIST: vectorlen = listLength((list*)sortval->ptr); break;
 +    case REDIS_LIST: vectorlen = lLength(sortval); break;
      case REDIS_SET: vectorlen =  dictSize((dict*)sortval->ptr); break;
      case REDIS_ZSET: vectorlen = dictSize(((zset*)sortval->ptr)->dict); break;
      default: vectorlen = 0; redisPanic("Bad SORT type"); /* Avoid GCC warning */
      j = 0;
  
      if (sortval->type == REDIS_LIST) {
 -        list *list = sortval->ptr;
 -        listNode *ln;
 -        listIter li;
 -
 -        listRewind(list,&li);
 -        while((ln = listNext(&li))) {
 -            robj *ele = ln->value;
 -            vector[j].obj = ele;
 +        lIterator *li = lInitIterator(sortval,0,REDIS_TAIL);
 +        lEntry entry;
 +        while(lNext(li,&entry)) {
 +            vector[j].obj = lGet(&entry);
              vector[j].u.score = 0;
              vector[j].u.cmpobj = NULL;
              j++;
          }
 +        lReleaseIterator(li);
      } else {
          dict *set;
          dictIterator *di;
              }
          }
      } else {
 -        robj *listObject = createListObject();
 -        list *listPtr = (list*) listObject->ptr;
 +        robj *sobj = createZiplistObject();
  
          /* STORE option specified, set the sorting result as a List object */
          for (j = start; j <= end; j++) {
              listIter li;
  
              if (!getop) {
 -                listAddNodeTail(listPtr,vector[j].obj);
 -                incrRefCount(vector[j].obj);
 -            }
 -            listRewind(operations,&li);
 -            while((ln = listNext(&li))) {
 -                redisSortOperation *sop = ln->value;
 -                robj *val = lookupKeyByPattern(c->db,sop->pattern,
 -                    vector[j].obj);
 +                lPush(sobj,vector[j].obj,REDIS_TAIL);
 +            } else {
 +                listRewind(operations,&li);
 +                while((ln = listNext(&li))) {
 +                    redisSortOperation *sop = ln->value;
 +                    robj *val = lookupKeyByPattern(c->db,sop->pattern,
 +                        vector[j].obj);
  
 -                if (sop->type == REDIS_SORT_GET) {
 -                    if (!val) {
 -                        listAddNodeTail(listPtr,createStringObject("",0));
 +                    if (sop->type == REDIS_SORT_GET) {
 +                        if (!val) val = createStringObject("",0);
 +
 +                        /* lPush does an incrRefCount, so we should take care
 +                         * care of the incremented refcount caused by either
 +                         * lookupKeyByPattern or createStringObject("",0) */
 +                        lPush(sobj,val,REDIS_TAIL);
 +                        decrRefCount(val);
                      } else {
 -                        /* We should do a incrRefCount on val because it is
 -                         * added to the list, but also a decrRefCount because
 -                         * it is returned by lookupKeyByPattern. This results
 -                         * in doing nothing at all. */
 -                        listAddNodeTail(listPtr,val);
 +                        /* always fails */
 +                        redisAssert(sop->type == REDIS_SORT_GET);
                      }
 -                } else {
 -                    redisAssert(sop->type == REDIS_SORT_GET); /* always fails */
                  }
              }
          }
-         if (dictReplace(c->db->dict,storekey,sobj)) {
-             incrRefCount(storekey);
-         }
 -        dbReplace(c->db,storekey,listObject);
++        dbReplace(c->db,storekey,sobj);
          /* Note: we add 1 because the DB is dirty anyway since even if the
           * SORT result is empty a new key is set and maybe the old content
           * replaced. */
      }
  
      /* Cleanup */
 +    if (sortval->type == REDIS_LIST)
 +        for (j = 0; j < vectorlen; j++)
 +            decrRefCount(vector[j].obj);
      decrRefCount(sortval);
      listRelease(operations);
      for (j = 0; j < vectorlen; j++) {
@@@ -7622,7 -7443,7 +7684,7 @@@ static void monitorCommand(redisClient 
  
  /* ================================= Expire ================================= */
  static int removeExpire(redisDb *db, robj *key) {
-     if (dictDelete(db->expires,key) == DICT_OK) {
+     if (dictDelete(db->expires,key->ptr) == DICT_OK) {
          return 1;
      } else {
          return 0;
  }
  
  static int setExpire(redisDb *db, robj *key, time_t when) {
-     if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
+     sds copy = sdsdup(key->ptr);
+     if (dictAdd(db->expires,copy,(void*)when) == DICT_ERR) {
+         sdsfree(copy);
          return 0;
      } else {
-         incrRefCount(key);
          return 1;
      }
  }
@@@ -7645,7 -7467,7 +7708,7 @@@ static time_t getExpire(redisDb *db, ro
  
      /* No expire? return ASAP */
      if (dictSize(db->expires) == 0 ||
-        (de = dictFind(db->expires,key)) == NULL) return -1;
+        (de = dictFind(db->expires,key->ptr)) == NULL) return -1;
  
      return (time_t) dictGetEntryVal(de);
  }
@@@ -7656,16 -7478,16 +7719,16 @@@ static int expireIfNeeded(redisDb *db, 
  
      /* No expire? return ASAP */
      if (dictSize(db->expires) == 0 ||
-        (de = dictFind(db->expires,key)) == NULL) return 0;
+        (de = dictFind(db->expires,key->ptr)) == NULL) return 0;
  
      /* Lookup the expire */
      when = (time_t) dictGetEntryVal(de);
      if (time(NULL) <= when) return 0;
  
      /* Delete the key */
-     dictDelete(db->expires,key);
+     dbDelete(db,key);
      server.stat_expiredkeys++;
-     return dictDelete(db->dict,key) == DICT_OK;
+     return 1;
  }
  
  static int deleteIfVolatile(redisDb *db, robj *key) {
  
      /* No expire? return ASAP */
      if (dictSize(db->expires) == 0 ||
-        (de = dictFind(db->expires,key)) == NULL) return 0;
+        (de = dictFind(db->expires,key->ptr)) == NULL) return 0;
  
      /* Delete the key */
      server.dirty++;
      server.stat_expiredkeys++;
-     dictDelete(db->expires,key);
-     return dictDelete(db->dict,key) == DICT_OK;
+     dictDelete(db->expires,key->ptr);
+     return dictDelete(db->dict,key->ptr) == DICT_OK;
  }
  
  static void expireGenericCommand(redisClient *c, robj *key, robj *param, long offset) {
  
      seconds -= offset;
  
-     de = dictFind(c->db->dict,key);
+     de = dictFind(c->db->dict,key->ptr);
      if (de == NULL) {
          addReply(c,shared.czero);
          return;
      }
      if (seconds <= 0) {
-         if (deleteKey(c->db,key)) server.dirty++;
+         if (dbDelete(c->db,key)) server.dirty++;
          addReply(c, shared.cone);
          return;
      } else {
@@@ -8465,7 -8287,7 +8528,7 @@@ static void freeMemoryIfNeeded(void) 
                          minttl = t;
                      }
                  }
-                 deleteKey(server.db+j,minkey);
+                 dbDelete(server.db+j,minkey);
              }
          }
          if (!freed) return; /* nothing to free... */
  
  /* ============================== Append Only file ========================== */
  
+ /* Called when the user switches from "appendonly yes" to "appendonly no"
+  * at runtime using the CONFIG command. */
+ static void stopAppendOnly(void) {
+     flushAppendOnlyFile();
+     aof_fsync(server.appendfd);
+     close(server.appendfd);
+     server.appendfd = -1;
+     server.appendseldb = -1;
+     server.appendonly = 0;
+     /* rewrite operation in progress? kill it, wait child exit */
+     if (server.bgsavechildpid != -1) {
+         int statloc;
+         if (kill(server.bgsavechildpid,SIGKILL) != -1)
+             wait3(&statloc,0,NULL);
+         /* reset the buffer accumulating changes while the child saves */
+         sdsfree(server.bgrewritebuf);
+         server.bgrewritebuf = sdsempty();
+         server.bgsavechildpid = -1;
+     }
+ }
+ /* Called when the user switches from "appendonly no" to "appendonly yes"
+  * at runtime using the CONFIG command. */
+ static int startAppendOnly(void) {
+     server.appendonly = 1;
+     server.lastfsync = time(NULL);
+     server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
+     if (server.appendfd == -1) {
+         redisLog(REDIS_WARNING,"Used tried to switch on AOF via CONFIG, but I can't open the AOF file: %s",strerror(errno));
+         return REDIS_ERR;
+     }
+     if (rewriteAppendOnlyFileBackground() == REDIS_ERR) {
+         server.appendonly = 0;
+         close(server.appendfd);
+         redisLog(REDIS_WARNING,"Used tried to switch on AOF via CONFIG, I can't trigger a background AOF rewrite operation. Check the above logs for more info about the error.",strerror(errno));
+         return REDIS_ERR;
+     }
+     return REDIS_OK;
+ }
  /* Write the append only file buffer on disk.
   *
   * Since we are required to write the AOF before replying to the client,
@@@ -8507,6 -8371,11 +8612,11 @@@ static void flushAppendOnlyFile(void) 
      sdsfree(server.aofbuf);
      server.aofbuf = sdsempty();
  
+     /* Don't Fsync if no-appendfsync-on-rewrite is set to yes and we have
+      * childs performing heavy I/O on disk. */
+     if (server.no_appendfsync_on_rewrite &&
+         (server.bgrewritechildpid != -1 || server.bgsavechildpid != -1))
+             return;
      /* Fsync if needed */
      now = time(NULL);
      if (server.appendfsync == APPENDFSYNC_ALWAYS ||
@@@ -8633,7 -8502,6 +8743,6 @@@ int loadAppendOnlyFile(char *filename) 
      struct redisClient *fakeClient;
      FILE *fp = fopen(filename,"r");
      struct redis_stat sb;
-     unsigned long long loadedkeys = 0;
      int appendonly = server.appendonly;
  
      if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0)
          char buf[128];
          sds argsds;
          struct redisCommand *cmd;
+         int force_swapout;
  
          if (fgets(buf,sizeof(buf),fp) == NULL) {
              if (feof(fp))
          for (j = 0; j < argc; j++) decrRefCount(argv[j]);
          zfree(argv);
          /* Handle swapping while loading big datasets when VM is on */
-         loadedkeys++;
-         if (server.vm_enabled && (loadedkeys % 5000) == 0) {
+         force_swapout = 0;
+         if ((zmalloc_used_memory() - server.vm_max_memory) > 1024*1024*32)
+             force_swapout = 1;
+         if (server.vm_enabled && force_swapout) {
              while (zmalloc_used_memory() > server.vm_max_memory) {
                  if (vmSwapOneObjectBlocking() == REDIS_ERR) break;
              }
@@@ -8725,17 -8597,40 +8838,17 @@@ fmterr
      exit(1);
  }
  
 -/* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
 -static int fwriteBulkObject(FILE *fp, robj *obj) {
 -    char buf[128];
 -    int decrrc = 0;
 -
 -    /* Avoid the incr/decr ref count business if possible to help
 -     * copy-on-write (we are often in a child process when this function
 -     * is called).
 -     * Also makes sure that key objects don't get incrRefCount-ed when VM
 -     * is enabled */
 -    if (obj->encoding != REDIS_ENCODING_RAW) {
 -        obj = getDecodedObject(obj);
 -        decrrc = 1;
 -    }
 -    snprintf(buf,sizeof(buf),"$%ld\r\n",(long)sdslen(obj->ptr));
 -    if (fwrite(buf,strlen(buf),1,fp) == 0) goto err;
 -    if (sdslen(obj->ptr) && fwrite(obj->ptr,sdslen(obj->ptr),1,fp) == 0)
 -        goto err;
 -    if (fwrite("\r\n",2,1,fp) == 0) goto err;
 -    if (decrrc) decrRefCount(obj);
 -    return 1;
 -err:
 -    if (decrrc) decrRefCount(obj);
 -    return 0;
 -}
 -
  /* Write binary-safe string into a file in the bulkformat
   * $<count>\r\n<payload>\r\n */
  static int fwriteBulkString(FILE *fp, char *s, unsigned long len) {
 -    char buf[128];
 -
 -    snprintf(buf,sizeof(buf),"$%ld\r\n",(unsigned long)len);
 -    if (fwrite(buf,strlen(buf),1,fp) == 0) return 0;
 -    if (len && fwrite(s,len,1,fp) == 0) return 0;
 +    char cbuf[128];
 +    int clen;
 +    cbuf[0] = '$';
 +    clen = 1+ll2string(cbuf+1,sizeof(cbuf)-1,len);
 +    cbuf[clen++] = '\r';
 +    cbuf[clen++] = '\n';
 +    if (fwrite(cbuf,clen,1,fp) == 0) return 0;
 +    if (len > 0 && fwrite(s,len,1,fp) == 0) return 0;
      if (fwrite("\r\n",2,1,fp) == 0) return 0;
      return 1;
  }
@@@ -8752,28 -8647,16 +8865,28 @@@ static int fwriteBulkDouble(FILE *fp, d
  }
  
  /* Write a long value in bulk format $<count>\r\n<payload>\r\n */
 -static int fwriteBulkLong(FILE *fp, long l) {
 -    char buf[128], lbuf[128];
 -
 -    snprintf(lbuf,sizeof(lbuf),"%ld\r\n",l);
 -    snprintf(buf,sizeof(buf),"$%lu\r\n",(unsigned long)strlen(lbuf)-2);
 -    if (fwrite(buf,strlen(buf),1,fp) == 0) return 0;
 -    if (fwrite(lbuf,strlen(lbuf),1,fp) == 0) return 0;
 +static int fwriteBulkLongLong(FILE *fp, long long l) {
 +    char bbuf[128], lbuf[128];
 +    unsigned int blen, llen;
 +    llen = ll2string(lbuf,32,l);
 +    blen = snprintf(bbuf,sizeof(bbuf),"$%u\r\n%s\r\n",llen,lbuf);
 +    if (fwrite(bbuf,blen,1,fp) == 0) return 0;
      return 1;
  }
  
 +/* Delegate writing an object to writing a bulk string or bulk long long. */
 +static int fwriteBulkObject(FILE *fp, robj *obj) {
 +    /* Avoid using getDecodedObject to help copy-on-write (we are often
 +     * in a child process when this function is called). */
 +    if (obj->encoding == REDIS_ENCODING_INT) {
 +        return fwriteBulkLongLong(fp,(long)obj->ptr);
 +    } else if (obj->encoding == REDIS_ENCODING_RAW) {
 +        return fwriteBulkString(fp,obj->ptr,sdslen(obj->ptr));
 +    } else {
 +        redisPanic("Unknown string encoding");
 +    }
 +}
 +
  /* Write a sequence of commands able to fully rebuild the dataset into
   * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
  static int rewriteAppendOnlyFile(char *filename) {
  
          /* SELECT the new DB */
          if (fwrite(selectcmd,sizeof(selectcmd)-1,1,fp) == 0) goto werr;
 -        if (fwriteBulkLong(fp,j) == 0) goto werr;
 +        if (fwriteBulkLongLong(fp,j) == 0) goto werr;
  
          /* Iterate this DB writing every entry */
          while((de = dictNext(di)) != NULL) {
-             robj *key, *o;
+             sds keystr = dictGetEntryKey(de);
+             robj key, *o;
              time_t expiretime;
              int swapped;
  
-             key = dictGetEntryKey(de);
+             keystr = dictGetEntryKey(de);
+             o = dictGetEntryVal(de);
+             initStaticStringObject(key,keystr);
              /* If the value for this key is swapped, load a preview in memory.
               * We use a "swapped" flag to remember if we need to free the
               * value object instead to just increment the ref count anyway
               * in order to avoid copy-on-write of pages if we are forked() */
-             if (!server.vm_enabled || key->storage == REDIS_VM_MEMORY ||
-                 key->storage == REDIS_VM_SWAPPING) {
-                 o = dictGetEntryVal(de);
+             if (!server.vm_enabled || o->storage == REDIS_VM_MEMORY ||
+                 o->storage == REDIS_VM_SWAPPING) {
                  swapped = 0;
              } else {
-                 o = vmPreviewObject(key);
+                 o = vmPreviewObject(o);
                  swapped = 1;
              }
-             expiretime = getExpire(db,key);
+             expiretime = getExpire(db,&key);
  
              /* Save the key and associated value */
              if (o->type == REDIS_STRING) {
                  char cmd[]="*3\r\n$3\r\nSET\r\n";
                  if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
                  /* Key and value */
-                 if (fwriteBulkObject(fp,key) == 0) goto werr;
+                 if (fwriteBulkObject(fp,&key) == 0) goto werr;
                  if (fwriteBulkObject(fp,o) == 0) goto werr;
              } else if (o->type == REDIS_LIST) {
                  /* Emit the RPUSHes needed to rebuild the list */
 -                list *list = o->ptr;
 -                listNode *ln;
 -                listIter li;
 +                char cmd[]="*3\r\n$5\r\nRPUSH\r\n";
 +                if (o->encoding == REDIS_ENCODING_ZIPLIST) {
 +                    unsigned char *zl = o->ptr;
 +                    unsigned char *p = ziplistIndex(zl,0);
 +                    unsigned char *vstr;
 +                    unsigned int vlen;
 +                    long long vlong;
 +
 +                    while(ziplistGet(p,&vstr,&vlen,&vlong)) {
 +                        if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
-                         if (fwriteBulkObject(fp,key) == 0) goto werr;
++                        if (fwriteBulkObject(fp,&key) == 0) goto werr;
 +                        if (vstr) {
 +                            if (fwriteBulkString(fp,(char*)vstr,vlen) == 0)
 +                                goto werr;
 +                        } else {
 +                            if (fwriteBulkLongLong(fp,vlong) == 0)
 +                                goto werr;
 +                        }
 +                        p = ziplistNext(zl,p);
 +                    }
 +                } else if (o->encoding == REDIS_ENCODING_LIST) {
 +                    list *list = o->ptr;
 +                    listNode *ln;
 +                    listIter li;
  
 -                listRewind(list,&li);
 -                while((ln = listNext(&li))) {
 -                    char cmd[]="*3\r\n$5\r\nRPUSH\r\n";
 -                    robj *eleobj = listNodeValue(ln);
 +                    listRewind(list,&li);
 +                    while((ln = listNext(&li))) {
 +                        robj *eleobj = listNodeValue(ln);
  
 -                    if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
 -                    if (fwriteBulkObject(fp,&key) == 0) goto werr;
 -                    if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
 +                        if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
-                         if (fwriteBulkObject(fp,key) == 0) goto werr;
++                        if (fwriteBulkObject(fp,&key) == 0) goto werr;
 +                        if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
 +                    }
 +                } else {
 +                    redisPanic("Unknown list encoding");
                  }
              } else if (o->type == REDIS_SET) {
                  /* Emit the SADDs needed to rebuild the set */
                      robj *eleobj = dictGetEntryKey(de);
  
                      if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
-                     if (fwriteBulkObject(fp,key) == 0) goto werr;
+                     if (fwriteBulkObject(fp,&key) == 0) goto werr;
                      if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
                  }
                  dictReleaseIterator(di);
                      double *score = dictGetEntryVal(de);
  
                      if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
-                     if (fwriteBulkObject(fp,key) == 0) goto werr;
+                     if (fwriteBulkObject(fp,&key) == 0) goto werr;
                      if (fwriteBulkDouble(fp,*score) == 0) goto werr;
                      if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
                  }
  
                      while((p = zipmapNext(p,&field,&flen,&val,&vlen)) != NULL) {
                          if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
-                         if (fwriteBulkObject(fp,key) == 0) goto werr;
+                         if (fwriteBulkObject(fp,&key) == 0) goto werr;
                          if (fwriteBulkString(fp,(char*)field,flen) == -1)
                              return -1;
                          if (fwriteBulkString(fp,(char*)val,vlen) == -1)
                          robj *val = dictGetEntryVal(de);
  
                          if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
-                         if (fwriteBulkObject(fp,key) == 0) goto werr;
+                         if (fwriteBulkObject(fp,&key) == 0) goto werr;
                          if (fwriteBulkObject(fp,field) == -1) return -1;
                          if (fwriteBulkObject(fp,val) == -1) return -1;
                      }
                  /* If this key is already expired skip it */
                  if (expiretime < now) continue;
                  if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
-                 if (fwriteBulkObject(fp,key) == 0) goto werr;
+                 if (fwriteBulkObject(fp,&key) == 0) goto werr;
 -                if (fwriteBulkLong(fp,expiretime) == 0) goto werr;
 +                if (fwriteBulkLongLong(fp,expiretime) == 0) goto werr;
              }
              if (swapped) decrRefCount(o);
          }
  
      /* Make sure data will not remain on the OS's output buffers */
      fflush(fp);
-     fsync(fileno(fp));
+     aof_fsync(fileno(fp));
      fclose(fp);
  
      /* Use RENAME to make sure the DB file is changed atomically only
@@@ -9070,50 -8932,19 +9185,19 @@@ static void aofRemoveTempFile(pid_t chi
   * as a fully non-blocking VM.
   */
  
- /* Called when the user switches from "appendonly yes" to "appendonly no"
-  * at runtime using the CONFIG command. */
- static void stopAppendOnly(void) {
-     flushAppendOnlyFile();
-     fsync(server.appendfd);
-     close(server.appendfd);
-     server.appendfd = -1;
-     server.appendseldb = -1;
-     server.appendonly = 0;
-     /* rewrite operation in progress? kill it, wait child exit */
-     if (server.bgsavechildpid != -1) {
-         int statloc;
+ /* =================== Virtual Memory - Blocking Side  ====================== */
  
-         if (kill(server.bgsavechildpid,SIGKILL) != -1)
-             wait3(&statloc,0,NULL);
-         /* reset the buffer accumulating changes while the child saves */
-         sdsfree(server.bgrewritebuf);
-         server.bgrewritebuf = sdsempty();
-         server.bgsavechildpid = -1;
-     }
- }
+ /* Create a VM pointer object. This kind of objects are used in place of
+  * values in the key -> value hash table, for swapped out objects. */
+ static vmpointer *createVmPointer(int vtype) {
+     vmpointer *vp = zmalloc(sizeof(vmpointer));
  
- /* Called when the user switches from "appendonly no" to "appendonly yes"
-  * at runtime using the CONFIG command. */
- static int startAppendOnly(void) {
-     server.appendonly = 1;
-     server.lastfsync = time(NULL);
-     server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
-     if (server.appendfd == -1) {
-         redisLog(REDIS_WARNING,"Used tried to switch on AOF via CONFIG, but I can't open the AOF file: %s",strerror(errno));
-         return REDIS_ERR;
-     }
-     if (rewriteAppendOnlyFileBackground() == REDIS_ERR) {
-         server.appendonly = 0;
-         close(server.appendfd);
-         redisLog(REDIS_WARNING,"Used tried to switch on AOF via CONFIG, I can't trigger a background AOF rewrite operation. Check the above logs for more info about the error.",strerror(errno));
-         return REDIS_ERR;
-     }
-     return REDIS_OK;
+     vp->type = REDIS_VMPOINTER;
+     vp->storage = REDIS_VM_SWAPPED;
+     vp->vtype = vtype;
+     return vp;
  }
  
- /* =================== Virtual Memory - Blocking Side  ====================== */
  static void vmInit(void) {
      off_t totsize;
      int pipefds[2];
@@@ -9328,30 -9159,33 +9412,33 @@@ static int vmWriteObjectOnSwap(robj *o
      return REDIS_OK;
  }
  
- /* Swap the 'val' object relative to 'key' into disk. Store all the information
-  * needed to later retrieve the object into the key object.
+ /* Transfers the 'val' object to disk. Store all the information
+  * a 'vmpointer' object containing all the information needed to load the
+  * object back later is returned.
+  *
   * If we can't find enough contiguous empty pages to swap the object on disk
-  * REDIS_ERR is returned. */
- static int vmSwapObjectBlocking(robj *key, robj *val) {
+  * NULL is returned. */
+ static vmpointer *vmSwapObjectBlocking(robj *val) {
      off_t pages = rdbSavedObjectPages(val,NULL);
      off_t page;
+     vmpointer *vp;
  
-     assert(key->storage == REDIS_VM_MEMORY);
-     assert(key->refcount == 1);
-     if (vmFindContiguousPages(&page,pages) == REDIS_ERR) return REDIS_ERR;
-     if (vmWriteObjectOnSwap(val,page) == REDIS_ERR) return REDIS_ERR;
-     key->vm.page = page;
-     key->vm.usedpages = pages;
-     key->storage = REDIS_VM_SWAPPED;
-     key->vtype = val->type;
+     assert(val->storage == REDIS_VM_MEMORY);
+     assert(val->refcount == 1);
+     if (vmFindContiguousPages(&page,pages) == REDIS_ERR) return NULL;
+     if (vmWriteObjectOnSwap(val,page) == REDIS_ERR) return NULL;
+     vp = createVmPointer(val->type);
+     vp->page = page;
+     vp->usedpages = pages;
      decrRefCount(val); /* Deallocate the object from memory. */
      vmMarkPagesUsed(page,pages);
-     redisLog(REDIS_DEBUG,"VM: object %s swapped out at %lld (%lld pages)",
-         (unsigned char*) key->ptr,
+     redisLog(REDIS_DEBUG,"VM: object %p swapped out at %lld (%lld pages)",
+         (void*) val,
          (unsigned long long) page, (unsigned long long) pages);
      server.vm_stats_swapped_objects++;
      server.vm_stats_swapouts++;
-     return REDIS_OK;
+     return vp;
  }
  
  static robj *vmReadObjectFromSwap(off_t page, int type) {
      return o;
  }
  
- /* Load the value object relative to the 'key' object from swap to memory.
+ /* Load the specified object from swap to memory.
   * The newly allocated object is returned.
   *
   * If preview is true the unserialized object is returned to the caller but
-  * no changes are made to the key object, nor the pages are marked as freed */
- static robj *vmGenericLoadObject(robj *key, int preview) {
+  * the pages are not marked as freed, nor the vp object is freed. */
+ static robj *vmGenericLoadObject(vmpointer *vp, int preview) {
      robj *val;
  
-     redisAssert(key->storage == REDIS_VM_SWAPPED || key->storage == REDIS_VM_LOADING);
-     val = vmReadObjectFromSwap(key->vm.page,key->vtype);
+     redisAssert(vp->type == REDIS_VMPOINTER &&
+         (vp->storage == REDIS_VM_SWAPPED || vp->storage == REDIS_VM_LOADING));
+     val = vmReadObjectFromSwap(vp->page,vp->vtype);
      if (!preview) {
-         key->storage = REDIS_VM_MEMORY;
-         key->vm.atime = server.unixtime;
-         vmMarkPagesFree(key->vm.page,key->vm.usedpages);
-         redisLog(REDIS_DEBUG, "VM: object %s loaded from disk",
-             (unsigned char*) key->ptr);
+         redisLog(REDIS_DEBUG, "VM: object %p loaded from disk", (void*)vp);
+         vmMarkPagesFree(vp->page,vp->usedpages);
+         zfree(vp);
          server.vm_stats_swapped_objects--;
      } else {
-         redisLog(REDIS_DEBUG, "VM: object %s previewed from disk",
-             (unsigned char*) key->ptr);
+         redisLog(REDIS_DEBUG, "VM: object %p previewed from disk", (void*)vp);
      }
      server.vm_stats_swapins++;
      return val;
  }
  
- /* Plain object loading, from swap to memory */
- static robj *vmLoadObject(robj *key) {
+ /* Plain object loading, from swap to memory.
+  *
+  * 'o' is actually a redisVmPointer structure that will be freed by the call.
+  * The return value is the loaded object. */
+ static robj *vmLoadObject(robj *o) {
      /* If we are loading the object in background, stop it, we
       * need to load this object synchronously ASAP. */
-     if (key->storage == REDIS_VM_LOADING)
-         vmCancelThreadedIOJob(key);
-     return vmGenericLoadObject(key,0);
+     if (o->storage == REDIS_VM_LOADING)
+         vmCancelThreadedIOJob(o);
+     return vmGenericLoadObject((vmpointer*)o,0);
  }
  
  /* Just load the value on disk, without to modify the key.
   * This is useful when we want to perform some operation on the value
   * without to really bring it from swap to memory, like while saving the
   * dataset or rewriting the append only log. */
- static robj *vmPreviewObject(robj *key) {
-     return vmGenericLoadObject(key,1);
+ static robj *vmPreviewObject(robj *o) {
+     return vmGenericLoadObject((vmpointer*)o,1);
  }
  
  /* How a good candidate is this object for swapping?
   * proportionally, this is why we use the logarithm. This algorithm is
   * just a first try and will probably be tuned later. */
  static double computeObjectSwappability(robj *o) {
-     time_t age = server.unixtime - o->vm.atime;
+     /* actual age can be >= minage, but not < minage. As we use wrapping
+      * 21 bit clocks with minutes resolution for the LRU. */
+     time_t minage = abs(server.lruclock - o->lru);
      long asize = 0;
      list *l;
      dict *d;
      struct dictEntry *de;
      int z;
  
-     if (age <= 0) return 0;
+     if (minage <= 0) return 0;
      switch(o->type) {
      case REDIS_STRING:
          if (o->encoding != REDIS_ENCODING_RAW) {
              long elesize;
  
              elesize = (ele->encoding == REDIS_ENCODING_RAW) ?
-                             (sizeof(*o)+sdslen(ele->ptr)) :
-                             sizeof(*o);
+                             (sizeof(*o)+sdslen(ele->ptr)) : sizeof(*o);
              asize += (sizeof(listNode)+elesize)*listLength(l);
          }
          break;
              de = dictGetRandomKey(d);
              ele = dictGetEntryKey(de);
              elesize = (ele->encoding == REDIS_ENCODING_RAW) ?
-                             (sizeof(*o)+sdslen(ele->ptr)) :
-                             sizeof(*o);
+                             (sizeof(*o)+sdslen(ele->ptr)) : sizeof(*o);
              asize += (sizeof(struct dictEntry)+elesize)*dictSize(d);
              if (z) asize += sizeof(zskiplistNode)*dictSize(d);
          }
                  de = dictGetRandomKey(d);
                  ele = dictGetEntryKey(de);
                  elesize = (ele->encoding == REDIS_ENCODING_RAW) ?
-                                 (sizeof(*o)+sdslen(ele->ptr)) :
-                                 sizeof(*o);
+                                 (sizeof(*o)+sdslen(ele->ptr)) : sizeof(*o);
                  ele = dictGetEntryVal(de);
                  elesize = (ele->encoding == REDIS_ENCODING_RAW) ?
-                                 (sizeof(*o)+sdslen(ele->ptr)) :
-                                 sizeof(*o);
+                                 (sizeof(*o)+sdslen(ele->ptr)) : sizeof(*o);
                  asize += (sizeof(struct dictEntry)+elesize)*dictSize(d);
              }
          }
          break;
      }
-     return (double)age*log(1+asize);
+     return (double)minage*log(1+asize);
  }
  
  /* Try to swap an object that's a good candidate for swapping.
@@@ -9525,7 -9358,8 +9611,8 @@@ static int vmSwapOneObject(int usethrea
      struct dictEntry *best = NULL;
      double best_swappability = 0;
      redisDb *best_db = NULL;
-     robj *key, *val;
+     robj *val;
+     sds key;
  
      for (j = 0; j < server.dbnum; j++) {
          redisDb *db = server.db+j;
  
              if (maxtries) maxtries--;
              de = dictGetRandomKey(db->dict);
-             key = dictGetEntryKey(de);
              val = dictGetEntryVal(de);
              /* Only swap objects that are currently in memory.
               *
-              * Also don't swap shared objects if threaded VM is on, as we
-              * try to ensure that the main thread does not touch the
+              * Also don't swap shared objects: not a good idea in general and
+              * we need to ensure that the main thread does not touch the
               * object while the I/O thread is using it, but we can't
               * control other keys without adding additional mutex. */
-             if (key->storage != REDIS_VM_MEMORY ||
-                 (server.vm_max_threads != 0 && val->refcount != 1)) {
+             if (val->storage != REDIS_VM_MEMORY || val->refcount != 1) {
                  if (maxtries) i--; /* don't count this try */
                  continue;
              }
      val = dictGetEntryVal(best);
  
      redisLog(REDIS_DEBUG,"Key with best swappability: %s, %f",
-         key->ptr, best_swappability);
+         key, best_swappability);
  
-     /* Unshare the key if needed */
-     if (key->refcount > 1) {
-         robj *newkey = dupStringObject(key);
-         decrRefCount(key);
-         key = dictGetEntryKey(best) = newkey;
-     }
      /* Swap it */
      if (usethreads) {
-         vmSwapObjectThreaded(key,val,best_db);
+         robj *keyobj = createStringObject(key,sdslen(key));
+         vmSwapObjectThreaded(keyobj,val,best_db);
+         decrRefCount(keyobj);
          return REDIS_OK;
      } else {
-         if (vmSwapObjectBlocking(key,val) == REDIS_OK) {
-             dictGetEntryVal(best) = NULL;
+         vmpointer *vp;
+         if ((vp = vmSwapObjectBlocking(val)) != NULL) {
+             dictGetEntryVal(best) = vp;
              return REDIS_OK;
          } else {
              return REDIS_ERR;
@@@ -9604,30 -9434,20 +9687,20 @@@ static int vmCanSwapOut(void) 
      return (server.bgsavechildpid == -1 && server.bgrewritechildpid == -1);
  }
  
- /* Delete a key if swapped. Returns 1 if the key was found, was swapped
-  * and was deleted. Otherwise 0 is returned. */
- static int deleteIfSwapped(redisDb *db, robj *key) {
-     dictEntry *de;
-     robj *foundkey;
-     if ((de = dictFind(db->dict,key)) == NULL) return 0;
-     foundkey = dictGetEntryKey(de);
-     if (foundkey->storage == REDIS_VM_MEMORY) return 0;
-     deleteKey(db,key);
-     return 1;
- }
  /* =================== Virtual Memory - Threaded I/O  ======================= */
  
  static void freeIOJob(iojob *j) {
      if ((j->type == REDIS_IOJOB_PREPARE_SWAP ||
          j->type == REDIS_IOJOB_DO_SWAP ||
          j->type == REDIS_IOJOB_LOAD) && j->val != NULL)
+     {
+          /* we fix the storage type, otherwise decrRefCount() will try to
+           * kill the I/O thread Job (that does no longer exists). */
+         if (j->val->storage == REDIS_VM_SWAPPING)
+             j->val->storage = REDIS_VM_MEMORY;
          decrRefCount(j->val);
-     /* We don't decrRefCount the j->key field as we did't incremented
-      * the count creating IO Jobs. This is because the key field here is
-      * just used as an indentifier and if a key is removed the Job should
-      * never be touched again. */
+     }
+     decrRefCount(j->key);
      zfree(j);
  }
  
@@@ -9648,7 -9468,6 +9721,6 @@@ static void vmThreadedIOCompletedJob(ae
      while((retval = read(fd,buf,1)) == 1) {
          iojob *j;
          listNode *ln;
-         robj *key;
          struct dictEntry *de;
  
          redisLog(REDIS_DEBUG,"Processing I/O completed job");
          }
          /* Post process it in the main thread, as there are things we
           * can do just here to avoid race conditions and/or invasive locks */
-         redisLog(REDIS_DEBUG,"Job %p type: %d, key at %p (%s) refcount: %d\n", (void*) j, j->type, (void*)j->key, (char*)j->key->ptr, j->key->refcount);
-         de = dictFind(j->db->dict,j->key);
-         assert(de != NULL);
-         key = dictGetEntryKey(de);
+         redisLog(REDIS_DEBUG,"COMPLETED Job type: %d, ID %p, key: %s", j->type, (void*)j->id, (unsigned char*)j->key->ptr);
+         de = dictFind(j->db->dict,j->key->ptr);
+         redisAssert(de != NULL);
          if (j->type == REDIS_IOJOB_LOAD) {
              redisDb *db;
+             vmpointer *vp = dictGetEntryVal(de);
  
              /* Key loaded, bring it at home */
-             key->storage = REDIS_VM_MEMORY;
-             key->vm.atime = server.unixtime;
-             vmMarkPagesFree(key->vm.page,key->vm.usedpages);
+             vmMarkPagesFree(vp->page,vp->usedpages);
              redisLog(REDIS_DEBUG, "VM: object %s loaded from disk (threaded)",
-                 (unsigned char*) key->ptr);
+                 (unsigned char*) j->key->ptr);
              server.vm_stats_swapped_objects--;
              server.vm_stats_swapins++;
              dictGetEntryVal(de) = j->val;
              incrRefCount(j->val);
              db = j->db;
-             freeIOJob(j);
              /* Handle clients waiting for this key to be loaded. */
-             handleClientsBlockedOnSwappedKey(db,key);
+             handleClientsBlockedOnSwappedKey(db,j->key);
+             freeIOJob(j);
+             zfree(vp);
          } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) {
              /* Now we know the amount of pages required to swap this object.
               * Let's find some space for it, and queue this task again
              {
                  /* Ooops... no space or we can't swap as there is
                   * a fork()ed Redis trying to save stuff on disk. */
+                 j->val->storage = REDIS_VM_MEMORY; /* undo operation */
                  freeIOJob(j);
-                 key->storage = REDIS_VM_MEMORY; /* undo operation */
              } else {
                  /* Note that we need to mark this pages as used now,
                   * if the job will be canceled, we'll mark them as freed
                  unlockThreadedIO();
              }
          } else if (j->type == REDIS_IOJOB_DO_SWAP) {
-             robj *val;
+             vmpointer *vp;
  
              /* Key swapped. We can finally free some memory. */
-             if (key->storage != REDIS_VM_SWAPPING) {
-                 printf("key->storage: %d\n",key->storage);
-                 printf("key->name: %s\n",(char*)key->ptr);
-                 printf("key->refcount: %d\n",key->refcount);
+             if (j->val->storage != REDIS_VM_SWAPPING) {
+                 vmpointer *vp = (vmpointer*) j->id;
+                 printf("storage: %d\n",vp->storage);
+                 printf("key->name: %s\n",(char*)j->key->ptr);
                  printf("val: %p\n",(void*)j->val);
                  printf("val->type: %d\n",j->val->type);
                  printf("val->ptr: %s\n",(char*)j->val->ptr);
              }
-             redisAssert(key->storage == REDIS_VM_SWAPPING);
-             val = dictGetEntryVal(de);
-             key->vm.page = j->page;
-             key->vm.usedpages = j->pages;
-             key->storage = REDIS_VM_SWAPPED;
-             key->vtype = j->val->type;
-             decrRefCount(val); /* Deallocate the object from memory. */
-             dictGetEntryVal(de) = NULL;
+             redisAssert(j->val->storage == REDIS_VM_SWAPPING);
+             vp = createVmPointer(j->val->type);
+             vp->page = j->page;
+             vp->usedpages = j->pages;
+             dictGetEntryVal(de) = vp;
+             /* Fix the storage otherwise decrRefCount will attempt to
+              * remove the associated I/O job */
+             j->val->storage = REDIS_VM_MEMORY;
+             decrRefCount(j->val);
              redisLog(REDIS_DEBUG,
                  "VM: object %s swapped out at %lld (%lld pages) (threaded)",
-                 (unsigned char*) key->ptr,
+                 (unsigned char*) j->key->ptr,
                  (unsigned long long) j->page, (unsigned long long) j->pages);
              server.vm_stats_swapped_objects++;
              server.vm_stats_swapouts++;
@@@ -9790,7 -9609,7 +9862,7 @@@ static void vmCancelThreadedIOJob(robj 
      assert(o->storage == REDIS_VM_LOADING || o->storage == REDIS_VM_SWAPPING);
  again:
      lockThreadedIO();
-     /* Search for a matching key in one of the queues */
+     /* Search for a matching object in one of the queues */
      for (i = 0; i < 3; i++) {
          listNode *ln;
          listIter li;
              iojob *job = ln->value;
  
              if (job->canceled) continue; /* Skip this, already canceled. */
-             if (job->key == o) {
-                 redisLog(REDIS_DEBUG,"*** CANCELED %p (%s) (type %d) (LIST ID %d)\n",
-                     (void*)job, (char*)o->ptr, job->type, i);
+             if (job->id == o) {
+                 redisLog(REDIS_DEBUG,"*** CANCELED %p (key %s) (type %d) (LIST ID %d)\n",
+                     (void*)job, (char*)job->key->ptr, job->type, i);
                  /* Mark the pages as free since the swap didn't happened
                   * or happened but is now discarded. */
                  if (i != 1 && job->type == REDIS_IOJOB_DO_SWAP)
                  else if (o->storage == REDIS_VM_SWAPPING)
                      o->storage = REDIS_VM_MEMORY;
                  unlockThreadedIO();
+                 redisLog(REDIS_DEBUG,"*** DONE");
                  return;
              }
          }
      }
      unlockThreadedIO();
-     assert(1 != 1); /* We should never reach this */
+     printf("Not found: %p\n", (void*)o);
+     redisAssert(1 != 1); /* We should never reach this */
  }
  
  static void *IOThreadEntryPoint(void *arg) {
  
          /* Process the Job */
          if (j->type == REDIS_IOJOB_LOAD) {
-             j->val = vmReadObjectFromSwap(j->page,j->key->vtype);
+             vmpointer *vp = (vmpointer*)j->id;
+             j->val = vmReadObjectFromSwap(j->page,vp->vtype);
          } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) {
              FILE *fp = fopen("/dev/null","w+");
              j->pages = rdbSavedObjectPages(j->val,fp);
@@@ -9984,18 -9806,16 +10059,16 @@@ static void queueIOJob(iojob *j) 
  static int vmSwapObjectThreaded(robj *key, robj *val, redisDb *db) {
      iojob *j;
  
-     assert(key->storage == REDIS_VM_MEMORY);
-     assert(key->refcount == 1);
      j = zmalloc(sizeof(*j));
      j->type = REDIS_IOJOB_PREPARE_SWAP;
      j->db = db;
      j->key = key;
-     j->val = val;
+     incrRefCount(key);
+     j->id = j->val = val;
      incrRefCount(val);
      j->canceled = 0;
      j->thread = (pthread_t) -1;
-     key->storage = REDIS_VM_SWAPPING;
+     val->storage = REDIS_VM_SWAPPING;
  
      lockThreadedIO();
      queueIOJob(j);
@@@ -10017,9 -9837,9 +10090,9 @@@ static int waitForSwappedKey(redisClien
  
      /* If the key does not exist or is already in RAM we don't need to
       * block the client at all. */
-     de = dictFind(c->db->dict,key);
+     de = dictFind(c->db->dict,key->ptr);
      if (de == NULL) return 0;
-     o = dictGetEntryKey(de);
+     o = dictGetEntryVal(de);
      if (o->storage == REDIS_VM_MEMORY) {
          return 0;
      } else if (o->storage == REDIS_VM_SWAPPING) {
      /* Are we already loading the key from disk? If not create a job */
      if (o->storage == REDIS_VM_SWAPPED) {
          iojob *j;
+         vmpointer *vp = (vmpointer*)o;
  
          o->storage = REDIS_VM_LOADING;
          j = zmalloc(sizeof(*j));
          j->type = REDIS_IOJOB_LOAD;
          j->db = c->db;
-         j->key = o;
-         j->key->vtype = o->vtype;
-         j->page = o->vm.page;
+         j->id = (robj*)vp;
+         j->key = key;
+         incrRefCount(key);
+         j->page = vp->page;
          j->val = NULL;
          j->canceled = 0;
          j->thread = (pthread_t) -1;
@@@ -10185,6 -10007,8 +10260,8 @@@ static int dontWaitForSwappedKey(redisC
      return listLength(c->io_keys) == 0;
  }
  
+ /* Every time we now a key was loaded back in memory, we handle clients
+  * waiting for this key if any. */
  static void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key) {
      struct dictEntry *de;
      list *l;
@@@ -10243,6 -10067,11 +10320,11 @@@ static void configSetCommand(redisClien
          } else {
              goto badfmt;
          }
+     } else if (!strcasecmp(c->argv[2]->ptr,"no-appendfsync-on-rewrite")) {
+         int yn = yesnotoi(o->ptr);
+         if (yn == -1) goto badfmt;
+         server.no_appendfsync_on_rewrite = yn;
      } else if (!strcasecmp(c->argv[2]->ptr,"appendonly")) {
          int old = server.appendonly;
          int new = yesnotoi(o->ptr);
@@@ -10358,6 -10187,11 +10440,11 @@@ static void configGetCommand(redisClien
          addReplyBulkCString(c,server.appendonly ? "yes" : "no");
          matches++;
      }
+     if (stringmatch(pattern,"no-appendfsync-on-rewrite",0)) {
+         addReplyBulkCString(c,"no-appendfsync-on-rewrite");
+         addReplyBulkCString(c,server.no_appendfsync_on_rewrite ? "yes" : "no");
+         matches++;
+     }
      if (stringmatch(pattern,"appendfsync",0)) {
          char *policy;
  
@@@ -10793,7 -10627,7 +10880,7 @@@ static void touchWatchedKeysOnFlush(in
               * key exists, mark the client as dirty, as the key will be
               * removed. */
              if (dbid == -1 || wk->db->id == dbid) {
-                 if (dictFind(wk->db->dict, wk->key) != NULL)
+                 if (dictFind(wk->db->dict, wk->key->ptr) != NULL)
                      c->flags |= REDIS_DIRTY_CAS;
              }
          }
@@@ -10904,40 -10738,37 +10991,35 @@@ static void computeDatasetDigest(unsign
  
          /* Iterate this DB writing every entry */
          while((de = dictNext(di)) != NULL) {
-             robj *key, *o, *kcopy;
+             sds key;
+             robj *keyobj, *o;
              time_t expiretime;
  
              memset(digest,0,20); /* This key-val digest */
              key = dictGetEntryKey(de);
+             keyobj = createStringObject(key,sdslen(key));
+             mixDigest(digest,key,sdslen(key));
+             /* Make sure the key is loaded if VM is active */
+             o = lookupKeyRead(db,keyobj);
  
-             if (!server.vm_enabled) {
-                 mixObjectDigest(digest,key);
-                 o = dictGetEntryVal(de);
-             } else {
-                 /* Don't work with the key directly as when VM is active
-                  * this is unsafe: TODO: fix decrRefCount to check if the
-                  * count really reached 0 to avoid this mess */
-                 kcopy = dupStringObject(key);
-                 mixObjectDigest(digest,kcopy);
-                 o = lookupKeyRead(db,kcopy);
-                 decrRefCount(kcopy);
-             }
              aux = htonl(o->type);
              mixDigest(digest,&aux,sizeof(aux));
-             expiretime = getExpire(db,key);
+             expiretime = getExpire(db,keyobj);
  
              /* Save the key and associated value */
              if (o->type == REDIS_STRING) {
                  mixObjectDigest(digest,o);
              } else if (o->type == REDIS_LIST) {
 -                list *list = o->ptr;
 -                listNode *ln;
 -                listIter li;
 -
 -                listRewind(list,&li);
 -                while((ln = listNext(&li))) {
 -                    robj *eleobj = listNodeValue(ln);
 -
 +                lIterator *li = lInitIterator(o,0,REDIS_TAIL);
 +                lEntry entry;
 +                while(lNext(li,&entry)) {
 +                    robj *eleobj = lGet(&entry);
                      mixObjectDigest(digest,eleobj);
 +                    decrRefCount(eleobj);
                  }
 +                lReleaseIterator(li);
              } else if (o->type == REDIS_SET) {
                  dict *set = o->ptr;
                  dictIterator *di = dictGetIterator(set);
              if (expiretime != -1) xorDigest(digest,"!!expire!!",10);
              /* We can finally xor the key-val digest to the final digest */
              xorDigest(final,digest,20);
+             decrRefCount(keyobj);
          }
          dictReleaseIterator(di);
      }
@@@ -11020,17 -10852,16 +11103,16 @@@ static void debugCommand(redisClient *c
          redisLog(REDIS_WARNING,"Append Only File loaded by DEBUG LOADAOF");
          addReply(c,shared.ok);
      } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
-         dictEntry *de = dictFind(c->db->dict,c->argv[2]);
-         robj *key, *val;
+         dictEntry *de = dictFind(c->db->dict,c->argv[2]->ptr);
+         robj *val;
  
          if (!de) {
              addReply(c,shared.nokeyerr);
              return;
          }
-         key = dictGetEntryKey(de);
          val = dictGetEntryVal(de);
-         if (!server.vm_enabled || (key->storage == REDIS_VM_MEMORY ||
-                                    key->storage == REDIS_VM_SWAPPING)) {
+         if (!server.vm_enabled || (val->storage == REDIS_VM_MEMORY ||
+                                    val->storage == REDIS_VM_SWAPPING)) {
              char *strenc;
              char buf[128];
  
                  strenc = buf;
              }
              addReplySds(c,sdscatprintf(sdsempty(),
-                 "+Key at:%p refcount:%d, value at:%p refcount:%d "
+                 "+Value at:%p refcount:%d "
                  "encoding:%s serializedlength:%lld\r\n",
-                 (void*)key, key->refcount, (void*)val, val->refcount,
+                 (void*)val, val->refcount,
                  strenc, (long long) rdbSavedObjectLen(val,NULL)));
          } else {
+             vmpointer *vp = (vmpointer*) val;
              addReplySds(c,sdscatprintf(sdsempty(),
-                 "+Key at:%p refcount:%d, value swapped at: page %llu "
+                 "+Value swapped at: page %llu "
                  "using %llu pages\r\n",
-                 (void*)key, key->refcount, (unsigned long long) key->vm.page,
-                 (unsigned long long) key->vm.usedpages));
+                 (unsigned long long) vp->page,
+                 (unsigned long long) vp->usedpages));
          }
      } else if (!strcasecmp(c->argv[1]->ptr,"swapin") && c->argc == 3) {
          lookupKeyRead(c->db,c->argv[2]);
          addReply(c,shared.ok);
      } else if (!strcasecmp(c->argv[1]->ptr,"swapout") && c->argc == 3) {
-         dictEntry *de = dictFind(c->db->dict,c->argv[2]);
-         robj *key, *val;
+         dictEntry *de = dictFind(c->db->dict,c->argv[2]->ptr);
+         robj *val;
+         vmpointer *vp;
  
          if (!server.vm_enabled) {
              addReplySds(c,sdsnew("-ERR Virtual Memory is disabled\r\n"));
              addReply(c,shared.nokeyerr);
              return;
          }
-         key = dictGetEntryKey(de);
          val = dictGetEntryVal(de);
-         /* If the key is shared we want to create a copy */
-         if (key->refcount > 1) {
-             robj *newkey = dupStringObject(key);
-             decrRefCount(key);
-             key = dictGetEntryKey(de) = newkey;
-         }
          /* Swap it */
-         if (key->storage != REDIS_VM_MEMORY) {
+         if (val->storage != REDIS_VM_MEMORY) {
              addReplySds(c,sdsnew("-ERR This key is not in memory\r\n"));
-         } else if (vmSwapObjectBlocking(key,val) == REDIS_OK) {
-             dictGetEntryVal(de) = NULL;
+         } else if (val->refcount != 1) {
+             addReplySds(c,sdsnew("-ERR Object is shared\r\n"));
+         } else if ((vp = vmSwapObjectBlocking(val)) != NULL) {
+             dictGetEntryVal(de) = vp;
              addReply(c,shared.ok);
          } else {
              addReply(c,shared.err);
              }
              snprintf(buf,sizeof(buf),"value:%lu",j);
              val = createStringObject(buf,strlen(buf));
-             dictAdd(c->db->dict,key,val);
+             dbAdd(c->db,key,val);
+             decrRefCount(key);
          }
          addReply(c,shared.ok);
      } else if (!strcasecmp(c->argv[1]->ptr,"digest") && c->argc == 2) {
diff --combined staticsymbols.h
index 16c3680b3ed1091ddfbe829d421830d2ef1a26d5,8c751325fdf6a57742ca0c7aa8dbeb4ff0413798..32517791bdc56cca6c591e7ee40811bd317f0532
@@@ -7,6 -7,7 +7,7 @@@ static struct redisFunctionSym symsTabl
  {"addReplyBulk",(unsigned long)addReplyBulk},
  {"addReplyBulkCString",(unsigned long)addReplyBulkCString},
  {"addReplyBulkLen",(unsigned long)addReplyBulkLen},
+ {"addReplyBulkSds",(unsigned long)addReplyBulkSds},
  {"addReplyDouble",(unsigned long)addReplyDouble},
  {"addReplyLongLong",(unsigned long)addReplyLongLong},
  {"addReplySds",(unsigned long)addReplySds},
  {"createSortOperation",(unsigned long)createSortOperation},
  {"createStringObject",(unsigned long)createStringObject},
  {"createStringObjectFromLongLong",(unsigned long)createStringObjectFromLongLong},
+ {"createVmPointer",(unsigned long)createVmPointer},
  {"createZsetObject",(unsigned long)createZsetObject},
  {"daemonize",(unsigned long)daemonize},
+ {"dbAdd",(unsigned long)dbAdd},
+ {"dbDelete",(unsigned long)dbDelete},
+ {"dbExists",(unsigned long)dbExists},
+ {"dbRandomKey",(unsigned long)dbRandomKey},
+ {"dbReplace",(unsigned long)dbReplace},
  {"dbsizeCommand",(unsigned long)dbsizeCommand},
  {"debugCommand",(unsigned long)debugCommand},
  {"decrCommand",(unsigned long)decrCommand},
  {"decrRefCount",(unsigned long)decrRefCount},
  {"decrbyCommand",(unsigned long)decrbyCommand},
  {"delCommand",(unsigned long)delCommand},
- {"deleteIfSwapped",(unsigned long)deleteIfSwapped},
  {"deleteIfVolatile",(unsigned long)deleteIfVolatile},
- {"deleteKey",(unsigned long)deleteKey},
  {"dictEncObjKeyCompare",(unsigned long)dictEncObjKeyCompare},
  {"dictListDestructor",(unsigned long)dictListDestructor},
  {"dictObjKeyCompare",(unsigned long)dictObjKeyCompare},
  {"dictRedisObjectDestructor",(unsigned long)dictRedisObjectDestructor},
+ {"dictSdsDestructor",(unsigned long)dictSdsDestructor},
+ {"dictSdsKeyCompare",(unsigned long)dictSdsKeyCompare},
  {"dictVanillaFree",(unsigned long)dictVanillaFree},
  {"discardCommand",(unsigned long)discardCommand},
  {"dontWaitForSwappedKey",(unsigned long)dontWaitForSwappedKey},
  {"freeStringObject",(unsigned long)freeStringObject},
  {"freeZsetObject",(unsigned long)freeZsetObject},
  {"fwriteBulkDouble",(unsigned long)fwriteBulkDouble},
 -{"fwriteBulkLong",(unsigned long)fwriteBulkLong},
 +{"fwriteBulkLongLong",(unsigned long)fwriteBulkLongLong},
  {"fwriteBulkObject",(unsigned long)fwriteBulkObject},
  {"fwriteBulkString",(unsigned long)fwriteBulkString},
  {"genRedisInfoString",(unsigned long)genRedisInfoString},
  {"pushGenericCommand",(unsigned long)pushGenericCommand},
  {"qsortCompareSetsByCardinality",(unsigned long)qsortCompareSetsByCardinality},
  {"qsortCompareZsetopsrcByCardinality",(unsigned long)qsortCompareZsetopsrcByCardinality},
+ {"qsortRedisCommands",(unsigned long)qsortRedisCommands},
  {"queueIOJob",(unsigned long)queueIOJob},
  {"queueMultiCommand",(unsigned long)queueMultiCommand},
  {"randomkeyCommand",(unsigned long)randomkeyCommand},
  {"scardCommand",(unsigned long)scardCommand},
  {"sdiffCommand",(unsigned long)sdiffCommand},
  {"sdiffstoreCommand",(unsigned long)sdiffstoreCommand},
- {"sdsDictKeyCompare",(unsigned long)sdsDictKeyCompare},
  {"sdscatrepr",(unsigned long)sdscatrepr},
  {"segvHandler",(unsigned long)segvHandler},
  {"selectCommand",(unsigned long)selectCommand},
  {"slaveofCommand",(unsigned long)slaveofCommand},
  {"smoveCommand",(unsigned long)smoveCommand},
  {"sortCommand",(unsigned long)sortCommand},
+ {"sortCommandTable",(unsigned long)sortCommandTable},
  {"sortCompare",(unsigned long)sortCompare},
  {"spawnIOThread",(unsigned long)spawnIOThread},
  {"spopCommand",(unsigned long)spopCommand},
  {"syncWithMaster",(unsigned long)syncWithMaster},
  {"syncWrite",(unsigned long)syncWrite},
  {"touchWatchedKey",(unsigned long)touchWatchedKey},
+ {"touchWatchedKeysOnFlush",(unsigned long)touchWatchedKeysOnFlush},
  {"tryFreeOneObjectFromFreelist",(unsigned long)tryFreeOneObjectFromFreelist},
  {"tryObjectEncoding",(unsigned long)tryObjectEncoding},
  {"tryResizeHashTables",(unsigned long)tryResizeHashTables},