Merge branch 'master' into smallkeys
authorantirez <antirez@gmail.com>
Wed, 2 Jun 2010 13:19:50 +0000 (15:19 +0200)
committerantirez <antirez@gmail.com>
Wed, 2 Jun 2010 13:19:50 +0000 (15:19 +0200)
1  2 
redis.c

diff --combined redis.c
index 6d762a7b9cd4b1788ce000946be278cba55e41d7,8b5ddae23ae2255e4065ae05d2b6269853273a1a..7ae800ab140168878ecc4100504dff5c74b67e61
+++ b/redis.c
  #define REDIS_SET 2
  #define REDIS_ZSET 3
  #define REDIS_HASH 4
 +#define REDIS_VMPOINTER 8
  
  /* Objects encoding. Some kind of objects like Strings and Hashes can be
   * internally represented in multiple ways. The 'encoding' field of the object
@@@ -248,41 -247,30 +248,41 @@@ static void _redisPanic(char *msg, cha
  
  /* A redis object, that is a type able to hold a string / list / set */
  
 -/* The VM object structure */
 -struct redisObjectVM {
 -    off_t page;         /* the page at witch the object is stored on disk */
 -    off_t usedpages;    /* number of pages used on disk */
 -    time_t atime;       /* Last access time */
 -} vm;
 -
  /* The actual Redis Object */
  typedef struct redisObject {
 -    void *ptr;
 -    unsigned char type;
 -    unsigned char encoding;
 -    unsigned char storage;  /* If this object is a key, where is the value?
 -                             * REDIS_VM_MEMORY, REDIS_VM_SWAPPED, ... */
 -    unsigned char vtype; /* If this object is a key, and value is swapped out,
 -                          * this is the type of the swapped out object. */
 +    unsigned type:4;
 +    unsigned storage:2;     /* REDIS_VM_MEMORY or REDIS_VM_SWAPPING */
 +    unsigned encoding:4;
 +    unsigned lru:22;        /* lru time (relative to server.lruclock) */
      int refcount;
 +    void *ptr;
      /* VM fields, this are only allocated if VM is active, otherwise the
       * object allocation function will just allocate
       * sizeof(redisObjct) minus sizeof(redisObjectVM), so using
       * Redis without VM active will not have any overhead. */
 -    struct redisObjectVM vm;
  } robj;
  
 +/* The VM pointer structure - identifies an object in the swap file.
 + *
 + * This object is stored in place of the value
 + * object in the main key->value hash table representing a database.
 + * Note that the first fields (type, storage) are the same as the redisObject
 + * structure so that vmPointer strucuters can be accessed even when casted
 + * as redisObject structures.
 + *
 + * This is useful as we don't know if a value object is or not on disk, but we
 + * are always able to read obj->storage to check this. For vmPointer
 + * structures "type" is set to REDIS_VMPOINTER (even if without this field
 + * is still possible to check the kind of object from the value of 'storage').*/
 +typedef struct vmPointer {
 +    unsigned type:4;
 +    unsigned storage:2; /* REDIS_VM_SWAPPED or REDIS_VM_LOADING */
 +    unsigned notused:26;
 +    unsigned int vtype; /* type of the object stored in the swap file */
 +    off_t page;         /* the page at witch the object is stored on disk */
 +    off_t usedpages;    /* number of pages used on disk */
 +} vmpointer;
 +
  /* Macro used to initalize a Redis object allocated on the stack.
   * Note that this macro is taken near the structure definition to make sure
   * we'll update it when the structure is changed, to avoid bugs like
      _var.type = REDIS_STRING; \
      _var.encoding = REDIS_ENCODING_RAW; \
      _var.ptr = _ptr; \
 -    if (server.vm_enabled) _var.storage = REDIS_VM_MEMORY; \
 +    _var.storage = REDIS_VM_MEMORY; \
  } while(0);
  
  typedef struct redisDb {
@@@ -462,8 -450,6 +462,8 @@@ struct redisServer 
      list *pubsub_patterns; /* A list of pubsub_patterns */
      /* Misc */
      FILE *devnull;
 +    unsigned lruclock:22;        /* clock incrementing every minute, for LRU */
 +    unsigned lruclock_padding:10;
  };
  
  typedef struct pubsubPattern {
@@@ -556,9 -542,6 +556,9 @@@ typedef struct iojob 
      int type;   /* Request type, REDIS_IOJOB_* */
      redisDb *db;/* Redis database */
      robj *key;  /* This I/O request is about swapping this key */
 +    robj *id;   /* Unique identifier of this job:
 +                   this is the object to swap for REDIS_IOREQ_*_SWAP, or the
 +                   vmpointer objct for REDIS_IOREQ_LOAD. */
      robj *val;  /* the value to swap for REDIS_IOREQ_*_SWAP, otherwise this
                   * field is populated by the I/O thread for REDIS_IOREQ_LOAD. */
      off_t page; /* Swap page where to read/write the object */
@@@ -615,8 -598,8 +615,8 @@@ static void unblockClientWaitingData(re
  static int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele);
  static void vmInit(void);
  static void vmMarkPagesFree(off_t page, off_t count);
 -static robj *vmLoadObject(robj *key);
 -static robj *vmPreviewObject(robj *key);
 +static robj *vmLoadObject(robj *o);
 +static robj *vmPreviewObject(robj *o);
  static int vmSwapOneObjectBlocking(void);
  static int vmSwapOneObjectThreaded(void);
  static int vmCanSwapOut(void);
@@@ -652,7 -635,7 +652,7 @@@ static int compareStringObjects(robj *a
  static int equalStringObjects(robj *a, robj *b);
  static void usage();
  static int rewriteAppendOnlyFileBackground(void);
 -static int vmSwapObjectBlocking(robj *key, robj *val);
 +static vmpointer *vmSwapObjectBlocking(robj *val);
  static int prepareForShutdown();
  static void touchWatchedKey(redisDb *db, robj *key);
  static void touchWatchedKeysOnFlush(int dbid);
@@@ -1449,19 -1432,6 +1449,19 @@@ static int serverCron(struct aeEventLoo
       * in objects at every object access, and accuracy is not needed.
       * To access a global var is faster than calling time(NULL) */
      server.unixtime = time(NULL);
 +    /* We have just 21 bits per object for LRU information.
 +     * So we use an (eventually wrapping) LRU clock with minutes resolution.
 +     *
 +     * When we need to select what object to swap, we compute the minimum
 +     * time distance between the current lruclock and the object last access
 +     * lruclock info. Even if clocks will wrap on overflow, there is
 +     * the interesting property that we are sure that at least
 +     * ABS(A-B) minutes passed between current time and timestamp B.
 +     *
 +     * This is not precise but we don't need at all precision, but just
 +     * something statistically reasonable.
 +     */
 +    server.lruclock = (time(NULL)/60)&((1<<21)-1);
  
      /* We received a SIGTERM, shutting down here in a safe way, as it is
       * not ok doing so inside the signal handler. */
@@@ -2968,9 -2938,12 +2968,9 @@@ static robj *createObject(int type, voi
          listDelNode(server.objfreelist,head);
          if (server.vm_enabled) pthread_mutex_unlock(&server.obj_freelist_mutex);
      } else {
 -        if (server.vm_enabled) {
 +        if (server.vm_enabled)
              pthread_mutex_unlock(&server.obj_freelist_mutex);
 -            o = zmalloc(sizeof(*o));
 -        } else {
 -            o = zmalloc(sizeof(*o)-sizeof(struct redisObjectVM));
 -        }
 +        o = zmalloc(sizeof(*o));
      }
      o->type = type;
      o->encoding = REDIS_ENCODING_RAW;
      o->refcount = 1;
      if (server.vm_enabled) {
          /* Note that this code may run in the context of an I/O thread
 -         * and accessing to server.unixtime in theory is an error
 +         * and accessing server.lruclock in theory is an error
           * (no locks). But in practice this is safe, and even if we read
 -         * garbage Redis will not fail, as it's just a statistical info */
 -        o->vm.atime = server.unixtime;
 +         * garbage Redis will not fail. */
 +        o->lru = server.lruclock;
          o->storage = REDIS_VM_MEMORY;
      }
      return o;
@@@ -3086,31 -3059,28 +3086,31 @@@ static void incrRefCount(robj *o) 
  static void decrRefCount(void *obj) {
      robj *o = obj;
  
 -    if (o->refcount <= 0) redisPanic("decrRefCount against refcount <= 0");
 -    /* Object is a key of a swapped out value, or in the process of being
 -     * loaded. */
 +    /* Object is a swapped out value, or in the process of being loaded. */
      if (server.vm_enabled &&
          (o->storage == REDIS_VM_SWAPPED || o->storage == REDIS_VM_LOADING))
      {
 -        if (o->storage == REDIS_VM_LOADING) vmCancelThreadedIOJob(obj);
 -        redisAssert(o->type == REDIS_STRING);
 -        freeStringObject(o);
 -        vmMarkPagesFree(o->vm.page,o->vm.usedpages);
 -        pthread_mutex_lock(&server.obj_freelist_mutex);
 -        if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
 -            !listAddNodeHead(server.objfreelist,o))
 -            zfree(o);
 -        pthread_mutex_unlock(&server.obj_freelist_mutex);
 +        vmpointer *vp = obj;
 +        if (o->storage == REDIS_VM_LOADING) vmCancelThreadedIOJob(o);
 +        vmMarkPagesFree(vp->page,vp->usedpages);
          server.vm_stats_swapped_objects--;
 +        zfree(vp);
          return;
      }
 -    /* Object is in memory, or in the process of being swapped out. */
 +
 +    if (o->refcount <= 0) redisPanic("decrRefCount against refcount <= 0");
 +    /* Object is in memory, or in the process of being swapped out.
 +     *
 +     * If the object is being swapped out, abort the operation on
 +     * decrRefCount even if the refcount does not drop to 0: the object
 +     * is referenced at least two times, as value of the key AND as
 +     * job->val in the iojob. So if we don't invalidate the iojob, when it is
 +     * done but the relevant key was removed in the meantime, the
 +     * complete jobs handler will not find the key about the job and the
 +     * assert will fail. */
 +    if (server.vm_enabled && o->storage == REDIS_VM_SWAPPING)
 +        vmCancelThreadedIOJob(o);
      if (--(o->refcount) == 0) {
 -        if (server.vm_enabled && o->storage == REDIS_VM_SWAPPING)
 -            vmCancelThreadedIOJob(obj);
          switch(o->type) {
          case REDIS_STRING: freeStringObject(o); break;
          case REDIS_LIST: freeListObject(o); break;
      }
  }
  
 -static robj *lookupKey(redisDb *db, robj *key) {
 -    dictEntry *de = dictFind(db->dict,key);
 -    if (de) {
 -        robj *key = dictGetEntryKey(de);
 -        robj *val = dictGetEntryVal(de);
 -
 -        if (server.vm_enabled) {
 -            if (key->storage == REDIS_VM_MEMORY ||
 -                key->storage == REDIS_VM_SWAPPING)
 -            {
 -                /* If we were swapping the object out, stop it, this key
 -                 * was requested. */
 -                if (key->storage == REDIS_VM_SWAPPING)
 -                    vmCancelThreadedIOJob(key);
 -                /* Update the access time of the key for the aging algorithm. */
 -                key->vm.atime = server.unixtime;
 -            } else {
 -                int notify = (key->storage == REDIS_VM_LOADING);
 -
 -                /* Our value was swapped on disk. Bring it at home. */
 -                redisAssert(val == NULL);
 -                val = vmLoadObject(key);
 -                dictGetEntryVal(de) = val;
 -
 -                /* Clients blocked by the VM subsystem may be waiting for
 -                 * this key... */
 -                if (notify) handleClientsBlockedOnSwappedKey(db,key);
 -            }
 -        }
 -        return val;
 -    } else {
 -        return NULL;
 -    }
 -}
 -
 -static robj *lookupKeyRead(redisDb *db, robj *key) {
 -    expireIfNeeded(db,key);
 -    return lookupKey(db,key);
 -}
 -
 -static robj *lookupKeyWrite(redisDb *db, robj *key) {
 -    deleteIfVolatile(db,key);
 -    touchWatchedKey(db,key);
 -    return lookupKey(db,key);
 -}
 -
 -static robj *lookupKeyReadOrReply(redisClient *c, robj *key, robj *reply) {
 -    robj *o = lookupKeyRead(c->db, key);
 -    if (!o) addReply(c,reply);
 -    return o;
 -}
 -
 -static robj *lookupKeyWriteOrReply(redisClient *c, robj *key, robj *reply) {
 -    robj *o = lookupKeyWrite(c->db, key);
 -    if (!o) addReply(c,reply);
 -    return o;
 -}
 -
  static int checkType(redisClient *c, robj *o, int type) {
      if (o->type != type) {
          addReply(c,shared.wrongtypeerr);
      return 0;
  }
  
 -static int deleteKey(redisDb *db, robj *key) {
 -    int retval;
 -
 -    /* We need to protect key from destruction: after the first dictDelete()
 -     * it may happen that 'key' is no longer valid if we don't increment
 -     * it's count. This may happen when we get the object reference directly
 -     * from the hash table with dictRandomKey() or dict iterators */
 -    incrRefCount(key);
 -    if (dictSize(db->expires)) dictDelete(db->expires,key);
 -    retval = dictDelete(db->dict,key);
 -    decrRefCount(key);
 -
 -    return retval == DICT_OK;
 -}
 -
  /* Check if the nul-terminated string 's' can be represented by a long
   * (that is, is a number that fits into long without any other space or
   * character before or after the digits).
@@@ -3354,80 -3397,6 +3354,80 @@@ static int getLongFromObjectOrReply(red
      return REDIS_OK;
  }
  
 +/* =========================== Keyspace access API ========================== */
 +
 +static robj *lookupKey(redisDb *db, robj *key) {
 +    dictEntry *de = dictFind(db->dict,key);
 +    if (de) {
 +        robj *key = dictGetEntryKey(de);
 +        robj *val = dictGetEntryVal(de);
 +
 +        if (server.vm_enabled) {
 +            if (val->storage == REDIS_VM_MEMORY ||
 +                val->storage == REDIS_VM_SWAPPING)
 +            {
 +                /* If we were swapping the object out, cancel the operation */
 +                if (val->storage == REDIS_VM_SWAPPING)
 +                    vmCancelThreadedIOJob(val);
 +                /* Update the access time of the key for the aging algorithm. */
 +                val->lru = server.lruclock;
 +            } else {
 +                int notify = (val->storage == REDIS_VM_LOADING);
 +
 +                /* Our value was swapped on disk. Bring it at home. */
 +                redisAssert(val->type == REDIS_VMPOINTER);
 +                val = vmLoadObject(val);
 +                dictGetEntryVal(de) = val;
 +
 +                /* Clients blocked by the VM subsystem may be waiting for
 +                 * this key... */
 +                if (notify) handleClientsBlockedOnSwappedKey(db,key);
 +            }
 +        }
 +        return val;
 +    } else {
 +        return NULL;
 +    }
 +}
 +
 +static robj *lookupKeyRead(redisDb *db, robj *key) {
 +    expireIfNeeded(db,key);
 +    return lookupKey(db,key);
 +}
 +
 +static robj *lookupKeyWrite(redisDb *db, robj *key) {
 +    deleteIfVolatile(db,key);
 +    touchWatchedKey(db,key);
 +    return lookupKey(db,key);
 +}
 +
 +static robj *lookupKeyReadOrReply(redisClient *c, robj *key, robj *reply) {
 +    robj *o = lookupKeyRead(c->db, key);
 +    if (!o) addReply(c,reply);
 +    return o;
 +}
 +
 +static robj *lookupKeyWriteOrReply(redisClient *c, robj *key, robj *reply) {
 +    robj *o = lookupKeyWrite(c->db, key);
 +    if (!o) addReply(c,reply);
 +    return o;
 +}
 +
 +static int deleteKey(redisDb *db, robj *key) {
 +    int retval;
 +
 +    /* We need to protect key from destruction: after the first dictDelete()
 +     * it may happen that 'key' is no longer valid if we don't increment
 +     * it's count. This may happen when we get the object reference directly
 +     * from the hash table with dictRandomKey() or dict iterators */
 +    incrRefCount(key);
 +    if (dictSize(db->expires)) dictDelete(db->expires,key);
 +    retval = dictDelete(db->dict,key);
 +    decrRefCount(key);
 +
 +    return retval == DICT_OK;
 +}
 +
  /*============================ RDB saving/loading =========================== */
  
  static int rdbSaveType(FILE *fp, unsigned char type) {
@@@ -3795,8 -3764,8 +3795,8 @@@ static int rdbSave(char *filename) 
              }
              /* Save the key and associated value. This requires special
               * handling if the value is swapped out. */
 -            if (!server.vm_enabled || key->storage == REDIS_VM_MEMORY ||
 -                                      key->storage == REDIS_VM_SWAPPING) {
 +            if (!server.vm_enabled || o->storage == REDIS_VM_MEMORY ||
 +                                      o->storage == REDIS_VM_SWAPPING) {
                  /* Save type, key, value */
                  if (rdbSaveType(fp,o->type) == -1) goto werr;
                  if (rdbSaveStringObject(fp,key) == -1) goto werr;
                  /* REDIS_VM_SWAPPED or REDIS_VM_LOADING */
                  robj *po;
                  /* Get a preview of the object in memory */
 -                po = vmPreviewObject(key);
 +                po = vmPreviewObject(o);
                  /* Save type, key, value */
 -                if (rdbSaveType(fp,key->vtype) == -1) goto werr;
 +                if (rdbSaveType(fp,po->type) == -1) goto werr;
                  if (rdbSaveStringObject(fp,key) == -1) goto werr;
                  if (rdbSaveObject(fp,po) == -1) goto werr;
                  /* Remove the loaded object from memory */
@@@ -4132,7 -4101,6 +4132,6 @@@ static int rdbLoad(char *filename) 
      redisDb *db = server.db+0;
      char buf[1024];
      time_t expiretime, now = time(NULL);
-     long long loadedkeys = 0;
  
      fp = fopen(filename,"r");
      if (!fp) return REDIS_ERR;
      }
      while(1) {
          robj *key, *val;
+         int force_swapout;
  
          expiretime = -1;
          /* Read type. */
              redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", key->ptr);
              exit(1);
          }
-         loadedkeys++;
          /* Set the expire time if needed */
          if (expiretime != -1) setExpire(db,key,expiretime);
  
  
              /* de may be NULL since the key already expired */
              if (de) {
 +                vmpointer *vp;
                  key = dictGetEntryKey(de);
                  val = dictGetEntryVal(de);
  
 -                if (vmSwapObjectBlocking(key,val) == REDIS_OK) {
 -                    dictGetEntryVal(de) = NULL;
 -                }
 +                if (val->refcount == 1 &&
 +                    (vp = vmSwapObjectBlocking(val)) != NULL)
 +                    dictGetEntryVal(de) = vp;
              }
              continue;
          }
  
+         /* Flush data on disk once 32 MB of additional RAM are used... */
+         force_swapout = 0;
+         if ((zmalloc_used_memory() - server.vm_max_memory) > 1024*1024*32)
+             force_swapout = 1;
          /* If we have still some hope of having some value fitting memory
           * then we try random sampling. */
-         if (!swap_all_values && server.vm_enabled && (loadedkeys % 5000) == 0) {
+         if (!swap_all_values && server.vm_enabled && force_swapout) {
              while (zmalloc_used_memory() > server.vm_max_memory) {
                  if (vmSwapOneObjectBlocking() == REDIS_ERR) break;
              }
@@@ -8272,48 -8244,6 +8276,48 @@@ static void freeMemoryIfNeeded(void) 
  
  /* ============================== Append Only file ========================== */
  
 +/* Called when the user switches from "appendonly yes" to "appendonly no"
 + * at runtime using the CONFIG command. */
 +static void stopAppendOnly(void) {
 +    flushAppendOnlyFile();
 +    aof_fsync(server.appendfd);
 +    close(server.appendfd);
 +
 +    server.appendfd = -1;
 +    server.appendseldb = -1;
 +    server.appendonly = 0;
 +    /* rewrite operation in progress? kill it, wait child exit */
 +    if (server.bgsavechildpid != -1) {
 +        int statloc;
 +
 +        if (kill(server.bgsavechildpid,SIGKILL) != -1)
 +            wait3(&statloc,0,NULL);
 +        /* reset the buffer accumulating changes while the child saves */
 +        sdsfree(server.bgrewritebuf);
 +        server.bgrewritebuf = sdsempty();
 +        server.bgsavechildpid = -1;
 +    }
 +}
 +
 +/* Called when the user switches from "appendonly no" to "appendonly yes"
 + * at runtime using the CONFIG command. */
 +static int startAppendOnly(void) {
 +    server.appendonly = 1;
 +    server.lastfsync = time(NULL);
 +    server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
 +    if (server.appendfd == -1) {
 +        redisLog(REDIS_WARNING,"Used tried to switch on AOF via CONFIG, but I can't open the AOF file: %s",strerror(errno));
 +        return REDIS_ERR;
 +    }
 +    if (rewriteAppendOnlyFileBackground() == REDIS_ERR) {
 +        server.appendonly = 0;
 +        close(server.appendfd);
 +        redisLog(REDIS_WARNING,"Used tried to switch on AOF via CONFIG, I can't trigger a background AOF rewrite operation. Check the above logs for more info about the error.",strerror(errno));
 +        return REDIS_ERR;
 +    }
 +    return REDIS_OK;
 +}
 +
  /* Write the append only file buffer on disk.
   *
   * Since we are required to write the AOF before replying to the client,
@@@ -8478,7 -8408,6 +8482,6 @@@ int loadAppendOnlyFile(char *filename) 
      struct redisClient *fakeClient;
      FILE *fp = fopen(filename,"r");
      struct redis_stat sb;
-     unsigned long long loadedkeys = 0;
      int appendonly = server.appendonly;
  
      if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0)
          char buf[128];
          sds argsds;
          struct redisCommand *cmd;
+         int force_swapout;
  
          if (fgets(buf,sizeof(buf),fp) == NULL) {
              if (feof(fp))
          for (j = 0; j < argc; j++) decrRefCount(argv[j]);
          zfree(argv);
          /* Handle swapping while loading big datasets when VM is on */
-         loadedkeys++;
-         if (server.vm_enabled && (loadedkeys % 5000) == 0) {
+         force_swapout = 0;
+         if ((zmalloc_used_memory() - server.vm_max_memory) > 1024*1024*32)
+             force_swapout = 1;
+         if (server.vm_enabled && force_swapout) {
              while (zmalloc_used_memory() > server.vm_max_memory) {
                  if (vmSwapOneObjectBlocking() == REDIS_ERR) break;
              }
@@@ -8670,16 -8603,16 +8677,16 @@@ static int rewriteAppendOnlyFile(char *
              int swapped;
  
              key = dictGetEntryKey(de);
 +            o = dictGetEntryVal(de);
              /* If the value for this key is swapped, load a preview in memory.
               * We use a "swapped" flag to remember if we need to free the
               * value object instead to just increment the ref count anyway
               * in order to avoid copy-on-write of pages if we are forked() */
 -            if (!server.vm_enabled || key->storage == REDIS_VM_MEMORY ||
 -                key->storage == REDIS_VM_SWAPPING) {
 -                o = dictGetEntryVal(de);
 +            if (!server.vm_enabled || o->storage == REDIS_VM_MEMORY ||
 +                o->storage == REDIS_VM_SWAPPING) {
                  swapped = 0;
              } else {
 -                o = vmPreviewObject(key);
 +                o = vmPreviewObject(o);
                  swapped = 1;
              }
              expiretime = getExpire(db,key);
@@@ -8903,19 -8836,50 +8910,19 @@@ static void aofRemoveTempFile(pid_t chi
   * as a fully non-blocking VM.
   */
  
 -/* Called when the user switches from "appendonly yes" to "appendonly no"
 - * at runtime using the CONFIG command. */
 -static void stopAppendOnly(void) {
 -    flushAppendOnlyFile();
 -    aof_fsync(server.appendfd);
 -    close(server.appendfd);
 -
 -    server.appendfd = -1;
 -    server.appendseldb = -1;
 -    server.appendonly = 0;
 -    /* rewrite operation in progress? kill it, wait child exit */
 -    if (server.bgsavechildpid != -1) {
 -        int statloc;
 +/* =================== Virtual Memory - Blocking Side  ====================== */
  
 -        if (kill(server.bgsavechildpid,SIGKILL) != -1)
 -            wait3(&statloc,0,NULL);
 -        /* reset the buffer accumulating changes while the child saves */
 -        sdsfree(server.bgrewritebuf);
 -        server.bgrewritebuf = sdsempty();
 -        server.bgsavechildpid = -1;
 -    }
 -}
 +/* Create a VM pointer object. This kind of objects are used in place of
 + * values in the key -> value hash table, for swapped out objects. */
 +static vmpointer *createVmPointer(int vtype) {
 +    vmpointer *vp = zmalloc(sizeof(vmpointer));
  
 -/* Called when the user switches from "appendonly no" to "appendonly yes"
 - * at runtime using the CONFIG command. */
 -static int startAppendOnly(void) {
 -    server.appendonly = 1;
 -    server.lastfsync = time(NULL);
 -    server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
 -    if (server.appendfd == -1) {
 -        redisLog(REDIS_WARNING,"Used tried to switch on AOF via CONFIG, but I can't open the AOF file: %s",strerror(errno));
 -        return REDIS_ERR;
 -    }
 -    if (rewriteAppendOnlyFileBackground() == REDIS_ERR) {
 -        server.appendonly = 0;
 -        close(server.appendfd);
 -        redisLog(REDIS_WARNING,"Used tried to switch on AOF via CONFIG, I can't trigger a background AOF rewrite operation. Check the above logs for more info about the error.",strerror(errno));
 -        return REDIS_ERR;
 -    }
 -    return REDIS_OK;
 +    vp->type = REDIS_VMPOINTER;
 +    vp->storage = REDIS_VM_SWAPPED;
 +    vp->vtype = vtype;
 +    return vp;
  }
  
 -/* =================== Virtual Memory - Blocking Side  ====================== */
 -
  static void vmInit(void) {
      off_t totsize;
      int pipefds[2];
@@@ -9130,33 -9094,30 +9137,33 @@@ static int vmWriteObjectOnSwap(robj *o
      return REDIS_OK;
  }
  
 -/* Swap the 'val' object relative to 'key' into disk. Store all the information
 - * needed to later retrieve the object into the key object.
 +/* Transfers the 'val' object to disk. Store all the information
 + * a 'vmpointer' object containing all the information needed to load the
 + * object back later is returned.
 + *
   * If we can't find enough contiguous empty pages to swap the object on disk
 - * REDIS_ERR is returned. */
 -static int vmSwapObjectBlocking(robj *key, robj *val) {
 + * NULL is returned. */
 +static vmpointer *vmSwapObjectBlocking(robj *val) {
      off_t pages = rdbSavedObjectPages(val,NULL);
      off_t page;
 +    vmpointer *vp;
  
 -    assert(key->storage == REDIS_VM_MEMORY);
 -    assert(key->refcount == 1);
 -    if (vmFindContiguousPages(&page,pages) == REDIS_ERR) return REDIS_ERR;
 -    if (vmWriteObjectOnSwap(val,page) == REDIS_ERR) return REDIS_ERR;
 -    key->vm.page = page;
 -    key->vm.usedpages = pages;
 -    key->storage = REDIS_VM_SWAPPED;
 -    key->vtype = val->type;
 +    assert(val->storage == REDIS_VM_MEMORY);
 +    assert(val->refcount == 1);
 +    if (vmFindContiguousPages(&page,pages) == REDIS_ERR) return NULL;
 +    if (vmWriteObjectOnSwap(val,page) == REDIS_ERR) return NULL;
 +
 +    vp = createVmPointer(val->type);
 +    vp->page = page;
 +    vp->usedpages = pages;
      decrRefCount(val); /* Deallocate the object from memory. */
      vmMarkPagesUsed(page,pages);
 -    redisLog(REDIS_DEBUG,"VM: object %s swapped out at %lld (%lld pages)",
 -        (unsigned char*) key->ptr,
 +    redisLog(REDIS_DEBUG,"VM: object %p swapped out at %lld (%lld pages)",
 +        (void*) val,
          (unsigned long long) page, (unsigned long long) pages);
      server.vm_stats_swapped_objects++;
      server.vm_stats_swapouts++;
 -    return REDIS_OK;
 +    return vp;
  }
  
  static robj *vmReadObjectFromSwap(off_t page, int type) {
      return o;
  }
  
 -/* Load the value object relative to the 'key' object from swap to memory.
 +/* Load the specified object from swap to memory.
   * The newly allocated object is returned.
   *
   * If preview is true the unserialized object is returned to the caller but
 - * no changes are made to the key object, nor the pages are marked as freed */
 -static robj *vmGenericLoadObject(robj *key, int preview) {
 + * the pages are not marked as freed, nor the vp object is freed. */
 +static robj *vmGenericLoadObject(vmpointer *vp, int preview) {
      robj *val;
  
 -    redisAssert(key->storage == REDIS_VM_SWAPPED || key->storage == REDIS_VM_LOADING);
 -    val = vmReadObjectFromSwap(key->vm.page,key->vtype);
 +    redisAssert(vp->type == REDIS_VMPOINTER &&
 +        (vp->storage == REDIS_VM_SWAPPED || vp->storage == REDIS_VM_LOADING));
 +    val = vmReadObjectFromSwap(vp->page,vp->vtype);
      if (!preview) {
 -        key->storage = REDIS_VM_MEMORY;
 -        key->vm.atime = server.unixtime;
 -        vmMarkPagesFree(key->vm.page,key->vm.usedpages);
 -        redisLog(REDIS_DEBUG, "VM: object %s loaded from disk",
 -            (unsigned char*) key->ptr);
 +        redisLog(REDIS_DEBUG, "VM: object %p loaded from disk", (void*)vp);
 +        vmMarkPagesFree(vp->page,vp->usedpages);
 +        zfree(vp);
          server.vm_stats_swapped_objects--;
      } else {
 -        redisLog(REDIS_DEBUG, "VM: object %s previewed from disk",
 -            (unsigned char*) key->ptr);
 +        redisLog(REDIS_DEBUG, "VM: object %p previewed from disk", (void*)vp);
      }
      server.vm_stats_swapins++;
      return val;
  }
  
 -/* Plain object loading, from swap to memory */
 -static robj *vmLoadObject(robj *key) {
 +/* Plain object loading, from swap to memory.
 + *
 + * 'o' is actually a redisVmPointer structure that will be freed by the call.
 + * The return value is the loaded object. */
 +static robj *vmLoadObject(robj *o) {
      /* If we are loading the object in background, stop it, we
       * need to load this object synchronously ASAP. */
 -    if (key->storage == REDIS_VM_LOADING)
 -        vmCancelThreadedIOJob(key);
 -    return vmGenericLoadObject(key,0);
 +    if (o->storage == REDIS_VM_LOADING)
 +        vmCancelThreadedIOJob(o);
 +    return vmGenericLoadObject((vmpointer*)o,0);
  }
  
  /* Just load the value on disk, without to modify the key.
   * This is useful when we want to perform some operation on the value
   * without to really bring it from swap to memory, like while saving the
   * dataset or rewriting the append only log. */
 -static robj *vmPreviewObject(robj *key) {
 -    return vmGenericLoadObject(key,1);
 +static robj *vmPreviewObject(robj *o) {
 +    return vmGenericLoadObject((vmpointer*)o,1);
  }
  
  /* How a good candidate is this object for swapping?
   * proportionally, this is why we use the logarithm. This algorithm is
   * just a first try and will probably be tuned later. */
  static double computeObjectSwappability(robj *o) {
 -    time_t age = server.unixtime - o->vm.atime;
 +    /* actual age can be >= minage, but not < minage. As we use wrapping
 +     * 21 bit clocks with minutes resolution for the LRU. */
 +    time_t minage = abs(server.lruclock - o->lru);
      long asize = 0;
      list *l;
      dict *d;
      struct dictEntry *de;
      int z;
  
 -    if (age <= 0) return 0;
 +    if (minage <= 0) return 0;
      switch(o->type) {
      case REDIS_STRING:
          if (o->encoding != REDIS_ENCODING_RAW) {
              long elesize;
  
              elesize = (ele->encoding == REDIS_ENCODING_RAW) ?
 -                            (sizeof(*o)+sdslen(ele->ptr)) :
 -                            sizeof(*o);
 +                            (sizeof(*o)+sdslen(ele->ptr)) : sizeof(*o);
              asize += (sizeof(listNode)+elesize)*listLength(l);
          }
          break;
              de = dictGetRandomKey(d);
              ele = dictGetEntryKey(de);
              elesize = (ele->encoding == REDIS_ENCODING_RAW) ?
 -                            (sizeof(*o)+sdslen(ele->ptr)) :
 -                            sizeof(*o);
 +                            (sizeof(*o)+sdslen(ele->ptr)) : sizeof(*o);
              asize += (sizeof(struct dictEntry)+elesize)*dictSize(d);
              if (z) asize += sizeof(zskiplistNode)*dictSize(d);
          }
                  de = dictGetRandomKey(d);
                  ele = dictGetEntryKey(de);
                  elesize = (ele->encoding == REDIS_ENCODING_RAW) ?
 -                                (sizeof(*o)+sdslen(ele->ptr)) :
 -                                sizeof(*o);
 +                                (sizeof(*o)+sdslen(ele->ptr)) : sizeof(*o);
                  ele = dictGetEntryVal(de);
                  elesize = (ele->encoding == REDIS_ENCODING_RAW) ?
 -                                (sizeof(*o)+sdslen(ele->ptr)) :
 -                                sizeof(*o);
 +                                (sizeof(*o)+sdslen(ele->ptr)) : sizeof(*o);
                  asize += (sizeof(struct dictEntry)+elesize)*dictSize(d);
              }
          }
          break;
      }
 -    return (double)age*log(1+asize);
 +    return (double)minage*log(1+asize);
  }
  
  /* Try to swap an object that's a good candidate for swapping.
@@@ -9349,11 -9311,12 +9356,11 @@@ static int vmSwapOneObject(int usethrea
              val = dictGetEntryVal(de);
              /* Only swap objects that are currently in memory.
               *
 -             * Also don't swap shared objects if threaded VM is on, as we
 -             * try to ensure that the main thread does not touch the
 +             * Also don't swap shared objects: not a good idea in general and
 +             * we need to ensure that the main thread does not touch the
               * object while the I/O thread is using it, but we can't
               * control other keys without adding additional mutex. */
 -            if (key->storage != REDIS_VM_MEMORY ||
 -                (server.vm_max_threads != 0 && val->refcount != 1)) {
 +            if (val->storage != REDIS_VM_MEMORY || val->refcount != 1) {
                  if (maxtries) i--; /* don't count this try */
                  continue;
              }
      redisLog(REDIS_DEBUG,"Key with best swappability: %s, %f",
          key->ptr, best_swappability);
  
 -    /* Unshare the key if needed */
 -    if (key->refcount > 1) {
 -        robj *newkey = dupStringObject(key);
 -        decrRefCount(key);
 -        key = dictGetEntryKey(best) = newkey;
 -    }
      /* Swap it */
      if (usethreads) {
          vmSwapObjectThreaded(key,val,best_db);
          return REDIS_OK;
      } else {
 -        if (vmSwapObjectBlocking(key,val) == REDIS_OK) {
 -            dictGetEntryVal(best) = NULL;
 +        vmpointer *vp;
 +
 +        if ((vp = vmSwapObjectBlocking(val)) != NULL) {
 +            dictGetEntryVal(best) = vp;
              return REDIS_OK;
          } else {
              return REDIS_ERR;
@@@ -9406,10 -9373,12 +9413,10 @@@ static int vmCanSwapOut(void) 
  /* Delete a key if swapped. Returns 1 if the key was found, was swapped
   * and was deleted. Otherwise 0 is returned. */
  static int deleteIfSwapped(redisDb *db, robj *key) {
 -    dictEntry *de;
 -    robj *foundkey;
 +    robj *val;
  
 -    if ((de = dictFind(db->dict,key)) == NULL) return 0;
 -    foundkey = dictGetEntryKey(de);
 -    if (foundkey->storage == REDIS_VM_MEMORY) return 0;
 +    if ((val = dictFetchValue(db->dict,key)) == NULL) return 0;
 +    if (val->storage == REDIS_VM_MEMORY) return 0;
      deleteKey(db,key);
      return 1;
  }
@@@ -9420,14 -9389,11 +9427,14 @@@ static void freeIOJob(iojob *j) 
      if ((j->type == REDIS_IOJOB_PREPARE_SWAP ||
          j->type == REDIS_IOJOB_DO_SWAP ||
          j->type == REDIS_IOJOB_LOAD) && j->val != NULL)
 +    {
 +         /* we fix the storage type, otherwise decrRefCount() will try to
 +          * kill the I/O thread Job (that does no longer exists). */
 +        if (j->val->storage == REDIS_VM_SWAPPING)
 +            j->val->storage = REDIS_VM_MEMORY;
          decrRefCount(j->val);
 -    /* We don't decrRefCount the j->key field as we did't incremented
 -     * the count creating IO Jobs. This is because the key field here is
 -     * just used as an indentifier and if a key is removed the Job should
 -     * never be touched again. */
 +    }
 +    decrRefCount(j->key);
      zfree(j);
  }
  
@@@ -9448,6 -9414,7 +9455,6 @@@ static void vmThreadedIOCompletedJob(ae
      while((retval = read(fd,buf,1)) == 1) {
          iojob *j;
          listNode *ln;
 -        robj *key;
          struct dictEntry *de;
  
          redisLog(REDIS_DEBUG,"Processing I/O completed job");
          }
          /* Post process it in the main thread, as there are things we
           * can do just here to avoid race conditions and/or invasive locks */
 -        redisLog(REDIS_DEBUG,"Job %p type: %d, key at %p (%s) refcount: %d\n", (void*) j, j->type, (void*)j->key, (char*)j->key->ptr, j->key->refcount);
 +        redisLog(REDIS_DEBUG,"COMPLETED Job type: %d, ID %p, key: %s", j->type, (void*)j->id, (unsigned char*)j->key->ptr);
          de = dictFind(j->db->dict,j->key);
 -        assert(de != NULL);
 -        key = dictGetEntryKey(de);
 +        redisAssert(de != NULL);
          if (j->type == REDIS_IOJOB_LOAD) {
              redisDb *db;
 +            vmpointer *vp = dictGetEntryVal(de);
  
              /* Key loaded, bring it at home */
 -            key->storage = REDIS_VM_MEMORY;
 -            key->vm.atime = server.unixtime;
 -            vmMarkPagesFree(key->vm.page,key->vm.usedpages);
 +            vmMarkPagesFree(vp->page,vp->usedpages);
              redisLog(REDIS_DEBUG, "VM: object %s loaded from disk (threaded)",
 -                (unsigned char*) key->ptr);
 +                (unsigned char*) j->key->ptr);
              server.vm_stats_swapped_objects--;
              server.vm_stats_swapins++;
              dictGetEntryVal(de) = j->val;
              incrRefCount(j->val);
              db = j->db;
 -            freeIOJob(j);
              /* Handle clients waiting for this key to be loaded. */
 -            handleClientsBlockedOnSwappedKey(db,key);
 +            handleClientsBlockedOnSwappedKey(db,j->key);
 +            freeIOJob(j);
 +            zfree(vp);
          } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) {
              /* Now we know the amount of pages required to swap this object.
               * Let's find some space for it, and queue this task again
              {
                  /* Ooops... no space or we can't swap as there is
                   * a fork()ed Redis trying to save stuff on disk. */
 +                j->val->storage = REDIS_VM_MEMORY; /* undo operation */
                  freeIOJob(j);
 -                key->storage = REDIS_VM_MEMORY; /* undo operation */
              } else {
                  /* Note that we need to mark this pages as used now,
                   * if the job will be canceled, we'll mark them as freed
                  unlockThreadedIO();
              }
          } else if (j->type == REDIS_IOJOB_DO_SWAP) {
 -            robj *val;
 +            vmpointer *vp;
  
              /* Key swapped. We can finally free some memory. */
 -            if (key->storage != REDIS_VM_SWAPPING) {
 -                printf("key->storage: %d\n",key->storage);
 -                printf("key->name: %s\n",(char*)key->ptr);
 -                printf("key->refcount: %d\n",key->refcount);
 +            if (j->val->storage != REDIS_VM_SWAPPING) {
 +                vmpointer *vp = (vmpointer*) j->id;
 +                printf("storage: %d\n",vp->storage);
 +                printf("key->name: %s\n",(char*)j->key->ptr);
                  printf("val: %p\n",(void*)j->val);
                  printf("val->type: %d\n",j->val->type);
                  printf("val->ptr: %s\n",(char*)j->val->ptr);
              }
 -            redisAssert(key->storage == REDIS_VM_SWAPPING);
 -            val = dictGetEntryVal(de);
 -            key->vm.page = j->page;
 -            key->vm.usedpages = j->pages;
 -            key->storage = REDIS_VM_SWAPPED;
 -            key->vtype = j->val->type;
 -            decrRefCount(val); /* Deallocate the object from memory. */
 -            dictGetEntryVal(de) = NULL;
 +            redisAssert(j->val->storage == REDIS_VM_SWAPPING);
 +            vp = createVmPointer(j->val->type);
 +            vp->page = j->page;
 +            vp->usedpages = j->pages;
 +            dictGetEntryVal(de) = vp;
 +            /* Fix the storage otherwise decrRefCount will attempt to
 +             * remove the associated I/O job */
 +            j->val->storage = REDIS_VM_MEMORY;
 +            decrRefCount(j->val);
              redisLog(REDIS_DEBUG,
                  "VM: object %s swapped out at %lld (%lld pages) (threaded)",
 -                (unsigned char*) key->ptr,
 +                (unsigned char*) j->key->ptr,
                  (unsigned long long) j->page, (unsigned long long) j->pages);
              server.vm_stats_swapped_objects++;
              server.vm_stats_swapouts++;
@@@ -9589,7 -9556,7 +9596,7 @@@ static void vmCancelThreadedIOJob(robj 
      assert(o->storage == REDIS_VM_LOADING || o->storage == REDIS_VM_SWAPPING);
  again:
      lockThreadedIO();
 -    /* Search for a matching key in one of the queues */
 +    /* Search for a matching object in one of the queues */
      for (i = 0; i < 3; i++) {
          listNode *ln;
          listIter li;
              iojob *job = ln->value;
  
              if (job->canceled) continue; /* Skip this, already canceled. */
 -            if (job->key == o) {
 -                redisLog(REDIS_DEBUG,"*** CANCELED %p (%s) (type %d) (LIST ID %d)\n",
 -                    (void*)job, (char*)o->ptr, job->type, i);
 +            if (job->id == o) {
 +                redisLog(REDIS_DEBUG,"*** CANCELED %p (key %s) (type %d) (LIST ID %d)\n",
 +                    (void*)job, (char*)job->key->ptr, job->type, i);
                  /* Mark the pages as free since the swap didn't happened
                   * or happened but is now discarded. */
                  if (i != 1 && job->type == REDIS_IOJOB_DO_SWAP)
                  else if (o->storage == REDIS_VM_SWAPPING)
                      o->storage = REDIS_VM_MEMORY;
                  unlockThreadedIO();
 +                redisLog(REDIS_DEBUG,"*** DONE");
                  return;
              }
          }
      }
      unlockThreadedIO();
 -    assert(1 != 1); /* We should never reach this */
 +    printf("Not found: %p\n", (void*)o);
 +    redisAssert(1 != 1); /* We should never reach this */
  }
  
  static void *IOThreadEntryPoint(void *arg) {
  
          /* Process the Job */
          if (j->type == REDIS_IOJOB_LOAD) {
 -            j->val = vmReadObjectFromSwap(j->page,j->key->vtype);
 +            vmpointer *vp = (vmpointer*)j->id;
 +            j->val = vmReadObjectFromSwap(j->page,vp->vtype);
          } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) {
              FILE *fp = fopen("/dev/null","w+");
              j->pages = rdbSavedObjectPages(j->val,fp);
@@@ -9787,17 -9751,17 +9794,17 @@@ static int vmSwapObjectThreaded(robj *k
      iojob *j;
  
      assert(key->storage == REDIS_VM_MEMORY);
 -    assert(key->refcount == 1);
  
      j = zmalloc(sizeof(*j));
      j->type = REDIS_IOJOB_PREPARE_SWAP;
      j->db = db;
      j->key = key;
 -    j->val = val;
 +    incrRefCount(key);
 +    j->id = j->val = val;
      incrRefCount(val);
      j->canceled = 0;
      j->thread = (pthread_t) -1;
 -    key->storage = REDIS_VM_SWAPPING;
 +    val->storage = REDIS_VM_SWAPPING;
  
      lockThreadedIO();
      queueIOJob(j);
@@@ -9821,7 -9785,7 +9828,7 @@@ static int waitForSwappedKey(redisClien
       * block the client at all. */
      de = dictFind(c->db->dict,key);
      if (de == NULL) return 0;
 -    o = dictGetEntryKey(de);
 +    o = dictGetEntryVal(de);
      if (o->storage == REDIS_VM_MEMORY) {
          return 0;
      } else if (o->storage == REDIS_VM_SWAPPING) {
      /* Are we already loading the key from disk? If not create a job */
      if (o->storage == REDIS_VM_SWAPPED) {
          iojob *j;
 +        vmpointer *vp = (vmpointer*)o;
  
          o->storage = REDIS_VM_LOADING;
          j = zmalloc(sizeof(*j));
          j->type = REDIS_IOJOB_LOAD;
          j->db = c->db;
 -        j->key = o;
 -        j->key->vtype = o->vtype;
 -        j->page = o->vm.page;
 +        j->id = (robj*)vp;
 +        j->key = key;
 +        incrRefCount(key);
 +        j->page = vp->page;
          j->val = NULL;
          j->canceled = 0;
          j->thread = (pthread_t) -1;
@@@ -9989,8 -9951,6 +9996,8 @@@ static int dontWaitForSwappedKey(redisC
      return listLength(c->io_keys) == 0;
  }
  
 +/* Every time we now a key was loaded back in memory, we handle clients
 + * waiting for this key if any. */
  static void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key) {
      struct dictEntry *de;
      list *l;
@@@ -10847,8 -10807,8 +10854,8 @@@ static void debugCommand(redisClient *c
          }
          key = dictGetEntryKey(de);
          val = dictGetEntryVal(de);
 -        if (!server.vm_enabled || (key->storage == REDIS_VM_MEMORY ||
 -                                   key->storage == REDIS_VM_SWAPPING)) {
 +        if (!server.vm_enabled || (val->storage == REDIS_VM_MEMORY ||
 +                                   val->storage == REDIS_VM_SWAPPING)) {
              char *strenc;
              char buf[128];
  
                  (void*)key, key->refcount, (void*)val, val->refcount,
                  strenc, (long long) rdbSavedObjectLen(val,NULL)));
          } else {
 +            vmpointer *vp = (vmpointer*) val;
              addReplySds(c,sdscatprintf(sdsempty(),
                  "+Key at:%p refcount:%d, value swapped at: page %llu "
                  "using %llu pages\r\n",
 -                (void*)key, key->refcount, (unsigned long long) key->vm.page,
 -                (unsigned long long) key->vm.usedpages));
 +                (void*)key, key->refcount, (unsigned long long) vp->page,
 +                (unsigned long long) vp->usedpages));
          }
      } else if (!strcasecmp(c->argv[1]->ptr,"swapin") && c->argc == 3) {
          lookupKeyRead(c->db,c->argv[2]);
      } else if (!strcasecmp(c->argv[1]->ptr,"swapout") && c->argc == 3) {
          dictEntry *de = dictFind(c->db->dict,c->argv[2]);
          robj *key, *val;
 +        vmpointer *vp;
  
          if (!server.vm_enabled) {
              addReplySds(c,sdsnew("-ERR Virtual Memory is disabled\r\n"));
          }
          key = dictGetEntryKey(de);
          val = dictGetEntryVal(de);
 -        /* If the key is shared we want to create a copy */
 -        if (key->refcount > 1) {
 -            robj *newkey = dupStringObject(key);
 -            decrRefCount(key);
 -            key = dictGetEntryKey(de) = newkey;
 -        }
          /* Swap it */
 -        if (key->storage != REDIS_VM_MEMORY) {
 +        if (val->storage != REDIS_VM_MEMORY) {
              addReplySds(c,sdsnew("-ERR This key is not in memory\r\n"));
 -        } else if (vmSwapObjectBlocking(key,val) == REDIS_OK) {
 -            dictGetEntryVal(de) = NULL;
 +        } else if (val->refcount != 1) {
 +            addReplySds(c,sdsnew("-ERR Object is shared\r\n"));
 +        } else if ((vp = vmSwapObjectBlocking(val)) != NULL) {
 +            dictGetEntryVal(de) = vp;
              addReply(c,shared.ok);
          } else {
              addReply(c,shared.err);