]> git.saurik.com Git - redis.git/commitdiff
Merged ZREMBYRANK from Pietern
authorantirez <antirez@gmail.com>
Tue, 9 Mar 2010 15:22:50 +0000 (16:22 +0100)
committerantirez <antirez@gmail.com>
Tue, 9 Mar 2010 15:22:50 +0000 (16:22 +0100)
1  2 
redis.c
test-redis.tcl

diff --combined redis.c
index c1777c761931b29a2e56d85e4b810b253f8ffc40,ab0b9be4aec6ced9a9b277d2c0bab13d4646a2f3..8658bc1979263919d8ab0b809d03db3c8f2226e8
+++ b/redis.c
@@@ -75,7 -75,6 +75,7 @@@
  #include "zmalloc.h" /* total memory usage aware version of malloc/free */
  #include "lzf.h"    /* LZF compression library */
  #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
 +#include "zipmap.h"
  
  /* Error codes */
  #define REDIS_OK                0
  #define REDIS_ZSET 3
  #define REDIS_HASH 4
  
 -/* Objects encoding */
 +/* Objects encoding. Some kind of objects like Strings and Hashes can be
 + * internally represented in multiple ways. The 'encoding' field of the object
 + * is set to one of this fields for this object. */
  #define REDIS_ENCODING_RAW 0    /* Raw representation */
  #define REDIS_ENCODING_INT 1    /* Encoded as integer */
 +#define REDIS_ENCODING_ZIPMAP 2 /* Encoded as zipmap */
 +#define REDIS_ENCODING_HT 3     /* Encoded as an hash table */
  
  /* Object types only used for dumping to disk */
  #define REDIS_EXPIRETIME 253
  #define APPENDFSYNC_ALWAYS 1
  #define APPENDFSYNC_EVERYSEC 2
  
 +/* Hashes related defaults */
 +#define REDIS_HASH_MAX_ZIPMAP_ENTRIES 64
 +#define REDIS_HASH_MAX_ZIPMAP_VALUE 512
 +
  /* We can print the stacktrace, so our assert is defined this way: */
  #define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e,__FILE__,__LINE__),_exit(1)))
  static void _redisAssert(char *estr, char *file, int line);
@@@ -395,9 -386,6 +395,9 @@@ struct redisServer 
      off_t vm_page_size;
      off_t vm_pages;
      unsigned long long vm_max_memory;
 +    /* Hashes config */
 +    size_t hash_max_zipmap_entries;
 +    size_t hash_max_zipmap_value;
      /* Virtual memory state */
      FILE *vm_fp;
      int vm_fd;
@@@ -590,7 -578,6 +590,7 @@@ static void readQueryFromClient(aeEvent
  static struct redisCommand *lookupCommand(char *name);
  static void call(redisClient *c, struct redisCommand *cmd);
  static void resetClient(redisClient *c);
 +static void convertToRealHash(robj *o);
  
  static void authCommand(redisClient *c);
  static void pingCommand(redisClient *c);
@@@ -674,9 -661,7 +674,10 @@@ static void brpopCommand(redisClient *c
  static void appendCommand(redisClient *c);
  static void substrCommand(redisClient *c);
  static void zrankCommand(redisClient *c);
 +static void zrevrankCommand(redisClient *c);
 +static void hsetCommand(redisClient *c);
 +static void hgetCommand(redisClient *c);
+ static void zremrangebyrankCommand(redisClient *c);
  
  /*================================= Globals ================================= */
  
@@@ -724,6 -709,7 +725,7 @@@ static struct redisCommand cmdTable[] 
      {"zincrby",zincrbyCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,1,1,1},
      {"zrem",zremCommand,3,REDIS_CMD_BULK,1,1,1},
      {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE,1,1,1},
+     {"zremrangebyrank",zremrangebyrankCommand,4,REDIS_CMD_INLINE,1,1,1},
      {"zrange",zrangeCommand,-4,REDIS_CMD_INLINE,1,1,1},
      {"zrangebyscore",zrangebyscoreCommand,-4,REDIS_CMD_INLINE,1,1,1},
      {"zcount",zcountCommand,4,REDIS_CMD_INLINE,1,1,1},
      {"zcard",zcardCommand,2,REDIS_CMD_INLINE,1,1,1},
      {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,1,1,1},
      {"zrank",zrankCommand,3,REDIS_CMD_INLINE,1,1,1},
 +    {"zrevrank",zrevrankCommand,3,REDIS_CMD_INLINE,1,1,1},
 +    {"hset",hsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,1,1,1},
 +    {"hget",hgetCommand,3,REDIS_CMD_BULK,1,1,1},
      {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,1,1,1},
      {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM,1,1,1},
      {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM,1,1,1},
@@@ -1030,7 -1013,7 +1032,7 @@@ static dictType zsetDictType = 
  };
  
  /* Db->dict */
 -static dictType hashDictType = {
 +static dictType dbDictType = {
      dictObjHash,                /* hash function */
      NULL,                       /* key dup */
      NULL,                       /* val dup */
@@@ -1049,16 -1032,6 +1051,16 @@@ static dictType keyptrDictType = 
      NULL                       /* val destructor */
  };
  
 +/* Hash type hash table (note that small hashes are represented with zimpaps) */
 +static dictType hashDictType = {
 +    dictEncObjHash,             /* hash function */
 +    NULL,                       /* key dup */
 +    NULL,                       /* val dup */
 +    dictEncObjKeyCompare,       /* key compare */
 +    dictRedisObjectDestructor,  /* key destructor */
 +    dictRedisObjectDestructor   /* val destructor */
 +};
 +
  /* Keylist hash table type has unencoded redis objects as keys and
   * lists as values. It's used for blocking operations (BLPOP) and to
   * map swapped keys to a list of clients waiting for this keys to be loaded. */
@@@ -1475,8 -1448,6 +1477,8 @@@ static void initServerConfig() 
      server.vm_max_memory = 1024LL*1024*1024*1; /* 1 GB of RAM */
      server.vm_max_threads = 4;
      server.vm_blocked_clients = 0;
 +    server.hash_max_zipmap_entries = REDIS_HASH_MAX_ZIPMAP_ENTRIES;
 +    server.hash_max_zipmap_value = REDIS_HASH_MAX_ZIPMAP_VALUE;
  
      resetServerSaveParams();
  
@@@ -1524,7 -1495,7 +1526,7 @@@ static void initServer() 
          exit(1);
      }
      for (j = 0; j < server.dbnum; j++) {
 -        server.db[j].dict = dictCreate(&hashDictType,NULL);
 +        server.db[j].dict = dictCreate(&dbDictType,NULL);
          server.db[j].expires = dictCreate(&keyptrDictType,NULL);
          server.db[j].blockingkeys = dictCreate(&keylistDictType,NULL);
          if (server.vm_enabled)
@@@ -1737,12 -1708,6 +1739,12 @@@ static void loadServerConfig(char *file
              server.vm_pages = strtoll(argv[1], NULL, 10);
          } else if (!strcasecmp(argv[0],"vm-max-threads") && argc == 2) {
              server.vm_max_threads = strtoll(argv[1], NULL, 10);
 +        } else if (!strcasecmp(argv[0],"hash-max-zipmap-entries") && argc == 2){
 +            server.hash_max_zipmap_entries = strtol(argv[1], NULL, 10);
 +        } else if (!strcasecmp(argv[0],"hash-max-zipmap-value") && argc == 2){
 +            server.hash_max_zipmap_value = strtol(argv[1], NULL, 10);
 +        } else if (!strcasecmp(argv[0],"vm-max-threads") && argc == 2) {
 +            server.vm_max_threads = strtoll(argv[1], NULL, 10);
          } else {
              err = "Bad directive or wrong number of arguments"; goto loaderr;
          }
@@@ -2580,16 -2545,6 +2582,16 @@@ static robj *createSetObject(void) 
      return createObject(REDIS_SET,d);
  }
  
 +static robj *createHashObject(void) {
 +    /* All the Hashes start as zipmaps. Will be automatically converted
 +     * into hash tables if there are enough elements or big elements
 +     * inside. */
 +    unsigned char *zm = zipmapNew();
 +    robj *o = createObject(REDIS_HASH,zm);
 +    o->encoding = REDIS_ENCODING_ZIPMAP;
 +    return o;
 +}
 +
  static robj *createZsetObject(void) {
      zset *zs = zmalloc(sizeof(*zs));
  
@@@ -2621,17 -2576,7 +2623,17 @@@ static void freeZsetObject(robj *o) 
  }
  
  static void freeHashObject(robj *o) {
 -    dictRelease((dict*) o->ptr);
 +    switch (o->encoding) {
 +    case REDIS_ENCODING_HT:
 +        dictRelease((dict*) o->ptr);
 +        break;
 +    case REDIS_ENCODING_ZIPMAP:
 +        zfree(o->ptr);
 +        break;
 +    default:
 +        redisAssert(0);
 +        break;
 +    }
  }
  
  static void incrRefCount(robj *o) {
@@@ -2936,7 -2881,7 +2938,7 @@@ static int rdbSaveLen(FILE *fp, uint32_
  /* String objects in the form "2391" "-100" without any space and with a
   * range of values that can fit in an 8, 16 or 32 bit signed value can be
   * encoded as integers to save space */
 -static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
 +static int rdbTryIntegerEncoding(char *s, size_t len, unsigned char *enc) {
      long long value;
      char *endptr, buf[32];
  
  
      /* If the number converted back into a string is not identical
       * then it's not possible to encode the string as integer */
 -    if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
 +    if (strlen(buf) != len || memcmp(buf,s,len)) return 0;
  
      /* Finally check if it fits in our ranges */
      if (value >= -(1<<7) && value <= (1<<7)-1) {
      }
  }
  
 -static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
 -    unsigned int comprlen, outlen;
 +static int rdbSaveLzfStringObject(FILE *fp, unsigned char *s, size_t len) {
 +    size_t comprlen, outlen;
      unsigned char byte;
      void *out;
  
      /* We require at least four bytes compression for this to be worth it */
 -    outlen = sdslen(obj->ptr)-4;
 -    if (outlen <= 0) return 0;
 +    if (len <= 4) return 0;
 +    outlen = len-4;
      if ((out = zmalloc(outlen+1)) == NULL) return 0;
 -    comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
 +    comprlen = lzf_compress(s, len, out, outlen);
      if (comprlen == 0) {
          zfree(out);
          return 0;
      byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
      if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
      if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
 -    if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
 +    if (rdbSaveLen(fp,len) == -1) goto writeerr;
      if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
      zfree(out);
      return comprlen;
@@@ -3001,13 -2946,16 +3003,13 @@@ writeerr
  
  /* Save a string objet as [len][data] on disk. If the object is a string
   * representation of an integer value we try to safe it in a special form */
 -static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
 -    size_t len;
 +static int rdbSaveRawString(FILE *fp, unsigned char *s, size_t len) {
      int enclen;
  
 -    len = sdslen(obj->ptr);
 -
      /* Try integer encoding */
      if (len <= 11) {
          unsigned char buf[5];
 -        if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
 +        if ((enclen = rdbTryIntegerEncoding((char*)s,len,buf)) > 0) {
              if (fwrite(buf,enclen,1,fp) == 0) return -1;
              return 0;
          }
      if (server.rdbcompression && len > 20) {
          int retval;
  
 -        retval = rdbSaveLzfStringObject(fp,obj);
 +        retval = rdbSaveLzfStringObject(fp,s,len);
          if (retval == -1) return -1;
          if (retval > 0) return 0;
          /* retval == 0 means data can't be compressed, save the old way */
  
      /* Store verbatim */
      if (rdbSaveLen(fp,len) == -1) return -1;
 -    if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
 +    if (len && fwrite(s,len,1,fp) == 0) return -1;
      return 0;
  }
  
@@@ -3041,10 -2989,10 +3043,10 @@@ static int rdbSaveStringObject(FILE *fp
       * this in order to avoid bugs) */
      if (obj->encoding != REDIS_ENCODING_RAW) {
          obj = getDecodedObject(obj);
 -        retval = rdbSaveStringObjectRaw(fp,obj);
 +        retval = rdbSaveRawString(fp,obj->ptr,sdslen(obj->ptr));
          decrRefCount(obj);
      } else {
 -        retval = rdbSaveStringObjectRaw(fp,obj);
 +        retval = rdbSaveRawString(fp,obj->ptr,sdslen(obj->ptr));
      }
      return retval;
  }
@@@ -3122,33 -3070,6 +3124,33 @@@ static int rdbSaveObject(FILE *fp, rob
              if (rdbSaveDoubleValue(fp,*score) == -1) return -1;
          }
          dictReleaseIterator(di);
 +    } else if (o->type == REDIS_HASH) {
 +        /* Save a hash value */
 +        if (o->encoding == REDIS_ENCODING_ZIPMAP) {
 +            unsigned char *p = zipmapRewind(o->ptr);
 +            unsigned int count = zipmapLen(o->ptr);
 +            unsigned char *key, *val;
 +            unsigned int klen, vlen;
 +
 +            if (rdbSaveLen(fp,count) == -1) return -1;
 +            while((p = zipmapNext(p,&key,&klen,&val,&vlen)) != NULL) {
 +                if (rdbSaveRawString(fp,key,klen) == -1) return -1;
 +                if (rdbSaveRawString(fp,val,vlen) == -1) return -1;
 +            }
 +        } else {
 +            dictIterator *di = dictGetIterator(o->ptr);
 +            dictEntry *de;
 +
 +            if (rdbSaveLen(fp,dictSize((dict*)o->ptr)) == -1) return -1;
 +            while((de = dictNext(di)) != NULL) {
 +                robj *key = dictGetEntryKey(de);
 +                robj *val = dictGetEntryVal(de);
 +
 +                if (rdbSaveStringObject(fp,key) == -1) return -1;
 +                if (rdbSaveStringObject(fp,val) == -1) return -1;
 +            }
 +            dictReleaseIterator(di);
 +        }
      } else {
          redisAssert(0 != 0);
      }
@@@ -3473,7 -3394,7 +3475,7 @@@ static robj *rdbLoadObject(int type, FI
          }
      } else if (type == REDIS_ZSET) {
          /* Read list/set value */
 -        uint32_t zsetlen;
 +        size_t zsetlen;
          zset *zs;
  
          if ((zsetlen = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
              zslInsert(zs->zsl,*score,ele);
              incrRefCount(ele); /* added to skiplist */
          }
 +    } else if (type == REDIS_HASH) {
 +        size_t hashlen;
 +
 +        if ((hashlen = rdbLoadLen(fp,NULL)) == REDIS_RDB_LENERR) return NULL;
 +        o = createHashObject();
 +        /* Too many entries? Use an hash table. */
 +        if (hashlen > server.hash_max_zipmap_entries)
 +            convertToRealHash(o);
 +        /* Load every key/value, then set it into the zipmap or hash
 +         * table, as needed. */
 +        while(hashlen--) {
 +            robj *key, *val;
 +
 +            if ((key = rdbLoadStringObject(fp)) == NULL) return NULL;
 +            if ((val = rdbLoadStringObject(fp)) == NULL) return NULL;
 +            /* If we are using a zipmap and there are too big values
 +             * the object is converted to real hash table encoding. */
 +            if (o->encoding != REDIS_ENCODING_HT &&
 +               (sdslen(key->ptr) > server.hash_max_zipmap_value ||
 +                sdslen(val->ptr) > server.hash_max_zipmap_value))
 +            {
 +                    convertToRealHash(o);
 +            }
 +
 +            if (o->encoding == REDIS_ENCODING_ZIPMAP) {
 +                unsigned char *zm = o->ptr;
 +
 +                zm = zipmapSet(zm,key->ptr,sdslen(key->ptr),
 +                                  val->ptr,sdslen(val->ptr),NULL);
 +                o->ptr = zm;
 +                decrRefCount(key);
 +                decrRefCount(val);
 +            } else {
 +                tryObjectEncoding(key);
 +                tryObjectEncoding(val);
 +                dictAdd((dict*)o->ptr,key,val);
 +                incrRefCount(key);
 +                incrRefCount(val);
 +            }
 +        }
      } else {
          redisAssert(0 != 0);
      }
@@@ -4038,8 -3919,7 +4040,8 @@@ static void typeCommand(redisClient *c
          case REDIS_LIST: type = "+list"; break;
          case REDIS_SET: type = "+set"; break;
          case REDIS_ZSET: type = "+zset"; break;
 -        default: type = "unknown"; break;
 +        case REDIS_HASH: type = "+hash"; break;
 +        default: type = "+unknown"; break;
          }
      }
      addReplySds(c,sdsnew(type));
@@@ -5078,6 -4958,31 +5080,31 @@@ static void zslInsert(zskiplist *zsl, d
      zsl->length++;
  }
  
+ /* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
+ void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
+     int i;
+     for (i = 0; i < zsl->level; i++) {
+         if (update[i]->forward[i] == x) {
+             if (i > 0) {
+                 update[i]->span[i-1] += x->span[i-1] - 1;
+             }
+             update[i]->forward[i] = x->forward[i];
+         } else {
+             /* invariant: i > 0, because update[0]->forward[0]
+              * is always equal to x */
+             update[i]->span[i-1] -= 1;
+         }
+     }
+     if (x->forward[0]) {
+         x->forward[0]->backward = x->backward;
+     } else {
+         zsl->tail = x->backward;
+     }
+     while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
+         zsl->level--;
+     zsl->length--;
+ }
  /* Delete an element with matching score/object from the skiplist. */
  static int zslDelete(zskiplist *zsl, double score, robj *obj) {
      zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
       * is to find the element with both the right score and object. */
      x = x->forward[0];
      if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
-         for (i = 0; i < zsl->level; i++) {
-             if (update[i]->forward[i] == x) {
-                 if (i > 0) {
-                     update[i]->span[i-1] += x->span[i-1] - 1;
-                 }
-                 update[i]->forward[i] = x->forward[i];
-             } else {
-                 /* invariant: i > 0, because update[0]->forward[0]
-                  * is always equal to x */
-                 update[i]->span[i-1] -= 1;
-             }
-         }
-         if (x->forward[0]) {
-             x->forward[0]->backward = x->backward;
-         } else {
-             zsl->tail = x->backward;
-         }
+         zslDeleteNode(zsl, x, update);
          zslFreeNode(x);
-         while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
-             zsl->level--;
-         zsl->length--;
          return 1;
      } else {
          return 0; /* not found */
   * Min and mx are inclusive, so a score >= min || score <= max is deleted.
   * Note that this function takes the reference to the hash table view of the
   * sorted set, in order to remove the elements from the hash table too. */
- static unsigned long zslDeleteRange(zskiplist *zsl, double min, double max, dict *dict) {
+ static unsigned long zslDeleteRangeByScore(zskiplist *zsl, double min, double max, dict *dict) {
      zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
      unsigned long removed = 0;
      int i;
       * is to find the element with both the right score and object. */
      x = x->forward[0];
      while (x && x->score <= max) {
-         zskiplistNode *next;
+         zskiplistNode *next = x->forward[0];
+         zslDeleteNode(zsl, x, update);
+         dictDelete(dict,x->obj);
+         zslFreeNode(x);
+         removed++;
+         x = next;
+     }
+     return removed; /* not found */
+ }
  
-         for (i = 0; i < zsl->level; i++) {
-             if (update[i]->forward[i] == x) {
-                 if (i > 0) {
-                     update[i]->span[i-1] += x->span[i-1] - 1;
-                 }
-                 update[i]->forward[i] = x->forward[i];
-             } else {
-                 /* invariant: i > 0, because update[0]->forward[0]
-                  * is always equal to x */
-                 update[i]->span[i-1] -= 1;
-             }
-         }
-         if (x->forward[0]) {
-             x->forward[0]->backward = x->backward;
-         } else {
-             zsl->tail = x->backward;
+ /* Delete all the elements with rank between start and end from the skiplist.
+  * Start and end are inclusive. Note that start and end need to be 1-based */
+ static unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict) {
+     zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
+     unsigned long traversed = 0, removed = 0;
+     int i;
+     x = zsl->header;
+     for (i = zsl->level-1; i >= 0; i--) {
+         while (x->forward[i] && (traversed + (i > 0 ? x->span[i-1] : 1)) < start) {
+             traversed += i > 0 ? x->span[i-1] : 1;
+             x = x->forward[i];
          }
-         next = x->forward[0];
+         update[i] = x;
+     }
+     traversed++;
+     x = x->forward[0];
+     while (x && traversed <= end) {
+         zskiplistNode *next = x->forward[0];
+         zslDeleteNode(zsl, x, update);
          dictDelete(dict,x->obj);
          zslFreeNode(x);
-         while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
-             zsl->level--;
-         zsl->length--;
          removed++;
+         traversed++;
          x = next;
      }
-     return removed; /* not found */
+     return removed;
  }
  
  /* Find the first node having a score equal or greater than the specified one.
@@@ -5386,13 -5281,54 +5403,54 @@@ static void zremrangebyscoreCommand(red
              return;
          }
          zs = zsetobj->ptr;
-         deleted = zslDeleteRange(zs->zsl,min,max,zs->dict);
+         deleted = zslDeleteRangeByScore(zs->zsl,min,max,zs->dict);
          if (htNeedsResize(zs->dict)) dictResize(zs->dict);
          server.dirty += deleted;
          addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",deleted));
      }
  }
  
+ static void zremrangebyrankCommand(redisClient *c) {
+     int start = atoi(c->argv[2]->ptr);
+     int end = atoi(c->argv[3]->ptr);
+     robj *zsetobj;
+     zset *zs;
+     zsetobj = lookupKeyWrite(c->db,c->argv[1]);
+     if (zsetobj == NULL) {
+         addReply(c,shared.czero);
+     } else {
+         if (zsetobj->type != REDIS_ZSET) {
+             addReply(c,shared.wrongtypeerr);
+             return;
+         }
+         zs = zsetobj->ptr;
+         int llen = zs->zsl->length;
+         long deleted;
+         /* convert negative indexes */
+         if (start < 0) start = llen+start;
+         if (end < 0) end = llen+end;
+         if (start < 0) start = 0;
+         if (end < 0) end = 0;
+         /* indexes sanity checks */
+         if (start > end || start >= llen) {
+             addReply(c,shared.czero);
+             return;
+         }
+         if (end >= llen) end = llen-1;
+         /* increment start and end because zsl*Rank functions
+          * use 1-based rank */
+         deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);
+         if (htNeedsResize(zs->dict)) dictResize(zs->dict);
+         server.dirty += deleted;
+         addReplyLong(c, deleted);
+     }
+ }
  static void zrangeGenericCommand(redisClient *c, int reverse) {
      robj *o;
      int start = atoi(c->argv[2]->ptr);
              /* check if starting point is trivial, before searching
               * the element in log(N) time */
              if (reverse) {
-                 ln = start == 0 ? zsl->tail : zslGetElementByRank(zsl, llen - start);
+                 ln = start == 0 ? zsl->tail : zslGetElementByRank(zsl, llen-start);
              } else {
-                 ln = start == 0 ? zsl->header->forward[0] : zslGetElementByRank(zsl, start + 1);
+                 ln = start == 0 ? zsl->header->forward[0] : zslGetElementByRank(zsl, start+1);
              }
  
              /* Return the result in form of a multi-bulk reply */
@@@ -5639,7 -5575,7 +5697,7 @@@ static void zscoreCommand(redisClient *
      }
  }
  
 -static void zrankCommand(redisClient *c) {
 +static void zrankGenericCommand(redisClient *c, int reverse) {
      robj *o;
      o = lookupKeyRead(c->db,c->argv[1]);
      if (o == NULL) {
          double *score = dictGetEntryVal(de);
          rank = zslGetRank(zsl, *score, c->argv[2]);
          if (rank) {
 -            addReplyLong(c, rank-1);
 +            if (reverse) {
 +                addReplyLong(c, zsl->length - rank);
 +            } else {
 +                addReplyLong(c, rank-1);
 +            }
          } else {
              addReply(c,shared.nullbulk);
          }
      }
  }
  
 +static void zrankCommand(redisClient *c) {
 +    zrankGenericCommand(c, 0);
 +}
 +
 +static void zrevrankCommand(redisClient *c) {
 +    zrankGenericCommand(c, 1);
 +}
 +
 +/* =================================== Hashes =============================== */
 +static void hsetCommand(redisClient *c) {
 +    int update = 0;
 +    robj *o = lookupKeyWrite(c->db,c->argv[1]);
 +
 +    if (o == NULL) {
 +        o = createHashObject();
 +        dictAdd(c->db->dict,c->argv[1],o);
 +        incrRefCount(c->argv[1]);
 +    } else {
 +        if (o->type != REDIS_HASH) {
 +            addReply(c,shared.wrongtypeerr);
 +            return;
 +        }
 +    }
 +    if (o->encoding == REDIS_ENCODING_ZIPMAP) {
 +        unsigned char *zm = o->ptr;
 +        robj *valobj = getDecodedObject(c->argv[3]);
 +
 +        zm = zipmapSet(zm,c->argv[2]->ptr,sdslen(c->argv[2]->ptr),
 +            valobj->ptr,sdslen(valobj->ptr),&update);
 +        decrRefCount(valobj);
 +        o->ptr = zm;
 +    } else {
 +        if (dictAdd(o->ptr,c->argv[2],c->argv[3]) == DICT_OK) {
 +            incrRefCount(c->argv[2]);
 +        } else {
 +            update = 1;
 +        }
 +        incrRefCount(c->argv[3]);
 +    }
 +    server.dirty++;
 +    addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",update == 0));
 +}
 +
 +static void hgetCommand(redisClient *c) {
 +    robj *o = lookupKeyRead(c->db,c->argv[1]);
 +
 +    if (o == NULL) {
 +        addReply(c,shared.nullbulk);
 +        return;
 +    } else {
 +        if (o->encoding == REDIS_ENCODING_ZIPMAP) {
 +            unsigned char *zm = o->ptr;
 +            unsigned char *val;
 +            unsigned int vlen;
 +
 +            if (zipmapGet(zm,c->argv[2]->ptr,sdslen(c->argv[2]->ptr), &val,&vlen)) {
 +                addReplySds(c,sdscatprintf(sdsempty(),"$%u\r\n", vlen));
 +                addReplySds(c,sdsnewlen(val,vlen));
 +                addReply(c,shared.crlf);
 +                return;
 +            } else {
 +                addReply(c,shared.nullbulk);
 +                return;
 +            }
 +        } else {
 +            struct dictEntry *de;
 +
 +            de = dictFind(o->ptr,c->argv[2]);
 +            if (de == NULL) {
 +                addReply(c,shared.nullbulk);
 +            } else {
 +                robj *e = dictGetEntryVal(de);
 +
 +                addReplyBulkLen(c,e);
 +                addReply(c,e);
 +                addReply(c,shared.crlf);
 +            }
 +        }
 +    }
 +}
 +
 +static void convertToRealHash(robj *o) {
 +    unsigned char *key, *val, *p, *zm = o->ptr;
 +    unsigned int klen, vlen;
 +    dict *dict = dictCreate(&hashDictType,NULL);
 +
 +    assert(o->type == REDIS_HASH && o->encoding != REDIS_ENCODING_HT);
 +    p = zipmapRewind(zm);
 +    while((p = zipmapNext(p,&key,&klen,&val,&vlen)) != NULL) {
 +        robj *keyobj, *valobj;
 +
 +        keyobj = createStringObject((char*)key,klen);
 +        valobj = createStringObject((char*)val,vlen);
 +        tryObjectEncoding(keyobj);
 +        tryObjectEncoding(valobj);
 +        dictAdd(dict,keyobj,valobj);
 +    }
 +    o->encoding = REDIS_ENCODING_HT;
 +    o->ptr = dict;
 +    zfree(zm);
 +}
 +
  /* ========================= Non type-specific commands  ==================== */
  
  static void flushdbCommand(redisClient *c) {
@@@ -6916,7 -6746,7 +6974,7 @@@ static void updateSlavesWaitingBgsave(i
  
  static int syncWithMaster(void) {
      char buf[1024], tmpfile[256], authcmd[1024];
 -    int dumpsize;
 +    long dumpsize;
      int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
      int dfd;
  
          redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
          return REDIS_ERR;
      }
 -    dumpsize = atoi(buf+1);
 -    redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
 +    dumpsize = strtol(buf+1,NULL,10);
 +    redisLog(REDIS_NOTICE,"Receiving %ld bytes data dump from MASTER",dumpsize);
      /* Read the bulk write data on a temp file */
      snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
      dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
diff --combined test-redis.tcl
index 74b955da3fd602206ef1c6e62c05dea28021e5b8,b1c4da01fd022147868a991432928184a1818e93..9139f544754ae8abd17217b9881e63b4de921de9
@@@ -1204,10 -1204,6 +1204,10 @@@ proc main {server port} 
          list [$r zrank zranktmp x] [$r zrank zranktmp y] [$r zrank zranktmp z]
      } {0 1 2}
  
 +    test {ZREVRANK basics} {
 +        list [$r zrevrank zranktmp x] [$r zrevrank zranktmp y] [$r zrevrank zranktmp z]
 +    } {2 1 0}
 +
      test {ZRANK - after deletion} {
          $r zrem zranktmp y
          list [$r zrank zranktmp x] [$r zrank zranktmp z]
          $r zrangebyscore zset 20 50 LIMIT 2 3 withscores
      } {d 40 e 50}
  
-     test {ZREMRANGE basics} {
+     test {ZREMRANGEBYSCORE basics} {
          $r del zset
          $r zadd zset 1 a
          $r zadd zset 2 b
          list [$r zremrangebyscore zset 2 4] [$r zrange zset 0 -1]
      } {3 {a e}}
  
-     test {ZREMRANGE from -inf to +inf} {
+     test {ZREMRANGEBYSCORE from -inf to +inf} {
          $r del zset
          $r zadd zset 1 a
          $r zadd zset 2 b
          list [$r zremrangebyscore zset -inf +inf] [$r zrange zset 0 -1]
      } {5 {}}
  
+     test {ZREMRANGEBYRANK basics} {
+         $r del zset
+         $r zadd zset 1 a
+         $r zadd zset 2 b
+         $r zadd zset 3 c
+         $r zadd zset 4 d
+         $r zadd zset 5 e
+         list [$r zremrangebyrank zset 1 3] [$r zrange zset 0 -1]
+     } {3 {a e}}
      test {SORT against sorted sets} {
          $r del zset
          $r zadd zset 1 a