]> git.saurik.com Git - redis.git/commitdiff
fixed two diskstore issues, a quasi-deadlock creating problems with I/O speed and...
authorantirez <antirez@gmail.com>
Fri, 11 Feb 2011 10:16:15 +0000 (11:16 +0100)
committerantirez <antirez@gmail.com>
Fri, 11 Feb 2011 10:16:15 +0000 (11:16 +0100)
src/dict.c
src/diskstore.c
src/dscache.c
src/rdb.c
src/redis.h

index 9be7fb168ed00b8b633e06a47b0b74c87e6a70fd..6b7010ba2efba11231be224f5fe0479d8ccb605f 100644 (file)
@@ -203,6 +203,7 @@ int dictRehash(dict *d, int n) {
 
         /* Note that rehashidx can't overflow as we are sure there are more
          * elements because ht[0].used != 0 */
+        assert(d->ht[0].size > (unsigned)d->rehashidx);
         while(d->ht[0].table[d->rehashidx] == NULL) d->rehashidx++;
         de = d->ht[0].table[d->rehashidx];
         /* Move all the keys in this bucket from the old to the new hash HT */
index 49c8706a48528141eb48bf3bab6dbada6bb837f4..9e86364e9c2ddd0e9cd54d8092e8bca2bcdeb76a 100644 (file)
@@ -183,7 +183,7 @@ int dsKeyToPath(redisDb *db, char *buf, robj *key) {
     return (buf-origbuf)+41;
 }
 
-int dsSet(redisDb *db, robj *key, robj *val) {
+int dsSet(redisDb *db, robj *key, robj *val, time_t expire) {
     char buf[1024], buf2[1024];
     FILE *fp;
     int retval, len;
@@ -201,7 +201,7 @@ int dsSet(redisDb *db, robj *key, robj *val) {
             redisPanic("Unrecoverable diskstore error. Exiting.");
         }
     }
-    if ((retval = rdbSaveKeyValuePair(fp,db,key,val,time(NULL))) == -1)
+    if ((retval = rdbSaveKeyValuePair(fp,key,val,expire,time(NULL))) == -1)
         return REDIS_ERR;
     fclose(fp);
     if (retval == 0) {
index 8243e794827ae1f9160ee58d94dee5396b71e503..aeeae40a107ca4a3a7858d6860eead989c3c1afc 100644 (file)
@@ -418,10 +418,12 @@ void *IOThreadEntryPoint(void *arg) {
         /* Get a new job to process */
         if (listLength(server.io_newjobs) == 0) {
             /* Wait for more work to do */
+            redisLog(REDIS_DEBUG,"[T] wait for signal");
             pthread_cond_wait(&server.io_condvar,&server.io_mutex);
+            redisLog(REDIS_DEBUG,"[T] signal received");
             continue;
         }
-        redisLog(REDIS_DEBUG,"%ld IO jobs to process",
+        redisLog(REDIS_DEBUG,"[T] %ld IO jobs to process",
             listLength(server.io_newjobs));
         ln = listFirst(server.io_newjobs);
         j = ln->value;
@@ -431,7 +433,7 @@ void *IOThreadEntryPoint(void *arg) {
         ln = listLast(server.io_processing); /* We use ln later to remove it */
         unlockThreadedIO();
 
-        redisLog(REDIS_DEBUG,"Thread %ld: new job type %s: %p about key '%s'",
+        redisLog(REDIS_DEBUG,"[T] %ld: new job type %s: %p about key '%s'",
             (long) pthread_self(),
             (j->type == REDIS_IOJOB_LOAD) ? "load" : "save",
             (void*)j, (char*)j->key->ptr);
@@ -444,17 +446,19 @@ void *IOThreadEntryPoint(void *arg) {
             if (j->val) j->expire = expire;
         } else if (j->type == REDIS_IOJOB_SAVE) {
             if (j->val) {
-                dsSet(j->db,j->key,j->val);
+                dsSet(j->db,j->key,j->val,j->expire);
             } else {
                 dsDel(j->db,j->key);
             }
         }
 
         /* Done: insert the job into the processed queue */
-        redisLog(REDIS_DEBUG,"Thread %ld completed the job: %p (key %s)",
+        redisLog(REDIS_DEBUG,"[T] %ld completed the job: %p (key %s)",
             (long) pthread_self(), (void*)j, (char*)j->key->ptr);
 
+        redisLog(REDIS_DEBUG,"[T] lock IO");
         lockThreadedIO();
+        redisLog(REDIS_DEBUG,"[T] IO locked");
         listDelNode(server.io_processing,ln);
         listAddNodeTail(server.io_processed,j);
 
@@ -501,30 +505,39 @@ int processActiveIOJobs(int max) {
     while(max == -1 || max > 0) {
         int io_processed_len;
 
+        redisLog(REDIS_DEBUG,"[P] lock IO");
         lockThreadedIO();
+        redisLog(REDIS_DEBUG,"Waiting IO jobs processing: new:%d proessing:%d processed:%d",listLength(server.io_newjobs),listLength(server.io_processing),listLength(server.io_processed));
+
         if (listLength(server.io_newjobs) == 0 &&
             listLength(server.io_processing) == 0)
         {
             /* There is nothing more to process */
+            redisLog(REDIS_DEBUG,"[P] Nothing to process, unlock IO, return");
             unlockThreadedIO();
             break;
         }
 
-#if 0
+#if 1
         /* If there are new jobs we need to signal the thread to
          * process the next one. FIXME: drop this if useless. */
-        redisLog(REDIS_DEBUG,"waitEmptyIOJobsQueue: new %d, processing %d",
+        redisLog(REDIS_DEBUG,"[P] waitEmptyIOJobsQueue: new %d, processing %d, processed %d",
             listLength(server.io_newjobs),
-            listLength(server.io_processing));
+            listLength(server.io_processing),
+            listLength(server.io_processed));
 
         if (listLength(server.io_newjobs)) {
+            redisLog(REDIS_DEBUG,"[P] There are new jobs, signal");
             pthread_cond_signal(&server.io_condvar);
         }
 #endif
 
         /* Check if we can process some finished job */
         io_processed_len = listLength(server.io_processed);
+        redisLog(REDIS_DEBUG,"[P] Unblock IO");
         unlockThreadedIO();
+        redisLog(REDIS_DEBUG,"[P] Wait");
+        usleep(10000);
         if (io_processed_len) {
             vmThreadedIOCompletedJob(NULL,server.io_ready_pipe_read,
                                                         (void*)0xdeadbeef,0);
@@ -590,7 +603,7 @@ void cacheForcePointInTime(void) {
     processAllPendingIOJobs();
 }
 
-void cacheCreateIOJob(int type, redisDb *db, robj *key, robj *val) {
+void cacheCreateIOJob(int type, redisDb *db, robj *key, robj *val, time_t expire) {
     iojob *j;
 
     j = zmalloc(sizeof(*j));
@@ -600,6 +613,7 @@ void cacheCreateIOJob(int type, redisDb *db, robj *key, robj *val) {
     incrRefCount(key);
     j->val = val;
     if (val) incrRefCount(val);
+    j->expire = expire;
 
     lockThreadedIO();
     queueIOJob(j);
@@ -780,20 +794,23 @@ int cacheScheduleIOPushJobs(int flags) {
             op->type == REDIS_IO_LOAD ? "load" : "save", op->key->ptr);
 
         if (op->type == REDIS_IO_LOAD) {
-            cacheCreateIOJob(REDIS_IOJOB_LOAD,op->db,op->key,NULL);
+            cacheCreateIOJob(REDIS_IOJOB_LOAD,op->db,op->key,NULL,0);
         } else {
+            time_t expire = -1;
+
             /* Lookup the key, in order to put the current value in the IO
              * Job. Otherwise if the key does not exists we schedule a disk
              * store delete operation, setting the value to NULL. */
             de = dictFind(op->db->dict,op->key->ptr);
             if (de) {
                 val = dictGetEntryVal(de);
+                expire = getExpire(op->db,op->key);
             } else {
                 /* Setting the value to NULL tells the IO thread to delete
                  * the key on disk. */
                 val = NULL;
             }
-            cacheCreateIOJob(REDIS_IOJOB_SAVE,op->db,op->key,val);
+            cacheCreateIOJob(REDIS_IOJOB_SAVE,op->db,op->key,val,expire);
         }
         /* Mark the operation as in progress. */
         cacheScheduleIODelFlag(op->db,op->key,op->type);
index 83fe81e503146c231083aa3a388b62c807a9a6aa..02317fda1d42718fe1a320d4d67175e7fc6863f3 100644 (file)
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -399,13 +399,9 @@ off_t rdbSavedObjectLen(robj *o) {
  * On error -1 is returned.
  * On success if the key was actaully saved 1 is returned, otherwise 0
  * is returned (the key was already expired). */
-int rdbSaveKeyValuePair(FILE *fp, redisDb *db, robj *key, robj *val,
-                        time_t now)
+int rdbSaveKeyValuePair(FILE *fp, robj *key, robj *val,
+                        time_t expiretime, time_t now)
 {
-    time_t expiretime;
-    
-    expiretime = getExpire(db,key);
-
     /* Save the expire time */
     if (expiretime != -1) {
         /* If this key is already expired skip it */
@@ -460,9 +456,11 @@ int rdbSave(char *filename) {
         while((de = dictNext(di)) != NULL) {
             sds keystr = dictGetEntryKey(de);
             robj key, *o = dictGetEntryVal(de);
+            time_t expire;
             
             initStaticStringObject(key,keystr);
-            if (rdbSaveKeyValuePair(fp,db,&key,o,now) == -1) goto werr;
+            expire = getExpire(db,&key);
+            if (rdbSaveKeyValuePair(fp,&key,o,expire,now) == -1) goto werr;
         }
         dictReleaseIterator(di);
     }
index 5ae9cc1c14e009125220de47674ec2afdf811118..6d49243dbba0568765591a6e58eaa78557c1948e 100644 (file)
@@ -764,7 +764,7 @@ off_t rdbSavedObjectLen(robj *o);
 off_t rdbSavedObjectPages(robj *o);
 robj *rdbLoadObject(int type, FILE *fp);
 void backgroundSaveDoneHandler(int exitcode, int bysignal);
-int rdbSaveKeyValuePair(FILE *fp, redisDb *db, robj *key, robj *val, time_t now);
+int rdbSaveKeyValuePair(FILE *fp, robj *key, robj *val, time_t expireitme, time_t now);
 int rdbLoadType(FILE *fp);
 time_t rdbLoadTime(FILE *fp);
 robj *rdbLoadStringObject(FILE *fp);
@@ -805,7 +805,7 @@ void resetCommandTableStats(void);
 /* Disk store */
 int dsOpen(void);
 int dsClose(void);
-int dsSet(redisDb *db, robj *key, robj *val);
+int dsSet(redisDb *db, robj *key, robj *val, time_t expire);
 robj *dsGet(redisDb *db, robj *key, time_t *expire);
 int dsDel(redisDb *db, robj *key);
 int dsExists(redisDb *db, robj *key);