]> git.saurik.com Git - redis.git/commitdiff
still more work for diskstore
authorantirez <antirez@gmail.com>
Wed, 29 Dec 2010 02:57:35 +0000 (03:57 +0100)
committerantirez <antirez@gmail.com>
Wed, 29 Dec 2010 02:57:35 +0000 (03:57 +0100)
src/dscache.c
src/redis.h

index 5adbeeeda6c3c0ae785fc78917b193a0a06afcc8..e2fd7e30224dfc08a89bf10ce81186085b924058 100644 (file)
  * as a fully non-blocking VM.
  */
 
+void spawnIOThread(void);
+
 /* =================== Virtual Memory - Blocking Side  ====================== */
 
 void dsInit(void) {
-    off_t totsize;
     int pipefds[2];
     size_t stacksize;
-    struct flock fl;
 
     zmalloc_enable_thread_safeness(); /* we need thread safe zmalloc() */
 
@@ -239,42 +239,22 @@ int dsCanTouchDiskStore(void) {
 /* =================== Virtual Memory - Threaded I/O  ======================= */
 
 void freeIOJob(iojob *j) {
-    if ((j->type == REDIS_IOJOB_PREPARE_SWAP ||
-        j->type == REDIS_IOJOB_DO_SWAP ||
-        j->type == REDIS_IOJOB_LOAD) && j->val != NULL)
-    {
-         /* we fix the storage type, otherwise decrRefCount() will try to
-          * kill the I/O thread Job (that does no longer exists). */
-        if (j->val->storage == REDIS_VM_SWAPPING)
-            j->val->storage = REDIS_VM_MEMORY;
-        decrRefCount(j->val);
-    }
     decrRefCount(j->key);
     zfree(j);
 }
 
 /* Every time a thread finished a Job, it writes a byte into the write side
  * of an unix pipe in order to "awake" the main thread, and this function
- * is called.
- *
- * Note that this is called both by the event loop, when a I/O thread
- * sends a byte in the notification pipe, and is also directly called from
- * waitEmptyIOJobsQueue().
- *
- * In the latter case we don't want to swap more, so we use the
- * "privdata" argument setting it to a not NULL value to signal this
- * condition. */
+ * is called. */
 void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata,
             int mask)
 {
     char buf[1];
-    int retval, processed = 0, toprocess = -1, trytoswap = 1;
+    int retval, processed = 0, toprocess = -1;
     REDIS_NOTUSED(el);
     REDIS_NOTUSED(mask);
     REDIS_NOTUSED(privdata);
 
-    if (privdata != NULL) trytoswap = 0; /* check the comments above... */
-
     /* For every byte we read in the read side of the pipe, there is one
      * I/O job completed to process. */
     while((retval = read(fd,buf,1)) == 1) {
@@ -295,11 +275,7 @@ void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata,
         j = ln->value;
         listDelNode(server.io_processed,ln);
         unlockThreadedIO();
-        /* If this job is marked as canceled, just ignore it */
-        if (j->canceled) {
-            freeIOJob(j);
-            continue;
-        }
+
         /* Post process it in the main thread, as there are things we
          * can do just here to avoid race conditions and/or invasive locks */
         redisLog(REDIS_DEBUG,"COMPLETED Job type: %d, ID %p, key: %s", j->type, (void*)j->id, (unsigned char*)j->key->ptr);
@@ -322,27 +298,6 @@ void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata,
             handleClientsBlockedOnSwappedKey(db,j->key);
             freeIOJob(j);
             zfree(vp);
-        } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) {
-            /* Now we know the amount of pages required to swap this object.
-             * Let's find some space for it, and queue this task again
-             * rebranded as REDIS_IOJOB_DO_SWAP. */
-            if (!vmCanSwapOut() ||
-                vmFindContiguousPages(&j->page,j->pages) == REDIS_ERR)
-            {
-                /* Ooops... no space or we can't swap as there is
-                 * a fork()ed Redis trying to save stuff on disk. */
-                j->val->storage = REDIS_VM_MEMORY; /* undo operation */
-                freeIOJob(j);
-            } else {
-                /* Note that we need to mark this pages as used now,
-                 * if the job will be canceled, we'll mark them as freed
-                 * again. */
-                vmMarkPagesUsed(j->page,j->pages);
-                j->type = REDIS_IOJOB_DO_SWAP;
-                lockThreadedIO();
-                queueIOJob(j);
-                unlockThreadedIO();
-            }
         } else if (j->type == REDIS_IOJOB_DO_SWAP) {
             vmpointer *vp;
 
index 8dd461698e054db121abb30c9a86913c19429c2a..e12b1c18c04dd040ef28feb8499be90219db069d 100644 (file)
 #define REDIS_DS_SAVING 2       /* There is an IO Job created for this obj. */
 
 #define REDIS_MAX_COMPLETED_JOBS_PROCESSED 1
+#define REDIS_THREAD_STACK_SIZE (1024*1024*4)
 
 /* Client flags */
 #define REDIS_SLAVE 1       /* This client is a slave server */
@@ -542,22 +543,15 @@ typedef struct zset {
 } zset;
 
 /* VM threaded I/O request message */
-#define REDIS_IOJOB_LOAD 0          /* Load from disk to memory */
-#define REDIS_IOJOB_PREPARE_SWAP 1  /* Compute needed pages */
-#define REDIS_IOJOB_DO_SWAP 2       /* Swap from memory to disk */
+#define REDIS_IOJOB_LOAD 0
+#define REDIS_IOJOB_SAVE 1
+
 typedef struct iojob {
     int type;   /* Request type, REDIS_IOJOB_* */
     redisDb *db;/* Redis database */
-    robj *key;  /* This I/O request is about swapping this key */
-    robj *id;   /* Unique identifier of this job:
-                   this is the object to swap for REDIS_IOREQ_*_SWAP, or the
-                   vmpointer objct for REDIS_IOREQ_LOAD. */
-    robj *val;  /* the value to swap for REDIS_IOREQ_*_SWAP, otherwise this
-                 * field is populated by the I/O thread for REDIS_IOREQ_LOAD. */
-    off_t page; /* Swap page where to read/write the object */
-    off_t pages; /* Swap pages needed to save object. PREPARE_SWAP return val */
-    int canceled; /* True if this command was canceled by blocking side of VM */
-    pthread_t thread; /* ID of the thread processing this entry */
+    robj *key;  /* This I/O request is about this key */
+    robj *val;  /* the value to swap for REDIS_IOJOB_SAVE, otherwise this
+                 * field is populated by the I/O thread for REDIS_IOJOB_LOAD. */
 } iojob;
 
 /* Structure to hold list iteration abstraction. */