]> git.saurik.com Git - redis.git/commitdiff
diskstore BGSAVE should work now
authorantirez <antirez@gmail.com>
Fri, 7 Jan 2011 22:41:00 +0000 (23:41 +0100)
committerantirez <antirez@gmail.com>
Fri, 7 Jan 2011 22:41:00 +0000 (23:41 +0100)
TODO
src/diskstore.c
src/dscache.c
src/redis.c
src/redis.h

diff --git a/TODO b/TODO
index d7ba4a2f38da21ae99834bfd61a1385ef070688e..e46bb4a59d3b259217fd3f08400762769504c0c3 100644 (file)
--- a/TODO
+++ b/TODO
@@ -20,6 +20,7 @@ DISKSTORE TODO
 * Fix RANDOMKEY to really do something interesting
 * Fix DBSIZE to really do something interesting
 * Add a DEBUG command to check if an entry is or not in memory currently
 * Fix RANDOMKEY to really do something interesting
 * Fix DBSIZE to really do something interesting
 * Add a DEBUG command to check if an entry is or not in memory currently
+* Prevent io jobs from running while there is a BGSAVE thread.
 
 REPLICATION
 ===========
 
 REPLICATION
 ===========
index d5abf0e660205cd38c878600f3b7d682490c7353..447f10b36c4369a05e5a58037df3faed219d52a9 100644 (file)
@@ -295,6 +295,17 @@ int dsExists(redisDb *db, robj *key) {
     return access(buf,R_OK) == 0;
 }
 
     return access(buf,R_OK) == 0;
 }
 
+int dsGetDbidFromFilename(char *path) {
+    char id[64];
+    char *p = strchr(path,'_');
+    int len = (p - path);
+
+    redisAssert(p != NULL && len < 64);
+    memcpy(id,path,len);
+    id[len] = '\0';
+    return atoi(id);
+}
+
 void dsFlushOneDir(char *path, int dbid) {
     DIR *dir;
     struct dirent *dp, de;
 void dsFlushOneDir(char *path, int dbid) {
     DIR *dir;
     struct dirent *dp, de;
@@ -313,17 +324,8 @@ void dsFlushOneDir(char *path, int dbid) {
         if (dp->d_name[0] == '.') continue;
 
         /* Check if we need to remove this entry accordingly to the
         if (dp->d_name[0] == '.') continue;
 
         /* Check if we need to remove this entry accordingly to the
-         * DB number */
-        if (dbid != -1) {
-            char id[64];
-            char *p = strchr(dp->d_name,'_');
-            int len = (p - dp->d_name);
-
-            redisAssert(p != NULL && len < 64);
-            memcpy(id,dp->d_name,len);
-            id[len] = '\0';
-            if (atoi(id) != dbid) continue; /* skip this file */
-        }
+         * DB number. */
+        if (dbid != -1 && dsGetDbidFromFilename(dp->d_name)) continue;
         
         /* Finally unlink the file */
         snprintf(buf,1024,"%s/%s",path,dp->d_name);
         
         /* Finally unlink the file */
         snprintf(buf,1024,"%s/%s",path,dp->d_name);
@@ -357,14 +359,13 @@ void dsRdbSaveSetState(int state) {
 
 void *dsRdbSave_thread(void *arg) {
     char tmpfile[256], *filename = (char*)arg;
 
 void *dsRdbSave_thread(void *arg) {
     char tmpfile[256], *filename = (char*)arg;
-    int j, i;
-    time_t now = time(NULL);
+    struct dirent *dp, de;
+    int j, i, last_dbid = -1;
     FILE *fp;
 
     /* Change state to ACTIVE, to signal there is a saving thead working. */
     FILE *fp;
 
     /* Change state to ACTIVE, to signal there is a saving thead working. */
-    pthread_mutex_lock(&server.bgsavethread_mutex);
-    server.bgsavethread_state = REDIS_BGSAVE_THREAD_ACTIVE;
-    pthread_mutex_unlock(&server.bgsavethread_mutex);
+    redisLog(REDIS_NOTICE,"Diskstore BGSAVE thread started");
+    dsRdbSaveSetState(REDIS_BGSAVE_THREAD_ACTIVE);
 
     snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
     fp = fopen(tmpfile,"w");
 
     snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
     fp = fopen(tmpfile,"w");
@@ -378,18 +379,71 @@ void *dsRdbSave_thread(void *arg) {
 
     sleep(5);
 
 
     sleep(5);
 
-#if 0
     /* Scan all diskstore dirs looking for keys */
     for (j = 0; j < 256; j++) {
         for (i = 0; i < 256; i++) {
     /* Scan all diskstore dirs looking for keys */
     for (j = 0; j < 256; j++) {
         for (i = 0; i < 256; i++) {
-            snprintf(buf,1024,"%s/%02x/%02x",server.ds_path,j,i);
-
-            /* Write the SELECT DB opcode */
-            if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
-            if (rdbSaveLen(fp,j) == -1) goto werr;
+            DIR *dir;
+            char buf[1024];
+
+            /* For every directory, collect all the keys */
+            snprintf(buf,sizeof(buf),"%s/%02x/%02x",server.ds_path,j,i);
+            if ((dir = opendir(buf)) == NULL) {
+                redisLog(REDIS_WARNING,"Disk store can't open dir %s: %s",
+                    buf, strerror(errno));
+                goto werr;
+            }
+
+            while(1) {
+                char buf[1024];
+                int dbid;
+                FILE *entryfp;
+
+                readdir_r(dir,&de,&dp);
+                if (dp == NULL) break;
+                if (dp->d_name[0] == '.') continue;
+
+                /* Emit the SELECT DB opcode if needed. */
+                dbid = dsGetDbidFromFilename(dp->d_name);
+                if (dbid != last_dbid) {
+                    last_dbid = dbid;
+                    if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
+                    if (rdbSaveLen(fp,dbid) == -1) goto werr;
+                }
+
+                /* Let's copy this file into the target .rdb */
+                snprintf(buf,sizeof(buf),"%s/%02x/%02x/%s",
+                    server.ds_path,j,i,dp->d_name);
+                if ((entryfp = fopen(buf,"r")) == NULL) {
+                    redisLog(REDIS_WARNING,"Can't open %s: %s",
+                        buf,strerror(errno));
+                    closedir(dir);
+                    goto werr;
+                }
+                while(1) {
+                    int nread = fread(buf,1,sizeof(buf),entryfp);
+
+                    if (nread == 0) {
+                        if (ferror(entryfp)) {
+                            redisLog(REDIS_WARNING,"Error reading from file entry while performing BGSAVE for diskstore: %s", strerror(errno));
+                            closedir(dir);
+                            goto werr;
+                        } else {
+                            break;
+                        }
+                    }
+                    if (fwrite(buf,1,nread,fp) != (unsigned)nread) {
+                        closedir(dir);
+                        goto werr;
+                    }
+                }
+                fclose(entryfp);
+            }
+            closedir(dir);
         }
     }
         }
     }
-#endif
+    
+    /* Output the end of file opcode */
+    if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
 
     /* Make sure data will not remain on the OS's output buffers */
     fflush(fp);
 
     /* Make sure data will not remain on the OS's output buffers */
     fflush(fp);
@@ -405,7 +459,7 @@ void *dsRdbSave_thread(void *arg) {
         dsRdbSaveSetState(REDIS_BGSAVE_THREAD_DONE_ERR);
         return NULL;
     }
         dsRdbSaveSetState(REDIS_BGSAVE_THREAD_DONE_ERR);
         return NULL;
     }
-    redisLog(REDIS_NOTICE,"DB saved on disk");
+    redisLog(REDIS_NOTICE,"DB saved on disk by diskstore thread");
     dsRdbSaveSetState(REDIS_BGSAVE_THREAD_DONE_OK);
     return NULL;
 
     dsRdbSaveSetState(REDIS_BGSAVE_THREAD_DONE_OK);
     return NULL;
 
index 1c419c6a77fb582ae4586286f9e212c06d9eaefc..de9449710b0d1ec6102a4a105158c532d90288ba 100644 (file)
@@ -732,6 +732,9 @@ int cacheScheduleIOPushJobs(int flags) {
     listNode *ln;
     int jobs, topush = 0, pushed = 0;
 
     listNode *ln;
     int jobs, topush = 0, pushed = 0;
 
+    /* Don't push new jobs if there is a threaded BGSAVE in progress. */
+    if (server.bgsavethread != (pthread_t) -1) return 0;
+
     /* Sync stuff on disk, but only if we have less
      * than MAX_IO_JOBS_QUEUE IO jobs. */
     lockThreadedIO();
     /* Sync stuff on disk, but only if we have less
      * than MAX_IO_JOBS_QUEUE IO jobs. */
     lockThreadedIO();
index f800018050ab03095f3cfef23659258760ca3489..dc69ea28fdd76c6f120852621d25f2f167342800 100644 (file)
@@ -583,7 +583,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
     if ((server.maxidletime && !(loops % 100)) || server.bpop_blocked_clients)
         closeTimedoutClients();
 
     if ((server.maxidletime && !(loops % 100)) || server.bpop_blocked_clients)
         closeTimedoutClients();
 
-    /* Check if a background saving or AOF rewrite in progress terminated */
+    /* Check if a background saving or AOF rewrite in progress terminated. */
     if (server.bgsavechildpid != -1 || server.bgrewritechildpid != -1) {
         int statloc;
         pid_t pid;
     if (server.bgsavechildpid != -1 || server.bgrewritechildpid != -1) {
         int statloc;
         pid_t pid;
@@ -601,6 +601,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
             }
             updateDictResizePolicy();
         }
             }
             updateDictResizePolicy();
         }
+    } else if (server.bgsavethread != (pthread_t) -1) {
         if (server.bgsavethread != (pthread_t) -1) {
             int state;
 
         if (server.bgsavethread != (pthread_t) -1) {
             int state;
 
index a117c1fc9d84c8cf0ffc91c17408c61f18065e22..6ff62916bd060ba0fced7e9eaec6b58e591d57c7 100644 (file)
@@ -761,6 +761,8 @@ int rdbSaveKeyValuePair(FILE *fp, redisDb *db, robj *key, robj *val, time_t now)
 int rdbLoadType(FILE *fp);
 time_t rdbLoadTime(FILE *fp);
 robj *rdbLoadStringObject(FILE *fp);
 int rdbLoadType(FILE *fp);
 time_t rdbLoadTime(FILE *fp);
 robj *rdbLoadStringObject(FILE *fp);
+int rdbSaveType(FILE *fp, unsigned char type);
+int rdbSaveLen(FILE *fp, uint32_t len);
 
 /* AOF persistence */
 void flushAppendOnlyFile(void);
 
 /* AOF persistence */
 void flushAppendOnlyFile(void);