]> git.saurik.com Git - redis.git/blobdiff - redis.c
zipmap to hash conversion in HSET
[redis.git] / redis.c
diff --git a/redis.c b/redis.c
index cc64efb846a49d0ae19ff4c0702072e413d89167..b45b86184a51b46afed6a4b19d6803ceff7359c7 100644 (file)
--- a/redis.c
+++ b/redis.c
 #define REDIS_ENCODING_ZIPMAP 2 /* Encoded as zipmap */
 #define REDIS_ENCODING_HT 3     /* Encoded as an hash table */
 
+static char* strencoding[] = {
+    "raw", "int", "zipmap", "hashtable"
+};
+
 /* Object types only used for dumping to disk */
 #define REDIS_EXPIRETIME 253
 #define REDIS_SELECTDB 254
@@ -590,6 +594,7 @@ static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mas
 static struct redisCommand *lookupCommand(char *name);
 static void call(redisClient *c, struct redisCommand *cmd);
 static void resetClient(redisClient *c);
+static void convertToRealHash(robj *o);
 
 static void authCommand(redisClient *c);
 static void pingCommand(redisClient *c);
@@ -673,8 +678,11 @@ static void brpopCommand(redisClient *c);
 static void appendCommand(redisClient *c);
 static void substrCommand(redisClient *c);
 static void zrankCommand(redisClient *c);
+static void zrevrankCommand(redisClient *c);
 static void hsetCommand(redisClient *c);
 static void hgetCommand(redisClient *c);
+static void hdelCommand(redisClient *c);
+static void zremrangebyrankCommand(redisClient *c);
 static void zunionCommand(redisClient *c);
 static void zinterCommand(redisClient *c);
 
@@ -724,6 +732,7 @@ static struct redisCommand cmdTable[] = {
     {"zincrby",zincrbyCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,1,1,1},
     {"zrem",zremCommand,3,REDIS_CMD_BULK,1,1,1},
     {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE,1,1,1},
+    {"zremrangebyrank",zremrangebyrankCommand,4,REDIS_CMD_INLINE,1,1,1},
     {"zunion",zunionCommand,-4,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,0,0,0},
     {"zinter",zinterCommand,-4,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,0,0,0},
     {"zrange",zrangeCommand,-4,REDIS_CMD_INLINE,1,1,1},
@@ -733,8 +742,10 @@ static struct redisCommand cmdTable[] = {
     {"zcard",zcardCommand,2,REDIS_CMD_INLINE,1,1,1},
     {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,1,1,1},
     {"zrank",zrankCommand,3,REDIS_CMD_INLINE,1,1,1},
+    {"zrevrank",zrevrankCommand,3,REDIS_CMD_INLINE,1,1,1},
     {"hset",hsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,1,1,1},
     {"hget",hgetCommand,3,REDIS_CMD_BULK,1,1,1},
+    {"hdel",hdelCommand,3,REDIS_CMD_BULK,1,1,1},
     {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,1,1,1},
     {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,1,1,1},
     {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,1,1,1},
@@ -2937,7 +2948,7 @@ static int rdbSaveLen(FILE *fp, uint32_t len) {
 /* String objects in the form "2391" "-100" without any space and with a
  * range of values that can fit in an 8, 16 or 32 bit signed value can be
  * encoded as integers to save space */
-static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
+static int rdbTryIntegerEncoding(char *s, size_t len, unsigned char *enc) {
     long long value;
     char *endptr, buf[32];
 
@@ -2948,7 +2959,7 @@ static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
 
     /* If the number converted back into a string is not identical
      * then it's not possible to encode the string as integer */
-    if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
+    if (strlen(buf) != len || memcmp(buf,s,len)) return 0;
 
     /* Finally check if it fits in our ranges */
     if (value >= -(1<<7) && value <= (1<<7)-1) {
@@ -2972,16 +2983,16 @@ static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
     }
 }
 
-static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
-    unsigned int comprlen, outlen;
+static int rdbSaveLzfStringObject(FILE *fp, unsigned char *s, size_t len) {
+    size_t comprlen, outlen;
     unsigned char byte;
     void *out;
 
     /* We require at least four bytes compression for this to be worth it */
-    outlen = sdslen(obj->ptr)-4;
-    if (outlen <= 0) return 0;
+    if (len <= 4) return 0;
+    outlen = len-4;
     if ((out = zmalloc(outlen+1)) == NULL) return 0;
-    comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
+    comprlen = lzf_compress(s, len, out, outlen);
     if (comprlen == 0) {
         zfree(out);
         return 0;
@@ -2990,7 +3001,7 @@ static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
     byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
     if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
     if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
-    if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
+    if (rdbSaveLen(fp,len) == -1) goto writeerr;
     if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
     zfree(out);
     return comprlen;
@@ -3002,16 +3013,13 @@ writeerr:
 
 /* Save a string objet as [len][data] on disk. If the object is a string
  * representation of an integer value we try to safe it in a special form */
-static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
-    size_t len;
+static int rdbSaveRawString(FILE *fp, unsigned char *s, size_t len) {
     int enclen;
 
-    len = sdslen(obj->ptr);
-
     /* Try integer encoding */
     if (len <= 11) {
         unsigned char buf[5];
-        if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
+        if ((enclen = rdbTryIntegerEncoding((char*)s,len,buf)) > 0) {
             if (fwrite(buf,enclen,1,fp) == 0) return -1;
             return 0;
         }
@@ -3022,7 +3030,7 @@ static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
     if (server.rdbcompression && len > 20) {
         int retval;
 
-        retval = rdbSaveLzfStringObject(fp,obj);
+        retval = rdbSaveLzfStringObject(fp,s,len);
         if (retval == -1) return -1;
         if (retval > 0) return 0;
         /* retval == 0 means data can't be compressed, save the old way */
@@ -3030,7 +3038,7 @@ static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
 
     /* Store verbatim */
     if (rdbSaveLen(fp,len) == -1) return -1;
-    if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
+    if (len && fwrite(s,len,1,fp) == 0) return -1;
     return 0;
 }
 
@@ -3045,10 +3053,10 @@ static int rdbSaveStringObject(FILE *fp, robj *obj) {
      * this in order to avoid bugs) */
     if (obj->encoding != REDIS_ENCODING_RAW) {
         obj = getDecodedObject(obj);
-        retval = rdbSaveStringObjectRaw(fp,obj);
+        retval = rdbSaveRawString(fp,obj->ptr,sdslen(obj->ptr));
         decrRefCount(obj);
     } else {
-        retval = rdbSaveStringObjectRaw(fp,obj);
+        retval = rdbSaveRawString(fp,obj->ptr,sdslen(obj->ptr));
     }
     return retval;
 }
@@ -3126,6 +3134,33 @@ static int rdbSaveObject(FILE *fp, robj *o) {
             if (rdbSaveDoubleValue(fp,*score) == -1) return -1;
         }
         dictReleaseIterator(di);
+    } else if (o->type == REDIS_HASH) {
+        /* Save a hash value */
+        if (o->encoding == REDIS_ENCODING_ZIPMAP) {
+            unsigned char *p = zipmapRewind(o->ptr);
+            unsigned int count = zipmapLen(o->ptr);
+            unsigned char *key, *val;
+            unsigned int klen, vlen;
+
+            if (rdbSaveLen(fp,count) == -1) return -1;
+            while((p = zipmapNext(p,&key,&klen,&val,&vlen)) != NULL) {
+                if (rdbSaveRawString(fp,key,klen) == -1) return -1;
+                if (rdbSaveRawString(fp,val,vlen) == -1) return -1;
+            }
+        } else {
+            dictIterator *di = dictGetIterator(o->ptr);
+            dictEntry *de;
+
+            if (rdbSaveLen(fp,dictSize((dict*)o->ptr)) == -1) return -1;
+            while((de = dictNext(di)) != NULL) {
+                robj *key = dictGetEntryKey(de);
+                robj *val = dictGetEntryVal(de);
+
+                if (rdbSaveStringObject(fp,key) == -1) return -1;
+                if (rdbSaveStringObject(fp,val) == -1) return -1;
+            }
+            dictReleaseIterator(di);
+        }
     } else {
         redisAssert(0 != 0);
     }
@@ -3422,6 +3457,7 @@ static int rdbLoadDoubleValue(FILE *fp, double *val) {
 static robj *rdbLoadObject(int type, FILE *fp) {
     robj *o;
 
+    redisLog(REDIS_DEBUG,"LOADING OBJECT %d (at %d)\n",type,ftell(fp));
     if (type == REDIS_STRING) {
         /* Read string value */
         if ((o = rdbLoadStringObject(fp)) == NULL) return NULL;
@@ -3450,7 +3486,7 @@ static robj *rdbLoadObject(int type, FILE *fp) {
         }
     } else if (type == REDIS_ZSET) {
         /* Read list/set value */
-        uint32_t zsetlen;
+        size_t zsetlen;
         zset *zs;
 
         if ((zsetlen = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
@@ -3468,6 +3504,46 @@ static robj *rdbLoadObject(int type, FILE *fp) {
             zslInsert(zs->zsl,*score,ele);
             incrRefCount(ele); /* added to skiplist */
         }
+    } else if (type == REDIS_HASH) {
+        size_t hashlen;
+
+        if ((hashlen = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
+        o = createHashObject();
+        /* Too many entries? Use an hash table. */
+        if (hashlen > server.hash_max_zipmap_entries)
+            convertToRealHash(o);
+        /* Load every key/value, then set it into the zipmap or hash
+         * table, as needed. */
+        while(hashlen--) {
+            robj *key, *val;
+
+            if ((key = rdbLoadStringObject(fp)) == NULL) return NULL;
+            if ((val = rdbLoadStringObject(fp)) == NULL) return NULL;
+            /* If we are using a zipmap and there are too big values
+             * the object is converted to real hash table encoding. */
+            if (o->encoding != REDIS_ENCODING_HT &&
+               (sdslen(key->ptr) > server.hash_max_zipmap_value ||
+                sdslen(val->ptr) > server.hash_max_zipmap_value))
+            {
+                    convertToRealHash(o);
+            }
+
+            if (o->encoding == REDIS_ENCODING_ZIPMAP) {
+                unsigned char *zm = o->ptr;
+
+                zm = zipmapSet(zm,key->ptr,sdslen(key->ptr),
+                                  val->ptr,sdslen(val->ptr),NULL);
+                o->ptr = zm;
+                decrRefCount(key);
+                decrRefCount(val);
+            } else {
+                tryObjectEncoding(key);
+                tryObjectEncoding(val);
+                dictAdd((dict*)o->ptr,key,val);
+                incrRefCount(key);
+                incrRefCount(val);
+            }
+        }
     } else {
         redisAssert(0 != 0);
     }
@@ -3859,7 +3935,7 @@ static void substrCommand(redisClient *c) {
             rangelen = (end-start)+1;
 
             /* Return the result */
-            addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",rangelen));
+            addReplySds(c,sdscatprintf(sdsempty(),"$%zu\r\n",rangelen));
             range = sdsnewlen((char*)o->ptr+start,rangelen);
             addReplySds(c,range);
             addReply(c,shared.crlf);
@@ -3975,7 +4051,8 @@ static void typeCommand(redisClient *c) {
         case REDIS_LIST: type = "+list"; break;
         case REDIS_SET: type = "+set"; break;
         case REDIS_ZSET: type = "+zset"; break;
-        default: type = "unknown"; break;
+        case REDIS_HASH: type = "+hash"; break;
+        default: type = "+unknown"; break;
         }
     }
     addReplySds(c,sdsnew(type));
@@ -5015,6 +5092,31 @@ static void zslInsert(zskiplist *zsl, double score, robj *obj) {
     zsl->length++;
 }
 
+/* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
+void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
+    int i;
+    for (i = 0; i < zsl->level; i++) {
+        if (update[i]->forward[i] == x) {
+            if (i > 0) {
+                update[i]->span[i-1] += x->span[i-1] - 1;
+            }
+            update[i]->forward[i] = x->forward[i];
+        } else {
+            /* invariant: i > 0, because update[0]->forward[0]
+             * is always equal to x */
+            update[i]->span[i-1] -= 1;
+        }
+    }
+    if (x->forward[0]) {
+        x->forward[0]->backward = x->backward;
+    } else {
+        zsl->tail = x->backward;
+    }
+    while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
+        zsl->level--;
+    zsl->length--;
+}
+
 /* Delete an element with matching score/object from the skiplist. */
 static int zslDelete(zskiplist *zsl, double score, robj *obj) {
     zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
@@ -5033,27 +5135,8 @@ static int zslDelete(zskiplist *zsl, double score, robj *obj) {
      * is to find the element with both the right score and object. */
     x = x->forward[0];
     if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
-        for (i = 0; i < zsl->level; i++) {
-            if (update[i]->forward[i] == x) {
-                if (i > 0) {
-                    update[i]->span[i-1] += x->span[i-1] - 1;
-                }
-                update[i]->forward[i] = x->forward[i];
-            } else {
-                /* invariant: i > 0, because update[0]->forward[0]
-                 * is always equal to x */
-                update[i]->span[i-1] -= 1;
-            }
-        }
-        if (x->forward[0]) {
-            x->forward[0]->backward = x->backward;
-        } else {
-            zsl->tail = x->backward;
-        }
+        zslDeleteNode(zsl, x, update);
         zslFreeNode(x);
-        while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
-            zsl->level--;
-        zsl->length--;
         return 1;
     } else {
         return 0; /* not found */
@@ -5065,7 +5148,7 @@ static int zslDelete(zskiplist *zsl, double score, robj *obj) {
  * Min and mx are inclusive, so a score >= min || score <= max is deleted.
  * Note that this function takes the reference to the hash table view of the
  * sorted set, in order to remove the elements from the hash table too. */
-static unsigned long zslDeleteRange(zskiplist *zsl, double min, double max, dict *dict) {
+static unsigned long zslDeleteRangeByScore(zskiplist *zsl, double min, double max, dict *dict) {
     zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
     unsigned long removed = 0;
     int i;
@@ -5080,35 +5163,44 @@ static unsigned long zslDeleteRange(zskiplist *zsl, double min, double max, dict
      * is to find the element with both the right score and object. */
     x = x->forward[0];
     while (x && x->score <= max) {
-        zskiplistNode *next;
+        zskiplistNode *next = x->forward[0];
+        zslDeleteNode(zsl, x, update);
+        dictDelete(dict,x->obj);
+        zslFreeNode(x);
+        removed++;
+        x = next;
+    }
+    return removed; /* not found */
+}
 
-        for (i = 0; i < zsl->level; i++) {
-            if (update[i]->forward[i] == x) {
-                if (i > 0) {
-                    update[i]->span[i-1] += x->span[i-1] - 1;
-                }
-                update[i]->forward[i] = x->forward[i];
-            } else {
-                /* invariant: i > 0, because update[0]->forward[0]
-                 * is always equal to x */
-                update[i]->span[i-1] -= 1;
-            }
-        }
-        if (x->forward[0]) {
-            x->forward[0]->backward = x->backward;
-        } else {
-            zsl->tail = x->backward;
+/* Delete all the elements with rank between start and end from the skiplist.
+ * Start and end are inclusive. Note that start and end need to be 1-based */
+static unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict) {
+    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
+    unsigned long traversed = 0, removed = 0;
+    int i;
+
+    x = zsl->header;
+    for (i = zsl->level-1; i >= 0; i--) {
+        while (x->forward[i] && (traversed + (i > 0 ? x->span[i-1] : 1)) < start) {
+            traversed += i > 0 ? x->span[i-1] : 1;
+            x = x->forward[i];
         }
-        next = x->forward[0];
+        update[i] = x;
+    }
+
+    traversed++;
+    x = x->forward[0];
+    while (x && traversed <= end) {
+        zskiplistNode *next = x->forward[0];
+        zslDeleteNode(zsl, x, update);
         dictDelete(dict,x->obj);
         zslFreeNode(x);
-        while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
-            zsl->level--;
-        zsl->length--;
         removed++;
+        traversed++;
         x = next;
     }
-    return removed; /* not found */
+    return removed;
 }
 
 /* Find the first node having a score equal or greater than the specified one.
@@ -5323,13 +5415,54 @@ static void zremrangebyscoreCommand(redisClient *c) {
             return;
         }
         zs = zsetobj->ptr;
-        deleted = zslDeleteRange(zs->zsl,min,max,zs->dict);
+        deleted = zslDeleteRangeByScore(zs->zsl,min,max,zs->dict);
         if (htNeedsResize(zs->dict)) dictResize(zs->dict);
         server.dirty += deleted;
         addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",deleted));
     }
 }
 
+static void zremrangebyrankCommand(redisClient *c) {
+    int start = atoi(c->argv[2]->ptr);
+    int end = atoi(c->argv[3]->ptr);
+    robj *zsetobj;
+    zset *zs;
+
+    zsetobj = lookupKeyWrite(c->db,c->argv[1]);
+    if (zsetobj == NULL) {
+        addReply(c,shared.czero);
+    } else {
+        if (zsetobj->type != REDIS_ZSET) {
+            addReply(c,shared.wrongtypeerr);
+            return;
+        }
+
+        zs = zsetobj->ptr;
+        int llen = zs->zsl->length;
+        long deleted;
+
+        /* convert negative indexes */
+        if (start < 0) start = llen+start;
+        if (end < 0) end = llen+end;
+        if (start < 0) start = 0;
+        if (end < 0) end = 0;
+
+        /* indexes sanity checks */
+        if (start > end || start >= llen) {
+            addReply(c,shared.czero);
+            return;
+        }
+        if (end >= llen) end = llen-1;
+
+        /* increment start and end because zsl*Rank functions
+         * use 1-based rank */
+        deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);
+        if (htNeedsResize(zs->dict)) dictResize(zs->dict);
+        server.dirty += deleted;
+        addReplyLong(c, deleted);
+    }
+}
+
 typedef struct {
     dict *dict;
     double weight;
@@ -5365,7 +5498,7 @@ static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
     }
 
     /* read keys to be used for input */
-    src = malloc(sizeof(zsetopsrc) * zsetnum);
+    src = zmalloc(sizeof(zsetopsrc) * zsetnum);
     for (i = 0, j = 3; i < zsetnum; i++, j++) {
         robj *zsetobj = lookupKeyWrite(c->db,c->argv[j]);
         if (!zsetobj) {
@@ -5541,9 +5674,9 @@ static void zrangeGenericCommand(redisClient *c, int reverse) {
             /* check if starting point is trivial, before searching
              * the element in log(N) time */
             if (reverse) {
-                ln = start == 0 ? zsl->tail : zslGetElementByRank(zsl, llen - start);
+                ln = start == 0 ? zsl->tail : zslGetElementByRank(zsl, llen-start);
             } else {
-                ln = start == 0 ? zsl->header->forward[0] : zslGetElementByRank(zsl, start + 1);
+                ln = start == 0 ? zsl->header->forward[0] : zslGetElementByRank(zsl, start+1);
             }
 
             /* Return the result in form of a multi-bulk reply */
@@ -5741,7 +5874,7 @@ static void zscoreCommand(redisClient *c) {
     }
 }
 
-static void zrankCommand(redisClient *c) {
+static void zrankGenericCommand(redisClient *c, int reverse) {
     robj *o;
     o = lookupKeyRead(c->db,c->argv[1]);
     if (o == NULL) {
@@ -5765,13 +5898,25 @@ static void zrankCommand(redisClient *c) {
         double *score = dictGetEntryVal(de);
         rank = zslGetRank(zsl, *score, c->argv[2]);
         if (rank) {
-            addReplyLong(c, rank-1);
+            if (reverse) {
+                addReplyLong(c, zsl->length - rank);
+            } else {
+                addReplyLong(c, rank-1);
+            }
         } else {
             addReply(c,shared.nullbulk);
         }
     }
 }
 
+static void zrankCommand(redisClient *c) {
+    zrankGenericCommand(c, 0);
+}
+
+static void zrevrankCommand(redisClient *c) {
+    zrankGenericCommand(c, 1);
+}
+
 /* =================================== Hashes =============================== */
 static void hsetCommand(redisClient *c) {
     int update = 0;
@@ -5787,13 +5932,38 @@ static void hsetCommand(redisClient *c) {
             return;
         }
     }
+    /* We want to convert the zipmap into an hash table right now if the
+     * entry to be added is too big. Note that we check if the object
+     * is integer encoded before to try fetching the length in the test below.
+     * This is because integers are small, but currently stringObjectLen()
+     * performs a slow conversion: not worth it. */
+    if (o->encoding == REDIS_ENCODING_ZIPMAP &&
+        ((c->argv[2]->encoding == REDIS_ENCODING_RAW &&
+          sdslen(c->argv[2]->ptr) > server.hash_max_zipmap_value) ||
+         (c->argv[3]->encoding == REDIS_ENCODING_RAW &&
+          sdslen(c->argv[3]->ptr) > server.hash_max_zipmap_value)))
+    {
+        convertToRealHash(o);
+    }
+
     if (o->encoding == REDIS_ENCODING_ZIPMAP) {
         unsigned char *zm = o->ptr;
+        robj *valobj = getDecodedObject(c->argv[3]);
 
         zm = zipmapSet(zm,c->argv[2]->ptr,sdslen(c->argv[2]->ptr),
-            c->argv[3]->ptr,sdslen(c->argv[3]->ptr),&update);
+            valobj->ptr,sdslen(valobj->ptr),&update);
+        decrRefCount(valobj);
         o->ptr = zm;
+
+        /* And here there is the second check for hash conversion...
+         * we want to do it only if the operation was not just an update as
+         * zipmapLen() is O(N). */
+        if (!update && zipmapLen(zm) > server.hash_max_zipmap_entries)
+            convertToRealHash(o);
     } else {
+        tryObjectEncoding(c->argv[2]);
+        /* note that c->argv[3] is already encoded, as the latest arg
+         * of a bulk command is always integer encoded if possible. */
         if (dictAdd(o->ptr,c->argv[2],c->argv[3]) == DICT_OK) {
             incrRefCount(c->argv[2]);
         } else {
@@ -5812,6 +5982,11 @@ static void hgetCommand(redisClient *c) {
         addReply(c,shared.nullbulk);
         return;
     } else {
+        if (o->type != REDIS_HASH) {
+            addReply(c,shared.wrongtypeerr);
+            return;
+        }
+
         if (o->encoding == REDIS_ENCODING_ZIPMAP) {
             unsigned char *zm = o->ptr;
             unsigned char *val;
@@ -5843,6 +6018,52 @@ static void hgetCommand(redisClient *c) {
     }
 }
 
+static void hdelCommand(redisClient *c) {
+    robj *o = lookupKeyRead(c->db,c->argv[1]);
+
+    if (o == NULL) {
+        addReply(c,shared.czero);
+        return;
+    } else {
+        int deleted = 0;
+
+        if (o->type != REDIS_HASH) {
+            addReply(c,shared.wrongtypeerr);
+            return;
+        }
+
+        if (o->encoding == REDIS_ENCODING_ZIPMAP) {
+            o->ptr = zipmapDel((unsigned char*) o->ptr,
+                (unsigned char*) c->argv[2]->ptr,
+                sdslen(c->argv[2]->ptr), &deleted);
+        } else {
+            deleted = dictDelete((dict*)o->ptr,c->argv[2]) == DICT_OK;
+        }
+        addReply(c,deleted ? shared.cone : shared.czero);
+    }
+}
+
+static void convertToRealHash(robj *o) {
+    unsigned char *key, *val, *p, *zm = o->ptr;
+    unsigned int klen, vlen;
+    dict *dict = dictCreate(&hashDictType,NULL);
+
+    assert(o->type == REDIS_HASH && o->encoding != REDIS_ENCODING_HT);
+    p = zipmapRewind(zm);
+    while((p = zipmapNext(p,&key,&klen,&val,&vlen)) != NULL) {
+        robj *keyobj, *valobj;
+
+        keyobj = createStringObject((char*)key,klen);
+        valobj = createStringObject((char*)val,vlen);
+        tryObjectEncoding(keyobj);
+        tryObjectEncoding(valobj);
+        dictAdd(dict,keyobj,valobj);
+    }
+    o->encoding = REDIS_ENCODING_HT;
+    o->ptr = dict;
+    zfree(zm);
+}
+
 /* ========================= Non type-specific commands  ==================== */
 
 static void flushdbCommand(redisClient *c) {
@@ -6251,6 +6472,9 @@ static sds genRedisInfoString(void) {
     time_t uptime = time(NULL)-server.stat_starttime;
     int j;
     char hmem[64];
+
+    server.hash_max_zipmap_entries = REDIS_HASH_MAX_ZIPMAP_ENTRIES;
+    server.hash_max_zipmap_value = REDIS_HASH_MAX_ZIPMAP_VALUE;
   
     bytesToHuman(hmem,zmalloc_used_memory());
     info = sdscatprintf(sdsempty(),
@@ -6271,6 +6495,8 @@ static sds genRedisInfoString(void) {
         "bgrewriteaof_in_progress:%d\r\n"
         "total_connections_received:%lld\r\n"
         "total_commands_processed:%lld\r\n"
+        "hash_max_zipmap_entries:%ld\r\n"
+        "hash_max_zipmap_value:%ld\r\n"
         "vm_enabled:%d\r\n"
         "role:%s\r\n"
         ,REDIS_VERSION,
@@ -6290,6 +6516,8 @@ static sds genRedisInfoString(void) {
         server.bgrewritechildpid != -1,
         server.stat_numconnections,
         server.stat_numcommands,
+        server.hash_max_zipmap_entries,
+        server.hash_max_zipmap_value,
         server.vm_enabled != 0,
         server.masterhost == NULL ? "master" : "slave"
     );
@@ -6985,7 +7213,7 @@ static int syncWithMaster(void) {
     char buf[1024], tmpfile[256], authcmd[1024];
     long dumpsize;
     int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
-    int dfd;
+    int dfd, maxtries = 5;
 
     if (fd == -1) {
         redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
@@ -7038,8 +7266,13 @@ static int syncWithMaster(void) {
     dumpsize = strtol(buf+1,NULL,10);
     redisLog(REDIS_NOTICE,"Receiving %ld bytes data dump from MASTER",dumpsize);
     /* Read the bulk write data on a temp file */
-    snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
-    dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
+    while(maxtries--) {
+        snprintf(tmpfile,256,
+            "temp-%d.%ld.rdb",(int)time(NULL),(long int)getpid());
+        dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
+        if (dfd != -1) break;
+        sleep(1);
+    }
     if (dfd == -1) {
         close(fd);
         redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
@@ -7379,7 +7612,7 @@ fmterr:
 }
 
 /* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
-static int fwriteBulk(FILE *fp, robj *obj) {
+static int fwriteBulkObject(FILE *fp, robj *obj) {
     char buf[128];
     int decrrc = 0;
 
@@ -7404,6 +7637,18 @@ err:
     return 0;
 }
 
+/* Write binary-safe string into a file in the bulkformat
+ * $<count>\r\n<payload>\r\n */
+static int fwriteBulkString(FILE *fp, char *s, unsigned long len) {
+    char buf[128];
+
+    snprintf(buf,sizeof(buf),"$%ld\r\n",(unsigned long)len);
+    if (fwrite(buf,strlen(buf),1,fp) == 0) return 0;
+    if (len && fwrite(s,len,1,fp) == 0) return 0;
+    if (fwrite("\r\n",2,1,fp) == 0) return 0;
+    return 1;
+}
+
 /* Write a double value in bulk format $<count>\r\n<payload>\r\n */
 static int fwriteBulkDouble(FILE *fp, double d) {
     char buf[128], dbuf[128];
@@ -7486,8 +7731,8 @@ static int rewriteAppendOnlyFile(char *filename) {
                 char cmd[]="*3\r\n$3\r\nSET\r\n";
                 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
                 /* Key and value */
-                if (fwriteBulk(fp,key) == 0) goto werr;
-                if (fwriteBulk(fp,o) == 0) goto werr;
+                if (fwriteBulkObject(fp,key) == 0) goto werr;
+                if (fwriteBulkObject(fp,o) == 0) goto werr;
             } else if (o->type == REDIS_LIST) {
                 /* Emit the RPUSHes needed to rebuild the list */
                 list *list = o->ptr;
@@ -7500,8 +7745,8 @@ static int rewriteAppendOnlyFile(char *filename) {
                     robj *eleobj = listNodeValue(ln);
 
                     if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
-                    if (fwriteBulk(fp,key) == 0) goto werr;
-                    if (fwriteBulk(fp,eleobj) == 0) goto werr;
+                    if (fwriteBulkObject(fp,key) == 0) goto werr;
+                    if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
                 }
             } else if (o->type == REDIS_SET) {
                 /* Emit the SADDs needed to rebuild the set */
@@ -7514,8 +7759,8 @@ static int rewriteAppendOnlyFile(char *filename) {
                     robj *eleobj = dictGetEntryKey(de);
 
                     if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
-                    if (fwriteBulk(fp,key) == 0) goto werr;
-                    if (fwriteBulk(fp,eleobj) == 0) goto werr;
+                    if (fwriteBulkObject(fp,key) == 0) goto werr;
+                    if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
                 }
                 dictReleaseIterator(di);
             } else if (o->type == REDIS_ZSET) {
@@ -7530,11 +7775,43 @@ static int rewriteAppendOnlyFile(char *filename) {
                     double *score = dictGetEntryVal(de);
 
                     if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
-                    if (fwriteBulk(fp,key) == 0) goto werr;
+                    if (fwriteBulkObject(fp,key) == 0) goto werr;
                     if (fwriteBulkDouble(fp,*score) == 0) goto werr;
-                    if (fwriteBulk(fp,eleobj) == 0) goto werr;
+                    if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
                 }
                 dictReleaseIterator(di);
+            } else if (o->type == REDIS_HASH) {
+                char cmd[]="*4\r\n$4\r\nHSET\r\n";
+
+                /* Emit the HSETs needed to rebuild the hash */
+                if (o->encoding == REDIS_ENCODING_ZIPMAP) {
+                    unsigned char *p = zipmapRewind(o->ptr);
+                    unsigned char *field, *val;
+                    unsigned int flen, vlen;
+
+                    while((p = zipmapNext(p,&field,&flen,&val,&vlen)) != NULL) {
+                        if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
+                        if (fwriteBulkObject(fp,key) == 0) goto werr;
+                        if (fwriteBulkString(fp,(char*)field,flen) == -1)
+                            return -1;
+                        if (fwriteBulkString(fp,(char*)val,vlen) == -1)
+                            return -1;
+                    }
+                } else {
+                    dictIterator *di = dictGetIterator(o->ptr);
+                    dictEntry *de;
+
+                    while((de = dictNext(di)) != NULL) {
+                        robj *field = dictGetEntryKey(de);
+                        robj *val = dictGetEntryVal(de);
+
+                        if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
+                        if (fwriteBulkObject(fp,key) == 0) goto werr;
+                        if (fwriteBulkObject(fp,field) == -1) return -1;
+                        if (fwriteBulkObject(fp,val) == -1) return -1;
+                    }
+                    dictReleaseIterator(di);
+                }
             } else {
                 redisAssert(0 != 0);
             }
@@ -7544,7 +7821,7 @@ static int rewriteAppendOnlyFile(char *filename) {
                 /* If this key is already expired skip it */
                 if (expiretime < now) continue;
                 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
-                if (fwriteBulk(fp,key) == 0) goto werr;
+                if (fwriteBulkObject(fp,key) == 0) goto werr;
                 if (fwriteBulkLong(fp,expiretime) == 0) goto werr;
             }
             if (swapped) decrRefCount(o);
@@ -8713,11 +8990,20 @@ static void debugCommand(redisClient *c) {
         val = dictGetEntryVal(de);
         if (!server.vm_enabled || (key->storage == REDIS_VM_MEMORY ||
                                    key->storage == REDIS_VM_SWAPPING)) {
+            char *strenc;
+            char buf[128];
+
+            if (val->encoding < (sizeof(strencoding)/sizeof(char*))) {
+                strenc = strencoding[val->encoding];
+            } else {
+                snprintf(buf,64,"unknown encoding %d\n", val->encoding);
+                strenc = buf;
+            }
             addReplySds(c,sdscatprintf(sdsempty(),
                 "+Key at:%p refcount:%d, value at:%p refcount:%d "
-                "encoding:%d serializedlength:%lld\r\n",
+                "encoding:%s serializedlength:%lld\r\n",
                 (void*)key, key->refcount, (void*)val, val->refcount,
-                val->encoding, (long long) rdbSavedObjectLen(val,NULL)));
+                strenc, (long long) rdbSavedObjectLen(val,NULL)));
         } else {
             addReplySds(c,sdscatprintf(sdsempty(),
                 "+Key at:%p refcount:%d, value swapped at: page %llu "