]> git.saurik.com Git - redis.git/commitdiff
don't use small shared integer objects when disk store is enabled
authorantirez <antirez@gmail.com>
Thu, 30 Dec 2010 17:17:01 +0000 (18:17 +0100)
committerantirez <antirez@gmail.com>
Thu, 30 Dec 2010 17:17:01 +0000 (18:17 +0100)
src/dscache.c
src/networking.c
src/object.c
src/redis.h

index fc358c5290e57899ae59dfc8941c600de6fc540e..fe3f1c1dd20fbd25f5a777cb11a8ff6205ecf9f1 100644 (file)
@@ -87,6 +87,8 @@
  *
  * - If dsSet() fails on the write thread log the error and reschedule the
  *   key for flush.
+ *
+ * - Check why INCR will not update the LRU info for the object.
  */
 
 /* Virtual Memory is composed mainly of two subsystems:
@@ -133,6 +135,7 @@ void dsInit(void) {
     server.io_processed = listCreate();
     server.io_ready_clients = listCreate();
     pthread_mutex_init(&server.io_mutex,NULL);
+    pthread_cond_init(&server.io_condvar,NULL);
     server.io_active_threads = 0;
     if (pipe(pipefds) == -1) {
         redisLog(REDIS_WARNING,"Unable to intialized DS: pipe(2): %s. Exiting."
@@ -329,13 +332,14 @@ void *IOThreadEntryPoint(void *arg) {
     REDIS_NOTUSED(arg);
 
     pthread_detach(pthread_self());
+    lockThreadedIO();
     while(1) {
+        /* Wait for more work to do */
+        pthread_cond_wait(&server.io_condvar,&server.io_mutex);
         /* Get a new job to process */
-        lockThreadedIO();
         if (listLength(server.io_newjobs) == 0) {
-            /* No new jobs in queue, exit. */
+            /* No new jobs in queue, reiterate. */
             unlockThreadedIO();
-            sleep(1);
             continue;
         }
         ln = listFirst(server.io_newjobs);
@@ -345,6 +349,7 @@ void *IOThreadEntryPoint(void *arg) {
         listAddNodeTail(server.io_processing,j);
         ln = listLast(server.io_processing); /* We use ln later to remove it */
         unlockThreadedIO();
+
         redisLog(REDIS_DEBUG,"Thread %ld: new job type %s: %p about key '%s'",
             (long) pthread_self(),
             (j->type == REDIS_IOJOB_LOAD) ? "load" : "save",
@@ -367,15 +372,17 @@ void *IOThreadEntryPoint(void *arg) {
         /* Done: insert the job into the processed queue */
         redisLog(REDIS_DEBUG,"Thread %ld completed the job: %p (key %s)",
             (long) pthread_self(), (void*)j, (char*)j->key->ptr);
+
         lockThreadedIO();
         listDelNode(server.io_processing,ln);
         listAddNodeTail(server.io_processed,j);
-        unlockThreadedIO();
 
         /* Signal the main thread there is new stuff to process */
         redisAssert(write(server.io_ready_pipe_write,"x",1) == 1);
     }
-    return NULL; /* never reached */
+    /* never reached, but that's the full pattern... */
+    unlockThreadedIO();
+    return NULL;
 }
 
 void spawnIOThread(void) {
@@ -449,6 +456,7 @@ void dsCreateIOJob(int type, redisDb *db, robj *key, robj *val) {
 
     lockThreadedIO();
     queueIOJob(j);
+    pthread_cond_signal(&server.io_condvar);
     unlockThreadedIO();
 }
 
index 524c396c38b97529c45c014223cd57fe5d6a740f..0774f7ee174030404180103ecba6fe9b05d7d3f3 100644 (file)
@@ -167,7 +167,7 @@ void _addReplyStringToList(redisClient *c, char *s, size_t len) {
 
 void addReply(redisClient *c, robj *obj) {
     if (_installWriteEvent(c) != REDIS_OK) return;
-    redisAssert(!server.ds_enabled || obj->storage == REDIS_DS_MEMORY);
+    redisAssert(!server.ds_enabled || obj->storage != REDIS_DS_SAVING);
 
     /* This is an important place where we can avoid copy-on-write
      * when there is a saving child running, avoiding touching the
index f4c34dcfd8ab039d2f5801632b5dfff9bff38ae7..a132665e99ce6366e686ba7ca3182b4ba4538cd0 100644 (file)
@@ -32,6 +32,7 @@ robj *createStringObject(char *ptr, size_t len) {
 robj *createStringObjectFromLongLong(long long value) {
     robj *o;
     if (value >= 0 && value < REDIS_SHARED_INTEGERS &&
+        !server.ds_enabled &&
         pthread_equal(pthread_self(),server.mainthread)) {
         incrRefCount(shared.integers[value]);
         o = shared.integers[value];
@@ -214,7 +215,7 @@ robj *tryObjectEncoding(robj *o) {
      * Note that we also avoid using shared integers when maxmemory is used
      * because every object needs to have a private LRU field for the LRU
      * algorithm to work well. */
-    if (server.ds_enabled == 0 &&
+    if (!server.ds_enabled &&
         server.maxmemory == 0 && value >= 0 && value < REDIS_SHARED_INTEGERS &&
         pthread_equal(pthread_self(),server.mainthread))
     {
index 054ee930d5f88fba7a3e2911926529203c49889b..3a5a274b02558820486149d9e52eaba2f4e9fdf7 100644 (file)
@@ -459,7 +459,7 @@ struct redisServer {
     list *io_processed; /* List of VM I/O jobs already processed */
     list *io_ready_clients; /* Clients ready to be unblocked. All keys loaded */
     pthread_mutex_t io_mutex; /* lock to access io_jobs/io_done/io_thread_job */
-    pthread_mutex_t io_swapfile_mutex; /* So we can lseek + write */
+    pthread_cond_t io_condvar; /* I/O threads conditional variable */
     pthread_attr_t io_threads_attr; /* attributes for threads creation */
     int io_active_threads; /* Number of running I/O threads */
     int vm_max_threads; /* Max number of I/O threads running at the same time */