Merged zsetops branch from Pietern
authorantirez <antirez@gmail.com>
Tue, 9 Mar 2010 15:25:55 +0000 (16:25 +0100)
committerantirez <antirez@gmail.com>
Tue, 9 Mar 2010 15:25:55 +0000 (16:25 +0100)
1  2 
redis.c
test-redis.tcl

diff --combined redis.c
index 8658bc1979263919d8ab0b809d03db3c8f2226e8,cc64efb846a49d0ae19ff4c0702072e413d89167..d15efbbc7763595cd84404f622e18d89bf464b88
+++ b/redis.c
@@@ -590,7 -590,6 +590,7 @@@ static void readQueryFromClient(aeEvent
  static struct redisCommand *lookupCommand(char *name);
  static void call(redisClient *c, struct redisCommand *cmd);
  static void resetClient(redisClient *c);
 +static void convertToRealHash(robj *o);
  
  static void authCommand(redisClient *c);
  static void pingCommand(redisClient *c);
@@@ -674,10 -673,10 +674,12 @@@ static void brpopCommand(redisClient *c
  static void appendCommand(redisClient *c);
  static void substrCommand(redisClient *c);
  static void zrankCommand(redisClient *c);
 +static void zrevrankCommand(redisClient *c);
  static void hsetCommand(redisClient *c);
  static void hgetCommand(redisClient *c);
 +static void zremrangebyrankCommand(redisClient *c);
+ static void zunionCommand(redisClient *c);
+ static void zinterCommand(redisClient *c);
  
  /*================================= Globals ================================= */
  
@@@ -725,7 -724,8 +727,9 @@@ static struct redisCommand cmdTable[] 
      {"zincrby",zincrbyCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,1,1,1},
      {"zrem",zremCommand,3,REDIS_CMD_BULK,1,1,1},
      {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE,1,1,1},
 +    {"zremrangebyrank",zremrangebyrankCommand,4,REDIS_CMD_INLINE,1,1,1},
+     {"zunion",zunionCommand,-4,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,0,0,0},
+     {"zinter",zinterCommand,-4,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,0,0,0},
      {"zrange",zrangeCommand,-4,REDIS_CMD_INLINE,1,1,1},
      {"zrangebyscore",zrangebyscoreCommand,-4,REDIS_CMD_INLINE,1,1,1},
      {"zcount",zcountCommand,4,REDIS_CMD_INLINE,1,1,1},
      {"zcard",zcardCommand,2,REDIS_CMD_INLINE,1,1,1},
      {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,1,1,1},
      {"zrank",zrankCommand,3,REDIS_CMD_INLINE,1,1,1},
 +    {"zrevrank",zrevrankCommand,3,REDIS_CMD_INLINE,1,1,1},
      {"hset",hsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,1,1,1},
      {"hget",hgetCommand,3,REDIS_CMD_BULK,1,1,1},
      {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,1,1,1},
@@@ -2938,7 -2937,7 +2942,7 @@@ static int rdbSaveLen(FILE *fp, uint32_
  /* String objects in the form "2391" "-100" without any space and with a
   * range of values that can fit in an 8, 16 or 32 bit signed value can be
   * encoded as integers to save space */
 -static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
 +static int rdbTryIntegerEncoding(char *s, size_t len, unsigned char *enc) {
      long long value;
      char *endptr, buf[32];
  
  
      /* If the number converted back into a string is not identical
       * then it's not possible to encode the string as integer */
 -    if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
 +    if (strlen(buf) != len || memcmp(buf,s,len)) return 0;
  
      /* Finally check if it fits in our ranges */
      if (value >= -(1<<7) && value <= (1<<7)-1) {
      }
  }
  
 -static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
 -    unsigned int comprlen, outlen;
 +static int rdbSaveLzfStringObject(FILE *fp, unsigned char *s, size_t len) {
 +    size_t comprlen, outlen;
      unsigned char byte;
      void *out;
  
      /* We require at least four bytes compression for this to be worth it */
 -    outlen = sdslen(obj->ptr)-4;
 -    if (outlen <= 0) return 0;
 +    if (len <= 4) return 0;
 +    outlen = len-4;
      if ((out = zmalloc(outlen+1)) == NULL) return 0;
 -    comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
 +    comprlen = lzf_compress(s, len, out, outlen);
      if (comprlen == 0) {
          zfree(out);
          return 0;
      byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
      if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
      if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
 -    if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
 +    if (rdbSaveLen(fp,len) == -1) goto writeerr;
      if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
      zfree(out);
      return comprlen;
@@@ -3003,13 -3002,16 +3007,13 @@@ writeerr
  
  /* Save a string objet as [len][data] on disk. If the object is a string
   * representation of an integer value we try to safe it in a special form */
 -static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
 -    size_t len;
 +static int rdbSaveRawString(FILE *fp, unsigned char *s, size_t len) {
      int enclen;
  
 -    len = sdslen(obj->ptr);
 -
      /* Try integer encoding */
      if (len <= 11) {
          unsigned char buf[5];
 -        if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
 +        if ((enclen = rdbTryIntegerEncoding((char*)s,len,buf)) > 0) {
              if (fwrite(buf,enclen,1,fp) == 0) return -1;
              return 0;
          }
      if (server.rdbcompression && len > 20) {
          int retval;
  
 -        retval = rdbSaveLzfStringObject(fp,obj);
 +        retval = rdbSaveLzfStringObject(fp,s,len);
          if (retval == -1) return -1;
          if (retval > 0) return 0;
          /* retval == 0 means data can't be compressed, save the old way */
  
      /* Store verbatim */
      if (rdbSaveLen(fp,len) == -1) return -1;
 -    if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
 +    if (len && fwrite(s,len,1,fp) == 0) return -1;
      return 0;
  }
  
@@@ -3043,10 -3045,10 +3047,10 @@@ static int rdbSaveStringObject(FILE *fp
       * this in order to avoid bugs) */
      if (obj->encoding != REDIS_ENCODING_RAW) {
          obj = getDecodedObject(obj);
 -        retval = rdbSaveStringObjectRaw(fp,obj);
 +        retval = rdbSaveRawString(fp,obj->ptr,sdslen(obj->ptr));
          decrRefCount(obj);
      } else {
 -        retval = rdbSaveStringObjectRaw(fp,obj);
 +        retval = rdbSaveRawString(fp,obj->ptr,sdslen(obj->ptr));
      }
      return retval;
  }
@@@ -3124,33 -3126,6 +3128,33 @@@ static int rdbSaveObject(FILE *fp, rob
              if (rdbSaveDoubleValue(fp,*score) == -1) return -1;
          }
          dictReleaseIterator(di);
 +    } else if (o->type == REDIS_HASH) {
 +        /* Save a hash value */
 +        if (o->encoding == REDIS_ENCODING_ZIPMAP) {
 +            unsigned char *p = zipmapRewind(o->ptr);
 +            unsigned int count = zipmapLen(o->ptr);
 +            unsigned char *key, *val;
 +            unsigned int klen, vlen;
 +
 +            if (rdbSaveLen(fp,count) == -1) return -1;
 +            while((p = zipmapNext(p,&key,&klen,&val,&vlen)) != NULL) {
 +                if (rdbSaveRawString(fp,key,klen) == -1) return -1;
 +                if (rdbSaveRawString(fp,val,vlen) == -1) return -1;
 +            }
 +        } else {
 +            dictIterator *di = dictGetIterator(o->ptr);
 +            dictEntry *de;
 +
 +            if (rdbSaveLen(fp,dictSize((dict*)o->ptr)) == -1) return -1;
 +            while((de = dictNext(di)) != NULL) {
 +                robj *key = dictGetEntryKey(de);
 +                robj *val = dictGetEntryVal(de);
 +
 +                if (rdbSaveStringObject(fp,key) == -1) return -1;
 +                if (rdbSaveStringObject(fp,val) == -1) return -1;
 +            }
 +            dictReleaseIterator(di);
 +        }
      } else {
          redisAssert(0 != 0);
      }
@@@ -3475,7 -3450,7 +3479,7 @@@ static robj *rdbLoadObject(int type, FI
          }
      } else if (type == REDIS_ZSET) {
          /* Read list/set value */
 -        uint32_t zsetlen;
 +        size_t zsetlen;
          zset *zs;
  
          if ((zsetlen = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
              zslInsert(zs->zsl,*score,ele);
              incrRefCount(ele); /* added to skiplist */
          }
 +    } else if (type == REDIS_HASH) {
 +        size_t hashlen;
 +
 +        if ((hashlen = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
 +        o = createHashObject();
 +        /* Too many entries? Use an hash table. */
 +        if (hashlen > server.hash_max_zipmap_entries)
 +            convertToRealHash(o);
 +        /* Load every key/value, then set it into the zipmap or hash
 +         * table, as needed. */
 +        while(hashlen--) {
 +            robj *key, *val;
 +
 +            if ((key = rdbLoadStringObject(fp)) == NULL) return NULL;
 +            if ((val = rdbLoadStringObject(fp)) == NULL) return NULL;
 +            /* If we are using a zipmap and there are too big values
 +             * the object is converted to real hash table encoding. */
 +            if (o->encoding != REDIS_ENCODING_HT &&
 +               (sdslen(key->ptr) > server.hash_max_zipmap_value ||
 +                sdslen(val->ptr) > server.hash_max_zipmap_value))
 +            {
 +                    convertToRealHash(o);
 +            }
 +
 +            if (o->encoding == REDIS_ENCODING_ZIPMAP) {
 +                unsigned char *zm = o->ptr;
 +
 +                zm = zipmapSet(zm,key->ptr,sdslen(key->ptr),
 +                                  val->ptr,sdslen(val->ptr),NULL);
 +                o->ptr = zm;
 +                decrRefCount(key);
 +                decrRefCount(val);
 +            } else {
 +                tryObjectEncoding(key);
 +                tryObjectEncoding(val);
 +                dictAdd((dict*)o->ptr,key,val);
 +                incrRefCount(key);
 +                incrRefCount(val);
 +            }
 +        }
      } else {
          redisAssert(0 != 0);
      }
@@@ -4040,8 -3975,7 +4044,8 @@@ static void typeCommand(redisClient *c
          case REDIS_LIST: type = "+list"; break;
          case REDIS_SET: type = "+set"; break;
          case REDIS_ZSET: type = "+zset"; break;
 -        default: type = "unknown"; break;
 +        case REDIS_HASH: type = "+hash"; break;
 +        default: type = "+unknown"; break;
          }
      }
      addReplySds(c,sdsnew(type));
@@@ -4837,6 -4771,7 +4841,7 @@@ static void sinterstoreCommand(redisCli
  
  #define REDIS_OP_UNION 0
  #define REDIS_OP_DIFF 1
+ #define REDIS_OP_INTER 2
  
  static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
      dict **dv = zmalloc(sizeof(dict*)*setsnum);
@@@ -5080,31 -5015,6 +5085,31 @@@ static void zslInsert(zskiplist *zsl, d
      zsl->length++;
  }
  
 +/* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
 +void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
 +    int i;
 +    for (i = 0; i < zsl->level; i++) {
 +        if (update[i]->forward[i] == x) {
 +            if (i > 0) {
 +                update[i]->span[i-1] += x->span[i-1] - 1;
 +            }
 +            update[i]->forward[i] = x->forward[i];
 +        } else {
 +            /* invariant: i > 0, because update[0]->forward[0]
 +             * is always equal to x */
 +            update[i]->span[i-1] -= 1;
 +        }
 +    }
 +    if (x->forward[0]) {
 +        x->forward[0]->backward = x->backward;
 +    } else {
 +        zsl->tail = x->backward;
 +    }
 +    while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
 +        zsl->level--;
 +    zsl->length--;
 +}
 +
  /* Delete an element with matching score/object from the skiplist. */
  static int zslDelete(zskiplist *zsl, double score, robj *obj) {
      zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
       * is to find the element with both the right score and object. */
      x = x->forward[0];
      if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
 -        for (i = 0; i < zsl->level; i++) {
 -            if (update[i]->forward[i] == x) {
 -                if (i > 0) {
 -                    update[i]->span[i-1] += x->span[i-1] - 1;
 -                }
 -                update[i]->forward[i] = x->forward[i];
 -            } else {
 -                /* invariant: i > 0, because update[0]->forward[0]
 -                 * is always equal to x */
 -                update[i]->span[i-1] -= 1;
 -            }
 -        }
 -        if (x->forward[0]) {
 -            x->forward[0]->backward = x->backward;
 -        } else {
 -            zsl->tail = x->backward;
 -        }
 +        zslDeleteNode(zsl, x, update);
          zslFreeNode(x);
 -        while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
 -            zsl->level--;
 -        zsl->length--;
          return 1;
      } else {
          return 0; /* not found */
   * Min and mx are inclusive, so a score >= min || score <= max is deleted.
   * Note that this function takes the reference to the hash table view of the
   * sorted set, in order to remove the elements from the hash table too. */
 -static unsigned long zslDeleteRange(zskiplist *zsl, double min, double max, dict *dict) {
 +static unsigned long zslDeleteRangeByScore(zskiplist *zsl, double min, double max, dict *dict) {
      zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
      unsigned long removed = 0;
      int i;
       * is to find the element with both the right score and object. */
      x = x->forward[0];
      while (x && x->score <= max) {
 -        zskiplistNode *next;
 +        zskiplistNode *next = x->forward[0];
 +        zslDeleteNode(zsl, x, update);
 +        dictDelete(dict,x->obj);
 +        zslFreeNode(x);
 +        removed++;
 +        x = next;
 +    }
 +    return removed; /* not found */
 +}
  
 -        for (i = 0; i < zsl->level; i++) {
 -            if (update[i]->forward[i] == x) {
 -                if (i > 0) {
 -                    update[i]->span[i-1] += x->span[i-1] - 1;
 -                }
 -                update[i]->forward[i] = x->forward[i];
 -            } else {
 -                /* invariant: i > 0, because update[0]->forward[0]
 -                 * is always equal to x */
 -                update[i]->span[i-1] -= 1;
 -            }
 -        }
 -        if (x->forward[0]) {
 -            x->forward[0]->backward = x->backward;
 -        } else {
 -            zsl->tail = x->backward;
 +/* Delete all the elements with rank between start and end from the skiplist.
 + * Start and end are inclusive. Note that start and end need to be 1-based */
 +static unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict) {
 +    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
 +    unsigned long traversed = 0, removed = 0;
 +    int i;
 +
 +    x = zsl->header;
 +    for (i = zsl->level-1; i >= 0; i--) {
 +        while (x->forward[i] && (traversed + (i > 0 ? x->span[i-1] : 1)) < start) {
 +            traversed += i > 0 ? x->span[i-1] : 1;
 +            x = x->forward[i];
          }
 -        next = x->forward[0];
 +        update[i] = x;
 +    }
 +
 +    traversed++;
 +    x = x->forward[0];
 +    while (x && traversed <= end) {
 +        zskiplistNode *next = x->forward[0];
 +        zslDeleteNode(zsl, x, update);
          dictDelete(dict,x->obj);
          zslFreeNode(x);
 -        while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
 -            zsl->level--;
 -        zsl->length--;
          removed++;
 +        traversed++;
          x = next;
      }
 -    return removed; /* not found */
 +    return removed;
  }
  
  /* Find the first node having a score equal or greater than the specified one.
@@@ -5403,54 -5323,178 +5408,219 @@@ static void zremrangebyscoreCommand(red
              return;
          }
          zs = zsetobj->ptr;
 -        deleted = zslDeleteRange(zs->zsl,min,max,zs->dict);
 +        deleted = zslDeleteRangeByScore(zs->zsl,min,max,zs->dict);
          if (htNeedsResize(zs->dict)) dictResize(zs->dict);
          server.dirty += deleted;
          addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",deleted));
      }
  }
  
 +static void zremrangebyrankCommand(redisClient *c) {
 +    int start = atoi(c->argv[2]->ptr);
 +    int end = atoi(c->argv[3]->ptr);
 +    robj *zsetobj;
 +    zset *zs;
 +
 +    zsetobj = lookupKeyWrite(c->db,c->argv[1]);
 +    if (zsetobj == NULL) {
 +        addReply(c,shared.czero);
 +    } else {
 +        if (zsetobj->type != REDIS_ZSET) {
 +            addReply(c,shared.wrongtypeerr);
 +            return;
 +        }
 +
 +        zs = zsetobj->ptr;
 +        int llen = zs->zsl->length;
 +        long deleted;
 +
 +        /* convert negative indexes */
 +        if (start < 0) start = llen+start;
 +        if (end < 0) end = llen+end;
 +        if (start < 0) start = 0;
 +        if (end < 0) end = 0;
 +
 +        /* indexes sanity checks */
 +        if (start > end || start >= llen) {
 +            addReply(c,shared.czero);
 +            return;
 +        }
 +        if (end >= llen) end = llen-1;
 +
 +        /* increment start and end because zsl*Rank functions
 +         * use 1-based rank */
 +        deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);
 +        if (htNeedsResize(zs->dict)) dictResize(zs->dict);
 +        server.dirty += deleted;
 +        addReplyLong(c, deleted);
 +    }
 +}
 +
+ typedef struct {
+     dict *dict;
+     double weight;
+ } zsetopsrc;
+ static int qsortCompareZsetopsrcByCardinality(const void *s1, const void *s2) {
+     zsetopsrc *d1 = (void*) s1, *d2 = (void*) s2;
+     unsigned long size1, size2;
+     size1 = d1->dict ? dictSize(d1->dict) : 0;
+     size2 = d2->dict ? dictSize(d2->dict) : 0;
+     return size1 - size2;
+ }
+ static void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
+     int i, j, zsetnum;
+     zsetopsrc *src;
+     robj *dstobj;
+     zset *dstzset;
+     dictIterator *di;
+     dictEntry *de;
+     /* expect zsetnum input keys to be given */
+     zsetnum = atoi(c->argv[2]->ptr);
+     if (zsetnum < 1) {
+         addReplySds(c,sdsnew("-ERR at least 1 input key is needed for ZUNION/ZINTER\r\n"));
+         return;
+     }
+     /* test if the expected number of keys would overflow */
+     if (3+zsetnum > c->argc) {
+         addReply(c,shared.syntaxerr);
+         return;
+     }
+     /* read keys to be used for input */
+     src = malloc(sizeof(zsetopsrc) * zsetnum);
+     for (i = 0, j = 3; i < zsetnum; i++, j++) {
+         robj *zsetobj = lookupKeyWrite(c->db,c->argv[j]);
+         if (!zsetobj) {
+             src[i].dict = NULL;
+         } else {
+             if (zsetobj->type != REDIS_ZSET) {
+                 zfree(src);
+                 addReply(c,shared.wrongtypeerr);
+                 return;
+             }
+             src[i].dict = ((zset*)zsetobj->ptr)->dict;
+         }
+         /* default all weights to 1 */
+         src[i].weight = 1.0;
+     }
+     /* parse optional extra arguments */
+     if (j < c->argc) {
+         int remaining = c->argc-j;
+         while (remaining) {
+             if (!strcasecmp(c->argv[j]->ptr,"weights")) {
+                 j++; remaining--;
+                 if (remaining < zsetnum) {
+                     zfree(src);
+                     addReplySds(c,sdsnew("-ERR not enough weights for ZUNION/ZINTER\r\n"));
+                     return;
+                 }
+                 for (i = 0; i < zsetnum; i++, j++, remaining--) {
+                     src[i].weight = strtod(c->argv[j]->ptr, NULL);
+                 }
+             } else {
+                 zfree(src);
+                 addReply(c,shared.syntaxerr);
+                 return;
+             }
+         }
+     }
+     dstobj = createZsetObject();
+     dstzset = dstobj->ptr;
+     if (op == REDIS_OP_INTER) {
+         /* sort sets from the smallest to largest, this will improve our
+          * algorithm's performance */
+         qsort(src,zsetnum,sizeof(zsetopsrc), qsortCompareZsetopsrcByCardinality);
+         /* skip going over all entries if the smallest zset is NULL or empty */
+         if (src[0].dict && dictSize(src[0].dict) > 0) {
+             /* precondition: as src[0].dict is non-empty and the zsets are ordered
+              * from small to large, all src[i > 0].dict are non-empty too */
+             di = dictGetIterator(src[0].dict);
+             while((de = dictNext(di)) != NULL) {
+                 double *score = zmalloc(sizeof(double));
+                 *score = 0.0;
+                 for (j = 0; j < zsetnum; j++) {
+                     dictEntry *other = (j == 0) ? de : dictFind(src[j].dict,dictGetEntryKey(de));
+                     if (other) {
+                         *score = *score + src[j].weight * (*(double*)dictGetEntryVal(other));
+                     } else {
+                         break;
+                     }
+                 }
+                 /* skip entry when not present in every source dict */
+                 if (j != zsetnum) {
+                     zfree(score);
+                 } else {
+                     robj *o = dictGetEntryKey(de);
+                     dictAdd(dstzset->dict,o,score);
+                     incrRefCount(o); /* added to dictionary */
+                     zslInsert(dstzset->zsl,*score,o);
+                     incrRefCount(o); /* added to skiplist */
+                 }
+             }
+             dictReleaseIterator(di);
+         }
+     } else if (op == REDIS_OP_UNION) {
+         for (i = 0; i < zsetnum; i++) {
+             if (!src[i].dict) continue;
+             di = dictGetIterator(src[i].dict);
+             while((de = dictNext(di)) != NULL) {
+                 /* skip key when already processed */
+                 if (dictFind(dstzset->dict,dictGetEntryKey(de)) != NULL) continue;
+                 double *score = zmalloc(sizeof(double));
+                 *score = 0.0;
+                 for (j = 0; j < zsetnum; j++) {
+                     if (!src[j].dict) continue;
+                     dictEntry *other = (i == j) ? de : dictFind(src[j].dict,dictGetEntryKey(de));
+                     if (other) {
+                         *score = *score + src[j].weight * (*(double*)dictGetEntryVal(other));
+                     }
+                 }
+                 robj *o = dictGetEntryKey(de);
+                 dictAdd(dstzset->dict,o,score);
+                 incrRefCount(o); /* added to dictionary */
+                 zslInsert(dstzset->zsl,*score,o);
+                 incrRefCount(o); /* added to skiplist */
+             }
+             dictReleaseIterator(di);
+         }
+     } else {
+         /* unknown operator */
+         redisAssert(op == REDIS_OP_INTER || op == REDIS_OP_UNION);
+     }
+     deleteKey(c->db,dstkey);
+     dictAdd(c->db->dict,dstkey,dstobj);
+     incrRefCount(dstkey);
+     addReplyLong(c, dstzset->zsl->length);
+     server.dirty++;
+     zfree(src);
+ }
+ static void zunionCommand(redisClient *c) {
+     zunionInterGenericCommand(c,c->argv[1], REDIS_OP_UNION);
+ }
+ static void zinterCommand(redisClient *c) {
+     zunionInterGenericCommand(c,c->argv[1], REDIS_OP_INTER);
+ }
  static void zrangeGenericCommand(redisClient *c, int reverse) {
      robj *o;
      int start = atoi(c->argv[2]->ptr);
              /* check if starting point is trivial, before searching
               * the element in log(N) time */
              if (reverse) {
 -                ln = start == 0 ? zsl->tail : zslGetElementByRank(zsl, llen - start);
 +                ln = start == 0 ? zsl->tail : zslGetElementByRank(zsl, llen-start);
              } else {
 -                ln = start == 0 ? zsl->header->forward[0] : zslGetElementByRank(zsl, start + 1);
 +                ln = start == 0 ? zsl->header->forward[0] : zslGetElementByRank(zsl, start+1);
              }
  
              /* Return the result in form of a multi-bulk reply */
@@@ -5697,7 -5741,7 +5867,7 @@@ static void zscoreCommand(redisClient *
      }
  }
  
 -static void zrankCommand(redisClient *c) {
 +static void zrankGenericCommand(redisClient *c, int reverse) {
      robj *o;
      o = lookupKeyRead(c->db,c->argv[1]);
      if (o == NULL) {
          double *score = dictGetEntryVal(de);
          rank = zslGetRank(zsl, *score, c->argv[2]);
          if (rank) {
 -            addReplyLong(c, rank-1);
 +            if (reverse) {
 +                addReplyLong(c, zsl->length - rank);
 +            } else {
 +                addReplyLong(c, rank-1);
 +            }
          } else {
              addReply(c,shared.nullbulk);
          }
      }
  }
  
 +static void zrankCommand(redisClient *c) {
 +    zrankGenericCommand(c, 0);
 +}
 +
 +static void zrevrankCommand(redisClient *c) {
 +    zrankGenericCommand(c, 1);
 +}
 +
  /* =================================== Hashes =============================== */
  static void hsetCommand(redisClient *c) {
      int update = 0;
      }
      if (o->encoding == REDIS_ENCODING_ZIPMAP) {
          unsigned char *zm = o->ptr;
 +        robj *valobj = getDecodedObject(c->argv[3]);
  
          zm = zipmapSet(zm,c->argv[2]->ptr,sdslen(c->argv[2]->ptr),
 -            c->argv[3]->ptr,sdslen(c->argv[3]->ptr),&update);
 +            valobj->ptr,sdslen(valobj->ptr),&update);
 +        decrRefCount(valobj);
          o->ptr = zm;
      } else {
          if (dictAdd(o->ptr,c->argv[2],c->argv[3]) == DICT_OK) {
@@@ -5813,27 -5843,6 +5983,27 @@@ static void hgetCommand(redisClient *c
      }
  }
  
 +static void convertToRealHash(robj *o) {
 +    unsigned char *key, *val, *p, *zm = o->ptr;
 +    unsigned int klen, vlen;
 +    dict *dict = dictCreate(&hashDictType,NULL);
 +
 +    assert(o->type == REDIS_HASH && o->encoding != REDIS_ENCODING_HT);
 +    p = zipmapRewind(zm);
 +    while((p = zipmapNext(p,&key,&klen,&val,&vlen)) != NULL) {
 +        robj *keyobj, *valobj;
 +
 +        keyobj = createStringObject((char*)key,klen);
 +        valobj = createStringObject((char*)val,vlen);
 +        tryObjectEncoding(keyobj);
 +        tryObjectEncoding(valobj);
 +        dictAdd(dict,keyobj,valobj);
 +    }
 +    o->encoding = REDIS_ENCODING_HT;
 +    o->ptr = dict;
 +    zfree(zm);
 +}
 +
  /* ========================= Non type-specific commands  ==================== */
  
  static void flushdbCommand(redisClient *c) {
diff --combined test-redis.tcl
index 9139f544754ae8abd17217b9881e63b4de921de9,54b614fd3a3fa4b896cf45becdc7811db0a3bce4..00370a4c4b955f8bc442d135613dfad3661b7f4f
@@@ -1204,10 -1204,6 +1204,10 @@@ proc main {server port} 
          list [$r zrank zranktmp x] [$r zrank zranktmp y] [$r zrank zranktmp z]
      } {0 1 2}
  
 +    test {ZREVRANK basics} {
 +        list [$r zrevrank zranktmp x] [$r zrevrank zranktmp y] [$r zrevrank zranktmp z]
 +    } {2 1 0}
 +
      test {ZRANK - after deletion} {
          $r zrem zranktmp y
          list [$r zrank zranktmp x] [$r zrank zranktmp z]
          $r zrangebyscore zset 20 50 LIMIT 2 3 withscores
      } {d 40 e 50}
  
 -    test {ZREMRANGE basics} {
 +    test {ZREMRANGEBYSCORE basics} {
          $r del zset
          $r zadd zset 1 a
          $r zadd zset 2 b
          list [$r zremrangebyscore zset 2 4] [$r zrange zset 0 -1]
      } {3 {a e}}
  
 -    test {ZREMRANGE from -inf to +inf} {
 +    test {ZREMRANGEBYSCORE from -inf to +inf} {
          $r del zset
          $r zadd zset 1 a
          $r zadd zset 2 b
          list [$r zremrangebyscore zset -inf +inf] [$r zrange zset 0 -1]
      } {5 {}}
  
 +    test {ZREMRANGEBYRANK basics} {
 +        $r del zset
 +        $r zadd zset 1 a
 +        $r zadd zset 2 b
 +        $r zadd zset 3 c
 +        $r zadd zset 4 d
 +        $r zadd zset 5 e
 +        list [$r zremrangebyrank zset 1 3] [$r zrange zset 0 -1]
 +    } {3 {a e}}
 +
+     test {ZUNION basics} {
+         $r del zseta zsetb zsetc
+         $r zadd zseta 1 a
+         $r zadd zseta 2 b
+         $r zadd zseta 3 c
+         $r zadd zsetb 1 b
+         $r zadd zsetb 2 c
+         $r zadd zsetb 3 d
+         list [$r zunion zsetc 2 zseta zsetb] [$r zrange zsetc 0 -1 withscores]
+     } {4 {a 1 b 3 d 3 c 5}}
+     test {ZUNION with weights} {
+         list [$r zunion zsetc 2 zseta zsetb weights 2 3] [$r zrange zsetc 0 -1 withscores]
+     } {4 {a 2 b 7 d 9 c 12}}
+     test {ZINTER basics} {
+         list [$r zinter zsetc 2 zseta zsetb] [$r zrange zsetc 0 -1 withscores]
+     } {2 {b 3 c 5}}
+     test {ZINTER with weights} {
+         list [$r zinter zsetc 2 zseta zsetb weights 2 3] [$r zrange zsetc 0 -1 withscores]
+     } {2 {b 7 c 12}}
      test {SORT against sorted sets} {
          $r del zset
          $r zadd zset 1 a