return rdbWriteRaw(fp,buf,len);
}
-/* Save a Redis object. */
+/* Save a Redis object. Returns -1 on error, 0 on success. */
int rdbSaveObject(FILE *fp, robj *o) {
int n, nwritten = 0;
} else if (o->type == REDIS_LIST) {
/* Save a list value */
if (o->encoding == REDIS_ENCODING_ZIPLIST) {
- unsigned char *p;
- unsigned char *vstr;
- unsigned int vlen;
- long long vlong;
+ size_t l = ziplistBlobLen((unsigned char*)o->ptr);
- if ((n = rdbSaveLen(fp,ziplistLen(o->ptr))) == -1) return -1;
+ if ((n = rdbSaveRawString(fp,o->ptr,l)) == -1) return -1;
nwritten += n;
-
- p = ziplistIndex(o->ptr,0);
- while(ziplistGet(p,&vstr,&vlen,&vlong)) {
- if (vstr) {
- if ((n = rdbSaveRawString(fp,vstr,vlen)) == -1)
- return -1;
- nwritten += n;
- } else {
- if ((n = rdbSaveLongLongAsStringObject(fp,vlong)) == -1)
- return -1;
- nwritten += n;
- }
- p = ziplistNext(o->ptr,p);
- }
} else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
list *list = o->ptr;
listIter li;
}
dictReleaseIterator(di);
} else if (o->encoding == REDIS_ENCODING_INTSET) {
- intset *is = o->ptr;
- int64_t llval;
- int i = 0;
+ size_t l = intsetBlobLen((intset*)o->ptr);
- if ((n = rdbSaveLen(fp,intsetLen(is))) == -1) return -1;
+ if ((n = rdbSaveRawString(fp,o->ptr,l)) == -1) return -1;
nwritten += n;
-
- while(intsetGet(is,i++,&llval)) {
- if ((n = rdbSaveLongLongAsStringObject(fp,llval)) == -1) return -1;
- nwritten += n;
- }
} else {
redisPanic("Unknown set encoding");
}
} else if (o->type == REDIS_ZSET) {
- /* Save a set value */
- zset *zs = o->ptr;
- dictIterator *di = dictGetIterator(zs->dict);
- dictEntry *de;
-
- if ((n = rdbSaveLen(fp,dictSize(zs->dict))) == -1) return -1;
- nwritten += n;
-
- while((de = dictNext(di)) != NULL) {
- robj *eleobj = dictGetEntryKey(de);
- double *score = dictGetEntryVal(de);
+ /* Save a sorted set value */
+ if (o->encoding == REDIS_ENCODING_ZIPLIST) {
+ size_t l = ziplistBlobLen((unsigned char*)o->ptr);
- if ((n = rdbSaveStringObject(fp,eleobj)) == -1) return -1;
+ if ((n = rdbSaveRawString(fp,o->ptr,l)) == -1) return -1;
nwritten += n;
- if ((n = rdbSaveDoubleValue(fp,*score)) == -1) return -1;
+ } else if (o->encoding == REDIS_ENCODING_SKIPLIST) {
+ zset *zs = o->ptr;
+ dictIterator *di = dictGetIterator(zs->dict);
+ dictEntry *de;
+
+ if ((n = rdbSaveLen(fp,dictSize(zs->dict))) == -1) return -1;
nwritten += n;
+
+ while((de = dictNext(di)) != NULL) {
+ robj *eleobj = dictGetEntryKey(de);
+ double *score = dictGetEntryVal(de);
+
+ if ((n = rdbSaveStringObject(fp,eleobj)) == -1) return -1;
+ nwritten += n;
+ if ((n = rdbSaveDoubleValue(fp,*score)) == -1) return -1;
+ nwritten += n;
+ }
+ dictReleaseIterator(di);
+ } else {
+ redisPanic("Unknown sorted set encoding");
}
- dictReleaseIterator(di);
} else if (o->type == REDIS_HASH) {
/* Save a hash value */
if (o->encoding == REDIS_ENCODING_ZIPMAP) {
vtype = val->type;
if (vtype == REDIS_HASH && val->encoding == REDIS_ENCODING_ZIPMAP)
vtype = REDIS_HASH_ZIPMAP;
+ else if (vtype == REDIS_LIST && val->encoding == REDIS_ENCODING_ZIPLIST)
+ vtype = REDIS_LIST_ZIPLIST;
+ else if (vtype == REDIS_SET && val->encoding == REDIS_ENCODING_INTSET)
+ vtype = REDIS_SET_INTSET;
+ else if (vtype == REDIS_ZSET && val->encoding == REDIS_ENCODING_ZIPLIST)
+ vtype = REDIS_ZSET_ZIPLIST;
/* Save type, key, value */
- if (rdbSaveType(fp,val->type) == -1) return -1;
+ if (rdbSaveType(fp,vtype) == -1) return -1;
if (rdbSaveStringObject(fp,key) == -1) return -1;
if (rdbSaveObject(fp,val) == -1) return -1;
return 1;
} else if (type == REDIS_ZSET) {
/* Read list/set value */
size_t zsetlen;
+ size_t maxelelen = 0;
zset *zs;
if ((zsetlen = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
o = createZsetObject();
zs = o->ptr;
+
/* Load every single element of the list/set */
while(zsetlen--) {
robj *ele;
if ((ele = rdbLoadEncodedStringObject(fp)) == NULL) return NULL;
ele = tryObjectEncoding(ele);
if (rdbLoadDoubleValue(fp,&score) == -1) return NULL;
+
+ /* Don't care about integer-encoded strings. */
+ if (ele->encoding == REDIS_ENCODING_RAW &&
+ sdslen(ele->ptr) > maxelelen)
+ maxelelen = sdslen(ele->ptr);
+
znode = zslInsert(zs->zsl,score,ele);
dictAdd(zs->dict,ele,&znode->score);
incrRefCount(ele); /* added to skiplist */
}
+
+ /* Convert *after* loading, since sorted sets are not stored ordered. */
+ if (zsetLength(o) <= server.zset_max_ziplist_entries &&
+ maxelelen <= server.zset_max_ziplist_value)
+ zsetConvert(o,REDIS_ENCODING_ZIPLIST);
} else if (type == REDIS_HASH) {
size_t hashlen;
dictAdd((dict*)o->ptr,key,val);
}
}
- } else if (type == REDIS_HASH_ZIPMAP) {
+ } else if (type == REDIS_HASH_ZIPMAP ||
+ type == REDIS_LIST_ZIPLIST ||
+ type == REDIS_SET_INTSET ||
+ type == REDIS_ZSET_ZIPLIST)
+ {
robj *aux = rdbLoadStringObject(fp);
if (aux == NULL) return NULL;
- o = createHashObject();
- o->encoding = REDIS_ENCODING_ZIPMAP;
+ o = createObject(REDIS_STRING,NULL); /* string is just placeholder */
o->ptr = zmalloc(sdslen(aux->ptr));
memcpy(o->ptr,aux->ptr,sdslen(aux->ptr));
decrRefCount(aux);
- /* FIXME: conver the object if needed */
+
+ /* Fix the object encoding, and make sure to convert the encoded
+ * data type into the base type if, according to the current
+ * configuration, there are too many elements in the encoded data
+ * type. Note that we only check the length and not the max element
+ * size, as checking the latter requires an O(N) scan. Eventually
+ * everything will get converted. */
+ switch(type) {
+ case REDIS_HASH_ZIPMAP:
+ o->type = REDIS_HASH;
+ o->encoding = REDIS_ENCODING_ZIPMAP;
+ if (zipmapLen(o->ptr) > server.hash_max_zipmap_entries)
+ convertToRealHash(o);
+ break;
+ case REDIS_LIST_ZIPLIST:
+ o->type = REDIS_LIST;
+ o->encoding = REDIS_ENCODING_ZIPLIST;
+ if (ziplistLen(o->ptr) > server.list_max_ziplist_entries)
+ listTypeConvert(o,REDIS_ENCODING_LINKEDLIST);
+ break;
+ case REDIS_SET_INTSET:
+ o->type = REDIS_SET;
+ o->encoding = REDIS_ENCODING_INTSET;
+ if (intsetLen(o->ptr) > server.set_max_intset_entries)
+ setTypeConvert(o,REDIS_ENCODING_HT);
+ break;
+ case REDIS_ZSET_ZIPLIST:
+ o->type = REDIS_ZSET;
+ o->encoding = REDIS_ENCODING_ZIPLIST;
+ if (zsetLength(o) > server.zset_max_ziplist_entries)
+ zsetConvert(o,REDIS_ENCODING_RAW);
+ break;
+ default:
+ redisPanic("Unknown enoding");
+ break;
+ }
} else {
redisPanic("Unknown object type");
}