]> git.saurik.com Git - redis.git/blobdiff - src/rdb.c
Read-only flag removed from PUBLISH command.
[redis.git] / src / rdb.c
index c74ec41c53fcce04e2ceaa40b5423ae43964b6e0..519b645d157281e86b6e2213ddfdfb648e36069a 100644 (file)
--- a/src/rdb.c
+++ b/src/rdb.c
@@ -616,7 +616,7 @@ int rdbSave(char *filename) {
     }
 
     rioInitWithFile(&rdb,fp);
-    if (rdbWriteRaw(&rdb,"REDIS0003",9) == -1) goto werr;
+    if (rdbWriteRaw(&rdb,"REDIS0004",9) == -1) goto werr;
 
     for (j = 0; j < server.dbnum; j++) {
         redisDb *db = server.db+j;
@@ -662,6 +662,7 @@ int rdbSave(char *filename) {
     redisLog(REDIS_NOTICE,"DB saved on disk");
     server.dirty = 0;
     server.lastsave = time(NULL);
+    server.lastbgsave_status = REDIS_OK;
     return REDIS_OK;
 
 werr:
@@ -843,9 +844,10 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) {
             hashTypeConvert(o, REDIS_ENCODING_HT);
 
         /* Load every field and value into the ziplist */
-        while (o->encoding == REDIS_ENCODING_ZIPLIST && len-- > 0) {
+        while (o->encoding == REDIS_ENCODING_ZIPLIST && len > 0) {
             robj *field, *value;
 
+            len--;
             /* Load raw strings */
             field = rdbLoadStringObject(rdb);
             if (field == NULL) return NULL;
@@ -854,6 +856,10 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) {
             if (value == NULL) return NULL;
             redisAssert(field->encoding == REDIS_ENCODING_RAW);
 
+            /* Add pair to ziplist */
+            o->ptr = ziplistPush(o->ptr, field->ptr, sdslen(field->ptr), ZIPLIST_TAIL);
+            o->ptr = ziplistPush(o->ptr, value->ptr, sdslen(value->ptr), ZIPLIST_TAIL);
+
             /* Convert to hash table if size threshold is exceeded */
             if (sdslen(field->ptr) > server.hash_max_ziplist_value ||
                 sdslen(value->ptr) > server.hash_max_ziplist_value)
@@ -861,16 +867,13 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) {
                 hashTypeConvert(o, REDIS_ENCODING_HT);
                 break;
             }
-
-            /* Add pair to ziplist */
-            o->ptr = ziplistPush(o->ptr, field->ptr, sdslen(field->ptr), ZIPLIST_TAIL);
-            o->ptr = ziplistPush(o->ptr, value->ptr, sdslen(value->ptr), ZIPLIST_TAIL);
         }
 
         /* Load remaining fields and values into the hash table */
-        while (o->encoding == REDIS_ENCODING_HT && len-- > 0) {
+        while (o->encoding == REDIS_ENCODING_HT && len > 0) {
             robj *field, *value;
 
+            len--;
             /* Load encoded strings */
             field = rdbLoadEncodedStringObject(rdb);
             if (field == NULL) return NULL;
@@ -915,12 +918,13 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) {
                 {
                     unsigned char *zl = ziplistNew();
                     unsigned char *zi = zipmapRewind(o->ptr);
+                    unsigned char *fstr, *vstr;
+                    unsigned int flen, vlen;
+                    unsigned int maxlen = 0;
 
-                    while (zi != NULL) {
-                        unsigned char *fstr, *vstr;
-                        unsigned int flen, vlen;
-
-                        zi = zipmapNext(zi, &fstr, &flen, &vstr, &vlen);
+                    while ((zi = zipmapNext(zi, &fstr, &flen, &vstr, &vlen)) != NULL) {
+                        if (flen > maxlen) maxlen = flen;
+                        if (vlen > maxlen) maxlen = vlen;
                         zl = ziplistPush(zl, fstr, flen, ZIPLIST_TAIL);
                         zl = ziplistPush(zl, vstr, vlen, ZIPLIST_TAIL);
                     }
@@ -930,8 +934,11 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) {
                     o->type = REDIS_HASH;
                     o->encoding = REDIS_ENCODING_ZIPLIST;
 
-                    if (hashTypeLength(o) > server.hash_max_ziplist_entries)
+                    if (hashTypeLength(o) > server.hash_max_ziplist_entries ||
+                        maxlen > server.hash_max_ziplist_value)
+                    {
                         hashTypeConvert(o, REDIS_ENCODING_HT);
+                    }
                 }
                 break;
             case REDIS_RDB_TYPE_LIST_ZIPLIST:
@@ -1018,7 +1025,7 @@ int rdbLoad(char *filename) {
         return REDIS_ERR;
     }
     rdbver = atoi(buf+5);
-    if (rdbver < 1 || rdbver > 3) {
+    if (rdbver < 1 || rdbver > 4) {
         fclose(fp);
         redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
         errno = EINVAL;
@@ -1071,8 +1078,12 @@ int rdbLoad(char *filename) {
         if ((key = rdbLoadStringObject(&rdb)) == NULL) goto eoferr;
         /* Read value */
         if ((val = rdbLoadObject(type,&rdb)) == NULL) goto eoferr;
-        /* Check if the key already expired */
-        if (expiretime != -1 && expiretime < now) {
+        /* Check if the key already expired. This function is used when loading
+         * an RDB file from disk, either at startup, or when an RDB was
+         * received from the master. In the latter case, the master is
+         * responsible for key expiry. If we would expire keys here, the
+         * snapshot taken by the master may not be reflected on the slave. */
+        if (server.masterhost == NULL && expiretime != -1 && expiretime < now) {
             decrRefCount(key);
             decrRefCount(val);
             continue;
@@ -1102,12 +1113,15 @@ void backgroundSaveDoneHandler(int exitcode, int bysignal) {
             "Background saving terminated with success");
         server.dirty = server.dirty - server.dirty_before_bgsave;
         server.lastsave = time(NULL);
+        server.lastbgsave_status = REDIS_OK;
     } else if (!bysignal && exitcode != 0) {
         redisLog(REDIS_WARNING, "Background saving error");
+        server.lastbgsave_status = REDIS_ERR;
     } else {
         redisLog(REDIS_WARNING,
             "Background saving terminated by signal %d", bysignal);
         rdbRemoveTempFile(server.rdb_child_pid);
+        server.lastbgsave_status = REDIS_ERR;
     }
     server.rdb_child_pid = -1;
     /* Possibly there are slaves waiting for a BGSAVE in order to be served