]> git.saurik.com Git - redis.git/commitdiff
more step forwards for disk store to be able to run
authorantirez <antirez@gmail.com>
Wed, 29 Dec 2010 15:58:57 +0000 (16:58 +0100)
committerantirez <antirez@gmail.com>
Wed, 29 Dec 2010 15:58:57 +0000 (16:58 +0100)
src/debug.c
src/diskstore.c
src/dscache.c
src/redis.h

index fff8d7277f8728ded6d1a37ef5c4f012edb7232d..88c88ca91afeebf61e84742fe26c3f5678b04d12 100644 (file)
@@ -200,60 +200,28 @@ void debugCommand(redisClient *c) {
     } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
         dictEntry *de = dictFind(c->db->dict,c->argv[2]->ptr);
         robj *val;
+        char *strenc;
+        char *storage;
 
         if (!de) {
             addReply(c,shared.nokeyerr);
             return;
         }
         val = dictGetEntryVal(de);
-        if (!server.vm_enabled || (val->storage == REDIS_VM_MEMORY ||
-                                   val->storage == REDIS_VM_SWAPPING)) {
-            char *strenc;
-
-            strenc = strEncoding(val->encoding);
-            addReplyStatusFormat(c,
-                "Value at:%p refcount:%d "
-                "encoding:%s serializedlength:%lld "
-                "lru:%d lru_seconds_idle:%lu",
-                (void*)val, val->refcount,
-                strenc, (long long) rdbSavedObjectLen(val),
-                val->lru, estimateObjectIdleTime(val));
-        } else {
-            vmpointer *vp = (vmpointer*) val;
-            addReplyStatusFormat(c,
-                "Value swapped at: page %llu "
-                "using %llu pages",
-                (unsigned long long) vp->page,
-                (unsigned long long) vp->usedpages);
-        }
-    } else if (!strcasecmp(c->argv[1]->ptr,"swapin") && c->argc == 3) {
-        lookupKeyRead(c->db,c->argv[2]);
-        addReply(c,shared.ok);
-    } else if (!strcasecmp(c->argv[1]->ptr,"swapout") && c->argc == 3) {
-        dictEntry *de = dictFind(c->db->dict,c->argv[2]->ptr);
-        robj *val;
-        vmpointer *vp;
-
-        if (!server.vm_enabled) {
-            addReplyError(c,"Virtual Memory is disabled");
-            return;
-        }
-        if (!de) {
-            addReply(c,shared.nokeyerr);
-            return;
-        }
-        val = dictGetEntryVal(de);
-        /* Swap it */
-        if (val->storage != REDIS_VM_MEMORY) {
-            addReplyError(c,"This key is not in memory");
-        } else if (val->refcount != 1) {
-            addReplyError(c,"Object is shared");
-        } else if ((vp = vmSwapObjectBlocking(val)) != NULL) {
-            dictGetEntryVal(de) = vp;
-            addReply(c,shared.ok);
-        } else {
-            addReply(c,shared.err);
+        strenc = strEncoding(val->encoding);
+        switch(val->storage) {
+        case REDIS_DS_MEMORY: storage = "memory"; break;
+        case REDIS_DS_DIRTY: storage = "dirty"; break;
+        case REDIS_DS_SAVING: storage = "saving"; break;
+        default: storage = "unknown"; break;
         }
+        addReplyStatusFormat(c,
+            "Value at:%p refcount:%d "
+            "encoding:%s serializedlength:%lld "
+            "lru:%d lru_seconds_idle:%lu storage:%s",
+            (void*)val, val->refcount,
+            strenc, (long long) rdbSavedObjectLen(val),
+            val->lru, estimateObjectIdleTime(val), storage);
     } else if (!strcasecmp(c->argv[1]->ptr,"populate") && c->argc == 3) {
         long keys, j;
         robj *key, *val;
index acc7c16f93ed6cb9c151fda11c0eac1ffcaa50d8..3904310d3c5c4a070ff0e8a1762014c043826c10 100644 (file)
@@ -115,5 +115,8 @@ int dsSet(redisDb *db, robj *key, robj *val) {
 robj *dsGet(redisDb *db, robj *key) {
 }
 
+int dsDel(redisDb *db, robj *key) {
+}
+
 int dsExists(redisDb *db, robj *key) {
 }
index 2bda0b509016ddeff0c0afe077fdda3cdee0d6dd..5570e9c5dba59f4019d7b579cfa6f6c93a3bf212 100644 (file)
@@ -227,6 +227,7 @@ int cacheFreeOneEntry(void) {
         dbDelete(best_db,kobj);
         decrRefCount(kobj);
     }
+    return REDIS_OK;
 }
 
 /* Return true if it's safe to swap out objects in a given moment.
@@ -240,7 +241,8 @@ int dsCanTouchDiskStore(void) {
 
 void freeIOJob(iojob *j) {
     decrRefCount(j->key);
-    decrRefCount(j->val);
+    /* j->val can be NULL if the job is about deleting the key from disk. */
+    if (j->val) decrRefCount(j->val);
     zfree(j);
 }
 
@@ -279,13 +281,17 @@ void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata,
 
         /* Post process it in the main thread, as there are things we
          * can do just here to avoid race conditions and/or invasive locks */
-        redisLog(REDIS_DEBUG,"COMPLETED Job type: %d, ID %p, key: %s", j->type, (void*)j->id, (unsigned char*)j->key->ptr);
+        redisLog(REDIS_DEBUG,"COMPLETED Job type %s, key: %s",
+            (j->type == REDIS_IOJOB_LOAD) ? "load" : "save",
+            (unsigned char*)j->key->ptr);
         de = dictFind(j->db->dict,j->key->ptr);
         redisAssert(de != NULL);
         if (j->type == REDIS_IOJOB_LOAD) {
+            /* Create the key-value pair in the in-memory database */
             dbAdd(j->db,j->key,j->val);
+            /* Handle clients waiting for this key to be loaded. */
+            handleClientsBlockedOnSwappedKey(j->db,j->key);
             freeIOJob(j);
-            /* FIXME: notify clients waiting for this key */
         } else if (j->type == REDIS_IOJOB_SAVE) {
             redisAssert(j->val->storage == REDIS_DS_SAVING);
             j->val->storage = REDIS_DS_MEMORY;
@@ -330,22 +336,23 @@ void *IOThreadEntryPoint(void *arg) {
         j = ln->value;
         listDelNode(server.io_newjobs,ln);
         /* Add the job in the processing queue */
-        j->thread = pthread_self();
         listAddNodeTail(server.io_processing,j);
         ln = listLast(server.io_processing); /* We use ln later to remove it */
         unlockThreadedIO();
-        redisLog(REDIS_DEBUG,"Thread %ld got a new job (type %d): %p about key '%s'",
-            (long) pthread_self(), j->type, (void*)j, (char*)j->key->ptr);
+        redisLog(REDIS_DEBUG,"Thread %ld: new job type %s: %p about key '%s'",
+            (long) pthread_self(),
+            (j->type == REDIS_IOJOB_LOAD) ? "load" : "save",
+            (void*)j, (char*)j->key->ptr);
 
         /* Process the Job */
         if (j->type == REDIS_IOJOB_LOAD) {
-            vmpointer *vp = (vmpointer*)j->id;
-            j->val = vmReadObjectFromSwap(j->page,vp->vtype);
-        } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) {
-            j->pages = rdbSavedObjectPages(j->val);
-        } else if (j->type == REDIS_IOJOB_DO_SWAP) {
-            if (vmWriteObjectOnSwap(j->val,j->page) == REDIS_ERR)
-                j->canceled = 1;
+            j->val = dsGet(j->db,j->key);
+            redisAssert(j->val != NULL);
+        } else if (j->type == REDIS_IOJOB_SAVE) {
+            if (j->val)
+                dsSet(j->db,j->key,j->val);
+            else
+                dsDel(j->db,j->key);
         }
 
         /* Done: insert the job into the processed queue */
@@ -420,52 +427,50 @@ void queueIOJob(iojob *j) {
         spawnIOThread();
 }
 
-int vmSwapObjectThreaded(robj *key, robj *val, redisDb *db) {
+void dsCreateIOJob(int type, redisDb *db, robj *key, robj *val) {
     iojob *j;
 
     j = zmalloc(sizeof(*j));
-    j->type = REDIS_IOJOB_PREPARE_SWAP;
+    j->type = type;
     j->db = db;
     j->key = key;
     incrRefCount(key);
-    j->id = j->val = val;
+    j->val = val;
     incrRefCount(val);
-    j->canceled = 0;
-    j->thread = (pthread_t) -1;
-    val->storage = REDIS_VM_SWAPPING;
 
     lockThreadedIO();
     queueIOJob(j);
     unlockThreadedIO();
-    return REDIS_OK;
 }
 
 /* ============ Virtual Memory - Blocking clients on missing keys =========== */
 
 /* This function makes the clinet 'c' waiting for the key 'key' to be loaded.
- * If there is not already a job loading the key, it is craeted.
- * The key is added to the io_keys list in the client structure, and also
+ * If the key is already in memory we don't need to block, regardless
+ * of the storage of the value object for this key:
+ *
+ * - If it's REDIS_DS_MEMORY we have the key in memory.
+ * - If it's REDIS_DS_DIRTY they key was modified, but still in memory.
+ * - if it's REDIS_DS_SAVING the key is being saved by an IO Job. When
+ *   the client will lookup the key it will block if the key is still
+ *   in this stage but it's more or less the best we can do.
+ *   FIXME: we should try if it's actually better to suspend the client
+ *   accessing an object that is being saved, and awake it only when
+ *   the saving was completed.
+ *
+ * Otherwise if the key is not in memory, we block the client and start
+ * an IO Job to load it:
+ *
+ * the key is added to the io_keys list in the client structure, and also
  * in the hash table mapping swapped keys to waiting clients, that is,
  * server.io_waited_keys. */
 int waitForSwappedKey(redisClient *c, robj *key) {
     struct dictEntry *de;
-    robj *o;
     list *l;
 
-    /* If the key does not exist or is already in RAM we don't need to
-     * block the client at all. */
+    /* Return ASAP if the key is in memory */
     de = dictFind(c->db->dict,key->ptr);
-    if (de == NULL) return 0;
-    o = dictGetEntryVal(de);
-    if (o->storage == REDIS_VM_MEMORY) {
-        return 0;
-    } else if (o->storage == REDIS_VM_SWAPPING) {
-        /* We were swapping the key, undo it! */
-        vmCancelThreadedIOJob(o);
-        return 0;
-    }
-
-    /* OK: the key is either swapped, or being loaded just now. */
+    if (de != NULL) return 0;
 
     /* Add the key to the list of keys this client is waiting for.
      * This maps clients to keys they are waiting for. */
@@ -488,25 +493,8 @@ int waitForSwappedKey(redisClient *c, robj *key) {
     listAddNodeTail(l,c);
 
     /* Are we already loading the key from disk? If not create a job */
-    if (o->storage == REDIS_VM_SWAPPED) {
-        iojob *j;
-        vmpointer *vp = (vmpointer*)o;
-
-        o->storage = REDIS_VM_LOADING;
-        j = zmalloc(sizeof(*j));
-        j->type = REDIS_IOJOB_LOAD;
-        j->db = c->db;
-        j->id = (robj*)vp;
-        j->key = key;
-        incrRefCount(key);
-        j->page = vp->page;
-        j->val = NULL;
-        j->canceled = 0;
-        j->thread = (pthread_t) -1;
-        lockThreadedIO();
-        queueIOJob(j);
-        unlockThreadedIO();
-    }
+    if (de == NULL)
+        dsCreateIOJob(REDIS_IOJOB_LOAD,c->db,key,NULL);
     return 1;
 }
 
@@ -584,7 +572,7 @@ int blockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd) {
     if (listLength(c->io_keys)) {
         c->flags |= REDIS_IO_WAIT;
         aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
-        server.vm_blocked_clients++;
+        server.cache_blocked_clients++;
         return 1;
     } else {
         return 0;
index e12b1c18c04dd040ef28feb8499be90219db069d..1557e2604fd3296c2ecd870591d05659150de831 100644 (file)
@@ -772,6 +772,7 @@ int dsOpen(void);
 int dsClose(void);
 int dsSet(redisDb *db, robj *key, robj *val);
 robj *dsGet(redisDb *db, robj *key);
+int dsDel(redisDb *db, robj *key);
 int dsExists(redisDb *db, robj *key);
 
 /* Disk Store Cache */