char buf[128];
sds argsds;
struct redisCommand *cmd;
- int force_swapout;
/* Serve the clients from time to time */
if (!(loops++ % 1000)) {
/* Clean up, ready for the next command */
for (j = 0; j < argc; j++) decrRefCount(argv[j]);
zfree(argv);
-
- /* Handle swapping while loading big datasets when VM is on */
- force_swapout = 0;
- if ((zmalloc_used_memory() - server.vm_max_memory) > 1024*1024*32)
- force_swapout = 1;
-
- if (server.vm_enabled && force_swapout) {
- while (zmalloc_used_memory() > server.vm_max_memory) {
- if (vmSwapOneObjectBlocking() == REDIS_ERR) break;
- }
- }
}
/* This point can only be reached when EOF is reached without errors.
sds keystr = dictGetEntryKey(de);
robj key, *o;
time_t expiretime;
- int swapped;
keystr = dictGetEntryKey(de);
o = dictGetEntryVal(de);
initStaticStringObject(key,keystr);
- /* If the value for this key is swapped, load a preview in memory.
- * We use a "swapped" flag to remember if we need to free the
- * value object instead to just increment the ref count anyway
- * in order to avoid copy-on-write of pages if we are forked() */
- if (!server.vm_enabled || o->storage == REDIS_VM_MEMORY ||
- o->storage == REDIS_VM_SWAPPING) {
- swapped = 0;
- } else {
- o = vmPreviewObject(o);
- swapped = 1;
- }
+
expiretime = getExpire(db,&key);
/* Save the key and associated value */
if (fwriteBulkObject(fp,&key) == 0) goto werr;
if (fwriteBulkLongLong(fp,expiretime) == 0) goto werr;
}
- if (swapped) decrRefCount(o);
}
dictReleaseIterator(di);
}
pid_t childpid;
if (server.bgrewritechildpid != -1) return REDIS_ERR;
- if (server.vm_enabled) waitEmptyIOJobsQueue();
+ redisAssert(server.ds_enabled == 0);
if ((childpid = fork()) == 0) {
/* Child */
char tmpfile[256];
- if (server.vm_enabled) vmReopenSwapFile();
if (server.ipfd > 0) close(server.ipfd);
if (server.sofd > 0) close(server.sofd);
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
} else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
zfree(server.dbfilename);
server.dbfilename = zstrdup(argv[1]);
- } else if (!strcasecmp(argv[0],"vm-enabled") && argc == 2) {
- if ((server.vm_enabled = yesnotoi(argv[1])) == -1) {
+ } else if (!strcasecmp(argv[0],"diskstore-enabled") && argc == 2) {
+ if ((server.ds_enabled = yesnotoi(argv[1])) == -1) {
err = "argument must be 'yes' or 'no'"; goto loaderr;
}
- } else if (!strcasecmp(argv[0],"vm-swap-file") && argc == 2) {
- zfree(server.vm_swap_file);
- server.vm_swap_file = zstrdup(argv[1]);
- } else if (!strcasecmp(argv[0],"vm-max-memory") && argc == 2) {
- server.vm_max_memory = memtoll(argv[1],NULL);
- } else if (!strcasecmp(argv[0],"vm-page-size") && argc == 2) {
- server.vm_page_size = memtoll(argv[1], NULL);
- } else if (!strcasecmp(argv[0],"vm-pages") && argc == 2) {
- server.vm_pages = memtoll(argv[1], NULL);
- } else if (!strcasecmp(argv[0],"vm-max-threads") && argc == 2) {
- server.vm_max_threads = strtoll(argv[1], NULL, 10);
+ } else if (!strcasecmp(argv[0],"diskstore-path") && argc == 2) {
+ zfree(server.ds_path);
+ server.ds_path = zstrdup(argv[1]);
+ } else if (!strcasecmp(argv[0],"cache-max-memory") && argc == 2) {
+ server.cache_max_memory = memtoll(argv[1],NULL);
} else if (!strcasecmp(argv[0],"hash-max-zipmap-entries") && argc == 2) {
server.hash_max_zipmap_entries = memtoll(argv[1], NULL);
} else if (!strcasecmp(argv[0],"hash-max-zipmap-value") && argc == 2) {
if (server.bgsavechildpid == -1 && server.bgrewritechildpid == -1)
val->lru = server.lruclock;
- if (server.vm_enabled) {
- if (val->storage == REDIS_VM_MEMORY ||
- val->storage == REDIS_VM_SWAPPING)
- {
- /* If we were swapping the object out, cancel the operation */
- if (val->storage == REDIS_VM_SWAPPING)
- vmCancelThreadedIOJob(val);
- } else {
- int notify = (val->storage == REDIS_VM_LOADING);
-
- /* Our value was swapped on disk. Bring it at home. */
- redisAssert(val->type == REDIS_VMPOINTER);
- val = vmLoadObject(val);
- dictGetEntryVal(de) = val;
-
- /* Clients blocked by the VM subsystem may be waiting for
- * this key... */
- if (notify) handleClientsBlockedOnSwappedKey(db,key);
- }
+ if (server.ds_enabled && val->storage == REDIS_DS_SAVING) {
+ /* FIXME: change this code to just wait for our object to
+ * get out of the IO Job. */
+ waitEmptyIOJobsQueue();
+ redisAssert(val->storage != REDIS_DS_SAVING);
}
server.stat_keyspace_hits++;
return val;
} else {
+ /* FIXME: Check if the object is on disk, if it is, load it
+ * in a blocking way now. */
server.stat_keyspace_misses++;
return NULL;
}
* deleting the key will kill the I/O thread bringing the key from swap
* to memory, so the client will never be notified and unblocked if we
* don't do it now. */
- if (server.vm_enabled) handleClientsBlockedOnSwappedKey(db,key);
+ if (server.ds_enabled) handleClientsBlockedOnSwappedKey(db,key);
+
+ /* FIXME: we need to delete the IO Job loading the key, or simply we can
+ * wait for it to finish. */
+
/* Deleting an entry from the expires dict will not free the sds of
* the key, because it is shared with the main dictionary. */
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
*
* - cron() checks if there are elements on this list. When there are things
* to flush, we create an IO Job for the I/O thread.
- * FIXME: how to mark this key as "busy"? With VM we used to change the
- * object->storage field, but this time we need this to work with every
- * kind of object, including shared ones. One possibility is just killing
- * object sharing at all. So let's assume this will be our solution.
- *
- * So we set keys that are in the process of being saved as
- * object->storage = REDIS_STORAGE_SAVING;
+ * NOTE: We disable object sharing when server.ds_enabled == 1 so objects
+ * that are referenced by an IO job for flushing on disk are marked as
+ * o->storage == REDIS_DS_SAVING.
*
* - This is what we do on key lookup:
- * 1) The key already exists in memory. object->storage == REDIS_DS_MEMORY.
+ * 1) The key already exists in memory. object->storage == REDIS_DS_MEMORY
+ * or it is object->storage == REDIS_DS_DIRTY:
* We don't do nothing special, lookup, return value object pointer.
* 2) The key is in memory but object->storage == REDIS_DS_SAVING.
- * This is an explicit lookup so we have to abort the saving operation.
- * We kill the IO Job, set the storage to == REDIS_DB_MEMORY but
- * re-queue the object in the server.ds_cache_dirty list.
- *
- * Btw here we need some protection against the problem of continuously
- * writing against a value having the effect of this value to be never
- * saved on disk. That is, at some point we need to block and write it
- * if there is too much delay.
+ * When this happens we block waiting for the I/O thread to process
+ * this object. Then continue.
* 3) The key is not in memory. We block to load the key from disk.
* Of course the key may not be present at all on the disk store as well,
* in such case we just detect this condition and continue, returning
* keys a client is going to use. We block the client, load keys
* using the I/O thread, unblock the client. Same code as VM more or less.
*
- * - Transfering keys from memory to disk.
- * Again while in cron() we detect our memory limit was reached. What we
- * do is transfering random keys that are not set as dirty on disk, using
- * LRU to select the key.
+ * - Reclaiming memory.
+ * In cron() we detect our memory limit was reached. What we
+ * do is deleting keys that are REDIS_DS_MEMORY, using LRU.
+ *
* If this is not enough to return again under the memory limits we also
* start to flush keys that need to be synched on disk synchronously,
- * removing it from the memory.
+ * removing it from the memory. We do this blocking as memory limit is a
+ * much "harder" barrier in the new design.
*
* - IO thread operations are no longer stopped for sync loading/saving of
- * things. When a key is found to be in the process of being saved or
- * loaded we simply wait for the IO thread to end its work.
+ * things. When a key is found to be in the process of being saved
+ * we simply wait for the IO thread to end its work.
*
* Otherwise if there is to load a key without any IO thread operation
* just started it is blocking-loaded in the lookup function.
+ *
+ * - What happens when an object is destroyed?
+ *
+ * If o->storage == REDIS_DS_MEMORY then we simply destroy the object.
+ * If o->storage == REDIS_DS_DIRTY we can still remove the object. It had
+ * changes not flushed on disk, but is being removed so
+ * who cares.
+ * If o->storage == REDIS_DS_SAVING then the object is being saved so
+ * it is impossible that its refcount == 1, must be at
+ * least two. When the object is saved the storage will
+ * be set back to DS_MEMORY.
+ *
+ * - What happens when keys are deleted?
+ *
+ * We simply schedule a key flush operation as usual, but when the
+ * IO thread will be created the object pointer will be set to NULL
+ * so the IO thread will know that the work to do is to delete the key
+ * from the disk store.
+ *
+ * - What happens with MULTI/EXEC?
+ *
+ * Good question.
*/
/* Virtual Memory is composed mainly of two subsystems:
void addReply(redisClient *c, robj *obj) {
if (_installWriteEvent(c) != REDIS_OK) return;
- redisAssert(!server.ds_enabled || obj->storage == REDIS_VM_MEMORY);
+ redisAssert(!server.ds_enabled || obj->storage == REDIS_DS_MEMORY);
/* This is an important place where we can avoid copy-on-write
* when there is a saving child running, avoiding touching the
/* The following is only needed if VM is active, but since the conditional
* is probably more costly than initializing the field it's better to
* have every field properly initialized anyway. */
- o->storage = REDIS_VM_MEMORY;
+ o->storage = REDIS_DS_MEMORY;
return o;
}
void decrRefCount(void *obj) {
robj *o = obj;
- /* Object is a swapped out value, or in the process of being loaded. */
- if (server.vm_enabled &&
- (o->storage == REDIS_VM_SWAPPED || o->storage == REDIS_VM_LOADING))
- {
- vmpointer *vp = obj;
- if (o->storage == REDIS_VM_LOADING) vmCancelThreadedIOJob(o);
- vmMarkPagesFree(vp->page,vp->usedpages);
- server.vm_stats_swapped_objects--;
- zfree(vp);
- return;
- }
-
if (o->refcount <= 0) redisPanic("decrRefCount against refcount <= 0");
- /* Object is in memory, or in the process of being swapped out.
- *
- * If the object is being swapped out, abort the operation on
- * decrRefCount even if the refcount does not drop to 0: the object
- * is referenced at least two times, as value of the key AND as
- * job->val in the iojob. So if we don't invalidate the iojob, when it is
- * done but the relevant key was removed in the meantime, the
- * complete jobs handler will not find the key about the job and the
- * assert will fail. */
- if (server.vm_enabled && o->storage == REDIS_VM_SWAPPING)
- vmCancelThreadedIOJob(o);
if (--(o->refcount) == 0) {
+ /* DS_SAVING objects should always have a reference in the
+ * IO Job structure. So we should never reach this state. */
+ redisAssert(o->storage != REDIS_DS_SAVING);
switch(o->type) {
case REDIS_STRING: freeStringObject(o); break;
case REDIS_LIST: freeListObject(o); break;
return len;
}
-/* Return the number of pages required to save this object in the swap file */
-off_t rdbSavedObjectPages(robj *o) {
- off_t bytes = rdbSavedObjectLen(o);
- return (bytes+(server.vm_page_size-1))/server.vm_page_size;
-}
-
/* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
int rdbSave(char *filename) {
dictIterator *di = NULL;
int j;
time_t now = time(NULL);
- /* Wait for I/O therads to terminate, just in case this is a
- * foreground-saving, to avoid seeking the swap file descriptor at the
- * same time. */
- if (server.vm_enabled)
- waitEmptyIOJobsQueue();
+ /* FIXME: implement .rdb save for disk store properly */
+ redisAssert(server.ds_enabled == 0);
snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
fp = fopen(tmpfile,"w");
if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
if (rdbSaveTime(fp,expiretime) == -1) goto werr;
}
- /* Save the key and associated value. This requires special
- * handling if the value is swapped out. */
- if (!server.vm_enabled || o->storage == REDIS_VM_MEMORY ||
- o->storage == REDIS_VM_SWAPPING) {
- /* Save type, key, value */
- if (rdbSaveType(fp,o->type) == -1) goto werr;
- if (rdbSaveStringObject(fp,&key) == -1) goto werr;
- if (rdbSaveObject(fp,o) == -1) goto werr;
- } else {
- /* REDIS_VM_SWAPPED or REDIS_VM_LOADING */
- robj *po;
- /* Get a preview of the object in memory */
- po = vmPreviewObject(o);
- /* Save type, key, value */
- if (rdbSaveType(fp,po->type) == -1) goto werr;
- if (rdbSaveStringObject(fp,&key) == -1) goto werr;
- if (rdbSaveObject(fp,po) == -1) goto werr;
- /* Remove the loaded object from memory */
- decrRefCount(po);
- }
+ /* Save type, key, value */
+ if (rdbSaveType(fp,o->type) == -1) goto werr;
+ if (rdbSaveStringObject(fp,&key) == -1) goto werr;
+ if (rdbSaveObject(fp,o) == -1) goto werr;
}
dictReleaseIterator(di);
}
pid_t childpid;
if (server.bgsavechildpid != -1) return REDIS_ERR;
- if (server.vm_enabled) waitEmptyIOJobsQueue();
+ redisAssert(server.ds_enabled == 0);
server.dirty_before_bgsave = server.dirty;
if ((childpid = fork()) == 0) {
/* Child */
- if (server.vm_enabled) vmReopenSwapFile();
if (server.ipfd > 0) close(server.ipfd);
if (server.sofd > 0) close(server.sofd);
if (rdbSave(filename) == REDIS_OK) {
startLoading(fp);
while(1) {
robj *key, *val;
- int force_swapout;
-
expiretime = -1;
/* Serve the clients from time to time */
continue;
}
decrRefCount(key);
-
- /* Flush data on disk once 32 MB of additional RAM are used... */
- force_swapout = 0;
- if ((zmalloc_used_memory() - server.vm_max_memory) > 1024*1024*32)
- force_swapout = 1;
-
- /* If we have still some hope of having some value fitting memory
- * then we try random sampling. */
- if (!swap_all_values && server.vm_enabled && force_swapout) {
- while (zmalloc_used_memory() > server.vm_max_memory) {
- if (vmSwapOneObjectBlocking() == REDIS_ERR) break;
- }
- if (zmalloc_used_memory() > server.vm_max_memory)
- swap_all_values = 1; /* We are already using too much mem */
- }
}
fclose(fp);
stopLoading();
#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
-/* Virtual memory object->where field. */
-#define REDIS_VM_MEMORY 0 /* The object is on memory */
-#define REDIS_VM_SWAPPED 1 /* The object is on disk */
-#define REDIS_VM_SWAPPING 2 /* Redis is swapping this object on disk */
-#define REDIS_VM_LOADING 3 /* Redis is loading this object from disk */
-
-/* Virtual memory static configuration stuff.
- * Check vmFindContiguousPages() to know more about this magic numbers. */
-#define REDIS_VM_MAX_NEAR_PAGES 65536
-#define REDIS_VM_MAX_RANDOM_JUMP 4096
-#define REDIS_VM_MAX_THREADS 32
-#define REDIS_THREAD_STACK_SIZE (1024*1024*4)
-/* The following is the *percentage* of completed I/O jobs to process when the
- * handelr is called. While Virtual Memory I/O operations are performed by
- * threads, this operations must be processed by the main thread when completed
- * in order to take effect. */
+/* Disk store cache object->storage values */
+#define REDIS_DS_MEMORY 0 /* The object is on memory */
+#define REDIS_DS_DIRTY 1 /* The object was modified */
+#define REDIS_DS_SAVING 2 /* There is an IO Job created for this obj. */
+
#define REDIS_MAX_COMPLETED_JOBS_PROCESSED 1
/* Client flags */
_var.type = REDIS_STRING; \
_var.encoding = REDIS_ENCODING_RAW; \
_var.ptr = _ptr; \
- _var.storage = REDIS_VM_MEMORY; \
+ _var.storage = REDIS_DS_MEMORY; \
} while(0);
typedef struct redisDb {