]> git.saurik.com Git - redis.git/blobdiff - redis.c
zipmap to hash conversion in HSET
[redis.git] / redis.c
diff --git a/redis.c b/redis.c
index b63121a0f7bdf29b5d35a2a47e8df6548c45d02d..b45b86184a51b46afed6a4b19d6803ceff7359c7 100644 (file)
--- a/redis.c
+++ b/redis.c
 #define REDIS_ENCODING_ZIPMAP 2 /* Encoded as zipmap */
 #define REDIS_ENCODING_HT 3     /* Encoded as an hash table */
 
 #define REDIS_ENCODING_ZIPMAP 2 /* Encoded as zipmap */
 #define REDIS_ENCODING_HT 3     /* Encoded as an hash table */
 
+static char* strencoding[] = {
+    "raw", "int", "zipmap", "hashtable"
+};
+
 /* Object types only used for dumping to disk */
 #define REDIS_EXPIRETIME 253
 #define REDIS_SELECTDB 254
 /* Object types only used for dumping to disk */
 #define REDIS_EXPIRETIME 253
 #define REDIS_SELECTDB 254
@@ -590,6 +594,7 @@ static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mas
 static struct redisCommand *lookupCommand(char *name);
 static void call(redisClient *c, struct redisCommand *cmd);
 static void resetClient(redisClient *c);
 static struct redisCommand *lookupCommand(char *name);
 static void call(redisClient *c, struct redisCommand *cmd);
 static void resetClient(redisClient *c);
+static void convertToRealHash(robj *o);
 
 static void authCommand(redisClient *c);
 static void pingCommand(redisClient *c);
 
 static void authCommand(redisClient *c);
 static void pingCommand(redisClient *c);
@@ -673,8 +678,13 @@ static void brpopCommand(redisClient *c);
 static void appendCommand(redisClient *c);
 static void substrCommand(redisClient *c);
 static void zrankCommand(redisClient *c);
 static void appendCommand(redisClient *c);
 static void substrCommand(redisClient *c);
 static void zrankCommand(redisClient *c);
+static void zrevrankCommand(redisClient *c);
 static void hsetCommand(redisClient *c);
 static void hgetCommand(redisClient *c);
 static void hsetCommand(redisClient *c);
 static void hgetCommand(redisClient *c);
+static void hdelCommand(redisClient *c);
+static void zremrangebyrankCommand(redisClient *c);
+static void zunionCommand(redisClient *c);
+static void zinterCommand(redisClient *c);
 
 /*================================= Globals ================================= */
 
 
 /*================================= Globals ================================= */
 
@@ -722,6 +732,9 @@ static struct redisCommand cmdTable[] = {
     {"zincrby",zincrbyCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,1,1,1},
     {"zrem",zremCommand,3,REDIS_CMD_BULK,1,1,1},
     {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE,1,1,1},
     {"zincrby",zincrbyCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,1,1,1},
     {"zrem",zremCommand,3,REDIS_CMD_BULK,1,1,1},
     {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE,1,1,1},
+    {"zremrangebyrank",zremrangebyrankCommand,4,REDIS_CMD_INLINE,1,1,1},
+    {"zunion",zunionCommand,-4,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,0,0,0},
+    {"zinter",zinterCommand,-4,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,0,0,0},
     {"zrange",zrangeCommand,-4,REDIS_CMD_INLINE,1,1,1},
     {"zrangebyscore",zrangebyscoreCommand,-4,REDIS_CMD_INLINE,1,1,1},
     {"zcount",zcountCommand,4,REDIS_CMD_INLINE,1,1,1},
     {"zrange",zrangeCommand,-4,REDIS_CMD_INLINE,1,1,1},
     {"zrangebyscore",zrangebyscoreCommand,-4,REDIS_CMD_INLINE,1,1,1},
     {"zcount",zcountCommand,4,REDIS_CMD_INLINE,1,1,1},
@@ -729,8 +742,10 @@ static struct redisCommand cmdTable[] = {
     {"zcard",zcardCommand,2,REDIS_CMD_INLINE,1,1,1},
     {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,1,1,1},
     {"zrank",zrankCommand,3,REDIS_CMD_INLINE,1,1,1},
     {"zcard",zcardCommand,2,REDIS_CMD_INLINE,1,1,1},
     {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,1,1,1},
     {"zrank",zrankCommand,3,REDIS_CMD_INLINE,1,1,1},
+    {"zrevrank",zrevrankCommand,3,REDIS_CMD_INLINE,1,1,1},
     {"hset",hsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,1,1,1},
     {"hget",hgetCommand,3,REDIS_CMD_BULK,1,1,1},
     {"hset",hsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,1,1,1},
     {"hget",hgetCommand,3,REDIS_CMD_BULK,1,1,1},
+    {"hdel",hdelCommand,3,REDIS_CMD_BULK,1,1,1},
     {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,1,1,1},
     {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,1,1,1},
     {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,1,1,1},
     {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,1,1,1},
     {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,1,1,1},
     {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,1,1,1},
@@ -3442,6 +3457,7 @@ static int rdbLoadDoubleValue(FILE *fp, double *val) {
 static robj *rdbLoadObject(int type, FILE *fp) {
     robj *o;
 
 static robj *rdbLoadObject(int type, FILE *fp) {
     robj *o;
 
+    redisLog(REDIS_DEBUG,"LOADING OBJECT %d (at %d)\n",type,ftell(fp));
     if (type == REDIS_STRING) {
         /* Read string value */
         if ((o = rdbLoadStringObject(fp)) == NULL) return NULL;
     if (type == REDIS_STRING) {
         /* Read string value */
         if ((o = rdbLoadStringObject(fp)) == NULL) return NULL;
@@ -3470,7 +3486,7 @@ static robj *rdbLoadObject(int type, FILE *fp) {
         }
     } else if (type == REDIS_ZSET) {
         /* Read list/set value */
         }
     } else if (type == REDIS_ZSET) {
         /* Read list/set value */
-        uint32_t zsetlen;
+        size_t zsetlen;
         zset *zs;
 
         if ((zsetlen = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
         zset *zs;
 
         if ((zsetlen = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
@@ -3488,6 +3504,46 @@ static robj *rdbLoadObject(int type, FILE *fp) {
             zslInsert(zs->zsl,*score,ele);
             incrRefCount(ele); /* added to skiplist */
         }
             zslInsert(zs->zsl,*score,ele);
             incrRefCount(ele); /* added to skiplist */
         }
+    } else if (type == REDIS_HASH) {
+        size_t hashlen;
+
+        if ((hashlen = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
+        o = createHashObject();
+        /* Too many entries? Use an hash table. */
+        if (hashlen > server.hash_max_zipmap_entries)
+            convertToRealHash(o);
+        /* Load every key/value, then set it into the zipmap or hash
+         * table, as needed. */
+        while(hashlen--) {
+            robj *key, *val;
+
+            if ((key = rdbLoadStringObject(fp)) == NULL) return NULL;
+            if ((val = rdbLoadStringObject(fp)) == NULL) return NULL;
+            /* If we are using a zipmap and there are too big values
+             * the object is converted to real hash table encoding. */
+            if (o->encoding != REDIS_ENCODING_HT &&
+               (sdslen(key->ptr) > server.hash_max_zipmap_value ||
+                sdslen(val->ptr) > server.hash_max_zipmap_value))
+            {
+                    convertToRealHash(o);
+            }
+
+            if (o->encoding == REDIS_ENCODING_ZIPMAP) {
+                unsigned char *zm = o->ptr;
+
+                zm = zipmapSet(zm,key->ptr,sdslen(key->ptr),
+                                  val->ptr,sdslen(val->ptr),NULL);
+                o->ptr = zm;
+                decrRefCount(key);
+                decrRefCount(val);
+            } else {
+                tryObjectEncoding(key);
+                tryObjectEncoding(val);
+                dictAdd((dict*)o->ptr,key,val);
+                incrRefCount(key);
+                incrRefCount(val);
+            }
+        }
     } else {
         redisAssert(0 != 0);
     }
     } else {
         redisAssert(0 != 0);
     }
@@ -3879,7 +3935,7 @@ static void substrCommand(redisClient *c) {
             rangelen = (end-start)+1;
 
             /* Return the result */
             rangelen = (end-start)+1;
 
             /* Return the result */
-            addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",rangelen));
+            addReplySds(c,sdscatprintf(sdsempty(),"$%zu\r\n",rangelen));
             range = sdsnewlen((char*)o->ptr+start,rangelen);
             addReplySds(c,range);
             addReply(c,shared.crlf);
             range = sdsnewlen((char*)o->ptr+start,rangelen);
             addReplySds(c,range);
             addReply(c,shared.crlf);
@@ -3995,7 +4051,8 @@ static void typeCommand(redisClient *c) {
         case REDIS_LIST: type = "+list"; break;
         case REDIS_SET: type = "+set"; break;
         case REDIS_ZSET: type = "+zset"; break;
         case REDIS_LIST: type = "+list"; break;
         case REDIS_SET: type = "+set"; break;
         case REDIS_ZSET: type = "+zset"; break;
-        default: type = "unknown"; break;
+        case REDIS_HASH: type = "+hash"; break;
+        default: type = "+unknown"; break;
         }
     }
     addReplySds(c,sdsnew(type));
         }
     }
     addReplySds(c,sdsnew(type));
@@ -4791,6 +4848,7 @@ static void sinterstoreCommand(redisClient *c) {
 
 #define REDIS_OP_UNION 0
 #define REDIS_OP_DIFF 1
 
 #define REDIS_OP_UNION 0
 #define REDIS_OP_DIFF 1
+#define REDIS_OP_INTER 2
 
 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
     dict **dv = zmalloc(sizeof(dict*)*setsnum);
 
 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
     dict **dv = zmalloc(sizeof(dict*)*setsnum);
@@ -5034,6 +5092,31 @@ static void zslInsert(zskiplist *zsl, double score, robj *obj) {
     zsl->length++;
 }
 
     zsl->length++;
 }
 
+/* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
+void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
+    int i;
+    for (i = 0; i < zsl->level; i++) {
+        if (update[i]->forward[i] == x) {
+            if (i > 0) {
+                update[i]->span[i-1] += x->span[i-1] - 1;
+            }
+            update[i]->forward[i] = x->forward[i];
+        } else {
+            /* invariant: i > 0, because update[0]->forward[0]
+             * is always equal to x */
+            update[i]->span[i-1] -= 1;
+        }
+    }
+    if (x->forward[0]) {
+        x->forward[0]->backward = x->backward;
+    } else {
+        zsl->tail = x->backward;
+    }
+    while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
+        zsl->level--;
+    zsl->length--;
+}
+
 /* Delete an element with matching score/object from the skiplist. */
 static int zslDelete(zskiplist *zsl, double score, robj *obj) {
     zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
 /* Delete an element with matching score/object from the skiplist. */
 static int zslDelete(zskiplist *zsl, double score, robj *obj) {
     zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
@@ -5052,27 +5135,8 @@ static int zslDelete(zskiplist *zsl, double score, robj *obj) {
      * is to find the element with both the right score and object. */
     x = x->forward[0];
     if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
      * is to find the element with both the right score and object. */
     x = x->forward[0];
     if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
-        for (i = 0; i < zsl->level; i++) {
-            if (update[i]->forward[i] == x) {
-                if (i > 0) {
-                    update[i]->span[i-1] += x->span[i-1] - 1;
-                }
-                update[i]->forward[i] = x->forward[i];
-            } else {
-                /* invariant: i > 0, because update[0]->forward[0]
-                 * is always equal to x */
-                update[i]->span[i-1] -= 1;
-            }
-        }
-        if (x->forward[0]) {
-            x->forward[0]->backward = x->backward;
-        } else {
-            zsl->tail = x->backward;
-        }
+        zslDeleteNode(zsl, x, update);
         zslFreeNode(x);
         zslFreeNode(x);
-        while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
-            zsl->level--;
-        zsl->length--;
         return 1;
     } else {
         return 0; /* not found */
         return 1;
     } else {
         return 0; /* not found */
@@ -5084,7 +5148,7 @@ static int zslDelete(zskiplist *zsl, double score, robj *obj) {
  * Min and mx are inclusive, so a score >= min || score <= max is deleted.
  * Note that this function takes the reference to the hash table view of the
  * sorted set, in order to remove the elements from the hash table too. */
  * Min and mx are inclusive, so a score >= min || score <= max is deleted.
  * Note that this function takes the reference to the hash table view of the
  * sorted set, in order to remove the elements from the hash table too. */
-static unsigned long zslDeleteRange(zskiplist *zsl, double min, double max, dict *dict) {
+static unsigned long zslDeleteRangeByScore(zskiplist *zsl, double min, double max, dict *dict) {
     zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
     unsigned long removed = 0;
     int i;
     zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
     unsigned long removed = 0;
     int i;
@@ -5099,35 +5163,44 @@ static unsigned long zslDeleteRange(zskiplist *zsl, double min, double max, dict
      * is to find the element with both the right score and object. */
     x = x->forward[0];
     while (x && x->score <= max) {
      * is to find the element with both the right score and object. */
     x = x->forward[0];
     while (x && x->score <= max) {
-        zskiplistNode *next;
+        zskiplistNode *next = x->forward[0];
+        zslDeleteNode(zsl, x, update);
+        dictDelete(dict,x->obj);
+        zslFreeNode(x);
+        removed++;
+        x = next;
+    }
+    return removed; /* not found */
+}
 
 
-        for (i = 0; i < zsl->level; i++) {
-            if (update[i]->forward[i] == x) {
-                if (i > 0) {
-                    update[i]->span[i-1] += x->span[i-1] - 1;
-                }
-                update[i]->forward[i] = x->forward[i];
-            } else {
-                /* invariant: i > 0, because update[0]->forward[0]
-                 * is always equal to x */
-                update[i]->span[i-1] -= 1;
-            }
-        }
-        if (x->forward[0]) {
-            x->forward[0]->backward = x->backward;
-        } else {
-            zsl->tail = x->backward;
+/* Delete all the elements with rank between start and end from the skiplist.
+ * Start and end are inclusive. Note that start and end need to be 1-based */
+static unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict) {
+    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
+    unsigned long traversed = 0, removed = 0;
+    int i;
+
+    x = zsl->header;
+    for (i = zsl->level-1; i >= 0; i--) {
+        while (x->forward[i] && (traversed + (i > 0 ? x->span[i-1] : 1)) < start) {
+            traversed += i > 0 ? x->span[i-1] : 1;
+            x = x->forward[i];
         }
         }
-        next = x->forward[0];
+        update[i] = x;
+    }
+
+    traversed++;
+    x = x->forward[0];
+    while (x && traversed <= end) {
+        zskiplistNode *next = x->forward[0];
+        zslDeleteNode(zsl, x, update);
         dictDelete(dict,x->obj);
         zslFreeNode(x);
         dictDelete(dict,x->obj);
         zslFreeNode(x);
-        while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
-            zsl->level--;
-        zsl->length--;
         removed++;
         removed++;
+        traversed++;
         x = next;
     }
         x = next;
     }
-    return removed; /* not found */
+    return removed;
 }
 
 /* Find the first node having a score equal or greater than the specified one.
 }
 
 /* Find the first node having a score equal or greater than the specified one.
@@ -5342,13 +5415,219 @@ static void zremrangebyscoreCommand(redisClient *c) {
             return;
         }
         zs = zsetobj->ptr;
             return;
         }
         zs = zsetobj->ptr;
-        deleted = zslDeleteRange(zs->zsl,min,max,zs->dict);
+        deleted = zslDeleteRangeByScore(zs->zsl,min,max,zs->dict);
         if (htNeedsResize(zs->dict)) dictResize(zs->dict);
         server.dirty += deleted;
         addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",deleted));
     }
 }
 
         if (htNeedsResize(zs->dict)) dictResize(zs->dict);
         server.dirty += deleted;
         addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",deleted));
     }
 }
 
+static void zremrangebyrankCommand(redisClient *c) {
+    int start = atoi(c->argv[2]->ptr);
+    int end = atoi(c->argv[3]->ptr);
+    robj *zsetobj;
+    zset *zs;
+
+    zsetobj = lookupKeyWrite(c->db,c->argv[1]);
+    if (zsetobj == NULL) {
+        addReply(c,shared.czero);
+    } else {
+        if (zsetobj->type != REDIS_ZSET) {
+            addReply(c,shared.wrongtypeerr);
+            return;
+        }
+
+        zs = zsetobj->ptr;
+        int llen = zs->zsl->length;
+        long deleted;
+
+        /* convert negative indexes */
+        if (start < 0) start = llen+start;
+        if (end < 0) end = llen+end;
+        if (start < 0) start = 0;
+        if (end < 0) end = 0;
+
+        /* indexes sanity checks */
+        if (start > end || start >= llen) {
+            addReply(c,shared.czero);
+            return;
+        }
+        if (end >= llen) end = llen-1;
+
+        /* increment start and end because zsl*Rank functions
+         * use 1-based rank */
+        deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);
+        if (htNeedsResize(zs->dict)) dictResize(zs->dict);
+        server.dirty += deleted;
+        addReplyLong(c, deleted);
+    }
+}
+
+typedef struct {
+    dict *dict;
+    double weight;
+} zsetopsrc;
+
+static int qsortCompareZsetopsrcByCardinality(const void *s1, const void *s2) {
+    zsetopsrc *d1 = (void*) s1, *d2 = (void*) s2;
+    unsigned long size1, size2;
+    size1 = d1->dict ? dictSize(d1->dict) : 0;
+    size2 = d2->dict ? dictSize(d2->dict) : 0;
+    return size1 - size2;
+}
+
+static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
+    int i, j, zsetnum;
+    zsetopsrc *src;
+    robj *dstobj;
+    zset *dstzset;
+    dictIterator *di;
+    dictEntry *de;
+
+    /* expect zsetnum input keys to be given */
+    zsetnum = atoi(c->argv[2]->ptr);
+    if (zsetnum < 1) {
+        addReplySds(c,sdsnew("-ERR at least 1 input key is needed for ZUNION/ZINTER\r\n"));
+        return;
+    }
+
+    /* test if the expected number of keys would overflow */
+    if (3+zsetnum > c->argc) {
+        addReply(c,shared.syntaxerr);
+        return;
+    }
+
+    /* read keys to be used for input */
+    src = zmalloc(sizeof(zsetopsrc) * zsetnum);
+    for (i = 0, j = 3; i < zsetnum; i++, j++) {
+        robj *zsetobj = lookupKeyWrite(c->db,c->argv[j]);
+        if (!zsetobj) {
+            src[i].dict = NULL;
+        } else {
+            if (zsetobj->type != REDIS_ZSET) {
+                zfree(src);
+                addReply(c,shared.wrongtypeerr);
+                return;
+            }
+            src[i].dict = ((zset*)zsetobj->ptr)->dict;
+        }
+
+        /* default all weights to 1 */
+        src[i].weight = 1.0;
+    }
+
+    /* parse optional extra arguments */
+    if (j < c->argc) {
+        int remaining = c->argc-j;
+
+        while (remaining) {
+            if (!strcasecmp(c->argv[j]->ptr,"weights")) {
+                j++; remaining--;
+                if (remaining < zsetnum) {
+                    zfree(src);
+                    addReplySds(c,sdsnew("-ERR not enough weights for ZUNION/ZINTER\r\n"));
+                    return;
+                }
+                for (i = 0; i < zsetnum; i++, j++, remaining--) {
+                    src[i].weight = strtod(c->argv[j]->ptr, NULL);
+                }
+            } else {
+                zfree(src);
+                addReply(c,shared.syntaxerr);
+                return;
+            }
+        }
+    }
+
+    dstobj = createZsetObject();
+    dstzset = dstobj->ptr;
+
+    if (op == REDIS_OP_INTER) {
+        /* sort sets from the smallest to largest, this will improve our
+         * algorithm's performance */
+        qsort(src,zsetnum,sizeof(zsetopsrc), qsortCompareZsetopsrcByCardinality);
+
+        /* skip going over all entries if the smallest zset is NULL or empty */
+        if (src[0].dict && dictSize(src[0].dict) > 0) {
+            /* precondition: as src[0].dict is non-empty and the zsets are ordered
+             * from small to large, all src[i > 0].dict are non-empty too */
+            di = dictGetIterator(src[0].dict);
+            while((de = dictNext(di)) != NULL) {
+                double *score = zmalloc(sizeof(double));
+                *score = 0.0;
+
+                for (j = 0; j < zsetnum; j++) {
+                    dictEntry *other = (j == 0) ? de : dictFind(src[j].dict,dictGetEntryKey(de));
+                    if (other) {
+                        *score = *score + src[j].weight * (*(double*)dictGetEntryVal(other));
+                    } else {
+                        break;
+                    }
+                }
+
+                /* skip entry when not present in every source dict */
+                if (j != zsetnum) {
+                    zfree(score);
+                } else {
+                    robj *o = dictGetEntryKey(de);
+                    dictAdd(dstzset->dict,o,score);
+                    incrRefCount(o); /* added to dictionary */
+                    zslInsert(dstzset->zsl,*score,o);
+                    incrRefCount(o); /* added to skiplist */
+                }
+            }
+            dictReleaseIterator(di);
+        }
+    } else if (op == REDIS_OP_UNION) {
+        for (i = 0; i < zsetnum; i++) {
+            if (!src[i].dict) continue;
+
+            di = dictGetIterator(src[i].dict);
+            while((de = dictNext(di)) != NULL) {
+                /* skip key when already processed */
+                if (dictFind(dstzset->dict,dictGetEntryKey(de)) != NULL) continue;
+
+                double *score = zmalloc(sizeof(double));
+                *score = 0.0;
+                for (j = 0; j < zsetnum; j++) {
+                    if (!src[j].dict) continue;
+
+                    dictEntry *other = (i == j) ? de : dictFind(src[j].dict,dictGetEntryKey(de));
+                    if (other) {
+                        *score = *score + src[j].weight * (*(double*)dictGetEntryVal(other));
+                    }
+                }
+
+                robj *o = dictGetEntryKey(de);
+                dictAdd(dstzset->dict,o,score);
+                incrRefCount(o); /* added to dictionary */
+                zslInsert(dstzset->zsl,*score,o);
+                incrRefCount(o); /* added to skiplist */
+            }
+            dictReleaseIterator(di);
+        }
+    } else {
+        /* unknown operator */
+        redisAssert(op == REDIS_OP_INTER || op == REDIS_OP_UNION);
+    }
+
+    deleteKey(c->db,dstkey);
+    dictAdd(c->db->dict,dstkey,dstobj);
+    incrRefCount(dstkey);
+
+    addReplyLong(c, dstzset->zsl->length);
+    server.dirty++;
+    zfree(src);
+}
+
+static void zunionCommand(redisClient *c) {
+    zunionInterGenericCommand(c,c->argv[1], REDIS_OP_UNION);
+}
+
+static void zinterCommand(redisClient *c) {
+    zunionInterGenericCommand(c,c->argv[1], REDIS_OP_INTER);
+}
+
 static void zrangeGenericCommand(redisClient *c, int reverse) {
     robj *o;
     int start = atoi(c->argv[2]->ptr);
 static void zrangeGenericCommand(redisClient *c, int reverse) {
     robj *o;
     int start = atoi(c->argv[2]->ptr);
@@ -5395,9 +5674,9 @@ static void zrangeGenericCommand(redisClient *c, int reverse) {
             /* check if starting point is trivial, before searching
              * the element in log(N) time */
             if (reverse) {
             /* check if starting point is trivial, before searching
              * the element in log(N) time */
             if (reverse) {
-                ln = start == 0 ? zsl->tail : zslGetElementByRank(zsl, llen - start);
+                ln = start == 0 ? zsl->tail : zslGetElementByRank(zsl, llen-start);
             } else {
             } else {
-                ln = start == 0 ? zsl->header->forward[0] : zslGetElementByRank(zsl, start + 1);
+                ln = start == 0 ? zsl->header->forward[0] : zslGetElementByRank(zsl, start+1);
             }
 
             /* Return the result in form of a multi-bulk reply */
             }
 
             /* Return the result in form of a multi-bulk reply */
@@ -5595,7 +5874,7 @@ static void zscoreCommand(redisClient *c) {
     }
 }
 
     }
 }
 
-static void zrankCommand(redisClient *c) {
+static void zrankGenericCommand(redisClient *c, int reverse) {
     robj *o;
     o = lookupKeyRead(c->db,c->argv[1]);
     if (o == NULL) {
     robj *o;
     o = lookupKeyRead(c->db,c->argv[1]);
     if (o == NULL) {
@@ -5619,13 +5898,25 @@ static void zrankCommand(redisClient *c) {
         double *score = dictGetEntryVal(de);
         rank = zslGetRank(zsl, *score, c->argv[2]);
         if (rank) {
         double *score = dictGetEntryVal(de);
         rank = zslGetRank(zsl, *score, c->argv[2]);
         if (rank) {
-            addReplyLong(c, rank-1);
+            if (reverse) {
+                addReplyLong(c, zsl->length - rank);
+            } else {
+                addReplyLong(c, rank-1);
+            }
         } else {
             addReply(c,shared.nullbulk);
         }
     }
 }
 
         } else {
             addReply(c,shared.nullbulk);
         }
     }
 }
 
+static void zrankCommand(redisClient *c) {
+    zrankGenericCommand(c, 0);
+}
+
+static void zrevrankCommand(redisClient *c) {
+    zrankGenericCommand(c, 1);
+}
+
 /* =================================== Hashes =============================== */
 static void hsetCommand(redisClient *c) {
     int update = 0;
 /* =================================== Hashes =============================== */
 static void hsetCommand(redisClient *c) {
     int update = 0;
@@ -5641,6 +5932,20 @@ static void hsetCommand(redisClient *c) {
             return;
         }
     }
             return;
         }
     }
+    /* We want to convert the zipmap into an hash table right now if the
+     * entry to be added is too big. Note that we check if the object
+     * is integer encoded before to try fetching the length in the test below.
+     * This is because integers are small, but currently stringObjectLen()
+     * performs a slow conversion: not worth it. */
+    if (o->encoding == REDIS_ENCODING_ZIPMAP &&
+        ((c->argv[2]->encoding == REDIS_ENCODING_RAW &&
+          sdslen(c->argv[2]->ptr) > server.hash_max_zipmap_value) ||
+         (c->argv[3]->encoding == REDIS_ENCODING_RAW &&
+          sdslen(c->argv[3]->ptr) > server.hash_max_zipmap_value)))
+    {
+        convertToRealHash(o);
+    }
+
     if (o->encoding == REDIS_ENCODING_ZIPMAP) {
         unsigned char *zm = o->ptr;
         robj *valobj = getDecodedObject(c->argv[3]);
     if (o->encoding == REDIS_ENCODING_ZIPMAP) {
         unsigned char *zm = o->ptr;
         robj *valobj = getDecodedObject(c->argv[3]);
@@ -5649,7 +5954,16 @@ static void hsetCommand(redisClient *c) {
             valobj->ptr,sdslen(valobj->ptr),&update);
         decrRefCount(valobj);
         o->ptr = zm;
             valobj->ptr,sdslen(valobj->ptr),&update);
         decrRefCount(valobj);
         o->ptr = zm;
+
+        /* And here there is the second check for hash conversion...
+         * we want to do it only if the operation was not just an update as
+         * zipmapLen() is O(N). */
+        if (!update && zipmapLen(zm) > server.hash_max_zipmap_entries)
+            convertToRealHash(o);
     } else {
     } else {
+        tryObjectEncoding(c->argv[2]);
+        /* note that c->argv[3] is already encoded, as the latest arg
+         * of a bulk command is always integer encoded if possible. */
         if (dictAdd(o->ptr,c->argv[2],c->argv[3]) == DICT_OK) {
             incrRefCount(c->argv[2]);
         } else {
         if (dictAdd(o->ptr,c->argv[2],c->argv[3]) == DICT_OK) {
             incrRefCount(c->argv[2]);
         } else {
@@ -5668,6 +5982,11 @@ static void hgetCommand(redisClient *c) {
         addReply(c,shared.nullbulk);
         return;
     } else {
         addReply(c,shared.nullbulk);
         return;
     } else {
+        if (o->type != REDIS_HASH) {
+            addReply(c,shared.wrongtypeerr);
+            return;
+        }
+
         if (o->encoding == REDIS_ENCODING_ZIPMAP) {
             unsigned char *zm = o->ptr;
             unsigned char *val;
         if (o->encoding == REDIS_ENCODING_ZIPMAP) {
             unsigned char *zm = o->ptr;
             unsigned char *val;
@@ -5699,6 +6018,52 @@ static void hgetCommand(redisClient *c) {
     }
 }
 
     }
 }
 
+static void hdelCommand(redisClient *c) {
+    robj *o = lookupKeyRead(c->db,c->argv[1]);
+
+    if (o == NULL) {
+        addReply(c,shared.czero);
+        return;
+    } else {
+        int deleted = 0;
+
+        if (o->type != REDIS_HASH) {
+            addReply(c,shared.wrongtypeerr);
+            return;
+        }
+
+        if (o->encoding == REDIS_ENCODING_ZIPMAP) {
+            o->ptr = zipmapDel((unsigned char*) o->ptr,
+                (unsigned char*) c->argv[2]->ptr,
+                sdslen(c->argv[2]->ptr), &deleted);
+        } else {
+            deleted = dictDelete((dict*)o->ptr,c->argv[2]) == DICT_OK;
+        }
+        addReply(c,deleted ? shared.cone : shared.czero);
+    }
+}
+
+static void convertToRealHash(robj *o) {
+    unsigned char *key, *val, *p, *zm = o->ptr;
+    unsigned int klen, vlen;
+    dict *dict = dictCreate(&hashDictType,NULL);
+
+    assert(o->type == REDIS_HASH && o->encoding != REDIS_ENCODING_HT);
+    p = zipmapRewind(zm);
+    while((p = zipmapNext(p,&key,&klen,&val,&vlen)) != NULL) {
+        robj *keyobj, *valobj;
+
+        keyobj = createStringObject((char*)key,klen);
+        valobj = createStringObject((char*)val,vlen);
+        tryObjectEncoding(keyobj);
+        tryObjectEncoding(valobj);
+        dictAdd(dict,keyobj,valobj);
+    }
+    o->encoding = REDIS_ENCODING_HT;
+    o->ptr = dict;
+    zfree(zm);
+}
+
 /* ========================= Non type-specific commands  ==================== */
 
 static void flushdbCommand(redisClient *c) {
 /* ========================= Non type-specific commands  ==================== */
 
 static void flushdbCommand(redisClient *c) {
@@ -6107,6 +6472,9 @@ static sds genRedisInfoString(void) {
     time_t uptime = time(NULL)-server.stat_starttime;
     int j;
     char hmem[64];
     time_t uptime = time(NULL)-server.stat_starttime;
     int j;
     char hmem[64];
+
+    server.hash_max_zipmap_entries = REDIS_HASH_MAX_ZIPMAP_ENTRIES;
+    server.hash_max_zipmap_value = REDIS_HASH_MAX_ZIPMAP_VALUE;
   
     bytesToHuman(hmem,zmalloc_used_memory());
     info = sdscatprintf(sdsempty(),
   
     bytesToHuman(hmem,zmalloc_used_memory());
     info = sdscatprintf(sdsempty(),
@@ -6127,6 +6495,8 @@ static sds genRedisInfoString(void) {
         "bgrewriteaof_in_progress:%d\r\n"
         "total_connections_received:%lld\r\n"
         "total_commands_processed:%lld\r\n"
         "bgrewriteaof_in_progress:%d\r\n"
         "total_connections_received:%lld\r\n"
         "total_commands_processed:%lld\r\n"
+        "hash_max_zipmap_entries:%ld\r\n"
+        "hash_max_zipmap_value:%ld\r\n"
         "vm_enabled:%d\r\n"
         "role:%s\r\n"
         ,REDIS_VERSION,
         "vm_enabled:%d\r\n"
         "role:%s\r\n"
         ,REDIS_VERSION,
@@ -6146,6 +6516,8 @@ static sds genRedisInfoString(void) {
         server.bgrewritechildpid != -1,
         server.stat_numconnections,
         server.stat_numcommands,
         server.bgrewritechildpid != -1,
         server.stat_numconnections,
         server.stat_numcommands,
+        server.hash_max_zipmap_entries,
+        server.hash_max_zipmap_value,
         server.vm_enabled != 0,
         server.masterhost == NULL ? "master" : "slave"
     );
         server.vm_enabled != 0,
         server.masterhost == NULL ? "master" : "slave"
     );
@@ -6841,7 +7213,7 @@ static int syncWithMaster(void) {
     char buf[1024], tmpfile[256], authcmd[1024];
     long dumpsize;
     int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
     char buf[1024], tmpfile[256], authcmd[1024];
     long dumpsize;
     int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
-    int dfd;
+    int dfd, maxtries = 5;
 
     if (fd == -1) {
         redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
 
     if (fd == -1) {
         redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
@@ -6894,8 +7266,13 @@ static int syncWithMaster(void) {
     dumpsize = strtol(buf+1,NULL,10);
     redisLog(REDIS_NOTICE,"Receiving %ld bytes data dump from MASTER",dumpsize);
     /* Read the bulk write data on a temp file */
     dumpsize = strtol(buf+1,NULL,10);
     redisLog(REDIS_NOTICE,"Receiving %ld bytes data dump from MASTER",dumpsize);
     /* Read the bulk write data on a temp file */
-    snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
-    dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
+    while(maxtries--) {
+        snprintf(tmpfile,256,
+            "temp-%d.%ld.rdb",(int)time(NULL),(long int)getpid());
+        dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
+        if (dfd != -1) break;
+        sleep(1);
+    }
     if (dfd == -1) {
         close(fd);
         redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
     if (dfd == -1) {
         close(fd);
         redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
@@ -7235,7 +7612,7 @@ fmterr:
 }
 
 /* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
 }
 
 /* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
-static int fwriteBulk(FILE *fp, robj *obj) {
+static int fwriteBulkObject(FILE *fp, robj *obj) {
     char buf[128];
     int decrrc = 0;
 
     char buf[128];
     int decrrc = 0;
 
@@ -7260,6 +7637,18 @@ err:
     return 0;
 }
 
     return 0;
 }
 
+/* Write binary-safe string into a file in the bulkformat
+ * $<count>\r\n<payload>\r\n */
+static int fwriteBulkString(FILE *fp, char *s, unsigned long len) {
+    char buf[128];
+
+    snprintf(buf,sizeof(buf),"$%ld\r\n",(unsigned long)len);
+    if (fwrite(buf,strlen(buf),1,fp) == 0) return 0;
+    if (len && fwrite(s,len,1,fp) == 0) return 0;
+    if (fwrite("\r\n",2,1,fp) == 0) return 0;
+    return 1;
+}
+
 /* Write a double value in bulk format $<count>\r\n<payload>\r\n */
 static int fwriteBulkDouble(FILE *fp, double d) {
     char buf[128], dbuf[128];
 /* Write a double value in bulk format $<count>\r\n<payload>\r\n */
 static int fwriteBulkDouble(FILE *fp, double d) {
     char buf[128], dbuf[128];
@@ -7342,8 +7731,8 @@ static int rewriteAppendOnlyFile(char *filename) {
                 char cmd[]="*3\r\n$3\r\nSET\r\n";
                 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
                 /* Key and value */
                 char cmd[]="*3\r\n$3\r\nSET\r\n";
                 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
                 /* Key and value */
-                if (fwriteBulk(fp,key) == 0) goto werr;
-                if (fwriteBulk(fp,o) == 0) goto werr;
+                if (fwriteBulkObject(fp,key) == 0) goto werr;
+                if (fwriteBulkObject(fp,o) == 0) goto werr;
             } else if (o->type == REDIS_LIST) {
                 /* Emit the RPUSHes needed to rebuild the list */
                 list *list = o->ptr;
             } else if (o->type == REDIS_LIST) {
                 /* Emit the RPUSHes needed to rebuild the list */
                 list *list = o->ptr;
@@ -7356,8 +7745,8 @@ static int rewriteAppendOnlyFile(char *filename) {
                     robj *eleobj = listNodeValue(ln);
 
                     if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
                     robj *eleobj = listNodeValue(ln);
 
                     if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
-                    if (fwriteBulk(fp,key) == 0) goto werr;
-                    if (fwriteBulk(fp,eleobj) == 0) goto werr;
+                    if (fwriteBulkObject(fp,key) == 0) goto werr;
+                    if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
                 }
             } else if (o->type == REDIS_SET) {
                 /* Emit the SADDs needed to rebuild the set */
                 }
             } else if (o->type == REDIS_SET) {
                 /* Emit the SADDs needed to rebuild the set */
@@ -7370,8 +7759,8 @@ static int rewriteAppendOnlyFile(char *filename) {
                     robj *eleobj = dictGetEntryKey(de);
 
                     if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
                     robj *eleobj = dictGetEntryKey(de);
 
                     if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
-                    if (fwriteBulk(fp,key) == 0) goto werr;
-                    if (fwriteBulk(fp,eleobj) == 0) goto werr;
+                    if (fwriteBulkObject(fp,key) == 0) goto werr;
+                    if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
                 }
                 dictReleaseIterator(di);
             } else if (o->type == REDIS_ZSET) {
                 }
                 dictReleaseIterator(di);
             } else if (o->type == REDIS_ZSET) {
@@ -7386,11 +7775,43 @@ static int rewriteAppendOnlyFile(char *filename) {
                     double *score = dictGetEntryVal(de);
 
                     if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
                     double *score = dictGetEntryVal(de);
 
                     if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
-                    if (fwriteBulk(fp,key) == 0) goto werr;
+                    if (fwriteBulkObject(fp,key) == 0) goto werr;
                     if (fwriteBulkDouble(fp,*score) == 0) goto werr;
                     if (fwriteBulkDouble(fp,*score) == 0) goto werr;
-                    if (fwriteBulk(fp,eleobj) == 0) goto werr;
+                    if (fwriteBulkObject(fp,eleobj) == 0) goto werr;
                 }
                 dictReleaseIterator(di);
                 }
                 dictReleaseIterator(di);
+            } else if (o->type == REDIS_HASH) {
+                char cmd[]="*4\r\n$4\r\nHSET\r\n";
+
+                /* Emit the HSETs needed to rebuild the hash */
+                if (o->encoding == REDIS_ENCODING_ZIPMAP) {
+                    unsigned char *p = zipmapRewind(o->ptr);
+                    unsigned char *field, *val;
+                    unsigned int flen, vlen;
+
+                    while((p = zipmapNext(p,&field,&flen,&val,&vlen)) != NULL) {
+                        if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
+                        if (fwriteBulkObject(fp,key) == 0) goto werr;
+                        if (fwriteBulkString(fp,(char*)field,flen) == -1)
+                            return -1;
+                        if (fwriteBulkString(fp,(char*)val,vlen) == -1)
+                            return -1;
+                    }
+                } else {
+                    dictIterator *di = dictGetIterator(o->ptr);
+                    dictEntry *de;
+
+                    while((de = dictNext(di)) != NULL) {
+                        robj *field = dictGetEntryKey(de);
+                        robj *val = dictGetEntryVal(de);
+
+                        if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
+                        if (fwriteBulkObject(fp,key) == 0) goto werr;
+                        if (fwriteBulkObject(fp,field) == -1) return -1;
+                        if (fwriteBulkObject(fp,val) == -1) return -1;
+                    }
+                    dictReleaseIterator(di);
+                }
             } else {
                 redisAssert(0 != 0);
             }
             } else {
                 redisAssert(0 != 0);
             }
@@ -7400,7 +7821,7 @@ static int rewriteAppendOnlyFile(char *filename) {
                 /* If this key is already expired skip it */
                 if (expiretime < now) continue;
                 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
                 /* If this key is already expired skip it */
                 if (expiretime < now) continue;
                 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
-                if (fwriteBulk(fp,key) == 0) goto werr;
+                if (fwriteBulkObject(fp,key) == 0) goto werr;
                 if (fwriteBulkLong(fp,expiretime) == 0) goto werr;
             }
             if (swapped) decrRefCount(o);
                 if (fwriteBulkLong(fp,expiretime) == 0) goto werr;
             }
             if (swapped) decrRefCount(o);
@@ -8569,11 +8990,20 @@ static void debugCommand(redisClient *c) {
         val = dictGetEntryVal(de);
         if (!server.vm_enabled || (key->storage == REDIS_VM_MEMORY ||
                                    key->storage == REDIS_VM_SWAPPING)) {
         val = dictGetEntryVal(de);
         if (!server.vm_enabled || (key->storage == REDIS_VM_MEMORY ||
                                    key->storage == REDIS_VM_SWAPPING)) {
+            char *strenc;
+            char buf[128];
+
+            if (val->encoding < (sizeof(strencoding)/sizeof(char*))) {
+                strenc = strencoding[val->encoding];
+            } else {
+                snprintf(buf,64,"unknown encoding %d\n", val->encoding);
+                strenc = buf;
+            }
             addReplySds(c,sdscatprintf(sdsempty(),
                 "+Key at:%p refcount:%d, value at:%p refcount:%d "
             addReplySds(c,sdscatprintf(sdsempty(),
                 "+Key at:%p refcount:%d, value at:%p refcount:%d "
-                "encoding:%d serializedlength:%lld\r\n",
+                "encoding:%s serializedlength:%lld\r\n",
                 (void*)key, key->refcount, (void*)val, val->refcount,
                 (void*)key, key->refcount, (void*)val, val->refcount,
-                val->encoding, (long long) rdbSavedObjectLen(val,NULL)));
+                strenc, (long long) rdbSavedObjectLen(val,NULL)));
         } else {
             addReplySds(c,sdscatprintf(sdsempty(),
                 "+Key at:%p refcount:%d, value swapped at: page %llu "
         } else {
             addReplySds(c,sdscatprintf(sdsempty(),
                 "+Key at:%p refcount:%d, value swapped at: page %llu "