]> git.saurik.com Git - redis.git/blobdiff - redis.c
implemented HMSET
[redis.git] / redis.c
diff --git a/redis.c b/redis.c
index e4ddd99c0431dde468c651843883bbbabedbdd76..6b23c7bc3b17a80ca305d1c79e1538ee7f9eb5c7 100644 (file)
--- a/redis.c
+++ b/redis.c
@@ -342,8 +342,6 @@ struct redisServer {
     int port;
     int fd;
     redisDb *db;
-    dict *sharingpool;          /* Poll used for object sharing */
-    unsigned int sharingpoolsize;
     long long dirty;            /* changes to DB from the last save */
     list *clients;
     list *slaves, *monitors;
@@ -507,6 +505,7 @@ typedef struct zset {
 
 /* Our shared "common" objects */
 
+#define REDIS_SHARED_INTEGERS 10000
 struct sharedObjectsStruct {
     robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
     *colon, *nullbulk, *nullmultibulk, *queued,
@@ -515,7 +514,7 @@ struct sharedObjectsStruct {
     *select0, *select1, *select2, *select3, *select4,
     *select5, *select6, *select7, *select8, *select9,
     *messagebulk, *subscribebulk, *unsubscribebulk, *mbulk3,
-    *psubscribebulk, *punsubscribebulk;
+    *psubscribebulk, *punsubscribebulk, *integers[REDIS_SHARED_INTEGERS];
 } shared;
 
 /* Global vars that are actally used as constants. The following double
@@ -558,8 +557,7 @@ static robj *dupStringObject(robj *o);
 static void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc);
 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
 static int syncWithMaster(void);
-static robj *tryObjectSharing(robj *o);
-static int tryObjectEncoding(robj *o);
+static robj *tryObjectEncoding(robj *o);
 static robj *getDecodedObject(robj *o);
 static int removeExpire(redisDb *db, robj *key);
 static int expireIfNeeded(redisDb *db, robj *key);
@@ -706,6 +704,7 @@ static void substrCommand(redisClient *c);
 static void zrankCommand(redisClient *c);
 static void zrevrankCommand(redisClient *c);
 static void hsetCommand(redisClient *c);
+static void hmsetCommand(redisClient *c);
 static void hgetCommand(redisClient *c);
 static void hdelCommand(redisClient *c);
 static void hlenCommand(redisClient *c);
@@ -782,6 +781,7 @@ static struct redisCommand cmdTable[] = {
     {"zrank",zrankCommand,3,REDIS_CMD_BULK,NULL,1,1,1},
     {"zrevrank",zrevrankCommand,3,REDIS_CMD_BULK,NULL,1,1,1},
     {"hset",hsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,1,1,1},
+    {"hmset",hmsetCommand,-4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,NULL,1,1,1},
     {"hincrby",hincrbyCommand,4,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,NULL,1,1,1},
     {"hget",hgetCommand,3,REDIS_CMD_BULK,NULL,1,1,1},
     {"hdel",hdelCommand,3,REDIS_CMD_BULK,NULL,1,1,1},
@@ -1223,7 +1223,7 @@ void backgroundSaveDoneHandler(int statloc) {
         redisLog(REDIS_WARNING, "Background saving error");
     } else {
         redisLog(REDIS_WARNING,
-            "Background saving terminated by signal");
+            "Background saving terminated by signal %d", WTERMSIG(statloc));
         rdbRemoveTempFile(server.bgsavechildpid);
     }
     server.bgsavechildpid = -1;
@@ -1284,7 +1284,8 @@ void backgroundRewriteDoneHandler(int statloc) {
         redisLog(REDIS_WARNING, "Background append only file rewriting error");
     } else {
         redisLog(REDIS_WARNING,
-            "Background append only file rewriting terminated by signal");
+            "Background append only file rewriting terminated by signal %d",
+            WTERMSIG(statloc));
     }
 cleanup:
     sdsfree(server.bgrewritebuf);
@@ -1293,6 +1294,19 @@ cleanup:
     server.bgrewritechildpid = -1;
 }
 
+/* This function is called once a background process of some kind terminates,
+ * as we want to avoid resizing the hash tables when there is a child in order
+ * to play well with copy-on-write (otherwise when a resize happens lots of
+ * memory pages are copied). The goal of this function is to update the ability
+ * for dict.c to resize the hash tables accordingly to the fact we have o not
+ * running childs. */
+static void updateDictResizePolicy(void) {
+    if (server.bgsavechildpid == -1 && server.bgrewritechildpid == -1)
+        dictEnableResize();
+    else
+        dictDisableResize();
+}
+
 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
     int j, loops = server.cronloops++;
     REDIS_NOTUSED(eventLoop);
@@ -1324,15 +1338,18 @@ static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientD
      * if we resize the HT while there is the saving child at work actually
      * a lot of memory movements in the parent will cause a lot of pages
      * copied. */
-    if (server.bgsavechildpid == -1 && !(loops % 10)) tryResizeHashTables();
+    if (server.bgsavechildpid == -1 && server.bgrewritechildpid == -1 &&
+        !(loops % 10))
+    {
+        tryResizeHashTables();
+    }
 
     /* Show information about connected clients */
     if (!(loops % 50)) {
-        redisLog(REDIS_VERBOSE,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
+        redisLog(REDIS_VERBOSE,"%d clients connected (%d slaves), %zu bytes in use",
             listLength(server.clients)-listLength(server.slaves),
             listLength(server.slaves),
-            zmalloc_used_memory(),
-            dictSize(server.sharingpool));
+            zmalloc_used_memory());
     }
 
     /* Close connections of timedout clients */
@@ -1350,6 +1367,7 @@ static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientD
             } else {
                 backgroundRewriteDoneHandler(statloc);
             }
+            updateDictResizePolicy();
         }
     } else {
         /* If there is not a background saving in progress check if
@@ -1469,6 +1487,8 @@ static void beforeSleep(struct aeEventLoop *eventLoop) {
 }
 
 static void createSharedObjects(void) {
+    int j;
+
     shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
     shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
     shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
@@ -1509,6 +1529,10 @@ static void createSharedObjects(void) {
     shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n",17);
     shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n",19);
     shared.mbulk3 = createStringObject("*3\r\n",4);
+    for (j = 0; j < REDIS_SHARED_INTEGERS; j++) {
+        shared.integers[j] = createObject(REDIS_STRING,(void*)(long)j);
+        shared.integers[j]->encoding = REDIS_ENCODING_INT;
+    }
 }
 
 static void appendServerSaveParams(time_t seconds, int changes) {
@@ -1545,7 +1569,6 @@ static void initServerConfig() {
     server.requirepass = NULL;
     server.shareobjects = 0;
     server.rdbcompression = 1;
-    server.sharingpoolsize = 1024;
     server.maxclients = 0;
     server.blpop_blocked_clients = 0;
     server.maxmemory = 0;
@@ -1598,7 +1621,6 @@ static void initServer() {
     createSharedObjects();
     server.el = aeCreateEventLoop();
     server.db = zmalloc(sizeof(redisDb)*server.dbnum);
-    server.sharingpool = dictCreate(&setDictType,NULL);
     server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
     if (server.fd == -1) {
         redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
@@ -1783,11 +1805,6 @@ static void loadServerConfig(char *filename) {
             if ((server.rdbcompression = yesnotoi(argv[1])) == -1) {
                 err = "argument must be 'yes' or 'no'"; goto loaderr;
             }
-        } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
-            server.sharingpoolsize = atoi(argv[1]);
-            if (server.sharingpoolsize < 1) {
-                err = "invalid object sharing pool size"; goto loaderr;
-            }
         } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
             if ((server.daemonize = yesnotoi(argv[1])) == -1) {
                 err = "argument must be 'yes' or 'no'"; goto loaderr;
@@ -2267,15 +2284,9 @@ static int processCommand(redisClient *c) {
             return 1;
         }
     }
-    /* Let's try to share objects on the command arguments vector */
-    if (server.shareobjects) {
-        int j;
-        for(j = 1; j < c->argc; j++)
-            c->argv[j] = tryObjectSharing(c->argv[j]);
-    }
     /* Let's try to encode the bulk object to save space. */
     if (cmd->flags & REDIS_CMD_BULK)
-        tryObjectEncoding(c->argv[c->argc-1]);
+        c->argv[c->argc-1] = tryObjectEncoding(c->argv[c->argc-1]);
 
     /* Check if the user is authenticated */
     if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
@@ -2601,6 +2612,21 @@ static void addReplyLong(redisClient *c, long l) {
     addReplySds(c,sdsnewlen(buf,len));
 }
 
+static void addReplyLongLong(redisClient *c, long long ll) {
+    char buf[128];
+    size_t len;
+
+    if (ll == 0) {
+        addReply(c,shared.czero);
+        return;
+    } else if (ll == 1) {
+        addReply(c,shared.cone);
+        return;
+    }
+    len = snprintf(buf,sizeof(buf),":%lld\r\n",ll);
+    addReplySds(c,sdsnewlen(buf,len));
+}
+
 static void addReplyUlong(redisClient *c, unsigned long ul) {
     char buf[128];
     size_t len;
@@ -2800,7 +2826,6 @@ static void freeHashObject(robj *o) {
 }
 
 static void incrRefCount(robj *o) {
-    redisAssert(!server.vm_enabled || o->storage == REDIS_VM_MEMORY);
     o->refcount++;
 }
 
@@ -2812,9 +2837,6 @@ static void decrRefCount(void *obj) {
     if (server.vm_enabled &&
         (o->storage == REDIS_VM_SWAPPED || o->storage == REDIS_VM_LOADING))
     {
-        if (o->storage == REDIS_VM_SWAPPED || o->storage == REDIS_VM_LOADING) {
-            redisAssert(o->refcount == 1);
-        }
         if (o->storage == REDIS_VM_LOADING) vmCancelThreadedIOJob(obj);
         redisAssert(o->type == REDIS_STRING);
         freeStringObject(o);
@@ -2927,51 +2949,6 @@ static int deleteKey(redisDb *db, robj *key) {
     return retval == DICT_OK;
 }
 
-/* Try to share an object against the shared objects pool */
-static robj *tryObjectSharing(robj *o) {
-    struct dictEntry *de;
-    unsigned long c;
-
-    if (o == NULL || server.shareobjects == 0) return o;
-
-    redisAssert(o->type == REDIS_STRING);
-    de = dictFind(server.sharingpool,o);
-    if (de) {
-        robj *shared = dictGetEntryKey(de);
-
-        c = ((unsigned long) dictGetEntryVal(de))+1;
-        dictGetEntryVal(de) = (void*) c;
-        incrRefCount(shared);
-        decrRefCount(o);
-        return shared;
-    } else {
-        /* Here we are using a stream algorihtm: Every time an object is
-         * shared we increment its count, everytime there is a miss we
-         * recrement the counter of a random object. If this object reaches
-         * zero we remove the object and put the current object instead. */
-        if (dictSize(server.sharingpool) >=
-                server.sharingpoolsize) {
-            de = dictGetRandomKey(server.sharingpool);
-            redisAssert(de != NULL);
-            c = ((unsigned long) dictGetEntryVal(de))-1;
-            dictGetEntryVal(de) = (void*) c;
-            if (c == 0) {
-                dictDelete(server.sharingpool,de->key);
-            }
-        } else {
-            c = 0; /* If the pool is empty we want to add this object */
-        }
-        if (c == 0) {
-            int retval;
-
-            retval = dictAdd(server.sharingpool,o,(void*)1);
-            redisAssert(retval == DICT_OK);
-            incrRefCount(o);
-        }
-        return o;
-    }
-}
-
 /* Check if the nul-terminated string 's' can be represented by a long
  * (that is, is a number that fits into long without any other space or
  * character before or after the digits).
@@ -2995,29 +2972,35 @@ static int isStringRepresentableAsLong(sds s, long *longval) {
 }
 
 /* Try to encode a string object in order to save space */
-static int tryObjectEncoding(robj *o) {
+static robj *tryObjectEncoding(robj *o) {
     long value;
     sds s = o->ptr;
 
     if (o->encoding != REDIS_ENCODING_RAW)
-        return REDIS_ERR; /* Already encoded */
+        return o; /* Already encoded */
 
-    /* It's not save to encode shared objects: shared objects can be shared
+    /* It's not safe to encode shared objects: shared objects can be shared
      * everywhere in the "object space" of Redis. Encoded objects can only
      * appear as "values" (and not, for instance, as keys) */
-     if (o->refcount > 1) return REDIS_ERR;
+     if (o->refcount > 1) return o;
 
     /* Currently we try to encode only strings */
     redisAssert(o->type == REDIS_STRING);
 
     /* Check if we can represent this string as a long integer */
-    if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
+    if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return o;
 
     /* Ok, this object can be encoded */
-    o->encoding = REDIS_ENCODING_INT;
-    sdsfree(o->ptr);
-    o->ptr = (void*) value;
-    return REDIS_OK;
+    if (value >= 0 && value < REDIS_SHARED_INTEGERS) {
+        decrRefCount(o);
+        incrRefCount(shared.integers[value]);
+        return shared.integers[value];
+    } else {
+        o->encoding = REDIS_ENCODING_INT;
+        sdsfree(o->ptr);
+        o->ptr = (void*) value;
+        return o;
+    }
 }
 
 /* Get a decoded version of an encoded object (returned as a new object).
@@ -3481,6 +3464,7 @@ static int rdbSaveBackground(char *filename) {
         }
         redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
         server.bgsavechildpid = childpid;
+        updateDictResizePolicy();
         return REDIS_OK;
     }
     return REDIS_OK; /* unreached */
@@ -3590,9 +3574,9 @@ static robj *rdbLoadStringObject(FILE*fp) {
         case REDIS_RDB_ENC_INT8:
         case REDIS_RDB_ENC_INT16:
         case REDIS_RDB_ENC_INT32:
-            return tryObjectSharing(rdbLoadIntegerObject(fp,len));
+            return rdbLoadIntegerObject(fp,len);
         case REDIS_RDB_ENC_LZF:
-            return tryObjectSharing(rdbLoadLzfStringObject(fp));
+            return rdbLoadLzfStringObject(fp);
         default:
             redisAssert(0);
         }
@@ -3604,7 +3588,7 @@ static robj *rdbLoadStringObject(FILE*fp) {
         sdsfree(val);
         return NULL;
     }
-    return tryObjectSharing(createObject(REDIS_STRING,val));
+    return createObject(REDIS_STRING,val);
 }
 
 /* For information about double serialization check rdbSaveDoubleValue() */
@@ -3634,7 +3618,7 @@ static robj *rdbLoadObject(int type, FILE *fp) {
     if (type == REDIS_STRING) {
         /* Read string value */
         if ((o = rdbLoadStringObject(fp)) == NULL) return NULL;
-        tryObjectEncoding(o);
+        o = tryObjectEncoding(o);
     } else if (type == REDIS_LIST || type == REDIS_SET) {
         /* Read list/set value */
         uint32_t listlen;
@@ -3650,7 +3634,7 @@ static robj *rdbLoadObject(int type, FILE *fp) {
             robj *ele;
 
             if ((ele = rdbLoadStringObject(fp)) == NULL) return NULL;
-            tryObjectEncoding(ele);
+            ele = tryObjectEncoding(ele);
             if (type == REDIS_LIST) {
                 listAddNodeTail((list*)o->ptr,ele);
             } else {
@@ -3671,7 +3655,7 @@ static robj *rdbLoadObject(int type, FILE *fp) {
             double *score = zmalloc(sizeof(double));
 
             if ((ele = rdbLoadStringObject(fp)) == NULL) return NULL;
-            tryObjectEncoding(ele);
+            ele = tryObjectEncoding(ele);
             if (rdbLoadDoubleValue(fp,score) == -1) return NULL;
             dictAdd(zs->dict,ele,score);
             zslInsert(zs->zsl,*score,ele);
@@ -3710,8 +3694,8 @@ static robj *rdbLoadObject(int type, FILE *fp) {
                 decrRefCount(key);
                 decrRefCount(val);
             } else {
-                tryObjectEncoding(key);
-                tryObjectEncoding(val);
+                key = tryObjectEncoding(key);
+                val = tryObjectEncoding(val);
                 dictAdd((dict*)o->ptr,key,val);
             }
         }
@@ -3937,7 +3921,7 @@ static void msetGenericCommand(redisClient *c, int nx) {
     for (j = 1; j < c->argc; j += 2) {
         int retval;
 
-        tryObjectEncoding(c->argv[j+1]);
+        c->argv[j+1] = tryObjectEncoding(c->argv[j+1]);
         retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
         if (retval == DICT_ERR) {
             dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
@@ -3985,7 +3969,7 @@ static void incrDecrCommand(redisClient *c, long long incr) {
 
     value += incr;
     o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
-    tryObjectEncoding(o);
+    o = tryObjectEncoding(o);
     retval = dictAdd(c->db->dict,c->argv[1],o);
     if (retval == DICT_ERR) {
         dictReplace(c->db->dict,c->argv[1],o);
@@ -5091,7 +5075,7 @@ static int zslRandomLevel(void) {
     int level = 1;
     while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
         level += 1;
-    return level;
+    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
 }
 
 static void zslInsert(zskiplist *zsl, double score, robj *obj) {
@@ -6008,7 +5992,7 @@ static void hsetCommand(redisClient *c) {
         if (zipmapLen(zm) > server.hash_max_zipmap_entries)
             convertToRealHash(o);
     } else {
-        tryObjectEncoding(c->argv[2]);
+        c->argv[2] = tryObjectEncoding(c->argv[2]);
         /* note that c->argv[3] is already encoded, as the latest arg
          * of a bulk command is always integer encoded if possible. */
         if (dictReplace(o->ptr,c->argv[2],c->argv[3])) {
@@ -6022,6 +6006,70 @@ static void hsetCommand(redisClient *c) {
     addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",update == 0));
 }
 
+static void hmsetCommand(redisClient *c) {
+    int i;
+    robj *o, *key, *val;
+
+    if ((c->argc % 2) == 1) {
+        addReplySds(c,sdsnew("-ERR wrong number of arguments for HMSET\r\n"));
+        return;
+    }
+
+    if ((o = lookupKeyWrite(c->db,c->argv[1])) == NULL) {
+        o = createHashObject();
+        dictAdd(c->db->dict,c->argv[1],o);
+        incrRefCount(c->argv[1]);
+    } else {
+        if (o->type != REDIS_HASH) {
+            addReply(c,shared.wrongtypeerr);
+            return;
+        }
+    }
+
+    /* We want to convert the zipmap into an hash table right now if the
+     * entry to be added is too big. */
+    if (o->encoding == REDIS_ENCODING_ZIPMAP) {
+        for (i = 2; i < c->argc; i+=2) {
+            if ((c->argv[i]->encoding == REDIS_ENCODING_RAW &&
+                  sdslen(c->argv[i]->ptr) > server.hash_max_zipmap_value) ||
+                (c->argv[i+1]->encoding == REDIS_ENCODING_RAW &&
+                  sdslen(c->argv[i+1]->ptr) > server.hash_max_zipmap_value)) {
+                convertToRealHash(o);
+                break;
+            }
+        }
+    }
+
+    if (o->encoding == REDIS_ENCODING_ZIPMAP) {
+        unsigned char *zm = o->ptr;
+
+        for (i = 2; i < c->argc; i+=2) {
+            key = getDecodedObject(c->argv[i]);
+            val = getDecodedObject(c->argv[i+1]);
+            zm = zipmapSet(zm,key->ptr,sdslen(key->ptr),
+                              val->ptr,sdslen(val->ptr),NULL);
+            decrRefCount(key);
+            decrRefCount(val);
+            o->ptr = zm;
+        }
+
+        /* And here there is the second check for hash conversion. */
+        if (zipmapLen(zm) > server.hash_max_zipmap_entries)
+            convertToRealHash(o);
+    } else {
+        for (i = 2; i < c->argc; i+=2) {
+            key = tryObjectEncoding(c->argv[i]);
+            val = tryObjectEncoding(c->argv[i+1]);
+            if (dictReplace(o->ptr,key,val)) {
+                incrRefCount(key);
+            }
+            incrRefCount(val);
+        }
+    }
+
+    addReply(c, shared.ok);
+}
+
 static void hincrbyCommand(redisClient *c) {
     long long value = 0, incr = 0;
     robj *o = lookupKeyWrite(c->db,c->argv[1]);
@@ -6081,14 +6129,14 @@ static void hincrbyCommand(redisClient *c) {
 
         value += incr;
         hval = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
-        tryObjectEncoding(hval);
+        hval = tryObjectEncoding(hval);
         if (dictReplace(o->ptr,c->argv[2],hval)) {
             incrRefCount(c->argv[2]);
         }
     }
 
     server.dirty++;
-    addReplyLong(c, value);
+    addReplyLongLong(c, value);
 }
 
 static void hgetCommand(redisClient *c) {
@@ -6266,8 +6314,8 @@ static void convertToRealHash(robj *o) {
 
         keyobj = createStringObject((char*)key,klen);
         valobj = createStringObject((char*)val,vlen);
-        tryObjectEncoding(keyobj);
-        tryObjectEncoding(valobj);
+        keyobj = tryObjectEncoding(keyobj);
+        valobj = tryObjectEncoding(valobj);
         dictAdd(dict,keyobj,valobj);
     }
     o->encoding = REDIS_ENCODING_HT;
@@ -7777,14 +7825,9 @@ int loadAppendOnlyFile(char *filename) {
             redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
             exit(1);
         }
-        /* Try object sharing and encoding */
-        if (server.shareobjects) {
-            int j;
-            for(j = 1; j < argc; j++)
-                argv[j] = tryObjectSharing(argv[j]);
-        }
+        /* Try object encoding */
         if (cmd->flags & REDIS_CMD_BULK)
-            tryObjectEncoding(argv[argc-1]);
+            argv[argc-1] = tryObjectEncoding(argv[argc-1]);
         /* Run the command in the context of a fake client */
         fakeClient->argc = argc;
         fakeClient->argv = argv;
@@ -8100,6 +8143,7 @@ static int rewriteAppendOnlyFileBackground(void) {
         redisLog(REDIS_NOTICE,
             "Background append only file rewriting started by pid %d",childpid);
         server.bgrewritechildpid = childpid;
+        updateDictResizePolicy();
         /* We set appendseldb to -1 in order to force the next call to the
          * feedAppendOnlyFile() to issue a SELECT command, so the differences
          * accumulated by the parent into server.bgrewritebuf will start
@@ -8667,7 +8711,10 @@ static void freeIOJob(iojob *j) {
         j->type == REDIS_IOJOB_DO_SWAP ||
         j->type == REDIS_IOJOB_LOAD) && j->val != NULL)
         decrRefCount(j->val);
-    decrRefCount(j->key);
+    /* We don't decrRefCount the j->key field as we did't incremented
+     * the count creating IO Jobs. This is because the key field here is
+     * just used as an indentifier and if a key is removed the Job should
+     * never be touched again. */
     zfree(j);
 }
 
@@ -8840,7 +8887,7 @@ again:
             iojob *job = ln->value;
 
             if (job->canceled) continue; /* Skip this, already canceled. */
-            if (compareStringObjects(job->key,o) == 0) {
+            if (job->key == o) {
                 redisLog(REDIS_DEBUG,"*** CANCELED %p (%s) (type %d) (LIST ID %d)\n",
                     (void*)job, (char*)o->ptr, job->type, i);
                 /* Mark the pages as free since the swap didn't happened
@@ -9030,7 +9077,7 @@ static int vmSwapObjectThreaded(robj *key, robj *val, redisDb *db) {
     j = zmalloc(sizeof(*j));
     j->type = REDIS_IOJOB_PREPARE_SWAP;
     j->db = db;
-    j->key = dupStringObject(key);
+    j->key = key;
     j->val = val;
     incrRefCount(val);
     j->canceled = 0;
@@ -9098,7 +9145,7 @@ static int waitForSwappedKey(redisClient *c, robj *key) {
         j = zmalloc(sizeof(*j));
         j->type = REDIS_IOJOB_LOAD;
         j->db = c->db;
-        j->key = dupStringObject(key);
+        j->key = o;
         j->key->vtype = o->vtype;
         j->page = o->vm.page;
         j->val = NULL;
@@ -9617,6 +9664,9 @@ static void debugCommand(redisClient *c) {
                 (void*)key, key->refcount, (unsigned long long) key->vm.page,
                 (unsigned long long) key->vm.usedpages));
         }
+    } else if (!strcasecmp(c->argv[1]->ptr,"swapin") && c->argc == 3) {
+        lookupKeyRead(c->db,c->argv[2]);
+        addReply(c,shared.ok);
     } else if (!strcasecmp(c->argv[1]->ptr,"swapout") && c->argc == 3) {
         dictEntry *de = dictFind(c->db->dict,c->argv[2]);
         robj *key, *val;
@@ -9648,7 +9698,7 @@ static void debugCommand(redisClient *c) {
         }
     } else {
         addReplySds(c,sdsnew(
-            "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|SWAPOUT <key>|RELOAD]\r\n"));
+            "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|SWAPIN <key>|SWAPOUT <key>|RELOAD]\r\n"));
     }
 }