X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/120b9ba8f82699f1987be17ccde597bcfa9fae46..9ad853ccde374ed371efee3b1654c6ebb39e06fa:/src/diskstore.c?ds=inline

diff --git a/src/diskstore.c b/src/diskstore.c
index 711dd693..9e86364e 100644
--- a/src/diskstore.c
+++ b/src/diskstore.c
@@ -141,12 +141,13 @@ int dsClose(void) {
 }
 
 /* Convert key into full path for this object. Dirty but hopefully
- * is fast enough. */
-void dsKeyToPath(redisDb *db, char *buf, robj *key) {
+ * is fast enough. Returns the length of the returned path. */
+int dsKeyToPath(redisDb *db, char *buf, robj *key) {
     SHA1_CTX ctx;
     unsigned char hash[20];
     char hex[40], digits[] = "0123456789abcdef";
     int j, l;
+    char *origbuf = buf;
 
     SHA1Init(&ctx);
     SHA1Update(&ctx,key->ptr,sdslen(key->ptr));
@@ -179,19 +180,50 @@ void dsKeyToPath(redisDb *db, char *buf, robj *key) {
     buf[0] = '_';
     memcpy(buf+1,hex,40);
     buf[41] = '\0';
+    return (buf-origbuf)+41;
 }
 
-int dsSet(redisDb *db, robj *key, robj *val) {
-    char buf[1024];
+/* Save 'key' => 'val' on disk. The value is written to a temp file whose
+ * name is the key path plus a "-<time>-<pointer>" suffix, then renamed over
+ * the key file. If rdbSaveKeyValuePair() returns 0 (expired key) both files
+ * are unlinked instead. Returns REDIS_OK, or REDIS_ERR if the save fails. */
+int dsSet(redisDb *db, robj *key, robj *val, time_t expire) {
+    char buf[1024], buf2[1024];
     FILE *fp;
-    int retval;
-
-    dsKeyToPath(db,buf,key);
-    fp = fopen(buf,"w");
-    if ((retval = rdbSaveKeyValuePair(fp,db,key,val,time(NULL))) == -1)
+    int retval, len;
+
+    len = dsKeyToPath(db,buf,key);
+    memcpy(buf2,buf,len);
+    snprintf(buf2+len,sizeof(buf2)-len,"-%ld-%ld",(long)time(NULL),(long)val);
+    while ((fp = fopen(buf2,"w")) == NULL) {
+        if (errno == ENOSPC) {
+            redisLog(REDIS_WARNING,"Diskstore: No space left on device. Please make room and wait 30 seconds for Redis to continue.");
+            sleep(30);
+        } else {
+            redisLog(REDIS_WARNING,"diskstore error opening %s: %s",
+                buf2, strerror(errno));
+            redisPanic("Unrecoverable diskstore error. Exiting.");
+        }
+    }
+    if ((retval = rdbSaveKeyValuePair(fp,key,val,expire,time(NULL))) == -1) {
+        /* Don't leak the stream nor leave a partial temp file around. */
+        fclose(fp);
+        unlink(buf2);
         return REDIS_ERR;
+    }
     fclose(fp);
-    if (retval == 0) unlink(buf); /* Expired key. Unlink failing not critical */
+    if (retval == 0) {
+        /* Expired key. Unlink failing not critical */
+        unlink(buf);
+        unlink(buf2);
+    } else {
+        /* Use rename for atomic update of value */
+        if (rename(buf2,buf) == -1) {
+            redisLog(REDIS_WARNING,"rename(2) returned an error: %s",
+                strerror(errno));
+            redisPanic("Unrecoverable diskstore error. Exiting.");
+        }
+    }
     return REDIS_OK;
 }
 
@@ -271,6 +303,22 @@ int dsExists(redisDb *db, robj *key) {
     return access(buf,R_OK) == 0;
 }
 
+/* Return the DB number found at the start of a diskstore file name: the
+ * characters before the first '_' are parsed with atoi(). */
+int dsGetDbidFromFilename(char *path) {
+    char id[64];
+    char *p = strchr(path,'_');
+    int len;
+
+    /* Check for the separator before doing pointer arithmetic on it. */
+    redisAssert(p != NULL);
+    len = (p - path);
+    redisAssert(len < 64);
+    memcpy(id,path,len);
+    id[len] = '\0';
+    return atoi(id);
+}
+
 void dsFlushOneDir(char *path, int dbid) {
     DIR *dir;
     struct dirent *dp, de;
@@ -282,25 +330,21 @@ void dsFlushOneDir(char *path, int dbid) {
         redisPanic("Unrecoverable Disk store errore. Existing.");
     }
     while(1) {
+        char buf[1024];
+
         readdir_r(dir,&de,&dp);
         if (dp == NULL) break;
         if (dp->d_name[0] == '.') continue;
 
         /* Check if we need to remove this entry accordingly to the
-         * DB number */
-        if (dbid != -1) {
-            char id[64];
-            char *p = strchr(dp->d_name,'_');
-            int len = (p - dp->d_name);
-
-            redisAssert(p != NULL && len < 64);
-            memcpy(id,dp->d_name,len);
-            id[len] = '\0';
-            if (atoi(id) != dbid) continue; /* skip this file */
-        }
-        if (unlink(path) == -1) {
+         * DB number. */
+        if (dbid != -1 && dsGetDbidFromFilename(dp->d_name) != dbid) continue;
+
+        /* Finally unlink the file */
+        snprintf(buf,1024,"%s/%s",path,dp->d_name);
+        if (unlink(buf) == -1) {
             redisLog(REDIS_WARNING,
-                "Can't unlink %s: %s", path, strerror(errno));
+                "Can't unlink %s: %s", buf, strerror(errno));
             redisPanic("Unrecoverable Disk store errore. Existing.");
         }
     }
@@ -319,3 +363,186 @@ void dsFlushDb(int dbid) {
         }
     }
 }
+
+/* Set the BGSAVE thread state while holding server.bgsavethread_mutex. */
+void dsRdbSaveSetState(int state) {
+    pthread_mutex_lock(&server.bgsavethread_mutex);
+    server.bgsavethread_state = state;
+    pthread_mutex_unlock(&server.bgsavethread_mutex);
+}
+
+/* Entry point of the diskstore BGSAVE thread. Copies every key file found
+ * under server.ds_path (skipping names containing '-') into a temp .rdb
+ * file, then renames it to the target name. 'arg' is the target filename;
+ * this function frees it with zfree() on every exit path. The outcome is
+ * published through dsRdbSaveSetState(). Always returns NULL. */
+void *dsRdbSave_thread(void *arg) {
+    char tmpfile[256], *filename = (char*)arg;
+    struct dirent *dp, de;
+    int j, i, last_dbid = -1, saved_errno;
+    FILE *fp, *entryfp = NULL;
+    DIR *dir = NULL;
+
+    /* Change state to ACTIVE, to signal there is a saving thread working. */
+    redisLog(REDIS_NOTICE,"Diskstore BGSAVE thread started");
+    dsRdbSaveSetState(REDIS_BGSAVE_THREAD_ACTIVE);
+
+    snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
+    fp = fopen(tmpfile,"w");
+    if (!fp) {
+        redisLog(REDIS_WARNING, "Failed opening .rdb for saving: %s",
+            strerror(errno));
+        zfree(filename); /* Don't leak the name on this early exit. */
+        dsRdbSaveSetState(REDIS_BGSAVE_THREAD_DONE_ERR);
+        return NULL;
+    }
+    if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
+
+    /* NOTE(review): fixed 5 seconds pause before the scan; looks like a
+     * debugging aid, confirm it is still wanted. */
+    sleep(5);
+
+    /* Scan all diskstore dirs looking for keys */
+    for (j = 0; j < 256; j++) {
+        for (i = 0; i < 256; i++) {
+            char buf[1024];
+
+            /* For every directory, collect all the keys */
+            snprintf(buf,sizeof(buf),"%s/%02x/%02x",server.ds_path,j,i);
+            if ((dir = opendir(buf)) == NULL) {
+                redisLog(REDIS_WARNING,"Disk store can't open dir %s: %s",
+                    buf, strerror(errno));
+                goto werr;
+            }
+
+            while(1) {
+                char buf[1024];
+                int dbid;
+
+                readdir_r(dir,&de,&dp);
+                if (dp == NULL) break;
+                if (dp->d_name[0] == '.') continue;
+                /* If there is a '-' char in the file name, it's a temp file */
+                if (strchr(dp->d_name,'-') != NULL) continue;
+
+                /* Emit the SELECT DB opcode if needed. */
+                dbid = dsGetDbidFromFilename(dp->d_name);
+                if (dbid != last_dbid) {
+                    last_dbid = dbid;
+                    if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
+                    if (rdbSaveLen(fp,dbid) == -1) goto werr;
+                }
+
+                /* Let's copy this file into the target .rdb */
+                snprintf(buf,sizeof(buf),"%s/%02x/%02x/%s",
+                    server.ds_path,j,i,dp->d_name);
+                if ((entryfp = fopen(buf,"r")) == NULL) {
+                    redisLog(REDIS_WARNING,"Can't open %s: %s",
+                        buf,strerror(errno));
+                    goto werr;
+                }
+                while(1) {
+                    int nread = fread(buf,1,sizeof(buf),entryfp);
+
+                    if (nread == 0) {
+                        if (ferror(entryfp)) {
+                            redisLog(REDIS_WARNING,"Error reading from file entry while performing BGSAVE for diskstore: %s", strerror(errno));
+                            goto werr;
+                        } else {
+                            break;
+                        }
+                    }
+                    if (fwrite(buf,1,nread,fp) != (unsigned)nread) goto werr;
+                }
+                fclose(entryfp);
+                entryfp = NULL; /* Closed: werr must not close it again. */
+            }
+            closedir(dir);
+            dir = NULL; /* Closed: werr must not close it again. */
+        }
+    }
+
+    /* Output the end of file opcode */
+    if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
+
+    /* Make sure data will not remain on the OS's output buffers */
+    fflush(fp);
+    fsync(fileno(fp));
+    fclose(fp);
+
+    /* Use RENAME to make sure the DB file is changed atomically only
+     * if the generated DB file is ok. */
+    if (rename(tmpfile,filename) == -1) {
+        redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s (diskstore)", strerror(errno));
+        unlink(tmpfile);
+        zfree(filename);
+        dsRdbSaveSetState(REDIS_BGSAVE_THREAD_DONE_ERR);
+        return NULL;
+    }
+    /* Free the name only now: rename(2) above still needed it. */
+    zfree(filename);
+    redisLog(REDIS_NOTICE,"DB saved on disk by diskstore thread");
+    dsRdbSaveSetState(REDIS_BGSAVE_THREAD_DONE_OK);
+    return NULL;
+
+werr:
+    /* Save errno first: the cleanup calls below may overwrite it. */
+    saved_errno = errno;
+    if (entryfp) fclose(entryfp);
+    if (dir) closedir(dir);
+    zfree(filename);
+    fclose(fp);
+    unlink(tmpfile);
+    dsRdbSaveSetState(REDIS_BGSAVE_THREAD_DONE_ERR);
+    redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(saved_errno));
+    return NULL;
+}
+
+/* Start the diskstore BGSAVE thread for 'filename'. The thread receives
+ * its own zstrdup() copy of the name. Returns REDIS_OK and stores the
+ * thread handle in server.bgsavethread, or REDIS_ERR if creation fails. */
+int dsRdbSaveBackground(char *filename) {
+    pthread_t thread;
+    char *fncopy = zstrdup(filename);
+    int err;
+
+    /* pthread_create() returns the error number, it does not set errno. */
+    if ((err = pthread_create(&thread,NULL,dsRdbSave_thread,fncopy)) != 0) {
+        redisLog(REDIS_WARNING,"Can't create diskstore BGSAVE thread: %s",
+            strerror(err));
+        zfree(fncopy); /* No thread will ever free the copy. */
+        return REDIS_ERR;
+    } else {
+        server.bgsavethread = thread;
+        return REDIS_OK;
+    }
+}
+
+/* Blocking SAVE: start the BGSAVE thread and poll its state until it is
+ * REDIS_BGSAVE_THREAD_DONE_OK or REDIS_BGSAVE_THREAD_DONE_ERR. Returns
+ * REDIS_OK only in the former case, REDIS_ERR otherwise (including when
+ * the thread could not be started). */
+int dsRdbSave(char *filename) {
+    int state;
+
+    /* A blocking save is actually a non blocking save... just we wait
+     * for it to terminate in a non-busy loop. */
+
+    redisLog(REDIS_NOTICE,"Starting a blocking SAVE (BGSAVE + blocking wait)");
+    server.dirty_before_bgsave = server.dirty;
+    if (dsRdbSaveBackground(filename) == REDIS_ERR) return REDIS_ERR;
+    /* NOTE(review): a DONE_* state left over from a previous save would end
+     * this loop early; confirm the state is reset before the thread starts. */
+    while(1) {
+        usleep(1000);
+
+        pthread_mutex_lock(&server.bgsavethread_mutex);
+        state = server.bgsavethread_state;
+        pthread_mutex_unlock(&server.bgsavethread_mutex);
+
+        if (state == REDIS_BGSAVE_THREAD_DONE_OK ||
+            state == REDIS_BGSAVE_THREAD_DONE_ERR) break;
+    }
+    /* Report the real outcome instead of unconditional success. */
+    return (state == REDIS_BGSAVE_THREAD_DONE_OK) ? REDIS_OK : REDIS_ERR;
+}