]> git.saurik.com Git - redis.git/blobdiff - redis.c
removed a bug in the function to cancel an I/O job
[redis.git] / redis.c
diff --git a/redis.c b/redis.c
index ad715f7824cf45fec31ddc53d05cd35b6452fcb4..4aacc41bf45f3288d70d7197978e7640699ef7ed 100644 (file)
--- a/redis.c
+++ b/redis.c
 #define REDIS_VM_MAX_NEAR_PAGES 65536
 #define REDIS_VM_MAX_RANDOM_JUMP 4096
 #define REDIS_VM_MAX_THREADS 32
 #define REDIS_VM_MAX_NEAR_PAGES 65536
 #define REDIS_VM_MAX_RANDOM_JUMP 4096
 #define REDIS_VM_MAX_THREADS 32
+#define REDIS_THREAD_STACK_SIZE (1024*1024*4)
 /* The following is the number of completed I/O jobs to process when the
  * handelr is called. 1 is the minimum, and also the default, as it allows
  * to block as little as possible other accessing clients. While Virtual
 /* The following is the number of completed I/O jobs to process when the
  * handelr is called. 1 is the minimum, and also the default, as it allows
  * to block as little as possible other accessing clients. While Virtual
@@ -382,6 +383,7 @@ struct redisServer {
     int sort_bypattern;
     /* Virtual memory configuration */
     int vm_enabled;
     int sort_bypattern;
     /* Virtual memory configuration */
     int vm_enabled;
+    char *vm_swap_file;
     off_t vm_page_size;
     off_t vm_pages;
     unsigned long long vm_max_memory;
     off_t vm_page_size;
     off_t vm_pages;
     unsigned long long vm_max_memory;
@@ -403,6 +405,7 @@ struct redisServer {
     pthread_mutex_t io_mutex; /* lock to access io_jobs/io_done/io_thread_job */
     pthread_mutex_t obj_freelist_mutex; /* safe redis objects creation/free */
     pthread_mutex_t io_swapfile_mutex; /* So we can lseek + write */
     pthread_mutex_t io_mutex; /* lock to access io_jobs/io_done/io_thread_job */
     pthread_mutex_t obj_freelist_mutex; /* safe redis objects creation/free */
     pthread_mutex_t io_swapfile_mutex; /* So we can lseek + write */
+    pthread_attr_t io_threads_attr; /* attributes for threads creation */
     int io_active_threads; /* Number of running I/O threads */
     int vm_max_threads; /* Max number of I/O threads running at the same time */
     /* Our main thread is blocked on the event loop, locking for sockets ready
     int io_active_threads; /* Number of running I/O threads */
     int vm_max_threads; /* Max number of I/O threads running at the same time */
     /* Our main thread is blocked on the event loop, locking for sockets ready
@@ -561,6 +564,9 @@ static void freeIOJob(iojob *j);
 static void queueIOJob(iojob *j);
 static int vmWriteObjectOnSwap(robj *o, off_t page);
 static robj *vmReadObjectFromSwap(off_t page, int type);
 static void queueIOJob(iojob *j);
 static int vmWriteObjectOnSwap(robj *o, off_t page);
 static robj *vmReadObjectFromSwap(off_t page, int type);
+static void waitEmptyIOJobsQueue(void);
+static void vmReopenSwapFile(void);
+static int vmFreePage(off_t page);
 
 static void authCommand(redisClient *c);
 static void pingCommand(redisClient *c);
 
 static void authCommand(redisClient *c);
 static void pingCommand(redisClient *c);
@@ -866,7 +872,7 @@ static void redisLog(int level, const char *fmt, ...) {
 
         now = time(NULL);
         strftime(buf,64,"%d %b %H:%M:%S",localtime(&now));
 
         now = time(NULL);
         strftime(buf,64,"%d %b %H:%M:%S",localtime(&now));
-        fprintf(fp,"%s %c ",buf,c[level]);
+        fprintf(fp,"[%d] %s %c ",(int)getpid(),buf,c[level]);
         vfprintf(fp, fmt, ap);
         fprintf(fp,"\n");
         fflush(fp);
         vfprintf(fp, fmt, ap);
         fprintf(fp,"\n");
         fflush(fp);
@@ -1018,9 +1024,10 @@ static void closeTimedoutClients(void) {
     redisClient *c;
     listNode *ln;
     time_t now = time(NULL);
     redisClient *c;
     listNode *ln;
     time_t now = time(NULL);
+    listIter li;
 
 
-    listRewind(server.clients);
-    while ((ln = listYield(server.clients)) != NULL) {
+    listRewind(server.clients,&li);
+    while ((ln = listNext(&li)) != NULL) {
         c = listNodeValue(ln);
         if (server.maxidletime &&
             !(c->flags & REDIS_SLAVE) &&    /* no timeout for slaves */
         c = listNodeValue(ln);
         if (server.maxidletime &&
             !(c->flags & REDIS_SLAVE) &&    /* no timeout for slaves */
@@ -1368,6 +1375,7 @@ static void initServerConfig() {
     server.blockedclients = 0;
     server.maxmemory = 0;
     server.vm_enabled = 0;
     server.blockedclients = 0;
     server.maxmemory = 0;
     server.vm_enabled = 0;
+    server.vm_swap_file = zstrdup("/tmp/redis-%p.vm");
     server.vm_page_size = 256;          /* 256 bytes per page */
     server.vm_pages = 1024*1024*100;    /* 104 millions of pages */
     server.vm_max_memory = 1024LL*1024*1024*1; /* 1 GB of RAM */
     server.vm_page_size = 256;          /* 256 bytes per page */
     server.vm_pages = 1024*1024*100;    /* 104 millions of pages */
     server.vm_max_memory = 1024LL*1024*1024*1; /* 1 GB of RAM */
@@ -1611,15 +1619,18 @@ static void loadServerConfig(char *filename) {
                 goto loaderr;
             }
         } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
                 goto loaderr;
             }
         } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
-          server.requirepass = zstrdup(argv[1]);
+            server.requirepass = zstrdup(argv[1]);
         } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
         } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
-          server.pidfile = zstrdup(argv[1]);
+            server.pidfile = zstrdup(argv[1]);
         } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
         } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
-          server.dbfilename = zstrdup(argv[1]);
+            server.dbfilename = zstrdup(argv[1]);
         } else if (!strcasecmp(argv[0],"vm-enabled") && argc == 2) {
             if ((server.vm_enabled = yesnotoi(argv[1])) == -1) {
                 err = "argument must be 'yes' or 'no'"; goto loaderr;
             }
         } else if (!strcasecmp(argv[0],"vm-enabled") && argc == 2) {
             if ((server.vm_enabled = yesnotoi(argv[1])) == -1) {
                 err = "argument must be 'yes' or 'no'"; goto loaderr;
             }
+        } else if (!strcasecmp(argv[0],"vm-swap-file") && argc == 2) {
+            zfree(server.vm_swap_file);
+            server.vm_swap_file = zstrdup(argv[1]);
         } else if (!strcasecmp(argv[0],"vm-max-memory") && argc == 2) {
             server.vm_max_memory = strtoll(argv[1], NULL, 10);
         } else if (!strcasecmp(argv[0],"vm-page-size") && argc == 2) {
         } else if (!strcasecmp(argv[0],"vm-max-memory") && argc == 2) {
             server.vm_max_memory = strtoll(argv[1], NULL, 10);
         } else if (!strcasecmp(argv[0],"vm-page-size") && argc == 2) {
@@ -1711,10 +1722,11 @@ static void glueReplyBuffersIfNeeded(redisClient *c) {
     int copylen = 0;
     char buf[GLUEREPLY_UP_TO];
     listNode *ln;
     int copylen = 0;
     char buf[GLUEREPLY_UP_TO];
     listNode *ln;
+    listIter li;
     robj *o;
 
     robj *o;
 
-    listRewind(c->reply);
-    while((ln = listYield(c->reply))) {
+    listRewind(c->reply,&li);
+    while((ln = listNext(&li))) {
         int objlen;
 
         o = ln->value;
         int objlen;
 
         o = ln->value;
@@ -2076,6 +2088,7 @@ static int processCommand(redisClient *c) {
 
 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
     listNode *ln;
 
 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
     listNode *ln;
+    listIter li;
     int outc = 0, j;
     robj **outv;
     /* (args*2)+1 is enough room for args, spaces, newlines */
     int outc = 0, j;
     robj **outv;
     /* (args*2)+1 is enough room for args, spaces, newlines */
@@ -2106,8 +2119,8 @@ static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int di
      * be sure to free objects if there is no slave in a replication state
      * able to be feed with commands */
     for (j = 0; j < outc; j++) incrRefCount(outv[j]);
      * be sure to free objects if there is no slave in a replication state
      * able to be feed with commands */
     for (j = 0; j < outc; j++) incrRefCount(outv[j]);
-    listRewind(slaves);
-    while((ln = listYield(slaves))) {
+    listRewind(slaves,&li);
+    while((ln = listNext(&li))) {
         redisClient *slave = ln->value;
 
         /* Don't feed slaves that are still waiting for BGSAVE to start */
         redisClient *slave = ln->value;
 
         /* Don't feed slaves that are still waiting for BGSAVE to start */
@@ -2413,6 +2426,10 @@ static robj *createObject(int type, void *ptr) {
     o->ptr = ptr;
     o->refcount = 1;
     if (server.vm_enabled) {
     o->ptr = ptr;
     o->refcount = 1;
     if (server.vm_enabled) {
+        /* Note that this code may run in the context of an I/O thread
+         * and accessing to server.unixtime in theory is an error
+         * (no locks). But in practice this is safe, and even if we read
+         * garbage Redis will not fail, as it's just a statistical info */
         o->vm.atime = server.unixtime;
         o->storage = REDIS_VM_MEMORY;
     }
         o->vm.atime = server.unixtime;
         o->storage = REDIS_VM_MEMORY;
     }
@@ -2482,7 +2499,8 @@ static void incrRefCount(robj *o) {
 static void decrRefCount(void *obj) {
     robj *o = obj;
 
 static void decrRefCount(void *obj) {
     robj *o = obj;
 
-    /* Object is swapped out, or in the process of being loaded. */
+    /* Object is a key of a swapped out value, or in the process of being
+     * loaded. */
     if (server.vm_enabled &&
         (o->storage == REDIS_VM_SWAPPED || o->storage == REDIS_VM_LOADING))
     {
     if (server.vm_enabled &&
         (o->storage == REDIS_VM_SWAPPED || o->storage == REDIS_VM_LOADING))
     {
@@ -2813,8 +2831,6 @@ static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
     outlen = sdslen(obj->ptr)-4;
     if (outlen <= 0) return 0;
     if ((out = zmalloc(outlen+1)) == NULL) return 0;
     outlen = sdslen(obj->ptr)-4;
     if (outlen <= 0) return 0;
     if ((out = zmalloc(outlen+1)) == NULL) return 0;
-    printf("Calling LZF with ptr: %p\n", (void*)obj->ptr);
-    fflush(stdout);
     comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
     if (comprlen == 0) {
         zfree(out);
     comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
     if (comprlen == 0) {
         zfree(out);
@@ -2922,11 +2938,12 @@ static int rdbSaveObject(FILE *fp, robj *o) {
     } else if (o->type == REDIS_LIST) {
         /* Save a list value */
         list *list = o->ptr;
     } else if (o->type == REDIS_LIST) {
         /* Save a list value */
         list *list = o->ptr;
+        listIter li;
         listNode *ln;
 
         listNode *ln;
 
-        listRewind(list);
         if (rdbSaveLen(fp,listLength(list)) == -1) return -1;
         if (rdbSaveLen(fp,listLength(list)) == -1) return -1;
-        while((ln = listYield(list))) {
+        listRewind(list,&li);
+        while((ln = listNext(&li))) {
             robj *eleobj = listNodeValue(ln);
 
             if (rdbSaveStringObject(fp,eleobj) == -1) return -1;
             robj *eleobj = listNodeValue(ln);
 
             if (rdbSaveStringObject(fp,eleobj) == -1) return -1;
@@ -2992,6 +3009,12 @@ static int rdbSave(char *filename) {
     int j;
     time_t now = time(NULL);
 
     int j;
     time_t now = time(NULL);
 
+    /* Wait for I/O therads to terminate, just in case this is a
+     * foreground-saving, to avoid seeking the swap file descriptor at the
+     * same time. */
+    if (server.vm_enabled)
+        waitEmptyIOJobsQueue();
+
     snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
     fp = fopen(tmpfile,"w");
     if (!fp) {
     snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
     fp = fopen(tmpfile,"w");
     if (!fp) {
@@ -3081,8 +3104,10 @@ static int rdbSaveBackground(char *filename) {
     pid_t childpid;
 
     if (server.bgsavechildpid != -1) return REDIS_ERR;
     pid_t childpid;
 
     if (server.bgsavechildpid != -1) return REDIS_ERR;
+    if (server.vm_enabled) waitEmptyIOJobsQueue();
     if ((childpid = fork()) == 0) {
         /* Child */
     if ((childpid = fork()) == 0) {
         /* Child */
+        if (server.vm_enabled) vmReopenSwapFile();
         close(server.fd);
         if (rdbSave(filename) == REDIS_OK) {
             exit(0);
         close(server.fd);
         if (rdbSave(filename) == REDIS_OK) {
             exit(0);
@@ -3753,6 +3778,7 @@ static void shutdownCommand(redisClient *c) {
     if (server.appendonly) {
         /* Append only file: fsync() the AOF and exit */
         fsync(server.appendfd);
     if (server.appendonly) {
         /* Append only file: fsync() the AOF and exit */
         fsync(server.appendfd);
+        if (server.vm_enabled) unlink(server.vm_swap_file);
         exit(0);
     } else {
         /* Snapshotting. Perform a SYNC SAVE and exit */
         exit(0);
     } else {
         /* Snapshotting. Perform a SYNC SAVE and exit */
@@ -3761,6 +3787,7 @@ static void shutdownCommand(redisClient *c) {
                 unlink(server.pidfile);
             redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
             redisLog(REDIS_WARNING,"Server exit now, bye bye...");
                 unlink(server.pidfile);
             redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
             redisLog(REDIS_WARNING,"Server exit now, bye bye...");
+            if (server.vm_enabled) unlink(server.vm_swap_file);
             exit(0);
         } else {
             /* Ooops.. error saving! The best we can do is to continue operating.
             exit(0);
         } else {
             /* Ooops.. error saving! The best we can do is to continue operating.
@@ -5362,9 +5389,10 @@ static void sortCommand(redisClient *c) {
     if (sortval->type == REDIS_LIST) {
         list *list = sortval->ptr;
         listNode *ln;
     if (sortval->type == REDIS_LIST) {
         list *list = sortval->ptr;
         listNode *ln;
+        listIter li;
 
 
-        listRewind(list);
-        while((ln = listYield(list))) {
+        listRewind(list,&li);
+        while((ln = listNext(&li))) {
             robj *ele = ln->value;
             vector[j].obj = ele;
             vector[j].u.score = 0;
             robj *ele = ln->value;
             vector[j].obj = ele;
             vector[j].u.score = 0;
@@ -5460,13 +5488,15 @@ static void sortCommand(redisClient *c) {
         addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
         for (j = start; j <= end; j++) {
             listNode *ln;
         addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
         for (j = start; j <= end; j++) {
             listNode *ln;
+            listIter li;
+
             if (!getop) {
                 addReplyBulkLen(c,vector[j].obj);
                 addReply(c,vector[j].obj);
                 addReply(c,shared.crlf);
             }
             if (!getop) {
                 addReplyBulkLen(c,vector[j].obj);
                 addReply(c,vector[j].obj);
                 addReply(c,shared.crlf);
             }
-            listRewind(operations);
-            while((ln = listYield(operations))) {
+            listRewind(operations,&li);
+            while((ln = listNext(&li))) {
                 redisSortOperation *sop = ln->value;
                 robj *val = lookupKeyByPattern(c->db,sop->pattern,
                     vector[j].obj);
                 redisSortOperation *sop = ln->value;
                 robj *val = lookupKeyByPattern(c->db,sop->pattern,
                     vector[j].obj);
@@ -5491,12 +5521,14 @@ static void sortCommand(redisClient *c) {
         /* STORE option specified, set the sorting result as a List object */
         for (j = start; j <= end; j++) {
             listNode *ln;
         /* STORE option specified, set the sorting result as a List object */
         for (j = start; j <= end; j++) {
             listNode *ln;
+            listIter li;
+
             if (!getop) {
                 listAddNodeTail(listPtr,vector[j].obj);
                 incrRefCount(vector[j].obj);
             }
             if (!getop) {
                 listAddNodeTail(listPtr,vector[j].obj);
                 incrRefCount(vector[j].obj);
             }
-            listRewind(operations);
-            while((ln = listYield(operations))) {
+            listRewind(operations,&li);
+            while((ln = listNext(&li))) {
                 redisSortOperation *sop = ln->value;
                 robj *val = lookupKeyByPattern(c->db,sop->pattern,
                     vector[j].obj);
                 redisSortOperation *sop = ln->value;
                 robj *val = lookupKeyByPattern(c->db,sop->pattern,
                     vector[j].obj);
@@ -5618,6 +5650,7 @@ static sds genRedisInfoString(void) {
         );
     }
     if (server.vm_enabled) {
         );
     }
     if (server.vm_enabled) {
+        lockThreadedIO();
         info = sdscatprintf(info,
             "vm_conf_max_memory:%llu\r\n"
             "vm_conf_page_size:%llu\r\n"
         info = sdscatprintf(info,
             "vm_conf_max_memory:%llu\r\n"
             "vm_conf_page_size:%llu\r\n"
@@ -5644,6 +5677,7 @@ static sds genRedisInfoString(void) {
             (unsigned long) listLength(server.io_clients),
             (unsigned long) server.io_active_threads
         );
             (unsigned long) listLength(server.io_clients),
             (unsigned long) server.io_active_threads
         );
+        unlockThreadedIO();
     }
     for (j = 0; j < server.dbnum; j++) {
         long long keys, vkeys;
     }
     for (j = 0; j < server.dbnum; j++) {
         long long keys, vkeys;
@@ -6136,9 +6170,10 @@ static void syncCommand(redisClient *c) {
          * registering differences since the server forked to save */
         redisClient *slave;
         listNode *ln;
          * registering differences since the server forked to save */
         redisClient *slave;
         listNode *ln;
+        listIter li;
 
 
-        listRewind(server.slaves);
-        while((ln = listYield(server.slaves))) {
+        listRewind(server.slaves,&li);
+        while((ln = listNext(&li))) {
             slave = ln->value;
             if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
         }
             slave = ln->value;
             if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
         }
@@ -6235,9 +6270,10 @@ static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
 static void updateSlavesWaitingBgsave(int bgsaveerr) {
     listNode *ln;
     int startbgsave = 0;
 static void updateSlavesWaitingBgsave(int bgsaveerr) {
     listNode *ln;
     int startbgsave = 0;
+    listIter li;
 
 
-    listRewind(server.slaves);
-    while((ln = listYield(server.slaves))) {
+    listRewind(server.slaves,&li);
+    while((ln = listNext(&li))) {
         redisClient *slave = ln->value;
 
         if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
         redisClient *slave = ln->value;
 
         if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
@@ -6269,9 +6305,11 @@ static void updateSlavesWaitingBgsave(int bgsaveerr) {
     }
     if (startbgsave) {
         if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
     }
     if (startbgsave) {
         if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
-            listRewind(server.slaves);
+            listIter li;
+
+            listRewind(server.slaves,&li);
             redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
             redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
-            while((ln = listYield(server.slaves))) {
+            while((ln = listNext(&li))) {
                 redisClient *slave = ln->value;
 
                 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
                 redisClient *slave = ln->value;
 
                 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
@@ -6792,9 +6830,10 @@ static int rewriteAppendOnlyFile(char *filename) {
                 /* Emit the RPUSHes needed to rebuild the list */
                 list *list = o->ptr;
                 listNode *ln;
                 /* Emit the RPUSHes needed to rebuild the list */
                 list *list = o->ptr;
                 listNode *ln;
+                listIter li;
 
 
-                listRewind(list);
-                while((ln = listYield(list))) {
+                listRewind(list,&li);
+                while((ln = listNext(&li))) {
                     char cmd[]="*3\r\n$5\r\nRPUSH\r\n";
                     robj *eleobj = listNodeValue(ln);
 
                     char cmd[]="*3\r\n$5\r\nRPUSH\r\n";
                     robj *eleobj = listNodeValue(ln);
 
@@ -6890,11 +6929,13 @@ static int rewriteAppendOnlyFileBackground(void) {
     pid_t childpid;
 
     if (server.bgrewritechildpid != -1) return REDIS_ERR;
     pid_t childpid;
 
     if (server.bgrewritechildpid != -1) return REDIS_ERR;
+    if (server.vm_enabled) waitEmptyIOJobsQueue();
     if ((childpid = fork()) == 0) {
         /* Child */
         char tmpfile[256];
     if ((childpid = fork()) == 0) {
         /* Child */
         char tmpfile[256];
-        close(server.fd);
 
 
+        if (server.vm_enabled) vmReopenSwapFile();
+        close(server.fd);
         snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
         if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
             exit(0);
         snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
         if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
             exit(0);
@@ -6964,13 +7005,40 @@ static void aofRemoveTempFile(pid_t childpid) {
  */
 
 /* =================== Virtual Memory - Blocking Side  ====================== */
  */
 
 /* =================== Virtual Memory - Blocking Side  ====================== */
+
+/* substitute the first occurrence of '%p' with the process pid in the
+ * swap file name. */
+static void expandVmSwapFilename(void) {
+    char *p = strstr(server.vm_swap_file,"%p");
+    sds new;
+    
+    if (!p) return;
+    new = sdsempty();
+    *p = '\0';
+    new = sdscat(new,server.vm_swap_file);
+    new = sdscatprintf(new,"%ld",(long) getpid());
+    new = sdscat(new,p+2);
+    zfree(server.vm_swap_file);
+    server.vm_swap_file = new;
+}
+
 static void vmInit(void) {
     off_t totsize;
     int pipefds[2];
 static void vmInit(void) {
     off_t totsize;
     int pipefds[2];
+    size_t stacksize;
+
+    if (server.vm_max_threads != 0)
+        zmalloc_enable_thread_safeness(); /* we need thread safe zmalloc() */
 
 
-    server.vm_fp = fopen("/tmp/redisvm","w+b");
+    expandVmSwapFilename();
+    redisLog(REDIS_NOTICE,"Using '%s' as swap file",server.vm_swap_file);
+    if ((server.vm_fp = fopen(server.vm_swap_file,"r+b")) == NULL) {
+        server.vm_fp = fopen(server.vm_swap_file,"w+b");
+    }
     if (server.vm_fp == NULL) {
     if (server.vm_fp == NULL) {
-        redisLog(REDIS_WARNING,"Impossible to open the swap file. Exiting.");
+        redisLog(REDIS_WARNING,
+            "Impossible to open the swap file: %s. Exiting.",
+            strerror(errno));
         exit(1);
     }
     server.vm_fd = fileno(server.vm_fp);
         exit(1);
     }
     server.vm_fd = fileno(server.vm_fp);
@@ -6993,9 +7061,6 @@ static void vmInit(void) {
     redisLog(REDIS_VERBOSE,"Allocated %lld bytes page table for %lld pages",
         (long long) (server.vm_pages+7)/8, server.vm_pages);
     memset(server.vm_bitmap,0,(server.vm_pages+7)/8);
     redisLog(REDIS_VERBOSE,"Allocated %lld bytes page table for %lld pages",
         (long long) (server.vm_pages+7)/8, server.vm_pages);
     memset(server.vm_bitmap,0,(server.vm_pages+7)/8);
-    /* Try to remove the swap file, so the OS will really delete it from the
-     * file system when Redis exists. */
-    unlink("/tmp/redisvm");
 
     /* Initialize threaded I/O (used by Virtual Memory) */
     server.io_newjobs = listCreate();
 
     /* Initialize threaded I/O (used by Virtual Memory) */
     server.io_newjobs = listCreate();
@@ -7014,6 +7079,11 @@ static void vmInit(void) {
     server.io_ready_pipe_read = pipefds[0];
     server.io_ready_pipe_write = pipefds[1];
     redisAssert(anetNonBlock(NULL,server.io_ready_pipe_read) != ANET_ERR);
     server.io_ready_pipe_read = pipefds[0];
     server.io_ready_pipe_write = pipefds[1];
     redisAssert(anetNonBlock(NULL,server.io_ready_pipe_read) != ANET_ERR);
+    /* LZF requires a lot of stack */
+    pthread_attr_init(&server.io_threads_attr);
+    pthread_attr_getstacksize(&server.io_threads_attr, &stacksize);
+    while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
+    pthread_attr_setstacksize(&server.io_threads_attr, stacksize);
     /* Listen for events in the threaded I/O pipe */
     if (aeCreateFileEvent(server.el, server.io_ready_pipe_read, AE_READABLE,
         vmThreadedIOCompletedJob, NULL) == AE_ERR)
     /* Listen for events in the threaded I/O pipe */
     if (aeCreateFileEvent(server.el, server.io_ready_pipe_read, AE_READABLE,
         vmThreadedIOCompletedJob, NULL) == AE_ERR)
@@ -7024,6 +7094,7 @@ static void vmInit(void) {
 static void vmMarkPageUsed(off_t page) {
     off_t byte = page/8;
     int bit = page&7;
 static void vmMarkPageUsed(off_t page) {
     off_t byte = page/8;
     int bit = page&7;
+    redisAssert(vmFreePage(page) == 1);
     server.vm_bitmap[byte] |= 1<<bit;
     redisLog(REDIS_DEBUG,"Mark used: %lld (byte:%lld bit:%d)\n",
         (long long)page, (long long)byte, bit);
     server.vm_bitmap[byte] |= 1<<bit;
     redisLog(REDIS_DEBUG,"Mark used: %lld (byte:%lld bit:%d)\n",
         (long long)page, (long long)byte, bit);
@@ -7042,7 +7113,10 @@ static void vmMarkPagesUsed(off_t page, off_t count) {
 static void vmMarkPageFree(off_t page) {
     off_t byte = page/8;
     int bit = page&7;
 static void vmMarkPageFree(off_t page) {
     off_t byte = page/8;
     int bit = page&7;
+    redisAssert(vmFreePage(page) == 0);
     server.vm_bitmap[byte] &= ~(1<<bit);
     server.vm_bitmap[byte] &= ~(1<<bit);
+    redisLog(REDIS_DEBUG,"Mark free: %lld (byte:%lld bit:%d)\n",
+        (long long)page, (long long)byte, bit);
 }
 
 /* Mark N contiguous pages as free, with 'page' being the first. */
 }
 
 /* Mark N contiguous pages as free, with 'page' being the first. */
@@ -7052,6 +7126,9 @@ static void vmMarkPagesFree(off_t page, off_t count) {
     for (j = 0; j < count; j++)
         vmMarkPageFree(page+j);
     server.vm_stats_used_pages -= count;
     for (j = 0; j < count; j++)
         vmMarkPageFree(page+j);
     server.vm_stats_used_pages -= count;
+    if (server.vm_stats_used_pages > 100000000) {
+        *((char*)-1) = 'x';
+    }
 }
 
 /* Test if the page is free */
 }
 
 /* Test if the page is free */
@@ -7080,7 +7157,7 @@ static int vmFreePage(off_t page) {
  * note: I implemented this function just after watching an episode of
  * Battlestar Galactica, where the hybrid was continuing to say "JUMP!"
  */
  * note: I implemented this function just after watching an episode of
  * Battlestar Galactica, where the hybrid was continuing to say "JUMP!"
  */
-static int vmFindContiguousPages(off_t *first, int n) {
+static int vmFindContiguousPages(off_t *first, off_t n) {
     off_t base, offset = 0, since_jump = 0, numfree = 0;
 
     if (server.vm_near_pages == REDIS_VM_MAX_NEAR_PAGES) {
     off_t base, offset = 0, since_jump = 0, numfree = 0;
 
     if (server.vm_near_pages == REDIS_VM_MAX_NEAR_PAGES) {
@@ -7330,7 +7407,14 @@ static int vmSwapOneObject(int usethreads) {
             de = dictGetRandomKey(db->dict);
             key = dictGetEntryKey(de);
             val = dictGetEntryVal(de);
             de = dictGetRandomKey(db->dict);
             key = dictGetEntryKey(de);
             val = dictGetEntryVal(de);
-            if (key->storage != REDIS_VM_MEMORY) {
+            /* Only swap objects that are currently in memory.
+             *
+             * Also don't swap shared objects if threaded VM is on, as we
+             * try to ensure that the main thread does not touch the
+             * object while the I/O thread is using it, but we can't
+             * control other keys without adding additional mutex. */
+            if (key->storage != REDIS_VM_MEMORY ||
+                (server.vm_max_threads != 0 && val->refcount != 1)) {
                 if (maxtries) i--; /* don't count this try */
                 continue;
             }
                 if (maxtries) i--; /* don't count this try */
                 continue;
             }
@@ -7432,10 +7516,10 @@ static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata,
         struct dictEntry *de;
 
         redisLog(REDIS_DEBUG,"Processing I/O completed job");
         struct dictEntry *de;
 
         redisLog(REDIS_DEBUG,"Processing I/O completed job");
-        assert(listLength(server.io_processed) != 0);
 
         /* Get the processed element (the oldest one) */
         lockThreadedIO();
 
         /* Get the processed element (the oldest one) */
         lockThreadedIO();
+        assert(listLength(server.io_processed) != 0);
         ln = listFirst(server.io_processed);
         j = ln->value;
         listDelNode(server.io_processed,ln);
         ln = listFirst(server.io_processed);
         j = ln->value;
         listDelNode(server.io_processed,ln);
@@ -7465,10 +7549,18 @@ static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata,
             /* Now we know the amount of pages required to swap this object.
              * Let's find some space for it, and queue this task again
              * rebranded as REDIS_IOJOB_DO_SWAP. */
             /* Now we know the amount of pages required to swap this object.
              * Let's find some space for it, and queue this task again
              * rebranded as REDIS_IOJOB_DO_SWAP. */
-            if (vmFindContiguousPages(&j->page,j->pages) == REDIS_ERR) {
-                /* Ooops... no space! */
+            if (!vmCanSwapOut() ||
+                vmFindContiguousPages(&j->page,j->pages) == REDIS_ERR)
+            {
+                /* Ooops... no space or we can't swap as there is
+                 * a fork()ed Redis trying to save stuff on disk. */
                 freeIOJob(j);
                 freeIOJob(j);
+                key->storage = REDIS_VM_MEMORY; /* undo operation */
             } else {
             } else {
+                /* Note that we need to mark this pages as used now,
+                 * if the job will be canceled, we'll mark them as freed
+                 * again. */
+                vmMarkPagesUsed(j->page,j->pages);
                 j->type = REDIS_IOJOB_DO_SWAP;
                 lockThreadedIO();
                 queueIOJob(j);
                 j->type = REDIS_IOJOB_DO_SWAP;
                 lockThreadedIO();
                 queueIOJob(j);
@@ -7494,7 +7586,6 @@ static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata,
             key->vtype = j->val->type;
             decrRefCount(val); /* Deallocate the object from memory. */
             dictGetEntryVal(de) = NULL;
             key->vtype = j->val->type;
             decrRefCount(val); /* Deallocate the object from memory. */
             dictGetEntryVal(de) = NULL;
-            vmMarkPagesUsed(j->page,j->pages);
             redisLog(REDIS_DEBUG,
                 "VM: object %s swapped out at %lld (%lld pages) (threaded)",
                 (unsigned char*) key->ptr,
             redisLog(REDIS_DEBUG,
                 "VM: object %s swapped out at %lld (%lld pages) (threaded)",
                 (unsigned char*) key->ptr,
@@ -7504,7 +7595,7 @@ static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata,
             freeIOJob(j);
             /* Put a few more swap requests in queue if we are still
              * out of memory */
             freeIOJob(j);
             /* Put a few more swap requests in queue if we are still
              * out of memory */
-            if (zmalloc_used_memory() > server.vm_max_memory) {
+            if (vmCanSwapOut() && zmalloc_used_memory() > server.vm_max_memory){
                 int more = 1;
                 while(more) {
                     lockThreadedIO();
                 int more = 1;
                 while(more) {
                     lockThreadedIO();
@@ -7545,19 +7636,27 @@ static void vmCancelThreadedIOJob(robj *o) {
     int i;
 
     assert(o->storage == REDIS_VM_LOADING || o->storage == REDIS_VM_SWAPPING);
     int i;
 
     assert(o->storage == REDIS_VM_LOADING || o->storage == REDIS_VM_SWAPPING);
+again:
     lockThreadedIO();
     /* Search for a matching key in one of the queues */
     for (i = 0; i < 3; i++) {
         listNode *ln;
     lockThreadedIO();
     /* Search for a matching key in one of the queues */
     for (i = 0; i < 3; i++) {
         listNode *ln;
+        listIter li;
 
 
-        listRewind(lists[i]);
-        while ((ln = listYield(lists[i])) != NULL) {
+        listRewind(lists[i],&li);
+        while ((ln = listNext(&li)) != NULL) {
             iojob *job = ln->value;
 
             if (job->canceled) continue; /* Skip this, already canceled. */
             if (compareStringObjects(job->key,o) == 0) {
             iojob *job = ln->value;
 
             if (job->canceled) continue; /* Skip this, already canceled. */
             if (compareStringObjects(job->key,o) == 0) {
-                redisLog(REDIS_DEBUG,"*** CANCELED %p (%s)\n",
-                    (void*)job, (char*)o->ptr);
+                redisLog(REDIS_DEBUG,"*** CANCELED %p (%s) (type %d) (LIST ID %d)\n",
+                    (void*)job, (char*)o->ptr, job->type, i);
+                /* Mark the pages as free since the swap didn't happened
+                 * or happened but is now discarded. */
+                if (i != 1 && job->type == REDIS_IOJOB_DO_SWAP)
+                    vmMarkPagesFree(job->page,job->pages);
+                /* Cancel the job. It depends on the list the job is
+                 * living in. */
                 switch(i) {
                 case 0: /* io_newjobs */
                     /* If the job was yet not processed the best thing to do
                 switch(i) {
                 case 0: /* io_newjobs */
                     /* If the job was yet not processed the best thing to do
@@ -7566,10 +7665,32 @@ static void vmCancelThreadedIOJob(robj *o) {
                     listDelNode(lists[i],ln);
                     break;
                 case 1: /* io_processing */
                     listDelNode(lists[i],ln);
                     break;
                 case 1: /* io_processing */
+                    /* Oh Shi- the thread is messing with the Job, and
+                     * probably with the object if this is a
+                     * PREPARE_SWAP or DO_SWAP job. Better to wait for the
+                     * job to move into the next queue... */
+                    if (job->type != REDIS_IOJOB_LOAD) {
+                        /* Yes, we try again and again until the job
+                         * is completed. */
+                        unlockThreadedIO();
+                        /* But let's wait some time for the I/O thread
+                         * to finish with this job. After all this condition
+                         * should be very rare. */
+                        usleep(1);
+                        goto again;
+                    } else {
+                        job->canceled = 1;
+                        break;
+                    }
                 case 2: /* io_processed */
                 case 2: /* io_processed */
+                    /* The job was already processed, that's easy...
+                     * just mark it as canceled so that we'll ignore it
+                     * when processing completed jobs. */
                     job->canceled = 1;
                     break;
                 }
                     job->canceled = 1;
                     break;
                 }
+                /* Finally we have to adjust the storage type of the object
+                 * in order to "UNDO" the operaiton. */
                 if (o->storage == REDIS_VM_LOADING)
                     o->storage = REDIS_VM_SWAPPED;
                 else if (o->storage == REDIS_VM_SWAPPING)
                 if (o->storage == REDIS_VM_LOADING)
                     o->storage = REDIS_VM_SWAPPED;
                 else if (o->storage == REDIS_VM_SWAPPING)
@@ -7639,10 +7760,38 @@ static void *IOThreadEntryPoint(void *arg) {
 static void spawnIOThread(void) {
     pthread_t thread;
 
 static void spawnIOThread(void) {
     pthread_t thread;
 
-    pthread_create(&thread,NULL,IOThreadEntryPoint,NULL);
+    pthread_create(&thread,&server.io_threads_attr,IOThreadEntryPoint,NULL);
     server.io_active_threads++;
 }
 
     server.io_active_threads++;
 }
 
+/* We need to wait for the last thread to exit before we are able to
+ * fork() in order to BGSAVE or BGREWRITEAOF. */
+static void waitEmptyIOJobsQueue(void) {
+    while(1) {
+        lockThreadedIO();
+        if (listLength(server.io_newjobs) == 0 &&
+            listLength(server.io_processing) == 0 &&
+            server.io_active_threads == 0)
+        {
+            unlockThreadedIO();
+            return;
+        }
+        unlockThreadedIO();
+        usleep(10000); /* 10 milliseconds */
+    }
+}
+
+static void vmReopenSwapFile(void) {
+    fclose(server.vm_fp);
+    server.vm_fp = fopen(server.vm_swap_file,"r+b");
+    if (server.vm_fp == NULL) {
+        redisLog(REDIS_WARNING,"Can't re-open the VM swap file: %s. Exiting.",
+            server.vm_swap_file);
+        exit(1);
+    }
+    server.vm_fd = fileno(server.vm_fp);
+}
+
 /* This function must be called while with threaded IO locked */
 static void queueIOJob(iojob *j) {
     redisLog(REDIS_DEBUG,"Queued IO Job %p type %d about key '%s'\n",
 /* This function must be called while with threaded IO locked */
 static void queueIOJob(iojob *j) {
     redisLog(REDIS_DEBUG,"Queued IO Job %p type %d about key '%s'\n",
@@ -7715,7 +7864,7 @@ static void debugCommand(redisClient *c) {
                 "+Key at:%p refcount:%d, value at:%p refcount:%d "
                 "encoding:%d serializedlength:%lld\r\n",
                 (void*)key, key->refcount, (void*)val, val->refcount,
                 "+Key at:%p refcount:%d, value at:%p refcount:%d "
                 "encoding:%d serializedlength:%lld\r\n",
                 (void*)key, key->refcount, (void*)val, val->refcount,
-                val->encoding, rdbSavedObjectLen(val,NULL)));
+                val->encoding, (long long) rdbSavedObjectLen(val,NULL)));
         } else {
             addReplySds(c,sdscatprintf(sdsempty(),
                 "+Key at:%p refcount:%d, value swapped at: page %llu "
         } else {
             addReplySds(c,sdscatprintf(sdsempty(),
                 "+Key at:%p refcount:%d, value swapped at: page %llu "