X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/d4a3cfed9c95e0ca5f8bcc771f8844ad17895c69..8361d6c406fec73106c627159f28e092cedef1ee:/src/rdb.c diff --git a/src/rdb.c b/src/rdb.c index 2c0feb6d..90c2ea08 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1,5 +1,7 @@ #include "redis.h" #include "lzf.h" /* LZF compression library */ +#include "zipmap.h" +#include "endianconv.h" #include #include @@ -19,17 +21,15 @@ int rdbSaveType(rio *rdb, unsigned char type) { return rdbWriteRaw(rdb,&type,1); } +/* Load a "type" in RDB format, that is a one byte unsigned integer. + * This function is not only used to load object types, but also special + * "types" like the end-of-file type, the EXPIRE type, and so forth. */ int rdbLoadType(rio *rdb) { unsigned char type; if (rioRead(rdb,&type,1) == 0) return -1; return type; } -int rdbSaveTime(rio *rdb, time_t t) { - int32_t t32 = (int32_t) t; - return rdbWriteRaw(rdb,&t32,4); -} - time_t rdbLoadTime(rio *rdb) { int32_t t32; if (rioRead(rdb,&t32,4) == 0) return -1; @@ -253,7 +253,7 @@ int rdbSaveRawString(rio *rdb, unsigned char *s, size_t len) { /* Try LZF compression - under 20 bytes it's unable to compress even * aaaaaaaaaaaaaaaaaa so skip it */ - if (server.rdbcompression && len > 20) { + if (server.rdb_compression && len > 20) { n = rdbSaveLzfStringObject(rdb,s,len); if (n == -1) return -1; if (n > 0) return n; @@ -424,8 +424,8 @@ int rdbSaveObjectType(rio *rdb, robj *o) { else redisPanic("Unknown sorted set encoding"); case REDIS_HASH: - if (o->encoding == REDIS_ENCODING_ZIPMAP) - return rdbSaveType(rdb,REDIS_RDB_TYPE_HASH_ZIPMAP); + if (o->encoding == REDIS_ENCODING_ZIPLIST) + return rdbSaveType(rdb,REDIS_RDB_TYPE_HASH_ZIPLIST); else if (o->encoding == REDIS_ENCODING_HT) return rdbSaveType(rdb,REDIS_RDB_TYPE_HASH); else @@ -436,7 +436,8 @@ int rdbSaveObjectType(rio *rdb, robj *o) { return -1; /* avoid warning */ } -/* Load object type. Return -1 when the byte doesn't contain an object type. */ +/* Use rdbLoadType() to load a TYPE in RDB format, but returns -1 if the + * type is not specifically a valid Object Type. */ int rdbLoadObjectType(rio *rdb) { int type; if ((type = rdbLoadType(rdb)) == -1) return -1; @@ -530,12 +531,13 @@ int rdbSaveObject(rio *rdb, robj *o) { } } else if (o->type == REDIS_HASH) { /* Save a hash value */ - if (o->encoding == REDIS_ENCODING_ZIPMAP) { - size_t l = zipmapBlobLen((unsigned char*)o->ptr); + if (o->encoding == REDIS_ENCODING_ZIPLIST) { + size_t l = ziplistBlobLen((unsigned char*)o->ptr); if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1; nwritten += n; - } else { + + } else if (o->encoding == REDIS_ENCODING_HT) { dictIterator *di = dictGetIterator(o->ptr); dictEntry *de; @@ -552,7 +554,11 @@ int rdbSaveObject(rio *rdb, robj *o) { nwritten += n; } dictReleaseIterator(di); + + } else { + redisPanic("Unknown hash encoding"); } + } else { redisPanic("Unknown object type"); } @@ -596,10 +602,12 @@ int rdbSave(char *filename) { dictIterator *di = NULL; dictEntry *de; char tmpfile[256]; + char magic[10]; int j; long long now = mstime(); FILE *fp; rio rdb; + uint64_t cksum; snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid()); fp = fopen(tmpfile,"w"); @@ -610,7 +618,10 @@ int rdbSave(char *filename) { } rioInitWithFile(&rdb,fp); - if (rdbWriteRaw(&rdb,"REDIS0003",9) == -1) goto werr; + if (server.rdb_checksum) + rdb.update_cksum = rioGenericUpdateChecksum; + snprintf(magic,sizeof(magic),"REDIS%04d",REDIS_RDB_VERSION); + if (rdbWriteRaw(&rdb,magic,9) == -1) goto werr; for (j = 0; j < server.dbnum; j++) { redisDb *db = server.db+j; @@ -638,9 +649,17 @@ int rdbSave(char *filename) { } dictReleaseIterator(di); } + di = NULL; /* So that we don't release it again on error. */ + /* EOF opcode */ if (rdbSaveType(&rdb,REDIS_RDB_OPCODE_EOF) == -1) goto werr; + /* CRC64 checksum. It will be zero if checksum computation is disabled, the + * loading code skips the check in this case. */ + cksum = rdb.cksum; + memrev64ifbe(&cksum); + rioWrite(&rdb,&cksum,8); + /* Make sure data will not remain on the OS's output buffers */ fflush(fp); fsync(fileno(fp)); @@ -656,6 +675,7 @@ int rdbSave(char *filename) { redisLog(REDIS_NOTICE,"DB saved on disk"); server.dirty = 0; server.lastsave = time(NULL); + server.lastbgsave_status = REDIS_OK; return REDIS_OK; werr: @@ -670,7 +690,7 @@ int rdbSaveBackground(char *filename) { pid_t childpid; long long start; - if (server.bgsavechildpid != -1) return REDIS_ERR; + if (server.rdb_child_pid != -1) return REDIS_ERR; server.dirty_before_bgsave = server.dirty; @@ -682,7 +702,7 @@ int rdbSaveBackground(char *filename) { if (server.ipfd > 0) close(server.ipfd); if (server.sofd > 0) close(server.sofd); retval = rdbSave(filename); - _exit((retval == REDIS_OK) ? 0 : 1); + exitFromChild((retval == REDIS_OK) ? 0 : 1); } else { /* Parent */ server.stat_fork_time = ustime()-start; @@ -692,7 +712,8 @@ int rdbSaveBackground(char *filename) { return REDIS_ERR; } redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid); - server.bgsavechildpid = childpid; + server.rdb_save_time_start = time(NULL); + server.rdb_child_pid = childpid; updateDictResizePolicy(); return REDIS_OK; } @@ -713,7 +734,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) { size_t len; unsigned int i; - redisLog(REDIS_DEBUG,"LOADING OBJECT %d (at %d)\n",rdbtype,rdb->tell(rdb)); + redisLog(REDIS_DEBUG,"LOADING OBJECT %d (at %d)\n",rdbtype,rioTell(rdb)); if (rdbtype == REDIS_RDB_TYPE_STRING) { /* Read string value */ if ((o = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL; @@ -824,55 +845,74 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) { maxelelen <= server.zset_max_ziplist_value) zsetConvert(o,REDIS_ENCODING_ZIPLIST); } else if (rdbtype == REDIS_RDB_TYPE_HASH) { - size_t hashlen; + size_t len; + int ret; + + len = rdbLoadLen(rdb, NULL); + if (len == REDIS_RDB_LENERR) return NULL; - if ((hashlen = rdbLoadLen(rdb,NULL)) == REDIS_RDB_LENERR) return NULL; o = createHashObject(); + /* Too many entries? Use an hash table. */ - if (hashlen > server.hash_max_zipmap_entries) - convertToRealHash(o); - /* Load every key/value, then set it into the zipmap or hash - * table, as needed. */ - while(hashlen--) { - robj *key, *val; - - if ((key = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL; - if ((val = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL; - /* If we are using a zipmap and there are too big values - * the object is converted to real hash table encoding. */ - if (o->encoding != REDIS_ENCODING_HT && - ((key->encoding == REDIS_ENCODING_RAW && - sdslen(key->ptr) > server.hash_max_zipmap_value) || - (val->encoding == REDIS_ENCODING_RAW && - sdslen(val->ptr) > server.hash_max_zipmap_value))) + if (len > server.hash_max_ziplist_entries) + hashTypeConvert(o, REDIS_ENCODING_HT); + + /* Load every field and value into the ziplist */ + while (o->encoding == REDIS_ENCODING_ZIPLIST && len > 0) { + robj *field, *value; + + len--; + /* Load raw strings */ + field = rdbLoadStringObject(rdb); + if (field == NULL) return NULL; + redisAssert(field->encoding == REDIS_ENCODING_RAW); + value = rdbLoadStringObject(rdb); + if (value == NULL) return NULL; + redisAssert(field->encoding == REDIS_ENCODING_RAW); + + /* Add pair to ziplist */ + o->ptr = ziplistPush(o->ptr, field->ptr, sdslen(field->ptr), ZIPLIST_TAIL); + o->ptr = ziplistPush(o->ptr, value->ptr, sdslen(value->ptr), ZIPLIST_TAIL); + /* Convert to hash table if size threshold is exceeded */ + if (sdslen(field->ptr) > server.hash_max_ziplist_value || + sdslen(value->ptr) > server.hash_max_ziplist_value) { - convertToRealHash(o); + decrRefCount(field); + decrRefCount(value); + hashTypeConvert(o, REDIS_ENCODING_HT); + break; } + decrRefCount(field); + decrRefCount(value); + } - if (o->encoding == REDIS_ENCODING_ZIPMAP) { - unsigned char *zm = o->ptr; - robj *deckey, *decval; - - /* We need raw string objects to add them to the zipmap */ - deckey = getDecodedObject(key); - decval = getDecodedObject(val); - zm = zipmapSet(zm,deckey->ptr,sdslen(deckey->ptr), - decval->ptr,sdslen(decval->ptr),NULL); - o->ptr = zm; - decrRefCount(deckey); - decrRefCount(decval); - decrRefCount(key); - decrRefCount(val); - } else { - key = tryObjectEncoding(key); - val = tryObjectEncoding(val); - dictAdd((dict*)o->ptr,key,val); - } + /* Load remaining fields and values into the hash table */ + while (o->encoding == REDIS_ENCODING_HT && len > 0) { + robj *field, *value; + + len--; + /* Load encoded strings */ + field = rdbLoadEncodedStringObject(rdb); + if (field == NULL) return NULL; + value = rdbLoadEncodedStringObject(rdb); + if (value == NULL) return NULL; + + field = tryObjectEncoding(field); + value = tryObjectEncoding(value); + + /* Add pair to hash table */ + ret = dictAdd((dict*)o->ptr, field, value); + redisAssert(ret == REDIS_OK); } + + /* All pairs should be read by now */ + redisAssert(len == 0); + } else if (rdbtype == REDIS_RDB_TYPE_HASH_ZIPMAP || rdbtype == REDIS_RDB_TYPE_LIST_ZIPLIST || rdbtype == REDIS_RDB_TYPE_SET_INTSET || - rdbtype == REDIS_RDB_TYPE_ZSET_ZIPLIST) + rdbtype == REDIS_RDB_TYPE_ZSET_ZIPLIST || + rdbtype == REDIS_RDB_TYPE_HASH_ZIPLIST) { robj *aux = rdbLoadStringObject(rdb); @@ -890,10 +930,33 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) { * converted. */ switch(rdbtype) { case REDIS_RDB_TYPE_HASH_ZIPMAP: - o->type = REDIS_HASH; - o->encoding = REDIS_ENCODING_ZIPMAP; - if (zipmapLen(o->ptr) > server.hash_max_zipmap_entries) - convertToRealHash(o); + /* Convert to ziplist encoded hash. This must be deprecated + * when loading dumps created by Redis 2.4 gets deprecated. */ + { + unsigned char *zl = ziplistNew(); + unsigned char *zi = zipmapRewind(o->ptr); + unsigned char *fstr, *vstr; + unsigned int flen, vlen; + unsigned int maxlen = 0; + + while ((zi = zipmapNext(zi, &fstr, &flen, &vstr, &vlen)) != NULL) { + if (flen > maxlen) maxlen = flen; + if (vlen > maxlen) maxlen = vlen; + zl = ziplistPush(zl, fstr, flen, ZIPLIST_TAIL); + zl = ziplistPush(zl, vstr, vlen, ZIPLIST_TAIL); + } + + zfree(o->ptr); + o->ptr = zl; + o->type = REDIS_HASH; + o->encoding = REDIS_ENCODING_ZIPLIST; + + if (hashTypeLength(o) > server.hash_max_ziplist_entries || + maxlen > server.hash_max_ziplist_value) + { + hashTypeConvert(o, REDIS_ENCODING_HT); + } + } break; case REDIS_RDB_TYPE_LIST_ZIPLIST: o->type = REDIS_LIST; @@ -913,6 +976,12 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) { if (zsetLength(o) > server.zset_max_ziplist_entries) zsetConvert(o,REDIS_ENCODING_SKIPLIST); break; + case REDIS_RDB_TYPE_HASH_ZIPLIST: + o->type = REDIS_HASH; + o->encoding = REDIS_ENCODING_ZIPLIST; + if (hashTypeLength(o) > server.hash_max_ziplist_entries) + hashTypeConvert(o, REDIS_ENCODING_HT); + break; default: redisPanic("Unknown encoding"); break; @@ -964,6 +1033,8 @@ int rdbLoad(char *filename) { return REDIS_ERR; } rioInitWithFile(&rdb,fp); + if (server.rdb_checksum) + rdb.update_cksum = rioGenericUpdateChecksum; if (rioRead(&rdb,buf,9) == 0) goto eoferr; buf[9] = '\0'; if (memcmp(buf,"REDIS",5) != 0) { @@ -973,7 +1044,7 @@ int rdbLoad(char *filename) { return REDIS_ERR; } rdbver = atoi(buf+5); - if (rdbver < 1 || rdbver > 3) { + if (rdbver < 1 || rdbver > REDIS_RDB_VERSION) { fclose(fp); redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver); errno = EINVAL; @@ -987,7 +1058,7 @@ int rdbLoad(char *filename) { /* Serve the clients from time to time */ if (!(loops++ % 1000)) { - loadingProgress(rdb.tell(&rdb)); + loadingProgress(rioTell(&rdb)); aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT); } @@ -1026,8 +1097,12 @@ int rdbLoad(char *filename) { if ((key = rdbLoadStringObject(&rdb)) == NULL) goto eoferr; /* Read value */ if ((val = rdbLoadObject(type,&rdb)) == NULL) goto eoferr; - /* Check if the key already expired */ - if (expiretime != -1 && expiretime < now) { + /* Check if the key already expired. This function is used when loading + * an RDB file from disk, either at startup, or when an RDB was + * received from the master. In the latter case, the master is + * responsible for key expiry. If we would expire keys here, the + * snapshot taken by the master may not be reflected on the slave. */ + if (server.masterhost == NULL && expiretime != -1 && expiretime < now) { decrRefCount(key); decrRefCount(val); continue; @@ -1040,6 +1115,20 @@ int rdbLoad(char *filename) { decrRefCount(key); } + /* Verify the checksum if RDB version is >= 5 */ + if (rdbver >= 5 && server.rdb_checksum) { + uint64_t cksum, expected = rdb.cksum; + + if (rioRead(&rdb,&cksum,8) == 0) goto eoferr; + memrev64ifbe(&cksum); + if (cksum == 0) { + redisLog(REDIS_WARNING,"RDB file was saved with checksum disabled: no check performed."); + } else if (cksum != expected) { + redisLog(REDIS_WARNING,"Wrong RDB checksum. Aborting now."); + exit(1); + } + } + fclose(fp); stopLoading(); return REDIS_OK; @@ -1057,25 +1146,30 @@ void backgroundSaveDoneHandler(int exitcode, int bysignal) { "Background saving terminated with success"); server.dirty = server.dirty - server.dirty_before_bgsave; server.lastsave = time(NULL); + server.lastbgsave_status = REDIS_OK; } else if (!bysignal && exitcode != 0) { redisLog(REDIS_WARNING, "Background saving error"); + server.lastbgsave_status = REDIS_ERR; } else { redisLog(REDIS_WARNING, "Background saving terminated by signal %d", bysignal); - rdbRemoveTempFile(server.bgsavechildpid); + rdbRemoveTempFile(server.rdb_child_pid); + server.lastbgsave_status = REDIS_ERR; } - server.bgsavechildpid = -1; + server.rdb_child_pid = -1; + server.rdb_save_time_last = time(NULL)-server.rdb_save_time_start; + server.rdb_save_time_start = -1; /* Possibly there are slaves waiting for a BGSAVE in order to be served * (the first stage of SYNC is a bulk transfer of dump.rdb) */ updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR); } void saveCommand(redisClient *c) { - if (server.bgsavechildpid != -1) { + if (server.rdb_child_pid != -1) { addReplyError(c,"Background save already in progress"); return; } - if (rdbSave(server.dbfilename) == REDIS_OK) { + if (rdbSave(server.rdb_filename) == REDIS_OK) { addReply(c,shared.ok); } else { addReply(c,shared.err); @@ -1083,11 +1177,11 @@ void saveCommand(redisClient *c) { } void bgsaveCommand(redisClient *c) { - if (server.bgsavechildpid != -1) { + if (server.rdb_child_pid != -1) { addReplyError(c,"Background save already in progress"); - } else if (server.bgrewritechildpid != -1) { + } else if (server.aof_child_pid != -1) { addReplyError(c,"Can't BGSAVE while AOF log rewriting is in progress"); - } else if (rdbSaveBackground(server.dbfilename) == REDIS_OK) { + } else if (rdbSaveBackground(server.rdb_filename) == REDIS_OK) { addReplyStatus(c,"Background saving started"); } else { addReply(c,shared.err);