#include <sys/uio.h>
#include <limits.h>
#include <math.h>
+#include <pthread.h>
#if defined(__sun)
#include "solarisfixes.h"
* Check vmFindContiguousPages() to know more about these magic numbers. */
#define REDIS_VM_MAX_NEAR_PAGES 65536
#define REDIS_VM_MAX_RANDOM_JUMP 4096
+#define REDIS_VM_MAX_THREADS 32
/* Client flags */
#define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
#define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
#define REDIS_MULTI 16 /* This client is in a MULTI context */
#define REDIS_BLOCKED 32 /* The client is waiting in a blocking operation */
+#define REDIS_IO_WAIT 64 /* The client is waiting for Virtual Memory I/O */
/* Slave replication state - slave side */
#define REDIS_REPL_NONE 0 /* No active replication */
int blockingkeysnum; /* Number of blocking keys */
time_t blockingto; /* Blocking operation timeout. If UNIX current time
* is >= blockingto then the operation timed out. */
+ list *io_keys; /* Keys this client is waiting to be loaded from the
+ * swap file in order to continue. */
} redisClient;
struct saveparam {
off_t vm_near_pages; /* Number of pages allocated sequentially */
unsigned char *vm_bitmap; /* Bitmap of free/used pages */
time_t unixtime; /* Unix time sampled every second. */
+ /* Virtual memory I/O threads stuff */
+ pthread_t io_threads[REDIS_VM_MAX_THREADS];
+ /* An I/O thread processes an element taken from the io_jobs queue and
+ * puts the result of the operation in the io_done list. */
+ list *io_jobs; /* List of VM I/O jobs */
+ list *io_done; /* List of VM processed jobs */
+ list *io_clients; /* All the clients waiting for SWAP I/O operations */
+ pthread_mutex_t io_mutex; /* lock to access io_jobs and io_done */
+ int io_active_threads; /* Number of running I/O threads */
+ int vm_max_threads; /* Max number of I/O threads running at the same time */
/* Virtual memory stats */
unsigned long long vm_stats_used_pages;
unsigned long long vm_stats_swapped_objects;
static double R_Zero, R_PosInf, R_NegInf, R_Nan;
+/* VM threaded I/O request message.
+ *
+ * Each request describes one swap-file operation on a single key of a
+ * single database: the type field selects between the two operations
+ * defined below. */
+#define REDIS_IOREQ_LOAD 0 /* 'val' is filled in by the I/O thread */
+#define REDIS_IOREQ_SWAP 1 /* 'val' is supplied by the requester */
+typedef struct ioreq {
+ int type; /* Request type, REDIS_IOREQ_* */
+ int dbid; /* Redis database ID */
+ robj *key; /* The key this I/O request is about */
+ robj *val; /* The value to swap for REDIS_IOREQ_SWAP, otherwise this
+ * field is populated by the I/O thread for REDIS_IOREQ_LOAD. */
+ off_t page; /* Swap page where to read/write the object */
+} ioreq;
+
/*================================ Prototypes =============================== */
static void freeStringObject(robj *o);
server.vm_page_size = 256; /* 256 bytes per page */
server.vm_pages = 1024*1024*100; /* ~104 million pages */
server.vm_max_memory = 1024LL*1024*1024*1; /* 1 GB of RAM */
+ server.vm_max_threads = 4;
resetServerSaveParams();
server.vm_page_size = strtoll(argv[1], NULL, 10);
} else if (!strcasecmp(argv[0],"vm-pages") && argc == 2) {
server.vm_pages = strtoll(argv[1], NULL, 10);
+ } else if (!strcasecmp(argv[0],"vm-max-threads") && argc == 2) {
+ server.vm_max_threads = strtoll(argv[1], NULL, 10);
} else {
err = "Bad directive or wrong number of arguments"; goto loaderr;
}
listRelease(c->reply);
freeClientArgv(c);
close(c->fd);
+ /* Remove from the list of clients */
ln = listSearchKey(server.clients,c);
redisAssert(ln != NULL);
listDelNode(server.clients,ln);
+    /* Remove from the list of clients waiting for VM operations. The
+     * client is only expected to be in server.io_clients when VM is enabled
+     * and it still has keys pending, so the lookup is limited to that case. */
+    if (server.vm_enabled && listLength(c->io_keys)) {
+        ln = listSearchKey(server.io_clients,c);
+        if (ln) listDelNode(server.io_clients,ln);
+    }
+    /* The io_keys list is allocated unconditionally when the client is
+     * created, so it must always be released here: freeing it only when VM
+     * is enabled and the list is non-empty leaks the list for every other
+     * client. */
+    listRelease(c->io_keys);
+ /* Other cleanup */
if (c->flags & REDIS_SLAVE) {
if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
close(c->repldbfd);
* would not be called at all, but after the execution of the first commands
* in the input buffer the client may be blocked, and the "goto again"
* will try to reiterate. The following line will make it return asap. */
- if (c->flags & REDIS_BLOCKED) return;
+ if (c->flags & REDIS_BLOCKED || c->flags & REDIS_IO_WAIT) return;
if (c->bulklen == -1) {
/* Read the first line of the query */
char *p = strchr(c->querybuf,'\n');
c->authenticated = 0;
c->replstate = REDIS_REPL_NONE;
c->reply = listCreate();
- c->blockingkeys = NULL;
- c->blockingkeysnum = 0;
listSetFreeMethod(c->reply,decrRefCount);
listSetDupMethod(c->reply,dupClientReplyValue);
+ c->blockingkeys = NULL;
+ c->blockingkeysnum = 0;
+ c->io_keys = listCreate();
+ listSetFreeMethod(c->io_keys,decrRefCount);
if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
readQueryFromClient, c) == AE_ERR) {
freeClient(c);
/* Try to remove the swap file, so the OS will really delete it from the
* file system when Redis exits. */
unlink("/tmp/redisvm");
+
+ /* Initialize threaded I/O */
+ server.io_jobs = listCreate();
+ server.io_done = listCreate();
+ server.io_clients = listCreate();
+ pthread_mutex_init(&server.io_mutex,NULL);
+ server.io_active_threads = 0;
}
/* Mark the page as used */
# Set the max number of connected clients at the same time. By default there
# is no limit, and it's up to the number of file descriptors the Redis process
-# is able to open. The special value '0' means no limts.
+# is able to open. The special value '0' means no limits.
# Once the limit is reached Redis will close all the new connections sending
# an error 'max number of clients reached'.
#
#
# If you use a lot of small objects, use a page size of 64 or 32 bytes.
# If you use a lot of big objects, use a bigger page size.
-# If unsure, use the defualt :)
+# If unsure, use the default :)
vm-page-size 256
# Number of total memory pages in the swap file.
# The total swap size is vm-page-size * vm-pages
#
# With the default of 256-bytes memory pages and 104857600 pages Redis will
-# use a 25 GB swap file, that will use rougly 13 MB of RAM for the page table.
+# use a 25 GB swap file, that will use roughly 13 MB of RAM for the page table.
#
# It's better to use the smallest acceptable value for your application,
# but the default is large in order to work in most conditions.
vm-pages 104857600
+# Max number of VM I/O threads running at the same time.
+# These threads are used to read/write data from/to the swap file. Since they
+# also encode and decode objects from disk to memory or the reverse, a bigger
+# number of threads can help with big objects even if they can't help with
+# I/O itself, as the physical device may not be able to cope with many
+# read/write operations at the same time.
+vm-max-threads 4
+
############################### ADVANCED CONFIG ###############################
# Glue small output buffers together in order to send small replies in a