} else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
dictEntry *de = dictFind(c->db->dict,c->argv[2]->ptr);
robj *val;
+ char *strenc;
+ char *storage;
if (!de) {
addReply(c,shared.nokeyerr);
return;
}
val = dictGetEntryVal(de);
- if (!server.vm_enabled || (val->storage == REDIS_VM_MEMORY ||
- val->storage == REDIS_VM_SWAPPING)) {
- char *strenc;
-
- strenc = strEncoding(val->encoding);
- addReplyStatusFormat(c,
- "Value at:%p refcount:%d "
- "encoding:%s serializedlength:%lld "
- "lru:%d lru_seconds_idle:%lu",
- (void*)val, val->refcount,
- strenc, (long long) rdbSavedObjectLen(val),
- val->lru, estimateObjectIdleTime(val));
- } else {
- vmpointer *vp = (vmpointer*) val;
- addReplyStatusFormat(c,
- "Value swapped at: page %llu "
- "using %llu pages",
- (unsigned long long) vp->page,
- (unsigned long long) vp->usedpages);
- }
- } else if (!strcasecmp(c->argv[1]->ptr,"swapin") && c->argc == 3) {
- lookupKeyRead(c->db,c->argv[2]);
- addReply(c,shared.ok);
- } else if (!strcasecmp(c->argv[1]->ptr,"swapout") && c->argc == 3) {
- dictEntry *de = dictFind(c->db->dict,c->argv[2]->ptr);
- robj *val;
- vmpointer *vp;
-
- if (!server.vm_enabled) {
- addReplyError(c,"Virtual Memory is disabled");
- return;
- }
- if (!de) {
- addReply(c,shared.nokeyerr);
- return;
- }
- val = dictGetEntryVal(de);
- /* Swap it */
- if (val->storage != REDIS_VM_MEMORY) {
- addReplyError(c,"This key is not in memory");
- } else if (val->refcount != 1) {
- addReplyError(c,"Object is shared");
- } else if ((vp = vmSwapObjectBlocking(val)) != NULL) {
- dictGetEntryVal(de) = vp;
- addReply(c,shared.ok);
- } else {
- addReply(c,shared.err);
+ strenc = strEncoding(val->encoding);
+ switch(val->storage) {
+ case REDIS_DS_MEMORY: storage = "memory"; break;
+ case REDIS_DS_DIRTY: storage = "dirty"; break;
+ case REDIS_DS_SAVING: storage = "saving"; break;
+ default: storage = "unknown"; break;
}
+ addReplyStatusFormat(c,
+ "Value at:%p refcount:%d "
+ "encoding:%s serializedlength:%lld "
+ "lru:%d lru_seconds_idle:%lu storage:%s",
+ (void*)val, val->refcount,
+ strenc, (long long) rdbSavedObjectLen(val),
+ val->lru, estimateObjectIdleTime(val), storage);
} else if (!strcasecmp(c->argv[1]->ptr,"populate") && c->argc == 3) {
long keys, j;
robj *key, *val;
dbDelete(best_db,kobj);
decrRefCount(kobj);
}
+ return REDIS_OK;
}
/* Return true if it's safe to swap out objects in a given moment.
void freeIOJob(iojob *j) {
decrRefCount(j->key);
- decrRefCount(j->val);
+ /* j->val can be NULL if the job is about deleting the key from disk. */
+ if (j->val) decrRefCount(j->val);
zfree(j);
}
/* Post process it in the main thread, as there are things we
* can do just here to avoid race conditions and/or invasive locks */
- redisLog(REDIS_DEBUG,"COMPLETED Job type: %d, ID %p, key: %s", j->type, (void*)j->id, (unsigned char*)j->key->ptr);
+ redisLog(REDIS_DEBUG,"COMPLETED Job type %s, key: %s",
+ (j->type == REDIS_IOJOB_LOAD) ? "load" : "save",
+ (unsigned char*)j->key->ptr);
de = dictFind(j->db->dict,j->key->ptr);
redisAssert(de != NULL);
if (j->type == REDIS_IOJOB_LOAD) {
+ /* Create the key-value pair in the in-memory database */
dbAdd(j->db,j->key,j->val);
+ /* Handle clients waiting for this key to be loaded. */
+ handleClientsBlockedOnSwappedKey(j->db,j->key);
freeIOJob(j);
- /* FIXME: notify clients waiting for this key */
} else if (j->type == REDIS_IOJOB_SAVE) {
redisAssert(j->val->storage == REDIS_DS_SAVING);
j->val->storage = REDIS_DS_MEMORY;
j = ln->value;
listDelNode(server.io_newjobs,ln);
/* Add the job in the processing queue */
- j->thread = pthread_self();
listAddNodeTail(server.io_processing,j);
ln = listLast(server.io_processing); /* We use ln later to remove it */
unlockThreadedIO();
- redisLog(REDIS_DEBUG,"Thread %ld got a new job (type %d): %p about key '%s'",
- (long) pthread_self(), j->type, (void*)j, (char*)j->key->ptr);
+ redisLog(REDIS_DEBUG,"Thread %ld: new job type %s: %p about key '%s'",
+ (long) pthread_self(),
+ (j->type == REDIS_IOJOB_LOAD) ? "load" : "save",
+ (void*)j, (char*)j->key->ptr);
/* Process the Job */
if (j->type == REDIS_IOJOB_LOAD) {
- vmpointer *vp = (vmpointer*)j->id;
- j->val = vmReadObjectFromSwap(j->page,vp->vtype);
- } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) {
- j->pages = rdbSavedObjectPages(j->val);
- } else if (j->type == REDIS_IOJOB_DO_SWAP) {
- if (vmWriteObjectOnSwap(j->val,j->page) == REDIS_ERR)
- j->canceled = 1;
+ j->val = dsGet(j->db,j->key);
+ redisAssert(j->val != NULL);
+ } else if (j->type == REDIS_IOJOB_SAVE) {
+ if (j->val)
+ dsSet(j->db,j->key,j->val);
+ else
+ dsDel(j->db,j->key);
}
/* Done: insert the job into the processed queue */
spawnIOThread();
}
-int vmSwapObjectThreaded(robj *key, robj *val, redisDb *db) {
+void dsCreateIOJob(int type, redisDb *db, robj *key, robj *val) {
iojob *j;
j = zmalloc(sizeof(*j));
- j->type = REDIS_IOJOB_PREPARE_SWAP;
+ j->type = type;
j->db = db;
j->key = key;
incrRefCount(key);
- j->id = j->val = val;
+ j->val = val;
incrRefCount(val);
- j->canceled = 0;
- j->thread = (pthread_t) -1;
- val->storage = REDIS_VM_SWAPPING;
lockThreadedIO();
queueIOJob(j);
unlockThreadedIO();
- return REDIS_OK;
}
/* ============ Virtual Memory - Blocking clients on missing keys =========== */
/* This function makes the clinet 'c' waiting for the key 'key' to be loaded.
- * If there is not already a job loading the key, it is craeted.
- * The key is added to the io_keys list in the client structure, and also
+ * If the key is already in memory we don't need to block, regardless
+ * of the storage of the value object for this key:
+ *
+ * - If it's REDIS_DS_MEMORY we have the key in memory.
+ * - If it's REDIS_DS_DIRTY they key was modified, but still in memory.
+ * - if it's REDIS_DS_SAVING the key is being saved by an IO Job. When
+ * the client will lookup the key it will block if the key is still
+ * in this stage but it's more or less the best we can do.
+ * FIXME: we should try if it's actually better to suspend the client
+ * accessing an object that is being saved, and awake it only when
+ * the saving was completed.
+ *
+ * Otherwise if the key is not in memory, we block the client and start
+ * an IO Job to load it:
+ *
+ * the key is added to the io_keys list in the client structure, and also
* in the hash table mapping swapped keys to waiting clients, that is,
* server.io_waited_keys. */
int waitForSwappedKey(redisClient *c, robj *key) {
struct dictEntry *de;
- robj *o;
list *l;
- /* If the key does not exist or is already in RAM we don't need to
- * block the client at all. */
+ /* Return ASAP if the key is in memory */
de = dictFind(c->db->dict,key->ptr);
- if (de == NULL) return 0;
- o = dictGetEntryVal(de);
- if (o->storage == REDIS_VM_MEMORY) {
- return 0;
- } else if (o->storage == REDIS_VM_SWAPPING) {
- /* We were swapping the key, undo it! */
- vmCancelThreadedIOJob(o);
- return 0;
- }
-
- /* OK: the key is either swapped, or being loaded just now. */
+ if (de != NULL) return 0;
/* Add the key to the list of keys this client is waiting for.
* This maps clients to keys they are waiting for. */
listAddNodeTail(l,c);
/* Are we already loading the key from disk? If not create a job */
- if (o->storage == REDIS_VM_SWAPPED) {
- iojob *j;
- vmpointer *vp = (vmpointer*)o;
-
- o->storage = REDIS_VM_LOADING;
- j = zmalloc(sizeof(*j));
- j->type = REDIS_IOJOB_LOAD;
- j->db = c->db;
- j->id = (robj*)vp;
- j->key = key;
- incrRefCount(key);
- j->page = vp->page;
- j->val = NULL;
- j->canceled = 0;
- j->thread = (pthread_t) -1;
- lockThreadedIO();
- queueIOJob(j);
- unlockThreadedIO();
- }
+ if (de == NULL)
+ dsCreateIOJob(REDIS_IOJOB_LOAD,c->db,key,NULL);
return 1;
}
if (listLength(c->io_keys)) {
c->flags |= REDIS_IO_WAIT;
aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
- server.vm_blocked_clients++;
+ server.cache_blocked_clients++;
return 1;
} else {
return 0;