]> git.saurik.com Git - redis.git/blobdiff - redis.c
Set the new threads stack size to a LZF friendly amount
[redis.git] / redis.c
diff --git a/redis.c b/redis.c
index 43a0bf783117fd965543f38f6a8215436513d1e0..ce047a406bf570075a6b97f7f52e40ab8fa2d447 100644 (file)
--- a/redis.c
+++ b/redis.c
 #define REDIS_VM_MAX_NEAR_PAGES 65536
 #define REDIS_VM_MAX_RANDOM_JUMP 4096
 #define REDIS_VM_MAX_THREADS 32
 #define REDIS_VM_MAX_NEAR_PAGES 65536
 #define REDIS_VM_MAX_RANDOM_JUMP 4096
 #define REDIS_VM_MAX_THREADS 32
+#define REDIS_THREAD_STACK_SIZE (1024*1024*4)
 /* The following is the number of completed I/O jobs to process when the
  * handelr is called. 1 is the minimum, and also the default, as it allows
  * to block as little as possible other accessing clients. While Virtual
 /* The following is the number of completed I/O jobs to process when the
  * handelr is called. 1 is the minimum, and also the default, as it allows
  * to block as little as possible other accessing clients. While Virtual
@@ -403,6 +404,7 @@ struct redisServer {
     pthread_mutex_t io_mutex; /* lock to access io_jobs/io_done/io_thread_job */
     pthread_mutex_t obj_freelist_mutex; /* safe redis objects creation/free */
     pthread_mutex_t io_swapfile_mutex; /* So we can lseek + write */
     pthread_mutex_t io_mutex; /* lock to access io_jobs/io_done/io_thread_job */
     pthread_mutex_t obj_freelist_mutex; /* safe redis objects creation/free */
     pthread_mutex_t io_swapfile_mutex; /* So we can lseek + write */
+    pthread_attr_t io_threads_attr; /* attributes for threads creation */
     int io_active_threads; /* Number of running I/O threads */
     int vm_max_threads; /* Max number of I/O threads running at the same time */
     /* Our main thread is blocked on the event loop, locking for sockets ready
     int io_active_threads; /* Number of running I/O threads */
     int vm_max_threads; /* Max number of I/O threads running at the same time */
     /* Our main thread is blocked on the event loop, locking for sockets ready
@@ -561,6 +563,7 @@ static void freeIOJob(iojob *j);
 static void queueIOJob(iojob *j);
 static int vmWriteObjectOnSwap(robj *o, off_t page);
 static robj *vmReadObjectFromSwap(off_t page, int type);
 static void queueIOJob(iojob *j);
 static int vmWriteObjectOnSwap(robj *o, off_t page);
 static robj *vmReadObjectFromSwap(off_t page, int type);
+static void waitZeroActiveThreads(void);
 
 static void authCommand(redisClient *c);
 static void pingCommand(redisClient *c);
 
 static void authCommand(redisClient *c);
 static void pingCommand(redisClient *c);
@@ -1018,9 +1021,10 @@ static void closeTimedoutClients(void) {
     redisClient *c;
     listNode *ln;
     time_t now = time(NULL);
     redisClient *c;
     listNode *ln;
     time_t now = time(NULL);
+    listIter li;
 
 
-    listRewind(server.clients);
-    while ((ln = listYield(server.clients)) != NULL) {
+    listRewind(server.clients,&li);
+    while ((ln = listNext(&li)) != NULL) {
         c = listNodeValue(ln);
         if (server.maxidletime &&
             !(c->flags & REDIS_SLAVE) &&    /* no timeout for slaves */
         c = listNodeValue(ln);
         if (server.maxidletime &&
             !(c->flags & REDIS_SLAVE) &&    /* no timeout for slaves */
@@ -1262,18 +1266,23 @@ static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientD
         while (server.vm_enabled && zmalloc_used_memory() >
                 server.vm_max_memory)
         {
         while (server.vm_enabled && zmalloc_used_memory() >
                 server.vm_max_memory)
         {
+            int retval;
+
             if (tryFreeOneObjectFromFreelist() == REDIS_OK) continue;
             if (tryFreeOneObjectFromFreelist() == REDIS_OK) continue;
-            if (vmSwapOneObjectThreaded() == REDIS_ERR) {
-                if ((loops % 30) == 0 && zmalloc_used_memory() >
-                    (server.vm_max_memory+server.vm_max_memory/10)) {
-                    redisLog(REDIS_WARNING,"WARNING: vm-max-memory limit exceeded by more than 10%% but unable to swap more objects out!");
-                }
+            retval = (server.vm_max_threads == 0) ?
+                        vmSwapOneObjectBlocking() :
+                        vmSwapOneObjectThreaded();
+            if (retval == REDIS_ERR && (loops % 30) == 0 &&
+                zmalloc_used_memory() >
+                (server.vm_max_memory+server.vm_max_memory/10))
+            {
+                redisLog(REDIS_WARNING,"WARNING: vm-max-memory limit exceeded by more than 10%% but unable to swap more objects out!");
             }
             }
-            /* Note that we freed just one object, because anyway when
-             * the I/O thread in charge to swap this object out will
-             * do its work, the handler of completed jobs will try to swap
-             * more objects if we are out of memory. */
-            break;
+            /* Note that when using threade I/O we free just one object,
+             * because anyway when the I/O thread in charge to swap this
+             * object out will finish, the handler of completed jobs
+             * will try to swap more objects if we are still out of memory. */
+            if (retval == REDIS_ERR || server.vm_max_threads > 0) break;
         }
     }
 
         }
     }
 
@@ -1706,10 +1715,11 @@ static void glueReplyBuffersIfNeeded(redisClient *c) {
     int copylen = 0;
     char buf[GLUEREPLY_UP_TO];
     listNode *ln;
     int copylen = 0;
     char buf[GLUEREPLY_UP_TO];
     listNode *ln;
+    listIter li;
     robj *o;
 
     robj *o;
 
-    listRewind(c->reply);
-    while((ln = listYield(c->reply))) {
+    listRewind(c->reply,&li);
+    while((ln = listNext(&li))) {
         int objlen;
 
         o = ln->value;
         int objlen;
 
         o = ln->value;
@@ -2071,6 +2081,7 @@ static int processCommand(redisClient *c) {
 
 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
     listNode *ln;
 
 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
     listNode *ln;
+    listIter li;
     int outc = 0, j;
     robj **outv;
     /* (args*2)+1 is enough room for args, spaces, newlines */
     int outc = 0, j;
     robj **outv;
     /* (args*2)+1 is enough room for args, spaces, newlines */
@@ -2101,8 +2112,8 @@ static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int di
      * be sure to free objects if there is no slave in a replication state
      * able to be feed with commands */
     for (j = 0; j < outc; j++) incrRefCount(outv[j]);
      * be sure to free objects if there is no slave in a replication state
      * able to be feed with commands */
     for (j = 0; j < outc; j++) incrRefCount(outv[j]);
-    listRewind(slaves);
-    while((ln = listYield(slaves))) {
+    listRewind(slaves,&li);
+    while((ln = listNext(&li))) {
         redisClient *slave = ln->value;
 
         /* Don't feed slaves that are still waiting for BGSAVE to start */
         redisClient *slave = ln->value;
 
         /* Don't feed slaves that are still waiting for BGSAVE to start */
@@ -2917,11 +2928,12 @@ static int rdbSaveObject(FILE *fp, robj *o) {
     } else if (o->type == REDIS_LIST) {
         /* Save a list value */
         list *list = o->ptr;
     } else if (o->type == REDIS_LIST) {
         /* Save a list value */
         list *list = o->ptr;
+        listIter li;
         listNode *ln;
 
         listNode *ln;
 
-        listRewind(list);
         if (rdbSaveLen(fp,listLength(list)) == -1) return -1;
         if (rdbSaveLen(fp,listLength(list)) == -1) return -1;
-        while((ln = listYield(list))) {
+        listRewind(list,&li);
+        while((ln = listNext(&li))) {
             robj *eleobj = listNodeValue(ln);
 
             if (rdbSaveStringObject(fp,eleobj) == -1) return -1;
             robj *eleobj = listNodeValue(ln);
 
             if (rdbSaveStringObject(fp,eleobj) == -1) return -1;
@@ -3076,6 +3088,7 @@ static int rdbSaveBackground(char *filename) {
     pid_t childpid;
 
     if (server.bgsavechildpid != -1) return REDIS_ERR;
     pid_t childpid;
 
     if (server.bgsavechildpid != -1) return REDIS_ERR;
+    if (server.vm_enabled) waitZeroActiveThreads();
     if ((childpid = fork()) == 0) {
         /* Child */
         close(server.fd);
     if ((childpid = fork()) == 0) {
         /* Child */
         close(server.fd);
@@ -5357,9 +5370,10 @@ static void sortCommand(redisClient *c) {
     if (sortval->type == REDIS_LIST) {
         list *list = sortval->ptr;
         listNode *ln;
     if (sortval->type == REDIS_LIST) {
         list *list = sortval->ptr;
         listNode *ln;
+        listIter li;
 
 
-        listRewind(list);
-        while((ln = listYield(list))) {
+        listRewind(list,&li);
+        while((ln = listNext(&li))) {
             robj *ele = ln->value;
             vector[j].obj = ele;
             vector[j].u.score = 0;
             robj *ele = ln->value;
             vector[j].obj = ele;
             vector[j].u.score = 0;
@@ -5455,13 +5469,15 @@ static void sortCommand(redisClient *c) {
         addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
         for (j = start; j <= end; j++) {
             listNode *ln;
         addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
         for (j = start; j <= end; j++) {
             listNode *ln;
+            listIter li;
+
             if (!getop) {
                 addReplyBulkLen(c,vector[j].obj);
                 addReply(c,vector[j].obj);
                 addReply(c,shared.crlf);
             }
             if (!getop) {
                 addReplyBulkLen(c,vector[j].obj);
                 addReply(c,vector[j].obj);
                 addReply(c,shared.crlf);
             }
-            listRewind(operations);
-            while((ln = listYield(operations))) {
+            listRewind(operations,&li);
+            while((ln = listNext(&li))) {
                 redisSortOperation *sop = ln->value;
                 robj *val = lookupKeyByPattern(c->db,sop->pattern,
                     vector[j].obj);
                 redisSortOperation *sop = ln->value;
                 robj *val = lookupKeyByPattern(c->db,sop->pattern,
                     vector[j].obj);
@@ -5486,12 +5502,14 @@ static void sortCommand(redisClient *c) {
         /* STORE option specified, set the sorting result as a List object */
         for (j = start; j <= end; j++) {
             listNode *ln;
         /* STORE option specified, set the sorting result as a List object */
         for (j = start; j <= end; j++) {
             listNode *ln;
+            listIter li;
+
             if (!getop) {
                 listAddNodeTail(listPtr,vector[j].obj);
                 incrRefCount(vector[j].obj);
             }
             if (!getop) {
                 listAddNodeTail(listPtr,vector[j].obj);
                 incrRefCount(vector[j].obj);
             }
-            listRewind(operations);
-            while((ln = listYield(operations))) {
+            listRewind(operations,&li);
+            while((ln = listNext(&li))) {
                 redisSortOperation *sop = ln->value;
                 robj *val = lookupKeyByPattern(c->db,sop->pattern,
                     vector[j].obj);
                 redisSortOperation *sop = ln->value;
                 robj *val = lookupKeyByPattern(c->db,sop->pattern,
                     vector[j].obj);
@@ -6131,9 +6149,10 @@ static void syncCommand(redisClient *c) {
          * registering differences since the server forked to save */
         redisClient *slave;
         listNode *ln;
          * registering differences since the server forked to save */
         redisClient *slave;
         listNode *ln;
+        listIter li;
 
 
-        listRewind(server.slaves);
-        while((ln = listYield(server.slaves))) {
+        listRewind(server.slaves,&li);
+        while((ln = listNext(&li))) {
             slave = ln->value;
             if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
         }
             slave = ln->value;
             if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
         }
@@ -6230,9 +6249,10 @@ static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
 static void updateSlavesWaitingBgsave(int bgsaveerr) {
     listNode *ln;
     int startbgsave = 0;
 static void updateSlavesWaitingBgsave(int bgsaveerr) {
     listNode *ln;
     int startbgsave = 0;
+    listIter li;
 
 
-    listRewind(server.slaves);
-    while((ln = listYield(server.slaves))) {
+    listRewind(server.slaves,&li);
+    while((ln = listNext(&li))) {
         redisClient *slave = ln->value;
 
         if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
         redisClient *slave = ln->value;
 
         if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
@@ -6264,9 +6284,11 @@ static void updateSlavesWaitingBgsave(int bgsaveerr) {
     }
     if (startbgsave) {
         if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
     }
     if (startbgsave) {
         if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
-            listRewind(server.slaves);
+            listIter li;
+
+            listRewind(server.slaves,&li);
             redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
             redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
-            while((ln = listYield(server.slaves))) {
+            while((ln = listNext(&li))) {
                 redisClient *slave = ln->value;
 
                 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
                 redisClient *slave = ln->value;
 
                 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
@@ -6787,9 +6809,10 @@ static int rewriteAppendOnlyFile(char *filename) {
                 /* Emit the RPUSHes needed to rebuild the list */
                 list *list = o->ptr;
                 listNode *ln;
                 /* Emit the RPUSHes needed to rebuild the list */
                 list *list = o->ptr;
                 listNode *ln;
+                listIter li;
 
 
-                listRewind(list);
-                while((ln = listYield(list))) {
+                listRewind(list,&li);
+                while((ln = listNext(&li))) {
                     char cmd[]="*3\r\n$5\r\nRPUSH\r\n";
                     robj *eleobj = listNodeValue(ln);
 
                     char cmd[]="*3\r\n$5\r\nRPUSH\r\n";
                     robj *eleobj = listNodeValue(ln);
 
@@ -6885,6 +6908,7 @@ static int rewriteAppendOnlyFileBackground(void) {
     pid_t childpid;
 
     if (server.bgrewritechildpid != -1) return REDIS_ERR;
     pid_t childpid;
 
     if (server.bgrewritechildpid != -1) return REDIS_ERR;
+    if (server.vm_enabled) waitZeroActiveThreads();
     if ((childpid = fork()) == 0) {
         /* Child */
         char tmpfile[256];
     if ((childpid = fork()) == 0) {
         /* Child */
         char tmpfile[256];
@@ -6962,6 +6986,7 @@ static void aofRemoveTempFile(pid_t childpid) {
 static void vmInit(void) {
     off_t totsize;
     int pipefds[2];
 static void vmInit(void) {
     off_t totsize;
     int pipefds[2];
+    size_t stacksize;
 
     server.vm_fp = fopen("/tmp/redisvm","w+b");
     if (server.vm_fp == NULL) {
 
     server.vm_fp = fopen("/tmp/redisvm","w+b");
     if (server.vm_fp == NULL) {
@@ -7009,6 +7034,11 @@ static void vmInit(void) {
     server.io_ready_pipe_read = pipefds[0];
     server.io_ready_pipe_write = pipefds[1];
     redisAssert(anetNonBlock(NULL,server.io_ready_pipe_read) != ANET_ERR);
     server.io_ready_pipe_read = pipefds[0];
     server.io_ready_pipe_write = pipefds[1];
     redisAssert(anetNonBlock(NULL,server.io_ready_pipe_read) != ANET_ERR);
+    /* LZF requires a lot of stack */
+    pthread_attr_init(&server.io_threads_attr);
+    pthread_attr_getstacksize(&server.io_threads_attr, &stacksize);
+    while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
+    pthread_attr_setstacksize(&server.io_threads_attr, stacksize);
     /* Listen for events in the threaded I/O pipe */
     if (aeCreateFileEvent(server.el, server.io_ready_pipe_read, AE_READABLE,
         vmThreadedIOCompletedJob, NULL) == AE_ERR)
     /* Listen for events in the threaded I/O pipe */
     if (aeCreateFileEvent(server.el, server.io_ready_pipe_read, AE_READABLE,
         vmThreadedIOCompletedJob, NULL) == AE_ERR)
@@ -7075,7 +7105,7 @@ static int vmFreePage(off_t page) {
  * note: I implemented this function just after watching an episode of
  * Battlestar Galactica, where the hybrid was continuing to say "JUMP!"
  */
  * note: I implemented this function just after watching an episode of
  * Battlestar Galactica, where the hybrid was continuing to say "JUMP!"
  */
-static int vmFindContiguousPages(off_t *first, int n) {
+static int vmFindContiguousPages(off_t *first, off_t n) {
     off_t base, offset = 0, since_jump = 0, numfree = 0;
 
     if (server.vm_near_pages == REDIS_VM_MAX_NEAR_PAGES) {
     off_t base, offset = 0, since_jump = 0, numfree = 0;
 
     if (server.vm_near_pages == REDIS_VM_MAX_NEAR_PAGES) {
@@ -7464,6 +7494,10 @@ static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata,
                 /* Ooops... no space! */
                 freeIOJob(j);
             } else {
                 /* Ooops... no space! */
                 freeIOJob(j);
             } else {
+                /* Note that we need to mark this pages as used now,
+                 * if the job will be canceled, we'll mark them as freed
+                 * again. */
+                vmMarkPagesUsed(j->page,j->pages);
                 j->type = REDIS_IOJOB_DO_SWAP;
                 lockThreadedIO();
                 queueIOJob(j);
                 j->type = REDIS_IOJOB_DO_SWAP;
                 lockThreadedIO();
                 queueIOJob(j);
@@ -7489,7 +7523,6 @@ static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata,
             key->vtype = j->val->type;
             decrRefCount(val); /* Deallocate the object from memory. */
             dictGetEntryVal(de) = NULL;
             key->vtype = j->val->type;
             decrRefCount(val); /* Deallocate the object from memory. */
             dictGetEntryVal(de) = NULL;
-            vmMarkPagesUsed(j->page,j->pages);
             redisLog(REDIS_DEBUG,
                 "VM: object %s swapped out at %lld (%lld pages) (threaded)",
                 (unsigned char*) key->ptr,
             redisLog(REDIS_DEBUG,
                 "VM: object %s swapped out at %lld (%lld pages) (threaded)",
                 (unsigned char*) key->ptr,
@@ -7544,15 +7577,22 @@ static void vmCancelThreadedIOJob(robj *o) {
     /* Search for a matching key in one of the queues */
     for (i = 0; i < 3; i++) {
         listNode *ln;
     /* Search for a matching key in one of the queues */
     for (i = 0; i < 3; i++) {
         listNode *ln;
+        listIter li;
 
 
-        listRewind(lists[i]);
-        while ((ln = listYield(lists[i])) != NULL) {
+        listRewind(lists[i],&li);
+        while ((ln = listNext(&li)) != NULL) {
             iojob *job = ln->value;
 
             if (job->canceled) continue; /* Skip this, already canceled. */
             if (compareStringObjects(job->key,o) == 0) {
                 redisLog(REDIS_DEBUG,"*** CANCELED %p (%s)\n",
                     (void*)job, (char*)o->ptr);
             iojob *job = ln->value;
 
             if (job->canceled) continue; /* Skip this, already canceled. */
             if (compareStringObjects(job->key,o) == 0) {
                 redisLog(REDIS_DEBUG,"*** CANCELED %p (%s)\n",
                     (void*)job, (char*)o->ptr);
+                /* Mark the pages as free since the swap didn't happened
+                 * or happened but is now discarded. */
+                if (job->type == REDIS_IOJOB_DO_SWAP)
+                    vmMarkPagesFree(job->page,job->pages);
+                /* Cancel the job. It depends on the list the job is
+                 * living in. */
                 switch(i) {
                 case 0: /* io_newjobs */
                     /* If the job was yet not processed the best thing to do
                 switch(i) {
                 case 0: /* io_newjobs */
                     /* If the job was yet not processed the best thing to do
@@ -7565,6 +7605,8 @@ static void vmCancelThreadedIOJob(robj *o) {
                     job->canceled = 1;
                     break;
                 }
                     job->canceled = 1;
                     break;
                 }
+                /* Finally we have to adjust the storage type of the object
+                 * in order to "UNDO" the operaiton. */
                 if (o->storage == REDIS_VM_LOADING)
                     o->storage = REDIS_VM_SWAPPED;
                 else if (o->storage == REDIS_VM_SWAPPING)
                 if (o->storage == REDIS_VM_LOADING)
                     o->storage = REDIS_VM_SWAPPED;
                 else if (o->storage == REDIS_VM_SWAPPING)
@@ -7589,7 +7631,7 @@ static void *IOThreadEntryPoint(void *arg) {
         lockThreadedIO();
         if (listLength(server.io_newjobs) == 0) {
             /* No new jobs in queue, exit. */
         lockThreadedIO();
         if (listLength(server.io_newjobs) == 0) {
             /* No new jobs in queue, exit. */
-            redisLog(REDIS_DEBUG,"Thread %lld exiting, nothing to do\n",
+            redisLog(REDIS_DEBUG,"Thread %lld exiting, nothing to do",
                 (long long) pthread_self());
             server.io_active_threads--;
             unlockThreadedIO();
                 (long long) pthread_self());
             server.io_active_threads--;
             unlockThreadedIO();
@@ -7603,7 +7645,7 @@ static void *IOThreadEntryPoint(void *arg) {
         listAddNodeTail(server.io_processing,j);
         ln = listLast(server.io_processing); /* We use ln later to remove it */
         unlockThreadedIO();
         listAddNodeTail(server.io_processing,j);
         ln = listLast(server.io_processing); /* We use ln later to remove it */
         unlockThreadedIO();
-        redisLog(REDIS_DEBUG,"Thread %lld got a new job (type %d): %p about key '%s'\n",
+        redisLog(REDIS_DEBUG,"Thread %lld got a new job (type %d): %p about key '%s'",
             (long long) pthread_self(), j->type, (void*)j, (char*)j->key->ptr);
 
         /* Process the Job */
             (long long) pthread_self(), j->type, (void*)j, (char*)j->key->ptr);
 
         /* Process the Job */
@@ -7618,7 +7660,7 @@ static void *IOThreadEntryPoint(void *arg) {
         }
 
         /* Done: insert the job into the processed queue */
         }
 
         /* Done: insert the job into the processed queue */
-        redisLog(REDIS_DEBUG,"Thread %lld completed the job: %p (key %s)\n",
+        redisLog(REDIS_DEBUG,"Thread %lld completed the job: %p (key %s)",
             (long long) pthread_self(), (void*)j, (char*)j->key->ptr);
         lockThreadedIO();
         listDelNode(server.io_processing,ln);
             (long long) pthread_self(), (void*)j, (char*)j->key->ptr);
         lockThreadedIO();
         listDelNode(server.io_processing,ln);
@@ -7634,10 +7676,24 @@ static void *IOThreadEntryPoint(void *arg) {
 static void spawnIOThread(void) {
     pthread_t thread;
 
 static void spawnIOThread(void) {
     pthread_t thread;
 
-    pthread_create(&thread,NULL,IOThreadEntryPoint,NULL);
+    pthread_create(&thread,&server.io_threads_attr,IOThreadEntryPoint,NULL);
     server.io_active_threads++;
 }
 
     server.io_active_threads++;
 }
 
+/* We need to wait for the last thread to exit before we are able to
+ * fork() in order to BGSAVE or BGREWRITEAOF. */
+static void waitZeroActiveThreads(void) {
+    while(1) {
+        lockThreadedIO();
+        if (server.io_active_threads == 0) {
+            unlockThreadedIO();
+            return;
+        }
+        unlockThreadedIO();
+        usleep(10000); /* 10 milliseconds */
+    }
+}
+
 /* This function must be called while with threaded IO locked */
 static void queueIOJob(iojob *j) {
     redisLog(REDIS_DEBUG,"Queued IO Job %p type %d about key '%s'\n",
 /* This function must be called while with threaded IO locked */
 static void queueIOJob(iojob *j) {
     redisLog(REDIS_DEBUG,"Queued IO Job %p type %d about key '%s'\n",