git.saurik.com Git - redis.git/commitdiff
a few non blocking VM bugs fixed
author antirez <antirez@gmail.com>
Mon, 11 Jan 2010 17:45:15 +0000 (12:45 -0500)
committer antirez <antirez@gmail.com>
Mon, 11 Jan 2010 17:45:15 +0000 (12:45 -0500)
redis.c
staticsymbols.h

diff --git a/redis.c b/redis.c
index 677078a2fa65f55ebf6c0ce8c740d0e130ea8090..733bc17d81fae75061f1a93b45bbc3ecbf1fd289 100644 (file)
--- a/redis.c
+++ b/redis.c
@@ -7384,6 +7384,11 @@ static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata,
         }
         /* Post process it in the main thread, as there are things we
          * can do just here to avoid race conditions and/or invasive locks */
+        redisLog(REDIS_DEBUG,"Job type: %d, key at %p (%s) refcount: %d\n", j->type, (void*)j->key, (char*)j->key->ptr, j->key->refcount);
+        if (j->key->refcount <= 0) {
+            printf("Ooops ref count is <= 0!\n");
+            exit(1);
+        }
         de = dictFind(j->db->dict,j->key);
         assert(de != NULL);
         key = dictGetEntryKey(de);
@@ -7420,6 +7425,7 @@ static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata,
             key->storage = REDIS_VM_SWAPPED;
             key->vtype = j->val->type;
             decrRefCount(val); /* Deallocate the object from memory. */
+            dictGetEntryVal(de) = NULL;
             vmMarkPagesUsed(j->page,j->pages);
             redisLog(REDIS_DEBUG,
                 "VM: object %s swapped out at %lld (%lld pages) (threaded)",
@@ -7428,6 +7434,19 @@ static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata,
             server.vm_stats_swapped_objects++;
             server.vm_stats_swapouts++;
             freeIOJob(j);
+            /* Put a few more swap requests in queue if we are still
+             * out of memory */
+            if (zmalloc_used_memory() > server.vm_max_memory) {
+                int more = 1;
+                while(more) {
+                    lockThreadedIO();
+                    more = listLength(server.io_newjobs) <
+                            (unsigned) server.vm_max_threads;
+                    unlockThreadedIO();
+                    /* Don't waste CPU time if swappable objects are rare. */
+                    if (vmSwapOneObjectThreaded() == REDIS_ERR) break;
+                }
+            }
         }
     }
     if (retval < 0 && errno != EAGAIN) {
@@ -7572,6 +7591,7 @@ static int vmSwapObjectThreaded(robj *key, robj *val, redisDb *db) {
     incrRefCount(val);
     j->canceled = 0;
     j->thread = (pthread_t) -1;
+    key->storage = REDIS_VM_SWAPPING;
 
     lockThreadedIO();
     queueIOJob(j);
diff --git a/staticsymbols.h b/staticsymbols.h
index a7f94694889a08d458c1a7905c1221f4115627f9..c29df1ac4dc6e0e6a9799aaf5dc1488f2b59ae09 100644 (file)
--- a/staticsymbols.h
+++ b/staticsymbols.h
@@ -1,4 +1,5 @@
 static struct redisFunctionSym symsTable[] = {
+{"IOThreadEntryPoint",(unsigned long)IOThreadEntryPoint},
 {"_redisAssert",(unsigned long)_redisAssert},
 {"acceptHandler",(unsigned long)acceptHandler},
 {"addReply",(unsigned long)addReply},
@@ -60,6 +61,7 @@ static struct redisFunctionSym symsTable[] = {
 {"freeClientMultiState",(unsigned long)freeClientMultiState},
 {"freeFakeClient",(unsigned long)freeFakeClient},
 {"freeHashObject",(unsigned long)freeHashObject},
+{"freeIOJob",(unsigned long)freeIOJob},
 {"freeListObject",(unsigned long)freeListObject},
 {"freeMemoryIfNeeded",(unsigned long)freeMemoryIfNeeded},
 {"freeOneObjectFromFreelist",(unsigned long)freeOneObjectFromFreelist},
@@ -118,6 +120,7 @@ static struct redisFunctionSym symsTable[] = {
 {"processInputBuffer",(unsigned long)processInputBuffer},
 {"pushGenericCommand",(unsigned long)pushGenericCommand},
 {"qsortCompareSetsByCardinality",(unsigned long)qsortCompareSetsByCardinality},
+{"queueIOJob",(unsigned long)queueIOJob},
 {"queueMultiCommand",(unsigned long)queueMultiCommand},
 {"randomkeyCommand",(unsigned long)randomkeyCommand},
 {"rdbLoad",(unsigned long)rdbLoad},
@@ -184,6 +187,7 @@ static struct redisFunctionSym symsTable[] = {
 {"smoveCommand",(unsigned long)smoveCommand},
 {"sortCommand",(unsigned long)sortCommand},
 {"sortCompare",(unsigned long)sortCompare},
+{"spawnIOThread",(unsigned long)spawnIOThread},
 {"spopCommand",(unsigned long)spopCommand},
 {"srandmemberCommand",(unsigned long)srandmemberCommand},
 {"sremCommand",(unsigned long)sremCommand},