]> git.saurik.com Git - redis.git/blobdiff - src/vm.c
bulk transfers limited to 512 MB as this is the new limit of all the redis strings
[redis.git] / src / vm.c
index 1aaa57eb54bd338bbd2c6ca963764e872ec924ad..ac0d92e332cf091b88c5caeb2b3a9b439869a2a5 100644 (file)
--- a/src/vm.c
+++ b/src/vm.c
@@ -86,10 +86,9 @@ void vmInit(void) {
     } else {
         redisLog(REDIS_NOTICE,"Swap file allocated with success");
     }
     } else {
         redisLog(REDIS_NOTICE,"Swap file allocated with success");
     }
-    server.vm_bitmap = zmalloc((server.vm_pages+7)/8);
+    server.vm_bitmap = zcalloc((server.vm_pages+7)/8);
     redisLog(REDIS_VERBOSE,"Allocated %lld bytes page table for %lld pages",
         (long long) (server.vm_pages+7)/8, server.vm_pages);
     redisLog(REDIS_VERBOSE,"Allocated %lld bytes page table for %lld pages",
         (long long) (server.vm_pages+7)/8, server.vm_pages);
-    memset(server.vm_bitmap,0,(server.vm_pages+7)/8);
 
     /* Initialize threaded I/O (used by Virtual Memory) */
     server.io_newjobs = listCreate();
 
     /* Initialize threaded I/O (used by Virtual Memory) */
     server.io_newjobs = listCreate();
@@ -97,7 +96,6 @@ void vmInit(void) {
     server.io_processed = listCreate();
     server.io_ready_clients = listCreate();
     pthread_mutex_init(&server.io_mutex,NULL);
     server.io_processed = listCreate();
     server.io_ready_clients = listCreate();
     pthread_mutex_init(&server.io_mutex,NULL);
-    pthread_mutex_init(&server.obj_freelist_mutex,NULL);
     pthread_mutex_init(&server.io_swapfile_mutex,NULL);
     server.io_active_threads = 0;
     if (pipe(pipefds) == -1) {
     pthread_mutex_init(&server.io_swapfile_mutex,NULL);
     server.io_active_threads = 0;
     if (pipe(pipefds) == -1) {
@@ -111,6 +109,11 @@ void vmInit(void) {
     /* LZF requires a lot of stack */
     pthread_attr_init(&server.io_threads_attr);
     pthread_attr_getstacksize(&server.io_threads_attr, &stacksize);
     /* LZF requires a lot of stack */
     pthread_attr_init(&server.io_threads_attr);
     pthread_attr_getstacksize(&server.io_threads_attr, &stacksize);
+
+    /* Solaris may report a stacksize of 0, let's set it to 1 otherwise
+     * multiplying it by 2 in the while loop later will not really help ;) */
+    if (!stacksize) stacksize = 1;
+
     while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
     pthread_attr_setstacksize(&server.io_threads_attr, stacksize);
     /* Listen for events in the threaded I/O pipe */
     while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
     pthread_attr_setstacksize(&server.io_threads_attr, stacksize);
     /* Listen for events in the threaded I/O pipe */
@@ -260,7 +263,7 @@ int vmWriteObjectOnSwap(robj *o, off_t page) {
  * If we can't find enough contiguous empty pages to swap the object on disk
  * NULL is returned. */
 vmpointer *vmSwapObjectBlocking(robj *val) {
  * If we can't find enough contiguous empty pages to swap the object on disk
  * NULL is returned. */
 vmpointer *vmSwapObjectBlocking(robj *val) {
-    off_t pages = rdbSavedObjectPages(val,NULL);
+    off_t pages = rdbSavedObjectPages(val);
     off_t page;
     vmpointer *vp;
 
     off_t page;
     vmpointer *vp;
 
@@ -358,7 +361,7 @@ robj *vmPreviewObject(robj *o) {
 double computeObjectSwappability(robj *o) {
     /* actual age can be >= minage, but not < minage. As we use wrapping
      * 21 bit clocks with minutes resolution for the LRU. */
 double computeObjectSwappability(robj *o) {
     /* actual age can be >= minage, but not < minage. As we use wrapping
      * 21 bit clocks with minutes resolution for the LRU. */
-    time_t minage = abs(server.lruclock - o->lru);
+    time_t minage = estimateObjectIdleTime(o);
     long asize = 0, elesize;
     robj *ele;
     list *l;
     long asize = 0, elesize;
     robj *ele;
     list *l;
@@ -396,15 +399,20 @@ double computeObjectSwappability(robj *o) {
         z = (o->type == REDIS_ZSET);
         d = z ? ((zset*)o->ptr)->dict : o->ptr;
 
         z = (o->type == REDIS_ZSET);
         d = z ? ((zset*)o->ptr)->dict : o->ptr;
 
-        asize = sizeof(dict)+(sizeof(struct dictEntry*)*dictSlots(d));
-        if (z) asize += sizeof(zset)-sizeof(dict);
-        if (dictSize(d)) {
-            de = dictGetRandomKey(d);
-            ele = dictGetEntryKey(de);
-            elesize = (ele->encoding == REDIS_ENCODING_RAW) ?
-                            (sizeof(*o)+sdslen(ele->ptr)) : sizeof(*o);
-            asize += (sizeof(struct dictEntry)+elesize)*dictSize(d);
-            if (z) asize += sizeof(zskiplistNode)*dictSize(d);
+        if (!z && o->encoding == REDIS_ENCODING_INTSET) {
+            intset *is = o->ptr;
+            asize = sizeof(*is)+is->encoding*is->length;
+        } else {
+            asize = sizeof(dict)+(sizeof(struct dictEntry*)*dictSlots(d));
+            if (z) asize += sizeof(zset)-sizeof(dict);
+            if (dictSize(d)) {
+                de = dictGetRandomKey(d);
+                ele = dictGetEntryKey(de);
+                elesize = (ele->encoding == REDIS_ENCODING_RAW) ?
+                                (sizeof(*o)+sdslen(ele->ptr)) : sizeof(*o);
+                asize += (sizeof(struct dictEntry)+elesize)*dictSize(d);
+                if (z) asize += sizeof(zskiplistNode)*dictSize(d);
+            }
         }
         break;
     case REDIS_HASH:
         }
         break;
     case REDIS_HASH:
@@ -544,7 +552,15 @@ void freeIOJob(iojob *j) {
 
 /* Every time a thread finished a Job, it writes a byte into the write side
  * of an unix pipe in order to "awake" the main thread, and this function
 
 /* Every time a thread finished a Job, it writes a byte into the write side
  * of an unix pipe in order to "awake" the main thread, and this function
- * is called. */
+ * is called.
+ *
+ * Note that this is called both by the event loop, when a I/O thread
+ * sends a byte in the notification pipe, and is also directly called from
+ * waitEmptyIOJobsQueue().
+ *
+ * In the latter case we don't want to swap more, so we use the
+ * "privdata" argument setting it to a not NULL value to signal this
+ * condition. */
 void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata,
             int mask)
 {
 void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata,
             int mask)
 {
@@ -554,6 +570,8 @@ void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata,
     REDIS_NOTUSED(mask);
     REDIS_NOTUSED(privdata);
 
     REDIS_NOTUSED(mask);
     REDIS_NOTUSED(privdata);
 
+    if (privdata != NULL) trytoswap = 0; /* check the comments above... */
+
     /* For every byte we read in the read side of the pipe, there is one
      * I/O job completed to process. */
     while((retval = read(fd,buf,1)) == 1) {
     /* For every byte we read in the read side of the pipe, there is one
      * I/O job completed to process. */
     while((retval = read(fd,buf,1)) == 1) {
@@ -803,9 +821,7 @@ void *IOThreadEntryPoint(void *arg) {
             vmpointer *vp = (vmpointer*)j->id;
             j->val = vmReadObjectFromSwap(j->page,vp->vtype);
         } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) {
             vmpointer *vp = (vmpointer*)j->id;
             j->val = vmReadObjectFromSwap(j->page,vp->vtype);
         } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) {
-            FILE *fp = fopen("/dev/null","w+");
-            j->pages = rdbSavedObjectPages(j->val,fp);
-            fclose(fp);
+            j->pages = rdbSavedObjectPages(j->val);
         } else if (j->type == REDIS_IOJOB_DO_SWAP) {
             if (vmWriteObjectOnSwap(j->val,j->page) == REDIS_ERR)
                 j->canceled = 1;
         } else if (j->type == REDIS_IOJOB_DO_SWAP) {
             if (vmWriteObjectOnSwap(j->val,j->page) == REDIS_ERR)
                 j->canceled = 1;
@@ -865,7 +881,8 @@ void waitEmptyIOJobsQueue(void) {
         io_processed_len = listLength(server.io_processed);
         unlockThreadedIO();
         if (io_processed_len) {
         io_processed_len = listLength(server.io_processed);
         unlockThreadedIO();
         if (io_processed_len) {
-            vmThreadedIOCompletedJob(NULL,server.io_ready_pipe_read,NULL,0);
+            vmThreadedIOCompletedJob(NULL,server.io_ready_pipe_read,
+                                                        (void*)0xdeadbeef,0);
             usleep(1000); /* 1 millisecond */
         } else {
             usleep(10000); /* 10 milliseconds */
             usleep(1000); /* 1 millisecond */
         } else {
             usleep(10000); /* 10 milliseconds */
@@ -1075,6 +1092,11 @@ int dontWaitForSwappedKey(redisClient *c, robj *key) {
     listIter li;
     struct dictEntry *de;
 
     listIter li;
     struct dictEntry *de;
 
+    /* The key object might be destroyed when deleted from the c->io_keys
+     * list (and the "key" argument is physically the same object as the
+     * object inside the list), so we need to protect it. */
+    incrRefCount(key);
+
     /* Remove the key from the list of keys this client is waiting for. */
     listRewind(c->io_keys,&li);
     while ((ln = listNext(&li)) != NULL) {
     /* Remove the key from the list of keys this client is waiting for. */
     listRewind(c->io_keys,&li);
     while ((ln = listNext(&li)) != NULL) {
@@ -1095,6 +1117,7 @@ int dontWaitForSwappedKey(redisClient *c, robj *key) {
     if (listLength(l) == 0)
         dictDelete(c->db->io_keys,key);
 
     if (listLength(l) == 0)
         dictDelete(c->db->io_keys,key);
 
+    decrRefCount(key);
     return listLength(c->io_keys) == 0;
 }
 
     return listLength(c->io_keys) == 0;
 }