]> git.saurik.com Git - redis.git/commitdiff
non-blocking VM data structures, just a start
authorantirez <antirez@gmail.com>
Sat, 9 Jan 2010 17:46:52 +0000 (12:46 -0500)
committerantirez <antirez@gmail.com>
Sat, 9 Jan 2010 17:46:52 +0000 (12:46 -0500)
Makefile
redis.c
redis.conf

index c036e10cd5d180419185191da54432ce6cfb96d8..50b328874dd69024f006365bbbace244d4d2fe30 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -6,9 +6,11 @@ uname_S := $(shell sh -c 'uname -s 2>/dev/null || echo not')
 ifeq ($(uname_S),SunOS)
   CFLAGS?= -std=c99 -pedantic -O2 -Wall -W -D__EXTENSIONS__ -D_XPG6
   CCLINK?= -ldl -lnsl -lsocket -lm
 ifeq ($(uname_S),SunOS)
   CFLAGS?= -std=c99 -pedantic -O2 -Wall -W -D__EXTENSIONS__ -D_XPG6
   CCLINK?= -ldl -lnsl -lsocket -lm
+  PTLINK?= -lpthread
 else
   CFLAGS?= -std=c99 -pedantic -O2 -Wall -W $(ARCH) $(PROF)
   CCLINK?= -lm
 else
   CFLAGS?= -std=c99 -pedantic -O2 -Wall -W $(ARCH) $(PROF)
   CCLINK?= -lm
+  PTLINK?= -lpthread
 endif
 CCOPT= $(CFLAGS) $(CCLINK) $(ARCH) $(PROF)
 DEBUG?= -g -rdynamic -ggdb 
 endif
 CCOPT= $(CFLAGS) $(CCLINK) $(ARCH) $(PROF)
 DEBUG?= -g -rdynamic -ggdb 
@@ -40,7 +42,7 @@ sds.o: sds.c sds.h zmalloc.h
 zmalloc.o: zmalloc.c config.h
 
 redis-server: $(OBJ)
 zmalloc.o: zmalloc.c config.h
 
 redis-server: $(OBJ)
-       $(CC) -o $(PRGNAME) $(CCOPT) $(DEBUG) $(OBJ)
+       $(CC) -o $(PRGNAME) $(CCOPT) $(PTLINK) $(DEBUG) $(OBJ)
        @echo ""
        @echo "Hint: To run the test-redis.tcl script is a good idea."
        @echo "Launch the redis server with ./redis-server, then in another"
        @echo ""
        @echo "Hint: To run the test-redis.tcl script is a good idea."
        @echo "Launch the redis server with ./redis-server, then in another"
diff --git a/redis.c b/redis.c
index 35e11fa5461649690cc03e0958be4ee0bb84c382..c77e8d1fb31847898517cbf519e245d6789a5cd3 100644 (file)
--- a/redis.c
+++ b/redis.c
@@ -59,6 +59,7 @@
 #include <sys/uio.h>
 #include <limits.h>
 #include <math.h>
 #include <sys/uio.h>
 #include <limits.h>
 #include <math.h>
+#include <pthread.h>
 
 #if defined(__sun)
 #include "solarisfixes.h"
 
 #if defined(__sun)
 #include "solarisfixes.h"
  * Check vmFindContiguousPages() to know more about this magic numbers. */
 #define REDIS_VM_MAX_NEAR_PAGES 65536
 #define REDIS_VM_MAX_RANDOM_JUMP 4096
  * Check vmFindContiguousPages() to know more about this magic numbers. */
 #define REDIS_VM_MAX_NEAR_PAGES 65536
 #define REDIS_VM_MAX_RANDOM_JUMP 4096
+#define REDIS_VM_MAX_THREADS 32
 
 /* Client flags */
 #define REDIS_CLOSE 1       /* This client connection should be closed ASAP */
 
 /* Client flags */
 #define REDIS_CLOSE 1       /* This client connection should be closed ASAP */
 #define REDIS_MONITOR 8      /* This client is a slave monitor, see MONITOR */
 #define REDIS_MULTI 16      /* This client is in a MULTI context */
 #define REDIS_BLOCKED 32    /* The client is waiting in a blocking operation */
 #define REDIS_MONITOR 8      /* This client is a slave monitor, see MONITOR */
 #define REDIS_MULTI 16      /* This client is in a MULTI context */
 #define REDIS_BLOCKED 32    /* The client is waiting in a blocking operation */
+#define REDIS_IO_WAIT 64    /* The client is waiting for Virutal Memory I/O */
 
 /* Slave replication state - slave side */
 #define REDIS_REPL_NONE 0   /* No active replication */
 
 /* Slave replication state - slave side */
 #define REDIS_REPL_NONE 0   /* No active replication */
@@ -303,6 +306,8 @@ typedef struct redisClient {
     int blockingkeysnum;    /* Number of blocking keys */
     time_t blockingto;      /* Blocking operation timeout. If UNIX current time
                              * is >= blockingto then the operation timed out. */
     int blockingkeysnum;    /* Number of blocking keys */
     time_t blockingto;      /* Blocking operation timeout. If UNIX current time
                              * is >= blockingto then the operation timed out. */
+    list *io_keys;          /* Keys this client is waiting to be loaded from the
+                             * swap file in order to continue. */
 } redisClient;
 
 struct saveparam {
 } redisClient;
 
 struct saveparam {
@@ -381,6 +386,16 @@ struct redisServer {
     off_t vm_near_pages; /* Number of pages allocated sequentially */
     unsigned char *vm_bitmap; /* Bitmap of free/used pages */
     time_t unixtime;    /* Unix time sampled every second. */
     off_t vm_near_pages; /* Number of pages allocated sequentially */
     unsigned char *vm_bitmap; /* Bitmap of free/used pages */
     time_t unixtime;    /* Unix time sampled every second. */
+    /* Virtual memory I/O threads stuff */
+    pthread_t io_threads[REDIS_VM_MAX_THREADS];
+    /* An I/O thread process an element taken from the io_jobs queue and
+     * put the result of the operation in the io_done list. */
+    list *io_jobs; /* List of VM I/O jobs */
+    list *io_done; /* List of VM processed jobs */
+    list *io_clients; /* All the clients waiting for SWAP I/O operations */
+    pthread_mutex_t io_mutex; /* lock to access io_jobs and io_done */
+    int io_active_threads; /* Number of running I/O threads */
+    int vm_max_threads; /* Max number of I/O threads running at the same time */
     /* Virtual memory stats */
     unsigned long long vm_stats_used_pages;
     unsigned long long vm_stats_swapped_objects;
     /* Virtual memory stats */
     unsigned long long vm_stats_used_pages;
     unsigned long long vm_stats_swapped_objects;
@@ -451,6 +466,18 @@ struct sharedObjectsStruct {
 
 static double R_Zero, R_PosInf, R_NegInf, R_Nan;
 
 
 static double R_Zero, R_PosInf, R_NegInf, R_Nan;
 
+/* VM threaded I/O request message */
+#define REDIS_IOREQ_LOAD 0
+#define REDIS_IOREQ_SWAP 1
+typedef struct ioreq {
+    int type;   /* Request type, REDIS_IOREQ_* */
+    int dbid;   /* Redis database ID */
+    robj *key;  /* This I/O request is about swapping this key */
+    robj *val;  /* the value to swap for REDIS_IOREQ_SWAP, otherwise this
+                 * field is populated by the I/O thread for REDIS_IOREQ_LOAD. */
+    off_t page; /* Swap page where to read/write the object */
+} ioreq;
+
 /*================================ Prototypes =============================== */
 
 static void freeStringObject(robj *o);
 /*================================ Prototypes =============================== */
 
 static void freeStringObject(robj *o);
@@ -1292,6 +1319,7 @@ static void initServerConfig() {
     server.vm_page_size = 256;          /* 256 bytes per page */
     server.vm_pages = 1024*1024*100;    /* 104 millions of pages */
     server.vm_max_memory = 1024LL*1024*1024*1; /* 1 GB of RAM */
     server.vm_page_size = 256;          /* 256 bytes per page */
     server.vm_pages = 1024*1024*100;    /* 104 millions of pages */
     server.vm_max_memory = 1024LL*1024*1024*1; /* 1 GB of RAM */
+    server.vm_max_threads = 4;
 
     resetServerSaveParams();
 
 
     resetServerSaveParams();
 
@@ -1539,6 +1567,8 @@ static void loadServerConfig(char *filename) {
             server.vm_page_size = strtoll(argv[1], NULL, 10);
         } else if (!strcasecmp(argv[0],"vm-pages") && argc == 2) {
             server.vm_pages = strtoll(argv[1], NULL, 10);
             server.vm_page_size = strtoll(argv[1], NULL, 10);
         } else if (!strcasecmp(argv[0],"vm-pages") && argc == 2) {
             server.vm_pages = strtoll(argv[1], NULL, 10);
+        } else if (!strcasecmp(argv[0],"vm-max-threads") && argc == 2) {
+            server.vm_max_threads = strtoll(argv[1], NULL, 10);
         } else {
             err = "Bad directive or wrong number of arguments"; goto loaderr;
         }
         } else {
             err = "Bad directive or wrong number of arguments"; goto loaderr;
         }
@@ -1587,9 +1617,17 @@ static void freeClient(redisClient *c) {
     listRelease(c->reply);
     freeClientArgv(c);
     close(c->fd);
     listRelease(c->reply);
     freeClientArgv(c);
     close(c->fd);
+    /* Remove from the list of clients */
     ln = listSearchKey(server.clients,c);
     redisAssert(ln != NULL);
     listDelNode(server.clients,ln);
     ln = listSearchKey(server.clients,c);
     redisAssert(ln != NULL);
     listDelNode(server.clients,ln);
+    /* Remove from the list of clients waiting for VM operations */
+    if (server.vm_enabled && listLength(c->io_keys)) {
+        ln = listSearchKey(server.io_clients,c);
+        if (ln) listDelNode(server.io_clients,ln);
+        listRelease(c->io_keys);
+    }
+    /* Other cleanup */
     if (c->flags & REDIS_SLAVE) {
         if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
             close(c->repldbfd);
     if (c->flags & REDIS_SLAVE) {
         if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
             close(c->repldbfd);
@@ -2053,7 +2091,7 @@ again:
      * would not be called at all, but after the execution of the first commands
      * in the input buffer the client may be blocked, and the "goto again"
      * will try to reiterate. The following line will make it return asap. */
      * would not be called at all, but after the execution of the first commands
      * in the input buffer the client may be blocked, and the "goto again"
      * will try to reiterate. The following line will make it return asap. */
-    if (c->flags & REDIS_BLOCKED) return;
+    if (c->flags & REDIS_BLOCKED || c->flags & REDIS_IO_WAIT) return;
     if (c->bulklen == -1) {
         /* Read the first line of the query */
         char *p = strchr(c->querybuf,'\n');
     if (c->bulklen == -1) {
         /* Read the first line of the query */
         char *p = strchr(c->querybuf,'\n');
@@ -2190,10 +2228,12 @@ static redisClient *createClient(int fd) {
     c->authenticated = 0;
     c->replstate = REDIS_REPL_NONE;
     c->reply = listCreate();
     c->authenticated = 0;
     c->replstate = REDIS_REPL_NONE;
     c->reply = listCreate();
-    c->blockingkeys = NULL;
-    c->blockingkeysnum = 0;
     listSetFreeMethod(c->reply,decrRefCount);
     listSetDupMethod(c->reply,dupClientReplyValue);
     listSetFreeMethod(c->reply,decrRefCount);
     listSetDupMethod(c->reply,dupClientReplyValue);
+    c->blockingkeys = NULL;
+    c->blockingkeysnum = 0;
+    c->io_keys = listCreate();
+    listSetFreeMethod(c->io_keys,decrRefCount);
     if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
         readQueryFromClient, c) == AE_ERR) {
         freeClient(c);
     if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
         readQueryFromClient, c) == AE_ERR) {
         freeClient(c);
@@ -6823,6 +6863,13 @@ static void vmInit(void) {
     /* Try to remove the swap file, so the OS will really delete it from the
      * file system when Redis exists. */
     unlink("/tmp/redisvm");
     /* Try to remove the swap file, so the OS will really delete it from the
      * file system when Redis exists. */
     unlink("/tmp/redisvm");
+
+    /* Initialize threaded I/O */
+    server.io_jobs = listCreate();
+    server.io_done = listCreate();
+    server.io_clients = listCreate();
+    pthread_mutex_init(&server.io_mutex,NULL);
+    server.io_active_threads = 0;
 }
 
 /* Mark the page as used */
 }
 
 /* Mark the page as used */
index ac90eb4ca409983978c3c519195f0381cffc4274..b541cbe203351b8dd4093a292dd2f309dc356a2f 100644 (file)
@@ -101,7 +101,7 @@ dir ./
 
 # Set the max number of connected clients at the same time. By default there
 # is no limit, and it's up to the number of file descriptors the Redis process
 
 # Set the max number of connected clients at the same time. By default there
 # is no limit, and it's up to the number of file descriptors the Redis process
-# is able to open. The special value '0' means no limts.
+# is able to open. The special value '0' means no limits.
 # Once the limit is reached Redis will close all the new connections sending
 # an error 'max number of clients reached'.
 #
 # Once the limit is reached Redis will close all the new connections sending
 # an error 'max number of clients reached'.
 #
@@ -200,7 +200,7 @@ vm-max-memory 10000000
 #
 # If you use a lot of small objects, use a page size of 64 or 32 bytes.
 # If you use a lot of big objects, use a bigger page size.
 #
 # If you use a lot of small objects, use a page size of 64 or 32 bytes.
 # If you use a lot of big objects, use a bigger page size.
-# If unsure, use the defualt :)
+# If unsure, use the default :)
 vm-page-size 256
 
 # Number of total memory pages in the swap file.
 vm-page-size 256
 
 # Number of total memory pages in the swap file.
@@ -210,12 +210,20 @@ vm-page-size 256
 # The total swap size is vm-page-size * vm-pages
 #
 # With the default of 256-bytes memory pages and 104857600 pages Redis will
 # The total swap size is vm-page-size * vm-pages
 #
 # With the default of 256-bytes memory pages and 104857600 pages Redis will
-# use a 25 GB swap file, that will use rougly 13 MB of RAM for the page table.
+# use a 25 GB swap file, that will use roughly 13 MB of RAM for the page table.
 #
 # It's better to use the smallest acceptable value for your application,
 # but the default is large in order to work in most conditions.
 vm-pages 104857600
 
 #
 # It's better to use the smallest acceptable value for your application,
 # but the default is large in order to work in most conditions.
 vm-pages 104857600
 
+# Max number of VM I/O threads running at the same time.
+# This threads are used to read/write data from/to swap file, since they
+# also encode and decode objects from disk to memory or the reverse, a bigger
+# number of threads can help with big objects even if they can't help with
+# I/O itself as the physical device may not be able to couple with many
+# reads/writes operations at the same time.
+vm-max-threads 4
+
 ############################### ADVANCED CONFIG ###############################
 
 # Glue small output buffers together in order to send small replies in a
 ############################### ADVANCED CONFIG ###############################
 
 # Glue small output buffers together in order to send small replies in a